MAPREDUCE-5279. Made MR headroom calculation honor cpu dimension when YARN 
scheduler resource type is memory plus cpu. Contributed by Peng Zhang and Varun 
Vasudev.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/376233cd
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/376233cd
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/376233cd

Branch: refs/heads/HDFS-6581
Commit: 376233cdd4a4ddbde5a92a0627f78338cb4c38b7
Parents: 26cba7f
Author: Zhijie Shen <zjs...@apache.org>
Authored: Mon Sep 22 09:28:47 2014 -0700
Committer: Zhijie Shen <zjs...@apache.org>
Committed: Mon Sep 22 09:28:47 2014 -0700

----------------------------------------------------------------------
 hadoop-mapreduce-project/CHANGES.txt            |   4 +
 .../mapreduce/v2/app/rm/RMCommunicator.java     |  12 +-
 .../v2/app/rm/RMContainerAllocator.java         | 272 +++++++++++--------
 .../v2/app/rm/ResourceCalculatorUtils.java      |  52 ++++
 .../v2/app/rm/TestRMContainerAllocator.java     |  82 ++++--
 .../hadoop/yarn/util/resource/Resources.java    |   4 +-
 6 files changed, 288 insertions(+), 138 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/376233cd/hadoop-mapreduce-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index 815b3fb..859c899 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -254,6 +254,10 @@ Release 2.6.0 - UNRELEASED
     MAPREDUCE-5891. Improved shuffle error handling across NM restarts
     (Junping Du via jlowe)
 
+    MAPREDUCE-5279. Made MR headroom calculation honor cpu dimension when YARN
+    scheduler resource type is memory plus cpu. (Peng Zhang and Varun Vasudev
+    via zjshen)
+
   OPTIMIZATIONS
 
   BUG FIXES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/376233cd/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java
