SLIDER-967 Anti-affine (AA) placement with labels working. More precisely, the new test is now working; most of the production-side source changes are related to debugging it, checking invariants, improving logging, and similar.
Project: http://git-wip-us.apache.org/repos/asf/incubator-slider/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-slider/commit/178bd96d Tree: http://git-wip-us.apache.org/repos/asf/incubator-slider/tree/178bd96d Diff: http://git-wip-us.apache.org/repos/asf/incubator-slider/diff/178bd96d Branch: refs/heads/develop Commit: 178bd96d8772de7046d3510dc640abc2cf25d1e8 Parents: 5a61b4c Author: Steve Loughran <[email protected]> Authored: Fri Nov 13 17:46:54 2015 +0000 Committer: Steve Loughran <[email protected]> Committed: Fri Nov 13 17:46:54 2015 +0000 ---------------------------------------------------------------------- .../slider/api/types/NodeEntryInformation.java | 1 - .../operations/CancelSingleRequest.java | 7 +- .../operations/ContainerReleaseOperation.java | 3 + .../operations/ContainerRequestOperation.java | 4 +- .../slider/server/appmaster/state/AppState.java | 52 ++++-- .../appmaster/state/ContainerPriority.java | 5 +- .../server/appmaster/state/NodeEntry.java | 18 +- .../server/appmaster/state/NodeInstance.java | 16 +- .../slider/server/appmaster/state/NodeMap.java | 13 ++ .../appmaster/state/OutstandingRequest.java | 16 +- .../state/OutstandingRequestTracker.java | 16 +- .../server/appmaster/state/RoleHistory.java | 57 ++++--- .../appstate/TestMockAppStateAAPlacement.groovy | 20 +-- .../appstate/TestMockLabelledAAPlacement.groovy | 168 +++++++++++++++++++ .../model/history/TestRoleHistoryAA.groovy | 52 ++++-- ...tRoleHistoryOutstandingRequestTracker.groovy | 6 +- .../model/mock/BaseMockAppStateTest.groovy | 59 ++++++- .../appmaster/model/mock/MockNodeReport.groovy | 1 + .../apache/slider/test/SliderTestUtils.groovy | 9 + 19 files changed, 428 insertions(+), 95 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/178bd96d/slider-core/src/main/java/org/apache/slider/api/types/NodeEntryInformation.java 
---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/api/types/NodeEntryInformation.java b/slider-core/src/main/java/org/apache/slider/api/types/NodeEntryInformation.java index 15b57b0..8424be2 100644 --- a/slider-core/src/main/java/org/apache/slider/api/types/NodeEntryInformation.java +++ b/slider-core/src/main/java/org/apache/slider/api/types/NodeEntryInformation.java @@ -28,7 +28,6 @@ import org.codehaus.jackson.map.annotate.JsonSerialize; @JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL) public class NodeEntryInformation { - /** incrementing counter of instances that failed */ public int failed; http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/178bd96d/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/CancelSingleRequest.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/CancelSingleRequest.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/CancelSingleRequest.java index 08eb5bc..d7673d3 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/CancelSingleRequest.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/CancelSingleRequest.java @@ -18,7 +18,9 @@ package org.apache.slider.server.appmaster.operations; +import com.google.common.base.Preconditions; import org.apache.hadoop.yarn.client.api.AMRMClient; +import org.apache.slider.server.appmaster.state.ContainerPriority; /** * Cancel a container request @@ -28,6 +30,7 @@ public class CancelSingleRequest extends AbstractRMOperation { private final AMRMClient.ContainerRequest request; public CancelSingleRequest(AMRMClient.ContainerRequest request) { + Preconditions.checkArgument(request != null, "Null container request"); this.request = request; } @@ -42,7 +45,9 @@ public class CancelSingleRequest 
extends AbstractRMOperation { @Override public String toString() { - return "cancel single request for container at " + request.getPriority().toString(); + return "Cancel container request" + + " for :" + ContainerPriority.toString(request.getPriority()) + + " request " + request; } http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/178bd96d/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/ContainerReleaseOperation.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/ContainerReleaseOperation.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/ContainerReleaseOperation.java index 46da536..4271d50 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/ContainerReleaseOperation.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/ContainerReleaseOperation.java @@ -18,13 +18,16 @@ package org.apache.slider.server.appmaster.operations; +import com.google.common.base.Preconditions; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.slider.server.appmaster.state.ContainerPriority; public class ContainerReleaseOperation extends AbstractRMOperation { private final ContainerId containerId; public ContainerReleaseOperation(ContainerId containerId) { + Preconditions.checkArgument(containerId != null, "Null containerId"); this.containerId = containerId; } http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/178bd96d/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/ContainerRequestOperation.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/ContainerRequestOperation.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/ContainerRequestOperation.java index 
6685b2a..e29ddd0 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/ContainerRequestOperation.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/ContainerRequestOperation.java @@ -54,7 +54,9 @@ public class ContainerRequestOperation extends AbstractRMOperation { @Override public String toString() { - return "request container for " + ContainerPriority.toString(getPriority()) + return "request container for role " + + ContainerPriority.toString(getPriority()) + + " request " + request + " relaxLocality=" + getRelaxLocality(); } } http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/178bd96d/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java index 6f38eb5..4a3cc45 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java @@ -1222,14 +1222,28 @@ public class AppState { * @return the container request to submit or null if there is none */ private AMRMClient.ContainerRequest createContainerRequest(RoleStatus role) { - OutstandingRequest request = roleHistory.requestContainerForRole(role); + incrementRequestCount(role); + if (role.isAntiAffinePlacement()) { + return createAAContainerRequest(role); + } else { + return roleHistory.requestContainerForRole(role).getIssuedRequest(); + } + } + /** + * Create a container request. + * Update internal state, such as the role request count. + * Anti-Affine: the {@link RoleStatus#outstandingAArequest} is set here. + * This is where role history information will be used for placement decisions. 
+ * @param role role + * @return the container request to submit or null if there is none + */ + private AMRMClient.ContainerRequest createAAContainerRequest(RoleStatus role) { + OutstandingRequest request = roleHistory.requestContainerForAARole(role); if (request == null) { return null; } incrementRequestCount(role); - if (role.isAntiAffinePlacement()) { - role.setOutstandingAArequest(request); - } + role.setOutstandingAArequest(request); return request.getIssuedRequest(); } @@ -1383,7 +1397,7 @@ public class AppState { RoleInstance starting = getStartingContainers().remove(containerId); if (null == starting) { throw new YarnRuntimeException( - "Container "+ containerId +"%s is already started"); + "Container "+ containerId +" is already started"); } instance.state = STATE_LIVE; RoleStatus roleStatus = lookupRoleStatus(instance.roleId); @@ -1965,7 +1979,7 @@ public class AppState { && !role.isAARequestOutstanding() && roleHistory.canPlaceAANodes()) { // log the number outstanding - AMRMClient.ContainerRequest request = createContainerRequest(role); + AMRMClient.ContainerRequest request = createAAContainerRequest(role); if (request != null) { log.info("Starting an anti-affine request sequence for {} nodes", delta); role.incPendingAntiAffineRequests(delta - 1); @@ -2081,8 +2095,9 @@ public class AppState { * Add a container request if the request is non-null * @param operations operations to add the entry to * @param containerAsk what to ask for + * @return true if a request was added */ - private void addContainerRequest(List<AbstractRMOperation> operations, + private boolean addContainerRequest(List<AbstractRMOperation> operations, AMRMClient.ContainerRequest containerAsk) { if (containerAsk != null) { log.info("Container ask is {} and label = {}", containerAsk, @@ -2092,6 +2107,9 @@ public class AppState { log.warn("Memory requested: {} > max of {}", askMemory, containerMaxMemory); } operations.add(new ContainerRequestOperation(containerAsk)); + return true; + } 
else { + return false; } } @@ -2208,8 +2226,7 @@ public class AppState { //look for condition where we get more back than we asked if (allocated > desired) { - log.info("Discarding surplus {} container {} on {}", roleName, cid, - containerHostInfo); + log.info("Discarding surplus {} container {} on {}", roleName, cid, containerHostInfo); operations.add(new ContainerReleaseOperation(cid)); //register as a surplus node surplusNodes.add(cid); @@ -2227,23 +2244,26 @@ public class AppState { roleName, cid, nodeId.getHost(), - nodeId.getPort() - ); + nodeId.getPort()); assignments.add(new ContainerAssignment(container, role, outcome)); //add to the history - AbstractRMOperation request = roleHistory.onContainerAssigned(container); - if (request != null) { - operations.add(request); - } + roleHistory.onContainerAssigned(container); // now for AA requests, add some more if (role.isAntiAffinePlacement()) { role.completeOutstandingAARequest(); + // check invariants. The new node must become unavailable. + NodeInstance node = roleHistory.getOrCreateNodeInstance(container); + if (node.canHost(role.getKey(), role.getLabelExpression())) { + log.error("Assigned node still declares as available {}", node.toFullString() ); + } if (role.getPendingAntiAffineRequests() > 0) { // still an outstanding AA request: need to issue a new one. 
log.info("Asking for next container for AA role {}", roleName); role.decPendingAntiAffineRequests(); - addContainerRequest(operations, createContainerRequest(role)); + if (!addContainerRequest(operations, createAAContainerRequest(role))) { + log.info("No capacity in cluster for new requests"); + } log.debug("Current AA role status {}", role); } else { log.info("AA request sequence completed for role {}", role); http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/178bd96d/slider-core/src/main/java/org/apache/slider/server/appmaster/state/ContainerPriority.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/ContainerPriority.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/ContainerPriority.java index 3cc2106..0322f83 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/ContainerPriority.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/ContainerPriority.java @@ -57,10 +57,9 @@ public final class ContainerPriority { locationSpecified)); return pri; } - - + public static int extractRole(int priority) { - return priority >= NOLOCATION ? priority^NOLOCATION : priority; + return priority >= NOLOCATION ? 
priority ^ NOLOCATION : priority; } /** http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/178bd96d/slider-core/src/main/java/org/apache/slider/server/appmaster/state/NodeEntry.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/NodeEntry.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/NodeEntry.java index c180f88..cf3881e 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/NodeEntry.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/NodeEntry.java @@ -38,7 +38,7 @@ import org.apache.slider.api.types.NodeEntryInformation; * <p> * */ -public class NodeEntry { +public class NodeEntry implements Cloneable { public final int rolePriority; @@ -90,7 +90,16 @@ public class NodeEntry { * the number of instances > 1. */ public synchronized boolean isAvailable() { - return getActive() == 0 && requested == 0 && starting == 0; + return live + requested + starting - releasing <= 0; + } + + /** + * Are the anti-affinity constraints held. That is, zero or one + * node running or starting + * @return true if the constraint holds. 
+ */ + public synchronized boolean isAntiAffinityConstraintHeld() { + return (live - releasing + starting) <= 1; } /** @@ -308,4 +317,9 @@ public class NodeEntry { info.lastUsed = lastUsed; return info; } + + @Override + public Object clone() throws CloneNotSupportedException { + return super.clone(); + } } http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/178bd96d/slider-core/src/main/java/org/apache/slider/server/appmaster/state/NodeInstance.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/NodeInstance.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/NodeInstance.java index 8110bff..2b8f01c 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/NodeInstance.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/NodeInstance.java @@ -18,6 +18,7 @@ package org.apache.slider.server.appmaster.state; +import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.slider.api.types.NodeInformation; @@ -112,7 +113,6 @@ public class NodeInstance { || newUsable && !this.nodeLabels.equals(labels); } - public String getNodeLabels() { return nodeLabels; } @@ -148,6 +148,15 @@ public class NodeInstance { } /** + * Get the node entry matching a container on this node + * @param container container + * @return matching node instance for the role + */ + public NodeEntry getOrCreate(Container container) { + return getOrCreate(ContainerPriority.extractRole(container)); + } + + /** * Count the number of active role instances on this node * @param role role index * @return 0 if there are none, otherwise the #of nodes that are running and @@ -247,11 +256,12 @@ public class NodeInstance { public String toFullString() { final StringBuilder sb = new StringBuilder(toString()); - int i = 
0; + sb.append("{ "); for (NodeEntry entry : nodeEntries) { sb.append(String.format("\n [%02d] ", entry.rolePriority)); sb.append(entry.toString()); } + sb.append("} "); return sb.toString(); } @@ -326,7 +336,7 @@ public class NodeInstance { public boolean canHost(int role, String label) { return isOnline() && (SliderUtils.isUnset(label) || label.equals(nodeLabels)) // label match - && (get(role) == null || get(role).isAvailable()); // no live role + && getOrCreate(role).isAvailable(); // no live role } /** http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/178bd96d/slider-core/src/main/java/org/apache/slider/server/appmaster/state/NodeMap.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/NodeMap.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/NodeMap.java index 23411ca..3858b68 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/NodeMap.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/NodeMap.java @@ -158,4 +158,17 @@ public class NodeMap extends HashMap<String, NodeInstance> { Collections.sort(nodes, new NodeInstance.CompareNames()); return nodes; } + + @Override + public synchronized String toString() { + final StringBuilder sb = new StringBuilder("NodeMap{"); + List<String> keys = new ArrayList<>(keySet()); + Collections.sort(keys); + for (String key : keys) { + sb.append(key).append(": "); + sb.append(get(key).toFullString()).append("\n"); + } + sb.append('}'); + return sb.toString(); + } } http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/178bd96d/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequest.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequest.java 
b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequest.java index e211e7f..129fd4c 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequest.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequest.java @@ -220,17 +220,21 @@ public final class OutstandingRequest extends RoleHostnamePair { String nodeLabels; if (isAntiAffine()) { - hosts = new String[nodes.size()]; + int size = nodes.size(); + log.info("Creating anti-affine request across {} nodes; first node = {}", + size, hostname); + hosts = new String[size]; + StringBuilder builder = new StringBuilder(size * 16); int c = 0; for (NodeInstance nodeInstance : nodes) { - hosts[c] = nodeInstance.hostname; - c++; + hosts[c++] = nodeInstance.hostname; + builder.append(nodeInstance.hostname).append(" "); } - log.info("Creating anti-affine request across {} nodes; first node = {}", c, hostname); + log.debug("Full host list: [ {}]", builder); escalated = false; mayEscalate = false; relaxLocality = false; - nodeLabels = label; + nodeLabels = null; } else if (target != null) { // placed request. 
Hostname is used in request hosts = new String[1]; @@ -385,7 +389,7 @@ public final class OutstandingRequest extends RoleHostnamePair { + " in a single node label expression: " + this); } - // Don't allow specify node label against ANY request + // Don't allow specify node label against ANY request listing hosts or racks if ((containerRequest.getRacks() != null && (!containerRequest.getRacks().isEmpty())) || http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/178bd96d/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequestTracker.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequestTracker.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequestTracker.java index 66d201f..64698f2 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequestTracker.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequestTracker.java @@ -100,10 +100,18 @@ public class OutstandingRequestTracker { * This does not update the node instance's role's request count * @param role role index * @param nodes list of suitable nodes + * @param label label to use * @return a new request */ - public synchronized OutstandingRequest newAARequest(int role, List<NodeInstance> nodes) { + public synchronized OutstandingRequest newAARequest(int role, + List<NodeInstance> nodes, + String label) { Preconditions.checkArgument(!nodes.isEmpty()); + // safety check to verify the allocation will hold + for (NodeInstance node : nodes) { + Preconditions.checkState(node.canHost(role, label), + "Cannot allocate role ID %d to node %s", role, node); + } OutstandingRequest request = new OutstandingRequest(role, nodes); openRequests.add(request); return request; @@ -155,7 +163,7 @@ public class OutstandingRequestTracker { OutstandingRequest request = 
placedRequests.remove(new OutstandingRequest(role, hostname)); if (request != null) { //satisfied request - log.debug("Found placed request for container: {}", request); + log.debug("Found oustanding placed request for container: {}", request); request.completed(); // derive outcome from status of tracked request outcome = request.isEscalated() @@ -166,11 +174,11 @@ public class OutstandingRequestTracker { // scan through all containers in the open request list request = removeOpenRequest(container); if (request != null) { - log.debug("Found open request for container: {}", request); + log.debug("Found open outstanding request for container: {}", request); request.completed(); outcome = ContainerAllocationOutcome.Open; } else { - log.warn("No open request found for container {}, outstanding queue has {} entries ", + log.warn("No oustanding request found for container {}, outstanding queue has {} entries ", containerDetails, openRequests.size()); outcome = ContainerAllocationOutcome.Unallocated; http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/178bd96d/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHistory.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHistory.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHistory.java index 00b5226..4d9781d 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHistory.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHistory.java @@ -310,10 +310,9 @@ public class RoleHistory { * Get snapshot of the node map * @return a snapshot of the current node state */ - public Map<String, NodeInformation> getNodeInformationSnapshot() { - NodeMap map = cloneNodemap(); - Map<String, NodeInformation> result = new HashMap<>(map.size()); - for (Map.Entry<String, NodeInstance> entry : map.entrySet()) { + public 
synchronized Map<String, NodeInformation> getNodeInformationSnapshot() { + Map<String, NodeInformation> result = new HashMap<>(nodemap.size()); + for (Map.Entry<String, NodeInstance> entry : nodemap.entrySet()) { result.put(entry.getKey(), entry.getValue().serialize()); } return result; @@ -630,27 +629,37 @@ public class RoleHistory { */ public synchronized OutstandingRequest requestContainerForRole(RoleStatus role) { - Resource resource = recordFactory.newResource(); - role.copyResourceRequirements(resource); if (role.isAntiAffinePlacement()) { - // if a placement can be found, return it. - List<NodeInstance> nodes = findNodeForNewAAInstance(role); - if (!nodes.isEmpty()) { - OutstandingRequest outstanding - = outstandingRequests.newAARequest(role.getKey(), nodes); - outstanding.buildContainerRequest(resource, role, now()); - return outstanding; - } else { - log.warn("No suitable location for {}", role.getName()); - return null; - } + return requestContainerForAARole(role); } else { + Resource resource = recordFactory.newResource(); + role.copyResourceRequirements(resource); NodeInstance node = findRecentNodeForNewInstance(role); return requestInstanceOnNode(node, role, resource); } } /** + * Find a node for an AA role and request an instance on that (or a location-less + * instance) + * @param role role status + * @return a request ready to go, or null if no location can be found. 
+ */ + public synchronized OutstandingRequest requestContainerForAARole(RoleStatus role) { + List<NodeInstance> nodes = findNodeForNewAAInstance(role); + if (!nodes.isEmpty()) { + OutstandingRequest outstanding = outstandingRequests.newAARequest( + role.getKey(), nodes, role.getLabelExpression()); + Resource resource = recordFactory.newResource(); + role.copyResourceRequirements(resource); + outstanding.buildContainerRequest(resource, role, now()); + return outstanding; + } else { + log.warn("No suitable location for {}", role.getName()); + return null; + } + } + /** * Get the list of active nodes ... walks the node map so * is {@code O(nodes)} * @param role role index @@ -667,8 +676,7 @@ public class RoleHistory { * @throws RuntimeException if the container has no hostname */ public NodeEntry getOrCreateNodeEntry(Container container) { - NodeInstance node = getOrCreateNodeInstance(container); - return node.getOrCreate(ContainerPriority.extractRole(container)); + return getOrCreateNodeInstance(container).getOrCreate(container); } /** @@ -756,10 +764,11 @@ public class RoleHistory { * A container has been assigned to a role instance on a node -update the data structures * @param container container */ - public AbstractRMOperation onContainerAssigned(Container container) { - NodeEntry nodeEntry = getOrCreateNodeEntry(container); + public void onContainerAssigned(Container container) { + NodeInstance node = getOrCreateNodeInstance(container); + NodeEntry nodeEntry = node.getOrCreate(container); nodeEntry.onStarting(); - return null; + log.debug("Node {} has updated NodeEntry {}", node, nodeEntry); } /** @@ -769,9 +778,7 @@ public class RoleHistory { */ public void onContainerStartSubmitted(Container container, RoleInstance instance) { - NodeEntry nodeEntry = getOrCreateNodeEntry(container); - int role = ContainerPriority.extractRole(container); - // any actions we want here + // no actions here } /** 
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/178bd96d/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/appstate/TestMockAppStateAAPlacement.groovy ---------------------------------------------------------------------- diff --git a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/appstate/TestMockAppStateAAPlacement.groovy b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/appstate/TestMockAppStateAAPlacement.groovy index 9a325d7..01ca2f1 100644 --- a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/appstate/TestMockAppStateAAPlacement.groovy +++ b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/appstate/TestMockAppStateAAPlacement.groovy @@ -84,16 +84,6 @@ class TestMockAppStateAAPlacement extends BaseMockAppStateTest new MockYarnEngine(NODES, 8) } - /** - * Get the single request of a list of operations; includes the check for the size - * @param ops operations list of size 1 - * @return the request within the first operation - */ - public AMRMClient.ContainerRequest getSingleCancel(List<AbstractRMOperation> ops) { - assert 1 == ops.size() - getCancel(ops, 0) - } - @Test public void testAllocateAANoLabel() throws Throwable { assert cloneNodemap().size() > 0 @@ -154,10 +144,6 @@ class TestMockAppStateAAPlacement extends BaseMockAppStateTest assertAllContainersAA(); } - protected NodeMap cloneNodemap() { - appState.roleHistory.cloneNodemap() - } - @Test public void testAllocateFlexUp() throws Throwable { // want multiple instances, so there will be iterations @@ -245,7 +231,7 @@ class TestMockAppStateAAPlacement extends BaseMockAppStateTest } void assertAllContainersAA() { - assertAllContainersAA(Integer.toString(aaRole.key)) + assertAllContainersAA(aaRole.key) } /** @@ -295,8 +281,7 @@ class TestMockAppStateAAPlacement extends BaseMockAppStateTest } protected AppState.NodeUpdatedOutcome addNewNode() { - NodeReport report = new MockNodeReport("four", 
NodeState.RUNNING) as NodeReport - appState.onNodesUpdated([report]) + updateNodes(new MockNodeReport("4", NodeState.RUNNING, "gpu")) } @Test @@ -314,5 +299,4 @@ class TestMockAppStateAAPlacement extends BaseMockAppStateTest assert 1 == appState.reviewRequestAndReleaseNodes().size() } - } http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/178bd96d/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/appstate/TestMockLabelledAAPlacement.groovy ---------------------------------------------------------------------- diff --git a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/appstate/TestMockLabelledAAPlacement.groovy b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/appstate/TestMockLabelledAAPlacement.groovy new file mode 100644 index 0000000..790a80e --- /dev/null +++ b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/appstate/TestMockLabelledAAPlacement.groovy @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.slider.server.appmaster.model.appstate + +import groovy.transform.CompileStatic +import groovy.util.logging.Slf4j +import org.apache.hadoop.yarn.api.records.Container +import org.apache.hadoop.yarn.api.records.NodeState +import org.apache.slider.providers.PlacementPolicy +import org.apache.slider.providers.ProviderRole +import org.apache.slider.server.appmaster.model.mock.BaseMockAppStateTest +import org.apache.slider.server.appmaster.model.mock.MockFactory +import org.apache.slider.server.appmaster.model.mock.MockNodeReport +import org.apache.slider.server.appmaster.model.mock.MockRoles +import org.apache.slider.server.appmaster.model.mock.MockYarnEngine +import org.apache.slider.server.appmaster.operations.AbstractRMOperation +import org.apache.slider.server.appmaster.state.AppState +import org.apache.slider.server.appmaster.state.AppStateBindingInfo +import org.apache.slider.server.appmaster.state.RoleStatus +import org.junit.Test + +/** + * Test Anti-affine placement + */ +@CompileStatic +@Slf4j +class TestMockLabelledAAPlacement extends BaseMockAppStateTest + implements MockRoles { + + /** + * Patch up a "role2" role to have anti-affinity set and the label of GPU + */ + public static final ProviderRole AAROLE = new ProviderRole( + MockRoles.ROLE2, + 2, + PlacementPolicy.ANTI_AFFINITY_REQUIRED, + 2, + 2, + "gpu") + + RoleStatus aaRole + private int NODES = 3 + private int GPU_NODES = 2 + private String HOST0 = "00000000" + private String HOST1 = "00000001" + + @Override + AppStateBindingInfo buildBindingInfo() { + def bindingInfo = super.buildBindingInfo() + bindingInfo.roles = [ + MockFactory.PROVIDER_ROLE0, + MockFactory.PROVIDER_ROLE1, + AAROLE, + ] + bindingInfo + } + + @Override + void setup() { + super.setup() + aaRole = lookupRole(AAROLE.name) + // node 1 is GPU + + updateNodes(new MockNodeReport(HOST0, NodeState.RUNNING, "gpu")) + updateNodes(new MockNodeReport(HOST1, NodeState.RUNNING, "gpu")) + } + + @Override + 
MockYarnEngine createYarnEngine() { + new MockYarnEngine(NODES, 8) + } + + void assertAllContainersAA() { + assertAllContainersAA(aaRole.key) + } + + /** + * + * @throws Throwable + */ + @Test + public void testAskForTooMany() throws Throwable { + + describe("Ask for 1 more than the no of available nodes;" + + " expect the final request to be unsatisfied until the cluster changes size") + //more than expected + int size = GPU_NODES + aaRole.desired = size + 1 + + List<AbstractRMOperation > operations = appState.reviewRequestAndReleaseNodes() + assert aaRole.AARequestOutstanding + + assert aaRole.pendingAntiAffineRequests == size + for (int i = 0; i < size; i++) { + def iter = "Iteration $i role = $aaRole" + describe iter + List<AbstractRMOperation > operationsOut = [] + + def roleInstances = submitOperations(operations, [], operationsOut) + // one instance per request + assert 1 == roleInstances.size() + appState.onNodeManagerContainerStarted(roleInstances[0].containerId) + assertAllContainersAA() + // there should be none left + log.debug(nodeInformationSnapshotAsString()) + operations = operationsOut + if (i + 1 < size) { + assert operations.size() == 2 + } else { + assert operations.size() == 1 + } + } + // expect an outstanding AA request to be unsatisfied + assert aaRole.actual < aaRole.desired + assert !aaRole.requested + assert !aaRole.AARequestOutstanding + List<Container> allocatedContainers = engine.execute(operations, []) + assert 0 == allocatedContainers.size() + // in a review now, no more requests can be generated, as there is no space for AA placements, + // even though there is cluster capacity + assert 0 == appState.reviewRequestAndReleaseNodes().size() + + // switch node 2 into being labelled + def outcome = updateNodes(new MockNodeReport("00000002", NodeState.RUNNING, "gpu")) + + assert cloneNodemap().size() == NODES + assert outcome.clusterChanged + // no active calls to empty + assert outcome.operations.empty + assert 1 == 
appState.reviewRequestAndReleaseNodes().size() + } + + protected AppState.NodeUpdatedOutcome addNewNode() { + updateNodes(new MockNodeReport("00000004", NodeState.RUNNING, "gpu")) + } + + @Test + public void testClusterSizeChangesDuringRequestSequence() throws Throwable { + describe("Change the cluster size where the cluster size changes during a test sequence.") + aaRole.desired = GPU_NODES + 1 + List<AbstractRMOperation> operations = appState.reviewRequestAndReleaseNodes() + assert aaRole.AARequestOutstanding + assert GPU_NODES == aaRole.pendingAntiAffineRequests + def outcome = addNewNode() + assert outcome.clusterChanged + // one call to cancel + assert 1 == outcome.operations.size() + // and on a review, one more to rebuild + assert 1 == appState.reviewRequestAndReleaseNodes().size() + } + +} http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/178bd96d/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryAA.groovy ---------------------------------------------------------------------- diff --git a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryAA.groovy b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryAA.groovy index de85bba..bf8d1b4 100644 --- a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryAA.groovy +++ b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryAA.groovy @@ -21,7 +21,6 @@ package org.apache.slider.server.appmaster.model.history import groovy.util.logging.Slf4j import org.apache.hadoop.yarn.api.records.NodeReport import org.apache.hadoop.yarn.api.records.NodeState -import org.apache.hadoop.yarn.client.api.AMRMClient import org.apache.slider.server.appmaster.model.mock.MockFactory import org.apache.slider.server.appmaster.model.mock.MockNodeReport import org.apache.slider.server.appmaster.model.mock.MockRoleHistory @@ -29,7 +28,6 
@@ import org.apache.slider.server.appmaster.state.NodeEntry import org.apache.slider.server.appmaster.state.NodeInstance import org.apache.slider.server.appmaster.state.NodeMap import org.apache.slider.server.appmaster.state.RoleHistory -import org.apache.slider.server.appmaster.state.RoleStatus import org.apache.slider.test.SliderTestBase import org.junit.Test @@ -40,7 +38,7 @@ import org.junit.Test @Slf4j class TestRoleHistoryAA extends SliderTestBase { - List<String> hostnames = ["one", "two", "three"] + List<String> hostnames = ["1", "2", "3"] NodeMap nodeMap, gpuNodeMap RoleHistory roleHistory = new MockRoleHistory(MockFactory.ROLES) @@ -66,11 +64,11 @@ class TestRoleHistoryAA extends SliderTestBase { } public boolean markNodeOneUnhealthy() { - return setNodeState(nodeMap.get("one"), NodeState.UNHEALTHY) + return setNodeState(nodeMap.get("1"), NodeState.UNHEALTHY) } protected boolean setNodeState(NodeInstance node, NodeState state) { - node.updateNode(new MockNodeReport(node.hostname, state)) + node.updateNode(new MockNodeReport(node.hostname, state)) } @Test @@ -80,6 +78,20 @@ class TestRoleHistoryAA extends SliderTestBase { } @Test + public void testFindSomeNodesSomeLabel() throws Throwable { + // all three will surface at first + update(nodeMap, [new MockNodeReport("1", NodeState.RUNNING, "GPU")]) + def gpuNodes = nodeMap.findAllNodesForRole(1, "GPU") + verifyResultSize(1, gpuNodes) + def instance = gpuNodes[0] + instance.getOrCreate(1).onStarting() + assert !instance.canHost(1, "GPU") + assert !instance.canHost(1, "") + verifyResultSize(0, nodeMap.findAllNodesForRole(1, "GPU")) + + } + + @Test public void testFindNoNodesRightLabel() throws Throwable { // all three will surface at first verifyResultSize(3, gpuNodeMap.findAllNodesForRole(1, "GPU")) @@ -122,7 +134,7 @@ class TestRoleHistoryAA extends SliderTestBase { assertNoAvailableNodes(1) // walk one of the nodes through the lifecycle - def node1 = nodeMap.get("one") + def node1 = nodeMap.get("1") assert 
!node1.canHost(1,"") node1.get(1).onStartCompleted() assert !node1.canHost(1,"") @@ -130,7 +142,7 @@ class TestRoleHistoryAA extends SliderTestBase { node1.get(1).release() assert node1.canHost(1,"") def list2 = verifyResultSize(1, nodeMap.findAllNodesForRole(1, "")) - assert list2[0].hostname == "one" + assert list2[0].hostname == "1" // now tag that node as unhealthy and expect it to go away markNodeOneUnhealthy() @@ -139,11 +151,11 @@ class TestRoleHistoryAA extends SliderTestBase { @Test public void testRolesIndependent() throws Throwable { - def node1 = nodeMap.get("one") + def node1 = nodeMap.get("1") def role1 = node1.getOrCreate(1) def role2 = node1.getOrCreate(2) nodeMap.values().each { - it.updateNode(new MockNodeReport("", NodeState.UNHEALTHY)) + it.updateNode(new MockNodeReport("0", NodeState.UNHEALTHY)) } assertNoAvailableNodes(1) assertNoAvailableNodes(2) @@ -156,6 +168,22 @@ class TestRoleHistoryAA extends SliderTestBase { assert node1.canHost(2,"") } + @Test + public void testNodeEntryAvailablity() throws Throwable { + def entry = new NodeEntry(1) + assert entry.available + entry.onStarting() + assert !entry.available + entry.onStartCompleted() + assert !entry.available + entry.release() + assert entry.available + entry.onStarting() + assert !entry.available + entry.onStartFailed() + assert entry.available + } + public List<NodeInstance> assertNoAvailableNodes(int role = 1, String label = "") { return verifyResultSize(0, nodeMap.findAllNodesForRole(role, label)) } @@ -178,10 +206,14 @@ class TestRoleHistoryAA extends SliderTestBase { def NodeMap createNodeMap(List<NodeReport> nodeReports) { NodeMap nodeMap = new NodeMap(1) - nodeMap.buildOrUpdate(nodeReports) + update(nodeMap, nodeReports) nodeMap } + protected boolean update(NodeMap nodeMap, List<NodeReport> nodeReports) { + nodeMap.buildOrUpdate(nodeReports) + } + def NodeMap createNodeMap(List<String> hosts, NodeState state, String label = "") { createNodeMap(MockNodeReport.createInstances(hosts, 
state, label)) http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/178bd96d/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryOutstandingRequestTracker.groovy ---------------------------------------------------------------------- diff --git a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryOutstandingRequestTracker.groovy b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryOutstandingRequestTracker.groovy index 7b389cd..6969b38 100644 --- a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryOutstandingRequestTracker.groovy +++ b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryOutstandingRequestTracker.groovy @@ -271,13 +271,13 @@ class TestRoleHistoryOutstandingRequestTracker extends BaseMockAppStateTest { @Test(expected = IllegalArgumentException) public void testAARequestNoNodes() throws Throwable { - tracker.newAARequest(role0Status.key, []) + tracker.newAARequest(role0Status.key, [], "") } @Test public void testAARequest() throws Throwable { def role0 = role0Status.key - OutstandingRequest request = tracker.newAARequest(role0, [host1]) + OutstandingRequest request = tracker.newAARequest(role0, [host1], "") assert host1.hostname == request.hostname assert !request.located } @@ -285,7 +285,7 @@ class TestRoleHistoryOutstandingRequestTracker extends BaseMockAppStateTest { @Test public void testAARequestPair() throws Throwable { def role0 = role0Status.key - OutstandingRequest request = tracker.newAARequest(role0, [host1, host2]) + OutstandingRequest request = tracker.newAARequest(role0, [host1, host2], "") assert host1.hostname == request.hostname assert !request.located def yarnRequest = request.buildContainerRequest( 
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/178bd96d/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/BaseMockAppStateTest.groovy ---------------------------------------------------------------------- diff --git a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/BaseMockAppStateTest.groovy b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/BaseMockAppStateTest.groovy index 4cb441d..da1bcb9 100644 --- a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/BaseMockAppStateTest.groovy +++ b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/BaseMockAppStateTest.groovy @@ -42,12 +42,12 @@ import org.apache.slider.server.appmaster.state.ContainerAssignment import org.apache.slider.server.appmaster.state.ContainerOutcome import org.apache.slider.server.appmaster.state.NodeEntry import org.apache.slider.server.appmaster.state.NodeInstance +import org.apache.slider.server.appmaster.state.NodeMap import org.apache.slider.server.appmaster.state.ProviderAppState import org.apache.slider.server.appmaster.state.RoleInstance import org.apache.slider.server.appmaster.state.RoleStatus import org.apache.slider.server.appmaster.state.StateAccessForProviders import org.apache.slider.test.SliderTestBase -import org.junit.Before @CompileStatic @Slf4j @@ -404,7 +404,7 @@ abstract class BaseMockAppStateTest extends SliderTestBase implements MockRoles * Scan through all containers and assert that the assignment is AA * @param index role index */ - void assertAllContainersAA(String index) { + void assertAllContainersAAOld(String index) { def nodemap = stateAccess.nodeInformationSnapshot nodemap.each { name, info -> def nodeEntry = info.entries[index] @@ -413,4 +413,59 @@ abstract class BaseMockAppStateTest extends SliderTestBase implements MockRoles "too many instances on node $name" } } + + /** + * Get the node information as a large JSON String + * 
@return + */ + String nodeInformationSnapshotAsString() { + prettyPrintAsJson(stateAccess.nodeInformationSnapshot) + } + + /** + * Scan through all containers and assert that the assignment is AA + * @param index role index + */ + void assertAllContainersAA(int index) { + cloneNodemap().each { name, info -> + def nodeEntry = info.get(index) + assert nodeEntry == null || nodeEntry.antiAffinityConstraintHeld + "too many instances on node $name" + } + } + + List<NodeInstance> verifyNodeInstanceCount(int size, List<NodeInstance> list) { + if (list.size() != size) { + list.each { log.error(it.toFullString()) } + } + assert size == list.size() + list + } + + /** + * Get the single request of a list of operations; includes the check for the size + * @param ops operations list of size 1 + * @return the request within the first operation + */ + public AMRMClient.ContainerRequest getSingleCancel(List<AbstractRMOperation> ops) { + assert 1 == ops.size() + getCancel(ops, 0) + } + + /** + * Get a snapshot of the nodemap of the application state + * @return a cloned nodemap + */ + protected NodeMap cloneNodemap() { + appState.roleHistory.cloneNodemap() + } + + /** + * Issue a nodes updated event + * @param report report to notify + * @return response of AM + */ + protected AppState.NodeUpdatedOutcome updateNodes(NodeReport report) { + appState.onNodesUpdated([report]) + } } http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/178bd96d/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/MockNodeReport.groovy ---------------------------------------------------------------------- diff --git a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/MockNodeReport.groovy b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/MockNodeReport.groovy index 43eef3e..8c3b712 100644 --- a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/MockNodeReport.groovy +++ 
b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/MockNodeReport.groovy @@ -51,6 +51,7 @@ class MockNodeReport extends NodeReport { */ MockNodeReport(String hostname, NodeState nodeState, String label ="") { nodeId = NodeId.newInstance(hostname, 80) + Integer.valueOf(hostname, 16) this.nodeState = nodeState this.httpAddress = "http$hostname:80" this.nodeLabels = new HashSet<>() http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/178bd96d/slider-core/src/test/groovy/org/apache/slider/test/SliderTestUtils.groovy ---------------------------------------------------------------------- diff --git a/slider-core/src/test/groovy/org/apache/slider/test/SliderTestUtils.groovy b/slider-core/src/test/groovy/org/apache/slider/test/SliderTestUtils.groovy index ae07187..e1f2f75 100644 --- a/slider-core/src/test/groovy/org/apache/slider/test/SliderTestUtils.groovy +++ b/slider-core/src/test/groovy/org/apache/slider/test/SliderTestUtils.groovy @@ -109,6 +109,15 @@ class SliderTestUtils extends Assert { JsonOutput.prettyPrint(json) } + /** + * Convert a JSON string to something readable + * @param json + * @return a string for printing + */ + public static String prettyPrintAsJson(Object src) { + JsonOutput.prettyPrint(JsonOutput.toJson(src)) + } + public static void skip(String message) { log.warn("Skipping test: {}", message) Assume.assumeTrue(message, false);
