This is an automated email from the ASF dual-hosted git repository.

ephraimbuddy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new cb7c347d81f Skip FK-referenced dag_version rows during db clean (#68339)
cb7c347d81f is described below

commit cb7c347d81f471e5e08ffc55fabcc2f62e9074c5
Author: Ephraim Anierobi <[email protected]>
AuthorDate: Sun Jun 14 11:18:41 2026 +0100

    Skip FK-referenced dag_version rows during db clean (#68339)
    
    airflow db clean on the dag_version table selected old, non-latest
    versions for deletion regardless of whether they were still referenced.
    Because task_instance.dag_version_id is ON DELETE RESTRICT, deleting a
    version still referenced by a task instance violates the foreign key
    constraint, so the command could not prune dag_version at all for any
    DAG with history.
    
    Add a generic skip_if_referenced option to the cleanup table config that
    excludes rows still referenced by a given (table, fk_column) via a
    correlated NOT EXISTS subquery, and apply it to dag_version for
    task_instance.dag_version_id. Cleanup now prunes only orphaned older
    versions, and makes further progress as task instances age out and are
    themselves cleaned.
    
    related: #66177
---
 airflow-core/src/airflow/utils/db_cleanup.py     | 48 +++++++++++++-
 airflow-core/tests/unit/utils/test_db_cleanup.py | 83 ++++++++++++++++++++++++
 2 files changed, 130 insertions(+), 1 deletion(-)

diff --git a/airflow-core/src/airflow/utils/db_cleanup.py 
b/airflow-core/src/airflow/utils/db_cleanup.py
index 976a4a5e17c..8197d56c42d 100644
--- a/airflow-core/src/airflow/utils/db_cleanup.py
+++ b/airflow-core/src/airflow/utils/db_cleanup.py
@@ -32,7 +32,7 @@ from dataclasses import dataclass
 from types import SimpleNamespace
 from typing import TYPE_CHECKING, Any
 
-from sqlalchemy import and_, column, func, inspect, select, table, text
+from sqlalchemy import and_, column, func, inspect, literal, select, table, 
text
 from sqlalchemy.exc import OperationalError, ProgrammingError
 from sqlalchemy.ext.compiler import compiles
 from sqlalchemy.orm import aliased
@@ -78,6 +78,12 @@ class _TableConfig:
     :param keep_last_group_by: if keeping the last record, can keep the last 
record for each group
     :param dependent_tables: list of tables which have FK relationship with 
this table
     :param extra_filters: SQLAlchemy expressions ANDed with the recency 
filter; referenced columns must be in ``extra_columns``.
+    :param skip_if_referenced: list of ``(referencing_table, fk_column)`` 
pairs whose FK points at this
+        table's ``referenced_pk_column``. A row that is still referenced by 
any of these is excluded from
+        deletion. This avoids issuing deletes that would violate an ``ON 
DELETE RESTRICT`` foreign key
+        (e.g. ``task_instance.dag_version_id``) — such deletes fail and, on 
MySQL, can leave the cleanup
+        command blocked on metadata locks. ``referenced_pk_column`` must be 
listed in ``extra_columns``.
+    :param referenced_pk_column: the primary-key column of this table that 
``skip_if_referenced`` FKs point at.
     """
 
     table_name: str
@@ -92,6 +98,8 @@ class _TableConfig:
     # Relying on automation here would increase complexity and reduce 
maintainability.
     dependent_tables: list[str] | None = None
     extra_filters: list[Any] | None = None
+    skip_if_referenced: list[tuple[str, str]] | None = None
+    referenced_pk_column: str = "id"
 
     def __post_init__(self):
         self.recency_column = column(self.recency_column_name)
@@ -109,6 +117,16 @@ class _TableConfig:
                 self.recency_column,
             )
 
+        # skip_if_referenced filters on referenced_pk_column, which must be a 
column of orm_model
+        # (added via extra_columns). Fail fast with a clear message instead of 
a cryptic KeyError
+        # raised later when _build_query evaluates 
base_table.c[referenced_pk_column].
+        if self.skip_if_referenced and self.referenced_pk_column not in 
self.orm_model.c.keys():
+            raise ValueError(
+                f"_TableConfig for table {self.table_name!r} sets 
skip_if_referenced but its "
+                f"referenced_pk_column {self.referenced_pk_column!r} is not 
one of its columns; "
+                f"add {self.referenced_pk_column!r} to extra_columns."
+            )
+
     def __lt__(self, other):
         return self.table_name < other.table_name
 
@@ -174,10 +192,16 @@ config_list: list[_TableConfig] = [
     _TableConfig(
         table_name="dag_version",
         recency_column_name="created_at",
+        extra_columns=["id"],
         dependent_tables=["task_instance", "dag_run"],
         dag_id_column_name="dag_id",
         keep_last=True,
         keep_last_group_by=["dag_id"],
+        # task_instance.dag_version_id is ON DELETE RESTRICT, so a version 
still referenced by any
+        # task instance cannot be deleted. Skip those rows instead of issuing 
a delete that would
+        # fail the FK (and hang on MySQL). They become eligible once their 
task instances age out
+        # and are cleaned. dag_run.created_dag_version_id is ON DELETE SET 
NULL, so it does not block.
+        skip_if_referenced=[("task_instance", "dag_version_id")],
     ),
     _TableConfig(table_name="deadline", recency_column_name="deadline_time", 
dag_id_column_name="dag_id"),
     _TableConfig(table_name="revoked_token", recency_column_name="exp"),
@@ -357,6 +381,8 @@ def _build_query(
     dag_ids: list[str] | None = None,
     exclude_dag_ids: list[str] | None = None,
     extra_filters: list[Any] | None = None,
+    skip_if_referenced: list[tuple[str, str]] | None = None,
+    referenced_pk_column: str = "id",
     **kwargs,
 ) -> Select:
     base_table_alias = "base"
@@ -368,6 +394,22 @@ def _build_query(
     if extra_filters:
         conditions.extend(extra_filters)
 
+    if skip_if_referenced:
+        # Exclude rows still referenced by a RESTRICT foreign key; deleting 
them would fail the
+        # constraint (and on MySQL leave the command blocked on metadata 
locks). correlate() is
+        # explicit on purpose: this is a NOT EXISTS guard whose silent failure 
would delete
+        # still-referenced rows, so we don't rely on implicit correlation of 
the base table.
+        base_table_pk_col = base_table.c[referenced_pk_column]
+        for referencing_table, fk_column in skip_if_referenced:
+            referencing = table(referencing_table, column(fk_column))
+            conditions.append(
+                ~select(literal(1))
+                .select_from(referencing)
+                .where(referencing.c[fk_column] == base_table_pk_col)
+                .correlate(base_table)
+                .exists()
+            )
+
     if (dag_ids or exclude_dag_ids) and dag_id_column is not None:
         base_table_dag_id_col = base_table.c[dag_id_column.name]
 
@@ -414,6 +456,8 @@ def _cleanup_table(
     session: Session,
     batch_size: int | None = None,
     extra_filters: list[Any] | None = None,
+    skip_if_referenced: list[tuple[str, str]] | None = None,
+    referenced_pk_column: str = "id",
     **kwargs,
 ) -> None:
     print()
@@ -430,6 +474,8 @@ def _cleanup_table(
         keep_last_group_by=keep_last_group_by,
         clean_before_timestamp=clean_before_timestamp,
         extra_filters=extra_filters,
+        skip_if_referenced=skip_if_referenced,
+        referenced_pk_column=referenced_pk_column,
         session=session,
     )
     logger.debug("old rows query:\n%s", query.selectable.compile())
diff --git a/airflow-core/tests/unit/utils/test_db_cleanup.py 
b/airflow-core/tests/unit/utils/test_db_cleanup.py
index ed4978df45d..4d66a96b1a3 100644
--- a/airflow-core/tests/unit/utils/test_db_cleanup.py
+++ b/airflow-core/tests/unit/utils/test_db_cleanup.py
@@ -48,6 +48,7 @@ from airflow.utils.db_cleanup import (
     _confirm_drop_archives,
     _dump_table_to_file,
     _get_archived_table_names,
+    _TableConfig,
     config_dict,
     drop_archived_tables,
     export_archived_records,
@@ -475,6 +476,88 @@ class TestDBCleanup:
             assert session.scalar(select(func.count()).select_from(model)) == 5
             assert len(_get_archived_table_names(["dag_run"], session)) == 
expected_archives
 
+    def test_dag_version_cleanup_skips_versions_pinned_by_task_instance(self):
+        """db clean must skip dag_version rows still referenced by a task 
instance.
+
+        ``task_instance.dag_version_id`` is ``ON DELETE RESTRICT``, so 
deleting a referenced
+        ``dag_version`` fails the foreign key. Cleanup must skip those rows 
while still pruning
+        genuinely orphaned older versions.
+        """
+        base_date = pendulum.DateTime(2022, 1, 1, 
tzinfo=pendulum.timezone("UTC"))
+        bundle_name = f"testing-{uuid4()}"
+        dag_id = f"test_dag_{uuid4()}"
+
+        with create_session() as session:
+            session.add(DagBundleModel(name=bundle_name))
+            session.flush()
+            session.add(DagModel(dag_id=dag_id, bundle_name=bundle_name))
+            session.flush()
+
+            pinned_old = DagVersion(
+                dag_id=dag_id,
+                version_number=1,
+                bundle_name=bundle_name,
+                created_at=base_date,
+                last_updated=base_date,
+            )
+            orphan_old = DagVersion(
+                dag_id=dag_id,
+                version_number=2,
+                bundle_name=bundle_name,
+                created_at=base_date.add(minutes=1),
+                last_updated=base_date.add(minutes=1),
+            )
+            latest = DagVersion(
+                dag_id=dag_id,
+                version_number=3,
+                bundle_name=bundle_name,
+                created_at=base_date.add(minutes=2),
+                last_updated=base_date.add(minutes=2),
+            )
+            session.add_all([pinned_old, orphan_old, latest])
+            session.flush()
+
+            dag = DAG(dag_id=dag_id)
+            dag_run = DagRun(dag_id, run_id="run-1", 
run_type=DagRunType.MANUAL, start_date=base_date)
+            ti = create_task_instance(
+                PythonOperator(task_id="dummy-task", python_callable=print),
+                run_id=dag_run.run_id,
+                dag_version_id=pinned_old.id,
+            )
+            ti.dag_id = dag.dag_id
+            ti.start_date = base_date
+            session.add_all([dag_run, ti])
+            session.commit()
+
+            pinned_id, orphan_id, latest_id = pinned_old.id, orphan_old.id, 
latest.id
+
+            # Previously this raised an IntegrityError on the FK; it must now 
succeed.
+            _cleanup_table(
+                **config_dict["dag_version"].__dict__,
+                clean_before_timestamp=base_date.add(days=10),
+                dry_run=False,
+                session=session,
+                table_names=["dag_version"],
+                skip_archive=True,
+            )
+
+            remaining = 
set(session.scalars(select(DagVersion.id).where(DagVersion.dag_id == 
dag_id)).all())
+
+        assert pinned_id in remaining  # still referenced by a task instance 
-> skipped
+        assert latest_id in remaining  # kept by keep_last
+        assert orphan_id not in remaining  # old and unreferenced -> pruned
+
+    def test_table_config_skip_if_referenced_requires_pk_column(self):
+        """A misconfigured skip_if_referenced (pk not in columns) must fail 
fast at construction."""
+        with pytest.raises(ValueError, match="referenced_pk_column"):
+            _TableConfig(
+                table_name="dag_version",
+                recency_column_name="created_at",
+                dag_id_column_name="dag_id",
+                skip_if_referenced=[("task_instance", "dag_version_id")],
+                # "id" intentionally omitted from extra_columns
+            )
+
     @patch("airflow.utils.db.reflect_tables")
     def test_skip_archive_failure_will_remove_table(self, reflect_tables_mock):
         """

Reply via email to