YARN-8468. Enable the use of queue based maximum container allocation limit and 
implement it in FairScheduler. Contributed by Antal Bálint Steinbach.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/fd6be589
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/fd6be589
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/fd6be589

Branch: refs/heads/HDFS-13532
Commit: fd6be5898ad1a650e3bceacb8169a53520da57e5
Parents: 0da03f8
Author: Weiwei Yang <w...@apache.org>
Authored: Sat Sep 29 17:47:12 2018 +0800
Committer: Weiwei Yang <w...@apache.org>
Committed: Sat Sep 29 17:47:12 2018 +0800

----------------------------------------------------------------------
 .../server/resourcemanager/RMAppManager.java    |  11 +-
 .../server/resourcemanager/RMServerUtils.java   |   9 +-
 .../scheduler/AbstractYarnScheduler.java        |  21 ++-
 .../scheduler/SchedulerUtils.java               |  62 +++----
 .../scheduler/YarnScheduler.java                |   9 +-
 .../scheduler/capacity/CapacityScheduler.java   |   7 +-
 .../processor/PlacementConstraintProcessor.java |  12 +-
 .../scheduler/fair/AllocationConfiguration.java |  13 ++
 .../scheduler/fair/FSLeafQueue.java             |  10 ++
 .../scheduler/fair/FSParentQueue.java           |  15 +-
 .../resourcemanager/scheduler/fair/FSQueue.java |   7 +
 .../scheduler/fair/FairScheduler.java           |  52 ++++--
 .../allocation/AllocationFileQueueParser.java   |   7 +
 .../fair/allocation/QueueProperties.java        |  16 +-
 .../webapp/FairSchedulerPage.java               |   4 +
 .../webapp/dao/FairSchedulerQueueInfo.java      |   9 +-
 .../resourcemanager/AppManagerTestBase.java     | 107 +++++++++++
 .../yarn/server/resourcemanager/MockRM.java     |   8 +
 .../server/resourcemanager/TestAppManager.java  |  89 ++--------
 .../TestAppManagerWithFairScheduler.java        | 177 +++++++++++++++++++
 .../resourcemanager/TestClientRMService.java    | 104 +++++------
 .../resourcemanager/TestRMServerUtils.java      |  98 +++++++++-
 .../scheduler/TestSchedulerUtils.java           | 144 ++++++++-------
 .../capacity/TestCapacityScheduler.java         |  39 +++-
 .../scheduler/fair/FairSchedulerTestBase.java   |   4 +-
 .../fair/TestAllocationFileLoaderService.java   |  37 +++-
 .../TestApplicationMasterServiceWithFS.java     | 174 ++++++++++++++++++
 .../scheduler/fair/TestFairScheduler.java       | 140 +++++++++++++--
 .../allocationfile/AllocationFileQueue.java     |   6 +-
 .../AllocationFileQueueBuilder.java             |   6 +
 .../AllocationFileQueueProperties.java          |  12 ++
 .../src/site/markdown/FairScheduler.md          |   2 +
 32 files changed, 1115 insertions(+), 296 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/fd6be589/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
index d0f2ce6..783fab0 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.yarn.api.records.ExecutionType;
 import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.QueueACL;
+import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.EventHandler;
@@ -567,13 +568,13 @@ public class RMAppManager implements 
EventHandler<RMAppManagerEvent>,
 
         // Normalize all requests
         String queue = submissionContext.getQueue();
+        Resource maxAllocation = scheduler.getMaximumResourceCapability(queue);
         for (ResourceRequest amReq : amReqs) {
-          SchedulerUtils.normalizeAndValidateRequest(amReq,
-              scheduler.getMaximumResourceCapability(queue),
-              queue, scheduler, isRecovery, rmContext);
+          SchedulerUtils.normalizeAndValidateRequest(amReq, maxAllocation,
+              queue, scheduler, isRecovery, rmContext, null);
 
-          amReq.setCapability(
-              scheduler.getNormalizedResource(amReq.getCapability()));
+          amReq.setCapability(scheduler.getNormalizedResource(
+              amReq.getCapability(), maxAllocation));
         }
         return amReqs;
       } catch (InvalidResourceRequestException e) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fd6be589/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java
