Aleksandr Iushmanov created ZEPPELIN-5909:
---------------------------------------------

Summary: Flink115SqlInterpreter throws IOException on successful insert operation
Key: ZEPPELIN-5909
URL: https://issues.apache.org/jira/browse/ZEPPELIN-5909
Project: Zeppelin
Issue Type: Bug
Components: zeppelin-interpreter
Affects Versions: 0.11.0
Reporter: Aleksandr Iushmanov

There is a race condition in Flink 1.15 and above that breaks the code below:

{code:java}
TableResult tableResult = ((TableEnvironmentInternal) tbenv).executeInternal(operations);
checkState(tableResult.getJobClient().isPresent());
try {
  tableResult.await();
  JobClient jobClient = tableResult.getJobClient().get();
  // Race: the status can still be RUNNING here even though await() has returned.
  if (jobClient.getJobStatus().get() == JobStatus.FINISHED) {
    context.out.write("Insertion successfully.\n");
  } else {
    throw new IOException("Job is failed, " + jobClient.getJobExecutionResult().get().toString());
  }
} catch (InterruptedException e) {
  throw new IOException("Flink job is interrupted", e);
} catch (ExecutionException e) {
  throw new IOException("Flink job is failed", e);
}
{code}

jobClient.getJobStatus() can return status RUNNING for a short period of time after the job result is already available. The interpreter then takes the else branch and throws an IOException from the Zeppelin interpreter even though the insert succeeded:

Job is failed, Program execution finished

Example of a working patch:

{code:java}
private void waitForJobStatusFinishedOrThrow(
    final JobClient jobClient,
    final long timeoutMillis,
    final long retryIntervalMillis)
    throws IOException, InterruptedException, ExecutionException {
  final long startTime = System.currentTimeMillis();
  JobStatus status = jobClient.getJobStatus().get();
  // Re-read the status while it is still RUNNING, until the timeout elapses.
  while (status.equals(JobStatus.RUNNING)
      && System.currentTimeMillis() - startTime < timeoutMillis) {
    Thread.sleep(retryIntervalMillis);
    status = jobClient.getJobStatus().get();
  }
  if (!status.equals(JobStatus.FINISHED)) {
    throw new IOException("Job reached terminal state with result: "
        + jobClient.getJobExecutionResult().get().toString()
        + " but job status is not FINISHED after timeout: " + timeoutMillis);
  }
}
{code}

{code:java}
tableResult.await();
// Check https://i.amazon.com/issues/MERRIMAC-32509 for more details on bug investigation
// and linked upstream Jira tickets for Flink 1.16 onwards and Zeppelin support for
// Flink 1.15 SQL interpreter
waitForJobStatusFinishedOrThrow(
    tableResult.getJobClient().get(),
    TimeUnit.SECONDS.toMillis(30),
    TimeUnit.SECONDS.toMillis(1)
);
context.out.write("Insertion successfully.\n");
{code}
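
To reproduce the polling idea without a Flink cluster, the retry loop can be exercised against a simulated status source. The following is only a minimal sketch under stated assumptions: `StatusPollingSketch`, `Status`, `pollUntilNotRunning`, and `laggingStatus` are hypothetical names invented for illustration and are not part of Flink or Zeppelin. `Status` stands in for Flink's `JobStatus`, and the supplier stands in for `jobClient.getJobStatus().get()`.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class StatusPollingSketch {

    // Hypothetical stand-in for Flink's JobStatus; only the values needed here.
    enum Status { RUNNING, FINISHED, FAILED }

    /**
     * Re-reads the status while it is RUNNING, sleeping between reads,
     * until a different status is seen or the timeout elapses.
     * Returns the last status observed.
     */
    static Status pollUntilNotRunning(
            Supplier<Status> statusSupplier, long timeoutMillis, long retryIntervalMillis) {
        final long start = System.nanoTime();
        final long timeoutNanos = TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        Status status = statusSupplier.get();
        while (status == Status.RUNNING && System.nanoTime() - start < timeoutNanos) {
            try {
                Thread.sleep(retryIntervalMillis);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and give up polling.
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while polling job status", e);
            }
            status = statusSupplier.get();
        }
        return status;
    }

    // Simulates the race: reports RUNNING for the first staleReads calls, then FINISHED.
    static Supplier<Status> laggingStatus(int staleReads) {
        AtomicInteger calls = new AtomicInteger();
        return () -> calls.getAndIncrement() < staleReads ? Status.RUNNING : Status.FINISHED;
    }

    public static void main(String[] args) {
        // A single read, as in the affected code, observes the stale status.
        System.out.println(laggingStatus(2).get());
        // Polling rides out the stale window and observes the final status.
        System.out.println(pollUntilNotRunning(laggingStatus(2), 1000, 10));
    }
}
```

In this simulation a single read returns RUNNING while the polling helper returns FINISHED, which mirrors the difference between the affected code and the proposed patch. A status that never leaves RUNNING is returned unchanged once the timeout elapses, so the caller still has to decide how to report that case.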