Repository: hadoop
Updated Branches:
  refs/heads/YARN-1011 bb5991423 -> 608f00998


YARN-8827. Plumb aggregated application resource utilization from the NM to RM. (asuresh)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/608f0099
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/608f0099
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/608f0099

Branch: refs/heads/YARN-1011
Commit: 608f009980b779857b3660a3bd6c70ee44738b8f
Parents: bb59914
Author: Arun Suresh <asur...@apache.org>
Authored: Tue Oct 9 21:09:50 2018 -0700
Committer: Arun Suresh <asur...@apache.org>
Committed: Tue Oct 9 21:09:50 2018 -0700

----------------------------------------------------------------------
 .../hadoop/yarn/sls/nodemanager/NodeInfo.java   |   6 +
 .../yarn/sls/scheduler/RMNodeWrapper.java       |   6 +
 .../yarn/api/records/ResourceUtilization.java   |  24 ++
 .../yarn/server/api/records/NodeStatus.java     |  19 +
 .../api/records/impl/pb/NodeStatusPBImpl.java   |  59 +++
 .../main/proto/yarn_server_common_protos.proto  |   6 +
 .../nodemanager/NodeStatusUpdaterImpl.java      |   9 +
 .../monitor/ContainersMonitor.java              |  33 ++
 .../monitor/ContainersMonitorImpl.java          |  53 ++-
 .../server/resourcemanager/ResourceManager.java |   8 +
 .../ResourceUtilizationAggregator.java          | 178 +++++++++
 .../server/resourcemanager/rmnode/RMNode.java   |   7 +
 .../resourcemanager/rmnode/RMNodeImpl.java      |  26 +-
 .../rmnode/RMNodeStatusEvent.java               |   7 +-
 .../yarn/server/resourcemanager/MockNM.java     |  26 +-
 .../yarn/server/resourcemanager/MockNodes.java  |  16 +
 .../TestResourceUtilizationAggregator.java      | 357 +++++++++++++++++++
 17 files changed, 820 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/608f0099/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java
index 350f4a3..716a1d8 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java
@@ -199,6 +199,12 @@ public class NodeInfo {
     }
 
     @Override
