AMBARI-21667. Create a topic to send alert_definitions (aonishuk)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/6578b5a2 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/6578b5a2 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/6578b5a2 Branch: refs/heads/branch-3.0-perf Commit: 6578b5a28516e8afa66c2f302cfe7abfdbc31472 Parents: 44c1cb5 Author: Andrew Onishuk <[email protected]> Authored: Tue Aug 8 12:11:28 2017 +0300 Committer: Andrew Onishuk <[email protected]> Committed: Tue Aug 8 12:11:28 2017 +0300 ---------------------------------------------------------------------- .../ambari_agent/AlertSchedulerHandler.py | 69 +- .../main/python/ambari_agent/AmbariConfig.py | 2 +- .../ClusterAlertDefinitionsCache.py | 62 + .../src/main/python/ambari_agent/Constants.py | 4 +- .../main/python/ambari_agent/HeartbeatThread.py | 7 +- .../python/ambari_agent/InitializerModule.py | 12 +- .../python/ambari_agent/alerts/base_alert.py | 2 +- .../python/ambari_agent/alerts/collector.py | 2 +- .../python/ambari_agent/alerts/metric_alert.py | 2 +- .../python/ambari_agent/alerts/port_alert.py | 2 +- .../ambari_agent/alerts/recovery_alert.py | 2 +- .../python/ambari_agent/alerts/script_alert.py | 2 +- .../ambari_agent/apscheduler/threadpool.py | 2 +- .../listeners/AlertDefinitionsEventListener.py | 55 + .../listeners/HostLevelParamsEventListener.py | 2 +- .../src/main/python/ambari_agent/main.py | 14 +- .../ambari_agent/BaseStompServerTestCase.py | 6 + .../ambari_agent/TestAgentStompResponses.py | 23 +- .../dummy_files/stomp/alert_definitions.json | 2700 ++++++++++++++++++ 19 files changed, 2902 insertions(+), 68 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/6578b5a2/ambari-agent/src/main/python/ambari_agent/AlertSchedulerHandler.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/AlertSchedulerHandler.py b/ambari-agent/src/main/python/ambari_agent/AlertSchedulerHandler.py index 6c1d29c..0dfc977 100644 --- a/ambari-agent/src/main/python/ambari_agent/AlertSchedulerHandler.py +++ b/ambari-agent/src/main/python/ambari_agent/AlertSchedulerHandler.py @@ -36,11 +36,12 @@ from alerts.script_alert import ScriptAlert from alerts.web_alert import WebAlert from alerts.recovery_alert import RecoveryAlert from ambari_agent.ExitHelper import ExitHelper +from ambari_agent.FileCache import FileCache +from ambari_agent.Utils import Utils logger = logging.getLogger(__name__) class AlertSchedulerHandler(): - FILENAME = 'definitions.json' TYPE_PORT = 'PORT' TYPE_METRIC = 'METRIC' TYPE_AMS = 'AMS' @@ -48,29 +49,24 @@ class AlertSchedulerHandler(): TYPE_WEB = 'WEB' TYPE_RECOVERY = 'RECOVERY' - def __init__(self, cachedir, stacks_dir, common_services_dir, extensions_dir, - host_scripts_dir, cluster_configuration, config, recovery_manager, - in_minutes=True): + def __init__(self, initializer_module, in_minutes=True): - self.cachedir = cachedir - self.stacks_dir = stacks_dir - self.common_services_dir = common_services_dir - self.extensions_dir = extensions_dir - self.host_scripts_dir = host_scripts_dir + self.cachedir = initializer_module.alerts_cachedir + self.stacks_dir = initializer_module.stacks_dir + self.common_services_dir = initializer_module.common_services_dir + self.extensions_dir = initializer_module.extensions_dir + self.host_scripts_dir = initializer_module.host_scripts_dir - self._cluster_configuration = cluster_configuration + self._cluster_configuration = initializer_module.configurations_cache + self.alert_definitions_cache = initializer_module.alert_definitions_cache + + self.config = initializer_module.config # a mapping between a cluster name and a unique hash for all definitions self._cluster_hashes = {} # the amount of time, in seconds, that an alert can run after it's scheduled time - alert_grace_period = int(config.get('agent', 'alert_grace_period', 5)) - - if not os.path.exists(cachedir): - try: - os.makedirs(cachedir) - except: - logger.critical("[AlertScheduler] Could not create the cache directory {0}".format(cachedir)) + alert_grace_period = int(self.config.get('agent', 'alert_grace_period', 5)) apscheduler_standalone = False @@ -80,14 +76,13 @@ class AlertSchedulerHandler(): 'apscheduler.standalone': apscheduler_standalone, 'apscheduler.misfire_grace_time': alert_grace_period, 'apscheduler.threadpool.context_injector': self._job_context_injector if not apscheduler_standalone else None, - 'apscheduler.threadpool.agent_config': config + 'apscheduler.threadpool.agent_config': self.config } self._collector = AlertCollector() self.__scheduler = Scheduler(self.APS_CONFIG) self.__in_minutes = in_minutes - self.config = config - self.recovery_manger = recovery_manager + self.recovery_manger = initializer_module.recovery_manager # register python exit handler ExitHelper().register(self.exit_handler) @@ -113,31 +108,17 @@ class AlertSchedulerHandler(): self.stop() - def update_definitions(self, heartbeat): + def update_definitions(self): """ Updates the persisted alert definitions JSON. - :param heartbeat: :return: """ - if 'alertDefinitionCommands' not in heartbeat: - logger.warning("There are no alert definition commands in the heartbeat; unable to update definitions") - return - # prune out things we don't want to store alert_definitions = [] - for command in heartbeat['alertDefinitionCommands']: - command_copy = command.copy() - - # no need to store these since we always use the in-memory cached values - if 'configurations' in command_copy: - del command_copy['configurations'] - + for cluster_id, command in self.alert_definitions_cache.iteritems(): + command_copy = Utils.get_mutable_copy(command) alert_definitions.append(command_copy) - # write out the new definitions - with open(os.path.join(self.cachedir, self.FILENAME), 'w') as f: - json.dump(alert_definitions, f, indent=2) - # determine how to reschedule the jobs reschedule_all = False if "clusterName" in command_copy and command_copy["clusterName"] not in self._cluster_hashes: @@ -271,16 +252,7 @@ class AlertSchedulerHandler(): :return: """ definitions = [] - - alerts_definitions_path = os.path.join(self.cachedir, self.FILENAME) - try: - with open(alerts_definitions_path) as fp: - all_commands = json.load(fp) - except: - logger.warning('[AlertScheduler] {0} not found or invalid. No alerts will be scheduled until registration occurs.'.format(alerts_definitions_path)) - return definitions - - for command_json in all_commands: + for cluster_id, command_json in self.alert_definitions_cache.iteritems(): clusterName = '' if not 'clusterName' in command_json else command_json['clusterName'] hostName = '' if not 'hostName' in command_json else command_json['hostName'] clusterHash = None if not 'hash' in command_json else command_json['hash'] @@ -289,9 +261,8 @@ class AlertSchedulerHandler(): if clusterName != '' and clusterHash is not None: logger.info('[AlertScheduler] Caching cluster {0} with alert hash {1}'.format(clusterName, clusterHash)) self._cluster_hashes[clusterName] = clusterHash - for definition in command_json['alertDefinitions']: - alert = self.__json_to_callable(clusterName, hostName, definition) + alert = self.__json_to_callable(clusterName, hostName, Utils.get_mutable_copy(definition)) if alert is None: continue http://git-wip-us.apache.org/repos/asf/ambari/blob/6578b5a2/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py b/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py index 9507c9d..e1c40c8 100644 --- a/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py +++ b/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py @@ -241,7 +241,7 @@ class AmbariConfig: :return: Alerts log file path. """ if 'AMBARI_AGENT_LOG_DIR' in os.environ: - return os.path.join(os.environ['AMBARI_AGENT_LOG_DIR'], "ambari-agent.log") + return os.path.join(os.environ['AMBARI_AGENT_LOG_DIR'], "ambari-alerts.log") else: return os.path.join(os.sep, home_dir, "var", "log", "ambari-agent", "ambari-alerts.log") http://git-wip-us.apache.org/repos/asf/ambari/blob/6578b5a2/ambari-agent/src/main/python/ambari_agent/ClusterAlertDefinitionsCache.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/ClusterAlertDefinitionsCache.py b/ambari-agent/src/main/python/ambari_agent/ClusterAlertDefinitionsCache.py new file mode 100644 index 0000000..39b88e6 --- /dev/null +++ b/ambari-agent/src/main/python/ambari_agent/ClusterAlertDefinitionsCache.py @@ -0,0 +1,62 @@ +#!/usr/bin/env python + +""" +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" + +#!/usr/bin/env python + +""" +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" + +from ambari_agent.ClusterCache import ClusterCache + +class ClusterAlertDefinitionsCache(ClusterCache): + """ + Maintains an in-memory cache and disk cache of the host level params send from server for + every cluster. This is useful for having quick access to any of the + topology properties. + + Host level params. Is parameters used by execution and status commands which can be generated + differently for every host. + """ + + def __init__(self, cluster_cache_dir): + """ + Initializes the host level params cache. + :param cluster_cache_dir: + :return: + """ + super(ClusterAlertDefinitionsCache, self).__init__(cluster_cache_dir) + + def get_cache_name(self): + return 'alert_definitions' \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/6578b5a2/ambari-agent/src/main/python/ambari_agent/Constants.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/Constants.py b/ambari-agent/src/main/python/ambari_agent/Constants.py index 17ed2be..37b5df2 100644 --- a/ambari-agent/src/main/python/ambari_agent/Constants.py +++ b/ambari-agent/src/main/python/ambari_agent/Constants.py @@ -22,17 +22,19 @@ limitations under the License. COMMANDS_TOPIC = '/user/commands' CONFIGURATIONS_TOPIC = '/user/configs' HOST_LEVEL_PARAMS_TOPIC = '/user/host_level_params' +ALERTS_DEFENITIONS_TOPIC = '/user/alert_defenitions' METADATA_TOPIC = '/events/metadata' TOPOLOGIES_TOPIC = '/events/topologies' SERVER_RESPONSES_TOPIC = '/user/' PRE_REGISTRATION_TOPICS_TO_SUBSCRIBE = [SERVER_RESPONSES_TOPIC] -POST_REGISTRATION_TOPICS_TO_SUBSCRIBE = [COMMANDS_TOPIC, CONFIGURATIONS_TOPIC, METADATA_TOPIC, TOPOLOGIES_TOPIC, HOST_LEVEL_PARAMS_TOPIC] +POST_REGISTRATION_TOPICS_TO_SUBSCRIBE = [COMMANDS_TOPIC, CONFIGURATIONS_TOPIC, METADATA_TOPIC, TOPOLOGIES_TOPIC, HOST_LEVEL_PARAMS_TOPIC, ALERTS_DEFENITIONS_TOPIC] TOPOLOGY_REQUEST_ENDPOINT = '/agents/topologies' METADATA_REQUEST_ENDPOINT = '/agents/metadata' CONFIGURATIONS_REQUEST_ENDPOINT = '/agents/configs' HOST_LEVEL_PARAMS_TOPIC_ENPOINT = '/agents/host_level_params' +ALERTS_DEFENITIONS_REQUEST_ENDPOINT = '/agents/alert_defenitions' COMPONENT_STATUS_REPORTS_ENDPOINT = '/reports/component_status' COMMANDS_STATUS_REPORTS_ENDPOINT = '/reports/commands_status' HOST_STATUS_REPORTS_ENDPOINT = '/reports/host_status' http://git-wip-us.apache.org/repos/asf/ambari/blob/6578b5a2/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py b/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py index 85840d0..e2fe4af 100644 --- a/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py +++ b/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py @@ -34,6 +34,7 @@ from ambari_agent.listeners.ConfigurationEventListener import ConfigurationEvent from ambari_agent.listeners.MetadataEventListener import MetadataEventListener from ambari_agent.listeners.CommandsEventListener import CommandsEventListener from ambari_agent.listeners.HostLevelParamsEventListener import HostLevelParamsEventListener +from ambari_agent.listeners.AlertDefinitionsEventListener import AlertDefinitionsEventListener HEARTBEAT_INTERVAL = 10 REQUEST_RESPONSE_TIMEOUT = 10 @@ -60,13 +61,15 @@ class HeartbeatThread(threading.Thread): self.topology_events_listener = TopologyEventListener(initializer_module.topology_cache) self.configuration_events_listener = ConfigurationEventListener(initializer_module.configurations_cache) self.host_level_params_events_listener = HostLevelParamsEventListener(initializer_module.host_level_params_cache, initializer_module.recovery_manager) - self.listeners = [self.server_responses_listener, self.commands_events_listener, self.metadata_events_listener, self.topology_events_listener, self.configuration_events_listener, self.host_level_params_events_listener] + self.alert_definitions_events_listener = AlertDefinitionsEventListener(initializer_module.alert_definitions_cache, initializer_module.alert_scheduler_handler) + self.listeners = [self.server_responses_listener, self.commands_events_listener, self.metadata_events_listener, self.topology_events_listener, self.configuration_events_listener, self.host_level_params_events_listener, self.alert_definitions_events_listener] self.post_registration_requests = [ (Constants.TOPOLOGY_REQUEST_ENDPOINT, initializer_module.topology_cache, self.topology_events_listener), (Constants.METADATA_REQUEST_ENDPOINT, initializer_module.metadata_cache, self.metadata_events_listener), (Constants.CONFIGURATIONS_REQUEST_ENDPOINT, initializer_module.configurations_cache, self.configuration_events_listener), - (Constants.HOST_LEVEL_PARAMS_TOPIC_ENPOINT, initializer_module.host_level_params_cache, self.host_level_params_events_listener) + (Constants.HOST_LEVEL_PARAMS_TOPIC_ENPOINT, initializer_module.host_level_params_cache, self.host_level_params_events_listener), + (Constants.ALERTS_DEFENITIONS_REQUEST_ENDPOINT, initializer_module.alert_definitions_cache, self.alert_definitions_events_listener) ] self.responseId = 0 self.file_cache = initializer_module.file_cache http://git-wip-us.apache.org/repos/asf/ambari/blob/6578b5a2/ambari-agent/src/main/python/ambari_agent/InitializerModule.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/InitializerModule.py b/ambari-agent/src/main/python/ambari_agent/InitializerModule.py index 6ef4a04..9b031f7 100644 --- a/ambari-agent/src/main/python/ambari_agent/InitializerModule.py +++ b/ambari-agent/src/main/python/ambari_agent/InitializerModule.py @@ -29,14 +29,16 @@ from ambari_agent.ClusterConfigurationCache import ClusterConfigurationCache from ambari_agent.ClusterTopologyCache import ClusterTopologyCache from ambari_agent.ClusterMetadataCache import ClusterMetadataCache from ambari_agent.ClusterHostLevelParamsCache import ClusterHostLevelParamsCache +from ambari_agent.ClusterAlertDefinitionsCache import ClusterAlertDefinitionsCache from ambari_agent.Utils import lazy_property from ambari_agent.security import AmbariStompConnection from ambari_agent.ActionQueue import ActionQueue from ambari_agent.CommandStatusDict import CommandStatusDict from ambari_agent.CustomServiceOrchestrator import CustomServiceOrchestrator from ambari_agent.RecoveryManager import RecoveryManager +from ambari_agent.AlertSchedulerHandler import AlertSchedulerHandler -logger = logging.getLogger() +logger = logging.getLogger(__name__) class InitializerModule: """ @@ -60,8 +62,14 @@ class InitializerModule: self.cache_dir = self.config.get('agent', 'cache_dir', default='/var/lib/ambari-agent/cache') self.command_reports_interval = int(self.config.get('agent', 'command_reports_interval', default='5')) + self.cluster_cache_dir = os.path.join(self.cache_dir, FileCache.CLUSTER_CACHE_DIRECTORY) self.recovery_cache_dir = os.path.join(self.cache_dir, FileCache.RECOVERY_CACHE_DIRECTORY) + self.alerts_cachedir = os.path.join(self.cache_dir, FileCache.ALERTS_CACHE_DIRECTORY) + self.stacks_dir = os.path.join(self.cache_dir, FileCache.STACKS_CACHE_DIRECTORY) + self.common_services_dir = os.path.join(self.cache_dir, FileCache.COMMON_SERVICES_DIRECTORY) + self.extensions_dir = os.path.join(self.cache_dir, FileCache.EXTENSIONS_CACHE_DIRECTORY) + self.host_scripts_dir = os.path.join(self.cache_dir, FileCache.HOST_SCRIPTS_CACHE_DIRECTORY) self.host_status_report_interval = int(self.config.get('heartbeat', 'state_interval_seconds', '60')) @@ -77,6 +85,7 @@ class InitializerModule: self.topology_cache = ClusterTopologyCache(self.cluster_cache_dir, self.config) self.configurations_cache = ClusterConfigurationCache(self.cluster_cache_dir) self.host_level_params_cache = ClusterHostLevelParamsCache(self.cluster_cache_dir) + self.alert_definitions_cache = ClusterAlertDefinitionsCache(self.cluster_cache_dir) self.file_cache = FileCache(self.config) @@ -85,6 +94,7 @@ class InitializerModule: self.recovery_manager = RecoveryManager(self.recovery_cache_dir) self.commandStatuses = CommandStatusDict(self) self.action_queue = ActionQueue(self) + self.alert_scheduler_handler = AlertSchedulerHandler(self) @lazy_property def connection(self): http://git-wip-us.apache.org/repos/asf/ambari/blob/6578b5a2/ambari-agent/src/main/python/ambari_agent/alerts/base_alert.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/alerts/base_alert.py b/ambari-agent/src/main/python/ambari_agent/alerts/base_alert.py index b75b540..ce1583f 100644 --- a/ambari-agent/src/main/python/ambari_agent/alerts/base_alert.py +++ b/ambari-agent/src/main/python/ambari_agent/alerts/base_alert.py @@ -23,7 +23,7 @@ import re import time from collections import namedtuple -logger = logging.getLogger() +logger = logging.getLogger(__name__) # create a named tuple to return both the concrete URI and SSL flag AlertUri = namedtuple('AlertUri', 'uri is_ssl_enabled') http://git-wip-us.apache.org/repos/asf/ambari/blob/6578b5a2/ambari-agent/src/main/python/ambari_agent/alerts/collector.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/alerts/collector.py b/ambari-agent/src/main/python/ambari_agent/alerts/collector.py index adc7f47..089301f 100644 --- a/ambari-agent/src/main/python/ambari_agent/alerts/collector.py +++ b/ambari-agent/src/main/python/ambari_agent/alerts/collector.py @@ -21,7 +21,7 @@ limitations under the License. import logging import threading -logger = logging.getLogger() +logger = logging.getLogger(__name__) class AlertCollector(): """ http://git-wip-us.apache.org/repos/asf/ambari/blob/6578b5a2/ambari-agent/src/main/python/ambari_agent/alerts/metric_alert.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/alerts/metric_alert.py b/ambari-agent/src/main/python/ambari_agent/alerts/metric_alert.py index 803bdc6..da49d2a 100644 --- a/ambari-agent/src/main/python/ambari_agent/alerts/metric_alert.py +++ b/ambari-agent/src/main/python/ambari_agent/alerts/metric_alert.py @@ -32,7 +32,7 @@ from resource_management.libraries.functions.get_port_from_url import get_port_f from resource_management.libraries.functions.curl_krb_request import curl_krb_request from ambari_agent import Constants -logger = logging.getLogger() +logger = logging.getLogger(__name__) SECURITY_ENABLED_KEY = '{{cluster-env/security_enabled}}' http://git-wip-us.apache.org/repos/asf/ambari/blob/6578b5a2/ambari-agent/src/main/python/ambari_agent/alerts/port_alert.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/alerts/port_alert.py b/ambari-agent/src/main/python/ambari_agent/alerts/port_alert.py index 1e32718..b99b964 100644 --- a/ambari-agent/src/main/python/ambari_agent/alerts/port_alert.py +++ b/ambari-agent/src/main/python/ambari_agent/alerts/port_alert.py @@ -25,7 +25,7 @@ from alerts.base_alert import BaseAlert from resource_management.libraries.functions.get_port_from_url import get_port_from_url from ambari_commons import OSCheck from ambari_commons.inet_utils import resolve_address -logger = logging.getLogger() +logger = logging.getLogger(__name__) # default timeouts DEFAULT_WARNING_TIMEOUT = 1.5 http://git-wip-us.apache.org/repos/asf/ambari/blob/6578b5a2/ambari-agent/src/main/python/ambari_agent/alerts/recovery_alert.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/alerts/recovery_alert.py b/ambari-agent/src/main/python/ambari_agent/alerts/recovery_alert.py index 3092a39..de221ae 100644 --- a/ambari-agent/src/main/python/ambari_agent/alerts/recovery_alert.py +++ b/ambari-agent/src/main/python/ambari_agent/alerts/recovery_alert.py @@ -21,7 +21,7 @@ limitations under the License. import logging import datetime from alerts.base_alert import BaseAlert -logger = logging.getLogger() +logger = logging.getLogger(__name__) # default recoveries counts DEFAULT_WARNING_RECOVERIES_COUNT = 2 http://git-wip-us.apache.org/repos/asf/ambari/blob/6578b5a2/ambari-agent/src/main/python/ambari_agent/alerts/script_alert.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/alerts/script_alert.py b/ambari-agent/src/main/python/ambari_agent/alerts/script_alert.py index 945a222..301e440 100644 --- a/ambari-agent/src/main/python/ambari_agent/alerts/script_alert.py +++ b/ambari-agent/src/main/python/ambari_agent/alerts/script_alert.py @@ -27,7 +27,7 @@ from resource_management.core.environment import Environment from resource_management.libraries.functions.curl_krb_request import KERBEROS_KINIT_TIMER_PARAMETER from ambari_agent import Constants -logger = logging.getLogger("ambari_alerts") +logger = logging.getLogger(__name__) class ScriptAlert(BaseAlert): http://git-wip-us.apache.org/repos/asf/ambari/blob/6578b5a2/ambari-agent/src/main/python/ambari_agent/apscheduler/threadpool.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/apscheduler/threadpool.py b/ambari-agent/src/main/python/ambari_agent/apscheduler/threadpool.py index d6624cc..cb5f7ed 100644 --- a/ambari-agent/src/main/python/ambari_agent/apscheduler/threadpool.py +++ b/ambari-agent/src/main/python/ambari_agent/apscheduler/threadpool.py @@ -118,7 +118,7 @@ class ThreadPool(object): if self._shutdown: return - logging.info('Shutting down thread pool') + logger.info('Shutting down thread pool') self._shutdown = True _threadpools.remove(ref(self)) http://git-wip-us.apache.org/repos/asf/ambari/blob/6578b5a2/ambari-agent/src/main/python/ambari_agent/listeners/AlertDefinitionsEventListener.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/listeners/AlertDefinitionsEventListener.py b/ambari-agent/src/main/python/ambari_agent/listeners/AlertDefinitionsEventListener.py new file mode 100644 index 0000000..0829c31 --- /dev/null +++ b/ambari-agent/src/main/python/ambari_agent/listeners/AlertDefinitionsEventListener.py @@ -0,0 +1,55 @@ +#!/usr/bin/env python + +''' +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +''' + +import logging +import ambari_stomp + +from ambari_agent.listeners import EventListener +from ambari_agent import Constants + +logger = logging.getLogger(__name__) + +class AlertDefinitionsEventListener(EventListener): + """ + Listener of Constants.ALERTS_DEFENITIONS_TOPIC events from server. + """ + def __init__(self, alert_definitions_cache, alert_scheduler_handler): + self.alert_definitions_cache = alert_definitions_cache + self.alert_scheduler_handler = alert_scheduler_handler + + def on_event(self, headers, message): + """ + Is triggered when an event to Constants.ALERTS_DEFENITIONS_TOPIC topic is received from server. + + @param headers: headers dictionary + @param message: message payload dictionary + """ + # this kind of response is received if hash was identical. And server does not need to change anything + if message == {}: + return + + self.alert_definitions_cache.rewrite_cache(message['clusters']) + print message + self.alert_definitions_cache.hash = message['hash'] + + self.alert_scheduler_handler.update_definitions() + + def get_handled_path(self): + return Constants.ALERTS_DEFENITIONS_TOPIC \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/6578b5a2/ambari-agent/src/main/python/ambari_agent/listeners/HostLevelParamsEventListener.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/listeners/HostLevelParamsEventListener.py b/ambari-agent/src/main/python/ambari_agent/listeners/HostLevelParamsEventListener.py index 5aee634..aee2992 100644 --- a/ambari-agent/src/main/python/ambari_agent/listeners/HostLevelParamsEventListener.py +++ b/ambari-agent/src/main/python/ambari_agent/listeners/HostLevelParamsEventListener.py @@ -36,7 +36,7 @@ class HostLevelParamsEventListener(EventListener): def on_event(self, headers, message): """ - Is triggered when an event to Constants.CONFIGURATIONS_TOPIC topic is received from server. + Is triggered when an event to Constants.HOST_LEVEL_PARAMS_TOPIC topic is received from server. @param headers: headers dictionary @param message: message payload dictionary http://git-wip-us.apache.org/repos/asf/ambari/blob/6578b5a2/ambari-agent/src/main/python/ambari_agent/main.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/main.py b/ambari-agent/src/main/python/ambari_agent/main.py index 56962d3..c255e8b 100644 --- a/ambari-agent/src/main/python/ambari_agent/main.py +++ b/ambari-agent/src/main/python/ambari_agent/main.py @@ -109,8 +109,13 @@ from ambari_agent.ComponentStatusExecutor import ComponentStatusExecutor from ambari_agent.CommandStatusReporter import CommandStatusReporter from ambari_agent.HostStatusReporter import HostStatusReporter +#logging.getLogger('ambari_agent').propagate = False + logger = logging.getLogger() -alerts_logger = logging.getLogger('ambari_alerts') +alerts_logger = logging.getLogger('alerts') +alerts_logger_global = logging.getLogger('ambari_agent.alerts') +apscheduler_logger = logging.getLogger('apscheduler') +apscheduler_logger_global = logging.getLogger('ambari_agent.apscheduler') formatstr = "%(levelname)s %(asctime)s %(filename)s:%(lineno)d - %(message)s" agentPid = os.getpid() @@ -131,6 +136,7 @@ SYSLOG_FORMATTER = logging.Formatter(SYSLOG_FORMAT_STRING) _file_logging_handlers ={} def setup_logging(logger, filename, logging_level): + logger.propagate = False formatter = logging.Formatter(formatstr) if filename in _file_logging_handlers: @@ -139,6 +145,7 @@ def setup_logging(logger, filename, logging_level): rotateLog = logging.handlers.RotatingFileHandler(filename, "a", 10000000, 25) rotateLog.setFormatter(formatter) _file_logging_handlers[filename] = rotateLog + logger.handlers = [] logger.addHandler(rotateLog) logging.basicConfig(format=formatstr, level=logging_level, filename=filename) @@ -345,6 +352,8 @@ def reset_agent(options): MAX_RETRIES = 10 def run_threads(initializer_module): + initializer_module.alert_scheduler_handler.start() + heartbeat_thread = HeartbeatThread.HeartbeatThread(initializer_module) heartbeat_thread.start() @@ -387,6 +396,9 @@ def main(initializer_module, heartbeat_stop_callback=None): global is_logger_setup is_logger_setup = True setup_logging(alerts_logger, AmbariConfig.AmbariConfig.getAlertsLogFile(), logging_level) + setup_logging(alerts_logger_global, AmbariConfig.AmbariConfig.getAlertsLogFile(), logging_level) + setup_logging(apscheduler_logger, AmbariConfig.AmbariConfig.getAlertsLogFile(), logging_level) + setup_logging(apscheduler_logger_global, AmbariConfig.AmbariConfig.getAlertsLogFile(), logging_level) Logger.initialize_logger('resource_management', logging_level=logging_level) if home_dir != "": http://git-wip-us.apache.org/repos/asf/ambari/blob/6578b5a2/ambari-agent/src/test/python/ambari_agent/BaseStompServerTestCase.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/test/python/ambari_agent/BaseStompServerTestCase.py b/ambari-agent/src/test/python/ambari_agent/BaseStompServerTestCase.py index 5db53c8..b7529a8 100644 --- a/ambari-agent/src/test/python/ambari_agent/BaseStompServerTestCase.py +++ b/ambari-agent/src/test/python/ambari_agent/BaseStompServerTestCase.py @@ -150,6 +150,12 @@ class BaseStompServerTestCase(unittest.TestCase): logging.getLogger('stomp.py').setLevel(logging.WARN) logging.getLogger('coilmq').setLevel(logging.INFO) + logging.getLogger('ambari_agent.apscheduler').setLevel(logging.WARN) + logging.getLogger('apscheduler').setLevel(logging.WARN) + logging.getLogger('ambari_agent.alerts').setLevel(logging.WARN) + logging.getLogger('alerts').setLevel(logging.WARN) + logging.getLogger('ambari_agent.AlertSchedulerHandler').setLevel(logging.WARN) + def remove_files(self, filepathes): for filepath in filepathes: http://git-wip-us.apache.org/repos/asf/ambari/blob/6578b5a2/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py b/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py index 88f1e69..38d0e9e 100644 --- a/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py +++ b/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py @@ -40,7 +40,7 @@ from mock.mock import MagicMock, patch @patch("ambari_agent.hostname.hostname", new=MagicMock(return_value="c6401.ambari.apache.org")) class TestAgentStompResponses(BaseStompServerTestCase): def setUp(self): - self.remove_files(['/tmp/cluster_cache/configurations.json', '/tmp/cluster_cache/metadata.json', '/tmp/cluster_cache/topology.json', '/tmp/host_level_params.json']) + self.remove_files(['/tmp/cluster_cache/configurations.json', '/tmp/cluster_cache/metadata.json', '/tmp/cluster_cache/topology.json', '/tmp/host_level_params.json', '/tmp/cluster_cache/alert_definitions.json']) if not os.path.exists("/tmp/ambari-agent"): os.mkdir("/tmp/ambari-agent") @@ -60,6 +60,7 @@ class TestAgentStompResponses(BaseStompServerTestCase): action_queue = initializer_module.action_queue action_queue.start() + initializer_module.alert_scheduler_handler.start() component_status_executor = ComponentStatusExecutor(initializer_module) component_status_executor.start() @@ -86,10 +87,14 @@ class TestAgentStompResponses(BaseStompServerTestCase): f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '4'}, body=self.get_json("host_level_params.json")) self.server.topic_manager.send(f) + f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '5'}, body=self.get_json("alert_definitions.json")) + self.server.topic_manager.send(f) + initial_topology_request = self.server.frames_queue.get() initial_metadata_request = self.server.frames_queue.get() initial_configs_request = self.server.frames_queue.get() initial_host_level_params_request = self.server.frames_queue.get() + initial_alert_definitions_request = self.server.frames_queue.get() while not initializer_module.is_registered: time.sleep(0.1) @@ -108,6 +113,7 @@ class TestAgentStompResponses(BaseStompServerTestCase): metadata_subscribe_frame = self.server.frames_queue.get() topologies_subscribe_frame = self.server.frames_queue.get() host_level_params_subscribe_frame = self.server.frames_queue.get() + alert_definitions_subscribe_frame = self.server.frames_queue.get() heartbeat_frame = self.server.frames_queue.get() dn_install_in_progress_frame = json.loads(self.server.frames_queue.get().body) dn_install_failed_frame = json.loads(self.server.frames_queue.get().body) @@ -121,7 +127,7 @@ class TestAgentStompResponses(BaseStompServerTestCase): initializer_module.stop_event.set() - f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '5'}, body=json.dumps({'id':'1'})) + f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '6'}, body=json.dumps({'id':'1'})) self.server.topic_manager.send(f) command_status_reporter.join() @@ -130,7 +136,6 @@ class TestAgentStompResponses(BaseStompServerTestCase): host_status_reporter.join() action_queue.join() - self.assertTrue('mounts' in host_status_report) self.assertEquals(initializer_module.topology_cache['0']['hosts'][0]['hostName'], 'c6401.ambari.apache.org') self.assertEquals(initializer_module.metadata_cache['0']['status_commands_to_run'], ('STATUS',)) @@ -156,6 +161,7 @@ class TestAgentStompResponses(BaseStompServerTestCase): action_queue = initializer_module.action_queue action_queue.start() + initializer_module.alert_scheduler_handler.start() component_status_executor = ComponentStatusExecutor(initializer_module) component_status_executor.start() @@ -186,6 +192,9 @@ class TestAgentStompResponses(BaseStompServerTestCase): f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '4'}, body='{}') self.server.topic_manager.send(f) + f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '5'}, body='{}') + self.server.topic_manager.send(f) + commands_subscribe_frame = self.server.frames_queue.get() configurations_subscribe_frame = self.server.frames_queue.get() metadata_subscribe_frame = self.server.frames_queue.get() @@ -195,7 +204,7 @@ class TestAgentStompResponses(BaseStompServerTestCase): initializer_module.stop_event.set() - f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '5'}, body=json.dumps({'id':'1'})) + f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '6'}, body=json.dumps({'id':'1'})) self.server.topic_manager.send(f) heartbeat_thread.join() @@ -232,10 +241,14 @@ class TestAgentStompResponses(BaseStompServerTestCase): f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '4'}, body='{}') self.server.topic_manager.send(f) + f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '5'}, body='{}') + self.server.topic_manager.send(f) + initial_topology_request = self.server.frames_queue.get() initial_metadata_request = self.server.frames_queue.get() initial_configs_request = self.server.frames_queue.get() initial_host_level_params_request = self.server.frames_queue.get() + initial_alert_definitions_request = self.server.frames_queue.get() while not initializer_module.is_registered: time.sleep(0.1) @@ -273,7 +286,7 @@ class TestAgentStompResponses(BaseStompServerTestCase): initializer_module.stop_event.set() - f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '5'}, body=json.dumps({'id':'1'})) + f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '6'}, body=json.dumps({'id':'1'})) self.server.topic_manager.send(f) heartbeat_thread.join()
