Repository: flink
Updated Branches:
  refs/heads/master d52d006c6 -> 7b1857d84


[FLINK-5499] [JobManager] Reuse the resource location of prior execution 
attempt in allocating slot

This closes #3125


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2e107b1c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2e107b1c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2e107b1c

Branch: refs/heads/master
Commit: 2e107b1cfaa6e31fe478191c74aa25d53ab49943
Parents: d52d006
Author: 淘江 <[email protected]>
Authored: Mon Jan 16 17:28:19 2017 +0800
Committer: Stephan Ewen <[email protected]>
Committed: Fri Feb 3 10:28:22 2017 +0100

----------------------------------------------------------------------
 .../runtime/executiongraph/ExecutionVertex.java    | 17 +++++++++++++++++
 .../apache/flink/runtime/instance/SlotPool.java    |  4 ++--
 2 files changed, 19 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2e107b1c/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index 6084ad6..d840d89 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -264,6 +264,23 @@ public class ExecutionVertex implements 
AccessExecutionVertex, Archiveable<Archi
                }
        }
 
+       /**
+        * Returns the resource location of the most recent prior execution that has one assigned.
+        *
+        * @return A list holding at most that one TaskManagerLocation; empty if none is found
+        */
+       public List<TaskManagerLocation> getPriorAssignedResourceLocations() {
+               List<TaskManagerLocation> list = new ArrayList<>();
+               for (int i = priorExecutions.size() - 1 ; i >= 0 ; i--) {
+                       Execution prior = priorExecutions.get(i) ;
+                       if (prior.getAssignedResourceLocation() != null) {
+                               list.add(prior.getAssignedResourceLocation());
+                               break;
+                       }
+               }
+               return list;
+       }
+
        EvictingBoundedList<Execution> getCopyOfPriorExecutionsList() {
                synchronized (priorExecutions) {
                        return new EvictingBoundedList<>(priorExecutions);

http://git-wip-us.apache.org/repos/asf/flink/blob/2e107b1c/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
index 2a9aca7..6fac3c8 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
@@ -984,8 +984,8 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> {
 
                @Override
                public Future<SimpleSlot> allocateSlot(ScheduledUnit task, 
boolean allowQueued) {
-                       return gateway.allocateSlot(
-                                       task, ResourceProfile.UNKNOWN, 
Collections.<TaskManagerLocation>emptyList(), timeout);
+                       return gateway.allocateSlot(task, 
ResourceProfile.UNKNOWN,
+                                       
task.getTaskToExecute().getVertex().getPriorAssignedResourceLocations(), 
timeout);
                }
        }
 

Reply via email to