AMBARI-18487 : Test and refine Collector writes w.r.t sharing and timeouts. (avijayan)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/abd19c61 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/abd19c61 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/abd19c61 Branch: refs/heads/branch-2.5 Commit: abd19c61e063a3918b71df6321255dd1c94c352d Parents: 0d01ab2 Author: Aravindan Vijayan <[email protected]> Authored: Mon Nov 14 21:37:53 2016 -0800 Committer: Aravindan Vijayan <[email protected]> Committed: Tue Nov 15 11:06:40 2016 -0800 ---------------------------------------------------------------------- .../timeline/AbstractTimelineMetricsSink.java | 77 +++++++++++++------- .../availability/MetricCollectorHAHelper.java | 9 ++- .../src/main/python/core/blacklisted_set.py | 14 ++++ .../src/main/python/core/config_reader.py | 3 +- 4 files changed, 71 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/abd19c61/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java index 8be871f..efa5cba 100644 --- a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java +++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java @@ -51,8 +51,12 @@ import java.net.URL; import java.security.KeyStore; import java.util.ArrayList; import java.util.Collection; +import java.util.HashSet; import java.util.List; import java.util.Random; +import java.util.Set; 
+import java.util.SortedSet; +import java.util.TreeSet; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -76,8 +80,9 @@ public abstract class AbstractTimelineMetricsSink { protected static final AtomicInteger failedCollectorConnectionsCounter = new AtomicInteger(0); public static int NUMBER_OF_SKIPPED_COLLECTOR_EXCEPTIONS = 100; - public int ZK_CONNECT_TRY_TIME = 10000; + public int ZK_CONNECT_TRY_COUNT = 10; public int ZK_SLEEP_BETWEEN_RETRY_TIME = 2000; + public boolean shardExpired = true; private SSLSocketFactory sslSocketFactory; @@ -93,7 +98,7 @@ public abstract class AbstractTimelineMetricsSink { // well as timed refresh protected Supplier<String> targetCollectorHostSupplier; - protected final List<String> allKnownLiveCollectors = new ArrayList<>(); + protected final SortedSet<String> allKnownLiveCollectors = new TreeSet<>(); private volatile boolean isInitializedForHA = false; @@ -125,7 +130,7 @@ public abstract class AbstractTimelineMetricsSink { protected void init() { metricSinkWriteShardStrategy = new MetricSinkWriteShardHostnameHashingStrategy(getHostname()); collectorHAHelper = new MetricCollectorHAHelper(getZookeeperQuorum(), - ZK_CONNECT_TRY_TIME, ZK_SLEEP_BETWEEN_RETRY_TIME); + ZK_CONNECT_TRY_COUNT, ZK_SLEEP_BETWEEN_RETRY_TIME); isInitializedForHA = true; } @@ -202,6 +207,8 @@ public abstract class AbstractTimelineMetricsSink { collectorHost = targetCollectorHostSupplier.get(); // Last X attempts have failed - force refresh if (failedCollectorConnectionsCounter.get() > RETRY_COUNT_BEFORE_COLLECTOR_FAILOVER) { + LOG.info("Removing collector " + collectorHost + " from allKnownLiveCollectors."); + allKnownLiveCollectors.remove(collectorHost); targetCollectorHostSupplier = null; collectorHost = findPreferredCollectHost(); } @@ -319,6 +326,7 @@ public abstract class AbstractTimelineMetricsSink { init(); } + shardExpired = false; // Auto expire and re-calculate after 1 hour if (targetCollectorHostSupplier != 
null) { String targetCollector = targetCollectorHostSupplier.get(); @@ -327,32 +335,12 @@ public abstract class AbstractTimelineMetricsSink { } } - Collection<String> collectorHosts = getConfiguredCollectorHosts(); - - LOG.debug("Trying to find live collector host from : " + collectorHosts); // Reach out to all configured collectors before Zookeeper - if (collectorHosts != null && !collectorHosts.isEmpty()) { - for (String hostStr : collectorHosts) { - hostStr = hostStr.trim(); - if (!hostStr.isEmpty()) { - try { - Collection<String> liveHosts = findLiveCollectorHostsFromKnownCollector(hostStr, getCollectorPort()); - // Update live Hosts - current host will already be a part of this - for (String host : liveHosts) { - allKnownLiveCollectors.add(host); - } - break; // Found at least 1 live collector - } catch (MetricCollectorUnavailableException e) { - LOG.info("Collector " + hostStr + " is not longer live. Removing " + - "it from list of know live collector hosts : " + allKnownLiveCollectors); - allKnownLiveCollectors.remove(hostStr); - } - } - } - } + refreshCollectorsFromConfigured(); // Lookup Zookeeper for live hosts - max 10 seconds wait time if (allKnownLiveCollectors.size() == 0 && getZookeeperQuorum() != null) { + LOG.info("No live collectors from configuration. Requesting zookeeper..."); allKnownLiveCollectors.addAll(collectorHAHelper.findLiveCollectorHostsFromZNode()); } @@ -361,6 +349,13 @@ public abstract class AbstractTimelineMetricsSink { new Supplier<String>() { @Override public String get() { + //shardExpired flag is used to determine if the Supplier.get() is invoked through the + // findPreferredCollectHost method (no need to refresh collector hosts) + // OR + // through expiry (refresh needed to pick up previously dead collectors that might have become alive again).
+ if (shardExpired) { + refreshCollectorsFromConfigured(); + } return metricSinkWriteShardStrategy.findCollectorShard(new ArrayList<>(allKnownLiveCollectors)); } }, // random.nextInt(max - min + 1) + min # (60 to 75 minutes) @@ -370,12 +365,40 @@ public abstract class AbstractTimelineMetricsSink { TimeUnit.MINUTES ); - return targetCollectorHostSupplier.get(); + String collectorHost = targetCollectorHostSupplier.get(); + shardExpired = true; + return collectorHost; } LOG.warn("Couldn't find any live collectors. Returning null"); + shardExpired = true; return null; } + private void refreshCollectorsFromConfigured() { + Collection<String> collectorHosts = getConfiguredCollectorHosts(); + + LOG.debug("Trying to find live collector host from : " + collectorHosts); + if (collectorHosts != null && !collectorHosts.isEmpty()) { + for (String hostStr : collectorHosts) { + hostStr = hostStr.trim(); + if (!hostStr.isEmpty()) { + try { + Collection<String> liveHosts = findLiveCollectorHostsFromKnownCollector(hostStr, getCollectorPort()); + // Update live Hosts - current host will already be a part of this + for (String host : liveHosts) { + allKnownLiveCollectors.add(host); + } + break; // Found at least 1 live collector + } catch (MetricCollectorUnavailableException e) { + LOG.info("Collector " + hostStr + " is not longer live. 
Removing " + + "it from list of know live collector hosts : " + allKnownLiveCollectors); + allKnownLiveCollectors.remove(hostStr); + } + } + } + } + } + Collection<String> findLiveCollectorHostsFromKnownCollector(String host, String port) throws MetricCollectorUnavailableException { List<String> collectors = new ArrayList<>(); HttpURLConnection connection = null; @@ -424,7 +447,7 @@ public abstract class AbstractTimelineMetricsSink { LOG.debug(errorMessage); LOG.debug(ioe); String warnMsg = "Unable to connect to collector to find live nodes."; - LOG.warn(warnMsg, ioe); + LOG.warn(warnMsg); throw new MetricCollectorUnavailableException(warnMsg); } return collectors; http://git-wip-us.apache.org/repos/asf/ambari/blob/abd19c61/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/availability/MetricCollectorHAHelper.java ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/availability/MetricCollectorHAHelper.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/availability/MetricCollectorHAHelper.java index 4d0ec14..2254362 100644 --- a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/availability/MetricCollectorHAHelper.java +++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/availability/MetricCollectorHAHelper.java @@ -22,6 +22,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.curator.CuratorZookeeperClient; import org.apache.curator.RetryLoop; import org.apache.curator.RetryPolicy; +import org.apache.curator.retry.BoundedExponentialBackoffRetry; import org.apache.curator.retry.RetryUntilElapsed; import org.apache.zookeeper.ZooKeeper; @@ -38,7 +39,7 @@ import java.util.concurrent.Callable; */ public class MetricCollectorHAHelper { private final String 
zookeeperQuorum; - private final int tryTime; + private final int tryCount; private final int sleepMsBetweenRetries; private static final int CONNECTION_TIMEOUT = 2000; @@ -50,9 +51,9 @@ public class MetricCollectorHAHelper { private static final Log LOG = LogFactory.getLog(MetricCollectorHAHelper.class); - public MetricCollectorHAHelper(String zookeeperQuorum, int tryTime, int sleepMsBetweenRetries) { + public MetricCollectorHAHelper(String zookeeperQuorum, int tryCount, int sleepMsBetweenRetries) { this.zookeeperQuorum = zookeeperQuorum; - this.tryTime = tryTime; + this.tryCount = tryCount; this.sleepMsBetweenRetries = sleepMsBetweenRetries; } @@ -63,7 +64,7 @@ public class MetricCollectorHAHelper { public Collection<String> findLiveCollectorHostsFromZNode() { Set<String> collectors = new HashSet<>(); - RetryPolicy retryPolicy = new RetryUntilElapsed(tryTime, sleepMsBetweenRetries); + RetryPolicy retryPolicy = new BoundedExponentialBackoffRetry(sleepMsBetweenRetries, 10*sleepMsBetweenRetries, tryCount); final CuratorZookeeperClient client = new CuratorZookeeperClient(zookeeperQuorum, SESSION_TIMEOUT, CONNECTION_TIMEOUT, null, retryPolicy); http://git-wip-us.apache.org/repos/asf/ambari/blob/abd19c61/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/blacklisted_set.py ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/blacklisted_set.py b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/blacklisted_set.py index ca31814..4c6a8dd 100644 --- a/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/blacklisted_set.py +++ b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/blacklisted_set.py @@ -42,6 +42,20 @@ class BlacklistedSet(set): if time.time() > self.__dict.get(item): yield item + def get_actual_size(self): + size = 0 + for item in self.__iter__(): + size += 1 + return size + + def 
get_item_at_index(self, index): + i = 0 + for item in self.__iter__(): + if i == index: + return item + i += 1 + return None + def blacklist(self, item): self.__dict[item] = time.time() + self.__blacklist_timeout http://git-wip-us.apache.org/repos/asf/ambari/blob/abd19c61/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/config_reader.py ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/config_reader.py b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/config_reader.py index fa9ea9f..9f95c26 100644 --- a/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/config_reader.py +++ b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/config_reader.py @@ -101,6 +101,7 @@ AMBARI_AGENT_CONF = '/etc/ambari-agent/conf/ambari-agent.ini' config_content = """ [default] debug_level = INFO +hostname = localhost metrics_servers = ['localhost','host1','host2'] enable_time_threshold = false enable_value_threshold = false @@ -112,7 +113,7 @@ send_interval = 60 collector_sleep_interval = 5 max_queue_size = 5000 failover_strategy = round-robin -failover_strategy_blacklisted_interval_seconds = 0 +failover_strategy_blacklisted_interval_seconds = 60 host = localhost port = 6188 https_enabled = false
