This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v3-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v3-1-test by this push:
     new cd6710d3973 [v3-1-test] Fix scheduler crash when enqueuing TI with 
null dag_version_id (#61813) (#61846)
cd6710d3973 is described below

commit cd6710d397375e919556974a9b9d89d6268f6cd4
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Fri Feb 13 18:46:58 2026 +0100

    [v3-1-test] Fix scheduler crash when enqueuing TI with null dag_version_id 
(#61813) (#61846)
    
    * Fix scheduler crash when enqueuing TI with null dag_version_id
    
    After upgrade from AF2, TIs might be without dag_version_id since
    we don't enforce this at the DB level. The solution here is to skip
    enqueuing such TIs until verify_integrity runs, which would update
    the dag_version_id of the TI.
    
    Initially, the TI would be stuck, but it would later be cleared when
    the handling of tasks stuck in queued deems it fit.
    
    * update warning log
    
    * update warning log
    
    * Update the caplog level in test
    (cherry picked from commit 71f84fa3d88adb73f139463fb59b3ca7154f0b11)
    
    Co-authored-by: Ephraim Anierobi <[email protected]>
---
 .../src/airflow/jobs/scheduler_job_runner.py       |  7 +++++
 airflow-core/tests/unit/jobs/test_scheduler_job.py | 30 ++++++++++++++++++++++
 2 files changed, 37 insertions(+)

diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py 
b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
index 13729013418..81cce651435 100644
--- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
+++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
@@ -695,6 +695,13 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
             if ti.dag_run.state in State.finished_dr_states:
                 ti.set_state(None, session=session)
                 continue
+            if not ti.dag_version_id:
+                self.log.warning(
+                    "TaskInstance %s does not have a dag_version_id set, 
cannot be enqueued. "
+                    "This would get unstuck and dag_version_id updated.",
+                    ti,
+                )
+                continue
 
             workload = workloads.ExecuteTask.make(ti, 
generator=executor.jwt_generator)
             executor.queue_workload(workload, session=session)
diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py 
b/airflow-core/tests/unit/jobs/test_scheduler_job.py
index 430833fb1ca..8fac72ea3ee 100644
--- a/airflow-core/tests/unit/jobs/test_scheduler_job.py
+++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py
@@ -1899,6 +1899,36 @@ class TestSchedulerJob:
         assert ti.state == State.NONE
         mock_queue_workload.assert_not_called()
 
+    def test_enqueue_task_instances_skips_ti_without_dag_version_id(self, 
dag_maker, session, caplog):
+        """Task instances without dag_version_id are not enqueued and an error 
is logged."""
+        dag_id = 
"SchedulerJobTest.test_enqueue_task_instances_skips_ti_without_dag_version_id"
+        task_id_1 = "dummy"
+        session = settings.Session()
+        with dag_maker(dag_id=dag_id, start_date=DEFAULT_DATE, 
session=session):
+            task1 = EmptyOperator(task_id=task_id_1)
+
+        scheduler_job = Job(executor=self.null_exec)
+        self.job_runner = SchedulerJobRunner(job=scheduler_job)
+
+        dr1 = dag_maker.create_dagrun()
+        ti = dr1.get_task_instance(task1.task_id, session)
+        ti.state = State.SCHEDULED
+        ti.dag_version_id = None
+        session.merge(ti)
+        session.commit()
+
+        with patch.object(BaseExecutor, "queue_workload") as 
mock_queue_workload:
+            with caplog.at_level("WARNING", 
logger="airflow.jobs.scheduler_job_runner"):
+                self.job_runner._enqueue_task_instances_with_queued_state(
+                    [ti], executor=scheduler_job.executor, session=session
+                )
+
+        mock_queue_workload.assert_not_called()
+        assert any(
+            "does not have a dag_version_id set, cannot be enqueued" in 
rec.message for rec in caplog.records
+        )
+        session.rollback()
+
     @pytest.mark.parametrize(
         "task1_exec, task2_exec",
         [

Reply via email to