Repository: incubator-airflow
Updated Branches:
  refs/heads/master e9f3fdc52 -> df05546f8


[AIRFLOW-437] Send TI context in kill zombies

Fix to provide the proper TI context when calling ti.handle_failure during
kill_zombies; without the context, handle_failure is of no use and
is equivalent to marking those TIs as failed directly.

This patch had conflicts when merged, resolved by
Committer: Ash Berlin-Taylor
<[email protected]>

Closes #1796 from msumit/AIRFLOW-437-2


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/df05546f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/df05546f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/df05546f

Branch: refs/heads/master
Commit: df05546f8d4676ff3651e6a4c093ad068c7f5bbc
Parents: e9f3fdc
Author: Sumit Maheshwari <[email protected]>
Authored: Fri Jun 8 16:36:14 2018 +0100
Committer: Ash Berlin-Taylor <[email protected]>
Committed: Fri Jun 8 16:36:20 2018 +0100

----------------------------------------------------------------------
 airflow/models.py | 11 ++++++++---
 tests/models.py   | 29 ++++++++++++++++++++++++++++-
 2 files changed, 36 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/df05546f/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index 312074c..704dc80 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -397,9 +397,15 @@ class DagBag(BaseDagBag, LoggingMixin):
                 dag = self.dags[ti.dag_id]
                 if ti.task_id in dag.task_ids:
                     task = dag.get_task(ti.task_id)
+
+                    # now set non db backed vars on ti
                     ti.task = task
-                    ti.handle_failure("{} killed as zombie".format(str(ti)))
-                    self.log.info('Marked zombie job %s as failed', ti)
+                    ti.test_mode = configuration.getboolean('core', 'unit_test_mode')
+
+                    ti.handle_failure("{} detected as zombie".format(ti),
+                                      ti.test_mode, ti.get_template_context())
+                    self.log.info(
+                        'Marked zombie job %s as %s', ti, ti.state)
                     Stats.incr('zombies_killed')
         session.commit()
 
@@ -1750,7 +1756,6 @@ class TaskInstance(Base, LoggingMixin):
         if not test_mode:
             session.merge(self)
         session.commit()
-        self.log.error(str(error))
 
     @provide_session
     def get_template_context(self, session=None):

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/df05546f/tests/models.py
----------------------------------------------------------------------
diff --git a/tests/models.py b/tests/models.py
index 3515516..63c41c7 100644
--- a/tests/models.py
+++ b/tests/models.py
@@ -44,6 +44,7 @@ from airflow.models import DagModel, DagRun, DagStat
 from airflow.models import clear_task_instances
 from airflow.models import XCom
 from airflow.models import Connection
+from airflow.jobs import LocalTaskJob
 from airflow.operators.dummy_operator import DummyOperator
 from airflow.operators.bash_operator import BashOperator
 from airflow.operators.python_operator import PythonOperator
@@ -53,7 +54,7 @@ from airflow.utils import timezone
 from airflow.utils.weight_rule import WeightRule
 from airflow.utils.state import State
 from airflow.utils.trigger_rule import TriggerRule
-from mock import patch
+from mock import patch, ANY
 from parameterized import parameterized
 from tempfile import NamedTemporaryFile
 
@@ -1392,6 +1393,32 @@ class DagBagTest(unittest.TestCase):
 
         self.assertEqual([], dagbag.process_file(None))
 
+    @patch.object(TI, 'handle_failure')
+    def test_kill_zombies(self, mock_ti):
+        """
+        Test that kill zombies call TIs failure handler with proper context
+        """
+        dagbag = models.DagBag()
+        session = settings.Session
+        dag = dagbag.get_dag('example_branch_operator')
+        task = dag.get_task(task_id='run_this_first')
+
+        ti = TI(task, datetime.datetime.now() - datetime.timedelta(1), 'running')
+        lj = LocalTaskJob(ti)
+        lj.state = State.SHUTDOWN
+
+        session.add(lj)
+        session.commit()
+
+        ti.job_id = lj.id
+
+        session.add(ti)
+        session.commit()
+
+        dagbag.kill_zombies()
+        mock_ti.assert_called_with(ANY,
+                                   configuration.getboolean('core', 'unit_test_mode'),
+                                   ANY)
 
 class TaskInstanceTest(unittest.TestCase):
 

Reply via email to