Repository: ambari Updated Branches: refs/heads/branch-2.4 2810eca67 -> de85cddbd
AMBARI-16935: Retry and recover from component install failures Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/de85cddb Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/de85cddb Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/de85cddb Branch: refs/heads/branch-2.4 Commit: de85cddbdddd92639b9b5fcc386ffddbc63ff10f Parents: 2810eca Author: Nahappan Somasundaram <nsomasunda...@hortonworks.com> Authored: Tue May 31 15:10:09 2016 -0700 Committer: Nahappan Somasundaram <nsomasunda...@hortonworks.com> Committed: Wed Jun 8 16:57:57 2016 -0700 ---------------------------------------------------------------------- .../src/main/python/ambari_agent/ActionQueue.py | 12 +++- .../main/python/ambari_agent/RecoveryManager.py | 42 +++++++++++--- .../python/ambari_agent/TestRecoveryManager.py | 59 +++++++++++++------- 3 files changed, 83 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/de85cddb/ambari-agent/src/main/python/ambari_agent/ActionQueue.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py index 4de5390..f217a54 100644 --- a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py +++ b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py @@ -426,6 +426,14 @@ class ActionQueue(threading.Thread): command['hostLevelParams']['clientsToUpdateConfigs']) roleResult['configurationTags'] = configHandler.read_actual_component( command['role']) + elif status == self.FAILED_STATUS: + if self.controller.recovery_manager.enabled() and command.has_key('roleCommand') \ + and self.controller.recovery_manager.configured_for_recovery(command['role']): + if command['roleCommand'] == self.ROLE_COMMAND_INSTALL: + 
self.controller.recovery_manager.update_current_status(command['role'], self.controller.recovery_manager.INSTALL_FAILED) + logger.info("After EXECUTION_COMMAND (INSTALL), with taskId=" + str(command['taskId']) + + ", current state of " + command['role'] + " to " + + self.controller.recovery_manager.get_current_status(command['role'])) self.commandStatuses.put_command_status(command, roleResult) @@ -511,7 +519,9 @@ class ActionQueue(threading.Thread): component_status = LiveStatus.DEAD_STATUS if self.controller.recovery_manager.enabled() \ and self.controller.recovery_manager.configured_for_recovery(component): - self.controller.recovery_manager.update_current_status(component, component_status) + if (self.controller.recovery_manager.get_current_status(component) != self.controller.recovery_manager.INSTALL_FAILED): + self.controller.recovery_manager.update_current_status(component, component_status) + request_execution_cmd = self.controller.recovery_manager.requires_recovery(component) and \ not self.controller.recovery_manager.command_exists(component, ActionQueue.EXECUTION_COMMAND) http://git-wip-us.apache.org/repos/asf/ambari/blob/de85cddb/ambari-agent/src/main/python/ambari_agent/RecoveryManager.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/RecoveryManager.py b/ambari-agent/src/main/python/ambari_agent/RecoveryManager.py index 87d9483..be335f2 100644 --- a/ambari-agent/src/main/python/ambari_agent/RecoveryManager.py +++ b/ambari-agent/src/main/python/ambari_agent/RecoveryManager.py @@ -54,6 +54,7 @@ class RecoveryManager: STARTED = "STARTED" INSTALLED = "INSTALLED" INIT = "INIT" # TODO: What is the state when machine is reset + INSTALL_FAILED = "INSTALL_FAILED" COMPONENT_UPDATE_KEY_FORMAT = "{0}_UPDATE_TIME" COMMAND_REFRESH_DELAY_SEC = 600 #10 minutes @@ -75,9 +76,10 @@ class RecoveryManager: "stale_config": False } - def __init__(self, cache_dir, recovery_enabled=False, 
auto_start_only=False): + def __init__(self, cache_dir, recovery_enabled=False, auto_start_only=False, auto_install_start=False): self.recovery_enabled = recovery_enabled self.auto_start_only = auto_start_only + self.auto_install_start = auto_install_start self.max_count = 6 self.window_in_min = 60 self.retry_gap = 5 @@ -107,7 +109,7 @@ class RecoveryManager: self.actions = {} - self.update_config(6, 60, 5, 12, recovery_enabled, auto_start_only, "", -1) + self.update_config(6, 60, 5, 12, recovery_enabled, auto_start_only, auto_install_start, "", -1) pass @@ -238,7 +240,7 @@ class RecoveryManager: return False status = self.statuses[component] - if self.auto_start_only: + if self.auto_start_only or self.auto_install_start: if status["current"] == status["desired"]: return False if status["desired"] not in self.allowed_desired_states: @@ -306,6 +308,8 @@ class RecoveryManager: This method computes the recovery commands for the following transitions INSTALLED --> STARTED INIT --> INSTALLED + INSTALL_FAILED --> INSTALLED + INSTALL_FAILED --> STARTED """ commands = [] for component in self.statuses.keys(): @@ -316,6 +320,15 @@ class RecoveryManager: if status["desired"] == self.STARTED: if status["current"] == self.INSTALLED: command = self.get_start_command(component) + elif self.auto_install_start: + if status["desired"] == self.STARTED: + if status["current"] == self.INSTALLED: + command = self.get_start_command(component) + elif status["current"] == self.INSTALL_FAILED: + command = self.get_install_command(component) + elif status["desired"] == self.INSTALLED: + if status["current"] == self.INSTALL_FAILED: + command = self.get_install_command(component) else: # START, INSTALL, RESTART if status["desired"] != status["current"]: @@ -324,9 +337,13 @@ class RecoveryManager: command = self.get_start_command(component) elif status["current"] == self.INIT: command = self.get_install_command(component) + elif status["current"] == self.INSTALL_FAILED: + command = 
self.get_install_command(component) elif status["desired"] == self.INSTALLED: if status["current"] == self.INIT: command = self.get_install_command(component) + elif status["current"] == self.INSTALL_FAILED: + command = self.get_install_command(component) elif status["current"] == self.STARTED: command = self.get_stop_command(component) else: @@ -536,7 +553,7 @@ class RecoveryManager: """ TODO: Server sends the recovery configuration - call update_config after parsing "recoveryConfig": { - "type" : "DEFAULT|AUTO_START|FULL", + "type" : "DEFAULT|AUTO_START|AUTO_INSTALL_START|FULL", "maxCount" : 10, "windowInMinutes" : 60, "retryGap" : 0, @@ -547,6 +564,7 @@ class RecoveryManager: recovery_enabled = False auto_start_only = False + auto_install_start = False max_count = 6 window_in_min = 60 retry_gap = 5 @@ -559,10 +577,13 @@ class RecoveryManager: logger.info("RecoverConfig = " + pprint.pformat(reg_resp["recoveryConfig"])) config = reg_resp["recoveryConfig"] if "type" in config: - if config["type"] in ["AUTO_START", "FULL"]: + if config["type"] in ["AUTO_INSTALL_START", "AUTO_START", "FULL"]: recovery_enabled = True if config["type"] == "AUTO_START": auto_start_only = True + elif config["type"] == "AUTO_INSTALL_START": + auto_install_start = True + if "maxCount" in config: max_count = self._read_int_(config["maxCount"], max_count) if "windowInMinutes" in config: @@ -579,7 +600,7 @@ class RecoveryManager: recovery_timestamp = config['recoveryTimestamp'] self.update_config(max_count, window_in_min, retry_gap, max_lifetime_count, recovery_enabled, auto_start_only, - enabled_components, recovery_timestamp) + auto_install_start, enabled_components, recovery_timestamp) pass """ @@ -591,11 +612,12 @@ class RecoveryManager: max_lifetime_count - Configured maximum lifetime count of recovery attempt allowed per host component. recovery_enabled - True or False. Indicates whether recovery is enabled or not. auto_start_only - True if AUTO_START recovery type was specified. 
False otherwise. + auto_install_start - True if AUTO_INSTALL_START recovery type was specified. False otherwise. enabled_components - CSV of componenents enabled for auto start. recovery_timestamp - Timestamp when the recovery values were last updated. -1 on start up. """ def update_config(self, max_count, window_in_min, retry_gap, max_lifetime_count, recovery_enabled, - auto_start_only, enabled_components, recovery_timestamp): + auto_start_only, auto_install_start, enabled_components, recovery_timestamp): """ Update recovery configuration, recovery is disabled if configuration values are not correct @@ -625,16 +647,20 @@ class RecoveryManager: self.window_in_sec = window_in_min * 60 self.retry_gap_in_sec = retry_gap * 60 self.auto_start_only = auto_start_only + self.auto_install_start = auto_install_start self.max_lifetime_count = max_lifetime_count self.enabled_components = [] self.recovery_timestamp = recovery_timestamp self.allowed_desired_states = [self.STARTED, self.INSTALLED] - self.allowed_current_states = [self.INIT, self.INSTALLED, self.STARTED] + self.allowed_current_states = [self.INIT, self.INSTALL_FAILED, self.INSTALLED, self.STARTED] if self.auto_start_only: self.allowed_desired_states = [self.STARTED] self.allowed_current_states = [self.INSTALLED] + elif self.auto_install_start: + self.allowed_desired_states = [self.INSTALLED, self.STARTED] + self.allowed_current_states = [self.INSTALL_FAILED, self.INSTALLED] if enabled_components is not None and len(enabled_components) > 0: components = enabled_components.split(",") http://git-wip-us.apache.org/repos/asf/ambari/blob/de85cddb/ambari-agent/src/test/python/ambari_agent/TestRecoveryManager.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/test/python/ambari_agent/TestRecoveryManager.py b/ambari-agent/src/test/python/ambari_agent/TestRecoveryManager.py index b7b214c..8868602 100644 --- 
a/ambari-agent/src/test/python/ambari_agent/TestRecoveryManager.py +++ b/ambari-agent/src/test/python/ambari_agent/TestRecoveryManager.py @@ -372,7 +372,7 @@ class _TestRecoveryManager(TestCase): @patch.object(RecoveryManager, "_now_") def test_get_recovery_commands(self, time_mock): time_mock.side_effect = \ - [1000, 1001, 1002, 1003, + [1000, 1001, 1002, 1003, 1004, 1100, 1101, 1102, 1200, 1201, 1203, 4000, 4001, 4002, 4003, @@ -380,12 +380,12 @@ class _TestRecoveryManager(TestCase): 4200, 4201, 4202, 4300, 4301, 4302] rm = RecoveryManager(tempfile.mktemp(), True) - rm.update_config(15, 5, 1, 16, True, False, "", -1) + rm.update_config(15, 5, 1, 16, True, False, False, "", -1) command1 = copy.deepcopy(self.command) rm.store_or_update_command(command1) - rm.update_config(12, 5, 1, 15, True, False, "NODEMANAGER", -1) + rm.update_config(12, 5, 1, 15, True, False, False, "NODEMANAGER", -1) rm.update_current_status("NODEMANAGER", "INSTALLED") rm.update_desired_status("NODEMANAGER", "STARTED") self.assertEqual("INSTALLED", rm.get_current_status("NODEMANAGER")) @@ -395,6 +395,14 @@ class _TestRecoveryManager(TestCase): self.assertEqual(1, len(commands)) self.assertEqual("START", commands[0]["roleCommand"]) + rm.update_config(2, 5, 1, 5, True, False, True, "NODEMANAGER", -1) + rm.update_current_status("NODEMANAGER", "INSTALL_FAILED") + rm.update_desired_status("NODEMANAGER", "INSTALLED") + + commands = rm.get_recovery_commands() + self.assertEqual(1, len(commands)) + self.assertEqual("INSTALL", commands[0]["roleCommand"]) + rm.update_current_status("NODEMANAGER", "INIT") rm.update_desired_status("NODEMANAGER", "STARTED") @@ -411,14 +419,14 @@ class _TestRecoveryManager(TestCase): self.assertEqual(1, len(commands)) self.assertEqual("INSTALL", commands[0]["roleCommand"]) - rm.update_config(2, 5, 1, 5, True, True, "", -1) + rm.update_config(2, 5, 1, 5, True, True, False, "", -1) rm.update_current_status("NODEMANAGER", "INIT") rm.update_desired_status("NODEMANAGER", 
"INSTALLED") commands = rm.get_recovery_commands() self.assertEqual(0, len(commands)) - rm.update_config(12, 5, 1, 15, True, False, "NODEMANAGER", -1) + rm.update_config(12, 5, 1, 15, True, False, False, "NODEMANAGER", -1) rm.update_current_status("NODEMANAGER", "INIT") rm.update_desired_status("NODEMANAGER", "INSTALLED") @@ -459,25 +467,26 @@ class _TestRecoveryManager(TestCase): def _test_update_rm_config(self, mock_uc): rm = RecoveryManager(tempfile.mktemp()) rm.update_configuration_from_registration(None) - mock_uc.assert_has_calls([call(6, 60, 5, 12, False, True, "", -1)]) + mock_uc.assert_has_calls([call(6, 60, 5, 12, False, False, False, "", -1)]) mock_uc.reset_mock() rm.update_configuration_from_registration({}) - mock_uc.assert_has_calls([call(6, 60, 5, 12, False, True, "", -1)]) + mock_uc.assert_has_calls([call(6, 60, 5, 12, False, False, False, "", -1)]) mock_uc.reset_mock() rm.update_configuration_from_registration( {"recoveryConfig": { "type" : "DEFAULT"}} ) - mock_uc.assert_has_calls([call(6, 60, 5, 12, False, True, "", -1)]) + + mock_uc.assert_has_calls([call(6, 60, 5, 12, False, False, False, "", -1)]) mock_uc.reset_mock() rm.update_configuration_from_registration( {"recoveryConfig": { "type" : "FULL"}} ) - mock_uc.assert_has_calls([call(6, 60, 5, 12, True, False, "", -1)]) + mock_uc.assert_has_calls([call(6, 60, 5, 12, True, False, False, "", -1)]) mock_uc.reset_mock() rm.update_configuration_from_registration( @@ -485,7 +494,15 @@ class _TestRecoveryManager(TestCase): "type" : "AUTO_START", "max_count" : "med"}} ) - mock_uc.assert_has_calls([call(6, 60, 5, 12, True, True, "", -1)]) + mock_uc.assert_has_calls([call(6, 60, 5, 12, True, True, False, "", -1)]) + + mock_uc.reset_mock() + rm.update_configuration_from_registration( + {"recoveryConfig": { + "type" : "AUTO_INSTALL_START", + "max_count" : "med"}} + ) + mock_uc.assert_has_calls([call(6, 60, 5, 12, True, False, True, "", -1)]) mock_uc.reset_mock() rm.update_configuration_from_registration( @@ 
-498,7 +515,7 @@ class _TestRecoveryManager(TestCase): "components" : " A,B", "recoveryTimestamp" : 1}} ) - mock_uc.assert_has_calls([call(5, 20, 2, 5, True, True, " A,B", 1)]) + mock_uc.assert_has_calls([call(5, 20, 2, 5, True, True, False, " A,B", 1)]) pass @patch.object(RecoveryManager, "_now_") @@ -510,7 +527,7 @@ class _TestRecoveryManager(TestCase): rec_st = rm.get_recovery_status() self.assertEquals(rec_st, {"summary": "DISABLED"}) - rm.update_config(2, 5, 1, 4, True, True, "", -1) + rm.update_config(2, 5, 1, 4, True, True, False, "", -1) rec_st = rm.get_recovery_status() self.assertEquals(rec_st, {"summary": "RECOVERABLE", "componentReports": []}) @@ -554,12 +571,12 @@ class _TestRecoveryManager(TestCase): [1000, 1001, 1002, 1003, 1104, 1105, 1106, 1807, 1808, 1809, 1810, 1811, 1812] rm = RecoveryManager(tempfile.mktemp(), True) - rm.update_config(5, 5, 1, 11, True, False, "", -1) + rm.update_config(5, 5, 1, 11, True, False, False, "", -1) command1 = copy.deepcopy(self.command) rm.store_or_update_command(command1) - rm.update_config(12, 5, 1, 15, True, False, "NODEMANAGER", -1) + rm.update_config(12, 5, 1, 15, True, False, False, "NODEMANAGER", -1) rm.update_current_status("NODEMANAGER", "INSTALLED") rm.update_desired_status("NODEMANAGER", "STARTED") @@ -595,24 +612,24 @@ class _TestRecoveryManager(TestCase): def test_configured_for_recovery(self): rm = RecoveryManager(tempfile.mktemp(), True) - rm.update_config(12, 5, 1, 15, True, False, "A,B", -1) + rm.update_config(12, 5, 1, 15, True, False, False, "A,B", -1) self.assertTrue(rm.configured_for_recovery("A")) self.assertTrue(rm.configured_for_recovery("B")) - rm.update_config(5, 5, 1, 11, True, False, "", -1) + rm.update_config(5, 5, 1, 11, True, False, False, "", -1) self.assertFalse(rm.configured_for_recovery("A")) self.assertFalse(rm.configured_for_recovery("B")) - rm.update_config(5, 5, 1, 11, True, False, "A", -1) + rm.update_config(5, 5, 1, 11, True, False, False, "A", -1) 
self.assertTrue(rm.configured_for_recovery("A")) self.assertFalse(rm.configured_for_recovery("B")) - rm.update_config(5, 5, 1, 11, True, False, "A", -1) + rm.update_config(5, 5, 1, 11, True, False, False, "A", -1) self.assertTrue(rm.configured_for_recovery("A")) self.assertFalse(rm.configured_for_recovery("B")) self.assertFalse(rm.configured_for_recovery("C")) - rm.update_config(5, 5, 1, 11, True, False, "A, D, F ", -1) + rm.update_config(5, 5, 1, 11, True, False, False, "A, D, F ", -1) self.assertTrue(rm.configured_for_recovery("A")) self.assertFalse(rm.configured_for_recovery("B")) self.assertFalse(rm.configured_for_recovery("C")) @@ -626,7 +643,7 @@ class _TestRecoveryManager(TestCase): [1000, 1071, 1372] rm = RecoveryManager(tempfile.mktemp(), True) - rm.update_config(2, 5, 1, 4, True, True, "", -1) + rm.update_config(2, 5, 1, 4, True, True, False, "", -1) rm.execute("COMPONENT") actions = rm.get_actions_copy()["COMPONENT"] @@ -644,7 +661,7 @@ class _TestRecoveryManager(TestCase): def test_is_action_info_stale(self, time_mock): rm = RecoveryManager(tempfile.mktemp(), True) - rm.update_config(5, 60, 5, 16, True, False, "", -1) + rm.update_config(5, 60, 5, 16, True, False, False, "", -1) time_mock.return_value = 0 self.assertFalse(rm.is_action_info_stale("COMPONENT_NAME"))