This is an automated email from the ASF dual-hosted git repository.
ephraimanierobi pushed a commit to branch v3-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v3-1-test by this push:
new cd6710d3973 [v3-1-test] Fix scheduler crash when enqueuing TI with
null dag_version_id (#61813) (#61846)
cd6710d3973 is described below
commit cd6710d397375e919556974a9b9d89d6268f6cd4
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Fri Feb 13 18:46:58 2026 +0100
[v3-1-test] Fix scheduler crash when enqueuing TI with null dag_version_id
(#61813) (#61846)
* Fix scheduler crash when enqueuing TI with null dag_version_id
After an upgrade from Airflow 2, TIs may lack a dag_version_id because
it is not enforced at the DB level. The solution is to skip
enqueuing such TIs until verify_integrity runs, which updates
the TI's dag_version_id.
At first the TI remains stuck, but it is later cleared once the
handling for tasks stuck in queued deems it appropriate.
* update warning log
* update warning log
* Update the caplog level in test
(cherry picked from commit 71f84fa3d88adb73f139463fb59b3ca7154f0b11)
Co-authored-by: Ephraim Anierobi <[email protected]>
---
.../src/airflow/jobs/scheduler_job_runner.py | 7 +++++
airflow-core/tests/unit/jobs/test_scheduler_job.py | 30 ++++++++++++++++++++++
2 files changed, 37 insertions(+)
diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
index 13729013418..81cce651435 100644
--- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
+++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
@@ -695,6 +695,13 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
if ti.dag_run.state in State.finished_dr_states:
ti.set_state(None, session=session)
continue
+ if not ti.dag_version_id:
+ self.log.warning(
+ "TaskInstance %s does not have a dag_version_id set,
cannot be enqueued. "
+ "This would get unstuck and dag_version_id updated.",
+ ti,
+ )
+ continue
workload = workloads.ExecuteTask.make(ti,
generator=executor.jwt_generator)
executor.queue_workload(workload, session=session)
diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py
b/airflow-core/tests/unit/jobs/test_scheduler_job.py
index 430833fb1ca..8fac72ea3ee 100644
--- a/airflow-core/tests/unit/jobs/test_scheduler_job.py
+++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py
@@ -1899,6 +1899,36 @@ class TestSchedulerJob:
assert ti.state == State.NONE
mock_queue_workload.assert_not_called()
+ def test_enqueue_task_instances_skips_ti_without_dag_version_id(self,
dag_maker, session, caplog):
+ """Task instances without dag_version_id are not enqueued and a warning
is logged."""
+ dag_id =
"SchedulerJobTest.test_enqueue_task_instances_skips_ti_without_dag_version_id"
+ task_id_1 = "dummy"
+ session = settings.Session()
+ with dag_maker(dag_id=dag_id, start_date=DEFAULT_DATE,
session=session):
+ task1 = EmptyOperator(task_id=task_id_1)
+
+ scheduler_job = Job(executor=self.null_exec)
+ self.job_runner = SchedulerJobRunner(job=scheduler_job)
+
+ dr1 = dag_maker.create_dagrun()
+ ti = dr1.get_task_instance(task1.task_id, session)
+ ti.state = State.SCHEDULED
+ ti.dag_version_id = None
+ session.merge(ti)
+ session.commit()
+
+ with patch.object(BaseExecutor, "queue_workload") as
mock_queue_workload:
+ with caplog.at_level("WARNING",
logger="airflow.jobs.scheduler_job_runner"):
+ self.job_runner._enqueue_task_instances_with_queued_state(
+ [ti], executor=scheduler_job.executor, session=session
+ )
+
+ mock_queue_workload.assert_not_called()
+ assert any(
+ "does not have a dag_version_id set, cannot be enqueued" in
rec.message for rec in caplog.records
+ )
+ session.rollback()
+
@pytest.mark.parametrize(
"task1_exec, task2_exec",
[