rkhachatryan commented on code in PR #22296:
URL: https://github.com/apache/flink/pull/22296#discussion_r1153443702


##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -1115,6 +1127,66 @@ private void checkJobClientAliveness() {
         }
     }
 
+    @Override
+    public CompletableFuture<JobResourceRequirements> requestJobResourceRequirements(JobID jobId) {
+        return performOperationOnJobMasterGateway(
+                jobId, JobMasterGateway::requestJobResourceRequirements);
+    }
+
+    @Override
+    public CompletableFuture<Acknowledge> updateJobResourceRequirements(
+            JobID jobId, JobResourceRequirements jobResourceRequirements) {
+        if (!pendingJobResourceRequirementsUpdates.add(jobId)) {
+            return FutureUtils.completedExceptionally(
+                    new ConcurrentModificationException(
+                            "Another update to the job [%s] resource requirements is in progress."));

Review Comment:
   The error handling is now a bit inconsistent:
   - here we fail with a plain `ConcurrentModificationException` (so the HTTP client will get an HTTP 500?)
   - in `validateMaxParallelism`, we throw `RestHandlerException`
   
   How about failing with a `RestHandlerException` here, maybe with status `409 Conflict`?
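
   A minimal, self-contained sketch of the idea, not Flink code. A hypothetical `ConflictException` carrying status 409 stands in for `RestHandlerException` with `HttpResponseStatus.CONFLICT`, and a `String` job ID stands in for `JobID`. The sketch also passes the job ID through `String.format`. In the diff, the `[%s]` placeholder is never filled, because the message goes to the exception constructor unformatted.

   ```java
   import java.util.Set;
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.ConcurrentHashMap;
   import java.util.function.Supplier;

   // Hypothetical stand-in for RestHandlerException: carries the HTTP status to report.
   class ConflictException extends RuntimeException {
       static final int HTTP_CONFLICT = 409;

       final int statusCode;

       ConflictException(String message) {
           super(message);
           this.statusCode = HTTP_CONFLICT;
       }
   }

   // Hypothetical per-job guard, modelled on pendingJobResourceRequirementsUpdates in the diff.
   class ResourceRequirementsUpdateGuard {
       private final Set<String> pendingUpdates = ConcurrentHashMap.newKeySet();

       CompletableFuture<String> update(String jobId, Supplier<CompletableFuture<String>> doUpdate) {
           if (!pendingUpdates.add(jobId)) {
               // Reject with a 409-style error rather than a generic exception (likely a 500).
               return CompletableFuture.failedFuture(
                       new ConflictException(
                               String.format(
                                       "Another update to the job [%s] resource requirements is in progress.",
                                       jobId)));
           }
           final CompletableFuture<String> result;
           try {
               result = doUpdate.get();
           } catch (RuntimeException e) {
               // Release the guard if the update cannot even be started.
               pendingUpdates.remove(jobId);
               throw e;
           }
           // Release the guard once the update finishes, successfully or not.
           return result.whenComplete((ack, error) -> pendingUpdates.remove(jobId));
       }
   }
   ```

   Because the guard is released in `whenComplete`, a failed update does not block later updates to the same job.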



##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -1115,6 +1127,66 @@ private void checkJobClientAliveness() {
         }
     }
 
+    @Override
+    public CompletableFuture<JobResourceRequirements> requestJobResourceRequirements(JobID jobId) {
+        return performOperationOnJobMasterGateway(
+                jobId, JobMasterGateway::requestJobResourceRequirements);
+    }
+
+    @Override
+    public CompletableFuture<Acknowledge> updateJobResourceRequirements(
+            JobID jobId, JobResourceRequirements jobResourceRequirements) {
+        if (!pendingJobResourceRequirementsUpdates.add(jobId)) {
+            return FutureUtils.completedExceptionally(
+                    new ConcurrentModificationException(
+                            "Another update to the job [%s] resource requirements is in progress."));
+        }
+        return performOperationOnJobMasterGateway(
+                        jobId, JobMasterGateway::getMaxParallelismPerVertex)
+                .thenAccept(
+                        maxParallelismPerJobVertex ->
+                                validateMaxParallelism(
+                                        jobResourceRequirements, maxParallelismPerJobVertex))
+                .thenRunAsync(
+                        () -> {
+                            try {
+                                jobGraphWriter.putJobResourceRequirements(
+                                        jobId, jobResourceRequirements);
+                            } catch (Exception e) {
+                                throw new CompletionException(
+                                        "The resource requirements could not be persisted and have not been applied. Please retry.",
+                                        e);
+                            }
+                        },
+                        ioExecutor)
+                .thenComposeAsync(
+                        ignored ->
+                                performOperationOnJobMasterGateway(
+                                        jobId,
+                                        jobMasterGateway ->
+                                                jobMasterGateway.updateJobResourceRequirements(
+                                                        jobResourceRequirements)),
+                        getMainThreadExecutor())
+                .whenComplete((ack, error) -> pendingJobResourceRequirementsUpdates.remove(jobId));

Review Comment:
   **nit**: log the error at debug level?
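
   A sketch of the nit. Here `java.util.logging` at `Level.FINE` stands in for the SLF4J `log.debug(...)` call that Flink code would normally use. `PendingUpdateReleaser` and `releaseOnCompletion` are hypothetical names.

   ```java
   import java.util.Set;
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.ConcurrentHashMap;
   import java.util.logging.Level;
   import java.util.logging.Logger;

   // Hypothetical helper: releases the per-job guard and logs a failed update at debug level.
   class PendingUpdateReleaser {
       private static final Logger LOG = Logger.getLogger(PendingUpdateReleaser.class.getName());

       final Set<String> pendingUpdates = ConcurrentHashMap.newKeySet();

       <T> CompletableFuture<T> releaseOnCompletion(String jobId, CompletableFuture<T> update) {
           return update.whenComplete(
                   (ack, error) -> {
                       // Always release the guard, whatever the outcome.
                       pendingUpdates.remove(jobId);
                       if (error != null) {
                           // Level.FINE is java.util.logging's closest match to SLF4J's debug level.
                           LOG.log(Level.FINE, "Failed to update resource requirements of job " + jobId, error);
                       }
                   });
       }
   }
   ```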



##########
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java:
##########
@@ -1164,6 +1153,189 @@ public void testRetrieveJobResultAfterSubmissionOfFailedJob() throws Exception {
                                         .hasMessage("Test exception."));
     }
 
+    @Test
+    public void testInvalidResourceRequirementsUpdate() throws Exception {
+        dispatcher =
+                createAndStartDispatcher(
+                        heartbeatServices,
+                        haServices,
+                        JobMasterServiceLeadershipRunnerFactory.INSTANCE);
+        final DispatcherGateway dispatcherGateway =
+                dispatcher.getSelfGateway(DispatcherGateway.class);
+        jobMasterLeaderElectionService.isLeader(UUID.randomUUID());
+
+        dispatcherGateway.submitJob(jobGraph, TIMEOUT).get();
+
+        // We can try updating the JRR once the scheduler has been started.
+        CommonTestUtils.waitUntilCondition(
+                () -> {
+                    final JobStatus status =
+                            dispatcherGateway.requestJobStatus(jobId, TIMEOUT).get();
+                    // need to check for CREATED in case adaptive scheduler is used
+                    return status == JobStatus.CREATED || status == JobStatus.RUNNING;
+                });
+
+        assertThatFuture(
+                        dispatcherGateway.updateJobResourceRequirements(
+                                jobId, JobResourceRequirements.empty()))
+                .eventuallyFailsWith(ExecutionException.class)
+                .withCauseInstanceOf(RestHandlerException.class);
+    }
+
+    @Test
+    public void testJobResourceRequirementsCanBeOnlyUpdatedOnInitializedJobMasters()
+            throws Exception {
+        final JobManagerRunnerWithBlockingJobMasterFactory blockingJobMaster =
+                new JobManagerRunnerWithBlockingJobMasterFactory(
+                        this::withMaxParallelismPerVertexResponse);
+        dispatcher = createAndStartDispatcher(heartbeatServices, haServices, blockingJobMaster);
+        final DispatcherGateway dispatcherGateway =
+                dispatcher.getSelfGateway(DispatcherGateway.class);
+        jobMasterLeaderElectionService.isLeader(UUID.randomUUID());
+
+        assertThatFuture(
+                        dispatcherGateway.updateJobResourceRequirements(
+                                jobId, JobResourceRequirements.empty()))
+                .eventuallyFailsWith(ExecutionException.class)
+                .withCauseInstanceOf(FlinkJobNotFoundException.class);
+
+        dispatcherGateway.submitJob(jobGraph, TIMEOUT).get();
+        blockingJobMaster.waitForBlockingInit();
+
+        try {
+            assertThatFuture(
+                            dispatcherGateway.updateJobResourceRequirements(
+                                    jobId, JobResourceRequirements.empty()))
+                    .eventuallyFailsWith(ExecutionException.class)
+                    .withCauseInstanceOf(UnavailableDispatcherOperationException.class);
+        } finally {
+            // Unblocking the job master in the "finally block" prevents getting
+            // stuck during the RPC system tear down in case of test failure.
+            blockingJobMaster.unblockJobMasterInitialization();
+        }
+
+        // We can update the JRR once the job transitions to RUNNING.
+        awaitStatus(dispatcherGateway, jobId, JobStatus.RUNNING);
+        assertThatFuture(
+                        dispatcherGateway.updateJobResourceRequirements(
+                                jobId, getJobRequirements()))
+                .eventuallySucceeds();
+    }
+
+    @Test
+    public void testJobResourceRequirementsAreGuardedAgainstConcurrentModification()
+            throws Exception {
+        final CompletableFuture<Acknowledge> blockedUpdatesToJobMasterFuture =
+                new CompletableFuture<>();
+        final JobManagerRunnerWithBlockingJobMasterFactory blockingJobMaster =
+                new JobManagerRunnerWithBlockingJobMasterFactory(
+                        builder ->
+                                withMaxParallelismPerVertexResponse(builder)
+                                        .setUpdateJobResourceRequirementsFunction(
+                                                jobResourceRequirements ->
+                                                        blockedUpdatesToJobMasterFuture));
+        dispatcher = createAndStartDispatcher(heartbeatServices, haServices, blockingJobMaster);
+        final DispatcherGateway dispatcherGateway =
+                dispatcher.getSelfGateway(DispatcherGateway.class);
+
+        // We intentionally perform the test on two jobs to make sure the
+        // concurrent modification is only prevented on the per-job level.
+        final JobGraph firstJobGraph = InstantiationUtil.clone(jobGraph);
+        firstJobGraph.setJobID(JobID.generate());
+        final JobGraph secondJobGraph = InstantiationUtil.clone(jobGraph);
+        secondJobGraph.setJobID(JobID.generate());
+
+        final CompletableFuture<?> firstPendingUpdateFuture =
+                testConcurrentModificationIsPrevented(
+                        dispatcherGateway, blockingJobMaster, firstJobGraph);
+        Assertions.assertThat(firstPendingUpdateFuture).isNotCompleted();
+        final CompletableFuture<?> secondPendingUpdateFuture =
+                testConcurrentModificationIsPrevented(
+                        dispatcherGateway, blockingJobMaster, secondJobGraph);
+        Assertions.assertThat(secondPendingUpdateFuture).isNotCompleted();
+
+        blockedUpdatesToJobMasterFuture.complete(Acknowledge.get());
+        assertThatFuture(firstPendingUpdateFuture).eventuallySucceeds();
+        assertThatFuture(secondPendingUpdateFuture).eventuallySucceeds();

Review Comment:
   **nit**: move both assertions closer to the `complete` call, to make it clear why the futures shouldn't be completed yet?
   
   ```
           final CompletableFuture<?> firstPendingUpdateFuture =
                   testConcurrentModificationIsPrevented(
                           dispatcherGateway, blockingJobMaster, firstJobGraph);
           final CompletableFuture<?> secondPendingUpdateFuture =
                   testConcurrentModificationIsPrevented(
                           dispatcherGateway, blockingJobMaster, secondJobGraph);
   
           assertThat(firstPendingUpdateFuture).isNotCompleted();
           assertThat(secondPendingUpdateFuture).isNotCompleted();
           blockedUpdatesToJobMasterFuture.complete(Acknowledge.get());
           assertThatFuture(firstPendingUpdateFuture).eventuallySucceeds();
           assertThatFuture(secondPendingUpdateFuture).eventuallySucceeds();
   ```
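
   The same ordering as a plain-Java sketch, without AssertJ or the Flink test utilities. A hypothetical `PerJobUpdateGuard` models the Dispatcher's per-job pending-update set, and `blockedAck` plays the role of `blockedUpdatesToJobMasterFuture`. The `isDone()` checks sit directly before `complete(...)`. That placement makes it clear the updates are pending only because the job master acknowledgement is blocked.

   ```java
   import java.util.Set;
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.ConcurrentHashMap;

   // Hypothetical stand-in for the Dispatcher's per-job pending-update set.
   class PerJobUpdateGuard {
       private final Set<String> pending = ConcurrentHashMap.newKeySet();

       // Fails immediately if an update for the same job is already in flight.
       CompletableFuture<String> update(String jobId, CompletableFuture<String> jobMasterAck) {
           if (!pending.add(jobId)) {
               return CompletableFuture.failedFuture(
                       new IllegalStateException("Update already in progress for job " + jobId));
           }
           return jobMasterAck.whenComplete((ack, error) -> pending.remove(jobId));
       }
   }

   class ConcurrentUpdateScenario {
       // Runs the reordered scenario and returns true if every check passes.
       static boolean run() {
           PerJobUpdateGuard guard = new PerJobUpdateGuard();
           CompletableFuture<String> blockedAck = new CompletableFuture<>();

           // One pending update per job; a second update on the same job is rejected at once.
           CompletableFuture<String> first = guard.update("job-1", blockedAck);
           boolean firstRejectsConcurrent = guard.update("job-1", blockedAck).isCompletedExceptionally();
           CompletableFuture<String> second = guard.update("job-2", blockedAck);
           boolean secondRejectsConcurrent = guard.update("job-2", blockedAck).isCompletedExceptionally();

           // Checked right before unblocking: both are pending only because blockedAck is incomplete.
           boolean pendingBeforeUnblock = !first.isDone() && !second.isDone();
           blockedAck.complete("ack");
           boolean succeededAfterUnblock = "ack".equals(first.join()) && "ack".equals(second.join());

           // The guard for job-1 has been released, so a fresh update is accepted.
           boolean releasedAfterUnblock =
                   !guard.update("job-1", CompletableFuture.completedFuture("ack")).isCompletedExceptionally();

           return firstRejectsConcurrent
                   && secondRejectsConcurrent
                   && pendingBeforeUnblock
                   && succeededAfterUnblock
                   && releasedAfterUnblock;
       }
   }
   ```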



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
