Repository: hadoop
Updated Branches:
  refs/heads/trunk c92f6f360 -> 44872b76f


YARN-3463. Integrate OrderingPolicy Framework with CapacityScheduler. (Craig 
Welch via wangda)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/44872b76
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/44872b76
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/44872b76

Branch: refs/heads/trunk
Commit: 44872b76fcc0ddfbc7b0a4e54eef50fe8708e0f5
Parents: c92f6f3
Author: Wangda Tan <wan...@apache.org>
Authored: Mon Apr 20 17:12:32 2015 -0700
Committer: Wangda Tan <wan...@apache.org>
Committed: Mon Apr 20 17:12:32 2015 -0700

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |   3 +
 .../ProportionalCapacityPreemptionPolicy.java   |   5 +-
 .../scheduler/SchedulerApplicationAttempt.java  |  24 ++++-
 .../CapacitySchedulerConfiguration.java         |  31 +++++-
 .../scheduler/capacity/LeafQueue.java           |  63 +++++++++---
 .../AbstractComparatorOrderingPolicy.java       |  13 +--
 .../scheduler/policy/FifoOrderingPolicy.java    |   7 +-
 .../scheduler/policy/OrderingPolicy.java        |   4 +-
 .../webapp/CapacitySchedulerPage.java           |   1 +
 .../dao/CapacitySchedulerLeafQueueInfo.java     |   9 ++
 ...estProportionalCapacityPreemptionPolicy.java |  14 ++-
 .../capacity/TestApplicationLimits.java         |  15 +--
 .../scheduler/capacity/TestLeafQueue.java       | 102 ++++++++++++++++++-
 13 files changed, 242 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/44872b76/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index fa3061a..3f1551f 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -87,6 +87,9 @@ Release 2.8.0 - UNRELEASED
     YARN-1402. Update related Web UI and CLI with exposing client API to check
     log aggregation status. (Xuan Gong via junping_du)
 
+    YARN-3463. Integrate OrderingPolicy Framework with CapacityScheduler.
+    (Craig Welch via wangda)
+
   IMPROVEMENTS
 
     YARN-1880. Cleanup TestApplicationClientProtocolOnHA

