AMBARI-17845 : Storm cluster metrics do not show up because of AMS aggregation issue. (avijayan)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/27ed7309 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/27ed7309 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/27ed7309 Branch: refs/heads/branch-2.4 Commit: 27ed730940fd75a3a2301e37cfe9953918f55363 Parents: e8751bb Author: Aravindan Vijayan <[email protected]> Authored: Fri Jul 22 13:41:50 2016 -0700 Committer: Aravindan Vijayan <[email protected]> Committed: Fri Jul 22 14:02:19 2016 -0700 ---------------------------------------------------------------------- .../TimelineMetricClusterAggregatorSecond.java | 22 +- ...melineMetricClusterAggregatorSecondTest.java | 201 +++++++++++++++++++ 2 files changed, 219 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/27ed7309/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java index bd46045..1676867 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators; +import org.apache.commons.collections.MapUtils; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.metrics2.sink.timeline.PostProcessingUtil; @@ -26,7 +27,6 @@ import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline. import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition; import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultCondition; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL; import java.io.IOException; import java.sql.ResultSet; import java.sql.SQLException; @@ -40,7 +40,6 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_CLUSTER_AGGREGATOR_INTERPOLATION_ENABLED; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_METRIC_SQL; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.NATIVE_TIME_RANGE_DELTA; /** * Aggregates a metric across all hosts in the cluster. Reads metrics from @@ -97,7 +96,7 @@ public class TimelineMetricClusterAggregatorSecond extends AbstractTimelineAggre @Override protected Condition prepareMetricQueryCondition(long startTime, long endTime) { - Condition condition = new DefaultCondition(null, null, null, null, startTime, + Condition condition = new DefaultCondition(null, null, null, null, startTime - serverTimeShiftAdjustment, endTime, null, null, true); condition.setNoLimit(); condition.setFetchSize(resultsetFetchSize); @@ -163,7 +162,7 @@ public class TimelineMetricClusterAggregatorSecond extends AbstractTimelineAggre * timeline.metrics.cluster.aggregator.minute.timeslice.interval * Normalize value by averaging them within the interval */ - private void processAggregateClusterMetrics(Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics, + protected void processAggregateClusterMetrics(Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics, TimelineMetric metric, List<Long[]> timeSlices) { // Create time slices Map<TimelineClusterMetric, Double> clusterMetrics = sliceFromTimelineMetric(metric, timeSlices); @@ -301,6 +300,17 @@ public class TimelineMetricClusterAggregatorSecond extends AbstractTimelineAggre } else { //For other metrics, ok to do only interpolation + Double defaultNextSeenValue = null; + if (MapUtils.isEmpty(timeSliceValueMap) && MapUtils.isNotEmpty(timelineMetric.getMetricValues())) { + //If no value was found within the start_time based slices, but the metric has value in the server_time range, + // use that. + + LOG.debug("No value found within range for metric : " + timelineMetric.getMetricName()); + Map.Entry<Long,Double> firstEntry = timelineMetric.getMetricValues().firstEntry(); + defaultNextSeenValue = firstEntry.getValue(); + LOG.debug("Found a data point outside timeslice range: " + new Date(firstEntry.getKey()) + ": " + defaultNextSeenValue); + } + for (int sliceNum = 0; sliceNum < timeSlices.size(); sliceNum++) { Long[] timeSlice = timeSlices.get(sliceNum); @@ -323,6 +333,10 @@ public class TimelineMetricClusterAggregatorSecond extends AbstractTimelineAggre nextSeenValue = timeSliceValueMap.get(nextTimeSlice[1]); } + if (nextSeenValue == null) { + nextSeenValue = defaultNextSeenValue; + } + Double interpolatedValue = PostProcessingUtil.interpolate(timeSlice[1], (prevTimeSlice != null ? prevTimeSlice[1] : null), lastSeenValue, (nextTimeSlice != null ? nextTimeSlice[1] : null), nextSeenValue); http://git-wip-us.apache.org/repos/asf/ambari/blob/27ed7309/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecondTest.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecondTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecondTest.java index 6776a3c..b992d60 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecondTest.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecondTest.java @@ -25,6 +25,7 @@ import org.easymock.EasyMock; import org.junit.Test; import java.util.Date; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.TreeMap; @@ -104,4 +105,204 @@ public class TimelineMetricClusterAggregatorSecondTest { } + @Test + public void testShouldAggregateProperly() { + + long aggregatorInterval = 120000l; + long sliceInterval = 30000l; + + Configuration configuration = new Configuration(); + TimelineMetricMetadataManager metricMetadataManagerMock = EasyMock.createNiceMock(TimelineMetricMetadataManager.class); + + TimelineMetricClusterAggregatorSecond secondAggregator = new TimelineMetricClusterAggregatorSecond( + "TimelineClusterAggregatorSecond", metricMetadataManagerMock, null, configuration, null, + aggregatorInterval, 2, "false", "", "", aggregatorInterval, sliceInterval + ); + + long startTime = AbstractTimelineAggregator.getRoundedCheckPointTimeMillis(System.currentTimeMillis(),aggregatorInterval); + List<Long[]> timeslices = secondAggregator.getTimeSlices(startTime, startTime + aggregatorInterval); + + Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics = new HashMap<>(); + long seconds = 1000; + TimelineMetric timelineMetric = new TimelineMetric(); + timelineMetric.setMetricName("m1"); + timelineMetric.setHostName("h1"); + timelineMetric.setAppId("a1"); + timelineMetric.setType("GUAGE"); + timelineMetric.setStartTime(startTime); + + /* + + 0 +30s +60s +90s +120s +150s +180s + | | | | | | | + (1) (2) (3) (4) (5) (6) + + */ + // Case 1 : Points present in all the required timeslices. + // Life is good! Ignore (5) and (6). + + TreeMap<Long, Double> metricValues = new TreeMap<Long, Double>(); + metricValues.put(startTime + 15*seconds, 1.0); + metricValues.put(startTime + 45*seconds, 2.0); + metricValues.put(startTime + 75*seconds, 3.0); + metricValues.put(startTime + 105*seconds, 4.0); + metricValues.put(startTime + 135*seconds, 5.0); + metricValues.put(startTime + 165*seconds, 6.0); + + timelineMetric.setMetricValues(metricValues); + secondAggregator.processAggregateClusterMetrics(aggregateClusterMetrics, timelineMetric, timeslices); + + Assert.assertEquals(aggregateClusterMetrics.size(), 4); + TimelineClusterMetric timelineClusterMetric = new TimelineClusterMetric(timelineMetric.getMetricName(), + timelineMetric.getAppId(), timelineMetric.getInstanceId(), startTime + 30*seconds, timelineMetric.getType()); + + Assert.assertTrue(aggregateClusterMetrics.containsKey(timelineClusterMetric)); + Assert.assertEquals(aggregateClusterMetrics.get(timelineClusterMetric).getSum(), 1.0); + + timelineClusterMetric.setTimestamp(startTime + 4*30*seconds); + Assert.assertTrue(aggregateClusterMetrics.containsKey(timelineClusterMetric)); + Assert.assertEquals(aggregateClusterMetrics.get(timelineClusterMetric).getSum(),4.0); + + metricValues.clear(); + aggregateClusterMetrics.clear(); + + /* + + 0 +30s +60s +90s +120s +150s +180s + | | | | | | | + (1) (3) (4) (5) (6) + + */ + // Case 2 : Some "middle" point missing in the required timeslices. + // Interpolate the middle point. Ignore (5) and (6). + metricValues.put(startTime + 15*seconds, 1.0); + metricValues.put(startTime + 75*seconds, 3.0); + metricValues.put(startTime + 105*seconds, 4.0); + metricValues.put(startTime + 135*seconds, 5.0); + metricValues.put(startTime + 165*seconds, 6.0); + + timelineMetric.setMetricValues(metricValues); + secondAggregator.processAggregateClusterMetrics(aggregateClusterMetrics, timelineMetric, timeslices); + + Assert.assertEquals(aggregateClusterMetrics.size(), 4); + timelineClusterMetric.setTimestamp(startTime + 60*seconds); + Assert.assertTrue(aggregateClusterMetrics.containsKey(timelineClusterMetric)); + Assert.assertEquals(aggregateClusterMetrics.get(timelineClusterMetric).getSum(), 2.0); + + metricValues.clear(); + aggregateClusterMetrics.clear(); + + + /* + + 0 +30s +60s +90s +120s +150s +180s + | | | | | | | + (1) (2) (3) (5) (6) + + */ + // Case 3 : "end" point missing in the required timeslices. + // Use all points to get missing point if COUNTER. Else use just (3). Ignore (6). + metricValues.put(startTime + 15*seconds, 1.0); + metricValues.put(startTime + 45*seconds, 2.0); + metricValues.put(startTime + 75*seconds, 3.0); + metricValues.put(startTime + 135*seconds, 5.0); + metricValues.put(startTime + 165*seconds, 6.0); + + timelineMetric.setMetricValues(metricValues); + timelineMetric.setType("GUAGE"); + secondAggregator.processAggregateClusterMetrics(aggregateClusterMetrics, timelineMetric, timeslices); + + Assert.assertEquals(aggregateClusterMetrics.size(), 4); + timelineClusterMetric.setTimestamp(startTime + 120*seconds); + Assert.assertTrue(aggregateClusterMetrics.containsKey(timelineClusterMetric)); + Assert.assertEquals(aggregateClusterMetrics.get(timelineClusterMetric).getSum(), 3.0); + + aggregateClusterMetrics.clear(); + + timelineMetric.setType("COUNTER"); + secondAggregator.processAggregateClusterMetrics(aggregateClusterMetrics, timelineMetric, timeslices); + + Assert.assertEquals(aggregateClusterMetrics.size(), 4); + timelineClusterMetric.setTimestamp(startTime + 120*seconds); + Assert.assertTrue(aggregateClusterMetrics.containsKey(timelineClusterMetric)); + Assert.assertEquals(aggregateClusterMetrics.get(timelineClusterMetric).getSum(), 4.5); + + metricValues.clear(); + aggregateClusterMetrics.clear(); + + /* + + 0 +30s +60s +90s +120s +150s +180s + | | | | | | | + (2) (3) (4) (5) (6) + + */ + // Case 4 : "start" point missing in the required timeslices. + // Interpolate with only (2) to get missing point if GUAGE metric. Else use all points for COUNTER. + + metricValues.put(startTime + 45*seconds, 2.0); + metricValues.put(startTime + 75*seconds, 3.0); + metricValues.put(startTime + 105*seconds, 4.0); + metricValues.put(startTime + 135*seconds, 5.0); + metricValues.put(startTime + 165*seconds, 6.0); + + timelineMetric.setMetricValues(metricValues); + timelineMetric.setType("GUAGE"); + secondAggregator.processAggregateClusterMetrics(aggregateClusterMetrics, timelineMetric, timeslices); + + Assert.assertEquals(aggregateClusterMetrics.size(), 4); + timelineClusterMetric.setTimestamp(startTime + 30*seconds); + Assert.assertTrue(aggregateClusterMetrics.containsKey(timelineClusterMetric)); + Assert.assertEquals(aggregateClusterMetrics.get(timelineClusterMetric).getSum(), 2.0); + + aggregateClusterMetrics.clear(); + + timelineMetric.setType("COUNTER"); + secondAggregator.processAggregateClusterMetrics(aggregateClusterMetrics, timelineMetric, timeslices); + + Assert.assertEquals(aggregateClusterMetrics.size(), 4); + timelineClusterMetric.setTimestamp(startTime + 30*seconds); + Assert.assertTrue(aggregateClusterMetrics.containsKey(timelineClusterMetric)); + Assert.assertEquals(aggregateClusterMetrics.get(timelineClusterMetric).getSum(), 1.5); + + metricValues.clear(); + aggregateClusterMetrics.clear(); + + /* + + 0 +30s +60s +90s +120s +150s +180s + | | | | | | | + (5) (6) + + */ + // Case 5 : Well, we have nothing in the 2 min window. + // Use (5) to paint the 2 min window as (5). + + metricValues.put(startTime + 135*seconds, 5.0); + metricValues.put(startTime + 165*seconds, 6.0); + + timelineMetric.setMetricValues(metricValues); + timelineMetric.setType("GUAGE"); + secondAggregator.processAggregateClusterMetrics(aggregateClusterMetrics, timelineMetric, timeslices); + + Assert.assertEquals(aggregateClusterMetrics.size(), 4); + timelineClusterMetric.setTimestamp(startTime + 30*seconds); + Assert.assertTrue(aggregateClusterMetrics.containsKey(timelineClusterMetric)); + Assert.assertEquals(aggregateClusterMetrics.get(timelineClusterMetric).getSum(), 5.0); + + aggregateClusterMetrics.clear(); + + timelineMetric.setType("COUNTER"); + secondAggregator.processAggregateClusterMetrics(aggregateClusterMetrics, timelineMetric, timeslices); + + Assert.assertEquals(aggregateClusterMetrics.size(), 4); + timelineClusterMetric.setTimestamp(startTime + 60*seconds); + Assert.assertTrue(aggregateClusterMetrics.containsKey(timelineClusterMetric)); + Assert.assertEquals(aggregateClusterMetrics.get(timelineClusterMetric).getSum(), 2.5); + + metricValues.clear(); + aggregateClusterMetrics.clear(); + + } + }
