Repository: ambari
Updated Branches:
  refs/heads/trunk a1251ecc9 -> 7ff7bcf3a


AMBARI-17909 AMS Storm Sink: apply change of Storm metrics improvement - worker 
level aggregation. (Jungtaek Lim via avijayan)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/7ff7bcf3
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/7ff7bcf3
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/7ff7bcf3

Branch: refs/heads/trunk
Commit: 7ff7bcf3a2d40693cb0a5881661236e07335973a
Parents: a1251ec
Author: Aravindan Vijayan <[email protected]>
Authored: Thu Jul 28 09:01:40 2016 -0700
Committer: Aravindan Vijayan <[email protected]>
Committed: Thu Jul 28 09:56:29 2016 -0700

----------------------------------------------------------------------
 .../sink/storm/StormTimelineMetricsSink.java    | 87 +++++++++++++++++++-
 .../storm/StormTimelineMetricsSinkTest.java     | 58 ++++++++++++-
 .../0.1.0/configuration/storm-site.xml          | 43 +++++++++-
 3 files changed, 181 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/7ff7bcf3/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java b/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
index 3a4289b..f6531c8 100644
--- a/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
+++ b/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.metrics2.sink.storm;
 
+import org.apache.storm.Constants;
 import org.apache.storm.metric.api.IMetricsConsumer;
 import org.apache.storm.task.IErrorReporter;
 import org.apache.storm.task.TopologyContext;
@@ -33,6 +34,7 @@ import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 
@@ -40,11 +42,17 @@ import static org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCach
 import static 
org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache.MAX_RECS_PER_NAME_DEFAULT;
 
 public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink 
