Repository: apex-core
Updated Branches:
  refs/heads/release-3.3 232eba368 -> 8b359fc58


APEXCORE-624 decrement unallocated containers and also released containers so 
that the exit condition for the shutdown check is satisfied.


Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/8b359fc5
Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/8b359fc5
Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/8b359fc5

Branch: refs/heads/release-3.3
Commit: 8b359fc58188012f96b1f2b827987fe67c98d74c
Parents: 232eba3
Author: Sanjay Pujare <[email protected]>
Authored: Fri Jan 27 10:35:09 2017 -0800
Committer: Vlad Rozov <[email protected]>
Committed: Fri Feb 24 17:31:32 2017 -0800

----------------------------------------------------------------------
 .../stram/StreamingAppMasterService.java            | 16 ++++++++--------
 1 file changed, 8 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-core/blob/8b359fc5/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
----------------------------------------------------------------------
diff --git 
a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java 
b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
index 3b2c4de..087d6d5 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
@@ -770,6 +770,7 @@ public class StreamingAppMasterService extends 
CompositeService
         for (Map.Entry<StreamingContainerAgent.ContainerStartRequest, 
MutablePair<Integer, ContainerRequest>> entry : requestedResources.entrySet()) {
           if ((loopCounter - entry.getValue().getKey()) > 
NUMBER_MISSED_HEARTBEATS) {
             StreamingContainerAgent.ContainerStartRequest csr = entry.getKey();
+            LOG.debug("Request for container {} timed out. Re-requesting 
container", csr.container);
             removedContainerRequests.add(entry.getValue().getRight());
             ContainerRequest cr = 
resourceRequestor.createContainerRequest(csr, false);
             entry.getValue().setLeft(loopCounter);
@@ -779,7 +780,7 @@ public class StreamingAppMasterService extends 
CompositeService
         }
       }
 
-     /* Remove nodes from blacklist after timeout */
+      /* Remove nodes from blacklist after timeout */
       long currentTime = System.currentTimeMillis();
       List<String> blacklistRemovals = new ArrayList<String>();
       for (Iterator<Pair<Long, List<String>>> it = 
blacklistedNodesQueueWithTimeStamp.iterator(); it.hasNext();) {
@@ -797,7 +798,7 @@ public class StreamingAppMasterService extends 
CompositeService
       }
 
       numTotalContainers += containerRequests.size();
-      numRequestedContainers += containerRequests.size();
+      numRequestedContainers += containerRequests.size() - 
removedContainerRequests.size();
       AllocateResponse amResp = sendContainerAskToRM(containerRequests, 
removedContainerRequests, releasedContainers);
       if (amResp.getAMCommand() != null) {
         LOG.info(" statement executed:{}", amResp.getAMCommand());
@@ -836,7 +837,7 @@ public class StreamingAppMasterService extends 
CompositeService
           LOG.info("Releasing {} as resource with priority {} was already 
assigned", allocatedContainer.getId(), allocatedContainer.getPriority());
           releasedContainers.add(allocatedContainer.getId());
           numReleasedContainers++;
-          numRequestedContainers++;
+          numRequestedContainers--;
           continue;
         }
         if (csr != null) {
@@ -964,7 +965,8 @@ public class StreamingAppMasterService extends 
CompositeService
         appDone = true;
       }
 
-      LOG.debug("Current application state: loop=" + loopCounter + ", 
appDone=" + appDone + ", total=" + numTotalContainers + ", requested=" + 
numRequestedContainers + ", released=" + numReleasedContainers + ", completed=" 
+ numCompletedContainers + ", failed=" + numFailedContainers + ", 
currentAllocated=" + allocatedContainers.size());
+      LOG.debug("Current application state: loop={}, appDone={}, total={}, 
requested={}, released={}, completed={}, failed={}, currentAllocated={}, 
dnmgr.containerStartRequests={}",
+            loopCounter, appDone, numTotalContainers, numRequestedContainers, 
numReleasedContainers, numCompletedContainers, numFailedContainers, 
allocatedContainers.size(), dnmgr.containerStartRequests);
 
       // monitor child containers
       dnmgr.monitorHeartbeat();
@@ -1038,16 +1040,14 @@ public class StreamingAppMasterService extends 
CompositeService
   private AllocateResponse sendContainerAskToRM(List<ContainerRequest> 
containerRequests, List<ContainerRequest> removedContainerRequests, 
List<ContainerId> releasedContainers) throws YarnException, IOException
   {
     if (removedContainerRequests.size() > 0) {
-      LOG.info(" Removing container request: " + removedContainerRequests);
+      LOG.debug("Removing container request: {}", removedContainerRequests);
       for (ContainerRequest cr : removedContainerRequests) {
-        LOG.info("Removed container: {}", cr.toString());
         amRmClient.removeContainerRequest(cr);
       }
     }
     if (containerRequests.size() > 0) {
-      LOG.info("Asking RM for containers: " + containerRequests);
+      LOG.debug("Asking RM for containers: {}", containerRequests);
       for (ContainerRequest cr : containerRequests) {
-        LOG.info("Requested container: {}", cr.toString());
         amRmClient.addContainerRequest(cr);
       }
     }

Reply via email to