Repository: ambari
Updated Branches:
  refs/heads/trunk a1251ecc9 -> 7ff7bcf3a
AMBARI-17909 AMS Storm Sink: apply change of Storm metrics improvement - worker level aggregation. (Jungtaek Lim via avijayan)

Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/7ff7bcf3
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/7ff7bcf3
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/7ff7bcf3

Branch: refs/heads/trunk
Commit: 7ff7bcf3a2d40693cb0a5881661236e07335973a
Parents: a1251ec
Author: Aravindan Vijayan <[email protected]>
Authored: Thu Jul 28 09:01:40 2016 -0700
Committer: Aravindan Vijayan <[email protected]>
Committed: Thu Jul 28 09:56:29 2016 -0700

----------------------------------------------------------------------
 .../sink/storm/StormTimelineMetricsSink.java | 87 +++++++++++++++++++-
 .../storm/StormTimelineMetricsSinkTest.java | 58 ++++++++++++-
 .../0.1.0/configuration/storm-site.xml | 43 +++++++++-
 3 files changed, 181 insertions(+), 7 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ambari/blob/7ff7bcf3/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java b/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java index 3a4289b..f6531c8 100644 --- a/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java +++ b/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java @@ -18,6 +18,7 @@ package org.apache.hadoop.metrics2.sink.storm; +import org.apache.storm.Constants; import org.apache.storm.metric.api.IMetricsConsumer; 
