[
https://issues.apache.org/jira/browse/FLINK-32069?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Aleksandr Iushmanov updated FLINK-32069:
----------------------------------------
Description:
Using Zeppelin with a remote cluster, I came across a race condition that
breaks expectations for SQL insert operations.
Below is an example of Zeppelin code that fails because
jobClient.getJobStatus() returns RUNNING even after the job has finished. I have
verified that the same failure can happen if I use
jobClient.getJobExecutionResult().get() (the job execution result is "Program
execution finished", but the job status is not consistently FINISHED).
{code:java}
TableResult tableResult =
    ((TableEnvironmentInternal) tbenv).executeInternal(operations);
checkState(tableResult.getJobClient().isPresent());
try {
  tableResult.await();
  JobClient jobClient = tableResult.getJobClient().get();
  if (jobClient.getJobStatus().get() == JobStatus.FINISHED) {
    context.out.write("Insertion successfully.\n");
  } else {
    throw new IOException("Job is failed, " +
        jobClient.getJobExecutionResult().get().toString());
  }
} catch (InterruptedException e) {
  throw new IOException("Flink job is interrupted", e);
} catch (ExecutionException e) {
  throw new IOException("Flink job is failed", e);
}
{code}
Zeppelin code:
[https://github.com/apache/zeppelin/blob/master/flink/flink1.15-shims/src/main/java/org/apache/zeppelin/flink/Flink115SqlInterpreter.java#L384]
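Until the status is consistent, a caller could tolerate the window by polling until a terminal status is reported instead of checking once. The following is a minimal sketch under assumptions, not Zeppelin or Flink code: {{TerminalStatusPoller}}, {{JobState}} and {{awaitTerminal}} are hypothetical names, and {{JobState}} is a simplified stand-in for Flink's {{JobStatus}}.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

/** Hypothetical helper: polls a status supplier until a terminal state or a timeout. */
final class TerminalStatusPoller {

    /** Simplified stand-in for Flink's JobStatus enum (assumption, not the real type). */
    enum JobState {
        RUNNING, FINISHED, FAILED, CANCELED;

        boolean isTerminal() {
            return this != RUNNING;
        }
    }

    /**
     * Returns the first terminal state reported by the supplier, or the last
     * observed state if the timeout elapses first.
     */
    static JobState awaitTerminal(
            Supplier<CompletableFuture<JobState>> statusSupplier,
            Duration timeout,
            Duration pollInterval) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            // join() rethrows failures of the status request as unchecked exceptions.
            JobState state = statusSupplier.get().join();
            if (state.isTerminal() || System.nanoTime() - deadline >= 0) {
                return state;
            }
            try {
                Thread.sleep(pollInterval.toMillis());
            } catch (InterruptedException e) {
                // Preserve the interrupt flag for the caller before giving up.
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while polling job status", e);
            }
        }
    }
}
```

With a real client, the supplier would presumably be {{jobClient::getJobStatus}}, with the stand-in enum replaced by {{JobStatus}}. This only hides the inconsistency on the caller side and does not address the cause.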
I suspect that the job status is derived from runningJobsRegistry, and that
since 1.15 this registry is no longer updated with the FINISHED status before
the job result future completes. See this change to
{{JobMasterServiceLeadershipRunner.java}}:
[https://github.com/apache/flink/pull/18189/files#diff-3eb433f18b85c0f5329a4b312a219583189d777fe9bdd547f1114f4a22989f8bL387]
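The suspected ordering can be modeled in isolation. The sketch below is a toy model only, not Flink's implementation: {{StatusRaceModel}} and its members are hypothetical, and it simply assumes that the result future is completed before the status registry is updated.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

/** Toy model of a job whose result completes before its registry status is updated. */
final class StatusRaceModel {

    final CompletableFuture<String> resultFuture = new CompletableFuture<>();
    final AtomicReference<String> registryStatus = new AtomicReference<>("RUNNING");

    /** Completes the result first and updates the registry second (the suspected order). */
    void finishJob() {
        resultFuture.complete("Program execution finished");
        registryStatus.set("FINISHED");
    }

    /** Returns the registry status a client reads at the moment the result becomes available. */
    static String statusSeenOnCompletion() {
        StatusRaceModel job = new StatusRaceModel();
        // The callback is registered before completion, so it runs inside complete(),
        // that is, before the registry is updated.
        CompletableFuture<String> observed =
                job.resultFuture.thenApply(result -> job.registryStatus.get());
        job.finishJob();
        return observed.join();
    }
}
```

In this model a client that reacts to the completed result still reads the stale status, which matches the behaviour described above.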
This looks like a race condition that is hard to reproduce on a lightweight
setup. I reproduced it by running a Zeppelin notebook against a remote Flink
cluster and triggering a SQL insert operation. If I find a smaller setup that
reproduces it on a small local cluster with a lightweight client, I will update
this ticket. I am open to suggestions on how to fix this.
For Zeppelin I have opened a separate ticket
(https://issues.apache.org/jira/browse/ZEPPELIN-5909), because Flink 1.15 is
not going to be fixed. If I understand correctly, however, this issue is common
to all versions starting with 1.15, so it makes sense to address it starting
with 1.16.
[~mapohl], thank you for your assistance on Slack. I have created this ticket
to back our conversation; could you please add your thoughts on this failure
mode?
One possible solution would be to add a check for the presence of a JobResult
in the result store before returning the job status: if there is a result, the
job shouldn't be reported as running, based on this documentation:
[https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/table/api/TableResult.html#await--]
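The proposed check could look roughly like the following. This is a minimal sketch under stated assumptions and does not use Flink's actual classes: {{StatusLookupSketch}}, its two maps and its method names are hypothetical stand-ins for the result store and the running-jobs registry.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical status lookup that prefers a stored result over the running-jobs view. */
final class StatusLookupSketch {

    /** Simplified stand-in for a job status enum (assumption). */
    enum JobState { RUNNING, FINISHED, FAILED }

    // Stand-ins for the result store and the registry of running jobs.
    private final Map<String, JobState> resultStore = new ConcurrentHashMap<>();
    private final Map<String, JobState> runningJobs = new ConcurrentHashMap<>();

    void registerRunning(String jobId) {
        runningJobs.put(jobId, JobState.RUNNING);
    }

    void storeResult(String jobId, JobState terminalState) {
        resultStore.put(jobId, terminalState);
    }

    /** A stored result wins, so a job with a result is never reported as RUNNING. */
    Optional<JobState> getJobStatus(String jobId) {
        JobState stored = resultStore.get(jobId);
        if (stored != null) {
            return Optional.of(stored);
        }
        return Optional.ofNullable(runningJobs.get(jobId));
    }
}
```

The point of the ordering is that the registry may lag behind, while the presence of a result is treated as authoritative.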
> jobClient.getJobStatus() can return status RUNNING for finished insert
> operation
> --------------------------------------------------------------------------------
>
> Key: FLINK-32069
> URL: https://issues.apache.org/jira/browse/FLINK-32069
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Coordination
> Affects Versions: 1.16.1, 1.15.4
> Reporter: Aleksandr Iushmanov
> Priority: Major
>