Repository: apex-core Updated Branches: refs/heads/release-3.3 232eba368 -> 8b359fc58
APEXCORE-624 decrement unallocated containers and also released containers so that the exit condition for the shutdown check is satisfied. Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/8b359fc5 Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/8b359fc5 Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/8b359fc5 Branch: refs/heads/release-3.3 Commit: 8b359fc58188012f96b1f2b827987fe67c98d74c Parents: 232eba3 Author: Sanjay Pujare <[email protected]> Authored: Fri Jan 27 10:35:09 2017 -0800 Committer: Vlad Rozov <[email protected]> Committed: Fri Feb 24 17:31:32 2017 -0800 ---------------------------------------------------------------------- .../stram/StreamingAppMasterService.java | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-core/blob/8b359fc5/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java index 3b2c4de..087d6d5 100644 --- a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java +++ b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java @@ -770,6 +770,7 @@ public class StreamingAppMasterService extends CompositeService for (Map.Entry<StreamingContainerAgent.ContainerStartRequest, MutablePair<Integer, ContainerRequest>> entry : requestedResources.entrySet()) { if ((loopCounter - entry.getValue().getKey()) > NUMBER_MISSED_HEARTBEATS) { StreamingContainerAgent.ContainerStartRequest csr = entry.getKey(); + LOG.debug("Request for container {} timed out. Re-requesting container", csr.container); removedContainerRequests.add(entry.getValue().getRight()); ContainerRequest cr = resourceRequestor.createContainerRequest(csr, false); entry.getValue().setLeft(loopCounter); @@ -779,7 +780,7 @@ public class StreamingAppMasterService extends CompositeService } } - /* Remove nodes from blacklist after timeout */ + /* Remove nodes from blacklist after timeout */ long currentTime = System.currentTimeMillis(); List<String> blacklistRemovals = new ArrayList<String>(); for (Iterator<Pair<Long, List<String>>> it = blacklistedNodesQueueWithTimeStamp.iterator(); it.hasNext();) { @@ -797,7 +798,7 @@ public class StreamingAppMasterService extends CompositeService } numTotalContainers += containerRequests.size(); - numRequestedContainers += containerRequests.size(); + numRequestedContainers += containerRequests.size() - removedContainerRequests.size(); AllocateResponse amResp = sendContainerAskToRM(containerRequests, removedContainerRequests, releasedContainers); if (amResp.getAMCommand() != null) { LOG.info(" statement executed:{}", amResp.getAMCommand()); @@ -836,7 +837,7 @@ public class StreamingAppMasterService extends CompositeService LOG.info("Releasing {} as resource with priority {} was already assigned", allocatedContainer.getId(), allocatedContainer.getPriority()); releasedContainers.add(allocatedContainer.getId()); numReleasedContainers++; - numRequestedContainers++; + numRequestedContainers--; continue; } if (csr != null) { @@ -964,7 +965,8 @@ public class StreamingAppMasterService extends CompositeService appDone = true; } - LOG.debug("Current application state: loop=" + loopCounter + ", appDone=" + appDone + ", total=" + numTotalContainers + ", requested=" + numRequestedContainers + ", released=" + numReleasedContainers + ", completed=" + numCompletedContainers + ", failed=" + numFailedContainers + ", currentAllocated=" + allocatedContainers.size()); + LOG.debug("Current application state: loop={}, appDone={}, total={}, requested={}, released={}, completed={}, failed={}, currentAllocated={}, dnmgr.containerStartRequests={}", + loopCounter, appDone, numTotalContainers, numRequestedContainers, numReleasedContainers, numCompletedContainers, numFailedContainers, allocatedContainers.size(), dnmgr.containerStartRequests); // monitor child containers dnmgr.monitorHeartbeat(); @@ -1038,16 +1040,14 @@ public class StreamingAppMasterService extends CompositeService private AllocateResponse sendContainerAskToRM(List<ContainerRequest> containerRequests, List<ContainerRequest> removedContainerRequests, List<ContainerId> releasedContainers) throws YarnException, IOException { if (removedContainerRequests.size() > 0) { - LOG.info(" Removing container request: " + removedContainerRequests); + LOG.debug("Removing container request: {}", removedContainerRequests); for (ContainerRequest cr : removedContainerRequests) { - LOG.info("Removed container: {}", cr.toString()); amRmClient.removeContainerRequest(cr); } } if (containerRequests.size() > 0) { - LOG.info("Asking RM for containers: " + containerRequests); + LOG.debug("Asking RM for containers: {}", containerRequests); for (ContainerRequest cr : containerRequests) { - LOG.info("Requested container: {}", cr.toString()); amRmClient.addContainerRequest(cr); } }
