Repository: apex-core Updated Branches: refs/heads/master 04a352b3e -> 9d6408ea4
APEXCORE-663 Restart of the App was failing because container allocation was not handled. There are 2 scenarios to handle: 1. the AppMaster was restarted; 2. the App was restarted. In case 1: compare the list of running containers returned by YARN with the containers known to the Streaming Container Manager, and take the appropriate actions. In case 2 (and also when YARN returns an empty list of running containers): redeploy the whole App. Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/9d6408ea Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/9d6408ea Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/9d6408ea Branch: refs/heads/master Commit: 9d6408ea4c6e26df489a5e5dd1ecfdb9afca6d42 Parents: 04a352b Author: Sandesh Hegde <[email protected]> Authored: Tue Mar 7 23:43:24 2017 -0800 Committer: Sandesh Hegde <[email protected]> Committed: Mon Mar 27 19:16:00 2017 -0700 ---------------------------------------------------------------------- .../stram/StreamingAppMasterService.java | 53 +++++++++++++++++--- .../stram/StreamingContainerManager.java | 24 +++++++++ 2 files changed, 70 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-core/blob/9d6408ea/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java index a885a49..b15c98f 100644 --- a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java +++ b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java @@ -27,6 +27,7 @@ import java.net.InetSocketAddress; import java.net.URI; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.HashMap; import
java.util.Iterator; import java.util.LinkedList; import java.util.List; @@ -720,7 +721,6 @@ public class StreamingAppMasterService extends CompositeService int loopCounter = -1; long nodeReportUpdateTime = 0; - List<ContainerId> releasedContainers = new ArrayList<>(); // keep track of already requested containers to not request them again while waiting for allocation int numRequestedContainers = 0; @@ -761,7 +761,7 @@ public class StreamingAppMasterService extends CompositeService // Running containers might take a while to register with the new app master and send the heartbeat signal. int waitForRecovery = containers.size() > 0 ? dag.getValue(LogicalPlan.HEARTBEAT_TIMEOUT_MILLIS) / 1000 : 0; - previouslyAllocatedContainers(containers); + List<ContainerId> releasedContainers = previouslyAllocatedContainers(containers); FinalApplicationStatus finalStatus = FinalApplicationStatus.SUCCEEDED; final InetSocketAddress rmAddress = conf.getSocketAddr(YarnConfiguration.RM_ADDRESS, YarnConfiguration.DEFAULT_RM_ADDRESS, @@ -1089,13 +1089,52 @@ public class StreamingAppMasterService extends CompositeService * Check for containers that were allocated in a previous attempt. * If the containers are still alive, wait for them to check in via heartbeat. 
*/ - private void previouslyAllocatedContainers(List<Container> containers) + private List<ContainerId> previouslyAllocatedContainers(List<Container> containersListByYarn) { - for (Container container : containers) { - this.allocatedContainers.put(container.getId().toString(), new AllocatedContainer(container)); - //check the status - nmClient.getContainerStatusAsync(container.getId(), container.getNodeId()); + List<ContainerId> containersToRelease = new ArrayList<>(); + + if (containersListByYarn.size() != 0) { + + LOG.debug("Containers list by YARN - {}", containersListByYarn); + LOG.debug("Containers list by Streaming Container Manger - {}", dnmgr.getPhysicalPlan().getContainers()); + + Map<String, Container> fromYarn = new HashMap<>(); + + for (Container container : containersListByYarn) { + fromYarn.put(container.getId().toString(), container); + } + + for (PTContainer ptContainer : dnmgr.getPhysicalPlan().getContainers()) { + + String containerId = ptContainer.getExternalId(); + + // SCM starts the container without external ID. 
+ if (containerId == null) { + continue; + } + + Container container = fromYarn.get(containerId); + + if (container != null) { + allocatedContainers.put(containerId, new AllocatedContainer(container)); + fromYarn.remove(containerId); + } else { + dnmgr.scheduleContainerRestart(containerId); + } + } + + for (Container container : fromYarn.values()) { + containersToRelease.add(container.getId()); + } + + if (fromYarn.size() != 0) { + LOG.info("Containers list returned by YARN, has the following container(s) which are not present in PhysicalPlan {}", fromYarn); + } + } else { + dnmgr.deployAfterRestart(); } + + return containersToRelease; } /** http://git-wip-us.apache.org/repos/asf/apex-core/blob/9d6408ea/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java index 070022f..c68df14 100644 --- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java +++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java @@ -2315,6 +2315,27 @@ public class StreamingContainerManager implements PlanContext } } + public void deployAfterRestart() + { + if (startedFromCheckpoint) { + try { + this.deployChangeInProgress.set(true); + + for (PTContainer c : getPhysicalPlan().getContainers()) { + c.setState(PTContainer.State.NEW); + requestContainer(c); + + for (PTOperator ptOperator : c.getOperators()) { + ptOperator.setState(State.PENDING_DEPLOY); + } + } + } finally { + this.deployChangeCnt++; + this.deployChangeInProgress.set(false); + } + } + } + @Override public void deploy(Set<PTContainer> releaseContainers, Collection<PTOperator> undeploy, Set<PTContainer> startContainers, Collection<PTOperator> deploy) { @@ -3130,6 +3151,7 @@ public class StreamingContainerManager implements PlanContext 
scm.requestContainer(c); } } + scm.startedFromCheckpoint = true; } scm.recoveryHandler = rh; scm.checkpoint(); @@ -3139,6 +3161,8 @@ public class StreamingContainerManager implements PlanContext } } + private boolean startedFromCheckpoint = false; + private static class FinalVars implements java.io.Serializable { private static final long serialVersionUID = 3827310557521807024L;
