tillrohrmann commented on a change in pull request #13540:
URL: https://github.com/apache/flink/pull/13540#discussion_r499630797



##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerRunner.java
##########
@@ -39,6 +39,8 @@
 
        private final CompletableFuture<ArchivedExecutionGraph> resultFuture;
 
+       private final CompletableFuture<Void> closeAsyncCalledFuture = new 
CompletableFuture<>();

Review comment:
       I would suggest to use a `OneShotLatch`.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
##########
@@ -350,6 +350,10 @@ public void testJobSubmissionUnderSameJobId() throws 
Exception {
                final TestingJobManagerRunner testingJobManagerRunner = 
jobManagerRunnerFactory.takeCreatedJobManagerRunner();
                testingJobManagerRunner.completeResultFutureExceptionally(new 
JobNotFinishedException(jobId));
 
+               // wait until termination JobManagerRunner closeAsync has been 
called.
+               // this is necessary to avoid race conditions with completion 
of the 1st job and the submission of the 2nd job 
(DuplicateJobSubmissionException).
+               testingJobManagerRunner.getCloseAsyncCalledFuture().get();
+

Review comment:
       I agree that relying on the fact that the testing main thread will 
enqueue the handleAsync payload before the `submitJob` is not a safe 
assumption. Hence, the fix should be fine. Still I would like to understand 
what exactly is going wrong here.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
##########
@@ -350,6 +350,10 @@ public void testJobSubmissionUnderSameJobId() throws 
Exception {
                final TestingJobManagerRunner testingJobManagerRunner = 
jobManagerRunnerFactory.takeCreatedJobManagerRunner();
                testingJobManagerRunner.completeResultFutureExceptionally(new 
JobNotFinishedException(jobId));
 
+               // wait until termination JobManagerRunner closeAsync has been 
called.
+               // this is necessary to avoid race conditions with completion 
of the 1st job and the submission of the 2nd job 
(DuplicateJobSubmissionException).
+               testingJobManagerRunner.getCloseAsyncCalledFuture().get();
+

Review comment:
       I don't fully understand why this additional synchronization step is 
necessary. If I am not mistaken, then 
`testingJobManagerRunner.completeResultFutureExceptionally` won't trigger 
`Dispatcher.jobNotFinished` directly but at least it will enqueue the 
`RunAsync` message which will run this task into the mailbox of the 
`Dispatcher`. `dispatcherGateway.submitJob` should do the same just that the 
submit message is enqueued after the `RunAsync` message.
   
   Could you show me an execution order in which the `submitJob` RPC call is 
executed before the `handleAsync` 
(https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L374)?
   
   Could you reproduce the problem locally?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to