AMBARI-20620. Add hashing support into agent for caches ; add topology and metadata cache (aonishuk)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/df752f86 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/df752f86 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/df752f86 Branch: refs/heads/branch-3.0-perf Commit: df752f86e5e887e67016e81125702192a97f46a7 Parents: 3bcf2ad Author: Andrew Onishuk <[email protected]> Authored: Wed Mar 29 14:56:00 2017 +0300 Committer: Andrew Onishuk <[email protected]> Committed: Wed Mar 29 14:56:00 2017 +0300 ---------------------------------------------------------------------- .../main/python/ambari_agent/ClusterCache.py | 57 ++++++++++++-------- .../ambari_agent/ClusterConfigurationCache.py | 4 +- .../python/ambari_agent/ClusterMetadataCache.py | 42 +++++++++++++++ .../python/ambari_agent/ClusterTopologyCache.py | 42 +++++++++++++++ .../src/main/python/ambari_agent/Controller.py | 12 +++++ .../src/main/python/ambari_agent/Utils.py | 52 ++++++++++++++++++ 6 files changed, 186 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/df752f86/ambari-agent/src/main/python/ambari_agent/ClusterCache.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/ClusterCache.py b/ambari-agent/src/main/python/ambari_agent/ClusterCache.py index d7ea08c..1986733 100644 --- a/ambari-agent/src/main/python/ambari_agent/ClusterCache.py +++ b/ambari-agent/src/main/python/ambari_agent/ClusterCache.py @@ -18,14 +18,16 @@ See the License for the specific language governing permissions and limitations under the License. 
""" +import hashlib import logging import ambari_simplejson as json import os import threading +from ambari_agent.Utils import Utils logger = logging.getLogger(__name__) -class ClusterCache(object): +class ClusterCache(dict): """ Maintains an in-memory cache and disk cache (for debugging purposes) for every cluster. This is useful for having quick access to any of the properties. @@ -37,10 +39,8 @@ class ClusterCache(object): :param cluster_cache_dir: :return: """ - self.cluster_cache_dir = cluster_cache_dir - # keys are cluster names, values are caches for the clusters - self._cache_dict = {} + self.cluster_cache_dir = cluster_cache_dir self.__file_lock = threading.RLock() self._cache_lock = threading.RLock() @@ -51,9 +51,12 @@ class ClusterCache(object): os.makedirs(cluster_cache_dir) # if the file exists, then load it + cache_dict = {} if os.path.isfile(self.__current_cache_json_file): with open(self.__current_cache_json_file, 'r') as fp: - self._cache_dict = json.load(fp) + cache_dict = json.load(fp) + + super(ClusterCache, self).__init__(cache_dict) def update_cache(self, cluster_name, cache): """ @@ -65,22 +68,34 @@ class ClusterCache(object): """ logger.info("Updating cache {0} for cluster {1}".format(self.__class__.__name__, cluster_name)) - self._cache_lock.acquire() - try: - self._cache_dict[cluster_name] = cache - finally: - self._cache_lock.release() + # The cache should contain exactly the data received from server. + # Modifications on agent-side will lead to unnecessary cache sync every agent registration. Which is a big concern on perf clusters! + # Also immutability can lead to multithreading issues. 
+ immutable_cache = Utils.make_immutable(cache) + with self._cache_lock: + self[cluster_name] = immutable_cache - self.__file_lock.acquire() - try: + with self.__file_lock: with os.fdopen(os.open(self.__current_cache_json_file, os.O_WRONLY | os.O_CREAT, 0o600), "w") as f: - json.dump(self._cache_dict, f, indent=2) - finally: - self.__file_lock.release() - - def get_cache(self): - self._cache_lock.acquire() - cache_copy = self._cache_dict[:] - self._cache_lock.release() - return cache_copy + json.dump(self, f, indent=2) + + def get_md5_hashsum(self, cluster_name): + """ + Thread-safe method for computing the md5 hash of the in-memory + cache for the specified cluster, so agent-side and server-side + cache versions can be compared. + :param cluster_name: name of the cluster whose cache to hash + :return: md5 hex digest string of the cache's sorted-keys json + """ + with self._cache_lock: + # have to make sure server generates json in exactly the same way. So hashes are equal + json_repr = json.dumps(self, sort_keys=True) + + md5_calculator = hashlib.md5() + md5_calculator.update(json_repr) + result = md5_calculator.hexdigest() + + logger.info("Cache value for {0} is {1}".format(self.__class__.__name__, result)) + + return result \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/df752f86/ambari-agent/src/main/python/ambari_agent/ClusterConfigurationCache.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/ClusterConfigurationCache.py b/ambari-agent/src/main/python/ambari_agent/ClusterConfigurationCache.py index 2c35018..61fdf94 100644 --- a/ambari-agent/src/main/python/ambari_agent/ClusterConfigurationCache.py +++ b/ambari-agent/src/main/python/ambari_agent/ClusterConfigurationCache.py @@ -33,7 +33,7 @@ class ClusterConfigurationCache(ClusterCache): def __init__(self, cluster_cache_dir): """ Initializes the configuration cache. 
- :param cluster_cache_dir: + :param cluster_cache_dir: directory where the cached json files are saved :return: """ super(ClusterConfigurationCache, self).__init__(cluster_cache_dir) @@ -51,7 +51,7 @@ class ClusterConfigurationCache(ClusterCache): """ self._cache_lock.acquire() try: - dictionary = self._cache_dict[cluster_name] + dictionary = self[cluster_name] for layer_key in key.split('/'): dictionary = dictionary[layer_key] http://git-wip-us.apache.org/repos/asf/ambari/blob/df752f86/ambari-agent/src/main/python/ambari_agent/ClusterMetadataCache.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/ClusterMetadataCache.py b/ambari-agent/src/main/python/ambari_agent/ClusterMetadataCache.py new file mode 100644 index 0000000..4859b3f --- /dev/null +++ b/ambari-agent/src/main/python/ambari_agent/ClusterMetadataCache.py @@ -0,0 +1,42 @@ +#!/usr/bin/env python + +""" +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" + +from ambari_agent.ClusterCache import ClusterCache +import logging + +logger = logging.getLogger(__name__) + +class ClusterMetadataCache(ClusterCache): + """ + Maintains an in-memory cache and disk cache of the metadata sent from the server for + every cluster. 
This is useful for having quick access to any of the + metadata properties. + """ + + def __init__(self, cluster_cache_dir): + """ + Initializes the metadata cache. + :param cluster_cache_dir: + :return: + """ + super(ClusterMetadataCache, self).__init__(cluster_cache_dir) + + def get_file_name(self): + return 'metadata.json' http://git-wip-us.apache.org/repos/asf/ambari/blob/df752f86/ambari-agent/src/main/python/ambari_agent/ClusterTopologyCache.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/ClusterTopologyCache.py b/ambari-agent/src/main/python/ambari_agent/ClusterTopologyCache.py new file mode 100644 index 0000000..19313c5 --- /dev/null +++ b/ambari-agent/src/main/python/ambari_agent/ClusterTopologyCache.py @@ -0,0 +1,42 @@ +#!/usr/bin/env python + +""" +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" + +from ambari_agent.ClusterCache import ClusterCache +import logging + +logger = logging.getLogger(__name__) + +class ClusterTopologyCache(ClusterCache): + """ + Maintains an in-memory cache and disk cache of the topology for + every cluster. This is useful for having quick access to any of the + topology properties. 
+ """ + + def __init__(self, cluster_cache_dir): + """ + Initializes the topology cache. + :param cluster_cache_dir: + :return: + """ + super(ClusterTopologyCache, self).__init__(cluster_cache_dir) + + def get_file_name(self): + return 'topology.json' http://git-wip-us.apache.org/repos/asf/ambari/blob/df752f86/ambari-agent/src/main/python/ambari_agent/Controller.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/Controller.py b/ambari-agent/src/main/python/ambari_agent/Controller.py index 49bb3d5..4d81947 100644 --- a/ambari-agent/src/main/python/ambari_agent/Controller.py +++ b/ambari-agent/src/main/python/ambari_agent/Controller.py @@ -45,6 +45,7 @@ from ambari_agent.NetUtil import NetUtil from ambari_agent.LiveStatus import LiveStatus from ambari_agent.AlertSchedulerHandler import AlertSchedulerHandler from ambari_agent.ClusterConfigurationCache import ClusterConfigurationCache +from ambari_agent.ClusterTopologyCache import ClusterTopologyCache from ambari_agent.RecoveryManager import RecoveryManager from ambari_agent.HeartbeatHandlers import HeartbeatStopHandlers, bind_signal_handlers from ambari_agent.ExitHelper import ExitHelper @@ -127,6 +128,7 @@ class Controller(threading.Thread): self.recovery_manager = RecoveryManager(recovery_cache_dir) self.cluster_configuration_cache = ClusterConfigurationCache(cluster_cache_dir) + self.cluster_topology_cache = ClusterTopologyCache(cluster_cache_dir) self.move_data_dir_mount_file() @@ -242,7 +244,14 @@ class Controller(threading.Thread): if 'clusterName' in command and 'configurations' in command: cluster_name = command['clusterName'] configurations = command['configurations'] + topology = command['clusterHostInfo'] self.cluster_configuration_cache.update_cache(cluster_name, configurations) + self.cluster_topology_cache.update_cache(cluster_name, topology) + + # TODO: use this once server part is ready. 
+ self.cluster_topology_cache.get_md5_hashsum(cluster_name) + self.cluster_configuration_cache.get_md5_hashsum(cluster_name) + if self.ALERT_DEFINITION_COMMANDS in heartbeat_keys: alert_definition_commands = heartbeat[self.ALERT_DEFINITION_COMMANDS] @@ -252,6 +261,9 @@ class Controller(threading.Thread): configurations = command['configurations'] self.cluster_configuration_cache.update_cache(cluster_name, configurations) + # TODO: use this once server part is ready. + self.cluster_configuration_cache.get_md5_hashsum(cluster_name) + def cancelCommandInQueue(self, commands): """ Remove from the queue commands, kill the process if it's in progress """ if commands: http://git-wip-us.apache.org/repos/asf/ambari/blob/df752f86/ambari-agent/src/main/python/ambari_agent/Utils.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/Utils.py b/ambari-agent/src/main/python/ambari_agent/Utils.py new file mode 100644 index 0000000..e7b03f9 --- /dev/null +++ b/ambari-agent/src/main/python/ambari_agent/Utils.py @@ -0,0 +1,52 @@ +#!/usr/bin/env python + +""" +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+""" + +class Utils(object): + @staticmethod + def make_immutable(value): + if isinstance(value, dict): + return ImmutableDictionary(value) + if isinstance(value, list): + return tuple(value) + return value + +class ImmutableDictionary(dict): + def __init__(self, dictionary): + """ + Recursively turn dict to ImmutableDictionary + """ + for k, v in dictionary.iteritems(): + dictionary[k] = Utils.make_immutable(v) + + super(ImmutableDictionary, self).__init__(dictionary) + +def raise_immutable_error(*args, **kwargs): + """ + PLEASE MAKE SURE YOU NEVER UPDATE CACHE on agent side. The cache should contain exactly the data received from server. + Modifications on agent-side will lead to unnecessary cache sync every agent registration. Which is a big concern on perf clusters! + Also immutability can lead to multithreading issues. + """ + raise TypeError("The dictionary is immutable cannot change it") + +ImmutableDictionary.__setitem__ = raise_immutable_error +ImmutableDictionary.__delitem__ = raise_immutable_error +ImmutableDictionary.clear = raise_immutable_error +ImmutableDictionary.pop = raise_immutable_error +ImmutableDictionary.update = raise_immutable_error \ No newline at end of file
