AMBARI-17725 : AMS Storm Sink: Storm topology level metrics should have prefix to distinguish cluster level metrics (Jungtaek Lim via avijayan)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/da2e6771 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/da2e6771 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/da2e6771 Branch: refs/heads/trunk Commit: da2e67710f8d39958a412a33f08a675b2d75863f Parents: 70c3ef1 Author: Aravindan Vijayan <[email protected]> Authored: Wed Jul 20 10:47:47 2016 -0700 Committer: Aravindan Vijayan <[email protected]> Committed: Wed Jul 20 10:47:47 2016 -0700 ---------------------------------------------------------------------- .../sink/storm/StormTimelineMetricsSink.java | 18 +++++++++++++++++- .../sink/storm/StormTimelineMetricsSinkTest.java | 12 ++++++------ .../sink/storm/StormTimelineMetricsSink.java | 18 +++++++++++++++++- .../sink/storm/StormTimelineMetricsSinkTest.java | 12 ++++++------ 4 files changed, 46 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/da2e6771/ambari-metrics/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java b/ambari-metrics/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java index 9e6cc98..879cbfc 100644 --- a/ambari-metrics/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java +++ b/ambari-metrics/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java @@ -40,6 +40,11 @@ import static org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCach import static org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache.MAX_RECS_PER_NAME_DEFAULT; public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implements IMetricsConsumer { + private static final String[] WARN_STRINGS_FOR_TOPOLOGY_OR_COMPONENT_NAME = { ".", "_" }; + + // create String manually in order to not rely on Guava Joiner or having our own + private static final String JOINED_WARN_STRINGS_FOR_MESSAGE = "\".\", \"_\""; + public static final String CLUSTER_REPORTER_APP_ID = "clusterReporterAppId"; public static final String DEFAULT_CLUSTER_REPORTER_APP_ID = "storm"; @@ -121,6 +126,7 @@ public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implem loadTruststore(trustStorePath, trustStoreType, trustStorePwd); } this.topologyName = removeNonce(topologyContext.getStormId()); + warnIfTopologyNameContainsWarnString(topologyName); } @Override @@ -237,13 +243,23 @@ public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implem private String createMetricName(String componentId, String workerHost, int workerPort, int taskId, String attributeName) { // <topology name>.<component name>.<worker host>.<worker port>.<task id>.<metric name> - String metricName = topologyName + "." + componentId + "." + workerHost + "." + workerPort + + String metricName = "topology." + topologyName + "." + componentId + "." + workerHost + "." + workerPort + "." + taskId + "." + attributeName; // since '._' is treat as special character (separator) so it should be replaced return metricName.replace('_', '-'); } + private void warnIfTopologyNameContainsWarnString(String name) { + for (String warn : WARN_STRINGS_FOR_TOPOLOGY_OR_COMPONENT_NAME) { + if (name.contains(warn)) { + LOG.warn("Topology name \"" + name + "\" contains \"" + warn + "\" which can be problematic for AMS."); + LOG.warn("Encouraged to not using any of these strings: " + JOINED_WARN_STRINGS_FOR_MESSAGE); + LOG.warn("Same suggestion applies to component name."); + } + } + } + public void setMetricsCache(TimelineMetricsCache metricsCache) { this.metricsCache = metricsCache; } http://git-wip-us.apache.org/repos/asf/ambari/blob/da2e6771/ambari-metrics/ambari-metrics-storm-sink-legacy/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-storm-sink-legacy/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java b/ambari-metrics/ambari-metrics-storm-sink-legacy/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java index 271c11f..2128e07 100644 --- a/ambari-metrics/ambari-metrics-storm-sink-legacy/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java +++ b/ambari-metrics/ambari-metrics-storm-sink-legacy/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java @@ -43,7 +43,7 @@ public class StormTimelineMetricsSinkTest { @Test public void testNonNumericMetricMetricExclusion() throws InterruptedException, IOException { StormTimelineMetricsSink stormTimelineMetricsSink = new StormTimelineMetricsSink(); - stormTimelineMetricsSink.setTopologyName("topology"); + stormTimelineMetricsSink.setTopologyName("topology1"); TimelineMetricsCache timelineMetricsCache = createNiceMock(TimelineMetricsCache.class); stormTimelineMetricsSink.setMetricsCache(timelineMetricsCache); replay(timelineMetricsCache); @@ -57,9 +57,9 @@ public class StormTimelineMetricsSinkTest { @Ignore // TODO: Fix for failover public void testNumericMetricMetricSubmission() throws InterruptedException, IOException { StormTimelineMetricsSink stormTimelineMetricsSink = new StormTimelineMetricsSink(); - stormTimelineMetricsSink.setTopologyName("topology"); + stormTimelineMetricsSink.setTopologyName("topology1"); TimelineMetricsCache timelineMetricsCache = createNiceMock(TimelineMetricsCache.class); - expect(timelineMetricsCache.getTimelineMetric("topology.testComponent.localhost.1234.42.key1")) + expect(timelineMetricsCache.getTimelineMetric("topology.topology1.testComponent.localhost.1234.42.key1")) .andReturn(new TimelineMetric()).once(); timelineMetricsCache.putTimelineMetric(anyObject(TimelineMetric.class)); expectLastCall().once(); @@ -75,11 +75,11 @@ public class StormTimelineMetricsSinkTest { @Ignore // TODO: Fix for failover public void testMapMetricMetricSubmission() throws InterruptedException, IOException { StormTimelineMetricsSink stormTimelineMetricsSink = new StormTimelineMetricsSink(); - stormTimelineMetricsSink.setTopologyName("topology"); + stormTimelineMetricsSink.setTopologyName("topology1"); TimelineMetricsCache timelineMetricsCache = createNiceMock(TimelineMetricsCache.class); - expect(timelineMetricsCache.getTimelineMetric("topology.testComponent.localhost.1234.42.key1.field1")) + expect(timelineMetricsCache.getTimelineMetric("topology.topology1.testComponent.localhost.1234.42.key1.field1")) .andReturn(new TimelineMetric()).once(); - expect(timelineMetricsCache.getTimelineMetric("topology.testComponent.localhost.1234.42.key1.field2")) + expect(timelineMetricsCache.getTimelineMetric("topology.topology1.testComponent.localhost.1234.42.key1.field2")) .andReturn(new TimelineMetric()).once(); timelineMetricsCache.putTimelineMetric(anyObject(TimelineMetric.class)); expectLastCall().once(); http://git-wip-us.apache.org/repos/asf/ambari/blob/da2e6771/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java b/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java index dcc3192..91f78bc 100644 --- a/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java +++ b/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java @@ -40,6 +40,11 @@ import static org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCach import static org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache.MAX_RECS_PER_NAME_DEFAULT; public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implements IMetricsConsumer { + private static final String[] WARN_STRINGS_FOR_TOPOLOGY_OR_COMPONENT_NAME = { ".", "_" }; + + // create String manually in order to not rely on Guava Joiner or having our own + private static final String JOINED_WARN_STRINGS_FOR_MESSAGE = "\".\", \"_\""; + public static final String CLUSTER_REPORTER_APP_ID = "clusterReporterAppId"; public static final String DEFAULT_CLUSTER_REPORTER_APP_ID = "storm"; @@ -121,6 +126,7 @@ public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implem loadTruststore(trustStorePath, trustStoreType, trustStorePwd); } this.topologyName = removeNonce(topologyContext.getStormId()); + warnIfTopologyNameContainsWarnString(topologyName); } @Override @@ -237,13 +243,23 @@ public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implem private String createMetricName(String componentId, String workerHost, int workerPort, int taskId, String attributeName) { // <topology name>.<component name>.<worker host>.<worker port>.<task id>.<metric name> - String metricName = topologyName + "." + componentId + "." + workerHost + "." + workerPort + + String metricName = "topology." + topologyName + "." + componentId + "." + workerHost + "." + workerPort + "." + taskId + "." + attributeName; // since '._' is treat as special character (separator) so it should be replaced return metricName.replace('_', '-'); } + private void warnIfTopologyNameContainsWarnString(String name) { + for (String warn : WARN_STRINGS_FOR_TOPOLOGY_OR_COMPONENT_NAME) { + if (name.contains(warn)) { + LOG.warn("Topology name \"" + name + "\" contains \"" + warn + "\" which can be problematic for AMS."); + LOG.warn("Encouraged to not using any of these strings: " + JOINED_WARN_STRINGS_FOR_MESSAGE); + LOG.warn("Same suggestion applies to component name."); + } + } + } + public void setMetricsCache(TimelineMetricsCache metricsCache) { this.metricsCache = metricsCache; } http://git-wip-us.apache.org/repos/asf/ambari/blob/da2e6771/ambari-metrics/ambari-metrics-storm-sink/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-storm-sink/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java b/ambari-metrics/ambari-metrics-storm-sink/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java index c76197b..efe3022 100644 --- a/ambari-metrics/ambari-metrics-storm-sink/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java +++ b/ambari-metrics/ambari-metrics-storm-sink/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java @@ -43,7 +43,7 @@ public class StormTimelineMetricsSinkTest { @Test public void testNonNumericMetricMetricExclusion() throws InterruptedException, IOException { StormTimelineMetricsSink stormTimelineMetricsSink = new StormTimelineMetricsSink(); - stormTimelineMetricsSink.setTopologyName("topology"); + stormTimelineMetricsSink.setTopologyName("topology1"); TimelineMetricsCache timelineMetricsCache = createNiceMock(TimelineMetricsCache.class); stormTimelineMetricsSink.setMetricsCache(timelineMetricsCache); replay(timelineMetricsCache); @@ -57,9 +57,9 @@ public class StormTimelineMetricsSinkTest { @Ignore // TODO: Fix for failover public void testNumericMetricMetricSubmission() throws InterruptedException, IOException { StormTimelineMetricsSink stormTimelineMetricsSink = new StormTimelineMetricsSink(); - stormTimelineMetricsSink.setTopologyName("topology"); + stormTimelineMetricsSink.setTopologyName("topology1"); TimelineMetricsCache timelineMetricsCache = createNiceMock(TimelineMetricsCache.class); - expect(timelineMetricsCache.getTimelineMetric("topology.testComponent.localhost.1234.42.key1")) + expect(timelineMetricsCache.getTimelineMetric("topology.topology1.testComponent.localhost.1234.42.key1")) .andReturn(new TimelineMetric()).once(); timelineMetricsCache.putTimelineMetric(anyObject(TimelineMetric.class)); expectLastCall().once(); @@ -75,11 +75,11 @@ public class StormTimelineMetricsSinkTest { @Ignore // TODO: Fix for failover public void testMapMetricMetricSubmission() throws InterruptedException, IOException { StormTimelineMetricsSink stormTimelineMetricsSink = new StormTimelineMetricsSink(); - stormTimelineMetricsSink.setTopologyName("topology"); + stormTimelineMetricsSink.setTopologyName("topology1"); TimelineMetricsCache timelineMetricsCache = createNiceMock(TimelineMetricsCache.class); - expect(timelineMetricsCache.getTimelineMetric("topology.testComponent.localhost.1234.42.key1.field1")) + expect(timelineMetricsCache.getTimelineMetric("topology.topology1.testComponent.localhost.1234.42.key1.field1")) .andReturn(new TimelineMetric()).once(); - expect(timelineMetricsCache.getTimelineMetric("topology.testComponent.localhost.1234.42.key1.field2")) + expect(timelineMetricsCache.getTimelineMetric("topology.topology1.testComponent.localhost.1234.42.key1.field2")) .andReturn(new TimelineMetric()).once(); timelineMetricsCache.putTimelineMetric(anyObject(TimelineMetric.class)); expectLastCall().once();
