Repository: ambari
Updated Branches:
  refs/heads/branch-3.0-perf 12df64fe3 -> 94fed5503


AMBARI-21245. Send information about agentEnv only if it changed. (aonishuk)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/94fed550
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/94fed550
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/94fed550

Branch: refs/heads/branch-3.0-perf
Commit: 94fed550380c51fb8baf672919985bcbf14143c4
Parents: 12df64f
Author: Andrew Onishuk <[email protected]>
Authored: Fri Jun 16 11:27:36 2017 +0300
Committer: Andrew Onishuk <[email protected]>
Committed: Fri Jun 16 11:27:36 2017 +0300

----------------------------------------------------------------------
 .../src/main/python/ambari_agent/ActionQueue.py |  2 +-
 .../main/python/ambari_agent/AmbariConfig.py    |  2 +-
 .../python/ambari_agent/CommandStatusDict.py    |  5 +-
 .../src/main/python/ambari_agent/Constants.py   |  1 +
 .../ambari_agent/CustomServiceOrchestrator.py   |  2 +-
 .../main/python/ambari_agent/HeartbeatThread.py |  3 +-
 .../src/main/python/ambari_agent/HostInfo.py    | 19 ++----
 .../python/ambari_agent/HostStatusReporter.py   | 71 ++++++++++++++++++++
 .../python/ambari_agent/InitializerModule.py    | 15 +++--
 .../src/main/python/ambari_agent/Register.py    |  2 +-
 .../src/main/python/ambari_agent/Utils.py       | 30 +++++++++
 .../src/main/python/ambari_agent/main.py        |  4 ++
 .../ambari_agent/TestAgentStompResponses.py     | 25 +++++--
 13 files changed, 151 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/94fed550/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
index 4cef88b..5632b5b 100644
--- a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
+++ b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
@@ -75,7 +75,7 @@ class ActionQueue(threading.Thread):
     self.commandQueue = Queue.Queue()
     self.backgroundCommandQueue = Queue.Queue()
     self.commandStatuses = initializer_module.commandStatuses
-    self.config = initializer_module.ambariConfig
+    self.config = initializer_module.config
     self.configTags = {}
     self.stop_event = initializer_module.stop_event
     self.tmpdir = self.config.get('agent', 'prefix')

http://git-wip-us.apache.org/repos/asf/ambari/blob/94fed550/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py b/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py
index 3d480ca..9507c9d 100644
--- a/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py
+++ b/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py
@@ -63,7 +63,7 @@ server_crt=ca.crt
 passphrase_env_var_name=AMBARI_PASSPHRASE
 
 [heartbeat]
-state_interval = 6
+state_interval = 1
 dirs={ps}etc{ps}hadoop,{ps}etc{ps}hadoop{ps}conf,{ps}var{ps}run{ps}hadoop,{ps}var{ps}log{ps}hadoop
 log_lines_count=300
 iddle_interval_min=1

http://git-wip-us.apache.org/repos/asf/ambari/blob/94fed550/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py b/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py
index d0f6801..e27a243 100644
--- a/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py
+++ b/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py
@@ -81,13 +81,13 @@ class CommandStatusDict():
         if command ['commandType'] in [ActionQueue.EXECUTION_COMMAND, ActionQueue.BACKGROUND_EXECUTION_COMMAND]:
           if (report['status']) != ActionQueue.IN_PROGRESS_STATUS:
             resultReports.append(report)
-            self.reported_reports.append(key)
+            self.reported_reports.add(key)
           else:
             in_progress_report = self.generate_in_progress_report(command, report)
             resultReports.append(in_progress_report)
         elif command ['commandType'] in [ActionQueue.AUTO_EXECUTION_COMMAND]:
           logger.debug("AUTO_EXECUTION_COMMAND task deleted " + str(command['commandId']))
-          self.reported_reports.append(key)
+          self.reported_reports.add(key)
           pass
       return resultReports
 
@@ -95,6 +95,7 @@ class CommandStatusDict():
     with self.lock:
       for key in self.reported_reports:
         del self.current_state[key]
