SLIDER-82: Support anti-affinity. This is the original submission, with some minor tweaks and reformatting.
Project: http://git-wip-us.apache.org/repos/asf/incubator-slider/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-slider/commit/12893b96 Tree: http://git-wip-us.apache.org/repos/asf/incubator-slider/tree/12893b96 Diff: http://git-wip-us.apache.org/repos/asf/incubator-slider/diff/12893b96 Branch: refs/heads/feature/SLIDER-82_ANTI_AFFINITY_REQUIRED Commit: 12893b96bc2e95e1b1ef1aba2ecf39b1330b6a32 Parents: ad0be55 Author: Steve Loughran <[email protected]> Authored: Mon Sep 14 19:08:06 2015 +0100 Committer: Steve Loughran <[email protected]> Committed: Mon Sep 14 19:08:06 2015 +0100 ---------------------------------------------------------------------- pom.xml | 2 +- .../providers/agent/AgentProviderService.java | 5 ++ .../slideram/SliderAMProviderService.java | 5 ++ .../server/appmaster/SliderAppMaster.java | 7 +- .../operations/AsyncRMOperationHandler.java | 11 +++ .../operations/ContainerRequestOperation.java | 72 +++++++++++++++++++- .../ProviderNotifyingOperationHandler.java | 6 ++ .../operations/RMOperationHandlerActions.java | 8 +++ .../slider/server/appmaster/state/AppState.java | 27 ++++++-- .../server/appmaster/state/RoleHistory.java | 40 ++++++++++- .../model/mock/MockProviderService.groovy | 5 ++ .../model/mock/MockRMOperationHandler.groovy | 5 ++ 12 files changed, 181 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/12893b96/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 7f8ff5e..98390e5 100644 --- a/pom.xml +++ b/pom.xml @@ -132,7 +132,7 @@ <!-- core artifacts --> - <hadoop.version>2.6.0</hadoop.version> + <hadoop.version>2.7.1</hadoop.version> <hbase.version>0.99.0</hbase.version> <accumulo.version>1.7.0</accumulo.version> 
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/12893b96/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java b/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java index e3dc791..71421bd 100644 --- a/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java +++ b/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java @@ -2823,4 +2823,9 @@ public class AgentProviderService extends AbstractProviderService implements ""); } } + + @Override + public void updateBlacklist(List<String> blacklistAdditions, + List<String> blacklistRemovals) { + } } http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/12893b96/slider-core/src/main/java/org/apache/slider/providers/slideram/SliderAMProviderService.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/providers/slideram/SliderAMProviderService.java b/slider-core/src/main/java/org/apache/slider/providers/slideram/SliderAMProviderService.java index cee7a97..e1dd920 100644 --- a/slider-core/src/main/java/org/apache/slider/providers/slideram/SliderAMProviderService.java +++ b/slider-core/src/main/java/org/apache/slider/providers/slideram/SliderAMProviderService.java @@ -187,4 +187,9 @@ public class SliderAMProviderService extends AbstractProviderService implements throw new IOException(e); } } + + @Override + public void updateBlacklist(List<String> blacklistAdditions, + List<String> blacklistRemovals) { + } } http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/12893b96/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java ---------------------------------------------------------------------- diff --git 
a/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java index 019ec71..aeb9753 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java @@ -2283,7 +2283,12 @@ public class SliderAppMaster extends AbstractSliderLaunchedService return false; } } - + + @Override + public void updateBlacklist(List<String> blacklistAdditions, + List<String> blacklistRemovals) { + } + /** * This is the main entry point for the service launcher. * @param args command line arguments. http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/12893b96/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/AsyncRMOperationHandler.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/AsyncRMOperationHandler.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/AsyncRMOperationHandler.java index 11afc0e..0329696 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/AsyncRMOperationHandler.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/AsyncRMOperationHandler.java @@ -63,6 +63,7 @@ public class AsyncRMOperationHandler extends RMOperationHandler { * @param count count to cancel * @return number of requests cancelled */ + @SuppressWarnings("unchecked") protected int cancelSinglePriorityRequests(Priority priority, int count) { List<Collection<AMRMClient.ContainerRequest>> requestSets = @@ -88,6 +89,7 @@ public class AsyncRMOperationHandler extends RMOperationHandler { } @Override + @SuppressWarnings("unchecked") public void cancelSingleRequest(AMRMClient.ContainerRequest request) { // a single release client.removeContainerRequest(request); @@ -103,6 
+105,15 @@ public class AsyncRMOperationHandler extends RMOperationHandler { @Override @SuppressWarnings("unchecked") public void addContainerRequest(AMRMClient.ContainerRequest req) { + log.debug("addContainerRequest(): Request = {}, getCapability() = {}, getNodes() = {}", + req, req.getCapability(), req.getNodes()); client.addContainerRequest(req); } + + @Override + public void updateBlacklist(List<String> blacklistAdditions, + List<String> blacklistRemovals) { + client.updateBlacklist(blacklistAdditions, blacklistRemovals); + } + } http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/12893b96/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/ContainerRequestOperation.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/ContainerRequestOperation.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/ContainerRequestOperation.java index b8120ca..d847360 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/ContainerRequestOperation.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/ContainerRequestOperation.java @@ -18,23 +18,93 @@ package org.apache.slider.server.appmaster.operations; +import com.google.common.base.Preconditions; import org.apache.hadoop.yarn.client.api.AMRMClient; import org.apache.slider.server.appmaster.state.ContainerPriority; +import org.apache.slider.server.appmaster.state.NodeInstance; +import org.apache.slider.server.appmaster.state.RoleHistory; +import org.apache.slider.server.appmaster.state.RoleStatus; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.util.ArrayList; +import java.util.List; + +/** + * A container request operation + */ public class ContainerRequestOperation extends AbstractRMOperation { + protected static final Logger log = + 
LoggerFactory.getLogger(ContainerRequestOperation.class); private final AMRMClient.ContainerRequest request; + private List<String> blacklistAdditions; + private List<String> blacklistRemovals; - public ContainerRequestOperation(AMRMClient.ContainerRequest request) { + /** + * Build a request + * @param request request to use + * @param role role (or null) + * @param activeNodesForRole list of active nodes for this role. Must be set if role is not null. + * @param failedNodesForRole list of failed nodes for this role. Must be set if role is not null. + */ + public ContainerRequestOperation(AMRMClient.ContainerRequest request, + RoleStatus role, List<NodeInstance> activeNodesForRole, + List<String> failedNodesForRole) { + blacklistAdditions = new ArrayList<>(); + blacklistRemovals = new ArrayList<>(); + if (role != null) { + Preconditions.checkArgument(activeNodesForRole != null, "Null activeNodesForRole"); + Preconditions.checkArgument(failedNodesForRole != null, "Null failedNodesForRole"); + log.info("ContainerRequestOperation(): Role: {} , Request {}", role.getName(), request); + if (role.isAntiAffinePlacement()) { + for (NodeInstance nit1 : activeNodesForRole) { + log.info("ContainerRequestOperation(): add to blacklist nodes - Role: {}, Node: {}", + role.getName(), nit1.hostname); + blacklistAdditions.add(nit1.hostname); + } + } + blacklistAdditions.addAll(failedNodesForRole); + } this.request = request; } + /** + * Create a request with no blacklisting/affinity information + * + * @param request request to issue + */ + public ContainerRequestOperation(AMRMClient.ContainerRequest request) { + this(request, null, null, null); + } + + /** + * Get the underlying request + * @return + */ public AMRMClient.ContainerRequest getRequest() { return request; } + /** + * Get the current blacklist additions + * @return the list of additions + */ + public List<String> getBlacklistAdditions() { + return blacklistAdditions; + } + + /** + * get the current blacklist removals + 
* @return the list of removals + */ + public List<String> getBlacklistRemovals() { + return blacklistRemovals; + } + @Override public void execute(RMOperationHandlerActions handler) { + handler.updateBlacklist(blacklistAdditions, blacklistRemovals); handler.addContainerRequest(request); } http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/12893b96/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/ProviderNotifyingOperationHandler.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/ProviderNotifyingOperationHandler.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/ProviderNotifyingOperationHandler.java index 184a36a..232a797 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/ProviderNotifyingOperationHandler.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/ProviderNotifyingOperationHandler.java @@ -22,6 +22,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.client.api.AMRMClient; import org.apache.slider.providers.ProviderService; +import java.util.List; public class ProviderNotifyingOperationHandler extends RMOperationHandler { @@ -52,4 +53,9 @@ public class ProviderNotifyingOperationHandler extends RMOperationHandler { public void cancelSingleRequest(AMRMClient.ContainerRequest request) { providerService.cancelSingleRequest(request); } + + @Override + public void updateBlacklist(List<String> blacklistAdditions, + List<String> blacklistRemovals) { + } } http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/12893b96/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/RMOperationHandlerActions.java ---------------------------------------------------------------------- diff --git 
a/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/RMOperationHandlerActions.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/RMOperationHandlerActions.java index 594ee47..7915ab4 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/RMOperationHandlerActions.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/RMOperationHandlerActions.java @@ -52,4 +52,12 @@ public interface RMOperationHandlerActions { * @param operations ops */ void execute(List<AbstractRMOperation> operations); + + /** + * Update Blacklist operation + * @param blacklistAdditions possibly null list of additions to the blacklist + * @param blacklistRemovals possibly null list of nodes to remove from the blacklist + */ + void updateBlacklist(List<String> blacklistAdditions, List<String> blacklistRemovals); + } http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/12893b96/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java index 18eb578..b632626 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java @@ -92,8 +92,8 @@ import static org.apache.slider.api.ResourceKeys.YARN_CORES; import static org.apache.slider.api.ResourceKeys.YARN_LABEL_EXPRESSION; import static org.apache.slider.api.ResourceKeys.YARN_MEMORY; import static org.apache.slider.api.RoleKeys.ROLE_FAILED_INSTANCES; -import static org.apache.slider.api.RoleKeys.ROLE_FAILED_STARTING_INSTANCES; import static org.apache.slider.api.RoleKeys.ROLE_FAILED_RECENTLY_INSTANCES; +import static 
org.apache.slider.api.RoleKeys.ROLE_FAILED_STARTING_INSTANCES; import static org.apache.slider.api.RoleKeys.ROLE_NODE_FAILED_INSTANCES; import static org.apache.slider.api.RoleKeys.ROLE_PREEMPTED_INSTANCES; import static org.apache.slider.api.RoleKeys.ROLE_RELEASING_INSTANCES; @@ -1931,7 +1931,7 @@ public class AppState { if (delta > 0) { log.info("{}: Asking for {} more nodes(s) for a total of {} ", name, delta, expected); - //more workers needed than we have -ask for more + // more workers needed than we have -ask for more for (int i = 0; i < delta; i++) { Resource capability = recordFactory.newResource(); AMRMClient.ContainerRequest containerAsk = @@ -1942,15 +1942,20 @@ public class AppState { if (askMemory > this.containerMaxMemory) { log.warn("Memory requested: {} > max of {}", askMemory, containerMaxMemory); } - operations.add(new ContainerRequestOperation(containerAsk)); + // build a container request including placement and blacklisting data + operations.add(new ContainerRequestOperation(containerAsk, + role, + roleHistory.listActiveNodes(role.getKey()), + roleHistory.cloneFailedNodes() + )); } } else if (delta < 0) { log.info("{}: Asking for {} fewer node(s) for a total of {}", name, -delta, expected); - //reduce the number expected (i.e. subtract the delta) + // reduce the number expected (i.e. 
subtract the delta) - //then pick some containers to kill + // then pick some containers to kill int excess = -delta; // how many requests are outstanding @@ -2125,6 +2130,8 @@ public class AppState { assignments.clear(); releaseOperations.clear(); List<Container> ordered = roleHistory.prepareAllocationList(allocatedContainers); + roleHistory.resetRequestedNodes(); + log.info("onContainersAllocated(): Total containers allocated = "+ordered.size()); for (Container container : ordered) { String containerHostInfo = container.getNodeId().getHost() + ":" + @@ -2132,8 +2139,13 @@ public class AppState { //get the role final ContainerId cid = container.getId(); final RoleStatus role = lookupRoleStatus(container); - - + log.info("onContainersAllocated(): "+containerHostInfo+", cid= "+cid); + log.debug("onContainersAllocated(): "+role); + if((role.isAntiAffinePlacement())&& (roleHistory.nodeAlreadyRequested(role.getKey(),container.getNodeId().getHost()))){ + releaseOperations.add(new ContainerReleaseOperation(cid)); + log.info("onContainersAllocated() "+cid +" is on already requested node "+container.getNodeId().getHost()+", releasing..."); + continue; + } //dec requested count decrementRequestCount(role); @@ -2186,6 +2198,7 @@ public class AppState { assignments.add(new ContainerAssignment(container, role, outcome)); //add to the history roleHistory.onContainerAssigned(container); + roleHistory.addRequestedNodeForRoleId(role.getKey(), container.getNodeId().getHost()); } } } http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/12893b96/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHistory.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHistory.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHistory.java index 926d440..fe69cb0 100644 --- 
a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHistory.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHistory.java @@ -93,6 +93,12 @@ public class RoleHistory { private Map<Integer, LinkedList<NodeInstance>> availableNodes; /** + * * Track nodes where requests have been submitted + * + */ + private Map<Integer, LinkedList<String>> requestedNodes; + + /** * Track the failed nodes. Currently used to make wiser decision of container * ask with/without locality. Has other potential uses as well. */ @@ -186,6 +192,37 @@ public class RoleHistory { */ private synchronized void resetAvailableNodeLists() { availableNodes = new HashMap<>(roleSize); + resetRequestedNodes(); + } + + /** + * Clear the list of nodes where request has been made + */ + public void resetRequestedNodes() { + requestedNodes = new HashMap<>(roleSize); + } + + /** + * Track nodes where requests have been submitted + */ + public void addRequestedNodeForRoleId(int id, String hostname) { + LinkedList<String> instances = requestedNodes.get(id); + if (instances == null) { + instances = new LinkedList<>(); + } + instances.add(hostname); + requestedNodes.put(id, instances); + } + + /** + * Check if node is in the requested list + * @param id role ID + * @param hostname host + * @return true if there is an outstanding request for a role on that host + */ + public boolean nodeAlreadyRequested(int id, String hostname) { + LinkedList<String> instances = requestedNodes.get(id); + return instances != null && instances.contains(hostname); } /** @@ -717,8 +754,7 @@ public class RoleHistory { if (desiredCount <= actualCount) { // all outstanding requests have been satisfied // clear all the lists, so returning nodes to the available set - List<NodeInstance> - hosts = outstandingRequests.resetOutstandingRequests(role); + List<NodeInstance> hosts = outstandingRequests.resetOutstandingRequests(role); if (!hosts.isEmpty()) { //add the list log.info("Adding {} 
hosts for role {}", hosts.size(), role); http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/12893b96/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/MockProviderService.groovy ---------------------------------------------------------------------- diff --git a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/MockProviderService.groovy b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/MockProviderService.groovy index 44415f4..35983c9 100644 --- a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/MockProviderService.groovy +++ b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/MockProviderService.groovy @@ -297,4 +297,9 @@ class MockProviderService implements ProviderService { void rebuildContainerDetails(List<Container> liveContainers, String applicationId, Map<Integer, ProviderRole> roleProviderMap) { } + + @Override + void updateBlacklist(List<String> blacklistAdditions, List<String> blacklistRemovals) { + + } } http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/12893b96/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/MockRMOperationHandler.groovy ---------------------------------------------------------------------- diff --git a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/MockRMOperationHandler.groovy b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/MockRMOperationHandler.groovy index a68ce02..bb42cc1 100644 --- a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/MockRMOperationHandler.groovy +++ b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/MockRMOperationHandler.groovy @@ -78,4 +78,9 @@ class MockRMOperationHandler extends RMOperationHandler { releases = 0; requests = 0; } + + @Override + void updateBlacklist(List<String> blacklistAdditions, List<String> blacklistRemovals) { + } + }
