AMBARI-21079. Add ability to sink Raw metrics to external system via Http. 
(swagle)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/c32eebf8
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/c32eebf8
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/c32eebf8

Branch: refs/heads/branch-3.0-ams
Commit: c32eebf8935acab9d1a510bf05e8f8aeb8873a6f
Parents: cd769e2
Author: Siddharth Wagle <swa...@hortonworks.com>
Authored: Tue May 23 14:01:14 2017 -0700
Committer: Siddharth Wagle <swa...@hortonworks.com>
Committed: Tue May 23 14:01:14 2017 -0700

----------------------------------------------------------------------
 ambari-metrics/ambari-metrics-common/pom.xml    |  29 ++-
 .../TimelineMetricsEhCacheSizeOfEngine.java     | 115 +++++++++
 .../ApplicationHistoryServer.java               |  13 +-
 .../timeline/HBaseTimelineMetricStore.java      |  30 ++-
 .../metrics/timeline/PhoenixHBaseAccessor.java  | 230 ++++++++++--------
 .../timeline/TimelineMetricConfiguration.java   | 176 +++++++++-----
 .../TimelineMetricClusterAggregator.java        |   2 +-
 .../TimelineMetricMetadataManager.java          |  15 +-
 .../timeline/sink/DefaultFSSinkProvider.java    | 153 ++++++++++++
 .../timeline/sink/ExternalMetricsSink.java      |  48 ++++
 .../timeline/sink/ExternalSinkProvider.java     |  35 +++
 .../metrics/timeline/sink/HttpSinkProvider.java | 231 +++++++++++++++++++
 .../DefaultInternalMetricsSourceProvider.java   |  42 ++++
 .../timeline/source/InternalMetricsSource.java  |  30 +++
 .../timeline/source/InternalSourceProvider.java |  39 ++++
 .../timeline/source/RawMetricsSource.java       |  93 ++++++++
 .../source/cache/InternalMetricCacheKey.java    | 109 +++++++++
 .../source/cache/InternalMetricCacheValue.java  |  37 +++
 .../source/cache/InternalMetricsCache.java      | 231 +++++++++++++++++++
 .../cache/InternalMetricsCacheProvider.java     |  48 ++++
 .../cache/InternalMetricsCacheSizeOfEngine.java |  66 ++++++
 .../TestApplicationHistoryServer.java           |   4 +-
 .../timeline/AbstractMiniHBaseClusterTest.java  |  49 ++--
 .../timeline/HBaseTimelineMetricStoreTest.java  |   8 +-
 .../timeline/ITPhoenixHBaseAccessor.java        | 110 ++++-----
 .../timeline/PhoenixHBaseAccessorTest.java      | 167 ++++++--------
 .../TimelineMetricStoreWatcherTest.java         |   4 +-
 .../aggregators/ITClusterAggregator.java        |  72 +++---
 .../timeline/discovery/TestMetadataManager.java |   2 +-
 .../timeline/discovery/TestMetadataSync.java    |   6 +-
 .../timeline/source/RawMetricsSourceTest.java   | 142 ++++++++++++
 .../cache/TimelineMetricsCacheSizeOfEngine.java |  71 +-----
 pom.xml                                         |   1 +
 33 files changed, 1933 insertions(+), 475 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/c32eebf8/ambari-metrics/ambari-metrics-common/pom.xml
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-common/pom.xml 
b/ambari-metrics/ambari-metrics-common/pom.xml
index 62ae75f..dc2ab5e 100644
--- a/ambari-metrics/ambari-metrics-common/pom.xml
+++ b/ambari-metrics/ambari-metrics-common/pom.xml
@@ -70,43 +70,47 @@
               <relocations>
                 <relocation>
                   <pattern>com.google</pattern>
-                  
<shadedPattern>org.apache.hadoop.metrics2.sink.relocated.google</shadedPattern>
+                  
<shadedPattern>org.apache.ambari.metrics.sink.relocated.google</shadedPattern>
                 </relocation>
                 <relocation>
                   <pattern>org.apache.commons.io</pattern>
-                  
<shadedPattern>org.apache.hadoop.metrics2.sink.relocated.commons.io</shadedPattern>StormTimelineMetricsReporter
+                  
<shadedPattern>org.apache.ambari.metrics.sink.relocated.commons.io</shadedPattern>StormTimelineMetricsReporter
                 </relocation>
                 <relocation>
                   <pattern>org.apache.commons.lang</pattern>
-                  
<shadedPattern>org.apache.hadoop.metrics2.sink.relocated.commons.lang</shadedPattern>
+                  
<shadedPattern>org.apache.ambari.metrics.relocated.commons.lang</shadedPattern>
                 </relocation>
                 <relocation>
                   <pattern>org.apache.curator</pattern>
-                  
<shadedPattern>org.apache.hadoop.metrics2.sink.relocated.curator</shadedPattern>
+                  
<shadedPattern>org.apache.ambari.metrics.sink.relocated.curator</shadedPattern>
                 </relocation>
                 <relocation>
                   <pattern>org.apache.jute</pattern>
-                  
<shadedPattern>org.apache.hadoop.metrics2.sink.relocated.jute</shadedPattern>
+                  
<shadedPattern>org.apache.ambari.metrics.sink.relocated.jute</shadedPattern>
                 </relocation>
                 <relocation>
                   <pattern>org.apache.zookeeper</pattern>
-                  
<shadedPattern>org.apache.hadoop.metrics2.sink.relocated.zookeeper</shadedPattern>
+                  
<shadedPattern>org.apache.ambari.metrics.sink.relocated.zookeeper</shadedPattern>
                 </relocation>
                 <relocation>
                   <pattern>org.slf4j</pattern>
-                  
<shadedPattern>org.apache.hadoop.metrics2.sink.relocated.slf4j</shadedPattern>
+                  
<shadedPattern>org.apache.ambari.metrics.sink.relocated.slf4j</shadedPattern>
                 </relocation>
                 <relocation>
                   <pattern>org.apache.log4j</pattern>
-                  
<shadedPattern>org.apache.hadoop.metrics2.sink.relocated.log4j</shadedPattern>
+                  
<shadedPattern>org.apache.ambari.metrics.sink.relocated.log4j</shadedPattern>
                 </relocation>
                 <relocation>
                   <pattern>jline</pattern>
-                  
<shadedPattern>org.apache.hadoop.metrics2.sink.relocated.jline</shadedPattern>
+                  
<shadedPattern>org.apache.ambari.metrics.sink.relocated.jline</shadedPattern>
                 </relocation>
                 <relocation>
                   <pattern>org.jboss</pattern>
-                  
<shadedPattern>org.apache.hadoop.metrics2.sink.relocated.jboss</shadedPattern>
+                  
<shadedPattern>org.apache.ambari.metrics.sink.relocated.jboss</shadedPattern>
+                </relocation>
+                <relocation>
+                  <pattern>net.sf.ehcache</pattern>
+                  
<shadedPattern>org.apache.ambari.metrics.sink.relocated.ehcache</shadedPattern>
                 </relocation>
               </relocations>
             </configuration>
@@ -118,6 +122,11 @@
 
   <dependencies>
     <dependency>
+      <groupId>net.sf.ehcache</groupId>
+      <artifactId>ehcache</artifactId>
+      <version>2.10.0</version>
+    </dependency>
+    <dependency>
       <groupId>commons-logging</groupId>
       <artifactId>commons-logging</artifactId>
       <version>1.1.1</version>

http://git-wip-us.apache.org/repos/asf/ambari/blob/c32eebf8/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/cache/TimelineMetricsEhCacheSizeOfEngine.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/cache/TimelineMetricsEhCacheSizeOfEngine.java
 
