AMBARI-16828. Support round-robin scheduling with failover for Sinks with 
distributed collector. (swagle)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/954e61ee
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/954e61ee
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/954e61ee

Branch: refs/heads/branch-2.5
Commit: 954e61ee07416bc20b52bfda84dde5ea57876130
Parents: 798a70e
Author: Aravindan Vijayan <[email protected]>
Authored: Mon Nov 14 20:34:09 2016 -0800
Committer: Aravindan Vijayan <[email protected]>
Committed: Tue Nov 15 11:02:12 2016 -0800

----------------------------------------------------------------------
 .../logsearch/solr/metrics/SolrAmsClient.java   |  23 +-
 ambari-metrics/ambari-metrics-common/pom.xml    |  22 ++
 .../timeline/AbstractTimelineMetricsSink.java   | 272 +++++++++++++++++-
 .../MetricsSinkInitializationException.java     |  25 ++
 .../availability/MetricCollectorHAHelper.java   |  96 +++++++
 .../MetricCollectorUnavailableException.java    |  24 ++
 ...icSinkWriteShardHostnameHashingStrategy.java |  59 ++++
 .../MetricSinkWriteShardStrategy.java           |  24 ++
 .../availability/MetricCollectorHATest.java     | 149 ++++++++++
 .../availability/ShardingStrategyTest.java      |  52 ++++
 .../cache/HandleConnectExceptionTest.java       |  38 ++-
 .../sink/flume/FlumeTimelineMetricsSink.java    |  39 ++-
 .../timeline/HadoopTimelineMetricsSink.java     |  50 +++-
 .../timeline/HadoopTimelineMetricsSinkTest.java |  55 +++-
 .../kafka/KafkaTimelineMetricsReporter.java     |  50 +++-
 .../kafka/KafkaTimelineMetricsReporterTest.java |   1 +
 .../storm/StormTimelineMetricsReporter.java     |  44 ++-
 .../sink/storm/StormTimelineMetricsSink.java    |  38 ++-
 .../storm/StormTimelineMetricsReporter.java     |  59 +++-
 .../sink/storm/StormTimelineMetricsSink.java    |  38 ++-
 .../storm/StormTimelineMetricsSinkTest.java     |  29 +-
 .../timeline/HBaseTimelineMetricStore.java      |  22 +-
 .../aggregators/AbstractTimelineAggregator.java |   4 +-
 .../TimelineMetricAggregatorFactory.java        |  16 +-
 .../TimelineMetricClusterAggregator.java        |   4 +-
 .../TimelineMetricClusterAggregatorSecond.java  |   4 +-
 .../TimelineMetricHostAggregator.java           |   4 +-
 .../v2/TimelineMetricClusterAggregator.java     |   4 +-
 .../v2/TimelineMetricHostAggregator.java        |   4 +-
 .../availability/AggregationTaskRunner.java     |   6 +-
 .../MetricCollectorHAController.java            | 276 +++++++++++++++++++
 .../TimelineMetricHAController.java             | 276 -------------------
 .../webapp/TimelineWebServices.java             |   8 +
 .../MetricCollectorHAControllerTest.java        | 107 +++++++
 .../TimelineMetricHAControllerTest.java         | 107 -------
 .../FLUME/1.4.0.2.0/package/scripts/params.py   |  20 ++
 .../templates/flume-metrics2.properties.j2      |   5 +-
 ...-metrics2-hbase.properties-GANGLIA-MASTER.j2 |   4 +-
 .../STORM/0.9.1/package/scripts/params_linux.py |  15 +
 .../templates/storm-metrics2.properties.j2      |   5 +-
 .../2.0.6/hooks/before-START/scripts/params.py  |  21 ++
 .../templates/hadoop-metrics2.properties.j2     |  17 +-
 42 files changed, 1597 insertions(+), 519 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/954e61ee/ambari-logsearch/ambari-logsearch-portal/src/main/java/org/apache/ambari/logsearch/solr/metrics/SolrAmsClient.java
----------------------------------------------------------------------
diff --git 
a/ambari-logsearch/ambari-logsearch-portal/src/main/java/org/apache/ambari/logsearch/solr/metrics/SolrAmsClient.java
 
b/ambari-logsearch/ambari-logsearch-portal/src/main/java/org/apache/ambari/logsearch/solr/metrics/SolrAmsClient.java
index cdeb63d..85ea69d 100644
--- 
a/ambari-logsearch/ambari-logsearch-portal/src/main/java/org/apache/ambari/logsearch/solr/metrics/SolrAmsClient.java
+++ 
b/ambari-logsearch/ambari-logsearch-portal/src/main/java/org/apache/ambari/logsearch/solr/metrics/SolrAmsClient.java
@@ -22,6 +22,7 @@ package org.apache.ambari.logsearch.solr.metrics;
 import org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
 