implements IMetricsConsumer {
+  // covers built-in metrics but still not beauty
+  private static final String[] METRIC_LOWERCASE_SUBSTRINGS_AGGREGATE_AVERAGE 
= { "-latency", "timems", "time_ms", "rate_secs", "timesecs" };
+
   private static final String[] WARN_STRINGS_FOR_TOPOLOGY_OR_COMPONENT_NAME = 
{ ".", "_" };
 
   // create String manually in order to not rely on Guava Joiner or having our 
own
   private static final String JOINED_WARN_STRINGS_FOR_MESSAGE = "\".\", \"_\"";
 
+  // Storm's system task id narrowed to int so it can be compared with TaskInfo.srcTaskId.
+  // Data points from this task whose value is a Collection are treated as worker-level
+  // aggregated metrics. NOTE(review): the narrowing cast assumes Constants.SYSTEM_TASK_ID
+  // fits in an int — confirm against the Storm version in use.
+  public static final int SYSTEM_TASK_ID = (int) Constants.SYSTEM_TASK_ID;
+
   public static final String CLUSTER_REPORTER_APP_ID = "clusterReporterAppId";
   public static final String DEFAULT_CLUSTER_REPORTER_APP_ID = "storm";
   public static final String METRIC_NAME_PREFIX_KAFKA_OFFSET = "kafkaOffset.";
@@ -136,7 +144,16 @@ public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implem
 
     for (DataPoint dataPoint : dataPoints) {
       LOG.debug(dataPoint.name + " = " + dataPoint.value);
-      List<DataPoint> populatedDataPoints = populateDataPoints(dataPoint);
+
+      List<DataPoint> populatedDataPoints;
+      if (taskInfo.srcTaskId == SYSTEM_TASK_ID && dataPoint.value instanceof 
Collection) {
+        // worker level aggregated metrics - aggregation should be handled
+        List<DataPoint> populatedBeforeAggregationDataPoints = 
populateAllDataPointValues(dataPoint);
+        Map<String, List<Double>> metricNameKeyedValues = 
groupByMetricNameDataPoints(populatedBeforeAggregationDataPoints);
+        populatedDataPoints = 
applyAggregationToMetricNameKeyedDataPoints(metricNameKeyedValues);
+      } else {
+        populatedDataPoints = populateDataPoints(dataPoint);
+      }
 
       for (DataPoint populatedDataPoint : populatedDataPoints) {
         String metricName;
@@ -189,6 +206,22 @@ public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implem
     return topologyId.substring(0, topologyId.substring(0, 
topologyId.lastIndexOf("-")).lastIndexOf("-"));
   }
 
+  /**
+   * Expands every element of a worker-level aggregated data point into individual data points.
+   *
+   * <p>Each element of the collection value is run through {@code populateDataPoints} under the
+   * original data point name; resulting points with a {@code null} value are dropped.
+   *
+   * @param dataPoint data point whose value is a {@link Collection}
+   * @return the expanded data points with non-null values (possibly empty)
+   */
+  private List<DataPoint> populateAllDataPointValues(DataPoint dataPoint) {
+    List<DataPoint> populatedDataPoints = new ArrayList<>();
+    // handleDataPoints checks `instanceof Collection` before calling this method. A wildcard cast
+    // is all that is needed to iterate, and unlike Collection<Object> it is not an unchecked cast.
+    Collection<?> values = (Collection<?>) dataPoint.value;
+    for (Object value : values) {
+      for (DataPoint point : populateDataPoints(new DataPoint(dataPoint.name, value))) {
+        if (point.value != null) {
+          populatedDataPoints.add(point);
+        }
+      }
+    }
+    return populatedDataPoints;
+  }
+
   private List<DataPoint> populateDataPoints(DataPoint dataPoint) {
     List<DataPoint> dataPoints = new ArrayList<>();
 
@@ -233,6 +266,58 @@ public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implem
     }
   }
 
+  /**
+   * Groups data point values by metric name, converting each value to a {@link Double}.
+   *
+   * <p>Values are converted by parsing their {@code toString()} form, so a value whose text is
+   * not numeric raises {@link NumberFormatException}.
+   *
+   * @param populatedDataPoints data points whose values are non-null
+   * @return map from metric name to the values seen for it, each list in encounter order
+   */
+  private Map<String, List<Double>> groupByMetricNameDataPoints(List<DataPoint> populatedDataPoints) {
+    Map<String, List<Double>> grouped = new HashMap<>();
+    for (DataPoint dataPoint : populatedDataPoints) {
+      String name = dataPoint.name;
+      if (!grouped.containsKey(name)) {
+        grouped.put(name, new ArrayList<Double>());
+      }
+      grouped.get(name).add(Double.valueOf(dataPoint.value.toString()));
+    }
+    return grouped;
+  }
+
+  /**
+   * Collapses the values collected for each metric name into one data point, using
+   * {@code applyAggregateFunction} to choose between averaging and summing.
+   *
+   * @param metricNameKeyedValues map from metric name to the values collected for it
+   * @return one aggregated data point per metric name
+   */
+  private List<DataPoint> applyAggregationToMetricNameKeyedDataPoints(
+      Map<String, List<Double>> metricNameKeyedValues) {
+    List<DataPoint> aggregated = new ArrayList<>(metricNameKeyedValues.size());
+    for (String metricName : metricNameKeyedValues.keySet()) {
+      Double value = applyAggregateFunction(metricName, metricNameKeyedValues.get(metricName));
+      aggregated.add(new DataPoint(metricName, value));
+    }
+    return aggregated;
+  }
+
+  /**
+   * Aggregates the values collected for one metric name.
+   *
+   * <p>If the lower-cased metric name contains any entry of
+   * {@code METRIC_LOWERCASE_SUBSTRINGS_AGGREGATE_AVERAGE}, the values are averaged; otherwise
+   * they are summed.
+   *
+   * @param metricName metric name used to select the aggregate function
+   * @param values values to aggregate
+   * @return the average or the sum of {@code values}
+   */
+  private Double applyAggregateFunction(String metricName, List<Double> values) {
+    // Locale.ROOT keeps the match locale-independent: under e.g. a Turkish default locale
+    // "TIMEMS".toLowerCase() produces a dotless 'ı' and would never match "timems".
+    String lowerCaseMetricName = metricName.toLowerCase(Locale.ROOT);
+    for (String aggregateMetricSubstring : METRIC_LOWERCASE_SUBSTRINGS_AGGREGATE_AVERAGE) {
+      if (lowerCaseMetricName.contains(aggregateMetricSubstring)) {
+        return calculateAverage(values);
+      }
+    }
+    return calculateSummation(values);
+  }
+
+  /**
+   * Sums the given values.
+   *
+   * @param values values to add; a {@code null} element causes a NullPointerException on unboxing
+   * @return the sum, or {@code 0.0} for an empty list
+   */
+  private Double calculateSummation(List<Double> values) {
+    // Accumulate in a primitive so no new Double is boxed on every iteration;
+    // the numeric result is identical to the boxed version.
+    double sum = 0.0;
+    for (double value : values) {
+      sum += value;
+    }
+    return sum;
+  }
+
+  /**
+   * Computes the arithmetic mean of the given values.
+   *
+   * @param values values to average
+   * @return the mean, or {@code 0.0} when {@code values} is empty
+   */
+  private Double calculateAverage(List<Double> values) {
+    int count = values.size();
+    return count == 0 ? 0.0d : calculateSummation(values) / count;
+  }
+
   private TimelineMetric createTimelineMetric(long currentTimeMillis, String 
hostName,
                                               String attributeName, Double 
attributeValue) {
     TimelineMetric timelineMetric = new TimelineMetric();

http://git-wip-us.apache.org/repos/asf/ambari/blob/7ff7bcf3/ambari-metrics/ambari-metrics-storm-sink/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-storm-sink/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java b/ambari-metrics/ambari-metrics-storm-sink/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java
index fadb00c..81a73db 100644
--- a/ambari-metrics/ambari-metrics-storm-sink/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java
+++ b/ambari-metrics/ambari-metrics-storm-sink/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.metrics2.sink.storm;
 
 import static 
org.apache.hadoop.metrics2.sink.storm.StormTimelineMetricsSink.METRIC_NAME_PREFIX_KAFKA_OFFSET;
+import static 
org.apache.hadoop.metrics2.sink.storm.StormTimelineMetricsSink.SYSTEM_TASK_ID;
 import static org.easymock.EasyMock.anyObject;
 import static org.easymock.EasyMock.createMockBuilder;
 import static org.easymock.EasyMock.createNiceMock;
@@ -29,12 +30,16 @@ import static org.easymock.EasyMock.verify;
 
 import java.io.IOException;
 import java.net.SocketAddress;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
 import org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache;
+import org.apache.storm.Constants;
+import org.apache.storm.shade.com.google.common.collect.Lists;
 import org.junit.Ignore;
 import org.junit.Test;
 
@@ -123,12 +128,59 @@ public class StormTimelineMetricsSinkTest {
     stormTimelineMetricsSink.setMetricsCache(timelineMetricsCache);
     replay(timelineMetricsCache);
 
-    Map<String, Object> valueMap = new HashMap<>();
-    valueMap.put("field1", 53);
-    valueMap.put("field2", 64.12);
+    Map<String, Object> valueMap = getTestValueMap();
     stormTimelineMetricsSink.handleDataPoints(
         new IMetricsConsumer.TaskInfo("localhost", 1234, "testComponent", 42, 
20000L, 60),
         Collections.singleton(new IMetricsConsumer.DataPoint("key1", 
valueMap)));
     verify(timelineMetricsCache);
   }
+
+  @Test
+  @Ignore // TODO: Fix for failover
+  public void testWorkerLevelAggregatedNumericMetricMetricSubmission() throws InterruptedException, IOException {
+    // A system-task data point carrying a collection of numbers should produce one lookup of the
+    // aggregated metric name and one put into the metrics cache.
+    StormTimelineMetricsSink sink = new StormTimelineMetricsSink();
+    sink.setTopologyName("topology1");
+
+    String expectedMetricName = "topology.topology1.testComponent.localhost.1234." + SYSTEM_TASK_ID + ".key1";
+    TimelineMetricsCache cache = createNiceMock(TimelineMetricsCache.class);
+    expect(cache.getTimelineMetric(expectedMetricName)).andReturn(new TimelineMetric()).once();
+    cache.putTimelineMetric(anyObject(TimelineMetric.class));
+    expectLastCall().once();
+    sink.setMetricsCache(cache);
+    replay(cache);
+
+    IMetricsConsumer.TaskInfo systemTaskInfo =
+        new IMetricsConsumer.TaskInfo("localhost", 1234, "testComponent", SYSTEM_TASK_ID, 20000L, 60);
+    IMetricsConsumer.DataPoint aggregatedDataPoint =
+        new IMetricsConsumer.DataPoint("key1", Lists.newArrayList(42.3, 42.3));
+    sink.handleDataPoints(systemTaskInfo, Collections.singleton(aggregatedDataPoint));
+
+    verify(cache);
+  }
+
+  @Test
+  @Ignore // TODO: Fix for failover
+  public void testWorkerLevelAggregatedMapMetricMetricSubmission() throws InterruptedException, IOException {
+    // A system-task data point carrying a collection of value maps; the test pins one lookup of
+    // the "key1.field1" metric name and one put into the metrics cache.
+    StormTimelineMetricsSink sink = new StormTimelineMetricsSink();
+    sink.setTopologyName("topology1");
+
+    String field1MetricName = "topology.topology1.testComponent.localhost.1234." + SYSTEM_TASK_ID + ".key1.field1";
+    TimelineMetricsCache cache = createNiceMock(TimelineMetricsCache.class);
+    expect(cache.getTimelineMetric(field1MetricName)).andReturn(new TimelineMetric()).once();
+    cache.putTimelineMetric(anyObject(TimelineMetric.class));
+    expectLastCall().once();
+    sink.setMetricsCache(cache);
+    replay(cache);
+
+    // Two identical value maps make up the aggregated data point's collection value.
+    List<Map<String, Object>> valueMaps = new ArrayList<>();
+    for (int i = 0; i < 2; i++) {
+      valueMaps.add(getTestValueMap());
+    }
+
+    IMetricsConsumer.TaskInfo systemTaskInfo =
+        new IMetricsConsumer.TaskInfo("localhost", 1234, "testComponent", SYSTEM_TASK_ID, 20000L, 60);
+    sink.handleDataPoints(systemTaskInfo,
+        Collections.singleton(new IMetricsConsumer.DataPoint("key1", valueMaps)));
+
+    verify(cache);
+  }
+
+  /** Builds the sample value map used by the map-valued metric tests: one int field and one double field. */
+  private Map<String, Object> getTestValueMap() {
+    Map<String, Object> fields = new HashMap<>();
+    fields.put("field1", 53);
+    fields.put("field2", 64.12);
+    return fields;
+  }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/7ff7bcf3/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/storm-site.xml
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/storm-site.xml b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/storm-site.xml
index 280fc42..7059b90 100644
--- a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/storm-site.xml
+++ b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/storm-site.xml
@@ -33,15 +33,52 @@
     <value-attributes>
       <overridable>false</overridable>
     </value-attributes>
-    <on-ambari-upgrade add="false"/>
+    <on-ambari-upgrade add="true"/>
   </property>
   <property>
     <name>topology.metrics.consumer.register</name>
-    <value>[{"class": 
"org.apache.hadoop.metrics2.sink.storm.StormTimelineMetricsSink", 
"parallelism.hint": 1, "expandMapType": true, "metricNameSeparator": ".", 
"whitelist": ["kafkaOffset\\..+/", "__complete-latency", "__process-latency", 
"__receive\\.population$", "__sendqueue\\.population$", "__execute-count", 
"__emit-count", "__ack-count", "__fail-count", "memory/heap\\.usedBytes$", 
"memory/nonHeap\\.usedBytes$", "GC/.+\\.count$", "GC/.+\\.timeMs$"]}]</value>
+    <value>[{"class": 
"org.apache.hadoop.metrics2.sink.storm.StormTimelineMetricsSink", 
"parallelism.hint": 1, "whitelist": ["kafkaOffset\\..+/", "__complete-latency", 
"__process-latency", "__receive\\.population$", "__sendqueue\\.population$", 
"__execute-count", "__emit-count", "__ack-count", "__fail-count", 
"memory/heap\\.usedBytes$", "memory/nonHeap\\.usedBytes$", "GC/.+\\.count$", 
"GC/.+\\.timeMs$"]}]</value>
+    <description></description>
+    <value-attributes>
+      <overridable>false</overridable>
+    </value-attributes>
+    <on-ambari-upgrade add="true"/>
+  </property>
+  <property>
+    <name>topology.metrics.aggregate.per.worker</name>
+    <value>true</value>
+    <description></description>
+    <value-attributes>
+      <overridable>false</overridable>
+    </value-attributes>
+    <on-ambari-upgrade add="true"/>
+  </property>
+  <property>
+    <name>topology.metrics.aggregate.metric.evict.secs</name>
+    <value>5</value>
     <description></description>
     <value-attributes>
       <overridable>false</overridable>
     </value-attributes>
-    <on-ambari-upgrade add="false"/>
+    <on-ambari-upgrade add="true"/>
+  </property>
+  <property>
+    <name>topology.metrics.expand.map.type</name>
+    <value>true</value>
+    <description></description>
+    <value-attributes>
+      <overridable>false</overridable>
+    </value-attributes>
+    <on-ambari-upgrade add="true"/>
+  </property>
+  <property>
+    <name>topology.metrics.metric.name.separator</name>
+    <value>.</value>
+    <description></description>
+    <value-attributes>
+      <overridable>false</overridable>
+    </value-attributes>
+    <on-ambari-upgrade add="true"/>
   </property>
+
 </configuration>

Reply via email to