YARN-4511. Common scheduler changes to support scheduler-specific 
oversubscription implementations.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/b237095d
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/b237095d
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/b237095d

Branch: refs/heads/YARN-1011
Commit: b237095dbb5fd880b89c08aa6ad50e7fdf3ef4b9
Parents: da3021d
Author: Haibo Chen <haiboc...@apache.org>
Authored: Thu Nov 2 09:12:19 2017 -0700
Committer: Haibo Chen <haiboc...@apache.org>
Committed: Mon Apr 9 17:07:06 2018 -0700

----------------------------------------------------------------------
 .../hadoop/yarn/sls/nodemanager/NodeInfo.java   |   6 +
 .../yarn/sls/scheduler/RMNodeWrapper.java       |   6 +
 .../resourcemanager/ResourceTrackerService.java |   3 +-
 .../monitor/capacity/TempSchedulerNode.java     |   2 +-
 .../server/resourcemanager/rmnode/RMNode.java   |   7 +
 .../resourcemanager/rmnode/RMNodeImpl.java      |  13 +-
 .../scheduler/AbstractYarnScheduler.java        |   4 +-
 .../scheduler/ClusterNodeTracker.java           |   6 +-
 .../scheduler/SchedulerNode.java                | 317 +++++++++++----
 .../scheduler/SchedulerNodeReport.java          |   4 +-
 .../scheduler/capacity/CapacityScheduler.java   |   2 +-
 .../allocator/RegularContainerAllocator.java    |   4 +-
 .../scheduler/common/fica/FiCaSchedulerApp.java |   2 +-
 .../common/fica/FiCaSchedulerNode.java          |  11 +-
 .../scheduler/fair/FSPreemptionThread.java      |   2 +-
 .../scheduler/fair/FSSchedulerNode.java         |   9 +-
 .../yarn/server/resourcemanager/MockNodes.java  |   6 +
 .../TestWorkPreservingRMRestart.java            |  39 +-
 ...alCapacityPreemptionPolicyMockFramework.java |   2 +-
 ...alCapacityPreemptionPolicyMockFramework.java |   6 +-
 .../scheduler/TestAbstractYarnScheduler.java    |   4 +-
 .../scheduler/TestSchedulerNode.java            | 393 +++++++++++++++++++
 .../capacity/TestCapacityScheduler.java         |   2 +-
 .../TestCapacitySchedulerAsyncScheduling.java   |   8 +-
 .../scheduler/capacity/TestLeafQueue.java       |   4 +-
 .../TestNodeLabelContainerAllocation.java       |  14 +-
 .../fair/TestContinuousScheduling.java          |  42 +-
 .../scheduler/fair/TestFSSchedulerNode.java     |  18 +-
 .../scheduler/fair/TestFairScheduler.java       |  14 +-
 .../scheduler/fifo/TestFifoScheduler.java       |   4 +-
 30 files changed, 787 insertions(+), 167 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/b237095d/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java
index 0c99139..4b9800f 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceUtilization;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
 import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus;
+import org.apache.hadoop.yarn.server.api.records.OverAllocationInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode
@@ -200,6 +201,11 @@ public class NodeInfo {
     }
 
     @Override
+    public OverAllocationInfo getOverAllocationInfo() {
+      return null;
+    }
+
+    @Override
     public long getUntrackedTimeStamp() {
       return 0;
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b237095d/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java
index 78645e9..a652ac8 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceUtilization;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
 import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus;
+import org.apache.hadoop.yarn.server.api.records.OverAllocationInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode
@@ -188,6 +189,11 @@ public class RMNodeWrapper implements RMNode {
   }
 
   @Override
+  public OverAllocationInfo getOverAllocationInfo() {
+    return node.getOverAllocationInfo();
+  }
+
+  @Override
   public long getUntrackedTimeStamp() {
     return 0;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b237095d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
index e997192..00d1169 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
@@ -395,7 +395,8 @@ public class ResourceTrackerService extends AbstractService implements
         .getCurrentKey());
 
     RMNode rmNode = new RMNodeImpl(nodeId, rmContext, host, cmPort, httpPort,
-        resolve(host), capability, nodeManagerVersion, physicalResource);
+        resolve(host), capability, nodeManagerVersion, physicalResource,
+        request.getOverAllocationInfo());
 
     RMNode oldNode = this.rmContext.getRMNodes().putIfAbsent(nodeId, rmNode);
     if (oldNode == null) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b237095d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempSchedulerNode.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempSchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempSchedulerNode.java
index 320f262..25777af 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempSchedulerNode.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempSchedulerNode.java
@@ -51,7 +51,7 @@ public class TempSchedulerNode {
   public static TempSchedulerNode fromSchedulerNode(
       FiCaSchedulerNode schedulerNode) {
     TempSchedulerNode n = new TempSchedulerNode();
-    n.totalResource = Resources.clone(schedulerNode.getTotalResource());
+    n.totalResource = Resources.clone(schedulerNode.getCapacity());
     n.allocatedResource = Resources.clone(schedulerNode.getAllocatedResource());
     n.runningContainers = schedulerNode.getCopiedListOfRunningContainers();
     n.reservedContainer = schedulerNode.getReservedContainer();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b237095d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
index 872f2a6..a858b47 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceUtilization;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
 import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus;
+import org.apache.hadoop.yarn.server.api.records.OverAllocationInfo;
 
 /**
  * Node managers information on available resources 
@@ -115,6 +116,12 @@ public interface RMNode {
   public ResourceUtilization getNodeUtilization();
 
   /**
+   * Get the node overallocation threshold.
+   * @return the overallocation threshold
+   */
+  OverAllocationInfo getOverAllocationInfo();
+
+  /**
    * the physical resources in the node.
    * @return the physical resources in the node.
    */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b237095d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
index b942afa..5a7c008 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
@@ -64,6 +64,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
 import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus;
 import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
+import org.apache.hadoop.yarn.server.api.records.OverAllocationInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.ClusterMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEventType;
@@ -109,6 +110,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
   private final WriteLock writeLock;
 
   private final ConcurrentLinkedQueue<UpdatedContainerInfo> nodeUpdateQueue;
+  private final OverAllocationInfo overallocationInfo;
   private volatile boolean nextHeartBeat = true;
 
   private final NodeId nodeId;
@@ -364,12 +366,13 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
       int cmPort, int httpPort, Node node, Resource capability,
       String nodeManagerVersion) {
     this(nodeId, context, hostName, cmPort, httpPort, node, capability,
-        nodeManagerVersion, null);
+        nodeManagerVersion, null, null);
   }
 
   public RMNodeImpl(NodeId nodeId, RMContext context, String hostName,
       int cmPort, int httpPort, Node node, Resource capability,
-      String nodeManagerVersion, Resource physResource) {
+      String nodeManagerVersion, Resource physResource,
+      OverAllocationInfo overAllocationInfo) {
     this.nodeId = nodeId;
     this.context = context;
     this.hostName = hostName;
@@ -384,6 +387,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
     this.nodeManagerVersion = nodeManagerVersion;
     this.timeStamp = 0;
     this.physicalResource = physResource;
+    this.overallocationInfo = overAllocationInfo;
 
     this.latestNodeHeartBeatResponse.setResponseId(0);
 
@@ -533,6 +537,11 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
     }
   }
 
+  @Override
+  public OverAllocationInfo getOverAllocationInfo() {
+    return this.overallocationInfo;
+  }
+
   public void setNodeUtilization(ResourceUtilization nodeUtilization) {
     this.writeLock.lock();
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b237095d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
index e76287d..4818132 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
@@ -357,7 +357,7 @@ public abstract class AbstractYarnScheduler
       }
 
       application.containerLaunchedOnNode(containerId, node.getNodeID());
