Repository: hadoop
Updated Branches:
  refs/heads/HDFS-10467 57c46a57a -> 53e8d0d03 (forced update)


http://git-wip-us.apache.org/repos/asf/hadoop/blob/b733348d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/scheduler/TestOpportunisticContainerAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/scheduler/TestOpportunisticContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/scheduler/TestOpportunisticContainerAllocator.java
new file mode 100644
index 0000000..788b0b3
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/scheduler/TestOpportunisticContainerAllocator.java
@@ -0,0 +1,599 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.scheduler;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RemoteNode;
+import org.apache.hadoop.yarn.server.api.records.MasterKey;
+import org.apache.hadoop.yarn.server.security.BaseContainerTokenSecretManager;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+public class TestOpportunisticContainerAllocator {
+
+  private static final int GB = 1024;
+  private OpportunisticContainerAllocator allocator = null;
+  private OpportunisticContainerContext oppCntxt = null;
+
+  @Before
+  public void setup() {
+    SecurityUtil.setTokenServiceUseIp(false);
+    final MasterKey mKey = new MasterKey() {
+      @Override
+      public int getKeyId() {
+        return 1;
+      }
+      @Override
+      public void setKeyId(int keyId) {}
+      @Override
+      public ByteBuffer getBytes() {
+        return ByteBuffer.allocate(8);
+      }
+      @Override
+      public void setBytes(ByteBuffer bytes) {}
+    };
+    BaseContainerTokenSecretManager secMan =
+        new BaseContainerTokenSecretManager(new Configuration()) {
+          @Override
+          public MasterKey getCurrentKey() {
+            return mKey;
+          }
+
+          @Override
+          public byte[] createPassword(ContainerTokenIdentifier identifier) {
+            return new byte[]{1, 2};
+          }
+        };
+    allocator = new OpportunisticContainerAllocator(secMan);
+    oppCntxt = new OpportunisticContainerContext();
+    oppCntxt.getAppParams().setMinResource(Resource.newInstance(1024, 1));
+    oppCntxt.getAppParams().setIncrementResource(Resource.newInstance(512, 1));
+    oppCntxt.getAppParams().setMaxResource(Resource.newInstance(1024, 10));
+  }
+
+  @Test
+  public void testSimpleAllocation() throws Exception {
+    ResourceBlacklistRequest blacklistRequest =
+        ResourceBlacklistRequest.newInstance(
+            new ArrayList<>(), new ArrayList<>());
+    List<ResourceRequest> reqs =
+        Arrays.asList(ResourceRequest.newInstance(Priority.newInstance(1),
+        "*", Resources.createResource(1 * GB), 1, true, null,
+        ExecutionTypeRequest.newInstance(
+            ExecutionType.OPPORTUNISTIC, true)));
+    ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance(
+        ApplicationId.newInstance(0L, 1), 1);
+
+    oppCntxt.updateNodeList(
+        Arrays.asList(
+            RemoteNode.newInstance(
+                NodeId.newInstance("h1", 1234), "h1:1234", "/r1")));
+    List<Container> containers = allocator.allocateContainers(
+        blacklistRequest, reqs, appAttId, oppCntxt, 1L, "luser");
+    Assert.assertEquals(1, containers.size());
+    Assert.assertEquals(0, oppCntxt.getOutstandingOpReqs().size());
+  }
+
+  @Test
+  public void testBlacklistRejection() throws Exception {
+    ResourceBlacklistRequest blacklistRequest =
+        ResourceBlacklistRequest.newInstance(
+            Arrays.asList("h1", "h2"), new ArrayList<>());
+    List<ResourceRequest> reqs =
+        Arrays.asList(ResourceRequest.newInstance(Priority.newInstance(1),
+            "*", Resources.createResource(1 * GB), 1, true, null,
+            ExecutionTypeRequest.newInstance(
+                ExecutionType.OPPORTUNISTIC, true)));
+    ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance(
+        ApplicationId.newInstance(0L, 1), 1);
+
+    oppCntxt.updateNodeList(
+        Arrays.asList(
+            RemoteNode.newInstance(
+                NodeId.newInstance("h1", 1234), "h1:1234", "/r1"),
+            RemoteNode.newInstance(
+                NodeId.newInstance("h2", 1234), "h2:1234", "/r2")));
+    List<Container> containers = allocator.allocateContainers(
+        blacklistRequest, reqs, appAttId, oppCntxt, 1L, "luser");
+    Assert.assertEquals(0, containers.size());
+    Assert.assertEquals(1, oppCntxt.getOutstandingOpReqs().size());
+  }
+
+  @Test
+  public void testRoundRobinSimpleAllocation() throws Exception {
+    ResourceBlacklistRequest blacklistRequest =
+        ResourceBlacklistRequest.newInstance(
+            new ArrayList<>(), new ArrayList<>());
+    List<ResourceRequest> reqs =
+        Arrays.asList(
+            ResourceRequest.newBuilder().allocationRequestId(1)
+                .priority(Priority.newInstance(1))
+                .resourceName(ResourceRequest.ANY)
+                .capability(Resources.createResource(1 * GB))
+                .relaxLocality(true)
+                .executionType(ExecutionType.OPPORTUNISTIC).build(),
+            ResourceRequest.newBuilder().allocationRequestId(2)
+                .priority(Priority.newInstance(1))
+                .resourceName(ResourceRequest.ANY)
+                .capability(Resources.createResource(1 * GB))
+                .relaxLocality(true)
+                .executionType(ExecutionType.OPPORTUNISTIC).build(),
+            ResourceRequest.newBuilder().allocationRequestId(3)
+                .priority(Priority.newInstance(1))
+                .resourceName(ResourceRequest.ANY)
+                .capability(Resources.createResource(1 * GB))
+                .relaxLocality(true)
+                .executionType(ExecutionType.OPPORTUNISTIC).build());
+    ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance(
+        ApplicationId.newInstance(0L, 1), 1);
+
+    oppCntxt.updateNodeList(
+        Arrays.asList(
+            RemoteNode.newInstance(
+                NodeId.newInstance("h1", 1234), "h1:1234", "/r1"),
+            RemoteNode.newInstance(
+                NodeId.newInstance("h2", 1234), "h2:1234", "/r1"),
+            RemoteNode.newInstance(
+                NodeId.newInstance("h3", 1234), "h3:1234", "/r1")));
+
+    List<Container> containers = allocator.allocateContainers(
+        blacklistRequest, reqs, appAttId, oppCntxt, 1L, "luser");
+    System.out.println(containers);
+    Set<String> allocatedHosts = new HashSet<>();
+    for (Container c : containers) {
+      allocatedHosts.add(c.getNodeHttpAddress());
+    }
+    Assert.assertTrue(allocatedHosts.contains("h1:1234"));
+    Assert.assertTrue(allocatedHosts.contains("h2:1234"));
+    Assert.assertTrue(allocatedHosts.contains("h3:1234"));
+    Assert.assertEquals(3, containers.size());
+  }
+
+  @Test
+  public void testNodeLocalAllocation() throws Exception {
+    ResourceBlacklistRequest blacklistRequest =
+        ResourceBlacklistRequest.newInstance(
+            new ArrayList<>(), new ArrayList<>());
+    List<ResourceRequest> reqs =
+        Arrays.asList(
+            ResourceRequest.newBuilder().allocationRequestId(1)
+                .priority(Priority.newInstance(1))
+                .resourceName("/r1")
+                .capability(Resources.createResource(1 * GB))
+                .relaxLocality(true)
+                .executionType(ExecutionType.OPPORTUNISTIC).build(),
+            ResourceRequest.newBuilder().allocationRequestId(1)
+                .priority(Priority.newInstance(1))
+                .resourceName("h1")
+                .capability(Resources.createResource(1 * GB))
+                .relaxLocality(true)
+                .executionType(ExecutionType.OPPORTUNISTIC).build(),
+            ResourceRequest.newBuilder().allocationRequestId(1)
+                .priority(Priority.newInstance(1))
+                .resourceName(ResourceRequest.ANY)
+                .capability(Resources.createResource(1 * GB))
+                .relaxLocality(true)
+                .executionType(ExecutionType.OPPORTUNISTIC).build(),
+            ResourceRequest.newBuilder().allocationRequestId(2)
+                .priority(Priority.newInstance(1))
+                .resourceName("/r1")
+                .capability(Resources.createResource(1 * GB))
+                .relaxLocality(true)
+                .executionType(ExecutionType.OPPORTUNISTIC).build(),
+            ResourceRequest.newBuilder().allocationRequestId(2)
+                .priority(Priority.newInstance(1))
+                .resourceName("h1")
+                .capability(Resources.createResource(1 * GB))
+                .relaxLocality(true)
+                .executionType(ExecutionType.OPPORTUNISTIC).build(),
+            ResourceRequest.newBuilder().allocationRequestId(2)
+                .priority(Priority.newInstance(1))
+                .resourceName(ResourceRequest.ANY)
+                .capability(Resources.createResource(1 * GB))
+                .relaxLocality(true)
+                .executionType(ExecutionType.OPPORTUNISTIC).build());
+    ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance(
+        ApplicationId.newInstance(0L, 1), 1);
+
+    oppCntxt.updateNodeList(
+        Arrays.asList(
+            RemoteNode.newInstance(
+                NodeId.newInstance("h1", 1234), "h1:1234", "/r1"),
+            RemoteNode.newInstance(
+                NodeId.newInstance("h2", 1234), "h2:1234", "/r1"),
+            RemoteNode.newInstance(
+                NodeId.newInstance("h3", 1234), "h3:1234", "/r1")));
+
+    List<Container> containers = allocator.allocateContainers(
+        blacklistRequest, reqs, appAttId, oppCntxt, 1L, "luser");
+    System.out.println(containers);
+    Set<String> allocatedHosts = new HashSet<>();
+    for (Container c : containers) {
+      allocatedHosts.add(c.getNodeHttpAddress());
+    }
+    Assert.assertEquals(2, containers.size());
+    Assert.assertTrue(allocatedHosts.contains("h1:1234"));
+    Assert.assertFalse(allocatedHosts.contains("h2:1234"));
+    Assert.assertFalse(allocatedHosts.contains("h3:1234"));
+  }
+
+  @Test
+  public void testNodeLocalAllocationSameSchedKey() throws Exception {
+    ResourceBlacklistRequest blacklistRequest =
+        ResourceBlacklistRequest.newInstance(
+            new ArrayList<>(), new ArrayList<>());
+    List<ResourceRequest> reqs =
+        Arrays.asList(
+            ResourceRequest.newBuilder().allocationRequestId(2)
+                .numContainers(2)
+                .priority(Priority.newInstance(1))
+                .resourceName("/r1")
+                .capability(Resources.createResource(1 * GB))
+                .relaxLocality(true)
+                .executionType(ExecutionType.OPPORTUNISTIC).build(),
+            ResourceRequest.newBuilder().allocationRequestId(2)
+                .numContainers(2)
+                .priority(Priority.newInstance(1))
+                .resourceName("h1")
+                .capability(Resources.createResource(1 * GB))
+                .relaxLocality(true)
+                .executionType(ExecutionType.OPPORTUNISTIC).build(),
+            ResourceRequest.newBuilder().allocationRequestId(2)
+                .numContainers(2)
+                .priority(Priority.newInstance(1))
+                .resourceName(ResourceRequest.ANY)
+                .capability(Resources.createResource(1 * GB))
+                .relaxLocality(true)
+                .executionType(ExecutionType.OPPORTUNISTIC).build());
+    ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance(
+        ApplicationId.newInstance(0L, 1), 1);
+
+    oppCntxt.updateNodeList(
+        Arrays.asList(
+            RemoteNode.newInstance(
+                NodeId.newInstance("h1", 1234), "h1:1234", "/r1"),
+            RemoteNode.newInstance(
+                NodeId.newInstance("h2", 1234), "h2:1234", "/r1"),
+            RemoteNode.newInstance(
+                NodeId.newInstance("h3", 1234), "h3:1234", "/r1")));
+
+    List<Container> containers = allocator.allocateContainers(
+        blacklistRequest, reqs, appAttId, oppCntxt, 1L, "luser");
+    System.out.println(containers);
+    Set<String> allocatedHosts = new HashSet<>();
+    for (Container c : containers) {
+      allocatedHosts.add(c.getNodeHttpAddress());
+    }
+    Assert.assertEquals(2, containers.size());
+    Assert.assertTrue(allocatedHosts.contains("h1:1234"));
+    Assert.assertFalse(allocatedHosts.contains("h2:1234"));
+    Assert.assertFalse(allocatedHosts.contains("h3:1234"));
+  }
+
+  @Test
+  public void testSimpleRackLocalAllocation() throws Exception {
+    ResourceBlacklistRequest blacklistRequest =
+        ResourceBlacklistRequest.newInstance(
+            new ArrayList<>(), new ArrayList<>());
+    List<ResourceRequest> reqs =
+        Arrays.asList(
+            ResourceRequest.newInstance(Priority.newInstance(1), "*",
+                Resources.createResource(1 * GB), 1, true, null,
+                ExecutionTypeRequest.newInstance(
+                    ExecutionType.OPPORTUNISTIC, true)),
+            ResourceRequest.newInstance(Priority.newInstance(1), "h1",
+                Resources.createResource(1 * GB), 1, true, null,
+                ExecutionTypeRequest.newInstance(
+                    ExecutionType.OPPORTUNISTIC, true)),
+            ResourceRequest.newInstance(Priority.newInstance(1), "/r1",
+                Resources.createResource(1 * GB), 1, true, null,
+                ExecutionTypeRequest.newInstance(
+                    ExecutionType.OPPORTUNISTIC, true)));
+    ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance(
+        ApplicationId.newInstance(0L, 1), 1);
+
+    oppCntxt.updateNodeList(
+        Arrays.asList(
+            RemoteNode.newInstance(
+                NodeId.newInstance("h3", 1234), "h3:1234", "/r2"),
+            RemoteNode.newInstance(
+                NodeId.newInstance("h2", 1234), "h2:1234", "/r1"),
+            RemoteNode.newInstance(
+                NodeId.newInstance("h4", 1234), "h4:1234", "/r2")));
+
+    List<Container> containers = allocator.allocateContainers(
+        blacklistRequest, reqs, appAttId, oppCntxt, 1L, "luser");
+    Set<String> allocatedHosts = new HashSet<>();
+    for (Container c : containers) {
+      allocatedHosts.add(c.getNodeHttpAddress());
+    }
+    Assert.assertTrue(allocatedHosts.contains("h2:1234"));
+    Assert.assertFalse(allocatedHosts.contains("h3:1234"));
+    Assert.assertFalse(allocatedHosts.contains("h4:1234"));
+    Assert.assertEquals(1, containers.size());
+  }
+
+  @Test
+  public void testRoundRobinRackLocalAllocation() throws Exception {
+    ResourceBlacklistRequest blacklistRequest =
+        ResourceBlacklistRequest.newInstance(
+            new ArrayList<>(), new ArrayList<>());
+    List<ResourceRequest> reqs =
+        Arrays.asList(
+            ResourceRequest.newBuilder().allocationRequestId(1)
+                .priority(Priority.newInstance(1))
+                .resourceName("/r1")
+                .capability(Resources.createResource(1 * GB))
+                .relaxLocality(true)
+                .executionType(ExecutionType.OPPORTUNISTIC).build(),
+            ResourceRequest.newBuilder().allocationRequestId(1)
+                .priority(Priority.newInstance(1))
+                .resourceName("h1")
+                .capability(Resources.createResource(1 * GB))
+                .relaxLocality(true)
+                .executionType(ExecutionType.OPPORTUNISTIC).build(),
+            ResourceRequest.newBuilder().allocationRequestId(1)
+                .priority(Priority.newInstance(1))
+                .resourceName(ResourceRequest.ANY)
+                .capability(Resources.createResource(1 * GB))
+                .relaxLocality(true)
+                .executionType(ExecutionType.OPPORTUNISTIC).build(),
+            ResourceRequest.newBuilder().allocationRequestId(2)
+                .priority(Priority.newInstance(1))
+                .resourceName("/r1")
+                .capability(Resources.createResource(1 * GB))
+                .relaxLocality(true)
+                .executionType(ExecutionType.OPPORTUNISTIC).build(),
+            ResourceRequest.newBuilder().allocationRequestId(2)
+                .priority(Priority.newInstance(1))
+                .resourceName("h1")
+                .capability(Resources.createResource(1 * GB))
+                .relaxLocality(true)
+                .executionType(ExecutionType.OPPORTUNISTIC).build(),
+            ResourceRequest.newBuilder().allocationRequestId(2)
+                .priority(Priority.newInstance(1))
+                .resourceName(ResourceRequest.ANY)
+                .capability(Resources.createResource(1 * GB))
+                .relaxLocality(true)
+                .executionType(ExecutionType.OPPORTUNISTIC).build());
+    ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance(
+        ApplicationId.newInstance(0L, 1), 1);
+
+    oppCntxt.updateNodeList(
+        Arrays.asList(
+            RemoteNode.newInstance(
+                NodeId.newInstance("h3", 1234), "h3:1234", "/r2"),
+            RemoteNode.newInstance(
+                NodeId.newInstance("h2", 1234), "h2:1234", "/r1"),
+            RemoteNode.newInstance(
+                NodeId.newInstance("h5", 1234), "h5:1234", "/r1"),
+            RemoteNode.newInstance(
+                NodeId.newInstance("h4", 1234), "h4:1234", "/r2")));
+
+    List<Container> containers = allocator.allocateContainers(
+        blacklistRequest, reqs, appAttId, oppCntxt, 1L, "luser");
+    Set<String> allocatedHosts = new HashSet<>();
+    for (Container c : containers) {
+      allocatedHosts.add(c.getNodeHttpAddress());
+    }
+    System.out.println(containers);
+    Assert.assertTrue(allocatedHosts.contains("h2:1234"));
+    Assert.assertTrue(allocatedHosts.contains("h5:1234"));
+    Assert.assertFalse(allocatedHosts.contains("h3:1234"));
+    Assert.assertFalse(allocatedHosts.contains("h4:1234"));
+    Assert.assertEquals(2, containers.size());
+  }
+
+  @Test
+  public void testRoundRobinRackLocalAllocationSameSchedKey() throws Exception {
+    ResourceBlacklistRequest blacklistRequest =
+        ResourceBlacklistRequest.newInstance(
+            new ArrayList<>(), new ArrayList<>());
+    List<ResourceRequest> reqs =
+        Arrays.asList(
+            ResourceRequest.newInstance(Priority.newInstance(1), "*",
+                Resources.createResource(1 * GB), 2, true, null,
+                ExecutionTypeRequest.newInstance(
+                    ExecutionType.OPPORTUNISTIC, true)),
+            ResourceRequest.newInstance(Priority.newInstance(1), "h1",
+                Resources.createResource(1 * GB), 2, true, null,
+                ExecutionTypeRequest.newInstance(
+                    ExecutionType.OPPORTUNISTIC, true)),
+            ResourceRequest.newInstance(Priority.newInstance(1), "/r1",
+                Resources.createResource(1 * GB), 2, true, null,
+                ExecutionTypeRequest.newInstance(
+                    ExecutionType.OPPORTUNISTIC, true)));
+    ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance(
+        ApplicationId.newInstance(0L, 1), 1);
+
+    oppCntxt.updateNodeList(
+        Arrays.asList(
+            RemoteNode.newInstance(
+                NodeId.newInstance("h3", 1234), "h3:1234", "/r2"),
+            RemoteNode.newInstance(
+                NodeId.newInstance("h2", 1234), "h2:1234", "/r1"),
+            RemoteNode.newInstance(
+                NodeId.newInstance("h5", 1234), "h5:1234", "/r1"),
+            RemoteNode.newInstance(
+                NodeId.newInstance("h4", 1234), "h4:1234", "/r2")));
+
+    List<Container> containers = allocator.allocateContainers(
+        blacklistRequest, reqs, appAttId, oppCntxt, 1L, "luser");
+    Set<String> allocatedHosts = new HashSet<>();
+    for (Container c : containers) {
+      allocatedHosts.add(c.getNodeHttpAddress());
+    }
+    System.out.println(containers);
+    Assert.assertTrue(allocatedHosts.contains("h2:1234"));
+    Assert.assertTrue(allocatedHosts.contains("h5:1234"));
+    Assert.assertFalse(allocatedHosts.contains("h3:1234"));
+    Assert.assertFalse(allocatedHosts.contains("h4:1234"));
+    Assert.assertEquals(2, containers.size());
+  }
+
+  @Test
+  public void testOffSwitchAllocationWhenNoNodeOrRack() throws Exception {
+    ResourceBlacklistRequest blacklistRequest =
+        ResourceBlacklistRequest.newInstance(
+            new ArrayList<>(), new ArrayList<>());
+    List<ResourceRequest> reqs =
+        Arrays.asList(
+            ResourceRequest.newInstance(Priority.newInstance(1), "*",
+                Resources.createResource(1 * GB), 2, true, null,
+                ExecutionTypeRequest.newInstance(
+                    ExecutionType.OPPORTUNISTIC, true)),
+            ResourceRequest.newInstance(Priority.newInstance(1), "h6",
+                Resources.createResource(1 * GB), 2, true, null,
+                ExecutionTypeRequest.newInstance(
+                    ExecutionType.OPPORTUNISTIC, true)),
+            ResourceRequest.newInstance(Priority.newInstance(1), "/r3",
+                Resources.createResource(1 * GB), 2, true, null,
+                ExecutionTypeRequest.newInstance(
+                    ExecutionType.OPPORTUNISTIC, true)));
+    ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance(
+        ApplicationId.newInstance(0L, 1), 1);
+
+    oppCntxt.updateNodeList(
+        Arrays.asList(
+            RemoteNode.newInstance(
+                NodeId.newInstance("h3", 1234), "h3:1234", "/r2"),
+            RemoteNode.newInstance(
+                NodeId.newInstance("h2", 1234), "h2:1234", "/r1"),
+            RemoteNode.newInstance(
+                NodeId.newInstance("h5", 1234), "h5:1234", "/r1"),
+            RemoteNode.newInstance(
+                NodeId.newInstance("h4", 1234), "h4:1234", "/r2")));
+
+    List<Container> containers = allocator.allocateContainers(
+        blacklistRequest, reqs, appAttId, oppCntxt, 1L, "luser");
+    System.out.println(containers);
+    Assert.assertEquals(2, containers.size());
+  }
+
+  @Test
+  public void testLotsOfContainersRackLocalAllocationSameSchedKey()
+      throws Exception {
+    ResourceBlacklistRequest blacklistRequest =
+        ResourceBlacklistRequest.newInstance(
+            new ArrayList<>(), new ArrayList<>());
+    List<ResourceRequest> reqs =
+        Arrays.asList(
+            ResourceRequest.newInstance(Priority.newInstance(1), "*",
+                Resources.createResource(1 * GB), 1000, true, null,
+                ExecutionTypeRequest.newInstance(
+                    ExecutionType.OPPORTUNISTIC, true)),
+            ResourceRequest.newInstance(Priority.newInstance(1), "h1",
+                Resources.createResource(1 * GB), 1000, true, null,
+                ExecutionTypeRequest.newInstance(
+                    ExecutionType.OPPORTUNISTIC, true)),
+            ResourceRequest.newInstance(Priority.newInstance(1), "/r1",
+                Resources.createResource(1 * GB), 1000, true, null,
+                ExecutionTypeRequest.newInstance(
+                    ExecutionType.OPPORTUNISTIC, true)));
+    ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance(
+        ApplicationId.newInstance(0L, 1), 1);
+
+    oppCntxt.updateNodeList(
+        Arrays.asList(
+            RemoteNode.newInstance(
+                NodeId.newInstance("h3", 1234), "h3:1234", "/r2"),
+            RemoteNode.newInstance(
+                NodeId.newInstance("h2", 1234), "h2:1234", "/r1"),
+            RemoteNode.newInstance(
+                NodeId.newInstance("h5", 1234), "h5:1234", "/r1"),
+            RemoteNode.newInstance(
+                NodeId.newInstance("h4", 1234), "h4:1234", "/r2")));
+
+    List<Container> containers = new ArrayList<>();
+    for (int i = 0; i < 250; i++) {
+      containers.addAll(allocator.allocateContainers(
+          blacklistRequest, reqs, appAttId, oppCntxt, 1L, "luser"));
+    }
+    Assert.assertEquals(1000, containers.size());
+  }
+
+  @Test
+  public void testLotsOfContainersRackLocalAllocation()
+      throws Exception {
+    ResourceBlacklistRequest blacklistRequest =
+        ResourceBlacklistRequest.newInstance(
+            new ArrayList<>(), new ArrayList<>());
+    List<ResourceRequest> reqs = new ArrayList<>();
+    for (int i = 0; i < 100; i++) {
+      reqs.add(ResourceRequest.newBuilder().allocationRequestId(i + 1)
+          .priority(Priority.newInstance(1))
+          .resourceName("*")
+          .capability(Resources.createResource(1 * GB))
+          .relaxLocality(true)
+          .executionType(ExecutionType.OPPORTUNISTIC).build());
+      reqs.add(ResourceRequest.newBuilder().allocationRequestId(i + 1)
+          .priority(Priority.newInstance(1))
+          .resourceName("h1")
+          .capability(Resources.createResource(1 * GB))
+          .relaxLocality(true)
+          .executionType(ExecutionType.OPPORTUNISTIC).build());
+      reqs.add(ResourceRequest.newBuilder().allocationRequestId(i + 1)
+          .priority(Priority.newInstance(1))
+          .resourceName("/r1")
+          .capability(Resources.createResource(1 * GB))
+          .relaxLocality(true)
+          .executionType(ExecutionType.OPPORTUNISTIC).build());
+    }
+    ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance(
+        ApplicationId.newInstance(0L, 1), 1);
+
+    oppCntxt.updateNodeList(
+        Arrays.asList(
+            RemoteNode.newInstance(
+                NodeId.newInstance("h3", 1234), "h3:1234", "/r2"),
+            RemoteNode.newInstance(
+                NodeId.newInstance("h2", 1234), "h2:1234", "/r1"),
+            RemoteNode.newInstance(
+                NodeId.newInstance("h5", 1234), "h5:1234", "/r1"),
+            RemoteNode.newInstance(
+                NodeId.newInstance("h4", 1234), "h4:1234", "/r2")));
+
+    List<Container> containers = new ArrayList<>();
+    for (int i = 0; i < 25; i++) {
+      containers.addAll(allocator.allocateContainers(
+          blacklistRequest, reqs, appAttId, oppCntxt, 1L, "luser"));
+    }
+    Assert.assertEquals(100, containers.size());
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b733348d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java
index c3ed7d5..ce425df 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java
@@ -433,8 +433,12 @@ public class OpportunisticContainerAllocatorAMService
   private RemoteNode convertToRemoteNode(NodeId nodeId) {
     SchedulerNode node =
         ((AbstractYarnScheduler) rmContext.getScheduler()).getNode(nodeId);
-    return node != null ? RemoteNode.newInstance(nodeId, node.getHttpAddress())
-        : null;
+    if (node != null) {
+      RemoteNode rNode = RemoteNode.newInstance(nodeId, node.getHttpAddress());
+      rNode.setRackName(node.getRackName());
+      return rNode;
+    }
+    return null;
   }
 
   private static ApplicationAttemptId getAppAttemptId() throws YarnException {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b733348d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java
index 9b9eb3c..1af930f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java
@@ -610,6 +610,8 @@ public class TestOpportunisticContainerAllocatorAMService {
                 .newInstance(ExecutionType.OPPORTUNISTIC, true))), null);
     List<Container> allocatedContainers =
         allocateResponse.getAllocatedContainers();
+    allocatedContainers.addAll(
+        am1.allocate(null, null).getAllocatedContainers());
     Assert.assertEquals(2, allocatedContainers.size());
     Container container = allocatedContainers.get(0);
     // Start Container in NM


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to