http://git-wip-us.apache.org/repos/asf/hadoop/blob/6cdcab90/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index fd31a2d..09b59ee 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -32,7 +32,9 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Random;
 import java.util.Set;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.lang.StringUtils;
@@ -113,7 +115,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.Alloca
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.KillableContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.AssignmentInformation;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerAllocationProposal;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceAllocationCommitter;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.SchedulerContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
@@ -129,6 +135,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourc
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSetUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SimplePlacementSet;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
 import org.apache.hadoop.yarn.server.utils.Lock;
 import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
@@ -143,11 +152,12 @@ import com.google.common.base.Preconditions;
 @SuppressWarnings("unchecked")
 public class CapacityScheduler extends
     AbstractYarnScheduler<FiCaSchedulerApp, FiCaSchedulerNode> implements
-    PreemptableResourceScheduler, CapacitySchedulerContext, Configurable {
+    PreemptableResourceScheduler, CapacitySchedulerContext, Configurable,
+    ResourceAllocationCommitter {
 
   private static final Log LOG = LogFactory.getLog(CapacityScheduler.class);
   private YarnAuthorizationProvider authorizer;
- 
+
   private CSQueue root;
   // timeout to join when we stop this service
   protected final long THREAD_JOIN_TIMEOUT_MS = 1000;
@@ -156,6 +166,8 @@ public class CapacityScheduler extends
 
   private volatile boolean isLazyPreemptionEnabled = false;
 
+  private int offswitchPerHeartbeatLimit;
+
   static final Comparator<CSQueue> nonPartitionedQueueComparator =
       new Comparator<CSQueue>() {
     @Override
@@ -177,7 +189,7 @@ public class CapacityScheduler extends
   public void setConf(Configuration conf) {
       yarnConf = conf;
   }
-  
+
   private void validateConf(Configuration conf) {
     // validate scheduler memory allocation setting
     int minMem = conf.getInt(
@@ -230,7 +242,8 @@ public class CapacityScheduler extends
   private boolean usePortForNodeName;
 
   private boolean scheduleAsynchronously;
-  private AsyncScheduleThread asyncSchedulerThread;
+  private List<AsyncScheduleThread> asyncSchedulerThreads;
+  private ResourceCommitterService resourceCommitterService;
   private RMNodeLabelsManager labelManager;
 
   /**
@@ -254,7 +267,7 @@ public class CapacityScheduler extends
   public CSQueue getRootQueue() {
     return root;
   }
-  
+
   @Override
   public CapacitySchedulerConfiguration getConfiguration() {
     return conf;
@@ -270,11 +283,16 @@ public class CapacityScheduler extends
     return calculator;
   }
 
+  @VisibleForTesting
+  public void setResourceCalculator(ResourceCalculator rc) {
+    this.calculator = rc;
+  }
+
   @Override
   public Comparator<CSQueue> getNonPartitionedQueueComparator() {
     return nonPartitionedQueueComparator;
   }
-  
+
   @Override
   public PartitionedQueueComparator getPartitionedQueueComparator() {
     return partitionedQueueComparator;
@@ -295,7 +313,8 @@ public class CapacityScheduler extends
     this.rmContext = rmContext;
   }
 
-  private void initScheduler(Configuration configuration) throws
+  @VisibleForTesting
+  void initScheduler(Configuration configuration) throws
       IOException {
     try {
       writeLock.lock();
@@ -316,10 +335,24 @@ public class CapacityScheduler extends
       scheduleAsynchronously = this.conf.getScheduleAynschronously();
       asyncScheduleInterval = this.conf.getLong(ASYNC_SCHEDULER_INTERVAL,
           DEFAULT_ASYNC_SCHEDULER_INTERVAL);
+
+      // number of threads for async scheduling
+      int maxAsyncSchedulingThreads = this.conf.getInt(
+          CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_THREAD,
+          1);
+      maxAsyncSchedulingThreads = Math.max(maxAsyncSchedulingThreads, 1);
+
       if (scheduleAsynchronously) {
-        asyncSchedulerThread = new AsyncScheduleThread(this);
+        asyncSchedulerThreads = new ArrayList<>();
+        for (int i = 0; i < maxAsyncSchedulingThreads; i++) {
+          asyncSchedulerThreads.add(new AsyncScheduleThread(this));
+        }
+        resourceCommitterService = new ResourceCommitterService(this);
       }
 
+      // Setup how many containers we can allocate for each round
+      offswitchPerHeartbeatLimit = this.conf.getOffSwitchPerHeartbeatLimit();
+
       LOG.info("Initialized CapacityScheduler with " + "calculator="
           + getResourceCalculator().getClass() + ", " + "minimumAllocation=<"
           + getMinimumResourceCapability() + ">, " + "maximumAllocation=<"
@@ -336,9 +369,13 @@ public class CapacityScheduler extends
       writeLock.lock();
       activitiesManager.start();
       if (scheduleAsynchronously) {
-        Preconditions.checkNotNull(asyncSchedulerThread,
-            "asyncSchedulerThread is null");
-        asyncSchedulerThread.start();
+        Preconditions.checkNotNull(asyncSchedulerThreads,
+            "asyncSchedulerThreads is null");
+        for (Thread t : asyncSchedulerThreads) {
+          t.start();
+        }
+
+        resourceCommitterService.start();
       }
     } finally {
       writeLock.unlock();
@@ -362,9 +399,13 @@ public class CapacityScheduler extends
   public void serviceStop() throws Exception {
     try {
       writeLock.lock();
-      if (scheduleAsynchronously && asyncSchedulerThread != null) {
-        asyncSchedulerThread.interrupt();
-        asyncSchedulerThread.join(THREAD_JOIN_TIMEOUT_MS);
+      if (scheduleAsynchronously && asyncSchedulerThreads != null) {
+        for (Thread t : asyncSchedulerThreads) {
+          t.interrupt();
+          t.join(THREAD_JOIN_TIMEOUT_MS);
+        }
+        resourceCommitterService.interrupt();
+        resourceCommitterService.join(THREAD_JOIN_TIMEOUT_MS);
       }
     } finally {
       writeLock.unlock();
@@ -394,17 +435,20 @@ public class CapacityScheduler extends
 
       // update lazy preemption
       this.isLazyPreemptionEnabled = this.conf.getLazyPreemptionEnabled();
+
+      // Setup how many containers we can allocate for each round
+      offswitchPerHeartbeatLimit = this.conf.getOffSwitchPerHeartbeatLimit();
     } finally {
       writeLock.unlock();
     }
   }
-  
+
   long getAsyncScheduleInterval() {
     return asyncScheduleInterval;
   }
 
   private final static Random random = new Random(System.currentTimeMillis());
-  
+
   /**
    * Schedule on all nodes by starting at a random point.
    * @param cs
@@ -414,20 +458,22 @@ public class CapacityScheduler extends
     int current = 0;
     Collection<FiCaSchedulerNode> nodes = cs.nodeTracker.getAllNodes();
     int start = random.nextInt(nodes.size());
+
     for (FiCaSchedulerNode node : nodes) {
       if (current++ >= start) {
-        cs.allocateContainersToNode(node);
+        cs.allocateContainersToNode(node.getNodeID(), false);
       }
     }
     // Now, just get everyone to be safe
     for (FiCaSchedulerNode node : nodes) {
-      cs.allocateContainersToNode(node);
+      cs.allocateContainersToNode(node.getNodeID(), false);
     }
+
     try {
       Thread.sleep(cs.getAsyncScheduleInterval());
     } catch (InterruptedException e) {}
   }
-  
+
   static class AsyncScheduleThread extends Thread {
 
     private final CapacityScheduler cs;
@@ -441,12 +487,19 @@ public class CapacityScheduler extends
     @Override
     public void run() {
       while (true) {
-        if (!runSchedules.get()) {
-          try {
+        try {
+          if (!runSchedules.get() || Thread.currentThread().isInterrupted()) {
             Thread.sleep(100);
-          } catch (InterruptedException ie) {}
-        } else {
-          schedule(cs);
+          } else {
+            // Don't run schedule if we have some pending backlogs already
+            if (cs.getAsyncSchedulingPendingBacklogs() > 100) {
+              Thread.sleep(1);
+            } else{
+              schedule(cs);
+            }
+          }
+        } catch (InterruptedException ie) {
+          // Do nothing
         }
       }
     }
@@ -461,6 +514,46 @@ public class CapacityScheduler extends
 
   }
 
+  static class ResourceCommitterService extends Thread {
+    private final CapacityScheduler cs;
+    private BlockingQueue<ResourceCommitRequest<FiCaSchedulerApp, FiCaSchedulerNode>>
+        backlogs = new LinkedBlockingQueue<>();
+
+    public ResourceCommitterService(CapacityScheduler cs) {
+      this.cs = cs;
+      setDaemon(true);
+    }
+
+    @Override
+    public void run() {
+      while (!Thread.currentThread().isInterrupted()) {
+        try {
+          ResourceCommitRequest<FiCaSchedulerApp, FiCaSchedulerNode> request =
+              backlogs.take();
+
+          try {
+            cs.writeLock.lock();
+            cs.tryCommit(cs.getClusterResource(), request);
+          } finally {
+            cs.writeLock.unlock();
+          }
+
+        } catch (InterruptedException e) {
+          LOG.error(e);
+        }
+      }
+    }
+
+    public void addNewCommitRequest(
+        ResourceCommitRequest<FiCaSchedulerApp, FiCaSchedulerNode> proposal) {
+      backlogs.add(proposal);
+    }
+
+    public int getPendingBacklogs() {
+      return backlogs.size();
+    }
+  }
+
   static class QueueHook {
     public CSQueue hook(CSQueue queue) {
       return queue;
@@ -508,14 +601,14 @@ public class CapacityScheduler extends
 
   private void updatePlacementRules() throws IOException {
     List<PlacementRule> placementRules = new ArrayList<>();
-    
+
     // Initialize UserGroupMappingPlacementRule
     // TODO, need make this defineable by configuration.
     UserGroupMappingPlacementRule ugRule = getUserGroupMappingPlacementRule();
     if (null != ugRule) {
       placementRules.add(ugRule);
     }
-    
+
     rmContext.getQueuePlacementManager().updateRules(placementRules);
   }
 
@@ -523,8 +616,8 @@ public class CapacityScheduler extends
   private void initializeQueues(CapacitySchedulerConfiguration conf)
     throws IOException {
 
-    root = 
-        parseQueue(this, conf, null, CapacitySchedulerConfiguration.ROOT, 
+    root =
+        parseQueue(this, conf, null, CapacitySchedulerConfiguration.ROOT,
             queues, queues, noop);
     labelManager.reinitializeQueueLabels(getQueueToLabels());
     LOG.info("Initialized root queue " + root);
@@ -540,16 +633,16 @@ public class CapacityScheduler extends
   throws IOException {
     // Parse new queues
     Map<String, CSQueue> newQueues = new HashMap<String, CSQueue>();
-    CSQueue newRoot = 
+    CSQueue newRoot =
         parseQueue(this, newConf, null, CapacitySchedulerConfiguration.ROOT,
             newQueues, queues, noop);
-    
+
     // Ensure all existing queues are still present
     validateExistingQueues(queues, newQueues);
 
     // Add new queues
     addNewQueues(queues, newQueues);
-    
+
     // Re-configure queues
     root.reinitialize(newRoot, getClusterResource());
     updatePlacementRules();
@@ -593,14 +686,14 @@ public class CapacityScheduler extends
    */
   @Lock(CapacityScheduler.class)
   private void validateExistingQueues(
-      Map<String, CSQueue> queues, Map<String, CSQueue> newQueues) 
+      Map<String, CSQueue> queues, Map<String, CSQueue> newQueues)
   throws IOException {
     // check that all static queues are included in the newQueues list
     for (Map.Entry<String, CSQueue> e : queues.entrySet()) {
       if (!(e.getValue() instanceof ReservationQueue)) {
         String queueName = e.getKey();
         CSQueue oldQueue = e.getValue();
-        CSQueue newQueue = newQueues.get(queueName); 
+        CSQueue newQueue = newQueues.get(queueName);
         if (null == newQueue) {
           throw new IOException(queueName + " cannot be found during refresh!");
         } else if (!oldQueue.getQueuePath().equals(newQueue.getQueuePath())) {
@@ -620,7 +713,7 @@ public class CapacityScheduler extends
    */
   @Lock(CapacityScheduler.class)
   private void addNewQueues(
-      Map<String, CSQueue> queues, Map<String, CSQueue> newQueues) 
+      Map<String, CSQueue> queues, Map<String, CSQueue> newQueues)
   {
     for (Map.Entry<String, CSQueue> e : newQueues.entrySet()) {
       String queueName = e.getKey();
@@ -630,19 +723,19 @@ public class CapacityScheduler extends
       }
     }
   }
-  
+
   @Lock(CapacityScheduler.class)
   static CSQueue parseQueue(
       CapacitySchedulerContext csContext,
-      CapacitySchedulerConfiguration conf, 
+      CapacitySchedulerConfiguration conf,
       CSQueue parent, String queueName, Map<String, CSQueue> queues,
-      Map<String, CSQueue> oldQueues, 
+      Map<String, CSQueue> oldQueues,
       QueueHook hook) throws IOException {
     CSQueue queue;
     String fullQueueName =
         (parent == null) ? queueName
             : (parent.getQueuePath() + "." + queueName);
-    String[] childQueueNames = 
+    String[] childQueueNames =
       conf.getQueues(fullQueueName);
     boolean isReservableQueue = conf.isReservable(fullQueueName);
     if (childQueueNames == null || childQueueNames.length == 0) {
@@ -677,8 +770,8 @@ public class CapacityScheduler extends
 
       List<CSQueue> childQueues = new ArrayList<CSQueue>();
       for (String childQueueName : childQueueNames) {
-        CSQueue childQueue = 
-          parseQueue(csContext, conf, queue, childQueueName, 
+        CSQueue childQueue =
+          parseQueue(csContext, conf, queue, childQueueName,
               queues, oldQueues, hook);
         childQueues.add(childQueue);
       }
@@ -961,6 +1054,9 @@ public class CapacityScheduler extends
 
   private LeafQueue updateIncreaseRequests(
       List<UpdateContainerRequest> increaseRequests, FiCaSchedulerApp app) {
+    // When application has some pending to-be-removed resource requests,
+    app.removedToBeRemovedIncreaseRequests();
+
     if (null == increaseRequests || increaseRequests.isEmpty()) {
       return null;
     }
@@ -1069,8 +1165,8 @@ public class CapacityScheduler extends
 
   @Override
   @Lock(Lock.NoLock.class)
-  public QueueInfo getQueueInfo(String queueName, 
-      boolean includeChildQueues, boolean recursive) 
+  public QueueInfo getQueueInfo(String queueName,
+      boolean includeChildQueues, boolean recursive)
   throws IOException {
     CSQueue queue = null;
     queue = this.queues.get(queueName);
@@ -1095,20 +1191,33 @@ public class CapacityScheduler extends
   }
 
   @Override
-  protected synchronized void nodeUpdate(RMNode nm) {
+  protected synchronized void nodeUpdate(RMNode rmNode) {
     try {
-      writeLock.lock();
+      readLock.lock();
       setLastNodeUpdateTime(Time.now());
-      super.nodeUpdate(nm);
-      if (!scheduleAsynchronously) {
+      super.nodeUpdate(rmNode);
+    } finally {
+      readLock.unlock();
+    }
+
+    // Try to do scheduling
+    if (!scheduleAsynchronously) {
+      try {
+        writeLock.lock();
         ActivitiesLogger.NODE.startNodeUpdateRecording(activitiesManager,
-            nm.getNodeID());
-        allocateContainersToNode(getNode(nm.getNodeID()));
+            rmNode.getNodeID());
+
+        // reset allocation and reservation stats before we start doing any
+        // work
+        updateSchedulerHealth(lastNodeUpdateTime, rmNode.getNodeID(),
+            CSAssignment.NULL_ASSIGNMENT);
+
+        allocateContainersToNode(rmNode.getNodeID(), true);
         ActivitiesLogger.NODE.finishNodeUpdateRecording(activitiesManager,
-            nm.getNodeID());
+            rmNode.getNodeID());
+      } finally {
+        writeLock.unlock();
       }
-    } finally {
-      writeLock.unlock();
     }
   }
 
@@ -1175,10 +1284,8 @@ public class CapacityScheduler extends
     node.updateLabels(newLabels);
   }
 
-  private void updateSchedulerHealth(long now, FiCaSchedulerNode node,
+  private void updateSchedulerHealth(long now, NodeId nodeId,
       CSAssignment assignment) {
-
-    NodeId nodeId = node.getNodeID();
     List<AssignmentInformation.AssignmentDetails> allocations =
         assignment.getAssignmentInformation().getAllocationDetails();
     List<AssignmentInformation.AssignmentDetails> reservations =
@@ -1204,137 +1311,248 @@ public class CapacityScheduler extends
     schedulerHealth.updateSchedulerRunDetails(now, assignment
       .getAssignmentInformation().getAllocated(), assignment
       .getAssignmentInformation().getReserved());
- }
-
-  @VisibleForTesting
-  public void allocateContainersToNode(FiCaSchedulerNode node) {
-    try {
-      writeLock.lock();
-      if (rmContext.isWorkPreservingRecoveryEnabled() && !rmContext
-          .isSchedulerReadyForAllocatingContainers()) {
-        return;
-      }
+  }
 
-      if (!nodeTracker.exists(node.getNodeID())) {
-        LOG.info("Skipping scheduling as the node " + node.getNodeID()
-            + " has been removed");
-        return;
+  private boolean canAllocateMore(CSAssignment assignment, int offswitchCount) {
+    if (null != assignment && Resources.greaterThan(getResourceCalculator(),
+        getClusterResource(), assignment.getResource(), Resources.none())
+        && offswitchCount < offswitchPerHeartbeatLimit) {
+      // And it should not be a reserved container
+      if (assignment.getAssignmentInformation().getNumReservations() == 0) {
+        return true;
       }
+    }
 
-      // reset allocation and reservation stats before we start doing any work
-      updateSchedulerHealth(lastNodeUpdateTime, node,
-          new CSAssignment(Resources.none(), NodeType.NODE_LOCAL));
-
-      CSAssignment assignment;
+    return false;
+  }
 
-      // Assign new containers...
-      // 1. Check for reserved applications
-      // 2. Schedule if there are no reservations
+  /**
+   * We need to make sure when doing allocation, Node should be existed
+   * And we will construct a {@link PlacementSet} before proceeding
+   */
+  private void allocateContainersToNode(NodeId nodeId,
+      boolean withNodeHeartbeat) {
+    FiCaSchedulerNode node = getNode(nodeId);
+    if (null != node) {
+      int offswitchCount = 0;
+
+      PlacementSet<FiCaSchedulerNode> ps = new SimplePlacementSet<>(node);
+      CSAssignment assignment = allocateContainersToNode(ps, withNodeHeartbeat);
+      // Only check if we can allocate more container on the same node when
+      // scheduling is triggered by node heartbeat
+      if (null != assignment && withNodeHeartbeat) {
+        if (assignment.getType() == NodeType.OFF_SWITCH) {
+          offswitchCount++;
+        }
 
-      RMContainer reservedContainer = node.getReservedContainer();
-      if (reservedContainer != null) {
+        while (canAllocateMore(assignment, offswitchCount)) {
+          // Try to see if it is possible to allocate multiple container for
+          // the same node heartbeat
+          assignment = allocateContainersToNode(ps, true);
 
-        FiCaSchedulerApp reservedApplication = getCurrentAttemptForContainer(
-            reservedContainer.getContainerId());
-
-        // Try to fulfill the reservation
-        LOG.info("Trying to fulfill reservation for application "
-            + reservedApplication.getApplicationId() + " on node: " + node
-            .getNodeID());
-
-        LeafQueue queue = ((LeafQueue) reservedApplication.getQueue());
-        assignment = queue.assignContainers(getClusterResource(), node,
-            // TODO, now we only consider limits for parent for non-labeled
-            // resources, should consider labeled resources as well.
-            new ResourceLimits(labelManager
-                .getResourceByLabel(RMNodeLabelsManager.NO_LABEL,
-                    getClusterResource())),
-            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
-        if (assignment.isFulfilledReservation()) {
-          CSAssignment tmp = new CSAssignment(
-              reservedContainer.getReservedResource(), assignment.getType());
-          Resources.addTo(assignment.getAssignmentInformation().getAllocated(),
-              reservedContainer.getReservedResource());
-          tmp.getAssignmentInformation().addAllocationDetails(
-              reservedContainer.getContainerId(), queue.getQueuePath());
-          tmp.getAssignmentInformation().incrAllocations();
-          updateSchedulerHealth(lastNodeUpdateTime, node, tmp);
-          schedulerHealth.updateSchedulerFulfilledReservationCounts(1);
-
-          ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
-              queue.getParent().getQueueName(), queue.getQueueName(),
-              ActivityState.ACCEPTED, ActivityDiagnosticConstant.EMPTY);
-          ActivitiesLogger.NODE.finishAllocatedNodeAllocation(activitiesManager,
-              node, reservedContainer.getContainerId(),
-              AllocationState.ALLOCATED_FROM_RESERVED);
-        } else{
-          ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
-              queue.getParent().getQueueName(), queue.getQueueName(),
-              ActivityState.ACCEPTED, ActivityDiagnosticConstant.EMPTY);
-          ActivitiesLogger.NODE.finishAllocatedNodeAllocation(activitiesManager,
-              node, reservedContainer.getContainerId(),
-              AllocationState.SKIPPED);
+          if (null != assignment
+              && assignment.getType() == NodeType.OFF_SWITCH) {
+            offswitchCount++;
+          }
         }
-      }
-
-      // Try to schedule more if there are no reservations to fulfill
-      if (node.getReservedContainer() == null) {
-        if (calculator.computeAvailableContainers(Resources
-            .add(node.getUnallocatedResource(),
-                node.getTotalKillableResources()), minimumAllocation) > 0) {
 
+        if (offswitchCount >= offswitchPerHeartbeatLimit) {
           if (LOG.isDebugEnabled()) {
-            LOG.debug("Trying to schedule on node: " + node.getNodeName()
-                + ", available: " + node.getUnallocatedResource());
+            LOG.debug("Assigned maximum number of off-switch containers: "
+                + offswitchCount + ", assignments so far: " + assignment);
           }
+        }
+      }
+    }
+  }
 
-          assignment = root.assignContainers(getClusterResource(), node,
-              new ResourceLimits(labelManager
-                  .getResourceByLabel(node.getPartition(),
-                      getClusterResource())),
-              SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
-          if (Resources.greaterThan(calculator, getClusterResource(),
-              assignment.getResource(), Resources.none())) {
-            updateSchedulerHealth(lastNodeUpdateTime, node, assignment);
-            return;
-          }
+  /*
+   * Logics of allocate container on a single node (Old behavior)
+   */
+  private CSAssignment allocateContainerOnSingleNode(PlacementSet<FiCaSchedulerNode> ps,
+      FiCaSchedulerNode node, boolean withNodeHeartbeat) {
+    // Backward compatible way to make sure previous behavior which allocation
+    // driven by node heartbeat works.
+    if (getNode(node.getNodeID()) != node) {
+      LOG.error("Trying to schedule on a removed node, please double check.");
+      return null;
+    }
 
-          // Only do non-exclusive allocation when node has node-labels.
-          if (StringUtils.equals(node.getPartition(),
-              RMNodeLabelsManager.NO_LABEL)) {
-            return;
-          }
+    CSAssignment assignment;
 
-          // Only do non-exclusive allocation when the node-label supports that
-          try {
-            if (rmContext.getNodeLabelManager().isExclusiveNodeLabel(
-                node.getPartition())) {
-              return;
-            }
-          } catch (IOException e) {
-            LOG.warn(
-                "Exception when trying to get exclusivity of node label=" + 
node
-                    .getPartition(), e);
-            return;
-          }
+    // Assign new containers...
+    // 1. Check for reserved applications
+    // 2. Schedule if there are no reservations
+    RMContainer reservedContainer = node.getReservedContainer();
+    if (reservedContainer != null) {
+      FiCaSchedulerApp reservedApplication = getCurrentAttemptForContainer(
+          reservedContainer.getContainerId());
 
-          // Try to use NON_EXCLUSIVE
-          assignment = root.assignContainers(getClusterResource(), node,
-              // TODO, now we only consider limits for parent for non-labeled
-              // resources, should consider labeled resources as well.
-              new ResourceLimits(labelManager
-                  .getResourceByLabel(RMNodeLabelsManager.NO_LABEL,
-                      getClusterResource())),
-              SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY);
-          updateSchedulerHealth(lastNodeUpdateTime, node, assignment);
+      // Try to fulfill the reservation
+      LOG.info(
+          "Trying to fulfill reservation for application " + 
reservedApplication
+              .getApplicationId() + " on node: " + node.getNodeID());
+
+      LeafQueue queue = ((LeafQueue) reservedApplication.getQueue());
+      assignment = queue.assignContainers(getClusterResource(), ps,
+          // TODO, now we only consider limits for parent for non-labeled
+          // resources, should consider labeled resources as well.
+          new ResourceLimits(labelManager
+              .getResourceByLabel(RMNodeLabelsManager.NO_LABEL,
+                  getClusterResource())),
+          SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+
+      if (assignment.isFulfilledReservation()) {
+        if (withNodeHeartbeat) {
+          // Only update SchedulerHealth in sync scheduling, existing
+          // Data structure of SchedulerHealth need to be updated for
+          // Async mode
+          updateSchedulerHealth(lastNodeUpdateTime, node.getNodeID(),
+              assignment);
         }
+
+        schedulerHealth.updateSchedulerFulfilledReservationCounts(1);
+
+        ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
+            queue.getParent().getQueueName(), queue.getQueueName(),
+            ActivityState.ACCEPTED, ActivityDiagnosticConstant.EMPTY);
+        ActivitiesLogger.NODE.finishAllocatedNodeAllocation(activitiesManager,
+            node, reservedContainer.getContainerId(),
+            AllocationState.ALLOCATED_FROM_RESERVED);
       } else{
-        LOG.info("Skipping scheduling since node " + node.getNodeID()
+        ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
+            queue.getParent().getQueueName(), queue.getQueueName(),
+            ActivityState.ACCEPTED, ActivityDiagnosticConstant.EMPTY);
+        ActivitiesLogger.NODE.finishAllocatedNodeAllocation(activitiesManager,
+            node, reservedContainer.getContainerId(), AllocationState.SKIPPED);
+      }
+
+      assignment.setSchedulingMode(
+          SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+      submitResourceCommitRequest(getClusterResource(), assignment);
+    }
+
+    // Do not schedule if there are any reservations to fulfill on the node
+    if (node.getReservedContainer() != null) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Skipping scheduling since node " + node.getNodeID()
             + " is reserved by application " + node.getReservedContainer()
             .getContainerId().getApplicationAttemptId());
       }
-    } finally {
-      writeLock.unlock();
+      return null;
+    }
+
+    // First check if we can schedule
+    // When this time look at one node only, try schedule if the node
+    // has any available or killable resource
+    if (calculator.computeAvailableContainers(Resources
+            .add(node.getUnallocatedResource(), node.getTotalKillableResources()),
+        minimumAllocation) <= 0) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("This node or this node partition doesn't have available or"
+            + "killable resource");
+      }
+      return null;
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(
+          "Trying to schedule on node: " + node.getNodeName() + ", available: "
+              + node.getUnallocatedResource());
+    }
+
+    return allocateOrReserveNewContainers(ps, withNodeHeartbeat);
+  }
+
+  private CSAssignment allocateOrReserveNewContainers(
+      PlacementSet<FiCaSchedulerNode> ps, boolean withNodeHeartbeat) {
+    CSAssignment assignment = root.assignContainers(getClusterResource(), ps,
+        new ResourceLimits(labelManager
+            .getResourceByLabel(ps.getPartition(), getClusterResource())),
+        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+
+    assignment.setSchedulingMode(SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    submitResourceCommitRequest(getClusterResource(), assignment);
+
+    if (Resources.greaterThan(calculator, getClusterResource(),
+        assignment.getResource(), Resources.none())) {
+      if (withNodeHeartbeat) {
+        updateSchedulerHealth(lastNodeUpdateTime,
+            PlacementSetUtils.getSingleNode(ps).getNodeID(), assignment);
+      }
+      return assignment;
+    }
+
+    // Only do non-exclusive allocation when node has node-labels.
+    if (StringUtils.equals(ps.getPartition(), RMNodeLabelsManager.NO_LABEL)) {
+      return null;
+    }
+
+    // Only do non-exclusive allocation when the node-label supports that
+    try {
+      if (rmContext.getNodeLabelManager().isExclusiveNodeLabel(
+          ps.getPartition())) {
+        return null;
+      }
+    } catch (IOException e) {
+      LOG.warn("Exception when trying to get exclusivity of node label=" + ps
+          .getPartition(), e);
+      return null;
+    }
+
+    // Try to use NON_EXCLUSIVE
+    assignment = root.assignContainers(getClusterResource(), ps,
+        // TODO, now we only consider limits for parent for non-labeled
+        // resources, should consider labeled resources as well.
+        new ResourceLimits(labelManager
+            .getResourceByLabel(RMNodeLabelsManager.NO_LABEL,
+                getClusterResource())),
+        SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY);
+    assignment.setSchedulingMode(SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY);
+    submitResourceCommitRequest(getClusterResource(), assignment);
+
+    return assignment;
+  }
+
+  /*
+   * New behavior, allocate containers considering multiple nodes
+   */
+  private CSAssignment allocateContainersOnMultiNodes(
+      PlacementSet<FiCaSchedulerNode> ps) {
+    // When this time look at multiple nodes, try schedule if the
+    // partition has any available resource or killable resource
+    if (root.getQueueCapacities().getUsedCapacity(ps.getPartition()) >= 1.0f
+        && preemptionManager.getKillableResource(
+        CapacitySchedulerConfiguration.ROOT, ps.getPartition()) == Resources
+        .none()) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("This node or this node partition doesn't have available or"
+            + "killable resource");
+      }
+      return null;
+    }
+
+    return allocateOrReserveNewContainers(ps, false);
+  }
+
+  @VisibleForTesting
+  CSAssignment allocateContainersToNode(PlacementSet<FiCaSchedulerNode> ps,
+      boolean withNodeHeartbeat) {
+    if (rmContext.isWorkPreservingRecoveryEnabled() && !rmContext
+        .isSchedulerReadyForAllocatingContainers()) {
+      return null;
+    }
+
+    // Backward compatible way to make sure previous behavior which allocation
+    // driven by node heartbeat works.
+    FiCaSchedulerNode node = PlacementSetUtils.getSingleNode(ps);
+
+    // We have two different logics to handle allocation on single node / multi
+    // nodes.
+    if (null != node) {
+      return allocateContainerOnSingleNode(ps, node, withNodeHeartbeat);
+    } else {
+      return allocateContainersOnMultiNodes(ps);
     }
   }
 
@@ -1357,7 +1575,7 @@ public class CapacityScheduler extends
     break;
     case NODE_RESOURCE_UPDATE:
     {
-      NodeResourceUpdateSchedulerEvent nodeResourceUpdatedEvent = 
+      NodeResourceUpdateSchedulerEvent nodeResourceUpdatedEvent =
           (NodeResourceUpdateSchedulerEvent)event;
       updateNodeAndQueueResource(nodeResourceUpdatedEvent.getRMNode(),
         nodeResourceUpdatedEvent.getResourceOption());
@@ -1367,7 +1585,7 @@ public class CapacityScheduler extends
     {
       NodeLabelsUpdateSchedulerEvent labelUpdateEvent =
           (NodeLabelsUpdateSchedulerEvent) event;
-
+      
       updateNodeLabelsAndQueueResource(labelUpdateEvent);
     }
     break;
@@ -1421,7 +1639,7 @@ public class CapacityScheduler extends
     break;
     case CONTAINER_EXPIRED:
     {
-      ContainerExpiredSchedulerEvent containerExpiredEvent = 
+      ContainerExpiredSchedulerEvent containerExpiredEvent =
           (ContainerExpiredSchedulerEvent) event;
       ContainerId containerId = containerExpiredEvent.getContainerId();
       if (containerExpiredEvent.isIncrease()) {
@@ -1516,7 +1734,9 @@ public class CapacityScheduler extends
               + clusterResource);
 
       if (scheduleAsynchronously && getNumClusterNodes() == 1) {
-        asyncSchedulerThread.beginSchedule();
+        for (AsyncScheduleThread t : asyncSchedulerThreads) {
+          t.beginSchedule();
+        }
       }
     } finally {
       writeLock.unlock();
@@ -1562,7 +1782,9 @@ public class CapacityScheduler extends
       int numNodes = nodeTracker.nodeCount();
 
       if (scheduleAsynchronously && numNodes == 0) {
-        asyncSchedulerThread.suspendSchedule();
+        for (AsyncScheduleThread t : asyncSchedulerThreads) {
+          t.suspendSchedule();
+        }
       }
 
       LOG.info(
@@ -1630,7 +1852,7 @@ public class CapacityScheduler extends
     queue.completedContainer(getClusterResource(), application, node,
         rmContainer, containerStatus, event, null, true);
   }
-  
+
   @Override
   protected void decreaseContainer(SchedContainerChangeRequest decreaseRequest,
       SchedulerApplicationAttempt attempt) {
@@ -1674,7 +1896,7 @@ public class CapacityScheduler extends
   public List<FiCaSchedulerNode> getAllNodes() {
     return nodeTracker.getAllNodes();
   }
-  
+
   @Override
   @Lock(Lock.NoLock.class)
   public void recover(RMState state) throws Exception {
@@ -2086,7 +2308,7 @@ public class CapacityScheduler extends
     }
     return EnumSet.of(SchedulerResourceTypes.MEMORY, SchedulerResourceTypes.CPU);
   }
-  
+
   @Override
   public Resource getMaximumResourceCapability(String queueName) {
     CSQueue queue = getQueue(queueName);
@@ -2224,4 +2446,207 @@ public class CapacityScheduler extends
   public ResourceUsage getClusterResourceUsage() {
     return root.getQueueResourceUsage();
   }
+
+  private SchedulerContainer<FiCaSchedulerApp, FiCaSchedulerNode> getSchedulerContainer(
+      RMContainer rmContainer, boolean allocated) {
+    if (null == rmContainer) {
+      return null;
+    }
+
+    FiCaSchedulerApp app = getApplicationAttempt(
+        rmContainer.getApplicationAttemptId());
+    if (null == app) { return null; }
+
+    NodeId nodeId;
+    // Get nodeId
+    if (rmContainer.getState() == RMContainerState.RESERVED) {
+      nodeId = rmContainer.getReservedNode();
+    } else {
+      nodeId = rmContainer.getNodeId();
+    }
+
+    FiCaSchedulerNode node = getNode(nodeId);
+    if (null == node) {
+      return null;
+    }
+    return new SchedulerContainer<>(app, node, rmContainer,
+        // TODO, node partition should come from CSAssignment to avoid partition
+        // get updated before submitting the commit
+        node.getPartition(), allocated);
+  }
+
+  private List<SchedulerContainer<FiCaSchedulerApp, FiCaSchedulerNode>>
+      getSchedulerContainersToRelease(
+      CSAssignment csAssignment) {
+    List<SchedulerContainer<FiCaSchedulerApp, FiCaSchedulerNode>> list = null;
+
+    if (csAssignment.getContainersToKill() != null && !csAssignment
+        .getContainersToKill().isEmpty()) {
+      list = new ArrayList<>();
+      for (RMContainer rmContainer : csAssignment.getContainersToKill()) {
+        list.add(getSchedulerContainer(rmContainer, false));
+      }
+    }
+
+    if (csAssignment.getExcessReservation() != null) {
+      if (null == list) {
+        list = new ArrayList<>();
+      }
+      list.add(
+          getSchedulerContainer(csAssignment.getExcessReservation(), false));
+    }
+
+    return list;
+  }
+
+  @VisibleForTesting
+  public void submitResourceCommitRequest(Resource cluster,
+      CSAssignment csAssignment) {
+    ResourceCommitRequest<FiCaSchedulerApp, FiCaSchedulerNode> request =
+        createResourceCommitRequest(csAssignment);
+
+    if (null == request) {
+      return;
+    }
+
+    if (scheduleAsynchronously) {
+      // Submit to a commit thread and commit it async-ly
+      resourceCommitterService.addNewCommitRequest(request);
+    } else{
+      // Otherwise do it sync-ly.
+      tryCommit(cluster, request);
+    }
+  }
+
+  @VisibleForTesting
+  public ResourceCommitRequest<FiCaSchedulerApp, FiCaSchedulerNode>
+      createResourceCommitRequest(CSAssignment csAssignment) {
+    ContainerAllocationProposal<FiCaSchedulerApp, FiCaSchedulerNode> allocated =
+        null;
+    ContainerAllocationProposal<FiCaSchedulerApp, FiCaSchedulerNode> reserved =
+        null;
+    List<SchedulerContainer<FiCaSchedulerApp, FiCaSchedulerNode>> released =
+        null;
+
+    if (Resources.greaterThan(calculator, getClusterResource(),
+        csAssignment.getResource(), Resources.none())) {
+      // Allocated something
+      List<AssignmentInformation.AssignmentDetails> allocations =
+          csAssignment.getAssignmentInformation().getAllocationDetails();
+      if (!allocations.isEmpty()) {
+        RMContainer rmContainer = allocations.get(0).rmContainer;
+        allocated = new ContainerAllocationProposal<>(
+            getSchedulerContainer(rmContainer, true),
+            getSchedulerContainersToRelease(csAssignment),
+            getSchedulerContainer(csAssignment.getFulfilledReservedContainer(),
+                false), csAssignment.isIncreasedAllocation(),
+            csAssignment.getType(), csAssignment.getRequestLocalityType(),
+            csAssignment.getSchedulingMode() != null ?
+                csAssignment.getSchedulingMode() :
+                SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY,
+            csAssignment.getResource());
+      }
+
+      // Reserved something
+      List<AssignmentInformation.AssignmentDetails> reservation =
+          csAssignment.getAssignmentInformation().getReservationDetails();
+      if (!reservation.isEmpty()) {
+        RMContainer rmContainer = reservation.get(0).rmContainer;
+        reserved = new ContainerAllocationProposal<>(
+            getSchedulerContainer(rmContainer, false),
+            getSchedulerContainersToRelease(csAssignment),
+            getSchedulerContainer(csAssignment.getFulfilledReservedContainer(),
+                false), csAssignment.isIncreasedAllocation(),
+            csAssignment.getType(), csAssignment.getRequestLocalityType(),
+            csAssignment.getSchedulingMode() != null ?
+                csAssignment.getSchedulingMode() :
+                SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY,
+            csAssignment.getResource());
+      }
+    }
+
+    // When we don't need to allocate/reserve anything, we can feel free to
+    // kill all to-release containers in the request.
+    if (null == allocated && null == reserved) {
+      released = getSchedulerContainersToRelease(csAssignment);
+    }
+
+    if (null != allocated || null != reserved || (null != released && !released
+        .isEmpty())) {
+      List<ContainerAllocationProposal<FiCaSchedulerApp, FiCaSchedulerNode>>
+          allocationsList = null;
+      if (allocated != null) {
+        allocationsList = new ArrayList<>();
+        allocationsList.add(allocated);
+      }
+
+      List<ContainerAllocationProposal<FiCaSchedulerApp, FiCaSchedulerNode>>
+          reservationsList = null;
+      if (reserved != null) {
+        reservationsList = new ArrayList<>();
+        reservationsList.add(reserved);
+      }
+
+      return new ResourceCommitRequest<>(allocationsList, reservationsList,
+          released);
+    }
+
+    return null;
+  }
+
+  @Override
+  public void tryCommit(Resource cluster, ResourceCommitRequest r) {
+    ResourceCommitRequest<FiCaSchedulerApp, FiCaSchedulerNode> request =
+        (ResourceCommitRequest<FiCaSchedulerApp, FiCaSchedulerNode>) r;
+
+    ApplicationAttemptId attemptId = null;
+
+    // We need to update unconfirmed allocated resource of application when
+    // any container allocated.
+    boolean updateUnconfirmedAllocatedResource =
+        request.getContainersToAllocate() != null && !request
+            .getContainersToAllocate().isEmpty();
+
+    // find the application to accept and apply the ResourceCommitRequest
+    if (request.anythingAllocatedOrReserved()) {
+      ContainerAllocationProposal<FiCaSchedulerApp, FiCaSchedulerNode> c =
+          request.getFirstAllocatedOrReservedContainer();
+      attemptId =
+          c.getAllocatedOrReservedContainer().getSchedulerApplicationAttempt()
+              .getApplicationAttemptId();
+    } else {
+      if (!request.getContainersToRelease().isEmpty()) {
+        attemptId = request.getContainersToRelease().get(0)
+            .getSchedulerApplicationAttempt().getApplicationAttemptId();
+      }
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Try to commit allocation proposal=" + request);
+    }
+
+    if (attemptId != null) {
+      FiCaSchedulerApp app = getApplicationAttempt(attemptId);
+      if (app != null) {
+        if (app.accept(cluster, request)) {
+          app.apply(cluster, request);
+          LOG.info("Allocation proposal accepted");
+        } else{
+          LOG.info("Failed to accept allocation proposal");
+        }
+
+        // Update unconfirmed allocated resource.
+        if (updateUnconfirmedAllocatedResource) {
+          app.decUnconfirmedRes(request.getTotalAllocatedResource());
+        }
+      }
+    }
+  }
+
+  public int getAsyncSchedulingPendingBacklogs() {
+    if (scheduleAsynchronously) {
+      return resourceCommitterService.getPendingBacklogs();
+    }
+    return 0;
+  }
 }
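
The structural change above is easy to lose in the hunks: instead of one AsyncScheduleThread both producing and committing allocations under the scheduler lock, the patch has N scheduling threads generate proposals while a single ResourceCommitterService drains them from a LinkedBlockingQueue and applies each one under the write lock. Below is a minimal, self-contained sketch of that producer/consumer hand-off; the Proposal type and class names are illustrative stand-ins, not the actual YARN classes (ResourceCommitRequest, CapacityScheduler.tryCommit).

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Illustrative stand-in for ResourceCommitRequest; not a YARN class.
final class Proposal {
  final String description;
  Proposal(String description) { this.description = description; }
}

public class CommitPipelineSketch {
  private final BlockingQueue<Proposal> backlog = new LinkedBlockingQueue<>();
  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

  // Producer side: scheduling threads enqueue proposals without holding the
  // scheduler lock, as submitResourceCommitRequest() does in async mode.
  void submit(Proposal p) {
    backlog.add(p);
  }

  // Consumer side: one committer thread drains the queue and applies each
  // proposal under the write lock, as ResourceCommitterService.run() does.
  Thread startCommitter() {
    Thread t = new Thread(() -> {
      while (!Thread.currentThread().isInterrupted()) {
        try {
          Proposal p = backlog.take(); // blocks until a proposal arrives
          lock.writeLock().lock();
          try {
            // accept/apply would happen here (cs.tryCommit in the patch)
            System.out.println("committed: " + p.description);
          } finally {
            lock.writeLock().unlock();
          }
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt(); // exit the loop
        }
      }
    });
    t.setDaemon(true);
    t.start();
    return t;
  }

  public static void main(String[] args) throws InterruptedException {
    CommitPipelineSketch sketch = new CommitPipelineSketch();
    sketch.startCommitter();
    sketch.submit(new Proposal("allocate container_1 on node_1"));
    Thread.sleep(200); // let the daemon committer drain the queue
  }
}

A single consumer keeps accept/apply serialized while proposal generation runs outside the lock; the patch preserves the same safety property by taking cs.writeLock around cs.tryCommit().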

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6cdcab90/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
index cea5aa4..c153c26 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
@@ -209,6 +209,10 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
       SCHEDULE_ASYNCHRONOUSLY_PREFIX + ".enable";
 
   @Private
+  public static final String SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_THREAD =
+      SCHEDULE_ASYNCHRONOUSLY_PREFIX + ".maximum-threads";
+
+  @Private
   public static final boolean DEFAULT_SCHEDULE_ASYNCHRONOUSLY_ENABLE = false;
 
   @Private
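
The key added here composes as SCHEDULE_ASYNCHRONOUSLY_PREFIX + ".maximum-threads". Assuming the prefix expands to yarn.scheduler.capacity.schedule-asynchronously (consistent with the existing ".enable" key), a hypothetical snippet for enabling multi-threaded async scheduling from code, e.g. in a test driver, could look like this; the full key strings are inferred, not quoted from the patch.

import org.apache.hadoop.conf.Configuration;

public class AsyncSchedulingConfigSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Existing switch that turns asynchronous scheduling on.
    conf.setBoolean(
        "yarn.scheduler.capacity.schedule-asynchronously.enable", true);
    // Key added by this patch (assumed full name); initScheduler() reads it
    // with a default of 1 and clamps the value to at least one thread.
    conf.setInt(
        "yarn.scheduler.capacity.schedule-asynchronously.maximum-threads", 4);
    System.out.println("async scheduling threads = " + conf.getInt(
        "yarn.scheduler.capacity.schedule-asynchronously.maximum-threads", 1));
  }
}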

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6cdcab90/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index 83396f3..5afc19f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -63,8 +63,13 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.Activi
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityDiagnosticConstant;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityState;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.KillableContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerAllocationProposal;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.SchedulerContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSetUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FifoOrderingPolicyForPendingApps;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy;
 import org.apache.hadoop.yarn.server.utils.Lock;
@@ -923,54 +928,6 @@ public class LeafQueue extends AbstractCSQueue {
       ApplicationAttemptId applicationAttemptId) {
     return applicationAttemptMap.get(applicationAttemptId);
   }
-  
-  private void handleExcessReservedContainer(Resource clusterResource,
-      CSAssignment assignment, FiCaSchedulerNode node, FiCaSchedulerApp app) {
-    if (assignment.getExcessReservation() != null) {
-      RMContainer excessReservedContainer = assignment.getExcessReservation();
-      
-      if (excessReservedContainer.hasIncreaseReservation()) {
-        unreserveIncreasedContainer(clusterResource,
-            app, node, excessReservedContainer);
-      } else {
-        completedContainer(clusterResource, assignment.getApplication(),
-            scheduler.getNode(excessReservedContainer.getAllocatedNode()),
-            excessReservedContainer,
-            SchedulerUtils.createAbnormalContainerStatus(
-                excessReservedContainer.getContainerId(),
-                SchedulerUtils.UNRESERVED_CONTAINER),
-            RMContainerEventType.RELEASED, null, false);
-      }
-
-      assignment.setExcessReservation(null);
-    }
-  }
-
-  private void killToPreemptContainers(Resource clusterResource,
-      FiCaSchedulerNode node,
-      CSAssignment assignment) {
-    if (assignment.getContainersToKill() != null) {
-      StringBuilder sb = new StringBuilder("Killing containers: [");
-
-      for (RMContainer c : assignment.getContainersToKill()) {
-        FiCaSchedulerApp application = csContext.getApplicationAttempt(
-            c.getApplicationAttemptId());
-        LeafQueue q = application.getCSLeafQueue();
-        q.completedContainer(clusterResource, application, node, c, SchedulerUtils
-                .createPreemptedContainerStatus(c.getContainerId(),
-                    SchedulerUtils.PREEMPTED_CONTAINER), RMContainerEventType.KILL,
-            null, false);
-        sb.append("(container=" + c.getContainerId() + " resource=" + c
-            .getAllocatedResource() + ")");
-      }
-
-      sb.append("] for container=" + assignment.getAssignmentInformation()
-          .getFirstAllocatedOrReservedContainerId() + " resource=" + assignment
-          .getResource());
-      LOG.info(sb.toString());
-
-    }
-  }
 
   private void setPreemptionAllowed(ResourceLimits limits, String nodePartition) {
     // Set preemption-allowed:
@@ -980,174 +937,312 @@ public class LeafQueue extends AbstractCSQueue {
     limits.setIsAllowPreemption(usedCapacity < guaranteedCapacity);
   }
 
+  private CSAssignment allocateFromReservedContainer(
+      Resource clusterResource, PlacementSet<FiCaSchedulerNode> ps,
+      ResourceLimits currentResourceLimits, SchedulingMode schedulingMode) {
+    FiCaSchedulerNode node = PlacementSetUtils.getSingleNode(ps);
+    if (null == node) {
+      return null;
+    }
+
+    RMContainer reservedContainer = node.getReservedContainer();
+    if (reservedContainer != null) {
+      FiCaSchedulerApp application = getApplication(
+          reservedContainer.getApplicationAttemptId());
+
+      if (null != application) {
+        ActivitiesLogger.APP.startAppAllocationRecording(activitiesManager,
+            node.getNodeID(), SystemClock.getInstance().getTime(), application);
+        CSAssignment assignment = application.assignContainers(clusterResource,
+            ps, currentResourceLimits, schedulingMode, reservedContainer);
+        return assignment;
+      }
+    }
+
+    return null;
+  }
+
   @Override
   public CSAssignment assignContainers(Resource clusterResource,
-      FiCaSchedulerNode node, ResourceLimits currentResourceLimits,
-      SchedulingMode schedulingMode) {
-    try {
-      writeLock.lock();
-      updateCurrentResourceLimits(currentResourceLimits, clusterResource);
+      PlacementSet<FiCaSchedulerNode> ps, ResourceLimits currentResourceLimits,
+    SchedulingMode schedulingMode) {
+    updateCurrentResourceLimits(currentResourceLimits, clusterResource);
+    FiCaSchedulerNode node = PlacementSetUtils.getSingleNode(ps);
 
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(
-            "assignContainers: node=" + node.getNodeName() + " #applications="
-                + orderingPolicy.getNumSchedulableEntities());
-      }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("assignContainers: partition=" + ps.getPartition()
+          + " #applications=" + orderingPolicy.getNumSchedulableEntities());
+    }
 
-      setPreemptionAllowed(currentResourceLimits, node.getPartition());
+    setPreemptionAllowed(currentResourceLimits, ps.getPartition());
 
-      // Check for reserved resources
-      RMContainer reservedContainer = node.getReservedContainer();
-      if (reservedContainer != null) {
-        FiCaSchedulerApp application = getApplication(
-            reservedContainer.getApplicationAttemptId());
+    // Check for reserved resources, try to allocate reserved container first.
+    CSAssignment assignment = allocateFromReservedContainer(clusterResource,
+        ps, currentResourceLimits, schedulingMode);
+    if (null != assignment) {
+      return assignment;
+    }
 
-        ActivitiesLogger.APP.startAppAllocationRecording(activitiesManager,
-            node.getNodeID(), SystemClock.getInstance().getTime(), application);
+    // if our queue cannot access this node, just return
+    if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY
+        && !accessibleToPartition(ps.getPartition())) {
+      ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
+          getParent().getQueueName(), getQueueName(), ActivityState.REJECTED,
+          ActivityDiagnosticConstant.NOT_ABLE_TO_ACCESS_PARTITION + ps
+              .getPartition());
+      return CSAssignment.NULL_ASSIGNMENT;
+    }
 
-        CSAssignment assignment = application.assignContainers(clusterResource,
-            node, currentResourceLimits, schedulingMode, reservedContainer);
-        handleExcessReservedContainer(clusterResource, assignment, node,
-            application);
-        killToPreemptContainers(clusterResource, node, assignment);
-        return assignment;
+    // Check if this queue need more resource, simply skip allocation if this
+    // queue doesn't need more resources.
+    if (!hasPendingResourceRequest(ps.getPartition(), clusterResource,
+        schedulingMode)) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Skip this queue=" + getQueuePath()
+            + ", because it doesn't need more resource, schedulingMode="
+            + schedulingMode.name() + " node-partition=" + ps.getPartition());
       }
+      ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
+          getParent().getQueueName(), getQueueName(), ActivityState.SKIPPED,
+          ActivityDiagnosticConstant.QUEUE_DO_NOT_NEED_MORE_RESOURCE);
+      return CSAssignment.NULL_ASSIGNMENT;
+    }
 
-      // if our queue cannot access this node, just return
-      if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY
-          && !accessibleToPartition(node.getPartition())) {
+    for (Iterator<FiCaSchedulerApp> assignmentIterator =
+         orderingPolicy.getAssignmentIterator();
+         assignmentIterator.hasNext(); ) {
+      FiCaSchedulerApp application = assignmentIterator.next();
+
+      ActivitiesLogger.APP.startAppAllocationRecording(activitiesManager,
+          node.getNodeID(), SystemClock.getInstance().getTime(), application);
+
+      // Check queue max-capacity limit
+      if (!super.canAssignToThisQueue(clusterResource, ps.getPartition(),
+          currentResourceLimits, application.getCurrentReservation(),
+          schedulingMode)) {
+        ActivitiesLogger.APP.recordRejectedAppActivityFromLeafQueue(
+            activitiesManager, node, application, application.getPriority(),
+            ActivityDiagnosticConstant.QUEUE_MAX_CAPACITY_LIMIT);
         ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
-            getParent().getQueueName(), getQueueName(), ActivityState.REJECTED,
-            ActivityDiagnosticConstant.NOT_ABLE_TO_ACCESS_PARTITION + node
-                .getPartition());
+            getParent().getQueueName(), getQueueName(), ActivityState.SKIPPED,
+            ActivityDiagnosticConstant.EMPTY);
         return CSAssignment.NULL_ASSIGNMENT;
       }
 
-      // Check if this queue need more resource, simply skip allocation if this
-      // queue doesn't need more resources.
-      if (!hasPendingResourceRequest(node.getPartition(), clusterResource,
-          schedulingMode)) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Skip this queue=" + getQueuePath()
-              + ", because it doesn't need more resource, schedulingMode="
-              + schedulingMode.name() + " node-partition=" + node
-              .getPartition());
-        }
+      Resource userLimit = computeUserLimitAndSetHeadroom(application,
+          clusterResource, ps.getPartition(), schedulingMode);
+
+      // Check user limit
+      if (!canAssignToUser(clusterResource, application.getUser(), userLimit,
+          application, ps.getPartition(), currentResourceLimits)) {
+        application.updateAMContainerDiagnostics(AMState.ACTIVATED,
+            "User capacity has reached its maximum limit.");
+        ActivitiesLogger.APP.recordRejectedAppActivityFromLeafQueue(
+            activitiesManager, node, application, application.getPriority(),
+            ActivityDiagnosticConstant.USER_CAPACITY_MAXIMUM_LIMIT);
+        continue;
+      }
+
+      // Try to schedule
+      assignment = application.assignContainers(clusterResource,
+          ps, currentResourceLimits, schedulingMode, null);
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("post-assignContainers for application " + application
+            .getApplicationId());
+        application.showRequests();
+      }
+
+      // Did we schedule or reserve a container?
+      Resource assigned = assignment.getResource();
+
+      if (Resources.greaterThan(resourceCalculator, clusterResource, assigned,
+          Resources.none())) {
+        ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
+            getParent().getQueueName(), getQueueName(),
+            ActivityState.ACCEPTED, ActivityDiagnosticConstant.EMPTY);
+        return assignment;
+      } else if (assignment.getSkippedType()
+          == CSAssignment.SkippedType.OTHER) {
+        ActivitiesLogger.APP.finishSkippedAppAllocationRecording(
+            activitiesManager, application.getApplicationId(),
+            ActivityState.SKIPPED, ActivityDiagnosticConstant.EMPTY);
+        application.updateNodeInfoForAMDiagnostics(node);
+      } else if (assignment.getSkippedType()
+          == CSAssignment.SkippedType.QUEUE_LIMIT) {
+        return assignment;
+      } else {
+        // If we don't allocate anything, and it is not skipped by application,
+        // we will return to respect FIFO of applications
         ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
             getParent().getQueueName(), getQueueName(), ActivityState.SKIPPED,
-            ActivityDiagnosticConstant.QUEUE_DO_NOT_NEED_MORE_RESOURCE);
+            ActivityDiagnosticConstant.RESPECT_FIFO);
+        ActivitiesLogger.APP.finishSkippedAppAllocationRecording(
+            activitiesManager, application.getApplicationId(),
+            ActivityState.SKIPPED, ActivityDiagnosticConstant.EMPTY);
         return CSAssignment.NULL_ASSIGNMENT;
       }
+    }
+    ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
+        getParent().getQueueName(), getQueueName(), ActivityState.SKIPPED,
+        ActivityDiagnosticConstant.EMPTY);
 
-      for (Iterator<FiCaSchedulerApp> assignmentIterator =
-           orderingPolicy.getAssignmentIterator();
-           assignmentIterator.hasNext(); ) {
-        FiCaSchedulerApp application = assignmentIterator.next();
-
-        ActivitiesLogger.APP.startAppAllocationRecording(activitiesManager,
-            node.getNodeID(), SystemClock.getInstance().getTime(), application);
+    return CSAssignment.NULL_ASSIGNMENT;
+  }
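
For readers following the refactor: assignContainers() now takes a PlacementSet rather than a single node and walks applications in ordering-policy order, returning on the first allocation or reservation. A minimal, self-contained sketch of that loop shape follows; every type and method in it is an illustrative stand-in, not a YARN class:

    import java.util.Iterator;

    public class FirstFitSketch {
      enum Outcome { ASSIGNED, SKIP_APP, SKIP_QUEUE }

      interface App {
        boolean withinUserLimit();  // stand-in for canAssignToUser(...)
        Outcome tryAssign();        // stand-in for application.assignContainers(...)
      }

      // Walk apps in policy order; stop at the first assignment, or when a
      // failure must stall the whole queue to respect FIFO.
      static Outcome assignContainers(Iterator<App> orderedApps) {
        while (orderedApps.hasNext()) {
          App app = orderedApps.next();
          if (!app.withinUserLimit()) {
            continue;                // over user limit: try the next app
          }
          Outcome o = app.tryAssign();
          if (o != Outcome.SKIP_APP) {
            return o;                // assigned, or queue-level skip: stop here
          }
          // SKIP_APP: fall through to the next application
        }
        return Outcome.SKIP_QUEUE;   // nothing assigned on this heartbeat
      }
    }
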
 
-        // Check queue max-capacity limit
-        if (!super.canAssignToThisQueue(clusterResource, node.getPartition(),
-            currentResourceLimits, application.getCurrentReservation(),
-            schedulingMode)) {
-          ActivitiesLogger.APP.recordRejectedAppActivityFromLeafQueue(
-              activitiesManager, node, application, application.getPriority(),
-              ActivityDiagnosticConstant.QUEUE_MAX_CAPACITY_LIMIT);
-          ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
-              getParent().getQueueName(), getQueueName(), ActivityState.SKIPPED,
-              ActivityDiagnosticConstant.EMPTY);
-          return CSAssignment.NULL_ASSIGNMENT;
+  @Override
+  public boolean accept(Resource cluster,
+      ResourceCommitRequest<FiCaSchedulerApp, FiCaSchedulerNode> request) {
+    ContainerAllocationProposal<FiCaSchedulerApp, FiCaSchedulerNode> allocation =
+        request.getFirstAllocatedOrReservedContainer();
+    SchedulerContainer<FiCaSchedulerApp, FiCaSchedulerNode> schedulerContainer =
+        allocation.getAllocatedOrReservedContainer();
+
+    // Do not check limits when allocating from a reserved container
+    if (allocation.getAllocateFromReservedContainer() == null) {
+      try {
+        readLock.lock();
+        FiCaSchedulerApp app =
+            schedulerContainer.getSchedulerApplicationAttempt();
+        String username = app.getUser();
+        String p = schedulerContainer.getNodePartition();
+
+        // check user-limit
+        Resource userLimit = computeUserLimitAndSetHeadroom(app, cluster, p,
+            allocation.getSchedulingMode());
+
+        // Deduct resources that we can release
+        Resource usedResource = Resources.clone(getUser(username).getUsed(p));
+        Resources.subtractFrom(usedResource,
+            request.getTotalReleasedResource());
+
+        if (Resources.greaterThan(resourceCalculator, cluster, usedResource,
+            userLimit)) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Used resource=" + usedResource + " exceeded user-limit="
+                + userLimit);
+          }
+          return false;
         }
+      } finally {
+        readLock.unlock();
+      }
+    }
 
-        Resource userLimit = computeUserLimitAndSetHeadroom(application,
-            clusterResource, node.getPartition(), schedulingMode);
-
-        // Check user limit
-        if (!canAssignToUser(clusterResource, application.getUser(), userLimit,
-            application, node.getPartition(), currentResourceLimits)) {
-          application.updateAMContainerDiagnostics(AMState.ACTIVATED,
-              "User capacity has reached its maximum limit.");
-          ActivitiesLogger.APP.recordRejectedAppActivityFromLeafQueue(
-              activitiesManager, node, application, application.getPriority(),
-              ActivityDiagnosticConstant.USER_CAPACITY_MAXIMUM_LIMIT);
-          continue;
-        }
+    return super.accept(cluster, request);
+  }
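
The accept()/apply() pair is the core of this change: a scheduling thread builds a ResourceCommitRequest optimistically, accept() re-validates it against current limits under the read lock, and apply() (further below) commits it under the write lock. A stripped-down sketch of that try-then-commit pattern, using invented names and abstract resource units instead of the real YARN types:

    import java.util.concurrent.locks.ReentrantReadWriteLock;

    class TwoPhaseCommitSketch {
      private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
      private long used = 0;            // committed usage, abstract units
      private final long limit = 100;   // hypothetical user/queue limit

      /** Phase 1: validate the proposal against current state; no mutation. */
      boolean accept(long proposed) {
        lock.readLock().lock();
        try {
          return used + proposed <= limit;
        } finally {
          lock.readLock().unlock();
        }
      }

      /** Phase 2: commit an accepted proposal. */
      void apply(long proposed) {
        lock.writeLock().lock();
        try {
          used += proposed;
        } finally {
          lock.writeLock().unlock();
        }
      }
    }

A usage note on the sketch: because the check and the commit run under separate lock acquisitions, accept() is advisory only; a proposal that passed it can still race with a concurrent commit, which is the usual trade-off in optimistic concurrency.
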
 
-        // Try to schedule
-        CSAssignment assignment = application.assignContainers(clusterResource,
-            node, currentResourceLimits, schedulingMode, null);
+  private void internalReleaseContainer(Resource clusterResource,
+      SchedulerContainer<FiCaSchedulerApp, FiCaSchedulerNode> schedulerContainer) {
+    RMContainer rmContainer = schedulerContainer.getRmContainer();
 
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("post-assignContainers for application " + application
-              .getApplicationId());
-          application.showRequests();
+    LeafQueue targetLeafQueue =
+        schedulerContainer.getSchedulerApplicationAttempt().getCSLeafQueue();
+
+    if (targetLeafQueue == this) {
+      // When trying to preempt containers from the same queue
+      if (rmContainer.hasIncreaseReservation()) {
+        // Increased container reservation
+        unreserveIncreasedContainer(clusterResource,
+            schedulerContainer.getSchedulerApplicationAttempt(),
+            schedulerContainer.getSchedulerNode(), rmContainer);
+      } else if (rmContainer.getState() == RMContainerState.RESERVED) {
+        // For other reserved containers
+        // This is a reservation exchange, complete previous reserved container
+        completedContainer(clusterResource,
+            schedulerContainer.getSchedulerApplicationAttempt(),
+            schedulerContainer.getSchedulerNode(), rmContainer, SchedulerUtils
+                .createAbnormalContainerStatus(rmContainer.getContainerId(),
+                    SchedulerUtils.UNRESERVED_CONTAINER),
+            RMContainerEventType.RELEASED, null, false);
+      }
+    } else {
+      // When trying to preempt containers from a different queue -- this
+      // is for the lazy preemption feature (kill the preemption candidate
+      // in the scheduling cycle).
+      targetLeafQueue.completedContainer(clusterResource,
+          schedulerContainer.getSchedulerApplicationAttempt(),
+          schedulerContainer.getSchedulerNode(),
+          schedulerContainer.getRmContainer(), SchedulerUtils
+              .createPreemptedContainerStatus(rmContainer.getContainerId(),
+                  SchedulerUtils.PREEMPTED_CONTAINER),
+          RMContainerEventType.KILL, null, false);
+    }
+  }
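
internalReleaseContainer() above distinguishes three situations: rolling back an increased-container reservation in the same queue, completing (exchanging) an ordinary reservation in the same queue, and killing a container in another queue for lazy preemption. A small hypothetical classifier makes that branching explicit; the class, enum, and method names are illustrative only:

    final class ReleaseClassifierSketch {
      enum Kind { UNRESERVE_INCREASE, RESERVATION_EXCHANGE, PREEMPT_KILL, NONE }

      static Kind classify(boolean sameQueue, boolean hasIncreaseReservation,
          boolean reserved) {
        if (!sameQueue) {
          return Kind.PREEMPT_KILL;          // cross-queue: lazy preemption kill
        }
        if (hasIncreaseReservation) {
          return Kind.UNRESERVE_INCREASE;    // roll back an increase reservation
        }
        if (reserved) {
          return Kind.RESERVATION_EXCHANGE;  // complete the previous reservation
        }
        return Kind.NONE;                    // same queue, nothing to release
      }
    }
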
+
+  private void releaseContainers(Resource clusterResource,
+      ResourceCommitRequest<FiCaSchedulerApp, FiCaSchedulerNode> request) {
+    for (SchedulerContainer<FiCaSchedulerApp, FiCaSchedulerNode> c : request
+        .getContainersToRelease()) {
+      internalReleaseContainer(clusterResource, c);
+    }
+
+    // Handle releases attached to allocation proposals (reservation
+    // exchange, or the lazy preemption case):
+    if (null != request.getContainersToAllocate() && !request
+        .getContainersToAllocate().isEmpty()) {
+      for (ContainerAllocationProposal<FiCaSchedulerApp, FiCaSchedulerNode> context : request
+          .getContainersToAllocate()) {
+        if (null != context.getToRelease()) {
+          for (SchedulerContainer<FiCaSchedulerApp, FiCaSchedulerNode> c : context
+              .getToRelease()) {
+            internalReleaseContainer(clusterResource, c);
+          }
         }
+      }
+    }
+  }
 
-        // Did we schedule or reserve a container?
-        Resource assigned = assignment.getResource();
+  public void apply(Resource cluster,
+      ResourceCommitRequest<FiCaSchedulerApp, FiCaSchedulerNode> request) {
+    // Do we need to call parent queue's apply?
+    boolean applyToParentQueue = false;
 
-        handleExcessReservedContainer(clusterResource, assignment, node,
-            application);
-        killToPreemptContainers(clusterResource, node, assignment);
+    releaseContainers(cluster, request);
 
-        if (Resources.greaterThan(resourceCalculator, clusterResource, assigned,
-            Resources.none())) {
-          // Get reserved or allocated container from application
-          RMContainer reservedOrAllocatedRMContainer =
-              application.getRMContainer(assignment.getAssignmentInformation()
-                  .getFirstAllocatedOrReservedContainerId());
+    try {
+      writeLock.lock();
 
+      if (request.anythingAllocatedOrReserved()) {
+        ContainerAllocationProposal<FiCaSchedulerApp, FiCaSchedulerNode>
+            allocation = request.getFirstAllocatedOrReservedContainer();
+        SchedulerContainer<FiCaSchedulerApp, FiCaSchedulerNode>
+            schedulerContainer = allocation.getAllocatedOrReservedContainer();
+
+        // Do not modify the queue when allocating from a reserved container
+        if (allocation.getAllocateFromReservedContainer() == null) {
+          // Only invoke apply() of the ParentQueue when a new allocation /
+          // reservation happens.
+          applyToParentQueue = true;
           // Book-keeping
           // Note: Update headroom to account for current allocation too...
-          allocateResource(clusterResource, application, assigned,
-              node.getPartition(), reservedOrAllocatedRMContainer,
-              assignment.isIncreasedAllocation());
-
-          // Update reserved metrics
-          Resource reservedRes =
-              assignment.getAssignmentInformation().getReserved();
-          if (reservedRes != null && !reservedRes.equals(Resources.none())) {
-            incReservedResource(node.getPartition(), reservedRes);
-          }
+          allocateResource(cluster,
+              schedulerContainer.getSchedulerApplicationAttempt(),
+              allocation.getAllocatedOrReservedResource(),
+              schedulerContainer.getNodePartition(),
+              schedulerContainer.getRmContainer(),
+              allocation.isIncreasedAllocation());
+          orderingPolicy.containerAllocated(
+              schedulerContainer.getSchedulerApplicationAttempt(),
+              schedulerContainer.getRmContainer());
+        }
 
-          ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
-              getParent().getQueueName(), getQueueName(),
-              ActivityState.ACCEPTED, ActivityDiagnosticConstant.EMPTY);
-
-          // Done
-          return assignment;
-        } else if (assignment.getSkippedType()
-            == CSAssignment.SkippedType.OTHER) {
-          ActivitiesLogger.APP.finishSkippedAppAllocationRecording(
-              activitiesManager, application.getApplicationId(),
-              ActivityState.SKIPPED, ActivityDiagnosticConstant.EMPTY);
-          application.updateNodeInfoForAMDiagnostics(node);
-        } else if (assignment.getSkippedType()
-            == CSAssignment.SkippedType.QUEUE_LIMIT) {
-          return assignment;
-        } else{
-          // If we don't allocate anything, and it is not skipped by application,
-          // we will return to respect FIFO of applications
-          ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
-              getParent().getQueueName(), getQueueName(), ActivityState.SKIPPED,
-              ActivityDiagnosticConstant.RESPECT_FIFO);
-          ActivitiesLogger.APP.finishSkippedAppAllocationRecording(
-              activitiesManager, application.getApplicationId(),
-              ActivityState.SKIPPED, ActivityDiagnosticConstant.EMPTY);
-          return CSAssignment.NULL_ASSIGNMENT;
+        // Update reserved resource
+        if (Resources.greaterThan(resourceCalculator, cluster,
+            request.getTotalReservedResource(), Resources.none())) {
+          incReservedResource(schedulerContainer.getNodePartition(),
+              request.getTotalReservedResource());
         }
       }
-      ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
-          getParent().getQueueName(), getQueueName(), ActivityState.SKIPPED,
-          ActivityDiagnosticConstant.EMPTY);
-
-      return CSAssignment.NULL_ASSIGNMENT;
     } finally {
       writeLock.unlock();
     }
+
+    if (parent != null && applyToParentQueue) {
+      parent.apply(cluster, request);
+    }
   }
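
Note the lock discipline in apply(): the leaf updates its own book-keeping under its own write lock, releases it, and only then calls parent.apply(), so locks are taken child-before-parent one level at a time rather than held across the whole hierarchy. A minimal sketch of that bottom-up propagation, with hypothetical classes standing in for the CSQueue hierarchy:

    import java.util.concurrent.locks.ReentrantLock;

    class QueueSketch {
      private final QueueSketch parent;     // null for the root queue
      private final ReentrantLock lock = new ReentrantLock();
      private long used = 0;                // abstract committed usage

      QueueSketch(QueueSketch parent) { this.parent = parent; }

      void apply(long allocated, boolean newAllocation) {
        lock.lock();
        try {
          if (newAllocation) {
            used += allocated;              // book-keeping for this queue only
          }
        } finally {
          lock.unlock();                    // release before touching the parent
        }
        if (parent != null && newAllocation) {
          parent.apply(allocated, true);    // same pattern, one level up
        }
      }
    }
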
 
+
   protected Resource getHeadroom(User user, Resource queueCurrentLimit,
       Resource clusterResource, FiCaSchedulerApp application) {
     return getHeadroom(user, queueCurrentLimit, clusterResource, application,

