SLIDER-966: when flexing down, pending anti-affine (AA) requests are decremented before trying to cancel any outstanding requests
Project: http://git-wip-us.apache.org/repos/asf/incubator-slider/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-slider/commit/e0fb5291 Tree: http://git-wip-us.apache.org/repos/asf/incubator-slider/tree/e0fb5291 Diff: http://git-wip-us.apache.org/repos/asf/incubator-slider/diff/e0fb5291 Branch: refs/heads/feature/SLIDER-82-pass-3.1 Commit: e0fb529161ae2fa70dc719af4ccc35ee0e5f9c1f Parents: ee0c8da Author: Steve Loughran <[email protected]> Authored: Fri Nov 6 23:27:04 2015 +0000 Committer: Steve Loughran <[email protected]> Committed: Fri Nov 6 23:27:04 2015 +0000 ---------------------------------------------------------------------- .../operations/AbstractRMOperation.java | 2 +- .../slider/server/appmaster/state/AppState.java | 45 ++++++++++++++++-- .../appstate/TestMockAppStateAAPlacement.groovy | 50 +++++++++++++++++++- 3 files changed, 91 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/e0fb5291/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/AbstractRMOperation.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/AbstractRMOperation.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/AbstractRMOperation.java index b5b27c5..ed3f197 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/AbstractRMOperation.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/AbstractRMOperation.java @@ -26,5 +26,5 @@ public abstract class AbstractRMOperation { * @param handler handler to perform the execution */ public abstract void execute(RMOperationHandlerActions handler); - + } http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/e0fb5291/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java 
---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java index 53ab2fe..21f59a1 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java @@ -84,9 +84,33 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicInteger; -import static org.apache.slider.api.ResourceKeys.*; -import static org.apache.slider.api.RoleKeys.*; -import static org.apache.slider.api.StateValues.*; +import static org.apache.slider.api.ResourceKeys.COMPONENT_INSTANCES; +import static org.apache.slider.api.ResourceKeys.COMPONENT_PLACEMENT_POLICY; +import static org.apache.slider.api.ResourceKeys.COMPONENT_PRIORITY; +import static org.apache.slider.api.ResourceKeys.CONTAINER_FAILURE_THRESHOLD; +import static org.apache.slider.api.ResourceKeys.DEFAULT_CONTAINER_FAILURE_THRESHOLD; +import static org.apache.slider.api.ResourceKeys.DEFAULT_NODE_FAILURE_THRESHOLD; +import static org.apache.slider.api.ResourceKeys.DEFAULT_PLACEMENT_ESCALATE_DELAY_SECONDS; +import static org.apache.slider.api.ResourceKeys.DEF_YARN_CORES; +import static org.apache.slider.api.ResourceKeys.DEF_YARN_LABEL_EXPRESSION; +import static org.apache.slider.api.ResourceKeys.DEF_YARN_MEMORY; +import static org.apache.slider.api.ResourceKeys.NODE_FAILURE_THRESHOLD; +import static org.apache.slider.api.ResourceKeys.PLACEMENT_ESCALATE_DELAY; +import static org.apache.slider.api.ResourceKeys.YARN_CORES; +import static org.apache.slider.api.ResourceKeys.YARN_LABEL_EXPRESSION; +import static org.apache.slider.api.ResourceKeys.YARN_MEMORY; +import static org.apache.slider.api.ResourceKeys.YARN_RESOURCE_MAX; +import static 
org.apache.slider.api.RoleKeys.ROLE_FAILED_INSTANCES; +import static org.apache.slider.api.RoleKeys.ROLE_FAILED_RECENTLY_INSTANCES; +import static org.apache.slider.api.RoleKeys.ROLE_FAILED_STARTING_INSTANCES; +import static org.apache.slider.api.RoleKeys.ROLE_NODE_FAILED_INSTANCES; +import static org.apache.slider.api.RoleKeys.ROLE_PREEMPTED_INSTANCES; +import static org.apache.slider.api.RoleKeys.ROLE_RELEASING_INSTANCES; +import static org.apache.slider.api.RoleKeys.ROLE_REQUESTED_INSTANCES; +import static org.apache.slider.api.StateValues.STATE_CREATED; +import static org.apache.slider.api.StateValues.STATE_DESTROYED; +import static org.apache.slider.api.StateValues.STATE_LIVE; +import static org.apache.slider.api.StateValues.STATE_SUBMITTED; /** @@ -1909,6 +1933,21 @@ public class AppState { // reduce the number expected (i.e. subtract the delta) long excess = -delta; + if (isAA) { + // there may be pending requests which can be cancelled here + long pending = role.getPendingAntiAffineRequests(); + if (excess <= pending) { + long outstanding = pending - excess; + log.info("Cancelling {} pending AA allocations, leaving {}", excess, outstanding); + role.setPendingAntiAffineRequests(outstanding); + excess = 0; + } else { + // not enough + log.info("Cancelling all pending AA allocations"); + role.setPendingAntiAffineRequests(0); + excess -= pending; + } + } // how many requests are outstanding? 
long outstandingRequests = role.getRequested(); if (outstandingRequests > 0) { http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/e0fb5291/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/appstate/TestMockAppStateAAPlacement.groovy ---------------------------------------------------------------------- diff --git a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/appstate/TestMockAppStateAAPlacement.groovy b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/appstate/TestMockAppStateAAPlacement.groovy index 928e355..baf88dc 100644 --- a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/appstate/TestMockAppStateAAPlacement.groovy +++ b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/appstate/TestMockAppStateAAPlacement.groovy @@ -154,9 +154,7 @@ class TestMockAppStateAAPlacement extends BaseMockAppStateTest assert 2 == ops2.size() assert aaRole.pendingAntiAffineRequests == 1 - assert 0 == appState.reviewRequestAndReleaseNodes().size() - // now trigger the next execution cycle List<AbstractRMOperation> ops3 = [] assert 1 == submitOperations(ops2, [], ops3).size() @@ -164,4 +162,52 @@ class TestMockAppStateAAPlacement extends BaseMockAppStateTest assert aaRole.pendingAntiAffineRequests == 0 } + @Test + public void testAllocateFlexDown() throws Throwable { + // want multiple instances, so there will be iterations + aaRole.desired = 2 + List<AbstractRMOperation> ops = appState.reviewRequestAndReleaseNodes() + getSingleRequest(ops) + assert aaRole.pendingAntiAffineRequests == 1 + + // flex down so that the next request should be cancelled + aaRole.desired = 1 + + // expect: no new reqests, pending count -- + List<AbstractRMOperation> ops2 = appState.reviewRequestAndReleaseNodes() + assert ops2.empty + assert aaRole.pendingAntiAffineRequests == 0 + + // next iter + submitOperations(ops, [], ops2).size() + assert 1 == ops2.size() + } + + /** + * Here flex down while 
there is only one outstanding request. + * The outstanding flex should be cancelled + * @throws Throwable + */ + @Test + public void testAllocateFlexDownForcesCancel() throws Throwable { + // want multiple instances, so there will be iterations + aaRole.desired = 1 + List<AbstractRMOperation> ops = appState.reviewRequestAndReleaseNodes() + getSingleRequest(ops) + assert aaRole.pendingAntiAffineRequests == 0 + + // flex down so that the next request should be cancelled + aaRole.desired = 0 + // expect: no new reqests, pending count -- + List<AbstractRMOperation> ops2 = appState.reviewRequestAndReleaseNodes() + assert aaRole.pendingAntiAffineRequests == 0 + assert ops2.size() == 1 + getSingleCancel(ops2) + + // next iter + submitOperations(ops, [], ops2).size() + assert 1 == ops2.size() + } + + }
