This is an automated email from the ASF dual-hosted git repository.

jhung pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new aa19cb2  YARN-6492. Generate queue metrics for each partition. Contributed by Manikandan R
aa19cb2 is described below

commit aa19cb20ea52de7e3d67a36fe0d99dcaabc9a189
Author: Jonathan Hung <jh...@linkedin.com>
AuthorDate: Tue May 26 13:39:08 2020 -0700

    YARN-6492. Generate queue metrics for each partition. Contributed by Manikandan R
    
    (cherry picked from commit c30c23cb665761e997bcfc1dc00908f70b069fa2)
    (cherry picked from commit 7a323a45aad07eed532d684d6dbe8436ba39c31c)
    (cherry picked from commit a80595a6deb3124a3d6d99057e9d5298cd7237d8)
---
 .../hadoop/yarn/util/resource/Resources.java       |  13 +
 .../scheduler/AppSchedulingInfo.java               |  24 +-
 .../scheduler/ContainerUpdateContext.java          |  14 +-
 .../scheduler/PartitionQueueMetrics.java           |  89 +++
 .../resourcemanager/scheduler/QueueMetrics.java    | 560 ++++++++++-----
 .../scheduler/capacity/CSQueueMetrics.java         |   7 +-
 .../scheduler/capacity/LeafQueue.java              |  32 +-
 .../scheduler/common/fica/FiCaSchedulerApp.java    |   4 +-
 .../scheduler/fair/FSAppAttempt.java               |   2 +-
 .../scheduler/fifo/FifoAppAttempt.java             |   2 +-
 .../scheduler/TestPartitionQueueMetrics.java       | 752 +++++++++++++++++++++
 .../scheduler/TestSchedulerApplicationAttempt.java |   4 +-
 .../TestCapacitySchedulerAutoCreatedQueueBase.java |  54 +-
 .../TestCapacitySchedulerAutoQueueCreation.java    |   8 +-
 .../scheduler/capacity/TestLeafQueue.java          |   4 +-
 .../capacity/TestNodeLabelContainerAllocation.java | 405 ++++++++++-
 16 files changed, 1739 insertions(+), 235 deletions(-)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java
index 7826f51..9607257 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java
@@ -298,6 +298,19 @@ public class Resources {
     return lhs;
   }
 
+  /**
+   * Subtract {@code rhs} from {@code lhs} and reset any negative values to
+   * zero. This call will operate on a copy of {@code lhs}, leaving {@code lhs}
+   * unmodified.
+   *
+   * @param lhs {@link Resource} to subtract from
+   * @param rhs {@link Resource} to subtract
+   * @return the value of lhs after subtraction
+   */
+  public static Resource subtractNonNegative(Resource lhs, Resource rhs) {
+    return subtractFromNonNegative(clone(lhs), rhs);
+  }
+
   public static Resource negate(Resource resource) {
     return subtract(NONE, resource);
   }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
