tillrohrmann commented on a change in pull request #13540:
URL: https://github.com/apache/flink/pull/13540#discussion_r499630797
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerRunner.java
##########
@@ -39,6 +39,8 @@
private final CompletableFuture<ArchivedExecutionGraph> resultFuture;
+ private final CompletableFuture<Void> closeAsyncCalledFuture = new CompletableFuture<>();
Review comment:
I would suggest using a `OneShotLatch`.
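To illustrate the suggestion, here is a minimal sketch of signalling "closeAsync was called" with a one-shot latch instead of a `CompletableFuture<Void>`. It is an assumption-laden stand-in, not the actual change: `SimpleOneShotLatch` is a hypothetical local class that only mimics the `trigger()` / `await()` / `isTriggered()` shape of Flink's `OneShotLatch`, and `RunnerSketch` and `getCloseAsyncCalledLatch()` are made-up names that model nothing of `TestingJobManagerRunner` beyond the close signal.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;

public class CloseLatchSketch {

    /** Hypothetical stand-in for a one-shot latch (not Flink's class). */
    static final class SimpleOneShotLatch {
        private final CountDownLatch latch = new CountDownLatch(1);

        void trigger() {
            latch.countDown();
        }

        void await() throws InterruptedException {
            latch.await();
        }

        boolean isTriggered() {
            return latch.getCount() == 0;
        }
    }

    /** Hypothetical runner: only the "closeAsync was called" signal is modelled. */
    static final class RunnerSketch {
        private final SimpleOneShotLatch closeAsyncCalledLatch = new SimpleOneShotLatch();

        CompletableFuture<Void> closeAsync() {
            // Signal the test that termination has started.
            closeAsyncCalledLatch.trigger();
            return CompletableFuture.completedFuture(null);
        }

        SimpleOneShotLatch getCloseAsyncCalledLatch() {
            return closeAsyncCalledLatch;
        }
    }

    /** Calls closeAsync on another thread and blocks until the latch fires. */
    static boolean runDemo() {
        RunnerSketch runner = new RunnerSketch();
        Thread closer = new Thread(runner::closeAsync);
        closer.start();
        try {
            runner.getCloseAsyncCalledLatch().await();
            closer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return runner.getCloseAsyncCalledLatch().isTriggered();
    }

    public static void main(String[] args) {
        System.out.println("closeAsync observed: " + runDemo());
    }
}
```

In a test, the wait would then read `runner.getCloseAsyncCalledLatch().await()` rather than `getCloseAsyncCalledFuture().get()`, which states the intent (a one-time signal with no payload) more directly.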
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
##########
@@ -350,6 +350,10 @@ public void testJobSubmissionUnderSameJobId() throws Exception {
final TestingJobManagerRunner testingJobManagerRunner = jobManagerRunnerFactory.takeCreatedJobManagerRunner();
testingJobManagerRunner.completeResultFutureExceptionally(new JobNotFinishedException(jobId));
+ // wait until termination JobManagerRunner closeAsync has been called.
+ // this is necessary to avoid race conditions with completion of the 1st job and the submission of the 2nd job (DuplicateJobSubmissionException).
+ testingJobManagerRunner.getCloseAsyncCalledFuture().get();
+
Review comment:
I agree that it is not safe to assume that the testing main thread will
enqueue the `handleAsync` payload before the `submitJob` call. Hence, the fix
should be fine. Still, I would like to understand what exactly is going wrong
here.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
##########
@@ -350,6 +350,10 @@ public void testJobSubmissionUnderSameJobId() throws Exception {
final TestingJobManagerRunner testingJobManagerRunner = jobManagerRunnerFactory.takeCreatedJobManagerRunner();
testingJobManagerRunner.completeResultFutureExceptionally(new JobNotFinishedException(jobId));
+ // wait until termination JobManagerRunner closeAsync has been called.
+ // this is necessary to avoid race conditions with completion of the 1st job and the submission of the 2nd job (DuplicateJobSubmissionException).
+ testingJobManagerRunner.getCloseAsyncCalledFuture().get();
+
Review comment:
I don't fully understand why this additional synchronization step is
necessary. If I am not mistaken,
`testingJobManagerRunner.completeResultFutureExceptionally` won't trigger
`Dispatcher.jobNotFinished` directly, but it will at least enqueue the
`RunAsync` message, which puts this task into the mailbox of the
`Dispatcher`. `dispatcherGateway.submitJob` should do the same, except that
the submit message is enqueued after the `RunAsync` message.
Could you show me an execution order in which the `submitJob` RPC call is
executed before the `handleAsync`
(https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L374)?
Could you reproduce the problem locally?
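The ordering argument above can be sketched in plain Java. This is only an illustration under stated assumptions, not a model of the real `Dispatcher`: a single-threaded executor stands in for the RPC mailbox, the labels `"jobNotFinished"` and `"submitJob"` are placeholders, and `MailboxOrderingSketch` / `runOnce()` are hypothetical names. It relies on the common JDK behavior that completing a `CompletableFuture` hands an already registered `handleAsync` callback to its executor from the completing thread.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class MailboxOrderingSketch {

    /** Returns the order in which the two "messages" ran on the mailbox thread. */
    static List<String> runOnce() {
        // Single-threaded FIFO executor as a stand-in for the Dispatcher mailbox.
        ExecutorService mailbox = Executors.newSingleThreadExecutor();
        List<String> order = new CopyOnWriteArrayList<>();
        try {
            CompletableFuture<Void> resultFuture = new CompletableFuture<>();

            // Callback registered before completion, to be run in the mailbox.
            resultFuture.handleAsync(
                    (ignored, failure) -> order.add("jobNotFinished"), mailbox);

            // The same (test) thread first completes the future, which
            // enqueues the callback, and then enqueues the submit message.
            resultFuture.completeExceptionally(new RuntimeException("job not finished"));
            mailbox.execute(() -> order.add("submitJob"));
        } finally {
            mailbox.shutdown();
            try {
                mailbox.awaitTermination(10, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return order;
    }

    public static void main(String[] args) {
        System.out.println(runOnce());
    }
}
```

Under these assumptions the callback is always queued ahead of the submit message. If the real code has an extra asynchronous hop between completing the result future and enqueueing into the mailbox, this sketch no longer applies, and that would be one place to look for the race.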