Repository: ambari Updated Branches: refs/heads/branch-3.0-perf 917898cdb -> d4cd91436
AMBARI-21401. Support cancel commands, some changes to status/command reports, bugfixes (aonishuk) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/d4cd9143 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/d4cd9143 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/d4cd9143 Branch: refs/heads/branch-3.0-perf Commit: d4cd914367bf2e1716454d5dd5775ddb989f0382 Parents: 917898c Author: Andrew Onishuk <[email protected]> Authored: Wed Jul 5 11:52:45 2017 +0300 Committer: Andrew Onishuk <[email protected]> Committed: Wed Jul 5 11:52:45 2017 +0300 ---------------------------------------------------------------------- .../src/main/python/ambari_agent/ActionQueue.py | 4 +-- .../python/ambari_agent/CommandStatusDict.py | 12 +++++--- .../ambari_agent/CommandStatusReporter.py | 4 ++- .../ambari_agent/ComponentStatusExecutor.py | 2 +- .../ambari_agent/CustomServiceOrchestrator.py | 18 +++++------ .../src/main/python/ambari_agent/Utils.py | 17 +++++++++++ .../listeners/CommandsEventListener.py | 12 ++++++-- .../ambari_agent/TestAgentStompResponses.py | 14 ++++----- .../dummy_files/stomp/execution_commands.json | 13 ++------ .../stomp/topology_add_component.json | 8 +++-- .../stomp/topology_cache_expected.json | 18 +++++++++-- .../dummy_files/stomp/topology_create.json | 32 +++++++++++++++----- 12 files changed, 101 insertions(+), 53 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/d4cd9143/ambari-agent/src/main/python/ambari_agent/ActionQueue.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py index a470697..f2fc253 100644 --- a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py +++ b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py @@ -90,7 +90,7 @@ class ActionQueue(threading.Thread): for command in commands: if not command.has_key('serviceName'): command['serviceName'] = "null" - if command.has_key('clusterId'): + if not command.has_key('clusterId'): command['clusterId'] = "null" logger.info("Adding " + command['commandType'] + " for role " + \ @@ -302,7 +302,7 @@ class ActionQueue(threading.Thread): retryDuration -= delay # allow one last attempt commandresult['stderr'] += "\n\nCommand failed. Retrying command execution ...\n\n" logger.info("Retrying command with taskId = {cid} after a wait of {delay}".format(cid=taskId, delay=delay)) - command['commandBeingRetried'] = "true" + command['agentLevelParams']['commandBeingRetried'] = "true" time.sleep(delay) continue else: http://git-wip-us.apache.org/repos/asf/ambari/blob/d4cd9143/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py b/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py index 133701f..ff526e3 100644 --- a/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py +++ b/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py @@ -21,6 +21,7 @@ limitations under the License. import logging import threading import copy +from collections import defaultdict from Grep import Grep from ambari_agent import Constants @@ -55,10 +56,10 @@ class CommandStatusDict(): self.current_state[key] = (command, new_report) self.reported_reports.discard(key) - self.force_update_to_server([new_report]) + self.force_update_to_server(command['clusterId'], new_report) - def force_update_to_server(self, reports): - self.initializer_module.connection.send(message=reports, destination=Constants.COMMANDS_STATUS_REPORTS_ENDPOINT) + def force_update_to_server(self, cluster_id, report): + self.initializer_module.connection.send(message={'clusters':{cluster_id: [report]}}, destination=Constants.COMMANDS_STATUS_REPORTS_ENDPOINT) def get_command_status(self, taskId): with self.lock: @@ -74,13 +75,14 @@ class CommandStatusDict(): self.generated_reports = [] from ActionQueue import ActionQueue with self.lock: # Synchronized - resultReports = [] + resultReports = defaultdict(lambda:[]) for key, item in self.current_state.items(): command = item[0] report = item[1] + cluster_id = report['clusterId'] if command ['commandType'] in [ActionQueue.EXECUTION_COMMAND, ActionQueue.BACKGROUND_EXECUTION_COMMAND]: if (report['status']) != ActionQueue.IN_PROGRESS_STATUS: - resultReports.append(report) + resultReports[cluster_id].append(report) self.reported_reports.add(key) else: in_progress_report = self.generate_in_progress_report(command, report) http://git-wip-us.apache.org/repos/asf/ambari/blob/d4cd9143/ambari-agent/src/main/python/ambari_agent/CommandStatusReporter.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/CommandStatusReporter.py b/ambari-agent/src/main/python/ambari_agent/CommandStatusReporter.py index 6ee4474..5e5eb0d 100644 --- a/ambari-agent/src/main/python/ambari_agent/CommandStatusReporter.py +++ b/ambari-agent/src/main/python/ambari_agent/CommandStatusReporter.py @@ -44,8 +44,10 @@ class CommandStatusReporter(threading.Thread): try: if self.initializer_module.is_registered: report = self.commandStatuses.generate_report() + if report: - self.initializer_module.connection.send(message=report, destination=Constants.COMMANDS_STATUS_REPORTS_ENDPOINT) + self.initializer_module.connection.send(message={'clusters': report}, destination=Constants.COMMANDS_STATUS_REPORTS_ENDPOINT) + self.commandStatuses.clear_reported_reports() except: logger.exception("Exception in CommandStatusReporter. Re-running it") http://git-wip-us.apache.org/repos/asf/ambari/blob/d4cd9143/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py b/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py index 2ac904f..5e53ed8 100644 --- a/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py +++ b/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py @@ -105,7 +105,7 @@ class ComponentStatusExecutor(threading.Thread): if not cluster_reports or not self.initializer_module.is_registered: return - self.initializer_module.connection.send(message=cluster_reports, destination=Constants.COMPONENT_STATUS_REPORTS_ENDPOINT) + self.initializer_module.connection.send(message={'clusters': cluster_reports}, destination=Constants.COMPONENT_STATUS_REPORTS_ENDPOINT) for cluster_id, reports in cluster_reports.iteritems(): for report in reports: http://git-wip-us.apache.org/repos/asf/ambari/blob/d4cd9143/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py b/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py index c0b20ed..6389878 100644 --- a/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py +++ b/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py @@ -31,6 +31,7 @@ from AgentException import AgentException from PythonExecutor import PythonExecutor from resource_management.libraries.functions.log_process_information import log_process_information from resource_management.core.utils import PasswordString +from ambari_agent.Utils import Utils import subprocess import Constants import hostname @@ -310,9 +311,9 @@ class CustomServiceOrchestrator(): """ try: command = self.generate_command(command_header) - script_type = command['script_type'] # TODO STOMP: take this from command? - script = command['componentLevelParams']['script'] - timeout = int('300') # TODO STOMP: fix it + script_type = command['commandParams']['script_type'] + script = command['commandParams']['script'] + timeout = int(command['commandParams']['command_timeout']) server_url_prefix = command['clusterLevelParams']['jdk_location'] @@ -366,8 +367,7 @@ class CustomServiceOrchestrator(): credentialStoreEnabled = (command['serviceLevelParams']['credentialStoreEnabled'] == "true") if credentialStoreEnabled == True: - # TODO STOMP: fix this with execution commands - if 'commandBeingRetried' not in command or command['commandBeingRetried'] != "true": + if 'commandBeingRetried' not in command['agentLevelParams'] or command['agentLevelParams']['commandBeingRetried'] != "true": self.generateJceks(command) else: logger.info("Skipping generation of jceks files as this is a retry of the command") @@ -469,12 +469,9 @@ class CustomServiceOrchestrator(): 'serviceLevelParams': metadata_cache.serviceLevelParams[service_name], 'hostLevelParams': host_level_params_cache, 'componentLevelParams': component_dict.componentLevelParams, - 'script_type': self.SCRIPT_TYPE_PYTHON + 'commandParams': component_dict.commandParams } command_dict.update(configurations_cache) - #command_dict['componentLevelParams']['script'] = component_dict.statusCommandParams['script'] - #command_dict['serviceLevelParams']['hooks_folder'] = metadata_cache['hooks_folder'] - #command_dict['serviceLevelParams']['service_package_folder'] = component_dict.statusCommandParams['service_package_folder'] command_dict['agentLevelParams'] = { 'public_hostname': self.public_fqdn, @@ -486,9 +483,8 @@ class CustomServiceOrchestrator(): "use_system_proxy_settings": self.config.use_system_proxy_setting() } } - command = copy.copy(command_header) - command.update(command_dict) + command = Utils.update_nested(command_dict, command_header) return command def requestComponentStatus(self, command_header): http://git-wip-us.apache.org/repos/asf/ambari/blob/d4cd9143/ambari-agent/src/main/python/ambari_agent/Utils.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/Utils.py b/ambari-agent/src/main/python/ambari_agent/Utils.py index 46aa955..af70d77 100644 --- a/ambari-agent/src/main/python/ambari_agent/Utils.py +++ b/ambari-agent/src/main/python/ambari_agent/Utils.py @@ -19,6 +19,7 @@ limitations under the License. """ import os import threading +import collections from functools import wraps from ambari_agent.ExitHelper import ExitHelper @@ -103,6 +104,22 @@ class Utils(object): return True @staticmethod + def update_nested(d, u): + """ + Update the dictionary 'd' and its sub-dictionaries with values of dictionary 'u' and its sub-dictionaries. + """ + for k, v in u.iteritems(): + if isinstance(d, collections.Mapping): + if isinstance(v, collections.Mapping): + r = Utils.update_nested(d.get(k, {}), v) + d[k] = r + else: + d[k] = u[k] + else: + d = {k: u[k]} + return d + + @staticmethod def make_immutable(value): if isinstance(value, ImmutableDictionary): return value http://git-wip-us.apache.org/repos/asf/ambari/blob/d4cd9143/ambari-agent/src/main/python/ambari_agent/listeners/CommandsEventListener.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/listeners/CommandsEventListener.py b/ambari-agent/src/main/python/ambari_agent/listeners/CommandsEventListener.py index c3839cb..ae8d400 100644 --- a/ambari-agent/src/main/python/ambari_agent/listeners/CommandsEventListener.py +++ b/ambari-agent/src/main/python/ambari_agent/listeners/CommandsEventListener.py @@ -42,12 +42,18 @@ class CommandsEventListener(EventListener): """ "" commands = [] + cancel_commands = [] for cluster_id in message['clusters'].keys(): cluster_dict = message['clusters'][cluster_id] - for command in cluster_dict['commands']: - commands.append(command) - self.action_queue.put(commands) + if 'commands' in cluster_dict: + commands += cluster_dict['commands'] + if 'cancelCommands' in cluster_dict: + cancel_commands += cluster_dict['cancelCommands'] + + with self.action_queue.lock: + self.action_queue.cancel(cancel_commands) + self.action_queue.put(commands) def get_handled_path(self): return Constants.COMMANDS_TOPIC \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/d4cd9143/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py b/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py index feaf7dd..26c40b3 100644 --- a/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py +++ b/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py @@ -136,13 +136,13 @@ class TestAgentStompResponses(BaseStompServerTestCase): self.assertEquals(initializer_module.topology_cache['0']['hosts'][0]['hostName'], 'c6401.ambari.apache.org') self.assertEquals(initializer_module.metadata_cache['0']['status_commands_to_run'], ('STATUS',)) self.assertEquals(initializer_module.configurations_cache['0']['configurations']['zoo.cfg']['clientPort'], '2181') - self.assertEquals(dn_install_in_progress_frame[0]['roleCommand'], 'INSTALL') - self.assertEquals(dn_install_in_progress_frame[0]['role'], 'DATANODE') - self.assertEquals(dn_install_in_progress_frame[0]['status'], 'IN_PROGRESS') - self.assertEquals(dn_install_failed_frame[0]['status'], 'FAILED') - self.assertEquals(dn_recovery_in_progress_frame[0]['roleCommand'], 'INSTALL') - self.assertEquals(dn_recovery_in_progress_frame[0]['role'], 'DATANODE') - self.assertEquals(dn_recovery_in_progress_frame[0]['status'], 'IN_PROGRESS') + self.assertEquals(dn_install_in_progress_frame['clusters']['0'][0]['roleCommand'], 'INSTALL') + self.assertEquals(dn_install_in_progress_frame['clusters']['0'][0]['role'], 'DATANODE') + self.assertEquals(dn_install_in_progress_frame['clusters']['0'][0]['status'], 'IN_PROGRESS') + self.assertEquals(dn_install_failed_frame['clusters']['0'][0]['status'], 'FAILED') + self.assertEquals(dn_recovery_in_progress_frame['clusters']['0'][0]['roleCommand'], 'INSTALL') + self.assertEquals(dn_recovery_in_progress_frame['clusters']['0'][0]['role'], 'DATANODE') + self.assertEquals(dn_recovery_in_progress_frame['clusters']['0'][0]['status'], 'IN_PROGRESS') #============================================================================================ #============================================================================================ http://git-wip-us.apache.org/repos/asf/ambari/blob/d4cd9143/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/execution_commands.json ---------------------------------------------------------------------- diff --git a/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/execution_commands.json b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/execution_commands.json index 075699e..76dac1b 100644 --- a/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/execution_commands.json +++ b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/execution_commands.json @@ -15,22 +15,15 @@ }, "commandParams":{ - "service_package_folder":"common-services/HDFS/2.1.0.2.0/package", - "hooks_folder":"HDP/2.0.6/hooks", - "script":"scripts/datanode.py", - "phase":"INITIAL_INSTALL", - "max_duration_for_retries":"600", - "command_retry_enabled":"false", - "command_timeout":"1200", - "refresh_topology":"True", - "script_type":"PYTHON" + "command_retry_enabled": "true", + "refresh_topology":"true" } }, { "requestId":6, "taskId":9, "commandId":0, - "clusterId": "null", + "clusterId": "0", "serviceName":"ZOOKEEPER", "role":"ZOOKEEPER_SERVER", "commandType":"EXECUTION_COMMAND", http://git-wip-us.apache.org/repos/asf/ambari/blob/d4cd9143/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_add_component.json ---------------------------------------------------------------------- diff --git a/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_add_component.json b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_add_component.json index 1514516..d6deecf 100644 --- a/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_add_component.json +++ b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_add_component.json @@ -14,8 +14,12 @@ ], "componentLevelParams": { "unlimited_key_jce_required": "false", - "clientsToUpdateConfigs": "[\"*\"]", - "script":"scripts/snamenode.py" + "clientsToUpdateConfigs": "[\"*\"]" + }, + "commandParams": { + "script": "scripts/namenode.py", + "script_type":"PYTHON", + "command_timeout":"1200" } } ] http://git-wip-us.apache.org/repos/asf/ambari/blob/d4cd9143/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_cache_expected.json ---------------------------------------------------------------------- diff --git a/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_cache_expected.json b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_cache_expected.json index ff2b3fd..08bccd7 100644 --- a/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_cache_expected.json +++ b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_cache_expected.json @@ -2,9 +2,13 @@ "0": { "components": [ { + "commandParams": { + "command_timeout": "1200", + "script": "scripts/datanode.py", + "script_type": "PYTHON" + }, "componentLevelParams": { "clientsToUpdateConfigs": "[\"*\"]", - "script": "scripts/namenode.py", "unlimited_key_jce_required": "false" }, "componentName": "DATANODE", @@ -15,9 +19,13 @@ "version": "2.6.0.3-8" }, { + "commandParams": { + "command_timeout": "1200", + "script": "scripts/hdfs_client.py", + "script_type": "PYTHON" + }, "componentLevelParams": { "clientsToUpdateConfigs": "[\"*\"]", - "script": "scripts/hdfs_client.py", "unlimited_key_jce_required": "false" }, "componentName": "HDFS_CLIENT", @@ -29,9 +37,13 @@ "version": "2.6.0.3-8" }, { + "commandParams": { + "command_timeout": "1200", + "script": "scripts/namenode.py", + "script_type": "PYTHON" + }, "componentLevelParams": { "clientsToUpdateConfigs": "[\"*\"]", - "script": "scripts/snamenode.py", "unlimited_key_jce_required": "false" }, "componentName": "SECONDARY_NAMENODE", http://git-wip-us.apache.org/repos/asf/ambari/blob/d4cd9143/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_create.json ---------------------------------------------------------------------- diff --git a/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_create.json b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_create.json index 6df4bc3..758fe6d 100644 --- a/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_create.json +++ b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/topology_create.json @@ -13,8 +13,12 @@ ], "componentLevelParams": { "unlimited_key_jce_required": "false", - "clientsToUpdateConfigs": "[\"*\"]", - "script": "scripts/namenode.py" + "clientsToUpdateConfigs": "[\"*\"]" + }, + "commandParams": { + "script": "scripts/namenode.py", + "script_type":"PYTHON", + "command_timeout":"1200" } }, { @@ -27,8 +31,12 @@ ], "componentLevelParams": { "unlimited_key_jce_required": "false", - "clientsToUpdateConfigs": "[\"*\"]", - "script": "scripts/namenode.py" + "clientsToUpdateConfigs": "[\"*\"]" + }, + "commandParams": { + "script": "scripts/datanode.py", + "script_type":"PYTHON", + "command_timeout":"1200" } }, { @@ -40,8 +48,12 @@ ], "componentLevelParams": { "unlimited_key_jce_required": "false", - "clientsToUpdateConfigs": "[\"*\"]", - "script": "scripts/hdfs_client.py" + "clientsToUpdateConfigs": "[\"*\"]" + }, + "commandParams": { + "script": "scripts/hdfs_client.py", + "script_type":"PYTHON", + "command_timeout":"1200" } } ], @@ -71,8 +83,12 @@ ], "componentLevelParams": { "unlimited_key_jce_required": "false", - "clientsToUpdateConfigs": "[\"*\"]", - "script": "scripts/namenode.py" + "clientsToUpdateConfigs": "[\"*\"]" + }, + "commandParams": { + "script": "scripts/namenode.py", + "script_type":"PYTHON", + "command_timeout":"1200" } } ],
