Repository: incubator-ariatosca Updated Branches: refs/heads/ARIA-285-Cancel-execution-may-leave-running-processes 9f26c06ad -> b019de64e (forced update)
fixed test waitinge Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/b019de64 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/b019de64 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/b019de64 Branch: refs/heads/ARIA-285-Cancel-execution-may-leave-running-processes Commit: b019de64ed90365e9f5893ef015b5ff7459427e0 Parents: 3d5efc4 Author: max-orlov <[email protected]> Authored: Tue Jun 27 11:53:40 2017 +0300 Committer: max-orlov <[email protected]> Committed: Tue Jun 27 15:02:48 2017 +0300 ---------------------------------------------------------------------- aria/logger.py | 3 +-- aria/orchestrator/workflows/executor/process.py | 24 +++++++++----------- .../workflows/executor/test_process_executor.py | 15 +++++++----- 3 files changed, 21 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b019de64/aria/logger.py ---------------------------------------------------------------------- diff --git a/aria/logger.py b/aria/logger.py index 6094f75..bd7ed4e 100644 --- a/aria/logger.py +++ b/aria/logger.py @@ -48,7 +48,7 @@ class LoggerMixin(object): logger_name = None logger_level = logging.DEBUG - def __init__(self, termination_timeout=10, *args, **kwargs): + def __init__(self, *args, **kwargs): self.logger_name = self.logger_name or self.__class__.__name__ self.logger = logging.getLogger('{0}.{1}'.format(_base_logger.name, self.logger_name)) # Set the logger handler of any object derived from LoggerMixing to NullHandler. @@ -56,7 +56,6 @@ class LoggerMixin(object): # `No handlers could be found for logger "..."`. self.logger.addHandler(NullHandler()) self.logger.setLevel(self.logger_level) - self._termination_timeout = termination_timeout super(LoggerMixin, self).__init__(*args, **kwargs) @classmethod http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b019de64/aria/orchestrator/workflows/executor/process.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/executor/process.py b/aria/orchestrator/workflows/executor/process.py index 92c83e3..278e764 100644 --- a/aria/orchestrator/workflows/executor/process.py +++ b/aria/orchestrator/workflows/executor/process.py @@ -29,8 +29,6 @@ from collections import namedtuple import signal -import time - script_dir = os.path.dirname(__file__) if script_dir in sys.path: sys.path.remove(script_dir) @@ -122,17 +120,14 @@ class ProcessExecutor(base.BaseExecutor): self._server_socket.close() self._listener_thread.join(timeout=60) + for task_id in self._tasks: + self.terminate(task_id) + def terminate(self, task_id): task = self._tasks.get(task_id) # The process might have managed to finished so it would not be in the tasks list - if task and os.getsid(os.getpid()) != os.getpgid(task.proc.pid): - # If the above condition is false, the process group leader is the group leader - # for the current session of the system, and killing it will kill the the entire - # os session. - os.killpg(os.getpgid(task.proc.pid), signal.SIGINT) - - time.sleep(self._termination_timeout) - os.killpg(os.getpgid(task.proc.pid), signal.SIGTERM) + if task: + os.killpg(os.getpgid(task.proc.pid), signal.SIGKILL) def _execute(self, ctx): self._check_closed() @@ -146,10 +141,13 @@ class ProcessExecutor(base.BaseExecutor): env = self._construct_subprocess_env(task=ctx.task) # Asynchronously start the operation in a subprocess proc = subprocess.Popen( - '{0} {1} {2}'.format(sys.executable, __file__, arguments_json_path), + [ + sys.executable, + os.path.expanduser(os.path.expandvars(__file__)), + os.path.expanduser(os.path.expandvars(arguments_json_path)) + ], env=env, - preexec_fn=os.setsid, - shell=True) + preexec_fn=os.setsid) self._tasks[ctx.task.id] = _Task(ctx=ctx, proc=proc) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b019de64/tests/orchestrator/workflows/executor/test_process_executor.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/executor/test_process_executor.py b/tests/orchestrator/workflows/executor/test_process_executor.py index be3e833..746ed93 100644 --- a/tests/orchestrator/workflows/executor/test_process_executor.py +++ b/tests/orchestrator/workflows/executor/test_process_executor.py @@ -90,12 +90,16 @@ class TestProcessExecutor(object): executor.execute(ctx) - while fs_test_holder.get('subproc', None) is None: - time.sleep(1) - pids = [executor._tasks[ctx.task.id].proc.pid, fs_test_holder['subproc']] + @retrying.retry(retry_on_result=lambda r: r is False, stop_max_delay=60000, wait_fixed=500) + def wait_for_extra_process_id(): + return fs_test_holder.get('subproc', False) + + pids = [executor._tasks[ctx.task.id].proc.pid, wait_for_extra_process_id()] assert any(p.pid == pid for p in psutil.process_iter() for pid in pids) executor.terminate(ctx.task.id) - time.sleep(10) + + # Give a change to the processes to terminate + time.sleep(2) assert not any(p.pid == pid and p.status() != psutil.STATUS_ZOMBIE for p in psutil.process_iter() for pid in pids) @@ -112,8 +116,7 @@ def fs_test_holder(tmpdir): def executor(plugin_manager): result = process.ProcessExecutor( plugin_manager=plugin_manager, - python_path=[tests.ROOT_DIR], - termination_timeout=1) + python_path=[tests.ROOT_DIR]) yield result result.close()
