SLIDER-967 AA placement with nodemap updates working
Project: http://git-wip-us.apache.org/repos/asf/incubator-slider/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-slider/commit/5a61b4cd Tree: http://git-wip-us.apache.org/repos/asf/incubator-slider/tree/5a61b4cd Diff: http://git-wip-us.apache.org/repos/asf/incubator-slider/diff/5a61b4cd Branch: refs/heads/develop Commit: 5a61b4cd8189ae02eb9eaeb8ffdb25604dcc4376 Parents: 6b13042 Author: Steve Loughran <[email protected]> Authored: Thu Nov 12 18:15:07 2015 +0000 Committer: Steve Loughran <[email protected]> Committed: Thu Nov 12 18:15:07 2015 +0000 ---------------------------------------------------------------------- .../server/appmaster/SliderAppMaster.java | 13 ++- .../slider/server/appmaster/state/AppState.java | 51 ++++++++++-- .../state/OutstandingRequestTracker.java | 8 +- .../server/appmaster/state/RoleHistory.java | 5 -- .../server/appmaster/state/RoleStatus.java | 55 ++++++++----- .../appstate/TestMockAppStateAAPlacement.groovy | 85 +++++++++++++++++--- .../model/history/TestRoleHistoryAA.groovy | 4 - .../model/mock/BaseMockAppStateTest.groovy | 21 ++++- 8 files changed, 183 insertions(+), 59 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5a61b4cd/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java index b54ea6c..eb7b26a 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java @@ -1851,10 +1851,15 @@ public class SliderAppMaster extends AbstractSliderLaunchedService LOG_YARN.info("onNodesUpdated({})", updatedNodes.size()); log.info("Updated nodes 
{}", updatedNodes); // Check if any nodes are lost or revived and update state accordingly - List<AbstractRMOperation> operations = appState.onNodesUpdated(updatedNodes); - execute(operations); - // if there were any operations, trigger a review - reviewRequestAndReleaseNodes("nodes updated"); + + AppState.NodeUpdatedOutcome outcome = appState.onNodesUpdated(updatedNodes); + if (!outcome.operations.isEmpty()) { + execute(outcome.operations); + } + // trigger a review if the cluster changed + if (outcome.clusterChanged) { + reviewRequestAndReleaseNodes("nodes updated"); + } } /** http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5a61b4cd/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java index 0c66e25..6f38eb5 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java @@ -1222,11 +1222,11 @@ public class AppState { * @return the container request to submit or null if there is none */ private AMRMClient.ContainerRequest createContainerRequest(RoleStatus role) { - incrementRequestCount(role); OutstandingRequest request = roleHistory.requestContainerForRole(role); if (request == null) { return null; } + incrementRequestCount(role); if (role.isAntiAffinePlacement()) { role.setOutstandingAArequest(request); } @@ -1428,16 +1428,31 @@ public class AppState { * Handle node update from the RM. 
This syncs up the node map with the RM's view * @param updatedNodes updated nodes */ - public synchronized List<AbstractRMOperation> onNodesUpdated(List<NodeReport> updatedNodes) { + public synchronized NodeUpdatedOutcome onNodesUpdated(List<NodeReport> updatedNodes) { boolean changed = roleHistory.onNodesUpdated(updatedNodes); if (changed) { - log.error("TODO: cancel AA requests and re-review"); - return cancelOutstandingAARequests(); + log.info("YARN cluster changed —cancelling current AA requests"); + List<AbstractRMOperation> operations = cancelOutstandingAARequests(); + log.debug("Created {} cancel requests", operations.size()); + return new NodeUpdatedOutcome(true, operations); } - return new ArrayList<>(0); + return new NodeUpdatedOutcome(false, new ArrayList<AbstractRMOperation>(0)); } /** + * Return value of the {@link #onNodesUpdated(List)} call. + */ + public static class NodeUpdatedOutcome { + public final boolean clusterChanged; + public final List<AbstractRMOperation> operations; + + public NodeUpdatedOutcome(boolean clusterChanged, + List<AbstractRMOperation> operations) { + this.clusterChanged = clusterChanged; + this.operations = operations; + } + } + /** * Is a role short lived by the threshold set for this application * @param instance instance * @return true if the instance is considered short lived @@ -1885,13 +1900,17 @@ public class AppState { } /** - * Escalate operation as triggered by external timer. + * Cancel any outstanding AA Requests, building up the list of ops to + * cancel, removing them from RoleHistory structures and the RoleStatus + * entries. * @return a (usually empty) list of cancel/request operations. 
*/ public synchronized List<AbstractRMOperation> cancelOutstandingAARequests() { + // get the list of cancel operations List<AbstractRMOperation> operations = roleHistory.cancelOutstandingAARequests(); for (RoleStatus roleStatus : roleStatusMap.values()) { - if (roleStatus.isAntiAffinePlacement()) { + if (roleStatus.isAARequestOutstanding()) { + log.info("Cancelling outstanding AA request for {}", roleStatus); roleStatus.cancelOutstandingAARequest(); } } @@ -2225,6 +2244,9 @@ public class AppState { log.info("Asking for next container for AA role {}", roleName); role.decPendingAntiAffineRequests(); addContainerRequest(operations, createContainerRequest(role)); + log.debug("Current AA role status {}", role); + } else { + log.info("AA request sequence completed for role {}", role); } } @@ -2310,4 +2332,19 @@ public class AppState { // now pretend it has just started innerOnNodeManagerContainerStarted(cid); } + + @Override + public String toString() { + final StringBuilder sb = new StringBuilder("AppState{"); + sb.append("applicationLive=").append(applicationLive); + sb.append(", live nodes=").append(liveNodes.size()); + sb.append(", startedContainers=").append(startedContainers); + sb.append(", startFailedContainerCount=").append(startFailedContainerCount); + sb.append(", surplusContainers=").append(surplusContainers); + sb.append(", failedContainerCount=").append(failedContainerCount); + sb.append(", outstandingContainerRequests=") + .append(outstandingContainerRequests); + sb.append('}'); + return sb.toString(); + } } http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5a61b4cd/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequestTracker.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequestTracker.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequestTracker.java index 
4209449..66d201f 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequestTracker.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequestTracker.java @@ -390,6 +390,7 @@ public class OutstandingRequestTracker { @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter") public synchronized List<AbstractRMOperation> cancelOutstandingAARequests() { + log.debug("Looking for AA request to cancel"); List<AbstractRMOperation> operations = new ArrayList<>(); // first, all placed requests @@ -404,15 +405,18 @@ public class OutstandingRequestTracker { } } // second, all open requests - for (OutstandingRequest outstandingRequest : openRequests) { + ListIterator<OutstandingRequest> orit = openRequests.listIterator(); + while (orit.hasNext()) { + OutstandingRequest outstandingRequest = orit.next(); synchronized (outstandingRequest) { if (outstandingRequest.isAntiAffine()) { // time to escalate operations.add(outstandingRequest.createCancelOperation()); - openRequests.remove(outstandingRequest); + orit.remove(); } } } + log.info("Cancelling {} outstanding AA requests", operations.size()); return operations; } http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5a61b4cd/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHistory.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHistory.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHistory.java index d7e6050..00b5226 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHistory.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHistory.java @@ -1030,11 +1030,6 @@ public class RoleHistory { List<OutstandingRequest> requests = outstandingRequests.extractOpenRequestsForRole(roleId, toCancel); - if 
(role.isAntiAffinePlacement()) { - // TODO: AA - // AA placement, so clear the role info - role.cancelOutstandingAARequest(); - } // are there any left? int remaining = toCancel - requests.size(); // ask for some placed nodes http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5a61b4cd/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleStatus.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleStatus.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleStatus.java index a14a84b..b530d18 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleStatus.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleStatus.java @@ -171,7 +171,7 @@ public final class RoleStatus implements Cloneable { public void cancel(long count) { requested.decToFloor(count); } - + public void decRequested() { cancel(1); } @@ -334,8 +334,11 @@ public final class RoleStatus implements Cloneable { * if there are no outstanding requests. 
*/ public void cancelOutstandingAARequest() { - setOutstandingAArequest(null); - setPendingAntiAffineRequests(0); + if (outstandingAArequest != null) { + setOutstandingAArequest(null); + setPendingAntiAffineRequests(0); + decRequested(); + } } /** @@ -366,25 +369,33 @@ public final class RoleStatus implements Cloneable { } @Override - public synchronized String toString() { - return "RoleStatus{" + - "name='" + name + '\'' + - ", key=" + key + - ", desired=" + desired + - ", actual=" + actual + - ", requested=" + requested + - ", releasing=" + releasing + - ", pendingAntiAffineRequests=" + pendingAntiAffineRequests + - ", failed=" + failed + - ", failed recently=" + failedRecently.get() + - ", node failed=" + nodeFailed.get() + - ", pre-empted=" + preempted.get() + - ", started=" + started + - ", startFailed=" + startFailed + - ", completed=" + completed + - ", failureMessage='" + failureMessage + '\'' + - ", providerRole=" + providerRole + - '}'; + public String toString() { + final StringBuilder sb = new StringBuilder("RoleStatus{"); + sb.append("name='").append(name).append('\''); + sb.append(", key=").append(key); + sb.append(", desired=").append(desired); + sb.append(", actual=").append(actual); + sb.append(", requested=").append(requested); + sb.append(", releasing=").append(releasing); + sb.append(", failed=").append(failed); + sb.append(", startFailed=").append(startFailed); + sb.append(", started=").append(started); + sb.append(", completed=").append(completed); + sb.append(", totalRequested=").append(totalRequested); + sb.append(", preempted=").append(preempted); + sb.append(", nodeFailed=").append(nodeFailed); + sb.append(", failedRecently=").append(failedRecently); + sb.append(", limitsExceeded=").append(limitsExceeded); + sb.append(", resourceRequirements=").append(resourceRequirements); + sb.append(", isAntiAffinePlacement=").append(isAntiAffinePlacement()); + if (isAntiAffinePlacement()) { + sb.append(", 
pendingAntiAffineRequests=").append(pendingAntiAffineRequests); + sb.append(", outstandingAArequest=").append(outstandingAArequest); + } + sb.append(", failureMessage='").append(failureMessage).append('\''); + sb.append(", providerRole=").append(providerRole); + sb.append('}'); + return sb.toString(); } @Override http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5a61b4cd/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/appstate/TestMockAppStateAAPlacement.groovy ---------------------------------------------------------------------- diff --git a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/appstate/TestMockAppStateAAPlacement.groovy b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/appstate/TestMockAppStateAAPlacement.groovy index c98f3bf..9a325d7 100644 --- a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/appstate/TestMockAppStateAAPlacement.groovy +++ b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/appstate/TestMockAppStateAAPlacement.groovy @@ -21,13 +21,18 @@ package org.apache.slider.server.appmaster.model.appstate import groovy.transform.CompileStatic import groovy.util.logging.Slf4j import org.apache.hadoop.yarn.api.records.Container +import org.apache.hadoop.yarn.api.records.NodeReport +import org.apache.hadoop.yarn.api.records.NodeState import org.apache.hadoop.yarn.client.api.AMRMClient import org.apache.slider.providers.PlacementPolicy import org.apache.slider.providers.ProviderRole import org.apache.slider.server.appmaster.model.mock.BaseMockAppStateTest import org.apache.slider.server.appmaster.model.mock.MockFactory +import org.apache.slider.server.appmaster.model.mock.MockNodeReport import org.apache.slider.server.appmaster.model.mock.MockRoles +import org.apache.slider.server.appmaster.model.mock.MockYarnEngine import org.apache.slider.server.appmaster.operations.AbstractRMOperation +import 
org.apache.slider.server.appmaster.state.AppState import org.apache.slider.server.appmaster.state.AppStateBindingInfo import org.apache.slider.server.appmaster.state.ContainerAssignment import org.apache.slider.server.appmaster.state.NodeMap @@ -55,6 +60,7 @@ class TestMockAppStateAAPlacement extends BaseMockAppStateTest null) RoleStatus aaRole + private int NODES = 3 @Override AppStateBindingInfo buildBindingInfo() { @@ -73,6 +79,11 @@ class TestMockAppStateAAPlacement extends BaseMockAppStateTest aaRole = lookupRole(AAROLE.name) } + @Override + MockYarnEngine createYarnEngine() { + new MockYarnEngine(NODES, 8) + } + /** * Get the single request of a list of operations; includes the check for the size * @param ops operations list of size 1 @@ -87,7 +98,6 @@ class TestMockAppStateAAPlacement extends BaseMockAppStateTest public void testAllocateAANoLabel() throws Throwable { assert cloneNodemap().size() > 0 - // want multiple instances, so there will be iterations aaRole.desired = 2 @@ -111,7 +121,6 @@ class TestMockAppStateAAPlacement extends BaseMockAppStateTest assert !hostInstance.canHost(aaRole.key, "") assert !hostInstance.canHost(aaRole.key, null) - // assignment assert assignments.size() == 1 @@ -205,7 +214,6 @@ class TestMockAppStateAAPlacement extends BaseMockAppStateTest submitOperations(ops, [], ops2).size() assert 1 == ops2.size() assertAllContainersAA() - } /** @@ -241,17 +249,70 @@ class TestMockAppStateAAPlacement extends BaseMockAppStateTest } /** - * Scan through all containers and assert that the assignment is AA - * @param index role index + * + * @throws Throwable */ - void assertAllContainersAA(String index) { - def nodemap = stateAccess.nodeInformationSnapshot - nodemap.each { name, info -> - def nodeEntry = info.entries[index] - assert nodeEntry == null || - (nodeEntry.live -nodeEntry.releasing + nodeEntry.starting) <= 1 , - "too many instances on node $name" + @Test + public void testAskForTooMany() throws Throwable { + + describe("Ask for 1 
more than the no of available nodes;" + + " expect the final request to be unsatisfied until the cluster changes size") + //more than expected + aaRole.desired = NODES + 1 + List<AbstractRMOperation > operations = appState.reviewRequestAndReleaseNodes() + assert aaRole.AARequestOutstanding + assert NODES == aaRole.pendingAntiAffineRequests + for (int i = 0; i < NODES; i++) { + def iter = "Iteration $i role = $aaRole" + log.info(iter) + List<AbstractRMOperation > operationsOut = [] + assert 1 == submitOperations(operations, [], operationsOut).size(), iter + operations = operationsOut + if (i + 1 < NODES) { + assert operations.size() == 2 + } else { + assert operations.size() == 1 + } + assertAllContainersAA() } + // expect an outstanding AA request to be unsatisfied + assert aaRole.actual < aaRole.desired + assert !aaRole.requested + assert !aaRole.AARequestOutstanding + List<Container> allocatedContainers = engine.execute(operations, []) + assert 0 == allocatedContainers.size() + // in a review now, no more requests can be generated, as there is no space for AA placements, + // even though there is cluster capacity + assert 0 == appState.reviewRequestAndReleaseNodes().size() + + // now do a node update (this doesn't touch the YARN engine; the node isn't really there) + def outcome = addNewNode() + assert cloneNodemap().size() == NODES + 1 + assert outcome.clusterChanged + // no active calls to empty + assert outcome.operations.empty + assert 1 == appState.reviewRequestAndReleaseNodes().size() + } + + protected AppState.NodeUpdatedOutcome addNewNode() { + NodeReport report = new MockNodeReport("four", NodeState.RUNNING) as NodeReport + appState.onNodesUpdated([report]) } + @Test + public void testClusterSizeChangesDuringRequestSequence() throws Throwable { + describe("Change the cluster size where the cluster size changes during a test sequence.") + aaRole.desired = NODES + 1 + List<AbstractRMOperation> operations = appState.reviewRequestAndReleaseNodes() + assert 
aaRole.AARequestOutstanding + assert NODES == aaRole.pendingAntiAffineRequests + def outcome = addNewNode() + assert outcome.clusterChanged + // one call to cancel + assert 1 == outcome.operations.size() + // and on a review, one more to rebuild + assert 1 == appState.reviewRequestAndReleaseNodes().size() + } + + } http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5a61b4cd/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryAA.groovy ---------------------------------------------------------------------- diff --git a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryAA.groovy b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryAA.groovy index 9d0efa2..de85bba 100644 --- a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryAA.groovy +++ b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryAA.groovy @@ -44,9 +44,6 @@ class TestRoleHistoryAA extends SliderTestBase { NodeMap nodeMap, gpuNodeMap RoleHistory roleHistory = new MockRoleHistory(MockFactory.ROLES) - AMRMClient.ContainerRequest requestContainer(RoleStatus roleStatus) { - roleHistory.requestContainerForRole(roleStatus).issuedRequest - } @Override void setup() { @@ -159,7 +156,6 @@ class TestRoleHistoryAA extends SliderTestBase { assert node1.canHost(2,"") } - public List<NodeInstance> assertNoAvailableNodes(int role = 1, String label = "") { return verifyResultSize(0, nodeMap.findAllNodesForRole(role, label)) } http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5a61b4cd/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/BaseMockAppStateTest.groovy ---------------------------------------------------------------------- diff --git a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/BaseMockAppStateTest.groovy 
b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/BaseMockAppStateTest.groovy index 3d472f1..4cb441d 100644 --- a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/BaseMockAppStateTest.groovy +++ b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/BaseMockAppStateTest.groovy @@ -279,7 +279,7 @@ abstract class BaseMockAppStateTest extends SliderTestBase implements MockRoles * @return a list of roles */ public List<RoleInstance> createAndSubmitNodes() { - return createAndSubmitNodes([]) + return createAndSubmitNodes([], []) } /** @@ -288,9 +288,10 @@ abstract class BaseMockAppStateTest extends SliderTestBase implements MockRoles * @return a list of roles allocated */ public List<RoleInstance> createAndSubmitNodes( - List<ContainerId> containerIds) { + List<ContainerId> containerIds, + List<AbstractRMOperation> operationsOut = []) { List<AbstractRMOperation> ops = appState.reviewRequestAndReleaseNodes() - return submitOperations(ops, containerIds) + return submitOperations(ops, containerIds, operationsOut) } /** @@ -398,4 +399,18 @@ abstract class BaseMockAppStateTest extends SliderTestBase implements MockRoles assert 1 == ops.size() getRequest(ops, 0) } + + /** + * Scan through all containers and assert that the assignment is AA + * @param index role index + */ + void assertAllContainersAA(String index) { + def nodemap = stateAccess.nodeInformationSnapshot + nodemap.each { name, info -> + def nodeEntry = info.entries[index] + assert nodeEntry == null || + (nodeEntry.live - nodeEntry.releasing + nodeEntry.starting) <= 1, + "too many instances on node $name" + } + } }