http://git-wip-us.apache.org/repos/asf/hadoop/blob/44872b76/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
index 87a2a00..2ab4197 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
@@ -550,9 +550,8 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
 
         // lock the leafqueue while we scan applications and unreserve
         synchronized (qT.leafQueue) {
-          NavigableSet<FiCaSchedulerApp> ns = 
-              (NavigableSet<FiCaSchedulerApp>) qT.leafQueue.getApplications();
-          Iterator<FiCaSchedulerApp> desc = ns.descendingIterator();
+          Iterator<FiCaSchedulerApp> desc =   
+            qT.leafQueue.getOrderingPolicy().getPreemptionIterator();
           qT.actuallyPreempted = Resources.clone(resToObtain);
           while (desc.hasNext()) {
             FiCaSchedulerApp fc = desc.next();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/44872b76/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
index 4823390..0554c04 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
@@ -59,6 +59,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainer
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.SchedulableEntity;
+
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
@@ -72,7 +74,7 @@ import com.google.common.collect.Multiset;
  */
 @Private
 @Unstable
-public class SchedulerApplicationAttempt {
+public class SchedulerApplicationAttempt implements SchedulableEntity {
   
   private static final Log LOG = LogFactory
     .getLog(SchedulerApplicationAttempt.class);
@@ -710,4 +712,24 @@ public class SchedulerApplicationAttempt {
   public ResourceUsage getAppAttemptResourceUsage() {
     return this.attemptResourceUsage;
   }
+  
+  @Override
+  public String getId() {
+    return getApplicationId().toString();
+  }
+  
+  @Override
+  public int compareInputOrderTo(SchedulableEntity other) {
+    if (other instanceof SchedulerApplicationAttempt) {
+      return getApplicationId().compareTo(
+        ((SchedulerApplicationAttempt)other).getApplicationId());
+    }
+    return 1;//let other types go before this, if any
+  }
+  
+  @Override
+  public synchronized ResourceUsage getSchedulingResourceUsage() {
+    return attemptResourceUsage;
+  }
+  
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/44872b76/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
index 4e8d617..c9e83a1 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
@@ -48,6 +48,9 @@ import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.*;
+
+
 import com.google.common.collect.ImmutableSet;
 
 public class CapacitySchedulerConfiguration extends ReservationSchedulerConfiguration {
@@ -116,7 +119,11 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
   @Private
   public static final String MAXIMUM_ALLOCATION_VCORES =
       "maximum-allocation-vcores";
-
+  
+  public static final String ORDERING_POLICY = "ordering-policy";
+  
+  public static final String DEFAULT_ORDERING_POLICY = "fifo";
+  
   @Private
   public static final int DEFAULT_MAXIMUM_SYSTEM_APPLICATIIONS = 10000;
   
@@ -378,6 +385,28 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
         DEFAULT_USER_LIMIT);
     return userLimit;
   }
+  
+  @SuppressWarnings("unchecked")
+  public <S extends SchedulableEntity> OrderingPolicy<S> getOrderingPolicy(
+      String queue) {
+  
+    String policyType = get(getQueuePrefix(queue) + ORDERING_POLICY, 
+      DEFAULT_ORDERING_POLICY);
+    
+    OrderingPolicy<S> orderingPolicy;
+    
+    if (policyType.trim().equals("fifo")) {
+       policyType = FifoOrderingPolicy.class.getName();
+    }
+    try {
+      orderingPolicy = (OrderingPolicy<S>)
+        Class.forName(policyType).newInstance();
+    } catch (Exception e) {
+      String message = "Unable to construct ordering policy for: " + policyType + ", " + e.getMessage();
+      throw new RuntimeException(message, e);
+    }
+    return orderingPolicy;
+  }
 
   public void setUserLimit(String queue, int userLimit) {
     setInt(getQueuePrefix(queue) + USER_LIMIT, userLimit);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/44872b76/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index f860574..452d4a8 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -68,6 +68,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.*;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
@@ -93,7 +94,6 @@ public class LeafQueue extends AbstractCSQueue {
   
   private int nodeLocalityDelay;
 
-  Set<FiCaSchedulerApp> activeApplications;
   Map<ApplicationAttemptId, FiCaSchedulerApp> applicationAttemptMap = 
       new HashMap<ApplicationAttemptId, FiCaSchedulerApp>();
   
@@ -121,6 +121,9 @@ public class LeafQueue extends AbstractCSQueue {
   
   private volatile ResourceLimits currentResourceLimits = null;
   
+  private OrderingPolicy<FiCaSchedulerApp> 
+    orderingPolicy = new FifoOrderingPolicy<FiCaSchedulerApp>();
+  
   public LeafQueue(CapacitySchedulerContext cs, 
       String queueName, CSQueue parent, CSQueue old) throws IOException {
     super(cs, queueName, parent, old);
@@ -137,7 +140,6 @@ public class LeafQueue extends AbstractCSQueue {
         cs.getApplicationComparator();
     this.pendingApplications = 
         new TreeSet<FiCaSchedulerApp>(applicationComparator);
-    this.activeApplications = new TreeSet<FiCaSchedulerApp>(applicationComparator);
     
     setupQueueConfigs(cs.getClusterResource());
   }
@@ -159,6 +161,9 @@ public class LeafQueue extends AbstractCSQueue {
     setQueueResourceLimitsInfo(clusterResource);
 
     CapacitySchedulerConfiguration conf = csContext.getConfiguration();
+    
+    setOrderingPolicy(conf.<FiCaSchedulerApp>getOrderingPolicy(getQueuePath()));
+    
     userLimit = conf.getUserLimit(getQueuePath());
     userLimitFactor = conf.getUserLimitFactor(getQueuePath());
 
@@ -322,7 +327,7 @@ public class LeafQueue extends AbstractCSQueue {
   }
 
   public synchronized int getNumActiveApplications() {
-    return activeApplications.size();
+    return orderingPolicy.getNumSchedulableEntities();
   }
 
   @Private
@@ -637,7 +642,7 @@ public class LeafQueue extends AbstractCSQueue {
         }
       }
       user.activateApplication();
-      activeApplications.add(application);
+      orderingPolicy.addSchedulableEntity(application);
       queueUsage.incAMUsed(application.getAMResource());
       user.getResourceUsage().incAMUsed(application.getAMResource());
       i.remove();
@@ -686,7 +691,8 @@ public class LeafQueue extends AbstractCSQueue {
 
   public synchronized void removeApplicationAttempt(
       FiCaSchedulerApp application, User user) {
-    boolean wasActive = activeApplications.remove(application);
+    boolean wasActive =
+      orderingPolicy.removeSchedulableEntity(application);
     if (!wasActive) {
       pendingApplications.remove(application);
     } else {
@@ -727,7 +733,8 @@ public class LeafQueue extends AbstractCSQueue {
     
     if(LOG.isDebugEnabled()) {
       LOG.debug("assignContainers: node=" + node.getNodeName()
-        + " #applications=" + activeApplications.size());
+        + " #applications=" + 
+        orderingPolicy.getNumSchedulableEntities());
     }
     
     // Check for reserved resources
@@ -759,9 +766,10 @@ public class LeafQueue extends AbstractCSQueue {
       return NULL_ASSIGNMENT;
     }
     
-    // Try to assign containers to applications in order
-    for (FiCaSchedulerApp application : activeApplications) {
-      
+    for (Iterator<FiCaSchedulerApp> assignmentIterator =
+        orderingPolicy.getAssignmentIterator();
+        assignmentIterator.hasNext();) {
+      FiCaSchedulerApp application = assignmentIterator.next();
       if(LOG.isDebugEnabled()) {
         LOG.debug("pre-assignContainers for application "
         + application.getApplicationId());
@@ -1606,6 +1614,9 @@ public class LeafQueue extends AbstractCSQueue {
 
       // Inform the node
       node.allocateContainer(allocatedContainer);
+            
+      // Inform the ordering policy
+      orderingPolicy.containerAllocated(application, allocatedContainer);
 
       LOG.info("assignedContainer" +
           " application attempt=" + application.getApplicationAttemptId() +
@@ -1715,11 +1726,16 @@ public class LeafQueue extends AbstractCSQueue {
           removed =
               application.containerCompleted(rmContainer, containerStatus,
                   event, node.getPartition());
+          
           node.releaseContainer(container);
         }
 
         // Book-keeping
         if (removed) {
+          
+          // Inform the ordering policy
+          orderingPolicy.containerReleased(application, rmContainer);
+          
           releaseResource(clusterResource, application,
               container.getResource(), node.getPartition());
           LOG.info("completedContainer" +
@@ -1822,7 +1838,8 @@ public class LeafQueue extends AbstractCSQueue {
     activateApplications();
 
     // Update application properties
-    for (FiCaSchedulerApp application : activeApplications) {
+    for (FiCaSchedulerApp application :
+      orderingPolicy.getSchedulableEntities()) {
       synchronized (application) {
         computeUserLimitAndSetHeadroom(application, clusterResource,
             Resources.none(), RMNodeLabelsManager.NO_LABEL,
@@ -1916,19 +1933,19 @@ public class LeafQueue extends AbstractCSQueue {
     }
     getParent().recoverContainer(clusterResource, attempt, rmContainer);
   }
-
+  
   /**
    * Obtain (read-only) collection of active applications.
    */
-  public Set<FiCaSchedulerApp> getApplications() {
-    // need to access the list of apps from the preemption monitor
-    return activeApplications;
+  public Collection<FiCaSchedulerApp> getApplications() {
+    return orderingPolicy.getSchedulableEntities();
   }
 
   // return a single Resource capturing the overal amount of pending resources
   public synchronized Resource getTotalResourcePending() {
     Resource ret = BuilderUtils.newResource(0, 0);
-    for (FiCaSchedulerApp f : activeApplications) {
+    for (FiCaSchedulerApp f : 
+      orderingPolicy.getSchedulableEntities()) {
       Resources.addTo(ret, f.getTotalPendingRequests());
     }
     return ret;
@@ -1940,7 +1957,8 @@ public class LeafQueue extends AbstractCSQueue {
     for (FiCaSchedulerApp pendingApp : pendingApplications) {
       apps.add(pendingApp.getApplicationAttemptId());
     }
-    for (FiCaSchedulerApp app : activeApplications) {
+    for (FiCaSchedulerApp app : 
+      orderingPolicy.getSchedulableEntities()) {
       apps.add(app.getApplicationAttemptId());
     }
   }
@@ -1993,6 +2011,19 @@ public class LeafQueue extends AbstractCSQueue {
     this.maxApplications = maxApplications;
   }
   
+  public synchronized OrderingPolicy<FiCaSchedulerApp>
+      getOrderingPolicy() {
+    return orderingPolicy;
+  }
+  
+  public synchronized void setOrderingPolicy(
+      OrderingPolicy<FiCaSchedulerApp> orderingPolicy) {
+   orderingPolicy.addAllSchedulableEntities(
+     this.orderingPolicy.getSchedulableEntities()
+     );
+    this.orderingPolicy = orderingPolicy;
+  }
+  
   /*
    * Holds shared values used by all applications in
    * the queue to calculate headroom on demand

http://git-wip-us.apache.org/repos/asf/hadoop/blob/44872b76/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/AbstractComparatorOrderingPolicy.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/AbstractComparatorOrderingPolicy.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/AbstractComparatorOrderingPolicy.java
index 5b32b3d..e046fcf 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/AbstractComparatorOrderingPolicy.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/AbstractComparatorOrderingPolicy.java
@@ -68,15 +68,6 @@ public abstract class AbstractComparatorOrderingPolicy<S extends SchedulableEnti
     schedulableEntities.add(schedulableEntity);
   }
   
-  public void setComparator(Comparator<SchedulableEntity> comparator) {
-    this.comparator = comparator;
-    TreeSet<S> schedulableEntities = new TreeSet<S>(comparator);
-    if (this.schedulableEntities != null) {
-      schedulableEntities.addAll(this.schedulableEntities);
-    }
-    this.schedulableEntities = schedulableEntities;
-  }
-  
   @VisibleForTesting
   public Comparator<SchedulableEntity> getComparator() {
     return comparator; 
@@ -103,7 +94,7 @@ public abstract class AbstractComparatorOrderingPolicy<S extends SchedulableEnti
   }
   
   @Override
-  public abstract void configure(String conf);
+  public abstract void configure(Map<String, String> conf);
   
   @Override
   public abstract void containerAllocated(S schedulableEntity, 
@@ -114,6 +105,6 @@ public abstract class AbstractComparatorOrderingPolicy<S extends SchedulableEnti
     RMContainer r);
   
   @Override
-  public abstract String getStatusMessage();
+  public abstract String getInfo();
   
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/44872b76/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FifoOrderingPolicy.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FifoOrderingPolicy.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FifoOrderingPolicy.java
index 8a2f5ba..932a5f9 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FifoOrderingPolicy.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FifoOrderingPolicy.java
@@ -28,11 +28,12 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.*;
 public class FifoOrderingPolicy<S extends SchedulableEntity> extends AbstractComparatorOrderingPolicy<S> {
   
   public FifoOrderingPolicy() {
-    setComparator(new FifoComparator());
+    this.comparator = new FifoComparator();
+    this.schedulableEntities = new TreeSet<S>(comparator);
   }
   
   @Override
-  public void configure(String conf) {
+  public void configure(Map<String, String> conf) {
     
   }
   
@@ -47,7 +48,7 @@ public class FifoOrderingPolicy<S extends SchedulableEntity> extends AbstractCom
     }
   
   @Override
-  public String getStatusMessage() {
+  public String getInfo() {
     return "FifoOrderingPolicy";
   }
   

http://git-wip-us.apache.org/repos/asf/hadoop/blob/44872b76/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/OrderingPolicy.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/OrderingPolicy.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/OrderingPolicy.java
index f907cea..aebdcde 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/OrderingPolicy.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/OrderingPolicy.java
@@ -83,7 +83,7 @@ public interface OrderingPolicy<S extends SchedulableEntity> {
    * Provides configuration information for the policy from the scheduler
    * configuration
    */
-  public void configure(String conf);
+  public void configure(Map<String, String> conf);
   
   /**
    * The passed SchedulableEntity has been allocated the passed Container,
@@ -104,6 +104,6 @@ public interface OrderingPolicy<S extends SchedulableEntity> {
   /**
    * Display information regarding configuration & status
    */
-  public String getStatusMessage();
+  public String getInfo();
   
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/44872b76/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/CapacitySchedulerPage.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/CapacitySchedulerPage.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/CapacitySchedulerPage.java
index 6c34d99..2eeda66 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/CapacitySchedulerPage.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/CapacitySchedulerPage.java
@@ -94,6 +94,7 @@ class CapacitySchedulerPage extends RmView {
           _("Configured Minimum User Limit Percent:", Integer.toString(lqinfo.getUserLimit()) + "%").
           _("Configured User Limit Factor:", String.format("%.1f", lqinfo.getUserLimitFactor())).
           _("Accessible Node Labels:", StringUtils.join(",", lqinfo.getNodeLabels())).
+          _("Ordering Policy: ", lqinfo.getOrderingPolicyInfo()).
           _("Preemption:", lqinfo.getPreemptionDisabled() ? "disabled" : "enabled");
 
       html._(InfoBlock.class);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/44872b76/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/CapacitySchedulerLeafQueueInfo.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/CapacitySchedulerLeafQueueInfo.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/CapacitySchedulerLeafQueueInfo.java
index 5258b3d..dc61078 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/CapacitySchedulerLeafQueueInfo.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/CapacitySchedulerLeafQueueInfo.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.webapp.dao;
 import javax.xml.bind.annotation.XmlAccessType;
 import javax.xml.bind.annotation.XmlAccessorType;
 import javax.xml.bind.annotation.XmlRootElement;
+import javax.xml.bind.annotation.XmlTransient;
 
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
 
@@ -39,6 +40,9 @@ public class CapacitySchedulerLeafQueueInfo extends CapacitySchedulerQueueInfo {
   protected ResourceInfo usedAMResource;
   protected ResourceInfo userAMResourceLimit;
   protected boolean preemptionDisabled;
+  
+  @XmlTransient
+  protected String orderingPolicyInfo;
 
   CapacitySchedulerLeafQueueInfo() {
   };
@@ -57,6 +61,7 @@ public class CapacitySchedulerLeafQueueInfo extends CapacitySchedulerQueueInfo {
     usedAMResource = new ResourceInfo(q.getQueueResourceUsage().getAMUsed());
     userAMResourceLimit = new ResourceInfo(q.getUserAMResourceLimit());
     preemptionDisabled = q.getPreemptionDisabled();
+    orderingPolicyInfo = q.getOrderingPolicy().getInfo();
   }
 
   public int getNumActiveApplications() {
@@ -107,4 +112,8 @@ public class CapacitySchedulerLeafQueueInfo extends CapacitySchedulerQueueInfo {
   public boolean getPreemptionDisabled() {
     return preemptionDisabled;
   }
+  
+  public String getOrderingPolicyInfo() {
+    return orderingPolicyInfo;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/44872b76/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
index 8f5237e..9e8b769 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
@@ -25,6 +25,7 @@ import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.Pro
 import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.WAIT_TIME_BEFORE_KILL;
 import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType.KILL_CONTAINER;
 import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType.PREEMPT_CONTAINER;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.*;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
@@ -38,6 +39,8 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyString;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer; 
 
 import java.util.ArrayList;
 import java.util.Comparator;
@@ -46,6 +49,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Iterator;
 import java.util.Map;
 import java.util.NavigableSet;
 import java.util.Random;
@@ -1032,7 +1036,7 @@ public class TestProportionalCapacityPreemptionPolicy {
     when(lq.getTotalResourcePending()).thenReturn(
         Resource.newInstance(pending[i], 0));
     // consider moving where CapacityScheduler::comparator accessible
-    NavigableSet<FiCaSchedulerApp> qApps = new TreeSet<FiCaSchedulerApp>(
+    final NavigableSet<FiCaSchedulerApp> qApps = new TreeSet<FiCaSchedulerApp>(
       new Comparator<FiCaSchedulerApp>() {
         @Override
         public int compare(FiCaSchedulerApp a1, FiCaSchedulerApp a2) {
@@ -1056,6 +1060,14 @@ public class TestProportionalCapacityPreemptionPolicy {
               .thenReturn(appAttemptIdList);
     }
     when(lq.getApplications()).thenReturn(qApps);
+    @SuppressWarnings("unchecked")
+    OrderingPolicy<FiCaSchedulerApp> so = mock(OrderingPolicy.class);
+    when(so.getPreemptionIterator()).thenAnswer(new Answer() {
+     public Object answer(InvocationOnMock invocation) {
+         return qApps.descendingIterator();
+       }
+     });
+    when(lq.getOrderingPolicy()).thenReturn(so);
     if(setAMResourcePercent != 0.0f){
       
when(lq.getMaxAMResourcePerQueuePercent()).thenReturn(setAMResourcePercent);
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/44872b76/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
index a41fdfa..929b3e1 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
@@ -155,6 +155,7 @@ public class TestApplicationLimits {
     doReturn(applicationAttemptId). 
when(application).getApplicationAttemptId();
     doReturn(user).when(application).getUser();
     doReturn(amResource).when(application).getAMResource();
+    
when(application.compareInputOrderTo(any(FiCaSchedulerApp.class))).thenCallRealMethod();
 
     return application;
   }
   
@@ -469,7 +470,7 @@ public class TestApplicationLimits {
     assertEquals(0, queue.getNumPendingApplications());
     assertEquals(1, queue.getNumActiveApplications(user_0));
     assertEquals(0, queue.getNumPendingApplications(user_0));
-    assertTrue(queue.activeApplications.contains(app_0));
+    assertTrue(queue.getApplications().contains(app_0));
 
     // Submit second application
     FiCaSchedulerApp app_1 = getMockApplication(APPLICATION_ID++, user_0,
@@ -479,7 +480,7 @@ public class TestApplicationLimits {
     assertEquals(0, queue.getNumPendingApplications());
     assertEquals(2, queue.getNumActiveApplications(user_0));
     assertEquals(0, queue.getNumPendingApplications(user_0));
-    assertTrue(queue.activeApplications.contains(app_1));
+    assertTrue(queue.getApplications().contains(app_1));
 
     // Submit third application, should remain pending
     FiCaSchedulerApp app_2 = getMockApplication(APPLICATION_ID++, user_0,
@@ -508,7 +509,7 @@ public class TestApplicationLimits {
     assertEquals(2, queue.getNumActiveApplications(user_0));
     assertEquals(1, queue.getNumPendingApplications(user_0));
     assertFalse(queue.pendingApplications.contains(app_2));
-    assertFalse(queue.activeApplications.contains(app_2));
+    assertFalse(queue.getApplications().contains(app_2));
 
     // Finish 1st application, app_3 should become active
     queue.finishApplicationAttempt(app_0, A);
@@ -516,9 +517,9 @@ public class TestApplicationLimits {
     assertEquals(0, queue.getNumPendingApplications());
     assertEquals(2, queue.getNumActiveApplications(user_0));
     assertEquals(0, queue.getNumPendingApplications(user_0));
-    assertTrue(queue.activeApplications.contains(app_3));
+    assertTrue(queue.getApplications().contains(app_3));
     assertFalse(queue.pendingApplications.contains(app_3));
-    assertFalse(queue.activeApplications.contains(app_0));
+    assertFalse(queue.getApplications().contains(app_0));
 
     // Finish 2nd application
     queue.finishApplicationAttempt(app_1, A);
@@ -526,7 +527,7 @@ public class TestApplicationLimits {
     assertEquals(0, queue.getNumPendingApplications());
     assertEquals(1, queue.getNumActiveApplications(user_0));
     assertEquals(0, queue.getNumPendingApplications(user_0));
-    assertFalse(queue.activeApplications.contains(app_1));
+    assertFalse(queue.getApplications().contains(app_1));
 
     // Finish 4th application
     queue.finishApplicationAttempt(app_3, A);
@@ -534,7 +535,7 @@ public class TestApplicationLimits {
     assertEquals(0, queue.getNumPendingApplications());
     assertEquals(0, queue.getNumActiveApplications(user_0));
     assertEquals(0, queue.getNumPendingApplications(user_0));
-    assertFalse(queue.activeApplications.contains(app_3));
+    assertFalse(queue.getApplications().contains(app_3));
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/hadoop/blob/44872b76/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
index 0a19604..33b8f56 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
@@ -73,6 +73,9 @@ import 
org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEven
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FifoOrderingPolicy;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.SchedulableEntity;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
@@ -381,6 +384,20 @@ public class TestLeafQueue {
     d.submitApplicationAttempt(app_1, user_d); // same user
   }
 
+  @Test
+  public void testPolicyConfiguration() throws Exception {
+    // Smoke test: look up an ordering policy for a queue path on a new configuration.
+    CapacitySchedulerConfiguration testConf = 
+        new CapacitySchedulerConfiguration();
+    // Queue path under ROOT, suffixed with the current time in milliseconds.
+    String tproot = CapacitySchedulerConfiguration.ROOT + "." + 
+      "testPolicyRoot" + System.currentTimeMillis();
+    // No ordering-policy property is set for tproot in this test before the lookup.
+    OrderingPolicy<FiCaSchedulerApp> comPol =    
+      testConf.<FiCaSchedulerApp>getOrderingPolicy(tproot);
+    // NOTE(review): comPol is never asserted on, so this test only checks that
+    // the lookup completes without throwing.
+  }
 
   @Test
   public void testAppAttemptMetrics() throws Exception {
@@ -2011,7 +2028,7 @@ public class TestLeafQueue {
     e.submitApplicationAttempt(app_2, user_e);  // same user
 
     // before reinitialization
-    assertEquals(2, e.activeApplications.size());
+    assertEquals(2, e.getNumActiveApplications());
     assertEquals(1, e.pendingApplications.size());
 
     csConf.setDouble(CapacitySchedulerConfiguration
@@ -2028,7 +2045,7 @@ public class TestLeafQueue {
     root.reinitialize(newRoot, csContext.getClusterResource());
 
     // after reinitialization
-    assertEquals(3, e.activeApplications.size());
+    assertEquals(3, e.getNumActiveApplications());
     assertEquals(0, e.pendingApplications.size());
   }
   
@@ -2092,7 +2109,7 @@ public class TestLeafQueue {
     e.submitApplicationAttempt(app_2, user_e);  // same user
 
     // before updating cluster resource
-    assertEquals(2, e.activeApplications.size());
+    assertEquals(2, e.getNumActiveApplications());
     assertEquals(1, e.pendingApplications.size());
 
     Resource clusterResource = Resources.createResource(200 * 16 * GB, 100 * 
32); 
@@ -2100,7 +2117,7 @@ public class TestLeafQueue {
         new ResourceLimits(clusterResource));
 
     // after updating cluster resource
-    assertEquals(3, e.activeApplications.size());
+    assertEquals(3, e.getNumActiveApplications());
     assertEquals(0, e.pendingApplications.size());
   }
 
@@ -2451,6 +2468,83 @@ public class TestLeafQueue {
           + "forget to set off-switch request should be handled");
     }
   }
+  
+    @Test
+  public void testFifoAssignment() throws Exception {
+    // Checks that under FifoOrderingPolicy app_0 (submitted first) is assigned before app_1.
+    LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));
+    // Install a FIFO ordering policy on queue a for this test.
+    a.setOrderingPolicy(new FifoOrderingPolicy<FiCaSchedulerApp>()); 
+    // One mock node; every assignContainers call below targets it.
+    String host_0_0 = "127.0.0.1";
+    String rack_0 = "rack_0";
+    FiCaSchedulerNode node_0_0 = TestUtils.getMockNode(host_0_0, rack_0, 0, 
16*GB);
+    // Cluster resource is numNodes times (16*GB, 16); csContext reports numNodes nodes.
+    final int numNodes = 4;
+    Resource clusterResource = Resources.createResource(
+        numNodes * (16*GB), numNodes * 16);
+    when(csContext.getNumClusterNodes()).thenReturn(numNodes);
+
+    String user_0 = "user_0";
+    // Both attempts belong to user_0; app_0 is submitted to the queue before app_1.
+    final ApplicationAttemptId appAttemptId_0 = 
+        TestUtils.getMockApplicationAttemptId(0, 0); 
+    FiCaSchedulerApp app_0 = 
+        spy(new FiCaSchedulerApp(appAttemptId_0, user_0, a, 
+            mock(ActiveUsersManager.class), spyRMContext));
+    a.submitApplicationAttempt(app_0, user_0);
+    
+    final ApplicationAttemptId appAttemptId_1 = 
+        TestUtils.getMockApplicationAttemptId(1, 0); 
+    FiCaSchedulerApp app_1 = 
+        spy(new FiCaSchedulerApp(appAttemptId_1, user_0, a, 
+            mock(ActiveUsersManager.class), spyRMContext));
+    a.submitApplicationAttempt(app_1, user_0);
+    // Initial requests at the same priority: app_0 asks with 2*GB, app_1 with 1*GB.
+    Priority priority = TestUtils.createMockPriority(1);
+    List<ResourceRequest> app_0_requests_0 = new ArrayList<ResourceRequest>();
+    List<ResourceRequest> app_1_requests_0 = new ArrayList<ResourceRequest>();
+    // clear() is a no-op on first use; the same lists are refilled further down.
+    app_0_requests_0.clear();
+    app_0_requests_0.add(
+        TestUtils.createResourceRequest(ResourceRequest.ANY, 2*GB, 1, 
+            true, priority, recordFactory));
+    app_0.updateResourceRequests(app_0_requests_0);
+    
+    app_1_requests_0.clear();
+    app_1_requests_0.add(
+        TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, 
+            true, priority, recordFactory));
+    app_1.updateResourceRequests(app_1_requests_0);
+    // The first pass is expected to serve app_0, the second pass app_1.
+    a.assignContainers(clusterResource, node_0_0, new 
ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    Assert.assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
+    a.assignContainers(clusterResource, node_0_0, new 
ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    Assert.assertEquals(1*GB, app_1.getCurrentConsumption().getMemory());
+    // Second round: each app now requests with 1*GB.
+    app_0_requests_0.clear();
+    app_0_requests_0.add(
+        TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, 
+            true, priority, recordFactory));
+    app_0.updateResourceRequests(app_0_requests_0);
+    
+    app_1_requests_0.clear();
+    app_1_requests_0.add(
+        TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, 
+            true, priority, recordFactory));
+    app_1.updateResourceRequests(app_1_requests_0);
+    
+    //Even though it already has more resources, app_0 will still get 
+    //assigned first
+    a.assignContainers(clusterResource, node_0_0, new 
ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    Assert.assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
+    Assert.assertEquals(1*GB, app_1.getCurrentConsumption().getMemory());
+    
+    //and only then will app_1
+    a.assignContainers(clusterResource, node_0_0, new 
ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    Assert.assertEquals(2*GB, app_1.getCurrentConsumption().getMemory());
+
+  }
 
   @Test
   public void testConcurrentAccess() throws Exception {

Reply via email to