Repository: ambari Updated Branches: refs/heads/trunk 29115e81e -> d1cec98c9
AMBARI-15100 OOM on TimelineMetricCache in Nodemanager (dsen) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/d1cec98c Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/d1cec98c Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/d1cec98c Branch: refs/heads/trunk Commit: d1cec98c9a98d8e183e13ceeb06e11ebd02a0701 Parents: 29115e8 Author: Dmytro Sen <[email protected]> Authored: Fri Feb 26 09:25:43 2016 +0200 Committer: Dmytro Sen <[email protected]> Committed: Fri Feb 26 09:25:43 2016 +0200 ---------------------------------------------------------------------- .../timeline/cache/TimelineMetricsCache.java | 44 ++++++++------ .../cache/TimelineMetricsCacheTest.java | 64 +++++++++++++++++++- 2 files changed, 87 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/d1cec98c/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/cache/TimelineMetricsCache.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/cache/TimelineMetricsCache.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/cache/TimelineMetricsCache.java index 15bd5f4..3316a54 100644 --- a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/cache/TimelineMetricsCache.java +++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/cache/TimelineMetricsCache.java @@ -24,9 +24,9 @@ import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; import java.util.HashMap; -import java.util.LinkedHashMap; import java.util.Map; import java.util.TreeMap; +import java.util.concurrent.ConcurrentSkipListMap; @InterfaceAudience.Public @InterfaceStability.Evolving @@ -70,38 +70,46 @@ public class TimelineMetricsCache { } } - public void putMetric(TimelineMetric metric) { + public synchronized void putMetric(TimelineMetric metric) { + TreeMap<Long, Double> metricValues = this.timelineMetric.getMetricValues(); + if (metricValues.size() > maxRecsPerName) { + // remove values for eldest maxEvictionTimeInMillis + long newEldestTimestamp = oldestTimestamp + maxEvictionTimeInMillis; + TreeMap<Long, Double> metricsSubSet = + new TreeMap<>(metricValues.tailMap(newEldestTimestamp)); + if (metricsSubSet.isEmpty()) { + oldestTimestamp = metric.getStartTime(); + this.timelineMetric.setStartTime(metric.getStartTime()); + } else { + Long newStartTime = metricsSubSet.firstKey(); + oldestTimestamp = newStartTime; + this.timelineMetric.setStartTime(newStartTime); + } + this.timelineMetric.setMetricValues(metricsSubSet); + LOG.warn("Metrics cache overflow. Values for metric " + + metric.getMetricName() + " older than " + newEldestTimestamp + + " were removed to clean up the cache."); + } this.timelineMetric.addMetricValues(metric.getMetricValues()); updateTimeDiff(metric.getStartTime()); } - public long getTimeDiff() { + public synchronized long getTimeDiff() { return timeDiff; } - public TimelineMetric getTimelineMetric() { + public synchronized TimelineMetric getTimelineMetric() { return timelineMetric; } } - // TODO: Change to ConcurentHashMap with weighted eviction - class TimelineMetricHolder extends LinkedHashMap<String, TimelineMetricWrapper> {// - private static final long serialVersionUID = 1L; - private boolean gotOverflow = false; + // TODO: Add weighted eviction + class TimelineMetricHolder extends ConcurrentSkipListMap<String, TimelineMetricWrapper> { + private static final long serialVersionUID = 2L; // To avoid duplication at the end of the buffer and beginning of the next // segment of values private Map<String, Long> endOfBufferTimestamps = new HashMap<String, Long>(); - @Override - protected boolean removeEldestEntry(Map.Entry<String, TimelineMetricWrapper> eldest) { - boolean overflow = size() > maxRecsPerName; - if (overflow && !gotOverflow) { - LOG.warn("Metrics cache overflow at "+ size() +" for "+ eldest); - gotOverflow = true; - } - return overflow; - } - public TimelineMetric evict(String metricName) { TimelineMetricWrapper metricWrapper = this.get(metricName); http://git-wip-us.apache.org/repos/asf/ambari/blob/d1cec98c/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/cache/TimelineMetricsCacheTest.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/cache/TimelineMetricsCacheTest.java b/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/cache/TimelineMetricsCacheTest.java index ad98525..18d973c 100644 --- a/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/cache/TimelineMetricsCacheTest.java +++ b/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/cache/TimelineMetricsCacheTest.java @@ -24,6 +24,8 @@ import java.util.Map; import java.util.TreeMap; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; public class TimelineMetricsCacheTest { @@ -31,9 +33,9 @@ public class TimelineMetricsCacheTest { private static final String METRIC_NAME = "Test name"; private static final double delta = 0.00001; - private final TimelineMetricsCache timelineMetricsCache = new TimelineMetricsCache( - TimelineMetricsCache.MAX_RECS_PER_NAME_DEFAULT, - TimelineMetricsCache.MAX_EVICTION_TIME_MILLIS); + private final TimelineMetricsCache timelineMetricsCache = + new TimelineMetricsCache(TimelineMetricsCache.MAX_RECS_PER_NAME_DEFAULT, + TimelineMetricsCache.MAX_EVICTION_TIME_MILLIS); @Test public void testPutGetCounterTimelineMetric() throws Exception { @@ -72,6 +74,62 @@ public class TimelineMetricsCacheTest { assertEquals(70, cachedMetric.getMetricValues().get(8L), delta); } + @Test + public void testMaxRecsPerName() throws Exception { + int maxRecsPerName = 2; + int maxEvictionTime = TimelineMetricsCache.MAX_EVICTION_TIME_MILLIS ; + TimelineMetricsCache timelineMetricsCache = + new TimelineMetricsCache(maxRecsPerName, maxEvictionTime); + + // put 2 metrics , no cache overflow + timelineMetricsCache.putTimelineMetric( + createTimelineMetricSingleValue(DEFAULT_START_TIME )); + timelineMetricsCache.putTimelineMetric( + createTimelineMetricSingleValue(DEFAULT_START_TIME + maxEvictionTime * 2)); + TimelineMetric cachedMetric = timelineMetricsCache.getTimelineMetric(METRIC_NAME); + assertNotNull(cachedMetric); + assertFalse(cachedMetric.getMetricValues().isEmpty()); + assertEquals("2 values added.", 2, cachedMetric.getMetricValues().size()); + assertEquals(DEFAULT_START_TIME, cachedMetric.getStartTime()); + + // put 3 metrics, no cache overflow. check is performed before put operation + timelineMetricsCache.putTimelineMetric( + createTimelineMetricSingleValue(DEFAULT_START_TIME )); + timelineMetricsCache.putTimelineMetric( + createTimelineMetricSingleValue(DEFAULT_START_TIME + maxEvictionTime * 2)); + timelineMetricsCache.putTimelineMetric( + createTimelineMetricSingleValue(DEFAULT_START_TIME + maxEvictionTime * 3)); + cachedMetric = timelineMetricsCache.getTimelineMetric(METRIC_NAME); + assertNotNull(cachedMetric); + assertFalse(cachedMetric.getMetricValues().isEmpty()); + assertEquals("3 values added.", 3, cachedMetric.getMetricValues().size()); + assertEquals(DEFAULT_START_TIME, cachedMetric.getStartTime()); + + // put 4 metric values, cache cleaned. + timelineMetricsCache.putTimelineMetric( + createTimelineMetricSingleValue(DEFAULT_START_TIME )); + timelineMetricsCache.putTimelineMetric( + createTimelineMetricSingleValue(DEFAULT_START_TIME + maxEvictionTime * 2)); + timelineMetricsCache.putTimelineMetric( + createTimelineMetricSingleValue(DEFAULT_START_TIME + maxEvictionTime * 3)); + timelineMetricsCache.putTimelineMetric( + createTimelineMetricSingleValue(DEFAULT_START_TIME + maxEvictionTime * 4)); + cachedMetric = timelineMetricsCache.getTimelineMetric(METRIC_NAME); + assertNotNull(cachedMetric); + assertFalse(cachedMetric.getMetricValues().isEmpty()); + // check is performed before put operation. while putting 4th metric value, + // the first value deleted + assertEquals("1 metric value should have been removed", 3, cachedMetric.getMetricValues().size()); + // first metric value was removed, starttime == second metric value starttime + assertEquals(DEFAULT_START_TIME + maxEvictionTime * 2, cachedMetric.getStartTime()); + } + + private TimelineMetric createTimelineMetricSingleValue(final long startTime) { + TreeMap<Long, Double> values = new TreeMap<Long, Double>(); + values.put(startTime, 0.0); + return createTimelineMetric(values, startTime); + + } private TimelineMetric createTimelineMetric(Map<Long, Double> metricValues, long startTime) { TimelineMetric timelineMetric = new TimelineMetric();
