Repository: incubator-reef
Updated Branches:
  refs/heads/master 8a6e65585 -> 4aeef3e9e


[REEF-632] Register containers with YarnContainerManager on restart

This adds `YarnContainerManager.onContainersRecovered()` and calls it on
recovering containers in YarnDriverRuntimeRestartManager.

JIRA:
  [REEF-632](https://issues.apache.org/jira/browse/REEF-632)

Pull Request:
  This closes #405


Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/4aeef3e9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/4aeef3e9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/4aeef3e9

Branch: refs/heads/master
Commit: 4aeef3e9e4be2c2e60ffb735b6bdd91d040c0642
Parents: 8a6e655
Author: Andrew Chung <[email protected]>
Authored: Fri Aug 21 23:29:05 2015 -0700
Committer: Markus Weimer <[email protected]>
Committed: Mon Aug 24 09:13:31 2015 -0700

----------------------------------------------------------------------
 .../runtime/yarn/driver/YarnContainerManager.java  | 10 ++++++++++
 .../driver/YarnDriverRuntimeRestartManager.java    | 17 +++++++++++++----
 2 files changed, 23 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/4aeef3e9/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerManager.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerManager.java
 
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerManager.java
index 1f51614..a1afe94 100644
--- 
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerManager.java
+++ 
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerManager.java
@@ -199,6 +199,16 @@ final class YarnContainerManager
   }
 
   /**
+   * Called by {@link YarnDriverRuntimeRestartManager} to record recovered 
containers
+   * such that containers can be released properly on unrecoverable containers.
+   */
+  public void onContainersRecovered(final Set<Container> recoveredContainers) {
+    for (final Container container : recoveredContainers) {
+      containers.add(container);
+    }
+  }
+
+  /**
    * Submit the given launchContext to the given container.
    */
   void submit(final Container container, final ContainerLaunchContext 
launchContext) {

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/4aeef3e9/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverRuntimeRestartManager.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverRuntimeRestartManager.java
 
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverRuntimeRestartManager.java
index 5a51ca0..d076e2d 100644
--- 
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverRuntimeRestartManager.java
+++ 
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnDriverRuntimeRestartManager.java
@@ -55,16 +55,20 @@ public final class YarnDriverRuntimeRestartManager 
implements DriverRuntimeResta
   private final EvaluatorPreserver evaluatorPreserver;
   private final ApplicationMasterRegistration registration;
   private final REEFEventHandlers reefEventHandlers;
+  private final YarnContainerManager yarnContainerManager;
+
   private Set<Container> previousContainers;
 
   @Inject
   private 
YarnDriverRuntimeRestartManager(@Parameter(YarnEvaluatorPreserver.class)
                                           final EvaluatorPreserver 
evaluatorPreserver,
                                           final REEFEventHandlers 
reefEventHandlers,
-                                          final ApplicationMasterRegistration 
registration){
+                                          final ApplicationMasterRegistration 
registration,
+                                          final YarnContainerManager 
yarnContainerManager) {
     this.registration = registration;
     this.evaluatorPreserver = evaluatorPreserver;
     this.reefEventHandlers = reefEventHandlers;
+    this.yarnContainerManager = yarnContainerManager;
     this.previousContainers = null;
   }
 
@@ -132,12 +136,17 @@ public final class YarnDriverRuntimeRestartManager 
implements DriverRuntimeResta
    */
   private synchronized void initializeListOfPreviousContainers() {
     if (this.previousContainers == null) {
-      this.previousContainers = new 
HashSet<>(this.registration.getRegistration().getContainersFromPreviousAttempts());
+      final List<Container> yarnPrevContainers =
+          
this.registration.getRegistration().getContainersFromPreviousAttempts();
 
       // If it's still null, create an empty list to indicate that it's not a 
restart.
-      if (this.previousContainers == null) {
-        this.previousContainers = new HashSet<>();
+      if (yarnPrevContainers == null) {
+        this.previousContainers = Collections.unmodifiableSet(new 
HashSet<Container>());
+      } else {
+        this.previousContainers = Collections.unmodifiableSet(new 
HashSet<>(yarnPrevContainers));
       }
+
+      yarnContainerManager.onContainersRecovered(this.previousContainers);
     }
   }
 

Reply via email to