rkhachatryan commented on code in PR #22296:
URL: https://github.com/apache/flink/pull/22296#discussion_r1153443702
##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -1115,6 +1127,66 @@ private void checkJobClientAliveness() {
}
}
+ @Override
+ public CompletableFuture<JobResourceRequirements> requestJobResourceRequirements(JobID jobId) {
+ return performOperationOnJobMasterGateway(
+ jobId, JobMasterGateway::requestJobResourceRequirements);
+ }
+
+ @Override
+ public CompletableFuture<Acknowledge> updateJobResourceRequirements(
+ JobID jobId, JobResourceRequirements jobResourceRequirements) {
+ if (!pendingJobResourceRequirementsUpdates.add(jobId)) {
+ return FutureUtils.completedExceptionally(
+ new ConcurrentModificationException(
+ "Another update to the job [%s] resource requirements is in progress."));
Review Comment:
The error handling is now a bit inconsistent:
- here we fail with a plain `ConcurrentModificationException` (so the HTTP client will get an HTTP 500?)
- in `validateMaxParallelism`, we throw `RestHandlerException`
How about throwing `RestHandlerException` here as well, maybe with code `409 Conflict`?
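To illustrate the suggestion, here is a minimal, self-contained sketch of a per-job guard that rejects an overlapping update with a status-carrying exception. `ConflictGuardSketch`, `StatusException`, and the `update` signature are hypothetical stand-ins, not Flink classes; in the PR itself the exception would presumably be `RestHandlerException` with a `409 Conflict` status.

```java
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

final class ConflictGuardSketch {

    /** Hypothetical stand-in for an exception that carries an HTTP status code. */
    static final class StatusException extends Exception {
        final int httpStatus;

        StatusException(String message, int httpStatus) {
            super(message);
            this.httpStatus = httpStatus;
        }
    }

    // Job ids that currently have an update in flight.
    private final Set<String> pendingUpdates = ConcurrentHashMap.newKeySet();

    /** Fails fast with status 409 if an update for the same job is still pending. */
    CompletableFuture<String> update(String jobId, CompletableFuture<String> operation) {
        if (!pendingUpdates.add(jobId)) {
            CompletableFuture<String> failed = new CompletableFuture<>();
            failed.completeExceptionally(
                    new StatusException(
                            String.format(
                                    "Another update to the job [%s] resource requirements is in progress.",
                                    jobId),
                            409));
            return failed;
        }
        // Release the guard once the operation finishes, successfully or not.
        return operation.whenComplete((ack, error) -> pendingUpdates.remove(jobId));
    }

    public static void main(String[] args) {
        ConflictGuardSketch guard = new ConflictGuardSketch();
        CompletableFuture<String> inFlight = new CompletableFuture<>();
        guard.update("job-1", inFlight);
        // The second call for the same job is rejected while the first is pending.
        guard.update("job-1", new CompletableFuture<>())
                .whenComplete(
                        (ack, error) ->
                                System.out.println(
                                        ((StatusException) error).httpStatus
                                                + ": "
                                                + error.getMessage()));
    }
}
```

With a status attached to the exception, a REST layer could map the failure to a client error instead of falling back to a generic server error.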
##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -1115,6 +1127,66 @@ private void checkJobClientAliveness() {
}
}
+ @Override
+ public CompletableFuture<JobResourceRequirements> requestJobResourceRequirements(JobID jobId) {
+ return performOperationOnJobMasterGateway(
+ jobId, JobMasterGateway::requestJobResourceRequirements);
+ }
+
+ @Override
+ public CompletableFuture<Acknowledge> updateJobResourceRequirements(
+ JobID jobId, JobResourceRequirements jobResourceRequirements) {
+ if (!pendingJobResourceRequirementsUpdates.add(jobId)) {
+ return FutureUtils.completedExceptionally(
+ new ConcurrentModificationException(
+ "Another update to the job [%s] resource requirements is in progress."));
+ }
+ return performOperationOnJobMasterGateway(
+ jobId, JobMasterGateway::getMaxParallelismPerVertex)
+ .thenAccept(
+ maxParallelismPerJobVertex ->
+ validateMaxParallelism(
+ jobResourceRequirements, maxParallelismPerJobVertex))
+ .thenRunAsync(
+ () -> {
+ try {
+ jobGraphWriter.putJobResourceRequirements(
+ jobId, jobResourceRequirements);
+ } catch (Exception e) {
+ throw new CompletionException(
+ "The resource requirements could not be persisted and have not been applied. Please retry.",
+ e);
+ }
+ },
+ ioExecutor)
+ .thenComposeAsync(
+ ignored ->
+ performOperationOnJobMasterGateway(
+ jobId,
+ jobMasterGateway ->
+ jobMasterGateway.updateJobResourceRequirements(
+ jobResourceRequirements)),
+ getMainThreadExecutor())
+ .whenComplete((ack, error) -> pendingJobResourceRequirementsUpdates.remove(jobId));
Review Comment:
**nit**: log the error at debug level?
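A minimal sketch of what that could look like, assuming the cleanup callback is the right place for the log statement. `DebugLoggingCleanupSketch` and `track` are hypothetical names, and `java.util.logging` at `Level.FINE` stands in here for the SLF4J debug level that Flink uses, only to keep the example self-contained.

```java
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.logging.Level;
import java.util.logging.Logger;

final class DebugLoggingCleanupSketch {

    // java.util.logging stands in for SLF4J; FINE roughly corresponds to debug.
    static final Logger LOG = Logger.getLogger(DebugLoggingCleanupSketch.class.getName());

    // Job ids that currently have an update in flight.
    final Set<String> pendingUpdates = ConcurrentHashMap.newKeySet();

    /** Tracks the update and, on completion, logs a failure before releasing the guard. */
    <T> CompletableFuture<T> track(String jobId, CompletableFuture<T> update) {
        pendingUpdates.add(jobId);
        return update.whenComplete(
                (ack, error) -> {
                    if (error != null) {
                        LOG.log(
                                Level.FINE,
                                "Failed to update resource requirements for job " + jobId,
                                error);
                    }
                    pendingUpdates.remove(jobId);
                });
    }

    public static void main(String[] args) {
        DebugLoggingCleanupSketch sketch = new DebugLoggingCleanupSketch();
        CompletableFuture<String> update = new CompletableFuture<>();
        CompletableFuture<String> tracked = sketch.track("job-1", update);
        update.completeExceptionally(new IllegalStateException("persist failed"));
        System.out.println("failed: " + tracked.isCompletedExceptionally());
    }
}
```

The returned future still completes exceptionally, so the caller sees the original failure; the log line only adds a trace on the dispatcher side.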
##########
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java:
##########
@@ -1164,6 +1153,189 @@ public void testRetrieveJobResultAfterSubmissionOfFailedJob() throws Exception {
.hasMessage("Test exception."));
}
+ @Test
+ public void testInvalidResourceRequirementsUpdate() throws Exception {
+ dispatcher =
+ createAndStartDispatcher(
+ heartbeatServices,
+ haServices,
+ JobMasterServiceLeadershipRunnerFactory.INSTANCE);
+ final DispatcherGateway dispatcherGateway =
+ dispatcher.getSelfGateway(DispatcherGateway.class);
+ jobMasterLeaderElectionService.isLeader(UUID.randomUUID());
+
+ dispatcherGateway.submitJob(jobGraph, TIMEOUT).get();
+
+ // We can try updating the JRR once the scheduler has been started.
+ CommonTestUtils.waitUntilCondition(
+ () -> {
+ final JobStatus status =
+ dispatcherGateway.requestJobStatus(jobId, TIMEOUT).get();
+ // need to check for CREATED in case adaptive scheduler is used
+ return status == JobStatus.CREATED || status == JobStatus.RUNNING;
+ });
+
+ assertThatFuture(
+ dispatcherGateway.updateJobResourceRequirements(
+ jobId, JobResourceRequirements.empty()))
+ .eventuallyFailsWith(ExecutionException.class)
+ .withCauseInstanceOf(RestHandlerException.class);
+ }
+
+ @Test
+ public void testJobResourceRequirementsCanBeOnlyUpdatedOnInitializedJobMasters()
+ throws Exception {
+ final JobManagerRunnerWithBlockingJobMasterFactory blockingJobMaster =
+ new JobManagerRunnerWithBlockingJobMasterFactory(
+ this::withMaxParallelismPerVertexResponse);
+ dispatcher = createAndStartDispatcher(heartbeatServices, haServices, blockingJobMaster);
+ final DispatcherGateway dispatcherGateway =
+ dispatcher.getSelfGateway(DispatcherGateway.class);
+ jobMasterLeaderElectionService.isLeader(UUID.randomUUID());
+
+ assertThatFuture(
+ dispatcherGateway.updateJobResourceRequirements(
+ jobId, JobResourceRequirements.empty()))
+ .eventuallyFailsWith(ExecutionException.class)
+ .withCauseInstanceOf(FlinkJobNotFoundException.class);
+
+ dispatcherGateway.submitJob(jobGraph, TIMEOUT).get();
+ blockingJobMaster.waitForBlockingInit();
+
+ try {
+ assertThatFuture(
+ dispatcherGateway.updateJobResourceRequirements(
+ jobId, JobResourceRequirements.empty()))
+ .eventuallyFailsWith(ExecutionException.class)
+ .withCauseInstanceOf(UnavailableDispatcherOperationException.class);
+ } finally {
+ // Unblocking the job master in the "finally block" prevents getting
+ // stuck during the RPC system tear down in case of test failure.
+ blockingJobMaster.unblockJobMasterInitialization();
+ }
+
+ // We can update the JRR once the job transitions to RUNNING.
+ awaitStatus(dispatcherGateway, jobId, JobStatus.RUNNING);
+ assertThatFuture(
+ dispatcherGateway.updateJobResourceRequirements(
+ jobId, getJobRequirements()))
+ .eventuallySucceeds();
+ }
+
+ @Test
+ public void testJobResourceRequirementsAreGuardedAgainstConcurrentModification()
+ throws Exception {
+ final CompletableFuture<Acknowledge> blockedUpdatesToJobMasterFuture =
+ new CompletableFuture<>();
+ final JobManagerRunnerWithBlockingJobMasterFactory blockingJobMaster =
+ new JobManagerRunnerWithBlockingJobMasterFactory(
+ builder ->
+ withMaxParallelismPerVertexResponse(builder)
+ .setUpdateJobResourceRequirementsFunction(
+ jobResourceRequirements ->
+ blockedUpdatesToJobMasterFuture));
+ dispatcher = createAndStartDispatcher(heartbeatServices, haServices, blockingJobMaster);
+ final DispatcherGateway dispatcherGateway =
+ dispatcher.getSelfGateway(DispatcherGateway.class);
+
+ // We intentionally perform the test on two jobs to make sure the
+ // concurrent modification is only prevented on the per-job level.
+ final JobGraph firstJobGraph = InstantiationUtil.clone(jobGraph);
+ firstJobGraph.setJobID(JobID.generate());
+ final JobGraph secondJobGraph = InstantiationUtil.clone(jobGraph);
+ secondJobGraph.setJobID(JobID.generate());
+
+ final CompletableFuture<?> firstPendingUpdateFuture =
+ testConcurrentModificationIsPrevented(
+ dispatcherGateway, blockingJobMaster, firstJobGraph);
+ Assertions.assertThat(firstPendingUpdateFuture).isNotCompleted();
+ final CompletableFuture<?> secondPendingUpdateFuture =
+ testConcurrentModificationIsPrevented(
+ dispatcherGateway, blockingJobMaster, secondJobGraph);
+ Assertions.assertThat(secondPendingUpdateFuture).isNotCompleted();
+
+ blockedUpdatesToJobMasterFuture.complete(Acknowledge.get());
+ assertThatFuture(firstPendingUpdateFuture).eventuallySucceeds();
+ assertThatFuture(secondPendingUpdateFuture).eventuallySucceeds();
Review Comment:
**nit**: move both `isNotCompleted` assertions closer to the `complete` call, to make it clear why the futures shouldn't be completed yet?
```
final CompletableFuture<?> firstPendingUpdateFuture =
        testConcurrentModificationIsPrevented(
                dispatcherGateway, blockingJobMaster, firstJobGraph);
final CompletableFuture<?> secondPendingUpdateFuture =
        testConcurrentModificationIsPrevented(
                dispatcherGateway, blockingJobMaster, secondJobGraph);
assertThat(firstPendingUpdateFuture).isNotCompleted();
assertThat(secondPendingUpdateFuture).isNotCompleted();
blockedUpdatesToJobMasterFuture.complete(Acknowledge.get());
assertThatFuture(firstPendingUpdateFuture).eventuallySucceeds();
assertThatFuture(secondPendingUpdateFuture).eventuallySucceeds();
```