-      node.containerStarted(containerId);
+      node.containerLaunched(containerId);
     } finally {
       readLock.unlock();
     }
@@ -823,7 +823,7 @@ public abstract class AbstractYarnScheduler
       writeLock.lock();
       SchedulerNode node = getSchedulerNode(nm.getNodeID());
       Resource newResource = resourceOption.getResource();
-      Resource oldResource = node.getTotalResource();
+      Resource oldResource = node.getCapacity();
       if (!oldResource.equals(newResource)) {
         // Notify NodeLabelsManager about this change
         rmContext.getNodeLabelManager().updateNodeResource(nm.getNodeID(),

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b237095d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java
index 66d8810..3030373 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ClusterNodeTracker.java
@@ -90,7 +90,7 @@ public class ClusterNodeTracker<N extends SchedulerNode> {
       nodesList.add(node);
 
       // Update cluster capacity
-      Resources.addTo(clusterCapacity, node.getTotalResource());
+      Resources.addTo(clusterCapacity, node.getCapacity());
       staleClusterCapacity = Resources.clone(clusterCapacity);
 
       // Update maximumAllocation
@@ -175,7 +175,7 @@ public class ClusterNodeTracker<N extends SchedulerNode> {
       }
 
       // Update cluster capacity
-      Resources.subtractFrom(clusterCapacity, node.getTotalResource());
+      Resources.subtractFrom(clusterCapacity, node.getCapacity());
       staleClusterCapacity = Resources.clone(clusterCapacity);
 
       // Update maximumAllocation
@@ -237,7 +237,7 @@ public class ClusterNodeTracker<N extends SchedulerNode> {
   }
 
   private void updateMaxResources(SchedulerNode node, boolean add) {
-    Resource totalResource = node.getTotalResource();
+    Resource totalResource = node.getCapacity();
     ResourceInformation[] totalResources;
 
     if (totalResource != null) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b237095d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
index d5bfc57..5c8efbe 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceUtilization;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
@@ -58,23 +59,32 @@ public abstract class SchedulerNode {
 
   private static final Log LOG = LogFactory.getLog(SchedulerNode.class);
 
+  private Resource capacity;
   private Resource unallocatedResource = Resource.newInstance(0, 0);
-  private Resource allocatedResource = Resource.newInstance(0, 0);
-  private Resource totalResource;
+
   private RMContainer reservedContainer;
-  private volatile int numContainers;
   private volatile ResourceUtilization containersUtilization =
       ResourceUtilization.newInstance(0, 0, 0f);
   private volatile ResourceUtilization nodeUtilization =
       ResourceUtilization.newInstance(0, 0, 0f);
 
-  /* set of containers that are allocated containers */
-  private final Map<ContainerId, ContainerInfo> launchedContainers =
-      new HashMap<>();
+  private final Map<ContainerId, ContainerInfo>
+      allocatedContainers = new HashMap<>();
+
+  private volatile int numGuaranteedContainers = 0;
+  private Resource allocatedResourceGuaranteed = Resource.newInstance(0, 0);
+
+  private volatile int numOpportunisticContainers = 0;
+  private Resource allocatedResourceOpportunistic = Resource.newInstance(0, 0);
 
   private final RMNode rmNode;
   private final String nodeName;
 
+  // The total amount of resources requested by containers that have been
+  // allocated but not yet launched on the node.
+  protected Resource resourceAllocatedPendingLaunch =
+      Resource.newInstance(0, 0);
+
   private volatile Set<String> labels = null;
 
   // Last updated time
@@ -84,7 +94,7 @@ public abstract class SchedulerNode {
       Set<String> labels) {
     this.rmNode = node;
     this.unallocatedResource = Resources.clone(node.getTotalCapability());
-    this.totalResource = Resources.clone(node.getTotalCapability());
+    this.capacity = Resources.clone(node.getTotalCapability());
     if (usePortForNodeName) {
       nodeName = rmNode.getHostName() + ":" + node.getNodeID().getPort();
     } else {
@@ -107,9 +117,9 @@ public abstract class SchedulerNode {
    * @param resource Total resources on the node.
    */
   public synchronized void updateTotalResource(Resource resource){
-    this.totalResource = resource;
-    this.unallocatedResource = Resources.subtract(totalResource,
-        this.allocatedResource);
+    this.capacity = Resources.clone(resource);
+    this.unallocatedResource = Resources.subtract(capacity,
+        this.allocatedResourceGuaranteed);
   }
 
   /**
@@ -168,17 +178,83 @@ public abstract class SchedulerNode {
   protected synchronized void allocateContainer(RMContainer rmContainer,
       boolean launchedOnNode) {
     Container container = rmContainer.getContainer();
-    if (rmContainer.getExecutionType() == ExecutionType.GUARANTEED) {
-      deductUnallocatedResource(container.getResource());
-      ++numContainers;
+
+    if (container.getExecutionType() == ExecutionType.GUARANTEED) {
+      guaranteedContainerResourceAllocated(rmContainer, launchedOnNode);
+    } else {
+      opportunisticContainerResourceAllocated(rmContainer, launchedOnNode);
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Assigned container " + container.getId() + " of capacity "
+          + container.getResource() + " and type " +
+          container.getExecutionType() + " on host " + toString());
+    }
+  }
+
+  /**
+   * Handle an allocation of a GUARANTEED container.
+   * @param rmContainer the allocated GUARANTEED container
+   * @param launchedOnNode true if the container has been launched
+   */
+  private void guaranteedContainerResourceAllocated(
+      RMContainer rmContainer, boolean launchedOnNode) {
+    Container container = rmContainer.getContainer();
+
+    if (container.getExecutionType() != ExecutionType.GUARANTEED) {
+      throw new YarnRuntimeException("Inapplicable ExecutionType: " +
+          container.getExecutionType());
     }
 
-    launchedContainers.put(container.getId(),
+    allocatedContainers.put(container.getId(),
         new ContainerInfo(rmContainer, launchedOnNode));
+
+    Resource resource = container.getResource();
+    if (containerResourceAllocated(resource, allocatedResourceGuaranteed)) {
+      Resources.subtractFrom(unallocatedResource, resource);
+    }
+
+    numGuaranteedContainers++;
   }
 
   /**
-   * Get unallocated resources on the node.
+   * Handle an allocation of an OPPORTUNISTIC container.
+   * @param rmContainer the allocated OPPORTUNISTIC container
+   * @param launchedOnNode true if the container has been launched
+   */
+  private void opportunisticContainerResourceAllocated(
+      RMContainer rmContainer, boolean launchedOnNode) {
+    Container container = rmContainer.getContainer();
+
+    if (container.getExecutionType() != ExecutionType.OPPORTUNISTIC) {
+      throw new YarnRuntimeException("Inapplicable ExecutionType: " +
+          container.getExecutionType());
+    }
+
+    allocatedContainers.put(rmContainer.getContainerId(),
+        new ContainerInfo(rmContainer, launchedOnNode));
+    if (containerResourceAllocated(
+        container.getResource(), allocatedResourceOpportunistic)) {
+      // nothing to do here
+    }
+    numOpportunisticContainers++;
+  }
+
+  private boolean containerResourceAllocated(Resource allocated,
+      Resource aggregatedResources) {
+    if (allocated == null) {
+      LOG.error("Invalid deduction of null resource for "
+          + rmNode.getNodeAddress());
+      return false;
+    }
+    Resources.addTo(resourceAllocatedPendingLaunch, allocated);
+    Resources.addTo(aggregatedResources, allocated);
+    return true;
+  }
+
+
+  /**
+   * Get resources that are not allocated to GUARANTEED containers on the node.
    * @return Unallocated resources on the node
    */
   public synchronized Resource getUnallocatedResource() {
@@ -186,42 +262,57 @@ public abstract class SchedulerNode {
   }
 
   /**
-   * Get allocated resources on the node.
-   * @return Allocated resources on the node
+   * Get resources allocated to GUARANTEED containers on the node.
+   * @return Allocated resources to GUARANTEED containers on the node
    */
   public synchronized Resource getAllocatedResource() {
-    return this.allocatedResource;
+    return this.allocatedResourceGuaranteed;
+  }
+
+  /**
+   * Get resources allocated to OPPORTUNISTIC containers on the node.
+   * @return Allocated resources to OPPORTUNISTIC containers on the node
+   */
+  public synchronized Resource getOpportunisticResourceAllocated() {
+    return this.allocatedResourceOpportunistic;
+  }
+
+  @VisibleForTesting
+  public synchronized Resource getResourceAllocatedPendingLaunch() {
+    return this.resourceAllocatedPendingLaunch;
   }
 
   /**
    * Get total resources on the node.
    * @return Total resources on the node.
    */
-  public synchronized Resource getTotalResource() {
-    return this.totalResource;
+  public synchronized Resource getCapacity() {
+    return this.capacity;
   }
 
   /**
-   * Check if a container is launched by this node.
+   * Check if a GUARANTEED container is launched by this node.
    * @return If the container is launched by the node.
    */
-  public synchronized boolean isValidContainer(ContainerId containerId) {
-    if (launchedContainers.containsKey(containerId)) {
-      return true;
-    }
-    return false;
+  @VisibleForTesting
+  public synchronized boolean isValidGuaranteedContainer(
+      ContainerId containerId) {
+    ContainerInfo containerInfo = allocatedContainers.get(containerId);
+    return containerInfo != null && ExecutionType.GUARANTEED ==
+        containerInfo.container.getExecutionType();
   }
 
   /**
-   * Update the resources of the node when releasing a container.
-   * @param container Container to release.
+   * Check if an OPPORTUNISTIC container is launched by this node.
+   * @param containerId id of the container to check
+   * @return If the container is launched by the node.
    */
-  protected synchronized void updateResourceForReleasedContainer(
-      Container container) {
-    if (container.getExecutionType() == ExecutionType.GUARANTEED) {
-      addUnallocatedResource(container.getResource());
-      --numContainers;
-    }
+  @VisibleForTesting
+  public synchronized boolean isValidOpportunisticContainer(
+      ContainerId containerId)  {
+    ContainerInfo containerInfo = allocatedContainers.get(containerId);
+    return containerInfo != null && ExecutionType.OPPORTUNISTIC ==
+        containerInfo.container.getExecutionType();
   }
 
   /**
@@ -231,25 +322,34 @@ public abstract class SchedulerNode {
    */
   public synchronized void releaseContainer(ContainerId containerId,
       boolean releasedByNode) {
-    ContainerInfo info = launchedContainers.get(containerId);
-    if (info == null) {
+    RMContainer rmContainer = getContainer(containerId);
+    if (rmContainer == null) {
+      LOG.warn("Invalid container " + containerId + " is released.");
       return;
     }
-    if (!releasedByNode && info.launchedOnNode) {
-      // wait until node reports container has completed
+
+    if (!allocatedContainers.containsKey(containerId)) {
+      // do not process if the container is never allocated on the node
+      return;
+    }
+    if (!releasedByNode &&
+        allocatedContainers.get(containerId).launchedOnNode) {
+      // only process if the container has not been launched on a node
+      // yet or it is released by node.
       return;
     }
 
-    launchedContainers.remove(containerId);
-    Container container = info.container.getContainer();
-    updateResourceForReleasedContainer(container);
+    Container container = rmContainer.getContainer();
+    if (container.getExecutionType() == ExecutionType.GUARANTEED) {
+      guaranteedContainerReleased(container);
+    } else {
+      opportunisticContainerReleased(container);
+    }
 
     if (LOG.isDebugEnabled()) {
-      LOG.debug("Released container " + container.getId() + " of capacity "
-              + container.getResource() + " on host " + rmNode.getNodeAddress()
-              + ", which currently has " + numContainers + " containers, "
-              + getAllocatedResource() + " used and " + getUnallocatedResource()
-              + " available" + ", release resources=" + true);
+      LOG.debug("Released " +  container.getExecutionType() + " container " +
+          containerId + " of " + "capacity " +  container.getResource() +
+          " on node (" + toString() + ")" + ", release resources=" + true);
     }
   }
 
@@ -257,42 +357,75 @@ public abstract class SchedulerNode {
    * Inform the node that a container has launched.
    * @param containerId ID of the launched container
    */
-  public synchronized void containerStarted(ContainerId containerId) {
-    ContainerInfo info = launchedContainers.get(containerId);
-    if (info != null) {
+  public synchronized void containerLaunched(ContainerId containerId) {
+    ContainerInfo info = allocatedContainers.get(containerId);
+    if (info != null && !info.launchedOnNode) {
       info.launchedOnNode = true;
+      Resources.subtractFrom(resourceAllocatedPendingLaunch,
+          info.container.getContainer().getResource());
     }
   }
 
   /**
-   * Add unallocated resources to the node. This is used when unallocating a
-   * container.
-   * @param resource Resources to add.
+   * Handle the release of a GUARANTEED container.
+   * @param container Container to release.
    */
-  private synchronized void addUnallocatedResource(Resource resource) {
-    if (resource == null) {
-      LOG.error("Invalid resource addition of null resource for "
-          + rmNode.getNodeAddress());
-      return;
+  protected synchronized void guaranteedContainerReleased(
+      Container container) {
+    if (container.getExecutionType() != ExecutionType.GUARANTEED) {
+      throw new YarnRuntimeException("Inapplicable ExecutionType: " +
+          container.getExecutionType());
+    }
+
+    if (containerResourceReleased(container, allocatedResourceGuaranteed)) {
+      Resources.addTo(unallocatedResource, container.getResource());
     }
-    Resources.addTo(unallocatedResource, resource);
-    Resources.subtractFrom(allocatedResource, resource);
+    // do not update allocated containers until the resources of
+    // the container are released because we need to check if we
+    // need to update resourceAllocatedPendingLaunch in case the
+    // container has not been launched on the node.
+    allocatedContainers.remove(container.getId());
+    numGuaranteedContainers--;
   }
 
   /**
-   * Deduct unallocated resources from the node. This is used when allocating a
-   * container.
-   * @param resource Resources to deduct.
+   * Handle the release of an OPPORTUNISTIC container.
+   * @param container Container to release.
    */
-  @VisibleForTesting
-  public synchronized void deductUnallocatedResource(Resource resource) {
-    if (resource == null) {
-      LOG.error("Invalid deduction of null resource for "
+  private void opportunisticContainerReleased(
+      Container container) {
+    if (container.getExecutionType() != ExecutionType.OPPORTUNISTIC) {
+      throw new YarnRuntimeException("Inapplicable ExecutionType: " +
+          container.getExecutionType());
+    }
+
+    if (containerResourceReleased(container, allocatedResourceOpportunistic)) {
+      // nothing to do here
+    }
+    // do not update allocated containers until the resources of
+    // the container are released because we need to check if we
+    // need to update resourceAllocatedPendingLaunch in case the
+    // container has not been launched on the node.
+    allocatedContainers.remove(container.getId());
+    numOpportunisticContainers--;
+  }
+
+  private boolean containerResourceReleased(Container container,
+      Resource aggregatedResource) {
+    Resource released = container.getResource();
+    if (released == null) {
+      LOG.error("Invalid resource addition of null resource for "
           + rmNode.getNodeAddress());
-      return;
+      return false;
     }
-    Resources.subtractFrom(unallocatedResource, resource);
-    Resources.addTo(allocatedResource, resource);
+    Resources.subtractFrom(aggregatedResource, released);
+
+    if (!allocatedContainers.get(container.getId()).launchedOnNode) {
+      // update resourceAllocatedPendingLaunch if the container has
+      // not yet been launched on the node
+      Resources.subtractFrom(resourceAllocatedPendingLaunch, released);
+    }
+    return true;
   }
 
   /**
@@ -312,17 +445,28 @@ public abstract class SchedulerNode {
 
   @Override
   public String toString() {
-    return "host: " + rmNode.getNodeAddress() + " #containers="
-        + getNumContainers() + " available=" + getUnallocatedResource()
-        + " used=" + getAllocatedResource();
+    return "host: " + rmNode.getNodeAddress() + " #guaranteed containers=" +
+        getNumGuaranteedContainers() + " #opportunistic containers="  +
+        getNumOpportunisticContainers() + " available=" +
+        getUnallocatedResource() + " used by guaranteed containers=" +
+        allocatedResourceGuaranteed + " used by opportunistic containers=" +
+        allocatedResourceOpportunistic;
+  }
+
+  /**
+   * Get number of active GUARANTEED containers on the node.
+   * @return Number of active GUARANTEED containers on the node.
+   */
+  public int getNumGuaranteedContainers() {
+    return numGuaranteedContainers;
   }
 
   /**
-   * Get number of active containers on the node.
-   * @return Number of active containers on the node.
+   * Get number of active OPPORTUNISTIC containers on the node.
+   * @return Number of active OPPORTUNISTIC containers on the node.
    */
-  public int getNumContainers() {
-    return numContainers;
+  public int getNumOpportunisticContainers() {
+    return numOpportunisticContainers;
   }
 
   /**
@@ -330,8 +474,8 @@ public abstract class SchedulerNode {
    * @return A copy of containers running on the node.
    */
   public synchronized List<RMContainer> getCopiedListOfRunningContainers() {
-    List<RMContainer> result = new ArrayList<>(launchedContainers.size());
-    for (ContainerInfo info : launchedContainers.values()) {
+    List<RMContainer> result = new ArrayList<>(allocatedContainers.size());
+    for (ContainerInfo info : allocatedContainers.values()) {
       result.add(info.container);
     }
     return result;
@@ -341,12 +485,14 @@ public abstract class SchedulerNode {
    * Get the containers running on the node with AM containers at the end.
    * @return A copy of running containers with AM containers at the end.
    */
-  public synchronized List<RMContainer> getRunningContainersWithAMsAtTheEnd() {
+  public synchronized List<RMContainer>
+      getRunningGuaranteedContainersWithAMsAtTheEnd() {
     LinkedList<RMContainer> result = new LinkedList<>();
-    for (ContainerInfo info : launchedContainers.values()) {
+    for (ContainerInfo info : allocatedContainers.values()) {
       if(info.container.isAMContainer()) {
         result.addLast(info.container);
-      } else {
+      } else if (info.container.getExecutionType() ==
+          ExecutionType.GUARANTEED){
         result.addFirst(info.container);
       }
     }
@@ -359,12 +505,9 @@ public abstract class SchedulerNode {
    * @return The container for the specified container ID
    */
   protected synchronized RMContainer getContainer(ContainerId containerId) {
-    RMContainer container = null;
-    ContainerInfo info = launchedContainers.get(containerId);
-    if (info != null) {
-      container = info.container;
-    }
-    return container;
+    ContainerInfo info = allocatedContainers.get(containerId);
+
+    return info != null ? info.container : null;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b237095d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNodeReport.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNodeReport.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNodeReport.java
index fa71a25..ea30d78 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNodeReport.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNodeReport.java
@@ -31,11 +31,11 @@ public class SchedulerNodeReport {
   private final Resource used;
   private final Resource avail;
   private final int num;
-  
+
   public SchedulerNodeReport(SchedulerNode node) {
     this.used = node.getAllocatedResource();
     this.avail = node.getUnallocatedResource();
-    this.num = node.getNumContainers();
+    this.num = node.getNumGuaranteedContainers();
   }
   
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b237095d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index 776e512..c06887e 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -1839,7 +1839,7 @@ public class CapacityScheduler extends
       // update this node to node label manager
       if (labelManager != null) {
         labelManager.activateNode(nodeManager.getNodeID(),
-            schedulerNode.getTotalResource());
+            schedulerNode.getCapacity());
       }
 
       Resource clusterResource = getClusterResource();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b237095d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
index afa468b..3f3ba1e 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
@@ -501,13 +501,13 @@ public class RegularContainerAllocator extends 
AbstractContainerAllocator {
 
     Resource capability = pendingAsk.getPerAllocationResource();
     Resource available = node.getUnallocatedResource();
-    Resource totalResource = node.getTotalResource();
+    Resource totalResource = node.getCapacity();
 
     if (!Resources.lessThanOrEqual(rc, clusterResource,
         capability, totalResource)) {
       LOG.warn("Node : " + node.getNodeID()
           + " does not have sufficient resource for ask : " + pendingAsk
-          + " node total capability : " + node.getTotalResource());
+          + " node total capability : " + node.getCapacity());
       // Skip this locality request
       ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
           activitiesManager, node, application, priority,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b237095d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
index 32b2cad..8b6b946 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
@@ -1007,7 +1007,7 @@ public class FiCaSchedulerApp extends 
SchedulerApplicationAttempt {
       diagnosticMessageBldr.append(" ( Partition : ");
       diagnosticMessageBldr.append(node.getLabels());
       diagnosticMessageBldr.append(", Total resource : ");
-      diagnosticMessageBldr.append(node.getTotalResource());
+      diagnosticMessageBldr.append(node.getCapacity());
       diagnosticMessageBldr.append(", Available resource : ");
       diagnosticMessageBldr.append(node.getUnallocatedResource());
       diagnosticMessageBldr.append(" ).");

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b237095d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java
index 7277779..52165a8 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java
@@ -144,9 +144,9 @@ public class FiCaSchedulerNode extends SchedulerNode {
   }
 
   @Override
-  protected synchronized void updateResourceForReleasedContainer(
+  protected synchronized void guaranteedContainerReleased(
       Container container) {
-    super.updateResourceForReleasedContainer(container);
+    super.guaranteedContainerReleased(container);
     if (killableContainers.containsKey(container.getId())) {
       Resources.subtractFrom(totalKillableResources, container.getResource());
       killableContainers.remove(container.getId());
@@ -168,9 +168,10 @@ public class FiCaSchedulerNode extends SchedulerNode {
     final Container container = rmContainer.getContainer();
     LOG.info("Assigned container " + container.getId() + " of capacity "
           + container.getResource() + " on host " + getRMNode().getNodeAddress()
-          + ", which has " + getNumContainers() + " containers, "
-          + getAllocatedResource() + " used and " + getUnallocatedResource()
-          + " available after allocation");
+          + ", which has " + getNumGuaranteedContainers() + " guaranteed"
+          + " containers using " + getAllocatedResource() + ", "
+          + getNumOpportunisticContainers() + " opportunistic containers"
+          + " using " + getOpportunisticResourceAllocated());
   }
 
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b237095d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java
index c32565f..dcb076f 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java
@@ -193,7 +193,7 @@ class FSPreemptionThread extends Thread {
 
     // Figure out list of containers to consider
     List<RMContainer> containersToCheck =
-        node.getRunningContainersWithAMsAtTheEnd();
+        node.getRunningGuaranteedContainersWithAMsAtTheEnd();
     containersToCheck.removeAll(node.getContainersForPreemption());
 
     // Initialize potential with unallocated but not reserved resources

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b237095d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java
index 44ec9c3..95490f5 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java
@@ -242,11 +242,12 @@ public class FSSchedulerNode extends SchedulerNode {
     super.allocateContainer(rmContainer, launchedOnNode);
     if (LOG.isDebugEnabled()) {
       final Container container = rmContainer.getContainer();
-      LOG.debug("Assigned container " + container.getId() + " of capacity "
+      LOG.info("Assigned container " + container.getId() + " of capacity "
           + container.getResource() + " on host " + getRMNode().getNodeAddress()
-          + ", which has " + getNumContainers() + " containers, "
-          + getAllocatedResource() + " used and " + getUnallocatedResource()
-          + " available after allocation");
+          + ", which has " + getNumGuaranteedContainers() + " guaranteed "
+          + "containers using " + getAllocatedResource() + ", "
+          + getNumOpportunisticContainers() + " opportunistic containers "
+          + "using " + getOpportunisticResourceAllocated());
     }
 
     Resource allocated = rmContainer.getAllocatedResource();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b237095d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
index 84105d9..522d4f4 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
@@ -37,6 +37,7 @@ import 
org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
 import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus;
+import org.apache.hadoop.yarn.server.api.records.OverAllocationInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import 
org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
 
@@ -262,6 +263,11 @@ public class MockNodes {
       return this.nodeUtilization;
     }
 
+    @Override
+    public OverAllocationInfo getOverAllocationInfo() {
+      return null;
+    }
+
     public OpportunisticContainersStatus getOpportunisticContainersStatus() {
       return OpportunisticContainersStatus.newInstance();
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b237095d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java
index e4c83e3..5b49303 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java
@@ -226,13 +226,14 @@ public class TestWorkPreservingRMRestart extends 
ParameterizedSchedulerTestBase
     Resource nmResource =
         Resource.newInstance(nm1.getMemory(), nm1.getvCores());
 
-    assertTrue(schedulerNode1.isValidContainer(amContainer.getContainerId()));
-    assertTrue(schedulerNode1.isValidContainer(runningContainer
-      .getContainerId()));
-    assertFalse(schedulerNode1.isValidContainer(completedContainer
-      .getContainerId()));
+    assertTrue(schedulerNode1.isValidGuaranteedContainer(
+        amContainer.getContainerId()));
+    assertTrue(schedulerNode1.isValidGuaranteedContainer(
+        runningContainer.getContainerId()));
+    assertFalse(schedulerNode1.isValidGuaranteedContainer(
+        completedContainer.getContainerId()));
     // 2 launched containers, 1 completed container
-    assertEquals(2, schedulerNode1.getNumContainers());
+    assertEquals(2, schedulerNode1.getNumGuaranteedContainers());
 
     assertEquals(Resources.subtract(nmResource, usedResources),
       schedulerNode1.getUnallocatedResource());
@@ -383,13 +384,14 @@ public class TestWorkPreservingRMRestart extends 
ParameterizedSchedulerTestBase
     Resource nmResource =
         Resource.newInstance(nm1.getMemory(), nm1.getvCores());
 
-    assertTrue(schedulerNode1.isValidContainer(amContainer.getContainerId()));
-    assertTrue(
-        schedulerNode1.isValidContainer(runningContainer.getContainerId()));
-    assertFalse(
-        schedulerNode1.isValidContainer(completedContainer.getContainerId()));
+    assertTrue(schedulerNode1.isValidGuaranteedContainer(
+        amContainer.getContainerId()));
+    assertTrue(schedulerNode1.isValidGuaranteedContainer(
+        runningContainer.getContainerId()));
+    assertFalse(schedulerNode1.isValidGuaranteedContainer(
+        completedContainer.getContainerId()));
     // 2 launched containers, 1 completed container
-    assertEquals(2, schedulerNode1.getNumContainers());
+    assertEquals(2, schedulerNode1.getNumGuaranteedContainers());
 
     assertEquals(Resources.subtract(nmResource, usedResources),
         schedulerNode1.getUnallocatedResource());
@@ -1650,13 +1652,14 @@ public class TestWorkPreservingRMRestart extends 
ParameterizedSchedulerTestBase
     Resource nmResource = Resource.newInstance(nm1.getMemory(),
         nm1.getvCores());
 
-    assertTrue(schedulerNode1.isValidContainer(amContainer.getContainerId()));
-    assertTrue(
-        schedulerNode1.isValidContainer(runningContainer.getContainerId()));
-    assertFalse(
-        schedulerNode1.isValidContainer(completedContainer.getContainerId()));
+    assertTrue(schedulerNode1.isValidGuaranteedContainer(
+        amContainer.getContainerId()));
+    assertTrue(schedulerNode1.isValidGuaranteedContainer(
+        runningContainer.getContainerId()));
+    assertFalse(schedulerNode1.isValidGuaranteedContainer(
+        completedContainer.getContainerId()));
     // 2 launched containers, 1 completed container
-    assertEquals(2, schedulerNode1.getNumContainers());
+    assertEquals(2, schedulerNode1.getNumGuaranteedContainers());
 
     assertEquals(Resources.subtract(nmResource, usedResources),
         schedulerNode1.getUnallocatedResource());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b237095d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java
index a8e2697..f725b9ef 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java
@@ -480,7 +480,7 @@ public class 
ProportionalCapacityPreemptionPolicyMockFramework {
           totalRes = parseResourceFromString(resSring);
         }
       }
-      when(sn.getTotalResource()).thenReturn(totalRes);
+      when(sn.getCapacity()).thenReturn(totalRes);
       when(sn.getUnallocatedResource()).thenReturn(Resources.clone(totalRes));
 
       // TODO, add settings of killable resources when necessary

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b237095d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyMockFramework.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyMockFramework.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyMockFramework.java
index 964a230..89fb846 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyMockFramework.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyMockFramework.java
@@ -237,17 +237,17 @@ public class 
TestProportionalCapacityPreemptionPolicyMockFramework
     // Check host resources
     Assert.assertEquals(3, this.cs.getAllNodes().size());
     SchedulerNode node1 = cs.getSchedulerNode(NodeId.newInstance("n1", 1));
-    Assert.assertEquals(100, node1.getTotalResource().getMemorySize());
+    Assert.assertEquals(100, node1.getCapacity().getMemorySize());
     Assert.assertEquals(100, node1.getCopiedListOfRunningContainers().size());
     Assert.assertNull(node1.getReservedContainer());
 
     SchedulerNode node2 = cs.getSchedulerNode(NodeId.newInstance("n2", 1));
-    Assert.assertEquals(0, node2.getTotalResource().getMemorySize());
+    Assert.assertEquals(0, node2.getCapacity().getMemorySize());
     Assert.assertEquals(50, node2.getCopiedListOfRunningContainers().size());
     Assert.assertNotNull(node2.getReservedContainer());
 
     SchedulerNode node3 = cs.getSchedulerNode(NodeId.newInstance("n3", 1));
-    Assert.assertEquals(30, node3.getTotalResource().getMemorySize());
+    Assert.assertEquals(30, node3.getCapacity().getMemorySize());
     Assert.assertEquals(100, node3.getCopiedListOfRunningContainers().size());
     Assert.assertNull(node3.getReservedContainer());
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b237095d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java
index 979e68a..ef5a0c6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java
@@ -289,12 +289,12 @@ public class TestAbstractYarnScheduler extends ParameterizedSchedulerTestBase {
       SchedulerNode mockNode1 = mock(SchedulerNode.class);
       when(mockNode1.getNodeID()).thenReturn(NodeId.newInstance("foo", 8080));
       when(mockNode1.getUnallocatedResource()).thenReturn(emptyResource);
-      when(mockNode1.getTotalResource()).thenReturn(fullResource1);
+      when(mockNode1.getCapacity()).thenReturn(fullResource1);
 
       SchedulerNode mockNode2 = mock(SchedulerNode.class);
       when(mockNode1.getNodeID()).thenReturn(NodeId.newInstance("bar", 8081));
       when(mockNode2.getUnallocatedResource()).thenReturn(emptyResource);
-      when(mockNode2.getTotalResource()).thenReturn(fullResource2);
+      when(mockNode2.getCapacity()).thenReturn(fullResource2);
 
       verifyMaximumResourceCapability(configuredMaximumResource, scheduler);
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b237095d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerNode.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerNode.java
new file mode 100644
index 0000000..b04b277
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerNode.java
@@ -0,0 +1,393 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
+import org.apache.hadoop.yarn.server.resourcemanager.metrics.NoOpSystemMetricPublisher;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for SchedulerNode.
+ */
+public class TestSchedulerNode {
+  private final Resource nodeCapacity = Resource.newInstance(1024*10, 4);
+
+  /**
+   * Allocates a GUARANTEED container on a node, launches it and releases it,
+   * checking the node's resource and container accounting after each step.
+   */
+  @Test
+  public void testAllocateAndReleaseGuaranteedContainer() {
+    SchedulerNode schedulerNode = createSchedulerNode(nodeCapacity);
+    Resource resource = Resource.newInstance(4096, 1);
+    RMContainer container = createRMContainer(0, resource,
+        ExecutionType.GUARANTEED, schedulerNode.getRMNode());
+    ContainerId containerId = container.getContainerId();
+
+    // allocate a container on the node
+    schedulerNode.allocateContainer(container);
+
+    Assert.assertEquals("The container should have been allocated",
+        resource, schedulerNode.getAllocatedResource());
+    Assert.assertEquals("Incorrect remaining resource accounted.",
+        Resources.subtract(nodeCapacity, resource),
+        schedulerNode.getUnallocatedResource());
+    Assert.assertEquals("The container should have been allocated" +
+        " but not launched", resource,
+        schedulerNode.getResourceAllocatedPendingLaunch());
+    Assert.assertEquals("The container should have been allocated",
+        1, schedulerNode.getNumGuaranteedContainers());
+    Assert.assertTrue(
+        schedulerNode.isValidGuaranteedContainer(containerId));
+
+    // launch the container on the node
+    schedulerNode.containerLaunched(containerId);
+
+    Assert.assertEquals("The container should have been launched",
+        Resources.none(), schedulerNode.getResourceAllocatedPendingLaunch());
+
+    // release the container
+    schedulerNode.releaseContainer(containerId, true);
+    Assert.assertEquals("The container should have been released",
+        0, schedulerNode.getNumGuaranteedContainers());
+    Assert.assertEquals("The container should have been released",
+        Resources.none(), schedulerNode.getAllocatedResource());
+    // the released container must no longer be reported as valid, matching
+    // the check made by the other release tests in this class
+    Assert.assertFalse("The container should have been released",
+        schedulerNode.isValidGuaranteedContainer(containerId));
+  }
+
+  /**
+   * Drives one OPPORTUNISTIC container through allocation, launch and
+   * release on a node, checking the node's accounting at every stage.
+   */
+  @Test
+  public void testAllocateAndReleaseOpportunisticContainer() {
+    final String allocatedMsg = "The container should have been allocated";
+    final String releasedMsg = "The container should have been released";
+    final Resource none = Resources.none();
+
+    SchedulerNode node = createSchedulerNode(nodeCapacity);
+    Resource requested = Resource.newInstance(4096, 1);
+    RMContainer rmContainer = createRMContainer(0, requested,
+        ExecutionType.OPPORTUNISTIC, node.getRMNode());
+    ContainerId id = rmContainer.getContainerId();
+
+    // Allocation: the test expects the node's unallocated resource to
+    // remain at full capacity after an opportunistic allocation.
+    node.allocateContainer(rmContainer);
+    Assert.assertEquals(allocatedMsg,
+        requested, node.getOpportunisticResourceAllocated());
+    Assert.assertEquals("Incorrect remaining resource accounted.",
+        nodeCapacity, node.getUnallocatedResource());
+    Assert.assertEquals(allocatedMsg + " but not launched",
+        requested, node.getResourceAllocatedPendingLaunch());
+    Assert.assertEquals(allocatedMsg,
+        1, node.getNumOpportunisticContainers());
+    Assert.assertTrue(node.isValidOpportunisticContainer(id));
+
+    // Launch: nothing should be left pending launch afterwards.
+    node.containerLaunched(id);
+    Assert.assertEquals("The container should have been launched",
+        none, node.getResourceAllocatedPendingLaunch());
+
+    // Release: all opportunistic bookkeeping should be cleared.
+    node.releaseContainer(id, true);
+    Assert.assertEquals(releasedMsg,
+        0, node.getNumOpportunisticContainers());
+    Assert.assertEquals(releasedMsg,
+        none, node.getOpportunisticResourceAllocated());
+    Assert.assertFalse(releasedMsg,
+        node.isValidOpportunisticContainer(id));
+  }
+
+  /**
+   * Allocates one GUARANTEED and one OPPORTUNISTIC container on the same
+   * node, launches both and releases both, checking the node's accounting
+   * for each execution type along the way.
+   */
+  @Test
+  public void testAllocateAndReleaseContainers() {
+    SchedulerNode schedulerNode = createSchedulerNode(nodeCapacity);
+
+    Resource guaranteedResource = Resource.newInstance(4096, 1);
+    RMContainer guaranteedContainer =
+        createRMContainer(0, guaranteedResource,
+            ExecutionType.GUARANTEED, schedulerNode.getRMNode());
+    ContainerId guaranteedContainerId = guaranteedContainer.getContainerId();
+
+    // allocate a guaranteed container on the node
+    schedulerNode.allocateContainer(guaranteedContainer);
+
+    Assert.assertEquals("The guaranteed container should have been allocated",
+        guaranteedResource, schedulerNode.getAllocatedResource());
+    Assert.assertEquals("Incorrect remaining resource accounted.",
+        Resources.subtract(nodeCapacity, guaranteedResource),
+        schedulerNode.getUnallocatedResource());
+    Assert.assertEquals("The guaranteed container should have been allocated" +
+        " but not launched", guaranteedResource,
+        schedulerNode.getResourceAllocatedPendingLaunch());
+    Assert.assertEquals("The container should have been allocated",
+        1, schedulerNode.getNumGuaranteedContainers());
+    Assert.assertTrue(
+        schedulerNode.isValidGuaranteedContainer(guaranteedContainerId));
+
+    Resource opportunisticResource = Resource.newInstance(8192, 4);
+    RMContainer opportunisticContainer =
+        createRMContainer(1, opportunisticResource,
+            ExecutionType.OPPORTUNISTIC, schedulerNode.getRMNode());
+    ContainerId opportunisticContainerId =
+        opportunisticContainer.getContainerId();
+
+    // allocate an opportunistic container on the node
+    schedulerNode.allocateContainer(opportunisticContainer);
+
+    Assert.assertEquals("The opportunistic container should have been" +
+        " allocated", opportunisticResource,
+        schedulerNode.getOpportunisticResourceAllocated());
+    // the unallocated resource is expected to reflect only the guaranteed
+    // allocation, not the opportunistic one
+    Assert.assertEquals("Incorrect remaining resource accounted.",
+        Resources.subtract(nodeCapacity, guaranteedResource),
+        schedulerNode.getUnallocatedResource());
+    Assert.assertEquals("The opportunistic container should also have been" +
+        " allocated but not launched",
+        Resources.add(guaranteedResource, opportunisticResource),
+        schedulerNode.getResourceAllocatedPendingLaunch());
+    Assert.assertEquals("The container should have been allocated",
+        1, schedulerNode.getNumOpportunisticContainers());
+    Assert.assertTrue(
+        schedulerNode.isValidOpportunisticContainer(opportunisticContainerId));
+
+    // launch both containers on the node
+    schedulerNode.containerLaunched(guaranteedContainerId);
+    schedulerNode.containerLaunched(opportunisticContainerId);
+
+    Assert.assertEquals("Both containers should have been launched",
+        Resources.none(), schedulerNode.getResourceAllocatedPendingLaunch());
+
+    // release both containers
+    schedulerNode.releaseContainer(guaranteedContainerId, true);
+    schedulerNode.releaseContainer(opportunisticContainerId, true);
+
+    Assert.assertEquals("The guaranteed container should have been released",
+        0, schedulerNode.getNumGuaranteedContainers());
+    Assert.assertEquals("The opportunistic container should have been released",
+        0, schedulerNode.getNumOpportunisticContainers());
+    Assert.assertEquals("The guaranteed container should have been released",
+        Resources.none(), schedulerNode.getAllocatedResource());
+    Assert.assertEquals("The opportunistic container should have been released",
+        Resources.none(), schedulerNode.getOpportunisticResourceAllocated());
+    Assert.assertFalse("The guaranteed container should have been released",
+        schedulerNode.isValidGuaranteedContainer(guaranteedContainerId));
+    Assert.assertFalse("The opportunistic container should have been released",
+        schedulerNode.isValidOpportunisticContainer(opportunisticContainerId));
+  }
+
+  /**
+   * Allocates and launches a GUARANTEED container, then requests its release
+   * with the second argument of {@code releaseContainer} set to false, and
+   * expects the node to keep tracking the container.
+   */
+  @Test
+  public void testReleaseLaunchedContainerNotAsNode() {
+    final String allocatedMsg = "The container should have been allocated";
+    final String keptMsg = "The container should not have been released";
+
+    SchedulerNode node = createSchedulerNode(nodeCapacity);
+    Resource requested = Resource.newInstance(4096, 1);
+    RMContainer rmContainer = createRMContainer(0, requested,
+        ExecutionType.GUARANTEED, node.getRMNode());
+    ContainerId id = rmContainer.getContainerId();
+    Resource expectedUnallocated =
+        Resources.subtract(nodeCapacity, requested);
+
+    // Allocation.
+    node.allocateContainer(rmContainer);
+    Assert.assertEquals(allocatedMsg,
+        requested, node.getAllocatedResource());
+    Assert.assertEquals("Incorrect remaining resource accounted.",
+        expectedUnallocated, node.getUnallocatedResource());
+    Assert.assertEquals(allocatedMsg + " but not launched",
+        requested, node.getResourceAllocatedPendingLaunch());
+    Assert.assertEquals(allocatedMsg,
+        1, node.getNumGuaranteedContainers());
+    Assert.assertTrue(node.isValidGuaranteedContainer(id));
+
+    // Launch: nothing should be left pending launch afterwards.
+    node.containerLaunched(id);
+    Assert.assertEquals("The container should have been launched",
+        Resources.none(), node.getResourceAllocatedPendingLaunch());
+
+    // Release request with the flag set to false: the launched container
+    // is expected to remain accounted for on the node.
+    node.releaseContainer(id, false);
+    Assert.assertEquals(keptMsg,
+        1, node.getNumGuaranteedContainers());
+    Assert.assertEquals(keptMsg,
+        requested, node.getAllocatedResource());
+    Assert.assertTrue(keptMsg,
+        node.isValidGuaranteedContainer(id));
+  }
+
+  /**
+   * Allocates a GUARANTEED container, never launches it, and releases it
+   * with the second argument of {@code releaseContainer} set to true; the
+   * node is expected to drop the container entirely.
+   */
+  @Test
+  public void testReleaseUnlaunchedContainerAsNode() {
+    final String allocatedMsg = "The container should have been allocated";
+    final String releasedMsg = "The container should have been released";
+    final Resource none = Resources.none();
+
+    SchedulerNode node = createSchedulerNode(nodeCapacity);
+    Resource requested = Resource.newInstance(4096, 1);
+    RMContainer rmContainer = createRMContainer(0, requested,
+        ExecutionType.GUARANTEED, node.getRMNode());
+    ContainerId id = rmContainer.getContainerId();
+    Resource expectedUnallocated =
+        Resources.subtract(nodeCapacity, requested);
+
+    // Allocation.
+    node.allocateContainer(rmContainer);
+    Assert.assertEquals(allocatedMsg,
+        requested, node.getAllocatedResource());
+    Assert.assertEquals("Incorrect remaining resource accounted.",
+        expectedUnallocated, node.getUnallocatedResource());
+    Assert.assertEquals(allocatedMsg + " but not launched",
+        requested, node.getResourceAllocatedPendingLaunch());
+    Assert.assertEquals(allocatedMsg,
+        1, node.getNumGuaranteedContainers());
+    Assert.assertTrue(node.isValidGuaranteedContainer(id));
+
+    // Precondition for the release below: still pending launch.
+    Assert.assertEquals("The container should not be launched already",
+        requested, node.getResourceAllocatedPendingLaunch());
+
+    // Release with the flag set to true.
+    node.releaseContainer(id, true);
+    Assert.assertEquals(releasedMsg,
+        0, node.getNumGuaranteedContainers());
+    Assert.assertEquals(releasedMsg,
+        none, node.getAllocatedResource());
+    Assert.assertFalse(releasedMsg,
+        node.isValidGuaranteedContainer(id));
+    Assert.assertEquals(releasedMsg,
+        none, node.getResourceAllocatedPendingLaunch());
+  }
+
+  /**
+   * Allocates a GUARANTEED container, never launches it, and releases it
+   * with the second argument of {@code releaseContainer} set to false; the
+   * node is still expected to drop the container entirely.
+   */
+  @Test
+  public void testReleaseUnlaunchedContainerNotAsNode() {
+    final String allocatedMsg = "The container should have been allocated";
+    final String releasedMsg = "The container should have been released";
+    final Resource none = Resources.none();
+
+    SchedulerNode node = createSchedulerNode(nodeCapacity);
+    Resource requested = Resource.newInstance(4096, 1);
+    RMContainer rmContainer = createRMContainer(0, requested,
+        ExecutionType.GUARANTEED, node.getRMNode());
+    ContainerId id = rmContainer.getContainerId();
+    Resource expectedUnallocated =
+        Resources.subtract(nodeCapacity, requested);
+
+    // Allocation.
+    node.allocateContainer(rmContainer);
+    Assert.assertEquals(allocatedMsg,
+        requested, node.getAllocatedResource());
+    Assert.assertEquals("Incorrect remaining resource accounted.",
+        expectedUnallocated, node.getUnallocatedResource());
+    Assert.assertEquals(allocatedMsg + " but not launched",
+        requested, node.getResourceAllocatedPendingLaunch());
+    Assert.assertEquals(allocatedMsg,
+        1, node.getNumGuaranteedContainers());
+    Assert.assertTrue(node.isValidGuaranteedContainer(id));
+
+    // Precondition for the release below: still pending launch.
+    Assert.assertEquals("The container should not have been launched",
+        requested, node.getResourceAllocatedPendingLaunch());
+
+    // Release with the flag set to false.
+    node.releaseContainer(id, false);
+    Assert.assertEquals(releasedMsg,
+        0, node.getNumGuaranteedContainers());
+    Assert.assertEquals(releasedMsg,
+        none, node.getAllocatedResource());
+    Assert.assertFalse(releasedMsg,
+        node.isValidGuaranteedContainer(id));
+    Assert.assertEquals(releasedMsg,
+        none, node.getResourceAllocatedPendingLaunch());
+  }
+
+  private SchedulerNode createSchedulerNode(Resource capacity) {
+    NodeId nodeId = NodeId.newInstance("localhost", 0);
+
+    RMNode rmNode = mock(RMNode.class);
+    when(rmNode.getNodeID()).thenReturn(nodeId);
+    when(rmNode.getHostName()).thenReturn(nodeId.getHost());
+    when(rmNode.getTotalCapability()).thenReturn(capacity);
+    when(rmNode.getRackName()).thenReturn("/default");
+    when(rmNode.getHttpAddress()).thenReturn(nodeId.getHost());
+    when(rmNode.getNodeAddress()).thenReturn(nodeId.getHost());
+
+    return new SchedulerNodeForTest(rmNode);
+  }
+
+  private static RMContainerImpl createRMContainer(long containerId,
+      Resource resource, ExecutionType executionType, RMNode node) {
+    Container container =
+        createContainer(containerId, resource, executionType, node);
+
+    Dispatcher dispatcher = new AsyncDispatcher();
+    RMContext rmContext = mock(RMContext.class);
+    when(rmContext.getDispatcher()).thenReturn(dispatcher);
+    when(rmContext.getSystemMetricsPublisher()).
+        thenReturn(new NoOpSystemMetricPublisher());
+    when(rmContext.getYarnConfiguration()).
+        thenReturn(new YarnConfiguration());
+    when(rmContext.getContainerAllocationExpirer()).
+        thenReturn(new ContainerAllocationExpirer(dispatcher));
+    when(rmContext.getRMApplicationHistoryWriter()).
+        thenReturn(new RMApplicationHistoryWriter());
+
+    return new RMContainerImpl(container, null,
+        container.getId().getApplicationAttemptId(),
+        node.getNodeID(), "test", rmContext);
+  }
+
+  private static Container createContainer(long containerId, Resource resource,
+      ExecutionType executionType, RMNode node) {
+    ApplicationAttemptId appAttemptId = ApplicationAttemptId.
+        newInstance(ApplicationId.newInstance(0, 0), 0);
+    ContainerId cId =
+        ContainerId.newContainerId(appAttemptId, containerId);
+    Container container = Container.newInstance(
+        cId, node.getNodeID(), node.getNodeAddress(), resource,
+        Priority.newInstance(0), null, executionType);
+    return container;
+  }
+
+
+  /**
+   * A test implementation of SchedulerNode for the purpose of testing
+   * SchedulerNode only. Resource reservation is scheduler-dependent,
+   * and therefore not covered here, so both reservation methods are
+   * implemented as no-ops.
+   */
+  private static final class SchedulerNodeForTest extends SchedulerNode {
+    // NOTE(review): the meaning of the second super() argument is not visible
+    // here; it is always passed as false - confirm against SchedulerNode.
+    SchedulerNodeForTest(RMNode node) {
+      super(node, false);
+    }
+
+    @Override
+    public void reserveResource(SchedulerApplicationAttempt attempt,
+        SchedulerRequestKey schedulerKey, RMContainer container) {
+      // intentionally empty: reservation is not exercised by these tests
+    }
+
+    @Override
+    public void unreserveResource(SchedulerApplicationAttempt attempt) {
+      // intentionally empty: reservation is not exercised by these tests
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b237095d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
index 1d2aadc..b7f7237 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
@@ -4112,7 +4112,7 @@ public class TestCapacityScheduler extends CapacitySchedulerTestBase {
     // Check total resource of scheduler node is also changed to 1 GB 1 core
     Resource totalResource =
         resourceManager.getResourceScheduler()
-            .getSchedulerNode(nm_0.getNodeId()).getTotalResource();
+            .getSchedulerNode(nm_0.getNodeId()).getCapacity();
     Assert.assertEquals("Total Resource Memory Size should be 1GB", 1 * GB,
         totalResource.getMemorySize());
     Assert.assertEquals("Total Resource Virtual Cores should be 1", 1,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b237095d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java
index 18cd942..0fc8c52 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java
@@ -223,8 +223,8 @@ public class TestCapacitySchedulerAsyncScheduling {
 
     // nm1 runs 1 container(app1-container_01/AM)
     // nm2 runs 1 container(app1-container_02)
-    Assert.assertEquals(1, sn1.getNumContainers());
-    Assert.assertEquals(1, sn2.getNumContainers());
+    Assert.assertEquals(1, sn1.getNumGuaranteedContainers());
+    Assert.assertEquals(1, sn2.getNumGuaranteedContainers());
 
     // kill app attempt1
     scheduler.handle(
@@ -319,8 +319,8 @@ public class TestCapacitySchedulerAsyncScheduling {
     // nm1 runs 3 containers(app1-container_01/AM, app1-container_02,
     //                       app2-container_01/AM)
     // nm2 runs 1 container(app1-container_03)
-    Assert.assertEquals(3, sn1.getNumContainers());
-    Assert.assertEquals(1, sn2.getNumContainers());
+    Assert.assertEquals(3, sn1.getNumGuaranteedContainers());
+    Assert.assertEquals(1, sn2.getNumGuaranteedContainers());
 
     // reserve 1 container(app1-container_04) for app1 on nm1
     ResourceRequest rr2 = ResourceRequest

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b237095d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
index 04bb791..6215ce8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
@@ -385,7 +385,7 @@ public class TestLeafQueue {
         new ResourceLimits(clusterResource),
         SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
     assertEquals(
-        (int)(node_0.getTotalResource().getMemorySize() * a.getCapacity()) - (1*GB),
+        (int)(node_0.getCapacity().getMemorySize() * a.getCapacity()) - (1*GB),
         a.getMetrics().getAvailableMB());
   }
 
@@ -684,7 +684,7 @@ public class TestLeafQueue {
     assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize());
     assertEquals(0*GB, a.getMetrics().getReservedMB());
     assertEquals(0*GB, a.getMetrics().getAllocatedMB());
-    assertEquals((int)(a.getCapacity() * node_0.getTotalResource().getMemorySize()),
+    assertEquals((int)(a.getCapacity() * node_0.getCapacity().getMemorySize()),
         a.getMetrics().getAvailableMB());
   }
   @Test


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to