AMBARI-19216 After rescanning yarn queue, Ambari still asks for RM to be restarted (dsen)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/95e9804a Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/95e9804a Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/95e9804a Branch: refs/heads/branch-dev-patch-upgrade Commit: 95e9804acdd7fa806357feb84d1e9c7f589dd8a0 Parents: 28f1f2e Author: Dmytro Sen <[email protected]> Authored: Tue Dec 20 15:12:46 2016 +0200 Committer: Dmytro Sen <[email protected]> Committed: Tue Dec 20 15:12:46 2016 +0200 ---------------------------------------------------------------------- .../src/main/python/ambari_agent/ActionQueue.py | 13 +++++ .../test/python/ambari_agent/TestActionQueue.py | 61 ++++++++++++++++++++ 2 files changed, 74 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/95e9804a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py index cc1a048..793eeba 100644 --- a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py +++ b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py @@ -430,6 +430,19 @@ class ActionQueue(threading.Thread): # let ambari know that configuration tags were applied configHandler = ActualConfigHandler(self.config, self.configTags) + #update + if 'commandParams' in command: + command_params = command['commandParams'] + if command_params and command_params.has_key('forceRefreshConfigTags') and len(command_params['forceRefreshConfigTags']) > 0 : + forceRefreshConfigTags = command_params['forceRefreshConfigTags'].split(',') + logger.info("Got refresh additional component tags command") + + for configTag in forceRefreshConfigTags : + configHandler.update_component_tag(command['role'], configTag, command['configurationTags'][configTag]) + + roleResult['customCommand'] = self.CUSTOM_COMMAND_RESTART # force restart for component to evict stale_config on server side + command['configurationTags'] = configHandler.read_actual_component(command['role']) + if command.has_key('configurationTags'): configHandler.write_actual(command['configurationTags']) roleResult['configurationTags'] = command['configurationTags'] http://git-wip-us.apache.org/repos/asf/ambari/blob/95e9804a/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py b/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py index 65127f2..d4f5436 100644 --- a/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py +++ b/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py @@ -214,6 +214,20 @@ class TestActionQueue(TestCase): 'hostLevelParams':{'custom_command': 'START'} } + yarn_refresh_queues_custom_command = { + 'commandType': 'EXECUTION_COMMAND', + 'role': u'RESOURCEMANAGER', + 'roleCommand': u'CUSTOM_COMMAND', + 'commandId': '1-1', + 'taskId': 9, + 'clusterName': u'cc', + 'serviceName': u'YARN', + 'commandParams' : {'forceRefreshConfigTags' : 'capacity-scheduler'}, + 'configurations':{'global' : {}}, + 'configurationTags':{'global' : { 'tag': 'v123' }, 'capacity-scheduler' : {'tag': 'v123'}}, + 'hostLevelParams':{'custom_command': 'REFRESHQUEUES'} + } + status_command_for_alerts = { "serviceName" : 'FLUME', "commandType" : "STATUS_COMMAND", @@ -826,6 +840,53 @@ class TestActionQueue(TestCase): @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value)) @patch.object(ActualConfigHandler, "write_client_components") @patch.object(ActualConfigHandler, "write_actual_component") + @patch.object(ActualConfigHandler, "update_component_tag") + @patch.object(CustomServiceOrchestrator, "runCommand") + @patch("CommandStatusDict.CommandStatusDict") + @patch.object(ActionQueue, "status_update_callback") + def test_refresh_queues_custom_command(self, status_update_callback_mock, + command_status_dict_mock, + cso_runCommand_mock, update_component_tag, write_actual_component_mock, write_client_components_mock): + custom_service_orchestrator_execution_result_dict = { + 'stdout': 'out', + 'stderr': 'stderr', + 'structuredOut' : '', + 'exitcode' : 0 + } + cso_runCommand_mock.return_value = custom_service_orchestrator_execution_result_dict + + config = AmbariConfig() + tempdir = tempfile.gettempdir() + config.set('agent', 'prefix', tempdir) + config.set('agent', 'cache_dir', "/var/lib/ambari-agent/cache") + config.set('agent', 'tolerate_download_failures', "true") + dummy_controller = MagicMock() + actionQueue = ActionQueue(config, dummy_controller) + actionQueue.execute_command(self.yarn_refresh_queues_custom_command) + + report = actionQueue.result() + expected = {'status': 'COMPLETED', + 'configurationTags': None, + 'stderr': 'stderr', + 'stdout': 'out\n\nCommand completed successfully!\n', + 'clusterName': u'cc', + 'structuredOut': '""', + 'roleCommand': u'CUSTOM_COMMAND', + 'serviceName': u'YARN', + 'role': u'RESOURCEMANAGER', + 'actionId': '1-1', + 'taskId': 9, + 'customCommand': 'RESTART', + 'exitCode': 0} + self.assertEqual(len(report['reports']), 1) + self.assertEqual(expected, report['reports'][0]) + + # Configuration tags should be updated + self.assertTrue(update_component_tag.called) + + @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value)) + @patch.object(ActualConfigHandler, "write_client_components") + @patch.object(ActualConfigHandler, "write_actual_component") @patch.object(CustomServiceOrchestrator, "runCommand") @patch("CommandStatusDict.CommandStatusDict") @patch.object(ActionQueue, "status_update_callback")
