Repository: apex-core Updated Branches: refs/heads/release-3.5 66bf590c8 -> bd8f7bade
APEXCORE-624 Decrement the requested-container count for unallocated and released containers so that the exit condition of the shutdown check is satisfied. Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/bd8f7bad Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/bd8f7bad Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/bd8f7bad Branch: refs/heads/release-3.5 Commit: bd8f7bade65f03e7c7729da383a29cd424664f91 Parents: 66bf590 Author: Sanjay Pujare <[email protected]> Authored: Sat Feb 18 12:33:31 2017 -0800 Committer: Vlad Rozov <[email protected]> Committed: Fri Feb 24 17:15:58 2017 -0800 ---------------------------------------------------------------------- .../stram/ResourceRequestHandler.java | 1 + .../stram/StreamingAppMasterService.java | 26 +++++++++----------- 2 files changed, 13 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-core/blob/bd8f7bad/engine/src/main/java/com/datatorrent/stram/ResourceRequestHandler.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/ResourceRequestHandler.java b/engine/src/main/java/com/datatorrent/stram/ResourceRequestHandler.java index c56f64f..e7f9672 100644 --- a/engine/src/main/java/com/datatorrent/stram/ResourceRequestHandler.java +++ b/engine/src/main/java/com/datatorrent/stram/ResourceRequestHandler.java @@ -81,6 +81,7 @@ public class ResourceRequestHandler */ if ((loopCounter - entry.getValue().getKey()) > NUMBER_MISSED_HEARTBEATS) { StreamingContainerAgent.ContainerStartRequest csr = entry.getKey(); + LOG.debug("Request for container {} timed out. 
Re-requesting container", csr.container); removedContainerRequests.add(entry.getValue().getRight()); ContainerRequest cr = resourceRequestor.createContainerRequest(csr, false); entry.getValue().setLeft(loopCounter); http://git-wip-us.apache.org/repos/asf/apex-core/blob/bd8f7bad/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java index 15b6402..3898dbc 100644 --- a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java +++ b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java @@ -705,7 +705,7 @@ public class StreamingAppMasterService extends CompositeService int loopCounter = -1; long nodeReportUpdateTime = 0; List<ContainerId> releasedContainers = new ArrayList<>(); - int numTotalContainers = 0; + // keep track of already requested containers to not request them again while waiting for allocation int numRequestedContainers = 0; int numReleasedContainers = 0; @@ -729,7 +729,7 @@ public class StreamingAppMasterService extends CompositeService dnmgr.shutdownDiagnosticsMessage = String.format("Application master failed due to application %s with duplicate application name \"%s\" by the same user \"%s\" is already started.", ar.getApplicationId().toString(), ar.getName(), ar.getUser()); LOG.info("Forced shutdown due to {}", dnmgr.shutdownDiagnosticsMessage); - finishApplication(FinalApplicationStatus.FAILED, numTotalContainers); + finishApplication(FinalApplicationStatus.FAILED); return; } resourceRequestor.updateNodeReports(clientRMService.getNodeReports()); @@ -829,7 +829,7 @@ public class StreamingAppMasterService extends CompositeService resourceRequestor.reissueContainerRequests(amRmClient, requestedResources, loopCounter, resourceRequestor, containerRequests, 
removedContainerRequests); - /* Remove nodes from blacklist after timeout */ + /* Remove nodes from blacklist after timeout */ List<String> blacklistRemovals = new ArrayList<>(); for (String hostname : failedBlackListedNodes) { Long timeDiff = currentTimeMillis - failedContainerNodesMap.get(hostname).blackListAdditionTime; @@ -844,8 +844,7 @@ public class StreamingAppMasterService extends CompositeService failedBlackListedNodes.removeAll(blacklistRemovals); } - numTotalContainers += containerRequests.size(); - numRequestedContainers += containerRequests.size(); + numRequestedContainers += containerRequests.size() - removedContainerRequests.size(); AllocateResponse amResp = sendContainerAskToRM(containerRequests, removedContainerRequests, releasedContainers); if (amResp.getAMCommand() != null) { LOG.info(" statement executed:{}", amResp.getAMCommand()); @@ -884,7 +883,7 @@ public class StreamingAppMasterService extends CompositeService LOG.info("Releasing {} as resource with priority {} was already assigned", allocatedContainer.getId(), allocatedContainer.getPriority()); releasedContainers.add(allocatedContainer.getId()); numReleasedContainers++; - numRequestedContainers++; + numRequestedContainers--; continue; } if (csr != null) { @@ -1025,23 +1024,24 @@ public class StreamingAppMasterService extends CompositeService appDone = true; } - LOG.debug("Current application state: loop=" + loopCounter + ", appDone=" + appDone + ", total=" + numTotalContainers + ", requested=" + numRequestedContainers + ", released=" + numReleasedContainers + ", completed=" + numCompletedContainers + ", failed=" + numFailedContainers + ", currentAllocated=" + allocatedContainers.size()); + LOG.debug("Current application state: loop={}, appDone={}, requested={}, released={}, completed={}, failed={}, currentAllocated={}, dnmgr.containerStartRequests={}", + loopCounter, appDone, numRequestedContainers, numReleasedContainers, numCompletedContainers, numFailedContainers, 
allocatedContainers.size(), dnmgr.containerStartRequests); // monitor child containers dnmgr.monitorHeartbeat(); } - finishApplication(finalStatus, numTotalContainers); + finishApplication(finalStatus); } - private void finishApplication(FinalApplicationStatus finalStatus, int numTotalContainers) throws YarnException, IOException + private void finishApplication(FinalApplicationStatus finalStatus) throws YarnException, IOException { LOG.info("Application completed. Signalling finish to RM"); FinishApplicationMasterRequest finishReq = Records.newRecord(FinishApplicationMasterRequest.class); finishReq.setFinalApplicationStatus(finalStatus); if (finalStatus != FinalApplicationStatus.SUCCEEDED) { - String diagnostics = "Diagnostics." + ", total=" + numTotalContainers + ", completed=" + numCompletedContainers.get() + ", allocated=" + allocatedContainers.size() + ", failed=" + numFailedContainers.get(); + String diagnostics = "Diagnostics." + " completed=" + numCompletedContainers.get() + ", allocated=" + allocatedContainers.size() + ", failed=" + numFailedContainers.get(); if (!StringUtils.isEmpty(dnmgr.shutdownDiagnosticsMessage)) { diagnostics += "\n"; diagnostics += dnmgr.shutdownDiagnosticsMessage; @@ -1099,16 +1099,14 @@ public class StreamingAppMasterService extends CompositeService private AllocateResponse sendContainerAskToRM(List<ContainerRequest> containerRequests, List<ContainerRequest> removedContainerRequests, List<ContainerId> releasedContainers) throws YarnException, IOException { if (removedContainerRequests.size() > 0) { - LOG.info(" Removing container request: " + removedContainerRequests); + LOG.debug("Removing container request: {}", removedContainerRequests); for (ContainerRequest cr : removedContainerRequests) { - LOG.info("Removed container: {}", cr.toString()); amRmClient.removeContainerRequest(cr); } } if (containerRequests.size() > 0) { - LOG.info("Asking RM for containers: " + containerRequests); + LOG.debug("Asking RM for containers: {}", 
containerRequests); for (ContainerRequest cr : containerRequests) { - LOG.info("Requested container: {} on host: [{}]", cr.toString(), StringUtils.join(cr.getNodes(), ", ")); amRmClient.addContainerRequest(cr); } }
