SLIDER-967 Use nodemap to build up location restrictions on AA placement. This is the core of the anti-affine (AA) placement algorithm: it finds nodes that are free to use.
Project: http://git-wip-us.apache.org/repos/asf/incubator-slider/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-slider/commit/7899f59a Tree: http://git-wip-us.apache.org/repos/asf/incubator-slider/tree/7899f59a Diff: http://git-wip-us.apache.org/repos/asf/incubator-slider/diff/7899f59a Branch: refs/heads/feature/SLIDER-82-pass-3.1 Commit: 7899f59a1cf12ae88775dcdd85a712f96cd6eb7c Parents: 856ab84 Author: Steve Loughran <[email protected]> Authored: Wed Nov 11 20:11:50 2015 +0000 Committer: Steve Loughran <[email protected]> Committed: Wed Nov 11 20:11:50 2015 +0000 ---------------------------------------------------------------------- .../slider/api/proto/RestTypeMarshalling.java | 10 +- .../slider/api/types/NodeInformation.java | 4 +- .../server/appmaster/SliderAppMaster.java | 6 +- .../server/appmaster/rpc/SliderIPCService.java | 1 - .../slider/server/appmaster/state/AppState.java | 91 ++++++++------- .../server/appmaster/state/NodeEntry.java | 2 +- .../server/appmaster/state/NodeInstance.java | 11 +- .../slider/server/appmaster/state/NodeMap.java | 4 +- .../appmaster/state/OutstandingRequest.java | 51 ++++++++- .../state/OutstandingRequestTracker.java | 55 +++++++++ .../server/appmaster/state/RoleHistory.java | 112 +++++++++++++++++-- .../server/appmaster/state/RoleStatus.java | 26 ++++- .../appstate/TestMockAppStateAAPlacement.groovy | 31 ++++- .../model/history/TestRoleHistoryAA.groovy | 25 +++-- ...stRoleHistoryFindNodesForNewInstances.groovy | 20 ++-- ...tRoleHistoryOutstandingRequestTracker.groovy | 30 ++++- .../TestRoleHistoryRequestTracking.groovy | 12 +- .../model/mock/BaseMockAppStateTest.groovy | 4 + 18 files changed, 397 insertions(+), 98 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/7899f59a/slider-core/src/main/java/org/apache/slider/api/proto/RestTypeMarshalling.java ---------------------------------------------------------------------- 
diff --git a/slider-core/src/main/java/org/apache/slider/api/proto/RestTypeMarshalling.java b/slider-core/src/main/java/org/apache/slider/api/proto/RestTypeMarshalling.java index b7985e6..85a8358 100644 --- a/slider-core/src/main/java/org/apache/slider/api/proto/RestTypeMarshalling.java +++ b/slider-core/src/main/java/org/apache/slider/api/proto/RestTypeMarshalling.java @@ -38,6 +38,8 @@ import java.io.IOException; import java.io.InputStream; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; import java.util.List; /** @@ -160,8 +162,8 @@ public class RestTypeMarshalling { builder.setLabels(info.labels); } - List<NodeEntryInformation> entries = info.entries; - if (entries != null) { + if (info.entries != null) { + Collection<NodeEntryInformation> entries = info.entries.values(); for (NodeEntryInformation entry : entries) { Messages.NodeEntryInformationProto.Builder node = Messages.NodeEntryInformationProto.newBuilder(); @@ -192,7 +194,7 @@ public class RestTypeMarshalling { info.state = wire.getState(); List<Messages.NodeEntryInformationProto> entriesList = wire.getEntriesList(); if (entriesList != null) { - info.entries = new ArrayList<>(entriesList.size()); + info.entries = new HashMap<>(entriesList.size()); for (Messages.NodeEntryInformationProto entry : entriesList) { NodeEntryInformation nei = new NodeEntryInformation(); nei.failed = entry.getFailed(); @@ -205,7 +207,7 @@ public class RestTypeMarshalling { nei.releasing = entry.getReleasing(); nei.startFailed = entry.getStartFailed(); nei.starting = entry.getStarting(); - info.entries.add(nei); + info.entries.put(Integer.toString(nei.priority), nei); } } return info; http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/7899f59a/slider-core/src/main/java/org/apache/slider/api/types/NodeInformation.java ---------------------------------------------------------------------- diff --git 
a/slider-core/src/main/java/org/apache/slider/api/types/NodeInformation.java b/slider-core/src/main/java/org/apache/slider/api/types/NodeInformation.java index 049ee52..edf7e21 100644 --- a/slider-core/src/main/java/org/apache/slider/api/types/NodeInformation.java +++ b/slider-core/src/main/java/org/apache/slider/api/types/NodeInformation.java @@ -22,7 +22,9 @@ import org.codehaus.jackson.annotate.JsonIgnoreProperties; import org.codehaus.jackson.map.annotate.JsonSerialize; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; /** * Serialized node information. Must be kept in sync with the protobuf equivalent. @@ -38,5 +40,5 @@ public class NodeInformation { public long lastUpdated; public String rackName; public String state; - public List<NodeEntryInformation> entries = new ArrayList<>(); + public Map<String, NodeEntryInformation> entries = new HashMap<>(); } http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/7899f59a/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java index d74688b..171451e 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java @@ -1826,7 +1826,6 @@ public class SliderAppMaster extends AbstractSliderLaunchedService */ private void releaseAllContainers() { List<AbstractRMOperation> operations = appState.releaseAllContainers(); - providerRMOperationHandler.execute(operations); //now apply the operations execute(operations); } @@ -1852,7 +1851,10 @@ public class SliderAppMaster extends AbstractSliderLaunchedService LOG_YARN.info("onNodesUpdated({})", updatedNodes.size()); log.info("Updated 
nodes {}", updatedNodes); // Check if any nodes are lost or revived and update state accordingly - appState.onNodesUpdated(updatedNodes); + List<AbstractRMOperation> operations = appState.onNodesUpdated(updatedNodes); + execute(operations); + // if there were any operations, trigger a review + reviewRequestAndReleaseNodes("nodes updated"); } /** http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/7899f59a/slider-core/src/main/java/org/apache/slider/server/appmaster/rpc/SliderIPCService.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/rpc/SliderIPCService.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/rpc/SliderIPCService.java index bb8f512..a983f53 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/rpc/SliderIPCService.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/rpc/SliderIPCService.java @@ -448,7 +448,6 @@ public class SliderIPCService extends AbstractService } } - @Override public Messages.WrappedJsonProto getModelDesired(Messages.EmptyPayloadProto request) throws IOException { return lookupAggregateConf(MODEL_DESIRED); http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/7899f59a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java index 063a7fc..c960510 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java @@ -663,7 +663,6 @@ public class AppState { return newRole; } - /** * Actions to perform when an instance definition is updated * Currently: @@ -678,9 +677,9 @@ public class 
AppState { * * @throws BadConfigException */ - private synchronized void onInstanceDefinitionUpdated() throws - BadConfigException, - IOException { + private synchronized void onInstanceDefinitionUpdated() + throws BadConfigException, IOException { + log.debug("Instance definition updated"); //note the time snapshotTime = now(); @@ -1220,11 +1219,14 @@ public class AppState { * Anti-Affine: the {@link RoleStatus#outstandingAArequest} is set here. * This is where role history information will be used for placement decisions. * @param role role - * @return the container request to submit + * @return the container request to submit or null if there is none */ private AMRMClient.ContainerRequest createContainerRequest(RoleStatus role) { incrementRequestCount(role); OutstandingRequest request = roleHistory.requestContainerForRole(role); + if (request == null) { + return null; + } if (role.isAntiAffinePlacement()) { role.setOutstandingAArequest(request); } @@ -1426,12 +1428,13 @@ public class AppState { * Handle node update from the RM. This syncs up the node map with the RM's view * @param updatedNodes updated nodes */ - public synchronized void onNodesUpdated(List<NodeReport> updatedNodes) { + public synchronized List<AbstractRMOperation> onNodesUpdated(List<NodeReport> updatedNodes) { boolean changed = roleHistory.onNodesUpdated(updatedNodes); if (changed) { - //TODO log.error("TODO: cancel AA requests and re-review"); + return cancelOutstandingAARequests(); } + return new ArrayList<>(0); } /** @@ -1882,6 +1885,20 @@ public class AppState { } /** + * Escalate operation as triggered by external timer. + * @return a (usually empty) list of cancel/request operations. 
+ */ + public synchronized List<AbstractRMOperation> cancelOutstandingAARequests() { + List<AbstractRMOperation> operations = roleHistory.cancelOutstandingAARequests(); + for (RoleStatus roleStatus : roleStatusMap.values()) { + if (roleStatus.isAntiAffinePlacement()) { + roleStatus.cancelOutstandingAARequest(); + } + } + return operations; + } + + /** * Look at the allocation status of one role, and trigger add/release * actions if the number of desired role instances doesn't equal * (actual + pending). @@ -1900,7 +1917,6 @@ public class AppState { long delta; long expected; String name = role.getName(); - boolean isAA = role.isAntiAffinePlacement(); synchronized (role) { delta = role.getDelta(); expected = role.getDesired(); @@ -1920,19 +1936,24 @@ public class AppState { if (delta > 0) { // more workers needed than we have -ask for more - log.info("{}: Asking for {} more nodes(s) for a total of {} ", name, - delta, expected); - - // TODO: AA RH to help here by only allowing one request for an AA + log.info("{}: Asking for {} more nodes(s) for a total of {} ", name, delta, expected); - if (isAA) { - // build one only if there is none outstanding + if (role.isAntiAffinePlacement()) { + // build one only if there is none outstanding, the role history knows + // enough about the cluster to ask, and there is somewhere to place + // the node if (role.getPendingAntiAffineRequests() == 0 + && !role.isAARequestOutstanding() && roleHistory.canPlaceAANodes()) { - log.info("Starting an anti-affine request sequence for {} nodes", delta); // log the number outstanding - role.incPendingAntiAffineRequests(delta - 1); - addContainerRequest(operations, createContainerRequest(role)); + AMRMClient.ContainerRequest request = createContainerRequest(role); + if (request != null) { + log.info("Starting an anti-affine request sequence for {} nodes", delta); + role.incPendingAntiAffineRequests(delta - 1); + addContainerRequest(operations, request); + } else { + log.info("No location for 
anti-affine request"); + } } else { if (roleHistory.canPlaceAANodes()) { log.info("Adding {} more anti-affine requests", delta); @@ -1955,22 +1976,7 @@ public class AppState { // reduce the number expected (i.e. subtract the delta) long excess = -delta; - if (isAA) { - // there may be pending requests which can be cancelled here - long pending = role.getPendingAntiAffineRequests(); - if (excess <= pending) { - long outstanding = pending - excess; - log.info("Cancelling {} pending AA allocations, leaving {}", excess, outstanding); - role.setPendingAntiAffineRequests(outstanding); - excess = 0; - } else { - // not enough - log.info("Cancelling all pending AA allocations"); - role.setPendingAntiAffineRequests(0); - excess -= pending; - } - } - // how many requests are outstanding? + // how many requests are outstanding? for AA roles, this includes pending long outstandingRequests = role.getRequested(); if (outstandingRequests > 0) { // outstanding requests. @@ -2052,15 +2058,22 @@ public class AppState { return operations; } + /** + * Add a container request if the request is non-null + * @param operations operations to add the entry to + * @param containerAsk what to ask for + */ private void addContainerRequest(List<AbstractRMOperation> operations, AMRMClient.ContainerRequest containerAsk) { - log.info("Container ask is {} and label = {}", containerAsk, - containerAsk.getNodeLabelExpression()); - int askMemory = containerAsk.getCapability().getMemory(); - if (askMemory > this.containerMaxMemory) { - log.warn("Memory requested: {} > max of {}", askMemory, containerMaxMemory); + if (containerAsk != null) { + log.info("Container ask is {} and label = {}", containerAsk, + containerAsk.getNodeLabelExpression()); + int askMemory = containerAsk.getCapability().getMemory(); + if (askMemory > this.containerMaxMemory) { + log.warn("Memory requested: {} > max of {}", askMemory, containerMaxMemory); + } + operations.add(new ContainerRequestOperation(containerAsk)); } - 
operations.add(new ContainerRequestOperation(containerAsk)); } /** http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/7899f59a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/NodeEntry.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/NodeEntry.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/NodeEntry.java index 6dae3c6..c180f88 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/NodeEntry.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/NodeEntry.java @@ -90,7 +90,7 @@ public class NodeEntry { * the number of instances > 1. */ public synchronized boolean isAvailable() { - return getActive() == 0 && (requested == 0) && starting == 0; + return getActive() == 0 && requested == 0 && starting == 0; } /** http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/7899f59a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/NodeInstance.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/NodeInstance.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/NodeInstance.java index b805ffb..ebd9d5a 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/NodeInstance.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/NodeInstance.java @@ -22,10 +22,12 @@ import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.slider.api.types.NodeInformation; import org.apache.slider.common.tools.Comparators; +import org.apache.slider.common.tools.SliderUtils; import java.io.Serializable; import java.util.ArrayList; import java.util.Comparator; +import java.util.HashMap; import java.util.List; import java.util.ListIterator; 
import java.util.Set; @@ -64,7 +66,8 @@ public class NodeInstance { private String nodeLabels = ""; /** - * The list of node entries of specific roles + * An unordered list of node entries of specific roles. There's nothing + * indexed so as to support sparser datastructures. */ private final List<NodeEntry> nodeEntries; @@ -307,9 +310,9 @@ public class NodeInstance { info.rackName = nodeReport.getRackName(); info.healthReport = nodeReport.getHealthReport(); } - info.entries = new ArrayList<>(nodeEntries.size()); + info.entries = new HashMap<>(nodeEntries.size()); for (NodeEntry nodeEntry : nodeEntries) { - info.entries.add(nodeEntry.serialize()); + info.entries.put(Integer.toString(nodeEntry.rolePriority), nodeEntry.serialize()); } return info; } @@ -323,7 +326,7 @@ public class NodeInstance { */ public boolean canHost(int role, String label) { return isOnline() - && (label.isEmpty() || label.equals(nodeLabels)) // label match + && (SliderUtils.isUnset(label) || label.equals(nodeLabels)) // label match && (get(role) == null || get(role).isAvailable()); // no live role } http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/7899f59a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/NodeMap.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/NodeMap.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/NodeMap.java index 2887c9e..aea48b3 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/NodeMap.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/NodeMap.java @@ -146,9 +146,9 @@ public class NodeMap extends HashMap<String, NodeInstance> { * Scan the current node map for all nodes capable of hosting an instance * @param role role ID * @param label label which must match, or "" for no label checks - * @return a list of node instances matching the criteria. 
+ * @return a possibly empty list of node instances matching the criteria. */ - public List<NodeInstance> findNodesForRole(int role, String label) { + public List<NodeInstance> findAllNodesForRole(int role, String label) { List<NodeInstance> nodes = new ArrayList<>(size()); for (NodeInstance instance : values()) { if (instance.canHost(role, label)) { http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/7899f59a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequest.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequest.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequest.java index a9d4b52..e211e7f 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequest.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequest.java @@ -29,6 +29,7 @@ import org.apache.slider.server.appmaster.operations.CancelSingleRequest; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; import java.util.List; /** @@ -53,6 +54,13 @@ public final class OutstandingRequest extends RoleHostnamePair { public final NodeInstance node; /** + * A list of all possible nodes to list in an AA request. For a non-AA + * request where {@link #node} is set, element 0 of the list is the same + * value. + */ + public final List<NodeInstance> nodes = new ArrayList<>(1); + + /** * Optional label. This is cached as the request option (explicit-location + label) is forbidden, * yet the label needs to be retained for escalation. */ @@ -95,6 +103,12 @@ public final class OutstandingRequest extends RoleHostnamePair { private int priority = -1; /** + * Is this an Anti-affine request which should be cancelled on + * a cluster resize? 
+ */ + private boolean antiAffine = false; + + /** * Create a request * @param roleId role * @param node node -can be null @@ -103,6 +117,7 @@ public final class OutstandingRequest extends RoleHostnamePair { NodeInstance node) { super(roleId, node != null ? node.hostname : null); this.node = node; + nodes.add(node); } /** @@ -119,6 +134,19 @@ public final class OutstandingRequest extends RoleHostnamePair { } /** + * Create an Anti-affine reques, including all listed nodes (there must be one) + * as targets. + * @param roleId role + * @param nodes list of nodes + */ + public OutstandingRequest(int roleId, List<NodeInstance> nodes) { + super(roleId, nodes.get(0).hostname); + this.node = null; + this.antiAffine = true; + this.nodes.addAll(nodes); + } + + /** * Is the request located in the cluster, that is: does it have a node. * @return true if a node instance was supplied in the constructor */ @@ -150,6 +178,14 @@ public final class OutstandingRequest extends RoleHostnamePair { return priority; } + public boolean isAntiAffine() { + return antiAffine; + } + + public void setAntiAffine(boolean antiAffine) { + this.antiAffine = antiAffine; + } + /** * Build a container request. * <p> @@ -183,7 +219,19 @@ public final class OutstandingRequest extends RoleHostnamePair { NodeInstance target = this.node; String nodeLabels; - if (target != null) { + if (isAntiAffine()) { + hosts = new String[nodes.size()]; + int c = 0; + for (NodeInstance nodeInstance : nodes) { + hosts[c] = nodeInstance.hostname; + c++; + } + log.info("Creating anti-affine request across {} nodes; first node = {}", c, hostname); + escalated = false; + mayEscalate = false; + relaxLocality = false; + nodeLabels = label; + } else if (target != null) { // placed request. 
Hostname is used in request hosts = new String[1]; hosts[0] = target.hostname; @@ -215,7 +263,6 @@ public final class OutstandingRequest extends RoleHostnamePair { pri, relaxLocality, nodeLabels); - validate(); return issuedRequest; } http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/7899f59a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequestTracker.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequestTracker.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequestTracker.java index ecdc07a..4209449 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequestTracker.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequestTracker.java @@ -93,6 +93,23 @@ public class OutstandingRequestTracker { } /** + * Create a new Anti-affine request for the specific role + * <p> + * It is added to {@link #openRequests} + * <p> + * This does not update the node instance's role's request count + * @param role role index + * @param nodes list of suitable nodes + * @return a new request + */ + public synchronized OutstandingRequest newAARequest(int role, List<NodeInstance> nodes) { + Preconditions.checkArgument(!nodes.isEmpty()); + OutstandingRequest request = new OutstandingRequest(role, nodes); + openRequests.add(request); + return request; + } + + /** * Look up any oustanding request to a (role, hostname). * @param role role index * @param hostname hostname @@ -364,6 +381,43 @@ public class OutstandingRequestTracker { } /** + * Cancel all outstanding AA requests from the lists of requests. + * + * This does not remove them from the role status; they must be reset + * by the caller. 
+ * + */ + @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter") + public synchronized List<AbstractRMOperation> cancelOutstandingAARequests() { + + List<AbstractRMOperation> operations = new ArrayList<>(); + + // first, all placed requests + for (Map.Entry<RoleHostnamePair, OutstandingRequest> entry : placedRequests.entrySet()) { + OutstandingRequest outstandingRequest = entry.getValue(); + synchronized (outstandingRequest) { + if (outstandingRequest.isAntiAffine()) { + // time to escalate + operations.add(outstandingRequest.createCancelOperation()); + placedRequests.remove(entry.getKey()); + } + } + } + // second, all open requests + for (OutstandingRequest outstandingRequest : openRequests) { + synchronized (outstandingRequest) { + if (outstandingRequest.isAntiAffine()) { + // time to escalate + operations.add(outstandingRequest.createCancelOperation()); + openRequests.remove(outstandingRequest); + } + } + } + + return operations; + } + + /** * Extract a specific number of open requests for a role * @param roleId role Id * @param count count to extract @@ -382,6 +436,7 @@ public class OutstandingRequestTracker { } return results; } + /** * Extract a specific number of placed requests for a role * @param roleId role Id http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/7899f59a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHistory.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHistory.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHistory.java index df1f4e1..2ca5367 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHistory.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHistory.java @@ -19,13 +19,13 @@ package org.apache.slider.server.appmaster.state; import com.google.common.annotations.VisibleForTesting; 
+import com.google.common.base.Preconditions; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.client.api.AMRMClient; import org.apache.slider.api.types.NodeInformation; import org.apache.slider.common.tools.SliderUtils; import org.apache.slider.core.exceptions.BadConfigException; @@ -46,11 +46,9 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.concurrent.ConcurrentHashMap; /** @@ -549,7 +547,7 @@ public class RoleHistory { * @return the instance, or null for none */ @VisibleForTesting - public synchronized NodeInstance findNodeForNewInstance(RoleStatus role) { + public synchronized NodeInstance findRecentNodeForNewInstance(RoleStatus role) { if (!role.isPlacementDesired()) { // no data locality policy return null; @@ -591,6 +589,18 @@ public class RoleHistory { } /** + * Find a node for use + * @param role role + * @return the instance, or null for none + */ + @VisibleForTesting + public synchronized List<NodeInstance> findNodeForNewAAInstance(RoleStatus role) { + // all nodes that are live and can host the role; no attempt to exclude ones + // considered failing + return nodemap.findAllNodesForRole(role.getKey(), role.getLabelExpression()); + } + + /** * Request an instance on a given node. * An outstanding request is created & tracked, with the * relevant node entry for that role updated. 
@@ -615,15 +625,29 @@ public class RoleHistory { * Find a node for a role and request an instance on that (or a location-less * instance) * @param role role status - * @return a request ready to go + * @return a request ready to go, or null if this is an AA request and no + * location can be found. */ public synchronized OutstandingRequest requestContainerForRole(RoleStatus role) { Resource resource = recordFactory.newResource(); role.copyResourceRequirements(resource); - NodeInstance node = findNodeForNewInstance(role); - // TODO AA -what if there are no suitable nodes? - return requestInstanceOnNode(node, role, resource); + if (role.isAntiAffinePlacement()) { + // if a placement can be found, return it. + List<NodeInstance> nodes = findNodeForNewAAInstance(role); + if (!nodes.isEmpty()) { + OutstandingRequest outstanding + = outstandingRequests.newAARequest(role.getKey(), nodes); + outstanding.buildContainerRequest(resource, role, now()); + return outstanding; + } else { + log.warn("No suitable location for {}", role.getName()); + return null; + } + } else { + NodeInstance node = findRecentNodeForNewInstance(role); + return requestInstanceOnNode(node, role, resource); + } } /** @@ -972,6 +996,26 @@ public class RoleHistory { public List<AbstractRMOperation> escalateOutstandingRequests() { return outstandingRequests.escalateOutstandingRequests(now()); } + /** + * Escalate operation as triggered by external timer. + * @return a (usually empty) list of cancel/request operations. + */ + public List<AbstractRMOperation> cancelOutstandingAARequests() { + return outstandingRequests.cancelOutstandingAARequests(); + } + + /** + * Cancel a number of outstanding requests for a role -that is, not + * actual containers, just requests for new ones. + * @param role role + * @param toCancel number to cancel + * @return a list of cancellable operations. 
+ */ + public List<AbstractRMOperation> cancelRequestsForRole(RoleStatus role, int toCancel) { + return role.isAntiAffinePlacement() ? + cancelRequestsForAARole(role, toCancel) + : cancelRequestsForSimpleRole(role, toCancel); + } /** * Build the list of requests to cancel from the outstanding list. @@ -979,19 +1023,67 @@ public class RoleHistory { * @param toCancel number to cancel * @return a list of cancellable operations. */ - public synchronized List<AbstractRMOperation> cancelRequestsForRole(RoleStatus role, int toCancel) { + private synchronized List<AbstractRMOperation> cancelRequestsForSimpleRole(RoleStatus role, int toCancel) { + Preconditions.checkArgument(toCancel > 0, + "trying to cancel invalid number of requests: " + toCancel); List<AbstractRMOperation> results = new ArrayList<>(toCancel); // first scan through the unplaced request list to find all of a role int roleId = role.getKey(); List<OutstandingRequest> requests = outstandingRequests.extractOpenRequestsForRole(roleId, toCancel); + if (role.isAntiAffinePlacement()) { + // TODO: AA + // AA placement, so clear the role info + role.cancelOutstandingAARequest(); + } // are there any left? int remaining = toCancel - requests.size(); // ask for some placed nodes requests.addAll(outstandingRequests.extractPlacedRequestsForRole(roleId, remaining)); - // TODO AA: clear anything here? + // build cancellations + for (OutstandingRequest request : requests) { + results.add(request.createCancelOperation()); + } + return results; + } + + /** + * Build the list of requests to cancel for an AA role. This reduces the number + * of outstanding pending requests first, then cancels any active request, + * before finally asking for any placed containers + * @param role role + * @param toCancel number to cancel + * @return a list of cancellable operations. 
+ */ + private synchronized List<AbstractRMOperation> cancelRequestsForAARole(RoleStatus role, int toCancel) { + List<AbstractRMOperation> results = new ArrayList<>(toCancel); + int roleId = role.getKey(); + List<OutstandingRequest> requests = new ArrayList<>(toCancel); + // there may be pending requests which can be cancelled here + long pending = role.getPendingAntiAffineRequests(); + if (pending > 0) { + // there are some pending ones which can be cancelled first + long pendingToCancel = Math.min(pending, toCancel); + log.info("Cancelling {} pending AA allocations, leaving {}", toCancel, + pendingToCancel); + role.setPendingAntiAffineRequests(pending - pendingToCancel); + toCancel -= pendingToCancel; + } + if (toCancel > 0 && role.isAARequestOutstanding()) { + // not enough + log.info("Cancelling current AA request"); + // find the single entry which may be running + requests = outstandingRequests.extractOpenRequestsForRole(roleId, toCancel); + role.cancelOutstandingAARequest(); + toCancel--; + } + + // ask for some excess nodes + if (toCancel > 0) { + requests.addAll(outstandingRequests.extractPlacedRequestsForRole(roleId, toCancel)); + } // build cancellations for (OutstandingRequest request : requests) { http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/7899f59a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleStatus.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleStatus.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleStatus.java index 1beaddc..a14a84b 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleStatus.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleStatus.java @@ -71,7 +71,7 @@ public final class RoleStatus implements Cloneable { private final LongGauge pendingAntiAffineRequests = new LongGauge(0); /** any pending 
AA request */ - private OutstandingRequest outstandingAArequest = null; + private volatile OutstandingRequest outstandingAArequest = null; private String failureMessage = ""; @@ -155,8 +155,12 @@ public final class RoleStatus implements Cloneable { return actual.decToFloor(1); } + /** + * Get the request count. For AA roles, this includes pending ones. + * @return a count of requested containers + */ public long getRequested() { - return requested.get(); + return requested.get() + pendingAntiAffineRequests.get(); } public long incRequested() { @@ -209,6 +213,14 @@ public final class RoleStatus implements Cloneable { } /** + * Probe for an outstanding AA request being true + * @return true if there is an outstanding AA Request + */ + public boolean isAARequestOutstanding() { + return outstandingAArequest != null; + } + + /** * Note that a role failed, text will * be used in any diagnostics if an exception * is later raised. @@ -312,13 +324,21 @@ public final class RoleStatus implements Cloneable { /** * Complete the outstanding AA request (there's no check for one in progress, caller * expected to have done that). - * @return the number of outstanding requests */ public void completeOutstandingAARequest() { setOutstandingAArequest(null); } /** + * Cancel any outstanding AA request. Harmless if the role is non-AA, or + * if there are no outstanding requests. + */ + public void cancelOutstandingAARequest() { + setOutstandingAArequest(null); + setPendingAntiAffineRequests(0); + } + + /** * Get the number of roles we are short of. * nodes released are ignored. * @return the positive or negative number of roles to add/release. 
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/7899f59a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/appstate/TestMockAppStateAAPlacement.groovy ---------------------------------------------------------------------- diff --git a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/appstate/TestMockAppStateAAPlacement.groovy b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/appstate/TestMockAppStateAAPlacement.groovy index baf88dc..c7f59e3 100644 --- a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/appstate/TestMockAppStateAAPlacement.groovy +++ b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/appstate/TestMockAppStateAAPlacement.groovy @@ -93,8 +93,8 @@ class TestMockAppStateAAPlacement extends BaseMockAppStateTest List<AbstractRMOperation> ops = appState.reviewRequestAndReleaseNodes() AMRMClient.ContainerRequest request = getSingleRequest(ops) - assert request.relaxLocality - assert request.nodes == null + assert !request.relaxLocality + assert request.nodes.size() == engine.cluster.clusterSize assert request.racks == null assert request.capability @@ -131,6 +131,7 @@ class TestMockAppStateAAPlacement extends BaseMockAppStateTest assert appState.onNodeManagerContainerStarted(container.id) ops = appState.reviewRequestAndReleaseNodes() assert ops.size() == 0 + assertAllContainersAA(); } @Test @@ -160,6 +161,8 @@ class TestMockAppStateAAPlacement extends BaseMockAppStateTest assert 1 == submitOperations(ops2, [], ops3).size() assert 2 == ops3.size() assert aaRole.pendingAntiAffineRequests == 0 + assertAllContainersAA() + } @Test @@ -169,14 +172,17 @@ class TestMockAppStateAAPlacement extends BaseMockAppStateTest List<AbstractRMOperation> ops = appState.reviewRequestAndReleaseNodes() getSingleRequest(ops) assert aaRole.pendingAntiAffineRequests == 1 + assert aaRole.AARequestOutstanding // flex down so that the next request should be cancelled 
aaRole.desired = 1 - // expect: no new reqests, pending count -- + // expect: no new requests, pending count -- List<AbstractRMOperation> ops2 = appState.reviewRequestAndReleaseNodes() assert ops2.empty + assert aaRole.AARequestOutstanding assert aaRole.pendingAntiAffineRequests == 0 + assertAllContainersAA() // next iter submitOperations(ops, [], ops2).size() @@ -195,12 +201,14 @@ class TestMockAppStateAAPlacement extends BaseMockAppStateTest List<AbstractRMOperation> ops = appState.reviewRequestAndReleaseNodes() getSingleRequest(ops) assert aaRole.pendingAntiAffineRequests == 0 + assert aaRole.AARequestOutstanding // flex down so that the next request should be cancelled aaRole.desired = 0 // expect: no new reqests, pending count -- List<AbstractRMOperation> ops2 = appState.reviewRequestAndReleaseNodes() assert aaRole.pendingAntiAffineRequests == 0 + assert !aaRole.AARequestOutstanding assert ops2.size() == 1 getSingleCancel(ops2) @@ -209,5 +217,22 @@ class TestMockAppStateAAPlacement extends BaseMockAppStateTest assert 1 == ops2.size() } + void assertAllContainersAA() { + assertAllContainersAA(Integer.toString(aaRole.key)) + } + + /** + * Scan through all containers and assert that the assignment is AA + * @param index role index + */ + void assertAllContainersAA(String index) { + def nodemap = stateAccess.nodeInformationSnapshot + nodemap.each { name, info -> + def nodeEntry = info.entries[index] + assert nodeEntry == null || + (nodeEntry.live + nodeEntry.starting + nodeEntry.releasing) <= 1 , + "too many instances on node $name" + } + } } http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/7899f59a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryAA.groovy ---------------------------------------------------------------------- diff --git a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryAA.groovy 
b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryAA.groovy index f99326f..fdbc3b4 100644 --- a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryAA.groovy +++ b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryAA.groovy @@ -21,10 +21,15 @@ package org.apache.slider.server.appmaster.model.history import groovy.util.logging.Slf4j import org.apache.hadoop.yarn.api.records.NodeReport import org.apache.hadoop.yarn.api.records.NodeState +import org.apache.hadoop.yarn.client.api.AMRMClient +import org.apache.slider.server.appmaster.model.mock.MockFactory import org.apache.slider.server.appmaster.model.mock.MockNodeReport +import org.apache.slider.server.appmaster.model.mock.MockRoleHistory import org.apache.slider.server.appmaster.state.NodeEntry import org.apache.slider.server.appmaster.state.NodeInstance import org.apache.slider.server.appmaster.state.NodeMap +import org.apache.slider.server.appmaster.state.RoleHistory +import org.apache.slider.server.appmaster.state.RoleStatus import org.apache.slider.test.SliderTestBase import org.junit.Test @@ -37,44 +42,48 @@ class TestRoleHistoryAA extends SliderTestBase { List<String> hostnames = ["one", "two", "three"] NodeMap nodeMap, gpuNodeMap + RoleHistory roleHistory = new MockRoleHistory(MockFactory.ROLES) + + AMRMClient.ContainerRequest requestContainer(RoleStatus roleStatus) { + roleHistory.requestContainerForRole(roleStatus).issuedRequest + } @Override void setup() { super.setup() nodeMap = createNodeMap(hostnames, NodeState.RUNNING) gpuNodeMap = createNodeMap(hostnames, NodeState.RUNNING, "GPU") - } @Test public void testFindNodesInFullCluster() throws Throwable { // all three will surface at first - assertResultSize(3, nodeMap.findNodesForRole(1, "")) + assertResultSize(3, nodeMap.findAllNodesForRole(1, "")) } @Test public void testFindNodesInUnhealthyCluster() throws Throwable { // all 
three will surface at first nodeMap.get("one").updateNode(new MockNodeReport("one",NodeState.UNHEALTHY)) - assertResultSize(2, nodeMap.findNodesForRole(1, "")) + assertResultSize(2, nodeMap.findAllNodesForRole(1, "")) } @Test public void testFindNoNodesWrongLabel() throws Throwable { // all three will surface at first - assertResultSize(0, nodeMap.findNodesForRole(1, "GPU")) + assertResultSize(0, nodeMap.findAllNodesForRole(1, "GPU")) } @Test public void testFindNoNodesRightLabel() throws Throwable { // all three will surface at first - assertResultSize(3, gpuNodeMap.findNodesForRole(1, "GPU")) + assertResultSize(3, gpuNodeMap.findAllNodesForRole(1, "GPU")) } @Test public void testFindNoNodesNoLabel() throws Throwable { // all three will surface at first - assertResultSize(3, gpuNodeMap.findNodesForRole(1, "")) + assertResultSize(3, gpuNodeMap.findAllNodesForRole(1, "")) } @Test @@ -83,7 +92,7 @@ class TestRoleHistoryAA extends SliderTestBase { applyToNodeEntries(nodeMap) { NodeEntry it -> it.request() } - assertResultSize(0, nodeMap.findNodesForRole(1, "")) + assertResultSize(0, nodeMap.findAllNodesForRole(1, "")) } @Test @@ -92,7 +101,7 @@ class TestRoleHistoryAA extends SliderTestBase { applyToNodeEntries(nodeMap) { NodeEntry it -> it.request() } - assertResultSize(0, nodeMap.findNodesForRole(1, "")) + assertResultSize(0, nodeMap.findAllNodesForRole(1, "")) } def assertResultSize(int size, List<NodeInstance> list) { http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/7899f59a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryFindNodesForNewInstances.groovy ---------------------------------------------------------------------- diff --git a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryFindNodesForNewInstances.groovy b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryFindNodesForNewInstances.groovy index 63aa6d2..f36724e 
100644 --- a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryFindNodesForNewInstances.groovy +++ b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryFindNodesForNewInstances.groovy @@ -33,10 +33,8 @@ import org.junit.Test /** * Testing finding nodes for new instances. - * These tests validate the (currently) suboptimal - * behavior of not listing any known nodes when there - * are none in the available list -even if there are nodes - * known to be not running live instances in the cluster. + * + * This stresses the non-AA codepath */ @Slf4j @CompileStatic @@ -71,7 +69,7 @@ class TestRoleHistoryFindNodesForNewInstances extends BaseMockAppStateTest { public List<NodeInstance> findNodes(int count, RoleStatus roleStatus = roleStat) { List < NodeInstance > found = []; for (int i = 0; i < count; i++) { - NodeInstance f = roleHistory.findNodeForNewInstance(roleStatus) + NodeInstance f = roleHistory.findRecentNodeForNewInstance(roleStatus) if (f) { found << f }; @@ -81,17 +79,17 @@ class TestRoleHistoryFindNodesForNewInstances extends BaseMockAppStateTest { @Test public void testFind1NodeR0() throws Throwable { - NodeInstance found = roleHistory.findNodeForNewInstance(roleStat) + NodeInstance found = roleHistory.findRecentNodeForNewInstance(roleStat) log.info("found: $found") assert [age3Active0].contains(found) } @Test public void testFind2NodeR0() throws Throwable { - NodeInstance found = roleHistory.findNodeForNewInstance(roleStat) + NodeInstance found = roleHistory.findRecentNodeForNewInstance(roleStat) log.info("found: $found") assert [age2Active0, age3Active0].contains(found) - NodeInstance found2 = roleHistory.findNodeForNewInstance(roleStat) + NodeInstance found2 = roleHistory.findRecentNodeForNewInstance(roleStat) log.info("found: $found2") assert [age2Active0, age3Active0].contains(found2) assert found != found2; @@ -100,7 +98,7 @@ class TestRoleHistoryFindNodesForNewInstances 
extends BaseMockAppStateTest { @Test public void testFind3NodeR0ReturnsNull() throws Throwable { assert 2== findNodes(2).size() - NodeInstance found = roleHistory.findNodeForNewInstance(roleStat) + NodeInstance found = roleHistory.findRecentNodeForNewInstance(roleStat) assert found == null; } @@ -124,7 +122,7 @@ class TestRoleHistoryFindNodesForNewInstances extends BaseMockAppStateTest { assert age2Active0.getActiveRoleInstances(0) != 0 age3Active0.get(0).onStartCompleted() assert age3Active0.getActiveRoleInstances(0) != 0 - NodeInstance found = roleHistory.findNodeForNewInstance(roleStat) + NodeInstance found = roleHistory.findRecentNodeForNewInstance(roleStat) log.info(found ?.toFullString()) assert found == null } @@ -148,7 +146,7 @@ class TestRoleHistoryFindNodesForNewInstances extends BaseMockAppStateTest { assert age2Active0.exceedsFailureThreshold(roleStat) // get the role & expect age3 to be picked up, even though it is older - NodeInstance found = roleHistory.findNodeForNewInstance(roleStat) + NodeInstance found = roleHistory.findRecentNodeForNewInstance(roleStat) assert age3Active0.is(found) } http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/7899f59a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryOutstandingRequestTracker.groovy ---------------------------------------------------------------------- diff --git a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryOutstandingRequestTracker.groovy b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryOutstandingRequestTracker.groovy index 1c99c04..7b389cd 100644 --- a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryOutstandingRequestTracker.groovy +++ b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryOutstandingRequestTracker.groovy @@ -240,6 +240,7 @@ class 
TestRoleHistoryOutstandingRequestTracker extends BaseMockAppStateTest { def yarnRequest = req1.buildContainerRequest(resource, workerRole, 0) assert (yarnRequest.nodeLabelExpression == null) assert (!yarnRequest.relaxLocality) + // escalation def yarnRequest2 = req1.escalate() assert (yarnRequest2.nodeLabelExpression == WORKERS_LABEL) assert (yarnRequest2.relaxLocality) @@ -258,7 +259,6 @@ class TestRoleHistoryOutstandingRequestTracker extends BaseMockAppStateTest { resource.virtualCores = 1 resource.memory = 48; - def label = null // initial request def yarnRequest = req1.buildContainerRequest(resource, role0Status, 0) assert yarnRequest.nodes != null @@ -269,6 +269,34 @@ class TestRoleHistoryOutstandingRequestTracker extends BaseMockAppStateTest { assert yarnRequest2.relaxLocality } + @Test(expected = IllegalArgumentException) + public void testAARequestNoNodes() throws Throwable { + tracker.newAARequest(role0Status.key, []) + } + + @Test + public void testAARequest() throws Throwable { + def role0 = role0Status.key + OutstandingRequest request = tracker.newAARequest(role0, [host1]) + assert host1.hostname == request.hostname + assert !request.located + } + + @Test + public void testAARequestPair() throws Throwable { + def role0 = role0Status.key + OutstandingRequest request = tracker.newAARequest(role0, [host1, host2]) + assert host1.hostname == request.hostname + assert !request.located + def yarnRequest = request.buildContainerRequest( + role0Status.copyResourceRequirements(new MockResource(0, 0)), + role0Status, + 0) + assert !yarnRequest.relaxLocality + assert !request.mayEscalate() + + assert yarnRequest.nodes.size() == 2 + } /** * Create a new request (always against host1) http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/7899f59a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryRequestTracking.groovy ---------------------------------------------------------------------- diff --git 
a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryRequestTracking.groovy b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryRequestTracking.groovy index 14ac32a..2f160cb 100644 --- a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryRequestTracking.groovy +++ b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryRequestTracking.groovy @@ -87,7 +87,7 @@ class TestRoleHistoryRequestTracking extends BaseMockAppStateTest { @Test public void testRequestedNodeOffList() throws Throwable { - NodeInstance ni = roleHistory.findNodeForNewInstance(roleStatus) + NodeInstance ni = roleHistory.findRecentNodeForNewInstance(roleStatus) assert age3Active0 == ni assertListEquals([age2Active0], roleHistory.cloneRecentNodeList(0)) roleHistory.requestInstanceOnNode(ni, @@ -106,7 +106,7 @@ class TestRoleHistoryRequestTracking extends BaseMockAppStateTest { recordAsFailed(age2Active0, 0, 4) assert age2Active0.isConsideredUnreliable(0, roleStatus.nodeFailureThreshold) // expect to get a null node back - NodeInstance ni = roleHistory.findNodeForNewInstance(roleStatus) + NodeInstance ni = roleHistory.findRecentNodeForNewInstance(roleStatus) assert !ni // which is translated to a no-location request @@ -123,7 +123,7 @@ class TestRoleHistoryRequestTracking extends BaseMockAppStateTest { assert !age3Active0.isConsideredUnreliable(0, roleStatus.nodeFailureThreshold) assert !roleHistory.cloneRecentNodeList(0).empty // looking for a node should now find one - ni = roleHistory.findNodeForNewInstance(roleStatus) + ni = roleHistory.findRecentNodeForNewInstance(roleStatus) assert ni == age3Active0 req = roleHistory.requestInstanceOnNode(ni, roleStatus, resource).issuedRequest assert 1 == req.nodes.size() @@ -153,13 +153,13 @@ class TestRoleHistoryRequestTracking extends BaseMockAppStateTest { assert recentRole0.indexOf(age3Active0) < 
recentRole0.indexOf(age2Active0) // the non-strict role has no suitable nodes - assert null == roleHistory.findNodeForNewInstance(role0Status) + assert null == roleHistory.findRecentNodeForNewInstance(role0Status) - def ni = roleHistory.findNodeForNewInstance(targetRole) + def ni = roleHistory.findRecentNodeForNewInstance(targetRole) assert ni - def ni2 = roleHistory.findNodeForNewInstance(targetRole) + def ni2 = roleHistory.findRecentNodeForNewInstance(targetRole) assert ni2 assert ni != ni2 } http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/7899f59a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/BaseMockAppStateTest.groovy ---------------------------------------------------------------------- diff --git a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/BaseMockAppStateTest.groovy b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/BaseMockAppStateTest.groovy index 14e556a..e1660ee 100644 --- a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/BaseMockAppStateTest.groovy +++ b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/BaseMockAppStateTest.groovy @@ -42,8 +42,10 @@ import org.apache.slider.server.appmaster.state.ContainerAssignment import org.apache.slider.server.appmaster.state.ContainerOutcome import org.apache.slider.server.appmaster.state.NodeEntry import org.apache.slider.server.appmaster.state.NodeInstance +import org.apache.slider.server.appmaster.state.ProviderAppState import org.apache.slider.server.appmaster.state.RoleInstance import org.apache.slider.server.appmaster.state.RoleStatus +import org.apache.slider.server.appmaster.state.StateAccessForProviders import org.apache.slider.test.SliderTestBase import org.junit.Before @@ -59,6 +61,7 @@ abstract class BaseMockAppStateTest extends SliderTestBase implements MockRoles protected Path historyPath; protected MockApplicationId applicationId; protected 
MockApplicationAttemptId applicationAttemptId; + protected StateAccessForProviders stateAccess /** * Override point: called in setup() to create the YARN engine; can @@ -97,6 +100,7 @@ abstract class BaseMockAppStateTest extends SliderTestBase implements MockRoles fs.delete(historyPath, true) appState = new MockAppState() appState.buildInstance(buildBindingInfo()) + stateAccess = new ProviderAppState(testName, appState) } /**
