SLIDER-966 initial sequence and sequence on flex up holding
Project: http://git-wip-us.apache.org/repos/asf/incubator-slider/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-slider/commit/b54eb4a3 Tree: http://git-wip-us.apache.org/repos/asf/incubator-slider/tree/b54eb4a3 Diff: http://git-wip-us.apache.org/repos/asf/incubator-slider/diff/b54eb4a3 Branch: refs/heads/develop Commit: b54eb4a399d3daf88b11c5aea96d74ee10f852e9 Parents: aa46b47 Author: Steve Loughran <[email protected]> Authored: Fri Nov 6 23:15:33 2015 +0000 Committer: Steve Loughran <[email protected]> Committed: Fri Nov 6 23:15:33 2015 +0000 ---------------------------------------------------------------------- .../slider/server/appmaster/state/AppState.java | 28 +++++++--- .../server/appmaster/state/RoleStatus.java | 22 ++++++-- .../appstate/TestMockAppStateAAPlacement.groovy | 56 +++++++++++++++----- .../model/mock/BaseMockAppStateTest.groovy | 4 +- .../appmaster/model/mock/MockYarnEngine.groovy | 7 ++- 5 files changed, 89 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/b54eb4a3/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java index 1e23bef..53ab2fe 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java @@ -18,7 +18,6 @@ package org.apache.slider.server.appmaster.state; -import com.codahale.metrics.Counter; import com.codahale.metrics.Metric; import com.codahale.metrics.MetricRegistry; import com.google.common.annotations.VisibleForTesting; @@ -1187,7 +1186,6 @@ public class AppState { 
roleHistory.onContainerReleaseSubmitted(container); } - /** * Create a container request. * Update internal state, such as the role request count. @@ -1199,7 +1197,9 @@ public class AppState { private AMRMClient.ContainerRequest createContainerRequest(RoleStatus role) { incrementRequestCount(role); OutstandingRequest request = roleHistory.requestContainerForRole(role); - role.setOutstandingAArequest(request); + if (role.isAntiAffinePlacement()) { + role.setOutstandingAArequest(request); + } return request.getIssuedRequest(); } @@ -1772,8 +1772,10 @@ public class AppState { throws TriggerClusterTeardownException { long failures = role.getFailedRecently(); int threshold = getFailureThresholdForRole(role); - log.debug("Failure count of component: {}: {}, threshold={}", - role.getName(), failures, threshold); + if (log.isDebugEnabled() && failures > 0) { + log.debug("Failure count of component: {}: {}, threshold={}", + role.getName(), failures, threshold); + } if (failures > threshold) { throw new TriggerClusterTeardownException( @@ -1885,8 +1887,9 @@ public class AppState { if (isAA) { // build one only if there is none outstanding if (role.getPendingAntiAffineRequests() == 0) { - log.info("Starting an anti-affine request sequence"); - role.incPendingAntiAffineRequests(delta); + log.info("Starting an anti-affine request sequence for {} nodes", delta); + // log the number outstanding + role.incPendingAntiAffineRequests(delta - 1); addContainerRequest(operations, createContainerRequest(role)); } else { log.info("Adding {} more anti-affine requests", delta); @@ -2110,6 +2113,17 @@ public class AppState { // add all requests to the operations list operations.addAll(allocation.operations); + // now for AA requests, add some more + if (role.isAntiAffinePlacement()) { + role.completeOutstandingAARequest(); + if (role.getPendingAntiAffineRequests() > 0) { + // still an outstanding AA request: need to issue a new one. 
+ log.info("Asking for next container for AA role {}", roleName); + role.decPendingAntiAffineRequests(); + addContainerRequest(operations, createContainerRequest(role)); + } + } + //look for condition where we get more back than we asked if (allocated > desired) { log.info("Discarding surplus {} container {} on {}", roleName, cid, http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/b54eb4a3/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleStatus.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleStatus.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleStatus.java index cba963c..1beaddc 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleStatus.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleStatus.java @@ -28,7 +28,6 @@ import org.apache.slider.server.appmaster.management.LongGauge; import java.io.Serializable; import java.util.Comparator; import java.util.Map; -import java.util.concurrent.atomic.AtomicBoolean; /** * Models the ongoing status of all nodes in an application. @@ -298,6 +297,10 @@ public final class RoleStatus implements Cloneable { this.pendingAntiAffineRequests.set(pendingAntiAffineRequests); } + public long decPendingAntiAffineRequests() { + return pendingAntiAffineRequests.decToFloor(1); + } + public OutstandingRequest getOutstandingAArequest() { return outstandingAArequest; } @@ -307,6 +310,15 @@ public final class RoleStatus implements Cloneable { } /** + * Complete the outstanding AA request (there's no check for one in progress, caller + * expected to have done that). + * @return the number of outstanding requests + */ + public void completeOutstandingAARequest() { + setOutstandingAArequest(null); + } + + /** * Get the number of roles we are short of. * nodes released are ignored. 
* @return the positive or negative number of roles to add/release. @@ -326,11 +338,11 @@ public final class RoleStatus implements Cloneable { } /** - * Get count of actual and requested containers - * @return the size of the application when outstanding requests are included + * Get count of actual and requested containers. This includes pending ones + * @return the size of the application when outstanding requests are included. */ public long getActualAndRequested() { - return actual.get() + requested.get(); + return actual.get() + requested.get() + pendingAntiAffineRequests.get(); } @Override @@ -342,7 +354,7 @@ public final class RoleStatus implements Cloneable { ", actual=" + actual + ", requested=" + requested + ", releasing=" + releasing + - ", pendingAntiAffineRequestCount=" + pendingAntiAffineRequests + + ", pendingAntiAffineRequests=" + pendingAntiAffineRequests + ", failed=" + failed + ", failed recently=" + failedRecently.get() + ", node failed=" + nodeFailed.get() + http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/b54eb4a3/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/appstate/TestMockAppStateAAPlacement.groovy ---------------------------------------------------------------------- diff --git a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/appstate/TestMockAppStateAAPlacement.groovy b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/appstate/TestMockAppStateAAPlacement.groovy index 42772c5..928e355 100644 --- a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/appstate/TestMockAppStateAAPlacement.groovy +++ b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/appstate/TestMockAppStateAAPlacement.groovy @@ -22,19 +22,16 @@ import groovy.transform.CompileStatic import groovy.util.logging.Slf4j import org.apache.hadoop.yarn.api.records.Container import org.apache.hadoop.yarn.client.api.AMRMClient -import org.apache.slider.api.ResourceKeys 
import org.apache.slider.providers.PlacementPolicy import org.apache.slider.providers.ProviderRole import org.apache.slider.server.appmaster.model.mock.BaseMockAppStateTest import org.apache.slider.server.appmaster.model.mock.MockFactory import org.apache.slider.server.appmaster.model.mock.MockRoles import org.apache.slider.server.appmaster.operations.AbstractRMOperation -import org.apache.slider.server.appmaster.operations.CancelSingleRequest -import org.apache.slider.server.appmaster.operations.ContainerRequestOperation import org.apache.slider.server.appmaster.state.AppStateBindingInfo import org.apache.slider.server.appmaster.state.ContainerAssignment -import org.apache.slider.server.appmaster.state.NodeMap import org.apache.slider.server.appmaster.state.RoleInstance +import org.apache.slider.server.appmaster.state.RoleStatus import org.junit.Test /** @@ -56,6 +53,8 @@ class TestMockAppStateAAPlacement extends BaseMockAppStateTest 2, null) + RoleStatus aaRole + @Override AppStateBindingInfo buildBindingInfo() { def bindingInfo = super.buildBindingInfo() @@ -67,7 +66,11 @@ class TestMockAppStateAAPlacement extends BaseMockAppStateTest bindingInfo } - private static final int roleId = AAROLE.id + @Override + void setup() { + super.setup() + aaRole = lookupRole(AAROLE.name) + } /** * Get the single request of a list of operations; includes the check for the size @@ -80,18 +83,12 @@ class TestMockAppStateAAPlacement extends BaseMockAppStateTest } @Test - public void testVerifyNodeMap() throws Throwable { - + public void testAllocateAANoLabel() throws Throwable { def nodemap = appState.roleHistory.cloneNodemap() assert nodemap.size() > 0 - } - @Test - public void testAllocateAANoLabel() throws Throwable { - def aaRole = lookupRole(AAROLE.name) - - // want two instances, so there will be two iterations + // want multiple instances, so there will be iterations aaRole.desired = 2 List<AbstractRMOperation> ops = appState.reviewRequestAndReleaseNodes() @@ -118,6 +115,9 
@@ class TestMockAppStateAAPlacement extends BaseMockAppStateTest // we also expect a new allocation request to have been issued def req2 = getRequest(operations, 1) + + // verify the pending counter is down + assert 0L == aaRole.pendingAntiAffineRequests Container allocated2 = engine.allocateContainer(req2) // placement must be on a different host @@ -131,7 +131,37 @@ class TestMockAppStateAAPlacement extends BaseMockAppStateTest assert appState.onNodeManagerContainerStarted(container.id) ops = appState.reviewRequestAndReleaseNodes() assert ops.size() == 0 + } + + @Test + public void testAllocateFlexUp() throws Throwable { + // want multiple instances, so there will be iterations + aaRole.desired = 2 + List<AbstractRMOperation> ops = appState.reviewRequestAndReleaseNodes() + getSingleRequest(ops) + assert aaRole.pendingAntiAffineRequests == 1 + + // now trigger that flex up + aaRole.desired = 3 + + // expect: no new requests, pending count ++ + List<AbstractRMOperation> ops2 = appState.reviewRequestAndReleaseNodes() + assert ops2.empty + assert aaRole.pendingAntiAffineRequests == 2 + + // next iter + assert 1 == submitOperations(ops, [], ops2).size() + assert 2 == ops2.size() + assert aaRole.pendingAntiAffineRequests == 1 + + + assert 0 == appState.reviewRequestAndReleaseNodes().size() + // now trigger the next execution cycle + List<AbstractRMOperation> ops3 = [] + assert 1 == submitOperations(ops2, [], ops3).size() + assert 2 == ops3.size() + assert aaRole.pendingAntiAffineRequests == 0 } } http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/b54eb4a3/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/BaseMockAppStateTest.groovy ---------------------------------------------------------------------- diff --git a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/BaseMockAppStateTest.groovy b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/BaseMockAppStateTest.groovy index 
cefba42..14e556a 100644 --- a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/BaseMockAppStateTest.groovy +++ b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/BaseMockAppStateTest.groovy @@ -286,9 +286,9 @@ abstract class BaseMockAppStateTest extends SliderTestBase implements MockRoles * @return a list of roles allocated */ public List<RoleInstance> createAndSubmitNodes( - List<ContainerId> released) { + List<ContainerId> containerIds) { List<AbstractRMOperation> ops = appState.reviewRequestAndReleaseNodes() - return submitOperations(ops, released) + return submitOperations(ops, containerIds) } /** http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/b54eb4a3/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/MockYarnEngine.groovy ---------------------------------------------------------------------- diff --git a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/MockYarnEngine.groovy b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/MockYarnEngine.groovy index 965219d..7ab97fa 100644 --- a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/MockYarnEngine.groovy +++ b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/MockYarnEngine.groovy @@ -26,6 +26,7 @@ import org.apache.hadoop.yarn.api.records.Container import org.apache.hadoop.yarn.api.records.ContainerId import org.apache.hadoop.yarn.client.api.AMRMClient import org.apache.slider.server.appmaster.operations.AbstractRMOperation +import org.apache.slider.server.appmaster.operations.CancelSingleRequest import org.apache.slider.server.appmaster.operations.ContainerReleaseOperation import org.apache.slider.server.appmaster.operations.ContainerRequestOperation import org.junit.Assert @@ -113,7 +114,9 @@ class MockYarnEngine { ContainerId cid = cro.containerId assert releaseContainer(cid); released.add(cid) - } else { + } else if (op 
instanceof CancelSingleRequest) { + // no-op + } else if (op instanceof ContainerRequestOperation) { ContainerRequestOperation req = (ContainerRequestOperation) op Container container = allocateContainer(req.request) if (container != null) { @@ -123,6 +126,8 @@ class MockYarnEngine { log.debug("Unsatisfied allocation $req") pending.add(req) } + } else { + log.warn("Unsupported operation $op") } } return allocation
