Repository: ambari
Updated Branches:
  refs/heads/trunk 4aa1639b4 -> 95426f795


AMBARI-16946 Storm Metrics Sink has high chance to discard some 
datapoints(Jungtaek Lim via avijayan)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/95426f79
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/95426f79
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/95426f79

Branch: refs/heads/trunk
Commit: 95426f795444df20fb4ee9ce087f9191f0a4868e
Parents: 4aa1639
Author: Aravindan Vijayan <[email protected]>
Authored: Thu Jun 9 09:59:45 2016 -0700
Committer: Aravindan Vijayan <[email protected]>
Committed: Thu Jun 9 10:09:57 2016 -0700

----------------------------------------------------------------------
 .../sink/storm/StormTimelineMetricsSink.java    | 92 ++++++++++++++++----
 .../storm/StormTimelineMetricsSinkTest.java     | 28 +++++-
 2 files changed, 104 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/95426f79/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
 
b/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
index 02f5598..eb572b3 100644
--- 
a/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
+++ 
b/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
@@ -21,12 +21,10 @@ package org.apache.hadoop.metrics2.sink.storm;
 import backtype.storm.metric.api.IMetricsConsumer;
 import backtype.storm.task.IErrorReporter;
 import backtype.storm.task.TopologyContext;
-
 import org.apache.commons.lang3.ClassUtils;
-import org.apache.commons.lang3.math.NumberUtils;
+import org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
-import org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink;
 import org.apache.hadoop.metrics2.sink.timeline.UnableToConnectException;
 import org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache;
 import org.apache.hadoop.metrics2.sink.timeline.configuration.Configuration;
@@ -38,7 +36,8 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 
-import static 
org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache.*;
+import static 
org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache.MAX_EVICTION_TIME_MILLIS;
+import static 
org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache.MAX_RECS_PER_NAME_DEFAULT;
 
 public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink 
