[hotfix] Minor code cleanups in the ExecutionGraph's Execution
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/10e4e321 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/10e4e321 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/10e4e321 Branch: refs/heads/master Commit: 10e4e321b335b6f9376501f90715e31b71b02da8 Parents: 2e107b1 Author: Stephan Ewen <[email protected]> Authored: Tue Jan 31 17:21:36 2017 +0100 Committer: Stephan Ewen <[email protected]> Committed: Fri Feb 3 10:28:23 2017 +0100 ---------------------------------------------------------------------- .../flink/runtime/executiongraph/Execution.java | 24 ++++++++------------ .../taskmanager/TaskManagerLocation.java | 2 +- 2 files changed, 10 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/10e4e321/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java index 18a4445..c2fe5ea 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java @@ -49,6 +49,7 @@ import org.apache.flink.runtime.messages.StackTraceSampleResponse; import org.apache.flink.runtime.state.TaskStateHandles; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.util.ExceptionUtils; + import org.slf4j.Logger; import java.util.ArrayList; @@ -70,6 +71,7 @@ import static org.apache.flink.runtime.execution.ExecutionState.FINISHED; import static org.apache.flink.runtime.execution.ExecutionState.RUNNING; import static org.apache.flink.runtime.execution.ExecutionState.SCHEDULED; import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; /** * A single execution of a vertex. While an {@link ExecutionVertex} can be executed multiple times (for recovery, @@ -112,7 +114,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution private final Time timeout; - private ConcurrentLinkedQueue<PartialInputChannelDeploymentDescriptor> partialInputChannelDeploymentDescriptors; + private final ConcurrentLinkedQueue<PartialInputChannelDeploymentDescriptor> partialInputChannelDeploymentDescriptors; private volatile ExecutionState state = CREATED; @@ -120,8 +122,6 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution private volatile Throwable failureCause; // once assigned, never changes - private volatile TaskManagerLocation assignedResourceLocation; // for the archived execution - private TaskStateHandles taskStateHandles; /** The executor which is used to execute futures. */ @@ -189,7 +189,8 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution @Override public TaskManagerLocation getAssignedResourceLocation() { - return assignedResourceLocation; + // returns non-null only when a location is already assigned + return assignedResource != null ? assignedResource.getTaskManagerLocation() : null; } public Throwable getFailureCause() { @@ -226,11 +227,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution * @param checkpointStateHandles all checkpointed operator state */ public void setInitialState(TaskStateHandles checkpointStateHandles) { - - if (state != ExecutionState.CREATED) { - throw new IllegalArgumentException("Can only assign operator state when execution attempt is in CREATED"); - } - + checkState(state == CREATED, "Can only assign operator state when execution attempt is in CREATED"); this.taskStateHandles = checkpointStateHandles; } @@ -343,7 +340,6 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution throw new JobException("Could not assign the ExecutionVertex to the slot " + slot); } this.assignedResource = slot; - this.assignedResourceLocation = slot.getTaskManagerLocation(); // race double check, did we fail/cancel and do we need to release the slot? if (this.state != DEPLOYING) { @@ -353,7 +349,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution if (LOG.isInfoEnabled()) { LOG.info(String.format("Deploying %s (attempt #%d) to %s", vertex.getSimpleName(), - attemptNumber, assignedResourceLocation.getHostname())); + attemptNumber, getAssignedResourceLocation().getHostname())); } final TaskDeploymentDescriptor deployment = vertex.createDeploymentDescriptor( @@ -373,12 +369,10 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution @Override public Void apply(Throwable failure) { if (failure instanceof TimeoutException) { - String taskname = vertex.getTaskName() + '(' + - (getParallelSubtaskIndex() + 1) + '/' + - vertex.getTotalNumberOfParallelSubtasks() + ") (" + attemptId + ')'; + String taskname = vertex.getTaskNameWithSubtaskIndex()+ " (" + attemptId + ')'; markFailed(new Exception( - "Cannot deploy task " + taskname + " - TaskManager (" + assignedResourceLocation + "Cannot deploy task " + taskname + " - TaskManager (" + getAssignedResourceLocation() + ") not responding after a timeout of " + timeout, failure)); } else { http://git-wip-us.apache.org/repos/asf/flink/blob/10e4e321/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerLocation.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerLocation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerLocation.java index 01d0654..956a2a2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerLocation.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerLocation.java @@ -219,7 +219,7 @@ public class TaskManagerLocation implements Comparable<TaskManagerLocation>, jav @Override public int compareTo(@Nonnull TaskManagerLocation o) { - // decide based on address first + // decide based on resource ID first int resourceIdCmp = this.resourceID.getResourceIdString().compareTo(o.resourceID.getResourceIdString()); if (resourceIdCmp != 0) { return resourceIdCmp;
