Repository: incubator-airflow Updated Branches: refs/heads/master 8f9f5084b -> 9bdfb8c70
[AIRFLOW-1109] Use kill signal to kill processes and log results. The comments in kill_process_tree state that its final phase sends SIGKILL, but the code actually sends SIGTERM. Make the code send SIGKILL as the comments describe, and log the result of each kill attempt. Closes #2241 from saguziel/aguziel-kill-processes Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/9bdfb8c7 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/9bdfb8c7 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/9bdfb8c7 Branch: refs/heads/master Commit: 9bdfb8c700a2622aad26422a34d24e252be52bff Parents: 8f9f508 Author: Alex Guziel <[email protected]> Authored: Thu Apr 13 18:52:43 2017 -0700 Committer: Alex Guziel <[email protected]> Committed: Thu Apr 13 18:52:43 2017 -0700 ---------------------------------------------------------------------- airflow/utils/helpers.py | 140 +++++++++++++----------------------------- tests/utils/helpers.py | 84 +++++++++++++++++++++++++ 2 files changed, 126 insertions(+), 98 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9bdfb8c7/airflow/utils/helpers.py ---------------------------------------------------------------------- diff --git a/airflow/utils/helpers.py b/airflow/utils/helpers.py index dee0657..9a94125 100644 --- a/airflow/utils/helpers.py +++ b/airflow/utils/helpers.py @@ -35,7 +35,7 @@ from airflow.exceptions import AirflowException # When killing processes, time to wait after issuing a SIGTERM before issuing a # SIGKILL. -TIME_TO_WAIT_AFTER_SIGTERM = 5 +DEFAULT_TIME_TO_WAIT_AFTER_SIGTERM = 5 def validate_key(k, max_length=250): @@ -182,20 +182,32 @@ def pprinttable(rows): return s -def kill_using_shell(pid, signal=signal.SIGTERM): - process = psutil.Process(pid) - # Use sudo only when necessary - consider SubDagOperator and SequentialExecutor case.
- if process.username() != getpass.getuser(): - args = ["sudo", "kill", "-{}".format(int(signal)), str(pid)] - else: - args = ["kill", "-{}".format(int(signal)), str(pid)] - # PID may not exist and return a non-zero error code - subprocess.call(args) +def kill_using_shell(logger, pid, signal=signal.SIGTERM): + try: + process = psutil.Process(pid) + # Use sudo only when necessary - consider SubDagOperator and SequentialExecutor case. + if process.username() != getpass.getuser(): + args = ["sudo", "kill", "-{}".format(int(signal)), str(pid)] + else: + args = ["kill", "-{}".format(int(signal)), str(pid)] + # PID may not exist and return a non-zero error code + logger.error(subprocess.check_output(args)) + logger.info("Killed process {} with signal {}".format(pid, signal)) + return True + except psutil.NoSuchProcess as e: + logger.warning("Process {} no longer exists".format(pid)) + return False + except subprocess.CalledProcessError as e: + logger.warning("Failed to kill process {} with signal {}. Output: {}" + .format(pid, signal, e.output)) + return False -def kill_process_tree(logger, pid): +def kill_process_tree(logger, pid, timeout=DEFAULT_TIME_TO_WAIT_AFTER_SIGTERM): """ - Kills the process and all of the descendants. Kills using the `kill` + TODO(saguziel): also kill the root process after killing descendants + + Kills the process's descendants. Kills using the `kill` shell command so that it can change users. Note: killing via PIDs has the potential to the wrong process if the process dies and the PID gets recycled in a narrow time window. 
@@ -215,23 +227,21 @@ def kill_process_tree(logger, pid): if x.is_running()] if len(descendant_processes) != 0: - logger.warning("Terminating descendant processes of {} PID: {}" - .format(root_process.cmdline(), - root_process.pid)) + logger.info("Terminating descendant processes of {} PID: {}" + .format(root_process.cmdline(), + root_process.pid)) temp_processes = descendant_processes[:] for descendant in temp_processes: - logger.warning("Terminating descendant process {} PID: {}" - .format(descendant.cmdline(), descendant.pid)) - try: - kill_using_shell(descendant.pid, signal.SIGTERM) - except psutil.NoSuchProcess: + logger.info("Terminating descendant process {} PID: {}" + .format(descendant.cmdline(), descendant.pid)) + if not kill_using_shell(logger, descendant.pid, signal.SIGTERM): descendant_processes.remove(descendant) - logger.warning("Waiting up to {}s for processes to exit..." - .format(TIME_TO_WAIT_AFTER_SIGTERM)) + logger.info("Waiting up to {}s for processes to exit..." + .format(timeout)) try: - psutil.wait_procs(descendant_processes, TIME_TO_WAIT_AFTER_SIGTERM) - logger.warning("Done waiting") + psutil.wait_procs(descendant_processes, timeout) + logger.info("Done waiting") except psutil.TimeoutExpired: logger.warning("Ran out of time while waiting for " "processes to exit") @@ -242,85 +252,19 @@ def kill_process_tree(logger, pid): if len(descendant_processes) > 0: temp_processes = descendant_processes[:] for descendant in temp_processes: - logger.warning("Killing descendant process {} PID: {}" - .format(descendant.cmdline(), descendant.pid)) - try: - kill_using_shell(descendant.pid, signal.SIGTERM) - descendant.wait() - except psutil.NoSuchProcess: + logger.info("Killing descendant process {} PID: {}" + .format(descendant.cmdline(), descendant.pid)) + if not kill_using_shell(logger, descendant.pid, signal.SIGKILL): descendant_processes.remove(descendant) - logger.warning("Killed all descendant processes of {} PID: {}" - 
.format(root_process.cmdline(), - root_process.pid)) + else: + descendant.wait() + logger.info("Killed all descendant processes of {} PID: {}" + .format(root_process.cmdline(), + root_process.pid)) else: logger.debug("There are no descendant processes to kill") -def kill_descendant_processes(logger, pids_to_kill=None): - """ - Kills all descendant processes of this process. - - :param logger: logger - :type logger: logging.Logger - :param pids_to_kill: if specified, kill only these PIDs - :type pids_to_kill: list[int] - """ - # First try SIGTERM - this_process = psutil.Process(os.getpid()) - - # Only check child processes to ensure that we don't have a case - # where a child process died but the PID got reused. - descendant_processes = [x for x in this_process.children(recursive=True) - if x.is_running()] - if pids_to_kill: - descendant_processes = [x for x in descendant_processes - if x.pid in pids_to_kill] - - if len(descendant_processes) == 0: - logger.debug("There are no descendant processes that can be killed") - return - logger.warning("Terminating descendant processes of {} PID: {}" - .format(this_process.cmdline(), - this_process.pid)) - - temp_processes = descendant_processes[:] - for descendant in temp_processes: - try: - logger.warning("Terminating descendant process {} PID: {}" - .format(descendant.cmdline(), descendant.pid)) - descendant.terminate() - except psutil.NoSuchProcess: - descendant_processes.remove(descendant) - - logger.warning("Waiting up to {}s for processes to exit..." 
- .format(TIME_TO_WAIT_AFTER_SIGTERM)) - try: - psutil.wait_procs(descendant_processes, TIME_TO_WAIT_AFTER_SIGTERM) - logger.warning("Done waiting") - except psutil.TimeoutExpired: - logger.warning("Ran out of time while waiting for " - "processes to exit") - # Then SIGKILL - descendant_processes = [x for x in this_process.children(recursive=True) - if x.is_running()] - if pids_to_kill: - descendant_processes = [x for x in descendant_processes - if x.pid in pids_to_kill] - - if len(descendant_processes) > 0: - for descendant in descendant_processes: - logger.warning("Killing descendant process {} PID: {}" - .format(descendant.cmdline(), descendant.pid)) - try: - descendant.kill() - descendant.wait() - except psutil.NoSuchProcess: - pass - logger.warning("Killed all descendant processes of {} PID: {}" - .format(this_process.cmdline(), - this_process.pid)) - - class AirflowImporter(object): """ Importer that dynamically loads a class and module from its parent. This http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9bdfb8c7/tests/utils/helpers.py ---------------------------------------------------------------------- diff --git a/tests/utils/helpers.py b/tests/utils/helpers.py new file mode 100644 index 0000000..3ef43f8 --- /dev/null +++ b/tests/utils/helpers.py @@ -0,0 +1,84 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import os +import logging +import multiprocessing +import unittest +import psutil +import signal +import time + +from airflow.utils import helpers + + +class TestHelpers(unittest.TestCase): + @staticmethod + def _ignores_sigterm(child_pid, setup_done): + def signal_handler(signum, frame): + pass + signal.signal(signal.SIGTERM, signal_handler) + child_pid.value = os.getpid() + setup_done.release() + while True: + time.sleep(1) + + @staticmethod + def _parent_of_ignores_sigterm(child_process_killed, child_pid, + process_done, setup_done): + child = multiprocessing.Process(target=TestHelpers._ignores_sigterm, + args=[child_pid, setup_done]) + child.start() + if setup_done.acquire(timeout=1.0): + helpers.kill_process_tree(logging.getLogger(), os.getpid(), timeout=1.0) + # Process.is_alive doesnt work with SIGKILL + if not psutil.pid_exists(child_pid.value): + child_process_killed.value = 1 + process_done.release() + + def test_kill_process_tree(self): + """ Spin up a process that can't be killed by SIGTERM and make sure it gets killed anyway. """ + child_process_killed = multiprocessing.Value('i', 0) + process_done = multiprocessing.Semaphore(0) + child_pid = multiprocessing.Value('i', 0) + setup_done = multiprocessing.Semaphore(0) + args = [child_process_killed, child_pid, process_done, setup_done] + child = multiprocessing.Process(target=TestHelpers._parent_of_ignores_sigterm, args=args) + try: + child.start() + self.assertTrue(process_done.acquire(timeout=5.0)) + self.assertEqual(1, child_process_killed.value) + finally: + try: + os.kill(child_pid.value, signal.SIGKILL) # terminate doesnt work here + except OSError: + pass + child.terminate() + + def test_kill_using_shell(self): + """ Test when no process exists. 
""" + child_pid = multiprocessing.Value('i', 0) + setup_done = multiprocessing.Semaphore(0) + args = [child_pid, setup_done] + child = multiprocessing.Process(target=TestHelpers._ignores_sigterm, args=args) + child.start() + + self.assertTrue(setup_done.acquire(timeout=1.0)) + pid_to_kill = child_pid.value + self.assertTrue(helpers.kill_using_shell(logging.getLogger(), pid_to_kill, + signal=signal.SIGKILL)) + child.join() # remove orphan process + self.assertFalse(helpers.kill_using_shell(logging.getLogger(), pid_to_kill, + signal=signal.SIGKILL)) +
