SLIDER-799 SLIDER-828 when containers are allocated, explicitly cancel the request
Project: http://git-wip-us.apache.org/repos/asf/incubator-slider/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-slider/commit/1938323c Tree: http://git-wip-us.apache.org/repos/asf/incubator-slider/tree/1938323c Diff: http://git-wip-us.apache.org/repos/asf/incubator-slider/diff/1938323c Branch: refs/heads/feature/SLIDER-799-AM-managed-relax Commit: 1938323c7fef2666a429a06a55208a528e93f64b Parents: 7d9a9e9 Author: Steve Loughran <[email protected]> Authored: Tue Mar 24 13:44:09 2015 +0000 Committer: Steve Loughran <[email protected]> Committed: Tue Mar 24 13:49:14 2015 +0000 ---------------------------------------------------------------------- .../server/appmaster/SliderAppMaster.java | 2 +- .../appmaster/management/MetricsConstants.java | 33 ++++- .../operations/AsyncRMOperationHandler.java | 6 +- .../operations/CancelRequestOperation.java | 14 +- .../slider/server/appmaster/state/AppState.java | 147 +++++++++---------- .../server/appmaster/state/RoleHistory.java | 2 +- .../TestMockAppStateRebuildOnAMRestart.groovy | 4 +- 7 files changed, 124 insertions(+), 84 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/1938323c/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java index ab6b55c..b8584f7 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java @@ -1840,7 +1840,7 @@ public class SliderAppMaster extends AbstractSliderLaunchedService rmOperationHandler.cancelSingleRequest(request); } - /* =================================================================== */ 
+/* =================================================================== */ /* END */ /* =================================================================== */ http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/1938323c/slider-core/src/main/java/org/apache/slider/server/appmaster/management/MetricsConstants.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/management/MetricsConstants.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/management/MetricsConstants.java index e55cf60..31a82a3 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/management/MetricsConstants.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/management/MetricsConstants.java @@ -22,6 +22,35 @@ package org.apache.slider.server.appmaster.management; * Constants used in slider for metrics registration and lookup */ public class MetricsConstants { - public static final String CONTAINERS_OUTSTANDING_REQUESTS = - "containers.outstanding-requests"; + + /** + * {@value} + */ + public static final String CONTAINERS_OUTSTANDING_REQUESTS = "containers.outstanding-requests"; + + /** + * {@value} + */ + public static final String CONTAINERS_STARTED = "containers.started"; + + /** + * {@value} + */ + public static final String CONTAINERS_SURPLUS = "containers.surplus"; + + /** + * {@value} + */ + public static final String CONTAINERS_COMPLETED = "containers.completed"; + + /** + * {@value} + */ + public static final String CONTAINERS_FAILED = "containers.failed"; + + /** + * {@value} + */ + public static final String CONTAINERS_START_FAILED = "containers.start-failed"; + } http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/1938323c/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/AsyncRMOperationHandler.java ---------------------------------------------------------------------- diff --git 
a/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/AsyncRMOperationHandler.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/AsyncRMOperationHandler.java index 7c98551..11afc0e 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/AsyncRMOperationHandler.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/AsyncRMOperationHandler.java @@ -50,8 +50,10 @@ public class AsyncRMOperationHandler extends RMOperationHandler { // need to revoke a previously issued container request // so enum the sets and pick some int remaining = cancelSinglePriorityRequests(priority1, count); - remaining = cancelSinglePriorityRequests(priority2, remaining); - + if (priority2 != null) { + remaining = cancelSinglePriorityRequests(priority2, remaining); + } + return remaining; } http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/1938323c/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/CancelRequestOperation.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/CancelRequestOperation.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/CancelRequestOperation.java index 9e9f277..754bf28 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/CancelRequestOperation.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/CancelRequestOperation.java @@ -18,11 +18,12 @@ package org.apache.slider.server.appmaster.operations; +import com.google.common.base.Preconditions; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.slider.server.appmaster.state.ContainerPriority; /** - * Cancel a container request + * Cancel a container request at the given priority/proirities. 
*/ public class CancelRequestOperation extends AbstractRMOperation { @@ -30,7 +31,15 @@ public class CancelRequestOperation extends AbstractRMOperation { private final Priority priority2; private final int count; + /** + * Create an instance + * @param priority1 first priority, the one that is released first + * @param priority2 optional second priority + * @param count number of requests to cancel + */ public CancelRequestOperation(Priority priority1, Priority priority2, int count) { + Preconditions.checkArgument(priority1 != null, "null priority"); + Preconditions.checkArgument(count >= 0, "negative count"); this.priority1 = priority1; this.priority2 = priority2; this.count = count; @@ -45,7 +54,8 @@ public class CancelRequestOperation extends AbstractRMOperation { public String toString() { return "release " + count + " requests for " + ContainerPriority.toString(priority1) - + " and " + ContainerPriority.toString(priority2); + + (priority2 != null ? + (" and " + ContainerPriority.toString(priority2)) : ""); } /** http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/1938323c/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java index 20e2fc0..34b0492 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java @@ -170,7 +170,7 @@ public class AppState { * Client properties created via the provider -static for the life * of the application */ - private Map<String, String> clientProperties = new HashMap<String, String>(); + private Map<String, String> clientProperties = new HashMap<>(); /** * This is a template of the cluster status @@ -196,7 +196,7 @@ public class 
AppState { * been allocated but are not live; it is a superset of the live list */ private final ConcurrentMap<ContainerId, RoleInstance> ownedContainers = - new ConcurrentHashMap<ContainerId, RoleInstance>(); + new ConcurrentHashMap<>(); /** * Hash map of the containers we have released, but we @@ -204,33 +204,33 @@ public class AppState { * containers is treated as a successful outcome */ private final ConcurrentMap<ContainerId, Container> containersBeingReleased = - new ConcurrentHashMap<ContainerId, Container>(); + new ConcurrentHashMap<>(); /** * Counter for completed containers ( complete denotes successful or failed ) */ - private final AtomicInteger completedContainerCount = new AtomicInteger(); + private final Counter completedContainerCount = new Counter(); /** * Count of failed containers */ - private final AtomicInteger failedContainerCount = new AtomicInteger(); + private final Counter failedContainerCount = new Counter(); /** * # of started containers */ - private final AtomicInteger startedContainers = new AtomicInteger(); + private final Counter startedContainers = new Counter(); /** * # of containers that failed to start */ - private final AtomicInteger startFailedContainers = new AtomicInteger(); + private final Counter startFailedContainerCount = new Counter(); /** * Track the number of surplus containers received and discarded */ - private final AtomicInteger surplusContainers = new AtomicInteger(); + private final Counter surplusContainers = new Counter(); /** @@ -244,21 +244,21 @@ public class AppState { * the node is promoted from here to the containerMap */ private final Map<ContainerId, RoleInstance> startingNodes = - new ConcurrentHashMap<ContainerId, RoleInstance>(); + new ConcurrentHashMap<>(); /** * List of completed nodes. This isn't kept in the CD as it gets too * big for the RPC responses. 
Indeed, we should think about how deep to get this */ private final Map<ContainerId, RoleInstance> completedNodes - = new ConcurrentHashMap<ContainerId, RoleInstance>(); + = new ConcurrentHashMap<>(); /** * Nodes that failed to start. * Again, kept out of the CD */ private final Map<ContainerId, RoleInstance> failedNodes = - new ConcurrentHashMap<ContainerId, RoleInstance>(); + new ConcurrentHashMap<>(); /** * Nodes that came assigned to a role above that @@ -267,11 +267,11 @@ public class AppState { private final Set<ContainerId> surplusNodes = new HashSet<ContainerId>(); /** - * Map of containerID -> cluster nodes, for status reports. + * Map of containerID to cluster nodes, for status reports. * Access to this should be synchronized on the clusterDescription */ private final Map<ContainerId, RoleInstance> liveNodes = - new ConcurrentHashMap<ContainerId, RoleInstance>(); + new ConcurrentHashMap<>(); private final AtomicInteger completionOfNodeNotInLiveListEvent = new AtomicInteger(); private final AtomicInteger completionOfUnknownContainerEvent = @@ -311,54 +311,53 @@ public class AppState { MetricsAndMonitoring metricsAndMonitoring) { this.recordFactory = recordFactory; this.metricsAndMonitoring = metricsAndMonitoring; - + // register any metrics - MetricRegistry metrics = metricsAndMonitoring.getMetrics(); - metrics.register( + register(MetricsConstants.CONTAINERS_OUTSTANDING_REQUESTS, outstandingContainerRequests); + register(MetricsConstants.CONTAINERS_SURPLUS, surplusContainers); + register(MetricsConstants.CONTAINERS_STARTED, startedContainers); + register(MetricsConstants.CONTAINERS_COMPLETED, completedContainerCount); + register(MetricsConstants.CONTAINERS_FAILED, failedContainerCount); + register(MetricsConstants.CONTAINERS_START_FAILED, startFailedContainerCount); + } + + private void register(String name, Counter counter) { + this.metricsAndMonitoring.getMetrics().register( MetricRegistry.name(AppState.class, - 
MetricsConstants.CONTAINERS_OUTSTANDING_REQUESTS), - outstandingContainerRequests); + name), counter); } - public int getFailedCountainerCount() { - return failedContainerCount.get(); + public long getFailedCountainerCount() { + return failedContainerCount.getCount(); } /** - * Increment the count and return the new value - * @return the latest failed container count + * Increment the count */ - public int incFailedCountainerCount() { - return failedContainerCount.incrementAndGet(); + public void incFailedCountainerCount() { + failedContainerCount.inc(); } - public int getStartFailedCountainerCount() { - return startFailedContainers.get(); + public long getStartFailedCountainerCount() { + return startFailedContainerCount.getCount(); } /** * Increment the count and return the new value - * @return the latest failed container count */ - public int incStartedCountainerCount() { - return startedContainers.incrementAndGet(); + public void incStartedCountainerCount() { + startedContainers.inc(); } - public int getStartedCountainerCount() { - return startedContainers.get(); + public long getStartedCountainerCount() { + return startedContainers.getCount(); } /** * Increment the count and return the new value - * @return the latest failed container count */ - public int incStartFailedCountainerCount() { - return startFailedContainers.incrementAndGet(); - } - - - public AtomicInteger getStartFailedContainers() { - return startFailedContainers; + public void incStartFailedCountainerCount() { + startFailedContainerCount.inc(); } public AtomicInteger getCompletionOfNodeNotInLiveListEvent() { @@ -535,7 +534,7 @@ public class AppState { this.applicationInfo = applicationInfo != null ? applicationInfo : new HashMap<String, String>(); - clientProperties = new HashMap<String, String>(); + clientProperties = new HashMap<>(); containerReleaseSelector = releaseSelector; @@ -1744,19 +1743,19 @@ public class AppState { * keylist. 
*/ protected Map<String, Integer> getLiveStatistics() { - Map<String, Integer> sliderstats = new HashMap<String, Integer>(); + Map<String, Integer> sliderstats = new HashMap<>(); + sliderstats.put(StatusKeys.STATISTICS_CONTAINERS_LIVE, + liveNodes.size()); sliderstats.put(StatusKeys.STATISTICS_CONTAINERS_COMPLETED, - completedContainerCount.get()); + (int)completedContainerCount.getCount()); sliderstats.put(StatusKeys.STATISTICS_CONTAINERS_FAILED, - failedContainerCount.get()); - sliderstats.put(StatusKeys.STATISTICS_CONTAINERS_LIVE, - liveNodes.size()); + (int)failedContainerCount.getCount()); sliderstats.put(StatusKeys.STATISTICS_CONTAINERS_STARTED, - startedContainers.get()); + (int)startedContainers.getCount()); sliderstats.put(StatusKeys.STATISTICS_CONTAINERS_START_FAILED, - startFailedContainers.get()); + (int) startFailedContainerCount.getCount()); sliderstats.put(StatusKeys.STATISTICS_CONTAINERS_SURPLUS, - surplusContainers.get()); + (int)surplusContainers.getCount()); sliderstats.put(StatusKeys.STATISTICS_CONTAINERS_UNKNOWN_COMPLETED, completionOfUnknownContainerEvent.get()); return sliderstats; @@ -1893,7 +1892,6 @@ public class AppState { throws SliderInternalStateException, TriggerClusterTeardownException { List<AbstractRMOperation> operations = new ArrayList<>(); int delta; - String details; int expected; String name = role.getName(); synchronized (role) { @@ -1903,7 +1901,7 @@ public class AppState { log.info("Reviewing {} : expected {}", role, expected); checkFailureThreshold(role); - + if (expected < 0 ) { // negative value: fail throw new TriggerClusterTeardownException( @@ -1912,7 +1910,7 @@ public class AppState { "Negative component count of %d desired for component %s", expected, role); } - + if (delta > 0) { log.info("{}: Asking for {} more nodes(s) for a total of {} ", name, delta, expected); @@ -2004,7 +2002,7 @@ public class AppState { for (RoleInstance possible : finalCandidates) { log.debug("Targeting for release: {}", possible); 
containerReleaseSubmitted(possible.container); - operations.add(new ContainerReleaseOperation(possible.getId())); + operations.add(new ContainerReleaseOperation(possible.getId())); } } @@ -2033,7 +2031,6 @@ public class AppState { return operations; } - /** * Find a container running on a specific host -looking * into the node ID to determine this. @@ -2055,7 +2052,7 @@ public class AppState { } return null; } - + /** * Release all containers. * @return a list of operations to execute @@ -2105,40 +2102,42 @@ public class AppState { String containerHostInfo = container.getNodeId().getHost() + ":" + container.getNodeId().getPort(); - int allocated; - int desired; //get the role - ContainerId cid = container.getId(); - RoleStatus role = lookupRoleStatus(container); + final ContainerId cid = container.getId(); + final RoleStatus role = lookupRoleStatus(container); //dec requested count decrementRequestCount(role); + + // cancel an allocation request which granted this, so as to avoid repeated + // requests + releaseOperations.add(new CancelRequestOperation(container.getPriority(), null, 1)); + //inc allocated count -this may need to be dropped in a moment, // but us needed to update the logic below - allocated = role.incActual(); + final int allocated = role.incActual(); + final int desired = role.getDesired(); - //look for (race condition) where we get more back than we asked - desired = role.getDesired(); - - ContainerAllocationOutcome outcome = roleHistory.onContainerAllocated(container, - desired, - allocated); + final String roleName = role.getName(); + final ContainerAllocationOutcome outcome = + roleHistory.onContainerAllocated(container, desired, allocated); + //look for condition where we get more back than we asked if (allocated > desired) { - log.info("Discarding surplus container {} on {}", cid, - containerHostInfo); + log.info("Discarding surplus {} container {} on {}", roleName, cid, + containerHostInfo); releaseOperations.add(new 
ContainerReleaseOperation(cid)); //register as a surplus node surplusNodes.add(cid); - surplusContainers.incrementAndGet(); + surplusContainers.inc(); //and, as we aren't binding it to role, dec that role's actual count role.decActual(); } else { - // this is valid, so decrement the number of outstanding requests + // Allocation being accepted -so decrement the number of outstanding requests decOutstandingContainerRequests(); - String roleName = role.getName(); + log.info("Assigning role {} to container" + " {}," + " on {}:{},", @@ -2174,8 +2173,8 @@ public class AppState { * @return true if a rebuild took place (even if size 0) * @throws RuntimeException on problems */ - private boolean rebuildModelFromRestart(List<Container> liveContainers) throws - BadClusterStateException { + private boolean rebuildModelFromRestart(List<Container> liveContainers) + throws BadClusterStateException { if (liveContainers == null) { return false; } @@ -2193,8 +2192,8 @@ public class AppState { * @param container container that was running before the AM restarted * @throws RuntimeException on problems */ - private void addRestartedContainer(Container container) throws - BadClusterStateException { + private void addRestartedContainer(Container container) + throws BadClusterStateException { String containerHostInfo = container.getNodeId().getHost() + ":" + container.getNodeId().getPort(); http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/1938323c/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHistory.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHistory.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHistory.java index 99108fe..64f9184 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHistory.java +++ 
b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHistory.java @@ -625,7 +625,7 @@ public class RoleHistory { List<Container> unrequested = new ArrayList<>(allocatedContainers.size()); outstandingRequests.partitionRequests(this, allocatedContainers, requested, unrequested); - + //give the unrequested ones lower priority requested.addAll(unrequested); return requested; http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/1938323c/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/appstate/TestMockAppStateRebuildOnAMRestart.groovy ---------------------------------------------------------------------- diff --git a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/appstate/TestMockAppStateRebuildOnAMRestart.groovy b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/appstate/TestMockAppStateRebuildOnAMRestart.groovy index e0fdf1b..c310583 100644 --- a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/appstate/TestMockAppStateRebuildOnAMRestart.groovy +++ b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/appstate/TestMockAppStateRebuildOnAMRestart.groovy @@ -85,9 +85,9 @@ class TestMockAppStateRebuildOnAMRestart extends BaseMockAppStateTest null, new MostRecentContainerReleaseSelector()) - assert appState.getStartedCountainerCount() == clusterSize + assert appState.startedCountainerCount == clusterSize - appState.getRoleHistory().dump(); + appState.roleHistory.dump(); //check that the app state direct structures match List<RoleInstance> r0live = appState.enumLiveNodesInRole(ROLE0)
