Repository: ambari Updated Branches: refs/heads/branch-3.0-perf 7fb94cb81 -> 194e391cb
AMBARI-21763. Cache hashes should not be reset after restart ; some ambari-server restart fixes (aonishuk) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/194e391c Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/194e391c Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/194e391c Branch: refs/heads/branch-3.0-perf Commit: 194e391cbfe811075664481d9a9f13e62cc83413 Parents: 7fb94cb Author: Andrew Onishuk <[email protected]> Authored: Tue Aug 22 13:37:15 2017 +0300 Committer: Andrew Onishuk <[email protected]> Committed: Tue Aug 22 13:37:15 2017 +0300 ---------------------------------------------------------------------- .../python/ambari_agent/AlertStatusReporter.py | 4 ++-- .../ClusterAlertDefinitionsCache.py | 8 +++---- .../main/python/ambari_agent/ClusterCache.py | 25 ++++++++++++++------ .../python/ambari_agent/ClusterTopologyCache.py | 8 +++---- .../python/ambari_agent/CommandStatusDict.py | 4 ++-- .../ambari_agent/ComponentStatusExecutor.py | 23 ++++++++++++++---- .../python/ambari_agent/HostStatusReporter.py | 4 ++-- .../python/ambari_agent/InitializerModule.py | 4 ++-- .../listeners/AlertDefinitionsEventListener.py | 9 +++---- .../listeners/ConfigurationEventListener.py | 3 +-- .../listeners/HostLevelParamsEventListener.py | 3 +-- .../listeners/MetadataEventListener.py | 3 +-- .../listeners/TopologyEventListener.py | 9 +++---- .../src/main/python/ambari_agent/security.py | 4 ---- .../ambari_agent/TestAgentStompResponses.py | 4 ---- 15 files changed, 61 insertions(+), 54 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/194e391c/ambari-agent/src/main/python/ambari_agent/AlertStatusReporter.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/AlertStatusReporter.py b/ambari-agent/src/main/python/ambari_agent/AlertStatusReporter.py index 20cb717..084a342 100644 --- a/ambari-agent/src/main/python/ambari_agent/AlertStatusReporter.py +++ b/ambari-agent/src/main/python/ambari_agent/AlertStatusReporter.py @@ -20,7 +20,7 @@ limitations under the License. import logging import threading -from ambari_agent import security +from ambari_stomp.adapter.websocket import ConnectionIsAlreadyClosed from ambari_agent import Constants logger = logging.getLogger(__name__) @@ -46,7 +46,7 @@ class AlertStatusReporter(threading.Thread): alerts = self.collector.alerts() if alerts: self.initializer_module.connection.send(message=alerts, destination=Constants.ALERTS_STATUS_REPORTS_ENDPOINT) - except security.ConnectionIsNotEstablished: # server and agent disconnected during sending data. Not an issue + except ConnectionIsAlreadyClosed: # server and agent disconnected during sending data. Not an issue pass except: logger.exception("Exception in AlertStatusReporter. Re-running it") http://git-wip-us.apache.org/repos/asf/ambari/blob/194e391c/ambari-agent/src/main/python/ambari_agent/ClusterAlertDefinitionsCache.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/ClusterAlertDefinitionsCache.py b/ambari-agent/src/main/python/ambari_agent/ClusterAlertDefinitionsCache.py index a1f7199..d3ab48e 100644 --- a/ambari-agent/src/main/python/ambari_agent/ClusterAlertDefinitionsCache.py +++ b/ambari-agent/src/main/python/ambari_agent/ClusterAlertDefinitionsCache.py @@ -66,7 +66,7 @@ class ClusterAlertDefinitionsCache(ClusterCache): return None - def cache_update(self, cache_update): + def cache_update(self, cache_update, cache_hash): mutable_dict = self._get_mutable_copy() for cluster_id in mutable_dict: @@ -78,9 +78,9 @@ class ClusterAlertDefinitionsCache(ClusterCache): else: mutable_dict[cluster_id]['alertDefinitions'][index_of_alert] = alert_definition - self.rewrite_cache(mutable_dict) + self.rewrite_cache(mutable_dict, cache_hash) - def cache_delete(self, cache_update): + def cache_delete(self, cache_update, cache_hash): mutable_dict = self._get_mutable_copy() for cluster_id in mutable_dict: @@ -93,7 +93,7 @@ class ClusterAlertDefinitionsCache(ClusterCache): del mutable_dict[cluster_id]['alertDefinitions'][index_of_alert] - self.rewrite_cache(mutable_dict) + self.rewrite_cache(mutable_dict, cache_hash) def get_cache_name(self): http://git-wip-us.apache.org/repos/asf/ambari/blob/194e391c/ambari-agent/src/main/python/ambari_agent/ClusterCache.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/ClusterCache.py b/ambari-agent/src/main/python/ambari_agent/ClusterCache.py index 2316866..7704443 100644 --- a/ambari-agent/src/main/python/ambari_agent/ClusterCache.py +++ b/ambari-agent/src/main/python/ambari_agent/ClusterCache.py @@ -48,18 +48,23 @@ class ClusterCache(dict): self.hash = None self.__current_cache_json_file = os.path.join(self.cluster_cache_dir, self.get_cache_name()+'.json') + self.__current_cache_hash_file = os.path.join(self.cluster_cache_dir, '.'+self.get_cache_name()+'.hash') self._cache_lock = threading.RLock() self.__file_lock = ClusterCache.file_locks[self.__current_cache_json_file] # if the file exists, then load it cache_dict = {} - if os.path.isfile(self.__current_cache_json_file): - with self.__file_lock: + with self.__file_lock: + if os.path.isfile(self.__current_cache_json_file): with open(self.__current_cache_json_file, 'r') as fp: cache_dict = json.load(fp) - self.rewrite_cache(cache_dict) + if os.path.isfile(self.__current_cache_hash_file): + with open(self.__current_cache_hash_file, 'r') as fp: + self.hash = fp.read() + + self.rewrite_cache(cache_dict, self.hash) def get_cluster_indepedent_data(self): return self[ClusterCache.COMMON_DATA_CLUSTER] @@ -70,7 +75,7 @@ class ClusterCache(dict): cluster_ids.remove(ClusterCache.COMMON_DATA_CLUSTER) return cluster_ids - def rewrite_cache(self, cache): + def rewrite_cache(self, cache, cache_hash): cache_ids_to_delete = [] for existing_cluster_id in self: if not existing_cluster_id in cache: @@ -83,17 +88,19 @@ class ClusterCache(dict): for cache_id_to_delete in cache_ids_to_delete: del self[cache_id_to_delete] + self.hash = cache_hash + self.on_cache_update() self.persist_cache() - def cache_update(self, update_dict): + def cache_update(self, update_dict, cache_hash): """ Update the current dictionary by other one """ merged_dict = Utils.update_nested(self._get_mutable_copy(), update_dict) - self.rewrite_cache(merged_dict) + self.rewrite_cache(merged_dict, cache_hash) - def cache_delete(self, delete_dict): + def cache_delete(self, delete_dict, cache_hash): raise NotImplemented() def rewrite_cluster_cache(self, cluster_id, cache): @@ -122,6 +129,10 @@ class ClusterCache(dict): with open(self.__current_cache_json_file, 'w') as f: json.dump(self, f, indent=2) + if self.hash is not None: + with open(self.__current_cache_hash_file, 'w') as fp: + fp.write(self.hash) + def _get_mutable_copy(self): with self._cache_lock: return Utils.get_mutable_copy(self) http://git-wip-us.apache.org/repos/asf/ambari/blob/194e391c/ambari-agent/src/main/python/ambari_agent/ClusterTopologyCache.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/ClusterTopologyCache.py b/ambari-agent/src/main/python/ambari_agent/ClusterTopologyCache.py index 559a956..e35b219 100644 --- a/ambari-agent/src/main/python/ambari_agent/ClusterTopologyCache.py +++ b/ambari-agent/src/main/python/ambari_agent/ClusterTopologyCache.py @@ -127,7 +127,7 @@ class ClusterTopologyCache(ClusterCache): return component_dict return None - def cache_update(self, cache_update): + def cache_update(self, cache_update, cache_hash): """ Handle event of update of topology. @@ -171,9 +171,9 @@ class ClusterTopologyCache(ClusterCache): else: components_mutable_list.append(component_updates_dict) - self.rewrite_cache(mutable_dict) + self.rewrite_cache(mutable_dict, cache_hash) - def cache_delete(self, cache_update): + def cache_delete(self, cache_update, cache_hash): """ Handle event of delete on topology. @@ -219,6 +219,6 @@ class ClusterTopologyCache(ClusterCache): for cluster_id in clusters_ids_to_delete: del mutable_dict[cluster_id] - self.rewrite_cache(mutable_dict) + self.rewrite_cache(mutable_dict, cache_hash) http://git-wip-us.apache.org/repos/asf/ambari/blob/194e391c/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py b/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py index e7b7e49..fa71d15 100644 --- a/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py +++ b/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py @@ -25,7 +25,7 @@ from collections import defaultdict from Grep import Grep from ambari_agent import Constants -from ambari_agent import security +from ambari_stomp.adapter.websocket import ConnectionIsAlreadyClosed logger = logging.getLogger() @@ -66,7 +66,7 @@ class CommandStatusDict(): try: self.initializer_module.connection.send(message={'clusters':reports_dict}, destination=Constants.COMMANDS_STATUS_REPORTS_ENDPOINT) return True - except security.ConnectionIsNotEstablished: + except ConnectionIsAlreadyClosed: return False def report(self): http://git-wip-us.apache.org/repos/asf/ambari/blob/194e391c/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py b/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py index 66df15a..4ccd815 100644 --- a/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py +++ b/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py @@ -24,7 +24,7 @@ import threading from ambari_agent import Constants from ambari_agent.LiveStatus import LiveStatus from collections import defaultdict -from ambari_agent import security +from ambari_stomp.adapter.websocket import ConnectionIsAlreadyClosed logger = logging.getLogger(__name__) @@ -36,7 +36,7 @@ class ComponentStatusExecutor(threading.Thread): self.customServiceOrchestrator = initializer_module.customServiceOrchestrator self.stop_event = initializer_module.stop_event self.recovery_manager = initializer_module.recovery_manager - self.reported_component_status = defaultdict(lambda:defaultdict(lambda:None)) # component statuses which were received by server + self.reported_component_status = defaultdict(lambda:defaultdict(lambda:defaultdict(lambda:None))) # component statuses which were received by server threading.Thread.__init__(self) def run(self): @@ -45,6 +45,7 @@ class ComponentStatusExecutor(threading.Thread): """ while not self.stop_event.is_set(): try: + self.clean_not_existing_clusters_info() cluster_reports = defaultdict(lambda:[]) for cluster_id in self.topology_cache.get_cluster_ids(): @@ -71,6 +72,10 @@ class ComponentStatusExecutor(threading.Thread): if self.stop_event.is_set(): break + # cluster was already removed + if not cluster_id in self.topology_cache.get_cluster_ids(): + break + service_name = component_dict.serviceName component_name = component_dict.componentName @@ -93,13 +98,13 @@ class ComponentStatusExecutor(threading.Thread): 'clusterId': cluster_id, } - if status != self.reported_component_status[component_name][command_name]: + if status != self.reported_component_status[cluster_id][component_name][command_name]: logging.info("Status for {0} has changed to {1}".format(component_name, status)) cluster_reports[cluster_id].append(result) self.recovery_manager.handle_status_change(component_name, status) self.send_updates_to_server(cluster_reports) - except security.ConnectionIsNotEstablished: # server and agent disconnected during sending data. Not an issue + except ConnectionIsAlreadyClosed: # server and agent disconnected during sending data. Not an issue pass except: logger.exception("Exception in ComponentStatusExecutor. Re-running it") @@ -119,4 +124,12 @@ class ComponentStatusExecutor(threading.Thread): command = report['command'] status = report['status'] - self.reported_component_status[component_name][command] = status + self.reported_component_status[cluster_id][component_name][command] = status + + def clean_not_existing_clusters_info(self): + """ + This needs to be done to remove information about clusters which where deleted (e.g. ambari-server reset) + """ + for cluster_id in self.reported_component_status.keys(): + if not cluster_id in self.topology_cache.get_cluster_ids(): + del self.reported_component_status[cluster_id] http://git-wip-us.apache.org/repos/asf/ambari/blob/194e391c/ambari-agent/src/main/python/ambari_agent/HostStatusReporter.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/HostStatusReporter.py b/ambari-agent/src/main/python/ambari_agent/HostStatusReporter.py index c60ea36..09ce016 100644 --- a/ambari-agent/src/main/python/ambari_agent/HostStatusReporter.py +++ b/ambari-agent/src/main/python/ambari_agent/HostStatusReporter.py @@ -23,7 +23,7 @@ from ambari_agent import Constants from ambari_agent.HostInfo import HostInfo from ambari_agent.Utils import Utils from ambari_agent.Hardware import Hardware -from ambari_agent import security +from ambari_stomp.adapter.websocket import ConnectionIsAlreadyClosed logger = logging.getLogger(__name__) @@ -53,7 +53,7 @@ class HostStatusReporter(threading.Thread): # don't use else to avoid race condition if not self.initializer_module.is_registered: self.last_report = {} - except security.ConnectionIsNotEstablished: # server and agent disconnected during sending data. Not an issue + except ConnectionIsAlreadyClosed: # server and agent disconnected during sending data. Not an issue pass except: logger.exception("Exception in HostStatusReporter. Re-running it") http://git-wip-us.apache.org/repos/asf/ambari/blob/194e391c/ambari-agent/src/main/python/ambari_agent/InitializerModule.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/InitializerModule.py b/ambari-agent/src/main/python/ambari_agent/InitializerModule.py index 8208b32..4fbef65 100644 --- a/ambari-agent/src/main/python/ambari_agent/InitializerModule.py +++ b/ambari-agent/src/main/python/ambari_agent/InitializerModule.py @@ -34,7 +34,7 @@ from ambari_agent.CommandStatusDict import CommandStatusDict from ambari_agent.CustomServiceOrchestrator import CustomServiceOrchestrator from ambari_agent.RecoveryManager import RecoveryManager from ambari_agent.AlertSchedulerHandler import AlertSchedulerHandler -from ambari_agent import security +from ambari_stomp.adapter.websocket import ConnectionIsAlreadyClosed logger = logging.getLogger(__name__) @@ -104,4 +104,4 @@ class InitializerModule: Can be a result of race condition: begin sending X -> got disconnected by HeartbeatThread -> continue sending X """ - raise security.ConnectionIsNotEstablished("Connection to server is not established") + raise ConnectionIsAlreadyClosed("Connection to server is not established") http://git-wip-us.apache.org/repos/asf/ambari/blob/194e391c/ambari-agent/src/main/python/ambari_agent/listeners/AlertDefinitionsEventListener.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/listeners/AlertDefinitionsEventListener.py b/ambari-agent/src/main/python/ambari_agent/listeners/AlertDefinitionsEventListener.py index cf72a4d..91ae5de 100644 --- a/ambari-agent/src/main/python/ambari_agent/listeners/AlertDefinitionsEventListener.py +++ b/ambari-agent/src/main/python/ambari_agent/listeners/AlertDefinitionsEventListener.py @@ -48,14 +48,11 @@ class AlertDefinitionsEventListener(EventListener): event_type = message['eventType'] if event_type == 'CREATE': - self.alert_definitions_cache.rewrite_cache(message['clusters']) - self.alert_definitions_cache.hash = message['hash'] + self.alert_definitions_cache.rewrite_cache(message['clusters'], message['hash']) elif event_type == 'UPDATE': - self.alert_definitions_cache.cache_update(message['clusters']) - self.alert_definitions_cache.hash = message['hash'] + self.alert_definitions_cache.cache_update(message['clusters'], message['hash']) elif event_type == 'DELETE': - self.alert_definitions_cache.cache_delete(message['clusters']) - self.alert_definitions_cache.hash = message['hash'] + self.alert_definitions_cache.cache_delete(message['clusters'], message['hash']) else: logger.error("Unknown event type '{0}' for alert event") http://git-wip-us.apache.org/repos/asf/ambari/blob/194e391c/ambari-agent/src/main/python/ambari_agent/listeners/ConfigurationEventListener.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/listeners/ConfigurationEventListener.py b/ambari-agent/src/main/python/ambari_agent/listeners/ConfigurationEventListener.py index e32c503..a4884d8 100644 --- a/ambari-agent/src/main/python/ambari_agent/listeners/ConfigurationEventListener.py +++ b/ambari-agent/src/main/python/ambari_agent/listeners/ConfigurationEventListener.py @@ -44,8 +44,7 @@ class ConfigurationEventListener(EventListener): if message == {}: return - self.configuration_cache.rewrite_cache(message['clusters']) - self.configuration_cache.hash = message['hash'] + self.configuration_cache.rewrite_cache(message['clusters'], message['hash']) def get_handled_path(self): return Constants.CONFIGURATIONS_TOPIC \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/194e391c/ambari-agent/src/main/python/ambari_agent/listeners/HostLevelParamsEventListener.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/listeners/HostLevelParamsEventListener.py b/ambari-agent/src/main/python/ambari_agent/listeners/HostLevelParamsEventListener.py index aee2992..de5cd03 100644 --- a/ambari-agent/src/main/python/ambari_agent/listeners/HostLevelParamsEventListener.py +++ b/ambari-agent/src/main/python/ambari_agent/listeners/HostLevelParamsEventListener.py @@ -45,8 +45,7 @@ class HostLevelParamsEventListener(EventListener): if message == {}: return - self.host_level_params_cache.rewrite_cache(message['clusters']) - self.host_level_params_cache.hash = message['hash'] + self.host_level_params_cache.rewrite_cache(message['clusters'], message['hash']) if message['clusters']: # FIXME: Recovery manager does not support multiple cluster as of now. http://git-wip-us.apache.org/repos/asf/ambari/blob/194e391c/ambari-agent/src/main/python/ambari_agent/listeners/MetadataEventListener.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/listeners/MetadataEventListener.py b/ambari-agent/src/main/python/ambari_agent/listeners/MetadataEventListener.py index 1e9b6e7..f31820e 100644 --- a/ambari-agent/src/main/python/ambari_agent/listeners/MetadataEventListener.py +++ b/ambari-agent/src/main/python/ambari_agent/listeners/MetadataEventListener.py @@ -46,8 +46,7 @@ class MetadataEventListener(EventListener): if message == {}: return - self.metadata_cache.cache_update(message['clusters']) - self.metadata_cache.hash = message['hash'] + self.metadata_cache.cache_update(message['clusters'], message['hash']) def get_handled_path(self): return Constants.METADATA_TOPIC \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/194e391c/ambari-agent/src/main/python/ambari_agent/listeners/TopologyEventListener.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/listeners/TopologyEventListener.py b/ambari-agent/src/main/python/ambari_agent/listeners/TopologyEventListener.py index 19a1d32..71aa161 100644 --- a/ambari-agent/src/main/python/ambari_agent/listeners/TopologyEventListener.py +++ b/ambari-agent/src/main/python/ambari_agent/listeners/TopologyEventListener.py @@ -47,14 +47,11 @@ class TopologyEventListener(EventListener): event_type = message['eventType'] if event_type == 'CREATE': - self.topology_cache.rewrite_cache(message['clusters']) - self.topology_cache.hash = message['hash'] + self.topology_cache.rewrite_cache(message['clusters'], message['hash']) elif event_type == 'UPDATE': - self.topology_cache.cache_update(message['clusters']) - self.topology_cache.hash = message['hash'] + self.topology_cache.cache_update(message['clusters'], message['hash']) elif event_type == 'DELETE': - self.topology_cache.cache_delete(message['clusters']) - self.topology_cache.hash = message['hash'] + self.topology_cache.cache_delete(message['clusters'], message['hash']) else: logger.error("Unknown event type '{0}' for topology event") http://git-wip-us.apache.org/repos/asf/ambari/blob/194e391c/ambari-agent/src/main/python/ambari_agent/security.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/security.py b/ambari-agent/src/main/python/ambari_agent/security.py index a505658..db43494 100644 --- a/ambari-agent/src/main/python/ambari_agent/security.py +++ b/ambari-agent/src/main/python/ambari_agent/security.py @@ -103,10 +103,6 @@ class VerifiedHTTPSConnection(httplib.HTTPSConnection): return sock - -class ConnectionIsNotEstablished(Exception): - pass - def establish_connection(connection_url): """ Create a stomp connection http://git-wip-us.apache.org/repos/asf/ambari/blob/194e391c/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py b/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py index 55c489f..3c571ab 100644 --- a/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py +++ b/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py @@ -51,10 +51,6 @@ class TestAgentStompResponses(BaseStompServerTestCase): return super(TestAgentStompResponses, self).setUp() - def tearDown(self): - self.initializer_module.stop_event.set() - return super(TestAgentStompResponses, self).tearDown() - @patch.object(CustomServiceOrchestrator, "runCommand") def test_mock_server_can_start(self, runCommand_mock): runCommand_mock.return_value = {'stdout':'...', 'stderr':'...', 'structuredOut' : '{}', 'exitcode':1}
