ARIA-285 Cancel execution may leave running processes
Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/615baf9e Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/615baf9e Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/615baf9e Branch: refs/heads/ARIA-285-Cancel-execution-may-leave-running-processes Commit: 615baf9e079575d920722f4bed1ad3ff451116f3 Parents: 807db30 Author: max-orlov <[email protected]> Authored: Sun Jun 25 12:19:02 2017 +0300 Committer: max-orlov <[email protected]> Committed: Tue Jun 27 17:05:45 2017 +0300 ---------------------------------------------------------------------- aria/orchestrator/workflows/core/engine.py | 10 ++++ aria/orchestrator/workflows/executor/base.py | 9 +++- aria/orchestrator/workflows/executor/celery.py | 16 +++--- aria/orchestrator/workflows/executor/process.py | 46 ++++++++++++++--- requirements.in | 1 + requirements.txt | 1 + .../orchestrator/workflows/executor/__init__.py | 7 +++ .../workflows/executor/test_process_executor.py | 52 +++++++++++++++++++- tests/requirements.txt | 1 + 9 files changed, 125 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/615baf9e/aria/orchestrator/workflows/core/engine.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/core/engine.py b/aria/orchestrator/workflows/core/engine.py index d52ae85..5a94df8 100644 --- a/aria/orchestrator/workflows/core/engine.py +++ b/aria/orchestrator/workflows/core/engine.py @@ -66,13 +66,23 @@ class Engine(logger.LoggerMixin): else: time.sleep(0.1) if cancel: + self._terminate_tasks(tasks_tracker.executing_tasks) events.on_cancelled_workflow_signal.send(ctx) else: events.on_success_workflow_signal.send(ctx) except BaseException as e: + # Cleanup any remaining tasks + 
self._terminate_tasks(tasks_tracker.executing_tasks) events.on_failure_workflow_signal.send(ctx, exception=e) raise + def _terminate_tasks(self, tasks): + for task in tasks: + try: + self._executors[task._executor].terminate(task.id) + except BaseException: + pass + @staticmethod def cancel_execution(ctx): """ http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/615baf9e/aria/orchestrator/workflows/executor/base.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/executor/base.py b/aria/orchestrator/workflows/executor/base.py index 6a3c9d2..4cc4503 100644 --- a/aria/orchestrator/workflows/executor/base.py +++ b/aria/orchestrator/workflows/executor/base.py @@ -25,7 +25,7 @@ class BaseExecutor(logger.LoggerMixin): """ Base class for executors for running tasks """ - def _execute(self, task): + def _execute(self, ctx): raise NotImplementedError def execute(self, ctx): @@ -48,6 +48,13 @@ class BaseExecutor(logger.LoggerMixin): """ pass + def terminate(self, ctx): + """ + Terminate the executing task + :return: + """ + pass + @staticmethod def _task_started(ctx): events.start_task_signal.send(ctx) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/615baf9e/aria/orchestrator/workflows/executor/celery.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/executor/celery.py b/aria/orchestrator/workflows/executor/celery.py index 9d66d26..46b15fd 100644 --- a/aria/orchestrator/workflows/executor/celery.py +++ b/aria/orchestrator/workflows/executor/celery.py @@ -42,15 +42,15 @@ class CeleryExecutor(BaseExecutor): self._receiver_thread.start() self._started_queue.get(timeout=30) - def _execute(self, task): - self._tasks[task.id] = task - arguments = dict(arg.unwrapped for arg in task.arguments.values()) - arguments['ctx'] = task.context - self._results[task.id] = self._app.send_task( - task.operation_mapping, + def _execute(self, 
ctx): + self._tasks[ctx.id] = ctx + arguments = dict(arg.unwrapped for arg in ctx.arguments.values()) + arguments['ctx'] = ctx.context + self._results[ctx.id] = self._app.send_task( + ctx.operation_mapping, kwargs=arguments, - task_id=task.id, - queue=self._get_queue(task)) + task_id=ctx.id, + queue=self._get_queue(ctx)) def close(self): self._stopped = True http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/615baf9e/aria/orchestrator/workflows/executor/process.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/executor/process.py b/aria/orchestrator/workflows/executor/process.py index 8518b33..59e61b6 100644 --- a/aria/orchestrator/workflows/executor/process.py +++ b/aria/orchestrator/workflows/executor/process.py @@ -25,6 +25,10 @@ import sys # As part of the process executor implementation, subprocess are started with this module as their # entry point. We thus remove this module's directory from the python path if it happens to be # there + +import signal +from collections import namedtuple + script_dir = os.path.dirname(__file__) if script_dir in sys.path: sys.path.remove(script_dir) @@ -39,6 +43,7 @@ import tempfile import Queue import pickle +import psutil import jsonpickle import aria @@ -57,6 +62,9 @@ UPDATE_TRACKED_CHANGES_FAILED_STR = \ 'Some changes failed writing to storage. For more info refer to the log.' 
+_Task = namedtuple('_Task', 'proc, ctx') + + class ProcessExecutor(base.BaseExecutor): """ Executor which runs tasks in a subprocess environment @@ -113,9 +121,26 @@ class ProcessExecutor(base.BaseExecutor): self._server_socket.close() self._listener_thread.join(timeout=60) + for task_id in self._tasks: + self.terminate(task_id) + + def terminate(self, task_id): + task = self._tasks.get(task_id) + # The process might have managed to finish, thus it would not be in the tasks list + if task: + try: + parent_process = psutil.Process(task.proc.pid) + for child_process in reversed(parent_process.children(recursive=True)): + try: + child_process.send_signal(signal.SIGKILL) + except BaseException: + pass + parent_process.send_signal(signal.SIGKILL) + except BaseException: + pass + def _execute(self, ctx): self._check_closed() - self._tasks[ctx.task.id] = ctx # Temporary file used to pass arguments to the started subprocess file_descriptor, arguments_json_path = tempfile.mkstemp(prefix='executor-', suffix='.json') @@ -125,10 +150,15 @@ class ProcessExecutor(base.BaseExecutor): env = self._construct_subprocess_env(task=ctx.task) # Asynchronously start the operation in a subprocess - subprocess.Popen( - '{0} {1} {2}'.format(sys.executable, __file__, arguments_json_path), - env=env, - shell=True) + proc = subprocess.Popen( + [ + sys.executable, + os.path.expanduser(os.path.expandvars(__file__)), + os.path.expanduser(os.path.expandvars(arguments_json_path)) + ], + env=env) + + self._tasks[ctx.task.id] = _Task(ctx=ctx, proc=proc) def _remove_task(self, task_id): return self._tasks.pop(task_id) @@ -191,15 +221,15 @@ class ProcessExecutor(base.BaseExecutor): _send_message(connection, response) def _handle_task_started_request(self, task_id, **kwargs): - self._task_started(self._tasks[task_id]) + self._task_started(self._tasks[task_id].ctx) def _handle_task_succeeded_request(self, task_id, **kwargs): task = self._remove_task(task_id) - self._task_succeeded(task) + 
self._task_succeeded(task.ctx) def _handle_task_failed_request(self, task_id, request, **kwargs): task = self._remove_task(task_id) - self._task_failed(task, exception=request['exception'], traceback=request['traceback']) + self._task_failed(task.ctx, exception=request['exception'], traceback=request['traceback']) def _send_message(connection, message): http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/615baf9e/requirements.in ---------------------------------------------------------------------- diff --git a/requirements.in b/requirements.in index cecc9fd..723ed51 100644 --- a/requirements.in +++ b/requirements.in @@ -33,6 +33,7 @@ PrettyTable>=0.7,<0.8 click_didyoumean==0.0.3 backports.shutil_get_terminal_size==1.0.0 logutils==0.3.4.1 +psutil>=5.2.2, < 6.0.0 importlib ; python_version < '2.7' ordereddict ; python_version < '2.7' total-ordering ; python_version < '2.7' # only one version on pypi http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/615baf9e/requirements.txt ---------------------------------------------------------------------- diff --git a/requirements.txt b/requirements.txt index 9f929a9..7ee1008 100644 --- a/requirements.txt +++ b/requirements.txt @@ -26,6 +26,7 @@ networkx==1.9.1 ordereddict==1.1 ; python_version < "2.7" packaging==16.8 # via setuptools prettytable==0.7.2 +psutil==5.2.2 pyparsing==2.2.0 # via packaging requests==2.13.0 retrying==1.3.3 http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/615baf9e/tests/orchestrator/workflows/executor/__init__.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/executor/__init__.py b/tests/orchestrator/workflows/executor/__init__.py index 83584a6..99d0b39 100644 --- a/tests/orchestrator/workflows/executor/__init__.py +++ b/tests/orchestrator/workflows/executor/__init__.py @@ -18,10 +18,13 @@ from contextlib import contextmanager import aria from aria.modeling import models +from 
aria.orchestrator.context.common import BaseContext class MockContext(object): + INSTRUMENTATION_FIELDS = BaseContext.INSTRUMENTATION_FIELDS + def __init__(self, storage, task_kwargs=None): self.logger = logging.getLogger('mock_logger') self._task_kwargs = task_kwargs or {} @@ -46,6 +49,10 @@ class MockContext(object): def close(self): pass + @property + def model(self): + return self._storage + @classmethod def instantiate_from_dict(cls, storage_kwargs=None, task_kwargs=None): return cls(storage=aria.application_model_storage(**(storage_kwargs or {})), http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/615baf9e/tests/orchestrator/workflows/executor/test_process_executor.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/executor/test_process_executor.py b/tests/orchestrator/workflows/executor/test_process_executor.py index 755b9be..6f5c827 100644 --- a/tests/orchestrator/workflows/executor/test_process_executor.py +++ b/tests/orchestrator/workflows/executor/test_process_executor.py @@ -14,17 +14,24 @@ # limitations under the License. 
import os +import time import Queue +import subprocess import pytest +import psutil +import retrying import aria +from aria import operation +from aria.modeling import models from aria.orchestrator import events from aria.utils.plugin import create as create_plugin from aria.orchestrator.workflows.executor import process import tests.storage import tests.resources +from tests.helpers import FilesystemDataHolder from tests.fixtures import ( # pylint: disable=unused-import plugins_dir, plugin_manager, @@ -71,10 +78,45 @@ class TestProcessExecutor(object): executor.execute(MockContext(model, task_kwargs=dict(function='some.function'))) assert 'closed' in exc_info.value.message + def test_process_termination(self, executor, model, fs_test_holder): + argument = models.Argument.wrap('holder_path', fs_test_holder._path) + model.argument.put(argument) + ctx = MockContext( + model, + task_kwargs=dict( + function='{0}.{1}'.format(__name__, freezing_task.__name__), + arguments=dict(holder_path=argument)), + ) + + executor.execute(ctx) + + @retrying.retry(retry_on_result=lambda r: r is False, stop_max_delay=60000, wait_fixed=500) + def wait_for_extra_process_id(): + return fs_test_holder.get('subproc', False) + + pids = [executor._tasks[ctx.task.id].proc.pid, wait_for_extra_process_id()] + assert any(p.pid == pid for p in psutil.process_iter() for pid in pids) + executor.terminate(ctx.task.id) + + # Give a chance to the processes to terminate + time.sleep(2) + assert not any(p.pid == pid and p.status() != psutil.STATUS_ZOMBIE + for p in psutil.process_iter() + for pid in pids) + + +@pytest.fixture +def fs_test_holder(tmpdir): + dataholder_path = str(tmpdir.join('dataholder')) + holder = FilesystemDataHolder(dataholder_path) + return holder + @pytest.fixture def executor(plugin_manager): - result = process.ProcessExecutor(plugin_manager=plugin_manager) + result = process.ProcessExecutor( + plugin_manager=plugin_manager, + python_path=[tests.ROOT_DIR]) yield result 
result.close() @@ -92,3 +134,11 @@ def model(tmpdir): initiator_kwargs=dict(base_dir=str(tmpdir))) yield _storage tests.storage.release_sqlite_storage(_storage) + + +@operation +def freezing_task(holder_path, **_): + holder = FilesystemDataHolder(holder_path) + holder['subproc'] = subprocess.Popen('while true; do sleep 5; done', shell=True).pid + while True: + time.sleep(5) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/615baf9e/tests/requirements.txt ---------------------------------------------------------------------- diff --git a/tests/requirements.txt b/tests/requirements.txt index 71a227a..cf57821 100644 --- a/tests/requirements.txt +++ b/tests/requirements.txt @@ -13,6 +13,7 @@ testtools fasteners==0.13.0 sh==1.12.13 +psutil==5.2.2 mock==1.0.1 pylint==1.6.4 pytest==3.0.2
