Repository: incubator-airflow Updated Branches: refs/heads/master f048e94c8 -> 75c3e1d29
[AIRFLOW-721] Descendant process can disappear before termination There is a race condition in helpers.py's kill_descendant_processes, which checks for running processes and then tries to terminate them. The check and the termination are not performed atomically, which leaves a small window in which a PID can disappear before it is terminated. Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/b95a6154 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/b95a6154 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/b95a6154 Branch: refs/heads/master Commit: b95a615496a102eccb4e310792e5080bb29db71b Parents: ed8e15b Author: Bolke de Bruin <[email protected]> Authored: Tue Dec 27 21:03:39 2016 +0100 Committer: Bolke de Bruin <[email protected]> Committed: Thu Dec 29 15:58:18 2016 +0100 ---------------------------------------------------------------------- airflow/utils/helpers.py | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b95a6154/airflow/utils/helpers.py ---------------------------------------------------------------------- diff --git a/airflow/utils/helpers.py b/airflow/utils/helpers.py index 24fc310..23fda03 100644 --- a/airflow/utils/helpers.py +++ b/airflow/utils/helpers.py @@ -205,10 +205,16 @@ def kill_descendant_processes(logger, pids_to_kill=None): logger.warn("Terminating descendant processes of {} PID: {}" .format(this_process.cmdline(), this_process.pid)) - for descendant in descendant_processes: + + temp_processes = descendant_processes[:] + for descendant in temp_processes: logger.warn("Terminating descendant process {} PID: {}" .format(descendant.cmdline(), descendant.pid)) - descendant.terminate() + try: + descendant.terminate() + except psutil.NoSuchProcess: + descendant_processes.remove(descendant) + 
logger.warn("Waiting up to {}s for processes to exit..." .format(TIME_TO_WAIT_AFTER_SIGTERM)) try: @@ -228,8 +234,11 @@ def kill_descendant_processes(logger, pids_to_kill=None): for descendant in descendant_processes: logger.warn("Killing descendant process {} PID: {}" .format(descendant.cmdline(), descendant.pid)) - descendant.kill() - descendant.wait() + try: + descendant.kill() + descendant.wait() + except psutil.NoSuchProcess: + pass logger.warn("Killed all descendant processes of {} PID: {}" .format(this_process.cmdline(), this_process.pid))
