http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStoreWatcherTest.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStoreWatcherTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStoreWatcherTest.java deleted file mode 100644 index dd0378d..0000000 --- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStoreWatcherTest.java +++ /dev/null @@ -1,109 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline; - -import org.apache.hadoop.metrics2.sink.timeline.Precision; -import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; -import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; -import org.apache.hadoop.metrics2.sink.timeline.TopNConfig; -import org.apache.hadoop.util.ExitUtil; -import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; -import org.easymock.EasyMock; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.powermock.core.classloader.annotations.PrepareForTest; -import org.powermock.modules.junit4.PowerMockRunner; - -import java.util.List; - -import static org.easymock.EasyMock.anyObject; -import static org.easymock.EasyMock.anyString; -import static org.easymock.EasyMock.createNiceMock; -import static org.easymock.EasyMock.eq; -import static org.easymock.EasyMock.expect; -import static org.easymock.EasyMock.expectLastCall; -import static org.easymock.EasyMock.replay; -import static org.easymock.EasyMock.verify; -import static org.powermock.api.easymock.PowerMock.mockStatic; -import static org.powermock.api.easymock.PowerMock.replayAll; -import static org.powermock.api.easymock.PowerMock.verifyAll; - -@RunWith(PowerMockRunner.class) -@PrepareForTest(ExitUtil.class) -public class TimelineMetricStoreWatcherTest { - - @Test - public void testRunPositive() throws Exception { - TimelineMetricStore metricStore = createNiceMock(TimelineMetricStore.class); - - expect(metricStore.putMetrics(anyObject(TimelineMetrics.class))) - .andReturn(new TimelinePutResponse()); - - // metric found - expect(metricStore.getTimelineMetrics(EasyMock.<List<String>>anyObject(), - EasyMock.<List<String>>anyObject(), anyObject(String.class), - anyObject(String.class), anyObject(Long.class), anyObject(Long.class), - eq(Precision.SECONDS), eq(1), eq(true), anyObject(TopNConfig.class), anyString())) - .andReturn(null).anyTimes(); - - mockStatic(ExitUtil.class); - - replay(metricStore); - - TimelineMetricStoreWatcher timelineMetricStoreWatcher = - new TimelineMetricStoreWatcher(metricStore, TimelineMetricConfiguration.getInstance()); - timelineMetricStoreWatcher.run(); - timelineMetricStoreWatcher.run(); - timelineMetricStoreWatcher.run(); - verify(metricStore); - - } - - @Test - public void testRunNegative() throws Exception { - TimelineMetricStore metricStore = createNiceMock(TimelineMetricStore.class); - - expect(metricStore.putMetrics(anyObject(TimelineMetrics.class))) - .andReturn(new TimelinePutResponse()); - - // no metrics found - expect(metricStore.getTimelineMetrics(EasyMock.<List<String>>anyObject(), - EasyMock.<List<String>>anyObject(), anyObject(String.class), - anyObject(String.class), anyObject(Long.class), anyObject(Long.class), - eq(Precision.SECONDS), eq(1), eq(true), anyObject(TopNConfig.class), anyString())) - .andReturn(null).anyTimes(); - - String msg = "Error getting metrics from TimelineMetricStore. " + - "Shutting down by TimelineMetricStoreWatcher."; - mockStatic(ExitUtil.class); - ExitUtil.terminate(-1, msg); - expectLastCall().anyTimes(); - - replayAll(); - - TimelineMetricStoreWatcher timelineMetricStoreWatcher = - new TimelineMetricStoreWatcher(metricStore, TimelineMetricConfiguration.getInstance()); - timelineMetricStoreWatcher.run(); - timelineMetricStoreWatcher.run(); - timelineMetricStoreWatcher.run(); - - verifyAll(); - - } - -}
http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsAggregatorMemorySink.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsAggregatorMemorySink.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsAggregatorMemorySink.java deleted file mode 100644 index 51cde4a..0000000 --- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsAggregatorMemorySink.java +++ /dev/null @@ -1,141 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline; - -import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate; -import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate; -import org.apache.hadoop.metrics2.sink.timeline.Precision; -import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineClusterMetric; - -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; -import java.util.Map.Entry; - -/** - * Aggregator Memory sink implementation to perform test - */ -public class TimelineMetricsAggregatorMemorySink - implements TimelineMetricsAggregatorSink { - - private static Map<Precision, Map<TimelineMetric, MetricHostAggregate>> hostAggregateRecords = - new HashMap<>(); - private static Map<Precision, Map<TimelineClusterMetric, MetricHostAggregate>> clusterTimeAggregateRecords = - new HashMap<>(); - private static Map<TimelineClusterMetric, MetricClusterAggregate> clusterAggregateRecords = - new HashMap<>(); - - @Override - public void saveHostAggregateRecords( - Map<TimelineMetric, MetricHostAggregate> hostAggregateMap, - Precision precision) { - if (hostAggregateMap == null || hostAggregateMap.size() == 0) { - return; - } - - Map<TimelineMetric, MetricHostAggregate> aggregatedValue = null; - if (hostAggregateRecords.containsKey(precision)) { - aggregatedValue = hostAggregateRecords.get(precision); - } else { - aggregatedValue = new HashMap<>(); - hostAggregateRecords.put(precision, aggregatedValue); - } - - for (Entry<TimelineMetric, MetricHostAggregate> entry : hostAggregateMap - .entrySet()) { - TimelineMetric timelineMetricClone = new TimelineMetric(entry.getKey()); - MetricHostAggregate hostAggregate = entry.getValue(); - MetricHostAggregate hostAggregateClone = new MetricHostAggregate( - hostAggregate.getSum(), (int) hostAggregate.getNumberOfSamples(), - hostAggregate.getDeviation(), hostAggregate.getMax(), - hostAggregate.getMin()); - aggregatedValue.put(timelineMetricClone, hostAggregateClone); - } - } - - @Override - public void saveClusterTimeAggregateRecords( - Map<TimelineClusterMetric, MetricHostAggregate> clusterTimeAggregateMap, - Precision precision) { - if (clusterTimeAggregateMap == null - || clusterTimeAggregateMap.size() == 0) { - return; - } - - Map<TimelineClusterMetric, MetricHostAggregate> aggregatedValue = null; - if (clusterTimeAggregateRecords.containsKey(precision)) { - aggregatedValue = clusterTimeAggregateRecords.get(precision); - } else { - aggregatedValue = new HashMap<>(); - clusterTimeAggregateRecords.put(precision, aggregatedValue); - } - - for (Entry<TimelineClusterMetric, MetricHostAggregate> entry : clusterTimeAggregateMap - .entrySet()) { - TimelineClusterMetric clusterMetric = entry.getKey(); - TimelineClusterMetric clusterMetricClone = - new TimelineClusterMetric(clusterMetric.getMetricName(), - clusterMetric.getAppId(), clusterMetric.getInstanceId(), - clusterMetric.getTimestamp()); - MetricHostAggregate hostAggregate = entry.getValue(); - MetricHostAggregate hostAggregateClone = new MetricHostAggregate( - hostAggregate.getSum(), (int) hostAggregate.getNumberOfSamples(), - hostAggregate.getDeviation(), hostAggregate.getMax(), - hostAggregate.getMin()); - aggregatedValue.put(clusterMetricClone, hostAggregateClone); - } - } - - @Override - public void saveClusterAggregateRecords( - Map<TimelineClusterMetric, MetricClusterAggregate> clusterAggregateMaps) { - - if (clusterAggregateMaps == null || clusterAggregateMaps.size() == 0) { - return; - } - - for (Entry<TimelineClusterMetric, MetricClusterAggregate> entry : clusterAggregateMaps - .entrySet()) { - TimelineClusterMetric clusterMetric = entry.getKey(); - TimelineClusterMetric clusterMetricClone = - new TimelineClusterMetric(clusterMetric.getMetricName(), - clusterMetric.getAppId(), clusterMetric.getInstanceId(), - clusterMetric.getTimestamp()); - MetricClusterAggregate clusterAggregate = entry.getValue(); - MetricClusterAggregate clusterAggregateClone = new MetricClusterAggregate( - clusterAggregate.getSum(), (int) clusterAggregate.getNumberOfHosts(), - clusterAggregate.getDeviation(), clusterAggregate.getMax(), - clusterAggregate.getMin()); - clusterAggregateRecords.put(clusterMetricClone, clusterAggregateClone); - } - } - - public Map<Precision, Map<TimelineMetric, MetricHostAggregate>> getHostAggregateRecords() { - return Collections.unmodifiableMap(hostAggregateRecords); - } - - public Map<Precision, Map<TimelineClusterMetric, MetricHostAggregate>> getClusterTimeAggregateRecords() { - return Collections.unmodifiableMap(clusterTimeAggregateRecords); - } - - public Map<TimelineClusterMetric, MetricClusterAggregate> getClusterAggregateRecords() { - return Collections.unmodifiableMap(clusterAggregateRecords); - } - -} http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsFilterTest.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsFilterTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsFilterTest.java deleted file mode 100644 index a308248..0000000 --- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsFilterTest.java +++ /dev/null @@ -1,223 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline; - -import static org.easymock.EasyMock.expect; -import static org.easymock.EasyMock.replay; - -import junit.framework.Assert; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; -import org.easymock.EasyMock; -import org.junit.Test; - -import java.net.URL; -import java.util.HashSet; -import java.util.Set; - -public class TimelineMetricsFilterTest { - - @Test - public void testAppBlacklisting() throws Exception{ - - Configuration metricsConf = new Configuration(); - metricsConf.set("timeline.metrics.apps.blacklist", "hbase,datanode,nimbus"); - TimelineMetricConfiguration configuration = EasyMock.createNiceMock(TimelineMetricConfiguration.class); - expect(configuration.getMetricsConf()).andReturn(metricsConf).once(); - replay(configuration); - - TimelineMetricsFilter.initializeMetricFilter(configuration); - - TimelineMetric timelineMetric = new TimelineMetric(); - - timelineMetric.setAppId("hbase"); - Assert.assertFalse(TimelineMetricsFilter.acceptMetric(timelineMetric)); - - timelineMetric.setAppId("namenode"); - Assert.assertTrue(TimelineMetricsFilter.acceptMetric(timelineMetric)); - - timelineMetric.setAppId("nimbus"); - Assert.assertFalse(TimelineMetricsFilter.acceptMetric(timelineMetric)); - } - - @Test - public void testMetricWhitelisting() throws Exception { - - Configuration metricsConf = new Configuration(); - TimelineMetricConfiguration configuration = EasyMock.createNiceMock(TimelineMetricConfiguration.class); - expect(configuration.getMetricsConf()).andReturn(metricsConf).once(); - expect(configuration.isWhitelistingEnabled()).andReturn(true).anyTimes(); - replay(configuration); - - URL fileUrl = ClassLoader.getSystemResource("test_data/metric_whitelist.dat"); - - metricsConf.set("timeline.metrics.whitelist.file", fileUrl.getPath()); - TimelineMetricsFilter.initializeMetricFilter(configuration); - - TimelineMetric timelineMetric = new TimelineMetric(); - - timelineMetric.setMetricName("cpu_system"); - Assert.assertTrue(TimelineMetricsFilter.acceptMetric(timelineMetric)); - - timelineMetric.setMetricName("cpu_system1"); - Assert.assertFalse(TimelineMetricsFilter.acceptMetric(timelineMetric)); - - timelineMetric.setMetricName("jvm.JvmMetrics.MemHeapUsedM"); - Assert.assertTrue(TimelineMetricsFilter.acceptMetric(timelineMetric)); - - timelineMetric.setMetricName("dfs.FSNamesystem.TotalFiles"); - Assert.assertTrue(TimelineMetricsFilter.acceptMetric(timelineMetric)); - } - - @Test - public void testTogether() throws Exception { - - Configuration metricsConf = new Configuration(); - metricsConf.set("timeline.metrics.apps.blacklist", "hbase,datanode,nimbus"); - TimelineMetricConfiguration configuration = EasyMock.createNiceMock(TimelineMetricConfiguration.class); - expect(configuration.getMetricsConf()).andReturn(metricsConf).once(); - replay(configuration); - - URL fileUrl = ClassLoader.getSystemResource("test_data/metric_whitelist.dat"); - metricsConf.set("timeline.metrics.whitelist.file", fileUrl.getPath()); - - TimelineMetricsFilter.initializeMetricFilter(configuration); - - TimelineMetric timelineMetric = new TimelineMetric(); - - timelineMetric.setMetricName("cpu_system"); - timelineMetric.setAppId("hbase"); - Assert.assertFalse(TimelineMetricsFilter.acceptMetric(timelineMetric)); - - timelineMetric.setMetricName("cpu_system"); - timelineMetric.setAppId("HOST"); - Assert.assertTrue(TimelineMetricsFilter.acceptMetric(timelineMetric)); - - timelineMetric.setMetricName("jvm.JvmMetrics.MemHeapUsedM"); - Assert.assertTrue(TimelineMetricsFilter.acceptMetric(timelineMetric)); - - timelineMetric.setMetricName("dfs.FSNamesystem.TotalFiles"); - Assert.assertTrue(TimelineMetricsFilter.acceptMetric(timelineMetric)); - } - - @Test - public void testAmshbaseWhitelisting() throws Exception { - - TimelineMetricConfiguration configuration = EasyMock.createNiceMock(TimelineMetricConfiguration.class); - - Configuration metricsConf = new Configuration(); - expect(configuration.getMetricsConf()).andReturn(metricsConf).once(); - - Set<String> whitelist = new HashSet(); - whitelist.add("regionserver.Server.Delete_99th_percentile"); - whitelist.add("regionserver.Server.Delete_max"); - whitelist.add("regionserver.Server.Delete_mean"); - expect(configuration.getAmshbaseWhitelist()).andReturn(whitelist).once(); - - replay(configuration); - - TimelineMetricsFilter.initializeMetricFilter(configuration); - - TimelineMetric timelineMetric = new TimelineMetric(); - - timelineMetric.setMetricName("regionserver.Server.Delete_max"); - timelineMetric.setAppId("ams-hbase"); - Assert.assertTrue(TimelineMetricsFilter.acceptMetric(timelineMetric)); - - timelineMetric.setMetricName("regionserver.Server.Delete_min3333"); - timelineMetric.setAppId("ams-hbase"); - Assert.assertFalse(TimelineMetricsFilter.acceptMetric(timelineMetric)); - - timelineMetric.setMetricName("jvm.JvmMetrics.MemHeapUsedM"); - timelineMetric.setAppId("hbase"); - Assert.assertTrue(TimelineMetricsFilter.acceptMetric(timelineMetric)); - } - - @Test - public void testHybridFilter() throws Exception { - - // Whitelist Apps - namenode, nimbus - // Blacklist Apps - datanode, kafka_broker - // Accept ams-hbase whitelisting. - // Reject non whitelisted metrics from non whitelisted Apps (Say hbase) - - TimelineMetricConfiguration configuration = EasyMock.createNiceMock(TimelineMetricConfiguration.class); - - Configuration metricsConf = new Configuration(); - metricsConf.set("timeline.metrics.apps.whitelist", "namenode,nimbus"); - metricsConf.set("timeline.metrics.apps.blacklist", "datanode,kafka_broker"); - URL fileUrl = ClassLoader.getSystemResource("test_data/metric_whitelist.dat"); - metricsConf.set("timeline.metrics.whitelist.file", fileUrl.getPath()); - expect(configuration.getMetricsConf()).andReturn(metricsConf).once(); - - Set<String> whitelist = new HashSet(); - whitelist.add("regionserver.Server.Delete_99th_percentile"); - whitelist.add("regionserver.Server.Delete_max"); - whitelist.add("regionserver.Server.Delete_mean"); - expect(configuration.getAmshbaseWhitelist()).andReturn(whitelist).once(); - - expect(configuration.isWhitelistingEnabled()).andReturn(true).anyTimes(); - - replay(configuration); - - TimelineMetricsFilter.initializeMetricFilter(configuration); - - TimelineMetric timelineMetric = new TimelineMetric(); - - //Test App Whitelisting - timelineMetric.setMetricName("metric.a.b.c"); - timelineMetric.setAppId("namenode"); - Assert.assertTrue(TimelineMetricsFilter.acceptMetric(timelineMetric)); - - timelineMetric.setMetricName("metric.d.e.f"); - timelineMetric.setAppId("nimbus"); - Assert.assertTrue(TimelineMetricsFilter.acceptMetric(timelineMetric)); - - //Test App Blacklisting - timelineMetric.setMetricName("metric.d.e.f"); - timelineMetric.setAppId("datanode"); - Assert.assertFalse(TimelineMetricsFilter.acceptMetric(timelineMetric)); - - timelineMetric.setMetricName("metric.d.e.f"); - timelineMetric.setAppId("kafka_broker"); - Assert.assertFalse(TimelineMetricsFilter.acceptMetric(timelineMetric)); - - - //Test ams-hbase Whitelisting - timelineMetric.setMetricName("regionserver.Server.Delete_max"); - timelineMetric.setAppId("ams-hbase"); - Assert.assertTrue(TimelineMetricsFilter.acceptMetric(timelineMetric)); - - timelineMetric.setMetricName("regionserver.Server.Delete_min3333"); - timelineMetric.setAppId("ams-hbase"); - Assert.assertFalse(TimelineMetricsFilter.acceptMetric(timelineMetric)); - - timelineMetric.setMetricName("regionserver.Server.Delete_mean"); - timelineMetric.setAppId("ams-hbase"); - Assert.assertTrue(TimelineMetricsFilter.acceptMetric(timelineMetric)); - - //Test Metric Whitelisting - timelineMetric.setMetricName("regionserver.WAL.SyncTime_max"); - timelineMetric.setAppId("hbase"); - Assert.assertTrue(TimelineMetricsFilter.acceptMetric(timelineMetric)); - - timelineMetric.setMetricName("regionserver.WAL.metric.not.needed"); - timelineMetric.setAppId("hbase"); - Assert.assertFalse(TimelineMetricsFilter.acceptMetric(timelineMetric)); - } -} http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsIgniteCacheTest.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsIgniteCacheTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsIgniteCacheTest.java deleted file mode 100644 index 2cb66ba..0000000 --- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsIgniteCacheTest.java +++ /dev/null @@ -1,240 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline; - - -import junit.framework.Assert; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate; -import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineClusterMetric; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager; -import org.junit.BeforeClass; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.powermock.core.classloader.annotations.PowerMockIgnore; -import org.powermock.core.classloader.annotations.PrepareForTest; -import org.powermock.modules.junit4.PowerMockRunner; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.TreeMap; - -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_APP_IDS; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_APP_ID; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_COLLECTOR_IGNITE_NODES; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.AggregatorUtils.getRoundedCheckPointTimeMillis; -import static org.easymock.EasyMock.anyObject; -import static org.easymock.EasyMock.createNiceMock; -import static org.easymock.EasyMock.expect; -import static org.easymock.EasyMock.replay; -import static org.powermock.api.easymock.PowerMock.mockStatic; -import static org.powermock.api.easymock.PowerMock.replayAll; - -@RunWith(PowerMockRunner.class) -@PrepareForTest(TimelineMetricConfiguration.class) - -@PowerMockIgnore("javax.management.*") -public class TimelineMetricsIgniteCacheTest { - private static TimelineMetricsIgniteCache timelineMetricsIgniteCache; - @BeforeClass - public static void setupConf() throws Exception { - TimelineMetricConfiguration conf = new TimelineMetricConfiguration(new - Configuration(), new Configuration()); - mockStatic(TimelineMetricConfiguration.class); - expect(TimelineMetricConfiguration.getInstance()).andReturn(conf).anyTimes(); - conf.getMetricsConf().set(CLUSTER_AGGREGATOR_APP_IDS, "appIdForHostsAggr"); - conf.getMetricsConf().set(TIMELINE_METRICS_COLLECTOR_IGNITE_NODES, "localhost"); - replayAll(); - - timelineMetricsIgniteCache = new TimelineMetricsIgniteCache(); - } - - @Test - public void putEvictMetricsFromCacheSlicesMerging() throws Exception { - long cacheSliceIntervalMillis = 30000L; - - TimelineMetricMetadataManager metricMetadataManagerMock = createNiceMock(TimelineMetricMetadataManager.class); - expect(metricMetadataManagerMock.getUuid(anyObject(TimelineClusterMetric.class))).andReturn(new byte[16]).once(); - replay(metricMetadataManagerMock); - - long startTime = getRoundedCheckPointTimeMillis(System.currentTimeMillis(), cacheSliceIntervalMillis); - - long seconds = 1000; - TreeMap<Long, Double> metricValues = new TreeMap<Long, Double>(); - /* - - 0 +30s +60s - | | | - (1)(2)(3) (4)(5)(6) h1 - - */ - // Case 1 : data points are distributed equally, no values are lost, single host. - metricValues.put(startTime + 4*seconds, 1.0); - metricValues.put(startTime + 14*seconds, 2.0); - metricValues.put(startTime + 24*seconds, 3.0); - metricValues.put(startTime + 34*seconds, 4.0); - metricValues.put(startTime + 44*seconds, 5.0); - metricValues.put(startTime + 54*seconds, 6.0); - - TimelineMetric timelineMetric = new TimelineMetric("metric1", "host1", "app1", "instance1"); - timelineMetric.setStartTime(metricValues.firstKey()); - timelineMetric.addMetricValues(metricValues); - - Collection<TimelineMetric> timelineMetrics = new ArrayList<>(); - timelineMetrics.add(timelineMetric); - timelineMetricsIgniteCache.putMetrics(timelineMetrics, metricMetadataManagerMock); - Map<TimelineClusterMetric, MetricClusterAggregate> aggregateMap = timelineMetricsIgniteCache.evictMetricAggregates(startTime, startTime + 120*seconds); - - Assert.assertEquals(aggregateMap.size(), 2); - TimelineClusterMetric timelineClusterMetric = new TimelineClusterMetric(timelineMetric.getMetricName(), - timelineMetric.getAppId(), timelineMetric.getInstanceId(), startTime + 30*seconds); - - Assert.assertTrue(aggregateMap.containsKey(timelineClusterMetric)); - Assert.assertEquals(2.0, aggregateMap.get(timelineClusterMetric).getSum()); - - timelineClusterMetric.setTimestamp(startTime + 2*30*seconds); - Assert.assertTrue(aggregateMap.containsKey(timelineClusterMetric)); - Assert.assertEquals(5.0, aggregateMap.get(timelineClusterMetric).getSum()); - - metricValues.clear(); - timelineMetrics.clear(); - - /* - - 0 +30s +60s - | | | - (1)(2)(3) (4)(5)(6) h1, h2 - - */ - // Case 2 : data points are distributed equally, no values are lost, two hosts. - metricValues.put(startTime + 4*seconds, 1.0); - metricValues.put(startTime + 14*seconds, 2.0); - metricValues.put(startTime + 24*seconds, 3.0); - metricValues.put(startTime + 34*seconds, 4.0); - metricValues.put(startTime + 44*seconds, 5.0); - metricValues.put(startTime + 54*seconds, 6.0); - - timelineMetric = new TimelineMetric("metric1", "host1", "app1", "instance1"); - timelineMetric.setMetricValues(metricValues); - - metricValues = new TreeMap<>(); - metricValues.put(startTime + 5*seconds, 2.0); - metricValues.put(startTime + 15*seconds, 4.0); - metricValues.put(startTime + 25*seconds, 6.0); - metricValues.put(startTime + 35*seconds, 8.0); - metricValues.put(startTime + 45*seconds, 10.0); - metricValues.put(startTime + 55*seconds, 12.0); - TimelineMetric timelineMetric2 = new TimelineMetric("metric1", "host2", "app1", "instance1"); - timelineMetric2.setMetricValues(metricValues); - - timelineMetrics = new ArrayList<>(); - timelineMetrics.add(timelineMetric); - timelineMetrics.add(timelineMetric2); - timelineMetricsIgniteCache.putMetrics(timelineMetrics, metricMetadataManagerMock); - aggregateMap = timelineMetricsIgniteCache.evictMetricAggregates(startTime, startTime + 120*seconds); - - Assert.assertEquals(aggregateMap.size(), 2); - timelineClusterMetric = new TimelineClusterMetric(timelineMetric.getMetricName(), - timelineMetric.getAppId(), timelineMetric.getInstanceId(), startTime + 30*seconds); - - Assert.assertTrue(aggregateMap.containsKey(timelineClusterMetric)); - Assert.assertEquals(6.0, aggregateMap.get(timelineClusterMetric).getSum()); - - timelineClusterMetric.setTimestamp(startTime + 2*30*seconds); - Assert.assertTrue(aggregateMap.containsKey(timelineClusterMetric)); - Assert.assertEquals(15.0, aggregateMap.get(timelineClusterMetric).getSum()); - - metricValues.clear(); - timelineMetrics.clear(); - - Assert.assertEquals(0d, timelineMetricsIgniteCache.getPointInTimeCacheMetrics().get("Cluster_KeySize")); - } - - @Test - public void updateAppAggregatesFromHostMetricTest() { - //make sure hosts metrics are aggregated for appIds from "timeline.metrics.service.cluster.aggregator.appIds" - - long cacheSliceIntervalMillis = 30000L; - - TimelineMetricMetadataManager metricMetadataManagerMock = createNiceMock(TimelineMetricMetadataManager.class); - expect(metricMetadataManagerMock.getUuid(anyObject(TimelineClusterMetric.class))).andReturn(new byte[16]).once(); - expect(metricMetadataManagerMock.getHostedAppsCache()).andReturn(new HashMap<>()).anyTimes(); - replay(metricMetadataManagerMock); - - long startTime = getRoundedCheckPointTimeMillis(System.currentTimeMillis(), cacheSliceIntervalMillis); - - long seconds = 1000; - - TreeMap<Long, Double> metricValues = new TreeMap<Long, Double>(); - List<TimelineMetric> timelineMetrics = new ArrayList<>(); - TimelineMetric timelineMetric; - - metricValues = new TreeMap<>(); - metricValues.put(startTime + 15*seconds, 1.0); - metricValues.put(startTime + 55*seconds, 2.0); - timelineMetric = new TimelineMetric("host_metric", "host1", HOST_APP_ID, "instance1"); - timelineMetric.setMetricValues(metricValues); - timelineMetrics.add(timelineMetric); - - metricValues = new TreeMap<>(); - metricValues.put(startTime + 45*seconds, 3.0); - metricValues.put(startTime + 85*seconds, 4.0); - timelineMetric = new TimelineMetric("app_metric", "host1", "appIdForHostsAggr", "instance1"); - timelineMetric.setMetricValues(metricValues); - timelineMetrics.add(timelineMetric); - - metricValues = new TreeMap<>(); - metricValues.put(startTime + 85*seconds, 5.0); - timelineMetric = new TimelineMetric("host_metric", "host1", HOST_APP_ID, "instance1"); - timelineMetric.setMetricValues(metricValues); - timelineMetrics.add(timelineMetric); - - metricValues = new TreeMap<>(); - metricValues.put(startTime + 85*seconds, 6.0); - timelineMetric = new TimelineMetric("host_metric", "host2", HOST_APP_ID, "instance1"); - timelineMetric.setMetricValues(metricValues); - timelineMetrics.add(timelineMetric); - - timelineMetricsIgniteCache.putMetrics(timelineMetrics, metricMetadataManagerMock); - - Map<TimelineClusterMetric, MetricClusterAggregate> aggregateMap = timelineMetricsIgniteCache.evictMetricAggregates(startTime, startTime + 120*seconds); - Assert.assertEquals(aggregateMap.size(), 6); - TimelineClusterMetric timelineClusterMetric = new TimelineClusterMetric(timelineMetric.getMetricName(), - timelineMetric.getAppId(), timelineMetric.getInstanceId(), startTime + 90*seconds); - - Assert.assertTrue(aggregateMap.containsKey(timelineClusterMetric)); - Assert.assertEquals(11.0, aggregateMap.get(timelineClusterMetric).getSum()); - - timelineClusterMetric = new TimelineClusterMetric("app_metric", - "appIdForHostsAggr", "instance1", startTime + 90*seconds); - Assert.assertTrue(aggregateMap.containsKey(timelineClusterMetric)); - Assert.assertEquals(4.0, aggregateMap.get(timelineClusterMetric).getSum()); - - timelineClusterMetric = new TimelineClusterMetric("host_metric", - "appIdForHostsAggr", "instance1", startTime + 90*seconds); - Assert.assertTrue(aggregateMap.containsKey(timelineClusterMetric)); - Assert.assertEquals(5.0, aggregateMap.get(timelineClusterMetric).getSum()); - - Assert.assertEquals(0d, timelineMetricsIgniteCache.getPointInTimeCacheMetrics().get("Cluster_KeySize")); - } -} http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TopNConditionTest.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TopNConditionTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TopNConditionTest.java deleted file mode 100644 index 5110191..0000000 --- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TopNConditionTest.java +++ /dev/null @@ -1,105 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline; - -import junit.framework.Assert; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.TopNCondition; -import org.junit.Test; - -import java.util.ArrayList; -import java.util.List; - -public class TopNConditionTest { - - - @Test - public void testTopNCondition() { - - List<String> metricNames = new ArrayList<>(); - List<String> hostnames = new ArrayList<>(); - - //Valid Cases - - // "H" hosts and 1 Metric - hostnames.add("h1"); - hostnames.add("h2"); - metricNames.add("m1"); - Assert.assertTrue(TopNCondition.isTopNHostCondition(metricNames, hostnames)); - Assert.assertFalse(TopNCondition.isTopNMetricCondition(metricNames, hostnames)); - hostnames.clear(); - - // Host(s) with wildcard & 1 Metric - hostnames.add("h%"); - hostnames.add("g1"); - Assert.assertTrue(TopNCondition.isTopNHostCondition(metricNames, hostnames)); - Assert.assertFalse(TopNCondition.isTopNMetricCondition(metricNames, hostnames)); - hostnames.clear(); - - // M Metrics and No host - metricNames.add("m2"); - metricNames.add("m3"); - Assert.assertFalse(TopNCondition.isTopNHostCondition(metricNames, hostnames)); - Assert.assertTrue(TopNCondition.isTopNMetricCondition(metricNames, hostnames)); - - // M Metrics with wildcard and No host - metricNames.add("m2"); - metricNames.add("m%"); - Assert.assertFalse(TopNCondition.isTopNHostCondition(metricNames, hostnames)); - Assert.assertTrue(TopNCondition.isTopNMetricCondition(metricNames, hostnames)); - - // M Metrics with wildcard and 1 host - metricNames.add("m2"); - metricNames.add("m%"); - hostnames.add("h1"); - Assert.assertFalse(TopNCondition.isTopNHostCondition(metricNames, hostnames)); - Assert.assertTrue(TopNCondition.isTopNMetricCondition(metricNames, hostnames)); - - metricNames.clear(); - hostnames.clear(); - - //Invalid Cases - // M metrics and H hosts - metricNames.add("m1"); - metricNames.add("m2"); - hostnames.add("h1"); - hostnames.add("h2"); - Assert.assertFalse(TopNCondition.isTopNHostCondition(metricNames, hostnames)); - Assert.assertFalse(TopNCondition.isTopNMetricCondition(metricNames, hostnames)); - - metricNames.clear(); - hostnames.clear(); - - // Wildcard in 1 and multiple in other - metricNames.add("m1"); - metricNames.add("m2"); - hostnames.add("%"); - Assert.assertFalse(TopNCondition.isTopNHostCondition(metricNames, hostnames)); - Assert.assertFalse(TopNCondition.isTopNMetricCondition(metricNames, hostnames)); - - metricNames.clear(); - hostnames.clear(); - - //Wildcard in both - metricNames.add("m%"); - hostnames.add("%"); - Assert.assertFalse(TopNCondition.isTopNHostCondition(metricNames, hostnames)); - Assert.assertFalse(TopNCondition.isTopNMetricCondition(metricNames, hostnames)); - - } -} http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregatorTest.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregatorTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregatorTest.java deleted file mode 100644 index b4d0f0a..0000000 --- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregatorTest.java +++ /dev/null @@ -1,181 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition; -import org.junit.Before; -import org.junit.Test; -import java.io.IOException; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.util.concurrent.atomic.AtomicLong; - -import static junit.framework.Assert.assertEquals; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.AGGREGATOR_CHECKPOINT_DELAY; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.RESULTSET_FETCH_SIZE; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.AggregatorUtils.getRoundedAggregateTimeMillis; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.AggregatorUtils.getRoundedCheckPointTimeMillis; - -public class AbstractTimelineAggregatorTest { - - private AbstractTimelineAggregator agg; - - AtomicLong startTimeInDoWork; - AtomicLong endTimeInDoWork; - AtomicLong checkPoint; - int actualRuns; - - long sleepIntervalMillis; - int checkpointCutOffMultiplier; - - @Before - public void setUp() throws Exception { - sleepIntervalMillis = 5 * 60 * 1000l; //5 minutes - checkpointCutOffMultiplier = 2; - - Configuration metricsConf = new Configuration(); - metricsConf.setInt(AGGREGATOR_CHECKPOINT_DELAY, 0); - metricsConf.setInt(RESULTSET_FETCH_SIZE, 2000); - - startTimeInDoWork = new AtomicLong(0); - endTimeInDoWork = new AtomicLong(0); - checkPoint = new AtomicLong(-1); - actualRuns = 0; - - agg = new AbstractTimelineAggregator(AggregationTaskRunner.AGGREGATOR_NAME.METRIC_AGGREGATE_SECOND, null, metricsConf) { - @Override - public boolean doWork(long startTime, long endTime) { - startTimeInDoWork.set(startTime); - endTimeInDoWork.set(endTime); - actualRuns++; - - return true; - } - - @Override - protected Condition - prepareMetricQueryCondition(long startTime, long endTime) { - return null; - } - - @Override - protected void aggregate(ResultSet rs, long startTime, - long endTime) throws IOException, SQLException { - } - - @Override - public Long getSleepIntervalMillis() { - return sleepIntervalMillis; - } - - @Override - protected Integer getCheckpointCutOffMultiplier() { - return checkpointCutOffMultiplier; - } - - @Override - public boolean isDisabled() { - return false; - } - - @Override - protected String getCheckpointLocation() { - return "dummy_ckptFile"; - } - - protected long readCheckPoint() { - return checkPoint.get(); - } - - @Override - protected void saveCheckPoint(long checkpointTime) throws IOException { - checkPoint.set(checkpointTime); - } - }; - - } - - @Test - public void testDoWorkOnZeroDelay() throws Exception { - - long currentTime = System.currentTimeMillis(); - long roundedOffAggregatorTime = getRoundedCheckPointTimeMillis(currentTime, - sleepIntervalMillis); - - //Test first run of aggregator with no checkpoint - checkPoint.set(-1); - agg.runOnce(sleepIntervalMillis); - assertEquals("startTime should be zero", 0, startTimeInDoWork.get()); - assertEquals("endTime should be zero", 0, endTimeInDoWork.get()); - assertEquals(roundedOffAggregatorTime, checkPoint.get()); - assertEquals("Do not aggregate on first run", 0, actualRuns); - -// //Test first run with too "recent" checkpoint - currentTime = System.currentTimeMillis(); - checkPoint.set(currentTime); - agg.setSleepIntervalMillis(sleepIntervalMillis); - agg.runOnce(sleepIntervalMillis); - assertEquals("startTime should be zero", 0, startTimeInDoWork.get()); - assertEquals("endTime should be zero", 0, endTimeInDoWork.get()); - assertEquals("Do not aggregate on first run", 0, actualRuns); - - //Test first run with Too Old checkpoint - currentTime = System.currentTimeMillis(); - checkPoint.set(currentTime - 16*60*1000); //Old checkpoint - agg.runOnce(sleepIntervalMillis); - long checkPointTime = getRoundedAggregateTimeMillis(sleepIntervalMillis); - assertEquals("startTime should be zero", checkPointTime - sleepIntervalMillis, startTimeInDoWork.get()); - assertEquals("endTime should be zero", checkPointTime, endTimeInDoWork.get()); - assertEquals(roundedOffAggregatorTime, checkPoint.get()); - assertEquals("Do not aggregate on first run", 1, actualRuns); - - -// //Test first run with perfect checkpoint (sleepIntervalMillis back) - currentTime = System.currentTimeMillis(); - roundedOffAggregatorTime = getRoundedCheckPointTimeMillis(currentTime, - sleepIntervalMillis); - checkPointTime = roundedOffAggregatorTime - sleepIntervalMillis; - long expectedCheckPoint = getRoundedCheckPointTimeMillis(checkPointTime, sleepIntervalMillis); - checkPoint.set(checkPointTime); - agg.runOnce(sleepIntervalMillis); - assertEquals("startTime should the lower rounded time of the checkpoint time", - expectedCheckPoint, startTimeInDoWork.get()); - assertEquals("endTime should the lower rounded time of the checkpoint time + sleepIntervalMillis", - expectedCheckPoint + sleepIntervalMillis, endTimeInDoWork.get()); - assertEquals(expectedCheckPoint + sleepIntervalMillis, - checkPoint.get()); - assertEquals("Aggregate on first run", 2, actualRuns); - - //Test edge case for checkpoint (2 x sleepIntervalMillis) - currentTime = System.currentTimeMillis(); - checkPoint.set(currentTime - 2*sleepIntervalMillis + 5000); - agg.runOnce(sleepIntervalMillis); - long expectedStartTime = getRoundedCheckPointTimeMillis(currentTime - 2*sleepIntervalMillis + 5000, sleepIntervalMillis); - assertEquals("startTime should the lower rounded time of the checkpoint time", - expectedStartTime, startTimeInDoWork.get()); - assertEquals("startTime should the lower rounded time of the checkpoint time + sleepIntervalMillis", - expectedStartTime + sleepIntervalMillis, endTimeInDoWork.get()); - assertEquals(expectedStartTime + sleepIntervalMillis, - checkPoint.get()); - assertEquals("Aggregate on second run", 3, actualRuns); - - - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/DownSamplerTest.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/DownSamplerTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/DownSamplerTest.java deleted file mode 100644 index 0fca2e3..0000000 --- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/DownSamplerTest.java +++ /dev/null @@ -1,97 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators; - -import junit.framework.Assert; -import org.apache.hadoop.conf.Configuration; -import org.junit.Ignore; -import org.junit.Test; - -import java.util.List; -import java.util.Map; - -public class DownSamplerTest { - - @Test - public void testGetDownsampleMetricPatterns() throws Exception { - - Configuration configuration = new Configuration(); - configuration.setIfUnset("timeline.metrics.downsampler.topn.metric.patterns", "pattern1,pattern2"); - configuration.setIfUnset("timeline.metrics.downsampler.lastvalue.metric.patterns", "pattern3"); - - List<String> patterns = DownSamplerUtils.getDownsampleMetricPatterns(configuration); - Assert.assertEquals(patterns.size(), 3); - Assert.assertTrue(patterns.contains("pattern1")); - Assert.assertTrue(patterns.contains("pattern2")); - Assert.assertTrue(patterns.contains("pattern3")); - - Configuration configuration2 = new Configuration(); - patterns = DownSamplerUtils.getDownsampleMetricPatterns(configuration2); - Assert.assertEquals(patterns.size(), 0); - } - - @Test - public void testGetDownSamplers() throws Exception { - - Configuration configuration = new Configuration(); - configuration.setIfUnset("timeline.metrics.downsampler.topn.metric.patterns", "pattern1,pattern2"); - configuration.setIfUnset("timeline.metrics.downsampler.test.metric.patterns", "pattern3"); - - List<CustomDownSampler> downSamplers = DownSamplerUtils.getDownSamplers(configuration); - Assert.assertEquals(downSamplers.size(), 1); - Assert.assertTrue(downSamplers.get(0) instanceof TopNDownSampler); - } - - @Ignore - @Test - public void testPrepareTopNDownSamplingStatement() throws Exception { - Configuration configuration = new Configuration(); - configuration.setIfUnset("timeline.metrics.downsampler.topn.metric.patterns", "pattern1,pattern2"); - configuration.setIfUnset("timeline.metrics.downsampler.topn.value", "3"); - - Map<String, String> conf = configuration.getValByRegex(DownSamplerUtils.downSamplerConfigPrefix); - - TopNDownSampler topNDownSampler = TopNDownSampler.fromConfig(conf); - List<String> stmts = topNDownSampler.prepareDownSamplingStatement(14000000l, 14100000l, "METRIC_RECORD"); - Assert.assertEquals(stmts.size(),2); - Assert.assertTrue(stmts.contains("SELECT METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, 14100000 AS SERVER_TIME, UNITS, " + - "MAX(METRIC_MAX), 1, MAX(METRIC_MAX), MAX(METRIC_MAX) FROM METRIC_RECORD WHERE " + - "METRIC_NAME LIKE 'pattern1' AND SERVER_TIME > 14000000 AND SERVER_TIME <= 14100000 " + - "GROUP BY METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, UNITS ORDER BY MAX(METRIC_MAX) DESC LIMIT 3")); - - Assert.assertTrue(stmts.contains("SELECT METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, 14100000 AS SERVER_TIME, UNITS, " + - "MAX(METRIC_MAX), 1, MAX(METRIC_MAX), MAX(METRIC_MAX) FROM METRIC_RECORD WHERE " + - "METRIC_NAME LIKE 'pattern2' AND SERVER_TIME > 14000000 AND SERVER_TIME <= 14100000 " + - "GROUP BY METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, UNITS ORDER BY MAX(METRIC_MAX) DESC LIMIT 3")); - - configuration.clear(); - configuration.setIfUnset("timeline.metrics.downsampler.topn.metric.patterns", "pattern1"); - configuration.setIfUnset("timeline.metrics.downsampler.topn.value", "4"); - configuration.setIfUnset("timeline.metrics.downsampler.topn.function", "sum"); - conf = configuration.getValByRegex(DownSamplerUtils.downSamplerConfigPrefix); - topNDownSampler = TopNDownSampler.fromConfig(conf); - stmts = topNDownSampler.prepareDownSamplingStatement(14000000l, 14100000l, "METRIC_AGGREGATE_MINUTE"); - Assert.assertEquals(stmts.size(),1); - - Assert.assertTrue(stmts.contains("SELECT METRIC_NAME, APP_ID, INSTANCE_ID, 14100000 AS SERVER_TIME, UNITS, " + - "SUM(METRIC_SUM), 1, SUM(METRIC_SUM), SUM(METRIC_SUM) FROM METRIC_AGGREGATE_MINUTE WHERE " + - "METRIC_NAME LIKE 'pattern1' AND SERVER_TIME > 14000000 AND SERVER_TIME <= 14100000 " + - "GROUP BY METRIC_NAME, APP_ID, INSTANCE_ID, UNITS ORDER BY SUM(METRIC_SUM) DESC LIMIT 4")); - } -} http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/ITClusterAggregator.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/ITClusterAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/ITClusterAggregator.java deleted file mode 100644 index 1c5f41f..0000000 --- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/ITClusterAggregator.java +++ /dev/null @@ -1,669 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators; - - -import static junit.framework.Assert.assertEquals; -import static junit.framework.Assert.assertNotNull; -import static junit.framework.Assert.assertTrue; -import static junit.framework.Assert.fail; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.MetricTestHelper.createEmptyTimelineClusterMetric; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.MetricTestHelper.prepareSingleTimelineMetric; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_APP_IDS; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_CLUSTER_AGGREGATE_SQL; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME; -import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_TABLE_NAME; - -import java.sql.Connection; -import java.sql.PreparedStatement; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.sql.Statement; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; -import java.util.TreeMap; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate; -import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate; -import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; -import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.AbstractMiniHBaseClusterTest; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.MetricTestHelper; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultCondition; -import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL; -import org.junit.Test; - -import junit.framework.Assert; - -public class ITClusterAggregator extends AbstractMiniHBaseClusterTest { - private final TimelineMetricReadHelper metricReader = new TimelineMetricReadHelper(false); - - private Configuration getConfigurationForTest(boolean useGroupByAggregators) { - Configuration configuration = new Configuration(); - configuration.set("timeline.metrics.service.use.groupBy.aggregators", String.valueOf(useGroupByAggregators)); - return configuration; - } - - @Test - public void testShouldAggregateClusterProperly() throws Exception { - // GIVEN - TimelineMetricAggregator agg = - TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hdb, - getConfigurationForTest(false), new TimelineMetricMetadataManager(new Configuration(), hdb), null, null); - TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false); - - long startTime = System.currentTimeMillis(); - long ctime = startTime; - long minute = 60 * 1000; - hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local1", - "disk_free", 1)); - hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local2", - "disk_free", 2)); - ctime += 2*minute; - hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local1", - "disk_free", 2)); - hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local2", - "disk_free", 1)); - - // WHEN - long endTime = ctime + minute; - boolean success = agg.doWork(startTime, endTime); - - //THEN - Condition condition = new DefaultCondition(null, null, null, null, startTime, - endTime, null, null, true); - condition.setStatement(String.format(GET_CLUSTER_AGGREGATE_SQL, - METRICS_CLUSTER_AGGREGATE_TABLE_NAME)); - - PreparedStatement pstmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition); - ResultSet rs = pstmt.executeQuery(); - - int recordCount = 0; - while (rs.next()) { - TimelineClusterMetric currentMetric = metricReader.fromResultSet(rs); - MetricClusterAggregate currentHostAggregate = - readHelper.getMetricClusterAggregateFromResultSet(rs); - - if ("disk_free".equals(currentMetric.getMetricName())) { - assertEquals(2, currentHostAggregate.getNumberOfHosts()); - assertEquals(2.0, currentHostAggregate.getMax()); - assertEquals(1.0, currentHostAggregate.getMin()); - assertEquals(3.0, currentHostAggregate.getSum()); - recordCount++; - } else { - fail("Unexpected entry"); - } - } - } - - @Test - public void testShouldAggregateClusterIgnoringInstance() throws Exception { - // GIVEN - TimelineMetricAggregator agg = - TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hdb, - getConfigurationForTest(false), new TimelineMetricMetadataManager(new Configuration(), hdb), null, null); - TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false); - - long startTime = System.currentTimeMillis(); - long ctime = startTime; - long minute = 60 * 1000 * 2; - - /** - * Here we have two nodes with two instances each: - * | local1 | local2 | - * instance i1 | 1 | 2 | - * instance i2 | 3 | 4 | - * - */ - // Four 1's at ctime - 100 - hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime - 100, "local1", - "i1", "disk_free", 1)); - // Four 2's at ctime - 100: different host - hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime - 100, "local2", - "i1", "disk_free", 2)); - // Avoid overwrite - hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime + 100, "local1", - "i2", "disk_free", 3)); - hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime + 100, "local2", - "i2", "disk_free", 4)); - - ctime += minute; - - // Four 1's at ctime + 2 min - hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime - 100, "local1", - "i1", "disk_free", 1)); - // Four 1's at ctime + 2 min - different host - hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime - 100, "local2", - "i1", "disk_free", 3)); - hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime + 100, "local1", - "i2", "disk_free", 2)); - hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime + 100, "local2", - "i2", "disk_free", 4)); - // WHEN - long endTime = ctime + minute; - boolean success = agg.doWork(startTime - 1000, endTime + 1000); - - //THEN - Condition condition = new DefaultCondition(null, null, null, null, startTime, - endTime, null, null, true); - condition.setStatement(String.format(GET_CLUSTER_AGGREGATE_SQL, - METRICS_CLUSTER_AGGREGATE_TABLE_NAME)); - - PreparedStatement pstmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition); - ResultSet rs = pstmt.executeQuery(); - - int recordCount = 0; - while (rs.next()) { - TimelineClusterMetric currentMetric = metricReader.fromResultSet(rs); - MetricClusterAggregate currentHostAggregate = - readHelper.getMetricClusterAggregateFromResultSet(rs); - - if ("disk_free".equals(currentMetric.getMetricName())) { - System.out.println("OUTPUT: " + currentMetric + " - " + currentHostAggregate); - assertEquals(2, currentHostAggregate.getNumberOfHosts()); - assertEquals(5.0, Math.floor(currentHostAggregate.getSum())); - recordCount++; - } else { - fail("Unexpected entry"); - } - } - - Assert.assertEquals(5, recordCount); - } - - @Test - public void testShouldAggregateDifferentMetricsOnClusterProperly() throws Exception { - // GIVEN - TimelineMetricAggregator agg = - TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hdb, - getConfigurationForTest(false), new TimelineMetricMetadataManager(new Configuration(), hdb), null, null); - TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false); - - // here we put some metrics tha will be aggregated - long startTime = System.currentTimeMillis(); - long ctime = startTime; - long minute = 60 * 1000; - hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local1", - "disk_free", 1)); - hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local2", - "disk_free", 2)); - hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local1", - "disk_used", 1)); - - ctime += 2*minute; - hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local1", - "disk_free", 2)); - hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local2", - "disk_free", 1)); - hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local1", - "disk_used", 1)); - - // WHEN - long endTime = ctime + minute; - boolean success = agg.doWork(startTime, endTime); - - //THEN - Condition condition = new DefaultCondition(null, null, null, null, startTime, - endTime, null, null, true); - condition.setStatement(String.format(GET_CLUSTER_AGGREGATE_SQL, - METRICS_CLUSTER_AGGREGATE_TABLE_NAME)); - - PreparedStatement pstmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition); - ResultSet rs = pstmt.executeQuery(); - - int recordCount = 0; - while (rs.next()) { - TimelineClusterMetric currentMetric = metricReader.fromResultSet(rs); - MetricClusterAggregate currentHostAggregate = - readHelper.getMetricClusterAggregateFromResultSet(rs); - - if ("disk_free".equals(currentMetric.getMetricName())) { - assertEquals(2, currentHostAggregate.getNumberOfHosts()); - assertEquals(2.0, currentHostAggregate.getMax()); - assertEquals(1.0, currentHostAggregate.getMin()); - assertEquals(3.0, currentHostAggregate.getSum()); - recordCount++; - } else if ("disk_used".equals(currentMetric.getMetricName())) { - assertEquals(1, currentHostAggregate.getNumberOfHosts()); - assertEquals(1.0, currentHostAggregate.getMax()); - assertEquals(1.0, currentHostAggregate.getMin()); - assertEquals(1.0, currentHostAggregate.getSum()); - recordCount++; - } else { - fail("Unexpected entry"); - } - } - } - - @Test - public void testAggregateDailyClusterMetrics() throws Exception { - // GIVEN - TimelineMetricAggregator agg = - TimelineMetricAggregatorFactory.createTimelineClusterAggregatorDaily(hdb, getConfigurationForTest(false), null, null); - - // this time can be virtualized! or made independent from real clock - long startTime = System.currentTimeMillis(); - long ctime = startTime; - long hour = 3600 * 1000; - - Map<TimelineClusterMetric, MetricHostAggregate> records = - new HashMap<TimelineClusterMetric, MetricHostAggregate>(); - - records.put(createEmptyTimelineClusterMetric(ctime), - MetricTestHelper.createMetricHostAggregate(4.0, 0.0, 2, 4.0)); - records.put(createEmptyTimelineClusterMetric(ctime += hour), - MetricTestHelper.createMetricHostAggregate(4.0, 0.0, 2, 4.0)); - records.put(createEmptyTimelineClusterMetric(ctime += hour), - MetricTestHelper.createMetricHostAggregate(4.0, 0.0, 2, 4.0)); - records.put(createEmptyTimelineClusterMetric(ctime += hour), - MetricTestHelper.createMetricHostAggregate(4.0, 0.0, 2, 4.0)); - - - hdb.saveClusterAggregateRecordsSecond(records, METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME); - - // WHEN - agg.doWork(startTime, ctime + hour + 1000); - - // THEN - ResultSet rs = executeQuery("SELECT * FROM METRIC_AGGREGATE_DAILY"); - int count = 0; - while (rs.next()) { - assertEquals("METRIC_NAME", "disk_used", rs.getString("METRIC_NAME")); - assertEquals("APP_ID", "test_app", rs.getString("APP_ID")); - assertEquals("METRIC_SUM", 16.0, rs.getDouble("METRIC_SUM")); - assertEquals("METRIC_COUNT", 8, rs.getLong("METRIC_COUNT")); - assertEquals("METRIC_MAX", 4.0, rs.getDouble("METRIC_MAX")); - assertEquals("METRIC_MIN", 0.0, rs.getDouble("METRIC_MIN")); - count++; - } - - assertEquals("Day aggregated row expected ", 1, count); - } - - @Test - public void testShouldAggregateClusterOnMinuteProperly() throws Exception { - - TimelineMetricAggregator agg = - TimelineMetricAggregatorFactory.createTimelineClusterAggregatorMinute(hdb, getConfigurationForTest(false), null, null); - - long startTime = System.currentTimeMillis(); - long ctime = startTime; - long second = 1000; - long minute = 60*second; - - Map<TimelineClusterMetric, MetricClusterAggregate> records = - new HashMap<TimelineClusterMetric, MetricClusterAggregate>(); - - records.put(createEmptyTimelineClusterMetric(ctime), - new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0)); - records.put(createEmptyTimelineClusterMetric(ctime += second), - new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0)); - records.put(createEmptyTimelineClusterMetric(ctime += second), - new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0)); - records.put(createEmptyTimelineClusterMetric(ctime += second), - new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0)); - - hdb.saveClusterAggregateRecords(records); - agg.doWork(startTime, ctime + second); - long oldCtime = ctime + second; - - //Next minute - ctime = startTime + minute; - - records.put(createEmptyTimelineClusterMetric(ctime), - new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0)); - records.put(createEmptyTimelineClusterMetric(ctime += second), - new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0)); - records.put(createEmptyTimelineClusterMetric(ctime += second), - new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0)); - records.put(createEmptyTimelineClusterMetric(ctime += second), - new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0)); - - hdb.saveClusterAggregateRecords(records); - agg.doWork(oldCtime, ctime + second); - - ResultSet rs = executeQuery("SELECT * FROM METRIC_AGGREGATE_MINUTE"); - int count = 0; - long diff = 0 ; - while (rs.next()) { - assertEquals("METRIC_NAME", "disk_used", rs.getString("METRIC_NAME")); - assertEquals("APP_ID", "test_app", rs.getString("APP_ID")); - assertEquals("METRIC_SUM", 16.0, rs.getDouble("METRIC_SUM")); - assertEquals("METRIC_COUNT", 8, rs.getLong("METRIC_COUNT")); - assertEquals("METRIC_MAX", 4.0, rs.getDouble("METRIC_MAX")); - assertEquals("METRIC_MIN", 0.0, rs.getDouble("METRIC_MIN")); - if (count == 0) { - diff+=rs.getLong("SERVER_TIME"); - } else { - diff-=rs.getLong("SERVER_TIME"); - if (diff < 0) { - diff*=-1; - } - assertTrue(diff == minute); - } - count++; - } - - assertEquals("One hourly aggregated row expected ", 2, count); - } - - @Test - public void testShouldAggregateClusterOnHourProperly() throws Exception { - // GIVEN - TimelineMetricAggregator agg = - TimelineMetricAggregatorFactory.createTimelineClusterAggregatorHourly(hdb, getConfigurationForTest(false), null, null); - - // this time can be virtualized! or made independent from real clock - long startTime = System.currentTimeMillis(); - long ctime = startTime; - long minute = 60 * 1000; - - Map<TimelineClusterMetric, MetricClusterAggregate> records = - new HashMap<TimelineClusterMetric, MetricClusterAggregate>(); - - records.put(createEmptyTimelineClusterMetric(ctime), - new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0)); - records.put(createEmptyTimelineClusterMetric(ctime += minute), - new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0)); - records.put(createEmptyTimelineClusterMetric(ctime += minute), - new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0)); - records.put(createEmptyTimelineClusterMetric(ctime += minute), - new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0)); - - hdb.saveClusterAggregateRecords(records); - - // WHEN - agg.doWork(startTime, ctime + minute); - - // THEN - ResultSet rs = executeQuery("SELECT * FROM METRIC_AGGREGATE_HOURLY"); - int count = 0; - while (rs.next()) { - assertEquals("METRIC_NAME", "disk_used", rs.getString("METRIC_NAME")); - assertEquals("APP_ID", "test_app", rs.getString("APP_ID")); - assertEquals("METRIC_SUM", 16.0, rs.getDouble("METRIC_SUM")); - assertEquals("METRIC_COUNT", 8, rs.getLong("METRIC_COUNT")); - assertEquals("METRIC_MAX", 4.0, rs.getDouble("METRIC_MAX")); - assertEquals("METRIC_MIN", 0.0, rs.getDouble("METRIC_MIN")); - count++; - } - - assertEquals("One hourly aggregated row expected ", 1, count); - } - - @Test - public void testShouldAggregateDifferentMetricsOnHourProperly() throws Exception { - // GIVEN - TimelineMetricAggregator agg = - TimelineMetricAggregatorFactory.createTimelineClusterAggregatorHourly(hdb, getConfigurationForTest(false), null, null); - - long startTime = System.currentTimeMillis(); - long ctime = startTime; - long minute = 60 * 1000; - - Map<TimelineClusterMetric, MetricClusterAggregate> records = - new HashMap<TimelineClusterMetric, MetricClusterAggregate>(); - - records.put(createEmptyTimelineClusterMetric("disk_used", ctime), - new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0)); - records.put(createEmptyTimelineClusterMetric("disk_free", ctime), - new MetricClusterAggregate(1.0, 2, 0.0, 1.0, 1.0)); - - records.put(createEmptyTimelineClusterMetric("disk_used", ctime += minute), - new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0)); - records.put(createEmptyTimelineClusterMetric("disk_free", ctime), - new MetricClusterAggregate(1.0, 2, 0.0, 1.0, 1.0)); - - records.put(createEmptyTimelineClusterMetric("disk_used", ctime += minute), - new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0)); - records.put(createEmptyTimelineClusterMetric("disk_free", ctime), - new MetricClusterAggregate(1.0, 2, 0.0, 1.0, 1.0)); - - records.put(createEmptyTimelineClusterMetric("disk_used", ctime += minute), - new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0)); - records.put(createEmptyTimelineClusterMetric("disk_free", ctime), - new MetricClusterAggregate(1.0, 2, 0.0, 1.0, 1.0)); - - hdb.saveClusterAggregateRecords(records); - - // WHEN - agg.doWork(startTime, ctime + minute); - - // THEN - ResultSet rs = executeQuery("SELECT * FROM METRIC_AGGREGATE_HOURLY"); - int count = 0; - while (rs.next()) { - if ("disk_used".equals(rs.getString("METRIC_NAME"))) { - assertEquals("APP_ID", "test_app", rs.getString("APP_ID")); - assertEquals("METRIC_SUM", 16.0, rs.getDouble("METRIC_SUM")); - assertEquals("METRIC_COUNT", 8, rs.getLong("METRIC_COUNT")); - assertEquals("METRIC_MAX", 4.0, rs.getDouble("METRIC_MAX")); - assertEquals("METRIC_MIN", 0.0, rs.getDouble("METRIC_MIN")); - } else if ("disk_free".equals(rs.getString("METRIC_NAME"))) { - assertEquals("APP_ID", "test_app", rs.getString("APP_ID")); - assertEquals("METRIC_SUM", 4.0, rs.getDouble("METRIC_SUM")); - assertEquals("METRIC_COUNT", 8, rs.getLong("METRIC_COUNT")); - assertEquals("METRIC_MAX", 1.0, rs.getDouble("METRIC_MAX")); - assertEquals("METRIC_MIN", 1.0, rs.getDouble("METRIC_MIN")); - } - - count++; - } - - assertEquals("Two hourly aggregated row expected ", 2, count); - } - - @Test - public void testAppLevelHostMetricAggregates() throws Exception { - Configuration conf = getConfigurationForTest(false); - conf.set(CLUSTER_AGGREGATOR_APP_IDS, "app1"); - TimelineMetricAggregator agg = - TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hdb, - conf, new TimelineMetricMetadataManager(new Configuration(), hdb), null, null); - TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false); - - long startTime = System.currentTimeMillis(); - long ctime = startTime; - long minute = 60 * 1000; - hdb.insertMetricRecords(prepareSingleTimelineMetric((ctime), "local1", - "app1", null, "app_metric_random", 1)); - ctime += 10; - hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local1", - "cpu_user", 1)); - ctime += 10; - hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local2", - "cpu_user", 2)); - - // WHEN - long endTime = ctime + minute; - boolean success = agg.doWork(startTime, endTime); - - //THEN - Condition condition = new DefaultCondition( - new ArrayList<String>() {{ add("cpu_user"); }}, null, "app1", null, - startTime, endTime, null, null, true); - condition.setStatement(String.format(GET_CLUSTER_AGGREGATE_SQL, - METRICS_CLUSTER_AGGREGATE_TABLE_NAME)); - - PreparedStatement pstmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition); - ResultSet rs = pstmt.executeQuery(); - - int recordCount = 0; - TimelineClusterMetric currentMetric = null; - MetricClusterAggregate currentHostAggregate = null; - while (rs.next()) { - currentMetric = metricReader.fromResultSet(rs); - currentHostAggregate = readHelper.getMetricClusterAggregateFromResultSet(rs); - recordCount++; - } - assertEquals(3, recordCount); - assertNotNull(currentMetric); - assertEquals("cpu_user", currentMetric.getMetricName()); - assertEquals("app1", currentMetric.getAppId()); - assertNotNull(currentHostAggregate); - assertEquals(1, currentHostAggregate.getNumberOfHosts()); - assertEquals(1.0d, currentHostAggregate.getSum()); - } - - @Test - public void testClusterAggregateMetricNormalization() throws Exception { - TimelineMetricAggregator agg = - TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hdb, - getConfigurationForTest(false), new TimelineMetricMetadataManager(new Configuration(), hdb), null, null); - TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(false); - - // Sample data - TimelineMetric metric1 = new TimelineMetric(); - metric1.setMetricName("yarn.ClusterMetrics.NumActiveNMs"); - metric1.setAppId("resourcemanager"); - metric1.setHostName("h1"); - metric1.setStartTime(1431372311811l); - metric1.setMetricValues(new TreeMap<Long, Double>() {{ - put(1431372311811l, 1.0); - put(1431372321811l, 1.0); - put(1431372331811l, 1.0); - put(1431372341811l, 1.0); - put(1431372351811l, 1.0); - put(1431372361811l, 1.0); - put(1431372371810l, 1.0); - }}); - - TimelineMetric metric2 = new TimelineMetric(); - metric2.setMetricName("yarn.ClusterMetrics.NumActiveNMs"); - metric2.setAppId("resourcemanager"); - metric2.setHostName("h1"); - metric2.setStartTime(1431372381810l); - metric2.setMetricValues(new TreeMap<Long, Double>() {{ - put(1431372381810l, 1.0); - put(1431372391811l, 1.0); - put(1431372401811l, 1.0); - put(1431372411811l, 1.0); - put(1431372421811l, 1.0); - put(1431372431811l, 1.0); - put(1431372441810l, 1.0); - }}); - - TimelineMetrics metrics = new TimelineMetrics(); - metrics.setMetrics(Collections.singletonList(metric1)); - insertMetricRecords(conn, metrics, 1431372371810l); - - metrics.setMetrics(Collections.singletonList(metric2)); - insertMetricRecords(conn, metrics, 1431372441810l); - - long startTime = 1431372055000l; - long endTime = 1431372655000l; - - agg.doWork(startTime, endTime); - - Condition condition = new DefaultCondition(null, null, null, null, startTime, - endTime, null, null, true); - condition.setStatement(String.format(GET_CLUSTER_AGGREGATE_SQL, - METRICS_CLUSTER_AGGREGATE_TABLE_NAME)); - - PreparedStatement pstmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition); - ResultSet rs = pstmt.executeQuery(); - - int recordCount = 0; - while (rs.next()) { - TimelineClusterMetric currentMetric = metricReader.fromResultSet(rs); - MetricClusterAggregate currentHostAggregate = readHelper.getMetricClusterAggregateFromResultSet(rs); - - if ("yarn.ClusterMetrics.NumActiveNMs".equals(currentMetric.getMetricName())) { - assertEquals(1, currentHostAggregate.getNumberOfHosts()); - assertEquals(1.0, currentHostAggregate.getMax()); - assertEquals(1.0, currentHostAggregate.getMin()); - assertEquals(1.0, currentHostAggregate.getSum()); - recordCount++; - } else { - fail("Unexpected entry"); - } - } - Assert.assertEquals(5, recordCount); - } - - @Test - public void testAggregationUsingGroupByQuery() throws Exception { - // GIVEN - TimelineMetricAggregator agg = - TimelineMetricAggregatorFactory.createTimelineClusterAggregatorHourly(hdb, getConfigurationForTest(true), null, null); - - long startTime = System.currentTimeMillis(); - long ctime = startTime; - long minute = 60 * 1000; - - Map<TimelineClusterMetric, MetricClusterAggregate> records = - new HashMap<TimelineClusterMetric, MetricClusterAggregate>(); - - records.put(createEmptyTimelineClusterMetric("disk_used", ctime += minute), - new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0)); - records.put(createEmptyTimelineClusterMetric("disk_free", ctime), - new MetricClusterAggregate(1.0, 2, 0.0, 1.0, 1.0)); - - records.put(createEmptyTimelineClusterMetric("disk_used", ctime += minute), - new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0)); - records.put(createEmptyTimelineClusterMetric("disk_free", ctime), - new MetricClusterAggregate(1.0, 2, 0.0, 1.0, 1.0)); - - records.put(createEmptyTimelineClusterMetric("disk_used", ctime += minute), - new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0)); - records.put(createEmptyTimelineClusterMetric("disk_free", ctime), - new MetricClusterAggregate(1.0, 2, 0.0, 1.0, 1.0)); - - records.put(createEmptyTimelineClusterMetric("disk_used", ctime += minute), - new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0)); - records.put(createEmptyTimelineClusterMetric("disk_free", ctime), - new MetricClusterAggregate(1.0, 2, 0.0, 1.0, 1.0)); - - hdb.saveClusterAggregateRecords(records); - - // WHEN - agg.doWork(startTime, ctime + minute); - - // THEN - ResultSet rs = executeQuery("SELECT * FROM METRIC_AGGREGATE_HOURLY"); - int count = 0; - while (rs.next()) { - if ("disk_used".equals(rs.getString("METRIC_NAME"))) { - assertEquals("APP_ID", "test_app", rs.getString("APP_ID")); - assertEquals("METRIC_SUM", 4.0, rs.getDouble("METRIC_SUM")); - assertEquals("METRIC_COUNT", 2, rs.getLong("METRIC_COUNT")); - assertEquals("METRIC_MAX", 4.0, rs.getDouble("METRIC_MAX")); - assertEquals("METRIC_MIN", 0.0, rs.getDouble("METRIC_MIN")); - } else if ("disk_free".equals(rs.getString("METRIC_NAME"))) { - assertEquals("APP_ID", "test_app", rs.getString("APP_ID")); - assertEquals("METRIC_SUM", 1.0, rs.getDouble("METRIC_SUM")); - assertEquals("METRIC_COUNT", 2, rs.getLong("METRIC_COUNT")); - assertEquals("METRIC_MAX", 1.0, rs.getDouble("METRIC_MAX")); - assertEquals("METRIC_MIN", 1.0, rs.getDouble("METRIC_MIN")); - } - count++; - } - assertEquals("Two hourly aggregated row expected ", 2, count); - } - - private ResultSet executeQuery(String query) throws SQLException { - Connection conn = getConnection(getUrl()); - Statement stmt = conn.createStatement(); - return stmt.executeQuery(query); - } -}
