YARN-5716. Add global scheduler interface definition and update 
CapacityScheduler to use it. Contributed by Wangda Tan


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/de3b4aac
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/de3b4aac
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/de3b4aac

Branch: refs/heads/trunk
Commit: de3b4aac561258ad242a3c5ed1c919428893fd4c
Parents: acd509d
Author: Jian He <jia...@apache.org>
Authored: Mon Nov 7 10:14:39 2016 -0800
Committer: Jian He <jia...@apache.org>
Committed: Mon Nov 7 10:14:39 2016 -0800

----------------------------------------------------------------------
 .../dev-support/findbugs-exclude.xml            |   9 +
 .../rmcontainer/RMContainer.java                |  13 +
 .../rmcontainer/RMContainerImpl.java            |  78 +-
 .../scheduler/AppSchedulingInfo.java            | 168 +++-
 .../scheduler/SchedulerApplicationAttempt.java  |  73 +-
 .../scheduler/activities/ActivitiesLogger.java  |  17 +-
 .../scheduler/activities/ActivitiesManager.java |   7 +-
 .../scheduler/capacity/AbstractCSQueue.java     |  71 ++
 .../scheduler/capacity/CSAssignment.java        |  33 +
 .../scheduler/capacity/CSQueue.java             |  19 +-
 .../scheduler/capacity/CapacityScheduler.java   | 773 ++++++++++++++-----
 .../CapacitySchedulerConfiguration.java         |   4 +
 .../scheduler/capacity/LeafQueue.java           | 451 ++++++-----
 .../scheduler/capacity/ParentQueue.java         | 428 +++++-----
 .../allocator/AbstractContainerAllocator.java   |  39 +-
 .../capacity/allocator/ContainerAllocation.java |  12 +-
 .../capacity/allocator/ContainerAllocator.java  |  15 +-
 .../allocator/IncreaseContainerAllocator.java   |  89 +--
 .../allocator/RegularContainerAllocator.java    | 215 +++---
 .../scheduler/common/AssignmentInformation.java |  44 +-
 .../common/ContainerAllocationProposal.java     | 111 +++
 .../common/ResourceAllocationCommitter.java     |  29 +
 .../scheduler/common/ResourceCommitRequest.java | 164 ++++
 .../scheduler/common/SchedulerContainer.java    |  80 ++
 .../scheduler/common/fica/FiCaSchedulerApp.java | 624 ++++++++++++---
 .../scheduler/fifo/FifoAppAttempt.java          | 110 +++
 .../scheduler/fifo/FifoScheduler.java           |  55 +-
 .../scheduler/placement/PlacementSet.java       |  65 ++
 .../scheduler/placement/PlacementSetUtils.java  |  36 +
 .../placement/ResourceRequestUpdateResult.java  |  43 ++
 .../placement/SchedulingPlacementSet.java       |  90 +++
 .../scheduler/placement/SimplePlacementSet.java |  70 ++
 .../AbstractComparatorOrderingPolicy.java       |   4 +-
 .../scheduler/policy/FairOrderingPolicy.java    |   3 +-
 .../scheduler/policy/FifoOrderingPolicy.java    |   4 +-
 .../FifoOrderingPolicyForPendingApps.java       |   3 +-
 .../yarn/server/resourcemanager/MockRM.java     |  47 +-
 .../resourcemanager/TestClientRMService.java    |   2 +-
 .../scheduler/TestSchedulerHealth.java          |   6 +-
 .../capacity/TestCapacityScheduler.java         |  56 +-
 .../TestCapacitySchedulerAsyncScheduling.java   | 143 ++++
 .../scheduler/capacity/TestChildQueueOrder.java |  21 +-
 .../capacity/TestContainerAllocation.java       |  45 +-
 .../capacity/TestContainerResizing.java         |  10 +-
 .../scheduler/capacity/TestLeafQueue.java       | 647 +++++++++++-----
 .../scheduler/capacity/TestParentQueue.java     | 209 +++--
 .../scheduler/capacity/TestReservations.java    | 277 +++++--
 .../scheduler/capacity/TestUtils.java           |  26 +
 .../TestRMWebServicesSchedulerActivities.java   |   8 +-
 49 files changed, 4212 insertions(+), 1334 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/de3b4aac/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml 
b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
index 01b1da7..ab36a4e 100644
--- a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
+++ b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
@@ -574,4 +574,13 @@
     </Or>
     <Bug pattern="VO_VOLATILE_INCREMENT" />
   </Match>
