Aleksandr Iushmanov created FLINK-32069:
-------------------------------------------
Summary: jobClient.getJobStatus() can return status RUNNING for
finished insert operation
Key: FLINK-32069
URL: https://issues.apache.org/jira/browse/FLINK-32069
Project: Flink
Issue Type: Bug
Components: Runtime / Coordination
Affects Versions: 1.15.4, 1.16.1
Reporter: Aleksandr Iushmanov
Using Zeppelin with a remote cluster, I came across a race condition that leads to failed expectations for SQL insert operations.
Below is an example of Zeppelin code that fails because jobClient.getJobStatus() returns RUNNING even after the job has finished. I have
verified that the same failure can happen if I use jobClient.getJobExecutionResult().get(): the job execution result is "Program
execution finished", but the job status is not consistently FINISHED.
{code:java}
TableResult tableResult = ((TableEnvironmentInternal) tbenv).executeInternal(operations);
checkState(tableResult.getJobClient().isPresent());
try {
  tableResult.await();
  JobClient jobClient = tableResult.getJobClient().get();
  if (jobClient.getJobStatus().get() == JobStatus.FINISHED) {
    context.out.write("Insertion successfully.\n");
  } else {
    throw new IOException("Job is failed, " + jobClient.getJobExecutionResult().get().toString());
  }
} catch (InterruptedException e) {
  throw new IOException("Flink job is interrupted", e);
} catch (ExecutionException e) {
  throw new IOException("Flink job is failed", e);
} {code}
Related Flink change: [https://github.com/apache/flink/pull/18189]
Zeppelin code (the snippet above):
[https://github.com/apache/zeppelin/blob/master/flink/flink1.15-shims/src/main/java/org/apache/zeppelin/flink/Flink115SqlInterpreter.java#L384]
I suspect that the job status is returned based on the runningJobsRegistry, and since
1.15 this registry is no longer updated with the FINISHED status prior to completion of the job
result future; see this change in {{JobMasterServiceLeadershipRunner.java}}:
[https://github.com/apache/flink/pull/18189/files#diff-3eb433f18b85c0f5329a4b312a219583189d777fe9bdd547f1114f4a22989f8bL387]
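To spell out the ordering I observe on the client side (a minimal sketch; the StatusProbe class is mine, and the comments describe the observed, not guaranteed, behaviour):
{code:java}
import java.util.concurrent.CompletableFuture;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.table.api.TableResult;

public final class StatusProbe {

    // Probe the ordering of the two client-side signals for a finished insert.
    public static void probe(TableResult tableResult) throws Exception {
        JobClient jobClient = tableResult.getJobClient().orElseThrow(IllegalStateException::new);

        // 1) The execution-result future completes ("Program execution finished").
        CompletableFuture<?> resultFuture = jobClient.getJobExecutionResult();
        resultFuture.get();

        // 2) Queried immediately afterwards, the status can still come back as RUNNING,
        //    because (my understanding) it is derived from the runningJobsRegistry, which
        //    since 1.15 is only updated after the result future has completed.
        JobStatus status = jobClient.getJobStatus().get();
        System.out.println("Status right after the result future completed: " + status);
    }

    private StatusProbe() {}
}
{code}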
It looks like a race condition that is hard to reproduce on a lightweight setup.
I reproduced it by running a Zeppelin notebook against a remote Flink cluster and
triggering a SQL insert operation. If I find a way to reproduce it on a small local
cluster with a lightweight client (a sketch of what I am trying is below), I will update
this ticket. I am open to suggestions on how to fix this.
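The kind of minimal reproduction I am trying (only a sketch; the table names and connector options are placeholders I chose, the datagen/blackhole connectors ship with Flink, and on a local setup the timing window may simply never open):
{code:java}
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;

public class InsertStatusRepro {
    public static void main(String[] args) throws Exception {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Bounded source and a sink that discards everything, so the insert job finishes quickly.
        tEnv.executeSql(
                "CREATE TABLE src (x INT) WITH ('connector' = 'datagen', 'number-of-rows' = '10')");
        tEnv.executeSql(
                "CREATE TABLE snk (x INT) WITH ('connector' = 'blackhole')");

        TableResult result = tEnv.executeSql("INSERT INTO snk SELECT x FROM src");
        result.await(); // returns once the job result future has completed

        JobClient jobClient = result.getJobClient().orElseThrow(IllegalStateException::new);
        JobStatus status = jobClient.getJobStatus().get();
        // Expectation: FINISHED. Observed (rarely, mostly against a remote cluster): RUNNING.
        System.out.println("Status after await(): " + status);
    }
}
{code}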
For Zeppelin I have created a separate ticket, because Flink 1.15 will not receive a fix;
but if I understand correctly, this issue is common to all versions starting with 1.15,
so it makes sense to address it starting with 1.16:
https://issues.apache.org/jira/browse/ZEPPELIN-5909
[~mapohl], thank you for the assistance in Slack. I have created this ticket to
back our conversation; could you please add your thoughts on this failure mode?
One possible solution would be an additional check for the presence of a JobResult in the
result store before returning the job status: if a result exists, the job should not be
reported as RUNNING (based on this documentation:
https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/table/api/TableResult.html#await--)
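To illustrate the idea (on the client side only; the real fix would live on the dispatcher/runner side, so this is just a hedged sketch of the invariant "a present job result overrides RUNNING"):
{code:java}
import java.util.concurrent.CompletableFuture;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.core.execution.JobClient;

public final class EffectiveStatus {

    /**
     * Sketch of the invariant described above: if the job result is already
     * available, the job must not be reported as RUNNING anymore, regardless
     * of what the plain status query returns.
     */
    public static JobStatus effectiveStatus(JobClient jobClient) throws Exception {
        CompletableFuture<?> resultFuture = jobClient.getJobExecutionResult();
        if (resultFuture.isDone() && !resultFuture.isCompletedExceptionally()) {
            return JobStatus.FINISHED; // a result is present => the job cannot still be running
        }
        return jobClient.getJobStatus().get();
    }

    private EffectiveStatus() {}
}
{code}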