Repository: ambari
Updated Branches:
  refs/heads/branch-3.0-perf 6bad191bb -> 6303e9358


AMBARI-21140. Support CREATE/UPDATE/DELETE of topology, hashes and some events 
format changes (aonishuk)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/6303e935
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/6303e935
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/6303e935

Branch: refs/heads/branch-3.0-perf
Commit: 6303e9358db26640ab2c6161111f2d7f527c965c
Parents: 6bad191
Author: Andrew Onishuk <[email protected]>
Authored: Tue May 30 10:12:35 2017 +0300
Committer: Andrew Onishuk <[email protected]>
Committed: Tue May 30 10:12:35 2017 +0300

----------------------------------------------------------------------
 .../main/python/ambari_agent/ClusterCache.py    |  42 ++--
 .../python/ambari_agent/ClusterTopologyCache.py |  85 ++++++++
 .../ambari_agent/ComponentStatusExecutor.py     |   2 +-
 .../main/python/ambari_agent/HeartbeatThread.py |   3 +-
 .../src/main/python/ambari_agent/Utils.py       |  19 +-
 .../listeners/ConfigurationEventListener.py     |   7 +-
 .../listeners/MetadataEventListener.py          |   9 +-
 .../listeners/TopologyEventListener.py          |  18 +-
 .../python/ambari_agent/listeners/__init__.py   |   2 +-
 .../src/main/python/ambari_agent/security.py    |   5 +-
 .../ambari_agent/BaseStompServerTestCase.py     |  14 +-
 .../ambari_agent/TestAgentStompResponses.py     |  83 +++++++-
 .../stomp/configurations_update.json            |  45 ++--
 .../stomp/metadata_after_registration.json      | 212 ++++++++++++++++---
 .../stomp/topology_add_component.json           |  23 ++
 .../stomp/topology_add_component_host.json      |  18 ++
 .../dummy_files/stomp/topology_add_host.json    |  16 ++
 .../stomp/topology_cache_expected.json          |  58 +++++
 .../dummy_files/stomp/topology_create.json      |  85 ++++++++
 .../stomp/topology_delete_cluster.json          |   8 +
 .../stomp/topology_delete_component.json        |  19 ++
 .../stomp/topology_delete_component_host.json   |  18 ++
 .../dummy_files/stomp/topology_delete_host.json |  13 ++
 .../dummy_files/stomp/topology_update.json      |  69 ------
 24 files changed, 714 insertions(+), 159 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/6303e935/ambari-agent/src/main/python/ambari_agent/ClusterCache.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/ClusterCache.py 
b/ambari-agent/src/main/python/ambari_agent/ClusterCache.py
index 4b88f71..8e91afe 100644
--- a/ambari-agent/src/main/python/ambari_agent/ClusterCache.py
+++ b/ambari-agent/src/main/python/ambari_agent/ClusterCache.py
@@ -18,7 +18,6 @@ See the License for the specific language governing 
permissions and
 limitations under the License.
 """
 
-import hashlib
 import logging
 import ambari_simplejson as json
 import os
@@ -45,6 +44,7 @@ class ClusterCache(dict):
     """
 
     self.cluster_cache_dir = cluster_cache_dir
+    self.hash = None
 
     self.__current_cache_json_file = os.path.join(self.cluster_cache_dir, 
self.get_cache_name()+'.json')
 
@@ -67,19 +67,29 @@ class ClusterCache(dict):
   def get_cluster_ids(self):
     return self.keys()
 
-  def update_cache(self, cache):
+  def rewrite_cache(self, cache):
+    cache_ids_to_delete = []
+    for existing_cluster_id in self:
+      if not existing_cluster_id in cache:
+        cache_ids_to_delete.append(existing_cluster_id)
+
     for cluster_id, cluster_cache in cache.iteritems():
-      self.update_cluster_cache(cluster_id, cluster_cache)
+      self.rewrite_cluster_cache(cluster_id, cluster_cache)
+
+    with self._cache_lock:
+      for cache_id_to_delete in cache_ids_to_delete:
+        del self[cache_id_to_delete]
 
-  def update_cluster_cache(self, cluster_id, cache):
+
+  def rewrite_cluster_cache(self, cluster_id, cache):
     """
     Thread-safe method for writing out the specified cluster cache
-    and updating the in-memory representation.
+    and rewriting the in-memory representation.
     :param cluster_id:
     :param cache:
     :return:
     """
