Repository: hadoop Updated Branches: refs/heads/YARN-1011 bb5991423 -> 608f00998
YARN-8827. Plumb aggregated application resource utilization from the NM to RM. (asuresh) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/608f0099 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/608f0099 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/608f0099 Branch: refs/heads/YARN-1011 Commit: 608f009980b779857b3660a3bd6c70ee44738b8f Parents: bb59914 Author: Arun Suresh <asur...@apache.org> Authored: Tue Oct 9 21:09:50 2018 -0700 Committer: Arun Suresh <asur...@apache.org> Committed: Tue Oct 9 21:09:50 2018 -0700 ---------------------------------------------------------------------- .../hadoop/yarn/sls/nodemanager/NodeInfo.java | 6 + .../yarn/sls/scheduler/RMNodeWrapper.java | 6 + .../yarn/api/records/ResourceUtilization.java | 24 ++ .../yarn/server/api/records/NodeStatus.java | 19 + .../api/records/impl/pb/NodeStatusPBImpl.java | 59 +++ .../main/proto/yarn_server_common_protos.proto | 6 + .../nodemanager/NodeStatusUpdaterImpl.java | 9 + .../monitor/ContainersMonitor.java | 33 ++ .../monitor/ContainersMonitorImpl.java | 53 ++- .../server/resourcemanager/ResourceManager.java | 8 + .../ResourceUtilizationAggregator.java | 178 +++++++++ .../server/resourcemanager/rmnode/RMNode.java | 7 + .../resourcemanager/rmnode/RMNodeImpl.java | 26 +- .../rmnode/RMNodeStatusEvent.java | 7 +- .../yarn/server/resourcemanager/MockNM.java | 26 +- .../yarn/server/resourcemanager/MockNodes.java | 16 + .../TestResourceUtilizationAggregator.java | 357 +++++++++++++++++++ 17 files changed, 820 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/608f0099/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java index 350f4a3..716a1d8 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java @@ -199,6 +199,12 @@ public class NodeInfo { } @Override + public Map<ApplicationId, ResourceUtilization> + getAggregatedAppUtilizations() { + return null; + } + + @Override public ResourceUtilization getNodeUtilization() { return null; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/608f0099/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java index bb6fb9d..0e2a84e 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java @@ -225,6 +225,12 @@ public class RMNodeWrapper implements RMNode { } @Override + public Map<ApplicationId, ResourceUtilization> + getAggregatedAppUtilizations() { + return node.getAggregatedAppUtilizations(); + } + + @Override public Resource getPhysicalResource() { return null; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/608f0099/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceUtilization.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceUtilization.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceUtilization.java index 2ae4872..c340093 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceUtilization.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceUtilization.java @@ -53,6 +53,16 @@ public abstract class ResourceUtilization implements } /** + * Helper function to return a zero-ed out Utilization. + * @return New Resource Utilization. + */ + @Public + @Unstable + public static ResourceUtilization newZero() { + return newInstance(0, 0, 0.0f); + } + + /** * Get used <em>virtual memory</em>. * * @return <em>virtual memory</em> in MB @@ -157,6 +167,20 @@ public abstract class ResourceUtilization implements } /** + * Add utilization to the current one. + * @param resUtil Resource Utilization to add. + */ + @Public + @Unstable + public void addTo(ResourceUtilization resUtil) { + this.setPhysicalMemory( + this.getPhysicalMemory() + resUtil.getPhysicalMemory()); + this.setVirtualMemory( + this.getVirtualMemory() + resUtil.getVirtualMemory()); + this.setCPU(this.getCPU() + resUtil.getCPU()); + } + + /** * Subtract utilization from the current one. * @param pmem Physical memory to be subtracted. * @param vmem Virtual memory to be subtracted. http://git-wip-us.apache.org/repos/asf/hadoop/blob/608f0099/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java index 440cd0a..73ff1cd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.api.records; import java.util.List; +import java.util.Map; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Unstable; @@ -132,4 +133,22 @@ public abstract class NodeStatus { @Unstable public abstract void setOpportunisticContainersStatus( OpportunisticContainersStatus opportunisticContainersStatus); + + /** + * Set per application ResourceUtilization. + * @param applicationUtilizations Per application utilization map. + */ + @Private + @Unstable + public abstract void setApplicationUtilizations( + Map<ApplicationId, ResourceUtilization> applicationUtilizations); + + /** + * Get per application ResourceUtilization. + * @return Per application utilizations map. + */ + @Private + @Unstable + public abstract Map<ApplicationId, ResourceUtilization> + getApplicationUtilizations(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/608f0099/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java index 8aebc6f..fb76233 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java @@ -20,9 +20,11 @@ package org.apache.hadoop.yarn.server.api.records.impl.pb; import java.util.ArrayList; +import java.util.HashMap; import java.util.Iterator; import java.util.List; +import java.util.Map; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerStatus; @@ -38,6 +40,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto; import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto; +import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.AppResourceUtilizationProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeHealthStatusProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProtoOrBuilder; @@ -57,6 +60,7 @@ public class NodeStatusPBImpl extends NodeStatus { private NodeHealthStatus nodeHealthStatus = null; private List<ApplicationId> keepAliveApplications = null; private List<Container> increasedContainers = null; + private Map<ApplicationId, ResourceUtilization> appUtilizations = null; public NodeStatusPBImpl() { builder = NodeStatusProto.newBuilder(); @@ -90,6 +94,9 @@ public class NodeStatusPBImpl extends NodeStatus { if (this.increasedContainers != null) { addIncreasedContainersToProto(); } + if (this.appUtilizations != null) { + addAppUtilizations(); + } } private synchronized void mergeLocalToProto() { @@ -107,6 +114,26 @@ public class NodeStatusPBImpl extends NodeStatus { } viaProto = false; } + + private void addAppUtilizations() { + maybeInitBuilder(); + builder.clearApplicationUtilizations(); + if (this.appUtilizations == null) { + return; + } + List<AppResourceUtilizationProto> protoList = + new ArrayList<>(); + + for (Map.Entry<ApplicationId, ResourceUtilization> entry : + this.appUtilizations.entrySet()) { + ApplicationId appId = entry.getKey(); + ResourceUtilization resU = entry.getValue(); + protoList.add(AppResourceUtilizationProto.newBuilder() + .setApplicationId(convertToProtoFormat(appId)) + .setUtilization(convertToProtoFormat(resU)).build()); + } + builder.addAllApplicationUtilizations(protoList); + } private synchronized void addContainersToProto() { maybeInitBuilder(); @@ -425,6 +452,38 @@ public class NodeStatusPBImpl extends NodeStatus { convertToProtoFormat(opportunisticContainersStatus)); } + private synchronized void initAppUtilizations() { + if (this.appUtilizations != null) { + return; + } + NodeStatusProtoOrBuilder p = viaProto ? proto : builder; + List<AppResourceUtilizationProto> protoList = + p.getApplicationUtilizationsList(); + this.appUtilizations = new HashMap<>(); + for (AppResourceUtilizationProto au : protoList) { + ApplicationId appId = convertFromProtoFormat(au.getApplicationId()); + ResourceUtilization resU = convertFromProtoFormat(au.getUtilization()); + this.appUtilizations.put(appId, resU); + } + } + + @Override + public synchronized void setApplicationUtilizations( + Map<ApplicationId, ResourceUtilization> applicationUtilizations) { + maybeInitBuilder(); + if (applicationUtilizations == null) { + builder.clearApplicationUtilizations(); + } + this.appUtilizations = applicationUtilizations; + } + + @Override + public synchronized Map<ApplicationId, ResourceUtilization> + getApplicationUtilizations() { + initAppUtilizations(); + return this.appUtilizations; + } + private NodeIdProto convertToProtoFormat(NodeId nodeId) { return ((NodeIdPBImpl)nodeId).getProto(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/608f0099/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto index 8200808..30a1e29 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto @@ -40,6 +40,12 @@ message NodeStatusProto { optional ResourceUtilizationProto node_utilization = 7; repeated ContainerProto increased_containers = 8; optional OpportunisticContainersStatusProto opportunistic_containers_status = 9; + repeated AppResourceUtilizationProto application_utilizations = 11; +} + +message AppResourceUtilizationProto { + required ApplicationIdProto application_id = 1; + optional ResourceUtilizationProto utilization = 3; } message OpportunisticContainersStatusProto { http://git-wip-us.apache.org/repos/asf/hadoop/blob/608f0099/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index d757376..e09fe08 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -514,6 +514,8 @@ public class NodeStatusUpdaterImpl extends AbstractService implements List<ContainerStatus> containersStatuses = getContainerStatuses(); ResourceUtilization containersUtilization = getContainersUtilization(); ResourceUtilization nodeUtilization = getNodeUtilization(); + Map<ApplicationId, ResourceUtilization> appUtilizations = + getAppUtilizations(); List<org.apache.hadoop.yarn.api.records.Container> increasedContainers = getIncreasedContainers(); NodeStatus nodeStatus = @@ -523,6 +525,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements nodeStatus.setOpportunisticContainersStatus( getOpportunisticContainersStatus()); + nodeStatus.setApplicationUtilizations(appUtilizations); return nodeStatus; } @@ -546,6 +549,12 @@ public class NodeStatusUpdaterImpl extends AbstractService implements return containersMonitor.getContainersUtilization(false).getUtilization(); } + private Map<ApplicationId, ResourceUtilization> getAppUtilizations() { + ContainersMonitor containersMonitor = + this.context.getContainerManager().getContainersMonitor(); + return containersMonitor.getAppUtilizations(false).getUtilizations(); + } + /** * Get the utilization of the node. This includes the containers. * @return Resource utilization of the node. http://git-wip-us.apache.org/repos/asf/hadoop/blob/608f0099/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitor.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitor.java index 8da4ec4..c2193ec 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitor.java @@ -18,7 +18,9 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor; +import java.util.Map; import org.apache.hadoop.service.Service; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceUtilization; import org.apache.hadoop.yarn.event.EventHandler; @@ -37,6 +39,14 @@ public interface ContainersMonitor extends Service, ContainersResourceUtilization getContainersUtilization(boolean latest); /** + * Get the per app aggregate resource utilization of containers running on + * the node. + * @param latest true if the latest result should be returned. + * @return AppResourceUtilization per app resource utilization. + */ + AppResourceUtilizations getAppUtilizations(boolean latest); + + /** * Get the policy to over-allocate containers when over-allocation is on. * @return null if over-allocation is turned off */ @@ -102,4 +112,27 @@ public interface ContainersMonitor extends Service, return utilization; } } + + /** + * A snapshot of resource utilization of all containers with the timestamp. + */ + final class AppResourceUtilizations { + private final Map<ApplicationId, ResourceUtilization> utilizations; + private final long timestamp; + + public AppResourceUtilizations( + Map<ApplicationId, ResourceUtilization> utilizations, + long timestamp) { + this.utilizations = utilizations; + this.timestamp = timestamp; + } + + public long getTimestamp() { + return timestamp; + } + + public Map<ApplicationId, ResourceUtilization> getUtilizations() { + return utilizations; + } + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/608f0099/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java index 8bd4c47..2a561ba 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java @@ -20,7 +20,9 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import java.util.HashMap; import org.apache.hadoop.util.Time; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.CGroupElasticMemoryController; import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.MemoryResourceHandler; @@ -122,6 +124,7 @@ public class ContainersMonitorImpl extends AbstractService implements } private ContainersResourceUtilization latestContainersUtilization; + private AppResourceUtilizations latestAppUtilizations; private NMAllocationPolicy overAllocationPolicy; private ResourceThresholds overAllocationPreemptionThresholds; @@ -622,6 +625,9 @@ public class ContainersMonitorImpl extends AbstractService implements long vmemUsageByAllContainers = 0; long pmemByAllContainers = 0; long cpuUsagePercentPerCoreByAllContainers = 0; + + Map<ApplicationId, ResourceUtilization> perAppResourceUtilization = + new HashMap<>(); for (Entry<ContainerId, ProcessTreeInfo> entry : trackingContainers .entrySet()) { ContainerId containerId = entry.getKey(); @@ -656,8 +662,9 @@ public class ContainersMonitorImpl extends AbstractService implements continue; } - recordUsage(containerId, pId, pTree, ptInfo, currentVmemUsage, - currentPmemUsage, trackedContainersUtilization); + recordUsage(pTree, ptInfo, currentVmemUsage, + currentPmemUsage, trackedContainersUtilization, + perAppResourceUtilization); checkLimit(containerId, pId, pTree, ptInfo, currentVmemUsage, currentPmemUsage); @@ -686,6 +693,8 @@ public class ContainersMonitorImpl extends AbstractService implements // Save the aggregated utilization of the containers setLatestContainersUtilization(trackedContainersUtilization); + // Save the aggregated app utilizations + setLatestAppUtilizations(perAppResourceUtilization); // check opportunity to start containers if over-allocation is on checkUtilization(); @@ -769,19 +778,19 @@ public class ContainersMonitorImpl extends AbstractService implements /** * Record usage metrics. - * @param containerId container id - * @param pId process id * @param pTree valid process tree entry with CPU measurement * @param ptInfo process tree info with limit information * @param currentVmemUsage virtual memory measurement * @param currentPmemUsage physical memory measurement * @param trackedContainersUtilization utilization tracker to update */ - private void recordUsage(ContainerId containerId, String pId, - ResourceCalculatorProcessTree pTree, - ProcessTreeInfo ptInfo, - long currentVmemUsage, long currentPmemUsage, - ResourceUtilization trackedContainersUtilization) { + private void recordUsage(ResourceCalculatorProcessTree pTree, + ProcessTreeInfo ptInfo, + long currentVmemUsage, long currentPmemUsage, + ResourceUtilization trackedContainersUtilization, + Map<ApplicationId, ResourceUtilization> perAppUtil) { + ContainerId containerId = ptInfo.getContainerId(); + String pId =ptInfo.getPID(); // if machine has 6 cores and 3 are used, // cpuUsagePercentPerCore should be 300% and // cpuUsageTotalCoresPercentage should be 50% @@ -806,12 +815,21 @@ public class ContainersMonitorImpl extends AbstractService implements cpuUsageTotalCoresPercentage)); } - // Add resource utilization for this container - trackedContainersUtilization.addTo( + ResourceUtilization currResUtil = + ResourceUtilization.newInstance( (int) (currentPmemUsage >> 20), (int) (currentVmemUsage >> 20), milliVcoresUsed / 1000.0f); + // Add resource utilization for this container + trackedContainersUtilization.addTo(currResUtil); + + ResourceUtilization appUtil = + perAppUtil.computeIfAbsent( + containerId.getApplicationAttemptId().getApplicationId(), + (x -> ResourceUtilization.newInstance(0, 0, 0.0f))); + appUtil.addTo(currResUtil); + // Add usage to container metrics if (containerMetricsEnabled) { ContainerMetrics.forContainer( @@ -1085,6 +1103,13 @@ public class ContainersMonitorImpl extends AbstractService implements } @Override + public AppResourceUtilizations getAppUtilizations(boolean latest) { + // TODO If latest is true, kickoff an immediate app utilization + // and return value. + return this.latestAppUtilizations; + } + + @Override public NMAllocationPolicy getContainerOverAllocationPolicy() { return overAllocationPolicy; } @@ -1102,6 +1127,12 @@ public class ContainersMonitorImpl extends AbstractService implements } } + private void setLatestAppUtilizations( + Map<ApplicationId, ResourceUtilization> appUtilization) { + this.latestAppUtilizations = new AppResourceUtilizations( + appUtilization, Time.now()); + } + /** * Check the resource utilization of the node. If the utilization is below * the over-allocation threshold, {@link ContainerScheduler} is notified to http://git-wip-us.apache.org/repos/asf/hadoop/blob/608f0099/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index 16f019f..f8e447d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -206,6 +206,7 @@ public class ResourceManager extends CompositeService private final String zkRootNodePassword = Long.toString(new SecureRandom().nextLong()); private boolean recoveryEnabled; + private ResourceUtilizationAggregator resUtilAggregator; @VisibleForTesting protected String webAppAddress; @@ -344,6 +345,8 @@ public class ResourceManager extends CompositeService rmContext.setSystemMetricsPublisher(systemMetricsPublisher); registerMXBean(); + this.resUtilAggregator = new ResourceUtilizationAggregator(rmContext); + addService(this.resUtilAggregator); super.serviceInit(this.conf); } @@ -1645,4 +1648,9 @@ public class ResourceManager extends CompositeService public boolean isSecurityEnabled() { return UserGroupInformation.isSecurityEnabled(); } + + @VisibleForTesting + ResourceUtilizationAggregator getResUtilizationAggregator() { + return resUtilAggregator; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/608f0099/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceUtilizationAggregator.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceUtilizationAggregator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceUtilizationAggregator.java new file mode 100644 index 0000000..fb98d5f --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceUtilizationAggregator.java @@ -0,0 +1,178 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager; + +import com.google.common.annotations.VisibleForTesting; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.util.concurrent.HadoopScheduledThreadPoolExecutor; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.ResourceUtilization; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * This computes a snapshot of aggregated actual resource utilization across + * Applications, Users and Queues. Queue aggregation will be performed + * only at the LeafQueue level. + * The snapshot calculation interval is set to the Node heartbeat interval. + * It is assumed that all nodes would have heartbeat-ed to the RM in that + * interval. + */ +public class ResourceUtilizationAggregator extends AbstractService { + + private static final Logger LOG = + LoggerFactory.getLogger(ResourceUtilizationAggregator.class); + + private static final Function<Object, ResourceUtilization> RU_GENERATOR = + (x -> ResourceUtilization.newZero()); + + private final RMContext rmContext; + private final ScheduledExecutorService scheduledExecutor; + private volatile Map<ApplicationId, ResourceUtilization> + stalePerAppUtilization = new HashMap<>(); + private volatile Map<String, ResourceUtilization> stalePerUserUtilization = + new HashMap<>(); + private volatile Map<Queue, ResourceUtilization> stalePerQueueUtilization = + new HashMap<>(); + + private AggregationTask aggTask = null; + + final class AggregationTask implements Runnable { + @Override + public void run() { + ConcurrentMap<NodeId, RMNode> rmNodes = rmContext.getRMNodes(); + Map<ApplicationId, ResourceUtilization> perAppUtilization = + new HashMap<>(); + Map<String, ResourceUtilization> perUserUtilization = + new HashMap<>(); + Map<Queue, ResourceUtilization> perQueueUtilization = + new HashMap<>(); + rmNodes.values().stream() + .filter(n -> !n.getState().isUnusable()) + .forEach(rmNode -> { + Map<ApplicationId, ResourceUtilization> aggAppUtilizations = + rmNode.getAggregatedAppUtilizations(); + if (aggAppUtilizations != null) { + aggAppUtilizations.forEach((appId, appResUtilPerNode) -> { + RMApp rmApp = rmContext.getRMApps().get(appId); + if (rmApp != null) { + SchedulerApplicationAttempt appAttempt = + ((AbstractYarnScheduler) rmContext.getScheduler()) + .getApplicationAttempt( + rmApp.getCurrentAppAttempt().getAppAttemptId()); + if (appAttempt != null) { + Queue queue = appAttempt.getQueue(); + perQueueUtilization.computeIfAbsent(queue, RU_GENERATOR) + .addTo(appResUtilPerNode); + perAppUtilization.computeIfAbsent(appId, RU_GENERATOR) + .addTo(appResUtilPerNode); + String user = appAttempt.getUser(); + if (user != null) { + perUserUtilization.computeIfAbsent(user, RU_GENERATOR) + .addTo(appResUtilPerNode); + } else { + LOG.warn("No user found for application attempt [{}]!!", + appAttempt.getApplicationAttemptId()); + } + } else { + LOG.warn("No App Attempt for application [{}]!!", appId); + } + } else { + LOG.warn("Invalid Application [{}] received !!", appId); + } + }); + } + }); + stalePerAppUtilization = perAppUtilization; + stalePerQueueUtilization = perQueueUtilization; + stalePerUserUtilization = perUserUtilization; + } + } + + /** + * Construct the service. + */ + public ResourceUtilizationAggregator(RMContext rmContext) { + super("Resource Utilization Aggregator"); + this.rmContext = rmContext; + this.scheduledExecutor = new HadoopScheduledThreadPoolExecutor(1); + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + super.serviceInit(conf); + long aggInterval = conf.getLong( + YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS, + YarnConfiguration.DEFAULT_RM_NM_HEARTBEAT_INTERVAL_MS); + this.aggTask = new AggregationTask(); + this.scheduledExecutor.scheduleAtFixedRate(aggTask, + aggInterval, aggInterval, TimeUnit.MILLISECONDS); + } + + @VisibleForTesting + void kickoffAggregation() { + this.aggTask.run(); + } + + /** + * Return aggregated Resource Utilization for the User. + * @param user User. + * @return Resource Utilization. + */ + public ResourceUtilization getUserResourceUtilization(String user) { + return stalePerUserUtilization.computeIfAbsent(user, RU_GENERATOR); + } + + /** + * Return aggregated Resource Utilization for the Queue. Currently, + * user is expected to provide the Leaf Queue. Aggregation across + * the queue hierarchy is not supported since queue traversal is + * not consistent across schedulers. + * @param queue Queue. + * @return Resource Utilization. + */ + public ResourceUtilization getQueueResourceUtilization(Queue queue) { + return stalePerQueueUtilization.computeIfAbsent(queue, RU_GENERATOR); + } + + /** + * Return aggregated Resource Utilization for the application. + * @param applicationId Application Id. + * @return Resource Utilization. + */ + public ResourceUtilization getAppResourceUtilization( + ApplicationId applicationId) { + return stalePerAppUtilization.computeIfAbsent(applicationId, RU_GENERATOR); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/608f0099/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java index aa19483..024160d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java @@ -112,6 +112,13 @@ public interface RMNode { public ResourceUtilization getAggregatedContainersUtilization(); /** + * the per app aggregated utilization of the containers running + * on the node. + * @return the aggregated per-app container utilzation; + */ + Map<ApplicationId, ResourceUtilization> getAggregatedAppUtilizations(); + + /** * the total resource utilization of the node. * @return the total resource utilization of the node. */ http://git-wip-us.apache.org/repos/asf/hadoop/blob/608f0099/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java index 0e32f1e..de32e99 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java @@ -138,6 +138,8 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> { private ResourceUtilization containersUtilization; /* Resource utilization for the node. */ private ResourceUtilization nodeUtilization; + /* Per app aggregate utilization. */ + private Map<ApplicationId, ResourceUtilization> appUtilizations; /** Physical resources in the node. */ private volatile Resource physicalResource; @@ -508,9 +510,29 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> { } @Override - public ResourceUtilization getAggregatedContainersUtilization() { + public Map<ApplicationId, ResourceUtilization> + getAggregatedAppUtilizations() { this.readLock.lock(); + try { + return this.appUtilizations; + } finally { + this.readLock.unlock(); + } + } + + public void setAggregatedAppUtilizations( + Map<ApplicationId, ResourceUtilization> appUtils) { + this.writeLock.lock(); + try { + this.appUtilizations = appUtils; + } finally { + this.writeLock.unlock(); + } + } + @Override + public ResourceUtilization getAggregatedContainersUtilization() { + this.readLock.lock(); try { return this.containersUtilization; } finally { @@ -830,6 +852,8 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> { rmNode.setAggregatedContainersUtilization(statusEvent .getAggregatedContainersUtilization()); rmNode.setNodeUtilization(statusEvent.getNodeUtilization()); + rmNode.setAggregatedAppUtilizations( + statusEvent.getAggregateAppUtilization()); return remoteNodeHealthStatus; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/608f0099/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java index 5f5fe24..d61bc99 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java @@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmnode; import java.util.Collections; import java.util.List; +import java.util.Map; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerStatus; @@ -63,6 +64,10 @@ public class RMNodeStatusEvent extends RMNodeEvent { return this.nodeStatus.getContainersUtilization(); } + public Map<ApplicationId, ResourceUtilization> getAggregateAppUtilization() { + return this.nodeStatus.getApplicationUtilizations(); + } + public ResourceUtilization getNodeUtilization() { return this.nodeStatus.getNodeUtilization(); } @@ -79,7 +84,7 @@ public class RMNodeStatusEvent extends RMNodeEvent { List<LogAggregationReport> logAggregationReportsForApps) { this.logAggregationReportsForApps = logAggregationReportsForApps; } - + public List<Container> getNMReportedIncreasedContainers() { return this.nodeStatus.getIncreasedContainers() == null ? Collections.emptyList() : this.nodeStatus.getIncreasedContainers(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/608f0099/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java index 2e28395..6a0d209 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java @@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeLabel; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceUtilization; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest; @@ -141,7 +142,7 @@ public class MockNM { container.getResource()); List<Container> increasedConts = Collections.singletonList(container); nodeHeartbeat(Collections.singletonList(containerStatus), increasedConts, - true, responseId); + true, responseId, null); } public void addRegisteringCollector(ApplicationId appId, @@ -211,7 +212,13 @@ public class MockNM { public NodeHeartbeatResponse nodeHeartbeat(boolean isHealthy) throws Exception { return nodeHeartbeat(Collections.<ContainerStatus>emptyList(), - Collections.<Container>emptyList(), isHealthy, responseId); + Collections.<Container>emptyList(), isHealthy, responseId, null); + } + + public NodeHeartbeatResponse nodeHeartbeat( + Map<ApplicationId, ResourceUtilization> appUtil) throws Exception { + return nodeHeartbeat(Collections.<ContainerStatus>emptyList(), + Collections.<Container>emptyList(), true, responseId, appUtil); } public NodeHeartbeatResponse nodeHeartbeat(ApplicationAttemptId attemptId, @@ -224,7 +231,7 @@ public class MockNM { containerStatusList.add(containerStatus); Log.getLog().info("ContainerStatus: " + containerStatus); return nodeHeartbeat(containerStatusList, - Collections.<Container>emptyList(), true, responseId); + Collections.<Container>emptyList(), true, responseId, null); } public NodeHeartbeatResponse nodeHeartbeat(Map<ApplicationId, @@ -239,18 +246,18 @@ public class MockNM { updatedStats.addAll(stats); } return nodeHeartbeat(updatedStats, Collections.<Container>emptyList(), - isHealthy, resId); + isHealthy, resId, null); } public NodeHeartbeatResponse nodeHeartbeat( List<ContainerStatus> updatedStats, boolean isHealthy) throws Exception { return nodeHeartbeat(updatedStats, Collections.<Container>emptyList(), - isHealthy, responseId); + isHealthy, responseId, null); } public NodeHeartbeatResponse nodeHeartbeat(List<ContainerStatus> updatedStats, - List<Container> increasedConts, boolean isHealthy, int resId) - throws Exception { + List<Container> increasedConts, boolean isHealthy, int resId, + Map<ApplicationId, ResourceUtilization> appUtil) throws Exception { NodeHeartbeatRequest req = Records.newRecord(NodeHeartbeatRequest.class); NodeStatus status = Records.newRecord(NodeStatus.class); status.setResponseId(resId); @@ -273,6 +280,11 @@ public class MockNM { healthStatus.setIsNodeHealthy(isHealthy); healthStatus.setLastHealthReportTime(1); status.setNodeHealthStatus(healthStatus); + + if (appUtil != null && !appUtil.isEmpty()) { + status.setApplicationUtilizations(appUtil); + } + req.setNodeStatus(status); req.setLastKnownContainerTokenMasterKey(this.currentContainerTokenMasterKey); req.setLastKnownNMTokenMasterKey(this.currentNMTokenMasterKey); http://git-wip-us.apache.org/repos/asf/hadoop/blob/608f0099/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java index d841ff0..d7edb1b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java @@ -121,6 +121,7 @@ public class MockNodes { private Set<String> labels; private ResourceUtilization containersUtilization; private ResourceUtilization nodeUtilization; + private Map<ApplicationId, ResourceUtilization> appUtilization; private Resource physicalResource; private OverAllocationInfo overAllocationInfo; private List<UpdatedContainerInfo> containerUpdates = @@ -330,13 +331,28 @@ public class MockNodes { return this.physicalResource; } + @Override + public Map<ApplicationId, ResourceUtilization> + getAggregatedAppUtilizations() { + return this.appUtilization; + } + public void updateContainersInfoAndUtilization( UpdatedContainerInfo updatedContainerInfo, ResourceUtilization resourceUtilization) { + updateContainersInfoAndUtilization( + updatedContainerInfo, resourceUtilization, null); + } + + public void updateContainersInfoAndUtilization( + UpdatedContainerInfo updatedContainerInfo, + ResourceUtilization resourceUtilization, + Map<ApplicationId, ResourceUtilization> appUtils) { if (updatedContainerInfo != null) { containerUpdates = Collections.singletonList(updatedContainerInfo); } this.containersUtilization = resourceUtilization; + this.appUtilization = appUtils; } }; http://git-wip-us.apache.org/repos/asf/hadoop/blob/608f0099/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceUtilizationAggregator.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceUtilizationAggregator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceUtilizationAggregator.java new file mode 100644 index 0000000..f2fb75d --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceUtilizationAggregator.java @@ -0,0 +1,357 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager; + +import java.util.AbstractMap; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ExecutionType; +import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.ResourceUtilization; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.event.DrainDispatcher; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; +import org.apache.hadoop.yarn.util.resource.Resources; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import static org.junit.Assert.*; + + +/** + * Test ResourceUtilizationAggregator functionality. + */ +public class TestResourceUtilizationAggregator { + + private static final int GB = 1024; + + private MockRM rm; + private DrainDispatcher dispatcher; + private MockNM nm1, nm2, nm3, nm4; + private ResourceUtilizationAggregator agg; + + @Before + public void createAndStartRM() throws Exception { + CapacitySchedulerConfiguration csConf = + new CapacitySchedulerConfiguration(); + YarnConfiguration conf = new YarnConfiguration(csConf); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + conf.setLong(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS, 100); + startRM(conf); + + nm1 = new MockNM("h1:1234", 8 * GB, rm.getResourceTrackerService()); + nm2 = new MockNM("h1:4321", 8 * GB, rm.getResourceTrackerService()); + nm3 = new MockNM("h2:1234", 8 * GB, rm.getResourceTrackerService()); + nm4 = new MockNM("h2:4321", 8 * GB, rm.getResourceTrackerService()); + nm1.registerNode(); + nm2.registerNode(); + nm3.registerNode(); + nm4.registerNode(); + + agg = rm.getRMContext().getResourceManager().getResUtilizationAggregator(); + } + + private void startRM(final YarnConfiguration conf) { + dispatcher = new DrainDispatcher(); + rm = new MockRM(conf) { + @Override + protected Dispatcher createDispatcher() { + return dispatcher; + } + }; + rm.start(); + } + + @After + public void stopRM() { + if (rm != null) { + rm.stop(); + } + } + + /** + * Check if Resource Utilization Aggregation works correctly. + * Start 3 Apps across 4 nodes : 2 apps by 'user1' and 1 by 'user2' + * .. but all on the same queue. + * + * Step 1: Send Node Heartbeats with App Resource Utilization. + * Ensure the Resource utilization is correctly aggregated across + * apps, users and queues. + * + * Step 2: Resend Node Heartbeats with Increase in one App's Utilization + * Ensure the Resource utilization is correctly aggregated across + * apps, users and queues. + * + * Step 3: Resend Node Heatbeats with Decrease in utilization across + * all app. Ensure the Resource utilization is correctly aggregated across + * apps, users and queues. + * + * @throws Exception + */ + @Test(timeout = 60000) + public void testResourceUtilizationAggregation() throws Exception { + + RMApp app1 = rm.submitApp(1 * GB, "app1", "user1", null, "default"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2); + + RMApp app2 = rm.submitApp(1 * GB, "app2", "user2", null, "default"); + MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, nm3); + + RMApp app3 = rm.submitApp(1 * GB, "app3", "user2", null, "default"); + MockAM am3 = MockRM.launchAndRegisterAM(app3, rm, nm4); + + am1.allocate( + Arrays.asList(ResourceRequest.newInstance(Priority.newInstance(1), + "*", Resources.createResource(1 * GB), 1, true, null, + ExecutionTypeRequest.newInstance( + ExecutionType.GUARANTEED, true))), + null); + + am2.allocate( + Arrays.asList(ResourceRequest.newInstance(Priority.newInstance(1), + "*", Resources.createResource(1 * GB), 1, true, null, + ExecutionTypeRequest.newInstance( + ExecutionType.GUARANTEED, true))), + null); + + am3.allocate( + Arrays.asList(ResourceRequest.newInstance(Priority.newInstance(1), + "*", Resources.createResource(1 * GB), 1, true, null, + ExecutionTypeRequest.newInstance( + ExecutionType.GUARANTEED, true))), + null); + + nm1.nodeHeartbeat(true); + nm2.nodeHeartbeat(true); + nm3.nodeHeartbeat(true); + nm4.nodeHeartbeat(true); + rm.drainEvents(); + dispatcher.waitForEventThreadToWait(); + + AllocateResponse alloc1 = am1.allocate( + new ArrayList<>(), new ArrayList<>()); + assertEquals(1, alloc1.getAllocatedContainers().size()); + + AllocateResponse alloc2 = am2.allocate( + new ArrayList<>(), new ArrayList<>()); + assertEquals(1, alloc2.getAllocatedContainers().size()); + + AllocateResponse alloc3 = am3.allocate( + new ArrayList<>(), new ArrayList<>()); + assertEquals(1, alloc3.getAllocatedContainers().size()); + + ApplicationId appId1 = app1.getApplicationId(); + ApplicationId appId2 = app2.getApplicationId(); + ApplicationId appId3 = app3.getApplicationId(); + + AbstractYarnScheduler sched = + (AbstractYarnScheduler) rm.getRMContext().getScheduler(); + SchedulerApplicationAttempt appAttempt1 = + sched.getApplicationAttempt( + app1.getCurrentAppAttempt().getAppAttemptId()); + SchedulerApplicationAttempt appAttempt2 = + sched.getApplicationAttempt( + app2.getCurrentAppAttempt().getAppAttemptId()); + + // START Step 1 ========> + // Send Node Heartbeats with App Resource Utilization. + // Ensure the Resource utilization is correctly aggregated across + // apps, users and queues. + sendHeartBeatsWithAppUtil( + mkmap( + e(appId1, ResourceUtilization.newInstance(1, 1, 0.1f)), + e(appId2, ResourceUtilization.newInstance(3, 3, 0.3f)) + ), + mkmap( + e(appId1, ResourceUtilization.newInstance(2, 2, 0.2f)) + ), + mkmap( + e(appId3, ResourceUtilization.newInstance(5, 5, 0.5f)) + ), + mkmap( + e(appId2, ResourceUtilization.newInstance(4, 4, 0.4f)) + )); + + ResourceUtilization aRU1 = + agg.getAppResourceUtilization(app1.getApplicationId()); + ResourceUtilization aRU2 = + agg.getAppResourceUtilization(app2.getApplicationId()); + ResourceUtilization aRU3 = + agg.getAppResourceUtilization(app3.getApplicationId()); + + ResourceUtilization uRU1 = agg.getUserResourceUtilization("user1"); + ResourceUtilization uRU2 = agg.getUserResourceUtilization("user2"); + + // Check aggregated utilization across nodes for + // each application + assertEquals(3, aRU1.getPhysicalMemory()); + assertEquals(3, aRU1.getVirtualMemory()); + assertEquals(7, aRU2.getPhysicalMemory()); + assertEquals(7, aRU2.getVirtualMemory()); + assertEquals(5, aRU3.getPhysicalMemory()); + assertEquals(5, aRU3.getVirtualMemory()); + + // Check aggregated utilization across nodes for + // each user + assertEquals(3, uRU1.getPhysicalMemory()); + assertEquals(3, uRU1.getVirtualMemory()); + assertEquals(12, uRU2.getPhysicalMemory()); + assertEquals(12, uRU2.getVirtualMemory()); + + assertEquals(appAttempt1.getQueue(), appAttempt2.getQueue()); + + // All three applications are bound to the same queue, + // so the queue utilization should be the total aggregate.. + ResourceUtilization qRU = + agg.getQueueResourceUtilization(appAttempt1.getQueue()); + assertEquals(15, qRU.getPhysicalMemory()); + assertEquals(15, qRU.getVirtualMemory()); + // <======== END Step 1 + + Queue queue = appAttempt1.getQueue(); + // Step 2: Resend Node Heartbeats with Increase in one App's Utilization + // Ensure the Resource utilization is correctly aggregated across + // apps, users and queues. + checkAggAfterUtilIncrease(appId1, appId2, appId3, queue, "user1", "user2"); + + // Step 3: Resend Node Heatbeats with Decrease in utilization across + // all app. Ensure the Resource utilization is correctly aggregated across + // apps, users and queues. + checkAggAfterUtilDecrease(appId1, appId2, appId3, queue, "user1", "user2"); + } + + + private void checkAggAfterUtilDecrease(ApplicationId appId1, + ApplicationId appId2, ApplicationId appId3, + Queue queue, String user1, String user2) + throws Exception { + sendHeartBeatsWithAppUtil( + mkmap( + e(appId1, ResourceUtilization.newInstance(1, 1, 0.1f)), + e(appId2, ResourceUtilization.newInstance(1, 1, 0.1f)) + ), + mkmap( + e(appId1, ResourceUtilization.newInstance(1, 1, 0.1f)) + ), + mkmap( + e(appId3, ResourceUtilization.newInstance(1, 1, 0.1f)) + ), + mkmap( + e(appId2, ResourceUtilization.newInstance(1, 1, 0.1f)) + )); + + // All Utilizations should decrease.. + assertEquals(2, + agg.getAppResourceUtilization(appId1).getPhysicalMemory()); + assertEquals(2, + agg.getAppResourceUtilization(appId2).getPhysicalMemory()); + assertEquals(1, + agg.getAppResourceUtilization(appId3).getPhysicalMemory()); + assertEquals(2, + agg.getUserResourceUtilization(user1).getPhysicalMemory()); + assertEquals(3, + agg.getUserResourceUtilization(user2).getPhysicalMemory()); + assertEquals(5, + agg.getQueueResourceUtilization(queue).getPhysicalMemory()); + } + + + private void checkAggAfterUtilIncrease(ApplicationId appId1, + ApplicationId appId2, ApplicationId appId3, + Queue queue, String user1, String user2) + throws Exception { + sendHeartBeatsWithAppUtil( + mkmap( + e(appId1, ResourceUtilization.newInstance(2, 2, 0.1f)), + e(appId2, ResourceUtilization.newInstance(3, 3, 0.3f)) + ), + mkmap( + e(appId1, ResourceUtilization.newInstance(2, 2, 0.2f)) + ), + mkmap( + e(appId3, ResourceUtilization.newInstance(5, 5, 0.5f)) + ), + mkmap( + e(appId2, ResourceUtilization.newInstance(4, 4, 0.4f)) + )); + + // App1, User1 and overall Queue utilization should increase + // Everything else should stay the same.. + assertEquals(4, + agg.getAppResourceUtilization(appId1).getPhysicalMemory()); + assertEquals(7, + agg.getAppResourceUtilization(appId2).getPhysicalMemory()); + assertEquals(5, + agg.getAppResourceUtilization(appId3).getPhysicalMemory()); + assertEquals(4, + agg.getUserResourceUtilization(user1).getPhysicalMemory()); + assertEquals(12, + agg.getUserResourceUtilization(user2).getPhysicalMemory()); + assertEquals(16, + agg.getQueueResourceUtilization(queue).getPhysicalMemory()); + } + + + private void sendHeartBeatsWithAppUtil( + Map<ApplicationId, ResourceUtilization> nm1AppUtil, + Map<ApplicationId, ResourceUtilization> nm2AppUtil, + Map<ApplicationId, ResourceUtilization> nm3AppUtil, + Map<ApplicationId, ResourceUtilization> nm4AppUtil) throws Exception{ + nm1.nodeHeartbeat(nm1AppUtil); + nm2.nodeHeartbeat(nm2AppUtil); + nm3.nodeHeartbeat(nm3AppUtil); + nm4.nodeHeartbeat(nm4AppUtil); + + // Wait for scheduler to process all events + rm.drainEvents(); + dispatcher.waitForEventThreadToWait(); + + agg.kickoffAggregation(); + } + /** + * Utility function to create a map. + */ + private static <K, V> Map<K, V> mkmap(AbstractMap.SimpleEntry<K, V>... es) { + return Stream.of(es).collect( + Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + } + + /** + * Utility function to create a map entry to me used by above function. + */ + private static <K, V> AbstractMap.SimpleEntry<K, V> e(K key, V val) { + return new AbstractMap.SimpleEntry<>(key, val); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org