http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/TimelineMetricConfiguration.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/TimelineMetricConfiguration.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/TimelineMetricConfiguration.java
new file mode 100644
index 0000000..a1ad18c
--- /dev/null
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/TimelineMetricConfiguration.java
@@ -0,0 +1,712 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.core.timeline;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.net.InetAddress;
+import java.net.MalformedURLException;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.ambari.metrics.core.timeline.source.InternalSourceProvider;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.ambari.metrics.core.timeline.sink.ExternalSinkProvider;
+import 
org.apache.ambari.metrics.core.timeline.source.DefaultInternalMetricsSourceProvider;
+import org.apache.log4j.Appender;
+import org.apache.log4j.FileAppender;
+import org.apache.log4j.Logger;
+
+/**
+ * Configuration class that reads properties from ams-site.xml. All values
+ * for time or intervals are given in seconds.
+ */
+public class TimelineMetricConfiguration {
+  private static final Log LOG = 
LogFactory.getLog(TimelineMetricConfiguration.class);
+
+  public static final String HBASE_SITE_CONFIGURATION_FILE = "hbase-site.xml";
+  public static final String METRICS_SITE_CONFIGURATION_FILE = "ams-site.xml";
+  public static final String METRICS_ENV_CONFIGURATION_FILE = "ams-env.xml";
+  public static final String METRICS_SSL_SERVER_CONFIGURATION_FILE = 
"ssl-server.xml";
+
+  public static final String TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR =
+    "timeline.metrics.aggregator.checkpoint.dir";
+
+  public static final String TIMELINE_METRIC_AGGREGATOR_SINK_CLASS =
+    "timeline.metrics.service.aggregator.sink.class";
+
+  public static final String TIMELINE_METRICS_SOURCE_PROVIDER_CLASS =
+    "timeline.metrics.service.source.provider.class";
+
+  public static final String TIMELINE_METRICS_SINK_PROVIDER_CLASS =
+    "timeline.metrics.service.sink.provider.class";
+
+  public static final String TIMELINE_METRICS_CACHE_SIZE =
+    "timeline.metrics.cache.size";
+
+  public static final String TIMELINE_METRICS_CACHE_COMMIT_INTERVAL =
+    "timeline.metrics.cache.commit.interval";
+
+  public static final String TIMELINE_METRICS_CACHE_ENABLED =
+    "timeline.metrics.cache.enabled";
+
+  public static final String DEFAULT_CHECKPOINT_LOCATION =
+    System.getProperty("java.io.tmpdir");
+
+  public static final String HBASE_ENCODING_SCHEME =
+    "timeline.metrics.hbase.data.block.encoding";
+
+  public static final String HBASE_COMPRESSION_SCHEME =
+    "timeline.metrics.hbase.compression.scheme";
+
+  public static final String CONTAINER_METRICS_TTL =
+    "timeline.container-metrics.ttl";
+
+  public static final String PRECISION_TABLE_TTL =
+    "timeline.metrics.host.aggregator.ttl";
+
+  public static final String HOST_MINUTE_TABLE_TTL =
+    "timeline.metrics.host.aggregator.minute.ttl";
+
+  public static final String HOST_DAILY_TABLE_TTL =
+    "timeline.metrics.host.aggregator.daily.ttl";
+
+  public static final String HOST_HOUR_TABLE_TTL =
+    "timeline.metrics.host.aggregator.hourly.ttl";
+
+  public static final String CLUSTER_SECOND_TABLE_TTL =
+    "timeline.metrics.cluster.aggregator.second.ttl";
+
+  public static final String CLUSTER_MINUTE_TABLE_TTL =
+    "timeline.metrics.cluster.aggregator.minute.ttl";
+
+  public static final String CLUSTER_HOUR_TABLE_TTL =
+    "timeline.metrics.cluster.aggregator.hourly.ttl";
+
+  public static final String CLUSTER_DAILY_TABLE_TTL =
+    "timeline.metrics.cluster.aggregator.daily.ttl";
+
+  public static final String CLUSTER_AGGREGATOR_TIMESLICE_INTERVAL =
+    "timeline.metrics.cluster.aggregator.second.timeslice.interval";
+
+  public static final String AGGREGATOR_CHECKPOINT_DELAY =
+    "timeline.metrics.service.checkpointDelay";
+
+  public static final String RESULTSET_FETCH_SIZE =
+    "timeline.metrics.service.resultset.fetchSize";
+
+  public static final String HOST_AGGREGATOR_MINUTE_SLEEP_INTERVAL =
+    "timeline.metrics.host.aggregator.minute.interval";
+
+  public static final String HOST_AGGREGATOR_HOUR_SLEEP_INTERVAL =
+    "timeline.metrics.host.aggregator.hourly.interval";
+
+  public static final String HOST_AGGREGATOR_DAILY_SLEEP_INTERVAL =
+    "timeline.metrics.host.aggregator.daily.interval";
+
+  public static final String CLUSTER_AGGREGATOR_SECOND_SLEEP_INTERVAL =
+    "timeline.metrics.cluster.aggregator.second.interval";
+
+  public static final String CLUSTER_AGGREGATOR_MINUTE_SLEEP_INTERVAL =
+    "timeline.metrics.cluster.aggregator.minute.interval";
+
+  public static final String CLUSTER_AGGREGATOR_HOUR_SLEEP_INTERVAL =
+    "timeline.metrics.cluster.aggregator.hourly.interval";
+
+  public static final String CLUSTER_AGGREGATOR_DAILY_SLEEP_INTERVAL =
+    "timeline.metrics.cluster.aggregator.daily.interval";
+
+  public static final String 
HOST_AGGREGATOR_MINUTE_CHECKPOINT_CUTOFF_MULTIPLIER =
+    "timeline.metrics.host.aggregator.minute.checkpointCutOffMultiplier";
+
+  public static final String HOST_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER 
=
+    "timeline.metrics.host.aggregator.hourly.checkpointCutOffMultiplier";
+
+  public static final String 
HOST_AGGREGATOR_DAILY_CHECKPOINT_CUTOFF_MULTIPLIER =
+    "timeline.metrics.host.aggregator.daily.checkpointCutOffMultiplier";
+
+  public static final String 
CLUSTER_AGGREGATOR_SECOND_CHECKPOINT_CUTOFF_MULTIPLIER =
+    "timeline.metrics.cluster.aggregator.second.checkpointCutOffMultiplier";
+
+  public static final String 
CLUSTER_AGGREGATOR_MINUTE_CHECKPOINT_CUTOFF_MULTIPLIER =
+    "timeline.metrics.cluster.aggregator.minute.checkpointCutOffMultiplier";
+
+  public static final String 
CLUSTER_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER =
+    "timeline.metrics.cluster.aggregator.hourly.checkpointCutOffMultiplier";
+
+  public static final String 
CLUSTER_AGGREGATOR_DAILY_CHECKPOINT_CUTOFF_MULTIPLIER =
+    "timeline.metrics.cluster.aggregator.daily.checkpointCutOffMultiplier";
+
+  public static final String GLOBAL_RESULT_LIMIT =
+    "timeline.metrics.service.default.result.limit";
+
+  public static final String GLOBAL_MAX_RETRIES =
+    "timeline.metrics.service.default.max_retries";
+
+  public static final String GLOBAL_RETRY_INTERVAL =
+    "timeline.metrics.service.default.retryInterval";
+
+  public static final String HOST_AGGREGATOR_MINUTE_DISABLED =
+    "timeline.metrics.host.aggregator.minute.disabled";
+
+  public static final String HOST_AGGREGATOR_HOUR_DISABLED =
+    "timeline.metrics.host.aggregator.hourly.disabled";
+
+  public static final String HOST_AGGREGATOR_DAILY_DISABLED =
+    "timeline.metrics.host.aggregator.hourly.disabled";
+
+  public static final String CLUSTER_AGGREGATOR_SECOND_DISABLED =
+    "timeline.metrics.cluster.aggregator.second.disabled";
+
+  public static final String CLUSTER_AGGREGATOR_MINUTE_DISABLED =
+    "timeline.metrics.cluster.aggregator.minute.disabled";
+
+  public static final String CLUSTER_AGGREGATOR_HOUR_DISABLED =
+    "timeline.metrics.cluster.aggregator.hourly.disabled";
+
+  public static final String CLUSTER_AGGREGATOR_DAILY_DISABLED =
+    "timeline.metrics.cluster.aggregator.daily.disabled";
+
+  public static final String WEBAPP_HTTP_ADDRESS =
+    "timeline.metrics.service.webapp.address";
+
+  public static final String TIMELINE_SERVICE_RPC_ADDRESS =
+    "timeline.metrics.service.rpc.address";
+
+  public static final String TIMELINE_SERVICE_DISABLE_CONTAINER_METRICS =
+    "timeline.metrics.service.container.metrics.disabled";
+
+  public static final String CLUSTER_AGGREGATOR_APP_IDS =
+    "timeline.metrics.service.cluster.aggregator.appIds";
+
+  public static final String SERVER_SIDE_TIMESIFT_ADJUSTMENT =
+    "timeline.metrics.service.cluster.aggregator.timeshift.adjustment";
+
+  public static final String OUT_OFF_BAND_DATA_TIME_ALLOWANCE =
+    "timeline.metrics.service.outofband.time.allowance.millis";
+
+  public static final String USE_GROUPBY_AGGREGATOR_QUERIES =
+    "timeline.metrics.service.use.groupBy.aggregators";
+
+  public static final String HANDLER_THREAD_COUNT =
+    "timeline.metrics.service.handler.thread.count";
+
+  public static final String WATCHER_DISABLED =
+    "timeline.metrics.service.watcher.disabled";
+
+  public static final String WATCHER_INITIAL_DELAY =
+    "timeline.metrics.service.watcher.initial.delay";
+
+  public static final String WATCHER_DELAY =
+    "timeline.metrics.service.watcher.delay";
+
+  public static final String WATCHER_TIMEOUT =
+    "timeline.metrics.service.watcher.timeout";
+
+  public static final String WATCHER_MAX_FAILURES =
+    "timeline.metrics.service.watcher.max.failures";
+
+  public static final String PRECISION_TABLE_SPLIT_POINTS =
+    "timeline.metrics.host.aggregate.splitpoints";
+
+  public static final String AGGREGATE_TABLE_SPLIT_POINTS =
+    "timeline.metrics.cluster.aggregate.splitpoints";
+
+  public static final String AGGREGATORS_SKIP_BLOCK_CACHE =
+    "timeline.metrics.aggregators.skip.blockcache.enabled";
+
+  public static final String TIMELINE_SERVICE_HTTP_POLICY =
+    "timeline.metrics.service.http.policy";
+
+  public static final String DISABLE_METRIC_METADATA_MGMT =
+    "timeline.metrics.service.metadata.management.disabled";
+
+  public static final String METRICS_METADATA_SYNC_INIT_DELAY =
+    "timeline.metrics.service.metadata.sync.init.delay";
+
+  public static final String METRICS_METADATA_SYNC_SCHEDULE_DELAY =
+    "timeline.metrics.service.metadata.sync.delay";
+
+  public static final String 
TIMELINE_METRICS_CLUSTER_AGGREGATOR_INTERPOLATION_ENABLED =
+    "timeline.metrics.cluster.aggregator.interpolation.enabled";
+
+  public static final String TIMELINE_METRICS_SINK_COLLECTION_PERIOD =
+    "timeline.metrics.sink.collection.period";
+
+  public static final String TIMELINE_METRICS_PRECISION_TABLE_DURABILITY =
+    "timeline.metrics.precision.table.durability";
+
+  public static final String TIMELINE_METRICS_AGGREGATE_TABLES_DURABILITY =
+      "timeline.metrics.aggregate.tables.durability";
+
+  public static final String TIMELINE_METRICS_WHITELIST_ENABLED =
+    "timeline.metrics.whitelisting.enabled";
+
+  public static final String TIMELINE_METRICS_WHITELIST_FILE =
+    "timeline.metrics.whitelist.file";
+
+  public static final String TIMELINE_METRICS_WHITELIST_FILE_LOCATION_DEFAULT =
+    "/etc/ambari-metrics-collector/conf/metrics_whitelist";
+
+  public static final String TIMELINE_METRIC_METADATA_FILTERS =
+    "timeline.metrics.service.metadata.filters";
+
+  public static final String TIMELINE_METRICS_APPS_BLACKLIST =
+    "timeline.metrics.apps.blacklist";
+
+  public static final String TIMELINE_METRICS_APPS_WHITELIST =
+    "timeline.metrics.apps.whitelist";
+
+  public static final String HBASE_BLOCKING_STORE_FILES =
+    "hbase.hstore.blockingStoreFiles";
+
+  public static final String DEFAULT_TOPN_HOSTS_LIMIT =
+    "timeline.metrics.default.topn.hosts.limit";
+
+  public static final String TIMELINE_METRIC_AGGREGATION_SQL_FILTERS =
+    "timeline.metrics.cluster.aggregation.sql.filters";
+
+  public static final String 
TIMELINE_METRICS_HBASE_AGGREGATE_TABLE_COMPACTION_POLICY_KEY =
+    "timeline.metrics.hbase.aggregate.table.compaction.policy.key";
+
+  public static final String 
TIMELINE_METRICS_HBASE_AGGREGATE_TABLE_COMPACTION_POLICY_CLASS =
+    "timeline.metrics.hbase.aggregate.table.compaction.policy.class";
+
+  public static final String 
TIMELINE_METRICS_AGGREGATE_TABLE_HBASE_BLOCKING_STORE_FILES =
+    "timeline.metrics.aggregate.table.hbase.hstore.blockingStoreFiles";
+
+  public static final String 
TIMELINE_METRICS_HBASE_PRECISION_TABLE_COMPACTION_POLICY_KEY =
+    "timeline.metrics.hbase.precision.table.compaction.policy.key";
+
+  public static final String 
TIMELINE_METRICS_HBASE_PRECISION_TABLE_COMPACTION_POLICY_CLASS =
+    "timeline.metrics.hbase.precision.table.compaction.policy.class";
+
+  public static final String 
TIMELINE_METRICS_PRECISION_TABLE_HBASE_BLOCKING_STORE_FILES =
+    "timeline.metrics.precision.table.hbase.hstore.blockingStoreFiles";
+
+  public static final String TIMELINE_METRICS_UUID_GEN_STRATEGY =
+    "timeline.metrics.uuid.gen.strategy";
+
+  public static final String TIMELINE_METRICS_SUPPORT_MULTIPLE_CLUSTERS =
+    "timeline.metrics.support.multiple.clusters";
+
+  public static final String HOST_APP_ID = "HOST";
+
+  public static final String DEFAULT_INSTANCE_PORT = "12001";
+
+  public static final String AMSHBASE_METRICS_WHITESLIST_FILE = 
"amshbase_metrics_whitelist";
+
+  public static final String TIMELINE_METRICS_HOST_INMEMORY_AGGREGATION = 
"timeline.metrics.host.inmemory.aggregation";
+
+  public static final String TIMELINE_METRICS_COLLECTOR_INMEMORY_AGGREGATION = 
"timeline.metrics.collector.inmemory.aggregation";
+
+  public static final String TIMELINE_METRICS_COLLECTOR_IGNITE_NODES = 
"timeline.metrics.collector.ignite.nodes.list";
+
+  public static final String TIMELINE_METRICS_COLLECTOR_IGNITE_BACKUPS = 
"timeline.metrics.collector.ignite.nodes.backups";
+
+  public static final String INTERNAL_CACHE_HEAP_PERCENT =
+    "timeline.metrics.internal.cache.%s.heap.percent";
+
+  public static final String EXTERNAL_SINK_INTERVAL =
+    "timeline.metrics.external.sink.%s.%s.interval";
+
+  public static final String DEFAULT_EXTERNAL_SINK_DIR =
+    "timeline.metrics.external.sink.dir";
+
+  public static final String KAFKA_SERVERS = 
"timeline.metrics.external.sink.kafka.bootstrap.servers";
+  public static final String KAFKA_ACKS = 
"timeline.metrics.external.sink.kafka.acks";
+  public static final String KAFKA_RETRIES = 
"timeline.metrics.external.sink.kafka.bootstrap.retries";
+  public static final String KAFKA_BATCH_SIZE = 
"timeline.metrics.external.sink.kafka.batch.size";
+  public static final String KAFKA_LINGER_MS = 
"timeline.metrics.external.sink.kafka.linger.ms";
+  public static final String KAFKA_BUFFER_MEM = 
"timeline.metrics.external.sink.kafka.buffer.memory";
+  public static final String KAFKA_SINK_TIMEOUT_SECONDS = 
"timeline.metrics.external.sink.kafka.timeout.seconds";
+  
+  private Configuration hbaseConf;
+  private Configuration metricsConf;
+  private Configuration metricsSslConf;
+  private Configuration amsEnvConf;
+  private volatile boolean isInitialized = false;
+
+  private static TimelineMetricConfiguration instance = new 
TimelineMetricConfiguration();
+
+  private TimelineMetricConfiguration() {}
+
+  /** @return the process-wide singleton configuration instance. */
+  public static TimelineMetricConfiguration getInstance() {
+    return instance;
+  }
+
+  // Test-only constructor: injects pre-built configurations and marks the
+  // instance initialized so classpath-based loading is skipped.
+  public TimelineMetricConfiguration(Configuration hbaseConf, Configuration 
metricsConf) {
+    this.hbaseConf = hbaseConf;
+    this.metricsConf = metricsConf;
+    this.isInitialized = true;
+  }
+
+  public void initialize() throws URISyntaxException, MalformedURLException {
+    if (!isInitialized) {
+      ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
+      if (classLoader == null) {
+        classLoader = getClass().getClassLoader();
+      }
+      URL hbaseResUrl = classLoader.getResource(HBASE_SITE_CONFIGURATION_FILE);
+      URL amsResUrl = classLoader.getResource(METRICS_SITE_CONFIGURATION_FILE);
+      LOG.info("Found hbase site configuration: " + hbaseResUrl);
+      LOG.info("Found metric service configuration: " + amsResUrl);
+
+      if (hbaseResUrl == null) {
+        throw new IllegalStateException("Unable to initialize the metrics " +
+          "subsystem. No hbase-site present in the classpath.");
+      }
+
+      if (amsResUrl == null) {
+        throw new IllegalStateException("Unable to initialize the metrics " +
+          "subsystem. No ams-site present in the classpath.");
+      }
+
+      hbaseConf = new Configuration(true);
+      hbaseConf.addResource(hbaseResUrl.toURI().toURL());
+      metricsConf = new Configuration(true);
+      metricsConf.addResource(amsResUrl.toURI().toURL());
+
+      if (metricsConf.get("timeline.metrics.service.http.policy", 
"HTTP_ONLY").equalsIgnoreCase("HTTPS_ONLY")) {
+        URL amsSllResUrl = 
classLoader.getResource(METRICS_SSL_SERVER_CONFIGURATION_FILE);
+        LOG.info("Found metric ssl service configuration: " + amsResUrl);
+        if (amsSllResUrl == null) {
+          throw new IllegalStateException("Unable to initialize the metrics " +
+            "subsystem. No ams-ssl-server present in the classpath.");
+        }
+        metricsSslConf = new Configuration(true);
+        metricsSslConf.addResource(amsSllResUrl.toURI().toURL());
+      }
+
+      isInitialized = true;
+    }
+  }
+
+  public Configuration getHbaseConf() throws URISyntaxException, 
MalformedURLException {
+    if (!isInitialized) {
+      initialize();
+    }
+    return hbaseConf;
+  }
+
+  public Configuration getMetricsConf() throws URISyntaxException, 
MalformedURLException {
+    if (!isInitialized) {
+      initialize();
+    }
+    return metricsConf;
+  }
+
+  public Configuration getMetricsSslConf() throws URISyntaxException, 
MalformedURLException {
+    if (!isInitialized) {
+      initialize();
+    }
+    return metricsSslConf;
+  }
+
+  public String getZKClientPort() throws MalformedURLException, 
URISyntaxException {
+    return getHbaseConf().getTrimmed("hbase.zookeeper.property.clientPort", 
"2181");
+  }
+
+  public String getZKQuorum() throws MalformedURLException, URISyntaxException 
{
+    return getHbaseConf().getTrimmed("hbase.zookeeper.quorum");
+  }
+
+  public String getClusterZKClientPort() throws MalformedURLException, 
URISyntaxException {
+    return 
getMetricsConf().getTrimmed("cluster.zookeeper.property.clientPort", "2181");
+  }
+
+  public String getClusterZKQuorum() throws MalformedURLException, 
URISyntaxException {
+    return getMetricsConf().getTrimmed("cluster.zookeeper.quorum");
+  }
+
+  public String getInstanceHostnameFromEnv() throws UnknownHostException {
+    String amsInstanceName = System.getProperty("AMS_INSTANCE_NAME");
+    if (amsInstanceName == null) {
+      amsInstanceName = InetAddress.getLocalHost().getHostName();
+    }
+    return amsInstanceName;
+  }
+
+  public String getInstancePort() throws MalformedURLException, 
URISyntaxException {
+    String amsInstancePort = System.getProperty("AMS_INSTANCE_PORT");
+    if (amsInstancePort == null) {
+      // Check config
+      return 
getMetricsConf().get("timeline.metrics.availability.instance.port", 
DEFAULT_INSTANCE_PORT);
+    }
+    return DEFAULT_INSTANCE_PORT;
+  }
+
+  public String getWebappAddress() throws MalformedURLException, 
URISyntaxException {
+    String defaultHttpAddress = "0.0.0.0:6188";
+    return getMetricsConf().get(WEBAPP_HTTP_ADDRESS, defaultHttpAddress);
+  }
+
+  public int getTimelineMetricsServiceHandlerThreadCount() {
+    if (metricsConf != null) {
+      return Integer.parseInt(metricsConf.get(HANDLER_THREAD_COUNT, "20"));
+    }
+    return 20;
+  }
+
+  public boolean isTimelineMetricsServiceWatcherDisabled() {
+    if (metricsConf != null) {
+      return Boolean.parseBoolean(metricsConf.get(WATCHER_DISABLED, "false"));
+    }
+    return false;
+  }
+
+  public int getTimelineMetricsServiceWatcherInitDelay() {
+    if (metricsConf != null) {
+      return Integer.parseInt(metricsConf.get(WATCHER_INITIAL_DELAY, "600"));
+    }
+    return 600;
+  }
+
+  public int getTimelineMetricsServiceWatcherDelay() {
+    if (metricsConf != null) {
+      return Integer.parseInt(metricsConf.get(WATCHER_DELAY, "30"));
+    }
+    return 30;
+  }
+
+  public int getTimelineMetricsServiceWatcherTimeout() {
+    if (metricsConf != null) {
+      return Integer.parseInt(metricsConf.get(WATCHER_TIMEOUT, "30"));
+    }
+    return 30;
+  }
+
+  public int getTimelineMetricsServiceWatcherMaxFailures() {
+    if (metricsConf != null) {
+      return Integer.parseInt(metricsConf.get(WATCHER_MAX_FAILURES, "3"));
+    }
+    return 3;
+  }
+
+  public boolean getTimelineMetricsMultipleClusterSupport() {
+    if (metricsConf != null) {
+      return 
Boolean.parseBoolean(metricsConf.get(TIMELINE_METRICS_SUPPORT_MULTIPLE_CLUSTERS,
 "false"));
+    }
+    return false;
+  }
+
+  public String getTimelineServiceRpcAddress() {
+    String defaultRpcAddress = "0.0.0.0:60200";
+    if (metricsConf != null) {
+      return metricsConf.get(TIMELINE_SERVICE_RPC_ADDRESS, defaultRpcAddress);
+    }
+    return defaultRpcAddress;
+  }
+
+  public String getKafkaServers() {
+    if (metricsConf != null) {
+      return metricsConf.get("timeline.metrics.kafka.servers", null);
+    }
+    return null;
+  }
+
+  public boolean isDistributedCollectorModeDisabled() {
+    try {
+      if (getMetricsConf() != null) {
+        return 
Boolean.parseBoolean(getMetricsConf().get("timeline.metrics.service.distributed.collector.mode.disabled",
 "false"));
+      }
+      return false;
+    } catch (Exception e) {
+      return false;
+    }
+  }
+
+  public boolean isSecurityEnabled() {
+    return hbaseConf.get("hbase.security.authentication", 
"").equals("kerberos");
+  }
+
+  public Set<String> getAmshbaseWhitelist() {
+
+    ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
+    if (classLoader == null) {
+      classLoader = getClass().getClassLoader();
+    }
+
+    BufferedReader br = null;
+    String strLine;
+    Set<String> whitelist = new HashSet<>();
+
+    try(InputStream inputStream = 
classLoader.getResourceAsStream(AMSHBASE_METRICS_WHITESLIST_FILE)) {
+
+      if (inputStream == null) {
+        LOG.info("ams-hbase metrics whitelist file not present.");
+        return Collections.EMPTY_SET;
+      }
+
+      br = new BufferedReader(new InputStreamReader(inputStream));
+
+      while ((strLine = br.readLine()) != null)   {
+        strLine = strLine.trim();
+        if (StringUtils.isEmpty(strLine)) {
+          continue;
+        }
+        whitelist.add(strLine);
+      }
+    } catch (Exception ex) {
+      LOG.error("Unable to read ams-hbase metric whitelist file", ex);
+      return Collections.EMPTY_SET;
+    }
+
+    return whitelist;
+  }
+
+  /**
+   * Get the sink interval for a metrics source.
+   * Determines how often the metrics will be written to the sink.
+   * This determines whether any caching will be needed on the collector
+   * side, default interval disables caching by writing at the same time as
+   * we get data.
+   *
+   * @param sinkProviderClassName Simple name of your implementation of {@link 
ExternalSinkProvider}
+   * @param sourceName {@link InternalSourceProvider.SOURCE_NAME}
+   * @return seconds
+   */
+  public int getExternalSinkInterval(String sinkProviderClassName,
+                                     InternalSourceProvider.SOURCE_NAME 
sourceName) {
+    String sinkProviderSimpleClassName = sinkProviderClassName.substring(
+      sinkProviderClassName.lastIndexOf(".") + 1);
+
+    return Integer.parseInt(metricsConf.get(
+      String.format(EXTERNAL_SINK_INTERVAL, sinkProviderSimpleClassName, 
sourceName), "-1"));
+  }
+
+  public InternalSourceProvider getInternalSourceProvider() {
+    Class<? extends InternalSourceProvider> providerClass =
+      metricsConf.getClass(TIMELINE_METRICS_SOURCE_PROVIDER_CLASS,
+        DefaultInternalMetricsSourceProvider.class, 
InternalSourceProvider.class);
+    return ReflectionUtils.newInstance(providerClass, metricsConf);
+  }
+
+  /**
+   * List of external sink provider classes. Comma-separated.
+   */
+  public List<ExternalSinkProvider> getExternalSinkProviderList() {
+    Class<?>[] providerClasses = 
metricsConf.getClasses(TIMELINE_METRICS_SINK_PROVIDER_CLASS);
+    List<ExternalSinkProvider> providerList = new ArrayList<>();
+    if (providerClasses != null) {
+      for (Class<?> providerClass : providerClasses) {
+        providerList.add((ExternalSinkProvider) 
ReflectionUtils.newInstance(providerClass, metricsConf));
+      }
+    }
+    return providerList;
+  }
+
+  public String getInternalCacheHeapPercent(String instanceName) {
+    String heapPercent = 
metricsConf.get(String.format(INTERNAL_CACHE_HEAP_PERCENT, instanceName));
+    if (StringUtils.isEmpty(heapPercent)) {
+      return "5%";
+    } else {
+      return heapPercent.endsWith("%") ? heapPercent : heapPercent + "%";
+    }
+  }
+
+  public String getDefaultMetricsSinkDir() {
+    String dirPath = metricsConf.get(DEFAULT_EXTERNAL_SINK_DIR);
+    if (dirPath == null) {
+      // Only one logger at the time of writing
+      Appender appender = (Appender) 
Logger.getRootLogger().getAllAppenders().nextElement();
+      if (appender instanceof FileAppender) {
+        File f = new File(((FileAppender) appender).getFile());
+        if (f.exists()) {
+          dirPath = f.getParent();
+        } else {
+          dirPath = "/tmp";
+        }
+      }
+    }
+
+    return dirPath;
+  }
+
+  public boolean isHostInMemoryAggregationEnabled() {
+    if (metricsConf != null) {
+      return 
Boolean.valueOf(metricsConf.get(TIMELINE_METRICS_HOST_INMEMORY_AGGREGATION, 
"false"));
+    } else {
+      return false;
+    }
+  }
+
+  public boolean isContainerMetricsDisabled() {
+    try {
+      return metricsConf != null && 
Boolean.parseBoolean(metricsConf.get(TIMELINE_SERVICE_DISABLE_CONTAINER_METRICS,
 "false"));
+    } catch (Exception e) {
+
+      return false;
+    }
+  }
+
+  public boolean isCollectorInMemoryAggregationEnabled() {
+    if (metricsConf != null) {
+      return 
Boolean.valueOf(metricsConf.get(TIMELINE_METRICS_COLLECTOR_INMEMORY_AGGREGATION,
 "false"));
+    } else {
+      return false;
+    }
+  }
+
+  public List<String> getAppIdsForHostAggregation() {
+    String appIds = metricsConf.get(CLUSTER_AGGREGATOR_APP_IDS);
+    if (!StringUtils.isEmpty(appIds)) {
+      return Arrays.asList(StringUtils.stripAll(appIds.split(",")));
+    }
+    return Collections.emptyList();
+  }
+
+  public String getZkConnectionUrl(String zkClientPort, String zkQuorum){
+      StringBuilder sb = new StringBuilder();
+      String[] quorumParts = zkQuorum.split(",");
+      String prefix = "";
+      for (String part : quorumParts) {
+        sb.append(prefix);
+        sb.append(part.trim());
+        if (!part.contains(":")) {
+          sb.append(":");
+          sb.append(zkClientPort);
+        }
+        prefix = ",";
+      }
+
+      return sb.toString();
+
+    }
+
+  public boolean isWhitelistingEnabled() {
+    if (metricsConf != null) {
+      return 
Boolean.parseBoolean(metricsConf.get(TIMELINE_METRICS_WHITELIST_ENABLED, 
"false"));
+    }
+    return false;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/TimelineMetricDistributedCache.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/TimelineMetricDistributedCache.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/TimelineMetricDistributedCache.java
new file mode 100644
index 0000000..000b3bc
--- /dev/null
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/TimelineMetricDistributedCache.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.core.timeline;
+
+import java.util.Collection;
+import java.util.Map;
+
+import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import 
org.apache.ambari.metrics.core.timeline.aggregators.TimelineClusterMetric;
+import 
org.apache.ambari.metrics.core.timeline.discovery.TimelineMetricMetadataManager;
+
+/**
+ * Cache abstraction for timeline metrics shared across collector instances.
+ */
+public interface TimelineMetricDistributedCache {
+  // Removes and returns cluster aggregates for the given time window.
+  // NOTE(review): endpoint inclusivity is not evident from this file —
+  // confirm against implementations.
+  Map<TimelineClusterMetric, MetricClusterAggregate> 
evictMetricAggregates(Long startTime, Long endTime);
+  // Adds incoming metrics to the cache; the metadata manager resolves
+  // metric metadata during insertion.
+  void putMetrics(Collection<TimelineMetric> elements, 
TimelineMetricMetadataManager metricMetadataManager);
+  // Point-in-time cache health/utilization metrics keyed by metric name.
+  Map<String, Double> getPointInTimeCacheMetrics();
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/TimelineMetricStore.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/TimelineMetricStore.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/TimelineMetricStore.java
new file mode 100644
index 0000000..901d51e
--- /dev/null
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/TimelineMetricStore.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.core.timeline;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.metrics2.sink.timeline.AggregationResult;
+import org.apache.hadoop.metrics2.sink.timeline.ContainerMetric;
+import org.apache.hadoop.metrics2.sink.timeline.Precision;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.apache.hadoop.metrics2.sink.timeline.TopNConfig;
+import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
+
/**
 * Storage abstraction for the timeline metrics service: reading and writing
 * metric series, container metrics, host-level aggregates and the metadata
 * (metric names, hosted apps, instances) derived from them.
 */
public interface TimelineMetricStore {
  /**
   * This method retrieves metrics stored by the Timeline store.
   *
   * @param metricNames Names of the metric, e.g.: cpu_user
   * @param hostnames Names of the host where the metric originated from
   * @param applicationId Id of the application to which this metric belongs
   * @param instanceId Application instance id.
   * @param startTime Start timestamp
   * @param endTime End timestamp
   * @param precision Precision [ seconds, minutes, hours ]
   * @param limit Override default result limit
   * @param groupedByHosts Group {@link TimelineMetric} by metric name, hostname,
   *                app id and instance id
   * @param topNConfig Optional top-N filtering of the result series
   * @param seriesAggregateFunction Specify this when caller want to aggregate multiple metrics
   *                                series into one. [ SUM, AVG, MIN, MAX ]
   *
   * @return {@link TimelineMetric}
   * @throws java.sql.SQLException
   */
  TimelineMetrics getTimelineMetrics(List<String> metricNames, List<String> hostnames,
                                     String applicationId, String instanceId, Long startTime,
                                     Long endTime, Precision precision, Integer limit, boolean groupedByHosts,
                                     TopNConfig topNConfig, String seriesAggregateFunction)
    throws SQLException, IOException;

  /**
   * Stores metric information to the timeline store. Any errors occurring for
   * individual put request objects will be reported in the response.
   *
   * @param metrics An {@link TimelineMetrics}.
   * @return An {@link org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse}.
   * @throws SQLException, IOException
   */
  TimelinePutResponse putMetrics(TimelineMetrics metrics) throws SQLException, IOException;

  /**
   * Stores container metrics into the timeline store.
   */
  TimelinePutResponse putContainerMetrics(List<ContainerMetric> metrics)
      throws SQLException, IOException;

  /**
   * Return all metrics metadata that have been written to the store.
   * @return { appId : [ @TimelineMetricMetadata ] }
   * @throws SQLException
   * @throws IOException
   */
  Map<String, List<TimelineMetricMetadata>> getTimelineMetricMetadata(String appId, String metricPattern,
                                                                      boolean includeBlacklistedMetrics) throws SQLException, IOException;

  /**
   * Stores metrics that were pre-aggregated on the host before being sent.
   */
  TimelinePutResponse putHostAggregatedMetrics(AggregationResult aggregationResult) throws SQLException, IOException;

  /**
   * Returns all hosts that have written metrics with the apps on the host
   * @return { hostname : [ appIds ] }
   * @throws SQLException
   * @throws IOException
   */
  Map<String, Set<String>> getHostAppsMetadata() throws SQLException, IOException;

  /**
   * Returns all instances and the set of hosts each instance is present on
   * @return { instanceId : [ hosts ] }
   * @throws SQLException
   * @throws IOException
   */
  Map<String, Map<String,Set<String>>> getInstanceHostsMetadata(String instanceId, String appId) throws SQLException, IOException;

  /**
   * Resolves the stored UUID key for the given metric/app/instance/host tuple.
   */
  byte[] getUuid(String metricName, String appId, String instanceId, String hostname) throws SQLException, IOException;

  /**
   * Return a list of known live collector nodes
   * @return [ hostname ]
   */
  List<String> getLiveInstances();
}

http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/TimelineMetricStoreWatcher.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/TimelineMetricStoreWatcher.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/TimelineMetricStoreWatcher.java
new file mode 100644
index 0000000..0ab7929
--- /dev/null
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/TimelineMetricStoreWatcher.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.metrics.core.timeline;
+
+import java.util.Collections;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.metrics2.sink.timeline.Precision;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.apache.hadoop.util.ExitUtil;
+
+/**
+ * Acts as the single TimetineMetricStore Watcher.
+ */
+public class TimelineMetricStoreWatcher implements Runnable {
+
+  private static final Log LOG = 
LogFactory.getLog(TimelineMetricStoreWatcher.class);
+  private static final String FAKE_METRIC_NAME = 
"TimelineMetricStoreWatcher.FakeMetric";
+  private static final String FAKE_HOSTNAME = "fakehostname";
+  private static final String FAKE_APP_ID = "timeline_metric_store_watcher";
+
+  private static int failures = 0;
+  private final TimelineMetricConfiguration configuration;
+
+  private TimelineMetricStore timelineMetricStore;
+
+  //used to call timelineMetricStore blocking methods with timeout
+  private ExecutorService executor = Executors.newSingleThreadExecutor();
+
+
+  public TimelineMetricStoreWatcher(TimelineMetricStore timelineMetricStore,
+                                    TimelineMetricConfiguration configuration) 
{
+    this.timelineMetricStore = timelineMetricStore;
+    this.configuration = configuration;
+  }
+
+  @Override
+  public void run() {
+    if (checkMetricStore()) {
+      failures = 0;
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Successfully got metrics from TimelineMetricStore");
+      }
+    } else {
+      LOG.info("Failed to get metrics from TimelineMetricStore, attempt = " + 
failures);
+      failures++;
+    }
+
+    if (failures >= 
configuration.getTimelineMetricsServiceWatcherMaxFailures()) {
+      String msg = "Error getting metrics from TimelineMetricStore. " +
+        "Shutting down by TimelineMetricStoreWatcher.";
+      LOG.fatal(msg);
+      ExitUtil.terminate(-1, msg);
+    }
+
+  }
+
+  /**
+   * Checks TimelineMetricStore functionality by adding and getting
+   * a fake metric to/from HBase
+   * @return if check was successful
+   */
+  private boolean checkMetricStore() {
+    final long startTime = System.currentTimeMillis();
+    final int delay = configuration.getTimelineMetricsServiceWatcherDelay();
+    final int timeout = 
configuration.getTimelineMetricsServiceWatcherTimeout();
+
+    TimelineMetric fakeMetric = new TimelineMetric();
+    fakeMetric.setMetricName(FAKE_METRIC_NAME);
+    fakeMetric.setHostName(FAKE_HOSTNAME);
+    fakeMetric.setAppId(FAKE_APP_ID);
+    fakeMetric.setStartTime(startTime);
+    fakeMetric.getMetricValues().put(startTime, 0.0);
+
+    final TimelineMetrics metrics = new TimelineMetrics();
+    metrics.setMetrics(Collections.singletonList(fakeMetric));
+
+    Callable<TimelineMetric> task = new Callable<TimelineMetric>() {
+      public TimelineMetric call() throws Exception {
+        timelineMetricStore.putMetrics(metrics);
+        TimelineMetrics timelineMetrics = 
timelineMetricStore.getTimelineMetrics(
+          Collections.singletonList(FAKE_METRIC_NAME), 
Collections.singletonList(FAKE_HOSTNAME),
+          FAKE_APP_ID, null, startTime - delay * 2 * 1000,
+          startTime + delay * 2 * 1000, Precision.SECONDS, 1, true, null, 
null);
+        return timelineMetrics.getMetrics().get(0);
+      }
+    };
+
+    Future<TimelineMetric> future = executor.submit(task);
+    TimelineMetric timelineMetric = null;
+    try {
+      timelineMetric = future.get(timeout, TimeUnit.SECONDS);
+    // Phoenix might throw RuntimeExeption's
+    } catch (Exception e) {
+      return false;
+    } finally {
+      future.cancel(true);
+    }
+
+    return timelineMetric != null;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/TimelineMetricsAggregatorSink.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/TimelineMetricsAggregatorSink.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/TimelineMetricsAggregatorSink.java
new file mode 100644
index 0000000..bb379d8
--- /dev/null
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/TimelineMetricsAggregatorSink.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.core.timeline;
+
+import java.util.Map;
+
+import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate;
+import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
+import org.apache.hadoop.metrics2.sink.timeline.Precision;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import 
org.apache.ambari.metrics.core.timeline.aggregators.TimelineClusterMetric;
+
+/**
+ * This Interface for storing aggregated metrics to any external storage
+ */
/**
 * This interface is for storing aggregated metrics to any external storage.
 * Implementations receive the same aggregate maps that the collector writes
 * to its primary store, at each aggregation precision.
 */
public interface TimelineMetricsAggregatorSink {

  /**
   * Save host aggregated metrics
   *
   * @param hostAggregateMap Map of host aggregated metrics
   * @param precision SECOND, MINUTE, HOUR, DAY
   */
  void saveHostAggregateRecords(
      Map<TimelineMetric, MetricHostAggregate> hostAggregateMap,
      Precision precision);

  /**
   * Save cluster time aggregated metrics
   *
   * @param clusterTimeAggregateMap Map of cluster aggregated metrics
   * @param precision SECOND, MINUTE, HOUR, DAY
   */
  void saveClusterTimeAggregateRecords(
      Map<TimelineClusterMetric, MetricHostAggregate> clusterTimeAggregateMap,
      Precision precision);

  /**
   * Save cluster aggregated metrics
   *
   * @param clusterAggregateMap Map of cluster aggregated metrics
   */
  void saveClusterAggregateRecords(
      Map<TimelineClusterMetric, MetricClusterAggregate> clusterAggregateMap);
}

http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/TimelineMetricsFilter.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/TimelineMetricsFilter.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/TimelineMetricsFilter.java
new file mode 100644
index 0000000..b2d5fd9
--- /dev/null
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/TimelineMetricsFilter.java
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.metrics.core.timeline;
+
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+
+public class TimelineMetricsFilter {
+
+  private static Set<String> whitelistedMetrics;
+  private static Set<Pattern> whitelistedMetricPatterns;
+  private static Set<String> whitelistedApps;
+  private static Set<String> blacklistedApps;
+  private static String patternPrefix = "._p_";
+  private static Set<String> amshbaseWhitelist;
+
+  private static final Log LOG = 
LogFactory.getLog(TimelineMetricsFilter.class);
+
+  public static void initializeMetricFilter(TimelineMetricConfiguration 
configuration) {
+
+    Configuration metricsConf = null;
+    try {
+      metricsConf = configuration.getMetricsConf();
+    } catch (Exception e) {
+      LOG.error("Error fetching metrics configuration for getting whitelisting 
information");
+      return;
+    }
+
+    whitelistedMetrics = new HashSet<String>();
+    whitelistedMetricPatterns = new HashSet<Pattern>();
+    blacklistedApps = new HashSet<>();
+    whitelistedApps = new HashSet<>();
+    amshbaseWhitelist = new HashSet<>();
+
+    if (configuration.isWhitelistingEnabled()) {
+      String whitelistFile = 
metricsConf.get(TimelineMetricConfiguration.TIMELINE_METRICS_WHITELIST_FILE, 
TimelineMetricConfiguration.TIMELINE_METRICS_WHITELIST_FILE_LOCATION_DEFAULT);
+      readMetricWhitelistFromFile(whitelistFile);
+    }
+
+    String appsBlacklist = 
metricsConf.get(TimelineMetricConfiguration.TIMELINE_METRICS_APPS_BLACKLIST, 
"");
+    if (!StringUtils.isEmpty(appsBlacklist)) {
+      for (String app : appsBlacklist.split(",")) {
+        blacklistedApps.add(app);
+      }
+      LOG.info("Blacklisted apps : " + blacklistedApps.toString());
+    }
+
+    String appsWhitelist = 
metricsConf.get(TimelineMetricConfiguration.TIMELINE_METRICS_APPS_WHITELIST, 
"");
+    if (!StringUtils.isEmpty(appsWhitelist)) {
+      for (String app : appsWhitelist.split(",")) {
+        whitelistedApps.add(app);
+      }
+      LOG.info("Whitelisted apps : " + whitelistedApps.toString());
+    }
+
+    amshbaseWhitelist = configuration.getAmshbaseWhitelist();
+    if (CollectionUtils.isNotEmpty(amshbaseWhitelist)) {
+      LOG.info("Whitelisting " + amshbaseWhitelist.size() + " ams-hbase 
metrics");
+    }
+  }
+
+  private static void readMetricWhitelistFromFile(String whitelistFile) {
+
+    BufferedReader br = null;
+    String strLine;
+
+    try(FileInputStream fstream = new FileInputStream(whitelistFile)) {
+      br = new BufferedReader(new InputStreamReader(fstream));
+
+      while ((strLine = br.readLine()) != null)   {
+        strLine = strLine.trim();
+        if (StringUtils.isEmpty(strLine)) {
+          continue;
+        }
+        if (strLine.startsWith(patternPrefix)) {
+          
whitelistedMetricPatterns.add(Pattern.compile(strLine.substring(patternPrefix.length())));
+        } else {
+          whitelistedMetrics.add(strLine);
+        }
+      }
+    } catch (IOException ioEx) {
+      LOG.error("Unable to parse metric whitelist file", ioEx);
+    }
+
+    LOG.info("Whitelisting " + whitelistedMetrics.size() + " metrics");
+    LOG.debug("Whitelisted metrics : " + 
Arrays.toString(whitelistedMetrics.toArray()));
+  }
+
+  public static boolean acceptMetric(String metricName, String appId) {
+    TimelineMetric timelineMetric = new TimelineMetric();
+    timelineMetric.setAppId(appId);
+    timelineMetric.setMetricName(metricName);
+    return acceptMetric(timelineMetric);
+  }
+
+  public static boolean acceptMetric(TimelineMetric metric) {
+
+    String appId = metric.getAppId();
+    String metricName = metric.getMetricName();
+    // App Blacklisting
+    if (CollectionUtils.isNotEmpty(blacklistedApps) && 
blacklistedApps.contains(appId)) {
+      return false;
+    }
+
+    //Special Case appId = ams-hbase whitelisting.
+    if ("ams-hbase".equals(appId) && 
CollectionUtils.isNotEmpty(amshbaseWhitelist)) {
+      return amshbaseWhitelist.contains(metric.getMetricName());
+    }
+
+    // App Whitelisting
+    if (CollectionUtils.isNotEmpty(whitelistedApps) && 
whitelistedApps.contains(appId)) {
+      return true;
+    }
+
+    // Metric Whitelisting
+    if (CollectionUtils.isEmpty(whitelistedMetrics) && 
CollectionUtils.isEmpty(whitelistedMetricPatterns)) {
+      return true;
+    }
+
+    if (whitelistedMetrics.contains(metricName)) {
+      return true;
+    }
+
+    for (Pattern p : whitelistedMetricPatterns) {
+      Matcher m = p.matcher(metricName);
+      if (m.find()) {
+        whitelistedMetrics.add(metricName);
+        return true;
+      }
+    }
+
+    return false;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/TimelineMetricsIgniteCache.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/TimelineMetricsIgniteCache.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/TimelineMetricsIgniteCache.java
new file mode 100644
index 0000000..e6990e7
--- /dev/null
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/TimelineMetricsIgniteCache.java
@@ -0,0 +1,288 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.core.timeline;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static 
org.apache.ambari.metrics.core.timeline.aggregators.AggregatorUtils.getRoundedCheckPointTimeMillis;
+import static 
org.apache.ambari.metrics.core.timeline.aggregators.AggregatorUtils.getTimeSlices;
+import static 
org.apache.ambari.metrics.core.timeline.aggregators.AggregatorUtils.sliceFromTimelineMetric;
+
+import java.net.MalformedURLException;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+
+import javax.cache.Cache;
+import javax.cache.expiry.CreatedExpiryPolicy;
+import javax.cache.expiry.Duration;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import 
org.apache.hadoop.metrics2.sink.timeline.availability.MetricCollectorHAHelper;
+import 
org.apache.ambari.metrics.core.timeline.aggregators.TimelineClusterMetric;
+import 
org.apache.ambari.metrics.core.timeline.discovery.TimelineMetricHostMetadata;
+import 
org.apache.ambari.metrics.core.timeline.discovery.TimelineMetricMetadataManager;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMetrics;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.cache.query.ScanQuery;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.ssl.SslContextFactory;
+
+public class TimelineMetricsIgniteCache implements 
TimelineMetricDistributedCache {
+  private static final Log LOG =
+      LogFactory.getLog(TimelineMetricsIgniteCache.class);
+  private IgniteCache<TimelineClusterMetric, MetricClusterAggregate> 
igniteCache;
+  private long cacheSliceIntervalMillis;
+  private boolean interpolationEnabled;
+  private List<String> skipAggrPatternStrings = new ArrayList<>();
+  private List<String> appIdsToAggregate;
+
+
+  public TimelineMetricsIgniteCache() throws MalformedURLException, 
URISyntaxException {
+    TimelineMetricConfiguration timelineMetricConfiguration = 
TimelineMetricConfiguration.getInstance();
+    Configuration metricConf = timelineMetricConfiguration.getMetricsConf();
+    Configuration sslConf = timelineMetricConfiguration.getMetricsSslConf();
+
+    IgniteConfiguration igniteConfiguration = new IgniteConfiguration();
+
+    //TODO add config to disable logging
+
+    //enable ssl for ignite requests
+    if 
(metricConf.get(TimelineMetricConfiguration.TIMELINE_SERVICE_HTTP_POLICY) != 
null && 
metricConf.get(TimelineMetricConfiguration.TIMELINE_SERVICE_HTTP_POLICY).equalsIgnoreCase("HTTPS_ONLY"))
 {
+      SslContextFactory sslContextFactory = new SslContextFactory();
+      String keyStorePath = sslConf.get("ssl.server.keystore.location");
+      String keyStorePassword = sslConf.get("ssl.server.keystore.password");
+      String trustStorePath = sslConf.get("ssl.server.truststore.location");
+      String trustStorePassword = 
sslConf.get("ssl.server.truststore.password");
+
+      sslContextFactory.setKeyStoreFilePath(keyStorePath);
+      sslContextFactory.setKeyStorePassword(keyStorePassword.toCharArray());
+      sslContextFactory.setTrustStoreFilePath(trustStorePath);
+      
sslContextFactory.setTrustStorePassword(trustStorePassword.toCharArray());
+      igniteConfiguration.setSslContextFactory(sslContextFactory);
+    }
+
+    //aggregation parameters
+    appIdsToAggregate = 
timelineMetricConfiguration.getAppIdsForHostAggregation();
+    interpolationEnabled = 
Boolean.parseBoolean(metricConf.get(TimelineMetricConfiguration.TIMELINE_METRICS_CLUSTER_AGGREGATOR_INTERPOLATION_ENABLED,
 "true"));
+    cacheSliceIntervalMillis = 
SECONDS.toMillis(metricConf.getInt(TimelineMetricConfiguration.CLUSTER_AGGREGATOR_TIMESLICE_INTERVAL,
 30));
+    Long aggregationInterval = 
metricConf.getLong(TimelineMetricConfiguration.CLUSTER_AGGREGATOR_SECOND_SLEEP_INTERVAL,
 120L);
+
+    String filteredMetricPatterns = 
metricConf.get(TimelineMetricConfiguration.TIMELINE_METRIC_AGGREGATION_SQL_FILTERS);
+    if (!StringUtils.isEmpty(filteredMetricPatterns)) {
+      LOG.info("Skipping aggregation for metric patterns : " + 
filteredMetricPatterns);
+      
skipAggrPatternStrings.addAll(Arrays.asList(filteredMetricPatterns.split(",")));
+    }
+
+    if 
(metricConf.get(TimelineMetricConfiguration.TIMELINE_METRICS_COLLECTOR_IGNITE_NODES)
 != null) {
+      TcpDiscoverySpi discoverySpi = new TcpDiscoverySpi();
+      TcpDiscoveryVmIpFinder ipFinder = new TcpDiscoveryVmIpFinder();
+      
ipFinder.setAddresses(Arrays.asList(metricConf.get(TimelineMetricConfiguration.TIMELINE_METRICS_COLLECTOR_IGNITE_NODES).split(",")));
+      LOG.info("Setting ignite nodes to : " + 
ipFinder.getRegisteredAddresses());
+      discoverySpi.setIpFinder(ipFinder);
+      igniteConfiguration.setDiscoverySpi(discoverySpi);
+    } else {
+      //get live nodes from ZK
+      String zkClientPort = 
timelineMetricConfiguration.getClusterZKClientPort();
+      String zkQuorum = timelineMetricConfiguration.getClusterZKQuorum();
+      String zkConnectionURL = 
timelineMetricConfiguration.getZkConnectionUrl(zkClientPort, zkQuorum);
+      MetricCollectorHAHelper metricCollectorHAHelper = new 
MetricCollectorHAHelper(zkConnectionURL, 5, 200);
+      Collection<String> liveCollectors = 
metricCollectorHAHelper.findLiveCollectorHostsFromZNode();
+      if (liveCollectors != null && !liveCollectors.isEmpty()) {
+        TcpDiscoverySpi discoverySpi = new TcpDiscoverySpi();
+        TcpDiscoveryVmIpFinder ipFinder = new TcpDiscoveryVmIpFinder();
+        ipFinder.setAddresses(liveCollectors);
+        LOG.info("Setting ignite nodes to : " + 
ipFinder.getRegisteredAddresses());
+        discoverySpi.setIpFinder(ipFinder);
+        igniteConfiguration.setDiscoverySpi(discoverySpi);
+      }
+    }
+
+
+    //ignite cache configuration
+    CacheConfiguration<TimelineClusterMetric, MetricClusterAggregate> 
cacheConfiguration = new CacheConfiguration<>();
+    cacheConfiguration.setName("metrics_cache");
+    //set cache mode to partitioned with # of backups
+    cacheConfiguration.setCacheMode(CacheMode.PARTITIONED);
+    
cacheConfiguration.setBackups(metricConf.getInt(TimelineMetricConfiguration.TIMELINE_METRICS_COLLECTOR_IGNITE_BACKUPS,
 1));
+    //disable throttling due to cpu impact
+    cacheConfiguration.setRebalanceThrottle(0);
+    //enable locks
+    cacheConfiguration.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
+    //expiry policy to remove lost keys, if any
+    cacheConfiguration.setEagerTtl(true);
+    
cacheConfiguration.setExpiryPolicyFactory(CreatedExpiryPolicy.factoryOf(new 
Duration(TimeUnit.SECONDS, aggregationInterval * 3)));
+
+    Ignite igniteNode = Ignition.start(igniteConfiguration);
+    igniteCache = igniteNode.getOrCreateCache(cacheConfiguration);
+  }
+
+  /**
+   * Looks through the cache and evicts all elements within (startTime; 
endTime] half-interval
+   * All elements satisfying the half-interval will be removed from the cache.
+   * @param startTime
+   * @param endTime
+   * @return
+   */
+  @Override
+  public Map<TimelineClusterMetric, MetricClusterAggregate> 
evictMetricAggregates(Long startTime, Long endTime) {
+    Map<TimelineClusterMetric, MetricClusterAggregate> aggregatedMetricsMap = 
new HashMap<>();
+
+    //construct filter
+    IgniteBiPredicate<TimelineClusterMetric, MetricClusterAggregate> filter =
+        (IgniteBiPredicate<TimelineClusterMetric, MetricClusterAggregate>) 
(key, value) -> key.getTimestamp() > startTime && key.getTimestamp() <= endTime;
+
+    //get values from cache
+    try (QueryCursor<Cache.Entry<TimelineClusterMetric, 
MetricClusterAggregate>> cursor = igniteCache.query(new ScanQuery(filter))) {
+      for (Cache.Entry<TimelineClusterMetric, MetricClusterAggregate> e : 
cursor) {
+        aggregatedMetricsMap.put(e.getKey(), e.getValue());
+      }
+    }
+
+    //remove values from cache
+    igniteCache.removeAllAsync(aggregatedMetricsMap.keySet());
+
+    return aggregatedMetricsMap;
+  }
+
  /**
   * Iterates through elements skipping white-listed patterns;
   * calculates average value for each slice of each metric (last slice values
   * could be ignored if there is the possibility that values from this slice
   * could be present in next post);
   * updates/adds the value in the cache;
   * calculates applications host metrics based on the metadata of hosted apps;
   * updates metadata of hosted apps if needed.
   *
   * @param elements        raw timeline metrics to aggregate into the cache
   * @param metadataManager source of the hosted-apps cache used for per-app
   *                        host-metric aggregation
   */
  @Override
  public void putMetrics(Collection<TimelineMetric> elements, TimelineMetricMetadataManager metadataManager) {
    Map<String, TimelineMetricHostMetadata> hostMetadata = metadataManager.getHostedAppsCache();
    for (TimelineMetric metric : elements) {
      // Metrics matching a skip-aggregation pattern are not cluster-aggregated at all.
      if (shouldBeSkipped(metric.getMetricName())) {
        if (LOG.isDebugEnabled()) {
          LOG.debug(String.format("Skipping %s metric from being aggregated", metric.getMetricName()));
        }
        continue;
      }
      // Slice the metric's value window into fixed-size intervals, rounded to
      // the cache slice interval, then average each slice.
      List<Long[]> timeSlices = getTimeSlices(
          getRoundedCheckPointTimeMillis(metric.getMetricValues().firstKey(), cacheSliceIntervalMillis),
          metric.getMetricValues().lastKey(), cacheSliceIntervalMillis);
      Map<TimelineClusterMetric, Double> slicedClusterMetrics =
          sliceFromTimelineMetric(metric, timeSlices, interpolationEnabled);

      if (slicedClusterMetrics != null) {
        for (Map.Entry<TimelineClusterMetric, Double> metricDoubleEntry : slicedClusterMetrics.entrySet()) {
          // One value per slice: numHosts = 1, min = max = sum = slice average.
          MetricClusterAggregate newMetricClusterAggregate = new MetricClusterAggregate(
              metricDoubleEntry.getValue(), 1, null,
              metricDoubleEntry.getValue(), metricDoubleEntry.getValue());
          //put app metric into cache
          putMetricIntoCache(metricDoubleEntry.getKey(), newMetricClusterAggregate);
          if (hostMetadata != null) {
            //calculate app host metric
            if (metric.getAppId().equalsIgnoreCase(TimelineMetricConfiguration.HOST_APP_ID)) {
              // Candidate metric, update app aggregates
              if (hostMetadata.containsKey(metric.getHostName())) {
                updateAppAggregatesFromHostMetric(metricDoubleEntry.getKey(),
                    newMetricClusterAggregate, hostMetadata.get(metric.getHostName()));
              }
            } else {
              // Build the hostedapps map if not a host metric
              // Check app candidacy for host aggregation
              //TODO better to lock TimelineMetricHostMetadata instance to avoid dataloss,
              // but generally the data could be lost only during initial collector start
              if (appIdsToAggregate.contains(metric.getAppId())) {
                TimelineMetricHostMetadata timelineMetricHostMetadata = hostMetadata.get(metric.getHostName());
                ConcurrentHashMap<String, String> appIdsMap;
                if (timelineMetricHostMetadata == null) {
                  appIdsMap = new ConcurrentHashMap<>();
                  hostMetadata.put(metric.getHostName(), new TimelineMetricHostMetadata(appIdsMap));
                } else {
                  appIdsMap = timelineMetricHostMetadata.getHostedApps();
                }
                if (!appIdsMap.containsKey(metric.getAppId())) {
                  appIdsMap.put(metric.getAppId(), metric.getAppId());
                  LOG.info("Adding appId to hosted apps: appId = " +
                      metric.getAppId() + ", hostname = " + metric.getHostName());
                }
              }
            }
          }
        }
      }
    }
  }
+
+  private void updateAppAggregatesFromHostMetric(TimelineClusterMetric key, 
MetricClusterAggregate newMetricClusterAggregate, TimelineMetricHostMetadata 
timelineMetricHostMetadata) {
+    for (String appId : timelineMetricHostMetadata.getHostedApps().keySet()) {
+      TimelineClusterMetric timelineClusterMetric = new 
TimelineClusterMetric(key.getMetricName(), appId, key.getInstanceId(), 
key.getTimestamp());
+      putMetricIntoCache(timelineClusterMetric, newMetricClusterAggregate);
+    }
+  }
+
+  private void putMetricIntoCache(TimelineClusterMetric metricKey, 
MetricClusterAggregate metricValue) {
+    Lock lock = igniteCache.lock(metricKey);
+    lock.lock();
+    try {
+      MetricClusterAggregate metricClusterAggregateFromCache = 
igniteCache.get(metricKey);
+      if (metricClusterAggregateFromCache == null) {
+        igniteCache.put(metricKey, metricValue);
+      } else {
+        metricClusterAggregateFromCache.updateAggregates(metricValue);
+        igniteCache.put(metricKey, metricClusterAggregateFromCache);
+      }
+    } catch (Exception e) {
+      LOG.error("Exception : ", e);
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  @Override
+  public Map<String, Double> getPointInTimeCacheMetrics() {
+    CacheMetrics clusterIgniteMetrics = igniteCache.metrics();
+    Map<String, Double> metricsMap = new HashMap<>();
+    metricsMap.put("Cluster_AverageGetTime", (double) 
clusterIgniteMetrics.getAverageGetTime());
+    metricsMap.put("Cluster_AveragePutTime", (double) 
clusterIgniteMetrics.getAveragePutTime());
+    metricsMap.put("Cluster_KeySize", (double) 
clusterIgniteMetrics.getKeySize());
+    metricsMap.put("Cluster_OffHeapAllocatedSize", (double) 
clusterIgniteMetrics.getOffHeapAllocatedSize());
+    return metricsMap;
+  }
+
+  private boolean shouldBeSkipped(String metricName) {
+    for (String pattern : skipAggrPatternStrings) {
+      if (metricName.matches(pattern)) {
+        return true;
+      }
+    }
+    return false;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/42112e28/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/AbstractTimelineAggregator.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/AbstractTimelineAggregator.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/AbstractTimelineAggregator.java
new file mode 100644
index 0000000..f538bdf
--- /dev/null
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/AbstractTimelineAggregator.java
@@ -0,0 +1,494 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.core.timeline.aggregators;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static 
org.apache.ambari.metrics.core.timeline.aggregators.AggregatorUtils.getRoundedAggregateTimeMillis;
+import static 
org.apache.ambari.metrics.core.timeline.aggregators.AggregatorUtils.getRoundedCheckPointTimeMillis;
+import static 
org.apache.ambari.metrics.core.timeline.availability.AggregationTaskRunner.ACTUAL_AGGREGATOR_NAMES;
+
+import java.io.File;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration;
+import 
org.apache.ambari.metrics.core.timeline.availability.MetricCollectorHAController;
+import org.apache.ambari.metrics.core.timeline.query.Condition;
+import org.apache.ambari.metrics.core.timeline.query.EmptyCondition;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.ambari.metrics.core.timeline.PhoenixHBaseAccessor;
+import 
org.apache.ambari.metrics.core.timeline.availability.AggregationTaskRunner;
+import 
org.apache.ambari.metrics.core.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME;
+import org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
/**
 * Base class for all runnable aggregators. Provides common functions like
 * check pointing and scheduling.
 */
public abstract class AbstractTimelineAggregator implements TimelineMetricAggregator {
  protected final PhoenixHBaseAccessor hBaseAccessor;
  // Per-aggregator logger, named after the aggregator (see constructor).
  protected final Logger LOG;
  // Grace period added when judging whether a checkpoint is stale.
  protected final long checkpointDelayMillis;
  protected final Integer resultsetFetchSize;
  protected Configuration metricsConf;
  private String checkpointLocation;
  // Length of one aggregation window in milliseconds.
  private Long sleepIntervalMillis;
  // Multiplier on the sleep interval giving the checkpoint cut-off age.
  private Integer checkpointCutOffMultiplier;
  private String aggregatorDisableParam;
  // Source table read by this aggregator.
  protected String tableName;
  // Destination table written by this aggregator; also determines the
  // aggregator type (see getAggregatorType()).
  protected String outputTableName;
  protected Long nativeTimeRangeDelay;
  // Non-null only when running under the HA controller.
  protected AggregationTaskRunner taskRunner;
  protected List<String> downsampleMetricPatterns;
  protected List<CustomDownSampler> configuredDownSamplers;

  // Explicitly name aggregators for logging needs
  private final AGGREGATOR_NAME aggregatorName;

  /**
   * Minimal constructor used by subclasses/tests; checkpoint and scheduling
   * fields are left unset (see the full constructor below).
   */
  AbstractTimelineAggregator(AGGREGATOR_NAME aggregatorName,
                             PhoenixHBaseAccessor hBaseAccessor,
                             Configuration metricsConf) {
    this.aggregatorName = aggregatorName;
    this.hBaseAccessor = hBaseAccessor;
    this.metricsConf = metricsConf;
    this.checkpointDelayMillis = SECONDS.toMillis(
        metricsConf.getInt(TimelineMetricConfiguration.AGGREGATOR_CHECKPOINT_DELAY, 120));
    this.resultsetFetchSize = metricsConf.getInt(TimelineMetricConfiguration.RESULTSET_FETCH_SIZE, 2000);
    this.LOG = LoggerFactory.getLogger(ACTUAL_AGGREGATOR_NAMES.get(aggregatorName));
    this.configuredDownSamplers = DownSamplerUtils.getDownSamplers(metricsConf);
    this.downsampleMetricPatterns = DownSamplerUtils.getDownsampleMetricPatterns(metricsConf);
  }
+
  /**
   * Full constructor wiring checkpointing, scheduling and (optionally) the HA
   * task runner.
   *
   * @param haController when non-null and initialized, its task runner is used
   *                     for checkpoints and work ownership; otherwise local
   *                     file checkpoints are used (taskRunner stays null)
   */
  public AbstractTimelineAggregator(AGGREGATOR_NAME aggregatorName,
                                    PhoenixHBaseAccessor hBaseAccessor,
                                    Configuration metricsConf,
                                    String checkpointLocation,
                                    Long sleepIntervalMillis,
                                    Integer checkpointCutOffMultiplier,
                                    String aggregatorDisableParam,
                                    String tableName,
                                    String outputTableName,
                                    Long nativeTimeRangeDelay,
                                    MetricCollectorHAController haController) {
    this(aggregatorName, hBaseAccessor, metricsConf);
    this.checkpointLocation = checkpointLocation;
    this.sleepIntervalMillis = sleepIntervalMillis;
    this.checkpointCutOffMultiplier = checkpointCutOffMultiplier;
    this.aggregatorDisableParam = aggregatorDisableParam;
    this.tableName = tableName;
    this.outputTableName = outputTableName;
    this.nativeTimeRangeDelay = nativeTimeRangeDelay;
    this.taskRunner = haController != null && haController.isInitialized() ?
      haController.getAggregationTaskRunner() : null;
  }
+
+  @Override
+  public void run() {
+    LOG.info("Started Timeline aggregator thread @ " + new Date());
+    Long SLEEP_INTERVAL = getSleepIntervalMillis();
+    runOnce(SLEEP_INTERVAL);
+  }
+
+  /**
+   * Access relaxed for tests
+   */
+  public void runOnce(Long SLEEP_INTERVAL) {
+    boolean performAggregationFunction = true;
+    if (taskRunner != null) {
+      switch (getAggregatorType()) {
+        case HOST:
+          performAggregationFunction = taskRunner.performsHostAggregation();
+          break;
+        case CLUSTER:
+          performAggregationFunction = taskRunner.performsClusterAggregation();
+      }
+    }
+
+    if (performAggregationFunction) {
+      long currentTime = System.currentTimeMillis();
+      long lastCheckPointTime = 
readLastCheckpointSavingOnFirstRun(currentTime);
+
+      if (lastCheckPointTime != -1) {
+        LOG.info("Last check point time: " + lastCheckPointTime + ", lagBy: "
+          + ((currentTime - lastCheckPointTime) / 1000)
+          + " seconds.");
+
+        boolean success = doWork(lastCheckPointTime, lastCheckPointTime + 
SLEEP_INTERVAL);
+
+        if (success) {
+          try {
+            saveCheckPoint(lastCheckPointTime + SLEEP_INTERVAL);
+          } catch (IOException io) {
+            LOG.warn("Error saving checkpoint, restarting aggregation at " +
+              "previous checkpoint.");
+          }
+        }
+      }
+    } else {
+      LOG.info("Skipping aggregation function not owned by this instance.");
+    }
+  }
+
  /**
   * Determines the checkpoint to aggregate from, handling three cases:
   * (1) no checkpoint exists — save a rounded "now" checkpoint and return -1
   *     so this cycle is skipped;
   * (2) checkpoint is older than the cut-off — discard it, save a fresh one
   *     at (rounded now - one interval) and aggregate from there;
   * (3) checkpoint is valid — round it to the interval boundary, but return
   *     -1 if it is too recent to have a full window to aggregate.
   *
   * @param currentTime epoch millis used for staleness checks
   * @return checkpoint time to aggregate from, or -1 to skip this cycle
   */
  private long readLastCheckpointSavingOnFirstRun(long currentTime) {
    long lastCheckPointTime = -1;

    try {
      lastCheckPointTime = readCheckPoint();
      if (lastCheckPointTime != -1) {
        LOG.info("Last Checkpoint read : " + new Date(lastCheckPointTime));
        if (isLastCheckPointTooOld(currentTime, lastCheckPointTime)) {
          LOG.warn("Last Checkpoint is too old, discarding last checkpoint. " +
            "lastCheckPointTime = " + new Date(lastCheckPointTime));
          // Restart one full interval behind the current rounded boundary.
          lastCheckPointTime = getRoundedAggregateTimeMillis(getSleepIntervalMillis()) - getSleepIntervalMillis();
          LOG.info("Saving checkpoint time. " + new Date((lastCheckPointTime)));
          saveCheckPoint(lastCheckPointTime);

        } else {

          if (lastCheckPointTime > 0) {
            // Align the checkpoint to an interval boundary before aggregating.
            lastCheckPointTime = getRoundedCheckPointTimeMillis(lastCheckPointTime, getSleepIntervalMillis());
            LOG.info("Rounded off checkpoint : " + new Date(lastCheckPointTime));
          }

          if (isLastCheckPointTooYoung(lastCheckPointTime)) {
            LOG.info("Last checkpoint too recent for aggregation. Sleeping for 1 cycle.");
            return -1; //Skip Aggregation this time around
          }
        }
      } else {
        /*
          No checkpoint. Save current rounded checkpoint and sleep for 1 cycle.
         */
        LOG.info("No checkpoint found");
        long firstCheckPoint = getRoundedAggregateTimeMillis(getSleepIntervalMillis());
        LOG.info("Saving checkpoint time. " + new Date((firstCheckPoint)));
        saveCheckPoint(firstCheckPoint);
      }
    } catch (IOException io) {
      LOG.warn("Unable to write last checkpoint time. Resuming sleep.", io);
    }
    return lastCheckPointTime;
  }
+
+  private boolean isLastCheckPointTooOld(long currentTime, long checkpoint) {
+    // first checkpoint is saved checkpointDelayMillis in the past,
+    // so here we also need to take it into account
+    return checkpoint != -1 &&
+      ((currentTime - checkpoint) > getCheckpointCutOffIntervalMillis());
+  }
+
+  private boolean isLastCheckPointTooYoung(long checkpoint) {
+    return checkpoint != -1 &&
+      ((getRoundedAggregateTimeMillis(getSleepIntervalMillis()) <= 
checkpoint));
+  }
+
+  protected long readCheckPoint() {
+    if (taskRunner != null) {
+      return taskRunner.getCheckpointManager().readCheckpoint(aggregatorName);
+    }
+    try {
+      File checkpoint = new File(getCheckpointLocation());
+      if (checkpoint.exists()) {
+        String contents = FileUtils.readFileToString(checkpoint);
+        if (contents != null && !contents.isEmpty()) {
+          return Long.parseLong(contents);
+        }
+      }
+    } catch (IOException io) {
+      LOG.debug("", io);
+    }
+    return -1;
+  }
+
+  protected void saveCheckPoint(long checkpointTime) throws IOException {
+    if (taskRunner != null) {
+      boolean success = 
taskRunner.getCheckpointManager().writeCheckpoint(aggregatorName, 
checkpointTime);
+      if (!success) {
+        LOG.error("Error saving checkpoint with AggregationTaskRunner, " +
+          "aggregator = " + aggregatorName + "value = " + checkpointTime);
+      }
+    } else {
+      File checkpoint = new File(getCheckpointLocation());
+      if (!checkpoint.exists()) {
+        boolean done = checkpoint.createNewFile();
+        if (!done) {
+          throw new IOException("Could not create checkpoint at location, " +
+            getCheckpointLocation());
+        }
+      }
+      FileUtils.writeStringToFile(checkpoint, String.valueOf(checkpointTime));
+    }
+  }
+
+  /**
+   * Read metrics written during the time interval and save the sum and total
+   * in the aggregate table.
+   *
+   * @param startTime Sample start time
+   * @param endTime Sample end time
+   */
+  public boolean doWork(long startTime, long endTime) {
+    LOG.info("Start aggregation cycle @ " + new Date() + ", " +
+      "startTime = " + new Date(startTime) + ", endTime = " + new 
Date(endTime));
+
+    boolean success = true;
+    Condition condition = prepareMetricQueryCondition(startTime, endTime);
+
+    Connection conn = null;
+    PreparedStatement stmt = null;
+    ResultSet rs = null;
+
+    try {
+      conn = hBaseAccessor.getConnection();
+      // FLUME 2. aggregate and ignore the instance
+      stmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition);
+
+      LOG.debug("Query issued @: " + new Date());
+      if (condition.doUpdate()) {
+        int rows = stmt.executeUpdate();
+        conn.commit();
+        LOG.info(rows + " row(s) updated in aggregation.");
+
+        //TODO : Fix downsampling after UUID change.
+        //downsample(conn, startTime, endTime);
+      } else {
+        rs = stmt.executeQuery();
+      }
+      LOG.debug("Query returned @: " + new Date());
+
+      aggregate(rs, startTime, endTime);
+
+    } catch (Exception e) {
+      LOG.error("Exception during aggregating metrics.", e);
+      success = false;
+    } finally {
+      if (rs != null) {
+        try {
+          rs.close();
+        } catch (SQLException e) {
+          // Ignore
+        }
+      }
+      if (stmt != null) {
+        try {
+          stmt.close();
+        } catch (SQLException e) {
+          // Ignore
+        }
+      }
+      if (conn != null) {
+        try {
+          conn.close();
+        } catch (SQLException sql) {
+          // Ignore
+        }
+      }
+    }
+
+    LOG.info("End aggregation cycle @ " + new Date());
+    return success;
+  }
+
  /**
   * Builds the query/update condition selecting the source rows for the
   * aggregation window.
   */
  protected abstract Condition prepareMetricQueryCondition(long startTime, long endTime);

  /**
   * Consumes the queried rows and writes aggregates for the window.
   * Note: {@code rs} is null when the condition ran as an update statement
   * (see doWork) — implementations must tolerate that.
   */
  protected abstract void aggregate(ResultSet rs, long startTime, long endTime) throws IOException, SQLException;
+
+  protected void downsample(Connection conn, Long startTime, Long endTime) {
+
+    LOG.debug("Checking for downsampling requests.");
+    if (CollectionUtils.isEmpty(configuredDownSamplers)) {
+      LOG.debug("No downsamplers configured");
+      return;
+    }
+
+    // Generate UPSERT query prefix. UPSERT part of the query is needed on the 
Aggregator side.
+    // SELECT part of the query is provided by the downsampler.
+    String queryPrefix = 
PhoenixTransactSQL.DOWNSAMPLE_CLUSTER_METRIC_SQL_UPSERT_PREFIX;
+    if (outputTableName.contains("RECORD")) {
+      queryPrefix = 
PhoenixTransactSQL.DOWNSAMPLE_HOST_METRIC_SQL_UPSERT_PREFIX;
+    }
+    queryPrefix = String.format(queryPrefix, outputTableName);
+
+    for (Iterator<CustomDownSampler> iterator = 
configuredDownSamplers.iterator(); iterator.hasNext();){
+      CustomDownSampler downSampler = iterator.next();
+
+      if (downSampler.validateConfigs()) {
+        EmptyCondition downSamplingCondition = new EmptyCondition();
+        downSamplingCondition.setDoUpdate(true);
+        List<String> stmts = 
downSampler.prepareDownSamplingStatement(startTime, endTime, tableName);
+        for (String stmt : stmts) {
+          downSamplingCondition.setStatement(queryPrefix + stmt);
+          runDownSamplerQuery(conn, downSamplingCondition);
+        }
+      } else {
+        LOG.warn("The following downsampler failed config validation : " + 
downSampler.getClass().getName() + "." +
+          "Removing it from downsamplers list.");
+        iterator.remove();
+      }
+    }
+
+  }
+
  /** @return length of one aggregation window in milliseconds. */
  public Long getSleepIntervalMillis() {
    return sleepIntervalMillis;
  }

  public void setSleepIntervalMillis(Long sleepIntervalMillis) {
    this.sleepIntervalMillis = sleepIntervalMillis;
  }

  /** @return multiplier applied to the sleep interval for the checkpoint cut-off. */
  protected Integer getCheckpointCutOffMultiplier() {
    return checkpointCutOffMultiplier;
  }

  /** @return maximum checkpoint age before it is discarded as stale. */
  protected Long getCheckpointCutOffIntervalMillis() {
    return getCheckpointCutOffMultiplier() * getSleepIntervalMillis();
  }

  /** @return true when this aggregator is disabled via its config flag. */
  public boolean isDisabled() {
    return metricsConf.getBoolean(aggregatorDisableParam, false);
  }
+
+  protected String getQueryHint(Long startTime) {
+    StringBuilder sb = new StringBuilder();
+    sb.append("/*+ ");
+    sb.append("NATIVE_TIME_RANGE(");
+    sb.append(startTime - nativeTimeRangeDelay);
+    sb.append(") ");
+    if (hBaseAccessor.isSkipBlockCacheForAggregatorsEnabled()) {
+      sb.append("NO_CACHE ");
+    }
+    sb.append("*/");
+    return sb.toString();
+  }
+
  /** @return path of the local checkpoint file (used when no HA task runner). */
  protected String getCheckpointLocation() {
    return checkpointLocation;
  }
+
+  /**
+   * Run 1 downsampler query.
+   * @param conn
+   * @param condition
+   */
+  private void runDownSamplerQuery(Connection conn, Condition condition) {
+
+    PreparedStatement stmt = null;
+    ResultSet rs = null;
+    LOG.debug("Downsampling query : " + condition.getStatement());
+
+    try {
+      stmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition);
+
+      LOG.debug("Downsampler Query issued...");
+      if (condition.doUpdate()) {
+        int rows = stmt.executeUpdate();
+        conn.commit();
+        LOG.info(rows + " row(s) updated in downsampling.");
+      } else {
+        rs = stmt.executeQuery();
+      }
+      LOG.debug("Downsampler Query returned ...");
+      LOG.info("End Downsampling cycle.");
+
+    } catch (SQLException e) {
+      LOG.error("Exception during downsampling metrics.", e);
+    } finally {
+      if (rs != null) {
+        try {
+          rs.close();
+        } catch (SQLException e) {
+          // Ignore
+        }
+      }
+      if (stmt != null) {
+        try {
+          stmt.close();
+        } catch (SQLException e) {
+          // Ignore
+        }
+      }
+      if (conn != null) {
+        try {
+          conn.close();
+        } catch (SQLException sql) {
+          // Ignore
+        }
+      }
+    }
+  }
+
  /**
   * Returns the METRIC_NAME NOT LIKE clause if certain metrics or metric
   * patterns are to be skipped since they will be downsampled.
   *
   * Currently always returns an empty clause; the commented-out block below is
   * the pre-UUID implementation kept for the pending TODO.
   *
   * @return SQL fragment to AND into the source query (currently empty)
   */
  protected String getDownsampledMetricSkipClause() {

    //TODO Fix downsampling for UUID change.
    return StringUtils.EMPTY;

//    if (CollectionUtils.isEmpty(this.downsampleMetricPatterns)) {
//      return StringUtils.EMPTY;
//    }
//
//    StringBuilder sb = new StringBuilder();
//
//    for (int i = 0; i < downsampleMetricPatterns.size(); i++) {
//      sb.append(" METRIC_NAME");
//      sb.append(" NOT");
//      sb.append(" LIKE ");
//      sb.append("'" + downsampleMetricPatterns.get(i) + "'");
//
//      if (i < downsampleMetricPatterns.size() - 1) {
//        sb.append(" AND ");
//      }
//    }
//
//    sb.append(" AND ");
//    return sb.toString();
  }
+
+  /**
+   * Get @AGGREGATOR_TYPE based on the output table.
+   * This is solely used by the HAController to determine which lock to 
acquire.
+   */
+  public AGGREGATOR_TYPE getAggregatorType() {
+    if (outputTableName.contains("RECORD")) {
+      return AGGREGATOR_TYPE.HOST;
+    } else if (outputTableName.contains("AGGREGATE")) {
+      return AGGREGATOR_TYPE.CLUSTER;
+    }
+    return null;
+  }
+
  /** @return the explicit aggregator name used for logging and checkpointing. */
  @Override
  public AGGREGATOR_NAME getName() {
    return aggregatorName;
  }
}

Reply via email to