Repository: incubator-airflow Updated Branches: refs/heads/master e9f3fdc52 -> df05546f8
[AIRFLOW-437] Send TI context in kill zombies Fix to provide proper TI context while calling ti.handle_failure during kill_zombies, as without the context handler_failure is of no use and its equivalent of marking those TIs as failed directly. This patch had conflicts when merged, resolved by Committer: Ash Berlin-Taylor <[email protected]> Closes #1796 from msumit/AIRFLOW-437-2 Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/df05546f Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/df05546f Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/df05546f Branch: refs/heads/master Commit: df05546f8d4676ff3651e6a4c093ad068c7f5bbc Parents: e9f3fdc Author: Sumit Maheshwari <[email protected]> Authored: Fri Jun 8 16:36:14 2018 +0100 Committer: Ash Berlin-Taylor <[email protected]> Committed: Fri Jun 8 16:36:20 2018 +0100 ---------------------------------------------------------------------- airflow/models.py | 11 ++++++++--- tests/models.py | 29 ++++++++++++++++++++++++++++- 2 files changed, 36 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/df05546f/airflow/models.py ---------------------------------------------------------------------- diff --git a/airflow/models.py b/airflow/models.py index 312074c..704dc80 100755 --- a/airflow/models.py +++ b/airflow/models.py @@ -397,9 +397,15 @@ class DagBag(BaseDagBag, LoggingMixin): dag = self.dags[ti.dag_id] if ti.task_id in dag.task_ids: task = dag.get_task(ti.task_id) + + # now set non db backed vars on ti ti.task = task - ti.handle_failure("{} killed as zombie".format(str(ti))) - self.log.info('Marked zombie job %s as failed', ti) + ti.test_mode = configuration.getboolean('core', 'unit_test_mode') + + ti.handle_failure("{} detected as zombie".format(ti), + ti.test_mode, ti.get_template_context()) + self.log.info( + 'Marked zombie job %s as %s', ti, ti.state) Stats.incr('zombies_killed') session.commit() @@ -1750,7 +1756,6 @@ class TaskInstance(Base, LoggingMixin): if not test_mode: session.merge(self) session.commit() - self.log.error(str(error)) @provide_session def get_template_context(self, session=None): http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/df05546f/tests/models.py ---------------------------------------------------------------------- diff --git a/tests/models.py b/tests/models.py index 3515516..63c41c7 100644 --- a/tests/models.py +++ b/tests/models.py @@ -44,6 +44,7 @@ from airflow.models import DagModel, DagRun, DagStat from airflow.models import clear_task_instances from airflow.models import XCom from airflow.models import Connection +from airflow.jobs import LocalTaskJob from airflow.operators.dummy_operator import DummyOperator from airflow.operators.bash_operator import BashOperator from airflow.operators.python_operator import PythonOperator @@ -53,7 +54,7 @@ from airflow.utils import timezone from airflow.utils.weight_rule import WeightRule from airflow.utils.state import State from airflow.utils.trigger_rule import TriggerRule -from mock import patch +from mock import patch, ANY from parameterized import parameterized from tempfile import NamedTemporaryFile @@ -1392,6 +1393,32 @@ class DagBagTest(unittest.TestCase): self.assertEqual([], dagbag.process_file(None)) + @patch.object(TI, 'handle_failure') + def test_kill_zombies(self, mock_ti): + """ + Test that kill zombies call TIs failure handler with proper context + """ + dagbag = models.DagBag() + session = settings.Session + dag = dagbag.get_dag('example_branch_operator') + task = dag.get_task(task_id='run_this_first') + + ti = TI(task, datetime.datetime.now() - datetime.timedelta(1), 'running') + lj = LocalTaskJob(ti) + lj.state = State.SHUTDOWN + + session.add(lj) + session.commit() + + ti.job_id = lj.id + + session.add(ti) + session.commit() + + dagbag.kill_zombies() + mock_ti.assert_called_with(ANY, + configuration.getboolean('core', 'unit_test_mode'), + ANY) class TaskInstanceTest(unittest.TestCase):
