http://git-wip-us.apache.org/repos/asf/hadoop/blob/de3b4aac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java
new file mode 100644
index 0000000..9854a15
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import 
org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
+import 
org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class TestCapacitySchedulerAsyncScheduling {
+  private final int GB = 1024;
+
+  private YarnConfiguration conf;
+
+  RMNodeLabelsManager mgr;
+
+  @Before
+  public void setUp() throws Exception {
+    conf = new YarnConfiguration();
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+        ResourceScheduler.class);
+    conf.setBoolean(
+        CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_ENABLE, true);
+    mgr = new NullRMNodeLabelsManager();
+    mgr.init(conf);
+  }
+
+  @Test(timeout = 300000)
+  public void testSingleThreadAsyncContainerAllocation() throws Exception {
+    testAsyncContainerAllocation(1);
+  }
+
+  @Test(timeout = 300000)
+  public void testTwoThreadsAsyncContainerAllocation() throws Exception {
+    testAsyncContainerAllocation(2);
+  }
+
+  @Test(timeout = 300000)
+  public void testThreeThreadsAsyncContainerAllocation() throws Exception {
+    testAsyncContainerAllocation(3);
+  }
+
+  public void testAsyncContainerAllocation(int numThreads) throws Exception {
+    conf.setInt(
+        CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_THREAD,
+        numThreads);
+    conf.setInt(CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_PREFIX
+        + ".scheduling-interval-ms", 100);
+
+    final RMNodeLabelsManager mgr = new NullRMNodeLabelsManager();
+    mgr.init(conf);
+
+    // inject node label manager
+    MockRM rm = new MockRM(TestUtils.getConfigurationWithMultipleQueues(conf)) 
{
+      @Override
+      public RMNodeLabelsManager createNodeLabelManager() {
+        return mgr;
+      }
+    };
+
+    rm.getRMContext().setNodeLabelManager(mgr);
+    rm.start();
+
+    List<MockNM> nms = new ArrayList<>();
+    // Add 10 nodes to the cluster, in the cluster we have 200 GB resource
+    for (int i = 0; i < 10; i++) {
+      nms.add(rm.registerNode("h-" + i + ":1234", 20 * GB));
+    }
+
+    List<MockAM> ams = new ArrayList<>();
+    // Add 3 applications to the cluster, one app per queue.
+    // The i-th app (i = 1..3) asks for (20 * i) 1 GB containers, so in total
+    // 123 GB will be allocated (3 GB for the AMs + 120 GB of containers)
+    int totalAsked = 3 * GB; // 3 AMs
+
+    for (int i = 0; i < 3; i++) {
+      RMApp rmApp = rm.submitApp(1024, "app", "user", null, false,
+          Character.toString((char) (i % 34 + 97)), 1, null, null, false);
+      MockAM am = MockRM.launchAMWhenAsyncSchedulingEnabled(rmApp, rm);
+      am.registerAppAttempt();
+      ams.add(am);
+    }
+
+    for (int i = 0; i < 3; i++) {
+      ams.get(i).allocate("*", 1024, 20 * (i + 1), new ArrayList<>());
+      totalAsked += 20 * (i + 1) * GB;
+    }
+
+    // Wait for at most 15000 ms
+    int waitTime = 15000; // ms
+    while (waitTime > 0) {
+      if (rm.getResourceScheduler().getRootQueueMetrics().getAllocatedMB()
+          == totalAsked) {
+        break;
+      }
+      Thread.sleep(50);
+      waitTime -= 50;
+    }
+
+    Assert.assertEquals(
+        rm.getResourceScheduler().getRootQueueMetrics().getAllocatedMB(),
+        totalAsked);
+
+    // Wait for another 2 sec to make sure we will not allocate more than
+    // required
+    waitTime = 2000; // ms
+    while (waitTime > 0) {
+      Assert.assertEquals(
+          rm.getResourceScheduler().getRootQueueMetrics().getAllocatedMB(),
+          totalAsked);
+      waitTime -= 50;
+      Thread.sleep(50);
+    }
+
+    rm.close();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de3b4aac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
index 7f4fc2c..40e5d2a 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
@@ -54,6 +54,7 @@ import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
@@ -110,15 +111,16 @@ public class TestChildQueueOrder {
     return application;
   }
 
-  private void stubQueueAllocation(final CSQueue queue, 
-      final Resource clusterResource, final FiCaSchedulerNode node, 
+  private void stubQueueAllocation(final CSQueue queue,
+      final Resource clusterResource, final FiCaSchedulerNode node,
       final int allocation) {
-    stubQueueAllocation(queue, clusterResource, node, allocation, 
+    stubQueueAllocation(queue, clusterResource, node, allocation,
         NodeType.NODE_LOCAL);
   }
 
-  private void stubQueueAllocation(final CSQueue queue, 
-      final Resource clusterResource, final FiCaSchedulerNode node, 
+  @SuppressWarnings("unchecked")
+  private void stubQueueAllocation(final CSQueue queue,
+      final Resource clusterResource, final FiCaSchedulerNode node,
       final int allocation, final NodeType type) {
 
     // Simulate the queue allocation
@@ -145,7 +147,7 @@ public class TestChildQueueOrder {
         if (allocation > 0) {
           doReturn(new CSAssignment(Resources.none(), type)).
           when(queue)
-              .assignContainers(eq(clusterResource), eq(node),
+              .assignContainers(eq(clusterResource), any(PlacementSet.class),
                   any(ResourceLimits.class), any(SchedulingMode.class));
 
           // Mock the node's resource availability
@@ -157,7 +159,7 @@ public class TestChildQueueOrder {
         return new CSAssignment(allocatedResource, type);
       }
     }).
-    when(queue).assignContainers(eq(clusterResource), eq(node), 
+    when(queue).assignContainers(eq(clusterResource), any(PlacementSet.class),
         any(ResourceLimits.class), any(SchedulingMode.class));
     doNothing().when(node).releaseContainer(any(Container.class));
   }
@@ -214,6 +216,7 @@ public class TestChildQueueOrder {
   }
 
   @Test
+  @SuppressWarnings("unchecked")
   public void testSortedQueues() throws Exception {
     // Setup queue configs
     setupSortedQueues(csConf);
@@ -418,10 +421,10 @@ public class TestChildQueueOrder {
         clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     InOrder allocationOrder = inOrder(d,b);
     allocationOrder.verify(d).assignContainers(eq(clusterResource),
-        any(FiCaSchedulerNode.class), any(ResourceLimits.class),
+        any(PlacementSet.class), any(ResourceLimits.class),
         any(SchedulingMode.class));
     allocationOrder.verify(b).assignContainers(eq(clusterResource),
-        any(FiCaSchedulerNode.class), any(ResourceLimits.class),
+        any(PlacementSet.class), any(ResourceLimits.class),
         any(SchedulingMode.class));
     verifyQueueMetrics(a, 3*GB, clusterResource);
     verifyQueueMetrics(b, 2*GB, clusterResource);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de3b4aac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
index e2b4952..555e0fd 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
@@ -81,7 +81,7 @@ public class TestContainerAllocation {
     mgr.init(conf);
   }
 
-  @Test(timeout = 3000000)
+  @Test(timeout = 60000)
   public void testExcessReservationThanNodeManagerCapacity() throws Exception {
     @SuppressWarnings("resource")
     MockRM rm = new MockRM(conf);
@@ -598,4 +598,47 @@ public class TestContainerAllocation {
 
     rm1.close();
   }
+
+  @Test(timeout = 60000)
+  public void testAssignMultipleOffswitchContainers() throws Exception {
+    MockRM rm1 = new MockRM();
+
+    rm1.getRMContext().setNodeLabelManager(mgr);
+    rm1.start();
+    MockNM nm1 = rm1.registerNode("h1:1234", 80 * GB);
+
+    // launch an app to queue, AM container should be launched in nm1
+    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "default");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+    am1.allocate("*", 1 * GB, 5, new ArrayList<ContainerId>());
+
+    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+
+    // Do node heartbeats once
+    cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+
+    FiCaSchedulerApp schedulerApp1 =
+        cs.getApplicationAttempt(am1.getApplicationAttemptId());
+
+    // App1 will get one container allocated (plus the AM container)
+    Assert.assertEquals(2, schedulerApp1.getLiveContainers().size());
+
+    // Allow up to 3 off-switch containers to be assigned per heartbeat
+    CapacitySchedulerConfiguration newCSConf = new 
CapacitySchedulerConfiguration();
+    newCSConf.setInt(
+        CapacitySchedulerConfiguration.OFFSWITCH_PER_HEARTBEAT_LIMIT, 3);
+
+    cs.reinitialize(newCSConf, rm1.getRMContext());
+
+    // Do node heartbeats once
+    cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+
+    // App1 will get 3 new containers allocated (plus the 2 previously
+    // allocated containers)
+    Assert.assertEquals(5, schedulerApp1.getLiveContainers().size());
+
+    rm1.close();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de3b4aac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java
index 2614630..0696f57 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java
@@ -59,9 +59,12 @@ import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica
     .FiCaSchedulerNode;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SimplePlacementSet;
 import org.apache.hadoop.yarn.util.resource.Resources;
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.Ignore;
 import org.junit.Test;
 
 public class TestContainerResizing {
@@ -97,13 +100,14 @@ public class TestContainerResizing {
     }
 
     @Override
-    public synchronized void allocateContainersToNode(FiCaSchedulerNode node) {
+    public CSAssignment allocateContainersToNode(
+        PlacementSet<FiCaSchedulerNode> ps, boolean withNodeHeartbeat) {
       try {
         Thread.sleep(1000);
       } catch(InterruptedException e) {
         LOG.debug("Thread interrupted.");
       }
-      super.allocateContainersToNode(node);
+      return super.allocateContainersToNode(ps, withNodeHeartbeat);
     }
   }
 
@@ -452,7 +456,7 @@ public class TestContainerResizing {
         ContainerId.newContainerId(am1.getApplicationAttemptId(), 1);
     sentRMContainerLaunched(rm1, containerId1);
 
-    // am1 asks to change its AM container from 1GB to 3GB
+    // am1 asks to change its AM container from 1GB to 7GB
     am1.sendContainerResizingRequest(Arrays.asList(
             UpdateContainerRequest
                 .newInstance(0, containerId1,


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to