[hotfix] Minor code cleanups in the ExecutionGraph's Execution

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/10e4e321
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/10e4e321
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/10e4e321

Branch: refs/heads/master
Commit: 10e4e321b335b6f9376501f90715e31b71b02da8
Parents: 2e107b1
Author: Stephan Ewen <[email protected]>
Authored: Tue Jan 31 17:21:36 2017 +0100
Committer: Stephan Ewen <[email protected]>
Committed: Fri Feb 3 10:28:23 2017 +0100

----------------------------------------------------------------------
 .../flink/runtime/executiongraph/Execution.java | 24 ++++++++------------
 .../taskmanager/TaskManagerLocation.java        |  2 +-
 2 files changed, 10 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/10e4e321/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 18a4445..c2fe5ea 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -49,6 +49,7 @@ import 
org.apache.flink.runtime.messages.StackTraceSampleResponse;
 import org.apache.flink.runtime.state.TaskStateHandles;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.util.ExceptionUtils;
+
 import org.slf4j.Logger;
 
 import java.util.ArrayList;
@@ -70,6 +71,7 @@ import static 
org.apache.flink.runtime.execution.ExecutionState.FINISHED;
 import static org.apache.flink.runtime.execution.ExecutionState.RUNNING;
 import static org.apache.flink.runtime.execution.ExecutionState.SCHEDULED;
 import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
 
 /**
  * A single execution of a vertex. While an {@link ExecutionVertex} can be 
executed multiple times (for recovery,
@@ -112,7 +114,7 @@ public class Execution implements AccessExecution, 
Archiveable<ArchivedExecution
 
        private final Time timeout;
 
-       private ConcurrentLinkedQueue<PartialInputChannelDeploymentDescriptor> 
partialInputChannelDeploymentDescriptors;
+       private final 
ConcurrentLinkedQueue<PartialInputChannelDeploymentDescriptor> 
partialInputChannelDeploymentDescriptors;
 
        private volatile ExecutionState state = CREATED;
 
@@ -120,8 +122,6 @@ public class Execution implements AccessExecution, 
Archiveable<ArchivedExecution
 
        private volatile Throwable failureCause;          // once assigned, 
never changes
 
-       private volatile TaskManagerLocation assignedResourceLocation; // for 
the archived execution
-
        private TaskStateHandles taskStateHandles;
 
        /** The executor which is used to execute futures. */
@@ -189,7 +189,8 @@ public class Execution implements AccessExecution, 
Archiveable<ArchivedExecution
 
        @Override
        public TaskManagerLocation getAssignedResourceLocation() {
-               return assignedResourceLocation;
+               // returns non-null only when a location is already assigned
+               return assignedResource != null ? 
assignedResource.getTaskManagerLocation() : null;
        }
 
        public Throwable getFailureCause() {
@@ -226,11 +227,7 @@ public class Execution implements AccessExecution, 
Archiveable<ArchivedExecution
         * @param checkpointStateHandles all checkpointed operator state
         */
        public void setInitialState(TaskStateHandles checkpointStateHandles) {
-
-               if (state != ExecutionState.CREATED) {
-                       throw new IllegalArgumentException("Can only assign 
operator state when execution attempt is in CREATED");
-               }
-
+               checkState(state == CREATED, "Can only assign operator state 
when execution attempt is in CREATED");
                this.taskStateHandles = checkpointStateHandles;
        }
 
@@ -343,7 +340,6 @@ public class Execution implements AccessExecution, 
Archiveable<ArchivedExecution
                                throw new JobException("Could not assign the 
ExecutionVertex to the slot " + slot);
                        }
                        this.assignedResource = slot;
-                       this.assignedResourceLocation = 
slot.getTaskManagerLocation();
 
                        // race double check, did we fail/cancel and do we need 
to release the slot?
                        if (this.state != DEPLOYING) {
@@ -353,7 +349,7 @@ public class Execution implements AccessExecution, 
Archiveable<ArchivedExecution
 
                        if (LOG.isInfoEnabled()) {
                                LOG.info(String.format("Deploying %s (attempt 
#%d) to %s", vertex.getSimpleName(),
-                                               attemptNumber, 
assignedResourceLocation.getHostname()));
+                                               attemptNumber, 
getAssignedResourceLocation().getHostname()));
                        }
 
                        final TaskDeploymentDescriptor deployment = 
vertex.createDeploymentDescriptor(
@@ -373,12 +369,10 @@ public class Execution implements AccessExecution, 
Archiveable<ArchivedExecution
                                @Override
                                public Void apply(Throwable failure) {
                                        if (failure instanceof 
TimeoutException) {
-                                               String taskname = 
vertex.getTaskName() + '(' +
-                                                       
(getParallelSubtaskIndex() + 1) + '/' +
-                                                       
vertex.getTotalNumberOfParallelSubtasks() + ") (" + attemptId + ')';
+                                               String taskname = 
vertex.getTaskNameWithSubtaskIndex()+ " (" + attemptId + ')';
 
                                                markFailed(new Exception(
-                                                       "Cannot deploy task " + 
taskname + " - TaskManager (" + assignedResourceLocation
+                                                       "Cannot deploy task " + 
taskname + " - TaskManager (" + getAssignedResourceLocation()
                                                                + ") not 
responding after a timeout of " + timeout, failure));
                                        }
                                        else {

http://git-wip-us.apache.org/repos/asf/flink/blob/10e4e321/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerLocation.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerLocation.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerLocation.java
index 01d0654..956a2a2 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerLocation.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerLocation.java
@@ -219,7 +219,7 @@ public class TaskManagerLocation implements 
Comparable<TaskManagerLocation>, jav
 
        @Override
        public int compareTo(@Nonnull TaskManagerLocation o) {
-               // decide based on address first
+               // decide based on resource ID first
                int resourceIdCmp = 
this.resourceID.getResourceIdString().compareTo(o.resourceID.getResourceIdString());
                if (resourceIdCmp != 0) {
                        return resourceIdCmp;

Reply via email to