Repository: hadoop Updated Branches: refs/heads/HDFS-10467 57c46a57a -> 53e8d0d03 (forced update)
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b733348d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/scheduler/TestOpportunisticContainerAllocator.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/scheduler/TestOpportunisticContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/scheduler/TestOpportunisticContainerAllocator.java new file mode 100644 index 0000000..788b0b3 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/scheduler/TestOpportunisticContainerAllocator.java @@ -0,0 +1,599 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.scheduler; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.SecurityUtil; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ExecutionType; +import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; +import org.apache.hadoop.yarn.server.api.protocolrecords.RemoteNode; +import org.apache.hadoop.yarn.server.api.records.MasterKey; +import org.apache.hadoop.yarn.server.security.BaseContainerTokenSecretManager; +import org.apache.hadoop.yarn.util.resource.Resources; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +public class TestOpportunisticContainerAllocator { + + private static final int GB = 1024; + private OpportunisticContainerAllocator allocator = null; + private OpportunisticContainerContext oppCntxt = null; + + @Before + public void setup() { + SecurityUtil.setTokenServiceUseIp(false); + final MasterKey mKey = new MasterKey() { + @Override + public int getKeyId() { + return 1; + } + @Override + public void setKeyId(int keyId) {} + @Override + public ByteBuffer getBytes() { + return ByteBuffer.allocate(8); + } + @Override + public void setBytes(ByteBuffer bytes) {} + }; + BaseContainerTokenSecretManager secMan = + new BaseContainerTokenSecretManager(new Configuration()) { + @Override + public MasterKey getCurrentKey() { + return mKey; + } + + @Override + public byte[] createPassword(ContainerTokenIdentifier identifier) { + return new byte[]{1, 2}; + } + }; + allocator = new OpportunisticContainerAllocator(secMan); + oppCntxt = new OpportunisticContainerContext(); + oppCntxt.getAppParams().setMinResource(Resource.newInstance(1024, 1)); + oppCntxt.getAppParams().setIncrementResource(Resource.newInstance(512, 1)); + oppCntxt.getAppParams().setMaxResource(Resource.newInstance(1024, 10)); + } + + @Test + public void testSimpleAllocation() throws Exception { + ResourceBlacklistRequest blacklistRequest = + ResourceBlacklistRequest.newInstance( + new ArrayList<>(), new ArrayList<>()); + List<ResourceRequest> reqs = + Arrays.asList(ResourceRequest.newInstance(Priority.newInstance(1), + "*", Resources.createResource(1 * GB), 1, true, null, + ExecutionTypeRequest.newInstance( + ExecutionType.OPPORTUNISTIC, true))); + ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance( + ApplicationId.newInstance(0L, 1), 1); + + oppCntxt.updateNodeList( + Arrays.asList( + RemoteNode.newInstance( + NodeId.newInstance("h1", 1234), "h1:1234", "/r1"))); + List<Container> containers = allocator.allocateContainers( + blacklistRequest, reqs, appAttId, oppCntxt, 1L, "luser"); + Assert.assertEquals(1, containers.size()); + Assert.assertEquals(0, oppCntxt.getOutstandingOpReqs().size()); + } + + @Test + public void testBlacklistRejection() throws Exception { + ResourceBlacklistRequest blacklistRequest = + ResourceBlacklistRequest.newInstance( + Arrays.asList("h1", "h2"), new ArrayList<>()); + List<ResourceRequest> reqs = + Arrays.asList(ResourceRequest.newInstance(Priority.newInstance(1), + "*", Resources.createResource(1 * GB), 1, true, null, + ExecutionTypeRequest.newInstance( + ExecutionType.OPPORTUNISTIC, true))); + ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance( + ApplicationId.newInstance(0L, 1), 1); + + oppCntxt.updateNodeList( + Arrays.asList( + RemoteNode.newInstance( + NodeId.newInstance("h1", 1234), "h1:1234", "/r1"), + RemoteNode.newInstance( + NodeId.newInstance("h2", 1234), "h2:1234", "/r2"))); + List<Container> containers = allocator.allocateContainers( + blacklistRequest, reqs, appAttId, oppCntxt, 1L, "luser"); + Assert.assertEquals(0, containers.size()); + Assert.assertEquals(1, oppCntxt.getOutstandingOpReqs().size()); + } + + @Test + public void testRoundRobinSimpleAllocation() throws Exception { + ResourceBlacklistRequest blacklistRequest = + ResourceBlacklistRequest.newInstance( + new ArrayList<>(), new ArrayList<>()); + List<ResourceRequest> reqs = + Arrays.asList( + ResourceRequest.newBuilder().allocationRequestId(1) + .priority(Priority.newInstance(1)) + .resourceName(ResourceRequest.ANY) + .capability(Resources.createResource(1 * GB)) + .relaxLocality(true) + .executionType(ExecutionType.OPPORTUNISTIC).build(), + ResourceRequest.newBuilder().allocationRequestId(2) + .priority(Priority.newInstance(1)) + .resourceName(ResourceRequest.ANY) + .capability(Resources.createResource(1 * GB)) + .relaxLocality(true) + .executionType(ExecutionType.OPPORTUNISTIC).build(), + ResourceRequest.newBuilder().allocationRequestId(3) + .priority(Priority.newInstance(1)) + .resourceName(ResourceRequest.ANY) + .capability(Resources.createResource(1 * GB)) + .relaxLocality(true) + .executionType(ExecutionType.OPPORTUNISTIC).build()); + ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance( + ApplicationId.newInstance(0L, 1), 1); + + oppCntxt.updateNodeList( + Arrays.asList( + RemoteNode.newInstance( + NodeId.newInstance("h1", 1234), "h1:1234", "/r1"), + RemoteNode.newInstance( + NodeId.newInstance("h2", 1234), "h2:1234", "/r1"), + RemoteNode.newInstance( + NodeId.newInstance("h3", 1234), "h3:1234", "/r1"))); + + List<Container> containers = allocator.allocateContainers( + blacklistRequest, reqs, appAttId, oppCntxt, 1L, "luser"); + System.out.println(containers); + Set<String> allocatedHosts = new HashSet<>(); + for (Container c : containers) { + allocatedHosts.add(c.getNodeHttpAddress()); + } + Assert.assertTrue(allocatedHosts.contains("h1:1234")); + Assert.assertTrue(allocatedHosts.contains("h2:1234")); + Assert.assertTrue(allocatedHosts.contains("h3:1234")); + Assert.assertEquals(3, containers.size()); + } + + @Test + public void testNodeLocalAllocation() throws Exception { + ResourceBlacklistRequest blacklistRequest = + ResourceBlacklistRequest.newInstance( + new ArrayList<>(), new ArrayList<>()); + List<ResourceRequest> reqs = + Arrays.asList( + ResourceRequest.newBuilder().allocationRequestId(1) + .priority(Priority.newInstance(1)) + .resourceName("/r1") + .capability(Resources.createResource(1 * GB)) + .relaxLocality(true) + .executionType(ExecutionType.OPPORTUNISTIC).build(), + ResourceRequest.newBuilder().allocationRequestId(1) + .priority(Priority.newInstance(1)) + .resourceName("h1") + .capability(Resources.createResource(1 * GB)) + .relaxLocality(true) + .executionType(ExecutionType.OPPORTUNISTIC).build(), + ResourceRequest.newBuilder().allocationRequestId(1) + .priority(Priority.newInstance(1)) + .resourceName(ResourceRequest.ANY) + .capability(Resources.createResource(1 * GB)) + .relaxLocality(true) + .executionType(ExecutionType.OPPORTUNISTIC).build(), + ResourceRequest.newBuilder().allocationRequestId(2) + .priority(Priority.newInstance(1)) + .resourceName("/r1") + .capability(Resources.createResource(1 * GB)) + .relaxLocality(true) + .executionType(ExecutionType.OPPORTUNISTIC).build(), + ResourceRequest.newBuilder().allocationRequestId(2) + .priority(Priority.newInstance(1)) + .resourceName("h1") + .capability(Resources.createResource(1 * GB)) + .relaxLocality(true) + .executionType(ExecutionType.OPPORTUNISTIC).build(), + ResourceRequest.newBuilder().allocationRequestId(2) + .priority(Priority.newInstance(1)) + .resourceName(ResourceRequest.ANY) + .capability(Resources.createResource(1 * GB)) + .relaxLocality(true) + .executionType(ExecutionType.OPPORTUNISTIC).build()); + ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance( + ApplicationId.newInstance(0L, 1), 1); + + oppCntxt.updateNodeList( + Arrays.asList( + RemoteNode.newInstance( + NodeId.newInstance("h1", 1234), "h1:1234", "/r1"), + RemoteNode.newInstance( + NodeId.newInstance("h2", 1234), "h2:1234", "/r1"), + RemoteNode.newInstance( + NodeId.newInstance("h3", 1234), "h3:1234", "/r1"))); + + List<Container> containers = allocator.allocateContainers( + blacklistRequest, reqs, appAttId, oppCntxt, 1L, "luser"); + System.out.println(containers); + Set<String> allocatedHosts = new HashSet<>(); + for (Container c : containers) { + allocatedHosts.add(c.getNodeHttpAddress()); + } + Assert.assertEquals(2, containers.size()); + Assert.assertTrue(allocatedHosts.contains("h1:1234")); + Assert.assertFalse(allocatedHosts.contains("h2:1234")); + Assert.assertFalse(allocatedHosts.contains("h3:1234")); + } + + @Test + public void testNodeLocalAllocationSameSchedKey() throws Exception { + ResourceBlacklistRequest blacklistRequest = + ResourceBlacklistRequest.newInstance( + new ArrayList<>(), new ArrayList<>()); + List<ResourceRequest> reqs = + Arrays.asList( + ResourceRequest.newBuilder().allocationRequestId(2) + .numContainers(2) + .priority(Priority.newInstance(1)) + .resourceName("/r1") + .capability(Resources.createResource(1 * GB)) + .relaxLocality(true) + .executionType(ExecutionType.OPPORTUNISTIC).build(), + ResourceRequest.newBuilder().allocationRequestId(2) + .numContainers(2) + .priority(Priority.newInstance(1)) + .resourceName("h1") + .capability(Resources.createResource(1 * GB)) + .relaxLocality(true) + .executionType(ExecutionType.OPPORTUNISTIC).build(), + ResourceRequest.newBuilder().allocationRequestId(2) + .numContainers(2) + .priority(Priority.newInstance(1)) + .resourceName(ResourceRequest.ANY) + .capability(Resources.createResource(1 * GB)) + .relaxLocality(true) + .executionType(ExecutionType.OPPORTUNISTIC).build()); + ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance( + ApplicationId.newInstance(0L, 1), 1); + + oppCntxt.updateNodeList( + Arrays.asList( + RemoteNode.newInstance( + NodeId.newInstance("h1", 1234), "h1:1234", "/r1"), + RemoteNode.newInstance( + NodeId.newInstance("h2", 1234), "h2:1234", "/r1"), + RemoteNode.newInstance( + NodeId.newInstance("h3", 1234), "h3:1234", "/r1"))); + + List<Container> containers = allocator.allocateContainers( + blacklistRequest, reqs, appAttId, oppCntxt, 1L, "luser"); + System.out.println(containers); + Set<String> allocatedHosts = new HashSet<>(); + for (Container c : containers) { + allocatedHosts.add(c.getNodeHttpAddress()); + } + Assert.assertEquals(2, containers.size()); + Assert.assertTrue(allocatedHosts.contains("h1:1234")); + Assert.assertFalse(allocatedHosts.contains("h2:1234")); + Assert.assertFalse(allocatedHosts.contains("h3:1234")); + } + + @Test + public void testSimpleRackLocalAllocation() throws Exception { + ResourceBlacklistRequest blacklistRequest = + ResourceBlacklistRequest.newInstance( + new ArrayList<>(), new ArrayList<>()); + List<ResourceRequest> reqs = + Arrays.asList( + ResourceRequest.newInstance(Priority.newInstance(1), "*", + Resources.createResource(1 * GB), 1, true, null, + ExecutionTypeRequest.newInstance( + ExecutionType.OPPORTUNISTIC, true)), + ResourceRequest.newInstance(Priority.newInstance(1), "h1", + Resources.createResource(1 * GB), 1, true, null, + ExecutionTypeRequest.newInstance( + ExecutionType.OPPORTUNISTIC, true)), + ResourceRequest.newInstance(Priority.newInstance(1), "/r1", + Resources.createResource(1 * GB), 1, true, null, + ExecutionTypeRequest.newInstance( + ExecutionType.OPPORTUNISTIC, true))); + ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance( + ApplicationId.newInstance(0L, 1), 1); + + oppCntxt.updateNodeList( + Arrays.asList( + RemoteNode.newInstance( + NodeId.newInstance("h3", 1234), "h3:1234", "/r2"), + RemoteNode.newInstance( + NodeId.newInstance("h2", 1234), "h2:1234", "/r1"), + RemoteNode.newInstance( + NodeId.newInstance("h4", 1234), "h4:1234", "/r2"))); + + List<Container> containers = allocator.allocateContainers( + blacklistRequest, reqs, appAttId, oppCntxt, 1L, "luser"); + Set<String> allocatedHosts = new HashSet<>(); + for (Container c : containers) { + allocatedHosts.add(c.getNodeHttpAddress()); + } + Assert.assertTrue(allocatedHosts.contains("h2:1234")); + Assert.assertFalse(allocatedHosts.contains("h3:1234")); + Assert.assertFalse(allocatedHosts.contains("h4:1234")); + Assert.assertEquals(1, containers.size()); + } + + @Test + public void testRoundRobinRackLocalAllocation() throws Exception { + ResourceBlacklistRequest blacklistRequest = + ResourceBlacklistRequest.newInstance( + new ArrayList<>(), new ArrayList<>()); + List<ResourceRequest> reqs = + Arrays.asList( + ResourceRequest.newBuilder().allocationRequestId(1) + .priority(Priority.newInstance(1)) + .resourceName("/r1") + .capability(Resources.createResource(1 * GB)) + .relaxLocality(true) + .executionType(ExecutionType.OPPORTUNISTIC).build(), + ResourceRequest.newBuilder().allocationRequestId(1) + .priority(Priority.newInstance(1)) + .resourceName("h1") + .capability(Resources.createResource(1 * GB)) + .relaxLocality(true) + .executionType(ExecutionType.OPPORTUNISTIC).build(), + ResourceRequest.newBuilder().allocationRequestId(1) + .priority(Priority.newInstance(1)) + .resourceName(ResourceRequest.ANY) + .capability(Resources.createResource(1 * GB)) + .relaxLocality(true) + .executionType(ExecutionType.OPPORTUNISTIC).build(), + ResourceRequest.newBuilder().allocationRequestId(2) + .priority(Priority.newInstance(1)) + .resourceName("/r1") + .capability(Resources.createResource(1 * GB)) + .relaxLocality(true) + .executionType(ExecutionType.OPPORTUNISTIC).build(), + ResourceRequest.newBuilder().allocationRequestId(2) + .priority(Priority.newInstance(1)) + .resourceName("h1") + .capability(Resources.createResource(1 * GB)) + .relaxLocality(true) + .executionType(ExecutionType.OPPORTUNISTIC).build(), + ResourceRequest.newBuilder().allocationRequestId(2) + .priority(Priority.newInstance(1)) + .resourceName(ResourceRequest.ANY) + .capability(Resources.createResource(1 * GB)) + .relaxLocality(true) + .executionType(ExecutionType.OPPORTUNISTIC).build()); + ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance( + ApplicationId.newInstance(0L, 1), 1); + + oppCntxt.updateNodeList( + Arrays.asList( + RemoteNode.newInstance( + NodeId.newInstance("h3", 1234), "h3:1234", "/r2"), + RemoteNode.newInstance( + NodeId.newInstance("h2", 1234), "h2:1234", "/r1"), + RemoteNode.newInstance( + NodeId.newInstance("h5", 1234), "h5:1234", "/r1"), + RemoteNode.newInstance( + NodeId.newInstance("h4", 1234), "h4:1234", "/r2"))); + + List<Container> containers = allocator.allocateContainers( + blacklistRequest, reqs, appAttId, oppCntxt, 1L, "luser"); + Set<String> allocatedHosts = new HashSet<>(); + for (Container c : containers) { + allocatedHosts.add(c.getNodeHttpAddress()); + } + System.out.println(containers); + Assert.assertTrue(allocatedHosts.contains("h2:1234")); + Assert.assertTrue(allocatedHosts.contains("h5:1234")); + Assert.assertFalse(allocatedHosts.contains("h3:1234")); + Assert.assertFalse(allocatedHosts.contains("h4:1234")); + Assert.assertEquals(2, containers.size()); + } + + @Test + public void testRoundRobinRackLocalAllocationSameSchedKey() throws Exception { + ResourceBlacklistRequest blacklistRequest = + ResourceBlacklistRequest.newInstance( + new ArrayList<>(), new ArrayList<>()); + List<ResourceRequest> reqs = + Arrays.asList( + ResourceRequest.newInstance(Priority.newInstance(1), "*", + Resources.createResource(1 * GB), 2, true, null, + ExecutionTypeRequest.newInstance( + ExecutionType.OPPORTUNISTIC, true)), + ResourceRequest.newInstance(Priority.newInstance(1), "h1", + Resources.createResource(1 * GB), 2, true, null, + ExecutionTypeRequest.newInstance( + ExecutionType.OPPORTUNISTIC, true)), + ResourceRequest.newInstance(Priority.newInstance(1), "/r1", + Resources.createResource(1 * GB), 2, true, null, + ExecutionTypeRequest.newInstance( + ExecutionType.OPPORTUNISTIC, true))); + ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance( + ApplicationId.newInstance(0L, 1), 1); + + oppCntxt.updateNodeList( + Arrays.asList( + RemoteNode.newInstance( + NodeId.newInstance("h3", 1234), "h3:1234", "/r2"), + RemoteNode.newInstance( + NodeId.newInstance("h2", 1234), "h2:1234", "/r1"), + RemoteNode.newInstance( + NodeId.newInstance("h5", 1234), "h5:1234", "/r1"), + RemoteNode.newInstance( + NodeId.newInstance("h4", 1234), "h4:1234", "/r2"))); + + List<Container> containers = allocator.allocateContainers( + blacklistRequest, reqs, appAttId, oppCntxt, 1L, "luser"); + Set<String> allocatedHosts = new HashSet<>(); + for (Container c : containers) { + allocatedHosts.add(c.getNodeHttpAddress()); + } + System.out.println(containers); + Assert.assertTrue(allocatedHosts.contains("h2:1234")); + Assert.assertTrue(allocatedHosts.contains("h5:1234")); + Assert.assertFalse(allocatedHosts.contains("h3:1234")); + Assert.assertFalse(allocatedHosts.contains("h4:1234")); + Assert.assertEquals(2, containers.size()); + } + + @Test + public void testOffSwitchAllocationWhenNoNodeOrRack() throws Exception { + ResourceBlacklistRequest blacklistRequest = + ResourceBlacklistRequest.newInstance( + new ArrayList<>(), new ArrayList<>()); + List<ResourceRequest> reqs = + Arrays.asList( + ResourceRequest.newInstance(Priority.newInstance(1), "*", + Resources.createResource(1 * GB), 2, true, null, + ExecutionTypeRequest.newInstance( + ExecutionType.OPPORTUNISTIC, true)), + ResourceRequest.newInstance(Priority.newInstance(1), "h6", + Resources.createResource(1 * GB), 2, true, null, + ExecutionTypeRequest.newInstance( + ExecutionType.OPPORTUNISTIC, true)), + ResourceRequest.newInstance(Priority.newInstance(1), "/r3", + Resources.createResource(1 * GB), 2, true, null, + ExecutionTypeRequest.newInstance( + ExecutionType.OPPORTUNISTIC, true))); + ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance( + ApplicationId.newInstance(0L, 1), 1); + + oppCntxt.updateNodeList( + Arrays.asList( + RemoteNode.newInstance( + NodeId.newInstance("h3", 1234), "h3:1234", "/r2"), + RemoteNode.newInstance( + NodeId.newInstance("h2", 1234), "h2:1234", "/r1"), + RemoteNode.newInstance( + NodeId.newInstance("h5", 1234), "h5:1234", "/r1"), + RemoteNode.newInstance( + NodeId.newInstance("h4", 1234), "h4:1234", "/r2"))); + + List<Container> containers = allocator.allocateContainers( + blacklistRequest, reqs, appAttId, oppCntxt, 1L, "luser"); + System.out.println(containers); + Assert.assertEquals(2, containers.size()); + } + + @Test + public void testLotsOfContainersRackLocalAllocationSameSchedKey() + throws Exception { + ResourceBlacklistRequest blacklistRequest = + ResourceBlacklistRequest.newInstance( + new ArrayList<>(), new ArrayList<>()); + List<ResourceRequest> reqs = + Arrays.asList( + ResourceRequest.newInstance(Priority.newInstance(1), "*", + Resources.createResource(1 * GB), 1000, true, null, + ExecutionTypeRequest.newInstance( + ExecutionType.OPPORTUNISTIC, true)), + ResourceRequest.newInstance(Priority.newInstance(1), "h1", + Resources.createResource(1 * GB), 1000, true, null, + ExecutionTypeRequest.newInstance( + ExecutionType.OPPORTUNISTIC, true)), + ResourceRequest.newInstance(Priority.newInstance(1), "/r1", + Resources.createResource(1 * GB), 1000, true, null, + ExecutionTypeRequest.newInstance( + ExecutionType.OPPORTUNISTIC, true))); + ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance( + ApplicationId.newInstance(0L, 1), 1); + + oppCntxt.updateNodeList( + Arrays.asList( + RemoteNode.newInstance( + NodeId.newInstance("h3", 1234), "h3:1234", "/r2"), + RemoteNode.newInstance( + NodeId.newInstance("h2", 1234), "h2:1234", "/r1"), + RemoteNode.newInstance( + NodeId.newInstance("h5", 1234), "h5:1234", "/r1"), + RemoteNode.newInstance( + NodeId.newInstance("h4", 1234), "h4:1234", "/r2"))); + + List<Container> containers = new ArrayList<>(); + for (int i = 0; i < 250; i++) { + containers.addAll(allocator.allocateContainers( + blacklistRequest, reqs, appAttId, oppCntxt, 1L, "luser")); + } + Assert.assertEquals(1000, containers.size()); + } + + @Test + public void testLotsOfContainersRackLocalAllocation() + throws Exception { + ResourceBlacklistRequest blacklistRequest = + ResourceBlacklistRequest.newInstance( + new ArrayList<>(), new ArrayList<>()); + List<ResourceRequest> reqs = new ArrayList<>(); + for (int i = 0; i < 100; i++) { + reqs.add(ResourceRequest.newBuilder().allocationRequestId(i + 1) + .priority(Priority.newInstance(1)) + .resourceName("*") + .capability(Resources.createResource(1 * GB)) + .relaxLocality(true) + .executionType(ExecutionType.OPPORTUNISTIC).build()); + reqs.add(ResourceRequest.newBuilder().allocationRequestId(i + 1) + .priority(Priority.newInstance(1)) + .resourceName("h1") + .capability(Resources.createResource(1 * GB)) + .relaxLocality(true) + .executionType(ExecutionType.OPPORTUNISTIC).build()); + reqs.add(ResourceRequest.newBuilder().allocationRequestId(i + 1) + .priority(Priority.newInstance(1)) + .resourceName("/r1") + .capability(Resources.createResource(1 * GB)) + .relaxLocality(true) + .executionType(ExecutionType.OPPORTUNISTIC).build()); + } + ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance( + ApplicationId.newInstance(0L, 1), 1); + + oppCntxt.updateNodeList( + Arrays.asList( + RemoteNode.newInstance( + NodeId.newInstance("h3", 1234), "h3:1234", "/r2"), + RemoteNode.newInstance( + NodeId.newInstance("h2", 1234), "h2:1234", "/r1"), + RemoteNode.newInstance( + NodeId.newInstance("h5", 1234), "h5:1234", "/r1"), + RemoteNode.newInstance( + NodeId.newInstance("h4", 1234), "h4:1234", "/r2"))); + + List<Container> containers = new ArrayList<>(); + for (int i = 0; i < 25; i++) { + containers.addAll(allocator.allocateContainers( + blacklistRequest, reqs, appAttId, oppCntxt, 1L, "luser")); + } + Assert.assertEquals(100, containers.size()); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/b733348d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java index c3ed7d5..ce425df 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java @@ -433,8 +433,12 @@ public class OpportunisticContainerAllocatorAMService private RemoteNode convertToRemoteNode(NodeId nodeId) { SchedulerNode node = ((AbstractYarnScheduler) rmContext.getScheduler()).getNode(nodeId); - return node != null ? RemoteNode.newInstance(nodeId, node.getHttpAddress()) - : null; + if (node != null) { + RemoteNode rNode = RemoteNode.newInstance(nodeId, node.getHttpAddress()); + rNode.setRackName(node.getRackName()); + return rNode; + } + return null; } private static ApplicationAttemptId getAppAttemptId() throws YarnException { http://git-wip-us.apache.org/repos/asf/hadoop/blob/b733348d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java index 9b9eb3c..1af930f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java @@ -610,6 +610,8 @@ public class TestOpportunisticContainerAllocatorAMService { .newInstance(ExecutionType.OPPORTUNISTIC, true))), null); List<Container> allocatedContainers = allocateResponse.getAllocatedContainers(); + allocatedContainers.addAll( + am1.allocate(null, null).getAllocatedContainers()); Assert.assertEquals(2, allocatedContainers.size()); Container container = allocatedContainers.get(0); // Start Container in NM --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org