This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v3-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 6aef8030135591a4cbf61c582fee0c2967164660
Author: Ephraim Anierobi <[email protected]>
AuthorDate: Thu Sep 18 18:37:45 2025 +0100

    Fix scheduler handling of orphaned tasks from airflow 2 (#55848)
    
    Set last_heartbeat_at on adoption when it is missing, since we no longer
    have LocalTaskJob but now heartbeat via TI.last_heartbeat_at.
    Initialize dag_run.conf to {} on adoption when it is None to avoid errors
    in callback model validation.
    Skip heartbeat-timeout handling when a TaskInstance has no dag_version
    and log a clear warning. It could be that the DAG has an import error.
    This allows time to fix the DAG for a new run.
    
    Tests added:
    Ensure last_heartbeat_at is set when adopting orphaned tasks.
    Ensure dag_run.conf is initialized to {} on adoption when None.
    Ensure purge skips processing (no callback/state change) when
    dag_version is missing.
    
    (cherry picked from commit c9e1089fcb039a570ed1fed61d72915611ba0b8d)
---
 .../src/airflow/jobs/scheduler_job_runner.py       | 13 ++++
 airflow-core/tests/unit/jobs/test_scheduler_job.py | 88 ++++++++++++++++++++++
 2 files changed, 101 insertions(+)

diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py 
b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
index e718411c1bd..3ab84d6d5ed 100644
--- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
+++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
@@ -2291,6 +2291,12 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
 
                     for ti in set(tis_to_adopt_or_reset) - set(to_reset):
                         ti.queued_by_job_id = self.job.id
+                        # If old ti from Airflow 2 and last_heartbeat_at is 
None, set last_heartbeat_at to now
+                        if ti.last_heartbeat_at is None:
+                            ti.last_heartbeat_at = timezone.utcnow()
+                        # If old ti from Airflow 2 and dag_run.conf is None, 
set dag_run.conf to {}
+                        if ti.dag_run.conf is None:
+                            ti.dag_run.conf = {}
 
                     Stats.incr("scheduler.orphaned_tasks.cleared", 
len(to_reset))
                     Stats.incr("scheduler.orphaned_tasks.adopted", 
len(tis_to_adopt_or_reset) - len(to_reset))
@@ -2387,6 +2393,13 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
             task_instance_heartbeat_timeout_message_details = (
                 
self._generate_task_instance_heartbeat_timeout_message_details(ti)
             )
+            if not ti.dag_version:
+                # If old ti from Airflow 2 and dag_version is None, skip 
heartbeat timeout handling.
+                self.log.warning(
+                    "DAG Version not found for TaskInstance %s. Skipping 
heartbeat timeout handling.",
+                    ti,
+                )
+                continue
             request = TaskCallbackRequest(
                 filepath=ti.dag_model.relative_fileloc,
                 bundle_name=ti.dag_version.bundle_name,
diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py 
b/airflow-core/tests/unit/jobs/test_scheduler_job.py
index d52d3eaed15..c6b1edfed8e 100644
--- a/airflow-core/tests/unit/jobs/test_scheduler_job.py
+++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py
@@ -2082,6 +2082,94 @@ class TestSchedulerJob:
         # Second executor called for ti3
         
mock_executors[1].try_adopt_task_instances.assert_called_once_with([ti3])
 
+    def test_adopt_sets_last_heartbeat_on_adopt(self, dag_maker, session, 
mock_executor):
+        with dag_maker("test_adopt_sets_last_heartbeat_on_adopt", 
session=session):
+            op1 = EmptyOperator(task_id="op1")
+
+        old_scheduler_job = Job()
+        session.add(old_scheduler_job)
+        session.flush()
+
+        dr = dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED)
+        ti = dr.get_task_instance(task_id=op1.task_id, session=session)
+        ti.state = State.QUEUED
+        ti.queued_by_job_id = old_scheduler_job.id
+        ti.last_heartbeat_at = None
+        session.commit()
+
+        # Executor adopts all TIs (returns empty list to reset), so TI is 
adopted
+        mock_executor.try_adopt_task_instances.return_value = []
+
+        new_scheduler_job = Job()
+        self.job_runner = SchedulerJobRunner(job=new_scheduler_job, num_runs=0)
+        self.job_runner.adopt_or_reset_orphaned_tasks(session=session)
+
+        ti.refresh_from_db(session=session)
+        assert ti.state == State.QUEUED
+        assert ti.queued_by_job_id == new_scheduler_job.id
+        assert ti.last_heartbeat_at is not None
+
+    def test_adopt_sets_dagrun_conf_when_none(self, dag_maker, session, 
mock_executor):
+        with dag_maker("test_adopt_sets_dagrun_conf_when_none", 
session=session):
+            op1 = EmptyOperator(task_id="op1")
+
+        old_scheduler_job = Job()
+        session.add(old_scheduler_job)
+        session.flush()
+
+        dr = dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED)
+        # Ensure conf starts as None
+        dr.conf = None
+        session.merge(dr)
+        session.flush()
+        dr = session.scalar(select(DagRun).where(DagRun.id == dr.id))
+        assert dr.conf is None
+
+        ti = dr.get_task_instance(task_id=op1.task_id, session=session)
+        ti.state = State.QUEUED
+        ti.queued_by_job_id = old_scheduler_job.id
+        session.commit()
+
+        # Executor adopts all TIs (returns empty list to reset), so TI is 
adopted
+        mock_executor.try_adopt_task_instances.return_value = []
+
+        new_scheduler_job = Job()
+        self.job_runner = SchedulerJobRunner(job=new_scheduler_job, num_runs=0)
+        self.job_runner.adopt_or_reset_orphaned_tasks(session=session)
+
+        # DagRun.conf should be set to {} on adoption when it was None
+        session.refresh(dr)
+        assert dr.conf == {}
+
+    def test_purge_without_heartbeat_skips_when_missing_dag_version(self, 
dag_maker, session, caplog):
+        with 
dag_maker("test_purge_without_heartbeat_skips_when_missing_dag_version", 
session=session):
+            EmptyOperator(task_id="task")
+
+        dag_run = dag_maker.create_dagrun(run_id="test_run", 
state=DagRunState.RUNNING)
+
+        mock_executor = MagicMock()
+        scheduler_job = Job(executor=mock_executor)
+        self.job_runner = SchedulerJobRunner(scheduler_job)
+
+        ti = dag_run.get_task_instance(task_id="task", session=session)
+        ti.state = TaskInstanceState.RUNNING
+        ti.queued_by_job_id = scheduler_job.id
+        ti.last_heartbeat_at = timezone.utcnow() - timedelta(hours=1)
+        # Simulate missing dag_version
+        ti.dag_version_id = None
+        session.merge(ti)
+        session.commit()
+
+        with caplog.at_level("WARNING", 
logger="airflow.jobs.scheduler_job_runner"):
+            self.job_runner._purge_task_instances_without_heartbeats([ti], 
session=session)
+
+        # Should log a warning and skip processing
+        assert any("DAG Version not found for TaskInstance" in rec.message for 
rec in caplog.records)
+        mock_executor.send_callback.assert_not_called()
+        # State should be unchanged (not failed)
+        ti.refresh_from_db(session=session)
+        assert ti.state == TaskInstanceState.RUNNING
+
     @staticmethod
     def mock_failure_callback(context):
         pass

Reply via email to