YARN-1015. FS should watch node resource utilization and allocate opportunistic 
containers if appropriate.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/bdf3e661
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/bdf3e661
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/bdf3e661

Branch: refs/heads/YARN-1011
Commit: bdf3e661b7558020c284442f24ade677406b19e8
Parents: eba8436
Author: Haibo Chen <haiboc...@apache.org>
Authored: Fri Nov 17 07:47:32 2017 -0800
Committer: Haibo Chen <haiboc...@apache.org>
Committed: Sun Jan 28 18:08:11 2018 -0800

----------------------------------------------------------------------
 .../sls/scheduler/FairSchedulerMetrics.java     |   4 +-
 .../hadoop/yarn/conf/YarnConfiguration.java     |   5 +
 .../src/main/resources/yarn-default.xml         |  13 +
 .../scheduler/SchedulerNode.java                |  48 ++
 .../scheduler/fair/FSAppAttempt.java            | 166 ++++---
 .../scheduler/fair/FSLeafQueue.java             |  51 +-
 .../scheduler/fair/FSParentQueue.java           |  36 +-
 .../resourcemanager/scheduler/fair/FSQueue.java |  39 +-
 .../scheduler/fair/FairScheduler.java           |  97 ++--
 .../fair/FairSchedulerConfiguration.java        |   5 +
 .../scheduler/fair/Schedulable.java             |  17 +-
 .../DominantResourceFairnessPolicy.java         |   8 +-
 .../fair/policies/FairSharePolicy.java          |   4 +-
 .../webapp/dao/FairSchedulerQueueInfo.java      |   2 +-
 .../yarn/server/resourcemanager/MockNodes.java  |  60 ++-
 .../TestWorkPreservingRMRestart.java            |   2 +-
 .../scheduler/fair/FakeSchedulable.java         |   9 +-
 .../scheduler/fair/TestAppRunnability.java      |   9 +-
 .../scheduler/fair/TestFSAppAttempt.java        |   4 +-
 .../scheduler/fair/TestFSLeafQueue.java         |   4 +-
 .../scheduler/fair/TestFSSchedulerNode.java     |   4 +-
 .../scheduler/fair/TestFairScheduler.java       | 468 +++++++++++++++++--
 .../scheduler/fair/TestSchedulingPolicy.java    |  10 +-
 23 files changed, 861 insertions(+), 204 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/bdf3e661/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/FairSchedulerMetrics.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/FairSchedulerMetrics.java
 
