asfgit closed pull request #6742: [FLINK-10400] Fail JobResult if application finished in CANCELED or FAILED state
URL: https://github.com/apache/flink/pull/6742
This pull request was merged from a forked (foreign) repository.
Because GitHub does not show the original diff of a merged pull request from a fork,
the diff is reproduced below for the sake of provenance:
diff --git
a/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java
b/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java
index 81cf784441d..3077f183acb 100644
---
a/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java
+++
b/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java
@@ -21,6 +21,7 @@
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.client.JobStatusMessage;
import
org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
@@ -94,8 +95,8 @@ public JobSubmissionResult submitJob(JobGraph jobGraph,
ClassLoader classLoader)
try {
return
jobResult.toJobExecutionResult(classLoader);
- } catch (JobResult.WrappedJobException e) {
- throw new ProgramInvocationException("Job
failed", jobGraph.getJobID(), e.getCause());
+ } catch (JobExecutionException e) {
+ throw new ProgramInvocationException("Job
failed", jobGraph.getJobID(), e);
} catch (IOException | ClassNotFoundException e) {
throw new ProgramInvocationException("Job
failed", jobGraph.getJobID(), e);
}
diff --git
a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
index 935a07faf89..86cc52da3b2 100644
---
a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
+++
b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
@@ -32,6 +32,7 @@
import org.apache.flink.client.program.rest.retry.WaitStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.client.JobSubmissionException;
import
org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
@@ -263,8 +264,8 @@ public JobSubmissionResult submitJob(JobGraph jobGraph,
ClassLoader classLoader)
try {
this.lastJobExecutionResult =
jobResult.toJobExecutionResult(classLoader);
return lastJobExecutionResult;
- } catch (JobResult.WrappedJobException we) {
- throw new ProgramInvocationException("Job
failed.", jobGraph.getJobID(), we.getCause());
+ } catch (JobExecutionException e) {
+ throw new ProgramInvocationException("Job
failed.", jobGraph.getJobID(), e);
} catch (IOException | ClassNotFoundException e) {
throw new ProgramInvocationException("Job
failed.", jobGraph.getJobID(), e);
}
diff --git
a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
index 75f16c03330..abe59d38bb6 100644
---
a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
+++
b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
@@ -31,6 +31,7 @@
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.runtime.client.JobStatusMessage;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.dispatcher.Dispatcher;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
@@ -122,6 +123,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
@@ -229,6 +231,7 @@ public void testJobSubmitCancelStop() throws Exception {
TestJobExecutionResultHandler testJobExecutionResultHandler =
new TestJobExecutionResultHandler(
JobExecutionResultResponseBody.created(new
JobResult.Builder()
+
.applicationStatus(ApplicationStatus.SUCCEEDED)
.jobId(jobId)
.netRuntime(Long.MAX_VALUE)
.build()));
@@ -351,11 +354,13 @@ public void testSubmitJobAndWaitForExecutionResult()
throws Exception {
new RestHandlerException("should trigger
retry", HttpResponseStatus.SERVICE_UNAVAILABLE),
JobExecutionResultResponseBody.inProgress(),
JobExecutionResultResponseBody.created(new
JobResult.Builder()
+
.applicationStatus(ApplicationStatus.SUCCEEDED)
.jobId(jobId)
.netRuntime(Long.MAX_VALUE)
.accumulatorResults(Collections.singletonMap("testName", new
SerializedValue<>(OptionalFailure.of(1.0))))
.build()),
JobExecutionResultResponseBody.created(new
JobResult.Builder()
+
.applicationStatus(ApplicationStatus.FAILED)
.jobId(jobId)
.netRuntime(Long.MAX_VALUE)
.serializedThrowable(new
SerializedThrowable(new RuntimeException("expected")))
@@ -385,8 +390,10 @@ public void testSubmitJobAndWaitForExecutionResult()
throws Exception {
restClusterClient.submitJob(jobGraph,
ClassLoader.getSystemClassLoader());
fail("Expected exception not thrown.");
} catch (final ProgramInvocationException e) {
- assertThat(e.getCause(),
instanceOf(RuntimeException.class));
- assertThat(e.getCause().getMessage(),
equalTo("expected"));
+ final Optional<RuntimeException> cause =
ExceptionUtils.findThrowable(e, RuntimeException.class);
+
+ assertThat(cause.isPresent(), is(true));
+ assertThat(cause.get().getMessage(),
equalTo("expected"));
}
}
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobResult.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobResult.java
index 60ddbe3e475..eb7c4734f1d 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobResult.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobResult.java
@@ -22,11 +22,13 @@
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.accumulators.AccumulatorHelper;
+import org.apache.flink.runtime.client.JobCancellationException;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.dispatcher.Dispatcher;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.executiongraph.ErrorInfo;
import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.util.FlinkException;
import org.apache.flink.util.OptionalFailure;
import org.apache.flink.util.SerializedThrowable;
import org.apache.flink.util.SerializedValue;
@@ -54,6 +56,8 @@
private final JobID jobId;
+ private final ApplicationStatus applicationStatus;
+
private final Map<String, SerializedValue<OptionalFailure<Object>>>
accumulatorResults;
private final long netRuntime;
@@ -64,6 +68,7 @@
private JobResult(
final JobID jobId,
+ final ApplicationStatus applicationStatus,
final Map<String,
SerializedValue<OptionalFailure<Object>>> accumulatorResults,
final long netRuntime,
@Nullable final SerializedThrowable
serializedThrowable) {
@@ -71,6 +76,7 @@ private JobResult(
checkArgument(netRuntime >= 0, "netRuntime must be greater than
or equals 0");
this.jobId = requireNonNull(jobId);
+ this.applicationStatus = requireNonNull(applicationStatus);
this.accumulatorResults = requireNonNull(accumulatorResults);
this.netRuntime = netRuntime;
this.serializedThrowable = serializedThrowable;
@@ -80,13 +86,17 @@ private JobResult(
* Returns {@code true} if the job finished successfully.
*/
public boolean isSuccess() {
- return serializedThrowable == null;
+ return applicationStatus == ApplicationStatus.SUCCEEDED ||
(applicationStatus == ApplicationStatus.UNKNOWN && serializedThrowable == null);
}
public JobID getJobId() {
return jobId;
}
+ public ApplicationStatus getApplicationStatus() {
+ return applicationStatus;
+ }
+
public Map<String, SerializedValue<OptionalFailure<Object>>>
getAccumulatorResults() {
return accumulatorResults;
}
@@ -108,22 +118,40 @@ public long getNetRuntime() {
*
* @param classLoader to use for deserialization
* @return JobExecutionResult
- * @throws WrappedJobException if the JobResult contains a serialized
exception
+ * @throws JobCancellationException if the job was cancelled
+ * @throws JobExecutionException if the job execution did not succeed
* @throws IOException if the accumulator could not be deserialized
* @throws ClassNotFoundException if the accumulator could not
deserialized
*/
- public JobExecutionResult toJobExecutionResult(ClassLoader classLoader)
throws WrappedJobException, IOException, ClassNotFoundException {
- if (serializedThrowable != null) {
- final Throwable throwable =
serializedThrowable.deserializeError(classLoader);
- throw new WrappedJobException(throwable);
- }
+ public JobExecutionResult toJobExecutionResult(ClassLoader classLoader)
throws JobExecutionException, IOException, ClassNotFoundException {
+ if (applicationStatus == ApplicationStatus.SUCCEEDED) {
+ return new JobExecutionResult(
+ jobId,
+ netRuntime,
+ AccumulatorHelper.deserializeAccumulators(
+ accumulatorResults,
+ classLoader));
+ } else {
+ final Throwable cause;
+
+ if (serializedThrowable == null) {
+ cause = null;
+ } else {
+ cause =
serializedThrowable.deserializeError(classLoader);
+ }
+
+ final JobExecutionException exception;
+
+ if (applicationStatus == ApplicationStatus.FAILED) {
+ exception = new JobExecutionException(jobId,
"Job execution failed.", cause);
+ } else if (applicationStatus ==
ApplicationStatus.CANCELED) {
+ exception = new JobCancellationException(jobId,
"Job was cancelled.", cause);
+ } else {
+ exception = new JobExecutionException(jobId,
"Job completed with illegal application status: " + applicationStatus + '.',
cause);
+ }
- return new JobExecutionResult(
- jobId,
- netRuntime,
- AccumulatorHelper.deserializeAccumulators(
- accumulatorResults,
- classLoader));
+ throw exception;
+ }
}
/**
@@ -134,6 +162,8 @@ public JobExecutionResult toJobExecutionResult(ClassLoader
classLoader) throws W
private JobID jobId;
+ private ApplicationStatus applicationStatus =
ApplicationStatus.UNKNOWN;
+
private Map<String, SerializedValue<OptionalFailure<Object>>>
accumulatorResults;
private long netRuntime = -1;
@@ -145,6 +175,11 @@ public Builder jobId(final JobID jobId) {
return this;
}
+ public Builder applicationStatus(final ApplicationStatus
applicationStatus) {
+ this.applicationStatus = applicationStatus;
+ return this;
+ }
+
public Builder accumulatorResults(final Map<String,
SerializedValue<OptionalFailure<Object>>> accumulatorResults) {
this.accumulatorResults = accumulatorResults;
return this;
@@ -163,6 +198,7 @@ public Builder serializedThrowable(final
SerializedThrowable serializedThrowable
public JobResult build() {
return new JobResult(
jobId,
+ applicationStatus,
accumulatorResults == null ?
Collections.emptyMap() : accumulatorResults,
netRuntime,
serializedThrowable);
@@ -188,6 +224,8 @@ public static JobResult createFrom(AccessExecutionGraph
accessExecutionGraph) {
final JobResult.Builder builder = new JobResult.Builder();
builder.jobId(jobId);
+
builder.applicationStatus(ApplicationStatus.fromJobStatus(accessExecutionGraph.getState()));
+
final long netRuntime =
accessExecutionGraph.getStatusTimestamp(jobStatus) -
accessExecutionGraph.getStatusTimestamp(JobStatus.CREATED);
// guard against clock changes
final long guardedNetRuntime = Math.max(netRuntime, 0L);
@@ -204,17 +242,4 @@ public static JobResult createFrom(AccessExecutionGraph
accessExecutionGraph) {
return builder.build();
}
-
- /**
- * Exception which indicates that the job has finished with an {@link
Exception}.
- */
- public static final class WrappedJobException extends FlinkException {
-
- private static final long serialVersionUID =
6535061898650156019L;
-
- public WrappedJobException(Throwable cause) {
- super(cause);
- }
- }
-
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 8054a383739..bbdb099ae0a 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -623,8 +623,6 @@ public JobExecutionResult executeJobBlocking(JobGraph job)
throws JobExecutionEx
try {
return
jobResult.toJobExecutionResult(Thread.currentThread().getContextClassLoader());
- } catch (JobResult.WrappedJobException e) {
- throw new JobExecutionException(job.getJobID(),
e.getCause());
} catch (IOException | ClassNotFoundException e) {
throw new JobExecutionException(job.getJobID(), e);
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobResultDeserializer.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobResultDeserializer.java
index e568f476c7e..8342eb374a2 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobResultDeserializer.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobResultDeserializer.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.rest.messages.json;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.util.OptionalFailure;
import org.apache.flink.util.SerializedThrowable;
@@ -68,6 +69,7 @@ public JobResultDeserializer() {
@Override
public JobResult deserialize(final JsonParser p, final
DeserializationContext ctxt) throws IOException {
JobID jobId = null;
+ ApplicationStatus applicationStatus = ApplicationStatus.UNKNOWN;
long netRuntime = -1;
SerializedThrowable serializedThrowable = null;
Map<String, SerializedValue<OptionalFailure<Object>>>
accumulatorResults = null;
@@ -85,6 +87,10 @@ public JobResult deserialize(final JsonParser p, final
DeserializationContext ct
assertNextToken(p,
JsonToken.VALUE_STRING);
jobId =
jobIdDeserializer.deserialize(p, ctxt);
break;
+ case
JobResultSerializer.FIELD_NAME_APPLICATION_STATUS:
+ assertNextToken(p,
JsonToken.VALUE_STRING);
+ applicationStatus =
ApplicationStatus.valueOf(p.getValueAsString().toUpperCase());
+ break;
case JobResultSerializer.FIELD_NAME_NET_RUNTIME:
assertNextToken(p,
JsonToken.VALUE_NUMBER_INT);
netRuntime = p.getLongValue();
@@ -105,6 +111,7 @@ public JobResult deserialize(final JsonParser p, final
DeserializationContext ct
try {
return new JobResult.Builder()
.jobId(jobId)
+ .applicationStatus(applicationStatus)
.netRuntime(netRuntime)
.accumulatorResults(accumulatorResults)
.serializedThrowable(serializedThrowable)
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobResultSerializer.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobResultSerializer.java
index 694fa2f529b..cdf3541fe3a 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobResultSerializer.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobResultSerializer.java
@@ -44,6 +44,8 @@
static final String FIELD_NAME_JOB_ID = "id";
+ static final String FIELD_NAME_APPLICATION_STATUS =
"application-status";
+
static final String FIELD_NAME_NET_RUNTIME = "net-runtime";
static final String FIELD_NAME_ACCUMULATOR_RESULTS =
"accumulator-results";
@@ -76,6 +78,9 @@ public void serialize(
gen.writeFieldName(FIELD_NAME_JOB_ID);
jobIdSerializer.serialize(result.getJobId(), gen, provider);
+ gen.writeFieldName(FIELD_NAME_APPLICATION_STATUS);
+ gen.writeString(result.getApplicationStatus().name());
+
gen.writeFieldName(FIELD_NAME_ACCUMULATOR_RESULTS);
gen.writeStartObject();
final Map<String, SerializedValue<OptionalFailure<Object>>>
accumulatorResults = result.getAccumulatorResults();
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobResultTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobResultTest.java
index 84c9da5e7de..6543fa24b1d 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobResultTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobResultTest.java
@@ -19,12 +19,19 @@
package org.apache.flink.runtime.jobmaster;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.client.JobCancellationException;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.executiongraph.ErrorInfo;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import
org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
+import org.apache.flink.util.FlinkException;
import org.apache.flink.util.SerializedThrowable;
import org.apache.flink.util.TestLogger;
import org.junit.Test;
import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
@@ -66,4 +73,62 @@ public void testIsSuccess() throws Exception {
assertThat(jobResult.isSuccess(), equalTo(true));
}
+ @Test
+ public void testCancelledJobIsFailureResult() {
+ final JobResult jobResult = JobResult.createFrom(
+ new ArchivedExecutionGraphBuilder()
+ .setJobID(new JobID())
+ .setState(JobStatus.CANCELED)
+ .build());
+
+ assertThat(jobResult.isSuccess(), is(false));
+ }
+
+ @Test
+ public void testFailedJobIsFailureResult() {
+ final JobResult jobResult = JobResult.createFrom(
+ new ArchivedExecutionGraphBuilder()
+ .setJobID(new JobID())
+ .setState(JobStatus.FAILED)
+ .setFailureCause(new ErrorInfo(new
FlinkException("Test exception"), 42L))
+ .build());
+
+ assertThat(jobResult.isSuccess(), is(false));
+ }
+
+ @Test
+ public void testCancelledJobThrowsJobCancellationException() throws
Exception {
+ final FlinkException cause = new FlinkException("Test
exception");
+ final JobResult jobResult = JobResult.createFrom(
+ new ArchivedExecutionGraphBuilder()
+ .setJobID(new JobID())
+ .setState(JobStatus.CANCELED)
+ .setFailureCause(new ErrorInfo(cause, 42L))
+ .build());
+
+ try {
+
jobResult.toJobExecutionResult(getClass().getClassLoader());
+ fail("Job should fail with an
JobCancellationException.");
+ } catch (JobCancellationException expected) {
+ assertThat(expected.getCause(), is(equalTo(cause)));
+ }
+ }
+
+ @Test
+ public void testFailedJobThrowsJobExecutionException() throws Exception
{
+ final FlinkException cause = new FlinkException("Test
exception");
+ final JobResult jobResult = JobResult.createFrom(
+ new ArchivedExecutionGraphBuilder()
+ .setJobID(new JobID())
+ .setState(JobStatus.FAILED)
+ .setFailureCause(new ErrorInfo(cause, 42L))
+ .build());
+
+ try {
+
jobResult.toJobExecutionResult(getClass().getClassLoader());
+ fail("Job should fail with JobExecutionException.");
+ } catch (JobExecutionException expected) {
+ assertThat(expected.getCause(), is(equalTo(cause)));
+ }
+ }
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/JobExecutionResultResponseBodyTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/JobExecutionResultResponseBodyTest.java
index 9534d2bca7c..c8cc7f3e7c4 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/JobExecutionResultResponseBodyTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/JobExecutionResultResponseBodyTest.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.rest.messages.job;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.rest.messages.RestResponseMarshallingTestBase;
import org.apache.flink.util.OptionalFailure;
@@ -64,12 +65,14 @@
return Arrays.asList(new Object[][] {
{JobExecutionResultResponseBody.created(new
JobResult.Builder()
.jobId(TEST_JOB_ID)
+ .applicationStatus(ApplicationStatus.SUCCEEDED)
.netRuntime(TEST_NET_RUNTIME)
.accumulatorResults(TEST_ACCUMULATORS)
.serializedThrowable(new
SerializedThrowable(new RuntimeException("expected")))
.build())},
{JobExecutionResultResponseBody.created(new
JobResult.Builder()
.jobId(TEST_JOB_ID)
+ .applicationStatus(ApplicationStatus.FAILED)
.netRuntime(TEST_NET_RUNTIME)
.accumulatorResults(TEST_ACCUMULATORS)
.build())},
@@ -108,6 +111,7 @@ protected void assertOriginalEqualsToUnmarshalled(
assertNotNull(actualJobExecutionResult);
assertThat(actualJobExecutionResult.getJobId(),
equalTo(expectedJobExecutionResult.getJobId()));
+
assertThat(actualJobExecutionResult.getApplicationStatus(),
equalTo(expectedJobExecutionResult.getApplicationStatus()));
assertThat(actualJobExecutionResult.getNetRuntime(),
equalTo(expectedJobExecutionResult.getNetRuntime()));
assertThat(actualJobExecutionResult.getAccumulatorResults(),
equalTo(expectedJobExecutionResult.getAccumulatorResults()));
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services