Repository: ambari
Updated Branches:
  refs/heads/trunk 6d11d6780 -> 4e78cb341
AMBARI-4992. Sometimes cluster installation pauses for few minutes between tasks (dlysnichenko)

Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/4e78cb34
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/4e78cb34
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/4e78cb34

Branch: refs/heads/trunk
Commit: 4e78cb3410a87469fc1706d39b24cd36e63e4438
Parents: 6d11d67
Author: Lisnichenko Dmitro <[email protected]>
Authored: Thu Mar 13 16:00:12 2014 +0200
Committer: Lisnichenko Dmitro <[email protected]>
Committed: Thu Mar 13 16:00:12 2014 +0200

----------------------------------------------------------------------
 .../src/main/python/ambari_agent/ActionQueue.py | 36 ++++++++++++++++----
 .../src/main/python/ambari_agent/Controller.py  | 13 +++++--
 .../test/python/ambari_agent/TestController.py  | 33 ++++++++++++++----
 3 files changed, 67 insertions(+), 15 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ambari/blob/4e78cb34/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
index 731ac54..549651a 100644
--- a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
+++ b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
@@ -46,6 +46,11 @@ class ActionQueue(threading.Thread):
   # How many actions can be performed in parallel.
Feel free to change MAX_CONCURRENT_ACTIONS = 5 + + #How much time(in seconds) we need wait for new incoming execution command before checking + #status command queue + EXECUTION_COMMAND_WAIT_TIME = 2 + STATUS_COMMAND = 'STATUS_COMMAND' EXECUTION_COMMAND = 'EXECUTION_COMMAND' ROLE_COMMAND_INSTALL = 'INSTALL' @@ -64,6 +69,7 @@ class ActionQueue(threading.Thread): def __init__(self, config, controller): super(ActionQueue, self).__init__() self.commandQueue = Queue.Queue() + self.statusCommandQueue = Queue.Queue() self.commandStatuses = CommandStatusDict(callback_action = self.status_update_callback) self.config = config @@ -81,6 +87,17 @@ class ActionQueue(threading.Thread): def stopped(self): return self._stop.isSet() + def put_status(self, commands): + #Was supposed that we got all set of statuses, we don't need to keep old ones + self.statusCommandQueue.queue.clear() + + for command in commands: + logger.info("Adding " + command['commandType'] + " for service " + \ + command['serviceName'] + " of cluster " + \ + command['clusterName'] + " to the queue.") + logger.debug(pprint.pformat(command)) + self.statusCommandQueue.put(command) + def put(self, commands): for command in commands: logger.info("Adding " + command['commandType'] + " for service " + \ @@ -89,14 +106,21 @@ class ActionQueue(threading.Thread): logger.debug(pprint.pformat(command)) self.commandQueue.put(command) - def empty(self): - return self.commandQueue.empty() - - def run(self): while not self.stopped(): - command = self.commandQueue.get() # Will block if queue is empty - self.process_command(command) + while not self.statusCommandQueue.empty(): + try: + command = self.statusCommandQueue.get(False) + self.process_command(command) + except (Queue.Empty): + pass + try: + command = self.commandQueue.get(True, self.EXECUTION_COMMAND_WAIT_TIME) + self.process_command(command) + except (Queue.Empty): + pass + + def process_command(self, command): 
http://git-wip-us.apache.org/repos/asf/ambari/blob/4e78cb34/ambari-agent/src/main/python/ambari_agent/Controller.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/Controller.py b/ambari-agent/src/main/python/ambari_agent/Controller.py index 26985ef..ad928c2 100644 --- a/ambari-agent/src/main/python/ambari_agent/Controller.py +++ b/ambari-agent/src/main/python/ambari_agent/Controller.py @@ -107,7 +107,7 @@ class Controller(threading.Thread): self.isRegistered = True if 'statusCommands' in ret.keys(): logger.info("Got status commands on registration " + pprint.pformat(ret['statusCommands']) ) - self.addToQueue(ret['statusCommands']) + self.addToStatusQueue(ret['statusCommands']) pass else: self.hasMappedComponents = False @@ -138,6 +138,13 @@ class Controller(threading.Thread): self.actionQueue.put(commands) pass + def addToStatusQueue(self, commands): + if not commands: + logger.debug("No status commands from the server : " + pprint.pformat(commands)) + else: + self.actionQueue.put_status(commands) + pass + # For testing purposes DEBUG_HEARTBEAT_RETRIES = 0 DEBUG_SUCCESSFULL_HEARTBEATS = 0 @@ -190,8 +197,8 @@ class Controller(threading.Thread): if 'executionCommands' in response.keys(): self.addToQueue(response['executionCommands']) pass - if 'statusCommands' in response.keys() and self.actionQueue.empty(): - self.addToQueue(response['statusCommands']) + if 'statusCommands' in response.keys(): + self.addToStatusQueue(response['statusCommands']) pass if "true" == response['restartAgent']: logger.error("Got restartAgent command") http://git-wip-us.apache.org/repos/asf/ambari/blob/4e78cb34/ambari-agent/src/test/python/ambari_agent/TestController.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/test/python/ambari_agent/TestController.py b/ambari-agent/src/test/python/ambari_agent/TestController.py index 1e110da..63f5b22 100644 --- 
a/ambari-agent/src/test/python/ambari_agent/TestController.py +++ b/ambari-agent/src/test/python/ambari_agent/TestController.py @@ -78,10 +78,10 @@ class TestController(unittest.TestCase): self.assertEqual({"responseId":1}, self.controller.registerWithServer()) self.controller.sendRequest.return_value = '{"responseId":1, "statusCommands": "commands", "log":"", "exitstatus":"0"}' - self.controller.addToQueue = MagicMock(name="addToQueue") + self.controller.addToStatusQueue = MagicMock(name="addToStatusQueue") self.controller.isRegistered = False self.assertEqual({'exitstatus': '0', 'responseId': 1, 'log': '', 'statusCommands': 'commands'}, self.controller.registerWithServer()) - self.controller.addToQueue.assert_called_with("commands") + self.controller.addToStatusQueue.assert_called_with("commands") calls = [] @@ -102,7 +102,7 @@ class TestController(unittest.TestCase): sys.stdout = sys.__stdout__ self.controller.sendRequest = Controller.Controller.sendRequest - self.controller.addToQueue = Controller.Controller.addToQueue + self.controller.addToStatusQueue = Controller.Controller.addToStatusQueue @patch("pprint.pformat") @@ -116,6 +116,17 @@ class TestController(unittest.TestCase): self.assertTrue(actionQueue.put.called) + @patch("pprint.pformat") + def test_addToStatusQueue(self, pformatMock): + + actionQueue = MagicMock() + self.controller.actionQueue = actionQueue + self.controller.addToStatusQueue(None) + self.assertFalse(actionQueue.put_status.called) + self.controller.addToStatusQueue("cmd") + self.assertTrue(actionQueue.put_status.called) + + @patch("urllib2.build_opener") @patch("urllib2.install_opener") @patch.object(Controller, "ActionQueue") @@ -369,17 +380,25 @@ class TestController(unittest.TestCase): restartAgent.assert_called_once_with() - # executionCommands, statusCommands + # executionCommands self.controller.responseId = 1 addToQueue = MagicMock(name="addToQueue") self.controller.addToQueue = addToQueue response["executionCommands"] = 
"executionCommands" + self.controller.DEBUG_STOP_HEARTBEATING = False + self.controller.heartbeatWithServer() + + addToQueue.assert_has_calls([call("executionCommands")]) + + # statusCommands + self.controller.responseId = 1 + addToStatusQueue = MagicMock(name="addToStatusQueue") + self.controller.addToStatusQueue = addToStatusQueue response["statusCommands"] = "statusCommands" self.controller.DEBUG_STOP_HEARTBEATING = False self.controller.heartbeatWithServer() - addToQueue.assert_has_calls([call("executionCommands"), - call("statusCommands")]) + addToStatusQueue.assert_has_calls([call("statusCommands")]) # restartAgent command self.controller.responseId = 1 @@ -404,6 +423,8 @@ class TestController(unittest.TestCase): sys.stdout = sys.__stdout__ self.controller.sendRequest = Controller.Controller.sendRequest self.controller.sendRequest = Controller.Controller.addToQueue + self.controller.sendRequest = Controller.Controller.addToStatusQueue + @patch("pprint.pformat") @patch("time.sleep")
