Repository: apex-core Updated Branches: refs/heads/master 05c798d5c -> 51de67e61
APEXCORE-471 Reissue of the resource was failing for the BlackListBased scheduler because requestedResources was never empty. The issue is fixed by making the operation similar to that of ResourceRequestHandler, with additional handling for the blacklist. Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/893551b0 Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/893551b0 Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/893551b0 Branch: refs/heads/master Commit: 893551b02a5ebe86abbb976dddd6f1d6d87aa2e3 Parents: 3f06ce7 Author: Sandesh Hegde <[email protected]> Authored: Sat Dec 24 21:25:31 2016 -1000 Committer: Sandesh Hegde <[email protected]> Committed: Thu Jan 5 12:01:00 2017 -0800 ---------------------------------------------------------------------- .../stram/BlacklistBasedResourceRequestHandler.java | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-core/blob/893551b0/engine/src/main/java/com/datatorrent/stram/BlacklistBasedResourceRequestHandler.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/BlacklistBasedResourceRequestHandler.java b/engine/src/main/java/com/datatorrent/stram/BlacklistBasedResourceRequestHandler.java index 412f535..53d91a5 100644 --- a/engine/src/main/java/com/datatorrent/stram/BlacklistBasedResourceRequestHandler.java +++ b/engine/src/main/java/com/datatorrent/stram/BlacklistBasedResourceRequestHandler.java @@ -54,8 +54,13 @@ public class BlacklistBasedResourceRequestHandler extends ResourceRequestHandler @Override public void reissueContainerRequests(AMRMClient<ContainerRequest> amRmClient, Map<StreamingContainerAgent.ContainerStartRequest, MutablePair<Integer, ContainerRequest>> requestedResources, int loopCounter, 
ResourceRequestHandler resourceRequestor, List<ContainerRequest> containerRequests, List<ContainerRequest> removedContainerRequests) { + if (!requestedResources.isEmpty()) { + // Check if any requests timed out, create new requests in that case + recreateContainerRequest(requestedResources, loopCounter, resourceRequestor, removedContainerRequests); + } + // Issue all host specific requests first - if (!hostSpecificRequestsMap.isEmpty() && requestedResources.isEmpty()) { + if (!hostSpecificRequestsMap.isEmpty()) { LOG.info("Issue Host specific requests first"); // Blacklist all the nodes and issue request for host specific Entry<String, List<ContainerRequest>> set = hostSpecificRequestsMap.entrySet().iterator().next(); @@ -74,10 +79,7 @@ public class BlacklistBasedResourceRequestHandler extends ResourceRequestHandler hostSpecificRequests.remove(cr); } hostSpecificRequestsMap.remove(set.getKey()); - } else if (!requestedResources.isEmpty()) { - // Check if any requests timed out, create new requests in that case - recreateContainerRequest(requestedResources, loopCounter, resourceRequestor, removedContainerRequests); - } else { + } else { if (blacklistedNodesForHostSpecificRequests != null) { // Remove the blacklisted nodes during host specific requests LOG.debug("All requests done.. 
Removing nodes from blacklist {}", blacklistedNodesForHostSpecificRequests); @@ -98,7 +100,7 @@ public class BlacklistBasedResourceRequestHandler extends ResourceRequestHandler } } - public void recreateContainerRequest(Map<StreamingContainerAgent.ContainerStartRequest, MutablePair<Integer, ContainerRequest>> requestedResources, int loopCounter, ResourceRequestHandler resourceRequestor, List<ContainerRequest> removedContainerRequests) + private void recreateContainerRequest(Map<StreamingContainerAgent.ContainerStartRequest, MutablePair<Integer, ContainerRequest>> requestedResources, int loopCounter, ResourceRequestHandler resourceRequestor, List<ContainerRequest> removedContainerRequests) { for (Map.Entry<StreamingContainerAgent.ContainerStartRequest, MutablePair<Integer, ContainerRequest>> entry : requestedResources.entrySet()) { if ((loopCounter - entry.getValue().getKey()) > NUMBER_MISSED_HEARTBEATS) { @@ -126,7 +128,7 @@ public class BlacklistBasedResourceRequestHandler extends ResourceRequestHandler } } - public void addHostSpecificRequest(StreamingContainerAgent.ContainerStartRequest csr, ContainerRequest cr) + private void addHostSpecificRequest(StreamingContainerAgent.ContainerStartRequest csr, ContainerRequest cr) { String hostKey = StringUtils.join(cr.getNodes(), ":"); List<ContainerRequest> requests;