implements IMetricsConsumer {
   private String collectorUri;
@@ -49,6 +48,7 @@ public class StormTimelineMetricsSink extends 
AbstractTimelineMetricsSink implem
   private String zkQuorum;
   private String protocol;
   private String port;
+  private String topologyName;
 
   @Override
   protected String getCollectorUri(String host) {
@@ -115,20 +115,26 @@ public class StormTimelineMetricsSink extends 
AbstractTimelineMetricsSink implem
       String trustStorePwd = 
configuration.getProperty(SSL_KEYSTORE_PASSWORD_PROPERTY).trim();
       loadTruststore(trustStorePath, trustStoreType, trustStorePwd);
     }
+    this.topologyName = removeNonce(topologyContext.getStormId());
   }
 
   @Override
   public void handleDataPoints(TaskInfo taskInfo, Collection<DataPoint> 
dataPoints) {
     List<TimelineMetric> metricList = new ArrayList<TimelineMetric>();
+
     for (DataPoint dataPoint : dataPoints) {
-      if (dataPoint.value != null && 
NumberUtils.isNumber(dataPoint.value.toString())) {
-        LOG.debug(dataPoint.name + " = " + dataPoint.value);
-        TimelineMetric timelineMetric = 
createTimelineMetric(taskInfo.timestamp,
-            taskInfo.srcComponentId, dataPoint.name, 
dataPoint.value.toString());
+      LOG.debug(dataPoint.name + " = " + dataPoint.value);
+      List<DataPoint> populatedDataPoints = populateDataPoints(dataPoint);
+
+      for (DataPoint populatedDataPoint : populatedDataPoints) {
+        TimelineMetric timelineMetric = 
createTimelineMetric(taskInfo.timestamp * 1000,
+            taskInfo.srcComponentId, taskInfo.srcTaskId, 
taskInfo.srcWorkerHost, populatedDataPoint.name,
+            Double.valueOf(populatedDataPoint.value.toString()));
+
         // Put intermediate values into the cache until it is time to send
         metricsCache.putTimelineMetric(timelineMetric);
 
-        TimelineMetric cachedMetric = 
metricsCache.getTimelineMetric(dataPoint.name);
+        TimelineMetric cachedMetric = 
metricsCache.getTimelineMetric(timelineMetric.getMetricName());
 
         if (cachedMetric != null) {
           metricList.add(cachedMetric);
@@ -139,6 +145,7 @@ public class StormTimelineMetricsSink extends 
AbstractTimelineMetricsSink implem
     if (!metricList.isEmpty()) {
       TimelineMetrics timelineMetrics = new TimelineMetrics();
       timelineMetrics.setMetrics(metricList);
+
       try {
         emitMetrics(timelineMetrics);
       } catch (UnableToConnectException uce) {
@@ -152,20 +159,75 @@ public class StormTimelineMetricsSink extends 
AbstractTimelineMetricsSink implem
     LOG.info("Stopping Storm Metrics Sink");
   }
 
-  private TimelineMetric createTimelineMetric(long currentTimeMillis, String 
component, String attributeName, String attributeValue) {
+  private String removeNonce(String topologyId) {
+    return topologyId.substring(0, topologyId.substring(0, 
topologyId.lastIndexOf("-")).lastIndexOf("-"));
+  }
+
+  private List<DataPoint> populateDataPoints(DataPoint dataPoint) {
+    List<DataPoint> dataPoints = new ArrayList<>();
+
+    if (dataPoint.value == null) {
+      LOG.warn("Data point with name " + dataPoint.name + " is null. 
Discarding." + dataPoint.name);
+    } else if (dataPoint.value instanceof Map) {
+      Map<String, Object> dataMap = (Map<String, Object>) dataPoint.value;
+
+      for (Map.Entry<String, Object> entry : dataMap.entrySet()) {
+        Double value = convertValueToDouble(entry.getKey(), entry.getValue());
+        if (value != null) {
+          dataPoints.add(new DataPoint(dataPoint.name + "." + entry.getKey(), 
value));
+        }
+      }
+    } else {
+      Double value = convertValueToDouble(dataPoint.name, dataPoint.value);
+      if (value != null) {
+        dataPoints.add(new DataPoint(dataPoint.name, value));
+      }
+    }
+
+    return dataPoints;
+  }
+
+  private Double convertValueToDouble(String metricName, Object value) {
+    if (value instanceof Number) {
+      return ((Number) value).doubleValue();
+    } else if (value instanceof String) {
+      try {
+        return Double.parseDouble((String) value);
+      } catch (NumberFormatException e) {
+        LOG.warn("Data point with name " + metricName + " doesn't have number 
format value " +
+            value + ". Discarding.");
+      }
+
+      return null;
+    } else {
+      LOG.warn("Data point with name " + metricName + " has value " + value +
+          " which is not supported. Discarding.");
+
+      return null;
+    }
+  }
+
+  private TimelineMetric createTimelineMetric(long currentTimeMillis, String 
componentId, int taskId, String hostName,
+      String attributeName, Double attributeValue) {
     TimelineMetric timelineMetric = new TimelineMetric();
-    timelineMetric.setMetricName(attributeName);
-    timelineMetric.setHostName(hostname);
-    timelineMetric.setAppId(component);
+    timelineMetric.setMetricName(createMetricName(componentId, taskId, 
attributeName));
+    timelineMetric.setHostName(hostName);
+    timelineMetric.setAppId(topologyName);
     timelineMetric.setStartTime(currentTimeMillis);
     timelineMetric.setType(ClassUtils.getShortCanonicalName(
         attributeValue, "Number"));
-    timelineMetric.getMetricValues().put(currentTimeMillis, 
Double.parseDouble(attributeValue));
+    timelineMetric.getMetricValues().put(currentTimeMillis, attributeValue);
     return timelineMetric;
   }
 
+  private String createMetricName(String componentId, int taskId, String 
attributeName) {
+    String metricName = componentId + "." + taskId + "." + attributeName;
+    // since '._' is treat as special character (separator) so it should be 
replaced
+    return metricName.replace('_', '-');
+  }
+
   public void setMetricsCache(TimelineMetricsCache metricsCache) {
     this.metricsCache = metricsCache;
   }
 
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/95426f79/ambari-metrics/ambari-metrics-storm-sink/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-storm-sink/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java
 
b/ambari-metrics/ambari-metrics-storm-sink/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java
index 8171a4d..c4b54b4 100644
--- 
a/ambari-metrics/ambari-metrics-storm-sink/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java
+++ 
b/ambari-metrics/ambari-metrics-storm-sink/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java
@@ -29,6 +29,8 @@ import static org.easymock.EasyMock.verify;
 import java.io.IOException;
 import java.net.SocketAddress;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
 
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
 import org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache;
@@ -55,7 +57,8 @@ public class StormTimelineMetricsSinkTest {
   public void testNumericMetricMetricSubmission() throws InterruptedException, 
IOException {
     StormTimelineMetricsSink stormTimelineMetricsSink = new 
StormTimelineMetricsSink();
     TimelineMetricsCache timelineMetricsCache = 
createNiceMock(TimelineMetricsCache.class);
-    expect(timelineMetricsCache.getTimelineMetric("key1")).andReturn(new 
TimelineMetric()).once();
+    expect(timelineMetricsCache.getTimelineMetric("testComponent.42.key1"))
+        .andReturn(new TimelineMetric()).once();
     timelineMetricsCache.putTimelineMetric(anyObject(TimelineMetric.class));
     expectLastCall().once();
     stormTimelineMetricsSink.setMetricsCache(timelineMetricsCache);
@@ -65,4 +68,27 @@ public class StormTimelineMetricsSinkTest {
         Collections.singleton(new IMetricsConsumer.DataPoint("key1", 42)));
     verify(timelineMetricsCache);
   }
+
+  @Test
+  @Ignore // TODO: Fix for failover
+  public void testMapMetricMetricSubmission() throws InterruptedException, 
IOException {
+    StormTimelineMetricsSink stormTimelineMetricsSink = new 
StormTimelineMetricsSink();
+    TimelineMetricsCache timelineMetricsCache = 
createNiceMock(TimelineMetricsCache.class);
+    
expect(timelineMetricsCache.getTimelineMetric("testComponent.42.key1.field1"))
+        .andReturn(new TimelineMetric()).once();
+    
expect(timelineMetricsCache.getTimelineMetric("testComponent.42.key1.field2"))
+        .andReturn(new TimelineMetric()).once();
+    timelineMetricsCache.putTimelineMetric(anyObject(TimelineMetric.class));
+    expectLastCall().once();
+    stormTimelineMetricsSink.setMetricsCache(timelineMetricsCache);
+    replay(timelineMetricsCache);
+
+    Map<String, Object> valueMap = new HashMap<>();
+    valueMap.put("field1", 53);
+    valueMap.put("field2", 64.12);
+    stormTimelineMetricsSink.handleDataPoints(
+        new IMetricsConsumer.TaskInfo("localhost", 1234, "testComponent", 42, 
20000L, 60),
+        Collections.singleton(new IMetricsConsumer.DataPoint("key1", 
valueMap)));
+    verify(timelineMetricsCache);
+  }
 }

Reply via email to