Repository: ambari Updated Branches: refs/heads/branch-3.0-perf 11f16c83a -> 670a08eed
AMBARI-21270. Ability to auto-start component without server intervention (aonishuk) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/670a08ee Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/670a08ee Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/670a08ee Branch: refs/heads/branch-3.0-perf Commit: 670a08eed3c8b45b58ab729c8592d6f46af3d4f2 Parents: 11f16c8 Author: Andrew Onishuk <[email protected]> Authored: Tue Jun 20 15:25:47 2017 +0300 Committer: Andrew Onishuk <[email protected]> Committed: Tue Jun 20 15:25:47 2017 +0300 ---------------------------------------------------------------------- .../src/main/python/ambari_agent/ActionQueue.py | 146 ++------- .../python/ambari_agent/CommandStatusDict.py | 2 +- .../ambari_agent/ComponentStatusExecutor.py | 5 +- .../main/python/ambari_agent/HeartbeatThread.py | 2 +- .../python/ambari_agent/InitializerModule.py | 4 +- .../main/python/ambari_agent/RecoveryManager.py | 303 +++++++------------ .../listeners/MetadataEventListener.py | 17 +- .../ambari_agent/TestAgentStompResponses.py | 23 +- .../dummy_files/stomp/execution_commands.json | 10 +- .../stomp/metadata_after_registration.json | 12 +- 10 files changed, 176 insertions(+), 348 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/670a08ee/ambari-agent/src/main/python/ambari_agent/ActionQueue.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py index 5632b5b..a470697 100644 --- a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py +++ b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py @@ -76,6 +76,7 @@ class ActionQueue(threading.Thread): self.backgroundCommandQueue = Queue.Queue() self.commandStatuses = initializer_module.commandStatuses self.config = initializer_module.config + self.recovery_manager = initializer_module.recovery_manager self.configTags = {} self.stop_event = initializer_module.stop_event self.tmpdir = self.config.get('agent', 'prefix') @@ -91,9 +92,6 @@ class ActionQueue(threading.Thread): command['serviceName'] = "null" if command.has_key('clusterId'): command['clusterId'] = "null" - if not command.has_key('clusterName'): - command['clusterName'] = 'null' - logger.info("Adding " + command['commandType'] + " for role " + \ command['role'] + " for service " + \ @@ -134,6 +132,7 @@ class ActionQueue(threading.Thread): try: while not self.stop_event.is_set(): self.processBackgroundQueueSafeEmpty() + self.fillRecoveryCommands() try: if self.parallel_execution == 0: command = self.commandQueue.get(True, self.EXECUTION_COMMAND_WAIT_TIME) @@ -168,6 +167,10 @@ class ActionQueue(threading.Thread): logger.info("ActionQueue thread has successfully finished") + def fillRecoveryCommands(self): + if not self.tasks_in_progress_or_pending(): + self.put(self.recovery_manager.get_recovery_commands()) + def processBackgroundQueueSafeEmpty(self): while not self.backgroundCommandQueue.empty(): try: @@ -190,40 +193,34 @@ class ActionQueue(threading.Thread): try: if commandType in [self.EXECUTION_COMMAND, self.BACKGROUND_EXECUTION_COMMAND, self.AUTO_EXECUTION_COMMAND]: try: - # TODO STOMP: fix recovery manager for execution commands - #if self.controller.recovery_manager.enabled(): - # self.controller.recovery_manager.start_execution_command() + if self.recovery_manager.enabled(): + self.recovery_manager.on_execution_command_start() + self.recovery_manager.process_execution_command(command) + self.execute_command(command) finally: - pass - #if self.controller.recovery_manager.enabled(): - # self.controller.recovery_manager.stop_execution_command() + if self.recovery_manager.enabled(): + self.recovery_manager.on_execution_command_finish() else: logger.error("Unrecognized command " + pprint.pformat(command)) except Exception: logger.exception("Exception while processing {0} command".format(commandType)) def tasks_in_progress_or_pending(self): - return_val = False - if not self.commandQueue.empty(): - return_val = True - if self.controller.recovery_manager.has_active_command(): - return_val = True - return return_val - pass + return not self.commandQueue.empty() or self.recovery_manager.has_active_command() def execute_command(self, command): ''' Executes commands of type EXECUTION_COMMAND ''' - clusterName = command['clusterName'] + clusterId = command['clusterId'] commandId = command['commandId'] isCommandBackground = command['commandType'] == self.BACKGROUND_EXECUTION_COMMAND isAutoExecuteCommand = command['commandType'] == self.AUTO_EXECUTION_COMMAND message = "Executing command with id = {commandId}, taskId = {taskId} for role = {role} of " \ - "cluster {cluster}.".format( + "cluster_id {cluster}.".format( commandId = str(commandId), taskId = str(command['taskId']), - role=command['role'], cluster=clusterName) + role=command['role'], cluster=clusterId) logger.info(message) taskId = command['taskId'] @@ -359,10 +356,10 @@ class ActionQueue(threading.Thread): roleResult['stderr'] = 'None' # let ambari know name of custom command - """ - if command['hostLevelParams'].has_key('custom_command'): - roleResult['customCommand'] = command['hostLevelParams']['custom_command'] - """ + + if command.has_key('custom_command'): + roleResult['customCommand'] = command['custom_command'] + if 'structuredOut' in commandresult: roleResult['structuredOut'] = str(json.dumps(commandresult['structuredOut'])) else: @@ -370,30 +367,6 @@ class ActionQueue(threading.Thread): # let recovery manager know the current state if status == self.COMPLETED_STATUS: - # TODO STOMP:fix recovery_manager - """ - if self.controller.recovery_manager.enabled() and command.has_key('roleCommand') \ - and self.controller.recovery_manager.configured_for_recovery(command['role']): - if command['roleCommand'] == self.ROLE_COMMAND_START: - self.controller.recovery_manager.update_current_status(command['role'], LiveStatus.LIVE_STATUS) - self.controller.recovery_manager.update_config_staleness(command['role'], False) - logger.info("After EXECUTION_COMMAND (START), with taskId=" + str(command['taskId']) + - ", current state of " + command['role'] + " to " + - self.controller.recovery_manager.get_current_status(command['role']) ) - elif command['roleCommand'] == self.ROLE_COMMAND_STOP or command['roleCommand'] == self.ROLE_COMMAND_INSTALL: - self.controller.recovery_manager.update_current_status(command['role'], LiveStatus.DEAD_STATUS) - logger.info("After EXECUTION_COMMAND (STOP/INSTALL), with taskId=" + str(command['taskId']) + - ", current state of " + command['role'] + " to " + - self.controller.recovery_manager.get_current_status(command['role']) ) - elif command['roleCommand'] == self.ROLE_COMMAND_CUSTOM_COMMAND: - if command['hostLevelParams'].has_key('custom_command') and \ - command['hostLevelParams']['custom_command'] == self.CUSTOM_COMMAND_RESTART: - self.controller.recovery_manager.update_current_status(command['role'], LiveStatus.LIVE_STATUS) - self.controller.recovery_manager.update_config_staleness(command['role'], False) - logger.info("After EXECUTION_COMMAND (RESTART), current state of " + command['role'] + " to " + - self.controller.recovery_manager.get_current_status(command['role']) ) - """ - # let ambari know that configuration tags were applied configHandler = ActualConfigHandler(self.config, self.configTags) @@ -428,18 +401,8 @@ class ActionQueue(threading.Thread): command['hostLevelParams']['clientsToUpdateConfigs']) roleResult['configurationTags'] = configHandler.read_actual_component( command['role']) - elif status == self.FAILED_STATUS: - # TODO STOMP: recovery manager - """ - if self.controller.recovery_manager.enabled() and command.has_key('roleCommand') \ - and self.controller.recovery_manager.configured_for_recovery(command['role']): - if command['roleCommand'] == self.ROLE_COMMAND_INSTALL: - self.controller.recovery_manager.update_current_status(command['role'], self.controller.recovery_manager.INSTALL_FAILED) - logger.info("After EXECUTION_COMMAND (INSTALL), with taskId=" + str(command['taskId']) + - ", current state of " + command['role'] + " to " + - self.controller.recovery_manager.get_current_status(command['role'])) - """ + self.recovery_manager.process_execution_command_result(command, status) self.commandStatuses.put_command_status(command, roleResult) def log_command_output(self, text, taskId): @@ -495,73 +458,6 @@ class ActionQueue(threading.Thread): component_status_result = self.customServiceOrchestrator.requestComponentStatus(command) return command, component_status_result - def process_status_command_result(self, result): - ''' - Executes commands of type STATUS_COMMAND - ''' - # TODO STOMP: review if we need to run this with new status commands - try: - command, component_status_result = result - cluster = command['clusterName'] - service = command['serviceName'] - component = command['componentName'] - configurations = command['configurations'] - if configurations.has_key('global'): - globalConfig = configurations['global'] - else: - globalConfig = {} - - # TODO STOMP: check why we need this - if not Script.config : - logger.debug('Setting Script.config to last status command configuration') - Script.config = command - - livestatus = LiveStatus(cluster, service, component, - globalConfig, self.config, self.configTags) - - component_extra = None - - if component_status_result['exitcode'] == 0: - component_status = LiveStatus.LIVE_STATUS - if self.controller.recovery_manager.enabled() \ - and self.controller.recovery_manager.configured_for_recovery(component): - self.controller.recovery_manager.update_current_status(component, component_status) - else: - component_status = LiveStatus.DEAD_STATUS - if self.controller.recovery_manager.enabled() \ - and self.controller.recovery_manager.configured_for_recovery(component): - if (self.controller.recovery_manager.get_current_status(component) != self.controller.recovery_manager.INSTALL_FAILED): - self.controller.recovery_manager.update_current_status(component, component_status) - - request_execution_cmd = self.controller.recovery_manager.requires_recovery(component) and \ - not self.controller.recovery_manager.command_exists(component, ActionQueue.EXECUTION_COMMAND) - - if 'structuredOut' in component_status_result: - component_extra = component_status_result['structuredOut'] - - result = livestatus.build(component_status=component_status) - if self.controller.recovery_manager.enabled(): - result['sendExecCmdDet'] = str(request_execution_cmd) - - if component_extra is not None and len(component_extra) != 0: - if component_extra.has_key('alerts'): - result['alerts'] = component_extra['alerts'] - del component_extra['alerts'] - - result['extra'] = component_extra - - logger.debug("Got live status for component " + component + \ - " of service " + str(service) + \ - " of cluster " + str(cluster)) - - logger.debug(pprint.pformat(result)) - if result is not None: - self.commandStatuses.put_command_status(command, result) - except Exception, err: - traceback.print_exc() - logger.warn(err) - pass - def status_update_callback(self): """ Actions that are executed every time when command status changes http://git-wip-us.apache.org/repos/asf/ambari/blob/670a08ee/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py b/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py index e27a243..133701f 100644 --- a/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py +++ b/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py @@ -136,7 +136,7 @@ class CommandStatusDict(): 'role': command['role'], 'actionId': command['commandId'], 'taskId': command['taskId'], - 'clusterName': command['clusterName'], + 'clusterId': command['clusterId'], 'serviceName': command['serviceName'], 'roleCommand': command['roleCommand'] } http://git-wip-us.apache.org/repos/asf/ambari/blob/670a08ee/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py b/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py index 520c97d..2ac904f 100644 --- a/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py +++ b/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py @@ -22,6 +22,7 @@ import logging import threading from ambari_agent import Constants +from ambari_agent.LiveStatus import LiveStatus from collections import defaultdict logger = logging.getLogger(__name__) @@ -33,6 +34,7 @@ class ComponentStatusExecutor(threading.Thread): self.topology_cache = initializer_module.topology_cache self.customServiceOrchestrator = initializer_module.customServiceOrchestrator self.stop_event = initializer_module.stop_event + self.recovery_manager = initializer_module.recovery_manager self.reported_component_status = defaultdict(lambda:defaultdict(lambda:None)) # component statuses which were received by server threading.Thread.__init__(self) @@ -77,7 +79,7 @@ class ComponentStatusExecutor(threading.Thread): component_status_result = self.customServiceOrchestrator.requestComponentStatus(command_dict) logger.info(component_status_result) - status = "STARTED" if component_status_result['exitcode'] == 0 else "INSTALLED" + status = LiveStatus.LIVE_STATUS if component_status_result['exitcode'] == 0 else LiveStatus.DEAD_STATUS result = { 'serviceName': service_name, @@ -90,6 +92,7 @@ class ComponentStatusExecutor(threading.Thread): if status != self.reported_component_status[component_name][command_name]: logging.info("Status for {0} has changed to {1}".format(component_name, status)) cluster_reports[cluster_id].append(result) + self.recovery_manager.handle_status_change(component_name, status) self.send_updates_to_server(cluster_reports) except: http://git-wip-us.apache.org/repos/asf/ambari/blob/670a08ee/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py b/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py index 40e5b12..dbf4006 100644 --- a/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py +++ b/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py @@ -54,7 +54,7 @@ class HeartbeatThread(threading.Thread): # listeners self.server_responses_listener = ServerResponsesListener() self.commands_events_listener = CommandsEventListener(initializer_module.action_queue) - self.metadata_events_listener = MetadataEventListener(initializer_module.metadata_cache) + self.metadata_events_listener = MetadataEventListener(initializer_module.metadata_cache, initializer_module.recovery_manager) self.topology_events_listener = TopologyEventListener(initializer_module.topology_cache) self.configuration_events_listener = ConfigurationEventListener(initializer_module.configurations_cache) self.listeners = [self.server_responses_listener, self.commands_events_listener, self.metadata_events_listener, self.topology_events_listener, self.configuration_events_listener] http://git-wip-us.apache.org/repos/asf/ambari/blob/670a08ee/ambari-agent/src/main/python/ambari_agent/InitializerModule.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/InitializerModule.py b/ambari-agent/src/main/python/ambari_agent/InitializerModule.py index f0c3b43..8de1fa5 100644 --- a/ambari-agent/src/main/python/ambari_agent/InitializerModule.py +++ b/ambari-agent/src/main/python/ambari_agent/InitializerModule.py @@ -31,7 +31,7 @@ from ambari_agent.security import AmbariStompConnection from ambari_agent.ActionQueue import ActionQueue from ambari_agent.CommandStatusDict import CommandStatusDict from ambari_agent.CustomServiceOrchestrator import CustomServiceOrchestrator -from ambari_agent.HostStatusReporter import HostStatusReporter +from ambari_agent.RecoveryManager import RecoveryManager logger = logging.getLogger() @@ -58,6 +58,7 @@ class InitializerModule: self.cache_dir = self.config.get('agent', 'cache_dir', default='/var/lib/ambari-agent/cache') self.command_reports_interval = int(self.config.get('agent', 'command_reports_interval', default='5')) self.cluster_cache_dir = os.path.join(self.cache_dir, FileCache.CLUSTER_CACHE_DIRECTORY) + self.recovery_cache_dir = os.path.join(self.cache_dir, FileCache.RECOVERY_CACHE_DIRECTORY) self.host_status_report_interval = int(self.config.get('heartbeat', 'state_interval_seconds', '60')) @@ -74,6 +75,7 @@ class InitializerModule: self.configurations_cache = ClusterConfigurationCache(self.cluster_cache_dir) self.customServiceOrchestrator = CustomServiceOrchestrator(self) + self.recovery_manager = RecoveryManager(self.recovery_cache_dir) self.commandStatuses = CommandStatusDict(self) self.action_queue = ActionQueue(self) http://git-wip-us.apache.org/repos/asf/ambari/blob/670a08ee/ambari-agent/src/main/python/ambari_agent/RecoveryManager.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/RecoveryManager.py b/ambari-agent/src/main/python/ambari_agent/RecoveryManager.py index be335f2..68dd0be 100644 --- a/ambari-agent/src/main/python/ambari_agent/RecoveryManager.py +++ b/ambari-agent/src/main/python/ambari_agent/RecoveryManager.py @@ -43,11 +43,12 @@ class RecoveryManager: COMPONENT_NAME = "componentName" ROLE = "role" TASK_ID = "taskId" + CLUSTER_ID = "clusterId" DESIRED_STATE = "desiredState" HAS_STALE_CONFIG = "hasStaleConfigs" EXECUTION_COMMAND_DETAILS = "executionCommandDetails" ROLE_COMMAND = "roleCommand" - HOST_LEVEL_PARAMS = "hostLevelParams" + COMMAND_ID = "commandId" PAYLOAD_LEVEL_DEFAULT = "DEFAULT" PAYLOAD_LEVEL_MINIMAL = "MINIMAL" PAYLOAD_LEVEL_EXECUTION_COMMAND = "EXECUTION_COMMAND" @@ -85,7 +86,6 @@ class RecoveryManager: self.retry_gap = 5 self.max_lifetime_count = 12 - self.stored_exec_commands = {} self.id = int(time.time()) self.allowed_desired_states = [self.STARTED, self.INSTALLED] self.allowed_current_states = [self.INIT, self.INSTALLED] @@ -98,6 +98,7 @@ class RecoveryManager: self.active_command_count = 0 self.paused = False self.recovery_timestamp = -1 + self.cluster_id = None if not os.path.exists(cache_dir): try: @@ -113,12 +114,12 @@ class RecoveryManager: pass - def start_execution_command(self): + def on_execution_command_start(self): with self.__active_command_lock: self.active_command_count += 1 pass - def stop_execution_command(self): + def on_execution_command_finish(self): with self.__active_command_lock: self.active_command_count -= 1 pass @@ -160,11 +161,18 @@ class RecoveryManager: pass self.statuses[component]["stale_config"] = is_config_stale - if self.statuses[component]["current"] == self.statuses[component]["desired"] and \ - self.statuses[component]["stale_config"] == False: - self.remove_command(component) pass + def handle_status_change(self, component, component_status): + if not self.enabled() or not self.configured_for_recovery(component): + return + + if component_status == LiveStatus.LIVE_STATUS: + self.update_current_status(component, component_status) + else: + if (self.get_current_status(component) != self.INSTALL_FAILED): + self.update_current_status(component, component_status) + def update_current_status(self, component, state): """ Updates the current status of a host component managed by the agent @@ -184,9 +192,6 @@ class RecoveryManager: if self.statuses[component]["current"] != state: logger.info("current status is set to %s for %s", state, component) self.statuses[component]["current"] = state - if self.statuses[component]["current"] == self.statuses[component]["desired"] and \ - self.statuses[component]["stale_config"] == False: - self.remove_command(component) pass @@ -209,9 +214,6 @@ class RecoveryManager: if self.statuses[component]["desired"] != state: logger.info("desired status is set to %s for %s", state, component) self.statuses[component]["desired"] = state - if self.statuses[component]["current"] == self.statuses[component]["desired"] and \ - self.statuses[component]["stale_config"] == False: - self.remove_command(component) pass """ @@ -354,9 +356,10 @@ class RecoveryManager: if command: self.execute(component) + logger.info("Created recovery command %s for component %s", + command[self.ROLE_COMMAND], command[self.ROLE]) commands.append(command) return commands - pass def may_execute(self, action): @@ -549,7 +552,7 @@ class RecoveryManager: pass - def update_configuration_from_registration(self, reg_resp): + def update_recovery_config(self, dictionary): """ TODO: Server sends the recovery configuration - call update_config after parsing "recoveryConfig": { @@ -573,9 +576,9 @@ class RecoveryManager: recovery_timestamp = -1 # Default value if recoveryTimestamp is not available. - if reg_resp and "recoveryConfig" in reg_resp: - logger.info("RecoverConfig = " + pprint.pformat(reg_resp["recoveryConfig"])) - config = reg_resp["recoveryConfig"] + if dictionary and "recoveryConfig" in dictionary: + logger.info("RecoverConfig = " + pprint.pformat(dictionary["recoveryConfig"])) + config = dictionary["recoveryConfig"] if "type" in config: if config["type"] in ["AUTO_INSTALL_START", "AUTO_START", "FULL"]: recovery_enabled = True @@ -681,209 +684,113 @@ class RecoveryManager: def get_unique_task_id(self): self.id += 1 return self.id - pass - - def process_status_commands(self, commands): + def process_execution_command_result(self, command, status): + """ + Update current status for the components depending on command and its status. + """ if not self.enabled(): return - - if commands and len(commands) > 0: - for command in commands: - self.store_or_update_command(command) - if self.EXECUTION_COMMAND_DETAILS in command: - logger.debug("Details to construct exec commands: " + pprint.pformat(command[self.EXECUTION_COMMAND_DETAILS])) - - pass - - - def process_execution_commands(self, commands): - if not self.enabled(): + + if not command.has_key(self.ROLE_COMMAND) or not self.configured_for_recovery(command['role']): return - - if commands and len(commands) > 0: - for command in commands: - if self.COMMAND_TYPE in command and command[self.COMMAND_TYPE] == ActionQueue.EXECUTION_COMMAND: - if self.ROLE in command: - if command[self.ROLE_COMMAND] in (ActionQueue.ROLE_COMMAND_INSTALL, ActionQueue.ROLE_COMMAND_STOP) \ - and self.configured_for_recovery(command[self.ROLE]): - self.update_desired_status(command[self.ROLE], LiveStatus.DEAD_STATUS) - logger.info("Received EXECUTION_COMMAND (STOP/INSTALL), desired state of " + command[self.ROLE] + " to " + - self.get_desired_status(command[self.ROLE]) ) - elif command[self.ROLE_COMMAND] == ActionQueue.ROLE_COMMAND_START \ - and self.configured_for_recovery(command[self.ROLE]): - self.update_desired_status(command[self.ROLE], LiveStatus.LIVE_STATUS) - logger.info("Received EXECUTION_COMMAND (START), desired state of " + command[self.ROLE] + " to " + - self.get_desired_status(command[self.ROLE]) ) - elif command[self.HOST_LEVEL_PARAMS].has_key('custom_command') and \ - command[self.HOST_LEVEL_PARAMS]['custom_command'] == ActionQueue.CUSTOM_COMMAND_RESTART \ - and self.configured_for_recovery(command[self.ROLE]): - self.update_desired_status(command[self.ROLE], LiveStatus.LIVE_STATUS) - logger.info("Received EXECUTION_COMMAND (RESTART), desired state of " + command[self.ROLE] + " to " + - self.get_desired_status(command[self.ROLE]) ) - - pass - - - def store_or_update_command(self, command): - """ - Stores command details by reading them from the STATUS_COMMAND - Update desired state as well + + if status == ActionQueue.COMPLETED_STATUS: + if command[self.ROLE_COMMAND] == self.ROLE_COMMAND_START: + self.update_current_status(command[self.ROLE], LiveStatus.LIVE_STATUS) + #self.update_config_staleness(command['role'], False) + logger.info("After EXECUTION_COMMAND (START), with taskId=" + str(command['taskId']) + + ", current state of " + command[self.ROLE] + " to " + + self.get_current_status(command[self.ROLE]) ) + elif command['roleCommand'] == ActionQueue.ROLE_COMMAND_STOP or command[self.ROLE_COMMAND] == ActionQueue.ROLE_COMMAND_INSTALL: + self.update_current_status(command[self.ROLE], LiveStatus.DEAD_STATUS) + logger.info("After EXECUTION_COMMAND (STOP/INSTALL), with taskId=" + str(command['taskId']) + + ", current state of " + command[self.ROLE] + " to " + + self.get_current_status(command[self.ROLE]) ) + elif command[self.ROLE_COMMAND] == ActionQueue.ROLE_COMMAND_CUSTOM_COMMAND: + if command.has_key('custom_command') and command['custom_command'] == ActionQueue.CUSTOM_COMMAND_RESTART: + self.update_current_status(command['role'], LiveStatus.LIVE_STATUS) + #self.update_config_staleness(command['role'], False) + logger.info("After EXECUTION_COMMAND (RESTART), current state of " + command[self.ROLE] + " to " + + self.get_current_status(command[self.ROLE]) ) + elif status == ActionQueue.FAILED_STATUS: + if command[self.ROLE_COMMAND] == ActionQueue.ROLE_COMMAND_INSTALL: + self.update_current_status(command[self.ROLE], self.INSTALL_FAILED) + logger.info("After EXECUTION_COMMAND (INSTALL), with taskId=" + str(command['taskId']) + + ", current state of " + command[self.ROLE] + " to " + + self.get_current_status(command[self.ROLE])) + + def process_execution_command(self, command): + """ + Change desired state of the component depending on the execution command triggered. """ if not self.enabled(): return + + if not self.COMMAND_TYPE in command or not command[self.COMMAND_TYPE] == ActionQueue.EXECUTION_COMMAND: + return + + if not self.ROLE in command: + return - logger.debug("Inspecting command to store/update details") - if self.COMMAND_TYPE in command and command[self.COMMAND_TYPE] == ActionQueue.STATUS_COMMAND: - payloadLevel = self.PAYLOAD_LEVEL_DEFAULT - if self.PAYLOAD_LEVEL in command: - payloadLevel = command[self.PAYLOAD_LEVEL] - - component = command[self.COMPONENT_NAME] - self.update_desired_status(component, command[self.DESIRED_STATE]) - self.update_config_staleness(component, command[self.HAS_STALE_CONFIG]) - - if payloadLevel == self.PAYLOAD_LEVEL_EXECUTION_COMMAND: - if self.EXECUTION_COMMAND_DETAILS in command: - # Store the execution command details - self.remove_command(component) - self.add_command(component, command[self.EXECUTION_COMMAND_DETAILS]) - logger.debug("Stored command details for " + component) - else: - logger.warn("Expected field " + self.EXECUTION_COMMAND_DETAILS + " unavailable.") - pass - pass - - - def get_install_command(self, component): - if self.paused: - logger.info("Recovery is paused, likely tasks waiting in pipeline for this host.") - return None - - if self.enabled(): - logger.debug("Using stored INSTALL command for %s", component) - if self.command_exists(component, ActionQueue.EXECUTION_COMMAND): - command = copy.deepcopy(self.stored_exec_commands[component]) - command[self.ROLE_COMMAND] = "INSTALL" - command[self.COMMAND_TYPE] = ActionQueue.AUTO_EXECUTION_COMMAND - command[self.TASK_ID] = self.get_unique_task_id() - return command - else: - logger.info("INSTALL command cannot be computed as details are not received from Server.") - else: - logger.info("Recovery is not enabled. INSTALL command will not be computed.") - return None - pass - - def get_stop_command(self, component): - if self.paused: - logger.info("Recovery is paused, likely tasks waiting in pipeline for this host.") - return None - - if self.enabled(): - logger.debug("Using stored STOP command for %s", component) - if self.command_exists(component, ActionQueue.EXECUTION_COMMAND): - command = copy.deepcopy(self.stored_exec_commands[component]) - command[self.ROLE_COMMAND] = "STOP" - command[self.COMMAND_TYPE] = ActionQueue.AUTO_EXECUTION_COMMAND - command[self.TASK_ID] = self.get_unique_task_id() - return command - else: - logger.info("STOP command cannot be computed as details are not received from Server.") - else: - logger.info("Recovery is not enabled. STOP command will not be computed.") - return None - pass - - def get_restart_command(self, component): - if self.paused: - logger.info("Recovery is paused, likely tasks waiting in pipeline for this host.") - return None - - if self.enabled(): - logger.debug("Using stored INSTALL command for %s", component) - if self.command_exists(component, ActionQueue.EXECUTION_COMMAND): - command = copy.deepcopy(self.stored_exec_commands[component]) - command[self.ROLE_COMMAND] = "CUSTOM_COMMAND" - command[self.COMMAND_TYPE] = ActionQueue.AUTO_EXECUTION_COMMAND - command[self.TASK_ID] = self.get_unique_task_id() - command[self.HOST_LEVEL_PARAMS]['custom_command'] = 'RESTART' - return command - else: - logger.info("RESTART command cannot be computed as details are not received from Server.") - else: - logger.info("Recovery is not enabled. RESTART command will not be computed.") - return None - pass - - - def get_start_command(self, component): + if command[self.ROLE_COMMAND] in (ActionQueue.ROLE_COMMAND_INSTALL, ActionQueue.ROLE_COMMAND_STOP) \ + and self.configured_for_recovery(command[self.ROLE]): + self.update_desired_status(command[self.ROLE], LiveStatus.DEAD_STATUS) + logger.info("Received EXECUTION_COMMAND (STOP/INSTALL), desired state of " + command[self.ROLE] + " to " + + self.get_desired_status(command[self.ROLE]) ) + elif command[self.ROLE_COMMAND] == ActionQueue.ROLE_COMMAND_START \ + and self.configured_for_recovery(command[self.ROLE]): + self.update_desired_status(command[self.ROLE], LiveStatus.LIVE_STATUS) + logger.info("Received EXECUTION_COMMAND (START), desired state of " + command[self.ROLE] + " to " + + self.get_desired_status(command[self.ROLE]) ) + elif command.has_key('custom_command') and \ + command['custom_command'] == ActionQueue.CUSTOM_COMMAND_RESTART \ + and self.configured_for_recovery(command[self.ROLE]): + self.update_desired_status(command[self.ROLE], LiveStatus.LIVE_STATUS) + logger.info("Received EXECUTION_COMMAND (RESTART), desired state of " + command[self.ROLE] + " to " + + self.get_desired_status(command[self.ROLE]) ) + + def get_command(self, component, command_name): + """ + Get command dictionary by component name and command_name + """ if self.paused: logger.info("Recovery is paused, likely tasks waiting in pipeline for this host.") return None if self.enabled(): - logger.debug("Using stored START command for %s", component) - if self.command_exists(component, ActionQueue.EXECUTION_COMMAND): - command = copy.deepcopy(self.stored_exec_commands[component]) - command[self.ROLE_COMMAND] = "START" - command[self.COMMAND_TYPE] = ActionQueue.AUTO_EXECUTION_COMMAND - command[self.TASK_ID] = self.get_unique_task_id() - return command - else: - logger.info("START command cannot be computed as details are not received from Server.") + command_id = self.get_unique_task_id() + command = { + self.CLUSTER_ID: self.cluster_id, + self.ROLE_COMMAND: command_name, + self.COMMAND_TYPE: ActionQueue.AUTO_EXECUTION_COMMAND, + self.TASK_ID: command_id, + self.ROLE: component, + self.COMMAND_ID: command_id + } + return command else: logger.info("Recovery is not enabled. START command will not be computed.") return None - pass - - - def command_exists(self, component, command_type): - if command_type == ActionQueue.EXECUTION_COMMAND: - self.remove_stale_command(component) - if component in self.stored_exec_commands: - return True - - return False - pass + def get_restart_command(self, component): + command = self.get_command(component, "CUSTOM_COMMAND") - def remove_stale_command(self, component): - component_update_key = self.COMPONENT_UPDATE_KEY_FORMAT.format(component) - if component in self.stored_exec_commands: - insert_time = self.stored_exec_commands[component_update_key] - age = self._now_() - insert_time - if self.COMMAND_REFRESH_DELAY_SEC < age: - logger.debug("Removing stored command for component : " + str(component) + " as its " + str(age) + " sec old") - self.remove_command(component) - pass + if command is not None: + command[self.ROLE_COMMAND] = "CUSTOM_COMMAND" + command['custom_command'] = 'RESTART' + return command - def remove_command(self, component): - if component in self.stored_exec_commands: - self.__status_lock.acquire() - try: - component_update_key = self.COMPONENT_UPDATE_KEY_FORMAT.format(component) - del self.stored_exec_commands[component] - del self.stored_exec_commands[component_update_key] - logger.debug("Removed stored command for component : " + str(component)) - return True - finally: - self.__status_lock.release() - return False - + def get_install_command(self, component): + return self.get_command(component, "INSTALL") - def add_command(self, component, command): - self.__status_lock.acquire() - try: - component_update_key = self.COMPONENT_UPDATE_KEY_FORMAT.format(component) - self.stored_exec_commands[component] = command - self.stored_exec_commands[component_update_key] = self._now_() - logger.debug("Added command for component : " + str(component)) - finally: - self.__status_lock.release() + def get_stop_command(self, component): + return self.get_command(component, "STOP") + def get_start_command(self, component): + return self.get_command(component, "START") def _read_int_(self, value, default_value=0): int_value = default_value http://git-wip-us.apache.org/repos/asf/ambari/blob/670a08ee/ambari-agent/src/main/python/ambari_agent/listeners/MetadataEventListener.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/listeners/MetadataEventListener.py b/ambari-agent/src/main/python/ambari_agent/listeners/MetadataEventListener.py index 788d381..364d8af 100644 --- a/ambari-agent/src/main/python/ambari_agent/listeners/MetadataEventListener.py +++ b/ambari-agent/src/main/python/ambari_agent/listeners/MetadataEventListener.py @@ -32,8 +32,9 @@ class MetadataEventListener(EventListener): """ Listener of Constants.METADATA_TOPIC events from server. """ - def __init__(self, configuration_cache): - self.topology_cache = configuration_cache + def __init__(self, metadata_cache, recovery_manager): + self.metadata_cache = metadata_cache + self.recovery_manager = recovery_manager def on_event(self, headers, message): """ @@ -46,8 +47,16 @@ class MetadataEventListener(EventListener): if message == {}: return - self.topology_cache.rewrite_cache(message['clusters']) - self.topology_cache.hash = message['hash'] + self.metadata_cache.rewrite_cache(message['clusters']) + self.metadata_cache.hash = message['hash'] + + # FIXME: Recovery manager does not support multiple cluster as of now. + cluster_id = message['clusters'].keys()[0] + + if 'recoveryConfig' in message['clusters'][cluster_id]: + logging.info("Updating recoveryConfig from metadata") + self.recovery_manager.update_recovery_config(self.metadata_cache[cluster_id]) + self.recovery_manager.cluster_id = cluster_id def get_handled_path(self): return Constants.METADATA_TOPIC \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/670a08ee/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py b/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py index f53097f..c41f87e 100644 --- a/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py +++ b/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py @@ -104,12 +104,14 @@ class TestAgentStompResponses(BaseStompServerTestCase): metadata_subscribe_frame = self.server.frames_queue.get() topologies_subscribe_frame = self.server.frames_queue.get() heartbeat_frame = self.server.frames_queue.get() - dn_start_in_progress_frame = json.loads(self.server.frames_queue.get().body) - dn_start_failed_frame = json.loads(self.server.frames_queue.get().body) - zk_start_in_progress_frame = json.loads(self.server.frames_queue.get().body) - zk_start_failed_frame = json.loads(self.server.frames_queue.get().body) + dn_install_in_progress_frame = json.loads(self.server.frames_queue.get().body) + dn_install_failed_frame = json.loads(self.server.frames_queue.get().body) + zk_install_in_progress_frame = json.loads(self.server.frames_queue.get().body) + zk_install_failed_frame = json.loads(self.server.frames_queue.get().body) action_status_in_progress_frame = json.loads(self.server.frames_queue.get().body) action_status_failed_frame = json.loads(self.server.frames_queue.get().body) + dn_recovery_in_progress_frame = json.loads(self.server.frames_queue.get().body) + dn_recovery_failed_frame = json.loads(self.server.frames_queue.get().body) host_status_report = json.loads(self.server.frames_queue.get().body) initializer_module.stop_event.set() @@ -129,10 +131,13 @@ class TestAgentStompResponses(BaseStompServerTestCase): self.assertEquals(initializer_module.topology_cache['0']['hosts'][0]['hostName'], 'c6401.ambari.apache.org') self.assertEquals(initializer_module.metadata_cache['0']['status_commands_to_run'], ('STATUS',)) self.assertEquals(initializer_module.configurations_cache['0']['configurations']['zoo.cfg']['clientPort'], '2181') - self.assertEquals(dn_start_in_progress_frame[0]['roleCommand'], 'START') - self.assertEquals(dn_start_in_progress_frame[0]['role'], 'DATANODE') - self.assertEquals(dn_start_in_progress_frame[0]['status'], 'IN_PROGRESS') - self.assertEquals(dn_start_failed_frame[0]['status'], 'FAILED') + self.assertEquals(dn_install_in_progress_frame[0]['roleCommand'], 'INSTALL') + self.assertEquals(dn_install_in_progress_frame[0]['role'], 'DATANODE') + self.assertEquals(dn_install_in_progress_frame[0]['status'], 'IN_PROGRESS') + self.assertEquals(dn_install_failed_frame[0]['status'], 'FAILED') + self.assertEquals(dn_recovery_in_progress_frame[0]['roleCommand'], 'INSTALL') + self.assertEquals(dn_recovery_in_progress_frame[0]['role'], 'DATANODE') + self.assertEquals(dn_recovery_in_progress_frame[0]['status'], 'IN_PROGRESS') #============================================================================================ #============================================================================================ @@ -253,7 +258,7 @@ class TestAgentStompResponses(BaseStompServerTestCase): self.assertEquals(json_topology, json_excepted_lopology) #self.assertEquals(initializer_module.topology_cache, self.get_dict_from_file("topology_cache_expected.json")) - self.assert_with_retries(is_json_equal, tries=40, try_sleep=0.1) + self.assert_with_retries(is_json_equal, tries=80, try_sleep=0.1) initializer_module.stop_event.set() http://git-wip-us.apache.org/repos/asf/ambari/blob/670a08ee/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/execution_commands.json ---------------------------------------------------------------------- diff --git a/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/execution_commands.json b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/execution_commands.json index 6e84319..075699e 100644 --- a/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/execution_commands.json +++ b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/execution_commands.json @@ -9,8 +9,7 @@ "serviceName":"HDFS", "role":"DATANODE", "commandType":"EXECUTION_COMMAND", - "roleCommand":"START", - "clusterName": "c1", + "roleCommand":"INSTALL", "clusterId": "0", "configuration_credentials":{ @@ -19,7 +18,7 @@ "service_package_folder":"common-services/HDFS/2.1.0.2.0/package", "hooks_folder":"HDP/2.0.6/hooks", "script":"scripts/datanode.py", - "phase":"INITIAL_START", + "phase":"INITIAL_INSTALL", "max_duration_for_retries":"600", "command_retry_enabled":"false", "command_timeout":"1200", @@ -35,8 +34,7 @@ "serviceName":"ZOOKEEPER", "role":"ZOOKEEPER_SERVER", "commandType":"EXECUTION_COMMAND", - "roleCommand":"START", - "clusterName": "c1", + "roleCommand":"INSTALL", "configuration_credentials":{ }, @@ -44,7 +42,7 @@ "service_package_folder":"common-services/ZOOKEEPER/3.4.5/package", "hooks_folder":"HDP/2.0.6/hooks", "script":"scripts/datanode.py", - "phase":"INITIAL_START", + "phase":"INITIAL_INSTALL", "max_duration_for_retries":"600", "command_retry_enabled":"false", "command_timeout":"1200", http://git-wip-us.apache.org/repos/asf/ambari/blob/670a08ee/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/metadata_after_registration.json ---------------------------------------------------------------------- diff --git a/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/metadata_after_registration.json b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/metadata_after_registration.json index f60b49a..6462ccf 100644 --- a/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/metadata_after_registration.json +++ b/ambari-agent/src/test/python/ambari_agent/dummy_files/stomp/metadata_after_registration.json @@ -25,7 +25,8 @@ "ambari_db_rca_driver": "org.postgresql.Driver", "java_home": "/usr/jdk64/jdk1.8.0_112", "user_list": "[\"zookeeper\",\"ambari-qa\",\"hdfs\",\"yarn\",\"mapred\"]", - "hooks_folder": "HDP/2.0.6/hooks" + "hooks_folder": "HDP/2.0.6/hooks", + "cluster_name": "cl1" }, "serviceLevelParams": { "HDFS": { @@ -37,7 +38,14 @@ }, "status_commands_to_run": [ "STATUS" - ] + ], + "recoveryConfig": { + "type" : "AUTO_INSTALL_START", + "maxCount" : 10, + "windowInMinutes" : 60, + "components" : "NAMENODE,DATANODE", + "recoveryTimestamp" : 1458150424380 + } } } } \ No newline at end of file