b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/FairSchedulerMetrics.java
index a5aee74..1f4e7c7 100644
--- 
a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/FairSchedulerMetrics.java
+++ 
b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/FairSchedulerMetrics.java
@@ -75,7 +75,7 @@ public class FairSchedulerMetrics extends SchedulerMetrics {
       case DEMAND:
         return schedulable.getDemand().getMemorySize();
       case USAGE:
-        return schedulable.getResourceUsage().getMemorySize();
+        return schedulable.getGuaranteedResourceUsage().getMemorySize();
       case MINSHARE:
         return schedulable.getMinShare().getMemorySize();
       case MAXSHARE:
@@ -96,7 +96,7 @@ public class FairSchedulerMetrics extends SchedulerMetrics {
       case DEMAND:
         return schedulable.getDemand().getVirtualCores();
       case USAGE:
-        return schedulable.getResourceUsage().getVirtualCores();
+        return schedulable.getGuaranteedResourceUsage().getVirtualCores();
       case MINSHARE:
         return schedulable.getMinShare().getVirtualCores();
       case MAXSHARE:

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bdf3e661/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index ed8404c..15f6480 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -268,6 +268,11 @@ public class YarnConfiguration extends Configuration {
   /** UserGroupMappingPlacementRule configuration string. */
   public static final String USER_GROUP_PLACEMENT_RULE = "user-group";
 
+  public static final String RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED =
+      RM_PREFIX + "scheduler.oversubscription.enabled";
+  public static final boolean DEFAULT_RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED
+      = false;
+
   /** Enable Resource Manager webapp ui actions */
   public static final String RM_WEBAPP_UI_ACTIONS_ENABLED =
     RM_PREFIX + "webapp.ui-actions.enabled";

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bdf3e661/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index da31694..7962dae 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -389,6 +389,19 @@
   </property>
 
   <property>
+    <description>
+      If set to true, the scheduler will try to over-allocate resources on the
+      nodes that allow overallocation. To enable overallocatin on a node, set
+      {code}yarn.nodemanager.overallocation.memory-utilization-threshold{code}
+      and
+      {code}yarn.nodemanager.overallocation.cpu-utilization-threshold{code}
+      to a number in the range (0.0, 1.0)
+    </description>
+    <name>yarn.resourcemanager.scheduler.oversubscription.enabled</name>
+    <value>false</value>
+  </property>
+
+  <property>
     <description>Enable RM to recover state after starting. If true, then
       yarn.resourcemanager.store.class must be specified. </description>
     <name>yarn.resourcemanager.recovery.enabled</name>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bdf3e661/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
index 349944e..e942981 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
@@ -39,6 +39,8 @@ import org.apache.hadoop.yarn.api.records.ResourceUtilization;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
+import org.apache.hadoop.yarn.server.api.records.OverAllocationInfo;
+import org.apache.hadoop.yarn.server.api.records.ResourceThresholds;
 import 
org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import 
org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
@@ -59,6 +61,10 @@ public abstract class SchedulerNode {
   private static final Log LOG = LogFactory.getLog(SchedulerNode.class);
 
   private Resource capacity;
+  // The resource available within the node's capacity that can be given out
+  // to run GUARANTEED containers, including reserved, preempted and any
+  // remaining free resources. Resources allocated to OPPORTUNISTIC containers
+  // are tracked in allocatedResourceOpportunistic
   private Resource unallocatedResource = Resource.newInstance(0, 0);
 
   private RMContainer reservedContainer;
@@ -596,6 +602,48 @@ public abstract class SchedulerNode {
     return this.nodeUtilization;
   }
 
+  /**
+   * Get the amount of resources that can be allocated to opportunistic
+   * containers in the case of overallocation. It is calculated as
+   * node capacity - (node utilization + resources of allocated-yet-not-started
+   * containers).
+   * @return the amount of resources that are available to be allocated to
+   *         opportunistic containers
+   */
+  public synchronized Resource allowedResourceForOverAllocation() {
+    OverAllocationInfo overAllocationInfo = rmNode.getOverAllocationInfo();
+    if (overAllocationInfo == null) {
+      LOG.debug("Overallocation is disabled on node: " + rmNode.getHostName());
+      return Resources.none();
+    }
+
+    ResourceUtilization projectedNodeUtilization = ResourceUtilization.
+        newInstance(getNodeUtilization());
+    // account for resources allocated in this heartbeat
+    projectedNodeUtilization.addTo(
+        (int) (resourceAllocatedPendingLaunch.getMemorySize()), 0,
+        (float) resourceAllocatedPendingLaunch.getVirtualCores() /
+            capacity.getVirtualCores());
+
+    ResourceThresholds thresholds =
+        overAllocationInfo.getOverAllocationThresholds();
+    Resource overAllocationThreshold = Resources.createResource(
+        (long) (capacity.getMemorySize() * thresholds.getMemoryThreshold()),
+        (int) (capacity.getVirtualCores() * thresholds.getCpuThreshold()));
+    long allowedMemory = Math.max(0, overAllocationThreshold.getMemorySize()
+        - projectedNodeUtilization.getPhysicalMemory());
+    int allowedCpu = Math.max(0, (int)
+        (overAllocationThreshold.getVirtualCores() -
+            projectedNodeUtilization.getCPU() * capacity.getVirtualCores()));
+
+    Resource resourceAllowedForOpportunisticContainers =
+        Resources.createResource(allowedMemory, allowedCpu);
+
+    // TODO cap the resources allocated to OPPORTUNISTIC containers on a node
+    // in terms of its capacity. i.e. return min(max_ratio * capacity, allowed)
+    return resourceAllowedForOpportunisticContainers;
+  }
+
 
   private static class ContainerInfo {
     private final RMContainer container;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bdf3e661/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
index 0305702..970ffcb 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
@@ -36,6 +36,7 @@ import 
org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
@@ -169,7 +170,7 @@ public class FSAppAttempt extends 
SchedulerApplicationAttempt
           rmContainer.getNodeLabelExpression(),
           getUser(), 1, containerResource);
       this.attemptResourceUsage.decUsed(containerResource);
-      getQueue().decUsedResource(containerResource);
+      getQueue().decUsedGuaranteedResource(containerResource);
 
       // Clear resource utilization metrics cache.
       lastMemoryAggregateAllocationUpdateTime = -1;
@@ -178,30 +179,35 @@ public class FSAppAttempt extends 
SchedulerApplicationAttempt
     }
   }
 
-  private void unreserveInternal(
+  private boolean unreserveInternal(
       SchedulerRequestKey schedulerKey, FSSchedulerNode node) {
     try {
       writeLock.lock();
       Map<NodeId, RMContainer> reservedContainers = 
this.reservedContainers.get(
           schedulerKey);
-      RMContainer reservedContainer = reservedContainers.remove(
-          node.getNodeID());
-      if (reservedContainers.isEmpty()) {
-        this.reservedContainers.remove(schedulerKey);
-      }
+      boolean unreserved = false;
+      if (reservedContainers != null) {
+        RMContainer reservedContainer = reservedContainers.remove(
+            node.getNodeID());
+        if (reservedContainers.isEmpty()) {
+          this.reservedContainers.remove(schedulerKey);
+        }
 
-      // Reset the re-reservation count
-      resetReReservations(schedulerKey);
+        // Reset the re-reservation count
+        resetReReservations(schedulerKey);
 
-      Resource resource = reservedContainer.getContainer().getResource();
-      this.attemptResourceUsage.decReserved(resource);
+        Resource resource = reservedContainer.getContainer().getResource();
+        this.attemptResourceUsage.decReserved(resource);
+        unreserved = true;
 
-      LOG.info(
-          "Application " + getApplicationId() + " unreserved " + " on node "
-              + node + ", currently has " + reservedContainers.size()
-              + " at priority " + schedulerKey.getPriority()
-              + "; currentReservation " + this.attemptResourceUsage
-              .getReserved());
+        LOG.info(
+            "Application " + getApplicationId() + " unreserved " + " on node "
+                + node + ", currently has " + reservedContainers.size()
+                + " at priority " + schedulerKey.getPriority()
+                + "; currentReservation " + this.attemptResourceUsage
+                .getReserved());
+      }
+      return unreserved;
     } finally {
       writeLock.unlock();
     }
@@ -229,7 +235,7 @@ public class FSAppAttempt extends 
SchedulerApplicationAttempt
     SchedulingPolicy policy = fsQueue.getPolicy();
 
     Resource queueFairShare = fsQueue.getFairShare();
-    Resource queueUsage = fsQueue.getResourceUsage();
+    Resource queueUsage = fsQueue.getGuaranteedResourceUsage();
     Resource clusterResource = this.scheduler.getClusterResource();
     Resource clusterUsage = this.scheduler.getRootQueueMetrics()
         .getAllocatedResources();
@@ -420,7 +426,7 @@ public class FSAppAttempt extends 
SchedulerApplicationAttempt
 
   public RMContainer allocate(NodeType type, FSSchedulerNode node,
       SchedulerRequestKey schedulerKey, PendingAsk pendingAsk,
-      Container reservedContainer) {
+      Container reservedContainer, boolean opportunistic) {
     RMContainer rmContainer;
     Container container;
 
@@ -445,9 +451,11 @@ public class FSAppAttempt extends 
SchedulerApplicationAttempt
       }
 
       container = reservedContainer;
+      ExecutionType executionType = opportunistic ?
+          ExecutionType.OPPORTUNISTIC : ExecutionType.GUARANTEED;
       if (container == null) {
         container = createContainer(node, 
pendingAsk.getPerAllocationResource(),
-            schedulerKey);
+            schedulerKey, executionType);
       }
 
       // Create RMContainer
@@ -463,8 +471,12 @@ public class FSAppAttempt extends 
SchedulerApplicationAttempt
       // Update consumption and track allocations
       ContainerRequest containerRequest = appSchedulingInfo.allocate(
           type, node, schedulerKey, container);
-      this.attemptResourceUsage.incUsed(container.getResource());
-      getQueue().incUsedResource(container.getResource());
+      if (executionType.equals(ExecutionType.GUARANTEED)) {
+        this.attemptResourceUsage.incUsed(container.getResource());
+        getQueue().incUsedGuaranteedResource(container.getResource());
+      } else {
+        
this.attemptOpportunisticResourceUsage.incUsed(container.getResource());
+      }
 
       // Update resource requests related to "request" and store in RMContainer
       ((RMContainerImpl) rmContainer).setContainerRequest(containerRequest);
@@ -621,7 +633,8 @@ public class FSAppAttempt extends 
SchedulerApplicationAttempt
 
   private Resource getUsageAfterPreemptingContainer(Resource 
containerResources,
           Resource alreadyConsideringForPreemption) {
-    Resource usageAfterPreemption = Resources.clone(getResourceUsage());
+    Resource usageAfterPreemption =
+        Resources.clone(getGuaranteedResourceUsage());
 
     // Subtract resources of containers already queued for preemption
     synchronized (preemptionVariablesLock) {
@@ -648,7 +661,7 @@ public class FSAppAttempt extends 
SchedulerApplicationAttempt
    * @return Container
    */
   private Container createContainer(FSSchedulerNode node, Resource capability,
-      SchedulerRequestKey schedulerKey) {
+      SchedulerRequestKey schedulerKey, ExecutionType executionType) {
 
     NodeId nodeId = node.getRMNode().getNodeID();
     ContainerId containerId = BuilderUtils.newContainerId(
@@ -658,7 +671,7 @@ public class FSAppAttempt extends 
SchedulerApplicationAttempt
     return BuilderUtils.newContainer(containerId, nodeId,
         node.getRMNode().getHttpAddress(), capability,
         schedulerKey.getPriority(), null,
-        schedulerKey.getAllocationRequestId());
+        executionType, schedulerKey.getAllocationRequestId());
   }
 
   @Override
@@ -670,7 +683,8 @@ public class FSAppAttempt extends 
SchedulerApplicationAttempt
       super.recoverContainer(node, rmContainer);
 
       if (!rmContainer.getState().equals(RMContainerState.COMPLETED)) {
-        getQueue().incUsedResource(rmContainer.getContainer().getResource());
+        getQueue().incUsedGuaranteedResource(
+            rmContainer.getContainer().getResource());
       }
 
       // If not running unmanaged, the first container we recover is always
@@ -708,7 +722,7 @@ public class FSAppAttempt extends 
SchedulerApplicationAttempt
       if (reservedContainer == null) {
         reservedContainer =
             createContainer(node, perAllocationResource,
-              schedulerKey);
+              schedulerKey, ExecutionType.GUARANTEED);
         getMetrics().reserveResource(node.getPartition(), getUser(),
             reservedContainer.getResource());
         RMContainer rmContainer =
@@ -765,11 +779,12 @@ public class FSAppAttempt extends 
SchedulerApplicationAttempt
   public void unreserve(SchedulerRequestKey schedulerKey,
       FSSchedulerNode node) {
     RMContainer rmContainer = node.getReservedContainer();
-    unreserveInternal(schedulerKey, node);
-    node.unreserveResource(this);
-    clearReservation(node);
-    getMetrics().unreserveResource(node.getPartition(),
-        getUser(), rmContainer.getContainer().getResource());
+    if (unreserveInternal(schedulerKey, node)) {
+      node.unreserveResource(this);
+      clearReservation(node);
+      getMetrics().unreserveResource(node.getPartition(),
+          getUser(), rmContainer.getContainer().getResource());
+    }
   }
 
   private void setReservation(SchedulerNode node) {
@@ -843,13 +858,15 @@ public class FSAppAttempt extends 
SchedulerApplicationAttempt
    */
   private Resource assignContainer(
       FSSchedulerNode node, PendingAsk pendingAsk, NodeType type,
-      boolean reserved, SchedulerRequestKey schedulerKey) {
+      boolean reserved, boolean opportunistic,
+      SchedulerRequestKey schedulerKey) {
 
     // How much does this request need?
     Resource capability = pendingAsk.getPerAllocationResource();
 
     // How much does the node have?
-    Resource available = node.getUnallocatedResource();
+    Resource available = opportunistic ?
+        node.allowedResourceForOverAllocation() : 
node.getUnallocatedResource();
 
     Container reservedContainer = null;
     if (reserved) {
@@ -861,33 +878,39 @@ public class FSAppAttempt extends 
SchedulerApplicationAttempt
       // Inform the application of the new container for this request
       RMContainer allocatedContainer =
           allocate(type, node, schedulerKey, pendingAsk,
-              reservedContainer);
-      if (allocatedContainer == null) {
-        // Did the application need this resource?
-        if (reserved) {
-          unreserve(schedulerKey, node);
-        }
-        return Resources.none();
-      }
+              reservedContainer, opportunistic);
 
-      // If we had previously made a reservation, delete it
+      // delete the previous reservation, if any
       if (reserved) {
         unreserve(schedulerKey, node);
       }
 
-      // Inform the node
-      node.allocateContainer(allocatedContainer);
+      if (allocatedContainer != null) {
+        if (opportunistic) {
+          // if an OPPORTUNISTIC container is allocated, we need to
+          // unreserve anything that we may have reserved in our
+          // previous attempt to assign GUARANTEED containers for this
+          // scheduling request.
+          unreserve(schedulerKey, node);
+        }
+
 
-      // If not running unmanaged, the first container we allocate is always
-      // the AM. Set the amResource for this app and update the leaf queue's AM
-      // usage
-      if (!isAmRunning() && !getUnmanagedAM()) {
-        setAMResource(capability);
-        getQueue().addAMResourceUsage(capability);
-        setAmRunning(true);
-      }
+        // Inform the node
+        node.allocateContainer(allocatedContainer);
+
+        // If not running unmanaged, the first container we allocate
+        // is always the AM. Set amResource for this app and update
+        // the leaf queue's AM usage
+        if (!isAmRunning() && !getUnmanagedAM()) {
+          setAMResource(capability);
+          getQueue().addAMResourceUsage(capability);
+          setAmRunning(true);
+        }
 
-      return capability;
+        return capability;
+      } else {
+        return Resources.none();
+      }
     }
 
     if (LOG.isDebugEnabled()) {
@@ -898,7 +921,7 @@ public class FSAppAttempt extends 
SchedulerApplicationAttempt
     // The desired container won't fit here, so reserve
     // Reserve only, if app does not wait for preempted resources on the node,
     // otherwise we may end up with duplicate reservations
-    if (isReservable(capability) &&
+    if (isReservable(capability) && !opportunistic &&
         !node.isPreemptedForApp(this) &&
         reserve(pendingAsk.getPerAllocationResource(), node, reservedContainer,
             type, schedulerKey)) {
@@ -944,7 +967,8 @@ public class FSAppAttempt extends 
SchedulerApplicationAttempt
   }
 
   @SuppressWarnings("deprecation")
-  private Resource assignContainer(FSSchedulerNode node, boolean reserved) {
+  private Resource assignContainer(FSSchedulerNode node, boolean opportunistic,
+                                   boolean reserved) {
     if (LOG.isTraceEnabled()) {
       LOG.trace("Node offered to app: " + getName() + " reserved: " + 
reserved);
     }
@@ -1007,7 +1031,7 @@ public class FSAppAttempt extends 
SchedulerApplicationAttempt
                 + ", app attempt id: " + this.attemptId);
           }
           return assignContainer(node, nodeLocalPendingAsk, 
NodeType.NODE_LOCAL,
-              reserved, schedulerKey);
+              reserved, opportunistic, schedulerKey);
         }
 
         if (!appSchedulingInfo.canDelayTo(schedulerKey, node.getRackName())) {
@@ -1024,7 +1048,7 @@ public class FSAppAttempt extends 
SchedulerApplicationAttempt
                 + ", app attempt id: " + this.attemptId);
           }
           return assignContainer(node, rackLocalPendingAsk, 
NodeType.RACK_LOCAL,
-              reserved, schedulerKey);
+              reserved, opportunistic, schedulerKey);
         }
 
         PendingAsk offswitchAsk = getPendingAsk(schedulerKey,
@@ -1044,7 +1068,7 @@ public class FSAppAttempt extends 
SchedulerApplicationAttempt
                   + ", app attempt id: " + this.attemptId);
             }
             return assignContainer(node, offswitchAsk, NodeType.OFF_SWITCH,
-                reserved, schedulerKey);
+                reserved, opportunistic, schedulerKey);
           }
         }
 
