Repository: flink Updated Branches: refs/heads/master 976d004ce -> ac8225fd5
[FLINK-8449] [flip6] Extend OnCompletionActions to accept an ArchivedExecutionGraph This commit changes the OnCompletionActions interface such that it accepts an ArchivedExecutionGraph instead of a plain JobResult. This allows archiving the completed ExecutionGraph for further use in the container component of the JobMasterRunner. This closes #5308. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8f9dbeca Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8f9dbeca Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8f9dbeca Branch: refs/heads/master Commit: 8f9dbeca8bbb8f74bc17410b2f39903ea1f95af1 Parents: 976d004 Author: Till Rohrmann <[email protected]> Authored: Thu Jan 25 17:03:39 2018 +0100 Committer: Till Rohrmann <[email protected]> Committed: Fri Jan 26 13:49:46 2018 +0100 ---------------------------------------------------------------------- .../common/accumulators/AccumulatorHelper.java | 23 ++--- .../FailedAccumulatorSerialization.java | 73 ++++++++++++++++ .../FailedAccumulatorSerializationTest.java | 89 ++++++++++++++++++++ .../flink/runtime/dispatcher/Dispatcher.java | 41 ++++----- .../entrypoint/JobClusterEntrypoint.java | 27 ++---- .../executiongraph/AccessExecutionGraph.java | 7 +- .../executiongraph/ArchivedExecutionGraph.java | 59 +++++++++++-- .../flink/runtime/executiongraph/ErrorInfo.java | 3 +- .../runtime/executiongraph/ExecutionGraph.java | 59 ++++--------- .../runtime/jobmanager/OnCompletionActions.java | 15 +--- .../runtime/jobmaster/JobManagerRunner.java | 21 +---- .../flink/runtime/jobmaster/JobMaster.java | 61 +------------- .../flink/runtime/jobmaster/JobResult.java | 37 ++++++++ .../minicluster/MiniClusterJobDispatcher.java | 20 +---- .../rest/handler/job/JobExceptionsHandler.java | 2 +- .../handler/legacy/JobExceptionsHandler.java | 2 +- .../flink/runtime/jobmanager/JobManager.scala | 5 +- .../runtime/dispatcher/DispatcherTest.java | 27 +++--- 
.../ArchivedExecutionGraphTest.java | 4 +- .../jobmaster/JobManagerRunnerMockTest.java | 31 +++---- .../flink/runtime/jobmaster/JobMasterTest.java | 8 +- .../legacy/JobExceptionsHandlerTest.java | 3 +- .../utils/ArchivedExecutionGraphBuilder.java | 6 +- 23 files changed, 377 insertions(+), 246 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/8f9dbeca/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AccumulatorHelper.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AccumulatorHelper.java b/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AccumulatorHelper.java index 3282302..78fb68b 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AccumulatorHelper.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AccumulatorHelper.java @@ -28,13 +28,16 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; +/** + * Helper functions for the interaction with {@link Accumulator}. + */ @Internal public class AccumulatorHelper { /** * Merge two collections of accumulators. The second will be merged into the * first. - * + * * @param target * The collection of accumulators that will be updated * @param toMerge @@ -59,7 +62,7 @@ public class AccumulatorHelper { } /** - * Workaround method for type safety + * Workaround method for type safety. */ private static <V, R extends Serializable> void mergeSingle(Accumulator<?, ?> target, Accumulator<?, ?> toMerge) { @@ -74,14 +77,13 @@ public class AccumulatorHelper { /** * Compare both classes and throw {@link UnsupportedOperationException} if - * they differ + * they differ. */ @SuppressWarnings("rawtypes") - public static void compareAccumulatorTypes(Object name, - Class<? extends Accumulator> first, - Class<? 
extends Accumulator> second) - throws UnsupportedOperationException - { + public static void compareAccumulatorTypes( + Object name, + Class<? extends Accumulator> first, + Class<? extends Accumulator> second) throws UnsupportedOperationException { if (first == null || second == null) { throw new NullPointerException(); } @@ -102,7 +104,7 @@ public class AccumulatorHelper { /** * Transform the Map with accumulators into a Map containing only the - * results + * results. */ public static Map<String, Object> toResultMap(Map<String, Accumulator<?, ?>> accumulators) { Map<String, Object> resultMap = new HashMap<String, Object>(); @@ -134,7 +136,7 @@ public class AccumulatorHelper { public static Map<String, Accumulator<?, ?>> copy(Map<String, Accumulator<?, ?>> accumulators) { Map<String, Accumulator<?, ?>> result = new HashMap<String, Accumulator<?, ?>>(); - for(Map.Entry<String, Accumulator<?, ?>> entry: accumulators.entrySet()){ + for (Map.Entry<String, Accumulator<?, ?>> entry: accumulators.entrySet()){ result.put(entry.getKey(), entry.getValue().clone()); } @@ -172,5 +174,4 @@ public class AccumulatorHelper { return accumulators; } - } http://git-wip-us.apache.org/repos/asf/flink/blob/8f9dbeca/flink-core/src/main/java/org/apache/flink/api/common/accumulators/FailedAccumulatorSerialization.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/accumulators/FailedAccumulatorSerialization.java b/flink-core/src/main/java/org/apache/flink/api/common/accumulators/FailedAccumulatorSerialization.java new file mode 100644 index 0000000..b208b9e --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/common/accumulators/FailedAccumulatorSerialization.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.accumulators; + +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.Preconditions; + +import java.io.Serializable; + +/** + * {@link Accumulator} implementation which indicates a serialization problem with the original + * accumulator. Accessing any of the {@link Accumulator} method will result in throwing the + * serialization exception. 
+ * + * @param <V> type of the value + * @param <R> type of the accumulator result + */ +public class FailedAccumulatorSerialization<V, R extends Serializable> implements Accumulator<V, R> { + private static final long serialVersionUID = 6965908827065879760L; + + private final Throwable throwable; + + public FailedAccumulatorSerialization(Throwable throwable) { + this.throwable = Preconditions.checkNotNull(throwable); + } + + public Throwable getThrowable() { + return throwable; + } + + @Override + public void add(V value) { + ExceptionUtils.rethrow(throwable); + } + + @Override + public R getLocalValue() { + ExceptionUtils.rethrow(throwable); + return null; + } + + @Override + public void resetLocal() { + ExceptionUtils.rethrow(throwable); + } + + @Override + public void merge(Accumulator<V, R> other) { + ExceptionUtils.rethrow(throwable); + } + + @Override + public Accumulator<V, R> clone() { + ExceptionUtils.rethrow(throwable); + return null; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/8f9dbeca/flink-core/src/test/java/org/apache/flink/api/common/accumulators/FailedAccumulatorSerializationTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/common/accumulators/FailedAccumulatorSerializationTest.java b/flink-core/src/test/java/org/apache/flink/api/common/accumulators/FailedAccumulatorSerializationTest.java new file mode 100644 index 0000000..7335d30 --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/api/common/accumulators/FailedAccumulatorSerializationTest.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.accumulators; + +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.InstantiationUtil; +import org.apache.flink.util.TestLogger; + +import org.junit.Test; + +import java.io.IOException; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThat; + +/** + * Tests for the {@link FailedAccumulatorSerialization}. + */ +public class FailedAccumulatorSerializationTest extends TestLogger { + + private static final IOException TEST_EXCEPTION = new IOException("Test exception"); + + /** + * Tests that any method call will throw the contained throwable (wrapped in an + * unchecked exception if it is checked). 
+ */ + @Test + public void testMethodCallThrowsException() { + final FailedAccumulatorSerialization<Integer, Integer> accumulator = new FailedAccumulatorSerialization<>(TEST_EXCEPTION); + + try { + accumulator.getLocalValue(); + } catch (RuntimeException re) { + assertThat(ExceptionUtils.findThrowableWithMessage(re, TEST_EXCEPTION.getMessage()).isPresent(), is(true)); + } + + try { + accumulator.resetLocal(); + } catch (RuntimeException re) { + assertThat(ExceptionUtils.findThrowableWithMessage(re, TEST_EXCEPTION.getMessage()).isPresent(), is(true)); + } + + try { + accumulator.add(1); + } catch (RuntimeException re) { + assertThat(ExceptionUtils.findThrowableWithMessage(re, TEST_EXCEPTION.getMessage()).isPresent(), is(true)); + } + + try { + accumulator.merge(new IntMinimum()); + } catch (RuntimeException re) { + assertThat(ExceptionUtils.findThrowableWithMessage(re, TEST_EXCEPTION.getMessage()).isPresent(), is(true)); + } + } + + /** + * Tests that the class can be serialized and deserialized using Java serialization. 
+ */ + @Test + public void testSerialization() throws Exception { + final FailedAccumulatorSerialization<?, ?> accumulator = new FailedAccumulatorSerialization<>(TEST_EXCEPTION); + + final byte[] serializedAccumulator = InstantiationUtil.serializeObject(accumulator); + + final FailedAccumulatorSerialization<?, ?> deserializedAccumulator = InstantiationUtil.deserializeObject(serializedAccumulator, ClassLoader.getSystemClassLoader()); + + assertThat(deserializedAccumulator.getThrowable(), is(instanceOf(TEST_EXCEPTION.getClass()))); + assertThat(deserializedAccumulator.getThrowable().getMessage(), is(equalTo(TEST_EXCEPTION.getMessage()))); + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/8f9dbeca/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java index d0b1591..0271913 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java @@ -28,6 +28,7 @@ import org.apache.flink.runtime.client.JobSubmissionException; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; import org.apache.flink.runtime.heartbeat.HeartbeatServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.highavailability.RunningJobsRegistry; @@ -485,6 +486,19 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme fatalErrorHandler.onFatalError(throwable); } + private void 
jobReachedGloballyTerminalState(AccessExecutionGraph accessExecutionGraph) { + final JobResult jobResult = JobResult.createFrom(accessExecutionGraph); + + jobExecutionResultCache.put(jobResult); + final JobID jobId = accessExecutionGraph.getJobID(); + + try { + removeJob(jobId, true); + } catch (Exception e) { + log.warn("Could not properly remove job {} from the dispatcher.", jobId, e); + } + } + protected abstract JobManagerRunner createJobManagerRunner( ResourceID resourceId, JobGraph jobGraph, @@ -607,31 +621,10 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme } @Override - public void jobFinished(JobResult result) { - log.info("Job {} finished.", jobId); + public void jobReachedGloballyTerminalState(ArchivedExecutionGraph executionGraph) { + log.info("Job {} reached globally terminal state {}.", jobId, executionGraph.getState()); - runAsync(() -> { - jobExecutionResultCache.put(result); - try { - removeJob(jobId, true); - } catch (Exception e) { - log.warn("Could not properly remove job {} from the dispatcher.", jobId, e); - } - }); - } - - @Override - public void jobFailed(JobResult result) { - log.info("Job {} failed.", jobId); - - runAsync(() -> { - jobExecutionResultCache.put(result); - try { - removeJob(jobId, true); - } catch (Exception e) { - log.warn("Could not properly remove job {} from the dispatcher.", jobId, e); - } - }); + runAsync(() -> Dispatcher.this.jobReachedGloballyTerminalState(executionGraph)); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/8f9dbeca/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java index ede8d13..b90253a 100644 --- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java @@ -25,6 +25,8 @@ import org.apache.flink.configuration.WebOptions; import org.apache.flink.runtime.blob.BlobServer; import org.apache.flink.runtime.clusterframework.ApplicationStatus; import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; +import org.apache.flink.runtime.executiongraph.ErrorInfo; import org.apache.flink.runtime.heartbeat.HeartbeatServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.jobgraph.JobGraph; @@ -34,7 +36,6 @@ import org.apache.flink.runtime.jobmaster.JobManagerServices; import org.apache.flink.runtime.jobmaster.JobMasterGateway; import org.apache.flink.runtime.jobmaster.JobMasterId; import org.apache.flink.runtime.jobmaster.JobMasterRestEndpoint; -import org.apache.flink.runtime.jobmaster.JobResult; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.resourcemanager.ResourceManager; @@ -54,7 +55,6 @@ import org.apache.flink.util.ConfigurationException; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkException; import org.apache.flink.util.Preconditions; -import org.apache.flink.util.SerializedThrowable; import akka.actor.ActorSystem; @@ -62,8 +62,6 @@ import javax.annotation.Nullable; import java.util.concurrent.Executor; -import static org.apache.flink.util.Preconditions.checkArgument; - /** * Base class for per-job cluster entry points. 
*/ @@ -298,23 +296,16 @@ public abstract class JobClusterEntrypoint extends ClusterEntrypoint { } @Override - public void jobFinished(JobResult result) { + public void jobReachedGloballyTerminalState(ArchivedExecutionGraph executionGraph) { LOG.info("Job({}) finished.", jobId); - shutDownAndTerminate(true, ApplicationStatus.SUCCEEDED, null); - } - - @Override - public void jobFailed(JobResult result) { - checkArgument(result.getSerializedThrowable().isPresent()); - - final SerializedThrowable serializedThrowable = result.getSerializedThrowable().get(); - - final String errorMessage = serializedThrowable.getMessage(); + final ErrorInfo errorInfo = executionGraph.getFailureInfo(); - LOG.info("Job({}) failed: {}.", jobId, errorMessage); - - shutDownAndTerminate(true, ApplicationStatus.FAILED, errorMessage); + if (errorInfo == null) { + shutDownAndTerminate(true, ApplicationStatus.SUCCEEDED, null); + } else { + shutDownAndTerminate(true, ApplicationStatus.FAILED, errorInfo.getExceptionAsString()); + } } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/8f9dbeca/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java index 8d1fa1d..cc56209 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ + package org.apache.flink.runtime.executiongraph; import org.apache.flink.api.common.ArchivedExecutionConfig; @@ -28,7 +29,6 @@ import org.apache.flink.util.SerializedValue; import javax.annotation.Nullable; -import java.io.IOException; import java.util.Map; /** @@ -103,7 +103,7 @@ public interface AccessExecutionGraph { Iterable<? extends AccessExecutionVertex> getAllExecutionVertices(); /** - * Returns the timestamp for the given {@link JobStatus} + * Returns the timestamp for the given {@link JobStatus}. * * @param status status for which the timestamp should be returned * @return timestamp for the given job status @@ -154,9 +154,8 @@ public interface AccessExecutionGraph { * Returns a map containing the serialized values of user-defined accumulators. * * @return map containing serialized values of user-defined accumulators - * @throws IOException indicates that the serialization has failed */ - Map<String, SerializedValue<Object>> getAccumulatorsSerialized() throws IOException; + Map<String, SerializedValue<Object>> getAccumulatorsSerialized(); /** * Returns whether this execution graph was archived. http://git-wip-us.apache.org/repos/asf/flink/blob/8f9dbeca/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java index 20c2c8f..d285b20 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ + package org.apache.flink.runtime.executiongraph; import org.apache.flink.api.common.ArchivedExecutionConfig; @@ -30,12 +31,17 @@ import org.apache.flink.util.SerializedValue; import javax.annotation.Nullable; import java.io.Serializable; +import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.NoSuchElementException; +/** + * An archived execution graph represents a serializable form of the {@link ExecutionGraph}. + */ public class ArchivedExecutionGraph implements AccessExecutionGraph, Serializable { private static final long serialVersionUID = 7231383912742578428L; @@ -47,10 +53,10 @@ public class ArchivedExecutionGraph implements AccessExecutionGraph, Serializabl /** The name of the original job graph. */ private final String jobName; - /** All job vertices that are part of this graph */ + /** All job vertices that are part of this graph. */ private final Map<JobVertexID, ArchivedExecutionJobVertex> tasks; - /** All vertices, in the order in which they were created **/ + /** All vertices, in the order in which they were created. **/ private final List<ArchivedExecutionJobVertex> verticesInCreationOrder; /** @@ -65,7 +71,7 @@ public class ArchivedExecutionGraph implements AccessExecutionGraph, Serializabl // ------ Execution status and progress. These values are volatile, and accessed under the lock ------- - /** Current status of the job execution */ + /** Current status of the job execution. 
*/ private final JobStatus state; /** @@ -142,6 +148,7 @@ public class ArchivedExecutionGraph implements AccessExecutionGraph, Serializabl return state; } + @Nullable @Override public ErrorInfo getFailureInfo() { return failureCause; @@ -253,12 +260,10 @@ public class ArchivedExecutionGraph implements AccessExecutionGraph, Serializabl private int currPos; - public AllVerticesIterator(Iterator<ArchivedExecutionJobVertex> jobVertices) { this.jobVertices = jobVertices; } - @Override public boolean hasNext() { while (true) { @@ -291,4 +296,48 @@ public class ArchivedExecutionGraph implements AccessExecutionGraph, Serializabl throw new UnsupportedOperationException(); } } + + /** + * Create a {@link ArchivedExecutionGraph} from the given {@link ExecutionGraph}. + * + * @param executionGraph to create the ArchivedExecutionGraph from + * @return ArchivedExecutionGraph created from the given ExecutionGraph + */ + public static ArchivedExecutionGraph createFrom(ExecutionGraph executionGraph) { + final int numberVertices = executionGraph.getTotalNumberOfVertices(); + + Map<JobVertexID, ArchivedExecutionJobVertex> archivedTasks = new HashMap<>(numberVertices); + List<ArchivedExecutionJobVertex> archivedVerticesInCreationOrder = new ArrayList<>(numberVertices); + + for (ExecutionJobVertex task : executionGraph.getVerticesTopologically()) { + ArchivedExecutionJobVertex archivedTask = task.archive(); + archivedVerticesInCreationOrder.add(archivedTask); + archivedTasks.put(task.getJobVertexId(), archivedTask); + } + + final Map<String, SerializedValue<Object>> serializedUserAccumulators = executionGraph.getAccumulatorsSerialized(); + + final long[] timestamps = new long[JobStatus.values().length]; + + for (JobStatus jobStatus : JobStatus.values()) { + final int ordinal = jobStatus.ordinal(); + timestamps[ordinal] = executionGraph.getStatusTimestamp(jobStatus); + } + + return new ArchivedExecutionGraph( + executionGraph.getJobID(), + executionGraph.getJobName(), + archivedTasks, + 
archivedVerticesInCreationOrder, + timestamps, + executionGraph.getState(), + executionGraph.getFailureInfo(), + executionGraph.getJsonPlan(), + executionGraph.getAccumulatorResultsStringified(), + serializedUserAccumulators, + executionGraph.getArchivedExecutionConfig(), + executionGraph.isStoppable(), + executionGraph.getCheckpointCoordinatorConfiguration(), + executionGraph.getCheckpointStatsSnapshot()); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/8f9dbeca/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ErrorInfo.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ErrorInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ErrorInfo.java index 9fe569f..311effb 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ErrorInfo.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ErrorInfo.java @@ -34,7 +34,6 @@ public class ErrorInfo implements Serializable { private final SerializedThrowable exception; private final long timestamp; - public ErrorInfo(Throwable exception, long timestamp) { Preconditions.checkNotNull(exception); Preconditions.checkArgument(timestamp > 0); @@ -48,7 +47,7 @@ public class ErrorInfo implements Serializable { * Returns the serialized form of the original exception. 
*/ public SerializedThrowable getException() { - return this.exception; + return exception; } /** http://git-wip-us.apache.org/repos/asf/flink/blob/8f9dbeca/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java index d187faa..188f6da 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java @@ -19,12 +19,12 @@ package org.apache.flink.runtime.executiongraph; import org.apache.flink.annotation.VisibleForTesting; -import org.apache.flink.api.common.Archiveable; import org.apache.flink.api.common.ArchivedExecutionConfig; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.api.common.accumulators.AccumulatorHelper; +import org.apache.flink.api.common.accumulators.FailedAccumulatorSerialization; import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.JobException; @@ -51,7 +51,6 @@ import org.apache.flink.runtime.executiongraph.failover.RestartAllStrategy; import org.apache.flink.runtime.executiongraph.restart.ExecutionGraphRestartCallback; import org.apache.flink.runtime.executiongraph.restart.RestartCallback; import org.apache.flink.runtime.executiongraph.restart.RestartStrategy; -import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.jobgraph.JobStatus; @@ -62,6 +61,7 
@@ import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguratio import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup; import org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint; import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException; +import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider; import org.apache.flink.runtime.query.KvStateLocationRegistry; import org.apache.flink.runtime.state.SharedStateRegistry; import org.apache.flink.runtime.state.StateBackend; @@ -148,7 +148,7 @@ import static org.apache.flink.util.Preconditions.checkState; * local failover (meaning there is a concurrent global failover), the failover strategy has to * yield before the global failover. */ -public class ExecutionGraph implements AccessExecutionGraph, Archiveable<ArchivedExecutionGraph> { +public class ExecutionGraph implements AccessExecutionGraph { /** In place updater for the execution graph's current state. Avoids having to use an * AtomicReference and thus makes the frequent read access a bit faster */ @@ -761,16 +761,27 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive /** * Gets a serialized accumulator map. * @return The accumulator map with serialized accumulator values. 
- * @throws IOException */ @Override - public Map<String, SerializedValue<Object>> getAccumulatorsSerialized() throws IOException { + public Map<String, SerializedValue<Object>> getAccumulatorsSerialized() { Map<String, Accumulator<?, ?>> accumulatorMap = aggregateUserAccumulators(); Map<String, SerializedValue<Object>> result = new HashMap<>(accumulatorMap.size()); for (Map.Entry<String, Accumulator<?, ?>> entry : accumulatorMap.entrySet()) { - result.put(entry.getKey(), new SerializedValue<>(entry.getValue().getLocalValue())); + + try { + final SerializedValue<Object> serializedValue = new SerializedValue<>(entry.getValue().getLocalValue()); + result.put(entry.getKey(), serializedValue); + } catch (IOException ioe) { + LOG.error("Could not serialize accumulator " + entry.getKey() + '.', ioe); + + try { + result.put(entry.getKey(), new SerializedValue<>(new FailedAccumulatorSerialization(ioe))); + } catch (IOException e) { + throw new RuntimeException("It should never happen that we cannot serialize the accumulator serialization exception.", e); + } + } } return result; @@ -1687,40 +1698,4 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive } } } - - @Override - public ArchivedExecutionGraph archive() { - Map<JobVertexID, ArchivedExecutionJobVertex> archivedTasks = new HashMap<>(verticesInCreationOrder.size()); - List<ArchivedExecutionJobVertex> archivedVerticesInCreationOrder = new ArrayList<>(verticesInCreationOrder.size()); - - for (ExecutionJobVertex task : verticesInCreationOrder) { - ArchivedExecutionJobVertex archivedTask = task.archive(); - archivedVerticesInCreationOrder.add(archivedTask); - archivedTasks.put(task.getJobVertexId(), archivedTask); - } - - Map<String, SerializedValue<Object>> serializedUserAccumulators; - try { - serializedUserAccumulators = getAccumulatorsSerialized(); - } catch (Exception e) { - LOG.warn("Error occurred while archiving user accumulators.", e); - serializedUserAccumulators = 
Collections.emptyMap(); - } - - return new ArchivedExecutionGraph( - getJobID(), - getJobName(), - archivedTasks, - archivedVerticesInCreationOrder, - stateTimestamps, - getState(), - failureInfo, - getJsonPlan(), - getAccumulatorResultsStringified(), - serializedUserAccumulators, - getArchivedExecutionConfig(), - isStoppable(), - getCheckpointCoordinatorConfiguration(), - getCheckpointStatsSnapshot()); - } } http://git-wip-us.apache.org/repos/asf/flink/blob/8f9dbeca/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/OnCompletionActions.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/OnCompletionActions.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/OnCompletionActions.java index 149ea0f..66ca4ee 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/OnCompletionActions.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/OnCompletionActions.java @@ -18,7 +18,7 @@ package org.apache.flink.runtime.jobmanager; -import org.apache.flink.runtime.jobmaster.JobResult; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; /** * Interface for completion actions once a Flink job has reached @@ -27,18 +27,11 @@ import org.apache.flink.runtime.jobmaster.JobResult; public interface OnCompletionActions { /** - * Job finished successfully. + * Job reached a globally terminal state. * - * @param result of the job execution + * @param executionGraph serializable execution graph */ - void jobFinished(JobResult result); - - /** - * Job failed with an exception. - * - * @param result The result of the job carrying the failure cause. - */ - void jobFailed(JobResult result); + void jobReachedGloballyTerminalState(ArchivedExecutionGraph executionGraph); /** * Job was finished by another JobMaster. 
http://git-wip-us.apache.org/repos/asf/flink/blob/8f9dbeca/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java index 4833cbd..5af6c67 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java @@ -24,6 +24,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.client.JobExecutionException; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; import org.apache.flink.runtime.heartbeat.HeartbeatServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.highavailability.RunningJobsRegistry; @@ -251,30 +252,14 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, F * Job completion notification triggered by JobManager. */ @Override - public void jobFinished(JobResult result) { + public void jobReachedGloballyTerminalState(ArchivedExecutionGraph executionGraph) { try { unregisterJobFromHighAvailability(); shutdownInternally(); } finally { if (toNotifyOnComplete != null) { - toNotifyOnComplete.jobFinished(result); - } - } - } - - /** - * Job completion notification triggered by JobManager. 
- */ - @Override - public void jobFailed(JobResult result) { - try { - unregisterJobFromHighAvailability(); - shutdownInternally(); - } - finally { - if (toNotifyOnComplete != null) { - toNotifyOnComplete.jobFailed(result); + toNotifyOnComplete.jobReachedGloballyTerminalState(executionGraph); } } } http://git-wip-us.apache.org/repos/asf/flink/blob/8f9dbeca/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java index 46518eb..ef99d52 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java @@ -34,13 +34,13 @@ import org.apache.flink.runtime.checkpoint.CheckpointMetrics; import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; import org.apache.flink.runtime.checkpoint.TaskStateSnapshot; -import org.apache.flink.runtime.client.JobExecutionException; import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager; import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; import org.apache.flink.runtime.executiongraph.Execution; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.executiongraph.ExecutionGraph; @@ -100,8 +100,6 @@ import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkException; import 
org.apache.flink.util.InstantiationUtil; import org.apache.flink.util.Preconditions; -import org.apache.flink.util.SerializedThrowable; -import org.apache.flink.util.SerializedValue; import org.slf4j.Logger; @@ -787,7 +785,7 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast @Override public CompletableFuture<AccessExecutionGraph> requestArchivedExecutionGraph(Time timeout) { - return CompletableFuture.completedFuture(executionGraph.archive()); + return CompletableFuture.completedFuture(ArchivedExecutionGraph.createFrom(executionGraph)); } @Override @@ -989,60 +987,9 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast @Nullable final Throwable error) { validateRunsInMainThread(); - final JobID jobID = executionGraph.getJobID(); - final String jobName = executionGraph.getJobName(); - final JobResult.Builder builder = new JobResult.Builder() - .jobId(jobID) - .netRuntime(0); - if (newJobStatus.isGloballyTerminalState()) { - switch (newJobStatus) { - case FINISHED: - try { - // TODO get correct job duration - // job done, let's get the accumulators - final Map<String, SerializedValue<Object>> accumulatorsSerialized = executionGraph.getAccumulatorsSerialized(); - builder.accumulatorResults(accumulatorsSerialized); - executor.execute(() -> jobCompletionActions.jobFinished(builder.build())); - } - catch (Exception e) { - log.error("Cannot fetch final accumulators for job {} ({})", jobName, jobID, e); - - final JobExecutionException exception = new JobExecutionException(jobID, - "Failed to retrieve accumulator results. 
" + - "The job is registered as 'FINISHED (successful), but this notification describes " + - "a failure, since the resulting accumulators could not be fetched.", e); - - executor.execute(() -> jobCompletionActions.jobFailed(builder - .serializedThrowable(new SerializedThrowable(exception)) - .build())); - } - break; - - case CANCELED: { - final JobExecutionException exception = new JobExecutionException( - jobID, "Job was cancelled.", new Exception("The job was cancelled")); - - executor.execute(() -> jobCompletionActions.jobFailed(builder - .serializedThrowable(new SerializedThrowable(exception)) - .build())); - break; - } - - case FAILED: { - final Throwable unpackedError = SerializedThrowable.get(error, userCodeLoader); - final JobExecutionException exception = new JobExecutionException( - jobID, "Job execution failed.", unpackedError); - executor.execute(() -> jobCompletionActions.jobFailed(builder - .serializedThrowable(new SerializedThrowable(exception)) - .build())); - break; - } - - default: - // this can happen only if the enum is buggy - throw new IllegalStateException(newJobStatus.toString()); - } + final ArchivedExecutionGraph archivedExecutionGraph = ArchivedExecutionGraph.createFrom(executionGraph); + executor.execute(() -> jobCompletionActions.jobReachedGloballyTerminalState(archivedExecutionGraph)); } } http://git-wip-us.apache.org/repos/asf/flink/blob/8f9dbeca/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobResult.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobResult.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobResult.java index 4a409d5..5a5e713 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobResult.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobResult.java @@ -21,6 +21,9 @@ package org.apache.flink.runtime.jobmaster; import 
org.apache.flink.annotation.Internal; import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.dispatcher.Dispatcher; +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; +import org.apache.flink.runtime.executiongraph.ErrorInfo; +import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.util.SerializedThrowable; import org.apache.flink.util.SerializedValue; @@ -138,4 +141,38 @@ public class JobResult implements Serializable { } } + /** + * Creates the {@link JobResult} from the given {@link AccessExecutionGraph} which + * must be in a globally terminal state. + * + * @param accessExecutionGraph to create the JobResult from + * @return JobResult of the given AccessExecutionGraph + */ + public static JobResult createFrom(AccessExecutionGraph accessExecutionGraph) { + final JobID jobId = accessExecutionGraph.getJobID(); + final JobStatus jobStatus = accessExecutionGraph.getState(); + + checkArgument( + jobStatus.isGloballyTerminalState(), + "The job " + accessExecutionGraph.getJobName() + '(' + jobId + ") is not in a globally " + + "terminal state. 
It is in state " + jobStatus + '.'); + + final JobResult.Builder builder = new JobResult.Builder(); + builder.jobId(jobId); + + final long netRuntime = accessExecutionGraph.getStatusTimestamp(jobStatus) - accessExecutionGraph.getStatusTimestamp(JobStatus.CREATED); + builder.netRuntime(netRuntime); + builder.accumulatorResults(accessExecutionGraph.getAccumulatorsSerialized()); + + if (jobStatus != JobStatus.FINISHED) { + final ErrorInfo errorInfo = accessExecutionGraph.getFailureInfo(); + + if (errorInfo != null) { + builder.serializedThrowable(errorInfo.getException()); + } + } + + return builder.build(); + } + } http://git-wip-us.apache.org/repos/asf/flink/blob/8f9dbeca/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java index b9d76da..7e49cef 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java @@ -25,6 +25,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.blob.BlobServer; import org.apache.flink.runtime.client.JobExecutionException; import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; import org.apache.flink.runtime.heartbeat.HeartbeatServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.jobgraph.JobGraph; @@ -361,12 +362,7 @@ public class MiniClusterJobDispatcher { } @Override - public void jobFinished(JobResult result) { - decrementCheckAndCleanup(); - } - - @Override - public void jobFailed(JobResult result) 
{ + public void jobReachedGloballyTerminalState(ArchivedExecutionGraph executionGraph) { decrementCheckAndCleanup(); } @@ -414,16 +410,8 @@ public class MiniClusterJobDispatcher { } @Override - public void jobFinished(JobResult result) { - this.result = result; - jobMastersToWaitFor.countDown(); - } - - @Override - public void jobFailed(JobResult result) { - checkArgument(result.getSerializedThrowable().isPresent()); - - this.result = result; + public void jobReachedGloballyTerminalState(ArchivedExecutionGraph executionGraph) { + this.result = JobResult.createFrom(executionGraph); jobMastersToWaitFor.countDown(); } http://git-wip-us.apache.org/repos/asf/flink/blob/8f9dbeca/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java index 70b9d35..63dc604 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java @@ -71,7 +71,7 @@ public class JobExceptionsHandler extends AbstractExecutionGraphHandler<JobExcep ErrorInfo rootException = executionGraph.getFailureInfo(); String rootExceptionMessage = null; Long rootTimestamp = null; - if (rootException != null && !rootException.getExceptionAsString().equals(ExceptionUtils.STRINGIFIED_NULL_EXCEPTION)) { + if (rootException != null) { rootExceptionMessage = rootException.getExceptionAsString(); rootTimestamp = rootException.getTimestamp(); } http://git-wip-us.apache.org/repos/asf/flink/blob/8f9dbeca/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobExceptionsHandler.java 
---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobExceptionsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobExceptionsHandler.java index a6bae86..7b1487c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobExceptionsHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobExceptionsHandler.java @@ -93,7 +93,7 @@ public class JobExceptionsHandler extends AbstractExecutionGraphRequestHandler { // most important is the root failure cause ErrorInfo rootException = graph.getFailureInfo(); - if (rootException != null && !rootException.getExceptionAsString().equals(ExceptionUtils.STRINGIFIED_NULL_EXCEPTION)) { + if (rootException != null) { gen.writeStringField("root-exception", rootException.getExceptionAsString()); gen.writeNumberField("timestamp", rootException.getTimestamp()); } http://git-wip-us.apache.org/repos/asf/flink/blob/8f9dbeca/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index 325e955..b795609 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -1727,7 +1727,10 @@ class JobManager( }(context.dispatcher)) try { - archive ! decorateMessage(ArchiveExecutionGraph(jobID, eg.archive())) + archive ! 
decorateMessage( + ArchiveExecutionGraph( + jobID, + ArchivedExecutionGraph.createFrom(eg))) } catch { case t: Throwable => log.warn(s"Could not archive the execution graph $eg.", t) } http://git-wip-us.apache.org/repos/asf/flink/blob/8f9dbeca/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java index b5fcd18..76a3c98 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java @@ -26,11 +26,14 @@ import org.apache.flink.runtime.blob.BlobServer; import org.apache.flink.runtime.blob.VoidBlobStore; import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory; import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; +import org.apache.flink.runtime.executiongraph.ErrorInfo; import org.apache.flink.runtime.heartbeat.HeartbeatServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.highavailability.RunningJobsRegistry; import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices; import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobmanager.OnCompletionActions; import org.apache.flink.runtime.jobmanager.SubmittedJobGraph; @@ -45,6 +48,7 @@ import org.apache.flink.runtime.messages.FlinkJobNotFoundException; import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.metrics.NoOpMetricRegistry; import 
org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; +import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder; import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.runtime.rpc.RpcUtils; @@ -54,7 +58,6 @@ import org.apache.flink.runtime.testutils.InMemorySubmittedJobGraphStore; import org.apache.flink.runtime.util.TestingFatalErrorHandler; import org.apache.flink.testutils.category.Flip6; import org.apache.flink.util.ExceptionUtils; -import org.apache.flink.util.SerializedThrowable; import org.apache.flink.util.TestLogger; import org.junit.After; @@ -270,11 +273,13 @@ public class DispatcherTest extends TestLogger { final JobID failedJobId = new JobID(); onCompletionActions = dispatcher.new DispatcherOnCompleteActions(failedJobId); - onCompletionActions.jobFailed(new JobResult.Builder() - .jobId(failedJobId) - .serializedThrowable(new SerializedThrowable(new RuntimeException("expected"))) - .netRuntime(Long.MAX_VALUE) - .build()); + final ArchivedExecutionGraph failedExecutionGraph = new ArchivedExecutionGraphBuilder() + .setJobID(failedJobId) + .setState(JobStatus.FAILED) + .setFailureCause(new ErrorInfo(new RuntimeException("expected"), 1L)) + .build(); + + onCompletionActions.jobReachedGloballyTerminalState(failedExecutionGraph); assertThat( dispatcherGateway.isJobExecutionResultPresent(failedJobId, TIMEOUT).get(), @@ -288,10 +293,12 @@ public class DispatcherTest extends TestLogger { final JobID successJobId = new JobID(); onCompletionActions = dispatcher.new DispatcherOnCompleteActions(successJobId); - onCompletionActions.jobFinished(new JobResult.Builder() - .jobId(successJobId) - .netRuntime(Long.MAX_VALUE) - .build()); + final ArchivedExecutionGraph succeededExecutionGraph = new ArchivedExecutionGraphBuilder() + .setJobID(successJobId) + .setState(JobStatus.FINISHED) + .build(); + + 
onCompletionActions.jobReachedGloballyTerminalState(succeededExecutionGraph); assertThat( dispatcherGateway.isJobExecutionResultPresent(successJobId, TIMEOUT).get(), http://git-wip-us.apache.org/repos/asf/flink/blob/8f9dbeca/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java index 5635763..8bc5170 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java @@ -142,14 +142,14 @@ public class ArchivedExecutionGraphTest extends TestLogger { @Test public void testArchive() throws IOException, ClassNotFoundException { - ArchivedExecutionGraph archivedGraph = runtimeGraph.archive(); + ArchivedExecutionGraph archivedGraph = ArchivedExecutionGraph.createFrom(runtimeGraph); compareExecutionGraph(runtimeGraph, archivedGraph); } @Test public void testSerialization() throws IOException, ClassNotFoundException { - ArchivedExecutionGraph archivedGraph = runtimeGraph.archive(); + ArchivedExecutionGraph archivedGraph = ArchivedExecutionGraph.createFrom(runtimeGraph); verifySerializability(archivedGraph); } http://git-wip-us.apache.org/repos/asf/flink/blob/8f9dbeca/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java index 245ea27..a69235a 100644 --- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java @@ -24,20 +24,23 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.blob.BlobServer; import org.apache.flink.runtime.blob.BlobStore; import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; +import org.apache.flink.runtime.executiongraph.ErrorInfo; import org.apache.flink.runtime.heartbeat.HeartbeatServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.highavailability.RunningJobsRegistry; import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobmanager.OnCompletionActions; import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore; import org.apache.flink.runtime.leaderelection.LeaderElectionService; import org.apache.flink.runtime.metrics.MetricRegistryConfiguration; import org.apache.flink.runtime.metrics.MetricRegistryImpl; +import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder; import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.testutils.category.Flip6; -import org.apache.flink.util.SerializedThrowable; import org.apache.flink.util.TestLogger; import org.junit.After; @@ -176,7 +179,7 @@ public class JobManagerRunnerMockTest extends TestLogger { assertTrue(!jobCompletion.isJobFinished()); // runner been told by JobManager that job is finished - runner.jobFinished(mock(JobResult.class)); + runner.jobReachedGloballyTerminalState(mock(ArchivedExecutionGraph.class)); assertTrue(jobCompletion.isJobFinished()); 
assertFalse(jobCompletion.isJobFinishedByOther()); @@ -195,11 +198,13 @@ public class JobManagerRunnerMockTest extends TestLogger { verify(jobManager).start(eq(jobMasterId), any(Time.class)); assertTrue(!jobCompletion.isJobFinished()); + final ArchivedExecutionGraph failedExecutionGraph = new ArchivedExecutionGraphBuilder() + .setFailureCause(new ErrorInfo(new Exception("failed manually"), 1L)) + .setState(JobStatus.FAILED) + .build(); + // runner been told by JobManager that job is failed - runner.jobFailed(new JobResult.Builder() - .jobId(new JobID()) - .serializedThrowable(new SerializedThrowable(new Exception("failed manually"))) - .build()); + runner.jobReachedGloballyTerminalState(failedExecutionGraph); assertTrue(jobCompletion.isJobFailed()); verify(leaderElectionService).stop(); @@ -250,19 +255,15 @@ public class JobManagerRunnerMockTest extends TestLogger { private volatile boolean finishedByOther; @Override - public void jobFinished(JobResult result) { + public void jobReachedGloballyTerminalState(ArchivedExecutionGraph executionGraph) { checkArgument(!isJobFinished(), "job finished already"); checkArgument(!isJobFailed(), "job failed already"); - this.result = result; - } - - @Override - public void jobFailed(JobResult result) { - checkArgument(!isJobFinished(), "job finished already"); - checkArgument(!isJobFailed(), "job failed already"); + this.result = JobResult.createFrom(executionGraph); - this.failedCause = result.getSerializedThrowable().get(); + if (!result.isSuccess()) { + failedCause = result.getSerializedThrowable().get(); + } } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/8f9dbeca/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java index 
d77a1d4..d20e1fe 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java @@ -29,6 +29,7 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.concurrent.ScheduledExecutor; import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager; import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy; import org.apache.flink.runtime.heartbeat.HeartbeatServices; import org.apache.flink.runtime.heartbeat.TestingHeartbeatServices; @@ -265,12 +266,7 @@ public class JobMasterTest extends TestLogger { private static final class NoOpOnCompletionActions implements OnCompletionActions { @Override - public void jobFinished(final JobResult result) { - - } - - @Override - public void jobFailed(final JobResult result) { + public void jobReachedGloballyTerminalState(ArchivedExecutionGraph executionGraph) { } http://git-wip-us.apache.org/repos/asf/flink/blob/8f9dbeca/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobExceptionsHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobExceptionsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobExceptionsHandlerTest.java index 0e96f36..7381a59 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobExceptionsHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobExceptionsHandlerTest.java @@ -27,6 +27,7 @@ import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import 
org.apache.flink.runtime.webmonitor.history.ArchivedJson; import org.apache.flink.runtime.webmonitor.history.JsonArchivist; import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.TestLogger; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode; @@ -42,7 +43,7 @@ import static org.mockito.Mockito.mock; /** * Tests for the JobExceptionsHandler. */ -public class JobExceptionsHandlerTest { +public class JobExceptionsHandlerTest extends TestLogger { @Test public void testArchiver() throws Exception { http://git-wip-us.apache.org/repos/asf/flink/blob/8f9dbeca/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedExecutionGraphBuilder.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedExecutionGraphBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedExecutionGraphBuilder.java index 68077ba..ee7ceda 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedExecutionGraphBuilder.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedExecutionGraphBuilder.java @@ -117,9 +117,13 @@ public class ArchivedExecutionGraphBuilder { } public ArchivedExecutionGraph build() { - Preconditions.checkNotNull(tasks, "Tasks must not be null."); JobID jobID = this.jobID != null ? this.jobID : new JobID(); String jobName = this.jobName != null ? this.jobName : "job_" + RANDOM.nextInt(); + + if (tasks == null) { + tasks = Collections.emptyMap(); + } + return new ArchivedExecutionGraph( jobID, jobName,
