ashb commented on a change in pull request #5511: [AIRFLOW-4797] Fix zombie detection
URL: https://github.com/apache/airflow/pull/5511#discussion_r300341807
 
 

 ##########
 File path: airflow/models/dagbag.py
 ##########
 @@ -268,35 +270,46 @@ def process_file(self, filepath, only_if_updated=True, 
safe_mode=True):
         return found_dags
 
     @provide_session
-    def kill_zombies(self, zombies, session=None):
+    def kill_zombies(self, session=None):
         """
-        Fail given zombie tasks, which are tasks that haven't
+        Fail zombie tasks, which are tasks that haven't
         had a heartbeat for too long, in the current DagBag.
 
-        :param zombies: zombie task instances to kill.
-        :type zombies: airflow.utils.dag_processing.SimpleTaskInstance
         :param session: DB session.
         :type session: sqlalchemy.orm.session.Session
         """
-        from airflow.models.taskinstance import TaskInstance  # Avoid circular 
import
-
-        for zombie in zombies:
-            if zombie.dag_id in self.dags:
-                dag = self.dags[zombie.dag_id]
-                if zombie.task_id in dag.task_ids:
-                    task = dag.get_task(zombie.task_id)
-                    ti = TaskInstance(task, zombie.execution_date)
-                    # Get properties needed for failure handling from 
SimpleTaskInstance.
-                    ti.start_date = zombie.start_date
-                    ti.end_date = zombie.end_date
-                    ti.try_number = zombie.try_number
-                    ti.state = zombie.state
-                    ti.test_mode = configuration.getboolean('core', 
'unit_test_mode')
-                    ti.handle_failure("{} detected as zombie".format(ti),
-                                      ti.test_mode, ti.get_template_context())
-                    self.log.info(
-                        'Marked zombie job %s as %s', ti, ti.state)
-                    Stats.incr('zombies_killed')
+        # Avoid circular import
+        from airflow.models.taskinstance import TaskInstance as TI
+        from airflow.jobs import LocalTaskJob as LJ
+
+        # How many seconds do we wait for tasks to heartbeat before mark them 
as zombies.
+        zombie_threshold_secs = (
+            configuration.getint('scheduler', 
'scheduler_zombie_task_threshold'))
+        limit_dttm = timezone.utcnow() - timedelta(
+            seconds=zombie_threshold_secs)
+        self.log.debug("Failing jobs without heartbeat after %s", limit_dttm)
+
+        tis = (
+            session.query(TI)
+            .join(LJ, TI.job_id == LJ.id)
+            .filter(TI.state == State.RUNNING)
+            .filter(TI.dag_id.in_(self.dags))
 
 Review comment:
  Does this work even though `self.dags` is a dict? (I assume it must, since iterating a dict yields its keys, i.e. the DAG IDs.)

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to