@@ -1145,7 +1169,7 @@ public class FSAppAttempt extends 
SchedulerApplicationAttempt
     // there's only one container size per priority.
     if (Resources.fitsIn(node.getReservedContainer().getReservedResource(),
         node.getUnallocatedResource())) {
-      assignContainer(node, true);
+      assignContainer(node, false, true);
     }
     return true;
   }
@@ -1161,7 +1185,8 @@ public class FSAppAttempt extends 
SchedulerApplicationAttempt
     Resource fairDemand = Resources.componentwiseMin(threshold, demand);
 
     // Check if the queue is starved for fairshare
-    boolean starved = isUsageBelowShare(getResourceUsage(), fairDemand);
+    boolean starved =
+        isUsageBelowShare(getGuaranteedResourceUsage(), fairDemand);
 
     if (!starved) {
       lastTimeAtFairShare = now;
@@ -1173,8 +1198,8 @@ public class FSAppAttempt extends 
SchedulerApplicationAttempt
       fairshareStarvation = Resources.none();
     } else {
       // The app has been starved for longer than preemption-timeout.
-      fairshareStarvation =
-          Resources.subtractFromNonNegative(fairDemand, getResourceUsage());
+      fairshareStarvation = Resources.subtractFromNonNegative(fairDemand,
+          getGuaranteedResourceUsage());
     }
     return fairshareStarvation;
   }
