YARN-3873. PendingApplications in LeafQueue should also use OrderingPolicy. 
(Sunil G via wangda)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/cf9d3c92
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/cf9d3c92
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/cf9d3c92

Branch: refs/heads/HDFS-7285
Commit: cf9d3c925608e8bc650d43975382ed3014081057
Parents: 8f73bdd
Author: Wangda Tan <wan...@apache.org>
Authored: Mon Aug 10 14:54:55 2015 -0700
Committer: Wangda Tan <wan...@apache.org>
Committed: Mon Aug 10 14:54:55 2015 -0700

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |  3 +
 .../scheduler/capacity/CapacityScheduler.java   | 18 +----
 .../capacity/CapacitySchedulerContext.java      |  2 -
 .../scheduler/capacity/LeafQueue.java           | 55 ++++++++-----
 ...pacityPreemptionPolicyForNodePartitions.java | 10 ++-
 .../capacity/TestApplicationLimits.java         | 14 +---
 .../capacity/TestCapacityScheduler.java         | 83 +++++++-------------
 .../scheduler/capacity/TestChildQueueOrder.java |  2 -
 .../scheduler/capacity/TestLeafQueue.java       | 49 ++++++------
 .../scheduler/capacity/TestParentQueue.java     |  2 -
 .../scheduler/capacity/TestReservations.java    |  2 -
 11 files changed, 106 insertions(+), 134 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/cf9d3c92/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 7d34eeb..5e27a2f 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -161,6 +161,9 @@ Release 2.8.0 - UNRELEASED
 
     YARN-3948. Display Application Priority in RM Web UI.(Sunil G via 
rohithsharmaks)
 
+    YARN-3873. PendingApplications in LeafQueue should also use 
OrderingPolicy. 
+    (Sunil G via wangda)
+
   IMPROVEMENTS
 
     YARN-644. Basic null check is not performed on passed in arguments before

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cf9d3c92/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index 1d353a6..b4d0095 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -156,17 +156,6 @@ public class CapacityScheduler extends
   static final PartitionedQueueComparator partitionedQueueComparator =
       new PartitionedQueueComparator();
 
-  public static final Comparator<FiCaSchedulerApp> applicationComparator =
-    new Comparator<FiCaSchedulerApp>() {
-    @Override
-    public int compare(FiCaSchedulerApp a1, FiCaSchedulerApp a2) {
-      if (!a1.getPriority().equals(a2.getPriority())) {
-        return a1.getPriority().compareTo(a2.getPriority());
-      }
-      return a1.getApplicationId().compareTo(a2.getApplicationId());
-    }
-  };
-
   @Override
   public void setConf(Configuration conf) {
       yarnConf = conf;
@@ -275,11 +264,6 @@ public class CapacityScheduler extends
   }
 
   @Override
-  public Comparator<FiCaSchedulerApp> getApplicationComparator() {
-    return applicationComparator;
-  }
-
-  @Override
   public ResourceCalculator getResourceCalculator() {
     return calculator;
   }
@@ -1633,7 +1617,7 @@ public class CapacityScheduler extends
     if (disposableLeafQueue.getNumApplications() > 0) {
       throw new SchedulerDynamicEditException("The queue " + queueName
           + " is not empty " + disposableLeafQueue.getApplications().size()
-          + " active apps " + disposableLeafQueue.pendingApplications.size()
+          + " active apps " + 
disposableLeafQueue.getPendingApplications().size()
           + " pending apps");
     }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cf9d3c92/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java
