Aleksandr Iushmanov created ZEPPELIN-5909:
---------------------------------------------

             Summary: Flink115SqlInterpreter throws IOException on successful insert operation
                 Key: ZEPPELIN-5909
                 URL: https://issues.apache.org/jira/browse/ZEPPELIN-5909
             Project: Zeppelin
          Issue Type: Bug
          Components: zeppelin-interpreter
    Affects Versions: 0.11.0
            Reporter: Aleksandr Iushmanov


There is a race condition in Flink 1.15 and above that breaks the code below:
{code:java}
TableResult tableResult = ((TableEnvironmentInternal) tbenv).executeInternal(operations);
checkState(tableResult.getJobClient().isPresent());
try {
  tableResult.await();
  JobClient jobClient = tableResult.getJobClient().get();
  if (jobClient.getJobStatus().get() == JobStatus.FINISHED) {
    context.out.write("Insertion successfully.\n");
  } else {
    throw new IOException("Job is failed, " + jobClient.getJobExecutionResult().get().toString());
  }
} catch (InterruptedException e) {
  throw new IOException("Flink job is interrupted", e);
} catch (ExecutionException e) {
  throw new IOException("Flink job is failed", e);
}
{code}
jobClient.getJobStatus() can return status RUNNING for a short period of time even after the job result is already available, which leads to an IOException being thrown from the Zeppelin interpreter:
Job is failed, Program execution finished
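A minimal, self-contained simulation of the race, assuming the stale window shows up as the first few status reads returning RUNNING after the result is already available. `SimJobStatus`, `RacyFakeJobClient` and `SingleReadCheck` are hypothetical stand-ins written for illustration, not Flink or Zeppelin APIs; the fake client only mimics the shape of `JobClient#getJobStatus()` and `JobClient#getJobExecutionResult()` (the real result type is not a String).

{code:java}
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the subset of Flink's JobStatus used here.
enum SimJobStatus { RUNNING, FINISHED }

// Hypothetical fake client (not a Flink API): the execution result is already
// available, but the first `staleReads` status reads still report RUNNING.
final class RacyFakeJobClient {
  private final AtomicInteger statusReads = new AtomicInteger();
  private final int staleReads;

  RacyFakeJobClient(int staleReads) {
    this.staleReads = staleReads;
  }

  CompletableFuture<SimJobStatus> getJobStatus() {
    boolean stale = statusReads.getAndIncrement() < staleReads;
    return CompletableFuture.completedFuture(stale ? SimJobStatus.RUNNING : SimJobStatus.FINISHED);
  }

  CompletableFuture<String> getJobExecutionResult() {
    return CompletableFuture.completedFuture("Program execution finished");
  }
}

final class SingleReadCheck {
  private SingleReadCheck() {}

  // Same decision as the interpreter snippet: a single status read after await().
  // join() replaces get() only to keep the sketch free of checked exceptions.
  static String check(RacyFakeJobClient client) {
    if (client.getJobStatus().join() == SimJobStatus.FINISHED) {
      return "Insertion successfully.";
    }
    return "Job is failed, " + client.getJobExecutionResult().join();
  }
}
{code}

With one stale read, `SingleReadCheck.check(new RacyFakeJobClient(1))` returns "Job is failed, Program execution finished", the same misleading message reported above, although the simulated job completed.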
 
 
Example of a working patch:

{code:java}
// Re-reads the job status while it reports RUNNING and the timeout has not
// elapsed, then throws if the last observed status is not FINISHED.
private void waitForJobStatusFinishedOrThrow(
        final JobClient jobClient,
        final long timeoutMillis,
        final long retryIntervalMillis)
        throws IOException, InterruptedException, ExecutionException {
  final long startTime = System.currentTimeMillis();
  JobStatus status = jobClient.getJobStatus().get();
  // After tableResult.await(), RUNNING is expected to be a short-lived, stale status.
  while (status.equals(JobStatus.RUNNING)
          && System.currentTimeMillis() - startTime < timeoutMillis) {
    Thread.sleep(retryIntervalMillis);
    status = jobClient.getJobStatus().get();
  }
  if (!status.equals(JobStatus.FINISHED)) {
    throw new IOException("Job reached terminal state with result: "
            + jobClient.getJobExecutionResult().get().toString()
            + " but job status is not FINISHED after timeout: " + timeoutMillis);
  }
} {code}
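The helper measures elapsed time with `System.currentTimeMillis()`, which follows the wall clock and can jump if the system time is adjusted. Below is a sketch of the same polling loop driven by the monotonic `System.nanoTime()`, with the status source abstracted as a `Supplier` so it runs without Flink. `PolledStatus` and `MonotonicStatusPoller` are hypothetical names, not part of Flink or Zeppelin; the caller would still throw an IOException unless the returned status is FINISHED, as the patch does.

{code:java}
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical stand-in for Flink's JobStatus; only the values the sketch needs.
enum PolledStatus { RUNNING, FINISHED, FAILED }

final class MonotonicStatusPoller {
  private MonotonicStatusPoller() {}

  /**
   * Re-reads the status while it is RUNNING and the deadline has not passed,
   * then returns the last status observed (which may still be RUNNING on timeout).
   * The deadline is tracked with System.nanoTime(), a monotonic clock, so
   * wall-clock adjustments cannot shorten or stretch the wait.
   */
  static PolledStatus pollWhileRunning(
      Supplier<CompletableFuture<PolledStatus>> statusSource,
      long timeoutMillis,
      long retryIntervalMillis) throws InterruptedException {
    final long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
    PolledStatus status = statusSource.get().join();
    // Compare by subtraction, the overflow-safe way to order nanoTime values.
    while (status == PolledStatus.RUNNING && System.nanoTime() - deadline < 0) {
      Thread.sleep(retryIntervalMillis);
      status = statusSource.get().join();
    }
    return status;
  }
}
{code}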
{code:java}
tableResult.await();
// Check https://i.amazon.com/issues/MERRIMAC-32509 for more details on bug investigation and linked upstream
// Jira tickets for Flink 1.16 onwards and Zeppelin support for Flink 1.15 SQL interpreter
waitForJobStatusFinishedOrThrow(
        tableResult.getJobClient().get(),
        TimeUnit.SECONDS.toMillis(30),
        TimeUnit.SECONDS.toMillis(1)
);
context.out.write("Insertion successfully.\n"); {code}
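The helper declares `throws InterruptedException`, so at the call site it presumably still reaches the interpreter's `catch (InterruptedException e)` block, which wraps it in an IOException without restoring the thread's interrupt flag. A common Java idiom is to call `Thread.currentThread().interrupt()` before wrapping, so code further up the stack can still observe the interruption. A sketch, assuming the interpreter keeps its existing error messages; `BlockingStep` and `BlockingStepRunner` are hypothetical helpers, not Zeppelin APIs.

{code:java}
import java.io.IOException;
import java.util.concurrent.ExecutionException;

// Hypothetical functional interface for a blocking wait such as
// tableResult.await() followed by the status check.
@FunctionalInterface
interface BlockingStep<T> {
  T call() throws InterruptedException, ExecutionException, IOException;
}

final class BlockingStepRunner {
  private BlockingStepRunner() {}

  // Converts failures to IOException with the interpreter's messages;
  // IOException thrown by the step propagates unchanged.
  static <T> T run(BlockingStep<T> step) throws IOException {
    try {
      return step.call();
    } catch (InterruptedException e) {
      // Restore the interrupt flag that catching InterruptedException cleared.
      Thread.currentThread().interrupt();
      throw new IOException("Flink job is interrupted", e);
    } catch (ExecutionException e) {
      throw new IOException("Flink job is failed", e);
    }
  }
}
{code}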



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