+      self.reported_reports = set()
 
   def generate_in_progress_report(self, command, report):
     """

http://git-wip-us.apache.org/repos/asf/ambari/blob/94fed550/ambari-agent/src/main/python/ambari_agent/Constants.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/Constants.py b/ambari-agent/src/main/python/ambari_agent/Constants.py
index 8fafca1..02945ee 100644
--- a/ambari-agent/src/main/python/ambari_agent/Constants.py
+++ b/ambari-agent/src/main/python/ambari_agent/Constants.py
@@ -33,6 +33,7 @@ METADATA_REQUEST_ENDPOINT = '/agents/metadata'
 CONFIGURATIONS_REQUEST_ENDPOINT = '/agents/configs'
 COMPONENT_STATUS_REPORTS_ENDPOINT = '/reports/component_status'
 COMMANDS_STATUS_REPORTS_ENDPOINT = '/reports/commands_status'
+HOST_STATUS_REPORTS_ENDPOINT = '/reports/host_status'
 
 HEARTBEAT_ENDPOINT = '/heartbeat'
 REGISTRATION_ENDPOINT = '/register'

http://git-wip-us.apache.org/repos/asf/ambari/blob/94fed550/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py b/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
index 2350504..6d1a491 100644
--- a/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
+++ b/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py
@@ -81,7 +81,7 @@ class CustomServiceOrchestrator():
     self.metadata_cache = initializer_module.metadata_cache
     self.topology_cache = initializer_module.topology_cache
     self.configurations_cache = initializer_module.configurations_cache
-    self.config = initializer_module.ambariConfig
+    self.config = initializer_module.config
     self.tmp_dir = self.config.get('agent', 'prefix')
     self.force_https_protocol = self.config.get_force_https_protocol()
     self.exec_tmp_dir = Constants.AGENT_TMP_DIR

http://git-wip-us.apache.org/repos/asf/ambari/blob/94fed550/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py b/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py
index e54c0c1..40e5b12 100644
--- a/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py
+++ b/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py
@@ -46,7 +46,7 @@ class HeartbeatThread(threading.Thread):
     self.heartbeat_interval = HEARTBEAT_INTERVAL
     self.stop_event = initializer_module.stop_event
 
-    self.registration_builder = Register(initializer_module.ambariConfig)
+    self.registration_builder = Register(initializer_module.config)
 
     self.initializer_module = initializer_module
     self.caches = [initializer_module.metadata_cache, initializer_module.topology_cache, initializer_module.configurations_cache]
@@ -89,6 +89,7 @@ class HeartbeatThread(threading.Thread):
 
       self.stop_event.wait(self.heartbeat_interval)
 
+    self.initializer_module.is_registered = False
     self.initializer_module.connection.disconnect()
     delattr(self.initializer_module, '_connection')
     logger.info("HeartbeatThread has successfully finished")

http://git-wip-us.apache.org/repos/asf/ambari/blob/94fed550/ambari-agent/src/main/python/ambari_agent/HostInfo.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/HostInfo.py b/ambari-agent/src/main/python/ambari_agent/HostInfo.py
index 4b7bfd7..b6db1f2 100644
--- a/ambari-agent/src/main/python/ambari_agent/HostInfo.py
+++ b/ambari-agent/src/main/python/ambari_agent/HostInfo.py
@@ -35,6 +35,7 @@ from resource_management.core import shell
 from ambari_agent.HostCheckReportFileHandler import HostCheckReportFileHandler
 from AmbariConfig import AmbariConfig
 from resource_management.core.resources.jcepolicyinfo import JcePolicyInfo
+import Hardware
 
 logger = logging.getLogger()
 
@@ -296,11 +297,8 @@ class HostInfoLinux(HostInfo):
       logger.exception('Unable to get information about JCE')
       return None
 
-  def register(self, dict, componentsMapped=True, commandsInProgress=True):
-    """ Return various details about the host
-    componentsMapped: indicates if any components are mapped to this host
-    commandsInProgress: indicates if any commands are in progress
-    """
+  def register(self, dict, runExpensiveChecks=False):
+    """ Return various details about the host"""
 
     dict['hostHealth'] = {}
 
@@ -321,7 +319,7 @@ class HostInfoLinux(HostInfo):
     dict['hasUnlimitedJcePolicy'] = self.checkUnlimitedJce()
     # If commands are in progress or components are already mapped to this host
     # Then do not perform certain expensive host checks
-    if componentsMapped or commandsInProgress:
+    if not runExpensiveChecks:
       dict['alternatives'] = []
       dict['stackFoldersAndFiles'] = []
       dict['existingUsers'] = []
@@ -418,10 +416,8 @@ class HostInfoWindows(HostInfo):
     code, out, err = run_powershell_script(self.SERVICE_STATUS_CMD.format(serivce_name))
     return out, err, code
 
-  def register(self, dict, componentsMapped=True, commandsInProgress=True):
+  def register(self, dict, runExpensiveChecks=False):
     """ Return various details about the host
