[hotfix] [dist. coordination] Small code cleanups in ExecutionGraph and related classes
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/874d9565 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/874d9565 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/874d9565 Branch: refs/heads/master Commit: 874d956561f817a2578d7d7e6686d598323dc4c8 Parents: 69843fe Author: Stephan Ewen <[email protected]> Authored: Tue Mar 21 18:12:23 2017 +0100 Committer: Stephan Ewen <[email protected]> Committed: Wed Mar 29 17:11:49 2017 +0200 ---------------------------------------------------------------------- .../flink/runtime/executiongraph/Execution.java | 6 +- .../runtime/executiongraph/ExecutionGraph.java | 140 +++++++++---------- .../flink/runtime/executiongraph/IOMetrics.java | 2 + .../runtime/executiongraph/JobInformation.java | 8 ++ .../runtime/taskmanager/TaskExecutionState.java | 3 +- ...ExecutionGraphCheckpointCoordinatorTest.java | 12 +- .../ExecutionGraphMetricsTest.java | 2 +- 7 files changed, 87 insertions(+), 86 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/874d9565/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java index e17a3e5..1a3ef11 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java @@ -820,7 +820,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution void sendPartitionInfos() { // check if the ExecutionVertex has already been archived and thus cleared the // partial partition infos queue - if(partialInputChannelDeploymentDescriptors != null && 
!partialInputChannelDeploymentDescriptors.isEmpty()) { + if (partialInputChannelDeploymentDescriptors != null && !partialInputChannelDeploymentDescriptors.isEmpty()) { PartialInputChannelDeploymentDescriptor partialInputChannelDeploymentDescriptor; @@ -931,7 +931,9 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution } else if (currentState == CANCELING || currentState == FAILED) { if (LOG.isDebugEnabled()) { - LOG.debug(String.format("Concurrent canceling/failing of %s while deployment was in progress.", getVertexWithAttempt())); + // this log statement is guarded because the 'getVertexWithAttempt()' method + // performs string concatenations + LOG.debug("Concurrent canceling/failing of {} while deployment was in progress.", getVertexWithAttempt()); } sendCancelRpcCall(); } http://git-wip-us.apache.org/repos/asf/flink/blob/874d9565/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java index 6bb3455..e911f49 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.executiongraph; import org.apache.commons.lang3.StringUtils; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.Archiveable; import org.apache.flink.api.common.ArchivedExecutionConfig; import org.apache.flink.api.common.ExecutionConfig; @@ -63,7 +64,6 @@ import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableExceptio import org.apache.flink.runtime.query.KvStateLocationRegistry; import org.apache.flink.runtime.state.StateBackend; import 
org.apache.flink.runtime.taskmanager.TaskExecutionState; -import org.apache.flink.runtime.util.SerializableObject; import org.apache.flink.runtime.util.SerializedThrowable; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.Preconditions; @@ -92,6 +92,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkNotNull; import static org.apache.flink.util.Preconditions.checkState; @@ -131,7 +132,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive /** The lock used to secure all access to mutable fields, especially the tracking of progress * within the job. */ - private final SerializableObject progressLock = new SerializableObject(); + private final Object progressLock = new Object(); /** Job specific information like the job id, job name, job configuration, etc. */ private final JobInformation jobInformation; @@ -222,7 +223,6 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive /** Checkpoint stats tracker separate from the coordinator in order to be * available after archiving. */ - @SuppressWarnings("NonSerializableFieldInSerializableClass") private CheckpointStatsTracker checkpointStatsTracker; // ------ Fields that are only relevant for archived execution graphs ------------ @@ -235,6 +235,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive /** * This constructor is for tests only, because it does not include class loading information. 
*/ + @VisibleForTesting ExecutionGraph( ScheduledExecutorService futureExecutor, Executor ioExecutor, @@ -369,24 +370,16 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive CheckpointStatsTracker statsTracker) { // simple sanity checks - if (interval < 10 || checkpointTimeout < 10) { - throw new IllegalArgumentException(); - } - if (state != JobStatus.CREATED) { - throw new IllegalStateException("Job must be in CREATED state"); - } + checkArgument(interval >= 10, "checkpoint interval must not be below 10ms"); + checkArgument(checkpointTimeout >= 10, "checkpoint timeout must not be below 10ms"); + + checkState(state == JobStatus.CREATED, "Job must be in CREATED state"); + checkState(checkpointCoordinator == null, "checkpointing already enabled"); ExecutionVertex[] tasksToTrigger = collectExecutionVertices(verticesToTrigger); ExecutionVertex[] tasksToWaitFor = collectExecutionVertices(verticesToWaitFor); ExecutionVertex[] tasksToCommitTo = collectExecutionVertices(verticesToCommitTo); - // disable to make sure existing checkpoint coordinators are cleared - try { - disableSnaphotCheckpointing(); - } catch (Throwable t) { - LOG.error("Error while shutting down checkpointer."); - } - checkpointStatsTracker = checkNotNull(statsTracker, "CheckpointStatsTracker"); // create the coordinator that triggers and commits checkpoints and holds the state @@ -416,24 +409,6 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive } } - /** - * Disables checkpointing. - * - * <p>The shutdown of the checkpoint coordinator might block. Make sure that calls to this - * method don't block the job manager actor and run asynchronously. 
- */ - public void disableSnaphotCheckpointing() throws Exception { - if (state != JobStatus.CREATED) { - throw new IllegalStateException("Job must be in CREATED state"); - } - - if (checkpointCoordinator != null) { - checkpointCoordinator.shutdown(state); - checkpointCoordinator = null; - checkpointStatsTracker = null; - } - } - @Override public CheckpointCoordinator getCheckpointCoordinator() { return checkpointCoordinator; @@ -761,7 +736,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive private void scheduleLazy(SlotProvider slotProvider) throws NoResourceAvailableException { // simply take the vertices without inputs. - for (ExecutionJobVertex ejv : this.tasks.values()) { + for (ExecutionJobVertex ejv : verticesInCreationOrder) { if (ejv.getJobVertex().isInputVertex()) { ejv.scheduleAll(slotProvider, allowQueuedScheduling); } @@ -932,9 +907,9 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive } public void stop() throws StoppingException { - if(this.isStoppable) { - for(ExecutionVertex ev : this.getAllExecutionVertices()) { - if(ev.getNumberOfInputs() == 0) { // send signal to sources only + if (isStoppable) { + for (ExecutionVertex ev : this.getAllExecutionVertices()) { + if (ev.getNumberOfInputs() == 0) { // send signal to sources only ev.stop(); } } @@ -1011,7 +986,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive return; } - // no need to treat other states + // else: concurrent change to execution state, retry } } @@ -1273,35 +1248,47 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive * @return True, if the task update was properly applied, false, if the execution attempt was not found. 
*/ public boolean updateState(TaskExecutionState state) { - Execution attempt = this.currentExecutions.get(state.getID()); + final Execution attempt = currentExecutions.get(state.getID()); + if (attempt != null) { + try { + Map<String, Accumulator<?, ?>> accumulators; + + switch (state.getExecutionState()) { + case RUNNING: + return attempt.switchToRunning(); + + case FINISHED: + // this deserialization is exception-free + accumulators = deserializeAccumulators(state); + attempt.markFinished(accumulators, state.getIOMetrics()); + return true; + + case CANCELED: + // this deserialization is exception-free + accumulators = deserializeAccumulators(state); + attempt.cancelingComplete(accumulators, state.getIOMetrics()); + return true; + + case FAILED: + // this deserialization is exception-free + accumulators = deserializeAccumulators(state); + attempt.markFailed(state.getError(userClassLoader), accumulators, state.getIOMetrics()); + return true; + + default: + // we mark as failed and return false, which triggers the TaskManager + // to remove the task + attempt.fail(new Exception("TaskManager sent illegal state update: " + state.getExecutionState())); + return false; + } + } + catch (Throwable t) { + ExceptionUtils.rethrowIfFatalErrorOrOOM(t); - switch (state.getExecutionState()) { - case RUNNING: - return attempt.switchToRunning(); - case FINISHED: - try { - Map<String, Accumulator<?, ?>> userAccumulators = deserializeAccumulators(state); - attempt.markFinished(userAccumulators, state.getIOMetrics()); - } - catch (Exception e) { - LOG.error("Failed to deserialize final accumulator results.", e); - attempt.markFailed(e); - } - return true; - case CANCELED: - Map<String, Accumulator<?, ?>> userAcc1 = deserializeAccumulators(state); - attempt.cancelingComplete(userAcc1, state.getIOMetrics()); - return true; - case FAILED: - Map<String, Accumulator<?, ?>> userAcc2 = deserializeAccumulators(state); - attempt.markFailed(state.getError(userClassLoader), userAcc2, 
state.getIOMetrics()); - return true; - default: - // we mark as failed and return false, which triggers the TaskManager - // to remove the task - attempt.fail(new Exception("TaskManager sent illegal state update: " + state.getExecutionState())); - return false; + // failures during updates leave the ExecutionGraph inconsistent + fail(t); + return false; } } else { @@ -1309,17 +1296,28 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive } } + /** + * Deserializes accumulators from a task state update. + * + * <p>This method never throws an exception! + * + * @param state The task execution state from which to deserialize the accumulators. + * @return The deserialized accumulators, or null, if there are no accumulators or an error occurred. + */ private Map<String, Accumulator<?, ?>> deserializeAccumulators(TaskExecutionState state) { AccumulatorSnapshot serializedAccumulators = state.getAccumulators(); - Map<String, Accumulator<?, ?>> accumulators = null; + if (serializedAccumulators != null) { try { - accumulators = serializedAccumulators.deserializeUserAccumulators(userClassLoader); - } catch (Exception e) { - LOG.error("Failed to deserialize final accumulator results.", e); + return serializedAccumulators.deserializeUserAccumulators(userClassLoader); + } + catch (Throwable t) { + // we catch Throwable here to include all forms of linking errors that may + // occur if user classes are missing in the classpath + LOG.error("Failed to deserialize final accumulator results.", t); } } - return accumulators; + return null; } /** http://git-wip-us.apache.org/repos/asf/flink/blob/874d9565/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IOMetrics.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IOMetrics.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IOMetrics.java index 82c376e..668418d 
100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IOMetrics.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IOMetrics.java @@ -25,7 +25,9 @@ import java.io.Serializable; * An instance of this class represents a snapshot of the io-related metrics of a single task. */ public class IOMetrics implements Serializable { + private static final long serialVersionUID = -7208093607556457183L; + protected long numRecordsIn; protected long numRecordsOut; http://git-wip-us.apache.org/repos/asf/flink/blob/874d9565/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/JobInformation.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/JobInformation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/JobInformation.java index 6e3c1e8..f497f8c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/JobInformation.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/JobInformation.java @@ -93,4 +93,12 @@ public class JobInformation implements Serializable { public Collection<URL> getRequiredClasspathURLs() { return requiredClasspathURLs; } + + // ------------------------------------------------------------------------ + + + @Override + public String toString() { + return "JobInformation for '" + jobName + "' (" + jobId + ')'; + } } http://git-wip-us.apache.org/repos/asf/flink/blob/874d9565/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java index 5cc2484..9395435 100644 --- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java @@ -49,8 +49,9 @@ public class TaskExecutionState implements Serializable { private final SerializedThrowable throwable; - /** Serialized flink and user-defined accumulators */ + /** Serialized user-defined accumulators */ private final AccumulatorSnapshot accumulators; + private final IOMetrics ioMetrics; /** http://git-wip-us.apache.org/repos/asf/flink/blob/874d9565/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java index 98b4c4d..81162b6 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java @@ -18,14 +18,11 @@ package org.apache.flink.runtime.checkpoint; -import akka.actor.ActorSystem; -import akka.testkit.JavaTestKit; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.Configuration; import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; -import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.blob.BlobKey; import org.apache.flink.runtime.executiongraph.ExecutionGraph; import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; @@ -37,7 +34,7 @@ import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings; import 
org.apache.flink.runtime.jobmanager.scheduler.Scheduler; import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.util.SerializedValue; -import org.junit.AfterClass; + import org.junit.Test; import org.mockito.Matchers; @@ -50,13 +47,6 @@ import static org.mockito.Mockito.verify; public class ExecutionGraphCheckpointCoordinatorTest { - private static ActorSystem system = AkkaUtils.createLocalActorSystem(new Configuration()); - - @AfterClass - public static void teardown() { - JavaTestKit.shutdownActorSystem(system); - } - /** * Tests that a shut down checkpoint coordinator calls shutdown on * the store and counter. http://git-wip-us.apache.org/repos/asf/flink/blob/874d9565/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java index 203c547..5496e35 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java @@ -162,7 +162,7 @@ public class ExecutionGraphMetricsTest extends TestLogger { testingRestartStrategy, Collections.<BlobKey>emptyList(), Collections.<URL>emptyList(), - scheduler, + scheduler, getClass().getClassLoader(), metricGroup);
