Aleksandr Iushmanov created FLINK-32069:
-------------------------------------------

             Summary: jobClient.getJobStatus() can return status RUNNING for finished insert operation
                 Key: FLINK-32069
                 URL: https://issues.apache.org/jira/browse/FLINK-32069
             Project: Flink
          Issue Type: Bug
          Components: Runtime / Coordination
    Affects Versions: 1.15.4, 1.16.1
            Reporter: Aleksandr Iushmanov


While using Zeppelin with a remote cluster, I came across a race condition that breaks expectations for SQL insert operations.

Below is the Zeppelin code that fails because jobClient.getJobStatus() returns RUNNING even after the job has finished. I have verified that the same failure can happen if I use jobClient.getJobExecutionResult().get(): the job execution result is "Program execution finished", but the job status is not consistently FINISHED.

{code:java}
TableResult tableResult = ((TableEnvironmentInternal) tbenv).executeInternal(operations);
checkState(tableResult.getJobClient().isPresent());
try {
  tableResult.await();
  JobClient jobClient = tableResult.getJobClient().get();
  if (jobClient.getJobStatus().get() == JobStatus.FINISHED) {
    context.out.write("Insertion successfully.\n");
  } else {
    throw new IOException("Job is failed, " + jobClient.getJobExecutionResult().get().toString());
  }
} catch (InterruptedException e) {
  throw new IOException("Flink job is interrupted", e);
} catch (ExecutionException e) {
  throw new IOException("Flink job is failed", e);
}
{code}

Related PR: [https://github.com/apache/flink/pull/18189]

Zeppelin code: [https://github.com/apache/zeppelin/blob/master/flink/flink1.15-shims/src/main/java/org/apache/zeppelin/flink/Flink115SqlInterpreter.java#L384]

I suspect that the job status is returned based on the runningJobsRegistry, and that since 1.15 this registry is not updated with the FINISHED status before the job result future completes. See this change in {{JobMasterServiceLeadershipRunner.java}}: [https://github.com/apache/flink/pull/18189/files#diff-3eb433f18b85c0f5329a4b312a219583189d777fe9bdd547f1114f4a22989f8bL387]

This looks like a race condition that is hard to reproduce on a lightweight setup. I reproduced it by running a Zeppelin notebook against a remote Flink cluster and triggering a SQL insert operation. If I find a smaller setup that reproduces it on a small local cluster with a lightweight client, I will update this ticket.

I am open to suggestions on how to fix this.

For Zeppelin I have a separate ticket, because Flink 1.15 is not going to be fixed. If I understand it correctly, though, this issue should be common to all versions starting with 1.15, so it makes sense to address it starting with 1.16.
https://issues.apache.org/jira/browse/ZEPPELIN-5909

[~mapohl], thank you for your assistance in Slack. I have created this ticket to back our conversation; could you please add your thoughts on this failure mode?
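
Until this is addressed in Flink, a caller could poll the status until it becomes terminal instead of checking it once right after await(). The following is only a minimal sketch of that idea, not a tested fix. The {{Status}} enum and the {{awaitTerminalStatus}} helper are hypothetical stand-ins so that the example runs without Flink on the classpath. With Flink, the supplier would presumably be {{jobClient::getJobStatus}} and the terminal check {{JobStatus.isGloballyTerminalState()}}.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

final class TerminalStatusPoller {

    /** Hypothetical stand-in for Flink's JobStatus; not the real enum. */
    enum Status {
        RUNNING(false),
        FINISHED(true),
        FAILED(true),
        CANCELED(true);

        private final boolean terminal;

        Status(boolean terminal) {
            this.terminal = terminal;
        }

        boolean isTerminal() {
            return terminal;
        }
    }

    private TerminalStatusPoller() {}

    /**
     * Polls the supplier until it reports a terminal status.
     * Throws TimeoutException if the status is still non-terminal at the deadline.
     */
    static Status awaitTerminalStatus(
            Supplier<CompletableFuture<Status>> statusSupplier,
            Duration timeout,
            Duration pollInterval)
            throws InterruptedException, TimeoutException {
        final long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            Status status = statusSupplier.get().join();
            if (status.isTerminal()) {
                return status;
            }
            if (System.nanoTime() - deadline >= 0) {
                throw new TimeoutException(
                        "Job still reported as " + status + " after " + timeout);
            }
            Thread.sleep(pollInterval.toMillis());
        }
    }

    public static void main(String[] args) throws Exception {
        // Simulates the race: the first two status calls still report RUNNING.
        AtomicInteger calls = new AtomicInteger();
        Supplier<CompletableFuture<Status>> staleThenFinished =
                () -> CompletableFuture.completedFuture(
                        calls.incrementAndGet() <= 2 ? Status.RUNNING : Status.FINISHED);

        Status status =
                awaitTerminalStatus(staleThenFinished, Duration.ofSeconds(5), Duration.ofMillis(10));
        System.out.println("Final status: " + status);
    }
}
```

The timeout matters here: without it, a status that never leaves RUNNING would block the caller forever. This only hides the symptom on the client side; it does not change what the cluster reports.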

One possible solution would be to add a check for the presence of a JobResult in the result store before returning the job status. If there is a result, the job shouldn't be reported as running, based on this documentation: https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/table/api/TableResult.html#await--



--
This message was sent by Atlassian Jira
(v8.20.10#820010)