-    componentsMapped: indicates if any components are mapped to this host
-    commandsInProgress: indicates if any commands are in progress
     """
     dict['hostHealth'] = {}
 
@@ -438,9 +434,8 @@ class HostInfoWindows(HostInfo):
     dict['firewallRunning'] = self.checkFirewall()
     dict['firewallName'] = self.getFirewallName()
     dict['reverseLookup'] = self.checkReverseLookup()
-    # If commands are in progress or components are already mapped to this host
-    # Then do not perform certain expensive host checks
-    if componentsMapped or commandsInProgress:
+
+    if not runExpensiveChecks:
       dict['alternatives'] = []
       dict['stackFoldersAndFiles'] = []
       dict['existingUsers'] = []

http://git-wip-us.apache.org/repos/asf/ambari/blob/94fed550/ambari-agent/src/main/python/ambari_agent/HostStatusReporter.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/HostStatusReporter.py b/ambari-agent/src/main/python/ambari_agent/HostStatusReporter.py
new file mode 100644
index 0000000..c45b64a
--- /dev/null
+++ b/ambari-agent/src/main/python/ambari_agent/HostStatusReporter.py
@@ -0,0 +1,71 @@
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+import logging
+import threading
+
+from ambari_agent import Constants
+from ambari_agent.HostInfo import HostInfo
+from ambari_agent.Utils import Utils
+from ambari_agent.Hardware import Hardware
+
+logger = logging.getLogger(__name__)
+
+class HostStatusReporter(threading.Thread):
+  """
+  The thread reports host status to server if it changed from previous report every 'host_status_report_interval' seconds.
+  """
+  def __init__(self, initializer_module):
+    self.initializer_module = initializer_module
+    self.report_interval = initializer_module.host_status_report_interval
+    self.stop_event = initializer_module.stop_event
+    self.config = initializer_module.config
+    self.host_info = HostInfo(initializer_module.config)
+    self.last_report = {}
+    threading.Thread.__init__(self)
+
+  def run(self):
+    while not self.stop_event.is_set():
+      try:
+        if self.initializer_module.is_registered:
+          report = self.get_report()
+
+          if self.initializer_module.is_registered and not Utils.are_dicts_equal(report, self.last_report, keys_to_skip=["agentTimeStampAtReporting"]):
+            self.initializer_module.connection.send(message=report, destination=Constants.HOST_STATUS_REPORTS_ENDPOINT)
+            self.last_report = report
+
+        # don't use else to avoid race condition
+        if not self.initializer_module.is_registered:
+          self.last_report = {}
+      except:
+        logger.exception("Exception in HostStatusReporter. Re-running it")
+
+      self.stop_event.wait(self.report_interval)
+
+    logger.info("HostStatusReporter has successfully finished")
+
+  def get_report(self):
+    host_info_dict = {}
+    self.host_info.register(host_info_dict)
+
+    report = {
+      'agentEnv': host_info_dict,
+      'mounts': Hardware.osdisks(self.config),
+    }
+
+    return report
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/94fed550/ambari-agent/src/main/python/ambari_agent/InitializerModule.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/InitializerModule.py b/ambari-agent/src/main/python/ambari_agent/InitializerModule.py
index 88c8b91..f0c3b43 100644
--- a/ambari-agent/src/main/python/ambari_agent/InitializerModule.py
+++ b/ambari-agent/src/main/python/ambari_agent/InitializerModule.py
@@ -31,6 +31,7 @@ from ambari_agent.security import AmbariStompConnection
 from ambari_agent.ActionQueue import ActionQueue
 from ambari_agent.CommandStatusDict import CommandStatusDict
 from ambari_agent.CustomServiceOrchestrator import CustomServiceOrchestrator
+from ambari_agent.HostStatusReporter import HostStatusReporter
 
 logger = logging.getLogger()
 
@@ -49,15 +50,17 @@ class InitializerModule:
     """
     Initialize every property got from ambari-agent.ini
     """
-    self.ambariConfig = AmbariConfig.get_resolved_config()
+    self.config = AmbariConfig.get_resolved_config()
 
-    self.server_hostname = self.ambariConfig.get('server', 'hostname')
-    self.secured_url_port = self.ambariConfig.get('server', 'secured_url_port')
+    self.server_hostname = self.config.get('server', 'hostname')
+    self.secured_url_port = self.config.get('server', 'secured_url_port')
 
-    self.cache_dir = self.ambariConfig.get('agent', 'cache_dir', default='/var/lib/ambari-agent/cache')
-    self.command_reports_interval = int(self.ambariConfig.get('agent', 'command_reports_interval', default='5'))
+    self.cache_dir = self.config.get('agent', 'cache_dir', default='/var/lib/ambari-agent/cache')
+    self.command_reports_interval = int(self.config.get('agent', 'command_reports_interval', default='5'))
     self.cluster_cache_dir = os.path.join(self.cache_dir, FileCache.CLUSTER_CACHE_DIRECTORY)
 
+    self.host_status_report_interval = int(self.config.get('heartbeat', 'state_interval_seconds', '60'))
+
   def init(self):
     """
     Initialize properties
