http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/aggregators/ITMetricAggregator.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/aggregators/ITMetricAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/aggregators/ITMetricAggregator.java new file mode 100644 index 0000000..8517105 --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/aggregators/ITMetricAggregator.java @@ -0,0 +1,355 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ambari.metrics.core.timeline.aggregators; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; +import org.apache.ambari.metrics.core.timeline.AbstractMiniHBaseClusterTest; +import org.apache.ambari.metrics.core.timeline.MetricTestHelper; +import org.apache.ambari.metrics.core.timeline.query.Condition; +import org.apache.ambari.metrics.core.timeline.query.DefaultCondition; +import org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL; +import org.junit.Test; + +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.Map; +import java.util.TreeMap; + +import static junit.framework.Assert.assertEquals; +import static junit.framework.Assert.assertTrue; +import static junit.framework.Assert.fail; +import static org.apache.ambari.metrics.core.timeline.MetricTestHelper.createEmptyTimelineMetric; +import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.GET_METRIC_AGGREGATE_ONLY_SQL; +import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_DAILY_TABLE_NAME; +import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_HOURLY_TABLE_NAME; +import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_TABLE_NAME; +import static org.assertj.core.api.Assertions.assertThat; + +public class ITMetricAggregator extends AbstractMiniHBaseClusterTest { + + @Test + public void testShouldInsertMetrics() throws Exception { + // GIVEN + + // WHEN + long startTime = System.currentTimeMillis(); + TimelineMetrics metricsSent = prepareTimelineMetrics(startTime, "local"); + hdb.insertMetricRecords(metricsSent, true); + + Condition queryCondition = new DefaultCondition(null, + Collections.singletonList("local"), null, null, startTime, + startTime + (15 * 60 * 1000), null, null, false); + TimelineMetrics recordRead = hdb.getMetricRecords(queryCondition, null); + + // THEN + assertThat(recordRead.getMetrics()).hasSize(2) + .extracting("metricName") + .containsOnly("mem_free", "disk_free"); + + assertThat(metricsSent.getMetrics()) + .usingElementComparator(TIME_IGNORING_COMPARATOR) + .containsAll(recordRead.getMetrics()); + } + + private Configuration getConfigurationForTest(boolean useGroupByAggregators) { + Configuration configuration = new Configuration(); + configuration.set("timeline.metrics.service.use.groupBy.aggregators", String.valueOf(useGroupByAggregators)); + return configuration; + } + + @Test + public void testShouldAggregateMinuteProperly() throws Exception { + // GIVEN + TimelineMetricAggregator aggregatorMinute = + TimelineMetricAggregatorFactory.createTimelineMetricAggregatorMinute(hdb, + getConfigurationForTest(false), metadataManager, null); + TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(metadataManager, false); + + long startTime = System.currentTimeMillis(); + long ctime = startTime; + long minute = 60 * 1000; + hdb.insertMetricRecords(prepareTimelineMetrics(startTime, "local"), true); + hdb.insertMetricRecords(prepareTimelineMetrics(ctime += minute, "local"), true); + hdb.insertMetricRecords(prepareTimelineMetrics(ctime += minute, "local"), true); + hdb.insertMetricRecords(prepareTimelineMetrics(ctime += minute, "local"), true); + hdb.insertMetricRecords(prepareTimelineMetrics(ctime += minute, "local"), true); + + // WHEN + long endTime = startTime + 1000 * 60 * 4; + boolean success = aggregatorMinute.doWork(startTime, endTime + 1); + + //THEN + Condition condition = new DefaultCondition(null, null, null, null, startTime, + endTime + 2, null, null, true); + condition.setStatement(String.format(GET_METRIC_AGGREGATE_ONLY_SQL, + METRICS_AGGREGATE_MINUTE_TABLE_NAME)); + + PreparedStatement pstmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition); + ResultSet rs = pstmt.executeQuery(); + MetricHostAggregate expectedAggregate = + MetricTestHelper.createMetricHostAggregate(2.0, 0.0, 20, 15.0); + + int count = 0; + while (rs.next()) { + TimelineMetric currentMetric = + readHelper.getTimelineMetricKeyFromResultSet(rs); + MetricHostAggregate currentHostAggregate = + readHelper.getMetricHostAggregateFromResultSet(rs); + + if ("disk_free".equals(currentMetric.getMetricName())) { + assertEquals(2.0, currentHostAggregate.getMax()); + assertEquals(0.0, currentHostAggregate.getMin()); + assertEquals(20, currentHostAggregate.getNumberOfSamples()); + assertEquals(15.0, currentHostAggregate.getSum()); + assertEquals(15.0 / 20, currentHostAggregate.calculateAverage()); + count++; + } else if ("mem_free".equals(currentMetric.getMetricName())) { + assertEquals(2.0, currentHostAggregate.getMax()); + assertEquals(0.0, currentHostAggregate.getMin()); + assertEquals(20, currentHostAggregate.getNumberOfSamples()); + assertEquals(15.0, currentHostAggregate.getSum()); + assertEquals(15.0 / 20, currentHostAggregate.calculateAverage()); + count++; + } else { + fail("Unexpected entry"); + } + } + assertEquals("Two aggregated entries expected", 2, count); + } + + @Test + public void testShouldAggregateHourProperly() throws Exception { + // GIVEN + TimelineMetricAggregator aggregator = + TimelineMetricAggregatorFactory.createTimelineMetricAggregatorHourly(hdb, + getConfigurationForTest(false), metadataManager, null); + TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(metadataManager, false); + long startTime = System.currentTimeMillis(); + + MetricHostAggregate expectedAggregate = + MetricTestHelper.createMetricHostAggregate(2.0, 0.0, 20, 15.0); + Map<TimelineMetric, MetricHostAggregate> + aggMap = new HashMap<TimelineMetric, + MetricHostAggregate>(); + + int min_5 = 5 * 60 * 1000; + long ctime = startTime - min_5; + aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate); + aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate); + aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate); + aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate); + aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate); + aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate); + aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate); + aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate); + aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate); + aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate); + aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate); + aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate); + + hdb.saveHostAggregateRecords(aggMap, METRICS_AGGREGATE_MINUTE_TABLE_NAME); + + //WHEN + long endTime = ctime + min_5; + boolean success = aggregator.doWork(startTime, endTime); + assertTrue(success); + + //THEN + Condition condition = new DefaultCondition(null, null, null, null, startTime, + endTime + 1, null, null, true); + condition.setStatement(String.format(GET_METRIC_AGGREGATE_ONLY_SQL, METRICS_AGGREGATE_HOURLY_TABLE_NAME)); + + PreparedStatement pstmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition); + ResultSet rs = pstmt.executeQuery(); + + while (rs.next()) { + TimelineMetric currentMetric = + readHelper.getTimelineMetricKeyFromResultSet(rs); + MetricHostAggregate currentHostAggregate = + readHelper.getMetricHostAggregateFromResultSet(rs); + + if ("disk_used".equals(currentMetric.getMetricName())) { + assertEquals(2.0, currentHostAggregate.getMax()); + assertEquals(0.0, currentHostAggregate.getMin()); + assertEquals(12 * 20, currentHostAggregate.getNumberOfSamples()); + assertEquals(12 * 15.0, currentHostAggregate.getSum()); + assertEquals(15.0 / 20, currentHostAggregate.calculateAverage()); + } + } + } + + @Test + public void testMetricAggregateDaily() throws Exception { + // GIVEN + TimelineMetricAggregator aggregator = + TimelineMetricAggregatorFactory.createTimelineMetricAggregatorDaily(hdb, + getConfigurationForTest(false), metadataManager, null); + TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(metadataManager, false); + long startTime = System.currentTimeMillis(); + + MetricHostAggregate expectedAggregate = + MetricTestHelper.createMetricHostAggregate(2.0, 0.0, 20, 15.0); + Map<TimelineMetric, MetricHostAggregate> + aggMap = new HashMap<TimelineMetric, MetricHostAggregate>(); + + int min_5 = 5 * 60 * 1000; + long ctime = startTime - min_5; + aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate); + aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate); + aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate); + aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate); + aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate); + aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate); + aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate); + aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate); + aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate); + aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate); + aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate); + aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate); + + hdb.saveHostAggregateRecords(aggMap, METRICS_AGGREGATE_HOURLY_TABLE_NAME); + + //WHEN + long endTime = ctime + min_5; + boolean success = aggregator.doWork(startTime, endTime); + assertTrue(success); + + //THEN + Condition condition = new DefaultCondition(null, null, null, null, startTime, + endTime + 1, null, null, true); + condition.setStatement(String.format(GET_METRIC_AGGREGATE_ONLY_SQL, METRICS_AGGREGATE_DAILY_TABLE_NAME)); + + PreparedStatement pstmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition); + ResultSet rs = pstmt.executeQuery(); + + while (rs.next()) { + TimelineMetric currentMetric = + readHelper.getTimelineMetricKeyFromResultSet(rs); + MetricHostAggregate currentHostAggregate = + readHelper.getMetricHostAggregateFromResultSet(rs); + + if ("disk_used".equals(currentMetric.getMetricName())) { + assertEquals(2.0, currentHostAggregate.getMax()); + assertEquals(0.0, currentHostAggregate.getMin()); + assertEquals(12 * 20, currentHostAggregate.getNumberOfSamples()); + assertEquals(12 * 15.0, currentHostAggregate.getSum()); + assertEquals(15.0 / 20, currentHostAggregate.calculateAverage()); + } + } + } + + @Test + public void testAggregationUsingGroupByQuery() throws Exception { + // GIVEN + TimelineMetricAggregator aggregatorMinute = + TimelineMetricAggregatorFactory.createTimelineMetricAggregatorMinute(hdb, + getConfigurationForTest(true), metadataManager, null); + TimelineMetricReadHelper readHelper = new TimelineMetricReadHelper(metadataManager, false); + + long startTime = System.currentTimeMillis(); + long ctime = startTime; + long minute = 60 * 1000; + hdb.insertMetricRecords(prepareTimelineMetrics(startTime, "local"), true); + hdb.insertMetricRecords(prepareTimelineMetrics(ctime += minute, "local"), true); + hdb.insertMetricRecords(prepareTimelineMetrics(ctime += minute, "local"), true); + hdb.insertMetricRecords(prepareTimelineMetrics(ctime += minute, "local"), true); + hdb.insertMetricRecords(prepareTimelineMetrics(ctime += minute, "local"), true); + + long endTime = startTime + 1000 * 60 * 4; + boolean success = aggregatorMinute.doWork(startTime - 1, endTime); + assertTrue(success); + + Condition condition = new DefaultCondition(null, null, null, null, startTime, + endTime + 1, null, null, true); + condition.setStatement(String.format(GET_METRIC_AGGREGATE_ONLY_SQL, METRICS_AGGREGATE_MINUTE_TABLE_NAME)); + + PreparedStatement pstmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition); + ResultSet rs = pstmt.executeQuery(); + MetricHostAggregate expectedAggregate = + MetricTestHelper.createMetricHostAggregate(2.0, 0.0, 20, 15.0); + + int count = 0; + while (rs.next()) { + TimelineMetric currentMetric = + readHelper.getTimelineMetricKeyFromResultSet(rs); + MetricHostAggregate currentHostAggregate = + readHelper.getMetricHostAggregateFromResultSet(rs); + + if ("disk_free".equals(currentMetric.getMetricName())) { + assertEquals(2.0, currentHostAggregate.getMax()); + assertEquals(0.0, currentHostAggregate.getMin()); + assertEquals(20, currentHostAggregate.getNumberOfSamples()); + assertEquals(15.0, currentHostAggregate.getSum()); + assertEquals(15.0 / 20, currentHostAggregate.calculateAverage()); + count++; + } else if ("mem_free".equals(currentMetric.getMetricName())) { + assertEquals(2.0, currentHostAggregate.getMax()); + assertEquals(0.0, currentHostAggregate.getMin()); + assertEquals(20, currentHostAggregate.getNumberOfSamples()); + assertEquals(15.0, currentHostAggregate.getSum()); + assertEquals(15.0 / 20, currentHostAggregate.calculateAverage()); + count++; + } else { + fail("Unexpected entry"); + } + } + assertEquals("Two aggregated entries expected", 2, count); + } + + private final static Comparator<TimelineMetric> TIME_IGNORING_COMPARATOR = + new Comparator<TimelineMetric>() { + @Override + public int compare(TimelineMetric o1, TimelineMetric o2) { + return o1.equalsExceptTime(o2) ? 0 : 1; + } + }; + + private TimelineMetrics prepareTimelineMetrics(long startTime, String host) { + TimelineMetrics metrics = new TimelineMetrics(); + metrics.setMetrics(Arrays.asList( + createMetric(startTime, "disk_free", host), + createMetric(startTime, "mem_free", host))); + + return metrics; + } + + private TimelineMetric createMetric(long startTime, String metricName, String host) { + TimelineMetric m = new TimelineMetric(); + m.setAppId("host"); + m.setHostName(host); + m.setMetricName(metricName); + m.setStartTime(startTime); + TreeMap<Long, Double> vals = new TreeMap<Long, Double>(); + vals.put(startTime + 15000l, 0.0); + vals.put(startTime + 30000l, 0.0); + vals.put(startTime + 45000l, 1.0); + vals.put(startTime + 60000l, 2.0); + + m.setMetricValues(vals); + + return m; + } + +}
http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricClusterAggregatorSecondTest.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricClusterAggregatorSecondTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricClusterAggregatorSecondTest.java new file mode 100644 index 0000000..c73ac36 --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricClusterAggregatorSecondTest.java @@ -0,0 +1,404 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ambari.metrics.core.timeline.aggregators; + + +import static org.apache.ambari.metrics.core.timeline.aggregators.AggregatorUtils.getRoundedAggregateTimeMillis; +import static org.apache.ambari.metrics.core.timeline.aggregators.AggregatorUtils.getRoundedCheckPointTimeMillis; +import static org.apache.ambari.metrics.core.timeline.aggregators.AggregatorUtils.getTimeSlices; +import static org.apache.ambari.metrics.core.timeline.aggregators.AggregatorUtils.sliceFromTimelineMetric; +import static org.apache.ambari.metrics.core.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_AGGREGATE_SECOND; +import static org.easymock.EasyMock.anyObject; +import static org.easymock.EasyMock.createNiceMock; +import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.replay; + +import java.sql.ResultSet; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; +import org.apache.ambari.metrics.core.timeline.discovery.TimelineMetricMetadataKey; +import org.apache.ambari.metrics.core.timeline.discovery.TimelineMetricMetadataManager; +import org.codehaus.jackson.map.ObjectMapper; +import org.junit.Test; + + import junit.framework.Assert; + +public class TimelineMetricClusterAggregatorSecondTest { + + @Test + public void testClusterSecondAggregatorWithInterpolation() { + + long aggregatorInterval = 120000l; + long sliceInterval = 30000l; + long metricInterval = 10000l; + + TimelineMetricMetadataManager metricMetadataManagerMock = createNiceMock(TimelineMetricMetadataManager.class); + expect(metricMetadataManagerMock.getUuid(anyObject(TimelineClusterMetric.class))).andReturn(new byte[16]).once(); + replay(metricMetadataManagerMock); + + long roundedEndTime = getRoundedAggregateTimeMillis(aggregatorInterval); + long roundedStartTime = roundedEndTime - aggregatorInterval; + List<Long[]> timeSlices = getTimeSlices(roundedStartTime , + roundedEndTime, sliceInterval); + + TreeMap<Long, Double> metricValues = new TreeMap<>(); + + long startTime = roundedEndTime - aggregatorInterval; + + for (int i=1; startTime < roundedEndTime; i++) { + startTime += metricInterval; + if (i%6 == 1 || i%6 == 2) { + metricValues.put(startTime, (double)i); + } + } + + TimelineMetric counterMetric = new TimelineMetric(); + counterMetric.setMetricName("TestMetric"); + counterMetric.setHostName("TestHost"); + counterMetric.setAppId("TestAppId"); + counterMetric.setMetricValues(metricValues); + counterMetric.setType("COUNTER"); + + Map<TimelineClusterMetric, Double> timelineClusterMetricMap = sliceFromTimelineMetric(counterMetric, timeSlices, true); + + TimelineClusterMetric timelineClusterMetric = new TimelineClusterMetric(counterMetric.getMetricName(), counterMetric.getAppId(), + counterMetric.getInstanceId(), 0l); + + timelineClusterMetric.setTimestamp(roundedStartTime + 2*sliceInterval); + Assert.assertTrue(timelineClusterMetricMap.containsKey(timelineClusterMetric)); + Assert.assertEquals(timelineClusterMetricMap.get(timelineClusterMetric), 6.0); + + timelineClusterMetric.setTimestamp(roundedStartTime + 4*sliceInterval); + Assert.assertTrue(timelineClusterMetricMap.containsKey(timelineClusterMetric)); + Assert.assertEquals(timelineClusterMetricMap.get(timelineClusterMetric), 12.0); + + TimelineMetric metric = new TimelineMetric(); + metric.setMetricName("TestMetric"); + metric.setHostName("TestHost"); + metric.setAppId("TestAppId"); + metric.setMetricValues(metricValues); + + timelineClusterMetricMap = sliceFromTimelineMetric(metric, timeSlices, true); + + timelineClusterMetric = new TimelineClusterMetric(metric.getMetricName(), metric.getAppId(), + metric.getInstanceId(), 0l); + + timelineClusterMetric.setTimestamp(roundedStartTime + 2*sliceInterval); + Assert.assertTrue(timelineClusterMetricMap.containsKey(timelineClusterMetric)); + Assert.assertEquals(timelineClusterMetricMap.get(timelineClusterMetric), 4.5); + + timelineClusterMetric.setTimestamp(roundedStartTime + 4*sliceInterval); + Assert.assertTrue(timelineClusterMetricMap.containsKey(timelineClusterMetric)); + Assert.assertEquals(timelineClusterMetricMap.get(timelineClusterMetric), 7.5); + } + + @Test + public void testShouldAggregateProperly() { + + long aggregatorInterval = 120000l; + long sliceInterval = 30000l; + + Configuration configuration = new Configuration(); + TimelineMetricMetadataManager metricMetadataManagerMock = createNiceMock(TimelineMetricMetadataManager.class); + + expect(metricMetadataManagerMock.getMetadataCacheValue((TimelineMetricMetadataKey) anyObject())) + .andReturn(null).anyTimes(); + replay(metricMetadataManagerMock); + + TimelineMetricClusterAggregatorSecond secondAggregator = new TimelineMetricClusterAggregatorSecond( + METRIC_AGGREGATE_SECOND, metricMetadataManagerMock, null, configuration, null, + aggregatorInterval, 2, "false", "", "", aggregatorInterval, sliceInterval, null + ); + + long startTime = getRoundedCheckPointTimeMillis(System.currentTimeMillis(),aggregatorInterval); + List<Long[]> timeslices = getTimeSlices(startTime, startTime + aggregatorInterval, sliceInterval); + + Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics = new HashMap<>(); + long seconds = 1000; + TimelineMetric timelineMetric = new TimelineMetric(); + timelineMetric.setMetricName("m1"); + timelineMetric.setHostName("h1"); + timelineMetric.setAppId("a1"); + timelineMetric.setType("GUAGE"); + timelineMetric.setStartTime(startTime); + + /* + + 0 +30s +60s +90s +120s +150s +180s + | | | | | | | + (1) (2) (3) (4) (5) (6) + + */ + // Case 1 : Points present in all the required timeslices. + // Life is good! Ignore (5) and (6). + + TreeMap<Long, Double> metricValues = new TreeMap<Long, Double>(); + metricValues.put(startTime + 15*seconds, 1.0); + metricValues.put(startTime + 45*seconds, 2.0); + metricValues.put(startTime + 75*seconds, 3.0); + metricValues.put(startTime + 105*seconds, 4.0); + metricValues.put(startTime + 135*seconds, 5.0); + metricValues.put(startTime + 165*seconds, 6.0); + + timelineMetric.setMetricValues(metricValues); + secondAggregator.processAggregateClusterMetrics(aggregateClusterMetrics, timelineMetric, timeslices); + + Assert.assertEquals(aggregateClusterMetrics.size(), 4); + TimelineClusterMetric timelineClusterMetric = new TimelineClusterMetric(timelineMetric.getMetricName(), + timelineMetric.getAppId(), timelineMetric.getInstanceId(), startTime + 30*seconds); + + Assert.assertTrue(aggregateClusterMetrics.containsKey(timelineClusterMetric)); + Assert.assertEquals(aggregateClusterMetrics.get(timelineClusterMetric).getSum(), 1.0); + + timelineClusterMetric.setTimestamp(startTime + 4*30*seconds); + Assert.assertTrue(aggregateClusterMetrics.containsKey(timelineClusterMetric)); + Assert.assertEquals(aggregateClusterMetrics.get(timelineClusterMetric).getSum(),4.0); + + metricValues.clear(); + aggregateClusterMetrics.clear(); + + /* + + 0 +30s +60s +90s +120s +150s +180s + | | | | | | | + (1) (3) (4) (5) (6) + + */ + // Case 2 : Some "middle" point missing in the required timeslices. + // Interpolate the middle point. Ignore (5) and (6). + metricValues.put(startTime + 15*seconds, 1.0); + metricValues.put(startTime + 75*seconds, 3.0); + metricValues.put(startTime + 105*seconds, 4.0); + metricValues.put(startTime + 135*seconds, 5.0); + metricValues.put(startTime + 165*seconds, 6.0); + + timelineMetric.setMetricValues(metricValues); + secondAggregator.processAggregateClusterMetrics(aggregateClusterMetrics, timelineMetric, timeslices); + + Assert.assertEquals(aggregateClusterMetrics.size(), 4); + timelineClusterMetric.setTimestamp(startTime + 60*seconds); + Assert.assertTrue(aggregateClusterMetrics.containsKey(timelineClusterMetric)); + Assert.assertEquals(aggregateClusterMetrics.get(timelineClusterMetric).getSum(), 2.0); + + metricValues.clear(); + aggregateClusterMetrics.clear(); + + + /* + + 0 +30s +60s +90s +120s +150s +180s + | | | | | | | + (1) (2) (3) (5) (6) + + */ + // Case 3 : "end" point missing in the required timeslices. + // Use all points to get missing point if COUNTER. Else use just (3). Ignore (6). + metricValues.put(startTime + 15*seconds, 1.0); + metricValues.put(startTime + 45*seconds, 2.0); + metricValues.put(startTime + 75*seconds, 3.0); + metricValues.put(startTime + 135*seconds, 5.0); + metricValues.put(startTime + 165*seconds, 6.0); + + timelineMetric.setMetricValues(metricValues); + timelineMetric.setType("GUAGE"); + secondAggregator.processAggregateClusterMetrics(aggregateClusterMetrics, timelineMetric, timeslices); + + Assert.assertEquals(aggregateClusterMetrics.size(), 4); + timelineClusterMetric.setTimestamp(startTime + 120*seconds); + Assert.assertTrue(aggregateClusterMetrics.containsKey(timelineClusterMetric)); + Assert.assertEquals(aggregateClusterMetrics.get(timelineClusterMetric).getSum(), 3.0); + + aggregateClusterMetrics.clear(); + + timelineMetric.setType("COUNTER"); + secondAggregator.processAggregateClusterMetrics(aggregateClusterMetrics, timelineMetric, timeslices); + + Assert.assertEquals(aggregateClusterMetrics.size(), 4); + timelineClusterMetric.setTimestamp(startTime + 120*seconds); + Assert.assertTrue(aggregateClusterMetrics.containsKey(timelineClusterMetric)); + Assert.assertEquals(aggregateClusterMetrics.get(timelineClusterMetric).getSum(), 4.5); + + metricValues.clear(); + aggregateClusterMetrics.clear(); + + /* + + 0 +30s +60s +90s +120s +150s +180s + | | | | | | | + (2) (3) (4) (5) (6) + + */ + // Case 4 : "start" point missing in the required timeslices. + // Interpolate with only (2) to get missing point if GUAGE metric. Else use all points for COUNTER. + + metricValues.put(startTime + 45*seconds, 2.0); + metricValues.put(startTime + 75*seconds, 3.0); + metricValues.put(startTime + 105*seconds, 4.0); + metricValues.put(startTime + 135*seconds, 5.0); + metricValues.put(startTime + 165*seconds, 6.0); + + timelineMetric.setMetricValues(metricValues); + timelineMetric.setType("GUAGE"); + secondAggregator.processAggregateClusterMetrics(aggregateClusterMetrics, timelineMetric, timeslices); + + Assert.assertEquals(aggregateClusterMetrics.size(), 4); + timelineClusterMetric.setTimestamp(startTime + 30*seconds); + Assert.assertTrue(aggregateClusterMetrics.containsKey(timelineClusterMetric)); + Assert.assertEquals(aggregateClusterMetrics.get(timelineClusterMetric).getSum(), 2.0); + + aggregateClusterMetrics.clear(); + + timelineMetric.setType("COUNTER"); + int liveHosts = secondAggregator.processAggregateClusterMetrics(aggregateClusterMetrics, timelineMetric, timeslices); + + Assert.assertEquals(liveHosts, 1); + Assert.assertEquals(aggregateClusterMetrics.size(), 4); + timelineClusterMetric.setTimestamp(startTime + 30*seconds); + Assert.assertTrue(aggregateClusterMetrics.containsKey(timelineClusterMetric)); + Assert.assertEquals(aggregateClusterMetrics.get(timelineClusterMetric).getSum(), 1.5); + + metricValues.clear(); + aggregateClusterMetrics.clear(); + + /* + + 0 +30s +60s +90s +120s +150s +180s + | | | | | | | + (5) (6) + + */ + // Case 5 : Well, we have nothing in the 2 min window. + // Use (5) to paint the 2 min window as (5). + + metricValues.put(startTime + 135*seconds, 5.0); + metricValues.put(startTime + 165*seconds, 6.0); + + timelineMetric.setMetricValues(metricValues); + timelineMetric.setType("GUAGE"); + secondAggregator.processAggregateClusterMetrics(aggregateClusterMetrics, timelineMetric, timeslices); + + Assert.assertEquals(aggregateClusterMetrics.size(), 4); + timelineClusterMetric.setTimestamp(startTime + 30*seconds); + Assert.assertTrue(aggregateClusterMetrics.containsKey(timelineClusterMetric)); + Assert.assertEquals(aggregateClusterMetrics.get(timelineClusterMetric).getSum(), 5.0); + + aggregateClusterMetrics.clear(); + + timelineMetric.setType("COUNTER"); + liveHosts = secondAggregator.processAggregateClusterMetrics(aggregateClusterMetrics, timelineMetric, timeslices); + + Assert.assertEquals(liveHosts, 1); + Assert.assertEquals(aggregateClusterMetrics.size(), 4); + timelineClusterMetric.setTimestamp(startTime + 60*seconds); + Assert.assertTrue(aggregateClusterMetrics.containsKey(timelineClusterMetric)); + Assert.assertEquals(aggregateClusterMetrics.get(timelineClusterMetric).getSum(), 2.5); + + metricValues.clear(); + aggregateClusterMetrics.clear(); + + } + + @Test + public void testLiveHostCounterMetrics() throws Exception { + long aggregatorInterval = 120000; + long sliceInterval = 30000; + + Configuration configuration = new Configuration(); + TimelineMetricMetadataManager metricMetadataManagerMock = createNiceMock(TimelineMetricMetadataManager.class); + + expect(metricMetadataManagerMock.getMetadataCacheValue((TimelineMetricMetadataKey) anyObject())).andReturn(null).anyTimes(); + + /* + m1-h1-a1 + m2-h1-a1 + m2-h1-a2 + m2-h2-a1 + m2-h2-a2 + m2-h3-a2 + + So live_hosts : a1 = 2 + live_hosts : a2 = 3 + */ + + TimelineMetric metric1 = new TimelineMetric("m1", "h1", "a1", null); + TimelineMetric metric2 = new TimelineMetric("m2", "h1", "a1", null); + TimelineMetric metric3 = new TimelineMetric("m2", "h1", "a2", null); + TimelineMetric metric4 = new TimelineMetric("m2", "h2", "a1", null); + TimelineMetric metric5 = new TimelineMetric("m2", "h2", "a2", null); + TimelineMetric metric6 = new TimelineMetric("m2", "h3", "a2", null); + + expect(metricMetadataManagerMock.getMetricFromUuid((byte[]) anyObject())). + andReturn(metric1).andReturn(metric2).andReturn(metric3). + andReturn(metric4).andReturn(metric5).andReturn(metric6); + replay(metricMetadataManagerMock); + + TimelineMetricClusterAggregatorSecond secondAggregator = new TimelineMetricClusterAggregatorSecond( + METRIC_AGGREGATE_SECOND, metricMetadataManagerMock, null, configuration, null, + aggregatorInterval, 2, "false", "", "", aggregatorInterval, + sliceInterval, null); + + long now = System.currentTimeMillis(); + long startTime = now - 120000; + long seconds = 1000; + List<Long[]> slices = getTimeSlices(startTime, now, sliceInterval); + ResultSet rs = createNiceMock(ResultSet.class); + + TreeMap<Long, Double> metricValues = new TreeMap<>(); + metricValues.put(startTime + 15 * seconds, 1.0); + metricValues.put(startTime + 45 * seconds, 2.0); + metricValues.put(startTime + 75 * seconds, 3.0); + metricValues.put(startTime + 105 * seconds, 4.0); + + expect(rs.next()).andReturn(true).times(6); + expect(rs.next()).andReturn(false); + + expect(rs.getLong("SERVER_TIME")).andReturn(now - 150000).times(6); + expect(rs.getLong("START_TIME")).andReturn(now - 150000).times(6); + + ObjectMapper mapper = new ObjectMapper(); + expect(rs.getString("METRICS")).andReturn(mapper.writeValueAsString(metricValues)).times(6); + + replay(rs); + + Map<TimelineClusterMetric, MetricClusterAggregate> aggregates = secondAggregator.aggregateMetricsFromResultSet(rs, slices); + + Assert.assertNotNull(aggregates); + + MetricClusterAggregate a1 = null, a2 = null; + + for (Map.Entry<TimelineClusterMetric, MetricClusterAggregate> m : aggregates.entrySet()) { + if (m.getKey().getMetricName().equals("live_hosts") && m.getKey().getAppId().equals("a1")) { + a1 = m.getValue(); + } + if (m.getKey().getMetricName().equals("live_hosts") && m.getKey().getAppId().equals("a2")) { + a2 = m.getValue(); + } + } + + Assert.assertNotNull(a1); + Assert.assertNotNull(a2); + Assert.assertEquals(2d, a1.getSum()); + Assert.assertEquals(3d, a2.getSum()); + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricClusterAggregatorSecondWithCacheSourceTest.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricClusterAggregatorSecondWithCacheSourceTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricClusterAggregatorSecondWithCacheSourceTest.java new file mode 100644 index 0000000..34d470c --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricClusterAggregatorSecondWithCacheSourceTest.java @@ -0,0 +1,114 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ambari.metrics.core.timeline.aggregators; + +import junit.framework.Assert; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate; +import org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration; +import org.apache.ambari.metrics.core.timeline.TimelineMetricsIgniteCache; +import org.apache.ambari.metrics.core.timeline.discovery.TimelineMetricMetadataKey; +import org.apache.ambari.metrics.core.timeline.discovery.TimelineMetricMetadataManager; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.powermock.core.classloader.annotations.PowerMockIgnore; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_COLLECTOR_IGNITE_NODES; +import static org.apache.ambari.metrics.core.timeline.aggregators.AggregatorUtils.getTimeSlices; +import static org.apache.ambari.metrics.core.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_AGGREGATE_SECOND; +import static org.easymock.EasyMock.anyObject; +import static org.easymock.EasyMock.createNiceMock; +import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.replay; +import static org.powermock.api.easymock.PowerMock.mockStatic; +import static org.powermock.api.easymock.PowerMock.replayAll; + +@RunWith(PowerMockRunner.class) +@PrepareForTest(TimelineMetricConfiguration.class) + +@PowerMockIgnore("javax.management.*") +public class TimelineMetricClusterAggregatorSecondWithCacheSourceTest { + + private static TimelineMetricsIgniteCache timelineMetricsIgniteCache; + @BeforeClass + public static void setupConf() throws Exception { + TimelineMetricConfiguration conf = new TimelineMetricConfiguration(new + Configuration(), new Configuration()); + mockStatic(TimelineMetricConfiguration.class); + expect(TimelineMetricConfiguration.getInstance()).andReturn(conf).anyTimes(); + conf.getMetricsConf().set(TIMELINE_METRICS_COLLECTOR_IGNITE_NODES, "localhost"); + replayAll(); + + timelineMetricsIgniteCache = new TimelineMetricsIgniteCache(); + } + + @Test + public void testLiveHostCounterMetrics() throws Exception { + long aggregatorInterval = 120000; + long sliceInterval = 30000; + + Configuration configuration = new Configuration(); + + TimelineMetricMetadataManager metricMetadataManagerMock = createNiceMock(TimelineMetricMetadataManager.class); + expect(metricMetadataManagerMock.getMetadataCacheValue((TimelineMetricMetadataKey) anyObject())).andReturn(null).anyTimes(); + replay(metricMetadataManagerMock); + + TimelineMetricClusterAggregatorSecondWithCacheSource secondAggregator = new TimelineMetricClusterAggregatorSecondWithCacheSource( + METRIC_AGGREGATE_SECOND, metricMetadataManagerMock, null, configuration, null, + aggregatorInterval, 2, "false", "", "", aggregatorInterval, + sliceInterval, null, timelineMetricsIgniteCache); + + long now = System.currentTimeMillis(); + long startTime = now - 120000; + long seconds = 1000; + + Map<TimelineClusterMetric, MetricClusterAggregate> metricsFromCache = new HashMap<>(); + metricsFromCache.put(new TimelineClusterMetric("m1", "a1", "i1",startTime + 15 * seconds), + new MetricClusterAggregate(1.0, 2, 1.0, 1.0, 1.0)); + metricsFromCache.put(new TimelineClusterMetric("m2", "a2", "i1",startTime + 18 * seconds), + new MetricClusterAggregate(1.0, 5, 1.0, 1.0, 1.0)); + + List<Long[]> timeslices = getTimeSlices(startTime, startTime + 120*seconds, 30*seconds); + Map<TimelineClusterMetric, MetricClusterAggregate> aggregates = secondAggregator.aggregateMetricsFromMetricClusterAggregates(metricsFromCache, timeslices); + + Assert.assertNotNull(aggregates); + + MetricClusterAggregate a1 = null, a2 = null; + + for (Map.Entry<TimelineClusterMetric, MetricClusterAggregate> m : aggregates.entrySet()) { + if (m.getKey().getMetricName().equals("live_hosts") && m.getKey().getAppId().equals("a1")) { + a1 = m.getValue(); + } + if (m.getKey().getMetricName().equals("live_hosts") && m.getKey().getAppId().equals("a2")) { + a2 = m.getValue(); + } + } + + Assert.assertNotNull(a1); + Assert.assertNotNull(a2); + Assert.assertEquals(2d, a1.getSum()); + Assert.assertEquals(5d, a2.getSum()); + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/availability/MetricCollectorHAControllerTest.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/availability/MetricCollectorHAControllerTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/availability/MetricCollectorHAControllerTest.java new file mode 100644 index 0000000..385a5a1 --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/availability/MetricCollectorHAControllerTest.java @@ -0,0 +1,106 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ambari.metrics.core.timeline.availability; + +import junit.framework.Assert; +import org.apache.ambari.metrics.core.timeline.AbstractMiniHBaseClusterTest; +import org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration; +import org.apache.helix.HelixManager; +import org.apache.helix.HelixManagerFactory; +import org.apache.helix.InstanceType; +import org.apache.helix.model.ExternalView; +import org.apache.helix.model.InstanceConfig; +import org.junit.Before; +import org.junit.Test; +import java.util.HashMap; +import java.util.Map; +import static org.apache.ambari.metrics.core.timeline.availability.MetricCollectorHAController.DEFAULT_STATE_MODEL; +import static org.apache.ambari.metrics.core.timeline.availability.MetricCollectorHAController.METRIC_AGGREGATORS; +import static org.apache.ambari.metrics.core.timeline.availability.MetricCollectorHAController.CLUSTER_NAME; +import static org.easymock.EasyMock.createNiceMock; +import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.replay; + +public class MetricCollectorHAControllerTest extends AbstractMiniHBaseClusterTest { + TimelineMetricConfiguration configuration; + + @Before + public void setup() throws Exception { + configuration = createNiceMock(TimelineMetricConfiguration.class); + + expect(configuration.getInstanceHostnameFromEnv()).andReturn("h1"); + expect(configuration.getInstancePort()).andReturn("12000"); + // jdbc:phoenix:localhost:52887:/hbase;test=true + String zkUrl = getUrl(); + String port = zkUrl.split(":")[3]; + String quorum = zkUrl.split(":")[2]; + + expect(configuration.getClusterZKClientPort()).andReturn(port); + expect(configuration.getClusterZKQuorum()).andReturn(quorum); + expect(configuration.getZkConnectionUrl(port, quorum)).andReturn(quorum + ":" + port); + + replay(configuration); + } + + @Test(timeout = 180000) + public void testHAControllerDistributedAggregation() throws Exception { + MetricCollectorHAController haController = new MetricCollectorHAController(configuration); + haController.initializeHAController(); + // Wait for task assignment + Thread.sleep(10000); + + Assert.assertTrue(haController.isInitialized()); + Assert.assertEquals(1, haController.getLiveInstanceHostNames().size()); + Assert.assertTrue(haController.getAggregationTaskRunner().performsClusterAggregation()); + Assert.assertTrue(haController.getAggregationTaskRunner().performsHostAggregation()); + + // Add new instance + InstanceConfig instanceConfig2 = new InstanceConfig("h2_12001"); + haController.admin.addInstance(CLUSTER_NAME, instanceConfig2); + HelixManager manager2 = HelixManagerFactory.getZKHelixManager(CLUSTER_NAME, + instanceConfig2.getInstanceName(), + InstanceType.PARTICIPANT, haController.zkConnectUrl); + manager2.getStateMachineEngine().registerStateModelFactory(DEFAULT_STATE_MODEL, + new OnlineOfflineStateModelFactory(instanceConfig2.getInstanceName(), + new AggregationTaskRunner(instanceConfig2.getInstanceName(), "", CLUSTER_NAME))); + manager2.connect(); + haController.admin.rebalance(CLUSTER_NAME, METRIC_AGGREGATORS, 1); + + // Wait on re-assignment of partitions + Thread.sleep(10000); + Assert.assertEquals(2, haController.getLiveInstanceHostNames().size()); + + ExternalView view = haController.admin.getResourceExternalView(CLUSTER_NAME, METRIC_AGGREGATORS); + + Map<String, String> partitionInstanceMap = new HashMap<>(); + + for (String partition : view.getPartitionSet()) { + Map<String, String> states = view.getStateMap(partition); + // (instance, state) pairs + for (Map.Entry<String, String> stateEntry : states.entrySet()) { + partitionInstanceMap.put(partition, stateEntry.getKey()); + Assert.assertEquals("ONLINE", stateEntry.getValue()); + } + } + // Re-assigned partitions + Assert.assertEquals(2, partitionInstanceMap.size()); + + haController.getAggregationTaskRunner().stop(); + haController.manager.disconnect(); + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/discovery/TestMetadataManager.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/discovery/TestMetadataManager.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/discovery/TestMetadataManager.java new file mode 100644 index 0000000..94fbb30 --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/discovery/TestMetadataManager.java @@ -0,0 +1,234 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ambari.metrics.core.timeline.discovery; + +import static org.easymock.EasyMock.createNiceMock; +import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.replay; + +import junit.framework.Assert; +import org.apache.ambari.metrics.core.timeline.AbstractMiniHBaseClusterTest; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; +import org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration; +import org.apache.ambari.metrics.core.timeline.aggregators.TimelineClusterMetric; +import org.easymock.EasyMock; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.net.URISyntaxException; +import java.sql.SQLException; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; + +public class TestMetadataManager extends AbstractMiniHBaseClusterTest { + TimelineMetricMetadataManager metadataManager; + + @Before + public void insertDummyRecords() throws IOException, SQLException, URISyntaxException { + + final long now = System.currentTimeMillis(); + + TimelineMetrics timelineMetrics = new TimelineMetrics(); + TimelineMetric metric1 = new TimelineMetric(); + metric1.setMetricName("dummy_metric1"); + metric1.setHostName("dummy_host1"); + metric1.setStartTime(now - 1000); + metric1.setAppId("dummy_app1"); + metric1.setType("Integer"); + metric1.setMetricValues(new TreeMap<Long, Double>() {{ + put(now - 100, 1.0); + put(now - 200, 2.0); + put(now - 300, 3.0); + }}); + timelineMetrics.getMetrics().add(metric1); + TimelineMetric metric2 = new TimelineMetric(); + metric2.setMetricName("dummy_metric2"); + metric2.setHostName("dummy_host2"); + metric2.setStartTime(now - 1000); + metric2.setAppId("dummy_app2"); + metric2.setType("Integer"); + metric2.setInstanceId("instance2"); + metric2.setMetricValues(new TreeMap<Long, Double>() {{ + put(now - 100, 1.0); + put(now - 200, 2.0); + put(now - 300, 3.0); + }}); + timelineMetrics.getMetrics().add(metric2); + + Configuration metricsConf = createNiceMock(Configuration.class); + expect(metricsConf.get("timeline.metrics.service.operation.mode")).andReturn("distributed").anyTimes(); + replay(metricsConf); + + // Initialize new manager + metadataManager = new TimelineMetricMetadataManager(metricsConf, hdb); + hdb.setMetadataInstance(metadataManager); + + hdb.insertMetricRecordsWithMetadata(metadataManager, timelineMetrics, true); + } + + @Test(timeout = 180000) + public void testSaveMetricsMetadata() throws Exception { + Map<TimelineMetricMetadataKey, TimelineMetricMetadata> cachedData = metadataManager.getMetadataCache(); + + Assert.assertNotNull(cachedData); + Assert.assertEquals(2, cachedData.size()); + TimelineMetricMetadataKey key1 = new TimelineMetricMetadataKey("dummy_metric1", "dummy_app1", null); + TimelineMetricMetadataKey key2 = new TimelineMetricMetadataKey("dummy_metric2", "dummy_app2", "instance2"); + TimelineMetricMetadata value1 = new TimelineMetricMetadata("dummy_metric1", + "dummy_app1", null, null, "Integer", 1L, true, true); + TimelineMetricMetadata value2 = new TimelineMetricMetadata("dummy_metric2", + "dummy_app2", "instance2", null, "Integer", 1L, true, true); + + Assert.assertEquals(value1, cachedData.get(key1)); + Assert.assertEquals(value2, cachedData.get(key2)); + + TimelineMetricMetadataSync syncRunnable = new TimelineMetricMetadataSync(metadataManager); + syncRunnable.run(); + + Map<TimelineMetricMetadataKey, TimelineMetricMetadata> savedData = + hdb.getTimelineMetricMetadata(); + + Assert.assertNotNull(savedData); + Assert.assertEquals(2, savedData.size()); + Assert.assertEquals(value1, savedData.get(key1)); + Assert.assertEquals(value2, savedData.get(key2)); + + Map<String, TimelineMetricHostMetadata> cachedHostData = metadataManager.getHostedAppsCache(); + Map<String, TimelineMetricHostMetadata> savedHostData = metadataManager.getHostedAppsFromStore(); + Assert.assertEquals(cachedData.size(), savedData.size()); + Assert.assertEquals("dummy_app1", cachedHostData.get("dummy_host1").getHostedApps().keySet().iterator().next()); + Assert.assertEquals("dummy_app2", cachedHostData.get("dummy_host2").getHostedApps().keySet().iterator().next()); + Assert.assertEquals("dummy_app1", savedHostData.get("dummy_host1").getHostedApps().keySet().iterator().next()); + Assert.assertEquals("dummy_app2", savedHostData.get("dummy_host2").getHostedApps().keySet().iterator().next()); + + Map<String, Set<String>> cachedHostInstanceData = metadataManager.getHostedInstanceCache(); + Map<String, Set<String>> savedHostInstanceData = metadataManager.getHostedInstancesFromStore(); + Assert.assertEquals(cachedHostInstanceData.size(), savedHostInstanceData.size()); + Assert.assertEquals("dummy_host2", cachedHostInstanceData.get("instance2").iterator().next()); + } + + @Test + public void testGenerateUuidFromMetric() throws SQLException { + + TimelineMetric timelineMetric = new TimelineMetric(); + timelineMetric.setMetricName("regionserver.Server.blockCacheExpressHitPercent"); + timelineMetric.setAppId("hbase"); + timelineMetric.setHostName("avijayan-ams-2.openstacklocal"); + timelineMetric.setInstanceId("test1"); + + byte[] uuid = metadataManager.getUuid(timelineMetric); + Assert.assertNotNull(uuid); + Assert.assertEquals(uuid.length, 20); + + byte[] uuidWithoutHost = metadataManager.getUuid(new TimelineClusterMetric(timelineMetric.getMetricName(), timelineMetric.getAppId(), timelineMetric.getInstanceId(), -1)); + Assert.assertNotNull(uuidWithoutHost); + Assert.assertEquals(uuidWithoutHost.length, 16); + + TimelineMetric metric2 = metadataManager.getMetricFromUuid(uuid); + Assert.assertEquals(metric2, timelineMetric); + TimelineMetric metric3 = metadataManager.getMetricFromUuid(uuidWithoutHost); + Assert.assertEquals(metric3.getMetricName(), timelineMetric.getMetricName()); + Assert.assertEquals(metric3.getAppId(), timelineMetric.getAppId()); + Assert.assertEquals(metric3.getInstanceId(), timelineMetric.getInstanceId()); + Assert.assertEquals(metric3.getHostName(), null); + + String metricName1 = metadataManager.getMetricNameFromUuid(uuid); + Assert.assertEquals(metricName1, "regionserver.Server.blockCacheExpressHitPercent"); + String metricName2 = metadataManager.getMetricNameFromUuid(uuidWithoutHost); + Assert.assertEquals(metricName2, "regionserver.Server.blockCacheExpressHitPercent"); + } + + @Test + public void testWildcardSanitization() throws IOException, SQLException, URISyntaxException { + // Initialize new manager + metadataManager = new TimelineMetricMetadataManager(new Configuration(), hdb); + final long now = System.currentTimeMillis(); + + TimelineMetrics timelineMetrics = new TimelineMetrics(); + + TimelineMetric metric1 = new TimelineMetric(); + metric1.setMetricName("dummy_m1"); + metric1.setHostName("dummy_host1"); + metric1.setStartTime(now - 1000); + metric1.setAppId("dummy_app1"); + metric1.setType("Integer"); + metric1.setMetricValues(new TreeMap<Long, Double>() {{ + put(now - 100, 1.0); + put(now - 200, 2.0); + put(now - 300, 3.0); + }}); + timelineMetrics.getMetrics().add(metric1); + + TimelineMetric metric2 = new TimelineMetric(); + metric2.setMetricName("dummy_m2"); + metric2.setHostName("dummy_host2"); + metric2.setStartTime(now - 1000); + metric2.setAppId("dummy_app2"); + metric2.setType("Integer"); + metric2.setMetricValues(new TreeMap<Long, Double>() {{ + put(now - 100, 1.0); + put(now - 200, 2.0); + put(now - 300, 3.0); + }}); + timelineMetrics.getMetrics().add(metric2); + + TimelineMetric metric3 = new TimelineMetric(); + metric3.setMetricName("gummy_3"); + metric3.setHostName("dummy_3h"); + metric3.setStartTime(now - 1000); + metric3.setAppId("dummy_app3"); + metric3.setType("Integer"); + metric3.setMetricValues(new TreeMap<Long, Double>() {{ + put(now - 100, 1.0); + put(now - 200, 2.0); + put(now - 300, 3.0); + }}); + timelineMetrics.getMetrics().add(metric3); + + Configuration metricsConf = new Configuration(); + TimelineMetricConfiguration configuration = EasyMock.createNiceMock(TimelineMetricConfiguration.class); + expect(configuration.getMetricsConf()).andReturn(metricsConf).once(); + replay(configuration); + + hdb.insertMetricRecordsWithMetadata(metadataManager, timelineMetrics, true); + + List<byte[]> uuids = metadataManager.getUuids(Collections.singletonList("dummy_m%"), + Collections.singletonList("dummy_host2"), "dummy_app1", null); + Assert.assertTrue(uuids.size() == 2); + + uuids = metadataManager.getUuids(Collections.singletonList("dummy_m%"), + Collections.singletonList("dummy_host%"), "dummy_app2", null); + Assert.assertTrue(uuids.size() == 4); + + Collection<String> metrics = Arrays.asList("dummy_m%", "dummy_3", "dummy_m2"); + List<String> hosts = Arrays.asList("dummy_host%", "dummy_3h"); + uuids = metadataManager.getUuids(metrics, hosts, "dummy_app2", null); + Assert.assertTrue(uuids.size() == 9); + } + + +} http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/discovery/TestMetadataSync.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/discovery/TestMetadataSync.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/discovery/TestMetadataSync.java new file mode 100644 index 0000000..80eb89e --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/discovery/TestMetadataSync.java @@ -0,0 +1,126 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ambari.metrics.core.timeline.discovery; + +import junit.framework.Assert; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata; +import org.apache.ambari.metrics.core.timeline.PhoenixHBaseAccessor; +import org.junit.Test; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import static org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata.MetricType.GAUGE; +import static org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TIMELINE_METRIC_METADATA_FILTERS; +import static org.easymock.EasyMock.createNiceMock; +import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.replay; +import static org.easymock.EasyMock.verify; + +public class TestMetadataSync { + @Test + public void testRefreshMetadataOnWrite() throws Exception { + Configuration configuration = createNiceMock(Configuration.class); + PhoenixHBaseAccessor hBaseAccessor = createNiceMock(PhoenixHBaseAccessor.class); + + final TimelineMetricMetadata testMetadata1 = new TimelineMetricMetadata( + "m1", "a1", null, "", GAUGE.name(), System.currentTimeMillis(), true, false); + final TimelineMetricMetadata testMetadata2 = new TimelineMetricMetadata( + "m2", "a2", null, "", GAUGE.name(), System.currentTimeMillis(), true, false); + + Map<TimelineMetricMetadataKey, TimelineMetricMetadata> metadata = + new HashMap<TimelineMetricMetadataKey, TimelineMetricMetadata>() {{ + put(new TimelineMetricMetadataKey("m1", "a1", null), testMetadata1); + put(new TimelineMetricMetadataKey("m2", "a2", null), testMetadata2); + }}; + + Map<String, TimelineMetricHostMetadata> hostedApps = new HashMap<String, TimelineMetricHostMetadata>() {{ + put("h1", new TimelineMetricHostMetadata(new HashSet<>(Arrays.asList("a1")))); + put("h2", new TimelineMetricHostMetadata((new HashSet<>(Arrays.asList("a1", "a2"))))); + }}; + + Map<String, Set<String>> hostedInstances = new HashMap<String, Set<String>>() {{ + put("i1", new HashSet<>(Arrays.asList("h1"))); + put("i2", new HashSet<>(Arrays.asList("h1", "h2"))); + }}; + + expect(configuration.get("timeline.metrics.service.operation.mode")).andReturn("distributed"); + expect(hBaseAccessor.getTimelineMetricMetadata()).andReturn(metadata); + expect(hBaseAccessor.getHostedAppsMetadata()).andReturn(hostedApps); + expect(hBaseAccessor.getInstanceHostsMetdata()).andReturn(hostedInstances); + + replay(configuration, hBaseAccessor); + + TimelineMetricMetadataManager metadataManager = new TimelineMetricMetadataManager(configuration, hBaseAccessor); + + metadataManager.metricMetadataSync = new TimelineMetricMetadataSync(metadataManager); + + metadataManager.metricMetadataSync.run(); + + verify(configuration, hBaseAccessor); + + metadata = metadataManager.getMetadataCache(); + Assert.assertEquals(2, metadata.size()); + Assert.assertTrue(metadata.containsKey(new TimelineMetricMetadataKey("m1", "a1", null))); + Assert.assertTrue(metadata.containsKey(new TimelineMetricMetadataKey("m2", "a2", null))); + + hostedApps = metadataManager.getHostedAppsCache(); + Assert.assertEquals(2, hostedApps.size()); + Assert.assertEquals(1, hostedApps.get("h1").getHostedApps().size()); + Assert.assertEquals(2, hostedApps.get("h2").getHostedApps().size()); + + hostedInstances = metadataManager.getHostedInstanceCache(); + Assert.assertEquals(2, hostedInstances.size()); + Assert.assertEquals(1, hostedInstances.get("i1").size()); + Assert.assertEquals(2, hostedInstances.get("i2").size()); + + } + + @Test + public void testFilterByRegexOnMetricName() throws Exception { + Configuration configuration = createNiceMock(Configuration.class); + PhoenixHBaseAccessor hBaseAccessor = createNiceMock(PhoenixHBaseAccessor.class); + + TimelineMetricMetadata metadata1 = new TimelineMetricMetadata( + "xxx.abc.yyy", "a1", null, "", GAUGE.name(), System.currentTimeMillis(), true, false); + TimelineMetricMetadata metadata2 = new TimelineMetricMetadata( + "xxx.cdef.yyy", "a2", null, "", GAUGE.name(), System.currentTimeMillis(), true, false); + TimelineMetricMetadata metadata3 = new TimelineMetricMetadata( + "xxx.pqr.zzz", "a3", null, "", GAUGE.name(), System.currentTimeMillis(), true, false); + + expect(configuration.get(TIMELINE_METRIC_METADATA_FILTERS)).andReturn("abc,cde"); + + replay(configuration, hBaseAccessor); + + TimelineMetricMetadataManager metadataManager = new TimelineMetricMetadataManager(configuration, hBaseAccessor); + + metadataManager.putIfModifiedTimelineMetricMetadata(metadata1); + metadataManager.putIfModifiedTimelineMetricMetadata(metadata2); + metadataManager.putIfModifiedTimelineMetricMetadata(metadata3); + + verify(configuration, hBaseAccessor); + + Map<TimelineMetricMetadataKey, TimelineMetricMetadata> metadata = + metadataManager.getMetadataCache(); + + Assert.assertEquals(1, metadata.size()); + Assert.assertEquals("xxx.pqr.zzz", metadata.keySet().iterator().next().getMetricName()); + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/function/TimelineMetricsSeriesAggregateFunctionTest.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/function/TimelineMetricsSeriesAggregateFunctionTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/function/TimelineMetricsSeriesAggregateFunctionTest.java new file mode 100644 index 0000000..492d5a0 --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/function/TimelineMetricsSeriesAggregateFunctionTest.java @@ -0,0 +1,188 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ambari.metrics.core.timeline.function; + +import com.google.common.collect.Lists; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; +import org.junit.Test; + +import java.util.Map; +import java.util.TreeMap; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class TimelineMetricsSeriesAggregateFunctionTest { + + public static final double DELTA = 0.00000000001; + + @Test public void testSeriesAggregateBySummation() throws Exception { + TimelineMetrics testMetrics = getTestObject(); + // all TimelineMetric are having same values + TreeMap<Long, Double> metricValues = testMetrics.getMetrics().get(0).getMetricValues(); + + TimelineMetricsSeriesAggregateFunction function = TimelineMetricsSeriesAggregateFunctionFactory + .newInstance(SeriesAggregateFunction.SUM); + TimelineMetric aggregatedMetric = function.apply(testMetrics); + + String aggregatedMetricName = aggregatedMetric.getMetricName(); + String aggregatedHostName = aggregatedMetric.getHostName(); + String aggregatedAppId = aggregatedMetric.getAppId(); + String aggregatedInstanceId = aggregatedMetric.getInstanceId(); + + for (TimelineMetric testMetric : testMetrics.getMetrics()) { + assertTrue(aggregatedMetricName.contains(testMetric.getMetricName())); + assertTrue(aggregatedHostName.contains(testMetric.getHostName())); + if (!testMetric.getMetricName().equals("byte_in.3")) { + assertTrue(aggregatedAppId.contains(testMetric.getAppId())); + assertTrue(aggregatedInstanceId.contains(testMetric.getInstanceId())); + } + } + + TreeMap<Long, Double> summationMetricValues = aggregatedMetric.getMetricValues(); + assertEquals(3, summationMetricValues.size()); + for (Map.Entry<Long, Double> tsAndValue : summationMetricValues.entrySet()) { + assertEquals(metricValues.get(tsAndValue.getKey()) * 3, tsAndValue.getValue(), DELTA); + } + } + + @Test public void testSeriesAggregateByAverage() throws Exception { + TimelineMetrics testMetrics = getTestObject(); + // all TimelineMetric are having same values + TreeMap<Long, Double> metricValues = testMetrics.getMetrics().get(0).getMetricValues(); + + TimelineMetricsSeriesAggregateFunction function = TimelineMetricsSeriesAggregateFunctionFactory + .newInstance(SeriesAggregateFunction.AVG); + TimelineMetric aggregatedMetric = function.apply(testMetrics); + + // checks only values, others are covered by testSeriesAggregateBySummation + TreeMap<Long, Double> averageMetricValues = aggregatedMetric.getMetricValues(); + assertEquals(3, averageMetricValues.size()); + for (Map.Entry<Long, Double> tsAndValue : averageMetricValues.entrySet()) { + assertEquals(metricValues.get(tsAndValue.getKey()), tsAndValue.getValue(), DELTA); + } + } + + @Test public void testSeriesAggregateByMax() throws Exception { + TimelineMetrics testMetrics = getTestObject(); + + // override metric values + TreeMap<Long, Double> metricValues = new TreeMap<>(); + metricValues.put(1L, 1.0); + metricValues.put(2L, 2.0); + metricValues.put(3L, 3.0); + + testMetrics.getMetrics().get(0).setMetricValues(metricValues); + + TreeMap<Long, Double> metricValues2 = new TreeMap<>(); + metricValues2.put(1L, 2.0); + metricValues2.put(2L, 4.0); + metricValues2.put(3L, 6.0); + + testMetrics.getMetrics().get(1).setMetricValues(metricValues2); + + TreeMap<Long, Double> metricValues3 = new TreeMap<>(); + metricValues3.put(1L, 3.0); + metricValues3.put(2L, 6.0); + metricValues3.put(3L, 9.0); + + testMetrics.getMetrics().get(2).setMetricValues(metricValues3); + + TimelineMetricsSeriesAggregateFunction function = TimelineMetricsSeriesAggregateFunctionFactory + .newInstance(SeriesAggregateFunction.MAX); + TimelineMetric aggregatedMetric = function.apply(testMetrics); + + // checks only values, others are covered by testSeriesAggregateBySummation + TreeMap<Long, Double> maxMetricValues = aggregatedMetric.getMetricValues(); + assertEquals(3, maxMetricValues.size()); + for (Map.Entry<Long, Double> tsAndValue : maxMetricValues.entrySet()) { + assertEquals(metricValues3.get(tsAndValue.getKey()), tsAndValue.getValue(), DELTA); + } + } + + @Test public void testSeriesAggregateByMin() throws Exception { + TimelineMetrics testMetrics = getTestObject(); + + // override metric values + TreeMap<Long, Double> metricValues = new TreeMap<>(); + metricValues.put(1L, 1.0); + metricValues.put(2L, 2.0); + metricValues.put(3L, 3.0); + + testMetrics.getMetrics().get(0).setMetricValues(metricValues); + + TreeMap<Long, Double> metricValues2 = new TreeMap<>(); + metricValues2.put(1L, 2.0); + metricValues2.put(2L, 4.0); + metricValues2.put(3L, 6.0); + + testMetrics.getMetrics().get(1).setMetricValues(metricValues2); + + TreeMap<Long, Double> metricValues3 = new TreeMap<>(); + metricValues3.put(1L, 3.0); + metricValues3.put(2L, 6.0); + metricValues3.put(3L, 9.0); + + testMetrics.getMetrics().get(2).setMetricValues(metricValues3); + + TimelineMetricsSeriesAggregateFunction function = TimelineMetricsSeriesAggregateFunctionFactory + .newInstance(SeriesAggregateFunction.MIN); + TimelineMetric aggregatedMetric = function.apply(testMetrics); + + // checks only values, others are covered by testSeriesAggregateBySummation + TreeMap<Long, Double> minMetricValues = aggregatedMetric.getMetricValues(); + assertEquals(3, minMetricValues.size()); + for (Map.Entry<Long, Double> tsAndValue : minMetricValues.entrySet()) { + assertEquals(metricValues.get(tsAndValue.getKey()), tsAndValue.getValue(), DELTA); + } + } + + private TimelineMetrics getTestObject() { + TimelineMetric metric = new TimelineMetric(); + metric.setMetricName("byte_in.1"); + metric.setHostName("host1"); + metric.setAppId("app1"); + metric.setInstanceId("instance1"); + + TimelineMetric metric2 = new TimelineMetric(); + metric2.setMetricName("byte_in.2"); + metric2.setHostName("host2"); + metric2.setAppId("app2"); + metric2.setInstanceId("instance2"); + + TimelineMetric metric3 = new TimelineMetric(); + metric3.setMetricName("byte_in.3"); + metric3.setHostName("host3"); + // appId and instanceId for metric3 are null + + TreeMap<Long, Double> metricValues = new TreeMap<>(); + metricValues.put(1L, 3.0); + metricValues.put(2L, 2 * 3.0); + metricValues.put(3L, 3 * 3.0); + + metric.setMetricValues(metricValues); + metric2.setMetricValues(metricValues); + metric3.setMetricValues(metricValues); + + TimelineMetrics metrics = new TimelineMetrics(); + metrics.setMetrics(Lists.newArrayList(metric, metric2, metric3)); + + return metrics; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/query/DefaultConditionTest.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/query/DefaultConditionTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/query/DefaultConditionTest.java new file mode 100644 index 0000000..7d7e0f7 --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/query/DefaultConditionTest.java @@ -0,0 +1,170 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ambari.metrics.core.timeline.query; + +import junit.framework.Assert; +import org.apache.commons.collections.CollectionUtils; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; + +public class DefaultConditionTest { + + @Test + public void testMetricNameWhereCondition() { + //EMPTY + List<byte[]> uuids = new ArrayList<>(); + DefaultCondition condition = new DefaultCondition(uuids,null,null,null,null,null,null,null,null,true); + StringBuilder sb = new StringBuilder(); + condition.appendUuidClause(sb); + Assert.assertEquals(sb.toString(), ""); + Assert.assertTrue(CollectionUtils.isEqualCollection(uuids, condition.getUuids())); + + //Metric uuid + uuids.add(new byte[16]); + condition = new DefaultCondition(uuids,null,null,null,null,null,null,null,null,true); + sb = new StringBuilder(); + condition.appendUuidClause(sb); + Assert.assertEquals(sb.toString(), "(UUID LIKE ?)"); + Assert.assertEquals(uuids.size(), condition.getUuids().size()); + Assert.assertTrue(new String(condition.getUuids().get(0)).endsWith("%")); + + //metric uuid + Host uuid + uuids.add(new byte[4]); + condition = new DefaultCondition(uuids,null,null,null,null,null,null,null,null,true); + sb = new StringBuilder(); + condition.appendUuidClause(sb); + Assert.assertEquals(sb.toString(), "(UUID LIKE ? AND UUID LIKE ?)"); + Assert.assertEquals(uuids.size(), condition.getUuids().size()); + Assert.assertTrue(new String(condition.getUuids().get(1)).startsWith("%")); + + //metric + host + full + uuids.add(new byte[20]); + uuids.add(new byte[20]); + condition = new DefaultCondition(uuids,null,null,null,null,null,null,null,null,true); + sb = new StringBuilder(); + condition.appendUuidClause(sb); + Assert.assertEquals(sb.toString(), "(UUID IN (?, ?) AND UUID LIKE ? AND UUID LIKE ?)"); + Assert.assertEquals(uuids.size(), condition.getUuids().size()); + + //Only IN clause. + uuids.clear(); + uuids.add(new byte[20]); + condition = new DefaultCondition(uuids,null,null,null,null,null,null,null,null,true); + sb = new StringBuilder(); + condition.appendUuidClause(sb); + Assert.assertEquals(sb.toString(), "(UUID IN (?))"); + Assert.assertEquals(uuids.size(), condition.getUuids().size()); + + //metric NOT LIKE + uuids.clear(); + uuids.add(new byte[16]); + condition = new DefaultCondition(uuids,null,null,null,null,null,null,null,null,true); + sb = new StringBuilder(); + condition.setMetricNamesNotCondition(true); + condition.appendUuidClause(sb); + Assert.assertEquals(sb.toString(), "(UUID NOT LIKE ?)"); + Assert.assertEquals(uuids.size(), condition.getUuids().size()); + + //metric NOT LIKE host LIKE + uuids.clear(); + uuids.add(new byte[16]); + uuids.add(new byte[4]); + condition = new DefaultCondition(uuids,null,null,null,null,null,null,null,null,true); + sb = new StringBuilder(); + condition.setMetricNamesNotCondition(true); + condition.appendUuidClause(sb); + Assert.assertEquals(sb.toString(), "(UUID NOT LIKE ? AND UUID LIKE ?)"); + Assert.assertEquals(uuids.size(), condition.getUuids().size()); + Assert.assertTrue(new String(condition.getUuids().get(0)).endsWith("%")); + Assert.assertTrue(new String(condition.getUuids().get(1)).startsWith("%")); + + //metric LIKE host NOT LIKE + uuids.clear(); + uuids.add(new byte[16]); + uuids.add(new byte[4]); + condition = new DefaultCondition(uuids,null,null,null,null,null,null,null,null,true); + sb = new StringBuilder(); + condition.setHostnamesNotCondition(true); + condition.appendUuidClause(sb); + Assert.assertEquals(sb.toString(), "(UUID LIKE ? AND UUID NOT LIKE ?)"); + Assert.assertEquals(uuids.size(), condition.getUuids().size()); + Assert.assertTrue(new String(condition.getUuids().get(0)).endsWith("%")); + Assert.assertTrue(new String(condition.getUuids().get(1)).startsWith("%")); + + //metric LIKE or LIKE host LIKE + uuids.clear(); + uuids.add(new byte[4]); + uuids.add(new byte[16]); + uuids.add(new byte[16]); + condition = new DefaultCondition(uuids,null,null,null,null,null,null,null,null,true); + sb = new StringBuilder(); + condition.appendUuidClause(sb); + Assert.assertEquals(sb.toString(), "((UUID LIKE ? OR UUID LIKE ?) AND UUID LIKE ?)"); + Assert.assertEquals(uuids.size(), condition.getUuids().size()); + Assert.assertTrue(new String(condition.getUuids().get(0)).endsWith("%")); + Assert.assertTrue(new String(condition.getUuids().get(1)).endsWith("%")); + Assert.assertTrue(new String(condition.getUuids().get(2)).startsWith("%")); + + //UUID in metric LIKE or LIKE host LIKE + uuids.clear(); + uuids.add(new byte[16]); + uuids.add(new byte[16]); + uuids.add(new byte[20]); + uuids.add(new byte[4]); + condition = new DefaultCondition(uuids,null,null,null,null,null,null,null,null,true); + sb = new StringBuilder(); + condition.appendUuidClause(sb); + Assert.assertEquals(sb.toString(), "(UUID IN (?) AND (UUID LIKE ? OR UUID LIKE ?) AND UUID LIKE ?)"); + Assert.assertEquals(uuids.size(), condition.getUuids().size()); + Assert.assertTrue(new String(condition.getUuids().get(1)).endsWith("%")); + Assert.assertTrue(new String(condition.getUuids().get(2)).endsWith("%")); + Assert.assertTrue(new String(condition.getUuids().get(3)).startsWith("%")); + + //metric LIKE host LIKE or LIKE + uuids.clear(); + uuids.add(new byte[16]); + uuids.add(new byte[4]); + uuids.add(new byte[4]); + condition = new DefaultCondition(uuids,null,null,null,null,null,null,null,null,true); + sb = new StringBuilder(); + condition.appendUuidClause(sb); + Assert.assertEquals(sb.toString(), "(UUID LIKE ? AND (UUID LIKE ? OR UUID LIKE ?))"); + Assert.assertEquals(uuids.size(), condition.getUuids().size()); + Assert.assertTrue(new String(condition.getUuids().get(0)).endsWith("%")); + Assert.assertTrue(new String(condition.getUuids().get(1)).startsWith("%")); + Assert.assertTrue(new String(condition.getUuids().get(2)).startsWith("%")); + + //UUID NOT IN metric LIKE host LIKE + uuids.clear(); + uuids.add(new byte[20]); + uuids.add(new byte[16]); + uuids.add(new byte[4]); + condition = new DefaultCondition(uuids,null,null,null,null,null,null,null,null,true); + sb = new StringBuilder(); + condition.setUuidNotCondition(true); + condition.appendUuidClause(sb); + Assert.assertEquals(sb.toString(), "(UUID NOT IN (?) AND UUID LIKE ? AND UUID LIKE ?)"); + Assert.assertEquals(uuids.size(), condition.getUuids().size()); + Assert.assertTrue(new String(condition.getUuids().get(1)).endsWith("%")); + Assert.assertTrue(new String(condition.getUuids().get(2)).startsWith("%")); + } +} + http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/source/RawMetricsSourceTest.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/source/RawMetricsSourceTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/source/RawMetricsSourceTest.java new file mode 100644 index 0000000..7d58682 --- /dev/null +++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/source/RawMetricsSourceTest.java @@ -0,0 +1,141 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ambari.metrics.core.timeline.source; + +import static org.apache.ambari.metrics.core.timeline.source.InternalSourceProvider.SOURCE_NAME.RAW_METRICS; +import static org.easymock.EasyMock.capture; +import static org.easymock.EasyMock.createNiceMock; +import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.expectLastCall; +import static org.easymock.EasyMock.replay; +import static org.easymock.EasyMock.verify; +import static org.powermock.api.easymock.PowerMock.mockStatic; +import static org.powermock.api.easymock.PowerMock.replayAll; + +import java.util.Collection; +import java.util.Collections; +import java.util.TreeMap; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; +import org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration; +import org.apache.ambari.metrics.core.timeline.sink.ExternalMetricsSink; +import org.easymock.Capture; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +import junit.framework.Assert; + +@RunWith(PowerMockRunner.class) +@PrepareForTest(TimelineMetricConfiguration.class) +public class RawMetricsSourceTest { + + @Before + public void setupConf() throws Exception { + TimelineMetricConfiguration conf = new TimelineMetricConfiguration(new + Configuration(), new Configuration()); + mockStatic(TimelineMetricConfiguration.class); + expect(TimelineMetricConfiguration.getInstance()).andReturn(conf).anyTimes(); + replayAll(); + } + + @Test + public void testRawMetricsSourcedAtFlushInterval() throws Exception { + InternalSourceProvider internalSourceProvider = new DefaultInternalMetricsSourceProvider(); + ExternalMetricsSink rawMetricsSink = createNiceMock(ExternalMetricsSink.class); + expect(rawMetricsSink.getFlushSeconds()).andReturn(1); + expect(rawMetricsSink.getSinkTimeOutSeconds()).andReturn(1); + Capture<Collection<TimelineMetrics>> metricsCapture = new Capture<>(); + rawMetricsSink.sinkMetricData(capture(metricsCapture)); + expectLastCall(); + replay(rawMetricsSink); + + InternalMetricsSource rawMetricsSource = internalSourceProvider.getInternalMetricsSource(RAW_METRICS, 1, rawMetricsSink); + TimelineMetrics timelineMetrics = new TimelineMetrics(); + + final long now = System.currentTimeMillis(); + TimelineMetric metric1 = new TimelineMetric(); + metric1.setMetricName("m1"); + metric1.setAppId("a1"); + metric1.setInstanceId("i1"); + metric1.setHostName("h1"); + metric1.setStartTime(now - 200); + metric1.setMetricValues(new TreeMap<Long, Double>() {{ + put(now - 100, 1.0); + put(now - 200, 2.0); + }}); + timelineMetrics.getMetrics().add(metric1); + + rawMetricsSource.publishTimelineMetrics(Collections.singletonList(timelineMetrics)); + + verify(rawMetricsSink); + } + + @Test(timeout = 10000) + public void testRawMetricsCachedAndSourced() throws Exception { + ExternalMetricsSink rawMetricsSink = createNiceMock(ExternalMetricsSink.class); + expect(rawMetricsSink.getFlushSeconds()).andReturn(2).anyTimes(); + expect(rawMetricsSink.getSinkTimeOutSeconds()).andReturn(1).anyTimes(); + + class CaptureOnce<T> extends Capture<T> { + @Override + public void setValue(T value) { + if (!hasCaptured()) { + super.setValue(value); + } + } + } + Capture<Collection<TimelineMetrics>> metricsCapture = new CaptureOnce<>(); + + rawMetricsSink.sinkMetricData(capture(metricsCapture)); + expectLastCall(); + replay(rawMetricsSink); + + InternalSourceProvider internalSourceProvider = new DefaultInternalMetricsSourceProvider(); + InternalMetricsSource rawMetricsSource = internalSourceProvider.getInternalMetricsSource(RAW_METRICS, 1, rawMetricsSink); + TimelineMetrics timelineMetrics = new TimelineMetrics(); + + final long now = System.currentTimeMillis(); + TimelineMetric metric1 = new TimelineMetric(); + metric1.setMetricName("m1"); + metric1.setAppId("a1"); + metric1.setInstanceId("i1"); + metric1.setHostName("h1"); + metric1.setStartTime(now - 200); + metric1.setMetricValues(new TreeMap<Long, Double>() {{ + put(now - 100, 1.0); + put(now - 200, 2.0); + }}); + timelineMetrics.getMetrics().add(metric1); + + rawMetricsSource.publishTimelineMetrics(Collections.singletonList(timelineMetrics)); + + // Wait on eviction + Thread.sleep(5000); + + verify(rawMetricsSink); + + Assert.assertTrue(metricsCapture.hasCaptured()); + Assert.assertTrue(metricsCapture.getValue().iterator().next().getMetrics().iterator().next().equals(metric1)); + } + +}
