Repository: flink
Updated Branches:
  refs/heads/master 976d004ce -> ac8225fd5


[FLINK-8449] [flip6] Extend OnCompletionActions to accept a 
SerializableExecutionGraph

This commit changes the OnCompletionActions interface such that it accepts an
ArchivedExecutionGraph instead of a plain JobResult. This allows the completed
ExecutionGraph to be archived for further use in the container component of
the JobMasterRunner.

This closes #5308.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8f9dbeca
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8f9dbeca
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8f9dbeca

Branch: refs/heads/master
Commit: 8f9dbeca8bbb8f74bc17410b2f39903ea1f95af1
Parents: 976d004
Author: Till Rohrmann <[email protected]>
Authored: Thu Jan 25 17:03:39 2018 +0100
Committer: Till Rohrmann <[email protected]>
Committed: Fri Jan 26 13:49:46 2018 +0100

----------------------------------------------------------------------
 .../common/accumulators/AccumulatorHelper.java  | 23 ++---
 .../FailedAccumulatorSerialization.java         | 73 ++++++++++++++++
 .../FailedAccumulatorSerializationTest.java     | 89 ++++++++++++++++++++
 .../flink/runtime/dispatcher/Dispatcher.java    | 41 ++++-----
 .../entrypoint/JobClusterEntrypoint.java        | 27 ++----
 .../executiongraph/AccessExecutionGraph.java    |  7 +-
 .../executiongraph/ArchivedExecutionGraph.java  | 59 +++++++++++--
 .../flink/runtime/executiongraph/ErrorInfo.java |  3 +-
 .../runtime/executiongraph/ExecutionGraph.java  | 59 ++++---------
 .../runtime/jobmanager/OnCompletionActions.java | 15 +---
 .../runtime/jobmaster/JobManagerRunner.java     | 21 +----
 .../flink/runtime/jobmaster/JobMaster.java      | 61 +-------------
 .../flink/runtime/jobmaster/JobResult.java      | 37 ++++++++
 .../minicluster/MiniClusterJobDispatcher.java   | 20 +----
 .../rest/handler/job/JobExceptionsHandler.java  |  2 +-
 .../handler/legacy/JobExceptionsHandler.java    |  2 +-
 .../flink/runtime/jobmanager/JobManager.scala   |  5 +-
 .../runtime/dispatcher/DispatcherTest.java      | 27 +++---
 .../ArchivedExecutionGraphTest.java             |  4 +-
 .../jobmaster/JobManagerRunnerMockTest.java     | 31 +++----
 .../flink/runtime/jobmaster/JobMasterTest.java  |  8 +-
 .../legacy/JobExceptionsHandlerTest.java        |  3 +-
 .../utils/ArchivedExecutionGraphBuilder.java    |  6 +-
 23 files changed, 377 insertions(+), 246 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8f9dbeca/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AccumulatorHelper.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AccumulatorHelper.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AccumulatorHelper.java
