Repository: ambari Updated Branches: refs/heads/branch-3.0-perf 12df64fe3 -> 94fed5503
AMBARI-21245. Send information about agentEnv only if it changed. (aonishuk) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/94fed550 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/94fed550 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/94fed550 Branch: refs/heads/branch-3.0-perf Commit: 94fed550380c51fb8baf672919985bcbf14143c4 Parents: 12df64f Author: Andrew Onishuk <[email protected]> Authored: Fri Jun 16 11:27:36 2017 +0300 Committer: Andrew Onishuk <[email protected]> Committed: Fri Jun 16 11:27:36 2017 +0300 ---------------------------------------------------------------------- .../src/main/python/ambari_agent/ActionQueue.py | 2 +- .../main/python/ambari_agent/AmbariConfig.py | 2 +- .../python/ambari_agent/CommandStatusDict.py | 5 +- .../src/main/python/ambari_agent/Constants.py | 1 + .../ambari_agent/CustomServiceOrchestrator.py | 2 +- .../main/python/ambari_agent/HeartbeatThread.py | 3 +- .../src/main/python/ambari_agent/HostInfo.py | 19 ++---- .../python/ambari_agent/HostStatusReporter.py | 71 ++++++++++++++++++++ .../python/ambari_agent/InitializerModule.py | 15 +++-- .../src/main/python/ambari_agent/Register.py | 2 +- .../src/main/python/ambari_agent/Utils.py | 30 +++++++++ .../src/main/python/ambari_agent/main.py | 4 ++ .../ambari_agent/TestAgentStompResponses.py | 25 +++++-- 13 files changed, 151 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/94fed550/ambari-agent/src/main/python/ambari_agent/ActionQueue.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py index 4cef88b..5632b5b 100644 --- a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py +++ b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py @@ -75,7 +75,7 @@ class ActionQueue(threading.Thread): self.commandQueue = Queue.Queue() self.backgroundCommandQueue = Queue.Queue() self.commandStatuses = initializer_module.commandStatuses - self.config = initializer_module.ambariConfig + self.config = initializer_module.config self.configTags = {} self.stop_event = initializer_module.stop_event self.tmpdir = self.config.get('agent', 'prefix') http://git-wip-us.apache.org/repos/asf/ambari/blob/94fed550/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py b/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py index 3d480ca..9507c9d 100644 --- a/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py +++ b/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py @@ -63,7 +63,7 @@ server_crt=ca.crt passphrase_env_var_name=AMBARI_PASSPHRASE [heartbeat] -state_interval = 6 +state_interval = 1 dirs={ps}etc{ps}hadoop,{ps}etc{ps}hadoop{ps}conf,{ps}var{ps}run{ps}hadoop,{ps}var{ps}log{ps}hadoop log_lines_count=300 iddle_interval_min=1 http://git-wip-us.apache.org/repos/asf/ambari/blob/94fed550/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py b/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py index d0f6801..e27a243 100644 --- a/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py +++ b/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py @@ -81,13 +81,13 @@ class CommandStatusDict(): if command ['commandType'] in [ActionQueue.EXECUTION_COMMAND, ActionQueue.BACKGROUND_EXECUTION_COMMAND]: if (report['status']) != ActionQueue.IN_PROGRESS_STATUS: resultReports.append(report) - self.reported_reports.append(key) + self.reported_reports.add(key) else: in_progress_report = self.generate_in_progress_report(command, report) resultReports.append(in_progress_report) elif command ['commandType'] in [ActionQueue.AUTO_EXECUTION_COMMAND]: logger.debug("AUTO_EXECUTION_COMMAND task deleted " + str(command['commandId'])) - self.reported_reports.append(key) + self.reported_reports.add(key) pass return resultReports @@ -95,6 +95,7 @@ class CommandStatusDict(): with self.lock: for key in self.reported_reports: del self.current_state[key] + self.reported_reports = set() def generate_in_progress_report(self, command, report): """ http://git-wip-us.apache.org/repos/asf/ambari/blob/94fed550/ambari-agent/src/main/python/ambari_agent/Constants.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/Constants.py b/ambari-agent/src/main/python/ambari_agent/Constants.py index 8fafca1..02945ee 100644 --- a/ambari-agent/src/main/python/ambari_agent/Constants.py +++ b/ambari-agent/src/main/python/ambari_agent/Constants.py @@ -33,6 +33,7 @@ METADATA_REQUEST_ENDPOINT = '/agents/metadata' CONFIGURATIONS_REQUEST_ENDPOINT = '/agents/configs' COMPONENT_STATUS_REPORTS_ENDPOINT = '/reports/component_status' COMMANDS_STATUS_REPORTS_ENDPOINT = '/reports/commands_status' +HOST_STATUS_REPORTS_ENDPOINT = '/reports/host_status' HEARTBEAT_ENDPOINT = '/heartbeat' REGISTRATION_ENDPOINT = '/register' http://git-wip-us.apache.org/repos/asf/ambari/blob/94fed550/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py b/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py index 2350504..6d1a491 100644 --- a/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py +++ b/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py @@ -81,7 +81,7 @@ class CustomServiceOrchestrator(): self.metadata_cache = initializer_module.metadata_cache self.topology_cache = initializer_module.topology_cache self.configurations_cache = initializer_module.configurations_cache - self.config = initializer_module.ambariConfig + self.config = initializer_module.config self.tmp_dir = self.config.get('agent', 'prefix') self.force_https_protocol = self.config.get_force_https_protocol() self.exec_tmp_dir = Constants.AGENT_TMP_DIR http://git-wip-us.apache.org/repos/asf/ambari/blob/94fed550/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py b/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py index e54c0c1..40e5b12 100644 --- a/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py +++ b/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py @@ -46,7 +46,7 @@ class HeartbeatThread(threading.Thread): self.heartbeat_interval = HEARTBEAT_INTERVAL self.stop_event = initializer_module.stop_event - self.registration_builder = Register(initializer_module.ambariConfig) + self.registration_builder = Register(initializer_module.config) self.initializer_module = initializer_module self.caches = [initializer_module.metadata_cache, initializer_module.topology_cache, initializer_module.configurations_cache] @@ -89,6 +89,7 @@ class HeartbeatThread(threading.Thread): self.stop_event.wait(self.heartbeat_interval) + self.initializer_module.is_registered = False self.initializer_module.connection.disconnect() delattr(self.initializer_module, '_connection') logger.info("HeartbeatThread has successfully finished") http://git-wip-us.apache.org/repos/asf/ambari/blob/94fed550/ambari-agent/src/main/python/ambari_agent/HostInfo.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/HostInfo.py b/ambari-agent/src/main/python/ambari_agent/HostInfo.py index 4b7bfd7..b6db1f2 100644 --- a/ambari-agent/src/main/python/ambari_agent/HostInfo.py +++ b/ambari-agent/src/main/python/ambari_agent/HostInfo.py @@ -35,6 +35,7 @@ from resource_management.core import shell from ambari_agent.HostCheckReportFileHandler import HostCheckReportFileHandler from AmbariConfig import AmbariConfig from resource_management.core.resources.jcepolicyinfo import JcePolicyInfo +import Hardware logger = logging.getLogger() @@ -296,11 +297,8 @@ class HostInfoLinux(HostInfo): logger.exception('Unable to get information about JCE') return None - def register(self, dict, componentsMapped=True, commandsInProgress=True): - """ Return various details about the host - componentsMapped: indicates if any components are mapped to this host - commandsInProgress: indicates if any commands are in progress - """ + def register(self, dict, runExpensiveChecks=False): + """ Return various details about the host""" dict['hostHealth'] = {} @@ -321,7 +319,7 @@ class HostInfoLinux(HostInfo): dict['hasUnlimitedJcePolicy'] = self.checkUnlimitedJce() # If commands are in progress or components are already mapped to this host # Then do not perform certain expensive host checks - if componentsMapped or commandsInProgress: + if not runExpensiveChecks: dict['alternatives'] = [] dict['stackFoldersAndFiles'] = [] dict['existingUsers'] = [] @@ -418,10 +416,8 @@ class HostInfoWindows(HostInfo): code, out, err = run_powershell_script(self.SERVICE_STATUS_CMD.format(serivce_name)) return out, err, code - def register(self, dict, componentsMapped=True, commandsInProgress=True): + def register(self, dict, runExpensiveChecks=False): """ Return various details about the host - componentsMapped: indicates if any components are mapped to this host - commandsInProgress: indicates if any commands are in progress """ dict['hostHealth'] = {} @@ -438,9 +434,8 @@ class HostInfoWindows(HostInfo): dict['firewallRunning'] = self.checkFirewall() dict['firewallName'] = self.getFirewallName() dict['reverseLookup'] = self.checkReverseLookup() - # If commands are in progress or components are already mapped to this host - # Then do not perform certain expensive host checks - if componentsMapped or commandsInProgress: + + if not runExpensiveChecks: dict['alternatives'] = [] dict['stackFoldersAndFiles'] = [] dict['existingUsers'] = [] http://git-wip-us.apache.org/repos/asf/ambari/blob/94fed550/ambari-agent/src/main/python/ambari_agent/HostStatusReporter.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/HostStatusReporter.py b/ambari-agent/src/main/python/ambari_agent/HostStatusReporter.py new file mode 100644 index 0000000..c45b64a --- /dev/null +++ b/ambari-agent/src/main/python/ambari_agent/HostStatusReporter.py @@ -0,0 +1,71 @@ +''' +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +''' + +import logging +import threading + +from ambari_agent import Constants +from ambari_agent.HostInfo import HostInfo +from ambari_agent.Utils import Utils +from ambari_agent.Hardware import Hardware + +logger = logging.getLogger(__name__) + +class HostStatusReporter(threading.Thread): + """ + The thread reports host status to server if it changed from previous report every 'host_status_report_interval' seconds. + """ + def __init__(self, initializer_module): + self.initializer_module = initializer_module + self.report_interval = initializer_module.host_status_report_interval + self.stop_event = initializer_module.stop_event + self.config = initializer_module.config + self.host_info = HostInfo(initializer_module.config) + self.last_report = {} + threading.Thread.__init__(self) + + def run(self): + while not self.stop_event.is_set(): + try: + if self.initializer_module.is_registered: + report = self.get_report() + + if self.initializer_module.is_registered and not Utils.are_dicts_equal(report, self.last_report, keys_to_skip=["agentTimeStampAtReporting"]): + self.initializer_module.connection.send(message=report, destination=Constants.HOST_STATUS_REPORTS_ENDPOINT) + self.last_report = report + + # don't use else to avoid race condition + if not self.initializer_module.is_registered: + self.last_report = {} + except: + logger.exception("Exception in HostStatusReporter. Re-running it") + + self.stop_event.wait(self.report_interval) + + logger.info("HostStatusReporter has successfully finished") + + def get_report(self): + host_info_dict = {} + self.host_info.register(host_info_dict) + + report = { + 'agentEnv': host_info_dict, + 'mounts': Hardware.osdisks(self.config), + } + + return report \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/94fed550/ambari-agent/src/main/python/ambari_agent/InitializerModule.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/InitializerModule.py b/ambari-agent/src/main/python/ambari_agent/InitializerModule.py index 88c8b91..f0c3b43 100644 --- a/ambari-agent/src/main/python/ambari_agent/InitializerModule.py +++ b/ambari-agent/src/main/python/ambari_agent/InitializerModule.py @@ -31,6 +31,7 @@ from ambari_agent.security import AmbariStompConnection from ambari_agent.ActionQueue import ActionQueue from ambari_agent.CommandStatusDict import CommandStatusDict from ambari_agent.CustomServiceOrchestrator import CustomServiceOrchestrator +from ambari_agent.HostStatusReporter import HostStatusReporter logger = logging.getLogger() @@ -49,15 +50,17 @@ class InitializerModule: """ Initialize every property got from ambari-agent.ini """ - self.ambariConfig = AmbariConfig.get_resolved_config() + self.config = AmbariConfig.get_resolved_config() - self.server_hostname = self.ambariConfig.get('server', 'hostname') - self.secured_url_port = self.ambariConfig.get('server', 'secured_url_port') + self.server_hostname = self.config.get('server', 'hostname') + self.secured_url_port = self.config.get('server', 'secured_url_port') - self.cache_dir = self.ambariConfig.get('agent', 'cache_dir', default='/var/lib/ambari-agent/cache') - self.command_reports_interval = int(self.ambariConfig.get('agent', 'command_reports_interval', default='5')) + self.cache_dir = self.config.get('agent', 'cache_dir', default='/var/lib/ambari-agent/cache') + self.command_reports_interval = int(self.config.get('agent', 'command_reports_interval', default='5')) self.cluster_cache_dir = os.path.join(self.cache_dir, FileCache.CLUSTER_CACHE_DIRECTORY) + self.host_status_report_interval = int(self.config.get('heartbeat', 'state_interval_seconds', '60')) + def init(self): """ Initialize properties @@ -67,7 +70,7 @@ class InitializerModule: self.is_registered = False self.metadata_cache = ClusterMetadataCache(self.cluster_cache_dir) - self.topology_cache = ClusterTopologyCache(self.cluster_cache_dir, self.ambariConfig) + self.topology_cache = ClusterTopologyCache(self.cluster_cache_dir, self.config) self.configurations_cache = ClusterConfigurationCache(self.cluster_cache_dir) self.customServiceOrchestrator = CustomServiceOrchestrator(self) http://git-wip-us.apache.org/repos/asf/ambari/blob/94fed550/ambari-agent/src/main/python/ambari_agent/Register.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/Register.py b/ambari-agent/src/main/python/ambari_agent/Register.py index 4c4bf32..a6bc8c0 100644 --- a/ambari-agent/src/main/python/ambari_agent/Register.py +++ b/ambari-agent/src/main/python/ambari_agent/Register.py @@ -36,7 +36,7 @@ class Register: hostInfo = HostInfo(self.config) agentEnv = { } - hostInfo.register(agentEnv, False, False) + hostInfo.register(agentEnv, runExpensiveChecks=True) current_ping_port = self.config.get('agent','ping_port') http://git-wip-us.apache.org/repos/asf/ambari/blob/94fed550/ambari-agent/src/main/python/ambari_agent/Utils.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/Utils.py b/ambari-agent/src/main/python/ambari_agent/Utils.py index de073bb..46aa955 100644 --- a/ambari-agent/src/main/python/ambari_agent/Utils.py +++ b/ambari-agent/src/main/python/ambari_agent/Utils.py @@ -73,6 +73,36 @@ class BlockingDictionary(): class Utils(object): @staticmethod + def are_dicts_equal(d1, d2, keys_to_skip=[]): + """ + Check if two dictionaries are equal. Comparing the nested dictionaries is done as well. + """ + return Utils.are_dicts_equal_one_way(d1, d2, keys_to_skip) and Utils.are_dicts_equal_one_way(d2, d1, keys_to_skip) + @staticmethod + def are_dicts_equal_one_way(d1, d2, keys_to_skip=[]): + """ + Check if d1 has all the same keys and their values as d2 + including nested dictionaries + """ + for k in d1.keys(): + if k in keys_to_skip: + #print "skipping " + str(k) + continue + if not d2.has_key(k): + #print "don't have key="+str(k) + return False + else: + if type(d1[k]) is dict: + are_equal = Utils.are_dicts_equal_one_way(d1[k], d2[k], keys_to_skip) + if not are_equal: + return False + else: + if d1[k] != d2[k]: + #print "not equal at "+str(k) + return False + return True + + @staticmethod def make_immutable(value): if isinstance(value, ImmutableDictionary): return value http://git-wip-us.apache.org/repos/asf/ambari/blob/94fed550/ambari-agent/src/main/python/ambari_agent/main.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/main.py b/ambari-agent/src/main/python/ambari_agent/main.py index a1a98e8..56962d3 100644 --- a/ambari-agent/src/main/python/ambari_agent/main.py +++ b/ambari-agent/src/main/python/ambari_agent/main.py @@ -107,6 +107,7 @@ from ambari_agent import HeartbeatThread from ambari_agent.InitializerModule import InitializerModule from ambari_agent.ComponentStatusExecutor import ComponentStatusExecutor from ambari_agent.CommandStatusReporter import CommandStatusReporter +from ambari_agent.HostStatusReporter import HostStatusReporter logger = logging.getLogger() alerts_logger = logging.getLogger('ambari_alerts') @@ -353,6 +354,9 @@ def run_threads(initializer_module): command_status_reporter = CommandStatusReporter(initializer_module) command_status_reporter.start() + host_status_reporter = HostStatusReporter(initializer_module) + host_status_reporter.start() + initializer_module.action_queue.start() while not initializer_module.stop_event.is_set(): http://git-wip-us.apache.org/repos/asf/ambari/blob/94fed550/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py b/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py index 9d57261..f53097f 100644 --- a/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py +++ b/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py @@ -31,6 +31,7 @@ from ambari_agent import HeartbeatThread from ambari_agent.InitializerModule import InitializerModule from ambari_agent.ComponentStatusExecutor import ComponentStatusExecutor from ambari_agent.CommandStatusReporter import CommandStatusReporter +from ambari_agent.HostStatusReporter import HostStatusReporter from ambari_agent.CustomServiceOrchestrator import CustomServiceOrchestrator from mock.mock import MagicMock, patch @@ -63,9 +64,6 @@ class TestAgentStompResponses(BaseStompServerTestCase): component_status_executor = ComponentStatusExecutor(initializer_module) component_status_executor.start() - command_status_reporter = CommandStatusReporter(initializer_module) - command_status_reporter.start() - connect_frame = self.server.frames_queue.get() users_subscribe_frame = self.server.frames_queue.get() registration_frame = self.server.frames_queue.get() @@ -92,6 +90,12 @@ class TestAgentStompResponses(BaseStompServerTestCase): while not initializer_module.is_registered: time.sleep(0.1) + command_status_reporter = CommandStatusReporter(initializer_module) + command_status_reporter.start() + + host_status_reporter = HostStatusReporter(initializer_module) + host_status_reporter.start() + f = Frame(frames.MESSAGE, headers={'destination': '/user/commands'}, body=self.get_json("execution_commands.json")) self.server.topic_manager.send(f) @@ -106,16 +110,22 @@ class TestAgentStompResponses(BaseStompServerTestCase): zk_start_failed_frame = json.loads(self.server.frames_queue.get().body) action_status_in_progress_frame = json.loads(self.server.frames_queue.get().body) action_status_failed_frame = json.loads(self.server.frames_queue.get().body) + host_status_report = json.loads(self.server.frames_queue.get().body) + initializer_module.stop_event.set() f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '4'}, body=json.dumps({'id':'1'})) self.server.topic_manager.send(f) + command_status_reporter.join() heartbeat_thread.join() component_status_executor.join() - command_status_reporter.join() + host_status_reporter.join() action_queue.join() + + self.assertTrue('mounts' in host_status_report) + self.assertTrue('activeJavaProcs' in host_status_report['agentEnv']['hostHealth']) self.assertEquals(initializer_module.topology_cache['0']['hosts'][0]['hostName'], 'c6401.ambari.apache.org') self.assertEquals(initializer_module.metadata_cache['0']['status_commands_to_run'], ('STATUS',)) self.assertEquals(initializer_module.configurations_cache['0']['configurations']['zoo.cfg']['clientPort'], '2181') @@ -124,7 +134,6 @@ class TestAgentStompResponses(BaseStompServerTestCase): self.assertEquals(dn_start_in_progress_frame[0]['status'], 'IN_PROGRESS') self.assertEquals(dn_start_failed_frame[0]['status'], 'FAILED') - #============================================================================================ #============================================================================================ @@ -145,6 +154,9 @@ class TestAgentStompResponses(BaseStompServerTestCase): command_status_reporter = CommandStatusReporter(initializer_module) command_status_reporter.start() + host_status_reporter = HostStatusReporter(initializer_module) + host_status_reporter.start() + connect_frame = self.server.frames_queue.get() users_subscribe_frame = self.server.frames_queue.get() registration_frame = self.server.frames_queue.get() @@ -177,8 +189,10 @@ class TestAgentStompResponses(BaseStompServerTestCase): heartbeat_thread.join() component_status_executor.join() command_status_reporter.join() + host_status_reporter.join() action_queue.join() + def test_topology_update_and_delete(self): initializer_module = InitializerModule() heartbeat_thread = HeartbeatThread.HeartbeatThread(initializer_module) @@ -247,3 +261,4 @@ class TestAgentStompResponses(BaseStompServerTestCase): self.server.topic_manager.send(f) heartbeat_thread.join() +