index ab6bbcf..b18b12e 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java
@@ -97,7 +97,7 @@ public class RMServerUtils {
       "INCORRECT_CONTAINER_VERSION_ERROR";
   private static final String INVALID_CONTAINER_ID =
       "INVALID_CONTAINER_ID";
-  private static final String RESOURCE_OUTSIDE_ALLOWED_RANGE =
+  public static final String RESOURCE_OUTSIDE_ALLOWED_RANGE =
       "RESOURCE_OUTSIDE_ALLOWED_RANGE";
 
   protected static final RecordFactory RECORD_FACTORY =
@@ -235,7 +235,7 @@ public class RMServerUtils {
    * requested memory/vcore is non-negative and not greater than max
    */
   public static void normalizeAndValidateRequests(List<ResourceRequest> ask,
-      Resource maximumResource, String queueName, YarnScheduler scheduler,
+      Resource maximumAllocation, String queueName, YarnScheduler scheduler,
       RMContext rmContext) throws InvalidResourceRequestException {
     // Get queue from scheduler
     QueueInfo queueInfo = null;
@@ -247,7 +247,7 @@ public class RMServerUtils {
     }
 
     for (ResourceRequest resReq : ask) {
-      SchedulerUtils.normalizeAndvalidateRequest(resReq, maximumResource,
+      SchedulerUtils.normalizeAndValidateRequest(resReq, maximumAllocation,
           queueName, scheduler, rmContext, queueInfo);
     }
   }
@@ -338,7 +338,8 @@ public class RMServerUtils {
       return false;
     }
     ResourceScheduler scheduler = rmContext.getScheduler();
-    
request.setCapability(scheduler.getNormalizedResource(request.getCapability()));
+    request.setCapability(scheduler
+        .getNormalizedResource(request.getCapability(), maximumAllocation));
     return true;
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fd6be589/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
index 9d2b058..0acfca7 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
@@ -1159,11 +1159,12 @@ public abstract class AbstractYarnScheduler
   }
 
   @Override
-  public Resource getNormalizedResource(Resource requestedResource) {
+  public Resource getNormalizedResource(Resource requestedResource,
+                                        Resource maxResourceCapability) {
     return SchedulerUtils.getNormalizedResource(requestedResource,
         getResourceCalculator(),
         getMinimumResourceCapability(),
-        getMaximumResourceCapability(),
+        maxResourceCapability,
         getMinimumResourceCapability());
   }
 
@@ -1173,8 +1174,20 @@ public abstract class AbstractYarnScheduler
    * @param asks resource requests
    */
   protected void normalizeResourceRequests(List<ResourceRequest> asks) {
-    for (ResourceRequest ask: asks) {
-      ask.setCapability(getNormalizedResource(ask.getCapability()));
+    normalizeResourceRequests(asks, null);
+  }
+
+  /**
+   * Normalize a list of resource requests
+   * using queue maximum resource allocations.
+   * @param asks resource requests
+   */
+  protected void normalizeResourceRequests(List<ResourceRequest> asks,
+      String queueName) {
+    Resource maxAllocation = getMaximumResourceCapability(queueName);
+    for (ResourceRequest ask : asks) {
+      ask.setCapability(
+          getNormalizedResource(ask.getCapability(), maxAllocation));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fd6be589/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java
index 9b07d37..9a02b6e 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java
@@ -114,19 +114,19 @@ public class SchedulerUtils {
 
   public static final String UPDATED_CONTAINER =
       "Temporary container killed by application for ExeutionType update";
-  
-  public static final String LOST_CONTAINER = 
+
+  public static final String LOST_CONTAINER =
       "Container released on a *lost* node";
-  
-  public static final String PREEMPTED_CONTAINER = 
+
+  public static final String PREEMPTED_CONTAINER =
       "Container preempted by scheduler";
-  
-  public static final String COMPLETED_APPLICATION = 
+
+  public static final String COMPLETED_APPLICATION =
       "Container of a completed application";
-  
+
   public static final String EXPIRED_CONTAINER =
       "Container expired since it was unused";
-  
+
   public static final String UNRESERVED_CONTAINER =
       "Container reservation no longer required.";
 
@@ -141,7 +141,7 @@ public class SchedulerUtils {
    */
   public static ContainerStatus createAbnormalContainerStatus(
       ContainerId containerId, String diagnostics) {
-    return createAbnormalContainerStatus(containerId, 
+    return createAbnormalContainerStatus(containerId,
         ContainerExitStatus.ABORTED, diagnostics);
   }
 
@@ -169,14 +169,14 @@ public class SchedulerUtils {
    */
   public static ContainerStatus createPreemptedContainerStatus(
       ContainerId containerId, String diagnostics) {
-    return createAbnormalContainerStatus(containerId, 
+    return createAbnormalContainerStatus(containerId,
         ContainerExitStatus.PREEMPTED, diagnostics);
   }
 
   /**
    * Utility to create a {@link ContainerStatus} during exceptional
    * circumstances.
-   * 
+   *
    * @param containerId {@link ContainerId} of returned/released/lost 
container.
    * @param diagnostics diagnostic message
    * @return <code>ContainerStatus</code> for an returned/released/lost 
@@ -184,7 +184,7 @@ public class SchedulerUtils {
    */
   private static ContainerStatus createAbnormalContainerStatus(
       ContainerId containerId, int exitStatus, String diagnostics) {
-    ContainerStatus containerStatus = 
+    ContainerStatus containerStatus =
         recordFactory.newRecordInstance(ContainerStatus.class);
     containerStatus.setContainerId(containerId);
     containerStatus.setDiagnostics(diagnostics);
@@ -254,23 +254,14 @@ public class SchedulerUtils {
       labelExp = RMNodeLabelsManager.NO_LABEL;
     }
 
-    if ( labelExp != null) {
+    if (labelExp != null) {
       resReq.setNodeLabelExpression(labelExp);
     }
   }
 
   public static void normalizeAndValidateRequest(ResourceRequest resReq,
-      Resource maximumResource, String queueName, YarnScheduler scheduler,
-      boolean isRecovery, RMContext rmContext)
-      throws InvalidResourceRequestException {
-    normalizeAndValidateRequest(resReq, maximumResource, queueName, scheduler,
-        isRecovery, rmContext, null);
-  }
-
-
-  private static void normalizeAndValidateRequest(ResourceRequest resReq,
-          Resource maximumResource, String queueName, YarnScheduler scheduler,
-          boolean isRecovery, RMContext rmContext, QueueInfo queueInfo)
+      Resource maximumAllocation, String queueName, YarnScheduler scheduler,
+      boolean isRecovery, RMContext rmContext, QueueInfo queueInfo)
       throws InvalidResourceRequestException {
     Configuration conf = rmContext.getYarnConfiguration();
     // If Node label is not enabled throw exception
@@ -299,37 +290,30 @@ public class SchedulerUtils {
     SchedulerUtils.normalizeNodeLabelExpressionInRequest(resReq, queueInfo);
 
     if (!isRecovery) {
-      validateResourceRequest(resReq, maximumResource, queueInfo, rmContext);
+      validateResourceRequest(resReq, maximumAllocation, queueInfo, rmContext);
     }
   }
 
-  public static void normalizeAndvalidateRequest(ResourceRequest resReq,
-      Resource maximumResource, String queueName, YarnScheduler scheduler,
-      RMContext rmContext) throws InvalidResourceRequestException {
-    normalizeAndvalidateRequest(resReq, maximumResource, queueName, scheduler,
-        rmContext, null);
-  }
-
-  public static void normalizeAndvalidateRequest(ResourceRequest resReq,
-      Resource maximumResource, String queueName, YarnScheduler scheduler,
+  public static void normalizeAndValidateRequest(ResourceRequest resReq,
+      Resource maximumAllocation, String queueName, YarnScheduler scheduler,
       RMContext rmContext, QueueInfo queueInfo)
       throws InvalidResourceRequestException {
-    normalizeAndValidateRequest(resReq, maximumResource, queueName, scheduler,
+    normalizeAndValidateRequest(resReq, maximumAllocation, queueName, 
scheduler,
         false, rmContext, queueInfo);
   }
 
   /**
    * Utility method to validate a resource request, by insuring that the
    * requested memory/vcore is non-negative and not greater than max
-   * 
+   *
    * @throws InvalidResourceRequestException when there is invalid request
    */
   private static void validateResourceRequest(ResourceRequest resReq,
-      Resource maximumResource, QueueInfo queueInfo, RMContext rmContext)
+      Resource maximumAllocation, QueueInfo queueInfo, RMContext rmContext)
       throws InvalidResourceRequestException {
     final Resource requestedResource = resReq.getCapability();
     checkResourceRequestAgainstAvailableResource(requestedResource,
-        maximumResource);
+        maximumAllocation);
 
     String labelExp = resReq.getNodeLabelExpression();
     // we don't allow specify label expression other than resourceName=ANY now
@@ -535,7 +519,7 @@ public class SchedulerUtils {
       if (!str.trim().isEmpty()) {
         // check queue label
         if (queueLabels == null) {
-          return false; 
+          return false;
         } else {
           if (!queueLabels.contains(str)
               && !queueLabels.contains(RMNodeLabelsManager.ANY)) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fd6be589/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java
index 0f7a5b5..d95fe7d 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java
@@ -390,12 +390,17 @@ public interface YarnScheduler extends 
EventHandler<SchedulerEvent> {
   SchedulerNode getSchedulerNode(NodeId nodeId);
 
   /**
-   * Normalize a resource request.
+   * Normalize a resource request using scheduler level maximum resource or
+   * queue based maximum resource.
    *
    * @param requestedResource the resource to be normalized
+   * @param maxResourceCapability Maximum container allocation value, if null 
or
+   *          empty scheduler level maximum container allocation value will be
+   *          used
    * @return the normalized resource
    */
-  Resource getNormalizedResource(Resource requestedResource);
+  Resource getNormalizedResource(Resource requestedResource,
+      Resource maxResourceCapability);
 
   /**
    * Verify whether a submitted application lifetime is valid as per configured

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fd6be589/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index 4b274df..75d6144 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -1159,10 +1159,12 @@ public class CapacityScheduler extends
     if (asks == null) {
       return;
     }
+    Resource maxAllocation = getMaximumResourceCapability();
     for (SchedulingRequest ask: asks) {
       ResourceSizing sizing = ask.getResourceSizing();
       if (sizing != null && sizing.getResources() != null) {
-        sizing.setResources(getNormalizedResource(sizing.getResources()));
+        sizing.setResources(
+            getNormalizedResource(sizing.getResources(), maxAllocation));
       }
     }
   }
@@ -2567,6 +2569,9 @@ public class CapacityScheduler extends
 
   @Override
   public Resource getMaximumResourceCapability(String queueName) {
+    if(queueName == null || queueName.isEmpty()) {
+      return getMaximumResourceCapability();
+    }
     CSQueue queue = getQueue(queueName);
     if (queue == null) {
       LOG.error("Unknown queue: " + queueName);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fd6be589/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/PlacementConstraintProcessor.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/PlacementConstraintProcessor.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/PlacementConstraintProcessor.java
index cf944a6..c4c9574 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/PlacementConstraintProcessor.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/PlacementConstraintProcessor.java
@@ -175,11 +175,19 @@ public class PlacementConstraintProcessor extends 
AbstractPlacementProcessor {
   private void dispatchRequestsForPlacement(ApplicationAttemptId appAttemptId,
       List<SchedulingRequest> schedulingRequests) {
     if (schedulingRequests != null && !schedulingRequests.isEmpty()) {
+      SchedulerApplicationAttempt appAttempt =
+          scheduler.getApplicationAttempt(appAttemptId);
+      String queueName = null;
+      if(appAttempt != null) {
+        queueName = appAttempt.getQueueName();
+      }
+      Resource maxAllocation =
+          scheduler.getMaximumResourceCapability(queueName);
       // Normalize the Requests before dispatching
       schedulingRequests.forEach(req -> {
         Resource reqResource = req.getResourceSizing().getResources();
-        req.getResourceSizing()
-            .setResources(this.scheduler.getNormalizedResource(reqResource));
+        req.getResourceSizing().setResources(
+            this.scheduler.getNormalizedResource(reqResource, maxAllocation));
       });
       this.placementDispatcher.dispatch(new BatchedRequests(iteratorType,
           appAttemptId.getApplicationId(), schedulingRequests, 1));

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fd6be589/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java
index e48e04b..826d9f5 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java
@@ -91,6 +91,9 @@ public class AllocationConfiguration extends 
ReservationSchedulerConfiguration {
 
   private final SchedulingPolicy defaultSchedulingPolicy;
 
+  //Map for maximum container resource allocation per queues by queue name
+  private final Map<String, Resource> queueMaxContainerAllocationMap;
+
   // Policy for mapping apps to queues
   @VisibleForTesting
   QueuePlacementPolicy placementPolicy;
@@ -138,6 +141,8 @@ public class AllocationConfiguration extends 
ReservationSchedulerConfiguration {
     this.placementPolicy = newPlacementPolicy;
     this.configuredQueues = queueProperties.getConfiguredQueues();
     this.nonPreemptableQueues = queueProperties.getNonPreemptableQueues();
+    this.queueMaxContainerAllocationMap =
+        queueProperties.getMaxContainerAllocation();
   }
 
   public AllocationConfiguration(Configuration conf) {
@@ -167,6 +172,7 @@ public class AllocationConfiguration extends 
ReservationSchedulerConfiguration {
     placementPolicy =
         QueuePlacementPolicy.fromConfiguration(conf, configuredQueues);
     nonPreemptableQueues = new HashSet<>();
+    queueMaxContainerAllocationMap = new HashMap<>();
   }
 
   /**
@@ -272,6 +278,12 @@ public class AllocationConfiguration extends 
ReservationSchedulerConfiguration {
     return maxQueueResource;
   }
 
+  @VisibleForTesting
+  Resource getQueueMaxContainerAllocation(String queue) {
+    Resource resource = queueMaxContainerAllocationMap.get(queue);
+    return resource == null ? Resources.unbounded() : resource;
+  }
+
   /**
    * Get the maximum resource allocation for children of the given queue.
    *
@@ -375,6 +387,7 @@ public class AllocationConfiguration extends 
ReservationSchedulerConfiguration {
     queue.setMaxRunningApps(getQueueMaxApps(name));
     queue.setMaxAMShare(getQueueMaxAMShare(name));
     queue.setMaxChildQueueResource(getMaxChildResources(name));
+    queue.setMaxContainerAllocation(getQueueMaxContainerAllocation(name));
 
     // Set queue metrics.
     queue.getMetrics().setMinShare(queue.getMinShare());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fd6be589/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
index cbc74d2..7e4dab8 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
@@ -547,6 +547,16 @@ public class FSLeafQueue extends FSQueue {
     this.weights = weight;
   }
 
+  @Override
+  public Resource getMaximumContainerAllocation() {
+    if (maxContainerAllocation.equals(Resources.unbounded())
+        && getParent() != null) {
+      return getParent().getMaximumContainerAllocation();
+    } else {
+      return maxContainerAllocation;
+    }
+  }
+
   /**
    * Helper method to compute the amount of minshare starvation.
    *

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fd6be589/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java
index d5df549..e9f4af6 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java
@@ -59,7 +59,20 @@ public class FSParentQueue extends FSQueue {
       FSParentQueue parent) {
     super(name, scheduler, parent);
   }
-  
+
+  @Override
+  public Resource getMaximumContainerAllocation() {
+    if (getName().equals("root")) {
+      return maxContainerAllocation;
+    }
+    if (maxContainerAllocation.equals(Resources.unbounded())
+        && getParent() != null) {
+      return getParent().getMaximumContainerAllocation();
+    } else {
+      return maxContainerAllocation;
+    }
+  }
+
   void addChildQueue(FSQueue child) {
     writeLock.lock();
     try {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fd6be589/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java
index 6b88a32..6217f55 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java
@@ -84,6 +84,7 @@ public abstract class FSQueue implements Queue, Schedulable {
   private float fairSharePreemptionThreshold = 0.5f;
   private boolean preemptable = true;
   private boolean isDynamic = true;
+  protected Resource maxContainerAllocation;
 
   public FSQueue(String name, FairScheduler scheduler, FSParentQueue parent) {
     this.name = name;
@@ -163,6 +164,12 @@ public abstract class FSQueue implements Queue, 
Schedulable {
     this.maxShare = maxShare;
   }
 
+  public void setMaxContainerAllocation(Resource maxContainerAllocation){
+    this.maxContainerAllocation = maxContainerAllocation;
+  }
+
+  public abstract Resource getMaximumContainerAllocation();
+
   @Override
   public Resource getMaxShare() {
     Resource maxResource = 
maxShare.getResource(scheduler.getClusterResource());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fd6be589/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
index 43a47ae..da5e4c9 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
@@ -192,6 +192,7 @@ public class FairScheduler extends
   protected long rackLocalityDelayMs; // Delay for rack locality
   protected boolean assignMultiple; // Allocate multiple containers per
                                     // heartbeat
+
   @VisibleForTesting
   boolean maxAssignDynamic;
   protected int maxAssign; // Max containers to assign per heartbeat
@@ -227,12 +228,12 @@ public class FairScheduler extends
 
   private void validateConf(FairSchedulerConfiguration config) {
     // validate scheduler memory allocation setting
-    int minMem = config.getInt(
-      YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
-      YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
-    int maxMem = config.getInt(
-      YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
-      YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB);
+    int minMem =
+        config.getInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+            YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
+    int maxMem =
+        config.getInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
+            YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB);
 
     if (minMem < 0 || minMem > maxMem) {
       throw new YarnRuntimeException("Invalid resource scheduler memory"
@@ -254,12 +255,12 @@ public class FairScheduler extends
     }
 
     // validate scheduler vcores allocation setting
-    int minVcores = config.getInt(
-      YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
-      YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES);
-    int maxVcores = config.getInt(
-      YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
-      YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES);
+    int minVcores =
+        config.getInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
+            YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES);
+    int maxVcores =
+        config.getInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
+            YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES);
 
     if (minVcores < 0 || minVcores > maxVcores) {
       throw new YarnRuntimeException("Invalid resource scheduler vcores"
@@ -833,14 +834,35 @@ public class FairScheduler extends
   }
 
   @Override
-  public Resource getNormalizedResource(Resource requestedResource) {
+  public Resource getNormalizedResource(Resource requestedResource,
+      Resource maxResourceCapability) {
     return SchedulerUtils.getNormalizedResource(requestedResource,
         DOMINANT_RESOURCE_CALCULATOR,
         minimumAllocation,
-        getMaximumResourceCapability(),
+        maxResourceCapability,
         incrAllocation);
   }
 
+  @Override
+  public Resource getMaximumResourceCapability(String queueName) {
+    if(queueName == null || queueName.isEmpty()) {
+      return  getMaximumResourceCapability();
+    }
+    FSQueue queue = queueMgr.getQueue(queueName);
+    Resource schedulerLevelMaxResourceCapability =
+        getMaximumResourceCapability();
+    if (queue == null) {
+      return schedulerLevelMaxResourceCapability;
+    }
+    Resource queueMaxResourceCapability = 
queue.getMaximumContainerAllocation();
+    if (queueMaxResourceCapability.equals(Resources.unbounded())) {
+      return schedulerLevelMaxResourceCapability;
+    } else {
+      return Resources.componentwiseMin(schedulerLevelMaxResourceCapability,
+          queueMaxResourceCapability);
+    }
+  }
+
   @VisibleForTesting
   @Override
   public void killContainer(RMContainer container) {
@@ -897,7 +919,7 @@ public class FairScheduler extends
     handleContainerUpdates(application, updateRequests);
 
     // Sanity check
-    normalizeResourceRequests(ask);
+    normalizeResourceRequests(ask, queue.getName());
 
     // TODO, normalize SchedulingRequest
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fd6be589/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/allocation/AllocationFileQueueParser.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/allocation/AllocationFileQueueParser.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/allocation/AllocationFileQueueParser.java
index 441c34a..72c6c68 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/allocation/AllocationFileQueueParser.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/allocation/AllocationFileQueueParser.java
@@ -51,6 +51,8 @@ public class AllocationFileQueueParser {
   private static final String MAX_CHILD_RESOURCES = "maxChildResources";
   private static final String MAX_RUNNING_APPS = "maxRunningApps";
   private static final String MAX_AMSHARE = "maxAMShare";
+  public static final String MAX_CONTAINER_ALLOCATION =
+      "maxContainerAllocation";
   private static final String WEIGHT = "weight";
   private static final String MIN_SHARE_PREEMPTION_TIMEOUT =
       "minSharePreemptionTimeout";
@@ -155,6 +157,11 @@ public class AllocationFileQueueParser {
         float val = Float.parseFloat(text);
         val = Math.min(val, 1.0f);
         builder.queueMaxAMShares(queueName, val);
+      } else if (MAX_CONTAINER_ALLOCATION.equals(field.getTagName())) {
+        String text = getTrimmedTextData(field);
+        ConfigurableResource val =
+            FairSchedulerConfiguration.parseResourceConfigValue(text);
+        builder.queueMaxContainerAllocation(queueName, val.getResource());
       } else if (WEIGHT.equals(field.getTagName())) {
         String text = getTrimmedTextData(field);
         double val = Double.parseDouble(text);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fd6be589/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/allocation/QueueProperties.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/allocation/QueueProperties.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/allocation/QueueProperties.java
index ee5f179..badf05f 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/allocation/QueueProperties.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/allocation/QueueProperties.java
@@ -53,6 +53,7 @@ public class QueueProperties {
   private final Set<String> reservableQueues;
   private final Set<String> nonPreemptableQueues;
   private final Map<FSQueueType, Set<String>> configuredQueues;
+  private final Map<String, Resource> queueMaxContainerAllocation;
 
   QueueProperties(Builder builder) {
     this.reservableQueues = builder.reservableQueues;
@@ -70,6 +71,7 @@ public class QueueProperties {
     this.maxChildQueueResources = builder.maxChildQueueResources;
     this.reservationAcls = builder.reservationAcls;
     this.queueAcls = builder.queueAcls;
+    this.queueMaxContainerAllocation = builder.queueMaxContainerAllocation;
   }
 
   public Map<FSQueueType, Set<String>> getConfiguredQueues() {
@@ -133,7 +135,11 @@ public class QueueProperties {
     return nonPreemptableQueues;
   }
 
-  /**
+  public Map<String, Resource> getMaxContainerAllocation() {
+    return queueMaxContainerAllocation;
+  }
+
+    /**
    * Builder class for {@link QueueProperties}.
    * All methods are adding queue properties to the maps of this builder
    * keyed by the queue's name except some methods
@@ -149,6 +155,7 @@ public class QueueProperties {
         new HashMap<>();
     private Map<String, Integer> queueMaxApps = new HashMap<>();
     private Map<String, Float> queueMaxAMShares = new HashMap<>();
+    private Map<String, Resource> queueMaxContainerAllocation = new 
HashMap<>();
     private Map<String, Float> queueWeights = new HashMap<>();
     private Map<String, SchedulingPolicy> queuePolicies = new HashMap<>();
     private Map<String, Long> minSharePreemptionTimeouts = new HashMap<>();
@@ -253,6 +260,12 @@ public class QueueProperties {
       return this;
     }
 
+    public Builder queueMaxContainerAllocation(String queueName,
+        Resource value) {
+      queueMaxContainerAllocation.put(queueName, value);
+      return this;
+    }
+
     public void configuredQueues(FSQueueType queueType, String queueName) {
       this.configuredQueues.get(queueType).add(queueName);
     }
@@ -275,6 +288,5 @@ public class QueueProperties {
     public QueueProperties build() {
       return new QueueProperties(this);
     }
-
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fd6be589/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/FairSchedulerPage.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/FairSchedulerPage.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/FairSchedulerPage.java
index ef417d4..7f31def 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/FairSchedulerPage.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/FairSchedulerPage.java
@@ -78,6 +78,8 @@ public class FairSchedulerPage extends RmView {
           __("Num Pending Applications:", qinfo.getNumPendingApplications()).
           __("Min Resources:", qinfo.getMinResources().toString()).
           __("Max Resources:", qinfo.getMaxResources().toString()).
+          __("Max Container Allocation:",
+              qinfo.getMaxContainerAllocation().toString()).
           __("Reserved Resources:", qinfo.getReservedResources().toString());
       int maxApps = qinfo.getMaxApplications();
       if (maxApps < Integer.MAX_VALUE) {
@@ -107,6 +109,8 @@ public class FairSchedulerPage extends RmView {
           __("Used Resources:", qinfo.getUsedResources().toString()).
           __("Min Resources:", qinfo.getMinResources().toString()).
           __("Max Resources:", qinfo.getMaxResources().toString()).
+          __("Max Container Allocation:",
+              qinfo.getMaxContainerAllocation().toString()).
           __("Reserved Resources:", qinfo.getReservedResources().toString());
       int maxApps = qinfo.getMaxApplications();
       if (maxApps < Integer.MAX_VALUE) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fd6be589/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerQueueInfo.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerQueueInfo.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerQueueInfo.java
index 913513c..70c5fd0 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerQueueInfo.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerQueueInfo.java
@@ -60,6 +60,7 @@ public class FairSchedulerQueueInfo {
   private ResourceInfo fairResources;
   private ResourceInfo clusterResources;
   private ResourceInfo reservedResources;
+  private ResourceInfo maxContainerAllocation;
 
   private long allocatedContainers;
   private long reservedContainers;
@@ -99,6 +100,8 @@ public class FairSchedulerQueueInfo {
     maxResources = new ResourceInfo(
         Resources.componentwiseMin(queue.getMaxShare(),
             scheduler.getClusterResource()));
+    maxContainerAllocation =
+        new ResourceInfo(scheduler.getMaximumResourceCapability(queueName));
     reservedResources = new ResourceInfo(queue.getReservedResource());
 
     fractionMemSteadyFairShare =
@@ -186,7 +189,11 @@ public class FairSchedulerQueueInfo {
   public ResourceInfo getMaxResources() {
     return maxResources;
   }
-  
+
+  public ResourceInfo getMaxContainerAllocation() {
+    return maxContainerAllocation;
+  }
+
   public ResourceInfo getReservedResources() {
     return reservedResources;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fd6be589/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/AppManagerTestBase.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/AppManagerTestBase.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/AppManagerTestBase.java
new file mode 100644
index 0000000..36258b4
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/AppManagerTestBase.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager;
+
+import static java.util.stream.Collectors.toSet;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.util.List;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
+import 
org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
+import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
+import org.mockito.ArgumentCaptor;
+
+/**
+ * Base class for AppManager related test.
+ *
+ */
+public class AppManagerTestBase {
+
+  // Extend and make the functions we want to test public
+  protected class TestRMAppManager extends RMAppManager {
+    private final RMStateStore stateStore;
+
+    public TestRMAppManager(RMContext context, Configuration conf) {
+      super(context, null, null, new ApplicationACLsManager(conf), conf);
+      this.stateStore = context.getStateStore();
+    }
+
+    public TestRMAppManager(RMContext context,
+        ClientToAMTokenSecretManagerInRM clientToAMSecretManager,
+        YarnScheduler scheduler, ApplicationMasterService masterService,
+        ApplicationACLsManager applicationACLsManager, Configuration conf) {
+      super(context, scheduler, masterService, applicationACLsManager, conf);
+      this.stateStore = context.getStateStore();
+    }
+
+    public void checkAppNumCompletedLimit() {
+      super.checkAppNumCompletedLimit();
+    }
+
+    public void finishApplication(ApplicationId appId) {
+      super.finishApplication(appId);
+    }
+
+    public int getCompletedAppsListSize() {
+      return super.getCompletedAppsListSize();
+    }
+
+    public int getNumberOfCompletedAppsInStateStore() {
+      return this.completedAppsInStateStore;
+    }
+
+    public List<ApplicationId> getCompletedApps() {
+      return completedApps;
+    }
+
+    public Set<ApplicationId> getFirstNCompletedApps(int n) {
+      return getCompletedApps().stream().limit(n).collect(toSet());
+    }
+
+    public Set<ApplicationId> getCompletedAppsWithEvenIdsInRange(int n) {
+      return getCompletedApps().stream().limit(n)
+          .filter(app -> app.getId() % 2 == 0).collect(toSet());
+    }
+
+    public Set<ApplicationId> getRemovedAppsFromStateStore(int numRemoves) {
+      ArgumentCaptor<RMApp> argumentCaptor =
+          ArgumentCaptor.forClass(RMApp.class);
+      verify(stateStore, times(numRemoves))
+          .removeApplication(argumentCaptor.capture());
+      return 
argumentCaptor.getAllValues().stream().map(RMApp::getApplicationId)
+          .collect(toSet());
+    }
+
+    public void submitApplication(
+        ApplicationSubmissionContext submissionContext, String user)
+        throws YarnException {
+      super.submitApplication(submissionContext, System.currentTimeMillis(),
+          user);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fd6be589/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
index df86f28..408cb18 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
@@ -513,6 +513,14 @@ public class MockRM extends ResourceManager {
     return submitApp(masterMemory, false);
   }
 
+  public RMApp submitApp(int masterMemory, String queue) throws Exception {
+    return submitApp(masterMemory, "",
+        UserGroupInformation.getCurrentUser().getShortUserName(), null, false,
+        queue, super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
+            YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS),
+        null);
+  }
+
   public RMApp submitApp(int masterMemory, Set<String> appTags)
       throws Exception {
     Resource resource = Resource.newInstance(masterMemory, 0);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fd6be589/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java
index 0bd5372..8f48293 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java
@@ -68,7 +68,6 @@ import 
org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessM
 import 
org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
 import 
org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ManagedParentQueue;
@@ -82,12 +81,10 @@ import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
-import org.mockito.ArgumentCaptor;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
 import java.io.IOException;
-import java.lang.reflect.Field;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -99,10 +96,10 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentMap;
 
-import static java.util.stream.Collectors.toSet;
 import static 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.PREFIX;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
 import static org.mockito.Matchers.isA;
 import static org.mockito.Matchers.matches;
 import static org.mockito.Mockito.doAnswer;
@@ -117,7 +114,7 @@ import static org.mockito.Mockito.when;
  *
  */
 
-public class TestAppManager{
+public class TestAppManager extends AppManagerTestBase{
   private Log LOG = LogFactory.getLog(TestAppManager.class);
   private static RMAppEventType appEventType = RMAppEventType.KILL;
 
@@ -234,70 +231,6 @@ public class TestAppManager{
       setAppEventType(event.getType());
       System.out.println("in handle routine " + getAppEventType().toString());
     }   
-  }   
-  
-
-  // Extend and make the functions we want to test public
-  public class TestRMAppManager extends RMAppManager {
-    private final RMStateStore stateStore;
-
-    public TestRMAppManager(RMContext context, Configuration conf) {
-      super(context, null, null, new ApplicationACLsManager(conf), conf);
-      this.stateStore = context.getStateStore();
-    }
-
-    public TestRMAppManager(RMContext context,
-        ClientToAMTokenSecretManagerInRM clientToAMSecretManager,
-        YarnScheduler scheduler, ApplicationMasterService masterService,
-        ApplicationACLsManager applicationACLsManager, Configuration conf) {
-      super(context, scheduler, masterService, applicationACLsManager, conf);
-      this.stateStore = context.getStateStore();
-    }
-
-    public void checkAppNumCompletedLimit() {
-      super.checkAppNumCompletedLimit();
-    }
-
-    public void finishApplication(ApplicationId appId) {
-      super.finishApplication(appId);
-    }
-
-    public int getCompletedAppsListSize() {
-      return super.getCompletedAppsListSize();
-    }
-
-    public int getNumberOfCompletedAppsInStateStore() {
-      return this.completedAppsInStateStore;
-    }
-
-    List<ApplicationId> getCompletedApps() {
-      return completedApps;
-    }
-
-    Set<ApplicationId> getFirstNCompletedApps(int n) {
-      return getCompletedApps().stream().limit(n).collect(toSet());
-    }
-
-    Set<ApplicationId> getCompletedAppsWithEvenIdsInRange(int n) {
-      return getCompletedApps().stream().limit(n)
-          .filter(app -> app.getId() % 2 == 0).collect(toSet());
-    }
-
-    Set<ApplicationId> getRemovedAppsFromStateStore(int numRemoves) {
-      ArgumentCaptor<RMApp> argumentCaptor =
-          ArgumentCaptor.forClass(RMApp.class);
-      verify(stateStore, times(numRemoves))
-          .removeApplication(argumentCaptor.capture());
-      return 
argumentCaptor.getAllValues().stream().map(RMApp::getApplicationId)
-          .collect(toSet());
-    }
-
-    public void submitApplication(
-        ApplicationSubmissionContext submissionContext, String user)
-            throws YarnException, IOException {
-      super.submitApplication(submissionContext, System.currentTimeMillis(),
-        user);
-    }
   }
 
   private void addToCompletedApps(TestRMAppManager appMonitor,
@@ -1213,17 +1146,21 @@ public class TestAppManager{
         Resources.createResource(
             YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB));
 
+    when(scheduler.getMaximumResourceCapability(anyString())).thenReturn(
+        Resources.createResource(
+            YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB));
+
     ResourceCalculator rs = mock(ResourceCalculator.class);
     when(scheduler.getResourceCalculator()).thenReturn(rs);
 
-    when(scheduler.getNormalizedResource(any()))
+    when(scheduler.getNormalizedResource(any(), any()))
         .thenAnswer(new Answer<Resource>() {
-      @Override
-      public Resource answer(InvocationOnMock invocationOnMock)
-          throws Throwable {
-        return (Resource) invocationOnMock.getArguments()[0];
-      }
-    });
+          @Override
+          public Resource answer(InvocationOnMock invocationOnMock)
+              throws Throwable {
+            return (Resource) invocationOnMock.getArguments()[0];
+          }
+        });
 
     return scheduler;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fd6be589/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManagerWithFairScheduler.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManagerWithFairScheduler.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManagerWithFairScheduler.java
new file mode 100644
index 0000000..feb7ed2
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManagerWithFairScheduler.java
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager;
+
+import static 
org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException.InvalidResourceType.GREATER_THEN_MAX_ALLOCATION;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.matches;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.HashMap;
+
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.yarn.MockApps;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.QueueInfo;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import 
org.apache.hadoop.yarn.server.resourcemanager.placement.ApplicationPlacementContext;
+import 
org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration;
+import 
org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
+import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+/**
+ * Testing applications being retired from RM with fair scheduler.
+ *
+ */
+public class TestAppManagerWithFairScheduler extends AppManagerTestBase{
+
+  private static final String TEST_FOLDER = "test-queues";
+
+  private static YarnConfiguration conf = new YarnConfiguration();
+
+  @BeforeClass
+  public static void setup() throws IOException {
+    String allocFile =
+        GenericTestUtils.getTestDir(TEST_FOLDER).getAbsolutePath();
+
+    int queueMaxAllocation = 512;
+
+    PrintWriter out = new PrintWriter(new FileWriter(allocFile));
+    out.println("<?xml version=\"1.0\"?>");
+    out.println("<allocations>");
+    out.println(" <queue name=\"queueA\">");
+    out.println("  <maxContainerAllocation>" + queueMaxAllocation
+        + " mb 1 vcores" + "</maxContainerAllocation>");
+    out.println(" </queue>");
+    out.println(" <queue name=\"queueB\">");
+    out.println(" </queue>");
+    out.println("</allocations>");
+    out.close();
+
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, FairScheduler.class,
+        ResourceScheduler.class);
+
+    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, allocFile);
+  }
+
+  @AfterClass
+  public static void teardown(){
+    File allocFile = GenericTestUtils.getTestDir(TEST_FOLDER);
+    allocFile.delete();
+  }
+
+  @Test
+  public void testQueueSubmitWithHighQueueContainerSize()
+      throws YarnException {
+
+    ApplicationId appId = MockApps.newAppID(1);
+    RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
+
+    Resource resource = Resources.createResource(
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
+
+    ApplicationSubmissionContext asContext =
+        recordFactory.newRecordInstance(ApplicationSubmissionContext.class);
+    asContext.setApplicationId(appId);
+    asContext.setResource(resource);
+    asContext.setPriority(Priority.newInstance(0));
+    asContext.setAMContainerSpec(mockContainerLaunchContext(recordFactory));
+    asContext.setQueue("queueA");
+    QueueInfo mockDefaultQueueInfo = mock(QueueInfo.class);
+
+    // Setup a PlacementManager returns a new queue
+    PlacementManager placementMgr = mock(PlacementManager.class);
+    doAnswer(new Answer<ApplicationPlacementContext>() {
+
+      @Override
+      public ApplicationPlacementContext answer(InvocationOnMock invocation)
+          throws Throwable {
+        return new ApplicationPlacementContext("queueA");
+      }
+
+    }).when(placementMgr).placeApplication(
+        any(ApplicationSubmissionContext.class), matches("test1"));
+    doAnswer(new Answer<ApplicationPlacementContext>() {
+
+      @Override
+      public ApplicationPlacementContext answer(InvocationOnMock invocation)
+          throws Throwable {
+        return new ApplicationPlacementContext("queueB");
+      }
+
+    }).when(placementMgr).placeApplication(
+        any(ApplicationSubmissionContext.class), matches("test2"));
+
+    MockRM newMockRM = new MockRM(conf);
+    RMContext newMockRMContext = newMockRM.getRMContext();
+    newMockRMContext.setQueuePlacementManager(placementMgr);
+    ApplicationMasterService masterService = new ApplicationMasterService(
+        newMockRMContext, newMockRMContext.getScheduler());
+
+    TestRMAppManager newAppMonitor = new TestRMAppManager(newMockRMContext,
+        new ClientToAMTokenSecretManagerInRM(), 
newMockRMContext.getScheduler(),
+        masterService, new ApplicationACLsManager(conf), conf);
+
+    // only user test has permission to submit to 'test' queue
+
+    try {
+      newAppMonitor.submitApplication(asContext, "test1");
+      Assert.fail("Test should fail on too high allocation!");
+    } catch (InvalidResourceRequestException e) {
+      Assert.assertEquals(GREATER_THEN_MAX_ALLOCATION,
+          e.getInvalidResourceType());
+    }
+
+    // Should not throw exception
+    newAppMonitor.submitApplication(asContext, "test2");
+  }
+
+  private static ContainerLaunchContext mockContainerLaunchContext(
+      RecordFactory recordFactory) {
+    ContainerLaunchContext amContainer = recordFactory.newRecordInstance(
+        ContainerLaunchContext.class);
+    amContainer
+        .setApplicationACLs(new HashMap<ApplicationAccessType, String>());
+    return amContainer;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fd6be589/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
index 61048a5..3fe46b1 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
@@ -362,9 +362,9 @@ public class TestClientRMService {
 
    @Test
   public void testGetApplicationReport() throws Exception {
-    YarnScheduler yarnScheduler = mock(YarnScheduler.class);
+    ResourceScheduler scheduler = mock(ResourceScheduler.class);
     RMContext rmContext = mock(RMContext.class);
-    mockRMContext(yarnScheduler, rmContext);
+    mockRMContext(scheduler, rmContext);
 
     ApplicationId appId1 = getApplicationId(1);
 
@@ -373,7 +373,7 @@ public class TestClientRMService {
         mockAclsManager.checkAccess(UserGroupInformation.getCurrentUser(),
             ApplicationAccessType.VIEW_APP, null, appId1)).thenReturn(true);
 
-    ClientRMService rmService = new ClientRMService(rmContext, yarnScheduler,
+    ClientRMService rmService = new ClientRMService(rmContext, scheduler,
         null, mockAclsManager, null, null);
     try {
       RecordFactory recordFactory = 
RecordFactoryProvider.getRecordFactory(null);
@@ -456,9 +456,9 @@ public class TestClientRMService {
   public void testGetApplicationResourceUsageReportDummy() throws 
YarnException,
       IOException {
     ApplicationAttemptId attemptId = getApplicationAttemptId(1);
-    YarnScheduler yarnScheduler = mockYarnScheduler();
+    ResourceScheduler scheduler = mockResourceScheduler();
     RMContext rmContext = mock(RMContext.class);
-    mockRMContext(yarnScheduler, rmContext);
+    mockRMContext(scheduler, rmContext);
     when(rmContext.getDispatcher().getEventHandler()).thenReturn(
         new EventHandler<Event>() {
           public void handle(Event event) {
@@ -468,7 +468,7 @@ public class TestClientRMService {
         mock(ApplicationSubmissionContext.class);
     YarnConfiguration config = new YarnConfiguration();
     RMAppAttemptImpl rmAppAttemptImpl = new RMAppAttemptImpl(attemptId,
-        rmContext, yarnScheduler, null, asContext, config, null, null);
+        rmContext, scheduler, null, asContext, config, null, null);
     ApplicationResourceUsageReport report = rmAppAttemptImpl
         .getApplicationResourceUsageReport();
     assertEquals(report, 
RMServerUtils.DUMMY_APPLICATION_RESOURCE_USAGE_REPORT);
@@ -537,14 +537,14 @@ public class TestClientRMService {
   }
 
   public ClientRMService createRMService() throws IOException, YarnException {
-    YarnScheduler yarnScheduler = mockYarnScheduler();
+    ResourceScheduler scheduler = mockResourceScheduler();
     RMContext rmContext = mock(RMContext.class);
-    mockRMContext(yarnScheduler, rmContext);
+    mockRMContext(scheduler, rmContext);
     ConcurrentHashMap<ApplicationId, RMApp> apps = getRMApps(rmContext,
-        yarnScheduler);
+        scheduler);
     when(rmContext.getRMApps()).thenReturn(apps);
     when(rmContext.getYarnConfiguration()).thenReturn(new Configuration());
-    RMAppManager appManager = new RMAppManager(rmContext, yarnScheduler, null,
+    RMAppManager appManager = new RMAppManager(rmContext, scheduler, null,
         mock(ApplicationACLsManager.class), new Configuration());
     when(rmContext.getDispatcher().getEventHandler()).thenReturn(
         new EventHandler<Event>() {
@@ -558,7 +558,7 @@ public class TestClientRMService {
         mockQueueACLsManager.checkAccess(any(UserGroupInformation.class),
             any(QueueACL.class), any(RMApp.class), any(String.class),
             any())).thenReturn(true);
-    return new ClientRMService(rmContext, yarnScheduler, appManager,
+    return new ClientRMService(rmContext, scheduler, appManager,
         mockAclsManager, mockQueueACLsManager, null);
   }
 
@@ -907,9 +907,9 @@ public class TestClientRMService {
 
   @Test
   public void testGetQueueInfo() throws Exception {
-    YarnScheduler yarnScheduler = mock(YarnScheduler.class);
+    ResourceScheduler scheduler = mock(ResourceScheduler.class);
     RMContext rmContext = mock(RMContext.class);
-    mockRMContext(yarnScheduler, rmContext);
+    mockRMContext(scheduler, rmContext);
 
     ApplicationACLsManager mockAclsManager = 
mock(ApplicationACLsManager.class);
     QueueACLsManager mockQueueACLsManager = mock(QueueACLsManager.class);
@@ -921,7 +921,7 @@ public class TestClientRMService {
         any(ApplicationAccessType.class), anyString(),
         any(ApplicationId.class))).thenReturn(true);
 
-    ClientRMService rmService = new ClientRMService(rmContext, yarnScheduler,
+    ClientRMService rmService = new ClientRMService(rmContext, scheduler,
         null, mockAclsManager, mockQueueACLsManager, null);
     GetQueueInfoRequest request = recordFactory
         .newRecordInstance(GetQueueInfoRequest.class);
@@ -960,7 +960,7 @@ public class TestClientRMService {
         any(ApplicationAccessType.class), anyString(),
         any(ApplicationId.class))).thenReturn(false);
 
-    ClientRMService rmService1 = new ClientRMService(rmContext, yarnScheduler,
+    ClientRMService rmService1 = new ClientRMService(rmContext, scheduler,
         null, mockAclsManager1, mockQueueACLsManager1, null);
     request.setQueueName("testqueue");
     request.setIncludeApplications(true);
@@ -974,12 +974,12 @@ public class TestClientRMService {
   @Test (timeout = 30000)
   @SuppressWarnings ("rawtypes")
   public void testAppSubmit() throws Exception {
-    YarnScheduler yarnScheduler = mockYarnScheduler();
+    ResourceScheduler scheduler = mockResourceScheduler();
     RMContext rmContext = mock(RMContext.class);
-    mockRMContext(yarnScheduler, rmContext);
+    mockRMContext(scheduler, rmContext);
     RMStateStore stateStore = mock(RMStateStore.class);
     when(rmContext.getStateStore()).thenReturn(stateStore);
-    RMAppManager appManager = new RMAppManager(rmContext, yarnScheduler,
+    RMAppManager appManager = new RMAppManager(rmContext, scheduler,
         null, mock(ApplicationACLsManager.class), new Configuration());
     when(rmContext.getDispatcher().getEventHandler()).thenReturn(
         new EventHandler<Event>() {
@@ -1001,7 +1001,7 @@ public class TestClientRMService {
         any()))
         .thenReturn(true);
     ClientRMService rmService =
-        new ClientRMService(rmContext, yarnScheduler, appManager,
+        new ClientRMService(rmContext, scheduler, appManager,
             mockAclsManager, mockQueueACLsManager, null);
     rmService.init(new Configuration());
 
@@ -1085,15 +1085,15 @@ public class TestClientRMService {
      * 2. Test each of the filters
      */
     // Basic setup
-    YarnScheduler yarnScheduler = mockYarnScheduler();
+    ResourceScheduler scheduler = mockResourceScheduler();
     RMContext rmContext = mock(RMContext.class);
-    mockRMContext(yarnScheduler, rmContext);
+    mockRMContext(scheduler, rmContext);
     RMStateStore stateStore = mock(RMStateStore.class);
     when(rmContext.getStateStore()).thenReturn(stateStore);
     doReturn(mock(RMTimelineCollectorManager.class)).when(rmContext)
     .getRMTimelineCollectorManager();
 
-    RMAppManager appManager = new RMAppManager(rmContext, yarnScheduler,
+    RMAppManager appManager = new RMAppManager(rmContext, scheduler,
         null, mock(ApplicationACLsManager.class), new Configuration());
     when(rmContext.getDispatcher().getEventHandler()).thenReturn(
         new EventHandler<Event>() {
@@ -1107,7 +1107,7 @@ public class TestClientRMService {
         any()))
         .thenReturn(true);
     ClientRMService rmService =
-        new ClientRMService(rmContext, yarnScheduler, appManager,
+        new ClientRMService(rmContext, scheduler, appManager,
             mockAclsManager, mockQueueACLsManager, null);
     rmService.init(new Configuration());
 
@@ -1238,12 +1238,12 @@ public class TestClientRMService {
   public void testConcurrentAppSubmit()
       throws IOException, InterruptedException, BrokenBarrierException,
       YarnException {
-    YarnScheduler yarnScheduler = mockYarnScheduler();
+    ResourceScheduler scheduler = mockResourceScheduler();
     RMContext rmContext = mock(RMContext.class);
-    mockRMContext(yarnScheduler, rmContext);
+    mockRMContext(scheduler, rmContext);
     RMStateStore stateStore = mock(RMStateStore.class);
     when(rmContext.getStateStore()).thenReturn(stateStore);
-    RMAppManager appManager = new RMAppManager(rmContext, yarnScheduler,
+    RMAppManager appManager = new RMAppManager(rmContext, scheduler,
         null, mock(ApplicationACLsManager.class), new Configuration());
 
     final ApplicationId appId1 = getApplicationId(100);
@@ -1280,7 +1280,7 @@ public class TestClientRMService {
         .getRMTimelineCollectorManager();
 
     final ClientRMService rmService =
-        new ClientRMService(rmContext, yarnScheduler, appManager, null, null,
+        new ClientRMService(rmContext, scheduler, appManager, null, null,
             null);
     rmService.init(new Configuration());
 
@@ -1339,7 +1339,7 @@ public class TestClientRMService {
     return submitRequest;
   }
 
-  private void mockRMContext(YarnScheduler yarnScheduler, RMContext rmContext)
+  private void mockRMContext(ResourceScheduler scheduler, RMContext rmContext)
       throws IOException {
     Dispatcher dispatcher = mock(Dispatcher.class);
     when(rmContext.getDispatcher()).thenReturn(dispatcher);
@@ -1361,22 +1361,21 @@ public class TestClientRMService {
     queueConfigsByPartition.put("*", queueConfigs);
     queInfo.setQueueConfigurations(queueConfigsByPartition);
 
-    when(yarnScheduler.getQueueInfo(eq("testqueue"), anyBoolean(), 
anyBoolean()))
+    when(scheduler.getQueueInfo(eq("testqueue"), anyBoolean(), anyBoolean()))
         .thenReturn(queInfo);
-    when(yarnScheduler.getQueueInfo(eq("nonexistentqueue"), anyBoolean(), 
anyBoolean()))
-        .thenThrow(new IOException("queue does not exist"));
+    when(scheduler.getQueueInfo(eq("nonexistentqueue"), anyBoolean(),
+        anyBoolean())).thenThrow(new IOException("queue does not exist"));
     RMApplicationHistoryWriter writer = mock(RMApplicationHistoryWriter.class);
     when(rmContext.getRMApplicationHistoryWriter()).thenReturn(writer);
     SystemMetricsPublisher publisher = mock(SystemMetricsPublisher.class);
     when(rmContext.getSystemMetricsPublisher()).thenReturn(publisher);
     when(rmContext.getYarnConfiguration()).thenReturn(new YarnConfiguration());
-    ConcurrentHashMap<ApplicationId, RMApp> apps = getRMApps(rmContext,
-        yarnScheduler);
+    ConcurrentHashMap<ApplicationId, RMApp> apps =
+        getRMApps(rmContext, scheduler);
     when(rmContext.getRMApps()).thenReturn(apps);
-    when(yarnScheduler.getAppsInQueue(eq("testqueue"))).thenReturn(
+    when(scheduler.getAppsInQueue(eq("testqueue"))).thenReturn(
         getSchedulerApps(apps));
-     ResourceScheduler rs = mock(ResourceScheduler.class);
-     when(rmContext.getScheduler()).thenReturn(rs);
+    when(rmContext.getScheduler()).thenReturn(scheduler);
   }
 
   private ConcurrentHashMap<ApplicationId, RMApp> getRMApps(
@@ -1480,31 +1479,32 @@ public class TestClientRMService {
     return app;
   }
 
-  private static YarnScheduler mockYarnScheduler() throws YarnException {
-    YarnScheduler yarnScheduler = mock(YarnScheduler.class);
-    when(yarnScheduler.getMinimumResourceCapability()).thenReturn(
+  private static ResourceScheduler mockResourceScheduler()
+      throws YarnException {
+    ResourceScheduler scheduler = mock(ResourceScheduler.class);
+    when(scheduler.getMinimumResourceCapability()).thenReturn(
         Resources.createResource(
             YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB));
-    when(yarnScheduler.getMaximumResourceCapability()).thenReturn(
+    when(scheduler.getMaximumResourceCapability()).thenReturn(
         Resources.createResource(
             YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB));
-    when(yarnScheduler.getMaximumResourceCapability(any(String.class)))
-        .thenReturn(Resources.createResource(
+    when(scheduler.getMaximumResourceCapability(anyString())).thenReturn(
+        Resources.createResource(
             YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB));
-    when(yarnScheduler.getAppsInQueue(QUEUE_1)).thenReturn(
+    when(scheduler.getAppsInQueue(QUEUE_1)).thenReturn(
         Arrays.asList(getApplicationAttemptId(101), 
getApplicationAttemptId(102)));
-    when(yarnScheduler.getAppsInQueue(QUEUE_2)).thenReturn(
+    when(scheduler.getAppsInQueue(QUEUE_2)).thenReturn(
         Arrays.asList(getApplicationAttemptId(103)));
     ApplicationAttemptId attemptId = getApplicationAttemptId(1);
-    when(yarnScheduler.getAppResourceUsageReport(attemptId)).thenReturn(null);
+    when(scheduler.getAppResourceUsageReport(attemptId)).thenReturn(null);
 
     ResourceCalculator rs = mock(ResourceCalculator.class);
-    when(yarnScheduler.getResourceCalculator()).thenReturn(rs);
+    when(scheduler.getResourceCalculator()).thenReturn(rs);
 
-    when(yarnScheduler.checkAndGetApplicationPriority(any(Priority.class),
+    when(scheduler.checkAndGetApplicationPriority(any(Priority.class),
         any(UserGroupInformation.class), anyString(), 
any(ApplicationId.class)))
             .thenReturn(Priority.newInstance(0));
-    return yarnScheduler;
+    return scheduler;
   }
 
   private ResourceManager setupResourceManager() {
@@ -2427,15 +2427,15 @@ public class TestClientRMService {
      * Submit 3 applications alternately in two queues
      */
     // Basic setup
-    YarnScheduler yarnScheduler = mockYarnScheduler();
+    ResourceScheduler scheduler = mockResourceScheduler();
     RMContext rmContext = mock(RMContext.class);
-    mockRMContext(yarnScheduler, rmContext);
+    mockRMContext(scheduler, rmContext);
     RMStateStore stateStore = mock(RMStateStore.class);
     when(rmContext.getStateStore()).thenReturn(stateStore);
     doReturn(mock(RMTimelineCollectorManager.class)).when(rmContext)
         .getRMTimelineCollectorManager();
 
-    RMAppManager appManager = new RMAppManager(rmContext, yarnScheduler, null,
+    RMAppManager appManager = new RMAppManager(rmContext, scheduler, null,
         mock(ApplicationACLsManager.class), new Configuration());
     when(rmContext.getDispatcher().getEventHandler())
         .thenReturn(new EventHandler<Event>() {
@@ -2454,7 +2454,7 @@ public class TestClientRMService {
     when(appAclsManager.checkAccess(eq(UserGroupInformation.getCurrentUser()),
         any(ApplicationAccessType.class), any(String.class),
         any(ApplicationId.class))).thenReturn(false);
-    ClientRMService rmService = new ClientRMService(rmContext, yarnScheduler,
+    ClientRMService rmService = new ClientRMService(rmContext, scheduler,
         appManager, appAclsManager, queueAclsManager, null);
     rmService.init(new Configuration());
 


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to