[ https://issues.apache.org/jira/browse/FLINK-30596?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17686006#comment-17686006 ]
Mohsen Rezaei commented on FLINK-30596:
---------------------------------------

I was hoping to shed some light on this issue, given that it affects any application that uses the REST endpoints to run/submit jobs and causes confusion about the actual state of the submitted jobs in a cluster.

I've created a PR against the {{master}} branch, and I'd like to backport it to {{1.16.2}} and {{1.15.4}} to close the loop on the issue for the last three major releases:

https://github.com/apache/flink/pull/21849

> Multiple POST /jars/:jarid/run requests with the same jobId, runs duplicate
> jobs
> --------------------------------------------------------------------------------
>
>                 Key: FLINK-30596
>                 URL: https://issues.apache.org/jira/browse/FLINK-30596
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / REST
>    Affects Versions: 1.17.0, 1.15.3, 1.16.1
>            Reporter: Mohsen Rezaei
>            Priority: Critical
>              Labels: pull-request-available
>             Fix For: 1.17.0, 1.15.4, 1.16.2
>
>
> Analysis from [~trohrmann]:
> {quote}
> The problem is the following: When submitting a job, then the {{Dispatcher}}
> will wait for the termination of a previous {{JobMaster}}. This is done to
> enable the proper cleanup of the job resources. In the initial submission
> case, there is no previous {{JobMaster}} with the same {{jobId}}. The problem
> is now that Flink schedules the
> [{{persistAndRunJob}}|https://github.com/apache/flink/blob/5f924bc84227a3a6c67b44e82c45fe444393f577/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L571]
> action, which runs the newly submitted job, as [an asynchronous
> task|https://github.com/apache/flink/blob/5f924bc84227a3a6c67b44e82c45fe444393f577/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L1312-L1318].
> This is done to ensure that the action is run on the {{Dispatcher}}'s main
> thread since the termination future can be run on a different thread.
> Due to this behaviour, there can be other tasks enqueued in the {{Dispatcher}}'s
> work queue which are executed before. Such a task could be another job
> submission which wouldn't see that there is already a job submitted with the
> same {{jobId}} since [we only do this in
> {{runJob}}|https://github.com/apache/flink/blob/5f924bc84227a3a6c67b44e82c45fe444393f577/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L602]
> which is called by {{persistAndRunJob}}. This is the reason why you don't
> see a duplicate job submission exception for the second job submission. Even
> worse, this will eventually [lead to an invalid
> state|https://github.com/apache/flink/blob/5f924bc84227a3a6c67b44e82c45fe444393f577/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L611-L615]
> and fail the whole cluster entrypoint.
> {quote}
> The following change to the {{Dispatcher}} seems to fix the issue, but before
> submitting a PR, I wanted to post this for possible follow-up discussions:
> {code:language=java}
> private CompletableFuture<Void> waitForTerminatingJob(
>         JobID jobId, JobGraph jobGraph, ThrowingConsumer<JobGraph, ?> action) {
>     ...
>     return FutureUtils.thenAcceptAsyncIfNotDone(
>             jobManagerTerminationFuture,
>             getMainThreadExecutor(),
>             FunctionUtils.uncheckedConsumer(
>                     (ignored) -> {
>                         jobManagerRunnerTerminationFutures.remove(jobId);
>                         action.accept(jobGraph);
>                     }));
> }
> {code}
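
The race described in the analysis can be illustrated with a small standalone sketch. It is not Flink code: the class, its fields and its methods are hypothetical stand-ins, and a plain in-memory queue plays the role of the {{Dispatcher}}'s main-thread work queue. The assumption it models is the one from the analysis: a job is only registered once the deferred run action executes, so a second submission that arrives before that point passes the duplicate check.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Queue;
import java.util.Set;

// Hypothetical model of the race; none of these names exist in Flink.
public class DuplicateSubmissionSketch {
    // Stand-in for the Dispatcher's main-thread work queue.
    private final Queue<Runnable> mainThreadQueue = new ArrayDeque<>();
    // Job ids that have actually been registered as running.
    private final Set<String> registeredJobs = new HashSet<>();
    // Ordered record of what happened, used instead of real logging.
    private final List<String> events = new ArrayList<>();

    // Checks for duplicates now, but defers registration to a queued task.
    void submitJob(String jobId) {
        if (registeredJobs.contains(jobId)) {
            events.add("rejected duplicate " + jobId);
            return;
        }
        events.add("accepted " + jobId);
        mainThreadQueue.add(() -> runJob(jobId));
    }

    // Registers the job; a second registration is the invalid state.
    // (This sketch only records it; it does not fail anything.)
    void runJob(String jobId) {
        if (!registeredJobs.add(jobId)) {
            events.add("invalid state: " + jobId + " already registered");
            return;
        }
        events.add("running " + jobId);
    }

    // Runs every queued task in submission order.
    void drainQueue() {
        Runnable task;
        while ((task = mainThreadQueue.poll()) != null) {
            task.run();
        }
    }

    // Two submissions before the queue drains, one after.
    public static List<String> simulate() {
        DuplicateSubmissionSketch dispatcher = new DuplicateSubmissionSketch();
        dispatcher.submitJob("job-1");
        dispatcher.submitJob("job-1"); // passes the check: nothing registered yet
        dispatcher.drainQueue();
        dispatcher.submitJob("job-1"); // rejected: the job is registered now
        return dispatcher.events;
    }

    public static void main(String[] args) {
        simulate().forEach(System.out::println);
    }
}
```

In this model both early submissions are accepted and the second deferred run hits the already-registered check, while a submission made after the queue has drained is rejected as a duplicate. That matches the symptom in the report: no duplicate job submission exception for the second request, followed by an invalid state later on.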