Repository: ambari Updated Branches: refs/heads/branch-2.4 5b585bda9 -> 12b1a0c80
Revert "AMBARI-16821 Improve TimelineMetricsCache eviction/flush logic using a cache library (dsen)" This reverts commit ecaef41426ff67811ac4b2f5c618fb651286d160. Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/12b1a0c8 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/12b1a0c8 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/12b1a0c8 Branch: refs/heads/branch-2.4 Commit: 12b1a0c803335ec60ca5fce51cd31ea8478be795 Parents: 5b585bd Author: Jonathan Hurley <[email protected]> Authored: Wed Jun 1 16:34:05 2016 -0400 Committer: Jonathan Hurley <[email protected]> Committed: Wed Jun 1 16:34:05 2016 -0400 ---------------------------------------------------------------------- ambari-metrics/ambari-metrics-common/pom.xml | 28 ---- .../timeline/cache/TimelineMetricsCache.java | 161 +++++++++++++------ .../cache/TimelineMetricsCacheTest.java | 20 +-- .../timeline/HadoopTimelineMetricsSinkTest.java | 11 +- ambari-project/pom.xml | 5 + ambari-server/pom.xml | 5 - 6 files changed, 125 insertions(+), 105 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/12b1a0c8/ambari-metrics/ambari-metrics-common/pom.xml ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-common/pom.xml b/ambari-metrics/ambari-metrics-common/pom.xml index 3e17f22..af37d28 100644 --- a/ambari-metrics/ambari-metrics-common/pom.xml +++ b/ambari-metrics/ambari-metrics-common/pom.xml @@ -70,39 +70,11 @@ </execution> </executions> </plugin> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-shade-plugin</artifactId> - <version>2.3</version> - <executions> - <!-- Run shade goal on package phase --> - <execution> - <phase>package</phase> - <goals> - <goal>shade</goal> - </goals> - <configuration> - <artifactSet> - <includes> - <include>com.google.guava:*</include> - </includes> - </artifactSet> - <minimizeJar>true</minimizeJar> - <createDependencyReducedPom>false</createDependencyReducedPom> - </configuration> - </execution> - </executions> - </plugin> </plugins> </build> <dependencies> <dependency> - <groupId>com.google.guava</groupId> - <artifactId>guava</artifactId> - <version>14.0.1</version> - </dependency> - <dependency> <groupId>commons-logging</groupId> <artifactId>commons-logging</artifactId> <version>1.1.1</version> http://git-wip-us.apache.org/repos/asf/ambari/blob/12b1a0c8/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/cache/TimelineMetricsCache.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/cache/TimelineMetricsCache.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/cache/TimelineMetricsCache.java index 57f1437..0bed7d0 100644 --- a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/cache/TimelineMetricsCache.java +++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/cache/TimelineMetricsCache.java @@ -17,8 +17,6 @@ */ package org.apache.hadoop.metrics2.sink.timeline.cache; -import com.google.common.cache.Cache; -import com.google.common.cache.CacheBuilder; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; @@ -27,18 +25,18 @@ import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; import java.util.ArrayList; -import java.util.Collection; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.TreeMap; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.ConcurrentSkipListMap; @InterfaceAudience.Public @InterfaceStability.Evolving public class TimelineMetricsCache { - private final Cache<String, TimelineMetricWrapper> timelineMetricCache; + private final TimelineMetricHolder timelineMetricCache = new TimelineMetricHolder(); private static final Log LOG = LogFactory.getLog(TimelineMetric.class); public static final int MAX_RECS_PER_NAME_DEFAULT = 10000; public static final int MAX_EVICTION_TIME_MILLIS = 59000; // ~ 1 min @@ -56,81 +54,144 @@ public class TimelineMetricsCache { this.maxRecsPerName = maxRecsPerName; this.maxEvictionTimeInMillis = maxEvictionTimeInMillis; this.skipCounterTransform = skipCounterTransform; - this.timelineMetricCache = CacheBuilder.newBuilder().expireAfterWrite(maxEvictionTimeInMillis * 2, TimeUnit.MILLISECONDS).build(); } class TimelineMetricWrapper { - private Cache<Long, Double> dataPointsCache; + private long timeDiff = -1; + private long oldestTimestamp = -1; private TimelineMetric timelineMetric; - private Long oldestTimeStamp; - private Long newestTimeStamp; TimelineMetricWrapper(TimelineMetric timelineMetric) { this.timelineMetric = timelineMetric; - dataPointsCache = CacheBuilder.newBuilder(). - maximumSize(maxRecsPerName).expireAfterWrite(maxEvictionTimeInMillis * 2, TimeUnit.MILLISECONDS).build(); + this.oldestTimestamp = timelineMetric.getStartTime(); + } - putMetric(timelineMetric); + private void updateTimeDiff(long timestamp) { + if (oldestTimestamp != -1 && timestamp > oldestTimestamp) { + timeDiff = timestamp - oldestTimestamp; + } else { + oldestTimestamp = timestamp; + } } public synchronized void putMetric(TimelineMetric metric) { - if (dataPointsCache.size() == 0) { - oldestTimeStamp = metric.getStartTime(); - newestTimeStamp = metric.getStartTime(); + TreeMap<Long, Double> metricValues = this.timelineMetric.getMetricValues(); + if (metricValues.size() > maxRecsPerName) { + // remove values for eldest maxEvictionTimeInMillis + long newEldestTimestamp = oldestTimestamp + maxEvictionTimeInMillis; + TreeMap<Long, Double> metricsSubSet = + new TreeMap<>(metricValues.tailMap(newEldestTimestamp)); + if (metricsSubSet.isEmpty()) { + oldestTimestamp = metric.getStartTime(); + this.timelineMetric.setStartTime(metric.getStartTime()); + } else { + Long newStartTime = metricsSubSet.firstKey(); + oldestTimestamp = newStartTime; + this.timelineMetric.setStartTime(newStartTime); + } + this.timelineMetric.setMetricValues(metricsSubSet); + LOG.warn("Metrics cache overflow. Values for metric " + + metric.getMetricName() + " older than " + newEldestTimestamp + + " were removed to clean up the cache."); } - TreeMap<Long, Double> metricValues = metric.getMetricValues(); - for (Map.Entry<Long, Double> entry : metricValues.entrySet()) { - Long key = entry.getKey(); - dataPointsCache.put(key, entry.getValue()); - } - oldestTimeStamp = Math.min(oldestTimeStamp, metric.getStartTime()); - newestTimeStamp = Math.max(newestTimeStamp, metric.getStartTime()); + this.timelineMetric.addMetricValues(metric.getMetricValues()); + updateTimeDiff(metric.getStartTime()); + } + + public synchronized long getTimeDiff() { + return timeDiff; } public synchronized TimelineMetric getTimelineMetric() { - TreeMap<Long, Double> metricValues = new TreeMap<>(dataPointsCache.asMap()); - if (metricValues.isEmpty() || newestTimeStamp - oldestTimeStamp < maxEvictionTimeInMillis) { + return timelineMetric; + } + } + + // TODO: Add weighted eviction + class TimelineMetricHolder extends ConcurrentSkipListMap<String, TimelineMetricWrapper> { + private static final long serialVersionUID = 2L; + // To avoid duplication at the end of the buffer and beginning of the next + // segment of values + private Map<String, Long> endOfBufferTimestamps = new HashMap<String, Long>(); + + public TimelineMetric evict(String metricName) { + TimelineMetricWrapper metricWrapper = this.get(metricName); + + if (metricWrapper == null + || metricWrapper.getTimeDiff() < getMaxEvictionTimeInMillis()) { return null; } - dataPointsCache.invalidateAll(); - timelineMetric.setStartTime(metricValues.firstKey()); - timelineMetric.setMetricValues(metricValues); - return new TimelineMetric(timelineMetric); + + TimelineMetric timelineMetric = metricWrapper.getTimelineMetric(); + this.remove(metricName); + + return timelineMetric; + } + + public TimelineMetrics evictAll() { + List<TimelineMetric> metricList = new ArrayList<TimelineMetric>(); + + for (Iterator<Map.Entry<String, TimelineMetricWrapper>> it = this.entrySet().iterator(); it.hasNext();) { + Map.Entry<String, TimelineMetricWrapper> cacheEntry = it.next(); + TimelineMetricWrapper metricWrapper = cacheEntry.getValue(); + if (metricWrapper != null) { + TimelineMetric timelineMetric = cacheEntry.getValue().getTimelineMetric(); + metricList.add(timelineMetric); + } + it.remove(); + } + TimelineMetrics timelineMetrics = new TimelineMetrics(); + timelineMetrics.setMetrics(metricList); + return timelineMetrics; + } + + public void put(String metricName, TimelineMetric timelineMetric) { + if (isDuplicate(timelineMetric)) { + return; + } + TimelineMetricWrapper metric = this.get(metricName); + if (metric == null) { + this.put(metricName, new TimelineMetricWrapper(timelineMetric)); + } else { + metric.putMetric(timelineMetric); + } + // Buffer last ts value + endOfBufferTimestamps.put(metricName, timelineMetric.getStartTime()); + } + + /** + * Test whether last buffered timestamp is same as the newly received. + * @param timelineMetric @TimelineMetric + * @return true/false + */ + private boolean isDuplicate(TimelineMetric timelineMetric) { + return endOfBufferTimestamps.containsKey(timelineMetric.getMetricName()) + && endOfBufferTimestamps.get(timelineMetric.getMetricName()).equals(timelineMetric.getStartTime()); } } public TimelineMetric getTimelineMetric(String metricName) { - TimelineMetricWrapper timelineMetricWrapper = timelineMetricCache.getIfPresent(metricName); - if (timelineMetricWrapper != null) { - return timelineMetricWrapper.getTimelineMetric(); + if (timelineMetricCache.containsKey(metricName)) { + return timelineMetricCache.evict(metricName); } + return null; } public TimelineMetrics getAllMetrics() { - TimelineMetrics timelineMetrics = new TimelineMetrics(); - Collection<TimelineMetricWrapper> timelineMetricWrapperCollection = timelineMetricCache.asMap().values(); - List<TimelineMetric> timelineMetricList = - new ArrayList<>(timelineMetricWrapperCollection.size()); - - for (TimelineMetricWrapper timelineMetricWrapper : timelineMetricWrapperCollection) { - timelineMetricList.add(timelineMetricWrapper.getTimelineMetric()); - } - - timelineMetrics.setMetrics(timelineMetricList); - return timelineMetrics; + return timelineMetricCache.evictAll(); } + /** + * Getter method to help testing eviction + * @return @int + */ + public int getMaxEvictionTimeInMillis() { + return maxEvictionTimeInMillis; + } public void putTimelineMetric(TimelineMetric timelineMetric) { - String metricName = timelineMetric.getMetricName(); - TimelineMetricWrapper timelineMetricWrapper = timelineMetricCache.getIfPresent(metricName); - - if (timelineMetricWrapper != null) { - timelineMetricWrapper.putMetric(timelineMetric); - } else { - timelineMetricCache.put(metricName, new TimelineMetricWrapper(timelineMetric)); - } + timelineMetricCache.put(timelineMetric.getMetricName(), timelineMetric); } private void transformMetricValuesToDerivative(TimelineMetric timelineMetric) { http://git-wip-us.apache.org/repos/asf/ambari/blob/12b1a0c8/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/cache/TimelineMetricsCacheTest.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/cache/TimelineMetricsCacheTest.java b/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/cache/TimelineMetricsCacheTest.java index 87c848b..18d973c 100644 --- a/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/cache/TimelineMetricsCacheTest.java +++ b/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/cache/TimelineMetricsCacheTest.java @@ -26,7 +26,6 @@ import java.util.TreeMap; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; public class TimelineMetricsCacheTest { @@ -76,8 +75,8 @@ public class TimelineMetricsCacheTest { } @Test - public void testMaxRecsPerNameForTimelineMetricWrapperCache() throws Exception { - int maxRecsPerName = 3; + public void testMaxRecsPerName() throws Exception { + int maxRecsPerName = 2; int maxEvictionTime = TimelineMetricsCache.MAX_EVICTION_TIME_MILLIS ; TimelineMetricsCache timelineMetricsCache = new TimelineMetricsCache(maxRecsPerName, maxEvictionTime); @@ -125,21 +124,6 @@ public class TimelineMetricsCacheTest { assertEquals(DEFAULT_START_TIME + maxEvictionTime * 2, cachedMetric.getStartTime()); } - @Test - public void testEvictionTimeForTimelineMetricWrapperCache() { - int maxEvictionTime = 10; - TimelineMetricsCache timelineMetricsCache = - new TimelineMetricsCache(TimelineMetricsCache.MAX_RECS_PER_NAME_DEFAULT, maxEvictionTime); - int numberOfMetricsInserted = 1000; - for (int i = 0; i < numberOfMetricsInserted; i++) { - timelineMetricsCache.putTimelineMetric( - createTimelineMetricSingleValue(DEFAULT_START_TIME + maxEvictionTime * i)); - } - TimelineMetric cachedMetric = timelineMetricsCache.getTimelineMetric(METRIC_NAME); - assertNotNull(cachedMetric); - assertTrue("Some metric values should have been removed", cachedMetric.getMetricValues().size() < numberOfMetricsInserted); - } - private TimelineMetric createTimelineMetricSingleValue(final long startTime) { TreeMap<Long, Double> values = new TreeMap<Long, Double>(); values.put(startTime, 0.0); http://git-wip-us.apache.org/repos/asf/ambari/blob/12b1a0c8/ambari-metrics/ambari-metrics-hadoop-sink/src/test/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSinkTest.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-hadoop-sink/src/test/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSinkTest.java b/ambari-metrics/ambari-metrics-hadoop-sink/src/test/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSinkTest.java index afe0ea9..4a5abcc 100644 --- a/ambari-metrics/ambari-metrics-hadoop-sink/src/test/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSinkTest.java +++ b/ambari-metrics/ambari-metrics-hadoop-sink/src/test/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSinkTest.java @@ -150,7 +150,7 @@ public class HadoopTimelineMetricsSinkTest { expect(conf.getInt(eq(MAX_METRIC_ROW_CACHE_SIZE), anyInt())).andReturn(10).anyTimes(); // Return eviction time smaller than time diff for first 3 entries // Third entry will result in eviction - expect(conf.getInt(eq(METRICS_SEND_INTERVAL), anyInt())).andReturn(90).anyTimes(); + expect(conf.getInt(eq(METRICS_SEND_INTERVAL), anyInt())).andReturn(10).anyTimes(); conf.setListDelimiter(eq(',')); expectLastCall().anyTimes(); @@ -179,6 +179,7 @@ public class HadoopTimelineMetricsSinkTest { expect(metric.value()).andReturn(3.0).once(); expect(metric.value()).andReturn(4.0).once(); expect(metric.value()).andReturn(5.0).once(); + expect(metric.value()).andReturn(6.0).once(); MetricsRecord record = createNiceMock(MetricsRecord.class); expect(record.name()).andReturn("testName").anyTimes(); @@ -195,7 +196,7 @@ public class HadoopTimelineMetricsSinkTest { final Long now = System.currentTimeMillis(); // TODO: Current implementation of cache needs > 1 elements to evict any expect(record.timestamp()).andReturn(now).times(2); - expect(record.timestamp()).andReturn(now + 100l).once(); + expect(record.timestamp()).andReturn(now + 100l).times(2); expect(record.timestamp()).andReturn(now + 200l).once(); expect(record.timestamp()).andReturn(now + 300l).once(); @@ -226,6 +227,8 @@ public class HadoopTimelineMetricsSinkTest { sink.putMetrics(record); // time = t3 sink.putMetrics(record); + // time = t4 + sink.putMetrics(record); verify(conf, sink, record, metric); @@ -239,7 +242,7 @@ public class HadoopTimelineMetricsSinkTest { Assert.assertEquals(now, timestamps.next()); Assert.assertEquals(new Long(now + 100l), timestamps.next()); Iterator<Double> values = timelineMetric1.getMetricValues().values().iterator(); - Assert.assertEquals(new Double(2.0), values.next()); + Assert.assertEquals(new Double(1.0), values.next()); Assert.assertEquals(new Double(3.0), values.next()); // t3, t4 TimelineMetric timelineMetric2 = metricsIterator.next().getMetrics().get(0); @@ -248,8 +251,8 @@ public class HadoopTimelineMetricsSinkTest { Assert.assertEquals(new Long(now + 200l), timestamps.next()); Assert.assertEquals(new Long(now + 300l), timestamps.next()); values = timelineMetric2.getMetricValues().values().iterator(); - Assert.assertEquals(new Double(4.0), values.next()); Assert.assertEquals(new Double(5.0), values.next()); + Assert.assertEquals(new Double(6.0), values.next()); } @Test http://git-wip-us.apache.org/repos/asf/ambari/blob/12b1a0c8/ambari-project/pom.xml ---------------------------------------------------------------------- diff --git a/ambari-project/pom.xml b/ambari-project/pom.xml index fdcb31b..7fe1e6b 100644 --- a/ambari-project/pom.xml +++ b/ambari-project/pom.xml @@ -218,6 +218,11 @@ <version>9.3-1101-jdbc4</version> </dependency> <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + <version>16.0</version> + </dependency> + <dependency> <groupId>com.google.code.findbugs</groupId> <artifactId>jsr305</artifactId> <version>1.3.9</version> http://git-wip-us.apache.org/repos/asf/ambari/blob/12b1a0c8/ambari-server/pom.xml ---------------------------------------------------------------------- diff --git a/ambari-server/pom.xml b/ambari-server/pom.xml index 66cc038..b488971 100644 --- a/ambari-server/pom.xml +++ b/ambari-server/pom.xml @@ -1102,11 +1102,6 @@ <artifactId>jetty-server</artifactId> </dependency> <dependency> - <groupId>com.google.guava</groupId> - <artifactId>guava</artifactId> - <version>16.0</version> - </dependency> - <dependency> <groupId>commons-logging</groupId> <artifactId>commons-logging</artifactId> </dependency>
