Repository: incubator-ariatosca Updated Branches: refs/heads/ARIA-296-Process-termination-test-fails-on-windows 32a98d576 -> 5ff27124f (forced update)
ARIA-296 Process termination test fails on windows Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/5ff27124 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/5ff27124 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/5ff27124 Branch: refs/heads/ARIA-296-Process-termination-test-fails-on-windows Commit: 5ff27124fd6f1d8f199da1d4d173e38161bd315d Parents: 94ae844 Author: max-orlov <[email protected]> Authored: Thu Jun 29 10:55:54 2017 +0300 Committer: max-orlov <[email protected]> Committed: Sun Jul 2 18:17:06 2017 +0300 ---------------------------------------------------------------------- aria/orchestrator/workflows/executor/process.py | 7 ++- .../workflows/executor/test_process_executor.py | 49 ++++++++++++++------ 2 files changed, 38 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5ff27124/aria/orchestrator/workflows/executor/process.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/executor/process.py b/aria/orchestrator/workflows/executor/process.py index 69288ea..2507307 100644 --- a/aria/orchestrator/workflows/executor/process.py +++ b/aria/orchestrator/workflows/executor/process.py @@ -26,7 +26,6 @@ import sys # entry point. We thus remove this module's directory from the python path if it happens to be # there -import signal from collections import namedtuple script_dir = os.path.dirname(__file__) @@ -121,7 +120,7 @@ class ProcessExecutor(base.BaseExecutor): self._server_socket.close() self._listener_thread.join(timeout=60) - for task_id in self._tasks: + for task_id in set(self._tasks): self.terminate(task_id) def terminate(self, task_id): @@ -132,10 +131,10 @@ class ProcessExecutor(base.BaseExecutor): parent_process = psutil.Process(task.proc.pid) for child_process in reversed(parent_process.children(recursive=True)): try: - child_process.send_signal(signal.SIGKILL) + child_process.kill() except BaseException: pass - parent_process.send_signal(signal.SIGKILL) + parent_process.kill() except BaseException: pass http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5ff27124/tests/orchestrator/workflows/executor/test_process_executor.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/executor/test_process_executor.py b/tests/orchestrator/workflows/executor/test_process_executor.py index d3e7a6d..b17ec85 100644 --- a/tests/orchestrator/workflows/executor/test_process_executor.py +++ b/tests/orchestrator/workflows/executor/test_process_executor.py @@ -14,6 +14,7 @@ # limitations under the License. import os +import sys import time import Queue import subprocess @@ -23,6 +24,7 @@ import psutil import retrying import aria + from aria import operation from aria.modeling import models from aria.orchestrator import events @@ -78,33 +80,52 @@ class TestProcessExecutor(object): executor.execute(MockContext(model, task_kwargs=dict(function='some.function'))) assert 'closed' in exc_info.value.message - @pytest.mark.skipif(os.name == 'nt', reason='uses bash script') - def test_process_termination(self, executor, model, fs_test_holder): - argument = models.Argument.wrap('holder_path', fs_test_holder._path) - model.argument.put(argument) + def test_process_termination(self, executor, model, fs_test_holder, tmpdir): + freeze_script_path = str(tmpdir.join('freeze_script')) + with open(freeze_script_path, 'w+b') as f: + f.write( + '''import time +while True: + time.sleep(5) + ''' + ) + holder_path_argument = models.Argument.wrap('holder_path', fs_test_holder._path) + script_path_argument = models.Argument.wrap('freezing_script_path', + str(tmpdir.join('freeze_script'))) + + model.argument.put(holder_path_argument) + model.argument.put(script_path_argument) ctx = MockContext( model, task_kwargs=dict( function='{0}.{1}'.format(__name__, freezing_task.__name__), - arguments=dict(holder_path=argument)), + arguments=dict(holder_path=holder_path_argument, + freezing_script_path=script_path_argument)), ) executor.execute(ctx) - @retrying.retry(retry_on_result=lambda r: r is False, stop_max_delay=60000, wait_fixed=500) + @retrying.retry(retry_on_result=lambda r: r is False) def wait_for_extra_process_id(): return fs_test_holder.get('subproc', False) - pids = [executor._tasks[ctx.task.id].proc.pid, wait_for_extra_process_id()] - assert any(p.pid == pid for p in psutil.process_iter() for pid in pids) + task_pid = executor._tasks[ctx.task.id].proc.pid + extra_process_pid = wait_for_extra_process_id() + + assert set([task_pid, extra_process_pid]).issubset(set(psutil.pids())) executor.terminate(ctx.task.id) # Give a chance to the processes to terminate - time.sleep(10) # windows might require more time - assert not any(p.pid == pid and p.status() != psutil.STATUS_ZOMBIE - for p in psutil.process_iter() - for pid in pids) + time.sleep(2) + # all processes should be either zombies or non existent + pids = [task_pid, extra_process_pid] + for pid in pids: + if pid in psutil.pids(): + assert psutil.Process(pid).status() == psutil.STATUS_ZOMBIE + else: + # making the test more readable + assert pid not in psutil.pids() @pytest.fixture def fs_test_holder(tmpdir): @@ -138,8 +159,8 @@ def model(tmpdir): @operation -def freezing_task(holder_path, **_): +def freezing_task(holder_path, freezing_script_path, **_): holder = FilesystemDataHolder(holder_path) - holder['subproc'] = subprocess.Popen('while true; do sleep 5; done', shell=True).pid + holder['subproc'] = subprocess.Popen([sys.executable, freezing_script_path], shell=True).pid while True: time.sleep(5)
