This is an automated email from the ASF dual-hosted git repository. kaxilnaik pushed a commit to branch v3-1-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 6aef8030135591a4cbf61c582fee0c2967164660 Author: Ephraim Anierobi <[email protected]> AuthorDate: Thu Sep 18 18:37:45 2025 +0100 Fix scheduler handling of orphaned tasks from airflow 2 (#55848) Set last_heartbeat_at on adoption when missing since we no longer have LocalTaskJob but now heartbeat with the TI.last_heartbeat_at. Initialize dag_run.conf to {} on adoption when it is None to avoid errors in callback model validation. Skip heartbeat-timeout handling when a TaskInstance has no dag_version and log a clear warning. It could be that the DAG has an import error. This allows time to fix the dag for a new run. Tests added: Ensure last_heartbeat_at is set when adopting orphaned tasks. Ensure dag_run.conf is initialized to {} on adoption when None. Ensure purge skips processing (no callback/state change) when dag_version is missing. (cherry picked from commit c9e1089fcb039a570ed1fed61d72915611ba0b8d) --- .../src/airflow/jobs/scheduler_job_runner.py | 13 ++++ airflow-core/tests/unit/jobs/test_scheduler_job.py | 88 ++++++++++++++++++++++ 2 files changed, 101 insertions(+) diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py b/airflow-core/src/airflow/jobs/scheduler_job_runner.py index e718411c1bd..3ab84d6d5ed 100644 --- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py +++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py @@ -2291,6 +2291,12 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin): for ti in set(tis_to_adopt_or_reset) - set(to_reset): ti.queued_by_job_id = self.job.id + # If old ti from Airflow 2 and last_heartbeat_at is None, set last_heartbeat_at to now + if ti.last_heartbeat_at is None: + ti.last_heartbeat_at = timezone.utcnow() + # If old ti from Airflow 2 and dag_run.conf is None, set dag_run.conf to {} + if ti.dag_run.conf is None: + ti.dag_run.conf = {} Stats.incr("scheduler.orphaned_tasks.cleared", len(to_reset)) Stats.incr("scheduler.orphaned_tasks.adopted", len(tis_to_adopt_or_reset) - len(to_reset)) @@ -2387,6 
+2393,13 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin): task_instance_heartbeat_timeout_message_details = ( self._generate_task_instance_heartbeat_timeout_message_details(ti) ) + if not ti.dag_version: + # If old ti from Airflow 2 and dag_version is None, skip heartbeat timeout handling. + self.log.warning( + "DAG Version not found for TaskInstance %s. Skipping heartbeat timeout handling.", + ti, + ) + continue request = TaskCallbackRequest( filepath=ti.dag_model.relative_fileloc, bundle_name=ti.dag_version.bundle_name, diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py b/airflow-core/tests/unit/jobs/test_scheduler_job.py index d52d3eaed15..c6b1edfed8e 100644 --- a/airflow-core/tests/unit/jobs/test_scheduler_job.py +++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py @@ -2082,6 +2082,94 @@ class TestSchedulerJob: # Second executor called for ti3 mock_executors[1].try_adopt_task_instances.assert_called_once_with([ti3]) + def test_adopt_sets_last_heartbeat_on_adopt(self, dag_maker, session, mock_executor): + with dag_maker("test_adopt_sets_last_heartbeat_on_adopt", session=session): + op1 = EmptyOperator(task_id="op1") + + old_scheduler_job = Job() + session.add(old_scheduler_job) + session.flush() + + dr = dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED) + ti = dr.get_task_instance(task_id=op1.task_id, session=session) + ti.state = State.QUEUED + ti.queued_by_job_id = old_scheduler_job.id + ti.last_heartbeat_at = None + session.commit() + + # Executor adopts all TIs (returns empty list to reset), so TI is adopted + mock_executor.try_adopt_task_instances.return_value = [] + + new_scheduler_job = Job() + self.job_runner = SchedulerJobRunner(job=new_scheduler_job, num_runs=0) + self.job_runner.adopt_or_reset_orphaned_tasks(session=session) + + ti.refresh_from_db(session=session) + assert ti.state == State.QUEUED + assert ti.queued_by_job_id == new_scheduler_job.id + assert ti.last_heartbeat_at is not None + + def 
test_adopt_sets_dagrun_conf_when_none(self, dag_maker, session, mock_executor): + with dag_maker("test_adopt_sets_dagrun_conf_when_none", session=session): + op1 = EmptyOperator(task_id="op1") + + old_scheduler_job = Job() + session.add(old_scheduler_job) + session.flush() + + dr = dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED) + # Ensure conf starts as None + dr.conf = None + session.merge(dr) + session.flush() + dr = session.scalar(select(DagRun).where(DagRun.id == dr.id)) + assert dr.conf is None + + ti = dr.get_task_instance(task_id=op1.task_id, session=session) + ti.state = State.QUEUED + ti.queued_by_job_id = old_scheduler_job.id + session.commit() + + # Executor adopts all TIs (returns empty list to reset), so TI is adopted + mock_executor.try_adopt_task_instances.return_value = [] + + new_scheduler_job = Job() + self.job_runner = SchedulerJobRunner(job=new_scheduler_job, num_runs=0) + self.job_runner.adopt_or_reset_orphaned_tasks(session=session) + + # DagRun.conf should be set to {} on adoption when it was None + session.refresh(dr) + assert dr.conf == {} + + def test_purge_without_heartbeat_skips_when_missing_dag_version(self, dag_maker, session, caplog): + with dag_maker("test_purge_without_heartbeat_skips_when_missing_dag_version", session=session): + EmptyOperator(task_id="task") + + dag_run = dag_maker.create_dagrun(run_id="test_run", state=DagRunState.RUNNING) + + mock_executor = MagicMock() + scheduler_job = Job(executor=mock_executor) + self.job_runner = SchedulerJobRunner(scheduler_job) + + ti = dag_run.get_task_instance(task_id="task", session=session) + ti.state = TaskInstanceState.RUNNING + ti.queued_by_job_id = scheduler_job.id + ti.last_heartbeat_at = timezone.utcnow() - timedelta(hours=1) + # Simulate missing dag_version + ti.dag_version_id = None + session.merge(ti) + session.commit() + + with caplog.at_level("WARNING", logger="airflow.jobs.scheduler_job_runner"): + self.job_runner._purge_task_instances_without_heartbeats([ti], 
session=session) + + # Should log a warning and skip processing + assert any("DAG Version not found for TaskInstance" in rec.message for rec in caplog.records) + mock_executor.send_callback.assert_not_called() + # State should be unchanged (not failed) + ti.refresh_from_db(session=session) + assert ti.state == TaskInstanceState.RUNNING + @staticmethod def mock_failure_callback(context): pass