@@ -67,7 +70,7 @@ class InitializerModule:
     self.is_registered = False
 
     self.metadata_cache = ClusterMetadataCache(self.cluster_cache_dir)
-    self.topology_cache = ClusterTopologyCache(self.cluster_cache_dir, self.ambariConfig)
+    self.topology_cache = ClusterTopologyCache(self.cluster_cache_dir, self.config)
     self.configurations_cache = ClusterConfigurationCache(self.cluster_cache_dir)
     self.customServiceOrchestrator = CustomServiceOrchestrator(self)
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/94fed550/ambari-agent/src/main/python/ambari_agent/Register.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/Register.py b/ambari-agent/src/main/python/ambari_agent/Register.py
index 4c4bf32..a6bc8c0 100644
--- a/ambari-agent/src/main/python/ambari_agent/Register.py
+++ b/ambari-agent/src/main/python/ambari_agent/Register.py
@@ -36,7 +36,7 @@ class Register:
 
     hostInfo = HostInfo(self.config)
     agentEnv = { }
-    hostInfo.register(agentEnv, False, False)
+    hostInfo.register(agentEnv, runExpensiveChecks=True)
 
     current_ping_port = self.config.get('agent','ping_port')
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/94fed550/ambari-agent/src/main/python/ambari_agent/Utils.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/Utils.py b/ambari-agent/src/main/python/ambari_agent/Utils.py
index de073bb..46aa955 100644
--- a/ambari-agent/src/main/python/ambari_agent/Utils.py
+++ b/ambari-agent/src/main/python/ambari_agent/Utils.py
@@ -73,6 +73,36 @@ class BlockingDictionary():
 
 class Utils(object):
   @staticmethod
+  def are_dicts_equal(d1, d2, keys_to_skip=[]):
+    """
+    Check if two dictionaries are equal. Comparing the nested dictionaries is done as well.
+    """
+    return Utils.are_dicts_equal_one_way(d1, d2, keys_to_skip) and Utils.are_dicts_equal_one_way(d2, d1, keys_to_skip)
+  @staticmethod
+  def are_dicts_equal_one_way(d1, d2, keys_to_skip=[]):
+    """
+    Check if d1 has all the same keys and their values as d2
+    including nested dictionaries
+    """
+    for k in d1.keys():
+      if k in keys_to_skip:
+        #print "skipping " + str(k)
+        continue
+      if not d2.has_key(k):
+        #print "don't have key="+str(k)
+        return False
+      else:
+        if type(d1[k]) is dict:
+          are_equal = Utils.are_dicts_equal_one_way(d1[k], d2[k], keys_to_skip)
+          if not are_equal:
+            return False
+        else:
+          if d1[k] != d2[k]:
+            #print "not equal at "+str(k)
+            return False
+    return True
+
+  @staticmethod
   def make_immutable(value):
     if isinstance(value, ImmutableDictionary):
       return value

http://git-wip-us.apache.org/repos/asf/ambari/blob/94fed550/ambari-agent/src/main/python/ambari_agent/main.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/main.py b/ambari-agent/src/main/python/ambari_agent/main.py
index a1a98e8..56962d3 100644
--- a/ambari-agent/src/main/python/ambari_agent/main.py
+++ b/ambari-agent/src/main/python/ambari_agent/main.py
@@ -107,6 +107,7 @@ from ambari_agent import HeartbeatThread
 from ambari_agent.InitializerModule import InitializerModule
 from ambari_agent.ComponentStatusExecutor import ComponentStatusExecutor
 from ambari_agent.CommandStatusReporter import CommandStatusReporter
+from ambari_agent.HostStatusReporter import HostStatusReporter
 
 logger = logging.getLogger()
 alerts_logger = logging.getLogger('ambari_alerts')
@@ -353,6 +354,9 @@ def run_threads(initializer_module):
   command_status_reporter = CommandStatusReporter(initializer_module)
   command_status_reporter.start()
 
