Repository: ambari
Updated Branches:
  refs/heads/branch-2.2 dbde2f9b0 -> f4e6e59dc
AMBARI-16036. Add logging for problems in ambari-agent Controller and ActionQueue (aonishuk) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/f4e6e59d Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/f4e6e59d Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/f4e6e59d Branch: refs/heads/branch-2.2 Commit: f4e6e59dc8231aaea46d450efe2ea9a180e27367 Parents: dbde2f9 Author: Andrew Onishuk <[email protected]> Authored: Fri Apr 22 12:57:43 2016 +0300 Committer: Andrew Onishuk <[email protected]> Committed: Fri Apr 22 12:57:43 2016 +0300 ---------------------------------------------------------------------- .../src/main/python/ambari_agent/ActionQueue.py | 68 +++++++++++--------- .../src/main/python/ambari_agent/Controller.py | 36 +++++++---- .../python/ambari_agent/HeartbeatHandlers.py | 1 + .../test/python/ambari_agent/TestActionQueue.py | 22 +++---- 4 files changed, 71 insertions(+), 56 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/f4e6e59d/ambari-agent/src/main/python/ambari_agent/ActionQueue.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py index c4d0b5e..0734bd7 100644 --- a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py +++ b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py @@ -144,37 +144,43 @@ class ActionQueue(threading.Thread): self.customServiceOrchestrator.cancel_command(task_id, reason) def run(self): - while not self.stopped(): - self.processBackgroundQueueSafeEmpty(); - self.processStatusCommandQueueSafeEmpty(); - try: - if self.parallel_execution == 0: - command = self.commandQueue.get(True, self.EXECUTION_COMMAND_WAIT_TIME) - self.process_command(command) - else: - # If parallel execution is enabled, just kick off 
all available - # commands using separate threads - while (True): + try: + while not self.stopped(): + self.processBackgroundQueueSafeEmpty(); + self.processStatusCommandQueueSafeEmpty(); + try: + if self.parallel_execution == 0: command = self.commandQueue.get(True, self.EXECUTION_COMMAND_WAIT_TIME) - # If command is not retry_enabled then do not start them in parallel - # checking just one command is enough as all commands for a stage is sent - # at the same time and retry is only enabled for initial start/install - retryAble = False - if 'commandParams' in command and 'command_retry_enabled' in command['commandParams']: - retryAble = command['commandParams']['command_retry_enabled'] == "true" - if retryAble: - logger.info("Kicking off a thread for the command, id=" + - str(command['commandId']) + " taskId=" + str(command['taskId'])) - t = threading.Thread(target=self.process_command, args=(command,)) - t.daemon = True - t.start() - else: - self.process_command(command) - break; + self.process_command(command) + else: + # If parallel execution is enabled, just kick off all available + # commands using separate threads + while (True): + command = self.commandQueue.get(True, self.EXECUTION_COMMAND_WAIT_TIME) + # If command is not retry_enabled then do not start them in parallel + # checking just one command is enough as all commands for a stage is sent + # at the same time and retry is only enabled for initial start/install + retryAble = False + if 'commandParams' in command and 'command_retry_enabled' in command['commandParams']: + retryAble = command['commandParams']['command_retry_enabled'] == "true" + if retryAble: + logger.info("Kicking off a thread for the command, id=" + + str(command['commandId']) + " taskId=" + str(command['taskId'])) + t = threading.Thread(target=self.process_command, args=(command,)) + t.daemon = True + t.start() + else: + self.process_command(command) + break; + pass pass + except (Queue.Empty): pass - except (Queue.Empty): - pass + 
except: + logger.exception("ActionQueue thread failed with exception:") + raise + + logger.info("ActionQueue thread has successfully finished") def processBackgroundQueueSafeEmpty(self): while not self.backgroundCommandQueue.empty(): @@ -217,10 +223,8 @@ class ActionQueue(threading.Thread): self.execute_status_command(command) else: logger.error("Unrecognized command " + pprint.pformat(command)) - except Exception, err: - # Should not happen - traceback.print_exc() - logger.warn(err) + except Exception: + logger.exception("Exception while processing {0} command".format(commandType)) def tasks_in_progress_or_pending(self): return_val = False http://git-wip-us.apache.org/repos/asf/ambari/blob/f4e6e59d/ambari-agent/src/main/python/ambari_agent/Controller.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/Controller.py b/ambari-agent/src/main/python/ambari_agent/Controller.py index 9f6d32b..e149789 100644 --- a/ambari-agent/src/main/python/ambari_agent/Controller.py +++ b/ambari-agent/src/main/python/ambari_agent/Controller.py @@ -332,6 +332,7 @@ class Controller(threading.Thread): except ssl.SSLError: self.repeatRegistration=False self.isRegistered = False + logger.exception("SSLError while trying to heartbeat.") return except Exception, err: if "code" in err: @@ -369,19 +370,26 @@ class Controller(threading.Thread): self.DEBUG_STOP_HEARTBEATING=True def run(self): - self.actionQueue = ActionQueue(self.config, controller=self) - self.actionQueue.start() - self.register = Register(self.config) - self.heartbeat = Heartbeat(self.actionQueue, self.config, self.alert_scheduler_handler.collector()) - - opener = urllib2.build_opener() - urllib2.install_opener(opener) - - while True: - self.repeatRegistration = False - self.registerAndHeartbeat() - if not self.repeatRegistration: - break + try: + self.actionQueue = ActionQueue(self.config, controller=self) + self.actionQueue.start() + self.register 
= Register(self.config) + self.heartbeat = Heartbeat(self.actionQueue, self.config, self.alert_scheduler_handler.collector()) + + opener = urllib2.build_opener() + urllib2.install_opener(opener) + + while True: + self.repeatRegistration = False + self.registerAndHeartbeat() + if not self.repeatRegistration: + logger.info("Finished heartbeating and registering cycle") + break + except: + logger.exception("Controller thread failed with exception:") + raise + + logger.info("Controller thread has successfully finished") def registerAndHeartbeat(self): registerResponse = self.registerWithServer() @@ -402,6 +410,8 @@ class Controller(threading.Thread): time.sleep(self.netutil.HEARTBEAT_IDDLE_INTERVAL_SEC) self.heartbeatWithServer() + else: + logger.info("Registration response from %s didn't contain 'response' as a key".format(self.serverHostname)) def restartAgent(self): ExitHelper().exit(AGENT_AUTO_RESTART_EXIT_CODE) http://git-wip-us.apache.org/repos/asf/ambari/blob/f4e6e59d/ambari-agent/src/main/python/ambari_agent/HeartbeatHandlers.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/HeartbeatHandlers.py b/ambari-agent/src/main/python/ambari_agent/HeartbeatHandlers.py index 4347595..81ccd22 100644 --- a/ambari-agent/src/main/python/ambari_agent/HeartbeatHandlers.py +++ b/ambari-agent/src/main/python/ambari_agent/HeartbeatHandlers.py @@ -77,6 +77,7 @@ class HeartbeatStopHandlersWindows(HeartbeatStopHandlers): def signal_handler(signum, frame): global _handler + logger.info("Ambari-agent received {0} signal, stopping...".format(signum)) _handler.set_stop() http://git-wip-us.apache.org/repos/asf/ambari/blob/f4e6e59d/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py b/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py index 
a81bc55..ffb6419 100644 --- a/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py +++ b/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py @@ -284,11 +284,11 @@ class TestActionQueue(TestCase): @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value)) - @patch("traceback.print_exc") + @patch("logging.RootLogger.exception") @patch.object(ActionQueue, "execute_command") @patch.object(ActionQueue, "execute_status_command") def test_process_command(self, execute_status_command_mock, - execute_command_mock, print_exc_mock): + execute_command_mock, log_exc_mock): dummy_controller = MagicMock() config = AmbariConfig() config.set('agent', 'tolerate_download_failures', "true") @@ -306,42 +306,42 @@ class TestActionQueue(TestCase): actionQueue.process_command(wrong_command) self.assertFalse(execute_command_mock.called) self.assertFalse(execute_status_command_mock.called) - self.assertFalse(print_exc_mock.called) + self.assertFalse(log_exc_mock.called) execute_command_mock.reset_mock() execute_status_command_mock.reset_mock() - print_exc_mock.reset_mock() + log_exc_mock.reset_mock() # Try normal execution actionQueue.process_command(execution_command) self.assertTrue(execute_command_mock.called) self.assertFalse(execute_status_command_mock.called) - self.assertFalse(print_exc_mock.called) + self.assertFalse(log_exc_mock.called) execute_command_mock.reset_mock() execute_status_command_mock.reset_mock() - print_exc_mock.reset_mock() + log_exc_mock.reset_mock() actionQueue.process_command(status_command) self.assertFalse(execute_command_mock.called) self.assertTrue(execute_status_command_mock.called) - self.assertFalse(print_exc_mock.called) + self.assertFalse(log_exc_mock.called) execute_command_mock.reset_mock() execute_status_command_mock.reset_mock() - print_exc_mock.reset_mock() + log_exc_mock.reset_mock() # Try exception to check proper logging def side_effect(self): raise Exception("TerribleException") 
execute_command_mock.side_effect = side_effect actionQueue.process_command(execution_command) - self.assertTrue(print_exc_mock.called) + self.assertTrue(log_exc_mock.called) - print_exc_mock.reset_mock() + log_exc_mock.reset_mock() execute_status_command_mock.side_effect = side_effect actionQueue.process_command(execution_command) - self.assertTrue(print_exc_mock.called) + self.assertTrue(log_exc_mock.called) @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value)) @patch.object(CustomServiceOrchestrator, "runCommand")
