[hotfix] [dist. coordination] Small code cleanups in ExecutionGraph and related 
classes


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/874d9565
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/874d9565
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/874d9565

Branch: refs/heads/master
Commit: 874d956561f817a2578d7d7e6686d598323dc4c8
Parents: 69843fe
Author: Stephan Ewen <[email protected]>
Authored: Tue Mar 21 18:12:23 2017 +0100
Committer: Stephan Ewen <[email protected]>
Committed: Wed Mar 29 17:11:49 2017 +0200

----------------------------------------------------------------------
 .../flink/runtime/executiongraph/Execution.java |   6 +-
 .../runtime/executiongraph/ExecutionGraph.java  | 140 +++++++++----------
 .../flink/runtime/executiongraph/IOMetrics.java |   2 +
 .../runtime/executiongraph/JobInformation.java  |   8 ++
 .../runtime/taskmanager/TaskExecutionState.java |   3 +-
 ...ExecutionGraphCheckpointCoordinatorTest.java |  12 +-
 .../ExecutionGraphMetricsTest.java              |   2 +-
 7 files changed, 87 insertions(+), 86 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/874d9565/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index e17a3e5..1a3ef11 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -820,7 +820,7 @@ public class Execution implements AccessExecution, 
Archiveable<ArchivedExecution
        void sendPartitionInfos() {
                // check if the ExecutionVertex has already been archived and 
thus cleared the
                // partial partition infos queue
-               if(partialInputChannelDeploymentDescriptors != null && 
!partialInputChannelDeploymentDescriptors.isEmpty()) {
+               if (partialInputChannelDeploymentDescriptors != null && 
!partialInputChannelDeploymentDescriptors.isEmpty()) {
 
                        PartialInputChannelDeploymentDescriptor 
partialInputChannelDeploymentDescriptor;
 
@@ -931,7 +931,9 @@ public class Execution implements AccessExecution, 
Archiveable<ArchivedExecution
                        }
                        else if (currentState == CANCELING || currentState == 
FAILED) {
                                if (LOG.isDebugEnabled()) {
-                                       LOG.debug(String.format("Concurrent 
canceling/failing of %s while deployment was in progress.", 
getVertexWithAttempt()));
+                                       // this log statement is guarded 
because the 'getVertexWithAttempt()' method
+                                       // performs string concatenations 
+                                       LOG.debug("Concurrent canceling/failing 
of {} while deployment was in progress.", getVertexWithAttempt());
                                }
                                sendCancelRpcCall();
                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/874d9565/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 6bb3455..e911f49 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.executiongraph;
 
 import org.apache.commons.lang3.StringUtils;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.Archiveable;
 import org.apache.flink.api.common.ArchivedExecutionConfig;
 import org.apache.flink.api.common.ExecutionConfig;
@@ -63,7 +64,6 @@ import 
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableExceptio
 import org.apache.flink.runtime.query.KvStateLocationRegistry;
 import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
-import org.apache.flink.runtime.util.SerializableObject;
 import org.apache.flink.runtime.util.SerializedThrowable;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.Preconditions;
@@ -92,6 +92,7 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
 
@@ -131,7 +132,7 @@ public class ExecutionGraph implements 
AccessExecutionGraph, Archiveable<Archive
 
        /** The lock used to secure all access to mutable fields, especially 
the tracking of progress
         * within the job. */
-       private final SerializableObject progressLock = new 
SerializableObject();
+       private final Object progressLock = new Object();
 
        /** Job specific information like the job id, job name, job 
configuration, etc. */
        private final JobInformation jobInformation;
@@ -222,7 +223,6 @@ public class ExecutionGraph implements 
AccessExecutionGraph, Archiveable<Archive
 
        /** Checkpoint stats tracker separate from the coordinator in order to 
be
         * available after archiving. */
-       @SuppressWarnings("NonSerializableFieldInSerializableClass")
        private CheckpointStatsTracker checkpointStatsTracker;
 
        // ------ Fields that are only relevant for archived execution graphs 
------------
@@ -235,6 +235,7 @@ public class ExecutionGraph implements 
AccessExecutionGraph, Archiveable<Archive
        /**
         * This constructor is for tests only, because it does not include 
class loading information.
         */
+       @VisibleForTesting
        ExecutionGraph(
                        ScheduledExecutorService futureExecutor,
                        Executor ioExecutor,
@@ -369,24 +370,16 @@ public class ExecutionGraph implements 
AccessExecutionGraph, Archiveable<Archive
                        CheckpointStatsTracker statsTracker) {
 
                // simple sanity checks
-               if (interval < 10 || checkpointTimeout < 10) {
-                       throw new IllegalArgumentException();
-               }
-               if (state != JobStatus.CREATED) {
-                       throw new IllegalStateException("Job must be in CREATED 
state");
-               }
+               checkArgument(interval >= 10, "checkpoint interval must not be 
below 10ms");
+               checkArgument(checkpointTimeout >= 10, "checkpoint timeout must 
not be below 10ms");
+
+               checkState(state == JobStatus.CREATED, "Job must be in CREATED 
state");
+               checkState(checkpointCoordinator == null, "checkpointing 
already enabled");
 
                ExecutionVertex[] tasksToTrigger = 
collectExecutionVertices(verticesToTrigger);
                ExecutionVertex[] tasksToWaitFor = 
collectExecutionVertices(verticesToWaitFor);
                ExecutionVertex[] tasksToCommitTo = 
collectExecutionVertices(verticesToCommitTo);
 
-               // disable to make sure existing checkpoint coordinators are 
cleared
-               try {
-                       disableSnaphotCheckpointing();
-               } catch (Throwable t) {
-                       LOG.error("Error while shutting down checkpointer.");
-               }
-
                checkpointStatsTracker = checkNotNull(statsTracker, 
"CheckpointStatsTracker");
 
                // create the coordinator that triggers and commits checkpoints 
and holds the state
@@ -416,24 +409,6 @@ public class ExecutionGraph implements 
AccessExecutionGraph, Archiveable<Archive
                }
        }
 
-       /**
-        * Disables checkpointing.
-        *
-        * <p>The shutdown of the checkpoint coordinator might block. Make sure 
that calls to this
-        * method don't block the job manager actor and run asynchronously.
-        */
-       public void disableSnaphotCheckpointing() throws Exception {
-               if (state != JobStatus.CREATED) {
-                       throw new IllegalStateException("Job must be in CREATED 
state");
-               }
-
-               if (checkpointCoordinator != null) {
-                       checkpointCoordinator.shutdown(state);
-                       checkpointCoordinator = null;
-                       checkpointStatsTracker = null;
-               }
-       }
-
        @Override
        public CheckpointCoordinator getCheckpointCoordinator() {
                return checkpointCoordinator;
@@ -761,7 +736,7 @@ public class ExecutionGraph implements 
AccessExecutionGraph, Archiveable<Archive
 
        private void scheduleLazy(SlotProvider slotProvider) throws 
NoResourceAvailableException {
                // simply take the vertices without inputs.
-               for (ExecutionJobVertex ejv : this.tasks.values()) {
+               for (ExecutionJobVertex ejv : verticesInCreationOrder) {
                        if (ejv.getJobVertex().isInputVertex()) {
                                ejv.scheduleAll(slotProvider, 
allowQueuedScheduling);
                        }
@@ -932,9 +907,9 @@ public class ExecutionGraph implements 
AccessExecutionGraph, Archiveable<Archive
        }
 
        public void stop() throws StoppingException {
-               if(this.isStoppable) {
-                       for(ExecutionVertex ev : 
this.getAllExecutionVertices()) {
-                               if(ev.getNumberOfInputs() == 0) { // send 
signal to sources only
+               if (isStoppable) {
+                       for (ExecutionVertex ev : 
this.getAllExecutionVertices()) {
+                               if (ev.getNumberOfInputs() == 0) { // send 
signal to sources only
                                        ev.stop();
                                }
                        }
@@ -1011,7 +986,7 @@ public class ExecutionGraph implements 
AccessExecutionGraph, Archiveable<Archive
                                return;
                        }
 
-                       // no need to treat other states
+                       // else: concurrent change to execution state, retry
                }
        }
 
@@ -1273,35 +1248,47 @@ public class ExecutionGraph implements 
AccessExecutionGraph, Archiveable<Archive
         * @return True, if the task update was properly applied, false, if the 
execution attempt was not found.
         */
        public boolean updateState(TaskExecutionState state) {
-               Execution attempt = this.currentExecutions.get(state.getID());
+               final Execution attempt = currentExecutions.get(state.getID());
+
                if (attempt != null) {
+                       try {
+                               Map<String, Accumulator<?, ?>> accumulators;
+
+                               switch (state.getExecutionState()) {
+                                       case RUNNING:
+                                               return 
attempt.switchToRunning();
+       
+                                       case FINISHED:
+                                               // this deserialization is 
exception-free
+                                               accumulators = 
deserializeAccumulators(state);
+                                               
attempt.markFinished(accumulators, state.getIOMetrics());
+                                               return true;
+       
+                                       case CANCELED:
+                                               // this deserialization is 
exception-free
+                                               accumulators = 
deserializeAccumulators(state);
+                                               
attempt.cancelingComplete(accumulators, state.getIOMetrics());
+                                               return true;
+       
+                                       case FAILED:
+                                               // this deserialization is 
exception-free
+                                               accumulators = 
deserializeAccumulators(state);
+                                               
attempt.markFailed(state.getError(userClassLoader), accumulators, 
state.getIOMetrics());
+                                               return true;
+       
+                                       default:
+                                               // we mark as failed and return 
false, which triggers the TaskManager
+                                               // to remove the task
+                                               attempt.fail(new 
Exception("TaskManager sent illegal state update: " + 
state.getExecutionState()));
+                                               return false;
+                               }
+                       }
+                       catch (Throwable t) {
+                               ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
 
-                       switch (state.getExecutionState()) {
-                               case RUNNING:
-                                       return attempt.switchToRunning();
-                               case FINISHED:
-                                       try {
-                                               Map<String, Accumulator<?, ?>> 
userAccumulators = deserializeAccumulators(state);
-                                               
attempt.markFinished(userAccumulators, state.getIOMetrics());
-                                       }
-                                       catch (Exception e) {
-                                               LOG.error("Failed to 
deserialize final accumulator results.", e);
-                                               attempt.markFailed(e);
-                                       }
-                                       return true;
-                               case CANCELED:
-                                       Map<String, Accumulator<?, ?>> userAcc1 
= deserializeAccumulators(state);
-                                       attempt.cancelingComplete(userAcc1, 
state.getIOMetrics());
-                                       return true;
-                               case FAILED:
-                                       Map<String, Accumulator<?, ?>> userAcc2 
= deserializeAccumulators(state);
-                                       
attempt.markFailed(state.getError(userClassLoader), userAcc2, 
state.getIOMetrics());
-                                       return true;
-                               default:
-                                       // we mark as failed and return false, 
which triggers the TaskManager
-                                       // to remove the task
-                                       attempt.fail(new Exception("TaskManager 
sent illegal state update: " + state.getExecutionState()));
-                                       return false;
+                               // failures during updates leave the 
ExecutionGraph inconsistent
+                               fail(t);
+                               return false;
                        }
                }
                else {
@@ -1309,17 +1296,28 @@ public class ExecutionGraph implements 
AccessExecutionGraph, Archiveable<Archive
                }
        }
 
+       /**
+        * Deserializes accumulators from a task state update.
+        * 
+        * <p>This method never throws an exception!
+        * 
+        * @param state The task execution state from which to deserialize the 
accumulators.
+        * @return The deserialized accumulators, or null, if there are no 
accumulators or an error occurred.
+        */
        private Map<String, Accumulator<?, ?>> 
deserializeAccumulators(TaskExecutionState state) {
                AccumulatorSnapshot serializedAccumulators = 
state.getAccumulators();
-               Map<String, Accumulator<?, ?>> accumulators = null;
+
                if (serializedAccumulators != null) {
                        try {
-                               accumulators = 
serializedAccumulators.deserializeUserAccumulators(userClassLoader);
-                       } catch (Exception e) {
-                               LOG.error("Failed to deserialize final 
accumulator results.", e);
+                               return 
serializedAccumulators.deserializeUserAccumulators(userClassLoader);
+                       }
+                       catch (Throwable t) {
+                               // we catch Throwable here to include all forms 
of linking errors that may
+                               // occur if user classes are missing in the 
classpath
+                               LOG.error("Failed to deserialize final 
accumulator results.", t);
                        }
                }
-               return accumulators;
+               return null;
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/874d9565/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IOMetrics.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IOMetrics.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IOMetrics.java
index 82c376e..668418d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IOMetrics.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IOMetrics.java
@@ -25,7 +25,9 @@ import java.io.Serializable;
  * An instance of this class represents a snapshot of the io-related metrics 
of a single task.
  */
 public class IOMetrics implements Serializable {
+
        private static final long serialVersionUID = -7208093607556457183L;
+
        protected long numRecordsIn;
        protected long numRecordsOut;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/874d9565/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/JobInformation.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/JobInformation.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/JobInformation.java
index 6e3c1e8..f497f8c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/JobInformation.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/JobInformation.java
@@ -93,4 +93,12 @@ public class JobInformation implements Serializable {
        public Collection<URL> getRequiredClasspathURLs() {
                return requiredClasspathURLs;
        }
+
+       // 
------------------------------------------------------------------------
+
+
+       @Override
+       public String toString() {
+               return "JobInformation for '" + jobName + "' (" + jobId + ')';
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/874d9565/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java
index 5cc2484..9395435 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionState.java
@@ -49,8 +49,9 @@ public class TaskExecutionState implements Serializable {
 
        private final SerializedThrowable throwable;
 
-       /** Serialized flink and user-defined accumulators */
+       /** Serialized user-defined accumulators */
        private final AccumulatorSnapshot accumulators;
+
        private final IOMetrics ioMetrics;
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/874d9565/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
index 98b4c4d..81162b6 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
@@ -18,14 +18,11 @@
 
 package org.apache.flink.runtime.checkpoint;
 
-import akka.actor.ActorSystem;
-import akka.testkit.JavaTestKit;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
-import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
@@ -37,7 +34,7 @@ import 
org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.util.SerializedValue;
-import org.junit.AfterClass;
+
 import org.junit.Test;
 import org.mockito.Matchers;
 
@@ -50,13 +47,6 @@ import static org.mockito.Mockito.verify;
 
 public class ExecutionGraphCheckpointCoordinatorTest {
 
-       private static ActorSystem system = 
AkkaUtils.createLocalActorSystem(new Configuration());
-
-       @AfterClass
-       public static void teardown() {
-               JavaTestKit.shutdownActorSystem(system);
-       }
-
        /**
         * Tests that a shut down checkpoint coordinator calls shutdown on
         * the store and counter.

http://git-wip-us.apache.org/repos/asf/flink/blob/874d9565/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
index 203c547..5496e35 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
@@ -162,7 +162,7 @@ public class ExecutionGraphMetricsTest extends TestLogger {
                                testingRestartStrategy,
                                Collections.<BlobKey>emptyList(),
                                Collections.<URL>emptyList(),
-                       scheduler,
+                               scheduler,
                                getClass().getClassLoader(),
                                metricGroup);
        

Reply via email to