index 21f052a..793e13c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
@@ -37,7 +37,6 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ExecutionType;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
@@ -531,7 +530,7 @@ public class AppSchedulingInfo {
 
   public ContainerRequest allocate(NodeType type,
       SchedulerNode node, SchedulerRequestKey schedulerKey,
-      Container containerAllocated) {
+      RMContainer containerAllocated) {
     try {
       writeLock.lock();
 
@@ -690,7 +689,7 @@ public class AppSchedulingInfo {
   }
 
   private void updateMetricsForAllocatedContainer(NodeType type,
-      SchedulerNode node, Container containerAllocated) {
+      SchedulerNode node, RMContainer containerAllocated) {
     QueueMetrics metrics = queue.getMetrics();
     if (pending) {
       // once an allocation is done we assume the application is
@@ -703,18 +702,21 @@ public class AppSchedulingInfo {
   }
 
   public static void updateMetrics(ApplicationId applicationId, NodeType type,
-      SchedulerNode node, Container containerAllocated, String user,
+      SchedulerNode node, RMContainer containerAllocated, String user,
       Queue queue) {
     if (LOG.isDebugEnabled()) {
       LOG.debug("allocate: applicationId=" + applicationId + " container="
-          + containerAllocated.getId() + " host=" + containerAllocated
-          .getNodeId().toString() + " user=" + user + " resource="
-          + containerAllocated.getResource() + " type="
-          + type);
+          + containerAllocated.getContainer().getId() + " host="
+          + containerAllocated.getNodeId().toString() + " user="
+          + user + " resource="
+          + containerAllocated.getContainer().getResource() + " type=" + type);
     }
     if(node != null) {
       queue.getMetrics().allocateResources(node.getPartition(), user, 1,
-          containerAllocated.getResource(), true);
+          containerAllocated.getContainer().getResource(), false);
+      queue.getMetrics().decrPendingResources(
+          containerAllocated.getNodeLabelExpression(), user, 1,
+          containerAllocated.getContainer().getResource());
     }
     queue.getMetrics().incrNodeTypeAggregations(user, type);
   }
@@ -768,4 +770,8 @@ public class AppSchedulingInfo {
       this.readLock.unlock();
     }
   }
+
+  public RMContext getRMContext() {
+    return this.rmContext;
+  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerUpdateContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerUpdateContext.java
index 491a9ce..45db040 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerUpdateContext.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerUpdateContext.java
@@ -161,11 +161,17 @@ public class ContainerUpdateContext {
       // Decrement the pending using a dummy RR with
       // resource = prev update req capability
       if (pendingAsk != null && pendingAsk.getCount() > 0) {
+        Container container = Container.newInstance(UNDEFINED,
+            schedulerNode.getNodeID(), "host:port",
+            pendingAsk.getPerAllocationResource(),
+            schedulerKey.getPriority(), null);
         appSchedulingInfo.allocate(NodeType.OFF_SWITCH, schedulerNode,
-            schedulerKey, Container.newInstance(UNDEFINED,
-                schedulerNode.getNodeID(), "host:port",
-                pendingAsk.getPerAllocationResource(),
-                schedulerKey.getPriority(), null));
+            schedulerKey,
+            new RMContainerImpl(container, schedulerKey,
+                appSchedulingInfo.getApplicationAttemptId(),
+                schedulerNode.getNodeID(), appSchedulingInfo.getUser(),
+                appSchedulingInfo.getRMContext(),
+                appPlacementAllocator.getPrimaryRequestedNodePartition()));
       }
     }
   }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/PartitionQueueMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/PartitionQueueMetrics.java
new file mode 100644
index 0000000..f431318
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/PartitionQueueMetrics.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
+
+@Metrics(context = "yarn")
+public class PartitionQueueMetrics extends QueueMetrics {
+
+  private String partition;
+
+  protected PartitionQueueMetrics(MetricsSystem ms, String queueName,
+      Queue parent, boolean enableUserMetrics, Configuration conf,
+      String partition) {
+    super(ms, queueName, parent, enableUserMetrics, conf);
+    this.partition = partition;
+    if (getParentQueue() != null) {
+      String newQueueName = (getParentQueue() instanceof CSQueue)
+          ? ((CSQueue) getParentQueue()).getQueuePath()
+          : getParentQueue().getQueueName();
+      String parentMetricName =
+          partition + METRIC_NAME_DELIMITER + newQueueName;
+      setParent(getQueueMetrics().get(parentMetricName));
+    }
+  }
+
+  /**
+   * Partition * Queue * User Metrics
+   *
+   * Computes Metrics at Partition (Node Label) * Queue * User Level.
+   *
+   * Sample JMX O/P Structure:
+   *
+   * PartitionQueueMetrics (labelX)
+   *  QueueMetrics (A)
+   *    usermetrics
+   *  QueueMetrics (A1)
+   *    usermetrics
+   *    QueueMetrics (A2)
+   *      usermetrics
+   *    QueueMetrics (B)
+   *      usermetrics
+   *
+   * @return QueueMetrics
+   */
+  @Override
+  public synchronized QueueMetrics getUserMetrics(String userName) {
+    if (users == null) {
+      return null;
+    }
+
+    String partitionJMXStr =
+        (partition.equals(DEFAULT_PARTITION)) ? DEFAULT_PARTITION_JMX_STR
+            : partition;
+
+    QueueMetrics metrics = (PartitionQueueMetrics) users.get(userName);
+    if (metrics == null) {
+      metrics = new PartitionQueueMetrics(this.metricsSystem, this.queueName,
+          null, false, this.conf, this.partition);
+      users.put(userName, metrics);
+      metricsSystem.register(
+          pSourceName(partitionJMXStr).append(qSourceName(queueName))
+              .append(",user=").append(userName).toString(),
+          "Metrics for user '" + userName + "' in queue '" + queueName + "'",
+          ((PartitionQueueMetrics) metrics.tag(PARTITION_INFO, partitionJMXStr)
+              .tag(QUEUE_INFO, queueName)).tag(USER_INFO, userName));
+    }
+    return metrics;
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java
index b05a0ae..4d8a69c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java
@@ -25,7 +25,6 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Map.Entry;
 
-import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
@@ -54,6 +53,7 @@ import org.apache.hadoop.yarn.util.resource.ResourceUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Splitter;
 
 @InterfaceAudience.Private
@@ -114,17 +114,34 @@ public class QueueMetrics implements MetricsSource {
       info("Queue", "Metrics by queue");
   protected static final MetricsInfo USER_INFO =
       info("User", "Metrics by user");
+  protected static final MetricsInfo PARTITION_INFO =
+      info("Partition", "Metrics by partition");
   static final Splitter Q_SPLITTER =
       Splitter.on('.').omitEmptyStrings().trimResults();
 
   protected final MetricsRegistry registry;
   protected final String queueName;
-  protected final QueueMetrics parent;
+  private QueueMetrics parent;
+  private final Queue parentQueue;
   protected final MetricsSystem metricsSystem;
   protected final Map<String, QueueMetrics> users;
   protected final Configuration conf;
   private QueueMetricsForCustomResources queueMetricsForCustomResources;
 
+  private final boolean enableUserMetrics;
+
+  protected static final MetricsInfo P_RECORD_INFO =
+      info("PartitionQueueMetrics", "Metrics for the resource scheduler");
+
+  // Use "default" to operate NO_LABEL (default) partition internally
+  public static final String DEFAULT_PARTITION = "default";
+
+  // Use "" to register NO_LABEL (default) partition into metrics system
+  public static final String DEFAULT_PARTITION_JMX_STR = "";
+
+  // Metric Name Delimiter
+  public static final String METRIC_NAME_DELIMITER = ".";
+
   private static final String ALLOCATED_RESOURCE_METRIC_PREFIX =
       "AllocatedResource.";
   private static final String ALLOCATED_RESOURCE_METRIC_DESC =
@@ -150,13 +167,17 @@ public class QueueMetrics implements MetricsSource {
   private static final String AGGREGATE_PREEMPTED_SECONDS_METRIC_DESC =
     "Aggregate Preempted Seconds for NAME";
 
-  protected QueueMetrics(MetricsSystem ms, String queueName, Queue parent, 
-              boolean enableUserMetrics, Configuration conf) {
+  public QueueMetrics(MetricsSystem ms, String queueName, Queue parent,
+      boolean enableUserMetrics, Configuration conf) {
+
     registry = new MetricsRegistry(RECORD_INFO);
     this.queueName = queueName;
+
     this.parent = parent != null ? parent.getMetrics() : null;
-    this.users = enableUserMetrics ? new HashMap<String, QueueMetrics>()
-                                   : null;
+    this.parentQueue = parent;
+    this.users = enableUserMetrics ? new HashMap<String, QueueMetrics>() : null;
+    this.enableUserMetrics = enableUserMetrics;
+
     metricsSystem = ms;
     this.conf = conf;
     runningTime = buildBuckets(conf);
@@ -178,12 +199,25 @@ public class QueueMetrics implements MetricsSource {
     return sb;
   }
 
-  public synchronized
-  static QueueMetrics forQueue(String queueName, Queue parent,
-                               boolean enableUserMetrics,
-                              Configuration conf) {
+  static StringBuilder pSourceName(String partition) {
+    StringBuilder sb = new StringBuilder(P_RECORD_INFO.name());
+    sb.append(",partition").append('=').append(partition);
+    return sb;
+  }
+
+  static StringBuilder qSourceName(String queueName) {
+    StringBuilder sb = new StringBuilder();
+    int i = 0;
+    for (String node : Q_SPLITTER.split(queueName)) {
+      sb.append(",q").append(i++).append('=').append(node);
+    }
+    return sb;
+  }
+
+  public synchronized static QueueMetrics forQueue(String queueName,
+      Queue parent, boolean enableUserMetrics, Configuration conf) {
     return forQueue(DefaultMetricsSystem.instance(), queueName, parent,
-                    enableUserMetrics, conf);
+        enableUserMetrics, conf);
   }
 
   /**
@@ -205,28 +239,24 @@ public class QueueMetrics implements MetricsSource {
    *
    * @return A string to {@link QueueMetrics} map.
    */
-  protected static Map<String, QueueMetrics> getQueueMetrics() {
+  public static Map<String, QueueMetrics> getQueueMetrics() {
     return QUEUE_METRICS;
   }
 
-  public synchronized 
-  static QueueMetrics forQueue(MetricsSystem ms, String queueName,
-                                      Queue parent, boolean enableUserMetrics,
-                                     Configuration conf) {
-    QueueMetrics metrics = QUEUE_METRICS.get(queueName);
+  public synchronized static QueueMetrics forQueue(MetricsSystem ms,
+      String queueName, Queue parent, boolean enableUserMetrics,
+      Configuration conf) {
+    QueueMetrics metrics = getQueueMetrics().get(queueName);
     if (metrics == null) {
-      metrics =
-          new QueueMetrics(ms, queueName, parent, enableUserMetrics, conf).
-          tag(QUEUE_INFO, queueName);
-      
+      metrics = new QueueMetrics(ms, queueName, parent, enableUserMetrics, conf)
+          .tag(QUEUE_INFO, queueName);
+
       // Register with the MetricsSystems
       if (ms != null) {
-        metrics = 
-            ms.register(
-                sourceName(queueName).toString(), 
-                "Metrics for queue: " + queueName, metrics);
+        metrics = ms.register(sourceName(queueName).toString(),
+            "Metrics for queue: " + queueName, metrics);
       }
-      QUEUE_METRICS.put(queueName, metrics);
+      getQueueMetrics().put(queueName, metrics);
     }
 
     return metrics;
@@ -238,7 +268,8 @@ public class QueueMetrics implements MetricsSource {
     }
     QueueMetrics metrics = users.get(userName);
     if (metrics == null) {
-      metrics = new QueueMetrics(metricsSystem, queueName, null, false, conf);
+      metrics =
+          new QueueMetrics(metricsSystem, queueName, null, false, conf);
       users.put(userName, metrics);
       metricsSystem.register(
           sourceName(queueName).append(",user=").append(userName).toString(),
@@ -248,6 +279,96 @@ public class QueueMetrics implements MetricsSource {
     return metrics;
   }
 
+  /**
+   * Partition * Queue Metrics
+   *
+   * Computes Metrics at Partition (Node Label) * Queue Level.
+   *
+   * Sample JMX O/P Structure:
+   *
+   * PartitionQueueMetrics (labelX)
+   *  QueueMetrics (A)
+   *    metrics
+   *    QueueMetrics (A1)
+   *      metrics
+   *    QueueMetrics (A2)
+   *      metrics
+   *  QueueMetrics (B)
+   *    metrics
+   *
+   * @param partition
+   * @return QueueMetrics
+   */
+  public synchronized QueueMetrics getPartitionQueueMetrics(String partition) {
+
+    String partitionJMXStr = partition;
+
+    if ((partition == null)
+        || (partition.equals(RMNodeLabelsManager.NO_LABEL))) {
+      partition = DEFAULT_PARTITION;
+      partitionJMXStr = DEFAULT_PARTITION_JMX_STR;
+    }
+
+    String metricName = partition + METRIC_NAME_DELIMITER + this.queueName;
+    QueueMetrics metrics = getQueueMetrics().get(metricName);
+
+    if (metrics == null) {
+      QueueMetrics queueMetrics =
+          new PartitionQueueMetrics(metricsSystem, this.queueName, parentQueue,
+              this.enableUserMetrics, this.conf, partition);
+      metricsSystem.register(
+          pSourceName(partitionJMXStr).append(qSourceName(this.queueName))
+              .toString(),
+          "Metrics for queue: " + this.queueName,
+          queueMetrics.tag(PARTITION_INFO, partitionJMXStr).tag(QUEUE_INFO,
+              this.queueName));
+      getQueueMetrics().put(metricName, queueMetrics);
+      return queueMetrics;
+    } else {
+      return metrics;
+    }
+  }
+
+  /**
+   * Partition Metrics
+   *
+   * Computes Metrics at Partition (Node Label) Level.
+   *
+   * Sample JMX O/P Structure:
+   *
+   * PartitionQueueMetrics (labelX)
+   *  metrics
+   *
+   * @param partition
+   * @return QueueMetrics
+   */
+  private QueueMetrics getPartitionMetrics(String partition) {
+
+    String partitionJMXStr = partition;
+    if ((partition == null)
+        || (partition.equals(RMNodeLabelsManager.NO_LABEL))) {
+      partition = DEFAULT_PARTITION;
+      partitionJMXStr = DEFAULT_PARTITION_JMX_STR;
+    }
+
+    String metricName = partition + METRIC_NAME_DELIMITER;
+    QueueMetrics metrics = getQueueMetrics().get(metricName);
+    if (metrics == null) {
+      metrics = new PartitionQueueMetrics(metricsSystem, this.queueName, null,
+          false, this.conf, partition);
+
+      // Register with the MetricsSystems
+      if (metricsSystem != null) {
+        metricsSystem.register(pSourceName(partitionJMXStr).toString(),
+            "Metrics for partition: " + partitionJMXStr,
+            (PartitionQueueMetrics) metrics.tag(PARTITION_INFO,
+                partitionJMXStr));
+      }
+      getQueueMetrics().put(metricName, metrics);
+    }
+    return metrics;
+  }
+
   private ArrayList<Integer> parseInts(String value) {
     ArrayList<Integer> result = new ArrayList<Integer>();
     for(String s: value.split(",")) {
@@ -388,20 +509,42 @@ public class QueueMetrics implements MetricsSource {
    */
   public void setAvailableResourcesToQueue(String partition, Resource limit) {
     if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
-      availableMB.set(limit.getMemorySize());
-      availableVCores.set(limit.getVirtualCores());
-      if (queueMetricsForCustomResources != null) {
-        queueMetricsForCustomResources.setAvailable(limit);
-        registerCustomResources(
-            queueMetricsForCustomResources.getAvailableValues(),
-            AVAILABLE_RESOURCE_METRIC_PREFIX, AVAILABLE_RESOURCE_METRIC_DESC);
+      setAvailableResources(limit);
+    }
+
+    QueueMetrics partitionQueueMetrics = getPartitionQueueMetrics(partition);
+    if (partitionQueueMetrics != null) {
+      partitionQueueMetrics.setAvailableResources(limit);
+
+      if(this.queueName.equals("root")) {
+        QueueMetrics partitionMetrics = getPartitionMetrics(partition);
+        if (partitionMetrics != null) {
+          partitionMetrics.setAvailableResources(limit);
+        }
       }
     }
   }
 
   /**
+   * Set Available resources with support for resource vectors.
+   *
+   * @param limit
+   */
+  public void setAvailableResources(Resource limit) {
+    availableMB.set(limit.getMemorySize());
+    availableVCores.set(limit.getVirtualCores());
+    if (queueMetricsForCustomResources != null) {
+      queueMetricsForCustomResources.setAvailable(limit);
+      registerCustomResources(
+          queueMetricsForCustomResources.getAvailableValues(),
+          AVAILABLE_RESOURCE_METRIC_PREFIX, AVAILABLE_RESOURCE_METRIC_DESC);
+    }
+  }
+
+  /**
    * Set available resources. To be called by scheduler periodically as
    * resources become available.
+   *
    * @param limit resource limit
    */
   public void setAvailableResourcesToQueue(Resource limit) {
@@ -411,42 +554,70 @@ public class QueueMetrics implements MetricsSource {
   /**
    * Set available resources. To be called by scheduler periodically as
    * resources become available.
+   *
    * @param partition Node Partition
    * @param user
    * @param limit resource limit
    */
-  public void setAvailableResourcesToUser(String partition,
-      String user, Resource limit) {
-    if(partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
+  public void setAvailableResourcesToUser(String partition, String user,
+      Resource limit) {
+    if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
       QueueMetrics userMetrics = getUserMetrics(user);
       if (userMetrics != null) {
-        userMetrics.setAvailableResourcesToQueue(partition, limit);
+        userMetrics.setAvailableResources(limit);
+      }
+    }
+
+    QueueMetrics partitionQueueMetrics = getPartitionQueueMetrics(partition);
+    if (partitionQueueMetrics != null) {
+      QueueMetrics partitionUserMetrics =
+          partitionQueueMetrics.getUserMetrics(user);
+      if (partitionUserMetrics != null) {
+        partitionUserMetrics.setAvailableResources(limit);
       }
     }
   }
 
   /**
    * Increment pending resource metrics
+   *
    * @param partition Node Partition
    * @param user
    * @param containers
-   * @param res the TOTAL delta of resources note this is different from
-   *            the other APIs which use per container resource
+   * @param res the TOTAL delta of resources note this is different from the
+   *          other APIs which use per container resource
    */
   public void incrPendingResources(String partition, String user,
       int containers, Resource res) {
+
     if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
-      _incrPendingResources(containers, res);
-      QueueMetrics userMetrics = getUserMetrics(user);
-      if (userMetrics != null) {
-        userMetrics.incrPendingResources(partition, user, containers, res);
-      }
-      if (parent != null) {
-        parent.incrPendingResources(partition, user, containers, res);
+      internalIncrPendingResources(partition, user, containers, res);
+    }
+
+    QueueMetrics partitionQueueMetrics = getPartitionQueueMetrics(partition);
+    if (partitionQueueMetrics != null) {
+      partitionQueueMetrics.internalIncrPendingResources(partition, user,
+          containers, res);
+      QueueMetrics partitionMetrics = getPartitionMetrics(partition);
+      if (partitionMetrics != null) {
+        partitionMetrics.incrementPendingResources(containers, res);
       }
     }
   }
 
+  public void internalIncrPendingResources(String partition, String user,
+      int containers, Resource res) {
+    incrementPendingResources(containers, res);
+    QueueMetrics userMetrics = getUserMetrics(user);
+    if (userMetrics != null) {
+      userMetrics.internalIncrPendingResources(partition, user, containers,
+          res);
+    }
+    if (parent != null) {
+      parent.internalIncrPendingResources(partition, user, containers, res);
+    }
+  }
+
   protected Map<String, Long> initAndGetCustomResources() {
     Map<String, Long> customResources = new HashMap<String, Long>();
     ResourceInformation[] resources = ResourceUtils.getResourceTypesArray();
@@ -505,7 +676,7 @@ public class QueueMetrics implements MetricsSource {
     }
   }
 
-  private void _incrPendingResources(int containers, Resource res) {
+  private void incrementPendingResources(int containers, Resource res) {
     pendingContainers.incr(containers);
     pendingMB.incr(res.getMemorySize() * containers);
     pendingVCores.incr(res.getVirtualCores() * containers);
@@ -518,19 +689,36 @@ public class QueueMetrics implements MetricsSource {
 
   public void decrPendingResources(String partition, String user,
       int containers, Resource res) {
+
     if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
-      _decrPendingResources(containers, res);
-      QueueMetrics userMetrics = getUserMetrics(user);
-      if (userMetrics != null) {
-        userMetrics.decrPendingResources(partition, user, containers, res);
-      }
-      if (parent != null) {
-        parent.decrPendingResources(partition, user, containers, res);
+      internalDecrPendingResources(partition, user, containers, res);
+    }
+
+    QueueMetrics partitionQueueMetrics = getPartitionQueueMetrics(partition);
+    if (partitionQueueMetrics != null) {
+      partitionQueueMetrics.internalDecrPendingResources(partition, user,
+          containers, res);
+      QueueMetrics partitionMetrics = getPartitionMetrics(partition);
+      if (partitionMetrics != null) {
+        partitionMetrics.decrementPendingResources(containers, res);
       }
     }
   }
 
-  private void _decrPendingResources(int containers, Resource res) {
+  protected void internalDecrPendingResources(String partition, String user,
+      int containers, Resource res) {
+    decrementPendingResources(containers, res);
+    QueueMetrics userMetrics = getUserMetrics(user);
+    if (userMetrics != null) {
+      userMetrics.internalDecrPendingResources(partition, user, containers,
+          res);
+    }
+    if (parent != null) {
+      parent.internalDecrPendingResources(partition, user, containers, res);
+    }
+  }
+
+  private void decrementPendingResources(int containers, Resource res) {
     pendingContainers.decr(containers);
     pendingMB.decr(res.getMemorySize() * containers);
     pendingVCores.decr(res.getVirtualCores() * containers);
@@ -560,35 +748,62 @@ public class QueueMetrics implements MetricsSource {
     }
   }
 
-  public void allocateResources(String partition, String user,
-      int containers, Resource res, boolean decrPending) {
+  public void allocateResources(String partition, String user, int containers,
+      Resource res, boolean decrPending) {
+
     if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
-      allocatedContainers.incr(containers);
-      aggregateContainersAllocated.incr(containers);
-
-      allocatedMB.incr(res.getMemorySize() * containers);
-      allocatedVCores.incr(res.getVirtualCores() * containers);
-      if (queueMetricsForCustomResources != null) {
-        queueMetricsForCustomResources.increaseAllocated(res, containers);
-        registerCustomResources(
-            queueMetricsForCustomResources.getAllocatedValues(),
-            ALLOCATED_RESOURCE_METRIC_PREFIX, ALLOCATED_RESOURCE_METRIC_DESC);
-      }
+      internalAllocateResources(partition, user, containers, res, decrPending);
+    }
 
-      if (decrPending) {
-        _decrPendingResources(containers, res);
-      }
-      QueueMetrics userMetrics = getUserMetrics(user);
-      if (userMetrics != null) {
-        userMetrics.allocateResources(partition, user,
-            containers, res, decrPending);
-      }
-      if (parent != null) {
-        parent.allocateResources(partition, user, containers, res, decrPending);
+    QueueMetrics partitionQueueMetrics = getPartitionQueueMetrics(partition);
+    if (partitionQueueMetrics != null) {
+      partitionQueueMetrics.internalAllocateResources(partition, user,
+          containers, res, decrPending);
+      QueueMetrics partitionMetrics = getPartitionMetrics(partition);
+      if (partitionMetrics != null) {
+        partitionMetrics.computeAllocateResources(containers, res, decrPending);
       }
     }
   }
 
+  public void internalAllocateResources(String partition, String user,
+      int containers, Resource res, boolean decrPending) {
+    computeAllocateResources(containers, res, decrPending);
+    QueueMetrics userMetrics = getUserMetrics(user);
+    if (userMetrics != null) {
+      userMetrics.internalAllocateResources(partition, user, containers, res,
+          decrPending);
+    }
+    if (parent != null) {
+      parent.internalAllocateResources(partition, user, containers, res,
+          decrPending);
+    }
+  }
+
+  /**
+   * Allocate Resources for a partition with support for resource vectors.
+   *
+   * @param containers number of containers
+   * @param res resource containing memory size, vcores etc
+   * @param decrPending decides whether to decrease pending resource or not
+   */
+  private void computeAllocateResources(int containers, Resource res,
+      boolean decrPending) {
+    allocatedContainers.incr(containers);
+    aggregateContainersAllocated.incr(containers);
+    allocatedMB.incr(res.getMemorySize() * containers);
+    allocatedVCores.incr(res.getVirtualCores() * containers);
+    if (queueMetricsForCustomResources != null) {
+      queueMetricsForCustomResources.increaseAllocated(res, containers);
+      registerCustomResources(
+          queueMetricsForCustomResources.getAllocatedValues(),
+          ALLOCATED_RESOURCE_METRIC_PREFIX, ALLOCATED_RESOURCE_METRIC_DESC);
+    }
+    if (decrPending) {
+      decrementPendingResources(containers, res);
+    }
+  }
+
   /**
    * Allocate Resource for container size change.
    * @param partition Node Partition
@@ -596,82 +811,80 @@ public class QueueMetrics implements MetricsSource {
    * @param res
    */
   public void allocateResources(String partition, String user, Resource res) {
-    if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
-      allocatedMB.incr(res.getMemorySize());
-      allocatedVCores.incr(res.getVirtualCores());
-      if (queueMetricsForCustomResources != null) {
-        queueMetricsForCustomResources.increaseAllocated(res);
-        registerCustomResources(
-            queueMetricsForCustomResources.getAllocatedValues(),
-            ALLOCATED_RESOURCE_METRIC_PREFIX, ALLOCATED_RESOURCE_METRIC_DESC);
-      }
+    allocatedMB.incr(res.getMemorySize());
+    allocatedVCores.incr(res.getVirtualCores());
+    if (queueMetricsForCustomResources != null) {
+      queueMetricsForCustomResources.increaseAllocated(res);
+      registerCustomResources(
+          queueMetricsForCustomResources.getAllocatedValues(),
+          ALLOCATED_RESOURCE_METRIC_PREFIX, ALLOCATED_RESOURCE_METRIC_DESC);
+    }
 
-      pendingMB.decr(res.getMemorySize());
-      pendingVCores.decr(res.getVirtualCores());
-      if (queueMetricsForCustomResources != null) {
-        queueMetricsForCustomResources.decreasePending(res);
-        registerCustomResources(
-            queueMetricsForCustomResources.getPendingValues(),
-            PENDING_RESOURCE_METRIC_PREFIX, PENDING_RESOURCE_METRIC_DESC);
-      }
+    pendingMB.decr(res.getMemorySize());
+    pendingVCores.decr(res.getVirtualCores());
+    if (queueMetricsForCustomResources != null) {
+      queueMetricsForCustomResources.decreasePending(res);
+      registerCustomResources(queueMetricsForCustomResources.getPendingValues(),
+          PENDING_RESOURCE_METRIC_PREFIX, PENDING_RESOURCE_METRIC_DESC);
+    }
 
-      QueueMetrics userMetrics = getUserMetrics(user);
-      if (userMetrics != null) {
-        userMetrics.allocateResources(partition, user, res);
-      }
-      if (parent != null) {
-        parent.allocateResources(partition, user, res);
-      }
+    QueueMetrics userMetrics = getUserMetrics(user);
+    if (userMetrics != null) {
+      userMetrics.allocateResources(partition, user, res);
+    }
+    if (parent != null) {
+      parent.allocateResources(partition, user, res);
     }
   }
 
-  public void releaseResources(String partition,
-      String user, int containers, Resource res) {
+  public void releaseResources(String partition, String user, int containers,
+      Resource res) {
+
     if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
-      allocatedContainers.decr(containers);
-      aggregateContainersReleased.incr(containers);
-      allocatedMB.decr(res.getMemorySize() * containers);
-      allocatedVCores.decr(res.getVirtualCores() * containers);
-      if (queueMetricsForCustomResources != null) {
-        queueMetricsForCustomResources.decreaseAllocated(res, containers);
-        registerCustomResources(
-            queueMetricsForCustomResources.getAllocatedValues(),
-            ALLOCATED_RESOURCE_METRIC_PREFIX, ALLOCATED_RESOURCE_METRIC_DESC);
-      }
+      internalReleaseResources(partition, user, containers, res);
+    }
 
-      QueueMetrics userMetrics = getUserMetrics(user);
-      if (userMetrics != null) {
-        userMetrics.releaseResources(partition, user, containers, res);
-      }
-      if (parent != null) {
-        parent.releaseResources(partition, user, containers, res);
+    QueueMetrics partitionQueueMetrics = getPartitionQueueMetrics(partition);
+    if (partitionQueueMetrics != null) {
+      partitionQueueMetrics.internalReleaseResources(partition, user,
+          containers, res);
+      QueueMetrics partitionMetrics = getPartitionMetrics(partition);
+      if (partitionMetrics != null) {
+        partitionMetrics.computeReleaseResources(containers, res);
       }
     }
   }
 
+  public void internalReleaseResources(String partition, String user,
+      int containers, Resource res) {
+
+    computeReleaseResources(containers, res);
+    QueueMetrics userMetrics = getUserMetrics(user);
+    if (userMetrics != null) {
+      userMetrics.internalReleaseResources(partition, user, containers, res);
+    }
+    if (parent != null) {
+      parent.internalReleaseResources(partition, user, containers, res);
+    }
+  }
+
   /**
-   * Release Resource for container size change.
+   * Release Resources for a partition with support for resource vectors.
    *
-   * @param user
-   * @param res
+   * @param containers number of containers
+   * @param res resource containing memory size, vcores etc
    */
-  private void releaseResources(String user, Resource res) {
-    allocatedMB.decr(res.getMemorySize());
-    allocatedVCores.decr(res.getVirtualCores());
+  private void computeReleaseResources(int containers, Resource res) {
+    allocatedContainers.decr(containers);
+    aggregateContainersReleased.incr(containers);
+    allocatedMB.decr(res.getMemorySize() * containers);
+    allocatedVCores.decr(res.getVirtualCores() * containers);
     if (queueMetricsForCustomResources != null) {
-      queueMetricsForCustomResources.decreaseAllocated(res);
+      queueMetricsForCustomResources.decreaseAllocated(res, containers);
       registerCustomResources(
           queueMetricsForCustomResources.getAllocatedValues(),
           ALLOCATED_RESOURCE_METRIC_PREFIX, ALLOCATED_RESOURCE_METRIC_DESC);
     }
-
-    QueueMetrics userMetrics = getUserMetrics(user);
-    if (userMetrics != null) {
-      userMetrics.releaseResources(user, res);
-    }
-    if (parent != null) {
-      parent.releaseResources(user, res);
-    }
   }
 
   public void preemptContainer() {
@@ -721,11 +934,31 @@ public class QueueMetrics implements MetricsSource {
 
   public void reserveResource(String partition, String user, Resource res) {
     if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
-      reserveResource(user, res);
+      internalReserveResources(partition, user, res);
+    }
+    QueueMetrics partitionQueueMetrics = getPartitionQueueMetrics(partition);
+    if (partitionQueueMetrics != null) {
+      partitionQueueMetrics.internalReserveResources(partition, user, res);
+      QueueMetrics partitionMetrics = getPartitionMetrics(partition);
+      if (partitionMetrics != null) {
+        partitionMetrics.incrReserveResources(res);
+      }
+    }
+  }
+
+  protected void internalReserveResources(String partition, String user,
+      Resource res) {
+    incrReserveResources(res);
+    QueueMetrics userMetrics = getUserMetrics(user);
+    if (userMetrics != null) {
+      userMetrics.internalReserveResources(partition, user, res);
+    }
+    if (parent != null) {
+      parent.internalReserveResources(partition, user, res);
     }
   }
 
-  public void reserveResource(String user, Resource res) {
+  public void incrReserveResources(Resource res) {
     reservedContainers.incr();
     reservedMB.incr(res.getMemorySize());
     reservedVCores.incr(res.getVirtualCores());
@@ -735,17 +968,37 @@ public class QueueMetrics implements MetricsSource {
           queueMetricsForCustomResources.getReservedValues(),
           RESERVED_RESOURCE_METRIC_PREFIX, RESERVED_RESOURCE_METRIC_DESC);
     }
+  }
+
+  public void unreserveResource(String partition, String user, Resource res) {
+    if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
+      internalUnReserveResources(partition, user, res);
+    }
+    QueueMetrics partitionQueueMetrics = getPartitionQueueMetrics(partition);
+    if (partitionQueueMetrics != null) {
+      partitionQueueMetrics.internalUnReserveResources(partition, user, res);
+      QueueMetrics partitionMetrics = getPartitionMetrics(partition);
+      if (partitionMetrics != null) {
+        partitionMetrics.decrReserveResource(res);
+      }
+    }
+  }
+
+  protected void internalUnReserveResources(String partition, String user,
+      Resource res) {
+    decrReserveResource(res);
     QueueMetrics userMetrics = getUserMetrics(user);
     if (userMetrics != null) {
-      userMetrics.reserveResource(user, res);
+      userMetrics.internalUnReserveResources(partition, user, res);
     }
     if (parent != null) {
-      parent.reserveResource(user, res);
+      parent.internalUnReserveResources(partition, user, res);
     }
   }
 
-  private void unreserveResource(String user, Resource res) {
-    reservedContainers.decr();
+  public void decrReserveResource(Resource res) {
+    int containers = 1;
+    reservedContainers.decr(containers);
     reservedMB.decr(res.getMemorySize());
     reservedVCores.decr(res.getVirtualCores());
     if (queueMetricsForCustomResources != null) {
@@ -754,19 +1007,6 @@ public class QueueMetrics implements MetricsSource {
           queueMetricsForCustomResources.getReservedValues(),
           RESERVED_RESOURCE_METRIC_PREFIX, RESERVED_RESOURCE_METRIC_DESC);
     }
-    QueueMetrics userMetrics = getUserMetrics(user);
-    if (userMetrics != null) {
-      userMetrics.unreserveResource(user, res);
-    }
-    if (parent != null) {
-      parent.unreserveResource(user, res);
-    }
-  }
-
-  public void unreserveResource(String partition, String user, Resource res) {
-    if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
-      unreserveResource(user, res);
-    }
   }
 
   public void incrActiveUsers() {
@@ -980,4 +1220,12 @@ public class QueueMetrics implements MetricsSource {
       QueueMetricsForCustomResources metrics) {
     this.queueMetricsForCustomResources = metrics;
   }
-}
+
+  public void setParent(QueueMetrics parent) {
+    this.parent = parent;
+  }
+
+  public Queue getParentQueue() {
+    return parentQueue;
+  }
+}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueMetrics.java
index e9a0aaf..106f565 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueMetrics.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueMetrics.java
@@ -229,7 +229,7 @@ public class CSQueueMetrics extends QueueMetrics {
   public synchronized static CSQueueMetrics forQueue(String queueName,
       Queue parent, boolean enableUserMetrics, Configuration conf) {
     MetricsSystem ms = DefaultMetricsSystem.instance();
-    QueueMetrics metrics = QueueMetrics.getQueueMetrics().get(queueName);
+    QueueMetrics metrics = getQueueMetrics().get(queueName);
     if (metrics == null) {
       metrics =
           new CSQueueMetrics(ms, queueName, parent, enableUserMetrics, conf)
@@ -241,7 +241,7 @@ public class CSQueueMetrics extends QueueMetrics {
             ms.register(sourceName(queueName).toString(), "Metrics for queue: "
                 + queueName, metrics);
       }
-      QueueMetrics.getQueueMetrics().put(queueName, metrics);
+      getQueueMetrics().put(queueName, metrics);
     }
 
     return (CSQueueMetrics) metrics;
@@ -254,7 +254,8 @@ public class CSQueueMetrics extends QueueMetrics {
     }
     CSQueueMetrics metrics = (CSQueueMetrics) users.get(userName);
     if (metrics == null) {
-      metrics = new CSQueueMetrics(metricsSystem, queueName, null, false, conf);
+      metrics =
+        new CSQueueMetrics(metricsSystem, queueName, null, false, conf);
       users.put(userName, metrics);
       metricsSystem.register(
           sourceName(queueName).append(",user=").append(userName).toString(),
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index 72dce8e..d9883c0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -1402,8 +1402,9 @@ public class LeafQueue extends AbstractCSQueue {
             : getQueueMaxResource(partition);
 
     Resource headroom = Resources.componentwiseMin(
-        Resources.subtract(userLimitResource, user.getUsed(partition)),
-        Resources.subtract(currentPartitionResourceLimit,
+        Resources.subtractNonNegative(userLimitResource,
+            user.getUsed(partition)),
+        Resources.subtractNonNegative(currentPartitionResourceLimit,
             queueUsage.getUsed(partition)));
     // Normalize it before return
     headroom =
@@ -1717,11 +1718,16 @@ public class LeafQueue extends AbstractCSQueue {
       User user = usersManager.updateUserResourceUsage(userName, resource,
           nodePartition, true);
 
-      // Note this is a bit unconventional since it gets the object and modifies
-      // it here, rather then using set routine
-      Resources.subtractFrom(application.getHeadroom(), resource); // headroom
-      metrics.setAvailableResourcesToUser(nodePartition,
-          userName, application.getHeadroom());
+      Resource partitionHeadroom = Resources.createResource(0, 0);
+      if (metrics.getUserMetrics(userName) != null) {
+        partitionHeadroom = getHeadroom(user,
+            cachedResourceLimitsForHeadroom.getLimit(), clusterResource,
+            getResourceLimitForActiveUsers(userName, clusterResource,
+                nodePartition, SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY),
+            nodePartition);
+      }
+      metrics.setAvailableResourcesToUser(nodePartition, userName,
+          partitionHeadroom);
 
       if (LOG.isDebugEnabled()) {
         LOG.debug(getQueueName() + " user=" + userName + " used="
@@ -1760,8 +1766,16 @@ public class LeafQueue extends AbstractCSQueue {
       User user = usersManager.updateUserResourceUsage(userName, resource,
           nodePartition, false);
 
-      metrics.setAvailableResourcesToUser(nodePartition,
-          userName, application.getHeadroom());
+      Resource partitionHeadroom = Resources.createResource(0, 0);
+      if (metrics.getUserMetrics(userName) != null) {
+        partitionHeadroom = getHeadroom(user,
+            cachedResourceLimitsForHeadroom.getLimit(), clusterResource,
+            getResourceLimitForActiveUsers(userName, clusterResource,
+                nodePartition, SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY),
+            nodePartition);
+      }
+      metrics.setAvailableResourcesToUser(nodePartition, userName,
+          partitionHeadroom);
 
       if (LOG.isDebugEnabled()) {
         LOG.debug(
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
index 5927644..589e9a1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
@@ -592,7 +592,7 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
                 allocation.getAllocationLocalityType(),
                 schedulerContainer.getSchedulerNode(),
                 schedulerContainer.getSchedulerRequestKey(),
-                schedulerContainer.getRmContainer().getContainer());
+                  schedulerContainer.getRmContainer());
             ((RMContainerImpl) rmContainer).setContainerRequest(
                 containerRequest);
 
@@ -606,7 +606,7 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
             AppSchedulingInfo.updateMetrics(getApplicationId(),
                 allocation.getAllocationLocalityType(),
                 schedulerContainer.getSchedulerNode(),
-                schedulerContainer.getRmContainer().getContainer(), getUser(),
+                schedulerContainer.getRmContainer(), getUser(),
                 getQueue());
           }
 
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
index 0eedec0..f0023f3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
@@ -463,7 +463,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
 
       // Update consumption and track allocations
       ContainerRequest containerRequest = appSchedulingInfo.allocate(
-          type, node, schedulerKey, container);
+            type, node, schedulerKey, rmContainer);
       this.attemptResourceUsage.incUsed(container.getResource());
       getQueue().incUsedResource(container.getResource());
 
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoAppAttempt.java
index 074d5da..67b8d6a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoAppAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoAppAttempt.java
@@ -82,7 +82,7 @@ public class FifoAppAttempt extends FiCaSchedulerApp {
 
       // Update consumption and track allocations
       ContainerRequest containerRequest = appSchedulingInfo.allocate(
-          type, node, schedulerKey, container);
+            type, node, schedulerKey, rmContainer);
 
       attemptResourceUsage.incUsed(node.getPartition(),
           container.getResource());
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestPartitionQueueMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestPartitionQueueMetrics.java
new file mode 100644
index 0000000..eb240d1
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestPartitionQueueMetrics.java
@@ -0,0 +1,752 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
+
+import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
+import static org.apache.hadoop.test.MetricsAsserts.assertGauge;
+import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.MetricsRecordBuilder;
+import org.apache.hadoop.metrics2.MetricsSource;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.impl.MetricsSystemImpl;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueueMetrics;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestPartitionQueueMetrics {
+
+  static final int GB = 1024; // MB
+  private static final Configuration CONF = new Configuration();
+
+  private MetricsSystem ms;
+
+  @Before
+  public void setUp() {
+    ms = new MetricsSystemImpl();
+    QueueMetrics.clearQueueMetrics();
+    PartitionQueueMetrics.clearQueueMetrics();
+  }
+
+  @After
+  public void tearDown() {
+    ms.shutdown();
+  }
+
+  /**
+   * Structure:
+   * Both queues, q1 & q2, have been configured to run in only 1 partition, x.
+   *
+   * root
+   * / \
+   * q1 q2
+   *
+   * @throws Exception
+   */
+
+  @Test
+  public void testSinglePartitionWithSingleLevelQueueMetrics()
+      throws Exception {
+
+    String parentQueueName = "root";
+    Queue parentQueue = mock(Queue.class);
+    String user = "alice";
+
+    QueueMetrics root = QueueMetrics.forQueue(ms, "root", null, true, CONF);
+    when(parentQueue.getMetrics()).thenReturn(root);
+    when(parentQueue.getQueueName()).thenReturn(parentQueueName);
+    QueueMetrics q1 =
+        QueueMetrics.forQueue(ms, "root.q1", parentQueue, true, CONF);
+    QueueMetrics q2 =
+        QueueMetrics.forQueue(ms, "root.q2", parentQueue, true, CONF);
+
+    q1.submitApp(user);
+    q1.submitAppAttempt(user);
+
+    root.setAvailableResourcesToQueue("x",
+        Resources.createResource(200 * GB, 200));
+    q1.setAvailableResourcesToQueue("x",
+        Resources.createResource(100 * GB, 100));
+
+    q1.incrPendingResources("x", user, 2, Resource.newInstance(1024, 1));
+
+    MetricsSource partitionSource = partitionSource(ms, "x");
+    MetricsSource rootQueueSource = queueSource(ms, "x", parentQueueName);
+    MetricsSource q1Source = queueSource(ms, "x", "root.q1");
+
+    checkResources(partitionSource, 0, 0, 0, 200 * GB, 200, 2 * GB, 2, 2);
+    checkResources(rootQueueSource, 0, 0, 0, 200 * GB, 200, 2 * GB, 2, 2);
+    checkResources(q1Source, 0, 0, 0, 100 * GB, 100, 2 * GB, 2, 2);
+
+    q2.incrPendingResources("x", user, 3, Resource.newInstance(1024, 1));
+    MetricsSource q2Source = queueSource(ms, "x", "root.q2");
+
+    checkResources(partitionSource, 0, 0, 0, 200 * GB, 200, 5 * GB, 5, 5);
+    checkResources(rootQueueSource, 0, 0, 0, 200 * GB, 200, 5 * GB, 5, 5);
+    checkResources(q2Source, 0, 0, 0, 0, 0, 3 * GB, 3, 3);
+  }
+
+  /**
+   * Structure:
+   * Both queues, q1 & q2 have been configured to run in both partitions, x & y.
+   *
+   * root
+   * / \
+   * q1 q2
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testTwoPartitionWithSingleLevelQueueMetrics() throws Exception {
+
+    String parentQueueName = "root";
+    String user = "alice";
+
+    QueueMetrics root =
+        QueueMetrics.forQueue(ms, parentQueueName, null, false, CONF);
+    Queue parentQueue = mock(Queue.class);
+    when(parentQueue.getMetrics()).thenReturn(root);
+    when(parentQueue.getQueueName()).thenReturn(parentQueueName);
+
+    QueueMetrics q1 =
+        QueueMetrics.forQueue(ms, "root.q1", parentQueue, false, CONF);
+    QueueMetrics q2 =
+        QueueMetrics.forQueue(ms, "root.q2", parentQueue, false, CONF);
+
+    AppSchedulingInfo app = mockApp(user);
+    q1.submitApp(user);
+    q1.submitAppAttempt(user);
+
+    root.setAvailableResourcesToQueue("x",
+        Resources.createResource(200 * GB, 200));
+    q1.setAvailableResourcesToQueue("x",
+        Resources.createResource(100 * GB, 100));
+
+    q1.incrPendingResources("x", user, 2, Resource.newInstance(1024, 1));
+
+    MetricsSource xPartitionSource = partitionSource(ms, "x");
+    MetricsSource xRootQueueSource = queueSource(ms, "x", parentQueueName);
+    MetricsSource q1Source = queueSource(ms, "x", "root.q1");
+
+    checkResources(xPartitionSource, 0, 0, 0, 200 * GB, 200, 2 * GB, 2, 2);
+    checkResources(xRootQueueSource, 0, 0, 0, 200 * GB, 200, 2 * GB, 2, 2);
+    checkResources(q1Source, 0, 0, 0, 100 * GB, 100, 2 * GB, 2, 2);
+
+    root.setAvailableResourcesToQueue("y",
+        Resources.createResource(400 * GB, 400));
+    q2.setAvailableResourcesToQueue("y",
+        Resources.createResource(200 * GB, 200));
+
+    q2.incrPendingResources("y", user, 3, Resource.newInstance(1024, 1));
+
+    MetricsSource yPartitionSource = partitionSource(ms, "y");
+    MetricsSource yRootQueueSource = queueSource(ms, "y", parentQueueName);
+    MetricsSource q2Source = queueSource(ms, "y", "root.q2");
+
+    checkResources(yPartitionSource, 0, 0, 0, 400 * GB, 400, 3 * GB, 3, 3);
+    checkResources(yRootQueueSource, 0, 0, 0, 400 * GB, 400, 3 * GB, 3, 3);
+    checkResources(q2Source, 0, 0, 0, 200 * GB, 200, 3 * GB, 3, 3);
+  }
+
+  /**
+   * Structure:
+   * Queue q1 has been configured to run in multiple partitions, x & y.
+   *
+   * root
+   * /
+   * q1
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testMultiplePartitionWithSingleQueueMetrics() throws Exception {
+
+    String parentQueueName = "root";
+    Queue parentQueue = mock(Queue.class);
+
+    QueueMetrics root =
+        QueueMetrics.forQueue(ms, parentQueueName, null, true, CONF);
+    when(parentQueue.getMetrics()).thenReturn(root);
+    when(parentQueue.getQueueName()).thenReturn(parentQueueName);
+
+    QueueMetrics q1 =
+        QueueMetrics.forQueue(ms, "root.q1", parentQueue, true, CONF);
+
+    root.setAvailableResourcesToQueue("x",
+        Resources.createResource(200 * GB, 200));
+    root.setAvailableResourcesToQueue("y",
+        Resources.createResource(300 * GB, 300));
+
+    q1.incrPendingResources("x", "test_user", 2, Resource.newInstance(1024, 1));
+
+    MetricsSource partitionSource = partitionSource(ms, "x");
+    MetricsSource rootQueueSource = queueSource(ms, "x", parentQueueName);
+    MetricsSource q1Source = queueSource(ms, "x", "root.q1");
+    MetricsSource userSource = userSource(ms, "x", "test_user", "root.q1");
+
+    checkResources(partitionSource, 0, 0, 0, 200 * GB, 200, 2 * GB, 2, 2);
+    checkResources(rootQueueSource, 0, 0, 0, 200 * GB, 200, 2 * GB, 2, 2);
+    checkResources(q1Source, 0, 0, 0, 0, 0, 2 * GB, 2, 2);
+    checkResources(userSource, 0, 0, 0, 0, 0, 2 * GB, 2, 2);
+
+    q1.incrPendingResources("x", "test_user", 3, Resource.newInstance(1024, 1));
+
+    checkResources(partitionSource, 0, 0, 0, 200 * GB, 200, 5 * GB, 5, 5);
+    checkResources(rootQueueSource, 0, 0, 0, 200 * GB, 200, 5 * GB, 5, 5);
+    checkResources(q1Source, 0, 0, 0, 0, 0, 5 * GB, 5, 5);
+    checkResources(userSource, 0, 0, 0, 0, 0, 5 * GB, 5, 5);
+
+    q1.incrPendingResources("x", "test_user1", 4,
+        Resource.newInstance(1024, 1));
+    MetricsSource userSource1 = userSource(ms, "x", "test_user1", "root.q1");
+
+    checkResources(partitionSource, 0, 0, 0, 200 * GB, 200, 9 * GB, 9, 9);
+    checkResources(rootQueueSource, 0, 0, 0, 200 * GB, 200, 9 * GB, 9, 9);
+    checkResources(q1Source, 0, 0, 0, 0, 0, 9 * GB, 9, 9);
+    checkResources(userSource1, 0, 0, 0, 0, 0, 4 * GB, 4, 4);
+
+    q1.incrPendingResources("y", "test_user1", 6,
+        Resource.newInstance(1024, 1));
+    MetricsSource partitionSourceY = partitionSource(ms, "y");
+    MetricsSource rootQueueSourceY = queueSource(ms, "y", parentQueueName);
+    MetricsSource q1SourceY = queueSource(ms, "y", "root.q1");
+    MetricsSource userSourceY = userSource(ms, "y", "test_user1", "root.q1");
+
+    checkResources(partitionSourceY, 0, 0, 0, 300 * GB, 300, 6 * GB, 6, 6);
+    checkResources(rootQueueSourceY, 0, 0, 0, 300 * GB, 300, 6 * GB, 6, 6);
+    checkResources(q1SourceY, 0, 0, 0, 0, 0, 6 * GB, 6, 6);
+    checkResources(userSourceY, 0, 0, 0, 0, 0, 6 * GB, 6, 6);
+  }
+
+  /**
+   * Structure:
+   * Both queues, q1 & q2 have been configured to run in both partitions, x & y.
+   *
+   * root
+   * / \
+   * q1 q2
+   * q1
+   * / \
+   * q11 q12
+   * q2
+   * / \
+   * q21 q22
+   *
+   * @throws Exception
+   */
+
+  @Test
+  public void testMultiplePartitionsWithMultiLevelQueuesMetrics()
+      throws Exception {
+
+    // Queue hierarchy registered below:
+    //   root -> q1 -> {q11, q12}
+    //   root -> q2 -> {q21, q22}
+    String parentQueueName = "root";
+    Queue parentQueue = mock(Queue.class);
+
+    QueueMetrics root =
+        QueueMetrics.forQueue(ms, parentQueueName, null, true, CONF);
+    when(parentQueue.getQueueName()).thenReturn(parentQueueName);
+    when(parentQueue.getMetrics()).thenReturn(root);
+
+    QueueMetrics q1 =
+        QueueMetrics.forQueue(ms, "root.q1", parentQueue, true, CONF);
+    Queue childQueue1 = mock(Queue.class);
+    when(childQueue1.getQueueName()).thenReturn("root.q1");
+    when(childQueue1.getMetrics()).thenReturn(q1);
+
+    QueueMetrics q11 =
+        QueueMetrics.forQueue(ms, "root.q1.q11", childQueue1, true, CONF);
+    QueueMetrics q12 =
+        QueueMetrics.forQueue(ms, "root.q1.q12", childQueue1, true, CONF);
+
+    QueueMetrics q2 =
+        QueueMetrics.forQueue(ms, "root.q2", parentQueue, true, CONF);
+    Queue childQueue2 = mock(Queue.class);
+    when(childQueue2.getQueueName()).thenReturn("root.q2");
+    when(childQueue2.getMetrics()).thenReturn(q2);
+
+    QueueMetrics q21 =
+        QueueMetrics.forQueue(ms, "root.q2.q21", childQueue2, true, CONF);
+    QueueMetrics q22 =
+        QueueMetrics.forQueue(ms, "root.q2.q22", childQueue2, true, CONF);
+
+    // Partition "x": available resources are set on root, q1 and q11 only.
+    root.setAvailableResourcesToQueue("x",
+        Resources.createResource(200 * GB, 200));
+
+    q1.setAvailableResourcesToQueue("x",
+        Resources.createResource(100 * GB, 100));
+    q11.setAvailableResourcesToQueue("x",
+        Resources.createResource(50 * GB, 50));
+
+    // test_user asks for 2 containers of 1024 MB / 1 vcore on q11.
+    q11.incrPendingResources("x", "test_user", 2,
+        Resource.newInstance(1024, 1));
+
+    MetricsSource partitionSource = partitionSource(ms, "x");
+    MetricsSource rootQueueSource = queueSource(ms, "x", parentQueueName);
+    MetricsSource q1Source = queueSource(ms, "x", "root.q1");
+    MetricsSource userSource = userSource(ms, "x", "test_user", "root.q1");
+
+    // The pending request made on q11 is expected on the partition, root,
+    // q1 and q1-level user sources.
+    // NOTE(review): the q1Source assertion appears twice below with the same
+    // expected values; the repetition looks redundant - confirm intent.
+    checkResources(partitionSource, 0, 0, 0, 200 * GB, 200, 2 * GB, 2, 2);
+    checkResources(q1Source, 0, 0, 0, 100 * GB, 100, 2 * GB, 2, 2);
+    checkResources(rootQueueSource, 0, 0, 0, 200 * GB, 200, 2 * GB, 2, 2);
+    checkResources(q1Source, 0, 0, 0, 100 * GB, 100, 2 * GB, 2, 2);
+    checkResources(userSource, 0, 0, 0, 0 * GB, 0, 2 * GB, 2, 2);
+
+    // 4 more pending containers for the same user on q11: totals become 6.
+    q11.incrPendingResources("x", "test_user", 4,
+        Resource.newInstance(1024, 1));
+
+    MetricsSource q11Source = queueSource(ms, "x", "root.q1.q11");
+
+    checkResources(partitionSource, 0, 0, 0, 200 * GB, 200, 6 * GB, 6, 6);
+    checkResources(rootQueueSource, 0, 0, 0, 200 * GB, 200, 6 * GB, 6, 6);
+    checkResources(q11Source, 0, 0, 0, 50 * GB, 50, 6 * GB, 6, 6);
+    checkResources(q1Source, 0, 0, 0, 100 * GB, 100, 6 * GB, 6, 6);
+    checkResources(userSource, 0, 0, 0, 0 * GB, 0, 6 * GB, 6, 6);
+
+    // A second user adds 5 pending containers on q11: queue totals become 11
+    // while each user source keeps only that user's share.
+    q11.incrPendingResources("x", "test_user1", 5,
+        Resource.newInstance(1024, 1));
+
+    MetricsSource q1UserSource1 = userSource(ms, "x", "test_user1", "root.q1");
+    MetricsSource userSource1 =
+        userSource(ms, "x", "test_user1", "root.q1.q11");
+
+    checkResources(partitionSource, 0, 0, 0, 200 * GB, 200, 11 * GB, 11, 11);
+    checkResources(rootQueueSource, 0, 0, 0, 200 * GB, 200, 11 * GB, 11, 11);
+    checkResources(q11Source, 0, 0, 0, 50 * GB, 50, 11 * GB, 11, 11);
+    checkResources(q1Source, 0, 0, 0, 100 * GB, 100, 11 * GB, 11, 11);
+    checkResources(userSource, 0, 0, 0, 0 * GB, 0, 6 * GB, 6, 6);
+    checkResources(q1UserSource1, 0, 0, 0, 0 * GB, 0, 5 * GB, 5, 5);
+    checkResources(userSource1, 0, 0, 0, 0 * GB, 0, 5 * GB, 5, 5);
+
+    // 5 pending containers on the sibling q12. No available resources were
+    // set for q12 in partition "x", so 0 is expected for them on q12Source.
+    q12.incrPendingResources("x", "test_user", 5,
+        Resource.newInstance(1024, 1));
+    MetricsSource q12Source = queueSource(ms, "x", "root.q1.q12");
+
+    checkResources(partitionSource, 0, 0, 0, 200 * GB, 200, 16 * GB, 16, 16);
+    checkResources(rootQueueSource, 0, 0, 0, 200 * GB, 200, 16 * GB, 16, 16);
+    checkResources(q1Source, 0, 0, 0, 100 * GB, 100, 16 * GB, 16, 16);
+    checkResources(q12Source, 0, 0, 0, 0, 0, 5 * GB, 5, 5);
+
+    // Partition "y": available resources on root, q1 and q12, then 3 pending
+    // containers on q12. The "y" sources are asserted independently of the
+    // "x" totals above.
+    root.setAvailableResourcesToQueue("y",
+        Resources.createResource(200 * GB, 200));
+    q1.setAvailableResourcesToQueue("y",
+        Resources.createResource(100 * GB, 100));
+    q12.setAvailableResourcesToQueue("y",
+        Resources.createResource(50 * GB, 50));
+
+    q12.incrPendingResources("y", "test_user", 3,
+        Resource.newInstance(1024, 1));
+
+    MetricsSource yPartitionSource = partitionSource(ms, "y");
+    MetricsSource yRootQueueSource = queueSource(ms, "y", parentQueueName);
+    MetricsSource q1YSource = queueSource(ms, "y", "root.q1");
+    MetricsSource q12YSource = queueSource(ms, "y", "root.q1.q12");
+
+    checkResources(yPartitionSource, 0, 0, 0, 200 * GB, 200, 3 * GB, 3, 3);
+    checkResources(yRootQueueSource, 0, 0, 0, 200 * GB, 200, 3 * GB, 3, 3);
+    checkResources(q1YSource, 0, 0, 0, 100 * GB, 100, 3 * GB, 3, 3);
+    checkResources(q12YSource, 0, 0, 0, 50 * GB, 50, 3 * GB, 3, 3);
+
+    // Same partition, other branch: 5 pending containers on q21 raise the
+    // partition and root totals to 8, while q2 and q21 report 5.
+    root.setAvailableResourcesToQueue("y",
+        Resources.createResource(200 * GB, 200));
+    q2.setAvailableResourcesToQueue("y",
+        Resources.createResource(100 * GB, 100));
+    q21.setAvailableResourcesToQueue("y",
+        Resources.createResource(50 * GB, 50));
+
+    q21.incrPendingResources("y", "test_user", 5,
+        Resource.newInstance(1024, 1));
+    MetricsSource q21Source = queueSource(ms, "y", "root.q2.q21");
+    MetricsSource q2YSource = queueSource(ms, "y", "root.q2");
+
+    checkResources(yPartitionSource, 0, 0, 0, 200 * GB, 200, 8 * GB, 8, 8);
+    checkResources(yRootQueueSource, 0, 0, 0, 200 * GB, 200, 8 * GB, 8, 8);
+    checkResources(q2YSource, 0, 0, 0, 100 * GB, 100, 5 * GB, 5, 5);
+    checkResources(q21Source, 0, 0, 0, 50 * GB, 50, 5 * GB, 5, 5);
+
+    // 6 pending containers on q22, which has no available resources set in
+    // partition "y": partition and root totals become 14.
+    q22.incrPendingResources("y", "test_user", 6,
+        Resource.newInstance(1024, 1));
+    MetricsSource q22Source = queueSource(ms, "y", "root.q2.q22");
+
+    checkResources(yPartitionSource, 0, 0, 0, 200 * GB, 200, 14 * GB, 14, 14);
+    checkResources(yRootQueueSource, 0, 0, 0, 200 * GB, 200, 14 * GB, 14, 14);
+    checkResources(q22Source, 0, 0, 0, 0, 0, 6 * GB, 6, 6);
+  }
+
+  /**
+   * Checks partition, queue and user metrics for a two-level hierarchy
+   * (root -> root.leaf) in partition "x" while pending, allocate, reserve,
+   * release and unreserve operations are issued on the leaf queue metrics.
+   */
+  @Test
+  public void testTwoLevelWithUserMetrics() {
+    String parentQueueName = "root";
+    String leafQueueName = "root.leaf";
+    String user = "alice";
+    String partition = "x";
+
+    QueueMetrics parentMetrics =
+        QueueMetrics.forQueue(ms, parentQueueName, null, true, CONF);
+    Queue parentQueue = mock(Queue.class);
+    when(parentQueue.getQueueName()).thenReturn(parentQueueName);
+    when(parentQueue.getMetrics()).thenReturn(parentMetrics);
+    QueueMetrics metrics =
+        QueueMetrics.forQueue(ms, leafQueueName, parentQueue, true, CONF);
+    AppSchedulingInfo app = mockApp(user);
+
+    metrics.submitApp(user);
+    metrics.submitAppAttempt(user);
+
+    // Available resources are set explicitly per queue and per user on both
+    // levels; only the pending request is issued on the leaf alone.
+    parentMetrics.setAvailableResourcesToQueue(partition,
+        Resources.createResource(100 * GB, 100));
+    metrics.setAvailableResourcesToQueue(partition,
+        Resources.createResource(100 * GB, 100));
+    parentMetrics.setAvailableResourcesToUser(partition, user,
+        Resources.createResource(10 * GB, 10));
+    metrics.setAvailableResourcesToUser(partition, user,
+        Resources.createResource(10 * GB, 10));
+    // 6 pending containers of 3 GB / 3 vcores each => 18 GB / 18 vcores.
+    metrics.incrPendingResources(partition, user, 6,
+        Resources.createResource(3 * GB, 3));
+
+    MetricsSource partitionSource = partitionSource(ms, partition);
+    MetricsSource parentQueueSource =
+        queueSource(ms, partition, parentQueueName);
+    MetricsSource queueSource = queueSource(ms, partition, leafQueueName);
+    MetricsSource userSource = userSource(ms, partition, user, leafQueueName);
+    MetricsSource userSource1 =
+        userSource(ms, partition, user, parentQueueName);
+
+    checkResources(queueSource, 0, 0, 0, 0, 0, 100 * GB, 100, 18 * GB, 18, 6,
+        0, 0, 0);
+    checkResources(parentQueueSource, 0, 0, 0, 0, 0, 100 * GB, 100,
+        18 * GB, 18, 6, 0, 0, 0);
+    checkResources(userSource, 0, 0, 0, 0, 0, 10 * GB, 10, 18 * GB, 18, 6, 0,
+        0, 0);
+    checkResources(userSource1, 0, 0, 0, 0, 0, 10 * GB, 10, 18 * GB, 18, 6, 0,
+        0, 0);
+    checkResources(partitionSource, 0, 0, 0, 0, 0, 100 * GB, 100, 18 * GB, 18,
+        6, 0, 0, 0);
+
+    metrics.runAppAttempt(app.getApplicationId(), user);
+
+    // Allocate 3 containers of 1 GB / 1 vcore and reserve 3 GB / 3 vcores.
+    metrics.allocateResources(partition, user, 3,
+        Resources.createResource(1 * GB, 1), true);
+    metrics.reserveResource(partition, user,
+        Resources.createResource(3 * GB, 3));
+
+    // Available resources is set externally, as it depends on dynamic
+    // configurable cluster/queue resources
+    checkResources(queueSource, 3 * GB, 3, 3, 3, 0, 100 * GB, 100, 15 * GB, 15,
+        3, 3 * GB, 3, 1);
+    checkResources(parentQueueSource, 3 * GB, 3, 3, 3, 0, 100 * GB, 100,
+        15 * GB, 15, 3, 3 * GB, 3, 1);
+    checkResources(partitionSource, 3 * GB, 3, 3, 3, 0, 100 * GB, 100, 15 * GB,
+        15, 3, 3 * GB, 3, 1);
+    checkResources(userSource, 3 * GB, 3, 3, 3, 0, 10 * GB, 10, 15 * GB, 15, 3,
+        3 * GB, 3, 1);
+    checkResources(userSource1, 3 * GB, 3, 3, 3, 0, 10 * GB, 10, 15 * GB, 15,
+        3, 3 * GB, 3, 1);
+
+    // A second allocation of 3 containers leaves no pending containers.
+    metrics.allocateResources(partition, user, 3,
+        Resources.createResource(1 * GB, 1), true);
+
+    checkResources(queueSource, 6 * GB, 6, 6, 6, 0, 100 * GB, 100, 12 * GB, 12,
+        0, 3 * GB, 3, 1);
+    checkResources(parentQueueSource, 6 * GB, 6, 6, 6, 0, 100 * GB, 100,
+        12 * GB, 12, 0, 3 * GB, 3, 1);
+
+    // Release one container worth 2 GB / 2 vcores and drop the reservation.
+    metrics.releaseResources(partition, user, 1,
+        Resources.createResource(2 * GB, 2));
+    metrics.unreserveResource(partition, user,
+        Resources.createResource(3 * GB, 3));
+    checkResources(queueSource, 4 * GB, 4, 5, 6, 1, 100 * GB, 100, 12 * GB, 12,
+        0, 0, 0, 0);
+    checkResources(parentQueueSource, 4 * GB, 4, 5, 6, 1, 100 * GB, 100,
+        12 * GB, 12, 0, 0, 0, 0);
+    checkResources(partitionSource, 4 * GB, 4, 5, 6, 1, 100 * GB, 100, 12 * GB,
+        12, 0, 0, 0, 0);
+    checkResources(userSource, 4 * GB, 4, 5, 6, 1, 10 * GB, 10, 12 * GB, 12, 0,
+        0, 0, 0);
+    checkResources(userSource1, 4 * GB, 4, 5, 6, 1, 10 * GB, 10, 12 * GB, 12,
+        0, 0, 0, 0);
+
+    metrics.finishAppAttempt(app.getApplicationId(), app.isPending(),
+        app.getUser());
+
+    metrics.finishApp(user, RMAppState.FINISHED);
+  }
+
+  /**
+   * Checks partition, queue and user metrics for a three-level hierarchy
+   * (root -> root.leaf -> root.leaf.leaf1) in partitions "x" and "y" after
+   * pending resources are added on the lowest queue only.
+   */
+  @Test
+  public void testThreeLevelWithUserMetrics() {
+    String parentQueueName = "root";
+    String leafQueueName = "root.leaf";
+    String leafQueueName1 = "root.leaf.leaf1";
+    String user = "alice";
+    String partitionX = "x";
+    String partitionY = "y";
+
+    QueueMetrics parentMetrics =
+        QueueMetrics.forQueue(parentQueueName, null, true, CONF);
+    Queue parentQueue = mock(Queue.class);
+    when(parentQueue.getQueueName()).thenReturn(parentQueueName);
+    when(parentQueue.getMetrics()).thenReturn(parentMetrics);
+    QueueMetrics metrics =
+        QueueMetrics.forQueue(leafQueueName, parentQueue, true, CONF);
+    Queue leafQueue = mock(Queue.class);
+    when(leafQueue.getQueueName()).thenReturn(leafQueueName);
+    when(leafQueue.getMetrics()).thenReturn(metrics);
+    QueueMetrics metrics1 =
+        QueueMetrics.forQueue(leafQueueName1, leafQueue, true, CONF);
+    AppSchedulingInfo app = mockApp(user);
+
+    metrics1.submitApp(user);
+    metrics1.submitAppAttempt(user);
+
+    // Available resources are set explicitly for every queue level and for
+    // the user at every level, separately for each partition.
+    parentMetrics.setAvailableResourcesToQueue(partitionX,
+        Resources.createResource(200 * GB, 200));
+    parentMetrics.setAvailableResourcesToQueue(partitionY,
+        Resources.createResource(500 * GB, 500));
+    metrics.setAvailableResourcesToQueue(partitionX,
+        Resources.createResource(100 * GB, 100));
+    metrics.setAvailableResourcesToQueue(partitionY,
+        Resources.createResource(400 * GB, 400));
+    metrics1.setAvailableResourcesToQueue(partitionX,
+        Resources.createResource(50 * GB, 50));
+    metrics1.setAvailableResourcesToQueue(partitionY,
+        Resources.createResource(300 * GB, 300));
+    parentMetrics.setAvailableResourcesToUser(partitionX, user,
+        Resources.createResource(20 * GB, 20));
+    parentMetrics.setAvailableResourcesToUser(partitionY, user,
+        Resources.createResource(50 * GB, 50));
+    metrics.setAvailableResourcesToUser(partitionX, user,
+        Resources.createResource(10 * GB, 10));
+    metrics.setAvailableResourcesToUser(partitionY, user,
+        Resources.createResource(40 * GB, 40));
+    metrics1.setAvailableResourcesToUser(partitionX, user,
+        Resources.createResource(5 * GB, 5));
+    metrics1.setAvailableResourcesToUser(partitionY, user,
+        Resources.createResource(30 * GB, 30));
+    // Pending only on the lowest queue: 6 x 3 GB in "x", 6 x 4 GB in "y".
+    metrics1.incrPendingResources(partitionX, user, 6,
+        Resources.createResource(3 * GB, 3));
+    metrics1.incrPendingResources(partitionY, user, 6,
+        Resources.createResource(4 * GB, 4));
+
+    MetricsSource partitionSourceX =
+        partitionSource(metrics1.getMetricsSystem(), partitionX);
+
+    MetricsSource parentQueueSourceWithPartX =
+        queueSource(metrics1.getMetricsSystem(), partitionX, parentQueueName);
+    MetricsSource queueSourceWithPartX =
+        queueSource(metrics1.getMetricsSystem(), partitionX, leafQueueName);
+    MetricsSource queueSource1WithPartX =
+        queueSource(metrics1.getMetricsSystem(), partitionX, leafQueueName1);
+    MetricsSource parentUserSourceWithPartX =
+        userSource(metrics1.getMetricsSystem(), partitionX, user,
+            parentQueueName);
+    MetricsSource userSourceWithPartX = userSource(metrics1.getMetricsSystem(),
+        partitionX, user, leafQueueName);
+    MetricsSource userSource1WithPartX =
+        userSource(metrics1.getMetricsSystem(), partitionX, user,
+            leafQueueName1);
+
+    // Partition "x": 18 GB / 18 vcores / 6 containers pending are expected on
+    // every level; available values are the ones set for each source.
+    checkResources(partitionSourceX, 0, 0, 0, 0, 0, 200 * GB, 200, 18 * GB, 18,
+        6, 0, 0, 0);
+    checkResources(parentQueueSourceWithPartX, 0, 0, 0, 0, 0, 200 * GB, 200,
+        18 * GB, 18, 6, 0, 0, 0);
+
+    checkResources(queueSourceWithPartX, 0, 0, 0, 0, 0, 100 * GB, 100,
+        18 * GB, 18, 6, 0, 0, 0);
+    checkResources(queueSource1WithPartX, 0, 0, 0, 0, 0, 50 * GB, 50, 18 * GB,
+        18, 6, 0, 0, 0);
+    checkResources(parentUserSourceWithPartX, 0, 0, 0, 0, 0, 20 * GB, 20,
+        18 * GB, 18, 6, 0, 0, 0);
+    checkResources(userSourceWithPartX, 0, 0, 0, 0, 0, 10 * GB, 10, 18 * GB,
+        18, 6, 0, 0, 0);
+    checkResources(userSource1WithPartX, 0, 0, 0, 0, 0, 5 * GB, 5, 18 * GB, 18,
+        6, 0, 0, 0);
+
+    MetricsSource partitionSourceY =
+        partitionSource(metrics1.getMetricsSystem(), partitionY);
+
+    MetricsSource parentQueueSourceWithPartY =
+        queueSource(metrics1.getMetricsSystem(), partitionY, parentQueueName);
+    MetricsSource queueSourceWithPartY =
+        queueSource(metrics1.getMetricsSystem(), partitionY, leafQueueName);
+    MetricsSource queueSource1WithPartY =
+        queueSource(metrics1.getMetricsSystem(), partitionY, leafQueueName1);
+    MetricsSource parentUserSourceWithPartY =
+        userSource(metrics1.getMetricsSystem(), partitionY, user,
+            parentQueueName);
+    MetricsSource userSourceWithPartY = userSource(metrics1.getMetricsSystem(),
+        partitionY, user, leafQueueName);
+    MetricsSource userSource1WithPartY =
+        userSource(metrics1.getMetricsSystem(), partitionY, user,
+            leafQueueName1);
+
+    // Partition "y": 24 GB / 24 vcores / 6 containers pending on every level.
+    checkResources(partitionSourceY, 0, 0, 0, 0, 0, 500 * GB, 500, 24 * GB, 24,
+        6, 0, 0, 0);
+    checkResources(parentQueueSourceWithPartY, 0, 0, 0, 0, 0, 500 * GB, 500,
+        24 * GB, 24, 6, 0, 0, 0);
+    checkResources(queueSourceWithPartY, 0, 0, 0, 0, 0, 400 * GB, 400,
+        24 * GB, 24, 6, 0, 0, 0);
+    checkResources(queueSource1WithPartY, 0, 0, 0, 0, 0, 300 * GB, 300,
+        24 * GB, 24, 6, 0, 0, 0);
+    checkResources(parentUserSourceWithPartY, 0, 0, 0, 0, 0, 50 * GB, 50,
+        24 * GB, 24, 6, 0, 0, 0);
+    checkResources(userSourceWithPartY, 0, 0, 0, 0, 0, 40 * GB, 40, 24 * GB,
+        24, 6, 0, 0, 0);
+    checkResources(userSource1WithPartY, 0, 0, 0, 0, 0, 30 * GB, 30, 24 * GB,
+        24, 6, 0, 0, 0);
+
+    metrics1.finishAppAttempt(app.getApplicationId(), app.isPending(),
+        app.getUser());
+
+    metrics1.finishApp(user, RMAppState.FINISHED);
+  }
+
+  /**
+   * Structure:
+   * Both queues, q1 and q2, have been configured to run in only 1 partition,
+   * x. User metrics have been disabled, hence trying to access the user
+   * source throws an NPE from sources, which is the outcome this test
+   * expects.
+   *
+   * root
+   * / \
+   * q1 q2
+   *
+   * @throws Exception declared by the test signature
+   */
+  @Test(expected = NullPointerException.class)
+  public void
+      testSinglePartitionWithSingleLevelQueueMetricsWithoutUserMetrics()
+      throws Exception {
+
+    String parentQueueName = "root";
+    Queue parentQueue = mock(Queue.class);
+    String user = "alice";
+
+    // All three metrics objects are created with the user-metrics flag off.
+    QueueMetrics root = QueueMetrics.forQueue("root", null, false, CONF);
+    when(parentQueue.getMetrics()).thenReturn(root);
+    when(parentQueue.getQueueName()).thenReturn(parentQueueName);
+    CSQueueMetrics q1 =
+        CSQueueMetrics.forQueue("root.q1", parentQueue, false, CONF);
+    CSQueueMetrics q2 =
+        CSQueueMetrics.forQueue("root.q2", parentQueue, false, CONF);
+
+    AppSchedulingInfo app = mockApp(user);
+
+    q1.submitApp(user);
+    q1.submitAppAttempt(user);
+
+    root.setAvailableResourcesToQueue("x",
+        Resources.createResource(200 * GB, 200));
+
+    q1.incrPendingResources("x", user, 2, Resource.newInstance(1024, 1));
+
+    MetricsSource partitionSource =
+        partitionSource(q1.getMetricsSystem(), "x");
+    MetricsSource rootQueueSource =
+        queueSource(q1.getMetricsSystem(), "x", parentQueueName);
+    MetricsSource q1Source =
+        queueSource(q1.getMetricsSystem(), "x", "root.q1");
+    MetricsSource q1UserSource =
+        userSource(q1.getMetricsSystem(), "x", user, "root.q1");
+
+    checkResources(partitionSource, 0, 0, 0, 200 * GB, 200, 2 * GB, 2, 2);
+    checkResources(rootQueueSource, 0, 0, 0, 200 * GB, 200, 2 * GB, 2, 2);
+    checkResources(q1Source, 0, 0, 0, 0, 0, 2 * GB, 2, 2);
+    // NOTE(review): presumably this is where the expected NPE is raised, as
+    // no user source should exist with user metrics disabled - confirm.
+    checkResources(q1UserSource, 0, 0, 0, 0, 0, 2 * GB, 2, 2);
+
+    q2.incrPendingResources("x", user, 3, Resource.newInstance(1024, 1));
+    MetricsSource q2Source =
+        queueSource(q2.getMetricsSystem(), "x", "root.q2");
+    MetricsSource q2UserSource =
+        userSource(q1.getMetricsSystem(), "x", user, "root.q2");
+
+    checkResources(partitionSource, 0, 0, 0, 0, 0, 5 * GB, 5, 5);
+    checkResources(rootQueueSource, 0, 0, 0, 0, 0, 5 * GB, 5, 5);
+    checkResources(q2Source, 0, 0, 0, 0, 0, 3 * GB, 3, 3);
+    checkResources(q2UserSource, 0, 0, 0, 0, 0, 3 * GB, 3, 3);
+
+    q1.finishAppAttempt(app.getApplicationId(), app.isPending(),
+        app.getUser());
+    q1.finishApp(user, RMAppState.FINISHED);
+  }
+
+  /**
+   * Looks up the metrics source registered for a partition.
+   *
+   * @param ms metrics system to query
+   * @param partition partition (node label) name
+   * @return whatever {@code ms.getSource} returns for the partition's
+   *         source name
+   */
+  public static MetricsSource partitionSource(MetricsSystem ms,
+      String partition) {
+    String sourceName = QueueMetrics.pSourceName(partition).toString();
+    return ms.getSource(sourceName);
+  }
+
+  /**
+   * Looks up the metrics source registered for a queue within a partition.
+   * The source name is the partition source name followed by the queue
+   * source name.
+   *
+   * @param ms metrics system to query
+   * @param partition partition (node label) name
+   * @param queue full queue name, e.g. "root.q1"
+   * @return whatever {@code ms.getSource} returns for the combined name
+   */
+  public static MetricsSource queueSource(MetricsSystem ms, String partition,
+      String queue) {
+    String sourceName = QueueMetrics.pSourceName(partition)
+        .append(QueueMetrics.qSourceName(queue))
+        .toString();
+    return ms.getSource(sourceName);
+  }
+
+  /**
+   * Looks up the metrics source registered for a user of a queue within a
+   * partition. The source name is the partition source name, the queue
+   * source name and a {@code ",user=<user>"} suffix, in that order.
+   *
+   * @param ms metrics system to query
+   * @param partition partition (node label) name
+   * @param user user name
+   * @param queue full queue name, e.g. "root.q1"
+   * @return whatever {@code ms.getSource} returns for the combined name
+   */
+  public static MetricsSource userSource(MetricsSystem ms, String partition,
+      String user, String queue) {
+    String sourceName = QueueMetrics.pSourceName(partition)
+        .append(QueueMetrics.qSourceName(queue))
+        .append(",user=")
+        .append(user)
+        .toString();
+    return ms.getSource(sourceName);
+  }
+
+  /**
+   * Asserts the allocated, available and pending gauges of a metrics source.
+   *
+   * @param source metrics source to read
+   * @param allocatedMB expected "AllocatedMB"
+   * @param allocatedCores expected "AllocatedVCores"
+   * @param allocCtnrs expected "AllocatedContainers"
+   * @param availableMB expected "AvailableMB"
+   * @param availableCores expected "AvailableVCores"
+   * @param pendingMB expected "PendingMB"
+   * @param pendingCores expected "PendingVCores"
+   * @param pendingCtnrs expected "PendingContainers"
+   */
+  public static void checkResources(MetricsSource source, long allocatedMB,
+      int allocatedCores, int allocCtnrs, long availableMB, int availableCores,
+      long pendingMB, int pendingCores, int pendingCtnrs) {
+    MetricsRecordBuilder snapshot = getMetrics(source);
+
+    // Allocated
+    assertGauge("AllocatedMB", allocatedMB, snapshot);
+    assertGauge("AllocatedVCores", allocatedCores, snapshot);
+    assertGauge("AllocatedContainers", allocCtnrs, snapshot);
+
+    // Available
+    assertGauge("AvailableMB", availableMB, snapshot);
+    assertGauge("AvailableVCores", availableCores, snapshot);
+
+    // Pending
+    assertGauge("PendingMB", pendingMB, snapshot);
+    assertGauge("PendingVCores", pendingCores, snapshot);
+    assertGauge("PendingContainers", pendingCtnrs, snapshot);
+  }
+
+  /**
+   * Builds a mocked {@link AppSchedulingInfo} owned by the given user.
+   *
+   * @param user user name returned by the mock's {@code getUser()}
+   * @return a mock whose user, application id and application attempt id
+   *         are stubbed
+   */
+  private static AppSchedulingInfo mockApp(String user) {
+    AppSchedulingInfo app = mock(AppSchedulingInfo.class);
+    when(app.getUser()).thenReturn(user);
+    ApplicationId appId = BuilderUtils.newApplicationId(1, 1);
+    ApplicationAttemptId id = BuilderUtils.newApplicationAttemptId(appId, 1);
+    // The tests in this class pass app.getApplicationId() to the metrics
+    // calls; stub it so they receive a real id instead of the mock's
+    // default null.
+    when(app.getApplicationId()).thenReturn(appId);
+    when(app.getApplicationAttemptId()).thenReturn(id);
+    return app;
+  }
+
+  /**
+   * Asserts the allocated, aggregate, available, pending and reserved
+   * metrics of a metrics source.
+   *
+   * @param source metrics source to read
+   * @param allocatedMB expected "AllocatedMB" gauge
+   * @param allocatedCores expected "AllocatedVCores" gauge
+   * @param allocCtnrs expected "AllocatedContainers" gauge
+   * @param aggreAllocCtnrs expected "AggregateContainersAllocated" counter
+   * @param aggreReleasedCtnrs expected "AggregateContainersReleased" counter
+   * @param availableMB expected "AvailableMB" gauge
+   * @param availableCores expected "AvailableVCores" gauge
+   * @param pendingMB expected "PendingMB" gauge
+   * @param pendingCores expected "PendingVCores" gauge
+   * @param pendingCtnrs expected "PendingContainers" gauge
+   * @param reservedMB expected "ReservedMB" gauge
+   * @param reservedCores expected "ReservedVCores" gauge
+   * @param reservedCtnrs expected "ReservedContainers" gauge
+   */
+  public static void checkResources(MetricsSource source, long allocatedMB,
+      int allocatedCores, int allocCtnrs, long aggreAllocCtnrs,
+      long aggreReleasedCtnrs, long availableMB, int availableCores,
+      long pendingMB, int pendingCores, int pendingCtnrs, long reservedMB,
+      int reservedCores, int reservedCtnrs) {
+    MetricsRecordBuilder snapshot = getMetrics(source);
+
+    // Allocated
+    assertGauge("AllocatedMB", allocatedMB, snapshot);
+    assertGauge("AllocatedVCores", allocatedCores, snapshot);
+    assertGauge("AllocatedContainers", allocCtnrs, snapshot);
+
+    // Aggregate container counters
+    assertCounter("AggregateContainersAllocated", aggreAllocCtnrs, snapshot);
+    assertCounter("AggregateContainersReleased", aggreReleasedCtnrs, snapshot);
+
+    // Available
+    assertGauge("AvailableMB", availableMB, snapshot);
+    assertGauge("AvailableVCores", availableCores, snapshot);
+
+    // Pending
+    assertGauge("PendingMB", pendingMB, snapshot);
+    assertGauge("PendingVCores", pendingCores, snapshot);
+    assertGauge("PendingContainers", pendingCtnrs, snapshot);
+
+    // Reserved
+    assertGauge("ReservedMB", reservedMB, snapshot);
+    assertGauge("ReservedVCores", reservedCores, snapshot);
+    assertGauge("ReservedContainers", reservedCtnrs, snapshot);
+  }
+}
\ No newline at end of file
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerApplicationAttempt.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerApplicationAttempt.java
index 0d6ff16..fd3e4ae 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerApplicationAttempt.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerApplicationAttempt.java
@@ -93,7 +93,7 @@ public class TestSchedulerApplicationAttempt {
     app.liveContainers.put(container1.getContainerId(), container1);
     SchedulerNode node = createNode();
     app.appSchedulingInfo.allocate(NodeType.OFF_SWITCH, node,
-        toSchedulerKey(requestedPriority), container1.getContainer());
+        toSchedulerKey(requestedPriority), container1);
 
     // Active user count has to decrease from queue2 due to app has NO pending 
requests
     assertEquals(0, queue2.getAbstractUsersManager().getNumActiveUsers());
@@ -135,7 +135,7 @@ public class TestSchedulerApplicationAttempt {
     app.liveContainers.put(container1.getContainerId(), container1);
     SchedulerNode node = createNode();
     app.appSchedulingInfo.allocate(NodeType.OFF_SWITCH, node,
-        toSchedulerKey(requestedPriority), container1.getContainer());
+        toSchedulerKey(requestedPriority), container1);
     
     // Reserved container
     Priority prio1 = Priority.newInstance(1);
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoCreatedQueueBase.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoCreatedQueueBase.java
index 6e22608..9120e87 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoCreatedQueueBase.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoCreatedQueueBase.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels
     .NullRMNodeLabelsManager;
 import 
org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
@@ -494,6 +495,11 @@ public class TestCapacitySchedulerAutoCreatedQueueBase {
   }
 
   protected MockRM setupSchedulerInstance() throws Exception {
+
+    if (mockRM != null) {
+      mockRM.stop();
+    }
+
     CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
     setupQueueConfiguration(conf);
     conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
@@ -578,16 +584,26 @@ public class TestCapacitySchedulerAutoCreatedQueueBase {
         autoCreatedLeafQueue.getMaxApplicationsPerUser());
   }
 
-  protected void validateInitialQueueEntitlement(CSQueue parentQueue, String
-      leafQueueName, Map<String, Float>
-      expectedTotalChildQueueAbsCapacityByLabel,
+  protected void validateInitialQueueEntitlement(CSQueue parentQueue,
+      String leafQueueName,
+      Map<String, Float> expectedTotalChildQueueAbsCapacityByLabel,
       Set<String> nodeLabels)
       throws SchedulerDynamicEditException, InterruptedException {
-    validateInitialQueueEntitlement(cs, parentQueue, leafQueueName,
+    validateInitialQueueEntitlement(mockRM, cs, parentQueue, leafQueueName,
         expectedTotalChildQueueAbsCapacityByLabel, nodeLabels);
   }
 
-  protected void validateInitialQueueEntitlement(
+  /**
+   * Validates the initial entitlement of a leaf queue using the scheduler
+   * obtained from the given resource manager, delegating to the overload
+   * that takes the scheduler explicitly.
+   */
+  protected void validateInitialQueueEntitlement(ResourceManager rm,
+      CSQueue parentQueue, String leafQueueName,
+      Map<String, Float> expectedTotalChildQueueAbsCapacityByLabel,
+      Set<String> nodeLabels)
+      throws SchedulerDynamicEditException, InterruptedException {
+    CapacityScheduler scheduler = (CapacityScheduler) rm.getResourceScheduler();
+    validateInitialQueueEntitlement(rm, scheduler, parentQueue, leafQueueName,
+        expectedTotalChildQueueAbsCapacityByLabel, nodeLabels);
+  }
+
+  protected void validateInitialQueueEntitlement(ResourceManager rm,
       CapacityScheduler capacityScheduler, CSQueue parentQueue,
       String leafQueueName,
       Map<String, Float> expectedTotalChildQueueAbsCapacityByLabel,
@@ -600,7 +616,8 @@ public class TestCapacitySchedulerAutoCreatedQueueBase {
         (GuaranteedOrZeroCapacityOverTimePolicy) autoCreateEnabledParentQueue
             .getAutoCreatedQueueManagementPolicy();
 
-    AutoCreatedLeafQueue leafQueue = (AutoCreatedLeafQueue) 
capacityScheduler.getQueue(leafQueueName);
+    AutoCreatedLeafQueue leafQueue =
+        (AutoCreatedLeafQueue) capacityScheduler.getQueue(leafQueueName);
 
     Map<String, QueueEntitlement> expectedEntitlements = new HashMap<>();
     QueueCapacities cap = autoCreateEnabledParentQueue.getLeafQueueTemplate()
@@ -618,7 +635,8 @@ public class TestCapacitySchedulerAutoCreatedQueueBase {
 
       expectedEntitlements.put(label, expectedEntitlement);
 
-      validateEffectiveMinResource(leafQueue, label, expectedEntitlements);
+      validateEffectiveMinResource(rm, capacityScheduler, leafQueue, label,
+          expectedEntitlements);
     }
   }
 
@@ -635,24 +653,24 @@ public class TestCapacitySchedulerAutoCreatedQueueBase {
             .getMaximumCapacity(label), EPSILON);
   }
 
-  protected void validateEffectiveMinResource(CSQueue leafQueue,
-      String label, Map<String, QueueEntitlement> expectedQueueEntitlements) {
+  protected void validateEffectiveMinResource(ResourceManager rm,
+      CapacityScheduler cs, CSQueue leafQueue, String label,
+      Map<String, QueueEntitlement> expectedQueueEntitlements) {
     ManagedParentQueue parentQueue = (ManagedParentQueue) 
leafQueue.getParent();
 
-    Resource resourceByLabel = mockRM.getRMContext().getNodeLabelManager().
-        getResourceByLabel(label, cs.getClusterResource());
+    Resource resourceByLabel = rm.getRMContext().getNodeLabelManager()
+        .getResourceByLabel(label, cs.getClusterResource());
     Resource effMinCapacity = Resources.multiply(resourceByLabel,
-        expectedQueueEntitlements.get(label).getCapacity() * parentQueue
-            .getQueueCapacities().getAbsoluteCapacity(label));
+        expectedQueueEntitlements.get(label).getCapacity()
+            * parentQueue.getQueueCapacities().getAbsoluteCapacity(label));
     assertEquals(effMinCapacity, Resources.multiply(resourceByLabel,
         leafQueue.getQueueCapacities().getAbsoluteCapacity(label)));
     assertEquals(effMinCapacity, leafQueue.getEffectiveCapacity(label));
 
     if (leafQueue.getQueueCapacities().getAbsoluteCapacity(label) > 0) {
-      assertTrue(Resources
-          .greaterThan(cs.getResourceCalculator(), cs.getClusterResource(),
-              effMinCapacity, Resources.none()));
-    } else{
+      assertTrue(Resources.greaterThan(cs.getResourceCalculator(),
+          cs.getClusterResource(), effMinCapacity, Resources.none()));
+    } else {
       assertTrue(Resources.equals(effMinCapacity, Resources.none()));
     }
   }
@@ -763,7 +781,7 @@ public class TestCapacitySchedulerAutoCreatedQueueBase {
               updatedQueueTemplate.getQueueCapacities().getMaximumCapacity
                   (label));
           assertEquals(expectedQueueEntitlements.get(label), newEntitlement);
-          validateEffectiveMinResource(leafQueue, label,
+          validateEffectiveMinResource(mockRM, cs, leafQueue, label,
               expectedQueueEntitlements);
         }
         found = true;
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoQueueCreation.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoQueueCreation.java
index 0a530dd..2ef67b5 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoQueueCreation.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoQueueCreation.java
@@ -622,7 +622,7 @@ public class TestCapacitySchedulerAutoQueueCreation
       submitApp(newMockRM, parentQueue, USER1, USER1, 1, 1);
       Map<String, Float> expectedAbsChildQueueCapacity =
           populateExpectedAbsCapacityByLabelForParentQueue(1);
-      validateInitialQueueEntitlement(newCS, parentQueue, USER1,
+      validateInitialQueueEntitlement(newMockRM, newCS, parentQueue, USER1,
           expectedAbsChildQueueCapacity, accessibleNodeLabelsOnC);
 
       //submit another app2 as USER2
@@ -630,7 +630,7 @@ public class TestCapacitySchedulerAutoQueueCreation
           1);
       expectedAbsChildQueueCapacity =
           populateExpectedAbsCapacityByLabelForParentQueue(2);
-      validateInitialQueueEntitlement(newCS, parentQueue, USER2,
+      validateInitialQueueEntitlement(newMockRM, newCS, parentQueue, USER2,
           expectedAbsChildQueueCapacity, accessibleNodeLabelsOnC);
 
       //validate total activated abs capacity remains the same
@@ -725,7 +725,7 @@ public class TestCapacitySchedulerAutoQueueCreation
 
       Map<String, Float> expectedChildQueueAbsCapacity =
       populateExpectedAbsCapacityByLabelForParentQueue(1);
-      validateInitialQueueEntitlement(newCS, parentQueue, USER1,
+      validateInitialQueueEntitlement(newMockRM, newCS, parentQueue, USER1,
           expectedChildQueueAbsCapacity, accessibleNodeLabelsOnC);
 
       //submit another app2 as USER2
@@ -734,7 +734,7 @@ public class TestCapacitySchedulerAutoQueueCreation
           1);
       expectedChildQueueAbsCapacity =
           populateExpectedAbsCapacityByLabelForParentQueue(2);
-      validateInitialQueueEntitlement(newCS, parentQueue, USER2,
+      validateInitialQueueEntitlement(newMockRM, newCS, parentQueue, USER2,
           expectedChildQueueAbsCapacity, accessibleNodeLabelsOnC);
 
       //update parent queue capacity
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
index c0537ff..934fbc1 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
@@ -1234,9 +1234,9 @@ public class TestLeafQueue {
     qb.finishApplication(app_0.getApplicationId(), user_0);
     qb.finishApplication(app_2.getApplicationId(), user_1);
     qb.releaseResource(clusterResource, app_0, Resource.newInstance(4*GB, 1),
-        null, null);
+        "", null);
     qb.releaseResource(clusterResource, app_2, Resource.newInstance(4*GB, 1),
-        null, null);
+        "", null);
 
     qb.setUserLimit(50);
     qb.setUserLimitFactor(1);
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java
index 737db5b..144f11f 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java
@@ -25,9 +25,11 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.MetricsSystem;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
@@ -47,11 +49,14 @@ import 
org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import 
org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
 
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestPartitionQueueMetrics;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestQueueMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
@@ -2032,10 +2037,9 @@ public class TestNodeLabelContainerAllocation {
     assertEquals(0 * GB, leafQueueB.getMetrics().getAvailableMB());
     assertEquals(0 * GB, leafQueueB.getMetrics().getAllocatedMB());
 
-    // The total memory tracked by QueueMetrics is 0GB for the default partition
     CSQueue rootQueue = cs.getRootQueue();
-    assertEquals(0*GB, rootQueue.getMetrics().getAvailableMB() +
-        rootQueue.getMetrics().getAllocatedMB());
+    assertEquals(0 * GB, rootQueue.getMetrics().getAvailableMB()
+        + rootQueue.getMetrics().getAllocatedMB());
 
     // Kill all apps in queue a
     cs.killAllAppsInQueue("a");
@@ -2084,6 +2088,8 @@ public class TestNodeLabelContainerAllocation {
     csConf.setCapacityByLabel(queueB, "x", 50);
     csConf.setMaximumCapacityByLabel(queueB, "x", 50);
 
+    csConf.setBoolean(CapacitySchedulerConfiguration.ENABLE_USER_METRICS, 
true);
+
     // set node -> label
     mgr.addToCluserNodeLabels(
         ImmutableSet.of(NodeLabel.newInstance("x", false)));
@@ -2102,6 +2108,54 @@ public class TestNodeLabelContainerAllocation {
     rm1.start();
     MockNM nm1 = rm1.registerNode("h1:1234", 10 * GB); // label = x
     MockNM nm2 = rm1.registerNode("h2:1234", 10 * GB); // label = <no_label>
+    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+    SchedulerNode schedulerNode1 = cs.getSchedulerNode(nm1.getNodeId());
+    RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
+    SchedulerNode schedulerNode2 = cs.getSchedulerNode(nm2.getNodeId());
+    for (int i = 0; i < 50; i++) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+    }
+    for (int i = 0; i < 50; i++) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+    }
+    double delta = 0.0001;
+    CSQueue leafQueue = cs.getQueue("a");
+    CSQueue leafQueueB = cs.getQueue("b");
+    CSQueue rootQueue = cs.getRootQueue();
+    assertEquals(10 * GB, rootQueue.getMetrics().getAvailableMB(), delta);
+    assertEquals(2.5 * GB, leafQueue.getMetrics().getAvailableMB(), delta);
+    assertEquals(7.5 * GB, leafQueueB.getMetrics().getAvailableMB(), delta);
+
+    MetricsSystem ms = leafQueueB.getMetrics().getMetricsSystem();
+    QueueMetrics partXMetrics =
+        (QueueMetrics) TestPartitionQueueMetrics.partitionSource(ms, "x");
+    QueueMetrics partDefaultMetrics =
+        (QueueMetrics) TestPartitionQueueMetrics.partitionSource(ms, "");
+    QueueMetrics queueAMetrics =
+        (QueueMetrics) TestQueueMetrics.queueSource(ms, "root.a");
+    QueueMetrics queueBMetrics =
+        (QueueMetrics) TestQueueMetrics.queueSource(ms, "root.b");
+    QueueMetrics queueAPartDefaultMetrics =
+        (QueueMetrics) TestPartitionQueueMetrics.queueSource(ms, "", "root.a");
+    QueueMetrics queueAPartXMetrics =
+        (QueueMetrics) TestPartitionQueueMetrics.queueSource(ms, "x", 
"root.a");
+    QueueMetrics queueBPartDefaultMetrics =
+        (QueueMetrics) TestPartitionQueueMetrics.queueSource(ms, "", "root.b");
+    QueueMetrics queueBPartXMetrics =
+        (QueueMetrics) TestPartitionQueueMetrics.queueSource(ms, "x", 
"root.b");
+    QueueMetrics rootMetrics =
+        (QueueMetrics) TestQueueMetrics.queueSource(ms, "root");
+    assertEquals(10 * GB, partXMetrics.getAvailableMB(), delta);
+    assertEquals(10 * GB, partDefaultMetrics.getAvailableMB(), delta);
+    assertEquals(2.5 * GB, queueAPartDefaultMetrics.getAvailableMB(), delta);
+    assertEquals(7.5 * GB, queueBPartDefaultMetrics.getAvailableMB(), delta);
+    assertEquals(5 * GB, queueAPartXMetrics.getAvailableMB(), delta);
+    assertEquals(5 * GB, queueBPartXMetrics.getAvailableMB(), delta);
+    assertEquals(10 * GB, rootMetrics.getAvailableMB(), delta);
+    assertEquals(2.5 * GB, queueAMetrics.getAvailableMB(), delta);
+    assertEquals(7.5 * GB, queueBMetrics.getAvailableMB(), delta);
+
     // app1 -> a
     RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a");
     MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm2);
@@ -2109,47 +2163,73 @@ public class TestNodeLabelContainerAllocation {
     // app1 asks for 3 partition= containers
     am1.allocate("*", 1 * GB, 3, new ArrayList<ContainerId>());
 
-    // NM1 do 50 heartbeats
-    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
-    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
-
-    SchedulerNode schedulerNode1 = cs.getSchedulerNode(nm1.getNodeId());
     for (int i = 0; i < 50; i++) {
       cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
     }
+    for (int i = 0; i < 50; i++) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+    }
 
     // app1 gets all resource in partition=x (non-exclusive)
     Assert.assertEquals(3, schedulerNode1.getNumContainers());
-
     SchedulerNodeReport reportNm1 = rm1.getResourceScheduler()
         .getNodeReport(nm1.getNodeId());
     Assert.assertEquals(3 * GB, reportNm1.getUsedResource().getMemorySize());
     Assert.assertEquals(7 * GB,
         reportNm1.getAvailableResource().getMemorySize());
-
     SchedulerNodeReport reportNm2 = rm1.getResourceScheduler()
         .getNodeReport(nm2.getNodeId());
     Assert.assertEquals(1 * GB, reportNm2.getUsedResource().getMemorySize());
     Assert.assertEquals(9 * GB,
         reportNm2.getAvailableResource().getMemorySize());
-
-    LeafQueue leafQueue = (LeafQueue) cs.getQueue("a");
-    // 3GB is used from label x quota. 1.5 GB is remaining from default label.
-    // 2GB is remaining from label x.
-    assertEquals(15 * GB / 10, leafQueue.getMetrics().getAvailableMB());
+    assertEquals(7 * GB, partXMetrics.getAvailableMB(), delta);
+    assertEquals(9 * GB, partDefaultMetrics.getAvailableMB(), delta);
+    assertEquals(1.5 * GB, queueAPartDefaultMetrics.getAvailableMB(), delta);
+    assertEquals(1 * GB, queueAPartDefaultMetrics.getAllocatedMB(), delta);
+    assertEquals(7.5 * GB, queueBPartDefaultMetrics.getAvailableMB(), delta);
+    assertEquals(2 * GB, queueAPartXMetrics.getAvailableMB(), delta);
+    assertEquals(3 * GB, queueAPartXMetrics.getAllocatedMB(), delta);
+    assertEquals(5 * GB, queueBPartXMetrics.getAvailableMB(), delta);
+    assertEquals(1 * GB, queueAMetrics.getAllocatedMB(), delta);
+    assertEquals(1.5 * GB, queueAMetrics.getAvailableMB(), delta);
+    assertEquals(0 * GB, queueBMetrics.getAllocatedMB(), delta);
+    assertEquals(7.5 * GB, queueBMetrics.getAvailableMB(), delta);
+    assertEquals(0 * GB, queueAMetrics.getPendingMB(), delta);
+    assertEquals(0 * GB, queueAPartDefaultMetrics.getPendingMB(), delta);
+    assertEquals(0 * GB, queueAPartXMetrics.getPendingMB(), delta);
+    assertEquals(0 * GB, queueBPartDefaultMetrics.getPendingMB(), delta);
+    assertEquals(0 * GB, queueBPartXMetrics.getPendingMB(), delta);
+    assertEquals(1.5 * GB, leafQueue.getMetrics().getAvailableMB(), delta);
     assertEquals(1 * GB, leafQueue.getMetrics().getAllocatedMB());
+    assertEquals(3 * GB, partXMetrics.getAllocatedMB(), delta);
+    assertEquals(1 * GB, partDefaultMetrics.getAllocatedMB(), delta);
+
+    QueueMetrics partDefaultQueueAUserMetrics =
+        (QueueMetrics) TestPartitionQueueMetrics.userSource(ms, "", "user",
+            "root.a");
+    QueueMetrics partXQueueAUserMetrics =
+        (QueueMetrics) TestPartitionQueueMetrics.userSource(ms, "x", "user",
+            "root.a");
+    QueueMetrics queueAUserMetrics =
+        (QueueMetrics) TestQueueMetrics.userSource(ms, "root.a", "user");
+
+    assertEquals(2 * GB, queueAUserMetrics.getAvailableMB(), delta);
+    assertEquals(1 * GB, queueAUserMetrics.getAllocatedMB(), delta);
+    assertEquals(1.5 * GB, queueAPartDefaultMetrics.getAvailableMB(), delta);
+    assertEquals(1 * GB, queueAPartDefaultMetrics.getAllocatedMB(), delta);
+    assertEquals(2 * GB, queueAPartXMetrics.getAvailableMB(), delta);
+    assertEquals(3 * GB, queueAPartXMetrics.getAllocatedMB(), delta);
+    assertEquals(2 * GB, partDefaultQueueAUserMetrics.getAvailableMB(), delta);
+    assertEquals(1 * GB, partDefaultQueueAUserMetrics.getAllocatedMB(), delta);
+    assertEquals(2 * GB, partXQueueAUserMetrics.getAvailableMB(), delta);
+    assertEquals(3 * GB, partXQueueAUserMetrics.getAllocatedMB(), delta);
 
-    // app1 asks for 1 default partition container
     am1.allocate("*", 1 * GB, 5, new ArrayList<ContainerId>());
 
-    // NM2 do couple of heartbeats
-    RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
-
-    SchedulerNode schedulerNode2 = cs.getSchedulerNode(nm2.getNodeId());
     cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
 
-    // app1 gets all resource in default partition
     Assert.assertEquals(2, schedulerNode2.getNumContainers());
+    Assert.assertEquals(3, schedulerNode1.getNumContainers());
 
     // 3GB is used from label x quota. 2GB used from default label.
     // So 0.5 GB is remaining from default label.
@@ -2158,10 +2238,100 @@ public class TestNodeLabelContainerAllocation {
 
     // The total memory tracked by QueueMetrics is 10GB
     // for the default partition
-    CSQueue rootQueue = cs.getRootQueue();
     assertEquals(10*GB, rootQueue.getMetrics().getAvailableMB() +
         rootQueue.getMetrics().getAllocatedMB());
 
+    assertEquals(0.5 * GB, queueAMetrics.getAvailableMB(), delta);
+    assertEquals(2 * GB, queueAMetrics.getAllocatedMB());
+    assertEquals(0.5 * GB, queueAPartDefaultMetrics.getAvailableMB(), delta);
+    assertEquals(2 * GB, queueAPartDefaultMetrics.getAllocatedMB(), delta);
+    assertEquals(2 * GB, queueAPartXMetrics.getAvailableMB(), delta);
+    assertEquals(3 * GB, queueAPartXMetrics.getAllocatedMB(), delta);
+    assertEquals(1 * GB, partDefaultQueueAUserMetrics.getAvailableMB(),
+      delta);
+    assertEquals(2 * GB, partDefaultQueueAUserMetrics.getAllocatedMB(), delta);
+    assertEquals(2 * GB, partXQueueAUserMetrics.getAvailableMB(), delta);
+    assertEquals(3 * GB, partXQueueAUserMetrics.getAllocatedMB(), delta);
+    assertEquals(1 * GB, queueAUserMetrics.getAvailableMB(), delta);
+    assertEquals(2 * GB, queueAUserMetrics.getAllocatedMB(), delta);
+    assertEquals(7 * GB, partXMetrics.getAvailableMB(), delta);
+    assertEquals(3 * GB, partXMetrics.getAllocatedMB(), delta);
+    assertEquals(8 * GB, partDefaultMetrics.getAvailableMB(), delta);
+    assertEquals(2 * GB, partDefaultMetrics.getAllocatedMB(), delta);
+
+    // Pending Resources when containers are waiting on "default" partition
+    assertEquals(4 * GB, queueAMetrics.getPendingMB(), delta);
+    assertEquals(4 * GB, queueAPartDefaultMetrics.getPendingMB(), delta);
+    assertEquals(4 * GB, partDefaultQueueAUserMetrics.getPendingMB(),
+        delta);
+    assertEquals(4 * GB, queueAUserMetrics.getPendingMB(), delta);
+    assertEquals(4 * GB, partDefaultMetrics.getPendingMB(), delta);
+    assertEquals(0 * GB, queueAPartXMetrics.getPendingMB(), delta);
+    assertEquals(0 * GB, partXQueueAUserMetrics.getPendingMB(), delta);
+    assertEquals(0 * GB, partXMetrics.getPendingMB(), delta);
+
+    for (int i = 0; i < 50; i++) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+    }
+    for (int i = 0; i < 50; i++) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+    }
+
+    assertEquals(0.5 * GB, queueAMetrics.getAvailableMB(), delta);
+    assertEquals(2 * GB, queueAMetrics.getAllocatedMB());
+    assertEquals(0.5 * GB, queueAPartDefaultMetrics.getAvailableMB(), delta);
+    assertEquals(2 * GB, queueAPartDefaultMetrics.getAllocatedMB(), delta);
+    assertEquals(0 * GB, queueAPartXMetrics.getAvailableMB(), delta);
+    assertEquals(7 * GB, queueAPartXMetrics.getAllocatedMB(), delta);
+    assertEquals(1 * GB, partDefaultQueueAUserMetrics.getAvailableMB(),
+      delta);
+    assertEquals(2 * GB, partDefaultQueueAUserMetrics.getAllocatedMB(), delta);
+    assertEquals(0 * GB, partXQueueAUserMetrics.getAvailableMB(), delta);
+    assertEquals(7 * GB, partXQueueAUserMetrics.getAllocatedMB(), delta);
+    assertEquals(1 * GB, queueAUserMetrics.getAvailableMB(), delta);
+    assertEquals(2 * GB, queueAUserMetrics.getAllocatedMB(), delta);
+    assertEquals(3 * GB, partXMetrics.getAvailableMB(), delta);
+    assertEquals(7 * GB, partXMetrics.getAllocatedMB(), delta);
+    assertEquals(8 * GB, partDefaultMetrics.getAvailableMB(), delta);
+    assertEquals(2 * GB, partDefaultMetrics.getAllocatedMB(), delta);
+
+    // Pending Resources after containers have been assigned on "x" partition
+    assertEquals(0 * GB, queueAMetrics.getPendingMB(), delta);
+    assertEquals(0 * GB, queueAPartDefaultMetrics.getPendingMB(), delta);
+    assertEquals(0 * GB, partDefaultQueueAUserMetrics.getPendingMB(),
+        delta);
+    assertEquals(0 * GB, queueAUserMetrics.getPendingMB(), delta);
+    assertEquals(0 * GB, partDefaultMetrics.getPendingMB(), delta);
+    assertEquals(0 * GB, queueAPartXMetrics.getPendingMB(), delta);
+    assertEquals(0 * GB, partXQueueAUserMetrics.getPendingMB(), delta);
+    assertEquals(0 * GB, partXMetrics.getPendingMB(), delta);
+
+    rm1.killApp(app1.getApplicationId());
+    rm1.waitForState(app1.getApplicationId(), RMAppState.KILLED);
+
+    for (int i = 0; i < 50; i++) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+    }
+    for (int i = 0; i < 50; i++) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+    }
+
+    assertEquals(10 * GB, rootQueue.getMetrics().getAvailableMB(), delta);
+    assertEquals(2.5 * GB, leafQueue.getMetrics().getAvailableMB(), delta);
+    assertEquals(7.5 * GB, leafQueueB.getMetrics().getAvailableMB(), delta);
+    assertEquals(2, queueAMetrics.getAggregateAllocatedContainers());
+    assertEquals(2, queueAMetrics.getAggegatedReleasedContainers());
+    assertEquals(2, 
queueAPartDefaultMetrics.getAggregateAllocatedContainers());
+    assertEquals(2, queueAPartDefaultMetrics.getAggegatedReleasedContainers());
+    assertEquals(7, partXMetrics.getAggregateAllocatedContainers());
+    assertEquals(2, partDefaultMetrics.getAggregateAllocatedContainers());
+    assertEquals(7, queueAPartXMetrics.getAggregateAllocatedContainers());
+    assertEquals(7, queueAPartXMetrics.getAggegatedReleasedContainers());
+    assertEquals(2.5 * GB, queueAPartDefaultMetrics.getAvailableMB(), delta);
+    assertEquals(5 * GB, queueAPartXMetrics.getAvailableMB(), delta);
+    assertEquals(3 * GB, queueAUserMetrics.getAvailableMB(), delta);
+    assertEquals(3 * GB, partDefaultQueueAUserMetrics.getAvailableMB(), delta);
+    assertEquals(5 * GB, partXQueueAUserMetrics.getAvailableMB(), delta);
     rm1.close();
   }
 
@@ -2278,8 +2448,8 @@ public class TestNodeLabelContainerAllocation {
     // The total memory tracked by QueueMetrics is 12GB
     // for the default partition
     CSQueue rootQueue = cs.getRootQueue();
-    assertEquals(12*GB, rootQueue.getMetrics().getAvailableMB() +
-        rootQueue.getMetrics().getAllocatedMB());
+    assertEquals(12 * GB, rootQueue.getMetrics().getAvailableMB()
+        + rootQueue.getMetrics().getAllocatedMB());
 
     // Kill all apps in queue a
     cs.killAllAppsInQueue("a");
@@ -2292,6 +2462,193 @@ public class TestNodeLabelContainerAllocation {
   }
 
   @Test
+  public void testTwoLevelQueueMetricsWithLabels() throws Exception {
+
+    CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration(
+        this.conf);
+
+    // Define top-level queues
+    csConf.setQueues(CapacitySchedulerConfiguration.ROOT,
+        new String[] {"a"});
+    csConf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100);
+
+    final String queueA = CapacitySchedulerConfiguration.ROOT + ".a";
+    csConf.setCapacity(queueA, 100);
+    csConf.setAccessibleNodeLabels(queueA, toSet("x"));
+    csConf.setCapacityByLabel(queueA, "x", 100);
+    csConf.setMaximumCapacityByLabel(queueA, "x", 100);
+
+    csConf.setQueues(queueA, new String[] {"a1"});
+    final String queueA1 = queueA + ".a1";
+    csConf.setCapacity(queueA1, 100);
+
+    csConf.setAccessibleNodeLabels(queueA1, toSet("x"));
+    csConf.setCapacityByLabel(queueA1, "x", 100);
+    csConf.setMaximumCapacityByLabel(queueA1, "x", 100);
+
+    // set node -> label
+    // label x exclusivity is set to true
+    mgr.addToCluserNodeLabels(
+        ImmutableSet.of(NodeLabel.newInstance("x", true)));
+    mgr.addLabelsToNode(
+        ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x")));
+
+    // inject node label manager
+    MockRM rm1 = new MockRM(csConf) {
+      @Override
+      public RMNodeLabelsManager createNodeLabelManager() {
+        return mgr;
+      }
+    };
+
+    rm1.getRMContext().setNodeLabelManager(mgr);
+    rm1.start();
+    MockNM nm1 = rm1.registerNode("h1:1234", 10 * GB); // label = x
+    MockNM nm2 = rm1.registerNode("h2:1234", 12 * GB); // label = <no_label>
+
+    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+    ParentQueue leafQueueA = (ParentQueue) cs.getQueue("a");
+    LeafQueue leafQueueA1 = (LeafQueue) cs.getQueue("a1");
+    assertEquals(12 * GB, leafQueueA1.getMetrics().getAvailableMB());
+    assertEquals(0 * GB, leafQueueA1.getMetrics().getAllocatedMB());
+    MetricsSystem ms = leafQueueA1.getMetrics().getMetricsSystem();
+    QueueMetrics partXMetrics =
+        (QueueMetrics) TestPartitionQueueMetrics.partitionSource(ms, "x");
+    QueueMetrics partDefaultMetrics =
+        (QueueMetrics) TestPartitionQueueMetrics.partitionSource(ms, "");
+    QueueMetrics queueAPartDefaultMetrics =
+        (QueueMetrics) TestPartitionQueueMetrics.queueSource(ms, "", "root.a");
+    QueueMetrics queueAPartXMetrics =
+        (QueueMetrics) TestPartitionQueueMetrics.queueSource(ms, "x", 
"root.a");
+    QueueMetrics queueA1PartDefaultMetrics =
+        (QueueMetrics) TestPartitionQueueMetrics.queueSource(ms, "", 
"root.a.a1");
+    QueueMetrics queueA1PartXMetrics =
+        (QueueMetrics) TestPartitionQueueMetrics.queueSource(ms, "x", 
"root.a.a1");
+    QueueMetrics queueRootPartDefaultMetrics =
+        (QueueMetrics) TestPartitionQueueMetrics.queueSource(ms, "", "root");
+    QueueMetrics queueRootPartXMetrics =
+        (QueueMetrics) TestPartitionQueueMetrics.queueSource(ms, "x", "root");
+    QueueMetrics queueAMetrics =
+        (QueueMetrics) TestQueueMetrics.queueSource(ms, "root.a");
+    QueueMetrics queueA1Metrics =
+        (QueueMetrics) TestQueueMetrics.queueSource(ms, "root.a.a1");
+    QueueMetrics queueRootMetrics =
+        (QueueMetrics) TestQueueMetrics.queueSource(ms, "root");
+    assertEquals(12 * GB, queueAMetrics.getAvailableMB());
+    assertEquals(12 * GB, queueA1Metrics.getAvailableMB());
+    assertEquals(12 * GB, queueRootMetrics.getAvailableMB());
+    assertEquals(12 * GB, leafQueueA.getMetrics().getAvailableMB());
+    assertEquals(10 * GB, queueA1PartXMetrics.getAvailableMB());
+    assertEquals(10 * GB, queueAPartXMetrics.getAvailableMB());
+    assertEquals(10 * GB, queueRootPartXMetrics.getAvailableMB());
+    assertEquals(12 * GB, queueA1PartDefaultMetrics.getAvailableMB());
+    assertEquals(12 * GB, queueAPartDefaultMetrics.getAvailableMB());
+    assertEquals(12 * GB, queueRootPartDefaultMetrics.getAvailableMB());
+    assertEquals(10 * GB, partXMetrics.getAvailableMB());
+    assertEquals(12 * GB, partDefaultMetrics.getAvailableMB());
+
+    // app1 -> a1
+    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a1", "x");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+    // app1 asks for 5 partition=x containers
+    am1.allocate("*", 1 * GB, 5, new ArrayList<ContainerId>(), "x");
+    // NM1 do 50 heartbeats
+    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+
+    SchedulerNode schedulerNode1 = cs.getSchedulerNode(nm1.getNodeId());
+
+    for (int i = 0; i < 50; i++) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+    }
+
+    // app1 gets all resource in partition=x
+    Assert.assertEquals(6, schedulerNode1.getNumContainers());
+
+    SchedulerNodeReport reportNm1 = rm1.getResourceScheduler()
+        .getNodeReport(nm1.getNodeId());
+    Assert.assertEquals(6 * GB, reportNm1.getUsedResource().getMemorySize());
+    Assert.assertEquals(4 * GB, 
reportNm1.getAvailableResource().getMemorySize());
+
+    SchedulerNodeReport reportNm2 = rm1.getResourceScheduler()
+        .getNodeReport(nm2.getNodeId());
+    Assert.assertEquals(0 * GB, reportNm2.getUsedResource().getMemorySize());
+    Assert.assertEquals(12 * GB,
+        reportNm2.getAvailableResource().getMemorySize());
+
+    assertEquals(0 * GB, queueAMetrics.getAllocatedMB());
+    assertEquals(0 * GB, queueA1Metrics.getAllocatedMB());
+    assertEquals(0 * GB, queueRootMetrics.getAllocatedMB());
+    assertEquals(0 * GB, leafQueueA.getMetrics().getAllocatedMB());
+    assertEquals(0 * GB, leafQueueA.getMetrics().getAllocatedMB());
+    assertEquals(6 * GB, queueA1PartXMetrics.getAllocatedMB());
+    assertEquals(6 * GB, queueAPartXMetrics.getAllocatedMB());
+    assertEquals(6 * GB, queueRootPartXMetrics.getAllocatedMB());
+    assertEquals(0 * GB, queueA1PartDefaultMetrics.getAllocatedMB());
+    assertEquals(0 * GB, queueAPartDefaultMetrics.getAllocatedMB());
+    assertEquals(0 * GB, queueRootPartDefaultMetrics.getAllocatedMB());
+    assertEquals(6 * GB, partXMetrics.getAllocatedMB());
+    assertEquals(0 * GB, partDefaultMetrics.getAllocatedMB());
+
+    // app2 -> a1
+    RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "a1", "");
+    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2);
+
+    // app2 asks for 5 partition= containers
+    am2.allocate("*", 1 * GB, 5, new ArrayList<ContainerId>(), "");
+    // NM2 do 50 heartbeats
+    RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
+
+    SchedulerNode schedulerNode2 = cs.getSchedulerNode(nm2.getNodeId());
+
+    for (int i = 0; i < 50; i++) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+    }
+
+    // app2 gets its containers in the default partition
+    Assert.assertEquals(6, schedulerNode2.getNumContainers());
+
+    reportNm1 = rm1.getResourceScheduler().getNodeReport(nm1.getNodeId());
+    Assert.assertEquals(6 * GB, reportNm1.getUsedResource().getMemorySize());
+    Assert.assertEquals(4 * GB,
+        reportNm1.getAvailableResource().getMemorySize());
+
+    reportNm2 = rm1.getResourceScheduler().getNodeReport(nm2.getNodeId());
+    Assert.assertEquals(6 * GB, reportNm2.getUsedResource().getMemorySize());
+    Assert.assertEquals(6 * GB,
+        reportNm2.getAvailableResource().getMemorySize());
+
+    assertEquals(6 * GB, leafQueueA.getMetrics().getAvailableMB());
+    assertEquals(6 * GB, leafQueueA.getMetrics().getAllocatedMB());
+
+    // The total memory tracked by QueueMetrics is 12GB
+    // for the default partition
+    CSQueue rootQueue = cs.getRootQueue();
+    assertEquals(12 * GB, rootQueue.getMetrics().getAvailableMB()
+        + rootQueue.getMetrics().getAllocatedMB());
+
+    assertEquals(6 * GB, queueAMetrics.getAllocatedMB());
+    assertEquals(6 * GB, queueA1Metrics.getAllocatedMB());
+    assertEquals(6 * GB, queueRootMetrics.getAllocatedMB());
+    assertEquals(6 * GB, queueA1PartXMetrics.getAllocatedMB());
+    assertEquals(6 * GB, queueAPartXMetrics.getAllocatedMB());
+    assertEquals(6 * GB, queueRootPartXMetrics.getAllocatedMB());
+    assertEquals(6 * GB, queueA1PartDefaultMetrics.getAllocatedMB());
+    assertEquals(6 * GB, queueAPartDefaultMetrics.getAllocatedMB());
+    assertEquals(6 * GB, queueRootPartDefaultMetrics.getAllocatedMB());
+    assertEquals(6 * GB, partXMetrics.getAllocatedMB());
+    assertEquals(6 * GB, partDefaultMetrics.getAllocatedMB());
+
+    // Kill all apps in queue a1
+    cs.killAllAppsInQueue("a1");
+    rm1.waitForState(app1.getApplicationId(), RMAppState.KILLED);
+    rm1.waitForAppRemovedFromScheduler(app1.getApplicationId());
+    assertEquals(0 * GB, leafQueueA.getMetrics().getUsedAMResourceMB());
+    assertEquals(0, leafQueueA.getMetrics().getUsedAMResourceVCores());
+    rm1.close();
+  }
+
+  @Test
   public void testQueueMetricsWithLabelsDisableElasticity() throws Exception {
     /**
      * Test case: have a following queue structure:


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to