AMBARI-17845 : Storm cluster metrics do not show up because of AMS aggregation issue. (avijayan)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/1a415a66 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/1a415a66 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/1a415a66 Branch: refs/heads/trunk Commit: 1a415a66d569d0aa6e698564d54b446c921fa449 Parents: a484b23 Author: Aravindan Vijayan <[email protected]> Authored: Fri Jul 22 14:56:24 2016 -0700 Committer: Aravindan Vijayan <[email protected]> Committed: Fri Jul 22 14:56:24 2016 -0700 ---------------------------------------------------------------------- .../TimelineMetricClusterAggregatorSecond.java | 20 +- ...melineMetricClusterAggregatorSecondTest.java | 201 +++++++++++++++++++ 2 files changed, 219 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/1a415a66/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java index bdc0feb..6731eb3 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators; +import org.apache.commons.collections.MapUtils; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.metrics2.sink.timeline.PostProcessingUtil; @@ -98,7 +99,7 @@ public class TimelineMetricClusterAggregatorSecond extends AbstractTimelineAggre @Override protected Condition prepareMetricQueryCondition(long startTime, long endTime) { - Condition condition = new DefaultCondition(null, null, null, null, startTime, + Condition condition = new DefaultCondition(null, null, null, null, startTime - serverTimeShiftAdjustment, endTime, null, null, true); condition.setNoLimit(); condition.setFetchSize(resultsetFetchSize); @@ -164,7 +165,7 @@ public class TimelineMetricClusterAggregatorSecond extends AbstractTimelineAggre * timeline.metrics.cluster.aggregator.minute.timeslice.interval * Normalize value by averaging them within the interval */ - private void processAggregateClusterMetrics(Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics, + protected void processAggregateClusterMetrics(Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics, TimelineMetric metric, List<Long[]> timeSlices) { // Create time slices Map<TimelineClusterMetric, Double> clusterMetrics = sliceFromTimelineMetric(metric, timeSlices); @@ -302,6 +303,17 @@ public class TimelineMetricClusterAggregatorSecond extends AbstractTimelineAggre } else { //For other metrics, ok to do only interpolation + Double defaultNextSeenValue = null; + if (MapUtils.isEmpty(timeSliceValueMap) && MapUtils.isNotEmpty(timelineMetric.getMetricValues())) { + //If no value was found within the start_time based slices, but the metric has value in the server_time range, + // use that. + + LOG.debug("No value found within range for metric : " + timelineMetric.getMetricName()); + Map.Entry<Long,Double> firstEntry = timelineMetric.getMetricValues().firstEntry(); + defaultNextSeenValue = firstEntry.getValue(); + LOG.debug("Found a data point outside timeslice range: " + new Date(firstEntry.getKey()) + ": " + defaultNextSeenValue); + } + for (int sliceNum = 0; sliceNum < timeSlices.size(); sliceNum++) { Long[] timeSlice = timeSlices.get(sliceNum); @@ -324,6 +336,10 @@ public class TimelineMetricClusterAggregatorSecond extends AbstractTimelineAggre nextSeenValue = timeSliceValueMap.get(nextTimeSlice[1]); } + if (nextSeenValue == null) { + nextSeenValue = defaultNextSeenValue; + } + Double interpolatedValue = PostProcessingUtil.interpolate(timeSlice[1], (prevTimeSlice != null ? prevTimeSlice[1] : null), lastSeenValue, (nextTimeSlice != null ? nextTimeSlice[1] : null), nextSeenValue); http://git-wip-us.apache.org/repos/asf/ambari/blob/1a415a66/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecondTest.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecondTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecondTest.java index 1e2f4ec..0f93bab 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecondTest.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecondTest.java @@ -24,6 +24,7 @@ import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline. import org.easymock.EasyMock; import org.junit.Test; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.TreeMap; @@ -105,4 +106,204 @@ public class TimelineMetricClusterAggregatorSecondTest { } + @Test + public void testShouldAggregateProperly() { + + long aggregatorInterval = 120000l; + long sliceInterval = 30000l; + + Configuration configuration = new Configuration(); + TimelineMetricMetadataManager metricMetadataManagerMock = EasyMock.createNiceMock(TimelineMetricMetadataManager.class); + + TimelineMetricClusterAggregatorSecond secondAggregator = new TimelineMetricClusterAggregatorSecond( + METRIC_AGGREGATE_SECOND, metricMetadataManagerMock, null, configuration, null, + aggregatorInterval, 2, "false", "", "", aggregatorInterval, sliceInterval, null + ); + + long startTime = AbstractTimelineAggregator.getRoundedCheckPointTimeMillis(System.currentTimeMillis(),aggregatorInterval); + List<Long[]> timeslices = secondAggregator.getTimeSlices(startTime, startTime + aggregatorInterval); + + Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics = new HashMap<>(); + long seconds = 1000; + TimelineMetric timelineMetric = new TimelineMetric(); + timelineMetric.setMetricName("m1"); + timelineMetric.setHostName("h1"); + timelineMetric.setAppId("a1"); + timelineMetric.setType("GUAGE"); + timelineMetric.setStartTime(startTime); + + /* + + 0 +30s +60s +90s +120s +150s +180s + | | | | | | | + (1) (2) (3) (4) (5) (6) + + */ + // Case 1 : Points present in all the required timeslices. + // Life is good! Ignore (5) and (6). + + TreeMap<Long, Double> metricValues = new TreeMap<Long, Double>(); + metricValues.put(startTime + 15*seconds, 1.0); + metricValues.put(startTime + 45*seconds, 2.0); + metricValues.put(startTime + 75*seconds, 3.0); + metricValues.put(startTime + 105*seconds, 4.0); + metricValues.put(startTime + 135*seconds, 5.0); + metricValues.put(startTime + 165*seconds, 6.0); + + timelineMetric.setMetricValues(metricValues); + secondAggregator.processAggregateClusterMetrics(aggregateClusterMetrics, timelineMetric, timeslices); + + Assert.assertEquals(aggregateClusterMetrics.size(), 4); + TimelineClusterMetric timelineClusterMetric = new TimelineClusterMetric(timelineMetric.getMetricName(), + timelineMetric.getAppId(), timelineMetric.getInstanceId(), startTime + 30*seconds, timelineMetric.getType()); + + Assert.assertTrue(aggregateClusterMetrics.containsKey(timelineClusterMetric)); + Assert.assertEquals(aggregateClusterMetrics.get(timelineClusterMetric).getSum(), 1.0); + + timelineClusterMetric.setTimestamp(startTime + 4*30*seconds); + Assert.assertTrue(aggregateClusterMetrics.containsKey(timelineClusterMetric)); + Assert.assertEquals(aggregateClusterMetrics.get(timelineClusterMetric).getSum(),4.0); + + metricValues.clear(); + aggregateClusterMetrics.clear(); + + /* + + 0 +30s +60s +90s +120s +150s +180s + | | | | | | | + (1) (3) (4) (5) (6) + + */ + // Case 2 : Some "middle" point missing in the required timeslices. + // Interpolate the middle point. Ignore (5) and (6). + metricValues.put(startTime + 15*seconds, 1.0); + metricValues.put(startTime + 75*seconds, 3.0); + metricValues.put(startTime + 105*seconds, 4.0); + metricValues.put(startTime + 135*seconds, 5.0); + metricValues.put(startTime + 165*seconds, 6.0); + + timelineMetric.setMetricValues(metricValues); + secondAggregator.processAggregateClusterMetrics(aggregateClusterMetrics, timelineMetric, timeslices); + + Assert.assertEquals(aggregateClusterMetrics.size(), 4); + timelineClusterMetric.setTimestamp(startTime + 60*seconds); + Assert.assertTrue(aggregateClusterMetrics.containsKey(timelineClusterMetric)); + Assert.assertEquals(aggregateClusterMetrics.get(timelineClusterMetric).getSum(), 2.0); + + metricValues.clear(); + aggregateClusterMetrics.clear(); + + + /* + + 0 +30s +60s +90s +120s +150s +180s + | | | | | | | + (1) (2) (3) (5) (6) + + */ + // Case 3 : "end" point missing in the required timeslices. + // Use all points to get missing point if COUNTER. Else use just (3). Ignore (6). + metricValues.put(startTime + 15*seconds, 1.0); + metricValues.put(startTime + 45*seconds, 2.0); + metricValues.put(startTime + 75*seconds, 3.0); + metricValues.put(startTime + 135*seconds, 5.0); + metricValues.put(startTime + 165*seconds, 6.0); + + timelineMetric.setMetricValues(metricValues); + timelineMetric.setType("GUAGE"); + secondAggregator.processAggregateClusterMetrics(aggregateClusterMetrics, timelineMetric, timeslices); + + Assert.assertEquals(aggregateClusterMetrics.size(), 4); + timelineClusterMetric.setTimestamp(startTime + 120*seconds); + Assert.assertTrue(aggregateClusterMetrics.containsKey(timelineClusterMetric)); + Assert.assertEquals(aggregateClusterMetrics.get(timelineClusterMetric).getSum(), 3.0); + + aggregateClusterMetrics.clear(); + + timelineMetric.setType("COUNTER"); + secondAggregator.processAggregateClusterMetrics(aggregateClusterMetrics, timelineMetric, timeslices); + + Assert.assertEquals(aggregateClusterMetrics.size(), 4); + timelineClusterMetric.setTimestamp(startTime + 120*seconds); + Assert.assertTrue(aggregateClusterMetrics.containsKey(timelineClusterMetric)); + Assert.assertEquals(aggregateClusterMetrics.get(timelineClusterMetric).getSum(), 4.5); + + metricValues.clear(); + aggregateClusterMetrics.clear(); + + /* + + 0 +30s +60s +90s +120s +150s +180s + | | | | | | | + (2) (3) (4) (5) (6) + + */ + // Case 4 : "start" point missing in the required timeslices. + // Interpolate with only (2) to get missing point if GUAGE metric. Else use all points for COUNTER. + + metricValues.put(startTime + 45*seconds, 2.0); + metricValues.put(startTime + 75*seconds, 3.0); + metricValues.put(startTime + 105*seconds, 4.0); + metricValues.put(startTime + 135*seconds, 5.0); + metricValues.put(startTime + 165*seconds, 6.0); + + timelineMetric.setMetricValues(metricValues); + timelineMetric.setType("GUAGE"); + secondAggregator.processAggregateClusterMetrics(aggregateClusterMetrics, timelineMetric, timeslices); + + Assert.assertEquals(aggregateClusterMetrics.size(), 4); + timelineClusterMetric.setTimestamp(startTime + 30*seconds); + Assert.assertTrue(aggregateClusterMetrics.containsKey(timelineClusterMetric)); + Assert.assertEquals(aggregateClusterMetrics.get(timelineClusterMetric).getSum(), 2.0); + + aggregateClusterMetrics.clear(); + + timelineMetric.setType("COUNTER"); + secondAggregator.processAggregateClusterMetrics(aggregateClusterMetrics, timelineMetric, timeslices); + + Assert.assertEquals(aggregateClusterMetrics.size(), 4); + timelineClusterMetric.setTimestamp(startTime + 30*seconds); + Assert.assertTrue(aggregateClusterMetrics.containsKey(timelineClusterMetric)); + Assert.assertEquals(aggregateClusterMetrics.get(timelineClusterMetric).getSum(), 1.5); + + metricValues.clear(); + aggregateClusterMetrics.clear(); + + /* + + 0 +30s +60s +90s +120s +150s +180s + | | | | | | | + (5) (6) + + */ + // Case 5 : Well, we have nothing in the 2 min window. + // Use (5) to paint the 2 min window as (5). + + metricValues.put(startTime + 135*seconds, 5.0); + metricValues.put(startTime + 165*seconds, 6.0); + + timelineMetric.setMetricValues(metricValues); + timelineMetric.setType("GUAGE"); + secondAggregator.processAggregateClusterMetrics(aggregateClusterMetrics, timelineMetric, timeslices); + + Assert.assertEquals(aggregateClusterMetrics.size(), 4); + timelineClusterMetric.setTimestamp(startTime + 30*seconds); + Assert.assertTrue(aggregateClusterMetrics.containsKey(timelineClusterMetric)); + Assert.assertEquals(aggregateClusterMetrics.get(timelineClusterMetric).getSum(), 5.0); + + aggregateClusterMetrics.clear(); + + timelineMetric.setType("COUNTER"); + secondAggregator.processAggregateClusterMetrics(aggregateClusterMetrics, timelineMetric, timeslices); + + Assert.assertEquals(aggregateClusterMetrics.size(), 4); + timelineClusterMetric.setTimestamp(startTime + 60*seconds); + Assert.assertTrue(aggregateClusterMetrics.containsKey(timelineClusterMetric)); + Assert.assertEquals(aggregateClusterMetrics.get(timelineClusterMetric).getSum(), 2.5); + + metricValues.clear(); + aggregateClusterMetrics.clear(); + + } + }
