SLIDER-799 AM-managed-placement. Starting to implement this; saving current work
Project: http://git-wip-us.apache.org/repos/asf/incubator-slider/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-slider/commit/b2b8750e Tree: http://git-wip-us.apache.org/repos/asf/incubator-slider/tree/b2b8750e Diff: http://git-wip-us.apache.org/repos/asf/incubator-slider/diff/b2b8750e Branch: refs/heads/feature/SLIDER-799-AM-managed-relax Commit: b2b8750ee28688b4b07055086d68ff8edf043f2b Parents: 8c8682f Author: Steve Loughran <[email protected]> Authored: Tue Mar 3 16:01:10 2015 -0800 Committer: Steve Loughran <[email protected]> Committed: Tue Mar 3 16:01:10 2015 -0800 ---------------------------------------------------------------------- .../org/apache/slider/api/ResourceKeys.java | 18 ++++ .../apache/slider/providers/ProviderRole.java | 24 ++++- .../slideram/SliderAMClientProvider.java | 3 +- .../slider/server/appmaster/state/AppState.java | 15 ++-- .../appmaster/state/OutstandingRequest.java | 7 +- .../state/OutstandingRequestTracker.java | 93 ++++++++++++++------ .../server/appmaster/state/RoleHistory.java | 4 +- 7 files changed, 125 insertions(+), 39 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/b2b8750e/slider-core/src/main/java/org/apache/slider/api/ResourceKeys.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/api/ResourceKeys.java b/slider-core/src/main/java/org/apache/slider/api/ResourceKeys.java index 50ca82f..ce2a54f 100644 --- a/slider-core/src/main/java/org/apache/slider/api/ResourceKeys.java +++ b/slider-core/src/main/java/org/apache/slider/api/ResourceKeys.java @@ -140,6 +140,24 @@ public interface ResourceKeys { */ int DEFAULT_NODE_FAILURE_THRESHOLD = 3; + + /** + * Time in seconds to relax placement delay + */ + String PLACEMENT_RELAX_DELAY = + "yarn.placement.relax.delay.seconds"; + + /** + * Time to have a strict placement policy outstanding before + 
* downgrading to a lax placement (for those components which permit that). + * <ol> + * <li>For strictly placed components, there's no relaxation.</li> + * <li>For components with no locality, there's no need to relax</li> + * </ol> + * + */ + int DEFAULT_PLACEMENT_RELAX_DELAY_SECONDS = 30; + /** * Log aggregation include, exclude patterns */ http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/b2b8750e/slider-core/src/main/java/org/apache/slider/providers/ProviderRole.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/providers/ProviderRole.java b/slider-core/src/main/java/org/apache/slider/providers/ProviderRole.java index 17124d2..7caae48 100644 --- a/slider-core/src/main/java/org/apache/slider/providers/ProviderRole.java +++ b/slider-core/src/main/java/org/apache/slider/providers/ProviderRole.java @@ -31,16 +31,36 @@ public final class ProviderRole { public final int id; public final int placementPolicy; public final int nodeFailureThreshold; + public final long placementTimeoutSeconds; public ProviderRole(String name, int id) { - this(name, id, PlacementPolicy.DEFAULT, ResourceKeys.DEFAULT_NODE_FAILURE_THRESHOLD); + this(name, + id, + PlacementPolicy.DEFAULT, + ResourceKeys.DEFAULT_NODE_FAILURE_THRESHOLD, + ResourceKeys.DEFAULT_PLACEMENT_RELAX_DELAY_SECONDS); } - public ProviderRole(String name, int id, int policy, int nodeFailureThreshold) { + /** + * Create a provider role + * @param name role/component name + * @param id ID. This becomes the YARN priority + * @param policy placement policy + * @param nodeFailureThreshold threshold for node failures (within a reset interval) + * after which a node failure is considered an app failure + * @param placementTimeoutSeconds for lax placement, timeout in seconds before + * a relaxed placement request is generated. 
+ */ + public ProviderRole(String name, + int id, + int policy, + int nodeFailureThreshold, + long placementTimeoutSeconds) { this.name = name; this.id = id; this.placementPolicy = policy; this.nodeFailureThreshold = nodeFailureThreshold; + this.placementTimeoutSeconds = placementTimeoutSeconds; } @Override http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/b2b8750e/slider-core/src/main/java/org/apache/slider/providers/slideram/SliderAMClientProvider.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/providers/slideram/SliderAMClientProvider.java b/slider-core/src/main/java/org/apache/slider/providers/slideram/SliderAMClientProvider.java index 1666c84..abb9648 100644 --- a/slider-core/src/main/java/org/apache/slider/providers/slideram/SliderAMClientProvider.java +++ b/slider-core/src/main/java/org/apache/slider/providers/slideram/SliderAMClientProvider.java @@ -89,7 +89,8 @@ public class SliderAMClientProvider extends AbstractClientProvider public static final ProviderRole APPMASTER = new ProviderRole(COMPONENT_AM, KEY_AM, PlacementPolicy.EXCLUDE_FROM_FLEXING, - ResourceKeys.DEFAULT_NODE_FAILURE_THRESHOLD); + ResourceKeys.DEFAULT_NODE_FAILURE_THRESHOLD, + 0); /** * Initialize role list http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/b2b8750e/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java index 9e2d2cf..4713ef1 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java @@ -630,7 +630,6 @@ public class AppState { * @return a new provider role * @throws 
BadConfigException bad configuration */ - @VisibleForTesting public ProviderRole createDynamicProviderRole(String name, MapOperations component) throws BadConfigException { @@ -639,13 +638,19 @@ public class AppState { ResourceKeys.COMPONENT_PRIORITY, priOpt, 0, 1, -1); String placementOpt = component.getOption( - ResourceKeys.COMPONENT_PLACEMENT_POLICY, + ResourceKeys.COMPONENT_PLACEMENT_POLICY, Integer.toString(PlacementPolicy.DEFAULT)); int placement = SliderUtils.parseAndValidate("value of " + name + " " + - ResourceKeys.COMPONENT_PLACEMENT_POLICY, + ResourceKeys.COMPONENT_PLACEMENT_POLICY, placementOpt, 0, 0, -1); - ProviderRole newRole = new ProviderRole(name, priority, placement, - getNodeFailureThresholdForRole(name)); + int placementTimeout = + component.getOptionInt(ResourceKeys.PLACEMENT_RELAX_DELAY, + ResourceKeys.DEFAULT_PLACEMENT_RELAX_DELAY_SECONDS); + ProviderRole newRole = new ProviderRole(name, + priority, + placement, + getNodeFailureThresholdForRole(name), + placementTimeout); log.info("New {} ", newRole); return newRole; } http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/b2b8750e/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequest.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequest.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequest.java index 2bc211a..3563a29 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequest.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequest.java @@ -92,10 +92,13 @@ public final class OutstandingRequest { /** * Build a container request. * If the request has an address, it is set in the container request - * (with a flag to enable relaxed priorities) + * (with a flag to enable relaxed priorities). 
+ * <p> + * This operation sets the requested time flag, used for tracking timeouts + * on outstanding requests * @param resource resource * @param role role - * @param time: time to record + * @param time time to record as request time * @param labelExpression label to satisfy * @return the request to raise */ http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/b2b8750e/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequestTracker.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequestTracker.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequestTracker.java index d847962..f6dc2de 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequestTracker.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequestTracker.java @@ -33,6 +33,7 @@ import java.util.Map; /** * Tracks outstanding requests made with a specific placement option. + * <p> * If an allocation comes in that is not in the map: either the allocation * was unplaced, or the placed allocation could not be met on the specified * host, and the RM/scheduler fell back to another location. @@ -42,13 +43,14 @@ public class OutstandingRequestTracker { protected static final Logger log = LoggerFactory.getLogger(OutstandingRequestTracker.class); - private Map<OutstandingRequest, OutstandingRequest> requests = - new HashMap<OutstandingRequest, OutstandingRequest>(); + private Map<OutstandingRequest, OutstandingRequest> placedRequests = + new HashMap<>(); /** * Create a new request for the specific role. If a * location is set, the request is added to the list of requests to track. - * if it isn't -it isn't. + * if it isn't, it is not tracked. 
+ * <p> * This does not update the node instance's role's request count * @param instance node instance to manager * @param role role index @@ -58,7 +60,7 @@ public class OutstandingRequestTracker { OutstandingRequest request = new OutstandingRequest(role, instance); if (request.isLocated()) { - requests.put(request, request); + placedRequests.put(request, request); } return request; } @@ -70,7 +72,7 @@ public class OutstandingRequestTracker { * @return the request or null if there was no outstanding one */ public synchronized OutstandingRequest lookup(int role, String hostname) { - return requests.get(new OutstandingRequest(role, hostname)); + return placedRequests.get(new OutstandingRequest(role, hostname)); } /** @@ -79,7 +81,7 @@ public class OutstandingRequestTracker { * @return the request */ public synchronized OutstandingRequest remove(OutstandingRequest request) { - return requests.remove(request); + return placedRequests.remove(request); } /** @@ -91,7 +93,7 @@ public class OutstandingRequestTracker { */ public synchronized boolean onContainerAllocated(int role, String hostname) { OutstandingRequest request = - requests.remove(new OutstandingRequest(role, hostname)); + placedRequests.remove(new OutstandingRequest(role, hostname)); if (request == null) { return false; } else { @@ -100,14 +102,45 @@ public class OutstandingRequestTracker { } return true; } - + + /** + * Determine which host was a role type most recently used on, so that + * if a choice is made of which (potentially surplus) containers to use, + * the most recent one is picked first. This operation <i>does not</i> + * change the role history, though it queries it. + */ static class newerThan implements Comparator<Container>, Serializable { private RoleHistory rh; public newerThan(RoleHistory rh) { this.rh = rh; } - + + /** + * Get the age of a container. If it is not known in the history, + * return 0. 
+ * @param c container + * @return age, 0 if the container is not known in the history + */ + private long getAgeOf(Container c) { + long age = 0; + NodeInstance node = rh.getExistingNodeInstance(c); + int role = ContainerPriority.extractRole(c); + if (node != null) { + NodeEntry nodeEntry = node.get(role); + if (nodeEntry != null) { + age = nodeEntry.getLastUsed(); + } + } + return age; + } + + /** + * Comparator: which host is more recent? + * @param c1 container 1 + * @param c2 container 2 + * @return -1 if c1 sorts first (lower role, or same role and more recently used); 0 if equal; 1 if c2 sorts first + */ @Override public int compare(Container c1, Container c2) { int role1 = ContainerPriority.extractRole(c1); @@ -115,9 +148,8 @@ public class OutstandingRequestTracker { if (role1 < role2) return -1; if (role1 > role2) return 1; - NodeInstance o1 = rh.getOrCreateNodeInstance(c1), o2 = rh.getOrCreateNodeInstance(c2); - long age = o1.getOrCreate(role1).getLastUsed(); - long age2 = o2.getOrCreate(role1).getLastUsed(); + long age = getAgeOf(c1); + long age2 = getAgeOf(c2); if (age > age2) { return -1; @@ -128,26 +160,28 @@ public class OutstandingRequestTracker { return 0; } } + /** * Take a list of requests and split them into specific host requests and * generic assignments. 
This is to give requested hosts priority * in container assignments if more come back than expected * @param rh RoleHistory instance - * @param allocatedContainers the list of allocated containers - * @param requested empty list of requested locations - * @param unrequested empty list of unrequested hosts + * @param inAllocated the list of allocated containers + * @param outRequested initially empty list of requested locations + * @param outUnrequested initially empty list of unrequested hosts */ - public synchronized void partitionRequests(RoleHistory rh, List<Container> allocatedContainers, - List<Container> requested, - List<Container> unrequested) { - Collections.sort(allocatedContainers, new newerThan(rh)); - for (Container container : allocatedContainers) { + public synchronized void partitionRequests(RoleHistory rh, + List<Container> inAllocated, + List<Container> outRequested, + List<Container> outUnrequested) { + Collections.sort(inAllocated, new newerThan(rh)); + for (Container container : inAllocated) { int role = ContainerPriority.extractRole(container); String hostname = RoleHistoryUtils.hostnameOf(container); - if (requests.containsKey(new OutstandingRequest(role, hostname))) { - requested.add(container); + if (placedRequests.containsKey(new OutstandingRequest(role, hostname))) { + outRequested.add(container); } else { - unrequested.add(container); + outUnrequested.add(container); } } } @@ -161,9 +195,9 @@ public class OutstandingRequestTracker { * @return possibly empty list of hostnames */ public synchronized List<NodeInstance> cancelOutstandingRequests(int role) { - List<NodeInstance> hosts = new ArrayList<NodeInstance>(); + List<NodeInstance> hosts = new ArrayList<>(); Iterator<Map.Entry<OutstandingRequest,OutstandingRequest>> iterator = - requests.entrySet().iterator(); + placedRequests.entrySet().iterator(); while (iterator.hasNext()) { Map.Entry<OutstandingRequest, OutstandingRequest> next = iterator.next(); @@ -176,8 +210,13 @@ public class 
OutstandingRequestTracker { } return hosts; } - + + /** + * Get a list of outstanding requests. The list is cloned, but the contents + * are shared + * @return a list of the current outstanding requests + */ public synchronized List<OutstandingRequest> listOutstandingRequests() { - return new ArrayList<OutstandingRequest>(requests.values()); + return new ArrayList<>(placedRequests.values()); } } http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/b2b8750e/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHistory.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHistory.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHistory.java index 5509cac..eef2b8f 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHistory.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHistory.java @@ -513,9 +513,9 @@ public class RoleHistory { * Request an instance on a given node. * An outstanding request is created & tracked, with the * relevant node entry for that role updated. - * + *<p> * The role status entries will also be tracked - * + * <p> * Returns the request that is now being tracked. * If the node instance is not null, it's details about the role is incremented *
