Repository: incubator-reef Updated Branches: refs/heads/master 8a6e65585 -> 4aeef3e9e
[REEF-632] Register containers with YarnContainerManager on restart This adds `YarnContainerManager.onContainersRecovered()` and calls it on recovering containers in YarnDriverRuntimeRestartManager. JIRA: [REEF-632](https://issues.apache.org/jira/browse/REEF-632) Pull Request: This closes #405 Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/4aeef3e9 Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/4aeef3e9 Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/4aeef3e9 Branch: refs/heads/master Commit: 4aeef3e9e4be2c2e60ffb735b6bdd91d040c0642 Parents: 8a6e655 Author: Andrew Chung <[email protected]> Authored: Fri Aug 21 23:29:05 2015 -0700 Committer: Markus Weimer <[email protected]> Committed: Mon Aug 24 09:13:31 2015 -0700 ---------------------------------------------------------------------- .../runtime/yarn/driver/YarnContainerManager.java | 10 ++++++++++ .../driver/YarnDriverRuntimeRestartManager.java | 17 +++++++++++++---- 2 files changed, 23 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/4aeef3e9/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerManager.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerManager.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerManager.java index 1f51614..a1afe94 100644 --- a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerManager.java +++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerManager.java @@ -199,6 +199,16 @@ final class YarnContainerManager } /** + * Called by {@link YarnDriverRuntimeRestartManager} to record recovered containers + * such that containers can be released properly on unrecoverable containers. + */ + public void onContainersRecovered(final Set<Container> recoveredContainers) { + for (final Container container : recoveredContainers) { + containers.add(container); + } + } + + /** * Submit the given launchContext to the given container. */ void submit(final Container container, final ContainerLaunchContext launchContext) { http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/4aeef3e9/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverRuntimeRestartManager.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverRuntimeRestartManager.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverRuntimeRestartManager.java index 5a51ca0..d076e2d 100644 --- a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverRuntimeRestartManager.java +++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverRuntimeRestartManager.java @@ -55,16 +55,20 @@ public final class YarnDriverRuntimeRestartManager implements DriverRuntimeResta private final EvaluatorPreserver evaluatorPreserver; private final ApplicationMasterRegistration registration; private final REEFEventHandlers reefEventHandlers; + private final YarnContainerManager yarnContainerManager; + private Set<Container> previousContainers; @Inject private YarnDriverRuntimeRestartManager(@Parameter(YarnEvaluatorPreserver.class) final EvaluatorPreserver evaluatorPreserver, final REEFEventHandlers reefEventHandlers, - final ApplicationMasterRegistration registration){ + final ApplicationMasterRegistration registration, + final YarnContainerManager yarnContainerManager) { this.registration = registration; this.evaluatorPreserver = evaluatorPreserver; this.reefEventHandlers = reefEventHandlers; + this.yarnContainerManager = yarnContainerManager; this.previousContainers = null; } @@ -132,12 +136,17 @@ public final class YarnDriverRuntimeRestartManager implements DriverRuntimeResta */ private synchronized void initializeListOfPreviousContainers() { if (this.previousContainers == null) { - this.previousContainers = new HashSet<>(this.registration.getRegistration().getContainersFromPreviousAttempts()); + final List<Container> yarnPrevContainers = + this.registration.getRegistration().getContainersFromPreviousAttempts(); // If it's still null, create an empty list to indicate that it's not a restart. - if (this.previousContainers == null) { - this.previousContainers = new HashSet<>(); + if (yarnPrevContainers == null) { + this.previousContainers = Collections.unmodifiableSet(new HashSet<Container>()); + } else { + this.previousContainers = Collections.unmodifiableSet(new HashSet<>(yarnPrevContainers)); } + + yarnContainerManager.onContainersRecovered(this.previousContainers); } }