b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/cache/TimelineMetricsEhCacheSizeOfEngine.java
new file mode 100644
index 0000000..ea694b7
--- /dev/null
+++ 
b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/cache/TimelineMetricsEhCacheSizeOfEngine.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.sink.timeline.cache;
+
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import net.sf.ehcache.pool.SizeOfEngine;
+import net.sf.ehcache.pool.impl.DefaultSizeOfEngine;
+import net.sf.ehcache.pool.sizeof.ReflectionSizeOf;
+import net.sf.ehcache.pool.sizeof.SizeOf;
+
+/**
+ * Cache sizing engine that reduces reflective calls over the Object graph to
+ * find total Heap usage. Used for ehcache based on available memory.
+ */
+public abstract class TimelineMetricsEhCacheSizeOfEngine implements 
SizeOfEngine {
+  private final static Logger LOG = 
LoggerFactory.getLogger(TimelineMetricsEhCacheSizeOfEngine.class);
+
+  private static int DEFAULT_MAX_DEPTH = 1000;
+  private static boolean DEFAULT_ABORT_WHEN_MAX_DEPTH_EXCEEDED = false;
+
+  // Base Engine
+  protected SizeOfEngine underlying = null;
+
+  // Counter
+  protected SizeOf reflectionSizeOf = new ReflectionSizeOf();
+
+  // Optimizations
+  private volatile long timelineMetricPrimitivesApproximation = 0;
+
+  // Map entry sizing
+  private long sizeOfMapEntry;
+  private long sizeOfMapEntryOverhead;
+
+  protected TimelineMetricsEhCacheSizeOfEngine(SizeOfEngine underlying) {
+    this.underlying = underlying;
+  }
+
+  public TimelineMetricsEhCacheSizeOfEngine() {
+    this(new DefaultSizeOfEngine(DEFAULT_MAX_DEPTH, 
DEFAULT_ABORT_WHEN_MAX_DEPTH_EXCEEDED));
+
+    this.sizeOfMapEntry = reflectionSizeOf.sizeOf(new Long(1)) +
+      reflectionSizeOf.sizeOf(new Double(2.0));
+
+    //SizeOfMapEntryOverhead = SizeOfMapWithOneEntry - (SizeOfEmptyMap + 
SizeOfOneEntry)
+    TreeMap<Long, Double> map = new TreeMap<>();
+    long emptyMapSize = reflectionSizeOf.sizeOf(map);
+    map.put(new Long(1), new Double(2.0));
+    long sizeOfMapOneEntry = reflectionSizeOf.deepSizeOf(DEFAULT_MAX_DEPTH, 
DEFAULT_ABORT_WHEN_MAX_DEPTH_EXCEEDED, map).getCalculated();
+    this.sizeOfMapEntryOverhead =  sizeOfMapOneEntry - (emptyMapSize + 
this.sizeOfMapEntry);
+
+    LOG.info("Creating custom sizeof engine for TimelineMetrics.");
+  }
+
+  /**
+   * Return size of the metrics TreeMap in an optimized way.
+   *
+   */
+  protected long getTimelineMetricsSize(TimelineMetrics metrics) {
+    long size = 8; // Object reference
+
+    if (metrics != null) {
+      for (TimelineMetric metric : metrics.getMetrics()) {
+
+        if (timelineMetricPrimitivesApproximation == 0) {
+          timelineMetricPrimitivesApproximation += 
reflectionSizeOf.sizeOf(metric.getMetricName());
+          timelineMetricPrimitivesApproximation += 
reflectionSizeOf.sizeOf(metric.getAppId());
+          timelineMetricPrimitivesApproximation += 
reflectionSizeOf.sizeOf(metric.getHostName());
+          timelineMetricPrimitivesApproximation += 
reflectionSizeOf.sizeOf(metric.getInstanceId());
+          timelineMetricPrimitivesApproximation += 
reflectionSizeOf.sizeOf(metric.getTimestamp());
+          timelineMetricPrimitivesApproximation += 
reflectionSizeOf.sizeOf(metric.getStartTime());
+          timelineMetricPrimitivesApproximation += 
reflectionSizeOf.sizeOf(metric.getType());
+          timelineMetricPrimitivesApproximation += 8; // Object overhead
+
+          LOG.debug("timelineMetricPrimitivesApproximation bytes = " + 
timelineMetricPrimitivesApproximation);
+        }
+        size += timelineMetricPrimitivesApproximation;
+        size += getValueMapSize(metric.getMetricValues());
+      }
+      LOG.debug("Total Size of metric values in cache: " + size);
+    }
+    return size;
+  }
+
+  protected long getValueMapSize(Map<Long, Double> metricValues) {
+    long size = 0;
+    if (metricValues != null && !metricValues.isEmpty()) {
+      // Numeric wrapper: 12 bytes + 8 bytes Data type + 4 bytes alignment = 
48 (Long, Double)
+      // Tree Map: 12 bytes for header + 20 bytes for 5 object fields : 
pointers + 1 byte for flag = 40
+      LOG.debug("Size of metric value: " + (sizeOfMapEntry + 
sizeOfMapEntryOverhead) * metricValues.size());
+      size += (sizeOfMapEntry + sizeOfMapEntryOverhead) * metricValues.size(); 
// Treemap size is O(1)
+    }
+    return size;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/c32eebf8/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryServer.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryServer.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryServer.java
index 1ca9c33..331670d 100644
--- 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryServer.java
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryServer.java
@@ -34,7 +34,7 @@ import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.HBaseTimelineMetricStore;
+import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.HBaseTimelineMetricsService;
 import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration;
 import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricStore;
 import 
org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.MemoryTimelineStore;
@@ -71,7 +71,7 @@ public class ApplicationHistoryServer extends 
CompositeService {
 
   @Override
   protected void serviceInit(Configuration conf) throws Exception {
-    metricConfiguration = new TimelineMetricConfiguration();
+    metricConfiguration = TimelineMetricConfiguration.getInstance();
     metricConfiguration.initialize();
     historyManager = createApplicationHistory();
     ahsClientService = createApplicationHistoryClientService(historyManager);
@@ -164,11 +164,16 @@ public class ApplicationHistoryServer extends 
CompositeService {
 
   protected TimelineMetricStore createTimelineMetricStore(Configuration conf) {
     LOG.info("Creating metrics store.");
-    return new HBaseTimelineMetricStore(metricConfiguration);
+    return new HBaseTimelineMetricsService(metricConfiguration);
   }
 
   protected void startWebApp() {
-    String bindAddress = metricConfiguration.getWebappAddress();
+    String bindAddress = null;
+    try {
+      bindAddress = metricConfiguration.getWebappAddress();
+    } catch (Exception e) {
+      throw new ExceptionInInitializerError("Cannot find bind address");
+    }
     LOG.info("Instantiating AHSWebApp at " + bindAddress);
     try {
       Configuration conf = metricConfiguration.getMetricsConf();

http://git-wip-us.apache.org/repos/asf/ambari/blob/c32eebf8/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
index f984253..c8eb65f 100644
--- 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
@@ -51,6 +51,8 @@ import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.
 import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.function.TimelineMetricsSeriesAggregateFunctionFactory;
 
 import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URISyntaxException;
 import java.net.UnknownHostException;
 import java.sql.SQLException;
 import java.util.ArrayList;
@@ -71,9 +73,9 @@ import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti
 import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.DEFAULT_TOPN_HOSTS_LIMIT;
 import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.ACTUAL_AGGREGATOR_NAMES;
 
-public class HBaseTimelineMetricStore extends AbstractService implements 
TimelineMetricStore {
+public class HBaseTimelineMetricsService extends AbstractService implements 
TimelineMetricStore {
 
-  static final Log LOG = LogFactory.getLog(HBaseTimelineMetricStore.class);
+  static final Log LOG = LogFactory.getLog(HBaseTimelineMetricsService.class);
   private final TimelineMetricConfiguration configuration;
   private PhoenixHBaseAccessor hBaseAccessor;
   private static volatile boolean isInitialized = false;
@@ -87,25 +89,28 @@ public class HBaseTimelineMetricStore extends 
AbstractService implements Timelin
    * Construct the service.
    *
    */
-  public HBaseTimelineMetricStore(TimelineMetricConfiguration configuration) {
-    super(HBaseTimelineMetricStore.class.getName());
+  public HBaseTimelineMetricsService(TimelineMetricConfiguration 
configuration) {
+    super(HBaseTimelineMetricsService.class.getName());
     this.configuration = configuration;
   }
 
   @Override
   protected void serviceInit(Configuration conf) throws Exception {
     super.serviceInit(conf);
-    initializeSubsystem(configuration.getHbaseConf(), 
configuration.getMetricsConf());
+    initializeSubsystem();
   }
 
-  private synchronized void initializeSubsystem(Configuration hbaseConf,
-                                                Configuration metricsConf) {
+  private synchronized void initializeSubsystem() {
     if (!isInitialized) {
-      hBaseAccessor = new PhoenixHBaseAccessor(hbaseConf, metricsConf);
+      hBaseAccessor = new PhoenixHBaseAccessor(null);
       // Initialize schema
       hBaseAccessor.initMetricSchema();
       // Initialize metadata from store
-      metricMetadataManager = new TimelineMetricMetadataManager(hBaseAccessor, 
metricsConf);
+      try {
+        metricMetadataManager = new 
TimelineMetricMetadataManager(hBaseAccessor);
+      } catch (MalformedURLException | URISyntaxException e) {
+        throw new ExceptionInInitializerError("Unable to initialize metadata 
manager");
+      }
       metricMetadataManager.initializeMetadata();
       // Initialize policies before TTL update
       hBaseAccessor.initPoliciesAndTTL();
@@ -127,6 +132,13 @@ public class HBaseTimelineMetricStore extends 
AbstractService implements Timelin
       //Initialize whitelisting & blacklisting if needed
       TimelineMetricsFilter.initializeMetricFilter(configuration);
 
+      Configuration metricsConf = null;
+      try {
+        metricsConf = configuration.getMetricsConf();
+      } catch (Exception e) {
+        throw new ExceptionInInitializerError("Cannot initialize 
configuration.");
+      }
+
       defaultTopNHostsLimit = 
Integer.parseInt(metricsConf.get(DEFAULT_TOPN_HOSTS_LIMIT, "20"));
       if (Boolean.parseBoolean(metricsConf.get(USE_GROUPBY_AGGREGATOR_QUERIES, 
"true"))) {
         LOG.info("Using group by aggregators for aggregating host and cluster 
metrics.");

http://git-wip-us.apache.org/repos/asf/ambari/blob/c32eebf8/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
index 3b2a119..15b0bb8 100644
--- 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
@@ -17,88 +17,18 @@
  */
 package 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
 
-import com.google.common.collect.Maps;
-import com.google.common.collect.Multimap;
-import org.apache.commons.lang.StringUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.DoNotRetryIOException;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.client.Durability;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.util.RetryCounter;
-import org.apache.hadoop.hbase.util.RetryCounterFactory;
-import org.apache.hadoop.metrics2.sink.timeline.ContainerMetric;
-import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate;
-import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
-import org.apache.hadoop.metrics2.sink.timeline.Precision;
-import org.apache.hadoop.metrics2.sink.timeline.SingleValuedTimelineMetric;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.AggregatorUtils;
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function;
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineClusterMetric;
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricReadHelper;
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataKey;
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager;
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultPhoenixDataSource;
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixConnectionProvider;
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL;
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.SplitByMetricNamesCondition;
-import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
-import org.apache.phoenix.exception.PhoenixIOException;
-import org.codehaus.jackson.map.ObjectMapper;
-import org.codehaus.jackson.type.TypeReference;
-
-import java.io.IOException;
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.sql.Timestamp;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
 import static java.util.concurrent.TimeUnit.SECONDS;
 import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.AGGREGATE_TABLE_SPLIT_POINTS;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_PRECISION_TABLE_HBASE_BLOCKING_STORE_FILES;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_AGGREGATE_TABLE_HBASE_BLOCKING_STORE_FILES;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_HBASE_AGGREGATE_TABLE_COMPACTION_POLICY_CLASS;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_HBASE_AGGREGATE_TABLE_COMPACTION_POLICY_KEY;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_HBASE_PRECISION_TABLE_COMPACTION_POLICY_CLASS;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_HBASE_PRECISION_TABLE_COMPACTION_POLICY_KEY;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_PRECISION_TABLE_DURABILITY;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_AGGREGATE_TABLES_DURABILITY;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HBASE_BLOCKING_STORE_FILES;
 import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.AGGREGATORS_SKIP_BLOCK_CACHE;
 import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_DAILY_TABLE_TTL;
 import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_HOUR_TABLE_TTL;
 import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_MINUTE_TABLE_TTL;
 import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_SECOND_TABLE_TTL;
+import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CONTAINER_METRICS_TTL;
 import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.GLOBAL_MAX_RETRIES;
 import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.GLOBAL_RESULT_LIMIT;
 import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.GLOBAL_RETRY_INTERVAL;
+import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HBASE_BLOCKING_STORE_FILES;
 import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HBASE_COMPRESSION_SCHEME;
 import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HBASE_ENCODING_SCHEME;
 import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_DAILY_TABLE_TTL;
@@ -107,11 +37,19 @@ import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti
 import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.OUT_OFF_BAND_DATA_TIME_ALLOWANCE;
 import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.PRECISION_TABLE_SPLIT_POINTS;
 import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.PRECISION_TABLE_TTL;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CONTAINER_METRICS_TTL;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_CACHE_SIZE;
+import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_AGGREGATE_TABLES_DURABILITY;
+import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_AGGREGATE_TABLE_HBASE_BLOCKING_STORE_FILES;
 import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_CACHE_COMMIT_INTERVAL;
 import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_CACHE_ENABLED;
+import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_CACHE_SIZE;
+import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_HBASE_AGGREGATE_TABLE_COMPACTION_POLICY_CLASS;
+import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_HBASE_AGGREGATE_TABLE_COMPACTION_POLICY_KEY;
+import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_HBASE_PRECISION_TABLE_COMPACTION_POLICY_CLASS;
+import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_HBASE_PRECISION_TABLE_COMPACTION_POLICY_KEY;
+import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_PRECISION_TABLE_DURABILITY;
+import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_PRECISION_TABLE_HBASE_BLOCKING_STORE_FILES;
 import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRIC_AGGREGATOR_SINK_CLASS;
+import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.ALTER_METRICS_METADATA_TABLE;
 import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CONTAINER_METRICS_TABLE_NAME;
 import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_CONTAINER_METRICS_TABLE_SQL;
 import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_HOSTED_APPS_METADATA_TABLE_SQL;
@@ -120,7 +58,6 @@ import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti
 import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_METRICS_CLUSTER_AGGREGATE_GROUPED_TABLE_SQL;
 import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_METRICS_CLUSTER_AGGREGATE_TABLE_SQL;
 import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_METRICS_METADATA_TABLE_SQL;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.ALTER_METRICS_METADATA_TABLE;
 import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.CREATE_METRICS_TABLE_SQL;
 import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.DEFAULT_ENCODING;
 import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.DEFAULT_TABLE_COMPRESSION;
@@ -139,11 +76,80 @@ import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti
 import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_AGGREGATE_RECORD_SQL;
 import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_CLUSTER_AGGREGATE_SQL;
 import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_CLUSTER_AGGREGATE_TIME_SQL;
+import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_CONTAINER_METRICS_SQL;
 import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_HOSTED_APPS_METADATA_SQL;
 import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_INSTANCE_HOST_METADATA_SQL;
 import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_METADATA_SQL;
 import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_METRICS_SQL;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_CONTAINER_METRICS_SQL;
+import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.source.InternalSourceProvider.SOURCE_NAME.RAW_METRICS;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.util.RetryCounter;
+import org.apache.hadoop.hbase.util.RetryCounterFactory;
+import org.apache.hadoop.metrics2.sink.timeline.ContainerMetric;
+import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate;
+import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
+import org.apache.hadoop.metrics2.sink.timeline.Precision;
+import org.apache.hadoop.metrics2.sink.timeline.SingleValuedTimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.AggregatorUtils;
+import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function;
+import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineClusterMetric;
+import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricReadHelper;
+import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataKey;
+import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager;
+import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
+import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultPhoenixDataSource;
+import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixConnectionProvider;
+import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL;
+import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.SplitByMetricNamesCondition;
+import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.sink.ExternalMetricsSink;
+import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.sink.ExternalSinkProvider;
+import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.source.InternalMetricsSource;
+import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.source.InternalSourceProvider;
+import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
+import org.apache.phoenix.exception.PhoenixIOException;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.type.TypeReference;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
 
 
 /**
@@ -198,16 +204,29 @@ public class PhoenixHBaseAccessor {
 
   private HashMap<String, String> tableTTL = new HashMap<>();
 
-  public PhoenixHBaseAccessor(Configuration hbaseConf,
-                              Configuration metricsConf){
-    this(hbaseConf, metricsConf, new DefaultPhoenixDataSource(hbaseConf));
+  private final TimelineMetricConfiguration configuration;
+  private InternalMetricsSource rawMetricsSource;
+
+  public PhoenixHBaseAccessor(PhoenixConnectionProvider dataSource) {
+    this(TimelineMetricConfiguration.getInstance(), dataSource);
   }
 
-  PhoenixHBaseAccessor(Configuration hbaseConf,
-                       Configuration metricsConf,
+  // Test friendly construction since mock instrumentation is difficult to get
+  // working with hadoop mini cluster
+  PhoenixHBaseAccessor(TimelineMetricConfiguration configuration,
                        PhoenixConnectionProvider dataSource) {
-    this.hbaseConf = hbaseConf;
-    this.metricsConf = metricsConf;
+    this.configuration = TimelineMetricConfiguration.getInstance();
+    try {
+      this.hbaseConf = configuration.getHbaseConf();
+      this.metricsConf = configuration.getMetricsConf();
+    } catch (Exception e) {
+      throw new ExceptionInInitializerError("Cannot initialize 
configuration.");
+    }
+    if (dataSource == null) {
+      dataSource = new DefaultPhoenixDataSource(hbaseConf);
+    }
+    this.dataSource = dataSource;
+
     RESULTSET_LIMIT = metricsConf.getInt(GLOBAL_RESULT_LIMIT, RESULTSET_LIMIT);
     try {
       Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");
@@ -215,7 +234,7 @@ public class PhoenixHBaseAccessor {
       LOG.error("Phoenix client jar not found in the classpath.", e);
       throw new IllegalStateException(e);
     }
-    this.dataSource = dataSource;
+
     this.retryCounterFactory = new 
RetryCounterFactory(metricsConf.getInt(GLOBAL_MAX_RETRIES, 10),
       (int) SECONDS.toMillis(metricsConf.getInt(GLOBAL_RETRY_INTERVAL, 3)));
     this.outOfBandTimeAllowance = 
metricsConf.getLong(OUT_OFF_BAND_DATA_TIME_ALLOWANCE,
@@ -249,10 +268,20 @@ public class PhoenixHBaseAccessor {
         metricsConf.getClass(TIMELINE_METRIC_AGGREGATOR_SINK_CLASS, null,
             TimelineMetricsAggregatorSink.class);
     if (metricSinkClass != null) {
-      aggregatorSink =
-          ReflectionUtils.newInstance(metricSinkClass, metricsConf);
+      aggregatorSink = ReflectionUtils.newInstance(metricSinkClass, 
metricsConf);
       LOG.info("Initialized aggregator sink class " + metricSinkClass);
     }
+
+    ExternalSinkProvider externalSinkProvider = 
configuration.getExternalSinkProvider();
+    InternalSourceProvider internalSourceProvider = 
configuration.getInternalSourceProvider();
+    if (externalSinkProvider != null) {
+      ExternalMetricsSink rawMetricsSink = 
externalSinkProvider.getExternalMetricsSink(RAW_METRICS);
+      int interval = configuration.getExternalSinkInterval(RAW_METRICS);
+      if (interval == -1){
+        interval = cacheCommitInterval;
+      }
+      rawMetricsSource = 
internalSourceProvider.getInternalMetricsSource(RAW_METRICS, interval, 
rawMetricsSink);
+    }
   }
 
   public boolean isInsertCacheEmpty() {
@@ -261,12 +290,15 @@ public class PhoenixHBaseAccessor {
 
   public void commitMetricsFromCache() {
     LOG.debug("Clearing metrics cache");
-    List<TimelineMetrics> metricsArray = new 
ArrayList<TimelineMetrics>(insertCache.size());
-    while (!insertCache.isEmpty()) {
-      metricsArray.add(insertCache.poll());
+    List<TimelineMetrics> metricsList = new 
ArrayList<TimelineMetrics>(insertCache.size());
+    if (!insertCache.isEmpty()) {
+      insertCache.drainTo(metricsList); // More performant than poll
     }
-    if (metricsArray.size() > 0) {
-      commitMetrics(metricsArray);
+    if (metricsList.size() > 0) {
+      commitMetrics(metricsList);
+      if (rawMetricsSource != null) {
+        rawMetricsSource.publishTimelineMetrics(metricsList);
+      }
     }
   }
 
@@ -367,7 +399,7 @@ public class PhoenixHBaseAccessor {
   }
 
   @SuppressWarnings("unchecked")
-  public static TreeMap<Long, Double>  readMetricFromJSON(String json) throws 
IOException {
+  public static TreeMap<Long, Double> readMetricFromJSON(String json) throws 
IOException {
     return mapper.readValue(json, metricValuesTypeRef);
   }
 
@@ -701,6 +733,9 @@ public class PhoenixHBaseAccessor {
     return "";
   }
 
+  /**
+   * Insert precision YARN container data.
+   */
   public void insertContainerMetrics(List<ContainerMetric> metrics)
       throws SQLException, IOException {
     Connection conn = getConnection();
@@ -766,6 +801,9 @@ public class PhoenixHBaseAccessor {
     }
   }
 
+  /**
+   * Insert precision data.
+   */
   public void insertMetricRecordsWithMetadata(TimelineMetricMetadataManager 
metadataManager,
                                               TimelineMetrics metrics, boolean 
skipCache) throws SQLException, IOException {
     List<TimelineMetric> timelineMetrics = metrics.getMetrics();
@@ -1385,9 +1423,7 @@ public class PhoenixHBaseAccessor {
       try {
         aggregatorSink.saveClusterAggregateRecords(records);
       } catch (Exception e) {
-        LOG.warn(
-            "Error writing cluster aggregate records metrics to external sink. 
"
-                + e);
+        LOG.warn("Error writing cluster aggregate records metrics to external 
sink. ", e);
       }
     }
   }
@@ -1398,8 +1434,8 @@ public class PhoenixHBaseAccessor {
    *
    * @throws SQLException
    */
-  public void saveClusterTimeAggregateRecords(Map<TimelineClusterMetric, 
MetricHostAggregate> records,
-                                              String tableName) throws 
SQLException {
+  public void saveClusterAggregateRecordsSecond(Map<TimelineClusterMetric, 
MetricHostAggregate> records,
+                                                String tableName) throws 
SQLException {
     if (records == null || records.isEmpty()) {
       LOG.debug("Empty aggregate records.");
       return;

http://git-wip-us.apache.org/repos/asf/ambari/blob/c32eebf8/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
index 023465b..de33bd1 100644
--- 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
@@ -17,15 +17,8 @@
  */
 package 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
 
-import org.apache.commons.lang.StringUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
-
 import java.io.BufferedReader;
-import java.io.IOException;
+import java.io.File;
 import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.net.InetAddress;
@@ -37,6 +30,21 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.Set;
 
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ReflectionUtils;
+import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.sink.ExternalSinkProvider;
+import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.source.DefaultInternalMetricsSourceProvider;
+import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.source.InternalSourceProvider;
+import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.source.InternalSourceProvider.SOURCE_NAME;
+import org.apache.log4j.Appender;
+import org.apache.log4j.FileAppender;
+import org.apache.log4j.Logger;
+
 /**
  * Configuration class that reads properties from ams-site.xml. All values
  * for time or intervals are given in seconds.
@@ -56,6 +64,12 @@ public class TimelineMetricConfiguration {
   public static final String TIMELINE_METRIC_AGGREGATOR_SINK_CLASS =
     "timeline.metrics.service.aggregator.sink.class";
 
+  public static final String TIMELINE_METRICS_SOURCE_PROVIDER_CLASS =
+    "timeline.metrics.service.source.provider.class";
+
+  public static final String TIMELINE_METRICS_SINK_PROVIDER_CLASS =
+    "timeline.metrics.service.sink.provider.class";
+
   public static final String TIMELINE_METRICS_CACHE_SIZE =
     "timeline.metrics.cache.size";
 
@@ -297,38 +311,63 @@ public class TimelineMetricConfiguration {
   public static final String AMSHBASE_METRICS_WHITESLIST_FILE = 
"amshbase_metrics_whitelist";
 
   public static final String TIMELINE_METRICS_HOST_INMEMORY_AGGREGATION = 
"timeline.metrics.host.inmemory.aggregation";
+  public static final String INTERNAL_CACHE_HEAP_PERCENT =
+    "timeline.metrics.service.cache.%s.heap.percent";
+
+  public static final String EXTERNAL_SINK_INTERVAL =
+    "timeline.metrics.service.external.sink.%s.interval";
+
+  public static final String DEFAULT_EXTERNAL_SINK_DIR =
+    "timeline.metrics.service.external.sink.dir";
 
   private Configuration hbaseConf;
   private Configuration metricsConf;
   private Configuration amsEnvConf;
   private volatile boolean isInitialized = false;
 
+  private static TimelineMetricConfiguration instance = new 
TimelineMetricConfiguration();
+
+  private TimelineMetricConfiguration() {}
+
+  public static TimelineMetricConfiguration getInstance() {
+    return instance;
+  }
+
+  // Tests
+  public TimelineMetricConfiguration(Configuration hbaseConf, Configuration 
metricsConf) {
+    this.hbaseConf = hbaseConf;
+    this.metricsConf = metricsConf;
+    this.isInitialized = true;
+  }
+
   public void initialize() throws URISyntaxException, MalformedURLException {
-    ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
-    if (classLoader == null) {
-      classLoader = getClass().getClassLoader();
-    }
-    URL hbaseResUrl = classLoader.getResource(HBASE_SITE_CONFIGURATION_FILE);
-    URL amsResUrl = classLoader.getResource(METRICS_SITE_CONFIGURATION_FILE);
-    LOG.info("Found hbase site configuration: " + hbaseResUrl);
-    LOG.info("Found metric service configuration: " + amsResUrl);
-
-    if (hbaseResUrl == null) {
-      throw new IllegalStateException("Unable to initialize the metrics " +
-        "subsystem. No hbase-site present in the classpath.");
-    }
+    if (!isInitialized) {
+      ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
+      if (classLoader == null) {
+        classLoader = getClass().getClassLoader();
+      }
+      URL hbaseResUrl = classLoader.getResource(HBASE_SITE_CONFIGURATION_FILE);
+      URL amsResUrl = classLoader.getResource(METRICS_SITE_CONFIGURATION_FILE);
+      LOG.info("Found hbase site configuration: " + hbaseResUrl);
+      LOG.info("Found metric service configuration: " + amsResUrl);
+
+      if (hbaseResUrl == null) {
+        throw new IllegalStateException("Unable to initialize the metrics " +
+          "subsystem. No hbase-site present in the classpath.");
+      }
 
-    if (amsResUrl == null) {
-      throw new IllegalStateException("Unable to initialize the metrics " +
-        "subsystem. No ams-site present in the classpath.");
-    }
+      if (amsResUrl == null) {
+        throw new IllegalStateException("Unable to initialize the metrics " +
+          "subsystem. No ams-site present in the classpath.");
+      }
 
-    hbaseConf = new Configuration(true);
-    hbaseConf.addResource(hbaseResUrl.toURI().toURL());
-    metricsConf = new Configuration(true);
-    metricsConf.addResource(amsResUrl.toURI().toURL());
+      hbaseConf = new Configuration(true);
+      hbaseConf.addResource(hbaseResUrl.toURI().toURL());
+      metricsConf = new Configuration(true);
+      metricsConf.addResource(amsResUrl.toURI().toURL());
 
-    isInitialized = true;
+      isInitialized = true;
+    }
   }
 
   public Configuration getHbaseConf() throws URISyntaxException, 
MalformedURLException {
@@ -346,31 +385,19 @@ public class TimelineMetricConfiguration {
   }
 
   public String getZKClientPort() throws MalformedURLException, 
URISyntaxException {
-    if (!isInitialized) {
-      initialize();
-    }
-    return hbaseConf.getTrimmed("hbase.zookeeper.property.clientPort", "2181");
+    return getHbaseConf().getTrimmed("hbase.zookeeper.property.clientPort", 
"2181");
   }
 
   public String getZKQuorum() throws MalformedURLException, URISyntaxException 
{
-    if (!isInitialized) {
-      initialize();
-    }
-    return hbaseConf.getTrimmed("hbase.zookeeper.quorum");
+    return getHbaseConf().getTrimmed("hbase.zookeeper.quorum");
   }
 
   public String getClusterZKClientPort() throws MalformedURLException, 
URISyntaxException {
-    if (!isInitialized) {
-      initialize();
-    }
-    return metricsConf.getTrimmed("cluster.zookeeper.property.clientPort", 
"2181");
+    return 
getMetricsConf().getTrimmed("cluster.zookeeper.property.clientPort", "2181");
   }
 
   public String getClusterZKQuorum() throws MalformedURLException, 
URISyntaxException {
-    if (!isInitialized) {
-      initialize();
-    }
-    return metricsConf.getTrimmed("cluster.zookeeper.quorum");
+    return getMetricsConf().getTrimmed("cluster.zookeeper.quorum");
   }
 
   public String getInstanceHostnameFromEnv() throws UnknownHostException {
@@ -390,12 +417,9 @@ public class TimelineMetricConfiguration {
     return DEFAULT_INSTANCE_PORT;
   }
 
-  public String getWebappAddress() {
+  public String getWebappAddress() throws MalformedURLException, 
URISyntaxException {
     String defaultHttpAddress = "0.0.0.0:6188";
-    if (metricsConf != null) {
-      return metricsConf.get(WEBAPP_HTTP_ADDRESS, defaultHttpAddress);
-    }
-    return defaultHttpAddress;
+    return getMetricsConf().get(WEBAPP_HTTP_ADDRESS, defaultHttpAddress);
   }
 
   public int getTimelineMetricsServiceHandlerThreadCount() {
@@ -450,8 +474,8 @@ public class TimelineMetricConfiguration {
 
   public boolean isDistributedCollectorModeDisabled() {
     try {
-      if (metricsConf != null) {
-        return 
Boolean.parseBoolean(metricsConf.get("timeline.metrics.service.distributed.collector.mode.disabled",
 "false"));
+      if (getMetricsConf() != null) {
+        return 
Boolean.parseBoolean(getMetricsConf().get("timeline.metrics.service.distributed.collector.mode.disabled",
 "false"));
       }
       return false;
     } catch (Exception e) {
@@ -497,4 +521,50 @@ public class TimelineMetricConfiguration {
 
     return whitelist;
   }
+
+  public int getExternalSinkInterval(SOURCE_NAME sourceName) {
+    return 
Integer.parseInt(metricsConf.get(String.format(EXTERNAL_SINK_INTERVAL, 
sourceName), "-1"));
+  }
+
+  public InternalSourceProvider getInternalSourceProvider() {
+    Class<? extends InternalSourceProvider> providerClass =
+      metricsConf.getClass(TIMELINE_METRICS_SOURCE_PROVIDER_CLASS,
+        DefaultInternalMetricsSourceProvider.class, 
InternalSourceProvider.class);
+    return ReflectionUtils.newInstance(providerClass, metricsConf);
+  }
+
+  public ExternalSinkProvider getExternalSinkProvider() {
+    Class<?> providerClass = 
metricsConf.getClassByNameOrNull(TIMELINE_METRICS_SINK_PROVIDER_CLASS);
+    if (providerClass != null) {
+      return (ExternalSinkProvider) ReflectionUtils.newInstance(providerClass, 
metricsConf);
+    }
+    return null;
+  }
+
+  public String getInternalCacheHeapPercent(String instanceName) {
+    String heapPercent = 
metricsConf.get(String.format(INTERNAL_CACHE_HEAP_PERCENT, instanceName));
+    if (StringUtils.isEmpty(heapPercent)) {
+      return "5%";
+    } else {
+      return heapPercent.endsWith("%") ? heapPercent : heapPercent + "%";
+    }
+  }
+
+  public String getDefaultMetricsSinkDir() {
+    String dirPath = metricsConf.get(DEFAULT_EXTERNAL_SINK_DIR);
+    if (dirPath == null) {
+      // Only one logger at the time of writing
+      Appender appender = (Appender) 
Logger.getRootLogger().getAllAppenders().nextElement();
+      if (appender instanceof FileAppender) {
+        File f = new File(((FileAppender) appender).getFile());
+        if (f.exists()) {
+          dirPath = f.getParent();
+        } else {
+          dirPath = "/tmp";
+        }
+      }
+    }
+
+    return dirPath;
+  }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/c32eebf8/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java
index ba16b43..74d4013 100644
--- 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java
@@ -83,7 +83,7 @@ public class TimelineMetricClusterAggregator extends 
AbstractTimelineAggregator
     Map<TimelineClusterMetric, MetricHostAggregate> hostAggregateMap = 
aggregateMetricsFromResultSet(rs, endTime);
 
     LOG.info("Saving " + hostAggregateMap.size() + " metric aggregates.");
-    hBaseAccessor.saveClusterTimeAggregateRecords(hostAggregateMap, 
outputTableName);
+    hBaseAccessor.saveClusterAggregateRecordsSecond(hostAggregateMap, 
outputTableName);
   }
 
   private Map<TimelineClusterMetric, MetricHostAggregate> 
aggregateMetricsFromResultSet(ResultSet rs, long endTime)

http://git-wip-us.apache.org/repos/asf/ambari/blob/c32eebf8/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TimelineMetricMetadataManager.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TimelineMetricMetadataManager.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TimelineMetricMetadataManager.java
index f904ebe..8a71756 100644
--- 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TimelineMetricMetadataManager.java
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/discovery/TimelineMetricMetadataManager.java
@@ -27,8 +27,11 @@ import 
org.apache.hadoop.metrics2.sink.timeline.MetadataException;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata;
 import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
+import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration;
 import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineClusterMetric;
 
+import java.net.MalformedURLException;
+import java.net.URISyntaxException;
 import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -69,17 +72,21 @@ public class TimelineMetricMetadataManager {
   // Filter metrics names matching given patterns, from metadata
   final List<String> metricNameFilters = new ArrayList<>();
 
-  public TimelineMetricMetadataManager(PhoenixHBaseAccessor hBaseAccessor,
-                                       Configuration metricsConf) {
-    this.hBaseAccessor = hBaseAccessor;
+  // Test friendly construction since mock instrumentation is difficult to get
+  // working with hadoop mini cluster
+  public TimelineMetricMetadataManager(Configuration metricsConf, 
PhoenixHBaseAccessor hBaseAccessor) {
     this.metricsConf = metricsConf;
-
+    this.hBaseAccessor = hBaseAccessor;
     String patternStrings = metricsConf.get(TIMELINE_METRIC_METADATA_FILTERS);
     if (!StringUtils.isEmpty(patternStrings)) {
       metricNameFilters.addAll(Arrays.asList(patternStrings.split(",")));
     }
   }
 
+  public TimelineMetricMetadataManager(PhoenixHBaseAccessor hBaseAccessor) 
throws MalformedURLException, URISyntaxException {
+    this(TimelineMetricConfiguration.getInstance().getMetricsConf(), 
hBaseAccessor);
+  }
+
   /**
    * Initialize Metadata from the store
    */

http://git-wip-us.apache.org/repos/asf/ambari/blob/c32eebf8/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/sink/DefaultFSSinkProvider.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/sink/DefaultFSSinkProvider.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/sink/DefaultFSSinkProvider.java
new file mode 100644
index 0000000..6ec6cf9
--- /dev/null
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/sink/DefaultFSSinkProvider.java
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.sink;
+
+import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_CACHE_COMMIT_INTERVAL;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Date;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration;
+import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.source.InternalSourceProvider;
+
+public class DefaultFSSinkProvider implements ExternalSinkProvider {
+  private static final Log LOG = 
LogFactory.getLog(DefaultFSSinkProvider.class);
+  TimelineMetricConfiguration conf = TimelineMetricConfiguration.getInstance();
+  private final DefaultExternalMetricsSink sink = new 
DefaultExternalMetricsSink();
+  private long FIXED_FILE_SIZE;
+  private final String SINK_FILE_NAME = "external-metrics-sink.dat";
+  private final String SEPARATOR = ", ";
+  private final String LINE_SEP = System.lineSeparator();
+  private final String HEADERS = "METRIC, APP_ID, INSTANCE_ID, HOSTNAME, 
START_TIME, DATA";
+
+  public DefaultFSSinkProvider() {
+    try {
+      FIXED_FILE_SIZE = 
conf.getMetricsConf().getLong("timeline.metrics.service.external.fs.sink.filesize",
 FileUtils.ONE_MB * 100);
+    } catch (Exception ignored) {
+      FIXED_FILE_SIZE = FileUtils.ONE_MB * 100;
+    }
+  }
+
+  @Override
+  public ExternalMetricsSink 
getExternalMetricsSink(InternalSourceProvider.SOURCE_NAME sourceName) {
+    return sink;
+  }
+
+  class DefaultExternalMetricsSink implements ExternalMetricsSink {
+
+    @Override
+    public int getSinkTimeOutSeconds() {
+      return 10;
+    }
+
+    @Override
+    public int getFlushSeconds() {
+      try {
+        return 
conf.getMetricsConf().getInt(TIMELINE_METRICS_CACHE_COMMIT_INTERVAL, 3);
+      } catch (Exception e) {
+        LOG.warn("Cannot read cache commit interval.");
+      }
+      return 3;
+    }
+
+    private boolean createFile(File f) {
+      boolean created = false;
+      if (!f.exists()) {
+        try {
+          created = f.createNewFile();
+          FileUtils.writeStringToFile(f, HEADERS);
+        } catch (IOException e) {
+          LOG.error("Cannot create " + SINK_FILE_NAME + " at " + f.getPath());
+          return false;
+        }
+      }
+
+      return created;
+    }
+
+    private boolean shouldReCreate(File f) {
+      if (!f.exists()) {
+        return true;
+      }
+      if (FileUtils.sizeOf(f) > FIXED_FILE_SIZE) {
+        return true;
+      }
+      return false;
+    }
+
+    @Override
+    public void sinkMetricData(Collection<TimelineMetrics> metrics) {
+      String dirPath = 
TimelineMetricConfiguration.getInstance().getDefaultMetricsSinkDir();
+      File dir = new File(dirPath);
+      if (!dir.exists()) {
+        LOG.error("Cannot sink data to file system, incorrect dir path " + 
dirPath);
+        return;
+      }
+
+      File f = FileUtils.getFile(dirPath, SINK_FILE_NAME);
+      if (shouldReCreate(f)) {
+        if (!f.delete()) {
+          LOG.warn("Unable to delete external sink file.");
+          return;
+        }
+        createFile(f);
+      }
+
+      if (metrics != null) {
+        for (TimelineMetrics timelineMetrics : metrics) {
+          for (TimelineMetric metric : timelineMetrics.getMetrics()) {
+            StringBuilder sb = new StringBuilder();
+            sb.append(metric.getMetricName());
+            sb.append(SEPARATOR);
+            sb.append(metric.getAppId());
+            sb.append(SEPARATOR);
+            if (StringUtils.isEmpty(metric.getInstanceId())) {
+              sb.append(SEPARATOR);
+            } else {
+              sb.append(metric.getInstanceId());
+              sb.append(SEPARATOR);
+            }
+            if (StringUtils.isEmpty(metric.getHostName())) {
+              sb.append(SEPARATOR);
+            } else {
+              sb.append(metric.getHostName());
+              sb.append(SEPARATOR);
+            }
+            sb.append(new Date(metric.getStartTime()));
+            sb.append(SEPARATOR);
+            sb.append(metric.getMetricValues().toString());
+            sb.append(LINE_SEP);
+            try {
+              FileUtils.writeStringToFile(f, sb.toString());
+            } catch (IOException e) {
+              LOG.warn("Unable to sink data to file " + f.getPath());
+            }
+          }
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/c32eebf8/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/sink/ExternalMetricsSink.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/sink/ExternalMetricsSink.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/sink/ExternalMetricsSink.java
new file mode 100644
index 0000000..ff06307
--- /dev/null
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/sink/ExternalMetricsSink.java
@@ -0,0 +1,48 @@
+package 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.sink;
+
+import java.util.Collection;
+
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+public interface ExternalMetricsSink {
+  /**
+   * How many seconds to wait on sink before dropping metrics.
+   * Note: Care should be taken that this timeout does not bottleneck the
+   * sink thread.
+   */
+  int getSinkTimeOutSeconds();
+
+  /**
+   * How frequently to flush data to external system.
+   * Default would be between 60 - 120 seconds, coherent with default sink
+   * interval of AMS.
+   */
+  int getFlushSeconds();
+
+  /**
+   * Raw data stream to process / store on external system.
+   * The data will be held in an in-memory cache and flushed at flush seconds
+   * or when the cache size limit is exceeded we will flush the cache and
+   * drop data if write fails.
+   *
+   * @param metrics {@link Collection<TimelineMetrics>}
+   */
+  void sinkMetricData(Collection<TimelineMetrics> metrics);
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/c32eebf8/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/sink/ExternalSinkProvider.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/sink/ExternalSinkProvider.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/sink/ExternalSinkProvider.java
new file mode 100644
index 0000000..48887d9
--- /dev/null
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/sink/ExternalSinkProvider.java
@@ -0,0 +1,35 @@
+package 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.sink;
+
+import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.source.InternalSourceProvider;
+import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.source.InternalSourceProvider.SOURCE_NAME;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Configurable provider for sink classes that match the metrics sources.
+ * Provider can return same sink of different sinks for each source.
+ */
+public interface ExternalSinkProvider {
+
+  /**
+   * Return an instance of the metrics sink for the give source
+   * @return {@link ExternalMetricsSink}
+   */
+  ExternalMetricsSink getExternalMetricsSink(SOURCE_NAME sourceName);
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/c32eebf8/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/sink/HttpSinkProvider.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/sink/HttpSinkProvider.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/sink/HttpSinkProvider.java
new file mode 100644
index 0000000..bb84c8a
--- /dev/null
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/sink/HttpSinkProvider.java
@@ -0,0 +1,231 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.sink;
+
+import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_CACHE_COMMIT_INTERVAL;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.security.KeyStore;
+import java.util.Collection;
+
+import javax.net.ssl.HttpsURLConnection;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLSocketFactory;
+import javax.net.ssl.TrustManagerFactory;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration;
+import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.source.InternalSourceProvider;
+import org.apache.http.client.utils.URIBuilder;
+import org.codehaus.jackson.map.AnnotationIntrospector;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+import org.codehaus.jackson.xc.JaxbAnnotationIntrospector;
+
+public class HttpSinkProvider implements ExternalSinkProvider {
+  private static final Log LOG = LogFactory.getLog(HttpSinkProvider.class);
+  TimelineMetricConfiguration conf = TimelineMetricConfiguration.getInstance();
+
+  private String connectUrl;
+  private SSLSocketFactory sslSocketFactory;
+  protected static ObjectMapper mapper;
+
+  static {
+    mapper = new ObjectMapper();
+    AnnotationIntrospector introspector = new JaxbAnnotationIntrospector();
+    mapper.setAnnotationIntrospector(introspector);
+    mapper.getSerializationConfig()
+      .withSerializationInclusion(JsonSerialize.Inclusion.NON_NULL);
+  }
+
+  public HttpSinkProvider() {
+    Configuration config;
+    try {
+      config = conf.getMetricsConf();
+    } catch (Exception e) {
+      throw new ExceptionInInitializerError("Unable to read configuration for 
sink.");
+    }
+    String protocol = 
config.get("timeline.metrics.service.external.http.sink.protocol", "http");
+    String host = 
config.get("timeline.metrics.service.external.http.sink.host", "localhost");
+    String port = 
config.get("timeline.metrics.service.external.http.sink.port", "6189");
+
+    if (protocol.contains("https")) {
+      loadTruststore(
+        
config.getTrimmed("timeline.metrics.service.external.http.sink.truststore.path"),
+        
config.getTrimmed("timeline.metrics.service.external.http.sink.truststore.type"),
+        
config.getTrimmed("timeline.metrics.service.external.http.sink.truststore.password")
+      );
+    }
+
+    URIBuilder uriBuilder = new URIBuilder();
+    uriBuilder.setScheme(protocol);
+    uriBuilder.setHost(host);
+    uriBuilder.setPort(Integer.parseInt(port));
+    connectUrl = uriBuilder.toString();
+  }
+
+  @Override
+  public ExternalMetricsSink 
getExternalMetricsSink(InternalSourceProvider.SOURCE_NAME sourceName) {
+    return null;
+  }
+
+  protected HttpURLConnection getConnection(String spec) throws IOException {
+    return (HttpURLConnection) new URL(spec).openConnection();
+  }
+
+  // Get an ssl connection
+  protected HttpsURLConnection getSSLConnection(String spec)
+    throws IOException, IllegalStateException {
+
+    HttpsURLConnection connection = (HttpsURLConnection) (new 
URL(spec).openConnection());
+    connection.setSSLSocketFactory(sslSocketFactory);
+    return connection;
+  }
+
+  protected void loadTruststore(String trustStorePath, String trustStoreType,
+                                String trustStorePassword) {
+    if (sslSocketFactory == null) {
+      if (trustStorePath == null || trustStorePassword == null) {
+        String msg = "Can't load TrustStore. Truststore path or password is 
not set.";
+        LOG.error(msg);
+        throw new IllegalStateException(msg);
+      }
+      FileInputStream in = null;
+      try {
+        in = new FileInputStream(new File(trustStorePath));
+        KeyStore store = KeyStore.getInstance(trustStoreType == null ?
+          KeyStore.getDefaultType() : trustStoreType);
+        store.load(in, trustStorePassword.toCharArray());
+        TrustManagerFactory tmf = TrustManagerFactory
+          .getInstance(TrustManagerFactory.getDefaultAlgorithm());
+        tmf.init(store);
+        SSLContext context = SSLContext.getInstance("TLS");
+        context.init(null, tmf.getTrustManagers(), null);
+        sslSocketFactory = context.getSocketFactory();
+      } catch (Exception e) {
+        LOG.error("Unable to load TrustStore", e);
+      } finally {
+        if (in != null) {
+          try {
+            in.close();
+          } catch (IOException e) {
+            LOG.error("Unable to load TrustStore", e);
+          }
+        }
+      }
+    }
+  }
+
+  class DefaultHttpMetricsSink implements ExternalMetricsSink {
+
+    @Override
+    public int getSinkTimeOutSeconds() {
+      try {
+        return 
conf.getMetricsConf().getInt("timeline.metrics.service.external.http.sink.timeout.seconds",
 10);
+      } catch (Exception e) {
+        return 10;
+      }
+    }
+
+    @Override
+    public int getFlushSeconds() {
+      try {
+        return 
conf.getMetricsConf().getInt(TIMELINE_METRICS_CACHE_COMMIT_INTERVAL, 3);
+      } catch (Exception e) {
+        LOG.warn("Cannot read cache commit interval.");
+      }
+      return 3;
+    }
+
+    /**
+     * Cleans up and closes an input stream
+     * see 
http://docs.oracle.com/javase/6/docs/technotes/guides/net/http-keepalive.html
+     * @param is the InputStream to clean up
+     * @return string read from the InputStream
+     * @throws IOException
+     */
+    protected String cleanupInputStream(InputStream is) throws IOException {
+      StringBuilder sb = new StringBuilder();
+      if (is != null) {
+        try (
+          InputStreamReader isr = new InputStreamReader(is);
+          BufferedReader br = new BufferedReader(isr)
+        ) {
+          // read the response body
+          String line;
+          while ((line = br.readLine()) != null) {
+            if (LOG.isDebugEnabled()) {
+              sb.append(line);
+            }
+          }
+        } finally {
+          is.close();
+        }
+      }
+      return sb.toString();
+    }
+
+    @Override
+    public void sinkMetricData(Collection<TimelineMetrics> metrics) {
+      HttpURLConnection connection = null;
+      try {
+        connection = connectUrl.startsWith("https") ? 
getSSLConnection(connectUrl) : getConnection(connectUrl);
+
+        connection.setRequestMethod("POST");
+        connection.setRequestProperty("Content-Type", "application/json");
+        connection.setRequestProperty("Connection", "Keep-Alive");
+        connection.setConnectTimeout(getSinkTimeOutSeconds());
+        connection.setReadTimeout(getSinkTimeOutSeconds());
+        connection.setDoOutput(true);
+
+        if (metrics != null) {
+          String jsonData = mapper.writeValueAsString(metrics);
+          try (OutputStream os = connection.getOutputStream()) {
+            os.write(jsonData.getBytes("UTF-8"));
+          }
+        }
+
+        int statusCode = connection.getResponseCode();
+
+        if (statusCode != 200) {
+          LOG.info("Unable to POST metrics to external sink, " + connectUrl +
+            ", statusCode = " + statusCode);
+        } else {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Metrics posted to external sink " + connectUrl);
+          }
+        }
+        cleanupInputStream(connection.getInputStream());
+
+      } catch (IOException io) {
+        LOG.warn("Unable to sink data to external system.", io);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/c32eebf8/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/DefaultInternalMetricsSourceProvider.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/DefaultInternalMetricsSourceProvider.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/DefaultInternalMetricsSourceProvider.java
new file mode 100644
index 0000000..b97c39f
--- /dev/null
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/DefaultInternalMetricsSourceProvider.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.source;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.sink.ExternalMetricsSink;
+
+public class DefaultInternalMetricsSourceProvider implements 
InternalSourceProvider {
+  private static final Log LOG = 
LogFactory.getLog(DefaultInternalMetricsSourceProvider.class);
+
+  // TODO: Implement read based sources for higher level data
+  @Override
+  public InternalMetricsSource getInternalMetricsSource(SOURCE_NAME 
sourceName, int sinkIntervalSeconds, ExternalMetricsSink sink) {
+    if (sink == null) {
+      LOG.warn("No external sink configured for source " + sourceName);
+      return null;
+    }
+
+    switch (sourceName) {
+      case RAW_METRICS:
+        return new RawMetricsSource(sinkIntervalSeconds, sink);
+      default:
+        throw new UnsupportedOperationException("Unimplemented source type " + 
sourceName);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/c32eebf8/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/InternalMetricsSource.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/InternalMetricsSource.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/InternalMetricsSource.java
new file mode 100644
index 0000000..a6e1092
--- /dev/null
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/InternalMetricsSource.java
@@ -0,0 +1,30 @@
+package 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.source;
+
+import java.util.Collection;
+
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+public interface InternalMetricsSource {
+  /**
+   * Write metrics to external sink.
+   * Allows pre-processing and caching capabilities to the consumer.
+   */
+  void publishTimelineMetrics(Collection<TimelineMetrics> metrics);
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/c32eebf8/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/InternalSourceProvider.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/InternalSourceProvider.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/InternalSourceProvider.java
new file mode 100644
index 0000000..9d8ca36
--- /dev/null
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/source/InternalSourceProvider.java
@@ -0,0 +1,39 @@
+package 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.source;
+
+import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.sink.ExternalMetricsSink;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+public interface InternalSourceProvider {
+
+  enum SOURCE_NAME {
+    RAW_METRICS,
+    MINUTE_HOST_AGGREAGATE_METRICS,
+    HOURLY_HOST_AGGREAGATE_METRICS,
+    DAILY_HOST_AGGREAGATE_METRICS,
+    MINUTE_CLUSTER_AGGREAGATE_METRICS,
+    HOURLY_CLUSTER_AGGREAGATE_METRICS,
+    DAILY_CLUSTER_AGGREAGATE_METRICS,
+  }
+
+  /**
+   * Provide Source for metrics data.
+   * @return {@link InternalMetricsSource}
+   */
+  InternalMetricsSource getInternalMetricsSource(SOURCE_NAME sourceName, int 
sinkIntervalSeconds, ExternalMetricsSink sink);
+}

Reply via email to