Repository: ambari Updated Branches: refs/heads/trunk 0f24bc24b -> 5bcd37529
AMBARI-6768 Add ability to an agent to cancel queued/running tasks (dsen) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/5bcd3752 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/5bcd3752 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/5bcd3752 Branch: refs/heads/trunk Commit: 5bcd375294cc072665097187c76199696859c00b Parents: 0f24bc2 Author: Dmytro Sen <[email protected]> Authored: Thu Aug 7 17:15:54 2014 +0300 Committer: Dmytro Sen <[email protected]> Committed: Thu Aug 7 17:15:54 2014 +0300 ---------------------------------------------------------------------- .../src/main/python/ambari_agent/ActionQueue.py | 33 ++++++++ .../src/main/python/ambari_agent/Controller.py | 18 ++++- .../ambari_agent/CustomServiceOrchestrator.py | 28 ++++++- .../main/python/ambari_agent/PythonExecutor.py | 7 +- .../test/python/ambari_agent/TestActionQueue.py | 39 +++++++++ .../test/python/ambari_agent/TestController.py | 6 ++ .../TestCustomServiceOrchestrator.py | 84 +++++++++++++++++++- .../python/ambari_agent/TestPythonExecutor.py | 18 +++-- 8 files changed, 220 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/5bcd3752/ambari-agent/src/main/python/ambari_agent/ActionQueue.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py index 58d3e75..d3aad6e 100644 --- a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py +++ b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py @@ -108,6 +108,32 @@ class ActionQueue(threading.Thread): logger.debug(pprint.pformat(command)) self.commandQueue.put(command) + def cancel(self, commands): + for command in commands: + + logger.info("Canceling command {tid}".format(tid = str(command['target_task_id']))) + logger.debug(pprint.pformat(command)) + + task_id = command['target_task_id'] + reason = command['reason'] + + # Remove from the command queue by task_id + queue = self.commandQueue + self.commandQueue = Queue.Queue() + + while not queue.empty(): + queued_command = queue.get(False) + if queued_command['task_id'] != task_id: + self.commandQueue.put(queued_command) + else: + logger.info("Canceling " + queued_command['commandType'] + \ + " for service " + queued_command['serviceName'] + \ + " of cluster " + queued_command['clusterName'] + \ + " to the queue.") + + # Kill if in progress + self.customServiceOrchestrator.cancel_command(task_id, reason) + def run(self): while not self.stopped(): while not self.statusCommandQueue.empty(): @@ -287,3 +313,10 @@ class ActionQueue(threading.Thread): Actions that are executed every time when command status changes """ self.controller.heartbeat_wait_event.set() + + # Removes all commands from the queue + def reset(self): + queue = self.commandQueue + with queue.mutex: + queue.queue.clear() + http://git-wip-us.apache.org/repos/asf/ambari/blob/5bcd3752/ambari-agent/src/main/python/ambari_agent/Controller.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/Controller.py b/ambari-agent/src/main/python/ambari_agent/Controller.py index 36ed94c..87af939 100644 --- a/ambari-agent/src/main/python/ambari_agent/Controller.py +++ b/ambari-agent/src/main/python/ambari_agent/Controller.py @@ -78,7 +78,7 @@ class Controller(threading.Thread): def __del__(self): logger.info("Server connection disconnected.") pass - + def registerWithServer(self): LiveStatus.SERVICES = [] LiveStatus.CLIENT_COMPONENTS = [] @@ -142,7 +142,12 @@ class Controller(threading.Thread): pass return ret - + def cancelCommandInQueue(self, commands): + """ Remove from the queue commands, kill the process if it's in progress """ + if commands: + self.actionQueue.cancel(commands) + pass + def addToQueue(self, commands): """Add to the queue for running the commands """ """ Put the required actions into the Queue """ @@ -223,6 +228,10 @@ class Controller(threading.Thread): else: self.responseId=serverId + if 'cancelCommands' in response.keys(): + self.cancelCommandInQueue(response['cancelCommands']) + pass + if 'executionCommands' in response.keys(): self.addToQueue(response['executionCommands']) pass @@ -309,6 +318,11 @@ class Controller(threading.Thread): logger.info("Registration response from %s was %s", self.serverHostname, message) if self.isRegistered: + # Clearing command queue to stop executing "stale" commands + # after registration + logger.info('Resetting ActionQueue...') + self.actionQueue.reset() + # Process callbacks for callback in self.registration_listeners: callback() http://git-wip-us.apache.org/repos/asf/ambari/blob/5bcd3752/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py b/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py index 4331678..e13e543 100644 --- a/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py +++ b/ambari-agent/src/main/python/ambari_agent/CustomServiceOrchestrator.py @@ -22,6 +22,7 @@ import logging import os import json import sys +import shell from FileCache import FileCache from AgentException import AgentException @@ -68,7 +69,19 @@ class CustomServiceOrchestrator(): os.unlink(self.status_commands_stderr) except OSError: pass # Ignore fail + self.commands_in_progress = {} + def map_task_to_process(self, task_id, processId): + self.commands_in_progress[task_id] = processId + + def cancel_command(self, task_id, reason): + if task_id in self.commands_in_progress.keys(): + pid = self.commands_in_progress.get(task_id) + self.commands_in_progress[task_id] = reason + logger.info("Canceling command with task_id - {tid}, " \ + "reason - {reason} . Killing process {pid}" + .format(tid = str(task_id), reason = reason, pid = pid)) + shell.kill_process_with_children(pid) def runCommand(self, command, tmpoutfile, tmperrfile, forced_command_name = None, override_output_files = True): @@ -132,7 +145,8 @@ class CustomServiceOrchestrator(): script_params = [command_name, json_path, current_base_dir] ret = self.python_executor.run_file(py_file, script_params, self.exec_tmp_dir, tmpoutfile, tmperrfile, timeout, - tmpstrucoutfile, logger_level, override_output_files) + tmpstrucoutfile, logger_level, self.map_task_to_process, + task_id, override_output_files) # Next run_file() invocations should always append to current output override_output_files = False if ret['exitcode'] != 0: @@ -141,6 +155,18 @@ class CustomServiceOrchestrator(): if not ret: # Something went wrong raise AgentException("No script has been executed") + # if canceled + pid = self.commands_in_progress.pop(task_id) + if not isinstance(pid, int): + reason = '\nCommand aborted. ' + pid + ret['stdout'] += reason + ret['stderr'] += reason + + with open(tmpoutfile, "a") as f: + f.write(reason) + with open(tmperrfile, "a") as f: + f.write(reason) + except Exception: # We do not want to let agent fail completely exc_type, exc_obj, exc_tb = sys.exc_info() message = "Catched an exception while executing "\ http://git-wip-us.apache.org/repos/asf/ambari/blob/5bcd3752/ambari-agent/src/main/python/ambari_agent/PythonExecutor.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/PythonExecutor.py b/ambari-agent/src/main/python/ambari_agent/PythonExecutor.py index c4f1234..704e8f3 100644 --- a/ambari-agent/src/main/python/ambari_agent/PythonExecutor.py +++ b/ambari-agent/src/main/python/ambari_agent/PythonExecutor.py @@ -47,8 +47,9 @@ class PythonExecutor: self.config = config pass - def run_file(self, script, script_params, tmp_dir, tmpoutfile, tmperrfile, timeout, - tmpstructedoutfile, logger_level, override_output_files = True): + def run_file(self, script, script_params, tmp_dir, tmpoutfile, tmperrfile, + timeout, tmpstructedoutfile, logger_level, callback, task_id, + override_output_files = True): """ Executes the specified python file in a separate subprocess. Method returns only when the subprocess is finished. @@ -77,6 +78,8 @@ class PythonExecutor: pythonCommand = self.python_command(script, script_params) logger.info("Running command " + pprint.pformat(pythonCommand)) process = self.launch_python_subprocess(pythonCommand, tmpout, tmperr) + # map task_id to pid + callback(task_id, process.pid) logger.debug("Launching watchdog thread") self.event.clear() self.python_process_has_been_killed = False http://git-wip-us.apache.org/repos/asf/ambari/blob/5bcd3752/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py b/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py index 04c12b2..e06efe4 100644 --- a/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py +++ b/ambari-agent/src/test/python/ambari_agent/TestActionQueue.py @@ -487,3 +487,42 @@ class TestActionQueue(TestCase): self.assertTrue(requestComponentStatus_mock.called) self.assertEqual(len(report['componentStatus']), 1) self.assertTrue(report['componentStatus'][0].has_key('alerts')) + + @patch.object(ActionQueue, "process_command") + @patch.object(Queue, "get") + @patch.object(CustomServiceOrchestrator, "__init__") + def test_reset_queue(self, CustomServiceOrchestrator_mock, + get_mock, process_command_mock): + CustomServiceOrchestrator_mock.return_value = None + dummy_controller = MagicMock() + config = MagicMock() + actionQueue = ActionQueue(config, dummy_controller) + actionQueue.start() + actionQueue.put([self.datanode_install_command, self.hbase_install_command]) + self.assertEqual(2, actionQueue.commandQueue.qsize()) + actionQueue.reset() + self.assertTrue(actionQueue.commandQueue.empty()) + time.sleep(0.1) + actionQueue.stop() + actionQueue.join() + self.assertEqual(actionQueue.stopped(), True, 'Action queue is not stopped.') + + @patch.object(ActionQueue, "process_command") + @patch.object(Queue, "get") + @patch.object(CustomServiceOrchestrator, "__init__") + def test_cancel(self, CustomServiceOrchestrator_mock, + get_mock, process_command_mock): + CustomServiceOrchestrator_mock.return_value = None + dummy_controller = MagicMock() + config = MagicMock() + actionQueue = ActionQueue(config, dummy_controller) + actionQueue.start() + actionQueue.put([self.datanode_install_command, self.hbase_install_command]) + self.assertEqual(2, actionQueue.commandQueue.qsize()) + actionQueue.reset() + self.assertTrue(actionQueue.commandQueue.empty()) + time.sleep(0.1) + actionQueue.stop() + actionQueue.join() + self.assertEqual(actionQueue.stopped(), True, 'Action queue is not stopped.') + http://git-wip-us.apache.org/repos/asf/ambari/blob/5bcd3752/ambari-agent/src/test/python/ambari_agent/TestController.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/test/python/ambari_agent/TestController.py b/ambari-agent/src/test/python/ambari_agent/TestController.py index 6c1dcd0..9ec23db 100644 --- a/ambari-agent/src/test/python/ambari_agent/TestController.py +++ b/ambari-agent/src/test/python/ambari_agent/TestController.py @@ -236,6 +236,8 @@ class TestController(unittest.TestCase): self.controller.registerWithServer = registerWithServer heartbeatWithServer = MagicMock(name="heartbeatWithServer") self.controller.heartbeatWithServer = heartbeatWithServer + actionQueue = MagicMock(name="actionQueue") + self.controller.actionQueue = actionQueue Controller.Controller.__sendRequest__ = MagicMock(side_effect=Exception()) @@ -257,6 +259,8 @@ class TestController(unittest.TestCase): self.controller.registerWithServer = registerWithServer heartbeatWithServer = MagicMock(name="heartbeatWithServer") self.controller.heartbeatWithServer = heartbeatWithServer + actionQueue = MagicMock(name="actionQueue") + self.controller.actionQueue = actionQueue listener1 = MagicMock() listener2 = MagicMock() @@ -282,6 +286,8 @@ class TestController(unittest.TestCase): self.controller.registerWithServer = registerWithServer heartbeatWithServer = MagicMock(name="heartbeatWithServer") self.controller.heartbeatWithServer = heartbeatWithServer + actionQueue = MagicMock(name="actionQueue") + self.controller.actionQueue = actionQueue self.controller.isRegistered = True self.controller.registerAndHeartbeat() http://git-wip-us.apache.org/repos/asf/ambari/blob/5bcd3752/ambari-agent/src/test/python/ambari_agent/TestCustomServiceOrchestrator.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/test/python/ambari_agent/TestCustomServiceOrchestrator.py b/ambari-agent/src/test/python/ambari_agent/TestCustomServiceOrchestrator.py index e8668f6..d669cd2 100644 --- a/ambari-agent/src/test/python/ambari_agent/TestCustomServiceOrchestrator.py +++ b/ambari-agent/src/test/python/ambari_agent/TestCustomServiceOrchestrator.py @@ -18,9 +18,11 @@ See the License for the specific language governing permissions and limitations under the License. ''' import ConfigParser +from multiprocessing.pool import ThreadPool import os import pprint +import shell from unittest import TestCase import threading @@ -185,6 +187,8 @@ class TestCustomServiceOrchestrator(TestCase): '/hooks_dir/prefix-command') dummy_controller = MagicMock() orchestrator = CustomServiceOrchestrator(self.config, dummy_controller) + unix_process_id = 111 + orchestrator.commands_in_progress = {command['taskId']: unix_process_id} get_hook_base_dir_mock.return_value = "/hooks/" # normal run case run_file_mock.return_value = { @@ -208,9 +212,9 @@ class TestCustomServiceOrchestrator(TestCase): ret = orchestrator.runCommand(command, "out.txt", "err.txt", forced_command_name=CustomServiceOrchestrator.COMMAND_NAME_STATUS) ## Check that override_output_files was true only during first call - self.assertEquals(run_file_mock.call_args_list[0][0][8], True) - self.assertEquals(run_file_mock.call_args_list[1][0][8], False) - self.assertEquals(run_file_mock.call_args_list[2][0][8], False) + self.assertEquals(run_file_mock.call_args_list[0][0][10], True) + self.assertEquals(run_file_mock.call_args_list[1][0][10], False) + self.assertEquals(run_file_mock.call_args_list[2][0][10], False) ## Check that forced_command_name was taken into account self.assertEqual(run_file_mock.call_args_list[0][0][1][0], CustomServiceOrchestrator.COMMAND_NAME_STATUS) @@ -229,6 +233,78 @@ class TestCustomServiceOrchestrator(TestCase): pass + @patch("shell.kill_process_with_children") + @patch.object(CustomServiceOrchestrator, "resolve_script_path") + @patch.object(CustomServiceOrchestrator, "resolve_hook_script_path") + @patch.object(FileCache, "get_service_base_dir") + @patch.object(FileCache, "get_hook_base_dir") + @patch.object(CustomServiceOrchestrator, "dump_command_to_json") + @patch.object(PythonExecutor, "run_file") + @patch.object(FileCache, "__init__") + def test_cancel_command(self, FileCache_mock, + run_file_mock, dump_command_to_json_mock, + get_hook_base_dir_mock, get_service_base_dir_mock, + resolve_hook_script_path_mock, resolve_script_path_mock, + kill_process_with_children_mock): + FileCache_mock.return_value = None + command = { + 'role' : 'REGION_SERVER', + 'hostLevelParams' : { + 'stack_name' : 'HDP', + 'stack_version' : '2.0.7', + 'jdk_location' : 'some_location' + }, + 'commandParams': { + 'script_type': 'PYTHON', + 'script': 'scripts/hbase_regionserver.py', + 'command_timeout': '600', + 'service_package_folder' : 'HBASE' + }, + 'taskId' : '3', + 'roleCommand': 'INSTALL' + } + get_service_base_dir_mock.return_value = "/basedir/" + resolve_script_path_mock.return_value = "/basedir/scriptpath" + resolve_hook_script_path_mock.return_value = \ + ('/hooks_dir/prefix-command/scripts/hook.py', + '/hooks_dir/prefix-command') + dummy_controller = MagicMock() + orchestrator = CustomServiceOrchestrator(self.config, dummy_controller) + unix_process_id = 111 + orchestrator.commands_in_progress = {command['taskId']: unix_process_id} + get_hook_base_dir_mock.return_value = "/hooks/" + run_file_mock_return_value = { + 'stdout' : 'killed', + 'stderr' : 'killed', + 'exitcode': 1, + } + def side_effect(*args, **kwargs): + time.sleep(0.2) + return run_file_mock_return_value + run_file_mock.side_effect = side_effect + + _, out = tempfile.mkstemp() + _, err = tempfile.mkstemp() + pool = ThreadPool(processes=1) + async_result = pool.apply_async(orchestrator.runCommand, (command, out, err)) + + time.sleep(0.1) + orchestrator.cancel_command(command['taskId'], 'reason') + + ret = async_result.get() + + self.assertEqual(ret['exitcode'], 1) + self.assertEquals(ret['stdout'], 'killed\nCommand aborted. reason') + self.assertEquals(ret['stderr'], 'killed\nCommand aborted. reason') + + self.assertTrue(kill_process_with_children_mock.called) + self.assertFalse(command['taskId'] in orchestrator.commands_in_progress.keys()) + self.assertTrue(os.path.exists(out)) + self.assertTrue(os.path.exists(err)) + os.remove(out) + os.remove(err) + + @patch.object(CustomServiceOrchestrator, "dump_command_to_json") @patch.object(PythonExecutor, "run_file") @patch.object(FileCache, "__init__") @@ -252,6 +328,8 @@ class TestCustomServiceOrchestrator(TestCase): } dummy_controller = MagicMock() orchestrator = CustomServiceOrchestrator(self.config, dummy_controller) + unix_process_id = 111 + orchestrator.commands_in_progress = {command['taskId']: unix_process_id} # normal run case run_file_mock.return_value = { 'stdout' : 'sss', http://git-wip-us.apache.org/repos/asf/ambari/blob/5bcd3752/ambari-agent/src/test/python/ambari_agent/TestPythonExecutor.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/test/python/ambari_agent/TestPythonExecutor.py b/ambari-agent/src/test/python/ambari_agent/TestPythonExecutor.py index c33b97d..d1bec5d 100644 --- a/ambari-agent/src/test/python/ambari_agent/TestPythonExecutor.py +++ b/ambari-agent/src/test/python/ambari_agent/TestPythonExecutor.py @@ -55,12 +55,15 @@ class TestPythonExecutor(TestCase): runShellKillPgrp_method.side_effect = lambda python : python.terminate() executor.runShellKillPgrp = runShellKillPgrp_method subproc_mock.returncode = None + callback_method = MagicMock() thread = Thread(target = executor.run_file, args = ("fake_puppetFile", - ["arg1", "arg2"], "/fake_tmp_dir", tmpoutfile, tmperrfile, PYTHON_TIMEOUT_SECONDS, tmpstrucout,"INFO")) + ["arg1", "arg2"], "/fake_tmp_dir", tmpoutfile, tmperrfile, + PYTHON_TIMEOUT_SECONDS, tmpstrucout, "INFO", callback_method, '1')) thread.start() time.sleep(0.1) subproc_mock.finished_event.wait() self.assertEquals(subproc_mock.was_terminated, True, "Subprocess should be terminated due to timeout") + self.assertTrue(callback_method.called) def test_watchdog_2(self): @@ -83,16 +86,18 @@ class TestPythonExecutor(TestCase): runShellKillPgrp_method.side_effect = lambda python : python.terminate() executor.runShellKillPgrp = runShellKillPgrp_method subproc_mock.returncode = 0 + callback_method = MagicMock() thread = Thread(target = executor.run_file, args = ("fake_puppetFile", ["arg1", "arg2"], "/fake_tmp_dir", tmpoutfile, tmperrfile, - PYTHON_TIMEOUT_SECONDS, tmpstrucout, "INFO")) + PYTHON_TIMEOUT_SECONDS, tmpstrucout, + "INFO", callback_method, "1-1")) thread.start() time.sleep(0.1) subproc_mock.should_finish_event.set() subproc_mock.finished_event.wait() self.assertEquals(subproc_mock.was_terminated, False, "Subprocess should not be terminated before timeout") self.assertEquals(subproc_mock.returncode, 0, "Subprocess should not be terminated before timeout") - + self.assertTrue(callback_method.called) def test_execution_results(self): subproc_mock = self.Subprocess_mockup() @@ -112,10 +117,13 @@ class TestPythonExecutor(TestCase): executor.runShellKillPgrp = runShellKillPgrp_method subproc_mock.returncode = 0 subproc_mock.should_finish_event.set() - result = executor.run_file("file", ["arg1", "arg2"], "/fake_tmp_dir", tmpoutfile, tmperrfile, PYTHON_TIMEOUT_SECONDS, tmpstroutfile, "INFO") + callback_method = MagicMock() + result = executor.run_file("file", ["arg1", "arg2"], "/fake_tmp_dir", + tmpoutfile, tmperrfile, PYTHON_TIMEOUT_SECONDS, + tmpstroutfile, "INFO", callback_method, "1-1") self.assertEquals(result, {'exitcode': 0, 'stderr': 'Dummy err', 'stdout': 'Dummy output', 'structuredOut': {}}) - + self.assertTrue(callback_method.called) def test_is_successfull(self): executor = PythonExecutor("/tmp", AmbariConfig().getConfig())
