Repository: ambari Updated Branches: refs/heads/branch-3.0-perf 51b1a14d7 -> 917898cdb
AMBARI-21394. Create a topic to send hostLevelParams (aonishuk) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/917898cd Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/917898cd Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/917898cd Branch: refs/heads/branch-3.0-perf Commit: 917898cdbed3bd37e1bbfa813b115f93dfb6dcaf Parents: 51b1a14 Author: Andrew Onishuk <[email protected]> Authored: Tue Jul 4 11:29:31 2017 +0300 Committer: Andrew Onishuk <[email protected]> Committed: Tue Jul 4 11:29:31 2017 +0300 ---------------------------------------------------------------------- .../ambari_agent/ClusterHostLevelParamsCache.py | 45 +++++++++++++++ .../src/main/python/ambari_agent/Constants.py | 4 +- .../ambari_agent/CustomServiceOrchestrator.py | 4 +- .../main/python/ambari_agent/HeartbeatThread.py | 14 +++-- .../python/ambari_agent/InitializerModule.py | 2 + .../main/python/ambari_agent/RecoveryManager.py | 16 ++--- .../listeners/HostLevelParamsEventListener.py | 61 ++++++++++++++++++++ .../listeners/MetadataEventListener.py | 11 +--- .../ambari_agent/TestAgentStompResponses.py | 20 +++++-- .../dummy_files/stomp/host_level_params.json | 37 ++++++++++++ .../stomp/metadata_after_registration.json | 9 +-- .../dummy_files/stomp/topology_add_host.json | 5 +- .../stomp/topology_cache_expected.json | 6 -- .../dummy_files/stomp/topology_create.json | 15 +---- 14 files changed, 187 insertions(+), 62 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/917898cd/ambari-agent/src/main/python/ambari_agent/ClusterHostLevelParamsCache.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/ClusterHostLevelParamsCache.py b/ambari-agent/src/main/python/ambari_agent/ClusterHostLevelParamsCache.py new file mode 100644 index 0000000..3e490c5 --- /dev/null +++ b/ambari-agent/src/main/python/ambari_agent/ClusterHostLevelParamsCache.py @@ -0,0 +1,45 @@ +#!/usr/bin/env python + +""" +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" + +from ambari_agent.ClusterCache import ClusterCache +import logging + +logger = logging.getLogger(__name__) + +class ClusterHostLevelParamsCache(ClusterCache): + """ + Maintains an in-memory cache and disk cache of the host level params send from server for + every cluster. This is useful for having quick access to any of the + topology properties. + + Host level params. Is parameters used by execution and status commands which can be generated + differently for every host. + """ + + def __init__(self, cluster_cache_dir): + """ + Initializes the host level params cache. + :param cluster_cache_dir: + :return: + """ + super(ClusterHostLevelParamsCache, self).__init__(cluster_cache_dir) + + def get_cache_name(self): + return 'host_level_params' http://git-wip-us.apache.org/repos/asf/ambari/blob/917898cd/ambari-agent/src/main/python/ambari_agent/Constants.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/Constants.py b/ambari-agent/src/main/python/ambari_agent/Constants.py index 02945ee..17ed2be 100644 --- a/ambari-agent/src/main/python/ambari_agent/Constants.py +++ b/ambari-agent/src/main/python/ambari_agent/Constants.py @@ -21,16 +21,18 @@ limitations under the License. COMMANDS_TOPIC = '/user/commands' CONFIGURATIONS_TOPIC = '/user/configs' +HOST_LEVEL_PARAMS_TOPIC = '/user/host_level_params' METADATA_TOPIC = '/events/metadata' TOPOLOGIES_TOPIC = '/events/topologies' SERVER_RESPONSES_TOPIC = '/user/' PRE_REGISTRATION_TOPICS_TO_SUBSCRIBE = [SERVER_RESPONSES_TOPIC] -POST_REGISTRATION_TOPICS_TO_SUBSCRIBE = [COMMANDS_TOPIC, CONFIGURATIONS_TOPIC, METADATA_TOPIC, TOPOLOGIES_TOPIC] +POST_REGISTRATION_TOPICS_TO_SUBSCRIBE = [COMMANDS_TOPIC, CONFIGURATIONS_TOPIC, METADATA_TOPIC, TOPOLOGIES_TOPIC, HOST_LEVEL_PARAMS_TOPIC] TOPOLOGY_REQUEST_ENDPOINT = '/agents/topologies' METADATA_REQUEST_ENDPOINT = '/agents/metadata' CONFIGURATIONS_REQUEST_ENDPOINT = '/agents/configs' +HOST_LEVEL_PARAMS_TOPIC_ENPOINT = '/agents/host_level_params' COMPONENT_STATUS_REPORTS_ENDPOINT = '/reports/component_status' COMMANDS_STATUS_REPORTS_ENDPOINT = '/reports/commands_status' HOST_STATUS_REPORTS_ENDPOINT = '/reports/host_status' http://git-wip-us.apache.org/repos/asf/ambari/blob/917898cd/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py b/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py index 6d1a491..c0b20ed 100644 --- a/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py +++ b/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py @@ -81,6 +81,7 @@ class CustomServiceOrchestrator(): self.metadata_cache = initializer_module.metadata_cache self.topology_cache = initializer_module.topology_cache self.configurations_cache = initializer_module.configurations_cache + self.host_level_params_cache = initializer_module.host_level_params_cache self.config = initializer_module.config self.tmp_dir = self.config.get('agent', 'prefix') self.force_https_protocol = self.config.get_force_https_protocol() @@ -459,13 +460,14 @@ class CustomServiceOrchestrator(): metadata_cache = self.metadata_cache[cluster_id] configurations_cache = self.configurations_cache[cluster_id] + host_level_params_cache = self.host_level_params_cache[cluster_id] component_dict = self.topology_cache.get_component_info_by_key(cluster_id, service_name, component_name) command_dict = { 'clusterLevelParams': metadata_cache.clusterLevelParams, 'serviceLevelParams': metadata_cache.serviceLevelParams[service_name], - 'hostLevelParams': self.topology_cache.get_current_host_info(cluster_id).hostLevelParams, + 'hostLevelParams': host_level_params_cache, 'componentLevelParams': component_dict.componentLevelParams, 'script_type': self.SCRIPT_TYPE_PYTHON } http://git-wip-us.apache.org/repos/asf/ambari/blob/917898cd/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py b/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py index dbf4006..6ba84e6 100644 --- a/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py +++ b/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py @@ -31,6 +31,7 @@ from ambari_agent.listeners.TopologyEventListener import TopologyEventListener from ambari_agent.listeners.ConfigurationEventListener import ConfigurationEventListener from ambari_agent.listeners.MetadataEventListener import MetadataEventListener from ambari_agent.listeners.CommandsEventListener import CommandsEventListener +from ambari_agent.listeners.HostLevelParamsEventListener import HostLevelParamsEventListener HEARTBEAT_INTERVAL = 10 REQUEST_RESPONSE_TIMEOUT = 10 @@ -49,20 +50,21 @@ class HeartbeatThread(threading.Thread): self.registration_builder = Register(initializer_module.config) self.initializer_module = initializer_module - self.caches = [initializer_module.metadata_cache, initializer_module.topology_cache, initializer_module.configurations_cache] # listeners self.server_responses_listener = ServerResponsesListener() self.commands_events_listener = CommandsEventListener(initializer_module.action_queue) - self.metadata_events_listener = MetadataEventListener(initializer_module.metadata_cache, initializer_module.recovery_manager) + self.metadata_events_listener = MetadataEventListener(initializer_module.metadata_cache) self.topology_events_listener = TopologyEventListener(initializer_module.topology_cache) self.configuration_events_listener = ConfigurationEventListener(initializer_module.configurations_cache) - self.listeners = [self.server_responses_listener, self.commands_events_listener, self.metadata_events_listener, self.topology_events_listener, self.configuration_events_listener] + self.host_level_params_events_listener = HostLevelParamsEventListener(initializer_module.host_level_params_cache, initializer_module.recovery_manager) + self.listeners = [self.server_responses_listener, self.commands_events_listener, self.metadata_events_listener, self.topology_events_listener, self.configuration_events_listener, self.host_level_params_events_listener] self.post_registration_requests = [ (Constants.TOPOLOGY_REQUEST_ENDPOINT, initializer_module.topology_cache, self.topology_events_listener), (Constants.METADATA_REQUEST_ENDPOINT, initializer_module.metadata_cache, self.metadata_events_listener), - (Constants.CONFIGURATIONS_REQUEST_ENDPOINT, initializer_module.configurations_cache, self.configuration_events_listener) + (Constants.CONFIGURATIONS_REQUEST_ENDPOINT, initializer_module.configurations_cache, self.configuration_events_listener), + (Constants.HOST_LEVEL_PARAMS_TOPIC_ENPOINT, initializer_module.host_level_params_cache, self.host_level_params_events_listener) ] self.responseId = 0 @@ -152,6 +154,10 @@ class HeartbeatThread(threading.Thread): else: self.responseId = serverId + if 'restartAgent' in response and response['restartAgent'].lower() == "true": + logger.warn("Restarting the agent by the request from server") + Utils.restartAgent() + def get_heartbeat_body(self): """ Heartbeat body to be send to server http://git-wip-us.apache.org/repos/asf/ambari/blob/917898cd/ambari-agent/src/main/python/ambari_agent/InitializerModule.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/InitializerModule.py b/ambari-agent/src/main/python/ambari_agent/InitializerModule.py index 8de1fa5..dbc26e8 100644 --- a/ambari-agent/src/main/python/ambari_agent/InitializerModule.py +++ b/ambari-agent/src/main/python/ambari_agent/InitializerModule.py @@ -26,6 +26,7 @@ from ambari_agent.AmbariConfig import AmbariConfig from ambari_agent.ClusterConfigurationCache import ClusterConfigurationCache from ambari_agent.ClusterTopologyCache import ClusterTopologyCache from ambari_agent.ClusterMetadataCache import ClusterMetadataCache +from ambari_agent.ClusterHostLevelParamsCache import ClusterHostLevelParamsCache from ambari_agent.Utils import lazy_property from ambari_agent.security import AmbariStompConnection from ambari_agent.ActionQueue import ActionQueue @@ -73,6 +74,7 @@ class InitializerModule: self.metadata_cache = ClusterMetadataCache(self.cluster_cache_dir) self.topology_cache = ClusterTopologyCache(self.cluster_cache_dir, self.config) self.configurations_cache = ClusterConfigurationCache(self.cluster_cache_dir) + self.host_level_params_cache = ClusterHostLevelParamsCache(self.cluster_cache_dir) self.customServiceOrchestrator = CustomServiceOrchestrator(self) self.recovery_manager = RecoveryManager(self.recovery_cache_dir) http://git-wip-us.apache.org/repos/asf/ambari/blob/917898cd/ambari-agent/src/main/python/ambari_agent/RecoveryManager.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/RecoveryManager.py b/ambari-agent/src/main/python/ambari_agent/RecoveryManager.py index 68dd0be..19f163e 100644 --- a/ambari-agent/src/main/python/ambari_agent/RecoveryManager.py +++ b/ambari-agent/src/main/python/ambari_agent/RecoveryManager.py @@ -97,7 +97,6 @@ class RecoveryManager: self.__cache_lock = threading.RLock() self.active_command_count = 0 self.paused = False - self.recovery_timestamp = -1 self.cluster_id = None if not os.path.exists(cache_dir): @@ -110,7 +109,7 @@ class RecoveryManager: self.actions = {} - self.update_config(6, 60, 5, 12, recovery_enabled, auto_start_only, auto_install_start, "", -1) + self.update_config(6, 60, 5, 12, recovery_enabled, auto_start_only, auto_install_start, "") pass @@ -560,8 +559,7 @@ class RecoveryManager: "maxCount" : 10, "windowInMinutes" : 60, "retryGap" : 0, - "components" : "a,b", - "recoveryTimestamp" : 1458150424380 + "components" : "a,b" } """ @@ -573,7 +571,6 @@ class RecoveryManager: retry_gap = 5 max_lifetime_count = 12 enabled_components = "" - recovery_timestamp = -1 # Default value if recoveryTimestamp is not available. if dictionary and "recoveryConfig" in dictionary: @@ -599,11 +596,8 @@ class RecoveryManager: if 'components' in config: enabled_components = config['components'] - if 'recoveryTimestamp' in config: - recovery_timestamp = config['recoveryTimestamp'] - self.update_config(max_count, window_in_min, retry_gap, max_lifetime_count, recovery_enabled, auto_start_only, - auto_install_start, enabled_components, recovery_timestamp) + auto_install_start, enabled_components) pass """ @@ -617,10 +611,9 @@ class RecoveryManager: auto_start_only - True if AUTO_START recovery type was specified. False otherwise. auto_install_start - True if AUTO_INSTALL_START recovery type was specified. False otherwise. enabled_components - CSV of componenents enabled for auto start. - recovery_timestamp - Timestamp when the recovery values were last updated. -1 on start up. """ def update_config(self, max_count, window_in_min, retry_gap, max_lifetime_count, recovery_enabled, - auto_start_only, auto_install_start, enabled_components, recovery_timestamp): + auto_start_only, auto_install_start, enabled_components): """ Update recovery configuration, recovery is disabled if configuration values are not correct @@ -653,7 +646,6 @@ class RecoveryManager: self.auto_install_start = auto_install_start self.max_lifetime_count = max_lifetime_count self.enabled_components = [] - self.recovery_timestamp = recovery_timestamp self.allowed_desired_states = [self.STARTED, self.INSTALLED] self.allowed_current_states = [self.INIT, self.INSTALL_FAILED, self.INSTALLED, self.STARTED] http://git-wip-us.apache.org/repos/asf/ambari/blob/917898cd/ambari-agent/src/main/python/ambari_agent/listeners/HostLevelParamsEventListener.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/listeners/HostLevelParamsEventListener.py b/ambari-agent/src/main/python/ambari_agent/listeners/HostLevelParamsEventListener.py new file mode 100644 index 0000000..5aee634 --- /dev/null +++ b/ambari-agent/src/main/python/ambari_agent/listeners/HostLevelParamsEventListener.py @@ -0,0 +1,61 @@ +#!/usr/bin/env python + +''' +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +''' + +import logging +import ambari_stomp + +from ambari_agent.listeners import EventListener +from ambari_agent import Constants + +logger = logging.getLogger(__name__) + +class HostLevelParamsEventListener(EventListener): + """ + Listener of Constants.HOST_LEVEL_PARAMS_TOPIC events from server. + """ + def __init__(self, host_level_params_cache, recovery_manager): + self.host_level_params_cache = host_level_params_cache + self.recovery_manager = recovery_manager + + def on_event(self, headers, message): + """ + Is triggered when an event to Constants.CONFIGURATIONS_TOPIC topic is received from server. + + @param headers: headers dictionary + @param message: message payload dictionary + """ + # this kind of response is received if hash was identical. And server does not need to change anything + if message == {}: + return + + self.host_level_params_cache.rewrite_cache(message['clusters']) + self.host_level_params_cache.hash = message['hash'] + + if message['clusters']: + # FIXME: Recovery manager does not support multiple cluster as of now. + cluster_id = message['clusters'].keys()[0] + + if 'recoveryConfig' in message['clusters'][cluster_id]: + logging.info("Updating recoveryConfig from metadata") + self.recovery_manager.update_recovery_config(self.host_level_params_cache[cluster_id]) + self.recovery_manager.cluster_id = cluster_id + + def get_handled_path(self): + return Constants.HOST_LEVEL_PARAMS_TOPIC \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/917898cd/ambari-agent/src/main/python/ambari_agent/listeners/MetadataEventListener.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/listeners/MetadataEventListener.py b/ambari-agent/src/main/python/ambari_agent/listeners/MetadataEventListener.py index 364d8af..5802ffe 100644 --- a/ambari-agent/src/main/python/ambari_agent/listeners/MetadataEventListener.py +++ b/ambari-agent/src/main/python/ambari_agent/listeners/MetadataEventListener.py @@ -32,9 +32,8 @@ class MetadataEventListener(EventListener): """ Listener of Constants.METADATA_TOPIC events from server. """ - def __init__(self, metadata_cache, recovery_manager): + def __init__(self, metadata_cache): self.metadata_cache = metadata_cache - self.recovery_manager = recovery_manager def on_event(self, headers, message): """ @@ -50,13 +49,5 @@ class MetadataEventListener(EventListener): self.metadata_cache.rewrite_cache(message['clusters']) self.metadata_cache.hash = message['hash'] - # FIXME: Recovery manager does not support multiple cluster as of now. - cluster_id = message['clusters'].keys()[0] - - if 'recoveryConfig' in message['clusters'][cluster_id]: - logging.info("Updating recoveryConfig from metadata") - self.recovery_manager.update_recovery_config(self.metadata_cache[cluster_id]) - self.recovery_manager.cluster_id = cluster_id - def get_handled_path(self): return Constants.METADATA_TOPIC \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/917898cd/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py b/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py index c41f87e..feaf7dd 100644 --- a/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py +++ b/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py @@ -40,7 +40,7 @@ from mock.mock import MagicMock, patch @patch("ambari_agent.hostname.hostname", new=MagicMock(return_value="c6401.ambari.apache.org")) class TestAgentStompResponses(BaseStompServerTestCase): def setUp(self): - self.remove_files(['/tmp/cluster_cache/configurations.json', '/tmp/cluster_cache/metadata.json', '/tmp/cluster_cache/topology.json']) + self.remove_files(['/tmp/cluster_cache/configurations.json', '/tmp/cluster_cache/metadata.json', '/tmp/cluster_cache/topology.json', '/tmp/host_level_params.json']) if not os.path.exists("/tmp/ambari-agent"): os.mkdir("/tmp/ambari-agent") @@ -83,9 +83,13 @@ class TestAgentStompResponses(BaseStompServerTestCase): f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '3'}, body=self.get_json("configurations_update.json")) self.server.topic_manager.send(f) + f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '4'}, body=self.get_json("host_level_params.json")) + self.server.topic_manager.send(f) + initial_topology_request = self.server.frames_queue.get() initial_metadata_request = self.server.frames_queue.get() initial_configs_request = self.server.frames_queue.get() + initial_host_level_params_request = self.server.frames_queue.get() while not initializer_module.is_registered: time.sleep(0.1) @@ -103,6 +107,7 @@ class TestAgentStompResponses(BaseStompServerTestCase): configurations_subscribe_frame = self.server.frames_queue.get() metadata_subscribe_frame = self.server.frames_queue.get() topologies_subscribe_frame = self.server.frames_queue.get() + host_level_params_subscribe_frame = self.server.frames_queue.get() heartbeat_frame = self.server.frames_queue.get() dn_install_in_progress_frame = json.loads(self.server.frames_queue.get().body) dn_install_failed_frame = json.loads(self.server.frames_queue.get().body) @@ -116,7 +121,7 @@ class TestAgentStompResponses(BaseStompServerTestCase): initializer_module.stop_event.set() - f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '4'}, body=json.dumps({'id':'1'})) + f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '5'}, body=json.dumps({'id':'1'})) self.server.topic_manager.send(f) command_status_reporter.join() @@ -179,6 +184,9 @@ class TestAgentStompResponses(BaseStompServerTestCase): f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '3'}, body='{}') self.server.topic_manager.send(f) + f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '4'}, body='{}') + self.server.topic_manager.send(f) + commands_subscribe_frame = self.server.frames_queue.get() configurations_subscribe_frame = self.server.frames_queue.get() metadata_subscribe_frame = self.server.frames_queue.get() @@ -188,7 +196,7 @@ class TestAgentStompResponses(BaseStompServerTestCase): initializer_module.stop_event.set() - f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '4'}, body=json.dumps({'id':'1'})) + f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '5'}, body=json.dumps({'id':'1'})) self.server.topic_manager.send(f) heartbeat_thread.join() @@ -222,9 +230,13 @@ class TestAgentStompResponses(BaseStompServerTestCase): f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '3'}, body='{}') self.server.topic_manager.send(f) + f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '4'}, body='{}') + self.server.topic_manager.send(f) + initial_topology_request = self.server.frames_queue.get() initial_metadata_request = self.server.frames_queue.get() initial_configs_request = self.server.frames_queue.get() + initial_host_level_params_request = self.server.frames_queue.get() while not initializer_module.is_registered: time.sleep(0.1) @@ -262,7 +274,7 @@ class TestAgentStompResponses(BaseStompServerTestCase): initializer_module.stop_event.set() - f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '4'}, body=json.dumps({'id':'1'})) + f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '5'}, body=json.dumps({'id':'1'})) self.server.topic_manager.send(f) heartbeat_thread.join() http://git-wip-us.apache.org/repos/asf/ambari/blob/917898cd/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/host_level_params.json ---------------------------------------------------------------------- diff --git a/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/host_level_params.json b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/host_level_params.json new file mode 100644 index 0000000..7447e93 --- /dev/null +++ b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/host_level_params.json @@ -0,0 +1,37 @@ +{ + "hash":"aa4cd03688d36f18fc2cbba06614bafe", + "clusters":{ + "0":{ + "repoInfo": [ + { + "defaultBaseUrl": "http://public-repo-1.hortonworks.com/HDP/centos6/2.x/updates/2.6.0.3", + "ambariManagedRepositories": true, + "baseUrl": "http://s3.amazonaws.com/dev.hortonworks.com/HDP/centos6/2.x/BUILDS/2.6.1.0-129/", + "latestBaseUrl": "http://s3.amazonaws.com/dev.hortonworks.com/HDP/centos6/2.x/BUILDS/2.6.1.0-129", + "repoSaved": true, + "repoName": "HDP", + "osType": "redhat6", + "unique": true, + "repoId": "HDP-2.6" + }, + { + "defaultBaseUrl": "http://public-repo-1.hortonworks.com/HDP-UTILS-1.1.0.21/repos/centos6", + "ambariManagedRepositories": true, + "baseUrl": "http://public-repo-1.hortonworks.com/HDP-UTILS-1.1.0.21/repos/centos6", + "latestBaseUrl": "http://public-repo-1.hortonworks.com/HDP-UTILS-1.1.0.21/repos/centos6", + "repoSaved": true, + "repoName": "HDP-UTILS", + "osType": "redhat6", + "unique": false, + "repoId": "HDP-UTILS-1.1.0.21" + } + ], + "recoveryConfig": { + "type" : "AUTO_INSTALL_START", + "maxCount" : 10, + "windowInMinutes" : 60, + "components" : "NAMENODE,DATANODE" + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/917898cd/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/metadata_after_registration.json ---------------------------------------------------------------------- diff --git a/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/metadata_after_registration.json b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/metadata_after_registration.json index 6462ccf..c1aba28 100644 --- a/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/metadata_after_registration.json +++ b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/metadata_after_registration.json @@ -38,14 +38,7 @@ }, "status_commands_to_run": [ "STATUS" - ], - "recoveryConfig": { - "type" : "AUTO_INSTALL_START", - "maxCount" : 10, - "windowInMinutes" : 60, - "components" : "NAMENODE,DATANODE", - "recoveryTimestamp" : 1458150424380 - } + ] } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/917898cd/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_add_host.json ---------------------------------------------------------------------- diff --git a/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_add_host.json b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_add_host.json index 2458f08..a9407c3 100644 --- a/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_add_host.json +++ b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_add_host.json @@ -8,10 +8,7 @@ "hostId":2, "hostName":"c6403.ambari.apache.org", "rackName":"/default-rack", - "ipv4":"192.168.64.103", - "hostLevelParams": { - "repo_info": "[{\"baseUrl\":\"http://s3.amazonaws.com/dev.hortonworks.com/HDP/centos6/2.x/BUILDS/2.6.1.0-129/\",\"osType\":\"redhat6\",\"repoId\":\"HDP-2.6\",\"repoName\":\"HDP\",\"defaultBaseUrl\":\"http://public-repo-1.hortonworks.com/HDP/centos6/2.x/updates/2.6.0.3\",\"latestBaseUrl\":\"http://s3.amazonaws.com/dev.hortonworks.com/HDP/centos6/2.x/BUILDS/2.6.1.0-129\",\"repoSaved\":true,\"unique\":true,\"ambariManagedRepositories\":true},{\"baseUrl\":\"http://public-repo-1.hortonworks.com/HDP-UTILS-1.1.0.21/repos/centos6\",\"osType\":\"redhat6\",\"repoId\":\"HDP-UTILS-1.1.0.21\",\"repoName\":\"HDP-UTILS\",\"defaultBaseUrl\":\"http://public-repo-1.hortonworks.com/HDP-UTILS-1.1.0.21/repos/centos6\",\"latestBaseUrl\":\"http://public-repo-1.hortonworks.com/HDP-UTILS-1.1.0.21/repos/centos6\",\"repoSaved\":true,\"unique\":false,\"ambariManagedRepositories\":true}]" - } + "ipv4":"192.168.64.103" } ] } http://git-wip-us.apache.org/repos/asf/ambari/blob/917898cd/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_cache_expected.json ---------------------------------------------------------------------- diff --git a/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_cache_expected.json b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_cache_expected.json index 53d0e0d..ff2b3fd 100644 --- a/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_cache_expected.json +++ b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_cache_expected.json @@ -46,18 +46,12 @@ "hosts": [ { "hostId": 1, - "hostLevelParams": { - "repo_info": "[{\"baseUrl\":\"http://s3.amazonaws.com/dev.hortonworks.com/HDP/centos6/2.x/BUILDS/2.6.1.0-129/\",\"osType\":\"redhat6\",\"repoId\":\"HDP-2.6\",\"repoName\":\"HDP\",\"defaultBaseUrl\":\"http://public-repo-1.hortonworks.com/HDP/centos6/2.x/updates/2.6.0.3\",\"latestBaseUrl\":\"http://s3.amazonaws.com/dev.hortonworks.com/HDP/centos6/2.x/BUILDS/2.6.1.0-129\",\"repoSaved\":true,\"unique\":true,\"ambariManagedRepositories\":true},{\"baseUrl\":\"http://public-repo-1.hortonworks.com/HDP-UTILS-1.1.0.21/repos/centos6\",\"osType\":\"redhat6\",\"repoId\":\"HDP-UTILS-1.1.0.21\",\"repoName\":\"HDP-UTILS\",\"defaultBaseUrl\":\"http://public-repo-1.hortonworks.com/HDP-UTILS-1.1.0.21/repos/centos6\",\"latestBaseUrl\":\"http://public-repo-1.hortonworks.com/HDP-UTILS-1.1.0.21/repos/centos6\",\"repoSaved\":true,\"unique\":false,\"ambariManagedRepositories\":true}]" - }, "hostName": "c6402.ambari.apache.org", "ipv4": "192.168.64.102", "rackName": "/default-rack" }, { "hostId": 2, - "hostLevelParams": { - "repo_info": "[{\"baseUrl\":\"http://s3.amazonaws.com/dev.hortonworks.com/HDP/centos6/2.x/BUILDS/2.6.1.0-129/\",\"osType\":\"redhat6\",\"repoId\":\"HDP-2.6\",\"repoName\":\"HDP\",\"defaultBaseUrl\":\"http://public-repo-1.hortonworks.com/HDP/centos6/2.x/updates/2.6.0.3\",\"latestBaseUrl\":\"http://s3.amazonaws.com/dev.hortonworks.com/HDP/centos6/2.x/BUILDS/2.6.1.0-129\",\"repoSaved\":true,\"unique\":true,\"ambariManagedRepositories\":true},{\"baseUrl\":\"http://public-repo-1.hortonworks.com/HDP-UTILS-1.1.0.21/repos/centos6\",\"osType\":\"redhat6\",\"repoId\":\"HDP-UTILS-1.1.0.21\",\"repoName\":\"HDP-UTILS\",\"defaultBaseUrl\":\"http://public-repo-1.hortonworks.com/HDP-UTILS-1.1.0.21/repos/centos6\",\"latestBaseUrl\":\"http://public-repo-1.hortonworks.com/HDP-UTILS-1.1.0.21/repos/centos6\",\"repoSaved\":true,\"unique\":false,\"ambariManagedRepositories\":true}]" - }, "hostName": "c6403.ambari.apache.org", "ipv4": "192.168.64.103", "rackName": "/default-rack" http://git-wip-us.apache.org/repos/asf/ambari/blob/917898cd/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_create.json ---------------------------------------------------------------------- diff --git a/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_create.json b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_create.json index dfe17b9..6df4bc3 100644 --- a/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_create.json +++ b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_create.json @@ -50,19 +50,13 @@ "hostId":0, "hostName":"c6401.ambari.apache.org", "rackName":"/default-rack", - "ipv4":"192.168.64.101", - "hostLevelParams": { - "repo_info": "[{\"baseUrl\":\"http://s3.amazonaws.com/dev.hortonworks.com/HDP/centos6/2.x/BUILDS/2.6.1.0-129/\",\"osType\":\"redhat6\",\"repoId\":\"HDP-2.6\",\"repoName\":\"HDP\",\"defaultBaseUrl\":\"http://public-repo-1.hortonworks.com/HDP/centos6/2.x/updates/2.6.0.3\",\"latestBaseUrl\":\"http://s3.amazonaws.com/dev.hortonworks.com/HDP/centos6/2.x/BUILDS/2.6.1.0-129\",\"repoSaved\":true,\"unique\":true,\"ambariManagedRepositories\":true},{\"baseUrl\":\"http://public-repo-1.hortonworks.com/HDP-UTILS-1.1.0.21/repos/centos6\",\"osType\":\"redhat6\",\"repoId\":\"HDP-UTILS-1.1.0.21\",\"repoName\":\"HDP-UTILS\",\"defaultBaseUrl\":\"http://public-repo-1.hortonworks.com/HDP-UTILS-1.1.0.21/repos/centos6\",\"latestBaseUrl\":\"http://public-repo-1.hortonworks.com/HDP-UTILS-1.1.0.21/repos/centos6\",\"repoSaved\":true,\"unique\":false,\"ambariManagedRepositories\":true}]" - } + "ipv4":"192.168.64.101" }, { "hostId":1, "hostName":"c6402.ambari.apache.org", "rackName":"/default-rack", - "ipv4":"192.168.64.102", - "hostLevelParams": { - "repo_info": "[{\"baseUrl\":\"http://s3.amazonaws.com/dev.hortonworks.com/HDP/centos6/2.x/BUILDS/2.6.1.0-129/\",\"osType\":\"redhat6\",\"repoId\":\"HDP-2.6\",\"repoName\":\"HDP\",\"defaultBaseUrl\":\"http://public-repo-1.hortonworks.com/HDP/centos6/2.x/updates/2.6.0.3\",\"latestBaseUrl\":\"http://s3.amazonaws.com/dev.hortonworks.com/HDP/centos6/2.x/BUILDS/2.6.1.0-129\",\"repoSaved\":true,\"unique\":true,\"ambariManagedRepositories\":true},{\"baseUrl\":\"http://public-repo-1.hortonworks.com/HDP-UTILS-1.1.0.21/repos/centos6\",\"osType\":\"redhat6\",\"repoId\":\"HDP-UTILS-1.1.0.21\",\"repoName\":\"HDP-UTILS\",\"defaultBaseUrl\":\"http://public-repo-1.hortonworks.com/HDP-UTILS-1.1.0.21/repos/centos6\",\"latestBaseUrl\":\"http://public-repo-1.hortonworks.com/HDP-UTILS-1.1.0.21/repos/centos6\",\"repoSaved\":true,\"unique\":false,\"ambariManagedRepositories\":true}]" - } + "ipv4":"192.168.64.102" } ] }, @@ -87,10 +81,7 @@ "hostId":0, "hostName":"c6401.ambari.apache.org", "rackName":"/default-rack", - "ipv4":"192.168.64.101", - "hostLevelParams": { - "repo_info": "[{\"baseUrl\":\"http://s3.amazonaws.com/dev.hortonworks.com/HDP/centos6/2.x/BUILDS/2.6.1.0-129/\",\"osType\":\"redhat6\",\"repoId\":\"HDP-2.6\",\"repoName\":\"HDP\",\"defaultBaseUrl\":\"http://public-repo-1.hortonworks.com/HDP/centos6/2.x/updates/2.6.0.3\",\"latestBaseUrl\":\"http://s3.amazonaws.com/dev.hortonworks.com/HDP/centos6/2.x/BUILDS/2.6.1.0-129\",\"repoSaved\":true,\"unique\":true,\"ambariManagedRepositories\":true},{\"baseUrl\":\"http://public-repo-1.hortonworks.com/HDP-UTILS-1.1.0.21/repos/centos6\",\"osType\":\"redhat6\",\"repoId\":\"HDP-UTILS-1.1.0.21\",\"repoName\":\"HDP-UTILS\",\"defaultBaseUrl\":\"http://public-repo-1.hortonworks.com/HDP-UTILS-1.1.0.21/repos/centos6\",\"latestBaseUrl\":\"http://public-repo-1.hortonworks.com/HDP-UTILS-1.1.0.21/repos/centos6\",\"repoSaved\":true,\"unique\":false,\"ambariManagedRepositories\":true}]" - } + "ipv4":"192.168.64.101" } ] }
