This is an automated email from the ASF dual-hosted git repository.
ephraimbuddy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 38a51dc1faa Fix scheduler callback bundle_version when versioning
disabled (#66485)
38a51dc1faa is described below
commit 38a51dc1faa81c3493d0c444c901afff67bfc393
Author: Ephraim Anierobi <[email protected]>
AuthorDate: Thu May 7 08:28:59 2026 +0100
Fix scheduler callback bundle_version when versioning disabled (#66485)
When a DAG has disable_bundle_versioning=True, dag_run.bundle_version is
left None at trigger time, but DagVersion.bundle_version still records
the bundle SHA captured during DAG parse. Scheduler-emitted task
callbacks (zombie tasks, heartbeat timeouts, stuck-in-queued) sourced
bundle_version from DagVersion, so the DAG file processor (DFP) would
check out a pinned versions/<sha>/ working tree and write a _tracking
lockfile for a SHA the run was never pinned to. As a result, the callback
ran against different code than the task itself, per-version checkouts
were left on disk, and stale bundle cleanup was blocked.
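When dag_run.bundle_version is None, use it (None) as the callback's
bundle_version instead of DagVersion's SHA, so the callback inherits the
same unpinned state as the run and executes against the same on-disk code
the task did. Pinned runs still take bundle_version from DagVersion.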
---
.../src/airflow/jobs/scheduler_job_runner.py | 18 ++-
airflow-core/tests/unit/jobs/test_scheduler_job.py | 142 ++++++++++++++++++++-
2 files changed, 156 insertions(+), 4 deletions(-)
diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
index 80897213c18..3eed95a8bb0 100644
--- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
+++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
@@ -1408,8 +1408,12 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
# fall back to dag_model/dag_run for legacy tasks migrated
from
# Airflow 2 where dag_version may be None (AIP-66).
_bundle_name = ti.dag_version.bundle_name if
ti.dag_version else ti.dag_model.bundle_name
+ # Mirror dag_run pinning: if the run wasn't pinned (e.g.
dag.disable_bundle_versioning=True),
+ # leave the callback unpinned so it runs against the same
code as the task.
_bundle_version = (
- ti.dag_version.bundle_version if ti.dag_version else
ti.dag_run.bundle_version
+ ti.dag_version.bundle_version
+ if ti.dag_version and ti.dag_run.bundle_version is not
None
+ else ti.dag_run.bundle_version
)
# Backfill dag_version_id for legacy tasks (Pydantic
requires uuid.UUID).
if not _ensure_ti_has_dag_version_id(ti, session,
cls.logger()):
@@ -2601,8 +2605,12 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
_stuck_bundle_name = (
ti.dag_version.bundle_name if ti.dag_version else
ti.dag_model.bundle_name
)
+ # Mirror dag_run pinning: if the run wasn't pinned (e.g.
dag.disable_bundle_versioning=True),
+ # leave the callback unpinned so it runs against the same
code as the task.
_stuck_bundle_version = (
- ti.dag_version.bundle_version if ti.dag_version else
ti.dag_run.bundle_version
+ ti.dag_version.bundle_version
+ if ti.dag_version and ti.dag_run.bundle_version is not
None
+ else ti.dag_run.bundle_version
)
# Backfill dag_version_id for legacy tasks (Pydantic
requires uuid.UUID).
# Note: we cannot use `continue` here because this method
is not
@@ -2959,8 +2967,12 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
# Safely extract bundle info with fallback for legacy tasks
# (dag_version may be None after Airflow 2 → 3 migration).
_hb_bundle_name = ti.dag_version.bundle_name if ti.dag_version
else ti.dag_model.bundle_name
+ # Mirror dag_run pinning: if the run wasn't pinned (e.g.
dag.disable_bundle_versioning=True),
+ # leave the callback unpinned so it runs against the same code as
the task.
_hb_bundle_version = (
- ti.dag_version.bundle_version if ti.dag_version else
ti.dag_run.bundle_version
+ ti.dag_version.bundle_version
+ if ti.dag_version and ti.dag_run.bundle_version is not None
+ else ti.dag_run.bundle_version
)
# Backfill dag_version_id for legacy tasks (Pydantic requires
uuid.UUID).
if not _ensure_ti_has_dag_version_id(ti, session, self.log):
diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py
b/airflow-core/tests/unit/jobs/test_scheduler_job.py
index 16c42e8acd0..56e69459104 100644
--- a/airflow-core/tests/unit/jobs/test_scheduler_job.py
+++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py
@@ -8019,6 +8019,127 @@ class TestSchedulerJob:
assert isinstance(request, TaskCallbackRequest)
assert request.task_callback_type == expected
+ @pytest.mark.parametrize(
+ ("dag_run_bv", "dag_version_bv", "expected_bv"),
+ [
+ pytest.param(None, "abc123-sha", None,
id="disable_bundle_versioning"),
+ pytest.param("abc123-sha", "abc123-sha", "abc123-sha",
id="versioning_enabled"),
+ ],
+ )
+ def test_external_kill_callback_bundle_version_follows_dag_run(
+ self, dag_maker, session, dag_run_bv, dag_version_bv, expected_bv
+ ):
+ """
+ TaskCallbackRequest.bundle_version must mirror dag_run.bundle_version,
not
+ dag_version.bundle_version. With disable_bundle_versioning=True the
trigger
+ path leaves dag_run.bundle_version=None even though DagVersion was
written
+ with a SHA — the callback must inherit None so it runs against the same
+ on-disk code as the task did.
+ """
+ with dag_maker(dag_id=f"ext_kill_bv_{dag_run_bv or 'none'}",
fileloc="/test_path1/"):
+ EmptyOperator(task_id="t1", on_failure_callback=lambda ctx: None)
+ dr = dag_maker.create_dagrun(state=DagRunState.RUNNING)
+
+ ti = dr.get_task_instance(task_id="t1", session=session)
+ dag_version = ti.dag_version
+ dag_version.bundle_version = dag_version_bv
+ dr.bundle_version = dag_run_bv
+ ti.state = State.QUEUED
+ session.merge(dag_version)
+ session.merge(dr)
+ session.merge(ti)
+ session.commit()
+
+ executor = MockExecutor(do_update=False)
+ scheduler_job = Job()
+ self.job_runner = SchedulerJobRunner(scheduler_job,
executors=[executor])
+ executor.event_buffer[ti.key] = State.FAILED, None
+
+ self.job_runner._process_executor_events(executor=executor,
session=session)
+
+ self.job_runner.executor.callback_sink.send.assert_called_once()
+ request = self.job_runner.executor.callback_sink.send.call_args[0][0]
+ assert isinstance(request, TaskCallbackRequest)
+ assert request.bundle_version == expected_bv
+
+ def test_heartbeat_timeout_callback_bundle_version_follows_dag_run(self,
dag_maker, session):
+ """
+ Same invariant as the external-kill path, exercised through
+ _find_and_purge_task_instances_without_heartbeats.
+ """
+ with dag_maker(dag_id="hb_timeout_bv", fileloc="/test_path1/"):
+ EmptyOperator(task_id="t1")
+ dr = dag_maker.create_dagrun(state=DagRunState.RUNNING)
+
+ executor = MagicMock()
+ scheduler_job = Job()
+ self.job_runner = SchedulerJobRunner(scheduler_job,
executors=[executor])
+
+ ti = dr.get_task_instance(task_id="t1", session=session)
+ dag_version = ti.dag_version
+ # disable_bundle_versioning state: DagVersion has a SHA, dag_run is
unpinned.
+ dag_version.bundle_version = "abc123-sha"
+ dr.bundle_version = None
+ ti.state = TaskInstanceState.RUNNING
+ ti.queued_by_job_id = scheduler_job.id
+ ti.last_heartbeat_at = timezone.utcnow() - timedelta(seconds=600)
+ session.merge(dag_version)
+ session.merge(dr)
+ session.merge(ti)
+ session.commit()
+
+ self.job_runner._find_and_purge_task_instances_without_heartbeats()
+
+ executor.send_callback.assert_called_once()
+ request = executor.send_callback.call_args[0][0]
+ assert isinstance(request, TaskCallbackRequest)
+ assert request.bundle_version is None
+
+ @conf_vars({("scheduler", "num_stuck_in_queued_retries"): "1"})
+ def test_stuck_in_queued_callback_bundle_version_follows_dag_run(
+ self, dag_maker, session, mock_executors
+ ):
+ """
+ Same invariant as the external-kill path, exercised through
+ _handle_tasks_stuck_in_queued. With num_stuck_in_queued_retries=1, the
+ first stuck detection exhausts the budget and emits the failure
callback.
+ """
+ with dag_maker(dag_id="stuck_bv"):
+ EmptyOperator(task_id="op1",
on_failure_callback=TestSchedulerJob.mock_failure_callback)
+ run_id = str(uuid4())
+ dr = dag_maker.create_dagrun(run_id=run_id, state=DagRunState.RUNNING)
+
+ ti = dr.get_task_instance(task_id="op1", session=session)
+ dag_version = ti.dag_version
+ dag_version.bundle_version = "abc123-sha"
+ dr.bundle_version = None
+ ti.state = State.QUEUED
+ ti.queued_dttm = timezone.utcnow()
+ session.merge(dag_version)
+ session.merge(dr)
+ session.merge(ti)
+ session.commit()
+
+ scheduler_job = Job()
+ scheduler = SchedulerJobRunner(job=scheduler_job, num_runs=0)
+ scheduler._task_queued_timeout = -300 # always in violation
+
+ # First sweep: reschedule (budget consumed). Re-queue and sweep again
to exceed.
+ with _loader_mock(mock_executors):
+ scheduler._handle_tasks_stuck_in_queued()
+ ti = dr.get_task_instance(task_id="op1", session=session)
+ ti.state = State.QUEUED
+ ti.queued_dttm = timezone.utcnow()
+ session.merge(ti)
+ session.commit()
+ with _loader_mock(mock_executors):
+ scheduler._handle_tasks_stuck_in_queued()
+
+ mock_executors[0].send_callback.assert_called_once()
+ request = mock_executors[0].send_callback.call_args[0][0]
+ assert isinstance(request, TaskCallbackRequest)
+ assert request.bundle_version is None
+
def test_scheduler_passes_context_from_server_on_task_failure(self,
dag_maker, session):
"""Test that scheduler passes context_from_server when handling task
failures."""
with dag_maker(dag_id="test_dag", session=session):
@@ -9926,7 +10047,11 @@ def _extract_bundle_name(ti):
def _extract_bundle_version(ti):
"""Mirror the inline fallback logic from scheduler_job_runner.py."""
- return ti.dag_version.bundle_version if ti.dag_version else
ti.dag_run.bundle_version
+ return (
+ ti.dag_version.bundle_version
+ if ti.dag_version and ti.dag_run.bundle_version is not None
+ else ti.dag_run.bundle_version
+ )
class TestSchedulerCallbackBundleInfoDagVersionNullable:
@@ -10023,3 +10148,18 @@ class
TestSchedulerCallbackBundleInfoDagVersionNullable:
assert _extract_bundle_name(ti) == "fallback-bundle"
assert _extract_bundle_version(ti) == "fallback-v1"
+
+ def test_unpinned_dag_run_overrides_dag_version_bundle_version(self):
+ """
+ When dag_run.bundle_version is None (e.g.
dag.disable_bundle_versioning=True
+ leaves it unpinned even though DagVersion was written with a SHA), the
+ callback must inherit None so it runs against the same on-disk code as
+ the task did, instead of pinning to the DagVersion's recorded SHA.
+ """
+ dv = _make_dag_version(bundle_name="my-bundle",
bundle_version="abc123-sha")
+ ti = _make_ti_with_dag_version(dag_version=dv,
dag_run_bundle_version=None)
+
+ # bundle_name still comes from dag_version
+ assert _extract_bundle_name(ti) == "my-bundle"
+ # but bundle_version follows the dag_run's unpinned state
+ assert _extract_bundle_version(ti) is None