[ 
https://issues.apache.org/jira/browse/FLINK-10400?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16630088#comment-16630088
 ] 

ASF GitHub Bot commented on FLINK-10400:
----------------------------------------

asfgit closed pull request #6742: [FLINK-10400] Fail JobResult if application 
finished in CANCELED or FAILED state
URL: https://github.com/apache/flink/pull/6742
 
 
   

This is a PR merged from a forked repository.
Because GitHub hides the original diff of a pull request from a fork
once it has been merged, the diff is reproduced below for the sake of
provenance:

diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java
 
b/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java
index 81cf784441d..3077f183acb 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java
@@ -21,6 +21,7 @@
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobSubmissionResult;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.client.JobStatusMessage;
 import 
org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
@@ -94,8 +95,8 @@ public JobSubmissionResult submitJob(JobGraph jobGraph, 
ClassLoader classLoader)
 
                        try {
                                return 
jobResult.toJobExecutionResult(classLoader);
-                       } catch (JobResult.WrappedJobException e) {
-                               throw new ProgramInvocationException("Job 
failed", jobGraph.getJobID(), e.getCause());
+                       } catch (JobExecutionException e) {
+                               throw new ProgramInvocationException("Job 
failed", jobGraph.getJobID(), e);
                        } catch (IOException | ClassNotFoundException e) {
                                throw new ProgramInvocationException("Job 
failed", jobGraph.getJobID(), e);
                        }
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
 
b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
index 935a07faf89..86cc52da3b2 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
@@ -32,6 +32,7 @@
 import org.apache.flink.client.program.rest.retry.WaitStrategy;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.client.JobStatusMessage;
 import org.apache.flink.runtime.client.JobSubmissionException;
 import 
org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
@@ -263,8 +264,8 @@ public JobSubmissionResult submitJob(JobGraph jobGraph, 
ClassLoader classLoader)
                        try {
                                this.lastJobExecutionResult = 
jobResult.toJobExecutionResult(classLoader);
                                return lastJobExecutionResult;
-                       } catch (JobResult.WrappedJobException we) {
-                               throw new ProgramInvocationException("Job 
failed.", jobGraph.getJobID(), we.getCause());
+                       } catch (JobExecutionException e) {
+                               throw new ProgramInvocationException("Job 
failed.", jobGraph.getJobID(), e);
                        } catch (IOException | ClassNotFoundException e) {
                                throw new ProgramInvocationException("Job 
failed.", jobGraph.getJobID(), e);
                        }
diff --git 
a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
 
b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
index 75f16c03330..abe59d38bb6 100644
--- 
a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
+++ 
b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
@@ -31,6 +31,7 @@
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.runtime.client.JobStatusMessage;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.dispatcher.Dispatcher;
 import org.apache.flink.runtime.dispatcher.DispatcherGateway;
@@ -122,6 +123,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.Queue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
@@ -229,6 +231,7 @@ public void testJobSubmitCancelStop() throws Exception {
                TestJobExecutionResultHandler testJobExecutionResultHandler =
                        new TestJobExecutionResultHandler(
                                JobExecutionResultResponseBody.created(new 
JobResult.Builder()
+                                       
.applicationStatus(ApplicationStatus.SUCCEEDED)
                                        .jobId(jobId)
                                        .netRuntime(Long.MAX_VALUE)
                                        .build()));
@@ -351,11 +354,13 @@ public void testSubmitJobAndWaitForExecutionResult() 
throws Exception {
                                new RestHandlerException("should trigger 
retry", HttpResponseStatus.SERVICE_UNAVAILABLE),
                                JobExecutionResultResponseBody.inProgress(),
                                JobExecutionResultResponseBody.created(new 
JobResult.Builder()
+                                       
.applicationStatus(ApplicationStatus.SUCCEEDED)
                                        .jobId(jobId)
                                        .netRuntime(Long.MAX_VALUE)
                                        
.accumulatorResults(Collections.singletonMap("testName", new 
SerializedValue<>(OptionalFailure.of(1.0))))
                                        .build()),
                                JobExecutionResultResponseBody.created(new 
JobResult.Builder()
+                                       
.applicationStatus(ApplicationStatus.FAILED)
                                        .jobId(jobId)
                                        .netRuntime(Long.MAX_VALUE)
                                        .serializedThrowable(new 
SerializedThrowable(new RuntimeException("expected")))
@@ -385,8 +390,10 @@ public void testSubmitJobAndWaitForExecutionResult() 
throws Exception {
                                restClusterClient.submitJob(jobGraph, 
ClassLoader.getSystemClassLoader());
                                fail("Expected exception not thrown.");
                        } catch (final ProgramInvocationException e) {
-                               assertThat(e.getCause(), 
instanceOf(RuntimeException.class));
-                               assertThat(e.getCause().getMessage(), 
equalTo("expected"));
+                               final Optional<RuntimeException> cause = 
ExceptionUtils.findThrowable(e, RuntimeException.class);
+
+                               assertThat(cause.isPresent(), is(true));
+                               assertThat(cause.get().getMessage(), 
equalTo("expected"));
                        }
                }
        }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobResult.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobResult.java
index 60ddbe3e475..eb7c4734f1d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobResult.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobResult.java
@@ -22,11 +22,13 @@
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.accumulators.AccumulatorHelper;
+import org.apache.flink.runtime.client.JobCancellationException;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.dispatcher.Dispatcher;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ErrorInfo;
 import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.OptionalFailure;
 import org.apache.flink.util.SerializedThrowable;
 import org.apache.flink.util.SerializedValue;
@@ -54,6 +56,8 @@
 
        private final JobID jobId;
 
+       private final ApplicationStatus applicationStatus;
+
        private final Map<String, SerializedValue<OptionalFailure<Object>>> 
accumulatorResults;
 
        private final long netRuntime;
@@ -64,6 +68,7 @@
 
        private JobResult(
                        final JobID jobId,
+                       final ApplicationStatus applicationStatus,
                        final Map<String, 
SerializedValue<OptionalFailure<Object>>> accumulatorResults,
                        final long netRuntime,
                        @Nullable final SerializedThrowable 
serializedThrowable) {
@@ -71,6 +76,7 @@ private JobResult(
                checkArgument(netRuntime >= 0, "netRuntime must be greater than 
or equals 0");
 
                this.jobId = requireNonNull(jobId);
+               this.applicationStatus = requireNonNull(applicationStatus);
                this.accumulatorResults = requireNonNull(accumulatorResults);
                this.netRuntime = netRuntime;
                this.serializedThrowable = serializedThrowable;
@@ -80,13 +86,17 @@ private JobResult(
         * Returns {@code true} if the job finished successfully.
         */
        public boolean isSuccess() {
-               return serializedThrowable == null;
+               return applicationStatus == ApplicationStatus.SUCCEEDED || 
(applicationStatus == ApplicationStatus.UNKNOWN && serializedThrowable == null);
        }
 
        public JobID getJobId() {
                return jobId;
        }
 
+       public ApplicationStatus getApplicationStatus() {
+               return applicationStatus;
+       }
+
        public Map<String, SerializedValue<OptionalFailure<Object>>> 
getAccumulatorResults() {
                return accumulatorResults;
        }
@@ -108,22 +118,40 @@ public long getNetRuntime() {
         *
         * @param classLoader to use for deserialization
         * @return JobExecutionResult
-        * @throws WrappedJobException if the JobResult contains a serialized 
exception
+        * @throws JobCancellationException if the job was cancelled
+        * @throws JobExecutionException if the job execution did not succeed
         * @throws IOException if the accumulator could not be deserialized
         * @throws ClassNotFoundException if the accumulator could not 
deserialized
         */
-       public JobExecutionResult toJobExecutionResult(ClassLoader classLoader) 
throws WrappedJobException, IOException, ClassNotFoundException {
-               if (serializedThrowable != null) {
-                       final Throwable throwable = 
serializedThrowable.deserializeError(classLoader);
-                       throw new WrappedJobException(throwable);
-               }
+       public JobExecutionResult toJobExecutionResult(ClassLoader classLoader) 
throws JobExecutionException, IOException, ClassNotFoundException {
+               if (applicationStatus == ApplicationStatus.SUCCEEDED) {
+                       return new JobExecutionResult(
+                               jobId,
+                               netRuntime,
+                               AccumulatorHelper.deserializeAccumulators(
+                                       accumulatorResults,
+                                       classLoader));
+               } else {
+                       final Throwable cause;
+
+                       if (serializedThrowable == null) {
+                               cause = null;
+                       } else {
+                               cause = 
serializedThrowable.deserializeError(classLoader);
+                       }
+
+                       final JobExecutionException exception;
+
+                       if (applicationStatus == ApplicationStatus.FAILED) {
+                               exception = new JobExecutionException(jobId, 
"Job execution failed.", cause);
+                       } else if (applicationStatus == 
ApplicationStatus.CANCELED) {
+                               exception = new JobCancellationException(jobId, 
"Job was cancelled.", cause);
+                       } else {
+                               exception = new JobExecutionException(jobId, 
"Job completed with illegal application status: " + applicationStatus + '.', 
cause);
+                       }
 
-               return new JobExecutionResult(
-                       jobId,
-                       netRuntime,
-                       AccumulatorHelper.deserializeAccumulators(
-                               accumulatorResults,
-                               classLoader));
+                       throw exception;
+               }
        }
 
        /**
@@ -134,6 +162,8 @@ public JobExecutionResult toJobExecutionResult(ClassLoader 
classLoader) throws W
 
                private JobID jobId;
 
+               private ApplicationStatus applicationStatus = 
ApplicationStatus.UNKNOWN;
+
                private Map<String, SerializedValue<OptionalFailure<Object>>> 
accumulatorResults;
 
                private long netRuntime = -1;
@@ -145,6 +175,11 @@ public Builder jobId(final JobID jobId) {
                        return this;
                }
 
+               public Builder applicationStatus(final ApplicationStatus 
applicationStatus) {
+                       this.applicationStatus = applicationStatus;
+                       return this;
+               }
+
                public Builder accumulatorResults(final Map<String, 
SerializedValue<OptionalFailure<Object>>> accumulatorResults) {
                        this.accumulatorResults = accumulatorResults;
                        return this;
@@ -163,6 +198,7 @@ public Builder serializedThrowable(final 
SerializedThrowable serializedThrowable
                public JobResult build() {
                        return new JobResult(
                                jobId,
+                               applicationStatus,
                                accumulatorResults == null ? 
Collections.emptyMap() : accumulatorResults,
                                netRuntime,
                                serializedThrowable);
@@ -188,6 +224,8 @@ public static JobResult createFrom(AccessExecutionGraph 
accessExecutionGraph) {
                final JobResult.Builder builder = new JobResult.Builder();
                builder.jobId(jobId);
 
+               
builder.applicationStatus(ApplicationStatus.fromJobStatus(accessExecutionGraph.getState()));
+
                final long netRuntime = 
accessExecutionGraph.getStatusTimestamp(jobStatus) - 
accessExecutionGraph.getStatusTimestamp(JobStatus.CREATED);
                // guard against clock changes
                final long guardedNetRuntime = Math.max(netRuntime, 0L);
@@ -204,17 +242,4 @@ public static JobResult createFrom(AccessExecutionGraph 
accessExecutionGraph) {
 
                return builder.build();
        }
-
-       /**
-        * Exception which indicates that the job has finished with an {@link 
Exception}.
-        */
-       public static final class WrappedJobException extends FlinkException {
-
-               private static final long serialVersionUID = 
6535061898650156019L;
-
-               public WrappedJobException(Throwable cause) {
-                       super(cause);
-               }
-       }
-
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 8054a383739..bbdb099ae0a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -623,8 +623,6 @@ public JobExecutionResult executeJobBlocking(JobGraph job) 
throws JobExecutionEx
 
                try {
                        return 
jobResult.toJobExecutionResult(Thread.currentThread().getContextClassLoader());
-               } catch (JobResult.WrappedJobException e) {
-                       throw new JobExecutionException(job.getJobID(), 
e.getCause());
                } catch (IOException | ClassNotFoundException e) {
                        throw new JobExecutionException(job.getJobID(), e);
                }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobResultDeserializer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobResultDeserializer.java
index e568f476c7e..8342eb374a2 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobResultDeserializer.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobResultDeserializer.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.rest.messages.json;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.util.OptionalFailure;
 import org.apache.flink.util.SerializedThrowable;
@@ -68,6 +69,7 @@ public JobResultDeserializer() {
        @Override
        public JobResult deserialize(final JsonParser p, final 
DeserializationContext ctxt) throws IOException {
                JobID jobId = null;
+               ApplicationStatus applicationStatus = ApplicationStatus.UNKNOWN;
                long netRuntime = -1;
                SerializedThrowable serializedThrowable = null;
                Map<String, SerializedValue<OptionalFailure<Object>>> 
accumulatorResults = null;
@@ -85,6 +87,10 @@ public JobResult deserialize(final JsonParser p, final 
DeserializationContext ct
                                        assertNextToken(p, 
JsonToken.VALUE_STRING);
                                        jobId = 
jobIdDeserializer.deserialize(p, ctxt);
                                        break;
+                               case 
JobResultSerializer.FIELD_NAME_APPLICATION_STATUS:
+                                       assertNextToken(p, 
JsonToken.VALUE_STRING);
+                                       applicationStatus = 
ApplicationStatus.valueOf(p.getValueAsString().toUpperCase());
+                                       break;
                                case JobResultSerializer.FIELD_NAME_NET_RUNTIME:
                                        assertNextToken(p, 
JsonToken.VALUE_NUMBER_INT);
                                        netRuntime = p.getLongValue();
@@ -105,6 +111,7 @@ public JobResult deserialize(final JsonParser p, final 
DeserializationContext ct
                try {
                        return new JobResult.Builder()
                                .jobId(jobId)
+                               .applicationStatus(applicationStatus)
                                .netRuntime(netRuntime)
                                .accumulatorResults(accumulatorResults)
                                .serializedThrowable(serializedThrowable)
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobResultSerializer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobResultSerializer.java
index 694fa2f529b..cdf3541fe3a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobResultSerializer.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/JobResultSerializer.java
@@ -44,6 +44,8 @@
 
        static final String FIELD_NAME_JOB_ID = "id";
 
+       static final String FIELD_NAME_APPLICATION_STATUS = 
"application-status";
+
        static final String FIELD_NAME_NET_RUNTIME = "net-runtime";
 
        static final String FIELD_NAME_ACCUMULATOR_RESULTS = 
"accumulator-results";
@@ -76,6 +78,9 @@ public void serialize(
                gen.writeFieldName(FIELD_NAME_JOB_ID);
                jobIdSerializer.serialize(result.getJobId(), gen, provider);
 
+               gen.writeFieldName(FIELD_NAME_APPLICATION_STATUS);
+               gen.writeString(result.getApplicationStatus().name());
+
                gen.writeFieldName(FIELD_NAME_ACCUMULATOR_RESULTS);
                gen.writeStartObject();
                final Map<String, SerializedValue<OptionalFailure<Object>>> 
accumulatorResults = result.getAccumulatorResults();
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobResultTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobResultTest.java
index 84c9da5e7de..6543fa24b1d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobResultTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobResultTest.java
@@ -19,12 +19,19 @@
 package org.apache.flink.runtime.jobmaster;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.client.JobCancellationException;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.executiongraph.ErrorInfo;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import 
org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
+import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.SerializedThrowable;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
 
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
 
@@ -66,4 +73,62 @@ public void testIsSuccess() throws Exception {
                assertThat(jobResult.isSuccess(), equalTo(true));
        }
 
+       @Test
+       public void testCancelledJobIsFailureResult() {
+               final JobResult jobResult = JobResult.createFrom(
+                       new ArchivedExecutionGraphBuilder()
+                               .setJobID(new JobID())
+                               .setState(JobStatus.CANCELED)
+                               .build());
+
+               assertThat(jobResult.isSuccess(), is(false));
+       }
+
+       @Test
+       public void testFailedJobIsFailureResult() {
+               final JobResult jobResult = JobResult.createFrom(
+                       new ArchivedExecutionGraphBuilder()
+                               .setJobID(new JobID())
+                               .setState(JobStatus.FAILED)
+                               .setFailureCause(new ErrorInfo(new 
FlinkException("Test exception"), 42L))
+                               .build());
+
+               assertThat(jobResult.isSuccess(), is(false));
+       }
+
+       @Test
+       public void testCancelledJobThrowsJobCancellationException() throws 
Exception {
+               final FlinkException cause = new FlinkException("Test 
exception");
+               final JobResult jobResult = JobResult.createFrom(
+                       new ArchivedExecutionGraphBuilder()
+                               .setJobID(new JobID())
+                               .setState(JobStatus.CANCELED)
+                               .setFailureCause(new ErrorInfo(cause, 42L))
+                               .build());
+
+               try {
+                       
jobResult.toJobExecutionResult(getClass().getClassLoader());
+                       fail("Job should fail with an 
JobCancellationException.");
+               } catch (JobCancellationException expected) {
+                       assertThat(expected.getCause(), is(equalTo(cause)));
+               }
+       }
+
+       @Test
+       public void testFailedJobThrowsJobExecutionException() throws Exception 
{
+               final FlinkException cause = new FlinkException("Test 
exception");
+               final JobResult jobResult = JobResult.createFrom(
+                       new ArchivedExecutionGraphBuilder()
+                               .setJobID(new JobID())
+                               .setState(JobStatus.FAILED)
+                               .setFailureCause(new ErrorInfo(cause, 42L))
+                               .build());
+
+               try {
+                       
jobResult.toJobExecutionResult(getClass().getClassLoader());
+                       fail("Job should fail with JobExecutionException.");
+               } catch (JobExecutionException expected) {
+                       assertThat(expected.getCause(), is(equalTo(cause)));
+               }
+       }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/JobExecutionResultResponseBodyTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/JobExecutionResultResponseBodyTest.java
index 9534d2bca7c..c8cc7f3e7c4 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/JobExecutionResultResponseBodyTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/JobExecutionResultResponseBodyTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.rest.messages.job;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.runtime.rest.messages.RestResponseMarshallingTestBase;
 import org.apache.flink.util.OptionalFailure;
@@ -64,12 +65,14 @@
                return Arrays.asList(new Object[][] {
                        {JobExecutionResultResponseBody.created(new 
JobResult.Builder()
                                .jobId(TEST_JOB_ID)
+                               .applicationStatus(ApplicationStatus.SUCCEEDED)
                                .netRuntime(TEST_NET_RUNTIME)
                                .accumulatorResults(TEST_ACCUMULATORS)
                                .serializedThrowable(new 
SerializedThrowable(new RuntimeException("expected")))
                                .build())},
                        {JobExecutionResultResponseBody.created(new 
JobResult.Builder()
                                .jobId(TEST_JOB_ID)
+                               .applicationStatus(ApplicationStatus.FAILED)
                                .netRuntime(TEST_NET_RUNTIME)
                                .accumulatorResults(TEST_ACCUMULATORS)
                                .build())},
@@ -108,6 +111,7 @@ protected void assertOriginalEqualsToUnmarshalled(
                        assertNotNull(actualJobExecutionResult);
 
                        assertThat(actualJobExecutionResult.getJobId(), 
equalTo(expectedJobExecutionResult.getJobId()));
+                       
assertThat(actualJobExecutionResult.getApplicationStatus(), 
equalTo(expectedJobExecutionResult.getApplicationStatus()));
                        assertThat(actualJobExecutionResult.getNetRuntime(), 
equalTo(expectedJobExecutionResult.getNetRuntime()));
                        
assertThat(actualJobExecutionResult.getAccumulatorResults(), 
equalTo(expectedJobExecutionResult.getAccumulatorResults()));
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Return failed JobResult if job terminates in state FAILED or CANCELED
> ---------------------------------------------------------------------
>
>                 Key: FLINK-10400
>                 URL: https://issues.apache.org/jira/browse/FLINK-10400
>             Project: Flink
>          Issue Type: Bug
>          Components: Client
>    Affects Versions: 1.6.1, 1.7.0, 1.5.4
>            Reporter: Till Rohrmann
>            Assignee: Till Rohrmann
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.7.0, 1.6.2, 1.5.5
>
>
> If the job reaches the globally terminal state {{FAILED}} or {{CANCELED}}, 
> the {{JobResult}} must be a non-successful result. At the moment, if the job 
> ends in the {{CANCELED}} state and we don't find a failure cause, we may 
> return a successful {{JobResult}}.
> In order to change this, I propose to always return a {{JobResult}} with a 
> {{JobCancellationException}} in case of {{CANCELED}} and a 
> {{JobExecutionException}} in case of {{FAILED}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to