This is an automated email from the ASF dual-hosted git repository.
ephraimbuddy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new cb7c347d81f Skip FK-referenced dag_version rows during db clean (#68339)
cb7c347d81f is described below
commit cb7c347d81f471e5e08ffc55fabcc2f62e9074c5
Author: Ephraim Anierobi <[email protected]>
AuthorDate: Sun Jun 14 11:18:41 2026 +0100
Skip FK-referenced dag_version rows during db clean (#68339)
airflow db clean on the dag_version table selected old, non-latest
versions for deletion regardless of whether they were still referenced.
Because task_instance.dag_version_id is ON DELETE RESTRICT, deleting a
version still referenced by a task instance fails the foreign key, so
the command could not prune dag_version at all for any DAG with history.
Add a generic skip_if_referenced option to the cleanup table config that
excludes rows still referenced by a given (table, fk_column) via a
correlated NOT EXISTS, and apply it to dag_version for
task_instance.dag_version_id. Cleanup now prunes only orphaned older
versions and makes progress as task instances age out and are cleaned.
related: #66177
---
airflow-core/src/airflow/utils/db_cleanup.py | 48 +++++++++++++-
airflow-core/tests/unit/utils/test_db_cleanup.py | 83 ++++++++++++++++++++++++
2 files changed, 130 insertions(+), 1 deletion(-)
diff --git a/airflow-core/src/airflow/utils/db_cleanup.py
b/airflow-core/src/airflow/utils/db_cleanup.py
index 976a4a5e17c..8197d56c42d 100644
--- a/airflow-core/src/airflow/utils/db_cleanup.py
+++ b/airflow-core/src/airflow/utils/db_cleanup.py
@@ -32,7 +32,7 @@ from dataclasses import dataclass
from types import SimpleNamespace
from typing import TYPE_CHECKING, Any
-from sqlalchemy import and_, column, func, inspect, select, table, text
+from sqlalchemy import and_, column, func, inspect, literal, select, table,
text
from sqlalchemy.exc import OperationalError, ProgrammingError
from sqlalchemy.ext.compiler import compiles
from sqlalchemy.orm import aliased
@@ -78,6 +78,12 @@ class _TableConfig:
:param keep_last_group_by: if keeping the last record, can keep the last
record for each group
:param dependent_tables: list of tables which have FK relationship with
this table
:param extra_filters: SQLAlchemy expressions ANDed with the recency
filter; referenced columns must be in ``extra_columns``.
+ :param skip_if_referenced: list of ``(referencing_table, fk_column)``
pairs whose FK points at this
+ table's ``referenced_pk_column``. A row that is still referenced by
any of these is excluded from
+ deletion. This avoids issuing deletes that would violate an ``ON
DELETE RESTRICT`` foreign key
+ (e.g. ``task_instance.dag_version_id``) — such deletes fail and, on
MySQL, can leave the cleanup
+ command blocked on metadata locks. ``referenced_pk_column`` must be
listed in ``extra_columns``.
+ :param referenced_pk_column: the primary-key column of this table that
``skip_if_referenced`` FKs point at.
"""
table_name: str
@@ -92,6 +98,8 @@ class _TableConfig:
# Relying on automation here would increase complexity and reduce
maintainability.
dependent_tables: list[str] | None = None
extra_filters: list[Any] | None = None
+ skip_if_referenced: list[tuple[str, str]] | None = None
+ referenced_pk_column: str = "id"
def __post_init__(self):
self.recency_column = column(self.recency_column_name)
@@ -109,6 +117,16 @@ class _TableConfig:
self.recency_column,
)
+ # skip_if_referenced filters on referenced_pk_column, which must be a
column of orm_model
+ # (added via extra_columns). Fail fast with a clear message instead of
a cryptic KeyError
+ # raised later when _build_query evaluates
base_table.c[referenced_pk_column].
+ if self.skip_if_referenced and self.referenced_pk_column not in
self.orm_model.c.keys():
+ raise ValueError(
+ f"_TableConfig for table {self.table_name!r} sets
skip_if_referenced but its "
+ f"referenced_pk_column {self.referenced_pk_column!r} is not
one of its columns; "
+ f"add {self.referenced_pk_column!r} to extra_columns."
+ )
+
def __lt__(self, other):
return self.table_name < other.table_name
@@ -174,10 +192,16 @@ config_list: list[_TableConfig] = [
_TableConfig(
table_name="dag_version",
recency_column_name="created_at",
+ extra_columns=["id"],
dependent_tables=["task_instance", "dag_run"],
dag_id_column_name="dag_id",
keep_last=True,
keep_last_group_by=["dag_id"],
+ # task_instance.dag_version_id is ON DELETE RESTRICT, so a version
still referenced by any
+ # task instance cannot be deleted. Skip those rows instead of issuing
a delete that would
+ # fail the FK (and hang on MySQL). They become eligible once their
task instances age out
+ # and are cleaned. dag_run.created_dag_version_id is ON DELETE SET
NULL, so it does not block.
+ skip_if_referenced=[("task_instance", "dag_version_id")],
),
_TableConfig(table_name="deadline", recency_column_name="deadline_time",
dag_id_column_name="dag_id"),
_TableConfig(table_name="revoked_token", recency_column_name="exp"),
@@ -357,6 +381,8 @@ def _build_query(
dag_ids: list[str] | None = None,
exclude_dag_ids: list[str] | None = None,
extra_filters: list[Any] | None = None,
+ skip_if_referenced: list[tuple[str, str]] | None = None,
+ referenced_pk_column: str = "id",
**kwargs,
) -> Select:
base_table_alias = "base"
@@ -368,6 +394,22 @@ def _build_query(
if extra_filters:
conditions.extend(extra_filters)
+ if skip_if_referenced:
+ # Exclude rows still referenced by a RESTRICT foreign key; deleting
them would fail the
+ # constraint (and on MySQL leave the command blocked on metadata
locks). correlate() is
+ # explicit on purpose: this is a NOT EXISTS guard whose silent failure
would delete
+ # still-referenced rows, so we don't rely on implicit correlation of
the base table.
+ base_table_pk_col = base_table.c[referenced_pk_column]
+ for referencing_table, fk_column in skip_if_referenced:
+ referencing = table(referencing_table, column(fk_column))
+ conditions.append(
+ ~select(literal(1))
+ .select_from(referencing)
+ .where(referencing.c[fk_column] == base_table_pk_col)
+ .correlate(base_table)
+ .exists()
+ )
+
if (dag_ids or exclude_dag_ids) and dag_id_column is not None:
base_table_dag_id_col = base_table.c[dag_id_column.name]
@@ -414,6 +456,8 @@ def _cleanup_table(
session: Session,
batch_size: int | None = None,
extra_filters: list[Any] | None = None,
+ skip_if_referenced: list[tuple[str, str]] | None = None,
+ referenced_pk_column: str = "id",
**kwargs,
) -> None:
print()
@@ -430,6 +474,8 @@ def _cleanup_table(
keep_last_group_by=keep_last_group_by,
clean_before_timestamp=clean_before_timestamp,
extra_filters=extra_filters,
+ skip_if_referenced=skip_if_referenced,
+ referenced_pk_column=referenced_pk_column,
session=session,
)
logger.debug("old rows query:\n%s", query.selectable.compile())
diff --git a/airflow-core/tests/unit/utils/test_db_cleanup.py
b/airflow-core/tests/unit/utils/test_db_cleanup.py
index ed4978df45d..4d66a96b1a3 100644
--- a/airflow-core/tests/unit/utils/test_db_cleanup.py
+++ b/airflow-core/tests/unit/utils/test_db_cleanup.py
@@ -48,6 +48,7 @@ from airflow.utils.db_cleanup import (
_confirm_drop_archives,
_dump_table_to_file,
_get_archived_table_names,
+ _TableConfig,
config_dict,
drop_archived_tables,
export_archived_records,
@@ -475,6 +476,88 @@ class TestDBCleanup:
assert session.scalar(select(func.count()).select_from(model)) == 5
assert len(_get_archived_table_names(["dag_run"], session)) ==
expected_archives
+ def test_dag_version_cleanup_skips_versions_pinned_by_task_instance(self):
+ """db clean must skip dag_version rows still referenced by a task
instance.
+
+ ``task_instance.dag_version_id`` is ``ON DELETE RESTRICT``, so
deleting a referenced
+ ``dag_version`` fails the foreign key. Cleanup must skip those rows
while still pruning
+ genuinely orphaned older versions.
+ """
+ base_date = pendulum.DateTime(2022, 1, 1,
tzinfo=pendulum.timezone("UTC"))
+ bundle_name = f"testing-{uuid4()}"
+ dag_id = f"test_dag_{uuid4()}"
+
+ with create_session() as session:
+ session.add(DagBundleModel(name=bundle_name))
+ session.flush()
+ session.add(DagModel(dag_id=dag_id, bundle_name=bundle_name))
+ session.flush()
+
+ pinned_old = DagVersion(
+ dag_id=dag_id,
+ version_number=1,
+ bundle_name=bundle_name,
+ created_at=base_date,
+ last_updated=base_date,
+ )
+ orphan_old = DagVersion(
+ dag_id=dag_id,
+ version_number=2,
+ bundle_name=bundle_name,
+ created_at=base_date.add(minutes=1),
+ last_updated=base_date.add(minutes=1),
+ )
+ latest = DagVersion(
+ dag_id=dag_id,
+ version_number=3,
+ bundle_name=bundle_name,
+ created_at=base_date.add(minutes=2),
+ last_updated=base_date.add(minutes=2),
+ )
+ session.add_all([pinned_old, orphan_old, latest])
+ session.flush()
+
+ dag = DAG(dag_id=dag_id)
+ dag_run = DagRun(dag_id, run_id="run-1",
run_type=DagRunType.MANUAL, start_date=base_date)
+ ti = create_task_instance(
+ PythonOperator(task_id="dummy-task", python_callable=print),
+ run_id=dag_run.run_id,
+ dag_version_id=pinned_old.id,
+ )
+ ti.dag_id = dag.dag_id
+ ti.start_date = base_date
+ session.add_all([dag_run, ti])
+ session.commit()
+
+ pinned_id, orphan_id, latest_id = pinned_old.id, orphan_old.id,
latest.id
+
+ # Previously this raised an IntegrityError on the FK; it must now
succeed.
+ _cleanup_table(
+ **config_dict["dag_version"].__dict__,
+ clean_before_timestamp=base_date.add(days=10),
+ dry_run=False,
+ session=session,
+ table_names=["dag_version"],
+ skip_archive=True,
+ )
+
+ remaining =
set(session.scalars(select(DagVersion.id).where(DagVersion.dag_id ==
dag_id)).all())
+
+ assert pinned_id in remaining # still referenced by a task instance
-> skipped
+ assert latest_id in remaining # kept by keep_last
+ assert orphan_id not in remaining # old and unreferenced -> pruned
+
+ def test_table_config_skip_if_referenced_requires_pk_column(self):
+ """A misconfigured skip_if_referenced (pk not in columns) must fail
fast at construction."""
+ with pytest.raises(ValueError, match="referenced_pk_column"):
+ _TableConfig(
+ table_name="dag_version",
+ recency_column_name="created_at",
+ dag_id_column_name="dag_id",
+ skip_if_referenced=[("task_instance", "dag_version_id")],
+ # "id" intentionally omitted from extra_columns
+ )
+
@patch("airflow.utils.db.reflect_tables")
def test_skip_archive_failure_will_remove_table(self, reflect_tables_mock):
"""