@@ -1193,7 +1218,7 @@ public class FSAppAttempt extends 
SchedulerApplicationAttempt
    * @return true if the app is starved for fairshare, false otherwise
    */
   boolean isStarvedForFairShare() {
-    return isUsageBelowShare(getResourceUsage(), getFairShare());
+    return isUsageBelowShare(getGuaranteedResourceUsage(), getFairShare());
   }
 
   /**
@@ -1294,7 +1319,7 @@ public class FSAppAttempt extends 
SchedulerApplicationAttempt
    * Get the current app's unsatisfied demand.
    */
   Resource getPendingDemand() {
-    return Resources.subtract(demand, getResourceUsage());
+    return Resources.subtract(demand, getGuaranteedResourceUsage());
   }
 
   @Override
@@ -1313,11 +1338,16 @@ public class FSAppAttempt extends 
SchedulerApplicationAttempt
   }
 
   @Override
-  public Resource getResourceUsage() {
+  public Resource getGuaranteedResourceUsage() {
     return getCurrentConsumption();
   }
 
   @Override
+  public Resource getOpportunisticResourceUsage() {
+    return attemptOpportunisticResourceUsage.getUsed();
+  }
+
+  @Override
   public float getWeight() {
     float weight = 1.0F;
 
@@ -1366,7 +1396,7 @@ public class FSAppAttempt extends 
SchedulerApplicationAttempt
   }
 
   @Override
-  public Resource assignContainer(FSSchedulerNode node) {
+  public Resource assignContainer(FSSchedulerNode node, boolean opportunistic) 
{
     if (isOverAMShareLimit()) {
       PendingAsk amAsk = appSchedulingInfo.getNextPendingAsk();
       updateAMDiagnosticMsg(amAsk.getPerAllocationResource(),
@@ -1378,7 +1408,7 @@ public class FSAppAttempt extends 
SchedulerApplicationAttempt
       }
       return Resources.none();
     }
-    return assignContainer(node, false);
+    return assignContainer(node, opportunistic, false);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bdf3e661/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
index 49d2166..2a90228 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
@@ -89,7 +89,7 @@ public class FSLeafQueue extends FSQueue {
       } else {
         nonRunnableApps.add(app);
       }
-      incUsedResource(app.getResourceUsage());
+      incUsedGuaranteedResource(app.getGuaranteedResourceUsage());
     } finally {
       writeLock.unlock();
     }
@@ -124,7 +124,7 @@ public class FSLeafQueue extends FSQueue {
       getMetrics().setAMResourceUsage(amResourceUsage);
     }
 
-    decUsedResource(app.getResourceUsage());
+    decUsedGuaranteedResource(app.getGuaranteedResourceUsage());
     return runnable;
   }
 
@@ -294,6 +294,42 @@ public class FSLeafQueue extends FSQueue {
     return demand;
   }
 
+  @Override
+  public Resource getGuaranteedResourceUsage() {
+    Resource guaranteedResource = Resources.createResource(0);
+    readLock.lock();
+    try {
+      for (FSAppAttempt app : runnableApps) {
+        Resources.addTo(guaranteedResource, app.getGuaranteedResourceUsage());
+      }
+      for (FSAppAttempt app : nonRunnableApps) {
+        Resources.addTo(guaranteedResource, app.getGuaranteedResourceUsage());
+      }
+    } finally {
+      readLock.unlock();
+    }
+    return guaranteedResource;
+  }
+
+  @Override
+  public Resource getOpportunisticResourceUsage() {
+    Resource opportunisticResource = Resource.newInstance(0, 0);
+    readLock.lock();
+    try {
+      for (FSAppAttempt app : runnableApps) {
+        Resources.addTo(opportunisticResource,
+            app.getOpportunisticResourceUsage());
+      }
+      for (FSAppAttempt app : nonRunnableApps) {
+        Resources.addTo(opportunisticResource,
+            app.getOpportunisticResourceUsage());
+      }
+    } finally {
+      readLock.unlock();
+    }
+    return opportunisticResource;
+  }
+
   Resource getAmResourceUsage() {
     return amResourceUsage;
   }
@@ -327,14 +363,14 @@ public class FSLeafQueue extends FSQueue {
   }
 
   @Override
-  public Resource assignContainer(FSSchedulerNode node) {
+  public Resource assignContainer(FSSchedulerNode node, boolean opportunistic) 
{
     Resource assigned = none();
     if (LOG.isDebugEnabled()) {
       LOG.debug("Node " + node.getNodeName() + " offered to queue: " +
           getName() + " fairShare: " + getFairShare());
     }
 
-    if (!assignContainerPreCheck(node)) {
+    if (!assignContainerPreCheck(node, opportunistic)) {
       return assigned;
     }
 
@@ -342,7 +378,7 @@ public class FSLeafQueue extends FSQueue {
       if (SchedulerAppUtils.isPlaceBlacklisted(sched, node, LOG)) {
         continue;
       }
-      assigned = sched.assignContainer(node);
+      assigned = sched.assignContainer(node, opportunistic);
       if (!assigned.equals(none())) {
         if (LOG.isDebugEnabled()) {
           LOG.debug("Assigned container in queue:" + getName() + " " +
@@ -540,7 +576,8 @@ public class FSLeafQueue extends FSQueue {
     Resource desiredShare = Resources.min(policy.getResourceCalculator(),
         scheduler.getClusterResource(), getMinShare(), getDemand());
 
-    Resource starvation = Resources.subtract(desiredShare, getResourceUsage());
+    Resource starvation =
+        Resources.subtract(desiredShare, getGuaranteedResourceUsage());
     boolean starved = !Resources.isNone(starvation);
 
     long now = scheduler.getClock().getTime();
@@ -598,7 +635,7 @@ public class FSLeafQueue extends FSQueue {
         ", SteadyFairShare: " + getSteadyFairShare() +
         ", MaxShare: " + getMaxShare() +
         ", MinShare: " + minShare +
-        ", ResourceUsage: " + getResourceUsage() +
+        ", ResourceUsage: " + getGuaranteedResourceUsage() +
         ", Demand: " + getDemand() +
         ", Runnable: " + getNumRunnableApps() +
         ", NumPendingApps: " + getNumPendingApps() +

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bdf3e661/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java
index a8e53fc..3e63e97 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java
@@ -119,6 +119,34 @@ public class FSParentQueue extends FSQueue {
   }
 
   @Override
+  public Resource getGuaranteedResourceUsage() {
+    Resource guaranteedResource = Resources.createResource(0);
+    readLock.lock();
+    try {
+      for (FSQueue child : childQueues) {
+        Resources.addTo(guaranteedResource, 
child.getGuaranteedResourceUsage());
+      }
+    } finally {
+      readLock.unlock();
+    }
+    return guaranteedResource;
+  }
+
+  @Override
+  public Resource getOpportunisticResourceUsage() {
+    Resource opportunisticResource = Resource.newInstance(0, 0);
+    readLock.lock();
+    try {
+      for (FSQueue child : childQueues) {
+        Resources.addTo(opportunisticResource,
+            child.getOpportunisticResourceUsage());
+      }
+    } finally {
+      readLock.unlock();
+    }
+    return opportunisticResource;
+  }
+
   public void updateDemand() {
     // Compute demand by iterating through apps in the queue
     // Limit demand to maxResources
@@ -177,11 +205,11 @@ public class FSParentQueue extends FSQueue {
   }
 
   @Override
-  public Resource assignContainer(FSSchedulerNode node) {
+  public Resource assignContainer(FSSchedulerNode node, boolean opportunistic) 
{
     Resource assigned = Resources.none();
 
     // If this queue is over its limit, reject
-    if (!assignContainerPreCheck(node)) {
+    if (!assignContainerPreCheck(node, opportunistic)) {
       return assigned;
     }
 
@@ -204,7 +232,7 @@ public class FSParentQueue extends FSQueue {
     readLock.lock();
     try {
       for (FSQueue child : childQueues) {
-        assigned = child.assignContainer(node);
+        assigned = child.assignContainer(node, opportunistic);
         if (!Resources.equals(assigned, Resources.none())) {
           break;
         }
@@ -288,7 +316,7 @@ public class FSParentQueue extends FSQueue {
         ", SteadyFairShare: " + getSteadyFairShare() +
         ", MaxShare: " + getMaxShare() +
         ", MinShare: " + minShare +
-        ", ResourceUsage: " + getResourceUsage() +
+        ", Guaranteed ResourceUsage: " + getGuaranteedResourceUsage() +
         ", Demand: " + getDemand() +
         ", MaxAMShare: " + maxAMShare +
         ", Runnable: " + getNumRunnableApps() +

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bdf3e661/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java
index 4babfd5..f293eb1 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java
@@ -57,7 +57,7 @@ public abstract class FSQueue implements Queue, Schedulable {
   private Resource fairShare = Resources.createResource(0, 0);
   private Resource steadyFairShare = Resources.createResource(0, 0);
   private Resource reservedResource = Resources.createResource(0, 0);
-  private final Resource resourceUsage = Resource.newInstance(0, 0);
+  private final Resource guaranteedResourceUsage = Resource.newInstance(0, 0);
   private final String name;
   protected final FairScheduler scheduler;
   private final YarnAuthorizationProvider authorizer;
@@ -234,7 +234,8 @@ public abstract class FSQueue implements Queue, Schedulable 
{
     if (getFairShare().getMemorySize() == 0) {
       queueInfo.setCurrentCapacity(0.0f);
     } else {
-      queueInfo.setCurrentCapacity((float) getResourceUsage().getMemorySize() /
+      queueInfo.setCurrentCapacity(
+          (float) getGuaranteedResourceUsage().getMemorySize() /
           getFairShare().getMemorySize());
     }
 
@@ -418,14 +419,17 @@ public abstract class FSQueue implements Queue, 
Schedulable {
    * 
    * @return true if check passes (can assign) or false otherwise
    */
-  boolean assignContainerPreCheck(FSSchedulerNode node) {
-    if (node.getReservedContainer() != null) {
+  boolean assignContainerPreCheck(FSSchedulerNode node, boolean opportunistic) 
{
+    if (opportunistic) {
+      // always pre-approve OPPORTUNISTIC containers to be assigned on the node
+      return true;
+    } else if (node.getReservedContainer() != null) {
       if (LOG.isDebugEnabled()) {
         LOG.debug("Assigning container failed on node '" + node.getNodeName()
             + " because it has reserved containers.");
       }
       return false;
-    } else if (!Resources.fitsIn(getResourceUsage(), getMaxShare())) {
+    } else if (!Resources.fitsIn(getGuaranteedResourceUsage(), getMaxShare())) 
{
       if (LOG.isDebugEnabled()) {
         LOG.debug("Assigning container failed on node '" + node.getNodeName()
             + " because queue resource usage is larger than MaxShare: "
@@ -448,7 +452,8 @@ public abstract class FSQueue implements Queue, Schedulable 
{
   @Override
   public String toString() {
     return String.format("[%s, demand=%s, running=%s, share=%s, w=%s]",
-        getName(), getDemand(), getResourceUsage(), fairShare, getWeight());
+        getName(), getDemand(), getGuaranteedResourceUsage(), fairShare,
+        getWeight());
   }
   
   @Override
@@ -480,8 +485,8 @@ public abstract class FSQueue implements Queue, Schedulable 
{
   }
 
   @Override
-  public Resource getResourceUsage() {
-    return resourceUsage;
+  public Resource getGuaranteedResourceUsage() {
+    return guaranteedResourceUsage;
   }
 
   /**
@@ -489,11 +494,11 @@ public abstract class FSQueue implements Queue, 
Schedulable {
    *
    * @param res the resource to increase
    */
-  protected void incUsedResource(Resource res) {
-    synchronized (resourceUsage) {
-      Resources.addTo(resourceUsage, res);
+  protected void incUsedGuaranteedResource(Resource res) {
+    synchronized (guaranteedResourceUsage) {
+      Resources.addTo(guaranteedResourceUsage, res);
       if (parent != null) {
-        parent.incUsedResource(res);
+        parent.incUsedGuaranteedResource(res);
       }
     }
   }
@@ -503,11 +508,11 @@ public abstract class FSQueue implements Queue, 
Schedulable {
    *
    * @param res the resource to decrease
    */
-  protected void decUsedResource(Resource res) {
-    synchronized (resourceUsage) {
-      Resources.subtractFrom(resourceUsage, res);
+  protected void decUsedGuaranteedResource(Resource res) {
+    synchronized (guaranteedResourceUsage) {
+      Resources.subtractFrom(guaranteedResourceUsage, res);
       if (parent != null) {
-        parent.decUsedResource(res);
+        parent.decUsedGuaranteedResource(res);
       }
     }
   }
@@ -520,7 +525,7 @@ public abstract class FSQueue implements Queue, Schedulable 
{
 
   boolean fitsInMaxShare(Resource additionalResource) {
     Resource usagePlusAddition =
-        Resources.add(getResourceUsage(), additionalResource);
+        Resources.add(getGuaranteedResourceUsage(), additionalResource);
 
     if (!Resources.fitsIn(usagePlusAddition, getMaxShare())) {
       if (LOG.isDebugEnabled()) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bdf3e661/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
index e2a62ec..a8e348c 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
@@ -165,7 +165,6 @@ public class FairScheduler extends
 
   private float reservableNodesRatio; // percentage of available nodes
                                       // an app can be reserved on
-
   protected boolean sizeBasedWeight; // Give larger weights to larger jobs
   // Continuous Scheduling enabled or not
   @Deprecated
@@ -188,6 +187,8 @@ public class FairScheduler extends
   boolean maxAssignDynamic;
   protected int maxAssign; // Max containers to assign per heartbeat
 
+  protected boolean oversubscriptionEnabled;
+
   @VisibleForTesting
   final MaxRunningAppsEnforcer maxRunningEnforcer;
 
@@ -1000,13 +1001,13 @@ public class FairScheduler extends
    * resources for preempted containers.
    * @param node Node to check
    */
-  static void assignPreemptedContainers(FSSchedulerNode node) {
+  static void attemptToAssignPreemptedResources(FSSchedulerNode node) {
     for (Entry<FSAppAttempt, Resource> entry :
         node.getPreemptionList().entrySet()) {
       FSAppAttempt app = entry.getKey();
       Resource preemptionPending = Resources.clone(entry.getValue());
       while (!app.isStopped() && !Resources.isNone(preemptionPending)) {
-        Resource assigned = app.assignContainer(node);
+        Resource assigned = app.assignContainer(node, false);
         if (Resources.isNone(assigned) ||
             assigned.equals(FairScheduler.CONTAINER_RESERVED)) {
           // Fail to assign, let's not try further
@@ -1038,44 +1039,82 @@ public class FairScheduler extends
       // Assign new containers...
       // 1. Ensure containers are assigned to the apps that preempted
       // 2. Check for reserved applications
-      // 3. Schedule if there are no reservations
+      // 3. Schedule GUARANTEED containers if there are no reservations
+      // 4. Schedule OPPORTUNISTIC containers if possible
 
       // Apps may wait for preempted containers
       // We have to satisfy these first to avoid cases, when we preempt
       // a container for A from B and C gets the preempted containers,
       // when C does not qualify for preemption itself.
-      assignPreemptedContainers(node);
-      FSAppAttempt reservedAppSchedulable = node.getReservedAppSchedulable();
-      boolean validReservation = false;
-      if (reservedAppSchedulable != null) {
-        validReservation = 
reservedAppSchedulable.assignReservedContainer(node);
-      }
+      attemptToAssignPreemptedResources(node);
+
+      boolean validReservation =  attemptToAssignReservedResources(node);
       if (!validReservation) {
-        // No reservation, schedule at queue which is farthest below fair share
-        int assignedContainers = 0;
-        Resource assignedResource = Resources.clone(Resources.none());
-        Resource maxResourcesToAssign = Resources.multiply(
-            node.getUnallocatedResource(), 0.5f);
-        while (node.getReservedContainer() == null) {
-          Resource assignment = queueMgr.getRootQueue().assignContainer(node);
-          if (assignment.equals(Resources.none())) {
-            break;
-          }
+        // only attempt to assign GUARANTEED containers if there is no
+        // reservation on the node
+        attemptToAssignResourcesAsGuaranteedContainers(node);
+      }
 
-          assignedContainers++;
-          Resources.addTo(assignedResource, assignment);
-          if (!shouldContinueAssigning(assignedContainers, 
maxResourcesToAssign,
-              assignedResource)) {
-            break;
-          }
-        }
+      // attempt to assign OPPORTUNISTIC containers regardless of whether
+      // we have made a reservation or assigned a GUARANTEED container
+      if (oversubscriptionEnabled) {
+        attemptToAssignResourcesAsOpportunisticContainers(node);
       }
+
       updateRootQueueMetrics();
     } finally {
       writeLock.unlock();
     }
   }
 
+  /**
+   * Assign the reserved resource to the application that has reserved it.
+   */
+  private boolean attemptToAssignReservedResources(FSSchedulerNode node) {
+    boolean success = false;
+    FSAppAttempt reservedAppSchedulable = node.getReservedAppSchedulable();
+    if (reservedAppSchedulable != null) {
+      success = reservedAppSchedulable.assignReservedContainer(node);
+    }
+    return success;
+  }
+
+  private void attemptToAssignResourcesAsGuaranteedContainers(
+      FSSchedulerNode node) {
+    // No reservation, schedule at queue which is farthest below fair share
+    int assignedContainers = 0;
+    Resource assignedResource = Resources.clone(Resources.none());
+    Resource maxResourcesToAssign = Resources.multiply(
+        node.getUnallocatedResource(), 0.5f);
+    while (node.getReservedContainer() == null) {
+      Resource assignment =
+          queueMgr.getRootQueue().assignContainer(node, false);
+      if (assignment.equals(Resources.none())) {
+        break;
+      }
+      assignedContainers++;
+      Resources.addTo(assignedResource, assignment);
+
+      if (!shouldContinueAssigning(assignedContainers, maxResourcesToAssign,
+          assignedResource)) {
+        break;
+      }
+    }
+  }
+
+  /**
+   * Try to assign OPPORTUNISTIC containers as long as there are resources
+   * available to assign.
+   * @param node the node to assign OPPORTUNISTIC containers on
+   */
+  private void attemptToAssignResourcesAsOpportunisticContainers(
+      FSSchedulerNode node) {
+    while (!Resources.none().equals(
+        queueMgr.getRootQueue().assignContainer(node, true))) {
+      // nothing to do here
+    }
+  }
+
   public FSAppAttempt getSchedulerApp(ApplicationAttemptId appAttemptId) {
     return super.getApplicationAttempt(appAttemptId);
   }
@@ -1316,6 +1355,7 @@ public class FairScheduler extends
       sizeBasedWeight = this.conf.getSizeBasedWeight();
       usePortForNodeName = this.conf.getUsePortForNodeName();
       reservableNodesRatio = this.conf.getReservableNodes();
+      oversubscriptionEnabled = this.conf.isOversubscriptionEnabled();
 
       updateInterval = this.conf.getUpdateInterval();
       if (updateInterval < 0) {
@@ -1690,7 +1730,8 @@ public class FairScheduler extends
       }
       
       // maxShare
-      if (!Resources.fitsIn(Resources.add(cur.getResourceUsage(), consumption),
+      if (!Resources.fitsIn(
+          Resources.add(cur.getGuaranteedResourceUsage(), consumption),
           cur.getMaxShare())) {
         throw new YarnException("Moving app attempt " + appAttId + " to queue "
             + queueName + " would violate queue maxShare constraints on"

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bdf3e661/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java
index b50e4bb..30bbb78 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java
@@ -344,6 +344,11 @@ public class FairSchedulerConfiguration extends 
Configuration {
         DEFAULT_CONTINUOUS_SCHEDULING_SLEEP_MS);
   }
 
+  public boolean isOversubscriptionEnabled() {
+    return getBoolean(YarnConfiguration.RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED,
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_OVERSUBSCRIPTION_ENABLED);
+  }
+
   /**
    * Delay in milliseconds for locality fallback node to rack.
    * @deprecated linked to {@link #CONTINUOUS_SCHEDULING_ENABLED} deprecation

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bdf3e661/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java
index bd1ff7a..f018929 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java
@@ -58,8 +58,17 @@ public interface Schedulable {
    */
   Resource getDemand();
 
-  /** Get the aggregate amount of resources consumed by the schedulable. */
-  Resource getResourceUsage();
+  /**
+   * Get the aggregate amount of guaranteed resources consumed by the
+   * schedulable.
+   */
+  Resource getGuaranteedResourceUsage();
+
+  /**
+   * Get the aggregate amount of opportunistic resources consumed by the
+   * schedulable.
+   */
+  Resource getOpportunisticResourceUsage();
 
   /** Minimum Resource share assigned to the schedulable. */
   Resource getMinShare();
@@ -89,8 +98,10 @@ public interface Schedulable {
   /**
    * Assign a container on this node if possible, and return the amount of
    * resources assigned.
+   * @param node the node to assign containers on
+   * @param opportunistic whether to assign OPPORTUNISTIC containers or not
    */
-  Resource assignContainer(FSSchedulerNode node);
+  Resource assignContainer(FSSchedulerNode node, boolean opportunistic);
 
   /** Get the fair share assigned to this Schedulable. */
   Resource getFairShare();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bdf3e661/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java
index 59635d9..f366891 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java
@@ -169,8 +169,8 @@ public class DominantResourceFairnessPolicy extends 
SchedulingPolicy {
       extends DominantResourceFairnessComparator {
     @Override
     public int compare(Schedulable s1, Schedulable s2) {
-      Resource usage1 = s1.getResourceUsage();
-      Resource usage2 = s2.getResourceUsage();
+      Resource usage1 = s1.getGuaranteedResourceUsage();
+      Resource usage2 = s2.getGuaranteedResourceUsage();
       Resource minShare1 = s1.getMinShare();
       Resource minShare2 = s2.getMinShare();
       Resource clusterCapacity = fsContext.getClusterResource();
@@ -370,9 +370,9 @@ public class DominantResourceFairnessPolicy extends 
SchedulingPolicy {
     @Override
     public int compare(Schedulable s1, Schedulable s2) {
       ResourceInformation[] resourceInfo1 =
-          s1.getResourceUsage().getResources();
+          s1.getGuaranteedResourceUsage().getResources();
       ResourceInformation[] resourceInfo2 =
-          s2.getResourceUsage().getResources();
+          s2.getGuaranteedResourceUsage().getResources();
       ResourceInformation[] minShareInfo1 = s1.getMinShare().getResources();
       ResourceInformation[] minShareInfo2 = s2.getMinShare().getResources();
       ResourceInformation[] clusterInfo =

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bdf3e661/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java
index 8179aa7..7ecbeea 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java
@@ -90,8 +90,8 @@ public class FairSharePolicy extends SchedulingPolicy {
       int res = compareDemand(s1, s2);
 
       // Pre-compute resource usages to avoid duplicate calculation
-      Resource resourceUsage1 = s1.getResourceUsage();
-      Resource resourceUsage2 = s2.getResourceUsage();
+      Resource resourceUsage1 = s1.getGuaranteedResourceUsage();
+      Resource resourceUsage2 = s2.getGuaranteedResourceUsage();
 
       if (res == 0) {
         res = compareMinShareUsage(s1, s2, resourceUsage1, resourceUsage2);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bdf3e661/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerQueueInfo.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerQueueInfo.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerQueueInfo.java
index 913513c..fe313a6 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerQueueInfo.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerQueueInfo.java
@@ -88,7 +88,7 @@ public class FairSchedulerQueueInfo {
     amMaxResources = new ResourceInfo(Resource.newInstance(
         queue.getMetrics().getMaxAMShareMB(),
         queue.getMetrics().getMaxAMShareVCores()));
-    usedResources = new ResourceInfo(queue.getResourceUsage());
+    usedResources = new ResourceInfo(queue.getGuaranteedResourceUsage());
     demandResources = new ResourceInfo(queue.getDemand());
     fractionMemUsed = (float)usedResources.getMemorySize() /
         clusterResources.getMemorySize();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bdf3e661/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
index e81192e..6197a2d 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
@@ -18,7 +18,6 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager;
 
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Set;
@@ -103,7 +102,10 @@ public class MockNodes {
     return rs;
   }
 
-  private static class MockRMNodeImpl implements RMNode {
+  /**
+   * A mock implementation of RMNode.
+   */
+  public static class MockRMNodeImpl implements RMNode {
     private NodeId nodeId;
     private String hostName;
     private String nodeAddr;
@@ -118,12 +120,27 @@ public class MockNodes {
     private ResourceUtilization containersUtilization;
     private ResourceUtilization nodeUtilization;
     private Resource physicalResource;
+    private OverAllocationInfo overAllocationInfo;
+    private List<UpdatedContainerInfo> containerUpdates =
+        Collections.EMPTY_LIST;
+
+    public MockRMNodeImpl(NodeId nodeId, String nodeAddr, String httpAddress,
+        Resource perNode, String rackName, String healthReport,
+        long lastHealthReportTime, int cmdPort, String hostName,
+        NodeState state, Set<String> labels,
+        ResourceUtilization containersUtilization,
+        ResourceUtilization nodeUtilization, Resource pPhysicalResource) {
+      this(nodeId, nodeAddr, httpAddress, perNode, rackName, healthReport,
+          lastHealthReportTime, cmdPort, hostName, state, labels,
+          containersUtilization, nodeUtilization, pPhysicalResource, null);
+    }
 
     public MockRMNodeImpl(NodeId nodeId, String nodeAddr, String httpAddress,
         Resource perNode, String rackName, String healthReport,
         long lastHealthReportTime, int cmdPort, String hostName, NodeState 
state,
         Set<String> labels, ResourceUtilization containersUtilization,
-        ResourceUtilization nodeUtilization, Resource pPhysicalResource) {
+        ResourceUtilization nodeUtilization, Resource pPhysicalResource,
+        OverAllocationInfo overAllocationInfo) {
       this.nodeId = nodeId;
       this.nodeAddr = nodeAddr;
       this.httpAddress = httpAddress;
@@ -138,6 +155,7 @@ public class MockNodes {
       this.containersUtilization = containersUtilization;
       this.nodeUtilization = nodeUtilization;
       this.physicalResource = pPhysicalResource;
+      this.overAllocationInfo = overAllocationInfo;
     }
 
     @Override
@@ -226,7 +244,7 @@ public class MockNodes {
 
     @Override
     public List<UpdatedContainerInfo> pullContainerUpdates() {
-      return new ArrayList<UpdatedContainerInfo>();
+      return containerUpdates;
     }
 
     @Override
@@ -264,7 +282,7 @@ public class MockNodes {
 
     @Override
     public OverAllocationInfo getOverAllocationInfo() {
-      return null;
+      return this.overAllocationInfo;
     }
 
     public OpportunisticContainersStatus getOpportunisticContainersStatus() {
@@ -289,6 +307,19 @@ public class MockNodes {
     public Resource getPhysicalResource() {
       return this.physicalResource;
     }
+
+    public void updateResourceUtilization(ResourceUtilization utilization) {
+      this.nodeUtilization = utilization;
+    }
+
+    public void updateContainersAndNodeUtilization(
+        UpdatedContainerInfo updatedContainerInfo,
+        ResourceUtilization resourceUtilization) {
+      if (updatedContainerInfo != null) {
+        containerUpdates = Collections.singletonList(updatedContainerInfo);
+      }
+      this.nodeUtilization = resourceUtilization;
+    }
   };
 
   private static RMNode buildRMNode(int rack, final Resource perNode,
@@ -312,6 +343,15 @@ public class MockNodes {
       NodeState state, String httpAddr, int hostnum, String hostName, int port,
       Set<String> labels, ResourceUtilization containersUtilization,
       ResourceUtilization nodeUtilization, Resource physicalResource) {
+    return buildRMNode(rack, perNode, state, httpAddr, hostnum, hostName, port,
+        labels, containersUtilization, nodeUtilization, physicalResource, null);
+  }
+
+  private static MockRMNodeImpl buildRMNode(int rack, final Resource perNode,
+      NodeState state, String httpAddr, int hostnum, String hostName, int port,
+      Set<String> labels, ResourceUtilization containersUtilization,
+      ResourceUtilization nodeUtilization, Resource physicalResource,
+      OverAllocationInfo overAllocationInfo) {
     final String rackName = "rack"+ rack;
     final int nid = hostnum;
     final String nodeAddr = hostName + ":" + nid;
@@ -324,9 +364,9 @@ public class MockNodes {
     String healthReport = (state == NodeState.UNHEALTHY) ? null : "HealthyMe";
     return new MockRMNodeImpl(nodeID, nodeAddr, httpAddress, perNode,
         rackName, healthReport, 0, nid, hostName, state, labels,
-        containersUtilization, nodeUtilization, physicalResource);
+        containersUtilization, nodeUtilization,
+        physicalResource, overAllocationInfo);
   }
-
   public static RMNode nodeInfo(int rack, final Resource perNode,
       NodeState state) {
     return buildRMNode(rack, perNode, state, "N/A");
@@ -355,4 +395,10 @@ public class MockNodes {
     return buildRMNode(rack, perNode, null, "localhost:0", hostnum, hostName, port);
   }
 
+  public static MockRMNodeImpl newNodeInfo(int rack, final Resource perNode,
+      OverAllocationInfo overAllocationInfo) {
+    return buildRMNode(rack, perNode, NodeState.RUNNING, "localhost:0",
+        NODE_ID++, null, 123, null, ResourceUtilization.newInstance(0, 0, 0.0f),
+        ResourceUtilization.newInstance(0, 0, 0.0f), null, overAllocationInfo);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bdf3e661/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java
index 5b49303..075124f 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java
@@ -502,7 +502,7 @@ public class TestWorkPreservingRMRestart extends 
ParameterizedSchedulerTestBase
     FSParentQueue root = scheduler.getQueueManager().getRootQueue();
     // ************ check cluster used Resources ********
     assertTrue(root.getPolicy() instanceof DominantResourceFairnessPolicy);
-    assertEquals(usedResources,root.getResourceUsage());
+    assertEquals(usedResources, root.getGuaranteedResourceUsage());
 
     // ************ check app headroom ****************
     FSAppAttempt schedulerAttempt =

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bdf3e661/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java
index 03332b2..6adc63c 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java
@@ -82,7 +82,7 @@ public class FakeSchedulable implements Schedulable {
   }
   
   @Override
-  public Resource assignContainer(FSSchedulerNode node) {
+  public Resource assignContainer(FSSchedulerNode node, boolean opportunistic) {
     return null;
   }
 
@@ -112,11 +112,16 @@ public class FakeSchedulable implements Schedulable {
   }
 
   @Override
-  public Resource getResourceUsage() {
+  public Resource getGuaranteedResourceUsage() {
     return usage;
   }
 
   @Override
+  public Resource getOpportunisticResourceUsage() {
+    return Resource.newInstance(0, 0);
+  }
+
+  @Override
   public long getStartTime() {
     return startTime;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bdf3e661/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAppRunnability.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAppRunnability.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAppRunnability.java
index f581935..71db786 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAppRunnability.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAppRunnability.java
@@ -222,7 +222,8 @@ public class TestAppRunnability extends 
FairSchedulerTestBase {
     scheduler.handle(nodeEvent);
     scheduler.handle(updateEvent);
 
-    assertEquals(Resource.newInstance(1024, 1), oldQueue.getResourceUsage());
+    assertEquals(Resource.newInstance(1024, 1),
+        oldQueue.getGuaranteedResourceUsage());
     scheduler.update();
     assertEquals(Resource.newInstance(3072, 3), oldQueue.getDemand());
 
@@ -231,8 +232,10 @@ public class TestAppRunnability extends 
FairSchedulerTestBase {
     assertSame(targetQueue, app.getQueue());
     assertFalse(oldQueue.isRunnableApp(app));
     assertTrue(targetQueue.isRunnableApp(app));
-    assertEquals(Resource.newInstance(0, 0), oldQueue.getResourceUsage());
-    assertEquals(Resource.newInstance(1024, 1), targetQueue.getResourceUsage());
+    assertEquals(Resource.newInstance(0, 0),
+        oldQueue.getGuaranteedResourceUsage());
+    assertEquals(Resource.newInstance(1024, 1),
+        targetQueue.getGuaranteedResourceUsage());
     assertEquals(0, oldQueue.getNumRunnableApps());
     assertEquals(1, targetQueue.getNumRunnableApps());
     assertEquals(1, queueMgr.getRootQueue().getNumRunnableApps());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bdf3e661/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppAttempt.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppAttempt.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppAttempt.java
index 51ffd23..9bfbb91 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppAttempt.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSAppAttempt.java
@@ -223,7 +223,7 @@ public class TestFSAppAttempt extends FairSchedulerTestBase 
{
 
     Mockito.when(mockQueue.getMaxShare()).thenReturn(queueMaxResources);
     Mockito.when(mockQueue.getFairShare()).thenReturn(queueFairShare);
-    Mockito.when(mockQueue.getResourceUsage()).thenReturn(queueUsage);
+    Mockito.when(mockQueue.getGuaranteedResourceUsage()).thenReturn(queueUsage);
     Mockito.when(mockScheduler.getClusterResource()).thenReturn
         (clusterResource);
     Mockito.when(fakeRootQueueMetrics.getAllocatedResources()).thenReturn
@@ -305,7 +305,7 @@ public class TestFSAppAttempt extends FairSchedulerTestBase 
{
             getApplicationId()));
     FSAppAttempt app = scheduler.getSchedulerApp(id11);
     assertNotNull(app);
-    Resource queueUsage = app.getQueue().getResourceUsage();
+    Resource queueUsage = app.getQueue().getGuaranteedResourceUsage();
     assertEquals(0, queueUsage.getMemorySize());
     assertEquals(0, queueUsage.getVirtualCores());
     SchedulerNode n1 = scheduler.getSchedulerNode(node1.getNodeID());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bdf3e661/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java
index 4a738ca..efe4ad1 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java
@@ -88,7 +88,7 @@ public class TestFSLeafQueue extends FairSchedulerTestBase {
 
     FSAppAttempt app = mock(FSAppAttempt.class);
     Mockito.when(app.getDemand()).thenReturn(maxResource);
-    Mockito.when(app.getResourceUsage()).thenReturn(Resources.none());
+    Mockito.when(app.getGuaranteedResourceUsage()).thenReturn(Resources.none());
 
     schedulable.addApp(app, true);
     schedulable.addApp(app, true);
@@ -176,7 +176,7 @@ public class TestFSLeafQueue extends FairSchedulerTestBase {
       @Override
       public void run() {
         for (int i=0; i < 500; i++) {
-          schedulable.getResourceUsage();
+          schedulable.getGuaranteedResourceUsage();
         }
       }
     });

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bdf3e661/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSSchedulerNode.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSSchedulerNode.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSSchedulerNode.java
index 6726f17..f79ba4c 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSSchedulerNode.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSSchedulerNode.java
@@ -98,7 +98,7 @@ public class TestFSSchedulerNode {
     ApplicationAttemptId appAttemptId =
         mock(ApplicationAttemptId.class);
     when(starvingApp.getApplicationAttemptId()).thenReturn(appAttemptId);
-    when(starvingApp.assignContainer(schedulerNode)).thenAnswer(
+    when(starvingApp.assignContainer(schedulerNode, false)).thenAnswer(
         new Answer<Resource>() {
           @Override
           public Resource answer(InvocationOnMock invocationOnMock)
@@ -142,7 +142,7 @@ public class TestFSSchedulerNode {
   }
 
   private void allocateContainers(FSSchedulerNode schedulerNode) {
-    FairScheduler.assignPreemptedContainers(schedulerNode);
+    FairScheduler.attemptToAssignPreemptedResources(schedulerNode);
   }
 
   /**


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to