[ 
https://issues.apache.org/jira/browse/FLINK-32069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17722692#comment-17722692
 ] 

Matthias Pohl commented on FLINK-32069:
---------------------------------------

I already mentioned it in Slack and will add the comment for completeness here:

Could it be also related to the {{ResultProvider}} that is used within the 
{{{}TableResult{}}}? That {{ResultProvider}} seems to depend on the operations 
that are processed. It is used within the {{TableResult.await()}} that calls 
[TableResultImpl.awaitInternal(..)|https://github.com/apache/flink/blob/c8e6a6f1dc7b7f61e578aa799ee96d383479a9e4/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableResultImpl.java#L93].
 The latter one waits for {{ResultProvider.isFirstRowReady}} to return true in 
[TableResultImpl:105|https://github.com/apache/flink/blob/c8e6a6f1dc7b7f61e578aa799ee96d383479a9e4/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableResultImpl.java#L105].
 I see three {{ResultProvider}} implementations:
 * {{CollectResultProvider}} waits for all the data to be collected before 
returning {{true}}
 * {{StaticResultProvider}} returns {{true}} right away
 * But {{InsertResultProvider}} returns {{true}} after the first data is 
processed (which would, in theory, allow the job to still process data)

{{InsertResultProvider}} was also added in 1.15.

> jobClient.getJobStatus() can return status RUNNING for finished insert 
> operation
> --------------------------------------------------------------------------------
>
>                 Key: FLINK-32069
>                 URL: https://issues.apache.org/jira/browse/FLINK-32069
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Coordination
>    Affects Versions: 1.16.1, 1.15.4
>            Reporter: Aleksandr Iushmanov
>            Priority: Major
>
> Using zeppelin with remote cluster I came across some race condition issue 
> leading to failed expectations for SQL insert operations. 
>  
> Below is an example of zeppelin code that is failing because 
> jobClient.getJobStatus() returns running even after job has finished. I have 
> verified that same failover can happen if I use 
> jobClient.getJobExecutionResult().get() (Job execution result is: "Program 
> execution finished" but job status is not consistently finished)
> {code:java}
> TableResult tableResult = ((TableEnvironmentInternal) 
> tbenv).executeInternal(operations);
>     checkState(tableResult.getJobClient().isPresent());
>     try {
>       tableResult.await();
>       JobClient jobClient = tableResult.getJobClient().get();
>       if (jobClient.getJobStatus().get() == JobStatus.FINISHED) {
>         context.out.write("Insertion successfully.\n");
>       } else {
>         throw new IOException("Job is failed, " + 
> jobClient.getJobExecutionResult().get().toString());
>       }
>     } catch (InterruptedException e) {
>       throw new IOException("Flink job is interrupted", e);
>     } catch (ExecutionException e) {
>       throw new IOException("Flink job is failed", e);
>     } {code}
>  ZeppelinCode: 
> [https://github.com/apache/zeppelin/blob/master/flink/flink1.15-shims/src/main/java/org/apache/zeppelin/flink/Flink115SqlInterpreter.java#L384]
> I suspect that job status is returned based on runningJobsRegistry and since 
> 1.15 this registry is not updated with FINISHED status prior to job result 
> future completion, see this change: {{JobMasterServiceLeadershipRunner.java}} 
> [https://github.com/apache/flink/pull/18189/files#diff-3eb433f18b85c0f5329a4b312a219583189d777fe9bdd547f1114f4a22989f8bL387]
>  
>  
> It looks like as race condition that is hard to reproduce on lightweight 
> setup. I was reproducing this running zeppelin notebook with remote flink 
> cluster and triggering SQL insert operation. If I find a smaller setup to 
> reproduce on small local cluster with lightweight client, I will update this 
> ticket when I have more input. I am open to suggestions on how to fix this. 
>  
> For Zeppelin I have a separate ticket because Flink 1.15 is not going to be 
> fixed but this issue if I understand it correctly should be common for all 
> versions starting 1.15, therefore it makes sense to address this starting 
> 1.16. https://issues.apache.org/jira/browse/ZEPPELIN-5909
>  
> [~mapohl], Thank you for assistance in slack, I have created this ticket to 
> back our  conversation, could you please add your thoughts on this failure 
> mode?
>  
> One possible solution would be to have additional check for presence of 
> JobResult in Result store before returning jobStatus (if there is a result, 
> job shouldn't be reported as running based on this documentation: 
> [https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/table/api/TableResult.html#await--])



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to