[hotfix] [jobmanager] Cleanups in the ExecutionGraph - Making fields final where possible - Making fields volatile where needed or advisable - Remove some dead code/functionality
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fe4fe587 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fe4fe587 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fe4fe587 Branch: refs/heads/master Commit: fe4fe5872883f3de362c4d6864b21a66bcbf5d4e Parents: 4820b41 Author: Stephan Ewen <[email protected]> Authored: Tue Jan 31 19:55:59 2017 +0100 Committer: Stephan Ewen <[email protected]> Committed: Fri Feb 3 10:28:23 2017 +0100 ---------------------------------------------------------------------- .../flink/runtime/executiongraph/Execution.java | 33 +++++++++++--------- .../runtime/executiongraph/ExecutionVertex.java | 19 ++--------- .../runtime/jobmanager/scheduler/Scheduler.java | 5 +-- 3 files changed, 24 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/fe4fe587/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java index c2fe5ea..e29e5b6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java @@ -104,8 +104,13 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution // -------------------------------------------------------------------------------------------- + /** The executor which is used to execute futures. */ + private final Executor executor; + + /** The execution vertex whose task this execution executes */ private final ExecutionVertex vertex; + /** The unique ID marking the specific execution instant of the task */ private final ExecutionAttemptID attemptId; private final long[] stateTimestamps; @@ -122,41 +127,39 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution private volatile Throwable failureCause; // once assigned, never changes - private TaskStateHandles taskStateHandles; + /** The handle to the state that the task gets on restore */ + private volatile TaskStateHandles taskState; - /** The executor which is used to execute futures. */ - private Executor executor; + // ------------------------ Accumulators & Metrics ------------------------ - // ------------------------- Accumulators --------------------------------- - - /* Lock for updating the accumulators atomically. Prevents final accumulators to be overwritten - * by partial accumulators on a late heartbeat*/ + /** Lock for updating the accumulators atomically. + * Prevents final accumulators to be overwritten by partial accumulators on a late heartbeat */ private final Object accumulatorLock = new Object(); /* Continuously updated map of user-defined accumulators */ private volatile Map<String, Accumulator<?, ?>> userAccumulators; - private IOMetrics ioMetrics; + + private volatile IOMetrics ioMetrics; // -------------------------------------------------------------------------------------------- - + public Execution( Executor executor, ExecutionVertex vertex, int attemptNumber, long startTimestamp, Time timeout) { - this.executor = checkNotNull(executor); + this.executor = checkNotNull(executor); this.vertex = checkNotNull(vertex); this.attemptId = new ExecutionAttemptID(); + this.timeout = checkNotNull(timeout); this.attemptNumber = attemptNumber; this.stateTimestamps = new long[ExecutionState.values().length]; markTimestamp(ExecutionState.CREATED, startTimestamp); - this.timeout = checkNotNull(timeout); - this.partialInputChannelDeploymentDescriptors = new ConcurrentLinkedQueue<>(); } @@ -217,7 +220,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution } public TaskStateHandles getTaskStateHandles() { - return taskStateHandles; + return taskState; } /** @@ -228,7 +231,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution */ public void setInitialState(TaskStateHandles checkpointStateHandles) { checkState(state == CREATED, "Can only assign operator state when execution attempt is in CREATED"); - this.taskStateHandles = checkpointStateHandles; + this.taskState = checkpointStateHandles; } // -------------------------------------------------------------------------------------------- @@ -355,7 +358,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution final TaskDeploymentDescriptor deployment = vertex.createDeploymentDescriptor( attemptId, slot, - taskStateHandles, + taskState, attemptNumber); // register this execution at the execution graph, to receive call backs http://git-wip-us.apache.org/repos/asf/flink/blob/fe4fe587/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java index d840d89..0bb3514 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java @@ -77,9 +77,9 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi private final ExecutionJobVertex jobVertex; - private Map<IntermediateResultPartitionID, IntermediateResultPartition> resultPartitions; + private final Map<IntermediateResultPartitionID, IntermediateResultPartition> resultPartitions; - private ExecutionEdge[][] inputEdges; + private final ExecutionEdge[][] inputEdges; private final int subTaskIndex; @@ -92,10 +92,9 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi private volatile CoLocationConstraint locationConstraint; + /** The current or latest execution attempt of this vertex's task */ private volatile Execution currentExecution; // this field must never be null - private volatile boolean scheduleLocalOnly; - // -------------------------------------------------------------------------------------------- public ExecutionVertex( @@ -398,18 +397,6 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi } } - public void setScheduleLocalOnly(boolean scheduleLocalOnly) { - if (scheduleLocalOnly && inputEdges != null && inputEdges.length > 0) { - throw new IllegalArgumentException("Strictly local scheduling is only supported for sources."); - } - - this.scheduleLocalOnly = scheduleLocalOnly; - } - - public boolean isScheduleLocalOnly() { - return scheduleLocalOnly; - } - /** * Gets the location preferences of this task, determined by the locations of the predecessors from which * it receives input data. http://git-wip-us.apache.org/repos/asf/flink/blob/fe4fe587/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java index aa09314..466a148 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java @@ -153,9 +153,10 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl return FlinkCompletableFuture.completed((SimpleSlot) ret); } else if (ret instanceof Future) { - return (Future) ret; + return (Future<SimpleSlot>) ret; } else { + // this should never happen, simply guard this case with an exception throw new RuntimeException(); } } @@ -174,7 +175,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl final ExecutionVertex vertex = task.getTaskToExecute().getVertex(); final Iterable<TaskManagerLocation> preferredLocations = vertex.getPreferredLocations(); - final boolean forceExternalLocation = vertex.isScheduleLocalOnly() && + final boolean forceExternalLocation = false && preferredLocations != null && preferredLocations.iterator().hasNext(); synchronized (globalLock) {
