Repository: ambari Updated Branches: refs/heads/branch-3.0-ams a9c6054fe -> e196358ca
AMBARI-22215 Refine cluster second aggregator by aligning sink publish times to 1 minute boundaries. (dsen) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/e196358c Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/e196358c Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/e196358c Branch: refs/heads/branch-3.0-ams Commit: e196358caa8b4c3387645238b8c48bb009cee311 Parents: a9c6054 Author: Dmytro Sen <d...@apache.org> Authored: Thu Oct 12 12:49:22 2017 +0300 Committer: Dmytro Sen <d...@apache.org> Committed: Thu Oct 12 12:49:22 2017 +0300 ---------------------------------------------------------------------- .../timeline/AbstractTimelineMetricsSink.java | 95 +++++++- .../metrics2/sink/timeline/TimelineMetric.java | 3 + .../AbstractTimelineMetricSinkTest.java | 240 +++++++++++++++++++ .../AbstractTimelineMetricSinkTest.java | 113 --------- .../timeline/HadoopTimelineMetricsSink.java | 2 +- .../timeline/HadoopTimelineMetricsSinkTest.java | 4 +- .../main/python/core/application_metric_map.py | 52 +++- .../python/core/TestApplicationMetricMap.py | 38 ++- .../timeline/TimelineMetricConfiguration.java | 3 - .../timeline/TimelineMetricsIgniteCache.java | 14 +- .../timeline/aggregators/AggregatorUtils.java | 2 +- .../TimelineMetricAggregatorFactory.java | 7 +- ...cClusterAggregatorSecondWithCacheSource.java | 38 +-- .../TimelineMetricsIgniteCacheTest.java | 56 ----- ...sterAggregatorSecondWithCacheSourceTest.java | 65 +---- 15 files changed, 437 insertions(+), 295 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/e196358c/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java ---------------------------------------------------------------------- diff --git 
a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java index 3c06032..739e9dc 100644 --- a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java +++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java @@ -19,6 +19,8 @@ package org.apache.hadoop.metrics2.sink.timeline; import com.google.common.base.Supplier; import com.google.common.base.Suppliers; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; import com.google.common.reflect.TypeToken; import com.google.gson.Gson; import com.google.gson.JsonSyntaxException; @@ -58,6 +60,7 @@ import java.util.List; import java.util.Random; import java.util.Set; import java.util.SortedSet; +import java.util.TreeMap; import java.util.TreeSet; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -130,6 +133,13 @@ public abstract class AbstractTimelineMetricsSink { private static final int COLLECTOR_HOST_CACHE_MAX_EXPIRATION_MINUTES = 75; private static final int COLLECTOR_HOST_CACHE_MIN_EXPIRATION_MINUTES = 60; + //10 seconds + protected int collectionPeriodMillis = 10000; + + private int cacheExpireTimeMinutesDefault = 10; + + private volatile Cache<String, TimelineMetric> metricsPostCache = CacheBuilder.newBuilder().expireAfterAccess(cacheExpireTimeMinutesDefault, TimeUnit.MINUTES).build(); + static { mapper = new ObjectMapper(); AnnotationIntrospector introspector = new JaxbAnnotationIntrospector(); @@ -289,7 +299,21 @@ public abstract class AbstractTimelineMetricsSink { return collectorHost; } + /** + * @param metrics metrics to post, metric values will be aligned by minute mark, + * last uncompleted minute will be 
cached to post in future iteration + */ protected boolean emitMetrics(TimelineMetrics metrics) { + return emitMetrics(metrics, false); + } + + /** + * @param metrics metrics to post, if postAllCachedMetrics is false metric values will be aligned by minute mark, + * last uncompleted minute will be cached to post in future iteration + * @param postAllCachedMetrics if set to true all cached metrics will be posted, ignoring the minute alignment + * @return true if metrics were emitted successfully + */ + protected boolean emitMetrics(TimelineMetrics metrics, boolean postAllCachedMetrics) { String connectUrl; boolean validCollectorHost = true; @@ -307,11 +331,20 @@ public abstract class AbstractTimelineMetricsSink { connectUrl = getCollectorUri(collectorHost); } + TimelineMetrics metricsToEmit = alignMetricsByMinuteMark(metrics); + + if (postAllCachedMetrics) { + for (TimelineMetric timelineMetric : metricsPostCache.asMap().values()) { + metricsToEmit.addOrMergeTimelineMetric(timelineMetric); + } + metricsPostCache.invalidateAll(); + } + if (validCollectorHost) { String jsonData = null; LOG.debug("EmitMetrics connectUrl = " + connectUrl); try { - jsonData = mapper.writeValueAsString(metrics); + jsonData = mapper.writeValueAsString(metricsToEmit); } catch (IOException e) { LOG.error("Unable to parse metrics", e); } @@ -335,6 +368,61 @@ } /** + * Align metrics by the minutes so that only complete minutes are sent. + * Data points of incomplete minutes will be cached and posted when the minute is completed. 
+ * Cached metrics are merged with currently posting metrics + * e.g: + * first iteration if metrics from 00m15s to 01m15s are processed, + * then metrics from 00m15s to 00m59s will be posted + * and from 01m00s to 01m15s will be cached + * second iteration metrics from 01m25s to 02m55s are processed, + * cached metrics from previous call will be merged with current, + * metrics from 01m00s to 02m55s will be posted, cache will be empty + * @param metrics + * @return + */ + protected TimelineMetrics alignMetricsByMinuteMark(TimelineMetrics metrics) { + TimelineMetrics allMetricsToPost = new TimelineMetrics(); + + for (TimelineMetric metric : metrics.getMetrics()) { + TimelineMetric cachedMetric = metricsPostCache.getIfPresent(metric.getMetricName()); + if (cachedMetric != null) { + metric.addMetricValues(cachedMetric.getMetricValues()); + metricsPostCache.invalidate(metric.getMetricName()); + } + } + + for (TimelineMetric metric : metrics.getMetrics()) { + TreeMap<Long, Double> valuesToCache = new TreeMap<>(); + TreeMap<Long, Double> valuesToPost = metric.getMetricValues(); + + // in case there can't be any more datapoints in last minute just post the metrics, + // otherwise need to cut off and cache the last uncompleted minute + if (!(valuesToPost.lastKey() % 60000 > 60000 - collectionPeriodMillis)) { + Long lastMinute = valuesToPost.lastKey() / 60000; + while (!valuesToPost.isEmpty() && valuesToPost.lastKey() / 60000 == lastMinute) { + valuesToCache.put(valuesToPost.lastKey(), valuesToPost.get(valuesToPost.lastKey())); + valuesToPost.remove(valuesToPost.lastKey()); + } + } + + if (!valuesToCache.isEmpty()) { + TimelineMetric metricToCache = new TimelineMetric(metric); + metricToCache.setMetricValues(valuesToCache); + metricsPostCache.put(metricToCache.getMetricName(), metricToCache); + } + + if (!valuesToPost.isEmpty()) { + TimelineMetric metricToPost = new TimelineMetric(metric); + metricToPost.setMetricValues(valuesToPost); + 
allMetricsToPost.addOrMergeTimelineMetric(metricToPost); + } + } + + return allMetricsToPost; + } + + /** * Cleans up and closes an input stream * see http://docs.oracle.com/javase/6/docs/technotes/guides/net/http-keepalive.html * @param is the InputStream to clean up @@ -609,6 +697,11 @@ public abstract class AbstractTimelineMetricsSink { rand.nextInt(zookeeperMaxBackoffTimeMins - zookeeperMinBackoffTimeMins + 1)) * 60*1000l; } + //for now it's used only for testing + protected Cache<String, TimelineMetric> getMetricsPostCache() { + return metricsPostCache; + } + /** * Get a pre-formatted URI for the collector */ http://git-wip-us.apache.org/repos/asf/ambari/blob/e196358c/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetric.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetric.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetric.java index 3dfcf4e..b376048 100644 --- a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetric.java +++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetric.java @@ -146,6 +146,9 @@ public class TimelineMetric implements Comparable<TimelineMetric>, Serializable public void addMetricValues(Map<Long, Double> metricValues) { this.metricValues.putAll(metricValues); + if (!this.metricValues.isEmpty()) { + this.setStartTime(this.metricValues.firstKey()); + } } @XmlElement(name = "metadata") http://git-wip-us.apache.org/repos/asf/ambari/blob/e196358c/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricSinkTest.java ---------------------------------------------------------------------- diff --git 
a/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricSinkTest.java b/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricSinkTest.java new file mode 100644 index 0000000..634d18c --- /dev/null +++ b/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricSinkTest.java @@ -0,0 +1,240 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.metrics2.sink.timeline; + +import junit.framework.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.powermock.api.easymock.PowerMock; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +import java.io.OutputStream; +import java.net.HttpURLConnection; +import java.net.URL; +import java.util.Arrays; +import java.util.Collection; +import java.util.TreeMap; + +import static org.easymock.EasyMock.anyString; +import static org.easymock.EasyMock.expect; +import static org.powermock.api.easymock.PowerMock.expectNew; +import static org.powermock.api.easymock.PowerMock.replayAll; + +@RunWith(PowerMockRunner.class) +@PrepareForTest({AbstractTimelineMetricsSink.class, HttpURLConnection.class}) +public class AbstractTimelineMetricSinkTest { + + @Test + public void testParseHostsStringIntoCollection() { + AbstractTimelineMetricsSink sink = new TestTimelineMetricsSink(); + Collection<String> hosts; + + hosts = sink.parseHostsStringIntoCollection(""); + Assert.assertTrue(hosts.isEmpty()); + + hosts = sink.parseHostsStringIntoCollection("test1.123.abc.def.local"); + Assert.assertTrue(hosts.size() == 1); + Assert.assertTrue(hosts.contains("test1.123.abc.def.local")); + + hosts = sink.parseHostsStringIntoCollection("test1.123.abc.def.local "); + Assert.assertTrue(hosts.size() == 1); + Assert.assertTrue(hosts.contains("test1.123.abc.def.local")); + + hosts = sink.parseHostsStringIntoCollection("test1.123.abc.def.local,test1.456.abc.def.local"); + Assert.assertTrue(hosts.size() == 2); + + hosts = sink.parseHostsStringIntoCollection("test1.123.abc.def.local, test1.456.abc.def.local"); + Assert.assertTrue(hosts.size() == 2); + Assert.assertTrue(hosts.contains("test1.123.abc.def.local")); + Assert.assertTrue(hosts.contains("test1.456.abc.def.local")); + } + + @Test + @PrepareForTest({URL.class, OutputStream.class, AbstractTimelineMetricsSink.class, 
HttpURLConnection.class, TimelineMetric.class}) + public void testEmitMetrics() throws Exception { + HttpURLConnection connection = PowerMock.createNiceMock(HttpURLConnection.class); + URL url = PowerMock.createNiceMock(URL.class); + expectNew(URL.class, anyString()).andReturn(url).anyTimes(); + expect(url.openConnection()).andReturn(connection).anyTimes(); + expect(connection.getResponseCode()).andReturn(200).anyTimes(); + OutputStream os = PowerMock.createNiceMock(OutputStream.class); + expect(connection.getOutputStream()).andReturn(os).anyTimes(); + + + TestTimelineMetricsSink sink = new TestTimelineMetricsSink(); + TimelineMetrics timelineMetrics = new TimelineMetrics(); + long startTime = System.currentTimeMillis() / 60000 * 60000; + + long seconds = 1000; + TreeMap<Long, Double> metricValues = new TreeMap<>(); + /* + + 0 +30s +60s + | | | + (1)(2)(3) (4)(5) (6) m1 + + */ + // (6) should be cached, the rest - posted + + metricValues.put(startTime + 4*seconds, 1.0); + metricValues.put(startTime + 14*seconds, 2.0); + metricValues.put(startTime + 24*seconds, 3.0); + metricValues.put(startTime + 34*seconds, 4.0); + metricValues.put(startTime + 44*seconds, 5.0); + metricValues.put(startTime + 64*seconds, 6.0); + + TimelineMetric timelineMetric = new TimelineMetric("metric1", "host1", "app1", "instance1"); + timelineMetric.setStartTime(metricValues.firstKey()); + timelineMetric.addMetricValues(metricValues); + + timelineMetrics.addOrMergeTimelineMetric(timelineMetric); + + replayAll(); + sink.emitMetrics(timelineMetrics); + Assert.assertEquals(1, sink.getMetricsPostCache().size()); + metricValues = new TreeMap<>(); + metricValues.put(startTime + 64*seconds, 6.0); + Assert.assertEquals(metricValues, sink.getMetricsPostCache().getIfPresent("metric1").getMetricValues()); + + timelineMetrics = new TimelineMetrics(); + metricValues = new TreeMap<>(); + /* + + +60 +90s +120s +150s +180s + | | | | | + (7) (8) (9) (10) (11) m1 + + */ + // (6) from previous post should be 
merged with current data + // (6),(7),(8),(9),(10) - should be posted, (11) - cached + metricValues.put(startTime + 74*seconds, 7.0); + metricValues.put(startTime + 94*seconds, 8.0); + metricValues.put(startTime + 124*seconds, 9.0); + metricValues.put(startTime + 154*seconds, 10.0); + metricValues.put(startTime + 184*seconds, 11.0); + + timelineMetric = new TimelineMetric("metric1", "host1", "app1", "instance1"); + timelineMetric.setStartTime(metricValues.firstKey()); + timelineMetric.addMetricValues(metricValues); + + timelineMetrics.addOrMergeTimelineMetric(timelineMetric); + sink.emitMetrics(timelineMetrics); + + Assert.assertEquals(1, sink.getMetricsPostCache().size()); + metricValues = new TreeMap<>(); + metricValues.put(startTime + 184*seconds, 11.0); + Assert.assertEquals(metricValues, sink.getMetricsPostCache().getIfPresent("metric1").getMetricValues());timelineMetrics = new TimelineMetrics(); + + metricValues = new TreeMap<>(); + /* + + +180s +210s +240s + | | | + (12) (13) + + */ + // (11) from previous post should be merged with current data + // (11),(12),(13) - should be posted, cache should be empty + metricValues.put(startTime + 194*seconds, 12.0); + metricValues.put(startTime + 239*seconds, 13.0); + + timelineMetric = new TimelineMetric("metric1", "host1", "app1", "instance1"); + timelineMetric.setStartTime(metricValues.firstKey()); + timelineMetric.addMetricValues(metricValues); + + timelineMetrics.addOrMergeTimelineMetric(timelineMetric); + sink.emitMetrics(timelineMetrics); + + Assert.assertEquals(0, sink.getMetricsPostCache().size()); + + metricValues = new TreeMap<>(); + /* + + +240s +270s +300s +330s + | | | | + (14) (15) (16) + + */ + // since postAllCachedMetrics in emitMetrics call is true (14),(15),(16) - should be posted, cache should be empty + metricValues.put(startTime + 245*seconds, 14.0); + metricValues.put(startTime + 294*seconds, 15.0); + metricValues.put(startTime + 315*seconds, 16.0); + + timelineMetric = new 
TimelineMetric("metric1", "host1", "app1", "instance1"); + timelineMetric.setStartTime(metricValues.firstKey()); + timelineMetric.addMetricValues(metricValues); + + timelineMetrics.addOrMergeTimelineMetric(timelineMetric); + sink.emitMetrics(timelineMetrics, true); + + Assert.assertEquals(0, sink.getMetricsPostCache().size()); + } + + private class TestTimelineMetricsSink extends AbstractTimelineMetricsSink { + @Override + protected String getCollectorUri(String host) { + return ""; + } + + @Override + protected String getCollectorProtocol() { + return "http"; + } + + @Override + protected String getCollectorPort() { + return "2181"; + } + + @Override + protected int getTimeoutSeconds() { + return 10; + } + + @Override + protected String getZookeeperQuorum() { + return "localhost:2181"; + } + + @Override + protected Collection<String> getConfiguredCollectorHosts() { + return Arrays.asList("localhost"); + } + + @Override + protected String getHostname() { + return "h1"; + } + + @Override + protected boolean isHostInMemoryAggregationEnabled() { + return true; + } + + @Override + protected int getHostInMemoryAggregationPort() { + return 61888; + } + + @Override + protected String getHostInMemoryAggregationProtocol() { + return "http"; + } + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/e196358c/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/availability/AbstractTimelineMetricSinkTest.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/availability/AbstractTimelineMetricSinkTest.java b/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/availability/AbstractTimelineMetricSinkTest.java deleted file mode 100644 index 396d08d..0000000 --- 
a/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/availability/AbstractTimelineMetricSinkTest.java +++ /dev/null @@ -1,113 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.metrics2.sink.timeline.availability; - -import junit.framework.Assert; -import org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink; -import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; -import org.junit.Test; - -import java.util.Arrays; -import java.util.Collection; - -public class AbstractTimelineMetricSinkTest { - - @Test - public void testParseHostsStringIntoCollection() { - AbstractTimelineMetricsSink sink = new TestTimelineMetricsSink(); - Collection<String> hosts; - - hosts = sink.parseHostsStringIntoCollection(""); - Assert.assertTrue(hosts.isEmpty()); - - hosts = sink.parseHostsStringIntoCollection("test1.123.abc.def.local"); - Assert.assertTrue(hosts.size() == 1); - Assert.assertTrue(hosts.contains("test1.123.abc.def.local")); - - hosts = sink.parseHostsStringIntoCollection("test1.123.abc.def.local "); - Assert.assertTrue(hosts.size() == 1); - Assert.assertTrue(hosts.contains("test1.123.abc.def.local")); - - hosts = 
sink.parseHostsStringIntoCollection("test1.123.abc.def.local,test1.456.abc.def.local"); - Assert.assertTrue(hosts.size() == 2); - - hosts = sink.parseHostsStringIntoCollection("test1.123.abc.def.local, test1.456.abc.def.local"); - Assert.assertTrue(hosts.size() == 2); - Assert.assertTrue(hosts.contains("test1.123.abc.def.local")); - Assert.assertTrue(hosts.contains("test1.456.abc.def.local")); - - } - - private class TestTimelineMetricsSink extends AbstractTimelineMetricsSink { - @Override - protected String getCollectorUri(String host) { - return ""; - } - - @Override - protected String getCollectorProtocol() { - return "http"; - } - - @Override - protected String getCollectorPort() { - return "2181"; - } - - @Override - protected int getTimeoutSeconds() { - return 10; - } - - @Override - protected String getZookeeperQuorum() { - return "localhost:2181"; - } - - @Override - protected Collection<String> getConfiguredCollectorHosts() { - return Arrays.asList("localhost"); - } - - @Override - protected String getHostname() { - return "h1"; - } - - @Override - protected boolean isHostInMemoryAggregationEnabled() { - return true; - } - - @Override - protected int getHostInMemoryAggregationPort() { - return 61888; - } - - @Override - protected String getHostInMemoryAggregationProtocol() { - return "http"; - } - - @Override - public boolean emitMetrics(TimelineMetrics metrics) { - super.init(); - return super.emitMetrics(metrics); - } - } -} http://git-wip-us.apache.org/repos/asf/ambari/blob/e196358c/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java b/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java index 
f37c2be..f70d8ec 100644 --- a/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java +++ b/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java @@ -508,7 +508,7 @@ public class HadoopTimelineMetricsSink extends AbstractTimelineMetricsSink imple LOG.debug("Closing HadoopTimelineMetricSink. Flushing metrics to collector..."); TimelineMetrics metrics = metricsCache.getAllMetrics(); if (metrics != null) { - emitMetrics(metrics); + emitMetrics(metrics, true); } } }); http://git-wip-us.apache.org/repos/asf/ambari/blob/e196358c/ambari-metrics/ambari-metrics-hadoop-sink/src/test/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSinkTest.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-hadoop-sink/src/test/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSinkTest.java b/ambari-metrics/ambari-metrics-hadoop-sink/src/test/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSinkTest.java index 8fde394..b194924 100644 --- a/ambari-metrics/ambari-metrics-hadoop-sink/src/test/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSinkTest.java +++ b/ambari-metrics/ambari-metrics-hadoop-sink/src/test/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSinkTest.java @@ -179,7 +179,7 @@ public class HadoopTimelineMetricsSinkTest { createMockBuilder(HadoopTimelineMetricsSink.class) .withConstructor().addMockedMethod("appendPrefix") .addMockedMethod("findLiveCollectorHostsFromKnownCollector") - .addMockedMethod("emitMetrics").createNiceMock(); + .addMockedMethod("emitMetrics", TimelineMetrics.class).createNiceMock(); SubsetConfiguration conf = createNiceMock(SubsetConfiguration.class); expect(conf.getString("slave.host.name")).andReturn("localhost").anyTimes(); @@ -310,7 +310,7 @@ public class 
HadoopTimelineMetricsSinkTest { createMockBuilder(HadoopTimelineMetricsSink.class) .withConstructor().addMockedMethod("appendPrefix") .addMockedMethod("findLiveCollectorHostsFromKnownCollector") - .addMockedMethod("emitMetrics").createNiceMock(); + .addMockedMethod("emitMetrics", TimelineMetrics.class).createNiceMock(); SubsetConfiguration conf = createNiceMock(SubsetConfiguration.class); expect(conf.getString("slave.host.name")).andReturn("localhost").anyTimes(); http://git-wip-us.apache.org/repos/asf/ambari/blob/e196358c/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/application_metric_map.py ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/application_metric_map.py b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/application_metric_map.py index 34a6787..bd957a0 100644 --- a/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/application_metric_map.py +++ b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/application_metric_map.py @@ -41,6 +41,7 @@ class ApplicationMetricMap: self.ip_address = ip_address self.lock = RLock() self.app_metric_map = {} + self.cached_metric_map = {} pass def put_metric(self, application_id, metric_id_to_value_map, timestamp): @@ -98,7 +99,7 @@ class ApplicationMetricMap: "appid" : "HOST", "instanceid" : result_instanceid, "starttime" : self.get_start_time(appId, metricId), - "metrics" : metricData + "metrics" : self.align_values_by_minute_mark(appId, metricId, metricData) if clear_once_flattened else metricData } timeline_metrics[ "metrics" ].append( timeline_metric ) pass @@ -114,6 +115,10 @@ class ApplicationMetricMap: def get_start_time(self, app_id, metric_id): with self.lock: + if self.cached_metric_map.has_key(app_id): + if self.cached_metric_map.get(app_id).has_key(metric_id): + metrics = self.cached_metric_map.get(app_id).get(metric_id) + return 
min(metrics.iterkeys()) if self.app_metric_map.has_key(app_id): if self.app_metric_map.get(app_id).has_key(metric_id): metrics = self.app_metric_map.get(app_id).get(metric_id) @@ -137,3 +142,48 @@ with self.lock: self.app_metric_map.clear() pass + + # Align metrics by the minutes so that only complete minutes are sent. + # Data points of incomplete minutes will be cached and posted when the minute is completed. + # Cached metrics are merged with currently posting metrics + # e.g: + # first iteration if metrics from 00m15s to 01m15s are processed, + # then metrics from 00m15s to 00m59s will be posted + # and from 01m00s to 01m15s will be cached + # second iteration metrics from 01m25s to 02m55s are processed, + # cached metrics from previous call will be merged with current, + # metrics from 01m00s to 02m55s will be posted, cache will be empty + def align_values_by_minute_mark(self, appId, metricId, metricData): + with self.lock: + # append with cached values + if self.cached_metric_map.get(appId) and self.cached_metric_map.get(appId).get(metricId): + metricData.update(self.cached_metric_map[appId][metricId]) + self.cached_metric_map[appId].pop(metricId) + + # check if needs to be cached + # in case there can't be any more datapoints in last minute just post the metrics, + # otherwise need to cut off and cache the last uncompleted minute + max_time = max(metricData.iterkeys()) + if max_time % 60000 <= 60000 - 10000: + max_minute = max_time / 60000 + metric_data_copy = metricData.copy() + for time,value in metric_data_copy.iteritems(): + if time / 60000 == max_minute: + cached_metric_map = self.cached_metric_map.get(appId) + if not cached_metric_map: + cached_metric_map = { metricId : { time : value } } + self.cached_metric_map[ appId ] = cached_metric_map + else: + cached_metric_id_map = cached_metric_map.get(metricId) + if not cached_metric_id_map: + cached_metric_id_map = 
cached_metric_id_map + else: + cached_metric_map[ metricId ].update( { time : value } ) + pass + pass + metricData.pop(time) + pass + pass + + return metricData \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/e196358c/ambari-metrics/ambari-metrics-host-monitoring/src/test/python/core/TestApplicationMetricMap.py ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-host-monitoring/src/test/python/core/TestApplicationMetricMap.py b/ambari-metrics/ambari-metrics-host-monitoring/src/test/python/core/TestApplicationMetricMap.py index a956a78..d9ea55d 100644 --- a/ambari-metrics/ambari-metrics-host-monitoring/src/test/python/core/TestApplicationMetricMap.py +++ b/ambari-metrics/ambari-metrics-host-monitoring/src/test/python/core/TestApplicationMetricMap.py @@ -50,7 +50,7 @@ class TestApplicationMetricMap(TestCase): self.assertEqual(p['metrics'][0]['metrics'][str(timestamp)], 'bv') self.assertEqual(application_metric_map.get_start_time(application_id, "b"), timestamp) - + metrics = {} metrics.update({"b" : 'bv'}) metrics.update({"a" : 'av'}) @@ -71,4 +71,38 @@ class TestApplicationMetricMap(TestCase): json_data = json.loads(application_metric_map.flatten('A1', True)) self.assertEqual(len(json_data['metrics']), 1) self.assertTrue(json_data['metrics'][0]['metricname'] == 'a') - self.assertFalse(application_metric_map.app_metric_map) \ No newline at end of file + self.assertFalse(application_metric_map.app_metric_map) + + def test_flatten_and_align_values_by_minute_mark(self): + application_metric_map = ApplicationMetricMap("host", "10.10.10.10") + second = 1000 + timestamp = int(round(1415390640.3806491 * second)) + application_id = application_metric_map.format_app_id("A","1") + metrics = {} + metrics.update({"b" : 'bv'}) + + # 0s 60s 120s + # (0) (1) (2) (3) + # (3) should be cut off and cached + application_metric_map.put_metric(application_id, metrics, timestamp) + 
application_metric_map.put_metric(application_id, metrics, timestamp + second*24) + application_metric_map.put_metric(application_id, metrics, timestamp + second*84) + application_metric_map.put_metric(application_id, metrics, timestamp + second*124) + + json_data = json.loads(application_metric_map.flatten(application_id, True)) + self.assertEqual(len(json_data['metrics'][0]['metrics']), 3) + self.assertEqual(len(application_metric_map.cached_metric_map.get(application_id).get("b")), 1) + self.assertEqual(application_metric_map.cached_metric_map.get(application_id).get("b"), {timestamp + second*124 : 'bv'}) + + # 120s 180s + # (3) (4) + # cached (3) should be added to the post; + # (4) should be posted as well because there can't be more data points in the minute + application_metric_map.put_metric(application_id, metrics, timestamp + second * 176) + + json_data = json.loads(application_metric_map.flatten(application_id, True)) + self.assertEqual(len(json_data['metrics'][0]['metrics']), 2) + + # starttime should be set to (3) + self.assertEqual(json_data['metrics'][0]['starttime'], timestamp + second*124) + self.assertEqual(len(application_metric_map.cached_metric_map.get(application_id)), 0) http://git-wip-us.apache.org/repos/asf/ambari/blob/e196358c/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java index d0e385b..026eaf5 100644 --- 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java @@ -121,9 +121,6 @@ public class TimelineMetricConfiguration { public static final String CLUSTER_AGGREGATOR_TIMESLICE_INTERVAL = "timeline.metrics.cluster.aggregator.second.timeslice.interval"; - public static final String CLUSTER_CACHE_AGGREGATOR_TIMESLICE_INTERVAL = - "timeline.metrics.cluster.cache.aggregator.second.timeslice.interval"; - public static final String AGGREGATOR_CHECKPOINT_DELAY = "timeline.metrics.service.checkpointDelay"; http://git-wip-us.apache.org/repos/asf/ambari/blob/e196358c/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsIgniteCache.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsIgniteCache.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsIgniteCache.java index aeaa4ba..6441c9c 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsIgniteCache.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsIgniteCache.java @@ -50,7 +50,6 @@ import java.net.URISyntaxException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; -import java.util.Date; import java.util.HashMap; import 
java.util.List; import java.util.Map; @@ -60,12 +59,11 @@ import java.util.concurrent.locks.Lock; import static java.util.concurrent.TimeUnit.SECONDS; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_SECOND_SLEEP_INTERVAL; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_CACHE_AGGREGATOR_TIMESLICE_INTERVAL; +import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_TIMESLICE_INTERVAL; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_APP_ID; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_CLUSTER_AGGREGATOR_INTERPOLATION_ENABLED; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_COLLECTOR_IGNITE_BACKUPS; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_COLLECTOR_IGNITE_NODES; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_SINK_COLLECTION_PERIOD; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRIC_AGGREGATION_SQL_FILTERS; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_SERVICE_HTTP_POLICY; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.AggregatorUtils.getRoundedCheckPointTimeMillis; @@ -77,7 +75,6 @@ public class TimelineMetricsIgniteCache implements TimelineMetricDistributedCach LogFactory.getLog(TimelineMetricsIgniteCache.class); 
private IgniteCache<TimelineClusterMetric, MetricClusterAggregate> igniteCache; private long cacheSliceIntervalMillis; - private int collectionPeriodMillis; private boolean interpolationEnabled; private List<String> skipAggrPatternStrings = new ArrayList<>(); private List<String> appIdsToAggregate; @@ -110,8 +107,7 @@ public class TimelineMetricsIgniteCache implements TimelineMetricDistributedCach //aggregation parameters appIdsToAggregate = timelineMetricConfiguration.getAppIdsForHostAggregation(); interpolationEnabled = Boolean.parseBoolean(metricConf.get(TIMELINE_METRICS_CLUSTER_AGGREGATOR_INTERPOLATION_ENABLED, "true")); - collectionPeriodMillis = (int) SECONDS.toMillis(metricConf.getInt(TIMELINE_METRICS_SINK_COLLECTION_PERIOD, 10)); - cacheSliceIntervalMillis = SECONDS.toMillis(metricConf.getInt(CLUSTER_CACHE_AGGREGATOR_TIMESLICE_INTERVAL, 30)); + cacheSliceIntervalMillis = SECONDS.toMillis(metricConf.getInt(CLUSTER_AGGREGATOR_TIMESLICE_INTERVAL, 30)); Long aggregationInterval = metricConf.getLong(CLUSTER_AGGREGATOR_SECOND_SLEEP_INTERVAL, 120L); String filteredMetricPatterns = metricConf.get(TIMELINE_METRIC_AGGREGATION_SQL_FILTERS); @@ -215,12 +211,6 @@ public class TimelineMetricsIgniteCache implements TimelineMetricDistributedCach if (slicedClusterMetrics != null) { for (Map.Entry<TimelineClusterMetric, Double> metricDoubleEntry : slicedClusterMetrics.entrySet()) { - if (metricDoubleEntry.getKey().getTimestamp() == timeSlices.get(timeSlices.size()-1)[1] && metricDoubleEntry.getKey().getTimestamp() - metric.getMetricValues().lastKey() > collectionPeriodMillis) { - if(LOG.isDebugEnabled()) { - LOG.debug("Last skipped timestamp @ " + new Date(metric.getMetricValues().lastKey()) + " slice timestamp @ " + new Date(metricDoubleEntry.getKey().getTimestamp())); - } - continue; - } MetricClusterAggregate newMetricClusterAggregate = new MetricClusterAggregate( metricDoubleEntry.getValue(), 1, null, metricDoubleEntry.getValue(), metricDoubleEntry.getValue()); //put app 
metric into cache http://git-wip-us.apache.org/repos/asf/ambari/blob/e196358c/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AggregatorUtils.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AggregatorUtils.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AggregatorUtils.java index b12cb86..b8338fb 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AggregatorUtils.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AggregatorUtils.java @@ -223,7 +223,7 @@ public class AggregatorUtils { */ public static Long getSliceTimeForMetric(List<Long[]> timeSlices, Long timestamp) { for (Long[] timeSlice : timeSlices) { - if (timestamp > timeSlice[0] && timestamp <= timeSlice[1]) { + if (timestamp >= timeSlice[0] && timestamp < timeSlice[1]) { return timeSlice[1]; } } http://git-wip-us.apache.org/repos/asf/ambari/blob/e196358c/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java index c27d712..9e493ea 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java @@ -41,7 +41,6 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_SECOND_DISABLED; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_SECOND_SLEEP_INTERVAL; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_TIMESLICE_INTERVAL; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_CACHE_AGGREGATOR_TIMESLICE_INTERVAL; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.DEFAULT_CHECKPOINT_LOCATION; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_DAILY_CHECKPOINT_CUTOFF_MULTIPLIER; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_DAILY_DISABLED; @@ -273,9 +272,6 @@ public class TimelineMetricAggregatorFactory { long timeSliceIntervalMillis = SECONDS.toMillis(metricsConf.getInt (CLUSTER_AGGREGATOR_TIMESLICE_INTERVAL, 30)); - long cacheTimeSliceIntervalMillis = 
SECONDS.toMillis(metricsConf.getInt - (CLUSTER_CACHE_AGGREGATOR_TIMESLICE_INTERVAL, 30)); - int checkpointCutOffMultiplier = metricsConf.getInt(CLUSTER_AGGREGATOR_SECOND_CHECKPOINT_CUTOFF_MULTIPLIER, 2); @@ -297,8 +293,7 @@ public class TimelineMetricAggregatorFactory { 120000l, timeSliceIntervalMillis, haController, - distributedCache, - cacheTimeSliceIntervalMillis + distributedCache ); } http://git-wip-us.apache.org/repos/asf/ambari/blob/e196358c/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecondWithCacheSource.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecondWithCacheSource.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecondWithCacheSource.java index 0c030b6..888044a 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecondWithCacheSource.java +++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecondWithCacheSource.java @@ -31,19 +31,16 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.AggregatorUtils.getSliceTimeForMetric; import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.AggregatorUtils.getTimeSlices; public class 
TimelineMetricClusterAggregatorSecondWithCacheSource extends TimelineMetricClusterAggregatorSecond { private TimelineMetricDistributedCache distributedCache; - private Long cacheTimeSliceIntervalMillis; public TimelineMetricClusterAggregatorSecondWithCacheSource(AggregationTaskRunner.AGGREGATOR_NAME metricAggregateSecond, TimelineMetricMetadataManager metricMetadataManager, PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf, String checkpointLocation, long sleepIntervalMillis, int checkpointCutOffMultiplier, String aggregatorDisabledParam, String inputTableName, String outputTableName, Long nativeTimeRangeDelay, Long timeSliceInterval, - MetricCollectorHAController haController, TimelineMetricDistributedCache distributedCache, Long cacheTimeSliceIntervalMillis) { + MetricCollectorHAController haController, TimelineMetricDistributedCache distributedCache) { super(metricAggregateSecond, metricMetadataManager, hBaseAccessor, metricsConf, checkpointLocation, sleepIntervalMillis, checkpointCutOffMultiplier, aggregatorDisabledParam, inputTableName, outputTableName, nativeTimeRangeDelay, timeSliceInterval, haController); this.distributedCache = distributedCache; - this.cacheTimeSliceIntervalMillis = cacheTimeSliceIntervalMillis; } @Override @@ -81,36 +78,11 @@ public class TimelineMetricClusterAggregatorSecondWithCacheSource extends Timeli //Slices in cache could be different from aggregate slices, so need to recalculate. 
Counts hosted apps Map<TimelineClusterMetric, MetricClusterAggregate> aggregateMetricsFromMetricClusterAggregates(Map<TimelineClusterMetric, MetricClusterAggregate> metricsFromCache, List<Long[]> timeSlices) { - Map<TimelineClusterMetric, MetricClusterAggregate> result = new HashMap<>(); - - //normalize if slices in cache are different from the aggregation slices - //TODO add basic interpolation, current implementation assumes that cacheTimeSliceIntervalMillis <= timeSliceIntervalMillis - if (cacheTimeSliceIntervalMillis.equals(timeSliceIntervalMillis)) { - result = metricsFromCache; - } else { - for (Map.Entry<TimelineClusterMetric, MetricClusterAggregate> clusterMetricAggregateEntry : metricsFromCache.entrySet()) { - Long timestamp = getSliceTimeForMetric(timeSlices, clusterMetricAggregateEntry.getKey().getTimestamp()); - if (timestamp <= 0) { - LOG.warn("Entry doesn't match any slice. Slices : " + timeSlices + " metric timestamp : " + clusterMetricAggregateEntry.getKey().getTimestamp()); - continue; - } - TimelineClusterMetric timelineClusterMetric = new TimelineClusterMetric(clusterMetricAggregateEntry.getKey().getMetricName(), clusterMetricAggregateEntry.getKey().getAppId(), clusterMetricAggregateEntry.getKey().getInstanceId(), timestamp); - if (result.containsKey(timelineClusterMetric)) { - MetricClusterAggregate metricClusterAggregate = result.get(timelineClusterMetric); - metricClusterAggregate.updateMax(clusterMetricAggregateEntry.getValue().getMax()); - metricClusterAggregate.updateMin(clusterMetricAggregateEntry.getValue().getMin()); - metricClusterAggregate.setSum((metricClusterAggregate.getSum() + clusterMetricAggregateEntry.getValue().getSum()) / 2D); - metricClusterAggregate.setNumberOfHosts(Math.max(metricClusterAggregate.getNumberOfHosts(), clusterMetricAggregateEntry.getValue().getNumberOfHosts())); - } else { - result.put(timelineClusterMetric, clusterMetricAggregateEntry.getValue()); - } - } - } - + //TODO add basic interpolation //TODO 
investigate if needed, maybe add config to disable/enable //count hosted apps Map<String, MutableInt> hostedAppCounter = new HashMap<>(); - for (Map.Entry<TimelineClusterMetric, MetricClusterAggregate> clusterMetricAggregateEntry : result.entrySet()) { + for (Map.Entry<TimelineClusterMetric, MetricClusterAggregate> clusterMetricAggregateEntry : metricsFromCache.entrySet()) { int numHosts = clusterMetricAggregateEntry.getValue().getNumberOfHosts(); String appId = clusterMetricAggregateEntry.getKey().getAppId(); if (!hostedAppCounter.containsKey(appId)) { @@ -124,9 +96,9 @@ public class TimelineMetricClusterAggregatorSecondWithCacheSource extends Timeli } // Add liveHosts per AppId metrics. - processLiveAppCountMetrics(result, hostedAppCounter, timeSlices.get(timeSlices.size() - 1)[1]); + processLiveAppCountMetrics(metricsFromCache, hostedAppCounter, timeSlices.get(timeSlices.size() - 1)[1]); - return result; + return metricsFromCache; } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/e196358c/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsIgniteCacheTest.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsIgniteCacheTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsIgniteCacheTest.java index d3c6061..2cb66ba 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsIgniteCacheTest.java +++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsIgniteCacheTest.java @@ -167,62 +167,6 @@ public class TimelineMetricsIgniteCacheTest { metricValues.clear(); timelineMetrics.clear(); - /* - - 0 +30s +60s +90s - | | | | - (1) (2) h1 - (3) (4) h2 - (5) (6) h1 - - */ - // Case 3 : merging host data points, ignore (2) for h1 as it will conflict with (5), two hosts. - metricValues = new TreeMap<>(); - metricValues.put(startTime + 15*seconds, 1.0); - metricValues.put(startTime + 45*seconds, 2.0); - timelineMetric = new TimelineMetric("metric1", "host1", "app1", "instance1"); - timelineMetric.setMetricValues(metricValues); - timelineMetrics.add(timelineMetric); - - metricValues = new TreeMap<>(); - metricValues.put(startTime + 45*seconds, 3.0); - metricValues.put(startTime + 85*seconds, 4.0); - timelineMetric = new TimelineMetric("metric1", "host2", "app1", "instance1"); - timelineMetric.setMetricValues(metricValues); - timelineMetrics.add(timelineMetric); - - metricValues = new TreeMap<>(); - metricValues.put(startTime + 55*seconds, 5.0); - metricValues.put(startTime + 85*seconds, 6.0); - timelineMetric = new TimelineMetric("metric1", "host1", "app1", "instance1"); - timelineMetric.setMetricValues(metricValues); - timelineMetrics.add(timelineMetric); - - timelineMetricsIgniteCache.putMetrics(timelineMetrics, metricMetadataManagerMock); - - aggregateMap = timelineMetricsIgniteCache.evictMetricAggregates(startTime, startTime + 120*seconds); - - Assert.assertEquals(aggregateMap.size(), 3); - timelineClusterMetric = new TimelineClusterMetric(timelineMetric.getMetricName(), - timelineMetric.getAppId(), timelineMetric.getInstanceId(), startTime + 30*seconds); - - Assert.assertTrue(aggregateMap.containsKey(timelineClusterMetric)); - Assert.assertEquals(1.0, aggregateMap.get(timelineClusterMetric).getSum()); - Assert.assertEquals(1, 
aggregateMap.get(timelineClusterMetric).getNumberOfHosts()); - - timelineClusterMetric.setTimestamp(startTime + 2*30*seconds); - Assert.assertTrue(aggregateMap.containsKey(timelineClusterMetric)); - Assert.assertEquals(8.0, aggregateMap.get(timelineClusterMetric).getSum()); - Assert.assertEquals(2, aggregateMap.get(timelineClusterMetric).getNumberOfHosts()); - - timelineClusterMetric.setTimestamp(startTime + 3*30*seconds); - Assert.assertTrue(aggregateMap.containsKey(timelineClusterMetric)); - Assert.assertEquals(10.0, aggregateMap.get(timelineClusterMetric).getSum()); - Assert.assertEquals(2, aggregateMap.get(timelineClusterMetric).getNumberOfHosts()); - - metricValues.clear(); - timelineMetrics.clear(); - Assert.assertEquals(0d, timelineMetricsIgniteCache.getPointInTimeCacheMetrics().get("Cluster_KeySize")); } http://git-wip-us.apache.org/repos/asf/ambari/blob/e196358c/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecondWithCacheSourceTest.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecondWithCacheSourceTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecondWithCacheSourceTest.java index 7cddb00..e8a9dc2 100644 --- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecondWithCacheSourceTest.java +++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecondWithCacheSourceTest.java @@ -79,7 +79,7 @@ public class TimelineMetricClusterAggregatorSecondWithCacheSourceTest { TimelineMetricClusterAggregatorSecondWithCacheSource secondAggregator = new TimelineMetricClusterAggregatorSecondWithCacheSource( METRIC_AGGREGATE_SECOND, metricMetadataManagerMock, null, configuration, null, aggregatorInterval, 2, "false", "", "", aggregatorInterval, - sliceInterval, null, timelineMetricsIgniteCache, 30L); + sliceInterval, null, timelineMetricsIgniteCache); long now = System.currentTimeMillis(); long startTime = now - 120000; @@ -112,67 +112,4 @@ public class TimelineMetricClusterAggregatorSecondWithCacheSourceTest { Assert.assertEquals(2d, a1.getSum()); Assert.assertEquals(5d, a2.getSum()); } - - @Test - public void testSlicesRecalculation() throws Exception { - long aggregatorInterval = 120000; - long sliceInterval = 30000; - - Configuration configuration = new Configuration(); - - TimelineMetricMetadataManager metricMetadataManagerMock = createNiceMock(TimelineMetricMetadataManager.class); - expect(metricMetadataManagerMock.getMetadataCacheValue((TimelineMetricMetadataKey) anyObject())).andReturn(null).anyTimes(); - replay(metricMetadataManagerMock); - - TimelineMetricClusterAggregatorSecondWithCacheSource secondAggregator = new TimelineMetricClusterAggregatorSecondWithCacheSource( - METRIC_AGGREGATE_SECOND, metricMetadataManagerMock, null, configuration, null, - aggregatorInterval, 2, "false", "", "", aggregatorInterval, - sliceInterval, null, timelineMetricsIgniteCache, 30L); - - long seconds = 1000; - long now = getRoundedCheckPointTimeMillis(System.currentTimeMillis(), 120*seconds); - long startTime = now - 120*seconds; - - Map<TimelineClusterMetric, MetricClusterAggregate> metricsFromCache = new HashMap<>(); - metricsFromCache.put(new 
TimelineClusterMetric("m1", "a1", "i1",startTime + 5 * seconds), - new MetricClusterAggregate(1.0, 2, 1.0, 1.0, 1.0)); - metricsFromCache.put(new TimelineClusterMetric("m1", "a1", "i1",startTime + 25 * seconds), - new MetricClusterAggregate(2.0, 2, 1.0, 2.0, 2.0)); - metricsFromCache.put(new TimelineClusterMetric("m1", "a1", "i1",startTime + 45 * seconds), - new MetricClusterAggregate(3.0, 2, 1.0, 1.0, 1.0)); - metricsFromCache.put(new TimelineClusterMetric("m1", "a1", "i1",startTime + 65 * seconds), - new MetricClusterAggregate(4.0, 2, 1.0, 4.0, 4.0)); - metricsFromCache.put(new TimelineClusterMetric("m1", "a1", "i1",startTime + 85 * seconds), - new MetricClusterAggregate(5.0, 2, 1.0, 5.0, 5.0)); - - List<Long[]> timeslices = getTimeSlices(startTime, startTime + 120*seconds, 30*seconds); - - Map<TimelineClusterMetric, MetricClusterAggregate> aggregates = secondAggregator.aggregateMetricsFromMetricClusterAggregates(metricsFromCache, timeslices); - - Assert.assertNotNull(aggregates); - Assert.assertEquals(4, aggregates.size()); - - TimelineClusterMetric timelineClusterMetric = new TimelineClusterMetric("m1", "a1", "i1", startTime + 30*seconds); - MetricClusterAggregate metricClusterAggregate = aggregates.get(timelineClusterMetric); - Assert.assertNotNull(metricClusterAggregate); - Assert.assertEquals(1.5, metricClusterAggregate.getSum()); - Assert.assertEquals(1d, metricClusterAggregate.getMin()); - Assert.assertEquals(2d, metricClusterAggregate.getMax()); - Assert.assertEquals(2, metricClusterAggregate.getNumberOfHosts()); - - timelineClusterMetric.setTimestamp(startTime + 60*seconds); - metricClusterAggregate = aggregates.get(timelineClusterMetric); - Assert.assertNotNull(metricClusterAggregate); - Assert.assertEquals(3d, metricClusterAggregate.getSum()); - - timelineClusterMetric.setTimestamp(startTime + 90*seconds); - metricClusterAggregate = aggregates.get(timelineClusterMetric); - Assert.assertNotNull(metricClusterAggregate); - Assert.assertEquals(4.5d, 
metricClusterAggregate.getSum()); - - timelineClusterMetric = new TimelineClusterMetric("live_hosts", "a1", null, startTime + 120*seconds); - metricClusterAggregate = aggregates.get(timelineClusterMetric); - Assert.assertNotNull(metricClusterAggregate); - Assert.assertEquals(2d, metricClusterAggregate.getSum()); - } }