AMBARI-17725 : AMS Storm Sink: Storm topology level metrics should have a prefix 
to distinguish them from cluster level metrics (Jungtaek Lim via avijayan)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/da2e6771
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/da2e6771
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/da2e6771

Branch: refs/heads/trunk
Commit: da2e67710f8d39958a412a33f08a675b2d75863f
Parents: 70c3ef1
Author: Aravindan Vijayan <[email protected]>
Authored: Wed Jul 20 10:47:47 2016 -0700
Committer: Aravindan Vijayan <[email protected]>
Committed: Wed Jul 20 10:47:47 2016 -0700

----------------------------------------------------------------------
 .../sink/storm/StormTimelineMetricsSink.java      | 18 +++++++++++++++++-
 .../sink/storm/StormTimelineMetricsSinkTest.java  | 12 ++++++------
 .../sink/storm/StormTimelineMetricsSink.java      | 18 +++++++++++++++++-
 .../sink/storm/StormTimelineMetricsSinkTest.java  | 12 ++++++------
 4 files changed, 46 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/da2e6771/ambari-metrics/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
 
b/ambari-metrics/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
index 9e6cc98..879cbfc 100644
--- 
a/ambari-metrics/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
+++ 
b/ambari-metrics/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
@@ -40,6 +40,11 @@ import static 
org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCach
 import static 
org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache.MAX_RECS_PER_NAME_DEFAULT;
 
 public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink 
implements IMetricsConsumer {
+  private static final String[] WARN_STRINGS_FOR_TOPOLOGY_OR_COMPONENT_NAME = 
{ ".", "_" };
+
+  // create String manually in order to not rely on Guava Joiner or having our 
own
+  private static final String JOINED_WARN_STRINGS_FOR_MESSAGE = "\".\", \"_\"";
+
   public static final String CLUSTER_REPORTER_APP_ID = "clusterReporterAppId";
   public static final String DEFAULT_CLUSTER_REPORTER_APP_ID = "storm";
 
@@ -121,6 +126,7 @@ public class StormTimelineMetricsSink extends 
AbstractTimelineMetricsSink implem
       loadTruststore(trustStorePath, trustStoreType, trustStorePwd);
     }
     this.topologyName = removeNonce(topologyContext.getStormId());
+    warnIfTopologyNameContainsWarnString(topologyName);
   }
 
   @Override
@@ -237,13 +243,23 @@ public class StormTimelineMetricsSink extends 
AbstractTimelineMetricsSink implem
   private String createMetricName(String componentId, String workerHost, int 
workerPort, int taskId,
       String attributeName) {
     // <topology name>.<component name>.<worker host>.<worker port>.<task 
id>.<metric name>
-    String metricName = topologyName + "." + componentId + "." + workerHost + 
"." + workerPort +
+    String metricName = "topology." + topologyName + "." + componentId + "." + 
workerHost + "." + workerPort +
         "." + taskId + "." + attributeName;
 
     // since '._' is treat as special character (separator) so it should be 
replaced
     return metricName.replace('_', '-');
   }
 
