Repository: flink Updated Branches: refs/heads/master d52d006c6 -> 7b1857d84
[FLINK-5499] [JobManager] Reuse the resource location of prior execution attempt in allocating slot This closes #3125 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2e107b1c Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2e107b1c Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2e107b1c Branch: refs/heads/master Commit: 2e107b1cfaa6e31fe478191c74aa25d53ab49943 Parents: d52d006 Author: 淘江 <[email protected]> Authored: Mon Jan 16 17:28:19 2017 +0800 Committer: Stephan Ewen <[email protected]> Committed: Fri Feb 3 10:28:22 2017 +0100 ---------------------------------------------------------------------- .../runtime/executiongraph/ExecutionVertex.java | 17 +++++++++++++++++ .../apache/flink/runtime/instance/SlotPool.java | 4 ++-- 2 files changed, 19 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/2e107b1c/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java index 6084ad6..d840d89 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java @@ -264,6 +264,23 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi } } + /** + * Just return the last assigned resource location if found + * + * @return The collection of TaskManagerLocation + */ + public List<TaskManagerLocation> getPriorAssignedResourceLocations() { + List<TaskManagerLocation> list = new ArrayList<>(); + for (int i = priorExecutions.size() - 1 ; i >= 0 ; i--) { + 
Execution prior = priorExecutions.get(i) ; + if (prior.getAssignedResourceLocation() != null) { + list.add(prior.getAssignedResourceLocation()); + break; + } + } + return list; + } + EvictingBoundedList<Execution> getCopyOfPriorExecutionsList() { synchronized (priorExecutions) { return new EvictingBoundedList<>(priorExecutions); http://git-wip-us.apache.org/repos/asf/flink/blob/2e107b1c/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java index 2a9aca7..6fac3c8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java @@ -984,8 +984,8 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> { @Override public Future<SimpleSlot> allocateSlot(ScheduledUnit task, boolean allowQueued) { - return gateway.allocateSlot( - task, ResourceProfile.UNKNOWN, Collections.<TaskManagerLocation>emptyList(), timeout); + return gateway.allocateSlot(task, ResourceProfile.UNKNOWN, + task.getTaskToExecute().getVertex().getPriorAssignedResourceLocations(), timeout); } }