index 3282302..78fb68b 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AccumulatorHelper.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AccumulatorHelper.java
@@ -28,13 +28,16 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
+/**
+ * Helper functions for the interaction with {@link Accumulator}.
+ */
 @Internal
 public class AccumulatorHelper {
 
        /**
         * Merge two collections of accumulators. The second will be merged 
into the
         * first.
-        * 
+        *
         * @param target
         *            The collection of accumulators that will be updated
         * @param toMerge
@@ -59,7 +62,7 @@ public class AccumulatorHelper {
        }
 
        /**
-        * Workaround method for type safety
+        * Workaround method for type safety.
         */
        private static <V, R extends Serializable> void 
mergeSingle(Accumulator<?, ?> target,
                                                                                
                                        Accumulator<?, ?> toMerge) {
@@ -74,14 +77,13 @@ public class AccumulatorHelper {
 
        /**
         * Compare both classes and throw {@link UnsupportedOperationException} 
if
-        * they differ
+        * they differ.
         */
        @SuppressWarnings("rawtypes")
-       public static void compareAccumulatorTypes(Object name,
-                                                                               
                Class<? extends Accumulator> first,
-                                                                               
                Class<? extends Accumulator> second)
-                       throws UnsupportedOperationException
-       {
+       public static void compareAccumulatorTypes(
+                       Object name,
+                       Class<? extends Accumulator> first,
+                       Class<? extends Accumulator> second) throws 
UnsupportedOperationException {
                if (first == null || second == null) {
                        throw new NullPointerException();
                }
@@ -102,7 +104,7 @@ public class AccumulatorHelper {
 
        /**
         * Transform the Map with accumulators into a Map containing only the
-        * results
+        * results.
         */
        public static Map<String, Object> toResultMap(Map<String, 
Accumulator<?, ?>> accumulators) {
                Map<String, Object> resultMap = new HashMap<String, Object>();
@@ -134,7 +136,7 @@ public class AccumulatorHelper {
        public static Map<String, Accumulator<?, ?>> copy(Map<String, 
Accumulator<?, ?>> accumulators) {
                Map<String, Accumulator<?, ?>> result = new HashMap<String, 
Accumulator<?, ?>>();
 
-               for(Map.Entry<String, Accumulator<?, ?>> entry: 
accumulators.entrySet()){
+               for (Map.Entry<String, Accumulator<?, ?>> entry: 
accumulators.entrySet()){
                        result.put(entry.getKey(), entry.getValue().clone());
                }
 
@@ -172,5 +174,4 @@ public class AccumulatorHelper {
 
                return accumulators;
        }
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8f9dbeca/flink-core/src/main/java/org/apache/flink/api/common/accumulators/FailedAccumulatorSerialization.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/accumulators/FailedAccumulatorSerialization.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/accumulators/FailedAccumulatorSerialization.java
new file mode 100644
index 0000000..b208b9e
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/accumulators/FailedAccumulatorSerialization.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.accumulators;
+
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+
+/**
+ * {@link Accumulator} implementation which indicates a serialization problem 
with the original
+ * accumulator. Accessing any of the {@link Accumulator} methods will result in 
throwing the
+ * serialization exception.
+ *
+ * @param <V> type of the value
+ * @param <R> type of the accumulator result
+ */
+public class FailedAccumulatorSerialization<V, R extends Serializable> 
implements Accumulator<V, R> {
+       private static final long serialVersionUID = 6965908827065879760L;
+
+       private final Throwable throwable;
+
+       public FailedAccumulatorSerialization(Throwable throwable) {
+               this.throwable = Preconditions.checkNotNull(throwable);
+       }
+
+       public Throwable getThrowable() {
+               return throwable;
+       }
+
+       @Override
+       public void add(V value) {
+               ExceptionUtils.rethrow(throwable);
+       }
+
+       @Override
+       public R getLocalValue() {
+               ExceptionUtils.rethrow(throwable);
+               return null;
+       }
+
+       @Override
+       public void resetLocal() {
+               ExceptionUtils.rethrow(throwable);
+       }
+
+       @Override
+       public void merge(Accumulator<V, R> other) {
+               ExceptionUtils.rethrow(throwable);
+       }
+
+       @Override
+       public Accumulator<V, R> clone() {
+               ExceptionUtils.rethrow(throwable);
+               return null;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8f9dbeca/flink-core/src/test/java/org/apache/flink/api/common/accumulators/FailedAccumulatorSerializationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/accumulators/FailedAccumulatorSerializationTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/accumulators/FailedAccumulatorSerializationTest.java
new file mode 100644
index 0000000..7335d30
--- /dev/null
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/accumulators/FailedAccumulatorSerializationTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.accumulators;
+
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for the {@link FailedAccumulatorSerialization}.
+ */
+public class FailedAccumulatorSerializationTest extends TestLogger {
+
+       private static final IOException TEST_EXCEPTION = new IOException("Test 
exception");
+
+       /**
+        * Tests that any method call will throw the contained throwable 
(wrapped in an
+        * unchecked exception if it is checked).
+        */
+       @Test
+       public void testMethodCallThrowsException() {
+               final FailedAccumulatorSerialization<Integer, Integer> 
accumulator = new FailedAccumulatorSerialization<>(TEST_EXCEPTION);
+
+               try {
+                       accumulator.getLocalValue();
+               } catch (RuntimeException re) {
+                       assertThat(ExceptionUtils.findThrowableWithMessage(re, 
TEST_EXCEPTION.getMessage()).isPresent(), is(true));
+               }
+
+               try {
+                       accumulator.resetLocal();
+               } catch (RuntimeException re) {
+                       assertThat(ExceptionUtils.findThrowableWithMessage(re, 
TEST_EXCEPTION.getMessage()).isPresent(), is(true));
+               }
+
+               try {
+                       accumulator.add(1);
+               } catch (RuntimeException re) {
+                       assertThat(ExceptionUtils.findThrowableWithMessage(re, 
TEST_EXCEPTION.getMessage()).isPresent(), is(true));
+               }
+
+               try {
+                       accumulator.merge(new IntMinimum());
+               } catch (RuntimeException re) {
+                       assertThat(ExceptionUtils.findThrowableWithMessage(re, 
TEST_EXCEPTION.getMessage()).isPresent(), is(true));
+               }
+       }
+
+       /**
+        * Tests that the class can be serialized and deserialized using Java 
serialization.
+        */
+       @Test
+       public void testSerialization() throws Exception {
+               final FailedAccumulatorSerialization<?, ?> accumulator = new 
FailedAccumulatorSerialization<>(TEST_EXCEPTION);
+
+               final byte[] serializedAccumulator = 
InstantiationUtil.serializeObject(accumulator);
+
+               final FailedAccumulatorSerialization<?, ?> 
deserializedAccumulator = 
InstantiationUtil.deserializeObject(serializedAccumulator, 
ClassLoader.getSystemClassLoader());
+
+               assertThat(deserializedAccumulator.getThrowable(), 
is(instanceOf(TEST_EXCEPTION.getClass())));
+               assertThat(deserializedAccumulator.getThrowable().getMessage(), 
is(equalTo(TEST_EXCEPTION.getMessage())));
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8f9dbeca/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index d0b1591..0271913 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.client.JobSubmissionException;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
@@ -485,6 +486,19 @@ public abstract class Dispatcher extends 
FencedRpcEndpoint<DispatcherId> impleme
                fatalErrorHandler.onFatalError(throwable);
        }
 
+       private void jobReachedGloballyTerminalState(AccessExecutionGraph 
accessExecutionGraph) {
+               final JobResult jobResult = 
JobResult.createFrom(accessExecutionGraph);
+
+               jobExecutionResultCache.put(jobResult);
+               final JobID jobId = accessExecutionGraph.getJobID();
+
+               try {
+                       removeJob(jobId, true);
+               } catch (Exception e) {
+                       log.warn("Could not properly remove job {} from the 
dispatcher.", jobId, e);
+               }
+       }
+
        protected abstract JobManagerRunner createJobManagerRunner(
                ResourceID resourceId,
                JobGraph jobGraph,
@@ -607,31 +621,10 @@ public abstract class Dispatcher extends 
FencedRpcEndpoint<DispatcherId> impleme
                }
 
                @Override
-               public void jobFinished(JobResult result) {
-                       log.info("Job {} finished.", jobId);
+               public void 
jobReachedGloballyTerminalState(ArchivedExecutionGraph executionGraph) {
+                       log.info("Job {} reached globally terminal state {}.", 
jobId, executionGraph.getState());
 
-                       runAsync(() -> {
-                               jobExecutionResultCache.put(result);
-                               try {
-                                       removeJob(jobId, true);
-                               } catch (Exception e) {
-                                       log.warn("Could not properly remove job 
{} from the dispatcher.", jobId, e);
-                               }
-                       });
-               }
-
-               @Override
-               public void jobFailed(JobResult result) {
-                       log.info("Job {} failed.", jobId);
-
-                       runAsync(() -> {
-                               jobExecutionResultCache.put(result);
-                               try {
-                                       removeJob(jobId, true);
-                               } catch (Exception e) {
-                                       log.warn("Could not properly remove job 
{} from the dispatcher.", jobId, e);
-                               }
-                       });
+                       runAsync(() -> 
Dispatcher.this.jobReachedGloballyTerminalState(executionGraph));
                }
 
                @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/8f9dbeca/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
index ede8d13..b90253a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
@@ -25,6 +25,8 @@ import org.apache.flink.configuration.WebOptions;
 import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ErrorInfo;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -34,7 +36,6 @@ import org.apache.flink.runtime.jobmaster.JobManagerServices;
 import org.apache.flink.runtime.jobmaster.JobMasterGateway;
 import org.apache.flink.runtime.jobmaster.JobMasterId;
 import org.apache.flink.runtime.jobmaster.JobMasterRestEndpoint;
-import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.resourcemanager.ResourceManager;
@@ -54,7 +55,6 @@ import org.apache.flink.util.ConfigurationException;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
-import org.apache.flink.util.SerializedThrowable;
 
 import akka.actor.ActorSystem;
 
@@ -62,8 +62,6 @@ import javax.annotation.Nullable;
 
 import java.util.concurrent.Executor;
 
-import static org.apache.flink.util.Preconditions.checkArgument;
-
 /**
  * Base class for per-job cluster entry points.
  */
@@ -298,23 +296,16 @@ public abstract class JobClusterEntrypoint extends 
ClusterEntrypoint {
                }
 
                @Override
-               public void jobFinished(JobResult result) {
+               public void 
jobReachedGloballyTerminalState(ArchivedExecutionGraph executionGraph) {
                        LOG.info("Job({}) finished.", jobId);
 
-                       shutDownAndTerminate(true, ApplicationStatus.SUCCEEDED, 
null);
-               }
-
-               @Override
-               public void jobFailed(JobResult result) {
-                       
checkArgument(result.getSerializedThrowable().isPresent());
-
-                       final SerializedThrowable serializedThrowable = 
result.getSerializedThrowable().get();
-
-                       final String errorMessage = 
serializedThrowable.getMessage();
+                       final ErrorInfo errorInfo = 
executionGraph.getFailureInfo();
 
-                       LOG.info("Job({}) failed: {}.", jobId, errorMessage);
-
-                       shutDownAndTerminate(true, ApplicationStatus.FAILED, 
errorMessage);
+                       if (errorInfo == null) {
+                               shutDownAndTerminate(true, 
ApplicationStatus.SUCCEEDED, null);
+                       } else {
+                               shutDownAndTerminate(true, 
ApplicationStatus.FAILED, errorInfo.getExceptionAsString());
+                       }
                }
 
                @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/8f9dbeca/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java
index 8d1fa1d..cc56209 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.runtime.executiongraph;
 
 import org.apache.flink.api.common.ArchivedExecutionConfig;
@@ -28,7 +29,6 @@ import org.apache.flink.util.SerializedValue;
 
 import javax.annotation.Nullable;
 
-import java.io.IOException;
 import java.util.Map;
 
 /**
@@ -103,7 +103,7 @@ public interface AccessExecutionGraph {
        Iterable<? extends AccessExecutionVertex> getAllExecutionVertices();
 
        /**
-        * Returns the timestamp for the given {@link JobStatus}
+        * Returns the timestamp for the given {@link JobStatus}.
         *
         * @param status status for which the timestamp should be returned
         * @return timestamp for the given job status
@@ -154,9 +154,8 @@ public interface AccessExecutionGraph {
         * Returns a map containing the serialized values of user-defined 
accumulators.
         *
         * @return map containing serialized values of user-defined accumulators
-        * @throws IOException indicates that the serialization has failed
         */
-       Map<String, SerializedValue<Object>> getAccumulatorsSerialized() throws 
IOException;
+       Map<String, SerializedValue<Object>> getAccumulatorsSerialized();
 
        /**
         * Returns whether this execution graph was archived.

http://git-wip-us.apache.org/repos/asf/flink/blob/8f9dbeca/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
index 20c2c8f..d285b20 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.runtime.executiongraph;
 
 import org.apache.flink.api.common.ArchivedExecutionConfig;
@@ -30,12 +31,17 @@ import org.apache.flink.util.SerializedValue;
 import javax.annotation.Nullable;
 
 import java.io.Serializable;
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
 
+/**
+ * An archived execution graph represents a serializable form of the {@link 
ExecutionGraph}.
+ */
 public class ArchivedExecutionGraph implements AccessExecutionGraph, 
Serializable {
 
        private static final long serialVersionUID = 7231383912742578428L;
@@ -47,10 +53,10 @@ public class ArchivedExecutionGraph implements 
AccessExecutionGraph, Serializabl
        /** The name of the original job graph. */
        private final String jobName;
 
-       /** All job vertices that are part of this graph */
+       /** All job vertices that are part of this graph. */
        private final Map<JobVertexID, ArchivedExecutionJobVertex> tasks;
 
-       /** All vertices, in the order in which they were created **/
+       /** All vertices, in the order in which they were created. **/
        private final List<ArchivedExecutionJobVertex> verticesInCreationOrder;
 
        /**
@@ -65,7 +71,7 @@ public class ArchivedExecutionGraph implements 
AccessExecutionGraph, Serializabl
 
        // ------ Execution status and progress. These values are volatile, and 
accessed under the lock -------
 
-       /** Current status of the job execution */
+       /** Current status of the job execution. */
        private final JobStatus state;
 
        /**
@@ -142,6 +148,7 @@ public class ArchivedExecutionGraph implements 
AccessExecutionGraph, Serializabl
                return state;
        }
 
+       @Nullable
        @Override
        public ErrorInfo getFailureInfo() {
                return failureCause;
@@ -253,12 +260,10 @@ public class ArchivedExecutionGraph implements 
AccessExecutionGraph, Serializabl
 
                private int currPos;
 
-
                public AllVerticesIterator(Iterator<ArchivedExecutionJobVertex> 
jobVertices) {
                        this.jobVertices = jobVertices;
                }
 
-
                @Override
                public boolean hasNext() {
                        while (true) {
@@ -291,4 +296,48 @@ public class ArchivedExecutionGraph implements 
AccessExecutionGraph, Serializabl
                        throw new UnsupportedOperationException();
                }
        }
+
+       /**
+        * Create an {@link ArchivedExecutionGraph} from the given {@link 
ExecutionGraph}.
+        *
+        * @param executionGraph to create the ArchivedExecutionGraph from
+        * @return ArchivedExecutionGraph created from the given ExecutionGraph
+        */
+       public static ArchivedExecutionGraph createFrom(ExecutionGraph 
executionGraph) {
+               final int numberVertices = 
executionGraph.getTotalNumberOfVertices();
+
+               Map<JobVertexID, ArchivedExecutionJobVertex> archivedTasks = 
new HashMap<>(numberVertices);
+               List<ArchivedExecutionJobVertex> 
archivedVerticesInCreationOrder = new ArrayList<>(numberVertices);
+
+               for (ExecutionJobVertex task : 
executionGraph.getVerticesTopologically()) {
+                       ArchivedExecutionJobVertex archivedTask = 
task.archive();
+                       archivedVerticesInCreationOrder.add(archivedTask);
+                       archivedTasks.put(task.getJobVertexId(), archivedTask);
+               }
+
+               final Map<String, SerializedValue<Object>> 
serializedUserAccumulators = executionGraph.getAccumulatorsSerialized();
+
+               final long[] timestamps = new long[JobStatus.values().length];
+
+               for (JobStatus jobStatus : JobStatus.values()) {
+                       final int ordinal = jobStatus.ordinal();
+                       timestamps[ordinal] = 
executionGraph.getStatusTimestamp(jobStatus);
+               }
+
+               return new ArchivedExecutionGraph(
+                       executionGraph.getJobID(),
+                       executionGraph.getJobName(),
+                       archivedTasks,
+                       archivedVerticesInCreationOrder,
+                       timestamps,
+                       executionGraph.getState(),
+                       executionGraph.getFailureInfo(),
+                       executionGraph.getJsonPlan(),
+                       executionGraph.getAccumulatorResultsStringified(),
+                       serializedUserAccumulators,
+                       executionGraph.getArchivedExecutionConfig(),
+                       executionGraph.isStoppable(),
+                       executionGraph.getCheckpointCoordinatorConfiguration(),
+                       executionGraph.getCheckpointStatsSnapshot());
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8f9dbeca/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ErrorInfo.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ErrorInfo.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ErrorInfo.java
index 9fe569f..311effb 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ErrorInfo.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ErrorInfo.java
@@ -34,7 +34,6 @@ public class ErrorInfo implements Serializable {
        private final SerializedThrowable exception;
 
        private final long timestamp;
-
        public ErrorInfo(Throwable exception, long timestamp) {
                Preconditions.checkNotNull(exception);
                Preconditions.checkArgument(timestamp > 0);
@@ -48,7 +47,7 @@ public class ErrorInfo implements Serializable {
         * Returns the serialized form of the original exception.
         */
        public SerializedThrowable getException() {
-               return this.exception;
+               return exception;
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/8f9dbeca/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index d187faa..188f6da 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -19,12 +19,12 @@
 package org.apache.flink.runtime.executiongraph;
 
 import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.common.Archiveable;
 import org.apache.flink.api.common.ArchivedExecutionConfig;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.accumulators.AccumulatorHelper;
+import org.apache.flink.api.common.accumulators.FailedAccumulatorSerialization;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.JobException;
@@ -51,7 +51,6 @@ import 
org.apache.flink.runtime.executiongraph.failover.RestartAllStrategy;
 import 
org.apache.flink.runtime.executiongraph.restart.ExecutionGraphRestartCallback;
 import org.apache.flink.runtime.executiongraph.restart.RestartCallback;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
-import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobStatus;
@@ -62,6 +61,7 @@ import 
org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguratio
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
 import 
org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint;
 import 
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
 import org.apache.flink.runtime.query.KvStateLocationRegistry;
 import org.apache.flink.runtime.state.SharedStateRegistry;
 import org.apache.flink.runtime.state.StateBackend;
@@ -148,7 +148,7 @@ import static 
org.apache.flink.util.Preconditions.checkState;
  * local failover (meaning there is a concurrent global failover), the 
failover strategy has to
  * yield before the global failover.
  */
-public class ExecutionGraph implements AccessExecutionGraph, 
Archiveable<ArchivedExecutionGraph> {
+public class ExecutionGraph implements AccessExecutionGraph {
 
        /** In place updater for the execution graph's current state. Avoids 
having to use an
         * AtomicReference and thus makes the frequent read access a bit faster 
*/
@@ -761,16 +761,27 @@ public class ExecutionGraph implements 
AccessExecutionGraph, Archiveable<Archive
        /**
         * Gets a serialized accumulator map.
         * @return The accumulator map with serialized accumulator values.
-        * @throws IOException
         */
        @Override
-       public Map<String, SerializedValue<Object>> getAccumulatorsSerialized() 
throws IOException {
+       public Map<String, SerializedValue<Object>> getAccumulatorsSerialized() 
{
 
                Map<String, Accumulator<?, ?>> accumulatorMap = 
aggregateUserAccumulators();
 
                Map<String, SerializedValue<Object>> result = new 
HashMap<>(accumulatorMap.size());
                for (Map.Entry<String, Accumulator<?, ?>> entry : 
accumulatorMap.entrySet()) {
-                       result.put(entry.getKey(), new 
SerializedValue<>(entry.getValue().getLocalValue()));
+
+                       try {
+                               final SerializedValue<Object> serializedValue = 
new SerializedValue<>(entry.getValue().getLocalValue());
+                               result.put(entry.getKey(), serializedValue);
+                       } catch (IOException ioe) {
+                               LOG.error("Could not serialize accumulator " + 
entry.getKey() + '.', ioe);
+
+                               try {
+                                       result.put(entry.getKey(), new 
SerializedValue<>(new FailedAccumulatorSerialization(ioe)));
+                               } catch (IOException e) {
+                                       throw new RuntimeException("It should 
never happen that we cannot serialize the accumulator serialization 
exception.", e);
+                               }
+                       }
                }
 
                return result;
@@ -1687,40 +1698,4 @@ public class ExecutionGraph implements 
AccessExecutionGraph, Archiveable<Archive
                        }
                }
        }
-
-       @Override
-       public ArchivedExecutionGraph archive() {
-               Map<JobVertexID, ArchivedExecutionJobVertex> archivedTasks = 
new HashMap<>(verticesInCreationOrder.size());
-               List<ArchivedExecutionJobVertex> 
archivedVerticesInCreationOrder = new 
ArrayList<>(verticesInCreationOrder.size());
-
-               for (ExecutionJobVertex task : verticesInCreationOrder) {
-                       ArchivedExecutionJobVertex archivedTask = 
task.archive();
-                       archivedVerticesInCreationOrder.add(archivedTask);
-                       archivedTasks.put(task.getJobVertexId(), archivedTask);
-               }
-
-               Map<String, SerializedValue<Object>> serializedUserAccumulators;
-               try {
-                       serializedUserAccumulators = 
getAccumulatorsSerialized();
-               } catch (Exception e) {
-                       LOG.warn("Error occurred while archiving user 
accumulators.", e);
-                       serializedUserAccumulators = Collections.emptyMap();
-               }
-
-               return new ArchivedExecutionGraph(
-                       getJobID(),
-                       getJobName(),
-                       archivedTasks,
-                       archivedVerticesInCreationOrder,
-                       stateTimestamps,
-                       getState(),
-                       failureInfo,
-                       getJsonPlan(),
-                       getAccumulatorResultsStringified(),
-                       serializedUserAccumulators,
-                       getArchivedExecutionConfig(),
-                       isStoppable(),
-                       getCheckpointCoordinatorConfiguration(),
-                       getCheckpointStatsSnapshot());
-       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8f9dbeca/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/OnCompletionActions.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/OnCompletionActions.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/OnCompletionActions.java
index 149ea0f..66ca4ee 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/OnCompletionActions.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/OnCompletionActions.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.jobmanager;
 
-import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 
 /**
  * Interface for completion actions once a Flink job has reached
@@ -27,18 +27,11 @@ import org.apache.flink.runtime.jobmaster.JobResult;
 public interface OnCompletionActions {
 
        /**
-        * Job finished successfully.
+        * Job reached a globally terminal state.
         *
-        * @param result of the job execution
+        * @param executionGraph serializable execution graph
         */
-       void jobFinished(JobResult result);
-
-       /**
-        * Job failed with an exception.
-        *
-        * @param result The result of the job carrying the failure cause.
-        */
-       void jobFailed(JobResult result);
+       void jobReachedGloballyTerminalState(ArchivedExecutionGraph 
executionGraph);
 
        /**
         * Job was finished by another JobMaster.

http://git-wip-us.apache.org/repos/asf/flink/blob/8f9dbeca/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
index 4833cbd..5af6c67 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
@@ -24,6 +24,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
@@ -251,30 +252,14 @@ public class JobManagerRunner implements LeaderContender, 
OnCompletionActions, F
         * Job completion notification triggered by JobManager.
         */
        @Override
-       public void jobFinished(JobResult result) {
+       public void jobReachedGloballyTerminalState(ArchivedExecutionGraph 
executionGraph) {
                try {
                        unregisterJobFromHighAvailability();
                        shutdownInternally();
                }
                finally {
                        if (toNotifyOnComplete != null) {
-                               toNotifyOnComplete.jobFinished(result);
-                       }
-               }
-       }
-
-       /**
-        * Job completion notification triggered by JobManager.
-        */
-       @Override
-       public void jobFailed(JobResult result) {
-               try {
-                       unregisterJobFromHighAvailability();
-                       shutdownInternally();
-               }
-               finally {
-                       if (toNotifyOnComplete != null) {
-                               toNotifyOnComplete.jobFailed(result);
+                               
toNotifyOnComplete.jobReachedGloballyTerminalState(executionGraph);
                        }
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/8f9dbeca/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 46518eb..ef99d52 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -34,13 +34,13 @@ import 
org.apache.flink.runtime.checkpoint.CheckpointMetrics;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
 import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
-import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
@@ -100,8 +100,6 @@ import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.Preconditions;
-import org.apache.flink.util.SerializedThrowable;
-import org.apache.flink.util.SerializedValue;
 
 import org.slf4j.Logger;
 
@@ -787,7 +785,7 @@ public class JobMaster extends 
FencedRpcEndpoint<JobMasterId> implements JobMast
 
        @Override
        public CompletableFuture<AccessExecutionGraph> 
requestArchivedExecutionGraph(Time timeout) {
-               return 
CompletableFuture.completedFuture(executionGraph.archive());
+               return 
CompletableFuture.completedFuture(ArchivedExecutionGraph.createFrom(executionGraph));
        }
 
        @Override
@@ -989,60 +987,9 @@ public class JobMaster extends 
FencedRpcEndpoint<JobMasterId> implements JobMast
                        @Nullable final Throwable error) {
                validateRunsInMainThread();
 
-               final JobID jobID = executionGraph.getJobID();
-               final String jobName = executionGraph.getJobName();
-               final JobResult.Builder builder = new JobResult.Builder()
-                       .jobId(jobID)
-                       .netRuntime(0);
-
                if (newJobStatus.isGloballyTerminalState()) {
-                       switch (newJobStatus) {
-                               case FINISHED:
-                                       try {
-                                               // TODO get correct job duration
-                                               // job done, let's get the 
accumulators
-                                               final Map<String, 
SerializedValue<Object>> accumulatorsSerialized = 
executionGraph.getAccumulatorsSerialized();
-                                               
builder.accumulatorResults(accumulatorsSerialized);
-                                               executor.execute(() -> 
jobCompletionActions.jobFinished(builder.build()));
-                                       }
-                                       catch (Exception e) {
-                                               log.error("Cannot fetch final 
accumulators for job {} ({})", jobName, jobID, e);
-
-                                               final JobExecutionException 
exception = new JobExecutionException(jobID,
-                                                               "Failed to 
retrieve accumulator results. " +
-                                                               "The job is 
registered as 'FINISHED (successful), but this notification describes " +
-                                                               "a failure, 
since the resulting accumulators could not be fetched.", e);
-
-                                               executor.execute(() -> 
jobCompletionActions.jobFailed(builder
-                                                       
.serializedThrowable(new SerializedThrowable(exception))
-                                                       .build()));
-                                       }
-                                       break;
-
-                               case CANCELED: {
-                                       final JobExecutionException exception = 
new JobExecutionException(
-                                               jobID, "Job was cancelled.", 
new Exception("The job was cancelled"));
-
-                                       executor.execute(() -> 
jobCompletionActions.jobFailed(builder
-                                               .serializedThrowable(new 
SerializedThrowable(exception))
-                                               .build()));
-                                       break;
-                               }
-
-                               case FAILED: {
-                                       final Throwable unpackedError = 
SerializedThrowable.get(error, userCodeLoader);
-                                       final JobExecutionException exception = 
new JobExecutionException(
-                                                       jobID, "Job execution 
failed.", unpackedError);
-                                       executor.execute(() -> 
jobCompletionActions.jobFailed(builder
-                                               .serializedThrowable(new 
SerializedThrowable(exception))
-                                               .build()));
-                                       break;
-                               }
-
-                               default:
-                                       // this can happen only if the enum is 
buggy
-                                       throw new 
IllegalStateException(newJobStatus.toString());
-                       }
+                       final ArchivedExecutionGraph archivedExecutionGraph = 
ArchivedExecutionGraph.createFrom(executionGraph);
+                       executor.execute(() -> 
jobCompletionActions.jobReachedGloballyTerminalState(archivedExecutionGraph));
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8f9dbeca/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobResult.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobResult.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobResult.java
index 4a409d5..5a5e713 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobResult.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobResult.java
@@ -21,6 +21,9 @@ package org.apache.flink.runtime.jobmaster;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.dispatcher.Dispatcher;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ErrorInfo;
+import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.util.SerializedThrowable;
 import org.apache.flink.util.SerializedValue;
 
@@ -138,4 +141,38 @@ public class JobResult implements Serializable {
                }
        }
 
+       /**
+        * Creates the {@link JobResult} from the given {@link 
AccessExecutionGraph} which
+        * must be in a globally terminal state.
+        *
+        * @param accessExecutionGraph to create the JobResult from
+        * @return JobResult of the given AccessExecutionGraph
+        */
+       public static JobResult createFrom(AccessExecutionGraph 
accessExecutionGraph) {
+               final JobID jobId = accessExecutionGraph.getJobID();
+               final JobStatus jobStatus = accessExecutionGraph.getState();
+
+               checkArgument(
+                       jobStatus.isGloballyTerminalState(),
+                       "The job " + accessExecutionGraph.getJobName() + '(' + 
jobId + ") is not in a globally " +
+                               "terminal state. It is in state " + jobStatus + 
'.');
+
+               final JobResult.Builder builder = new JobResult.Builder();
+               builder.jobId(jobId);
+
+               final long netRuntime = 
accessExecutionGraph.getStatusTimestamp(jobStatus) - 
accessExecutionGraph.getStatusTimestamp(JobStatus.CREATED);
+               builder.netRuntime(netRuntime);
+               
builder.accumulatorResults(accessExecutionGraph.getAccumulatorsSerialized());
+
+               if (jobStatus != JobStatus.FINISHED) {
+                       final ErrorInfo errorInfo = 
accessExecutionGraph.getFailureInfo();
+
+                       if (errorInfo != null) {
+                               
builder.serializedThrowable(errorInfo.getException());
+                       }
+               }
+
+               return builder.build();
+       }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8f9dbeca/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
index b9d76da..7e49cef 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
@@ -25,6 +25,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -361,12 +362,7 @@ public class MiniClusterJobDispatcher {
                }
 
                @Override
-               public void jobFinished(JobResult result) {
-                       decrementCheckAndCleanup();
-               }
-
-               @Override
-               public void jobFailed(JobResult result) {
+               public void 
jobReachedGloballyTerminalState(ArchivedExecutionGraph executionGraph) {
                        decrementCheckAndCleanup();
                }
 
@@ -414,16 +410,8 @@ public class MiniClusterJobDispatcher {
                }
 
                @Override
-               public void jobFinished(JobResult result) {
-                       this.result = result;
-                       jobMastersToWaitFor.countDown();
-               }
-
-               @Override
-               public void jobFailed(JobResult result) {
-                       
checkArgument(result.getSerializedThrowable().isPresent());
-
-                       this.result = result;
+               public void 
jobReachedGloballyTerminalState(ArchivedExecutionGraph executionGraph) {
+                       this.result = JobResult.createFrom(executionGraph);
                        jobMastersToWaitFor.countDown();
                }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8f9dbeca/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java
index 70b9d35..63dc604 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java
@@ -71,7 +71,7 @@ public class JobExceptionsHandler extends 
AbstractExecutionGraphHandler<JobExcep
                ErrorInfo rootException = executionGraph.getFailureInfo();
                String rootExceptionMessage = null;
                Long rootTimestamp = null;
-               if (rootException != null && 
!rootException.getExceptionAsString().equals(ExceptionUtils.STRINGIFIED_NULL_EXCEPTION))
 {
+               if (rootException != null) {
                        rootExceptionMessage = 
rootException.getExceptionAsString();
                        rootTimestamp = rootException.getTimestamp();
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/8f9dbeca/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobExceptionsHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobExceptionsHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobExceptionsHandler.java
index a6bae86..7b1487c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobExceptionsHandler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobExceptionsHandler.java
@@ -93,7 +93,7 @@ public class JobExceptionsHandler extends 
AbstractExecutionGraphRequestHandler {
 
                // most important is the root failure cause
                ErrorInfo rootException = graph.getFailureInfo();
-               if (rootException != null && 
!rootException.getExceptionAsString().equals(ExceptionUtils.STRINGIFIED_NULL_EXCEPTION))
 {
+               if (rootException != null) {
                        gen.writeStringField("root-exception", 
rootException.getExceptionAsString());
                        gen.writeNumberField("timestamp", 
rootException.getTimestamp());
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/8f9dbeca/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 325e955..b795609 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -1727,7 +1727,10 @@ class JobManager(
           }(context.dispatcher))
 
           try {
-            archive ! decorateMessage(ArchiveExecutionGraph(jobID, 
eg.archive()))
+            archive ! decorateMessage(
+              ArchiveExecutionGraph(
+                jobID,
+                ArchivedExecutionGraph.createFrom(eg)))
           } catch {
             case t: Throwable => log.warn(s"Could not archive the execution 
graph $eg.", t)
           }

http://git-wip-us.apache.org/repos/asf/flink/blob/8f9dbeca/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
index b5fcd18..76a3c98 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
@@ -26,11 +26,14 @@ import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.blob.VoidBlobStore;
 import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ErrorInfo;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
 import 
org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobmanager.OnCompletionActions;
 import org.apache.flink.runtime.jobmanager.SubmittedJobGraph;
@@ -45,6 +48,7 @@ import 
org.apache.flink.runtime.messages.FlinkJobNotFoundException;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import 
org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.RpcUtils;
@@ -54,7 +58,6 @@ import 
org.apache.flink.runtime.testutils.InMemorySubmittedJobGraphStore;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
 import org.apache.flink.testutils.category.Flip6;
 import org.apache.flink.util.ExceptionUtils;
-import org.apache.flink.util.SerializedThrowable;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.After;
@@ -270,11 +273,13 @@ public class DispatcherTest extends TestLogger {
                final JobID failedJobId = new JobID();
                onCompletionActions = dispatcher.new 
DispatcherOnCompleteActions(failedJobId);
 
-               onCompletionActions.jobFailed(new JobResult.Builder()
-                       .jobId(failedJobId)
-                       .serializedThrowable(new SerializedThrowable(new 
RuntimeException("expected")))
-                       .netRuntime(Long.MAX_VALUE)
-                       .build());
+               final ArchivedExecutionGraph failedExecutionGraph = new 
ArchivedExecutionGraphBuilder()
+                       .setJobID(failedJobId)
+                       .setState(JobStatus.FAILED)
+                       .setFailureCause(new ErrorInfo(new 
RuntimeException("expected"), 1L))
+                       .build();
+
+               
onCompletionActions.jobReachedGloballyTerminalState(failedExecutionGraph);
 
                assertThat(
                        
dispatcherGateway.isJobExecutionResultPresent(failedJobId, TIMEOUT).get(),
@@ -288,10 +293,12 @@ public class DispatcherTest extends TestLogger {
                final JobID successJobId = new JobID();
                onCompletionActions = dispatcher.new 
DispatcherOnCompleteActions(successJobId);
 
-               onCompletionActions.jobFinished(new JobResult.Builder()
-                       .jobId(successJobId)
-                       .netRuntime(Long.MAX_VALUE)
-                       .build());
+               final ArchivedExecutionGraph succeededExecutionGraph = new 
ArchivedExecutionGraphBuilder()
+                       .setJobID(successJobId)
+                       .setState(JobStatus.FINISHED)
+                       .build();
+
+               
onCompletionActions.jobReachedGloballyTerminalState(succeededExecutionGraph);
 
                assertThat(
                        
dispatcherGateway.isJobExecutionResultPresent(successJobId, TIMEOUT).get(),

http://git-wip-us.apache.org/repos/asf/flink/blob/8f9dbeca/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
index 5635763..8bc5170 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
@@ -142,14 +142,14 @@ public class ArchivedExecutionGraphTest extends 
TestLogger {
 
        @Test
        public void testArchive() throws IOException, ClassNotFoundException {
-               ArchivedExecutionGraph archivedGraph = runtimeGraph.archive();
+               ArchivedExecutionGraph archivedGraph = 
ArchivedExecutionGraph.createFrom(runtimeGraph);
 
                compareExecutionGraph(runtimeGraph, archivedGraph);
        }
 
        @Test
        public void testSerialization() throws IOException, 
ClassNotFoundException {
-               ArchivedExecutionGraph archivedGraph = runtimeGraph.archive();
+               ArchivedExecutionGraph archivedGraph = 
ArchivedExecutionGraph.createFrom(runtimeGraph);
 
                verifySerializability(archivedGraph);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/8f9dbeca/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java
index 245ea27..a69235a 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java
@@ -24,20 +24,23 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.blob.BlobStore;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ErrorInfo;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobmanager.OnCompletionActions;
 import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
 import org.apache.flink.runtime.metrics.MetricRegistryImpl;
+import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.testutils.category.Flip6;
-import org.apache.flink.util.SerializedThrowable;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.After;
@@ -176,7 +179,7 @@ public class JobManagerRunnerMockTest extends TestLogger {
                assertTrue(!jobCompletion.isJobFinished());
 
                // runner been told by JobManager that job is finished
-               runner.jobFinished(mock(JobResult.class));
+               runner.jobReachedGloballyTerminalState(mock(ArchivedExecutionGraph.class));
 
                assertTrue(jobCompletion.isJobFinished());
                assertFalse(jobCompletion.isJobFinishedByOther());
@@ -195,11 +198,13 @@ public class JobManagerRunnerMockTest extends TestLogger {
                verify(jobManager).start(eq(jobMasterId), any(Time.class));
                assertTrue(!jobCompletion.isJobFinished());
 
+               final ArchivedExecutionGraph failedExecutionGraph = new ArchivedExecutionGraphBuilder()
+                       .setFailureCause(new ErrorInfo(new Exception("failed manually"), 1L))
+                       .setState(JobStatus.FAILED)
+                       .build();
+
                // runner been told by JobManager that job is failed
-               runner.jobFailed(new JobResult.Builder()
-                       .jobId(new JobID())
-                       .serializedThrowable(new SerializedThrowable(new Exception("failed manually")))
-                       .build());
+               runner.jobReachedGloballyTerminalState(failedExecutionGraph);
 
                assertTrue(jobCompletion.isJobFailed());
                verify(leaderElectionService).stop();
@@ -250,19 +255,15 @@ public class JobManagerRunnerMockTest extends TestLogger {
                private volatile boolean finishedByOther;
 
                @Override
-               public void jobFinished(JobResult result) {
+               public void jobReachedGloballyTerminalState(ArchivedExecutionGraph executionGraph) {
                        checkArgument(!isJobFinished(), "job finished already");
                        checkArgument(!isJobFailed(), "job failed already");
 
-                       this.result = result;
-               }
-
-               @Override
-               public void jobFailed(JobResult result) {
-                       checkArgument(!isJobFinished(), "job finished already");
-                       checkArgument(!isJobFailed(), "job failed already");
+                       this.result = JobResult.createFrom(executionGraph);
 
-                       this.failedCause = result.getSerializedThrowable().get();
+                       if (!result.isSuccess()) {
+                               failedCause = result.getSerializedThrowable().get();
+                       }
                }
 
                @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/8f9dbeca/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index d77a1d4..d20e1fe 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -29,6 +29,7 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
 import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.heartbeat.TestingHeartbeatServices;
@@ -265,12 +266,7 @@ public class JobMasterTest extends TestLogger {
        private static final class NoOpOnCompletionActions implements OnCompletionActions {
 
                @Override
-               public void jobFinished(final JobResult result) {
-
-               }
-
-               @Override
-               public void jobFailed(final JobResult result) {
+               public void jobReachedGloballyTerminalState(ArchivedExecutionGraph executionGraph) {
 
                }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8f9dbeca/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobExceptionsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobExceptionsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobExceptionsHandlerTest.java
index 0e96f36..7381a59 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobExceptionsHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobExceptionsHandlerTest.java
@@ -27,6 +27,7 @@ import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
 import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.TestLogger;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
@@ -42,7 +43,7 @@ import static org.mockito.Mockito.mock;
 /**
  * Tests for the JobExceptionsHandler.
  */
-public class JobExceptionsHandlerTest {
+public class JobExceptionsHandlerTest extends TestLogger {
 
        @Test
        public void testArchiver() throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/8f9dbeca/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedExecutionGraphBuilder.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedExecutionGraphBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedExecutionGraphBuilder.java
index 68077ba..ee7ceda 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedExecutionGraphBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedExecutionGraphBuilder.java
@@ -117,9 +117,13 @@ public class ArchivedExecutionGraphBuilder {
        }
 
        public ArchivedExecutionGraph build() {
-               Preconditions.checkNotNull(tasks, "Tasks must not be null.");
                JobID jobID = this.jobID != null ? this.jobID : new JobID();
                String jobName = this.jobName != null ? this.jobName : "job_" + RANDOM.nextInt();
+
+               if (tasks == null) {
+                       tasks = Collections.emptyMap();
+               }
+
                return new ArchivedExecutionGraph(
                        jobID,
                        jobName,

Reply via email to