Support round-robin scheduling with failover for monitors with distributed collectors. (dsen via swagle)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/9b237774 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/9b237774 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/9b237774 Branch: refs/heads/branch-2.5 Commit: 9b23777498b16251f7bab159599edd7992a19c30 Parents: aa26ebb Author: Aravindan Vijayan <[email protected]> Authored: Mon Nov 14 21:05:15 2016 -0800 Committer: Aravindan Vijayan <[email protected]> Committed: Tue Nov 15 11:06:38 2016 -0800 ---------------------------------------------------------------------- .../conf/unix/metric_monitor.ini | 4 +- .../conf/windows/metric_monitor.ini | 5 +- .../src/main/python/core/blacklisted_set.py | 60 +++++++++ .../src/main/python/core/config_reader.py | 16 ++- .../src/main/python/core/controller.py | 1 - .../src/main/python/core/emitter.py | 121 +++++++++++++------ .../src/main/python/core/event_definition.py | 5 +- .../src/test/python/core/TestEmitter.py | 29 ++++- .../0.1.0/configuration/ams-env.xml | 14 +++ .../0.1.0/configuration/ams-site.xml | 8 ++ .../0.1.0/package/scripts/params.py | 2 + .../package/templates/metric_monitor.ini.j2 | 5 +- 12 files changed, 224 insertions(+), 46 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/9b237774/ambari-metrics/ambari-metrics-host-monitoring/conf/unix/metric_monitor.ini ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-host-monitoring/conf/unix/metric_monitor.ini b/ambari-metrics/ambari-metrics-host-monitoring/conf/unix/metric_monitor.ini index fc896b8..e98c65c 100644 --- a/ambari-metrics/ambari-metrics-host-monitoring/conf/unix/metric_monitor.ini +++ b/ambari-metrics/ambari-metrics-host-monitoring/conf/unix/metric_monitor.ini @@ -18,7 +18,7 @@ [default] debug_level = INFO -hostname = localhost +metrics_servers = localhost enable_time_threshold 
= false enable_value_threshold = false skip_disk_patterns = @@ -29,6 +29,8 @@ send_interval = 60 [collector] collector_sleep_interval = 5 max_queue_size = 5000 +failover_strategy = round-robin +failover_strategy_blacklisted_interval_seconds = 600 host = localhost port = 6188 https_enabled = false http://git-wip-us.apache.org/repos/asf/ambari/blob/9b237774/ambari-metrics/ambari-metrics-host-monitoring/conf/windows/metric_monitor.ini ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-host-monitoring/conf/windows/metric_monitor.ini b/ambari-metrics/ambari-metrics-host-monitoring/conf/windows/metric_monitor.ini index bc2b461..77c9ce1 100644 --- a/ambari-metrics/ambari-metrics-host-monitoring/conf/windows/metric_monitor.ini +++ b/ambari-metrics/ambari-metrics-host-monitoring/conf/windows/metric_monitor.ini @@ -18,7 +18,7 @@ [default] debug_level = INFO -metrics_server = {{ams_collector_host_single}}:{{ams_collector_port}} +metrics_servers = localhost enable_time_threshold = false enable_value_threshold = false @@ -28,3 +28,6 @@ send_interval = 60 [collector] collector_sleep_interval = 5 max_queue_size = 5000 +failover_strategy = round-robin +failover_strategy_blacklisted_interval_seconds = 600 +port = 6188 http://git-wip-us.apache.org/repos/asf/ambari/blob/9b237774/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/blacklisted_set.py ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/blacklisted_set.py b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/blacklisted_set.py new file mode 100644 index 0000000..ca31814 --- /dev/null +++ b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/blacklisted_set.py @@ -0,0 +1,60 @@ +#!/usr/bin/env python +#!/usr/bin/env python + +''' +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license 
agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +''' + +import time + +class BlacklistedSet(set): + BLACKLIST_TIMEOUT = 5 + + def __init__(self, items=[], blacklist_timeout=BLACKLIST_TIMEOUT): + self.__dict = {} + self.__blacklist_timeout = blacklist_timeout + for item in items: + set.add(self, item) + + def add(self, item): + self.__dict[item] = time.time() + set.add(self, item) + + def __contains__(self, item): + return item in self.__dict and time.time() > self.__dict.get(item) + + def __iter__(self): + for item in set.__iter__(self): + if time.time() > self.__dict.get(item): + yield item + + def blacklist(self, item): + self.__dict[item] = time.time() + self.__blacklist_timeout + +if __name__ == "__main__": + hosts = [1, 2, 3, 4] + bs = BlacklistedSet(hosts) + bs.blacklist(4) + print bs + for a in bs: + print a + time.sleep(2) + + bs.blacklist(1) + bs.blacklist(5) + for a in bs: + print a http://git-wip-us.apache.org/repos/asf/ambari/blob/9b237774/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/config_reader.py ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/config_reader.py b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/config_reader.py index e2ed83f..b0a7733 100644 --- 
a/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/config_reader.py +++ b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/config_reader.py @@ -86,6 +86,9 @@ EXITCODE_OUT_FILE = PID_DIR + os.sep + "ambari-metrics-host-monitoring.exitcode" SERVICE_USERNAME_KEY = "TMP_AMHM_USERNAME" SERVICE_PASSWORD_KEY = "TMP_AMHM_PASSWORD" +# Failover strategies +ROUND_ROBIN_FAILOVER_STRATEGY = 'round-robin' + SETUP_ACTION = "setup" START_ACTION = "start" STOP_ACTION = "stop" @@ -97,6 +100,7 @@ AMBARI_AGENT_CONF = '/etc/ambari-agent/conf/ambari-agent.ini' config_content = """ [default] debug_level = INFO +metrics_servers = localhost,host1,host2 enable_time_threshold = false enable_value_threshold = false @@ -106,6 +110,8 @@ send_interval = 60 [collector] collector_sleep_interval = 5 max_queue_size = 5000 +failover_strategy = round-robin +failover_strategy_blacklisted_interval_seconds = 0 host = localhost port = 6188 https_enabled = false @@ -204,8 +210,14 @@ class Configuration: def get_collector_sleep_interval(self): return int(self.get("collector", "collector_sleep_interval", 10)) - def get_hostname_config(self): - return self.get("default", "hostname", None) + def get_metrics_collector_hosts(self): + return self.get("default", "metrics_servers", "localhost").split(",") + + def get_failover_strategy(self): + return self.get("collector", "failover_strategy", ROUND_ROBIN_FAILOVER_STRATEGY) + + def get_failover_strategy_blacklisted_interval_seconds(self): + return self.get("collector", "failover_strategy_blacklisted_interval_seconds", 600) def get_hostname_script(self): if self.hostname_script: http://git-wip-us.apache.org/repos/asf/ambari/blob/9b237774/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/controller.py ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/controller.py 
b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/controller.py index c2ee448..c0feed5 100644 --- a/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/controller.py +++ b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/controller.py @@ -20,7 +20,6 @@ limitations under the License. import logging import threading -import time from Queue import Queue from threading import Timer from application_metric_map import ApplicationMetricMap http://git-wip-us.apache.org/repos/asf/ambari/blob/9b237774/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/emitter.py ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/emitter.py b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/emitter.py index 6997108..6dc0207 100644 --- a/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/emitter.py +++ b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/emitter.py @@ -22,6 +22,8 @@ import logging import threading from security import CachedHTTPSConnection, CachedHTTPConnection +from blacklisted_set import BlacklistedSet +from config_reader import ROUND_ROBIN_FAILOVER_STRATEGY logger = logging.getLogger() @@ -37,19 +39,22 @@ class Emitter(threading.Thread): logger.debug('Initializing Emitter thread.') self.lock = threading.Lock() self.send_interval = config.get_send_interval() + self.hostname = config.get_hostname_config() + self.hostname_hash = self.compute_hash(self.hostname) self._stop_handler = stop_handler self.application_metric_map = application_metric_map - # TODO verify certificate - timeout = int(self.send_interval - 10) - if config.is_server_https_enabled(): - self.connection = CachedHTTPSConnection(config.get_server_host(), - config.get_server_port(), - timeout=timeout, - ca_certs=config.get_ca_certs()) + self.collector_port = config.get_server_port() + 
self.all_metrics_collector_hosts = config.get_metrics_collector_hosts() + self.is_server_https_enabled = config.is_server_https_enabled() + + if self.is_server_https_enabled: + self.ca_certs = config.get_ca_certs() + + # TimedRoundRobinSet + if config.get_failover_strategy() == ROUND_ROBIN_FAILOVER_STRATEGY: + self.active_collector_hosts = BlacklistedSet(self.all_metrics_collector_hosts, float(config.get_failover_strategy_blacklisted_interval_seconds())) else: - self.connection = CachedHTTPConnection(config.get_server_host(), - config.get_server_port(), - timeout=timeout) + raise Exception(-1, "Uknown failover strategy {0}".format(config.get_failover_strategy())) def run(self): logger.info('Running Emitter thread: %s' % threading.currentThread().getName()) @@ -66,7 +71,6 @@ class Emitter(threading.Thread): pass def submit_metrics(self): - retry_count = 0 # This call will acquire lock on the map and clear contents before returning # After configured number of retries the data will not be sent to the # collector @@ -75,36 +79,85 @@ class Emitter(threading.Thread): logger.info("Nothing to emit, resume waiting.") return pass + self.push_metrics(json_data) - response = None - while retry_count < self.MAX_RETRY_COUNT: - try: - response = self.push_metrics(json_data) - except Exception, e: - logger.warn('Error sending metrics to server. 
%s' % str(e)) + def push_metrics(self, data): + success = False + while self.active_collector_hosts.get_actual_size() > 0: + collector_host = self.get_collector_host_shard() + success = self.try_with_collector_host(collector_host, data) + if success: + break + pass + + if not success: + logger.info('No valid collectors found...') + for collector_host in self.active_collector_hosts: + success = self.try_with_collector_host(collector_host, data) pass + def try_with_collector_host(self, collector_host, data): + headers = {"Content-Type" : "application/json", "Accept" : "*/*"} + connection = self.get_connection(collector_host) + logger.debug("message to send: %s" % data) + retry_count = 0 + while retry_count < self.MAX_RETRY_COUNT: + response = self.get_response_from_submission(connection, data, headers) if response and response.status == 200: - retry_count = self.MAX_RETRY_COUNT + return True else: logger.warn("Retrying after {0} ...".format(self.RETRY_SLEEP_INTERVAL)) retry_count += 1 #Wait for the service stop event instead of sleeping blindly if 0 == self._stop_handler.wait(self.RETRY_SLEEP_INTERVAL): - return - pass + return True pass - # TODO verify certificate - def push_metrics(self, data): - headers = {"Content-Type" : "application/json", - "Accept" : "*/*", - "Connection":" Keep-Alive"} - logger.debug("message to sent: %s" % data) - self.connection.request("POST", self.AMS_METRICS_POST_URL, data, headers) - response = self.connection.getresponse() - if response: - logger.debug("POST response from server: retcode = {0}, reason = {1}" - .format(response.status, response.reason)) - logger.debug(str(response.read())) - - return response + + if retry_count >= self.MAX_RETRY_COUNT: + self.active_collector_hosts.blacklist(collector_host) + logger.warn("Metric collector host {0} was blacklisted.".format(collector_host)) + return False + + def get_connection(self, collector_host): + timeout = int(self.send_interval - 10) + if self.is_server_https_enabled: + connection 
= CachedHTTPSConnection(collector_host, + self.collector_port, + timeout=timeout, + ca_certs=self.ca_certs) + else: + connection = CachedHTTPConnection(collector_host, + self.collector_port, + timeout=timeout) + return connection + + def get_response_from_submission(self, connection, data, headers): + try: + connection.request("POST", self.AMS_METRICS_POST_URL, data, headers) + response = connection.getresponse() + if response: + logger.debug("POST response from server: retcode = {0}, reason = {1}" + .format(response.status, response.reason)) + logger.debug(str(response.read())) + return response + except Exception, e: + logger.warn('Error sending metrics to server. %s' % str(e)) + return None + + def get_collector_host_shard(self): + size = self.active_collector_hosts.get_actual_size() + index = self.hostname_hash % size + index = index if index >= 0 else index + size + hostname = self.active_collector_hosts.get_item_at_index(index) + logger.info('Calculated collector shard based on hostname : %s' % hostname) + return hostname + + def compute_hash(self, hostname): + hash = 11987 + length = len(hostname) + for i in xrange(0, length - 1): + hash = 31*hash + ord(hostname[i]) + return hash + + + http://git-wip-us.apache.org/repos/asf/ambari/blob/9b237774/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/event_definition.py ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/event_definition.py b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/event_definition.py index f356b03..8c905da 100644 --- a/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/event_definition.py +++ b/ambari-metrics/ambari-metrics-host-monitoring/src/main/python/core/event_definition.py @@ -37,9 +37,8 @@ class Event: class EmmitEvent(Event): - def __init__(self, application_metric_map, config): + def __init__(self, application_metric_map): 
Event.__init__(self) - self.collector_address = config.get_server_address() self.application_metric_map = application_metric_map def get_emmit_payload(self): @@ -82,4 +81,4 @@ class ProcessMetricCollectEvent: def __init__(self, group_config, group_name): # Initialize the Process metric event - pass \ No newline at end of file + pass http://git-wip-us.apache.org/repos/asf/ambari/blob/9b237774/ambari-metrics/ambari-metrics-host-monitoring/src/test/python/core/TestEmitter.py ---------------------------------------------------------------------- diff --git a/ambari-metrics/ambari-metrics-host-monitoring/src/test/python/core/TestEmitter.py b/ambari-metrics/ambari-metrics-host-monitoring/src/test/python/core/TestEmitter.py index be0608f..de97292 100644 --- a/ambari-metrics/ambari-metrics-host-monitoring/src/test/python/core/TestEmitter.py +++ b/ambari-metrics/ambari-metrics-host-monitoring/src/test/python/core/TestEmitter.py @@ -20,11 +20,13 @@ limitations under the License. import json import logging +import time from unittest import TestCase from only_for_platform import get_platform, PLATFORM_WINDOWS from mock.mock import patch, MagicMock from security import CachedHTTPConnection +from blacklisted_set import BlacklistedSet if get_platform() != PLATFORM_WINDOWS: os_distro_value = ('Suse','11','Final') @@ -68,7 +70,7 @@ class TestEmitter(TestCase): @patch.object(CachedHTTPConnection, "create_connection", new = MagicMock()) @patch.object(CachedHTTPConnection, "getresponse", new = MagicMock()) @patch.object(CachedHTTPConnection, "request") - def testRetryFetch(self, request_mock): + def testRetryFetchAndRoundRobin(self, request_mock): stop_handler = bind_signal_handlers() request_mock.return_value = MagicMock() @@ -81,7 +83,7 @@ class TestEmitter(TestCase): emitter.RETRY_SLEEP_INTERVAL = .001 emitter.submit_metrics() - self.assertEqual(request_mock.call_count, 3) + self.assertEqual(request_mock.call_count, 9) self.assertUrlData(request_mock) def assertUrlData(self, 
request_mock): @@ -94,3 +96,26 @@ class TestEmitter(TestCase): self.assertEqual(metrics['metrics'][0]['metricname'],'metric1') self.assertEqual(metrics['metrics'][0]['starttime'],1) pass + + def test_blacklisted_set(self): + hosts = ["1", "2", "3", "4"] + sleep_time = 1 + bs = BlacklistedSet(hosts, sleep_time) + bs.blacklist("4") + counter = 0 + for host in bs: + counter = counter + 1 + self.assertEqual(counter, 3) + time.sleep(sleep_time) + counter = 0 + for host in bs: + counter = counter + 1 + self.assertEqual(counter, 4) + bs.blacklist("1") + bs.blacklist("2") + counter = 0 + for host in bs: + counter = counter + 1 + self.assertEqual(counter, 2) + + http://git-wip-us.apache.org/repos/asf/ambari/blob/9b237774/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-env.xml ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-env.xml b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-env.xml index fed00c5..6651b6c 100644 --- a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-env.xml +++ b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-env.xml @@ -101,6 +101,17 @@ <on-ambari-upgrade add="true"/> </property> <property> + <name>failover_strategy_blacklisted_interval</name> + <value>600</value> + <description> + Metrics collector host will be blacklisted for specified number of seconds if metric monitor failed to connect to it. 
+ </description> + <value-attributes> + <type>int</type> + </value-attributes> + <on-ambari-upgrade add="true"/> + </property> + <property> <name>content</name> <display-name>ams-env template</display-name> <value> @@ -143,6 +154,9 @@ export AMS_COLLECTOR_OPTS="$AMS_COLLECTOR_OPTS -Djava.security.auth.login.config export AMS_COLLECTOR_GC_OPTS="-XX:+UseConcMarkSweepGC -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -Xloggc:{{ams_collector_log_dir}}/collector-gc.log-`date +'%Y%m%d%H%M'`" export AMS_COLLECTOR_OPTS="$AMS_COLLECTOR_OPTS $AMS_COLLECTOR_GC_OPTS" +# Metrics collector host will be blacklisted for specified number of seconds if metric monitor failed to connect to it. +export AMS_FAILOVER_STRATEGY_BLACKLISTED_INTERVAL={{failover_strategy_blacklisted_interval}} + </value> <value-attributes> <type>content</type> http://git-wip-us.apache.org/repos/asf/ambari/blob/9b237774/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-site.xml ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-site.xml b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-site.xml index e217011..a47581d 100644 --- a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-site.xml +++ b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/configuration/ams-site.xml @@ -645,6 +645,14 @@ <on-ambari-upgrade add="true"/> </property> <property> + <name>failover.strategy</name> + <value>round-robin</value> + <description> + Failover strategy for metric monitors + </description> + <on-ambari-upgrade add="true"/> + </property> + <property> <name>timeline.metrics.hbase.init.check.enabled</name> <value>true</value> <description> 
http://git-wip-us.apache.org/repos/asf/ambari/blob/9b237774/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/params.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/params.py b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/params.py index 1bb0ab0..db046ff 100644 --- a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/params.py +++ b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/params.py @@ -53,6 +53,8 @@ ams_pid_dir = status_params.ams_collector_pid_dir ams_collector_script = "/usr/sbin/ambari-metrics-collector" ams_collector_pid_dir = status_params.ams_collector_pid_dir ams_collector_hosts = default("/clusterHostInfo/metrics_collector_hosts", []) +failover_strategy_blacklisted_interval_seconds = default("/configurations/ams-env/failover_strategy_blacklisted_interval", "600") +failover_strategy = default("/configurations/ams-site/failover.strategy", "round-robin") if default("/configurations/ams-site/timeline.metrics.service.http.policy", "HTTP_ONLY") == "HTTPS_ONLY": metric_collector_https_enabled = True metric_collector_protocol = 'https' http://git-wip-us.apache.org/repos/asf/ambari/blob/9b237774/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/templates/metric_monitor.ini.j2 ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/templates/metric_monitor.ini.j2 b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/templates/metric_monitor.ini.j2 index 06109f4..b011fd5 100644 --- a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/templates/metric_monitor.ini.j2 +++ 
b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/templates/metric_monitor.ini.j2 @@ -18,7 +18,7 @@ [default] debug_level = INFO -hostname = {{hostname}} +metrics_servers = {{ams_collector_hosts}} enable_time_threshold = false enable_value_threshold = false skip_disk_patterns = {{skip_disk_metrics_patterns}} @@ -29,6 +29,7 @@ send_interval = {{metrics_report_interval}} [collector] collector_sleep_interval = 10 max_queue_size = 5000 -host = {{metric_collector_host}} +failover_strategy = {{failover_strategy}} +failover_strategy_blacklisted_interval_seconds = {{failover_strategy_blacklisted_interval_seconds}} port = {{metric_collector_port}} https_enabled = {{metric_collector_https_enabled}}