import org.apache.storm.task.IErrorReporter; import org.apache.storm.task.TopologyContext; @@ -33,6 +34,7 @@ import java.net.InetAddress; import java.net.UnknownHostException; import java.util.ArrayList; import java.util.Collection; +import java.util.HashMap; import java.util.List; import java.util.Map; @@ -40,11 +42,17 @@ import static org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCach import static org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache.MAX_RECS_PER_NAME_DEFAULT; public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implements IMetricsConsumer { + // covers built-in metrics but still not beauty + private static final String[] METRIC_LOWERCASE_SUBSTRINGS_AGGREGATE_AVERAGE = { "-latency", "timems", "time_ms", "rate_secs", "timesecs" }; + private static final String[] WARN_STRINGS_FOR_TOPOLOGY_OR_COMPONENT_NAME = { ".", "_" }; // create String manually in order to not rely on Guava Joiner or having our own private static final String JOINED_WARN_STRINGS_FOR_MESSAGE = "\".\", \"_\""; + // it's safe since it doesn't exceed the boundary + public static final int SYSTEM_TASK_ID = (int) Constants.SYSTEM_TASK_ID; + public static final String CLUSTER_REPORTER_APP_ID = "clusterReporterAppId"; public static final String DEFAULT_CLUSTER_REPORTER_APP_ID = "storm"; public static final String METRIC_NAME_PREFIX_KAFKA_OFFSET = "kafkaOffset."; @@ -136,7 +144,16 @@ public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implem for (DataPoint dataPoint : dataPoints) { LOG.debug(dataPoint.name + " = " + dataPoint.value); - List<DataPoint> populatedDataPoints = populateDataPoints(dataPoint); + + List<DataPoint> populatedDataPoints; + if (taskInfo.srcTaskId == SYSTEM_TASK_ID && dataPoint.value instanceof Collection) { + // worker level aggregated metrics - aggregation should be handled + List<DataPoint> populatedBeforeAggregationDataPoints = populateAllDataPointValues(dataPoint); + Map<String, 
List<Double>> metricNameKeyedValues = groupByMetricNameDataPoints(populatedBeforeAggregationDataPoints); + populatedDataPoints = applyAggregationToMetricNameKeyedDataPoints(metricNameKeyedValues); + } else { + populatedDataPoints = populateDataPoints(dataPoint); + } for (DataPoint populatedDataPoint : populatedDataPoints) { String metricName; @@ -189,6 +206,22 @@ public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implem return topologyId.substring(0, topologyId.substring(0, topologyId.lastIndexOf("-")).lastIndexOf("-")); } + private List<DataPoint> populateAllDataPointValues(DataPoint dataPoint) { + List<DataPoint> populatedDataPoints = new ArrayList<>(); + Collection<Object> values = (Collection<Object>) dataPoint.value; + for (Object value : values) { + List<DataPoint> populated = populateDataPoints(new DataPoint(dataPoint.name, value)); + for (DataPoint point : populated) { + if (point.value == null) { + continue; + } + + populatedDataPoints.add(point); + } + } + return populatedDataPoints; + } + private List<DataPoint> populateDataPoints(DataPoint dataPoint) { List<DataPoint> dataPoints = new ArrayList<>(); @@ -233,6 +266,58 @@ public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implem } } + private Map<String, List<Double>> groupByMetricNameDataPoints(List<DataPoint> populatedDataPoints) { + Map<String, List<Double>> metricNameKeyedValues = new HashMap<>(); + for (DataPoint point : populatedDataPoints) { + List<Double> valuesOnMetric = metricNameKeyedValues.get(point.name); + + if (valuesOnMetric == null) { + valuesOnMetric = new ArrayList<>(); + metricNameKeyedValues.put(point.name, valuesOnMetric); + } + + valuesOnMetric.add(Double.valueOf(point.value.toString())); + } + return metricNameKeyedValues; + } + + private List<DataPoint> applyAggregationToMetricNameKeyedDataPoints(Map<String, List<Double>> metricNameKeyedValues) { + List<DataPoint> populatedDataPoints = new ArrayList<>(); + for (Map.Entry<String, 
List<Double>> metricNameToValues : metricNameKeyedValues.entrySet()) { + String key = metricNameToValues.getKey(); + List<Double> values = metricNameToValues.getValue(); + populatedDataPoints.add(new DataPoint(key, applyAggregateFunction(key, values))); + } + return populatedDataPoints; + } + + private Double applyAggregateFunction(String metricName, List<Double> values) { + String lowerCaseMetricName = metricName.toLowerCase(); + for (String aggregateMetricSubstring : METRIC_LOWERCASE_SUBSTRINGS_AGGREGATE_AVERAGE) { + if (lowerCaseMetricName.contains(aggregateMetricSubstring)) { + return calculateAverage(values); + } + } + + return calculateSummation(values); + } + + private Double calculateSummation(List<Double> values) { + Double sum = 0.0; + for (Double value : values) { + sum += value; + } + return sum; + } + + private Double calculateAverage(List<Double> values) { + if (values.isEmpty()) { + return 0.0d; + } + + return calculateSummation(values) / values.size(); + } + private TimelineMetric createTimelineMetric(long currentTimeMillis, String hostName, String attributeName, Double attributeValue) { TimelineMetric timelineMetric = new TimelineMetric(); http://git-wip-us.apache.org/repos/asf/ambari/blob/7ff7bcf3/ambari-metrics/ambari-metrics-storm-sink/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-storm-sink/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java b/ambari-metrics/ambari-metrics-storm-sink/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java index fadb00c..81a73db 100644 --- a/ambari-metrics/ambari-metrics-storm-sink/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java +++ b/ambari-metrics/ambari-metrics-storm-sink/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java 
@@ -19,6 +19,7 @@ package org.apache.hadoop.metrics2.sink.storm; import static org.apache.hadoop.metrics2.sink.storm.StormTimelineMetricsSink.METRIC_NAME_PREFIX_KAFKA_OFFSET; +import static org.apache.hadoop.metrics2.sink.storm.StormTimelineMetricsSink.SYSTEM_TASK_ID; import static org.easymock.EasyMock.anyObject; import static org.easymock.EasyMock.createMockBuilder; import static org.easymock.EasyMock.createNiceMock; @@ -29,12 +30,16 @@ import static org.easymock.EasyMock.verify; import java.io.IOException; import java.net.SocketAddress; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; import org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache; +import org.apache.storm.Constants; +import org.apache.storm.shade.com.google.common.collect.Lists; import org.junit.Ignore; import org.junit.Test; @@ -123,12 +128,59 @@ public class StormTimelineMetricsSinkTest { stormTimelineMetricsSink.setMetricsCache(timelineMetricsCache); replay(timelineMetricsCache); - Map<String, Object> valueMap = new HashMap<>(); - valueMap.put("field1", 53); - valueMap.put("field2", 64.12); + Map<String, Object> valueMap = getTestValueMap(); stormTimelineMetricsSink.handleDataPoints( new IMetricsConsumer.TaskInfo("localhost", 1234, "testComponent", 42, 20000L, 60), Collections.singleton(new IMetricsConsumer.DataPoint("key1", valueMap))); verify(timelineMetricsCache); } + + @Test + @Ignore // TODO: Fix for failover + public void testWorkerLevelAggregatedNumericMetricMetricSubmission() throws InterruptedException, IOException { + StormTimelineMetricsSink stormTimelineMetricsSink = new StormTimelineMetricsSink(); + stormTimelineMetricsSink.setTopologyName("topology1"); + TimelineMetricsCache timelineMetricsCache = createNiceMock(TimelineMetricsCache.class); + 
expect(timelineMetricsCache.getTimelineMetric("topology.topology1.testComponent.localhost.1234." + SYSTEM_TASK_ID + ".key1")) + .andReturn(new TimelineMetric()).once(); + timelineMetricsCache.putTimelineMetric(anyObject(TimelineMetric.class)); + expectLastCall().once(); + stormTimelineMetricsSink.setMetricsCache(timelineMetricsCache); + replay(timelineMetricsCache); + + stormTimelineMetricsSink.handleDataPoints( + new IMetricsConsumer.TaskInfo("localhost", 1234, "testComponent", SYSTEM_TASK_ID, 20000L, 60), + Collections.singleton(new IMetricsConsumer.DataPoint("key1", Lists.newArrayList(42.3, 42.3)))); + verify(timelineMetricsCache); + } + + @Test + @Ignore // TODO: Fix for failover + public void testWorkerLevelAggregatedMapMetricMetricSubmission() throws InterruptedException, IOException { + StormTimelineMetricsSink stormTimelineMetricsSink = new StormTimelineMetricsSink(); + stormTimelineMetricsSink.setTopologyName("topology1"); + TimelineMetricsCache timelineMetricsCache = createNiceMock(TimelineMetricsCache.class); + expect(timelineMetricsCache.getTimelineMetric("topology.topology1.testComponent.localhost.1234." 
+ SYSTEM_TASK_ID + ".key1.field1")) + .andReturn(new TimelineMetric()).once(); + timelineMetricsCache.putTimelineMetric(anyObject(TimelineMetric.class)); + expectLastCall().once(); + stormTimelineMetricsSink.setMetricsCache(timelineMetricsCache); + replay(timelineMetricsCache); + + List<Map<String, Object>> valueMapList = new ArrayList<>(); + valueMapList.add(getTestValueMap()); + valueMapList.add(getTestValueMap()); + + stormTimelineMetricsSink.handleDataPoints( + new IMetricsConsumer.TaskInfo("localhost", 1234, "testComponent", SYSTEM_TASK_ID, 20000L, 60), + Collections.singleton(new IMetricsConsumer.DataPoint("key1", valueMapList))); + verify(timelineMetricsCache); + } + + private Map<String, Object> getTestValueMap() { + Map<String, Object> valueMap = new HashMap<>(); + valueMap.put("field1", 53); + valueMap.put("field2", 64.12); + return valueMap; + } } http://git-wip-us.apache.org/repos/asf/ambari/blob/7ff7bcf3/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/storm-site.xml ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/storm-site.xml b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/storm-site.xml index 280fc42..7059b90 100644 --- a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/storm-site.xml +++ b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/storm-site.xml @@ -33,15 +33,52 @@ <value-attributes> <overridable>false</overridable> </value-attributes> - <on-ambari-upgrade add="false"/> + <on-ambari-upgrade add="true"/> </property> <property> <name>topology.metrics.consumer.register</name> - <value>[{"class": "org.apache.hadoop.metrics2.sink.storm.StormTimelineMetricsSink", "parallelism.hint": 1, "expandMapType": true, "metricNameSeparator": ".", "whitelist": ["kafkaOffset\\..+/", 
"__complete-latency", "__process-latency", "__receive\\.population$", "__sendqueue\\.population$", "__execute-count", "__emit-count", "__ack-count", "__fail-count", "memory/heap\\.usedBytes$", "memory/nonHeap\\.usedBytes$", "GC/.+\\.count$", "GC/.+\\.timeMs$"]}]</value> + <value>[{"class": "org.apache.hadoop.metrics2.sink.storm.StormTimelineMetricsSink", "parallelism.hint": 1, "whitelist": ["kafkaOffset\\..+/", "__complete-latency", "__process-latency", "__receive\\.population$", "__sendqueue\\.population$", "__execute-count", "__emit-count", "__ack-count", "__fail-count", "memory/heap\\.usedBytes$", "memory/nonHeap\\.usedBytes$", "GC/.+\\.count$", "GC/.+\\.timeMs$"]}]</value> + <description></description> + <value-attributes> + <overridable>false</overridable> + </value-attributes> + <on-ambari-upgrade add="true"/> + </property> + <property> + <name>topology.metrics.aggregate.per.worker</name> + <value>true</value> + <description></description> + <value-attributes> + <overridable>false</overridable> + </value-attributes> + <on-ambari-upgrade add="true"/> + </property> + <property> + <name>topology.metrics.aggregate.metric.evict.secs</name> + <value>5</value> <description></description> <value-attributes> <overridable>false</overridable> </value-attributes> - <on-ambari-upgrade add="false"/> + <on-ambari-upgrade add="true"/> + </property> + <property> + <name>topology.metrics.expand.map.type</name> + <value>true</value> + <description></description> + <value-attributes> + <overridable>false</overridable> + </value-attributes> + <on-ambari-upgrade add="true"/> + </property> + <property> + <name>topology.metrics.metric.name.separator</name> + <value>.</value> + <description></description> + <value-attributes> + <overridable>false</overridable> + </value-attributes> + <on-ambari-upgrade add="true"/> </property> + </configuration>
