Repository: ambari Updated Branches: refs/heads/branch-2.5 0bd7b8643 -> 290218f76
AMBARI-20323. Commands timed-out on ambari host without any error logs - addendum patch (echekanskiy) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/290218f7 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/290218f7 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/290218f7 Branch: refs/heads/branch-2.5 Commit: 290218f761f6c19355b79379041634fabd5bcbb2 Parents: 0bd7b86 Author: Eugene Chekanskiy <[email protected]> Authored: Sat Mar 11 00:08:59 2017 +0200 Committer: Eugene Chekanskiy <[email protected]> Committed: Sat Mar 11 00:08:59 2017 +0200 ---------------------------------------------------------------------- .../src/main/python/ambari_agent/Controller.py | 2 +- .../src/main/python/ambari_agent/ExitHelper.py | 3 ++ .../ambari_agent/StatusCommandsExecutor.py | 36 ++++++++++++++++---- .../src/main/python/ambari_agent/main.py | 4 +-- 4 files changed, 35 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/290218f7/ambari-agent/src/main/python/ambari_agent/Controller.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/Controller.py b/ambari-agent/src/main/python/ambari_agent/Controller.py index 301ad43..a123f2f 100644 --- a/ambari-agent/src/main/python/ambari_agent/Controller.py +++ b/ambari-agent/src/main/python/ambari_agent/Controller.py @@ -477,7 +477,7 @@ class Controller(threading.Thread): try: self.actionQueue = ActionQueue(self.config, controller=self) self.statusCommandsExecutor = StatusCommandsExecutor(self.config, self.actionQueue) - ExitHelper().register(self.statusCommandsExecutor.kill, "CLEANUP_KILLING") + ExitHelper().register(self.statusCommandsExecutor.kill, "CLEANUP_KILLING", can_relaunch=False) self.actionQueue.start() self.register = Register(self.config) self.heartbeat = Heartbeat(self.actionQueue, self.config, self.alert_scheduler_handler.collector()) http://git-wip-us.apache.org/repos/asf/ambari/blob/290218f7/ambari-agent/src/main/python/ambari_agent/ExitHelper.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/ExitHelper.py b/ambari-agent/src/main/python/ambari_agent/ExitHelper.py index e51646f..66e29e6 100644 --- a/ambari-agent/src/main/python/ambari_agent/ExitHelper.py +++ b/ambari-agent/src/main/python/ambari_agent/ExitHelper.py @@ -39,6 +39,9 @@ class ExitHelper(object): """ Class to cleanup resources before exiting. Replacement for atexit module. sys.exit(code) works only from threads and os._exit(code) will ignore atexit and cleanup will be ignored. + + WARNING: always import as `ambari_agent.ExitHelper import ExitHelper`, otherwise it will be imported twice and nothing + will work as expected. """ __metaclass__ = _singleton http://git-wip-us.apache.org/repos/asf/ambari/blob/290218f7/ambari-agent/src/main/python/ambari_agent/StatusCommandsExecutor.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/StatusCommandsExecutor.py b/ambari-agent/src/main/python/ambari_agent/StatusCommandsExecutor.py index 3f7ef4c..5c1c54a 100644 --- a/ambari-agent/src/main/python/ambari_agent/StatusCommandsExecutor.py +++ b/ambari-agent/src/main/python/ambari_agent/StatusCommandsExecutor.py @@ -37,6 +37,9 @@ class StatusCommandsExecutor(object): self.config = config self.actionQueue = actionQueue + self._can_relaunch_lock = threading.RLock() + self._can_relaunch = True + # used to prevent queues from been used during creation of new one to prevent threads messing up with combination of # old and new queues self.usage_lock = threading.RLock() @@ -53,6 +56,16 @@ class StatusCommandsExecutor(object): self.mp_result_logs = multiprocessing.Queue() self.mp_task_queue = multiprocessing.Queue() + @property + def can_relaunch(self): + with self._can_relaunch_lock: + return self._can_relaunch + + @can_relaunch.setter + def can_relaunch(self, value): + with self._can_relaunch_lock: + self._can_relaunch = value + def _log_message(self, level, message, exception=None): """ Put log message to logging queue. Must be used only for logging from child process(in _worker_process_target). @@ -163,7 +176,7 @@ class StatusCommandsExecutor(object): self._log_message(logging.ERROR, "StatusCommandsExecutor process failed with exception:", e) raise - self._log_message(logging.WARN, "StatusCommandsExecutor subprocess finished") + self._log_message(logging.INFO, "StatusCommandsExecutor subprocess finished") def _set_timed_out(self, command): """ @@ -242,23 +255,32 @@ class StatusCommandsExecutor(object): :param reason: reason of restart :return: """ - self.kill(reason) - self.worker_process = multiprocessing.Process(target=self._worker_process_target) - self.worker_process.start() - logger.info("Started process with pid {0}".format(self.worker_process.pid)) + if self.can_relaunch: + self.kill(reason) + self.worker_process = multiprocessing.Process(target=self._worker_process_target) + self.worker_process.start() + logger.info("Started process with pid {0}".format(self.worker_process.pid)) + else: + logger.debug("Relaunch does not allowed, can not relaunch") - def kill(self, reason=None): + def kill(self, reason=None, can_relaunch=True): """ Tries to stop command executor internal process for sort time, otherwise killing it. Closing all possible queues to unblock threads that probably blocked on read or write operations to queues. Must be called from threads different from threads that calling read or write methods(get_log_messages, get_results, put_commands). + :param can_relaunch: indicates if StatusCommandsExecutor can be relaunched after this kill :param reason: reason of killing :return: """ + logger.info("Killing child process reason:" + str(reason)) + self.can_relaunch = can_relaunch + + if not self.can_relaunch: + logger.info("Killing without possibility to relaunch...") + # try graceful stop, otherwise hard-kill if self.worker_process and self.worker_process.is_alive(): - logger.info("Killing child process reason:" + str(reason)) self.mustDieEvent.set() self.worker_process.join(timeout=3) if self.worker_process.is_alive(): http://git-wip-us.apache.org/repos/asf/ambari/blob/290218f7/ambari-agent/src/main/python/ambari_agent/main.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/main.py b/ambari-agent/src/main/python/ambari_agent/main.py index 3c5d8f1..bbed123 100644 --- a/ambari-agent/src/main/python/ambari_agent/main.py +++ b/ambari-agent/src/main/python/ambari_agent/main.py @@ -88,7 +88,7 @@ from NetUtil import NetUtil from PingPortListener import PingPortListener import hostname from DataCleaner import DataCleaner -from ExitHelper import ExitHelper +from ambari_agent.ExitHelper import ExitHelper import socket from ambari_commons import OSConst, OSCheck from ambari_commons.shell import shellRunner @@ -318,7 +318,7 @@ def run_threads(server_hostname, heartbeat_stop_callback): if controller.get_status_commands_executor().need_relaunch: controller.get_status_commands_executor().relaunch("COMMAND_TIMEOUT_OR_KILLED") - controller.get_status_commands_executor().kill() + controller.get_status_commands_executor().kill("AGENT_STOPPED", can_relaunch=False) # event - event, that will be passed to Controller and NetUtil to make able to interrupt loops form outside process # we need this for windows os, where no sigterm available
