[hotfix] [jobmanager] Cleanups in the ExecutionGraph

  - Making fields final where possible
  - Making fields volatile where needed or advisable
  - Remove some dead code/functionality


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fe4fe587
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fe4fe587
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fe4fe587

Branch: refs/heads/master
Commit: fe4fe5872883f3de362c4d6864b21a66bcbf5d4e
Parents: 4820b41
Author: Stephan Ewen <[email protected]>
Authored: Tue Jan 31 19:55:59 2017 +0100
Committer: Stephan Ewen <[email protected]>
Committed: Fri Feb 3 10:28:23 2017 +0100

----------------------------------------------------------------------
 .../flink/runtime/executiongraph/Execution.java | 33 +++++++++++---------
 .../runtime/executiongraph/ExecutionVertex.java | 19 ++---------
 .../runtime/jobmanager/scheduler/Scheduler.java |  5 +--
 3 files changed, 24 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/fe4fe587/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index c2fe5ea..e29e5b6 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -104,8 +104,13 @@ public class Execution implements AccessExecution, 
Archiveable<ArchivedExecution
 
        // 
--------------------------------------------------------------------------------------------
 
+       /** The executor which is used to execute futures. */
+       private final Executor executor;
+
+       /** The execution vertex whose task this execution executes */ 
        private final ExecutionVertex vertex;
 
+       /** The unique ID marking the specific execution instant of the task */
        private final ExecutionAttemptID attemptId;
 
        private final long[] stateTimestamps;
@@ -122,41 +127,39 @@ public class Execution implements AccessExecution, 
Archiveable<ArchivedExecution
 
        private volatile Throwable failureCause;          // once assigned, 
never changes
 
-       private TaskStateHandles taskStateHandles;
+       /** The handle to the state that the task gets on restore */
+       private volatile TaskStateHandles taskState;
 
-       /** The executor which is used to execute futures. */
-       private Executor executor;
+       // ------------------------ Accumulators & Metrics 
------------------------
 
-       // ------------------------- Accumulators 
---------------------------------
-       
-       /* Lock for updating the accumulators atomically. Prevents final 
accumulators to be overwritten
-       * by partial accumulators on a late heartbeat*/
+       /** Lock for updating the accumulators atomically.
+        * Prevents final accumulators to be overwritten by partial 
accumulators on a late heartbeat */
        private final Object accumulatorLock = new Object();
 
        /* Continuously updated map of user-defined accumulators */
        private volatile Map<String, Accumulator<?, ?>> userAccumulators;
-       private IOMetrics ioMetrics;
+
+       private volatile IOMetrics ioMetrics;
 
        // 
--------------------------------------------------------------------------------------------
-       
+
        public Execution(
                        Executor executor,
                        ExecutionVertex vertex,
                        int attemptNumber,
                        long startTimestamp,
                        Time timeout) {
-               this.executor = checkNotNull(executor);
 
+               this.executor = checkNotNull(executor);
                this.vertex = checkNotNull(vertex);
                this.attemptId = new ExecutionAttemptID();
+               this.timeout = checkNotNull(timeout);
 
                this.attemptNumber = attemptNumber;
 
                this.stateTimestamps = new long[ExecutionState.values().length];
                markTimestamp(ExecutionState.CREATED, startTimestamp);
 
-               this.timeout = checkNotNull(timeout);
-
                this.partialInputChannelDeploymentDescriptors = new 
ConcurrentLinkedQueue<>();
        }
 
@@ -217,7 +220,7 @@ public class Execution implements AccessExecution, 
Archiveable<ArchivedExecution
        }
 
        public TaskStateHandles getTaskStateHandles() {
-               return taskStateHandles;
+               return taskState;
        }
 
        /**
@@ -228,7 +231,7 @@ public class Execution implements AccessExecution, 
Archiveable<ArchivedExecution
         */
        public void setInitialState(TaskStateHandles checkpointStateHandles) {
                checkState(state == CREATED, "Can only assign operator state 
when execution attempt is in CREATED");
-               this.taskStateHandles = checkpointStateHandles;
+               this.taskState = checkpointStateHandles;
        }
 
        // 
--------------------------------------------------------------------------------------------
@@ -355,7 +358,7 @@ public class Execution implements AccessExecution, 
Archiveable<ArchivedExecution
                        final TaskDeploymentDescriptor deployment = 
vertex.createDeploymentDescriptor(
                                attemptId,
                                slot,
-                               taskStateHandles,
+                               taskState,
                                attemptNumber);
 
                        // register this execution at the execution graph, to 
receive call backs

http://git-wip-us.apache.org/repos/asf/flink/blob/fe4fe587/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index d840d89..0bb3514 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -77,9 +77,9 @@ public class ExecutionVertex implements 
AccessExecutionVertex, Archiveable<Archi
 
        private final ExecutionJobVertex jobVertex;
 
-       private Map<IntermediateResultPartitionID, IntermediateResultPartition> 
resultPartitions;
+       private final Map<IntermediateResultPartitionID, 
IntermediateResultPartition> resultPartitions;
 
-       private ExecutionEdge[][] inputEdges;
+       private final ExecutionEdge[][] inputEdges;
 
        private final int subTaskIndex;
 
@@ -92,10 +92,9 @@ public class ExecutionVertex implements 
AccessExecutionVertex, Archiveable<Archi
 
        private volatile CoLocationConstraint locationConstraint;
 
+       /** The current or latest execution attempt of this vertex's task */
        private volatile Execution currentExecution;    // this field must 
never be null
 
-       private volatile boolean scheduleLocalOnly;
-
        // 
--------------------------------------------------------------------------------------------
 
        public ExecutionVertex(
@@ -398,18 +397,6 @@ public class ExecutionVertex implements 
AccessExecutionVertex, Archiveable<Archi
                }
        }
 
-       public void setScheduleLocalOnly(boolean scheduleLocalOnly) {
-               if (scheduleLocalOnly && inputEdges != null && 
inputEdges.length > 0) {
-                       throw new IllegalArgumentException("Strictly local 
scheduling is only supported for sources.");
-               }
-
-               this.scheduleLocalOnly = scheduleLocalOnly;
-       }
-
-       public boolean isScheduleLocalOnly() {
-               return scheduleLocalOnly;
-       }
-
        /**
         * Gets the location preferences of this task, determined by the 
locations of the predecessors from which
         * it receives input data.

http://git-wip-us.apache.org/repos/asf/flink/blob/fe4fe587/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
index aa09314..466a148 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
@@ -153,9 +153,10 @@ public class Scheduler implements InstanceListener, 
SlotAvailabilityListener, Sl
                        return FlinkCompletableFuture.completed((SimpleSlot) 
ret);
                }
                else if (ret instanceof Future) {
-                       return (Future) ret;
+                       return (Future<SimpleSlot>) ret;
                }
                else {
+                       // this should never happen, simply guard this case 
with an exception
                        throw new RuntimeException();
                }
        }
@@ -174,7 +175,7 @@ public class Scheduler implements InstanceListener, 
SlotAvailabilityListener, Sl
                final ExecutionVertex vertex = 
task.getTaskToExecute().getVertex();
                
                final Iterable<TaskManagerLocation> preferredLocations = 
vertex.getPreferredLocations();
-               final boolean forceExternalLocation = 
vertex.isScheduleLocalOnly() &&
+               final boolean forceExternalLocation = false &&
                                                                        
preferredLocations != null && preferredLocations.iterator().hasNext();
        
                synchronized (globalLock) {

Reply via email to