+
+
+  <!-- Ignore false alert for UL_UNRELEASED_LOCK_EXCEPTION_PATH -->
+  <Match>
+    <Class name="org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler$ResourceCommitterService"/>
+    <Method name="run" />
+    <Bug pattern="UL_UNRELEASED_LOCK_EXCEPTION_PATH" />
+  </Match>
+
 </FindBugsFilter>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de3b4aac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java
index e5d1208..a244ad8 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerReport;
 import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.ExecutionType;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
@@ -46,6 +47,8 @@ public interface RMContainer extends 
EventHandler<RMContainerEvent> {
 
   ContainerId getContainerId();
 
+  void setContainerId(ContainerId containerId);
+
   ApplicationAttemptId getApplicationAttemptId();
 
   RMContainerState getState();
@@ -105,4 +108,14 @@ public interface RMContainer extends 
EventHandler<RMContainerEvent> {
    * @return If the container was allocated remotely.
    */
   boolean isRemotelyAllocated();
+
+  /*
+   * Return reserved resource for reserved containers, return allocated resource
+   * for other container
+   */
+  Resource getAllocatedOrReservedResource();
+
+  boolean completed();
+
+  NodeId getNodeId();
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de3b4aac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
index 706821e..4294ef0 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
@@ -161,7 +161,6 @@ public class RMContainerImpl implements RMContainer, 
Comparable<RMContainer> {
                                                  RMContainerEvent> stateMachine;
   private final ReadLock readLock;
   private final WriteLock writeLock;
-  private final ContainerId containerId;
   private final ApplicationAttemptId appAttemptId;
   private final NodeId nodeId;
   private final Container container;
@@ -224,7 +223,6 @@ public class RMContainerImpl implements RMContainer, 
Comparable<RMContainer> {
       RMContext rmContext, long creationTime, String nodeLabelExpression,
       boolean isExternallyAllocated) {
     this.stateMachine = stateMachineFactory.make(this);
-    this.containerId = container.getId();
     this.nodeId = nodeId;
     this.container = container;
     this.allocatedSchedulerKey = SchedulerRequestKey.extractFrom(container);
@@ -255,7 +253,7 @@ public class RMContainerImpl implements RMContainer, 
Comparable<RMContainer> {
     // containers. If false, and if this container is marked as the AM, metrics
     // will still be published for this container, but that calculation happens
     // later.
-    if (saveNonAMContainerMetaInfo) {
+    if (saveNonAMContainerMetaInfo && null != container.getId()) {
       rmContext.getSystemMetricsPublisher().containerCreated(
           this, this.creationTime);
     }
@@ -263,7 +261,7 @@ public class RMContainerImpl implements RMContainer, 
Comparable<RMContainer> {
 
   @Override
   public ContainerId getContainerId() {
-    return this.containerId;
+    return this.container.getId();
   }
 
   @Override
@@ -356,8 +354,8 @@ public class RMContainerImpl implements RMContainer, 
Comparable<RMContainer> {
   public String getDiagnosticsInfo() {
     try {
       readLock.lock();
-      if (getFinishedStatus() != null) {
-        return getFinishedStatus().getDiagnostics();
+      if (finishedStatus != null) {
+        return finishedStatus.getDiagnostics();
       } else {
         return null;
       }
@@ -374,7 +372,7 @@ public class RMContainerImpl implements RMContainer, 
Comparable<RMContainer> {
       logURL.append(WebAppUtils.getHttpSchemePrefix(rmContext
           .getYarnConfiguration()));
       logURL.append(WebAppUtils.getRunningLogURL(
-          container.getNodeHttpAddress(), containerId.toString(),
+          container.getNodeHttpAddress(), getContainerId().toString(),
           user));
       return logURL.toString();
     } finally {
@@ -386,8 +384,8 @@ public class RMContainerImpl implements RMContainer, 
Comparable<RMContainer> {
   public int getContainerExitStatus() {
     try {
       readLock.lock();
-      if (getFinishedStatus() != null) {
-        return getFinishedStatus().getExitStatus();
+      if (finishedStatus != null) {
+        return finishedStatus.getExitStatus();
       } else {
         return 0;
       }
@@ -400,8 +398,8 @@ public class RMContainerImpl implements RMContainer, 
Comparable<RMContainer> {
   public ContainerState getContainerState() {
     try {
       readLock.lock();
-      if (getFinishedStatus() != null) {
-        return getFinishedStatus().getState();
+      if (finishedStatus != null) {
+        return finishedStatus.getState();
       } else {
         return ContainerState.RUNNING;
       }
@@ -431,7 +429,7 @@ public class RMContainerImpl implements RMContainer, 
Comparable<RMContainer> {
 
   @Override
   public String toString() {
-    return containerId.toString();
+    return getContainerId().toString();
   }
   
   @Override
@@ -476,7 +474,7 @@ public class RMContainerImpl implements RMContainer, 
Comparable<RMContainer> {
       } catch (InvalidStateTransitionException e) {
         LOG.error("Can't handle this event at current state", e);
         LOG.error("Invalid event " + event.getType() + 
-            " on container " + this.containerId);
+            " on container " + this.getContainerId());
       }
       if (oldState != getState()) {
         LOG.info(event.getContainerId() + " Container Transitioned from "
@@ -489,10 +487,15 @@ public class RMContainerImpl implements RMContainer, 
Comparable<RMContainer> {
     }
   }
   
-  public ContainerStatus getFinishedStatus() {
-    return finishedStatus;
+  public boolean completed() {
+    return finishedStatus != null;
   }
-  
+
+  @Override
+  public NodeId getNodeId() {
+    return nodeId;
+  }
+
   private static class BaseTransition implements
       SingleArcTransition<RMContainerImpl, RMContainerEvent> {
 
@@ -517,7 +520,7 @@ public class RMContainerImpl implements RMContainer, 
Comparable<RMContainer> {
               report.getContainerExitStatus());
 
         new FinishedTransition().transition(container,
-          new RMContainerFinishedEvent(container.containerId, status,
+          new RMContainerFinishedEvent(container.getContainerId(), status,
             RMContainerEventType.FINISHED));
         return RMContainerState.COMPLETED;
       } else if (report.getContainerState().equals(ContainerState.RUNNING)) {
@@ -654,11 +657,11 @@ public class RMContainerImpl implements RMContainer, 
Comparable<RMContainer> {
       } else {
         // Something wrong happened, kill the container
         LOG.warn("Something wrong happened, container size reported by NM"
-            + " is not expected, ContainerID=" + container.containerId
+            + " is not expected, ContainerID=" + container.getContainerId()
             + " rm-size-resource:" + rmContainerResource + " nm-size-reosurce:"
             + nmContainerResource);
         container.eventHandler.handle(new RMNodeCleanContainerEvent(
-            container.nodeId, container.containerId));
+            container.nodeId, container.getContainerId()));
 
       }
     }
@@ -761,7 +764,7 @@ public class RMContainerImpl implements RMContainer, 
Comparable<RMContainer> {
 
       // Inform node
       container.eventHandler.handle(new RMNodeCleanContainerEvent(
-          container.nodeId, container.containerId));
+          container.nodeId, container.getContainerId()));
 
       // Inform appAttempt
       super.transition(container, event);
@@ -831,8 +834,8 @@ public class RMContainerImpl implements RMContainer, 
Comparable<RMContainer> {
 
   @Override
   public int compareTo(RMContainer o) {
-    if (containerId != null && o.getContainerId() != null) {
-      return containerId.compareTo(o.getContainerId());
+    if (getContainerId() != null && o.getContainerId() != null) {
+      return getContainerId().compareTo(o.getContainerId());
     }
     return -1;
   }
@@ -865,4 +868,35 @@ public class RMContainerImpl implements RMContainer, 
Comparable<RMContainer> {
   public boolean isRemotelyAllocated() {
     return isExternallyAllocated;
   }
+
+  @Override
+  public Resource getAllocatedOrReservedResource() {
+    try {
+      readLock.lock();
+      if (getState().equals(RMContainerState.RESERVED)) {
+        return getReservedResource();
+      } else {
+        return getAllocatedResource();
+      }
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  @Override
+  public void setContainerId(ContainerId containerId) {
+    // In some cases, for example, global scheduling. It is possible that
+    // container created without container-id assigned, so we will publish
+    // container creation event to timeline service when id assigned.
+    container.setId(containerId);
+
+    // If saveNonAMContainerMetaInfo is true, store system metrics for all
+    // containers. If false, and if this container is marked as the AM, metrics
+    // will still be published for this container, but that calculation happens
+    // later.
+    if (saveNonAMContainerMetaInfo && null != container.getId()) {
+      rmContext.getSystemMetricsPublisher().containerCreated(
+          this, this.creationTime);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de3b4aac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
index 59a6650..ffb1885 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
@@ -22,6 +22,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -32,6 +33,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
+import org.apache.commons.collections.IteratorUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -49,6 +51,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.ResourceRequestUpdateResult;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SchedulingPlacementSet;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
 /**
@@ -691,6 +696,25 @@ public class AppSchedulingInfo {
     }
   }
 
+  public List<ResourceRequest> allocate(NodeType type,
+      SchedulerNode node, SchedulerRequestKey schedulerKey,
+      Container containerAllocated) {
+    try {
+      writeLock.lock();
+      ResourceRequest request;
+      if (type == NodeType.NODE_LOCAL) {
+        request = resourceRequestMap.get(schedulerKey).get(node.getNodeName());
+      } else if (type == NodeType.RACK_LOCAL) {
+        request = resourceRequestMap.get(schedulerKey).get(node.getRackName());
+      } else{
+        request = resourceRequestMap.get(schedulerKey).get(ResourceRequest.ANY);
+      }
+      return allocate(type, node, schedulerKey, request, containerAllocated);
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
   /**
    * Resources have been allocated to this application by the resource
    * scheduler. Track them.
@@ -701,40 +725,26 @@ public class AppSchedulingInfo {
    * @param containerAllocated Container Allocated
    * @return List of ResourceRequests
    */
-  public List<ResourceRequest> allocate(NodeType type, SchedulerNode node,
-      SchedulerRequestKey schedulerKey, ResourceRequest request,
-      Container containerAllocated) {
-    List<ResourceRequest> resourceRequests = new ArrayList<>();
+  public List<ResourceRequest> allocate(NodeType type,
+      SchedulerNode node, SchedulerRequestKey schedulerKey,
+      ResourceRequest request, Container containerAllocated) {
     try {
-      this.writeLock.lock();
+      writeLock.lock();
+      List<ResourceRequest> resourceRequests = new ArrayList<>();
       if (type == NodeType.NODE_LOCAL) {
         allocateNodeLocal(node, schedulerKey, request, resourceRequests);
       } else if (type == NodeType.RACK_LOCAL) {
         allocateRackLocal(node, schedulerKey, request, resourceRequests);
-      } else {
+      } else{
         allocateOffSwitch(request, resourceRequests, schedulerKey);
       }
-      QueueMetrics metrics = queue.getMetrics();
-      if (pending) {
-        // once an allocation is done we assume the application is
-        // running from scheduler's POV.
-        pending = false;
-        metrics.runAppAttempt(applicationId, user);
-      }
 
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("allocate: applicationId=" + applicationId
-            + " container=" + containerAllocated.getId()
-            + " host=" + containerAllocated.getNodeId().toString()
-            + " user=" + user
-            + " resource=" + request.getCapability()
-            + " type=" + type);
+      if (null != containerAllocated) {
+        updateMetricsForAllocatedContainer(request, type, containerAllocated);
       }
-      metrics.allocateResources(user, 1, request.getCapability(), true);
-      metrics.incrNodeTypeAggregations(user, type);
       return resourceRequests;
     } finally {
-      this.writeLock.unlock();
+      writeLock.unlock();
     }
   }
 
@@ -942,4 +952,116 @@ public class AppSchedulingInfo {
             request.getRelaxLocality(), request.getNodeLabelExpression());
     return newRequest;
   }
+
+  /*
+   * In async environment, pending resource request could be updated during
+   * scheduling, this method checks pending request before allocating
+   */
+  public boolean checkAllocation(NodeType type, SchedulerNode node,
+      SchedulerRequestKey schedulerKey) {
+    try {
+      readLock.lock();
+      ResourceRequest r = resourceRequestMap.get(schedulerKey).get(
+          ResourceRequest.ANY);
+      if (r == null || r.getNumContainers() <= 0) {
+        return false;
+      }
+      if (type == NodeType.RACK_LOCAL || type == NodeType.NODE_LOCAL) {
+        r = resourceRequestMap.get(schedulerKey).get(node.getRackName());
+        if (r == null || r.getNumContainers() <= 0) {
+          return false;
+        }
+        if (type == NodeType.NODE_LOCAL) {
+          r = resourceRequestMap.get(schedulerKey).get(node.getNodeName());
+          if (r == null || r.getNumContainers() <= 0) {
+            return false;
+          }
+        }
+      }
+
+      return true;
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  public void updateMetricsForAllocatedContainer(
+      ResourceRequest request, NodeType type, Container containerAllocated) {
+    try {
+      writeLock.lock();
+      QueueMetrics metrics = queue.getMetrics();
+      if (pending) {
+        // once an allocation is done we assume the application is
+        // running from scheduler's POV.
+        pending = false;
+        metrics.runAppAttempt(applicationId, user);
+      }
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("allocate: applicationId=" + applicationId + " container="
+            + containerAllocated.getId() + " host=" + containerAllocated
+            .getNodeId().toString() + " user=" + user + " resource=" + request
+            .getCapability() + " type=" + type);
+      }
+      metrics.allocateResources(user, 1, request.getCapability(), true);
+      metrics.incrNodeTypeAggregations(user, type);
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  // Get placement-set by specified schedulerKey
+  // Now simply return all node of the input clusterPlacementSet
+  // TODO, need update this when we support global scheduling
+  public <N extends SchedulerNode> SchedulingPlacementSet<N> getSchedulingPlacementSet(
+      SchedulerRequestKey schedulerkey) {
+    return new SchedulingPlacementSet<N>() {
+      @Override
+      @SuppressWarnings("unchecked")
+      public Iterator<N> getPreferredNodeIterator(
+          PlacementSet<N> clusterPlacementSet) {
+        return IteratorUtils.singletonIterator(
+            clusterPlacementSet.getAllNodes().values().iterator().next());
+      }
+
+      @Override
+      public ResourceRequestUpdateResult updateResourceRequests(
+          List<ResourceRequest> requests,
+          boolean recoverPreemptedRequestForAContainer) {
+        return null;
+      }
+
+      @Override
+      public Map<String, ResourceRequest> getResourceRequests() {
+        return null;
+      }
+
+      @Override
+      public ResourceRequest getResourceRequest(String resourceName,
+          SchedulerRequestKey requestKey) {
+        return null;
+      }
+
+      @Override
+      public List<ResourceRequest> allocate(NodeType type, SchedulerNode node,
+          ResourceRequest request) {
+        return null;
+      }
+
+      @Override
+      public Map<NodeId, N> getAllNodes() {
+        return null;
+      }
+
+      @Override
+      public long getVersion() {
+        return 0;
+      }
+
+      @Override
+      public String getPartition() {
+        return null;
+      }
+    };
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de3b4aac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
index d148132..bb1d461 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
@@ -28,10 +28,12 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import com.google.common.collect.ConcurrentHashMultiset;
+import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.time.DateUtils;
 import org.apache.commons.lang.time.FastDateFormat;
 import org.apache.commons.logging.Log;
@@ -49,12 +51,14 @@ import org.apache.hadoop.yarn.api.records.ExecutionType;
 import org.apache.hadoop.yarn.api.records.LogAggregationContext;
 import org.apache.hadoop.yarn.api.records.NMToken;
 import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.NodeLabel;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
 import org.apache.hadoop.yarn.server.api.ContainerType;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AggregateAppResourceUsage;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
@@ -69,6 +73,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerStat
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerUpdatesAcquiredEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SchedulingPlacementSet;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.SchedulableEntity;
 
 import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerContext;
@@ -178,6 +185,11 @@ public class SchedulerApplicationAttempt implements 
SchedulableEntity {
   protected ReentrantReadWriteLock.ReadLock readLock;
   protected ReentrantReadWriteLock.WriteLock writeLock;
 
+  // Not confirmed allocation resource, will be used to avoid too many proposal
+  // rejected because of duplicated allocation
+  private AtomicLong unconfirmedAllocatedMem = new AtomicLong();
+  private AtomicInteger unconfirmedAllocatedVcores = new AtomicInteger();
+
   public SchedulerApplicationAttempt(ApplicationAttemptId 
applicationAttemptId, 
       String user, Queue queue, ActiveUsersManager activeUsersManager,
       RMContext rmContext) {
@@ -529,6 +541,8 @@ public class SchedulerApplicationAttempt implements 
SchedulableEntity {
       if (rmContainer == null) {
         rmContainer = new RMContainerImpl(container, getApplicationAttemptId(),
             node.getNodeID(), appSchedulingInfo.getUser(), rmContext);
+      }
+      if (rmContainer.getState() == RMContainerState.NEW) {
         attemptResourceUsage.incReserved(node.getPartition(),
             container.getResource());
         ((RMContainerImpl) rmContainer).setQueueName(this.getQueueName());
@@ -839,16 +853,10 @@ public class SchedulerApplicationAttempt implements 
SchedulableEntity {
   }
 
   // used for continuous scheduling
-  public void resetSchedulingOpportunities(
-      SchedulerRequestKey schedulerKey, long currentTimeMs) {
-    try {
-      writeLock.lock();
-      lastScheduledContainer.put(schedulerKey, currentTimeMs);
-      schedulingOpportunities.setCount(schedulerKey, 0);
-    } finally {
-      writeLock.unlock();
-    }
-
+  public void resetSchedulingOpportunities(SchedulerRequestKey schedulerKey,
+      long currentTimeMs) {
+    lastScheduledContainer.put(schedulerKey, currentTimeMs);
+    schedulingOpportunities.setCount(schedulerKey, 0);
   }
 
   @VisibleForTesting
@@ -998,6 +1006,11 @@ public class SchedulerApplicationAttempt implements 
SchedulableEntity {
 
   public void incNumAllocatedContainers(NodeType containerType,
       NodeType requestType) {
+    if (containerType == null || requestType == null) {
+      // Sanity check
+      return;
+    }
+
     RMAppAttempt attempt =
         rmContext.getRMApps().get(attemptId.getApplicationId())
           .getCurrentAppAttempt();
@@ -1039,9 +1052,27 @@ public class SchedulerApplicationAttempt implements 
SchedulableEntity {
   public boolean hasPendingResourceRequest(ResourceCalculator rc,
       String nodePartition, Resource cluster,
       SchedulingMode schedulingMode) {
-    return SchedulerUtils.hasPendingResourceRequest(rc,
-        this.attemptResourceUsage, nodePartition, cluster,
-        schedulingMode);
+    // We need to consider unconfirmed allocations
+    if (schedulingMode == SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY) {
+      nodePartition = RMNodeLabelsManager.NO_LABEL;
+    }
+
+    Resource pending = attemptResourceUsage.getPending(nodePartition);
+
+    // TODO, need consider node partition here
+    // To avoid too many allocation-proposals rejected for non-default
+    // partition allocation
+    if (StringUtils.equals(nodePartition, RMNodeLabelsManager.NO_LABEL)) {
+      pending = Resources.subtract(pending, Resources
+          .createResource(unconfirmedAllocatedMem.get(),
+              unconfirmedAllocatedVcores.get()));
+    }
+
+    if (Resources.greaterThan(rc, cluster, pending, Resources.none())) {
+      return true;
+    }
+
+    return false;
   }
   
   @VisibleForTesting
@@ -1206,6 +1237,22 @@ public class SchedulerApplicationAttempt implements 
SchedulableEntity {
     this.isAttemptRecovering = isRecovering;
   }
 
+  public <N extends SchedulerNode> SchedulingPlacementSet<N> 
getSchedulingPlacementSet(
+      SchedulerRequestKey schedulerRequestKey) {
+    return appSchedulingInfo.getSchedulingPlacementSet(schedulerRequestKey);
+  }
+
+
+  public void incUnconfirmedRes(Resource res) {
+    unconfirmedAllocatedMem.addAndGet(res.getMemorySize());
+    unconfirmedAllocatedVcores.addAndGet(res.getVirtualCores());
+  }
+
+  public void decUnconfirmedRes(Resource res) {
+    unconfirmedAllocatedMem.addAndGet(-res.getMemorySize());
+    unconfirmedAllocatedVcores.addAndGet(-res.getVirtualCores());
+  }
+
   /**
    * Different state for Application Master, user can see this state from web 
UI
    */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de3b4aac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesLogger.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesLogger.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesLogger.java
index 8fa1bb5..3f8ed55 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesLogger.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesLogger.java
@@ -25,12 +25,14 @@ import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
 
 /**
  * Utility for logging scheduler activities
  */
+// FIXME: make sure PlacementSet works with this class
 public class ActivitiesLogger {
   private static final Log LOG = LogFactory.getLog(ActivitiesLogger.class);
 
@@ -112,7 +114,7 @@ public class ActivitiesLogger {
      */
     public static void recordAppActivityWithAllocation(
         ActivitiesManager activitiesManager, SchedulerNode node,
-        SchedulerApplicationAttempt application, Container updatedContainer,
+        SchedulerApplicationAttempt application, RMContainer updatedContainer,
         ActivityState activityState) {
       if (activitiesManager == null) {
         return;
@@ -122,9 +124,9 @@ public class ActivitiesLogger {
         // Add application-container activity into specific node allocation.
         activitiesManager.addSchedulingActivityForNode(node.getNodeID(),
             application.getApplicationId().toString(),
-            updatedContainer.getId().toString(),
-            updatedContainer.getPriority().toString(), activityState,
-            ActivityDiagnosticConstant.EMPTY, type);
+            updatedContainer.getContainer().toString(),
+            updatedContainer.getContainer().getPriority().toString(),
+            activityState, ActivityDiagnosticConstant.EMPTY, type);
         type = "app";
         // Add queue-application activity into specific node allocation.
         activitiesManager.addSchedulingActivityForNode(node.getNodeID(),
@@ -138,9 +140,10 @@ public class ActivitiesLogger {
           application.getApplicationId())) {
         String type = "container";
         activitiesManager.addSchedulingActivityForApp(
-            application.getApplicationId(), 
updatedContainer.getId().toString(),
-            updatedContainer.getPriority().toString(), activityState,
-            ActivityDiagnosticConstant.EMPTY, type);
+            application.getApplicationId(),
+            updatedContainer.getContainerId(),
+            updatedContainer.getContainer().getPriority().toString(),
+            activityState, ActivityDiagnosticConstant.EMPTY, type);
       }
     }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de3b4aac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesManager.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesManager.java
index 4fa5feb..af73ae3 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesManager.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesManager.java
@@ -200,12 +200,13 @@ public class ActivitiesManager extends AbstractService {
   // Add queue, application or container activity into specific application
   // allocation.
   void addSchedulingActivityForApp(ApplicationId applicationId,
-      String containerId, String priority, ActivityState state,
+      ContainerId containerId, String priority, ActivityState state,
       String diagnostic, String type) {
     if (shouldRecordThisApp(applicationId)) {
       AppAllocation appAllocation = appsAllocation.get(applicationId);
-      appAllocation.addAppAllocationActivity(containerId, priority, state,
-          diagnostic, type);
+      appAllocation.addAppAllocationActivity(containerId == null ?
+          "Container-Id-Not-Assigned" :
+          containerId.toString(), priority, state, diagnostic, type);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de3b4aac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
index 096f5ea..7e18b29 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
@@ -26,6 +26,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -54,6 +55,12 @@ import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerAllocationProposal;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.SchedulerContainer;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SimplePlacementSet;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
@@ -738,4 +745,68 @@ public abstract class AbstractCSQueue implements CSQueue {
     return csContext.getPreemptionManager().getKillableContainers(queueName,
         partition);
   }
+
+  @VisibleForTesting
+  @Override
+  public CSAssignment assignContainers(Resource clusterResource,
+      FiCaSchedulerNode node, ResourceLimits resourceLimits,
+      SchedulingMode schedulingMode) {
+    return assignContainers(clusterResource, new SimplePlacementSet<>(node),
+        resourceLimits, schedulingMode);
+  }
+
+  @Override
+  public boolean accept(Resource cluster,
+      ResourceCommitRequest<FiCaSchedulerApp, FiCaSchedulerNode> request) {
+    // Do we need to check parent queue before making this decision?
+    boolean checkParentQueue = false;
+
+    ContainerAllocationProposal<FiCaSchedulerApp, FiCaSchedulerNode> 
allocation =
+        request.getFirstAllocatedOrReservedContainer();
+    SchedulerContainer<FiCaSchedulerApp, FiCaSchedulerNode> schedulerContainer 
=
+        allocation.getAllocatedOrReservedContainer();
+
+    // Do not check when allocating a new container from a reserved container
+    if (allocation.getAllocateFromReservedContainer() == null) {
+      Resource required = allocation.getAllocatedOrReservedResource();
+      Resource netAllocated = Resources.subtract(required,
+          request.getTotalReleasedResource());
+
+      try {
+        readLock.lock();
+
+        String partition = schedulerContainer.getNodePartition();
+        Resource maxResourceLimit;
+        if (allocation.getSchedulingMode()
+            == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY) {
+          maxResourceLimit = getQueueMaxResource(partition, cluster);
+        } else{
+          maxResourceLimit = labelManager.getResourceByLabel(
+              schedulerContainer.getNodePartition(), cluster);
+        }
+        if (!Resources.fitsIn(resourceCalculator, cluster,
+            Resources.add(queueUsage.getUsed(partition), netAllocated),
+            maxResourceLimit)) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Used resource=" + queueUsage.getUsed(partition)
+                + " exceeded maxResourceLimit of the queue ="
+                + maxResourceLimit);
+          }
+          return false;
+        }
+      } finally {
+        readLock.unlock();
+      }
+
+      // Only check parent queue when something new is allocated or reserved.
+      checkParentQueue = true;
+    }
+
+
+    if (parent != null && checkParentQueue) {
+      return parent.accept(cluster, request);
+    }
+
+    return true;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de3b4aac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSAssignment.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSAssignment.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSAssignment.java
index 7bea9af..2cae9a9 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSAssignment.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSAssignment.java
@@ -38,7 +38,11 @@ public class CSAssignment {
       new CSAssignment(SkippedType.OTHER);
 
   private Resource resource;
+  // Container allocation locality type
   private NodeType type;
+
+  // Pending request locality type
+  private NodeType requestLocalityType;
   private RMContainer excessReservation;
   private FiCaSchedulerApp application;
   private SkippedType skipped;
@@ -57,6 +61,10 @@ public class CSAssignment {
   private boolean increaseAllocation;
   private List<RMContainer> containersToKill;
 
+  // Set when fulfilledReservation = true
+  private RMContainer fulfilledReservedContainer;
+  private SchedulingMode schedulingMode;
+
   public CSAssignment(Resource resource, NodeType type) {
     this(resource, type, null, null, SkippedType.NONE, false);
   }
@@ -173,4 +181,29 @@ public class CSAssignment {
   public List<RMContainer> getContainersToKill() {
     return containersToKill;
   }
+
+  public RMContainer getFulfilledReservedContainer() {
+    return fulfilledReservedContainer;
+  }
+
+  public void setFulfilledReservedContainer(
+      RMContainer fulfilledReservedContainer) {
+    this.fulfilledReservedContainer = fulfilledReservedContainer;
+  }
+
+  public SchedulingMode getSchedulingMode() {
+    return schedulingMode;
+  }
+
+  public void setSchedulingMode(SchedulingMode schedulingMode) {
+    this.schedulingMode = schedulingMode;
+  }
+
+  public NodeType getRequestLocalityType() {
+    return requestLocalityType;
+  }
+
+  public void setRequestLocalityType(NodeType requestLocalityType) {
+    this.requestLocalityType = requestLocalityType;
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de3b4aac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
index daf7790..e5cbd04 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
@@ -23,6 +23,7 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Set;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Stable;
 import org.apache.hadoop.security.AccessControlException;
@@ -42,8 +43,11 @@ import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SimplePlacementSet;
 
 /**
  * <code>CSQueue</code> represents a node in the tree of 
@@ -195,14 +199,14 @@ extends 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue {
   /**
    * Assign containers to applications in the queue or it's children (if any).
    * @param clusterResource the resource of the cluster.
-   * @param node node on which resources are available
+   * @param ps {@link PlacementSet} of nodes on which resources are available
    * @param resourceLimits how much overall resource of this queue can use. 
    * @param schedulingMode Type of exclusive check when assign container on a 
    * NodeManager, see {@link SchedulingMode}.
    * @return the assignment
    */
   public CSAssignment assignContainers(Resource clusterResource,
-      FiCaSchedulerNode node, ResourceLimits resourceLimits,
+      PlacementSet<FiCaSchedulerNode> ps, ResourceLimits resourceLimits,
       SchedulingMode schedulingMode);
   
   /**
@@ -340,4 +344,15 @@ extends 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue {
    * @return valid node labels
    */
   public Set<String> getNodeLabelsForQueue();
+
+  @VisibleForTesting
+  CSAssignment assignContainers(Resource clusterResource,
+      FiCaSchedulerNode node, ResourceLimits resourceLimits,
+      SchedulingMode schedulingMode);
+
+  boolean accept(Resource cluster,
+      ResourceCommitRequest<FiCaSchedulerApp, FiCaSchedulerNode> request);
+
+  void apply(Resource cluster,
+      ResourceCommitRequest<FiCaSchedulerApp, FiCaSchedulerNode> request);
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to