AMBARI-16828. Support round-robin scheduling with failover for Sinks with distributed collector. (swagle)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/954e61ee Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/954e61ee Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/954e61ee Branch: refs/heads/branch-2.5 Commit: 954e61ee07416bc20b52bfda84dde5ea57876130 Parents: 798a70e Author: Aravindan Vijayan <[email protected]> Authored: Mon Nov 14 20:34:09 2016 -0800 Committer: Aravindan Vijayan <[email protected]> Committed: Tue Nov 15 11:02:12 2016 -0800 ---------------------------------------------------------------------- .../logsearch/solr/metrics/SolrAmsClient.java | 23 +- ambari-metrics/ambari-metrics-common/pom.xml | 22 ++ .../timeline/AbstractTimelineMetricsSink.java | 272 +++++++++++++++++- .../MetricsSinkInitializationException.java | 25 ++ .../availability/MetricCollectorHAHelper.java | 96 +++++++ .../MetricCollectorUnavailableException.java | 24 ++ ...icSinkWriteShardHostnameHashingStrategy.java | 59 ++++ .../MetricSinkWriteShardStrategy.java | 24 ++ .../availability/MetricCollectorHATest.java | 149 ++++++++++ .../availability/ShardingStrategyTest.java | 52 ++++ .../cache/HandleConnectExceptionTest.java | 38 ++- .../sink/flume/FlumeTimelineMetricsSink.java | 39 ++- .../timeline/HadoopTimelineMetricsSink.java | 50 +++- .../timeline/HadoopTimelineMetricsSinkTest.java | 55 +++- .../kafka/KafkaTimelineMetricsReporter.java | 50 +++- .../kafka/KafkaTimelineMetricsReporterTest.java | 1 + .../storm/StormTimelineMetricsReporter.java | 44 ++- .../sink/storm/StormTimelineMetricsSink.java | 38 ++- .../storm/StormTimelineMetricsReporter.java | 59 +++- .../sink/storm/StormTimelineMetricsSink.java | 38 ++- .../storm/StormTimelineMetricsSinkTest.java | 29 +- .../timeline/HBaseTimelineMetricStore.java | 22 +- .../aggregators/AbstractTimelineAggregator.java | 4 +- .../TimelineMetricAggregatorFactory.java | 16 +- .../TimelineMetricClusterAggregator.java | 4 +- 
.../TimelineMetricClusterAggregatorSecond.java | 4 +- .../TimelineMetricHostAggregator.java | 4 +- .../v2/TimelineMetricClusterAggregator.java | 4 +- .../v2/TimelineMetricHostAggregator.java | 4 +- .../availability/AggregationTaskRunner.java | 6 +- .../MetricCollectorHAController.java | 276 +++++++++++++++++++ .../TimelineMetricHAController.java | 276 ------------------- .../webapp/TimelineWebServices.java | 8 + .../MetricCollectorHAControllerTest.java | 107 +++++++ .../TimelineMetricHAControllerTest.java | 107 ------- .../FLUME/1.4.0.2.0/package/scripts/params.py | 20 ++ .../templates/flume-metrics2.properties.j2 | 5 +- ...-metrics2-hbase.properties-GANGLIA-MASTER.j2 | 4 +- .../STORM/0.9.1/package/scripts/params_linux.py | 15 + .../templates/storm-metrics2.properties.j2 | 5 +- .../2.0.6/hooks/before-START/scripts/params.py | 21 ++ .../templates/hadoop-metrics2.properties.j2 | 17 +- 42 files changed, 1597 insertions(+), 519 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/954e61ee/ambari-logsearch/ambari-logsearch-portal/src/main/java/org/apache/ambari/logsearch/solr/metrics/SolrAmsClient.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-portal/src/main/java/org/apache/ambari/logsearch/solr/metrics/SolrAmsClient.java b/ambari-logsearch/ambari-logsearch-portal/src/main/java/org/apache/ambari/logsearch/solr/metrics/SolrAmsClient.java index cdeb63d..85ea69d 100644 --- a/ambari-logsearch/ambari-logsearch-portal/src/main/java/org/apache/ambari/logsearch/solr/metrics/SolrAmsClient.java +++ b/ambari-logsearch/ambari-logsearch-portal/src/main/java/org/apache/ambari/logsearch/solr/metrics/SolrAmsClient.java @@ -22,6 +22,7 @@ package org.apache.ambari.logsearch.solr.metrics; import org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink; import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics; +// 
TODO: Refactor for failover public class SolrAmsClient extends AbstractTimelineMetricsSink { private final String collectorHost; @@ -30,7 +31,7 @@ public class SolrAmsClient extends AbstractTimelineMetricsSink { } @Override - public String getCollectorUri() { + public String getCollectorUri(String host) { return collectorHost; } @@ -40,7 +41,27 @@ public class SolrAmsClient extends AbstractTimelineMetricsSink { } @Override + protected String getZookeeperQuorum() { + return null; + } + + @Override + protected String getConfiguredCollectors() { + return null; + } + + @Override + protected String getHostname() { + return null; + } + + @Override protected boolean emitMetrics(TimelineMetrics metrics) { return super.emitMetrics(metrics); } + + @Override + protected String getCollectorProtocol() { + return null; + } } http://git-wip-us.apache.org/repos/asf/ambari/blob/954e61ee/ambari-metrics/ambari-metrics-common/pom.xml ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-common/pom.xml b/ambari-metrics/ambari-metrics-common/pom.xml index fb01c81..b972fb2 100644 --- a/ambari-metrics/ambari-metrics-common/pom.xml +++ b/ambari-metrics/ambari-metrics-common/pom.xml @@ -62,6 +62,28 @@ <artifactId>commons-logging</artifactId> <version>1.1.1</version> </dependency> + <!-- TODO: Need to add these as shaded dependencies --> + <dependency> + <groupId>commons-io</groupId> + <artifactId>commons-io</artifactId> + <version>2.4</version> + </dependency> + <dependency> + <groupId>com.google.code.gson</groupId> + <artifactId>gson</artifactId> + <version>2.2.2</version> + </dependency> + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + <version>14.0.1</version> + </dependency> + <dependency> + <groupId>org.apache.curator</groupId> + <artifactId>curator-framework</artifactId> + <version>2.7.1</version> + </dependency> + <!-- END TODO --> <dependency> <groupId>org.codehaus.jackson</groupId> 
<artifactId>jackson-xc</artifactId> http://git-wip-us.apache.org/repos/asf/ambari/blob/954e61ee/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java index 5a716df..ca7ccea 100644 --- a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java +++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java @@ -17,8 +17,18 @@ */ package org.apache.hadoop.metrics2.sink.timeline; +import com.google.common.base.Supplier; +import com.google.common.base.Suppliers; +import com.google.common.reflect.TypeToken; +import com.google.gson.Gson; +import com.google.gson.JsonSyntaxException; +import org.apache.commons.io.IOUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.metrics2.sink.timeline.availability.MetricCollectorHAHelper; +import org.apache.hadoop.metrics2.sink.timeline.availability.MetricCollectorUnavailableException; +import org.apache.hadoop.metrics2.sink.timeline.availability.MetricSinkWriteShardHostnameHashingStrategy; +import org.apache.hadoop.metrics2.sink.timeline.availability.MetricSinkWriteShardStrategy; import org.codehaus.jackson.map.AnnotationIntrospector; import org.codehaus.jackson.map.ObjectMapper; import org.codehaus.jackson.map.annotate.JsonSerialize; @@ -35,9 +45,17 @@ import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.io.OutputStream; +import java.io.StringWriter; import java.net.HttpURLConnection; 
import java.net.URL; import java.security.KeyStore; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; public abstract class AbstractTimelineMetricsSink { @@ -46,20 +64,22 @@ public abstract class AbstractTimelineMetricsSink { public static final String METRICS_SEND_INTERVAL = "sendInterval"; public static final String METRICS_POST_TIMEOUT_SECONDS = "timeout"; public static final String COLLECTOR_PROPERTY = "collector"; + public static final String COLLECTOR_PROTOCOL = "protocol"; + public static final String COLLECTOR_PORT = "port"; + public static final String ZOOKEEPER_QUORUM = "zookeeper.quorum"; public static final int DEFAULT_POST_TIMEOUT_SECONDS = 10; public static final String SKIP_COUNTER_TRANSFROMATION = "skipCounterDerivative"; public static final String RPC_METRIC_PREFIX = "metric.rpc"; - public static final String RPC_METRIC_NAME_SUFFIX = "suffix"; - public static final String RPC_METRIC_PORT_SUFFIX = "port"; - public static final String WS_V1_TIMELINE_METRICS = "/ws/v1/timeline/metrics"; - public static final String SSL_KEYSTORE_PATH_PROPERTY = "truststore.path"; public static final String SSL_KEYSTORE_TYPE_PROPERTY = "truststore.type"; public static final String SSL_KEYSTORE_PASSWORD_PROPERTY = "truststore.password"; + public static final String COLLECTOR_LIVE_NODES_PATH = "/ws/v1/timeline/metrics/livenodes"; protected static final AtomicInteger failedCollectorConnectionsCounter = new AtomicInteger(0); public static int NUMBER_OF_SKIPPED_COLLECTOR_EXCEPTIONS = 100; + public int ZK_CONNECT_TRY_TIME = 10000; + public int ZK_SLEEP_BETWEEN_RETRY_TIME = 2000; private SSLSocketFactory sslSocketFactory; @@ -67,6 +87,28 @@ public abstract class AbstractTimelineMetricsSink { protected static ObjectMapper mapper; + protected MetricCollectorHAHelper collectorHAHelper; 
+ + protected MetricSinkWriteShardStrategy metricSinkWriteShardStrategy; + + // Single element cache with fixed expiration - Helps adjacent Sinks as + // well as timed refresh + protected Supplier targetCollectorHostSupplier; + + protected final List<String> allKnownLiveCollectors = new ArrayList<>(); + + private volatile boolean isInitializedForHA = false; + + @SuppressWarnings("all") + private final int RETRY_COUNT_BEFORE_COLLECTOR_FAILOVER = 5; + + private final Gson gson = new Gson(); + + private final Random rand = new Random(); + + private static final int COLLECTOR_HOST_CACHE_MAX_EXPIRATION_MINUTES = 75; + private static final int COLLECTOR_HOST_CACHE_MIN_EXPIRATION_MINUTES = 60; + static { mapper = new ObjectMapper(); AnnotationIntrospector introspector = new JaxbAnnotationIntrospector(); @@ -79,6 +121,16 @@ public abstract class AbstractTimelineMetricsSink { LOG = LogFactory.getLog(this.getClass()); } + /** + * Initialize Sink write strategy with respect to HA Collector + */ + protected void init() { + metricSinkWriteShardStrategy = new MetricSinkWriteShardHostnameHashingStrategy(getHostname()); + collectorHAHelper = new MetricCollectorHAHelper(getZookeeperQuorum(), + ZK_CONNECT_TRY_TIME, ZK_SLEEP_BETWEEN_RETRY_TIME); + isInitializedForHA = true; + } + protected boolean emitMetricsJson(String connectUrl, String jsonData) { int timeout = getTimeoutSeconds() * 1000; HttpURLConnection connection = null; @@ -113,7 +165,7 @@ public abstract class AbstractTimelineMetricsSink { } } cleanupInputStream(connection.getInputStream()); - //reset failedCollectorConnectionsCounter to "0" + // reset failedCollectorConnectionsCounter to "0" failedCollectorConnectionsCounter.set(0); return true; } catch (IOException ioe) { @@ -146,7 +198,20 @@ public abstract class AbstractTimelineMetricsSink { } protected boolean emitMetrics(TimelineMetrics metrics) { - String connectUrl = getCollectorUri(); + String collectorHost; + // Get cached target + if (targetCollectorHostSupplier 
!= null) { + collectorHost = (String) targetCollectorHostSupplier.get(); + // Last X attempts have failed - force refresh + if (failedCollectorConnectionsCounter.get() > RETRY_COUNT_BEFORE_COLLECTOR_FAILOVER) { + targetCollectorHostSupplier = null; + collectorHost = findPreferredCollectHost(); + } + } else { + collectorHost = findPreferredCollectHost(); + } + + String connectUrl = getCollectorUri(collectorHost); String jsonData = null; try { jsonData = mapper.writeValueAsString(metrics); @@ -196,8 +261,7 @@ public abstract class AbstractTimelineMetricsSink { protected HttpsURLConnection getSSLConnection(String spec) throws IOException, IllegalStateException { - HttpsURLConnection connection = (HttpsURLConnection) (new URL(spec) - .openConnection()); + HttpsURLConnection connection = (HttpsURLConnection) (new URL(spec).openConnection()); connection.setSSLSocketFactory(sslSocketFactory); @@ -208,11 +272,7 @@ public abstract class AbstractTimelineMetricsSink { String trustStorePassword) { if (sslSocketFactory == null) { if (trustStorePath == null || trustStorePassword == null) { - - String msg = - String.format("Can't load TrustStore. " + - "Truststore path or password is not set."); - + String msg = "Can't load TrustStore. Truststore path or password is not set."; LOG.error(msg); throw new IllegalStateException(msg); } @@ -242,7 +302,191 @@ public abstract class AbstractTimelineMetricsSink { } } - abstract protected String getCollectorUri(); + /** + * Find appropriate write shard for this sink using the {@link org.apache.hadoop.metrics2.sink.timeline.availability.MetricSinkWriteShardStrategy} + * + * 1. Use configured collector(s) to discover available collectors + * 2. If configured collector(s) are unresponsive check Zookeeper to find live hosts + * 3. Refresh known collector list using ZK + * 4. Default: Return configured collector with no side effect due to discovery. 
+ * + * throws {#link MetricsSinkInitializationException} if called before + * initialization, not other side effect + * + * @return String Collector hostname + */ + protected synchronized String findPreferredCollectHost() { + if (!isInitializedForHA) { + init(); + } + + // Auto expire and re-calculate after 1 hour + if (targetCollectorHostSupplier != null) { + Object targetCollector = targetCollectorHostSupplier.get(); + if (targetCollector != null) { + return (String) targetCollector; + } + } + + String configuredCollectors = getConfiguredCollectors(); + // Reach out to all configured collectors before Zookeeper + if (configuredCollectors != null && !configuredCollectors.isEmpty()) { + String collectorHosts = getConfiguredCollectors(); + if (!collectorHosts.isEmpty()) { + String[] hosts = collectorHosts.split(","); + for (String hostPortStr : hosts) { + if (hostPortStr != null && !hostPortStr.isEmpty()) { + String[] hostPortPair = hostPortStr.split(":"); + if (hostPortPair.length < 2) { + LOG.warn("Collector port is missing from the configuration."); + continue; + } + String hostStr = hostPortPair[0].trim(); + String portStr = hostPortPair[1].trim(); + // Check liveliness and get known instances + try { + Collection<String> liveHosts = findLiveCollectorHostsFromKnownCollector(hostStr, portStr); + // Update live Hosts - current host will already be a part of this + for (String host : liveHosts) { + allKnownLiveCollectors.add(host); + } + } catch (MetricCollectorUnavailableException e) { + allKnownLiveCollectors.remove(hostStr); + LOG.info("Collector " + hostStr + " is not longer live. 
Removing " + + "it from list of know live collector hosts : " + allKnownLiveCollectors); + } + } + } + } + } + + // Lookup Zookeeper for live hosts - max 10 seconds wait time + if (allKnownLiveCollectors.size() == 0) { + allKnownLiveCollectors.addAll(collectorHAHelper.findLiveCollectorHostsFromZNode()); + } + + if (allKnownLiveCollectors.size() != 0) { + targetCollectorHostSupplier = Suppliers.memoizeWithExpiration( + new Supplier() { + @Override + public Object get() { + return metricSinkWriteShardStrategy.findCollectorShard(new ArrayList<>(allKnownLiveCollectors)); + } + }, // random.nextInt(max - min + 1) + min # (60 to 75 minutes) + rand.nextInt(COLLECTOR_HOST_CACHE_MAX_EXPIRATION_MINUTES + - COLLECTOR_HOST_CACHE_MIN_EXPIRATION_MINUTES + 1) + + COLLECTOR_HOST_CACHE_MIN_EXPIRATION_MINUTES, + TimeUnit.MINUTES + ); + + return (String) targetCollectorHostSupplier.get(); + } + return null; + } + + Collection<String> findLiveCollectorHostsFromKnownCollector(String host, String port) throws MetricCollectorUnavailableException { + List<String> collectors = new ArrayList<>(); + HttpURLConnection connection = null; + StringBuilder sb = new StringBuilder(getCollectorProtocol()); + sb.append("://"); + sb.append(host); + sb.append(":"); + sb.append(port); + sb.append(COLLECTOR_LIVE_NODES_PATH); + String connectUrl = sb.toString(); + LOG.debug("Requesting live collector nodes : " + connectUrl); + try { + connection = getCollectorProtocol().startsWith("https") ? 
+ getSSLConnection(connectUrl) : getConnection(connectUrl); + + connection.setRequestMethod("GET"); + // 5 seconds for this op is plenty of wait time + connection.setConnectTimeout(3000); + connection.setReadTimeout(2000); + + int responseCode = connection.getResponseCode(); + if (responseCode == 200) { + try (InputStream in = connection.getInputStream()) { + StringWriter writer = new StringWriter(); + IOUtils.copy(in, writer); + try { + collectors = gson.fromJson(writer.toString(), new TypeToken<List<String>>(){}.getType()); + } catch (JsonSyntaxException jse) { + // Swallow this at the behest of still trying to POST + LOG.debug("Exception deserializing the json data on live " + + "collector nodes.", jse); + } + } + } + + } catch (IOException ioe) { + StringBuilder errorMessage = + new StringBuilder("Unable to connect to collector, " + connectUrl); + try { + if ((connection != null)) { + errorMessage.append(cleanupInputStream(connection.getErrorStream())); + } + } catch (IOException e) { + //NOP + } + LOG.debug(errorMessage); + LOG.debug(ioe); + String warnMsg = "Unable to connect to collector to find live nodes."; + LOG.warn(warnMsg, ioe); + throw new MetricCollectorUnavailableException(warnMsg); + } + return collectors; + } + + // Constructing without UriBuilder to avoid unfavorable httpclient + // dependencies + protected String constructTimelineMetricUri(String protocol, String host, String port) { + StringBuilder sb = new StringBuilder(protocol); + sb.append("://"); + sb.append(host); + sb.append(":"); + sb.append(port); + sb.append(WS_V1_TIMELINE_METRICS); + return sb.toString(); + } + + protected String constructContainerMetricUri(String protocol, String host, String port) { + StringBuilder sb = new StringBuilder(protocol); + sb.append("://"); + sb.append(host); + sb.append(":"); + sb.append(port); + sb.append(WS_V1_TIMELINE_METRICS); + return sb.toString(); + } + /** + * Get a pre-formatted URI for the collector + */ + abstract protected String 
getCollectorUri(String host); + + abstract protected String getCollectorProtocol(); + + /** + * How soon to timeout on the emit calls in seconds. + */ abstract protected int getTimeoutSeconds(); + + /** + * Get the zookeeper quorum for the cluster used to find collector + * @return String "host1:port1,host2:port2" + */ + abstract protected String getZookeeperQuorum(); + + /** + * Get pre-configured list of collectors available + * @return String "host1:port,host2:port" + */ + abstract protected String getConfiguredCollectors(); + + /** + * Get hostname used for calculating write shard. + * @return String "host1" + */ + abstract protected String getHostname(); } http://git-wip-us.apache.org/repos/asf/ambari/blob/954e61ee/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/MetricsSinkInitializationException.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/MetricsSinkInitializationException.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/MetricsSinkInitializationException.java new file mode 100644 index 0000000..5760b34 --- /dev/null +++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/MetricsSinkInitializationException.java @@ -0,0 +1,25 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.metrics2.sink.timeline; + +public class MetricsSinkInitializationException extends RuntimeException { + // Default constructor + public MetricsSinkInitializationException(String message) { + super(message); + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/954e61ee/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/availability/MetricCollectorHAHelper.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/availability/MetricCollectorHAHelper.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/availability/MetricCollectorHAHelper.java new file mode 100644 index 0000000..7b13362 --- /dev/null +++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/availability/MetricCollectorHAHelper.java @@ -0,0 +1,96 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.metrics2.sink.timeline.availability; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.curator.CuratorZookeeperClient; +import org.apache.curator.RetryLoop; +import org.apache.curator.RetryPolicy; +import org.apache.curator.retry.RetryOneTime; +import org.apache.curator.retry.RetryUntilElapsed; +import org.apache.zookeeper.ZooKeeper; + +import java.net.HttpURLConnection; +import java.util.Collection; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.Callable; + +/** + * Find a live Collector instance from Zookeeper + * This class allows connect to ZK on-demand and + * does not add a watcher on the znode. 
+ */ +public class MetricCollectorHAHelper { + private final String zookeeperQuorum; + private final int tryTime; + private final int sleepMsBetweenRetries; + + private static final int CONNECTION_TIMEOUT = 2000; + private static final int SESSION_TIMEOUT = 5000; + private static final String ZK_PATH = "/ambari-metrics-cluster/LIVEINSTANCES"; + private static final String INSTANCE_NAME_DELIMITER = "_"; + + + + private static final Log LOG = LogFactory.getLog(MetricCollectorHAHelper.class); + + public MetricCollectorHAHelper(String zookeeperQuorum, int tryTime, int sleepMsBetweenRetries) { + this.zookeeperQuorum = zookeeperQuorum; + this.tryTime = tryTime; + this.sleepMsBetweenRetries = sleepMsBetweenRetries; + } + + /** + * Connect to Zookeeper to find live instances of metrics collector + * @return {#link Collection} hostnames + */ + public Collection<String> findLiveCollectorHostsFromZNode() { + Set<String> collectors = new HashSet<>(); + + RetryPolicy retryPolicy = new RetryUntilElapsed(tryTime, sleepMsBetweenRetries); + final CuratorZookeeperClient client = new CuratorZookeeperClient(zookeeperQuorum, + SESSION_TIMEOUT, CONNECTION_TIMEOUT, null, retryPolicy); + + String liveInstances = null; + + try { + liveInstances = RetryLoop.callWithRetry(client, new Callable<String>() { + @Override + public String call() throws Exception { + ZooKeeper zookeeper = client.getZooKeeper(); + byte[] data = zookeeper.getData(ZK_PATH, null, null); + return data != null ? 
new String(data) : null; + } + }); + } catch (Exception e) { + LOG.warn("Unable to connect to zookeeper.", e); + LOG.debug(e); + } + + // [ambari-sid-3.c.pramod-thangali.internal_12001] + if (liveInstances != null && !liveInstances.isEmpty()) { + for (String instanceStr : liveInstances.split(",")) { + collectors.add(instanceStr.substring(0, instanceStr.indexOf(INSTANCE_NAME_DELIMITER))); + } + } + + return collectors; + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/954e61ee/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/availability/MetricCollectorUnavailableException.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/availability/MetricCollectorUnavailableException.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/availability/MetricCollectorUnavailableException.java new file mode 100644 index 0000000..c381bbb --- /dev/null +++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/availability/MetricCollectorUnavailableException.java @@ -0,0 +1,24 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.metrics2.sink.timeline.availability; + +public class MetricCollectorUnavailableException extends Exception { + public MetricCollectorUnavailableException(String message) { + super(message); + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/954e61ee/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/availability/MetricSinkWriteShardHostnameHashingStrategy.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/availability/MetricSinkWriteShardHostnameHashingStrategy.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/availability/MetricSinkWriteShardHostnameHashingStrategy.java new file mode 100644 index 0000000..1c89884 --- /dev/null +++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/availability/MetricSinkWriteShardHostnameHashingStrategy.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.metrics2.sink.timeline.availability; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import java.util.List; + +/** + * Provides sharding based on hostname + */ +public class MetricSinkWriteShardHostnameHashingStrategy implements MetricSinkWriteShardStrategy { + private final String hostname; + private final long hostnameHash; + private static final Log LOG = LogFactory.getLog(MetricSinkWriteShardHostnameHashingStrategy.class); + + public MetricSinkWriteShardHostnameHashingStrategy(String hostname) { + this.hostname = hostname; + this.hostnameHash = hostname != null ? computeHash(hostname) : 1000; // some constant + } + + @Override + public String findCollectorShard(List<String> collectorHosts) { + int index = (int) (hostnameHash % collectorHosts.size()); + String collectorHost = collectorHosts.get(index); + LOG.info(String.format("Calculated collector shard %s based on hostname: %s", collectorHost, hostname)); + return collectorHost; + } + + /** + * Compute consistent hash based on hostname which should give decently + * uniform distribution assuming hostname generally have a sequential + * numeric suffix. 
+ */ + long computeHash(String hostname) { + long h = 11987L; // prime + int len = hostname.length(); + + for (int i = 0; i < len; i++) { + h = 31 * h + hostname.charAt(i); + } + return h; + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/954e61ee/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/availability/MetricSinkWriteShardStrategy.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/availability/MetricSinkWriteShardStrategy.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/availability/MetricSinkWriteShardStrategy.java new file mode 100644 index 0000000..7619555 --- /dev/null +++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/availability/MetricSinkWriteShardStrategy.java @@ -0,0 +1,24 @@ +package org.apache.hadoop.metrics2.sink.timeline.availability; + +import java.util.List; + +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+/**
+ * Strategy for selecting the collector host ("shard") to use out of a list
+ * of collector hosts.
+ */
+public interface MetricSinkWriteShardStrategy {
+  /**
+   * Chooses a collector for the given candidates.
+   *
+   * @param collectorHosts hostnames of the candidate collectors
+   * @return the selected collector host; the hostname-hashing implementation
+   *         returns one element of {@code collectorHosts}
+   */
+  String findCollectorShard(List<String> collectorHosts);
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/954e61ee/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/availability/MetricCollectorHATest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/availability/MetricCollectorHATest.java b/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/availability/MetricCollectorHATest.java
new file mode 100644
index 0000000..7fadeb2
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/availability/MetricCollectorHATest.java
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.sink.timeline.availability;
+
+import com.google.gson.Gson;
+import junit.framework.Assert;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.HashSet;
+import static org.easymock.EasyMock.expect;
+import static org.powermock.api.easymock.PowerMock.createNiceMock;
+import static org.powermock.api.easymock.PowerMock.expectNew;
+import static org.powermock.api.easymock.PowerMock.replayAll;
+import static org.powermock.api.easymock.PowerMock.verifyAll;
+
+/**
+ * Tests for collector discovery in {@link AbstractTimelineMetricsSink},
+ * driven through a minimal sink subclass with mocked URL / HTTP connection
+ * and a mocked {@link MetricCollectorHAHelper}.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({AbstractTimelineMetricsSink.class, URL.class, HttpURLConnection.class})
+public class MetricCollectorHATest {
+
+  /**
+   * The mocked HTTP connection throws an IOException from getResponseCode(),
+   * while the mocked HA helper reports h2 and h3 as live collectors; the
+   * preferred collector is then expected to be h2.
+   */
+  @Test
+  public void findCollectorUsingZKTest() throws Exception {
+    InputStream is = createNiceMock(InputStream.class);
+    HttpURLConnection connection = createNiceMock(HttpURLConnection.class);
+    URL url = createNiceMock(URL.class);
+    MetricCollectorHAHelper haHelper = createNiceMock(MetricCollectorHAHelper.class);
+
+    // Any URL constructed for the livenodes endpoint resolves to the mock
+    expectNew(URL.class, "http://localhost:2181/ws/v1/timeline/metrics/livenodes").andReturn(url).anyTimes();
+    expect(url.openConnection()).andReturn(connection).anyTimes();
+    expect(connection.getInputStream()).andReturn(is).anyTimes();
+    expect(connection.getResponseCode()).andThrow(new IOException()).anyTimes();
+    expect(haHelper.findLiveCollectorHostsFromZNode()).andReturn(
+      new ArrayList<String>() {{ add("h2"); add("h3"); }});
+
+    replayAll();
+    TestTimelineMetricsSink sink = new TestTimelineMetricsSink(haHelper);
+    sink.init();
+
+    String host = sink.findPreferredCollectHost();
+
+    verifyAll();
+
+    Assert.assertNotNull(host);
+    // NOTE(review): h2 is presumably the hostname-hash shard of "h1" over
+    // [h2, h3] - confirm against AbstractTimelineMetricsSink
+    Assert.assertEquals("h2", host);
+
+  }
+
+  /**
+   * The mocked HTTP connection answers 200 with the JSON list
+   * ["h1","h2","h3"]; the preferred collector is then expected to be h3.
+   */
+  @Test
+  public void findCollectorUsingKnownCollectorTest() throws Exception {
+    HttpURLConnection connection = createNiceMock(HttpURLConnection.class);
+    URL url = createNiceMock(URL.class);
+    MetricCollectorHAHelper haHelper = createNiceMock(MetricCollectorHAHelper.class);
+
+    // Response body served by the mocked connection
+    Gson gson = new Gson();
+    ArrayList<String> output = new ArrayList<>();
+    output.add("h1");
+    output.add("h2");
+    output.add("h3");
+    InputStream is = IOUtils.toInputStream(gson.toJson(output));
+
+    expectNew(URL.class, "http://localhost:2181/ws/v1/timeline/metrics/livenodes").andReturn(url).anyTimes();
+    expect(url.openConnection()).andReturn(connection).anyTimes();
+    expect(connection.getInputStream()).andReturn(is).anyTimes();
+    expect(connection.getResponseCode()).andReturn(200).anyTimes();
+
+    replayAll();
+    TestTimelineMetricsSink sink = new TestTimelineMetricsSink(haHelper);
+    sink.init();
+
+    String host = sink.findPreferredCollectHost();
+    Assert.assertNotNull(host);
+    // NOTE(review): h3 is presumably the hostname-hash shard of "h1" over
+    // [h1, h2, h3] - confirm against AbstractTimelineMetricsSink
+    Assert.assertEquals("h3", host);
+
+    verifyAll();
+  }
+
+  /**
+   * Sink stub with fixed configuration values that swaps in the supplied
+   * HA helper after the base class initialization.
+   */
+  private class TestTimelineMetricsSink extends AbstractTimelineMetricsSink {
+    MetricCollectorHAHelper testHelper;
+
+    TestTimelineMetricsSink(MetricCollectorHAHelper haHelper) {
+      testHelper = haHelper;
+    }
+
+    @Override
+    protected void init() {
+      super.init();
+      // Replace whatever helper super.init() set up with the test double
+      this.collectorHAHelper = testHelper;
+    }
+
+    // Overridden only to delegate, which makes the method callable from the test
+    @Override
+    protected synchronized String findPreferredCollectHost() {
+      return super.findPreferredCollectHost();
+    }
+
+    @Override
+    protected String getCollectorUri(String host) {
+      return null;
+    }
+
+    @Override
+    protected String getCollectorProtocol() {
+      return "http";
+    }
+
+    @Override
+    protected int getTimeoutSeconds() {
+      return 10;
+    }
+
+    @Override
+    protected String getZookeeperQuorum() {
+      return "localhost:2181";
+    }
+
+    // Same host:port as the mocked livenodes URL above
+    @Override
+    protected String getConfiguredCollectors() {
+      return "localhost:2181";
+    }
+
+    // Hostname of the sink itself
+    @Override
+    protected String getHostname() {
+      return "h1";
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/954e61ee/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/availability/ShardingStrategyTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/availability/ShardingStrategyTest.java b/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/availability/ShardingStrategyTest.java
new file mode 100644
index 0000000..c6041db
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/availability/ShardingStrategyTest.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.sink.timeline.availability;
+
+import junit.framework.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Exercises {@link MetricSinkWriteShardHostnameHashingStrategy} against a
+ * two-collector list.
+ */
+public class ShardingStrategyTest {
+  /**
+   * Verifies that the same hostname always resolves to the same collector
+   * and that the two sample hostnames land on different collectors.
+   */
+  @Test
+  public void testHostnameShardingStrategy() throws Exception {
+    List<String> collectors = new ArrayList<String>();
+    collectors.add("mycollector-1.hostname.domain");
+    collectors.add("mycollector-2.hostname.domain");
+
+    String firstHost = "some-very-long-hostname-with-a-trailing-number-identifier-10.mylocalhost.domain";
+    String secondHost = "some-very-long-hostname-with-a-trailing-number-identifier-20.mylocalhost.domain";
+
+    // Consistency check: a fresh strategy per round, two lookups per strategy
+    String firstShard = null;
+    int round = 0;
+    while (round < 100) {
+      MetricSinkWriteShardStrategy hashing = new MetricSinkWriteShardHostnameHashingStrategy(firstHost);
+      firstShard = hashing.findCollectorShard(collectors);
+      Assert.assertEquals(firstShard, hashing.findCollectorShard(collectors));
+      round++;
+    }
+
+    // Shard the second host
+    MetricSinkWriteShardStrategy secondHashing = new MetricSinkWriteShardHostnameHashingStrategy(secondHost);
+    String secondShard = secondHashing.findCollectorShard(collectors);
+
+    Assert.assertEquals("mycollector-1.hostname.domain", firstShard);
+    Assert.assertEquals("mycollector-2.hostname.domain", secondShard);
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/954e61ee/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/cache/HandleConnectExceptionTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/cache/HandleConnectExceptionTest.java b/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/cache/HandleConnectExceptionTest.java
index a192802..ccaa574 100644
--- 
a/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/cache/HandleConnectExceptionTest.java +++ b/ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/cache/HandleConnectExceptionTest.java @@ -33,14 +33,14 @@ import org.junit.runner.RunWith; import org.powermock.core.classloader.annotations.PrepareForTest; import org.powermock.modules.junit4.PowerMockRunner; +import static org.easymock.EasyMock.anyString; import static org.easymock.EasyMock.expect; import static org.powermock.api.easymock.PowerMock.createNiceMock; import static org.powermock.api.easymock.PowerMock.expectNew; import static org.powermock.api.easymock.PowerMock.replayAll; @RunWith(PowerMockRunner.class) -@PrepareForTest({AbstractTimelineMetricsSink.class, URL.class, - HttpURLConnection.class}) +@PrepareForTest({AbstractTimelineMetricsSink.class, URL.class, HttpURLConnection.class}) public class HandleConnectExceptionTest { private static final String COLLECTOR_URL = "collector"; private TestTimelineMetricsSink sink; @@ -53,7 +53,7 @@ public class HandleConnectExceptionTest { URL url = createNiceMock(URL.class); AbstractTimelineMetricsSink.NUMBER_OF_SKIPPED_COLLECTOR_EXCEPTIONS = 2; try { - expectNew(URL.class, "collector").andReturn(url).anyTimes(); + expectNew(URL.class, anyString()).andReturn(url).anyTimes(); expect(url.openConnection()).andReturn(connection).anyTimes(); expect(connection.getOutputStream()).andReturn(os).anyTimes(); expect(connection.getResponseCode()).andThrow(new IOException()).anyTimes(); @@ -79,27 +79,51 @@ public class HandleConnectExceptionTest { try{ sink.emitMetrics(timelineMetrics); Assert.fail(); - }catch(UnableToConnectException e){ + } catch (UnableToConnectException e){ Assert.assertEquals(COLLECTOR_URL, e.getConnectUrl()); - }catch(Exception e){ + } catch (Exception e){ + e.printStackTrace(); Assert.fail(e.getMessage()); } } - class TestTimelineMetricsSink extends 
AbstractTimelineMetricsSink{ + private class TestTimelineMetricsSink extends AbstractTimelineMetricsSink{ @Override - protected String getCollectorUri() { + protected String getCollectorUri(String host) { return COLLECTOR_URL; } @Override + protected String getCollectorProtocol() { + return "http"; + } + + @Override protected int getTimeoutSeconds() { return 10; } @Override + protected String getZookeeperQuorum() { + return "localhost:2181"; + } + + @Override + protected String getConfiguredCollectors() { + return "localhost:2181"; + } + + @Override + protected String getHostname() { + return "h1"; + } + + @Override public boolean emitMetrics(TimelineMetrics metrics) { + super.init(); return super.emitMetrics(metrics); } + + } } http://git-wip-us.apache.org/repos/asf/ambari/blob/954e61ee/ambari-metrics/ambari-metrics-flume-sink/src/main/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSink.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-flume-sink/src/main/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSink.java b/ambari-metrics/ambari-metrics-flume-sink/src/main/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSink.java index 3040c48..1b36e9a 100644 --- a/ambari-metrics/ambari-metrics-flume-sink/src/main/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSink.java +++ b/ambari-metrics/ambari-metrics-flume-sink/src/main/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSink.java @@ -46,6 +46,7 @@ import java.util.concurrent.TimeUnit; public class FlumeTimelineMetricsSink extends AbstractTimelineMetricsSink implements MonitorService { private String collectorUri; + private String protocol; // Key - component(instance_id) private Map<String, TimelineMetricsCache> metricsCaches; private int maxRowCacheSize; @@ -53,6 +54,9 @@ public class FlumeTimelineMetricsSink extends AbstractTimelineMetricsSink implem private 
ScheduledExecutorService scheduledExecutorService; private long pollFrequency; private String hostname; + private String port; + private String collectors; + private String zookeeperQuorum; private final static String COUNTER_METRICS_PROPERTY = "counters"; private final Set<String> counterMetrics = new HashSet<String>(); private int timeoutSeconds = 10; @@ -95,8 +99,15 @@ public class FlumeTimelineMetricsSink extends AbstractTimelineMetricsSink implem metricsSendInterval = Integer.parseInt(configuration.getProperty(METRICS_SEND_INTERVAL, String.valueOf(TimelineMetricsCache.MAX_EVICTION_TIME_MILLIS))); metricsCaches = new HashMap<String, TimelineMetricsCache>(); - collectorUri = configuration.getProperty(COLLECTOR_PROPERTY) + WS_V1_TIMELINE_METRICS; - if (collectorUri.toLowerCase().startsWith("https://")) { + collectors = configuration.getProperty(COLLECTOR_PROPERTY); + zookeeperQuorum = configuration.getProperty("zookeeper.quorum"); + protocol = configuration.getProperty(COLLECTOR_PROTOCOL, "http"); + port = configuration.getProperty(COLLECTOR_PORT, "6188"); + // Initialize the collector write strategy + super.init(); + + collectorUri = constructTimelineMetricUri(protocol, findPreferredCollectHost(), port); + if (protocol.contains("https")) { String trustStorePath = configuration.getProperty(SSL_KEYSTORE_PATH_PROPERTY).trim(); String trustStoreType = configuration.getProperty(SSL_KEYSTORE_TYPE_PROPERTY).trim(); String trustStorePwd = configuration.getProperty(SSL_KEYSTORE_PASSWORD_PROPERTY).trim(); @@ -109,8 +120,13 @@ public class FlumeTimelineMetricsSink extends AbstractTimelineMetricsSink implem } @Override - public String getCollectorUri() { - return collectorUri; + public String getCollectorUri(String host) { + return constructTimelineMetricUri(protocol, host, port); + } + + @Override + protected String getCollectorProtocol() { + return protocol; } @Override @@ -118,6 +134,21 @@ public class FlumeTimelineMetricsSink extends AbstractTimelineMetricsSink implem 
return timeoutSeconds; } + @Override + protected String getZookeeperQuorum() { + return zookeeperQuorum; + } + + @Override + protected String getConfiguredCollectors() { + return collectors; + } + + @Override + protected String getHostname() { + return hostname; + } + public void setPollFrequency(long pollFrequency) { this.pollFrequency = pollFrequency; } http://git-wip-us.apache.org/repos/asf/ambari/blob/954e61ee/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java b/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java index 65f93f9..8e78e6f 100644 --- a/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java +++ b/ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java @@ -28,14 +28,9 @@ import org.apache.hadoop.metrics2.MetricsSink; import org.apache.hadoop.metrics2.MetricsTag; import org.apache.hadoop.metrics2.impl.MsInfo; import org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache; -import org.apache.hadoop.metrics2.util.Servers; import org.apache.hadoop.net.DNS; - import java.io.Closeable; import java.io.IOException; -import java.io.OutputStream; -import java.net.HttpURLConnection; -import java.net.SocketAddress; import java.net.UnknownHostException; import java.util.ArrayList; import java.util.Collection; @@ -57,9 +52,11 @@ public class HadoopTimelineMetricsSink extends AbstractTimelineMetricsSink imple private TimelineMetricsCache metricsCache; private String hostName = "UNKNOWN.example.com"; private String serviceName = ""; - private List<? 
extends SocketAddress> metricsServers; + private String collectors; private String collectorUri; private String containerMetricsUri; + private String protocol; + private String port; public static final String WS_V1_CONTAINER_METRICS = "/ws/v1/timeline/containermetrics"; private static final String SERVICE_NAME_PREFIX = "serviceName-prefix"; @@ -99,16 +96,21 @@ public class HadoopTimelineMetricsSink extends AbstractTimelineMetricsSink imple serviceName = getServiceName(conf); LOG.info("Identified hostname = " + hostName + ", serviceName = " + serviceName); + // Initialize the collector write strategy + super.init(); // Load collector configs - metricsServers = Servers.parse(conf.getString(COLLECTOR_PROPERTY), 6188); + protocol = conf.getString(COLLECTOR_PROTOCOL, "http"); + collectors = conf.getString(COLLECTOR_PROPERTY, "").trim(); + port = conf.getString(COLLECTOR_PORT, "6188"); - if (metricsServers == null || metricsServers.isEmpty()) { + if (StringUtils.isEmpty(collectors)) { LOG.error("No Metric collector configured."); } else { - collectorUri = conf.getString(COLLECTOR_PROPERTY).trim() + WS_V1_TIMELINE_METRICS; - containerMetricsUri = conf.getString(COLLECTOR_PROPERTY).trim() + WS_V1_CONTAINER_METRICS; - if (collectorUri.toLowerCase().startsWith("https://")) { + String preferredCollectorHost = findPreferredCollectHost(); + collectorUri = constructTimelineMetricUri(protocol, preferredCollectorHost, port); + containerMetricsUri = constructContainerMetricUri(protocol, preferredCollectorHost, port); + if (protocol.contains("https")) { String trustStorePath = conf.getString(SSL_KEYSTORE_PATH_PROPERTY).trim(); String trustStoreType = conf.getString(SSL_KEYSTORE_TYPE_PROPERTY).trim(); String trustStorePwd = conf.getString(SSL_KEYSTORE_PASSWORD_PROPERTY).trim(); @@ -163,6 +165,7 @@ public class HadoopTimelineMetricsSink extends AbstractTimelineMetricsSink imple } } } + if (!rpcPortSuffixes.isEmpty()) { LOG.info("RPC port properties configured: " + rpcPortSuffixes); } 
@@ -190,8 +193,13 @@ public class HadoopTimelineMetricsSink extends AbstractTimelineMetricsSink imple } @Override - protected String getCollectorUri() { - return collectorUri; + protected String getCollectorUri(String host) { + return constructTimelineMetricUri(protocol, host, port); + } + + @Override + protected String getCollectorProtocol() { + return protocol; } @Override @@ -200,6 +208,21 @@ public class HadoopTimelineMetricsSink extends AbstractTimelineMetricsSink imple } @Override + protected String getZookeeperQuorum() { + return conf.getString(ZOOKEEPER_QUORUM); + } + + @Override + protected String getConfiguredCollectors() { + return collectors; + } + + @Override + protected String getHostname() { + return hostName; + } + + @Override public void putMetrics(MetricsRecord record) { try { String recordName = record.name(); @@ -384,6 +407,7 @@ public class HadoopTimelineMetricsSink extends AbstractTimelineMetricsSink imple LOG.error("Unable to parse container metrics ", e); } if (jsonData != null) { + // TODO: Container metrics should be able to utilize failover mechanism emitMetricsJson(containerMetricsUri, jsonData); } } http://git-wip-us.apache.org/repos/asf/ambari/blob/954e61ee/ambari-metrics/ambari-metrics-hadoop-sink/src/test/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSinkTest.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-hadoop-sink/src/test/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSinkTest.java b/ambari-metrics/ambari-metrics-hadoop-sink/src/test/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSinkTest.java index 4410402..5f22065 100644 --- a/ambari-metrics/ambari-metrics-hadoop-sink/src/test/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSinkTest.java +++ b/ambari-metrics/ambari-metrics-hadoop-sink/src/test/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSinkTest.java @@ 
-18,12 +18,15 @@ package org.apache.hadoop.metrics2.sink.timeline; +import com.google.gson.Gson; import org.apache.commons.configuration.SubsetConfiguration; +import org.apache.commons.io.IOUtils; import org.apache.hadoop.metrics2.AbstractMetric; import org.apache.hadoop.metrics2.MetricType; import org.apache.hadoop.metrics2.MetricsInfo; import org.apache.hadoop.metrics2.MetricsRecord; import org.apache.hadoop.metrics2.MetricsTag; +import org.apache.hadoop.metrics2.sink.timeline.availability.MetricCollectorHAHelper; import org.apache.log4j.Level; import org.apache.log4j.Logger; import org.easymock.EasyMock; @@ -32,10 +35,13 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; +import org.powermock.api.easymock.PowerMock; import org.powermock.core.classloader.annotations.PrepareForTest; import org.powermock.modules.junit4.PowerMockRunner; +import java.io.InputStream; import java.io.OutputStream; +import java.net.HttpURLConnection; import java.net.URL; import java.util.ArrayList; import java.util.Arrays; @@ -46,11 +52,14 @@ import java.util.Iterator; import java.util.List; import java.util.Set; +import static org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink.COLLECTOR_PORT; import static org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink.COLLECTOR_PROPERTY; +import static org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink.COLLECTOR_PROTOCOL; import static org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink.MAX_METRIC_ROW_CACHE_SIZE; import static org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink.METRICS_SEND_INTERVAL; import static org.easymock.EasyMock.anyInt; import static org.easymock.EasyMock.anyObject; +import static org.easymock.EasyMock.anyString; import static org.easymock.EasyMock.createMockBuilder; import static org.easymock.EasyMock.createNiceMock; import static org.easymock.EasyMock.eq; @@ -58,9 +67,14 @@ 
import static org.easymock.EasyMock.expect; import static org.easymock.EasyMock.expectLastCall; import static org.easymock.EasyMock.replay; import static org.easymock.EasyMock.verify; +import static org.powermock.api.easymock.PowerMock.expectNew; +import static org.powermock.api.easymock.PowerMock.replayAll; +import static org.powermock.api.easymock.PowerMock.verifyAll; @RunWith(PowerMockRunner.class) +@PrepareForTest({AbstractTimelineMetricsSink.class, HttpURLConnection.class}) public class HadoopTimelineMetricsSinkTest { + Gson gson = new Gson(); @Before public void setup() { @@ -68,16 +82,28 @@ public class HadoopTimelineMetricsSinkTest { } @Test - @PrepareForTest({URL.class, OutputStream.class}) + @PrepareForTest({URL.class, OutputStream.class, AbstractTimelineMetricsSink.class, HttpURLConnection.class}) public void testPutMetrics() throws Exception { HadoopTimelineMetricsSink sink = new HadoopTimelineMetricsSink(); + HttpURLConnection connection = PowerMock.createNiceMock(HttpURLConnection.class); + URL url = PowerMock.createNiceMock(URL.class); + InputStream is = IOUtils.toInputStream(gson.toJson(Collections.singletonList("localhost"))); + expectNew(URL.class, anyString()).andReturn(url).anyTimes(); + expect(url.openConnection()).andReturn(connection).anyTimes(); + expect(connection.getInputStream()).andReturn(is).anyTimes(); + expect(connection.getResponseCode()).andReturn(200).anyTimes(); + OutputStream os = PowerMock.createNiceMock(OutputStream.class); + expect(connection.getOutputStream()).andReturn(os).anyTimes(); + SubsetConfiguration conf = createNiceMock(SubsetConfiguration.class); - expect(conf.getString(eq("slave.host.name"))).andReturn("testhost").anyTimes(); + expect(conf.getString("slave.host.name")).andReturn("localhost").anyTimes(); expect(conf.getParent()).andReturn(null).anyTimes(); expect(conf.getPrefix()).andReturn("service").anyTimes(); - expect(conf.getString(eq(COLLECTOR_PROPERTY))).andReturn("localhost:63188").anyTimes(); + 
expect(conf.getString(eq(COLLECTOR_PROPERTY), eq(""))).andReturn("localhost:6188").anyTimes(); expect(conf.getString(eq("serviceName-prefix"), eq(""))).andReturn("").anyTimes(); + expect(conf.getString(eq(COLLECTOR_PROTOCOL), eq("http"))).andReturn("http").anyTimes(); + expect(conf.getString(eq(COLLECTOR_PORT), eq("6188"))).andReturn("6188").anyTimes(); expect(conf.getInt(eq(MAX_METRIC_ROW_CACHE_SIZE), anyInt())).andReturn(10).anyTimes(); expect(conf.getInt(eq(METRICS_SEND_INTERVAL), anyInt())).andReturn(1000).anyTimes(); @@ -121,6 +147,7 @@ public class HadoopTimelineMetricsSinkTest { expect(record.metrics()).andReturn(Arrays.asList(metric)).anyTimes(); replay(conf, record, metric); + replayAll(); sink.init(conf); @@ -130,7 +157,7 @@ public class HadoopTimelineMetricsSinkTest { sink.putMetrics(record); - verify(conf, record, metric); + verifyAll(); } @Test @@ -138,20 +165,26 @@ public class HadoopTimelineMetricsSinkTest { HadoopTimelineMetricsSink sink = createMockBuilder(HadoopTimelineMetricsSink.class) .withConstructor().addMockedMethod("appendPrefix") + .addMockedMethod("findLiveCollectorHostsFromKnownCollector") .addMockedMethod("emitMetrics").createNiceMock(); SubsetConfiguration conf = createNiceMock(SubsetConfiguration.class); - expect(conf.getString(eq("slave.host.name"))).andReturn("testhost").anyTimes(); + expect(conf.getString("slave.host.name")).andReturn("localhost").anyTimes(); expect(conf.getParent()).andReturn(null).anyTimes(); expect(conf.getPrefix()).andReturn("service").anyTimes(); - expect(conf.getString(eq(COLLECTOR_PROPERTY))).andReturn("localhost:63188").anyTimes(); + expect(conf.getString(eq(COLLECTOR_PROPERTY), eq(""))).andReturn("localhost:6188").anyTimes(); expect(conf.getString(eq("serviceName-prefix"), eq(""))).andReturn("").anyTimes(); + expect(conf.getString(eq(COLLECTOR_PROTOCOL), eq("http"))).andReturn("http").anyTimes(); + expect(conf.getString(eq(COLLECTOR_PORT), eq("6188"))).andReturn("6188").anyTimes(); 
expect(conf.getInt(eq(MAX_METRIC_ROW_CACHE_SIZE), anyInt())).andReturn(10).anyTimes(); // Return eviction time smaller than time diff for first 3 entries // Third entry will result in eviction expect(conf.getInt(eq(METRICS_SEND_INTERVAL), anyInt())).andReturn(10).anyTimes(); + expect(sink.findLiveCollectorHostsFromKnownCollector("localhost", "6188")) + .andReturn(Collections.singletonList("localhost")).anyTimes(); + conf.setListDelimiter(eq(',')); expectLastCall().anyTimes(); @@ -260,14 +293,20 @@ public class HadoopTimelineMetricsSinkTest { HadoopTimelineMetricsSink sink = createMockBuilder(HadoopTimelineMetricsSink.class) .withConstructor().addMockedMethod("appendPrefix") + .addMockedMethod("findLiveCollectorHostsFromKnownCollector") .addMockedMethod("emitMetrics").createNiceMock(); SubsetConfiguration conf = createNiceMock(SubsetConfiguration.class); - expect(conf.getString(eq("slave.host.name"))).andReturn("testhost").anyTimes(); + expect(conf.getString("slave.host.name")).andReturn("localhost").anyTimes(); expect(conf.getParent()).andReturn(null).anyTimes(); expect(conf.getPrefix()).andReturn("service").anyTimes(); - expect(conf.getString(eq(COLLECTOR_PROPERTY))).andReturn("localhost:63188").anyTimes(); + expect(conf.getString(eq(COLLECTOR_PROPERTY), eq(""))).andReturn("localhost:6188").anyTimes(); expect(conf.getString(eq("serviceName-prefix"), eq(""))).andReturn("").anyTimes(); + expect(conf.getString(eq(COLLECTOR_PROTOCOL), eq("http"))).andReturn("http").anyTimes(); + expect(conf.getString(eq(COLLECTOR_PORT), eq("6188"))).andReturn("6188").anyTimes(); + + expect(sink.findLiveCollectorHostsFromKnownCollector("localhost", "6188")) + .andReturn(Collections.singletonList("localhost")).anyTimes(); expect(conf.getInt(eq(MAX_METRIC_ROW_CACHE_SIZE), anyInt())).andReturn(10).anyTimes(); expect(conf.getInt(eq(METRICS_SEND_INTERVAL), anyInt())).andReturn(10).anyTimes(); 
http://git-wip-us.apache.org/repos/asf/ambari/blob/954e61ee/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java b/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java index d6d251c..11a1c75 100644 --- a/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java +++ b/ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java @@ -71,14 +71,18 @@ public class KafkaTimelineMetricsReporter extends AbstractTimelineMetricsSink private static final String TIMELINE_DEFAULT_PORT = "6188"; private static final String TIMELINE_DEFAULT_PROTOCOL = "http"; - private boolean initialized = false; + private volatile boolean initialized = false; private boolean running = false; private final Object lock = new Object(); private String collectorUri; private String hostname; + private String metricCollectorPort; + private String collectors; + private String metricCollectorProtocol; private TimelineScheduledReporter reporter; private TimelineMetricsCache metricsCache; private int timeoutSeconds = 10; + private String zookeeperQuorum; private String[] excludedMetricsPrefixes; private String[] includedMetricsPrefixes; @@ -86,8 +90,13 @@ public class KafkaTimelineMetricsReporter extends AbstractTimelineMetricsSink private Set<String> excludedMetrics = new HashSet<>(); @Override - protected String getCollectorUri() { - return collectorUri; + protected String getCollectorUri(String host) { + return constructTimelineMetricUri(metricCollectorProtocol, host, metricCollectorPort); + } + + @Override + 
protected String getCollectorProtocol() { + return metricCollectorProtocol; } @Override @@ -95,10 +104,26 @@ public class KafkaTimelineMetricsReporter extends AbstractTimelineMetricsSink return timeoutSeconds; } + @Override + protected String getZookeeperQuorum() { + return zookeeperQuorum; + } + + @Override + protected String getConfiguredCollectors() { + return collectors; + } + + @Override + protected String getHostname() { + return hostname; + } + public void setMetricsCache(TimelineMetricsCache metricsCache) { this.metricsCache = metricsCache; } + @Override public void init(VerifiableProperties props) { synchronized (lock) { if (!initialized) { @@ -113,26 +138,33 @@ public class KafkaTimelineMetricsReporter extends AbstractTimelineMetricsSink LOG.error("Could not identify hostname."); throw new RuntimeException("Could not identify hostname.", e); } + // Initialize the collector write strategy + super.init(); + KafkaMetricsConfig metricsConfig = new KafkaMetricsConfig(props); timeoutSeconds = props.getInt(METRICS_POST_TIMEOUT_SECONDS, DEFAULT_POST_TIMEOUT_SECONDS); int metricsSendInterval = props.getInt(TIMELINE_METRICS_SEND_INTERVAL_PROPERTY, MAX_EVICTION_TIME_MILLIS); int maxRowCacheSize = props.getInt(TIMELINE_METRICS_MAX_ROW_CACHE_SIZE_PROPERTY, MAX_RECS_PER_NAME_DEFAULT); + + zookeeperQuorum = props.getString("zookeeper.connect"); + collectors = props.getString(TIMELINE_HOST_PROPERTY, TIMELINE_DEFAULT_HOST); + metricCollectorProtocol = props.getString(TIMELINE_PROTOCOL_PROPERTY, TIMELINE_DEFAULT_PROTOCOL); + String metricCollectorHost = props.getString(TIMELINE_HOST_PROPERTY, TIMELINE_DEFAULT_HOST); - String metricCollectorPort = props.getString(TIMELINE_PORT_PROPERTY, TIMELINE_DEFAULT_PORT); - String metricCollectorProtocol = props.getString(TIMELINE_PROTOCOL_PROPERTY, TIMELINE_DEFAULT_PROTOCOL); + metricCollectorPort = props.getString(TIMELINE_PORT_PROPERTY, TIMELINE_DEFAULT_PORT); + setMetricsCache(new TimelineMetricsCache(maxRowCacheSize, 
metricsSendInterval)); - collectorUri = metricCollectorProtocol + "://" + metricCollectorHost + - ":" + metricCollectorPort + WS_V1_TIMELINE_METRICS; + collectorUri = constructTimelineMetricUri(metricCollectorProtocol, + metricCollectorHost, metricCollectorPort); - if (collectorUri.toLowerCase().startsWith("https://")) { + if (metricCollectorProtocol.contains("https")) { String trustStorePath = props.getString(SSL_KEYSTORE_PATH_PROPERTY).trim(); String trustStoreType = props.getString(SSL_KEYSTORE_TYPE_PROPERTY).trim(); String trustStorePwd = props.getString(SSL_KEYSTORE_PASSWORD_PROPERTY).trim(); loadTruststore(trustStorePath, trustStoreType, trustStorePwd); } - // Exclusion policy String excludedMetricsStr = props.getString(EXCLUDED_METRICS_PROPERTY, ""); if (!StringUtils.isEmpty(excludedMetricsStr.trim())) { http://git-wip-us.apache.org/repos/asf/ambari/blob/954e61ee/ambari-metrics/ambari-metrics-kafka-sink/src/test/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporterTest.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-kafka-sink/src/test/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporterTest.java b/ambari-metrics/ambari-metrics-kafka-sink/src/test/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporterTest.java index e0adb4b..9027716 100644 --- a/ambari-metrics/ambari-metrics-kafka-sink/src/test/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporterTest.java +++ b/ambari-metrics/ambari-metrics-kafka-sink/src/test/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporterTest.java @@ -76,6 +76,7 @@ public class KafkaTimelineMetricsReporterTest { list.add(meter); list.add(timer); Properties properties = new Properties(); + properties.setProperty("zookeeper.connect", "localhost:2181"); properties.setProperty("kafka.timeline.metrics.sendInterval", "5900"); 
properties.setProperty("kafka.timeline.metrics.maxRowCacheSize", "10000"); properties.setProperty("kafka.timeline.metrics.host", "localhost"); http://git-wip-us.apache.org/repos/asf/ambari/blob/954e61ee/ambari-metrics/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java b/ambari-metrics/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java index ab5f1e4..4294837 100644 --- a/ambari-metrics/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java +++ b/ambari-metrics/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java @@ -47,22 +47,46 @@ public class StormTimelineMetricsReporter extends AbstractTimelineMetricsSink private NimbusClient nimbusClient; private String applicationId; private int timeoutSeconds; + private String port; + private String collectors; + private String zkQuorum; + private String protocol; public StormTimelineMetricsReporter() { } @Override - protected String getCollectorUri() { + protected String getCollectorUri(String host) { return this.collectorUri; } @Override + protected String getCollectorProtocol() { + return protocol; + } + + @Override protected int getTimeoutSeconds() { return timeoutSeconds; } @Override + protected String getZookeeperQuorum() { + return zkQuorum; + } + + @Override + protected String getConfiguredCollectors() { + return collectors; + } + + @Override + protected String getHostname() { + return hostname; + } + + @Override public void prepare(Map conf) { LOG.info("Preparing Storm Metrics Reporter"); try { @@ -80,18 +104,24 @@ public class 
StormTimelineMetricsReporter extends AbstractTimelineMetricsSink Map cf = (Map) conf.get(METRICS_COLLECTOR_CATEGORY); Map stormConf = Utils.readStormConfig(); this.nimbusClient = NimbusClient.getConfiguredClient(stormConf); - String collector = cf.get(COLLECTOR_PROPERTY).toString(); - timeoutSeconds = cf.get(METRICS_POST_TIMEOUT_SECONDS) != null ? - Integer.parseInt(cf.get(METRICS_POST_TIMEOUT_SECONDS).toString()) : - DEFAULT_POST_TIMEOUT_SECONDS; - applicationId = cf.get(APP_ID).toString(); - collectorUri = collector + WS_V1_TIMELINE_METRICS; + + collectors = cf.get(COLLECTOR_PROPERTY).toString(); + protocol = cf.get(COLLECTOR_PROTOCOL) != null ? cf.get(COLLECTOR_PROTOCOL).toString() : "http"; + port = cf.get(COLLECTOR_PORT) != null ? cf.get(COLLECTOR_PORT).toString() : "6188"; + zkQuorum = cf.get(ZOOKEEPER_QUORUM) != null ? cf.get(ZOOKEEPER_QUORUM).toString() : null; + if (collectorUri.toLowerCase().startsWith("https://")) { String trustStorePath = cf.get(SSL_KEYSTORE_PATH_PROPERTY).toString().trim(); String trustStoreType = cf.get(SSL_KEYSTORE_TYPE_PROPERTY).toString().trim(); String trustStorePwd = cf.get(SSL_KEYSTORE_PASSWORD_PROPERTY).toString().trim(); loadTruststore(trustStorePath, trustStoreType, trustStorePwd); } + + timeoutSeconds = cf.get(METRICS_POST_TIMEOUT_SECONDS) != null ? 
+ Integer.parseInt(cf.get(METRICS_POST_TIMEOUT_SECONDS).toString()) : + DEFAULT_POST_TIMEOUT_SECONDS; + applicationId = cf.get(APP_ID).toString(); + } catch (Exception e) { LOG.warn("Could not initialize metrics collector, please specify " + "protocol, host, port under $STORM_HOME/conf/config.yaml ", e); http://git-wip-us.apache.org/repos/asf/ambari/blob/954e61ee/ambari-metrics/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java b/ambari-metrics/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java index ea05491..80f0333 100644 --- a/ambari-metrics/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java +++ b/ambari-metrics/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java @@ -55,18 +55,42 @@ public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implem private int timeoutSeconds; private String topologyName; private String applicationId; + private String collectors; + private String zkQuorum; + private String protocol; + private String port; @Override - protected String getCollectorUri() { + protected String getCollectorUri(String host) { return collectorUri; } @Override + protected String getCollectorProtocol() { + return protocol; + } + + @Override protected int getTimeoutSeconds() { return timeoutSeconds; } @Override + protected String getZookeeperQuorum() { + return zkQuorum; + } + + @Override + protected String getConfiguredCollectors() { + return collectors; + } + + @Override + protected String getHostname() { + return hostname; + } + + @Override public void prepare(Map map, Object o, 
TopologyContext topologyContext, IErrorReporter iErrorReporter) { LOG.info("Preparing Storm Metrics Sink"); try { @@ -88,8 +112,16 @@ public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implem String.valueOf(MAX_EVICTION_TIME_MILLIS))); applicationId = configuration.getProperty(CLUSTER_REPORTER_APP_ID, DEFAULT_CLUSTER_REPORTER_APP_ID); metricsCache = new TimelineMetricsCache(maxRowCacheSize, metricsSendInterval); - collectorUri = configuration.getProperty(COLLECTOR_PROPERTY) + WS_V1_TIMELINE_METRICS; - if (collectorUri.toLowerCase().startsWith("https://")) { + + collectors = configuration.getProperty(COLLECTOR_PROPERTY); + zkQuorum = configuration.getProperty("zookeeper.quorum"); + protocol = configuration.getProperty(COLLECTOR_PROTOCOL, "http"); + port = configuration.getProperty(COLLECTOR_PORT, "6188"); + + // Initialize the collector write strategy + super.init(); + + if (protocol.toLowerCase().startsWith("https://")) { String trustStorePath = configuration.getProperty(SSL_KEYSTORE_PATH_PROPERTY).trim(); String trustStoreType = configuration.getProperty(SSL_KEYSTORE_TYPE_PROPERTY).trim(); String trustStorePwd = configuration.getProperty(SSL_KEYSTORE_PASSWORD_PROPERTY).trim(); http://git-wip-us.apache.org/repos/asf/ambari/blob/954e61ee/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java b/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java index 923871e..4c546ad 100644 --- a/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java +++ 
b/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java @@ -46,14 +46,23 @@ public class StormTimelineMetricsReporter extends AbstractTimelineMetricsSink private String collectorUri; private String applicationId; private int timeoutSeconds; + private String port; + private String collectors; + private String zkQuorum; + private String protocol; public StormTimelineMetricsReporter() { } @Override - protected String getCollectorUri() { - return this.collectorUri; + protected String getCollectorUri(String host) { + return constructTimelineMetricUri(protocol, host, port); + } + + @Override + protected String getCollectorProtocol() { + return protocol; } @Override @@ -62,6 +71,22 @@ public class StormTimelineMetricsReporter extends AbstractTimelineMetricsSink } @Override + protected String getZookeeperQuorum() { + return zkQuorum; + } + + @Override + protected String getConfiguredCollectors() { + return collectors; + } + + @Override + protected String getHostname() { + return hostname; + } + + + @Override public void prepare(Object registrationArgument) { LOG.info("Preparing Storm Metrics Reporter"); try { @@ -75,24 +100,34 @@ public class StormTimelineMetricsReporter extends AbstractTimelineMetricsSink LOG.error("Could not identify hostname."); throw new RuntimeException("Could not identify hostname.", e); } - Configuration configuration = new Configuration("/storm-metrics2.properties"); - String collector = configuration.getProperty(COLLECTOR_PROPERTY).toString(); - timeoutSeconds = configuration.getProperty(METRICS_POST_TIMEOUT_SECONDS) != null ? - Integer.parseInt(configuration.getProperty(METRICS_POST_TIMEOUT_SECONDS).toString()) : + Configuration conf = new Configuration("/storm-metrics2.properties"); + + collectors = conf.getProperty(COLLECTOR_PROPERTY); + protocol = conf.getProperty(COLLECTOR_PROTOCOL) != null ? 
conf.getProperty(COLLECTOR_PROTOCOL) : "http"; + port = conf.getProperty(COLLECTOR_PORT) != null ? conf.getProperty(COLLECTOR_PORT) : "6188"; + zkQuorum = conf.getProperty(ZOOKEEPER_QUORUM) != null ? conf.getProperty(ZOOKEEPER_QUORUM) : null; + + timeoutSeconds = conf.getProperty(METRICS_POST_TIMEOUT_SECONDS) != null ? + Integer.parseInt(conf.getProperty(METRICS_POST_TIMEOUT_SECONDS).toString()) : DEFAULT_POST_TIMEOUT_SECONDS; - applicationId = configuration.getProperty(CLUSTER_REPORTER_APP_ID, DEFAULT_CLUSTER_REPORTER_APP_ID); - collectorUri = collector + WS_V1_TIMELINE_METRICS; + applicationId = conf.getProperty(CLUSTER_REPORTER_APP_ID, DEFAULT_CLUSTER_REPORTER_APP_ID); + + collectorUri = constructTimelineMetricUri(protocol, findPreferredCollectHost(), port); + if (collectorUri.toLowerCase().startsWith("https://")) { - String trustStorePath = configuration.getProperty(SSL_KEYSTORE_PATH_PROPERTY).toString().trim(); - String trustStoreType = configuration.getProperty(SSL_KEYSTORE_TYPE_PROPERTY).toString().trim(); - String trustStorePwd = configuration.getProperty(SSL_KEYSTORE_PASSWORD_PROPERTY).toString().trim(); + String trustStorePath = conf.getProperty(SSL_KEYSTORE_PATH_PROPERTY).toString().trim(); + String trustStoreType = conf.getProperty(SSL_KEYSTORE_TYPE_PROPERTY).toString().trim(); + String trustStorePwd = conf.getProperty(SSL_KEYSTORE_PASSWORD_PROPERTY).toString().trim(); loadTruststore(trustStorePath, trustStoreType, trustStorePwd); } + + } catch (Exception e) { LOG.warn("Could not initialize metrics collector, please specify " + "protocol, host, port under $STORM_HOME/conf/config.yaml ", e); } - + // Initialize the collector write strategy + super.init(); } @Override http://git-wip-us.apache.org/repos/asf/ambari/blob/954e61ee/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java ---------------------------------------------------------------------- diff --git 
a/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java b/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java index 138c5a0..f8c34d5 100644 --- a/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java +++ b/ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java @@ -63,18 +63,42 @@ public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implem private int timeoutSeconds; private String topologyName; private String applicationId; + private String collectors; + private String zkQuorum; + private String protocol; + private String port; @Override - protected String getCollectorUri() { + protected String getCollectorUri(String host) { return collectorUri; } @Override + protected String getCollectorProtocol() { + return protocol; + } + + @Override protected int getTimeoutSeconds() { return timeoutSeconds; } @Override + protected String getZookeeperQuorum() { + return zkQuorum; + } + + @Override + protected String getConfiguredCollectors() { + return collectors; + } + + @Override + protected String getHostname() { + return hostname; + } + + @Override public void prepare(Map map, Object o, TopologyContext topologyContext, IErrorReporter iErrorReporter) { LOG.info("Preparing Storm Metrics Sink"); try { @@ -96,8 +120,16 @@ public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implem String.valueOf(MAX_EVICTION_TIME_MILLIS))); applicationId = configuration.getProperty(CLUSTER_REPORTER_APP_ID, DEFAULT_CLUSTER_REPORTER_APP_ID); metricsCache = new TimelineMetricsCache(maxRowCacheSize, metricsSendInterval); - collectorUri = configuration.getProperty(COLLECTOR_PROPERTY) + WS_V1_TIMELINE_METRICS; - if (collectorUri.toLowerCase().startsWith("https://")) { + + collectors = 
configuration.getProperty(COLLECTOR_PROPERTY); + zkQuorum = configuration.getProperty("zookeeper.quorum"); + protocol = configuration.getProperty(COLLECTOR_PROTOCOL, "http"); + port = configuration.getProperty(COLLECTOR_PORT, "6188"); + + // Initialize the collector write strategy + super.init(); + + if (protocol.toLowerCase().startsWith("https://")) { String trustStorePath = configuration.getProperty(SSL_KEYSTORE_PATH_PROPERTY).trim(); String trustStoreType = configuration.getProperty(SSL_KEYSTORE_TYPE_PROPERTY).trim(); String trustStorePwd = configuration.getProperty(SSL_KEYSTORE_PASSWORD_PROPERTY).trim(); http://git-wip-us.apache.org/repos/asf/ambari/blob/954e61ee/ambari-metrics/ambari-metrics-storm-sink/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-storm-sink/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java b/ambari-metrics/ambari-metrics-storm-sink/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java index dca0c25..8e0bda6 100644 --- a/ambari-metrics/ambari-metrics-storm-sink/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java +++ b/ambari-metrics/ambari-metrics-storm-sink/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java @@ -18,6 +18,18 @@ package org.apache.hadoop.metrics2.sink.storm; +import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; +import org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache; +import org.apache.storm.metric.api.IMetricsConsumer; +import org.apache.storm.shade.com.google.common.collect.Lists; +import org.junit.Ignore; +import org.junit.Test; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import static 
org.apache.hadoop.metrics2.sink.storm.StormTimelineMetricsSink.METRIC_NAME_PREFIX_KAFKA_OFFSET; import static org.apache.hadoop.metrics2.sink.storm.StormTimelineMetricsSink.SYSTEM_TASK_ID; import static org.easymock.EasyMock.anyObject; @@ -27,23 +39,6 @@ import static org.easymock.EasyMock.expectLastCall; import static org.easymock.EasyMock.replay; import static org.easymock.EasyMock.verify; -import java.io.IOException; -import java.net.SocketAddress; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric; -import org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache; -import org.apache.storm.Constants; -import org.apache.storm.shade.com.google.common.collect.Lists; -import org.junit.Ignore; -import org.junit.Test; - -import org.apache.storm.metric.api.IMetricsConsumer; - public class StormTimelineMetricsSinkTest { @Test public void testNonNumericMetricMetricExclusion() throws InterruptedException, IOException {
