Repository: hadoop
Updated Branches:
  refs/heads/YARN-2928 bc698197c -> bcbb52d8a


YARN-4712. CPU Usage Metric is not captured properly in YARN-2928. 
(Naganarasimha G R via Varun Saxena)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/bcbb52d8
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/bcbb52d8
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/bcbb52d8

Branch: refs/heads/YARN-2928
Commit: bcbb52d8a8397ef9fbf5dd69447ad5c1c95e4364
Parents: bc69819
Author: Varun Saxena <varunsax...@apache.org>
Authored: Fri Mar 18 23:19:18 2016 +0530
Committer: Varun Saxena <varunsax...@apache.org>
Committed: Fri Mar 18 23:19:18 2016 +0530

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |   3 +
 .../monitor/ContainersMonitorImpl.java          |   9 +-
 .../timelineservice/NMTimelinePublisher.java    |  21 ++-
 .../TestNMTimelinePublisher.java                | 157 +++++++++++++++++++
 4 files changed, 174 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/bcbb52d8/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 762e43c..ab4c706 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -241,6 +241,9 @@ Branch YARN-2928: Timeline Server Next Generation: Phase 1
     YARN-4700. ATS storage has one extra record each time the RM got restarted.
     (Naganarasimha G R via varunsaxena)
 
+    YARN-4712. CPU Usage Metric is not captured properly in YARN-2928.
+    (Naganarasimha G R via varunsaxena)
+
 Trunk - Unreleased
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bcbb52d8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
index 4bd5026..5cf484c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
@@ -22,7 +22,6 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
 
-import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -32,11 +31,10 @@ import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
 import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceUtilization;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.event.Dispatcher;
-import org.apache.hadoop.yarn.api.records.ResourceUtilization;
-import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
 import org.apache.hadoop.yarn.server.nodemanager.Context;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
@@ -47,6 +45,7 @@ import org.apache.hadoop.yarn.server.nodemanager.util.NodeManagerHardwareUtils;
 import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin;
 import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 
 public class ContainersMonitorImpl extends AbstractService implements
