AMBARI-17784 : AMS Storm Sink: remove redundant information from kafka offset metrics on storm-kafka (Jungtaek Lim via avijayan)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/2e873b6c Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/2e873b6c Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/2e873b6c Branch: refs/heads/trunk Commit: 2e873b6c4a4a7dae9c33f8df3e5f97ac61f73499 Parents: da2e677 Author: Aravindan Vijayan <[email protected]> Authored: Wed Jul 20 10:48:09 2016 -0700 Committer: Aravindan Vijayan <[email protected]> Committed: Wed Jul 20 10:48:09 2016 -0700 ---------------------------------------------------------------------- .../sink/storm/StormTimelineMetricsSink.java | 36 +++++++++++++++++-- .../storm/StormTimelineMetricsSinkTest.java | 37 ++++++++++++++++++++ .../sink/storm/StormTimelineMetricsSink.java | 36 +++++++++++++++++-- .../storm/StormTimelineMetricsSinkTest.java | 37 ++++++++++++++++++++ 4 files changed, 142 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/2e873b6c/ambari-metrics/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java b/ambari-metrics/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java index 879cbfc..0d3b770 100644 --- a/ambari-metrics/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java +++ b/ambari-metrics/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java @@ -47,6 +47,7 @@ public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implem public static final String CLUSTER_REPORTER_APP_ID = "clusterReporterAppId"; public static final String DEFAULT_CLUSTER_REPORTER_APP_ID = "storm"; + public static final String METRIC_NAME_PREFIX_KAFKA_OFFSET = "kafkaOffset."; private String collectorUri; private TimelineMetricsCache metricsCache; @@ -138,8 +139,13 @@ public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implem List<DataPoint> populatedDataPoints = populateDataPoints(dataPoint); for (DataPoint populatedDataPoint : populatedDataPoints) { - String metricName = createMetricName(taskInfo.srcComponentId, taskInfo.srcWorkerHost, - taskInfo.srcWorkerPort, taskInfo.srcTaskId, populatedDataPoint.name); + String metricName; + if (populatedDataPoint.name.startsWith(METRIC_NAME_PREFIX_KAFKA_OFFSET)) { + metricName = createKafkaOffsetMetricName(populatedDataPoint.name); + } else { + metricName = createMetricName(taskInfo.srcComponentId, taskInfo.srcWorkerHost, + taskInfo.srcWorkerPort, taskInfo.srcTaskId, populatedDataPoint.name); + } LOG.debug("populated datapoint: " + metricName + " = " + populatedDataPoint.value); @@ -250,6 +256,32 @@ public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implem return metricName.replace('_', '-'); } + private String createKafkaOffsetMetricName(String kafkaOffsetMetricName) { + // get rid of "kafkaOffset." + // <topic>/<metric name (starts with total)> or <topic>/partition_<partition_num>/<metricName> + String tempMetricName = kafkaOffsetMetricName.substring(METRIC_NAME_PREFIX_KAFKA_OFFSET.length()); + + String[] slashSplittedNames = tempMetricName.split("/"); + + if (slashSplittedNames.length == 1) { + // unknown metrics + throw new IllegalArgumentException("Unknown metrics for kafka offset metric: " + kafkaOffsetMetricName); + } + + String topic = slashSplittedNames[0]; + String metricName = "topology." + topologyName + ".kafka-topic." + topic; + if (slashSplittedNames.length > 2) { + // partition level + metricName = metricName + "." + slashSplittedNames[1] + "." + slashSplittedNames[2]; + } else { + // topic level + metricName = metricName + "." + slashSplittedNames[1]; + } + + // since '._' is treat as special character (separator) so it should be replaced + return metricName.replace('_', '-'); + } + private void warnIfTopologyNameContainsWarnString(String name) { for (String warn : WARN_STRINGS_FOR_TOPOLOGY_OR_COMPONENT_NAME) { if (name.contains(warn)) { http://git-wip-us.apache.org/repos/asf/ambari/blob/2e873b6c/ambari-metrics/ambari-metrics-storm-sink-legacy/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-storm-sink-legacy/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java b/ambari-metrics/ambari-metrics-storm-sink-legacy/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java index 2128e07..3b3e236 100644 --- a/ambari-metrics/ambari-metrics-storm-sink-legacy/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java +++ b/ambari-metrics/ambari-metrics-storm-sink-legacy/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java @@ -18,6 +18,7 @@ package org.apache.hadoop.metrics2.sink.storm; +import static org.apache.hadoop.metrics2.sink.storm.StormTimelineMetricsSink.METRIC_NAME_PREFIX_KAFKA_OFFSET; import static org.easymock.EasyMock.anyObject; import static org.easymock.EasyMock.createMockBuilder; import static org.easymock.EasyMock.createNiceMock; @@ -73,6 +74,42 @@ public class StormTimelineMetricsSinkTest { @Test @Ignore // TODO: Fix for failover + public void testTopicLevelKafkaOffsetMetricSubmission() throws InterruptedException, IOException { + StormTimelineMetricsSink stormTimelineMetricsSink = new StormTimelineMetricsSink(); + stormTimelineMetricsSink.setTopologyName("topology1"); + TimelineMetricsCache timelineMetricsCache = createNiceMock(TimelineMetricsCache.class); + expect(timelineMetricsCache.getTimelineMetric("topology.topology1.kafka-topic.topic1.totalLatestTimeOffset")) + .andReturn(new TimelineMetric()).once(); + timelineMetricsCache.putTimelineMetric(anyObject(TimelineMetric.class)); + expectLastCall().once(); + stormTimelineMetricsSink.setMetricsCache(timelineMetricsCache); + replay(timelineMetricsCache); + stormTimelineMetricsSink.handleDataPoints( + new IMetricsConsumer.TaskInfo("localhost", 1234, "testComponent", 42, 20000L, 60), + Collections.singleton(new IMetricsConsumer.DataPoint(METRIC_NAME_PREFIX_KAFKA_OFFSET + "topic1/totalLatestTimeOffset", 42))); + verify(timelineMetricsCache); + } + + @Test + @Ignore // TODO: Fix for failover + public void testPartitionLevelKafkaOffsetMetricSubmission() throws InterruptedException, IOException { + StormTimelineMetricsSink stormTimelineMetricsSink = new StormTimelineMetricsSink(); + stormTimelineMetricsSink.setTopologyName("topology1"); + TimelineMetricsCache timelineMetricsCache = createNiceMock(TimelineMetricsCache.class); + expect(timelineMetricsCache.getTimelineMetric("topology.topology1.kafka-topic.topic1.partition-1.latestTimeOffset")) + .andReturn(new TimelineMetric()).once(); + timelineMetricsCache.putTimelineMetric(anyObject(TimelineMetric.class)); + expectLastCall().once(); + stormTimelineMetricsSink.setMetricsCache(timelineMetricsCache); + replay(timelineMetricsCache); + stormTimelineMetricsSink.handleDataPoints( + new IMetricsConsumer.TaskInfo("localhost", 1234, "testComponent", 42, 20000L, 60), + Collections.singleton(new IMetricsConsumer.DataPoint(METRIC_NAME_PREFIX_KAFKA_OFFSET + "topic1/partition_1/latestTimeOffset", 42))); + verify(timelineMetricsCache); + } + + @Test + @Ignore // TODO: Fix for failover public void testMapMetricMetricSubmission() throws InterruptedException, IOException { StormTimelineMetricsSink stormTimelineMetricsSink = new StormTimelineMetricsSink(); stormTimelineMetricsSink.setTopologyName("topology1"); http://git-wip-us.apache.org/repos/asf/ambari/blob/2e873b6c/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java b/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java index 91f78bc..3a4289b 100644 --- a/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java +++ b/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java @@ -47,6 +47,7 @@ public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implem public static final String CLUSTER_REPORTER_APP_ID = "clusterReporterAppId"; public static final String DEFAULT_CLUSTER_REPORTER_APP_ID = "storm"; + public static final String METRIC_NAME_PREFIX_KAFKA_OFFSET = "kafkaOffset."; private String collectorUri; private TimelineMetricsCache metricsCache; @@ -138,8 +139,13 @@ public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implem List<DataPoint> populatedDataPoints = populateDataPoints(dataPoint); for (DataPoint populatedDataPoint : populatedDataPoints) { - String metricName = createMetricName(taskInfo.srcComponentId, taskInfo.srcWorkerHost, - taskInfo.srcWorkerPort, taskInfo.srcTaskId, populatedDataPoint.name); + String metricName; + if (populatedDataPoint.name.startsWith(METRIC_NAME_PREFIX_KAFKA_OFFSET)) { + metricName = createKafkaOffsetMetricName(populatedDataPoint.name); + } else { + metricName = createMetricName(taskInfo.srcComponentId, taskInfo.srcWorkerHost, + taskInfo.srcWorkerPort, taskInfo.srcTaskId, populatedDataPoint.name); + } LOG.debug("populated datapoint: " + metricName + " = " + populatedDataPoint.value); @@ -250,6 +256,32 @@ public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implem return metricName.replace('_', '-'); } + private String createKafkaOffsetMetricName(String kafkaOffsetMetricName) { + // get rid of "kafkaOffset." + // <topic>/<metric name (starts with total)> or <topic>/partition_<partition_num>/<metricName> + String tempMetricName = kafkaOffsetMetricName.substring(METRIC_NAME_PREFIX_KAFKA_OFFSET.length()); + + String[] slashSplittedNames = tempMetricName.split("/"); + + if (slashSplittedNames.length == 1) { + // unknown metrics + throw new IllegalArgumentException("Unknown metrics for kafka offset metric: " + kafkaOffsetMetricName); + } + + String topic = slashSplittedNames[0]; + String metricName = "topology." + topologyName + ".kafka-topic." + topic; + if (slashSplittedNames.length > 2) { + // partition level + metricName = metricName + "." + slashSplittedNames[1] + "." + slashSplittedNames[2]; + } else { + // topic level + metricName = metricName + "." + slashSplittedNames[1]; + } + + // since '._' is treat as special character (separator) so it should be replaced + return metricName.replace('_', '-'); + } + private void warnIfTopologyNameContainsWarnString(String name) { for (String warn : WARN_STRINGS_FOR_TOPOLOGY_OR_COMPONENT_NAME) { if (name.contains(warn)) { http://git-wip-us.apache.org/repos/asf/ambari/blob/2e873b6c/ambari-metrics/ambari-metrics-storm-sink/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-storm-sink/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java b/ambari-metrics/ambari-metrics-storm-sink/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java index efe3022..fadb00c 100644 --- a/ambari-metrics/ambari-metrics-storm-sink/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java +++ b/ambari-metrics/ambari-metrics-storm-sink/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java @@ -18,6 +18,7 @@ package org.apache.hadoop.metrics2.sink.storm; +import static org.apache.hadoop.metrics2.sink.storm.StormTimelineMetricsSink.METRIC_NAME_PREFIX_KAFKA_OFFSET; import static org.easymock.EasyMock.anyObject; import static org.easymock.EasyMock.createMockBuilder; import static org.easymock.EasyMock.createNiceMock; @@ -73,6 +74,42 @@ public class StormTimelineMetricsSinkTest { @Test @Ignore // TODO: Fix for failover + public void testTopicLevelKafkaOffsetMetricSubmission() throws InterruptedException, IOException { + StormTimelineMetricsSink stormTimelineMetricsSink = new StormTimelineMetricsSink(); + stormTimelineMetricsSink.setTopologyName("topology1"); + TimelineMetricsCache timelineMetricsCache = createNiceMock(TimelineMetricsCache.class); + expect(timelineMetricsCache.getTimelineMetric("topology.topology1.kafka-topic.topic1.totalLatestTimeOffset")) + .andReturn(new TimelineMetric()).once(); + timelineMetricsCache.putTimelineMetric(anyObject(TimelineMetric.class)); + expectLastCall().once(); + stormTimelineMetricsSink.setMetricsCache(timelineMetricsCache); + replay(timelineMetricsCache); + stormTimelineMetricsSink.handleDataPoints( + new IMetricsConsumer.TaskInfo("localhost", 1234, "testComponent", 42, 20000L, 60), + Collections.singleton(new IMetricsConsumer.DataPoint(METRIC_NAME_PREFIX_KAFKA_OFFSET + "topic1/totalLatestTimeOffset", 42))); + verify(timelineMetricsCache); + } + + @Test + @Ignore // TODO: Fix for failover + public void testPartitionLevelKafkaOffsetMetricSubmission() throws InterruptedException, IOException { + StormTimelineMetricsSink stormTimelineMetricsSink = new StormTimelineMetricsSink(); + stormTimelineMetricsSink.setTopologyName("topology1"); + TimelineMetricsCache timelineMetricsCache = createNiceMock(TimelineMetricsCache.class); + expect(timelineMetricsCache.getTimelineMetric("topology.topology1.kafka-topic.topic1.partition-1.latestTimeOffset")) + .andReturn(new TimelineMetric()).once(); + timelineMetricsCache.putTimelineMetric(anyObject(TimelineMetric.class)); + expectLastCall().once(); + stormTimelineMetricsSink.setMetricsCache(timelineMetricsCache); + replay(timelineMetricsCache); + stormTimelineMetricsSink.handleDataPoints( + new IMetricsConsumer.TaskInfo("localhost", 1234, "testComponent", 42, 20000L, 60), + Collections.singleton(new IMetricsConsumer.DataPoint(METRIC_NAME_PREFIX_KAFKA_OFFSET + "topic1/partition_1/latestTimeOffset", 42))); + verify(timelineMetricsCache); + } + + @Test + @Ignore // TODO: Fix for failover public void testMapMetricMetricSubmission() throws InterruptedException, IOException { StormTimelineMetricsSink stormTimelineMetricsSink = new StormTimelineMetricsSink(); stormTimelineMetricsSink.setTopologyName("topology1");
