Repository: incubator-airflow Updated Branches: refs/heads/master 0f9112daf -> d3abe2c3c
[AIRFLOW-403] Bash operator's kill method leaves underlying processes running. Currently only the main process is being killed because the process group is not being terminated. Closes #1714 from spektom/bash_operator_kill Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/d3abe2c3 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/d3abe2c3 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/d3abe2c3 Branch: refs/heads/master Commit: d3abe2c3c2dd2ef53b00282de9f9e3a4512ab068 Parents: 0f9112d Author: Michael Spector <[email protected]> Authored: Sun Dec 25 14:53:50 2016 +0100 Committer: Bolke de Bruin <[email protected]> Committed: Sun Dec 25 14:53:53 2016 +0100 ---------------------------------------------------------------------- airflow/models.py | 13 ++++++++----- airflow/operators/bash_operator.py | 10 +++++++--- tests/core.py | 22 ++++++++++++++++++++++ 3 files changed, 37 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/d3abe2c3/airflow/models.py ---------------------------------------------------------------------- diff --git a/airflow/models.py b/airflow/models.py index 55b855b..f6f7968 100755 --- a/airflow/models.py +++ b/airflow/models.py @@ -60,7 +60,7 @@ import six from airflow import settings, utils from airflow.executors import DEFAULT_EXECUTOR, LocalExecutor from airflow import configuration -from airflow.exceptions import AirflowException, AirflowSkipException +from airflow.exceptions import AirflowException, AirflowSkipException, AirflowTaskTimeout from airflow.dag.base_dag import BaseDag, BaseDagBag from airflow.ti_deps.deps.not_in_retry_period_dep import NotInRetryPeriodDep from airflow.ti_deps.deps.prev_dagrun_dep import PrevDagrunDep @@ -1294,10 +1294,13 @@ class TaskInstance(Base): # if it goes 
beyond result = None if task_copy.execution_timeout: - with timeout(int( - task_copy.execution_timeout.total_seconds())): - result = task_copy.execute(context=context) - + try: + with timeout(int( + task_copy.execution_timeout.total_seconds())): + result = task_copy.execute(context=context) + except AirflowTaskTimeout: + task_copy.on_kill() + raise else: result = task_copy.execute(context=context) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/d3abe2c3/airflow/operators/bash_operator.py ---------------------------------------------------------------------- diff --git a/airflow/operators/bash_operator.py b/airflow/operators/bash_operator.py index 8e30da4..3146cd6 100644 --- a/airflow/operators/bash_operator.py +++ b/airflow/operators/bash_operator.py @@ -14,6 +14,8 @@ from builtins import bytes +import os +import signal import logging from subprocess import Popen, STDOUT, PIPE from tempfile import gettempdir, NamedTemporaryFile @@ -80,7 +82,8 @@ class BashOperator(BaseOperator): sp = Popen( ['bash', fname], stdout=PIPE, stderr=STDOUT, - cwd=tmp_dir, env=self.env) + cwd=tmp_dir, env=self.env, + preexec_fn=os.setsid) self.sp = sp @@ -100,5 +103,6 @@ class BashOperator(BaseOperator): return line def on_kill(self): - logging.info('Sending SIGTERM signal to bash subprocess') - self.sp.terminate() + logging.info('Sending SIGTERM signal to bash process group') + os.killpg(os.getpgid(self.sp.pid), signal.SIGTERM) + http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/d3abe2c3/tests/core.py ---------------------------------------------------------------------- diff --git a/tests/core.py b/tests/core.py index 24315f1..85e7fa1 100644 --- a/tests/core.py +++ b/tests/core.py @@ -416,6 +416,28 @@ class CoreTest(unittest.TestCase): output_encoding='utf-8') t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True) + def test_bash_operator_kill(self): + import subprocess + import psutil + sleep_time = "100%d" % os.getpid() + t = 
BashOperator( + task_id='test_bash_operator_kill', + execution_timeout=timedelta(seconds=1), + bash_command="/bin/bash -c 'sleep %s'" % sleep_time, + dag=self.dag) + self.assertRaises( + exceptions.AirflowTaskTimeout, + t.run, + start_date=DEFAULT_DATE, end_date=DEFAULT_DATE) + sleep(2) + pid = -1 + for proc in psutil.process_iter(): + if proc.cmdline() == ['sleep', sleep_time]: + pid = proc.pid + if pid != -1: + os.kill(pid, signal.SIGTERM) + self.fail("BashOperator's subprocess still running after stopping on timeout!") + def test_trigger_dagrun(self): def trigga(context, obj): if True:
