[
https://issues.apache.org/jira/browse/FLINK-32069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17722691#comment-17722691
]
Matthias Pohl commented on FLINK-32069:
---------------------------------------
Thanks for documenting this issue in Jira, [~izeren]. Here are my findings so
far:
I struggle to find a connection between the {{RunningJobsRegistry}} and the
{{getJobStatus}} call of the client (which ends up calling
{{Dispatcher.requestJobStatus}}). [~izeren] is right in claiming
that we slightly modified the code when removing the
{{RunningJobsRegistry}} in
[JobMasterServiceLeadershipRunner:385ff|https://github.com/apache/flink/commit/01b14fc4b9a9487a144f515bb7d4f6ad14cbe013#diff-3eb433f18b85c0f5329a4b312a219583189d777fe9bdd547f1114f4a22989f8bL385-L395].
Previously, the job was marked as done in the {{RunningJobsRegistry}} before
{{JobMasterServiceLeadershipRunner#resultFuture}} was completed. In the current
code, the job is marked as completed in the {{JobResultStore}} only after
{{JobMasterServiceLeadershipRunner#resultFuture}} has been completed.
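The ordering change can be illustrated with a toy model. The sketch below is not Flink code: {{ToyRunner}}, its {{status}} field and {{finish(boolean)}} are hypothetical stand-ins for "the place the job is marked as done" and "the result future". It only shows why the order of the two steps matters for anyone who reacts to the future's completion; it makes no claim that {{Dispatcher#requestJobStatus}} follows this path, which is exactly what is in question here.

```java
import java.util.concurrent.CompletableFuture;

public class OrderingSketch {

    // Hypothetical stand-in for a runner exposing a job status and a result future (not Flink code).
    static final class ToyRunner {
        volatile String status = "RUNNING";
        final CompletableFuture<String> resultFuture = new CompletableFuture<>();

        // markDoneFirst == true models the pre-1.15 order: status updated, then future completed.
        // markDoneFirst == false models the reverse order: future completed, then status updated.
        void finish(boolean markDoneFirst) {
            if (markDoneFirst) {
                status = "FINISHED";
                resultFuture.complete("result");
            } else {
                resultFuture.complete("result");
                status = "FINISHED";
            }
        }
    }

    // Returns the status a listener observes at the moment the result future completes.
    static String statusSeenOnCompletion(boolean markDoneFirst) {
        ToyRunner runner = new ToyRunner();
        // Registered before completion, so the callback runs synchronously inside complete().
        CompletableFuture<String> seen = runner.resultFuture.thenApply(r -> runner.status);
        runner.finish(markDoneFirst);
        return seen.join();
    }

    public static void main(String[] args) {
        System.out.println("mark done first:       " + statusSeenOnCompletion(true));   // FINISHED
        System.out.println("complete future first: " + statusSeenOnCompletion(false));  // RUNNING
    }
}
```

In the second ordering, a listener woken by the completed future still reads RUNNING, which is the symptom reported in this ticket. Whether the client's status query is actually exposed to this window is the open question.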
My issue, though, is that we don't rely on the {{RunningJobsRegistry}} in
any way for the {{Dispatcher#requestJob}} call. The {{RunningJobsRegistry}} was
only used for leader recovery in
[JobMasterServiceLeadershipRunner.verifyJobSchedulingStatusAndCreateJobMasterServiceProcess:272ff|https://github.com/apache/flink/commit/01b14fc4b9a9487a144f515bb7d4f6ad14cbe013#diff-3eb433f18b85c0f5329a4b312a219583189d777fe9bdd547f1114f4a22989f8bL272-L278]
and, when submitting a job, through
[Dispatcher#isInGloballyTerminalState:375ff|https://github.com/apache/flink/pull/18189/files#diff-a4b690fb2c4975d25b05eb4161617af0d704a85ff7b1cad19d3c817c12f1e29cL375]
in {{Dispatcher#submitJob}}. If the registry had had an influence on
{{Dispatcher#requestJobStatus}}, I would have expected to find
{{Dispatcher#requestJobStatus}} somewhere in the call hierarchy of
{{RunningJobsRegistry#getJobSchedulingStatus}}.
I don't want to say yet that [~izeren]'s conclusion is wrong. It just doesn't
match my findings in the code. It could well be that I'm missing a code path
here.
[~dmvk], do you have something to add?
> jobClient.getJobStatus() can return status RUNNING for finished insert
> operation
> --------------------------------------------------------------------------------
>
> Key: FLINK-32069
> URL: https://issues.apache.org/jira/browse/FLINK-32069
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Coordination
> Affects Versions: 1.16.1, 1.15.4
> Reporter: Aleksandr Iushmanov
> Priority: Major
>
> Using Zeppelin with a remote cluster, I came across a race condition that
> leads to failed expectations for SQL insert operations.
>
> Below is an example of Zeppelin code that fails because
> jobClient.getJobStatus() returns RUNNING even after the job has finished. I
> have verified that the same failure can happen if I use
> jobClient.getJobExecutionResult().get() (the job execution result is "Program
> execution finished", but the job status is not consistently FINISHED).
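> {code:java}
> import java.io.IOException;
> import java.util.concurrent.ExecutionException;
>
> import org.apache.flink.api.common.JobStatus;
> import org.apache.flink.core.execution.JobClient;
> import org.apache.flink.table.api.TableResult;
> import org.apache.flink.table.api.internal.TableEnvironmentInternal;
>
> import static org.apache.flink.util.Preconditions.checkState;
>
> // tbenv, operations and context are provided by the surrounding Zeppelin interpreter.
> TableResult tableResult = ((TableEnvironmentInternal) tbenv).executeInternal(operations);
> checkState(tableResult.getJobClient().isPresent());
> try {
>     // For an insert operation, await() blocks until the job has finished.
>     tableResult.await();
>     JobClient jobClient = tableResult.getJobClient().get();
>     // Intermittently, the reported status is still RUNNING at this point.
>     if (jobClient.getJobStatus().get() == JobStatus.FINISHED) {
>         context.out.write("Insertion successfully.\n");
>     } else {
>         throw new IOException("Job is failed, " +
>             jobClient.getJobExecutionResult().get().toString());
>     }
> } catch (InterruptedException e) {
>     throw new IOException("Flink job is interrupted", e);
> } catch (ExecutionException e) {
>     throw new IOException("Flink job is failed", e);
> }
> {code}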
> Zeppelin code:
> [https://github.com/apache/zeppelin/blob/master/flink/flink1.15-shims/src/main/java/org/apache/zeppelin/flink/Flink115SqlInterpreter.java#L384]
> I suspect that the job status is derived from the runningJobsRegistry and
> that, since 1.15, this registry is no longer updated with the FINISHED status
> before the job result future completes; see this change in
> {{JobMasterServiceLeadershipRunner.java}}:
> [https://github.com/apache/flink/pull/18189/files#diff-3eb433f18b85c0f5329a4b312a219583189d777fe9bdd547f1114f4a22989f8bL387]
>
>
> It looks like a race condition that is hard to reproduce on a lightweight
> setup. I reproduced it by running a Zeppelin notebook against a remote Flink
> cluster and triggering an SQL insert operation. If I find a smaller setup
> that reproduces it on a small local cluster with a lightweight client, I will
> update this ticket. I am open to suggestions on how to fix this.
>
> For Zeppelin, I have opened a separate ticket because Flink 1.15 is not going
> to receive a fix. If I understand it correctly, though, this issue should
> affect all versions starting with 1.15, so it makes sense to address it
> starting with 1.16. https://issues.apache.org/jira/browse/ZEPPELIN-5909
>
> [~mapohl], thank you for your assistance on Slack. I have created this ticket
> to back up our conversation. Could you please add your thoughts on this
> failure mode?
>
> One possible solution would be an additional check for the presence of a
> JobResult in the result store before returning the job status (if there is a
> result, the job shouldn't be reported as running, according to this documentation:
> [https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/table/api/TableResult.html#await--])
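The proposed lookup order can be sketched as follows. This is a hypothetical model, not Flink's {{Dispatcher}}: {{ResultStoreFirstSketch}}, {{ToyJobStatus}}, and the two maps standing in for the result store and the runner's live status are all assumptions made for illustration. The only idea taken from the proposal is "consult the stored result first, then fall back to the live status".

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

public class ResultStoreFirstSketch {

    // Hypothetical subset of job states; not Flink's JobStatus enum.
    enum ToyJobStatus { RUNNING, FINISHED, FAILED, CANCELED }

    // Stand-in for a job result store: job id -> terminal status of the stored result.
    private final Map<String, ToyJobStatus> resultStore = new ConcurrentHashMap<>();
    // Stand-in for the status currently reported by the job's runner.
    private final Map<String, ToyJobStatus> runnerStatus = new ConcurrentHashMap<>();

    void reportRunnerStatus(String jobId, ToyJobStatus status) {
        runnerStatus.put(jobId, status);
    }

    void storeResult(String jobId, ToyJobStatus terminalStatus) {
        resultStore.put(jobId, terminalStatus);
    }

    // Proposed lookup order: a stored result wins over whatever the runner reports,
    // so once a result has been stored, later lookups never return RUNNING.
    Optional<ToyJobStatus> requestJobStatus(String jobId) {
        ToyJobStatus stored = resultStore.get(jobId);
        if (stored != null) {
            return Optional.of(stored);
        }
        return Optional.ofNullable(runnerStatus.get(jobId));
    }

    public static void main(String[] args) {
        ResultStoreFirstSketch lookup = new ResultStoreFirstSketch();
        lookup.reportRunnerStatus("job-1", ToyJobStatus.RUNNING);
        // The result has been written, but the runner has not caught up yet.
        lookup.storeResult("job-1", ToyJobStatus.FINISHED);
        System.out.println(lookup.requestJobStatus("job-1").get()); // FINISHED
    }
}
```

This only helps if the result is stored before the client is told that the job has finished; a query that runs before the result is written still falls back to the live status.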
--
This message was sent by Atlassian Jira
(v8.20.10#820010)