http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/PhoenixHBaseAccessor.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/PhoenixHBaseAccessor.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/PhoenixHBaseAccessor.java
new file mode 100644
index 0000000..325a039
--- /dev/null
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/PhoenixHBaseAccessor.java
@@ -0,0 +1,1844 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.core.timeline;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.ambari.metrics.core.timeline.aggregators.AggregatorUtils;
+import org.apache.ambari.metrics.core.timeline.aggregators.Function;
+import 
org.apache.ambari.metrics.core.timeline.aggregators.TimelineClusterMetric;
+import 
org.apache.ambari.metrics.core.timeline.aggregators.TimelineMetricReadHelper;
+import 
org.apache.ambari.metrics.core.timeline.discovery.TimelineMetricHostMetadata;
+import 
org.apache.ambari.metrics.core.timeline.discovery.TimelineMetricMetadataKey;
+import 
org.apache.ambari.metrics.core.timeline.discovery.TimelineMetricMetadataManager;
+import org.apache.ambari.metrics.core.timeline.query.Condition;
+import org.apache.ambari.metrics.core.timeline.query.DefaultPhoenixDataSource;
+import org.apache.ambari.metrics.core.timeline.query.PhoenixConnectionProvider;
+import org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL;
+import 
org.apache.ambari.metrics.core.timeline.query.SplitByMetricNamesCondition;
+import org.apache.ambari.metrics.core.timeline.sink.ExternalMetricsSink;
+import org.apache.ambari.metrics.core.timeline.sink.ExternalSinkProvider;
+import org.apache.ambari.metrics.core.timeline.source.InternalMetricsSource;
+import org.apache.ambari.metrics.core.timeline.source.InternalSourceProvider;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.util.RetryCounter;
+import org.apache.hadoop.hbase.util.RetryCounterFactory;
+import org.apache.hadoop.metrics2.sink.timeline.ContainerMetric;
+import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate;
+import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
+import org.apache.hadoop.metrics2.sink.timeline.Precision;
+import org.apache.hadoop.metrics2.sink.timeline.SingleValuedTimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
+import org.apache.phoenix.exception.PhoenixIOException;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.type.TypeReference;
+
+import com.google.common.collect.Multimap;
+
+
+/**
+ * Provides a facade over the Phoenix API to access HBase schema
+ */
+public class PhoenixHBaseAccessor {
+  private static final Log LOG = LogFactory.getLog(PhoenixHBaseAccessor.class);
+
+  static final int PHOENIX_MAX_MUTATION_STATE_SIZE = 50000;
+  // Default stale data allowance set to 3 minutes, 2 minutes more than time
+  // it was collected. Also 2 minutes is the default aggregation interval at
+  // cluster and host levels.
+  // NOTE(review): 300000 is 5 minutes if the unit is milliseconds, not the
+  // 3 minutes stated above - confirm which of the two is intended.
+  static final long DEFAULT_OUT_OF_BAND_TIME_ALLOWANCE = 300000;
+  /**
+   * 22 metrics for 2hours in SECONDS (10 second data)
+   * => Reasonable upper bound on the limit such that our Precision
+   * calculation for a given time range makes sense.
+   */
+  private static final int METRICS_PER_MINUTE = 22;
+  private static final int POINTS_PER_MINUTE = 6;
+  // Defaults to 120 minutes * 22 * 6 = 15840. Not final: the constructor
+  // overwrites it with the GLOBAL_RESULT_LIMIT setting (this value is the
+  // fallback when the setting is absent).
+  public static int RESULTSET_LIMIT = (int)TimeUnit.HOURS.toMinutes(2) * 
METRICS_PER_MINUTE * POINTS_PER_MINUTE ;
+
+  // Static, but re-assigned by every constructor call with that instance's
+  // metadata manager reference.
+  static TimelineMetricReadHelper TIMELINE_METRIC_READ_HELPER = new 
TimelineMetricReadHelper();
+  // Shared JSON mapper and target type used by readMetricFromJSON().
+  static ObjectMapper mapper = new ObjectMapper();
+  static TypeReference<TreeMap<Long, Double>> metricValuesTypeRef = new 
TypeReference<TreeMap<Long, Double>>() {};
+
+  private final Configuration hbaseConf;
+  private final Configuration metricsConf;
+  // Produces the retry counters used when opening a connection for schema init.
+  private final RetryCounterFactory retryCounterFactory;
+  private final PhoenixConnectionProvider dataSource;
+  private final long outOfBandTimeAllowance;
+  // Capacity of insertCache.
+  private final int cacheSize;
+  private final boolean cacheEnabled;
+  // Bounded queue of metrics awaiting commit; drained by commitMetricsFromCache().
+  private final BlockingQueue<TimelineMetrics> insertCache;
+  // The executor and committer thread are only created when cacheEnabled is true.
+  private ScheduledExecutorService scheduledExecutorService;
+  private MetricsCacheCommitterThread metricsCommiterThread;
+  // Optional sink, instantiated reflectively when a sink class is configured.
+  private TimelineMetricsAggregatorSink aggregatorSink;
+  // Delay between cache commits, in seconds.
+  private final int cacheCommitInterval;
+  private final boolean skipBlockCacheForAggregatorsEnabled;
+  // Durability (WAL) option names; an empty string means "leave unchanged".
+  private final String timelineMetricsTablesDurability;
+  private final String timelineMetricsPrecisionTableDurability;
+  // Not assigned by the constructor; commitMetrics() uses it to resolve UUIDs.
+  // NOTE(review): presumably injected through a setter elsewhere - confirm.
+  private TimelineMetricMetadataManager metadataManagerInstance;
+
+  // HBase table-level property keys and policy class names applied by
+  // setCompactionPolicyForTable() / setHbaseBlockingStoreFiles().
+  static final String HSTORE_COMPACTION_CLASS_KEY =
+    "hbase.hstore.defaultengine.compactionpolicy.class";
+  static final String HSTORE_ENGINE_CLASS =
+    "hbase.hstore.engine.class";
+  static final String FIFO_COMPACTION_POLICY_CLASS =
+    "org.apache.hadoop.hbase.regionserver.compactions.FIFOCompactionPolicy";
+  static final String DATE_TIERED_COMPACTION_POLICY =
+    "org.apache.hadoop.hbase.regionserver.DateTieredStoreEngine";
+  static final String BLOCKING_STORE_FILES_KEY =
+    "hbase.hstore.blockingStoreFiles";
+
+  // Table name -> TTL in seconds, populated in the constructor.
+  private Map<String, Integer> tableTTL = new HashMap<>();
+
+  private final TimelineMetricConfiguration configuration;
+  // One source per configured external sink; fed from commitMetricsFromCache().
+  private List<InternalMetricsSource> rawMetricsSources = new ArrayList<>();
+
+  /**
+   * Creates an accessor that reads its settings from the shared
+   * {@link TimelineMetricConfiguration} instance.
+   *
+   * @param dataSource provider of Phoenix connections and the HBase admin
+   */
+  public PhoenixHBaseAccessor(PhoenixConnectionProvider dataSource) {
+    this(TimelineMetricConfiguration.getInstance(), dataSource);
+  }
+
+  /**
+   * Test friendly construction since mock instrumentation is difficult to get
+   * working with hadoop mini cluster.
+   *
+   * Reads all tunables from the metrics configuration, registers the table
+   * TTLs, starts the cache committer when caching is enabled, and wires the
+   * optional aggregator sink and external raw-metric sinks.
+   *
+   * @param configuration source of the HBase and metrics configuration
+   * @param dataSource    connection provider; when {@code null} a
+   *                      {@link DefaultPhoenixDataSource} is created
+   * @throws ExceptionInInitializerError if the configuration cannot be loaded
+   * @throws IllegalStateException       if the Phoenix driver is not on the classpath
+   */
+  PhoenixHBaseAccessor(TimelineMetricConfiguration configuration,
+                       PhoenixConnectionProvider dataSource) {
+    // Keep the injected configuration rather than the singleton, so the field
+    // agrees with the instance every other read in this constructor uses.
+    this.configuration = configuration;
+    try {
+      this.hbaseConf = configuration.getHbaseConf();
+      this.metricsConf = configuration.getMetricsConf();
+    } catch (Exception e) {
+      // ExceptionInInitializerError(String) cannot carry a cause, so log it
+      // here instead of losing the root failure.
+      LOG.error("Cannot initialize configuration.", e);
+      throw new ExceptionInInitializerError("Cannot initialize configuration.");
+    }
+    if (dataSource == null) {
+      dataSource = new DefaultPhoenixDataSource(hbaseConf);
+    }
+    this.dataSource = dataSource;
+
+    // NOTE(review): this mutates a public static from an instance constructor.
+    RESULTSET_LIMIT = metricsConf.getInt(TimelineMetricConfiguration.GLOBAL_RESULT_LIMIT, RESULTSET_LIMIT);
+    try {
+      Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");
+    } catch (ClassNotFoundException e) {
+      LOG.error("Phoenix client jar not found in the classpath.", e);
+      throw new IllegalStateException(e);
+    }
+
+    this.retryCounterFactory = new RetryCounterFactory(
+      metricsConf.getInt(TimelineMetricConfiguration.GLOBAL_MAX_RETRIES, 10),
+      (int) SECONDS.toMillis(metricsConf.getInt(TimelineMetricConfiguration.GLOBAL_RETRY_INTERVAL, 3)));
+    this.outOfBandTimeAllowance = metricsConf.getLong(
+      TimelineMetricConfiguration.OUT_OFF_BAND_DATA_TIME_ALLOWANCE,
+      DEFAULT_OUT_OF_BAND_TIME_ALLOWANCE);
+    this.cacheEnabled = Boolean.valueOf(
+      metricsConf.get(TimelineMetricConfiguration.TIMELINE_METRICS_CACHE_ENABLED, "true"));
+    this.cacheSize = Integer.valueOf(
+      metricsConf.get(TimelineMetricConfiguration.TIMELINE_METRICS_CACHE_SIZE, "150"));
+    this.cacheCommitInterval = Integer.valueOf(
+      metricsConf.get(TimelineMetricConfiguration.TIMELINE_METRICS_CACHE_COMMIT_INTERVAL, "3"));
+    this.insertCache = new ArrayBlockingQueue<TimelineMetrics>(cacheSize);
+    this.skipBlockCacheForAggregatorsEnabled =
+      metricsConf.getBoolean(TimelineMetricConfiguration.AGGREGATORS_SKIP_BLOCK_CACHE, false);
+    this.timelineMetricsTablesDurability =
+      metricsConf.get(TimelineMetricConfiguration.TIMELINE_METRICS_AGGREGATE_TABLES_DURABILITY, "");
+    this.timelineMetricsPrecisionTableDurability =
+      metricsConf.get(TimelineMetricConfiguration.TIMELINE_METRICS_PRECISION_TABLE_DURABILITY, "");
+
+    // TTL defaults are expressed in seconds (days * 86400).
+    tableTTL.put(PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME,
+      metricsConf.getInt(TimelineMetricConfiguration.PRECISION_TABLE_TTL, 1 * 86400));  // 1 day
+    tableTTL.put(PhoenixTransactSQL.CONTAINER_METRICS_TABLE_NAME,
+      metricsConf.getInt(TimelineMetricConfiguration.CONTAINER_METRICS_TTL, 30 * 86400));  // 30 days
+    tableTTL.put(PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_TABLE_NAME,
+      metricsConf.getInt(TimelineMetricConfiguration.HOST_MINUTE_TABLE_TTL, 7 * 86400)); //7 days
+    tableTTL.put(PhoenixTransactSQL.METRICS_AGGREGATE_HOURLY_TABLE_NAME,
+      metricsConf.getInt(TimelineMetricConfiguration.HOST_HOUR_TABLE_TTL, 30 * 86400)); //30 days
+    tableTTL.put(PhoenixTransactSQL.METRICS_AGGREGATE_DAILY_TABLE_NAME,
+      metricsConf.getInt(TimelineMetricConfiguration.HOST_DAILY_TABLE_TTL, 365 * 86400)); //1 year
+    tableTTL.put(PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_TABLE_NAME,
+      metricsConf.getInt(TimelineMetricConfiguration.CLUSTER_SECOND_TABLE_TTL, 7 * 86400)); //7 days
+    tableTTL.put(PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_MINUTE_TABLE_NAME,
+      metricsConf.getInt(TimelineMetricConfiguration.CLUSTER_MINUTE_TABLE_TTL, 30 * 86400)); //30 days
+    tableTTL.put(PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME,
+      metricsConf.getInt(TimelineMetricConfiguration.CLUSTER_HOUR_TABLE_TTL, 365 * 86400)); //1 year
+    tableTTL.put(PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_DAILY_TABLE_NAME,
+      metricsConf.getInt(TimelineMetricConfiguration.CLUSTER_DAILY_TABLE_TTL, 730 * 86400)); //2 years
+
+    if (cacheEnabled) {
+      LOG.debug("Initialising and starting metrics cache committer thread...");
+      metricsCommiterThread = new MetricsCacheCommitterThread(this);
+      scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
+      scheduledExecutorService.scheduleWithFixedDelay(metricsCommiterThread,
+        0, cacheCommitInterval, TimeUnit.SECONDS);
+    }
+
+    Class<? extends TimelineMetricsAggregatorSink> metricSinkClass =
+        metricsConf.getClass(TimelineMetricConfiguration.TIMELINE_METRIC_AGGREGATOR_SINK_CLASS, null,
+            TimelineMetricsAggregatorSink.class);
+    if (metricSinkClass != null) {
+      aggregatorSink = ReflectionUtils.newInstance(metricSinkClass, metricsConf);
+      LOG.info("Initialized aggregator sink class " + metricSinkClass);
+    }
+
+    List<ExternalSinkProvider> externalSinkProviderList = configuration.getExternalSinkProviderList();
+    InternalSourceProvider internalSourceProvider = configuration.getInternalSourceProvider();
+    if (!externalSinkProviderList.isEmpty()) {
+      for (ExternalSinkProvider externalSinkProvider : externalSinkProviderList) {
+        ExternalMetricsSink rawMetricsSink =
+          externalSinkProvider.getExternalMetricsSink(InternalSourceProvider.SOURCE_NAME.RAW_METRICS);
+        int interval = configuration.getExternalSinkInterval(
+          externalSinkProvider.getClass().getSimpleName(),
+          InternalSourceProvider.SOURCE_NAME.RAW_METRICS);
+        // -1 means no sink-specific interval; fall back to the cache commit interval.
+        if (interval == -1) {
+          interval = cacheCommitInterval;
+        }
+        rawMetricsSources.add(internalSourceProvider.getInternalMetricsSource(
+          InternalSourceProvider.SOURCE_NAME.RAW_METRICS, interval, rawMetricsSink));
+      }
+    }
+    // NOTE(review): metadataManagerInstance has not been assigned at this
+    // point, so the shared helper is rebuilt with a null manager - confirm
+    // this is intended.
+    TIMELINE_METRIC_READ_HELPER = new TimelineMetricReadHelper(this.metadataManagerInstance);
+  }
+
+  /**
+   * @return {@code true} when no metrics are currently queued in the insert
+   *         cache awaiting commit
+   */
+  public boolean isInsertCacheEmpty() {
+    return insertCache.isEmpty();
+  }
+
+  /**
+   * Drains everything currently queued in the insert cache, commits it to the
+   * store and then hands the same batch to every registered raw metrics
+   * source. Does nothing beyond the debug log when the cache is empty.
+   */
+  public void commitMetricsFromCache() {
+    LOG.debug("Clearing metrics cache");
+    List<TimelineMetrics> drained = new ArrayList<>(insertCache.size());
+    // drainTo moves the whole backlog in one call (cheaper than polling);
+    // on an empty queue it simply transfers nothing.
+    insertCache.drainTo(drained);
+    if (drained.isEmpty()) {
+      return;
+    }
+
+    commitMetrics(drained);
+    for (InternalMetricsSource source : rawMetricsSources) {
+      source.publishTimelineMetrics(drained);
+    }
+  }
+
+  /**
+   * Commits a single batch of metrics by delegating to the collection
+   * overload with a one-element list.
+   *
+   * @param timelineMetrics the batch to write
+   */
+  public void commitMetrics(TimelineMetrics timelineMetrics) {
+    commitMetrics(Collections.singletonList(timelineMetrics));
+  }
+
+  public void commitMetrics(Collection<TimelineMetrics> 
timelineMetricsCollection) {
+    LOG.debug("Committing metrics to store");
+    Connection conn = null;
+    PreparedStatement metricRecordStmt = null;
+
+    try {
+      conn = getConnection();
+      metricRecordStmt = conn.prepareStatement(String.format(
+              PhoenixTransactSQL.UPSERT_METRICS_SQL, 
PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME));
+      for (TimelineMetrics timelineMetrics : timelineMetricsCollection) {
+        for (TimelineMetric metric : timelineMetrics.getMetrics()) {
+
+          metricRecordStmt.clearParameters();
+
+          if (LOG.isTraceEnabled()) {
+            LOG.trace("host: " + metric.getHostName() + ", " +
+                    "metricName = " + metric.getMetricName() + ", " +
+                    "values: " + metric.getMetricValues());
+          }
+          double[] aggregates = AggregatorUtils.calculateAggregates(
+                  metric.getMetricValues());
+
+          byte[] uuid = metadataManagerInstance.getUuid(metric);
+          if (uuid == null) {
+            LOG.error("Error computing UUID for metric. Cannot write metrics : 
" + metric.toString());
+            continue;
+          }
+          metricRecordStmt.setBytes(1, uuid);
+          metricRecordStmt.setLong(2, metric.getStartTime());
+          metricRecordStmt.setDouble(3, aggregates[0]);
+          metricRecordStmt.setDouble(4, aggregates[1]);
+          metricRecordStmt.setDouble(5, aggregates[2]);
+          metricRecordStmt.setInt(6, (int) aggregates[3]);
+          String json = 
TimelineUtils.dumpTimelineRecordtoJSON(metric.getMetricValues());
+          metricRecordStmt.setString(7, json);
+
+          try {
+            metricRecordStmt.executeUpdate();
+          } catch (SQLException sql) {
+            LOG.error("Failed on insert records to store.", sql);
+          }
+        }
+      }
+
+      // commit() blocked if HBase unavailable
+      conn.commit();
+    } catch (Exception exception){
+      exception.printStackTrace();
+    }
+    finally {
+      if (metricRecordStmt != null) {
+        try {
+          metricRecordStmt.close();
+        } catch (SQLException e) {
+          // Ignore
+        }
+      }
+      if (conn != null) {
+        try {
+          conn.close();
+        } catch (SQLException sql) {
+          // Ignore
+        }
+      }
+    }
+  }
+
+  /**
+   * Builds a metric from the current row of {@code rs}, keeping only the
+   * most recent data point found in the row's METRICS column.
+   *
+   * @param rs result set positioned on the row to read
+   * @return the metric with a single-entry value map
+   * @throws SQLException if a column cannot be read
+   * @throws IOException  if the METRICS JSON cannot be parsed
+   */
+  private static TimelineMetric getLastTimelineMetricFromResultSet(ResultSet rs)
+      throws SQLException, IOException {
+    TimelineMetric lastMetric =
+      TIMELINE_METRIC_READ_HELPER.getTimelineMetricCommonsFromResultSet(rs);
+    String metricsJson = rs.getString("METRICS");
+    TreeMap<Long, Double> latestValue = readLastMetricValueFromJSON(metricsJson);
+    lastMetric.setMetricValues(latestValue);
+    return lastMetric;
+  }
+
+  /**
+   * Parses a JSON timestamp-to-value map and returns a map holding only the
+   * entry with the greatest timestamp.
+   *
+   * @param json serialized metric values
+   * @return a single-entry map, or an empty map when the JSON contains no
+   *         data points
+   * @throws IOException if the JSON cannot be parsed
+   */
+  private static TreeMap<Long, Double> readLastMetricValueFromJSON(String json)
+      throws IOException {
+    TreeMap<Long, Double> values = readMetricFromJSON(json);
+    TreeMap<Long, Double> valueMap = new TreeMap<Long, Double>();
+    if (values == null) {
+      return valueMap;
+    }
+
+    // lastEntry() yields null on an empty map, whereas lastKey() would throw
+    // NoSuchElementException.
+    Map.Entry<Long, Double> lastEntry = values.lastEntry();
+    if (lastEntry != null) {
+      valueMap.put(lastEntry.getKey(), lastEntry.getValue());
+    }
+    return valueMap;
+  }
+
+  @SuppressWarnings("unchecked")
+  public static TreeMap<Long, Double> readMetricFromJSON(String json) throws 
IOException {
+    return mapper.readValue(json, metricValuesTypeRef);
+  }
+
+  /**
+   * Opens a connection, retrying on {@link SQLException} until the retry
+   * counter is exhausted and sleeping between attempts.
+   *
+   * @return an open connection
+   * @throws SQLException         the last failure once no retries remain
+   * @throws InterruptedException if interrupted while waiting to retry
+   */
+  private Connection getConnectionRetryingOnException()
+    throws SQLException, InterruptedException {
+    RetryCounter attempts = retryCounterFactory.create();
+    for (;;) {
+      try {
+        return getConnection();
+      } catch (SQLException failure) {
+        if (attempts.shouldRetry()) {
+          attempts.sleepUntilNextRetry();
+          continue;
+        }
+        LOG.error("HBaseAccessor getConnection failed after "
+          + attempts.getMaxAttempts() + " attempts");
+        throw failure;
+      }
+    }
+  }
+
+  /**
+   * Get JDBC connection to HBase store. Assumption is that the hbase
+   * configuration is present on the classpath and loaded by the caller into
+   * the Configuration object.
+   * Phoenix already caches the HConnection between the client and HBase
+   * cluster.
+   *
+   * @return a {@link java.sql.Connection} obtained from the data source
+   * @throws SQLException propagated from the data source
+   */
+  public Connection getConnection() throws SQLException {
+    return dataSource.getConnection();
+  }
+
+  /**
+   * Unit test purpose only for now.
+   *
+   * @return the HBase {@link Admin} handle supplied by the data source
+   * @throws IOException propagated from the data source
+   */
+  Admin getHBaseAdmin() throws IOException {
+    return dataSource.getHBaseAdmin();
+  }
+
+  /**
+   * Creates the metadata, container, host-level and cluster-level metric
+   * tables through Phoenix, using the configured encoding, compression and
+   * per-table TTLs. The connection is obtained with retries.
+   *
+   * @throws MetricsSystemInitializationException if the connection cannot be
+   *         obtained or any DDL statement fails
+   */
+  protected void initMetricSchema() {
+    Connection conn = null;
+    Statement stmt = null;
+
+    String encoding = metricsConf.get(
+      TimelineMetricConfiguration.HBASE_ENCODING_SCHEME, PhoenixTransactSQL.DEFAULT_ENCODING);
+    String compression = metricsConf.get(
+      TimelineMetricConfiguration.HBASE_COMPRESSION_SCHEME, PhoenixTransactSQL.DEFAULT_TABLE_COMPRESSION);
+
+    try {
+      LOG.info("Initializing metrics schema...");
+      conn = getConnectionRetryingOnException();
+      stmt = conn.createStatement();
+
+      // Metadata
+      stmt.executeUpdate(String.format(
+        PhoenixTransactSQL.CREATE_METRICS_METADATA_TABLE_SQL, encoding, compression));
+      stmt.executeUpdate(PhoenixTransactSQL.ALTER_METRICS_METADATA_TABLE);
+
+      stmt.executeUpdate(String.format(
+        PhoenixTransactSQL.CREATE_HOSTED_APPS_METADATA_TABLE_SQL, encoding, compression));
+
+      //Host Instances table
+      stmt.executeUpdate(String.format(
+        PhoenixTransactSQL.CREATE_INSTANCE_HOST_TABLE_SQL, encoding, compression));
+
+      // Container Metrics
+      stmt.executeUpdate(String.format(PhoenixTransactSQL.CREATE_CONTAINER_METRICS_TABLE_SQL,
+        encoding, tableTTL.get(PhoenixTransactSQL.CONTAINER_METRICS_TABLE_NAME), compression));
+
+      // Host level: precision table first, then the minute/hourly/daily aggregates
+      stmt.executeUpdate(String.format(PhoenixTransactSQL.CREATE_METRICS_TABLE_SQL,
+        encoding, tableTTL.get(PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME), compression));
+
+      for (String hostAggregateTable : Arrays.asList(
+          PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_TABLE_NAME,
+          PhoenixTransactSQL.METRICS_AGGREGATE_HOURLY_TABLE_NAME,
+          PhoenixTransactSQL.METRICS_AGGREGATE_DAILY_TABLE_NAME)) {
+        stmt.executeUpdate(String.format(PhoenixTransactSQL.CREATE_METRICS_AGGREGATE_TABLE_SQL,
+          hostAggregateTable, encoding, tableTTL.get(hostAggregateTable), compression));
+      }
+
+      // Cluster level: second-resolution table, then the grouped aggregates
+      stmt.executeUpdate(String.format(PhoenixTransactSQL.CREATE_METRICS_CLUSTER_AGGREGATE_TABLE_SQL,
+        PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_TABLE_NAME, encoding,
+        tableTTL.get(PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_TABLE_NAME),
+        compression));
+
+      for (String clusterAggregateTable : Arrays.asList(
+          PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_MINUTE_TABLE_NAME,
+          PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME,
+          PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_DAILY_TABLE_NAME)) {
+        stmt.executeUpdate(String.format(
+          PhoenixTransactSQL.CREATE_METRICS_CLUSTER_AGGREGATE_GROUPED_TABLE_SQL,
+          clusterAggregateTable, encoding, tableTTL.get(clusterAggregateTable), compression));
+      }
+
+      conn.commit();
+
+      LOG.info("Metrics schema initialized.");
+    } catch (SQLException | InterruptedException sql) {
+      if (sql instanceof InterruptedException) {
+        // Restore the interrupt flag so callers further up can still see it.
+        Thread.currentThread().interrupt();
+      }
+      LOG.error("Error creating Metrics Schema in HBase using Phoenix.", sql);
+      throw new MetricsSystemInitializationException(
+        "Error creating Metrics Schema in HBase using Phoenix.", sql);
+    } finally {
+      if (stmt != null) {
+        try {
+          stmt.close();
+        } catch (SQLException e) {
+          // Ignore: a close failure must not mask the outcome of schema init
+        }
+      }
+      if (conn != null) {
+        try {
+          conn.close();
+        } catch (SQLException e) {
+          // Ignore: a close failure must not mask the outcome of schema init
+        }
+      }
+    }
+  }
+
+  /**
+   * Applies normalizer, durability, compaction-policy and TTL settings to
+   * each table listed in {@code PhoenixTransactSQL.PHOENIX_TABLES} that
+   * exists in HBase. Best effort: failures are logged per table and never
+   * propagated. The admin handle is closed on every exit path.
+   */
+  protected void initPoliciesAndTTL() {
+    Admin hBaseAdmin = null;
+    try {
+      hBaseAdmin = dataSource.getHBaseAdmin();
+    } catch (IOException e) {
+      LOG.warn("Unable to initialize HBaseAdmin for setting policies.", e);
+    }
+    if (hBaseAdmin == null) {
+      return;
+    }
+
+    try {
+      TableName[] tableNames;
+      try {
+        tableNames = hBaseAdmin.listTableNames(PhoenixTransactSQL.PHOENIX_TABLES_REGEX_PATTERN_STRING, false);
+      } catch (IOException e) {
+        LOG.warn("Unable to get table names from HBaseAdmin for setting policies.", e);
+        return;
+      }
+      if (tableNames == null || tableNames.length == 0) {
+        LOG.warn("Unable to get table names from HBaseAdmin for setting policies.");
+        return;
+      }
+
+      for (String tableName : PhoenixTransactSQL.PHOENIX_TABLES) {
+        try {
+          boolean modifyTable = false;
+          Optional<TableName> tableNameOptional = Arrays.stream(tableNames)
+            .filter(t -> tableName.equals(t.getNameAsString())).findFirst();
+
+          TableDescriptor tableDescriptor = null;
+          if (tableNameOptional.isPresent()) {
+            tableDescriptor = hBaseAdmin.getTableDescriptor(tableNameOptional.get());
+          }
+
+          if (tableDescriptor == null) {
+            LOG.warn("Unable to get table descriptor for " + tableName);
+            continue;
+          }
+
+          // @TableDescriptor is immutable by design
+          TableDescriptorBuilder tableDescriptorBuilder =
+            TableDescriptorBuilder.newBuilder(tableDescriptor);
+
+          //Set normalizer preferences (only when the flag actually differs)
+          boolean enableNormalizer = hbaseConf.getBoolean("hbase.normalizer.enabled", false);
+          if (enableNormalizer ^ tableDescriptor.isNormalizationEnabled()) {
+            tableDescriptorBuilder.setNormalizationEnabled(enableNormalizer);
+            LOG.info("Normalizer set to " + enableNormalizer + " for " + tableName);
+            modifyTable = true;
+          }
+
+          //Set durability preferences
+          boolean durabilitySettingsModified = setDurabilityForTable(tableName, tableDescriptorBuilder);
+          modifyTable = modifyTable || durabilitySettingsModified;
+
+          //Set compaction policy preferences
+          boolean compactionPolicyModified = setCompactionPolicyForTable(tableName, tableDescriptorBuilder);
+          modifyTable = modifyTable || compactionPolicyModified;
+
+          // Change TTL setting to match user configuration. A table without a
+          // configured TTL is left alone instead of failing on unboxing null.
+          Integer configuredTtl = tableTTL.get(tableName);
+          ColumnFamilyDescriptor[] columnFamilyDescriptors = tableDescriptor.getColumnFamilies();
+          if (configuredTtl != null && columnFamilyDescriptors != null) {
+            for (ColumnFamilyDescriptor familyDescriptor : columnFamilyDescriptors) {
+              if (familyDescriptor.getTimeToLive() != configuredTtl) {
+                ColumnFamilyDescriptorBuilder familyDescriptorBuilder =
+                  ColumnFamilyDescriptorBuilder.newBuilder(familyDescriptor);
+                familyDescriptorBuilder.setTimeToLive(configuredTtl);
+                ColumnFamilyDescriptor updatedFamily = familyDescriptorBuilder.build();
+
+                LOG.info("Setting TTL on table: " + tableName + " to : " +
+                  configuredTtl + " seconds.");
+
+                hBaseAdmin.modifyColumnFamily(tableNameOptional.get(), updatedFamily);
+                // Mirror the change into the table descriptor being built, so a
+                // subsequent modifyTable() does not write back the family
+                // settings captured before the TTL update.
+                tableDescriptorBuilder.modifyColumnFamily(updatedFamily);
+              }
+            }
+          }
+
+          // Persist only if anything changed
+          if (modifyTable) {
+            hBaseAdmin.modifyTable(tableNameOptional.get(), tableDescriptorBuilder.build());
+          }
+
+        } catch (IOException e) {
+          LOG.error("Failed setting policies for " + tableName, e);
+        }
+      }
+    } finally {
+      // Runs for the early returns above as well, so the handle never leaks.
+      try {
+        hBaseAdmin.close();
+      } catch (IOException e) {
+        LOG.warn("Exception on HBaseAdmin close.", e);
+      }
+    }
+  }
+
+  /**
+   * Applies the configured WAL durability to the given table descriptor.
+   * The precision table uses its own setting; every other table uses the
+   * aggregate-tables setting. An empty setting leaves the table untouched.
+   *
+   * @param tableName       name of the table being configured
+   * @param tableDescriptor builder that receives the durability
+   * @return {@code true} if a recognized durability was set on the builder
+   */
+  private boolean setDurabilityForTable(String tableName, TableDescriptorBuilder tableDescriptor) {
+    boolean precisionTable = PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME.equals(tableName);
+    String durability = precisionTable
+      ? timelineMetricsPrecisionTableDurability
+      : timelineMetricsTablesDurability;
+    String configKey = precisionTable
+      ? TimelineMetricConfiguration.TIMELINE_METRICS_PRECISION_TABLE_DURABILITY
+      : TimelineMetricConfiguration.TIMELINE_METRICS_AGGREGATE_TABLES_DURABILITY;
+
+    if (durability.isEmpty()) {
+      return false;
+    }
+
+    // Set WAL preferences
+    LOG.info("Setting WAL option " + durability + " for table : " + tableName);
+    switch (durability) {
+      case "SKIP_WAL":
+        tableDescriptor.setDurability(Durability.SKIP_WAL);
+        return true;
+      case "SYNC_WAL":
+        tableDescriptor.setDurability(Durability.SYNC_WAL);
+        return true;
+      case "ASYNC_WAL":
+        tableDescriptor.setDurability(Durability.ASYNC_WAL);
+        return true;
+      case "FSYNC_WAL":
+        tableDescriptor.setDurability(Durability.FSYNC_WAL);
+        return true;
+      default:
+        // Only the four explicit options above are accepted.
+        LOG.info("Unknown value for " + configKey + " : " + durability);
+        return false;
+    }
+  }
+
+  private boolean setCompactionPolicyForTable(String tableName, 
TableDescriptorBuilder tableDescriptorBuilder) {
+
+    boolean modifyTable = false;
+
+    String compactionPolicyKey = 
metricsConf.get(TimelineMetricConfiguration.TIMELINE_METRICS_HBASE_AGGREGATE_TABLE_COMPACTION_POLICY_KEY,
+      HSTORE_ENGINE_CLASS);
+    String compactionPolicyClass = 
metricsConf.get(TimelineMetricConfiguration.TIMELINE_METRICS_HBASE_AGGREGATE_TABLE_COMPACTION_POLICY_CLASS,
+      DATE_TIERED_COMPACTION_POLICY);
+    int blockingStoreFiles = 
hbaseConf.getInt(TimelineMetricConfiguration.TIMELINE_METRICS_AGGREGATE_TABLE_HBASE_BLOCKING_STORE_FILES,
 60);
+
+    if (tableName.equals(PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME)) {
+      compactionPolicyKey = 
metricsConf.get(TimelineMetricConfiguration.TIMELINE_METRICS_HBASE_PRECISION_TABLE_COMPACTION_POLICY_KEY,
+        HSTORE_COMPACTION_CLASS_KEY);
+      compactionPolicyClass = 
metricsConf.get(TimelineMetricConfiguration.TIMELINE_METRICS_HBASE_PRECISION_TABLE_COMPACTION_POLICY_CLASS,
+        FIFO_COMPACTION_POLICY_CLASS);
+      blockingStoreFiles = 
hbaseConf.getInt(TimelineMetricConfiguration.TIMELINE_METRICS_PRECISION_TABLE_HBASE_BLOCKING_STORE_FILES,
 1000);
+    }
+    
+    if (StringUtils.isEmpty(compactionPolicyKey) || 
StringUtils.isEmpty(compactionPolicyClass)) {
+      // Default blockingStoreFiles = 300
+      modifyTable = setHbaseBlockingStoreFiles(tableDescriptorBuilder, 
tableName, 300);
+    } else {
+      tableDescriptorBuilder.setValue(compactionPolicyKey, 
compactionPolicyClass);
+      if (tableName.equals(PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME)) {
+        tableDescriptorBuilder.removeValue(HSTORE_ENGINE_CLASS.getBytes());
+      } else {
+        
tableDescriptorBuilder.removeValue(HSTORE_COMPACTION_CLASS_KEY.getBytes());
+      }
+      setHbaseBlockingStoreFiles(tableDescriptorBuilder, tableName, 
blockingStoreFiles);
+      modifyTable = true;
+    }
+
+    return modifyTable;
+  }
+
+  private boolean setHbaseBlockingStoreFiles(TableDescriptorBuilder 
tableDescriptor,
+                                             String tableName, int value) {
+    tableDescriptor.setValue(BLOCKING_STORE_FILES_KEY, String.valueOf(value));
+    LOG.info("Setting config property " + BLOCKING_STORE_FILES_KEY +
+      " = " + value + " for " + tableName);
+    return true;
+  }
+
+  /**
+   * Builds the {@code SPLIT ON (...)} clause from a comma separated list of
+   * split points.
+   * <p>
+   * Each point is trimmed and wrapped in single quotes. Blank points (for
+   * example from {@code "a,,b"} or a trailing comma) are skipped so that no
+   * empty {@code ''} split point is emitted.
+   *
+   * @param splitPoints comma separated split points; may be {@code null} or blank
+   * @return {@code " SPLIT ON ('p1','p2',...)"}, or an empty string when there
+   *         is no usable split point
+   */
+  protected String getSplitPointsStr(String splitPoints) {
+    // Null-safe: a missing configuration value simply means "no split points".
+    if (splitPoints == null || splitPoints.trim().isEmpty()) {
+      return "";
+    }
+    StringBuilder quotedPoints = new StringBuilder();
+    for (String point : splitPoints.split(",")) {
+      String trimmed = point.trim();
+      if (trimmed.isEmpty()) {
+        continue;
+      }
+      if (quotedPoints.length() > 0) {
+        quotedPoints.append(",");
+      }
+      quotedPoints.append("'").append(trimmed).append("'");
+    }
+    if (quotedPoints.length() == 0) {
+      return "";
+    }
+    return " SPLIT ON (" + quotedPoints + ")";
+  }
+
+  /**
+   * Insert precision YARN container data.
+   */
+  public void insertContainerMetrics(List<ContainerMetric> metrics)
+      throws SQLException, IOException {
+    Connection conn = getConnection();
+    PreparedStatement metricRecordStmt = null;
+
+    try {
+      metricRecordStmt = conn.prepareStatement(
+          String.format(PhoenixTransactSQL.UPSERT_CONTAINER_METRICS_SQL, 
PhoenixTransactSQL.CONTAINER_METRICS_TABLE_NAME));
+      for (ContainerMetric metric : metrics) {
+        metricRecordStmt.clearParameters();
+        metricRecordStmt.setString(1, 
ContainerId.fromString(metric.getContainerId())
+            .getApplicationAttemptId().getApplicationId().toString());
+        metricRecordStmt.setString(2, metric.getContainerId());
+        metricRecordStmt.setTimestamp(3, new Timestamp(metric.getStartTime()));
+        metricRecordStmt.setTimestamp(4, new 
Timestamp(metric.getFinishTime()));
+        metricRecordStmt.setLong(5, metric.getFinishTime() - 
metric.getStartTime());
+        metricRecordStmt.setString(6, metric.getHostName());
+        metricRecordStmt.setInt(7, metric.getExitCode());
+        metricRecordStmt.setLong(8, metric.getLocalizationDuration());
+        metricRecordStmt.setLong(9, metric.getLaunchDuration());
+        metricRecordStmt.setDouble(10, (double) metric.getPmemLimit() / 1024);
+        metricRecordStmt.setDouble(11,
+            ((double) metric.getPmemLimit() / 1024) * (metric.getFinishTime()
+                - metric.getStartTime()));
+        metricRecordStmt.setDouble(12, (double) metric.getVmemLimit() / 1024);
+        metricRecordStmt.setDouble(13, (double) metric.getPmemUsedMin() / 
1024);
+        metricRecordStmt.setDouble(14, (double) metric.getPmemUsedMax() / 
1024);
+        metricRecordStmt.setDouble(15, (double) metric.getPmemUsedAvg() / 
1024);
+        metricRecordStmt.setDouble(16, (double) metric.getPmem50Pct() / 1024);
+        metricRecordStmt.setDouble(17, (double) metric.getPmem75Pct() / 1024);
+        metricRecordStmt.setDouble(18, (double) metric.getPmem90Pct() / 1024);
+        metricRecordStmt.setDouble(19, (double) metric.getPmem95Pct()/ 1024);
+        metricRecordStmt.setDouble(20, (double) metric.getPmem99Pct() / 1024);
+        metricRecordStmt.setDouble(21, (double) metric.getPmemLimit() / 1024
+            - (double) metric.getPmemUsedMax() / 1024);
+        metricRecordStmt.setDouble(22, ((double) metric.getPmemLimit() / 1024
+            - (double) metric.getPmemUsedMax() / 1024) * 
(metric.getFinishTime()
+            - metric.getStartTime()));
+
+        try {
+          metricRecordStmt.executeUpdate();
+        } catch (SQLException sql) {
+          LOG.error("Failed on insert records to store.", sql);
+        }
+      }
+
+      conn.commit();
+    } finally {
+      if (metricRecordStmt != null) {
+        try {
+          metricRecordStmt.close();
+        } catch (SQLException e) {
+          // Ignore
+        }
+      }
+      if (conn != null) {
+        try {
+          conn.close();
+        } catch (SQLException sql) {
+          // Ignore
+        }
+      }
+    }
+  }
+
+  /**
+   * Insert precision data.
+   * <p>
+   * Every metric is checked with {@code TimelineMetricsFilter.acceptMetric}.
+   * When a metadata manager is supplied, metadata is recorded for each metric
+   * (rejected ones included, the acceptance flag is passed along) before
+   * anything is written. Rejected metrics are then removed from the request.
+   * The remaining metrics are either queued in the insert cache or committed
+   * directly.
+   *
+   * @param metadataManager metadata manager to update, or {@code null} to skip
+   *                        metadata bookkeeping
+   * @param metrics metrics to store; rejected entries are removed from its list
+   * @param skipCache {@code true} to commit directly instead of using the cache
+   * @throws SQLException if committing the metrics fails
+   * @throws IOException if committing the metrics fails
+   */
+  public void insertMetricRecordsWithMetadata(TimelineMetricMetadataManager metadataManager,
+                                              TimelineMetrics metrics, boolean skipCache) throws SQLException, IOException {
+    List<TimelineMetric> timelineMetrics = metrics.getMetrics();
+    if (timelineMetrics == null || timelineMetrics.isEmpty()) {
+      LOG.debug("Empty metrics insert request.");
+      return;
+    }
+    for (Iterator<TimelineMetric> iterator = timelineMetrics.iterator(); iterator.hasNext();) {
+
+      TimelineMetric tm = iterator.next();
+
+      boolean acceptMetric = TimelineMetricsFilter.acceptMetric(tm);
+
+      // Update the metadata cache for this metric. This runs before the metric
+      // is written, and also for metrics that are filtered out below.
+      if (metadataManager != null) {
+        metadataManager.putIfModifiedTimelineMetricMetadata(
+                metadataManager.getTimelineMetricMetadata(tm, acceptMetric));
+
+        metadataManager.putIfModifiedHostedAppsMetadata(
+                tm.getHostName(), tm.getAppId());
+
+        // Constant-first comparison so a metric without an appId cannot NPE here.
+        if (!"FLUME_HANDLER".equals(tm.getAppId())) {
+          metadataManager.putIfModifiedHostedInstanceMetadata(tm.getInstanceId(), tm.getHostName());
+        }
+      }
+      if (!acceptMetric) {
+        iterator.remove();
+      }
+    }
+
+    if  (!skipCache && cacheEnabled) {
+      LOG.debug("Adding metrics to cache");
+      if (insertCache.size() >= cacheSize) {
+        commitMetricsFromCache();
+      }
+      try {
+        insertCache.put(metrics); // blocked while the queue is full
+      } catch (InterruptedException e) {
+        // Restore the interrupt flag so the calling thread can still observe it.
+        Thread.currentThread().interrupt();
+        LOG.warn("Interrupted while adding metrics to the insert cache.", e);
+      }
+    } else {
+      LOG.debug("Skipping metrics cache");
+      commitMetrics(metrics);
+    }
+  }
+
+  /**
+   * Inserts precision data without touching any metadata manager.
+   * Delegates to {@link #insertMetricRecordsWithMetadata} with a {@code null} manager.
+   *
+   * @param metrics metrics to store
+   * @param skipCache {@code true} to bypass the insert cache
+   */
+  public void insertMetricRecords(TimelineMetrics metrics, boolean skipCache) throws SQLException, IOException {
+    insertMetricRecordsWithMetadata(null, metrics, skipCache);
+  }
+
+  /**
+   * Inserts precision data with {@code skipCache} set to {@code false}.
+   *
+   * @param metrics metrics to store
+   */
+  public void insertMetricRecords(TimelineMetrics metrics) throws SQLException, IOException {
+    insertMetricRecords(metrics, false);
+  }
+
+
+  /**
+   * Reads metric records matching the condition.
+   * <p>
+   * Point-in-time conditions fetch the latest records. Otherwise a range query
+   * runs, unless the end time is before the start time, in which case the query
+   * is skipped with a warning and an empty result is returned.
+   * <p>
+   * Two failure modes are recognised from the stack trace of the cause:
+   * a {@code HashJoinRegionScanner} failure switches
+   * {@code PhoenixTransactSQL} to sort-merge joins before rethrowing, and a
+   * {@code TimeRange} failure ("maxStamp is smaller than minStamp") yields an
+   * empty result instead of an exception.
+   *
+   * @param condition query condition; must not be empty
+   * @param metricFunctions read functions per metric name, may be {@code null}
+   * @return the metrics found, never {@code null}
+   * @throws SQLException if the query fails
+   * @throws IOException if reading a row fails
+   * @throws IllegalArgumentException if the condition is empty
+   */
+  @SuppressWarnings("unchecked")
+  public TimelineMetrics getMetricRecords(
+    final Condition condition, Multimap<String, List<Function>> metricFunctions)
+    throws SQLException, IOException {
+
+    validateConditionIsNotEmpty(condition);
+
+    Connection conn = getConnection();
+    PreparedStatement stmt = null;
+    ResultSet rs = null;
+    TimelineMetrics metrics = new TimelineMetrics();
+
+    try {
+      //get latest
+      if (condition.isPointInTime()){
+        getLatestMetricRecords(condition, conn, metrics);
+      } else {
+        if (condition.getEndTime() >= condition.getStartTime()) {
+          stmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition);
+          rs = stmt.executeQuery();
+          while (rs.next()) {
+            appendMetricFromResultSet(metrics, condition, metricFunctions, rs);
+          }
+        } else {
+          LOG.warn("Skipping metrics query because endTime < startTime");
+        }
+      }
+
+    } catch (PhoenixIOException pioe) {
+      Throwable pioe2 = pioe.getCause();
+      // Need to find out if this is exception "Could not find hash cache
+      // for joinId" or another PhoenixIOException
+      if (pioe2 instanceof PhoenixIOException &&
+        pioe2.getCause() instanceof DoNotRetryIOException) {
+        if (stackTraceContainsClass(pioe2.getCause(), "HashJoinRegionScanner")) {
+          LOG.error("The cache might have expired and have been removed. Try to" +
+            " increase the cache size by setting bigger value for " +
+            "phoenix.coprocessor.maxMetaDataCacheSize in ams-hbase-site config." +
+            " Falling back to sort-merge join algorithm.");
+          PhoenixTransactSQL.setSortMergeJoinEnabled(true);
+        }
+      }
+      throw pioe;
+    } catch (RuntimeException ex) {
+      // We need to find out if this is a real IO exception
+      // or exception "maxStamp is smaller than minStamp"
+      // which is thrown in hbase TimeRange.java
+      Throwable io = ex.getCause();
+      if (io != null && stackTraceContainsClass(io, "TimeRange")) {
+        // This is "maxStamp is smaller than minStamp" exception
+        // Log error and return empty metrics
+        LOG.debug(io);
+        return new TimelineMetrics();
+      } else {
+        throw ex;
+      }
+
+    } finally {
+      if (rs != null) {
+        try {
+          rs.close();
+        } catch (SQLException e) {
+          // Ignore
+        }
+      }
+      if (stmt != null) {
+        try {
+          stmt.close();
+        } catch (SQLException e) {
+          // Ignore
+        }
+      }
+      if (conn != null) {
+        try {
+          conn.close();
+        } catch (SQLException sql) {
+          // Ignore
+        }
+      }
+    }
+
+    LOG.debug("Metrics records size: " + metrics.getMetrics().size());
+    return metrics;
+  }
+
+  /**
+   * Tells whether any frame of the throwable's stack trace belongs to a class
+   * with the given simple name. Frame class names are fully qualified, so the
+   * package prefix is ignored for the comparison.
+   */
+  private static boolean stackTraceContainsClass(Throwable throwable, String simpleClassName) {
+    String qualifiedSuffix = "." + simpleClassName;
+    for (StackTraceElement frame : throwable.getStackTrace()) {
+      String frameClass = frame.getClassName();
+      if (frameClass.equals(simpleClassName) || frameClass.endsWith(qualifiedSuffix)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Converts the current result set row into one metric per requested function
+   * list and appends it to {@code metrics}.
+   * <p>
+   * A function list holding more than one function is rejected. The
+   * {@code VALUE} read function goes through
+   * {@link #getTimelineMetricsFromResultSet}; any other read function reads the
+   * row as an aggregated single-valued metric.
+   *
+   * @throws IllegalArgumentException if a function list has more than one entry
+   */
+  private void appendMetricFromResultSet(TimelineMetrics metrics, Condition condition,
+                                         Multimap<String, List<Function>> metricFunctions,
+                                         ResultSet rs) throws SQLException, IOException {
+    String metricName = metadataManagerInstance.getMetricNameFromUuid(rs.getBytes("UUID"));
+
+    for (List<Function> functions : findMetricFunctions(metricFunctions, metricName)) {
+      if (functions == null || functions.isEmpty()) {
+        // No aggregation requested
+        // Execution never goes here, function always contain at least 1 element
+        getTimelineMetricsFromResultSet(metrics, null, condition, rs);
+        continue;
+      }
+      if (functions.size() > 1) {
+        throw new IllegalArgumentException("Multiple aggregate functions not supported.");
+      }
+
+      // Exactly one function remains at this point.
+      Function f = functions.get(0);
+      if (f.getReadFunction() == Function.ReadFunction.VALUE) {
+        getTimelineMetricsFromResultSet(metrics, f, condition, rs);
+        continue;
+      }
+
+      SingleValuedTimelineMetric aggregated =
+        TIMELINE_METRIC_READ_HELPER.getAggregatedTimelineMetricFromResultSet(rs, f);
+      if (condition.isGrouped()) {
+        metrics.addOrMergeTimelineMetric(aggregated);
+      } else {
+        metrics.getMetrics().add(aggregated.getTimelineMetric());
+      }
+    }
+  }
+
+  /**
+   * Reads the current row as a precision metric when the condition's precision
+   * is {@code SECONDS}, otherwise as an aggregated single-valued metric, and
+   * adds (or merges, for grouped conditions) the result into {@code metrics}.
+   *
+   * @param f read function; may be {@code null}. For precision data its suffix,
+   *          when present, is appended to the metric name
+   */
+  private void getTimelineMetricsFromResultSet(TimelineMetrics metrics, Function f, Condition condition, ResultSet rs) throws SQLException, IOException {
+    if (!condition.getPrecision().equals(Precision.SECONDS)) {
+      SingleValuedTimelineMetric aggregated =
+        TIMELINE_METRIC_READ_HELPER.getAggregatedTimelineMetricFromResultSet(rs, f);
+      if (condition.isGrouped()) {
+        metrics.addOrMergeTimelineMetric(aggregated);
+      } else {
+        metrics.getMetrics().add(aggregated.getTimelineMetric());
+      }
+      return;
+    }
+
+    TimelineMetric precisionMetric = TIMELINE_METRIC_READ_HELPER.getTimelineMetricFromResultSet(rs);
+    boolean hasSuffix = f != null && f.getSuffix() != null;
+    if (hasSuffix) { //Case : Requesting "._rate" for precision data
+      precisionMetric.setMetricName(precisionMetric.getMetricName() + f.getSuffix());
+    }
+    if (condition.isGrouped()) {
+      metrics.addOrMergeTimelineMetric(precisionMetric);
+    } else {
+      metrics.getMetrics().add(precisionMetric);
+    }
+  }
+
+  /**
+   * Runs the "latest metric" query for the condition on the given connection
+   * and appends every returned row to {@code metrics}. The connection is left
+   * open for the caller.
+   *
+   * @throws IllegalArgumentException if the condition is empty
+   */
+  private void getLatestMetricRecords(Condition condition, Connection conn,
+                                      TimelineMetrics metrics) throws SQLException, IOException {
+
+    validateConditionIsNotEmpty(condition);
+
+    ResultSet latestRows = null;
+    PreparedStatement latestStmt = PhoenixTransactSQL.prepareGetLatestMetricSqlStmt(conn, condition);
+    try {
+      latestRows = latestStmt.executeQuery();
+      while (latestRows.next()) {
+        TimelineMetric latest = getLastTimelineMetricFromResultSet(latestRows);
+        metrics.getMetrics().add(latest);
+      }
+    } finally {
+      if (latestRows != null) {
+        try {
+          latestRows.close();
+        } catch (SQLException e) {
+          // Ignore
+        }
+      }
+      if (latestStmt != null) {
+        latestStmt.close();
+      }
+    }
+  }
+
+  /**
+   * Get metrics aggregated across hosts.
+   * <p>
+   * Point-in-time conditions read the latest aggregate per UUID; all other
+   * conditions run a single aggregate query.
+   *
+   * @param condition query condition; must not be empty
+   * @param metricFunctions read functions per metric name
+   * @return the aggregated metrics
+   * @throws SQLException if the query fails
+   * @throws IllegalArgumentException if the condition is empty
+   */
+  public TimelineMetrics getAggregateMetricRecords(final Condition condition,
+      Multimap<String, List<Function>> metricFunctions) throws SQLException {
+
+    validateConditionIsNotEmpty(condition);
+
+    TimelineMetrics aggregates = new TimelineMetrics();
+    Connection conn = getConnection();
+    PreparedStatement aggregateStmt = null;
+    ResultSet rows = null;
+
+    try {
+      if (!condition.isPointInTime()) {
+        aggregateStmt = PhoenixTransactSQL.prepareGetAggregateSqlStmt(conn, condition);
+
+        rows = aggregateStmt.executeQuery();
+        while (rows.next()) {
+          appendAggregateMetricFromResultSet(aggregates, condition, metricFunctions, rows);
+        }
+      } else {
+        //get latest
+        getLatestAggregateMetricRecords(condition, conn, aggregates, metricFunctions);
+      }
+    } finally {
+      if (rows != null) {
+        try {
+          rows.close();
+        } catch (SQLException e) {
+          // Ignore
+        }
+      }
+      if (aggregateStmt != null) {
+        try {
+          aggregateStmt.close();
+        } catch (SQLException e) {
+          // Ignore
+        }
+      }
+      if (conn != null) {
+        try {
+          conn.close();
+        } catch (SQLException sql) {
+          // Ignore
+        }
+      }
+    }
+
+    LOG.debug("Aggregate records size: " + aggregates.getMetrics().size());
+    return aggregates;
+  }
+
+  /**
+   * Converts the current aggregate row into one metric per requested function
+   * and appends it to {@code metrics}.
+   * <p>
+   * For {@code MINUTES}, {@code HOURS} and {@code DAYS} precision the row is
+   * read without the host count; for any other precision the host count is used.
+   */
+  private void appendAggregateMetricFromResultSet(TimelineMetrics metrics,
+      Condition condition, Multimap<String, List<Function>> metricFunctions,
+      ResultSet rs) throws SQLException {
+
+    String metricName = metadataManagerInstance.getMetricNameFromUuid(rs.getBytes("UUID"));
+
+    for (List<Function> functions : findMetricFunctions(metricFunctions, metricName)) {
+      for (Function aggregateFunction : functions) {
+        Precision precision = condition.getPrecision();
+        boolean timeAggregated = precision == Precision.MINUTES
+          || precision == Precision.HOURS
+          || precision == Precision.DAYS;
+
+        SingleValuedTimelineMetric metric =
+          getAggregateTimelineMetricFromResultSet(rs, aggregateFunction, !timeAggregated);
+
+        if (condition.isGrouped()) {
+          metrics.addOrMergeTimelineMetric(metric);
+        } else {
+          metrics.getMetrics().add(metric.getTimelineMetric());
+        }
+      }
+    }
+
+  }
+
+  /**
+   * Reads the latest aggregate rows, one query per UUID of the condition, and
+   * appends the resulting metrics to {@code metrics}. Rows are always read with
+   * the host count. The connection is left open for the caller.
+   */
+  private void getLatestAggregateMetricRecords(Condition condition,
+      Connection conn, TimelineMetrics metrics,
+      Multimap<String, List<Function>> metricFunctions) throws SQLException {
+
+    SplitByMetricNamesCondition perUuidCondition = new SplitByMetricNamesCondition(condition);
+
+    for (byte[] uuid : condition.getUuids()) {
+
+      perUuidCondition.setCurrentUuid(uuid);
+      PreparedStatement latestStmt =
+        PhoenixTransactSQL.prepareGetLatestAggregateMetricSqlStmt(conn, perUuidCondition);
+      ResultSet rows = null;
+      try {
+        rows = latestStmt.executeQuery();
+        while (rows.next()) {
+          String metricName = metadataManagerInstance.getMetricNameFromUuid(uuid);
+          for (List<Function> functions : findMetricFunctions(metricFunctions, metricName)) {
+            if (functions == null) {
+              // No function list: read with a default Function and add ungrouped.
+              SingleValuedTimelineMetric defaultMetric =
+                getAggregateTimelineMetricFromResultSet(rows, new Function(), true);
+              metrics.getMetrics().add(defaultMetric.getTimelineMetric());
+              continue;
+            }
+            for (Function f : functions) {
+              SingleValuedTimelineMetric metric =
+                getAggregateTimelineMetricFromResultSet(rows, f, true);
+
+              if (condition.isGrouped()) {
+                metrics.addOrMergeTimelineMetric(metric);
+              } else {
+                metrics.getMetrics().add(metric.getTimelineMetric());
+              }
+            }
+          }
+        }
+      } finally {
+        if (rows != null) {
+          try {
+            rows.close();
+          } catch (SQLException e) {
+            // Ignore
+          }
+        }
+        if (latestStmt != null) {
+          latestStmt.close();
+        }
+      }
+    }
+  }
+
+  private SingleValuedTimelineMetric 
getAggregateTimelineMetricFromResultSet(ResultSet rs,
+      Function f, boolean useHostCount) throws SQLException {
+
+    String countColumnName = "METRIC_COUNT";
+    if (useHostCount) {
+      countColumnName = "HOSTS_COUNT";
+    }
+
+    byte[] uuid = rs.getBytes("UUID");
+    TimelineMetric timelineMetric = 
metadataManagerInstance.getMetricFromUuid(uuid);
+
+    SingleValuedTimelineMetric metric = new SingleValuedTimelineMetric(
+      timelineMetric.getMetricName() + f.getSuffix(),
+      timelineMetric.getAppId(),
+      timelineMetric.getInstanceId(),
+      null,
+      rs.getLong("SERVER_TIME")
+    );
+
+    double value;
+    switch(f.getReadFunction()){
+      case AVG:
+        value = rs.getDouble("METRIC_SUM") / rs.getInt(countColumnName);
+        break;
+      case MIN:
+        value = rs.getDouble("METRIC_MIN");
+        break;
+      case MAX:
+        value = rs.getDouble("METRIC_MAX");
+        break;
+      case SUM:
+        value = rs.getDouble("METRIC_SUM");
+        break;
+      default:
+        value = rs.getDouble("METRIC_SUM") / rs.getInt(countColumnName);
+        break;
+    }
+
+    metric.setSingleTimeseriesValue(rs.getLong("SERVER_TIME"), value);
+
+    return metric;
+  }
+
+  /**
+   * Rejects conditions that carry no filter criteria.
+   *
+   * @throws IllegalArgumentException if {@code condition.isEmpty()} is true
+   */
+  private void validateConditionIsNotEmpty(Condition condition) {
+    if (!condition.isEmpty()) {
+      return;
+    }
+    throw new IllegalArgumentException("No filter criteria specified.");
+  }
+
+  /**
+   * Looks up the function lists requested for a metric name.
+   * <p>
+   * An exact key match wins. Otherwise each key is turned into a regular
+   * expression ({@code *} is escaped, {@code __%} becomes two arbitrary
+   * characters followed by a wildcard, {@code %} becomes {@code .*}) and the
+   * first key whose expression matches the metric name is used. When nothing
+   * matches, or no functions were supplied, a single default {@code Function}
+   * is returned.
+   */
+  private Collection<List<Function>> findMetricFunctions(Multimap<String, List<Function>> metricFunctions,
+      String metricName) {
+
+    if (metricFunctions == null) {
+      return Collections.singletonList(Collections.singletonList(new Function()));
+    }
+
+    if (metricFunctions.containsKey(metricName)) {
+      return metricFunctions.get(metricName);
+    }
+
+    for (String requestedName : metricFunctions.keySet()) {
+      //Special case handling for metric name with * and __%.
+      //For example, dfs.NNTopUserOpCounts.windowMs=300000.op=*.user=%.count
+      // or dfs.NNTopUserOpCounts.windowMs=300000.op=__%.user=%.count
+      // The first two replacements leave names without those tokens untouched.
+      String requestedRegEx = requestedName
+        .replace("*", "\\*")
+        .replace("__%", "..%")
+        .replace("%", ".*");
+
+      if (metricName.matches(requestedRegEx)) {
+        return metricFunctions.get(requestedName);
+      }
+    }
+
+    return Collections.singletonList(Collections.singletonList(new Function()));
+  }
+
+  public void saveHostAggregateRecords(Map<TimelineMetric, 
MetricHostAggregate> hostAggregateMap,
+                                       String phoenixTableName) throws 
SQLException {
+
+    if (hostAggregateMap == null || hostAggregateMap.isEmpty()) {
+      LOG.debug("Empty aggregate records.");
+      return;
+    }
+
+    Connection conn = getConnection();
+    PreparedStatement stmt = null;
+
+    long start = System.currentTimeMillis();
+    int rowCount = 0;
+
+    try {
+      stmt = conn.prepareStatement(
+        String.format(PhoenixTransactSQL.UPSERT_AGGREGATE_RECORD_SQL, 
phoenixTableName));
+
+      for (Map.Entry<TimelineMetric, MetricHostAggregate> metricAggregate :
+        hostAggregateMap.entrySet()) {
+
+        TimelineMetric metric = metricAggregate.getKey();
+        MetricHostAggregate hostAggregate = metricAggregate.getValue();
+
+        byte[] uuid = metadataManagerInstance.getUuid(metric);
+        if (uuid == null) {
+          LOG.error("Error computing UUID for metric. Cannot write metric : " 
+ metric.toString());
+          continue;
+        }
+        rowCount++;
+        stmt.clearParameters();
+        stmt.setBytes(1, uuid);
+        stmt.setLong(2, metric.getStartTime());
+        stmt.setDouble(3, hostAggregate.getSum());
+        stmt.setDouble(4, hostAggregate.getMax());
+        stmt.setDouble(5, hostAggregate.getMin());
+        stmt.setDouble(6, hostAggregate.getNumberOfSamples());
+
+        try {
+          stmt.executeUpdate();
+        } catch (SQLException sql) {
+          LOG.error(sql);
+        }
+
+        if (rowCount >= PHOENIX_MAX_MUTATION_STATE_SIZE - 1) {
+          conn.commit();
+          rowCount = 0;
+        }
+
+      }
+
+      conn.commit();
+
+    } finally {
+      if (stmt != null) {
+        try {
+          stmt.close();
+        } catch (SQLException e) {
+          // Ignore
+        }
+      }
+      if (conn != null) {
+        try {
+          conn.close();
+        } catch (SQLException sql) {
+          // Ignore
+        }
+      }
+    }
+
+    long end = System.currentTimeMillis();
+
+    if ((end - start) > 60000l) {
+      LOG.info("Time to save map: " + (end - start) + ", " +
+        "thread = " + Thread.currentThread().getClass());
+    }
+    if (aggregatorSink != null) {
+      try {
+        aggregatorSink.saveHostAggregateRecords(hostAggregateMap,
+            getTablePrecision(phoenixTableName));
+      } catch (Exception e) {
+        LOG.warn(
+            "Error writing host aggregate records metrics to external sink. "
+                + e);
+      }
+    }
+  }
+
+  /**
+   * Save Metric aggregate records.
+   *
+   * @throws SQLException
+   */
+  public void saveClusterAggregateRecords(Map<TimelineClusterMetric, 
MetricClusterAggregate> records)
+      throws SQLException {
+
+    if (records == null || records.isEmpty()) {
+      LOG.debug("Empty aggregate records.");
+      return;
+    }
+
+    long start = System.currentTimeMillis();
+    String sqlStr = 
String.format(PhoenixTransactSQL.UPSERT_CLUSTER_AGGREGATE_SQL, 
PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_TABLE_NAME);
+    Connection conn = getConnection();
+    PreparedStatement stmt = null;
+    try {
+      stmt = conn.prepareStatement(sqlStr);
+      int rowCount = 0;
+
+      for (Map.Entry<TimelineClusterMetric, MetricClusterAggregate>
+        aggregateEntry : records.entrySet()) {
+        TimelineClusterMetric clusterMetric = aggregateEntry.getKey();
+        MetricClusterAggregate aggregate = aggregateEntry.getValue();
+
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("clusterMetric = " + clusterMetric + ", " +
+            "aggregate = " + aggregate);
+        }
+
+        rowCount++;
+        byte[] uuid =  metadataManagerInstance.getUuid(clusterMetric);
+        if (uuid == null) {
+          LOG.error("Error computing UUID for metric. Cannot write metrics : " 
+ clusterMetric.toString());
+          continue;
+        }
+        stmt.clearParameters();
+        stmt.setBytes(1, uuid);
+        stmt.setLong(2, clusterMetric.getTimestamp());
+        stmt.setDouble(3, aggregate.getSum());
+        stmt.setInt(4, aggregate.getNumberOfHosts());
+        stmt.setDouble(5, aggregate.getMax());
+        stmt.setDouble(6, aggregate.getMin());
+
+        try {
+          stmt.executeUpdate();
+        } catch (SQLException sql) {
+          // we have no way to verify it works!!!
+          LOG.error(sql);
+        }
+
+        if (rowCount >= PHOENIX_MAX_MUTATION_STATE_SIZE - 1) {
+          conn.commit();
+          rowCount = 0;
+        }
+      }
+
+      conn.commit();
+
+    } finally {
+      if (stmt != null) {
+        try {
+          stmt.close();
+        } catch (SQLException e) {
+          // Ignore
+        }
+      }
+      if (conn != null) {
+        try {
+          conn.close();
+        } catch (SQLException sql) {
+          // Ignore
+        }
+      }
+    }
+    long end = System.currentTimeMillis();
+    if ((end - start) > 60000l) {
+      LOG.info("Time to save: " + (end - start) + ", " +
+        "thread = " + Thread.currentThread().getName());
+    }
+    if (aggregatorSink != null) {
+      try {
+        aggregatorSink.saveClusterAggregateRecords(records);
+      } catch (Exception e) {
+        LOG.warn("Error writing cluster aggregate records metrics to external 
sink. ", e);
+      }
+    }
+  }
+
+
+  /**
+   * Save Metric aggregate records.
+   *
+   * @throws SQLException
+   */
+  public void saveClusterAggregateRecordsSecond(Map<TimelineClusterMetric, 
MetricHostAggregate> records,
+                                                String tableName) throws 
SQLException {
+    if (records == null || records.isEmpty()) {
+      LOG.debug("Empty aggregate records.");
+      return;
+    }
+
+    long start = System.currentTimeMillis();
+
+    Connection conn = getConnection();
+    PreparedStatement stmt = null;
+    try {
+      stmt = 
conn.prepareStatement(String.format(PhoenixTransactSQL.UPSERT_CLUSTER_AGGREGATE_TIME_SQL,
 tableName));
+      int rowCount = 0;
+
+      for (Map.Entry<TimelineClusterMetric, MetricHostAggregate> 
aggregateEntry : records.entrySet()) {
+        TimelineClusterMetric clusterMetric = aggregateEntry.getKey();
+        MetricHostAggregate aggregate = aggregateEntry.getValue();
+
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("clusterMetric = " + clusterMetric + ", " +
+            "aggregate = " + aggregate);
+        }
+
+        byte[] uuid = metadataManagerInstance.getUuid(clusterMetric);
+        if (uuid == null) {
+          LOG.error("Error computing UUID for metric. Cannot write metric : " 
+ clusterMetric.toString());
+          continue;
+        }
+
+        rowCount++;
+        stmt.clearParameters();
+        stmt.setBytes(1, uuid);
+        stmt.setLong(2, clusterMetric.getTimestamp());
+        stmt.setDouble(3, aggregate.getSum());
+        stmt.setLong(4, aggregate.getNumberOfSamples());
+        stmt.setDouble(5, aggregate.getMax());
+        stmt.setDouble(6, aggregate.getMin());
+
+        try {
+          stmt.executeUpdate();
+        } catch (SQLException sql) {
+          // we have no way to verify it works!!!
+          LOG.error(sql);
+        }
+
+        if (rowCount >= PHOENIX_MAX_MUTATION_STATE_SIZE - 1) {
+          conn.commit();
+          rowCount = 0;
+        }
+      }
+
+      conn.commit();
+
+    } finally {
+      if (stmt != null) {
+        try {
+          stmt.close();
+        } catch (SQLException e) {
+          // Ignore
+        }
+      }
+      if (conn != null) {
+        try {
+          conn.close();
+        } catch (SQLException sql) {
+          // Ignore
+        }
+      }
+    }
+    long end = System.currentTimeMillis();
+    if ((end - start) > 60000l) {
+      LOG.info("Time to save: " + (end - start) + ", " +
+        "thread = " + Thread.currentThread().getName());
+    }
+    if (aggregatorSink != null) {
+      try {
+        aggregatorSink.saveClusterTimeAggregateRecords(records,
+            getTablePrecision(tableName));
+      } catch (Exception e) {
+        LOG.warn(
+            "Error writing cluster time aggregate records metrics to external 
sink. "
+                + e);
+      }
+    }
+  }
+
+  /**
+   * Maps a Phoenix table name to the precision of the data it stores.
+   *
+   * @param tableName one of the {@code PhoenixTransactSQL} table name constants
+   * @return the matching precision, or {@code null} for any other table name
+   */
+  private Precision getTablePrecision(String tableName) {
+    switch (tableName) {
+    case PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME:
+      return Precision.SECONDS;
+    case PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_TABLE_NAME:
+    case PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_MINUTE_TABLE_NAME:
+      return Precision.MINUTES;
+    case PhoenixTransactSQL.METRICS_AGGREGATE_HOURLY_TABLE_NAME:
+    case PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME:
+      return Precision.HOURS;
+    case PhoenixTransactSQL.METRICS_AGGREGATE_DAILY_TABLE_NAME:
+    case PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_DAILY_TABLE_NAME:
+      return Precision.DAYS;
+    default:
+      return null;
+    }
+  }
+
+  /**
+   * Tells whether aggregator queries should carry the skip-block-cache hint.
+   *
+   * @return the current value of {@code skipBlockCacheForAggregatorsEnabled}
+   */
+  public boolean isSkipBlockCacheForAggregatorsEnabled() {
+    return skipBlockCacheForAggregatorsEnabled;
+  }
+
+  /**
+   * Saves hosted-apps metadata, one row per host: the host name, the host's
+   * UUID and a comma separated list of its hosted app names.
+   * <p>
+   * A row that fails to upsert is logged and skipped; everything is committed
+   * once after the loop.
+   *
+   * @param hostMetadata host metadata keyed by host name
+   * @throws SQLException if preparing the statement or committing fails
+   */
+  public void saveHostAppsMetadata(Map<String, TimelineMetricHostMetadata> hostMetadata) throws SQLException {
+    Connection conn = getConnection();
+    PreparedStatement upsertStmt = null;
+    try {
+      upsertStmt = conn.prepareStatement(PhoenixTransactSQL.UPSERT_HOSTED_APPS_METADATA_SQL);
+      int savedRows = 0;
+
+      for (Map.Entry<String, TimelineMetricHostMetadata> hostEntry : hostMetadata.entrySet()) {
+        TimelineMetricHostMetadata metadata = hostEntry.getValue();
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("HostedAppsMetadata: " + hostEntry);
+        }
+
+        upsertStmt.clearParameters();
+        upsertStmt.setString(1, hostEntry.getKey());
+        upsertStmt.setBytes(2, metadata.getUuid());
+        upsertStmt.setString(3, StringUtils.join(metadata.getHostedApps().keySet(), ","));
+        try {
+          upsertStmt.executeUpdate();
+          savedRows++;
+        } catch (SQLException sql) {
+          LOG.error("Error saving hosted apps metadata.", sql);
+        }
+      }
+
+      conn.commit();
+      LOG.info("Saved " + savedRows + " hosted apps metadata records.");
+
+    } finally {
+      if (upsertStmt != null) {
+        try {
+          upsertStmt.close();
+        } catch (SQLException e) {
+          // Ignore
+        }
+      }
+      if (conn != null) {
+        try {
+          conn.close();
+        } catch (SQLException sql) {
+          // Ignore
+        }
+      }
+    }
+  }
+
+  /**
+   * Saves instance-to-host metadata, one row per (instance id, host name) pair.
+   * <p>
+   * A row that fails to upsert is logged and skipped; everything is committed
+   * once after the loop.
+   *
+   * @param instanceHostsMap host names keyed by instance id
+   * @throws SQLException if preparing the statement or committing fails
+   */
+  public void saveInstanceHostsMetadata(Map<String, Set<String>> instanceHostsMap) throws SQLException {
+    Connection conn = getConnection();
+    PreparedStatement upsertStmt = null;
+    try {
+      upsertStmt = conn.prepareStatement(PhoenixTransactSQL.UPSERT_INSTANCE_HOST_METADATA_SQL);
+      int savedRows = 0;
+
+      for (Map.Entry<String, Set<String>> instanceEntry : instanceHostsMap.entrySet()) {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("Host Instances Entry: " + instanceEntry);
+        }
+
+        final String instanceId = instanceEntry.getKey();
+        final Set<String> hostnames = instanceEntry.getValue();
+
+        for (String hostname : hostnames) {
+          upsertStmt.clearParameters();
+          upsertStmt.setString(1, instanceId);
+          upsertStmt.setString(2, hostname);
+          try {
+            upsertStmt.executeUpdate();
+            savedRows++;
+          } catch (SQLException sql) {
+            LOG.error("Error saving host instances metadata.", sql);
+          }
+        }
+
+      }
+
+      conn.commit();
+      LOG.info("Saved " + savedRows + " host instances metadata records.");
+
+    } finally {
+      if (upsertStmt != null) {
+        try {
+          upsertStmt.close();
+        } catch (SQLException e) {
+          // Ignore
+        }
+      }
+      if (conn != null) {
+        try {
+          conn.close();
+        } catch (SQLException sql) {
+          // Ignore
+        }
+      }
+    }
+  }
+
+  /**
+   * Save metadata on updates.
+   * Each record is upserted individually and a single commit is issued after
+   * all records have been attempted. A record whose parameters cannot be bound
+   * or whose upsert fails is logged (with its cause) and skipped.
+   *
+   * @param metricMetadata metadata records to save; when empty nothing is written
+   * @throws SQLException if preparing the statement or committing fails
+   */
+  public void saveMetricMetadata(Collection<TimelineMetricMetadata> metricMetadata) throws SQLException {
+    if (metricMetadata.isEmpty()) {
+      LOG.info("No metadata records to save.");
+      return;
+    }
+
+    Connection conn = getConnection();
+    PreparedStatement stmt = null;
+
+    try {
+      stmt = conn.prepareStatement(PhoenixTransactSQL.UPSERT_METADATA_SQL);
+      int rowCount = 0;
+
+      for (TimelineMetricMetadata metadata : metricMetadata) {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("TimelineMetricMetadata: metricName = " + metadata.getMetricName()
+            + ", appId = " + metadata.getAppId()
+            + ", seriesStartTime = " + metadata.getSeriesStartTime()
+          );
+        }
+        try {
+          stmt.clearParameters();
+          stmt.setString(1, metadata.getMetricName());
+          stmt.setString(2, metadata.getAppId());
+          stmt.setString(3, metadata.getInstanceId());
+          stmt.setBytes(4, metadata.getUuid());
+          stmt.setString(5, metadata.getUnits());
+          stmt.setString(6, metadata.getType());
+          stmt.setLong(7, metadata.getSeriesStartTime());
+          stmt.setBoolean(8, metadata.isSupportsAggregates());
+          stmt.setBoolean(9, metadata.isWhitelisted());
+        } catch (Exception e) {
+          // Any failure while binding skips just this record; pass the
+          // exception to the logger so the cause is not lost.
+          LOG.error("Exception in saving metric metadata entry. ", e);
+          continue;
+        }
+
+        try {
+          stmt.executeUpdate();
+          rowCount++;
+        } catch (SQLException sql) {
+          // Best effort: a failed upsert does not abort the remaining records.
+          LOG.error("Error saving metadata.", sql);
+        }
+      }
+
+      conn.commit();
+      LOG.info("Saved " + rowCount + " metadata records.");
+
+    } finally {
+      if (stmt != null) {
+        try {
+          stmt.close();
+        } catch (SQLException e) {
+          // Ignore
+        }
+      }
+      if (conn != null) {
+        try {
+          conn.close();
+        } catch (SQLException sql) {
+          // Ignore
+        }
+      }
+    }
+  }
+
+  /**
+   * Loads the hosted-apps metadata rows into a map keyed by the HOSTNAME column.
+   * The APP_IDS column is split on "," to build each host's app set; a SQL NULL
+   * in APP_IDS yields an empty app set for that host.
+   *
+   * @return host metadata (app ids and UUID) keyed by hostname; empty when no rows exist
+   * @throws SQLException if preparing, executing or reading the query fails
+   */
+  public Map<String, TimelineMetricHostMetadata> getHostedAppsMetadata() throws SQLException {
+    Map<String, TimelineMetricHostMetadata> hostedAppMap = new HashMap<>();
+    Connection conn = getConnection();
+    PreparedStatement stmt = null;
+    ResultSet rs = null;
+
+    try {
+      stmt = conn.prepareStatement(PhoenixTransactSQL.GET_HOSTED_APPS_METADATA_SQL);
+      rs = stmt.executeQuery();
+
+      while (rs.next()) {
+        // getString returns null for SQL NULL; guard it so one such row
+        // cannot fail the whole load with a NullPointerException.
+        String appIds = rs.getString("APP_IDS");
+        TimelineMetricHostMetadata hostMetadata;
+        if (appIds == null) {
+          hostMetadata = new TimelineMetricHostMetadata(new HashSet<String>());
+        } else {
+          hostMetadata = new TimelineMetricHostMetadata(
+            new HashSet<>(Arrays.asList(StringUtils.split(appIds, ","))));
+        }
+        hostMetadata.setUuid(rs.getBytes("UUID"));
+        hostedAppMap.put(rs.getString("HOSTNAME"), hostMetadata);
+      }
+
+    } finally {
+      if (rs != null) {
+        try {
+          rs.close();
+        } catch (SQLException e) {
+          // Ignore
+        }
+      }
+      if (stmt != null) {
+        try {
+          stmt.close();
+        } catch (SQLException e) {
+          // Ignore
+        }
+      }
+      if (conn != null) {
+        try {
+          conn.close();
+        } catch (SQLException sql) {
+          // Ignore
+        }
+      }
+    }
+
+    return hostedAppMap;
+  }
+
+  /**
+   * Reads the instance/host metadata rows and groups the HOSTNAME values
+   * by their INSTANCE_ID.
+   *
+   * @return hostnames keyed by instance id; empty when the query returns no rows
+   * @throws SQLException if preparing, executing or reading the query fails
+   */
+  public Map<String, Set<String>> getInstanceHostsMetdata() throws SQLException {
+    Map<String, Set<String>> hostsByInstance = new HashMap<>();
+    Connection connection = getConnection();
+    PreparedStatement query = null;
+    ResultSet rows = null;
+
+    try {
+      query = connection.prepareStatement(PhoenixTransactSQL.GET_INSTANCE_HOST_METADATA_SQL);
+      rows = query.executeQuery();
+
+      while (rows.next()) {
+        String instance = rows.getString("INSTANCE_ID");
+        String host = rows.getString("HOSTNAME");
+
+        // Create the bucket for this instance the first time it is seen.
+        Set<String> hosts = hostsByInstance.get(instance);
+        if (hosts == null) {
+          hosts = new HashSet<String>();
+          hostsByInstance.put(instance, hosts);
+        }
+        hosts.add(host);
+      }
+
+    } finally {
+      // Release in reverse order of acquisition; close failures are ignored.
+      try {
+        if (rows != null) {
+          rows.close();
+        }
+      } catch (SQLException ignored) {
+        // Ignore
+      }
+      try {
+        if (query != null) {
+          query.close();
+        }
+      } catch (SQLException ignored) {
+        // Ignore
+      }
+      try {
+        if (connection != null) {
+          connection.close();
+        }
+      } catch (SQLException ignored) {
+        // Ignore
+      }
+    }
+
+    return hostsByInstance;
+  }
+
+  // No filter criteria support for now.
+  public Map<TimelineMetricMetadataKey, TimelineMetricMetadata> 
getTimelineMetricMetadata() throws SQLException {
+    Map<TimelineMetricMetadataKey, TimelineMetricMetadata> metadataMap = new 
HashMap<>();
+    Connection conn = getConnection();
+    PreparedStatement stmt = null;
+    ResultSet rs = null;
+
+    try {
+      stmt = conn.prepareStatement(PhoenixTransactSQL.GET_METRIC_METADATA_SQL);
+      rs = stmt.executeQuery();
+
+      while (rs.next()) {
+        String metricName = rs.getString("METRIC_NAME");
+        String appId = rs.getString("APP_ID");
+        String instanceId = rs.getString("INSTANCE_ID");
+        TimelineMetricMetadata metadata = new TimelineMetricMetadata(
+          metricName,
+          appId,
+          instanceId,
+          rs.getString("UNITS"),
+          rs.getString("TYPE"),
+          rs.getLong("START_TIME"),
+          rs.getBoolean("SUPPORTS_AGGREGATION"),
+          rs.getBoolean("IS_WHITELISTED")
+        );
+
+        TimelineMetricMetadataKey key = new 
TimelineMetricMetadataKey(metricName, appId, instanceId);
+        metadata.setIsPersisted(true); // Always true on retrieval
+        metadata.setUuid(rs.getBytes("UUID"));
+        metadataMap.put(key, metadata);
+      }
+
+    } finally {
+      if (rs != null) {
+        try {
+          rs.close();
+        } catch (SQLException e) {
+          // Ignore
+        }
+      }
+      if (stmt != null) {
+        try {
+          stmt.close();
+        } catch (SQLException e) {
+          // Ignore
+        }
+      }
+      if (conn != null) {
+        try {
+          conn.close();
+        } catch (SQLException sql) {
+          // Ignore
+        }
+      }
+    }
+
+    return metadataMap;
+  }
+
+  /**
+   * Stores the given metadata manager on this accessor and replaces the
+   * metric read helper with a new one constructed from that manager.
+   *
+   * @param metadataManager the metadata manager to use from now on
+   */
+  public void setMetadataInstance(TimelineMetricMetadataManager metadataManager) {
+    metadataManagerInstance = metadataManager;
+    TIMELINE_METRIC_READ_HELPER = new TimelineMetricReadHelper(metadataManagerInstance);
+  }
+}

Reply via email to