http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/aggregators/ITMetricAggregator.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/aggregators/ITMetricAggregator.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/aggregators/ITMetricAggregator.java
new file mode 100644
index 0000000..8517105
--- /dev/null
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/aggregators/ITMetricAggregator.java
@@ -0,0 +1,355 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.core.timeline.aggregators;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.apache.ambari.metrics.core.timeline.AbstractMiniHBaseClusterTest;
+import org.apache.ambari.metrics.core.timeline.MetricTestHelper;
+import org.apache.ambari.metrics.core.timeline.query.Condition;
+import org.apache.ambari.metrics.core.timeline.query.DefaultCondition;
+import org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL;
+import org.junit.Test;
+
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.TreeMap;
+
+import static junit.framework.Assert.assertEquals;
+import static junit.framework.Assert.assertTrue;
+import static junit.framework.Assert.fail;
+import static 
org.apache.ambari.metrics.core.timeline.MetricTestHelper.createEmptyTimelineMetric;
+import static 
org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.GET_METRIC_AGGREGATE_ONLY_SQL;
+import static 
org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_DAILY_TABLE_NAME;
+import static 
org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_HOURLY_TABLE_NAME;
+import static 
org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_TABLE_NAME;
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class ITMetricAggregator extends AbstractMiniHBaseClusterTest {
+
+  @Test
+  public void testShouldInsertMetrics() throws Exception {
+    // GIVEN
+
+    // WHEN
+    long startTime = System.currentTimeMillis();
+    TimelineMetrics metricsSent = prepareTimelineMetrics(startTime, "local");
+    hdb.insertMetricRecords(metricsSent, true);
+
+    Condition queryCondition = new DefaultCondition(null,
+        Collections.singletonList("local"), null, null, startTime,
+        startTime + (15 * 60 * 1000), null, null, false);
+    TimelineMetrics recordRead = hdb.getMetricRecords(queryCondition, null);
+
+    // THEN
+    assertThat(recordRead.getMetrics()).hasSize(2)
+      .extracting("metricName")
+      .containsOnly("mem_free", "disk_free");
+
+    assertThat(metricsSent.getMetrics())
+      .usingElementComparator(TIME_IGNORING_COMPARATOR)
+      .containsAll(recordRead.getMetrics());
+  }
+
+  private Configuration getConfigurationForTest(boolean useGroupByAggregators) 
{
+    Configuration configuration = new Configuration();
+    configuration.set("timeline.metrics.service.use.groupBy.aggregators", 
String.valueOf(useGroupByAggregators));
+    return configuration;
+  }
+
+  @Test
+  public void testShouldAggregateMinuteProperly() throws Exception {
+    // GIVEN
+    TimelineMetricAggregator aggregatorMinute =
+      TimelineMetricAggregatorFactory.createTimelineMetricAggregatorMinute(hdb,
+        getConfigurationForTest(false), metadataManager, null);
+    TimelineMetricReadHelper readHelper = new 
TimelineMetricReadHelper(metadataManager, false);
+
+    long startTime = System.currentTimeMillis();
+    long ctime = startTime;
+    long minute = 60 * 1000;
+    hdb.insertMetricRecords(prepareTimelineMetrics(startTime, "local"), true);
+    hdb.insertMetricRecords(prepareTimelineMetrics(ctime += minute, "local"), 
true);
+    hdb.insertMetricRecords(prepareTimelineMetrics(ctime += minute, "local"), 
true);
+    hdb.insertMetricRecords(prepareTimelineMetrics(ctime += minute, "local"), 
true);
+    hdb.insertMetricRecords(prepareTimelineMetrics(ctime += minute, "local"), 
true);
+
+    // WHEN
+    long endTime = startTime + 1000 * 60 * 4;
+    boolean success = aggregatorMinute.doWork(startTime, endTime + 1);
+
+    //THEN
+    Condition condition = new DefaultCondition(null, null, null, null, 
startTime,
+      endTime + 2, null, null, true);
+    condition.setStatement(String.format(GET_METRIC_AGGREGATE_ONLY_SQL,
+      METRICS_AGGREGATE_MINUTE_TABLE_NAME));
+
+    PreparedStatement pstmt = 
PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition);
+    ResultSet rs = pstmt.executeQuery();
+    MetricHostAggregate expectedAggregate =
+      MetricTestHelper.createMetricHostAggregate(2.0, 0.0, 20, 15.0);
+
+    int count = 0;
+    while (rs.next()) {
+      TimelineMetric currentMetric =
+        readHelper.getTimelineMetricKeyFromResultSet(rs);
+      MetricHostAggregate currentHostAggregate =
+        readHelper.getMetricHostAggregateFromResultSet(rs);
+
+      if ("disk_free".equals(currentMetric.getMetricName())) {
+        assertEquals(2.0, currentHostAggregate.getMax());
+        assertEquals(0.0, currentHostAggregate.getMin());
+        assertEquals(20, currentHostAggregate.getNumberOfSamples());
+        assertEquals(15.0, currentHostAggregate.getSum());
+        assertEquals(15.0 / 20, currentHostAggregate.calculateAverage());
+        count++;
+      } else if ("mem_free".equals(currentMetric.getMetricName())) {
+        assertEquals(2.0, currentHostAggregate.getMax());
+        assertEquals(0.0, currentHostAggregate.getMin());
+        assertEquals(20, currentHostAggregate.getNumberOfSamples());
+        assertEquals(15.0, currentHostAggregate.getSum());
+        assertEquals(15.0 / 20, currentHostAggregate.calculateAverage());
+        count++;
+      } else {
+        fail("Unexpected entry");
+      }
+    }
+    assertEquals("Two aggregated entries expected", 2, count);
+  }
+
+  @Test
+   public void testShouldAggregateHourProperly() throws Exception {
+    // GIVEN
+    TimelineMetricAggregator aggregator =
+      TimelineMetricAggregatorFactory.createTimelineMetricAggregatorHourly(hdb,
+        getConfigurationForTest(false), metadataManager, null);
+    TimelineMetricReadHelper readHelper = new 
TimelineMetricReadHelper(metadataManager, false);
+    long startTime = System.currentTimeMillis();
+
+    MetricHostAggregate expectedAggregate =
+      MetricTestHelper.createMetricHostAggregate(2.0, 0.0, 20, 15.0);
+    Map<TimelineMetric, MetricHostAggregate>
+      aggMap = new HashMap<TimelineMetric,
+      MetricHostAggregate>();
+
+    int min_5 = 5 * 60 * 1000;
+    long ctime = startTime - min_5;
+    aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
+    aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
+    aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
+    aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
+    aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
+    aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
+    aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
+    aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
+    aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
+    aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
+    aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
+    aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
+
+    hdb.saveHostAggregateRecords(aggMap, METRICS_AGGREGATE_MINUTE_TABLE_NAME);
+
+    //WHEN
+    long endTime = ctime + min_5;
+    boolean success = aggregator.doWork(startTime, endTime);
+    assertTrue(success);
+
+    //THEN
+    Condition condition = new DefaultCondition(null, null, null, null, 
startTime,
+      endTime + 1, null, null, true);
+    condition.setStatement(String.format(GET_METRIC_AGGREGATE_ONLY_SQL, 
METRICS_AGGREGATE_HOURLY_TABLE_NAME));
+
+    PreparedStatement pstmt = 
PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition);
+    ResultSet rs = pstmt.executeQuery();
+
+    while (rs.next()) {
+      TimelineMetric currentMetric =
+        readHelper.getTimelineMetricKeyFromResultSet(rs);
+      MetricHostAggregate currentHostAggregate =
+        readHelper.getMetricHostAggregateFromResultSet(rs);
+
+      if ("disk_used".equals(currentMetric.getMetricName())) {
+        assertEquals(2.0, currentHostAggregate.getMax());
+        assertEquals(0.0, currentHostAggregate.getMin());
+        assertEquals(12 * 20, currentHostAggregate.getNumberOfSamples());
+        assertEquals(12 * 15.0, currentHostAggregate.getSum());
+        assertEquals(15.0 / 20, currentHostAggregate.calculateAverage());
+      }
+    }
+  }
+
+  @Test
+  public void testMetricAggregateDaily() throws Exception {
+    // GIVEN
+    TimelineMetricAggregator aggregator =
+      TimelineMetricAggregatorFactory.createTimelineMetricAggregatorDaily(hdb,
+        getConfigurationForTest(false), metadataManager, null);
+    TimelineMetricReadHelper readHelper = new 
TimelineMetricReadHelper(metadataManager, false);
+    long startTime = System.currentTimeMillis();
+
+    MetricHostAggregate expectedAggregate =
+      MetricTestHelper.createMetricHostAggregate(2.0, 0.0, 20, 15.0);
+    Map<TimelineMetric, MetricHostAggregate>
+      aggMap = new HashMap<TimelineMetric, MetricHostAggregate>();
+
+    int min_5 = 5 * 60 * 1000;
+    long ctime = startTime - min_5;
+    aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
+    aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
+    aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
+    aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
+    aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
+    aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
+    aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
+    aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
+    aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
+    aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
+    aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
+    aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
+
+    hdb.saveHostAggregateRecords(aggMap, METRICS_AGGREGATE_HOURLY_TABLE_NAME);
+
+    //WHEN
+    long endTime = ctime + min_5;
+    boolean success = aggregator.doWork(startTime, endTime);
+    assertTrue(success);
+
+    //THEN
+    Condition condition = new DefaultCondition(null, null, null, null, 
startTime,
+      endTime + 1, null, null, true);
+    condition.setStatement(String.format(GET_METRIC_AGGREGATE_ONLY_SQL, 
METRICS_AGGREGATE_DAILY_TABLE_NAME));
+
+    PreparedStatement pstmt = 
PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition);
+    ResultSet rs = pstmt.executeQuery();
+
+    while (rs.next()) {
+      TimelineMetric currentMetric =
+        readHelper.getTimelineMetricKeyFromResultSet(rs);
+      MetricHostAggregate currentHostAggregate =
+        readHelper.getMetricHostAggregateFromResultSet(rs);
+
+      if ("disk_used".equals(currentMetric.getMetricName())) {
+        assertEquals(2.0, currentHostAggregate.getMax());
+        assertEquals(0.0, currentHostAggregate.getMin());
+        assertEquals(12 * 20, currentHostAggregate.getNumberOfSamples());
+        assertEquals(12 * 15.0, currentHostAggregate.getSum());
+        assertEquals(15.0 / 20, currentHostAggregate.calculateAverage());
+      }
+    }
+  }
+
+  @Test
+  public void testAggregationUsingGroupByQuery() throws Exception {
+    // GIVEN
+    TimelineMetricAggregator aggregatorMinute =
+      TimelineMetricAggregatorFactory.createTimelineMetricAggregatorMinute(hdb,
+        getConfigurationForTest(true), metadataManager, null);
+    TimelineMetricReadHelper readHelper = new 
TimelineMetricReadHelper(metadataManager, false);
+
+    long startTime = System.currentTimeMillis();
+    long ctime = startTime;
+    long minute = 60 * 1000;
+    hdb.insertMetricRecords(prepareTimelineMetrics(startTime, "local"), true);
+    hdb.insertMetricRecords(prepareTimelineMetrics(ctime += minute, "local"), 
true);
+    hdb.insertMetricRecords(prepareTimelineMetrics(ctime += minute, "local"), 
true);
+    hdb.insertMetricRecords(prepareTimelineMetrics(ctime += minute, "local"), 
true);
+    hdb.insertMetricRecords(prepareTimelineMetrics(ctime += minute, "local"), 
true);
+
+    long endTime = startTime + 1000 * 60 * 4;
+    boolean success = aggregatorMinute.doWork(startTime - 1, endTime);
+    assertTrue(success);
+
+    Condition condition = new DefaultCondition(null, null, null, null, 
startTime,
+      endTime + 1, null, null, true);
+    condition.setStatement(String.format(GET_METRIC_AGGREGATE_ONLY_SQL, 
METRICS_AGGREGATE_MINUTE_TABLE_NAME));
+
+    PreparedStatement pstmt = 
PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition);
+    ResultSet rs = pstmt.executeQuery();
+    MetricHostAggregate expectedAggregate =
+      MetricTestHelper.createMetricHostAggregate(2.0, 0.0, 20, 15.0);
+
+    int count = 0;
+    while (rs.next()) {
+      TimelineMetric currentMetric =
+        readHelper.getTimelineMetricKeyFromResultSet(rs);
+      MetricHostAggregate currentHostAggregate =
+        readHelper.getMetricHostAggregateFromResultSet(rs);
+
+      if ("disk_free".equals(currentMetric.getMetricName())) {
+        assertEquals(2.0, currentHostAggregate.getMax());
+        assertEquals(0.0, currentHostAggregate.getMin());
+        assertEquals(20, currentHostAggregate.getNumberOfSamples());
+        assertEquals(15.0, currentHostAggregate.getSum());
+        assertEquals(15.0 / 20, currentHostAggregate.calculateAverage());
+        count++;
+      } else if ("mem_free".equals(currentMetric.getMetricName())) {
+        assertEquals(2.0, currentHostAggregate.getMax());
+        assertEquals(0.0, currentHostAggregate.getMin());
+        assertEquals(20, currentHostAggregate.getNumberOfSamples());
+        assertEquals(15.0, currentHostAggregate.getSum());
+        assertEquals(15.0 / 20, currentHostAggregate.calculateAverage());
+        count++;
+      } else {
+        fail("Unexpected entry");
+      }
+    }
+    assertEquals("Two aggregated entries expected", 2, count);
+  }
+
+  private final static Comparator<TimelineMetric> TIME_IGNORING_COMPARATOR =
+    new Comparator<TimelineMetric>() {
+      @Override
+      public int compare(TimelineMetric o1, TimelineMetric o2) {
+        return o1.equalsExceptTime(o2) ? 0 : 1;
+      }
+    };
+
+  private TimelineMetrics prepareTimelineMetrics(long startTime, String host) {
+    TimelineMetrics metrics = new TimelineMetrics();
+    metrics.setMetrics(Arrays.asList(
+      createMetric(startTime, "disk_free", host),
+      createMetric(startTime, "mem_free", host)));
+
+    return metrics;
+  }
+
+  private TimelineMetric createMetric(long startTime, String metricName, 
String host) {
+    TimelineMetric m = new TimelineMetric();
+    m.setAppId("host");
+    m.setHostName(host);
+    m.setMetricName(metricName);
+    m.setStartTime(startTime);
+    TreeMap<Long, Double> vals = new TreeMap<Long, Double>();
+    vals.put(startTime + 15000l, 0.0);
+    vals.put(startTime + 30000l, 0.0);
+    vals.put(startTime + 45000l, 1.0);
+    vals.put(startTime + 60000l, 2.0);
+
+    m.setMetricValues(vals);
+
+    return m;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricClusterAggregatorSecondTest.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricClusterAggregatorSecondTest.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricClusterAggregatorSecondTest.java
new file mode 100644
index 0000000..c73ac36
--- /dev/null
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricClusterAggregatorSecondTest.java
@@ -0,0 +1,404 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.core.timeline.aggregators;
+
+
+import static 
org.apache.ambari.metrics.core.timeline.aggregators.AggregatorUtils.getRoundedAggregateTimeMillis;
+import static 
org.apache.ambari.metrics.core.timeline.aggregators.AggregatorUtils.getRoundedCheckPointTimeMillis;
+import static 
org.apache.ambari.metrics.core.timeline.aggregators.AggregatorUtils.getTimeSlices;
+import static 
org.apache.ambari.metrics.core.timeline.aggregators.AggregatorUtils.sliceFromTimelineMetric;
+import static 
org.apache.ambari.metrics.core.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_AGGREGATE_SECOND;
+import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.createNiceMock;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.replay;
+
+import java.sql.ResultSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import 
org.apache.ambari.metrics.core.timeline.discovery.TimelineMetricMetadataKey;
+import 
org.apache.ambari.metrics.core.timeline.discovery.TimelineMetricMetadataManager;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.junit.Test;
+
+ import junit.framework.Assert;
+
+public class TimelineMetricClusterAggregatorSecondTest {
+
+  @Test
+  public void testClusterSecondAggregatorWithInterpolation() {
+
+    long aggregatorInterval = 120000l;
+    long sliceInterval = 30000l;
+    long metricInterval = 10000l;
+
+    TimelineMetricMetadataManager metricMetadataManagerMock = 
createNiceMock(TimelineMetricMetadataManager.class);
+    
expect(metricMetadataManagerMock.getUuid(anyObject(TimelineClusterMetric.class))).andReturn(new
 byte[16]).once();
+    replay(metricMetadataManagerMock);
+
+    long roundedEndTime = getRoundedAggregateTimeMillis(aggregatorInterval);
+    long roundedStartTime = roundedEndTime - aggregatorInterval;
+    List<Long[]> timeSlices = getTimeSlices(roundedStartTime ,
+      roundedEndTime, sliceInterval);
+
+    TreeMap<Long, Double> metricValues = new TreeMap<>();
+
+    long startTime = roundedEndTime - aggregatorInterval;
+
+    for (int i=1; startTime < roundedEndTime; i++) {
+      startTime += metricInterval;
+      if (i%6 == 1 || i%6 == 2) {
+        metricValues.put(startTime, (double)i);
+      }
+    }
+
+    TimelineMetric counterMetric = new TimelineMetric();
+    counterMetric.setMetricName("TestMetric");
+    counterMetric.setHostName("TestHost");
+    counterMetric.setAppId("TestAppId");
+    counterMetric.setMetricValues(metricValues);
+    counterMetric.setType("COUNTER");
+
+    Map<TimelineClusterMetric, Double> timelineClusterMetricMap = 
sliceFromTimelineMetric(counterMetric, timeSlices, true);
+
+    TimelineClusterMetric timelineClusterMetric = new 
TimelineClusterMetric(counterMetric.getMetricName(), counterMetric.getAppId(),
+      counterMetric.getInstanceId(), 0l);
+
+    timelineClusterMetric.setTimestamp(roundedStartTime + 2*sliceInterval);
+    
Assert.assertTrue(timelineClusterMetricMap.containsKey(timelineClusterMetric));
+    Assert.assertEquals(timelineClusterMetricMap.get(timelineClusterMetric), 
6.0);
+
+    timelineClusterMetric.setTimestamp(roundedStartTime + 4*sliceInterval);
+    
Assert.assertTrue(timelineClusterMetricMap.containsKey(timelineClusterMetric));
+    Assert.assertEquals(timelineClusterMetricMap.get(timelineClusterMetric), 
12.0);
+
+    TimelineMetric metric = new TimelineMetric();
+    metric.setMetricName("TestMetric");
+    metric.setHostName("TestHost");
+    metric.setAppId("TestAppId");
+    metric.setMetricValues(metricValues);
+
+    timelineClusterMetricMap = sliceFromTimelineMetric(metric, timeSlices, 
true);
+
+    timelineClusterMetric = new TimelineClusterMetric(metric.getMetricName(), 
metric.getAppId(),
+      metric.getInstanceId(), 0l);
+
+    timelineClusterMetric.setTimestamp(roundedStartTime + 2*sliceInterval);
+    
Assert.assertTrue(timelineClusterMetricMap.containsKey(timelineClusterMetric));
+    Assert.assertEquals(timelineClusterMetricMap.get(timelineClusterMetric), 
4.5);
+
+    timelineClusterMetric.setTimestamp(roundedStartTime + 4*sliceInterval);
+    
Assert.assertTrue(timelineClusterMetricMap.containsKey(timelineClusterMetric));
+    Assert.assertEquals(timelineClusterMetricMap.get(timelineClusterMetric), 
7.5);
+  }
+
+  @Test
+  public void testShouldAggregateProperly() {
+
+    long aggregatorInterval = 120000l;
+    long sliceInterval = 30000l;
+
+    Configuration configuration = new Configuration();
+    TimelineMetricMetadataManager metricMetadataManagerMock = 
createNiceMock(TimelineMetricMetadataManager.class);
+
+    
expect(metricMetadataManagerMock.getMetadataCacheValue((TimelineMetricMetadataKey)
 anyObject()))
+      .andReturn(null).anyTimes();
+    replay(metricMetadataManagerMock);
+
+    TimelineMetricClusterAggregatorSecond secondAggregator = new 
TimelineMetricClusterAggregatorSecond(
+      METRIC_AGGREGATE_SECOND, metricMetadataManagerMock, null, configuration, 
null,
+      aggregatorInterval, 2, "false", "", "", aggregatorInterval, 
sliceInterval, null
+    );
+
+    long startTime = 
getRoundedCheckPointTimeMillis(System.currentTimeMillis(),aggregatorInterval);
+    List<Long[]> timeslices = getTimeSlices(startTime, startTime + 
aggregatorInterval, sliceInterval);
+
+    Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics 
= new HashMap<>();
+    long seconds = 1000;
+    TimelineMetric timelineMetric = new TimelineMetric();
+    timelineMetric.setMetricName("m1");
+    timelineMetric.setHostName("h1");
+    timelineMetric.setAppId("a1");
+    timelineMetric.setType("GUAGE");
+    timelineMetric.setStartTime(startTime);
+
+    /*
+
+    0      +30s    +60s    +90s    +120s   +150s   +180s
+    |       |       |       |       |       |       |
+     (1)        (2)      (3)    (4)   (5)     (6)
+
+    */
+    // Case 1 : Points present in all the required timeslices.
+    // Life is good! Ignore (5) and (6).
+
+    TreeMap<Long, Double> metricValues = new TreeMap<Long, Double>();
+    metricValues.put(startTime + 15*seconds, 1.0);
+    metricValues.put(startTime + 45*seconds, 2.0);
+    metricValues.put(startTime + 75*seconds, 3.0);
+    metricValues.put(startTime + 105*seconds, 4.0);
+    metricValues.put(startTime + 135*seconds, 5.0);
+    metricValues.put(startTime + 165*seconds, 6.0);
+
+    timelineMetric.setMetricValues(metricValues);
+    secondAggregator.processAggregateClusterMetrics(aggregateClusterMetrics, 
timelineMetric, timeslices);
+
+    Assert.assertEquals(aggregateClusterMetrics.size(), 4);
+    TimelineClusterMetric timelineClusterMetric = new 
TimelineClusterMetric(timelineMetric.getMetricName(),
+      timelineMetric.getAppId(), timelineMetric.getInstanceId(), startTime + 
30*seconds);
+
+    
Assert.assertTrue(aggregateClusterMetrics.containsKey(timelineClusterMetric));
+    
Assert.assertEquals(aggregateClusterMetrics.get(timelineClusterMetric).getSum(),
 1.0);
+
+    timelineClusterMetric.setTimestamp(startTime + 4*30*seconds);
+    
Assert.assertTrue(aggregateClusterMetrics.containsKey(timelineClusterMetric));
+    
Assert.assertEquals(aggregateClusterMetrics.get(timelineClusterMetric).getSum(),4.0);
+
+    metricValues.clear();
+    aggregateClusterMetrics.clear();
+
+    /*
+
+    0      +30s    +60s    +90s    +120s   +150s   +180s
+    |       |       |       |       |       |       |
+     (1)              (3)      (4)    (5)     (6)
+
+    */
+    // Case 2 : Some "middle" point missing in the required timeslices.
+    // Interpolate the middle point. Ignore (5) and (6).
+    metricValues.put(startTime + 15*seconds, 1.0);
+    metricValues.put(startTime + 75*seconds, 3.0);
+    metricValues.put(startTime + 105*seconds, 4.0);
+    metricValues.put(startTime + 135*seconds, 5.0);
+    metricValues.put(startTime + 165*seconds, 6.0);
+
+    timelineMetric.setMetricValues(metricValues);
+    secondAggregator.processAggregateClusterMetrics(aggregateClusterMetrics, 
timelineMetric, timeslices);
+
+    Assert.assertEquals(aggregateClusterMetrics.size(), 4);
+    timelineClusterMetric.setTimestamp(startTime + 60*seconds);
+    
Assert.assertTrue(aggregateClusterMetrics.containsKey(timelineClusterMetric));
+    
Assert.assertEquals(aggregateClusterMetrics.get(timelineClusterMetric).getSum(),
 2.0);
+
+    metricValues.clear();
+    aggregateClusterMetrics.clear();
+
+
+    /*
+
+    0      +30s    +60s    +90s    +120s   +150s   +180s
+    |       |       |       |       |       |       |
+     (1)        (2)    (3)              (5)     (6)
+
+    */
+    // Case 3 : "end" point missing in the required timeslices.
+    // Use all points to get missing point if COUNTER. Else use just (3). 
Ignore (6).
+    metricValues.put(startTime + 15*seconds, 1.0);
+    metricValues.put(startTime + 45*seconds, 2.0);
+    metricValues.put(startTime + 75*seconds, 3.0);
+    metricValues.put(startTime + 135*seconds, 5.0);
+    metricValues.put(startTime + 165*seconds, 6.0);
+
+    timelineMetric.setMetricValues(metricValues);
+    timelineMetric.setType("GUAGE");
+    secondAggregator.processAggregateClusterMetrics(aggregateClusterMetrics, 
timelineMetric, timeslices);
+
+    Assert.assertEquals(aggregateClusterMetrics.size(), 4);
+    timelineClusterMetric.setTimestamp(startTime + 120*seconds);
+    
Assert.assertTrue(aggregateClusterMetrics.containsKey(timelineClusterMetric));
+    
Assert.assertEquals(aggregateClusterMetrics.get(timelineClusterMetric).getSum(),
 3.0);
+
+    aggregateClusterMetrics.clear();
+
+    timelineMetric.setType("COUNTER");
+    secondAggregator.processAggregateClusterMetrics(aggregateClusterMetrics, 
timelineMetric, timeslices);
+
+    Assert.assertEquals(aggregateClusterMetrics.size(), 4);
+    timelineClusterMetric.setTimestamp(startTime + 120*seconds);
+    
Assert.assertTrue(aggregateClusterMetrics.containsKey(timelineClusterMetric));
+    
Assert.assertEquals(aggregateClusterMetrics.get(timelineClusterMetric).getSum(),
 4.5);
+
+    metricValues.clear();
+    aggregateClusterMetrics.clear();
+
+    /*
+
+    0      +30s    +60s    +90s    +120s   +150s   +180s
+    |       |       |       |       |       |       |
+              (2)      (3)     (4)     (5)     (6)
+
+    */
+    // Case 4 : "start" point missing in the required timeslices.
+    // Interpolate with only (2) to get missing point if GUAGE metric. Else 
use all points for COUNTER.
+
+    metricValues.put(startTime + 45*seconds, 2.0);
+    metricValues.put(startTime + 75*seconds, 3.0);
+    metricValues.put(startTime + 105*seconds, 4.0);
+    metricValues.put(startTime + 135*seconds, 5.0);
+    metricValues.put(startTime + 165*seconds, 6.0);
+
+    timelineMetric.setMetricValues(metricValues);
+    timelineMetric.setType("GUAGE");
+    secondAggregator.processAggregateClusterMetrics(aggregateClusterMetrics, 
timelineMetric, timeslices);
+
+    Assert.assertEquals(aggregateClusterMetrics.size(), 4);
+    timelineClusterMetric.setTimestamp(startTime + 30*seconds);
+    
Assert.assertTrue(aggregateClusterMetrics.containsKey(timelineClusterMetric));
+    
Assert.assertEquals(aggregateClusterMetrics.get(timelineClusterMetric).getSum(),
 2.0);
+
+    aggregateClusterMetrics.clear();
+
+    timelineMetric.setType("COUNTER");
+    int liveHosts = 
secondAggregator.processAggregateClusterMetrics(aggregateClusterMetrics, 
timelineMetric, timeslices);
+
+    Assert.assertEquals(liveHosts, 1);
+    Assert.assertEquals(aggregateClusterMetrics.size(), 4);
+    timelineClusterMetric.setTimestamp(startTime + 30*seconds);
+    
Assert.assertTrue(aggregateClusterMetrics.containsKey(timelineClusterMetric));
+    
Assert.assertEquals(aggregateClusterMetrics.get(timelineClusterMetric).getSum(),
 1.5);
+
+    metricValues.clear();
+    aggregateClusterMetrics.clear();
+
+    /*
+
+    0      +30s    +60s    +90s    +120s   +150s   +180s
+    |       |       |       |       |       |       |
+                                        (5)     (6)
+
+    */
+    // Case 5 : Well, we have nothing in the 2 min window.
+    // Use (5) to paint the 2 min window as (5).
+
+    metricValues.put(startTime + 135*seconds, 5.0);
+    metricValues.put(startTime + 165*seconds, 6.0);
+
+    timelineMetric.setMetricValues(metricValues);
+    timelineMetric.setType("GUAGE");
+    secondAggregator.processAggregateClusterMetrics(aggregateClusterMetrics, 
timelineMetric, timeslices);
+
+    Assert.assertEquals(aggregateClusterMetrics.size(), 4);
+    timelineClusterMetric.setTimestamp(startTime + 30*seconds);
+    
Assert.assertTrue(aggregateClusterMetrics.containsKey(timelineClusterMetric));
+    
Assert.assertEquals(aggregateClusterMetrics.get(timelineClusterMetric).getSum(),
 5.0);
+
+    aggregateClusterMetrics.clear();
+
+    timelineMetric.setType("COUNTER");
+    liveHosts = 
secondAggregator.processAggregateClusterMetrics(aggregateClusterMetrics, 
timelineMetric, timeslices);
+
+    Assert.assertEquals(liveHosts, 1);
+    Assert.assertEquals(aggregateClusterMetrics.size(), 4);
+    timelineClusterMetric.setTimestamp(startTime + 60*seconds);
+    
Assert.assertTrue(aggregateClusterMetrics.containsKey(timelineClusterMetric));
+    
Assert.assertEquals(aggregateClusterMetrics.get(timelineClusterMetric).getSum(),
 2.5);
+
+    metricValues.clear();
+    aggregateClusterMetrics.clear();
+
+  }
+
+  @Test
+  public void testLiveHostCounterMetrics() throws Exception {
+    long aggregatorInterval = 120000;
+    long sliceInterval = 30000;
+
+    Configuration configuration = new Configuration();
+    TimelineMetricMetadataManager metricMetadataManagerMock = 
createNiceMock(TimelineMetricMetadataManager.class);
+
+    
expect(metricMetadataManagerMock.getMetadataCacheValue((TimelineMetricMetadataKey)
 anyObject())).andReturn(null).anyTimes();
+
+    /*
+    m1-h1-a1
+    m2-h1-a1
+    m2-h1-a2
+    m2-h2-a1
+    m2-h2-a2
+    m2-h3-a2
+
+    So live_hosts : a1 = 2
+       live_hosts : a2 = 3
+     */
+
+    TimelineMetric metric1  = new TimelineMetric("m1", "h1", "a1", null);
+    TimelineMetric metric2  = new TimelineMetric("m2", "h1", "a1", null);
+    TimelineMetric metric3  = new TimelineMetric("m2", "h1", "a2", null);
+    TimelineMetric metric4  = new TimelineMetric("m2", "h2", "a1", null);
+    TimelineMetric metric5  = new TimelineMetric("m2", "h2", "a2", null);
+    TimelineMetric metric6  = new TimelineMetric("m2", "h3", "a2", null);
+
+    expect(metricMetadataManagerMock.getMetricFromUuid((byte[]) anyObject())).
+      andReturn(metric1).andReturn(metric2).andReturn(metric3).
+      andReturn(metric4).andReturn(metric5).andReturn(metric6);
+    replay(metricMetadataManagerMock);
+
+    TimelineMetricClusterAggregatorSecond secondAggregator = new 
TimelineMetricClusterAggregatorSecond(
+      METRIC_AGGREGATE_SECOND, metricMetadataManagerMock, null, configuration, 
null,
+      aggregatorInterval, 2, "false", "", "", aggregatorInterval,
+      sliceInterval, null);
+
+    long now = System.currentTimeMillis();
+    long startTime = now - 120000;
+    long seconds = 1000;
+    List<Long[]> slices = getTimeSlices(startTime, now, sliceInterval);
+    ResultSet rs = createNiceMock(ResultSet.class);
+
+    TreeMap<Long, Double> metricValues = new TreeMap<>();
+    metricValues.put(startTime + 15 * seconds, 1.0);
+    metricValues.put(startTime + 45 * seconds, 2.0);
+    metricValues.put(startTime + 75 * seconds, 3.0);
+    metricValues.put(startTime + 105 * seconds, 4.0);
+
+    expect(rs.next()).andReturn(true).times(6);
+    expect(rs.next()).andReturn(false);
+
+    expect(rs.getLong("SERVER_TIME")).andReturn(now - 150000).times(6);
+    expect(rs.getLong("START_TIME")).andReturn(now - 150000).times(6);
+
+    ObjectMapper mapper = new ObjectMapper();
+    
expect(rs.getString("METRICS")).andReturn(mapper.writeValueAsString(metricValues)).times(6);
+
+    replay(rs);
+
+    Map<TimelineClusterMetric, MetricClusterAggregate> aggregates = 
secondAggregator.aggregateMetricsFromResultSet(rs, slices);
+
+    Assert.assertNotNull(aggregates);
+
+    MetricClusterAggregate a1 = null, a2 = null;
+
+    for (Map.Entry<TimelineClusterMetric, MetricClusterAggregate> m : 
aggregates.entrySet()) {
+      if (m.getKey().getMetricName().equals("live_hosts") && 
m.getKey().getAppId().equals("a1")) {
+        a1 = m.getValue();
+      }
+      if (m.getKey().getMetricName().equals("live_hosts") && 
m.getKey().getAppId().equals("a2")) {
+        a2 = m.getValue();
+      }
+    }
+
+    Assert.assertNotNull(a1);
+    Assert.assertNotNull(a2);
+    Assert.assertEquals(2d, a1.getSum());
+    Assert.assertEquals(3d, a2.getSum());
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricClusterAggregatorSecondWithCacheSourceTest.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricClusterAggregatorSecondWithCacheSourceTest.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricClusterAggregatorSecondWithCacheSourceTest.java
new file mode 100644
index 0000000..34d470c
--- /dev/null
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricClusterAggregatorSecondWithCacheSourceTest.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.core.timeline.aggregators;
+
+import junit.framework.Assert;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate;
+import org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration;
+import org.apache.ambari.metrics.core.timeline.TimelineMetricsIgniteCache;
+import 
org.apache.ambari.metrics.core.timeline.discovery.TimelineMetricMetadataKey;
+import 
org.apache.ambari.metrics.core.timeline.discovery.TimelineMetricMetadataManager;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static 
org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_COLLECTOR_IGNITE_NODES;
+import static 
org.apache.ambari.metrics.core.timeline.aggregators.AggregatorUtils.getTimeSlices;
+import static 
org.apache.ambari.metrics.core.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_AGGREGATE_SECOND;
+import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.createNiceMock;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.replay;
+import static org.powermock.api.easymock.PowerMock.mockStatic;
+import static org.powermock.api.easymock.PowerMock.replayAll;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(TimelineMetricConfiguration.class)
+
+@PowerMockIgnore("javax.management.*")
+public class TimelineMetricClusterAggregatorSecondWithCacheSourceTest {
+
+  private static TimelineMetricsIgniteCache timelineMetricsIgniteCache;
+  @BeforeClass
+  public static void setupConf() throws Exception {
+    TimelineMetricConfiguration conf = new TimelineMetricConfiguration(new
+        Configuration(), new Configuration());
+    mockStatic(TimelineMetricConfiguration.class);
+    
expect(TimelineMetricConfiguration.getInstance()).andReturn(conf).anyTimes();
+    conf.getMetricsConf().set(TIMELINE_METRICS_COLLECTOR_IGNITE_NODES, 
"localhost");
+    replayAll();
+
+    timelineMetricsIgniteCache = new TimelineMetricsIgniteCache();
+  }
+
+  @Test
+  public void testLiveHostCounterMetrics() throws Exception {
+    long aggregatorInterval = 120000;
+    long sliceInterval = 30000;
+
+    Configuration configuration = new Configuration();
+
+    TimelineMetricMetadataManager metricMetadataManagerMock = 
createNiceMock(TimelineMetricMetadataManager.class);
+    
expect(metricMetadataManagerMock.getMetadataCacheValue((TimelineMetricMetadataKey)
 anyObject())).andReturn(null).anyTimes();
+    replay(metricMetadataManagerMock);
+
+    TimelineMetricClusterAggregatorSecondWithCacheSource secondAggregator = 
new TimelineMetricClusterAggregatorSecondWithCacheSource(
+        METRIC_AGGREGATE_SECOND, metricMetadataManagerMock, null, 
configuration, null,
+        aggregatorInterval, 2, "false", "", "", aggregatorInterval,
+        sliceInterval, null, timelineMetricsIgniteCache);
+
+    long now = System.currentTimeMillis();
+    long startTime = now - 120000;
+    long seconds = 1000;
+
+    Map<TimelineClusterMetric, MetricClusterAggregate> metricsFromCache = new 
HashMap<>();
+    metricsFromCache.put(new TimelineClusterMetric("m1", "a1", "i1",startTime 
+ 15 * seconds),
+        new MetricClusterAggregate(1.0, 2, 1.0, 1.0, 1.0));
+    metricsFromCache.put(new TimelineClusterMetric("m2", "a2", "i1",startTime 
+ 18 * seconds),
+        new MetricClusterAggregate(1.0, 5, 1.0, 1.0, 1.0));
+
+    List<Long[]> timeslices = getTimeSlices(startTime, startTime + 
120*seconds, 30*seconds);
+    Map<TimelineClusterMetric, MetricClusterAggregate> aggregates = 
secondAggregator.aggregateMetricsFromMetricClusterAggregates(metricsFromCache, 
timeslices);
+
+    Assert.assertNotNull(aggregates);
+
+    MetricClusterAggregate a1 = null, a2 = null;
+
+    for (Map.Entry<TimelineClusterMetric, MetricClusterAggregate> m : 
aggregates.entrySet()) {
+      if (m.getKey().getMetricName().equals("live_hosts") && 
m.getKey().getAppId().equals("a1")) {
+        a1 = m.getValue();
+      }
+      if (m.getKey().getMetricName().equals("live_hosts") && 
m.getKey().getAppId().equals("a2")) {
+        a2 = m.getValue();
+      }
+    }
+
+    Assert.assertNotNull(a1);
+    Assert.assertNotNull(a2);
+    Assert.assertEquals(2d, a1.getSum());
+    Assert.assertEquals(5d, a2.getSum());
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/availability/MetricCollectorHAControllerTest.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/availability/MetricCollectorHAControllerTest.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/availability/MetricCollectorHAControllerTest.java
new file mode 100644
index 0000000..385a5a1
--- /dev/null
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/availability/MetricCollectorHAControllerTest.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.core.timeline.availability;
+
+import junit.framework.Assert;
+import org.apache.ambari.metrics.core.timeline.AbstractMiniHBaseClusterTest;
+import org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.InstanceConfig;
+import org.junit.Before;
+import org.junit.Test;
+import java.util.HashMap;
+import java.util.Map;
+import static 
org.apache.ambari.metrics.core.timeline.availability.MetricCollectorHAController.DEFAULT_STATE_MODEL;
+import static 
org.apache.ambari.metrics.core.timeline.availability.MetricCollectorHAController.METRIC_AGGREGATORS;
+import static 
org.apache.ambari.metrics.core.timeline.availability.MetricCollectorHAController.CLUSTER_NAME;
+import static org.easymock.EasyMock.createNiceMock;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.replay;
+
+public class MetricCollectorHAControllerTest extends 
AbstractMiniHBaseClusterTest {
+  TimelineMetricConfiguration configuration;
+
+  @Before
+  public void setup() throws Exception {
+    configuration = createNiceMock(TimelineMetricConfiguration.class);
+
+    expect(configuration.getInstanceHostnameFromEnv()).andReturn("h1");
+    expect(configuration.getInstancePort()).andReturn("12000");
+    // jdbc:phoenix:localhost:52887:/hbase;test=true
+    String zkUrl = getUrl();
+    String port = zkUrl.split(":")[3];
+    String quorum = zkUrl.split(":")[2];
+
+    expect(configuration.getClusterZKClientPort()).andReturn(port);
+    expect(configuration.getClusterZKQuorum()).andReturn(quorum);
+    expect(configuration.getZkConnectionUrl(port, quorum)).andReturn(quorum + 
":" + port);
+
+    replay(configuration);
+  }
+
+  @Test(timeout = 180000)
+  public void testHAControllerDistributedAggregation() throws Exception {
+    MetricCollectorHAController haController = new 
MetricCollectorHAController(configuration);
+    haController.initializeHAController();
+    // Wait for task assignment
+    Thread.sleep(10000);
+
+    Assert.assertTrue(haController.isInitialized());
+    Assert.assertEquals(1, haController.getLiveInstanceHostNames().size());
+    
Assert.assertTrue(haController.getAggregationTaskRunner().performsClusterAggregation());
+    
Assert.assertTrue(haController.getAggregationTaskRunner().performsHostAggregation());
+
+    // Add new instance
+    InstanceConfig instanceConfig2 = new InstanceConfig("h2_12001");
+    haController.admin.addInstance(CLUSTER_NAME, instanceConfig2);
+    HelixManager manager2 = HelixManagerFactory.getZKHelixManager(CLUSTER_NAME,
+      instanceConfig2.getInstanceName(),
+      InstanceType.PARTICIPANT, haController.zkConnectUrl);
+    
manager2.getStateMachineEngine().registerStateModelFactory(DEFAULT_STATE_MODEL,
+      new OnlineOfflineStateModelFactory(instanceConfig2.getInstanceName(),
+        new AggregationTaskRunner(instanceConfig2.getInstanceName(), "", 
CLUSTER_NAME)));
+    manager2.connect();
+    haController.admin.rebalance(CLUSTER_NAME, METRIC_AGGREGATORS, 1);
+
+    // Wait on re-assignment of partitions
+    Thread.sleep(10000);
+    Assert.assertEquals(2, haController.getLiveInstanceHostNames().size());
+
+    ExternalView view = 
haController.admin.getResourceExternalView(CLUSTER_NAME, METRIC_AGGREGATORS);
+
+    Map<String, String> partitionInstanceMap = new HashMap<>();
+
+    for (String partition : view.getPartitionSet()) {
+      Map<String, String> states = view.getStateMap(partition);
+      // (instance, state) pairs
+      for (Map.Entry<String, String> stateEntry : states.entrySet()) {
+        partitionInstanceMap.put(partition, stateEntry.getKey());
+        Assert.assertEquals("ONLINE", stateEntry.getValue());
+      }
+    }
+    // Re-assigned partitions
+    Assert.assertEquals(2, partitionInstanceMap.size());
+
+    haController.getAggregationTaskRunner().stop();
+    haController.manager.disconnect();
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/discovery/TestMetadataManager.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/discovery/TestMetadataManager.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/discovery/TestMetadataManager.java
new file mode 100644
index 0000000..94fbb30
--- /dev/null
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/discovery/TestMetadataManager.java
@@ -0,0 +1,234 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.core.timeline.discovery;
+
+import static org.easymock.EasyMock.createNiceMock;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.replay;
+
+import junit.framework.Assert;
+import org.apache.ambari.metrics.core.timeline.AbstractMiniHBaseClusterTest;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration;
+import 
org.apache.ambari.metrics.core.timeline.aggregators.TimelineClusterMetric;
+import org.easymock.EasyMock;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+
+public class TestMetadataManager extends AbstractMiniHBaseClusterTest {
+  TimelineMetricMetadataManager metadataManager;
+
+  @Before
+  public void insertDummyRecords() throws IOException, SQLException, 
URISyntaxException {
+
+    final long now = System.currentTimeMillis();
+
+    TimelineMetrics timelineMetrics = new TimelineMetrics();
+    TimelineMetric metric1 = new TimelineMetric();
+    metric1.setMetricName("dummy_metric1");
+    metric1.setHostName("dummy_host1");
+    metric1.setStartTime(now - 1000);
+    metric1.setAppId("dummy_app1");
+    metric1.setType("Integer");
+    metric1.setMetricValues(new TreeMap<Long, Double>() {{
+      put(now - 100, 1.0);
+      put(now - 200, 2.0);
+      put(now - 300, 3.0);
+    }});
+    timelineMetrics.getMetrics().add(metric1);
+    TimelineMetric metric2 = new TimelineMetric();
+    metric2.setMetricName("dummy_metric2");
+    metric2.setHostName("dummy_host2");
+    metric2.setStartTime(now - 1000);
+    metric2.setAppId("dummy_app2");
+    metric2.setType("Integer");
+    metric2.setInstanceId("instance2");
+    metric2.setMetricValues(new TreeMap<Long, Double>() {{
+      put(now - 100, 1.0);
+      put(now - 200, 2.0);
+      put(now - 300, 3.0);
+    }});
+    timelineMetrics.getMetrics().add(metric2);
+
+    Configuration metricsConf = createNiceMock(Configuration.class);
+    
expect(metricsConf.get("timeline.metrics.service.operation.mode")).andReturn("distributed").anyTimes();
+    replay(metricsConf);
+
+    // Initialize new manager
+    metadataManager = new TimelineMetricMetadataManager(metricsConf, hdb);
+    hdb.setMetadataInstance(metadataManager);
+
+    hdb.insertMetricRecordsWithMetadata(metadataManager, timelineMetrics, 
true);
+  }
+
+  @Test(timeout = 180000)
+  public void testSaveMetricsMetadata() throws Exception {
+    Map<TimelineMetricMetadataKey, TimelineMetricMetadata> cachedData = 
metadataManager.getMetadataCache();
+
+    Assert.assertNotNull(cachedData);
+    Assert.assertEquals(2, cachedData.size());
+    TimelineMetricMetadataKey key1 = new 
TimelineMetricMetadataKey("dummy_metric1", "dummy_app1", null);
+    TimelineMetricMetadataKey key2 = new 
TimelineMetricMetadataKey("dummy_metric2", "dummy_app2", "instance2");
+    TimelineMetricMetadata value1 = new TimelineMetricMetadata("dummy_metric1",
+      "dummy_app1", null, null, "Integer", 1L, true, true);
+    TimelineMetricMetadata value2 = new TimelineMetricMetadata("dummy_metric2",
+      "dummy_app2", "instance2", null, "Integer", 1L, true, true);
+
+    Assert.assertEquals(value1, cachedData.get(key1));
+    Assert.assertEquals(value2, cachedData.get(key2));
+
+    TimelineMetricMetadataSync syncRunnable = new 
TimelineMetricMetadataSync(metadataManager);
+    syncRunnable.run();
+
+    Map<TimelineMetricMetadataKey, TimelineMetricMetadata> savedData =
+      hdb.getTimelineMetricMetadata();
+
+    Assert.assertNotNull(savedData);
+    Assert.assertEquals(2, savedData.size());
+    Assert.assertEquals(value1, savedData.get(key1));
+    Assert.assertEquals(value2, savedData.get(key2));
+
+    Map<String, TimelineMetricHostMetadata> cachedHostData = 
metadataManager.getHostedAppsCache();
+    Map<String, TimelineMetricHostMetadata> savedHostData = 
metadataManager.getHostedAppsFromStore();
+    Assert.assertEquals(cachedData.size(), savedData.size());
+    Assert.assertEquals("dummy_app1", 
cachedHostData.get("dummy_host1").getHostedApps().keySet().iterator().next());
+    Assert.assertEquals("dummy_app2", 
cachedHostData.get("dummy_host2").getHostedApps().keySet().iterator().next());
+    Assert.assertEquals("dummy_app1", 
savedHostData.get("dummy_host1").getHostedApps().keySet().iterator().next());
+    Assert.assertEquals("dummy_app2", 
savedHostData.get("dummy_host2").getHostedApps().keySet().iterator().next());
+
+    Map<String, Set<String>> cachedHostInstanceData = 
metadataManager.getHostedInstanceCache();
+    Map<String, Set<String>> savedHostInstanceData = 
metadataManager.getHostedInstancesFromStore();
+    Assert.assertEquals(cachedHostInstanceData.size(), 
savedHostInstanceData.size());
+    Assert.assertEquals("dummy_host2", 
cachedHostInstanceData.get("instance2").iterator().next());
+  }
+
+  @Test
+  public void testGenerateUuidFromMetric() throws SQLException {
+
+    TimelineMetric timelineMetric = new TimelineMetric();
+    
timelineMetric.setMetricName("regionserver.Server.blockCacheExpressHitPercent");
+    timelineMetric.setAppId("hbase");
+    timelineMetric.setHostName("avijayan-ams-2.openstacklocal");
+    timelineMetric.setInstanceId("test1");
+
+    byte[] uuid = metadataManager.getUuid(timelineMetric);
+    Assert.assertNotNull(uuid);
+    Assert.assertEquals(uuid.length, 20);
+
+    byte[] uuidWithoutHost = metadataManager.getUuid(new 
TimelineClusterMetric(timelineMetric.getMetricName(), 
timelineMetric.getAppId(), timelineMetric.getInstanceId(), -1));
+    Assert.assertNotNull(uuidWithoutHost);
+    Assert.assertEquals(uuidWithoutHost.length, 16);
+
+    TimelineMetric metric2 = metadataManager.getMetricFromUuid(uuid);
+    Assert.assertEquals(metric2, timelineMetric);
+    TimelineMetric metric3 = 
metadataManager.getMetricFromUuid(uuidWithoutHost);
+    Assert.assertEquals(metric3.getMetricName(), 
timelineMetric.getMetricName());
+    Assert.assertEquals(metric3.getAppId(), timelineMetric.getAppId());
+    Assert.assertEquals(metric3.getInstanceId(), 
timelineMetric.getInstanceId());
+    Assert.assertEquals(metric3.getHostName(), null);
+
+    String metricName1 = metadataManager.getMetricNameFromUuid(uuid);
+    Assert.assertEquals(metricName1, 
"regionserver.Server.blockCacheExpressHitPercent");
+    String metricName2 = 
metadataManager.getMetricNameFromUuid(uuidWithoutHost);
+    Assert.assertEquals(metricName2, 
"regionserver.Server.blockCacheExpressHitPercent");
+  }
+
+  @Test
+  public void testWildcardSanitization() throws IOException, SQLException, 
URISyntaxException {
+    // Initialize new manager
+    metadataManager = new TimelineMetricMetadataManager(new Configuration(), 
hdb);
+    final long now = System.currentTimeMillis();
+
+    TimelineMetrics timelineMetrics = new TimelineMetrics();
+
+    TimelineMetric metric1 = new TimelineMetric();
+    metric1.setMetricName("dummy_m1");
+    metric1.setHostName("dummy_host1");
+    metric1.setStartTime(now - 1000);
+    metric1.setAppId("dummy_app1");
+    metric1.setType("Integer");
+    metric1.setMetricValues(new TreeMap<Long, Double>() {{
+      put(now - 100, 1.0);
+      put(now - 200, 2.0);
+      put(now - 300, 3.0);
+    }});
+    timelineMetrics.getMetrics().add(metric1);
+
+    TimelineMetric metric2 = new TimelineMetric();
+    metric2.setMetricName("dummy_m2");
+    metric2.setHostName("dummy_host2");
+    metric2.setStartTime(now - 1000);
+    metric2.setAppId("dummy_app2");
+    metric2.setType("Integer");
+    metric2.setMetricValues(new TreeMap<Long, Double>() {{
+      put(now - 100, 1.0);
+      put(now - 200, 2.0);
+      put(now - 300, 3.0);
+    }});
+    timelineMetrics.getMetrics().add(metric2);
+
+    TimelineMetric metric3 = new TimelineMetric();
+    metric3.setMetricName("gummy_3");
+    metric3.setHostName("dummy_3h");
+    metric3.setStartTime(now - 1000);
+    metric3.setAppId("dummy_app3");
+    metric3.setType("Integer");
+    metric3.setMetricValues(new TreeMap<Long, Double>() {{
+      put(now - 100, 1.0);
+      put(now - 200, 2.0);
+      put(now - 300, 3.0);
+    }});
+    timelineMetrics.getMetrics().add(metric3);
+
+    Configuration metricsConf = new Configuration();
+    TimelineMetricConfiguration configuration = 
EasyMock.createNiceMock(TimelineMetricConfiguration.class);
+    expect(configuration.getMetricsConf()).andReturn(metricsConf).once();
+    replay(configuration);
+
+    hdb.insertMetricRecordsWithMetadata(metadataManager, timelineMetrics, 
true);
+
+    List<byte[]> uuids = 
metadataManager.getUuids(Collections.singletonList("dummy_m%"),
+      Collections.singletonList("dummy_host2"), "dummy_app1", null);
+    Assert.assertTrue(uuids.size() == 2);
+
+    uuids = metadataManager.getUuids(Collections.singletonList("dummy_m%"),
+      Collections.singletonList("dummy_host%"), "dummy_app2", null);
+    Assert.assertTrue(uuids.size() == 4);
+
+    Collection<String> metrics = Arrays.asList("dummy_m%", "dummy_3", 
"dummy_m2");
+    List<String> hosts = Arrays.asList("dummy_host%", "dummy_3h");
+    uuids = metadataManager.getUuids(metrics, hosts, "dummy_app2", null);
+    Assert.assertTrue(uuids.size() == 9);
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/discovery/TestMetadataSync.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/discovery/TestMetadataSync.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/discovery/TestMetadataSync.java
new file mode 100644
index 0000000..80eb89e
--- /dev/null
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/discovery/TestMetadataSync.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.core.timeline.discovery;
+
+import junit.framework.Assert;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata;
+import org.apache.ambari.metrics.core.timeline.PhoenixHBaseAccessor;
+import org.junit.Test;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import static 
org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata.MetricType.GAUGE;
+import static 
org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration.TIMELINE_METRIC_METADATA_FILTERS;
+import static org.easymock.EasyMock.createNiceMock;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.verify;
+
+public class TestMetadataSync {
+  @Test
+  public void testRefreshMetadataOnWrite() throws Exception {
+    Configuration configuration = createNiceMock(Configuration.class);
+    PhoenixHBaseAccessor hBaseAccessor = 
createNiceMock(PhoenixHBaseAccessor.class);
+
+    final TimelineMetricMetadata testMetadata1 = new TimelineMetricMetadata(
+      "m1", "a1", null, "", GAUGE.name(), System.currentTimeMillis(), true, 
false);
+    final TimelineMetricMetadata testMetadata2 = new TimelineMetricMetadata(
+      "m2", "a2", null, "", GAUGE.name(), System.currentTimeMillis(), true, 
false);
+
+    Map<TimelineMetricMetadataKey, TimelineMetricMetadata> metadata =
+      new HashMap<TimelineMetricMetadataKey, TimelineMetricMetadata>() {{
+        put(new TimelineMetricMetadataKey("m1", "a1", null), testMetadata1);
+        put(new TimelineMetricMetadataKey("m2", "a2", null), testMetadata2);
+      }};
+
+    Map<String, TimelineMetricHostMetadata> hostedApps = new HashMap<String, 
TimelineMetricHostMetadata>() {{
+      put("h1", new TimelineMetricHostMetadata(new 
HashSet<>(Arrays.asList("a1"))));
+      put("h2", new TimelineMetricHostMetadata((new 
HashSet<>(Arrays.asList("a1", "a2")))));
+    }};
+
+    Map<String, Set<String>> hostedInstances = new HashMap<String, 
Set<String>>() {{
+      put("i1", new HashSet<>(Arrays.asList("h1")));
+      put("i2", new HashSet<>(Arrays.asList("h1", "h2")));
+    }};
+
+    
expect(configuration.get("timeline.metrics.service.operation.mode")).andReturn("distributed");
+    expect(hBaseAccessor.getTimelineMetricMetadata()).andReturn(metadata);
+    expect(hBaseAccessor.getHostedAppsMetadata()).andReturn(hostedApps);
+    expect(hBaseAccessor.getInstanceHostsMetdata()).andReturn(hostedInstances);
+
+    replay(configuration, hBaseAccessor);
+
+    TimelineMetricMetadataManager metadataManager = new 
TimelineMetricMetadataManager(configuration, hBaseAccessor);
+
+    metadataManager.metricMetadataSync = new 
TimelineMetricMetadataSync(metadataManager);
+
+    metadataManager.metricMetadataSync.run();
+
+    verify(configuration, hBaseAccessor);
+
+    metadata = metadataManager.getMetadataCache();
+    Assert.assertEquals(2, metadata.size());
+    Assert.assertTrue(metadata.containsKey(new TimelineMetricMetadataKey("m1", 
"a1", null)));
+    Assert.assertTrue(metadata.containsKey(new TimelineMetricMetadataKey("m2", 
"a2", null)));
+
+    hostedApps = metadataManager.getHostedAppsCache();
+    Assert.assertEquals(2, hostedApps.size());
+    Assert.assertEquals(1, hostedApps.get("h1").getHostedApps().size());
+    Assert.assertEquals(2, hostedApps.get("h2").getHostedApps().size());
+
+    hostedInstances = metadataManager.getHostedInstanceCache();
+    Assert.assertEquals(2, hostedInstances.size());
+    Assert.assertEquals(1, hostedInstances.get("i1").size());
+    Assert.assertEquals(2, hostedInstances.get("i2").size());
+
+  }
+
+  @Test
+  public void testFilterByRegexOnMetricName() throws Exception {
+    Configuration configuration = createNiceMock(Configuration.class);
+    PhoenixHBaseAccessor hBaseAccessor = 
createNiceMock(PhoenixHBaseAccessor.class);
+
+    TimelineMetricMetadata metadata1 = new TimelineMetricMetadata(
+      "xxx.abc.yyy", "a1", null, "", GAUGE.name(), System.currentTimeMillis(), 
true, false);
+    TimelineMetricMetadata metadata2 = new TimelineMetricMetadata(
+      "xxx.cdef.yyy", "a2", null, "", GAUGE.name(), 
System.currentTimeMillis(), true, false);
+    TimelineMetricMetadata metadata3 = new TimelineMetricMetadata(
+      "xxx.pqr.zzz", "a3", null, "", GAUGE.name(), System.currentTimeMillis(), 
true, false);
+
+    
expect(configuration.get(TIMELINE_METRIC_METADATA_FILTERS)).andReturn("abc,cde");
+
+    replay(configuration, hBaseAccessor);
+
+    TimelineMetricMetadataManager metadataManager = new 
TimelineMetricMetadataManager(configuration, hBaseAccessor);
+
+    metadataManager.putIfModifiedTimelineMetricMetadata(metadata1);
+    metadataManager.putIfModifiedTimelineMetricMetadata(metadata2);
+    metadataManager.putIfModifiedTimelineMetricMetadata(metadata3);
+
+    verify(configuration, hBaseAccessor);
+
+    Map<TimelineMetricMetadataKey, TimelineMetricMetadata> metadata =
+      metadataManager.getMetadataCache();
+
+    Assert.assertEquals(1, metadata.size());
+    Assert.assertEquals("xxx.pqr.zzz", 
metadata.keySet().iterator().next().getMetricName());
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/function/TimelineMetricsSeriesAggregateFunctionTest.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/function/TimelineMetricsSeriesAggregateFunctionTest.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/function/TimelineMetricsSeriesAggregateFunctionTest.java
new file mode 100644
index 0000000..492d5a0
--- /dev/null
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/function/TimelineMetricsSeriesAggregateFunctionTest.java
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.core.timeline.function;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.junit.Test;
+
+import java.util.Map;
+import java.util.TreeMap;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TimelineMetricsSeriesAggregateFunctionTest {
+
+  public static final double DELTA = 0.00000000001;
+
+  @Test public void testSeriesAggregateBySummation() throws Exception {
+    TimelineMetrics testMetrics = getTestObject();
+    // all TimelineMetric are having same values
+    TreeMap<Long, Double> metricValues = 
testMetrics.getMetrics().get(0).getMetricValues();
+
+    TimelineMetricsSeriesAggregateFunction function = 
TimelineMetricsSeriesAggregateFunctionFactory
+        .newInstance(SeriesAggregateFunction.SUM);
+    TimelineMetric aggregatedMetric = function.apply(testMetrics);
+
+    String aggregatedMetricName = aggregatedMetric.getMetricName();
+    String aggregatedHostName = aggregatedMetric.getHostName();
+    String aggregatedAppId = aggregatedMetric.getAppId();
+    String aggregatedInstanceId = aggregatedMetric.getInstanceId();
+
+    for (TimelineMetric testMetric : testMetrics.getMetrics()) {
+      assertTrue(aggregatedMetricName.contains(testMetric.getMetricName()));
+      assertTrue(aggregatedHostName.contains(testMetric.getHostName()));
+      if (!testMetric.getMetricName().equals("byte_in.3")) {
+        assertTrue(aggregatedAppId.contains(testMetric.getAppId()));
+        assertTrue(aggregatedInstanceId.contains(testMetric.getInstanceId()));
+      }
+    }
+
+    TreeMap<Long, Double> summationMetricValues = 
aggregatedMetric.getMetricValues();
+    assertEquals(3, summationMetricValues.size());
+    for (Map.Entry<Long, Double> tsAndValue : 
summationMetricValues.entrySet()) {
+      assertEquals(metricValues.get(tsAndValue.getKey()) * 3, 
tsAndValue.getValue(), DELTA);
+    }
+  }
+
+  @Test public void testSeriesAggregateByAverage() throws Exception {
+    TimelineMetrics testMetrics = getTestObject();
+    // all TimelineMetric are having same values
+    TreeMap<Long, Double> metricValues = 
testMetrics.getMetrics().get(0).getMetricValues();
+
+    TimelineMetricsSeriesAggregateFunction function = 
TimelineMetricsSeriesAggregateFunctionFactory
+        .newInstance(SeriesAggregateFunction.AVG);
+    TimelineMetric aggregatedMetric = function.apply(testMetrics);
+
+    // checks only values, others are covered by testSeriesAggregateBySummation
+    TreeMap<Long, Double> averageMetricValues = 
aggregatedMetric.getMetricValues();
+    assertEquals(3, averageMetricValues.size());
+    for (Map.Entry<Long, Double> tsAndValue : averageMetricValues.entrySet()) {
+      assertEquals(metricValues.get(tsAndValue.getKey()), 
tsAndValue.getValue(), DELTA);
+    }
+  }
+
+  @Test public void testSeriesAggregateByMax() throws Exception {
+    TimelineMetrics testMetrics = getTestObject();
+
+    // override metric values
+    TreeMap<Long, Double> metricValues = new TreeMap<>();
+    metricValues.put(1L, 1.0);
+    metricValues.put(2L, 2.0);
+    metricValues.put(3L, 3.0);
+
+    testMetrics.getMetrics().get(0).setMetricValues(metricValues);
+
+    TreeMap<Long, Double> metricValues2 = new TreeMap<>();
+    metricValues2.put(1L, 2.0);
+    metricValues2.put(2L, 4.0);
+    metricValues2.put(3L, 6.0);
+
+    testMetrics.getMetrics().get(1).setMetricValues(metricValues2);
+
+    TreeMap<Long, Double> metricValues3 = new TreeMap<>();
+    metricValues3.put(1L, 3.0);
+    metricValues3.put(2L, 6.0);
+    metricValues3.put(3L, 9.0);
+
+    testMetrics.getMetrics().get(2).setMetricValues(metricValues3);
+
+    TimelineMetricsSeriesAggregateFunction function = 
TimelineMetricsSeriesAggregateFunctionFactory
+        .newInstance(SeriesAggregateFunction.MAX);
+    TimelineMetric aggregatedMetric = function.apply(testMetrics);
+
+    // checks only values, others are covered by testSeriesAggregateBySummation
+    TreeMap<Long, Double> maxMetricValues = aggregatedMetric.getMetricValues();
+    assertEquals(3, maxMetricValues.size());
+    for (Map.Entry<Long, Double> tsAndValue : maxMetricValues.entrySet()) {
+      assertEquals(metricValues3.get(tsAndValue.getKey()), 
tsAndValue.getValue(), DELTA);
+    }
+  }
+
+  @Test public void testSeriesAggregateByMin() throws Exception {
+    TimelineMetrics testMetrics = getTestObject();
+
+    // override metric values
+    TreeMap<Long, Double> metricValues = new TreeMap<>();
+    metricValues.put(1L, 1.0);
+    metricValues.put(2L, 2.0);
+    metricValues.put(3L, 3.0);
+
+    testMetrics.getMetrics().get(0).setMetricValues(metricValues);
+
+    TreeMap<Long, Double> metricValues2 = new TreeMap<>();
+    metricValues2.put(1L, 2.0);
+    metricValues2.put(2L, 4.0);
+    metricValues2.put(3L, 6.0);
+
+    testMetrics.getMetrics().get(1).setMetricValues(metricValues2);
+
+    TreeMap<Long, Double> metricValues3 = new TreeMap<>();
+    metricValues3.put(1L, 3.0);
+    metricValues3.put(2L, 6.0);
+    metricValues3.put(3L, 9.0);
+
+    testMetrics.getMetrics().get(2).setMetricValues(metricValues3);
+
+    TimelineMetricsSeriesAggregateFunction function = 
TimelineMetricsSeriesAggregateFunctionFactory
+        .newInstance(SeriesAggregateFunction.MIN);
+    TimelineMetric aggregatedMetric = function.apply(testMetrics);
+
+    // checks only values, others are covered by testSeriesAggregateBySummation
+    TreeMap<Long, Double> minMetricValues = aggregatedMetric.getMetricValues();
+    assertEquals(3, minMetricValues.size());
+    for (Map.Entry<Long, Double> tsAndValue : minMetricValues.entrySet()) {
+      assertEquals(metricValues.get(tsAndValue.getKey()), 
tsAndValue.getValue(), DELTA);
+    }
+  }
+
+  private TimelineMetrics getTestObject() {
+    TimelineMetric metric = new TimelineMetric();
+    metric.setMetricName("byte_in.1");
+    metric.setHostName("host1");
+    metric.setAppId("app1");
+    metric.setInstanceId("instance1");
+
+    TimelineMetric metric2 = new TimelineMetric();
+    metric2.setMetricName("byte_in.2");
+    metric2.setHostName("host2");
+    metric2.setAppId("app2");
+    metric2.setInstanceId("instance2");
+
+    TimelineMetric metric3 = new TimelineMetric();
+    metric3.setMetricName("byte_in.3");
+    metric3.setHostName("host3");
+    // appId and instanceId for metric3 are null
+
+    TreeMap<Long, Double> metricValues = new TreeMap<>();
+    metricValues.put(1L, 3.0);
+    metricValues.put(2L, 2 * 3.0);
+    metricValues.put(3L, 3 * 3.0);
+
+    metric.setMetricValues(metricValues);
+    metric2.setMetricValues(metricValues);
+    metric3.setMetricValues(metricValues);
+
+    TimelineMetrics metrics = new TimelineMetrics();
+    metrics.setMetrics(Lists.newArrayList(metric, metric2, metric3));
+
+    return metrics;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/query/DefaultConditionTest.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/query/DefaultConditionTest.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/query/DefaultConditionTest.java
new file mode 100644
index 0000000..7d7e0f7
--- /dev/null
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/query/DefaultConditionTest.java
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.metrics.core.timeline.query;
+
+import junit.framework.Assert;
+import org.apache.commons.collections.CollectionUtils;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class DefaultConditionTest {
+
+  @Test
+  public void testMetricNameWhereCondition() {
+    //EMPTY
+    List<byte[]> uuids = new ArrayList<>();
+    DefaultCondition condition = new 
DefaultCondition(uuids,null,null,null,null,null,null,null,null,true);
+    StringBuilder sb = new StringBuilder();
+    condition.appendUuidClause(sb);
+    Assert.assertEquals(sb.toString(), "");
+    Assert.assertTrue(CollectionUtils.isEqualCollection(uuids, 
condition.getUuids()));
+
+    //Metric uuid
+    uuids.add(new byte[16]);
+    condition = new 
DefaultCondition(uuids,null,null,null,null,null,null,null,null,true);
+    sb = new StringBuilder();
+    condition.appendUuidClause(sb);
+    Assert.assertEquals(sb.toString(), "(UUID LIKE ?)");
+    Assert.assertEquals(uuids.size(), condition.getUuids().size());
+    Assert.assertTrue(new String(condition.getUuids().get(0)).endsWith("%"));
+
+    //metric uuid + Host uuid
+    uuids.add(new byte[4]);
+    condition = new 
DefaultCondition(uuids,null,null,null,null,null,null,null,null,true);
+    sb = new StringBuilder();
+    condition.appendUuidClause(sb);
+    Assert.assertEquals(sb.toString(), "(UUID LIKE ? AND UUID LIKE ?)");
+    Assert.assertEquals(uuids.size(), condition.getUuids().size());
+    Assert.assertTrue(new String(condition.getUuids().get(1)).startsWith("%"));
+
+    //metric + host + full
+    uuids.add(new byte[20]);
+    uuids.add(new byte[20]);
+    condition = new 
DefaultCondition(uuids,null,null,null,null,null,null,null,null,true);
+    sb = new StringBuilder();
+    condition.appendUuidClause(sb);
+    Assert.assertEquals(sb.toString(), "(UUID IN (?, ?) AND UUID LIKE ? AND 
UUID LIKE ?)");
+    Assert.assertEquals(uuids.size(), condition.getUuids().size());
+
+    //Only IN clause.
+    uuids.clear();
+    uuids.add(new byte[20]);
+    condition = new 
DefaultCondition(uuids,null,null,null,null,null,null,null,null,true);
+    sb = new StringBuilder();
+    condition.appendUuidClause(sb);
+    Assert.assertEquals(sb.toString(), "(UUID IN (?))");
+    Assert.assertEquals(uuids.size(), condition.getUuids().size());
+
+    //metric NOT LIKE
+    uuids.clear();
+    uuids.add(new byte[16]);
+    condition = new 
DefaultCondition(uuids,null,null,null,null,null,null,null,null,true);
+    sb = new StringBuilder();
+    condition.setMetricNamesNotCondition(true);
+    condition.appendUuidClause(sb);
+    Assert.assertEquals(sb.toString(), "(UUID NOT LIKE ?)");
+    Assert.assertEquals(uuids.size(), condition.getUuids().size());
+
+    //metric NOT LIKE host LIKE
+    uuids.clear();
+    uuids.add(new byte[16]);
+    uuids.add(new byte[4]);
+    condition = new 
DefaultCondition(uuids,null,null,null,null,null,null,null,null,true);
+    sb = new StringBuilder();
+    condition.setMetricNamesNotCondition(true);
+    condition.appendUuidClause(sb);
+    Assert.assertEquals(sb.toString(), "(UUID NOT LIKE ? AND UUID LIKE ?)");
+    Assert.assertEquals(uuids.size(), condition.getUuids().size());
+    Assert.assertTrue(new String(condition.getUuids().get(0)).endsWith("%"));
+    Assert.assertTrue(new String(condition.getUuids().get(1)).startsWith("%"));
+
+    //metric LIKE host NOT LIKE
+    uuids.clear();
+    uuids.add(new byte[16]);
+    uuids.add(new byte[4]);
+    condition = new 
DefaultCondition(uuids,null,null,null,null,null,null,null,null,true);
+    sb = new StringBuilder();
+    condition.setHostnamesNotCondition(true);
+    condition.appendUuidClause(sb);
+    Assert.assertEquals(sb.toString(), "(UUID LIKE ? AND UUID NOT LIKE ?)");
+    Assert.assertEquals(uuids.size(), condition.getUuids().size());
+    Assert.assertTrue(new String(condition.getUuids().get(0)).endsWith("%"));
+    Assert.assertTrue(new String(condition.getUuids().get(1)).startsWith("%"));
+
+    //metric LIKE or LIKE host LIKE
+    uuids.clear();
+    uuids.add(new byte[4]);
+    uuids.add(new byte[16]);
+    uuids.add(new byte[16]);
+    condition = new 
DefaultCondition(uuids,null,null,null,null,null,null,null,null,true);
+    sb = new StringBuilder();
+    condition.appendUuidClause(sb);
+    Assert.assertEquals(sb.toString(), "((UUID LIKE ? OR UUID LIKE ?) AND UUID 
LIKE ?)");
+    Assert.assertEquals(uuids.size(), condition.getUuids().size());
+    Assert.assertTrue(new String(condition.getUuids().get(0)).endsWith("%"));
+    Assert.assertTrue(new String(condition.getUuids().get(1)).endsWith("%"));
+    Assert.assertTrue(new String(condition.getUuids().get(2)).startsWith("%"));
+
+    //UUID in metric LIKE or LIKE host LIKE
+    uuids.clear();
+    uuids.add(new byte[16]);
+    uuids.add(new byte[16]);
+    uuids.add(new byte[20]);
+    uuids.add(new byte[4]);
+    condition = new 
DefaultCondition(uuids,null,null,null,null,null,null,null,null,true);
+    sb = new StringBuilder();
+    condition.appendUuidClause(sb);
+    Assert.assertEquals(sb.toString(), "(UUID IN (?) AND (UUID LIKE ? OR UUID 
LIKE ?) AND UUID LIKE ?)");
+    Assert.assertEquals(uuids.size(), condition.getUuids().size());
+    Assert.assertTrue(new String(condition.getUuids().get(1)).endsWith("%"));
+    Assert.assertTrue(new String(condition.getUuids().get(2)).endsWith("%"));
+    Assert.assertTrue(new String(condition.getUuids().get(3)).startsWith("%"));
+
+    //metric LIKE host LIKE or LIKE
+    uuids.clear();
+    uuids.add(new byte[16]);
+    uuids.add(new byte[4]);
+    uuids.add(new byte[4]);
+    condition = new 
DefaultCondition(uuids,null,null,null,null,null,null,null,null,true);
+    sb = new StringBuilder();
+    condition.appendUuidClause(sb);
+    Assert.assertEquals(sb.toString(), "(UUID LIKE ? AND (UUID LIKE ? OR UUID 
LIKE ?))");
+    Assert.assertEquals(uuids.size(), condition.getUuids().size());
+    Assert.assertTrue(new String(condition.getUuids().get(0)).endsWith("%"));
+    Assert.assertTrue(new String(condition.getUuids().get(1)).startsWith("%"));
+    Assert.assertTrue(new String(condition.getUuids().get(2)).startsWith("%"));
+
+    //UUID NOT IN metric LIKE host LIKE
+    uuids.clear();
+    uuids.add(new byte[20]);
+    uuids.add(new byte[16]);
+    uuids.add(new byte[4]);
+    condition = new 
DefaultCondition(uuids,null,null,null,null,null,null,null,null,true);
+    sb = new StringBuilder();
+    condition.setUuidNotCondition(true);
+    condition.appendUuidClause(sb);
+    Assert.assertEquals(sb.toString(), "(UUID NOT IN (?) AND UUID LIKE ? AND 
UUID LIKE ?)");
+    Assert.assertEquals(uuids.size(), condition.getUuids().size());
+    Assert.assertTrue(new String(condition.getUuids().get(1)).endsWith("%"));
+    Assert.assertTrue(new String(condition.getUuids().get(2)).startsWith("%"));
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/source/RawMetricsSourceTest.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/source/RawMetricsSourceTest.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/source/RawMetricsSourceTest.java
new file mode 100644
index 0000000..7d58682
--- /dev/null
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/source/RawMetricsSourceTest.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.core.timeline.source;
+
+import static 
org.apache.ambari.metrics.core.timeline.source.InternalSourceProvider.SOURCE_NAME.RAW_METRICS;
+import static org.easymock.EasyMock.capture;
+import static org.easymock.EasyMock.createNiceMock;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.verify;
+import static org.powermock.api.easymock.PowerMock.mockStatic;
+import static org.powermock.api.easymock.PowerMock.replayAll;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.TreeMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration;
+import org.apache.ambari.metrics.core.timeline.sink.ExternalMetricsSink;
+import org.easymock.Capture;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import junit.framework.Assert;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(TimelineMetricConfiguration.class)
+public class RawMetricsSourceTest {
+
+  @Before
+  public void setupConf() throws Exception {
+    TimelineMetricConfiguration conf = new TimelineMetricConfiguration(new
+      Configuration(), new Configuration());
+    mockStatic(TimelineMetricConfiguration.class);
+    
expect(TimelineMetricConfiguration.getInstance()).andReturn(conf).anyTimes();
+    replayAll();
+  }
+
+  @Test
+  public void testRawMetricsSourcedAtFlushInterval() throws Exception {
+    InternalSourceProvider internalSourceProvider = new 
DefaultInternalMetricsSourceProvider();
+    ExternalMetricsSink rawMetricsSink = 
createNiceMock(ExternalMetricsSink.class);
+    expect(rawMetricsSink.getFlushSeconds()).andReturn(1);
+    expect(rawMetricsSink.getSinkTimeOutSeconds()).andReturn(1);
+    Capture<Collection<TimelineMetrics>> metricsCapture = new Capture<>();
+    rawMetricsSink.sinkMetricData(capture(metricsCapture));
+    expectLastCall();
+    replay(rawMetricsSink);
+
+    InternalMetricsSource rawMetricsSource = 
internalSourceProvider.getInternalMetricsSource(RAW_METRICS, 1, rawMetricsSink);
+    TimelineMetrics timelineMetrics = new TimelineMetrics();
+
+    final long now = System.currentTimeMillis();
+    TimelineMetric metric1 = new TimelineMetric();
+    metric1.setMetricName("m1");
+    metric1.setAppId("a1");
+    metric1.setInstanceId("i1");
+    metric1.setHostName("h1");
+    metric1.setStartTime(now - 200);
+    metric1.setMetricValues(new TreeMap<Long, Double>() {{
+      put(now - 100, 1.0);
+      put(now - 200, 2.0);
+    }});
+    timelineMetrics.getMetrics().add(metric1);
+
+    
rawMetricsSource.publishTimelineMetrics(Collections.singletonList(timelineMetrics));
+
+    verify(rawMetricsSink);
+  }
+
+  @Test(timeout = 10000)
+  public void testRawMetricsCachedAndSourced() throws Exception {
+    ExternalMetricsSink rawMetricsSink = 
createNiceMock(ExternalMetricsSink.class);
+    expect(rawMetricsSink.getFlushSeconds()).andReturn(2).anyTimes();
+    expect(rawMetricsSink.getSinkTimeOutSeconds()).andReturn(1).anyTimes();
+
+    class CaptureOnce<T> extends Capture<T> {
+      @Override
+      public void setValue(T value) {
+        if (!hasCaptured()) {
+          super.setValue(value);
+        }
+      }
+    }
+    Capture<Collection<TimelineMetrics>> metricsCapture = new CaptureOnce<>();
+
+    rawMetricsSink.sinkMetricData(capture(metricsCapture));
+    expectLastCall();
+    replay(rawMetricsSink);
+
+    InternalSourceProvider internalSourceProvider = new 
DefaultInternalMetricsSourceProvider();
+    InternalMetricsSource rawMetricsSource = 
internalSourceProvider.getInternalMetricsSource(RAW_METRICS, 1, rawMetricsSink);
+    TimelineMetrics timelineMetrics = new TimelineMetrics();
+
+    final long now = System.currentTimeMillis();
+    TimelineMetric metric1 = new TimelineMetric();
+    metric1.setMetricName("m1");
+    metric1.setAppId("a1");
+    metric1.setInstanceId("i1");
+    metric1.setHostName("h1");
+    metric1.setStartTime(now - 200);
+    metric1.setMetricValues(new TreeMap<Long, Double>() {{
+      put(now - 100, 1.0);
+      put(now - 200, 2.0);
+    }});
+    timelineMetrics.getMetrics().add(metric1);
+
+    
rawMetricsSource.publishTimelineMetrics(Collections.singletonList(timelineMetrics));
+
+    // Wait on eviction
+    Thread.sleep(5000);
+
+    verify(rawMetricsSink);
+
+    Assert.assertTrue(metricsCapture.hasCaptured());
+    
Assert.assertTrue(metricsCapture.getValue().iterator().next().getMetrics().iterator().next().equals(metric1));
+  }
+
+}

Reply via email to