YARN-3361. CapacityScheduler side changes to support non-exclusive node labels. 
Contributed by Wangda Tan
(cherry picked from commit 0fefda645bca935b87b6bb8ca63e6f18340d59f5)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/9ebbf1bf
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/9ebbf1bf
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/9ebbf1bf

Branch: refs/heads/branch-2
Commit: 9ebbf1bfcea9942117727c08c6905dd444c230ae
Parents: 81bbee6
Author: Jian He <jia...@apache.org>
Authored: Tue Apr 14 11:36:37 2015 -0700
Committer: Jian He <jia...@apache.org>
Committed: Tue Apr 14 11:46:35 2015 -0700

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |    3 +
 .../hadoop/yarn/server/utils/BuilderUtils.java  |    1 +
 .../rmapp/attempt/RMAppAttemptImpl.java         |   10 +-
 .../scheduler/AppSchedulingInfo.java            |   25 +-
 .../scheduler/ResourceUsage.java                |    8 +
 .../scheduler/SchedulerApplicationAttempt.java  |   44 +-
 .../scheduler/SchedulerUtils.java               |   87 +-
 .../scheduler/capacity/AbstractCSQueue.java     |  243 +++--
 .../scheduler/capacity/CSQueue.java             |    5 +-
 .../scheduler/capacity/CapacityScheduler.java   |   91 +-
 .../CapacitySchedulerConfiguration.java         |    5 +
 .../scheduler/capacity/LeafQueue.java           |  368 ++++---
 .../scheduler/capacity/ParentQueue.java         |   59 +-
 .../scheduler/capacity/SchedulingMode.java      |   44 +
 .../server/resourcemanager/Application.java     |    4 +
 .../yarn/server/resourcemanager/MockAM.java     |    8 +-
 .../yarn/server/resourcemanager/MockRM.java     |   35 +-
 .../capacity/TestApplicationLimits.java         |    8 +-
 .../scheduler/capacity/TestChildQueueOrder.java |   41 +-
 .../capacity/TestContainerAllocation.java       |  390 +------
 .../scheduler/capacity/TestLeafQueue.java       |  148 +--
 .../TestNodeLabelContainerAllocation.java       | 1027 ++++++++++++++++++
 .../scheduler/capacity/TestParentQueue.java     |  111 +-
 .../scheduler/capacity/TestReservations.java    |  101 +-
 .../scheduler/capacity/TestUtils.java           |    2 +
 25 files changed, 1914 insertions(+), 954 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/9ebbf1bf/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 478d0ae..059c5a3 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -24,6 +24,9 @@ Release 2.8.0 - UNRELEASED
     YARN-3443. Create a 'ResourceHandler' subsystem to ease addition of support 
     for new resource types on the NM. (Sidharta Seethana via junping_du)
 
+    YARN-3361. CapacityScheduler side changes to support non-exclusive node
+    labels. (Wangda Tan via jianhe)
+
   IMPROVEMENTS
 
     YARN-1880. Cleanup TestApplicationClientProtocolOnHA

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9ebbf1bf/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java
index 68d4ef9..f2146c8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java
@@ -313,6 +313,7 @@ public class BuilderUtils {
     request.setResourceName(r.getResourceName());
     request.setCapability(r.getCapability());
     request.setNumContainers(r.getNumContainers());
+    request.setNodeLabelExpression(r.getNodeLabelExpression());
     return request;
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9ebbf1bf/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
index 1be1727..1071831 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
@@ -146,7 +146,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
   private ConcurrentMap<NodeId, List<ContainerStatus>>
       finishedContainersSentToAM =
       new ConcurrentHashMap<NodeId, List<ContainerStatus>>();
-  private Container masterContainer;
+  private volatile Container masterContainer;
 
   private float progress = 0;
   private String host = "N/A";
@@ -762,13 +762,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
 
   @Override
   public Container getMasterContainer() {
-    this.readLock.lock();
-
-    try {
-      return this.masterContainer;
-    } finally {
-      this.readLock.unlock();
-    }
+    return this.masterContainer;
   }
 
   @InterfaceAudience.Private

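A side note on the getMasterContainer() hunk above: it swaps a read-write lock
for a volatile field, since a volatile reference already guarantees safe
publication of the latest write and the getter then needs no lock. A minimal
stand-alone sketch of the same pattern, with placeholder names (not from the
patch):

    class MasterContainerHolder {
      static class Container { }          // stand-in for the YARN Container type

      // volatile: readers always see the most recently written reference
      private volatile Container masterContainer;

      Container getMasterContainer() {    // lock-free volatile read
        return masterContainer;
      }

      void setMasterContainer(Container c) {
        masterContainer = c;              // volatile write publishes safely
      }
    }
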
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9ebbf1bf/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
index 5521d47..5604f0f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
@@ -73,10 +73,11 @@ public class AppSchedulingInfo {
   /* Allocated by scheduler */
   boolean pending = true; // for app metrics
   
+  private ResourceUsage appResourceUsage;
  
   public AppSchedulingInfo(ApplicationAttemptId appAttemptId,
       String user, Queue queue, ActiveUsersManager activeUsersManager,
-      long epoch) {
+      long epoch, ResourceUsage appResourceUsage) {
     this.applicationAttemptId = appAttemptId;
     this.applicationId = appAttemptId.getApplicationId();
     this.queue = queue;
@@ -84,6 +85,7 @@ public class AppSchedulingInfo {
     this.user = user;
     this.activeUsersManager = activeUsersManager;
     this.containerIdCounter = new AtomicLong(epoch << EPOCH_BIT_SHIFT);
+    this.appResourceUsage = appResourceUsage;
   }
 
   public ApplicationId getApplicationId() {
@@ -191,13 +193,19 @@ public class AppSchedulingInfo {
             lastRequestCapability);
         
         // update queue:
+        Resource increasedResource = Resources.multiply(request.getCapability(),
+            request.getNumContainers());
         queue.incPendingResource(
             request.getNodeLabelExpression(),
-            Resources.multiply(request.getCapability(),
-                request.getNumContainers()));
+            increasedResource);
+        appResourceUsage.incPending(request.getNodeLabelExpression(), increasedResource);
         if (lastRequest != null) {
+          Resource decreasedResource =
+              Resources.multiply(lastRequestCapability, lastRequestContainers);
           queue.decPendingResource(lastRequest.getNodeLabelExpression(),
-              Resources.multiply(lastRequestCapability, lastRequestContainers));
+              decreasedResource);
+          appResourceUsage.decPending(lastRequest.getNodeLabelExpression(),
+              decreasedResource);
         }
       }
     }
@@ -385,6 +393,8 @@ public class AppSchedulingInfo {
       checkForDeactivation();
     }
     
+    appResourceUsage.decPending(offSwitchRequest.getNodeLabelExpression(),
+        offSwitchRequest.getCapability());
     queue.decPendingResource(offSwitchRequest.getNodeLabelExpression(),
         offSwitchRequest.getCapability());
   }
@@ -492,9 +502,10 @@ public class AppSchedulingInfo {
   }
   
   public ResourceRequest cloneResourceRequest(ResourceRequest request) {
-    ResourceRequest newRequest = ResourceRequest.newInstance(
-        request.getPriority(), request.getResourceName(),
-        request.getCapability(), 1, request.getRelaxLocality());
+    ResourceRequest newRequest =
+        ResourceRequest.newInstance(request.getPriority(),
+            request.getResourceName(), request.getCapability(), 1,
+            request.getRelaxLocality(), request.getNodeLabelExpression());
     return newRequest;
   }
 }

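The AppSchedulingInfo hunks above keep a per-application ResourceUsage in sync
with the queue-level pending accounting: every request update adds
capability * numContainers under the request's label and retires the previous
request's contribution, at both scopes. A self-contained toy model of that
invariant (hypothetical names and numbers, not from the patch):

    import java.util.HashMap;
    import java.util.Map;

    public class PendingModel {
      static final Map<String, Long> queuePendingMb = new HashMap<>();
      static final Map<String, Long> appPendingMb = new HashMap<>();

      // A request at some priority was replaced: add the new ask, retire the old.
      static void updated(String label, long newMb, long oldMb) {
        queuePendingMb.merge(label, newMb - oldMb, Long::sum);
        appPendingMb.merge(label, newMb - oldMb, Long::sum);
      }

      public static void main(String[] args) {
        updated("x", 4 * 1024, 0);          // new ask: 4 x 1GB on partition "x"
        updated("x", 2 * 1024, 4 * 1024);   // updated ask: 2 x 1GB replaces it
        System.out.println(queuePendingMb); // {x=2048}; both scopes stay in sync
      }
    }
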
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9ebbf1bf/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceUsage.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceUsage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceUsage.java
index 36ee4da..5169b78 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceUsage.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceUsage.java
@@ -27,6 +27,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
 
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
 /**
@@ -250,6 +251,10 @@ public class ResourceUsage {
   }
 
   private Resource _get(String label, ResourceType type) {
+    if (label == null) {
+      label = RMNodeLabelsManager.NO_LABEL;
+    }
+    
     try {
       readLock.lock();
       UsageByLabel usage = usages.get(label);
@@ -263,6 +268,9 @@ public class ResourceUsage {
   }
 
   private UsageByLabel getAndAddIfMissing(String label) {
+    if (label == null) {
+      label = RMNodeLabelsManager.NO_LABEL;
+    }
     if (!usages.containsKey(label)) {
       UsageByLabel u = new UsageByLabel(label);
       usages.put(label, u);

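Both lookup paths above now fold a null label into RMNodeLabelsManager.NO_LABEL
(the empty string) before touching the usages map, so callers may pass null and
still land in the default-partition bucket. A tiny self-contained sketch of the
invariant, illustrative only:

    import java.util.HashMap;
    import java.util.Map;

    public class NoLabelNormalization {
      static final String NO_LABEL = ""; // same value as CommonNodeLabelsManager.NO_LABEL

      public static void main(String[] args) {
        Map<String, Integer> usages = new HashMap<>();
        usages.put(NO_LABEL, 42);

        String label = null;              // caller passed no partition
        if (label == null) {
          label = NO_LABEL;               // normalize before every map access
        }
        System.out.println(usages.get(label)); // 42: null and "" share a bucket
      }
    }
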
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9ebbf1bf/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
index 5e0bbc7..fccf766 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
@@ -56,6 +56,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerReservedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
 import com.google.common.base.Preconditions;
@@ -108,14 +110,24 @@ public class SchedulerApplicationAttempt {
   private Set<ContainerId> pendingRelease = null;
 
   /**
-   * Count how many times the application has been given an opportunity
-   * to schedule a task at each priority. Each time the scheduler
-   * asks the application for a task at this priority, it is incremented,
-   * and each time the application successfully schedules a task, it
+   * Count how many times the application has been given an opportunity to
+   * schedule a task at each priority. Each time the scheduler asks the
+   * application for a task at this priority, it is incremented, and each time
+   * the application successfully schedules a task (at rack or node local), it
    * is reset to 0.
    */
   Multiset<Priority> schedulingOpportunities = HashMultiset.create();
   
+  /**
+   * Count how many times the application has been given an opportunity to
+   * schedule a non-partitioned resource request at each priority. Each time the
+   * scheduler asks the application for a task at this priority, it is
+   * incremented, and it is reset to 0 each time the application successfully
+   * schedules a task at the corresponding priority.
+   */
+  Multiset<Priority> missedNonPartitionedRequestSchedulingOpportunity =
+      HashMultiset.create();
+  
   // Time of the last container scheduled at the current allowed level
   protected Map<Priority, Long> lastScheduledContainer =
       new HashMap<Priority, Long>();
@@ -132,7 +144,7 @@ public class SchedulerApplicationAttempt {
     this.rmContext = rmContext;
     this.appSchedulingInfo = 
         new AppSchedulingInfo(applicationAttemptId, user, queue,  
-            activeUsersManager, rmContext.getEpoch());
+            activeUsersManager, rmContext.getEpoch(), attemptResourceUsage);
     this.queue = queue;
     this.pendingRelease = new HashSet<ContainerId>();
     this.attemptId = applicationAttemptId;
@@ -489,6 +501,18 @@ public class SchedulerApplicationAttempt {
     return this.appSchedulingInfo.isBlacklisted(resourceName);
   }
 
+  public synchronized int addMissedNonPartitionedRequestSchedulingOpportunity(
+      Priority priority) {
+    missedNonPartitionedRequestSchedulingOpportunity.add(priority);
+    return missedNonPartitionedRequestSchedulingOpportunity.count(priority);
+  }
+
+  public synchronized void
+      resetMissedNonPartitionedRequestSchedulingOpportunity(Priority priority) {
+    missedNonPartitionedRequestSchedulingOpportunity.setCount(priority, 0);
+  }
+
+  
   public synchronized void addSchedulingOpportunity(Priority priority) {
     schedulingOpportunities.setCount(priority,
         schedulingOpportunities.count(priority) + 1);
@@ -518,6 +542,7 @@ public class SchedulerApplicationAttempt {
   public synchronized void resetSchedulingOpportunities(Priority priority) {
     resetSchedulingOpportunities(priority, System.currentTimeMillis());
   }
+
   // used for continuous scheduling
   public synchronized void resetSchedulingOpportunities(Priority priority,
       long currentTimeMs) {
@@ -669,4 +694,13 @@ public class SchedulerApplicationAttempt {
   public Set<String> getBlacklistedNodes() {
     return this.appSchedulingInfo.getBlackListCopy();
   }
+  
+  @Private
+  public boolean hasPendingResourceRequest(ResourceCalculator rc,
+      String nodePartition, Resource cluster,
+      SchedulingMode schedulingMode) {
+    return SchedulerUtils.hasPendingResourceRequest(rc,
+        this.attemptResourceUsage, nodePartition, cluster,
+        schedulingMode);
+  }
 }

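The new multiset counts, per priority, how many times a request with no label
expression has been passed over. As the LeafQueue hunk further below shows, a
non-exclusive allocation is attempted only once this count reaches the number
of cluster nodes, which steers unlabeled requests toward unlabeled nodes first.
A hedged sketch of that gate, using only the methods added above (the helper
itself is hypothetical; when the reset fires is not shown in this excerpt):

    // Returns true when a no-label request at this priority has waited long
    // enough to spill onto a non-exclusive partition.
    static boolean maySpillToNonExclusivePartition(
        SchedulerApplicationAttempt application, Priority priority,
        int numClusterNodes) {
      int missed =
          application.addMissedNonPartitionedRequestSchedulingOpportunity(priority);
      // only after the request has been passed over on roughly every node
      return missed >= numClusterNodes;
    }
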
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9ebbf1bf/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java
index 248cc08..7a1a528 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java
@@ -37,11 +37,10 @@ import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.security.AccessType;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
-import com.google.common.collect.Sets;
-
 /**
  * Utilities shared by schedulers. 
  */
@@ -235,9 +234,13 @@ public class SchedulerUtils {
     if (labelExp == null && queueInfo != null
         && ResourceRequest.ANY.equals(resReq.getResourceName())) {
       labelExp = queueInfo.getDefaultNodeLabelExpression();
-      resReq.setNodeLabelExpression(labelExp);
     }
     
+    // If labelExp is still null, set it to NO_LABEL
+    resReq
+        .setNodeLabelExpression(labelExp == null ? RMNodeLabelsManager.NO_LABEL
+            : labelExp);
+    
     // we don't allow specify label expression other than resourceName=ANY now
     if (!ResourceRequest.ANY.equals(resReq.getResourceName())
         && labelExp != null && !labelExp.trim().isEmpty()) {
@@ -273,25 +276,6 @@ public class SchedulerUtils {
     }
   }
   
-  public static boolean checkQueueAccessToNode(Set<String> queueLabels,
-      Set<String> nodeLabels) {
-    // if queue's label is *, it can access any node
-    if (queueLabels != null && queueLabels.contains(RMNodeLabelsManager.ANY)) {
-      return true;
-    }
-    // any queue can access to a node without label
-    if (nodeLabels == null || nodeLabels.isEmpty()) {
-      return true;
-    }
-    // a queue can access to a node only if it contains any label of the node
-    if (queueLabels != null
-        && Sets.intersection(queueLabels, nodeLabels).size() > 0) {
-      return true;
-    }
-    // sorry, you cannot access
-    return false;
-  }
-  
   public static void checkIfLabelInClusterNodeLabels(RMNodeLabelsManager mgr,
       Set<String> labels) throws IOException {
     if (mgr == null) {
@@ -311,26 +295,6 @@ public class SchedulerUtils {
       }
     }
   }
-  
-  public static boolean checkNodeLabelExpression(Set<String> nodeLabels,
-      String labelExpression) {
-    // empty label expression can only allocate on node with empty labels
-    if (labelExpression == null || labelExpression.trim().isEmpty()) {
-      if (!nodeLabels.isEmpty()) {
-        return false;
-      }
-    }
-
-    if (labelExpression != null) {
-      for (String str : labelExpression.split("&&")) {
-        if (!str.trim().isEmpty()
-            && (nodeLabels == null || !nodeLabels.contains(str.trim()))) {
-          return false;
-        }
-      }
-    }
-    return true;
-  }
 
   public static boolean checkQueueLabelExpression(Set<String> queueLabels,
       String labelExpression) {
@@ -360,4 +324,43 @@ public class SchedulerUtils {
     }
     return null;
   }
+  
+  public static boolean checkResourceRequestMatchingNodePartition(
+      ResourceRequest offswitchResourceRequest, String nodePartition,
+      SchedulingMode schedulingMode) {
+    // We will only look at node label = nodePartitionToLookAt, according to
+    // the schedulingMode and the partition of the node.
+    String nodePartitionToLookAt = null;
+    if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY) {
+      nodePartitionToLookAt = nodePartition;
+    } else {
+      nodePartitionToLookAt = RMNodeLabelsManager.NO_LABEL;
+    }
+    
+    String askedNodePartition = offswitchResourceRequest.getNodeLabelExpression();
+    if (null == askedNodePartition) {
+      askedNodePartition = RMNodeLabelsManager.NO_LABEL;
+    }
+    return askedNodePartition.equals(nodePartitionToLookAt);
+  }
+  
+  private static boolean hasPendingResourceRequest(ResourceCalculator rc,
+      ResourceUsage usage, String partitionToLookAt, Resource cluster) {
+    if (Resources.greaterThan(rc, cluster,
+        usage.getPending(partitionToLookAt), Resources.none())) {
+      return true;
+    }
+    return false;
+  }
+
+  @Private
+  public static boolean hasPendingResourceRequest(ResourceCalculator rc,
+      ResourceUsage usage, String nodePartition, Resource cluster,
+      SchedulingMode schedulingMode) {
+    String partitionToLookAt = nodePartition;
+    if (schedulingMode == SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY) {
+      partitionToLookAt = RMNodeLabelsManager.NO_LABEL;
+    }
+    return hasPendingResourceRequest(rc, usage, partitionToLookAt, cluster);
+  }
 }

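A worked example of checkResourceRequestMatchingNodePartition with hypothetical
values (a sketch, not from the patch): an off-switch request carrying the
default, empty label expression, evaluated against a node in partition "x".

    static void demo() {
      ResourceRequest req = ResourceRequest.newInstance(Priority.newInstance(1),
          ResourceRequest.ANY, Resource.newInstance(1024, 1), 1, true, "");

      // RESPECT_PARTITION_EXCLUSIVITY compares against the node's own
      // partition: "" vs "x" -> false, the request stays off the labeled node.
      boolean exclusive = SchedulerUtils.checkResourceRequestMatchingNodePartition(
          req, "x", SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);

      // IGNORE_PARTITION_EXCLUSIVITY compares against NO_LABEL instead:
      // "" vs "" -> true, the request may borrow the idle labeled node.
      boolean nonExclusive = SchedulerUtils.checkResourceRequestMatchingNodePartition(
          req, "x", SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY);
    }
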
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9ebbf1bf/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
index 42ea089..d95c45c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
 
 import java.io.IOException;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 
@@ -38,12 +37,12 @@ import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
 import org.apache.hadoop.yarn.security.AccessType;
 import org.apache.hadoop.yarn.security.PrivilegedEntity;
 import org.apache.hadoop.yarn.security.PrivilegedEntity.EntityType;
 import org.apache.hadoop.yarn.security.YarnAuthorizationProvider;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
@@ -56,6 +55,11 @@ import com.google.common.collect.Sets;
 public abstract class AbstractCSQueue implements CSQueue {
   private static final Log LOG = LogFactory.getLog(AbstractCSQueue.class);
   
+  static final CSAssignment NULL_ASSIGNMENT =
+      new CSAssignment(Resources.createResource(0, 0), NodeType.NODE_LOCAL);
+  
+  static final CSAssignment SKIP_ASSIGNMENT = new CSAssignment(true);
+  
   CSQueue parent;
   final String queueName;
   volatile int numContainers;
@@ -343,16 +347,8 @@ public abstract class AbstractCSQueue implements CSQueue {
   }
   
   synchronized void allocateResource(Resource clusterResource, 
-      Resource resource, Set<String> nodeLabels) {
-    
-    // Update usedResources by labels
-    if (nodeLabels == null || nodeLabels.isEmpty()) {
-      queueUsage.incUsed(resource);
-    } else {
-      for (String label : Sets.intersection(accessibleLabels, nodeLabels)) {
-        queueUsage.incUsed(label, resource);
-      }
-    }
+      Resource resource, String nodePartition) {
+    queueUsage.incUsed(nodePartition, resource);
 
     ++numContainers;
     CSQueueUtils.updateQueueStatistics(resourceCalculator, this, getParent(),
@@ -360,15 +356,8 @@ public abstract class AbstractCSQueue implements CSQueue {
   }
   
   protected synchronized void releaseResource(Resource clusterResource,
-      Resource resource, Set<String> nodeLabels) {
-    // Update usedResources by labels
-    if (null == nodeLabels || nodeLabels.isEmpty()) {
-      queueUsage.decUsed(resource);
-    } else {
-      for (String label : Sets.intersection(accessibleLabels, nodeLabels)) {
-        queueUsage.decUsed(label, resource);
-      }
-    }
+      Resource resource, String nodePartition) {
+    queueUsage.decUsed(nodePartition, resource);
 
     CSQueueUtils.updateQueueStatistics(resourceCalculator, this, getParent(),
         clusterResource, minimumAllocation);
@@ -434,103 +423,108 @@ public abstract class AbstractCSQueue implements CSQueue {
                                         parentQ.getPreemptionDisabled());
   }
   
-  private Resource getCurrentLimitResource(String nodeLabel,
-      Resource clusterResource, ResourceLimits currentResourceLimits) {
-    /*
-     * Current limit resource: For labeled resource: limit = queue-max-resource
-     * (TODO, this part need update when we support labeled-limit) For
-     * non-labeled resource: limit = min(queue-max-resource,
-     * limit-set-by-parent)
-     */
-    Resource queueMaxResource =
-        Resources.multiplyAndNormalizeDown(resourceCalculator,
-            labelManager.getResourceByLabel(nodeLabel, clusterResource),
-            queueCapacities.getAbsoluteMaximumCapacity(nodeLabel), minimumAllocation);
-    if (nodeLabel.equals(RMNodeLabelsManager.NO_LABEL)) {
-      return Resources.min(resourceCalculator, clusterResource,
-          queueMaxResource, currentResourceLimits.getLimit());
+  private Resource getCurrentLimitResource(String nodePartition,
+      Resource clusterResource, ResourceLimits currentResourceLimits,
+      SchedulingMode schedulingMode) {
+    if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY) {
+      /*
+       * Current limit resource: For labeled resource: limit = queue-max-resource
+       * (TODO, this part need update when we support labeled-limit) For
+       * non-labeled resource: limit = min(queue-max-resource,
+       * limit-set-by-parent)
+       */
+      Resource queueMaxResource =
+          Resources.multiplyAndNormalizeDown(resourceCalculator,
+              labelManager.getResourceByLabel(nodePartition, clusterResource),
+              queueCapacities.getAbsoluteMaximumCapacity(nodePartition), minimumAllocation);
+      if (nodePartition.equals(RMNodeLabelsManager.NO_LABEL)) {
+        return Resources.min(resourceCalculator, clusterResource,
+            queueMaxResource, currentResourceLimits.getLimit());
+      }
+      return queueMaxResource;  
+    } else if (schedulingMode == SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY) {
+      // When we doing non-exclusive resource allocation, maximum capacity of
+      // all queues on this label equals to total resource with the label.
+      return labelManager.getResourceByLabel(nodePartition, clusterResource);
     }
-    return queueMaxResource;
+    
+    return Resources.none();
   }
   
   synchronized boolean canAssignToThisQueue(Resource clusterResource,
-      Set<String> nodeLabels, ResourceLimits currentResourceLimits,
-      Resource nowRequired, Resource resourceCouldBeUnreserved) {
-    // Get label of this queue can access, it's (nodeLabel AND queueLabel)
-    Set<String> labelCanAccess;
-    if (null == nodeLabels || nodeLabels.isEmpty()) {
-      labelCanAccess = new HashSet<String>();
-      // Any queue can always access any node without label
-      labelCanAccess.add(RMNodeLabelsManager.NO_LABEL);
-    } else {
-      labelCanAccess = new HashSet<String>(
-          accessibleLabels.contains(CommonNodeLabelsManager.ANY) ? nodeLabels
-              : Sets.intersection(accessibleLabels, nodeLabels));
-    }
-    
-    for (String label : labelCanAccess) {
-      // New total resource = used + required
-      Resource newTotalResource =
-          Resources.add(queueUsage.getUsed(label), nowRequired);
-
-      Resource currentLimitResource =
-          getCurrentLimitResource(label, clusterResource, currentResourceLimits);
-
-      // if reservation continous looking enabled, check to see if could we
-      // potentially use this node instead of a reserved node if the application
-      // has reserved containers.
-      // TODO, now only consider reservation cases when the node has no label
-      if (this.reservationsContinueLooking
-          && label.equals(RMNodeLabelsManager.NO_LABEL)
-          && Resources.greaterThan(resourceCalculator, clusterResource,
-              resourceCouldBeUnreserved, Resources.none())) {
-        // resource-without-reserved = used - reserved
-        Resource newTotalWithoutReservedResource =
-            Resources.subtract(newTotalResource, resourceCouldBeUnreserved);
-        
-        // when total-used-without-reserved-resource < currentLimit, we still
-        // have chance to allocate on this node by unreserving some containers
-        if (Resources.lessThan(resourceCalculator, clusterResource,
-            newTotalWithoutReservedResource, currentLimitResource)) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("try to use reserved: " + getQueueName()
-                + " usedResources: " + queueUsage.getUsed()
-                + ", clusterResources: " + clusterResource
-                + ", reservedResources: " + resourceCouldBeUnreserved
-                + ", capacity-without-reserved: "
-                + newTotalWithoutReservedResource + ", maxLimitCapacity: "
-                + currentLimitResource); 
-          }
-          return true;
+      String nodePartition, ResourceLimits currentResourceLimits,
+      Resource nowRequired, Resource resourceCouldBeUnreserved,
+      SchedulingMode schedulingMode) {
+    // New total resource = used + required
+    Resource newTotalResource =
+        Resources.add(queueUsage.getUsed(nodePartition), nowRequired);
+
+    // Get current limited resource: 
+    // - When doing RESPECT_PARTITION_EXCLUSIVITY allocation, we will respect
+    // queues' max capacity.
+    // - When doing IGNORE_PARTITION_EXCLUSIVITY allocation, we will not respect
+    // the queue's max capacity; the queue's max capacity on the partition is
+    // considered to be 100%, i.e. a queue can use all the resources in the
+    // partition. 
+    // We do this because non-exclusive allocation only happens when there is
+    // idle resource on the partition; to avoid wastage, such resource will be
+    // leveraged as much as we can, and the preemption policy will reclaim it
+    // when partitioned-resource-requests come back.  
+    Resource currentLimitResource =
+        getCurrentLimitResource(nodePartition, clusterResource,
+            currentResourceLimits, schedulingMode);
+
+    // If reservation continuous-looking is enabled, check whether we could
+    // potentially use this node instead of a reserved node if the application
+    // has reserved containers.
+    // TODO, now only consider reservation cases when the node has no label
+    if (this.reservationsContinueLooking
+        && nodePartition.equals(RMNodeLabelsManager.NO_LABEL)
+        && Resources.greaterThan(resourceCalculator, clusterResource,
+            resourceCouldBeUnreserved, Resources.none())) {
+      // resource-without-reserved = used - reserved
+      Resource newTotalWithoutReservedResource =
+          Resources.subtract(newTotalResource, resourceCouldBeUnreserved);
+
+      // when total-used-without-reserved-resource < currentLimit, we still
+      // have chance to allocate on this node by unreserving some containers
+      if (Resources.lessThan(resourceCalculator, clusterResource,
+          newTotalWithoutReservedResource, currentLimitResource)) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("try to use reserved: " + getQueueName()
+              + " usedResources: " + queueUsage.getUsed()
+              + ", clusterResources: " + clusterResource
+              + ", reservedResources: " + resourceCouldBeUnreserved
+              + ", capacity-without-reserved: "
+              + newTotalWithoutReservedResource + ", maxLimitCapacity: "
+              + currentLimitResource);
         }
+        return true;
       }
-      
-      // Otherwise, if any of the label of this node beyond queue limit, we
-      // cannot allocate on this node. Consider a small epsilon here.
-      if (Resources.greaterThan(resourceCalculator, clusterResource,
-          newTotalResource, currentLimitResource)) {
-        return false;
-      }
+    }
 
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(getQueueName()
-            + "Check assign to queue, label=" + label
-            + " usedResources: " + queueUsage.getUsed(label)
-            + " clusterResources: " + clusterResource
-            + " currentUsedCapacity "
-            + Resources.divide(resourceCalculator, clusterResource,
-                queueUsage.getUsed(label),
-                labelManager.getResourceByLabel(label, clusterResource))
-            + " max-capacity: "
-            + queueCapacities.getAbsoluteMaximumCapacity(label)
-            + ")");
-      }
-      return true;
+    // Check if we exceed the computed current-resource-limit.
+    if (Resources.greaterThan(resourceCalculator, clusterResource,
+        newTotalResource, currentLimitResource)) {
+      return false;
     }
-    
-    // Actually, this will not happen, since labelCanAccess will be always
-    // non-empty
-    return false;
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(getQueueName()
+          + "Check assign to queue, nodePartition="
+          + nodePartition
+          + " usedResources: "
+          + queueUsage.getUsed(nodePartition)
+          + " clusterResources: "
+          + clusterResource
+          + " currentUsedCapacity "
+          + Resources.divide(resourceCalculator, clusterResource,
+              queueUsage.getUsed(nodePartition),
+              labelManager.getResourceByLabel(nodePartition, clusterResource))
+          + " max-capacity: "
+          + queueCapacities.getAbsoluteMaximumCapacity(nodePartition) + ")");
+    }
+    return true;
   }
   
   @Override
@@ -556,4 +550,33 @@ public abstract class AbstractCSQueue implements CSQueue {
       parent.decPendingResource(nodeLabel, resourceToDec);
     }
   }
+  
+  /**
+   * Return whether the queue has pending resource for the given nodePartition
+   * and schedulingMode. 
+   */
+  boolean hasPendingResourceRequest(String nodePartition, 
+      Resource cluster, SchedulingMode schedulingMode) {
+    return SchedulerUtils.hasPendingResourceRequest(resourceCalculator,
+        queueUsage, nodePartition, cluster, schedulingMode);
+  }
+  
+  boolean accessibleToPartition(String nodePartition) {
+    // if queue's label is *, it can access any node
+    if (accessibleLabels != null
+        && accessibleLabels.contains(RMNodeLabelsManager.ANY)) {
+      return true;
+    }
+    // any queue can access a node without a label
+    if (nodePartition == null
+        || nodePartition.equals(RMNodeLabelsManager.NO_LABEL)) {
+      return true;
+    }
+    // a queue can access a node only if its accessible labels contain the node's partition
+    if (accessibleLabels != null && accessibleLabels.contains(nodePartition)) {
+      return true;
+    }
+    // sorry, you cannot access
+    return false;
+  }
 }

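accessibleToPartition() replaces the old set-intersection test with a single
partition-name check. A stand-alone replica, illustrative only, for a queue
whose accessible-node-labels is the hypothetical set {"x"}:

    import java.util.Collections;
    import java.util.Set;

    static boolean accessible(Set<String> accessibleLabels, String partition) {
      if (accessibleLabels.contains("*")) {           // RMNodeLabelsManager.ANY
        return true;
      }
      if (partition == null || partition.isEmpty()) { // NO_LABEL: always reachable
        return true;
      }
      return accessibleLabels.contains(partition);
    }

    // accessible(Collections.singleton("x"), "")  -> true
    // accessible(Collections.singleton("x"), "x") -> true
    // accessible(Collections.singleton("x"), "y") -> false
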
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9ebbf1bf/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
index 1a9448a..b06a646 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
@@ -190,10 +190,13 @@ extends org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue {
    * @param clusterResource the resource of the cluster.
    * @param node node on which resources are available
    * @param resourceLimits how much overall resource of this queue can use. 
+   * @param schedulingMode Type of exclusivity check when assigning a container
+   * on a NodeManager, see {@link SchedulingMode}.
    * @return the assignment
    */
   public CSAssignment assignContainers(Resource clusterResource,
-      FiCaSchedulerNode node, ResourceLimits resourceLimits);
+      FiCaSchedulerNode node, ResourceLimits resourceLimits,
+      SchedulingMode schedulingMode);
   
   /**
    * A container assigned to the queue has completed.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9ebbf1bf/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index e93c529..cfeee37 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -35,6 +35,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
@@ -1114,28 +1115,30 @@ public class CapacityScheduler extends
     if (reservedContainer != null) {
       FiCaSchedulerApp reservedApplication =
           getCurrentAttemptForContainer(reservedContainer.getContainerId());
-      
+
       // Try to fulfill the reservation
-      LOG.info("Trying to fulfill reservation for application " + 
-          reservedApplication.getApplicationId() + " on node: " + 
-          node.getNodeID());
-      
-      LeafQueue queue = ((LeafQueue)reservedApplication.getQueue());
-      assignment = queue.assignContainers(
+      LOG.info("Trying to fulfill reservation for application "
+          + reservedApplication.getApplicationId() + " on node: "
+          + node.getNodeID());
+
+      LeafQueue queue = ((LeafQueue) reservedApplication.getQueue());
+      assignment =
+          queue.assignContainers(
               clusterResource,
               node,
               // TODO, now we only consider limits for parent for non-labeled
               // resources, should consider labeled resources as well.
               new ResourceLimits(labelManager.getResourceByLabel(
-                  RMNodeLabelsManager.NO_LABEL, clusterResource)));
+                  RMNodeLabelsManager.NO_LABEL, clusterResource)),
+              SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
       if (assignment.isFulfilledReservation()) {
         CSAssignment tmp =
             new CSAssignment(reservedContainer.getReservedResource(),
-              assignment.getType());
+                assignment.getType());
         Resources.addTo(assignment.getAssignmentInformation().getAllocated(),
-          reservedContainer.getReservedResource());
+            reservedContainer.getReservedResource());
         tmp.getAssignmentInformation().addAllocationDetails(
-          reservedContainer.getContainerId(), queue.getQueuePath());
+            reservedContainer.getContainerId(), queue.getQueuePath());
         tmp.getAssignmentInformation().incrAllocations();
         updateSchedulerHealth(lastNodeUpdateTime, node, tmp);
         schedulerHealth.updateSchedulerFulfilledReservationCounts(1);
@@ -1143,16 +1146,13 @@ public class CapacityScheduler extends
 
       RMContainer excessReservation = assignment.getExcessReservation();
       if (excessReservation != null) {
-      Container container = excessReservation.getContainer();
-      queue.completedContainer(
-          clusterResource, assignment.getApplication(), node, 
-          excessReservation, 
-          SchedulerUtils.createAbnormalContainerStatus(
-              container.getId(), 
-              SchedulerUtils.UNRESERVED_CONTAINER), 
-          RMContainerEventType.RELEASED, null, true);
+        Container container = excessReservation.getContainer();
+        queue.completedContainer(clusterResource, assignment.getApplication(),
+            node, excessReservation, SchedulerUtils
+                .createAbnormalContainerStatus(container.getId(),
+                    SchedulerUtils.UNRESERVED_CONTAINER),
+            RMContainerEventType.RELEASED, null, true);
       }
-
     }
 
     // Try to schedule more if there are no reservations to fulfill
@@ -1163,22 +1163,61 @@ public class CapacityScheduler extends
           LOG.debug("Trying to schedule on node: " + node.getNodeName() +
               ", available: " + node.getAvailableResource());
         }
+
         assignment = root.assignContainers(
             clusterResource,
             node,
             // TODO, now we only consider limits for parent for non-labeled
             // resources, should consider labeled resources as well.
             new ResourceLimits(labelManager.getResourceByLabel(
-                RMNodeLabelsManager.NO_LABEL, clusterResource)));
+                RMNodeLabelsManager.NO_LABEL, clusterResource)),
+            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+        if (Resources.greaterThan(calculator, clusterResource,
+            assignment.getResource(), Resources.none())) {
+          updateSchedulerHealth(lastNodeUpdateTime, node, assignment);
+          return;
+        }
+        
+        // Only do non-exclusive allocation when node has node-labels.
+        if (StringUtils.equals(node.getPartition(),
+            RMNodeLabelsManager.NO_LABEL)) {
+          return;
+        }
+        
+        // Only do non-exclusive allocation when the node-label supports that
+        try {
+          if (rmContext.getNodeLabelManager().isExclusiveNodeLabel(
+              node.getPartition())) {
+            return;
+          }
+        } catch (IOException e) {
+          LOG.warn("Exception when trying to get exclusivity of node label="
+              + node.getPartition(), e);
+          return;
+        }
+        
+        // Try to use NON_EXCLUSIVE
+        assignment = root.assignContainers(
+            clusterResource,
+            node,
+            // TODO, now we only consider limits for parent for non-labeled
+            // resources, should consider labeled resources as well.
+            new ResourceLimits(labelManager.getResourceByLabel(
+                RMNodeLabelsManager.NO_LABEL, clusterResource)),
+            SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY);
         updateSchedulerHealth(lastNodeUpdateTime, node, assignment);
+        if (Resources.greaterThan(calculator, clusterResource,
+            assignment.getResource(), Resources.none())) {
+          return;
+        }
       }
     } else {
-      LOG.info("Skipping scheduling since node " + node.getNodeID() + 
-          " is reserved by application " + 
-          node.getReservedContainer().getContainerId().getApplicationAttemptId()
-          );
+      LOG.info("Skipping scheduling since node "
+          + node.getNodeID()
+          + " is reserved by application "
+          + node.getReservedContainer().getContainerId()
+              .getApplicationAttemptId());
     }
-  
   }
 
   @Override

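Taken together, the hunk above turns node-update allocation into at most two
passes. A condensed sketch of the control flow, assuming NO_LABEL is the empty
string; the limits plumbing and the isZero helper are stand-ins, not the
patch's actual code:

    void allocateContainersToNode(FiCaSchedulerNode node) throws IOException {
      // Pass 1: normal allocation, respecting partition exclusivity.
      CSAssignment assigned = root.assignContainers(clusterResource, node,
          limits, SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
      if (!isZero(assigned.getResource())) {
        return;                              // something was placed; done
      }
      if (node.getPartition().isEmpty()) {
        return;                              // default partition: no second pass
      }
      if (rmContext.getNodeLabelManager()
          .isExclusiveNodeLabel(node.getPartition())) {
        return;                              // exclusive label: no borrowing
      }
      // Pass 2: let queues borrow the idle non-exclusive partition.
      root.assignContainers(clusterResource, node, limits,
          SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY);
    }
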
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9ebbf1bf/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
index 102e553..4e8d617 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
@@ -319,6 +319,11 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
                getMaximumApplicationMasterResourcePercent());
   }
   
+  public void setMaximumApplicationMasterResourcePerQueuePercent(String queue,
+      float percent) {
+    setFloat(getQueuePrefix(queue) + MAXIMUM_AM_RESOURCE_SUFFIX, percent);
+  }
+  
   public float getNonLabeledQueueCapacity(String queue) {
     float capacity = queue.equals("root") ? 100.0f : getFloat(
         getQueuePrefix(queue) + CAPACITY, UNDEFINED);

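The new setter is the write-side twin of the existing per-queue
AM-resource-percent getter. A usage sketch, assuming the usual
yarn.scheduler.capacity.<queue> property prefix and the
maximum-am-resource-percent suffix:

    CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
    // Allow AMs in root.a to consume up to half of the queue's resources.
    conf.setMaximumApplicationMasterResourcePerQueuePercent("root.a", 0.5f);
    // Equivalent property (under the assumed prefix/suffix):
    //   yarn.scheduler.capacity.root.a.maximum-am-resource-percent = 0.5
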
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9ebbf1bf/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index 59a016f..8a6a601 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -24,7 +24,6 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -58,6 +57,7 @@ import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
 import org.apache.hadoop.yarn.security.AccessType;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
@@ -718,39 +718,11 @@ public class LeafQueue extends AbstractCSQueue {
       ApplicationAttemptId applicationAttemptId) {
     return applicationAttemptMap.get(applicationAttemptId);
   }
-
-  private static final CSAssignment NULL_ASSIGNMENT =
-      new CSAssignment(Resources.createResource(0, 0), NodeType.NODE_LOCAL);
-  
-  private static final CSAssignment SKIP_ASSIGNMENT = new CSAssignment(true);
-  
-  private static Set<String> getRequestLabelSetByExpression(
-      String labelExpression) {
-    Set<String> labels = new HashSet<String>();
-    if (null == labelExpression) {
-      return labels;
-    }
-    for (String l : labelExpression.split("&&")) {
-      if (l.trim().isEmpty()) {
-        continue;
-      }
-      labels.add(l.trim());
-    }
-    return labels;
-  }
-  
-  private boolean checkResourceRequestMatchingNodeLabel(ResourceRequest offswitchResourceRequest,
-      FiCaSchedulerNode node) {
-    String askedNodeLabel = offswitchResourceRequest.getNodeLabelExpression();
-    if (null == askedNodeLabel) {
-      askedNodeLabel = RMNodeLabelsManager.NO_LABEL;
-    }
-    return askedNodeLabel.equals(node.getPartition());
-  }
   
   @Override
   public synchronized CSAssignment assignContainers(Resource clusterResource,
-      FiCaSchedulerNode node, ResourceLimits currentResourceLimits) {
+      FiCaSchedulerNode node, ResourceLimits currentResourceLimits,
+      SchedulingMode schedulingMode) {
     updateCurrentResourceLimits(currentResourceLimits, clusterResource);
     
     if(LOG.isDebugEnabled()) {
@@ -758,12 +730,6 @@ public class LeafQueue extends AbstractCSQueue {
         + " #applications=" + activeApplications.size());
     }
     
-    // if our queue cannot access this node, just return
-    if (!SchedulerUtils.checkQueueAccessToNode(accessibleLabels,
-        node.getLabels())) {
-      return NULL_ASSIGNMENT;
-    }
-    
     // Check for reserved resources
     RMContainer reservedContainer = node.getReservedContainer();
     if (reservedContainer != null) {
@@ -771,8 +737,26 @@ public class LeafQueue extends AbstractCSQueue {
           getApplication(reservedContainer.getApplicationAttemptId());
       synchronized (application) {
         return assignReservedContainer(application, node, reservedContainer,
-            clusterResource);
+            clusterResource, schedulingMode);
+      }
+    }
+    
+    // if our queue cannot access this node, just return
+    if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY
+        && !accessibleToPartition(node.getPartition())) {
+      return NULL_ASSIGNMENT;
+    }
+    
+    // Check if this queue needs more resource; simply skip allocation if this
+    // queue doesn't need more resources.
+    if (!hasPendingResourceRequest(node.getPartition(),
+        clusterResource, schedulingMode)) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Skip this queue=" + getQueuePath()
+            + ", because it doesn't need more resource, schedulingMode="
+            + schedulingMode.name() + " node-partition=" + 
node.getPartition());
       }
+      return NULL_ASSIGNMENT;
     }
     
     // Try to assign containers to applications in order
@@ -783,6 +767,17 @@ public class LeafQueue extends AbstractCSQueue {
         + application.getApplicationId());
         application.showRequests();
       }
+      
+      // Check if application needs more resource, skip if it doesn't need more.
+      if (!application.hasPendingResourceRequest(resourceCalculator,
+          node.getPartition(), clusterResource, schedulingMode)) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Skip app_attempt=" + application.getApplicationAttemptId()
+              + ", because it doesn't need more resource, schedulingMode="
+              + schedulingMode.name() + " node-label=" + node.getPartition());
+        }
+        continue;
+      }
 
       synchronized (application) {
         // Check if this resource is on the blacklist
@@ -806,10 +801,27 @@ public class LeafQueue extends AbstractCSQueue {
             continue;
           }
           
+          // AM container allocation doesn't support non-exclusive allocation,
+          // to avoid the pain of preempting an AM container
+          if (schedulingMode == SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY) {
+            RMAppAttempt rmAppAttempt =
+                csContext.getRMContext().getRMApps()
+                    .get(application.getApplicationId()).getCurrentAppAttempt();
+            if (null == rmAppAttempt.getMasterContainer()) {
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("Skip allocating AM container to app_attempt="
+                    + application.getApplicationAttemptId()
+                    + ", don't allow to allocate AM container in non-exclusive 
mode");
+              }
+              break;
+            }
+          }
+          
           // Is the node-label-expression of this offswitch resource request
           // matches the node's label?
           // If not match, jump to next priority.
-          if (!checkResourceRequestMatchingNodeLabel(anyRequest, node)) {
+          if (!SchedulerUtils.checkResourceRequestMatchingNodePartition(
+              anyRequest, node.getPartition(), schedulingMode)) {
             continue;
           }
           
@@ -822,10 +834,6 @@ public class LeafQueue extends AbstractCSQueue {
             }
           }
           
-          Set<String> requestedNodeLabels =
-              getRequestLabelSetByExpression(anyRequest
-                  .getNodeLabelExpression());
-
           // Compute user-limit & set headroom
           // Note: We compute both user-limit & headroom with the highest 
           //       priority request as the target. 
@@ -833,27 +841,61 @@ public class LeafQueue extends AbstractCSQueue {
           //       before all higher priority ones are serviced.
           Resource userLimit = 
               computeUserLimitAndSetHeadroom(application, clusterResource, 
-                  required, requestedNodeLabels);          
+                  required, node.getPartition(), schedulingMode);          
           
           // Check queue max-capacity limit
-          if (!super.canAssignToThisQueue(clusterResource, node.getLabels(),
-              this.currentResourceLimits, required, application.getCurrentReservation())) {
+          if (!super.canAssignToThisQueue(clusterResource, node.getPartition(),
+              this.currentResourceLimits, required,
+              application.getCurrentReservation(), schedulingMode)) {
             return NULL_ASSIGNMENT;
           }
 
           // Check user limit
           if (!canAssignToUser(clusterResource, application.getUser(), userLimit,
-              application, true, requestedNodeLabels)) {
+              application, true, node.getPartition())) {
             break;
           }
 
           // Inform the application it is about to get a scheduling opportunity
           application.addSchedulingOpportunity(priority);
           
+          // Increase missed-non-partitioned-resource-request-opportunity.
+          // This is to make sure non-partitioned resource requests are
+          // preferentially allocated to non-partitioned nodes
+          int missedNonPartitionedRequestSchedulingOpportunity = 0;
+          if (anyRequest.getNodeLabelExpression().equals(
+              RMNodeLabelsManager.NO_LABEL)) {
+            missedNonPartitionedRequestSchedulingOpportunity =
+                application
+                    .addMissedNonPartitionedRequestSchedulingOpportunity(priority);
+          }
+          
+          if (schedulingMode == SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY) {
+            // Before doing allocation, we need to check the scheduling
+            // opportunity, to make sure non-partitioned resource requests are
+            // scheduled to the non-partitioned partition first.
+            if (missedNonPartitionedRequestSchedulingOpportunity < scheduler
+                .getNumClusterNodes()) {
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("Skip app_attempt="
+                    + application.getApplicationAttemptId()
+                    + " priority="
+                    + priority
+                    + " because missed-non-partitioned-resource-request"
+                    + " opportunity under required:"
+                    + " Now=" + missedNonPartitionedRequestSchedulingOpportunity
+                    + " required="
+                    + scheduler.getNumClusterNodes());
+              }
+
+              break;
+            }
+          }
+          
           // Try to schedule
           CSAssignment assignment =  
             assignContainersOnNode(clusterResource, node, application, priority,
-                null);
+                null, schedulingMode);
 
           // Did the application skip this node?
           if (assignment.getSkipped()) {
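
The missed-non-partitioned-request counter above implements a delay, much like the locality delay: an unlabeled request may borrow a labeled node only after roughly one full scheduling pass over the cluster has failed to place it on an unlabeled one. A simplified standalone sketch (per-(app, priority) bookkeeping is assumed; names are illustrative):

// Sketch of the delay gate for non-partitioned requests on labeled nodes.
final class NonPartitionedDelaySketch {
  private int missedOpportunities = 0;

  /** Invoked when an unlabeled request is offered a labeled node. */
  boolean mayBorrowLabeledNode(int numClusterNodes) {
    missedOpportunities++;
    // Allow the borrow only after ~one pass over the cluster has missed.
    return missedOpportunities >= numClusterNodes;
  }

  /** Invoked after any successful allocation for this (app, priority). */
  void resetOnAllocation() {
    missedOpportunities = 0;
  }
}

Resetting on every allocation, unlike the ordinary scheduling-opportunity counter (which is preserved across off-switch assignments), is what keeps unlabeled requests gravitating back to unlabeled nodes.
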
@@ -870,9 +912,9 @@ public class LeafQueue extends AbstractCSQueue {
             // Book-keeping 
             // Note: Update headroom to account for current allocation too...
             allocateResource(clusterResource, application, assigned,
-                node.getLabels());
+                node.getPartition());
             
-            // Don't reset scheduling opportunities for non-local assignments
+            // Don't reset scheduling opportunities for offswitch assignments
             // otherwise the app will be delayed for each non-local assignment.
             // This helps apps with many off-cluster requests schedule faster.
             if (assignment.getType() != NodeType.OFF_SWITCH) {
@@ -881,6 +923,10 @@ public class LeafQueue extends AbstractCSQueue {
               }
               application.resetSchedulingOpportunities(priority);
             }
+            // Non-exclusive scheduling opportunity is different: we need to
+            // reset it every time, so that non-labeled resource requests are
+            // most likely to be allocated on non-labeled nodes first.
+            application.resetMissedNonPartitionedRequestSchedulingOpportunity(priority);
             
             // Done
             return assignment;
@@ -904,7 +950,8 @@ public class LeafQueue extends AbstractCSQueue {
 
   private synchronized CSAssignment assignReservedContainer(
       FiCaSchedulerApp application, FiCaSchedulerNode node,
-      RMContainer rmContainer, Resource clusterResource) {
+      RMContainer rmContainer, Resource clusterResource,
+      SchedulingMode schedulingMode) {
     // Do we still need this reservation?
     Priority priority = rmContainer.getReservedPriority();
     if (application.getTotalRequiredResources(priority) == 0) {
@@ -915,7 +962,7 @@ public class LeafQueue extends AbstractCSQueue {
     // Try to assign if we have sufficient resources
     CSAssignment tmp =
         assignContainersOnNode(clusterResource, node, application, priority,
-          rmContainer);
+          rmContainer, schedulingMode);
     
     // Doesn't matter... since it's already charged for at time of reservation
     // "re-reservation" is *free*
@@ -929,7 +976,8 @@ public class LeafQueue extends AbstractCSQueue {
   protected Resource getHeadroom(User user, Resource queueCurrentLimit,
       Resource clusterResource, FiCaSchedulerApp application, Resource required) {
     return getHeadroom(user, queueCurrentLimit, clusterResource,
-         computeUserLimit(application, clusterResource, required, user, null));
+        computeUserLimit(application, clusterResource, required, user,
+            RMNodeLabelsManager.NO_LABEL, SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY));
   }
   
   private Resource getHeadroom(User user, Resource currentResourceLimit,
@@ -973,7 +1021,8 @@ public class LeafQueue extends AbstractCSQueue {
 
   @Lock({LeafQueue.class, FiCaSchedulerApp.class})
   Resource computeUserLimitAndSetHeadroom(FiCaSchedulerApp application,
-      Resource clusterResource, Resource required, Set<String> requestedLabels) {
+      Resource clusterResource, Resource required, String nodePartition,
+      SchedulingMode schedulingMode) {
     String user = application.getUser();
     User queueUser = getUser(user);
 
@@ -981,7 +1030,7 @@ public class LeafQueue extends AbstractCSQueue {
     // TODO: need to consider making headroom respect labels too
     Resource userLimit =
         computeUserLimit(application, clusterResource, required,
-            queueUser, requestedLabels);
+            queueUser, nodePartition, schedulingMode);
 
     setQueueResourceLimitsInfo(clusterResource);
     
@@ -1010,34 +1059,18 @@ public class LeafQueue extends AbstractCSQueue {
   @Lock(NoLock.class)
   private Resource computeUserLimit(FiCaSchedulerApp application,
       Resource clusterResource, Resource required, User user,
-      Set<String> requestedLabels) {
+      String nodePartition, SchedulingMode schedulingMode) {
     // What is our current capacity? 
     // * It is equal to the max(required, queue-capacity) if
     //   we're running below capacity. The 'max' ensures that jobs in queues
     //   with minuscule capacity (< 1 slot) make progress
     // * If we're running over capacity, then it's
     //   (usedResources + required) (i.e. the extra resources we are allocating)
-    Resource queueCapacity = Resource.newInstance(0, 0);
-    if (requestedLabels != null && !requestedLabels.isEmpty()) {
-      // if we have multiple labels to request, we will choose to use the first
-      // label
-      String firstLabel = requestedLabels.iterator().next();
-      queueCapacity =
-          Resources
-              .max(resourceCalculator, clusterResource, queueCapacity,
-                  Resources.multiplyAndNormalizeUp(resourceCalculator,
-                      labelManager.getResourceByLabel(firstLabel,
-                          clusterResource),
-                      queueCapacities.getAbsoluteCapacity(firstLabel),
-                      minimumAllocation));
-    } else {
-      // else there's no label on request, just to use absolute capacity as
-      // capacity for nodes without label
-      queueCapacity =
-          Resources.multiplyAndNormalizeUp(resourceCalculator, labelManager
-                .getResourceByLabel(CommonNodeLabelsManager.NO_LABEL, clusterResource),
-              queueCapacities.getAbsoluteCapacity(), minimumAllocation);
-    }
+    Resource queueCapacity =
+        Resources.multiplyAndNormalizeUp(resourceCalculator,
+            labelManager.getResourceByLabel(nodePartition, clusterResource),
+            queueCapacities.getAbsoluteCapacity(nodePartition),
+            minimumAllocation);
 
     // Allow progress for queues with minuscule capacity
     queueCapacity =
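
The block above replaces the old first-label-of-the-request heuristic: queue capacity is now always derived from the target node's partition. A memory-only worked sketch (names and the megabyte simplification are illustrative; the real code goes through Resources.multiplyAndNormalizeUp on Resource objects):

// Sketch: per-partition queue capacity, normalized up to the minimum
// allocation, as consumed by the user-limit computation.
final class QueueCapacitySketch {
  static long queueCapacityMb(long partitionResourceMb, double absoluteCapacity,
      long minimumAllocationMb) {
    long raw = (long) Math.ceil(partitionResourceMb * absoluteCapacity);
    // Normalize up to a multiple of the minimum allocation.
    long steps = (raw + minimumAllocationMb - 1) / minimumAllocationMb;
    return steps * minimumAllocationMb;
  }

  public static void main(String[] args) {
    // Partition "x" has 100 GB, the queue's absolute capacity on "x" is 0.3,
    // and the minimum allocation is 1 GB -> 30 GB of capacity on "x".
    System.out.println(queueCapacityMb(102400, 0.3, 1024)); // 30720
  }
}
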
@@ -1047,33 +1080,56 @@ public class LeafQueue extends AbstractCSQueue {
             required);
 
     Resource currentCapacity =
-        Resources.lessThan(resourceCalculator, clusterResource, 
-            queueUsage.getUsed(), queueCapacity) ?
-            queueCapacity : Resources.add(queueUsage.getUsed(), required);
+        Resources.lessThan(resourceCalculator, clusterResource,
+            queueUsage.getUsed(nodePartition), queueCapacity) ? queueCapacity
+            : Resources.add(queueUsage.getUsed(nodePartition), required);
     
     // Never allow a single user to take more than the 
     // queue's configured capacity * user-limit-factor.
     // Also, the queue's configured capacity should be higher than 
     // queue-hard-limit * ulMin
     
-    final int activeUsers = activeUsersManager.getNumActiveUsers();  
-               
-    Resource limit =
+    final int activeUsers = activeUsersManager.getNumActiveUsers();
+    
+    // User limit resource is determined by:
+    // max(currentCapacity / #activeUsers, currentCapacity * user-limit-percentage%)
+    Resource userLimitResource = Resources.max(
+        resourceCalculator, clusterResource, 
+        Resources.divideAndCeil(
+            resourceCalculator, currentCapacity, activeUsers),
+        Resources.divideAndCeil(
+            resourceCalculator, 
+            Resources.multiplyAndRoundDown(
+                currentCapacity, userLimit), 
+            100)
+        );
+    
+    // User limit is capped by maxUserLimit
+    // - maxUserLimit = queueCapacity * user-limit-factor (RESPECT_PARTITION_EXCLUSIVITY)
+    // - maxUserLimit = total-partition-resource (IGNORE_PARTITION_EXCLUSIVITY)
+    //
+    // In IGNORE_PARTITION_EXCLUSIVITY mode, if a queue cannot access a
+    // partition, its guaranteed resource on that partition is 0, and the
+    // user-limit-factor computation is based on the queue's guaranteed
+    // capacity. So we will not cap the user limit (nor the used resource)
+    // when doing IGNORE_PARTITION_EXCLUSIVITY allocation.
+    Resource maxUserLimit = Resources.none();
+    if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY) {
+      maxUserLimit =
+          Resources.multiplyAndRoundDown(queueCapacity, userLimitFactor);
+    } else if (schedulingMode == SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY) {
+      maxUserLimit =
+          labelManager.getResourceByLabel(nodePartition, clusterResource);
+    }
+    
+    // Cap final user limit with maxUserLimit
+    userLimitResource =
         Resources.roundUp(
             resourceCalculator, 
             Resources.min(
                 resourceCalculator, clusterResource,   
-                Resources.max(
-                    resourceCalculator, clusterResource, 
-                    Resources.divideAndCeil(
-                        resourceCalculator, currentCapacity, activeUsers),
-                    Resources.divideAndCeil(
-                        resourceCalculator, 
-                        Resources.multiplyAndRoundDown(
-                            currentCapacity, userLimit), 
-                        100)
-                    ), 
-                Resources.multiplyAndRoundDown(queueCapacity, userLimitFactor)
+                  userLimitResource,
+                  maxUserLimit
                 ), 
             minimumAllocation);
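
Putting the refactored pieces together, a memory-only worked sketch of the final user limit (a boolean stands in for the scheduling mode, megabytes for Resource objects; the final round-up to the minimum allocation is omitted for brevity):

// Sketch of the refactored per-user limit:
//   userLimit = min(max(currentCapacity / activeUsers,
//                       currentCapacity * userLimitPercent / 100),
//                   maxUserLimit)
final class UserLimitSketch {
  static long userLimitMb(long currentCapacityMb, int activeUsers,
      int userLimitPercent, long queueCapacityMb, double userLimitFactor,
      long partitionResourceMb, boolean respectExclusivity) {
    long perUser = Math.max(
        (currentCapacityMb + activeUsers - 1) / activeUsers, // divide-and-ceil
        currentCapacityMb * userLimitPercent / 100);         // round down
    // The cap depends on the mode: the queue's own share when respecting
    // exclusivity, the whole partition when ignoring it (the queue has no
    // guaranteed capacity on a partition it cannot access).
    long maxUserLimit = respectExclusivity
        ? (long) (queueCapacityMb * userLimitFactor)
        : partitionResourceMb;
    return Math.min(perUser, maxUserLimit);
  }

  public static void main(String[] args) {
    // 40 GB current capacity, 4 active users, user-limit-percent = 25,
    // queue capacity 40 GB, factor 1.0, exclusive mode -> 10 GB per user.
    System.out.println(userLimitMb(40960, 4, 25, 40960, 1.0, 102400, true));
  }
}
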
 
@@ -1081,11 +1137,11 @@ public class LeafQueue extends AbstractCSQueue {
       String userName = application.getUser();
       LOG.debug("User limit computation for " + userName + 
           " in queue " + getQueueName() +
-          " userLimit=" + userLimit +
+          " userLimitPercent=" + userLimit +
           " userLimitFactor=" + userLimitFactor +
           " required: " + required + 
           " consumed: " + user.getUsed() + 
-          " limit: " + limit +
+          " user-limit-resource: " + userLimitResource +
           " queueCapacity: " + queueCapacity + 
           " qconsumed: " + queueUsage.getUsed() +
           " currentCapacity: " + currentCapacity +
@@ -1093,31 +1149,26 @@ public class LeafQueue extends AbstractCSQueue {
           " clusterCapacity: " + clusterResource
       );
     }
-    user.setUserResourceLimit(limit);
-    return limit;
+    user.setUserResourceLimit(userLimitResource);
+    return userLimitResource;
   }
   
   @Private
   protected synchronized boolean canAssignToUser(Resource clusterResource,
       String userName, Resource limit, FiCaSchedulerApp application,
-      boolean checkReservations, Set<String> requestLabels) {
+      boolean checkReservations, String nodePartition) {
     User user = getUser(userName);
-    
-    String label = CommonNodeLabelsManager.NO_LABEL;
-    if (requestLabels != null && !requestLabels.isEmpty()) {
-      label = requestLabels.iterator().next();
-    }
 
     // Note: We aren't considering the current request since there is a fixed
     // overhead of the AM, but it's a > check, not a >= check, so...
     if (Resources
         .greaterThan(resourceCalculator, clusterResource,
-            user.getUsed(label),
+            user.getUsed(nodePartition),
             limit)) {
       // if enabled, check to see if we could potentially use this node instead
       // of a reserved node if the application has reserved containers
       if (this.reservationsContinueLooking && checkReservations
-          && label.equals(CommonNodeLabelsManager.NO_LABEL)) {
+          && nodePartition.equals(CommonNodeLabelsManager.NO_LABEL)) {
         if (Resources.lessThanOrEqual(
             resourceCalculator,
             clusterResource,
@@ -1136,7 +1187,7 @@ public class LeafQueue extends AbstractCSQueue {
       if (LOG.isDebugEnabled()) {
         LOG.debug("User " + userName + " in queue " + getQueueName()
             + " will exceed limit - " + " consumed: "
-            + user.getUsed() + " limit: " + limit);
+            + user.getUsed(nodePartition) + " limit: " + limit);
       }
       return false;
     }
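
The user-limit check itself is now keyed by the node's partition rather than by a label set. A sketch of the decision, including the reservations-continue-looking escape hatch that stays restricted to the default partition (all names illustrative):

// Sketch of the per-partition user-limit check.
final class UserLimitCheckSketch {
  static boolean canAssignToUser(long usedOnPartitionMb, long limitMb,
      boolean reservationsContinueLooking, boolean checkReservations,
      long userReservedMb, String nodePartition) {
    if (usedOnPartitionMb > limitMb) { // strictly greater-than, not >=
      // Let the user through if unreserving existing containers would bring
      // them back under the limit (default partition only).
      return reservationsContinueLooking && checkReservations
          && nodePartition.isEmpty()
          && usedOnPartitionMb - userReservedMb <= limitMb;
    }
    return true;
  }
}
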
@@ -1176,7 +1227,7 @@ public class LeafQueue extends AbstractCSQueue {
 
   private CSAssignment assignContainersOnNode(Resource clusterResource,
       FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority,
-      RMContainer reservedContainer) {
+      RMContainer reservedContainer, SchedulingMode schedulingMode) {
 
     CSAssignment assigned;
 
@@ -1190,7 +1241,7 @@ public class LeafQueue extends AbstractCSQueue {
       assigned =
           assignNodeLocalContainers(clusterResource, nodeLocalResourceRequest, 
             node, application, priority, reservedContainer,
-            allocatedContainer);
+            allocatedContainer, schedulingMode);
       if (Resources.greaterThan(resourceCalculator, clusterResource,
         assigned.getResource(), Resources.none())) {
 
@@ -1219,7 +1270,7 @@ public class LeafQueue extends AbstractCSQueue {
       assigned = 
           assignRackLocalContainers(clusterResource, rackLocalResourceRequest, 
             node, application, priority, reservedContainer,
-            allocatedContainer);
+            allocatedContainer, schedulingMode);
       if (Resources.greaterThan(resourceCalculator, clusterResource,
         assigned.getResource(), Resources.none())) {
 
@@ -1248,7 +1299,7 @@ public class LeafQueue extends AbstractCSQueue {
       assigned =
           assignOffSwitchContainers(clusterResource, offSwitchResourceRequest,
             node, application, priority, reservedContainer,
-            allocatedContainer);
+            allocatedContainer, schedulingMode);
 
       // update locality statistics
       if (allocatedContainer.getValue() != null) {
@@ -1314,16 +1365,17 @@ public class LeafQueue extends AbstractCSQueue {
 
   @Private
   protected boolean checkLimitsToReserve(Resource clusterResource,
-      FiCaSchedulerApp application, Resource capability) {
+      FiCaSchedulerApp application, Resource capability, String nodePartition,
+      SchedulingMode schedulingMode) {
     // we can't reserve if we got here based on the limit
     // checks assuming we could unreserve!!!
     Resource userLimit = computeUserLimitAndSetHeadroom(application,
-        clusterResource, capability, null);
+        clusterResource, capability, nodePartition, schedulingMode);
 
     // Check queue max-capacity limit,
     // TODO: Consider reservation on labels
-    if (!canAssignToThisQueue(clusterResource, null,
-        this.currentResourceLimits, capability, Resources.none())) {
+    if (!canAssignToThisQueue(clusterResource, RMNodeLabelsManager.NO_LABEL,
+        this.currentResourceLimits, capability, Resources.none(), schedulingMode)) {
       if (LOG.isDebugEnabled()) {
         LOG.debug("was going to reserve but hit queue limit");
       }
@@ -1332,7 +1384,7 @@ public class LeafQueue extends AbstractCSQueue {
 
     // Check user limit
     if (!canAssignToUser(clusterResource, application.getUser(), userLimit,
-        application, false, null)) {
+        application, false, nodePartition)) {
       if (LOG.isDebugEnabled()) {
         LOG.debug("was going to reserve but hit user limit");
       }
@@ -1345,12 +1397,13 @@ public class LeafQueue extends AbstractCSQueue {
   private CSAssignment assignNodeLocalContainers(Resource clusterResource,
       ResourceRequest nodeLocalResourceRequest, FiCaSchedulerNode node,
       FiCaSchedulerApp application, Priority priority,
-      RMContainer reservedContainer, MutableObject allocatedContainer) {
+      RMContainer reservedContainer, MutableObject allocatedContainer,
+      SchedulingMode schedulingMode) {
     if (canAssign(application, priority, node, NodeType.NODE_LOCAL, 
         reservedContainer)) {
       return assignContainer(clusterResource, node, application, priority,
           nodeLocalResourceRequest, NodeType.NODE_LOCAL, reservedContainer,
-          allocatedContainer);
+          allocatedContainer, schedulingMode);
     }
 
     return new CSAssignment(Resources.none(), NodeType.NODE_LOCAL);
@@ -1359,12 +1412,13 @@ public class LeafQueue extends AbstractCSQueue {
   private CSAssignment assignRackLocalContainers(Resource clusterResource,
       ResourceRequest rackLocalResourceRequest, FiCaSchedulerNode node,
       FiCaSchedulerApp application, Priority priority,
-      RMContainer reservedContainer, MutableObject allocatedContainer) {
+      RMContainer reservedContainer, MutableObject allocatedContainer,
+      SchedulingMode schedulingMode) {
     if (canAssign(application, priority, node, NodeType.RACK_LOCAL,
         reservedContainer)) {
       return assignContainer(clusterResource, node, application, priority,
           rackLocalResourceRequest, NodeType.RACK_LOCAL, reservedContainer,
-          allocatedContainer);
+          allocatedContainer, schedulingMode);
     }
 
     return new CSAssignment(Resources.none(), NodeType.RACK_LOCAL);
@@ -1373,16 +1427,21 @@ public class LeafQueue extends AbstractCSQueue {
   private CSAssignment assignOffSwitchContainers(Resource clusterResource,
       ResourceRequest offSwitchResourceRequest, FiCaSchedulerNode node,
       FiCaSchedulerApp application, Priority priority,
-      RMContainer reservedContainer, MutableObject allocatedContainer) {
+      RMContainer reservedContainer, MutableObject allocatedContainer,
+      SchedulingMode schedulingMode) {
     if (canAssign(application, priority, node, NodeType.OFF_SWITCH,
         reservedContainer)) {
       return assignContainer(clusterResource, node, application, priority,
           offSwitchResourceRequest, NodeType.OFF_SWITCH, reservedContainer,
-          allocatedContainer);
+          allocatedContainer, schedulingMode);
     }
     
     return new CSAssignment(Resources.none(), NodeType.OFF_SWITCH);
   }
+  
+  private int getActualNodeLocalityDelay() {
+    return Math.min(scheduler.getNumClusterNodes(), getNodeLocalityDelay());
+  }
 
   boolean canAssign(FiCaSchedulerApp application, Priority priority, 
       FiCaSchedulerNode node, NodeType type, RMContainer reservedContainer) {
@@ -1417,10 +1476,7 @@ public class LeafQueue extends AbstractCSQueue {
     if (type == NodeType.RACK_LOCAL) {
       // 'Delay' rack-local just a little bit...
       long missedOpportunities = application.getSchedulingOpportunities(priority);
-      return (
-          Math.min(scheduler.getNumClusterNodes(), getNodeLocalityDelay()) < 
-          missedOpportunities
-          );
+      return getActualNodeLocalityDelay() < missedOpportunities;
     }
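
The small getActualNodeLocalityDelay refactor above only factors out the cap; the rack-local delay then reads as (sketch, illustrative names):

// Sketch: rack-local assignments are delayed until the app has missed at
// least min(clusterSize, configured node-locality-delay) opportunities.
final class LocalityDelaySketch {
  static boolean canAssignRackLocal(int numClusterNodes, int nodeLocalityDelay,
      long missedOpportunities) {
    int actualDelay = Math.min(numClusterNodes, nodeLocalityDelay);
    return actualDelay < missedOpportunities;
  }
}
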
 
     // Check if we need containers on this host
@@ -1460,7 +1516,7 @@ public class LeafQueue extends AbstractCSQueue {
   private CSAssignment assignContainer(Resource clusterResource, FiCaSchedulerNode node,
       FiCaSchedulerApp application, Priority priority, 
       ResourceRequest request, NodeType type, RMContainer rmContainer,
-      MutableObject createdContainer) {
+      MutableObject createdContainer, SchedulingMode schedulingMode) {
     if (LOG.isDebugEnabled()) {
       LOG.debug("assignContainers: node=" + node.getNodeName()
         + " application=" + application.getApplicationId()
@@ -1469,9 +1525,8 @@ public class LeafQueue extends AbstractCSQueue {
     }
     
     // check if the resource request can access the label
-    if (!SchedulerUtils.checkNodeLabelExpression(
-        node.getLabels(),
-        request.getNodeLabelExpression())) {
+    if (!SchedulerUtils.checkResourceRequestMatchingNodePartition(request,
+        node.getPartition(), schedulingMode)) {
       // this is a reserved container, but we cannot allocate it now because
       // the label doesn't match. This can be caused by a node label change;
       // we should un-reserve this container.
@@ -1576,8 +1631,8 @@ public class LeafQueue extends AbstractCSQueue {
           // If we're trying to reserve a container here, no container will be
           // unreserved for reserving the new one. Check limits again before
           // reserving the new container
-          if (!checkLimitsToReserve(clusterResource, 
-              application, capability)) {
+          if (!checkLimitsToReserve(clusterResource,
+              application, capability, node.getPartition(), schedulingMode)) {
             return new CSAssignment(Resources.none(), type);
           }
         }
@@ -1666,7 +1721,7 @@ public class LeafQueue extends AbstractCSQueue {
         // Book-keeping
         if (removed) {
           releaseResource(clusterResource, application,
-              container.getResource(), node.getLabels());
+              container.getResource(), node.getPartition());
           LOG.info("completedContainer" +
               " container=" + container +
               " queue=" + this +
@@ -1684,13 +1739,13 @@ public class LeafQueue extends AbstractCSQueue {
 
   synchronized void allocateResource(Resource clusterResource,
       SchedulerApplicationAttempt application, Resource resource,
-      Set<String> nodeLabels) {
-    super.allocateResource(clusterResource, resource, nodeLabels);
+      String nodePartition) {
+    super.allocateResource(clusterResource, resource, nodePartition);
     
     // Update user metrics
     String userName = application.getUser();
     User user = getUser(userName);
-    user.assignContainer(resource, nodeLabels);
+    user.assignContainer(resource, nodePartition);
     // Note this is a bit unconventional since it gets the object and modifies
     // it here, rather than using a set routine
     Resources.subtractFrom(application.getHeadroom(), resource); // headroom
@@ -1707,13 +1762,13 @@ public class LeafQueue extends AbstractCSQueue {
   }
 
   synchronized void releaseResource(Resource clusterResource, 
-      FiCaSchedulerApp application, Resource resource, Set<String> nodeLabels) {
-    super.releaseResource(clusterResource, resource, nodeLabels);
+      FiCaSchedulerApp application, Resource resource, String nodePartition) {
+    super.releaseResource(clusterResource, resource, nodePartition);
     
     // Update user metrics
     String userName = application.getUser();
     User user = getUser(userName);
-    user.releaseContainer(resource, nodeLabels);
+    user.releaseContainer(resource, nodePartition);
     metrics.setAvailableResourcesToUser(userName, application.getHeadroom());
       
     LOG.info(getQueueName() + 
@@ -1723,7 +1778,8 @@ public class LeafQueue extends AbstractCSQueue {
   
   private void updateAbsoluteCapacityResource(Resource clusterResource) {
     absoluteCapacityResource =
-        Resources.multiplyAndNormalizeUp(resourceCalculator, clusterResource,
+        Resources.multiplyAndNormalizeUp(resourceCalculator, labelManager
+            .getResourceByLabel(RMNodeLabelsManager.NO_LABEL, clusterResource),
             queueCapacities.getAbsoluteCapacity(), minimumAllocation);
   }
   
@@ -1769,8 +1825,9 @@ public class LeafQueue extends AbstractCSQueue {
     // Update application properties
     for (FiCaSchedulerApp application : activeApplications) {
       synchronized (application) {
-        computeUserLimitAndSetHeadroom(application, clusterResource, 
-            Resources.none(), null);
+        computeUserLimitAndSetHeadroom(application, clusterResource,
+            Resources.none(), RMNodeLabelsManager.NO_LABEL,
+            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
       }
     }
   }
@@ -1828,25 +1885,12 @@ public class LeafQueue extends AbstractCSQueue {
       }
     }
 
-    public void assignContainer(Resource resource,
-        Set<String> nodeLabels) {
-      if (nodeLabels == null || nodeLabels.isEmpty()) {
-        userResourceUsage.incUsed(resource);
-      } else {
-        for (String label : nodeLabels) {
-          userResourceUsage.incUsed(label, resource);
-        }
-      }
+    public void assignContainer(Resource resource, String nodePartition) {
+      userResourceUsage.incUsed(nodePartition, resource);
     }
 
-    public void releaseContainer(Resource resource, Set<String> nodeLabels) {
-      if (nodeLabels == null || nodeLabels.isEmpty()) {
-        userResourceUsage.decUsed(resource);
-      } else {
-        for (String label : nodeLabels) {
-          userResourceUsage.decUsed(label, resource);
-        }
-      }
+    public void releaseContainer(Resource resource, String nodePartition) {
+      userResourceUsage.decUsed(nodePartition, resource);
     }
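
With a single partition string replacing the label set, the per-user accounting above collapses to a plainly keyed counter; the old empty-set special case disappears. A self-contained sketch of the equivalent bookkeeping (ResourceUsage approximated with a map of megabytes):

import java.util.HashMap;
import java.util.Map;

// Sketch of per-user, per-partition usage tracking after this change.
final class UserUsageSketch {
  private final Map<String, Long> usedMbByPartition = new HashMap<>();

  void assignContainer(long resourceMb, String nodePartition) {
    usedMbByPartition.merge(nodePartition, resourceMb, Long::sum);
  }

  void releaseContainer(long resourceMb, String nodePartition) {
    usedMbByPartition.merge(nodePartition, -resourceMb, Long::sum);
  }

  long getUsed(String nodePartition) {
    return usedMbByPartition.getOrDefault(nodePartition, 0L);
  }
}
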
 
     public Resource getUserResourceLimit() {
@@ -1869,7 +1913,7 @@ public class LeafQueue extends AbstractCSQueue {
       FiCaSchedulerNode node =
           scheduler.getNode(rmContainer.getContainer().getNodeId());
       allocateResource(clusterResource, attempt, rmContainer.getContainer()
-          .getResource(), node.getLabels());
+          .getResource(), node.getPartition());
     }
     getParent().recoverContainer(clusterResource, attempt, rmContainer);
   }
@@ -1909,7 +1953,7 @@ public class LeafQueue extends AbstractCSQueue {
       FiCaSchedulerNode node =
           scheduler.getNode(rmContainer.getContainer().getNodeId());
       allocateResource(clusterResource, application, rmContainer.getContainer()
-          .getResource(), node.getLabels());
+          .getResource(), node.getPartition());
       LOG.info("movedContainer" + " container=" + rmContainer.getContainer()
           + " resource=" + rmContainer.getContainer().getResource()
           + " queueMoveIn=" + this + " usedCapacity=" + getUsedCapacity()
@@ -1927,7 +1971,7 @@ public class LeafQueue extends AbstractCSQueue {
       FiCaSchedulerNode node =
           scheduler.getNode(rmContainer.getContainer().getNodeId());
       releaseResource(clusterResource, application, rmContainer.getContainer()
-          .getResource(), node.getLabels());
+          .getResource(), node.getPartition());
       LOG.info("movedContainer" + " container=" + rmContainer.getContainer()
           + " resource=" + rmContainer.getContainer().getResource()
           + " queueMoveOut=" + this + " usedCapacity=" + getUsedCapacity()
