http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/TimelineMetricStoreWatcherTest.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/TimelineMetricStoreWatcherTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/TimelineMetricStoreWatcherTest.java new file mode 100644 index 0000000..de0236c --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/TimelineMetricStoreWatcherTest.java @@ -0,0 +1,108 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ambari.metrics.core.timeline; + +import org.apache.hadoop.metrics2.sink.timeline.Precision; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; +import org.apache.hadoop.metrics2.sink.timeline.TopNConfig; +import org.apache.hadoop.util.ExitUtil; +import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; +import org.easymock.EasyMock; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +import java.util.List; + +import static org.easymock.EasyMock.anyObject; +import static org.easymock.EasyMock.anyString; +import static org.easymock.EasyMock.createNiceMock; +import static org.easymock.EasyMock.eq; +import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.expectLastCall; +import static org.easymock.EasyMock.replay; +import static org.easymock.EasyMock.verify; +import static org.powermock.api.easymock.PowerMock.mockStatic; +import static org.powermock.api.easymock.PowerMock.replayAll; +import static org.powermock.api.easymock.PowerMock.verifyAll; + +@RunWith(PowerMockRunner.class) +@PrepareForTest(ExitUtil.class) +public class TimelineMetricStoreWatcherTest { + + @Test + public void testRunPositive() throws Exception { + TimelineMetricStore metricStore = createNiceMock(TimelineMetricStore.class); + + expect(metricStore.putMetrics(anyObject(TimelineMetrics.class))) + .andReturn(new TimelinePutResponse()); + + // metric found + expect(metricStore.getTimelineMetrics(EasyMock.<List<String>>anyObject(), + EasyMock.<List<String>>anyObject(), anyObject(String.class), + anyObject(String.class), anyObject(Long.class), anyObject(Long.class), + eq(Precision.SECONDS), eq(1), eq(true), anyObject(TopNConfig.class), anyString())) + .andReturn(null).anyTimes(); + + mockStatic(ExitUtil.class); + + replay(metricStore); + + TimelineMetricStoreWatcher timelineMetricStoreWatcher = + new TimelineMetricStoreWatcher(metricStore, TimelineMetricConfiguration.getInstance()); + timelineMetricStoreWatcher.run(); + timelineMetricStoreWatcher.run(); + timelineMetricStoreWatcher.run(); + verify(metricStore); + + } + + @Test + public void testRunNegative() throws Exception { + TimelineMetricStore metricStore = createNiceMock(TimelineMetricStore.class); + + expect(metricStore.putMetrics(anyObject(TimelineMetrics.class))) + .andReturn(new TimelinePutResponse()); + + // no metrics found + expect(metricStore.getTimelineMetrics(EasyMock.<List<String>>anyObject(), + EasyMock.<List<String>>anyObject(), anyObject(String.class), + anyObject(String.class), anyObject(Long.class), anyObject(Long.class), + eq(Precision.SECONDS), eq(1), eq(true), anyObject(TopNConfig.class), anyString())) + .andReturn(null).anyTimes(); + + String msg = "Error getting metrics from TimelineMetricStore. " + + "Shutting down by TimelineMetricStoreWatcher."; + mockStatic(ExitUtil.class); + ExitUtil.terminate(-1, msg); + expectLastCall().anyTimes(); + + replayAll(); + + TimelineMetricStoreWatcher timelineMetricStoreWatcher = + new TimelineMetricStoreWatcher(metricStore, TimelineMetricConfiguration.getInstance()); + timelineMetricStoreWatcher.run(); + timelineMetricStoreWatcher.run(); + timelineMetricStoreWatcher.run(); + + verifyAll(); + + } + +}
http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/TimelineMetricsAggregatorMemorySink.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/TimelineMetricsAggregatorMemorySink.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/TimelineMetricsAggregatorMemorySink.java new file mode 100644 index 0000000..11a01d9 --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/TimelineMetricsAggregatorMemorySink.java @@ -0,0 +1,141 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ambari.metrics.core.timeline; + +import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate; +import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate; +import org.apache.hadoop.metrics2.sink.timeline.Precision; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; +import org.apache.ambari.metrics.core.timeline.aggregators.TimelineClusterMetric; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; + +/** + * Aggregator Memory sink implementation to perform test + */ +public class TimelineMetricsAggregatorMemorySink + implements TimelineMetricsAggregatorSink { + + private static Map<Precision, Map<TimelineMetric, MetricHostAggregate>> hostAggregateRecords = + new HashMap<>(); + private static Map<Precision, Map<TimelineClusterMetric, MetricHostAggregate>> clusterTimeAggregateRecords = + new HashMap<>(); + private static Map<TimelineClusterMetric, MetricClusterAggregate> clusterAggregateRecords = + new HashMap<>(); + + @Override + public void saveHostAggregateRecords( + Map<TimelineMetric, MetricHostAggregate> hostAggregateMap, + Precision precision) { + if (hostAggregateMap == null || hostAggregateMap.size() == 0) { + return; + } + + Map<TimelineMetric, MetricHostAggregate> aggregatedValue = null; + if (hostAggregateRecords.containsKey(precision)) { + aggregatedValue = hostAggregateRecords.get(precision); + } else { + aggregatedValue = new HashMap<>(); + hostAggregateRecords.put(precision, aggregatedValue); + } + + for (Entry<TimelineMetric, MetricHostAggregate> entry : hostAggregateMap + .entrySet()) { + TimelineMetric timelineMetricClone = new TimelineMetric(entry.getKey()); + MetricHostAggregate hostAggregate = entry.getValue(); + MetricHostAggregate hostAggregateClone = new MetricHostAggregate( + hostAggregate.getSum(), (int) hostAggregate.getNumberOfSamples(), + hostAggregate.getDeviation(), hostAggregate.getMax(), + hostAggregate.getMin()); + aggregatedValue.put(timelineMetricClone, hostAggregateClone); + } + } + + @Override + public void saveClusterTimeAggregateRecords( + Map<TimelineClusterMetric, MetricHostAggregate> clusterTimeAggregateMap, + Precision precision) { + if (clusterTimeAggregateMap == null + || clusterTimeAggregateMap.size() == 0) { + return; + } + + Map<TimelineClusterMetric, MetricHostAggregate> aggregatedValue = null; + if (clusterTimeAggregateRecords.containsKey(precision)) { + aggregatedValue = clusterTimeAggregateRecords.get(precision); + } else { + aggregatedValue = new HashMap<>(); + clusterTimeAggregateRecords.put(precision, aggregatedValue); + } + + for (Entry<TimelineClusterMetric, MetricHostAggregate> entry : clusterTimeAggregateMap + .entrySet()) { + TimelineClusterMetric clusterMetric = entry.getKey(); + TimelineClusterMetric clusterMetricClone = + new TimelineClusterMetric(clusterMetric.getMetricName(), + clusterMetric.getAppId(), clusterMetric.getInstanceId(), + clusterMetric.getTimestamp()); + MetricHostAggregate hostAggregate = entry.getValue(); + MetricHostAggregate hostAggregateClone = new MetricHostAggregate( + hostAggregate.getSum(), (int) hostAggregate.getNumberOfSamples(), + hostAggregate.getDeviation(), hostAggregate.getMax(), + hostAggregate.getMin()); + aggregatedValue.put(clusterMetricClone, hostAggregateClone); + } + } + + @Override + public void saveClusterAggregateRecords( + Map<TimelineClusterMetric, MetricClusterAggregate> clusterAggregateMaps) { + + if (clusterAggregateMaps == null || clusterAggregateMaps.size() == 0) { + return; + } + + for (Entry<TimelineClusterMetric, MetricClusterAggregate> entry : clusterAggregateMaps + .entrySet()) { + TimelineClusterMetric clusterMetric = entry.getKey(); + TimelineClusterMetric clusterMetricClone = + new TimelineClusterMetric(clusterMetric.getMetricName(), + clusterMetric.getAppId(), clusterMetric.getInstanceId(), + clusterMetric.getTimestamp()); + MetricClusterAggregate clusterAggregate = entry.getValue(); + MetricClusterAggregate clusterAggregateClone = new MetricClusterAggregate( + clusterAggregate.getSum(), (int) clusterAggregate.getNumberOfHosts(), + clusterAggregate.getDeviation(), clusterAggregate.getMax(), + clusterAggregate.getMin()); + clusterAggregateRecords.put(clusterMetricClone, clusterAggregateClone); + } + } + + public Map<Precision, Map<TimelineMetric, MetricHostAggregate>> getHostAggregateRecords() { + return Collections.unmodifiableMap(hostAggregateRecords); + } + + public Map<Precision, Map<TimelineClusterMetric, MetricHostAggregate>> getClusterTimeAggregateRecords() { + return Collections.unmodifiableMap(clusterTimeAggregateRecords); + } + + public Map<TimelineClusterMetric, MetricClusterAggregate> getClusterAggregateRecords() { + return Collections.unmodifiableMap(clusterAggregateRecords); + } + +} http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/TimelineMetricsFilterTest.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/TimelineMetricsFilterTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/TimelineMetricsFilterTest.java new file mode 100644 index 0000000..d64bf7c --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/TimelineMetricsFilterTest.java @@ -0,0 +1,223 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ambari.metrics.core.timeline; + +import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.replay; + +import junit.framework.Assert; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; +import org.easymock.EasyMock; +import org.junit.Test; + +import java.net.URL; +import java.util.HashSet; +import java.util.Set; + +public class TimelineMetricsFilterTest { + + @Test + public void testAppBlacklisting() throws Exception{ + + Configuration metricsConf = new Configuration(); + metricsConf.set("timeline.metrics.apps.blacklist", "hbase,datanode,nimbus"); + TimelineMetricConfiguration configuration = EasyMock.createNiceMock(TimelineMetricConfiguration.class); + expect(configuration.getMetricsConf()).andReturn(metricsConf).once(); + replay(configuration); + + TimelineMetricsFilter.initializeMetricFilter(configuration); + + TimelineMetric timelineMetric = new TimelineMetric(); + + timelineMetric.setAppId("hbase"); + Assert.assertFalse(TimelineMetricsFilter.acceptMetric(timelineMetric)); + + timelineMetric.setAppId("namenode"); + Assert.assertTrue(TimelineMetricsFilter.acceptMetric(timelineMetric)); + + timelineMetric.setAppId("nimbus"); + Assert.assertFalse(TimelineMetricsFilter.acceptMetric(timelineMetric)); + } + + @Test + public void testMetricWhitelisting() throws Exception { + + Configuration metricsConf = new Configuration(); + TimelineMetricConfiguration configuration = EasyMock.createNiceMock(TimelineMetricConfiguration.class); + expect(configuration.getMetricsConf()).andReturn(metricsConf).once(); + expect(configuration.isWhitelistingEnabled()).andReturn(true).anyTimes(); + replay(configuration); + + URL fileUrl = ClassLoader.getSystemResource("test_data/metric_whitelist.dat"); + + metricsConf.set("timeline.metrics.whitelist.file", fileUrl.getPath()); + TimelineMetricsFilter.initializeMetricFilter(configuration); + + TimelineMetric timelineMetric = new TimelineMetric(); + + timelineMetric.setMetricName("cpu_system"); + Assert.assertTrue(TimelineMetricsFilter.acceptMetric(timelineMetric)); + + timelineMetric.setMetricName("cpu_system1"); + Assert.assertFalse(TimelineMetricsFilter.acceptMetric(timelineMetric)); + + timelineMetric.setMetricName("jvm.JvmMetrics.MemHeapUsedM"); + Assert.assertTrue(TimelineMetricsFilter.acceptMetric(timelineMetric)); + + timelineMetric.setMetricName("dfs.FSNamesystem.TotalFiles"); + Assert.assertTrue(TimelineMetricsFilter.acceptMetric(timelineMetric)); + } + + @Test + public void testTogether() throws Exception { + + Configuration metricsConf = new Configuration(); + metricsConf.set("timeline.metrics.apps.blacklist", "hbase,datanode,nimbus"); + TimelineMetricConfiguration configuration = EasyMock.createNiceMock(TimelineMetricConfiguration.class); + expect(configuration.getMetricsConf()).andReturn(metricsConf).once(); + replay(configuration); + + URL fileUrl = ClassLoader.getSystemResource("test_data/metric_whitelist.dat"); + metricsConf.set("timeline.metrics.whitelist.file", fileUrl.getPath()); + + TimelineMetricsFilter.initializeMetricFilter(configuration); + + TimelineMetric timelineMetric = new TimelineMetric(); + + timelineMetric.setMetricName("cpu_system"); + timelineMetric.setAppId("hbase"); + Assert.assertFalse(TimelineMetricsFilter.acceptMetric(timelineMetric)); + + timelineMetric.setMetricName("cpu_system"); + timelineMetric.setAppId("HOST"); + Assert.assertTrue(TimelineMetricsFilter.acceptMetric(timelineMetric)); + + timelineMetric.setMetricName("jvm.JvmMetrics.MemHeapUsedM"); + Assert.assertTrue(TimelineMetricsFilter.acceptMetric(timelineMetric)); + + timelineMetric.setMetricName("dfs.FSNamesystem.TotalFiles"); + Assert.assertTrue(TimelineMetricsFilter.acceptMetric(timelineMetric)); + } + + @Test + public void testAmshbaseWhitelisting() throws Exception { + + TimelineMetricConfiguration configuration = EasyMock.createNiceMock(TimelineMetricConfiguration.class); + + Configuration metricsConf = new Configuration(); + expect(configuration.getMetricsConf()).andReturn(metricsConf).once(); + + Set<String> whitelist = new HashSet(); + whitelist.add("regionserver.Server.Delete_99th_percentile"); + whitelist.add("regionserver.Server.Delete_max"); + whitelist.add("regionserver.Server.Delete_mean"); + expect(configuration.getAmshbaseWhitelist()).andReturn(whitelist).once(); + + replay(configuration); + + TimelineMetricsFilter.initializeMetricFilter(configuration); + + TimelineMetric timelineMetric = new TimelineMetric(); + + timelineMetric.setMetricName("regionserver.Server.Delete_max"); + timelineMetric.setAppId("ams-hbase"); + Assert.assertTrue(TimelineMetricsFilter.acceptMetric(timelineMetric)); + + timelineMetric.setMetricName("regionserver.Server.Delete_min3333"); + timelineMetric.setAppId("ams-hbase"); + Assert.assertFalse(TimelineMetricsFilter.acceptMetric(timelineMetric)); + + timelineMetric.setMetricName("jvm.JvmMetrics.MemHeapUsedM"); + timelineMetric.setAppId("hbase"); + Assert.assertTrue(TimelineMetricsFilter.acceptMetric(timelineMetric)); + } + + @Test + public void testHybridFilter() throws Exception { + + // Whitelist Apps - namenode, nimbus + // Blacklist Apps - datanode, kafka_broker + // Accept ams-hbase whitelisting. + // Reject non whitelisted metrics from non whitelisted Apps (Say hbase) + + TimelineMetricConfiguration configuration = EasyMock.createNiceMock(TimelineMetricConfiguration.class); + + Configuration metricsConf = new Configuration(); + metricsConf.set("timeline.metrics.apps.whitelist", "namenode,nimbus"); + metricsConf.set("timeline.metrics.apps.blacklist", "datanode,kafka_broker"); + URL fileUrl = ClassLoader.getSystemResource("test_data/metric_whitelist.dat"); + metricsConf.set("timeline.metrics.whitelist.file", fileUrl.getPath()); + expect(configuration.getMetricsConf()).andReturn(metricsConf).once(); + + Set<String> whitelist = new HashSet(); + whitelist.add("regionserver.Server.Delete_99th_percentile"); + whitelist.add("regionserver.Server.Delete_max"); + whitelist.add("regionserver.Server.Delete_mean"); + expect(configuration.getAmshbaseWhitelist()).andReturn(whitelist).once(); + + expect(configuration.isWhitelistingEnabled()).andReturn(true).anyTimes(); + + replay(configuration); + + TimelineMetricsFilter.initializeMetricFilter(configuration); + + TimelineMetric timelineMetric = new TimelineMetric(); + + //Test App Whitelisting + timelineMetric.setMetricName("metric.a.b.c"); + timelineMetric.setAppId("namenode"); + Assert.assertTrue(TimelineMetricsFilter.acceptMetric(timelineMetric)); + + timelineMetric.setMetricName("metric.d.e.f"); + timelineMetric.setAppId("nimbus"); + Assert.assertTrue(TimelineMetricsFilter.acceptMetric(timelineMetric)); + + //Test App Blacklisting + timelineMetric.setMetricName("metric.d.e.f"); + timelineMetric.setAppId("datanode"); + Assert.assertFalse(TimelineMetricsFilter.acceptMetric(timelineMetric)); + + timelineMetric.setMetricName("metric.d.e.f"); + timelineMetric.setAppId("kafka_broker"); + Assert.assertFalse(TimelineMetricsFilter.acceptMetric(timelineMetric)); + + + //Test ams-hbase Whitelisting + timelineMetric.setMetricName("regionserver.Server.Delete_max"); + timelineMetric.setAppId("ams-hbase"); + Assert.assertTrue(TimelineMetricsFilter.acceptMetric(timelineMetric)); + + timelineMetric.setMetricName("regionserver.Server.Delete_min3333"); + timelineMetric.setAppId("ams-hbase"); + Assert.assertFalse(TimelineMetricsFilter.acceptMetric(timelineMetric)); + + timelineMetric.setMetricName("regionserver.Server.Delete_mean"); + timelineMetric.setAppId("ams-hbase"); + Assert.assertTrue(TimelineMetricsFilter.acceptMetric(timelineMetric)); + + //Test Metric Whitelisting + timelineMetric.setMetricName("regionserver.WAL.SyncTime_max"); + timelineMetric.setAppId("hbase"); + Assert.assertTrue(TimelineMetricsFilter.acceptMetric(timelineMetric)); + + timelineMetric.setMetricName("regionserver.WAL.metric.not.needed"); + timelineMetric.setAppId("hbase"); + Assert.assertFalse(TimelineMetricsFilter.acceptMetric(timelineMetric)); + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/TimelineMetricsIgniteCacheTest.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/TimelineMetricsIgniteCacheTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/TimelineMetricsIgniteCacheTest.java new file mode 100644 index 0000000..5b47545 --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/TimelineMetricsIgniteCacheTest.java @@ -0,0 +1,240 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.metrics.core.timeline; + + +import junit.framework.Assert; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; +import org.apache.ambari.metrics.core.timeline.aggregators.TimelineClusterMetric; +import org.apache.ambari.metrics.core.timeline.discovery.TimelineMetricMetadataManager; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.powermock.core.classloader.annotations.PowerMockIgnore; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; + +import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_APP_IDS; +import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.HOST_APP_ID; +import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_COLLECTOR_IGNITE_NODES; +import static org.apache.ambari.metrics.core.timeline.aggregators.AggregatorUtils.getRoundedCheckPointTimeMillis; +import static org.easymock.EasyMock.anyObject; +import static org.easymock.EasyMock.createNiceMock; +import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.replay; +import static org.powermock.api.easymock.PowerMock.mockStatic; +import static org.powermock.api.easymock.PowerMock.replayAll; + +@RunWith(PowerMockRunner.class) +@PrepareForTest(TimelineMetricConfiguration.class) + +@PowerMockIgnore("javax.management.*") +public class TimelineMetricsIgniteCacheTest { + private static TimelineMetricsIgniteCache timelineMetricsIgniteCache; + @BeforeClass + public static void setupConf() throws Exception { + TimelineMetricConfiguration conf = new TimelineMetricConfiguration(new + Configuration(), new Configuration()); + mockStatic(TimelineMetricConfiguration.class); + expect(TimelineMetricConfiguration.getInstance()).andReturn(conf).anyTimes(); + conf.getMetricsConf().set(CLUSTER_AGGREGATOR_APP_IDS, "appIdForHostsAggr"); + conf.getMetricsConf().set(TIMELINE_METRICS_COLLECTOR_IGNITE_NODES, "localhost"); + replayAll(); + + timelineMetricsIgniteCache = new TimelineMetricsIgniteCache(); + } + + @Test + public void putEvictMetricsFromCacheSlicesMerging() throws Exception { + long cacheSliceIntervalMillis = 30000L; + + TimelineMetricMetadataManager metricMetadataManagerMock = createNiceMock(TimelineMetricMetadataManager.class); + expect(metricMetadataManagerMock.getUuid(anyObject(TimelineClusterMetric.class))).andReturn(new byte[16]).once(); + replay(metricMetadataManagerMock); + + long startTime = getRoundedCheckPointTimeMillis(System.currentTimeMillis(), cacheSliceIntervalMillis); + + long seconds = 1000; + TreeMap<Long, Double> metricValues = new TreeMap<Long, Double>(); + /* + + 0 +30s +60s + | | | + (1)(2)(3) (4)(5)(6) h1 + + */ + // Case 1 : data points are distributed equally, no values are lost, single host. + metricValues.put(startTime + 4*seconds, 1.0); + metricValues.put(startTime + 14*seconds, 2.0); + metricValues.put(startTime + 24*seconds, 3.0); + metricValues.put(startTime + 34*seconds, 4.0); + metricValues.put(startTime + 44*seconds, 5.0); + metricValues.put(startTime + 54*seconds, 6.0); + + TimelineMetric timelineMetric = new TimelineMetric("metric1", "host1", "app1", "instance1"); + timelineMetric.setStartTime(metricValues.firstKey()); + timelineMetric.addMetricValues(metricValues); + + Collection<TimelineMetric> timelineMetrics = new ArrayList<>(); + timelineMetrics.add(timelineMetric); + timelineMetricsIgniteCache.putMetrics(timelineMetrics, metricMetadataManagerMock); + Map<TimelineClusterMetric, MetricClusterAggregate> aggregateMap = timelineMetricsIgniteCache.evictMetricAggregates(startTime, startTime + 120*seconds); + + Assert.assertEquals(aggregateMap.size(), 2); + TimelineClusterMetric timelineClusterMetric = new TimelineClusterMetric(timelineMetric.getMetricName(), + timelineMetric.getAppId(), timelineMetric.getInstanceId(), startTime + 30*seconds); + + Assert.assertTrue(aggregateMap.containsKey(timelineClusterMetric)); + Assert.assertEquals(2.0, aggregateMap.get(timelineClusterMetric).getSum()); + + timelineClusterMetric.setTimestamp(startTime + 2*30*seconds); + Assert.assertTrue(aggregateMap.containsKey(timelineClusterMetric)); + Assert.assertEquals(5.0, aggregateMap.get(timelineClusterMetric).getSum()); + + metricValues.clear(); + timelineMetrics.clear(); + + /* + + 0 +30s +60s + | | | + (1)(2)(3) (4)(5)(6) h1, h2 + + */ + // Case 2 : data points are distributed equally, no values are lost, two hosts. + metricValues.put(startTime + 4*seconds, 1.0); + metricValues.put(startTime + 14*seconds, 2.0); + metricValues.put(startTime + 24*seconds, 3.0); + metricValues.put(startTime + 34*seconds, 4.0); + metricValues.put(startTime + 44*seconds, 5.0); + metricValues.put(startTime + 54*seconds, 6.0); + + timelineMetric = new TimelineMetric("metric1", "host1", "app1", "instance1"); + timelineMetric.setMetricValues(metricValues); + + metricValues = new TreeMap<>(); + metricValues.put(startTime + 5*seconds, 2.0); + metricValues.put(startTime + 15*seconds, 4.0); + metricValues.put(startTime + 25*seconds, 6.0); + metricValues.put(startTime + 35*seconds, 8.0); + metricValues.put(startTime + 45*seconds, 10.0); + metricValues.put(startTime + 55*seconds, 12.0); + TimelineMetric timelineMetric2 = new TimelineMetric("metric1", "host2", "app1", "instance1"); + timelineMetric2.setMetricValues(metricValues); + + timelineMetrics = new ArrayList<>(); + timelineMetrics.add(timelineMetric); + timelineMetrics.add(timelineMetric2); + timelineMetricsIgniteCache.putMetrics(timelineMetrics, metricMetadataManagerMock); + aggregateMap = timelineMetricsIgniteCache.evictMetricAggregates(startTime, startTime + 120*seconds); + + Assert.assertEquals(aggregateMap.size(), 2); + timelineClusterMetric = new TimelineClusterMetric(timelineMetric.getMetricName(), + timelineMetric.getAppId(), timelineMetric.getInstanceId(), startTime + 30*seconds); + + Assert.assertTrue(aggregateMap.containsKey(timelineClusterMetric)); + Assert.assertEquals(6.0, aggregateMap.get(timelineClusterMetric).getSum()); + + timelineClusterMetric.setTimestamp(startTime + 2*30*seconds); + Assert.assertTrue(aggregateMap.containsKey(timelineClusterMetric)); + Assert.assertEquals(15.0, aggregateMap.get(timelineClusterMetric).getSum()); + + metricValues.clear(); + timelineMetrics.clear(); + + Assert.assertEquals(0d, timelineMetricsIgniteCache.getPointInTimeCacheMetrics().get("Cluster_KeySize")); + } + + @Test + public void updateAppAggregatesFromHostMetricTest() { + //make sure hosts metrics are aggregated for appIds from "timeline.metrics.service.cluster.aggregator.appIds" + + long cacheSliceIntervalMillis = 30000L; + + TimelineMetricMetadataManager metricMetadataManagerMock = createNiceMock(TimelineMetricMetadataManager.class); + expect(metricMetadataManagerMock.getUuid(anyObject(TimelineClusterMetric.class))).andReturn(new byte[16]).once(); + expect(metricMetadataManagerMock.getHostedAppsCache()).andReturn(new HashMap<>()).anyTimes(); + replay(metricMetadataManagerMock); + + long startTime = getRoundedCheckPointTimeMillis(System.currentTimeMillis(), cacheSliceIntervalMillis); + + long seconds = 1000; + + TreeMap<Long, Double> metricValues = new TreeMap<Long, Double>(); + List<TimelineMetric> timelineMetrics = new ArrayList<>(); + TimelineMetric timelineMetric; + + metricValues = new TreeMap<>(); + metricValues.put(startTime + 15*seconds, 1.0); + metricValues.put(startTime + 55*seconds, 2.0); + timelineMetric = new TimelineMetric("host_metric", "host1", HOST_APP_ID, "instance1"); + timelineMetric.setMetricValues(metricValues); + timelineMetrics.add(timelineMetric); + + metricValues = new TreeMap<>(); + metricValues.put(startTime + 45*seconds, 3.0); + metricValues.put(startTime + 85*seconds, 4.0); + timelineMetric = new TimelineMetric("app_metric", "host1", "appIdForHostsAggr", "instance1"); + timelineMetric.setMetricValues(metricValues); + timelineMetrics.add(timelineMetric); + + metricValues = new TreeMap<>(); + metricValues.put(startTime + 85*seconds, 5.0); + timelineMetric = new TimelineMetric("host_metric", "host1", HOST_APP_ID, "instance1"); + timelineMetric.setMetricValues(metricValues); + timelineMetrics.add(timelineMetric); + + metricValues = new TreeMap<>(); + metricValues.put(startTime + 85*seconds, 6.0); + timelineMetric = new TimelineMetric("host_metric", "host2", HOST_APP_ID, "instance1"); + timelineMetric.setMetricValues(metricValues); + timelineMetrics.add(timelineMetric); + + timelineMetricsIgniteCache.putMetrics(timelineMetrics, metricMetadataManagerMock); + + Map<TimelineClusterMetric, MetricClusterAggregate> aggregateMap = timelineMetricsIgniteCache.evictMetricAggregates(startTime, startTime + 120*seconds); + Assert.assertEquals(aggregateMap.size(), 6); + TimelineClusterMetric timelineClusterMetric = new TimelineClusterMetric(timelineMetric.getMetricName(), + timelineMetric.getAppId(), timelineMetric.getInstanceId(), startTime + 90*seconds); + + Assert.assertTrue(aggregateMap.containsKey(timelineClusterMetric)); + Assert.assertEquals(11.0, aggregateMap.get(timelineClusterMetric).getSum()); + + timelineClusterMetric = new TimelineClusterMetric("app_metric", + "appIdForHostsAggr", "instance1", startTime + 90*seconds); + Assert.assertTrue(aggregateMap.containsKey(timelineClusterMetric)); + Assert.assertEquals(4.0, aggregateMap.get(timelineClusterMetric).getSum()); + + timelineClusterMetric = new TimelineClusterMetric("host_metric", + "appIdForHostsAggr", "instance1", startTime + 90*seconds); + Assert.assertTrue(aggregateMap.containsKey(timelineClusterMetric)); + Assert.assertEquals(5.0, aggregateMap.get(timelineClusterMetric).getSum()); + + Assert.assertEquals(0d, timelineMetricsIgniteCache.getPointInTimeCacheMetrics().get("Cluster_KeySize")); + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/TopNConditionTest.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/TopNConditionTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/TopNConditionTest.java new file mode 100644 index 0000000..5f1c470 --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/TopNConditionTest.java @@ -0,0 +1,105 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.metrics.core.timeline; + +import junit.framework.Assert; +import org.apache.ambari.metrics.core.timeline.query.TopNCondition; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; + +public class TopNConditionTest { + + + @Test + public void testTopNCondition() { + + List<String> metricNames = new ArrayList<>(); + List<String> hostnames = new ArrayList<>(); + + //Valid Cases + + // "H" hosts and 1 Metric + hostnames.add("h1"); + hostnames.add("h2"); + metricNames.add("m1"); + Assert.assertTrue(TopNCondition.isTopNHostCondition(metricNames, hostnames)); + Assert.assertFalse(TopNCondition.isTopNMetricCondition(metricNames, hostnames)); + hostnames.clear(); + + // Host(s) with wildcard & 1 Metric + hostnames.add("h%"); + hostnames.add("g1"); + Assert.assertTrue(TopNCondition.isTopNHostCondition(metricNames, hostnames)); + Assert.assertFalse(TopNCondition.isTopNMetricCondition(metricNames, hostnames)); + hostnames.clear(); + + // M Metrics and No host + metricNames.add("m2"); + metricNames.add("m3"); + Assert.assertFalse(TopNCondition.isTopNHostCondition(metricNames, hostnames)); + Assert.assertTrue(TopNCondition.isTopNMetricCondition(metricNames, hostnames)); + + // M Metrics with wildcard and No host + metricNames.add("m2"); + metricNames.add("m%"); + Assert.assertFalse(TopNCondition.isTopNHostCondition(metricNames, hostnames)); + Assert.assertTrue(TopNCondition.isTopNMetricCondition(metricNames, hostnames)); + + // M Metrics with wildcard and 1 host + metricNames.add("m2"); + metricNames.add("m%"); + hostnames.add("h1"); + Assert.assertFalse(TopNCondition.isTopNHostCondition(metricNames, hostnames)); + Assert.assertTrue(TopNCondition.isTopNMetricCondition(metricNames, hostnames)); + + metricNames.clear(); + hostnames.clear(); + + //Invalid Cases + // M metrics and H hosts + metricNames.add("m1"); + metricNames.add("m2"); + hostnames.add("h1"); + hostnames.add("h2"); + Assert.assertFalse(TopNCondition.isTopNHostCondition(metricNames, hostnames)); + Assert.assertFalse(TopNCondition.isTopNMetricCondition(metricNames, hostnames)); + + metricNames.clear(); + hostnames.clear(); + + // Wildcard in 1 and multiple in other + metricNames.add("m1"); + metricNames.add("m2"); + hostnames.add("%"); + Assert.assertFalse(TopNCondition.isTopNHostCondition(metricNames, hostnames)); + Assert.assertFalse(TopNCondition.isTopNMetricCondition(metricNames, hostnames)); + + metricNames.clear(); + hostnames.clear(); + + //Wildcard in both + metricNames.add("m%"); + hostnames.add("%"); + Assert.assertFalse(TopNCondition.isTopNHostCondition(metricNames, hostnames)); + Assert.assertFalse(TopNCondition.isTopNMetricCondition(metricNames, hostnames)); + + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/aggregators/AbstractTimelineAggregatorTest.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/aggregators/AbstractTimelineAggregatorTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/aggregators/AbstractTimelineAggregatorTest.java new file mode 100644 index 0000000..d67ec7f --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/aggregators/AbstractTimelineAggregatorTest.java @@ -0,0 +1,181 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ambari.metrics.core.timeline.aggregators; + +import org.apache.hadoop.conf.Configuration; +import org.apache.ambari.metrics.core.timeline.availability.AggregationTaskRunner; +import org.apache.ambari.metrics.core.timeline.query.Condition; +import org.junit.Before; +import org.junit.Test; +import java.io.IOException; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.concurrent.atomic.AtomicLong; + +import static junit.framework.Assert.assertEquals; +import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.AGGREGATOR_CHECKPOINT_DELAY; +import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.RESULTSET_FETCH_SIZE; +import static org.apache.ambari.metrics.core.timeline.aggregators.AggregatorUtils.getRoundedAggregateTimeMillis; +import static org.apache.ambari.metrics.core.timeline.aggregators.AggregatorUtils.getRoundedCheckPointTimeMillis; + +public class AbstractTimelineAggregatorTest { + + private AbstractTimelineAggregator agg; + + AtomicLong startTimeInDoWork; + AtomicLong endTimeInDoWork; + AtomicLong checkPoint; + int actualRuns; + + long sleepIntervalMillis; + int checkpointCutOffMultiplier; + + @Before + public void setUp() throws Exception { + sleepIntervalMillis = 5 * 60 * 1000l; //5 minutes + checkpointCutOffMultiplier = 2; + + Configuration metricsConf = new Configuration(); + metricsConf.setInt(AGGREGATOR_CHECKPOINT_DELAY, 0); + metricsConf.setInt(RESULTSET_FETCH_SIZE, 2000); + + startTimeInDoWork = new AtomicLong(0); + endTimeInDoWork = new AtomicLong(0); + checkPoint = new AtomicLong(-1); + actualRuns = 0; + + agg = new AbstractTimelineAggregator(AggregationTaskRunner.AGGREGATOR_NAME.METRIC_AGGREGATE_SECOND, null, metricsConf) { + @Override + public boolean doWork(long startTime, long endTime) { + startTimeInDoWork.set(startTime); + endTimeInDoWork.set(endTime); + actualRuns++; + + return true; + } + + @Override + protected Condition + prepareMetricQueryCondition(long startTime, long endTime) { + return null; + } + + @Override + protected void aggregate(ResultSet rs, long startTime, + long endTime) throws IOException, SQLException { + } + + @Override + public Long getSleepIntervalMillis() { + return sleepIntervalMillis; + } + + @Override + protected Integer getCheckpointCutOffMultiplier() { + return checkpointCutOffMultiplier; + } + + @Override + public boolean isDisabled() { + return false; + } + + @Override + protected String getCheckpointLocation() { + return "dummy_ckptFile"; + } + + protected long readCheckPoint() { + return checkPoint.get(); + } + + @Override + protected void saveCheckPoint(long checkpointTime) throws IOException { + checkPoint.set(checkpointTime); + } + }; + + } + + @Test + public void testDoWorkOnZeroDelay() throws Exception { + + long currentTime = System.currentTimeMillis(); + long roundedOffAggregatorTime = getRoundedCheckPointTimeMillis(currentTime, + sleepIntervalMillis); + + //Test first run of aggregator with no checkpoint + checkPoint.set(-1); + agg.runOnce(sleepIntervalMillis); + assertEquals("startTime should be zero", 0, startTimeInDoWork.get()); + assertEquals("endTime should be zero", 0, endTimeInDoWork.get()); + assertEquals(roundedOffAggregatorTime, checkPoint.get()); + assertEquals("Do not aggregate on first run", 0, actualRuns); + +// //Test first run with too "recent" checkpoint + currentTime = System.currentTimeMillis(); + checkPoint.set(currentTime); + agg.setSleepIntervalMillis(sleepIntervalMillis); + agg.runOnce(sleepIntervalMillis); + assertEquals("startTime should be zero", 0, startTimeInDoWork.get()); + assertEquals("endTime should be zero", 0, endTimeInDoWork.get()); + assertEquals("Do not aggregate on first run", 0, actualRuns); + + //Test first run with Too Old checkpoint + currentTime = System.currentTimeMillis(); + checkPoint.set(currentTime - 16*60*1000); //Old checkpoint + agg.runOnce(sleepIntervalMillis); + long checkPointTime = getRoundedAggregateTimeMillis(sleepIntervalMillis); + assertEquals("startTime should be zero", checkPointTime - sleepIntervalMillis, startTimeInDoWork.get()); + assertEquals("endTime should be zero", checkPointTime, endTimeInDoWork.get()); + assertEquals(roundedOffAggregatorTime, checkPoint.get()); + assertEquals("Do not aggregate on first run", 1, actualRuns); + + +// //Test first run with perfect checkpoint (sleepIntervalMillis back) + currentTime = System.currentTimeMillis(); + roundedOffAggregatorTime = getRoundedCheckPointTimeMillis(currentTime, + sleepIntervalMillis); + checkPointTime = roundedOffAggregatorTime - sleepIntervalMillis; + long expectedCheckPoint = getRoundedCheckPointTimeMillis(checkPointTime, sleepIntervalMillis); + checkPoint.set(checkPointTime); + agg.runOnce(sleepIntervalMillis); + assertEquals("startTime should the lower rounded time of the checkpoint time", + expectedCheckPoint, startTimeInDoWork.get()); + assertEquals("endTime should the lower rounded time of the checkpoint time + sleepIntervalMillis", + expectedCheckPoint + sleepIntervalMillis, endTimeInDoWork.get()); + assertEquals(expectedCheckPoint + sleepIntervalMillis, + checkPoint.get()); + assertEquals("Aggregate on first run", 2, actualRuns); + + //Test edge case for checkpoint (2 x sleepIntervalMillis) + currentTime = System.currentTimeMillis(); + checkPoint.set(currentTime - 2*sleepIntervalMillis + 5000); + agg.runOnce(sleepIntervalMillis); + long expectedStartTime = getRoundedCheckPointTimeMillis(currentTime - 2*sleepIntervalMillis + 5000, sleepIntervalMillis); + assertEquals("startTime should the lower rounded time of the checkpoint time", + expectedStartTime, startTimeInDoWork.get()); + assertEquals("startTime should the lower rounded time of the checkpoint time + sleepIntervalMillis", + expectedStartTime + sleepIntervalMillis, endTimeInDoWork.get()); + assertEquals(expectedStartTime + sleepIntervalMillis, + checkPoint.get()); + assertEquals("Aggregate on second run", 3, actualRuns); + + + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/aggregators/DownSamplerTest.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/aggregators/DownSamplerTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/aggregators/DownSamplerTest.java new file mode 100644 index 0000000..f216b13 --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/aggregators/DownSamplerTest.java @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.metrics.core.timeline.aggregators; + +import junit.framework.Assert; +import org.apache.hadoop.conf.Configuration; +import org.junit.Ignore; +import org.junit.Test; + +import java.util.List; +import java.util.Map; + +public class DownSamplerTest { + + @Test + public void testGetDownsampleMetricPatterns() throws Exception { + + Configuration configuration = new Configuration(); + configuration.setIfUnset("timeline.metrics.downsampler.topn.metric.patterns", "pattern1,pattern2"); + configuration.setIfUnset("timeline.metrics.downsampler.lastvalue.metric.patterns", "pattern3"); + + List<String> patterns = DownSamplerUtils.getDownsampleMetricPatterns(configuration); + Assert.assertEquals(patterns.size(), 3); + Assert.assertTrue(patterns.contains("pattern1")); + Assert.assertTrue(patterns.contains("pattern2")); + Assert.assertTrue(patterns.contains("pattern3")); + + Configuration configuration2 = new Configuration(); + patterns = DownSamplerUtils.getDownsampleMetricPatterns(configuration2); + Assert.assertEquals(patterns.size(), 0); + } + + @Test + public void testGetDownSamplers() throws Exception { + + Configuration configuration = new Configuration(); + configuration.setIfUnset("timeline.metrics.downsampler.topn.metric.patterns", "pattern1,pattern2"); + configuration.setIfUnset("timeline.metrics.downsampler.test.metric.patterns", "pattern3"); + + List<CustomDownSampler> downSamplers = DownSamplerUtils.getDownSamplers(configuration); + Assert.assertEquals(downSamplers.size(), 1); + Assert.assertTrue(downSamplers.get(0) instanceof TopNDownSampler); + } + + @Ignore + @Test + public void testPrepareTopNDownSamplingStatement() throws Exception { + Configuration configuration = new Configuration(); + configuration.setIfUnset("timeline.metrics.downsampler.topn.metric.patterns", "pattern1,pattern2"); + configuration.setIfUnset("timeline.metrics.downsampler.topn.value", "3"); + + Map<String, String> conf = configuration.getValByRegex(DownSamplerUtils.downSamplerConfigPrefix); + + TopNDownSampler topNDownSampler = TopNDownSampler.fromConfig(conf); + List<String> stmts = topNDownSampler.prepareDownSamplingStatement(14000000l, 14100000l, "METRIC_RECORD"); + Assert.assertEquals(stmts.size(),2); + Assert.assertTrue(stmts.contains("SELECT METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, 14100000 AS SERVER_TIME, UNITS, " + + "MAX(METRIC_MAX), 1, MAX(METRIC_MAX), MAX(METRIC_MAX) FROM METRIC_RECORD WHERE " + + "METRIC_NAME LIKE 'pattern1' AND SERVER_TIME > 14000000 AND SERVER_TIME <= 14100000 " + + "GROUP BY METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, UNITS ORDER BY MAX(METRIC_MAX) DESC LIMIT 3")); + + Assert.assertTrue(stmts.contains("SELECT METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, 14100000 AS SERVER_TIME, UNITS, " + + "MAX(METRIC_MAX), 1, MAX(METRIC_MAX), MAX(METRIC_MAX) FROM METRIC_RECORD WHERE " + + "METRIC_NAME LIKE 'pattern2' AND SERVER_TIME > 14000000 AND SERVER_TIME <= 14100000 " + + "GROUP BY METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, UNITS ORDER BY MAX(METRIC_MAX) DESC LIMIT 3")); + + configuration.clear(); + configuration.setIfUnset("timeline.metrics.downsampler.topn.metric.patterns", "pattern1"); + configuration.setIfUnset("timeline.metrics.downsampler.topn.value", "4"); + configuration.setIfUnset("timeline.metrics.downsampler.topn.function", "sum"); + conf = configuration.getValByRegex(DownSamplerUtils.downSamplerConfigPrefix); + topNDownSampler = TopNDownSampler.fromConfig(conf); + stmts = topNDownSampler.prepareDownSamplingStatement(14000000l, 14100000l, "METRIC_AGGREGATE_MINUTE"); + Assert.assertEquals(stmts.size(),1); + + Assert.assertTrue(stmts.contains("SELECT METRIC_NAME, APP_ID, INSTANCE_ID, 14100000 AS SERVER_TIME, UNITS, " + + "SUM(METRIC_SUM), 1, SUM(METRIC_SUM), SUM(METRIC_SUM) FROM METRIC_AGGREGATE_MINUTE WHERE " + + "METRIC_NAME LIKE 'pattern1' AND SERVER_TIME > 14000000 AND SERVER_TIME <= 14100000 " + + "GROUP BY METRIC_NAME, APP_ID, INSTANCE_ID, UNITS ORDER BY SUM(METRIC_SUM) DESC LIMIT 4")); + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/aggregators/ITClusterAggregator.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/aggregators/ITClusterAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/aggregators/ITClusterAggregator.java new file mode 100644 index 0000000..e01f3b1 --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/aggregators/ITClusterAggregator.java @@ -0,0 +1,711 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ambari.metrics.core.timeline.aggregators; + + +import static junit.framework.Assert.assertEquals; +import static junit.framework.Assert.assertNotNull; +import static junit.framework.Assert.assertTrue; +import static junit.framework.Assert.fail; +import static org.apache.ambari.metrics.core.timeline.MetricTestHelper.createEmptyTimelineClusterMetric; +import static org.apache.ambari.metrics.core.timeline.MetricTestHelper.prepareSingleTimelineMetric; +import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_APP_IDS; +import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.GET_CLUSTER_AGGREGATE_SQL; +import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME; +import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_TABLE_NAME; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; + +import org.apache.ambari.metrics.core.timeline.AbstractMiniHBaseClusterTest; +import org.apache.ambari.metrics.core.timeline.MetricTestHelper; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate; +import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; +import org.apache.ambari.metrics.core.timeline.query.Condition; +import org.apache.ambari.metrics.core.timeline.query.DefaultCondition; +import org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL; +import org.junit.Test; + +import junit.framework.Assert; + +public class ITClusterAggregator extends AbstractMiniHBaseClusterTest { + private final TimelineMetricReadHelper metricReader = new TimelineMetricReadHelper(metadataManager, false); + + private Configuration getConfigurationForTest(boolean useGroupByAggregators) { + Configuration configuration = new Configuration(); + configuration.set("timeline.metrics.service.use.groupBy.aggregators", String.valueOf(useGroupByAggregators)); + return configuration; + } + + @Test + public void testShouldAggregateClusterProperly() throws Exception { + // GIVEN + TimelineMetricAggregator agg = + TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hdb, + getConfigurationForTest(false), metadataManager, null, null); + + long startTime = System.currentTimeMillis(); + long ctime = startTime; + long minute = 60 * 1000; + hdb.insertMetricRecords(MetricTestHelper.prepareSingleTimelineMetric(ctime, "local1", + "disk_free", 1)); + hdb.insertMetricRecords(MetricTestHelper.prepareSingleTimelineMetric(ctime, "local2", + "disk_free", 2)); + ctime += 2*minute; + hdb.insertMetricRecords(MetricTestHelper.prepareSingleTimelineMetric(ctime, "local1", + "disk_free", 2)); + hdb.insertMetricRecords(MetricTestHelper.prepareSingleTimelineMetric(ctime, "local2", + "disk_free", 1)); + + // WHEN + long endTime = ctime + minute + 1; + boolean success = agg.doWork(startTime, endTime); + + //THEN + Condition condition = new DefaultCondition(null, null, null, null, startTime, + endTime, null, null, true); + condition.setStatement(String.format(GET_CLUSTER_AGGREGATE_SQL, + METRICS_CLUSTER_AGGREGATE_TABLE_NAME)); + + PreparedStatement pstmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition); + ResultSet rs = pstmt.executeQuery(); + + int recordCount = 0; + while (rs.next()) { + TimelineClusterMetric currentMetric = metricReader.fromResultSet(rs); + MetricClusterAggregate currentHostAggregate = + metricReader.getMetricClusterAggregateFromResultSet(rs); + + if ("disk_free".equals(currentMetric.getMetricName())) { + assertEquals(2, currentHostAggregate.getNumberOfHosts()); + assertEquals(2.0, currentHostAggregate.getMax()); + assertEquals(1.0, currentHostAggregate.getMin()); + assertEquals(3.0, currentHostAggregate.getSum()); + recordCount++; + } else { + fail("Unexpected entry"); + } + } + } + + @Test + public void testShouldAggregateClusterIgnoringInstance() throws Exception { + // GIVEN + TimelineMetricAggregator agg = + TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hdb, + getConfigurationForTest(false), metadataManager, null, null); + TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(metadataManager, false); + + long startTime = System.currentTimeMillis(); + long ctime = startTime; + long minute = 60 * 1000 * 2; + + /** + * Here we have two nodes with two instances each: + * | local1 | local2 | + * instance i1 | 1 | 2 | + * instance i2 | 3 | 4 | + * + */ + // Four 1's at ctime - 100 + hdb.insertMetricRecords(MetricTestHelper.prepareSingleTimelineMetric(ctime - 100, "local1", + "i1", "disk_free", 1), true); + // Four 2's at ctime - 100: different host + hdb.insertMetricRecords(MetricTestHelper.prepareSingleTimelineMetric(ctime - 100, "local2", + "i1", "disk_free", 2), true); + // Avoid overwrite + hdb.insertMetricRecords(MetricTestHelper.prepareSingleTimelineMetric(ctime + 100, "local1", + "i2", "disk_free", 3), true); + hdb.insertMetricRecords(MetricTestHelper.prepareSingleTimelineMetric(ctime + 100, "local2", + "i2", "disk_free", 4), true); + + ctime += minute; + + // Four 1's at ctime + 2 min + hdb.insertMetricRecords(MetricTestHelper.prepareSingleTimelineMetric(ctime - 100, "local1", + "i1", "disk_free", 1), true); + // Four 1's at ctime + 2 min - different host + hdb.insertMetricRecords(MetricTestHelper.prepareSingleTimelineMetric(ctime - 100, "local2", + "i1", "disk_free", 3), true); + hdb.insertMetricRecords(MetricTestHelper.prepareSingleTimelineMetric(ctime + 100, "local1", + "i2", "disk_free", 2), true); + hdb.insertMetricRecords(MetricTestHelper.prepareSingleTimelineMetric(ctime + 100, "local2", + "i2", "disk_free", 4), true); + // WHEN + long endTime = ctime + minute; + boolean success = agg.doWork(startTime - 1000, endTime + 1000); + + //THEN + Condition condition = new DefaultCondition(null, null, null, null, startTime, + endTime, null, null, true); + condition.setStatement(String.format(GET_CLUSTER_AGGREGATE_SQL, + METRICS_CLUSTER_AGGREGATE_TABLE_NAME)); + + PreparedStatement pstmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition); + ResultSet rs = pstmt.executeQuery(); + + int recordCount = 0; + while (rs.next()) { + TimelineClusterMetric currentMetric = readHelper.fromResultSet(rs); + MetricClusterAggregate currentHostAggregate = + readHelper.getMetricClusterAggregateFromResultSet(rs); + + if ("disk_free".equals(currentMetric.getMetricName())) { + System.out.println("OUTPUT: " + currentMetric + " - " + currentHostAggregate); + assertEquals(2, currentHostAggregate.getNumberOfHosts()); + assertEquals(5.0, Math.floor(currentHostAggregate.getSum())); + recordCount++; + } else { + if (!currentMetric.getMetricName().equals("live_hosts")) { + fail("Unexpected entry"); + } + } + } + + Assert.assertEquals(6, recordCount); //Interpolation adds 1 record. + } + + @Test + public void testShouldAggregateDifferentMetricsOnClusterProperly() throws Exception { + // GIVEN + TimelineMetricAggregator agg = + TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hdb, + getConfigurationForTest(false), metadataManager, null, null); + TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(metadataManager, false); + + // here we put some metrics tha will be aggregated + long startTime = System.currentTimeMillis(); + long ctime = startTime; + long minute = 60 * 1000; + hdb.insertMetricRecords(MetricTestHelper.prepareSingleTimelineMetric(ctime, "local1", + "disk_free", 1), true); + hdb.insertMetricRecords(MetricTestHelper.prepareSingleTimelineMetric(ctime, "local2", + "disk_free", 2), true); + hdb.insertMetricRecords(MetricTestHelper.prepareSingleTimelineMetric(ctime, "local1", + "disk_used", 1), true); + + ctime += 2*minute; + hdb.insertMetricRecords(MetricTestHelper.prepareSingleTimelineMetric(ctime, "local1", + "disk_free", 2), true); + hdb.insertMetricRecords(MetricTestHelper.prepareSingleTimelineMetric(ctime, "local2", + "disk_free", 1), true); + hdb.insertMetricRecords(MetricTestHelper.prepareSingleTimelineMetric(ctime, "local1", + "disk_used", 1), true); + + // WHEN + long endTime = ctime + minute; + boolean success = agg.doWork(startTime, endTime); + + //THEN + Condition condition = new DefaultCondition(null, null, null, null, startTime, + endTime, null, null, true); + condition.setStatement(String.format(GET_CLUSTER_AGGREGATE_SQL, + METRICS_CLUSTER_AGGREGATE_TABLE_NAME)); + + PreparedStatement pstmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition); + ResultSet rs = pstmt.executeQuery(); + + int recordCount = 0; + while (rs.next()) { + TimelineClusterMetric currentMetric = readHelper.fromResultSet(rs); + MetricClusterAggregate currentHostAggregate = + readHelper.getMetricClusterAggregateFromResultSet(rs); + + if ("disk_free".equals(currentMetric.getMetricName())) { + assertEquals(2, currentHostAggregate.getNumberOfHosts()); + assertEquals(2.0, currentHostAggregate.getMax()); + assertEquals(1.0, currentHostAggregate.getMin()); + assertEquals(3.0, currentHostAggregate.getSum()); + recordCount++; + } else if ("disk_used".equals(currentMetric.getMetricName())) { + assertEquals(1, currentHostAggregate.getNumberOfHosts()); + assertEquals(1.0, currentHostAggregate.getMax()); + assertEquals(1.0, currentHostAggregate.getMin()); + assertEquals(1.0, currentHostAggregate.getSum()); + recordCount++; + } else { + if (!currentMetric.getMetricName().equals("live_hosts")) { + fail("Unexpected entry"); + } + } + } + } + + @Test + public void testAggregateDailyClusterMetrics() throws Exception { + // GIVEN + TimelineMetricAggregator agg = + TimelineMetricAggregatorFactory.createTimelineClusterAggregatorDaily(hdb, getConfigurationForTest( + false), + metadataManager, + null); + + // this time can be virtualized! or made independent from real clock + long startTime = System.currentTimeMillis(); + long ctime = startTime; + long hour = 3600 * 1000; + + Map<TimelineClusterMetric, MetricHostAggregate> records = + new HashMap<TimelineClusterMetric, MetricHostAggregate>(); + + records.put(MetricTestHelper.createEmptyTimelineClusterMetric(ctime), + MetricTestHelper.createMetricHostAggregate(4.0, 0.0, 2, 4.0)); + records.put(MetricTestHelper.createEmptyTimelineClusterMetric(ctime += hour), + MetricTestHelper.createMetricHostAggregate(4.0, 0.0, 2, 4.0)); + records.put(MetricTestHelper.createEmptyTimelineClusterMetric(ctime += hour), + MetricTestHelper.createMetricHostAggregate(4.0, 0.0, 2, 4.0)); + records.put(MetricTestHelper.createEmptyTimelineClusterMetric(ctime += hour), + MetricTestHelper.createMetricHostAggregate(4.0, 0.0, 2, 4.0)); + + + hdb.saveClusterAggregateRecordsSecond(records, METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME); + + // WHEN + agg.doWork(startTime, ctime + hour + 1000); + + // THEN + ResultSet rs = executeQuery("SELECT * FROM METRIC_AGGREGATE_DAILY"); + int count = 0; + while (rs.next()) { + TimelineMetric metric = metadataManager.getMetricFromUuid(rs.getBytes("UUID")); + assertEquals("METRIC_NAME", "disk_used", metric.getMetricName()); + assertEquals("APP_ID", "test_app", metric.getAppId()); + assertEquals("METRIC_SUM", 16.0, rs.getDouble("METRIC_SUM")); + assertEquals("METRIC_COUNT", 8, rs.getLong("METRIC_COUNT")); + assertEquals("METRIC_MAX", 4.0, rs.getDouble("METRIC_MAX")); + assertEquals("METRIC_MIN", 0.0, rs.getDouble("METRIC_MIN")); + count++; + } + + assertEquals("Day aggregated row expected ", 1, count); + } + + @Test + public void testShouldAggregateClusterOnMinuteProperly() throws Exception { + + TimelineMetricAggregator agg = + TimelineMetricAggregatorFactory.createTimelineClusterAggregatorMinute( + hdb, + getConfigurationForTest(false), + metadataManager, + null); + + long startTime = System.currentTimeMillis(); + long ctime = startTime; + long second = 1000; + long minute = 60*second; + + Map<TimelineClusterMetric, MetricClusterAggregate> records = + new HashMap<TimelineClusterMetric, MetricClusterAggregate>(); + + records.put(MetricTestHelper.createEmptyTimelineClusterMetric(ctime), + new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0)); + records.put(MetricTestHelper.createEmptyTimelineClusterMetric(ctime += second), + new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0)); + records.put(MetricTestHelper.createEmptyTimelineClusterMetric(ctime += second), + new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0)); + records.put(MetricTestHelper.createEmptyTimelineClusterMetric(ctime += second), + new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0)); + + hdb.saveClusterAggregateRecords(records); + agg.doWork(startTime, ctime + second); + long oldCtime = ctime + second; + + //Next minute + ctime = startTime + minute; + + records.put(MetricTestHelper.createEmptyTimelineClusterMetric(ctime), + new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0)); + records.put(MetricTestHelper.createEmptyTimelineClusterMetric(ctime += second), + new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0)); + records.put(MetricTestHelper.createEmptyTimelineClusterMetric(ctime += second), + new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0)); + records.put(MetricTestHelper.createEmptyTimelineClusterMetric(ctime += second), + new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0)); + + hdb.saveClusterAggregateRecords(records); + agg.doWork(oldCtime, ctime + second); + + ResultSet rs = executeQuery("SELECT * FROM METRIC_AGGREGATE_MINUTE"); + int count = 0; + long diff = 0 ; + while (rs.next()) { + TimelineMetric metric = metadataManager.getMetricFromUuid(rs.getBytes("UUID")); + assertEquals("METRIC_NAME", "disk_used", metric.getMetricName()); + assertEquals("APP_ID", "test_app", metric.getAppId()); + assertEquals("METRIC_SUM", 16.0, rs.getDouble("METRIC_SUM")); + assertEquals("METRIC_COUNT", 8, rs.getLong("METRIC_COUNT")); + assertEquals("METRIC_MAX", 4.0, rs.getDouble("METRIC_MAX")); + assertEquals("METRIC_MIN", 0.0, rs.getDouble("METRIC_MIN")); + if (count == 0) { + diff+=rs.getLong("SERVER_TIME"); + } else { + diff-=rs.getLong("SERVER_TIME"); + if (diff < 0) { + diff*=-1; + } + assertTrue(diff == minute); + } + count++; + } + + assertEquals("One hourly aggregated row expected ", 2, count); + } + + @Test + public void testShouldAggregateClusterOnHourProperly() throws Exception { + // GIVEN + TimelineMetricAggregator agg = + TimelineMetricAggregatorFactory.createTimelineClusterAggregatorHourly( + hdb, + getConfigurationForTest(false), + metadataManager, + null); + + // this time can be virtualized! or made independent from real clock + long startTime = System.currentTimeMillis(); + long ctime = startTime; + long minute = 60 * 1000; + + Map<TimelineClusterMetric, MetricClusterAggregate> records = + new HashMap<TimelineClusterMetric, MetricClusterAggregate>(); + + records.put(MetricTestHelper.createEmptyTimelineClusterMetric(ctime), + new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0)); + records.put(MetricTestHelper.createEmptyTimelineClusterMetric(ctime += minute), + new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0)); + records.put(MetricTestHelper.createEmptyTimelineClusterMetric(ctime += minute), + new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0)); + records.put(MetricTestHelper.createEmptyTimelineClusterMetric(ctime += minute), + new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0)); + + hdb.saveClusterAggregateRecords(records); + + // WHEN + agg.doWork(startTime, ctime + minute); + + // THEN + ResultSet rs = executeQuery("SELECT * FROM METRIC_AGGREGATE_HOURLY"); + int count = 0; + while (rs.next()) { + TimelineMetric metric = metadataManager.getMetricFromUuid(rs.getBytes("UUID")); + assertEquals("METRIC_NAME", "disk_used", metric.getMetricName()); + assertEquals("APP_ID", "test_app", metric.getAppId()); + assertEquals("METRIC_SUM", 16.0, rs.getDouble("METRIC_SUM")); + assertEquals("METRIC_COUNT", 8, rs.getLong("METRIC_COUNT")); + assertEquals("METRIC_MAX", 4.0, rs.getDouble("METRIC_MAX")); + assertEquals("METRIC_MIN", 0.0, rs.getDouble("METRIC_MIN")); + count++; + } + + assertEquals("One hourly aggregated row expected ", 1, count); + } + + @Test + public void testShouldAggregateDifferentMetricsOnHourProperly() throws Exception { + // GIVEN + TimelineMetricAggregator agg = + TimelineMetricAggregatorFactory.createTimelineClusterAggregatorHourly( + hdb, + getConfigurationForTest(false), + metadataManager, + null); + + long startTime = System.currentTimeMillis(); + long ctime = startTime; + long minute = 60 * 1000; + + Map<TimelineClusterMetric, MetricClusterAggregate> records = + new HashMap<TimelineClusterMetric, MetricClusterAggregate>(); + + records.put(MetricTestHelper.createEmptyTimelineClusterMetric("disk_used", ctime), + new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0)); + records.put(MetricTestHelper.createEmptyTimelineClusterMetric("disk_free", ctime), + new MetricClusterAggregate(1.0, 2, 0.0, 1.0, 1.0)); + + records.put(MetricTestHelper.createEmptyTimelineClusterMetric("disk_used", ctime += minute), + new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0)); + records.put(MetricTestHelper.createEmptyTimelineClusterMetric("disk_free", ctime), + new MetricClusterAggregate(1.0, 2, 0.0, 1.0, 1.0)); + + records.put(MetricTestHelper.createEmptyTimelineClusterMetric("disk_used", ctime += minute), + new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0)); + records.put(MetricTestHelper.createEmptyTimelineClusterMetric("disk_free", ctime), + new MetricClusterAggregate(1.0, 2, 0.0, 1.0, 1.0)); + + records.put(MetricTestHelper.createEmptyTimelineClusterMetric("disk_used", ctime += minute), + new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0)); + records.put(MetricTestHelper.createEmptyTimelineClusterMetric("disk_free", ctime), + new MetricClusterAggregate(1.0, 2, 0.0, 1.0, 1.0)); + + hdb.saveClusterAggregateRecords(records); + + // WHEN + agg.doWork(startTime, ctime + minute); + + // THEN + ResultSet rs = executeQuery("SELECT * FROM METRIC_AGGREGATE_HOURLY"); + int count = 0; + while (rs.next()) { + TimelineMetric metric = metadataManager.getMetricFromUuid(rs.getBytes("UUID")); + if ("disk_used".equals(metric.getMetricName())) { + assertEquals("APP_ID", "test_app", metric.getAppId()); + assertEquals("METRIC_SUM", 4.0, rs.getDouble("METRIC_SUM")); + assertEquals("METRIC_COUNT", 2, rs.getLong("METRIC_COUNT")); + assertEquals("METRIC_MAX", 4.0, rs.getDouble("METRIC_MAX")); + assertEquals("METRIC_MIN", 0.0, rs.getDouble("METRIC_MIN")); + } else if ("disk_free".equals(metric.getMetricName())) { + assertEquals("APP_ID", "test_app", metric.getAppId()); + assertEquals("METRIC_SUM", 1.0, rs.getDouble("METRIC_SUM")); + assertEquals("METRIC_COUNT", 2, rs.getLong("METRIC_COUNT")); + assertEquals("METRIC_MAX", 1.0, rs.getDouble("METRIC_MAX")); + assertEquals("METRIC_MIN", 1.0, rs.getDouble("METRIC_MIN")); + } + + count++; + } + + assertEquals("Two hourly aggregated row expected ", 2, count); + } + + @Test + public void testAppLevelHostMetricAggregates() throws Exception { + Configuration conf = getConfigurationForTest(false); + conf.set(CLUSTER_AGGREGATOR_APP_IDS, "app1"); + TimelineMetricAggregator agg = + TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond( + hdb, + conf, + metadataManager, + null, + null); + TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(metadataManager, false); + + long startTime = System.currentTimeMillis(); + long ctime = startTime; + long minute = 60 * 1000; + hdb.insertMetricRecords(MetricTestHelper.prepareSingleTimelineMetric((ctime), "local1", + "app1", null, "app_metric_random", 1), true); + ctime += 10; + hdb.insertMetricRecords(MetricTestHelper.prepareSingleTimelineMetric(ctime, "local1", + "cpu_user", 1), true); + ctime += 10; + hdb.insertMetricRecords(MetricTestHelper.prepareSingleTimelineMetric(ctime, "local2", + "cpu_user", 2), true); + + // WHEN + long endTime = ctime + minute; + boolean success = agg.doWork(startTime, endTime); + + //THEN + List<byte[]> uuids = metadataManager.getUuids(new ArrayList<String>() {{ add("cpu_user"); }}, + null, + "app1", null); + + Condition condition = new DefaultCondition(uuids, + new ArrayList<String>() {{ add("cpu_user"); }}, null, "app1", null, + startTime - 90000, endTime, null, null, true); + condition.setStatement(String.format(GET_CLUSTER_AGGREGATE_SQL, + METRICS_CLUSTER_AGGREGATE_TABLE_NAME)); + + PreparedStatement pstmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition); + ResultSet rs = pstmt.executeQuery(); + + int recordCount = 0; + TimelineClusterMetric currentMetric = null; + MetricClusterAggregate currentHostAggregate = null; + while (rs.next()) { + currentMetric = readHelper.fromResultSet(rs); + currentHostAggregate = readHelper.getMetricClusterAggregateFromResultSet(rs); + recordCount++; + } + assertEquals(3, recordCount); + assertNotNull(currentMetric); + assertEquals("cpu_user", currentMetric.getMetricName()); + assertEquals("app1", currentMetric.getAppId()); + assertNotNull(currentHostAggregate); + assertEquals(1, currentHostAggregate.getNumberOfHosts()); + assertEquals(1.0d, currentHostAggregate.getSum()); + } + + @Test + public void testClusterAggregateMetricNormalization() throws Exception { + TimelineMetricAggregator agg = + TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond( + hdb, + getConfigurationForTest(false), + metadataManager, + null, + null); + TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(metadataManager, false); + + long currentTime = System.currentTimeMillis(); + // Sample data + TimelineMetric metric1 = new TimelineMetric(); + metric1.setMetricName("yarn.ClusterMetrics.NumActiveNMs"); + metric1.setAppId("resourcemanager"); + metric1.setHostName("h1"); + metric1.setStartTime(currentTime); + metric1.setMetricValues(new TreeMap<Long, Double>() {{ + put(currentTime + 10000, 1.0); + put(currentTime + 20000, 1.0); + put(currentTime + 30000, 1.0); + put(currentTime + 40000, 1.0); + put(currentTime + 50000, 1.0); + put(currentTime + 60000, 1.0); + put(currentTime + 70000, 1.0); + }}); + + TimelineMetric metric2 = new TimelineMetric(); + metric2.setMetricName("yarn.ClusterMetrics.NumActiveNMs"); + metric2.setAppId("resourcemanager"); + metric2.setHostName("h1"); + metric2.setStartTime(currentTime + 70000); + metric2.setMetricValues(new TreeMap<Long, Double>() {{ + put(currentTime + 70000, 1.0); + put(currentTime + 80000, 1.0); + put(currentTime + 90000, 1.0); + put(currentTime + 100000, 1.0); + put(currentTime + 110000, 1.0); + put(currentTime + 120000, 1.0); + put(currentTime + 130000, 1.0); + }}); + + TimelineMetrics metrics = new TimelineMetrics(); + metrics.setMetrics(Collections.singletonList(metric1)); + insertMetricRecords(conn, metrics); + + metrics.setMetrics(Collections.singletonList(metric2)); + insertMetricRecords(conn, metrics); + + long startTime = currentTime - 3*60*1000; + long endTime = currentTime + 3*60*1000; + + agg.doWork(startTime, endTime); + + Condition condition = new DefaultCondition(null, null, null, null, startTime, + endTime, null, null, true); + condition.setStatement(String.format(GET_CLUSTER_AGGREGATE_SQL, + METRICS_CLUSTER_AGGREGATE_TABLE_NAME)); + + PreparedStatement pstmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition); + ResultSet rs = pstmt.executeQuery(); + + int recordCount = 0; + while (rs.next()) { + TimelineClusterMetric currentMetric = readHelper.fromResultSet(rs); + MetricClusterAggregate currentHostAggregate = readHelper.getMetricClusterAggregateFromResultSet(rs); + + if ("yarn.ClusterMetrics.NumActiveNMs".equals(currentMetric.getMetricName())) { + assertEquals(1, currentHostAggregate.getNumberOfHosts()); + assertEquals(1.0, currentHostAggregate.getMax()); + assertEquals(1.0, currentHostAggregate.getMin()); + assertEquals(1.0, currentHostAggregate.getSum()); + recordCount++; + } else { + if (!currentMetric.getMetricName().equals("live_hosts")) { + fail("Unexpected entry"); + } + } + } + Assert.assertEquals(10, recordCount); //With interpolation. + } + + @Test + public void testAggregationUsingGroupByQuery() throws Exception { + // GIVEN + TimelineMetricAggregator agg = + TimelineMetricAggregatorFactory.createTimelineClusterAggregatorHourly( + hdb, + getConfigurationForTest(true), + metadataManager, + null); + + long startTime = System.currentTimeMillis(); + long ctime = startTime; + long minute = 60 * 1000; + + Map<TimelineClusterMetric, MetricClusterAggregate> records = + new HashMap<TimelineClusterMetric, MetricClusterAggregate>(); + + records.put(MetricTestHelper.createEmptyTimelineClusterMetric("disk_used", ctime += minute), + new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0)); + records.put(MetricTestHelper.createEmptyTimelineClusterMetric("disk_free", ctime), + new MetricClusterAggregate(1.0, 2, 0.0, 1.0, 1.0)); + + records.put(MetricTestHelper.createEmptyTimelineClusterMetric("disk_used", ctime += minute), + new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0)); + records.put(MetricTestHelper.createEmptyTimelineClusterMetric("disk_free", ctime), + new MetricClusterAggregate(1.0, 2, 0.0, 1.0, 1.0)); + + records.put(MetricTestHelper.createEmptyTimelineClusterMetric("disk_used", ctime += minute), + new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0)); + records.put(MetricTestHelper.createEmptyTimelineClusterMetric("disk_free", ctime), + new MetricClusterAggregate(1.0, 2, 0.0, 1.0, 1.0)); + + records.put(MetricTestHelper.createEmptyTimelineClusterMetric("disk_used", ctime += minute), + new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0)); + records.put(MetricTestHelper.createEmptyTimelineClusterMetric("disk_free", ctime), + new MetricClusterAggregate(1.0, 2, 0.0, 1.0, 1.0)); + + hdb.saveClusterAggregateRecords(records); + + // WHEN + agg.doWork(startTime, ctime + minute); + + // THEN + ResultSet rs = executeQuery("SELECT * FROM METRIC_AGGREGATE_HOURLY"); + int count = 0; + while (rs.next()) { + TimelineMetric metric = metadataManager.getMetricFromUuid(rs.getBytes("UUID")); + if ("disk_used".equals(metric.getMetricName())) { + assertEquals("APP_ID", "test_app", metric.getAppId()); + assertEquals("METRIC_SUM", 4.0, rs.getDouble("METRIC_SUM")); + assertEquals("METRIC_COUNT", 2, rs.getLong("METRIC_COUNT")); + assertEquals("METRIC_MAX", 4.0, rs.getDouble("METRIC_MAX")); + assertEquals("METRIC_MIN", 0.0, rs.getDouble("METRIC_MIN")); + } else if ("disk_free".equals(metric.getMetricName())) { + assertEquals("APP_ID", "test_app", metric.getAppId()); + assertEquals("METRIC_SUM", 1.0, rs.getDouble("METRIC_SUM")); + assertEquals("METRIC_COUNT", 2, rs.getLong("METRIC_COUNT")); + assertEquals("METRIC_MAX", 1.0, rs.getDouble("METRIC_MAX")); + assertEquals("METRIC_MIN", 1.0, rs.getDouble("METRIC_MIN")); + } + count++; + } + assertEquals("Two hourly aggregated row expected ", 2, count); + } + + private ResultSet executeQuery(String query) throws SQLException { + Connection conn = getConnection(getUrl()); + Statement stmt = conn.createStatement(); + return stmt.executeQuery(query); + } +}
