Repository: apex-core
Updated Branches:
  refs/heads/master 04a352b3e -> 9d6408ea4


APEXCORE-663 Restart of the App was failing because container allocation 
was not handled. There are 2 scenarios to handle:
 1. AppMaster was restarted
 2. App was restarted

In case of 1: Compare the list of running containers returned by YARN with 
the containers known to the Streaming Container Manager, and take the 
appropriate action for each.

In case of 2 (also when YARN returns an empty list of running containers): 
redeploy the whole App.


Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/9d6408ea
Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/9d6408ea
Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/9d6408ea

Branch: refs/heads/master
Commit: 9d6408ea4c6e26df489a5e5dd1ecfdb9afca6d42
Parents: 04a352b
Author: Sandesh Hegde <[email protected]>
Authored: Tue Mar 7 23:43:24 2017 -0800
Committer: Sandesh Hegde <[email protected]>
Committed: Mon Mar 27 19:16:00 2017 -0700

----------------------------------------------------------------------
 .../stram/StreamingAppMasterService.java        | 53 +++++++++++++++++---
 .../stram/StreamingContainerManager.java        | 24 +++++++++
 2 files changed, 70 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-core/blob/9d6408ea/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
----------------------------------------------------------------------
diff --git 
a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java 
b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
index a885a49..b15c98f 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
@@ -27,6 +27,7 @@ import java.net.InetSocketAddress;
 import java.net.URI;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
@@ -720,7 +721,6 @@ public class StreamingAppMasterService extends 
CompositeService
 
     int loopCounter = -1;
     long nodeReportUpdateTime = 0;
-    List<ContainerId> releasedContainers = new ArrayList<>();
 
     // keep track of already requested containers to not request them again 
while waiting for allocation
     int numRequestedContainers = 0;
@@ -761,7 +761,7 @@ public class StreamingAppMasterService extends 
CompositeService
     // Running containers might take a while to register with the new app 
master and send the heartbeat signal.
     int waitForRecovery = containers.size() > 0 ? 
dag.getValue(LogicalPlan.HEARTBEAT_TIMEOUT_MILLIS) / 1000 : 0;
 
-    previouslyAllocatedContainers(containers);
+    List<ContainerId> releasedContainers = 
previouslyAllocatedContainers(containers);
     FinalApplicationStatus finalStatus = FinalApplicationStatus.SUCCEEDED;
     final InetSocketAddress rmAddress = 
conf.getSocketAddr(YarnConfiguration.RM_ADDRESS,
         YarnConfiguration.DEFAULT_RM_ADDRESS,
@@ -1089,13 +1089,52 @@ public class StreamingAppMasterService extends 
CompositeService
    * Check for containers that were allocated in a previous attempt.
    * If the containers are still alive, wait for them to check in via 
heartbeat.
    */
-  private void previouslyAllocatedContainers(List<Container> containers)
+  private List<ContainerId> previouslyAllocatedContainers(List<Container> 
containersListByYarn)
   {
-    for (Container container : containers) {
-      this.allocatedContainers.put(container.getId().toString(), new 
AllocatedContainer(container));
-      //check the status
-      nmClient.getContainerStatusAsync(container.getId(), 
container.getNodeId());
+    List<ContainerId> containersToRelease = new ArrayList<>();
+
+    if (containersListByYarn.size() != 0) {
+
+      LOG.debug("Containers list by YARN - {}", containersListByYarn);
+      LOG.debug("Containers list by Streaming Container Manger - {}", 
dnmgr.getPhysicalPlan().getContainers());
+
+      Map<String, Container> fromYarn = new HashMap<>();
+
+      for (Container container : containersListByYarn) {
+        fromYarn.put(container.getId().toString(), container);
+      }
+
+      for (PTContainer ptContainer : dnmgr.getPhysicalPlan().getContainers()) {
+
+        String containerId = ptContainer.getExternalId();
+
+        // SCM starts the container without external ID.
+        if (containerId == null) {
+          continue;
+        }
+
+        Container container = fromYarn.get(containerId);
+
+        if (container != null) {
+          allocatedContainers.put(containerId, new 
AllocatedContainer(container));
+          fromYarn.remove(containerId);
+        } else {
+          dnmgr.scheduleContainerRestart(containerId);
+        }
+      }
+
+      for (Container container : fromYarn.values()) {
+        containersToRelease.add(container.getId());
+      }
+
+      if (fromYarn.size() != 0) {
+        LOG.info("Containers list returned by YARN, has the following 
container(s) which are not present in PhysicalPlan {}", fromYarn);
+      }
+    } else {
+      dnmgr.deployAfterRestart();
     }
+
+    return containersToRelease;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/apex-core/blob/9d6408ea/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
----------------------------------------------------------------------
diff --git 
a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java 
b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
index 070022f..c68df14 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
@@ -2315,6 +2315,27 @@ public class StreamingContainerManager implements 
PlanContext
     }
   }
 
+  public void deployAfterRestart()
+  {
+    if (startedFromCheckpoint) {
+      try {
+        this.deployChangeInProgress.set(true);
+
+        for (PTContainer c : getPhysicalPlan().getContainers()) {
+          c.setState(PTContainer.State.NEW);
+          requestContainer(c);
+
+          for (PTOperator ptOperator : c.getOperators()) {
+            ptOperator.setState(State.PENDING_DEPLOY);
+          }
+        }
+      } finally {
+        this.deployChangeCnt++;
+        this.deployChangeInProgress.set(false);
+      }
+    }
+  }
+
   @Override
   public void deploy(Set<PTContainer> releaseContainers, 
Collection<PTOperator> undeploy, Set<PTContainer> startContainers, 
Collection<PTOperator> deploy)
   {
@@ -3130,6 +3151,7 @@ public class StreamingContainerManager implements 
PlanContext
             scm.requestContainer(c);
           }
         }
+        scm.startedFromCheckpoint = true;
       }
       scm.recoveryHandler = rh;
       scm.checkpoint();
@@ -3139,6 +3161,8 @@ public class StreamingContainerManager implements 
PlanContext
     }
   }
 
+  private boolean startedFromCheckpoint = false;
+
   private static class FinalVars implements java.io.Serializable
   {
     private static final long serialVersionUID = 3827310557521807024L;

Reply via email to