-    logger.info("Updating cache {0} for cluster 
{1}".format(self.__class__.__name__, cluster_id))
+    logger.info("Rewriting cache {0} for cluster 
{1}".format(self.__class__.__name__, cluster_id))
 
     # The cache should contain exactly the data received from server.
     # Modifications on agent-side will lead to unnecessary cache sync every 
agent registration. Which is a big concern on perf clusters!
@@ -88,7 +98,9 @@ class ClusterCache(dict):
     with self._cache_lock:
       self[cluster_id] = immutable_cache
 
+    self.persist_cache()
 
+  def persist_cache(self):
     # ensure that our cache directory exists
     if not os.path.exists(self.cluster_cache_dir):
       os.makedirs(self.cluster_cache_dir)
@@ -97,24 +109,10 @@ class ClusterCache(dict):
       with os.fdopen(os.open(self.__current_cache_json_file, os.O_WRONLY | 
os.O_CREAT, 0o600), "w") as f:
         json.dump(self, f, indent=2)
 
-  def get_md5_hashsum(self):
-    """
-    Thread-safe method for writing out the specified cluster cache
-    and updating the in-memory representation.
-    :param cache:
-    :return:
-    """
+  def _get_mutable_copy(self):
     with self._cache_lock:
-      # have to make sure server generates json in exactly the same way. So 
hashes are equal
-      json_repr = json.dumps(self, sort_keys=True)
-
-    md5_calculator = hashlib.md5()
-    md5_calculator.update(json_repr)
-    result = md5_calculator.hexdigest()
-
-    logger.info("Cache value for {0} is {1}".format(self.__class__.__name__, 
result))
+      return Utils.get_mutable_copy(self)
 
-    return result
 
   def get_cache_name(self):
     raise NotImplemented()

http://git-wip-us.apache.org/repos/asf/ambari/blob/6303e935/ambari-agent/src/main/python/ambari_agent/ClusterTopologyCache.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/ClusterTopologyCache.py 
b/ambari-agent/src/main/python/ambari_agent/ClusterTopologyCache.py
index 0cfe5a4..1102cd1 100644
--- a/ambari-agent/src/main/python/ambari_agent/ClusterTopologyCache.py
+++ b/ambari-agent/src/main/python/ambari_agent/ClusterTopologyCache.py
@@ -40,3 +40,88 @@ class ClusterTopologyCache(ClusterCache):
 
   def get_cache_name(self):
     return 'topology'
+
+  @staticmethod
+  def find_host_by_id(host_dicts, cluster_id, host_id):
+    for host_dict in host_dicts:
+      if host_dict['hostId'] == host_id:
+        return host_dict
+    return None
+
+  @staticmethod
+  def find_component(component_dicts, cluster_id, service_name, 
component_name):
+    for component_dict in component_dicts:
+      if component_dict['serviceName'] == service_name and 
component_dict['componentName'] == component_name:
+        return component_dict
+    return None
+
+  def cache_update(self, cache_update):
+    mutable_dict = self._get_mutable_copy()
+
+    for cluster_id, cluster_updates_dict in cache_update.iteritems():
+      if not cluster_id in mutable_dict:
+        logger.error("Cannot do topology update for cluster cluster_id={0}, 
because do not have information about the cluster")
+        continue
+
+      if 'hosts' in cluster_updates_dict:
+        hosts_mutable_list = mutable_dict[cluster_id]['hosts']
+        for host_updates_dict in cluster_updates_dict['hosts']:
+          host_mutable_dict = 
ClusterTopologyCache.find_host_by_id(hosts_mutable_list, cluster_id, 
host_updates_dict['hostId'])
+          if host_mutable_dict is not None:
+            host_mutable_dict.update(host_updates_dict)
+          else:
+            hosts_mutable_list.append(host_updates_dict)
+
+      if 'components' in cluster_updates_dict:
+        components_mutable_list = mutable_dict[cluster_id]['components']
+        for component_updates_dict in cluster_updates_dict['components']:
+          component_mutable_dict = 
ClusterTopologyCache.find_component(components_mutable_list, cluster_id, 
component_updates_dict['serviceName'], component_updates_dict['componentName'])
+          if component_mutable_dict is not None:
+            component_updates_dict['hostIds'] += 
component_mutable_dict['hostIds']
+            component_updates_dict['hostIds'] = 
list(set(component_updates_dict['hostIds']))
+            component_mutable_dict.update(component_updates_dict)
+          else:
+            components_mutable_list.append(component_updates_dict)
+
+    self.rewrite_cache(mutable_dict)
+
+  def cache_delete(self, cache_update):
+    mutable_dict = self._get_mutable_copy()
+    clusters_ids_to_delete = []
+
+    for cluster_id, cluster_updates_dict in cache_update.iteritems():
+      if not cluster_id in mutable_dict:
+        logger.error("Cannot do topology delete for cluster cluster_id={0}, 
because do not have information about the cluster")
+        continue
+
+      if 'hosts' in cluster_updates_dict:
+        hosts_mutable_list = mutable_dict[cluster_id]['hosts']
+        for host_updates_dict in cluster_updates_dict['hosts']:
+          host_to_delete = 
ClusterTopologyCache.find_host_by_id(hosts_mutable_list, cluster_id, 
host_updates_dict['hostId'])
+          if host_to_delete is not None:
+            mutable_dict[cluster_id]['hosts'] = [host_dict for host_dict in 
hosts_mutable_list if host_dict != host_to_delete]
+          else:
+            logger.error("Cannot do topology delete for cluster_id={0}, 
host_id={1}, because cannot find the host in cache".format(cluster_id, 
host_updates_dict['hostId']))
+
+      if 'components' in cluster_updates_dict:
+        components_mutable_list = mutable_dict[cluster_id]['components']
+        for component_updates_dict in cluster_updates_dict['components']:
+          component_mutable_dict = 
ClusterTopologyCache.find_component(components_mutable_list, cluster_id, 
component_updates_dict['serviceName'], component_updates_dict['componentName'])
+          if 'hostIds' in component_mutable_dict:
+            exclude_host_ids = component_updates_dict['hostIds']
+            component_mutable_dict['hostIds'] = [host_id for host_id in 
component_mutable_dict['hostIds'] if host_id not in exclude_host_ids]
+          if not 'hostIds' in component_mutable_dict or 
component_mutable_dict['hostIds'] == []:
+            if component_mutable_dict is not None:
+              mutable_dict[cluster_id]['components'] = [component_dict for 
component_dict in components_mutable_list if component_dict != 
component_mutable_dict]
+            else:
+              logger.error("Cannot do component delete for cluster_id={0}, 
serviceName={1}, componentName={2}, because cannot find the host in 
cache".format(cluster_id, component_updates_dict['serviceName'], 
component_updates_dict['componentName']))
+
+      if cluster_updates_dict == {}:
+        clusters_ids_to_delete.append(cluster_id)
+
+    for cluster_id in clusters_ids_to_delete:
+      del mutable_dict[cluster_id]
+
+    self.rewrite_cache(mutable_dict)
+
+

http://git-wip-us.apache.org/repos/asf/ambari/blob/6303e935/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py
----------------------------------------------------------------------
diff --git 
a/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py 
b/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py
index 6783138..41f0df4 100644
--- a/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py
+++ b/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py
@@ -72,7 +72,7 @@ class ComponentStatusExecutor(threading.Thread):
               component_name = component_dict.componentName
 
               # TODO STOMP: run real command
-              logger.info("Running 
{0}/{1}".format(component_dict.statusCommandsParams.service_package_folder, 
component_dict.statusCommandsParams.script))
+              logger.info("Running 
{0}/{1}".format(component_dict.statusCommandParams.servicePackageFolder, 
component_dict.statusCommandParams.script))
               #self.customServiceOrchestrator.requestComponentStatus(command)
               status = random.choice(["INSTALLED","STARTED"])
               result = {

http://git-wip-us.apache.org/repos/asf/ambari/blob/6303e935/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py 
b/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py
index e88fee7..b7bb5ed 100644
--- a/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py
+++ b/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py
@@ -78,6 +78,7 @@ class HeartbeatThread(threading.Thread):
         # TODO STOMP: handle heartbeat reponse
       except:
         logger.exception("Exception in HeartbeatThread. Re-running the 
registration")
+        self.stop_event.wait(self.heartbeat_interval)
         self.initializer_module.is_registered = False
         self.initializer_module.connection.disconnect()
         pass
@@ -104,7 +105,7 @@ class HeartbeatThread(threading.Thread):
     self.registration_response = response
 
     for endpoint, cache, listener in self.post_registration_requests:
-      response = self.blocking_request({'hash': cache.get_md5_hashsum()}, 
endpoint)
+      response = self.blocking_request({'hash': cache.hash}, endpoint)
       listener.on_event({}, response)
 
     self.subscribe_to_topics(Constants.POST_REGISTRATION_TOPICS_TO_SUBSCRIBE)

http://git-wip-us.apache.org/repos/asf/ambari/blob/6303e935/ambari-agent/src/main/python/ambari_agent/Utils.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/Utils.py 
b/ambari-agent/src/main/python/ambari_agent/Utils.py
index e48aa5f..f94eba0 100644
--- a/ambari-agent/src/main/python/ambari_agent/Utils.py
+++ b/ambari-agent/src/main/python/ambari_agent/Utils.py
@@ -43,8 +43,9 @@ class BlockingDictionary():
     """
     Block until a key in dictionary is available and than pop it.
     """
-    if key in self.dict:
-      return self.dict.pop(key)
+    with self.dict_lock:
+      if key in self.dict:
+        return self.dict.pop(key)
 
     while True:
       self.put_event.wait()
@@ -69,6 +70,20 @@ class Utils(object):
 
     return value
 
+  @staticmethod
+  def get_mutable_copy(param):
+    if isinstance(param, dict):
+      mutable_dict = {}
+
+      for k, v in param.iteritems():
+        mutable_dict[k] = Utils.get_mutable_copy(v)
+
+      return mutable_dict
+    elif isinstance(param, (list, tuple)):
+      return [Utils.get_mutable_copy(x) for x in param]
+
+    return param
+
 class ImmutableDictionary(dict):
   def __init__(self, dictionary):
     """

http://git-wip-us.apache.org/repos/asf/ambari/blob/6303e935/ambari-agent/src/main/python/ambari_agent/listeners/ConfigurationEventListener.py
----------------------------------------------------------------------
diff --git 
a/ambari-agent/src/main/python/ambari_agent/listeners/ConfigurationEventListener.py
 
b/ambari-agent/src/main/python/ambari_agent/listeners/ConfigurationEventListener.py
index 20b42e6..e32c503 100644
--- 
a/ambari-agent/src/main/python/ambari_agent/listeners/ConfigurationEventListener.py
+++ 
b/ambari-agent/src/main/python/ambari_agent/listeners/ConfigurationEventListener.py
@@ -40,7 +40,12 @@ class ConfigurationEventListener(EventListener):
     @param headers: headers dictionary
     @param message: message payload dictionary
     """
-    self.configuration_cache.update_cache(message)
+    # this kind of response is received if hash was identical. And server does 
not need to change anything
+    if message == {}:
+      return
+
+    self.configuration_cache.rewrite_cache(message['clusters'])
+    self.configuration_cache.hash = message['hash']
 
   def get_handled_path(self):
     return Constants.CONFIGURATIONS_TOPIC
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/6303e935/ambari-agent/src/main/python/ambari_agent/listeners/MetadataEventListener.py
----------------------------------------------------------------------
diff --git 
a/ambari-agent/src/main/python/ambari_agent/listeners/MetadataEventListener.py 
b/ambari-agent/src/main/python/ambari_agent/listeners/MetadataEventListener.py
index c738ea2..788d381 100644
--- 
a/ambari-agent/src/main/python/ambari_agent/listeners/MetadataEventListener.py
+++ 
b/ambari-agent/src/main/python/ambari_agent/listeners/MetadataEventListener.py
@@ -26,6 +26,8 @@ from ambari_agent import Constants
 
 logger = logging.getLogger(__name__)
 
+METADATA_DICTIONARY_KEY = 'metadataClusters'
+
 class MetadataEventListener(EventListener):
   """
   Listener of Constants.METADATA_TOPIC events from server.
@@ -40,7 +42,12 @@ class MetadataEventListener(EventListener):
     @param headers: headers dictionary
     @param message: message payload dictionary
     """
-    self.topology_cache.update_cache(message)
+    # this kind of response is received if hash was identical. And server does 
not need to change anything
+    if message == {}:
+      return
+
+    self.topology_cache.rewrite_cache(message['clusters'])
+    self.topology_cache.hash = message['hash']
 
   def get_handled_path(self):
     return Constants.METADATA_TOPIC
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/6303e935/ambari-agent/src/main/python/ambari_agent/listeners/TopologyEventListener.py
----------------------------------------------------------------------
diff --git 
a/ambari-agent/src/main/python/ambari_agent/listeners/TopologyEventListener.py 
b/ambari-agent/src/main/python/ambari_agent/listeners/TopologyEventListener.py
index 61e89bd..19a1d32 100644
--- 
a/ambari-agent/src/main/python/ambari_agent/listeners/TopologyEventListener.py
+++ 
b/ambari-agent/src/main/python/ambari_agent/listeners/TopologyEventListener.py
@@ -40,7 +40,23 @@ class TopologyEventListener(EventListener):
     @param headers: headers dictionary
     @param message: message payload dictionary
     """
-    self.topology_cache.update_cache(message)
+    # this kind of response is received if hash was identical. And server does 
not need to change anything
+    if message == {}:
+      return
+
+    event_type = message['eventType']
+
+    if event_type == 'CREATE':
+      self.topology_cache.rewrite_cache(message['clusters'])
+      self.topology_cache.hash = message['hash']
+    elif event_type == 'UPDATE':
+      self.topology_cache.cache_update(message['clusters'])
+      self.topology_cache.hash = message['hash']
+    elif event_type == 'DELETE':
+      self.topology_cache.cache_delete(message['clusters'])
+      self.topology_cache.hash = message['hash']
+    else:
+      logger.error("Unknown event type '{0}' for topology event")
 
   def get_handled_path(self):
     return Constants.TOPOLOGIES_TOPIC
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/6303e935/ambari-agent/src/main/python/ambari_agent/listeners/__init__.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/listeners/__init__.py 
b/ambari-agent/src/main/python/ambari_agent/listeners/__init__.py
index f05f8da..ddc5900 100644
--- a/ambari-agent/src/main/python/ambari_agent/listeners/__init__.py
+++ b/ambari-agent/src/main/python/ambari_agent/listeners/__init__.py
@@ -43,7 +43,7 @@ class EventListener(ambari_stomp.ConnectionListener):
       try:
         message_json = json.loads(message)
       except ValueError:
-        logger.exception("Received event from server does not  a valid json as 
a message. Message is:\n{0}".format(message))
+        logger.exception("Received from server event is not a valid message 
json. Message is:\n{0}".format(message))
         return
 
       logger.info("Event from server at {0}{1}".format(destination, 
self.get_log_message(headers, message_json)))

http://git-wip-us.apache.org/repos/asf/ambari/blob/6303e935/ambari-agent/src/main/python/ambari_agent/security.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/security.py 
b/ambari-agent/src/main/python/ambari_agent/security.py
index df45699..ca90f0e 100644
--- a/ambari-agent/src/main/python/ambari_agent/security.py
+++ b/ambari-agent/src/main/python/ambari_agent/security.py
@@ -30,6 +30,7 @@ import traceback
 import hostname
 import platform
 import ambari_stomp
+import threading
 from ambari_stomp.adapter.websocket import WsConnection
 
 logger = logging.getLogger(__name__)
@@ -103,11 +104,13 @@ class VerifiedHTTPSConnection(httplib.HTTPSConnection):
 
 class AmbariStompConnection(WsConnection):
   def __init__(self, url):
+    self.lock = threading.RLock()
     self.correlation_id = -1
     WsConnection.__init__(self, url)
 
   def send(self, destination, message, content_type=None, headers=None, 
**keyword_headers):
-    self.correlation_id += 1
+    with self.lock:
+      self.correlation_id += 1
 
     logger.info("Event to server at {0} (correlation_id={1}): 
{2}".format(destination, self.correlation_id, message))
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/6303e935/ambari-agent/src/test/python/ambari_agent/BaseStompServerTestCase.py
----------------------------------------------------------------------
diff --git 
a/ambari-agent/src/test/python/ambari_agent/BaseStompServerTestCase.py 
b/ambari-agent/src/test/python/ambari_agent/BaseStompServerTestCase.py
index 7380727..87417fc 100644
--- a/ambari-agent/src/test/python/ambari_agent/BaseStompServerTestCase.py
+++ b/ambari-agent/src/test/python/ambari_agent/BaseStompServerTestCase.py
@@ -124,8 +124,14 @@ class BaseStompServerTestCase(unittest.TestCase):
   def get_json(self, filename):
     filepath = os.path.join(os.path.abspath(os.path.dirname(__file__)), 
"dummy_files", "stomp", filename)
 
-    with open(filepath) as f:
-      return f.read()
+    with open(filepath) as fp:
+      return fp.read()
+
+  def get_dict_from_file(self, filename):
+    filepath = os.path.join(os.path.abspath(os.path.dirname(__file__)), 
"dummy_files", "stomp", filename)
+
+    with open(filepath) as fp:
+      return json.load(fp)
 
   def init_stdout_logger(self):
     format='%(levelname)s %(asctime)s - %(message)s'
@@ -260,11 +266,13 @@ class TestStompClient(object):
 
 class TestCaseTcpConnection(ambari_stomp.Connection):
   def __init__(self, url):
+    self.lock = threading.RLock()
     self.correlation_id = -1
     ambari_stomp.Connection.__init__(self, host_and_ports=[('127.0.0.1', 
21613)])
 
   def send(self, destination, message, content_type=None, headers=None, 
**keyword_headers):
-    self.correlation_id += 1
+    with self.lock:
+      self.correlation_id += 1
 
     logger.info("Event to server at {0} (correlation_id={1}): 
{2}".format(destination, self.correlation_id, message))
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/6303e935/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py
----------------------------------------------------------------------
diff --git 
a/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py 
b/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py
index 1f3a6e7..87c7f57 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py
@@ -36,6 +36,7 @@ from ambari_agent.CustomServiceOrchestrator import 
CustomServiceOrchestrator
 from mock.mock import MagicMock, patch
 
 class TestAgentStompResponses(BaseStompServerTestCase):
+  """
   @patch.object(CustomServiceOrchestrator, "runCommand")
   def test_mock_server_can_start(self, runCommand_mock):
     runCommand_mock.return_value = {'stdout':'...', 'stderr':'...', 
'structuredOut' : '{}', 'exitcode':1}
@@ -68,7 +69,7 @@ class TestAgentStompResponses(BaseStompServerTestCase):
 
 
     # response to /initial_topology
-    f = Frame(frames.MESSAGE, headers={'destination': '/user/', 
'correlationId': '1'}, body=self.get_json("topology_update.json"))
+    f = Frame(frames.MESSAGE, headers={'destination': '/user/', 
'correlationId': '1'}, body=self.get_json("topology_create.json"))
     self.server.topic_manager.send(f)
 
     f = Frame(frames.MESSAGE, headers={'destination': '/user/', 
'correlationId': '2'}, body=self.get_json("metadata_after_registration.json"))
@@ -108,7 +109,7 @@ class TestAgentStompResponses(BaseStompServerTestCase):
     command_status_reporter.join()
     action_queue.join()
 
-    
self.assertEquals(initializer_module.topology_cache['0']['hosts'][0]['hostname'],
 'c6401.ambari.apache.org')
+    
self.assertEquals(initializer_module.topology_cache['0']['hosts'][0]['hostName'],
 'c6401.ambari.apache.org')
     
self.assertEquals(initializer_module.metadata_cache['0']['status_commands_to_run'],
 ('STATUS',))
     
self.assertEquals(initializer_module.configurations_cache['0']['configurations']['zoo.cfg']['clientPort'],
 '2181')
     self.assertEquals(dn_start_in_progress_frame[0]['roleCommand'], 'START')
@@ -116,10 +117,10 @@ class TestAgentStompResponses(BaseStompServerTestCase):
     self.assertEquals(dn_start_in_progress_frame[0]['status'], 'IN_PROGRESS')
     self.assertEquals(dn_start_failed_frame[0]['status'], 'FAILED')
 
-    """
+
     
============================================================================================
     
============================================================================================
-    """
+
 
     initializer_module = InitializerModule()
     self.server.frames_queue.queue.clear()
@@ -168,4 +169,76 @@ class TestAgentStompResponses(BaseStompServerTestCase):
     heartbeat_thread.join()
     component_status_executor.join()
     command_status_reporter.join()
-    action_queue.join()
\ No newline at end of file
+    action_queue.join()
+    """
+
+  @patch.object(CustomServiceOrchestrator, "runCommand")
+  def test_topology_update_and_delete(self, runCommand_mock):
+    runCommand_mock.return_value = {'stdout':'...', 'stderr':'...', 
'structuredOut' : '{}', 'exitcode':1}
+
+    self.remove_files(['/tmp/cluster_cache/configurations.json', 
'/tmp/cluster_cache/metadata.json', '/tmp/cluster_cache/topology.json'])
+
+    if not os.path.exists("/tmp/ambari-agent"):
+      os.mkdir("/tmp/ambari-agent")
+
+    initializer_module = InitializerModule()
+    heartbeat_thread = HeartbeatThread.HeartbeatThread(initializer_module)
+    heartbeat_thread.start()
+
+    connect_frame = self.server.frames_queue.get()
+    users_subscribe_frame = self.server.frames_queue.get()
+    registration_frame = self.server.frames_queue.get()
+
+    # server sends registration response
+    f = Frame(frames.MESSAGE, headers={'destination': '/user/', 
'correlationId': '0'}, body=self.get_json("registration_response.json"))
+    self.server.topic_manager.send(f)
+
+
+    # response to /initial_topology
+    f = Frame(frames.MESSAGE, headers={'destination': '/user/', 
'correlationId': '1'}, body=self.get_json("topology_create.json"))
+    self.server.topic_manager.send(f)
+
+    f = Frame(frames.MESSAGE, headers={'destination': '/user/', 
'correlationId': '2'}, body='{}')
+    self.server.topic_manager.send(f)
+
+    f = Frame(frames.MESSAGE, headers={'destination': '/user/', 
'correlationId': '3'}, body='{}')
+    self.server.topic_manager.send(f)
+
+    initial_topology_request = self.server.frames_queue.get()
+    initial_metadata_request = self.server.frames_queue.get()
+    initial_configs_request = self.server.frames_queue.get()
+
+    while not initializer_module.is_registered:
+      time.sleep(0.1)
+
+    f = Frame(frames.MESSAGE, headers={'destination': '/events/topology'}, 
body=self.get_json("topology_add_component.json"))
+    self.server.topic_manager.send(f)
+
+    f = Frame(frames.MESSAGE, headers={'destination': '/events/topology'}, 
body=self.get_json("topology_add_component_host.json"))
+    self.server.topic_manager.send(f)
+
+    f = Frame(frames.MESSAGE, headers={'destination': '/events/topology'}, 
body=self.get_json("topology_add_host.json"))
+    self.server.topic_manager.send(f)
+
+    f = Frame(frames.MESSAGE, headers={'destination': '/events/topology'}, 
body=self.get_json("topology_delete_host.json"))
+    self.server.topic_manager.send(f)
+
+    f = Frame(frames.MESSAGE, headers={'destination': '/events/topology'}, 
body=self.get_json("topology_delete_component.json"))
+    self.server.topic_manager.send(f)
+
+    f = Frame(frames.MESSAGE, headers={'destination': '/events/topology'}, 
body=self.get_json("topology_delete_component_host.json"))
+    self.server.topic_manager.send(f)
+
+    f = Frame(frames.MESSAGE, headers={'destination': '/events/topology'}, 
body=self.get_json("topology_delete_cluster.json"))
+    self.server.topic_manager.send(f)
+
+    time.sleep(0.1)
+    self.assertEquals(json.dumps(initializer_module.topology_cache, indent=2, 
sort_keys=True), 
json.dumps(self.get_dict_from_file("topology_cache_expected.json"), indent=2, 
sort_keys=True))
+    #self.assertEquals(initializer_module.topology_cache, 
self.get_dict_from_file("topology_cache_expected.json"))
+
+    initializer_module.stop_event.set()
+
+    f = Frame(frames.MESSAGE, headers={'destination': '/user/', 
'correlationId': '4'}, body=json.dumps({'heartbeat-response':'true'}))
+    self.server.topic_manager.send(f)
+
+    heartbeat_thread.join()
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/6303e935/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/configurations_update.json
----------------------------------------------------------------------
diff --git 
a/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/configurations_update.json
 
b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/configurations_update.json
index cc1dda0..c415c7d 100644
--- 
a/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/configurations_update.json
+++ 
b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/configurations_update.json
@@ -1,26 +1,29 @@
 {
-  "0":{
-    "configurations":{
-      "zookeeper-env":{
-        "zk_user":"zookeeper",
-        "zk_log_dir":"/var/log/zookeeper",
-        "content":"\nexport JAVA_HOME={{java64_home}}\nexport 
ZOO_LOG_DIR={{zk_log_dir}}\nexport ZOOPIDFILE={{zk_pid_file}}\nexport 
SERVER_JVMFLAGS={{zk_server_heapsize}}\nexport JAVA=$JAVA_HOME/bin/java\nexport 
CLASSPATH=$CLASSPATH:/usr/share/zookeeper/*\n\n{% if security_enabled 
%}\nexport SERVER_JVMFLAGS=\"$SERVER_JVMFLAGS 
-Djava.security.auth.login.config={{zk_server_jaas_file}}\"\nexport 
CLIENT_JVMFLAGS=\"$CLIENT_JVMFLAGS 
-Djava.security.auth.login.config={{zk_client_jaas_file}}\"\n{% endif %}",
-        "zk_pid_dir":"/var/run/zookeeper",
-        "zookeeper_principal_name":"zookeeper/[email protected]",
-        "zookeeper_keytab_path":"/etc/security/keytabs/zk.service.keytab"
+  "hash": "a1a71f4b46feaf72bf33627d78bbdc3e",
+  "clusters": {
+    "0":{
+      "configurations":{
+        "zookeeper-env":{
+          "zk_user":"zookeeper",
+          "zk_log_dir":"/var/log/zookeeper",
+          "content":"\nexport JAVA_HOME={{java64_home}}\nexport 
ZOO_LOG_DIR={{zk_log_dir}}\nexport ZOOPIDFILE={{zk_pid_file}}\nexport 
SERVER_JVMFLAGS={{zk_server_heapsize}}\nexport JAVA=$JAVA_HOME/bin/java\nexport 
CLASSPATH=$CLASSPATH:/usr/share/zookeeper/*\n\n{% if security_enabled 
%}\nexport SERVER_JVMFLAGS=\"$SERVER_JVMFLAGS 
-Djava.security.auth.login.config={{zk_server_jaas_file}}\"\nexport 
CLIENT_JVMFLAGS=\"$CLIENT_JVMFLAGS 
-Djava.security.auth.login.config={{zk_client_jaas_file}}\"\n{% endif %}",
+          "zk_pid_dir":"/var/run/zookeeper",
+          "zookeeper_principal_name":"zookeeper/[email protected]",
+          "zookeeper_keytab_path":"/etc/security/keytabs/zk.service.keytab"
+        },
+        "zoo.cfg":{
+          "clientPort":"2181",
+          "syncLimit":"5",
+          "initLimit":"10",
+          "dataDir":"/hadoop/zookeeper",
+          "tickTime":"2000"
+        }
       },
-      "zoo.cfg":{
-        "clientPort":"2181",
-        "syncLimit":"5",
-        "initLimit":"10",
-        "dataDir":"/hadoop/zookeeper",
-        "tickTime":"2000"
-      }
-    },
-    "configurationAttributes":{
-      "core-site":{
-        "final":{
-          "fs.defaultFS":"true"
+      "configurationAttributes":{
+        "core-site":{
+          "final":{
+            "fs.defaultFS":"true"
+          }
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/ambari/blob/6303e935/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/metadata_after_registration.json
----------------------------------------------------------------------
diff --git 
a/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/metadata_after_registration.json
 
b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/metadata_after_registration.json
index 14d7c04..0dc5aff 100644
--- 
a/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/metadata_after_registration.json
+++ 
b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/metadata_after_registration.json
@@ -1,33 +1,185 @@
 {
-  "0":{
-    "status_commands_to_run":[
-      "STATUS"
-    ],
-    "status_commands_timeout":"900",
-    "hooks_folder":"HDP/2.0.6/hooks",
-    "credentialStoreEnabled":"false",
-    "availableServices":{
-      "SQOOP":"1.4.6.2.5",
-      "AMBARI_METRICS":"0.1.0",
-      "KERBEROS":"1.10.3-10",
-      "RANGER":"0.6.0.2.5",
-      "ZEPPELIN":"0.6.0.2.5",
-      "HDFS":"2.7.3.2.6",
-      "ZOOKEEPER":"3.4.6"
-    },
-    "agentConfigParams":{
-      "agent":{
-        "parallel_execution":0,
-        "use_system_proxy_settings":true
-      }
-    },
-    "recoveryConfig":{
-      "type":"DEFAULT|AUTO_START|AUTO_INSTALL_START|FULL",
-      "maxCount":10,
-      "windowInMinutes":60,
-      "retryGap":0,
-      "components":"ZOOKEEPER_CLIENT,ZOOKEEPER_SERVER",
-      "recoveryTimestamp":1458150424380
+    "hash": "c2bea6695221368416b2412fec2ba0d7",
+    "clusters": {
+        "0": {
+            "serviceSpecifics": {
+                "GANGLIA": {
+                    "version": "3.5.0",
+                    "credentialStoreEnabled": false,
+                    "status_commands_timeout": null
+                },
+                "DRUID": {
+                    "version": "0.9.2",
+                    "credentialStoreEnabled": false,
+                    "status_commands_timeout": 300
+                },
+                "TEZ": {
+                    "version": "0.7.0",
+                    "credentialStoreEnabled": false,
+                    "status_commands_timeout": 300
+                },
+                "SPARK": {
+                    "version": "1.6.x",
+                    "credentialStoreEnabled": false,
+                    "status_commands_timeout": 300
+                },
+                "HBASE": {
+                    "version": "1.1.2",
+                    "credentialStoreEnabled": false,
+                    "status_commands_timeout": 300
+                },
+                "RANGER_KMS": {
+                    "version": "0.7.0",
+                    "credentialStoreEnabled": false,
+                    "status_commands_timeout": 300
+                },
+                "ATLAS": {
+                    "version": "0.8.0",
+                    "credentialStoreEnabled": false,
+                    "status_commands_timeout": 300
+                },
+                "HIVE": {
+                    "version": "1.2.1000",
+                    "credentialStoreEnabled": true,
+                    "status_commands_timeout": null
+                },
+                "SLIDER": {
+                    "version": "0.92.0",
+                    "credentialStoreEnabled": false,
+                    "status_commands_timeout": 300
+                },
+                "AMBARI_INFRA": {
+                    "version": "0.1.0",
+                    "credentialStoreEnabled": false,
+                    "status_commands_timeout": 300
+                },
+                "FLUME": {
+                    "version": "1.5.2",
+                    "credentialStoreEnabled": false,
+                    "status_commands_timeout": 300
+                },
+                "MAHOUT": {
+                    "version": "0.9.0",
+                    "credentialStoreEnabled": false,
+                    "status_commands_timeout": 300
+                },
+                "SQOOP": {
+                    "version": "1.4.6",
+                    "credentialStoreEnabled": false,
+                    "status_commands_timeout": 300
+                },
+                "OOZIE": {
+                    "version": "4.2.0",
+                    "credentialStoreEnabled": true,
+                    "status_commands_timeout": 300
+                },
+                "HDFS": {
+                    "version": "2.7.3",
+                    "credentialStoreEnabled": false,
+                    "status_commands_timeout": 300
+                },
+                "MAPREDUCE2": {
+                    "version": "2.7.3",
+                    "credentialStoreEnabled": false,
+                    "status_commands_timeout": 300
+                },
+                "ACCUMULO": {
+                    "version": "1.7.0",
+                    "credentialStoreEnabled": false,
+                    "status_commands_timeout": 300
+                },
+                "ZOOKEEPER": {
+                    "version": "3.4.6",
+                    "credentialStoreEnabled": false,
+                    "status_commands_timeout": 300
+                },
+                "YARN": {
+                    "version": "2.7.3",
+                    "credentialStoreEnabled": false,
+                    "status_commands_timeout": 300
+                },
+                "KERBEROS": {
+                    "version": "1.10.3-10",
+                    "credentialStoreEnabled": false,
+                    "status_commands_timeout": 300
+                },
+                "KNOX": {
+                    "version": "0.12.0",
+                    "credentialStoreEnabled": false,
+                    "status_commands_timeout": 300
+                },
+                "PIG": {
+                    "version": "0.16.0",
+                    "credentialStoreEnabled": false,
+                    "status_commands_timeout": 300
+                },
+                "STORM": {
+                    "version": "1.1.0",
+                    "credentialStoreEnabled": false,
+                    "status_commands_timeout": 300
+                },
+                "RANGER": {
+                    "version": "0.7.0",
+                    "credentialStoreEnabled": false,
+                    "status_commands_timeout": 300
+                },
+                "AMBARI_METRICS": {
+                    "version": "0.1.0",
+                    "credentialStoreEnabled": false,
+                    "status_commands_timeout": 600
+                },
+                "ZEPPELIN": {
+                    "version": "0.7.0",
+                    "credentialStoreEnabled": false,
+                    "status_commands_timeout": 300
+                },
+                "KAFKA": {
+                    "version": "0.10.1",
+                    "credentialStoreEnabled": false,
+                    "status_commands_timeout": 300
+                },
+                "LOGSEARCH": {
+                    "version": "0.5.0",
+                    "credentialStoreEnabled": true,
+                    "status_commands_timeout": 300
+                },
+                "FALCON": {
+                    "version": "0.10.0",
+                    "credentialStoreEnabled": false,
+                    "status_commands_timeout": 300
+                },
+                "SPARK2": {
+                    "version": "2.x",
+                    "credentialStoreEnabled": false,
+                    "status_commands_timeout": 300
+                }
+            },
+            "clusterLevelParams": {
+                "host_sys_prepped": "false",
+                "java_home": null,
+                "agent_stack_retry_count": "5",
+                "jdk_location": 
"http://c6401.ambari.apache.org:8080/resources/";,
+                "jdk_name": null,
+                "stack_version": "2.6",
+                "user_list": 
"[\"accumulo\",\"zookeeper\",\"ams\",\"ambari-qa\",\"hdfs\",\"yarn\",\"mapred\"]",
+                "mysql_jdbc_url": 
"http://dvitiiuk-System-Product-Name:8080/resources//mysql-connector-java.jar";,
+                "oracle_jdbc_url": 
"http://dvitiiuk-System-Product-Name:8080/resources//ojdbc6.jar";,
+                "ambari_db_rca_password": "mapred",
+                "jce_name": null,
+                "group_list": "[\"hadoop\",\"users\"]",
+                "db_name": "ambari",
+                "ambari_db_rca_driver": "org.postgresql.Driver",
+                "ambari_db_rca_username": "mapred",
+                "java_version": "8",
+                "not_managed_hdfs_path_list": 
"[\"/mr-history/done\",\"/app-logs\",\"/tmp\"]",
+                "db_driver_filename": "mysql-connector-java.jar",
+                "stack_name": "HDP",
+                "ambari_db_rca_url": 
"jdbc:postgresql://c6401.ambari.apache.org/ambarirca",
+                "agent_stack_retry_on_unavailability": "false",
+                "user_groups": "{}"
+            },
+            "status_commands_to_run": ["STATUS"],
+            "hooks_folder": "HDP/2.0.6/hooks"
+        }
     }
-  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/6303e935/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_add_component.json
----------------------------------------------------------------------
diff --git 
a/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_add_component.json
 
b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_add_component.json
new file mode 100644
index 0000000..2c37111
--- /dev/null
+++ 
b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_add_component.json
@@ -0,0 +1,23 @@
+{
+  "eventType":"UPDATE",
+  "hash":"33db7b2f65011d475a08bff4810a9965",
+  "clusters":{
+    "0":{
+      "components":[
+        {
+          "componentName":"SECONDARY_NAMENODE",
+          "serviceName":"HDFS",
+          "version":"2.6.0.3-8",
+          "hostIds":[
+            0,
+            1
+          ],
+          "statusCommandParams":{
+            "script":"scripts/snamenode.py",
+            "servicePackageFolder":"common-services/HDFS/2.1.0.2.0/package"
+          }
+        }
+      ]
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/6303e935/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_add_component_host.json
----------------------------------------------------------------------
diff --git 
a/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_add_component_host.json
 
b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_add_component_host.json
new file mode 100644
index 0000000..be13c93
--- /dev/null
+++ 
b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_add_component_host.json
@@ -0,0 +1,18 @@
+{
+  "eventType":"UPDATE",
+  "hash":"b313cdebc1a5e36dcad76affb747858b",
+  "clusters":{
+    "0":{
+      "components":[
+        {
+          "componentName":"HDFS_CLIENT",
+          "serviceName":"HDFS",
+          "version":"2.6.0.3-8",
+          "hostIds":[
+            1
+          ]
+        }
+      ]
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/6303e935/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_add_host.json
----------------------------------------------------------------------
diff --git 
a/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_add_host.json
 
b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_add_host.json
new file mode 100644
index 0000000..a9407c3
--- /dev/null
+++ 
b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_add_host.json
@@ -0,0 +1,16 @@
+{
+  "eventType":"UPDATE",
+  "hash":"0b01b81018fe39309d75e7b12a8225ec",
+  "clusters":{
+    "0":{
+      "hosts":[
+        {
+          "hostId":2,
+          "hostName":"c6403.ambari.apache.org",
+          "rackName":"/default-rack",
+          "ipv4":"192.168.64.103"
+        }
+      ]
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/6303e935/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_cache_expected.json
----------------------------------------------------------------------
diff --git 
a/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_cache_expected.json
 
b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_cache_expected.json
new file mode 100644
index 0000000..9894420
--- /dev/null
+++ 
b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_cache_expected.json
@@ -0,0 +1,58 @@
+{
+  "0": {
+    "hosts": [
+      {
+        "rackName": "/default-rack",
+        "hostName": "c6402.ambari.apache.org",
+        "ipv4": "192.168.64.102",
+        "hostId": 1
+      },
+      {
+        "rackName": "/default-rack",
+        "hostName": "c6403.ambari.apache.org",
+        "ipv4": "192.168.64.103",
+        "hostId": 2
+      }
+    ],
+    "components": [
+      {
+        "statusCommandParams": {
+          "script": "scripts/datanode.py",
+          "servicePackageFolder": "common-services/HDFS/2.1.0.2.0/package"
+        },
+        "componentName": "DATANODE",
+        "serviceName": "HDFS",
+        "version": "2.6.0.3-8",
+        "hostIds": [
+          1
+        ]
+      },
+      {
+        "statusCommandParams": {
+          "script": "scripts/hdfs_client.py",
+          "servicePackageFolder": "common-services/HDFS/2.1.0.2.0/package"
+        },
+        "componentName": "HDFS_CLIENT",
+        "version": "2.6.0.3-8",
+        "serviceName": "HDFS",
+        "hostIds": [
+          0,
+          1
+        ]
+      },
+      {
+        "statusCommandParams": {
+          "script": "scripts/snamenode.py",
+          "servicePackageFolder": "common-services/HDFS/2.1.0.2.0/package"
+        },
+        "componentName": "SECONDARY_NAMENODE",
+        "serviceName": "HDFS",
+        "version": "2.6.0.3-8",
+        "hostIds": [
+          0,
+          1
+        ]
+      }
+    ]
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/6303e935/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_create.json
----------------------------------------------------------------------
diff --git 
a/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_create.json
 
b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_create.json
new file mode 100644
index 0000000..cf1afa7
--- /dev/null
+++ 
b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_create.json
@@ -0,0 +1,85 @@
+{
+  "eventType":"CREATE",
+  "hash":"1a49a0f19639c5c2528220bfc5ad0778",
+  "clusters":{
+    "0":{
+      "components":[
+        {
+          "componentName":"NAMENODE",
+          "serviceName":"HDFS",
+          "version":"2.6.0.3-8",
+          "hostIds":[
+            0
+          ],
+          "statusCommandParams":{
+            "script":"scripts/namenode.py",
+            "servicePackageFolder":"common-services/HDFS/2.1.0.2.0/package"
+          }
+        },
+        {
+          "componentName":"DATANODE",
+          "serviceName":"HDFS",
+          "version":"2.6.0.3-8",
+          "hostIds":[
+            0,
+            1
+          ],
+          "statusCommandParams":{
+            "script":"scripts/datanode.py",
+            "servicePackageFolder":"common-services/HDFS/2.1.0.2.0/package"
+          }
+        },
+        {
+          "componentName":"HDFS_CLIENT",
+          "serviceName":"HDFS",
+          "version":"2.6.0.3-8",
+          "hostIds":[
+            0
+          ],
+          "statusCommandParams":{
+            "script":"scripts/hdfs_client.py",
+            "servicePackageFolder":"common-services/HDFS/2.1.0.2.0/package"
+          }
+        }
+      ],
+      "hosts":[
+        {
+          "hostId":0,
+          "hostName":"c6401.ambari.apache.org",
+          "rackName":"/default-rack",
+          "ipv4":"192.168.64.101"
+        },
+        {
+          "hostId":1,
+          "hostName":"c6402.ambari.apache.org",
+          "rackName":"/default-rack",
+          "ipv4":"192.168.64.102"
+        }
+      ]
+    },
+    "1": {
+      "components":[
+        {
+          "componentName":"NAMENODE",
+          "serviceName":"HDFS",
+          "version":"2.6.0.3-8",
+          "hostIds":[
+            0
+          ],
+          "statusCommandParams":{
+            "script":"scripts/namenode.py",
+            "servicePackageFolder":"common-services/HDFS/2.1.0.2.0/package"
+          }
+        }
+      ],
+      "hosts":[
+        {
+          "hostId":0,
+          "hostName":"c6401.ambari.apache.org",
+          "rackName":"/default-rack",
+          "ipv4":"192.168.64.101"
+        }
+      ]
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/6303e935/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_delete_cluster.json
----------------------------------------------------------------------
diff --git 
a/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_delete_cluster.json
 
b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_delete_cluster.json
new file mode 100644
index 0000000..12c75a8
--- /dev/null
+++ 
b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_delete_cluster.json
@@ -0,0 +1,8 @@
+{
+  "eventType":"DELETE",
+  "hash":"686bca74a23019ce8865d759beeccb0a",
+  "clusters":{
+    "1":{
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/6303e935/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_delete_component.json
----------------------------------------------------------------------
diff --git 
a/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_delete_component.json
 
b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_delete_component.json
new file mode 100644
index 0000000..19e5418
--- /dev/null
+++ 
b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_delete_component.json
@@ -0,0 +1,19 @@
+{
+  "eventType":"DELETE",
+  "hash":"26370af7e07ff89c3949537aa902b3da",
+  "clusters":{
+    "0":{
+      "components":[
+        {
+          "componentName":"NAMENODE",
+          "serviceName":"HDFS",
+          "version":"2.6.0.3-8",
+          "hostIds":[
+            0,
+            1
+          ]
+        }
+      ]
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/6303e935/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_delete_component_host.json
----------------------------------------------------------------------
diff --git 
a/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_delete_component_host.json
 
b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_delete_component_host.json
new file mode 100644
index 0000000..0c137b8
--- /dev/null
+++ 
b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_delete_component_host.json
@@ -0,0 +1,18 @@
+{
+  "eventType":"DELETE",
+  "hash":"591930ef5e0eb5d5437f815e73673e4b",
+  "clusters":{
+    "0":{
+      "components":[
+        {
+          "componentName":"DATANODE",
+          "serviceName":"HDFS",
+          "version":"2.6.0.3-8",
+          "hostIds":[
+            0
+          ]
+        }
+      ]
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/6303e935/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_delete_host.json
----------------------------------------------------------------------
diff --git 
a/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_delete_host.json
 
b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_delete_host.json
new file mode 100644
index 0000000..5f5d68a
--- /dev/null
+++ 
b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_delete_host.json
@@ -0,0 +1,13 @@
+{
+  "eventType":"DELETE",
+  "hash":"686bca74a23019ce8865d759beeccb0a",
+  "clusters":{
+    "0":{
+      "hosts":[
+        {
+          "hostId":0
+        }
+      ]
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/6303e935/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_update.json
----------------------------------------------------------------------
diff --git 
a/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_update.json
 
b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_update.json
deleted file mode 100644
index b403e4d..0000000
--- 
a/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_update.json
+++ /dev/null
@@ -1,69 +0,0 @@
-{
-  "0":{
-    "components":[
-      {
-        "serviceName":"ZOOKEEPER",
-        "componentName":"ZOOKEEPER_SERVER",
-        "hosts":[
-          0,
-          1,
-          4
-        ],
-        "statusCommandsParams":{
-          "script":"scripts/zookeeper_server.py",
-          "service_package_folder":"common-services/ZOOKEEPER/3.4.5/package"
-        }
-      },
-      {
-        "serviceName":"ZOOKEEPER",
-        "componentName":"ZOOKEEPER_CLIENT",
-        "hosts":[
-          0,
-          1,
-          2,
-          3,
-          4
-        ],
-        "statusCommandsParams":{
-          "script":"scripts/zookeeper_client.py",
-          "service_package_folder":"common-services/ZOOKEEPER/3.4.5/package"
-        }
-      }
-    ],
-    "hosts":[
-      {
-        "hostname":"c6401.ambari.apache.org",
-        "rack_id":0,
-        "ipv4_ip":"10.240.0.240"
-      },
-      {
-        "hostname":"c6402.ambari.apache.org",
-        "rack_id":1,
-        "ipv4_ip":"10.240.0.241"
-      },
-      {
-        "hostname":"c6403.ambari.apache.org",
-        "rack_id":0,
-        "ipv4_ip":"10.240.0.242"
-      },
-      {
-        "hostname":"c6404.ambari.apache.org",
-        "rack_id":0,
-        "ipv4_ip":"10.240.0.243"
-      },
-      {
-        "hostname":"c6405.ambari.apache.org",
-        "rack_id":1,
-        "ipv4_ip":"10.240.0.244"
-      }
-    ],
-    "racks":[
-      {
-        "name":"/default-rack"
-      },
-      {
-        "name":"/another-rack"
-      }
-    ]
-  }
-}
\ No newline at end of file

Reply via email to