This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to tag v2.3.3+astro.2
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 8d570b14d34a3253ee93186c65755ffad5be23bd
Author: Jed Cunningham <[email protected]>
AuthorDate: Fri Jul 8 10:49:12 2022 -0600

    Fix zombie task handling with multiple schedulers (#24906)
    
    Each scheduler was looking at all running tasks for zombies, leading to
    multiple schedulers handling the zombies. This causes problems with
    retries (e.g. being marked as FAILED instead of UP_FOR_RETRY) and
    callbacks (e.g. `on_failure_callback` being called multiple times).
    
    When the second scheduler tries to determine whether the task can be
    retried, and the task is already in UP_FOR_RETRY (because the first
    scheduler has already finished handling it), it sees the "next"
    try_number (as the task is no longer running), which leads to the task
    being marked FAILED instead.
    
    The easy fix is to simply restrict each scheduler to its own task
    instances (TIs), as orphaned running TIs will be adopted anyway.
    
    (cherry picked from commit 1c0d0a5d907ae447b7221200952b47b69f8f8e87)
---
 airflow/jobs/scheduler_job.py    |  4 +++-
 tests/jobs/test_scheduler_job.py | 35 +++++++++++++++++++++++------------
 2 files changed, 26 insertions(+), 13 deletions(-)

diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index 3440832275..3332434838 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -1339,7 +1339,8 @@ class SchedulerJob(BaseJob):
     def _find_zombies(self, session):
         """
         Find zombie task instances, which are tasks haven't heartbeated for 
too long
-        and update the current zombie list.
+        or have a no-longer-running LocalTaskJob, and send them off to the DAG 
processor
+        to be handled.
         """
         self.log.debug("Finding 'running' jobs without a recent heartbeat")
         limit_dttm = timezone.utcnow() - 
timedelta(seconds=self._zombie_threshold_secs)
@@ -1355,6 +1356,7 @@ class SchedulerJob(BaseJob):
                     LocalTaskJob.latest_heartbeat < limit_dttm,
                 )
             )
+            .filter(TaskInstance.queued_by_job_id == self.id)
             .all()
         )
 
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index de909ef6e6..3895ab6af8 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -3856,7 +3856,6 @@ class TestSchedulerJob:
             session.query(LocalTaskJob).delete()
             dag = dagbag.get_dag('example_branch_operator')
             dag.sync_to_db()
-            task = dag.get_task(task_id='run_this_first')
 
             dag_run = dag.create_dagrun(
                 state=DagRunState.RUNNING,
@@ -3865,21 +3864,33 @@ class TestSchedulerJob:
                 session=session,
             )
 
-            ti = TaskInstance(task, run_id=dag_run.run_id, state=State.RUNNING)
-            local_job = LocalTaskJob(ti)
-            local_job.state = State.SHUTDOWN
-
-            session.add(local_job)
-            session.flush()
-
-            ti.job_id = local_job.id
-            session.add(ti)
-            session.flush()
-
             self.scheduler_job = SchedulerJob(subdir=os.devnull)
             self.scheduler_job.executor = MockExecutor()
             self.scheduler_job.processor_agent = mock.MagicMock()
 
+            # We will provision 2 tasks so we can check we only find zombies 
from this scheduler
+            tasks_to_setup = ['branching', 'run_this_first']
+
+            for task_id in tasks_to_setup:
+                task = dag.get_task(task_id=task_id)
+                ti = TaskInstance(task, run_id=dag_run.run_id, 
state=State.RUNNING)
+                ti.queued_by_job_id = 999
+
+                local_job = LocalTaskJob(ti)
+                local_job.state = State.SHUTDOWN
+
+                session.add(local_job)
+                session.flush()
+
+                ti.job_id = local_job.id
+                session.add(ti)
+                session.flush()
+
+            assert task.task_id == 'run_this_first'  # Make sure we have the 
task/ti we expect
+
+            ti.queued_by_job_id = self.scheduler_job.id
+            session.flush()
+
             self.scheduler_job._find_zombies(session=session)
 
             self.scheduler_job.executor.callback_sink.send.assert_called_once()

Reply via email to