Repository: ambari Updated Branches: refs/heads/trunk 4aa1639b4 -> 95426f795
AMBARI-16946 Storm Metrics Sink has high chance to discard some datapoints(Jungtaek Lim via avijayan) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/95426f79 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/95426f79 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/95426f79 Branch: refs/heads/trunk Commit: 95426f795444df20fb4ee9ce087f9191f0a4868e Parents: 4aa1639 Author: Aravindan Vijayan <[email protected]> Authored: Thu Jun 9 09:59:45 2016 -0700 Committer: Aravindan Vijayan <[email protected]> Committed: Thu Jun 9 10:09:57 2016 -0700 ---------------------------------------------------------------------- .../sink/storm/StormTimelineMetricsSink.java | 92 ++++++++++++++++---- .../storm/StormTimelineMetricsSinkTest.java | 28 +++++- 2 files changed, 104 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/95426f79/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java b/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java index 02f5598..eb572b3 100644 --- a/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java +++ b/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java @@ -21,12 +21,10 @@ package org.apache.hadoop.metrics2.sink.storm; import backtype.storm.metric.api.IMetricsConsumer; import backtype.storm.task.IErrorReporter; import backtype.storm.task.TopologyContext; - import org.apache.commons.lang3.ClassUtils; -import org.apache.commons.lang3.math.NumberUtils; +import org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; -import org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink; import org.apache.hadoop.metrics2.sink.timeline.UnableToConnectException; import org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache; import org.apache.hadoop.metrics2.sink.timeline.configuration.Configuration; @@ -38,7 +36,8 @@ import java.util.Collection; import java.util.List; import java.util.Map; -import static org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache.*; +import static org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache.MAX_EVICTION_TIME_MILLIS; +import static org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache.MAX_RECS_PER_NAME_DEFAULT; public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implements IMetricsConsumer { private String collectorUri; @@ -49,6 +48,7 @@ public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implem private String zkQuorum; private String protocol; private String port; + private String topologyName; @Override protected String getCollectorUri(String host) { @@ -115,20 +115,26 @@ public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implem String trustStorePwd = configuration.getProperty(SSL_KEYSTORE_PASSWORD_PROPERTY).trim(); loadTruststore(trustStorePath, trustStoreType, trustStorePwd); } + this.topologyName = removeNonce(topologyContext.getStormId()); } @Override public void handleDataPoints(TaskInfo taskInfo, Collection<DataPoint> dataPoints) { List<TimelineMetric> metricList = new ArrayList<TimelineMetric>(); + for (DataPoint dataPoint : dataPoints) { - if (dataPoint.value != null && NumberUtils.isNumber(dataPoint.value.toString())) { - LOG.debug(dataPoint.name + " = " + dataPoint.value); - TimelineMetric timelineMetric = createTimelineMetric(taskInfo.timestamp, - taskInfo.srcComponentId, dataPoint.name, dataPoint.value.toString()); + LOG.debug(dataPoint.name + " = " + dataPoint.value); + List<DataPoint> populatedDataPoints = populateDataPoints(dataPoint); + + for (DataPoint populatedDataPoint : populatedDataPoints) { + TimelineMetric timelineMetric = createTimelineMetric(taskInfo.timestamp * 1000, + taskInfo.srcComponentId, taskInfo.srcTaskId, taskInfo.srcWorkerHost, populatedDataPoint.name, + Double.valueOf(populatedDataPoint.value.toString())); + // Put intermediate values into the cache until it is time to send metricsCache.putTimelineMetric(timelineMetric); - TimelineMetric cachedMetric = metricsCache.getTimelineMetric(dataPoint.name); + TimelineMetric cachedMetric = metricsCache.getTimelineMetric(timelineMetric.getMetricName()); if (cachedMetric != null) { metricList.add(cachedMetric); @@ -139,6 +145,7 @@ public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implem if (!metricList.isEmpty()) { TimelineMetrics timelineMetrics = new TimelineMetrics(); timelineMetrics.setMetrics(metricList); + try { emitMetrics(timelineMetrics); } catch (UnableToConnectException uce) { @@ -152,20 +159,75 @@ public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implem LOG.info("Stopping Storm Metrics Sink"); } - private TimelineMetric createTimelineMetric(long currentTimeMillis, String component, String attributeName, String attributeValue) { + private String removeNonce(String topologyId) { + return topologyId.substring(0, topologyId.substring(0, topologyId.lastIndexOf("-")).lastIndexOf("-")); + } + + private List<DataPoint> populateDataPoints(DataPoint dataPoint) { + List<DataPoint> dataPoints = new ArrayList<>(); + + if (dataPoint.value == null) { + LOG.warn("Data point with name " + dataPoint.name + " is null. Discarding." + dataPoint.name); + } else if (dataPoint.value instanceof Map) { + Map<String, Object> dataMap = (Map<String, Object>) dataPoint.value; + + for (Map.Entry<String, Object> entry : dataMap.entrySet()) { + Double value = convertValueToDouble(entry.getKey(), entry.getValue()); + if (value != null) { + dataPoints.add(new DataPoint(dataPoint.name + "." + entry.getKey(), value)); + } + } + } else { + Double value = convertValueToDouble(dataPoint.name, dataPoint.value); + if (value != null) { + dataPoints.add(new DataPoint(dataPoint.name, value)); + } + } + + return dataPoints; + } + + private Double convertValueToDouble(String metricName, Object value) { + if (value instanceof Number) { + return ((Number) value).doubleValue(); + } else if (value instanceof String) { + try { + return Double.parseDouble((String) value); + } catch (NumberFormatException e) { + LOG.warn("Data point with name " + metricName + " doesn't have number format value " + + value + ". Discarding."); + } + + return null; + } else { + LOG.warn("Data point with name " + metricName + " has value " + value + + " which is not supported. Discarding."); + + return null; + } + } + + private TimelineMetric createTimelineMetric(long currentTimeMillis, String componentId, int taskId, String hostName, + String attributeName, Double attributeValue) { TimelineMetric timelineMetric = new TimelineMetric(); - timelineMetric.setMetricName(attributeName); - timelineMetric.setHostName(hostname); - timelineMetric.setAppId(component); + timelineMetric.setMetricName(createMetricName(componentId, taskId, attributeName)); + timelineMetric.setHostName(hostName); + timelineMetric.setAppId(topologyName); timelineMetric.setStartTime(currentTimeMillis); timelineMetric.setType(ClassUtils.getShortCanonicalName( attributeValue, "Number")); - timelineMetric.getMetricValues().put(currentTimeMillis, Double.parseDouble(attributeValue)); + timelineMetric.getMetricValues().put(currentTimeMillis, attributeValue); return timelineMetric; } + private String createMetricName(String componentId, int taskId, String attributeName) { + String metricName = componentId + "." + taskId + "." + attributeName; + // since '._' is treat as special character (separator) so it should be replaced + return metricName.replace('_', '-'); + } + public void setMetricsCache(TimelineMetricsCache metricsCache) { this.metricsCache = metricsCache; } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/95426f79/ambari-metrics/ambari-metrics-storm-sink/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-storm-sink/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java b/ambari-metrics/ambari-metrics-storm-sink/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java index 8171a4d..c4b54b4 100644 --- a/ambari-metrics/ambari-metrics-storm-sink/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java +++ b/ambari-metrics/ambari-metrics-storm-sink/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java @@ -29,6 +29,8 @@ import static org.easymock.EasyMock.verify; import java.io.IOException; import java.net.SocketAddress; import java.util.Collections; +import java.util.HashMap; +import java.util.Map; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; import org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache; @@ -55,7 +57,8 @@ public class StormTimelineMetricsSinkTest { public void testNumericMetricMetricSubmission() throws InterruptedException, IOException { StormTimelineMetricsSink stormTimelineMetricsSink = new StormTimelineMetricsSink(); TimelineMetricsCache timelineMetricsCache = createNiceMock(TimelineMetricsCache.class); - expect(timelineMetricsCache.getTimelineMetric("key1")).andReturn(new TimelineMetric()).once(); + expect(timelineMetricsCache.getTimelineMetric("testComponent.42.key1")) + .andReturn(new TimelineMetric()).once(); timelineMetricsCache.putTimelineMetric(anyObject(TimelineMetric.class)); expectLastCall().once(); stormTimelineMetricsSink.setMetricsCache(timelineMetricsCache); @@ -65,4 +68,27 @@ public class StormTimelineMetricsSinkTest { Collections.singleton(new IMetricsConsumer.DataPoint("key1", 42))); verify(timelineMetricsCache); } + + @Test + @Ignore // TODO: Fix for failover + public void testMapMetricMetricSubmission() throws InterruptedException, IOException { + StormTimelineMetricsSink stormTimelineMetricsSink = new StormTimelineMetricsSink(); + TimelineMetricsCache timelineMetricsCache = createNiceMock(TimelineMetricsCache.class); + expect(timelineMetricsCache.getTimelineMetric("testComponent.42.key1.field1")) + .andReturn(new TimelineMetric()).once(); + expect(timelineMetricsCache.getTimelineMetric("testComponent.42.key1.field2")) + .andReturn(new TimelineMetric()).once(); + timelineMetricsCache.putTimelineMetric(anyObject(TimelineMetric.class)); + expectLastCall().once(); + stormTimelineMetricsSink.setMetricsCache(timelineMetricsCache); + replay(timelineMetricsCache); + + Map<String, Object> valueMap = new HashMap<>(); + valueMap.put("field1", 53); + valueMap.put("field2", 64.12); + stormTimelineMetricsSink.handleDataPoints( + new IMetricsConsumer.TaskInfo("localhost", 1234, "testComponent", 42, 20000L, 60), + Collections.singleton(new IMetricsConsumer.DataPoint("key1", valueMap))); + verify(timelineMetricsCache); + } }
