This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 1c0d0a5d90 Fix zombie task handling with multiple schedulers (#24906)
1c0d0a5d90 is described below

commit 1c0d0a5d907ae447b7221200952b47b69f8f8e87
Author: Jed Cunningham <[email protected]>
AuthorDate: Fri Jul 8 10:49:12 2022 -0600

    Fix zombie task handling with multiple schedulers (#24906)
    
    Each scheduler was looking at all running tasks for zombies, leading to
    multiple schedulers handling the zombies. This causes problems with
    retries (e.g. being marked as FAILED instead of UP_FOR_RETRY) and
    callbacks (e.g. `on_failure_callback` being called multiple times).
    
    When the second scheduler tries to determine if the task is able to be retried,
    and it's already in UP_FOR_RETRY (the first scheduler already finished),
    it sees the "next" try_number (as it's no longer running),
    which then leads it to be FAILED instead.
    
    The easy fix is to simply restrict each scheduler to its own TIs, as
    orphaned running TIs will be adopted anyways.
---
 airflow/jobs/scheduler_job.py    |  4 +++-
 tests/jobs/test_scheduler_job.py | 35 +++++++++++++++++++++++------------
 2 files changed, 26 insertions(+), 13 deletions(-)

diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index 8452950e04..39aac294a8 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -1358,7 +1358,8 @@ class SchedulerJob(BaseJob):
     def _find_zombies(self, session):
         """
         Find zombie task instances, which are tasks haven't heartbeated for too long
-        and update the current zombie list.
+        or have a no-longer-running LocalTaskJob, and send them off to the DAG processor
+        to be handled.
         """
         self.log.debug("Finding 'running' jobs without a recent heartbeat")
         limit_dttm = timezone.utcnow() - timedelta(seconds=self._zombie_threshold_secs)
@@ -1374,6 +1375,7 @@ class SchedulerJob(BaseJob):
                     LocalTaskJob.latest_heartbeat < limit_dttm,
                 )
             )
+            .filter(TaskInstance.queued_by_job_id == self.id)
             .all()
         )
 
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index 27931d460a..7580d59c1c 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -3890,7 +3890,6 @@ class TestSchedulerJob:
             session.query(LocalTaskJob).delete()
             dag = dagbag.get_dag('example_branch_operator')
             dag.sync_to_db()
-            task = dag.get_task(task_id='run_this_first')
 
             dag_run = dag.create_dagrun(
                 state=DagRunState.RUNNING,
@@ -3899,21 +3898,33 @@ class TestSchedulerJob:
                 session=session,
             )
 
-            ti = TaskInstance(task, run_id=dag_run.run_id, state=State.RUNNING)
-            local_job = LocalTaskJob(ti)
-            local_job.state = State.SHUTDOWN
-
-            session.add(local_job)
-            session.flush()
-
-            ti.job_id = local_job.id
-            session.add(ti)
-            session.flush()
-
             self.scheduler_job = SchedulerJob(subdir=os.devnull)
             self.scheduler_job.executor = MockExecutor()
             self.scheduler_job.processor_agent = mock.MagicMock()
 
+            # We will provision 2 tasks so we can check we only find zombies from this scheduler
+            tasks_to_setup = ['branching', 'run_this_first']
+
+            for task_id in tasks_to_setup:
+                task = dag.get_task(task_id=task_id)
+                ti = TaskInstance(task, run_id=dag_run.run_id, state=State.RUNNING)
+                ti.queued_by_job_id = 999
+
+                local_job = LocalTaskJob(ti)
+                local_job.state = State.SHUTDOWN
+
+                session.add(local_job)
+                session.flush()
+
+                ti.job_id = local_job.id
+                session.add(ti)
+                session.flush()
+
+            assert task.task_id == 'run_this_first'  # Make sure we have the task/ti we expect
+
+            ti.queued_by_job_id = self.scheduler_job.id
+            session.flush()
+
             self.scheduler_job._find_zombies(session=session)
 
             self.scheduler_job.executor.callback_sink.send.assert_called_once()

Reply via email to