+// TODO: Refactor for failover
 public class SolrAmsClient extends AbstractTimelineMetricsSink {
   private final String collectorHost;
 
@@ -30,7 +31,7 @@ public class SolrAmsClient extends 
AbstractTimelineMetricsSink {
   }
 
   @Override
-  public String getCollectorUri() {
+  public String getCollectorUri(String host) {
     return collectorHost;
   }
 
@@ -40,7 +41,27 @@ public class SolrAmsClient extends 
AbstractTimelineMetricsSink {
   }
 
   @Override
+  protected String getZookeeperQuorum() {
+    return null;
+  }
+
+  @Override
+  protected String getConfiguredCollectors() {
+    return null;
+  }
+
+  @Override
+  protected String getHostname() {
+    return null;
+  }
+
+  @Override
   protected boolean emitMetrics(TimelineMetrics metrics) {
     return super.emitMetrics(metrics);
   }
+
+  @Override
+  protected String getCollectorProtocol() {
+    return null;
+  }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/954e61ee/ambari-metrics/ambari-metrics-common/pom.xml
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-common/pom.xml 
b/ambari-metrics/ambari-metrics-common/pom.xml
index fb01c81..b972fb2 100644
--- a/ambari-metrics/ambari-metrics-common/pom.xml
+++ b/ambari-metrics/ambari-metrics-common/pom.xml
@@ -62,6 +62,28 @@
       <artifactId>commons-logging</artifactId>
       <version>1.1.1</version>
     </dependency>
+    <!-- TODO: Need to add these as shaded dependencies -->
+    <dependency>
+      <groupId>commons-io</groupId>
+      <artifactId>commons-io</artifactId>
+      <version>2.4</version>
+    </dependency>
+    <dependency>
+      <groupId>com.google.code.gson</groupId>
+      <artifactId>gson</artifactId>
+      <version>2.2.2</version>
+    </dependency>
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+      <version>14.0.1</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.curator</groupId>
+      <artifactId>curator-framework</artifactId>
+      <version>2.7.1</version>
+    </dependency>
+    <!--  END TODO -->
     <dependency>
       <groupId>org.codehaus.jackson</groupId>
       <artifactId>jackson-xc</artifactId>

http://git-wip-us.apache.org/repos/asf/ambari/blob/954e61ee/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java
 
b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java
index 5a716df..ca7ccea 100644
--- 
a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java
+++ 
b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java
@@ -17,8 +17,18 @@
  */
 package org.apache.hadoop.metrics2.sink.timeline;
 
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
+import com.google.common.reflect.TypeToken;
+import com.google.gson.Gson;
+import com.google.gson.JsonSyntaxException;
+import org.apache.commons.io.IOUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import 
org.apache.hadoop.metrics2.sink.timeline.availability.MetricCollectorHAHelper;
+import 
org.apache.hadoop.metrics2.sink.timeline.availability.MetricCollectorUnavailableException;
+import 
org.apache.hadoop.metrics2.sink.timeline.availability.MetricSinkWriteShardHostnameHashingStrategy;
+import 
org.apache.hadoop.metrics2.sink.timeline.availability.MetricSinkWriteShardStrategy;
 import org.codehaus.jackson.map.AnnotationIntrospector;
 import org.codehaus.jackson.map.ObjectMapper;
 import org.codehaus.jackson.map.annotate.JsonSerialize;
@@ -35,9 +45,17 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.io.OutputStream;
+import java.io.StringWriter;
 import java.net.HttpURLConnection;
 import java.net.URL;
 import java.security.KeyStore;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 public abstract class AbstractTimelineMetricsSink {
@@ -46,20 +64,22 @@ public abstract class AbstractTimelineMetricsSink {
   public static final String METRICS_SEND_INTERVAL = "sendInterval";
   public static final String METRICS_POST_TIMEOUT_SECONDS = "timeout";
   public static final String COLLECTOR_PROPERTY = "collector";
+  public static final String COLLECTOR_PROTOCOL = "protocol";
+  public static final String COLLECTOR_PORT = "port";
+  public static final String ZOOKEEPER_QUORUM = "zookeeper.quorum";
   public static final int DEFAULT_POST_TIMEOUT_SECONDS = 10;
   public static final String SKIP_COUNTER_TRANSFROMATION = 
"skipCounterDerivative";
   public static final String RPC_METRIC_PREFIX = "metric.rpc";
-  public static final String RPC_METRIC_NAME_SUFFIX = "suffix";
-  public static final String RPC_METRIC_PORT_SUFFIX = "port";
-
   public static final String WS_V1_TIMELINE_METRICS = 
"/ws/v1/timeline/metrics";
-
   public static final String SSL_KEYSTORE_PATH_PROPERTY = "truststore.path";
   public static final String SSL_KEYSTORE_TYPE_PROPERTY = "truststore.type";
   public static final String SSL_KEYSTORE_PASSWORD_PROPERTY = 
"truststore.password";
+  public static final String COLLECTOR_LIVE_NODES_PATH = 
"/ws/v1/timeline/metrics/livenodes";
 
   protected static final AtomicInteger failedCollectorConnectionsCounter = new 
AtomicInteger(0);
   public static int NUMBER_OF_SKIPPED_COLLECTOR_EXCEPTIONS = 100;
+  public int ZK_CONNECT_TRY_TIME = 10000;
+  public int ZK_SLEEP_BETWEEN_RETRY_TIME = 2000;
 
   private SSLSocketFactory sslSocketFactory;
 
@@ -67,6 +87,28 @@ public abstract class AbstractTimelineMetricsSink {
 
   protected static ObjectMapper mapper;
 
+  protected MetricCollectorHAHelper collectorHAHelper;
+
+  protected MetricSinkWriteShardStrategy metricSinkWriteShardStrategy;
+
+  // Single element cache with fixed expiration - Helps adjacent Sinks as
+  // well as timed refresh
+  protected Supplier targetCollectorHostSupplier;
+
+  protected final List<String> allKnownLiveCollectors = new ArrayList<>();
+
+  private volatile boolean isInitializedForHA = false;
+
+  @SuppressWarnings("all")
+  private final int RETRY_COUNT_BEFORE_COLLECTOR_FAILOVER = 5;
+
+  private final Gson gson = new Gson();
+
+  private final Random rand = new Random();
+
+  private static final int COLLECTOR_HOST_CACHE_MAX_EXPIRATION_MINUTES = 75;
+  private static final int COLLECTOR_HOST_CACHE_MIN_EXPIRATION_MINUTES = 60;
+
   static {
     mapper = new ObjectMapper();
     AnnotationIntrospector introspector = new JaxbAnnotationIntrospector();
@@ -79,6 +121,16 @@ public abstract class AbstractTimelineMetricsSink {
     LOG = LogFactory.getLog(this.getClass());
   }
 
+  /**
+   * Initialize Sink write strategy with respect to HA Collector
+   */
+  protected void init() {
+    metricSinkWriteShardStrategy = new 
MetricSinkWriteShardHostnameHashingStrategy(getHostname());
+    collectorHAHelper = new MetricCollectorHAHelper(getZookeeperQuorum(),
+      ZK_CONNECT_TRY_TIME, ZK_SLEEP_BETWEEN_RETRY_TIME);
+    isInitializedForHA = true;
+  }
+
   protected boolean emitMetricsJson(String connectUrl, String jsonData) {
     int timeout = getTimeoutSeconds() * 1000;
     HttpURLConnection connection = null;
@@ -113,7 +165,7 @@ public abstract class AbstractTimelineMetricsSink {
         }
       }
       cleanupInputStream(connection.getInputStream());
-      //reset failedCollectorConnectionsCounter to "0"
+      // reset failedCollectorConnectionsCounter to "0"
       failedCollectorConnectionsCounter.set(0);
       return true;
     } catch (IOException ioe) {
@@ -146,7 +198,20 @@ public abstract class AbstractTimelineMetricsSink {
   }
 
   protected boolean emitMetrics(TimelineMetrics metrics) {
-    String connectUrl = getCollectorUri();
+    String collectorHost;
+    // Get cached target
+    if (targetCollectorHostSupplier != null) {
+      collectorHost = (String) targetCollectorHostSupplier.get();
+      // Last X attempts have failed - force refresh
+      if (failedCollectorConnectionsCounter.get() > 
RETRY_COUNT_BEFORE_COLLECTOR_FAILOVER) {
+        targetCollectorHostSupplier = null;
+        collectorHost = findPreferredCollectHost();
+      }
+    } else {
+      collectorHost = findPreferredCollectHost();
+    }
+
+    String connectUrl = getCollectorUri(collectorHost);
     String jsonData = null;
     try {
       jsonData = mapper.writeValueAsString(metrics);
@@ -196,8 +261,7 @@ public abstract class AbstractTimelineMetricsSink {
   protected HttpsURLConnection getSSLConnection(String spec)
     throws IOException, IllegalStateException {
 
-    HttpsURLConnection connection = (HttpsURLConnection) (new URL(spec)
-      .openConnection());
+    HttpsURLConnection connection = (HttpsURLConnection) (new 
URL(spec).openConnection());
 
     connection.setSSLSocketFactory(sslSocketFactory);
 
@@ -208,11 +272,7 @@ public abstract class AbstractTimelineMetricsSink {
                                 String trustStorePassword) {
     if (sslSocketFactory == null) {
       if (trustStorePath == null || trustStorePassword == null) {
-
-        String msg =
-          String.format("Can't load TrustStore. " +
-            "Truststore path or password is not set.");
-
+        String msg = "Can't load TrustStore. Truststore path or password is 
not set.";
         LOG.error(msg);
         throw new IllegalStateException(msg);
       }
@@ -242,7 +302,191 @@ public abstract class AbstractTimelineMetricsSink {
     }
   }
 
-  abstract protected String getCollectorUri();
+  /**
+   * Find appropriate write shard for this sink using the {@link 
org.apache.hadoop.metrics2.sink.timeline.availability.MetricSinkWriteShardStrategy}
+   *
+   * 1. Use configured collector(s) to discover available collectors
+   * 2. If configured collector(s) are unresponsive check Zookeeper to find 
live hosts
+   * 3. Refresh known collector list using ZK
+   * 4. Default: Return configured collector with no side effect due to 
discovery.
+   *
+   * throws {@link MetricsSinkInitializationException} if called before
+   * initialization, no other side effect
+   *
+   * @return String Collector hostname
+   */
+  protected synchronized String findPreferredCollectHost() {
+    if (!isInitializedForHA) {
+      init();
+    }
+
+    // Auto expire and re-calculate once the cached host expires (60 to 75 minutes)
+    if (targetCollectorHostSupplier != null) {
+      Object targetCollector = targetCollectorHostSupplier.get();
+      if (targetCollector != null) {
+        return (String) targetCollector;
+      }
+    }
+
+    String configuredCollectors = getConfiguredCollectors();
+    // Reach out to all configured collectors before Zookeeper
+    if (configuredCollectors != null && !configuredCollectors.isEmpty()) {
+      String collectorHosts = getConfiguredCollectors();
+      if (!collectorHosts.isEmpty()) {
+        String[] hosts = collectorHosts.split(",");
+        for (String hostPortStr : hosts) {
+          if (hostPortStr != null && !hostPortStr.isEmpty()) {
+            String[] hostPortPair = hostPortStr.split(":");
+            if (hostPortPair.length < 2) {
+              LOG.warn("Collector port is missing from the configuration.");
+              continue;
+            }
+            String hostStr = hostPortPair[0].trim();
+            String portStr = hostPortPair[1].trim();
+            // Check liveliness and get known instances
+            try {
+              Collection<String> liveHosts = 
findLiveCollectorHostsFromKnownCollector(hostStr, portStr);
+              // Update live Hosts - current host will already be a part of 
this
+              for (String host : liveHosts) {
+                allKnownLiveCollectors.add(host);
+              }
+            } catch (MetricCollectorUnavailableException e) {
+              allKnownLiveCollectors.remove(hostStr);
+              LOG.info("Collector " + hostStr + " is not longer live. Removing 
" +
+                "it from list of know live collector hosts : " + 
allKnownLiveCollectors);
+            }
+          }
+        }
+      }
+    }
+
+    // Lookup Zookeeper for live hosts - max 10 seconds wait time
+    if (allKnownLiveCollectors.size() == 0) {
+      
allKnownLiveCollectors.addAll(collectorHAHelper.findLiveCollectorHostsFromZNode());
+    }
+
+    if (allKnownLiveCollectors.size() != 0) {
+      targetCollectorHostSupplier = Suppliers.memoizeWithExpiration(
+        new Supplier() {
+          @Override
+          public Object get() {
+            return metricSinkWriteShardStrategy.findCollectorShard(new 
ArrayList<>(allKnownLiveCollectors));
+          }
+        },  // random.nextInt(max - min + 1) + min # (60 to 75 minutes)
+        rand.nextInt(COLLECTOR_HOST_CACHE_MAX_EXPIRATION_MINUTES
+          - COLLECTOR_HOST_CACHE_MIN_EXPIRATION_MINUTES + 1)
+          + COLLECTOR_HOST_CACHE_MIN_EXPIRATION_MINUTES,
+        TimeUnit.MINUTES
+      );
+
+      return (String) targetCollectorHostSupplier.get();
+    }
+    return null;
+  }
+
+  Collection<String> findLiveCollectorHostsFromKnownCollector(String host, 
String port) throws MetricCollectorUnavailableException {
+    List<String> collectors = new ArrayList<>();
+    HttpURLConnection connection = null;
+    StringBuilder sb = new StringBuilder(getCollectorProtocol());
+    sb.append("://");
+    sb.append(host);
+    sb.append(":");
+    sb.append(port);
+    sb.append(COLLECTOR_LIVE_NODES_PATH);
+    String connectUrl = sb.toString();
+    LOG.debug("Requesting live collector nodes : " + connectUrl);
+    try {
+      connection = getCollectorProtocol().startsWith("https") ?
+        getSSLConnection(connectUrl) : getConnection(connectUrl);
+
+      connection.setRequestMethod("GET");
+      // 5 seconds for this op is plenty of wait time
+      connection.setConnectTimeout(3000);
+      connection.setReadTimeout(2000);
+
+      int responseCode = connection.getResponseCode();
+      if (responseCode == 200) {
+        try (InputStream in = connection.getInputStream()) {
+          StringWriter writer = new StringWriter();
+          IOUtils.copy(in, writer);
+          try {
+            collectors = gson.fromJson(writer.toString(), new 
TypeToken<List<String>>(){}.getType());
+          } catch (JsonSyntaxException jse) {
+            // Swallow this at the behest of still trying to POST
+            LOG.debug("Exception deserializing the json data on live " +
+              "collector nodes.", jse);
+          }
+        }
+      }
+
+    } catch (IOException ioe) {
+      StringBuilder errorMessage =
+        new StringBuilder("Unable to connect to collector, " + connectUrl);
+      try {
+        if ((connection != null)) {
+          errorMessage.append(cleanupInputStream(connection.getErrorStream()));
+        }
+      } catch (IOException e) {
+        //NOP
+      }
+      LOG.debug(errorMessage);
+      LOG.debug(ioe);
+      String warnMsg = "Unable to connect to collector to find live nodes.";
+      LOG.warn(warnMsg, ioe);
+      throw new MetricCollectorUnavailableException(warnMsg);
+    }
+    return collectors;
+  }
+
+  // Constructing without UriBuilder to avoid unfavorable httpclient
+  // dependencies
+  protected String constructTimelineMetricUri(String protocol, String host, 
String port) {
+    StringBuilder sb = new StringBuilder(protocol);
+    sb.append("://");
+    sb.append(host);
+    sb.append(":");
+    sb.append(port);
+    sb.append(WS_V1_TIMELINE_METRICS);
+    return sb.toString();
+  }
+
+  protected String constructContainerMetricUri(String protocol, String host, 
String port) {
+    StringBuilder sb = new StringBuilder(protocol);
+    sb.append("://");
+    sb.append(host);
+    sb.append(":");
+    sb.append(port);
+    sb.append(WS_V1_TIMELINE_METRICS);
+    return sb.toString();
+  }
 
+  /**
+   * Get a pre-formatted URI for the collector
+   */
+  abstract protected String getCollectorUri(String host);
+
+  abstract protected String getCollectorProtocol();
+
+  /**
+   * How soon to timeout on the emit calls in seconds.
+   */
   abstract protected int getTimeoutSeconds();
+
+  /**
+   * Get the zookeeper quorum for the cluster used to find collector
+   * @return String "host1:port1,host2:port2"
+   */
+  abstract protected String getZookeeperQuorum();
+
+  /**
+   * Get pre-configured list of collectors available
+   * @return String "host1:port,host2:port"
+   */
+  abstract protected String getConfiguredCollectors();
+
+  /**
+   * Get hostname used for calculating write shard.
+   * @return String "host1"
+   */
+  abstract protected String getHostname();
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/954e61ee/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/MetricsSinkInitializationException.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/MetricsSinkInitializationException.java
 
b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/MetricsSinkInitializationException.java
new file mode 100644
index 0000000..5760b34
--- /dev/null
+++ 
b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/MetricsSinkInitializationException.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.sink.timeline;
+
+public class MetricsSinkInitializationException extends RuntimeException {
+  // Constructor taking the detail message
+  public MetricsSinkInitializationException(String message) {
+    super(message);
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/954e61ee/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/availability/MetricCollectorHAHelper.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/availability/MetricCollectorHAHelper.java
 
b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/availability/MetricCollectorHAHelper.java
new file mode 100644
index 0000000..7b13362
--- /dev/null
+++ 
b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/availability/MetricCollectorHAHelper.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.sink.timeline.availability;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.curator.CuratorZookeeperClient;
+import org.apache.curator.RetryLoop;
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.retry.RetryUntilElapsed;
+import org.apache.zookeeper.ZooKeeper;
+
+import java.net.HttpURLConnection;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.Callable;
+
+/**
+ * Find a live Collector instance from Zookeeper
+ * This class allows connect to ZK on-demand and
+ * does not add a watcher on the znode.
+ */
+public class MetricCollectorHAHelper {
+  private final String zookeeperQuorum;
+  private final int tryTime;
+  private final int sleepMsBetweenRetries;
+
+  private static final int CONNECTION_TIMEOUT = 2000;
+  private static final int SESSION_TIMEOUT = 5000;
+  private static final String ZK_PATH = 
"/ambari-metrics-cluster/LIVEINSTANCES";
+  private static final String INSTANCE_NAME_DELIMITER = "_";
+
+
+
+  private static final Log LOG = 
LogFactory.getLog(MetricCollectorHAHelper.class);
+
+  public MetricCollectorHAHelper(String zookeeperQuorum, int tryTime, int 
sleepMsBetweenRetries) {
+    this.zookeeperQuorum = zookeeperQuorum;
+    this.tryTime = tryTime;
+    this.sleepMsBetweenRetries = sleepMsBetweenRetries;
+  }
+
+  /**
+   * Connect to Zookeeper to find live instances of metrics collector
+   * @return {@link Collection} hostnames
+   */
+  public Collection<String> findLiveCollectorHostsFromZNode() {
+    Set<String> collectors = new HashSet<>();
+
+    RetryPolicy retryPolicy = new RetryUntilElapsed(tryTime, 
sleepMsBetweenRetries);
+    final CuratorZookeeperClient client = new 
CuratorZookeeperClient(zookeeperQuorum,
+      SESSION_TIMEOUT, CONNECTION_TIMEOUT, null, retryPolicy);
+
+    String liveInstances = null;
+
+    try {
+      liveInstances = RetryLoop.callWithRetry(client, new Callable<String>() {
+        @Override
+        public String call() throws Exception {
+          ZooKeeper zookeeper = client.getZooKeeper();
+          byte[] data = zookeeper.getData(ZK_PATH, null, null);
+          return data != null ? new String(data) : null;
+        }
+      });
+    } catch (Exception e) {
+      LOG.warn("Unable to connect to zookeeper.", e);
+      LOG.debug(e);
+    }
+
+    // [ambari-sid-3.c.pramod-thangali.internal_12001]
+    if (liveInstances != null && !liveInstances.isEmpty()) {
+      for (String instanceStr : liveInstances.split(",")) {
+        collectors.add(instanceStr.substring(0, 
instanceStr.indexOf(INSTANCE_NAME_DELIMITER)));
+      }
+    }
+
+    return collectors;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/954e61ee/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/availability/MetricCollectorUnavailableException.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/availability/MetricCollectorUnavailableException.java
 
b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/availability/MetricCollectorUnavailableException.java
new file mode 100644
index 0000000..c381bbb
--- /dev/null
+++ 
b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/availability/MetricCollectorUnavailableException.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.sink.timeline.availability;
+
+public class MetricCollectorUnavailableException extends Exception {
+  public MetricCollectorUnavailableException(String message) {
+    super(message);
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/954e61ee/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/availability/MetricSinkWriteShardHostnameHashingStrategy.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/availability/MetricSinkWriteShardHostnameHashingStrategy.java
 
b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/availability/MetricSinkWriteShardHostnameHashingStrategy.java
new file mode 100644
index 0000000..1c89884
--- /dev/null
+++ 
b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/availability/MetricSinkWriteShardHostnameHashingStrategy.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.sink.timeline.availability;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import java.util.List;
+
+/**
+ * Provides sharding based on hostname: the hostname is hashed once at
+ * construction time and the hash, reduced modulo the number of candidate
+ * collectors, selects the collector this host writes to.
+ */
+public class MetricSinkWriteShardHostnameHashingStrategy implements MetricSinkWriteShardStrategy {
+  private static final Log LOG = LogFactory.getLog(MetricSinkWriteShardHostnameHashingStrategy.class);
+  // Hash used when no hostname is supplied; the value itself is arbitrary.
+  private static final long DEFAULT_HOSTNAME_HASH = 1000L;
+
+  private final String hostname;
+  private final long hostnameHash;
+
+  /**
+   * @param hostname name of the local host; may be null, in which case a
+   *                 fixed constant is used in place of the computed hash
+   */
+  public MetricSinkWriteShardHostnameHashingStrategy(String hostname) {
+    this.hostname = hostname;
+    this.hostnameHash = hostname != null ? computeHash(hostname) : DEFAULT_HOSTNAME_HASH;
+  }
+
+  /**
+   * Picks the collector at position {@code hash mod collectorHosts.size()}.
+   *
+   * @param collectorHosts candidate collectors; must be non-empty (an empty
+   *                       list fails with ArithmeticException on the modulo)
+   * @return the element of {@code collectorHosts} selected for this hostname
+   */
+  @Override
+  public String findCollectorShard(List<String> collectorHosts) {
+    int shardCount = collectorHosts.size();
+    int index = (int) (hostnameHash % shardCount);
+    // computeHash() can overflow into negative values for long hostnames and
+    // Java's % keeps the sign of the dividend, so shift a negative remainder
+    // back into [0, shardCount) instead of passing it to List.get().
+    if (index < 0) {
+      index += shardCount;
+    }
+    String collectorHost = collectorHosts.get(index);
+    LOG.info(String.format("Calculated collector shard %s based on hostname: %s", collectorHost, hostname));
+    return collectorHost;
+  }
+
+  /**
+   * Compute consistent hash based on hostname which should give decently
+   * uniform distribution assuming hostname generally have a sequential
+   * numeric suffix.
+   * The accumulation uses plain long arithmetic, so the result wraps around
+   * (and may be negative) once the hostname is long enough.
+   */
+  long computeHash(String hostname) {
+    long h = 11987L; // prime
+    int len = hostname.length();
+
+    for (int i = 0; i < len; i++) {
+      h = 31 * h + hostname.charAt(i);
+    }
+    return h;
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/954e61ee/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/availability/MetricSinkWriteShardStrategy.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/availability/MetricSinkWriteShardStrategy.java
 
b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/availability/MetricSinkWriteShardStrategy.java
new file mode 100644
index 0000000..7619555
--- /dev/null
+++ 
b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/availability/MetricSinkWriteShardStrategy.java
@@ -0,0 +1,24 @@
+package org.apache.hadoop.metrics2.sink.timeline.availability;
+
+import java.util.List;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Strategy for choosing, out of a list of candidate collector hosts, the one
+ * a metrics sink should write to.
+ */
+public interface MetricSinkWriteShardStrategy {
+  /**
+   * Selects one collector from the supplied candidates.
+   *
+   * @param collectorHosts candidate collector hostnames
+   * @return the chosen collector host
+   */
+  String findCollectorShard(List<String> collectorHosts);
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/954e61ee/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/availability/MetricCollectorHATest.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/availability/MetricCollectorHATest.java
 
b/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/availability/MetricCollectorHATest.java
new file mode 100644
index 0000000..7fadeb2
--- /dev/null
+++ 
b/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/availability/MetricCollectorHATest.java
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.sink.timeline.availability;
+
+import com.google.gson.Gson;
+import junit.framework.Assert;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.HashSet;
+import static org.easymock.EasyMock.expect;
+import static org.powermock.api.easymock.PowerMock.createNiceMock;
+import static org.powermock.api.easymock.PowerMock.expectNew;
+import static org.powermock.api.easymock.PowerMock.replayAll;
+import static org.powermock.api.easymock.PowerMock.verifyAll;
+
+/**
+ * Exercises {@code AbstractTimelineMetricsSink#findPreferredCollectHost()}
+ * with the live-nodes HTTP lookup and the ZooKeeper helper replaced by mocks.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({AbstractTimelineMetricsSink.class, URL.class, HttpURLConnection.class})
+public class MetricCollectorHATest {
+
+  // URL both tests expect the sink to construct for the live-nodes lookup.
+  private static final String LIVE_NODES_URL = "http://localhost:2181/ws/v1/timeline/metrics/livenodes";
+
+  /**
+   * The live-nodes request fails (getResponseCode throws IOException) while
+   * the mocked HA helper reports h2 and h3 from the znode; the test expects
+   * h2 to be returned as the preferred collector.
+   */
+  @Test
+  public void findCollectorUsingZKTest() throws Exception {
+    InputStream is = createNiceMock(InputStream.class);
+    HttpURLConnection connection = createNiceMock(HttpURLConnection.class);
+    URL url = createNiceMock(URL.class);
+    MetricCollectorHAHelper haHelper = createNiceMock(MetricCollectorHAHelper.class);
+
+    expectNew(URL.class, LIVE_NODES_URL).andReturn(url).anyTimes();
+    expect(url.openConnection()).andReturn(connection).anyTimes();
+    expect(connection.getInputStream()).andReturn(is).anyTimes();
+    expect(connection.getResponseCode()).andThrow(new IOException()).anyTimes();
+    expect(haHelper.findLiveCollectorHostsFromZNode()).andReturn(
+      new ArrayList<String>() {{ add("h2"); add("h3"); }});
+
+    replayAll();
+    TestTimelineMetricsSink sink = new TestTimelineMetricsSink(haHelper);
+    sink.init();
+
+    String host = sink.findPreferredCollectHost();
+
+    verifyAll();
+
+    Assert.assertNotNull(host);
+    Assert.assertEquals("h2", host);
+
+  }
+
+  /**
+   * The live-nodes request succeeds (HTTP 200) and its body is the JSON list
+   * [h1, h2, h3]; the test expects h3 to be returned as the preferred
+   * collector. No expectations are recorded on the HA helper here.
+   */
+  @Test
+  public void findCollectorUsingKnownCollectorTest() throws Exception {
+    HttpURLConnection connection = createNiceMock(HttpURLConnection.class);
+    URL url = createNiceMock(URL.class);
+    MetricCollectorHAHelper haHelper = createNiceMock(MetricCollectorHAHelper.class);
+
+    Gson gson = new Gson();
+    ArrayList<String> output = new ArrayList<>();
+    output.add("h1");
+    output.add("h2");
+    output.add("h3");
+    InputStream is = IOUtils.toInputStream(gson.toJson(output));
+
+    expectNew(URL.class, LIVE_NODES_URL).andReturn(url).anyTimes();
+    expect(url.openConnection()).andReturn(connection).anyTimes();
+    expect(connection.getInputStream()).andReturn(is).anyTimes();
+    expect(connection.getResponseCode()).andReturn(200).anyTimes();
+
+    replayAll();
+    TestTimelineMetricsSink sink = new TestTimelineMetricsSink(haHelper);
+    sink.init();
+
+    String host = sink.findPreferredCollectHost();
+    Assert.assertNotNull(host);
+    Assert.assertEquals("h3", host);
+
+    verifyAll();
+  }
+
+  /**
+   * Minimal concrete sink: fixed configuration values plus a hook that swaps
+   * in the mocked HA helper once the base class has initialised itself.
+   */
+  private class TestTimelineMetricsSink extends AbstractTimelineMetricsSink {
+    MetricCollectorHAHelper testHelper;
+
+    TestTimelineMetricsSink(MetricCollectorHAHelper haHelper) {
+      testHelper = haHelper;
+    }
+
+    @Override
+    protected void init() {
+      super.init();
+      // Assigned after super.init() so the mock is the helper in effect afterwards.
+      this.collectorHAHelper = testHelper;
+    }
+
+    // Re-declared only to make the method callable from the test methods above.
+    @Override
+    protected synchronized String findPreferredCollectHost() {
+      return super.findPreferredCollectHost();
+    }
+
+    @Override
+    protected String getCollectorUri(String host) {
+      return null;
+    }
+
+    @Override
+    protected String getCollectorProtocol() {
+      return "http";
+    }
+
+    @Override
+    protected int getTimeoutSeconds() {
+      return 10;
+    }
+
+    @Override
+    protected String getZookeeperQuorum() {
+      return "localhost:2181";
+    }
+
+    @Override
+    protected String getConfiguredCollectors() {
+      return "localhost:2181";
+    }
+
+    @Override
+    protected String getHostname() {
+      return "h1";
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/954e61ee/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/availability/ShardingStrategyTest.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/availability/ShardingStrategyTest.java
 
b/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/availability/ShardingStrategyTest.java
new file mode 100644
index 0000000..c6041db
--- /dev/null
+++ 
b/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/availability/ShardingStrategyTest.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.sink.timeline.availability;
+
+import junit.framework.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class ShardingStrategyTest {
+  @Test
+  public void testHostnameShardingStrategy() throws Exception {
+    List<String> collectorHosts = new ArrayList<String>() {{
+      add("mycollector-1.hostname.domain");
+      add("mycollector-2.hostname.domain");
+    }};
+
+    String hostname1 = 
"some-very-long-hostname-with-a-trailing-number-identifier-10.mylocalhost.domain";
+
+    // Consistency check
+    String collectorShard1 = null;
+    for (int i = 0; i < 100; i++) {
+      MetricSinkWriteShardStrategy strategy = new 
MetricSinkWriteShardHostnameHashingStrategy(hostname1);
+      collectorShard1 = strategy.findCollectorShard(collectorHosts);
+      Assert.assertEquals(collectorShard1, 
strategy.findCollectorShard(collectorHosts));
+    }
+
+    // Shard 2 hosts
+    String hostname2 = 
"some-very-long-hostname-with-a-trailing-number-identifier-20.mylocalhost.domain";
+    MetricSinkWriteShardStrategy strategy = new 
MetricSinkWriteShardHostnameHashingStrategy(hostname2);
+    String collectorShard2 = strategy.findCollectorShard(collectorHosts);
+
+    Assert.assertEquals("mycollector-1.hostname.domain", collectorShard1);
+    Assert.assertEquals("mycollector-2.hostname.domain", collectorShard2);
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/954e61ee/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/cache/HandleConnectExceptionTest.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/cache/HandleConnectExceptionTest.java
 
b/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/cache/HandleConnectExceptionTest.java
index a192802..ccaa574 100644
--- 
a/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/cache/HandleConnectExceptionTest.java
+++ 
b/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/cache/HandleConnectExceptionTest.java
@@ -33,14 +33,14 @@ import org.junit.runner.RunWith;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
+import static org.easymock.EasyMock.anyString;
 import static org.easymock.EasyMock.expect;
 import static org.powermock.api.easymock.PowerMock.createNiceMock;
 import static org.powermock.api.easymock.PowerMock.expectNew;
 import static org.powermock.api.easymock.PowerMock.replayAll;
 
 @RunWith(PowerMockRunner.class)
-@PrepareForTest({AbstractTimelineMetricsSink.class, URL.class,
-  HttpURLConnection.class})
+@PrepareForTest({AbstractTimelineMetricsSink.class, URL.class, 
HttpURLConnection.class})
 public class HandleConnectExceptionTest {
   private static final String COLLECTOR_URL = "collector";
   private TestTimelineMetricsSink sink;
@@ -53,7 +53,7 @@ public class HandleConnectExceptionTest {
     URL url = createNiceMock(URL.class);
     AbstractTimelineMetricsSink.NUMBER_OF_SKIPPED_COLLECTOR_EXCEPTIONS = 2;
     try {
-      expectNew(URL.class, "collector").andReturn(url).anyTimes();
+      expectNew(URL.class, anyString()).andReturn(url).anyTimes();
       expect(url.openConnection()).andReturn(connection).anyTimes();
       expect(connection.getOutputStream()).andReturn(os).anyTimes();
       expect(connection.getResponseCode()).andThrow(new 
IOException()).anyTimes();
@@ -79,27 +79,51 @@ public class HandleConnectExceptionTest {
     try{
       sink.emitMetrics(timelineMetrics);
       Assert.fail();
-    }catch(UnableToConnectException e){
+    } catch (UnableToConnectException e){
       Assert.assertEquals(COLLECTOR_URL, e.getConnectUrl());
-    }catch(Exception e){
+    } catch (Exception e){
+      e.printStackTrace();
       Assert.fail(e.getMessage());
     }
   }
 
-  class TestTimelineMetricsSink extends AbstractTimelineMetricsSink{
+  private class TestTimelineMetricsSink extends AbstractTimelineMetricsSink{
     @Override
-    protected String getCollectorUri() {
+    protected String getCollectorUri(String host) {
       return COLLECTOR_URL;
     }
 
     @Override
+    protected String getCollectorProtocol() {
+      return "http";
+    }
+
+    @Override
     protected int getTimeoutSeconds() {
       return 10;
     }
 
     @Override
+    protected String getZookeeperQuorum() {
+      return "localhost:2181";
+    }
+
+    @Override
+    protected String getConfiguredCollectors() {
+      return "localhost:2181";
+    }
+
+    @Override
+    protected String getHostname() {
+      return "h1";
+    }
+
+    @Override
     public boolean emitMetrics(TimelineMetrics metrics) {
+      super.init();
       return super.emitMetrics(metrics);
     }
+
+
   }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/954e61ee/ambari-metrics/ambari-metrics-flume-sink/src/main/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSink.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-flume-sink/src/main/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSink.java
 
b/ambari-metrics/ambari-metrics-flume-sink/src/main/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSink.java
index 3040c48..1b36e9a 100644
--- 
a/ambari-metrics/ambari-metrics-flume-sink/src/main/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSink.java
+++ 
b/ambari-metrics/ambari-metrics-flume-sink/src/main/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSink.java
@@ -46,6 +46,7 @@ import java.util.concurrent.TimeUnit;
 
 public class FlumeTimelineMetricsSink extends AbstractTimelineMetricsSink 
implements MonitorService {
   private String collectorUri;
+  private String protocol;
   // Key - component(instance_id)
   private Map<String, TimelineMetricsCache> metricsCaches;
   private int maxRowCacheSize;
@@ -53,6 +54,9 @@ public class FlumeTimelineMetricsSink extends 
AbstractTimelineMetricsSink implem
   private ScheduledExecutorService scheduledExecutorService;
   private long pollFrequency;
   private String hostname;
+  private String port;
+  private String collectors;
+  private String zookeeperQuorum;
   private final static String COUNTER_METRICS_PROPERTY = "counters";
   private final Set<String> counterMetrics = new HashSet<String>();
   private int timeoutSeconds = 10;
@@ -95,8 +99,15 @@ public class FlumeTimelineMetricsSink extends 
AbstractTimelineMetricsSink implem
     metricsSendInterval = 
Integer.parseInt(configuration.getProperty(METRICS_SEND_INTERVAL,
         String.valueOf(TimelineMetricsCache.MAX_EVICTION_TIME_MILLIS)));
     metricsCaches = new HashMap<String, TimelineMetricsCache>();
-    collectorUri = configuration.getProperty(COLLECTOR_PROPERTY) + 
WS_V1_TIMELINE_METRICS;
-    if (collectorUri.toLowerCase().startsWith("https://")) {
+    collectors = configuration.getProperty(COLLECTOR_PROPERTY);
+    zookeeperQuorum = configuration.getProperty("zookeeper.quorum");
+    protocol = configuration.getProperty(COLLECTOR_PROTOCOL, "http");
+    port = configuration.getProperty(COLLECTOR_PORT, "6188");
+    // Initialize the collector write strategy
+    super.init();
+
+    collectorUri = constructTimelineMetricUri(protocol, 
findPreferredCollectHost(), port);
+    if (protocol.contains("https")) {
       String trustStorePath = 
configuration.getProperty(SSL_KEYSTORE_PATH_PROPERTY).trim();
       String trustStoreType = 
configuration.getProperty(SSL_KEYSTORE_TYPE_PROPERTY).trim();
       String trustStorePwd = 
configuration.getProperty(SSL_KEYSTORE_PASSWORD_PROPERTY).trim();
@@ -109,8 +120,13 @@ public class FlumeTimelineMetricsSink extends 
AbstractTimelineMetricsSink implem
   }
 
   @Override
-  public String getCollectorUri() {
-    return collectorUri;
+  public String getCollectorUri(String host) {
+    return constructTimelineMetricUri(protocol, host, port);
+  }
+
+  @Override
+  protected String getCollectorProtocol() {
+    return protocol;
   }
 
   @Override
@@ -118,6 +134,21 @@ public class FlumeTimelineMetricsSink extends 
AbstractTimelineMetricsSink implem
     return timeoutSeconds;
   }
 
+  @Override
+  protected String getZookeeperQuorum() {
+    return zookeeperQuorum;
+  }
+
+  @Override
+  protected String getConfiguredCollectors() {
+    return collectors;
+  }
+
+  @Override
+  protected String getHostname() {
+    return hostname;
+  }
+
   public void setPollFrequency(long pollFrequency) {
     this.pollFrequency = pollFrequency;
   }

http://git-wip-us.apache.org/repos/asf/ambari/blob/954e61ee/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java
 
b/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java
index 65f93f9..8e78e6f 100644
--- 
a/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java
+++ 
b/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java
@@ -28,14 +28,9 @@ import org.apache.hadoop.metrics2.MetricsSink;
 import org.apache.hadoop.metrics2.MetricsTag;
 import org.apache.hadoop.metrics2.impl.MsInfo;
 import org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache;
-import org.apache.hadoop.metrics2.util.Servers;
 import org.apache.hadoop.net.DNS;
-
 import java.io.Closeable;
 import java.io.IOException;
-import java.io.OutputStream;
-import java.net.HttpURLConnection;
-import java.net.SocketAddress;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -57,9 +52,11 @@ public class HadoopTimelineMetricsSink extends 
AbstractTimelineMetricsSink imple
   private TimelineMetricsCache metricsCache;
   private String hostName = "UNKNOWN.example.com";
   private String serviceName = "";
-  private List<? extends SocketAddress> metricsServers;
+  private String collectors;
   private String collectorUri;
   private String containerMetricsUri;
+  private String protocol;
+  private String port;
   public static final String WS_V1_CONTAINER_METRICS = 
"/ws/v1/timeline/containermetrics";
 
   private static final String SERVICE_NAME_PREFIX = "serviceName-prefix";
@@ -99,16 +96,21 @@ public class HadoopTimelineMetricsSink extends 
AbstractTimelineMetricsSink imple
     serviceName = getServiceName(conf);
 
     LOG.info("Identified hostname = " + hostName + ", serviceName = " + 
serviceName);
+    // Initialize the collector write strategy
+    super.init();
 
     // Load collector configs
-    metricsServers = Servers.parse(conf.getString(COLLECTOR_PROPERTY), 6188);
+    protocol = conf.getString(COLLECTOR_PROTOCOL, "http");
+    collectors = conf.getString(COLLECTOR_PROPERTY, "").trim();
+    port = conf.getString(COLLECTOR_PORT, "6188");
 
-    if (metricsServers == null || metricsServers.isEmpty()) {
+    if (StringUtils.isEmpty(collectors)) {
       LOG.error("No Metric collector configured.");
     } else {
-      collectorUri = conf.getString(COLLECTOR_PROPERTY).trim() + 
WS_V1_TIMELINE_METRICS;
-      containerMetricsUri = conf.getString(COLLECTOR_PROPERTY).trim() + 
WS_V1_CONTAINER_METRICS;
-      if (collectorUri.toLowerCase().startsWith("https://")) {
+      String preferredCollectorHost = findPreferredCollectHost();
+      collectorUri = constructTimelineMetricUri(protocol, 
preferredCollectorHost, port);
+      containerMetricsUri = constructContainerMetricUri(protocol, 
preferredCollectorHost, port);
+      if (protocol.contains("https")) {
         String trustStorePath = 
conf.getString(SSL_KEYSTORE_PATH_PROPERTY).trim();
         String trustStoreType = 
conf.getString(SSL_KEYSTORE_TYPE_PROPERTY).trim();
         String trustStorePwd = 
conf.getString(SSL_KEYSTORE_PASSWORD_PROPERTY).trim();
@@ -163,6 +165,7 @@ public class HadoopTimelineMetricsSink extends 
AbstractTimelineMetricsSink imple
         }
       }
     }
+
     if (!rpcPortSuffixes.isEmpty()) {
       LOG.info("RPC port properties configured: " + rpcPortSuffixes);
     }
@@ -190,8 +193,13 @@ public class HadoopTimelineMetricsSink extends 
AbstractTimelineMetricsSink imple
   }
 
   @Override
-  protected String getCollectorUri() {
-    return collectorUri;
+  protected String getCollectorUri(String host) {
+    return constructTimelineMetricUri(protocol, host, port);
+  }
+
+  @Override
+  protected String getCollectorProtocol() {
+    return protocol;
   }
 
   @Override
@@ -200,6 +208,21 @@ public class HadoopTimelineMetricsSink extends 
AbstractTimelineMetricsSink imple
   }
 
   @Override
+  protected String getZookeeperQuorum() {
+    return conf.getString(ZOOKEEPER_QUORUM);
+  }
+
+  @Override
+  protected String getConfiguredCollectors() {
+    return collectors;
+  }
+
+  @Override
+  protected String getHostname() {
+    return hostName;
+  }
+
+  @Override
   public void putMetrics(MetricsRecord record) {
     try {
       String recordName = record.name();
@@ -384,6 +407,7 @@ public class HadoopTimelineMetricsSink extends 
AbstractTimelineMetricsSink imple
       LOG.error("Unable to parse container metrics ", e);
     }
     if (jsonData != null) {
+      // TODO: Container metrics should be able to utilize failover mechanism
       emitMetricsJson(containerMetricsUri, jsonData);
     }
   }

http://git-wip-us.apache.org/repos/asf/ambari/blob/954e61ee/ambari-metrics/ambari-metrics-hadoop-sink/src/test/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSinkTest.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-hadoop-sink/src/test/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSinkTest.java
 
b/ambari-metrics/ambari-metrics-hadoop-sink/src/test/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSinkTest.java
index 4410402..5f22065 100644
--- 
a/ambari-metrics/ambari-metrics-hadoop-sink/src/test/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSinkTest.java
+++ 
b/ambari-metrics/ambari-metrics-hadoop-sink/src/test/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSinkTest.java
@@ -18,12 +18,15 @@
 
 package org.apache.hadoop.metrics2.sink.timeline;
 
+import com.google.gson.Gson;
 import org.apache.commons.configuration.SubsetConfiguration;
+import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.metrics2.AbstractMetric;
 import org.apache.hadoop.metrics2.MetricType;
 import org.apache.hadoop.metrics2.MetricsInfo;
 import org.apache.hadoop.metrics2.MetricsRecord;
 import org.apache.hadoop.metrics2.MetricsTag;
+import 
org.apache.hadoop.metrics2.sink.timeline.availability.MetricCollectorHAHelper;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 import org.easymock.EasyMock;
@@ -32,10 +35,13 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
+import org.powermock.api.easymock.PowerMock;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
+import java.io.InputStream;
 import java.io.OutputStream;
+import java.net.HttpURLConnection;
 import java.net.URL;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -46,11 +52,14 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
 
+import static 
org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink.COLLECTOR_PORT;
 import static 
org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink.COLLECTOR_PROPERTY;
+import static 
org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink.COLLECTOR_PROTOCOL;
 import static 
org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink.MAX_METRIC_ROW_CACHE_SIZE;
 import static 
org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink.METRICS_SEND_INTERVAL;
 import static org.easymock.EasyMock.anyInt;
 import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.anyString;
 import static org.easymock.EasyMock.createMockBuilder;
 import static org.easymock.EasyMock.createNiceMock;
 import static org.easymock.EasyMock.eq;
@@ -58,9 +67,14 @@ import static org.easymock.EasyMock.expect;
 import static org.easymock.EasyMock.expectLastCall;
 import static org.easymock.EasyMock.replay;
 import static org.easymock.EasyMock.verify;
+import static org.powermock.api.easymock.PowerMock.expectNew;
+import static org.powermock.api.easymock.PowerMock.replayAll;
+import static org.powermock.api.easymock.PowerMock.verifyAll;
 
 @RunWith(PowerMockRunner.class)
+@PrepareForTest({AbstractTimelineMetricsSink.class, HttpURLConnection.class})
 public class HadoopTimelineMetricsSinkTest {
+  Gson gson = new Gson();
 
   @Before
   public void setup() {
@@ -68,16 +82,28 @@ public class HadoopTimelineMetricsSinkTest {
   }
 
   @Test
-  @PrepareForTest({URL.class, OutputStream.class})
+  @PrepareForTest({URL.class, OutputStream.class, 
AbstractTimelineMetricsSink.class, HttpURLConnection.class})
   public void testPutMetrics() throws Exception {
     HadoopTimelineMetricsSink sink = new HadoopTimelineMetricsSink();
 
+    HttpURLConnection connection = 
PowerMock.createNiceMock(HttpURLConnection.class);
+    URL url = PowerMock.createNiceMock(URL.class);
+    InputStream is = 
IOUtils.toInputStream(gson.toJson(Collections.singletonList("localhost")));
+    expectNew(URL.class, anyString()).andReturn(url).anyTimes();
+    expect(url.openConnection()).andReturn(connection).anyTimes();
+    expect(connection.getInputStream()).andReturn(is).anyTimes();
+    expect(connection.getResponseCode()).andReturn(200).anyTimes();
+    OutputStream os = PowerMock.createNiceMock(OutputStream.class);
+    expect(connection.getOutputStream()).andReturn(os).anyTimes();
+
     SubsetConfiguration conf = createNiceMock(SubsetConfiguration.class);
-    
expect(conf.getString(eq("slave.host.name"))).andReturn("testhost").anyTimes();
+    
expect(conf.getString("slave.host.name")).andReturn("localhost").anyTimes();
     expect(conf.getParent()).andReturn(null).anyTimes();
     expect(conf.getPrefix()).andReturn("service").anyTimes();
-    
expect(conf.getString(eq(COLLECTOR_PROPERTY))).andReturn("localhost:63188").anyTimes();
+    expect(conf.getString(eq(COLLECTOR_PROPERTY), 
eq(""))).andReturn("localhost:6188").anyTimes();
     expect(conf.getString(eq("serviceName-prefix"), 
eq(""))).andReturn("").anyTimes();
+    expect(conf.getString(eq(COLLECTOR_PROTOCOL), 
eq("http"))).andReturn("http").anyTimes();
+    expect(conf.getString(eq(COLLECTOR_PORT), 
eq("6188"))).andReturn("6188").anyTimes();
 
     expect(conf.getInt(eq(MAX_METRIC_ROW_CACHE_SIZE), 
anyInt())).andReturn(10).anyTimes();
     expect(conf.getInt(eq(METRICS_SEND_INTERVAL), 
anyInt())).andReturn(1000).anyTimes();
@@ -121,6 +147,7 @@ public class HadoopTimelineMetricsSinkTest {
     expect(record.metrics()).andReturn(Arrays.asList(metric)).anyTimes();
 
     replay(conf, record, metric);
+    replayAll();
 
     sink.init(conf);
 
@@ -130,7 +157,7 @@ public class HadoopTimelineMetricsSinkTest {
 
     sink.putMetrics(record);
 
-    verify(conf, record, metric);
+    verifyAll();
   }
 
   @Test
@@ -138,20 +165,26 @@ public class HadoopTimelineMetricsSinkTest {
     HadoopTimelineMetricsSink sink =
       createMockBuilder(HadoopTimelineMetricsSink.class)
         .withConstructor().addMockedMethod("appendPrefix")
+        .addMockedMethod("findLiveCollectorHostsFromKnownCollector")
         .addMockedMethod("emitMetrics").createNiceMock();
 
     SubsetConfiguration conf = createNiceMock(SubsetConfiguration.class);
-    
expect(conf.getString(eq("slave.host.name"))).andReturn("testhost").anyTimes();
+    
expect(conf.getString("slave.host.name")).andReturn("localhost").anyTimes();
     expect(conf.getParent()).andReturn(null).anyTimes();
     expect(conf.getPrefix()).andReturn("service").anyTimes();
-    
expect(conf.getString(eq(COLLECTOR_PROPERTY))).andReturn("localhost:63188").anyTimes();
+    expect(conf.getString(eq(COLLECTOR_PROPERTY), 
eq(""))).andReturn("localhost:6188").anyTimes();
     expect(conf.getString(eq("serviceName-prefix"), 
eq(""))).andReturn("").anyTimes();
+    expect(conf.getString(eq(COLLECTOR_PROTOCOL), 
eq("http"))).andReturn("http").anyTimes();
+    expect(conf.getString(eq(COLLECTOR_PORT), 
eq("6188"))).andReturn("6188").anyTimes();
 
     expect(conf.getInt(eq(MAX_METRIC_ROW_CACHE_SIZE), 
anyInt())).andReturn(10).anyTimes();
     // Return eviction time smaller than time diff for first 3 entries
     // Third entry will result in eviction
     expect(conf.getInt(eq(METRICS_SEND_INTERVAL), 
anyInt())).andReturn(10).anyTimes();
 
+    expect(sink.findLiveCollectorHostsFromKnownCollector("localhost", "6188"))
+      .andReturn(Collections.singletonList("localhost")).anyTimes();
+
     conf.setListDelimiter(eq(','));
     expectLastCall().anyTimes();
 
@@ -260,14 +293,20 @@ public class HadoopTimelineMetricsSinkTest {
     HadoopTimelineMetricsSink sink =
       createMockBuilder(HadoopTimelineMetricsSink.class)
         .withConstructor().addMockedMethod("appendPrefix")
+        .addMockedMethod("findLiveCollectorHostsFromKnownCollector")
         .addMockedMethod("emitMetrics").createNiceMock();
 
     SubsetConfiguration conf = createNiceMock(SubsetConfiguration.class);
-    
expect(conf.getString(eq("slave.host.name"))).andReturn("testhost").anyTimes();
+    
expect(conf.getString("slave.host.name")).andReturn("localhost").anyTimes();
     expect(conf.getParent()).andReturn(null).anyTimes();
     expect(conf.getPrefix()).andReturn("service").anyTimes();
-    
expect(conf.getString(eq(COLLECTOR_PROPERTY))).andReturn("localhost:63188").anyTimes();
+    expect(conf.getString(eq(COLLECTOR_PROPERTY), 
eq(""))).andReturn("localhost:6188").anyTimes();
     expect(conf.getString(eq("serviceName-prefix"), 
eq(""))).andReturn("").anyTimes();
+    expect(conf.getString(eq(COLLECTOR_PROTOCOL), 
eq("http"))).andReturn("http").anyTimes();
+    expect(conf.getString(eq(COLLECTOR_PORT), 
eq("6188"))).andReturn("6188").anyTimes();
+
+    expect(sink.findLiveCollectorHostsFromKnownCollector("localhost", "6188"))
+      .andReturn(Collections.singletonList("localhost")).anyTimes();
 
     expect(conf.getInt(eq(MAX_METRIC_ROW_CACHE_SIZE), 
anyInt())).andReturn(10).anyTimes();
     expect(conf.getInt(eq(METRICS_SEND_INTERVAL), 
anyInt())).andReturn(10).anyTimes();

http://git-wip-us.apache.org/repos/asf/ambari/blob/954e61ee/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java
 
b/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java
index d6d251c..11a1c75 100644
--- 
a/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java
+++ 
b/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java
@@ -71,14 +71,18 @@ public class KafkaTimelineMetricsReporter extends 
AbstractTimelineMetricsSink
   private static final String TIMELINE_DEFAULT_PORT = "6188";
   private static final String TIMELINE_DEFAULT_PROTOCOL = "http";
 
-  private boolean initialized = false;
+  private volatile boolean initialized = false;
   private boolean running = false;
   private final Object lock = new Object();
   private String collectorUri;
   private String hostname;
+  private String metricCollectorPort;
+  private String collectors;
+  private String metricCollectorProtocol;
   private TimelineScheduledReporter reporter;
   private TimelineMetricsCache metricsCache;
   private int timeoutSeconds = 10;
+  private String zookeeperQuorum;
 
   private String[] excludedMetricsPrefixes;
   private String[] includedMetricsPrefixes;
@@ -86,8 +90,13 @@ public class KafkaTimelineMetricsReporter extends 
AbstractTimelineMetricsSink
   private Set<String> excludedMetrics = new HashSet<>();
 
   @Override
-  protected String getCollectorUri() {
-    return collectorUri;
+  protected String getCollectorUri(String host) {
+    return constructTimelineMetricUri(metricCollectorProtocol, host, 
metricCollectorPort);
+  }
+
+  @Override
+  protected String getCollectorProtocol() {
+    return metricCollectorProtocol;
   }
 
   @Override
@@ -95,10 +104,26 @@ public class KafkaTimelineMetricsReporter extends 
AbstractTimelineMetricsSink
     return timeoutSeconds;
   }
 
+  @Override
+  protected String getZookeeperQuorum() {
+    return zookeeperQuorum;
+  }
+
+  @Override
+  protected String getConfiguredCollectors() {
+    return collectors;
+  }
+
+  @Override
+  protected String getHostname() {
+    return hostname;
+  }
+
   public void setMetricsCache(TimelineMetricsCache metricsCache) {
     this.metricsCache = metricsCache;
   }
 
+  @Override
   public void init(VerifiableProperties props) {
     synchronized (lock) {
       if (!initialized) {
@@ -113,26 +138,33 @@ public class KafkaTimelineMetricsReporter extends 
AbstractTimelineMetricsSink
           LOG.error("Could not identify hostname.");
           throw new RuntimeException("Could not identify hostname.", e);
         }
+        // Initialize the collector write strategy
+        super.init();
+
         KafkaMetricsConfig metricsConfig = new KafkaMetricsConfig(props);
         timeoutSeconds = props.getInt(METRICS_POST_TIMEOUT_SECONDS, 
DEFAULT_POST_TIMEOUT_SECONDS);
         int metricsSendInterval = 
props.getInt(TIMELINE_METRICS_SEND_INTERVAL_PROPERTY, MAX_EVICTION_TIME_MILLIS);
         int maxRowCacheSize = 
props.getInt(TIMELINE_METRICS_MAX_ROW_CACHE_SIZE_PROPERTY, 
MAX_RECS_PER_NAME_DEFAULT);
+
+        zookeeperQuorum = props.getString("zookeeper.connect");
+        collectors = props.getString(TIMELINE_HOST_PROPERTY, 
TIMELINE_DEFAULT_HOST);
+        metricCollectorProtocol = props.getString(TIMELINE_PROTOCOL_PROPERTY, 
TIMELINE_DEFAULT_PROTOCOL);
+
         String metricCollectorHost = props.getString(TIMELINE_HOST_PROPERTY, 
TIMELINE_DEFAULT_HOST);
-        String metricCollectorPort = props.getString(TIMELINE_PORT_PROPERTY, 
TIMELINE_DEFAULT_PORT);
-        String metricCollectorProtocol = 
props.getString(TIMELINE_PROTOCOL_PROPERTY, TIMELINE_DEFAULT_PROTOCOL);
+        metricCollectorPort = props.getString(TIMELINE_PORT_PROPERTY, 
TIMELINE_DEFAULT_PORT);
+
         setMetricsCache(new TimelineMetricsCache(maxRowCacheSize, 
metricsSendInterval));
 
-        collectorUri = metricCollectorProtocol + "://" + metricCollectorHost +
-                       ":" + metricCollectorPort + WS_V1_TIMELINE_METRICS;
+        collectorUri = constructTimelineMetricUri(metricCollectorProtocol,
+          metricCollectorHost, metricCollectorPort);
 
-        if (collectorUri.toLowerCase().startsWith("https://")) {
+        if (metricCollectorProtocol.contains("https")) {
           String trustStorePath = 
props.getString(SSL_KEYSTORE_PATH_PROPERTY).trim();
           String trustStoreType = 
props.getString(SSL_KEYSTORE_TYPE_PROPERTY).trim();
           String trustStorePwd = 
props.getString(SSL_KEYSTORE_PASSWORD_PROPERTY).trim();
           loadTruststore(trustStorePath, trustStoreType, trustStorePwd);
         }
 
-
         // Exclusion policy
         String excludedMetricsStr = props.getString(EXCLUDED_METRICS_PROPERTY, 
"");
         if (!StringUtils.isEmpty(excludedMetricsStr.trim())) {

http://git-wip-us.apache.org/repos/asf/ambari/blob/954e61ee/ambari-metrics/ambari-metrics-kafka-sink/src/test/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporterTest.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-kafka-sink/src/test/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporterTest.java
 
b/ambari-metrics/ambari-metrics-kafka-sink/src/test/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporterTest.java
index e0adb4b..9027716 100644
--- 
a/ambari-metrics/ambari-metrics-kafka-sink/src/test/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporterTest.java
+++ 
b/ambari-metrics/ambari-metrics-kafka-sink/src/test/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporterTest.java
@@ -76,6 +76,7 @@ public class KafkaTimelineMetricsReporterTest {
     list.add(meter);
     list.add(timer);
     Properties properties = new Properties();
+    properties.setProperty("zookeeper.connect", "localhost:2181");
     properties.setProperty("kafka.timeline.metrics.sendInterval", "5900");
     properties.setProperty("kafka.timeline.metrics.maxRowCacheSize", "10000");
     properties.setProperty("kafka.timeline.metrics.host", "localhost");

http://git-wip-us.apache.org/repos/asf/ambari/blob/954e61ee/ambari-metrics/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java
 
b/ambari-metrics/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java
index ab5f1e4..4294837 100644
--- 
a/ambari-metrics/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java
+++ 
b/ambari-metrics/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java
@@ -47,22 +47,46 @@ public class StormTimelineMetricsReporter extends 
AbstractTimelineMetricsSink
   private NimbusClient nimbusClient;
   private String applicationId;
   private int timeoutSeconds;
+  private String port;
+  private String collectors;
+  private String zkQuorum;
+  private String protocol;
 
   public StormTimelineMetricsReporter() {
 
   }
 
   @Override
-  protected String getCollectorUri() {
+  protected String getCollectorUri(String host) {
     return this.collectorUri;
   }
 
   @Override
+  protected String getCollectorProtocol() {
+    return protocol;
+  }
+
+  @Override
   protected int getTimeoutSeconds() {
     return timeoutSeconds;
   }
 
   @Override
+  protected String getZookeeperQuorum() {
+    return zkQuorum;
+  }
+
+  @Override
+  protected String getConfiguredCollectors() {
+    return collectors;
+  }
+
+  @Override
+  protected String getHostname() {
+    return hostname;
+  }
+
+  @Override
   public void prepare(Map conf) {
     LOG.info("Preparing Storm Metrics Reporter");
     try {
@@ -80,18 +104,24 @@ public class StormTimelineMetricsReporter extends 
AbstractTimelineMetricsSink
       Map cf = (Map) conf.get(METRICS_COLLECTOR_CATEGORY);
       Map stormConf = Utils.readStormConfig();
       this.nimbusClient = NimbusClient.getConfiguredClient(stormConf);
-      String collector = cf.get(COLLECTOR_PROPERTY).toString();
-      timeoutSeconds = cf.get(METRICS_POST_TIMEOUT_SECONDS) != null ?
-        Integer.parseInt(cf.get(METRICS_POST_TIMEOUT_SECONDS).toString()) :
-        DEFAULT_POST_TIMEOUT_SECONDS;
-      applicationId = cf.get(APP_ID).toString();
-      collectorUri = collector + WS_V1_TIMELINE_METRICS;
+
+      collectors = cf.get(COLLECTOR_PROPERTY).toString();
+      protocol = cf.get(COLLECTOR_PROTOCOL) != null ? 
cf.get(COLLECTOR_PROTOCOL).toString() : "http";
+      port = cf.get(COLLECTOR_PORT) != null ? 
cf.get(COLLECTOR_PORT).toString() : "6188";
+      zkQuorum = cf.get(ZOOKEEPER_QUORUM) != null ? 
cf.get(ZOOKEEPER_QUORUM).toString() : null;
+
       if (collectorUri.toLowerCase().startsWith("https://")) {
         String trustStorePath = 
cf.get(SSL_KEYSTORE_PATH_PROPERTY).toString().trim();
         String trustStoreType = 
cf.get(SSL_KEYSTORE_TYPE_PROPERTY).toString().trim();
         String trustStorePwd = 
cf.get(SSL_KEYSTORE_PASSWORD_PROPERTY).toString().trim();
         loadTruststore(trustStorePath, trustStoreType, trustStorePwd);
       }
+
+      timeoutSeconds = cf.get(METRICS_POST_TIMEOUT_SECONDS) != null ?
+        Integer.parseInt(cf.get(METRICS_POST_TIMEOUT_SECONDS).toString()) :
+        DEFAULT_POST_TIMEOUT_SECONDS;
+      applicationId = cf.get(APP_ID).toString();
+
     } catch (Exception e) {
       LOG.warn("Could not initialize metrics collector, please specify " +
         "protocol, host, port under $STORM_HOME/conf/config.yaml ", e);

http://git-wip-us.apache.org/repos/asf/ambari/blob/954e61ee/ambari-metrics/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
 
b/ambari-metrics/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
index ea05491..80f0333 100644
--- 
a/ambari-metrics/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
+++ 
b/ambari-metrics/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
@@ -55,18 +55,42 @@ public class StormTimelineMetricsSink extends 
AbstractTimelineMetricsSink implem
   private int timeoutSeconds;
   private String topologyName;
   private String applicationId;
+  private String collectors;
+  private String zkQuorum;
+  private String protocol;
+  private String port;
 
   @Override
-  protected String getCollectorUri() {
+  protected String getCollectorUri(String host) {
     return collectorUri;
   }
 
   @Override
+  protected String getCollectorProtocol() {
+    return protocol;
+  }
+
+  @Override
   protected int getTimeoutSeconds() {
     return timeoutSeconds;
   }
 
   @Override
+  protected String getZookeeperQuorum() {
+    return zkQuorum;
+  }
+
+  @Override
+  protected String getConfiguredCollectors() {
+    return collectors;
+  }
+
+  @Override
+  protected String getHostname() {
+    return hostname;
+  }
+
+  @Override
   public void prepare(Map map, Object o, TopologyContext topologyContext, 
IErrorReporter iErrorReporter) {
     LOG.info("Preparing Storm Metrics Sink");
     try {
@@ -88,8 +112,16 @@ public class StormTimelineMetricsSink extends 
AbstractTimelineMetricsSink implem
         String.valueOf(MAX_EVICTION_TIME_MILLIS)));
     applicationId = configuration.getProperty(CLUSTER_REPORTER_APP_ID, 
DEFAULT_CLUSTER_REPORTER_APP_ID);
     metricsCache = new TimelineMetricsCache(maxRowCacheSize, 
metricsSendInterval);
-    collectorUri = configuration.getProperty(COLLECTOR_PROPERTY) + 
WS_V1_TIMELINE_METRICS;
-    if (collectorUri.toLowerCase().startsWith("https://")) {
+
+    collectors = configuration.getProperty(COLLECTOR_PROPERTY);
+    zkQuorum = configuration.getProperty("zookeeper.quorum");
+    protocol = configuration.getProperty(COLLECTOR_PROTOCOL, "http");
+    port = configuration.getProperty(COLLECTOR_PORT, "6188");
+
+    // Initialize the collector write strategy
+    super.init();
+
+    if (protocol.toLowerCase().startsWith("https://")) {
       String trustStorePath = 
configuration.getProperty(SSL_KEYSTORE_PATH_PROPERTY).trim();
       String trustStoreType = 
configuration.getProperty(SSL_KEYSTORE_TYPE_PROPERTY).trim();
       String trustStorePwd = 
configuration.getProperty(SSL_KEYSTORE_PASSWORD_PROPERTY).trim();

http://git-wip-us.apache.org/repos/asf/ambari/blob/954e61ee/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java
 
b/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java
index 923871e..4c546ad 100644
--- 
a/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java
+++ 
b/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java
@@ -46,14 +46,23 @@ public class StormTimelineMetricsReporter extends 
AbstractTimelineMetricsSink
   private String collectorUri;
   private String applicationId;
   private int timeoutSeconds;
+  private String port;
+  private String collectors;
+  private String zkQuorum;
+  private String protocol;
 
   public StormTimelineMetricsReporter() {
 
   }
 
   @Override
-  protected String getCollectorUri() {
-    return this.collectorUri;
+  protected String getCollectorUri(String host) {
+    return constructTimelineMetricUri(protocol, host, port);
+  }
+
+  @Override
+  protected String getCollectorProtocol() {
+    return protocol;
   }
 
   @Override
@@ -62,6 +71,22 @@ public class StormTimelineMetricsReporter extends 
AbstractTimelineMetricsSink
   }
 
   @Override
+  protected String getZookeeperQuorum() {
+    return zkQuorum;
+  }
+
+  @Override
+  protected String getConfiguredCollectors() {
+    return collectors;
+  }
+
+  @Override
+  protected String getHostname() {
+    return hostname;
+  }
+
+
+  @Override
   public void prepare(Object registrationArgument) {
     LOG.info("Preparing Storm Metrics Reporter");
     try {
@@ -75,24 +100,34 @@ public class StormTimelineMetricsReporter extends 
AbstractTimelineMetricsSink
         LOG.error("Could not identify hostname.");
         throw new RuntimeException("Could not identify hostname.", e);
       }
-      Configuration configuration = new 
Configuration("/storm-metrics2.properties");
-      String collector = 
configuration.getProperty(COLLECTOR_PROPERTY).toString();
-      timeoutSeconds = configuration.getProperty(METRICS_POST_TIMEOUT_SECONDS) 
!= null ?
-        
Integer.parseInt(configuration.getProperty(METRICS_POST_TIMEOUT_SECONDS).toString())
 :
+      Configuration conf = new Configuration("/storm-metrics2.properties");
+
+      collectors = conf.getProperty(COLLECTOR_PROPERTY);
+      protocol = conf.getProperty(COLLECTOR_PROTOCOL) != null ? 
conf.getProperty(COLLECTOR_PROTOCOL) : "http";
+      port = conf.getProperty(COLLECTOR_PORT) != null ? 
conf.getProperty(COLLECTOR_PORT) : "6188";
+      zkQuorum = conf.getProperty(ZOOKEEPER_QUORUM) != null ? 
conf.getProperty(ZOOKEEPER_QUORUM) : null;
+
+      timeoutSeconds = conf.getProperty(METRICS_POST_TIMEOUT_SECONDS) != null ?
+        
Integer.parseInt(conf.getProperty(METRICS_POST_TIMEOUT_SECONDS).toString()) :
         DEFAULT_POST_TIMEOUT_SECONDS;
-      applicationId = configuration.getProperty(CLUSTER_REPORTER_APP_ID, 
DEFAULT_CLUSTER_REPORTER_APP_ID);
-      collectorUri = collector + WS_V1_TIMELINE_METRICS;
+      applicationId = conf.getProperty(CLUSTER_REPORTER_APP_ID, 
DEFAULT_CLUSTER_REPORTER_APP_ID);
+
+      collectorUri = constructTimelineMetricUri(protocol, 
findPreferredCollectHost(), port);
+
       if (collectorUri.toLowerCase().startsWith("https://")) {
-        String trustStorePath = 
configuration.getProperty(SSL_KEYSTORE_PATH_PROPERTY).toString().trim();
-        String trustStoreType = 
configuration.getProperty(SSL_KEYSTORE_TYPE_PROPERTY).toString().trim();
-        String trustStorePwd = 
configuration.getProperty(SSL_KEYSTORE_PASSWORD_PROPERTY).toString().trim();
+        String trustStorePath = 
conf.getProperty(SSL_KEYSTORE_PATH_PROPERTY).toString().trim();
+        String trustStoreType = 
conf.getProperty(SSL_KEYSTORE_TYPE_PROPERTY).toString().trim();
+        String trustStorePwd = 
conf.getProperty(SSL_KEYSTORE_PASSWORD_PROPERTY).toString().trim();
         loadTruststore(trustStorePath, trustStoreType, trustStorePwd);
       }
+
+
     } catch (Exception e) {
       LOG.warn("Could not initialize metrics collector, please specify " +
         "protocol, host, port under $STORM_HOME/conf/config.yaml ", e);
     }
-
+    // Initialize the collector write strategy
+    super.init();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/ambari/blob/954e61ee/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
 
b/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
index 138c5a0..f8c34d5 100644
--- 
a/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
+++ 
b/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
@@ -63,18 +63,42 @@ public class StormTimelineMetricsSink extends 
AbstractTimelineMetricsSink implem
   private int timeoutSeconds;
   private String topologyName;
   private String applicationId;
+  private String collectors;
+  private String zkQuorum;
+  private String protocol;
+  private String port;
 
   @Override
-  protected String getCollectorUri() {
+  protected String getCollectorUri(String host) {
     return collectorUri;
   }
 
   @Override
+  protected String getCollectorProtocol() {
+    return protocol;
+  }
+
+  @Override
   protected int getTimeoutSeconds() {
     return timeoutSeconds;
   }
 
   @Override
+  protected String getZookeeperQuorum() {
+    return zkQuorum;
+  }
+
+  @Override
+  protected String getConfiguredCollectors() {
+    return collectors;
+  }
+
+  @Override
+  protected String getHostname() {
+    return hostname;
+  }
+
+  @Override
   public void prepare(Map map, Object o, TopologyContext topologyContext, 
IErrorReporter iErrorReporter) {
     LOG.info("Preparing Storm Metrics Sink");
     try {
@@ -96,8 +120,16 @@ public class StormTimelineMetricsSink extends 
AbstractTimelineMetricsSink implem
         String.valueOf(MAX_EVICTION_TIME_MILLIS)));
     applicationId = configuration.getProperty(CLUSTER_REPORTER_APP_ID, 
DEFAULT_CLUSTER_REPORTER_APP_ID);
     metricsCache = new TimelineMetricsCache(maxRowCacheSize, 
metricsSendInterval);
-    collectorUri = configuration.getProperty(COLLECTOR_PROPERTY) + 
WS_V1_TIMELINE_METRICS;
-    if (collectorUri.toLowerCase().startsWith("https://")) {
+
+    collectors = configuration.getProperty(COLLECTOR_PROPERTY);
+    zkQuorum = configuration.getProperty("zookeeper.quorum");
+    protocol = configuration.getProperty(COLLECTOR_PROTOCOL, "http");
+    port = configuration.getProperty(COLLECTOR_PORT, "6188");
+
+    // Initialize the collector write strategy
+    super.init();
+
+    if (protocol.toLowerCase().startsWith("https://")) {
       String trustStorePath = 
configuration.getProperty(SSL_KEYSTORE_PATH_PROPERTY).trim();
       String trustStoreType = 
configuration.getProperty(SSL_KEYSTORE_TYPE_PROPERTY).trim();
       String trustStorePwd = 
configuration.getProperty(SSL_KEYSTORE_PASSWORD_PROPERTY).trim();

http://git-wip-us.apache.org/repos/asf/ambari/blob/954e61ee/ambari-metrics/ambari-metrics-storm-sink/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java
----------------------------------------------------------------------
diff --git 
a/ambari-metrics/ambari-metrics-storm-sink/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java
 
b/ambari-metrics/ambari-metrics-storm-sink/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java
index dca0c25..8e0bda6 100644
--- 
a/ambari-metrics/ambari-metrics-storm-sink/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java
+++ 
b/ambari-metrics/ambari-metrics-storm-sink/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java
@@ -18,6 +18,18 @@
 
 package org.apache.hadoop.metrics2.sink.storm;
 
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache;
+import org.apache.storm.metric.api.IMetricsConsumer;
+import org.apache.storm.shade.com.google.common.collect.Lists;
+import org.junit.Ignore;
+import org.junit.Test;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import static 
org.apache.hadoop.metrics2.sink.storm.StormTimelineMetricsSink.METRIC_NAME_PREFIX_KAFKA_OFFSET;
 import static 
org.apache.hadoop.metrics2.sink.storm.StormTimelineMetricsSink.SYSTEM_TASK_ID;
 import static org.easymock.EasyMock.anyObject;
@@ -27,23 +39,6 @@ import static org.easymock.EasyMock.expectLastCall;
 import static org.easymock.EasyMock.replay;
 import static org.easymock.EasyMock.verify;
 
-import java.io.IOException;
-import java.net.SocketAddress;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
-import org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache;
-import org.apache.storm.Constants;
-import org.apache.storm.shade.com.google.common.collect.Lists;
-import org.junit.Ignore;
-import org.junit.Test;
-
-import org.apache.storm.metric.api.IMetricsConsumer;
-
 public class StormTimelineMetricsSinkTest {
   @Test
   public void testNonNumericMetricMetricExclusion() throws 
InterruptedException, IOException {

Reply via email to