AMBARI-20323. Commands timed-out on ambari host without any error logs (echekanskiy)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/17ef5559 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/17ef5559 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/17ef5559 Branch: refs/heads/branch-dev-logsearch Commit: 17ef555940758b73cd09ddcc9fc8a3461604c085 Parents: c6a9a3c Author: Eugene Chekanskiy <[email protected]> Authored: Thu Mar 9 18:30:20 2017 +0200 Committer: Eugene Chekanskiy <[email protected]> Committed: Thu Mar 9 18:30:20 2017 +0200 ---------------------------------------------------------------------- .../src/main/python/ambari_agent/ActionQueue.py | 52 +--- .../src/main/python/ambari_agent/Controller.py | 54 +--- .../ambari_agent/StatusCommandsExecutor.py | 307 +++++++++++++++---- .../src/main/python/ambari_agent/main.py | 12 +- .../test/python/ambari_agent/TestActionQueue.py | 4 +- .../test/python/ambari_agent/TestController.py | 3 - .../src/test/python/ambari_agent/TestMain.py | 9 +- 7 files changed, 280 insertions(+), 161 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/17ef5559/ambari-agent/src/main/python/ambari_agent/ActionQueue.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py index 5300b52..15ae03d 100644 --- a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py +++ b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py @@ -76,10 +76,6 @@ class ActionQueue(threading.Thread): def __init__(self, config, controller): super(ActionQueue, self).__init__() self.commandQueue = Queue.Queue() - self.statusCommandQueue = None # the queue this field points to is re-created whenever - # a new StatusCommandExecutor child process is spawned - # by Controller - # multiprocessing.Queue() self.statusCommandResultQueue = multiprocessing.Queue() # this queue is filled by StatuCommandsExecutor. self.backgroundCommandQueue = Queue.Queue() self.commandStatuses = CommandStatusDict(callback_action = @@ -102,25 +98,7 @@ class ActionQueue(threading.Thread): return self._stop.isSet() def put_status(self, commands): - if not self.statusCommandQueue.empty(): - #Clear all status commands. Was supposed that we got all set of statuses, we don't need to keep old ones - statusCommandQueueSize = 0 - try: - while not self.statusCommandQueue.empty(): - self.statusCommandQueue.get(False) - statusCommandQueueSize = statusCommandQueueSize + 1 - except Queue.Empty: - pass - - logger.info("Number of status commands removed from queue : " + str(statusCommandQueueSize)) - - for command in commands: - logger.info("Adding " + command['commandType'] + " for component " + \ - command['componentName'] + " of service " + \ - command['serviceName'] + " of cluster " + \ - command['clusterName'] + " to the queue.") - self.statusCommandQueue.put(command) - logger.debug(pprint.pformat(command)) + self.controller.statusCommandsExecutor.put_commands(commands) def put(self, commands): for command in commands: @@ -167,8 +145,8 @@ class ActionQueue(threading.Thread): def run(self): try: while not self.stopped(): - self.processBackgroundQueueSafeEmpty(); - self.processStatusCommandResultQueueSafeEmpty(); + self.processBackgroundQueueSafeEmpty() + self.process_status_command_results() try: if self.parallel_execution == 0: command = self.commandQueue.get(True, self.EXECUTION_COMMAND_WAIT_TIME) @@ -212,23 +190,13 @@ class ActionQueue(threading.Thread): except Queue.Empty: pass - def processStatusCommandResultQueueSafeEmpty(self): - try: - while not self.statusCommandResultQueue.empty(): - try: - result = self.statusCommandResultQueue.get(False) - self.process_status_command_result(result) - except Queue.Empty: - pass - except IOError: - # on race condition in multiprocessing.Queue if get/put and thread kill are executed at the same time. - # During queue.close IOError will be thrown (this prevents from permanently dead-locked get). - pass - except UnicodeDecodeError: - pass - except IOError: - # queue.empty() may also throw IOError - pass + def process_status_command_results(self): + self.controller.statusCommandsExecutor.process_logs() + for result in self.controller.statusCommandsExecutor.get_results(): + try: + self.process_status_command_result(result) + except UnicodeDecodeError: + pass def createCommandHandle(self, command): if command.has_key('__handle'): http://git-wip-us.apache.org/repos/asf/ambari/blob/17ef5559/ambari-agent/src/main/python/ambari_agent/Controller.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/Controller.py b/ambari-agent/src/main/python/ambari_agent/Controller.py index 61a74e6..301ad43 100644 --- a/ambari-agent/src/main/python/ambari_agent/Controller.py +++ b/ambari-agent/src/main/python/ambari_agent/Controller.py @@ -40,7 +40,6 @@ import AmbariConfig from ambari_agent.Heartbeat import Heartbeat from ambari_agent.Register import Register from ambari_agent.ActionQueue import ActionQueue -from ambari_agent.StatusCommandsExecutor import StatusCommandsExecutor from ambari_agent.FileCache import FileCache from ambari_agent.NetUtil import NetUtil from ambari_agent.LiveStatus import LiveStatus @@ -49,6 +48,7 @@ from ambari_agent.ClusterConfiguration import ClusterConfiguration from ambari_agent.RecoveryManager import RecoveryManager from ambari_agent.HeartbeatHandlers import HeartbeatStopHandlers, bind_signal_handlers from ambari_agent.ExitHelper import ExitHelper +from ambari_agent.StatusCommandsExecutor import StatusCommandsExecutor from resource_management.libraries.functions.version import compare_versions from ambari_commons.os_utils import get_used_ram @@ -87,9 +87,6 @@ class Controller(threading.Thread): self.hasMappedComponents = True self.statusCommandsExecutor = None - # this lock is used control which thread spawns/kills the StatusCommandExecutor child process - self.spawnKillStatusCommandExecutorLock = threading.RLock() - # Event is used for synchronizing heartbeat iterations (to make possible # manual wait() interruption between heartbeats ) self.heartbeat_stop_callback = heartbeat_stop_callback @@ -205,7 +202,7 @@ class Controller(threading.Thread): # Start StatusCommandExecutor child process or restart it if already running # in order to receive up to date agent config. - self.spawnStatusCommandsExecutorProcess() + self.statusCommandsExecutor.relaunch("REGISTER_WITH_SERVER") if 'statusCommands' in ret.keys(): logger.debug("Got status commands on registration.") @@ -476,51 +473,11 @@ class Controller(threading.Thread): logger.log(logging_level, "Wait for next heartbeat over") - def spawnStatusCommandsExecutorProcess(self): - ''' - Starts a new StatusCommandExecutor child process. In case there is a running instance - already restarts it by simply killing it and starting new one. - This function is thread-safe. - ''' - with self.getSpawnKillStatusCommandExecutorLock(): - # if there is already an instance of StatusCommandExecutor kill it first - self.killStatusCommandsExecutorProcess() - - # Re-create the status command queue as in case the consumer - # process is killed the queue may deadlock (see http://bugs.python.org/issue20527). - # The queue must be re-created by the producer process. - statusCommandQueue = self.actionQueue.statusCommandQueue - self.actionQueue.statusCommandQueue = multiprocessing.Queue() - - if statusCommandQueue is not None: - statusCommandQueue.close() - - logger.info("Spawning statusCommandsExecutor") - self.statusCommandsExecutor = StatusCommandsExecutor(self.config, self.actionQueue) - self.statusCommandsExecutor.start() - - def killStatusCommandsExecutorProcess(self): - ''' - Kills the StatusExecutorChild process if exists. This function is thread-safe. - ''' - with self.getSpawnKillStatusCommandExecutorLock(): - if self.statusCommandsExecutor is not None and self.statusCommandsExecutor.is_alive(): - logger.info("Terminating statusCommandsExecutor.") - self.statusCommandsExecutor.kill() - - def getSpawnKillStatusCommandExecutorLock(self): - ''' - Re-entrant lock to be used to synchronize the spawning or killing of - StatusCommandExecutor child process in multi-thread environment. - ''' - return self.spawnKillStatusCommandExecutorLock; - - def getStatusCommandsExecutor(self): - return self.statusCommandsExecutor - def run(self): try: self.actionQueue = ActionQueue(self.config, controller=self) + self.statusCommandsExecutor = StatusCommandsExecutor(self.config, self.actionQueue) + ExitHelper().register(self.statusCommandsExecutor.kill, "CLEANUP_KILLING") self.actionQueue.start() self.register = Register(self.config) self.heartbeat = Heartbeat(self.actionQueue, self.config, self.alert_scheduler_handler.collector()) @@ -613,6 +570,9 @@ class Controller(threading.Thread): logger.debug("LiveStatus.CLIENT_COMPONENTS" + str(LiveStatus.CLIENT_COMPONENTS)) logger.debug("LiveStatus.COMPONENTS" + str(LiveStatus.COMPONENTS)) + def get_status_commands_executor(self): + return self.statusCommandsExecutor + def move_data_dir_mount_file(self): """ In Ambari 2.1.2, we moved the dfs_data_dir_mount.hist to a static location http://git-wip-us.apache.org/repos/asf/ambari/blob/17ef5559/ambari-agent/src/main/python/ambari_agent/StatusCommandsExecutor.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/StatusCommandsExecutor.py b/ambari-agent/src/main/python/ambari_agent/StatusCommandsExecutor.py index 2f15770..3f7ef4c 100644 --- a/ambari-agent/src/main/python/ambari_agent/StatusCommandsExecutor.py +++ b/ambari-agent/src/main/python/ambari_agent/StatusCommandsExecutor.py @@ -1,5 +1,5 @@ #!/usr/bin/env python -''' +""" Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with this work for additional information @@ -15,80 +15,279 @@ distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. -''' +""" -import os -import signal -import threading +import Queue import logging import multiprocessing -from ambari_agent.PythonReflectiveExecutor import PythonReflectiveExecutor +import os +import pprint +import threading + +import time + +import signal from ambari_agent.RemoteDebugUtils import bind_debug_signal_handlers -from ambari_agent.ExitHelper import ExitHelper logger = logging.getLogger(__name__) -class StatusCommandsExecutor(multiprocessing.Process): - """ - A process which executes status/security status commands. - It dies and respawns itself on timeout of the command. Which is the most graceful way to end the currently running status command. - """ +class StatusCommandsExecutor(object): def __init__(self, config, actionQueue): - multiprocessing.Process.__init__(self) - self.config = config self.actionQueue = actionQueue - self.status_command_timeout = int(self.config.get('agent', 'status_command_timeout', 5)) # in seconds - self.hasTimeoutedEvent = multiprocessing.Event() - ExitHelper().register(self.kill) + # used to prevent queues from been used during creation of new one to prevent threads messing up with combination of + # old and new queues + self.usage_lock = threading.RLock() - def run(self): - try: - bind_debug_signal_handlers() - logger.info("StatusCommandsExecutor starting") + self.status_command_timeout = int(self.config.get('agent', 'status_command_timeout', 5)) + self.customServiceOrchestrator = self.actionQueue.customServiceOrchestrator + + self.worker_process = None + self.mustDieEvent = multiprocessing.Event() + self.timedOutEvent = multiprocessing.Event() + + # multiprocessing stuff that need to be cleaned every time + self.mp_result_queue = multiprocessing.Queue() + self.mp_result_logs = multiprocessing.Queue() + self.mp_task_queue = multiprocessing.Queue() + + def _log_message(self, level, message, exception=None): + """ + Put log message to logging queue. Must be used only for logging from child process(in _worker_process_target). + + :param level: + :param message: + :param exception: + :return: + """ + result_message = "StatusCommandExecutor reporting at {0}: ".format(time.time()) + message + self.mp_result_logs.put((level, result_message, exception)) + + def get_log_messages(self): + """ + Returns list of (level, message, exception) log messages. + + :return: list of (level, message, exception) + """ + results = [] + with self.usage_lock: + try: + while not self.mp_result_logs.empty(): + try: + results.append(self.mp_result_logs.get(False)) + except Queue.Empty: + pass + except IOError: + pass + except UnicodeDecodeError: + pass + except IOError: + pass + return results + + def process_logs(self): + """ + Get all available at this moment logs and prints them to logger. + """ + for level, message, exception in self.get_log_messages(): + if level == logging.ERROR: + logger.debug(message, exc_info=exception) + if level == logging.WARN: + logger.warn(message) + if level == logging.INFO: + logger.info(message) + + def _worker_process_target(self): + """ + Internal method that running in separate process. + """ + bind_debug_signal_handlers() + self._log_message(logging.INFO, "StatusCommandsExecutor process started") + + # region StatusCommandsExecutor process internals + internal_in_queue = Queue.Queue() + internal_out_queue = Queue.Queue() + + def _internal_worker(): + """ + thread that actually executes status commands + """ while True: - command = self.actionQueue.statusCommandQueue.get(True) # blocks until status status command appears - logger.debug("Running status command for {0}".format(command['componentName'])) - - timeout_timer = threading.Timer( self.status_command_timeout, self.respawn, [command]) - timeout_timer.start() - - self.process_status_command(command) - - timeout_timer.cancel() - logger.debug("Completed status command for {0}".format(command['componentName'])) - except: - logger.exception("StatusCommandsExecutor process failed with exception:") - raise + _cmd = internal_in_queue.get() + component_status_result = self.customServiceOrchestrator.requestComponentStatus(_cmd) + component_security_status_result = self.customServiceOrchestrator.requestComponentSecurityState(_cmd) + internal_out_queue.put((_cmd, component_status_result, component_security_status_result)) - logger.warn("StatusCommandsExecutor process has finished") + worker = threading.Thread(target=_internal_worker) + worker.daemon = True + worker.start() - def process_status_command(self, command): - component_status_result = self.actionQueue.customServiceOrchestrator.requestComponentStatus(command) - component_security_status_result = self.actionQueue.customServiceOrchestrator.requestComponentSecurityState(command) - result = (command, component_status_result, component_security_status_result) + def _internal_process_command(_command): + internal_in_queue.put(_command) + start_time = time.time() + result = None + while not self.mustDieEvent.is_set() and not result and time.time() - start_time < self.status_command_timeout: + try: + result = internal_out_queue.get(timeout=1) + except Queue.Empty: + pass - self.actionQueue.statusCommandResultQueue.put(result) + if result: + self.mp_result_queue.put(result) + return True + else: + # do not set timed out event twice + if not self.timedOutEvent.is_set(): + self._set_timed_out(_command) + return False + + # endregion - def respawn(self, command): try: - if hasattr(PythonReflectiveExecutor, "last_context"): - # Force context to reset to normal. By context we mean sys.path, imports, etc. They are set by specific status command, and are not relevant to ambari-agent. - PythonReflectiveExecutor.last_context.revert() + while not self.mustDieEvent.is_set(): + try: + command = self.mp_task_queue.get(False) + except Queue.Empty: + # no command, lets try in other loop iteration + time.sleep(.1) + continue + + self._log_message(logging.DEBUG, "Running status command for {0}".format(command['componentName'])) - logger.warn("Command {0} for {1} is running for more than {2} seconds. Terminating it due to timeout.".format(command['commandType'], command['componentName'], self.status_command_timeout)) + if _internal_process_command(command): + self._log_message(logging.DEBUG, "Completed status command for {0}".format(command['componentName'])) - self.hasTimeoutedEvent.set() - except: - logger.exception("StatusCommandsExecutor.finish thread failed with exception:") + except Exception as e: + self._log_message(logging.ERROR, "StatusCommandsExecutor process failed with exception:", e) raise - def kill(self): - os.kill(self.pid, signal.SIGKILL) + self._log_message(logging.WARN, "StatusCommandsExecutor subprocess finished") + + def _set_timed_out(self, command): + """ + Set timeout event and adding log entry for given command. + + :param command: + :return: + """ + msg = "Command {0} for {1} is running for more than {2} seconds. Terminating it due to timeout.".format( + command['commandType'], + command['componentName'], + self.status_command_timeout + ) + self._log_message(logging.WARN, msg) + self.timedOutEvent.set() + + def put_commands(self, commands): + """ + Put given commands to command executor. + + :param commands: status commands to execute + :return: + """ + with self.usage_lock: + if not self.mp_task_queue.empty(): + status_command_queue_size = 0 + try: + while not self.mp_task_queue.empty(): + self.mp_task_queue.get(False) + status_command_queue_size += 1 + except Queue.Empty: + pass + + logger.info("Number of status commands removed from queue : " + str(status_command_queue_size)) + for command in commands: + logger.info("Adding " + command['commandType'] + " for component " + \ + command['componentName'] + " of service " + \ + command['serviceName'] + " of cluster " + \ + command['clusterName'] + " to the queue.") + self.mp_task_queue.put(command) + logger.debug(pprint.pformat(command)) + + def get_results(self): + """ + Get all available results for status commands. + + :return: list of results + """ + results = [] + with self.usage_lock: + try: + while not self.mp_result_queue.empty(): + try: + results.append(self.mp_result_queue.get(False)) + except Queue.Empty: + pass + except IOError: + pass + except UnicodeDecodeError: + pass + except IOError: + pass + return results + + @property + def need_relaunch(self): + """ + Indicates if process need to be relaunched due to timeout or it is dead or even was not created. + """ + return self.timedOutEvent.is_set() or not self.worker_process or not self.worker_process.is_alive() + + def relaunch(self, reason=None): + """ + Restart status command executor internal process. + + :param reason: reason of restart + :return: + """ + self.kill(reason) + self.worker_process = multiprocessing.Process(target=self._worker_process_target) + self.worker_process.start() + logger.info("Started process with pid {0}".format(self.worker_process.pid)) + + def kill(self, reason=None): + """ + Tries to stop command executor internal process for sort time, otherwise killing it. Closing all possible queues to + unblock threads that probably blocked on read or write operations to queues. Must be called from threads different + from threads that calling read or write methods(get_log_messages, get_results, put_commands). + + :param reason: reason of killing + :return: + """ + # try graceful stop, otherwise hard-kill + if self.worker_process and self.worker_process.is_alive(): + logger.info("Killing child process reason:" + str(reason)) + self.mustDieEvent.set() + self.worker_process.join(timeout=3) + if self.worker_process.is_alive(): + os.kill(self.worker_process.pid, signal.SIGKILL) + logger.info("Child process killed by -9") + else: + # get log messages only if we died gracefully, otherwise we will have chance to block here forever, in most cases + # this call will do nothing, as all logs will be processed in ActionQueue loop + self.process_logs() + logger.info("Child process died gracefully") + else: + logger.info("Child process already dead") + + # close queues and acquire usage lock + # closing both sides of pipes here, we need this hack in case of blocking on recv() call + self.mp_result_queue.close() + self.mp_result_queue._writer.close() + self.mp_result_logs.close() + self.mp_result_logs._writer.close() + self.mp_task_queue.close() + self.mp_task_queue._writer.close() - # prevent queue from ending up with non-freed semaphores, locks during put. Which would result in dead-lock in process executing get. - self.actionQueue.statusCommandResultQueue.close() - self.actionQueue.statusCommandResultQueue.join_thread() - self.actionQueue.statusCommandResultQueue = multiprocessing.Queue() + with self.usage_lock: + self.mp_result_queue.join_thread() + self.mp_result_queue = multiprocessing.Queue() + self.mp_task_queue.join_thread() + self.mp_task_queue = multiprocessing.Queue() + self.mp_result_logs.join_thread() + self.mp_result_logs = multiprocessing.Queue() + self.customServiceOrchestrator = self.actionQueue.customServiceOrchestrator + self.mustDieEvent.clear() + self.timedOutEvent.clear() http://git-wip-us.apache.org/repos/asf/ambari/blob/17ef5559/ambari-agent/src/main/python/ambari_agent/main.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/main.py b/ambari-agent/src/main/python/ambari_agent/main.py index 3f333c4..3c5d8f1 100644 --- a/ambari-agent/src/main/python/ambari_agent/main.py +++ b/ambari-agent/src/main/python/ambari_agent/main.py @@ -311,18 +311,14 @@ def run_threads(server_hostname, heartbeat_stop_callback): # Launch Controller communication controller = Controller(config, server_hostname, heartbeat_stop_callback) controller.start() + time.sleep(2) # in order to get controller.statusCommandsExecutor initialized while controller.is_alive(): time.sleep(0.1) - with controller.getSpawnKillStatusCommandExecutorLock(): - # We need to lock as Controller.py may try to spawn StatusCommandExecutor child in parallel as well - if controller.getStatusCommandsExecutor() is not None \ - and (not controller.getStatusCommandsExecutor().is_alive() - or controller.getStatusCommandsExecutor().hasTimeoutedEvent.is_set()): - controller.spawnStatusCommandsExecutorProcess() + if controller.get_status_commands_executor().need_relaunch: + controller.get_status_commands_executor().relaunch("COMMAND_TIMEOUT_OR_KILLED") - - controller.killStatusCommandsExecutorProcess() + controller.get_status_commands_executor().kill() # event - event, that will be passed to Controller and NetUtil to make able to interrupt loops form outside process # we need this for windows os, where no sigterm available http://git-wip-us.apache.org/repos/asf/ambari/blob/17ef5559/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py b/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py index 9fefefb..8701a24 100644 --- a/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py +++ b/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py @@ -1329,7 +1329,7 @@ class TestActionQueue(TestCase): execute_command = copy.deepcopy(self.background_command) actionQueue.put([execute_command]) actionQueue.processBackgroundQueueSafeEmpty(); - actionQueue.processStatusCommandResultQueueSafeEmpty(); + actionQueue.process_status_command_results(); #assert that python execturor start self.assertTrue(runCommand_mock.called) @@ -1373,7 +1373,7 @@ class TestActionQueue(TestCase): None, command_complete_w) actionQueue.put([self.background_command]) actionQueue.processBackgroundQueueSafeEmpty(); - actionQueue.processStatusCommandResultQueueSafeEmpty(); + actionQueue.process_status_command_results(); with lock: complete_done.wait(0.1) http://git-wip-us.apache.org/repos/asf/ambari/blob/17ef5559/ambari-agent/src/test/python/ambari_agent/TestController.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/test/python/ambari_agent/TestController.py b/ambari-agent/src/test/python/ambari_agent/TestController.py index 663e215..7f5d451 100644 --- a/ambari-agent/src/test/python/ambari_agent/TestController.py +++ b/ambari-agent/src/test/python/ambari_agent/TestController.py @@ -44,7 +44,6 @@ import ambari_commons @not_for_platform(PLATFORM_WINDOWS) @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value)) [email protected](Controller.Controller, "spawnStatusCommandsExecutorProcess", new = MagicMock()) class TestController(unittest.TestCase): logger = logging.getLogger() @@ -119,10 +118,8 @@ class TestController(unittest.TestCase): self.assertEqual({"responseId":1}, self.controller.registerWithServer()) self.controller.sendRequest.return_value = {"responseId":1, "statusCommands": "commands", "log":"", "exitstatus":"0"} - self.controller.addToStatusQueue = MagicMock(name="addToStatusQueue") self.controller.isRegistered = False self.assertEqual({'exitstatus': '0', 'responseId': 1, 'log': '', 'statusCommands': 'commands'}, self.controller.registerWithServer()) - self.controller.addToStatusQueue.assert_called_with("commands") calls = [] http://git-wip-us.apache.org/repos/asf/ambari/blob/17ef5559/ambari-agent/src/test/python/ambari_agent/TestMain.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/test/python/ambari_agent/TestMain.py b/ambari-agent/src/test/python/ambari_agent/TestMain.py index 97c448b..af2f68b 100644 --- a/ambari-agent/src/test/python/ambari_agent/TestMain.py +++ b/ambari-agent/src/test/python/ambari_agent/TestMain.py @@ -324,8 +324,6 @@ class TestMain(unittest.TestCase): @patch.object(Controller, "__init__") @patch.object(Controller, "is_alive") @patch.object(Controller, "start") - @patch.object(Controller, "getStatusCommandsExecutor") - @patch.object(Controller, "killStatusCommandsExecutorProcess") @patch("optparse.OptionParser.parse_args") @patch.object(DataCleaner,"start") @patch.object(DataCleaner,"__init__") @@ -333,9 +331,10 @@ class TestMain(unittest.TestCase): @patch.object(PingPortListener,"__init__") @patch.object(ExitHelper,"execute_cleanup") @patch.object(ExitHelper, "exit") - def test_main(self, exithelper_exit_mock, cleanup_mock, ping_port_init_mock, ping_port_start_mock, data_clean_init_mock,data_clean_start_mock, - parse_args_mock, start_mock, Controller_killStatusCommandsExecutorProcess, - Controller_getStatusCommandsExecutor, Controller_is_alive_mock, Controller_init_mock, try_to_connect_mock, + @patch.object(Controller, "get_status_commands_executor") + def test_main(self, get_status_commands_executor_mock, exithelper_exit_mock, cleanup_mock, ping_port_init_mock, + ping_port_start_mock, data_clean_init_mock,data_clean_start_mock, + parse_args_mock, start_mock, Controller_is_alive_mock, Controller_init_mock, try_to_connect_mock, update_log_level_mock, daemonize_mock, perform_prestart_checks_mock, ambari_config_mock, stop_mock, bind_signal_handlers_mock,