index 6e9f313..6c58a68 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.mapreduce.v2.app.rm;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
+import java.util.EnumSet;
 import java.util.Map;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -59,6 +60,7 @@ import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 
 import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
 
 /**
  * Registers/unregisters to RM and sends heartbeats to RM.
@@ -90,6 +92,8 @@ public abstract class RMCommunicator extends AbstractService
   private volatile boolean shouldUnregister = true;
   private boolean isApplicationMasterRegistered = false;
 
+  private EnumSet<SchedulerResourceTypes> schedulerResourceTypes;
+
   public RMCommunicator(ClientService clientService, AppContext context) {
     super("RMCommunicator");
     this.clientService = clientService;
@@ -98,6 +102,7 @@ public abstract class RMCommunicator extends AbstractService
     this.applicationId = context.getApplicationID();
     this.stopped = new AtomicBoolean(false);
     this.heartbeatCallbacks = new ConcurrentLinkedQueue<Runnable>();
+    this.schedulerResourceTypes = EnumSet.of(SchedulerResourceTypes.MEMORY);
   }
 
   @Override
@@ -163,10 +168,11 @@ public abstract class RMCommunicator extends AbstractService
         setClientToAMToken(response.getClientToAMTokenMasterKey());        
       }
       this.applicationACLs = response.getApplicationACLs();
-      LOG.info("maxContainerCapability: " + maxContainerCapability.getMemory());
+      LOG.info("maxContainerCapability: " + maxContainerCapability);
       String queue = response.getQueue();
       LOG.info("queue: " + queue);
       job.setQueueName(queue);
+      this.schedulerResourceTypes.addAll(response.getSchedulerResourceTypes());
     } catch (Exception are) {
       LOG.error("Exception while registering", are);
       throw new YarnRuntimeException(are);
@@ -343,4 +349,8 @@ public abstract class RMCommunicator extends AbstractService
   protected boolean isApplicationMasterRegistered() {
     return isApplicationMasterRegistered;
   }
+
+  public EnumSet<SchedulerResourceTypes> getSchedulerResourceTypes() {
+    return schedulerResourceTypes;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/376233cd/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
index 6cb0191..fb8771a 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
@@ -73,6 +73,7 @@ import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.api.records.PreemptionMessage;
 import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.client.api.NMTokenCache;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
@@ -80,6 +81,7 @@ import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.RackResolver;
+import org.apache.hadoop.yarn.util.resource.Resources;
 
 import com.google.common.annotations.VisibleForTesting;
 
@@ -149,8 +151,8 @@ public class RMContainerAllocator extends RMContainerRequestor
   private int lastCompletedTasks = 0;
   
   private boolean recalculateReduceSchedule = false;
-  private int mapResourceRequest;//memory
-  private int reduceResourceRequest;//memory
+  private Resource mapResourceRequest = Resources.none();
+  private Resource reduceResourceRequest = Resources.none();
   
   private boolean reduceStarted = false;
   private float maxReduceRampupLimit = 0;
@@ -328,49 +330,61 @@ public class RMContainerAllocator extends RMContainerRequestor
     if (event.getType() == ContainerAllocator.EventType.CONTAINER_REQ) {
       ContainerRequestEvent reqEvent = (ContainerRequestEvent) event;
       JobId jobId = getJob().getID();
-      int supportedMaxContainerCapability =
-          getMaxContainerCapability().getMemory();
+      Resource supportedMaxContainerCapability = getMaxContainerCapability();
       if (reqEvent.getAttemptID().getTaskId().getTaskType().equals(TaskType.MAP)) {
-        if (mapResourceRequest == 0) {
-          mapResourceRequest = reqEvent.getCapability().getMemory();
-          eventHandler.handle(new JobHistoryEvent(jobId, 
-              new NormalizedResourceEvent(org.apache.hadoop.mapreduce.TaskType.MAP,
-                  mapResourceRequest)));
-          LOG.info("mapResourceRequest:"+ mapResourceRequest);
-          if (mapResourceRequest > supportedMaxContainerCapability) {
-            String diagMsg = "MAP capability required is more than the supported " +
-            "max container capability in the cluster. Killing the Job. mapResourceRequest: " +
-                mapResourceRequest + " maxContainerCapability:" + supportedMaxContainerCapability;
+        if (mapResourceRequest.equals(Resources.none())) {
+          mapResourceRequest = reqEvent.getCapability();
+          eventHandler.handle(new JobHistoryEvent(jobId,
+            new NormalizedResourceEvent(
+              org.apache.hadoop.mapreduce.TaskType.MAP, mapResourceRequest
+                .getMemory())));
+          LOG.info("mapResourceRequest:" + mapResourceRequest);
+          if (mapResourceRequest.getMemory() > supportedMaxContainerCapability
+            .getMemory()
+              || mapResourceRequest.getVirtualCores() > supportedMaxContainerCapability
+                .getVirtualCores()) {
+            String diagMsg =
+                "MAP capability required is more than the supported "
+                    + "max container capability in the cluster. Killing the Job. mapResourceRequest: "
+                    + mapResourceRequest + " maxContainerCapability:"
+                    + supportedMaxContainerCapability;
             LOG.info(diagMsg);
-            eventHandler.handle(new JobDiagnosticsUpdateEvent(
-                jobId, diagMsg));
+            eventHandler.handle(new JobDiagnosticsUpdateEvent(jobId, diagMsg));
             eventHandler.handle(new JobEvent(jobId, JobEventType.JOB_KILL));
           }
         }
-        //set the rounded off memory
-        reqEvent.getCapability().setMemory(mapResourceRequest);
+        // set the resources
+        reqEvent.getCapability().setMemory(mapResourceRequest.getMemory());
+        reqEvent.getCapability().setVirtualCores(
+          mapResourceRequest.getVirtualCores());
         scheduledRequests.addMap(reqEvent);//maps are immediately scheduled
       } else {
-        if (reduceResourceRequest == 0) {
-          reduceResourceRequest = reqEvent.getCapability().getMemory();
-          eventHandler.handle(new JobHistoryEvent(jobId, 
-              new NormalizedResourceEvent(
-                  org.apache.hadoop.mapreduce.TaskType.REDUCE,
-                  reduceResourceRequest)));
-          LOG.info("reduceResourceRequest:"+ reduceResourceRequest);
-          if (reduceResourceRequest > supportedMaxContainerCapability) {
-            String diagMsg = "REDUCE capability required is more than the " +
-                       "supported max container capability in the cluster. Killing the " +
-                       "Job. reduceResourceRequest: " + reduceResourceRequest +
-                       " maxContainerCapability:" + supportedMaxContainerCapability;
+        if (reduceResourceRequest.equals(Resources.none())) {
+          reduceResourceRequest = reqEvent.getCapability();
+          eventHandler.handle(new JobHistoryEvent(jobId,
+            new NormalizedResourceEvent(
+              org.apache.hadoop.mapreduce.TaskType.REDUCE,
+              reduceResourceRequest.getMemory())));
+          LOG.info("reduceResourceRequest:" + reduceResourceRequest);
+          if (reduceResourceRequest.getMemory() > supportedMaxContainerCapability
+            .getMemory()
+              || reduceResourceRequest.getVirtualCores() > supportedMaxContainerCapability
+                .getVirtualCores()) {
+            String diagMsg =
+                "REDUCE capability required is more than the "
+                    + "supported max container capability in the cluster. Killing the "
+                    + "Job. reduceResourceRequest: " + reduceResourceRequest
+                    + " maxContainerCapability:"
+                    + supportedMaxContainerCapability;
             LOG.info(diagMsg);
-            eventHandler.handle(new JobDiagnosticsUpdateEvent(
-                jobId, diagMsg));
+            eventHandler.handle(new JobDiagnosticsUpdateEvent(jobId, diagMsg));
             eventHandler.handle(new JobEvent(jobId, JobEventType.JOB_KILL));
           }
         }
-        //set the rounded off memory
-        reqEvent.getCapability().setMemory(reduceResourceRequest);
+        // set the resources
+        reqEvent.getCapability().setMemory(reduceResourceRequest.getMemory());
+        reqEvent.getCapability().setVirtualCores(
+          reduceResourceRequest.getVirtualCores());
         if (reqEvent.getEarlierAttemptFailed()) {
           //add to the front of queue for fail fast
           pendingReduces.addFirst(new ContainerRequest(reqEvent, PRIORITY_REDUCE));
@@ -425,34 +439,40 @@ public class RMContainerAllocator extends RMContainerRequestor
 
   @Private
   @VisibleForTesting
-  synchronized void setReduceResourceRequest(int mem) {
-    this.reduceResourceRequest = mem;
+  synchronized void setReduceResourceRequest(Resource res) {
+    this.reduceResourceRequest = res;
   }
 
   @Private
   @VisibleForTesting
-  synchronized void setMapResourceRequest(int mem) {
-    this.mapResourceRequest = mem;
+  synchronized void setMapResourceRequest(Resource res) {
+    this.mapResourceRequest = res;
   }
 
   @Private
   @VisibleForTesting
   void preemptReducesIfNeeded() {
-    if (reduceResourceRequest == 0) {
-      return; //no reduces
+    if (reduceResourceRequest.equals(Resources.none())) {
+      return; // no reduces
     }
     //check if reduces have taken over the whole cluster and there are 
     //unassigned maps
     if (scheduledRequests.maps.size() > 0) {
-      int memLimit = getMemLimit();
-      int availableMemForMap = memLimit - ((assignedRequests.reduces.size() -
-          assignedRequests.preemptionWaitingReduces.size()) * reduceResourceRequest);
-      //availableMemForMap must be sufficient to run atleast 1 map
-      if (availableMemForMap < mapResourceRequest) {
-        //to make sure new containers are given to maps and not reduces
-        //ramp down all scheduled reduces if any
-        //(since reduces are scheduled at higher priority than maps)
-        LOG.info("Ramping down all scheduled reduces:" + scheduledRequests.reduces.size());
+      Resource resourceLimit = getResourceLimit();
+      Resource availableResourceForMap =
+          Resources.subtract(
+            resourceLimit,
+            Resources.multiply(reduceResourceRequest,
+              assignedRequests.reduces.size()
+                  - assignedRequests.preemptionWaitingReduces.size()));
+      // availableMemForMap must be sufficient to run at least 1 map
+      if (ResourceCalculatorUtils.computeAvailableContainers(availableResourceForMap,
+        mapResourceRequest, getSchedulerResourceTypes()) <= 0) {
+        // to make sure new containers are given to maps and not reduces
+        // ramp down all scheduled reduces if any
+        // (since reduces are scheduled at higher priority than maps)
+        LOG.info("Ramping down all scheduled reduces:"
+            + scheduledRequests.reduces.size());
         for (ContainerRequest req : scheduledRequests.reduces.values()) {
           pendingReduces.add(req);
         }
@@ -462,17 +482,25 @@ public class RMContainerAllocator extends RMContainerRequestor
         //hanging around for a while
         int hangingMapRequests = getNumOfHangingRequests(scheduledRequests.maps);
         if (hangingMapRequests > 0) {
-          //preempt for making space for at least one map
-          int premeptionLimit = Math.max(mapResourceRequest,
-              (int) (maxReducePreemptionLimit * memLimit));
-
-          int preemptMem = Math.min(hangingMapRequests * mapResourceRequest,
-              premeptionLimit);
-
-          int toPreempt = (int) Math.ceil((float) preemptMem / reduceResourceRequest);
-          toPreempt = Math.min(toPreempt, assignedRequests.reduces.size());
-
-          LOG.info("Going to preempt " + toPreempt + " due to lack of space for maps");
+          // preempt for making space for at least one map
+          int preemptionReduceNumForOneMap =
+              ResourceCalculatorUtils.divideAndCeilContainers(mapResourceRequest,
+                reduceResourceRequest, getSchedulerResourceTypes());
+          int preemptionReduceNumForPreemptionLimit =
+              ResourceCalculatorUtils.divideAndCeilContainers(
+                Resources.multiply(resourceLimit, maxReducePreemptionLimit),
+                reduceResourceRequest, getSchedulerResourceTypes());
+          int preemptionReduceNumForAllMaps =
+              ResourceCalculatorUtils.divideAndCeilContainers(
+                Resources.multiply(mapResourceRequest, hangingMapRequests),
+                reduceResourceRequest, getSchedulerResourceTypes());
+          int toPreempt =
+              Math.min(Math.max(preemptionReduceNumForOneMap,
+                preemptionReduceNumForPreemptionLimit),
+                preemptionReduceNumForAllMaps);
+
+          LOG.info("Going to preempt " + toPreempt
+              + " due to lack of space for maps");
           assignedRequests.preemptReduce(toPreempt);
         }
       }
@@ -497,7 +525,7 @@ public class RMContainerAllocator extends RMContainerRequestor
       int totalMaps, int completedMaps,
       int scheduledMaps, int scheduledReduces,
       int assignedMaps, int assignedReduces,
-      int mapResourceReqt, int reduceResourceReqt,
+      Resource mapResourceReqt, Resource reduceResourceReqt,
       int numPendingReduces,
       float maxReduceRampupLimit, float reduceSlowStart) {
     
@@ -505,8 +533,12 @@ public class RMContainerAllocator extends RMContainerRequestor
       return;
     }
     
-    int headRoom = getAvailableResources() != null ?
-        getAvailableResources().getMemory() : 0;
+    // get available resources for this job
+    Resource headRoom = getAvailableResources();
+    if (headRoom == null) {
+      headRoom = Resources.none();
+    }
+
     LOG.info("Recalculating schedule, headroom=" + headRoom);
     
     //check for slow start
@@ -540,49 +572,60 @@ public class RMContainerAllocator extends RMContainerRequestor
       completedMapPercent = 1;
     }
     
-    int netScheduledMapMem = 
-        (scheduledMaps + assignedMaps) * mapResourceReqt;
+    Resource netScheduledMapResource =
+        Resources.multiply(mapResourceReqt, (scheduledMaps + assignedMaps));
 
-    int netScheduledReduceMem = 
-        (scheduledReduces + assignedReduces) * reduceResourceReqt;
+    Resource netScheduledReduceResource =
+        Resources.multiply(reduceResourceReqt,
+          (scheduledReduces + assignedReduces));
+
+    Resource finalMapResourceLimit;
+    Resource finalReduceResourceLimit;
 
-    int finalMapMemLimit = 0;
-    int finalReduceMemLimit = 0;
-    
     // ramp up the reduces based on completed map percentage
-    int totalMemLimit = getMemLimit();
-    int idealReduceMemLimit = 
-        Math.min(
-            (int)(completedMapPercent * totalMemLimit),
-            (int) (maxReduceRampupLimit * totalMemLimit));
-    int idealMapMemLimit = totalMemLimit - idealReduceMemLimit;
+    Resource totalResourceLimit = getResourceLimit();
+
+    Resource idealReduceResourceLimit =
+        Resources.multiply(totalResourceLimit,
+          Math.min(completedMapPercent, maxReduceRampupLimit));
+    Resource ideaMapResourceLimit =
+        Resources.subtract(totalResourceLimit, idealReduceResourceLimit);
 
     // check if there aren't enough maps scheduled, give the free map capacity
-    // to reduce
-    if (idealMapMemLimit > netScheduledMapMem) {
-      int unusedMapMemLimit = idealMapMemLimit - netScheduledMapMem;
-      finalReduceMemLimit = idealReduceMemLimit + unusedMapMemLimit;
-      finalMapMemLimit = totalMemLimit - finalReduceMemLimit;
+    // to reduce.
+    // Even when container number equals, there may be unused resources in one
+    // dimension
+    if (ResourceCalculatorUtils.computeAvailableContainers(ideaMapResourceLimit,
+      mapResourceReqt, getSchedulerResourceTypes()) >= (scheduledMaps + assignedMaps)) {
+      // enough resource given to maps, given the remaining to reduces
+      Resource unusedMapResourceLimit =
+          Resources.subtract(ideaMapResourceLimit, netScheduledMapResource);
+      finalReduceResourceLimit =
+          Resources.add(idealReduceResourceLimit, unusedMapResourceLimit);
+      finalMapResourceLimit =
+          Resources.subtract(totalResourceLimit, finalReduceResourceLimit);
     } else {
-      finalMapMemLimit = idealMapMemLimit;
-      finalReduceMemLimit = idealReduceMemLimit;
+      finalMapResourceLimit = ideaMapResourceLimit;
+      finalReduceResourceLimit = idealReduceResourceLimit;
     }
-    
-    LOG.info("completedMapPercent " + completedMapPercent +
-        " totalMemLimit:" + totalMemLimit +
-        " finalMapMemLimit:" + finalMapMemLimit +
-        " finalReduceMemLimit:" + finalReduceMemLimit + 
-        " netScheduledMapMem:" + netScheduledMapMem +
-        " netScheduledReduceMem:" + netScheduledReduceMem);
-    
-    int rampUp = 
-        (finalReduceMemLimit - netScheduledReduceMem) / reduceResourceReqt;
-    
+
+    LOG.info("completedMapPercent " + completedMapPercent
+        + " totalResourceLimit:" + totalResourceLimit
+        + " finalMapResourceLimit:" + finalMapResourceLimit
+        + " finalReduceResourceLimit:" + finalReduceResourceLimit
+        + " netScheduledMapResource:" + netScheduledMapResource
+        + " netScheduledReduceResource:" + netScheduledReduceResource);
+
+    int rampUp =
+        ResourceCalculatorUtils.computeAvailableContainers(Resources.subtract(
+                finalReduceResourceLimit, netScheduledReduceResource),
+            reduceResourceReqt, getSchedulerResourceTypes());
+
     if (rampUp > 0) {
       rampUp = Math.min(rampUp, numPendingReduces);
       LOG.info("Ramping up " + rampUp);
       rampUpReduces(rampUp);
-    } else if (rampUp < 0){
+    } else if (rampUp < 0) {
       int rampDown = -1 * rampUp;
       rampDown = Math.min(rampDown, scheduledReduces);
       LOG.info("Ramping down " + rampDown);
@@ -618,8 +661,10 @@ public class RMContainerAllocator extends RMContainerRequestor
   
   @SuppressWarnings("unchecked")
   private List<Container> getResources() throws Exception {
-    int headRoom = getAvailableResources() != null
-        ? getAvailableResources().getMemory() : 0;//first time it would be null
+    // will be null the first time
+    Resource headRoom =
+        getAvailableResources() == null ? Resources.none() :
+            Resources.clone(getAvailableResources());
     AllocateResponse response;
     /*
      * If contact with RM is lost, the AM will wait MR_AM_TO_RM_WAIT_INTERVAL_MS
@@ -670,7 +715,9 @@ public class RMContainerAllocator extends RMContainerRequestor
         throw new YarnRuntimeException(msg);
       }
     }
-    int newHeadRoom = getAvailableResources() != null ? getAvailableResources().getMemory() : 0;
+    Resource newHeadRoom =
+        getAvailableResources() == null ? Resources.none()
+            : getAvailableResources();
     List<Container> newContainers = response.getAllocatedContainers();
     // Setting NMTokens
     if (response.getNMTokens() != null) {
@@ -694,10 +741,11 @@ public class RMContainerAllocator extends RMContainerRequestor
           new PreemptionContext(assignedRequests), preemptReq);
     }
 
-    if (newContainers.size() + finishedContainers.size() > 0 || headRoom != newHeadRoom) {
+    if (newContainers.size() + finishedContainers.size() > 0
+        || !headRoom.equals(newHeadRoom)) {
       //something changed
       recalculateReduceSchedule = true;
-      if (LOG.isDebugEnabled() && headRoom != newHeadRoom) {
+      if (LOG.isDebugEnabled() && !headRoom.equals(newHeadRoom)) {
         LOG.debug("headroom=" + newHeadRoom);
       }
     }
@@ -802,10 +850,18 @@ public class RMContainerAllocator extends RMContainerRequestor
   }
 
   @Private
-  public int getMemLimit() {
-    int headRoom = getAvailableResources() != null ? getAvailableResources().getMemory() : 0;
-    return headRoom + assignedRequests.maps.size() * mapResourceRequest +
-       assignedRequests.reduces.size() * reduceResourceRequest;
+  public Resource getResourceLimit() {
+    Resource headRoom = getAvailableResources();
+    if (headRoom == null) {
+      headRoom = Resources.none();
+    }
+    Resource assignedMapResource =
+        Resources.multiply(mapResourceRequest, assignedRequests.maps.size());
+    Resource assignedReduceResource =
+        Resources.multiply(reduceResourceRequest,
+          assignedRequests.reduces.size());
+    return Resources.add(headRoom,
+      Resources.add(assignedMapResource, assignedReduceResource));
   }
 
   @Private
@@ -914,10 +970,11 @@ public class RMContainerAllocator extends RMContainerRequestor
         // a container to be assigned
         boolean isAssignable = true;
         Priority priority = allocated.getPriority();
-        int allocatedMemory = allocated.getResource().getMemory();
+        Resource allocatedResource = allocated.getResource();
         if (PRIORITY_FAST_FAIL_MAP.equals(priority) 
             || PRIORITY_MAP.equals(priority)) {
-          if (allocatedMemory < mapResourceRequest
+          if (ResourceCalculatorUtils.computeAvailableContainers(allocatedResource,
+              mapResourceRequest, getSchedulerResourceTypes()) <= 0
               || maps.isEmpty()) {
             LOG.info("Cannot assign container " + allocated 
                 + " for a map as either "
@@ -928,7 +985,8 @@ public class RMContainerAllocator extends RMContainerRequestor
           }
         } 
         else if (PRIORITY_REDUCE.equals(priority)) {
-          if (allocatedMemory < reduceResourceRequest
+          if (ResourceCalculatorUtils.computeAvailableContainers(allocatedResource,
+              reduceResourceRequest, getSchedulerResourceTypes()) <= 0
               || reduces.isEmpty()) {
             LOG.info("Cannot assign container " + allocated 
                 + " for a reduce as either "

http://git-wip-us.apache.org/repos/asf/hadoop/blob/376233cd/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ResourceCalculatorUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ResourceCalculatorUtils.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ResourceCalculatorUtils.java
new file mode 100644
index 0000000..b9bc8b5
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ResourceCalculatorUtils.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app.rm;
+
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
+import org.apache.hadoop.yarn.util.Records;
+
+import java.util.EnumSet;
+
+public class ResourceCalculatorUtils {
+  public static int divideAndCeil(int a, int b) {
+    if (b == 0) {
+      return 0;
+    }
+    return (a + (b - 1)) / b;
+  }
+
+  public static int computeAvailableContainers(Resource available,
+      Resource required, EnumSet<SchedulerResourceTypes> resourceTypes) {
+    if (resourceTypes.contains(SchedulerResourceTypes.CPU)) {
+      return Math.min(available.getMemory() / required.getMemory(),
+        available.getVirtualCores() / required.getVirtualCores());
+    }
+    return available.getMemory() / required.getMemory();
+  }
+
+  public static int divideAndCeilContainers(Resource required, Resource factor,
+      EnumSet<SchedulerResourceTypes> resourceTypes) {
+    if (resourceTypes.contains(SchedulerResourceTypes.CPU)) {
+      return Math.max(divideAndCeil(required.getMemory(), factor.getMemory()),
+        divideAndCeil(required.getVirtualCores(), factor.getVirtualCores()));
+    }
+    return divideAndCeil(required.getMemory(), factor.getMemory());
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/376233cd/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
index 4c03b31..341e673 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.mapreduce.v2.app.rm;
 
+import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyFloat;
 import static org.mockito.Matchers.anyInt;
 import static org.mockito.Matchers.isA;
@@ -30,19 +31,14 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.ClusterInfo;
 import org.apache.hadoop.mapreduce.v2.app.MRApp;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.junit.Assert;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -458,8 +454,8 @@ public class TestRMContainerAllocator {
             0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
     MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
         appAttemptId, mockJob, new SystemClock());
-    allocator.setMapResourceRequest(1024);
-    allocator.setReduceResourceRequest(1024);
+    allocator.setMapResourceRequest(BuilderUtils.newResource(1024, 1));
+    allocator.setReduceResourceRequest(BuilderUtils.newResource(1024, 1));
     RMContainerAllocator.AssignedRequests assignedRequests =
         allocator.getAssignedRequests();
     RMContainerAllocator.ScheduledRequests scheduledRequests =
@@ -478,7 +474,7 @@ public class TestRMContainerAllocator {
 
   @Test(timeout = 30000)
   public void testNonAggressivelyPreemptReducers() throws Exception {
-    LOG.info("Running testPreemptReducers");
+    LOG.info("Running testNonAggressivelyPreemptReducers");
 
     final int preemptThreshold = 2; //sec
     Configuration conf = new Configuration();
@@ -513,8 +509,8 @@ public class TestRMContainerAllocator {
     clock.setTime(1);
     MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
         appAttemptId, mockJob, clock);
-    allocator.setMapResourceRequest(1024);
-    allocator.setReduceResourceRequest(1024);
+    allocator.setMapResourceRequest(BuilderUtils.newResource(1024, 1));
+    allocator.setReduceResourceRequest(BuilderUtils.newResource(1024, 1));
     RMContainerAllocator.AssignedRequests assignedRequests =
         allocator.getAssignedRequests();
     RMContainerAllocator.ScheduledRequests scheduledRequests =
@@ -1774,17 +1770,19 @@ public class TestRMContainerAllocator {
     int scheduledReduces = 0;
     int assignedMaps = 2;
     int assignedReduces = 0;
-    int mapResourceReqt = 1024;
-    int reduceResourceReqt = 2*1024;
+    Resource mapResourceReqt = BuilderUtils.newResource(1024, 1);
+    Resource reduceResourceReqt = BuilderUtils.newResource(2 * 1024, 1);
     int numPendingReduces = 4;
     float maxReduceRampupLimit = 0.5f;
     float reduceSlowStart = 0.2f;
     
     RMContainerAllocator allocator = mock(RMContainerAllocator.class);
-    doCallRealMethod().when(allocator).
-        scheduleReduces(anyInt(), anyInt(), anyInt(), anyInt(), anyInt(), 
-            anyInt(), anyInt(), anyInt(), anyInt(), anyFloat(), anyFloat());
-    
+    doCallRealMethod().when(allocator).scheduleReduces(anyInt(), anyInt(),
+        anyInt(), anyInt(), anyInt(), anyInt(), any(Resource.class),
+        any(Resource.class), anyInt(), anyFloat(), anyFloat());
+    doReturn(EnumSet.of(SchedulerResourceTypes.MEMORY)).when(allocator)
+      .getSchedulerResourceTypes();
+
     // Test slow-start
     allocator.scheduleReduces(
         totalMaps, succeededMaps, 
@@ -1808,6 +1806,7 @@ public class TestRMContainerAllocator {
     verify(allocator, never()).scheduleAllReduces();
 
     succeededMaps = 3;
+    doReturn(BuilderUtils.newResource(0, 0)).when(allocator).getResourceLimit();
     allocator.scheduleReduces(
         totalMaps, succeededMaps, 
         scheduledMaps, scheduledReduces, 
@@ -1818,7 +1817,8 @@ public class TestRMContainerAllocator {
     verify(allocator, times(1)).setIsReduceStarted(true);
     
     // Test reduce ramp-up
-    doReturn(100 * 1024).when(allocator).getMemLimit();
+    doReturn(BuilderUtils.newResource(100 * 1024, 100 * 1)).when(allocator)
+      .getResourceLimit();
     allocator.scheduleReduces(
         totalMaps, succeededMaps, 
         scheduledMaps, scheduledReduces, 
@@ -1831,13 +1831,14 @@ public class TestRMContainerAllocator {
 
     // Test reduce ramp-down
     scheduledReduces = 3;
-    doReturn(10 * 1024).when(allocator).getMemLimit();
+    doReturn(BuilderUtils.newResource(10 * 1024, 10 * 1)).when(allocator)
+      .getResourceLimit();
     allocator.scheduleReduces(
-        totalMaps, succeededMaps, 
-        scheduledMaps, scheduledReduces, 
-        assignedMaps, assignedReduces, 
-        mapResourceReqt, reduceResourceReqt, 
-        numPendingReduces, 
+        totalMaps, succeededMaps,
+        scheduledMaps, scheduledReduces,
+        assignedMaps, assignedReduces,
+        mapResourceReqt, reduceResourceReqt,
+        numPendingReduces,
         maxReduceRampupLimit, reduceSlowStart);
     verify(allocator).rampDownReduces(anyInt());
 
@@ -1846,7 +1847,8 @@ public class TestRMContainerAllocator {
     // should be invoked twice.
     scheduledMaps = 2;
     assignedReduces = 2;
-    doReturn(10 * 1024).when(allocator).getMemLimit();
+    doReturn(BuilderUtils.newResource(10 * 1024, 10 * 1)).when(allocator)
+      .getResourceLimit();
     allocator.scheduleReduces(
         totalMaps, succeededMaps, 
         scheduledMaps, scheduledReduces, 
@@ -1855,6 +1857,30 @@ public class TestRMContainerAllocator {
         numPendingReduces, 
         maxReduceRampupLimit, reduceSlowStart);
     verify(allocator, times(2)).rampDownReduces(anyInt());
+
+    doReturn(
+        EnumSet.of(SchedulerResourceTypes.MEMORY, SchedulerResourceTypes.CPU))
+        .when(allocator).getSchedulerResourceTypes();
+
+    // Test ramp-down when enough memory but not enough cpu resource
+    scheduledMaps = 10;
+    assignedReduces = 0;
+    doReturn(BuilderUtils.newResource(100 * 1024, 5 * 1)).when(allocator)
+        .getResourceLimit();
+    allocator.scheduleReduces(totalMaps, succeededMaps, scheduledMaps,
+        scheduledReduces, assignedMaps, assignedReduces, mapResourceReqt,
+        reduceResourceReqt, numPendingReduces, maxReduceRampupLimit,
+        reduceSlowStart);
+    verify(allocator, times(3)).rampDownReduces(anyInt());
+
+    // Test ramp-down when enough cpu but not enough memory resource
+    doReturn(BuilderUtils.newResource(10 * 1024, 100 * 1)).when(allocator)
+        .getResourceLimit();
+    allocator.scheduleReduces(totalMaps, succeededMaps, scheduledMaps,
+        scheduledReduces, assignedMaps, assignedReduces, mapResourceReqt,
+        reduceResourceReqt, numPendingReduces, maxReduceRampupLimit,
+        reduceSlowStart);
+    verify(allocator, times(4)).rampDownReduces(anyInt());
   }
 
   private static class RecalculateContainerAllocator extends MyContainerAllocator {
@@ -1868,7 +1894,7 @@ public class TestRMContainerAllocator {
     @Override
     public void scheduleReduces(int totalMaps, int completedMaps,
         int scheduledMaps, int scheduledReduces, int assignedMaps,
-        int assignedReduces, int mapResourceReqt, int reduceResourceReqt,
+        int assignedReduces, Resource mapResourceReqt, Resource reduceResourceReqt,
         int numPendingReduces, float maxReduceRampupLimit, float reduceSlowStart) {
       recalculatedReduceSchedule = true;
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/376233cd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java
index 077c962..a205bd1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java
@@ -18,12 +18,12 @@
 
 package org.apache.hadoop.yarn.util.resource;
 
-import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.util.Records;
 
-@Private
+@InterfaceAudience.LimitedPrivate({"YARN", "MapReduce"})
 @Unstable
 public class Resources {
   

Reply via email to