+  private void warnIfTopologyNameContainsWarnString(String name) {
+    for (String warn : WARN_STRINGS_FOR_TOPOLOGY_OR_COMPONENT_NAME) {
+      if (name.contains(warn)) {
+        LOG.warn("Topology name \"" + name + "\" contains \"" + warn + "\" 
which can be problematic for AMS.");
+        LOG.warn("Encouraged to not using any of these strings: " + 
JOINED_WARN_STRINGS_FOR_MESSAGE);
+        LOG.warn("Same suggestion applies to component name.");
+      }
+    }
+  }
+
   public void setMetricsCache(TimelineMetricsCache metricsCache) {
     this.metricsCache = metricsCache;
   }

http://git-wip-us.apache.org/repos/asf/ambari/blob/da2e6771/ambari-metrics/ambari-metrics-storm-sink-legacy/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-storm-sink-legacy/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java
 
b/ambari-metrics/ambari-metrics-storm-sink-legacy/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java
index 271c11f..2128e07 100644
--- 
a/ambari-metrics/ambari-metrics-storm-sink-legacy/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java
+++ 
b/ambari-metrics/ambari-metrics-storm-sink-legacy/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java
@@ -43,7 +43,7 @@ public class StormTimelineMetricsSinkTest {
   @Test
   public void testNonNumericMetricMetricExclusion() throws 
InterruptedException, IOException {
     StormTimelineMetricsSink stormTimelineMetricsSink = new 
StormTimelineMetricsSink();
-    stormTimelineMetricsSink.setTopologyName("topology");
+    stormTimelineMetricsSink.setTopologyName("topology1");
     TimelineMetricsCache timelineMetricsCache = 
createNiceMock(TimelineMetricsCache.class);
     stormTimelineMetricsSink.setMetricsCache(timelineMetricsCache);
     replay(timelineMetricsCache);
@@ -57,9 +57,9 @@ public class StormTimelineMetricsSinkTest {
   @Ignore // TODO: Fix for failover
   public void testNumericMetricMetricSubmission() throws InterruptedException, 
IOException {
     StormTimelineMetricsSink stormTimelineMetricsSink = new 
StormTimelineMetricsSink();
-    stormTimelineMetricsSink.setTopologyName("topology");
+    stormTimelineMetricsSink.setTopologyName("topology1");
     TimelineMetricsCache timelineMetricsCache = 
createNiceMock(TimelineMetricsCache.class);
-    
expect(timelineMetricsCache.getTimelineMetric("topology.testComponent.localhost.1234.42.key1"))
+    
expect(timelineMetricsCache.getTimelineMetric("topology.topology1.testComponent.localhost.1234.42.key1"))
         .andReturn(new TimelineMetric()).once();
     timelineMetricsCache.putTimelineMetric(anyObject(TimelineMetric.class));
     expectLastCall().once();
@@ -75,11 +75,11 @@ public class StormTimelineMetricsSinkTest {
   @Ignore // TODO: Fix for failover
   public void testMapMetricMetricSubmission() throws InterruptedException, 
IOException {
     StormTimelineMetricsSink stormTimelineMetricsSink = new 
StormTimelineMetricsSink();
-    stormTimelineMetricsSink.setTopologyName("topology");
+    stormTimelineMetricsSink.setTopologyName("topology1");
     TimelineMetricsCache timelineMetricsCache = 
createNiceMock(TimelineMetricsCache.class);
-    
expect(timelineMetricsCache.getTimelineMetric("topology.testComponent.localhost.1234.42.key1.field1"))
+    
expect(timelineMetricsCache.getTimelineMetric("topology.topology1.testComponent.localhost.1234.42.key1.field1"))
         .andReturn(new TimelineMetric()).once();
-    
expect(timelineMetricsCache.getTimelineMetric("topology.testComponent.localhost.1234.42.key1.field2"))
+    
expect(timelineMetricsCache.getTimelineMetric("topology.topology1.testComponent.localhost.1234.42.key1.field2"))
         .andReturn(new TimelineMetric()).once();
     timelineMetricsCache.putTimelineMetric(anyObject(TimelineMetric.class));
     expectLastCall().once();

http://git-wip-us.apache.org/repos/asf/ambari/blob/da2e6771/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
 
b/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
index dcc3192..91f78bc 100644
--- 
a/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
+++ 
b/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
@@ -40,6 +40,11 @@ import static 
org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCach
 import static 
org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache.MAX_RECS_PER_NAME_DEFAULT;
 
 public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink 
implements IMetricsConsumer {
+  private static final String[] WARN_STRINGS_FOR_TOPOLOGY_OR_COMPONENT_NAME = 
{ ".", "_" };
+
+  // create String manually in order to not rely on Guava Joiner or having our 
own
+  private static final String JOINED_WARN_STRINGS_FOR_MESSAGE = "\".\", \"_\"";
+
   public static final String CLUSTER_REPORTER_APP_ID = "clusterReporterAppId";
   public static final String DEFAULT_CLUSTER_REPORTER_APP_ID = "storm";
 
@@ -121,6 +126,7 @@ public class StormTimelineMetricsSink extends 
AbstractTimelineMetricsSink implem
       loadTruststore(trustStorePath, trustStoreType, trustStorePwd);
     }
     this.topologyName = removeNonce(topologyContext.getStormId());
+    warnIfTopologyNameContainsWarnString(topologyName);
   }
 
   @Override
@@ -237,13 +243,23 @@ public class StormTimelineMetricsSink extends 
AbstractTimelineMetricsSink implem
   private String createMetricName(String componentId, String workerHost, int 
workerPort, int taskId,
       String attributeName) {
     // <topology name>.<component name>.<worker host>.<worker port>.<task 
id>.<metric name>
-    String metricName = topologyName + "." + componentId + "." + workerHost + 
"." + workerPort +
+    String metricName = "topology." + topologyName + "." + componentId + "." + 
workerHost + "." + workerPort +
         "." + taskId + "." + attributeName;
 
     // since '._' is treat as special character (separator) so it should be 
replaced
     return metricName.replace('_', '-');
   }
 
+  private void warnIfTopologyNameContainsWarnString(String name) {
+    for (String warn : WARN_STRINGS_FOR_TOPOLOGY_OR_COMPONENT_NAME) {
+      if (name.contains(warn)) {
+        LOG.warn("Topology name \"" + name + "\" contains \"" + warn + "\" 
which can be problematic for AMS.");
+        LOG.warn("Encouraged to not using any of these strings: " + 
JOINED_WARN_STRINGS_FOR_MESSAGE);
+        LOG.warn("Same suggestion applies to component name.");
+      }
+    }
+  }
+
   public void setMetricsCache(TimelineMetricsCache metricsCache) {
     this.metricsCache = metricsCache;
   }

http://git-wip-us.apache.org/repos/asf/ambari/blob/da2e6771/ambari-metrics/ambari-metrics-storm-sink/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-storm-sink/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java
 
b/ambari-metrics/ambari-metrics-storm-sink/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java
index c76197b..efe3022 100644
--- 
a/ambari-metrics/ambari-metrics-storm-sink/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java
+++ 
b/ambari-metrics/ambari-metrics-storm-sink/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java
@@ -43,7 +43,7 @@ public class StormTimelineMetricsSinkTest {
   @Test
   public void testNonNumericMetricMetricExclusion() throws 
InterruptedException, IOException {
     StormTimelineMetricsSink stormTimelineMetricsSink = new 
StormTimelineMetricsSink();
-    stormTimelineMetricsSink.setTopologyName("topology");
+    stormTimelineMetricsSink.setTopologyName("topology1");
     TimelineMetricsCache timelineMetricsCache = 
createNiceMock(TimelineMetricsCache.class);
     stormTimelineMetricsSink.setMetricsCache(timelineMetricsCache);
     replay(timelineMetricsCache);
@@ -57,9 +57,9 @@ public class StormTimelineMetricsSinkTest {
   @Ignore // TODO: Fix for failover
   public void testNumericMetricMetricSubmission() throws InterruptedException, 
IOException {
     StormTimelineMetricsSink stormTimelineMetricsSink = new 
StormTimelineMetricsSink();
-    stormTimelineMetricsSink.setTopologyName("topology");
+    stormTimelineMetricsSink.setTopologyName("topology1");
     TimelineMetricsCache timelineMetricsCache = 
createNiceMock(TimelineMetricsCache.class);
-    
expect(timelineMetricsCache.getTimelineMetric("topology.testComponent.localhost.1234.42.key1"))
+    
expect(timelineMetricsCache.getTimelineMetric("topology.topology1.testComponent.localhost.1234.42.key1"))
         .andReturn(new TimelineMetric()).once();
     timelineMetricsCache.putTimelineMetric(anyObject(TimelineMetric.class));
     expectLastCall().once();
@@ -75,11 +75,11 @@ public class StormTimelineMetricsSinkTest {
   @Ignore // TODO: Fix for failover
   public void testMapMetricMetricSubmission() throws InterruptedException, 
IOException {
     StormTimelineMetricsSink stormTimelineMetricsSink = new 
StormTimelineMetricsSink();
-    stormTimelineMetricsSink.setTopologyName("topology");
+    stormTimelineMetricsSink.setTopologyName("topology1");
     TimelineMetricsCache timelineMetricsCache = 
createNiceMock(TimelineMetricsCache.class);
-    
expect(timelineMetricsCache.getTimelineMetric("topology.testComponent.localhost.1234.42.key1.field1"))
+    
expect(timelineMetricsCache.getTimelineMetric("topology.topology1.testComponent.localhost.1234.42.key1.field1"))
         .andReturn(new TimelineMetric()).once();
-    
expect(timelineMetricsCache.getTimelineMetric("topology.testComponent.localhost.1234.42.key1.field2"))
+    
expect(timelineMetricsCache.getTimelineMetric("topology.topology1.testComponent.localhost.1234.42.key1.field2"))
         .andReturn(new TimelineMetric()).once();
     timelineMetricsCache.putTimelineMetric(anyObject(TimelineMetric.class));
     expectLastCall().once();

Reply via email to