Repository: ambari Updated Branches: refs/heads/branch-2.2 f04d81dad -> db471d93d
AMBARI-15795. Parallel execution should only be allowed on commands that have auto retry enabled (smohanty) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/db471d93 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/db471d93 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/db471d93 Branch: refs/heads/branch-2.2 Commit: db471d93d1a2abaa8acc01aa15a27a6f408c6498 Parents: f04d81d Author: Sumit Mohanty <[email protected]> Authored: Mon Apr 11 15:49:23 2016 -0700 Committer: Sumit Mohanty <[email protected]> Committed: Mon Apr 11 15:50:16 2016 -0700 ---------------------------------------------------------------------- .../src/main/python/ambari_agent/ActionQueue.py | 22 +++++++-- .../test/python/ambari_agent/TestActionQueue.py | 48 +++++++++++++++++++- 2 files changed, 63 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/db471d93/ambari-agent/src/main/python/ambari_agent/ActionQueue.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py index 434c51e..619e938 100644 --- a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py +++ b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py @@ -156,11 +156,23 @@ class ActionQueue(threading.Thread): # commands using separate threads while (True): command = self.commandQueue.get(True, self.EXECUTION_COMMAND_WAIT_TIME) - logger.info("Kicking off a thread for the command, id=" + - str(command['commandId']) + " taskId=" + str(command['taskId'])) - t = threading.Thread(target=self.process_command, args=(command,)) - t.daemon = True - t.start() + # If command is not retry_enabled then do not start them in parallel + # checking just one command is enough as all commands for a stage is sent + # at the same time and retry is only enabled for initial start/install + retryAble = False + if 'command_retry_enabled' in command['commandParams']: + retryAble = command['commandParams']['command_retry_enabled'] == "true" + if retryAble: + logger.info("Kicking off a thread for the command, id=" + + str(command['commandId']) + " taskId=" + str(command['taskId'])) + t = threading.Thread(target=self.process_command, args=(command,)) + t.daemon = True + t.start() + else: + self.process_command(command) + break; + pass + pass except (Queue.Empty): pass http://git-wip-us.apache.org/repos/asf/ambari/blob/db471d93/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py b/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py index 466c3a8..cb0574f 100644 --- a/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py +++ b/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py @@ -64,7 +64,26 @@ class TestActionQueue(TestCase): 'serviceName': u'HDFS', 'hostLevelParams': {}, 'configurations':{'global' : {}}, - 'configurationTags':{'global' : { 'tag': 'v1' }} + 'configurationTags':{'global' : { 'tag': 'v1' }}, + 'commandParams': { + 'command_retry_enabled': 'true' + } + } + + datanode_install_no_retry_command = { + 'commandType': 'EXECUTION_COMMAND', + 'role': u'DATANODE', + 'roleCommand': u'INSTALL', + 'commandId': '1-1', + 'taskId': 3, + 'clusterName': u'cc', + 'serviceName': u'HDFS', + 'hostLevelParams': {}, + 'configurations':{'global' : {}}, + 'configurationTags':{'global' : { 'tag': 'v1' }}, + 'commandParams': { + 'command_retry_enabled': 'false' + } } datanode_auto_start_command = { @@ -129,8 +148,11 @@ class TestActionQueue(TestCase): 'taskId': 7, 'clusterName': u'cc', 'serviceName': u'HDFS', - 'hostLevelParams': {} + 'hostLevelParams': {}, + 'commandParams': { + 'command_retry_enabled': 'true' } + } status_command = { "serviceName" : 'HDFS', @@ -887,6 +909,28 @@ class TestActionQueue(TestCase): self.assertEqual(2, process_command_mock.call_count) process_command_mock.assert_any_calls([call(self.datanode_install_command), call(self.hbase_install_command)]) + @patch("threading.Thread") + @patch.object(AmbariConfig, "get_parallel_exec_option") + @patch.object(ActionQueue, "process_command") + @patch.object(CustomServiceOrchestrator, "__init__") + def test_parallel_exec_no_retry(self, CustomServiceOrchestrator_mock, + process_command_mock, gpeo_mock, threading_mock): + CustomServiceOrchestrator_mock.return_value = None + dummy_controller = MagicMock() + config = MagicMock() + gpeo_mock.return_value = 1 + config.get_parallel_exec_option = gpeo_mock + actionQueue = ActionQueue(config, dummy_controller) + actionQueue.put([self.datanode_install_no_retry_command, self.snamenode_install_command]) + self.assertEqual(2, actionQueue.commandQueue.qsize()) + actionQueue.start() + time.sleep(1) + actionQueue.stop() + actionQueue.join() + self.assertEqual(actionQueue.stopped(), True, 'Action queue is not stopped.') + self.assertEqual(1, process_command_mock.call_count) + self.assertEqual(0, threading_mock.call_count) + process_command_mock.assert_any_calls([call(self.datanode_install_command), call(self.hbase_install_command)]) @not_for_platform(PLATFORM_LINUX) @patch("time.sleep")
