Repository: ambari Updated Branches: refs/heads/branch-3.0-perf 9b9402e93 -> a9774d664
AMBARI-21732. Report alerts status to server (aonishuk) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/a9774d66 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/a9774d66 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/a9774d66 Branch: refs/heads/branch-3.0-perf Commit: a9774d6648474911716d9abc278272437ba56d11 Parents: 9b9402e Author: Andrew Onishuk <aonis...@hortonworks.com> Authored: Wed Aug 16 13:36:34 2017 +0300 Committer: Andrew Onishuk <aonis...@hortonworks.com> Committed: Wed Aug 16 13:36:34 2017 +0300 ---------------------------------------------------------------------- ambari-agent/conf/unix/ambari-agent.ini | 1 + .../ambari_agent/AlertSchedulerHandler.py | 2 +- .../python/ambari_agent/AlertStatusReporter.py | 56 ++++++++++++++++++++ .../ambari_agent/ClusterConfigurationCache.py | 2 +- .../src/main/python/ambari_agent/Constants.py | 1 + .../python/ambari_agent/InitializerModule.py | 1 + .../python/ambari_agent/alerts/base_alert.py | 6 ++- .../src/main/python/ambari_agent/main.py | 10 +++- 8 files changed, 74 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/a9774d66/ambari-agent/conf/unix/ambari-agent.ini ---------------------------------------------------------------------- diff --git a/ambari-agent/conf/unix/ambari-agent.ini b/ambari-agent/conf/unix/ambari-agent.ini index 609f0fa..e1df1d2 100644 --- a/ambari-agent/conf/unix/ambari-agent.ini +++ b/ambari-agent/conf/unix/ambari-agent.ini @@ -36,6 +36,7 @@ run_as_user=root parallel_execution=0 alert_grace_period=5 status_command_timeout=5 +alert_reports_interval=5 ; 0 - don't report commands output periodically. 
Reduces bandwidth on big cluster command_reports_interval=5 alert_kinit_timeout=14400000 http://git-wip-us.apache.org/repos/asf/ambari/blob/a9774d66/ambari-agent/src/main/python/ambari_agent/AlertSchedulerHandler.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/AlertSchedulerHandler.py b/ambari-agent/src/main/python/ambari_agent/AlertSchedulerHandler.py index 0dfc977..3d7c30c 100644 --- a/ambari-agent/src/main/python/ambari_agent/AlertSchedulerHandler.py +++ b/ambari-agent/src/main/python/ambari_agent/AlertSchedulerHandler.py @@ -307,7 +307,7 @@ class AlertSchedulerHandler(): alert = RecoveryAlert(json_definition, source, self.config, self.recovery_manger) if alert is not None: - alert.set_cluster(clusterName, hostName) + alert.set_cluster(clusterName, json_definition['clusterId'], hostName) except Exception, exception: logger.exception("[AlertScheduler] Unable to load an invalid alert definition. It will be skipped.") http://git-wip-us.apache.org/repos/asf/ambari/blob/a9774d66/ambari-agent/src/main/python/ambari_agent/AlertStatusReporter.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/AlertStatusReporter.py b/ambari-agent/src/main/python/ambari_agent/AlertStatusReporter.py new file mode 100644 index 0000000..20cb717 --- /dev/null +++ b/ambari-agent/src/main/python/ambari_agent/AlertStatusReporter.py @@ -0,0 +1,56 @@ +#!/usr/bin/env python + +''' +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +''' + +import logging +import threading +from ambari_agent import security +from ambari_agent import Constants + +logger = logging.getLogger(__name__) + +class AlertStatusReporter(threading.Thread): + def __init__(self, initializer_module): + self.initializer_module = initializer_module + self.collector = initializer_module.alert_scheduler_handler.collector() + self.stop_event = initializer_module.stop_event + self.alert_reports_interval = initializer_module.alert_reports_interval + threading.Thread.__init__(self) + + def run(self): + """ + Report the alert statuses obtained from the collector every alert_reports_interval until stop_event is set; return immediately if alert_reports_interval is 0 + """ + if self.alert_reports_interval == 0: + return + + while not self.stop_event.is_set(): + try: + if self.initializer_module.is_registered: + alerts = self.collector.alerts() + if alerts: + self.initializer_module.connection.send(message=alerts, destination=Constants.ALERTS_STATUS_REPORTS_ENDPOINT) + except security.ConnectionIsNotEstablished: # server and agent disconnected during sending data. Not an issue + pass + except: + logger.exception("Exception in AlertStatusReporter.
Re-running it") + + self.stop_event.wait(self.alert_reports_interval) + + logger.info("AlertStatusReporter has successfully finished") http://git-wip-us.apache.org/repos/asf/ambari/blob/a9774d66/ambari-agent/src/main/python/ambari_agent/ClusterConfigurationCache.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/ClusterConfigurationCache.py b/ambari-agent/src/main/python/ambari_agent/ClusterConfigurationCache.py index f0c3945..77ca4c1 100644 --- a/ambari-agent/src/main/python/ambari_agent/ClusterConfigurationCache.py +++ b/ambari-agent/src/main/python/ambari_agent/ClusterConfigurationCache.py @@ -51,7 +51,7 @@ class ClusterConfigurationCache(ClusterCache): """ self._cache_lock.acquire() try: - dictionary = self[cluster_id] + dictionary = self[str(cluster_id)]['configurations'] for layer_key in key.split('/'): dictionary = dictionary[layer_key] http://git-wip-us.apache.org/repos/asf/ambari/blob/a9774d66/ambari-agent/src/main/python/ambari_agent/Constants.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/Constants.py b/ambari-agent/src/main/python/ambari_agent/Constants.py index 37b5df2..e36eda5 100644 --- a/ambari-agent/src/main/python/ambari_agent/Constants.py +++ b/ambari-agent/src/main/python/ambari_agent/Constants.py @@ -38,6 +38,7 @@ ALERTS_DEFENITIONS_REQUEST_ENDPOINT = '/agents/alert_defenitions' COMPONENT_STATUS_REPORTS_ENDPOINT = '/reports/component_status' COMMANDS_STATUS_REPORTS_ENDPOINT = '/reports/commands_status' HOST_STATUS_REPORTS_ENDPOINT = '/reports/host_status' +ALERTS_STATUS_REPORTS_ENDPOINT = '/reports/alerts_status' HEARTBEAT_ENDPOINT = '/heartbeat' REGISTRATION_ENDPOINT = '/register' http://git-wip-us.apache.org/repos/asf/ambari/blob/a9774d66/ambari-agent/src/main/python/ambari_agent/InitializerModule.py ---------------------------------------------------------------------- diff --git 
a/ambari-agent/src/main/python/ambari_agent/InitializerModule.py b/ambari-agent/src/main/python/ambari_agent/InitializerModule.py index cbf0780..8208b32 100644 --- a/ambari-agent/src/main/python/ambari_agent/InitializerModule.py +++ b/ambari-agent/src/main/python/ambari_agent/InitializerModule.py @@ -60,6 +60,7 @@ class InitializerModule: self.cache_dir = self.config.get('agent', 'cache_dir', default='/var/lib/ambari-agent/cache') self.command_reports_interval = int(self.config.get('agent', 'command_reports_interval', default='5')) + self.alert_reports_interval = int(self.config.get('agent', 'alert_reports_interval', default='5')) self.cluster_cache_dir = os.path.join(self.cache_dir, FileCache.CLUSTER_CACHE_DIRECTORY) self.recovery_cache_dir = os.path.join(self.cache_dir, FileCache.RECOVERY_CACHE_DIRECTORY) http://git-wip-us.apache.org/repos/asf/ambari/blob/a9774d66/ambari-agent/src/main/python/ambari_agent/alerts/base_alert.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/alerts/base_alert.py b/ambari-agent/src/main/python/ambari_agent/alerts/base_alert.py index ce1583f..76d8390 100644 --- a/ambari-agent/src/main/python/ambari_agent/alerts/base_alert.py +++ b/ambari-agent/src/main/python/ambari_agent/alerts/base_alert.py @@ -45,6 +45,7 @@ class BaseAlert(object): self.alert_meta = alert_meta self.alert_source_meta = alert_source_meta self.cluster_name = '' + self.cluster_id = None self.host_name = '' self.config = config @@ -86,9 +87,10 @@ class BaseAlert(object): self.cluster_configuration_cache = cluster_configuration_cache - def set_cluster(self, cluster_name, host_name): + def set_cluster(self, cluster_name, cluster_id, host_name): """ sets cluster information for the alert """ self.cluster_name = cluster_name + self.cluster_id = cluster_id self.host_name = host_name @@ -216,7 +218,7 @@ class BaseAlert(object): resolved_key = key for placeholder_key in placeholder_keys: value = 
self.cluster_configuration_cache.get_configuration_value( - self.cluster_name, placeholder_key) + self.cluster_id, placeholder_key) # if any of the placeholder keys is missing from the configuration, then # return None as per the contract of this function http://git-wip-us.apache.org/repos/asf/ambari/blob/a9774d66/ambari-agent/src/main/python/ambari_agent/main.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/main.py b/ambari-agent/src/main/python/ambari_agent/main.py index c255e8b..ece9b7a 100644 --- a/ambari-agent/src/main/python/ambari_agent/main.py +++ b/ambari-agent/src/main/python/ambari_agent/main.py @@ -108,6 +108,7 @@ from ambari_agent.InitializerModule import InitializerModule from ambari_agent.ComponentStatusExecutor import ComponentStatusExecutor from ambari_agent.CommandStatusReporter import CommandStatusReporter from ambari_agent.HostStatusReporter import HostStatusReporter +from ambari_agent.AlertStatusReporter import AlertStatusReporter #logging.getLogger('ambari_agent').propagate = False @@ -366,13 +367,20 @@ def run_threads(initializer_module): host_status_reporter = HostStatusReporter(initializer_module) host_status_reporter.start() + alert_status_reporter = AlertStatusReporter(initializer_module) + alert_status_reporter.start() + initializer_module.action_queue.start() while not initializer_module.stop_event.is_set(): time.sleep(0.1) - heartbeat_thread.join() + command_status_reporter.join() component_status_executor.join() + host_status_reporter.join() + alert_status_reporter.join() + heartbeat_thread.join() + initializer_module.action_queue.join() # event - event, that will be passed to Controller and NetUtil to make able to interrupt loops form outside process # we need this for windows os, where no sigterm available