http://git-wip-us.apache.org/repos/asf/ambari/blob/954e61ee/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
index 933bdf0..034fb2f 100644
--- 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
@@ -36,7 +36,7 @@ import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.
 import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregator;
 import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregatorFactory;
 import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner;
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.TimelineMetricHAController;
+import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.MetricCollectorHAController;
 import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataKey;
 import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager;
 import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.function.SeriesAggregateFunction;
@@ -45,7 +45,9 @@ import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.
 import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
 import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.ConditionBuilder;
 import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.TopNCondition;
+
 import java.io.IOException;
+import java.net.UnknownHostException;
 import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -56,8 +58,9 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
 import static java.util.concurrent.TimeUnit.SECONDS;
 import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.AGGREGATOR_CHECKPOINT_DELAY;
 import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.DEFAULT_TOPN_HOSTS_LIMIT;
@@ -75,7 +78,7 @@ public class HBaseTimelineMetricStore extends AbstractService 
implements Timelin
   private final Map<AggregationTaskRunner.AGGREGATOR_NAME, 
ScheduledExecutorService> scheduledExecutors = new HashMap<>();
   private TimelineMetricMetadataManager metricMetadataManager;
   private Integer defaultTopNHostsLimit;
-  private TimelineMetricHAController haController;
+  private MetricCollectorHAController haController;
 
   /**
    * Construct the service.
@@ -106,7 +109,7 @@ public class HBaseTimelineMetricStore extends 
AbstractService implements Timelin
       // Start HA service
       if (configuration.isDistributedOperationModeEnabled()) {
         // Start the controller
-        haController = new TimelineMetricHAController(configuration);
+        haController = new MetricCollectorHAController(configuration);
         try {
           haController.initializeHAController();
         } catch (Exception e) {
@@ -384,7 +387,16 @@ public class HBaseTimelineMetricStore extends 
AbstractService implements Timelin
 
   @Override
   public List<String> getLiveInstances() {
-    return haController.getLiveInstanceHostNames();
+    List<String> instances = haController.getLiveInstanceHostNames();
+    if (instances == null || instances.isEmpty()) {
+      try {
+        // Always return current host as live (embedded operation mode)
+        instances = 
Collections.singletonList(configuration.getInstanceHostnameFromEnv());
+      } catch (UnknownHostException e) {
+        LOG.debug("Exception on getting hostname from env.", e);
+      }
+    }
+    return instances;
   }
 
   private void scheduleAggregatorThread(final TimelineMetricAggregator 
aggregator,

http://git-wip-us.apache.org/repos/asf/ambari/blob/954e61ee/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java
index 04f5c1c..de63d4e 100644
--- 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java
@@ -24,7 +24,7 @@ import org.apache.hadoop.conf.Configuration;
 import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
 import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner;
 import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME;
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.TimelineMetricHAController;
+import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.MetricCollectorHAController;
 import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
 import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.EmptyCondition;
 import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL;
@@ -92,7 +92,7 @@ public abstract class AbstractTimelineAggregator implements 
TimelineMetricAggreg
                                     String tableName,
                                     String outputTableName,
                                     Long nativeTimeRangeDelay,
-                                    TimelineMetricHAController haController) {
+                                    MetricCollectorHAController haController) {
     this(aggregatorName, hBaseAccessor, metricsConf);
     this.checkpointLocation = checkpointLocation;
     this.sleepIntervalMillis = sleepIntervalMillis;

http://git-wip-us.apache.org/repos/asf/ambari/blob/954e61ee/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java
index 4c44f9e..2eb3553 100644
--- 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregatorFactory.java
@@ -20,7 +20,7 @@ package 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline
 import org.apache.commons.io.FilenameUtils;
 import org.apache.hadoop.conf.Configuration;
 import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.TimelineMetricHAController;
+import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.MetricCollectorHAController;
 import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager;
 
 import static java.util.concurrent.TimeUnit.SECONDS;
@@ -95,7 +95,7 @@ public class TimelineMetricAggregatorFactory {
    */
   public static TimelineMetricAggregator createTimelineMetricAggregatorMinute
     (PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf,
-     TimelineMetricHAController haController) {
+     MetricCollectorHAController haController) {
 
     String checkpointDir = metricsConf.get(
       TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION);
@@ -145,7 +145,7 @@ public class TimelineMetricAggregatorFactory {
    */
   public static TimelineMetricAggregator createTimelineMetricAggregatorHourly
     (PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf,
-     TimelineMetricHAController haController) {
+     MetricCollectorHAController haController) {
 
     String checkpointDir = metricsConf.get(
       TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION);
@@ -195,7 +195,7 @@ public class TimelineMetricAggregatorFactory {
    */
   public static TimelineMetricAggregator createTimelineMetricAggregatorDaily
     (PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf,
-     TimelineMetricHAController haController) {
+     MetricCollectorHAController haController) {
 
     String checkpointDir = metricsConf.get(
       TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION);
@@ -247,7 +247,7 @@ public class TimelineMetricAggregatorFactory {
   public static TimelineMetricAggregator createTimelineClusterAggregatorSecond(
     PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf,
     TimelineMetricMetadataManager metadataManager,
-    TimelineMetricHAController haController) {
+    MetricCollectorHAController haController) {
 
     String checkpointDir = metricsConf.get(
       TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION);
@@ -291,7 +291,7 @@ public class TimelineMetricAggregatorFactory {
    */
   public static TimelineMetricAggregator createTimelineClusterAggregatorMinute(
     PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf,
-    TimelineMetricHAController haController) {
+    MetricCollectorHAController haController) {
 
     String checkpointDir = metricsConf.get(
       TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION);
@@ -344,7 +344,7 @@ public class TimelineMetricAggregatorFactory {
    */
   public static TimelineMetricAggregator createTimelineClusterAggregatorHourly(
     PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf,
-    TimelineMetricHAController haController) {
+    MetricCollectorHAController haController) {
 
     String checkpointDir = metricsConf.get(
       TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION);
@@ -397,7 +397,7 @@ public class TimelineMetricAggregatorFactory {
    */
   public static TimelineMetricAggregator createTimelineClusterAggregatorDaily(
     PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf,
-    TimelineMetricHAController haController) {
+    MetricCollectorHAController haController) {
 
     String checkpointDir = metricsConf.get(
       TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION);

http://git-wip-us.apache.org/repos/asf/ambari/blob/954e61ee/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java
index 6438256..02677b9 100644
--- 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java
@@ -20,7 +20,7 @@ package 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline
 import org.apache.hadoop.conf.Configuration;
 import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
 import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME;
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.TimelineMetricHAController;
+import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.MetricCollectorHAController;
 import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
 import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultCondition;
 
@@ -48,7 +48,7 @@ public class TimelineMetricClusterAggregator extends 
AbstractTimelineAggregator
                                          String inputTableName,
                                          String outputTableName,
                                          Long nativeTimeRangeDelay,
-                                         TimelineMetricHAController 
haController) {
+                                         MetricCollectorHAController 
haController) {
     super(aggregatorName, hBaseAccessor, metricsConf, checkpointLocation,
       sleepIntervalMillis, checkpointCutOffMultiplier,
       hostAggregatorDisabledParam, inputTableName, outputTableName,

http://git-wip-us.apache.org/repos/asf/ambari/blob/954e61ee/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java
index 98b3987..6f3d8bc 100644
--- 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java
@@ -26,7 +26,7 @@ import 
org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata;
 import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
 import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner;
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.TimelineMetricHAController;
+import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.MetricCollectorHAController;
 import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataKey;
 import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager;
 import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
@@ -76,7 +76,7 @@ public class TimelineMetricClusterAggregatorSecond extends 
AbstractTimelineAggre
                                                String outputTableName,
                                                Long nativeTimeRangeDelay,
                                                Long timeSliceInterval,
-                                               TimelineMetricHAController 
haController) {
+                                               MetricCollectorHAController 
haController) {
     super(aggregatorName, hBaseAccessor, metricsConf, checkpointLocation,
       sleepIntervalMillis, checkpointCutOffMultiplier, aggregatorDisabledParam,
       tableName, outputTableName, nativeTimeRangeDelay, haController);

http://git-wip-us.apache.org/repos/asf/ambari/blob/954e61ee/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricHostAggregator.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricHostAggregator.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricHostAggregator.java
index 364a4b5..0ea9c08 100644
--- 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricHostAggregator.java
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricHostAggregator.java
@@ -23,7 +23,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
 import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
 import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME;
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.TimelineMetricHAController;
+import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.MetricCollectorHAController;
 import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
 import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultCondition;
 
@@ -49,7 +49,7 @@ public class TimelineMetricHostAggregator extends 
AbstractTimelineAggregator {
                                       String tableName,
                                       String outputTableName,
                                       Long nativeTimeRangeDelay,
-                                      TimelineMetricHAController haController) 
{
+                                      MetricCollectorHAController 
haController) {
     super(aggregatorName, hBaseAccessor, metricsConf, checkpointLocation,
       sleepIntervalMillis, checkpointCutOffMultiplier, 
hostAggregatorDisabledParam,
       tableName, outputTableName, nativeTimeRangeDelay, haController);

http://git-wip-us.apache.org/repos/asf/ambari/blob/954e61ee/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricClusterAggregator.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricClusterAggregator.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricClusterAggregator.java
index bbb9991..c7b605f 100644
--- 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricClusterAggregator.java
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricClusterAggregator.java
@@ -21,7 +21,7 @@ import org.apache.hadoop.conf.Configuration;
 import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
 import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.AbstractTimelineAggregator;
 import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME;
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.TimelineMetricHAController;
+import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.MetricCollectorHAController;
 import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
 import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.EmptyCondition;
 
@@ -46,7 +46,7 @@ public class TimelineMetricClusterAggregator extends 
AbstractTimelineAggregator
                                          String inputTableName,
                                          String outputTableName,
                                          Long nativeTimeRangeDelay,
-                                         TimelineMetricHAController 
haController) {
+                                         MetricCollectorHAController 
haController) {
     super(aggregatorName, hBaseAccessor, metricsConf, checkpointLocation,
       sleepIntervalMillis, checkpointCutOffMultiplier,
       hostAggregatorDisabledParam, inputTableName, outputTableName,

http://git-wip-us.apache.org/repos/asf/ambari/blob/954e61ee/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricHostAggregator.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricHostAggregator.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricHostAggregator.java
index c071708..57a3034 100644
--- 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricHostAggregator.java
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricHostAggregator.java
@@ -21,7 +21,7 @@ import org.apache.hadoop.conf.Configuration;
 import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
 import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.AbstractTimelineAggregator;
 import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME;
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.TimelineMetricHAController;
+import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.MetricCollectorHAController;
 import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
 import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.EmptyCondition;
 
@@ -44,7 +44,7 @@ public class TimelineMetricHostAggregator extends 
AbstractTimelineAggregator {
                                       String tableName,
                                       String outputTableName,
                                       Long nativeTimeRangeDelay,
-                                      TimelineMetricHAController haController) 
{
+                                      MetricCollectorHAController 
haController) {
     super(aggregatorName, hBaseAccessor, metricsConf, checkpointLocation,
       sleepIntervalMillis, checkpointCutOffMultiplier, 
hostAggregatorDisabledParam,
       tableName, outputTableName, nativeTimeRangeDelay, haController);

http://git-wip-us.apache.org/repos/asf/ambari/blob/954e61ee/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/AggregationTaskRunner.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/AggregationTaskRunner.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/AggregationTaskRunner.java
index fcd26bd..07d4e2e 100644
--- 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/AggregationTaskRunner.java
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/AggregationTaskRunner.java
@@ -38,9 +38,9 @@ import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti
 import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_RECORD_DAILY;
 import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_RECORD_HOURLY;
 import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME.METRIC_RECORD_MINUTE;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.TimelineMetricHAController.CLUSTER_NAME;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.TimelineMetricHAController.METRIC_AGGREGATORS;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.TimelineMetricHAController.STATE_MODEL_NAME;
+import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.MetricCollectorHAController.CLUSTER_NAME;
+import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.MetricCollectorHAController.METRIC_AGGREGATORS;
+import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.MetricCollectorHAController.STATE_MODEL_NAME;
 
 public class AggregationTaskRunner {
   private final String instanceName;

http://git-wip-us.apache.org/repos/asf/ambari/blob/954e61ee/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/MetricCollectorHAController.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/MetricCollectorHAController.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/MetricCollectorHAController.java
new file mode 100644
index 0000000..18b9059
--- /dev/null
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/MetricCollectorHAController.java
@@ -0,0 +1,276 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability;
+
+import com.google.common.base.Joiner;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.MetricsSystemInitializationException;
+import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.controller.GenericHelixController;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.OnlineOfflineSMD;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeSet;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.helix.model.IdealState.RebalanceMode.FULL_AUTO;
+
+public class MetricCollectorHAController {
+  private static final Log LOG = 
LogFactory.getLog(MetricCollectorHAController.class);
+
+  static final String CLUSTER_NAME = "ambari-metrics-cluster";
+  static final String METRIC_AGGREGATORS = "METRIC_AGGREGATORS";
+  static final String STATE_MODEL_NAME = OnlineOfflineSMD.name;
+  static final String INSTANCE_NAME_DELIMITER = "_";
+
+  final String zkConnectUrl;
+  final String instanceHostname;
+  final InstanceConfig instanceConfig;
+  final AggregationTaskRunner aggregationTaskRunner;
+
+  // Cache list of known live instances
+  final List<String> liveInstanceNames = new ArrayList<>();
+
+  // Helix Admin
+  HelixAdmin admin;
+  // Helix Manager
+  HelixManager manager;
+
+  private volatile boolean isInitialized = false;
+
+  public MetricCollectorHAController(TimelineMetricConfiguration 
configuration) {
+    String instancePort;
+    try {
+      instanceHostname = configuration.getInstanceHostnameFromEnv();
+      instancePort = configuration.getInstancePort();
+
+    } catch (Exception e) {
+      LOG.error("Error reading configs from classpath, will resort to 
defaults.", e);
+      throw new MetricsSystemInitializationException(e.getMessage());
+    }
+
+    try {
+      String zkClientPort = configuration.getZKClientPort();
+      String zkQuorum = configuration.getZKQuorum();
+
+      if (StringUtils.isEmpty(zkClientPort) || StringUtils.isEmpty(zkQuorum)) {
+        throw new Exception("Unable to parse zookeeper quorum. clientPort = "
+          + zkClientPort +", quorum = " + zkQuorum);
+      }
+
+      zkConnectUrl = getZkConnectionUrl(zkClientPort, zkQuorum);
+
+    } catch (Exception e) {
+      LOG.error("Unable to load hbase-site from classpath.", e);
+      throw new MetricsSystemInitializationException(e.getMessage());
+    }
+
+    instanceConfig = new InstanceConfig(instanceHostname + 
INSTANCE_NAME_DELIMITER + instancePort);
+    instanceConfig.setHostName(instanceHostname);
+    instanceConfig.setPort(instancePort);
+    instanceConfig.setInstanceEnabled(true);
+    aggregationTaskRunner = new 
AggregationTaskRunner(instanceConfig.getInstanceName(), zkConnectUrl);
+  }
+
+  /**
+   * Initialize the instance with zookeeper via Helix
+   */
+  public void initializeHAController() throws Exception {
+    admin = new ZKHelixAdmin(zkConnectUrl);
+    // create cluster
+    LOG.info("Creating zookeeper cluster node: " + CLUSTER_NAME);
+    admin.addCluster(CLUSTER_NAME, false);
+
+    // Adding host to the cluster
+    List<String> nodes = admin.getInstancesInCluster(CLUSTER_NAME);
+    if (nodes == null || !nodes.contains(instanceConfig.getInstanceName())) {
+      LOG.info("Adding participant instance " + instanceConfig);
+      admin.addInstance(CLUSTER_NAME, instanceConfig);
+    }
+
+    // Add a state model
+    if (admin.getStateModelDef(CLUSTER_NAME, STATE_MODEL_NAME) == null) {
+      LOG.info("Adding ONLINE-OFFLINE state model to the cluster");
+      admin.addStateModelDef(CLUSTER_NAME, STATE_MODEL_NAME, 
OnlineOfflineSMD.build());
+    }
+
+    // Add resources with 1 cluster-wide replica
+    // Since our aggregators are unbalanced in terms of work distribution we
+    // only need to distribute writes to METRIC_AGGREGATE and
+    // METRIC_RECORD_MINUTE
+    List<String> resources = admin.getResourcesInCluster(CLUSTER_NAME);
+    if (!resources.contains(METRIC_AGGREGATORS)) {
+      LOG.info("Adding resource " + METRIC_AGGREGATORS + " with 2 partitions 
and 1 replicas");
+      admin.addResource(CLUSTER_NAME, METRIC_AGGREGATORS, 2, 
OnlineOfflineSMD.name, FULL_AUTO.toString());
+    }
+    // this will set up the ideal state, it calculates the preference list for
+    // each partition similar to consistent hashing
+    admin.rebalance(CLUSTER_NAME, METRIC_AGGREGATORS, 1);
+
+    // Start participant
+    startAggregators();
+
+    // Start controller
+    startController();
+
+    Runtime.getRuntime().addShutdownHook(new Thread() {
+      @Override
+      public void run() {
+        aggregationTaskRunner.stop();
+        manager.disconnect();
+      }
+    });
+
+    isInitialized = true;
+  }
+
+  /**
+   * Return true if HA controller is enabled.
+   */
+  public boolean isInitialized() {
+    return isInitialized;
+  }
+
+  private void startAggregators() {
+    try {
+      aggregationTaskRunner.initialize();
+
+    } catch (Exception e) {
+      LOG.error("Unable to start aggregators.", e);
+      throw new MetricsSystemInitializationException(e.getMessage());
+    }
+  }
+
+  private void startController() throws Exception {
+    manager = HelixManagerFactory.getZKHelixManager(
+      CLUSTER_NAME,
+      instanceHostname,
+      InstanceType.CONTROLLER,
+      zkConnectUrl
+    );
+
+    manager.connect();
+    HelixController controller = new HelixController();
+    manager.addLiveInstanceChangeListener(controller);
+  }
+
+  private String getZkConnectionUrl(String zkClientPort, String zkQuorum) {
+    StringBuilder sb = new StringBuilder();
+    String[] quorumParts = zkQuorum.split(",");
+    String prefix = "";
+    for (String part : quorumParts) {
+      sb.append(prefix);
+      sb.append(part.trim());
+      if (!part.contains(":")) {
+        sb.append(":");
+        sb.append(zkClientPort);
+      }
+      prefix = ",";
+    }
+
+    return sb.toString();
+  }
+
+  public AggregationTaskRunner getAggregationTaskRunner() {
+    return aggregationTaskRunner;
+  }
+
+  public List<String> getLiveInstanceHostNames() {
+    List<String> liveInstanceHostNames = new ArrayList<>();
+
+    for (String instance : liveInstanceNames) {
+      liveInstanceHostNames.add(instance.split(INSTANCE_NAME_DELIMITER)[0]);
+    }
+
+    return liveInstanceHostNames;
+  }
+
+  public class HelixController extends GenericHelixController {
+    ScheduledExecutorService executorService = 
Executors.newSingleThreadScheduledExecutor();
+    Joiner joiner = Joiner.on(", ").skipNulls();
+
+    @Override
+    public void onLiveInstanceChange(List<LiveInstance> liveInstances, 
NotificationContext changeContext) {
+      super.onLiveInstanceChange(liveInstances, changeContext);
+
+      liveInstanceNames.clear();
+      for (LiveInstance instance : liveInstances) {
+        liveInstanceNames.add(instance.getInstanceName());
+      }
+
+      LOG.info("Detected change in liveliness of Collector instances. " +
+        "LiveIsntances = " + joiner.join(liveInstanceNames));
+      // Print HA state - after some delay
+      executorService.schedule(new Runnable() {
+        @Override
+        public void run() {
+          printClusterState();
+        }
+      }, 30, TimeUnit.SECONDS);
+
+
+    }
+  }
+
+  public void printClusterState() {
+    StringBuilder sb = new StringBuilder("\n######################### Cluster 
HA state ########################");
+
+    ExternalView resourceExternalView = 
admin.getResourceExternalView(CLUSTER_NAME, METRIC_AGGREGATORS);
+    if (resourceExternalView != null) {
+      getPrintableResourceState(resourceExternalView, METRIC_AGGREGATORS, sb);
+    }
+    sb.append("\n##################################################");
+    LOG.info(sb.toString());
+  }
+
+  private void getPrintableResourceState(ExternalView resourceExternalView,
+                                         String resourceName,
+                                         StringBuilder sb) {
+    TreeSet<String> sortedSet = new 
TreeSet<>(resourceExternalView.getPartitionSet());
+    sb.append("\nCLUSTER: ");
+    sb.append(CLUSTER_NAME);
+    sb.append("\nRESOURCE: ");
+    sb.append(resourceName);
+    for (String partitionName : sortedSet) {
+      sb.append("\nPARTITION: ");
+      sb.append(partitionName).append("\t");
+      Map<String, String> states = 
resourceExternalView.getStateMap(partitionName);
+      for (Map.Entry<String, String> stateEntry : states.entrySet()) {
+        sb.append("\t");
+        sb.append(stateEntry.getKey());
+        sb.append("\t");
+        sb.append(stateEntry.getValue());
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/954e61ee/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/TimelineMetricHAController.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/TimelineMetricHAController.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/TimelineMetricHAController.java
deleted file mode 100644
index 53b9e7e..0000000
--- 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/TimelineMetricHAController.java
+++ /dev/null
@@ -1,276 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability;
-
-import com.google.common.base.Joiner;
-import org.apache.commons.lang.StringUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.MetricsSystemInitializationException;
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration;
-import org.apache.helix.HelixAdmin;
-import org.apache.helix.HelixManager;
-import org.apache.helix.HelixManagerFactory;
-import org.apache.helix.InstanceType;
-import org.apache.helix.NotificationContext;
-import org.apache.helix.controller.GenericHelixController;
-import org.apache.helix.manager.zk.ZKHelixAdmin;
-import org.apache.helix.model.ExternalView;
-import org.apache.helix.model.InstanceConfig;
-import org.apache.helix.model.LiveInstance;
-import org.apache.helix.model.OnlineOfflineSMD;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeSet;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import static org.apache.helix.model.IdealState.RebalanceMode.FULL_AUTO;
-
-public class TimelineMetricHAController {
-  private static final Log LOG = 
LogFactory.getLog(TimelineMetricHAController.class);
-
-  static final String CLUSTER_NAME = "ambari-metrics-cluster";
-  static final String METRIC_AGGREGATORS = "METRIC_AGGREGATORS";
-  static final String STATE_MODEL_NAME = OnlineOfflineSMD.name;
-  static final String INSTANCE_NAME_DELIMITER = "_";
-
-  final String zkConnectUrl;
-  final String instanceHostname;
-  final InstanceConfig instanceConfig;
-  final AggregationTaskRunner aggregationTaskRunner;
-
-  // Cache list of known live instances
-  final List<String> liveInstanceNames = new ArrayList<>();
-
-  // Helix Admin
-  HelixAdmin admin;
-  // Helix Manager
-  HelixManager manager;
-
-  private volatile boolean isInitialized = false;
-
-  public TimelineMetricHAController(TimelineMetricConfiguration configuration) 
{
-    String instancePort;
-    try {
-      instanceHostname = configuration.getInstanceHostnameFromEnv();
-      instancePort = configuration.getInstancePort();
-
-    } catch (Exception e) {
-      LOG.error("Error reading configs from classpath, will resort to 
defaults.", e);
-      throw new MetricsSystemInitializationException(e.getMessage());
-    }
-
-    try {
-      String zkClientPort = configuration.getZKClientPort();
-      String zkQuorum = configuration.getZKQuorum();
-
-      if (StringUtils.isEmpty(zkClientPort) || StringUtils.isEmpty(zkQuorum)) {
-        throw new Exception("Unable to parse zookeeper quorum. clientPort = "
-          + zkClientPort +", quorum = " + zkQuorum);
-      }
-
-      zkConnectUrl = getZkConnectionUrl(zkClientPort, zkQuorum);
-
-    } catch (Exception e) {
-      LOG.error("Unable to load hbase-site from classpath.", e);
-      throw new MetricsSystemInitializationException(e.getMessage());
-    }
-
-    instanceConfig = new InstanceConfig(instanceHostname + 
INSTANCE_NAME_DELIMITER + instancePort);
-    instanceConfig.setHostName(instanceHostname);
-    instanceConfig.setPort(instancePort);
-    instanceConfig.setInstanceEnabled(true);
-    aggregationTaskRunner = new 
AggregationTaskRunner(instanceConfig.getInstanceName(), zkConnectUrl);
-  }
-
-  /**
-   * Initialize the instance with zookeeper via Helix
-   */
-  public void initializeHAController() throws Exception {
-    admin = new ZKHelixAdmin(zkConnectUrl);
-    // create cluster
-    LOG.info("Creating zookeeper cluster node: " + CLUSTER_NAME);
-    admin.addCluster(CLUSTER_NAME, false);
-
-    // Adding host to the cluster
-    List<String> nodes = admin.getInstancesInCluster(CLUSTER_NAME);
-    if (nodes == null || !nodes.contains(instanceConfig.getInstanceName())) {
-      LOG.info("Adding participant instance " + instanceConfig);
-      admin.addInstance(CLUSTER_NAME, instanceConfig);
-    }
-
-    // Add a state model
-    if (admin.getStateModelDef(CLUSTER_NAME, STATE_MODEL_NAME) == null) {
-      LOG.info("Adding ONLINE-OFFLINE state model to the cluster");
-      admin.addStateModelDef(CLUSTER_NAME, STATE_MODEL_NAME, 
OnlineOfflineSMD.build());
-    }
-
-    // Add resources with 1 cluster-wide replica
-    // Since our aggregators are unbalanced in terms of work distribution we
-    // only need to distribute writes to METRIC_AGGREGATE and
-    // METRIC_RECORD_MINUTE
-    List<String> resources = admin.getResourcesInCluster(CLUSTER_NAME);
-    if (!resources.contains(METRIC_AGGREGATORS)) {
-      LOG.info("Adding resource " + METRIC_AGGREGATORS + " with 2 partitions 
and 1 replicas");
-      admin.addResource(CLUSTER_NAME, METRIC_AGGREGATORS, 2, 
OnlineOfflineSMD.name, FULL_AUTO.toString());
-    }
-    // this will set up the ideal state, it calculates the preference list for
-    // each partition similar to consistent hashing
-    admin.rebalance(CLUSTER_NAME, METRIC_AGGREGATORS, 1);
-
-    // Start participant
-    startAggregators();
-
-    // Start controller
-    startController();
-
-    Runtime.getRuntime().addShutdownHook(new Thread() {
-      @Override
-      public void run() {
-        aggregationTaskRunner.stop();
-        manager.disconnect();
-      }
-    });
-
-    isInitialized = true;
-  }
-
-  /**
-   * Return true if HA controller is enabled.
-   */
-  public boolean isInitialized() {
-    return isInitialized;
-  }
-
-  private void startAggregators() {
-    try {
-      aggregationTaskRunner.initialize();
-
-    } catch (Exception e) {
-      LOG.error("Unable to start aggregators.", e);
-      throw new MetricsSystemInitializationException(e.getMessage());
-    }
-  }
-
-  private void startController() throws Exception {
-    manager = HelixManagerFactory.getZKHelixManager(
-      CLUSTER_NAME,
-      instanceHostname,
-      InstanceType.CONTROLLER,
-      zkConnectUrl
-    );
-
-    manager.connect();
-    HelixController controller = new HelixController();
-    manager.addLiveInstanceChangeListener(controller);
-  }
-
-  private String getZkConnectionUrl(String zkClientPort, String zkQuorum) {
-    StringBuilder sb = new StringBuilder();
-    String[] quorumParts = zkQuorum.split(",");
-    String prefix = "";
-    for (String part : quorumParts) {
-      sb.append(prefix);
-      sb.append(part.trim());
-      if (!part.contains(":")) {
-        sb.append(":");
-        sb.append(zkClientPort);
-      }
-      prefix = ",";
-    }
-
-    return sb.toString();
-  }
-
-  public AggregationTaskRunner getAggregationTaskRunner() {
-    return aggregationTaskRunner;
-  }
-
-  public List<String> getLiveInstanceHostNames() {
-    List<String> liveInstanceHostNames = new ArrayList<>();
-
-    for (String instance : liveInstanceNames) {
-      liveInstanceHostNames.add(instance.split(INSTANCE_NAME_DELIMITER)[0]);
-    }
-
-    return liveInstanceHostNames;
-  }
-
-  public class HelixController extends GenericHelixController {
-    ScheduledExecutorService executorService = 
Executors.newSingleThreadScheduledExecutor();
-    Joiner joiner = Joiner.on(", ").skipNulls();
-
-    @Override
-    public void onLiveInstanceChange(List<LiveInstance> liveInstances, 
NotificationContext changeContext) {
-      super.onLiveInstanceChange(liveInstances, changeContext);
-
-      liveInstanceNames.clear();
-      for (LiveInstance instance : liveInstances) {
-        liveInstanceNames.add(instance.getInstanceName());
-      }
-
-      LOG.info("Detected change in liveliness of Collector instances. " +
-        "LiveIsntances = " + joiner.join(liveInstanceNames));
-      // Print HA state - after some delay
-      executorService.schedule(new Runnable() {
-        @Override
-        public void run() {
-          printClusterState();
-        }
-      }, 30, TimeUnit.SECONDS);
-
-
-    }
-  }
-
-  public void printClusterState() {
-    StringBuilder sb = new StringBuilder("\n######################### Cluster 
HA state ########################");
-
-    ExternalView resourceExternalView = 
admin.getResourceExternalView(CLUSTER_NAME, METRIC_AGGREGATORS);
-    if (resourceExternalView != null) {
-      getPrintableResourceState(resourceExternalView, METRIC_AGGREGATORS, sb);
-    }
-    sb.append("\n##################################################");
-    LOG.info(sb.toString());
-  }
-
-  private void getPrintableResourceState(ExternalView resourceExternalView,
-                                         String resourceName,
-                                         StringBuilder sb) {
-    TreeSet<String> sortedSet = new 
TreeSet<>(resourceExternalView.getPartitionSet());
-    sb.append("\nCLUSTER: ");
-    sb.append(CLUSTER_NAME);
-    sb.append("\nRESOURCE: ");
-    sb.append(resourceName);
-    for (String partitionName : sortedSet) {
-      sb.append("\nPARTITION: ");
-      sb.append(partitionName).append("\t");
-      Map<String, String> states = 
resourceExternalView.getStateMap(partitionName);
-      for (Map.Entry<String, String> stateEntry : states.entrySet()) {
-        sb.append("\t");
-        sb.append(stateEntry.getKey());
-        sb.append("\t");
-        sb.append(stateEntry.getValue());
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/ambari/blob/954e61ee/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java
index 4f54284..6b15e29 100644
--- 
a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java
@@ -411,6 +411,14 @@ public class TimelineWebServices {
     }
   }
 
+  /**
+   * This is a discovery endpoint that advertises known live collector
+   * instances. Note: It will always answer with current instance as live.
+   * This can be utilized as a liveliness pinger endpoint since the instance
+   * names are cached and thereby no synchronous calls result from this API
+   *
+   * @return List<String> hostnames</String>
+   */
   @GET
   @Path("/metrics/livenodes")
   @Produces({ MediaType.APPLICATION_JSON })

http://git-wip-us.apache.org/repos/asf/ambari/blob/954e61ee/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/MetricCollectorHAControllerTest.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/MetricCollectorHAControllerTest.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/MetricCollectorHAControllerTest.java
new file mode 100644
index 0000000..91ec305
--- /dev/null
+++ 
b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/MetricCollectorHAControllerTest.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability;
+
+import junit.framework.Assert;
+import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.AbstractMiniHBaseClusterTest;
+import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.InstanceConfig;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.MetricCollectorHAController.CLUSTER_NAME;
+import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.MetricCollectorHAController.METRIC_AGGREGATORS;
+import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.MetricCollectorHAController.STATE_MODEL_NAME;
+import static org.easymock.EasyMock.createNiceMock;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.replay;
+
+public class MetricCollectorHAControllerTest extends 
AbstractMiniHBaseClusterTest {
+  TimelineMetricConfiguration configuration;
+
+  @Before
+  public void setup() throws Exception {
+    configuration = createNiceMock(TimelineMetricConfiguration.class);
+
+    expect(configuration.getInstanceHostnameFromEnv()).andReturn("h1");
+    expect(configuration.getInstancePort()).andReturn("12000");
+    // jdbc:phoenix:localhost:52887:/hbase;test=true
+    String zkUrl = getUrl();
+    String port = zkUrl.split(":")[3];
+    String quorum = zkUrl.split(":")[2];
+
+    expect(configuration.getZKClientPort()).andReturn(port);
+    expect(configuration.getZKQuorum()).andReturn(quorum);
+
+    replay(configuration);
+  }
+
+  @Test(timeout = 180000)
+  public void testHAControllerDistributedAggregation() throws Exception {
+    MetricCollectorHAController haController = new 
MetricCollectorHAController(configuration);
+    haController.initializeHAController();
+    // Wait for task assignment
+    Thread.sleep(10000);
+
+    Assert.assertTrue(haController.isInitialized());
+    Assert.assertEquals(1, haController.getLiveInstanceHostNames().size());
+    
Assert.assertTrue(haController.getAggregationTaskRunner().performsClusterAggregation());
+    
Assert.assertTrue(haController.getAggregationTaskRunner().performsHostAggregation());
+
+    // Add new instance
+    InstanceConfig instanceConfig2 = new InstanceConfig("h2_12001");
+    haController.admin.addInstance(CLUSTER_NAME, instanceConfig2);
+    HelixManager manager2 = HelixManagerFactory.getZKHelixManager(CLUSTER_NAME,
+      instanceConfig2.getInstanceName(),
+      InstanceType.PARTICIPANT, haController.zkConnectUrl);
+    
manager2.getStateMachineEngine().registerStateModelFactory(STATE_MODEL_NAME,
+      new OnlineOfflineStateModelFactory(instanceConfig2.getInstanceName(),
+        new AggregationTaskRunner(instanceConfig2.getInstanceName(), "")));
+    manager2.connect();
+    haController.admin.rebalance(CLUSTER_NAME, METRIC_AGGREGATORS, 1);
+
+    // Wait on re-assignment of partitions
+    Thread.sleep(10000);
+    Assert.assertEquals(2, haController.getLiveInstanceHostNames().size());
+
+    ExternalView view = 
haController.admin.getResourceExternalView(CLUSTER_NAME, METRIC_AGGREGATORS);
+
+    Map<String, String> partitionInstanceMap = new HashMap<>();
+
+    for (String partition : view.getPartitionSet()) {
+      Map<String, String> states = view.getStateMap(partition);
+      // (instance, state) pairs
+      for (Map.Entry<String, String> stateEntry : states.entrySet()) {
+        partitionInstanceMap.put(partition, stateEntry.getKey());
+        Assert.assertEquals("ONLINE", stateEntry.getValue());
+      }
+    }
+    // Re-assigned partitions
+    Assert.assertEquals(2, partitionInstanceMap.size());
+
+    haController.getAggregationTaskRunner().stop();
+    haController.manager.disconnect();
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/954e61ee/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/TimelineMetricHAControllerTest.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/TimelineMetricHAControllerTest.java
 
b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/TimelineMetricHAControllerTest.java
deleted file mode 100644
index 04e8909..0000000
--- 
a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/TimelineMetricHAControllerTest.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability;
-
-import junit.framework.Assert;
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.AbstractMiniHBaseClusterTest;
-import 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration;
-import org.apache.helix.HelixManager;
-import org.apache.helix.HelixManagerFactory;
-import org.apache.helix.InstanceType;
-import org.apache.helix.model.ExternalView;
-import org.apache.helix.model.InstanceConfig;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.TimelineMetricHAController.CLUSTER_NAME;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.TimelineMetricHAController.METRIC_AGGREGATORS;
-import static 
org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.TimelineMetricHAController.STATE_MODEL_NAME;
-import static org.easymock.EasyMock.createNiceMock;
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.replay;
-
-public class TimelineMetricHAControllerTest extends 
AbstractMiniHBaseClusterTest {
-  TimelineMetricConfiguration configuration;
-
-  @Before
-  public void setup() throws Exception {
-    configuration = createNiceMock(TimelineMetricConfiguration.class);
-
-    expect(configuration.getInstanceHostnameFromEnv()).andReturn("h1");
-    expect(configuration.getInstancePort()).andReturn("12000");
-    // jdbc:phoenix:localhost:52887:/hbase;test=true
-    String zkUrl = getUrl();
-    String port = zkUrl.split(":")[3];
-    String quorum = zkUrl.split(":")[2];
-
-    expect(configuration.getZKClientPort()).andReturn(port);
-    expect(configuration.getZKQuorum()).andReturn(quorum);
-
-    replay(configuration);
-  }
-
-  @Test(timeout = 150000)
-  public void testHAControllerDistributedAggregation() throws Exception {
-    TimelineMetricHAController haController = new 
TimelineMetricHAController(configuration);
-    haController.initializeHAController();
-    // Wait for task assignment
-    Thread.sleep(10000);
-
-    Assert.assertTrue(haController.isInitialized());
-    Assert.assertEquals(1, haController.getLiveInstanceHostNames().size());
-    
Assert.assertTrue(haController.getAggregationTaskRunner().performsClusterAggregation());
-    
Assert.assertTrue(haController.getAggregationTaskRunner().performsHostAggregation());
-
-    // Add new instance
-    InstanceConfig instanceConfig2 = new InstanceConfig("h2_12001");
-    haController.admin.addInstance(CLUSTER_NAME, instanceConfig2);
-    HelixManager manager2 = HelixManagerFactory.getZKHelixManager(CLUSTER_NAME,
-      instanceConfig2.getInstanceName(),
-      InstanceType.PARTICIPANT, haController.zkConnectUrl);
-    
manager2.getStateMachineEngine().registerStateModelFactory(STATE_MODEL_NAME,
-      new OnlineOfflineStateModelFactory(instanceConfig2.getInstanceName(),
-        new AggregationTaskRunner(instanceConfig2.getInstanceName(), "")));
-    manager2.connect();
-    haController.admin.rebalance(CLUSTER_NAME, METRIC_AGGREGATORS, 1);
-
-    // Wait on re-assignment of partitions
-    Thread.sleep(10000);
-    Assert.assertEquals(2, haController.getLiveInstanceHostNames().size());
-
-    ExternalView view = 
haController.admin.getResourceExternalView(CLUSTER_NAME, METRIC_AGGREGATORS);
-
-    Map<String, String> partitionInstanceMap = new HashMap<>();
-
-    for (String partition : view.getPartitionSet()) {
-      Map<String, String> states = view.getStateMap(partition);
-      // (instance, state) pairs
-      for (Map.Entry<String, String> stateEntry : states.entrySet()) {
-        partitionInstanceMap.put(partition, stateEntry.getKey());
-        Assert.assertEquals("ONLINE", stateEntry.getValue());
-      }
-    }
-    // Re-assigned partitions
-    Assert.assertEquals(2, partitionInstanceMap.size());
-
-    haController.getAggregationTaskRunner().stop();
-    haController.manager.disconnect();
-  }
-}

http://git-wip-us.apache.org/repos/asf/ambari/blob/954e61ee/ambari-server/src/main/resources/common-services/FLUME/1.4.0.2.0/package/scripts/params.py
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/resources/common-services/FLUME/1.4.0.2.0/package/scripts/params.py
 
b/ambari-server/src/main/resources/common-services/FLUME/1.4.0.2.0/package/scripts/params.py
index be50524..3203a52 100644
--- 
a/ambari-server/src/main/resources/common-services/FLUME/1.4.0.2.0/package/scripts/params.py
+++ 
b/ambari-server/src/main/resources/common-services/FLUME/1.4.0.2.0/package/scripts/params.py
@@ -86,6 +86,7 @@ if config.has_key('hostname'):
 
 ams_collector_hosts = default("/clusterHostInfo/metrics_collector_hosts", [])
 has_metric_collector = not len(ams_collector_hosts) == 0
+metric_collector_port = None
 if has_metric_collector:
   if 'cluster-env' in config['configurations'] and \
       'metrics_collector_vip_host' in config['configurations']['cluster-env']:
@@ -111,3 +112,22 @@ if has_metric_collector:
   pass
 metrics_report_interval = 
default("/configurations/ams-site/timeline.metrics.sink.report.interval", 60)
 metrics_collection_period = 
default("/configurations/ams-site/timeline.metrics.sink.collection.period", 10)
+
+#Collector hosts
+metric_collector_hosts = None
+if ams_collector_hosts:
+  for host in ams_collector_hosts:
+    metric_collector_hosts += host + ':' + metric_collector_port + ','
+  metric_collector_hosts = metric_collector_hosts[:-1]
+
+# Cluster Zookeeper quorum
+zookeeper_quorum = None
+if not len(default("/clusterHostInfo/zookeeper_hosts", [])) == 0:
+  if 'zoo.cfg' in config['configurations'] and 'clientPort' in 
config['configurations']['zoo.cfg']:
+    zookeeper_clientPort = config['configurations']['zoo.cfg']['clientPort']
+  else:
+    zookeeper_clientPort = '2181'
+  zookeeper_quorum = (':' + zookeeper_clientPort + 
',').join(config['clusterHostInfo']['zookeeper_hosts'])
+  # last port config
+  zookeeper_quorum += ':' + zookeeper_clientPort
+

http://git-wip-us.apache.org/repos/asf/ambari/blob/954e61ee/ambari-server/src/main/resources/common-services/FLUME/1.4.0.2.0/package/templates/flume-metrics2.properties.j2
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/resources/common-services/FLUME/1.4.0.2.0/package/templates/flume-metrics2.properties.j2
 
b/ambari-server/src/main/resources/common-services/FLUME/1.4.0.2.0/package/templates/flume-metrics2.properties.j2
index df68242..b960296 100644
--- 
a/ambari-server/src/main/resources/common-services/FLUME/1.4.0.2.0/package/templates/flume-metrics2.properties.j2
+++ 
b/ambari-server/src/main/resources/common-services/FLUME/1.4.0.2.0/package/templates/flume-metrics2.properties.j2
@@ -16,7 +16,10 @@
 # limitations under the License.
 #}
 
-collector={{metric_collector_protocol}}://{{metric_collector_host}}:{{metric_collector_port}}
+collector={{metric_collector_hosts}}
+protocol={{metric_collector_protocol}}
+zookeeper.quorum={{zookeeper_quorum}}
+port={{metric_collector_port}}
 collectionFrequency={{metrics_collection_period}}000
 maxRowCacheSize=10000
 sendInterval={{metrics_report_interval}}000

http://git-wip-us.apache.org/repos/asf/ambari/blob/954e61ee/ambari-server/src/main/resources/common-services/HBASE/0.96.0.2.0/package/templates/hadoop-metrics2-hbase.properties-GANGLIA-MASTER.j2
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/resources/common-services/HBASE/0.96.0.2.0/package/templates/hadoop-metrics2-hbase.properties-GANGLIA-MASTER.j2
 
b/ambari-server/src/main/resources/common-services/HBASE/0.96.0.2.0/package/templates/hadoop-metrics2-hbase.properties-GANGLIA-MASTER.j2
index 7763bdd..e62ce9e 100644
--- 
a/ambari-server/src/main/resources/common-services/HBASE/0.96.0.2.0/package/templates/hadoop-metrics2-hbase.properties-GANGLIA-MASTER.j2
+++ 
b/ambari-server/src/main/resources/common-services/HBASE/0.96.0.2.0/package/templates/hadoop-metrics2-hbase.properties-GANGLIA-MASTER.j2
@@ -51,6 +51,7 @@ hbase.extendedperiod = 3600
 
 
*.timeline.plugin.urls=file:///usr/lib/ambari-metrics-hadoop-sink/ambari-metrics-hadoop-sink.jar
 *.sink.timeline.slave.host.name={{hostname}}
+
 hbase.class=org.apache.hadoop.metrics2.sink.timeline.HadoopTimelineMetricsSink
 hbase.period={{metrics_collection_period}}
 hbase.collector={{metric_collector_host}}:{{metric_collector_port}}
@@ -66,7 +67,8 @@ 
rpc.collector={{metric_collector_host}}:{{metric_collector_port}}
 
hbase.sink.timeline.class=org.apache.hadoop.metrics2.sink.timeline.HadoopTimelineMetricsSink
 hbase.sink.timeline.period={{metrics_collection_period}}
 hbase.sink.timeline.sendInterval={{metrics_report_interval}}000
-hbase.sink.timeline.collector={{metric_collector_protocol}}://{{metric_collector_host}}:{{metric_collector_port}}
+hbase.sink.timeline.collector={{metric_collector_hosts}}
+hbase.sink.timeline.protocol={{metric_collector_protocol}}
 
 # HTTPS properties
 hbase.sink.timeline.truststore.path = {{metric_truststore_path}}

http://git-wip-us.apache.org/repos/asf/ambari/blob/954e61ee/ambari-server/src/main/resources/common-services/STORM/0.9.1/package/scripts/params_linux.py
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/resources/common-services/STORM/0.9.1/package/scripts/params_linux.py
 
b/ambari-server/src/main/resources/common-services/STORM/0.9.1/package/scripts/params_linux.py
index 1ee4422..17f9e2c 100644
--- 
a/ambari-server/src/main/resources/common-services/STORM/0.9.1/package/scripts/params_linux.py
+++ 
b/ambari-server/src/main/resources/common-services/STORM/0.9.1/package/scripts/params_linux.py
@@ -170,6 +170,7 @@ if stack_supports_storm_kerberos:
 
 ams_collector_hosts = default("/clusterHostInfo/metrics_collector_hosts", [])
 has_metric_collector = not len(ams_collector_hosts) == 0
+metric_collector_port = None
 if has_metric_collector:
   if 'cluster-env' in config['configurations'] and \
       'metrics_collector_vip_host' in config['configurations']['cluster-env']:
@@ -201,6 +202,20 @@ metrics_collection_period = 
default("/configurations/ams-site/timeline.metrics.s
 metric_collector_sink_jar = 
"/usr/lib/storm/lib/ambari-metrics-storm-sink-with-common-*.jar"
 metric_collector_legacy_sink_jar = 
"/usr/lib/storm/lib/ambari-metrics-storm-sink-legacy-with-common-*.jar"
 
+# Collector hosts
+metric_collector_hosts = None
+if ams_collector_hosts:
+  for host in ams_collector_hosts:
+    metric_collector_hosts += host + ':' + metric_collector_port + ','
+  metric_collector_hosts = metric_collector_hosts[:-1]
+
+# Cluster Zookeeper quorum
+zookeeper_quorum = None
+if storm_zookeeper_servers:
+  for server in storm_zookeeper_servers:
+    zookeeper_quorum += server + ':' + storm_zookeeper_port + ","
+  zookeeper_quorum = zookeeper_quorum[:-1]
+
 jar_jvm_opts = ''
 
 ########################################################

http://git-wip-us.apache.org/repos/asf/ambari/blob/954e61ee/ambari-server/src/main/resources/common-services/STORM/0.9.1/package/templates/storm-metrics2.properties.j2
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/resources/common-services/STORM/0.9.1/package/templates/storm-metrics2.properties.j2
 
b/ambari-server/src/main/resources/common-services/STORM/0.9.1/package/templates/storm-metrics2.properties.j2
index ebeb887..1f0875f 100644
--- 
a/ambari-server/src/main/resources/common-services/STORM/0.9.1/package/templates/storm-metrics2.properties.j2
+++ 
b/ambari-server/src/main/resources/common-services/STORM/0.9.1/package/templates/storm-metrics2.properties.j2
@@ -16,7 +16,10 @@
 # limitations under the License.
 #}
 
-collector={{metric_collector_protocol}}://{{metric_collector_host}}:{{metric_collector_port}}
+collector={{metric_collector_hosts}}
+protocol={{metric_collector_protocol}}
+port={{metric_collector_port}}
+zookeeper.quorum={{zookeeper_quorum}}
 maxRowCacheSize=10000
 sendInterval={{metrics_report_interval}}000
 clusterReporterAppId=nimbus

http://git-wip-us.apache.org/repos/asf/ambari/blob/954e61ee/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/before-START/scripts/params.py
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/before-START/scripts/params.py
 
b/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/before-START/scripts/params.py
index 568d418..cf61539 100644
--- 
a/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/before-START/scripts/params.py
+++ 
b/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/before-START/scripts/params.py
@@ -109,8 +109,11 @@ is_rmnode_master = hostname in rm_host
 is_hsnode_master = hostname in hs_host
 is_hbase_master = hostname in hbase_master_hosts
 is_slave = hostname in slave_hosts
+
 if has_ganglia_server:
   ganglia_server_host = ganglia_server_hosts[0]
+
+metric_collector_port = None
 if has_metric_collector:
   if 'cluster-env' in config['configurations'] and \
       'metrics_collector_vip_host' in config['configurations']['cluster-env']:
@@ -138,6 +141,24 @@ if has_metric_collector:
 metrics_report_interval = 
default("/configurations/ams-site/timeline.metrics.sink.report.interval", 60)
 metrics_collection_period = 
default("/configurations/ams-site/timeline.metrics.sink.collection.period", 10)
 
+#Collector hosts
+metric_collector_hosts = None
+if ams_collector_hosts:
+  for host in ams_collector_hosts:
+    metric_collector_hosts += host + ':' + metric_collector_port + ','
+  metric_collector_hosts = metric_collector_hosts[:-1]
+
+# Cluster Zookeeper quorum
+zookeeper_quorum = None
+if has_zk_host:
+  if 'zoo.cfg' in config['configurations'] and 'clientPort' in 
config['configurations']['zoo.cfg']:
+    zookeeper_clientPort = config['configurations']['zoo.cfg']['clientPort']
+  else:
+    zookeeper_clientPort = '2181'
+  zookeeper_quorum = (':' + zookeeper_clientPort + 
',').join(config['clusterHostInfo']['zookeeper_hosts'])
+  # last port config
+  zookeeper_quorum += ':' + zookeeper_clientPort
+
 #hadoop params
 
 if has_namenode or dfs_type == 'HCFS':

http://git-wip-us.apache.org/repos/asf/ambari/blob/954e61ee/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/before-START/templates/hadoop-metrics2.properties.j2
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/before-START/templates/hadoop-metrics2.properties.j2
 
b/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/before-START/templates/hadoop-metrics2.properties.j2
index ad5c743..075d2c9 100644
--- 
a/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/before-START/templates/hadoop-metrics2.properties.j2
+++ 
b/ambari-server/src/main/resources/stacks/HDP/2.0.6/hooks/before-START/templates/hadoop-metrics2.properties.j2
@@ -72,19 +72,22 @@ resourcemanager.sink.ganglia.tagsForPrefix.yarn=Queue
 *.sink.timeline.period={{metrics_collection_period}}
 *.sink.timeline.sendInterval={{metrics_report_interval}}000
 *.sink.timeline.slave.host.name = {{hostname}}
+*.sink.timeline.zookeeper.quorum={{zookeeper_quorum}}
+*.sink.timeline.protocol={{metric_collector_protocol}}
+*.sink.timeline.port={{metric_collector_port}}
 
 # HTTPS properties
 *.sink.timeline.truststore.path = {{metric_truststore_path}}
 *.sink.timeline.truststore.type = {{metric_truststore_type}}
 *.sink.timeline.truststore.password = {{metric_truststore_password}}
 
-datanode.sink.timeline.collector={{metric_collector_protocol}}://{{metric_collector_host}}:{{metric_collector_port}}
-namenode.sink.timeline.collector={{metric_collector_protocol}}://{{metric_collector_host}}:{{metric_collector_port}}
-resourcemanager.sink.timeline.collector={{metric_collector_protocol}}://{{metric_collector_host}}:{{metric_collector_port}}
-nodemanager.sink.timeline.collector={{metric_collector_protocol}}://{{metric_collector_host}}:{{metric_collector_port}}
-jobhistoryserver.sink.timeline.collector={{metric_collector_protocol}}://{{metric_collector_host}}:{{metric_collector_port}}
-journalnode.sink.timeline.collector={{metric_collector_protocol}}://{{metric_collector_host}}:{{metric_collector_port}}
-applicationhistoryserver.sink.timeline.collector={{metric_collector_protocol}}://{{metric_collector_host}}:{{metric_collector_port}}
+datanode.sink.timeline.collector={{metrics_collector_hosts}}
+namenode.sink.timeline.collector={{metrics_collector_hosts}}
+resourcemanager.sink.timeline.collector={{metrics_collector_hosts}}
+nodemanager.sink.timeline.collector={{metrics_collector_hosts}}
+jobhistoryserver.sink.timeline.collector={{metrics_collector_hosts}}
+journalnode.sink.timeline.collector={{metrics_collector_hosts}}
+applicationhistoryserver.sink.timeline.collector={{metrics_collector_hosts}}
 
 resourcemanager.sink.timeline.tagsForPrefix.yarn=Queue
 

Reply via email to