Repository: ambari Updated Branches: refs/heads/branch-3.0-perf 6bad191bb -> 6303e9358
AMBARI-21140. Support CREATE/UPDATE/DELETE of topology, hashes and some events format changes (aonishuk) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/6303e935 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/6303e935 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/6303e935 Branch: refs/heads/branch-3.0-perf Commit: 6303e9358db26640ab2c6161111f2d7f527c965c Parents: 6bad191 Author: Andrew Onishuk <[email protected]> Authored: Tue May 30 10:12:35 2017 +0300 Committer: Andrew Onishuk <[email protected]> Committed: Tue May 30 10:12:35 2017 +0300 ---------------------------------------------------------------------- .../main/python/ambari_agent/ClusterCache.py | 42 ++-- .../python/ambari_agent/ClusterTopologyCache.py | 85 ++++++++ .../ambari_agent/ComponentStatusExecutor.py | 2 +- .../main/python/ambari_agent/HeartbeatThread.py | 3 +- .../src/main/python/ambari_agent/Utils.py | 19 +- .../listeners/ConfigurationEventListener.py | 7 +- .../listeners/MetadataEventListener.py | 9 +- .../listeners/TopologyEventListener.py | 18 +- .../python/ambari_agent/listeners/__init__.py | 2 +- .../src/main/python/ambari_agent/security.py | 5 +- .../ambari_agent/BaseStompServerTestCase.py | 14 +- .../ambari_agent/TestAgentStompResponses.py | 83 +++++++- .../stomp/configurations_update.json | 45 ++-- .../stomp/metadata_after_registration.json | 212 ++++++++++++++++--- .../stomp/topology_add_component.json | 23 ++ .../stomp/topology_add_component_host.json | 18 ++ .../dummy_files/stomp/topology_add_host.json | 16 ++ .../stomp/topology_cache_expected.json | 58 +++++ .../dummy_files/stomp/topology_create.json | 85 ++++++++ .../stomp/topology_delete_cluster.json | 8 + .../stomp/topology_delete_component.json | 19 ++ .../stomp/topology_delete_component_host.json | 18 ++ .../dummy_files/stomp/topology_delete_host.json | 13 ++ .../dummy_files/stomp/topology_update.json | 69 ------ 24 files changed, 714 insertions(+), 159 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/6303e935/ambari-agent/src/main/python/ambari_agent/ClusterCache.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/ClusterCache.py b/ambari-agent/src/main/python/ambari_agent/ClusterCache.py index 4b88f71..8e91afe 100644 --- a/ambari-agent/src/main/python/ambari_agent/ClusterCache.py +++ b/ambari-agent/src/main/python/ambari_agent/ClusterCache.py @@ -18,7 +18,6 @@ See the License for the specific language governing permissions and limitations under the License. 
""" -import hashlib import logging import ambari_simplejson as json import os @@ -45,6 +44,7 @@ class ClusterCache(dict): """ self.cluster_cache_dir = cluster_cache_dir + self.hash = None self.__current_cache_json_file = os.path.join(self.cluster_cache_dir, self.get_cache_name()+'.json') @@ -67,19 +67,29 @@ class ClusterCache(dict): def get_cluster_ids(self): return self.keys() - def update_cache(self, cache): + def rewrite_cache(self, cache): + cache_ids_to_delete = [] + for existing_cluster_id in self: + if not existing_cluster_id in cache: + cache_ids_to_delete.append(existing_cluster_id) + for cluster_id, cluster_cache in cache.iteritems(): - self.update_cluster_cache(cluster_id, cluster_cache) + self.rewrite_cluster_cache(cluster_id, cluster_cache) + + with self._cache_lock: + for cache_id_to_delete in cache_ids_to_delete: + del self[cache_id_to_delete] - def update_cluster_cache(self, cluster_id, cache): + + def rewrite_cluster_cache(self, cluster_id, cache): """ Thread-safe method for writing out the specified cluster cache - and updating the in-memory representation. + and rewriting the in-memory representation. :param cluster_id: :param cache: :return: """ - logger.info("Updating cache {0} for cluster {1}".format(self.__class__.__name__, cluster_id)) + logger.info("Rewriting cache {0} for cluster {1}".format(self.__class__.__name__, cluster_id)) # The cache should contain exactly the data received from server. # Modifications on agent-side will lead to unnecessary cache sync every agent registration. Which is a big concern on perf clusters! @@ -88,7 +98,9 @@ class ClusterCache(dict): with self._cache_lock: self[cluster_id] = immutable_cache + self.persist_cache() + def persist_cache(self): # ensure that our cache directory exists if not os.path.exists(self.cluster_cache_dir): os.makedirs(self.cluster_cache_dir) @@ -97,24 +109,10 @@ class ClusterCache(dict): with os.fdopen(os.open(self.__current_cache_json_file, os.O_WRONLY | os.O_CREAT, 0o600), "w") as f: json.dump(self, f, indent=2) - def get_md5_hashsum(self): - """ - Thread-safe method for writing out the specified cluster cache - and updating the in-memory representation. - :param cache: - :return: - """ + def _get_mutable_copy(self): with self._cache_lock: - # have to make sure server generates json in exactly the same way. 
So hashes are equal - json_repr = json.dumps(self, sort_keys=True) - - md5_calculator = hashlib.md5() - md5_calculator.update(json_repr) - result = md5_calculator.hexdigest() - - logger.info("Cache value for {0} is {1}".format(self.__class__.__name__, result)) + return Utils.get_mutable_copy(self) - return result def get_cache_name(self): raise NotImplemented() http://git-wip-us.apache.org/repos/asf/ambari/blob/6303e935/ambari-agent/src/main/python/ambari_agent/ClusterTopologyCache.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/ClusterTopologyCache.py b/ambari-agent/src/main/python/ambari_agent/ClusterTopologyCache.py index 0cfe5a4..1102cd1 100644 --- a/ambari-agent/src/main/python/ambari_agent/ClusterTopologyCache.py +++ b/ambari-agent/src/main/python/ambari_agent/ClusterTopologyCache.py @@ -40,3 +40,88 @@ class ClusterTopologyCache(ClusterCache): def get_cache_name(self): return 'topology' + + @staticmethod + def find_host_by_id(host_dicts, cluster_id, host_id): + for host_dict in host_dicts: + if host_dict['hostId'] == host_id: + return host_dict + return None + + @staticmethod + def find_component(component_dicts, cluster_id, service_name, component_name): + for component_dict in component_dicts: + if component_dict['serviceName'] == service_name and component_dict['componentName'] == component_name: + return component_dict + return None + + def cache_update(self, cache_update): + mutable_dict = self._get_mutable_copy() + + for cluster_id, cluster_updates_dict in cache_update.iteritems(): + if not cluster_id in mutable_dict: + logger.error("Cannot do topology update for cluster cluster_id={0}, because do not have information about the cluster") + continue + + if 'hosts' in cluster_updates_dict: + hosts_mutable_list = mutable_dict[cluster_id]['hosts'] + for host_updates_dict in cluster_updates_dict['hosts']: + host_mutable_dict = ClusterTopologyCache.find_host_by_id(hosts_mutable_list, cluster_id, host_updates_dict['hostId']) + if host_mutable_dict is not None: + host_mutable_dict.update(host_updates_dict) + else: + hosts_mutable_list.append(host_updates_dict) + + if 'components' in cluster_updates_dict: + components_mutable_list = mutable_dict[cluster_id]['components'] + for component_updates_dict in cluster_updates_dict['components']: + component_mutable_dict = ClusterTopologyCache.find_component(components_mutable_list, cluster_id, component_updates_dict['serviceName'], component_updates_dict['componentName']) + if component_mutable_dict is not None: + component_updates_dict['hostIds'] += component_mutable_dict['hostIds'] + component_updates_dict['hostIds'] = list(set(component_updates_dict['hostIds'])) + component_mutable_dict.update(component_updates_dict) + else: + components_mutable_list.append(component_updates_dict) + + self.rewrite_cache(mutable_dict) + + def cache_delete(self, cache_update): + mutable_dict = self._get_mutable_copy() + clusters_ids_to_delete = [] + + for cluster_id, cluster_updates_dict in cache_update.iteritems(): + if not cluster_id in mutable_dict: + logger.error("Cannot do topology delete for cluster cluster_id={0}, because do not have information about the cluster") + continue + + if 'hosts' in cluster_updates_dict: + hosts_mutable_list = mutable_dict[cluster_id]['hosts'] + for host_updates_dict in cluster_updates_dict['hosts']: + host_to_delete = ClusterTopologyCache.find_host_by_id(hosts_mutable_list, cluster_id, host_updates_dict['hostId']) + if host_to_delete is not 
None: + mutable_dict[cluster_id]['hosts'] = [host_dict for host_dict in hosts_mutable_list if host_dict != host_to_delete] + else: + logger.error("Cannot do topology delete for cluster_id={0}, host_id={1}, because cannot find the host in cache".format(cluster_id, host_updates_dict['hostId'])) + + if 'components' in cluster_updates_dict: + components_mutable_list = mutable_dict[cluster_id]['components'] + for component_updates_dict in cluster_updates_dict['components']: + component_mutable_dict = ClusterTopologyCache.find_component(components_mutable_list, cluster_id, component_updates_dict['serviceName'], component_updates_dict['componentName']) + if 'hostIds' in component_mutable_dict: + exclude_host_ids = component_updates_dict['hostIds'] + component_mutable_dict['hostIds'] = [host_id for host_id in component_mutable_dict['hostIds'] if host_id not in exclude_host_ids] + if not 'hostIds' in component_mutable_dict or component_mutable_dict['hostIds'] == []: + if component_mutable_dict is not None: + mutable_dict[cluster_id]['components'] = [component_dict for component_dict in components_mutable_list if component_dict != component_mutable_dict] + else: + logger.error("Cannot do component delete for cluster_id={0}, serviceName={1}, componentName={2}, because cannot find the host in cache".format(cluster_id, component_updates_dict['serviceName'], component_updates_dict['componentName'])) + + if cluster_updates_dict == {}: + clusters_ids_to_delete.append(cluster_id) + + for cluster_id in clusters_ids_to_delete: + del mutable_dict[cluster_id] + + self.rewrite_cache(mutable_dict) + + http://git-wip-us.apache.org/repos/asf/ambari/blob/6303e935/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py b/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py index 6783138..41f0df4 100644 --- a/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py +++ b/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py @@ -72,7 +72,7 @@ class ComponentStatusExecutor(threading.Thread): component_name = component_dict.componentName # TODO STOMP: run real command - logger.info("Running {0}/{1}".format(component_dict.statusCommandsParams.service_package_folder, component_dict.statusCommandsParams.script)) + logger.info("Running {0}/{1}".format(component_dict.statusCommandParams.servicePackageFolder, component_dict.statusCommandParams.script)) #self.customServiceOrchestrator.requestComponentStatus(command) status = random.choice(["INSTALLED","STARTED"]) result = { http://git-wip-us.apache.org/repos/asf/ambari/blob/6303e935/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py b/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py index e88fee7..b7bb5ed 100644 --- a/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py +++ b/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py @@ -78,6 +78,7 @@ class HeartbeatThread(threading.Thread): # TODO STOMP: handle heartbeat reponse except: logger.exception("Exception in HeartbeatThread. 
Re-running the registration") + self.stop_event.wait(self.heartbeat_interval) self.initializer_module.is_registered = False self.initializer_module.connection.disconnect() pass @@ -104,7 +105,7 @@ class HeartbeatThread(threading.Thread): self.registration_response = response for endpoint, cache, listener in self.post_registration_requests: - response = self.blocking_request({'hash': cache.get_md5_hashsum()}, endpoint) + response = self.blocking_request({'hash': cache.hash}, endpoint) listener.on_event({}, response) self.subscribe_to_topics(Constants.POST_REGISTRATION_TOPICS_TO_SUBSCRIBE) http://git-wip-us.apache.org/repos/asf/ambari/blob/6303e935/ambari-agent/src/main/python/ambari_agent/Utils.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/Utils.py b/ambari-agent/src/main/python/ambari_agent/Utils.py index e48aa5f..f94eba0 100644 --- a/ambari-agent/src/main/python/ambari_agent/Utils.py +++ b/ambari-agent/src/main/python/ambari_agent/Utils.py @@ -43,8 +43,9 @@ class BlockingDictionary(): """ Block until a key in dictionary is available and than pop it. """ - if key in self.dict: - return self.dict.pop(key) + with self.dict_lock: + if key in self.dict: + return self.dict.pop(key) while True: self.put_event.wait() @@ -69,6 +70,20 @@ class Utils(object): return value + @staticmethod + def get_mutable_copy(param): + if isinstance(param, dict): + mutable_dict = {} + + for k, v in param.iteritems(): + mutable_dict[k] = Utils.get_mutable_copy(v) + + return mutable_dict + elif isinstance(param, (list, tuple)): + return [Utils.get_mutable_copy(x) for x in param] + + return param + class ImmutableDictionary(dict): def __init__(self, dictionary): """ http://git-wip-us.apache.org/repos/asf/ambari/blob/6303e935/ambari-agent/src/main/python/ambari_agent/listeners/ConfigurationEventListener.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/listeners/ConfigurationEventListener.py b/ambari-agent/src/main/python/ambari_agent/listeners/ConfigurationEventListener.py index 20b42e6..e32c503 100644 --- a/ambari-agent/src/main/python/ambari_agent/listeners/ConfigurationEventListener.py +++ b/ambari-agent/src/main/python/ambari_agent/listeners/ConfigurationEventListener.py @@ -40,7 +40,12 @@ class ConfigurationEventListener(EventListener): @param headers: headers dictionary @param message: message payload dictionary """ - self.configuration_cache.update_cache(message) + # this kind of response is received if hash was identical. 
And server does not need to change anything + if message == {}: + return + + self.configuration_cache.rewrite_cache(message['clusters']) + self.configuration_cache.hash = message['hash'] def get_handled_path(self): return Constants.CONFIGURATIONS_TOPIC \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/6303e935/ambari-agent/src/main/python/ambari_agent/listeners/MetadataEventListener.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/listeners/MetadataEventListener.py b/ambari-agent/src/main/python/ambari_agent/listeners/MetadataEventListener.py index c738ea2..788d381 100644 --- a/ambari-agent/src/main/python/ambari_agent/listeners/MetadataEventListener.py +++ b/ambari-agent/src/main/python/ambari_agent/listeners/MetadataEventListener.py @@ -26,6 +26,8 @@ from ambari_agent import Constants logger = logging.getLogger(__name__) +METADATA_DICTIONARY_KEY = 'metadataClusters' + class MetadataEventListener(EventListener): """ Listener of Constants.METADATA_TOPIC events from server. @@ -40,7 +42,12 @@ class MetadataEventListener(EventListener): @param headers: headers dictionary @param message: message payload dictionary """ - self.topology_cache.update_cache(message) + # this kind of response is received if hash was identical. And server does not need to change anything + if message == {}: + return + + self.topology_cache.rewrite_cache(message['clusters']) + self.topology_cache.hash = message['hash'] def get_handled_path(self): return Constants.METADATA_TOPIC \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/6303e935/ambari-agent/src/main/python/ambari_agent/listeners/TopologyEventListener.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/listeners/TopologyEventListener.py b/ambari-agent/src/main/python/ambari_agent/listeners/TopologyEventListener.py index 61e89bd..19a1d32 100644 --- a/ambari-agent/src/main/python/ambari_agent/listeners/TopologyEventListener.py +++ b/ambari-agent/src/main/python/ambari_agent/listeners/TopologyEventListener.py @@ -40,7 +40,23 @@ class TopologyEventListener(EventListener): @param headers: headers dictionary @param message: message payload dictionary """ - self.topology_cache.update_cache(message) + # this kind of response is received if hash was identical. 
And server does not need to change anything + if message == {}: + return + + event_type = message['eventType'] + + if event_type == 'CREATE': + self.topology_cache.rewrite_cache(message['clusters']) + self.topology_cache.hash = message['hash'] + elif event_type == 'UPDATE': + self.topology_cache.cache_update(message['clusters']) + self.topology_cache.hash = message['hash'] + elif event_type == 'DELETE': + self.topology_cache.cache_delete(message['clusters']) + self.topology_cache.hash = message['hash'] + else: + logger.error("Unknown event type '{0}' for topology event") def get_handled_path(self): return Constants.TOPOLOGIES_TOPIC \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/6303e935/ambari-agent/src/main/python/ambari_agent/listeners/__init__.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/listeners/__init__.py b/ambari-agent/src/main/python/ambari_agent/listeners/__init__.py index f05f8da..ddc5900 100644 --- a/ambari-agent/src/main/python/ambari_agent/listeners/__init__.py +++ b/ambari-agent/src/main/python/ambari_agent/listeners/__init__.py @@ -43,7 +43,7 @@ class EventListener(ambari_stomp.ConnectionListener): try: message_json = json.loads(message) except ValueError: - logger.exception("Received event from server does not a valid json as a message. Message is:\n{0}".format(message)) + logger.exception("Received from server event is not a valid message json. Message is:\n{0}".format(message)) return logger.info("Event from server at {0}{1}".format(destination, self.get_log_message(headers, message_json))) http://git-wip-us.apache.org/repos/asf/ambari/blob/6303e935/ambari-agent/src/main/python/ambari_agent/security.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/security.py b/ambari-agent/src/main/python/ambari_agent/security.py index df45699..ca90f0e 100644 --- a/ambari-agent/src/main/python/ambari_agent/security.py +++ b/ambari-agent/src/main/python/ambari_agent/security.py @@ -30,6 +30,7 @@ import traceback import hostname import platform import ambari_stomp +import threading from ambari_stomp.adapter.websocket import WsConnection logger = logging.getLogger(__name__) @@ -103,11 +104,13 @@ class VerifiedHTTPSConnection(httplib.HTTPSConnection): class AmbariStompConnection(WsConnection): def __init__(self, url): + self.lock = threading.RLock() self.correlation_id = -1 WsConnection.__init__(self, url) def send(self, destination, message, content_type=None, headers=None, **keyword_headers): - self.correlation_id += 1 + with self.lock: + self.correlation_id += 1 logger.info("Event to server at {0} (correlation_id={1}): {2}".format(destination, self.correlation_id, message)) http://git-wip-us.apache.org/repos/asf/ambari/blob/6303e935/ambari-agent/src/test/python/ambari_agent/BaseStompServerTestCase.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/test/python/ambari_agent/BaseStompServerTestCase.py b/ambari-agent/src/test/python/ambari_agent/BaseStompServerTestCase.py index 7380727..87417fc 100644 --- a/ambari-agent/src/test/python/ambari_agent/BaseStompServerTestCase.py +++ b/ambari-agent/src/test/python/ambari_agent/BaseStompServerTestCase.py @@ -124,8 +124,14 @@ class BaseStompServerTestCase(unittest.TestCase): def get_json(self, filename): filepath = os.path.join(os.path.abspath(os.path.dirname(__file__)), "dummy_files", "stomp", filename) 
- with open(filepath) as f: - return f.read() + with open(filepath) as fp: + return fp.read() + + def get_dict_from_file(self, filename): + filepath = os.path.join(os.path.abspath(os.path.dirname(__file__)), "dummy_files", "stomp", filename) + + with open(filepath) as fp: + return json.load(fp) def init_stdout_logger(self): format='%(levelname)s %(asctime)s - %(message)s' @@ -260,11 +266,13 @@ class TestStompClient(object): class TestCaseTcpConnection(ambari_stomp.Connection): def __init__(self, url): + self.lock = threading.RLock() self.correlation_id = -1 ambari_stomp.Connection.__init__(self, host_and_ports=[('127.0.0.1', 21613)]) def send(self, destination, message, content_type=None, headers=None, **keyword_headers): - self.correlation_id += 1 + with self.lock: + self.correlation_id += 1 logger.info("Event to server at {0} (correlation_id={1}): {2}".format(destination, self.correlation_id, message)) http://git-wip-us.apache.org/repos/asf/ambari/blob/6303e935/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py b/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py index 1f3a6e7..87c7f57 100644 --- a/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py +++ b/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py @@ -36,6 +36,7 @@ from ambari_agent.CustomServiceOrchestrator import CustomServiceOrchestrator from mock.mock import MagicMock, patch class TestAgentStompResponses(BaseStompServerTestCase): + """ @patch.object(CustomServiceOrchestrator, "runCommand") def test_mock_server_can_start(self, runCommand_mock): runCommand_mock.return_value = {'stdout':'...', 'stderr':'...', 'structuredOut' : '{}', 'exitcode':1} @@ -68,7 +69,7 @@ class TestAgentStompResponses(BaseStompServerTestCase): # response to /initial_topology - f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '1'}, body=self.get_json("topology_update.json")) + f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '1'}, body=self.get_json("topology_create.json")) self.server.topic_manager.send(f) f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '2'}, body=self.get_json("metadata_after_registration.json")) @@ -108,7 +109,7 @@ class TestAgentStompResponses(BaseStompServerTestCase): command_status_reporter.join() action_queue.join() - self.assertEquals(initializer_module.topology_cache['0']['hosts'][0]['hostname'], 'c6401.ambari.apache.org') + self.assertEquals(initializer_module.topology_cache['0']['hosts'][0]['hostName'], 'c6401.ambari.apache.org') self.assertEquals(initializer_module.metadata_cache['0']['status_commands_to_run'], ('STATUS',)) self.assertEquals(initializer_module.configurations_cache['0']['configurations']['zoo.cfg']['clientPort'], '2181') self.assertEquals(dn_start_in_progress_frame[0]['roleCommand'], 'START') @@ -116,10 +117,10 @@ class TestAgentStompResponses(BaseStompServerTestCase): self.assertEquals(dn_start_in_progress_frame[0]['status'], 'IN_PROGRESS') self.assertEquals(dn_start_failed_frame[0]['status'], 'FAILED') - """ + ============================================================================================ ============================================================================================ - """ + initializer_module = InitializerModule() self.server.frames_queue.queue.clear() @@ -168,4 +169,76 @@ class 
TestAgentStompResponses(BaseStompServerTestCase): heartbeat_thread.join() component_status_executor.join() command_status_reporter.join() - action_queue.join() \ No newline at end of file + action_queue.join() + """ + + @patch.object(CustomServiceOrchestrator, "runCommand") + def test_topology_update_and_delete(self, runCommand_mock): + runCommand_mock.return_value = {'stdout':'...', 'stderr':'...', 'structuredOut' : '{}', 'exitcode':1} + + self.remove_files(['/tmp/cluster_cache/configurations.json', '/tmp/cluster_cache/metadata.json', '/tmp/cluster_cache/topology.json']) + + if not os.path.exists("/tmp/ambari-agent"): + os.mkdir("/tmp/ambari-agent") + + initializer_module = InitializerModule() + heartbeat_thread = HeartbeatThread.HeartbeatThread(initializer_module) + heartbeat_thread.start() + + connect_frame = self.server.frames_queue.get() + users_subscribe_frame = self.server.frames_queue.get() + registration_frame = self.server.frames_queue.get() + + # server sends registration response + f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '0'}, body=self.get_json("registration_response.json")) + self.server.topic_manager.send(f) + + + # response to /initial_topology + f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '1'}, body=self.get_json("topology_create.json")) + self.server.topic_manager.send(f) + + f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '2'}, body='{}') + self.server.topic_manager.send(f) + + f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '3'}, body='{}') + self.server.topic_manager.send(f) + + initial_topology_request = self.server.frames_queue.get() + initial_metadata_request = self.server.frames_queue.get() + initial_configs_request = self.server.frames_queue.get() + + while not initializer_module.is_registered: + time.sleep(0.1) + + f = Frame(frames.MESSAGE, headers={'destination': '/events/topology'}, body=self.get_json("topology_add_component.json")) + self.server.topic_manager.send(f) + + f = Frame(frames.MESSAGE, headers={'destination': '/events/topology'}, body=self.get_json("topology_add_component_host.json")) + self.server.topic_manager.send(f) + + f = Frame(frames.MESSAGE, headers={'destination': '/events/topology'}, body=self.get_json("topology_add_host.json")) + self.server.topic_manager.send(f) + + f = Frame(frames.MESSAGE, headers={'destination': '/events/topology'}, body=self.get_json("topology_delete_host.json")) + self.server.topic_manager.send(f) + + f = Frame(frames.MESSAGE, headers={'destination': '/events/topology'}, body=self.get_json("topology_delete_component.json")) + self.server.topic_manager.send(f) + + f = Frame(frames.MESSAGE, headers={'destination': '/events/topology'}, body=self.get_json("topology_delete_component_host.json")) + self.server.topic_manager.send(f) + + f = Frame(frames.MESSAGE, headers={'destination': '/events/topology'}, body=self.get_json("topology_delete_cluster.json")) + self.server.topic_manager.send(f) + + time.sleep(0.1) + self.assertEquals(json.dumps(initializer_module.topology_cache, indent=2, sort_keys=True), json.dumps(self.get_dict_from_file("topology_cache_expected.json"), indent=2, sort_keys=True)) + #self.assertEquals(initializer_module.topology_cache, self.get_dict_from_file("topology_cache_expected.json")) + + initializer_module.stop_event.set() + + f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '4'}, body=json.dumps({'heartbeat-response':'true'})) + 
self.server.topic_manager.send(f) + + heartbeat_thread.join() \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/6303e935/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/configurations_update.json ---------------------------------------------------------------------- diff --git a/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/configurations_update.json b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/configurations_update.json index cc1dda0..c415c7d 100644 --- a/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/configurations_update.json +++ b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/configurations_update.json @@ -1,26 +1,29 @@ { - "0":{ - "configurations":{ - "zookeeper-env":{ - "zk_user":"zookeeper", - "zk_log_dir":"/var/log/zookeeper", - "content":"\nexport JAVA_HOME={{java64_home}}\nexport ZOO_LOG_DIR={{zk_log_dir}}\nexport ZOOPIDFILE={{zk_pid_file}}\nexport SERVER_JVMFLAGS={{zk_server_heapsize}}\nexport JAVA=$JAVA_HOME/bin/java\nexport CLASSPATH=$CLASSPATH:/usr/share/zookeeper/*\n\n{% if security_enabled %}\nexport SERVER_JVMFLAGS=\"$SERVER_JVMFLAGS -Djava.security.auth.login.config={{zk_server_jaas_file}}\"\nexport CLIENT_JVMFLAGS=\"$CLIENT_JVMFLAGS -Djava.security.auth.login.config={{zk_client_jaas_file}}\"\n{% endif %}", - "zk_pid_dir":"/var/run/zookeeper", - "zookeeper_principal_name":"zookeeper/[email protected]", - "zookeeper_keytab_path":"/etc/security/keytabs/zk.service.keytab" + "hash": "a1a71f4b46feaf72bf33627d78bbdc3e", + "clusters": { + "0":{ + "configurations":{ + "zookeeper-env":{ + "zk_user":"zookeeper", + "zk_log_dir":"/var/log/zookeeper", + "content":"\nexport JAVA_HOME={{java64_home}}\nexport ZOO_LOG_DIR={{zk_log_dir}}\nexport ZOOPIDFILE={{zk_pid_file}}\nexport SERVER_JVMFLAGS={{zk_server_heapsize}}\nexport JAVA=$JAVA_HOME/bin/java\nexport CLASSPATH=$CLASSPATH:/usr/share/zookeeper/*\n\n{% if security_enabled %}\nexport SERVER_JVMFLAGS=\"$SERVER_JVMFLAGS -Djava.security.auth.login.config={{zk_server_jaas_file}}\"\nexport CLIENT_JVMFLAGS=\"$CLIENT_JVMFLAGS -Djava.security.auth.login.config={{zk_client_jaas_file}}\"\n{% endif %}", + "zk_pid_dir":"/var/run/zookeeper", + "zookeeper_principal_name":"zookeeper/[email protected]", + "zookeeper_keytab_path":"/etc/security/keytabs/zk.service.keytab" + }, + "zoo.cfg":{ + "clientPort":"2181", + "syncLimit":"5", + "initLimit":"10", + "dataDir":"/hadoop/zookeeper", + "tickTime":"2000" + } }, - "zoo.cfg":{ - "clientPort":"2181", - "syncLimit":"5", - "initLimit":"10", - "dataDir":"/hadoop/zookeeper", - "tickTime":"2000" - } - }, - "configurationAttributes":{ - "core-site":{ - "final":{ - "fs.defaultFS":"true" + "configurationAttributes":{ + "core-site":{ + "final":{ + "fs.defaultFS":"true" + } } } } http://git-wip-us.apache.org/repos/asf/ambari/blob/6303e935/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/metadata_after_registration.json ---------------------------------------------------------------------- diff --git a/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/metadata_after_registration.json b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/metadata_after_registration.json index 14d7c04..0dc5aff 100644 --- a/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/metadata_after_registration.json +++ b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/metadata_after_registration.json @@ -1,33 +1,185 @@ { - "0":{ - "status_commands_to_run":[ - "STATUS" - ], - 
"status_commands_timeout":"900", - "hooks_folder":"HDP/2.0.6/hooks", - "credentialStoreEnabled":"false", - "availableServices":{ - "SQOOP":"1.4.6.2.5", - "AMBARI_METRICS":"0.1.0", - "KERBEROS":"1.10.3-10", - "RANGER":"0.6.0.2.5", - "ZEPPELIN":"0.6.0.2.5", - "HDFS":"2.7.3.2.6", - "ZOOKEEPER":"3.4.6" - }, - "agentConfigParams":{ - "agent":{ - "parallel_execution":0, - "use_system_proxy_settings":true - } - }, - "recoveryConfig":{ - "type":"DEFAULT|AUTO_START|AUTO_INSTALL_START|FULL", - "maxCount":10, - "windowInMinutes":60, - "retryGap":0, - "components":"ZOOKEEPER_CLIENT,ZOOKEEPER_SERVER", - "recoveryTimestamp":1458150424380 + "hash": "c2bea6695221368416b2412fec2ba0d7", + "clusters": { + "0": { + "serviceSpecifics": { + "GANGLIA": { + "version": "3.5.0", + "credentialStoreEnabled": false, + "status_commands_timeout": null + }, + "DRUID": { + "version": "0.9.2", + "credentialStoreEnabled": false, + "status_commands_timeout": 300 + }, + "TEZ": { + "version": "0.7.0", + "credentialStoreEnabled": false, + "status_commands_timeout": 300 + }, + "SPARK": { + "version": "1.6.x", + "credentialStoreEnabled": false, + "status_commands_timeout": 300 + }, + "HBASE": { + "version": "1.1.2", + "credentialStoreEnabled": false, + "status_commands_timeout": 300 + }, + "RANGER_KMS": { + "version": "0.7.0", + "credentialStoreEnabled": false, + "status_commands_timeout": 300 + }, + "ATLAS": { + "version": "0.8.0", + "credentialStoreEnabled": false, + "status_commands_timeout": 300 + }, + "HIVE": { + "version": "1.2.1000", + "credentialStoreEnabled": true, + "status_commands_timeout": null + }, + "SLIDER": { + "version": "0.92.0", + "credentialStoreEnabled": false, + "status_commands_timeout": 300 + }, + "AMBARI_INFRA": { + "version": "0.1.0", + "credentialStoreEnabled": false, + "status_commands_timeout": 300 + }, + "FLUME": { + "version": "1.5.2", + "credentialStoreEnabled": false, + "status_commands_timeout": 300 + }, + "MAHOUT": { + "version": "0.9.0", + "credentialStoreEnabled": false, + "status_commands_timeout": 300 + }, + "SQOOP": { + "version": "1.4.6", + "credentialStoreEnabled": false, + "status_commands_timeout": 300 + }, + "OOZIE": { + "version": "4.2.0", + "credentialStoreEnabled": true, + "status_commands_timeout": 300 + }, + "HDFS": { + "version": "2.7.3", + "credentialStoreEnabled": false, + "status_commands_timeout": 300 + }, + "MAPREDUCE2": { + "version": "2.7.3", + "credentialStoreEnabled": false, + "status_commands_timeout": 300 + }, + "ACCUMULO": { + "version": "1.7.0", + "credentialStoreEnabled": false, + "status_commands_timeout": 300 + }, + "ZOOKEEPER": { + "version": "3.4.6", + "credentialStoreEnabled": false, + "status_commands_timeout": 300 + }, + "YARN": { + "version": "2.7.3", + "credentialStoreEnabled": false, + "status_commands_timeout": 300 + }, + "KERBEROS": { + "version": "1.10.3-10", + "credentialStoreEnabled": false, + "status_commands_timeout": 300 + }, + "KNOX": { + "version": "0.12.0", + "credentialStoreEnabled": false, + "status_commands_timeout": 300 + }, + "PIG": { + "version": "0.16.0", + "credentialStoreEnabled": false, + "status_commands_timeout": 300 + }, + "STORM": { + "version": "1.1.0", + "credentialStoreEnabled": false, + "status_commands_timeout": 300 + }, + "RANGER": { + "version": "0.7.0", + "credentialStoreEnabled": false, + "status_commands_timeout": 300 + }, + "AMBARI_METRICS": { + "version": "0.1.0", + "credentialStoreEnabled": false, + "status_commands_timeout": 600 + }, + "ZEPPELIN": { + "version": "0.7.0", + "credentialStoreEnabled": false, + 
"status_commands_timeout": 300 + }, + "KAFKA": { + "version": "0.10.1", + "credentialStoreEnabled": false, + "status_commands_timeout": 300 + }, + "LOGSEARCH": { + "version": "0.5.0", + "credentialStoreEnabled": true, + "status_commands_timeout": 300 + }, + "FALCON": { + "version": "0.10.0", + "credentialStoreEnabled": false, + "status_commands_timeout": 300 + }, + "SPARK2": { + "version": "2.x", + "credentialStoreEnabled": false, + "status_commands_timeout": 300 + } + }, + "clusterLevelParams": { + "host_sys_prepped": "false", + "java_home": null, + "agent_stack_retry_count": "5", + "jdk_location": "http://c6401.ambari.apache.org:8080/resources/", + "jdk_name": null, + "stack_version": "2.6", + "user_list": "[\"accumulo\",\"zookeeper\",\"ams\",\"ambari-qa\",\"hdfs\",\"yarn\",\"mapred\"]", + "mysql_jdbc_url": "http://dvitiiuk-System-Product-Name:8080/resources//mysql-connector-java.jar", + "oracle_jdbc_url": "http://dvitiiuk-System-Product-Name:8080/resources//ojdbc6.jar", + "ambari_db_rca_password": "mapred", + "jce_name": null, + "group_list": "[\"hadoop\",\"users\"]", + "db_name": "ambari", + "ambari_db_rca_driver": "org.postgresql.Driver", + "ambari_db_rca_username": "mapred", + "java_version": "8", + "not_managed_hdfs_path_list": "[\"/mr-history/done\",\"/app-logs\",\"/tmp\"]", + "db_driver_filename": "mysql-connector-java.jar", + "stack_name": "HDP", + "ambari_db_rca_url": "jdbc:postgresql://c6401.ambari.apache.org/ambarirca", + "agent_stack_retry_on_unavailability": "false", + "user_groups": "{}" + }, + "status_commands_to_run": ["STATUS"], + "hooks_folder": "HDP/2.0.6/hooks" + } } - } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/6303e935/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_add_component.json ---------------------------------------------------------------------- diff --git a/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_add_component.json b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_add_component.json new file mode 100644 index 0000000..2c37111 --- /dev/null +++ b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_add_component.json @@ -0,0 +1,23 @@ +{ + "eventType":"UPDATE", + "hash":"33db7b2f65011d475a08bff4810a9965", + "clusters":{ + "0":{ + "components":[ + { + "componentName":"SECONDARY_NAMENODE", + "serviceName":"HDFS", + "version":"2.6.0.3-8", + "hostIds":[ + 0, + 1 + ], + "statusCommandParams":{ + "script":"scripts/snamenode.py", + "servicePackageFolder":"common-services/HDFS/2.1.0.2.0/package" + } + } + ] + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/6303e935/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_add_component_host.json ---------------------------------------------------------------------- diff --git a/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_add_component_host.json b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_add_component_host.json new file mode 100644 index 0000000..be13c93 --- /dev/null +++ b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_add_component_host.json @@ -0,0 +1,18 @@ +{ + "eventType":"UPDATE", + "hash":"b313cdebc1a5e36dcad76affb747858b", + "clusters":{ + "0":{ + "components":[ + { + "componentName":"HDFS_CLIENT", + "serviceName":"HDFS", + "version":"2.6.0.3-8", + "hostIds":[ + 1 + ] + } + ] + } + } +} \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/ambari/blob/6303e935/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_add_host.json ---------------------------------------------------------------------- diff --git a/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_add_host.json b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_add_host.json new file mode 100644 index 0000000..a9407c3 --- /dev/null +++ b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_add_host.json @@ -0,0 +1,16 @@ +{ + "eventType":"UPDATE", + "hash":"0b01b81018fe39309d75e7b12a8225ec", + "clusters":{ + "0":{ + "hosts":[ + { + "hostId":2, + "hostName":"c6403.ambari.apache.org", + "rackName":"/default-rack", + "ipv4":"192.168.64.103" + } + ] + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/6303e935/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_cache_expected.json ---------------------------------------------------------------------- diff --git a/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_cache_expected.json b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_cache_expected.json new file mode 100644 index 0000000..9894420 --- /dev/null +++ b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_cache_expected.json @@ -0,0 +1,58 @@ +{ + "0": { + "hosts": [ + { + "rackName": "/default-rack", + "hostName": "c6402.ambari.apache.org", + "ipv4": "192.168.64.102", + "hostId": 1 + }, + { + "rackName": "/default-rack", + "hostName": "c6403.ambari.apache.org", + "ipv4": "192.168.64.103", + "hostId": 2 + } + ], + "components": [ + { + "statusCommandParams": { + "script": "scripts/datanode.py", + "servicePackageFolder": "common-services/HDFS/2.1.0.2.0/package" + }, + "componentName": "DATANODE", + "serviceName": "HDFS", + "version": "2.6.0.3-8", + "hostIds": [ + 1 + ] + }, + { + "statusCommandParams": { + "script": "scripts/hdfs_client.py", + "servicePackageFolder": "common-services/HDFS/2.1.0.2.0/package" + }, + "componentName": "HDFS_CLIENT", + "version": "2.6.0.3-8", + "serviceName": "HDFS", + "hostIds": [ + 0, + 1 + ] + }, + { + "statusCommandParams": { + "script": "scripts/snamenode.py", + "servicePackageFolder": "common-services/HDFS/2.1.0.2.0/package" + }, + "componentName": "SECONDARY_NAMENODE", + "serviceName": "HDFS", + "version": "2.6.0.3-8", + "hostIds": [ + 0, + 1 + ] + } + ] + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/6303e935/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_create.json ---------------------------------------------------------------------- diff --git a/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_create.json b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_create.json new file mode 100644 index 0000000..cf1afa7 --- /dev/null +++ b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_create.json @@ -0,0 +1,85 @@ +{ + "eventType":"CREATE", + "hash":"1a49a0f19639c5c2528220bfc5ad0778", + "clusters":{ + "0":{ + "components":[ + { + "componentName":"NAMENODE", + "serviceName":"HDFS", + "version":"2.6.0.3-8", + "hostIds":[ + 0 + ], + "statusCommandParams":{ + "script":"scripts/namenode.py", + "servicePackageFolder":"common-services/HDFS/2.1.0.2.0/package" + } + }, + { + "componentName":"DATANODE", + "serviceName":"HDFS", + "version":"2.6.0.3-8", + "hostIds":[ + 0, + 1 + ], + "statusCommandParams":{ + 
"script":"scripts/datanode.py", + "servicePackageFolder":"common-services/HDFS/2.1.0.2.0/package" + } + }, + { + "componentName":"HDFS_CLIENT", + "serviceName":"HDFS", + "version":"2.6.0.3-8", + "hostIds":[ + 0 + ], + "statusCommandParams":{ + "script":"scripts/hdfs_client.py", + "servicePackageFolder":"common-services/HDFS/2.1.0.2.0/package" + } + } + ], + "hosts":[ + { + "hostId":0, + "hostName":"c6401.ambari.apache.org", + "rackName":"/default-rack", + "ipv4":"192.168.64.101" + }, + { + "hostId":1, + "hostName":"c6402.ambari.apache.org", + "rackName":"/default-rack", + "ipv4":"192.168.64.102" + } + ] + }, + "1": { + "components":[ + { + "componentName":"NAMENODE", + "serviceName":"HDFS", + "version":"2.6.0.3-8", + "hostIds":[ + 0 + ], + "statusCommandParams":{ + "script":"scripts/namenode.py", + "servicePackageFolder":"common-services/HDFS/2.1.0.2.0/package" + } + } + ], + "hosts":[ + { + "hostId":0, + "hostName":"c6401.ambari.apache.org", + "rackName":"/default-rack", + "ipv4":"192.168.64.101" + } + ] + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/6303e935/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_delete_cluster.json ---------------------------------------------------------------------- diff --git a/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_delete_cluster.json b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_delete_cluster.json new file mode 100644 index 0000000..12c75a8 --- /dev/null +++ b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_delete_cluster.json @@ -0,0 +1,8 @@ +{ + "eventType":"DELETE", + "hash":"686bca74a23019ce8865d759beeccb0a", + "clusters":{ + "1":{ + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/6303e935/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_delete_component.json ---------------------------------------------------------------------- diff --git a/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_delete_component.json b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_delete_component.json new file mode 100644 index 0000000..19e5418 --- /dev/null +++ b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_delete_component.json @@ -0,0 +1,19 @@ +{ + "eventType":"DELETE", + "hash":"26370af7e07ff89c3949537aa902b3da", + "clusters":{ + "0":{ + "components":[ + { + "componentName":"NAMENODE", + "serviceName":"HDFS", + "version":"2.6.0.3-8", + "hostIds":[ + 0, + 1 + ] + } + ] + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/6303e935/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_delete_component_host.json ---------------------------------------------------------------------- diff --git a/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_delete_component_host.json b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_delete_component_host.json new file mode 100644 index 0000000..0c137b8 --- /dev/null +++ b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_delete_component_host.json @@ -0,0 +1,18 @@ +{ + "eventType":"DELETE", + "hash":"591930ef5e0eb5d5437f815e73673e4b", + "clusters":{ + "0":{ + "components":[ + { + "componentName":"DATANODE", + "serviceName":"HDFS", + "version":"2.6.0.3-8", + "hostIds":[ + 0 + ] + } + ] + } + } +} \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/ambari/blob/6303e935/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_delete_host.json ---------------------------------------------------------------------- diff --git a/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_delete_host.json b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_delete_host.json new file mode 100644 index 0000000..5f5d68a --- /dev/null +++ b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_delete_host.json @@ -0,0 +1,13 @@ +{ + "eventType":"DELETE", + "hash":"686bca74a23019ce8865d759beeccb0a", + "clusters":{ + "0":{ + "hosts":[ + { + "hostId":0 + } + ] + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/6303e935/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_update.json ---------------------------------------------------------------------- diff --git a/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_update.json b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_update.json deleted file mode 100644 index b403e4d..0000000 --- a/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_update.json +++ /dev/null @@ -1,69 +0,0 @@ -{ - "0":{ - "components":[ - { - "serviceName":"ZOOKEEPER", - "componentName":"ZOOKEEPER_SERVER", - "hosts":[ - 0, - 1, - 4 - ], - "statusCommandsParams":{ - "script":"scripts/zookeeper_server.py", - "service_package_folder":"common-services/ZOOKEEPER/3.4.5/package" - } - }, - { - "serviceName":"ZOOKEEPER", - "componentName":"ZOOKEEPER_CLIENT", - "hosts":[ - 0, - 1, - 2, - 3, - 4 - ], - "statusCommandsParams":{ - "script":"scripts/zookeeper_client.py", - "service_package_folder":"common-services/ZOOKEEPER/3.4.5/package" - } - } - ], - "hosts":[ - { - "hostname":"c6401.ambari.apache.org", - "rack_id":0, - "ipv4_ip":"10.240.0.240" - }, - { - "hostname":"c6402.ambari.apache.org", - "rack_id":1, - "ipv4_ip":"10.240.0.241" - }, - { - "hostname":"c6403.ambari.apache.org", - "rack_id":0, - "ipv4_ip":"10.240.0.242" - }, - { - "hostname":"c6404.ambari.apache.org", - "rack_id":0, - "ipv4_ip":"10.240.0.243" - }, - { - "hostname":"c6405.ambari.apache.org", - "rack_id":1, - "ipv4_ip":"10.240.0.244" - } - ], - "racks":[ - { - "name":"/default-rack" - }, - { - "name":"/another-rack" - } - ] - } -} \ No newline at end of file
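Taken together, the agent-side contract after this change looks as follows: at registration the agent sends only the hash it last stored for each cache ({'hash': cache.hash} in HeartbeatThread, instead of the removed get_md5_hashsum), the server answers with an empty body when nothing changed, and otherwise the payload carries 'hash' plus 'clusters', with topology events additionally carrying an 'eventType' of CREATE, UPDATE or DELETE that TopologyEventListener dispatches to rewrite_cache, cache_update or cache_delete. A condensed sketch of that dispatch, assuming a cache object exposing those three methods and a hash attribute as in the listener code above (the free-standing function is illustrative; the real logic lives in TopologyEventListener.on_event):

import logging

logger = logging.getLogger(__name__)

def on_topology_event(topology_cache, message):
    """Apply one topology event to the agent-side cache, condensed from TopologyEventListener.on_event."""
    if message == {}:
        # The server sends an empty body when the agent's stored hash already matches.
        return
    event_type = message['eventType']
    if event_type == 'CREATE':
        topology_cache.rewrite_cache(message['clusters'])
    elif event_type == 'UPDATE':
        topology_cache.cache_update(message['clusters'])
    elif event_type == 'DELETE':
        topology_cache.cache_delete(message['clusters'])
    else:
        logger.error("Unknown event type '%s' for topology event", event_type)
        return
    topology_cache.hash = message['hash']

Computing the hash on the server and echoing it back, rather than having each agent recompute an md5 of its own JSON serialization, avoids depending on both sides serializing JSON identically, which is what the removed comment in ClusterCache.get_md5_hashsum was guarding against.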
