SLIDER-799: moving towards escalation. This code has a sort algorithm for the node target list that compares failure history before age; this is about to change.
Project: http://git-wip-us.apache.org/repos/asf/incubator-slider/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-slider/commit/559ed00a Tree: http://git-wip-us.apache.org/repos/asf/incubator-slider/tree/559ed00a Diff: http://git-wip-us.apache.org/repos/asf/incubator-slider/diff/559ed00a Branch: refs/heads/feature/SLIDER-799-AM-managed-relax Commit: 559ed00a6b9f3aa8c2ec6c99eb3f85e9807f63ad Parents: a0d76b7 Author: Steve Loughran <[email protected]> Authored: Mon Mar 16 16:35:46 2015 +0000 Committer: Steve Loughran <[email protected]> Committed: Mon Mar 16 16:35:46 2015 +0000 ---------------------------------------------------------------------- .../slider/server/appmaster/state/AppState.java | 9 +-- .../MostRecentContainerReleaseSelector.java | 2 +- .../server/appmaster/state/NodeInstance.java | 62 ++++++++++++--- .../slider/server/appmaster/state/NodeMap.java | 4 +- .../appmaster/state/OutstandingRequest.java | 83 +++++++++++++++++--- .../state/OutstandingRequestTracker.java | 27 ++++--- .../appmaster/state/ProviderAppState.java | 8 +- .../server/appmaster/state/RoleHistory.java | 53 ++++++------- .../server/appmaster/state/RoleStatus.java | 4 + .../slider/server/avro/NewerFilesFirst.java | 43 ++++++++++ .../slider/server/avro/OlderFilesFirst.java | 43 ++++++++++ .../slider/server/avro/RoleHistoryWriter.java | 62 +++------------ .../TestRoleHistoryContainerEvents.groovy | 4 +- .../history/TestRoleHistoryNIComparators.groovy | 25 +++++- ...tRoleHistoryOutstandingRequestTracker.groovy | 14 ++-- .../history/TestRoleHistoryRWOrdering.groovy | 3 +- 16 files changed, 305 insertions(+), 141 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/559ed00a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java ---------------------------------------------------------------------- diff --git 
a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java index 4713ef1..f2c237c 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java @@ -745,7 +745,7 @@ public class AppState { */ private List<ProviderRole> buildRoleRequirementsFromResources() throws BadConfigException { - List<ProviderRole> newRoles = new ArrayList<ProviderRole>(0); + List<ProviderRole> newRoles = new ArrayList<>(0); //now update every role's desired count. //if there are no instance values, that role count goes to zero @@ -992,7 +992,7 @@ public class AppState { public synchronized List<RoleInstance> cloneLiveContainerInfoList() { List<RoleInstance> allRoleInstances; Collection<RoleInstance> values = getLiveNodes().values(); - allRoleInstances = new ArrayList<RoleInstance>(values); + allRoleInstances = new ArrayList<>(values); return allRoleInstances; } @@ -1220,7 +1220,6 @@ public class AppState { AMRMClient.ContainerRequest request; request = roleHistory.requestNode(role, resource, labelExpression); incrementRequestCount(role); - return request; } @@ -1509,7 +1508,7 @@ public class AppState { actual, releasing, completedCount); - roleHistory.onReleaseCompleted(container, true); + roleHistory.onReleaseCompleted(container); } else if (surplusNodes.remove(containerId)) { //its a surplus one being purged @@ -1873,7 +1872,7 @@ public class AppState { @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter") private List<AbstractRMOperation> reviewOneRole(RoleStatus role) throws SliderInternalStateException, TriggerClusterTeardownException { - List<AbstractRMOperation> operations = new ArrayList<AbstractRMOperation>(); + List<AbstractRMOperation> operations = new ArrayList<>(); int delta; String details; int expected; 
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/559ed00a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/MostRecentContainerReleaseSelector.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/MostRecentContainerReleaseSelector.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/MostRecentContainerReleaseSelector.java index 841dda3..9d936a1 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/MostRecentContainerReleaseSelector.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/MostRecentContainerReleaseSelector.java @@ -40,7 +40,7 @@ public class MostRecentContainerReleaseSelector implements ContainerReleaseSelec private static class newerThan implements Comparator<RoleInstance>, Serializable { private final Comparator<Long> innerComparator = - new Comparators.ComparatorReverser<Long>(new Comparators.LongComparator()); + new Comparators.ComparatorReverser<>(new Comparators.LongComparator()); public int compare(RoleInstance o1, RoleInstance o2) { return innerComparator.compare(o1.createTime, o2.createTime); http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/559ed00a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/NodeInstance.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/NodeInstance.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/NodeInstance.java index 231865e..71b74fc 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/NodeInstance.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/NodeInstance.java @@ -96,6 +96,22 @@ public class NodeInstance { } /** + * Query for a node being considered unreliable + * @param role role key + * @param 
threshold threshold above which a node is considered unreliable + * @return true if the node is considered unreliable + */ + public boolean isConsideredUnreliable(int role, int threshold) { + NodeEntry entry = get(role); + + if (entry != null) { + return entry.getFailedRecently() > threshold; + } else { + return false; + } + } + + /** * Get the entry for a role -and remove it if present * @param role the role index * @return the entry that WAS there @@ -113,6 +129,7 @@ public class NodeInstance { nodeEntries.add(nodeEntry); } + /** * run through each entry; gc'ing & removing old ones * @param absoluteTime age in millis @@ -185,28 +202,49 @@ public class NodeInstance { } /** - * A comparator for sorting entries where the role is newer than - * the other. - * This sort only compares the lastUsed field, not whether the - * node is in use or not + * A comparator for sorting entries where the node is preferred over another. + * If there's no entry for an element then it's failure count is set to -1, age to 0 + * for the purposes of the comparison + * <ol> + * <li>Entry exists => end of list as unknown</li> + * <li>Lowest failure count</li> + * <li>Age</li> + * </ol> + * + * @return +ve int if left is preferred to right; -ve if right over left, 0 for equal */ - public static class newerThan implements Comparator<NodeInstance>, + public static class Preferred implements Comparator<NodeInstance>, Serializable { final int role; - public newerThan(int role) { + public Preferred(int role) { this.role = role; } @Override public int compare(NodeInstance o1, NodeInstance o2) { - long age = o1.getOrCreate(role).getLastUsed(); - long age2 = o2.getOrCreate(role).getLastUsed(); + NodeEntry left = o1.get(role); + NodeEntry right = o2.get(role); + + // sort by failure count first + int failL = left != null ? left.getFailedRecently() : -1; + int failR = right != null ? 
right.getFailedRecently() : -1; + + if (failL < failR) { + return 1; + } + if (failR > failL) { + return -1; + } + + // failure counts are equal: compare age + long ageL = left != null ? left.getLastUsed() : 0; + long ageR = right != null ? right.getLastUsed() : 0; - if (age > age2) { + if (ageL > ageR) { return -1; - } else if (age < age2) { + } else if (ageL < ageR) { return 1; } // equal @@ -220,12 +258,12 @@ public class NodeInstance { * This sort only compares the lastUsed field, not whether the * node is in use or not */ - public static class moreActiveThan implements Comparator<NodeInstance>, + public static class MoreActiveThan implements Comparator<NodeInstance>, Serializable { final int role; - public moreActiveThan(int role) { + public MoreActiveThan(int role) { this.role = role; } http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/559ed00a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/NodeMap.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/NodeMap.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/NodeMap.java index fe40086..aa50baa 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/NodeMap.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/NodeMap.java @@ -73,13 +73,13 @@ public class NodeMap extends HashMap<String, NodeInstance> { * in that role */ public List<NodeInstance> listActiveNodes(int role) { - List<NodeInstance> nodes = new ArrayList<NodeInstance>(); + List<NodeInstance> nodes = new ArrayList<>(); for (NodeInstance instance : values()) { if (instance.getActiveRoleInstances(role) > 0) { nodes.add(instance); } } - Collections.sort(nodes, new NodeInstance.moreActiveThan(role)); + Collections.sort(nodes, new NodeInstance.MoreActiveThan(role)); return nodes; } 
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/559ed00a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequest.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequest.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequest.java index c6a9cff..f6b83a7 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequest.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequest.java @@ -19,12 +19,17 @@ package org.apache.slider.server.appmaster.state; +import com.google.common.base.Preconditions; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.client.api.AMRMClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + /** * Tracks an outstanding request. This is used to correlate an allocation response * with the node and role used in the request. @@ -50,18 +55,35 @@ public final class OutstandingRequest { * Node the request is for -may be null */ public final NodeInstance node; + /** * hostname -will be null if node==null */ public final String hostname; /** - * requested time -only valid after {@link #buildContainerRequest(Resource, RoleStatus, long, String)} + * Requested time in millis. + * <p> + * Only valid after {@link #buildContainerRequest(Resource, RoleStatus, long, String)} + */ + public AMRMClient.ContainerRequest issuedRequest; + + /** + * Requested time in millis. + * <p> + * Only valid after {@link #buildContainerRequest(Resource, RoleStatus, long, String)} + */ + public long requestedTimeMillis; + + /** + * Time in millis after which escalation should be triggered.. 
+ * <p> + * Only valid after {@link #buildContainerRequest(Resource, RoleStatus, long, String)} */ - public long requestedTime; + public long escalationTimeoutMillis; /** - * Has the placement request been escalated by cancel and re-request + * Has the placement request been escalated? */ public boolean escalated; @@ -91,9 +113,14 @@ public final class OutstandingRequest { this.hostname = hostname; } + /** + * Is the request located in the cluster, that is: does it have a node. + * @return + */ public boolean isLocated() { return node != null; } + /** * Build a container request. * If the request has an address, it is set in the container request @@ -103,7 +130,7 @@ public final class OutstandingRequest { * on outstanding requests * @param resource resource * @param role role - * @param time time to record as request time + * @param time time in millis to record as request time * @param labelExpression label to satisfy * @return the request to raise */ @@ -111,7 +138,8 @@ public final class OutstandingRequest { Resource resource, RoleStatus role, long time, String labelExpression) { String[] hosts; boolean relaxLocality; - requestedTime = time; + requestedTimeMillis = time; + escalationTimeoutMillis = time + role.getPlacementTimeoutSeconds() * 1000; boolean usePlacementHistory = role.isStrictPlacement(); if (!usePlacementHistory) { // If strict placement does not mandate using placement then check @@ -133,23 +161,56 @@ public final class OutstandingRequest { // tell the node it is in play node.getOrCreate(roleId); log.info("Submitting request for container on {}", hosts[0]); + escalated = false; } else { + // the placement is implicitly escalated. 
+ escalated = true; hosts = null; relaxLocality = true; } - Priority pri = ContainerPriority.createPriority(roleId, - !relaxLocality); - AMRMClient.ContainerRequest request = - new AMRMClient.ContainerRequest(resource, + Priority pri = ContainerPriority.createPriority(roleId, !relaxLocality); + issuedRequest = new AMRMClient.ContainerRequest(resource, hosts, null, pri, relaxLocality, labelExpression); - return request; + return issuedRequest; } + + /** + * Build an escalated container request, updating {@link #issuedRequest} with + * the new value. + * @return the new container request, which has the same resource and label requirements + * as the original one, and the same host, but: relaxed placement, and a changed priority + * so as to place it into the relaxed list. + */ + public AMRMClient.ContainerRequest buildEscalatedContainerRequest() { + escalated = true; + Preconditions.checkNotNull(issuedRequest, "issued request"); + Priority pri = ContainerPriority.createPriority(roleId, true); + String[] nodes; + List<String> issuedRequestNodes = issuedRequest.getNodes(); + if (issuedRequestNodes != null) { + nodes = issuedRequestNodes.toArray(new String[issuedRequestNodes.size()]); + } else { + nodes = null; + } + + + AMRMClient.ContainerRequest newRequest = + new AMRMClient.ContainerRequest(issuedRequest.getCapability(), + nodes, + null, + pri, + true, + issuedRequest.getNodeLabelExpression()); + issuedRequest = newRequest; + return issuedRequest; + } + /** * Mark the request as completed (or canceled). 
*/ @@ -202,7 +263,7 @@ public final class OutstandingRequest { new StringBuilder("OutstandingRequest{"); sb.append("roleId=").append(roleId); sb.append(", node='").append(node).append('\''); - sb.append(", requestedTime=").append(requestedTime); + sb.append(", requestedTime=").append(requestedTimeMillis); sb.append('}'); return sb.toString(); } http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/559ed00a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequestTracker.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequestTracker.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequestTracker.java index f6dc2de..e197a86 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequestTracker.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequestTracker.java @@ -34,6 +34,11 @@ import java.util.Map; /** * Tracks outstanding requests made with a specific placement option. * <p> + * <ol> + * <li>Used to decide when to return a node to 'can request containers here' list</li> + * <li>Used to identify requests where placement has timed out, and so issue relaxed requests</li> + * </ol> + * <p> * If an allocation comes in that is not in the map: either the allocation * was unplaced, or the placed allocation could not be met on the specified * host, and the RM/scheduler fell back to another location. 
@@ -56,7 +61,7 @@ public class OutstandingRequestTracker { * @param role role index * @return a new request */ - public synchronized OutstandingRequest addRequest(NodeInstance instance, int role) { + public synchronized OutstandingRequest newRequest(NodeInstance instance, int role) { OutstandingRequest request = new OutstandingRequest(role, instance); if (request.isLocated()) { @@ -69,7 +74,7 @@ public class OutstandingRequestTracker { * Look up any oustanding request to a (role, hostname). * @param role role index * @param hostname hostname - * @return the request or null if there was no outstanding one + * @return the request or null if there was no outstanding one in the {@link #placedRequests} */ public synchronized OutstandingRequest lookup(int role, String hostname) { return placedRequests.get(new OutstandingRequest(role, hostname)); @@ -78,7 +83,7 @@ public class OutstandingRequestTracker { /** * Remove a request * @param request matching request to find - * @return the request + * @return the request or null for no match in the {@link #placedRequests} */ public synchronized OutstandingRequest remove(OutstandingRequest request) { return placedRequests.remove(request); @@ -86,10 +91,10 @@ public class OutstandingRequestTracker { /** * Notification that a container has been allocated -drop it - * from the list of outstanding roles if need be + * from the {@link #placedRequests} structure. 
* @param role role index * @param hostname hostname - * @return true if an entry was found and dropped + * @return true if an entry was found and removed */ public synchronized boolean onContainerAllocated(int role, String hostname) { OutstandingRequest request = @@ -167,21 +172,21 @@ public class OutstandingRequestTracker { * in container assignments if more come back than expected * @param rh RoleHistory instance * @param inAllocated the list of allocated containers - * @param outRequested initially empty list of requested locations - * @param outUnrequested initially empty list of unrequested hosts + * @param outPlaceRequested initially empty list of requested locations + * @param outUnplaced initially empty list of unrequested hosts */ public synchronized void partitionRequests(RoleHistory rh, List<Container> inAllocated, - List<Container> outRequested, - List<Container> outUnrequested) { + List<Container> outPlaceRequested, + List<Container> outUnplaced) { Collections.sort(inAllocated, new newerThan(rh)); for (Container container : inAllocated) { int role = ContainerPriority.extractRole(container); String hostname = RoleHistoryUtils.hostnameOf(container); if (placedRequests.containsKey(new OutstandingRequest(role, hostname))) { - outRequested.add(container); + outPlaceRequested.add(container); } else { - outUnrequested.add(container); + outUnplaced.add(container); } } } http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/559ed00a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/ProviderAppState.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/ProviderAppState.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/ProviderAppState.java index 632551e..0508579 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/ProviderAppState.java +++ 
b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/ProviderAppState.java @@ -47,7 +47,7 @@ public class ProviderAppState implements StateAccessForProviders { private final Map<String, PublishedConfigSet> publishedConfigSets = - new ConcurrentHashMap<String, PublishedConfigSet>(5); + new ConcurrentHashMap<>(5); private final PublishedExportsSet publishedExportsSets = new PublishedExportsSet(); private static final PatternValidator validator = new PatternValidator( RestPaths.PUBLISHED_CONFIGURATION_SET_REGEXP); @@ -106,7 +106,7 @@ public class ProviderAppState implements StateAccessForProviders { public List<String> listConfigSets() { synchronized (publishedConfigSets) { - List<String> sets = new ArrayList<String>(publishedConfigSets.keySet()); + List<String> sets = new ArrayList<>(publishedConfigSets.keySet()); return sets; } } @@ -266,7 +266,7 @@ public class ProviderAppState implements StateAccessForProviders { RoleStatus roleStatus = lookupRoleStatus(component); List<RoleInstance> ownedContainerList = cloneOwnedContainerList(); List<RoleInstance> matching = - new ArrayList<RoleInstance>(ownedContainerList.size()); + new ArrayList<>(ownedContainerList.size()); int roleId = roleStatus.getPriority(); for (RoleInstance instance : ownedContainerList) { if (instance.roleId == roleId) { @@ -281,7 +281,7 @@ public class ProviderAppState implements StateAccessForProviders { RoleStatus roleStatus = lookupRoleStatus(component); ComponentInformation info = roleStatus.serialize(); List<RoleInstance> containers = lookupRoleContainers(component); - info.containers = new ArrayList<String>(containers.size()); + info.containers = new ArrayList<>(containers.size()); for (RoleInstance container : containers) { info.containers.add(container.id); } http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/559ed00a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHistory.java 
---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHistory.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHistory.java index eef2b8f..acfe606 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHistory.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHistory.java @@ -94,10 +94,8 @@ public class RoleHistory { * Track the failed nodes. Currently used to make wiser decision of container * ask with/without locality. Has other potential uses as well. */ - private Set<String> failedNodes = new HashSet<String>(); - - // dummy to be used in maps for faster lookup where we don't care about values - private final Object DUMMY_VALUE = new Object(); + private Set<String> failedNodes = new HashSet<>(); + public RoleHistory(List<ProviderRole> providerRoles) throws BadConfigException { @@ -117,7 +115,7 @@ public class RoleHistory { outstandingRequests = new OutstandingRequestTracker(); - Map<Integer, RoleStatus> roleStats = new HashMap<Integer, RoleStatus>(); + Map<Integer, RoleStatus> roleStats = new HashMap<>(); for (ProviderRole providerRole : providerRoles) { checkProviderRole(roleStats, providerRole); } @@ -156,7 +154,7 @@ public class RoleHistory { throws BadConfigException { log.debug("Validating/adding new provider role to role history: {} ", providerRole); - Map<Integer, RoleStatus> roleStats = new HashMap<Integer, RoleStatus>(); + Map<Integer, RoleStatus> roleStats = new HashMap<>(); for (ProviderRole role : providerRoles) { roleStats.put(role.id, new RoleStatus(role)); @@ -171,7 +169,7 @@ public class RoleHistory { * Clear the lists of available nodes */ private synchronized void resetAvailableNodeLists() { - availableNodes = new HashMap<Integer, LinkedList<NodeInstance>>(roleSize); + availableNodes = new HashMap<>(roleSize); } /** @@ -279,8 +277,10 @@ public class 
RoleHistory { } /** - * Garbage collect the structure -this will dropp - * all nodes that have been inactive since the (relative) age + * Garbage collect the structure -this will drop + * all nodes that have been inactive since the (relative) age. + * This will drop the failure counts of the nodes too, so it will + * lose information that matters. * @param age relative age */ public void gc(long age) { @@ -423,8 +423,7 @@ public class RoleHistory { public synchronized void buildAvailableNodeLists() { resetAvailableNodeLists(); // build the list of available nodes - for (Map.Entry<String, NodeInstance> entry : nodemap - .entrySet()) { + for (Map.Entry<String, NodeInstance> entry : nodemap.entrySet()) { NodeInstance ni = entry.getValue(); for (int i = 0; i < roleSize; i++) { NodeEntry nodeEntry = ni.get(i); @@ -458,13 +457,12 @@ public class RoleHistory { private LinkedList<NodeInstance> getOrCreateNodesForRoleId(int id) { LinkedList<NodeInstance> instances = availableNodes.get(id); if (instances == null) { - instances = new LinkedList<NodeInstance>(); + instances = new LinkedList<>(); availableNodes.put(id, instances); } return instances; } - /** * Sort an available node list * @param role role to sort @@ -472,7 +470,7 @@ public class RoleHistory { private void sortAvailableNodeList(int role) { List<NodeInstance> nodesForRoleId = getNodesForRoleId(role); if (nodesForRoleId != null) { - Collections.sort(nodesForRoleId, new NodeInstance.newerThan(role)); + Collections.sort(nodesForRoleId, new NodeInstance.Preferred(role)); } } @@ -492,7 +490,7 @@ public class RoleHistory { List<NodeInstance> targets = getNodesForRoleId(roleKey); if (targets == null) { // add an empty list here for ease downstream - targets = new ArrayList<NodeInstance>(0); + targets = new ArrayList<>(0); } int cnt = targets.size(); log.debug("There are {} node(s) to consider for {}", cnt, role.getName()); @@ -527,7 +525,7 @@ public class RoleHistory { */ public synchronized AMRMClient.ContainerRequest 
requestInstanceOnNode( NodeInstance node, RoleStatus role, Resource resource, String labelExpression) { - OutstandingRequest outstanding = outstandingRequests.addRequest(node, role.getKey()); + OutstandingRequest outstanding = outstandingRequests.newRequest(node, role.getKey()); return outstanding.buildContainerRequest(resource, role, now(), labelExpression); } @@ -622,9 +620,9 @@ public class RoleHistory { //partition into requested and unrequested List<Container> requested = - new ArrayList<Container>(allocatedContainers.size()); + new ArrayList<>(allocatedContainers.size()); List<Container> unrequested = - new ArrayList<Container>(allocatedContainers.size()); + new ArrayList<>(allocatedContainers.size()); outstandingRequests.partitionRequests(this, allocatedContainers, requested, unrequested); //give the unrequested ones lower priority @@ -647,12 +645,13 @@ public class RoleHistory { boolean requestFound = outstandingRequests.onContainerAllocated(role, hostname); if (desiredCount <= actualCount) { - //cancel the nodes + // all oustanding requests have been satisfied + // tag nodes as available List<NodeInstance> hosts = outstandingRequests.cancelOutstandingRequests(role); if (!hosts.isEmpty()) { //add the list - log.debug("Adding {} hosts for role {}", hosts.size(), role); + log.info("Adding {} hosts for role {}", hosts.size(), role); nodeInstances.addAll(hosts); sortAvailableNodeList(role); } @@ -708,8 +707,9 @@ public class RoleHistory { */ public synchronized void onNodesUpdated(List<NodeReport> updatedNodes) { for (NodeReport updatedNode : updatedNodes) { - String hostname = updatedNode.getNodeId() == null ? null : updatedNode - .getNodeId().getHost(); + String hostname = updatedNode.getNodeId() == null + ? 
null + : updatedNode.getNodeId().getHost(); if (hostname == null) { continue; } @@ -735,11 +735,10 @@ public class RoleHistory { /** * App state notified of a container completed * @param container completed container - * @param wasReleased * @return true if the node was queued */ - public boolean onReleaseCompleted(Container container, boolean wasReleased) { - return markContainerFinished(container, wasReleased, false); + public boolean onReleaseCompleted(Container container) { + return markContainerFinished(container, true, false); } /** @@ -832,7 +831,7 @@ public class RoleHistory { */ @VisibleForTesting public List<NodeInstance> cloneAvailableList(int role) { - return new LinkedList<NodeInstance>(getOrCreateNodesForRoleId(role)); + return new LinkedList<>(getOrCreateNodesForRoleId(role)); } /** @@ -850,7 +849,7 @@ public class RoleHistory { * @return the list */ public List<String> cloneFailedNodes() { - List<String> lst = new ArrayList<String>(); + List<String> lst = new ArrayList<>(); lst.addAll(failedNodes); return lst; } http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/559ed00a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleStatus.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleStatus.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleStatus.java index ac1b131..7ab7dbe 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleStatus.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleStatus.java @@ -74,6 +74,10 @@ public final class RoleStatus implements Cloneable { return providerRole.placementPolicy; } + public long getPlacementTimeoutSeconds() { + return providerRole.placementTimeoutSeconds; + } + /** * The number of failures on a specific node that can be tolerated * before selecting a different node for placement 
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/559ed00a/slider-core/src/main/java/org/apache/slider/server/avro/NewerFilesFirst.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/avro/NewerFilesFirst.java b/slider-core/src/main/java/org/apache/slider/server/avro/NewerFilesFirst.java new file mode 100644 index 0000000..2e049cb --- /dev/null +++ b/slider-core/src/main/java/org/apache/slider/server/avro/NewerFilesFirst.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.slider.server.avro; + +import org.apache.hadoop.fs.Path; + +import java.io.Serializable; +import java.util.Comparator; + +/** + * Compare two filenames by name; the more recent one comes first + */ +public class NewerFilesFirst implements Comparator<Path>, Serializable { + + /** + * Takes the ordering of path names from the normal string comparison + * and negates it, so that names that come after other names in + * the string sort come before here + * @param o1 leftmost + * @param o2 rightmost + * @return positive if o1 > o2 + */ + @Override + public int compare(Path o1, Path o2) { + return (o2.getName().compareTo(o1.getName())); + } +} http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/559ed00a/slider-core/src/main/java/org/apache/slider/server/avro/OlderFilesFirst.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/avro/OlderFilesFirst.java b/slider-core/src/main/java/org/apache/slider/server/avro/OlderFilesFirst.java new file mode 100644 index 0000000..407aaa6 --- /dev/null +++ b/slider-core/src/main/java/org/apache/slider/server/avro/OlderFilesFirst.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.slider.server.avro; + +import org.apache.hadoop.fs.Path; + +import java.io.Serializable; +import java.util.Comparator; + +/** + * Compare two filenames by name; the older ones come first + */ +public class OlderFilesFirst implements Comparator<Path>, Serializable { + + /** + * Takes the ordering of path names from the normal string comparison + * and uses it unchanged, so that names that come before other names in + * the string sort also come before here + * @param o1 leftmost + * @param o2 rightmost + * @return positive if o1 > o2 + */ + @Override + public int compare(Path o1, Path o2) { + return (o1.getName().compareTo(o2.getName())); + } +} http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/559ed00a/slider-core/src/main/java/org/apache/slider/server/avro/RoleHistoryWriter.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/avro/RoleHistoryWriter.java b/slider-core/src/main/java/org/apache/slider/server/avro/RoleHistoryWriter.java index 422ffeb..031632b 100644 --- a/slider-core/src/main/java/org/apache/slider/server/avro/RoleHistoryWriter.java +++ b/slider-core/src/main/java/org/apache/slider/server/avro/RoleHistoryWriter.java @@ -52,11 +52,9 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; -import java.io.Serializable; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; -import java.util.Comparator; import java.util.List; import java.util.ListIterator; import java.util.Locale; @@ -90,7 +88,7 @@ public class RoleHistoryWriter { throws IOException { try { DatumWriter<RoleHistoryRecord> writer = - new SpecificDatumWriter<RoleHistoryRecord>(RoleHistoryRecord.class); + new SpecificDatumWriter<>(RoleHistoryRecord.class); int roles = 
history.getRoleSize(); RoleHistoryHeader header = new RoleHistoryHeader(); @@ -184,7 +182,7 @@ public class RoleHistoryWriter { BadConfigException { try { DatumReader<RoleHistoryRecord> reader = - new SpecificDatumReader<RoleHistoryRecord>(RoleHistoryRecord.class); + new SpecificDatumReader<>(RoleHistoryRecord.class); Decoder decoder = DecoderFactory.get().jsonDecoder(RoleHistoryRecord.getClassSchema(), in); @@ -351,16 +349,15 @@ public class RoleHistoryWriter { public static void sortHistoryPaths(List<Path> paths) { Collections.sort(paths, new NewerFilesFirst()); } - - + /** * Iterate through the paths until one can be loaded * @param roleHistory role history * @param paths paths to load * @return the path of any loaded history -or null if all failed to load */ - public Path attemptToReadHistory(RoleHistory roleHistory, FileSystem fileSystem, List<Path> paths) throws - BadConfigException { + public Path attemptToReadHistory(RoleHistory roleHistory, FileSystem fileSystem, List<Path> paths) + throws BadConfigException { ListIterator<Path> pathIterator = paths.listIterator(); boolean success = false; Path path = null; @@ -389,9 +386,8 @@ public class RoleHistoryWriter { * @throws IOException if indexing the history directory fails. 
*/ public Path loadFromHistoryDir(FileSystem fs, Path dir, - RoleHistory roleHistory) throws - IOException, - BadConfigException { + RoleHistory roleHistory) + throws IOException, BadConfigException { assert fs != null: "null filesystem"; List<Path> entries = findAllHistoryEntries(fs, dir, false); return attemptToReadHistory(roleHistory, fs, entries); @@ -407,9 +403,8 @@ public class RoleHistoryWriter { * check to stop the entire dir being purged) * @throws IOException IO problems */ - public int purgeOlderHistoryEntries(FileSystem fileSystem, Path keep) throws - IOException { - assert fileSystem != null : "null filesystem"; + public int purgeOlderHistoryEntries(FileSystem fileSystem, Path keep) + throws IOException { assert fileSystem != null : "null filesystem"; if (!fileSystem.exists(keep)) { throw new FileNotFoundException(keep.toString()); } @@ -429,44 +424,5 @@ public class RoleHistoryWriter { } return deleteCount; } - - /** - * Compare two filenames by name; the more recent one comes first - */ - public static class NewerFilesFirst implements Comparator<Path> , - Serializable { - /** - * Takes the ordering of path names from the normal string comparison - * and negates it, so that names that come after other names in - * the string sort come before here - * @param o1 leftmost - * @param o2 rightmost - * @return positive if o1 > o2 - */ - @Override - public int compare(Path o1, Path o2) { - return (o2.getName().compareTo(o1.getName())); - } - } - /** - * Compare two filenames by name; the older ones comes first - */ - public static class OlderFilesFirst implements Comparator<Path> , - Serializable { - - /** - * Takes the ordering of path names from the normal string comparison - * and negates it, so that names that come after other names in - * the string sort come before here - * @param o1 leftmost - * @param o2 rightmost - * @return positive if o1 > o2 - */ - @Override - public int compare(Path o1, Path o2) { - return (o1.getName().compareTo(o2.getName())); 
- } - } - } http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/559ed00a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryContainerEvents.groovy ---------------------------------------------------------------------- diff --git a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryContainerEvents.groovy b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryContainerEvents.groovy index dbb70fa..c7a38f5 100644 --- a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryContainerEvents.groovy +++ b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryContainerEvents.groovy @@ -18,8 +18,6 @@ package org.apache.slider.server.appmaster.model.history -import java.util.List; - import groovy.transform.CompileStatic import groovy.util.logging.Slf4j import org.apache.hadoop.yarn.api.records.Container @@ -164,7 +162,7 @@ class TestRoleHistoryContainerEvents extends BaseMockAppStateTest { assert roleEntry.active == 0 // release completed - roleHistory.onReleaseCompleted(container, true) + roleHistory.onReleaseCompleted(container) assert roleEntry.releasing == 0 assert roleEntry.live == 0 assert roleEntry.active == 0 http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/559ed00a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryNIComparators.groovy ---------------------------------------------------------------------- diff --git a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryNIComparators.groovy b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryNIComparators.groovy index 612cce8..74dfd42 100644 --- a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryNIComparators.groovy +++ 
b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryNIComparators.groovy @@ -21,6 +21,7 @@ package org.apache.slider.server.appmaster.model.history import org.apache.slider.server.appmaster.model.mock.BaseMockAppStateTest import org.apache.slider.server.appmaster.model.mock.MockFactory import org.apache.slider.server.appmaster.state.NodeInstance +import org.junit.Before import org.junit.Test /** @@ -33,9 +34,17 @@ class TestRoleHistoryNIComparators extends BaseMockAppStateTest { NodeInstance age3Active0 = nodeInstance(1002, 0, 0, 0) NodeInstance age4Active1 = nodeInstance(1005, 0, 0, 0) NodeInstance empty = new NodeInstance("empty", MockFactory.ROLE_COUNT) + NodeInstance age6failing = nodeInstance(1006, 0, 0, 0) + NodeInstance age1failing = nodeInstance(1000, 0, 0, 0) List<NodeInstance> nodes = [age2Active2, age4Active1, age1Active4, age3Active0] + @Before + public void setup() { + age6failing.get(0).failedRecently = 2; + age1failing.get(0).failedRecently = 1; + } + @Override String getTestName() { return "TestNIComparators" @@ -44,16 +53,24 @@ class TestRoleHistoryNIComparators extends BaseMockAppStateTest { @Test public void testNewerThan() throws Throwable { - Collections.sort(nodes, new NodeInstance.newerThan(0)) + Collections.sort(nodes, new NodeInstance.Preferred(0)) assertListEquals(nodes, [age4Active1, age3Active0, age2Active2, age1Active4]) } @Test + public void testFailureCountFirst() throws Throwable { + def preferred = new NodeInstance.Preferred(0) + assert preferred.compare(age6failing, age1failing) == -1 + assert preferred.compare(age1failing, age6failing) == 1 + assert preferred.compare(age1failing, age1failing) == 0 + } + + @Test public void testNewerThanNoRole() throws Throwable { nodes << empty - Collections.sort(nodes, new NodeInstance.newerThan(0)) + Collections.sort(nodes, new NodeInstance.Preferred(0)) assertListEquals(nodes, [age4Active1, age3Active0, age2Active2, age1Active4, empty]) } @@ -61,7 +78,7 
@@ class TestRoleHistoryNIComparators extends BaseMockAppStateTest { @Test public void testMoreActiveThan() throws Throwable { - Collections.sort(nodes, new NodeInstance.moreActiveThan(0)) + Collections.sort(nodes, new NodeInstance.MoreActiveThan(0)) assertListEquals(nodes, [age1Active4, age2Active2, age4Active1, age3Active0],) } @@ -69,7 +86,7 @@ class TestRoleHistoryNIComparators extends BaseMockAppStateTest { @Test public void testMoreActiveThanEmpty() throws Throwable { nodes << empty - Collections.sort(nodes, new NodeInstance.moreActiveThan(0)) + Collections.sort(nodes, new NodeInstance.MoreActiveThan(0)) assertListEquals(nodes, [age1Active4, age2Active2, age4Active1, age3Active0, empty]) } http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/559ed00a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryOutstandingRequestTracker.groovy ---------------------------------------------------------------------- diff --git a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryOutstandingRequestTracker.groovy b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryOutstandingRequestTracker.groovy index 7085678..79a9bd6 100644 --- a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryOutstandingRequestTracker.groovy +++ b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryOutstandingRequestTracker.groovy @@ -38,7 +38,7 @@ class TestRoleHistoryOutstandingRequestTracker extends BaseMockAppStateTest { @Test public void testAddRetrieveEntry() throws Throwable { - OutstandingRequest request = tracker.addRequest(host1, 0) + OutstandingRequest request = tracker.newRequest(host1, 0) assert tracker.lookup(0, "host1").equals(request) assert tracker.remove(request).equals(request) assert !tracker.lookup(0, "host1") @@ -46,9 +46,9 @@ class 
TestRoleHistoryOutstandingRequestTracker extends BaseMockAppStateTest { @Test public void testAddCompleteEntry() throws Throwable { - tracker.addRequest(host1, 0) - tracker.addRequest(host2, 0) - tracker.addRequest(host1, 1) + tracker.newRequest(host1, 0) + tracker.newRequest(host2, 0) + tracker.newRequest(host1, 1) assert tracker.onContainerAllocated(1, "host1") assert !tracker.lookup(1, "host1") assert tracker.lookup(0, "host1") @@ -56,9 +56,9 @@ class TestRoleHistoryOutstandingRequestTracker extends BaseMockAppStateTest { @Test public void testCancelEntries() throws Throwable { - OutstandingRequest r1 = tracker.addRequest(host1, 0) - OutstandingRequest r2 = tracker.addRequest(host2, 0) - OutstandingRequest r3 = tracker.addRequest(host1, 1) + OutstandingRequest r1 = tracker.newRequest(host1, 0) + OutstandingRequest r2 = tracker.newRequest(host2, 0) + OutstandingRequest r3 = tracker.newRequest(host1, 1) List<NodeInstance> canceled = tracker.cancelOutstandingRequests(0) assert canceled.size() == 2 assert canceled.contains(host1) http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/559ed00a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryRWOrdering.groovy ---------------------------------------------------------------------- diff --git a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryRWOrdering.groovy b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryRWOrdering.groovy index a0663e8..aef22fb 100644 --- a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryRWOrdering.groovy +++ b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryRWOrdering.groovy @@ -26,6 +26,7 @@ import org.apache.slider.server.appmaster.model.mock.MockFactory import org.apache.slider.server.appmaster.state.NodeEntry import org.apache.slider.server.appmaster.state.NodeInstance 
import org.apache.slider.server.appmaster.state.RoleHistory +import org.apache.slider.server.avro.NewerFilesFirst import org.apache.slider.server.avro.RoleHistoryWriter import org.junit.Test @@ -119,7 +120,7 @@ class TestRoleHistoryRWOrdering extends BaseMockAppStateTest { @Test public void testPathnameComparator() throws Throwable { - def newerName = new RoleHistoryWriter.NewerFilesFirst() + def newerName = new NewerFilesFirst() log.info("$h_5fffa name is ${h_5fffa.getName()}") log.info("$h_0406c name is ${h_0406c.getName()}")
