Aleksandr Iushmanov created FLINK-32069:
-------------------------------------------

             Summary: jobClient.getJobStatus() can return status RUNNING for 
finished insert operation
                 Key: FLINK-32069
                 URL: https://issues.apache.org/jira/browse/FLINK-32069
             Project: Flink
          Issue Type: Bug
          Components: Runtime / Coordination
    Affects Versions: 1.15.4, 1.16.1
            Reporter: Aleksandr Iushmanov


Using Zeppelin with a remote cluster, I came across a race condition that 
breaks expectations for SQL insert operations. 
 
Below is an example of Zeppelin code that fails because 
jobClient.getJobStatus() returns RUNNING even after the job has finished. I have 
verified that the same failure can happen if I use 
jobClient.getJobExecutionResult().get() (the job execution result is "Program 
execution finished", but the job status is not consistently FINISHED).
{code:java}
TableResult tableResult = ((TableEnvironmentInternal) tbenv).executeInternal(operations);
checkState(tableResult.getJobClient().isPresent());
try {
  tableResult.await();
  JobClient jobClient = tableResult.getJobClient().get();
  // await() has returned, yet getJobStatus() can still report RUNNING here.
  if (jobClient.getJobStatus().get() == JobStatus.FINISHED) {
    context.out.write("Insertion successfully.\n");
  } else {
    throw new IOException("Job is failed, " + jobClient.getJobExecutionResult().get().toString());
  }
} catch (InterruptedException e) {
  throw new IOException("Flink job is interrupted", e);
} catch (ExecutionException e) {
  throw new IOException("Flink job is failed", e);
}
{code}
 
Flink change: [https://github.com/apache/flink/pull/18189]
Zeppelin code: 
[https://github.com/apache/zeppelin/blob/master/flink/flink1.15-shims/src/main/java/org/apache/zeppelin/flink/Flink115SqlInterpreter.java#L384]
I suspect that the job status is returned based on runningJobsRegistry, and that 
since 1.15 this registry is no longer updated with the FINISHED status before the 
job result future completes. See this change in {{JobMasterServiceLeadershipRunner.java}}: 
[https://github.com/apache/flink/pull/18189/files#diff-3eb433f18b85c0f5329a4b312a219583189d777fe9bdd547f1114f4a22989f8bL387]
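
The suspected ordering can be illustrated with a toy model. This is only a sketch built on plain java.util.concurrent, not Flink code: ToyJob, registryStatus, and finish() are hypothetical names, and the assumption is simply that the result future completes before the status registry is updated. A caller that reacts to the result future can then still read RUNNING.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

// Toy model of the suspected race; none of these types are Flink classes.
class StatusRaceSketch {
    enum Status { RUNNING, FINISHED }

    static final class ToyJob {
        // Hypothetical stand-in for the status kept in a running-jobs registry.
        final AtomicReference<Status> registryStatus = new AtomicReference<>(Status.RUNNING);
        // Hypothetical stand-in for the job result future the client waits on.
        final CompletableFuture<String> result = new CompletableFuture<>();

        void finish() {
            // Assumed ordering: the result future completes first...
            result.complete("Program execution finished");
            // ...and the registry is only updated afterwards.
            registryStatus.set(Status.FINISHED);
        }
    }

    // Returns the status a client observes at the moment the result completes.
    static Status statusSeenWhenResultCompletes() {
        ToyJob job = new ToyJob();
        AtomicReference<Status> seen = new AtomicReference<>();
        // The callback runs in the completing thread, inside result.complete(...),
        // so it reads the registry before finish() has updated it.
        job.result.thenRun(() -> seen.set(job.registryStatus.get()));
        job.finish();
        return seen.get();
    }

    public static void main(String[] args) {
        System.out.println("status seen at result completion: "
                + statusSeenWhenResultCompletes());
    }
}
```

In the real cluster the window is a matter of timing between threads and RPC calls, which would explain why the failure is intermittent; the toy model makes the window deterministic only to show the ordering.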
 
 
It looks like a race condition that is hard to reproduce on a lightweight setup. 
I reproduced it by running a Zeppelin notebook against a remote Flink cluster and 
triggering an SQL insert operation. If I find a smaller setup that reproduces it 
on a small local cluster with a lightweight client, I will update this ticket. 
I am open to suggestions on how to fix this. 
 
For Zeppelin I have a separate ticket, because Flink 1.15 is not going to be 
fixed. If I understand it correctly, though, this issue is common to all 
versions starting with 1.15, so it makes sense to address it starting with 1.16. 
https://issues.apache.org/jira/browse/ZEPPELIN-5909
 
[~mapohl], thank you for your assistance in Slack. I have created this ticket to 
back our conversation; could you please add your thoughts on this failure mode?
 
One possible solution would be an additional check for the presence of a 
JobResult in the result store before returning the job status (if there is a 
result, the job shouldn't be reported as running, based on this documentation: 
https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/table/api/TableResult.html#await--).
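
The idea above could look roughly like the following. It is a sketch under stated assumptions, not a patch against Flink: the two maps are hypothetical stand-ins for the running-jobs registry and the result store, and markRunning, storeResult, and getJobStatus are illustrative names rather than Flink APIs. The only point is the lookup order: a stored result wins over a stale RUNNING entry.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Sketch of "check the result store first"; all names here are hypothetical.
class ResultStoreCheckSketch {
    enum Status { RUNNING, FINISHED, FAILED }

    // Stand-in for the registry that may lag behind the real job state.
    private final Map<String, Status> registry = new ConcurrentHashMap<>();
    // Stand-in for the result store, written when a job result is available.
    private final Map<String, Status> resultStore = new ConcurrentHashMap<>();

    void markRunning(String jobId) {
        registry.put(jobId, Status.RUNNING);
    }

    void storeResult(String jobId, Status terminalStatus) {
        resultStore.put(jobId, terminalStatus);
    }

    Optional<Status> getJobStatus(String jobId) {
        // If a result exists, report its terminal status regardless of the registry.
        Status stored = resultStore.get(jobId);
        if (stored != null) {
            return Optional.of(stored);
        }
        // Otherwise fall back to whatever the registry currently says.
        return Optional.ofNullable(registry.get(jobId));
    }

    public static void main(String[] args) {
        ResultStoreCheckSketch sketch = new ResultStoreCheckSketch();
        sketch.markRunning("job-1");
        sketch.storeResult("job-1", Status.FINISHED); // registry still says RUNNING
        System.out.println(sketch.getJobStatus("job-1").orElseThrow());
    }
}
```

With this ordering, a client that has already received the job result would not see RUNNING afterwards, at the cost of one extra lookup per status request.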



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
