Repository: ambari Updated Branches: refs/heads/trunk 43ae930b0 -> a3a0ae041
AMBARI-10029. Node recovery support - phase 2 Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/a3a0ae04 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/a3a0ae04 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/a3a0ae04 Branch: refs/heads/trunk Commit: a3a0ae041a43aaa1ffdc8f9247338bb68dd5675f Parents: 43ae930 Author: Sumit Mohanty <[email protected]> Authored: Tue Apr 28 17:00:08 2015 -0700 Committer: Sumit Mohanty <[email protected]> Committed: Tue Apr 28 17:48:29 2015 -0700 ---------------------------------------------------------------------- .../src/main/python/ambari_agent/ActionQueue.py | 2 +- .../src/main/python/ambari_agent/Controller.py | 5 +- .../src/main/python/ambari_agent/LiveStatus.py | 8 +- .../main/python/ambari_agent/RecoveryManager.py | 171 +++++++++++++++---- .../test/python/ambari_agent/TestActionQueue.py | 2 +- .../test/python/ambari_agent/TestController.py | 10 +- .../test/python/ambari_agent/TestLiveStatus.py | 10 +- .../python/ambari_agent/TestRecoveryManager.py | 86 +++++++++- .../server/actionmanager/ActionScheduler.java | 21 +++ .../apache/ambari/server/agent/ActionQueue.java | 32 ++++ .../ambari/server/agent/HeartBeatHandler.java | 6 + .../ambari/server/agent/HeartBeatResponse.java | 11 ++ .../ambari/server/agent/HeartbeatMonitor.java | 1 + .../ambari/server/agent/StatusCommand.java | 11 ++ .../ambari/server/agent/TestActionQueue.java | 118 ++++++++++++- .../server/agent/TestHeartbeatMonitor.java | 10 +- 16 files changed, 454 insertions(+), 50 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/a3a0ae04/ambari-agent/src/main/python/ambari_agent/ActionQueue.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py index c7286bc..bdaefd0 100644 --- a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py +++ b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py @@ -400,7 +400,7 @@ class ActionQueue(threading.Thread): if component_status_result.has_key('structuredOut'): component_extra = component_status_result['structuredOut'] - result = livestatus.build(forsed_component_status= component_status) + result = livestatus.build(forced_component_status= component_status) if self.controller.recovery_manager.enabled(): result['sendExecCmdDet'] = str(request_execution_cmd) http://git-wip-us.apache.org/repos/asf/ambari/blob/a3a0ae04/ambari-agent/src/main/python/ambari_agent/Controller.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/Controller.py b/ambari-agent/src/main/python/ambari_agent/Controller.py index 959c213..9ebb83a 100644 --- a/ambari-agent/src/main/python/ambari_agent/Controller.py +++ b/ambari-agent/src/main/python/ambari_agent/Controller.py @@ -233,7 +233,6 @@ class Controller(threading.Thread): logger.debug("Sending Heartbeat (id = %s): %s", self.responseId, data) response = self.sendRequest(self.heartbeatUrl, data) - exitStatus = 0 if 'exitstatus' in response.keys(): exitStatus = int(response['exitstatus']) @@ -248,6 +247,10 @@ class Controller(threading.Thread): if 'hasMappedComponents' in response.keys(): self.hasMappedComponents = response['hasMappedComponents'] is not False + if 'hasPendingTasks' in response.keys(): + self.recovery_manager.set_paused(response['hasPendingTasks']) + + if 'registrationCommand' in response.keys(): # check if the registration command is None. If none skip if response['registrationCommand'] is not None: http://git-wip-us.apache.org/repos/asf/ambari/blob/a3a0ae04/ambari-agent/src/main/python/ambari_agent/LiveStatus.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/LiveStatus.py b/ambari-agent/src/main/python/ambari_agent/LiveStatus.py index ec01ee7..c56f6f3 100644 --- a/ambari-agent/src/main/python/ambari_agent/LiveStatus.py +++ b/ambari-agent/src/main/python/ambari_agent/LiveStatus.py @@ -51,16 +51,16 @@ class LiveStatus: #TODO: Should also check belonging of server to cluster return component['serviceName'] == self.service - def build(self, forsed_component_status = None): + def build(self, forced_component_status = None): """ - If forsed_component_status is explicitly defined, than StatusCheck methods are + If forced_component_status is explicitly defined, than StatusCheck methods are not used. This feature has been added to support custom (ver 2.0) services. """ global SERVICES, CLIENT_COMPONENTS, COMPONENTS, LIVE_STATUS, DEAD_STATUS component = {"serviceName" : self.service, "componentName" : self.component} - if forsed_component_status: # If already determined - status = forsed_component_status # Nothing to do + if forced_component_status: # If already determined + status = forced_component_status # Nothing to do elif component in self.CLIENT_COMPONENTS: status = self.DEAD_STATUS # CLIENT components can't have status STARTED elif component in self.COMPONENTS: http://git-wip-us.apache.org/repos/asf/ambari/blob/a3a0ae04/ambari-agent/src/main/python/ambari_agent/RecoveryManager.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/RecoveryManager.py b/ambari-agent/src/main/python/ambari_agent/RecoveryManager.py index 3deec2b..e2c5e98 100644 --- a/ambari-agent/src/main/python/ambari_agent/RecoveryManager.py +++ b/ambari-agent/src/main/python/ambari_agent/RecoveryManager.py @@ -43,6 +43,7 @@ class RecoveryManager: ROLE = "role" TASK_ID = "taskId" DESIRED_STATE = "desiredState" + HAS_STALE_CONFIG = "hasStaleConfigs" EXECUTION_COMMAND_DETAILS = "executionCommandDetails" ROLE_COMMAND = "roleCommand" PAYLOAD_LEVEL_DEFAULT = "DEFAULT" @@ -51,6 +52,8 @@ class RecoveryManager: STARTED = "STARTED" INSTALLED = "INSTALLED" INIT = "INIT" # TODO: What is the state when machine is reset + COMPONENT_UPDATE_KEY_FORMAT = "{0}_UPDATE_TIME" + COMMAND_REFRESH_DELAY_SEC = 600 #10 minutes default_action_counter = { "lastAttempt": 0, @@ -62,6 +65,12 @@ class RecoveryManager: "warnedThresholdReached": False } + default_component_status = { + "current": "", + "desired": "", + "stale_config": False + } + def __init__(self, recovery_enabled=False, auto_start_only=False): self.recovery_enabled = recovery_enabled @@ -78,16 +87,42 @@ class RecoveryManager: self.actions = {} self.statuses = {} self.__status_lock = threading.RLock() + self.__command_lock = threading.RLock() + self.paused = False self.update_config(6, 60, 5, 12, recovery_enabled, auto_start_only) pass + def set_paused(self, paused): + if self.paused != paused: + logger.debug("RecoveryManager is transitioning from isPaused = " + str(self.paused) + " to " + str(paused)) + self.paused = paused + def enabled(self): return self.recovery_enabled + def update_config_staleness(self, component, is_config_stale): + """ + Updates staleness of config + """ + if component not in self.statuses: + self.__status_lock.acquire() + try: + if component not in self.statuses: + self.statuses[component] = copy.deepcopy(self.default_component_status) + finally: + self.__status_lock.release() + pass + + self.statuses[component]["stale_config"] = is_config_stale + if self.statuses[component]["current"] == self.statuses[component]["desired"] and \ + self.statuses[component]["stale_config"] == False: + self.remove_command(component) + pass + def update_current_status(self, component, state): """ Updates the current status of a host component managed by the agent @@ -96,15 +131,15 @@ class RecoveryManager: self.__status_lock.acquire() try: if component not in self.statuses: - self.statuses[component] = { - "current": "", - "desired": "" - } + self.statuses[component] = copy.deepcopy(self.default_component_status) finally: self.__status_lock.release() pass self.statuses[component]["current"] = state + if self.statuses[component]["current"] == self.statuses[component]["desired"] and \ + self.statuses[component]["stale_config"] == False: + self.remove_command(component) pass @@ -116,15 +151,15 @@ class RecoveryManager: self.__status_lock.acquire() try: if component not in self.statuses: - self.statuses[component] = { - "current": "", - "desired": "" - } + self.statuses[component] = copy.deepcopy(self.default_component_status) finally: self.__status_lock.release() pass self.statuses[component]["desired"] = state + if self.statuses[component]["current"] == self.statuses[component]["desired"] and \ + self.statuses[component]["stale_config"] == False: + self.remove_command(component) pass @@ -133,7 +168,7 @@ class RecoveryManager: Recovery is allowed for: INISTALLED --> STARTED INIT --> INSTALLED --> STARTED - CLIENTs may be RE-INSTALLED (TODO) + RE-INSTALLED (if configs do not match) """ if not self.enabled(): return False @@ -141,17 +176,18 @@ class RecoveryManager: if component not in self.statuses: return False - status = self.statuses[component] - if status["current"] == status["desired"]: - return False + if self.auto_start_only: + status = self.statuses[component] + if status["current"] == status["desired"]: + return False + else: + status = self.statuses[component] + if status["current"] == status["desired"] and status['stale_config'] == False: + return False if status["desired"] not in self.allowed_desired_states or status["current"] not in self.allowed_current_states: return False - ### No recovery to INSTALLED or INIT states - if status["current"] == self.STARTED: - return False - logger.info("%s needs recovery.", component) return True pass @@ -213,14 +249,29 @@ class RecoveryManager: for component in self.statuses.keys(): if self.requires_recovery(component) and self.may_execute(component): status = copy.deepcopy(self.statuses[component]) - if status["desired"] == self.STARTED: - if status["current"] == self.INSTALLED: - command = self.get_start_command(component) - elif status["current"] == self.INIT: - command = self.get_install_command(component) - elif status["desired"] == self.INSTALLED: - if status["current"] == self.INIT: - command = self.get_install_command(component) + command = None + if self.auto_start_only: + if status["desired"] == self.STARTED: + if status["current"] == self.INSTALLED: + command = self.get_start_command(component) + else: + # START, INSTALL, RESTART + if status["desired"] != status["current"]: + if status["desired"] == self.STARTED: + if status["current"] == self.INSTALLED: + command = self.get_start_command(component) + elif status["current"] == self.INIT: + command = self.get_install_command(component) + elif status["desired"] == self.INSTALLED: + if status["current"] == self.INIT: + command = self.get_install_command(component) + # else issue a STOP command + else: + if status["current"] == self.INSTALLED: + command = self.get_install_command(component) + elif status["current"] == self.STARTED: + command = self.get_restart_command(component) + if command: self.execute(component) commands.append(command) @@ -417,7 +468,7 @@ class RecoveryManager: self.max_lifetime_count = max_lifetime_count self.allowed_desired_states = [self.STARTED, self.INSTALLED] - self.allowed_current_states = [self.INIT, self.INSTALLED] + self.allowed_current_states = [self.INIT, self.INSTALLED, self.STARTED] if self.auto_start_only: self.allowed_desired_states = [self.STARTED] @@ -481,12 +532,13 @@ class RecoveryManager: component = command[self.COMPONENT_NAME] self.update_desired_status(component, command[self.DESIRED_STATE]) + self.update_config_staleness(component, command[self.HAS_STALE_CONFIG]) if payloadLevel == self.PAYLOAD_LEVEL_EXECUTION_COMMAND: if self.EXECUTION_COMMAND_DETAILS in command: # Store the execution command details self.remove_command(component) - self.stored_exec_commands[component] = command[self.EXECUTION_COMMAND_DETAILS] + self.add_command(component, command[self.EXECUTION_COMMAND_DETAILS]) logger.debug("Stored command details for " + component) else: logger.warn("Expected field " + self.EXECUTION_COMMAND_DETAILS + " unavailable.") @@ -495,6 +547,10 @@ class RecoveryManager: def get_install_command(self, component): + if self.paused: + logger.info("Recovery is paused, likely tasks waiting in pipeline for this host.") + return None + if self.enabled(): logger.debug("Using stored INSTALL command for %s", component) if self.command_exists(component, ActionQueue.EXECUTION_COMMAND): @@ -504,14 +560,39 @@ class RecoveryManager: command[self.TASK_ID] = self.get_unique_task_id() return command else: - logger.info("INSTALL command cannot be computed.") + logger.info("INSTALL command cannot be computed as details are not received from Server.") else: logger.info("Recovery is not enabled. INSTALL command will not be computed.") return None pass + def get_restart_command(self, component): + if self.paused: + logger.info("Recovery is paused, likely tasks waiting in pipeline for this host.") + return None + + if self.enabled(): + logger.debug("Using stored INSTALL command for %s", component) + if self.command_exists(component, ActionQueue.EXECUTION_COMMAND): + command = copy.deepcopy(self.stored_exec_commands[component]) + command[self.ROLE_COMMAND] = "CUSTOM_COMMAND" + command[self.COMMAND_TYPE] = ActionQueue.AUTO_EXECUTION_COMMAND + command[self.TASK_ID] = self.get_unique_task_id() + command['hostLevelParams']['custom_command'] = 'RESTART' + return command + else: + logger.info("RESTART command cannot be computed as details are not received from Server.") + else: + logger.info("Recovery is not enabled. RESTART command will not be computed.") + return None + pass + def get_start_command(self, component): + if self.paused: + logger.info("Recovery is paused, likely tasks waiting in pipeline for this host.") + return None + if self.enabled(): logger.debug("Using stored START command for %s", component) if self.command_exists(component, ActionQueue.EXECUTION_COMMAND): @@ -521,7 +602,7 @@ class RecoveryManager: command[self.TASK_ID] = self.get_unique_task_id() return command else: - logger.info("START command cannot be computed.") + logger.info("START command cannot be computed as details are not received from Server.") else: logger.info("Recovery is not enabled. START command will not be computed.") @@ -531,6 +612,7 @@ class RecoveryManager: def command_exists(self, component, command_type): if command_type == ActionQueue.EXECUTION_COMMAND: + self.remove_stale_command(component) if component in self.stored_exec_commands: return True @@ -538,13 +620,42 @@ class RecoveryManager: pass + def remove_stale_command(self, component): + component_update_key = self.COMPONENT_UPDATE_KEY_FORMAT.format(component) + if component in self.stored_exec_commands: + insert_time = self.stored_exec_commands[component_update_key] + age = self._now_() - insert_time + if self.COMMAND_REFRESH_DELAY_SEC < age: + logger.debug("Removing stored command for component : " + str(component) + " as its " + str(age) + " sec old") + self.remove_command(component) + pass + + def remove_command(self, component): if component in self.stored_exec_commands: - del self.stored_exec_commands[component] - return True + self.__status_lock.acquire() + try: + component_update_key = self.COMPONENT_UPDATE_KEY_FORMAT.format(component) + del self.stored_exec_commands[component] + del self.stored_exec_commands[component_update_key] + logger.debug("Removed stored command for component : " + str(component)) + return True + finally: + self.__status_lock.release() return False + def add_command(self, component, command): + self.__status_lock.acquire() + try: + component_update_key = self.COMPONENT_UPDATE_KEY_FORMAT.format(component) + self.stored_exec_commands[component] = command + self.stored_exec_commands[component_update_key] = self._now_() + logger.debug("Added command for component : " + str(component)) + finally: + self.__status_lock.release() + + def _read_int_(self, value, default_value=0): int_value = default_value try: http://git-wip-us.apache.org/repos/asf/ambari/blob/a3a0ae04/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py b/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py index 6aab74e..ee18c38 100644 --- a/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py +++ b/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py @@ -678,9 +678,9 @@ class TestActionQueue(TestCase): gpeo_mock.return_value = 1 config.get_parallel_exec_option = gpeo_mock actionQueue = ActionQueue(config, dummy_controller) - actionQueue.start() actionQueue.put([self.datanode_install_command, self.hbase_install_command]) self.assertEqual(2, actionQueue.commandQueue.qsize()) + actionQueue.start() time.sleep(1) actionQueue.stop() actionQueue.join() http://git-wip-us.apache.org/repos/asf/ambari/blob/a3a0ae04/ambari-agent/src/test/python/ambari_agent/TestController.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/test/python/ambari_agent/TestController.py b/ambari-agent/src/test/python/ambari_agent/TestController.py index f379fbf..e00e737 100644 --- a/ambari-agent/src/test/python/ambari_agent/TestController.py +++ b/ambari-agent/src/test/python/ambari_agent/TestController.py @@ -675,9 +675,16 @@ class TestController(unittest.TestCase): self.controller.recovery_manager.process_execution_commands = process_execution_commands process_status_commands = MagicMock(name="process_status_commands") self.controller.recovery_manager.process_status_commands = process_status_commands + set_paused = MagicMock(name = "set_paused") + self.controller.recovery_manager.set_paused = set_paused self.controller.responseId = 0 - response = {"responseId":1, "statusCommands": "commands2", "executionCommands" : "commands1", "log":"", "exitstatus":"0"} + response = {"responseId":1, + "statusCommands": "commands2", + "executionCommands" : "commands1", + "log":"", + "exitstatus":"0", + "hasPendingTasks": True} sendRequest.return_value = response def one_heartbeat(*args, **kwargs): @@ -697,6 +704,7 @@ class TestController(unittest.TestCase): self.assertTrue(process_status_commands.called) process_execution_commands.assert_called_with("commands1") process_status_commands.assert_called_with("commands2") + set_paused.assert_called_with(True) self.controller.heartbeatWithServer() sys.stdout = sys.__stdout__ http://git-wip-us.apache.org/repos/asf/ambari/blob/a3a0ae04/ambari-agent/src/test/python/ambari_agent/TestLiveStatus.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/test/python/ambari_agent/TestLiveStatus.py b/ambari-agent/src/test/python/ambari_agent/TestLiveStatus.py index 24fad3c..e9bb776 100644 --- a/ambari-agent/src/test/python/ambari_agent/TestLiveStatus.py +++ b/ambari-agent/src/test/python/ambari_agent/TestLiveStatus.py @@ -165,20 +165,20 @@ class TestLiveStatus(TestCase): result = livestatus.build() self.assertTrue(len(result) > 0, 'Livestatus should not be empty') self.assertTrue(result.has_key('configurationTags')) - # Test build status with forsed_component_status + # Test build status with forced_component_status ## Alive livestatus = LiveStatus('c1', 'HDFS', 'HDFS_CLIENT', { }, config, {}) - result = livestatus.build(forsed_component_status = LiveStatus.LIVE_STATUS) + result = livestatus.build(forced_component_status = LiveStatus.LIVE_STATUS) self.assertTrue(len(result) > 0, 'Livestatus should not be empty') self.assertTrue(result['status'], LiveStatus.LIVE_STATUS) ## Dead livestatus = LiveStatus('c1', 'HDFS', 'HDFS_CLIENT', { }, config, {}) - result = livestatus.build(forsed_component_status = LiveStatus.DEAD_STATUS) + result = livestatus.build(forced_component_status = LiveStatus.DEAD_STATUS) self.assertTrue(len(result) > 0, 'Livestatus should not be empty') self.assertTrue(result['status'], LiveStatus.DEAD_STATUS) livestatus = LiveStatus('c1', 'TEZ', 'TEZ_CLIENT', { }, config, {}) - result = livestatus.build(forsed_component_status = LiveStatus.LIVE_STATUS) + result = livestatus.build(forced_component_status = LiveStatus.LIVE_STATUS) self.assertTrue(len(result) > 0, 'Livestatus should not be empty') self.assertTrue(result['status'], LiveStatus.LIVE_STATUS) @@ -197,7 +197,7 @@ class TestLiveStatus(TestCase): 'SOME_UNKNOWN_COMPONENT', {}, config, {}) livestatus.versionsHandler.versionsFilePath = "ambari_agent" + \ os.sep + "dummy_files" + os.sep + "dummy_current_stack" - result = livestatus.build(forsed_component_status = "STARTED") + result = livestatus.build(forced_component_status = "STARTED") result_str = pprint.pformat(result) self.assertEqual(result_str, "{'clusterName': '',\n " http://git-wip-us.apache.org/repos/asf/ambari/blob/a3a0ae04/ambari-agent/src/test/python/ambari_agent/TestRecoveryManager.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/test/python/ambari_agent/TestRecoveryManager.py b/ambari-agent/src/test/python/ambari_agent/TestRecoveryManager.py index 4cc2c0b..bd7c96b 100644 --- a/ambari-agent/src/test/python/ambari_agent/TestRecoveryManager.py +++ b/ambari-agent/src/test/python/ambari_agent/TestRecoveryManager.py @@ -31,10 +31,13 @@ class TestRecoveryManager(TestCase): "payloadLevel": "EXECUTION_COMMAND", "componentName": "NODEMANAGER", "desiredState": "STARTED", + "hasStaleConfigs": False, "executionCommandDetails": { "commandType": "EXECUTION_COMMAND", "roleCommand": "INSTALL", "role": "NODEMANAGER", + "hostLevelParams": { + "custom_command":""}, "configurations": { "capacity-scheduler": { "yarn.scheduler.capacity.default.minimum-user-limit-percent": "100"}, @@ -227,7 +230,7 @@ class TestRecoveryManager(TestCase): rm.update_current_status("NODEMANAGER", "STARTED") rm.update_desired_status("NODEMANAGER", "INSTALLED") - self.assertFalse(rm.requires_recovery("NODEMANAGER")) + self.assertTrue(rm.requires_recovery("NODEMANAGER")) rm.update_desired_status("NODEMANAGER", "STARTED") self.assertFalse(rm.requires_recovery("NODEMANAGER")) @@ -296,14 +299,26 @@ class TestRecoveryManager(TestCase): self.assertEqual(None, rm.get_install_command("component2")) self.assertEqual(None, rm.get_start_command("component2")) + rm.store_or_update_command(command1) + self.assertTrue(rm.command_exists("NODEMANAGER", "EXECUTION_COMMAND")) + rm.set_paused(True) + + self.assertEqual(None, rm.get_install_command("NODEMANAGER")) + self.assertEqual(None, rm.get_start_command("NODEMANAGER")) + pass @patch.object(RecoveryManager, "_now_") def test_get_recovery_commands(self, time_mock): time_mock.side_effect = \ - [1000, 2000, 3000, 4000, 5000, 6000] + [1000, 1001, 1002, 1003, + 1100, 1101, 1102, + 1200, 1201, 1203, + 4000, 4001, 4002, 4003, + 4100, 4101, 4102, 4103, + 4200, 4201, 4202] rm = RecoveryManager(True) - rm.update_config(10, 5, 1, 11, True, False) + rm.update_config(15, 5, 1, 16, True, False) command1 = copy.deepcopy(self.command) @@ -319,6 +334,7 @@ class TestRecoveryManager(TestCase): rm.update_current_status("NODEMANAGER", "INIT") rm.update_desired_status("NODEMANAGER", "STARTED") + # Starts at 1100 commands = rm.get_recovery_commands() self.assertEqual(1, len(commands)) self.assertEqual("INSTALL", commands[0]["roleCommand"]) @@ -326,6 +342,7 @@ class TestRecoveryManager(TestCase): rm.update_current_status("NODEMANAGER", "INIT") rm.update_desired_status("NODEMANAGER", "INSTALLED") + # Starts at 1200 commands = rm.get_recovery_commands() self.assertEqual(1, len(commands)) self.assertEqual("INSTALL", commands[0]["roleCommand"]) @@ -336,6 +353,36 @@ class TestRecoveryManager(TestCase): commands = rm.get_recovery_commands() self.assertEqual(0, len(commands)) + + rm.update_config(12, 5, 1, 15, True, False) + rm.update_current_status("NODEMANAGER", "INIT") + rm.update_desired_status("NODEMANAGER", "INSTALLED") + + rm.store_or_update_command(command1) + commands = rm.get_recovery_commands() + self.assertEqual(1, len(commands)) + self.assertEqual("INSTALL", commands[0]["roleCommand"]) + + rm.update_config_staleness("NODEMANAGER", False) + rm.update_current_status("NODEMANAGER", "INSTALLED") + rm.update_desired_status("NODEMANAGER", "INSTALLED") + commands = rm.get_recovery_commands() + self.assertEqual(0, len(commands)) + + command_install = copy.deepcopy(self.command) + command_install["desiredState"] = "INSTALLED" + rm.store_or_update_command(command_install) + rm.update_config_staleness("NODEMANAGER", True) + commands = rm.get_recovery_commands() + self.assertEqual(1, len(commands)) + self.assertEqual("INSTALL", commands[0]["roleCommand"]) + + rm.update_current_status("NODEMANAGER", "STARTED") + rm.update_desired_status("NODEMANAGER", "STARTED") + commands = rm.get_recovery_commands() + self.assertEqual(1, len(commands)) + self.assertEqual("CUSTOM_COMMAND", commands[0]["roleCommand"]) + self.assertEqual("RESTART", commands[0]["hostLevelParams"]["custom_command"]) pass @patch.object(RecoveryManager, "update_config") @@ -427,4 +474,37 @@ class TestRecoveryManager(TestCase): {"name": "LION", "numAttempts": 4, "limitReached": True}, {"name": "PUMA", "numAttempts": 4, "limitReached": True} ]}) + pass + + @patch.object(RecoveryManager, "_now_") + def test_command_expiry(self, time_mock): + time_mock.side_effect = \ + [1000, 1001, 1002, 1003, 1104, 1105, 1106, 1807, 1808, 1809, 1810, 1811, 1812] + + rm = RecoveryManager(True) + rm.update_config(5, 5, 1, 11, True, False) + + command1 = copy.deepcopy(self.command) + + rm.store_or_update_command(command1) + + rm.update_current_status("NODEMANAGER", "INSTALLED") + rm.update_desired_status("NODEMANAGER", "STARTED") + + commands = rm.get_recovery_commands() + self.assertEqual(1, len(commands)) + self.assertEqual("START", commands[0]["roleCommand"]) + + commands = rm.get_recovery_commands() + self.assertEqual(1, len(commands)) + self.assertEqual("START", commands[0]["roleCommand"]) + + #1807 command is stale + commands = rm.get_recovery_commands() + self.assertEqual(0, len(commands)) + + rm.store_or_update_command(command1) + commands = rm.get_recovery_commands() + self.assertEqual(1, len(commands)) + self.assertEqual("START", commands[0]["roleCommand"]) pass \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/a3a0ae04/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java index 76f6a8b..4f51fbc 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionScheduler.java @@ -222,6 +222,7 @@ class ActionScheduler implements Runnable { LOG.debug("There are no stages currently in progress."); } + actionQueue.updateListOfHostsWithPendingTask(null); return; } @@ -238,12 +239,17 @@ class ActionScheduler implements Runnable { LOG.debug("There are no stages currently in progress."); } + actionQueue.updateListOfHostsWithPendingTask(null); return; } int i_stage = 0; + HashSet<String> hostsWithTasks = getListOfHostsWithPendingTask(stages); + actionQueue.updateListOfHostsWithPendingTask(hostsWithTasks); + stages = filterParallelPerHostStages(stages); + // At this point the stages is a filtered list boolean exclusiveRequestIsGoing = false; // This loop greatly depends on the fact that order of stages in @@ -401,6 +407,21 @@ class ActionScheduler implements Runnable { } /** + * Returns the list of hosts that have a task assigned + * + * @param stages + * + * @return + */ + private HashSet<String> getListOfHostsWithPendingTask(List<Stage> stages) { + HashSet<String> hostsWithTasks = new HashSet<String>(); + for (Stage s : stages) { + hostsWithTasks.addAll(s.getHosts()); + } + return hostsWithTasks; + } + + /** * Returns filtered list of stages following the rule: * 1) remove stages that has the same host. Leave only first stage, the rest that have same host of any operation will be filtered * 2) do not remove stages intersected by host if they have intersection by background command http://git-wip-us.apache.org/repos/asf/ambari/blob/a3a0ae04/ambari-server/src/main/java/org/apache/ambari/server/agent/ActionQueue.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/ActionQueue.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/ActionQueue.java index 2479f37..24640d3 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/agent/ActionQueue.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/ActionQueue.java @@ -19,6 +19,7 @@ package org.apache.ambari.server.agent; import java.util.ArrayList; import java.util.Collections; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Queue; @@ -37,8 +38,12 @@ public class ActionQueue { private static Logger LOG = LoggerFactory.getLogger(ActionQueue.class); + private static HashSet<String> EMPTY_HOST_LIST = new HashSet<String>(); + final ConcurrentMap<String, Queue<AgentCommand>> hostQueues; + HashSet<String> hostsWithPendingTask = new HashSet<String>(); + public ActionQueue() { hostQueues = new ConcurrentHashMap<String, Queue<AgentCommand>>(); } @@ -174,4 +179,31 @@ public class ActionQueue { return l; } + + /** + * Update the cache of hosts that have pending tasks + * + * @param hosts + */ + public void updateListOfHostsWithPendingTask(HashSet<String> hosts) { + if (hosts != null) { + hostsWithPendingTask = hosts; + } else if (hostsWithPendingTask.size() > 0) { + hostsWithPendingTask = EMPTY_HOST_LIST; + } + } + + /** + * Checks whether host has pending tasks + * @param hostName + * @return + */ + public boolean hasPendingTask(String hostName) { + HashSet<String> copyHostsWithTaskPending = hostsWithPendingTask; + if (copyHostsWithTaskPending != null) { + return copyHostsWithTaskPending.contains(hostName); + } + + return false; + } } http://git-wip-us.apache.org/repos/asf/ambari/blob/a3a0ae04/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java index 1b180eb..ab17db6 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java @@ -981,6 +981,7 @@ public class HeartBeatHandler { /** * Annotate the response with some housekeeping details. * hasMappedComponents - indicates if any components are mapped to the host + * hasPendingTasks - indicates if any tasks are pending for the host (they may not be sent yet) * @param hostname * @param response * @throws org.apache.ambari.server.AmbariException @@ -993,6 +994,11 @@ public class HeartBeatHandler { break; } } + + if(actionQueue.hasPendingTask(hostname)) { + LOG.debug("Host " + hostname + " has pending tasks"); + response.setHasPendingTasks(true); + } } /** http://git-wip-us.apache.org/repos/asf/ambari/blob/a3a0ae04/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatResponse.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatResponse.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatResponse.java index 56b4f18..617b04b 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatResponse.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatResponse.java @@ -66,6 +66,9 @@ public class HeartBeatResponse { @SerializedName("hasMappedComponents") private boolean hasMappedComponents = false; + @SerializedName("hasPendingTasks") + private boolean hasPendingTasks = false; + public long getResponseId() { return responseId; } @@ -144,6 +147,14 @@ public class HeartBeatResponse { this.hasMappedComponents = hasMappedComponents; } + public boolean hasPendingTasks() { + return hasPendingTasks; + } + + public void setHasPendingTasks(boolean hasPendingTasks) { + this.hasPendingTasks = hasPendingTasks; + } + public void addExecutionCommand(ExecutionCommand execCmd) { executionCommands.add(execCmd); } http://git-wip-us.apache.org/repos/asf/ambari/blob/a3a0ae04/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatMonitor.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatMonitor.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatMonitor.java index ad55c05..d245a25 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatMonitor.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatMonitor.java @@ -304,6 +304,7 @@ public class HeartbeatMonitor implements Runnable { // If Agent wants the command and the States differ statusCmd.setDesiredState(sch.getDesiredState()); + statusCmd.setHasStaleConfigs(configHelper.isStaleConfigs(sch)); if (getAgentRequests().shouldSendExecutionDetails(hostname, componentName)) { LOG.info(componentName + " is at " + sch.getState() + " adding more payload per agent ask"); statusCmd.setPayloadLevel(StatusCommand.StatusCommandPayload.EXECUTION_COMMAND); http://git-wip-us.apache.org/repos/asf/ambari/blob/a3a0ae04/ambari-server/src/main/java/org/apache/ambari/server/agent/StatusCommand.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/StatusCommand.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/StatusCommand.java index 0e08ed8..5dec53c 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/agent/StatusCommand.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/StatusCommand.java @@ -62,6 +62,9 @@ public class StatusCommand extends AgentCommand { @SerializedName("desiredState") private State desiredState; + @SerializedName("hasStaleConfigs") + private Boolean hasStaleConfigs; + @SerializedName("executionCommandDetails") private ExecutionCommand executionCommand; @@ -81,6 +84,14 @@ public class StatusCommand extends AgentCommand { this.desiredState = desiredState; } + public Boolean getHasStaleConfigs() { + return hasStaleConfigs; + } + + public void setHasStaleConfigs(Boolean hasStaleConfigs) { + this.hasStaleConfigs = hasStaleConfigs; + } + public StatusCommandPayload getPayloadLevel() { return payloadLevel; } http://git-wip-us.apache.org/repos/asf/ambari/blob/a3a0ae04/ambari-server/src/test/java/org/apache/ambari/server/agent/TestActionQueue.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/agent/TestActionQueue.java b/ambari-server/src/test/java/org/apache/ambari/server/agent/TestActionQueue.java index c4f5b86..e431eee 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/agent/TestActionQueue.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/agent/TestActionQueue.java @@ -18,9 +18,11 @@ package org.apache.ambari.server.agent; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; import org.apache.ambari.server.agent.AgentCommand.AgentCommandType; @@ -39,7 +41,9 @@ public class TestActionQueue { enum OpType { ENQUEUE, DEQUEUE, - DEQUEUEALL + DEQUEUEALL, + CHECKPENDING, + UPDATEHOSTLIST } private volatile boolean shouldRun = true; @@ -76,8 +80,12 @@ public class TestActionQueue { case DEQUEUE: dequeueOp(); break; - case DEQUEUEALL: - dequeueAllOp(); + case DEQUEUEALL: + dequeueAllOp(); + case CHECKPENDING: + checkPending(); + case UPDATEHOSTLIST: + updateHostList(); break; } } catch (Exception ex) { @@ -86,6 +94,34 @@ public class TestActionQueue { } } + private void checkPending() throws InterruptedException { + while (shouldRun) { + int index = 0; + for (String host: hosts) { + actionQueue.hasPendingTask(host); + opCounts[index]++; + index++; + } + Thread.sleep(1); + } + } + + private void updateHostList() throws InterruptedException { + HashSet<String> hostsWithTasks = new HashSet<String>(); + while (shouldRun) { + for (String host: hosts) { + hostsWithTasks.add(host); + if (hostsWithTasks.size() % 2 == 0) { + actionQueue.updateListOfHostsWithPendingTask(hostsWithTasks); + } else { + actionQueue.updateListOfHostsWithPendingTask(null); + } + opCounts[0]++; + } + Thread.sleep(1); + } + } + private void enqueueOp() throws InterruptedException { while (shouldRun) { int index = 0; @@ -215,6 +251,82 @@ public class TestActionQueue { } } + @Test + public void testConcurrentHostCheck() throws InterruptedException { + ActionQueue aq = new ActionQueue(); + String[] hosts = new String[] { "h0", "h1", "h2", "h3", "h4" }; + + ActionQueueOperation[] hostCheckers = new ActionQueueOperation[threadCount]; + ActionQueueOperation[] hostUpdaters = new ActionQueueOperation[threadCount]; + + List<Thread> producers = new ArrayList<Thread>(); + List<Thread> consumers = new ArrayList<Thread>(); + + for (int i = 0; i < threadCount; i++) { + hostCheckers[i] = new ActionQueueOperation(aq, hosts, + ActionQueueOperation.OpType.CHECKPENDING); + Thread t = new Thread(hostCheckers[i]); + consumers.add(t); + t.start(); + } + + for (int i = 0; i < threadCount; i++) { + hostUpdaters[i] = new ActionQueueOperation(aq, hosts, + ActionQueueOperation.OpType.UPDATEHOSTLIST); + Thread t = new Thread(hostUpdaters[i]); + producers.add(t); + t.start(); + } + + // Run for some time + Thread.sleep(100); + + for (int i = 0; i < threadCount; i++) { + hostUpdaters[i].stop(); + } + + for (Thread producer : producers) { + producer.join(); + } + + for (int i = 0; i < threadCount; i++) { + hostCheckers[i].stop(); + } + + for (Thread consumer : consumers) { + consumer.join(); + } + + int totalChecks = 0; + int totalUpdates = 0; + for (int i = 0; i < threadCount; i++) { + totalChecks += hostUpdaters[i].getOpCounts()[0]; + for (int h = 0; h<hosts.length; h++) { + totalUpdates += hostCheckers[i].getOpCounts()[h]; + } + } + LOG.info("Report: totalChecks: " + totalChecks + ", totalUpdates: " + totalUpdates); + + HashSet<String> hostsWithPendingtasks = new HashSet<String>(); + aq.updateListOfHostsWithPendingTask(hostsWithPendingtasks); + hostsWithPendingtasks.add("h1"); + aq.updateListOfHostsWithPendingTask(hostsWithPendingtasks); + assertTrue(aq.hasPendingTask("h1")); + assertFalse(aq.hasPendingTask("h2")); + + hostsWithPendingtasks.add("h1"); + hostsWithPendingtasks.add("h2"); + aq.updateListOfHostsWithPendingTask(hostsWithPendingtasks); + assertTrue(aq.hasPendingTask("h1")); + assertTrue(aq.hasPendingTask("h2")); + + hostsWithPendingtasks.clear(); + hostsWithPendingtasks.add("h2"); + aq.updateListOfHostsWithPendingTask(hostsWithPendingtasks); + assertFalse(aq.hasPendingTask("h1")); + assertTrue(aq.hasPendingTask("h2")); + } + /** * @throws Exception */ http://git-wip-us.apache.org/repos/asf/ambari/blob/a3a0ae04/ambari-server/src/test/java/org/apache/ambari/server/agent/TestHeartbeatMonitor.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/agent/TestHeartbeatMonitor.java b/ambari-server/src/test/java/org/apache/ambari/server/agent/TestHeartbeatMonitor.java index c47b601..d3fee8b 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/agent/TestHeartbeatMonitor.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/agent/TestHeartbeatMonitor.java @@ -245,6 +245,12 @@ public class TestHeartbeatMonitor { hdfs.getServiceComponent(Role.HDFS_CLIENT.name()).getServiceComponentHost(hostname1).setState(State.INSTALLED); hdfs.getServiceComponent(Role.HDFS_CLIENT.name()).getServiceComponentHost(hostname2).setState(State.INSTALLED); + hdfs.getServiceComponent(Role.DATANODE.name()).getServiceComponentHost(hostname1).setDesiredState(State.INSTALLED); + hdfs.getServiceComponent(Role.NAMENODE.name()).getServiceComponentHost(hostname1).setDesiredState(State.INSTALLED); + hdfs.getServiceComponent(Role.SECONDARY_NAMENODE.name()).getServiceComponentHost(hostname1).setDesiredState(State.INSTALLED); + hdfs.getServiceComponent(Role.HDFS_CLIENT.name()).getServiceComponentHost(hostname1).setDesiredState(State.INSTALLED); + hdfs.getServiceComponent(Role.HDFS_CLIENT.name()).getServiceComponentHost(hostname2).setDesiredState(State.INSTALLED); + ActionQueue aq = new ActionQueue(); ActionManager am = mock(ActionManager.class); HeartbeatMonitor hm = new HeartbeatMonitor(clusters, aq, am, @@ -283,8 +289,10 @@ public class TestHeartbeatMonitor { containsSECONDARY_NAMENODEStatus |= cmd.getComponentName(). equals("SECONDARY_NAMENODE"); containsHDFS_CLIENTStatus |= cmd.getComponentName().equals - ("HDFS_CLIENT"); + ("HDFS_CLIENT"); assertTrue(cmd.getConfigurations().size() > 0); + assertEquals(State.INSTALLED, cmd.getDesiredState()); + assertEquals(false, cmd.getHasStaleConfigs()); } assertTrue(containsDATANODEStatus); assertTrue(containsNAMENODEStatus);
