AMBARI-20559. Create a base class for caching in ambari-agent. (aonishuk)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/447485da Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/447485da Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/447485da Branch: refs/heads/branch-3.0-perf Commit: 447485daa73e21497fdc745b8cdcc6b5dab5bf5f Parents: b7d5ef7 Author: Andrew Onishuk <[email protected]> Authored: Mon Mar 27 13:38:27 2017 +0300 Committer: Andrew Onishuk <[email protected]> Committed: Mon Mar 27 13:38:27 2017 +0300 ---------------------------------------------------------------------- .../main/python/ambari_agent/ClusterCache.py | 86 ++++++++++ .../python/ambari_agent/ClusterConfiguration.py | 160 ------------------- .../ambari_agent/ClusterConfigurationCache.py | 64 ++++++++ .../src/main/python/ambari_agent/Controller.py | 34 +++- .../src/main/python/ambari_agent/FileCache.py | 2 +- .../python/ambari_agent/alerts/base_alert.py | 6 +- .../src/test/python/ambari_agent/TestAlerts.py | 6 +- .../TestClusterConfigurationCache.py | 12 +- 8 files changed, 191 insertions(+), 179 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/447485da/ambari-agent/src/main/python/ambari_agent/ClusterCache.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/ClusterCache.py b/ambari-agent/src/main/python/ambari_agent/ClusterCache.py new file mode 100644 index 0000000..d7ea08c --- /dev/null +++ b/ambari-agent/src/main/python/ambari_agent/ClusterCache.py @@ -0,0 +1,86 @@ +#!/usr/bin/env python + +""" +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. 
The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" + +import logging +import ambari_simplejson as json +import os +import threading + +logger = logging.getLogger(__name__) + +class ClusterCache(object): + """ + Maintains an in-memory cache and disk cache (for debugging purposes) for + every cluster. This is useful for having quick access to any of the properties. + """ + + def __init__(self, cluster_cache_dir): + """ + Initializes the cache. + :param cluster_cache_dir: + :return: + """ + self.cluster_cache_dir = cluster_cache_dir + + # keys are cluster names, values are caches for the clusters + self._cache_dict = {} + + self.__file_lock = threading.RLock() + self._cache_lock = threading.RLock() + self.__current_cache_json_file = os.path.join(self.cluster_cache_dir, self.get_file_name()) + + # ensure that our cache directory exists + if not os.path.exists(cluster_cache_dir): + os.makedirs(cluster_cache_dir) + + # if the file exists, then load it + if os.path.isfile(self.__current_cache_json_file): + with open(self.__current_cache_json_file, 'r') as fp: + self._cache_dict = json.load(fp) + + def update_cache(self, cluster_name, cache): + """ + Thread-safe method for writing out the specified cluster cache + and updating the in-memory representation. 
+    :param cluster_name: + :param cache: + :return: + """ + logger.info("Updating cache {0} for cluster {1}".format(self.__class__.__name__, cluster_name)) + + self._cache_lock.acquire() + try: + self._cache_dict[cluster_name] = cache + finally: + self._cache_lock.release() + + + self.__file_lock.acquire() + try: + with os.fdopen(os.open(self.__current_cache_json_file, os.O_WRONLY | os.O_CREAT, 0o600), "w") as f: + json.dump(self._cache_dict, f, indent=2) + finally: + self.__file_lock.release() + + def get_cache(self): + self._cache_lock.acquire() + cache_copy = dict(self._cache_dict) + self._cache_lock.release() + return cache_copy http://git-wip-us.apache.org/repos/asf/ambari/blob/447485da/ambari-agent/src/main/python/ambari_agent/ClusterConfiguration.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/ClusterConfiguration.py b/ambari-agent/src/main/python/ambari_agent/ClusterConfiguration.py deleted file mode 100644 index 72b87be..0000000 --- a/ambari-agent/src/main/python/ambari_agent/ClusterConfiguration.py +++ /dev/null @@ -1,160 +0,0 @@ -#!/usr/bin/env python - - """ -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. 
-""" - -import logging -import ambari_simplejson as json -import os -import threading - -logger = logging.getLogger(__name__) - -class ClusterConfiguration(): - """ - Maintains an in-memory cache and disk cache of the configurations for - every cluster. This is useful for having quick access to any of the - configuration properties. - """ - - FILENAME = 'configurations.json' - - # constants that define which commands hold configurations that can be - # used to populate this cache - EXECUTION_COMMANDS = 'executionCommands' - ALERT_DEFINITION_COMMANDS = 'alertDefinitionCommands' - COMMANDS_WITH_CONFIGURATIONS = [EXECUTION_COMMANDS, ALERT_DEFINITION_COMMANDS] - - def __init__(self, cluster_config_cache_dir): - """ - Initializes the configuration cache. - :param cluster_config_cache_dir: - :return: - """ - self.cluster_config_cache_dir = cluster_config_cache_dir - - # keys are cluster names, values are configurations - self.__configurations = {} - - self.__file_lock = threading.RLock() - self.__cache_lock = threading.RLock() - self.__config_json_file = os.path.join(self.cluster_config_cache_dir, self.FILENAME) - - # ensure that our cache directory exists - if not os.path.exists(cluster_config_cache_dir): - try: - os.makedirs(cluster_config_cache_dir) - except: - logger.critical("Could not create the cluster configuration cache directory {0}".format(cluster_config_cache_dir)) - - # if the file exists, then load it - try: - if os.path.isfile(self.__config_json_file): - with open(self.__config_json_file, 'r') as fp: - self.__configurations = json.load(fp) - except Exception, exception: - logger.warning("Unable to load configurations from {0}. This file will be regenerated on registration".format(self.__config_json_file)) - - - def update_configurations_from_heartbeat(self, heartbeat): - """ - Updates the in-memory and disk-based cluster configurations based on - the heartbeat. 
This will only update configurations on the following - types of commands in the heartbeat: execution, and alert definition. - :param heartbeat: the heartbeat response - :return: - """ - heartbeat_keys = heartbeat.keys() - - heartbeat_contains_configurations = False - for commandType in self.COMMANDS_WITH_CONFIGURATIONS: - if commandType in heartbeat_keys: - heartbeat_contains_configurations = True - - # if this heartbeat doesn't contain a command with configurations, then - # don't process it - if heartbeat_contains_configurations is False: - return - - if self.EXECUTION_COMMANDS in heartbeat_keys: - execution_commands = heartbeat[self.EXECUTION_COMMANDS] - for command in execution_commands: - if 'clusterName' in command and 'configurations' in command: - cluster_name = command['clusterName'] - configurations = command['configurations'] - self._update_configurations(cluster_name, configurations) - - if self.ALERT_DEFINITION_COMMANDS in heartbeat_keys: - alert_definition_commands = heartbeat[self.ALERT_DEFINITION_COMMANDS] - for command in alert_definition_commands: - if 'clusterName' in command and 'configurations' in command: - cluster_name = command['clusterName'] - configurations = command['configurations'] - self._update_configurations(cluster_name, configurations) - - - def _update_configurations(self, cluster_name, configuration): - """ - Thread-safe method for writing out the specified cluster configuration - and updating the in-memory representation. 
- :param cluster_name: - :param configuration: - :return: - """ - logger.info("Updating cached configurations for cluster {0}".format(cluster_name)) - - self.__cache_lock.acquire() - try: - self.__configurations[cluster_name] = configuration - except Exception, exception : - logger.exception("Unable to update configurations for cluster {0}".format(cluster_name)) - finally: - self.__cache_lock.release() - - - self.__file_lock.acquire() - try: - with os.fdopen(os.open(self.__config_json_file, os.O_WRONLY | os.O_CREAT, 0o600), "w") as f: - json.dump(self.__configurations, f, indent=2) - except Exception, exception : - logger.exception("Unable to update configurations for cluster {0}".format(cluster_name)) - finally: - self.__file_lock.release() - - - def get_configuration_value(self, cluster_name, key): - """ - Gets a value from the cluster configuration map for the given cluster and - key. The key is expected to be of the form 'foo-bar/baz' or - 'foo-bar/bar-baz/foobarbaz' where every / denotes a new mapping - :param key: a lookup key, like 'foo-bar/baz' - :return: the value, or None if not found - """ - self.__cache_lock.acquire() - try: - dictionary = self.__configurations[cluster_name] - for layer_key in key.split('/'): - dictionary = dictionary[layer_key] - - return dictionary - - except Exception: - logger.debug("Cache miss for configuration property {0} in cluster {1}".format(key, cluster_name)) - return None - finally: - self.__cache_lock.release() http://git-wip-us.apache.org/repos/asf/ambari/blob/447485da/ambari-agent/src/main/python/ambari_agent/ClusterConfigurationCache.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/ClusterConfigurationCache.py b/ambari-agent/src/main/python/ambari_agent/ClusterConfigurationCache.py new file mode 100644 index 0000000..2c35018 --- /dev/null +++ b/ambari-agent/src/main/python/ambari_agent/ClusterConfigurationCache.py @@ -0,0 +1,64 @@ 
+#!/usr/bin/env python + +""" +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" + +from ambari_agent.ClusterCache import ClusterCache +import logging + +logger = logging.getLogger(__name__) + +class ClusterConfigurationCache(ClusterCache): + """ + Maintains an in-memory cache and disk cache of the configurations for + every cluster. This is useful for having quick access to any of the + configuration properties. + """ + + def __init__(self, cluster_cache_dir): + """ + Initializes the configuration cache. + :param cluster_cache_dir: + :return: + """ + super(ClusterConfigurationCache, self).__init__(cluster_cache_dir) + + def get_file_name(self): + return 'configurations.json' + + def get_configuration_value(self, cluster_name, key): + """ + Gets a value from the cluster configuration map for the given cluster and + key. 
The key is expected to be of the form 'foo-bar/baz' or + 'foo-bar/bar-baz/foobarbaz' where every / denotes a new mapping + :param key: a lookup key, like 'foo-bar/baz' + :return: the value, or None if not found + """ + self._cache_lock.acquire() + try: + dictionary = self._cache_dict[cluster_name] + for layer_key in key.split('/'): + dictionary = dictionary[layer_key] + + return dictionary + + except KeyError: + logger.debug("Cache miss for configuration property {0} in cluster {1}".format(key, cluster_name)) + return None + finally: + self._cache_lock.release() http://git-wip-us.apache.org/repos/asf/ambari/blob/447485da/ambari-agent/src/main/python/ambari_agent/Controller.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/Controller.py b/ambari-agent/src/main/python/ambari_agent/Controller.py index 78b5c0c..49bb3d5 100644 --- a/ambari-agent/src/main/python/ambari_agent/Controller.py +++ b/ambari-agent/src/main/python/ambari_agent/Controller.py @@ -44,7 +44,7 @@ from ambari_agent.FileCache import FileCache from ambari_agent.NetUtil import NetUtil from ambari_agent.LiveStatus import LiveStatus from ambari_agent.AlertSchedulerHandler import AlertSchedulerHandler -from ambari_agent.ClusterConfiguration import ClusterConfiguration +from ambari_agent.ClusterConfigurationCache import ClusterConfigurationCache from ambari_agent.RecoveryManager import RecoveryManager from ambari_agent.HeartbeatHandlers import HeartbeatStopHandlers, bind_signal_handlers from ambari_agent.ExitHelper import ExitHelper @@ -61,6 +61,9 @@ AGENT_RAM_OVERUSE_MESSAGE = "Ambari-agent RAM usage {used_ram} MB went above {co class Controller(threading.Thread): + EXECUTION_COMMANDS = 'executionCommands' + ALERT_DEFINITION_COMMANDS = 'alertDefinitionCommands' + def __init__(self, config, server_hostname, heartbeat_stop_callback = None): threading.Thread.__init__(self) logger.debug('Initializing Controller RPC thread.') @@ 
-107,7 +110,7 @@ class Controller(threading.Thread): extensions_cache_dir = os.path.join(cache_dir, FileCache.EXTENSIONS_CACHE_DIRECTORY) host_scripts_cache_dir = os.path.join(cache_dir, FileCache.HOST_SCRIPTS_CACHE_DIRECTORY) alerts_cache_dir = os.path.join(cache_dir, FileCache.ALERTS_CACHE_DIRECTORY) - cluster_config_cache_dir = os.path.join(cache_dir, FileCache.CLUSTER_CONFIGURATION_CACHE_DIRECTORY) + cluster_cache_dir = os.path.join(cache_dir, FileCache.CLUSTER_CACHE_DIRECTORY) recovery_cache_dir = os.path.join(cache_dir, FileCache.RECOVERY_CACHE_DIRECTORY) self.heartbeat_idle_interval_min = int(self.config.get('heartbeat', 'idle_interval_min')) if self.config.get('heartbeat', 'idle_interval_min') else self.netutil.HEARTBEAT_IDLE_INTERVAL_DEFAULT_MIN_SEC @@ -123,7 +126,7 @@ class Controller(threading.Thread): self.recovery_manager = RecoveryManager(recovery_cache_dir) - self.cluster_configuration = ClusterConfiguration(cluster_config_cache_dir) + self.cluster_configuration_cache = ClusterConfigurationCache(cluster_cache_dir) self.move_data_dir_mount_file() @@ -132,7 +135,7 @@ class Controller(threading.Thread): self.alert_scheduler_handler = AlertSchedulerHandler(alerts_cache_dir, stacks_cache_dir, common_services_cache_dir, extensions_cache_dir, - host_scripts_cache_dir, self.cluster_configuration, config, + host_scripts_cache_dir, self.cluster_configuration_cache, config, self.recovery_manager) self.alert_scheduler_handler.start() @@ -199,7 +202,7 @@ class Controller(threading.Thread): # always update cached cluster configurations on registration # must be prior to any other operation - self.cluster_configuration.update_configurations_from_heartbeat(ret) + self.update_caches_from_heartbeat(ret) self.recovery_manager.update_configuration_from_registration(ret) self.config.update_configuration_from_registration(ret) logger.debug("Updated config:" + str(self.config)) @@ -230,6 +233,25 @@ class Controller(threading.Thread): return ret + def 
update_caches_from_heartbeat(self, heartbeat): + heartbeat_keys = heartbeat.keys() + + if self.EXECUTION_COMMANDS in heartbeat_keys: + execution_commands = heartbeat[self.EXECUTION_COMMANDS] + for command in execution_commands: + if 'clusterName' in command and 'configurations' in command: + cluster_name = command['clusterName'] + configurations = command['configurations'] + self.cluster_configuration_cache.update_cache(cluster_name, configurations) + + if self.ALERT_DEFINITION_COMMANDS in heartbeat_keys: + alert_definition_commands = heartbeat[self.ALERT_DEFINITION_COMMANDS] + for command in alert_definition_commands: + if 'clusterName' in command and 'configurations' in command: + cluster_name = command['clusterName'] + configurations = command['configurations'] + self.cluster_configuration_cache.update_cache(cluster_name, configurations) + def cancelCommandInQueue(self, commands): """ Remove from the queue commands, kill the process if it's in progress """ if commands: @@ -375,7 +397,7 @@ class Controller(threading.Thread): # if the response contains configurations, update the in-memory and # disk-based configuration cache (execution and alert commands have this) logger.log(logging_level, "Updating configurations from heartbeat") - self.cluster_configuration.update_configurations_from_heartbeat(response) + self.update_caches_from_heartbeat(response) response_keys = response.keys() http://git-wip-us.apache.org/repos/asf/ambari/blob/447485da/ambari-agent/src/main/python/ambari_agent/FileCache.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/FileCache.py b/ambari-agent/src/main/python/ambari_agent/FileCache.py index d0c8bdb..139dcba 100644 --- a/ambari-agent/src/main/python/ambari_agent/FileCache.py +++ b/ambari-agent/src/main/python/ambari_agent/FileCache.py @@ -39,7 +39,7 @@ class FileCache(): downloads relevant files from the server. 
""" - CLUSTER_CONFIGURATION_CACHE_DIRECTORY="cluster_configuration" + CLUSTER_CACHE_DIRECTORY="cluster_cache" ALERTS_CACHE_DIRECTORY="alerts" RECOVERY_CACHE_DIRECTORY="recovery" STACKS_CACHE_DIRECTORY="stacks" http://git-wip-us.apache.org/repos/asf/ambari/blob/447485da/ambari-agent/src/main/python/ambari_agent/alerts/base_alert.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/alerts/base_alert.py b/ambari-agent/src/main/python/ambari_agent/alerts/base_alert.py index 7f3b2a5..b75b540 100644 --- a/ambari-agent/src/main/python/ambari_agent/alerts/base_alert.py +++ b/ambari-agent/src/main/python/ambari_agent/alerts/base_alert.py @@ -78,12 +78,12 @@ class BaseAlert(object): return self.alert_meta['uuid'] - def set_helpers(self, collector, cluster_configuration): + def set_helpers(self, collector, cluster_configuration_cache): """ sets helper objects for alerts without having to use them in a constructor """ self.collector = collector - self.cluster_configuration = cluster_configuration + self.cluster_configuration_cache = cluster_configuration_cache def set_cluster(self, cluster_name, host_name): @@ -215,7 +215,7 @@ class BaseAlert(object): # for every match, get its configuration value and replace it in the key resolved_key = key for placeholder_key in placeholder_keys: - value = self.cluster_configuration.get_configuration_value( + value = self.cluster_configuration_cache.get_configuration_value( self.cluster_name, placeholder_key) # if any of the placeholder keys is missing from the configuration, then http://git-wip-us.apache.org/repos/asf/ambari/blob/447485da/ambari-agent/src/test/python/ambari_agent/TestAlerts.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/test/python/ambari_agent/TestAlerts.py b/ambari-agent/src/test/python/ambari_agent/TestAlerts.py index 64479a2..b344b24 100644 --- 
a/ambari-agent/src/test/python/ambari_agent/TestAlerts.py +++ b/ambari-agent/src/test/python/ambari_agent/TestAlerts.py @@ -35,7 +35,7 @@ from ambari_agent.alerts.script_alert import ScriptAlert from ambari_agent.alerts.web_alert import WebAlert from ambari_agent.alerts.recovery_alert import RecoveryAlert from ambari_agent.apscheduler.scheduler import Scheduler -from ambari_agent.ClusterConfiguration import ClusterConfiguration +from ambari_agent.ClusterConfigurationCache import ClusterConfigurationCache from ambari_commons.urllib_handlers import RefreshHeaderProcessor from collections import namedtuple @@ -1288,7 +1288,7 @@ class TestAlerts(TestCase): """ with patch("__builtin__.open") as open_mock: open_mock.side_effect = self.open_side_effect - cluster_configuration = ClusterConfiguration("") + cluster_configuration = ClusterConfigurationCache("/tmp/test_cache") return cluster_configuration @@ -1301,7 +1301,7 @@ class TestAlerts(TestCase): :return: """ osfdopen_mock.side_effect = self.osfdopen_side_effect - cluster_configuration._update_configurations("c1", configuration) + cluster_configuration.update_cache("c1", configuration) def open_side_effect(self, file, mode): http://git-wip-us.apache.org/repos/asf/ambari/blob/447485da/ambari-agent/src/test/python/ambari_agent/TestClusterConfigurationCache.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/test/python/ambari_agent/TestClusterConfigurationCache.py b/ambari-agent/src/test/python/ambari_agent/TestClusterConfigurationCache.py index a418f6d..f7159d9 100644 --- a/ambari-agent/src/test/python/ambari_agent/TestClusterConfigurationCache.py +++ b/ambari-agent/src/test/python/ambari_agent/TestClusterConfigurationCache.py @@ -21,7 +21,7 @@ limitations under the License. 
import os import sys -from ambari_agent.ClusterConfiguration import ClusterConfiguration +from ambari_agent.ClusterConfigurationCache import ClusterConfigurationCache from mock.mock import MagicMock, patch, mock_open, ANY from unittest import TestCase @@ -46,9 +46,9 @@ class TestClusterConfigurationCache(TestCase): open_mock = mock_open(read_data=configuration_json) with patch("__builtin__.open", open_mock): - cluster_configuration = ClusterConfiguration(os.path.join(os.sep, "foo", "bar", "baz")) + cluster_configuration = ClusterConfigurationCache(os.path.join(os.sep, "tmp", "bar", "baz")) - open_mock.assert_called_with(os.sep + "foo" + os.sep + "bar" + os.sep + "baz" + os.sep + "configurations.json", 'r') + open_mock.assert_called_with(os.sep + "tmp" + os.sep + "bar" + os.sep + "baz" + os.sep + "configurations.json", 'r') self.assertEqual('bar', cluster_configuration.get_configuration_value('c1', 'foo-site/foo') ) self.assertEqual('baz', cluster_configuration.get_configuration_value('c1', 'foo-site/foobar') ) @@ -68,7 +68,7 @@ class TestClusterConfigurationCache(TestCase): } osopen_mock, osfdopen_mock = self.__update_cluster_configuration(cluster_configuration, configuration) - osopen_mock.assert_called_with(os.sep + "foo" + os.sep + "bar" + os.sep + "baz" + os.sep + "configurations.json", + osopen_mock.assert_called_with(os.sep + "tmp" + os.sep + "bar" + os.sep + "baz" + os.sep + "configurations.json", TestClusterConfigurationCache.o_flags, TestClusterConfigurationCache.perms); osfdopen_mock.assert_called_with(11, "w") @@ -84,7 +84,7 @@ class TestClusterConfigurationCache(TestCase): """ with patch("__builtin__.open") as open_mock: open_mock.side_effect = self.open_side_effect - cluster_configuration = ClusterConfiguration(os.path.join(os.sep, "foo", "bar", "baz")) + cluster_configuration = ClusterConfigurationCache(os.path.join(os.sep, "tmp", "bar", "baz")) return cluster_configuration @@ -97,7 +97,7 @@ class TestClusterConfigurationCache(TestCase): :return: """ 
osopen_mock.return_value = 11 - cluster_configuration._update_configurations("c1", configuration) + cluster_configuration.update_cache("c1", configuration) return osopen_mock, osfdopen_mock
