Repository: incubator-ariatosca Updated Branches: refs/heads/ARIA-237-Support-for-resuming-failed-workflow-executions 20c328f36 -> da72f3be2 (forced update)
close processes test for appveyor fix Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/da72f3be Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/da72f3be Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/da72f3be Branch: refs/heads/ARIA-237-Support-for-resuming-failed-workflow-executions Commit: da72f3be2bf87a3d8f21a03e3f0e08c16ea8c493 Parents: 53dc64e Author: max-orlov <[email protected]> Authored: Thu Jun 29 10:55:54 2017 +0300 Committer: max-orlov <[email protected]> Committed: Sun Jul 2 18:02:23 2017 +0300 ---------------------------------------------------------------------- aria/orchestrator/workflows/executor/process.py | 6 +-- .../workflows/executor/test_process_executor.py | 49 ++++++++++++++------ tox.ini | 2 +- 3 files changed, 40 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/da72f3be/aria/orchestrator/workflows/executor/process.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/executor/process.py b/aria/orchestrator/workflows/executor/process.py index 11e3cfd..8af700e 100644 --- a/aria/orchestrator/workflows/executor/process.py +++ b/aria/orchestrator/workflows/executor/process.py @@ -121,7 +121,7 @@ class ProcessExecutor(base.BaseExecutor): self._server_socket.close() self._listener_thread.join(timeout=60) - for task_id in self._tasks: + for task_id in set(self._tasks): self.terminate(task_id) def terminate(self, task_id): @@ -132,10 +132,10 @@ class ProcessExecutor(base.BaseExecutor): parent_process = psutil.Process(task.proc.pid) for child_process in reversed(parent_process.children(recursive=True)): try: - child_process.send_signal(signal.SIGKILL) + child_process.kill() except BaseException: pass - parent_process.send_signal(signal.SIGKILL) + parent_process.kill() except BaseException: pass http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/da72f3be/tests/orchestrator/workflows/executor/test_process_executor.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/executor/test_process_executor.py b/tests/orchestrator/workflows/executor/test_process_executor.py index 6cac288..c8bebc7 100644 --- a/tests/orchestrator/workflows/executor/test_process_executor.py +++ b/tests/orchestrator/workflows/executor/test_process_executor.py @@ -23,6 +23,8 @@ import psutil import retrying import aria +import sys + from aria import operation from aria.modeling import models from aria.orchestrator import events @@ -39,6 +41,9 @@ from tests.fixtures import ( # pylint: disable=unused-import from . import MockContext +IS_WINDOWS = os.name == 'nt' + + class TestProcessExecutor(object): def test_plugin_execution(self, executor, mock_plugin, model): @@ -78,32 +83,50 @@ class TestProcessExecutor(object): executor.execute(MockContext(model, task_kwargs=dict(function='some.function'))) assert 'closed' in exc_info.value.message - def test_process_termination(self, executor, model, fs_test_holder): - argument = models.Argument.wrap('holder_path', fs_test_holder._path) - model.argument.put(argument) + def test_process_termination(self, executor, model, fs_test_holder, tmpdir): + freeze_script_path = str(tmpdir.join('freeze_script')) + with open(freeze_script_path, 'w+b') as f: + f.write( + '''import time +while True: + time.sleep(5) + ''' + ) + holder_path_argument = models.Argument.wrap('holder_path', fs_test_holder._path) + script_path_argument = models.Argument.wrap('freezing_script_path', str(tmpdir.join('freeze_script'))) + + model.argument.put(holder_path_argument) + model.argument.put(script_path_argument) ctx = MockContext( model, task_kwargs=dict( function='{0}.{1}'.format(__name__, freezing_task.__name__), - arguments=dict(holder_path=argument)), + arguments=dict(holder_path=holder_path_argument, freezing_script_path=script_path_argument)), ) executor.execute(ctx) - @retrying.retry(retry_on_result=lambda r: r is False, stop_max_delay=60000, wait_fixed=500) + @retrying.retry(retry_on_result=lambda r: r is False) def wait_for_extra_process_id(): return fs_test_holder.get('subproc', False) - pids = [executor._tasks[ctx.task.id].proc.pid, wait_for_extra_process_id()] - assert any(p.pid == pid for p in psutil.process_iter() for pid in pids) + task_pid = executor._tasks[ctx.task.id].proc.pid + extra_process_pid = wait_for_extra_process_id() + + assert set([task_pid, extra_process_pid]).issubset(set(psutil.pids())) executor.terminate(ctx.task.id) # Give a chance to the processes to terminate - time.sleep(10) # windows might require more time - assert not any(p.pid == pid and p.status() != psutil.STATUS_ZOMBIE - for p in psutil.process_iter() - for pid in pids) + time.sleep(2) + # all processes should be either zombies or non existent + pids = [task_pid, extra_process_pid] + for pid in pids: + if pid in psutil.pids(): + assert psutil.Process(pid).status() == psutil.STATUS_ZOMBIE + else: + # making the test more readable + assert pid not in psutil.pids() @pytest.fixture def fs_test_holder(tmpdir): @@ -137,8 +160,8 @@ def model(tmpdir): @operation -def freezing_task(holder_path, **_): +def freezing_task(holder_path, freezing_script_path, **_): holder = FilesystemDataHolder(holder_path) - holder['subproc'] = subprocess.Popen('while true; do sleep 5; done', shell=True).pid + holder['subproc'] = subprocess.Popen([sys.executable, freezing_script_path], shell=True).pid while True: time.sleep(5) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/da72f3be/tox.ini ---------------------------------------------------------------------- diff --git a/tox.ini b/tox.ini index 3e1fb3c..796df97 100644 --- a/tox.ini +++ b/tox.ini @@ -46,7 +46,7 @@ commands=pytest tests/end2end --cov-report term-missing --cov aria commands=pytest tests/end2end --cov-report term-missing --cov aria [testenv:pywin] -commands=pytest tests --ignore=tests/end2end --ignore=tests/orchestrator/execution_plugin/test_ssh.py --cov-report term-missing --cov aria +commands=pytest tests/orchestrator/workflows/executor/test_process_executor.py [testenv:py27ssh] install_command=pip install {opts} {packages} .[ssh]
