Repository: ambari Updated Branches: refs/heads/branch-2.0.0 858959cd0 -> 278b7a027
AMBARI-9807 - Store Configuration In Agent Memory For Alerts (jonathanhurley) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/278b7a02 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/278b7a02 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/278b7a02 Branch: refs/heads/branch-2.0.0 Commit: 278b7a027c991c98d224325e693170ce6585bb2b Parents: 858959c Author: Jonathan Hurley <[email protected]> Authored: Wed Feb 25 23:40:52 2015 -0500 Committer: Jonathan Hurley <[email protected]> Committed: Thu Feb 26 11:48:24 2015 -0500 ---------------------------------------------------------------------- .../ambari_agent/AlertSchedulerHandler.py | 166 ++----- .../python/ambari_agent/ClusterConfiguration.py | 164 +++++++ .../src/main/python/ambari_agent/Controller.py | 26 +- .../python/ambari_agent/alerts/base_alert.py | 116 ++--- .../python/ambari_agent/alerts/port_alert.py | 10 +- .../python/ambari_agent/alerts/script_alert.py | 41 +- .../python/ambari_agent/alerts/web_alert.py | 4 +- .../src/test/python/ambari_agent/TestAlerts.py | 489 ++++++++++++------- .../TestClusterConfigurationCache.py | 100 ++++ .../alerts/alert_nodemanagers_summary.py | 8 + 10 files changed, 714 insertions(+), 410 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/278b7a02/ambari-agent/src/main/python/ambari_agent/AlertSchedulerHandler.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/AlertSchedulerHandler.py b/ambari-agent/src/main/python/ambari_agent/AlertSchedulerHandler.py index ba25936..a53201e 100644 --- a/ambari-agent/src/main/python/ambari_agent/AlertSchedulerHandler.py +++ b/ambari-agent/src/main/python/ambari_agent/AlertSchedulerHandler.py @@ -26,7 +26,6 @@ import logging import os import sys import time -import traceback from apscheduler.scheduler 
import Scheduler from alerts.collector import AlertCollector from alerts.metric_alert import MetricAlert @@ -38,8 +37,6 @@ logger = logging.getLogger() class AlertSchedulerHandler(): - make_cachedir = True - FILENAME = 'definitions.json' TYPE_PORT = 'PORT' TYPE_METRIC = 'METRIC' @@ -53,13 +50,17 @@ class AlertSchedulerHandler(): } - def __init__(self, cachedir, stacks_dir, common_services_dir, host_scripts_dir, config, in_minutes=True): + def __init__(self, cachedir, stacks_dir, common_services_dir, host_scripts_dir, + cluster_configuration, config, in_minutes=True): + self.cachedir = cachedir self.stacks_dir = stacks_dir self.common_services_dir = common_services_dir self.host_scripts_dir = host_scripts_dir + + self._cluster_configuration = cluster_configuration - if not os.path.exists(cachedir) and AlertSchedulerHandler.make_cachedir: + if not os.path.exists(cachedir): try: os.makedirs(cachedir) except: @@ -69,64 +70,35 @@ class AlertSchedulerHandler(): self.__scheduler = Scheduler(AlertSchedulerHandler.APS_CONFIG) self.__in_minutes = in_minutes self.config = config - self.__config_maps = {} - - - def update_definitions(self, alert_commands, reschedule_jobs=False): - """ updates the persisted definitions and restarts the scheduler """ - - with open(os.path.join(self.cachedir, self.FILENAME), 'w') as f: - json.dump(alert_commands, f, indent=2) - - if reschedule_jobs: - self.reschedule() - def __update_definition_configs(self, newConfigurations, reschedule_jobs=False): + def update_definitions(self, heartbeat): """ - Updates the definitions and configurations stored on disk. Optionally - can reschedule jobs. Job rescheduling is only necessary when data that - an existing job uses has changed. In many cases, configuration values - have changed, yet no jobs need rescheduling. - - :param reschedule_jobs: + Updates the persisted alert definitions JSON. 
+ :param heartbeat: :return: """ - - if reschedule_jobs: - logger.info("[AlertScheduler] Updating {0} with the latest configuration values and rescheduling alert jobs".format(self.FILENAME)) - else: - logger.info("[AlertScheduler] Updating {0} with the latest configuration values".format(self.FILENAME)) - - # Load definitions from json - try: - with open(os.path.join(self.cachedir, self.FILENAME), 'r') as fp: - all_commands = json.load(fp) - except IOError, ValueError: - if logger.isEnabledFor(logging.DEBUG): - logger.exception("[AlertScheduler] Failed to load definitions. {0}".format(traceback.format_exc())) + if 'alertDefinitionCommands' not in heartbeat: + logger.warning("There are no alert definition commands in the heartbeat; unable to update definitions") return - # Update definitions with current config - for command_json in all_commands: - if 'clusterName' in command_json: - clusterName = command_json['clusterName'] - else: - clusterName = '' + # prune out things we don't want to store + alert_definitions = [] + for command in heartbeat['alertDefinitionCommands']: + command_copy = command.copy() - self.__update_config_values(command_json['configurations'], - self.__config_maps[clusterName]) + # no need to store these since we always use the in-memory cached values + if 'configurations' in command_copy: + del command_copy['configurations'] - # update the configurations before writing the file back out - command_json['configurations'] = newConfigurations + alert_definitions.append(command_copy) - # Save definitions to file + # write out the new definitions with open(os.path.join(self.cachedir, self.FILENAME), 'w') as f: - json.dump(all_commands, f, indent=2) + json.dump(alert_definitions, f, indent=2) - # only reschdule jobs if instructed to - if reschedule_jobs: - self.reschedule_all() + # reschedule only the jobs that have changed + self.reschedule() def __make_function(self, alert_def): @@ -206,6 +178,7 @@ class AlertSchedulerHandler(): 
logger.info("[AlertScheduler] Reschedule Summary: {0} rescheduled, {1} unscheduled".format( str(jobs_scheduled), str(jobs_removed))) + def reschedule_all(self): """ Removes jobs that are scheduled where their UUID no longer is valid. @@ -239,7 +212,11 @@ class AlertSchedulerHandler(): def __load_definitions(self): - """ loads all alert commands from the file. all clusters are stored in one file """ + """ + Loads all alert definitions from a file. All clusters are stored in + a single file. + :return: + """ definitions = [] all_commands = None @@ -248,31 +225,20 @@ class AlertSchedulerHandler(): with open(alerts_definitions_path) as fp: all_commands = json.load(fp) except: - logger.warning('[AlertScheduler] {0} not found. No alerts will be scheduled.'.format(alerts_definitions_path)) + logger.warning('[AlertScheduler] {0} not found or invalid. No alerts will be scheduled until registration occurs.'.format(alerts_definitions_path)) return definitions for command_json in all_commands: clusterName = '' if not 'clusterName' in command_json else command_json['clusterName'] hostName = '' if not 'hostName' in command_json else command_json['hostName'] - configmap = None - # each cluster gets a map of key/value pairs of substitution values - self.__config_maps[clusterName] = {} - if 'configurations' in command_json: - configmap = command_json['configurations'] - for definition in command_json['alertDefinitions']: alert = self.__json_to_callable(clusterName, hostName, definition) if alert is None: continue - # get the config values for the alerts 'lookup keys', - # eg: hdfs-site/dfs.namenode.http-address : host_and_port - vals = self.__find_config_values(configmap, alert.get_lookup_keys()) - self.__config_maps[clusterName].update(vals) - - alert.set_helpers(self._collector, self.__config_maps[clusterName]) + alert.set_helpers(self._collector, self._cluster_configuration) definitions.append(alert) @@ -310,69 +276,6 @@ class AlertSchedulerHandler(): return alert - def 
__find_config_values(self, configmap, obj_keylist): - """ - finds templated values in the configuration map provided by the server - and returns a dictionary of template key to value - """ - - if configmap is None: - return {} - - result = {} - - for key in obj_keylist: - try: - obj = configmap - for layer in key.split('/'): - obj = obj[layer] - result[key] = obj - except KeyError: # the nested key is missing somewhere - pass - - return result - - def __update_config_values(self, configs, actual_configs): - for slashkey in actual_configs.keys(): - dicts = slashkey.split('/') - current_dict = configs - for i in range(len(dicts)): - if i+1 >= len(dicts): - current_dict[dicts[i]] = actual_configs[slashkey] - else: - if not dicts[i] in current_dict: - current_dict[dicts[i]]={} - current_dict = current_dict[dicts[i]] - - def update_configurations(self, commands): - """ - Checks the execution command's configurations against those stored in - memory. If there are differences, this will reschedule alerts. The - on-disk JSON file is always updated so that it reflects the correct state - of configurations - """ - for command in commands: - clusterName = command['clusterName'] - if not clusterName in self.__config_maps: - continue - - if not 'configurations' in command: - continue - - existingConfigurationKeys = self.__config_maps[clusterName].keys() - newConfigurations = command['configurations'] - newConfigurationValues = self.__find_config_values(newConfigurations, - existingConfigurationKeys) - - # if we have updated values push them to config_maps and reschedule - rescheduleJobs = False - if newConfigurationValues != self.__config_maps[clusterName]: - rescheduleJobs = True - self.__config_maps[clusterName].update(newConfigurationValues) - - self.__update_definition_configs(newConfigurations, rescheduleJobs) - - def schedule_definition(self,definition): """ Schedule a definition (callable). 
Scheduled jobs are given the UUID @@ -382,7 +285,7 @@ class AlertSchedulerHandler(): simply NOOP. """ # NOOP if the definition is disabled; don't schedule it - if definition.is_enabled() == False: + if not definition.is_enabled(): logger.info("[AlertScheduler] The alert {0} with UUID {1} is disabled and will not be scheduled".format( definition.get_name(),definition.get_uuid())) return @@ -408,7 +311,7 @@ class AlertSchedulerHandler(): def get_job_count(self): """ Gets the number of jobs currently scheduled. This is mainly used for - test verification of scheduling + test verification of scheduling. """ if self.__scheduler is None: return 0 @@ -440,11 +343,12 @@ class AlertSchedulerHandler(): logger.info("[AlertScheduler] Executing on-demand alert {0} ({1})".format(alert.get_name(), alert.get_uuid())) - alert.set_helpers(self._collector, self.__config_maps[clusterName]) + alert.set_helpers(self._collector, self._cluster_configuration) alert.collect() except: logger.exception("[AlertScheduler] Unable to execute the alert outside of the job scheduler") + def main(): args = list(sys.argv) del args[0] http://git-wip-us.apache.org/repos/asf/ambari/blob/278b7a02/ambari-agent/src/main/python/ambari_agent/ClusterConfiguration.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/ClusterConfiguration.py b/ambari-agent/src/main/python/ambari_agent/ClusterConfiguration.py new file mode 100644 index 0000000..ec2cff7 --- /dev/null +++ b/ambari-agent/src/main/python/ambari_agent/ClusterConfiguration.py @@ -0,0 +1,164 @@ +#!/usr/bin/env python + +""" +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. 
The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" + +import logging +import json +import os +import threading + +logger = logging.getLogger() + +class ClusterConfiguration(): + """ + Maintains an in-memory cache and disk cache of the configurations for + every cluster. This is useful for having quick access to any of the + configuration properties. + """ + + FILENAME = 'configurations.json' + + # constants that define which commands hold configurations that can be + # used to populate this cache + EXECUTION_COMMANDS = 'executionCommands' + ALERT_DEFINITION_COMMANDS = 'alertDefinitionCommands' + COMMANDS_WITH_CONFIGURATIONS = [EXECUTION_COMMANDS, ALERT_DEFINITION_COMMANDS] + + def __init__(self, cluster_config_cache_dir): + """ + Initializes the configuration cache. 
+ :param cluster_config_cache_dir: + :return: + """ + self.cluster_config_cache_dir = cluster_config_cache_dir + + # keys are cluster names, values are configurations + self.__configurations = {} + + self.__file_lock = threading.RLock() + self.__cache_lock = threading.RLock() + self.__config_json_file = os.path.join(self.cluster_config_cache_dir, self.FILENAME) + + # ensure that our cache directory exists + if not os.path.exists(cluster_config_cache_dir): + try: + os.makedirs(cluster_config_cache_dir) + except: + logger.critical("Could not create the cluster configuration cache directory {0}".format(cluster_config_cache_dir)) + + # if the file exists, then load it + try: + if os.path.isfile(self.__config_json_file): + with open(self.__config_json_file, 'r') as fp: + self.__configurations = json.load(fp) + except Exception, exception: + logger.warning("Unable to load configurations from {0}. This file will be regenerated on registration".format(self.__config_json_file)) + + + def update_configurations_from_heartbeat(self, heartbeat): + """ + Updates the in-memory and disk-based cluster configurations based on + the heartbeat. This will only update configurations on the following + types of commands in the heartbeat: execution, and alert definition. 
+ :param new_configurations: + :return: + """ + heartbeat_keys = heartbeat.keys() + + heartbeat_contains_configurations = False + for commandType in self.COMMANDS_WITH_CONFIGURATIONS: + if commandType in heartbeat_keys: + heartbeat_contains_configurations = True + + # if this heartbeat doesn't contain a command with configurations, then + # don't process it + if not heartbeat_contains_configurations: + return + + if self.EXECUTION_COMMANDS in heartbeat_keys: + execution_commands = heartbeat[self.EXECUTION_COMMANDS] + for command in execution_commands: + if 'clusterName' in command and 'configurations' in command: + cluster_name = command['clusterName'] + configurations = command['configurations'] + self._update_configurations(cluster_name, configurations) + + return + + if self.ALERT_DEFINITION_COMMANDS in heartbeat_keys: + alert_definition_commands = heartbeat[self.ALERT_DEFINITION_COMMANDS] + for command in alert_definition_commands: + if 'clusterName' in command and 'configurations' in command: + cluster_name = command['clusterName'] + configurations = command['configurations'] + self._update_configurations(cluster_name, configurations) + + return + + + def _update_configurations(self, cluster_name, configuration): + """ + Thread-safe method for writing out the specified cluster configuration + and updating the in-memory representation. 
+ :param cluster_name: + :param configuration: + :return: + """ + logger.info("Updating cached configurations for cluster {0}".format(cluster_name)) + + self.__cache_lock.acquire() + try: + self.__configurations[cluster_name] = configuration + except Exception, exception : + logger.exception("Unable to update configurations for cluster {0}".format(cluster_name)) + finally: + self.__cache_lock.release() + + + self.__file_lock.acquire() + try: + with open(self.__config_json_file, 'w') as f: + json.dump(self.__configurations, f, indent=2) + except Exception, exception : + logger.exception("Unable to update configurations for cluster {0}".format(cluster_name)) + finally: + self.__file_lock.release() + + + def get_configuration_value(self, cluster_name, key): + """ + Gets a value from the cluster configuration map for the given cluster and + key. The key is expected to be of the form 'foo-bar/baz' or + 'foo-bar/bar-baz/foobarbaz' where every / denotes a new mapping + :param key: a lookup key, like 'foo-bar/baz' + :return: the value, or None if not found + """ + self.__cache_lock.acquire() + try: + dictionary = self.__configurations[cluster_name] + for layer_key in key.split('/'): + dictionary = dictionary[layer_key] + + return dictionary + + except Exception: + logger.debug("Cache miss for configuration property {0} in cluster {1}".format(key, cluster_name)) + return None + finally: + self.__cache_lock.release() \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/278b7a02/ambari-agent/src/main/python/ambari_agent/Controller.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/Controller.py b/ambari-agent/src/main/python/ambari_agent/Controller.py index 1bf2613..bd961c1 100644 --- a/ambari-agent/src/main/python/ambari_agent/Controller.py +++ b/ambari-agent/src/main/python/ambari_agent/Controller.py @@ -35,6 +35,7 @@ import hostname import security import ssl import 
AmbariConfig + from Heartbeat import Heartbeat from Register import Register from ActionQueue import ActionQueue @@ -42,6 +43,7 @@ from FileCache import FileCache from NetUtil import NetUtil from LiveStatus import LiveStatus from AlertSchedulerHandler import AlertSchedulerHandler +from ClusterConfiguration import ClusterConfiguration from HeartbeatHandlers import HeartbeatStopHandlers, bind_signal_handlers logger = logging.getLogger() @@ -89,9 +91,13 @@ class Controller(threading.Thread): common_services_cache_dir = os.path.join(cache_dir, FileCache.COMMON_SERVICES_DIRECTORY) host_scripts_cache_dir = os.path.join(cache_dir, FileCache.HOST_SCRIPTS_CACHE_DIRECTORY) alerts_cache_dir = os.path.join(cache_dir, 'alerts') - + cluster_config_cache_dir = os.path.join(cache_dir, 'cluster_configuration') + + self.cluster_configuration = ClusterConfiguration(cluster_config_cache_dir) + self.alert_scheduler_handler = AlertSchedulerHandler(alerts_cache_dir, - stacks_cache_dir, common_services_cache_dir, host_scripts_cache_dir, config) + stacks_cache_dir, common_services_cache_dir, host_scripts_cache_dir, + self.cluster_configuration, config) def __del__(self): @@ -149,12 +155,11 @@ class Controller(threading.Thread): else: self.hasMappedComponents = False - if 'alertDefinitionCommands' in ret.keys(): - logger.info("Got alert definition update on registration " + pprint.pformat(ret['alertDefinitionCommands'])) - self.alert_scheduler_handler.update_definitions(ret['alertDefinitionCommands']) - pass + # always update cached cluster configurations on registration + self.cluster_configuration.update_configurations_from_heartbeat(ret) - pass + # always update alert definitions on registration + self.alert_scheduler_handler.update_definitions(ret) except ssl.SSLError: self.repeatRegistration = False self.isRegistered = False @@ -258,6 +263,10 @@ class Controller(threading.Thread): else: self.responseId = serverId + # if the response contains configurations, update the in-memory and + # 
disk-based configuration cache (execution and alert commands have this) + self.cluster_configuration.update_configurations_from_heartbeat(response) + response_keys = response.keys() if 'cancelCommands' in response_keys: self.cancelCommandInQueue(response['cancelCommands']) @@ -265,13 +274,12 @@ class Controller(threading.Thread): if 'executionCommands' in response_keys: execution_commands = response['executionCommands'] self.addToQueue(execution_commands) - self.alert_scheduler_handler.update_configurations(execution_commands) if 'statusCommands' in response_keys: self.addToStatusQueue(response['statusCommands']) if 'alertDefinitionCommands' in response_keys: - self.alert_scheduler_handler.update_definitions(response['alertDefinitionCommands'], True) + self.alert_scheduler_handler.update_definitions(response) if 'alertExecutionCommands' in response_keys: self.alert_scheduler_handler.execute_alert(response['alertExecutionCommands']) http://git-wip-us.apache.org/repos/asf/ambari/blob/278b7a02/ambari-agent/src/main/python/ambari_agent/alerts/base_alert.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/alerts/base_alert.py b/ambari-agent/src/main/python/ambari_agent/alerts/base_alert.py index 2f2c997..34aeff6 100644 --- a/ambari-agent/src/main/python/ambari_agent/alerts/base_alert.py +++ b/ambari-agent/src/main/python/ambari_agent/alerts/base_alert.py @@ -35,7 +35,7 @@ class BaseAlert(object): def __init__(self, alert_meta, alert_source_meta): self.alert_meta = alert_meta self.alert_source_meta = alert_source_meta - self.cluster = '' + self.cluster_name = '' self.host_name = '' self._lookup_keys = [] @@ -70,16 +70,28 @@ class BaseAlert(object): return self.alert_meta['uuid'] - def set_helpers(self, collector, value_dict): - """ sets helper objects for alerts without having to use them in a constructor """ + def set_helpers(self, collector, cluster_configuration): + """ + sets helper objects 
for alerts without having to use them in a constructor + """ self.collector = collector - self.config_value_dict = value_dict + self.cluster_configuration = cluster_configuration - def set_cluster(self, cluster, host): + def set_cluster(self, cluster_name, host_name): """ sets cluster information for the alert """ - self.cluster = cluster - self.host_name = host + self.cluster_name = cluster_name + self.host_name = host_name + + + def _get_alert_meta_value_safely(self, meta_key): + """ + safe way to get a value when outputting result json. will not throw an exception + """ + if self.alert_meta.has_key(meta_key): + return self.alert_meta[meta_key] + else: + return None def collect(self): @@ -129,73 +141,51 @@ class BaseAlert(object): logger.debug("[Alert][{0}] result = {1}".format(self.get_name(), str(res))) data = {} - data['name'] = self._find_value('name') - data['label'] = self._find_value('label') + data['name'] = self._get_alert_meta_value_safely('name') + data['label'] = self._get_alert_meta_value_safely('label') data['state'] = res[0] data['text'] = res_base_text.format(*res[1]) - data['cluster'] = self.cluster - data['service'] = self._find_value('serviceName') - data['component'] = self._find_value('componentName') + data['cluster'] = self.cluster_name + data['service'] = self._get_alert_meta_value_safely('serviceName') + data['component'] = self._get_alert_meta_value_safely('componentName') data['timestamp'] = long(time.time() * 1000) - data['uuid'] = self._find_value('uuid') - data['enabled'] = self._find_value('enabled') + data['uuid'] = self._get_alert_meta_value_safely('uuid') + data['enabled'] = self._get_alert_meta_value_safely('enabled') if logger.isEnabledFor(logging.DEBUG): logger.debug("[Alert][{0}] text = {1}".format(self.get_name(), data['text'])) - self.collector.put(self.cluster, data) - - - def _find_value(self, meta_key): - """ safe way to get a value when outputting result json. 
will not throw an exception """ - if self.alert_meta.has_key(meta_key): - return self.alert_meta[meta_key] - else: - return None - - - def get_lookup_keys(self): - """ returns a list of lookup keys found for this alert """ - return self._lookup_keys + self.collector.put(self.cluster_name, data) - def _find_lookup_property(self, key): + def _get_configuration_value(self, key): """ - check if the supplied key is parameterized and appends the extracted key - to the array of keys + Gets the value of the specified configuration key from the cache. The key + should be of the form {{foo-bar/baz}}. If the key is not a lookup key + and is instead a constant, such as "foo" or "5", then the constant is + returned. + :return: """ - keys = re.findall("{{([\S]+)}}", key) - - if len(keys) > 0: - if logger.isEnabledFor(logging.DEBUG): - logger.debug("[Alert][{0}] Found parameterized key {1} for {2}".format( - self.get_name(), str(keys), str(self))) - - self._lookup_keys.append(keys[0]) - return keys[0] - - return key - + # parse {{foo-bar/baz}} + placeholder_keys = re.findall("{{([\S]+)}}", key) - def _lookup_property_value(self, key): - """ - in the case of specifying a configuration path, lookup that path's value - """ - if not key in self._lookup_keys: + # if none found, then return the original + if len(placeholder_keys) == 0: return key - if key in self.config_value_dict: - return self.config_value_dict[key] - else: - return None + # this is a lookup key, so transform it into a value from the config cache + placeholder_key = placeholder_keys[0] + + return self.cluster_configuration.get_configuration_value( + self.cluster_name, placeholder_key) def _lookup_uri_property_keys(self, uri_structure): """ Loads the configuration lookup keys that the URI structure needs. This will return a named tuple that contains the keys needed to lookup - parameterized URI values from the URI structure. 
The URI structure looks - something like: + parameterized URI values from the cached configuration. + The URI structure looks something like: "uri":{ "http": foo, @@ -216,13 +206,13 @@ class BaseAlert(object): kerberos_principal = None if 'http' in uri_structure: - http_key = self._find_lookup_property(uri_structure['http']) + http_key = uri_structure['http'] if 'https' in uri_structure: - https_key = self._find_lookup_property(uri_structure['https']) + https_key = uri_structure['https'] if 'https_property' in uri_structure: - https_property_key = self._find_lookup_property(uri_structure['https_property']) + https_property_key = uri_structure['https_property'] if 'https_property_value' in uri_structure: https_property_value_key = uri_structure['https_property_value'] @@ -231,10 +221,10 @@ class BaseAlert(object): default_port = uri_structure['default_port'] if 'kerberos_keytab' in uri_structure: - kerberos_keytab = self._find_lookup_property(uri_structure['kerberos_keytab']) + kerberos_keytab = uri_structure['kerberos_keytab'] if 'kerberos_principal' in uri_structure: - kerberos_principal = self._find_lookup_property(uri_structure['kerberos_principal']) + kerberos_principal = uri_structure['kerberos_principal'] AlertUriLookupKeys = namedtuple('AlertUriLookupKeys', 'http https https_property https_property_value default_port kerberos_keytab kerberos_principal') @@ -274,16 +264,16 @@ class BaseAlert(object): # attempt to parse and parameterize the various URIs; properties that # do not exist int he lookup map are returned as None if alert_uri_lookup_keys.http is not None: - http_uri = self._lookup_property_value(alert_uri_lookup_keys.http) + http_uri = self._get_configuration_value(alert_uri_lookup_keys.http) if alert_uri_lookup_keys.https is not None: - https_uri = self._lookup_property_value(alert_uri_lookup_keys.https) + https_uri = self._get_configuration_value(alert_uri_lookup_keys.https) if alert_uri_lookup_keys.https_property is not None: - https_property = 
self._lookup_property_value(alert_uri_lookup_keys.https_property) + https_property = self._get_configuration_value(alert_uri_lookup_keys.https_property) if alert_uri_lookup_keys.https_property_value is not None: - https_property_value = self._lookup_property_value(alert_uri_lookup_keys.https_property_value) + https_property_value = self._get_configuration_value(alert_uri_lookup_keys.https_property_value) # without a URI, there's no way to create the structure we need - return # the default port if specified, otherwise throw an exception http://git-wip-us.apache.org/repos/asf/ambari/blob/278b7a02/ambari-agent/src/main/python/ambari_agent/alerts/port_alert.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/alerts/port_alert.py b/ambari-agent/src/main/python/ambari_agent/alerts/port_alert.py index bc2e554..1dbd450 100644 --- a/ambari-agent/src/main/python/ambari_agent/alerts/port_alert.py +++ b/ambari-agent/src/main/python/ambari_agent/alerts/port_alert.py @@ -41,9 +41,8 @@ class PortAlert(BaseAlert): self.warning_timeout = DEFAULT_WARNING_TIMEOUT self.critical_timeout = DEFAULT_CRITICAL_TIMEOUT - # can be parameterized or static if 'uri' in alert_source_meta: - self.uri = self._find_lookup_property(alert_source_meta['uri']) + self.uri = alert_source_meta['uri'] # always static if 'default_port' in alert_source_meta: @@ -80,8 +79,10 @@ class PortAlert(BaseAlert): def _collect(self): + # can be parameterized or static # if not parameterized, this will return the static value - uri_value = self._lookup_property_value(self.uri) + uri_value = self._get_configuration_value(self.uri) + if uri_value is None: uri_value = self.host_name logger.debug("[Alert][{0}] Setting the URI to this host since it wasn't specified".format( @@ -128,7 +129,8 @@ class PortAlert(BaseAlert): start_time = time.time() s.connect((host, port)) - milliseconds = time.time() - start_time + end_time = time.time() + 
milliseconds = end_time - start_time seconds = milliseconds / 1000.0 # not sure why this happens sometimes, but we don't always get a http://git-wip-us.apache.org/repos/asf/ambari/blob/278b7a02/ambari-agent/src/main/python/ambari_agent/alerts/script_alert.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/alerts/script_alert.py b/ambari-agent/src/main/python/ambari_agent/alerts/script_alert.py index 8f24f62..d79e0a7 100644 --- a/ambari-agent/src/main/python/ambari_agent/alerts/script_alert.py +++ b/ambari-agent/src/main/python/ambari_agent/alerts/script_alert.py @@ -24,7 +24,6 @@ import os import re from alerts.base_alert import BaseAlert from resource_management.core.environment import Environment -from symbol import parameters logger = logging.getLogger() @@ -59,32 +58,26 @@ class ScriptAlert(BaseAlert): if 'host_scripts_directory' in alert_source_meta: self.host_scripts_dir = alert_source_meta['host_scripts_directory'] - - # execute the get_tokens() method so that this script correctly populates - # its list of keys - try: - cmd_module = self._load_source() - tokens = cmd_module.get_tokens() - - # for every token, populate the array keys that this alert will need - if tokens is not None: - for token in tokens: - # append the key to the list of keys for this alert - self._find_lookup_property(token) - except: - logger.exception("[Alert][{0}] Unable to parameterize tokens for script {1}".format( - self.get_name(), self.path)) - - + def _collect(self): cmd_module = self._load_source() + if cmd_module is not None: - # convert the dictionary from - # {'foo-site/bar': 'baz'} into - # {'{{foo-site/bar}}': 'baz'} parameters = {} - for key in self.config_value_dict: - parameters['{{' + key + '}}'] = self.config_value_dict[key] + + try: + tokens = cmd_module.get_tokens() + if tokens is not None: + # for each token, if there is a value, store in; otherwise don't store + # a key with a value of 
None + for token in tokens: + value = self._get_configuration_value(token) + if value is not None: + parameters[token] = value + except AttributeError: + # it's OK if the module doesn't have get_tokens() ; no tokens will + # be passed in so hopefully the script doesn't need any + logger.debug("The script {0} does not have a get_tokens() function".format(str(cmd_module))) # try to get basedir for scripts # it's needed for server side scripts to properly use resource management @@ -135,7 +128,7 @@ class ScriptAlert(BaseAlert): return None - return imp.load_source(self._find_value('name'), self.path_to_script) + return imp.load_source(self._get_alert_meta_value_safely('name'), self.path_to_script) def _get_reporting_text(self, state): http://git-wip-us.apache.org/repos/asf/ambari/blob/278b7a02/ambari-agent/src/main/python/ambari_agent/alerts/web_alert.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/alerts/web_alert.py b/ambari-agent/src/main/python/ambari_agent/alerts/web_alert.py index 8ee6606..5052ca0 100644 --- a/ambari-agent/src/main/python/ambari_agent/alerts/web_alert.py +++ b/ambari-agent/src/main/python/ambari_agent/alerts/web_alert.py @@ -120,7 +120,7 @@ class WebAlert(BaseAlert): kerberos_principal = None if self.uri_property_keys.kerberos_principal is not None: - kerberos_principal = self._lookup_property_value( + kerberos_principal = self._get_configuration_value( self.uri_property_keys.kerberos_principal) if kerberos_principal is not None: @@ -128,7 +128,7 @@ class WebAlert(BaseAlert): kerberos_principal = kerberos_principal.replace('_HOST', self.host_name) if self.uri_property_keys.kerberos_keytab is not None: - kerberos_keytab = self._lookup_property_value(self.uri_property_keys.kerberos_keytab) + kerberos_keytab = self._get_configuration_value(self.uri_property_keys.kerberos_keytab) if kerberos_principal is not None and kerberos_keytab is not None: os.system("kinit -kt {0} 
{1} > /dev/null".format(kerberos_keytab, kerberos_principal)) http://git-wip-us.apache.org/repos/asf/ambari/blob/278b7a02/ambari-agent/src/test/python/ambari_agent/TestAlerts.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/test/python/ambari_agent/TestAlerts.py b/ambari-agent/src/test/python/ambari_agent/TestAlerts.py index a04088a..ed95dc4 100644 --- a/ambari-agent/src/test/python/ambari_agent/TestAlerts.py +++ b/ambari-agent/src/test/python/ambari_agent/TestAlerts.py @@ -21,8 +21,6 @@ limitations under the License. import os import socket import sys -import re -import json from ambari_agent.AlertSchedulerHandler import AlertSchedulerHandler from ambari_agent.alerts.collector import AlertCollector @@ -31,6 +29,7 @@ from ambari_agent.alerts.port_alert import PortAlert from ambari_agent.alerts.script_alert import ScriptAlert from ambari_agent.alerts.web_alert import WebAlert from ambari_agent.apscheduler.scheduler import Scheduler +from ambari_agent.ClusterConfiguration import ClusterConfiguration from collections import namedtuple from mock.mock import MagicMock, patch @@ -42,7 +41,6 @@ class TestAlerts(TestCase): # save original open() method for later use self.original_open = open - def tearDown(self): sys.stdout == sys.__stdout__ @@ -55,7 +53,12 @@ class TestAlerts(TestCase): test_common_services_path = os.path.join('ambari_agent', 'dummy_files') test_host_scripts_path = os.path.join('ambari_agent', 'dummy_files') - ash = AlertSchedulerHandler(test_file_path, test_stack_path, test_common_services_path, test_host_scripts_path, None) + cluster_configuration = self.__get_cluster_configuration() + + ash = AlertSchedulerHandler(test_file_path, test_stack_path, + test_common_services_path, test_host_scripts_path, cluster_configuration, + None) + ash.start() self.assertTrue(aps_add_interval_job_mock.called) @@ -64,13 +67,7 @@ class TestAlerts(TestCase): @patch('time.time') @patch.object(socket.socket,"connect") 
def test_port_alert(self, socket_connect_mock, time_mock): - # called 3x with 3 calls per alert - # - 900ms and then a time.time() for the date from base_alert - # - 2000ms and then a time.time() for the date from base_alert - # - socket.timeout to simulate a timeout and then a time.time() for the date from base_alert - time_mock.side_effect = [0,900,336283200000,0,2000,336283200000,socket.timeout,336283200000] - - json = { "name": "namenode_process", + definition_json = { "name": "namenode_process", "service": "HDFS", "component": "NAMENODE", "label": "NameNode process", @@ -98,26 +95,39 @@ class TestAlerts(TestCase): } } + configuration = { 'hdfs-site' : { 'my-key': 'value1' } } + collector = AlertCollector() + cluster_configuration = self.__get_cluster_configuration() + self.__update_cluster_configuration(cluster_configuration, configuration) + + # called 3x with 3 calls per alert + # - 900ms and then a time.time() for the date from base_alert + # - 2000ms and then a time.time() for the date from base_alert + # - socket.timeout to simulate a timeout and then a time.time() for the date from base_alert + time_mock.side_effect = [0,900,336283000000, + 0,2000,336283100000, + socket.timeout,336283200000] - pa = PortAlert(json, json['source']) - pa.set_helpers(collector, {'hdfs-site/my-key': 'value1'}) - self.assertEquals(6, pa.interval()) + alert = PortAlert(definition_json, definition_json['source']) + alert.set_helpers(collector, cluster_configuration) + alert.set_cluster("c1", "c6401.ambari.apache.org") + self.assertEquals(6, alert.interval()) # 900ms is OK - pa.collect() + alert.collect() alerts = collector.alerts() self.assertEquals(0, len(collector.alerts())) self.assertEquals('OK', alerts[0]['state']) # 2000ms is WARNING - pa.collect() + alert.collect() alerts = collector.alerts() self.assertEquals(0, len(collector.alerts())) self.assertEquals('WARNING', alerts[0]['state']) # throws a socket.timeout exception, causes a CRITICAL - pa.collect() + alert.collect() 
alerts = collector.alerts() self.assertEquals(0, len(collector.alerts())) self.assertEquals('CRITICAL', alerts[0]['state']) @@ -125,7 +135,7 @@ class TestAlerts(TestCase): @patch.object(socket.socket,"connect") def test_port_alert_complex_uri(self, socket_connect_mock): - json = { "name": "namenode_process", + definition_json = { "name": "namenode_process", "service": "HDFS", "component": "NAMENODE", "label": "NameNode process", @@ -148,16 +158,24 @@ class TestAlerts(TestCase): } } + configuration = {'hdfs-site' : + { 'my-key': 'c6401.ambari.apache.org:2181,c6402.ambari.apache.org:2181,c6403.ambari.apache.org:2181'} + } + collector = AlertCollector() + cluster_configuration = self.__get_cluster_configuration() + self.__update_cluster_configuration(cluster_configuration, configuration) - pa = PortAlert(json, json['source']) + alert = PortAlert(definition_json, definition_json['source']) + alert.set_helpers(collector, cluster_configuration) + alert.set_cluster("c1", "c6402.ambari.apache.org") # use a URI that has commas to verify that we properly parse it - pa.set_helpers(collector, {'hdfs-site/my-key': 'c6401.ambari.apache.org:2181,c6402.ambari.apache.org:2181,c6403.ambari.apache.org:2181'}) - pa.host_name = 'c6402.ambari.apache.org' - self.assertEquals(6, pa.interval()) + alert.set_helpers(collector, cluster_configuration) + alert.set_cluster("c1", "c6401.ambari.apache.org") + self.assertEquals(6, alert.interval()) - pa.collect() + alert.collect() alerts = collector.alerts() self.assertEquals(0, len(collector.alerts())) @@ -168,7 +186,7 @@ class TestAlerts(TestCase): def test_port_alert_no_sub(self): - json = { "name": "namenode_process", + definition_json = { "name": "namenode_process", "service": "HDFS", "component": "NAMENODE", "label": "NameNode process", @@ -191,16 +209,19 @@ class TestAlerts(TestCase): } } - pa = PortAlert(json, json['source']) - pa.set_helpers(AlertCollector(), '') - self.assertEquals('http://c6401.ambari.apache.org', pa.uri) + 
cluster_configuration = self.__get_cluster_configuration() + + alert = PortAlert(definition_json, definition_json['source']) + alert.set_helpers(AlertCollector(), cluster_configuration) + alert.set_cluster("c1", "c6401.ambari.apache.org") - pa.collect() + self.assertEquals('http://c6401.ambari.apache.org', alert.uri) + + alert.collect() - @patch.object(re, 'match', new = MagicMock()) def test_script_alert(self): - json = { + definition_json = { "name": "namenode_process", "service": "HDFS", "component": "NAMENODE", @@ -216,19 +237,28 @@ class TestAlerts(TestCase): } # normally set by AlertSchedulerHandler - json['source']['stacks_directory'] = os.path.join('ambari_agent', 'dummy_files') - json['source']['common_services_directory'] = os.path.join('ambari_agent', 'common-services') - json['source']['host_scripts_directory'] = os.path.join('ambari_agent', 'host_scripts') + definition_json['source']['stacks_directory'] = os.path.join('ambari_agent', 'dummy_files') + definition_json['source']['common_services_directory'] = os.path.join('ambari_agent', 'common-services') + definition_json['source']['host_scripts_directory'] = os.path.join('ambari_agent', 'host_scripts') + + configuration = {'foo-site' : + { 'bar': 'rendered-bar', 'baz' : 'rendered-baz' } + } collector = AlertCollector() - sa = ScriptAlert(json, json['source'], MagicMock()) - sa.set_helpers(collector, {'foo-site/bar': 'rendered-bar', 'foo-site/baz':'rendered-baz'} ) - self.assertEquals(json['source']['path'], sa.path) - self.assertEquals(json['source']['stacks_directory'], sa.stacks_dir) - self.assertEquals(json['source']['common_services_directory'], sa.common_services_dir) - self.assertEquals(json['source']['host_scripts_directory'], sa.host_scripts_dir) + cluster_configuration = self.__get_cluster_configuration() + self.__update_cluster_configuration(cluster_configuration, configuration) + + alert = ScriptAlert(definition_json, definition_json['source'], MagicMock()) + alert.set_helpers(collector, 
cluster_configuration ) + alert.set_cluster("c1", "c6401.ambari.apache.org") + + self.assertEquals(definition_json['source']['path'], alert.path) + self.assertEquals(definition_json['source']['stacks_directory'], alert.stacks_dir) + self.assertEquals(definition_json['source']['common_services_directory'], alert.common_services_dir) + self.assertEquals(definition_json['source']['host_scripts_directory'], alert.host_scripts_dir) - sa.collect() + alert.collect() alerts = collector.alerts() self.assertEquals(0, len(collector.alerts())) @@ -239,7 +269,7 @@ class TestAlerts(TestCase): @patch.object(MetricAlert, "_load_jmx") def test_metric_alert(self, ma_load_jmx_mock): - json = { + definition_json = { "name": "cpu_check", "service": "HDFS", "component": "NAMENODE", @@ -278,10 +308,19 @@ class TestAlerts(TestCase): ma_load_jmx_mock.return_value = [1, 3] + configuration = {'hdfs-site' : + { 'dfs.datanode.http.address': '1.2.3.4:80'} + } + collector = AlertCollector() - ma = MetricAlert(json, json['source']) - ma.set_helpers(collector, {'hdfs-site/dfs.datanode.http.address': '1.2.3.4:80'}) - ma.collect() + cluster_configuration = self.__get_cluster_configuration() + self.__update_cluster_configuration(cluster_configuration, configuration) + + alert = MetricAlert(definition_json, definition_json['source']) + alert.set_helpers(collector, cluster_configuration) + alert.set_cluster("c1", "c6401.ambari.apache.org") + + alert.collect() alerts = collector.alerts() self.assertEquals(0, len(collector.alerts())) @@ -289,11 +328,14 @@ class TestAlerts(TestCase): self.assertEquals('CRITICAL', alerts[0]['state']) self.assertEquals('(Unit Tests) crit_arr: 1 3 223', alerts[0]['text']) - del json['source']['jmx']['value'] + del definition_json['source']['jmx']['value'] collector = AlertCollector() - ma = MetricAlert(json, json['source']) - ma.set_helpers(collector, {'hdfs-site/dfs.datanode.http.address': '1.2.3.4:80'}) - ma.collect() + + alert = MetricAlert(definition_json, 
definition_json['source']) + alert.set_helpers(collector, cluster_configuration) + alert.set_cluster("c1", "c6401.ambari.apache.org") + + alert.collect() alerts = collector.alerts() self.assertEquals(0, len(collector.alerts())) @@ -304,7 +346,7 @@ class TestAlerts(TestCase): @patch.object(MetricAlert, "_load_jmx") def test_alert_uri_structure(self, ma_load_jmx_mock): - json = { + definition_json = { "name": "cpu_check", "service": "HDFS", "component": "NAMENODE", @@ -349,52 +391,81 @@ class TestAlerts(TestCase): # run the alert without specifying any keys; an exception should be thrown # indicating that there was no URI and the result is UNKNOWN collector = AlertCollector() - ma = MetricAlert(json, json['source']) - ma.set_helpers(collector, '') - ma.collect() + cluster_configuration = self.__get_cluster_configuration() + alert = MetricAlert(definition_json, definition_json['source']) + alert.set_helpers(collector, cluster_configuration) + alert.set_cluster("c1", "c6401.ambari.apache.org") + alert.collect() self.assertEquals('UNKNOWN', collector.alerts()[0]['state']) - # set 2 properties that make no sense wihtout the main URI properties + # set properties that make no sense without the main URI properties + configuration = {'hdfs-site' : + { 'dfs.http.policy' : 'HTTP_ONLY'} + } + collector = AlertCollector() - ma = MetricAlert(json, json['source']) - ma.set_helpers(collector, {'hdfs-site/dfs.http.policy': 'HTTP_ONLY'}) - ma.collect() + cluster_configuration = self.__get_cluster_configuration() + self.__update_cluster_configuration(cluster_configuration, configuration) + + alert = MetricAlert(definition_json, definition_json['source']) + alert.set_helpers(collector, cluster_configuration) + alert.set_cluster("c1", "c6401.ambari.apache.org") + alert.collect() self.assertEquals('UNKNOWN', collector.alerts()[0]['state']) # set an actual property key (http) + configuration = {'hdfs-site' : + { 'dfs.http.policy' : 'HTTP_ONLY', 'dfs.datanode.http.address' : '1.2.3.4:80'
} + } + + self.__update_cluster_configuration(cluster_configuration, configuration) + collector = AlertCollector() - ma = MetricAlert(json, json['source']) - ma.set_helpers(collector, {'hdfs-site/dfs.datanode.http.address': '1.2.3.4:80', - 'hdfs-site/dfs.http.policy': 'HTTP_ONLY'}) - ma.collect() + alert = MetricAlert(definition_json, definition_json['source']) + alert.set_helpers(collector, cluster_configuration) + alert.set_cluster("c1", "c6401.ambari.apache.org") + alert.collect() self.assertEquals('OK', collector.alerts()[0]['state']) # set an actual property key (https) + configuration = {'hdfs-site' : + { 'dfs.http.policy' : 'HTTP_ONLY', 'dfs.datanode.https.address' : '1.2.3.4:443' } + } + + self.__update_cluster_configuration(cluster_configuration, configuration) + collector = AlertCollector() - ma = MetricAlert(json, json['source']) - ma.set_helpers(collector, {'hdfs-site/dfs.datanode.https.address': '1.2.3.4:443', - 'hdfs-site/dfs.http.policy': 'HTTP_ONLY'}) - ma.collect() + alert = MetricAlert(definition_json, definition_json['source']) + alert.set_helpers(collector, cluster_configuration) + alert.set_cluster("c1", "c6401.ambari.apache.org") + alert.collect() self.assertEquals('OK', collector.alerts()[0]['state']) # set both (http and https) + configuration = {'hdfs-site' : + { 'dfs.http.policy' : 'HTTP_ONLY', + 'dfs.datanode.http.address' : '1.2.3.4:80', + 'dfs.datanode.https.address' : '1.2.3.4:443' } + } + + self.__update_cluster_configuration(cluster_configuration, configuration) + collector = AlertCollector() - ma = MetricAlert(json, json['source']) - ma.set_helpers(collector, {'hdfs-site/dfs.datanode.http.address': '1.2.3.4:80', - 'hdfs-site/dfs.datanode.https.address': '1.2.3.4:443', - 'hdfs-site/dfs.http.policy': 'HTTP_ONLY'}) - ma.collect() + alert = MetricAlert(definition_json, definition_json['source']) + alert.set_helpers(collector, cluster_configuration) + alert.set_cluster("c1", "c6401.ambari.apache.org") + alert.collect() 
self.assertEquals('OK', collector.alerts()[0]['state']) @patch.object(WebAlert, "_make_web_request") def test_web_alert(self, wa_make_web_request_mock): - json = { + definition_json = { "name": "webalert_test", "service": "HDFS", "component": "DATANODE", @@ -429,9 +500,17 @@ class TestAlerts(TestCase): wa_make_web_request_mock.return_value = WebResponse(200,1.234,None) # run the alert and check HTTP 200 + configuration = {'hdfs-site' : + { 'dfs.datanode.http.address' : '1.2.3.4:80' } + } + collector = AlertCollector() - alert = WebAlert(json, json['source']) - alert.set_helpers(collector, {'hdfs-site/dfs.datanode.http.address': '1.2.3.4:80'}) + cluster_configuration = self.__get_cluster_configuration() + self.__update_cluster_configuration(cluster_configuration, configuration) + + alert = WebAlert(definition_json, definition_json['source']) + alert.set_helpers(collector, cluster_configuration) + alert.set_cluster("c1", "c6401.ambari.apache.org") alert.collect() alerts = collector.alerts() @@ -441,10 +520,13 @@ class TestAlerts(TestCase): self.assertEquals('OK', alerts[0]['state']) # run the alert and check HTTP 500 + + wa_make_web_request_mock.return_value = WebResponse(500,1.234,None) collector = AlertCollector() - alert = WebAlert(json, json['source']) - alert.set_helpers(collector, {'hdfs-site/dfs.datanode.http.address': '1.2.3.4:80'}) + alert = WebAlert(definition_json, definition_json['source']) + alert.set_helpers(collector, cluster_configuration) + alert.set_cluster("c1", "c6401.ambari.apache.org") alert.collect() alerts = collector.alerts() @@ -457,8 +539,9 @@ class TestAlerts(TestCase): wa_make_web_request_mock.return_value = WebResponse(0,0,'error message') collector = AlertCollector() - alert = WebAlert(json, json['source']) - alert.set_helpers(collector, {'hdfs-site/dfs.datanode.http.address': '1.2.3.4:80'}) + alert = WebAlert(definition_json, definition_json['source']) + alert.set_helpers(collector, cluster_configuration) + alert.set_cluster("c1", 
"c6401.ambari.apache.org") alert.collect() alerts = collector.alerts() @@ -467,13 +550,19 @@ class TestAlerts(TestCase): # http assertion indicating that we properly determined non-SSL self.assertEquals('CRITICAL', alerts[0]['state']) self.assertEquals('(Unit Tests) critical: http://1.2.3.4:80. error message', alerts[0]['text']) - + + configuration = {'hdfs-site' : + { 'dfs.http.policy' : 'HTTPS_ONLY', + 'dfs.datanode.http.address' : '1.2.3.4:80', + 'dfs.datanode.https.address' : '1.2.3.4:443' } + } + + self.__update_cluster_configuration(cluster_configuration, configuration) + collector = AlertCollector() - alert = WebAlert(json, json['source']) - alert.set_helpers(collector, { - 'hdfs-site/dfs.datanode.http.address': '1.2.3.4:80', - 'hdfs-site/dfs.datanode.https.address': '1.2.3.4:8443', - 'hdfs-site/dfs.http.policy': 'HTTPS_ONLY'}) + alert = WebAlert(definition_json, definition_json['source']) + alert.set_helpers(collector, cluster_configuration) + alert.set_cluster("c1", "c6401.ambari.apache.org") alert.collect() @@ -482,15 +571,20 @@ class TestAlerts(TestCase): # SSL assertion self.assertEquals('CRITICAL', alerts[0]['state']) - self.assertEquals('(Unit Tests) critical: https://1.2.3.4:8443. error message', alerts[0]['text']) + self.assertEquals('(Unit Tests) critical: https://1.2.3.4:443. 
error message', alerts[0]['text']) def test_reschedule(self): test_file_path = os.path.join('ambari_agent', 'dummy_files') test_stack_path = os.path.join('ambari_agent', 'dummy_files') test_common_services_path = os.path.join('ambari_agent', 'dummy_files') test_host_scripts_path = os.path.join('ambari_agent', 'dummy_files') - - ash = AlertSchedulerHandler(test_file_path, test_stack_path, test_common_services_path, test_host_scripts_path, None) + + cluster_configuration = self.__get_cluster_configuration() + + ash = AlertSchedulerHandler(test_file_path, test_stack_path, + test_common_services_path, test_host_scripts_path, cluster_configuration, + None) + ash.start() self.assertEquals(1, ash.get_job_count()) @@ -499,7 +593,7 @@ class TestAlerts(TestCase): def test_alert_collector_purge(self): - json = { "name": "namenode_process", + definition_json = { "name": "namenode_process", "service": "HDFS", "component": "NAMENODE", "label": "NameNode process", @@ -522,13 +616,20 @@ class TestAlerts(TestCase): } } + configuration = {'hdfs-site' : + { 'my-key': 'value1' } + } + collector = AlertCollector() + cluster_configuration = self.__get_cluster_configuration() + self.__update_cluster_configuration(cluster_configuration, configuration) - pa = PortAlert(json, json['source']) - pa.set_helpers(collector, {'hdfs-site/my-key': 'value1'}) - self.assertEquals(6, pa.interval()) + alert = PortAlert(definition_json, definition_json['source']) + alert.set_helpers(collector, cluster_configuration) + alert.set_cluster("c1", "c6401.ambari.apache.org") + self.assertEquals(6, alert.interval()) - res = pa.collect() + res = alert.collect() alerts = collector.alerts() self.assertEquals(0, len(collector.alerts())) @@ -546,12 +647,17 @@ class TestAlerts(TestCase): test_common_services_path = os.path.join('ambari_agent', 'dummy_files') test_host_scripts_path = os.path.join('ambari_agent', 'dummy_files') - ash = AlertSchedulerHandler(test_file_path, test_stack_path, test_common_services_path, 
test_host_scripts_path, None) + cluster_configuration = self.__get_cluster_configuration() + + ash = AlertSchedulerHandler(test_file_path, test_stack_path, + test_common_services_path, test_host_scripts_path, cluster_configuration, + None) + ash.start() self.assertEquals(1, ash.get_job_count()) - json = { "name": "namenode_process", + definition_json = { "name": "namenode_process", "service": "HDFS", "component": "NAMENODE", "label": "NameNode process", @@ -574,20 +680,20 @@ class TestAlerts(TestCase): } } - pa = PortAlert(json, json['source']) - ash.schedule_definition(pa) + alert = PortAlert(definition_json, definition_json['source']) + ash.schedule_definition(alert) self.assertEquals(2, ash.get_job_count()) - json['enabled'] = False - pa = PortAlert(json, json['source']) - ash.schedule_definition(pa) + definition_json['enabled'] = False + alert = PortAlert(definition_json, definition_json['source']) + ash.schedule_definition(alert) # verify disabled alert not scheduled self.assertEquals(2, ash.get_job_count()) - json['enabled'] = True - pa = PortAlert(json, json['source']) + definition_json['enabled'] = True + pa = PortAlert(definition_json, definition_json['source']) ash.schedule_definition(pa) # verify enabled alert was scheduled @@ -599,9 +705,14 @@ class TestAlerts(TestCase): test_common_services_path = os.path.join('ambari_agent', 'dummy_files') test_host_scripts_path = os.path.join('ambari_agent', 'dummy_files') - ash = AlertSchedulerHandler(test_file_path, test_stack_path, test_common_services_path, test_host_scripts_path, None) + cluster_configuration = self.__get_cluster_configuration() + ash = AlertSchedulerHandler(test_file_path, test_stack_path, + test_common_services_path, test_host_scripts_path, cluster_configuration, + None) + ash.start() + self.assertEquals(1, ash.get_job_count()) self.assertEquals(0, len(ash._collector.alerts())) @@ -639,7 +750,7 @@ class TestAlerts(TestCase): def test_skipped_alert(self): - json = { + definition_json = { 
"name": "namenode_process", "service": "HDFS", "component": "NAMENODE", @@ -655,27 +766,35 @@ class TestAlerts(TestCase): } # normally set by AlertSchedulerHandler - json['source']['stacks_directory'] = os.path.join('ambari_agent', 'dummy_files') - json['source']['common_services_directory'] = os.path.join('ambari_agent', 'common-services') - json['source']['host_scripts_directory'] = os.path.join('ambari_agent', 'host_scripts') + definition_json['source']['stacks_directory'] = os.path.join('ambari_agent', 'dummy_files') + definition_json['source']['common_services_directory'] = os.path.join('ambari_agent', 'common-services') + definition_json['source']['host_scripts_directory'] = os.path.join('ambari_agent', 'host_scripts') + + configuration = {'foo-site' : + { 'skip': 'true' } + } collector = AlertCollector() - sa = ScriptAlert(json, json['source'], None) + cluster_configuration = self.__get_cluster_configuration() + self.__update_cluster_configuration(cluster_configuration, configuration) + + alert = ScriptAlert(definition_json, definition_json['source'], None) # instruct the test alert script to be skipped - sa.set_helpers(collector, {'foo-site/skip': 'true'} ) + alert.set_helpers(collector, cluster_configuration ) + alert.set_cluster("c1", "c6401.ambari.apache.org") - self.assertEquals(json['source']['path'], sa.path) - self.assertEquals(json['source']['stacks_directory'], sa.stacks_dir) - self.assertEquals(json['source']['common_services_directory'], sa.common_services_dir) - self.assertEquals(json['source']['host_scripts_directory'], sa.host_scripts_dir) + self.assertEquals(definition_json['source']['path'], alert.path) + self.assertEquals(definition_json['source']['stacks_directory'], alert.stacks_dir) + self.assertEquals(definition_json['source']['common_services_directory'], alert.common_services_dir) + self.assertEquals(definition_json['source']['host_scripts_directory'], alert.host_scripts_dir) # ensure that it was skipped 
self.assertEquals(0,len(collector.alerts())) def test_default_reporting_text(self): - json = { + definition_json = { "name": "namenode_process", "service": "HDFS", "component": "NAMENODE", @@ -690,100 +809,116 @@ class TestAlerts(TestCase): } } - alert = ScriptAlert(json, json['source'], None) + alert = ScriptAlert(definition_json, definition_json['source'], None) self.assertEquals(alert._get_reporting_text(alert.RESULT_OK), '{0}') self.assertEquals(alert._get_reporting_text(alert.RESULT_WARNING), '{0}') self.assertEquals(alert._get_reporting_text(alert.RESULT_CRITICAL), '{0}') - json['source']['type'] = 'PORT' - alert = PortAlert(json, json['source']) + definition_json['source']['type'] = 'PORT' + alert = PortAlert(definition_json, definition_json['source']) self.assertEquals(alert._get_reporting_text(alert.RESULT_OK), 'TCP OK - {0:.4f} response on port {1}') self.assertEquals(alert._get_reporting_text(alert.RESULT_WARNING), 'TCP OK - {0:.4f} response on port {1}') self.assertEquals(alert._get_reporting_text(alert.RESULT_CRITICAL), 'Connection failed: {0} to {1}:{2}') - json['source']['type'] = 'WEB' - alert = WebAlert(json, json['source']) + definition_json['source']['type'] = 'WEB' + alert = WebAlert(definition_json, definition_json['source']) self.assertEquals(alert._get_reporting_text(alert.RESULT_OK), 'HTTP {0} response in {2:.4f} seconds') self.assertEquals(alert._get_reporting_text(alert.RESULT_WARNING), 'HTTP {0} response in {2:.4f} seconds') self.assertEquals(alert._get_reporting_text(alert.RESULT_CRITICAL), 'Connection failed to {1}') - json['source']['type'] = 'METRIC' - alert = MetricAlert(json, json['source']) + definition_json['source']['type'] = 'METRIC' + alert = MetricAlert(definition_json, definition_json['source']) self.assertEquals(alert._get_reporting_text(alert.RESULT_OK), '{0}') self.assertEquals(alert._get_reporting_text(alert.RESULT_WARNING), '{0}') self.assertEquals(alert._get_reporting_text(alert.RESULT_CRITICAL), '{0}') - - 
@patch("json.dump") - def test_update_configurations(self, json_mock): - def open_side_effect(file, mode): - if mode == 'w': - file_mock = MagicMock() - return file_mock - else: - return self.original_open(file, mode) - test_file_path = os.path.join('ambari_agent', 'dummy_files') - test_stack_path = os.path.join('ambari_agent', 'dummy_files') - test_common_services_path = os.path.join('ambari_agent', 'dummy_files') - test_host_scripts_path = os.path.join('ambari_agent', 'dummy_files') + def test_configuration_updates(self): + definition_json = { + "name": "namenode_process", + "service": "HDFS", + "component": "NAMENODE", + "label": "NameNode process", + "interval": 6, + "scope": "host", + "enabled": True, + "uuid": "c1f73191-4481-4435-8dae-fd380e4c0be1", + "source": { + "type": "SCRIPT", + "path": "test_script.py", + } + } - commands = [{"clusterName": "c1", - "configurations": { - "hdfs-site": { - "dfs.namenode.http-address": "c6401.ambari.apache.org:50071" - } - }}] - with open(os.path.join(test_stack_path, "definitions.json"),"r") as fp: - all_commands = json.load(fp) - all_commands[0]['configurations']['hdfs-site'].update({"dfs.namenode.http-address": "c6401.ambari.apache.org:50071"}) - - ash = AlertSchedulerHandler(test_file_path, test_stack_path, test_common_services_path, test_host_scripts_path, None) - ash.start() + # normally set by AlertSchedulerHandler + definition_json['source']['stacks_directory'] = os.path.join('ambari_agent', 'dummy_files') + definition_json['source']['common_services_directory'] = os.path.join('ambari_agent', 'common-services') + definition_json['source']['host_scripts_directory'] = os.path.join('ambari_agent', 'host_scripts') - with patch("__builtin__.open") as open_mock: - open_mock.side_effect = open_side_effect - ash.update_configurations(commands) + configuration = {'foo-site' : + { 'bar': 'rendered-bar', 'baz' : 'rendered-baz' } + } - self.assertTrue(json_mock.called) - self.assertTrue(json_mock.called_with(all_commands)) + # 
populate the configuration cache with the initial configs + collector = AlertCollector() + cluster_configuration = self.__get_cluster_configuration() + self.__update_cluster_configuration(cluster_configuration, configuration) + # run the alert and verify the output + alert = ScriptAlert(definition_json, definition_json['source'], MagicMock()) + alert.set_helpers(collector, cluster_configuration ) + alert.set_cluster("c1", "c6401.ambari.apache.org") + alert.collect() - @patch.object(AlertSchedulerHandler,"reschedule_all") - @patch("json.dump") - def test_update_configurations_without_reschedule(self, json_mock, reschedule_mock): + alerts = collector.alerts() + self.assertEquals(0, len(collector.alerts())) - def open_side_effect(file, mode): - if mode == 'w': - file_mock = MagicMock() - return file_mock - else: - return self.original_open(file, mode) + self.assertEquals('WARNING', alerts[0]['state']) + self.assertEquals('bar is rendered-bar, baz is rendered-baz', alerts[0]['text']) - test_file_path = os.path.join('ambari_agent', 'dummy_files') - test_stack_path = os.path.join('ambari_agent', 'dummy_files') - test_common_services_path = os.path.join('ambari_agent', 'dummy_files') - test_host_scripts_path = os.path.join('ambari_agent', 'dummy_files') + # now update only the configs and run the same alert again and check + # for different output + configuration = {'foo-site' : + { 'bar': 'rendered-bar2', 'baz' : 'rendered-baz2' } + } - with open(os.path.join(test_stack_path, "definitions.json"),"r") as fp: - all_commands = json.load(fp) + # populate the configuration cache with the updated configs + self.__update_cluster_configuration(cluster_configuration, configuration) - # create a copy of the configurations from definitions.json, then add - # a brand new property - this should not cause a restart since there are - # no alerts that use this new property - commands = [{"clusterName": "c1" }] - commands[0]['configurations'] = all_commands[0]['configurations'] -
commands[0]['configurations'].update({ "foo" : "bar" }) + alert.collect() - ash = AlertSchedulerHandler(test_file_path, test_stack_path, - test_common_services_path, test_host_scripts_path, None) + alerts = collector.alerts() + self.assertEquals(0, len(collector.alerts())) - ash.start() + self.assertEquals('WARNING', alerts[0]['state']) + self.assertEquals('bar is rendered-bar2, baz is rendered-baz2', alerts[0]['text']) + + def __get_cluster_configuration(self): + """ + Gets an instance of the cluster cache where the file read and write + operations have been mocked out + :return: + """ with patch("__builtin__.open") as open_mock: - open_mock.side_effect = open_side_effect - ash.update_configurations(commands) + open_mock.side_effect = self.open_side_effect + cluster_configuration = ClusterConfiguration("") + return cluster_configuration + + + def __update_cluster_configuration(self, cluster_configuration, configuration): + """ + Updates the configuration cache, using a mock file as the disk based + cache so that a file is not created during tests + :return: + """ + with patch("__builtin__.open") as open_mock: + open_mock.side_effect = self.open_side_effect + cluster_configuration._update_configurations("c1", configuration) + - self.assertTrue(json_mock.called) - self.assertTrue(json_mock.called_with(all_commands)) - self.assertFalse(reschedule_mock.called) + def open_side_effect(self, file, mode): + if mode == 'w': + file_mock = MagicMock() + return file_mock + else: + return self.original_open(file, mode) http://git-wip-us.apache.org/repos/asf/ambari/blob/278b7a02/ambari-agent/src/test/python/ambari_agent/TestClusterConfigurationCache.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/test/python/ambari_agent/TestClusterConfigurationCache.py b/ambari-agent/src/test/python/ambari_agent/TestClusterConfigurationCache.py new file mode 100644 index 0000000..422cb8c --- /dev/null +++
b/ambari-agent/src/test/python/ambari_agent/TestClusterConfigurationCache.py @@ -0,0 +1,100 @@ +#!/usr/bin/env python + +''' +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +''' + +import sys + +from ambari_agent.ClusterConfiguration import ClusterConfiguration + +from mock.mock import MagicMock, patch, mock_open, ANY +from unittest import TestCase + +class TestClusterConfigurationCache(TestCase): + + def setUp(self): + # save original open() method for later use + self.original_open = open + + def tearDown(self): + sys.stdout == sys.__stdout__ + + + @patch("os.path.exists", new = MagicMock(return_value=True)) + @patch("os.path.isfile", new = MagicMock(return_value=True)) + def test_cluster_configuration_cache_initialization(self): + configuration_json = '{ "c1" : { "foo-site" : { "foo" : "bar", "foobar" : "baz" } } }' + open_mock = mock_open(read_data=configuration_json) + + with patch("__builtin__.open", open_mock): + cluster_configuration = ClusterConfiguration("/foo/bar/baz") + + open_mock.assert_called_with("/foo/bar/baz/configurations.json", 'r') + + self.assertEqual('bar', cluster_configuration.get_configuration_value('c1', 'foo-site/foo') ) + self.assertEqual('baz', cluster_configuration.get_configuration_value('c1', 'foo-site/foobar') ) + 
self.assertEqual(None, cluster_configuration.get_configuration_value('c1', 'INVALID') ) + self.assertEqual(None, cluster_configuration.get_configuration_value('c1', 'INVALID/INVALID') ) + self.assertEqual(None, cluster_configuration.get_configuration_value('INVALID', 'foo-site/foo') ) + self.assertEqual(None, cluster_configuration.get_configuration_value('INVALID', 'foo-site/foobar') ) + + + @patch("json.dump") + def test_cluster_configuration_update(self, json_dump_mock): + cluster_configuration = self.__get_cluster_configuration() + + configuration = {'foo-site' : + { 'bar': 'rendered-bar', 'baz' : 'rendered-baz' } + } + + file_mock = self.__update_cluster_configuration(cluster_configuration, configuration) + file_mock.assert_called_with('/foo/bar/baz/configurations.json', 'w') + + json_dump_mock.assert_called_with({'c1': {'foo-site': {'baz': 'rendered-baz', 'bar': 'rendered-bar'}}}, ANY, indent=2) + + def __get_cluster_configuration(self): + """ + Gets an instance of the cluster cache where the file read and write + operations have been mocked out + :return: + """ + with patch("__builtin__.open") as open_mock: + open_mock.side_effect = self.open_side_effect + cluster_configuration = ClusterConfiguration("/foo/bar/baz") + return cluster_configuration + + + def __update_cluster_configuration(self, cluster_configuration, configuration): + """ + Updates the configuration cache, using a mock file as the disk based + cache so that a file is not created during tests + :return: + """ + with patch("__builtin__.open") as open_mock: + open_mock.side_effect = self.open_side_effect + cluster_configuration._update_configurations("c1", configuration) + + return open_mock + + + def open_side_effect(self, file, mode): + if mode == 'w': + file_mock = MagicMock() + return file_mock + else: + return self.original_open(file, mode)
http://git-wip-us.apache.org/repos/asf/ambari/blob/278b7a02/ambari-server/src/main/resources/common-services/YARN/2.1.0.2.0/package/alerts/alert_nodemanagers_summary.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/YARN/2.1.0.2.0/package/alerts/alert_nodemanagers_summary.py b/ambari-server/src/main/resources/common-services/YARN/2.1.0.2.0/package/alerts/alert_nodemanagers_summary.py index 72527aa..6895889 100644 --- a/ambari-server/src/main/resources/common-services/YARN/2.1.0.2.0/package/alerts/alert_nodemanagers_summary.py +++ b/ambari-server/src/main/resources/common-services/YARN/2.1.0.2.0/package/alerts/alert_nodemanagers_summary.py @@ -28,6 +28,14 @@ NODEMANAGER_HTTP_ADDRESS_KEY = '{{yarn-site/yarn.resourcemanager.webapp.address} NODEMANAGER_HTTPS_ADDRESS_KEY = '{{yarn-site/yarn.resourcemanager.webapp.https.address}}' YARN_HTTP_POLICY_KEY = '{{yarn-site/yarn.http.policy}}' +def get_tokens(): + """ + Returns a tuple of tokens in the format {{site/property}} that will be used + to build the dictionary passed into execute + """ + return NODEMANAGER_HTTP_ADDRESS_KEY, NODEMANAGER_HTTPS_ADDRESS_KEY, \ + YARN_HTTP_POLICY_KEY + def execute(parameters=None, host_name=None): """