@@ -563,8 +562,8 @@ public class ContainersMonitorImpl extends AbstractService implements
             NMTimelinePublisher nmMetricsPublisher =
                 container.getNMTimelinePublisher();
             if (nmMetricsPublisher != null) {
-              nmMetricsPublisher.reportContainerResourceUsage(container, pId,
-                  currentPmemUsage, cpuUsageTotalCoresPercentage);
+              nmMetricsPublisher.reportContainerResourceUsage(container,
+                  currentPmemUsage, cpuUsagePercentPerCore);
             }
           } catch (Exception e) {
             // Log the exception and proceed to the next container.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bcbb52d8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java
index 684feaa..70b7e8d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java
@@ -113,29 +113,28 @@ public class NMTimelinePublisher extends CompositeService {
   }
 
   @SuppressWarnings("unchecked")
-  public void reportContainerResourceUsage(Container container, String pId,
-      Long pmemUsage, Float cpuUsageTotalCoresPercentage) {
+  public void reportContainerResourceUsage(Container container, Long pmemUsage,
+      Float cpuUsagePercentPerCore) {
     if (pmemUsage != ResourceCalculatorProcessTree.UNAVAILABLE ||
-        cpuUsageTotalCoresPercentage !=
-            ResourceCalculatorProcessTree.UNAVAILABLE) {
+        cpuUsagePercentPerCore != ResourceCalculatorProcessTree.UNAVAILABLE) {
       ContainerEntity entity =
           createContainerEntity(container.getContainerId());
       long currentTimeMillis = System.currentTimeMillis();
       if (pmemUsage != ResourceCalculatorProcessTree.UNAVAILABLE) {
         TimelineMetric memoryMetric = new TimelineMetric();
-        memoryMetric.setId(ContainerMetric.MEMORY.toString() + pId);
+        memoryMetric.setId(ContainerMetric.MEMORY.toString());
         memoryMetric.addValue(currentTimeMillis, pmemUsage);
         entity.addMetric(memoryMetric);
       }
-      if (cpuUsageTotalCoresPercentage !=
-          ResourceCalculatorProcessTree.UNAVAILABLE) {
+      if (cpuUsagePercentPerCore != ResourceCalculatorProcessTree.UNAVAILABLE) {
         TimelineMetric cpuMetric = new TimelineMetric();
-        cpuMetric.setId(ContainerMetric.CPU.toString() + pId);
-        cpuMetric.addValue(currentTimeMillis, cpuUsageTotalCoresPercentage);
+        cpuMetric.setId(ContainerMetric.CPU.toString());
+        cpuMetric.addValue(currentTimeMillis,
+            Math.round(cpuUsagePercentPerCore));
         entity.addMetric(cpuMetric);
       }
-      dispatcher.getEventHandler().handle(
-          new TimelinePublishEvent(entity, container.getContainerId()
+      dispatcher.getEventHandler()
+          .handle(new TimelinePublishEvent(entity, container.getContainerId()
               .getApplicationAttemptId().getApplicationId()));
     }
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bcbb52d8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/TestNMTimelinePublisher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/TestNMTimelinePublisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/TestNMTimelinePublisher.java
new file mode 100644
index 0000000..830ed6b
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/TestNMTimelinePublisher.java
@@ -0,0 +1,157 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager.timelineservice;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+import org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestNMTimelinePublisher {
+  private static final String MEMORY_ID = "MEMORY";
+  private static final String CPU_ID = "CPU";
+
+  @Test
+  public void testContainerResourceUsage() {
+    Context context = mock(Context.class);
+    @SuppressWarnings("unchecked")
+    ConcurrentMap<ApplicationId, Application> map = mock(ConcurrentMap.class);
+    Application aApp = mock(Application.class);
+    when(map.get(any(ApplicationId.class))).thenReturn(aApp);
+    DummyTimelineClient timelineClient = new DummyTimelineClient();
+    when(aApp.getTimelineClient()).thenReturn(timelineClient);
+    when(context.getApplications()).thenReturn(map);
+    when(context.getNodeId()).thenReturn(NodeId.newInstance("localhost", 0));
+    when(context.getHttpPort()).thenReturn(0);
+    NMTimelinePublisher publisher = new NMTimelinePublisher(context);
+    publisher.init(new Configuration());
+    publisher.start();
+    Container aContainer = mock(Container.class);
+    when(aContainer.getContainerId()).thenReturn(ContainerId.newContainerId(
+        ApplicationAttemptId.newInstance(ApplicationId.newInstance(0, 1), 1),
+        0L));
+    publisher.reportContainerResourceUsage(aContainer, 1024L, 8F);
+    verifyPublishedResourceUsageMetrics(timelineClient, 1024L, 8);
+    timelineClient.reset();
+
+    publisher.reportContainerResourceUsage(aContainer, 1024L, 0.8F);
+    verifyPublishedResourceUsageMetrics(timelineClient, 1024L, 1);
+    timelineClient.reset();
+
+    publisher.reportContainerResourceUsage(aContainer, 1024L, 0.49F);
+    verifyPublishedResourceUsageMetrics(timelineClient, 1024L, 0);
+    timelineClient.reset();
+
+    publisher.reportContainerResourceUsage(aContainer, 1024L,
+        (float) ResourceCalculatorProcessTree.UNAVAILABLE);
+    verifyPublishedResourceUsageMetrics(timelineClient, 1024L,
+        ResourceCalculatorProcessTree.UNAVAILABLE);
+    publisher.stop();
+  }
+
+  private void verifyPublishedResourceUsageMetrics(
+      DummyTimelineClient timelineClient, long memoryUsage, int cpuUsage) {
+    TimelineEntity[] entities = null;
+    for (int i = 0; i < 10; i++) {
+      entities = timelineClient.getLastPublishedEntities();
+      if (entities != null) {
+        break;
+      }
+      try {
+        Thread.sleep(150L);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+      }
+    }
+    int numberOfResourceMetrics = 0;
+    numberOfResourceMetrics +=
+        (memoryUsage == ResourceCalculatorProcessTree.UNAVAILABLE) ? 0 : 1;
+    numberOfResourceMetrics +=
+        (cpuUsage == ResourceCalculatorProcessTree.UNAVAILABLE) ? 0 : 1;
+    assertNotNull("entities are expected to be published", entities);
+    assertEquals("Expected number of metrics notpublished",
+        numberOfResourceMetrics, entities[0].getMetrics().size());
+    Iterator<TimelineMetric> metrics = entities[0].getMetrics().iterator();
+    while (metrics.hasNext()) {
+      TimelineMetric metric = metrics.next();
+      Iterator<Entry<Long, Number>> entrySet;
+      switch (metric.getId()) {
+      case CPU_ID:
+        if (cpuUsage == ResourceCalculatorProcessTree.UNAVAILABLE) {
+          Assert.fail("Not Expecting CPU Metric to be published");
+        }
+        entrySet = metric.getValues().entrySet().iterator();
+        assertEquals("CPU usage metric not matching", cpuUsage,
+            entrySet.next().getValue());
+        break;
+      case MEMORY_ID:
+        if (memoryUsage == ResourceCalculatorProcessTree.UNAVAILABLE) {
+          Assert.fail("Not Expecting Memory Metric to be published");
+        }
+        entrySet = metric.getValues().entrySet().iterator();
+        assertEquals("Memory usage metric not matching", memoryUsage,
+            entrySet.next().getValue());
+        break;
+      default:
+        Assert.fail("Invalid Resource Usage metric");
+        break;
+      }
+    }
+  }
+
+  protected static class DummyTimelineClient extends TimelineClientImpl {
+    private TimelineEntity[] lastPublishedEntities;
+
+    @Override
+    public void putEntities(TimelineEntity... entities)
+        throws IOException, YarnException {
+      this.lastPublishedEntities = entities;
+    }
+
+    public TimelineEntity[] getLastPublishedEntities() {
+      return lastPublishedEntities;
+    }
+
+    public void reset() {
+      lastPublishedEntities = null;
+    }
+  }
+}

Reply via email to