Repository: ambari Updated Branches: refs/heads/branch-2.4 a08c207b5 -> 0b3fc3f1c
AMBARI-17249 : Storm metrics sink should include worker host and port to metric name when metrics are coming from SystemBolt (Jungtaek Lim via avijayan) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/0b3fc3f1 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/0b3fc3f1 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/0b3fc3f1 Branch: refs/heads/branch-2.4 Commit: 0b3fc3f1cb079d4d99590364eafa9cfd2d0b91ff Parents: a08c207 Author: Aravindan Vijayan <[email protected]> Authored: Thu Jun 30 11:14:14 2016 -0700 Committer: Aravindan Vijayan <[email protected]> Committed: Thu Jun 30 11:14:14 2016 -0700 ---------------------------------------------------------------------- .../sink/storm/StormTimelineMetricsSink.java | 27 ++++++++++++++++---- .../storm/StormTimelineMetricsSinkTest.java | 18 +++++++++++++ 2 files changed, 40 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/0b3fc3f1/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java b/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java index 89906d8..6ab12e1 100644 --- a/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java +++ b/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java @@ -40,6 +40,8 @@ import static org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCach import static org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache.MAX_RECS_PER_NAME_DEFAULT; public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implements IMetricsConsumer { + public static final int SYSTEM_BOLT_TASK_ID = -1; + private String collectorUri; private TimelineMetricsCache metricsCache; private String hostname; @@ -96,9 +98,17 @@ public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implem List<DataPoint> populatedDataPoints = populateDataPoints(dataPoint); for (DataPoint populatedDataPoint : populatedDataPoints) { - TimelineMetric timelineMetric = createTimelineMetric(taskInfo.timestamp * 1000, - taskInfo.srcComponentId, taskInfo.srcTaskId, taskInfo.srcWorkerHost, populatedDataPoint.name, - Double.valueOf(populatedDataPoint.value.toString())); + String metricName; + if (taskInfo.srcTaskId == SYSTEM_BOLT_TASK_ID) { + metricName = createMetricNameForSystemBolt(taskInfo, populatedDataPoint.name); + } else { + metricName = createMetricName(taskInfo.srcComponentId, taskInfo.srcTaskId, populatedDataPoint.name); + } + + LOG.debug("populated datapoint: " + metricName + " = " + populatedDataPoint.value); + + TimelineMetric timelineMetric = createTimelineMetric(taskInfo.timestamp * 1000, taskInfo.srcWorkerHost, + metricName, Double.valueOf(populatedDataPoint.value.toString())); // Put intermediate values into the cache until it is time to send metricsCache.putTimelineMetric(timelineMetric); @@ -176,10 +186,10 @@ public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implem } } - private TimelineMetric createTimelineMetric(long currentTimeMillis, String componentId, int taskId, String hostName, + private TimelineMetric createTimelineMetric(long currentTimeMillis, String hostName, String attributeName, Double attributeValue) { TimelineMetric timelineMetric = new TimelineMetric(); - timelineMetric.setMetricName(createMetricName(componentId, taskId, attributeName)); + timelineMetric.setMetricName(attributeName); timelineMetric.setHostName(hostName); timelineMetric.setAppId(topologyName); timelineMetric.setStartTime(currentTimeMillis); @@ -195,6 +205,13 @@ public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implem return metricName.replace('_', '-'); } + private String createMetricNameForSystemBolt(TaskInfo taskInfo, String attributeName) { + String metricName = taskInfo.srcComponentId + "." + taskInfo.srcWorkerHost + "." + + taskInfo.srcWorkerPort + "." + attributeName; + // since '._' is treat as special character (separator) so it should be replaced + return metricName.replace('_', '-'); + } + public void setMetricsCache(TimelineMetricsCache metricsCache) { this.metricsCache = metricsCache; } http://git-wip-us.apache.org/repos/asf/ambari/blob/0b3fc3f1/ambari-metrics/ambari-metrics-storm-sink/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-storm-sink/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java b/ambari-metrics/ambari-metrics-storm-sink/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java index 557d088..e582a95 100644 --- a/ambari-metrics/ambari-metrics-storm-sink/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java +++ b/ambari-metrics/ambari-metrics-storm-sink/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java @@ -18,6 +18,7 @@ package org.apache.hadoop.metrics2.sink.storm; +import static org.apache.hadoop.metrics2.sink.storm.StormTimelineMetricsSink.SYSTEM_BOLT_TASK_ID; import static org.easymock.EasyMock.anyObject; import static org.easymock.EasyMock.createNiceMock; import static org.easymock.EasyMock.expect; @@ -68,6 +69,23 @@ public class StormTimelineMetricsSinkTest { @Test @Ignore // TODO: Fix for failover + public void testNumericMetricFromSystemBoltMetricSubmission() throws InterruptedException, IOException { + StormTimelineMetricsSink stormTimelineMetricsSink = new StormTimelineMetricsSink(); + TimelineMetricsCache timelineMetricsCache = createNiceMock(TimelineMetricsCache.class); + expect(timelineMetricsCache.getTimelineMetric("testComponent.localhost.1234.key1")) + .andReturn(new TimelineMetric()).once(); + timelineMetricsCache.putTimelineMetric(anyObject(TimelineMetric.class)); + expectLastCall().once(); + stormTimelineMetricsSink.setMetricsCache(timelineMetricsCache); + replay(timelineMetricsCache); + stormTimelineMetricsSink.handleDataPoints( + new IMetricsConsumer.TaskInfo("localhost", 1234, "testComponent", SYSTEM_BOLT_TASK_ID, 20000L, 60), + Collections.singleton(new IMetricsConsumer.DataPoint("key1", 42))); + verify(timelineMetricsCache); + } + + @Test + @Ignore // TODO: Fix for failover public void testMapMetricMetricSubmission() throws InterruptedException, IOException { StormTimelineMetricsSink stormTimelineMetricsSink = new StormTimelineMetricsSink(); TimelineMetricsCache timelineMetricsCache = createNiceMock(TimelineMetricsCache.class);
