SLIDER-799 SLIDER-817 request tracker builds cancel operation from the resource used in the request...tests updated to handle the changes
Project: http://git-wip-us.apache.org/repos/asf/incubator-slider/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-slider/commit/b952b640 Tree: http://git-wip-us.apache.org/repos/asf/incubator-slider/tree/b952b640 Diff: http://git-wip-us.apache.org/repos/asf/incubator-slider/diff/b952b640 Branch: refs/heads/feature/SLIDER-799-AM-managed-relax Commit: b952b6401a22c65053c85a6a1238f1928d3eb243 Parents: 43c61fb Author: Steve Loughran <[email protected]> Authored: Tue Mar 24 19:33:36 2015 +0000 Committer: Steve Loughran <[email protected]> Committed: Tue Mar 24 19:33:36 2015 +0000 ---------------------------------------------------------------------- .../operations/CancelRequestOperation.java | 68 --------------- .../slider/server/appmaster/state/AppState.java | 48 +++++++---- .../state/ContainerAllocationOutcome.java | 15 ++++ .../appmaster/state/OutstandingRequest.java | 12 ++- .../state/OutstandingRequestTracker.java | 87 ++++++++++++++++---- .../server/appmaster/state/RoleHistory.java | 30 ++++++- .../appstate/TestMockAppStateFlexing.groovy | 20 ++++- .../TestMockAppStateRMOperations.groovy | 62 ++++++++------ .../TestMockAppStateRolePlacement.groovy | 14 +++- ...tRoleHistoryOutstandingRequestTracker.groovy | 12 +-- .../TestRoleHistoryRequestTracking.groovy | 8 +- .../appmaster/model/mock/Allocator.groovy | 1 + .../appmaster/model/mock/MockYarnEngine.groovy | 2 +- 13 files changed, 236 insertions(+), 143 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/b952b640/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/CancelRequestOperation.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/CancelRequestOperation.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/CancelRequestOperation.java deleted file mode 100644 index 754bf28..0000000 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/operations/CancelRequestOperation.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.slider.server.appmaster.operations; - -import com.google.common.base.Preconditions; -import org.apache.hadoop.yarn.api.records.Priority; -import org.apache.slider.server.appmaster.state.ContainerPriority; - -/** - * Cancel a container request at the given priority/proirities. - */ -public class CancelRequestOperation extends AbstractRMOperation { - - private final Priority priority1; - private final Priority priority2; - private final int count; - - /** - * Create an instance - * @param priority1 first priority, the one that is released first - * @param priority2 optional second priority - * @param count number of requests to cancel - */ - public CancelRequestOperation(Priority priority1, Priority priority2, int count) { - Preconditions.checkArgument(priority1 != null, "null priority"); - Preconditions.checkArgument(count >= 0, "negative count"); - this.priority1 = priority1; - this.priority2 = priority2; - this.count = count; - } - - @Override - public void execute(RMOperationHandlerActions handler) { - handler.cancelContainerRequests(priority1, priority2, count); - } - - @Override - public String toString() { - return "release " + count - + " requests for " + ContainerPriority.toString(priority1) - + (priority2 != null ? - (" and " + ContainerPriority.toString(priority2)) : ""); - } - - /** - * Get the number to release - * @return the number of containers to release - */ - public int getCount() { - return count; - } -} http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/b952b640/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java index 20c1792..c68b2a9 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java @@ -31,7 +31,6 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeReport; -import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl; import org.apache.hadoop.yarn.client.api.AMRMClient; @@ -68,7 +67,6 @@ import org.apache.slider.providers.ProviderRole; import org.apache.slider.server.appmaster.management.MetricsAndMonitoring; import org.apache.slider.server.appmaster.management.MetricsConstants; import org.apache.slider.server.appmaster.operations.AbstractRMOperation; -import org.apache.slider.server.appmaster.operations.CancelRequestOperation; import org.apache.slider.server.appmaster.operations.ContainerReleaseOperation; import org.apache.slider.server.appmaster.operations.ContainerRequestOperation; import org.slf4j.Logger; @@ -1941,25 +1939,33 @@ public class AppState { if (outstandingRequests > 0) { // outstanding requests. int toCancel = Math.min(outstandingRequests, excess); - Priority p1 = - ContainerPriority.createPriority(role.getPriority(), true); - Priority p2 = - ContainerPriority.createPriority(role.getPriority(), false); - // TODO Delegate to Role History - operations.add(new CancelRequestOperation(p1, p2, toCancel)); + + // Delegate to Role History + + List<AbstractRMOperation> cancellations = roleHistory.cancelRequestsForRole(role, toCancel); + log.info("Found {} outstanding requests to cancel", cancellations.size()); + operations.addAll(cancellations); + if (toCancel != cancellations.size()) { + log.error("Tracking of outstanding requests is not in sync with the summary statistics:" + + " expected to be able to cancel {} requests, but got {}", + toCancel, cancellations.size()); + } + role.cancel(toCancel); excess -= toCancel; assert excess >= 0 : "Attempted to cancel too many requests"; log.info("Submitted {} cancellations, leaving {} to release", toCancel, excess); if (excess == 0) { - log.info("After cancelling requests, application is at desired size"); + log.info("After cancelling requests, application is now at desired size"); } } // after the cancellation there may be no excess if (excess > 0) { + + // there's an excess, so more to cancel // get the nodes to release int roleId = role.getKey(); @@ -1978,7 +1984,7 @@ public class AppState { } } - // warn if the desired state can't be reaced + // warn if the desired state can't be reached int numberAvailableForRelease = containersToRelease.size(); if (numberAvailableForRelease < excess) { log.warn("Not enough containers to release, have {} and need {} more", @@ -2001,7 +2007,7 @@ public class AppState { // then build up a release operation, logging each container as released for (RoleInstance possible : finalCandidates) { - log.debug("Targeting for release: {}", possible); + log.info("Targeting for release: {}", possible); containerReleaseSubmitted(possible.container); operations.add(new ContainerReleaseOperation(possible.getId())); } @@ -2009,6 +2015,7 @@ public class AppState { } + // list of operations to execute return operations; } @@ -2111,18 +2118,27 @@ public class AppState { //dec requested count decrementRequestCount(role); - // cancel an allocation request which granted this, so as to avoid repeated - // requests - releaseOperations.add(new CancelRequestOperation(container.getPriority(), null, 1)); - //inc allocated count -this may need to be dropped in a moment, // but us needed to update the logic below final int allocated = role.incActual(); final int desired = role.getDesired(); final String roleName = role.getName(); - final ContainerAllocationOutcome outcome = + final ContainerAllocation allocation = roleHistory.onContainerAllocated(container, desired, allocated); + final ContainerAllocationOutcome outcome = allocation.outcome; + + // cancel an allocation request which granted this, so as to avoid repeated + // requests + if (allocation.origin != null && allocation.origin.getIssuedRequest() != null) { + releaseOperations.add(allocation.origin.createCancelOperation()); + } else { + // there's a request, but no idea what to cancel. + // rather than try to recover from it inelegantly, (and cause more confusion), + // log the event, but otherwise continue + log.warn("Unexpected allocation of container " + + SliderUtils.containerToString(container)); + } //look for condition where we get more back than we asked if (allocated > desired) { http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/b952b640/slider-core/src/main/java/org/apache/slider/server/appmaster/state/ContainerAllocationOutcome.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/ContainerAllocationOutcome.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/ContainerAllocationOutcome.java index 6639300..5b3a93c 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/ContainerAllocationOutcome.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/ContainerAllocationOutcome.java @@ -22,8 +22,23 @@ package org.apache.slider.server.appmaster.state; * Outcome of the assignment */ public enum ContainerAllocationOutcome { + /** + * There wasn't a request for this + */ Unallocated, + + /** + * Open placement + */ Open, + + /** + * Allocated explicitly where requested + */ Placed, + + /** + * This was an escalated placement + */ Escalated } http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/b952b640/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequest.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequest.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequest.java index 24946af..12b4b53 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequest.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequest.java @@ -23,6 +23,7 @@ import com.google.common.base.Preconditions; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.client.api.AMRMClient; +import org.apache.slider.server.appmaster.operations.CancelSingleRequest; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -244,7 +245,7 @@ public final class OutstandingRequest { * so as to place it into the relaxed list. */ public synchronized AMRMClient.ContainerRequest escalate() { - Preconditions.checkNotNull(issuedRequest, "cannot escalate if request not issued "+ this); + Preconditions.checkNotNull(issuedRequest, "cannot escalate if request not issued " + this); escalated = true; // this is now the priority @@ -352,4 +353,13 @@ public final class OutstandingRequest { sb.append('}'); return sb.toString(); } + + /** + * Create a cancel operation + * @return an operation that can be used to cancel the request + */ + public CancelSingleRequest createCancelOperation() { + Preconditions.checkState(issuedRequest!=null, "No issued request to cancel"); + return new CancelSingleRequest(issuedRequest); + } } http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/b952b640/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequestTracker.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequestTracker.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequestTracker.java index 05a8052..97d321c 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequestTracker.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequestTracker.java @@ -119,38 +119,44 @@ public class OutstandingRequestTracker { * from the {@link #placedRequests} structure. * @param role role index * @param hostname hostname - * @param resource * @return the allocation outcome */ - public synchronized ContainerAllocationOutcome onContainerAllocated(int role, + public synchronized ContainerAllocation onContainerAllocated(int role, String hostname, Container container) { + final String containerDetails = SliderUtils.containerToString(container); + log.debug("Processing allocation for role {} on {}", role, + containerDetails); + ContainerAllocation allocation = new ContainerAllocation(); ContainerAllocationOutcome outcome; OutstandingRequest request = - placedRequests.remove(new OutstandingRequest(role, hostname)); + placedRequests.remove(new OutstandingRequest(role, hostname)); if (request != null) { //satisfied request - log.info("Found placed request for container: {}", request); + log.debug("Found placed request for container: {}", request); request.completed(); // derive outcome from status of tracked request outcome = request.isEscalated() - ? ContainerAllocationOutcome.Escalated - : ContainerAllocationOutcome.Placed; + ? ContainerAllocationOutcome.Escalated + : ContainerAllocationOutcome.Placed; } else { // not in the list; this is an open placement // scan through all containers in the open request list request = removeOpenRequest(container); if (request != null) { - log.info("Found open request for container: {}", request); + log.debug("Found open request for container: {}", request); request.completed(); outcome = ContainerAllocationOutcome.Open; } else { - log.warn("Container allocation was not expected :" - + SliderUtils.containerToString(container)); + log.warn("No open request found for container {}, outstanding queue has {} entries ", + containerDetails, + openRequests.size()); outcome = ContainerAllocationOutcome.Unallocated; } } - return outcome; + allocation.origin = request; + allocation.outcome = outcome; + return allocation; } /** @@ -167,11 +173,15 @@ public class OutstandingRequestTracker { ListIterator<OutstandingRequest> openlist = openRequests.listIterator(); while (openlist.hasNext() && request == null) { OutstandingRequest r = openlist.next(); - if (r.getPriority() == pri - && r.resourceRequirementsMatch(resource)) { - // match of priority and resources - request = r; - openlist.remove(); + if (r.getPriority() == pri) { + // matching resource + if (r.resourceRequirementsMatch(resource)) { + // match of priority and resources + request = r; + openlist.remove(); + } else { + log.debug("Matched priorities but resources different"); + } } } return request; @@ -314,6 +324,7 @@ public class OutstandingRequestTracker { * Escalate operation as triggered by external timer. * @return a (usually empty) list of cancel/request operations. */ + @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter") public synchronized List<AbstractRMOperation> escalateOutstandingRequests(long now) { if (placedRequests.isEmpty()) { return NO_REQUESTS; @@ -327,7 +338,7 @@ public class OutstandingRequestTracker { if (outstandingRequest.shouldEscalate(now)) { // time to escalate - CancelSingleRequest cancel = new CancelSingleRequest(outstandingRequest.getIssuedRequest()); + CancelSingleRequest cancel = outstandingRequest.createCancelOperation(); operations.add(cancel); AMRMClient.ContainerRequest escalated = outstandingRequest.escalate(); @@ -338,4 +349,48 @@ public class OutstandingRequestTracker { } return operations; } + + /** + * Extract a specific number of open requests for a role + * @param roleId role Id + * @param count count to extract + * @return a list of requests which are no longer in the open request list + */ + public synchronized List<OutstandingRequest> extractOpenRequestsForRole(int roleId, int count) { + List<OutstandingRequest> results = new ArrayList<>(); + ListIterator<OutstandingRequest> openlist = openRequests.listIterator(); + while (openlist.hasNext() && count > 0) { + OutstandingRequest openRequest = openlist.next(); + if (openRequest.roleId == roleId) { + results.add(openRequest); + openlist.remove(); + count--; + } + } + return results; + } + /** + * Extract a specific number of placed requests for a role + * @param roleId role Id + * @param count count to extract + * @return a list of requests which are no longer in the placed request data structure + */ + public synchronized List<OutstandingRequest> extractPlacedRequestsForRole(int roleId, int count) { + List<OutstandingRequest> results = new ArrayList<>(); + Iterator<OutstandingRequest> iterator = placedRequests.keySet().iterator(); + while (iterator.hasNext() && count > 0) { + OutstandingRequest request = iterator.next(); + if (request.roleId == roleId) { + results.add(request); + count--; + } + } + // now cull them from the map + for (OutstandingRequest result : results) { + placedRequests.remove(result); + } + + return results; + } + } http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/b952b640/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHistory.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHistory.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHistory.java index 0b981b8..9ab40bd 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHistory.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHistory.java @@ -638,13 +638,13 @@ public class RoleHistory { * @param actualCount current count of instances * @return The allocation outcome */ - public synchronized ContainerAllocationOutcome onContainerAllocated(Container container, + public synchronized ContainerAllocation onContainerAllocated(Container container, int desiredCount, int actualCount) { int role = ContainerPriority.extractRole(container); String hostname = RoleHistoryUtils.hostnameOf(container); List<NodeInstance> nodeInstances = getOrCreateNodesForRoleId(role); - ContainerAllocationOutcome outcome = + ContainerAllocation outcome = outstandingRequests.onContainerAllocated(role, hostname, container); if (desiredCount <= actualCount) { // all outstanding requests have been satisfied @@ -874,4 +874,30 @@ public class RoleHistory { long now = now(); return outstandingRequests.escalateOutstandingRequests(now); } + + /** + * Build the list of requests to cancel from the outstanding list. + * @param role + * @param toCancel + * @return a list of cancellable operations. + */ + public synchronized List<AbstractRMOperation> cancelRequestsForRole(RoleStatus role, int toCancel) { + List<AbstractRMOperation> results = new ArrayList<>(toCancel); + // first scan through the unplaced request list to find all of a role + int roleId = role.getKey(); + List<OutstandingRequest> requests = + outstandingRequests.extractOpenRequestsForRole(roleId, toCancel); + + // are there any left? + int remaining = toCancel - requests.size(); + // ask for some placed nodes + requests.addAll(outstandingRequests.extractPlacedRequestsForRole(roleId, remaining)); + + // build cancellations + for (OutstandingRequest request : requests) { + results.add(request.createCancelOperation()); + } + return results; + } + } http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/b952b640/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/appstate/TestMockAppStateFlexing.groovy ---------------------------------------------------------------------- diff --git a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/appstate/TestMockAppStateFlexing.groovy b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/appstate/TestMockAppStateFlexing.groovy index d962438..257092a 100644 --- a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/appstate/TestMockAppStateFlexing.groovy +++ b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/appstate/TestMockAppStateFlexing.groovy @@ -25,6 +25,7 @@ import org.apache.slider.core.exceptions.TriggerClusterTeardownException import org.apache.slider.server.appmaster.model.mock.BaseMockAppStateTest import org.apache.slider.server.appmaster.model.mock.MockRoles import org.apache.slider.server.appmaster.operations.AbstractRMOperation +import org.apache.slider.server.appmaster.operations.CancelSingleRequest import org.apache.slider.server.appmaster.state.AppState import org.apache.slider.server.appmaster.state.ContainerAssignment import org.apache.slider.server.appmaster.state.RoleInstance @@ -163,6 +164,23 @@ class TestMockAppStateFlexing extends BaseMockAppStateTest implements MockRoles } } - + + @Test + public void testCancelWithRequestsOutstanding() throws Throwable { + // flex cluster size before the original set were allocated + + + role0Status.desired = 6 + // build the ops + List<AbstractRMOperation> ops = appState.reviewRequestAndReleaseNodes() + // here the data structures exist + + // go down + role0Status.desired = 3 + List<AbstractRMOperation> ops2 = appState.reviewRequestAndReleaseNodes() + assert ops2.size() == 3 + ops2.each { assert it instanceof CancelSingleRequest} + + } } http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/b952b640/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/appstate/TestMockAppStateRMOperations.groovy ---------------------------------------------------------------------- diff --git a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/appstate/TestMockAppStateRMOperations.groovy b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/appstate/TestMockAppStateRMOperations.groovy index ee5eead..9ac6fcf 100644 --- a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/appstate/TestMockAppStateRMOperations.groovy +++ b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/appstate/TestMockAppStateRMOperations.groovy @@ -20,6 +20,7 @@ package org.apache.slider.server.appmaster.model.appstate import groovy.util.logging.Slf4j import org.apache.hadoop.yarn.api.records.Container +import org.apache.hadoop.yarn.api.records.ContainerId import org.apache.hadoop.yarn.client.api.AMRMClient import org.apache.slider.server.appmaster.model.mock.BaseMockAppStateTest import org.apache.slider.server.appmaster.model.mock.MockFactory @@ -27,10 +28,11 @@ import org.apache.slider.server.appmaster.model.mock.MockRMOperationHandler import org.apache.slider.server.appmaster.model.mock.MockRoles import org.apache.slider.server.appmaster.model.mock.MockYarnEngine import org.apache.slider.server.appmaster.operations.AbstractRMOperation -import org.apache.slider.server.appmaster.operations.CancelRequestOperation +import org.apache.slider.server.appmaster.operations.CancelSingleRequest import org.apache.slider.server.appmaster.operations.ContainerReleaseOperation import org.apache.slider.server.appmaster.operations.ContainerRequestOperation import org.apache.slider.server.appmaster.operations.RMOperationHandler +import org.apache.slider.server.appmaster.state.AppState import org.apache.slider.server.appmaster.state.ContainerAssignment import org.apache.slider.server.appmaster.state.RoleInstance import org.junit.Test @@ -95,10 +97,17 @@ class TestMockAppStateRMOperations extends BaseMockAppStateTest implements MockR assertListLength(ops, 5) // now 5 outstanding requests. assert role0.requested == 5 - + // allocate one - role0.incActual() - role0.decRequested() + List<AbstractRMOperation> processed = [ops[0]] + List<ContainerId> released = [] + List<AppState.NodeCompletionResult> completionResults = [] + submitOperations(processed, released) + List<RoleInstance> instances = createAndSubmitNodes(released) + processSubmissionOperations(instances, completionResults, released) + + + // four outstanding assert role0.requested == 4 @@ -106,9 +115,10 @@ class TestMockAppStateRMOperations extends BaseMockAppStateTest implements MockR role0.desired = 3 ops = appState.reviewRequestAndReleaseNodes() - // expect a cancel operation from review - assertListLength(ops, 1) - assert ops[0] instanceof CancelRequestOperation + // expect two cancel operation from review + assertListLength(ops, 2) + ops.each { assert it instanceof CancelSingleRequest } + RMOperationHandler handler = new MockRMOperationHandler() handler.availableToCancel = 4; handler.execute(ops) @@ -119,11 +129,10 @@ class TestMockAppStateRMOperations extends BaseMockAppStateTest implements MockR role0.desired = 2 ops = appState.reviewRequestAndReleaseNodes() assertListLength(ops, 1) - assert ops[0] instanceof CancelRequestOperation + ops.each { assert it instanceof CancelSingleRequest } handler.execute(ops) assert handler.availableToCancel == 1 assert role0.requested == 1 - } @Test @@ -136,9 +145,8 @@ class TestMockAppStateRMOperations extends BaseMockAppStateTest implements MockR assert role0.requested == 5 role0.desired = 0 ops = appState.reviewRequestAndReleaseNodes() - assertListLength(ops, 1) - CancelRequestOperation cancel = ops[0] as CancelRequestOperation - assert cancel.count == 5 + assertListLength(ops, 5) + } @@ -160,7 +168,9 @@ class TestMockAppStateRMOperations extends BaseMockAppStateTest implements MockR role0.desired = 1; assert role0.delta == -3 ops = appState.reviewRequestAndReleaseNodes() - assertListLength(ops, 2) + assertListLength(ops, 3) + assert 2 == (ops.findAll {it instanceof CancelSingleRequest}).size() + assert 1 == (ops.findAll {it instanceof ContainerReleaseOperation}).size() assert role0.requested == 0 assert role0.releasing == 1 } @@ -171,31 +181,31 @@ class TestMockAppStateRMOperations extends BaseMockAppStateTest implements MockR // role: desired = 2, requested = 1, actual=1 def role0 = role0Status role0.desired = 2 - role0.incRequested() - role0.incRequested() List<AbstractRMOperation> ops - - // there are now two outstanding, two actual + ops = appState.reviewRequestAndReleaseNodes() + assert 2 == (ops.findAll { it instanceof ContainerRequestOperation }).size() + + // there are now two outstanding, two actual // Release 3 and verify that the two // cancellations were combined with a release role0.desired = 0; ops = appState.reviewRequestAndReleaseNodes() - assertListLength(ops, 1) - CancelRequestOperation cancel = ops[0] as CancelRequestOperation - assert cancel.getCount() == 2 + assert ops.size() == 2 + assert 2 == (ops.findAll { it instanceof CancelSingleRequest }).size() } @Test public void testFlexUpOutstandingRequests() throws Throwable { - - // role: desired = 2, requested = 1, actual=1 + + List<AbstractRMOperation> ops + // role: desired = 2, requested = 1, actual=1 def role0 = role0Status role0.desired = 2 role0.incActual(); role0.incRequested() - - List<AbstractRMOperation> ops + + // flex up 2 nodes, yet expect only one node to be requested, // as the outstanding request is taken into account @@ -283,7 +293,9 @@ class TestMockAppStateRMOperations extends BaseMockAppStateTest implements MockR List<ContainerAssignment> assignments = []; List<AbstractRMOperation> releases = [] appState.onContainersAllocated(allocations, assignments, releases) - assertListLength(releases, 0) + // we expect four release requests here for all the allocated containers + assertListLength(releases, 4) + releases.each { assert it instanceof CancelSingleRequest } assertListLength(assignments, 4) assignments.each { ContainerAssignment assigned -> Container target = assigned.container http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/b952b640/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/appstate/TestMockAppStateRolePlacement.groovy ---------------------------------------------------------------------- diff --git a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/appstate/TestMockAppStateRolePlacement.groovy b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/appstate/TestMockAppStateRolePlacement.groovy index 8fd9858..4726e71 100644 --- a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/appstate/TestMockAppStateRolePlacement.groovy +++ b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/appstate/TestMockAppStateRolePlacement.groovy @@ -25,6 +25,7 @@ import org.apache.hadoop.yarn.client.api.AMRMClient import org.apache.slider.server.appmaster.model.mock.BaseMockAppStateTest import org.apache.slider.server.appmaster.model.mock.MockRoles import org.apache.slider.server.appmaster.operations.AbstractRMOperation +import org.apache.slider.server.appmaster.operations.CancelSingleRequest import org.apache.slider.server.appmaster.operations.ContainerReleaseOperation import org.apache.slider.server.appmaster.operations.ContainerRequestOperation import org.apache.slider.server.appmaster.state.ContainerAssignment @@ -59,12 +60,19 @@ class TestMockAppStateRolePlacement extends BaseMockAppStateTest assert request.relaxLocality assert request.nodes == null assert request.racks == null + assert request.capability Container allocated = engine.allocateContainer(request) List<ContainerAssignment> assignments = []; - List<AbstractRMOperation> operations = [] - appState.onContainersAllocated([(Container)allocated], assignments, operations) - assert operations.size() == 0 + List<AbstractRMOperation> releaseOperations = [] + appState.onContainersAllocated([(Container)allocated], assignments, releaseOperations) + // verify the release matches the allocation + assert releaseOperations.size() == 1 + CancelSingleRequest cancelOp = releaseOperations[0] as CancelSingleRequest; + assert cancelOp.request + assert cancelOp.request.capability + assert cancelOp.request.capability.equals(allocated.resource) + // now the assignment assert assignments.size() == 1 ContainerAssignment assigned = assignments[0] Container container = assigned.container http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/b952b640/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryOutstandingRequestTracker.groovy ---------------------------------------------------------------------- diff --git a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryOutstandingRequestTracker.groovy b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryOutstandingRequestTracker.groovy index 3f41dfd..3d396f8 100644 --- a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryOutstandingRequestTracker.groovy +++ b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryOutstandingRequestTracker.groovy @@ -23,8 +23,6 @@ import org.apache.hadoop.yarn.client.api.AMRMClient import org.apache.slider.providers.PlacementPolicy import org.apache.slider.providers.ProviderRole import org.apache.slider.server.appmaster.model.mock.BaseMockAppStateTest -import org.apache.slider.server.appmaster.model.mock.MockContainer -import org.apache.slider.server.appmaster.model.mock.MockNodeId import org.apache.slider.server.appmaster.model.mock.MockPriority import org.apache.slider.server.appmaster.model.mock.MockResource import org.apache.slider.server.appmaster.operations.AbstractRMOperation @@ -63,7 +61,7 @@ class TestRoleHistoryOutstandingRequestTracker extends BaseMockAppStateTest { tracker.newRequest(host1, 0) tracker.newRequest(host2, 0) tracker.newRequest(host1, 1) - assert tracker.onContainerAllocated(1, "host1", null) == ContainerAllocationOutcome.Placed + assert tracker.onContainerAllocated(1, "host1", null).outcome == ContainerAllocationOutcome.Placed assert !tracker.lookupPlacedRequest(1, "host1") assert tracker.lookupPlacedRequest(0, "host1") } @@ -91,7 +89,7 @@ class TestRoleHistoryOutstandingRequestTracker extends BaseMockAppStateTest { resource.virtualCores=1 resource.memory = 48; c1.setResource(resource) - ContainerAllocationOutcome outcome = tracker.onContainerAllocated(0, "host1", c1) + ContainerAllocationOutcome outcome = tracker.onContainerAllocated(0, "host1", c1).outcome assert outcome == ContainerAllocationOutcome.Unallocated assert tracker.listOpenRequests().size() == 1 } @@ -120,9 +118,11 @@ class TestRoleHistoryOutstandingRequestTracker extends BaseMockAppStateTest { assert issued.capability == resource assert issued.priority.priority == c1.getPriority().getPriority() assert req1.resourceRequirementsMatch(resource) - ContainerAllocationOutcome outcome = tracker.onContainerAllocated(0, nodeId.host, c1) + + def allocation = tracker.onContainerAllocated(0, nodeId.host, c1) assert tracker.listOpenRequests().size() == 0 - assert outcome == ContainerAllocationOutcome.Open + assert allocation.outcome == ContainerAllocationOutcome.Open + assert allocation.origin.is(req1) } @Test http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/b952b640/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryRequestTracking.groovy ---------------------------------------------------------------------- diff --git a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryRequestTracking.groovy b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryRequestTracking.groovy index 34a0a4d..82750a3 100644 --- a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryRequestTracking.groovy +++ b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryRequestTracking.groovy @@ -143,11 +143,11 @@ class TestRoleHistoryRequestTracking extends BaseMockAppStateTest { } public void assertOnContainerAllocated(Container c1, int p1, int p2) { - assert ContainerAllocationOutcome.Open != roleHistory.onContainerAllocated(c1, p1, p2) + assert ContainerAllocationOutcome.Open != roleHistory.onContainerAllocated(c1, p1, p2).outcome } public void assertOnContainerAllocationOpen(Container c1, int p1, int p2) { - assert ContainerAllocationOutcome.Open == roleHistory.onContainerAllocated(c1, p1, p2) + assert ContainerAllocationOutcome.Open == roleHistory.onContainerAllocated(c1, p1, p2).outcome } def assertNoOutstandingPlacedRequests() { @@ -190,7 +190,8 @@ class TestRoleHistoryRequestTracking extends BaseMockAppStateTest { // the final allocation will trigger a cleanup container = factory.newContainer(req2, "four") // no node dropped - assert ContainerAllocationOutcome.Unallocated == roleHistory.onContainerAllocated(container, 3, 3) + assert ContainerAllocationOutcome.Unallocated == + roleHistory.onContainerAllocated(container, 3, 3).outcome // yet the list is now empty assertNoOutstandingPlacedRequests() roleHistory.listOpenRequests().empty @@ -198,7 +199,6 @@ class TestRoleHistoryRequestTracking extends BaseMockAppStateTest { // and the remainder goes onto the available list List<NodeInstance> a2 = roleHistory.cloneAvailableList(0) assertListEquals([age2Active0], a2) - } @Test http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/b952b640/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/Allocator.groovy ---------------------------------------------------------------------- diff --git a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/Allocator.groovy b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/Allocator.groovy index a027098..ca5d805 100644 --- a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/Allocator.groovy +++ b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/Allocator.groovy @@ -114,6 +114,7 @@ class Allocator { container.nodeId = node.nodeId container.nodeHttpAddress = node.httpAddress() container.priority = request.priority + container.resource = request.capability return container; } http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/b952b640/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/MockYarnEngine.groovy ---------------------------------------------------------------------- diff --git a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/MockYarnEngine.groovy b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/MockYarnEngine.groovy index 04466c6..e3d509a 100644 --- a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/MockYarnEngine.groovy +++ b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/MockYarnEngine.groovy @@ -114,8 +114,8 @@ class MockYarnEngine { } else { ContainerRequestOperation req = (ContainerRequestOperation) op Container container = allocateContainer(req.request) - log.info("allocated container $container for $req") if (container != null) { + log.info("allocated container $container for $req") allocation.add(container) } else { log.debug("Unsatisfied allocation $req")
