http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/HBaseTimelineMetricsServiceTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/HBaseTimelineMetricsServiceTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/HBaseTimelineMetricsServiceTest.java
new file mode 100644
index 0000000..514bf86
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/HBaseTimelineMetricsServiceTest.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.core.timeline;
+
+import com.google.common.collect.Multimap;
+import junit.framework.Assert;
+import org.apache.ambari.metrics.core.timeline.aggregators.Function;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import static 
org.apache.ambari.metrics.core.timeline.aggregators.Function.ReadFunction.AVG;
+import static 
org.apache.ambari.metrics.core.timeline.aggregators.Function.ReadFunction.SUM;
+import static 
org.apache.ambari.metrics.core.timeline.aggregators.Function.PostProcessingFunction.RATE;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Unit tests for the static helpers of {@link HBaseTimelineMetricsService} exercised below:
+ * {@code parseMetricNamesToAggregationFunctions} and {@code updateValuesAsRate}.
+ */
+public class HBaseTimelineMetricsServiceTest {
+
+  public static final String MEM_METRIC = "mem";
+  public static final String BYTES_IN_METRIC = "bytes_in";
+  public static final String BYTES_NOT_AFUNCTION_METRIC = "bytes._not._afunction";
+
+  /**
+   * Parses five requested names and checks that they collapse onto three metric keys:
+   * "mem" with AVG, SUM and AVG-of-RATE (in request order), while "bytes_in" and
+   * "bytes._not._afunction" keep their full name and carry the default value function.
+   */
+  @Test
+  public void testParseMetricNamesToAggregationFunctions() throws Exception {
+    // given
+    List<String> metricNames = Arrays.asList(
+      MEM_METRIC + "._avg",
+      MEM_METRIC + "._sum",
+      MEM_METRIC + "._rate._avg",
+      BYTES_IN_METRIC,
+      BYTES_NOT_AFUNCTION_METRIC);
+
+    // when
+    Multimap<String, List<Function>> multimap =
+      HBaseTimelineMetricsService.parseMetricNamesToAggregationFunctions(metricNames);
+
+    // then
+    // expected value first, so a failure message reads the right way round
+    Assert.assertEquals(3, multimap.keySet().size());
+    Assert.assertTrue(multimap.containsKey(MEM_METRIC));
+    Assert.assertTrue(multimap.containsKey(BYTES_IN_METRIC));
+    Assert.assertTrue(multimap.containsKey(BYTES_NOT_AFUNCTION_METRIC));
+
+    // The cast relies on the multimap handing out a List view per key.
+    List<List<Function>> memFunctions = (List<List<Function>>) multimap.get(MEM_METRIC);
+    assertThat(memFunctions.get(0)).containsOnly(new Function(AVG, null));
+    assertThat(memFunctions.get(1)).containsOnly(new Function(SUM, null));
+    assertThat(memFunctions.get(2)).containsOnly(new Function(AVG, RATE));
+
+    List<List<Function>> bytesInFunctions =
+      (List<List<Function>>) multimap.get(BYTES_IN_METRIC);
+    assertThat(bytesInFunctions.get(0)).contains(Function.DEFAULT_VALUE_FUNCTION);
+
+    List<List<Function>> notAFunctionFunctions =
+      (List<List<Function>>) multimap.get(BYTES_NOT_AFUNCTION_METRIC);
+    assertThat(notAFunctionFunctions.get(0)).contains(Function.DEFAULT_VALUE_FUNCTION);
+  }
+
+  /**
+   * Feeds six samples spaced one second apart through
+   * {@code updateValuesAsRate(values, false)}. The assertions expect four results: no entry
+   * for the first timestamp, no entry for the sample whose value dropped (6.0 to 0.0), and
+   * the per-second increase for every other timestamp.
+   */
+  @Test
+  public void testRateCalculationOnMetricsWithEqualValues() throws Exception {
+    Map<Long, Double> metricValues = new TreeMap<>();
+    metricValues.put(1454000000000L, 1.0);
+    metricValues.put(1454000001000L, 6.0);
+    metricValues.put(1454000002000L, 0.0);
+    metricValues.put(1454000003000L, 3.0);
+    metricValues.put(1454000004000L, 4.0);
+    metricValues.put(1454000005000L, 7.0);
+
+    // Calculate rate on a copy so the input map above stays untouched
+    Map<Long, Double> rates =
+      HBaseTimelineMetricsService.updateValuesAsRate(new TreeMap<>(metricValues), false);
+
+    // Four rates expected: the first sample and the decreasing sample yield none
+    Assert.assertEquals(4, rates.size());
+
+    Assert.assertFalse(rates.containsKey(1454000000000L));
+    Assert.assertFalse(rates.containsKey(1454000002000L));
+
+    Assert.assertEquals(5.0, rates.get(1454000001000L));
+    Assert.assertEquals(3.0, rates.get(1454000003000L));
+    Assert.assertEquals(1.0, rates.get(1454000004000L));
+    Assert.assertEquals(3.0, rates.get(1454000005000L));
+  }
+
+  /**
+   * Feeds five samples spaced sixty seconds apart through
+   * {@code updateValuesAsRate(values, true)}. The assertions expect three results whose
+   * values are the plain differences between consecutive increasing samples (2.0, 3.0, 5.0).
+   */
+  @Test
+  public void testDiffCalculation() throws Exception {
+    Map<Long, Double> metricValues = new TreeMap<>();
+    metricValues.put(1454016368371L, 1011.25);
+    metricValues.put(1454016428371L, 1010.25);
+    metricValues.put(1454016488371L, 1012.25);
+    metricValues.put(1454016548371L, 1015.25);
+    metricValues.put(1454016608371L, 1020.25);
+
+    Map<Long, Double> rates =
+      HBaseTimelineMetricsService.updateValuesAsRate(new TreeMap<>(metricValues), true);
+
+    Assert.assertEquals(3, rates.size());
+    Assert.assertTrue(rates.containsValue(2.0));
+    Assert.assertTrue(rates.containsValue(3.0));
+    Assert.assertTrue(rates.containsValue(5.0));
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/ITPhoenixHBaseAccessor.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/ITPhoenixHBaseAccessor.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/ITPhoenixHBaseAccessor.java
new file mode 100644
index 0000000..d4171bc
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/ITPhoenixHBaseAccessor.java
@@ -0,0 +1,472 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.core.timeline;
+
+import static junit.framework.Assert.assertEquals;
+import static junit.framework.Assert.assertTrue;
+import static 
org.apache.ambari.metrics.core.timeline.MetricTestHelper.createEmptyTimelineClusterMetric;
+import static 
org.apache.ambari.metrics.core.timeline.MetricTestHelper.prepareSingleTimelineMetric;
+import static 
org.apache.ambari.metrics.core.timeline.PhoenixHBaseAccessor.DATE_TIERED_COMPACTION_POLICY;
+import static 
org.apache.ambari.metrics.core.timeline.PhoenixHBaseAccessor.FIFO_COMPACTION_POLICY_CLASS;
+import static 
org.apache.ambari.metrics.core.timeline.PhoenixHBaseAccessor.HSTORE_COMPACTION_CLASS_KEY;
+import static 
org.apache.ambari.metrics.core.timeline.PhoenixHBaseAccessor.HSTORE_ENGINE_CLASS;
+import static 
org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_TABLE_NAME;
+import static 
org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME;
+import static 
org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.PHOENIX_TABLES;
+import static 
org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.PHOENIX_TABLES_REGEX_PATTERN_STRING;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.metrics2.sink.timeline.ContainerMetric;
+import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate;
+import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
+import org.apache.hadoop.metrics2.sink.timeline.Precision;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.apache.ambari.metrics.core.timeline.aggregators.Function;
+import 
org.apache.ambari.metrics.core.timeline.aggregators.TimelineClusterMetric;
+import 
org.apache.ambari.metrics.core.timeline.aggregators.TimelineMetricAggregator;
+import 
org.apache.ambari.metrics.core.timeline.aggregators.TimelineMetricAggregatorFactory;
+import org.apache.ambari.metrics.core.timeline.query.Condition;
+import org.apache.ambari.metrics.core.timeline.query.DefaultCondition;
+import org.junit.Test;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Multimap;
+
+import junit.framework.Assert;
+
+
+
+public class ITPhoenixHBaseAccessor extends AbstractMiniHBaseClusterTest {
+
+  /**
+   * Writes "disk_free" for hosts local1 and local2 at two instants one minute apart, then
+   * queries at SECONDS precision for local1 only. Expects exactly one metric, for local1,
+   * carrying 8 values.
+   */
+  @Test
+  public void testGetMetricRecordsSeconds() throws IOException, SQLException {
+    // GIVEN
+    long startTime = System.currentTimeMillis();
+    long ctime = startTime;
+    long minute = 60 * 1000;
+    hdb.insertMetricRecords(
+      MetricTestHelper.prepareSingleTimelineMetric(ctime, "local1", "disk_free", 1), true);
+    hdb.insertMetricRecords(
+      MetricTestHelper.prepareSingleTimelineMetric(ctime, "local2", "disk_free", 2), true);
+    ctime += minute;
+    hdb.insertMetricRecords(
+      MetricTestHelper.prepareSingleTimelineMetric(ctime, "local1", "disk_free", 2), true);
+    hdb.insertMetricRecords(
+      MetricTestHelper.prepareSingleTimelineMetric(ctime, "local2", "disk_free", 1), true);
+
+    // WHEN
+    long endTime = ctime + minute;
+    // Plain mutable lists instead of double-brace initialisation: no anonymous ArrayList
+    // subclass that keeps a reference to this test instance.
+    List<byte[]> uuids = metadataManager.getUuids(
+      new ArrayList<String>(Collections.singletonList("disk_free")),
+      Collections.singletonList("local1"),
+      "host", null);
+
+    Condition condition = new DefaultCondition(uuids,
+      new ArrayList<String>(Collections.singletonList("disk_free")),
+      Collections.singletonList("local1"),
+      "host", null, startTime, endTime, Precision.SECONDS, null, true);
+    TimelineMetrics timelineMetrics = hdb.getMetricRecords(condition,
+      singletonValueFunctionMap("disk_free"));
+
+    // THEN
+    assertEquals(1, timelineMetrics.getMetrics().size());
+    TimelineMetric metric = timelineMetrics.getMetrics().get(0);
+
+    assertEquals("disk_free", metric.getMetricName());
+    assertEquals("local1", metric.getHostName());
+    assertEquals(8, metric.getMetricValues().size());
+  }
+
+  /**
+   * Writes "disk_free" samples for local1 (two instants a minute apart) and local2, runs the
+   * minute aggregator over that window, then queries at MINUTES precision for local1.
+   * Expects one metric with a single aggregated value of 1.5.
+   */
+  @Test
+  public void testGetMetricRecordsMinutes() throws IOException, SQLException {
+    // GIVEN
+    TimelineMetricAggregator aggregatorMinute =
+      TimelineMetricAggregatorFactory.createTimelineMetricAggregatorMinute(
+        hdb, new Configuration(), metadataManager, null);
+
+    long startTime = System.currentTimeMillis();
+    long ctime = startTime;
+    long minute = 60 * 1000;
+
+    TimelineMetrics metrics1 =
+      MetricTestHelper.prepareSingleTimelineMetric(ctime, "local1", "disk_free", 1);
+    hdb.insertMetricRecords(metrics1, true);
+
+    TimelineMetrics metrics2 =
+      MetricTestHelper.prepareSingleTimelineMetric(ctime + minute, "local1", "disk_free", 2);
+    hdb.insertMetricRecords(metrics2, true);
+
+    hdb.insertMetricRecords(
+      MetricTestHelper.prepareSingleTimelineMetric(ctime, "local2", "disk_free", 2), true);
+    long endTime = ctime + minute;
+    boolean success = aggregatorMinute.doWork(startTime - 1000, endTime);
+    assertTrue(success);
+
+    // WHEN
+    // Plain mutable lists instead of double-brace initialisation: no anonymous ArrayList
+    // subclass that keeps a reference to this test instance.
+    // The uuid lookup deliberately uses the "disk_%" pattern, the condition the exact name.
+    List<byte[]> uuids = metadataManager.getUuids(
+      new ArrayList<String>(Collections.singletonList("disk_%")),
+      Collections.singletonList("local1"),
+      "host", null);
+    Condition condition = new DefaultCondition(uuids,
+      new ArrayList<String>(Collections.singletonList("disk_free")),
+      Collections.singletonList("local1"),
+      "host", null, startTime, endTime + 1000, Precision.MINUTES, null, false);
+    TimelineMetrics timelineMetrics = hdb.getMetricRecords(condition,
+      singletonValueFunctionMap("disk_free"));
+
+    // THEN
+    assertEquals(1, timelineMetrics.getMetrics().size());
+    TimelineMetric metric = timelineMetrics.getMetrics().get(0);
+
+    assertEquals("disk_free", metric.getMetricName());
+    assertEquals("local1", metric.getHostName());
+    assertEquals(1, metric.getMetricValues().size());
+    Iterator<Map.Entry<Long, Double>> iterator =
+      metric.getMetricValues().entrySet().iterator();
+    assertEquals(1.5, iterator.next().getValue(), 0.00001);
+  }
+
+  /**
+   * Saves twelve identical host aggregates, five minutes apart, into the minute aggregate
+   * table, runs the hourly aggregator over them, then queries at HOURS precision.
+   * Expects one "disk_used" metric for "test_host" with a single value of 0.75.
+   */
+  @Test
+  public void testGetMetricRecordsHours() throws IOException, SQLException {
+    // GIVEN
+    TimelineMetricAggregator aggregator =
+      TimelineMetricAggregatorFactory.createTimelineMetricAggregatorHourly(
+        hdb, new Configuration(), metadataManager, null);
+
+    MetricHostAggregate expectedAggregate =
+      MetricTestHelper.createMetricHostAggregate(2.0, 0.0, 20, 15.0);
+    Map<TimelineMetric, MetricHostAggregate> aggMap =
+      new HashMap<TimelineMetric, MetricHostAggregate>();
+
+    long startTime = System.currentTimeMillis();
+    int min_5 = 5 * 60 * 1000;
+    // Start one step early so the first record lands exactly on startTime.
+    long ctime = startTime - min_5;
+    for (int i = 0; i < 12; i++) {
+      ctime += min_5;
+      aggMap.put(MetricTestHelper.createEmptyTimelineMetric(ctime), expectedAggregate);
+    }
+
+    hdb.saveHostAggregateRecords(aggMap, METRICS_AGGREGATE_MINUTE_TABLE_NAME);
+    long endTime = ctime + min_5;
+    boolean success = aggregator.doWork(startTime - 1000, endTime);
+    assertTrue(success);
+
+    // WHEN
+    // Plain mutable lists instead of double-brace initialisation: no anonymous ArrayList
+    // subclass that keeps a reference to this test instance.
+    List<byte[]> uuids = metadataManager.getUuids(
+      new ArrayList<String>(Collections.singletonList("disk_used")),
+      Collections.singletonList("test_host"),
+      "test_app", "test_instance");
+
+    Condition condition = new DefaultCondition(uuids,
+      new ArrayList<String>(Collections.singletonList("disk_used")),
+      Collections.singletonList("test_host"),
+      "test_app", "test_instance", startTime, endTime + 1000, Precision.HOURS, null, true);
+    TimelineMetrics timelineMetrics = hdb.getMetricRecords(condition,
+      singletonValueFunctionMap("disk_used"));
+
+    // THEN
+    assertEquals(1, timelineMetrics.getMetrics().size());
+    TimelineMetric metric = timelineMetrics.getMetrics().get(0);
+
+    assertEquals("disk_used", metric.getMetricName());
+    assertEquals("test_host", metric.getHostName());
+    assertEquals(1, metric.getMetricValues().size());
+    Iterator<Map.Entry<Long, Double>> iterator =
+      metric.getMetricValues().entrySet().iterator();
+    assertEquals(0.75, iterator.next().getValue(), 0.00001);
+  }
+
+  /**
+   * Writes "disk_free" for local1 and local2 at two instants one minute apart, runs the
+   * second-level cluster aggregator, then reads cluster aggregates at SECONDS precision.
+   * Expects one "disk_free" metric with 5 values, the first being 1.5.
+   */
+  @Test
+  public void testGetClusterMetricRecordsSeconds() throws Exception {
+    // GIVEN
+    TimelineMetricAggregator agg =
+      TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(
+        hdb, new Configuration(), metadataManager, null, null);
+
+    long startTime = System.currentTimeMillis();
+    long ctime = startTime + 1;
+    long minute = 60 * 1000;
+    hdb.insertMetricRecords(
+      MetricTestHelper.prepareSingleTimelineMetric(ctime, "local1", "disk_free", 1), true);
+    hdb.insertMetricRecords(
+      MetricTestHelper.prepareSingleTimelineMetric(ctime, "local2", "disk_free", 2), true);
+    ctime += minute;
+    hdb.insertMetricRecords(
+      MetricTestHelper.prepareSingleTimelineMetric(ctime, "local1", "disk_free", 2), true);
+    hdb.insertMetricRecords(
+      MetricTestHelper.prepareSingleTimelineMetric(ctime, "local2", "disk_free", 1), true);
+
+    long endTime = ctime + minute + 1;
+    boolean success = agg.doWork(startTime, endTime);
+    assertTrue(success);
+
+    // WHEN
+    // Plain mutable lists instead of double-brace initialisation: no anonymous ArrayList
+    // subclass that keeps a reference to this test instance.
+    List<byte[]> uuids = metadataManager.getUuids(
+      new ArrayList<String>(Collections.singletonList("disk_free")),
+      null,
+      "host", null);
+
+    Condition condition = new DefaultCondition(uuids,
+      new ArrayList<String>(Collections.singletonList("disk_free")),
+      null, "host", null, startTime - 90000, endTime, Precision.SECONDS, null, true);
+    TimelineMetrics timelineMetrics = hdb.getAggregateMetricRecords(condition,
+      singletonValueFunctionMap("disk_free"));
+
+    // THEN
+    assertEquals(1, timelineMetrics.getMetrics().size());
+    TimelineMetric metric = timelineMetrics.getMetrics().get(0);
+
+    assertEquals("disk_free", metric.getMetricName());
+    assertEquals(5, metric.getMetricValues().size());
+    assertEquals(1.5, metric.getMetricValues().values().iterator().next(), 0.00001);
+  }
+
+  /**
+   * Same data set-up as the cluster SECONDS test, but the condition carries no start or end
+   * time and the requested function is SUM. Expects one metric named "disk_free._sum" with a
+   * single value whose integer part is 3.
+   */
+  @Test
+  public void testGetClusterMetricRecordLatestWithFunction() throws Exception {
+    // GIVEN
+    TimelineMetricAggregator agg =
+      TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(
+        hdb, new Configuration(), metadataManager, null, null);
+
+    long startTime = System.currentTimeMillis();
+    long ctime = startTime + 1;
+    long minute = 60 * 1000;
+    hdb.insertMetricRecords(
+      MetricTestHelper.prepareSingleTimelineMetric(ctime, "local1", "disk_free", 1), true);
+    hdb.insertMetricRecords(
+      MetricTestHelper.prepareSingleTimelineMetric(ctime, "local2", "disk_free", 2), true);
+    ctime += minute;
+    hdb.insertMetricRecords(
+      MetricTestHelper.prepareSingleTimelineMetric(ctime, "local1", "disk_free", 2), true);
+    hdb.insertMetricRecords(
+      MetricTestHelper.prepareSingleTimelineMetric(ctime, "local2", "disk_free", 1), true);
+
+    long endTime = ctime + minute + 1;
+    boolean success = agg.doWork(startTime, endTime);
+    assertTrue(success);
+
+    // WHEN
+    // Plain mutable lists instead of double-brace initialisation: no anonymous ArrayList
+    // subclass that keeps a reference to this test instance.
+    List<byte[]> uuids = metadataManager.getUuids(
+      new ArrayList<String>(Collections.singletonList("disk_free")),
+      null,
+      "host", null);
+
+    Condition condition = new DefaultCondition(uuids,
+      new ArrayList<String>(Collections.singletonList("disk_free")),
+      null, "host", null, null, null, Precision.SECONDS, null, true);
+
+    Multimap<String, List<Function>> mmap = ArrayListMultimap.create();
+    mmap.put("disk_free",
+      Collections.singletonList(new Function(Function.ReadFunction.SUM, null)));
+    TimelineMetrics timelineMetrics = hdb.getAggregateMetricRecords(condition, mmap);
+
+    // THEN
+    assertEquals(1, timelineMetrics.getMetrics().size());
+    TimelineMetric metric = timelineMetrics.getMetrics().get(0);
+
+    assertEquals("disk_free._sum", metric.getMetricName());
+    assertEquals(1, metric.getMetricValues().size());
+    assertEquals(3, metric.getMetricValues().values().iterator().next().intValue());
+  }
+
+  /**
+   * Saves four identical cluster aggregates one minute apart, runs the hourly cluster
+   * aggregator, then reads cluster aggregates at HOURS precision.
+   * Expects one "disk_used" metric for app "test_app" with a single value of 2.0.
+   */
+  @Test
+  public void testGetClusterMetricRecordsHours() throws Exception {
+    // GIVEN
+    TimelineMetricAggregator agg =
+      TimelineMetricAggregatorFactory.createTimelineClusterAggregatorHourly(
+        hdb, new Configuration(), metadataManager, null);
+
+    long startTime = System.currentTimeMillis();
+    long ctime = startTime;
+    long minute = 60 * 1000;
+
+    Map<TimelineClusterMetric, MetricClusterAggregate> records =
+      new HashMap<TimelineClusterMetric, MetricClusterAggregate>();
+
+    // First record at startTime, then three more at one-minute steps.
+    records.put(MetricTestHelper.createEmptyTimelineClusterMetric(ctime),
+      new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
+    for (int i = 0; i < 3; i++) {
+      ctime += minute;
+      records.put(MetricTestHelper.createEmptyTimelineClusterMetric(ctime),
+        new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
+    }
+
+    hdb.saveClusterAggregateRecords(records);
+    boolean success = agg.doWork(startTime, ctime + minute);
+    assertTrue(success);
+
+    // WHEN
+    // Plain mutable lists instead of double-brace initialisation: no anonymous ArrayList
+    // subclass that keeps a reference to this test instance.
+    List<byte[]> uuids = metadataManager.getUuids(
+      new ArrayList<String>(Collections.singletonList("disk_used")),
+      null,
+      "test_app",
+      "instance_id");
+
+    Condition condition = new DefaultCondition(uuids,
+      new ArrayList<String>(Collections.singletonList("disk_used")),
+      null,
+      "test_app",
+      "instance_id",
+      startTime,
+      ctime + minute + 1000,
+      Precision.HOURS,
+      null,
+      true);
+    TimelineMetrics timelineMetrics = hdb.getAggregateMetricRecords(condition,
+      singletonValueFunctionMap("disk_used"));
+
+    // THEN
+    assertEquals(1, timelineMetrics.getMetrics().size());
+    TimelineMetric metric = timelineMetrics.getMetrics().get(0);
+
+    assertEquals("disk_used", metric.getMetricName());
+    assertEquals("test_app", metric.getAppId());
+    assertEquals(1, metric.getMetricValues().size());
+    assertEquals(2.0, metric.getMetricValues().values().iterator().next(), 0.00001);
+  }
+
+  /**
+   * Overrides the accessor's {@code tableTTL} entry for the precision table and its
+   * {@code timelineMetricsTablesDurability} through reflection, runs
+   * {@code initPoliciesAndTTL()}, then polls the HBase table descriptors (10 rounds) and
+   * verifies normalizer state, durability, compaction policies and the precision table TTL.
+   */
+  @Test
+  public void testInitPoliciesAndTTL() throws Exception {
+    // try-with-resources: the Admin handle is closed even if an assertion below fails
+    try (Admin hBaseAdmin = hdb.getHBaseAdmin()) {
+      int precisionTtl = 2 * 86400;
+
+      Field f = PhoenixHBaseAccessor.class.getDeclaredField("tableTTL");
+      f.setAccessible(true);
+      @SuppressWarnings("unchecked") // reflective read; treated as Map<String, Integer>
+      Map<String, Integer> precisionValues = (Map<String, Integer>) f.get(hdb);
+      precisionValues.put(METRICS_RECORD_TABLE_NAME, precisionTtl);
+      f.set(hdb, precisionValues);
+
+      Field f2 =
+        PhoenixHBaseAccessor.class.getDeclaredField("timelineMetricsTablesDurability");
+      f2.setAccessible(true);
+      f2.set(hdb, "ASYNC_WAL");
+
+      hdb.initPoliciesAndTTL();
+
+      // Verify expected policies are set
+      boolean normalizerEnabled = false;
+      String precisionTableCompactionPolicy = null;
+      String aggregateTableCompactionPolicy = null;
+      boolean tableDurabilitySet = false;
+      for (int i = 0; i < 10; i++) {
+        LOG.warn("Policy check retry : " + i);
+        for (String tableName : PHOENIX_TABLES) {
+          TableName[] tableNames =
+            hBaseAdmin.listTableNames(PHOENIX_TABLES_REGEX_PATTERN_STRING, false);
+          // Fail with a descriptive message instead of a bare NoSuchElementException
+          TableName hbaseTableName = Arrays.stream(tableNames)
+            .filter(t -> tableName.equals(t.getNameAsString()))
+            .findFirst()
+            .orElseThrow(() -> new AssertionError("HBase table not found: " + tableName));
+
+          TableDescriptor tableDescriptor = hBaseAdmin.getTableDescriptor(hbaseTableName);
+
+          normalizerEnabled = tableDescriptor.isNormalizationEnabled();
+          tableDurabilitySet = (Durability.ASYNC_WAL.equals(tableDescriptor.getDurability()));
+          if (tableName.equals(METRICS_RECORD_TABLE_NAME)) {
+            precisionTableCompactionPolicy =
+              tableDescriptor.getValue(HSTORE_COMPACTION_CLASS_KEY);
+          } else {
+            aggregateTableCompactionPolicy = tableDescriptor.getValue(HSTORE_ENGINE_CLASS);
+          }
+          LOG.debug("Table: " + tableName + ", normalizerEnabled = " + normalizerEnabled);
+          // Best effort: wait 2 s whenever the normalizer is still on or no compaction
+          // policy has been observed yet
+          if (normalizerEnabled
+            || (precisionTableCompactionPolicy == null
+              && aggregateTableCompactionPolicy == null)) {
+            Thread.sleep(2000L);
+          }
+          if (tableName.equals(METRICS_RECORD_TABLE_NAME)) {
+            for (ColumnFamilyDescriptor family : tableDescriptor.getColumnFamilies()) {
+              precisionTtl = family.getTimeToLive();
+            }
+          }
+        }
+      }
+
+      Assert.assertFalse("Normalizer disabled.", normalizerEnabled);
+      Assert.assertTrue("Durability Set.", tableDurabilitySet);
+      Assert.assertEquals("FIFO compaction policy is set for METRIC_RECORD.",
+        FIFO_COMPACTION_POLICY_CLASS, precisionTableCompactionPolicy);
+      Assert.assertEquals("Date tiered compaction policy is set for aggregate tables",
+        DATE_TIERED_COMPACTION_POLICY, aggregateTableCompactionPolicy);
+      // NOTE(review): the override above is 2 * 86400 but 86400 is asserted here; confirm
+      // which value is intended.
+      Assert.assertEquals("Precision TTL value as expected.", 86400, precisionTtl);
+    }
+  }
+
+  /**
+   * Builds a multimap holding exactly one entry: {@code metricName} mapped to a one-element
+   * list containing a {@code Function} created with its no-argument constructor.
+   */
+  private Multimap<String, List<Function>> singletonValueFunctionMap(String metricName) {
+    List<Function> functions = Collections.singletonList(new Function());
+    Multimap<String, List<Function>> functionsByMetric = ArrayListMultimap.create();
+    functionsByMetric.put(metricName, functions);
+    return functionsByMetric;
+  }
+
+  /**
+   * Inserts one fully populated {@link ContainerMetric} and reads the CONTAINER_METRICS
+   * table back over JDBC, checking every column of each returned row against the values
+   * set on the metric. Fails if no row is returned.
+   */
+  @Test
+  public void testInsertContainerMetrics() throws Exception {
+    ContainerMetric metric = new ContainerMetric();
+    metric.setContainerId("container_1450744875949_0001_01_000001");
+    metric.setHostName("host1");
+    metric.setPmemLimit(2048);
+    metric.setVmemLimit(2048);
+    metric.setPmemUsedAvg(1024);
+    metric.setPmemUsedMin(1024);
+    metric.setPmemUsedMax(1024);
+    metric.setLaunchDuration(2000);
+    metric.setLocalizationDuration(3000);
+    long startTime = System.currentTimeMillis();
+    long finishTime = startTime + 5000;
+    metric.setStartTime(startTime);
+    metric.setFinishTime(finishTime);
+    metric.setExitCode(0);
+    List<ContainerMetric> list = Arrays.asList(metric);
+    hdb.insertContainerMetrics(list);
+
+    // check each field is set properly when read back.
+    boolean foundRecord = false;
+    // try-with-resources: statement and result set are released even if an assertion fails
+    try (PreparedStatement stmt = conn.prepareStatement("SELECT * FROM CONTAINER_METRICS");
+         ResultSet set = stmt.executeQuery()) {
+      while (set.next()) {
+        assertEquals("application_1450744875949_0001", set.getString("APP_ID"));
+        assertEquals("container_1450744875949_0001_01_000001",
+          set.getString("CONTAINER_ID"));
+        assertEquals(new java.sql.Timestamp(startTime), set.getTimestamp("START_TIME"));
+        assertEquals(new java.sql.Timestamp(finishTime), set.getTimestamp("FINISH_TIME"));
+        assertEquals(5000, set.getLong("DURATION"));
+        assertEquals("host1", set.getString("HOSTNAME"));
+        assertEquals(0, set.getInt("EXIT_CODE"));
+        assertEquals(3000, set.getLong("LOCALIZATION_DURATION"));
+        assertEquals(2000, set.getLong("LAUNCH_DURATION"));
+        assertEquals((double)2, set.getDouble("MEM_REQUESTED_GB"));
+        assertEquals((double)2 * 5000, set.getDouble("MEM_REQUESTED_GB_MILLIS"));
+        assertEquals((double)2, set.getDouble("MEM_VIRTUAL_GB"));
+        assertEquals((double)1, set.getDouble("MEM_USED_GB_MIN"));
+        assertEquals((double)1, set.getDouble("MEM_USED_GB_MAX"));
+        assertEquals((double)1, set.getDouble("MEM_USED_GB_AVG"));
+        assertEquals((double)(2 - 1), set.getDouble("MEM_UNUSED_GB"));
+        assertEquals((double)(2-1) * 5000, set.getDouble("MEM_UNUSED_GB_MILLIS"));
+        foundRecord = true;
+      }
+    }
+    assertTrue(foundRecord);
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/MetricTestHelper.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/MetricTestHelper.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/MetricTestHelper.java
new file mode 100644
index 0000000..a99d488
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/MetricTestHelper.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.core.timeline;
+
+import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import 
org.apache.ambari.metrics.core.timeline.aggregators.TimelineClusterMetric;
+
+import java.util.Arrays;
+import java.util.TreeMap;
+
+public class MetricTestHelper {
+
+  /**
+   * Creates a {@link MetricHostAggregate} and sets its max, min, number of samples and sum
+   * to the supplied values.
+   */
+  public static MetricHostAggregate createMetricHostAggregate(double max, double min,
+                                                              int numberOfSamples,
+                                                              double sum) {
+    final MetricHostAggregate aggregate = new MetricHostAggregate();
+
+    aggregate.setMax(max);
+    aggregate.setMin(min);
+    aggregate.setNumberOfSamples(numberOfSamples);
+    aggregate.setSum(sum);
+
+    return aggregate;
+  }
+
+  public static TimelineMetrics prepareSingleTimelineMetric(long startTime,
+                                                        String host,
+                                                        String metricName,
+                                                        double val) {
+    return prepareSingleTimelineMetric(startTime, host, null, metricName, val);
+  }
+
+  public static TimelineMetrics prepareSingleTimelineMetric(long startTime,
+                                                        String host,
+                                                        String instanceId,
+                                                        String metricName,
+                                                        double val) {
+    TimelineMetrics m = new TimelineMetrics();
+    m.setMetrics(Arrays.asList(
+        createTimelineMetric(startTime, metricName, host, null, instanceId, 
val)));
+
+    return m;
+  }
+
+  public static TimelineMetrics prepareSingleTimelineMetric(long startTime,
+                                                            String host,
+                                                            String appId,
+                                                            String instanceId,
+                                                            String metricName,
+                                                            double val) {
+    TimelineMetrics m = new TimelineMetrics();
+    m.setMetrics(Arrays.asList(
+      createTimelineMetric(startTime, metricName, host, appId, instanceId, 
val)));
+
+    return m;
+  }
+
+
+  public static TimelineMetric createTimelineMetric(long startTime,
+                                                String metricName,
+                                                String host,
+                                                String appId,
+                                                String instanceId,
+                                                double val) {
+    TimelineMetric m = new TimelineMetric();
+    m.setHostName(host);
+    m.setAppId(appId != null ? appId : "host");
+    m.setInstanceId(instanceId);
+    m.setMetricName(metricName);
+    m.setStartTime(startTime);
+    TreeMap<Long, Double> vals = new TreeMap<Long, Double>();
+    vals.put(startTime + 15000l, val);
+    vals.put(startTime + 30000l, val);
+    vals.put(startTime + 45000l, val);
+    vals.put(startTime + 60000l, val);
+
+    m.setMetricValues(vals);
+
+    return m;
+  }
+
+  public static TimelineMetric createEmptyTimelineMetric(long startTime) {
+    TimelineMetric metric = new TimelineMetric();
+    metric.setMetricName("disk_used");
+    metric.setAppId("test_app");
+    metric.setInstanceId("test_instance");
+    metric.setHostName("test_host");
+    metric.setStartTime(startTime);
+
+    return metric;
+  }
+
+  public static TimelineClusterMetric createEmptyTimelineClusterMetric(
+      String name, long startTime) {
+    TimelineClusterMetric metric = new TimelineClusterMetric(name,
+        "test_app", "instance_id", startTime);
+
+    return metric;
+  }
+
+  public static TimelineClusterMetric createEmptyTimelineClusterMetric(
+      long startTime) {
+    return createEmptyTimelineClusterMetric("disk_used", startTime);
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/PhoenixHBaseAccessorTest.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/PhoenixHBaseAccessorTest.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/PhoenixHBaseAccessorTest.java
new file mode 100644
index 0000000..8c31691
--- /dev/null
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/PhoenixHBaseAccessorTest.java
@@ -0,0 +1,291 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.core.timeline;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Multimap;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate;
+import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
+import org.apache.hadoop.metrics2.sink.timeline.Precision;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import 
org.apache.ambari.metrics.core.timeline.aggregators.TimelineClusterMetric;
+import org.apache.ambari.metrics.core.timeline.aggregators.Function;
+import 
org.apache.ambari.metrics.core.timeline.discovery.TimelineMetricMetadataManager;
+import org.apache.ambari.metrics.core.timeline.query.Condition;
+import org.apache.ambari.metrics.core.timeline.query.DefaultCondition;
+import org.apache.ambari.metrics.core.timeline.query.PhoenixConnectionProvider;
+import org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL;
+import org.apache.phoenix.exception.PhoenixIOException;
+import org.easymock.EasyMock;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.expect;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import static org.powermock.api.easymock.PowerMock.*;
+
+/**
+ * Unit tests for {@link PhoenixHBaseAccessor} that run against mocked JDBC
+ * objects. {@link PhoenixTransactSQL} and {@link TimelineMetricConfiguration}
+ * are prepared for static mocking by PowerMock.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({PhoenixTransactSQL.class, TimelineMetricConfiguration.class})
+public class PhoenixHBaseAccessorTest {
+  private static final String ZOOKEEPER_QUORUM = "hbase.zookeeper.quorum";
+
+  PhoenixConnectionProvider connectionProvider;
+  PhoenixHBaseAccessor accessor;
+
+  /**
+   * Makes {@code TimelineMetricConfiguration.getInstance()} return a
+   * configuration with cache size 1, commit interval 100 and the in-memory
+   * aggregator sink, then builds an accessor whose connection provider hands
+   * out {@code null} for both the admin and the connection.
+   */
+  @Before
+  public void setupConf() throws Exception {
+    Configuration hbaseConf = new Configuration();
+    hbaseConf.setStrings(ZOOKEEPER_QUORUM, "quorum");
+    Configuration metricsConf = new Configuration();
+    metricsConf.setStrings(
+      TimelineMetricConfiguration.TIMELINE_METRICS_CACHE_SIZE, "1");
+    metricsConf.setStrings(
+      TimelineMetricConfiguration.TIMELINE_METRICS_CACHE_COMMIT_INTERVAL, "100");
+    metricsConf.setStrings(
+      TimelineMetricConfiguration.TIMELINE_METRIC_AGGREGATOR_SINK_CLASS,
+      "org.apache.ambari.metrics.core.timeline.TimelineMetricsAggregatorMemorySink");
+
+    TimelineMetricConfiguration conf =
+      new TimelineMetricConfiguration(hbaseConf, metricsConf);
+    mockStatic(TimelineMetricConfiguration.class);
+    expect(TimelineMetricConfiguration.getInstance()).andReturn(conf).anyTimes();
+    replayAll();
+
+    connectionProvider = new PhoenixConnectionProvider() {
+      @Override
+      public HBaseAdmin getHBaseAdmin() throws IOException {
+        return null;
+      }
+
+      @Override
+      public Connection getConnection() throws SQLException {
+        return null;
+      }
+
+    };
+
+    accessor = new PhoenixHBaseAccessor(connectionProvider);
+  }
+
+  /**
+   * Queries once with startTime before endTime and once with the two swapped.
+   * Only the first condition has a prepared-statement expectation (exactly
+   * once); the swapped query must return no metrics.
+   */
+  @Test
+  public void testGetMetricRecords() throws SQLException, IOException {
+    List<String> metricNames = new LinkedList<>();
+    List<String> hostnames = new LinkedList<>();
+    Multimap<String, List<Function>> metricFunctions = ArrayListMultimap.create();
+
+    mockStatic(PhoenixTransactSQL.class);
+    PreparedStatement preparedStatementMock =
+      EasyMock.createNiceMock(PreparedStatement.class);
+    Condition condition = new DefaultCondition(metricNames, hostnames,
+      "appid", "instanceid", 123L, 234L, Precision.SECONDS, 10, true);
+    expect(PhoenixTransactSQL.prepareGetMetricsSqlStmt(null, condition))
+      .andReturn(preparedStatementMock).once();
+    ResultSet rsMock = EasyMock.createNiceMock(ResultSet.class);
+    expect(preparedStatementMock.executeQuery()).andReturn(rsMock);
+
+
+    replayAll();
+    EasyMock.replay(preparedStatementMock, rsMock);
+
+    // Check when startTime < endTime; the interaction is checked by verifyAll().
+    accessor.getMetricRecords(condition, metricFunctions);
+
+    // Check when startTime > endTime
+    Condition swappedCondition = new DefaultCondition(metricNames, hostnames,
+      "appid", "instanceid", 234L, 123L, Precision.SECONDS, 10, true);
+    TimelineMetrics swappedResult =
+      accessor.getMetricRecords(swappedCondition, metricFunctions);
+    assertEquals(0, swappedResult.getMetrics().size());
+
+    verifyAll();
+    EasyMock.verify(preparedStatementMock, rsMock);
+  }
+
+  /**
+   * Makes {@code executeQuery()} throw a RuntimeException whose cause is an
+   * IOException with a "TimeRange" stack frame, and expects the accessor to
+   * return an empty result instead of propagating it.
+   */
+  @Test
+  public void testGetMetricRecordsIOException() throws SQLException, IOException {
+    List<String> metricNames = new LinkedList<>();
+    List<String> hostnames = new LinkedList<>();
+    Multimap<String, List<Function>> metricFunctions = ArrayListMultimap.create();
+
+    mockStatic(PhoenixTransactSQL.class);
+    PreparedStatement preparedStatementMock =
+      EasyMock.createNiceMock(PreparedStatement.class);
+    Condition condition = new DefaultCondition(metricNames, hostnames,
+      "appid", "instanceid", 123L, 234L, Precision.SECONDS, 10, true);
+    expect(PhoenixTransactSQL.prepareGetMetricsSqlStmt(null, condition))
+      .andReturn(preparedStatementMock).once();
+    ResultSet rsMock = EasyMock.createNiceMock(ResultSet.class);
+    RuntimeException runtimeException =
+      EasyMock.createNiceMock(RuntimeException.class);
+    IOException io = EasyMock.createNiceMock(IOException.class);
+    expect(preparedStatementMock.executeQuery()).andThrow(runtimeException);
+    expect(runtimeException.getCause()).andReturn(io).atLeastOnce();
+    StackTraceElement[] stackTrace = new StackTraceElement[]{
+      new StackTraceElement("TimeRange", "method", "file", 1)};
+    expect(io.getStackTrace()).andReturn(stackTrace).atLeastOnce();
+
+
+    replayAll();
+    EasyMock.replay(preparedStatementMock, rsMock, io, runtimeException);
+
+    TimelineMetrics tml = accessor.getMetricRecords(condition, metricFunctions);
+
+    assertEquals(0, tml.getMetrics().size());
+
+    verifyAll();
+    EasyMock.verify(preparedStatementMock, rsMock, io, runtimeException);
+  }
+
+  /**
+   * Makes {@code executeQuery()} throw a PhoenixIOException chain ending in a
+   * DoNotRetryIOException with a "HashJoinRegionScanner" stack frame. Expects
+   * the accessor to throw and to call
+   * {@code PhoenixTransactSQL.setSortMergeJoinEnabled(true)}.
+   */
+  @Test
+  public void testGetMetricRecordsPhoenixIOExceptionDoNotRetryException()
+      throws SQLException, IOException {
+    List<String> metricNames = new LinkedList<>();
+    List<String> hostnames = new LinkedList<>();
+    Multimap<String, List<Function>> metricFunctions = ArrayListMultimap.create();
+
+    mockStatic(PhoenixTransactSQL.class);
+    PreparedStatement preparedStatementMock =
+      EasyMock.createNiceMock(PreparedStatement.class);
+    Condition condition = new DefaultCondition(metricNames, hostnames,
+      "appid", "instanceid", null, null, Precision.SECONDS, 10, true);
+    expect(PhoenixTransactSQL.prepareGetLatestMetricSqlStmt(null, condition))
+      .andReturn(preparedStatementMock).once();
+    PhoenixTransactSQL.setSortMergeJoinEnabled(true);
+    EasyMock.expectLastCall();
+    ResultSet rsMock = EasyMock.createNiceMock(ResultSet.class);
+    PhoenixIOException pioe1 = EasyMock.createNiceMock(PhoenixIOException.class);
+    PhoenixIOException pioe2 = EasyMock.createNiceMock(PhoenixIOException.class);
+    DoNotRetryIOException dnrioe =
+      EasyMock.createNiceMock(DoNotRetryIOException.class);
+    expect(preparedStatementMock.executeQuery()).andThrow(pioe1);
+    expect(pioe1.getCause()).andReturn(pioe2).atLeastOnce();
+    expect(pioe2.getCause()).andReturn(dnrioe).atLeastOnce();
+    StackTraceElement[] stackTrace = new StackTraceElement[]{
+      new StackTraceElement("HashJoinRegionScanner", "method", "file", 1)};
+    expect(dnrioe.getStackTrace()).andReturn(stackTrace).atLeastOnce();
+
+
+    replayAll();
+    EasyMock.replay(preparedStatementMock, rsMock, pioe1, pioe2, dnrioe);
+    try {
+      accessor.getMetricRecords(condition, metricFunctions);
+      // fail() raises an AssertionError, which the catch below does not swallow.
+      fail("Expected getMetricRecords to throw");
+    } catch (Exception expected) {
+      // Any exception from the accessor is the expected outcome.
+    }
+    verifyAll();
+  }
+
+  /**
+   * Inserts the same metrics three times into an accessor whose
+   * {@code commitMetrics} is redirected to a mock connection, and expects
+   * exactly one {@code commit()} on that connection.
+   */
+  @Test
+  public void testMetricsCacheCommittingWhenFull() throws IOException, SQLException {
+    final Connection connection = EasyMock.createNiceMock(Connection.class);
+
+    accessor = new PhoenixHBaseAccessor(connectionProvider) {
+      @Override
+      public void commitMetrics(Collection<TimelineMetrics> timelineMetricsCollection) {
+        try {
+          connection.commit();
+        } catch (SQLException e) {
+          // Surface the failure instead of printing and continuing.
+          throw new IllegalStateException("Mock connection failed to commit", e);
+        }
+      }
+    };
+
+    TimelineMetrics timelineMetrics = EasyMock.createNiceMock(TimelineMetrics.class);
+    expect(timelineMetrics.getMetrics())
+      .andReturn(Collections.singletonList(new TimelineMetric())).anyTimes();
+    connection.commit();
+    EasyMock.expectLastCall().once();
+
+    EasyMock.replay(timelineMetrics, connection);
+
+    accessor.insertMetricRecords(timelineMetrics);
+    accessor.insertMetricRecords(timelineMetrics);
+    accessor.insertMetricRecords(timelineMetrics);
+
+    EasyMock.verify(timelineMetrics, connection);
+  }
+
+  /**
+   * Saves one cluster aggregate, one host aggregate and one cluster time
+   * aggregate through the accessor, then expects a
+   * {@link TimelineMetricsAggregatorMemorySink} to report one record of each.
+   */
+  @Test
+  public void testMetricsAggregatorSink() throws IOException, SQLException {
+    Map<TimelineClusterMetric, MetricClusterAggregate> clusterAggregateMap =
+        new HashMap<>();
+    Map<TimelineClusterMetric, MetricHostAggregate> clusterTimeAggregateMap =
+        new HashMap<>();
+    Map<TimelineMetric, MetricHostAggregate> hostAggregateMap = new HashMap<>();
+
+
+    final Connection connection = EasyMock.createNiceMock(Connection.class);
+    final PreparedStatement statement =
+        EasyMock.createNiceMock(PreparedStatement.class);
+    expect(connection.prepareStatement(EasyMock.anyString()))
+        .andReturn(statement).anyTimes();
+    EasyMock.replay(statement);
+    EasyMock.replay(connection);
+
+    connectionProvider = new PhoenixConnectionProvider() {
+
+      @Override
+      public HBaseAdmin getHBaseAdmin() throws IOException {
+        return null;
+      }
+
+      @Override
+      public Connection getConnection() throws SQLException {
+        return connection;
+      }
+    };
+
+    accessor = new PhoenixHBaseAccessor(connectionProvider);
+
+    TimelineClusterMetric clusterMetric =
+        new TimelineClusterMetric("metricName", "appId", "instanceId",
+            System.currentTimeMillis());
+    TimelineMetric timelineMetric = new TimelineMetric();
+    timelineMetric.setMetricName("Metric1");
+    timelineMetric.setType("type1");
+    timelineMetric.setAppId("App1");
+    timelineMetric.setInstanceId("instance1");
+    timelineMetric.setHostName("host1");
+
+    clusterAggregateMap.put(clusterMetric, new MetricClusterAggregate());
+    clusterTimeAggregateMap.put(clusterMetric, new MetricHostAggregate());
+    hostAggregateMap.put(timelineMetric, new MetricHostAggregate());
+
+    TimelineMetricMetadataManager metricMetadataManagerMock =
+        EasyMock.createMock(TimelineMetricMetadataManager.class);
+    expect(metricMetadataManagerMock.getUuid(anyObject(TimelineClusterMetric.class)))
+        .andReturn(new byte[16]).times(2);
+    expect(metricMetadataManagerMock.getUuid(anyObject(TimelineMetric.class)))
+        .andReturn(new byte[20]).once();
+    replay(metricMetadataManagerMock);
+
+    accessor.setMetadataInstance(metricMetadataManagerMock);
+    accessor.saveClusterAggregateRecords(clusterAggregateMap);
+    accessor.saveHostAggregateRecords(hostAggregateMap,
+        PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_TABLE_NAME);
+    accessor.saveClusterAggregateRecordsSecond(clusterTimeAggregateMap,
+        PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME);
+
+    // NOTE(review): a fresh sink instance is queried here, so the records are
+    // presumably held in state shared across instances; confirm in the sink.
+    TimelineMetricsAggregatorMemorySink memorySink =
+        new TimelineMetricsAggregatorMemorySink();
+    assertEquals(1, memorySink.getClusterAggregateRecords().size());
+    assertEquals(1, memorySink.getClusterTimeAggregateRecords().size());
+    assertEquals(1, memorySink.getHostAggregateRecords().size());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/TestClusterSuite.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/TestClusterSuite.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/TestClusterSuite.java
new file mode 100644
index 0000000..b8c28ed
--- /dev/null
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/TestClusterSuite.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.core.timeline;
+
+
+import org.apache.ambari.metrics.core.timeline.aggregators.ITClusterAggregator;
+import org.apache.ambari.metrics.core.timeline.aggregators.ITMetricAggregator;
+import org.junit.Ignore;
+import org.junit.runner.RunWith;
+import org.junit.runners.Suite;
+
+import static org.junit.runners.Suite.SuiteClasses;
+
+@Ignore
+@RunWith(Suite.class)
+@SuiteClasses({ITMetricAggregator.class, ITClusterAggregator.class, 
ITPhoenixHBaseAccessor.class})
+public class TestClusterSuite {
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/TestMetricHostAggregate.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/TestMetricHostAggregate.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/TestMetricHostAggregate.java
new file mode 100644
index 0000000..ad27b74
--- /dev/null
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/TestMetricHostAggregate.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.core.timeline;
+
+import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
+import org.junit.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Tests the sum, min, max and average bookkeeping of
+ * {@link MetricHostAggregate}.
+ */
+public class TestMetricHostAggregate {
+
+  /** A freshly built aggregate must report the values it was created with. */
+  @Test
+  public void testCreateAggregate() throws Exception {
+    // given
+    MetricHostAggregate hostAggregate = createAggregate(3.0, 1.0, 2.0, 2);
+    final double expectedAverage = 3.0 / 2;
+
+    //then
+    assertThat(hostAggregate.getSum()).isEqualTo(3.0);
+    assertThat(hostAggregate.getMin()).isEqualTo(1.0);
+    assertThat(hostAggregate.getMax()).isEqualTo(2.0);
+    assertThat(hostAggregate.calculateAverage()).isEqualTo(expectedAverage);
+  }
+
+  /** Folding two further aggregates in must update sum, min, max and average. */
+  @Test
+  public void testUpdateAggregates() throws Exception {
+    // given
+    MetricHostAggregate hostAggregate = createAggregate(3.0, 1.0, 2.0, 2);
+    MetricHostAggregate firstUpdate = createAggregate(8.0, 0.5, 7.5, 2);
+    MetricHostAggregate secondUpdate = createAggregate(1.0, 1.0, 1.0, 1);
+    final double expectedAverage = (3.0 + 8.0 + 1.0) / 5;
+
+    //when
+    hostAggregate.updateAggregates(firstUpdate);
+    hostAggregate.updateAggregates(secondUpdate);
+
+    //then
+    assertThat(hostAggregate.getSum()).isEqualTo(12.0);
+    assertThat(hostAggregate.getMin()).isEqualTo(0.5);
+    assertThat(hostAggregate.getMax()).isEqualTo(7.5);
+    assertThat(hostAggregate.calculateAverage()).isEqualTo(expectedAverage);
+  }
+
+  /**
+   * Builds an aggregate with the given statistics and a deviation of 0.0.
+   *
+   * @param sum sum to set
+   * @param min minimum to set
+   * @param max maximum to set
+   * @param samplesCount number of samples to set
+   * @return the populated aggregate
+   */
+  static MetricHostAggregate createAggregate (Double sum, Double min,
+                                              Double max, Integer samplesCount) {
+    MetricHostAggregate result = new MetricHostAggregate();
+    result.setSum(sum);
+    result.setMax(max);
+    result.setMin(min);
+    result.setDeviation(0.0);
+    result.setNumberOfSamples(samplesCount);
+    return result;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/TestPhoenixTransactSQL.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/TestPhoenixTransactSQL.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/TestPhoenixTransactSQL.java
new file mode 100644
index 0000000..656034b
--- /dev/null
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/TestPhoenixTransactSQL.java
@@ -0,0 +1,608 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.core.timeline;
+
+import org.apache.hadoop.metrics2.sink.timeline.Precision;
+import 
org.apache.hadoop.metrics2.sink.timeline.PrecisionLimitExceededException;
+import org.apache.ambari.metrics.core.timeline.query.Condition;
+import org.apache.ambari.metrics.core.timeline.query.DefaultCondition;
+import org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL;
+import 
org.apache.ambari.metrics.core.timeline.query.SplitByMetricNamesCondition;
+import org.apache.ambari.metrics.core.timeline.query.TopNCondition;
+import org.easymock.Capture;
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+import java.sql.Connection;
+import java.sql.ParameterMetaData;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.easymock.EasyMock.createNiceMock;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.reset;
+import static org.easymock.EasyMock.verify;
+import org.easymock.EasyMock;
+
+public class TestPhoenixTransactSQL {
+  /**
+   * Checks the condition clause produced by a {@link DefaultCondition} that
+   * was built with two UUIDs and a start and end time.
+   */
+  @Test
+  public void testConditionClause() throws Exception {
+    final long startTime = 1407959718L;
+    final long endTime = 1407959918L;
+    List<String> metricNames = new ArrayList<>(Arrays.asList("cpu_user", "mem_free"));
+    List<String> hostnames = Collections.singletonList("h1");
+
+    Condition condition = new DefaultCondition(
+      Arrays.asList(new byte[8], new byte[8]),
+      metricNames, hostnames,
+      "a1", "i1", startTime, endTime, null, null, false);
+
+    String actualClause = condition.getConditionClause().toString();
+    String wantedClause =
+      "(UUID IN (?, ?)) AND SERVER_TIME >= ? AND SERVER_TIME < ?";
+
+    Assert.assertNotNull(actualClause);
+    Assert.assertEquals(wantedClause, actualClause);
+  }
+
+  /**
+   * Checks the condition clause of a {@link SplitByMetricNamesCondition}
+   * wrapping a {@link DefaultCondition} once a current UUID has been set.
+   */
+  @Test
+  public void testSplitByMetricNamesCondition() throws Exception {
+    final long startTime = 1407959718L;
+    final long endTime = 1407959918L;
+    List<String> metricNames = Arrays.asList("cpu_user", "mem_free");
+    List<String> hostnames = Collections.singletonList("h1");
+
+    Condition wrapped = new DefaultCondition(
+      Arrays.asList(new byte[8], new byte[8]),
+      metricNames, hostnames,
+      "a1", "i1", startTime, endTime, null, null, false);
+
+    SplitByMetricNamesCondition splitCondition =
+      new SplitByMetricNamesCondition(wrapped);
+    splitCondition.setCurrentUuid(new byte[8]);
+
+    String actualClause = splitCondition.getConditionClause().toString();
+    String wantedClause = "UUID = ? AND SERVER_TIME >= ? AND SERVER_TIME < ?";
+
+    Assert.assertNotNull(actualClause);
+    Assert.assertEquals(wantedClause, actualClause);
+  }
+
+  /**
+   * Checks the condition clause for metric names that contain LIKE wildcards,
+   * for an empty and a {@code null} metric name list, and for one or several
+   * wildcard-only names. The test is currently disabled via {@code @Ignore}.
+   */
+  @Ignore
+  @Test
+  public void testLikeConditionClause() throws Exception {
+    final long startTime = 1407959718L;
+    final long endTime = 1407959918L;
+    // Host/app/instance/time tail shared by every expected clause below.
+    final String hostAppTimeFilter = "HOSTNAME = ? AND "
+        + "APP_ID = ? AND INSTANCE_ID = ? AND SERVER_TIME >= ? AND SERVER_TIME < ?";
+
+    // One plain name plus one wildcard name.
+    Condition condition = new DefaultCondition(
+      new ArrayList<>(Arrays.asList("cpu_user", "some=%.metric")),
+      Collections.singletonList("h1"), "a1", "i1", startTime, endTime,
+      null, null, false);
+
+    String actualClause = condition.getConditionClause().toString();
+    String wantedClause =
+        "(METRIC_NAME IN (?) OR METRIC_NAME LIKE ?) AND " + hostAppTimeFilter;
+
+    Assert.assertNotNull(actualClause);
+    Assert.assertEquals(wantedClause, actualClause);
+
+
+    // Empty metric name list.
+    condition = new DefaultCondition(
+        Collections.<String>emptyList(), Collections.singletonList("h1"),
+        "a1", "i1", startTime, endTime, null, null, false);
+
+    actualClause = condition.getConditionClause().toString();
+    wantedClause = " " + hostAppTimeFilter;
+
+    Assert.assertNotNull(actualClause);
+    Assert.assertEquals(wantedClause, actualClause);
+
+
+    // Null metric name list.
+    condition = new DefaultCondition(
+        null, Collections.singletonList("h1"), "a1", "i1",
+        startTime, endTime, null, null, false);
+
+    actualClause = condition.getConditionClause().toString();
+    wantedClause = " " + hostAppTimeFilter;
+
+    Assert.assertNotNull(actualClause);
+    Assert.assertEquals(wantedClause, actualClause);
+
+
+    // A single wildcard name.
+    condition = new DefaultCondition(
+      new ArrayList<>(Arrays.asList("some=%.metric")),
+      Collections.singletonList("h1"), "a1", "i1",
+        startTime, endTime, null, null, false);
+
+    actualClause = condition.getConditionClause().toString();
+    wantedClause = "(METRIC_NAME LIKE ?) AND " + hostAppTimeFilter;
+
+    Assert.assertNotNull(actualClause);
+    Assert.assertEquals(wantedClause, actualClause);
+
+
+    // Three wildcard names.
+    condition = new DefaultCondition(
+      new ArrayList<>(Arrays.asList(
+        "some=%.metric1", "some=%.metric2", "some=%.metric3")),
+      Collections.singletonList("h1"), "a1", "i1",
+      startTime, endTime, null, null, false);
+
+    actualClause = condition.getConditionClause().toString();
+    wantedClause =
+        "(METRIC_NAME LIKE ? OR METRIC_NAME LIKE ? OR METRIC_NAME LIKE ?) AND "
+        + hostAppTimeFilter;
+
+    Assert.assertNotNull(actualClause);
+    Assert.assertEquals(wantedClause, actualClause);
+  }
+
+  /**
+   * With an explicit {@code Precision.MINUTES}, the SQL handed to
+   * {@code Connection.prepareStatement} must contain "FROM METRIC_AGGREGATE".
+   */
+  @Test
+  public void testPrepareGetAggregatePrecisionMINUTES() throws SQLException {
+    final long startTime = 1407959718L;
+    final long endTime = 1407959918L;
+    List<String> metricNames = new ArrayList<>(Arrays.asList("cpu_user", "mem_free"));
+    List<String> hostnames = Collections.singletonList("h1");
+
+    Condition condition = new DefaultCondition(
+      metricNames, hostnames,
+      "a1", "i1", startTime, endTime, Precision.MINUTES, null, false);
+    Connection connectionMock = createNiceMock(Connection.class);
+    PreparedStatement statementMock = createNiceMock(PreparedStatement.class);
+    Capture<String> sqlCapture = new Capture<String>();
+    expect(connectionMock.prepareStatement(
+        EasyMock.and(EasyMock.anyString(), EasyMock.capture(sqlCapture))))
+        .andReturn(statementMock);
+
+    replay(connectionMock, statementMock);
+    PhoenixTransactSQL.prepareGetAggregateSqlStmt(connectionMock, condition);
+    String capturedSql = sqlCapture.getValue();
+    Assert.assertTrue(capturedSql.contains("FROM METRIC_AGGREGATE"));
+    verify(connectionMock, statementMock);
+  }
+
+  /**
+   * Verifies that, when no precision is requested, preparing the aggregate
+   * statement derives the precision from the length of the requested time
+   * range and reads from the expected aggregate table.
+   */
+  @Test
+  public void testPrepareGetAggregateNoPrecision() throws SQLException {
+    Long endTime = 1407959918L;
+
+    // SECONDS precision: range of 200 before endTime
+    assertAggregateStmtForRange(1407959718L, endTime,
+      "FROM METRIC_AGGREGATE", Precision.SECONDS);
+
+    // MINUTES precision
+    // NOTE(review): "FROM METRIC_AGGREGATE" is also a prefix of the hourly and
+    // daily table names checked below, so for this case only the precision
+    // assertion distinguishes the outcome.
+    assertAggregateStmtForRange(endTime-PhoenixTransactSQL.DAY/1000, endTime,
+      "FROM METRIC_AGGREGATE", Precision.MINUTES);
+
+    // HOURS precision
+    assertAggregateStmtForRange(endTime-PhoenixTransactSQL.DAY*30/1000, endTime,
+      "FROM METRIC_AGGREGATE_HOURLY", Precision.HOURS);
+
+    // DAYS precision
+    assertAggregateStmtForRange(endTime-PhoenixTransactSQL.DAY*30*2/1000,
+      endTime, "FROM METRIC_AGGREGATE_DAILY", Precision.DAYS);
+  }
+
+  /**
+   * Prepares the aggregate statement for a condition over the given time
+   * range with no explicit precision, using fresh mocks, then checks the SQL
+   * text passed to the connection and the precision left on the condition.
+   *
+   * @param startTime range start handed to the condition
+   * @param endTime range end handed to the condition
+   * @param expectedFromClause fragment the captured SQL must contain
+   * @param expectedPrecision precision the condition must report afterwards
+   */
+  private static void assertAggregateStmtForRange(Long startTime, Long endTime,
+      String expectedFromClause, Precision expectedPrecision)
+      throws SQLException {
+    Condition condition = new DefaultCondition(
+      new ArrayList<>(Arrays.asList("cpu_user", "mem_free")),
+      Collections.singletonList("h1"),
+      "a1", "i1", startTime, endTime, null, null, false);
+    Connection connection = createNiceMock(Connection.class);
+    PreparedStatement preparedStatement =
+      createNiceMock(PreparedStatement.class);
+    Capture<String> stmtCapture = new Capture<String>();
+    expect(connection.prepareStatement(
+        EasyMock.and(EasyMock.anyString(), EasyMock.capture(stmtCapture))))
+      .andReturn(preparedStatement);
+
+    replay(connection, preparedStatement);
+    PhoenixTransactSQL.prepareGetAggregateSqlStmt(connection, condition);
+    String stmt = stmtCapture.getValue();
+    Assert.assertTrue(stmt.contains(expectedFromClause));
+    Assert.assertEquals(expectedPrecision, condition.getPrecision());
+    verify(connection, preparedStatement);
+  }
+
+  /**
+   * An explicit HOURS precision must make the aggregate query read from the
+   * hourly aggregate table.
+   */
+  @Test
+  public void testPrepareGetAggregatePrecisionHours() throws SQLException {
+    List<String> metricNames =
+      new ArrayList<>(Arrays.asList("cpu_user", "mem_free"));
+    List<String> hostNames = Collections.singletonList("h1");
+    Condition hourlyCondition = new DefaultCondition(metricNames, hostNames,
+      "a1", "i1", 1407959718L, 1407959918L, Precision.HOURS, null, false);
+
+    // Capture whatever SQL text is handed to the connection.
+    Capture<String> sqlCapture = new Capture<String>();
+    PreparedStatement statementMock = createNiceMock(PreparedStatement.class);
+    Connection connectionMock = createNiceMock(Connection.class);
+    expect(connectionMock.prepareStatement(
+        EasyMock.and(EasyMock.anyString(), EasyMock.capture(sqlCapture))))
+      .andReturn(statementMock);
+    replay(connectionMock, statementMock);
+
+    PhoenixTransactSQL.prepareGetAggregateSqlStmt(connectionMock,
+      hourlyCondition);
+
+    String capturedSql = sqlCapture.getValue();
+    Assert.assertTrue(capturedSql.contains("FROM METRIC_AGGREGATE_HOURLY"));
+    verify(connectionMock, statementMock);
+  }
+
+  /**
+   * An explicit MINUTES precision must make the metrics query read from the
+   * minute record table.
+   */
+  @Test
+  public void testPrepareGetMetricsPrecisionMinutes() throws SQLException {
+    List<String> metricNames =
+      new ArrayList<>(Arrays.asList("cpu_user", "mem_free"));
+    List<String> hostNames = Collections.singletonList("h1");
+    Condition minuteCondition = new DefaultCondition(metricNames, hostNames,
+      "a1", "i1", 1407959718L, 1407959918L, Precision.MINUTES, null, false);
+
+    // Capture whatever SQL text is handed to the connection.
+    Capture<String> sqlCapture = new Capture<String>();
+    PreparedStatement statementMock = createNiceMock(PreparedStatement.class);
+    Connection connectionMock = createNiceMock(Connection.class);
+    expect(connectionMock.prepareStatement(
+        EasyMock.and(EasyMock.anyString(), EasyMock.capture(sqlCapture))))
+      .andReturn(statementMock);
+    replay(connectionMock, statementMock);
+
+    PhoenixTransactSQL.prepareGetMetricsSqlStmt(connectionMock,
+      minuteCondition);
+
+    String capturedSql = sqlCapture.getValue();
+    Assert.assertTrue(capturedSql.contains("FROM METRIC_RECORD_MINUTE"));
+    verify(connectionMock, statementMock);
+  }
+
+  /**
+   * Verifies that, when no precision is requested, preparing the metrics
+   * statement derives the precision from the length of the requested time
+   * range and reads from the expected record table.
+   */
+  @Test
+  public void testPrepareGetMetricsNoPrecision() throws SQLException {
+    Long endTime = 1407959918L;
+
+    // SECONDS precision: range of 200 before endTime
+    assertMetricsStmtForRange(endTime - 200, endTime,
+      "FROM METRIC_RECORD", Precision.SECONDS);
+
+    // SECONDS precision: a longer range that still resolves to SECONDS
+    assertMetricsStmtForRange(endTime-PhoenixTransactSQL.HOUR*2/1000, endTime,
+      "FROM METRIC_RECORD", Precision.SECONDS);
+
+    // MINUTES precision
+    assertMetricsStmtForRange(endTime-PhoenixTransactSQL.DAY/1000, endTime,
+      "FROM METRIC_RECORD_MINUTE", Precision.MINUTES);
+
+    // HOURS precision
+    assertMetricsStmtForRange(endTime-PhoenixTransactSQL.DAY*30/1000, endTime,
+      "FROM METRIC_RECORD_HOURLY", Precision.HOURS);
+
+    // DAYS precision
+    assertMetricsStmtForRange(endTime-PhoenixTransactSQL.DAY*30*2/1000, endTime,
+      "FROM METRIC_RECORD_DAILY", Precision.DAYS);
+  }
+
+  /**
+   * Prepares the metrics statement for a condition over the given time range
+   * with no explicit precision, using fresh mocks, then checks the SQL text
+   * passed to the connection and the precision left on the condition.
+   *
+   * @param startTime range start handed to the condition
+   * @param endTime range end handed to the condition
+   * @param expectedFromClause fragment the captured SQL must contain
+   * @param expectedPrecision precision the condition must report afterwards
+   */
+  private static void assertMetricsStmtForRange(Long startTime, Long endTime,
+      String expectedFromClause, Precision expectedPrecision)
+      throws SQLException {
+    Condition condition = new DefaultCondition(
+      new ArrayList<>(Arrays.asList("cpu_user", "mem_free")),
+      Collections.singletonList("h1"),
+      "a1", "i1", startTime, endTime, null, null, false);
+    // Fresh mocks per case, so no reset() between cases is needed.
+    Connection connection = createNiceMock(Connection.class);
+    PreparedStatement preparedStatement =
+      createNiceMock(PreparedStatement.class);
+    Capture<String> stmtCapture = new Capture<String>();
+    expect(connection.prepareStatement(
+        EasyMock.and(EasyMock.anyString(), EasyMock.capture(stmtCapture))))
+      .andReturn(preparedStatement);
+
+    replay(connection, preparedStatement);
+    PhoenixTransactSQL.prepareGetMetricsSqlStmt(connection, condition);
+    String stmt = stmtCapture.getValue();
+    Assert.assertTrue(stmt.contains(expectedFromClause));
+    Assert.assertEquals(expectedPrecision, condition.getPrecision());
+    verify(connection, preparedStatement);
+  }
+
+  /**
+   * Verifies that preparing the "latest metric" statement for a condition with
+   * two metric names and two hosts produces SQL that reads FROM METRIC_RECORD
+   * and contains a JOIN.
+   */
+  @Test
+  public void testPrepareGetLatestMetricSqlStmtMultipleHostNames() throws 
SQLException {
+    // Four placeholder UUIDs are supplied alongside 2 metric names and 2 hosts.
+    Condition condition = new DefaultCondition(Arrays.asList(new byte[16], new 
byte[16], new byte[16], new byte[16]),
+      new ArrayList<>(Arrays.asList("cpu_user", "mem_free")), 
Arrays.asList("h1", "h2"),
+      "a1", "i1", null, null, null, null, false);
+    Connection connection = createNiceMock(Connection.class);
+    PreparedStatement preparedStatement = 
createNiceMock(PreparedStatement.class);
+    ParameterMetaData parameterMetaData = 
createNiceMock(ParameterMetaData.class);
+    // Capture the SQL text handed to the connection so it can be inspected.
+    Capture<String> stmtCapture = new Capture<String>();
+    expect(connection.prepareStatement(EasyMock.and(EasyMock.anyString(), 
EasyMock.capture(stmtCapture))))
+        .andReturn(preparedStatement);
+    expect(preparedStatement.getParameterMetaData())
+      .andReturn(parameterMetaData).once();
+    // The mocked statement reports 4 bind parameters, the same number as the
+    // UUIDs passed to the condition above.
+    // NOTE(review): this comment used to describe 6 parameters (instance id,
+    // app id, 2 hostnames, 2 metric names), which disagreed with the mocked
+    // value of 4; presumably one parameter is now bound per UUID. Confirm.
+    expect(parameterMetaData.getParameterCount())
+      .andReturn(4).once();
+
+    replay(connection, preparedStatement, parameterMetaData);
+    PhoenixTransactSQL.prepareGetLatestMetricSqlStmt(connection, condition);
+    String stmt = stmtCapture.getValue();
+    Assert.assertTrue(stmt.contains("FROM METRIC_RECORD"));
+    Assert.assertTrue(stmt.contains("JOIN"));
+    // once() expectations above are enforced here.
+    verify(connection, preparedStatement, parameterMetaData);
+  }
+
+  /**
+   * Verifies that, with sort-merge join enabled, the "latest metric" statement
+   * carries the USE_SORT_MERGE_JOIN / NO_CACHE hint.
+   */
+  @Test
+  public void testPrepareGetLatestMetricSqlStmtSortMergeJoinAlgorithm()
+    throws SQLException {
+    Condition condition = new DefaultCondition(
+      Arrays.asList(new byte[16], new byte[16]),
+      new ArrayList<>(Arrays.asList("cpu_user", "mem_free")),
+      Arrays.asList("h1"),
+      "a1", "i1", null, null, null, null, false);
+    Connection connection = createNiceMock(Connection.class);
+    PreparedStatement preparedStatement =
+      createNiceMock(PreparedStatement.class);
+    ParameterMetaData parameterMetaData =
+      createNiceMock(ParameterMetaData.class);
+    Capture<String> stmtCapture = new Capture<String>();
+    expect(connection.prepareStatement(
+        EasyMock.and(EasyMock.anyString(), EasyMock.capture(stmtCapture))))
+      .andReturn(preparedStatement);
+    expect(preparedStatement.getParameterMetaData())
+      .andReturn(parameterMetaData).anyTimes();
+    expect(parameterMetaData.getParameterCount())
+      .andReturn(2).anyTimes();
+
+    replay(connection, preparedStatement, parameterMetaData);
+    // The flag is static state on PhoenixTransactSQL; restore it afterwards so
+    // it cannot leak into tests that run later in the same JVM.
+    PhoenixTransactSQL.setSortMergeJoinEnabled(true);
+    try {
+      PhoenixTransactSQL.prepareGetLatestMetricSqlStmt(connection, condition);
+      String stmt = stmtCapture.getValue();
+      Assert.assertTrue(stmt.contains("/*+ USE_SORT_MERGE_JOIN NO_CACHE */"));
+    } finally {
+      // NOTE(review): assumes the default is "disabled" - confirm against
+      // PhoenixTransactSQL.
+      PhoenixTransactSQL.setSortMergeJoinEnabled(false);
+    }
+  }
+
+  /**
+   * An explicit HOURS precision must make the metrics query read from the
+   * hourly record table.
+   */
+  @Test
+  public void testPrepareGetMetricsPrecisionHours() throws SQLException {
+    List<String> metricNames =
+      new ArrayList<>(Arrays.asList("cpu_user", "mem_free"));
+    List<String> hostNames = Collections.singletonList("h1");
+    Condition hourlyCondition = new DefaultCondition(metricNames, hostNames,
+      "a1", "i1", 1407959718L, 1407959918L, Precision.HOURS, null, false);
+
+    // Capture whatever SQL text is handed to the connection.
+    Capture<String> sqlCapture = new Capture<String>();
+    PreparedStatement statementMock = createNiceMock(PreparedStatement.class);
+    Connection connectionMock = createNiceMock(Connection.class);
+    expect(connectionMock.prepareStatement(
+        EasyMock.and(EasyMock.anyString(), EasyMock.capture(sqlCapture))))
+      .andReturn(statementMock);
+    replay(connectionMock, statementMock);
+
+    PhoenixTransactSQL.prepareGetMetricsSqlStmt(connectionMock,
+      hourlyCondition);
+
+    String capturedSql = sqlCapture.getValue();
+    Assert.assertTrue(capturedSql.contains("FROM METRIC_RECORD_HOURLY"));
+    verify(connectionMock, statementMock);
+  }
+
+  /**
+   * Exercises statement preparation for several metric/host/time-range
+   * combinations: four that are expected to succeed and one that is expected
+   * to be rejected with a {@code PrecisionLimitExceededException} whose
+   * message mentions the requested size (18000).
+   */
+  @Test
+  public void testResultSetLimitCheck() throws SQLException {
+
+    // 22 Metrics x 2 Hosts x 1 hour with requested SECONDS precision = 15840
+    // points. Should be OK!
+    List<String> metrics = limitCheckNames("TestMetric", 22);
+    List<String> hosts = limitCheckNames("TestHost", 2);
+
+    Condition condition = new DefaultCondition(
+      metrics, hosts,
+      "a1", "i1", 1407950000L, 1407953600L, Precision.SECONDS, null, false);
+    Connection connection = createNiceMock(Connection.class);
+    PreparedStatement preparedStatement =
+      createNiceMock(PreparedStatement.class);
+    Capture<String> stmtCapture = new Capture<String>();
+    expect(connection.prepareStatement(
+        EasyMock.and(EasyMock.anyString(), EasyMock.capture(stmtCapture))))
+      .andReturn(preparedStatement);
+
+    replay(connection, preparedStatement);
+    PhoenixTransactSQL.prepareGetMetricsSqlStmt(connection, condition);
+    String stmt = stmtCapture.getValue();
+    Assert.assertTrue(stmt.contains("FROM METRIC_RECORD"));
+    verify(connection, preparedStatement);
+
+    // Check without passing precision. Should be OK!
+    condition = new DefaultCondition(
+      metrics, hosts,
+      "a1", "i1", 1407950000L, 1407953600L, null, null, false);
+    connection = createNiceMock(Connection.class);
+    preparedStatement = createNiceMock(PreparedStatement.class);
+    stmtCapture = new Capture<String>();
+    expect(connection.prepareStatement(
+        EasyMock.and(EasyMock.anyString(), EasyMock.capture(stmtCapture))))
+      .andReturn(preparedStatement);
+
+    replay(connection, preparedStatement);
+    PhoenixTransactSQL.prepareGetMetricsSqlStmt(connection, condition);
+    stmt = stmtCapture.getValue();
+    Assert.assertTrue(stmt.contains("FROM METRIC_RECORD"));
+    verify(connection, preparedStatement);
+
+    // Check with more hosts and lesser metrics for 1 day data = 11520 points.
+    // Should be OK!
+    metrics = limitCheckNames("TestMetric", 2);
+    hosts = limitCheckNames("TestHost", 20);
+    condition = new DefaultCondition(
+      metrics, hosts,
+      "a1", "i1", 1407867200L, 1407953600L, null, null, false);
+    connection = createNiceMock(Connection.class);
+    preparedStatement = createNiceMock(PreparedStatement.class);
+    stmtCapture = new Capture<String>();
+    expect(connection.prepareStatement(
+        EasyMock.and(EasyMock.anyString(), EasyMock.capture(stmtCapture))))
+      .andReturn(preparedStatement);
+
+    replay(connection, preparedStatement);
+    PhoenixTransactSQL.prepareGetMetricsSqlStmt(connection, condition);
+    stmt = stmtCapture.getValue();
+    Assert.assertTrue(stmt.contains("FROM METRIC_RECORD_MINUTE"));
+    verify(connection, preparedStatement);
+
+    // Check with 20 metrics, NO hosts and 1 day data = 5760 points. Should be
+    // OK! This case goes through the aggregate statement.
+    metrics = limitCheckNames("TestMetric", 20);
+    hosts = limitCheckNames("TestHost", 0);
+    condition = new DefaultCondition(
+      metrics, hosts,
+      "a1", "i1", 1407867200L, 1407953600L, null, null, false);
+    connection = createNiceMock(Connection.class);
+    preparedStatement = createNiceMock(PreparedStatement.class);
+    stmtCapture = new Capture<String>();
+    expect(connection.prepareStatement(
+        EasyMock.and(EasyMock.anyString(), EasyMock.capture(stmtCapture))))
+      .andReturn(preparedStatement);
+
+    replay(connection, preparedStatement);
+    PhoenixTransactSQL.prepareGetAggregateSqlStmt(connection, condition);
+    stmt = stmtCapture.getValue();
+    Assert.assertTrue(stmt.contains("FROM METRIC_AGGREGATE_MINUTE"));
+    verify(connection, preparedStatement);
+
+    // Check with 5 hosts and 10 metrics for 1 hour data = 18000 points. Should
+    // throw out Exception!
+    metrics = limitCheckNames("TestMetric", 10);
+    hosts = limitCheckNames("TestHost", 5);
+    condition = new DefaultCondition(
+      metrics, hosts,
+      "a1", "i1", 1407950000L, 1407953600L, null, null, false);
+
+    try {
+      // The connection mock from the previous case is reused as-is.
+      PhoenixTransactSQL.prepareGetMetricsSqlStmt(connection, condition);
+      Assert.fail("Expected PrecisionLimitExceededException for 18000 points");
+    } catch (PrecisionLimitExceededException pe) {
+      String message = pe.getMessage();
+      Assert.assertTrue(
+        "Exception message should contain the requested size: " + message,
+        message != null && message.contains("18000"));
+    }
+  }
+
+  /**
+   * Builds a new mutable list holding {@code count} names of the form
+   * {@code prefix + index}, with indexes starting at 0.
+   */
+  private static List<String> limitCheckNames(String prefix, int count) {
+    List<String> names = new ArrayList<String>();
+    for (int i = 0; i < count; i++) {
+      names.add(prefix + i);
+    }
+    return names;
+  }
+
+  /**
+   * With one metric and two hosts, the TopN condition clause must restrict
+   * UUIDs through a sub-select limited to the requested top N (2).
+   */
+  @Test
+  public void testTopNHostsConditionClause() throws Exception {
+    List<byte[]> uuidList = Arrays.asList(new byte[16], new byte[16]);
+    List<String> metricNames =
+      new ArrayList<>(Collections.singletonList("cpu_user"));
+    List<String> hostNames = Arrays.asList("h1", "h2");
+
+    Condition topNCondition = new TopNCondition(uuidList, metricNames,
+      hostNames, "a1", "i1", 1407959718L, 1407959918L, null, null, false, 2,
+      null, false);
+
+    // The same time-range predicate appears inside and outside the sub-select.
+    String timeRange = "SERVER_TIME >= ? AND SERVER_TIME < ?";
+    String expected = " UUID IN (SELECT UUID FROM METRIC_RECORD WHERE "
+      + "(UUID LIKE ? OR UUID LIKE ?) AND " + timeRange + " "
+      + "GROUP BY UUID ORDER BY MAX(METRIC_MAX) DESC LIMIT 2) AND "
+      + timeRange;
+
+    String actual = topNCondition.getConditionClause().toString();
+    Assert.assertEquals(expected, actual);
+  }
+
+  /**
+   * With three metrics and one host, the TopN condition clause must restrict
+   * UUIDs through a sub-select limited to the requested top N (2).
+   */
+  @Test
+  public void testTopNMetricsConditionClause() throws Exception {
+    List<byte[]> uuidList =
+      Arrays.asList(new byte[16], new byte[16], new byte[16]);
+    List<String> names = new ArrayList<>(Arrays.asList("m1", "m2", "m3"));
+    List<String> singleHost = Collections.singletonList("h1");
+
+    Condition topNCondition = new TopNCondition(uuidList, names, singleHost,
+      "a1", "i1", 1407959718L, 1407959918L, null, null, false, 2, null, false);
+
+    // The same time-range predicate appears inside and outside the sub-select.
+    String timeRange = "SERVER_TIME >= ? AND SERVER_TIME < ?";
+    String expected = " UUID IN (SELECT UUID FROM METRIC_RECORD WHERE "
+      + "(UUID LIKE ? OR UUID LIKE ? OR UUID LIKE ?) AND " + timeRange + " "
+      + "GROUP BY UUID ORDER BY MAX(METRIC_MAX) DESC LIMIT 2) AND "
+      + timeRange;
+
+    String actual = topNCondition.getConditionClause().toString();
+    Assert.assertEquals(expected, actual);
+  }
+
+  /**
+   * A TopN condition built with both multiple metrics and multiple hosts is
+   * expected to yield no condition clause at all.
+   */
+  @Test
+  public void testTopNMetricsIllegalConditionClause() throws Exception {
+    List<String> metricNames = new ArrayList<>(Arrays.asList("m1", "m2"));
+
+    List<String> hosts = Arrays.asList("h1", "h2");
+    List<byte[]> uuids = Arrays.asList(
+      new byte[16], new byte[16], new byte[16], new byte[16]);
+
+    Condition condition = new TopNCondition(uuids, metricNames, hosts,
+      "a1", "i1", 1407959718L, 1407959918L, null, null, false, 2, null, false);
+
+    // assertNull avoids the swapped expected/actual order of the previous
+    // assertEquals(actual, null) and gives a clearer failure message.
+    Assert.assertNull(condition.getConditionClause());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/TestTimelineMetricStore.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/TestTimelineMetricStore.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/TestTimelineMetricStore.java
new file mode 100644
index 0000000..99e3e25
--- /dev/null
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/TestTimelineMetricStore.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.core.timeline;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+
+import org.apache.hadoop.metrics2.sink.timeline.AggregationResult;
+import org.apache.hadoop.metrics2.sink.timeline.ContainerMetric;
+import org.apache.hadoop.metrics2.sink.timeline.Precision;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.apache.hadoop.metrics2.sink.timeline.TopNConfig;
+import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
+
+/**
+ * Stub {@link TimelineMetricStore} for tests. Reads return fixed data (or
+ * empty / {@code null} results) and writes are accepted without being stored.
+ */
+public class TestTimelineMetricStore implements TimelineMetricStore {
+
+  /**
+   * Ignores every argument and returns two fixed metrics, {@code cpu_user}
+   * and {@code mem_free}, each with three data points on host {@code c6401}.
+   */
+  @Override
+  public TimelineMetrics getTimelineMetrics(List<String> metricNames,
+      List<String> hostnames, String applicationId, String instanceId,
+      Long startTime, Long endTime, Precision precision, Integer limit,
+      boolean groupedByHost, TopNConfig topNConfig,
+      String seriesAggregateFunction) throws SQLException, IOException {
+    // Plain TreeMaps instead of double-brace initialization: the latter
+    // creates anonymous subclasses that keep a reference to this store.
+    TreeMap<Long, Double> cpuUserValues = new TreeMap<Long, Double>();
+    cpuUserValues.put(1407949812L, 1.0d);
+    cpuUserValues.put(1407949912L, 1.8d);
+    cpuUserValues.put(1407950002L, 0.7d);
+
+    TreeMap<Long, Double> memFreeValues = new TreeMap<Long, Double>();
+    memFreeValues.put(1407949812L, 2.5d);
+    memFreeValues.put(1407949912L, 3.0d);
+    memFreeValues.put(1407950002L, 0.9d);
+
+    List<TimelineMetric> metricList = new ArrayList<TimelineMetric>();
+    metricList.add(fixedMetric("cpu_user", "1", null, cpuUserValues));
+    metricList.add(fixedMetric("mem_free", "2", "3", memFreeValues));
+
+    TimelineMetrics timelineMetrics = new TimelineMetrics();
+    timelineMetrics.setMetrics(metricList);
+    return timelineMetrics;
+  }
+
+  /**
+   * Builds one canned metric on host {@code c6401} starting at 1407949812.
+   */
+  private static TimelineMetric fixedMetric(String metricName, String appId,
+      String instanceId, TreeMap<Long, Double> values) {
+    TimelineMetric metric = new TimelineMetric();
+    metric.setMetricName(metricName);
+    metric.setAppId(appId);
+    metric.setInstanceId(instanceId);
+    metric.setHostName("c6401");
+    metric.setStartTime(1407949812L);
+    metric.setMetricValues(values);
+    return metric;
+  }
+
+  /** Accepts the metrics without storing them; returns an empty response. */
+  @Override
+  public TimelinePutResponse putMetrics(TimelineMetrics metrics)
+      throws SQLException, IOException {
+    return new TimelinePutResponse();
+  }
+
+  /** Accepts the metrics without storing them; returns an empty response. */
+  @Override
+  public TimelinePutResponse putContainerMetrics(List<ContainerMetric> metrics)
+      throws SQLException, IOException {
+    return new TimelinePutResponse();
+  }
+
+  /** Not stubbed: always returns {@code null}. */
+  @Override
+  public Map<String, List<TimelineMetricMetadata>> getTimelineMetricMetadata(
+      String appId, String metricPattern, boolean includeBlacklistedMetrics)
+      throws SQLException, IOException {
+    return null;
+  }
+
+  /** Not stubbed: always returns {@code null}. */
+  @Override
+  public TimelinePutResponse putHostAggregatedMetrics(
+      AggregationResult aggregationResult) throws SQLException, IOException {
+    return null;
+  }
+
+  /** Always returns an empty map. */
+  @Override
+  public Map<String, Set<String>> getHostAppsMetadata()
+      throws SQLException, IOException {
+    return Collections.emptyMap();
+  }
+
+  /** Always returns an empty map. */
+  @Override
+  public Map<String, Map<String,Set<String>>> getInstanceHostsMetadata(
+      String instanceId, String appId) throws SQLException, IOException {
+    return Collections.emptyMap();
+  }
+
+  /** Always returns an empty list. */
+  @Override
+  public List<String> getLiveInstances() {
+    return Collections.emptyList();
+  }
+
+  /** Not stubbed: always returns {@code null}. */
+  @Override
+  public byte[] getUuid(String metricName, String appId, String instanceId,
+      String hostname) throws SQLException, IOException {
+    return null;
+  }
+
+}
+
+

Reply via email to