+  host_status_reporter = HostStatusReporter(initializer_module)
+  host_status_reporter.start()
+
   initializer_module.action_queue.start()
 
   while not initializer_module.stop_event.is_set():

http://git-wip-us.apache.org/repos/asf/ambari/blob/94fed550/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py b/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py
index 9d57261..f53097f 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py
@@ -31,6 +31,7 @@ from ambari_agent import HeartbeatThread
 from ambari_agent.InitializerModule import InitializerModule
 from ambari_agent.ComponentStatusExecutor import ComponentStatusExecutor
 from ambari_agent.CommandStatusReporter import CommandStatusReporter
+from ambari_agent.HostStatusReporter import HostStatusReporter
 from ambari_agent.CustomServiceOrchestrator import CustomServiceOrchestrator
 
 from mock.mock import MagicMock, patch
@@ -63,9 +64,6 @@ class TestAgentStompResponses(BaseStompServerTestCase):
     component_status_executor = ComponentStatusExecutor(initializer_module)
     component_status_executor.start()
 
-    command_status_reporter = CommandStatusReporter(initializer_module)
-    command_status_reporter.start()
-
     connect_frame = self.server.frames_queue.get()
     users_subscribe_frame = self.server.frames_queue.get()
     registration_frame = self.server.frames_queue.get()
@@ -92,6 +90,12 @@ class TestAgentStompResponses(BaseStompServerTestCase):
     while not initializer_module.is_registered:
       time.sleep(0.1)
 
+    command_status_reporter = CommandStatusReporter(initializer_module)
+    command_status_reporter.start()
+
+    host_status_reporter = HostStatusReporter(initializer_module)
+    host_status_reporter.start()
+
     f = Frame(frames.MESSAGE, headers={'destination': '/user/commands'}, body=self.get_json("execution_commands.json"))
     self.server.topic_manager.send(f)
 
@@ -106,16 +110,22 @@ class TestAgentStompResponses(BaseStompServerTestCase):
     zk_start_failed_frame = json.loads(self.server.frames_queue.get().body)
     action_status_in_progress_frame = json.loads(self.server.frames_queue.get().body)
     action_status_failed_frame = json.loads(self.server.frames_queue.get().body)
+    host_status_report = json.loads(self.server.frames_queue.get().body)
+
     initializer_module.stop_event.set()
 
     f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '4'}, body=json.dumps({'id':'1'}))
     self.server.topic_manager.send(f)
 
+    command_status_reporter.join()
     heartbeat_thread.join()
     component_status_executor.join()
-    command_status_reporter.join()
+    host_status_reporter.join()
     action_queue.join()
 
+
+    self.assertTrue('mounts' in host_status_report)
+    self.assertTrue('activeJavaProcs' in host_status_report['agentEnv']['hostHealth'])
     self.assertEquals(initializer_module.topology_cache['0']['hosts'][0]['hostName'], 'c6401.ambari.apache.org')
     self.assertEquals(initializer_module.metadata_cache['0']['status_commands_to_run'], ('STATUS',))
     self.assertEquals(initializer_module.configurations_cache['0']['configurations']['zoo.cfg']['clientPort'], '2181')
@@ -124,7 +134,6 @@ class TestAgentStompResponses(BaseStompServerTestCase):
     self.assertEquals(dn_start_in_progress_frame[0]['status'], 'IN_PROGRESS')
     self.assertEquals(dn_start_failed_frame[0]['status'], 'FAILED')
 
-
     #============================================================================================
     #============================================================================================
 
@@ -145,6 +154,9 @@ class TestAgentStompResponses(BaseStompServerTestCase):
     command_status_reporter = CommandStatusReporter(initializer_module)
     command_status_reporter.start()
 
+    host_status_reporter = HostStatusReporter(initializer_module)
+    host_status_reporter.start()
+
     connect_frame = self.server.frames_queue.get()
     users_subscribe_frame = self.server.frames_queue.get()
     registration_frame = self.server.frames_queue.get()
@@ -177,8 +189,10 @@ class TestAgentStompResponses(BaseStompServerTestCase):
     heartbeat_thread.join()
     component_status_executor.join()
     command_status_reporter.join()
+    host_status_reporter.join()
     action_queue.join()
 
+
   def test_topology_update_and_delete(self):
     initializer_module = InitializerModule()
     heartbeat_thread = HeartbeatThread.HeartbeatThread(initializer_module)
@@ -247,3 +261,4 @@ class TestAgentStompResponses(BaseStompServerTestCase):
     self.server.topic_manager.send(f)
 
     heartbeat_thread.join()
+

Reply via email to