+    public Map<ApplicationId, ResourceUtilization>
+        getAggregatedAppUtilizations() {
+      return null;
+    }
+
+    @Override
     public ResourceUtilization getNodeUtilization() {
       return null;
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/608f0099/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java
index bb6fb9d..0e2a84e 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java
@@ -225,6 +225,12 @@ public class RMNodeWrapper implements RMNode {
   }
 
   @Override
+  public Map<ApplicationId, ResourceUtilization>
+      getAggregatedAppUtilizations() {
+    return node.getAggregatedAppUtilizations();
+  }
+
+  @Override
   public Resource getPhysicalResource() {
     return null;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/608f0099/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceUtilization.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceUtilization.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceUtilization.java
index 2ae4872..c340093 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceUtilization.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceUtilization.java
@@ -53,6 +53,16 @@ public abstract class ResourceUtilization implements
   }
 
   /**
+   * Helper function to return a zero-ed out Utilization.
+   * @return New Resource Utilization.
+   */
+  @Public
+  @Unstable
+  public static ResourceUtilization newZero() {
+    return newInstance(0, 0, 0.0f);
+  }
+
+  /**
    * Get used <em>virtual memory</em>.
    *
    * @return <em>virtual memory</em> in MB
@@ -157,6 +167,20 @@ public abstract class ResourceUtilization implements
   }
 
   /**
+   * Add utilization to the current one.
+   * @param resUtil Resource Utilization to add.
+   */
+  @Public
+  @Unstable
+  public void addTo(ResourceUtilization resUtil) {
+    this.setPhysicalMemory(
+        this.getPhysicalMemory() + resUtil.getPhysicalMemory());
+    this.setVirtualMemory(
+        this.getVirtualMemory() + resUtil.getVirtualMemory());
+    this.setCPU(this.getCPU() + resUtil.getCPU());
+  }
+
+  /**
    * Subtract utilization from the current one.
    * @param pmem Physical memory to be subtracted.
    * @param vmem Virtual memory to be subtracted.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/608f0099/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java
index 440cd0a..73ff1cd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.api.records;
 
 import java.util.List;
 
+import java.util.Map;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
@@ -132,4 +133,22 @@ public abstract class NodeStatus {
   @Unstable
   public abstract void setOpportunisticContainersStatus(
       OpportunisticContainersStatus opportunisticContainersStatus);
+
+  /**
+   * Set per application ResourceUtilization.
+   * @param applicationUtilizations Per application utilization map.
+   */
+  @Private
+  @Unstable
+  public abstract void setApplicationUtilizations(
+      Map<ApplicationId, ResourceUtilization> applicationUtilizations);
+
+  /**
+   * Get per application ResourceUtilization.
+   * @return Per application utilizations map.
+   */
+  @Private
+  @Unstable
+  public abstract Map<ApplicationId, ResourceUtilization>
+      getApplicationUtilizations();
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/608f0099/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java
index 8aebc6f..fb76233 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java
@@ -20,9 +20,11 @@ package org.apache.hadoop.yarn.server.api.records.impl.pb;
 
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 
+import java.util.Map;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
@@ -38,6 +40,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.AppResourceUtilizationProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeHealthStatusProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProtoOrBuilder;
@@ -57,6 +60,7 @@ public class NodeStatusPBImpl extends NodeStatus {
   private NodeHealthStatus nodeHealthStatus = null;
   private List<ApplicationId> keepAliveApplications = null;
   private List<Container> increasedContainers = null;
+  private Map<ApplicationId, ResourceUtilization> appUtilizations = null;
 
   public NodeStatusPBImpl() {
     builder = NodeStatusProto.newBuilder();
@@ -90,6 +94,9 @@ public class NodeStatusPBImpl extends NodeStatus {
     if (this.increasedContainers != null) {
       addIncreasedContainersToProto();
     }
+    if (this.appUtilizations != null) {
+      addAppUtilizations();
+    }
   }
 
   private synchronized void mergeLocalToProto() {
@@ -107,6 +114,26 @@ public class NodeStatusPBImpl extends NodeStatus {
     }
     viaProto = false;
   }
+
+  private void addAppUtilizations() {
+    maybeInitBuilder();
+    builder.clearApplicationUtilizations();
+    if (this.appUtilizations == null) {
+      return;
+    }
+    List<AppResourceUtilizationProto> protoList =
+        new ArrayList<>();
+
+    for (Map.Entry<ApplicationId, ResourceUtilization> entry :
+        this.appUtilizations.entrySet()) {
+      ApplicationId appId = entry.getKey();
+      ResourceUtilization resU = entry.getValue();
+      protoList.add(AppResourceUtilizationProto.newBuilder()
+          .setApplicationId(convertToProtoFormat(appId))
+          .setUtilization(convertToProtoFormat(resU)).build());
+    }
+    builder.addAllApplicationUtilizations(protoList);
+  }
     
   private synchronized void addContainersToProto() {
     maybeInitBuilder();
@@ -425,6 +452,38 @@ public class NodeStatusPBImpl extends NodeStatus {
         convertToProtoFormat(opportunisticContainersStatus));
   }
 
+  private synchronized void initAppUtilizations() {
+    if (this.appUtilizations != null) {
+      return;
+    }
+    NodeStatusProtoOrBuilder p = viaProto ? proto : builder;
+    List<AppResourceUtilizationProto> protoList =
+        p.getApplicationUtilizationsList();
+    this.appUtilizations = new HashMap<>();
+    for (AppResourceUtilizationProto au : protoList) {
+      ApplicationId appId = convertFromProtoFormat(au.getApplicationId());
+      ResourceUtilization resU = convertFromProtoFormat(au.getUtilization());
+      this.appUtilizations.put(appId, resU);
+    }
+  }
+
+  @Override
+  public synchronized void setApplicationUtilizations(
+      Map<ApplicationId, ResourceUtilization> applicationUtilizations) {
+    maybeInitBuilder();
+    if (applicationUtilizations == null) {
+      builder.clearApplicationUtilizations();
+    }
+    this.appUtilizations = applicationUtilizations;
+  }
+
+  @Override
+  public synchronized Map<ApplicationId, ResourceUtilization>
+      getApplicationUtilizations() {
+    initAppUtilizations();
+    return this.appUtilizations;
+  }
+
   private NodeIdProto convertToProtoFormat(NodeId nodeId) {
     return ((NodeIdPBImpl)nodeId).getProto();
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/608f0099/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto
index 8200808..30a1e29 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto
@@ -40,6 +40,12 @@ message NodeStatusProto {
   optional ResourceUtilizationProto node_utilization = 7;
   repeated ContainerProto increased_containers = 8;
   optional OpportunisticContainersStatusProto opportunistic_containers_status = 9;
+  repeated AppResourceUtilizationProto application_utilizations = 11;
+}
+
+message AppResourceUtilizationProto {
+  required ApplicationIdProto application_id = 1;
+  optional ResourceUtilizationProto utilization = 3;
 }
 
 message OpportunisticContainersStatusProto {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/608f0099/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
index d757376..e09fe08 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
@@ -514,6 +514,8 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
     List<ContainerStatus> containersStatuses = getContainerStatuses();
     ResourceUtilization containersUtilization = getContainersUtilization();
     ResourceUtilization nodeUtilization = getNodeUtilization();
+    Map<ApplicationId, ResourceUtilization> appUtilizations =
+        getAppUtilizations();
     List<org.apache.hadoop.yarn.api.records.Container> increasedContainers
         = getIncreasedContainers();
     NodeStatus nodeStatus =
@@ -523,6 +525,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
 
     nodeStatus.setOpportunisticContainersStatus(
         getOpportunisticContainersStatus());
+    nodeStatus.setApplicationUtilizations(appUtilizations);
     return nodeStatus;
   }
 
@@ -546,6 +549,12 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
     return containersMonitor.getContainersUtilization(false).getUtilization();
   }
 
+  private Map<ApplicationId, ResourceUtilization> getAppUtilizations() {
+    ContainersMonitor containersMonitor =
+        this.context.getContainerManager().getContainersMonitor();
+    return containersMonitor.getAppUtilizations(false).getUtilizations();
+  }
+
   /**
    * Get the utilization of the node. This includes the containers.
    * @return Resource utilization of the node.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/608f0099/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitor.java
index 8da4ec4..c2193ec 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitor.java
@@ -18,7 +18,9 @@
 
 package org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor;
 
+import java.util.Map;
 import org.apache.hadoop.service.Service;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceUtilization;
 import org.apache.hadoop.yarn.event.EventHandler;
@@ -37,6 +39,14 @@ public interface ContainersMonitor extends Service,
   ContainersResourceUtilization getContainersUtilization(boolean latest);
 
   /**
+   * Get the per app aggregate resource utilization of containers running on
+   * the node.
+   * @param latest true if the latest result should be returned.
+   * @return AppResourceUtilization per app resource utilization.
+   */
+  AppResourceUtilizations getAppUtilizations(boolean latest);
+
+  /**
    * Get the policy to over-allocate containers when over-allocation is on.
    * @return null if over-allocation is turned off
    */
@@ -102,4 +112,27 @@ public interface ContainersMonitor extends Service,
       return utilization;
     }
   }
+
+  /**
+   * A snapshot of resource utilization of all containers with the timestamp.
+   */
+  final class AppResourceUtilizations {
+    private final Map<ApplicationId, ResourceUtilization> utilizations;
+    private final long timestamp;
+
+    public AppResourceUtilizations(
+        Map<ApplicationId, ResourceUtilization> utilizations,
+        long timestamp) {
+      this.utilizations = utilizations;
+      this.timestamp = timestamp;
+    }
+
+    public long getTimestamp() {
+      return timestamp;
+    }
+
+    public Map<ApplicationId, ResourceUtilization> getUtilizations() {
+      return utilizations;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/608f0099/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
index 8bd4c47..2a561ba 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
@@ -20,7 +20,9 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import java.util.HashMap;
 import org.apache.hadoop.util.Time;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupElasticMemoryController;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.MemoryResourceHandler;
@@ -122,6 +124,7 @@ public class ContainersMonitorImpl extends AbstractService implements
   }
 
   private ContainersResourceUtilization latestContainersUtilization;
+  private AppResourceUtilizations latestAppUtilizations;
 
   private NMAllocationPolicy overAllocationPolicy;
   private ResourceThresholds overAllocationPreemptionThresholds;
@@ -622,6 +625,9 @@ public class ContainersMonitorImpl extends AbstractService implements
         long vmemUsageByAllContainers = 0;
         long pmemByAllContainers = 0;
         long cpuUsagePercentPerCoreByAllContainers = 0;
+
+        Map<ApplicationId, ResourceUtilization> perAppResourceUtilization =
+            new HashMap<>();
         for (Entry<ContainerId, ProcessTreeInfo> entry : trackingContainers
             .entrySet()) {
           ContainerId containerId = entry.getKey();
@@ -656,8 +662,9 @@ public class ContainersMonitorImpl extends AbstractService implements
               continue;
             }
 
-            recordUsage(containerId, pId, pTree, ptInfo, currentVmemUsage,
-                    currentPmemUsage, trackedContainersUtilization);
+            recordUsage(pTree, ptInfo, currentVmemUsage,
+                currentPmemUsage, trackedContainersUtilization,
+                perAppResourceUtilization);
 
             checkLimit(containerId, pId, pTree, ptInfo,
                     currentVmemUsage, currentPmemUsage);
@@ -686,6 +693,8 @@ public class ContainersMonitorImpl extends AbstractService implements
 
         // Save the aggregated utilization of the containers
         setLatestContainersUtilization(trackedContainersUtilization);
+        // Save the aggregated app utilizations
+        setLatestAppUtilizations(perAppResourceUtilization);
 
         // check opportunity to start containers if over-allocation is on
         checkUtilization();
@@ -769,19 +778,19 @@ public class ContainersMonitorImpl extends AbstractService implements
 
     /**
      * Record usage metrics.
-     * @param containerId container id
-     * @param pId process id
      * @param pTree valid process tree entry with CPU measurement
      * @param ptInfo process tree info with limit information
      * @param currentVmemUsage virtual memory measurement
      * @param currentPmemUsage physical memory measurement
      * @param trackedContainersUtilization utilization tracker to update
      */
-    private void recordUsage(ContainerId containerId, String pId,
-                             ResourceCalculatorProcessTree pTree,
-                             ProcessTreeInfo ptInfo,
-                             long currentVmemUsage, long currentPmemUsage,
-                             ResourceUtilization trackedContainersUtilization) {
+    private void recordUsage(ResourceCalculatorProcessTree pTree,
+        ProcessTreeInfo ptInfo,
+        long currentVmemUsage, long currentPmemUsage,
+        ResourceUtilization trackedContainersUtilization,
+        Map<ApplicationId, ResourceUtilization> perAppUtil) {
+      ContainerId containerId = ptInfo.getContainerId();
+      String pId =ptInfo.getPID();
       // if machine has 6 cores and 3 are used,
       // cpuUsagePercentPerCore should be 300% and
       // cpuUsageTotalCoresPercentage should be 50%
@@ -806,12 +815,21 @@ public class ContainersMonitorImpl extends AbstractService implements
             cpuUsageTotalCoresPercentage));
       }
 
-      // Add resource utilization for this container
-      trackedContainersUtilization.addTo(
+      ResourceUtilization currResUtil =
+          ResourceUtilization.newInstance(
               (int) (currentPmemUsage >> 20),
               (int) (currentVmemUsage >> 20),
               milliVcoresUsed / 1000.0f);
 
+      // Add resource utilization for this container
+      trackedContainersUtilization.addTo(currResUtil);
+
+      ResourceUtilization appUtil =
+          perAppUtil.computeIfAbsent(
+              containerId.getApplicationAttemptId().getApplicationId(),
+              (x -> ResourceUtilization.newInstance(0, 0, 0.0f)));
+      appUtil.addTo(currResUtil);
+
       // Add usage to container metrics
       if (containerMetricsEnabled) {
         ContainerMetrics.forContainer(
@@ -1085,6 +1103,13 @@ public class ContainersMonitorImpl extends AbstractService implements
   }
 
   @Override
+  public AppResourceUtilizations getAppUtilizations(boolean latest) {
+    // TODO If latest is true, kickoff an immediate app utilization
+    //      and return value.
+    return this.latestAppUtilizations;
+  }
+
+  @Override
   public NMAllocationPolicy getContainerOverAllocationPolicy() {
     return overAllocationPolicy;
   }
@@ -1102,6 +1127,12 @@ public class ContainersMonitorImpl extends AbstractService implements
     }
   }
 
+  private void setLatestAppUtilizations(
+      Map<ApplicationId, ResourceUtilization> appUtilization) {
+    this.latestAppUtilizations = new AppResourceUtilizations(
+        appUtilization, Time.now());
+  }
+
   /**
    * Check the resource utilization of the node. If the utilization is below
    * the over-allocation threshold, {@link ContainerScheduler} is notified to

http://git-wip-us.apache.org/repos/asf/hadoop/blob/608f0099/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
index 16f019f..f8e447d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
@@ -206,6 +206,7 @@ public class ResourceManager extends CompositeService
   private final String zkRootNodePassword =
       Long.toString(new SecureRandom().nextLong());
   private boolean recoveryEnabled;
+  private ResourceUtilizationAggregator resUtilAggregator;
 
   @VisibleForTesting
   protected String webAppAddress;
@@ -344,6 +345,8 @@ public class ResourceManager extends CompositeService
     rmContext.setSystemMetricsPublisher(systemMetricsPublisher);
 
     registerMXBean();
+    this.resUtilAggregator = new ResourceUtilizationAggregator(rmContext);
+    addService(this.resUtilAggregator);
 
     super.serviceInit(this.conf);
   }
@@ -1645,4 +1648,9 @@ public class ResourceManager extends CompositeService
   public boolean isSecurityEnabled() {
     return UserGroupInformation.isSecurityEnabled();
   }
+
+  @VisibleForTesting
+  ResourceUtilizationAggregator getResUtilizationAggregator() {
+    return resUtilAggregator;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/608f0099/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceUtilizationAggregator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceUtilizationAggregator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceUtilizationAggregator.java
new file mode 100644
index 0000000..fb98d5f
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceUtilizationAggregator.java
@@ -0,0 +1,259 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.util.concurrent.HadoopScheduledThreadPoolExecutor;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ResourceUtilization;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * This computes a snapshot of aggregated actual resource utilization across
+ * Applications, Users and Queues. Queue aggregation will be performed
+ * only at the LeafQueue level.
+ * The snapshot calculation interval is set to the Node heartbeat interval.
+ * It is assumed that all nodes would have heartbeat-ed to the RM in that
+ * interval.
+ * <p>
+ * Each aggregation pass builds fresh maps and publishes them through
+ * volatile fields; published maps are never modified afterwards, so the
+ * getters may be called from any thread. The app, queue and user snapshots
+ * are published one after another, so a reader may briefly see maps from
+ * two consecutive passes.
+ */
+public class ResourceUtilizationAggregator extends AbstractService {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ResourceUtilizationAggregator.class);
+
+  private static final Function<Object, ResourceUtilization> RU_GENERATOR =
+      (x -> ResourceUtilization.newZero());
+
+  private final RMContext rmContext;
+  private final ScheduledExecutorService scheduledExecutor;
+  private final AggregationTask aggTask = new AggregationTask();
+  private long aggInterval =
+      YarnConfiguration.DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MS;
+
+  private volatile Map<ApplicationId, ResourceUtilization>
+      stalePerAppUtilization = Collections.emptyMap();
+  private volatile Map<String, ResourceUtilization> stalePerUserUtilization =
+      Collections.emptyMap();
+  private volatile Map<Queue, ResourceUtilization> stalePerQueueUtilization =
+      Collections.emptyMap();
+
+  /**
+   * Periodic task that rebuilds the per-app, per-user and per-queue
+   * utilization snapshots from the latest node heartbeats.
+   */
+  final class AggregationTask implements Runnable {
+    @Override
+    public void run() {
+      try {
+        aggregate();
+      } catch (RuntimeException e) {
+        // An exception escaping a scheduleAtFixedRate task suppresses every
+        // later run, so log it and keep serving the previous snapshot.
+        LOG.error("Failed to aggregate resource utilization", e);
+      }
+    }
+
+    /**
+     * Performs one aggregation pass over all usable nodes and publishes
+     * the resulting snapshots.
+     */
+    private void aggregate() {
+      if (!(rmContext.getScheduler() instanceof AbstractYarnScheduler)) {
+        LOG.debug("Scheduler is not an AbstractYarnScheduler;"
+            + " skipping resource utilization aggregation");
+        return;
+      }
+      AbstractYarnScheduler<?, ?> scheduler =
+          (AbstractYarnScheduler<?, ?>) rmContext.getScheduler();
+      Map<ApplicationId, ResourceUtilization> perAppUtilization =
+          new HashMap<>();
+      Map<String, ResourceUtilization> perUserUtilization = new HashMap<>();
+      Map<Queue, ResourceUtilization> perQueueUtilization = new HashMap<>();
+      for (RMNode rmNode : rmContext.getRMNodes().values()) {
+        if (rmNode.getState().isUnusable()) {
+          continue;
+        }
+        Map<ApplicationId, ResourceUtilization> aggAppUtilizations =
+            rmNode.getAggregatedAppUtilizations();
+        if (aggAppUtilizations == null) {
+          continue;
+        }
+        aggAppUtilizations.forEach((appId, appResUtilPerNode) -> {
+          SchedulerApplicationAttempt appAttempt =
+              findCurrentAttempt(scheduler, appId);
+          if (appAttempt == null) {
+            return;
+          }
+          perQueueUtilization.computeIfAbsent(appAttempt.getQueue(),
+              RU_GENERATOR).addTo(appResUtilPerNode);
+          perAppUtilization.computeIfAbsent(appId, RU_GENERATOR)
+              .addTo(appResUtilPerNode);
+          String user = appAttempt.getUser();
+          if (user != null) {
+            perUserUtilization.computeIfAbsent(user, RU_GENERATOR)
+                .addTo(appResUtilPerNode);
+          } else {
+            LOG.warn("No user found for application attempt [{}]!!",
+                appAttempt.getApplicationAttemptId());
+          }
+        });
+      }
+      stalePerAppUtilization = Collections.unmodifiableMap(perAppUtilization);
+      stalePerQueueUtilization =
+          Collections.unmodifiableMap(perQueueUtilization);
+      stalePerUserUtilization =
+          Collections.unmodifiableMap(perUserUtilization);
+    }
+
+    /**
+     * Looks up the scheduler attempt currently serving an application.
+     * @param scheduler scheduler holding the application attempts.
+     * @param appId application reported by a node.
+     * @return the attempt, or null (after logging a warning) when the
+     *         application is unknown or has no current attempt.
+     */
+    private SchedulerApplicationAttempt findCurrentAttempt(
+        AbstractYarnScheduler<?, ?> scheduler, ApplicationId appId) {
+      RMApp rmApp = rmContext.getRMApps().get(appId);
+      if (rmApp == null) {
+        LOG.warn("Invalid Application [{}] received !!", appId);
+        return null;
+      }
+      // May be null, e.g. before the first attempt has been created.
+      RMAppAttempt rmAttempt = rmApp.getCurrentAppAttempt();
+      SchedulerApplicationAttempt appAttempt = rmAttempt == null ? null
+          : scheduler.getApplicationAttempt(rmAttempt.getAppAttemptId());
+      if (appAttempt == null) {
+        LOG.warn("No App Attempt for application [{}]!!", appId);
+      }
+      return appAttempt;
+    }
+  }
+
+  /**
+   * Construct the service.
+   * @param rmContext RM context that supplies the nodes, applications and
+   *                  scheduler to aggregate over.
+   */
+  public ResourceUtilizationAggregator(RMContext rmContext) {
+    super("Resource Utilization Aggregator");
+    this.rmContext = rmContext;
+    this.scheduledExecutor = new HadoopScheduledThreadPoolExecutor(1);
+  }
+
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    super.serviceInit(conf);
+    this.aggInterval = conf.getLong(
+        YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS,
+        YarnConfiguration.DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MS);
+  }
+
+  @Override
+  protected void serviceStart() throws Exception {
+    // Periodic aggregation begins only once the service is started.
+    this.scheduledExecutor.scheduleAtFixedRate(aggTask,
+        aggInterval, aggInterval, TimeUnit.MILLISECONDS);
+    super.serviceStart();
+  }
+
+  @Override
+  protected void serviceStop() throws Exception {
+    // Terminate the aggregation thread; otherwise it outlives the service.
+    this.scheduledExecutor.shutdownNow();
+    super.serviceStop();
+  }
+
+  /**
+   * Runs one aggregation pass synchronously on the calling thread.
+   * Unlike the scheduled runs, any failure propagates to the caller.
+   */
+  @VisibleForTesting
+  void kickoffAggregation() {
+    aggTask.aggregate();
+  }
+
+  /**
+   * Return aggregated Resource Utilization for the User.
+   * @param user User.
+   * @return Resource Utilization from the latest snapshot, or a new zero
+   *         utilization if none was recorded. Treat it as read-only.
+   */
+  public ResourceUtilization getUserResourceUtilization(String user) {
+    return lookup(stalePerUserUtilization, user);
+  }
+
+  /**
+   * Return aggregated Resource Utilization for the Queue. Currently,
+   * user is expected to provide the Leaf Queue. Aggregation across
+   * the queue hierarchy is not supported since queue traversal is
+   * not consistent across schedulers.
+   * @param queue Queue.
+   * @return Resource Utilization from the latest snapshot, or a new zero
+   *         utilization if none was recorded. Treat it as read-only.
+   */
+  public ResourceUtilization getQueueResourceUtilization(Queue queue) {
+    return lookup(stalePerQueueUtilization, queue);
+  }
+
+  /**
+   * Return aggregated Resource Utilization for the application.
+   * @param applicationId Application Id.
+   * @return Resource Utilization from the latest snapshot, or a new zero
+   *         utilization if none was recorded. Treat it as read-only.
+   */
+  public ResourceUtilization getAppResourceUtilization(
+      ApplicationId applicationId) {
+    return lookup(stalePerAppUtilization, applicationId);
+  }
+
+  /**
+   * Reads a key from a published snapshot without modifying it. Snapshots
+   * are shared by concurrent readers, so a missing key yields a fresh zero
+   * utilization instead of being inserted.
+   */
+  private static <K> ResourceUtilization lookup(
+      Map<K, ResourceUtilization> snapshot, K key) {
+    ResourceUtilization util = snapshot.get(key);
+    return util != null ? util : ResourceUtilization.newZero();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/608f0099/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
index aa19483..024160d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
@@ -112,6 +112,13 @@ public interface RMNode {
   public ResourceUtilization getAggregatedContainersUtilization();
 
   /**
+   * the per-application aggregated utilization of the containers running
+   * on the node, as last reported in the node's status.
+   * @return the aggregated per-app container utilization; may be null.
+   */
+  Map<ApplicationId, ResourceUtilization> getAggregatedAppUtilizations();
+
+  /**
    * the total resource utilization of the node.
    * @return the total resource utilization of the node.
    */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/608f0099/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
index 0e32f1e..de32e99 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
@@ -138,6 +138,8 @@ public class RMNodeImpl implements RMNode, 
EventHandler<RMNodeEvent> {
   private ResourceUtilization containersUtilization;
   /* Resource utilization for the node. */
   private ResourceUtilization nodeUtilization;
+  /* Per app aggregate utilization. */
+  private Map<ApplicationId, ResourceUtilization> appUtilizations;
 
   /** Physical resources in the node. */
   private volatile Resource physicalResource;
@@ -508,9 +510,29 @@ public class RMNodeImpl implements RMNode, 
EventHandler<RMNodeEvent> {
   }
 
   @Override
-  public ResourceUtilization getAggregatedContainersUtilization() {
+  public Map<ApplicationId, ResourceUtilization>
+      getAggregatedAppUtilizations() {
     this.readLock.lock();
+    try {
+      return this.appUtilizations;
+    } finally {
+      this.readLock.unlock();
+    }
+  }
+
+  public void setAggregatedAppUtilizations(
+      Map<ApplicationId, ResourceUtilization> appUtils) {
+    this.writeLock.lock();
+    try {
+      this.appUtilizations = appUtils;
+    } finally {
+      this.writeLock.unlock();
+    }
+  }
 
+  @Override
+  public ResourceUtilization getAggregatedContainersUtilization() {
+    this.readLock.lock();
     try {
       return this.containersUtilization;
     } finally {
@@ -830,6 +852,8 @@ public class RMNodeImpl implements RMNode, 
EventHandler<RMNodeEvent> {
     rmNode.setAggregatedContainersUtilization(statusEvent
         .getAggregatedContainersUtilization());
     rmNode.setNodeUtilization(statusEvent.getNodeUtilization());
+    rmNode.setAggregatedAppUtilizations(
+        statusEvent.getAggregateAppUtilization());
     return remoteNodeHealthStatus;
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/608f0099/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java
index 5f5fe24..d61bc99 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmnode;
 import java.util.Collections;
 import java.util.List;
 
+import java.util.Map;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
@@ -63,6 +64,10 @@ public class RMNodeStatusEvent extends RMNodeEvent {
     return this.nodeStatus.getContainersUtilization();
   }
 
+  public Map<ApplicationId, ResourceUtilization> getAggregateAppUtilization() {
+    return this.nodeStatus.getApplicationUtilizations();
+  }
+
   public ResourceUtilization getNodeUtilization() {
     return this.nodeStatus.getNodeUtilization();
   }
@@ -79,7 +84,7 @@ public class RMNodeStatusEvent extends RMNodeEvent {
       List<LogAggregationReport> logAggregationReportsForApps) {
     this.logAggregationReportsForApps = logAggregationReportsForApps;
   }
-  
+
   public List<Container> getNMReportedIncreasedContainers() {
     return this.nodeStatus.getIncreasedContainers() == null ?
         Collections.emptyList() : this.nodeStatus.getIncreasedContainers();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/608f0099/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java
index 2e28395..6a0d209 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeLabel;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceUtilization;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
@@ -141,7 +142,7 @@ public class MockNM {
             container.getResource());
     List<Container> increasedConts = Collections.singletonList(container);
     nodeHeartbeat(Collections.singletonList(containerStatus), increasedConts,
-        true, responseId);
+        true, responseId, null);
   }
 
   public void addRegisteringCollector(ApplicationId appId,
@@ -211,7 +212,13 @@ public class MockNM {
 
   public NodeHeartbeatResponse nodeHeartbeat(boolean isHealthy) throws 
Exception {
     return nodeHeartbeat(Collections.<ContainerStatus>emptyList(),
-        Collections.<Container>emptyList(), isHealthy, responseId);
+        Collections.<Container>emptyList(), isHealthy, responseId, null);
+  }
+
+  public NodeHeartbeatResponse nodeHeartbeat(
+      Map<ApplicationId, ResourceUtilization> appUtil) throws Exception {
+    return nodeHeartbeat(Collections.<ContainerStatus>emptyList(),
+        Collections.<Container>emptyList(), true, responseId, appUtil);
   }
 
   public NodeHeartbeatResponse nodeHeartbeat(ApplicationAttemptId attemptId,
@@ -224,7 +231,7 @@ public class MockNM {
     containerStatusList.add(containerStatus);
     Log.getLog().info("ContainerStatus: " + containerStatus);
     return nodeHeartbeat(containerStatusList,
-        Collections.<Container>emptyList(), true, responseId);
+        Collections.<Container>emptyList(), true, responseId, null);
   }
 
   public NodeHeartbeatResponse nodeHeartbeat(Map<ApplicationId,
@@ -239,18 +246,18 @@ public class MockNM {
       updatedStats.addAll(stats);
     }
     return nodeHeartbeat(updatedStats, Collections.<Container>emptyList(),
-        isHealthy, resId);
+        isHealthy, resId, null);
   }
 
   public NodeHeartbeatResponse nodeHeartbeat(
       List<ContainerStatus> updatedStats, boolean isHealthy) throws Exception {
     return nodeHeartbeat(updatedStats, Collections.<Container>emptyList(),
-        isHealthy, responseId);
+        isHealthy, responseId, null);
   }
 
   public NodeHeartbeatResponse nodeHeartbeat(List<ContainerStatus> 
updatedStats,
-      List<Container> increasedConts, boolean isHealthy, int resId)
-          throws Exception {
+      List<Container> increasedConts, boolean isHealthy, int resId,
+      Map<ApplicationId, ResourceUtilization> appUtil) throws Exception {
     NodeHeartbeatRequest req = Records.newRecord(NodeHeartbeatRequest.class);
     NodeStatus status = Records.newRecord(NodeStatus.class);
     status.setResponseId(resId);
@@ -273,6 +280,11 @@ public class MockNM {
     healthStatus.setIsNodeHealthy(isHealthy);
     healthStatus.setLastHealthReportTime(1);
     status.setNodeHealthStatus(healthStatus);
+
+    if (appUtil != null && !appUtil.isEmpty()) {
+      status.setApplicationUtilizations(appUtil);
+    }
+
     req.setNodeStatus(status);
     
req.setLastKnownContainerTokenMasterKey(this.currentContainerTokenMasterKey);
     req.setLastKnownNMTokenMasterKey(this.currentNMTokenMasterKey);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/608f0099/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
index d841ff0..d7edb1b 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
@@ -121,6 +121,7 @@ public class MockNodes {
     private Set<String> labels;
     private ResourceUtilization containersUtilization;
     private ResourceUtilization nodeUtilization;
+    private Map<ApplicationId, ResourceUtilization> appUtilization;
     private Resource physicalResource;
     private OverAllocationInfo overAllocationInfo;
     private List<UpdatedContainerInfo> containerUpdates =
@@ -330,13 +331,28 @@ public class MockNodes {
       return this.physicalResource;
     }
 
+    @Override
+    public Map<ApplicationId, ResourceUtilization>
+        getAggregatedAppUtilizations() {
+      return this.appUtilization;
+    }
+
     public void updateContainersInfoAndUtilization(
         UpdatedContainerInfo updatedContainerInfo,
         ResourceUtilization resourceUtilization) {
+      updateContainersInfoAndUtilization(
+          updatedContainerInfo, resourceUtilization, null);
+    }
+
+    public void updateContainersInfoAndUtilization(
+        UpdatedContainerInfo updatedContainerInfo,
+        ResourceUtilization resourceUtilization,
+        Map<ApplicationId, ResourceUtilization> appUtils) {
       if (updatedContainerInfo != null) {
         containerUpdates = Collections.singletonList(updatedContainerInfo);
       }
       this.containersUtilization = resourceUtilization;
+      this.appUtilization = appUtils;
     }
   };
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/608f0099/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceUtilizationAggregator.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceUtilizationAggregator.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceUtilizationAggregator.java
new file mode 100644
index 0000000..f2fb75d
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceUtilizationAggregator.java
@@ -0,0 +1,357 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager;
+
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.ResourceUtilization;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.DrainDispatcher;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+
+/**
+ * Test ResourceUtilizationAggregator functionality.
+ */
+public class TestResourceUtilizationAggregator {
+
+  private static final int GB = 1024;
+
+  private MockRM rm;
+  private DrainDispatcher dispatcher;
+  private MockNM nm1, nm2, nm3, nm4;
+  private ResourceUtilizationAggregator agg;
+
+  @Before
+  public void createAndStartRM() throws Exception {
+    CapacitySchedulerConfiguration csConf =
+        new CapacitySchedulerConfiguration();
+    YarnConfiguration conf = new YarnConfiguration(csConf);
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+        ResourceScheduler.class);
+    conf.setLong(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS, 100);
+    startRM(conf);
+
+    nm1 = new MockNM("h1:1234", 8 * GB, rm.getResourceTrackerService());
+    nm2 = new MockNM("h1:4321", 8 * GB, rm.getResourceTrackerService());
+    nm3 = new MockNM("h2:1234", 8 * GB, rm.getResourceTrackerService());
+    nm4 = new MockNM("h2:4321", 8 * GB, rm.getResourceTrackerService());
+    nm1.registerNode();
+    nm2.registerNode();
+    nm3.registerNode();
+    nm4.registerNode();
+
+    agg = rm.getRMContext().getResourceManager().getResUtilizationAggregator();
+  }
+
+  private void startRM(final YarnConfiguration conf) {
+    dispatcher = new DrainDispatcher();
+    rm = new MockRM(conf) {
+      @Override
+      protected Dispatcher createDispatcher() {
+        return dispatcher;
+      }
+    };
+    rm.start();
+  }
+
+  @After
+  public void stopRM() {
+    if (rm != null) {
+      rm.stop();
+    }
+  }
+
+  /**
+   * Check if Resource Utilization Aggregation works correctly.
+   * Start 3 Apps across 4 nodes : 2 apps by 'user1' and 1 by 'user2'
+   * .. but all on the same queue.
+   *
+   * Step 1: Send Node Heartbeats with App Resource Utilization.
+   * Ensure the Resource utilization is correctly aggregated across
+   * apps, users and queues.
+   *
+   * Step 2: Resend Node Heartbeats with Increase in one App's Utilization
+   * Ensure the Resource utilization is correctly aggregated across
+   * apps, users and queues.
+   *
+   * Step 3: Resend Node Heatbeats with Decrease in utilization across
+   * all app. Ensure the Resource utilization is correctly aggregated across
+   * apps, users and queues.
+   *
+   * @throws Exception
+   */
+  @Test(timeout = 60000)
+  public void testResourceUtilizationAggregation() throws Exception {
+
+    RMApp app1 = rm.submitApp(1 * GB, "app1", "user1", null, "default");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2);
+
+    RMApp app2 = rm.submitApp(1 * GB, "app2", "user2", null, "default");
+    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, nm3);
+
+    RMApp app3 = rm.submitApp(1 * GB, "app3", "user2", null, "default");
+    MockAM am3 = MockRM.launchAndRegisterAM(app3, rm, nm4);
+
+    am1.allocate(
+        Arrays.asList(ResourceRequest.newInstance(Priority.newInstance(1),
+            "*", Resources.createResource(1 * GB), 1, true, null,
+            ExecutionTypeRequest.newInstance(
+                ExecutionType.GUARANTEED, true))),
+        null);
+
+    am2.allocate(
+        Arrays.asList(ResourceRequest.newInstance(Priority.newInstance(1),
+            "*", Resources.createResource(1 * GB), 1, true, null,
+            ExecutionTypeRequest.newInstance(
+                ExecutionType.GUARANTEED, true))),
+        null);
+
+    am3.allocate(
+        Arrays.asList(ResourceRequest.newInstance(Priority.newInstance(1),
+            "*", Resources.createResource(1 * GB), 1, true, null,
+            ExecutionTypeRequest.newInstance(
+                ExecutionType.GUARANTEED, true))),
+        null);
+
+    nm1.nodeHeartbeat(true);
+    nm2.nodeHeartbeat(true);
+    nm3.nodeHeartbeat(true);
+    nm4.nodeHeartbeat(true);
+    rm.drainEvents();
+    dispatcher.waitForEventThreadToWait();
+
+    AllocateResponse alloc1 = am1.allocate(
+        new ArrayList<>(), new ArrayList<>());
+    assertEquals(1, alloc1.getAllocatedContainers().size());
+
+    AllocateResponse alloc2 = am2.allocate(
+        new ArrayList<>(), new ArrayList<>());
+    assertEquals(1, alloc2.getAllocatedContainers().size());
+
+    AllocateResponse alloc3 = am3.allocate(
+        new ArrayList<>(), new ArrayList<>());
+    assertEquals(1, alloc3.getAllocatedContainers().size());
+
+    ApplicationId appId1 = app1.getApplicationId();
+    ApplicationId appId2 = app2.getApplicationId();
+    ApplicationId appId3 = app3.getApplicationId();
+
+    AbstractYarnScheduler sched =
+        (AbstractYarnScheduler) rm.getRMContext().getScheduler();
+    SchedulerApplicationAttempt appAttempt1 =
+        sched.getApplicationAttempt(
+            app1.getCurrentAppAttempt().getAppAttemptId());
+    SchedulerApplicationAttempt appAttempt2 =
+        sched.getApplicationAttempt(
+            app2.getCurrentAppAttempt().getAppAttemptId());
+
+    // START Step 1 ========>
+    // Send Node Heartbeats with App Resource Utilization.
+    // Ensure the Resource utilization is correctly aggregated across
+    // apps, users and queues.
+    sendHeartBeatsWithAppUtil(
+        mkmap(
+            e(appId1, ResourceUtilization.newInstance(1, 1, 0.1f)),
+            e(appId2, ResourceUtilization.newInstance(3, 3, 0.3f))
+        ),
+        mkmap(
+            e(appId1, ResourceUtilization.newInstance(2, 2, 0.2f))
+        ),
+        mkmap(
+            e(appId3, ResourceUtilization.newInstance(5, 5, 0.5f))
+        ),
+        mkmap(
+            e(appId2, ResourceUtilization.newInstance(4, 4, 0.4f))
+        ));
+
+    ResourceUtilization aRU1 =
+        agg.getAppResourceUtilization(app1.getApplicationId());
+    ResourceUtilization aRU2 =
+        agg.getAppResourceUtilization(app2.getApplicationId());
+    ResourceUtilization aRU3 =
+        agg.getAppResourceUtilization(app3.getApplicationId());
+
+    ResourceUtilization uRU1 = agg.getUserResourceUtilization("user1");
+    ResourceUtilization uRU2 = agg.getUserResourceUtilization("user2");
+
+    // Check aggregated utilization across nodes for
+    // each application
+    assertEquals(3, aRU1.getPhysicalMemory());
+    assertEquals(3, aRU1.getVirtualMemory());
+    assertEquals(7, aRU2.getPhysicalMemory());
+    assertEquals(7, aRU2.getVirtualMemory());
+    assertEquals(5, aRU3.getPhysicalMemory());
+    assertEquals(5, aRU3.getVirtualMemory());
+
+    // Check aggregated utilization across nodes for
+    // each user
+    assertEquals(3, uRU1.getPhysicalMemory());
+    assertEquals(3, uRU1.getVirtualMemory());
+    assertEquals(12, uRU2.getPhysicalMemory());
+    assertEquals(12, uRU2.getVirtualMemory());
+
+    assertEquals(appAttempt1.getQueue(), appAttempt2.getQueue());
+
+    // All three applications are bound to the same queue,
+    // so the queue utilization should be the total aggregate..
+    ResourceUtilization qRU =
+        agg.getQueueResourceUtilization(appAttempt1.getQueue());
+    assertEquals(15, qRU.getPhysicalMemory());
+    assertEquals(15, qRU.getVirtualMemory());
+    // <======== END Step 1
+
+    Queue queue = appAttempt1.getQueue();
+    // Step 2: Resend Node Heartbeats with Increase in one App's Utilization
+    // Ensure the Resource utilization is correctly aggregated across
+    // apps, users and queues.
+    checkAggAfterUtilIncrease(appId1, appId2, appId3, queue, "user1", "user2");
+
+    // Step 3: Resend Node Heatbeats with Decrease in utilization across
+    // all app. Ensure the Resource utilization is correctly aggregated across
+    // apps, users and queues.
+    checkAggAfterUtilDecrease(appId1, appId2, appId3, queue, "user1", "user2");
+  }
+
+
+  /**
+   * Step 3 helper: resends heartbeats in which every reported application
+   * utilization drops to (pmem=1, vmem=1, cpu=0.1f), then verifies the
+   * aggregated physical AND virtual memory for each application, each user
+   * and the shared queue.
+   *
+   * The expected per-user totals assume user1 owns appId1 and user2 owns
+   * appId2 and appId3, consistent with the per-user totals asserted in
+   * Step 1 of the calling test.
+   *
+   * @param appId1 application reported by nm1 and nm2
+   * @param appId2 application reported by nm1 and nm4
+   * @param appId3 application reported by nm3
+   * @param queue queue that all three applications are bound to
+   * @param user1 user whose aggregate should equal appId1's aggregate
+   * @param user2 user whose aggregate should equal appId2 + appId3
+   * @throws Exception propagated from sending the heartbeats
+   */
+  private void checkAggAfterUtilDecrease(ApplicationId appId1,
+      ApplicationId appId2, ApplicationId appId3,
+      Queue queue, String user1, String user2)
+      throws Exception {
+    sendHeartBeatsWithAppUtil(
+        mkmap(
+            e(appId1, ResourceUtilization.newInstance(1, 1, 0.1f)),
+            e(appId2, ResourceUtilization.newInstance(1, 1, 0.1f))
+        ),
+        mkmap(
+            e(appId1, ResourceUtilization.newInstance(1, 1, 0.1f))
+        ),
+        mkmap(
+            e(appId3, ResourceUtilization.newInstance(1, 1, 0.1f))
+        ),
+        mkmap(
+            e(appId2, ResourceUtilization.newInstance(1, 1, 0.1f))
+        ));
+
+    ResourceUtilization app1Util = agg.getAppResourceUtilization(appId1);
+    ResourceUtilization app2Util = agg.getAppResourceUtilization(appId2);
+    ResourceUtilization app3Util = agg.getAppResourceUtilization(appId3);
+    ResourceUtilization user1Util = agg.getUserResourceUtilization(user1);
+    ResourceUtilization user2Util = agg.getUserResourceUtilization(user2);
+    ResourceUtilization queueUtil = agg.getQueueResourceUtilization(queue);
+
+    // All utilizations should decrease. Every heartbeat above reports the
+    // same value for pmem and vmem, so both aggregates must match.
+    // app1 = 1 (nm1) + 1 (nm2); app2 = 1 (nm1) + 1 (nm4); app3 = 1 (nm3)
+    assertEquals(2, app1Util.getPhysicalMemory());
+    assertEquals(2, app1Util.getVirtualMemory());
+    assertEquals(2, app2Util.getPhysicalMemory());
+    assertEquals(2, app2Util.getVirtualMemory());
+    assertEquals(1, app3Util.getPhysicalMemory());
+    assertEquals(1, app3Util.getVirtualMemory());
+    // user1 = app1; user2 = app2 + app3
+    assertEquals(2, user1Util.getPhysicalMemory());
+    assertEquals(2, user1Util.getVirtualMemory());
+    assertEquals(3, user2Util.getPhysicalMemory());
+    assertEquals(3, user2Util.getVirtualMemory());
+    // queue = app1 + app2 + app3
+    assertEquals(5, queueUtil.getPhysicalMemory());
+    assertEquals(5, queueUtil.getVirtualMemory());
+  }
+
+
+  /**
+   * Step 2 helper: resends heartbeats in which only appId1's utilization
+   * grows (from 1 to 2 on nm2), then verifies the aggregated physical AND
+   * virtual memory for each application, each user and the shared queue.
+   *
+   * The expected per-user totals assume user1 owns appId1 and user2 owns
+   * appId2 and appId3, consistent with the per-user totals asserted in
+   * Step 1 of the calling test.
+   *
+   * @param appId1 application reported by nm1 and nm2
+   * @param appId2 application reported by nm1 and nm4
+   * @param appId3 application reported by nm3
+   * @param queue queue that all three applications are bound to
+   * @param user1 user whose aggregate should equal appId1's aggregate
+   * @param user2 user whose aggregate should equal appId2 + appId3
+   * @throws Exception propagated from sending the heartbeats
+   */
+  private void checkAggAfterUtilIncrease(ApplicationId appId1,
+      ApplicationId appId2, ApplicationId appId3,
+      Queue queue, String user1, String user2)
+      throws Exception {
+    sendHeartBeatsWithAppUtil(
+        mkmap(
+            e(appId1, ResourceUtilization.newInstance(2, 2, 0.1f)),
+            e(appId2, ResourceUtilization.newInstance(3, 3, 0.3f))
+        ),
+        mkmap(
+            e(appId1, ResourceUtilization.newInstance(2, 2, 0.2f))
+        ),
+        mkmap(
+            e(appId3, ResourceUtilization.newInstance(5, 5, 0.5f))
+        ),
+        mkmap(
+            e(appId2, ResourceUtilization.newInstance(4, 4, 0.4f))
+        ));
+
+    ResourceUtilization app1Util = agg.getAppResourceUtilization(appId1);
+    ResourceUtilization app2Util = agg.getAppResourceUtilization(appId2);
+    ResourceUtilization app3Util = agg.getAppResourceUtilization(appId3);
+    ResourceUtilization user1Util = agg.getUserResourceUtilization(user1);
+    ResourceUtilization user2Util = agg.getUserResourceUtilization(user2);
+    ResourceUtilization queueUtil = agg.getQueueResourceUtilization(queue);
+
+    // App1, User1 and overall Queue utilization should increase;
+    // everything else should stay the same. Every heartbeat above reports
+    // the same value for pmem and vmem, so both aggregates must match.
+    // app1 = 2 (nm1) + 2 (nm2); app2 = 3 (nm1) + 4 (nm4); app3 = 5 (nm3)
+    assertEquals(4, app1Util.getPhysicalMemory());
+    assertEquals(4, app1Util.getVirtualMemory());
+    assertEquals(7, app2Util.getPhysicalMemory());
+    assertEquals(7, app2Util.getVirtualMemory());
+    assertEquals(5, app3Util.getPhysicalMemory());
+    assertEquals(5, app3Util.getVirtualMemory());
+    // user1 = app1; user2 = app2 + app3
+    assertEquals(4, user1Util.getPhysicalMemory());
+    assertEquals(4, user1Util.getVirtualMemory());
+    assertEquals(12, user2Util.getPhysicalMemory());
+    assertEquals(12, user2Util.getVirtualMemory());
+    // queue = app1 + app2 + app3
+    assertEquals(16, queueUtil.getPhysicalMemory());
+    assertEquals(16, queueUtil.getVirtualMemory());
+  }
+
+
+  /**
+   * Sends one heartbeat from each of nm1..nm4 carrying the given
+   * per-application utilization, waits for the resulting events to be
+   * processed, and then triggers an aggregation pass on {@code agg}.
+   *
+   * @param nm1AppUtil per-application utilization reported by nm1
+   * @param nm2AppUtil per-application utilization reported by nm2
+   * @param nm3AppUtil per-application utilization reported by nm3
+   * @param nm4AppUtil per-application utilization reported by nm4
+   * @throws Exception propagated from the heartbeat and event-draining calls
+   */
+  private void sendHeartBeatsWithAppUtil(
+      Map<ApplicationId, ResourceUtilization> nm1AppUtil,
+      Map<ApplicationId, ResourceUtilization> nm2AppUtil,
+      Map<ApplicationId, ResourceUtilization> nm3AppUtil,
+      Map<ApplicationId, ResourceUtilization> nm4AppUtil) throws Exception{
+    nm1.nodeHeartbeat(nm1AppUtil);
+    nm2.nodeHeartbeat(nm2AppUtil);
+    nm3.nodeHeartbeat(nm3AppUtil);
+    nm4.nodeHeartbeat(nm4AppUtil);
+
+    // Wait for scheduler to process all events
+    rm.drainEvents();
+    dispatcher.waitForEventThreadToWait();
+
+    // Aggregate only after the heartbeat events are drained, presumably so
+    // the aggregator reads the utilization just reported (confirm against
+    // ResourceUtilizationAggregator).
+    agg.kickoffAggregation();
+  }
+  /**
+   * Utility function to create a map from the given entries.
+   *
+   * <p>{@code @SafeVarargs} is sound here: the generic varargs array is only
+   * read (streamed), never stored or exposed, so no heap pollution can occur.
+   *
+   * @param es entries to put into the map; keys must be distinct, otherwise
+   *           {@link Collectors#toMap} throws {@link IllegalStateException}
+   * @return a new map containing every entry in {@code es}
+   */
+  @SafeVarargs
+  private static <K, V> Map<K, V> mkmap(AbstractMap.SimpleEntry<K, V>... es) {
+    return Stream.of(es).collect(
+        Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+  }
+
+  /**
+   * Utility function to create a map entry to me used by above function.
+   */
+  private static <K, V> AbstractMap.SimpleEntry<K, V> e(K key, V val) {
+    return new AbstractMap.SimpleEntry<>(key, val);
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to