http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
deleted file mode 100644
index c438299..0000000
--- 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
+++ /dev/null
@@ -1,1904 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
-
-import static java.util.concurrent.TimeUnit.SECONDS;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.AGGREGATORS_SKIP_BLOCK_CACHE;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_DAILY_TABLE_TTL;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_HOUR_TABLE_TTL;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_MINUTE_TABLE_TTL;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_SECOND_TABLE_TTL;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CONTAINER_METRICS_TTL;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.GLOBAL_MAX_RETRIES;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.GLOBAL_RESULT_LIMIT;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.GLOBAL_RETRY_INTERVAL;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HBASE_BLOCKING_STORE_FILES;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HBASE_COMPRESSION_SCHEME;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HBASE_ENCODING_SCHEME;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_DAILY_TABLE_TTL;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_HOUR_TABLE_TTL;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_MINUTE_TABLE_TTL;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.OUT_OFF_BAND_DATA_TIME_ALLOWANCE;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.PRECISION_TABLE_TTL;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_AGGREGATE_TABLES_DURABILITY;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_AGGREGATE_TABLE_HBASE_BLOCKING_STORE_FILES;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_CACHE_COMMIT_INTERVAL;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_CACHE_ENABLED;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_CACHE_SIZE;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_HBASE_AGGREGATE_TABLE_COMPACTION_POLICY_CLASS;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_HBASE_AGGREGATE_TABLE_COMPACTION_POLICY_KEY;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_HBASE_PRECISION_TABLE_COMPACTION_POLICY_CLASS;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_HBASE_PRECISION_TABLE_COMPACTION_POLICY_KEY;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_PRECISION_TABLE_DURABILITY;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_PRECISION_TABLE_HBASE_BLOCKING_STORE_FILES;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRIC_AGGREGATOR_SINK_CLASS;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.ALTER_METRICS_METADATA_TABLE;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CONTAINER_METRICS_TABLE_NAME;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_CONTAINER_METRICS_TABLE_SQL;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_HOSTED_APPS_METADATA_TABLE_SQL;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_INSTANCE_HOST_TABLE_SQL;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_METRICS_AGGREGATE_TABLE_SQL;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_METRICS_CLUSTER_AGGREGATE_GROUPED_TABLE_SQL;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_METRICS_CLUSTER_AGGREGATE_TABLE_SQL;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_METRICS_METADATA_TABLE_SQL;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_METRICS_TABLE_SQL;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.DEFAULT_ENCODING;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.DEFAULT_TABLE_COMPRESSION;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_HOSTED_APPS_METADATA_SQL;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_INSTANCE_HOST_METADATA_SQL;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.GET_METRIC_METADATA_SQL;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_DAILY_TABLE_NAME;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_HOURLY_TABLE_NAME;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_TABLE_NAME;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_DAILY_TABLE_NAME;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_MINUTE_TABLE_NAME;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_TABLE_NAME;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.PHOENIX_TABLES;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.PHOENIX_TABLES_REGEX_PATTERN;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_AGGREGATE_RECORD_SQL;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_CLUSTER_AGGREGATE_SQL;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_CLUSTER_AGGREGATE_TIME_SQL;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_CONTAINER_METRICS_SQL;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_HOSTED_APPS_METADATA_SQL;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_INSTANCE_HOST_METADATA_SQL;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_METADATA_SQL;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_METRICS_SQL;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.source.InternalSourceProvider.SOURCE_NAME.RAW_METRICS;
-
-import java.io.IOException;
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.sql.Timestamp;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.DoNotRetryIOException;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Admin;
-import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
-import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
-import org.apache.hadoop.hbase.client.Durability;
-import org.apache.hadoop.hbase.client.TableDescriptor;
-import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
-import org.apache.hadoop.hbase.util.RetryCounter;
-import org.apache.hadoop.hbase.util.RetryCounterFactory;
-import org.apache.hadoop.metrics2.sink.timeline.ContainerMetric;
-import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate;
-import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
-import org.apache.hadoop.metrics2.sink.timeline.Precision;
-import org.apache.hadoop.metrics2.sink.timeline.SingleValuedTimelineMetric;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.AggregatorUtils;
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function;
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineClusterMetric;
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricReadHelper;
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricHostMetadata;
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataKey;
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager;
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultPhoenixDataSource;
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixConnectionProvider;
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL;
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.SplitByMetricNamesCondition;
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.sink.ExternalMetricsSink;
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.sink.ExternalSinkProvider;
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.source.InternalMetricsSource;
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.source.InternalSourceProvider;
-import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
-import org.apache.phoenix.exception.PhoenixIOException;
-import org.codehaus.jackson.map.ObjectMapper;
-import org.codehaus.jackson.type.TypeReference;
-
-import com.google.common.collect.Multimap;
-
-
-/**
- * Provides a facade over the Phoenix API to access HBase schema
- */
-public class PhoenixHBaseAccessor {
-  private static final Log LOG = LogFactory.getLog(PhoenixHBaseAccessor.class);
-
-  static final int PHOENIX_MAX_MUTATION_STATE_SIZE = 50000;
-  // Default stale data allowance set to 3 minutes, 2 minutes more than time
-  // it was collected. Also 2 minutes is the default aggregation interval at
-  // cluster and host levels.
-  static final long DEFAULT_OUT_OF_BAND_TIME_ALLOWANCE = 300000;
-  /**
-   * 22 metrics for 2hours in SECONDS (10 second data)
-   * => Reasonable upper bound on the limit such that our Precision 
calculation for a given time range makes sense.
-   */
-  private static final int METRICS_PER_MINUTE = 22;
-  private static final int POINTS_PER_MINUTE = 6;
-  public static int RESULTSET_LIMIT = (int)TimeUnit.HOURS.toMinutes(2) * 
METRICS_PER_MINUTE * POINTS_PER_MINUTE ;
-
-  static TimelineMetricReadHelper TIMELINE_METRIC_READ_HELPER = new 
TimelineMetricReadHelper();
-  static ObjectMapper mapper = new ObjectMapper();
-  static TypeReference<TreeMap<Long, Double>> metricValuesTypeRef = new 
TypeReference<TreeMap<Long, Double>>() {};
-
-  private final Configuration hbaseConf;
-  private final Configuration metricsConf;
-  private final RetryCounterFactory retryCounterFactory;
-  private final PhoenixConnectionProvider dataSource;
-  private final long outOfBandTimeAllowance;
-  private final int cacheSize;
-  private final boolean cacheEnabled;
-  private final BlockingQueue<TimelineMetrics> insertCache;
-  private ScheduledExecutorService scheduledExecutorService;
-  private MetricsCacheCommitterThread metricsCommiterThread;
-  private TimelineMetricsAggregatorSink aggregatorSink;
-  private final int cacheCommitInterval;
-  private final boolean skipBlockCacheForAggregatorsEnabled;
-  private final String timelineMetricsTablesDurability;
-  private final String timelineMetricsPrecisionTableDurability;
-  private TimelineMetricMetadataManager metadataManagerInstance;
-
-  static final String HSTORE_COMPACTION_CLASS_KEY =
-    "hbase.hstore.defaultengine.compactionpolicy.class";
-  static final String HSTORE_ENGINE_CLASS =
-    "hbase.hstore.engine.class";
-  static final String FIFO_COMPACTION_POLICY_CLASS =
-    "org.apache.hadoop.hbase.regionserver.compactions.FIFOCompactionPolicy";
-  static final String DATE_TIERED_COMPACTION_POLICY =
-    "org.apache.hadoop.hbase.regionserver.DateTieredStoreEngine";
-  static final String BLOCKING_STORE_FILES_KEY =
-    "hbase.hstore.blockingStoreFiles";
-
-  private Map<String, Integer> tableTTL = new HashMap<>();
-
-  private final TimelineMetricConfiguration configuration;
-  private List<InternalMetricsSource> rawMetricsSources = new ArrayList<>();
-
-  public PhoenixHBaseAccessor(PhoenixConnectionProvider dataSource) {
-    this(TimelineMetricConfiguration.getInstance(), dataSource);
-  }
-
-  // Test friendly construction since mock instrumentation is difficult to get
-  // working with hadoop mini cluster
-  PhoenixHBaseAccessor(TimelineMetricConfiguration configuration,
-                       PhoenixConnectionProvider dataSource) {
-    this.configuration = TimelineMetricConfiguration.getInstance();
-    try {
-      this.hbaseConf = configuration.getHbaseConf();
-      this.metricsConf = configuration.getMetricsConf();
-    } catch (Exception e) {
-      throw new ExceptionInInitializerError("Cannot initialize 
configuration.");
-    }
-    if (dataSource == null) {
-      dataSource = new DefaultPhoenixDataSource(hbaseConf);
-    }
-    this.dataSource = dataSource;
-
-    RESULTSET_LIMIT = metricsConf.getInt(GLOBAL_RESULT_LIMIT, RESULTSET_LIMIT);
-    try {
-      Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");
-    } catch (ClassNotFoundException e) {
-      LOG.error("Phoenix client jar not found in the classpath.", e);
-      throw new IllegalStateException(e);
-    }
-
-    this.retryCounterFactory = new 
RetryCounterFactory(metricsConf.getInt(GLOBAL_MAX_RETRIES, 10),
-      (int) SECONDS.toMillis(metricsConf.getInt(GLOBAL_RETRY_INTERVAL, 3)));
-    this.outOfBandTimeAllowance = 
metricsConf.getLong(OUT_OFF_BAND_DATA_TIME_ALLOWANCE,
-      DEFAULT_OUT_OF_BAND_TIME_ALLOWANCE);
-    this.cacheEnabled = 
Boolean.valueOf(metricsConf.get(TIMELINE_METRICS_CACHE_ENABLED, "true"));
-    this.cacheSize = 
Integer.valueOf(metricsConf.get(TIMELINE_METRICS_CACHE_SIZE, "150"));
-    this.cacheCommitInterval = 
Integer.valueOf(metricsConf.get(TIMELINE_METRICS_CACHE_COMMIT_INTERVAL, "3"));
-    this.insertCache = new ArrayBlockingQueue<TimelineMetrics>(cacheSize);
-    this.skipBlockCacheForAggregatorsEnabled = 
metricsConf.getBoolean(AGGREGATORS_SKIP_BLOCK_CACHE, false);
-    this.timelineMetricsTablesDurability = 
metricsConf.get(TIMELINE_METRICS_AGGREGATE_TABLES_DURABILITY, "");
-    this.timelineMetricsPrecisionTableDurability = 
metricsConf.get(TIMELINE_METRICS_PRECISION_TABLE_DURABILITY, "");
-
-    tableTTL.put(METRICS_RECORD_TABLE_NAME, 
metricsConf.getInt(PRECISION_TABLE_TTL, 1 * 86400));  // 1 day
-    tableTTL.put(CONTAINER_METRICS_TABLE_NAME, 
metricsConf.getInt(CONTAINER_METRICS_TTL, 30 * 86400));  // 30 days
-    tableTTL.put(METRICS_AGGREGATE_MINUTE_TABLE_NAME, 
metricsConf.getInt(HOST_MINUTE_TABLE_TTL, 7 * 86400)); //7 days
-    tableTTL.put(METRICS_AGGREGATE_HOURLY_TABLE_NAME, 
metricsConf.getInt(HOST_HOUR_TABLE_TTL, 30 * 86400)); //30 days
-    tableTTL.put(METRICS_AGGREGATE_DAILY_TABLE_NAME, 
metricsConf.getInt(HOST_DAILY_TABLE_TTL, 365 * 86400)); //1 year
-    tableTTL.put(METRICS_CLUSTER_AGGREGATE_TABLE_NAME, 
metricsConf.getInt(CLUSTER_SECOND_TABLE_TTL, 7 * 86400)); //7 days
-    tableTTL.put(METRICS_CLUSTER_AGGREGATE_MINUTE_TABLE_NAME, 
metricsConf.getInt(CLUSTER_MINUTE_TABLE_TTL, 30 * 86400)); //30 days
-    tableTTL.put(METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME, 
metricsConf.getInt(CLUSTER_HOUR_TABLE_TTL, 365 * 86400)); //1 year
-    tableTTL.put(METRICS_CLUSTER_AGGREGATE_DAILY_TABLE_NAME, 
metricsConf.getInt(CLUSTER_DAILY_TABLE_TTL, 730 * 86400)); //2 years
-
-    if (cacheEnabled) {
-      LOG.debug("Initialising and starting metrics cache committer thread...");
-      metricsCommiterThread = new MetricsCacheCommitterThread(this);
-      scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
-      scheduledExecutorService.scheduleWithFixedDelay(metricsCommiterThread, 
0, cacheCommitInterval, TimeUnit.SECONDS);
-    }
-
-    Class<? extends TimelineMetricsAggregatorSink> metricSinkClass =
-        metricsConf.getClass(TIMELINE_METRIC_AGGREGATOR_SINK_CLASS, null,
-            TimelineMetricsAggregatorSink.class);
-    if (metricSinkClass != null) {
-      aggregatorSink = ReflectionUtils.newInstance(metricSinkClass, 
metricsConf);
-      LOG.info("Initialized aggregator sink class " + metricSinkClass);
-    }
-
-    List<ExternalSinkProvider> externalSinkProviderList = 
configuration.getExternalSinkProviderList();
-    InternalSourceProvider internalSourceProvider = 
configuration.getInternalSourceProvider();
-    if (!externalSinkProviderList.isEmpty()) {
-      for (ExternalSinkProvider externalSinkProvider : 
externalSinkProviderList) {
-        ExternalMetricsSink rawMetricsSink = 
externalSinkProvider.getExternalMetricsSink(RAW_METRICS);
-        int interval = 
configuration.getExternalSinkInterval(externalSinkProvider.getClass().getSimpleName(),
 RAW_METRICS);
-        if (interval == -1) {
-          interval = cacheCommitInterval;
-        }
-        
rawMetricsSources.add(internalSourceProvider.getInternalMetricsSource(RAW_METRICS,
 interval, rawMetricsSink));
-      }
-    }
-    TIMELINE_METRIC_READ_HELPER = new 
TimelineMetricReadHelper(this.metadataManagerInstance);
-  }
-
-  public boolean isInsertCacheEmpty() {
-    return insertCache.isEmpty();
-  }
-
-  public void commitMetricsFromCache() {
-    LOG.debug("Clearing metrics cache");
-    List<TimelineMetrics> metricsList = new 
ArrayList<TimelineMetrics>(insertCache.size());
-    if (!insertCache.isEmpty()) {
-      insertCache.drainTo(metricsList); // More performant than poll
-    }
-    if (metricsList.size() > 0) {
-      commitMetrics(metricsList);
-      if (!rawMetricsSources.isEmpty()) {
-        for (InternalMetricsSource rawMetricsSource : rawMetricsSources) {
-          rawMetricsSource.publishTimelineMetrics(metricsList);
-        }
-      }
-    }
-  }
-
-  public void commitMetrics(TimelineMetrics timelineMetrics) {
-    commitMetrics(Collections.singletonList(timelineMetrics));
-  }
-
-  public void commitMetrics(Collection<TimelineMetrics> 
timelineMetricsCollection) {
-    LOG.debug("Committing metrics to store");
-    Connection conn = null;
-    PreparedStatement metricRecordStmt = null;
-
-    try {
-      conn = getConnection();
-      metricRecordStmt = conn.prepareStatement(String.format(
-              UPSERT_METRICS_SQL, METRICS_RECORD_TABLE_NAME));
-      for (TimelineMetrics timelineMetrics : timelineMetricsCollection) {
-        for (TimelineMetric metric : timelineMetrics.getMetrics()) {
-
-          metricRecordStmt.clearParameters();
-
-          if (LOG.isTraceEnabled()) {
-            LOG.trace("host: " + metric.getHostName() + ", " +
-                    "metricName = " + metric.getMetricName() + ", " +
-                    "values: " + metric.getMetricValues());
-          }
-          double[] aggregates = AggregatorUtils.calculateAggregates(
-                  metric.getMetricValues());
-
-          byte[] uuid = metadataManagerInstance.getUuid(metric);
-          if (uuid == null) {
-            LOG.error("Error computing UUID for metric. Cannot write metrics : 
" + metric.toString());
-            continue;
-          }
-          metricRecordStmt.setBytes(1, uuid);
-          metricRecordStmt.setLong(2, metric.getStartTime());
-          metricRecordStmt.setDouble(3, aggregates[0]);
-          metricRecordStmt.setDouble(4, aggregates[1]);
-          metricRecordStmt.setDouble(5, aggregates[2]);
-          metricRecordStmt.setLong(6, (long) aggregates[3]);
-          String json = 
TimelineUtils.dumpTimelineRecordtoJSON(metric.getMetricValues());
-          metricRecordStmt.setString(7, json);
-
-          try {
-            metricRecordStmt.executeUpdate();
-          } catch (SQLException sql) {
-            LOG.error("Failed on insert records to store.", sql);
-          }
-        }
-      }
-
-      // commit() blocked if HBase unavailable
-      conn.commit();
-    } catch (Exception exception){
-      exception.printStackTrace();
-    }
-    finally {
-      if (metricRecordStmt != null) {
-        try {
-          metricRecordStmt.close();
-        } catch (SQLException e) {
-          // Ignore
-        }
-      }
-      if (conn != null) {
-        try {
-          conn.close();
-        } catch (SQLException sql) {
-          // Ignore
-        }
-      }
-    }
-  }
-
-  private static TimelineMetric getLastTimelineMetricFromResultSet(ResultSet 
rs)
-      throws SQLException, IOException {
-    TimelineMetric metric = 
TIMELINE_METRIC_READ_HELPER.getTimelineMetricCommonsFromResultSet(rs);
-    
metric.setMetricValues(readLastMetricValueFromJSON(rs.getString("METRICS")));
-    return metric;
-  }
-
-  private static TreeMap<Long, Double> readLastMetricValueFromJSON(String json)
-      throws IOException {
-    TreeMap<Long, Double> values = readMetricFromJSON(json);
-    Long lastTimeStamp = values.lastKey();
-
-    TreeMap<Long, Double> valueMap = new TreeMap<Long, Double>();
-    valueMap.put(lastTimeStamp, values.get(lastTimeStamp));
-    return valueMap;
-  }
-
-  @SuppressWarnings("unchecked")
-  public static TreeMap<Long, Double> readMetricFromJSON(String json) throws 
IOException {
-    return mapper.readValue(json, metricValuesTypeRef);
-  }
-
-  private Connection getConnectionRetryingOnException()
-    throws SQLException, InterruptedException {
-    RetryCounter retryCounter = retryCounterFactory.create();
-    while (true) {
-      try{
-        return getConnection();
-      } catch (SQLException e) {
-        if(!retryCounter.shouldRetry()){
-          LOG.error("HBaseAccessor getConnection failed after "
-            + retryCounter.getMaxAttempts() + " attempts");
-          throw e;
-        }
-      }
-      retryCounter.sleepUntilNextRetry();
-    }
-  }
-
-  /**
-   * Get JDBC connection to HBase store. Assumption is that the hbase
-   * configuration is present on the classpath and loaded by the caller into
-   * the Configuration object.
-   * Phoenix already caches the HConnection between the client and HBase
-   * cluster.
-   *
-   * @return @java.sql.Connection
-   */
-  public Connection getConnection() throws SQLException {
-    return dataSource.getConnection();
-  }
-
-  /**
-   * Unit test purpose only for now.
-   * @return @HBaseAdmin
-   * @throws IOException
-   */
-  Admin getHBaseAdmin() throws IOException {
-    return dataSource.getHBaseAdmin();
-  }
-
-  protected void initMetricSchema() {
-    Connection conn = null;
-    Statement stmt = null;
-
-    String encoding = metricsConf.get(HBASE_ENCODING_SCHEME, DEFAULT_ENCODING);
-    String compression = metricsConf.get(HBASE_COMPRESSION_SCHEME, 
DEFAULT_TABLE_COMPRESSION);
-
-
-    try {
-      LOG.info("Initializing metrics schema...");
-      conn = getConnectionRetryingOnException();
-      stmt = conn.createStatement();
-
-      // Metadata
-      String metadataSql = String.format(CREATE_METRICS_METADATA_TABLE_SQL,
-        encoding, compression);
-      stmt.executeUpdate(metadataSql);
-      stmt.executeUpdate(ALTER_METRICS_METADATA_TABLE);
-
-      String hostedAppSql = 
String.format(CREATE_HOSTED_APPS_METADATA_TABLE_SQL,
-        encoding, compression);
-      stmt.executeUpdate(hostedAppSql);
-
-      //Host Instances table
-      String hostedInstancesSql = String.format(CREATE_INSTANCE_HOST_TABLE_SQL,
-        encoding, compression);
-      stmt.executeUpdate(hostedInstancesSql);
-
-      // Container Metrics
-      stmt.executeUpdate( String.format(CREATE_CONTAINER_METRICS_TABLE_SQL,
-        encoding, tableTTL.get(CONTAINER_METRICS_TABLE_NAME), compression));
-
-      // Host level
-      String precisionSql = String.format(CREATE_METRICS_TABLE_SQL,
-        encoding, tableTTL.get(METRICS_RECORD_TABLE_NAME), compression);
-      stmt.executeUpdate(precisionSql);
-
-      String hostMinuteAggregrateSql = 
String.format(CREATE_METRICS_AGGREGATE_TABLE_SQL,
-        METRICS_AGGREGATE_MINUTE_TABLE_NAME, encoding,
-        tableTTL.get(METRICS_AGGREGATE_MINUTE_TABLE_NAME),
-        compression);
-      stmt.executeUpdate(hostMinuteAggregrateSql);
-
-      stmt.executeUpdate(String.format(CREATE_METRICS_AGGREGATE_TABLE_SQL,
-        METRICS_AGGREGATE_HOURLY_TABLE_NAME, encoding,
-        tableTTL.get(METRICS_AGGREGATE_HOURLY_TABLE_NAME),
-        compression));
-      stmt.executeUpdate(String.format(CREATE_METRICS_AGGREGATE_TABLE_SQL,
-        METRICS_AGGREGATE_DAILY_TABLE_NAME, encoding,
-        tableTTL.get(METRICS_AGGREGATE_DAILY_TABLE_NAME),
-        compression));
-
-      // Cluster level
-      String aggregateSql = 
String.format(CREATE_METRICS_CLUSTER_AGGREGATE_TABLE_SQL,
-        METRICS_CLUSTER_AGGREGATE_TABLE_NAME, encoding,
-        tableTTL.get(METRICS_CLUSTER_AGGREGATE_TABLE_NAME),
-        compression);
-
-      stmt.executeUpdate(aggregateSql);
-      
stmt.executeUpdate(String.format(CREATE_METRICS_CLUSTER_AGGREGATE_GROUPED_TABLE_SQL,
-        METRICS_CLUSTER_AGGREGATE_MINUTE_TABLE_NAME, encoding,
-        tableTTL.get(METRICS_CLUSTER_AGGREGATE_MINUTE_TABLE_NAME),
-        compression));
-      
stmt.executeUpdate(String.format(CREATE_METRICS_CLUSTER_AGGREGATE_GROUPED_TABLE_SQL,
-        METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME, encoding,
-        tableTTL.get(METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME),
-        compression));
-      
stmt.executeUpdate(String.format(CREATE_METRICS_CLUSTER_AGGREGATE_GROUPED_TABLE_SQL,
-        METRICS_CLUSTER_AGGREGATE_DAILY_TABLE_NAME, encoding,
-        tableTTL.get(METRICS_CLUSTER_AGGREGATE_DAILY_TABLE_NAME),
-        compression));
-
-
-      conn.commit();
-
-      LOG.info("Metrics schema initialized.");
-    } catch (SQLException | InterruptedException sql) {
-      LOG.error("Error creating Metrics Schema in HBase using Phoenix.", sql);
-      throw new MetricsSystemInitializationException(
-        "Error creating Metrics Schema in HBase using Phoenix.", sql);
-    } finally {
-      if (stmt != null) {
-        try {
-          stmt.close();
-        } catch (SQLException e) {
-          // Ignore
-        }
-      }
-      if (conn != null) {
-        try {
-          conn.close();
-        } catch (SQLException e) {
-          // Ignore
-        }
-      }
-    }
-  }
-
-  protected void initPoliciesAndTTL() {
-    Admin hBaseAdmin = null;
-    try {
-      hBaseAdmin = dataSource.getHBaseAdmin();
-    } catch (IOException e) {
-      LOG.warn("Unable to initialize HBaseAdmin for setting policies.", e);
-    }
-
-    TableName[] tableNames = null;
-    if (hBaseAdmin != null) {
-      try {
-        tableNames = hBaseAdmin.listTableNames(PHOENIX_TABLES_REGEX_PATTERN, 
false);
-      } catch (IOException e) {
-        LOG.warn("Unable to get table names from HBaseAdmin for setting 
policies.", e);
-        return;
-      }
-      if (tableNames == null || tableNames.length == 0) {
-        LOG.warn("Unable to get table names from HBaseAdmin for setting 
policies.");
-        return;
-      }
-      for (String tableName : PHOENIX_TABLES) {
-        try {
-          boolean modifyTable = false;
-          Optional<TableName> tableNameOptional = Arrays.stream(tableNames)
-            .filter(t -> tableName.equals(t.getNameAsString())).findFirst();
-
-          TableDescriptor tableDescriptor = null;
-          if (tableNameOptional.isPresent()) {
-            tableDescriptor = 
hBaseAdmin.getTableDescriptor(tableNameOptional.get());
-          }
-
-          if (tableDescriptor == null) {
-            LOG.warn("Unable to get table descriptor for " + tableName);
-            continue;
-          }
-
-          // @TableDescriptor is immutable by design
-          TableDescriptorBuilder tableDescriptorBuilder =
-            TableDescriptorBuilder.newBuilder(tableDescriptor);
-
-          //Set normalizer preferences
-          boolean enableNormalizer = 
hbaseConf.getBoolean("hbase.normalizer.enabled", false);
-          if (enableNormalizer ^ tableDescriptor.isNormalizationEnabled()) {
-            tableDescriptorBuilder.setNormalizationEnabled(enableNormalizer);
-            LOG.info("Normalizer set to " + enableNormalizer + " for " + 
tableName);
-            modifyTable = true;
-          }
-
-          //Set durability preferences
-          boolean durabilitySettingsModified = 
setDurabilityForTable(tableName, tableDescriptorBuilder);
-          modifyTable = modifyTable || durabilitySettingsModified;
-
-          //Set compaction policy preferences
-          boolean compactionPolicyModified = false;
-          compactionPolicyModified = setCompactionPolicyForTable(tableName, 
tableDescriptorBuilder);
-          modifyTable = modifyTable || compactionPolicyModified;
-
-          // Change TTL setting to match user configuration
-          ColumnFamilyDescriptor[] columnFamilyDescriptors = 
tableDescriptor.getColumnFamilies();
-          if (columnFamilyDescriptors != null) {
-            for (ColumnFamilyDescriptor familyDescriptor : 
columnFamilyDescriptors) {
-              int ttlValue = familyDescriptor.getTimeToLive();
-              if (ttlValue != tableTTL.get(tableName)) {
-                ColumnFamilyDescriptorBuilder familyDescriptorBuilder =
-                  ColumnFamilyDescriptorBuilder.newBuilder(familyDescriptor);
-
-                familyDescriptorBuilder.setTimeToLive(tableTTL.get(tableName));
-
-                LOG.info("Setting TTL on table: " + tableName + " to : " +
-                  tableTTL.get(tableName) + " seconds.");
-
-                hBaseAdmin.modifyColumnFamily(tableNameOptional.get(), 
familyDescriptorBuilder.build());
-                // modifyTable = true;
-              }
-            }
-          }
-
-          // Persist only if anything changed
-          if (modifyTable) {
-            hBaseAdmin.modifyTable(tableNameOptional.get(), 
tableDescriptorBuilder.build());
-          }
-
-        } catch (IOException e) {
-          LOG.error("Failed setting policies for " + tableName, e);
-        }
-      }
-      try {
-        hBaseAdmin.close();
-      } catch (IOException e) {
-        LOG.warn("Exception on HBaseAdmin close.", e);
-      }
-    }
-  }
-
-  private boolean setDurabilityForTable(String tableName, 
TableDescriptorBuilder tableDescriptor) {
-
-    boolean modifyTable = false;
-    // Set WAL preferences
-    if (METRICS_RECORD_TABLE_NAME.equals(tableName)) {
-      if (!timelineMetricsPrecisionTableDurability.isEmpty()) {
-        LOG.info("Setting WAL option " + 
timelineMetricsPrecisionTableDurability + " for table : " + tableName);
-        boolean validDurability = true;
-        if ("SKIP_WAL".equals(timelineMetricsPrecisionTableDurability)) {
-          tableDescriptor.setDurability(Durability.SKIP_WAL);
-        } else if ("SYNC_WAL".equals(timelineMetricsPrecisionTableDurability)) 
{
-          tableDescriptor.setDurability(Durability.SYNC_WAL);
-        } else if 
("ASYNC_WAL".equals(timelineMetricsPrecisionTableDurability)) {
-          tableDescriptor.setDurability(Durability.ASYNC_WAL);
-        } else if 
("FSYNC_WAL".equals(timelineMetricsPrecisionTableDurability)) {
-          tableDescriptor.setDurability(Durability.FSYNC_WAL);
-        } else {
-          LOG.info("Unknown value for " + 
TIMELINE_METRICS_PRECISION_TABLE_DURABILITY + " : " + 
timelineMetricsPrecisionTableDurability);
-          validDurability = false;
-        }
-        if (validDurability) {
-          modifyTable = true;
-        }
-      }
-    } else {
-      if (!timelineMetricsTablesDurability.isEmpty()) {
-        LOG.info("Setting WAL option " + timelineMetricsTablesDurability + " 
for table : " + tableName);
-        boolean validDurability = true;
-        if ("SKIP_WAL".equals(timelineMetricsTablesDurability)) {
-          tableDescriptor.setDurability(Durability.SKIP_WAL);
-        } else if ("SYNC_WAL".equals(timelineMetricsTablesDurability)) {
-          tableDescriptor.setDurability(Durability.SYNC_WAL);
-        } else if ("ASYNC_WAL".equals(timelineMetricsTablesDurability)) {
-          tableDescriptor.setDurability(Durability.ASYNC_WAL);
-        } else if ("FSYNC_WAL".equals(timelineMetricsTablesDurability)) {
-          tableDescriptor.setDurability(Durability.FSYNC_WAL);
-        } else {
-          LOG.info("Unknown value for " + 
TIMELINE_METRICS_AGGREGATE_TABLES_DURABILITY + " : " + 
timelineMetricsTablesDurability);
-          validDurability = false;
-        }
-        if (validDurability) {
-          modifyTable = true;
-        }
-      }
-    }
-    return modifyTable;
-  }
-
-  private boolean setCompactionPolicyForTable(String tableName, 
TableDescriptorBuilder tableDescriptorBuilder) {
-
-    boolean modifyTable = false;
-
-    String compactionPolicyKey = 
metricsConf.get(TIMELINE_METRICS_HBASE_AGGREGATE_TABLE_COMPACTION_POLICY_KEY,
-      HSTORE_ENGINE_CLASS);
-    String compactionPolicyClass = 
metricsConf.get(TIMELINE_METRICS_HBASE_AGGREGATE_TABLE_COMPACTION_POLICY_CLASS,
-      DATE_TIERED_COMPACTION_POLICY);
-    int blockingStoreFiles = 
hbaseConf.getInt(TIMELINE_METRICS_AGGREGATE_TABLE_HBASE_BLOCKING_STORE_FILES, 
60);
-
-    if (tableName.equals(METRICS_RECORD_TABLE_NAME)) {
-      compactionPolicyKey = 
metricsConf.get(TIMELINE_METRICS_HBASE_PRECISION_TABLE_COMPACTION_POLICY_KEY,
-        HSTORE_COMPACTION_CLASS_KEY);
-      compactionPolicyClass = 
metricsConf.get(TIMELINE_METRICS_HBASE_PRECISION_TABLE_COMPACTION_POLICY_CLASS,
-        FIFO_COMPACTION_POLICY_CLASS);
-      blockingStoreFiles = 
hbaseConf.getInt(TIMELINE_METRICS_PRECISION_TABLE_HBASE_BLOCKING_STORE_FILES, 
1000);
-    }
-    
-    if (StringUtils.isEmpty(compactionPolicyKey) || 
StringUtils.isEmpty(compactionPolicyClass)) {
-      // Default blockingStoreFiles = 300
-      modifyTable = setHbaseBlockingStoreFiles(tableDescriptorBuilder, 
tableName, 300);
-    } else {
-      tableDescriptorBuilder.setValue(compactionPolicyKey, 
compactionPolicyClass);
-      tableDescriptorBuilder.removeValue(HSTORE_ENGINE_CLASS.getBytes());
-      
tableDescriptorBuilder.removeValue(HSTORE_COMPACTION_CLASS_KEY.getBytes());
-      setHbaseBlockingStoreFiles(tableDescriptorBuilder, tableName, 
blockingStoreFiles);
-      modifyTable = true;
-    }
-
-    return modifyTable;
-  }
-
-  private boolean setHbaseBlockingStoreFiles(TableDescriptorBuilder 
tableDescriptor,
-                                             String tableName, int value) {
-    int blockingStoreFiles = hbaseConf.getInt(HBASE_BLOCKING_STORE_FILES, 
value);
-    if (blockingStoreFiles != value) {
-      blockingStoreFiles = value;
-      tableDescriptor.setValue(BLOCKING_STORE_FILES_KEY, 
String.valueOf(value));
-      LOG.info("Setting config property " + BLOCKING_STORE_FILES_KEY +
-        " = " + blockingStoreFiles + " for " + tableName);
-      return true;
-    }
-    return false;
-  }
-
-  protected String getSplitPointsStr(String splitPoints) {
-    if (StringUtils.isEmpty(splitPoints.trim())) {
-      return "";
-    }
-    String[] points = splitPoints.split(",");
-    if (points.length > 0) {
-      StringBuilder sb = new StringBuilder(" SPLIT ON ");
-      sb.append("(");
-      for (String point : points) {
-        sb.append("'");
-        sb.append(point.trim());
-        sb.append("'");
-        sb.append(",");
-      }
-      sb.deleteCharAt(sb.length() - 1);
-      sb.append(")");
-      return sb.toString();
-    }
-    return "";
-  }
-
-  /**
-   * Insert precision YARN container data.
-   */
-  public void insertContainerMetrics(List<ContainerMetric> metrics)
-      throws SQLException, IOException {
-    Connection conn = getConnection();
-    PreparedStatement metricRecordStmt = null;
-
-    try {
-      metricRecordStmt = conn.prepareStatement(
-          String.format(UPSERT_CONTAINER_METRICS_SQL, 
CONTAINER_METRICS_TABLE_NAME));
-      for (ContainerMetric metric : metrics) {
-        metricRecordStmt.clearParameters();
-        metricRecordStmt.setString(1, 
ContainerId.fromString(metric.getContainerId())
-            .getApplicationAttemptId().getApplicationId().toString());
-        metricRecordStmt.setString(2, metric.getContainerId());
-        metricRecordStmt.setTimestamp(3, new Timestamp(metric.getStartTime()));
-        metricRecordStmt.setTimestamp(4, new 
Timestamp(metric.getFinishTime()));
-        metricRecordStmt.setLong(5, metric.getFinishTime() - 
metric.getStartTime());
-        metricRecordStmt.setString(6, metric.getHostName());
-        metricRecordStmt.setInt(7, metric.getExitCode());
-        metricRecordStmt.setLong(8, metric.getLocalizationDuration());
-        metricRecordStmt.setLong(9, metric.getLaunchDuration());
-        metricRecordStmt.setDouble(10, (double) metric.getPmemLimit() / 1024);
-        metricRecordStmt.setDouble(11,
-            ((double) metric.getPmemLimit() / 1024) * (metric.getFinishTime()
-                - metric.getStartTime()));
-        metricRecordStmt.setDouble(12, (double) metric.getVmemLimit() / 1024);
-        metricRecordStmt.setDouble(13, (double) metric.getPmemUsedMin() / 
1024);
-        metricRecordStmt.setDouble(14, (double) metric.getPmemUsedMax() / 
1024);
-        metricRecordStmt.setDouble(15, (double) metric.getPmemUsedAvg() / 
1024);
-        metricRecordStmt.setDouble(16, (double) metric.getPmem50Pct() / 1024);
-        metricRecordStmt.setDouble(17, (double) metric.getPmem75Pct() / 1024);
-        metricRecordStmt.setDouble(18, (double) metric.getPmem90Pct() / 1024);
-        metricRecordStmt.setDouble(19, (double) metric.getPmem95Pct()/ 1024);
-        metricRecordStmt.setDouble(20, (double) metric.getPmem99Pct() / 1024);
-        metricRecordStmt.setDouble(21, (double) metric.getPmemLimit() / 1024
-            - (double) metric.getPmemUsedMax() / 1024);
-        metricRecordStmt.setDouble(22, ((double) metric.getPmemLimit() / 1024
-            - (double) metric.getPmemUsedMax() / 1024) * 
(metric.getFinishTime()
-            - metric.getStartTime()));
-
-        try {
-          metricRecordStmt.executeUpdate();
-        } catch (SQLException sql) {
-          LOG.error("Failed on insert records to store.", sql);
-        }
-      }
-
-      conn.commit();
-    } finally {
-      if (metricRecordStmt != null) {
-        try {
-          metricRecordStmt.close();
-        } catch (SQLException e) {
-          // Ignore
-        }
-      }
-      if (conn != null) {
-        try {
-          conn.close();
-        } catch (SQLException sql) {
-          // Ignore
-        }
-      }
-    }
-  }
-
-  /**
-   * Insert precision data.
-   */
-  public void insertMetricRecordsWithMetadata(TimelineMetricMetadataManager 
metadataManager,
-                                              TimelineMetrics metrics, boolean 
skipCache) throws SQLException, IOException {
-    List<TimelineMetric> timelineMetrics = metrics.getMetrics();
-    if (timelineMetrics == null || timelineMetrics.isEmpty()) {
-      LOG.debug("Empty metrics insert request.");
-      return;
-    }
-    for (Iterator<TimelineMetric> iterator = timelineMetrics.iterator(); 
iterator.hasNext();) {
-
-      TimelineMetric tm = iterator.next();
-
-      boolean acceptMetric = TimelineMetricsFilter.acceptMetric(tm);
-
-      // Write to metadata cache on successful write to store
-      if (metadataManager != null) {
-        metadataManager.putIfModifiedTimelineMetricMetadata(
-                metadataManager.getTimelineMetricMetadata(tm, acceptMetric));
-
-        metadataManager.putIfModifiedHostedAppsMetadata(
-                tm.getHostName(), tm.getAppId());
-
-        if (!tm.getAppId().equals("FLUME_HANDLER")) {
-          
metadataManager.putIfModifiedHostedInstanceMetadata(tm.getInstanceId(), 
tm.getHostName());
-        }
-      }
-      if (!acceptMetric) {
-        iterator.remove();
-      }
-    }
-
-    if  (!skipCache && cacheEnabled) {
-      LOG.debug("Adding metrics to cache");
-      if (insertCache.size() >= cacheSize) {
-        commitMetricsFromCache();
-      }
-      try {
-        insertCache.put(metrics); // blocked while the queue is full
-      } catch (InterruptedException e) {
-        e.printStackTrace();
-      }
-    } else {
-      LOG.debug("Skipping metrics cache");
-      commitMetrics(metrics);
-    }
-  }
-
-  public void insertMetricRecords(TimelineMetrics metrics, boolean skipCache) 
throws SQLException, IOException {
-    insertMetricRecordsWithMetadata(null, metrics, skipCache);
-  }
-
-  public void insertMetricRecords(TimelineMetrics metrics) throws 
SQLException, IOException {
-    insertMetricRecords(metrics, false);
-  }
-
-
-  @SuppressWarnings("unchecked")
-  public TimelineMetrics getMetricRecords(
-    final Condition condition, Multimap<String, List<Function>> 
metricFunctions)
-    throws SQLException, IOException {
-
-    validateConditionIsNotEmpty(condition);
-
-    Connection conn = getConnection();
-    PreparedStatement stmt = null;
-    ResultSet rs = null;
-    TimelineMetrics metrics = new TimelineMetrics();
-
-    try {
-      //get latest
-      if (condition.isPointInTime()){
-        getLatestMetricRecords(condition, conn, metrics);
-      } else {
-        if (condition.getEndTime() >= condition.getStartTime()) {
-          stmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition);
-          rs = stmt.executeQuery();
-          while (rs.next()) {
-            appendMetricFromResultSet(metrics, condition, metricFunctions, rs);
-          }
-        } else {
-          LOG.warn("Skipping metrics query because endTime < startTime");
-        }
-      }
-
-    } catch (PhoenixIOException pioe) {
-      Throwable pioe2 = pioe.getCause();
-      // Need to find out if this is exception "Could not find hash cache
-      // for joinId" or another PhoenixIOException
-      if (pioe2 instanceof PhoenixIOException &&
-        pioe2.getCause() instanceof DoNotRetryIOException) {
-        String className = null;
-        for (StackTraceElement ste : pioe2.getCause().getStackTrace()) {
-          className = ste.getClassName();
-        }
-
-        if (className != null && className.equals("HashJoinRegionScanner")) {
-          LOG.error("The cache might have expired and have been removed. Try 
to" +
-            " increase the cache size by setting bigger value for " +
-            "phoenix.coprocessor.maxMetaDataCacheSize in ams-hbase-site 
config." +
-            " Falling back to sort-merge join algorithm.");
-          PhoenixTransactSQL.setSortMergeJoinEnabled(true);
-        }
-      }
-      throw pioe;
-    } catch (RuntimeException ex) {
-      // We need to find out if this is a real IO exception
-      // or exception "maxStamp is smaller than minStamp"
-      // which is thrown in hbase TimeRange.java
-      Throwable io = ex.getCause();
-      String className = null;
-      if (io != null) {
-        for (StackTraceElement ste : io.getStackTrace()) {
-          className = ste.getClassName();
-        }
-      }
-      if (className != null && className.equals("TimeRange")) {
-        // This is "maxStamp is smaller than minStamp" exception
-        // Log error and return empty metrics
-        LOG.debug(io);
-        return new TimelineMetrics();
-      } else {
-        throw ex;
-      }
-
-    } finally {
-      if (rs != null) {
-        try {
-          rs.close();
-        } catch (SQLException e) {
-          // Ignore
-        }
-      }
-      if (stmt != null) {
-        try {
-          stmt.close();
-        } catch (SQLException e) {
-          // Ignore
-        }
-      }
-      if (conn != null) {
-        try {
-          conn.close();
-        } catch (SQLException sql) {
-          // Ignore
-        }
-      }
-    }
-
-    LOG.debug("Metrics records size: " + metrics.getMetrics().size());
-    return metrics;
-  }
-
-  /**
-   * Apply aggregate function to the result if supplied else get precision
-   * or aggregate data with default function applied.
-   */
-  private void appendMetricFromResultSet(TimelineMetrics metrics, Condition 
condition,
-                                         Multimap<String, List<Function>> 
metricFunctions,
-                                         ResultSet rs) throws SQLException, 
IOException {
-    byte[] uuid = rs.getBytes("UUID");
-    String metricName = metadataManagerInstance.getMetricNameFromUuid(uuid);
-    Collection<List<Function>> functionList = 
findMetricFunctions(metricFunctions, metricName);
-
-    for (List<Function> functions : functionList) {
-      // Apply aggregation function if present
-      if ((functions != null && !functions.isEmpty())) {
-        if (functions.size() > 1) {
-          throw new IllegalArgumentException("Multiple aggregate functions not 
supported.");
-        }
-        for (Function f : functions) {
-          if (f.getReadFunction() == Function.ReadFunction.VALUE) {
-            getTimelineMetricsFromResultSet(metrics, f, condition, rs);
-          } else {
-            SingleValuedTimelineMetric metric =
-              
TIMELINE_METRIC_READ_HELPER.getAggregatedTimelineMetricFromResultSet(rs, f);
-
-            if (condition.isGrouped()) {
-              metrics.addOrMergeTimelineMetric(metric);
-            } else {
-              metrics.getMetrics().add(metric.getTimelineMetric());
-            }
-          }
-        }
-      } else {
-        // No aggregation requested
-        // Execution never goes here, function always contain at least 1 
element
-        getTimelineMetricsFromResultSet(metrics, null, condition, rs);
-      }
-    }
-  }
-
-  private void getTimelineMetricsFromResultSet(TimelineMetrics metrics, 
Function f, Condition condition, ResultSet rs) throws SQLException, IOException 
{
-    if (condition.getPrecision().equals(Precision.SECONDS)) {
-      TimelineMetric metric = 
TIMELINE_METRIC_READ_HELPER.getTimelineMetricFromResultSet(rs);
-      if (f != null && f.getSuffix() != null) { //Case : Requesting "._rate" 
for precision data
-        metric.setMetricName(metric.getMetricName() + f.getSuffix());
-      }
-      if (condition.isGrouped()) {
-        metrics.addOrMergeTimelineMetric(metric);
-      } else {
-        metrics.getMetrics().add(metric);
-      }
-
-    } else {
-      SingleValuedTimelineMetric metric =
-        
TIMELINE_METRIC_READ_HELPER.getAggregatedTimelineMetricFromResultSet(rs, f);
-      if (condition.isGrouped()) {
-        metrics.addOrMergeTimelineMetric(metric);
-      } else {
-        metrics.getMetrics().add(metric.getTimelineMetric());
-      }
-    }
-  }
-
-  private void getLatestMetricRecords(Condition condition, Connection conn,
-                                      TimelineMetrics metrics) throws 
SQLException, IOException {
-
-    validateConditionIsNotEmpty(condition);
-
-    PreparedStatement stmt = 
PhoenixTransactSQL.prepareGetLatestMetricSqlStmt(conn, condition);
-    ResultSet rs = null;
-    try {
-      rs = stmt.executeQuery();
-      while (rs.next()) {
-        TimelineMetric metric = getLastTimelineMetricFromResultSet(rs);
-        metrics.getMetrics().add(metric);
-      }
-    } finally {
-      if (rs != null) {
-        try {
-          rs.close();
-        } catch (SQLException e) {
-          // Ignore
-        }
-      }
-      if (stmt != null) {
-        stmt.close();
-      }
-    }
-  }
-
-  /**
-   * Get metrics aggregated across hosts.
-   *
-   * @param condition @Condition
-   * @return @TimelineMetrics
-   * @throws SQLException
-   */
-  public TimelineMetrics getAggregateMetricRecords(final Condition condition,
-      Multimap<String, List<Function>> metricFunctions) throws SQLException {
-
-    validateConditionIsNotEmpty(condition);
-
-    Connection conn = getConnection();
-    PreparedStatement stmt = null;
-    ResultSet rs = null;
-    TimelineMetrics metrics = new TimelineMetrics();
-
-    try {
-      //get latest
-      if(condition.isPointInTime()) {
-        getLatestAggregateMetricRecords(condition, conn, metrics, 
metricFunctions);
-      } else {
-        stmt = PhoenixTransactSQL.prepareGetAggregateSqlStmt(conn, condition);
-
-        rs = stmt.executeQuery();
-        while (rs.next()) {
-          appendAggregateMetricFromResultSet(metrics, condition, 
metricFunctions, rs);
-        }
-      }
-    } finally {
-      if (rs != null) {
-        try {
-          rs.close();
-        } catch (SQLException e) {
-          // Ignore
-        }
-      }
-      if (stmt != null) {
-        try {
-          stmt.close();
-        } catch (SQLException e) {
-          // Ignore
-        }
-      }
-      if (conn != null) {
-        try {
-          conn.close();
-        } catch (SQLException sql) {
-          // Ignore
-        }
-      }
-    }
-
-    LOG.debug("Aggregate records size: " + metrics.getMetrics().size());
-    return metrics;
-  }
-
-  private void appendAggregateMetricFromResultSet(TimelineMetrics metrics,
-      Condition condition, Multimap<String, List<Function>> metricFunctions,
-      ResultSet rs) throws SQLException {
-
-    byte[] uuid = rs.getBytes("UUID");
-    String metricName = metadataManagerInstance.getMetricNameFromUuid(uuid);
-    Collection<List<Function>> functionList = 
findMetricFunctions(metricFunctions, metricName);
-
-    for (List<Function> functions : functionList) {
-      for (Function aggregateFunction : functions) {
-        SingleValuedTimelineMetric metric;
-
-        if (condition.getPrecision() == Precision.MINUTES
-          || condition.getPrecision() == Precision.HOURS
-          || condition.getPrecision() == Precision.DAYS) {
-          metric = getAggregateTimelineMetricFromResultSet(rs, 
aggregateFunction, false);
-        } else {
-          metric = getAggregateTimelineMetricFromResultSet(rs, 
aggregateFunction, true);
-        }
-
-        if (condition.isGrouped()) {
-          metrics.addOrMergeTimelineMetric(metric);
-        } else {
-          metrics.getMetrics().add(metric.getTimelineMetric());
-        }
-      }
-    }
-
-  }
-
-  private void getLatestAggregateMetricRecords(Condition condition,
-      Connection conn, TimelineMetrics metrics,
-      Multimap<String, List<Function>> metricFunctions) throws SQLException {
-
-    PreparedStatement stmt = null;
-    SplitByMetricNamesCondition splitCondition =
-      new SplitByMetricNamesCondition(condition);
-
-    for (byte[] uuid: condition.getUuids()) {
-
-      splitCondition.setCurrentUuid(uuid);
-      stmt = PhoenixTransactSQL.prepareGetLatestAggregateMetricSqlStmt(conn, 
splitCondition);
-      ResultSet rs = null;
-      try {
-        rs = stmt.executeQuery();
-        while (rs.next()) {
-          String metricName = 
metadataManagerInstance.getMetricNameFromUuid(uuid);
-          Collection<List<Function>> functionList = 
findMetricFunctions(metricFunctions, metricName);
-          for (List<Function> functions : functionList) {
-            if (functions != null) {
-              for (Function f : functions) {
-                SingleValuedTimelineMetric metric =
-                  getAggregateTimelineMetricFromResultSet(rs, f, true);
-
-                if (condition.isGrouped()) {
-                  metrics.addOrMergeTimelineMetric(metric);
-                } else {
-                  metrics.getMetrics().add(metric.getTimelineMetric());
-                }
-              }
-            } else {
-              SingleValuedTimelineMetric metric =
-                getAggregateTimelineMetricFromResultSet(rs, new Function(), 
true);
-              metrics.getMetrics().add(metric.getTimelineMetric());
-            }
-          }
-        }
-      } finally {
-        if (rs != null) {
-          try {
-            rs.close();
-          } catch (SQLException e) {
-            // Ignore
-          }
-        }
-        if (stmt != null) {
-          stmt.close();
-        }
-      }
-    }
-  }
-
-  private SingleValuedTimelineMetric 
getAggregateTimelineMetricFromResultSet(ResultSet rs,
-      Function f, boolean useHostCount) throws SQLException {
-
-    String countColumnName = "METRIC_COUNT";
-    if (useHostCount) {
-      countColumnName = "HOSTS_COUNT";
-    }
-
-    byte[] uuid = rs.getBytes("UUID");
-    TimelineMetric timelineMetric = 
metadataManagerInstance.getMetricFromUuid(uuid);
-
-    SingleValuedTimelineMetric metric = new SingleValuedTimelineMetric(
-      timelineMetric.getMetricName() + f.getSuffix(),
-      timelineMetric.getAppId(),
-      timelineMetric.getInstanceId(),
-      null,
-      rs.getLong("SERVER_TIME")
-    );
-
-    double value;
-    switch(f.getReadFunction()){
-      case AVG:
-        value = rs.getDouble("METRIC_SUM") / rs.getInt(countColumnName);
-        break;
-      case MIN:
-        value = rs.getDouble("METRIC_MIN");
-        break;
-      case MAX:
-        value = rs.getDouble("METRIC_MAX");
-        break;
-      case SUM:
-        value = rs.getDouble("METRIC_SUM");
-        break;
-      default:
-        value = rs.getDouble("METRIC_SUM") / rs.getInt(countColumnName);
-        break;
-    }
-
-    metric.setSingleTimeseriesValue(rs.getLong("SERVER_TIME"), value);
-
-    return metric;
-  }
-
-  private void validateConditionIsNotEmpty(Condition condition) {
-    if (condition.isEmpty()) {
-      throw new IllegalArgumentException("No filter criteria specified.");
-    }
-  }
-
-  private Collection<List<Function>> findMetricFunctions(Multimap<String, 
List<Function>> metricFunctions,
-      String metricName) {
-    if (metricFunctions.containsKey(metricName)) {
-      return metricFunctions.get(metricName);
-    }
-
-    for (String metricNameEntry : metricFunctions.keySet()) {
-
-      String metricRegEx;
-      //Special case handling for metric name with * and __%.
-      //For example, dfs.NNTopUserOpCounts.windowMs=300000.op=*.user=%.count
-      // or dfs.NNTopUserOpCounts.windowMs=300000.op=__%.user=%.count
-      if (metricNameEntry.contains("*") || metricNameEntry.contains("__%")) {
-        String metricNameWithEscSeq = metricNameEntry.replace("*", 
"\\*").replace("__%", "..%");
-        metricRegEx = metricNameWithEscSeq.replace("%", ".*");
-      } else {
-        metricRegEx = metricNameEntry.replace("%", ".*");
-      }
-      if (metricName.matches(metricRegEx)) {
-        return metricFunctions.get(metricNameEntry);
-      }
-    }
-
-    return null;
-  }
-
-  public void saveHostAggregateRecords(Map<TimelineMetric, 
MetricHostAggregate> hostAggregateMap,
-                                       String phoenixTableName) throws 
SQLException {
-
-    if (hostAggregateMap == null || hostAggregateMap.isEmpty()) {
-      LOG.debug("Empty aggregate records.");
-      return;
-    }
-
-    Connection conn = getConnection();
-    PreparedStatement stmt = null;
-
-    long start = System.currentTimeMillis();
-    int rowCount = 0;
-
-    try {
-      stmt = conn.prepareStatement(
-        String.format(UPSERT_AGGREGATE_RECORD_SQL, phoenixTableName));
-
-      for (Map.Entry<TimelineMetric, MetricHostAggregate> metricAggregate :
-        hostAggregateMap.entrySet()) {
-
-        TimelineMetric metric = metricAggregate.getKey();
-        MetricHostAggregate hostAggregate = metricAggregate.getValue();
-
-        byte[] uuid = metadataManagerInstance.getUuid(metric);
-        if (uuid == null) {
-          LOG.error("Error computing UUID for metric. Cannot write metric : " 
+ metric.toString());
-          continue;
-        }
-        rowCount++;
-        stmt.clearParameters();
-        stmt.setBytes(1, uuid);
-        stmt.setLong(2, metric.getStartTime());
-        stmt.setDouble(3, hostAggregate.getSum());
-        stmt.setDouble(4, hostAggregate.getMax());
-        stmt.setDouble(5, hostAggregate.getMin());
-        stmt.setDouble(6, hostAggregate.getNumberOfSamples());
-
-        try {
-          stmt.executeUpdate();
-        } catch (SQLException sql) {
-          LOG.error(sql);
-        }
-
-        if (rowCount >= PHOENIX_MAX_MUTATION_STATE_SIZE - 1) {
-          conn.commit();
-          rowCount = 0;
-        }
-
-      }
-
-      conn.commit();
-
-    } finally {
-      if (stmt != null) {
-        try {
-          stmt.close();
-        } catch (SQLException e) {
-          // Ignore
-        }
-      }
-      if (conn != null) {
-        try {
-          conn.close();
-        } catch (SQLException sql) {
-          // Ignore
-        }
-      }
-    }
-
-    long end = System.currentTimeMillis();
-
-    if ((end - start) > 60000l) {
-      LOG.info("Time to save map: " + (end - start) + ", " +
-        "thread = " + Thread.currentThread().getClass());
-    }
-    if (aggregatorSink != null) {
-      try {
-        aggregatorSink.saveHostAggregateRecords(hostAggregateMap,
-            getTablePrecision(phoenixTableName));
-      } catch (Exception e) {
-        LOG.warn(
-            "Error writing host aggregate records metrics to external sink. "
-                + e);
-      }
-    }
-  }
-
-  /**
-   * Save Metric aggregate records.
-   *
-   * @throws SQLException
-   */
-  public void saveClusterAggregateRecords(Map<TimelineClusterMetric, 
MetricClusterAggregate> records)
-      throws SQLException {
-
-    if (records == null || records.isEmpty()) {
-      LOG.debug("Empty aggregate records.");
-      return;
-    }
-
-    long start = System.currentTimeMillis();
-    String sqlStr = String.format(UPSERT_CLUSTER_AGGREGATE_SQL, 
METRICS_CLUSTER_AGGREGATE_TABLE_NAME);
-    Connection conn = getConnection();
-    PreparedStatement stmt = null;
-    try {
-      stmt = conn.prepareStatement(sqlStr);
-      int rowCount = 0;
-
-      for (Map.Entry<TimelineClusterMetric, MetricClusterAggregate>
-        aggregateEntry : records.entrySet()) {
-        TimelineClusterMetric clusterMetric = aggregateEntry.getKey();
-        MetricClusterAggregate aggregate = aggregateEntry.getValue();
-
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("clusterMetric = " + clusterMetric + ", " +
-            "aggregate = " + aggregate);
-        }
-
-        rowCount++;
-        byte[] uuid =  metadataManagerInstance.getUuid(clusterMetric);
-        if (uuid == null) {
-          LOG.error("Error computing UUID for metric. Cannot write metrics : " 
+ clusterMetric.toString());
-          continue;
-        }
-        stmt.clearParameters();
-        stmt.setBytes(1, uuid);
-        stmt.setLong(2, clusterMetric.getTimestamp());
-        stmt.setDouble(3, aggregate.getSum());
-        stmt.setInt(4, aggregate.getNumberOfHosts());
-        stmt.setDouble(5, aggregate.getMax());
-        stmt.setDouble(6, aggregate.getMin());
-
-        try {
-          stmt.executeUpdate();
-        } catch (SQLException sql) {
-          // we have no way to verify it works!!!
-          LOG.error(sql);
-        }
-
-        if (rowCount >= PHOENIX_MAX_MUTATION_STATE_SIZE - 1) {
-          conn.commit();
-          rowCount = 0;
-        }
-      }
-
-      conn.commit();
-
-    } finally {
-      if (stmt != null) {
-        try {
-          stmt.close();
-        } catch (SQLException e) {
-          // Ignore
-        }
-      }
-      if (conn != null) {
-        try {
-          conn.close();
-        } catch (SQLException sql) {
-          // Ignore
-        }
-      }
-    }
-    long end = System.currentTimeMillis();
-    if ((end - start) > 60000l) {
-      LOG.info("Time to save: " + (end - start) + ", " +
-        "thread = " + Thread.currentThread().getName());
-    }
-    if (aggregatorSink != null) {
-      try {
-        aggregatorSink.saveClusterAggregateRecords(records);
-      } catch (Exception e) {
-        LOG.warn("Error writing cluster aggregate records metrics to external 
sink. ", e);
-      }
-    }
-  }
-
-
-  /**
-   * Save Metric aggregate records.
-   *
-   * @throws SQLException
-   */
-  public void saveClusterAggregateRecordsSecond(Map<TimelineClusterMetric, 
MetricHostAggregate> records,
-                                                String tableName) throws 
SQLException {
-    if (records == null || records.isEmpty()) {
-      LOG.debug("Empty aggregate records.");
-      return;
-    }
-
-    long start = System.currentTimeMillis();
-
-    Connection conn = getConnection();
-    PreparedStatement stmt = null;
-    try {
-      stmt = 
conn.prepareStatement(String.format(UPSERT_CLUSTER_AGGREGATE_TIME_SQL, 
tableName));
-      int rowCount = 0;
-
-      for (Map.Entry<TimelineClusterMetric, MetricHostAggregate> 
aggregateEntry : records.entrySet()) {
-        TimelineClusterMetric clusterMetric = aggregateEntry.getKey();
-        MetricHostAggregate aggregate = aggregateEntry.getValue();
-
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("clusterMetric = " + clusterMetric + ", " +
-            "aggregate = " + aggregate);
-        }
-
-        byte[] uuid = metadataManagerInstance.getUuid(clusterMetric);
-        if (uuid == null) {
-          LOG.error("Error computing UUID for metric. Cannot write metric : " 
+ clusterMetric.toString());
-          continue;
-        }
-
-        rowCount++;
-        stmt.clearParameters();
-        stmt.setBytes(1, uuid);
-        stmt.setLong(2, clusterMetric.getTimestamp());
-        stmt.setDouble(3, aggregate.getSum());
-        stmt.setLong(4, aggregate.getNumberOfSamples());
-        stmt.setDouble(5, aggregate.getMax());
-        stmt.setDouble(6, aggregate.getMin());
-
-        try {
-          stmt.executeUpdate();
-        } catch (SQLException sql) {
-          // we have no way to verify it works!!!
-          LOG.error(sql);
-        }
-
-        if (rowCount >= PHOENIX_MAX_MUTATION_STATE_SIZE - 1) {
-          conn.commit();
-          rowCount = 0;
-        }
-      }
-
-      conn.commit();
-
-    } finally {
-      if (stmt != null) {
-        try {
-          stmt.close();
-        } catch (SQLException e) {
-          // Ignore
-        }
-      }
-      if (conn != null) {
-        try {
-          conn.close();
-        } catch (SQLException sql) {
-          // Ignore
-        }
-      }
-    }
-    long end = System.currentTimeMillis();
-    if ((end - start) > 60000l) {
-      LOG.info("Time to save: " + (end - start) + ", " +
-        "thread = " + Thread.currentThread().getName());
-    }
-    if (aggregatorSink != null) {
-      try {
-        aggregatorSink.saveClusterTimeAggregateRecords(records,
-            getTablePrecision(tableName));
-      } catch (Exception e) {
-        LOG.warn(
-            "Error writing cluster time aggregate records metrics to external 
sink. "
-                + e);
-      }
-    }
-  }
-
-  /**
-   * Get precision for a table
-   * @param tableName
-   * @return precision
-   */
-  private Precision getTablePrecision(String tableName) {
-    Precision tablePrecision = null;
-    switch (tableName) {
-    case METRICS_RECORD_TABLE_NAME:
-      tablePrecision = Precision.SECONDS;
-      break;
-    case METRICS_AGGREGATE_MINUTE_TABLE_NAME:
-    case METRICS_CLUSTER_AGGREGATE_MINUTE_TABLE_NAME:
-      tablePrecision = Precision.MINUTES;
-      break;
-    case METRICS_AGGREGATE_HOURLY_TABLE_NAME:
-    case METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME:
-      tablePrecision = Precision.HOURS;
-      break;
-    case METRICS_AGGREGATE_DAILY_TABLE_NAME:
-    case METRICS_CLUSTER_AGGREGATE_DAILY_TABLE_NAME:
-      tablePrecision = Precision.DAYS;
-      break;
-    }
-    return tablePrecision;
-  }
-
-  /**
-   * Provide skip block cache hint for aggregator queries.
-   */
-  public boolean isSkipBlockCacheForAggregatorsEnabled() {
-    return skipBlockCacheForAggregatorsEnabled;
-  }
-
-  /**
-   * One time save of metadata when discovering topology during aggregation.
-   * @throws SQLException
-   */
-  public void saveHostAppsMetadata(Map<String, TimelineMetricHostMetadata> 
hostMetadata) throws SQLException {
-    Connection conn = getConnection();
-    PreparedStatement stmt = null;
-    try {
-      stmt = conn.prepareStatement(UPSERT_HOSTED_APPS_METADATA_SQL);
-      int rowCount = 0;
-
-      for (Map.Entry<String, TimelineMetricHostMetadata> hostedAppsEntry : 
hostMetadata.entrySet()) {
-        TimelineMetricHostMetadata timelineMetricHostMetadata = 
hostedAppsEntry.getValue();
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("HostedAppsMetadata: " + hostedAppsEntry);
-        }
-
-        stmt.clearParameters();
-        stmt.setString(1, hostedAppsEntry.getKey());
-        stmt.setBytes(2, timelineMetricHostMetadata.getUuid());
-        stmt.setString(3, 
StringUtils.join(timelineMetricHostMetadata.getHostedApps().keySet(), ","));
-        try {
-          stmt.executeUpdate();
-          rowCount++;
-        } catch (SQLException sql) {
-          LOG.error("Error saving hosted apps metadata.", sql);
-        }
-      }
-
-      conn.commit();
-      LOG.info("Saved " + rowCount + " hosted apps metadata records.");
-
-    } finally {
-      if (stmt != null) {
-        try {
-          stmt.close();
-        } catch (SQLException e) {
-          // Ignore
-        }
-      }
-      if (conn != null) {
-        try {
-          conn.close();
-        } catch (SQLException sql) {
-          // Ignore
-        }
-      }
-    }
-  }
-
-  public void saveInstanceHostsMetadata(Map<String, Set<String>> 
instanceHostsMap) throws SQLException {
-    Connection conn = getConnection();
-    PreparedStatement stmt = null;
-    try {
-      stmt = conn.prepareStatement(UPSERT_INSTANCE_HOST_METADATA_SQL);
-      int rowCount = 0;
-
-      for (Map.Entry<String, Set<String>> hostInstancesEntry : 
instanceHostsMap.entrySet()) {
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("Host Instances Entry: " + hostInstancesEntry);
-        }
-
-        String instanceId = hostInstancesEntry.getKey();
-
-        for(String hostname : hostInstancesEntry.getValue()) {
-          stmt.clearParameters();
-          stmt.setString(1, instanceId);
-          stmt.setString(2, hostname);
-          try {
-            stmt.executeUpdate();
-            rowCount++;
-          } catch (SQLException sql) {
-            LOG.error("Error saving host instances metadata.", sql);
-          }
-        }
-
-      }
-
-      conn.commit();
-      LOG.info("Saved " + rowCount + " host instances metadata records.");
-
-    } finally {
-      if (stmt != null) {
-        try {
-          stmt.close();
-        } catch (SQLException e) {
-          // Ignore
-        }
-      }
-      if (conn != null) {
-        try {
-          conn.close();
-        } catch (SQLException sql) {
-          // Ignore
-        }
-      }
-    }
-  }
-
-  /**
-   * Save metdata on updates.
-   * @param metricMetadata @Collection<@TimelineMetricMetadata>
-   * @throws SQLException
-   */
-  public void saveMetricMetadata(Collection<TimelineMetricMetadata> 
metricMetadata) throws SQLException {
-    if (metricMetadata.isEmpty()) {
-      LOG.info("No metadata records to save.");
-      return;
-    }
-
-    Connection conn = getConnection();
-    PreparedStatement stmt = null;
-
-    try {
-      stmt = conn.prepareStatement(UPSERT_METADATA_SQL);
-      int rowCount = 0;
-
-      for (TimelineMetricMetadata metadata : metricMetadata) {
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("TimelineMetricMetadata: metricName = " + 
metadata.getMetricName()
-            + ", appId = " + metadata.getAppId()
-            + ", seriesStartTime = " + metadata.getSeriesStartTime()
-          );
-        }
-        try {
-          stmt.clearParameters();
-          stmt.setString(1, metadata.getMetricName());
-          stmt.setString(2, metadata.getAppId());
-          stmt.setString(3, metadata.getInstanceId());
-          stmt.setBytes(4, metadata.getUuid());
-          stmt.setString(5, metadata.getUnits());
-          stmt.setString(6, metadata.getType());
-          stmt.setLong(7, metadata.getSeriesStartTime());
-          stmt.setBoolean(8, metadata.isSupportsAggregates());
-          stmt.setBoolean(9, metadata.isWhitelisted());
-        } catch (Exception e) {
-          LOG.error("Exception in saving metric metadata entry. ");
-          continue;
-        }
-
-        try {
-          stmt.executeUpdate();
-          rowCount++;
-        } catch (SQLException sql) {
-          LOG.error("Error saving metadata.", sql);
-        }
-      }
-
-      conn.commit();
-      LOG.info("Saved " + rowCount + " metadata records.");
-
-    } finally {
-      if (stmt != null) {
-        try {
-          stmt.close();
-        } catch (SQLException e) {
-          // Ignore
-        }
-      }
-      if (conn != null) {
-        try {
-          conn.close();
-        } catch (SQLException sql) {
-          // Ignore
-        }
-      }
-    }
-  }
-
-  public Map<String, TimelineMetricHostMetadata> getHostedAppsMetadata() 
throws SQLException {
-    Map<String, TimelineMetricHostMetadata> hostedAppMap = new HashMap<>();
-    Connection conn = getConnection();
-    PreparedStatement stmt = null;
-    ResultSet rs = null;
-
-    try {
-      stmt = conn.prepareStatement(GET_HOSTED_APPS_METADATA_SQL);
-      rs = stmt.executeQuery();
-
-      while (rs.next()) {
-        TimelineMetricHostMetadata hostMetadata = new 
TimelineMetricHostMetadata(new 
HashSet<>(Arrays.asList(StringUtils.split(rs.getString("APP_IDS"), ","))));
-        hostMetadata.setUuid(rs.getBytes("UUID"));
-        hostedAppMap.put(rs.getString("HOSTNAME"), hostMetadata);
-      }
-
-    } finally {
-      if (rs != null) {
-        try {
-          rs.close();
-        } catch (SQLException e) {
-          // Ignore
-        }
-      }
-      if (stmt != null) {
-        try {
-          stmt.close();
-        } catch (SQLException e) {
-          // Ignore
-        }
-      }
-      if (conn != null) {
-        try {
-          conn.close();
-        } catch (SQLException sql) {
-          // Ignore
-        }
-      }
-    }
-
-    return hostedAppMap;
-  }
-
-  public Map<String, Set<String>> getInstanceHostsMetdata() throws 
SQLException {
-    Map<String, Set<String>> instanceHostsMap = new HashMap<>();
-    Connection conn = getConnection();
-    PreparedStatement stmt = null;
-    ResultSet rs = null;
-
-    try {
-      stmt = conn.prepareStatement(GET_INSTANCE_HOST_METADATA_SQL);
-      rs = stmt.executeQuery();
-
-      while (rs.next()) {
-        String instanceId = rs.getString("INSTANCE_ID");
-        String hostname = rs.getString("HOSTNAME");
-
-        if (!instanceHostsMap.containsKey(instanceId)) {
-          instanceHostsMap.put(instanceId, new HashSet<String>());
-        }
-        instanceHostsMap.get(instanceId).add(hostname);
-      }
-
-    } finally {
-      if (rs != null) {
-        try {
-          rs.close();
-        } catch (SQLException e) {
-          // Ignore
-        }
-      }
-      if (stmt != null) {
-        try {
-          stmt.close();
-        } catch (SQLException e) {
-          // Ignore
-        }
-      }
-      if (conn != null) {
-        try {
-          conn.close();
-        } catch (SQLException sql) {
-          // Ignore
-        }
-      }
-    }
-
-    return instanceHostsMap;
-  }
-
-  // No filter criteria support for now.
-  public Map<TimelineMetricMetadataKey, TimelineMetricMetadata> 
getTimelineMetricMetadata() throws SQLException {
-    Map<TimelineMetricMetadataKey, TimelineMetricMetadata> metadataMap = new 
HashMap<>();
-    Connection conn = getConnection();
-    PreparedStatement stmt = null;
-    ResultSet rs = null;
-
-    try {
-      stmt = conn.prepareStatement(GET_METRIC_METADATA_SQL);
-      rs = stmt.executeQuery();
-
-      while (rs.next()) {
-        String metricName = rs.getString("METRIC_NAME");
-        String appId = rs.getString("APP_ID");
-        String instanceId = rs.getString("INSTANCE_ID");
-        TimelineMetricMetadata metadata = new TimelineMetricMetadata(
-          metricName,
-          appId,
-          instanceId,
-          rs.getString("UNITS"),
-          rs.getString("TYPE"),
-          rs.getLong("START_TIME"),
-          rs.getBoolean("SUPPORTS_AGGREGATION"),
-          rs.getBoolean("IS_WHITELISTED")
-        );
-
-        TimelineMetricMetadataKey key = new 
TimelineMetricMetadataKey(metricName, appId, instanceId);
-        metadata.setIsPersisted(true); // Always true on retrieval
-        metadata.setUuid(rs.getBytes("UUID"));
-        metadataMap.put(key, metadata);
-      }
-
-    } finally {
-      if (rs != null) {
-        try {
-          rs.close();
-        } catch (SQLException e) {
-          // Ignore
-        }
-      }
-      if (stmt != null) {
-        try {
-          stmt.close();
-        } catch (SQLException e) {
-          // Ignore
-        }
-      }
-      if (conn != null) {
-        try {
-          conn.close();
-        } catch (SQLException sql) {
-          // Ignore
-        }
-      }
-    }
-
-    return metadataMap;
-  }
-
-  public void setMetadataInstance(TimelineMetricMetadataManager 
metadataManager) {
-    this.metadataManagerInstance = metadataManager;
-    TIMELINE_METRIC_READ_HELPER = new 
TimelineMetricReadHelper(this.metadataManagerInstance);
-  }
-}

Reply via email to