index 707c463..2a0dd0d 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java
@@ -54,8 +54,6 @@ public interface CapacitySchedulerContext {
    */
   Configuration getConf();
 
-  Comparator<FiCaSchedulerApp> getApplicationComparator();
-
   ResourceCalculator getResourceCalculator();
 
   Comparator<CSQueue> getNonPartitionedQueueComparator();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cf9d3c92/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index 2691c33..5976f58 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -93,7 +93,7 @@ public class LeafQueue extends AbstractCSQueue {
 
   private Priority defaultAppPriorityPerQueue;
 
-  Set<FiCaSchedulerApp> pendingApplications;
+  private OrderingPolicy<FiCaSchedulerApp> pendingOrderingPolicy = null;
   
   private volatile float minimumAllocationFactor;
 
@@ -117,8 +117,7 @@ public class LeafQueue extends AbstractCSQueue {
 
   private volatile ResourceLimits cachedResourceLimitsForHeadroom = null;
 
-  private OrderingPolicy<FiCaSchedulerApp> 
-    orderingPolicy = new FifoOrderingPolicy<FiCaSchedulerApp>();
+  private OrderingPolicy<FiCaSchedulerApp> orderingPolicy = null;
 
   // record all ignore partition exclusivityRMContainer, this will be used to 
do
   // preemption, key is the partition of the RMContainer allocated on
@@ -136,11 +135,6 @@ public class LeafQueue extends AbstractCSQueue {
       LOG.debug("LeafQueue:" + " name=" + queueName
         + ", fullname=" + getQueuePath());
     }
-
-    Comparator<FiCaSchedulerApp> applicationComparator =
-        cs.getApplicationComparator();
-    this.pendingApplications = 
-        new TreeSet<FiCaSchedulerApp>(applicationComparator);
     
     setupQueueConfigs(cs.getClusterResource());
   }
@@ -164,6 +158,8 @@ public class LeafQueue extends AbstractCSQueue {
     CapacitySchedulerConfiguration conf = csContext.getConfiguration();
     
     
setOrderingPolicy(conf.<FiCaSchedulerApp>getOrderingPolicy(getQueuePath()));
+    setPendingAppsOrderingPolicy(conf
+        .<FiCaSchedulerApp> getOrderingPolicy(getQueuePath()));
     
     userLimit = conf.getUserLimit(getQueuePath());
     userLimitFactor = conf.getUserLimitFactor(getQueuePath());
@@ -328,7 +324,7 @@ public class LeafQueue extends AbstractCSQueue {
   }
 
   public synchronized int getNumPendingApplications() {
-    return pendingApplications.size();
+    return pendingOrderingPolicy.getNumSchedulableEntities();
   }
 
   public synchronized int getNumActiveApplications() {
@@ -594,8 +590,8 @@ public class LeafQueue extends AbstractCSQueue {
     Resource amLimit = getAMResourceLimit();
     Resource userAMLimit = getUserAMResourceLimit();
         
-    for (Iterator<FiCaSchedulerApp> i=pendingApplications.iterator(); 
-         i.hasNext(); ) {
+    for (Iterator<FiCaSchedulerApp> i = getPendingAppsOrderingPolicy()
+        .getAssignmentIterator(); i.hasNext();) {
       FiCaSchedulerApp application = i.next();
       ApplicationId applicationId = application.getApplicationId();
       // Check am resource limit
@@ -662,7 +658,7 @@ public class LeafQueue extends AbstractCSQueue {
       User user) {
     // Accept 
     user.submitApplication();
-    pendingApplications.add(application);
+    getPendingAppsOrderingPolicy().addSchedulableEntity(application);
     applicationAttemptMap.put(application.getApplicationAttemptId(), 
application);
 
     // Activate applications
@@ -701,7 +697,7 @@ public class LeafQueue extends AbstractCSQueue {
     boolean wasActive =
       orderingPolicy.removeSchedulableEntity(application);
     if (!wasActive) {
-      pendingApplications.remove(application);
+      pendingOrderingPolicy.removeSchedulableEntity(application);
     } else {
       queueUsage.decAMUsed(application.getAMResource());
       user.getResourceUsage().decAMUsed(application.getAMResource());
@@ -1354,7 +1350,14 @@ public class LeafQueue extends AbstractCSQueue {
     }
     getParent().recoverContainer(clusterResource, attempt, rmContainer);
   }
-  
+
+  /**
+   * Obtain (read-only) collection of pending applications.
+   */
+  public Collection<FiCaSchedulerApp> getPendingApplications() {
+    return pendingOrderingPolicy.getSchedulableEntities();
+  }
+
   /**
    * Obtain (read-only) collection of active applications.
    */
@@ -1375,7 +1378,8 @@ public class LeafQueue extends AbstractCSQueue {
   @Override
   public synchronized void collectSchedulerApplications(
       Collection<ApplicationAttemptId> apps) {
-    for (FiCaSchedulerApp pendingApp : pendingApplications) {
+    for (FiCaSchedulerApp pendingApp : pendingOrderingPolicy
+        .getSchedulableEntities()) {
       apps.add(pendingApp.getApplicationAttemptId());
     }
     for (FiCaSchedulerApp app : 
@@ -1450,9 +1454,10 @@ public class LeafQueue extends AbstractCSQueue {
   
   public synchronized void setOrderingPolicy(
       OrderingPolicy<FiCaSchedulerApp> orderingPolicy) {
-   orderingPolicy.addAllSchedulableEntities(
-     this.orderingPolicy.getSchedulableEntities()
-     );
+    if (null != this.orderingPolicy) {
+      orderingPolicy.addAllSchedulableEntities(this.orderingPolicy
+          .getSchedulableEntities());
+    }
     this.orderingPolicy = orderingPolicy;
   }
 
@@ -1461,6 +1466,20 @@ public class LeafQueue extends AbstractCSQueue {
     return defaultAppPriorityPerQueue;
   }
 
+  public synchronized OrderingPolicy<FiCaSchedulerApp>
+      getPendingAppsOrderingPolicy() {
+    return pendingOrderingPolicy;
+  }
+  public synchronized void setPendingAppsOrderingPolicy(
+      OrderingPolicy<FiCaSchedulerApp> pendingOrderingPolicy) {
+    if (null != this.pendingOrderingPolicy) {
+      pendingOrderingPolicy
+          .addAllSchedulableEntities(this.pendingOrderingPolicy
+              .getSchedulableEntities());
+    }
+    this.pendingOrderingPolicy = pendingOrderingPolicy;
+  }
+
   /*
    * Holds shared values used by all applications in
    * the queue to calculate headroom on demand

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cf9d3c92/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java
index d6f64bf..bbcb625 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java
@@ -34,6 +34,7 @@ import static org.mockito.Mockito.when;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -1127,8 +1128,13 @@ public class 
TestProportionalCapacityPreemptionPolicyForNodePartitions {
         when(parentQueue.getChildQueues()).thenReturn(children);
       } else {
         LeafQueue leafQueue = mock(LeafQueue.class);
-        final TreeSet<FiCaSchedulerApp> apps =
-            new TreeSet<>(CapacityScheduler.applicationComparator);
+        final TreeSet<FiCaSchedulerApp> apps = new TreeSet<>(
+            new Comparator<FiCaSchedulerApp>() {
+              @Override
+              public int compare(FiCaSchedulerApp a1, FiCaSchedulerApp a2) {
+                return a1.getApplicationId().compareTo(a2.getApplicationId());
+              }
+            });
         when(leafQueue.getApplications()).thenReturn(apps);
         OrderingPolicy<FiCaSchedulerApp> so = mock(OrderingPolicy.class);
         when(so.getPreemptionIterator()).thenAnswer(new Answer() {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cf9d3c92/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
index fa2a8e3..8c4ffd4 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
@@ -93,8 +93,6 @@ public class TestApplicationLimits {
         thenReturn(Resources.createResource(16*GB, 32));
     when(csContext.getClusterResource()).
         thenReturn(Resources.createResource(10 * 16 * GB, 10 * 32));
-    when(csContext.getApplicationComparator()).
-        thenReturn(CapacityScheduler.applicationComparator);
     when(csContext.getNonPartitionedQueueComparator()).
         thenReturn(CapacityScheduler.nonPartitionedQueueComparator);
     when(csContext.getResourceCalculator()).
@@ -255,8 +253,6 @@ public class TestApplicationLimits {
         thenReturn(Resources.createResource(GB, 1));
     when(csContext.getMaximumResourceCapability()).
         thenReturn(Resources.createResource(16*GB, 16));
-    when(csContext.getApplicationComparator()).
-        thenReturn(CapacityScheduler.applicationComparator);
     when(csContext.getNonPartitionedQueueComparator()).
         thenReturn(CapacityScheduler.nonPartitionedQueueComparator);
     when(csContext.getResourceCalculator()).thenReturn(resourceCalculator);
@@ -499,7 +495,7 @@ public class TestApplicationLimits {
     assertEquals(1, queue.getNumPendingApplications());
     assertEquals(2, queue.getNumActiveApplications(user_0));
     assertEquals(1, queue.getNumPendingApplications(user_0));
-    assertTrue(queue.pendingApplications.contains(app_2));
+    assertTrue(queue.getPendingApplications().contains(app_2));
 
     // Submit fourth application, should remain pending
     FiCaSchedulerApp app_3 = getMockApplication(APPLICATION_ID++, user_0,
@@ -509,7 +505,7 @@ public class TestApplicationLimits {
     assertEquals(2, queue.getNumPendingApplications());
     assertEquals(2, queue.getNumActiveApplications(user_0));
     assertEquals(2, queue.getNumPendingApplications(user_0));
-    assertTrue(queue.pendingApplications.contains(app_3));
+    assertTrue(queue.getPendingApplications().contains(app_3));
 
     // Kill 3rd pending application
     queue.finishApplicationAttempt(app_2, A);
@@ -517,7 +513,7 @@ public class TestApplicationLimits {
     assertEquals(1, queue.getNumPendingApplications());
     assertEquals(2, queue.getNumActiveApplications(user_0));
     assertEquals(1, queue.getNumPendingApplications(user_0));
-    assertFalse(queue.pendingApplications.contains(app_2));
+    assertFalse(queue.getPendingApplications().contains(app_2));
     assertFalse(queue.getApplications().contains(app_2));
 
     // Finish 1st application, app_3 should become active
@@ -527,7 +523,7 @@ public class TestApplicationLimits {
     assertEquals(2, queue.getNumActiveApplications(user_0));
     assertEquals(0, queue.getNumPendingApplications(user_0));
     assertTrue(queue.getApplications().contains(app_3));
-    assertFalse(queue.pendingApplications.contains(app_3));
+    assertFalse(queue.getPendingApplications().contains(app_3));
     assertFalse(queue.getApplications().contains(app_0));
 
     // Finish 2nd application
@@ -562,8 +558,6 @@ public class TestApplicationLimits {
         thenReturn(Resources.createResource(GB));
     when(csContext.getMaximumResourceCapability()).
         thenReturn(Resources.createResource(16*GB));
-    when(csContext.getApplicationComparator()).
-        thenReturn(CapacityScheduler.applicationComparator);
     when(csContext.getNonPartitionedQueueComparator()).
         thenReturn(CapacityScheduler.nonPartitionedQueueComparator);
     when(csContext.getResourceCalculator()).thenReturn(resourceCalculator);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cf9d3c92/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
index 6933e41..279299e 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
@@ -889,61 +889,36 @@ public class TestCapacityScheduler {
         0, alloc1Response.getAllocatedContainers().size());
     rm.stop();
   }
-
-    @Test (timeout = 5000)
-    public void testApplicationComparator()
-    {
-      CapacityScheduler cs = new CapacityScheduler();
-      Comparator<FiCaSchedulerApp> appComparator= 
cs.getApplicationComparator();
-      ApplicationId id1 = ApplicationId.newInstance(1, 1);
-      ApplicationId id2 = ApplicationId.newInstance(1, 2);
-      ApplicationId id3 = ApplicationId.newInstance(2, 1);
-      Priority priority = Priority.newInstance(0);
-      //same clusterId
-      FiCaSchedulerApp app1 = Mockito.mock(FiCaSchedulerApp.class);
-      when(app1.getApplicationId()).thenReturn(id1);
-      when(app1.getPriority()).thenReturn(priority);
-      FiCaSchedulerApp app2 = Mockito.mock(FiCaSchedulerApp.class);
-      when(app2.getApplicationId()).thenReturn(id2);
-      when(app2.getPriority()).thenReturn(priority);
-      FiCaSchedulerApp app3 = Mockito.mock(FiCaSchedulerApp.class);
-      when(app3.getApplicationId()).thenReturn(id3);
-      when(app3.getPriority()).thenReturn(priority);
-      assertTrue(appComparator.compare(app1, app2) < 0);
-      //different clusterId
-      assertTrue(appComparator.compare(app1, app3) < 0);
-      assertTrue(appComparator.compare(app2, app3) < 0);
-    }
     
-    @Test
-    public void testGetAppsInQueue() throws Exception {
-      Application application_0 = new Application("user_0", "a1", 
resourceManager);
-      application_0.submit();
-      
-      Application application_1 = new Application("user_0", "a2", 
resourceManager);
-      application_1.submit();
-      
-      Application application_2 = new Application("user_0", "b2", 
resourceManager);
-      application_2.submit();
-      
-      ResourceScheduler scheduler = resourceManager.getResourceScheduler();
-      
-      List<ApplicationAttemptId> appsInA1 = scheduler.getAppsInQueue("a1");
-      assertEquals(1, appsInA1.size());
-      
-      List<ApplicationAttemptId> appsInA = scheduler.getAppsInQueue("a");
-      assertTrue(appsInA.contains(application_0.getApplicationAttemptId()));
-      assertTrue(appsInA.contains(application_1.getApplicationAttemptId()));
-      assertEquals(2, appsInA.size());
-
-      List<ApplicationAttemptId> appsInRoot = scheduler.getAppsInQueue("root");
-      assertTrue(appsInRoot.contains(application_0.getApplicationAttemptId()));
-      assertTrue(appsInRoot.contains(application_1.getApplicationAttemptId()));
-      assertTrue(appsInRoot.contains(application_2.getApplicationAttemptId()));
-      assertEquals(3, appsInRoot.size());
-      
-      Assert.assertNull(scheduler.getAppsInQueue("nonexistentqueue"));
-    }
+  @Test
+  public void testGetAppsInQueue() throws Exception {
+    Application application_0 = new Application("user_0", "a1", 
resourceManager);
+    application_0.submit();
+
+    Application application_1 = new Application("user_0", "a2", 
resourceManager);
+    application_1.submit();
+
+    Application application_2 = new Application("user_0", "b2", 
resourceManager);
+    application_2.submit();
+
+    ResourceScheduler scheduler = resourceManager.getResourceScheduler();
+
+    List<ApplicationAttemptId> appsInA1 = scheduler.getAppsInQueue("a1");
+    assertEquals(1, appsInA1.size());
+
+    List<ApplicationAttemptId> appsInA = scheduler.getAppsInQueue("a");
+    assertTrue(appsInA.contains(application_0.getApplicationAttemptId()));
+    assertTrue(appsInA.contains(application_1.getApplicationAttemptId()));
+    assertEquals(2, appsInA.size());
+
+    List<ApplicationAttemptId> appsInRoot = scheduler.getAppsInQueue("root");
+    assertTrue(appsInRoot.contains(application_0.getApplicationAttemptId()));
+    assertTrue(appsInRoot.contains(application_1.getApplicationAttemptId()));
+    assertTrue(appsInRoot.contains(application_2.getApplicationAttemptId()));
+    assertEquals(3, appsInRoot.size());
+
+    Assert.assertNull(scheduler.getAppsInQueue("nonexistentqueue"));
+  }
 
   @Test
   public void testAddAndRemoveAppFromCapacityScheduler() throws Exception {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cf9d3c92/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
index 295a31a..9dcab2e 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
@@ -94,8 +94,6 @@ public class TestChildQueueOrder {
         Resources.createResource(16*GB, 32));
     when(csContext.getClusterResource()).
     thenReturn(Resources.createResource(100 * 16 * GB, 100 * 32));
-    when(csContext.getApplicationComparator()).
-    thenReturn(CapacityScheduler.applicationComparator);
     when(csContext.getNonPartitionedQueueComparator()).
     thenReturn(CapacityScheduler.nonPartitionedQueueComparator);
     when(csContext.getResourceCalculator()).

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cf9d3c92/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
index f419528..0efadc1 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
@@ -144,8 +144,6 @@ public class TestLeafQueue {
         thenReturn(Resources.createResource(16*GB, 32));
     when(csContext.getClusterResource()).
         thenReturn(Resources.createResource(100 * 16 * GB, 100 * 32));
-    when(csContext.getApplicationComparator()).
-    thenReturn(CapacityScheduler.applicationComparator);
     when(csContext.getNonPartitionedQueueComparator()).
         thenReturn(CapacityScheduler.nonPartitionedQueueComparator);
     when(csContext.getResourceCalculator()).
@@ -1910,7 +1908,7 @@ public class TestLeafQueue {
 
     // before reinitialization
     assertEquals(2, e.getNumActiveApplications());
-    assertEquals(1, e.pendingApplications.size());
+    assertEquals(1, e.getNumPendingApplications());
 
     csConf.setDouble(CapacitySchedulerConfiguration
         .MAXIMUM_APPLICATION_MASTERS_RESOURCE_PERCENT,
@@ -1927,7 +1925,7 @@ public class TestLeafQueue {
 
     // after reinitialization
     assertEquals(3, e.getNumActiveApplications());
-    assertEquals(0, e.pendingApplications.size());
+    assertEquals(0, e.getNumPendingApplications());
   }
   
   @Test (timeout = 30000)
@@ -1991,7 +1989,7 @@ public class TestLeafQueue {
 
     // before updating cluster resource
     assertEquals(2, e.getNumActiveApplications());
-    assertEquals(1, e.pendingApplications.size());
+    assertEquals(1, e.getNumPendingApplications());
 
     Resource clusterResource = Resources.createResource(200 * 16 * GB, 100 * 
32); 
     e.updateClusterResource(clusterResource,
@@ -1999,7 +1997,7 @@ public class TestLeafQueue {
 
     // after updating cluster resource
     assertEquals(3, e.getNumActiveApplications());
-    assertEquals(0, e.pendingApplications.size());
+    assertEquals(0, e.getNumPendingApplications());
   }
 
   public boolean hasQueueACL(List<QueueUserACLInfo> aclInfos, QueueACL acl) {
@@ -2350,8 +2348,9 @@ public class TestLeafQueue {
 
     LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));
     
-    a.setOrderingPolicy(new FifoOrderingPolicy<FiCaSchedulerApp>()); 
-    
+    a.setOrderingPolicy(new FifoOrderingPolicy<FiCaSchedulerApp>());
+    a.setPendingAppsOrderingPolicy(new FifoOrderingPolicy<FiCaSchedulerApp>());
+
     String host_0_0 = "127.0.0.1";
     String rack_0 = "rack_0";
     FiCaSchedulerNode node_0_0 = TestUtils.getMockNode(host_0_0, rack_0, 0, 
16*GB);
@@ -2367,14 +2366,14 @@ public class TestLeafQueue {
         TestUtils.getMockApplicationAttemptId(0, 0); 
     FiCaSchedulerApp app_0 = 
         spy(new FiCaSchedulerApp(appAttemptId_0, user_0, a, 
-            mock(ActiveUsersManager.class), spyRMContext));
+            mock(ActiveUsersManager.class), spyRMContext, 
Priority.newInstance(3)));
     a.submitApplicationAttempt(app_0, user_0);
     
     final ApplicationAttemptId appAttemptId_1 = 
         TestUtils.getMockApplicationAttemptId(1, 0); 
     FiCaSchedulerApp app_1 = 
         spy(new FiCaSchedulerApp(appAttemptId_1, user_0, a, 
-            mock(ActiveUsersManager.class), spyRMContext));
+            mock(ActiveUsersManager.class), spyRMContext, 
Priority.newInstance(5)));
     a.submitApplicationAttempt(app_1, user_0);
  
     Priority priority = TestUtils.createMockPriority(1);
@@ -2392,36 +2391,34 @@ public class TestLeafQueue {
         TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, 
             true, priority, recordFactory));
     app_1.updateResourceRequests(app_1_requests_0);
-    
-    a.assignContainers(clusterResource, node_0_0, new 
ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
-    Assert.assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
+
+    // app_1 will get containers as it has high priority
     a.assignContainers(clusterResource, node_0_0, new 
ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     Assert.assertEquals(1*GB, app_1.getCurrentConsumption().getMemory());
-    
+    a.assignContainers(clusterResource, node_0_0, new 
ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    Assert.assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
+
     app_0_requests_0.clear();
     app_0_requests_0.add(
         TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, 
             true, priority, recordFactory));
     app_0.updateResourceRequests(app_0_requests_0);
-    
+
     app_1_requests_0.clear();
     app_1_requests_0.add(
         TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, 
             true, priority, recordFactory));
     app_1.updateResourceRequests(app_1_requests_0);
-    
-    //Even thought it already has more resources, app_0 will still get 
-    //assigned first
-    a.assignContainers(clusterResource, node_0_0, new 
ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
-    Assert.assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
-    Assert.assertEquals(1*GB, app_1.getCurrentConsumption().getMemory());
-    
-    //and only then will app_1
+
+    //app_1 will still get assigned first as priority is more.
     a.assignContainers(clusterResource, node_0_0, new 
ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     Assert.assertEquals(2*GB, app_1.getCurrentConsumption().getMemory());
+    Assert.assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
 
-  }
-
+    //and only then will app_2
+    a.assignContainers(clusterResource, node_0_0, new 
ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    Assert.assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
+}
   @Test
   public void testConcurrentAccess() throws Exception {
     YarnConfiguration conf = new YarnConfiguration();
@@ -2500,6 +2497,7 @@ public class TestLeafQueue {
       new FairOrderingPolicy<FiCaSchedulerApp>();
 
     a.setOrderingPolicy(schedulingOrder);
+    a.setPendingAppsOrderingPolicy(new FairOrderingPolicy<FiCaSchedulerApp>());
 
     String host_0_0 = "127.0.0.1";
     String rack_0 = "rack_0";
@@ -2542,6 +2540,7 @@ public class TestLeafQueue {
             true, priority, recordFactory));
     app_1.updateResourceRequests(app_1_requests_0);
 
+    // app_0 will get containers as its submitted first.
     a.assignContainers(clusterResource, node_0_0, new 
ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     Assert.assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
     a.assignContainers(clusterResource, node_0_0, new 
ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cf9d3c92/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
index 4deaaae..ef35093 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
@@ -90,8 +90,6 @@ public class TestParentQueue {
         Resources.createResource(16*GB, 32));
     when(csContext.getClusterResource()).
         thenReturn(Resources.createResource(100 * 16 * GB, 100 * 32));
-    when(csContext.getApplicationComparator()).
-    thenReturn(CapacityScheduler.applicationComparator);
     when(csContext.getNonPartitionedQueueComparator()).
     thenReturn(CapacityScheduler.nonPartitionedQueueComparator);
     when(csContext.getResourceCalculator()).

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cf9d3c92/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
index 66ad3a8..6a0b11b 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
@@ -112,8 +112,6 @@ public class TestReservations {
         Resources.createResource(16 * GB, 12));
     when(csContext.getClusterResource()).thenReturn(
         Resources.createResource(100 * 16 * GB, 100 * 12));
-    when(csContext.getApplicationComparator()).thenReturn(
-        CapacityScheduler.applicationComparator);
     when(csContext.getNonPartitionedQueueComparator()).thenReturn(
         CapacityScheduler.nonPartitionedQueueComparator);
     when(csContext.getResourceCalculator()).thenReturn(resourceCalculator);

Reply via email to