SLIDER-799: code and test to ensure that there is no race condition during the escalation scan-and-execute cycle while outstanding-request entries are being created and issued
Project: http://git-wip-us.apache.org/repos/asf/incubator-slider/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-slider/commit/3dd2f724 Tree: http://git-wip-us.apache.org/repos/asf/incubator-slider/tree/3dd2f724 Diff: http://git-wip-us.apache.org/repos/asf/incubator-slider/diff/3dd2f724 Branch: refs/heads/feature/SLIDER-799-AM-managed-relax Commit: 3dd2f7244026cedf62b651ee8f97f5d0dd885859 Parents: 92e7af6 Author: Steve Loughran <[email protected]> Authored: Wed Mar 18 14:34:18 2015 +0000 Committer: Steve Loughran <[email protected]> Committed: Wed Mar 18 14:34:18 2015 +0000 ---------------------------------------------------------------------- .../appmaster/state/OutstandingRequest.java | 14 +++-- .../state/OutstandingRequestTracker.java | 18 ++++--- ...tRoleHistoryOutstandingRequestTracker.groovy | 56 +++++++++++++------- 3 files changed, 53 insertions(+), 35 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/3dd2f724/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequest.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequest.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequest.java index 799d08e..4fd2933 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequest.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequest.java @@ -26,8 +26,6 @@ import org.apache.hadoop.yarn.client.api.AMRMClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Arrays; -import java.util.Collections; import java.util.List; /** @@ -134,7 +132,7 @@ public final class OutstandingRequest { * @param labelExpression label to satisfy * @return the request to raise */ - public 
AMRMClient.ContainerRequest buildContainerRequest( + public synchronized AMRMClient.ContainerRequest buildContainerRequest( Resource resource, RoleStatus role, long time, String labelExpression) { String[] hosts; boolean relaxLocality; @@ -187,9 +185,9 @@ public final class OutstandingRequest { * as the original one, and the same host, but: relaxed placement, and a changed priority * so as to place it into the relaxed list. */ - public AMRMClient.ContainerRequest escalate() { + public synchronized AMRMClient.ContainerRequest escalate() { + Preconditions.checkNotNull(issuedRequest, "cannot escalate if request not issued "+ this); escalated = true; - Preconditions.checkNotNull(issuedRequest, "issued request"); Priority pri = ContainerPriority.createPriority(roleId, true); String[] nodes; List<String> issuedRequestNodes = issuedRequest.getNodes(); @@ -224,8 +222,8 @@ public final class OutstandingRequest { * @param time time to check against * @return true if escalation should begin */ - public boolean shouldEscalate(long time) { - return !escalated && escalationTimeoutMillis < time; + public synchronized boolean shouldEscalate(long time) { + return issuedRequest != null && !escalated && escalationTimeoutMillis < time; } /** @@ -267,7 +265,7 @@ public final class OutstandingRequest { } @Override - public String toString() { + public synchronized String toString() { final StringBuilder sb = new StringBuilder("OutstandingRequest{"); sb.append("roleId=").append(roleId); sb.append(", node=").append(node); http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/3dd2f724/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequestTracker.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequestTracker.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequestTracker.java index 93db3fa..48f6e57 
100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequestTracker.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequestTracker.java @@ -245,14 +245,18 @@ public class OutstandingRequestTracker { List<AbstractRMOperation> operations = new ArrayList<>(); for (OutstandingRequest outstandingRequest : placedRequests.values()) { - if (outstandingRequest.shouldEscalate(now)) { + synchronized (outstandingRequest) { + // sync escalation check with operation so that nothing can happen to state + // of the request during the escalation + if (outstandingRequest.shouldEscalate(now)) { - // time to escalate - CancelSingleRequest cancel = new CancelSingleRequest(outstandingRequest.issuedRequest); - operations.add(cancel); - AMRMClient.ContainerRequest escalated = - outstandingRequest.escalate(); - operations.add(new ContainerRequestOperation(escalated)); + // time to escalate + CancelSingleRequest cancel = new CancelSingleRequest(outstandingRequest.issuedRequest); + operations.add(cancel); + AMRMClient.ContainerRequest escalated = + outstandingRequest.escalate(); + operations.add(new ContainerRequestOperation(escalated)); + } } } http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/3dd2f724/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryOutstandingRequestTracker.groovy ---------------------------------------------------------------------- diff --git a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryOutstandingRequestTracker.groovy b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryOutstandingRequestTracker.groovy index 8c79bbf..2fe6763 100644 --- a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryOutstandingRequestTracker.groovy +++ 
b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryOutstandingRequestTracker.groovy @@ -29,6 +29,7 @@ import org.apache.slider.server.appmaster.operations.ContainerRequestOperation import org.apache.slider.server.appmaster.state.NodeInstance import org.apache.slider.server.appmaster.state.OutstandingRequest import org.apache.slider.server.appmaster.state.OutstandingRequestTracker +import org.apache.slider.server.appmaster.state.RoleStatus import org.junit.Test class TestRoleHistoryOutstandingRequestTracker extends BaseMockAppStateTest { @@ -88,30 +89,32 @@ class TestRoleHistoryOutstandingRequestTracker extends BaseMockAppStateTest { */ @Test - public void testEscalationOfStrictPlacement() throws Throwable { - final def roleStatus = role1Status - - - ProviderRole role = roleStatus.providerRole - assert role.placementPolicy == PlacementPolicy.STRICT; - Resource resource = new MockResource() - - appState.buildResourceRequirements(roleStatus, resource) - - // first requst - OutstandingRequest r1 = tracker.newRequest(host1, roleStatus.key) - final def initialRequest = r1.buildContainerRequest(resource, roleStatus, 0, null) - assert r1.issuedRequest != null; - assert r1.located - assert !r1.escalated - + public void testEscalation() throws Throwable { + ProviderRole providerRole1 = role1Status.providerRole + assert providerRole1.placementPolicy == PlacementPolicy.STRICT; + // first request + final def (res1, outstanding1) = newRequest(role1Status) + final def initialRequest = outstanding1.buildContainerRequest(res1, role1Status, 0, null) + assert outstanding1.issuedRequest != null; + assert outstanding1.located + assert !outstanding1.escalated assert !initialRequest.relaxLocality assert tracker.listOutstandingRequests().size() == 1 - // simulate a few minutes; escalation MUST now be triggered - List<AbstractRMOperation> escalations = tracker.escalateOutstandingRequests(180 * 1000) + // second. This one doesn't get launched. 
This is to verify that the escalation + // process skips entries which are in the list but have not been issued. + // ...which can be a race condition between request issuance & escalation. + // (not one observed outside test authoring, but retained for completeness) + assert role2Status.placementPolicy == PlacementPolicy.ANTI_AFFINITY_REQUIRED + def (res2, outstanding2) = newRequest(role2Status) + + // simulate some time escalation of role 1 MUST now be triggered + List<AbstractRMOperation> escalations = + tracker.escalateOutstandingRequests(providerRole1.placementTimeoutSeconds * 1000 + 500 ) + + assert outstanding1.escalated + assert !outstanding2.escalated - assert r1.escalated // two entries assert escalations.size() == 2; final def e1 = escalations[0] @@ -124,5 +127,18 @@ class TestRoleHistoryOutstandingRequestTracker extends BaseMockAppStateTest { def req2 = escRequest.request assert req2.relaxLocality + def (res3, outstanding3) = newRequest(role2Status) + outstanding3.buildContainerRequest(res3, role2Status, 0, null) + + List<AbstractRMOperation> escalations2 = + tracker.escalateOutstandingRequests(providerRole1.placementTimeoutSeconds * 1000 + 500) + assert escalations2.size() == 0 + } + + public def newRequest(RoleStatus r) { + final Resource res2 = new MockResource() + appState.buildResourceRequirements(r, res2) + final OutstandingRequest outstanding2 = tracker.newRequest(host1, r.key) + return [res2, outstanding2] } }
