http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/HBaseTimelineMetricsServiceTest.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/HBaseTimelineMetricsServiceTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/HBaseTimelineMetricsServiceTest.java new file mode 100644 index 0000000..514bf86 --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/HBaseTimelineMetricsServiceTest.java @@ -0,0 +1,136 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.ambari.metrics.core.timeline; + +import com.google.common.collect.Multimap; +import junit.framework.Assert; +import org.apache.ambari.metrics.core.timeline.aggregators.Function; +import org.junit.Test; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; + +import static org.apache.ambari.metrics.core.timeline.aggregators.Function.ReadFunction.AVG; +import static org.apache.ambari.metrics.core.timeline.aggregators.Function.ReadFunction.SUM; +import static org.apache.ambari.metrics.core.timeline.aggregators.Function.PostProcessingFunction.RATE; +import static org.assertj.core.api.Assertions.assertThat; + +public class HBaseTimelineMetricsServiceTest { + + public static final String MEM_METRIC = "mem"; + public static final String BYTES_IN_METRIC = "bytes_in"; + public static final String BYTES_NOT_AFUNCTION_METRIC = "bytes._not._afunction"; + + @Test + public void testParseMetricNamesToAggregationFunctions() throws Exception { + //given + List<String> metricNames = Arrays.asList( + MEM_METRIC + "._avg", + MEM_METRIC + "._sum", + MEM_METRIC + "._rate._avg", + BYTES_IN_METRIC, + BYTES_NOT_AFUNCTION_METRIC); + + //when + Multimap<String, List<Function>> multimap = + HBaseTimelineMetricsService.parseMetricNamesToAggregationFunctions(metricNames); + + //then + Assert.assertEquals(multimap.keySet().size(), 3); + Assert.assertTrue(multimap.containsKey(MEM_METRIC)); + Assert.assertTrue(multimap.containsKey(BYTES_IN_METRIC)); + Assert.assertTrue(multimap.containsKey(BYTES_NOT_AFUNCTION_METRIC)); + + List<List<Function>> metricEntry = (List<List<Function>>) multimap.get(MEM_METRIC); + HashMap<String, List<Function>> mfm = new HashMap<String, List<Function>>(); + mfm.put(MEM_METRIC, metricEntry.get(0)); + + assertThat(mfm.get(MEM_METRIC)).containsOnly( + new Function(AVG, null)); + + mfm = new HashMap<String, List<Function>>(); + mfm.put(MEM_METRIC, metricEntry.get(1)); + 
assertThat(mfm.get(MEM_METRIC)).containsOnly( + new Function(SUM, null)); + + mfm = new HashMap<String, List<Function>>(); + mfm.put(MEM_METRIC, metricEntry.get(2)); + assertThat(mfm.get(MEM_METRIC)).containsOnly( + new Function(AVG, RATE)); + + metricEntry = (List<List<Function>>) multimap.get(BYTES_IN_METRIC); + mfm = new HashMap<String, List<Function>>(); + mfm.put(BYTES_IN_METRIC, metricEntry.get(0)); + + assertThat(mfm.get(BYTES_IN_METRIC)) + .contains(Function.DEFAULT_VALUE_FUNCTION); + + metricEntry = (List<List<Function>>) multimap.get(BYTES_NOT_AFUNCTION_METRIC); + mfm = new HashMap<String, List<Function>>(); + mfm.put(BYTES_NOT_AFUNCTION_METRIC, metricEntry.get(0)); + + assertThat(mfm.get(BYTES_NOT_AFUNCTION_METRIC)) + .contains(Function.DEFAULT_VALUE_FUNCTION); + + } + + @Test + public void testRateCalculationOnMetricsWithEqualValues() throws Exception { + Map<Long, Double> metricValues = new TreeMap<>(); + metricValues.put(1454000000000L, 1.0); + metricValues.put(1454000001000L, 6.0); + metricValues.put(1454000002000L, 0.0); + metricValues.put(1454000003000L, 3.0); + metricValues.put(1454000004000L, 4.0); + metricValues.put(1454000005000L, 7.0); + + // Calculate rate + Map<Long, Double> rates = HBaseTimelineMetricsService.updateValuesAsRate(new TreeMap<>(metricValues), false); + + // Make sure rate is zero + Assert.assertTrue(rates.size() == 4); + + Assert.assertFalse(rates.containsKey(1454000000000L)); + Assert.assertFalse(rates.containsKey(1454000002000L)); + + Assert.assertEquals(rates.get(1454000001000L), 5.0); + Assert.assertEquals(rates.get(1454000003000L), 3.0); + Assert.assertEquals(rates.get(1454000004000L), 1.0); + Assert.assertEquals(rates.get(1454000005000L), 3.0); + } + + @Test + public void testDiffCalculation() throws Exception { + Map<Long, Double> metricValues = new TreeMap<>(); + metricValues.put(1454016368371L, 1011.25); + metricValues.put(1454016428371L, 1010.25); + metricValues.put(1454016488371L, 1012.25); + 
metricValues.put(1454016548371L, 1015.25); + metricValues.put(1454016608371L, 1020.25); + + Map<Long, Double> rates = HBaseTimelineMetricsService.updateValuesAsRate(new TreeMap<>(metricValues), true); + + Assert.assertTrue(rates.size() == 3); + Assert.assertTrue(rates.containsValue(2.0)); + Assert.assertTrue(rates.containsValue(3.0)); + Assert.assertTrue(rates.containsValue(5.0)); + } +}
http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/ITPhoenixHBaseAccessor.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/ITPhoenixHBaseAccessor.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/ITPhoenixHBaseAccessor.java new file mode 100644 index 0000000..d4171bc --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/ITPhoenixHBaseAccessor.java @@ -0,0 +1,472 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.ambari.metrics.core.timeline; + +import static junit.framework.Assert.assertEquals; +import static junit.framework.Assert.assertTrue; +import static org.apache.ambari.metrics.core.timeline.MetricTestHelper.createEmptyTimelineClusterMetric; +import static org.apache.ambari.metrics.core.timeline.MetricTestHelper.prepareSingleTimelineMetric; +import static org.apache.ambari.metrics.core.timeline.PhoenixHBaseAccessor.DATE_TIERED_COMPACTION_POLICY; +import static org.apache.ambari.metrics.core.timeline.PhoenixHBaseAccessor.FIFO_COMPACTION_POLICY_CLASS; +import static org.apache.ambari.metrics.core.timeline.PhoenixHBaseAccessor.HSTORE_COMPACTION_CLASS_KEY; +import static org.apache.ambari.metrics.core.timeline.PhoenixHBaseAccessor.HSTORE_ENGINE_CLASS; +import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_TABLE_NAME; +import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME; +import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.PHOENIX_TABLES; +import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.PHOENIX_TABLES_REGEX_PATTERN_STRING; + +import java.io.IOException; +import java.lang.reflect.Field; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; +import org.apache.hadoop.hbase.client.Durability; +import org.apache.hadoop.hbase.client.TableDescriptor; +import org.apache.hadoop.metrics2.sink.timeline.ContainerMetric; +import 
org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate; +import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate; +import org.apache.hadoop.metrics2.sink.timeline.Precision; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; +import org.apache.ambari.metrics.core.timeline.aggregators.Function; +import org.apache.ambari.metrics.core.timeline.aggregators.TimelineClusterMetric; +import org.apache.ambari.metrics.core.timeline.aggregators.TimelineMetricAggregator; +import org.apache.ambari.metrics.core.timeline.aggregators.TimelineMetricAggregatorFactory; +import org.apache.ambari.metrics.core.timeline.query.Condition; +import org.apache.ambari.metrics.core.timeline.query.DefaultCondition; +import org.junit.Test; + +import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.Multimap; + +import junit.framework.Assert; + + + +public class ITPhoenixHBaseAccessor extends AbstractMiniHBaseClusterTest { + + @Test + public void testGetMetricRecordsSeconds() throws IOException, SQLException { + // GIVEN + long startTime = System.currentTimeMillis(); + long ctime = startTime; + long minute = 60 * 1000; + hdb.insertMetricRecords(MetricTestHelper.prepareSingleTimelineMetric(ctime, "local1", + "disk_free", 1), true); + hdb.insertMetricRecords(MetricTestHelper.prepareSingleTimelineMetric(ctime, "local2", + "disk_free", 2), true); + ctime += minute; + hdb.insertMetricRecords(MetricTestHelper.prepareSingleTimelineMetric(ctime, "local1", + "disk_free", 2), true); + hdb.insertMetricRecords(MetricTestHelper.prepareSingleTimelineMetric(ctime, "local2", + "disk_free", 1), true); + + // WHEN + long endTime = ctime + minute; + List<byte[]> uuids = metadataManager.getUuids(new ArrayList<String>() {{ add("disk_free"); }}, + Collections.singletonList("local1"), + "host", null); + + Condition condition = new DefaultCondition(uuids, + new ArrayList<String>() {{ 
add("disk_free"); }}, + Collections.singletonList("local1"), + "host", null, startTime, endTime, Precision.SECONDS, null, true); + TimelineMetrics timelineMetrics = hdb.getMetricRecords(condition, + singletonValueFunctionMap("disk_free")); + + //THEN + assertEquals(1, timelineMetrics.getMetrics().size()); + TimelineMetric metric = timelineMetrics.getMetrics().get(0); + + assertEquals("disk_free", metric.getMetricName()); + assertEquals("local1", metric.getHostName()); + assertEquals(8, metric.getMetricValues().size()); + } + + @Test + public void testGetMetricRecordsMinutes() throws IOException, SQLException { + // GIVEN + TimelineMetricAggregator aggregatorMinute = + TimelineMetricAggregatorFactory.createTimelineMetricAggregatorMinute(hdb, new Configuration(), metadataManager, null); + + long startTime = System.currentTimeMillis(); + long ctime = startTime; + long minute = 60 * 1000; + + TimelineMetrics metrics1 = MetricTestHelper.prepareSingleTimelineMetric(ctime, "local1", + "disk_free", 1); + hdb.insertMetricRecords(metrics1, true); + + TimelineMetrics metrics2 = MetricTestHelper.prepareSingleTimelineMetric(ctime + minute, "local1", + "disk_free", 2); + hdb.insertMetricRecords(metrics2, true); + + hdb.insertMetricRecords(MetricTestHelper.prepareSingleTimelineMetric(ctime, "local2", + "disk_free", 2), true); + long endTime = ctime + minute; + boolean success = aggregatorMinute.doWork(startTime - 1000, endTime); + assertTrue(success); + + // WHEN + List<byte[]> uuids = metadataManager.getUuids(new ArrayList<String>() {{ add("disk_%"); }}, + Collections.singletonList("local1"), + "host", null); + Condition condition = new DefaultCondition(uuids, + new ArrayList<String>() {{ add("disk_free"); }}, + Collections.singletonList("local1"), + "host", null, startTime, endTime + 1000, Precision.MINUTES, null, false); + TimelineMetrics timelineMetrics = hdb.getMetricRecords(condition, + singletonValueFunctionMap("disk_free")); + + //THEN + assertEquals(1, 
timelineMetrics.getMetrics().size()); + TimelineMetric metric = timelineMetrics.getMetrics().get(0); + + assertEquals("disk_free", metric.getMetricName()); + assertEquals("local1", metric.getHostName()); + assertEquals(1, metric.getMetricValues().size()); + Iterator<Map.Entry<Long, Double>> iterator = metric.getMetricValues().entrySet().iterator(); + assertEquals(1.5, iterator.next().getValue(), 0.00001); + } + + @Test + public void testGetMetricRecordsHours() throws IOException, SQLException { + // GIVEN + TimelineMetricAggregator aggregator = + TimelineMetricAggregatorFactory.createTimelineMetricAggregatorHourly(hdb, new Configuration(), metadataManager, null); + + MetricHostAggregate expectedAggregate = + MetricTestHelper.createMetricHostAggregate(2.0, 0.0, 20, 15.0); + Map<TimelineMetric, MetricHostAggregate> + aggMap = new HashMap<TimelineMetric, + MetricHostAggregate>(); + + long startTime = System.currentTimeMillis(); + int min_5 = 5 * 60 * 1000; + long ctime = startTime - min_5; + aggMap.put(MetricTestHelper.createEmptyTimelineMetric(ctime += min_5), expectedAggregate); + aggMap.put(MetricTestHelper.createEmptyTimelineMetric(ctime += min_5), expectedAggregate); + aggMap.put(MetricTestHelper.createEmptyTimelineMetric(ctime += min_5), expectedAggregate); + aggMap.put(MetricTestHelper.createEmptyTimelineMetric(ctime += min_5), expectedAggregate); + aggMap.put(MetricTestHelper.createEmptyTimelineMetric(ctime += min_5), expectedAggregate); + aggMap.put(MetricTestHelper.createEmptyTimelineMetric(ctime += min_5), expectedAggregate); + aggMap.put(MetricTestHelper.createEmptyTimelineMetric(ctime += min_5), expectedAggregate); + aggMap.put(MetricTestHelper.createEmptyTimelineMetric(ctime += min_5), expectedAggregate); + aggMap.put(MetricTestHelper.createEmptyTimelineMetric(ctime += min_5), expectedAggregate); + aggMap.put(MetricTestHelper.createEmptyTimelineMetric(ctime += min_5), expectedAggregate); + aggMap.put(MetricTestHelper.createEmptyTimelineMetric(ctime += 
min_5), expectedAggregate); + aggMap.put(MetricTestHelper.createEmptyTimelineMetric(ctime += min_5), expectedAggregate); + + hdb.saveHostAggregateRecords(aggMap, METRICS_AGGREGATE_MINUTE_TABLE_NAME); + long endTime = ctime + min_5; + boolean success = aggregator.doWork(startTime - 1000, endTime); + assertTrue(success); + + // WHEN + List<byte[]> uuids = metadataManager.getUuids(new ArrayList<String>() {{ add("disk_used"); }}, + Collections.singletonList("test_host"), + "test_app", "test_instance"); + + Condition condition = new DefaultCondition(uuids, + new ArrayList<String>() {{ add("disk_used"); }}, + Collections.singletonList("test_host"), + "test_app", "test_instance", startTime, endTime + 1000, Precision.HOURS, null, true); + TimelineMetrics timelineMetrics = hdb.getMetricRecords(condition, + singletonValueFunctionMap("disk_used")); + + //THEN + assertEquals(1, timelineMetrics.getMetrics().size()); + TimelineMetric metric = timelineMetrics.getMetrics().get(0); + + assertEquals("disk_used", metric.getMetricName()); + assertEquals("test_host", metric.getHostName()); + assertEquals(1, metric.getMetricValues().size()); + Iterator<Map.Entry<Long, Double>> iterator = metric.getMetricValues().entrySet().iterator(); + assertEquals(0.75, iterator.next().getValue(), 0.00001); + } + + @Test + public void testGetClusterMetricRecordsSeconds() throws Exception { + // GIVEN + TimelineMetricAggregator agg = + TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond( + hdb, new Configuration(), metadataManager, null, null); + + long startTime = System.currentTimeMillis(); + long ctime = startTime + 1; + long minute = 60 * 1000; + hdb.insertMetricRecords(MetricTestHelper.prepareSingleTimelineMetric(ctime, "local1", + "disk_free", 1), true); + hdb.insertMetricRecords(MetricTestHelper.prepareSingleTimelineMetric(ctime, "local2", + "disk_free", 2), true); + ctime += minute; + hdb.insertMetricRecords(MetricTestHelper.prepareSingleTimelineMetric(ctime, "local1", + 
"disk_free", 2), true); + hdb.insertMetricRecords(MetricTestHelper.prepareSingleTimelineMetric(ctime, "local2", + "disk_free", 1), true); + + long endTime = ctime + minute + 1; + boolean success = agg.doWork(startTime, endTime); + assertTrue(success); + + // WHEN + List<byte[]> uuids = metadataManager.getUuids(new ArrayList<String>() {{ add("disk_free"); }}, + null, + "host", null); + + Condition condition = new DefaultCondition(uuids, + new ArrayList<String>() {{ add("disk_free"); }}, + null, "host", null, startTime - 90000, endTime, Precision.SECONDS, null, true); + TimelineMetrics timelineMetrics = hdb.getAggregateMetricRecords(condition, + singletonValueFunctionMap("disk_free")); + + //THEN + assertEquals(1, timelineMetrics.getMetrics().size()); + TimelineMetric metric = timelineMetrics.getMetrics().get(0); + + assertEquals("disk_free", metric.getMetricName()); + assertEquals(5, metric.getMetricValues().size()); + assertEquals(1.5, metric.getMetricValues().values().iterator().next(), 0.00001); + } + + @Test + public void testGetClusterMetricRecordLatestWithFunction() throws Exception { + // GIVEN + TimelineMetricAggregator agg = + TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hdb, + new Configuration(), metadataManager, null, null); + + long startTime = System.currentTimeMillis(); + long ctime = startTime + 1; + long minute = 60 * 1000; + hdb.insertMetricRecords(MetricTestHelper.prepareSingleTimelineMetric(ctime, "local1", + "disk_free", 1), true); + hdb.insertMetricRecords(MetricTestHelper.prepareSingleTimelineMetric(ctime, "local2", + "disk_free", 2), true); + ctime += minute; + hdb.insertMetricRecords(MetricTestHelper.prepareSingleTimelineMetric(ctime, "local1", + "disk_free", 2), true); + hdb.insertMetricRecords(MetricTestHelper.prepareSingleTimelineMetric(ctime, "local2", + "disk_free", 1), true); + + long endTime = ctime + minute + 1; + boolean success = agg.doWork(startTime, endTime); + assertTrue(success); + + // WHEN + 
List<byte[]> uuids = metadataManager.getUuids(new ArrayList<String>() {{ add("disk_free"); }}, + null, + "host", null); + + Condition condition = new DefaultCondition(uuids, + new ArrayList<String>() {{ add("disk_free"); }}, + null, "host", null, null, null, Precision.SECONDS, null, true); + + Multimap<String, List<Function>> mmap = ArrayListMultimap.create(); + mmap.put("disk_free", Collections.singletonList(new Function(Function.ReadFunction.SUM, null))); + TimelineMetrics timelineMetrics = hdb.getAggregateMetricRecords(condition, mmap); + + //THEN + assertEquals(1, timelineMetrics.getMetrics().size()); + TimelineMetric metric = timelineMetrics.getMetrics().get(0); + + assertEquals("disk_free._sum", metric.getMetricName()); + assertEquals(1, metric.getMetricValues().size()); + assertEquals(3, metric.getMetricValues().values().iterator().next().intValue()); + } + + @Test + public void testGetClusterMetricRecordsHours() throws Exception { + // GIVEN + TimelineMetricAggregator agg = + TimelineMetricAggregatorFactory.createTimelineClusterAggregatorHourly(hdb, new Configuration(), metadataManager, null); + + long startTime = System.currentTimeMillis(); + long ctime = startTime; + long minute = 60 * 1000; + + Map<TimelineClusterMetric, MetricClusterAggregate> records = + new HashMap<TimelineClusterMetric, MetricClusterAggregate>(); + + records.put(MetricTestHelper.createEmptyTimelineClusterMetric(ctime), + new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0)); + records.put(MetricTestHelper.createEmptyTimelineClusterMetric(ctime += minute), + new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0)); + records.put(MetricTestHelper.createEmptyTimelineClusterMetric(ctime += minute), + new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0)); + records.put(MetricTestHelper.createEmptyTimelineClusterMetric(ctime += minute), + new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0)); + + hdb.saveClusterAggregateRecords(records); + boolean success = agg.doWork(startTime, ctime + minute); + 
assertTrue(success); + + // WHEN + List<byte[]> uuids = metadataManager.getUuids(new ArrayList<String>() {{ add("disk_used"); }}, + null, + "test_app", + "instance_id"); + + Condition condition = new DefaultCondition( uuids, + new ArrayList<String>() {{ add("disk_used"); }}, + null, + "test_app", + "instance_id", + startTime, + ctime + minute + 1000, + Precision.HOURS, + null, + true); + TimelineMetrics timelineMetrics = hdb.getAggregateMetricRecords(condition, + singletonValueFunctionMap("disk_used")); + + // THEN + assertEquals(1, timelineMetrics.getMetrics().size()); + TimelineMetric metric = timelineMetrics.getMetrics().get(0); + + assertEquals("disk_used", metric.getMetricName()); + assertEquals("test_app", metric.getAppId()); + assertEquals(1, metric.getMetricValues().size()); + assertEquals(2.0, metric.getMetricValues().values().iterator().next(), 0.00001); + } + + @Test + public void testInitPoliciesAndTTL() throws Exception { + Admin hBaseAdmin = hdb.getHBaseAdmin(); + int precisionTtl = 2 * 86400; + + Field f = PhoenixHBaseAccessor.class.getDeclaredField("tableTTL"); + f.setAccessible(true); + Map<String, Integer> precisionValues = (Map<String, Integer>) f.get(hdb); + precisionValues.put(METRICS_RECORD_TABLE_NAME, precisionTtl); + f.set(hdb, precisionValues); + + Field f2 = PhoenixHBaseAccessor.class.getDeclaredField("timelineMetricsTablesDurability"); + f2.setAccessible(true); + f2.set(hdb, "ASYNC_WAL"); + + hdb.initPoliciesAndTTL(); + + // Verify expected policies are set + boolean normalizerEnabled = false; + String precisionTableCompactionPolicy = null; + String aggregateTableCompactionPolicy = null; + boolean tableDurabilitySet = false; + for (int i = 0; i < 10; i++) { + LOG.warn("Policy check retry : " + i); + for (String tableName : PHOENIX_TABLES) { + TableName[] tableNames = hBaseAdmin.listTableNames(PHOENIX_TABLES_REGEX_PATTERN_STRING, false); + Optional<TableName> tableNameOptional = Arrays.stream(tableNames) + .filter(t -> 
tableName.equals(t.getNameAsString())).findFirst(); + + TableDescriptor tableDescriptor = hBaseAdmin.getTableDescriptor(tableNameOptional.get()); + + normalizerEnabled = tableDescriptor.isNormalizationEnabled(); + tableDurabilitySet = (Durability.ASYNC_WAL.equals(tableDescriptor.getDurability())); + if (tableName.equals(METRICS_RECORD_TABLE_NAME)) { + precisionTableCompactionPolicy = tableDescriptor.getValue(HSTORE_COMPACTION_CLASS_KEY); + } else { + aggregateTableCompactionPolicy = tableDescriptor.getValue(HSTORE_ENGINE_CLASS); + } + LOG.debug("Table: " + tableName + ", normalizerEnabled = " + normalizerEnabled); + // Best effort for 20 seconds + if (normalizerEnabled || (precisionTableCompactionPolicy == null && aggregateTableCompactionPolicy == null)) { + Thread.sleep(2000l); + } + if (tableName.equals(METRICS_RECORD_TABLE_NAME)) { + for (ColumnFamilyDescriptor family : tableDescriptor.getColumnFamilies()) { + precisionTtl = family.getTimeToLive(); + } + } + } + } + + Assert.assertFalse("Normalizer disabled.", normalizerEnabled); + Assert.assertTrue("Durability Set.", tableDurabilitySet); + Assert.assertEquals("FIFO compaction policy is set for METRIC_RECORD.", FIFO_COMPACTION_POLICY_CLASS, precisionTableCompactionPolicy); + Assert.assertEquals("FIFO compaction policy is set for aggregate tables", DATE_TIERED_COMPACTION_POLICY, aggregateTableCompactionPolicy); + Assert.assertEquals("Precision TTL value as expected.", 86400, precisionTtl); + + hBaseAdmin.close(); + } + + private Multimap<String, List<Function>> singletonValueFunctionMap(String metricName) { + Multimap<String, List<Function>> mmap = ArrayListMultimap.create(); + mmap.put(metricName, Collections.singletonList(new Function())); + return mmap; + } + + @Test + public void testInsertContainerMetrics() throws Exception { + ContainerMetric metric = new ContainerMetric(); + metric.setContainerId("container_1450744875949_0001_01_000001"); + metric.setHostName("host1"); + metric.setPmemLimit(2048); + 
metric.setVmemLimit(2048); + metric.setPmemUsedAvg(1024); + metric.setPmemUsedMin(1024); + metric.setPmemUsedMax(1024); + metric.setLaunchDuration(2000); + metric.setLocalizationDuration(3000); + long startTime = System.currentTimeMillis(); + long finishTime = startTime + 5000; + metric.setStartTime(startTime); + metric.setFinishTime(finishTime); + metric.setExitCode(0); + List<ContainerMetric> list = Arrays.asList(metric); + hdb.insertContainerMetrics(list); + PreparedStatement stmt = conn.prepareStatement("SELECT * FROM CONTAINER_METRICS"); + ResultSet set = stmt.executeQuery(); + // check each field is set properly when read back. + boolean foundRecord = false; + while (set.next()) { + assertEquals("application_1450744875949_0001", set.getString("APP_ID")); + assertEquals("container_1450744875949_0001_01_000001", set.getString("CONTAINER_ID")); + assertEquals(new java.sql.Timestamp(startTime), set.getTimestamp("START_TIME")); + assertEquals(new java.sql.Timestamp(finishTime), set.getTimestamp("FINISH_TIME")); + assertEquals(5000, set.getLong("DURATION")); + assertEquals("host1", set.getString("HOSTNAME")); + assertEquals(0, set.getInt("EXIT_CODE")); + assertEquals(3000, set.getLong("LOCALIZATION_DURATION")); + assertEquals(2000, set.getLong("LAUNCH_DURATION")); + assertEquals((double)2, set.getDouble("MEM_REQUESTED_GB")); + assertEquals((double)2 * 5000, set.getDouble("MEM_REQUESTED_GB_MILLIS")); + assertEquals((double)2, set.getDouble("MEM_VIRTUAL_GB")); + assertEquals((double)1, set.getDouble("MEM_USED_GB_MIN")); + assertEquals((double)1, set.getDouble("MEM_USED_GB_MAX")); + assertEquals((double)1, set.getDouble("MEM_USED_GB_AVG")); + assertEquals((double)(2 - 1), set.getDouble("MEM_UNUSED_GB")); + assertEquals((double)(2-1) * 5000, set.getDouble("MEM_UNUSED_GB_MILLIS")); + foundRecord = true; + } + assertTrue(foundRecord); + } +} 
http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/MetricTestHelper.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/MetricTestHelper.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/MetricTestHelper.java new file mode 100644 index 0000000..a99d488 --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/MetricTestHelper.java @@ -0,0 +1,119 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.ambari.metrics.core.timeline; + +import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; +import org.apache.ambari.metrics.core.timeline.aggregators.TimelineClusterMetric; + +import java.util.Arrays; +import java.util.TreeMap; + +public class MetricTestHelper { + + public static MetricHostAggregate createMetricHostAggregate(double max, double min, int numberOfSamples, double sum) { + MetricHostAggregate expectedAggregate = new MetricHostAggregate(); + expectedAggregate.setMax(max); + expectedAggregate.setMin(min); + expectedAggregate.setNumberOfSamples(numberOfSamples); + expectedAggregate.setSum(sum); + + return expectedAggregate; + } + + public static TimelineMetrics prepareSingleTimelineMetric(long startTime, + String host, + String metricName, + double val) { + return prepareSingleTimelineMetric(startTime, host, null, metricName, val); + } + + public static TimelineMetrics prepareSingleTimelineMetric(long startTime, + String host, + String instanceId, + String metricName, + double val) { + TimelineMetrics m = new TimelineMetrics(); + m.setMetrics(Arrays.asList( + createTimelineMetric(startTime, metricName, host, null, instanceId, val))); + + return m; + } + + public static TimelineMetrics prepareSingleTimelineMetric(long startTime, + String host, + String appId, + String instanceId, + String metricName, + double val) { + TimelineMetrics m = new TimelineMetrics(); + m.setMetrics(Arrays.asList( + createTimelineMetric(startTime, metricName, host, appId, instanceId, val))); + + return m; + } + + + public static TimelineMetric createTimelineMetric(long startTime, + String metricName, + String host, + String appId, + String instanceId, + double val) { + TimelineMetric m = new TimelineMetric(); + m.setHostName(host); + m.setAppId(appId != null ? 
appId : "host"); + m.setInstanceId(instanceId); + m.setMetricName(metricName); + m.setStartTime(startTime); + TreeMap<Long, Double> vals = new TreeMap<Long, Double>(); + vals.put(startTime + 15000l, val); + vals.put(startTime + 30000l, val); + vals.put(startTime + 45000l, val); + vals.put(startTime + 60000l, val); + + m.setMetricValues(vals); + + return m; + } + + public static TimelineMetric createEmptyTimelineMetric(long startTime) { + TimelineMetric metric = new TimelineMetric(); + metric.setMetricName("disk_used"); + metric.setAppId("test_app"); + metric.setInstanceId("test_instance"); + metric.setHostName("test_host"); + metric.setStartTime(startTime); + + return metric; + } + + public static TimelineClusterMetric createEmptyTimelineClusterMetric( + String name, long startTime) { + TimelineClusterMetric metric = new TimelineClusterMetric(name, + "test_app", "instance_id", startTime); + + return metric; + } + + public static TimelineClusterMetric createEmptyTimelineClusterMetric( + long startTime) { + return createEmptyTimelineClusterMetric("disk_used", startTime); + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/PhoenixHBaseAccessorTest.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/PhoenixHBaseAccessorTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/PhoenixHBaseAccessorTest.java new file mode 100644 index 0000000..8c31691 --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/PhoenixHBaseAccessorTest.java @@ -0,0 +1,291 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.ambari.metrics.core.timeline;

import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate;
import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
import org.apache.hadoop.metrics2.sink.timeline.Precision;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
import org.apache.ambari.metrics.core.timeline.aggregators.TimelineClusterMetric;
import org.apache.ambari.metrics.core.timeline.aggregators.Function;
import org.apache.ambari.metrics.core.timeline.discovery.TimelineMetricMetadataManager;
import org.apache.ambari.metrics.core.timeline.query.Condition;
import org.apache.ambari.metrics.core.timeline.query.DefaultCondition;
import org.apache.ambari.metrics.core.timeline.query.PhoenixConnectionProvider;
import org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL;
import org.apache.phoenix.exception.PhoenixIOException;
import org.easymock.EasyMock;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;

import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

import static org.easymock.EasyMock.anyObject;
import static org.easymock.EasyMock.expect;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import static org.powermock.api.easymock.PowerMock.*;

/**
 * Unit tests for {@link PhoenixHBaseAccessor}. PowerMock intercepts the static
 * SQL-builder calls in {@link PhoenixTransactSQL} and the singleton lookup in
 * {@link TimelineMetricConfiguration} so no Phoenix/HBase cluster is needed.
 */
@RunWith(PowerMockRunner.class)
@PrepareForTest({PhoenixTransactSQL.class, TimelineMetricConfiguration.class})
public class PhoenixHBaseAccessorTest {
  private static final String ZOOKEEPER_QUORUM = "hbase.zookeeper.quorum";

  PhoenixConnectionProvider connectionProvider;
  PhoenixHBaseAccessor accessor;

  /**
   * Builds the accessor under test against a stubbed configuration
   * (cache size 1, memory aggregator sink) and a connection provider that
   * returns nulls — individual tests mock the statements they need.
   */
  @Before
  public void setupConf() throws Exception {
    Configuration hbaseConf = new Configuration();
    hbaseConf.setStrings(ZOOKEEPER_QUORUM, "quorum");
    Configuration metricsConf = new Configuration();
    // Cache size of 1 so the commit path is exercised by a handful of inserts.
    metricsConf.setStrings(TimelineMetricConfiguration.TIMELINE_METRICS_CACHE_SIZE, "1");
    metricsConf.setStrings(TimelineMetricConfiguration.TIMELINE_METRICS_CACHE_COMMIT_INTERVAL, "100");
    metricsConf.setStrings(
        TimelineMetricConfiguration.TIMELINE_METRIC_AGGREGATOR_SINK_CLASS,
        "org.apache.ambari.metrics.core.timeline.TimelineMetricsAggregatorMemorySink");

    // The accessor resolves its configuration through the static singleton,
    // so redirect getInstance() to our test configuration.
    TimelineMetricConfiguration conf = new TimelineMetricConfiguration(hbaseConf, metricsConf);
    mockStatic(TimelineMetricConfiguration.class);
    expect(TimelineMetricConfiguration.getInstance()).andReturn(conf).anyTimes();
    replayAll();

    connectionProvider = new PhoenixConnectionProvider() {
      @Override
      public HBaseAdmin getHBaseAdmin() throws IOException {
        return null;
      }

      @Override
      public Connection getConnection() throws SQLException {
        return null;
      }

    };

    accessor = new PhoenixHBaseAccessor(connectionProvider);
  }

  /**
   * getMetricRecords issues the prepared statement when startTime < endTime,
   * and short-circuits to an empty result when the time range is inverted.
   */
  @Test
  public void testGetMetricRecords() throws SQLException, IOException {
    List<String> metricNames = new LinkedList<>();
    List<String> hostnames = new LinkedList<>();
    Multimap<String, List<Function>> metricFunctions = ArrayListMultimap.create();

    mockStatic(PhoenixTransactSQL.class);
    PreparedStatement preparedStatementMock = EasyMock.createNiceMock(PreparedStatement.class);
    Condition condition = new DefaultCondition(metricNames, hostnames, "appid", "instanceid", 123L, 234L, Precision.SECONDS, 10, true);
    expect(PhoenixTransactSQL.prepareGetMetricsSqlStmt(null, condition)).andReturn(preparedStatementMock).once();
    ResultSet rsMock = EasyMock.createNiceMock(ResultSet.class);
    expect(preparedStatementMock.executeQuery()).andReturn(rsMock);


    replayAll();
    EasyMock.replay(preparedStatementMock, rsMock);

    // Check when startTime < endTime
    TimelineMetrics tml = accessor.getMetricRecords(condition, metricFunctions);

    // Check when startTime > endTime
    Condition condition2 = new DefaultCondition(metricNames, hostnames, "appid", "instanceid", 234L, 123L, Precision.SECONDS, 10, true);
    TimelineMetrics tml2 = accessor.getMetricRecords(condition2, metricFunctions);
    assertEquals(0, tml2.getMetrics().size());

    verifyAll();
    EasyMock.verify(preparedStatementMock, rsMock);
  }

  /**
   * A RuntimeException whose cause is an IOException originating from
   * "TimeRange" is swallowed and an empty result is returned.
   */
  @Test
  public void testGetMetricRecordsIOException() throws SQLException, IOException {
    List<String> metricNames = new LinkedList<>();
    List<String> hostnames = new LinkedList<>();
    Multimap<String, List<Function>> metricFunctions = ArrayListMultimap.create();

    mockStatic(PhoenixTransactSQL.class);
    PreparedStatement preparedStatementMock = EasyMock.createNiceMock(PreparedStatement.class);
    Condition condition = new DefaultCondition(metricNames, hostnames, "appid", "instanceid", 123L, 234L, Precision.SECONDS, 10, true);
    expect(PhoenixTransactSQL.prepareGetMetricsSqlStmt(null, condition)).andReturn(preparedStatementMock).once();
    ResultSet rsMock = EasyMock.createNiceMock(ResultSet.class);
    RuntimeException runtimeException = EasyMock.createNiceMock(RuntimeException.class);
    IOException io = EasyMock.createNiceMock(IOException.class);
    expect(preparedStatementMock.executeQuery()).andThrow(runtimeException);
    expect(runtimeException.getCause()).andReturn(io).atLeastOnce();
    // The accessor inspects the cause's stack trace; "TimeRange" marks the
    // recoverable path — presumably matched by class name, verify in accessor.
    StackTraceElement stackTrace[] = new StackTraceElement[]{new StackTraceElement("TimeRange","method","file",1)};
    expect(io.getStackTrace()).andReturn(stackTrace).atLeastOnce();


    replayAll();
    EasyMock.replay(preparedStatementMock, rsMock, io, runtimeException);

    TimelineMetrics tml = accessor.getMetricRecords(condition, metricFunctions);

    assertEquals(0, tml.getMetrics().size());

    verifyAll();
    EasyMock.verify(preparedStatementMock, rsMock, io, runtimeException);
  }

  /**
   * A PhoenixIOException chain ending in a DoNotRetryIOException raised from
   * "HashJoinRegionScanner" must propagate (the query is not retried).
   */
  @Test
  public void testGetMetricRecordsPhoenixIOExceptionDoNotRetryException() throws SQLException, IOException {
    List<String> metricNames = new LinkedList<>();
    List<String> hostnames = new LinkedList<>();
    Multimap<String, List<Function>> metricFunctions = ArrayListMultimap.create();

    mockStatic(PhoenixTransactSQL.class);
    PreparedStatement preparedStatementMock = EasyMock.createNiceMock(PreparedStatement.class);
    // Null start/end time routes through the "latest metric" statement.
    Condition condition = new DefaultCondition(metricNames, hostnames, "appid", "instanceid", null, null, Precision.SECONDS, 10, true);
    expect(PhoenixTransactSQL.prepareGetLatestMetricSqlStmt(null, condition)).andReturn(preparedStatementMock).once();
    PhoenixTransactSQL.setSortMergeJoinEnabled(true);
    EasyMock.expectLastCall();
    ResultSet rsMock = EasyMock.createNiceMock(ResultSet.class);
    PhoenixIOException pioe1 = EasyMock.createNiceMock(PhoenixIOException.class);
    PhoenixIOException pioe2 = EasyMock.createNiceMock(PhoenixIOException.class);
    DoNotRetryIOException dnrioe = EasyMock.createNiceMock(DoNotRetryIOException.class);
    expect(preparedStatementMock.executeQuery()).andThrow(pioe1);
    expect(pioe1.getCause()).andReturn(pioe2).atLeastOnce();
    expect(pioe2.getCause()).andReturn(dnrioe).atLeastOnce();
    StackTraceElement stackTrace[] = new StackTraceElement[]{new StackTraceElement("HashJoinRegionScanner","method","file",1)};
    expect(dnrioe.getStackTrace()).andReturn(stackTrace).atLeastOnce();


    replayAll();
    EasyMock.replay(preparedStatementMock, rsMock, pioe1, pioe2, dnrioe);
    try {
      accessor.getMetricRecords(condition, metricFunctions);
      fail();
    } catch (Exception e) {
      //NOP
    }
    verifyAll();
  }

  /**
   * With cache size 1, repeated inserts must flush through commitMetrics —
   * the stubbed connection expects exactly one commit().
   * NOTE(review): exact flush cadence depends on the accessor's cache
   * implementation; confirm the once() expectation against it.
   */
  @Test
  public void testMetricsCacheCommittingWhenFull() throws IOException, SQLException {
    // NOTE(review): hbaseConf is built but unused in this test.
    Configuration hbaseConf = new Configuration();
    hbaseConf.setStrings(ZOOKEEPER_QUORUM, "quorum");

    final Connection connection = EasyMock.createNiceMock(Connection.class);

    accessor = new PhoenixHBaseAccessor(connectionProvider) {
      @Override
      public void commitMetrics(Collection<TimelineMetrics> timelineMetricsCollection) {
        try {
          connection.commit();
        } catch (SQLException e) {
          e.printStackTrace();
        }
      }
    };

    TimelineMetrics timelineMetrics = EasyMock.createNiceMock(TimelineMetrics.class);
    expect(timelineMetrics.getMetrics()).andReturn(Collections.singletonList(new TimelineMetric())).anyTimes();
    connection.commit();
    EasyMock.expectLastCall().once();

    EasyMock.replay(timelineMetrics, connection);

    accessor.insertMetricRecords(timelineMetrics);
    accessor.insertMetricRecords(timelineMetrics);
    accessor.insertMetricRecords(timelineMetrics);

    EasyMock.verify(timelineMetrics, connection);
  }

  /**
   * Saving cluster/host aggregates must also forward one record of each kind
   * to the configured aggregator sink.
   * NOTE(review): asserting on a freshly constructed memory sink implies the
   * sink shares state across instances (presumably static storage) — confirm
   * against TimelineMetricsAggregatorMemorySink.
   */
  @Test
  public void testMetricsAggregatorSink() throws IOException, SQLException {
    Map<TimelineClusterMetric, MetricClusterAggregate> clusterAggregateMap =
        new HashMap<>();
    Map<TimelineClusterMetric, MetricHostAggregate> clusterTimeAggregateMap =
        new HashMap<>();
    Map<TimelineMetric, MetricHostAggregate> hostAggregateMap = new HashMap<>();


    final Connection connection = EasyMock.createNiceMock(Connection.class);
    final PreparedStatement statement = EasyMock.createNiceMock(PreparedStatement.class);
    expect(connection.prepareStatement(EasyMock.anyString())).andReturn(statement).anyTimes();
    EasyMock.replay(statement);
    EasyMock.replay(connection);

    connectionProvider = new PhoenixConnectionProvider() {

      @Override
      public HBaseAdmin getHBaseAdmin() throws IOException {
        return null;
      }

      @Override
      public Connection getConnection() throws SQLException {
        return connection;
      }
    };

    accessor = new PhoenixHBaseAccessor(connectionProvider);

    TimelineClusterMetric clusterMetric =
        new TimelineClusterMetric("metricName", "appId", "instanceId",
            System.currentTimeMillis());
    TimelineMetric timelineMetric = new TimelineMetric();
    timelineMetric.setMetricName("Metric1");
    timelineMetric.setType("type1");
    timelineMetric.setAppId("App1");
    timelineMetric.setInstanceId("instance1");
    timelineMetric.setHostName("host1");

    clusterAggregateMap.put(clusterMetric, new MetricClusterAggregate());
    clusterTimeAggregateMap.put(clusterMetric, new MetricHostAggregate());
    hostAggregateMap.put(timelineMetric, new MetricHostAggregate());

    // UUID lookups: the cluster metric is resolved twice (cluster + cluster
    // time aggregates), the host metric once.
    TimelineMetricMetadataManager metricMetadataManagerMock = EasyMock.createMock(TimelineMetricMetadataManager.class);
    expect(metricMetadataManagerMock.getUuid(anyObject(TimelineClusterMetric.class))).andReturn(new byte[16]).times(2);
    expect(metricMetadataManagerMock.getUuid(anyObject(TimelineMetric.class))).andReturn(new byte[20]).once();
    replay(metricMetadataManagerMock);

    accessor.setMetadataInstance(metricMetadataManagerMock);
    accessor.saveClusterAggregateRecords(clusterAggregateMap);
    accessor.saveHostAggregateRecords(hostAggregateMap,
        PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_TABLE_NAME);
    accessor.saveClusterAggregateRecordsSecond(clusterTimeAggregateMap,
        PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME);

    TimelineMetricsAggregatorMemorySink memorySink =
        new TimelineMetricsAggregatorMemorySink();
    assertEquals(1, memorySink.getClusterAggregateRecords().size());
    assertEquals(1, memorySink.getClusterTimeAggregateRecords().size());
    assertEquals(1, memorySink.getHostAggregateRecords().size());
  }

}
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.ambari.metrics.core.timeline;


import org.apache.ambari.metrics.core.timeline.aggregators.ITClusterAggregator;
import org.apache.ambari.metrics.core.timeline.aggregators.ITMetricAggregator;
import org.junit.Ignore;
import org.junit.runner.RunWith;
import org.junit.runners.Suite;

import static org.junit.runners.Suite.SuiteClasses;

/**
 * JUnit 4 suite bundling the aggregator and Phoenix accessor integration
 * tests. {@code @Ignore} keeps the suite out of the default test run;
 * presumably it is meant to be invoked explicitly (integration profile) —
 * confirm against the build configuration.
 */
@Ignore
@RunWith(Suite.class)
@SuiteClasses({ITMetricAggregator.class, ITClusterAggregator.class, ITPhoenixHBaseAccessor.class})
public class TestClusterSuite {

}
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ambari.metrics.core.timeline; + +import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate; +import org.junit.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +public class TestMetricHostAggregate { + + @Test + public void testCreateAggregate() throws Exception { + // given + MetricHostAggregate aggregate = createAggregate(3.0, 1.0, 2.0, 2); + + //then + assertThat(aggregate.getSum()).isEqualTo(3.0); + assertThat(aggregate.getMin()).isEqualTo(1.0); + assertThat(aggregate.getMax()).isEqualTo(2.0); + assertThat(aggregate.calculateAverage()).isEqualTo(3.0 / 2); + } + + @Test + public void testUpdateAggregates() throws Exception { + // given + MetricHostAggregate aggregate = createAggregate(3.0, 1.0, 2.0, 2); + + //when + aggregate.updateAggregates(createAggregate(8.0, 0.5, 7.5, 2)); + aggregate.updateAggregates(createAggregate(1.0, 1.0, 1.0, 1)); + + //then + assertThat(aggregate.getSum()).isEqualTo(12.0); + assertThat(aggregate.getMin()).isEqualTo(0.5); + assertThat(aggregate.getMax()).isEqualTo(7.5); + assertThat(aggregate.calculateAverage()).isEqualTo((3.0 + 8.0 + 1.0) / 5); + } + + static MetricHostAggregate createAggregate (Double sum, Double min, + Double max, Integer samplesCount) { + MetricHostAggregate aggregate = new MetricHostAggregate(); + aggregate.setSum(sum); + aggregate.setMax(max); + aggregate.setMin(min); + aggregate.setDeviation(0.0); + aggregate.setNumberOfSamples(samplesCount); + return aggregate; + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/TestPhoenixTransactSQL.java ---------------------------------------------------------------------- diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/TestPhoenixTransactSQL.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/TestPhoenixTransactSQL.java new file mode 100644 index 0000000..656034b --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/TestPhoenixTransactSQL.java @@ -0,0 +1,608 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.ambari.metrics.core.timeline; + +import org.apache.hadoop.metrics2.sink.timeline.Precision; +import org.apache.hadoop.metrics2.sink.timeline.PrecisionLimitExceededException; +import org.apache.ambari.metrics.core.timeline.query.Condition; +import org.apache.ambari.metrics.core.timeline.query.DefaultCondition; +import org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL; +import org.apache.ambari.metrics.core.timeline.query.SplitByMetricNamesCondition; +import org.apache.ambari.metrics.core.timeline.query.TopNCondition; +import org.easymock.Capture; +import org.junit.Assert; +import org.junit.Ignore; +import org.junit.Test; +import java.sql.Connection; +import java.sql.ParameterMetaData; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import static org.easymock.EasyMock.createNiceMock; +import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.replay; +import static org.easymock.EasyMock.reset; +import static org.easymock.EasyMock.verify; +import org.easymock.EasyMock; + +public class TestPhoenixTransactSQL { + @Test + public void testConditionClause() throws Exception { + Condition condition = new DefaultCondition(Arrays.asList(new byte[8], new byte[8]), + new ArrayList<>(Arrays.asList("cpu_user", "mem_free")), Collections.singletonList("h1"), + "a1", "i1", 1407959718L, 1407959918L, null, null, false); + + String preparedClause = condition.getConditionClause().toString(); + String expectedClause = "(UUID IN (?, ?)) AND SERVER_TIME >= ? 
AND SERVER_TIME < ?"; + + Assert.assertNotNull(preparedClause); + Assert.assertEquals(expectedClause, preparedClause); + } + + @Test + public void testSplitByMetricNamesCondition() throws Exception { + Condition c = new DefaultCondition(Arrays.asList(new byte[8], new byte[8]), + Arrays.asList("cpu_user", "mem_free"), Collections.singletonList("h1"), + "a1", "i1", 1407959718L, 1407959918L, null, null, false); + + SplitByMetricNamesCondition condition = new SplitByMetricNamesCondition(c); + condition.setCurrentUuid(new byte[8]); + + String preparedClause = condition.getConditionClause().toString(); + String expectedClause = "UUID = ? AND SERVER_TIME >= ? AND SERVER_TIME < ?"; + + Assert.assertNotNull(preparedClause); + Assert.assertEquals(expectedClause, preparedClause); + } + + @Ignore + @Test + public void testLikeConditionClause() throws Exception { + Condition condition = new DefaultCondition( + new ArrayList<>(Arrays.asList("cpu_user", "some=%.metric")), + Collections.singletonList("h1"), "a1", "i1", 1407959718L, 1407959918L, + null, null, false); + + String preparedClause = condition.getConditionClause().toString(); + String expectedClause = "(METRIC_NAME IN (?) OR METRIC_NAME LIKE ?) AND HOSTNAME = ? AND " + + "APP_ID = ? AND INSTANCE_ID = ? AND SERVER_TIME >= ? AND SERVER_TIME < ?"; + + Assert.assertNotNull(preparedClause); + Assert.assertEquals(expectedClause, preparedClause); + + + condition = new DefaultCondition( + Collections.<String>emptyList(), Collections.singletonList("h1"), "a1", "i1", + 1407959718L, 1407959918L, null, null, false); + + preparedClause = condition.getConditionClause().toString(); + expectedClause = " HOSTNAME = ? AND " + + "APP_ID = ? AND INSTANCE_ID = ? AND SERVER_TIME >= ? 
AND SERVER_TIME < ?"; + + Assert.assertNotNull(preparedClause); + Assert.assertEquals(expectedClause, preparedClause); + + + condition = new DefaultCondition( + null, Collections.singletonList("h1"), "a1", "i1", + 1407959718L, 1407959918L, null, null, false); + + preparedClause = condition.getConditionClause().toString(); + expectedClause = " HOSTNAME = ? AND " + + "APP_ID = ? AND INSTANCE_ID = ? AND SERVER_TIME >= ? AND SERVER_TIME < ?"; + + Assert.assertNotNull(preparedClause); + Assert.assertEquals(expectedClause, preparedClause); + + + condition = new DefaultCondition( + new ArrayList<>(Arrays.asList("some=%.metric")), Collections.singletonList("h1"), "a1", "i1", + 1407959718L, 1407959918L, null, null, false); + + preparedClause = condition.getConditionClause().toString(); + expectedClause = "(METRIC_NAME LIKE ?) AND HOSTNAME = ? AND " + + "APP_ID = ? AND INSTANCE_ID = ? AND SERVER_TIME >= ? AND SERVER_TIME < ?"; + + Assert.assertNotNull(preparedClause); + Assert.assertEquals(expectedClause, preparedClause); + + + condition = new DefaultCondition( + new ArrayList<>(Arrays.asList("some=%.metric1", "some=%.metric2", "some=%.metric3")), + Collections.singletonList("h1"), "a1", "i1", + 1407959718L, 1407959918L, null, null, false); + + preparedClause = condition.getConditionClause().toString(); + expectedClause = "(METRIC_NAME LIKE ? OR METRIC_NAME LIKE ? OR METRIC_NAME LIKE ?) AND HOSTNAME = ? AND " + + "APP_ID = ? AND INSTANCE_ID = ? AND SERVER_TIME >= ? 
AND SERVER_TIME < ?"; + + Assert.assertNotNull(preparedClause); + Assert.assertEquals(expectedClause, preparedClause); + } + + @Test + public void testPrepareGetAggregatePrecisionMINUTES() throws SQLException { + Condition condition = new DefaultCondition( + new ArrayList<>(Arrays.asList("cpu_user", "mem_free")), Collections.singletonList("h1"), + "a1", "i1", 1407959718L, 1407959918L, Precision.MINUTES, null, false); + Connection connection = createNiceMock(Connection.class); + PreparedStatement preparedStatement = createNiceMock(PreparedStatement.class); + Capture<String> stmtCapture = new Capture<String>(); + expect(connection.prepareStatement(EasyMock.and(EasyMock.anyString(), EasyMock.capture(stmtCapture)))) + .andReturn(preparedStatement); + + replay(connection, preparedStatement); + PhoenixTransactSQL.prepareGetAggregateSqlStmt(connection, condition); + String stmt = stmtCapture.getValue(); + Assert.assertTrue(stmt.contains("FROM METRIC_AGGREGATE")); + verify(connection, preparedStatement); + } + + @Test + public void testPrepareGetAggregateNoPrecision() throws SQLException { + Long endTime = 1407959918L; + Long startTime = 1407959718L; + //SECONDS precision + Condition condition = new DefaultCondition( + new ArrayList<>(Arrays.asList("cpu_user", "mem_free")), Collections.singletonList("h1"), + "a1", "i1", startTime, endTime, null, null, false); + Connection connection = createNiceMock(Connection.class); + PreparedStatement preparedStatement = createNiceMock(PreparedStatement.class); + Capture<String> stmtCapture = new Capture<String>(); + expect(connection.prepareStatement(EasyMock.and(EasyMock.anyString(), EasyMock.capture(stmtCapture)))) + .andReturn(preparedStatement); + + replay(connection, preparedStatement); + PhoenixTransactSQL.prepareGetAggregateSqlStmt(connection, condition); + String stmt = stmtCapture.getValue(); + Assert.assertTrue(stmt.contains("FROM METRIC_AGGREGATE")); + Assert.assertEquals(Precision.SECONDS, condition.getPrecision()); + 
verify(connection, preparedStatement); + + // MINUTES precision + startTime = endTime-PhoenixTransactSQL.DAY/1000; + condition = new DefaultCondition( + new ArrayList<>(Arrays.asList("cpu_user", "mem_free")), Collections.singletonList("h1"), + "a1", "i1", startTime, endTime, null, null, false); + connection = createNiceMock(Connection.class); + preparedStatement = createNiceMock(PreparedStatement.class); + stmtCapture = new Capture<String>(); + expect(connection.prepareStatement(EasyMock.and(EasyMock.anyString(), EasyMock.capture(stmtCapture)))) + .andReturn(preparedStatement); + + replay(connection, preparedStatement); + PhoenixTransactSQL.prepareGetAggregateSqlStmt(connection, condition); + stmt = stmtCapture.getValue(); + Assert.assertTrue(stmt.contains("FROM METRIC_AGGREGATE")); + Assert.assertEquals(Precision.MINUTES, condition.getPrecision()); + verify(connection, preparedStatement); + + // HOURS precision + startTime = endTime-PhoenixTransactSQL.DAY*30/1000; + condition = new DefaultCondition( + new ArrayList<>(Arrays.asList("cpu_user", "mem_free")), Collections.singletonList("h1"), + "a1", "i1", startTime, endTime, null, null, false); + connection = createNiceMock(Connection.class); + preparedStatement = createNiceMock(PreparedStatement.class); + stmtCapture = new Capture<String>(); + expect(connection.prepareStatement(EasyMock.and(EasyMock.anyString(), EasyMock.capture(stmtCapture)))) + .andReturn(preparedStatement); + + replay(connection, preparedStatement); + PhoenixTransactSQL.prepareGetAggregateSqlStmt(connection, condition); + stmt = stmtCapture.getValue(); + Assert.assertTrue(stmt.contains("FROM METRIC_AGGREGATE_HOURLY")); + Assert.assertEquals(Precision.HOURS, condition.getPrecision()); + verify(connection, preparedStatement); + + // DAYS precision + startTime = endTime-PhoenixTransactSQL.DAY*30*2/1000; + condition = new DefaultCondition( + new ArrayList<>(Arrays.asList("cpu_user", "mem_free")), Collections.singletonList("h1"), + "a1", "i1", 
startTime, endTime, null, null, false); + connection = createNiceMock(Connection.class); + preparedStatement = createNiceMock(PreparedStatement.class); + stmtCapture = new Capture<String>(); + expect(connection.prepareStatement(EasyMock.and(EasyMock.anyString(), EasyMock.capture(stmtCapture)))) + .andReturn(preparedStatement); + + replay(connection, preparedStatement); + PhoenixTransactSQL.prepareGetAggregateSqlStmt(connection, condition); + stmt = stmtCapture.getValue(); + Assert.assertTrue(stmt.contains("FROM METRIC_AGGREGATE_DAILY")); + Assert.assertEquals(Precision.DAYS, condition.getPrecision()); + verify(connection, preparedStatement); + } + + @Test + public void testPrepareGetAggregatePrecisionHours() throws SQLException { + Condition condition = new DefaultCondition( + new ArrayList<>(Arrays.asList("cpu_user", "mem_free")), Collections.singletonList("h1"), + "a1", "i1", 1407959718L, 1407959918L, Precision.HOURS, null, false); + Connection connection = createNiceMock(Connection.class); + PreparedStatement preparedStatement = createNiceMock(PreparedStatement.class); + Capture<String> stmtCapture = new Capture<String>(); + expect(connection.prepareStatement(EasyMock.and(EasyMock.anyString(), EasyMock.capture(stmtCapture)))) + .andReturn(preparedStatement); + + replay(connection, preparedStatement); + PhoenixTransactSQL.prepareGetAggregateSqlStmt(connection, condition); + String stmt = stmtCapture.getValue(); + Assert.assertTrue(stmt.contains("FROM METRIC_AGGREGATE_HOURLY")); + verify(connection, preparedStatement); + } + + @Test + public void testPrepareGetMetricsPrecisionMinutes() throws SQLException { + Condition condition = new DefaultCondition( + new ArrayList<>(Arrays.asList("cpu_user", "mem_free")), Collections.singletonList("h1"), + "a1", "i1", 1407959718L, 1407959918L, Precision.MINUTES, null, false); + Connection connection = createNiceMock(Connection.class); + PreparedStatement preparedStatement = createNiceMock(PreparedStatement.class); + 
Capture<String> stmtCapture = new Capture<String>(); + expect(connection.prepareStatement(EasyMock.and(EasyMock.anyString(), EasyMock.capture(stmtCapture)))) + .andReturn(preparedStatement); + + replay(connection, preparedStatement); + PhoenixTransactSQL.prepareGetMetricsSqlStmt(connection, condition); + String stmt = stmtCapture.getValue(); + Assert.assertTrue(stmt.contains("FROM METRIC_RECORD_MINUTE")); + verify(connection, preparedStatement); + } + + @Test + public void testPrepareGetMetricsNoPrecision() throws SQLException { + Long endTime = 1407959918L; + Long startTime = endTime - 200; + // SECONDS precision + Condition condition = new DefaultCondition( + new ArrayList<>(Arrays.asList("cpu_user", "mem_free")), Collections.singletonList("h1"), + "a1", "i1", startTime, endTime, null, null, false); + Connection connection = createNiceMock(Connection.class); + PreparedStatement preparedStatement = createNiceMock(PreparedStatement.class); + Capture<String> stmtCapture = new Capture<String>(); + expect(connection.prepareStatement(EasyMock.and(EasyMock.anyString(), EasyMock.capture(stmtCapture)))) + .andReturn(preparedStatement); + + replay(connection, preparedStatement); + PhoenixTransactSQL.prepareGetMetricsSqlStmt(connection, condition); + String stmt = stmtCapture.getValue(); + Assert.assertTrue(stmt.contains("FROM METRIC_RECORD")); + Assert.assertEquals(Precision.SECONDS, condition.getPrecision()); + verify(connection, preparedStatement); + reset(connection, preparedStatement); + + // SECONDS precision + startTime = endTime-PhoenixTransactSQL.HOUR*2/1000; + condition = new DefaultCondition( + new ArrayList<>(Arrays.asList("cpu_user", "mem_free")), Collections.singletonList("h1"), + "a1", "i1", startTime, endTime, null, null, false); + connection = createNiceMock(Connection.class); + preparedStatement = createNiceMock(PreparedStatement.class); + stmtCapture = new Capture<String>(); + expect(connection.prepareStatement(EasyMock.and(EasyMock.anyString(), 
EasyMock.capture(stmtCapture))))
+ .andReturn(preparedStatement);
+ replay(connection, preparedStatement);
+ PhoenixTransactSQL.prepareGetMetricsSqlStmt(connection, condition);
+ stmt = stmtCapture.getValue();
+ Assert.assertTrue(stmt.contains("FROM METRIC_RECORD"));
+ Assert.assertEquals(Precision.SECONDS, condition.getPrecision());
+ verify(connection, preparedStatement);
+
// NOTE(review): endTime minus DAY/1000 suggests second-based timestamps while
// DAY is in milliseconds — confirm units against PhoenixTransactSQL.DAY.
+ // MINUTES precision
+ startTime = endTime-PhoenixTransactSQL.DAY/1000;
+ condition = new DefaultCondition(
+ new ArrayList<>(Arrays.asList("cpu_user", "mem_free")), Collections.singletonList("h1"),
+ "a1", "i1", startTime, endTime, null, null, false);
+ connection = createNiceMock(Connection.class);
+ preparedStatement = createNiceMock(PreparedStatement.class);
+ stmtCapture = new Capture<String>();
+ expect(connection.prepareStatement(EasyMock.and(EasyMock.anyString(), EasyMock.capture(stmtCapture))))
+ .andReturn(preparedStatement);
+ replay(connection, preparedStatement);
+ PhoenixTransactSQL.prepareGetMetricsSqlStmt(connection, condition);
+ stmt = stmtCapture.getValue();
// A roughly day-sized range should be served from the MINUTE table.
+ Assert.assertTrue(stmt.contains("FROM METRIC_RECORD_MINUTE"));
+ Assert.assertEquals(Precision.MINUTES, condition.getPrecision());
+ verify(connection, preparedStatement);
+
+ // HOURS precision
+ startTime = endTime-PhoenixTransactSQL.DAY*30/1000;
+ condition = new DefaultCondition(
+ new ArrayList<>(Arrays.asList("cpu_user", "mem_free")), Collections.singletonList("h1"),
+ "a1", "i1", startTime, endTime, null, null, false);
+ connection = createNiceMock(Connection.class);
+ preparedStatement = createNiceMock(PreparedStatement.class);
+ stmtCapture = new Capture<String>();
+ expect(connection.prepareStatement(EasyMock.and(EasyMock.anyString(), EasyMock.capture(stmtCapture))))
+ .andReturn(preparedStatement);
+ replay(connection, preparedStatement);
+ PhoenixTransactSQL.prepareGetMetricsSqlStmt(connection, condition);
+ stmt = stmtCapture.getValue();
+ Assert.assertTrue(stmt.contains("FROM METRIC_RECORD_HOURLY"));
+
Assert.assertEquals(Precision.HOURS, condition.getPrecision());
+ verify(connection, preparedStatement);
+
+ // DAYS precision
+ startTime = endTime-PhoenixTransactSQL.DAY*30*2/1000;
+ condition = new DefaultCondition(
+ new ArrayList<>(Arrays.asList("cpu_user", "mem_free")), Collections.singletonList("h1"),
+ "a1", "i1", startTime, endTime, null, null, false);
+ connection = createNiceMock(Connection.class);
+ preparedStatement = createNiceMock(PreparedStatement.class);
+ stmtCapture = new Capture<String>();
+ expect(connection.prepareStatement(EasyMock.and(EasyMock.anyString(), EasyMock.capture(stmtCapture))))
+ .andReturn(preparedStatement);
+ replay(connection, preparedStatement);
+ PhoenixTransactSQL.prepareGetMetricsSqlStmt(connection, condition);
+ stmt = stmtCapture.getValue();
+ Assert.assertTrue(stmt.contains("FROM METRIC_RECORD_DAILY"));
+ Assert.assertEquals(Precision.DAYS, condition.getPrecision());
+ verify(connection, preparedStatement);
+
+ }
+
// Latest-metric query with multiple hostnames should produce a JOIN against
// the base METRIC_RECORD table.
+ @Test
+ public void testPrepareGetLatestMetricSqlStmtMultipleHostNames() throws SQLException {
+ Condition condition = new DefaultCondition(Arrays.asList(new byte[16], new byte[16], new byte[16], new byte[16]),
+ new ArrayList<>(Arrays.asList("cpu_user", "mem_free")), Arrays.asList("h1", "h2"),
+ "a1", "i1", null, null, null, null, false);
+ Connection connection = createNiceMock(Connection.class);
+ PreparedStatement preparedStatement = createNiceMock(PreparedStatement.class);
+ ParameterMetaData parameterMetaData = createNiceMock(ParameterMetaData.class);
+ Capture<String> stmtCapture = new Capture<String>();
+ expect(connection.prepareStatement(EasyMock.and(EasyMock.anyString(), EasyMock.capture(stmtCapture))))
+ .andReturn(preparedStatement);
+ expect(preparedStatement.getParameterMetaData())
+ .andReturn(parameterMetaData).once();
+ // Mocked bind-parameter count. NOTE(review): the original comment claimed
+ // 6 (instance_id + app_id + 2 hostnames + 2 metric names) but the mock
+ // returns 4 — confirm which count prepareGetLatestMetricSqlStmt expects.
+ expect(parameterMetaData.getParameterCount())
+ .andReturn(4).once();
+
+ replay(connection,
preparedStatement, parameterMetaData);
+ PhoenixTransactSQL.prepareGetLatestMetricSqlStmt(connection, condition);
+ String stmt = stmtCapture.getValue();
+ Assert.assertTrue(stmt.contains("FROM METRIC_RECORD"));
+ Assert.assertTrue(stmt.contains("JOIN"));
+ verify(connection, preparedStatement, parameterMetaData);
+ }
+
// When sort-merge join is enabled, the generated SQL must carry the Phoenix
// USE_SORT_MERGE_JOIN / NO_CACHE hint.
+ @Test
+ public void testPrepareGetLatestMetricSqlStmtSortMergeJoinAlgorithm()
+ throws SQLException {
+ Condition condition = new DefaultCondition(Arrays.asList(new byte[16], new byte[16]),
+ new ArrayList<>(Arrays.asList("cpu_user", "mem_free")), Arrays.asList("h1"),
+ "a1", "i1", null, null, null, null, false);
+ Connection connection = createNiceMock(Connection.class);
+ PreparedStatement preparedStatement = createNiceMock(PreparedStatement.class);
+ ParameterMetaData parameterMetaData = createNiceMock(ParameterMetaData.class);
+ Capture<String> stmtCapture = new Capture<String>();
+ expect(connection.prepareStatement(EasyMock.and(EasyMock.anyString(), EasyMock.capture(stmtCapture))))
+ .andReturn(preparedStatement);
+ expect(preparedStatement.getParameterMetaData())
+ .andReturn(parameterMetaData).anyTimes();
+ expect(parameterMetaData.getParameterCount())
+ .andReturn(2).anyTimes();
+
+ replay(connection, preparedStatement, parameterMetaData);
// NOTE(review): setSortMergeJoinEnabled(true) mutates static state and is
// never reset here — it may leak into later tests; confirm intended.
+ PhoenixTransactSQL.setSortMergeJoinEnabled(true);
+ PhoenixTransactSQL.prepareGetLatestMetricSqlStmt(connection, condition);
+ String stmt = stmtCapture.getValue();
+ Assert.assertTrue(stmt.contains("/*+ USE_SORT_MERGE_JOIN NO_CACHE */"));
+ }
+
// Explicit HOURS precision should route the query to METRIC_RECORD_HOURLY.
+ @Test
+ public void testPrepareGetMetricsPrecisionHours() throws SQLException {
+ Condition condition = new DefaultCondition(
+ new ArrayList<>(Arrays.asList("cpu_user", "mem_free")), Collections.singletonList("h1"),
+ "a1", "i1", 1407959718L, 1407959918L, Precision.HOURS, null, false);
+ Connection connection = createNiceMock(Connection.class);
+ PreparedStatement preparedStatement = createNiceMock(PreparedStatement.class);
+ Capture<String> stmtCapture =
new Capture<String>();
+ expect(connection.prepareStatement(EasyMock.and(EasyMock.anyString(), EasyMock.capture(stmtCapture))))
+ .andReturn(preparedStatement);
+
+ replay(connection, preparedStatement);
+ PhoenixTransactSQL.prepareGetMetricsSqlStmt(connection, condition);
+ String stmt = stmtCapture.getValue();
+ Assert.assertTrue(stmt.contains("FROM METRIC_RECORD_HOURLY"));
+ verify(connection, preparedStatement);
+ }
+
// Verifies the RESULTSET_LIMIT guard: requests producing a point count under
// the limit succeed; oversized requests raise PrecisionLimitExceededException.
+ @Test
+ public void testResultSetLimitCheck() throws SQLException {
+
+ List<String> metrics = new ArrayList<String>();
+ List<String> hosts = new ArrayList<String>();
+ int numMetrics = 0;
+ int numHosts = 0;
// NOTE(review): 'limit' is assigned but never used in this test.
+ int limit = PhoenixHBaseAccessor.RESULTSET_LIMIT;
+
+ // 22 Metrics x 2 Hosts x 1 hour with requested SECONDS precision = 15840 points. Should be OK!
+ numMetrics = 22;
+ numHosts = 2;
+ for (int i = 0; i < numMetrics; i++) {
+ metrics.add("TestMetric"+i);
+ }
+ for (int i = 0; i < numHosts; i++) {
+ hosts.add("TestHost"+i);
+ }
+
+ Condition condition = new DefaultCondition(
+ metrics, hosts,
+ "a1", "i1", 1407950000L, 1407953600L, Precision.SECONDS, null, false);
+ Connection connection = createNiceMock(Connection.class);
+ PreparedStatement preparedStatement = createNiceMock(PreparedStatement.class);
+ Capture<String> stmtCapture = new Capture<String>();
+ expect(connection.prepareStatement(EasyMock.and(EasyMock.anyString(), EasyMock.capture(stmtCapture))))
+ .andReturn(preparedStatement);
+
+ replay(connection, preparedStatement);
+ PhoenixTransactSQL.prepareGetMetricsSqlStmt(connection, condition);
+ String stmt = stmtCapture.getValue();
+ Assert.assertTrue(stmt.contains("FROM METRIC_RECORD"));
+ verify(connection, preparedStatement);
+
+ //Check without passing precision. Should be OK!
+ condition = new DefaultCondition(
+ metrics, hosts,
+ "a1", "i1", 1407950000L, 1407953600L, null, null, false);
+ connection = createNiceMock(Connection.class);
+ preparedStatement = createNiceMock(PreparedStatement.class);
+ stmtCapture = new Capture<String>();
+ expect(connection.prepareStatement(EasyMock.and(EasyMock.anyString(), EasyMock.capture(stmtCapture))))
+ .andReturn(preparedStatement);
+
+ replay(connection, preparedStatement);
+ PhoenixTransactSQL.prepareGetMetricsSqlStmt(connection, condition);
+ stmt = stmtCapture.getValue();
+ Assert.assertTrue(stmt.contains("FROM METRIC_RECORD"));
+ verify(connection, preparedStatement);
+
+ //Check with more hosts and lesser metrics for 1 day data = 11520 points Should be OK!
+ metrics.clear();
+ hosts.clear();
+ numMetrics = 2;
+ numHosts = 20;
+ for (int i = 0; i < numMetrics; i++) {
+ metrics.add("TestMetric"+i);
+ }
+ for (int i = 0; i < numHosts; i++) {
+ hosts.add("TestHost"+i);
+ }
+ condition = new DefaultCondition(
+ metrics, hosts,
+ "a1", "i1", 1407867200L, 1407953600L, null, null, false);
+ connection = createNiceMock(Connection.class);
+ preparedStatement = createNiceMock(PreparedStatement.class);
+ stmtCapture = new Capture<String>();
+ expect(connection.prepareStatement(EasyMock.and(EasyMock.anyString(), EasyMock.capture(stmtCapture))))
+ .andReturn(preparedStatement);
+
+ replay(connection, preparedStatement);
// Day-sized range with no explicit precision falls back to the MINUTE table.
+ PhoenixTransactSQL.prepareGetMetricsSqlStmt(connection, condition);
+ stmt = stmtCapture.getValue();
+ Assert.assertTrue(stmt.contains("FROM METRIC_RECORD_MINUTE"));
+ verify(connection, preparedStatement);
+
+ //Check with 20 metrics, NO hosts and 1 day data = 5760 points. Should be OK!
+ metrics.clear();
+ hosts.clear();
+ numMetrics = 20;
+ for (int i = 0; i < numMetrics; i++) {
+ metrics.add("TestMetric"+i);
+ }
+ condition = new DefaultCondition(
+ metrics, hosts,
+ "a1", "i1", 1407867200L, 1407953600L, null, null, false);
+ connection = createNiceMock(Connection.class);
+ preparedStatement = createNiceMock(PreparedStatement.class);
+ stmtCapture = new Capture<String>();
+ expect(connection.prepareStatement(EasyMock.and(EasyMock.anyString(), EasyMock.capture(stmtCapture))))
+ .andReturn(preparedStatement);
+
+ replay(connection, preparedStatement);
// Host-less query goes through the cluster aggregate statement instead.
+ PhoenixTransactSQL.prepareGetAggregateSqlStmt(connection, condition);
+ stmt = stmtCapture.getValue();
+ Assert.assertTrue(stmt.contains("FROM METRIC_AGGREGATE_MINUTE"));
+ verify(connection, preparedStatement);
+
+ //Check with 5 hosts and 10 metrics for 1 hour data = 18000 points. Should throw out Exception!
+ metrics.clear();
+ hosts.clear();
+ numMetrics = 10;
+ numHosts = 5;
+ for (int i = 0; i < numMetrics; i++) {
+ metrics.add("TestMetric"+i);
+ }
+ for (int i = 0; i < numHosts; i++) {
+ hosts.add("TestHost"+i);
+ }
+ condition = new DefaultCondition(
+ metrics, hosts,
+ "a1", "i1", 1407950000L, 1407953600L, null, null, false);
+ boolean exceptionThrown = false;
+ boolean requestedSizeFoundInMessage = false;
+
+ try {
+ PhoenixTransactSQL.prepareGetMetricsSqlStmt(connection, condition);
+ } catch (PrecisionLimitExceededException pe) {
+ exceptionThrown = true;
// The exception message should report the requested point count (18000).
+ String message = pe.getMessage();
+ if (message !=null && message.contains("18000")) {
+ requestedSizeFoundInMessage = true;
+ }
+ }
+ Assert.assertTrue(exceptionThrown);
+ Assert.assertTrue(requestedSizeFoundInMessage);
+ }
+
// TopN over hosts: the inner SELECT ranks UUIDs by MAX(METRIC_MAX) and
// limits the result to N (here 2).
+ @Test
+ public void testTopNHostsConditionClause() throws Exception {
+ List<String> hosts = Arrays.asList("h1", "h2");
+ List<byte[]> uuids = Arrays.asList(new byte[16], new byte[16]);
+
+ Condition condition = new TopNCondition(uuids, new ArrayList<>(Collections.singletonList("cpu_user")), hosts,
+ "a1",
"i1", 1407959718L, 1407959918L, null, null, false, 2, null, false);
+
+ String conditionClause = condition.getConditionClause().toString();
+ String expectedClause = " UUID IN (" +
+ "SELECT UUID FROM METRIC_RECORD WHERE " +
+ "(UUID LIKE ? OR UUID LIKE ?) AND " +
+ "SERVER_TIME >= ? AND SERVER_TIME < ? " +
+ "GROUP BY UUID ORDER BY MAX(METRIC_MAX) DESC LIMIT 2) AND SERVER_TIME >= ? AND SERVER_TIME < ?";
+
+ Assert.assertEquals(expectedClause, conditionClause);
+ }
+
// TopN over metrics: one UUID LIKE placeholder is emitted per metric.
+ @Test
+ public void testTopNMetricsConditionClause() throws Exception {
+ List<String> metricNames = new ArrayList<>(Arrays.asList("m1", "m2", "m3"));
+ List<byte[]> uuids = Arrays.asList(new byte[16], new byte[16], new byte[16]);
+
+ Condition condition = new TopNCondition(uuids, metricNames, Collections.singletonList("h1"),
+ "a1", "i1", 1407959718L, 1407959918L, null, null, false, 2, null, false);
+
+ String conditionClause = condition.getConditionClause().toString();
+ String expectedClause = " UUID IN (" +
+ "SELECT UUID FROM METRIC_RECORD WHERE " +
+ "(UUID LIKE ? OR UUID LIKE ? OR UUID LIKE ?) AND " +
+ "SERVER_TIME >= ? AND SERVER_TIME < ? " +
+ "GROUP BY UUID ORDER BY MAX(METRIC_MAX) DESC LIMIT 2) AND SERVER_TIME >= ? AND SERVER_TIME < ?";
+
+ Assert.assertEquals(expectedClause, conditionClause);
+ }
+
// Requesting TopN with BOTH multiple metrics and multiple hosts is
// unsupported: getConditionClause() returns null.
+ @Test
+ public void testTopNMetricsIllegalConditionClause() throws Exception {
+ List<String> metricNames = new ArrayList<>(Arrays.asList("m1", "m2"));
+
+ List<String> hosts = Arrays.asList("h1", "h2");
+ List<byte[]> uuids = Arrays.asList(new byte[16], new byte[16], new byte[16], new byte[16]);
+
+ Condition condition = new TopNCondition(uuids, metricNames, hosts,
+ "a1", "i1", 1407959718L, 1407959918L, null, null, false, 2, null, false);
+
+ Assert.assertEquals(condition.getConditionClause(), null);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/TestTimelineMetricStore.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/TestTimelineMetricStore.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/TestTimelineMetricStore.java
new file mode 100644
index 0000000..99e3e25
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/TestTimelineMetricStore.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.ambari.metrics.core.timeline;

import java.io.IOException;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

import org.apache.hadoop.metrics2.sink.timeline.AggregationResult;
import org.apache.hadoop.metrics2.sink.timeline.ContainerMetric;
import org.apache.hadoop.metrics2.sink.timeline.Precision;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
import org.apache.hadoop.metrics2.sink.timeline.TopNConfig;
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;

/**
 * Canned in-memory {@code TimelineMetricStore} used by unit tests.
 *
 * <p>{@link #getTimelineMetrics} always returns the same two-metric fixture
 * ("cpu_user" and "mem_free" on host "c6401") and intentionally ignores every
 * query argument. The put/metadata methods return empty or {@code null}
 * placeholder results.
 */
public class TestTimelineMetricStore implements TimelineMetricStore {

  /**
   * Returns a fixed fixture of two metrics regardless of the filter
   * parameters supplied.
   */
  @Override
  public TimelineMetrics getTimelineMetrics(List<String> metricNames,
      List<String> hostnames, String applicationId, String instanceId, Long startTime,
      Long endTime, Precision precision, Integer limit, boolean groupedByHost,
      TopNConfig topNConfig, String seriesAggregateFunction) throws SQLException,
      IOException {
    TimelineMetrics timelineMetrics = new TimelineMetrics();
    List<TimelineMetric> metricList = new ArrayList<TimelineMetric>();
    timelineMetrics.setMetrics(metricList);

    TimelineMetric metric1 = new TimelineMetric();
    metric1.setMetricName("cpu_user");
    metric1.setAppId("1");
    metric1.setInstanceId(null);
    metric1.setHostName("c6401");
    metric1.setStartTime(1407949812L);
    // Plain TreeMap instead of double-brace initialization: the double-brace
    // idiom creates an anonymous TreeMap subclass that captures the enclosing
    // instance (Effective Java-style anti-pattern).
    TreeMap<Long, Double> values1 = new TreeMap<Long, Double>();
    values1.put(1407949812L, 1.0d);
    values1.put(1407949912L, 1.8d);
    values1.put(1407950002L, 0.7d);
    metric1.setMetricValues(values1);
    metricList.add(metric1);

    TimelineMetric metric2 = new TimelineMetric();
    metric2.setMetricName("mem_free");
    metric2.setAppId("2");
    metric2.setInstanceId("3");
    metric2.setHostName("c6401");
    metric2.setStartTime(1407949812L);
    TreeMap<Long, Double> values2 = new TreeMap<Long, Double>();
    values2.put(1407949812L, 2.5d);
    values2.put(1407949912L, 3.0d);
    values2.put(1407950002L, 0.9d);
    metric2.setMetricValues(values2);
    metricList.add(metric2);

    return timelineMetrics;
  }

  /** No-op: accepts the metrics and reports an empty (successful) response. */
  @Override
  public TimelinePutResponse putMetrics(TimelineMetrics metrics)
      throws SQLException, IOException {
    return new TimelinePutResponse();
  }

  /** No-op: accepts container metrics and reports an empty response. */
  @Override
  public TimelinePutResponse putContainerMetrics(List<ContainerMetric> metrics)
      throws SQLException, IOException {
    return new TimelinePutResponse();
  }

  // NOTE(review): returns null rather than an empty map — callers in tests
  // apparently tolerate this; confirm before "fixing".
  @Override
  public Map<String, List<TimelineMetricMetadata>> getTimelineMetricMetadata(String appId, String metricPattern,
      boolean includeBlacklistedMetrics) throws SQLException, IOException {
    return null;
  }

  /** No-op: host-aggregated metrics are discarded; returns null. */
  @Override
  public TimelinePutResponse putHostAggregatedMetrics(AggregationResult aggregationResult) throws SQLException, IOException {
    return null;
  }

  /** Always empty: no host/app metadata is tracked by this stub. */
  @Override
  public Map<String, Set<String>> getHostAppsMetadata() throws SQLException, IOException {
    return Collections.emptyMap();
  }

  /** Always empty: no instance/host metadata is tracked by this stub. */
  @Override
  public Map<String, Map<String, Set<String>>> getInstanceHostsMetadata(String instanceId, String appId) throws SQLException, IOException {
    return Collections.emptyMap();
  }

  /** Always empty: this stub has no live collector instances. */
  @Override
  public List<String> getLiveInstances() {
    return Collections.emptyList();
  }

  /** UUID lookup is unsupported in this stub; always null. */
  @Override
  public byte[] getUuid(String metricName, String appId, String instanceId, String hostname) throws SQLException, IOException {
    return null;
  }

}
