Repository: apex-core
Updated Branches:
  refs/heads/master 05c798d5c -> 51de67e61


APEXCORE-471 Reissue of the resource was failing for the BlackListBased 
scheduler, it happened because RequestedResource was never empty. Issue is 
fixed after making the operation similar to that of ResourceRequestHandler with 
the handling for Blacklist


Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/893551b0
Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/893551b0
Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/893551b0

Branch: refs/heads/master
Commit: 893551b02a5ebe86abbb976dddd6f1d6d87aa2e3
Parents: 3f06ce7
Author: Sandesh Hegde <[email protected]>
Authored: Sat Dec 24 21:25:31 2016 -1000
Committer: Sandesh Hegde <[email protected]>
Committed: Thu Jan 5 12:01:00 2017 -0800

----------------------------------------------------------------------
 .../stram/BlacklistBasedResourceRequestHandler.java | 16 +++++++++-------
 1 file changed, 9 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-core/blob/893551b0/engine/src/main/java/com/datatorrent/stram/BlacklistBasedResourceRequestHandler.java
----------------------------------------------------------------------
diff --git 
a/engine/src/main/java/com/datatorrent/stram/BlacklistBasedResourceRequestHandler.java
 
b/engine/src/main/java/com/datatorrent/stram/BlacklistBasedResourceRequestHandler.java
index 412f535..53d91a5 100644
--- 
a/engine/src/main/java/com/datatorrent/stram/BlacklistBasedResourceRequestHandler.java
+++ 
b/engine/src/main/java/com/datatorrent/stram/BlacklistBasedResourceRequestHandler.java
@@ -54,8 +54,13 @@ public class BlacklistBasedResourceRequestHandler extends 
ResourceRequestHandler
   @Override
   public void reissueContainerRequests(AMRMClient<ContainerRequest> 
amRmClient, Map<StreamingContainerAgent.ContainerStartRequest, 
MutablePair<Integer, ContainerRequest>> requestedResources, int loopCounter, 
ResourceRequestHandler resourceRequestor, List<ContainerRequest> 
containerRequests, List<ContainerRequest> removedContainerRequests)
   {
+    if (!requestedResources.isEmpty()) {
+      // Check if any requests timed out, create new requests in that case
+      recreateContainerRequest(requestedResources, loopCounter, 
resourceRequestor, removedContainerRequests);
+    }
+
     // Issue all host specific requests first
-    if (!hostSpecificRequestsMap.isEmpty() && requestedResources.isEmpty()) {
+    if (!hostSpecificRequestsMap.isEmpty()) {
       LOG.info("Issue Host specific requests first");
       // Blacklist all the nodes and issue request for host specific
       Entry<String, List<ContainerRequest>> set = 
hostSpecificRequestsMap.entrySet().iterator().next();
@@ -74,10 +79,7 @@ public class BlacklistBasedResourceRequestHandler extends 
ResourceRequestHandler
         hostSpecificRequests.remove(cr);
       }
       hostSpecificRequestsMap.remove(set.getKey());
-    } else if (!requestedResources.isEmpty()) {
-      // Check if any requests timed out, create new requests in that case
-      recreateContainerRequest(requestedResources, loopCounter, 
resourceRequestor, removedContainerRequests);
-    } else {
+    }  else {
       if (blacklistedNodesForHostSpecificRequests != null) {
         // Remove the blacklisted nodes during host specific requests
         LOG.debug("All requests done.. Removing nodes from blacklist {}", 
blacklistedNodesForHostSpecificRequests);
@@ -98,7 +100,7 @@ public class BlacklistBasedResourceRequestHandler extends 
ResourceRequestHandler
     }
   }
 
-  public void 
recreateContainerRequest(Map<StreamingContainerAgent.ContainerStartRequest, 
MutablePair<Integer, ContainerRequest>> requestedResources, int loopCounter, 
ResourceRequestHandler resourceRequestor, List<ContainerRequest> 
removedContainerRequests)
+  private void 
recreateContainerRequest(Map<StreamingContainerAgent.ContainerStartRequest, 
MutablePair<Integer, ContainerRequest>> requestedResources, int loopCounter, 
ResourceRequestHandler resourceRequestor, List<ContainerRequest> 
removedContainerRequests)
   {
     for (Map.Entry<StreamingContainerAgent.ContainerStartRequest, 
MutablePair<Integer, ContainerRequest>> entry : requestedResources.entrySet()) {
       if ((loopCounter - entry.getValue().getKey()) > 
NUMBER_MISSED_HEARTBEATS) {
@@ -126,7 +128,7 @@ public class BlacklistBasedResourceRequestHandler extends 
ResourceRequestHandler
     }
   }
 
-  public void 
addHostSpecificRequest(StreamingContainerAgent.ContainerStartRequest csr, 
ContainerRequest cr)
+  private void 
addHostSpecificRequest(StreamingContainerAgent.ContainerStartRequest csr, 
ContainerRequest cr)
   {
     String hostKey = StringUtils.join(cr.getNodes(), ":");
     List<ContainerRequest> requests;

Reply via email to