[https://issues.apache.org/jira/browse/FLINK-30596?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17686006#comment-17686006]
Mohsen Rezaei edited comment on FLINK-30596 at 2/8/23 5:42 PM:
---------------------------------------------------------------
I was hoping to bring some attention to this issue, given that it affects any
application that uses the REST endpoints to run/submit jobs and causes confusion
about the actual state of the submitted jobs in a cluster.
I've created a PR against the {{master}} branch, and I'd like to backport it to
{{1.16.2}} and {{1.15.4}} to close the loop on the issue for the three most
recent release lines:
https://github.com/apache/flink/pull/21849
Could I get someone from the Runtime team to take a look at the PR?
> Multiple POST /jars/:jarid/run requests with the same jobId, runs duplicate jobs
> --------------------------------------------------------------------------------
>
> Key: FLINK-30596
> URL: https://issues.apache.org/jira/browse/FLINK-30596
> Project: Flink
> Issue Type: Bug
> Components: Runtime / REST
> Affects Versions: 1.17.0, 1.15.3, 1.16.1
> Reporter: Mohsen Rezaei
> Priority: Critical
> Labels: pull-request-available
> Fix For: 1.17.0, 1.15.4, 1.16.2
>
>
> Analysis from [~trohrmann]:
> {quote}
> The problem is the following: When submitting a job, the {{Dispatcher}}
> will wait for the termination of a previous {{JobMaster}}. This is done to
> enable the proper cleanup of the job resources. In the initial submission
> case, there is no previous {{JobMaster}} with the same {{jobId}}. The problem
> is now that Flink schedules the
> [{{persistAndRunJob}}|https://github.com/apache/flink/blob/5f924bc84227a3a6c67b44e82c45fe444393f577/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L571]
> action, which runs the newly submitted job, as [an asynchronous
> task|https://github.com/apache/flink/blob/5f924bc84227a3a6c67b44e82c45fe444393f577/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L1312-L1318].
> This is done to ensure that the action is run on the {{Dispatcher}}'s main
> thread since the termination future can be run on a different thread. Due to
> this behaviour, there can be other tasks enqueued in the {{Dispatcher}}'s
> work queue which are executed before it. Such a task could be another job
> submission which wouldn't see that there is already a job submitted with the
> same {{jobId}} since [we only do this in
> {{runJob}}|https://github.com/apache/flink/blob/5f924bc84227a3a6c67b44e82c45fe444393f577/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L602]
> which is called by {{persistAndRunJob}}. This is the reason why you don't
> see a duplicate job submission exception for the second job submission. Even
> worse, this will eventually [lead to an invalid
> state|https://github.com/apache/flink/blob/5f924bc84227a3a6c67b44e82c45fe444393f577/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L611-L615]
> and fail the whole cluster entrypoint.
> {quote}
> The following change to the {{Dispatcher}} appears to fix the issue, but before
> submitting a PR, I wanted to post it here for follow-up discussion:
> {code:language=java}
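> private CompletableFuture<Void> waitForTerminatingJob(
>         JobID jobId, JobGraph jobGraph, ThrowingConsumer<JobGraph, ?> action) {
>     ...
>     // If the termination future is already complete (e.g. there is no previous
>     // JobMaster for this jobId), run the action immediately on the calling thread,
>     // which is the Dispatcher's main thread when invoked via submitJob, instead of
>     // enqueuing it behind other tasks. Otherwise, hop back to the main thread once
>     // the previous JobMaster has terminated.
>     return FutureUtils.thenAcceptAsyncIfNotDone(
>             jobManagerTerminationFuture,
>             getMainThreadExecutor(),
>             FunctionUtils.uncheckedConsumer(
>                     (ignored) -> {
>                         jobManagerRunnerTerminationFutures.remove(jobId);
>                         action.accept(jobGraph);
>                     }));
> }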
> {code}
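To make the interleaving from the analysis concrete, here is a minimal, self-contained Java sketch that does not depend on Flink. `ToyDispatcher`, `MainThreadQueue`, and the local `thenAcceptAsyncIfNotDone` helper are hypothetical stand-ins. The sketch assumes, as the analysis describes, that the duplicate check only sees jobs that `runJob` has already registered and that the `Dispatcher`'s main thread processes queued tasks one at a time in FIFO order. The helper is meant to mirror the documented behaviour of Flink's `FutureUtils.thenAcceptAsyncIfNotDone` (use `thenAccept` if the future is done, `thenAcceptAsync` with the executor otherwise); it is not Flink's code.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Consumer;

class DispatcherRaceDemo {

    /** Hypothetical model of the Dispatcher's main thread: queued tasks run one at a time, FIFO. */
    static final class MainThreadQueue implements Executor {
        private final Deque<Runnable> queue = new ArrayDeque<>();

        @Override
        public void execute(Runnable task) {
            queue.addLast(task);
        }

        void drain() {
            Runnable task;
            while ((task = queue.pollFirst()) != null) {
                task.run();
            }
        }
    }

    /** Local stand-in for the "run inline if done, else hop to the executor" idea. */
    static <T> CompletableFuture<Void> thenAcceptAsyncIfNotDone(
            CompletableFuture<T> future, Executor executor, Consumer<? super T> action) {
        return future.isDone() ? future.thenAccept(action) : future.thenAcceptAsync(action, executor);
    }

    /** Hypothetical, heavily simplified dispatcher; only runJob registers a job. */
    static final class ToyDispatcher {
        final MainThreadQueue mainThread = new MainThreadQueue();
        final Set<String> runningJobs = new HashSet<>();
        final List<String> log = new ArrayList<>();
        final boolean applyFix;

        ToyDispatcher(boolean applyFix) {
            this.applyFix = applyFix;
        }

        // Always invoked on the (modelled) main thread.
        void submitJob(String jobId) {
            if (runningJobs.contains(jobId)) {
                log.add("rejected duplicate " + jobId);
                return;
            }
            // No previous JobMaster with this jobId, so its termination future is already complete.
            CompletableFuture<Void> terminationFuture = CompletableFuture.completedFuture(null);
            Consumer<Void> action = ignored -> runJob(jobId);
            if (applyFix) {
                // Future is done, so runJob executes right now, before the next queued task.
                thenAcceptAsyncIfNotDone(terminationFuture, mainThread, action);
            } else {
                // Even though the future is done, runJob is enqueued behind already-queued tasks.
                terminationFuture.thenAcceptAsync(action, mainThread);
            }
        }

        void runJob(String jobId) {
            // Flink ends up in an invalid state here; the toy just records it.
            if (!runningJobs.add(jobId)) {
                log.add("invalid state: " + jobId + " already running");
                return;
            }
            log.add("running " + jobId);
        }
    }

    /** Two submissions with the same jobId reach the main thread back to back. */
    static List<String> simulate(boolean applyFix) {
        ToyDispatcher dispatcher = new ToyDispatcher(applyFix);
        dispatcher.mainThread.execute(() -> dispatcher.submitJob("job-1"));
        dispatcher.mainThread.execute(() -> dispatcher.submitJob("job-1"));
        dispatcher.mainThread.drain();
        return dispatcher.log;
    }

    public static void main(String[] args) {
        System.out.println("before fix: " + simulate(false));
        System.out.println("after fix:  " + simulate(true));
    }
}
```

With `applyFix = false`, both submissions pass the duplicate check before either `runJob` executes, so the second `runJob` finds the job already running. With `applyFix = true`, the first `runJob` runs inline and the second submission is rejected as a duplicate. Running the action inline is only safe because the caller is already on the main thread; while a previous JobMaster is still terminating, the helper falls back to the asynchronous hop onto the main-thread executor.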