Github user GJL commented on a diff in the pull request:
https://github.com/apache/flink/pull/5207#discussion_r160980332
--- Diff: flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java ---
@@ -104,11 +122,26 @@ protected JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoad
} catch (JobSubmissionException e) {
throw new ProgramInvocationException(e);
}
- // don't return just a JobSubmissionResult here, the signature is lying
- // The CliFrontend expects this to be a JobExecutionResult
- // TOOD: do not exit this method until job is finished
- return new JobExecutionResult(jobGraph.getJobID(), 1, Collections.emptyMap());
+ final JobExecutionResult jobExecutionResult = waitForJobExecutionResult(jobGraph.getJobID());
+
+ if (jobExecutionResult.getSerializedThrowable().isPresent()) {
+ final SerializedThrowable serializedThrowable = jobExecutionResult.getSerializedThrowable().get();
+ final Throwable throwable = serializedThrowable.deserializeError(classLoader);
+ throw new ProgramInvocationException(throwable);
+ }
+
+ try {
+ // don't return just a JobSubmissionResult here, the signature is lying
+ // The CliFrontend expects this to be a JobExecutionResult
+ this.lastJobExecutionResult = new SerializedJobExecutionResult(
+ jobExecutionResult.getJobId(),
+ jobExecutionResult.getNetRuntime(),
+ jobExecutionResult.getAccumulatorResults()).toJobExecutionResult(classLoader);
--- End diff --
I need the whole `JobExecutionResult` here, not only the accumulators.
---