Repository: hadoop Updated Branches: refs/heads/trunk f207e3014 -> 176bb3f81
YARN-8984. AMRMClient#OutstandingSchedRequests leaks when AllocationTags is null or empty. Contributed by Yang Wang. Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/176bb3f8 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/176bb3f8 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/176bb3f8 Branch: refs/heads/trunk Commit: 176bb3f812e49b0fe3abddf54eebfc7219b5d718 Parents: f207e30 Author: Weiwei Yang <[email protected]> Authored: Thu Nov 22 16:52:29 2018 +0800 Committer: Weiwei Yang <[email protected]> Committed: Thu Nov 22 16:52:29 2018 +0800 ---------------------------------------------------------------------- .../yarn/client/api/impl/AMRMClientImpl.java | 5 + .../client/api/impl/BaseAMRMClientTest.java | 4 +- .../TestAMRMClientPlacementConstraints.java | 232 +++++++++++++++---- .../hadoop/yarn/client/AMRMClientUtils.java | 3 +- 4 files changed, 194 insertions(+), 50 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/176bb3f8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java index 6dcecde..c05f7ac 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/AMRMClientImpl.java @@ -1037,6 +1037,11 @@ public class AMRMClientImpl<T extends ContainerRequest> extends AMRMClient<T> { return remoteRequests.get(Long.valueOf(allocationRequestId)); } + @VisibleForTesting + Map<Set<String>, List<SchedulingRequest>> getOutstandingSchedRequests() { + return outstandingSchedRequests; + } + RemoteRequestsTable<T> putTable(long allocationRequestId, RemoteRequestsTable<T> table) { return remoteRequests.put(Long.valueOf(allocationRequestId), table); http://git-wip-us.apache.org/repos/asf/hadoop/blob/176bb3f8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/BaseAMRMClientTest.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/BaseAMRMClientTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/BaseAMRMClientTest.java index d18652f..3465274 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/BaseAMRMClientTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/BaseAMRMClientTest.java @@ -196,7 +196,9 @@ public class BaseAMRMClientTest { @After public void teardown() throws YarnException, IOException { - yarnClient.killApplication(attemptId.getApplicationId()); + if (yarnClient != null) { + yarnClient.killApplication(attemptId.getApplicationId()); + } attemptId = null; if (yarnClient != null && http://git-wip-us.apache.org/repos/asf/hadoop/blob/176bb3f8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientPlacementConstraints.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientPlacementConstraints.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientPlacementConstraints.java index 0e88299..993391f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientPlacementConstraints.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClientPlacementConstraints.java @@ -37,6 +37,7 @@ import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; import org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; import java.util.ArrayList; @@ -58,66 +59,46 @@ import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.Placement */ public class TestAMRMClientPlacementConstraints extends BaseAMRMClientTest { + private List<Container> allocatedContainers = null; + private List<RejectedSchedulingRequest> rejectedSchedulingRequests = null; + private Map<Set<String>, PlacementConstraint> pcMapping = null; + + @Before + public void setup() throws Exception { + conf = new YarnConfiguration(); + allocatedContainers = new ArrayList<>(); + rejectedSchedulingRequests = new ArrayList<>(); + pcMapping = new HashMap<>(); + pcMapping.put(Collections.singleton("foo"), + PlacementConstraints.build( + PlacementConstraints.targetNotIn(NODE, allocationTag("foo")))); + pcMapping.put(Collections.singleton("bar"), + PlacementConstraints.build( + PlacementConstraints.targetNotIn(NODE, allocationTag("bar")))); + } + @Test(timeout=60000) - public void testAMRMClientWithPlacementConstraints() + public void testAMRMClientWithPlacementConstraintsByPlacementProcessor() throws Exception { // we have to create a new instance of MiniYARNCluster to avoid SASL qop // mismatches between client and server - teardown(); - conf = new YarnConfiguration(); conf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER, YarnConfiguration.PROCESSOR_RM_PLACEMENT_CONSTRAINTS_HANDLER); createClusterAndStartApplication(conf); + allocatedContainers.clear(); + rejectedSchedulingRequests.clear(); AMRMClient<AMRMClient.ContainerRequest> amClient = AMRMClient.<AMRMClient.ContainerRequest>createAMRMClient(); amClient.setNMTokenCache(new NMTokenCache()); //asserting we are not using the singleton instance cache Assert.assertNotSame(NMTokenCache.getSingleton(), amClient.getNMTokenCache()); - - final List<Container> allocatedContainers = new ArrayList<>(); - final List<RejectedSchedulingRequest> rejectedSchedulingRequests = - new ArrayList<>(); - AMRMClientAsync asyncClient = new AMRMClientAsyncImpl<>(amClient, 1000, - new AMRMClientAsync.AbstractCallbackHandler() { - @Override - public void onContainersAllocated(List<Container> containers) { - allocatedContainers.addAll(containers); - } - - @Override - public void onRequestsRejected( - List<RejectedSchedulingRequest> rejReqs) { - rejectedSchedulingRequests.addAll(rejReqs); - } - - @Override - public void onContainersCompleted(List<ContainerStatus> statuses) {} - @Override - public void onContainersUpdated(List<UpdatedContainer> containers) {} - @Override - public void onShutdownRequest() {} - @Override - public void onNodesUpdated(List<NodeReport> updatedNodes) {} - @Override - public void onError(Throwable e) {} - - @Override - public float getProgress() { - return 0.1f; - } - }); - + AMRMClientAsync asyncClient = new AMRMClientAsyncImpl<>(amClient, + 1000, new TestCallbackHandler()); asyncClient.init(conf); asyncClient.start(); - Map<Set<String>, PlacementConstraint> pcMapping = new HashMap<>(); - pcMapping.put(Collections.singleton("foo"), - PlacementConstraints.build( - PlacementConstraints.targetNotIn(NODE, allocationTag("foo")))); - pcMapping.put(Collections.singleton("bar"), - PlacementConstraints.build( - PlacementConstraints.targetNotIn(NODE, allocationTag("bar")))); + asyncClient.registerApplicationMaster("Host", 10000, "", pcMapping); // Send two types of requests - 4 with source tag "foo" have numAlloc = 1 @@ -144,6 +125,15 @@ public class TestAMRMClientPlacementConstraints extends BaseAMRMClientTest { allocatedContainers.stream().collect( Collectors.groupingBy(Container::getNodeId)); + Map<Set<String>, List<SchedulingRequest>> outstandingSchedRequests = + ((AMRMClientImpl)amClient).getOutstandingSchedRequests(); + // Check the outstanding SchedulingRequests + Assert.assertEquals(2, outstandingSchedRequests.size()); + Assert.assertEquals(1, outstandingSchedRequests.get( + new HashSet<>(Collections.singletonList("foo"))).size()); + Assert.assertEquals(1, outstandingSchedRequests.get( + new HashSet<>(Collections.singletonList("bar"))).size()); + // Ensure 2 containers allocated per node. // Each node should have a "foo" and a "bar" container. Assert.assertEquals(3, containersPerNode.entrySet().size()); @@ -169,6 +159,140 @@ public class TestAMRMClientPlacementConstraints extends BaseAMRMClientTest { asyncClient.stop(); } + @Test(timeout=60000) + public void testAMRMClientWithPlacementConstraintsByScheduler() + throws Exception { + // we have to create a new instance of MiniYARNCluster to avoid SASL qop + // mismatches between client and server + conf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER, + YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER); + createClusterAndStartApplication(conf); + + allocatedContainers.clear(); + rejectedSchedulingRequests.clear(); + AMRMClient<AMRMClient.ContainerRequest> amClient = + AMRMClient.<AMRMClient.ContainerRequest>createAMRMClient(); + amClient.setNMTokenCache(new NMTokenCache()); + //asserting we are not using the singleton instance cache + Assert.assertNotSame(NMTokenCache.getSingleton(), + amClient.getNMTokenCache()); + AMRMClientAsync asyncClient = new AMRMClientAsyncImpl<>(amClient, + 1000, new TestCallbackHandler()); + asyncClient.init(conf); + asyncClient.start(); + + asyncClient.registerApplicationMaster("Host", 10000, "", pcMapping); + + // Send two types of requests - 4 with source tag "foo" have numAlloc = 1 + // and 1 with source tag "bar" and has numAlloc = 4. Both should be + // handled similarly. i.e: Since there are only 3 nodes, + // 2 schedulingRequests - 1 with source tag "foo" on one with source + // tag "bar" should get rejected. + asyncClient.addSchedulingRequests( + Arrays.asList( + // 4 reqs with numAlloc = 1 + schedulingRequest(1, 1, 1, 1, 512, "foo"), + schedulingRequest(1, 1, 2, 1, 512, "foo"), + schedulingRequest(1, 1, 3, 1, 512, "foo"), + schedulingRequest(1, 1, 4, 1, 512, "foo"), + // 1 req with numAlloc = 4 + schedulingRequest(4, 1, 5, 1, 512, "bar"), + // 1 empty tag + schedulingRequest(1, 1, 6, 1, 512, new HashSet<>()))); + + // kick the scheduler + waitForContainerAllocation(allocatedContainers, + rejectedSchedulingRequests, 7, 0); + + Assert.assertEquals(7, allocatedContainers.size()); + Map<NodeId, List<Container>> containersPerNode = + allocatedContainers.stream().collect( + Collectors.groupingBy(Container::getNodeId)); + + Map<Set<String>, List<SchedulingRequest>> outstandingSchedRequests = + ((AMRMClientImpl)amClient).getOutstandingSchedRequests(); + // Check the outstanding SchedulingRequests + Assert.assertEquals(3, outstandingSchedRequests.size()); + Assert.assertEquals(1, outstandingSchedRequests.get( + new HashSet<>(Collections.singletonList("foo"))).size()); + Assert.assertEquals(1, outstandingSchedRequests.get( + new HashSet<>(Collections.singletonList("bar"))).size()); + Assert.assertEquals(0, outstandingSchedRequests.get( + new HashSet<String>()).size()); + + // Each node should have a "foo" and a "bar" container. + Assert.assertEquals(3, containersPerNode.entrySet().size()); + HashSet<String> srcTags = new HashSet<>(Arrays.asList("foo", "bar")); + containersPerNode.entrySet().forEach( + x -> + Assert.assertEquals( + srcTags, + x.getValue() + .stream() + .filter(y -> !y.getAllocationTags().isEmpty()) + .map(y -> y.getAllocationTags().iterator().next()) + .collect(Collectors.toSet())) + ); + + // The rejected requests were not set by scheduler + Assert.assertEquals(0, rejectedSchedulingRequests.size()); + + asyncClient.stop(); + } + + + @Test + /* + * Three cases of empty HashSet key of outstandingSchedRequests + * 1. Not set any tags + * 2. Set a empty set, e.g ImmutableSet.of(), new HashSet<>() + * 3. Set tag as null + */ + public void testEmptyKeyOfOutstandingSchedRequests() { + AMRMClient<AMRMClient.ContainerRequest> amClient = + AMRMClient.<AMRMClient.ContainerRequest>createAMRMClient(); + HashSet<String> schedRequest = null; + amClient.addSchedulingRequests(Arrays.asList( + schedulingRequest(1, 1, 1, 1, 512, ExecutionType.GUARANTEED), + schedulingRequest(1, 1, 2, 1, 512, new HashSet<>()), + schedulingRequest(1, 1, 3, 1, 512, schedRequest))); + Map<Set<String>, List<SchedulingRequest>> outstandingSchedRequests = + ((AMRMClientImpl)amClient).getOutstandingSchedRequests(); + Assert.assertEquals(1, outstandingSchedRequests.size()); + Assert.assertEquals(3, outstandingSchedRequests + .get(new HashSet<String>()).size()); + } + + private class TestCallbackHandler extends + AMRMClientAsync.AbstractCallbackHandler { + @Override + public void onContainersAllocated(List<Container> containers) { + allocatedContainers.addAll(containers); + } + + @Override + public void onRequestsRejected( + List<RejectedSchedulingRequest> rejReqs) { + rejectedSchedulingRequests.addAll(rejReqs); + } + + @Override + public void onContainersCompleted(List<ContainerStatus> statuses) {} + @Override + public void onContainersUpdated(List<UpdatedContainer> containers) {} + @Override + public void onShutdownRequest() {} + @Override + public void onNodesUpdated(List<NodeReport> updatedNodes) {} + @Override + public void onError(Throwable e) {} + + @Override + public float getProgress() { + return 0.1f; + } + } + private static void waitForContainerAllocation( List<Container> allocatedContainers, List<RejectedSchedulingRequest> rejectedRequests, @@ -186,16 +310,30 @@ public class TestAMRMClientPlacementConstraints extends BaseAMRMClientTest { private static SchedulingRequest schedulingRequest(int numAllocations, int priority, long allocReqId, int cores, int mem, String... tags) { return schedulingRequest(numAllocations, priority, allocReqId, cores, mem, - ExecutionType.GUARANTEED, tags); + ExecutionType.GUARANTEED, new HashSet<>(Arrays.asList(tags))); + } + + private static SchedulingRequest schedulingRequest(int numAllocations, + int priority, long allocReqId, int cores, int mem, Set<String> tags) { + return schedulingRequest(numAllocations, + priority, allocReqId, cores, mem, ExecutionType.GUARANTEED, tags); + } + + private static SchedulingRequest schedulingRequest(int numAllocations, + int priority, long allocReqId, int cores, int mem, + ExecutionType execType, Set<String> tags) { + SchedulingRequest schedRequest = schedulingRequest(numAllocations, + priority, allocReqId, cores, mem, execType); + schedRequest.setAllocationTags(tags); + return schedRequest; } private static SchedulingRequest schedulingRequest(int numAllocations, int priority, long allocReqId, int cores, int mem, - ExecutionType execType, String... tags) { + ExecutionType execType) { return SchedulingRequest.newBuilder() .priority(Priority.newInstance(priority)) .allocationRequestId(allocReqId) - .allocationTags(new HashSet<>(Arrays.asList(tags))) .executionType(ExecutionTypeRequest.newInstance(execType, true)) .resourceSizing( ResourceSizing.newInstance(numAllocations, http://git-wip-us.apache.org/repos/asf/hadoop/blob/176bb3f8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/AMRMClientUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/AMRMClientUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/AMRMClientUtils.java index 34a9b34..1e363cb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/AMRMClientUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/AMRMClientUtils.java @@ -197,8 +197,7 @@ public final class AMRMClientUtils { return; } for (Container container : containers) { - if (container.getAllocationTags() != null - && !container.getAllocationTags().isEmpty()) { + if (container.getAllocationTags() != null) { List<SchedulingRequest> schedReqs = outstandingSchedRequests.get(container.getAllocationTags()); if (schedReqs != null && !schedReqs.isEmpty()) { --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
