Copilot commented on code in PR #60390:
URL: https://github.com/apache/airflow/pull/60390#discussion_r3066477523


##########
airflow-core/tests/unit/utils/test_db_cleanup.py:
##########
@@ -741,6 +741,318 @@ def test_drop_archived_tables(self, mock_input, 
confirm_mock, inspect_mock, capl
         else:
             confirm_mock.assert_not_called()
 
+    def test_dag_version_cleanup_respects_task_instance_fk_constraint(self):
+        """
+        Verify that dag_version cleanup does not delete dag_version rows that 
are
+        referenced by task_instance rows that are being kept (not deleted).
+
+        This tests the FK constraint handling added to prevent:
+        - task_instance.dag_version_id uses ondelete="RESTRICT", so deleting a
+          dag_version that's still referenced by a kept task_instance would 
fail.
+
+        The test creates:
+        - 2 dag_versions (old and new)
+        - 2 task_instances: one old (to be deleted) and one new (to be kept)
+        - Both TIs reference different dag_versions
+
+        When cleaning up with a timestamp that keeps the new TI, the 
dag_version
+        referenced by the kept TI should NOT be deleted, even if it's old.
+        """
+        base_date = pendulum.DateTime(2022, 1, 1, 
tzinfo=pendulum.timezone("UTC"))
+
+        with create_session() as session:
+            bundle_name = "testing"
+            session.add(DagBundleModel(name=bundle_name))
+            session.flush()
+
+            dag_id = "test-dag-fk-constraint"
+            dag = DAG(dag_id=dag_id)
+            dm = DagModel(dag_id=dag_id, bundle_name=bundle_name)
+            session.add(dm)
+            SerializedDagModel.write_dag(LazyDeserializedDAG.from_dag(dag), 
bundle_name=bundle_name)
+            dag_version = DagVersion.get_latest_version(dag.dag_id)
+
+            # Create an old dag_run and task_instance (to be deleted)
+            old_start_date = base_date
+            old_dag_run = DagRun(
+                dag.dag_id,
+                run_id="old_run",
+                run_type=DagRunType.MANUAL,
+                start_date=old_start_date,
+            )
+            old_ti = create_task_instance(
+                PythonOperator(task_id="old-task", python_callable=print),
+                run_id=old_dag_run.run_id,
+                dag_version_id=dag_version.id,
+            )
+            old_ti.dag_id = dag.dag_id
+            old_ti.start_date = old_start_date
+            session.add(old_dag_run)
+            session.add(old_ti)
+
+            # Create a new dag_run and task_instance (to be kept)
+            new_start_date = base_date.add(days=10)
+            new_dag_run = DagRun(
+                dag.dag_id,
+                run_id="new_run",
+                run_type=DagRunType.MANUAL,
+                start_date=new_start_date,
+            )
+            new_ti = create_task_instance(
+                PythonOperator(task_id="new-task", python_callable=print),
+                run_id=new_dag_run.run_id,
+                dag_version_id=dag_version.id,
+            )
+            new_ti.dag_id = dag.dag_id
+            new_ti.start_date = new_start_date
+            session.add(new_dag_run)
+            session.add(new_ti)
+            session.commit()
+
+            # Clean before timestamp that would keep the new TI but delete the 
old one
+            clean_before_date = base_date.add(days=5)
+
+            # Count dag_versions before cleanup
+            dag_versions_before = session.scalar(
+                
select(func.count()).select_from(DagVersion).where(DagVersion.dag_id == dag_id)
+            )
+            assert dag_versions_before == 1
+
+            # Run cleanup on dag_version table
+            # The dag_version is old (created_at < clean_before_date), but 
it's referenced
+            # by a kept task_instance (new_ti with start_date >= 
clean_before_date)
+            # So it should NOT be deleted due to the FK constraint handling
+            query = _build_query(
+                **config_dict["dag_version"].__dict__,
+                clean_before_timestamp=clean_before_date,
+                session=session,
+            )

Review Comment:
   This test does not currently exercise the reported failure mode. 
`dag_version` cleanup filters on `created_at` and uses `keep_last=True`. Because 
only a single `dag_version` is created here, it is always the latest version 
and is retained by `keep_last`; its `created_at` is also never forced to be 
older than `clean_before_date`. The row is therefore not eligible for deletion 
even without the new FK-aware filter. To validate the fix, create at least two 
`dag_version` rows for the same `dag_id`, make the *older* one eligible for 
deletion (`created_at < clean_before_date` and not the latest), and have a kept 
`task_instance` reference that older version; then assert the older 
`dag_version` is excluded from deletion.



##########
airflow-core/tests/unit/utils/test_db_cleanup.py:
##########
@@ -741,6 +741,318 @@ def test_drop_archived_tables(self, mock_input, 
confirm_mock, inspect_mock, capl
         else:
             confirm_mock.assert_not_called()
 
+    def test_dag_version_cleanup_respects_task_instance_fk_constraint(self):
+        """
+        Verify that dag_version cleanup does not delete dag_version rows that 
are
+        referenced by task_instance rows that are being kept (not deleted).
+
+        This tests the FK constraint handling added to prevent:
+        - task_instance.dag_version_id uses ondelete="RESTRICT", so deleting a
+          dag_version that's still referenced by a kept task_instance would 
fail.
+
+        The test creates:
+        - 2 dag_versions (old and new)
+        - 2 task_instances: one old (to be deleted) and one new (to be kept)
+        - Both TIs reference different dag_versions
+
+        When cleaning up with a timestamp that keeps the new TI, the 
dag_version
+        referenced by the kept TI should NOT be deleted, even if it's old.
+        """
+        base_date = pendulum.DateTime(2022, 1, 1, 
tzinfo=pendulum.timezone("UTC"))
+
+        with create_session() as session:
+            bundle_name = "testing"
+            session.add(DagBundleModel(name=bundle_name))
+            session.flush()
+
+            dag_id = "test-dag-fk-constraint"
+            dag = DAG(dag_id=dag_id)
+            dm = DagModel(dag_id=dag_id, bundle_name=bundle_name)
+            session.add(dm)
+            SerializedDagModel.write_dag(LazyDeserializedDAG.from_dag(dag), 
bundle_name=bundle_name)
+            dag_version = DagVersion.get_latest_version(dag.dag_id)
+
+            # Create an old dag_run and task_instance (to be deleted)
+            old_start_date = base_date
+            old_dag_run = DagRun(
+                dag.dag_id,
+                run_id="old_run",
+                run_type=DagRunType.MANUAL,
+                start_date=old_start_date,
+            )
+            old_ti = create_task_instance(
+                PythonOperator(task_id="old-task", python_callable=print),
+                run_id=old_dag_run.run_id,
+                dag_version_id=dag_version.id,
+            )
+            old_ti.dag_id = dag.dag_id
+            old_ti.start_date = old_start_date
+            session.add(old_dag_run)
+            session.add(old_ti)
+
+            # Create a new dag_run and task_instance (to be kept)
+            new_start_date = base_date.add(days=10)
+            new_dag_run = DagRun(
+                dag.dag_id,
+                run_id="new_run",
+                run_type=DagRunType.MANUAL,
+                start_date=new_start_date,
+            )
+            new_ti = create_task_instance(
+                PythonOperator(task_id="new-task", python_callable=print),
+                run_id=new_dag_run.run_id,
+                dag_version_id=dag_version.id,
+            )
+            new_ti.dag_id = dag.dag_id
+            new_ti.start_date = new_start_date
+            session.add(new_dag_run)
+            session.add(new_ti)
+            session.commit()
+
+            # Clean before timestamp that would keep the new TI but delete the 
old one
+            clean_before_date = base_date.add(days=5)
+
+            # Count dag_versions before cleanup
+            dag_versions_before = session.scalar(
+                
select(func.count()).select_from(DagVersion).where(DagVersion.dag_id == dag_id)
+            )
+            assert dag_versions_before == 1
+
+            # Run cleanup on dag_version table
+            # The dag_version is old (created_at < clean_before_date), but 
it's referenced
+            # by a kept task_instance (new_ti with start_date >= 
clean_before_date)
+            # So it should NOT be deleted due to the FK constraint handling
+            query = _build_query(
+                **config_dict["dag_version"].__dict__,
+                clean_before_timestamp=clean_before_date,
+                session=session,
+            )
+
+            # The query should return 0 rows to delete because the dag_version
+            # is referenced by a kept task_instance
+            rows_to_delete = 
session.scalar(select(func.count()).select_from(query.subquery()))
+            assert rows_to_delete == 0, (
+                f"Expected 0 dag_version rows to be marked for deletion, got 
{rows_to_delete}. "
+                "The dag_version should be protected because it's referenced 
by a kept task_instance."
+            )
+
+    def test_dag_version_cleanup_deletes_unreferenced_rows(self):
+        """
+        Verify that dag_version rows that are NOT referenced by any 
task_instance
+        are properly deleted during cleanup.
+
+        This ensures the FK constraint handling doesn't over-protect 
dag_version rows
+        that have no task_instance references at all.
+
+        Note: dag_version has keep_last=True, so the most recent dag_version 
per dag_id
+        is always kept. We need at least 2 dag_versions for a dag to test 
deletion.
+        """
+        base_date = pendulum.DateTime(2022, 1, 1, 
tzinfo=pendulum.timezone("UTC"))
+
+        with create_session() as session:
+            bundle_name = "testing"
+            session.add(DagBundleModel(name=bundle_name))
+            session.flush()
+
+            # Create a dag with TWO dag_versions (neither has TI references)
+            # The older one should be deleted, the newer one kept by keep_last
+            dag_id = "test-dag-no-ti"
+            dm = DagModel(dag_id=dag_id, bundle_name=bundle_name)
+            session.add(dm)
+            session.flush()
+
+            # Create first (old) dag_version using DagVersion.write_dag 
directly
+            old_dag_version = DagVersion.write_dag(dag_id=dag_id, 
bundle_name=bundle_name, session=session)
+            old_dag_version.created_at = base_date  # Old date
+            session.flush()
+
+            # Create second (new) dag_version
+            new_dag_version = DagVersion.write_dag(dag_id=dag_id, 
bundle_name=bundle_name, session=session)
+            new_dag_version.created_at = base_date.add(days=10)  # New date
+            session.commit()
+
+            # Verify we have 2 dag_versions
+            dag_version_count = session.scalar(
+                
select(func.count()).select_from(DagVersion).where(DagVersion.dag_id == dag_id)
+            )
+            assert dag_version_count == 2, f"Expected 2 dag_versions, got 
{dag_version_count}"
+
+            # Clean before timestamp that would delete the old dag_version
+            clean_before_date = base_date.add(days=5)
+
+            # Build query for dag_version cleanup
+            query = _build_query(
+                **config_dict["dag_version"].__dict__,
+                clean_before_timestamp=clean_before_date,
+                session=session,
+            )
+
+            # The old unreferenced dag_version should be marked for deletion
+            # (the new one is kept by keep_last)
+            rows_to_delete = 
session.scalar(select(func.count()).select_from(query.subquery()))
+            assert rows_to_delete == 1, (
+                f"Expected 1 dag_version row to be marked for deletion, got 
{rows_to_delete}. "
+                "The old unreferenced dag_version should be deleted."
+            )
+
+    def test_dag_version_cleanup_deletes_rows_only_referenced_by_old_ti(self):
+        """
+        Verify that dag_version rows that are only referenced by OLD 
task_instance
+        rows (that will be deleted) can be properly deleted.
+
+        After task_instance cleanup removes old TIs, the dag_version rows they
+        referenced should become eligible for deletion.
+
+        Note: dag_version has keep_last=True, so we need 2 dag_versions. The 
older
+        one (referenced only by old TI) should be deleted, the newer one is 
kept.
+        """
+        base_date = pendulum.DateTime(2022, 1, 1, 
tzinfo=pendulum.timezone("UTC"))
+
+        with create_session() as session:
+            bundle_name = "testing"
+            session.add(DagBundleModel(name=bundle_name))
+            session.flush()
+
+            dag_id = "test-dag-old-ti-only"
+            dag = DAG(dag_id=dag_id)
+            dm = DagModel(dag_id=dag_id, bundle_name=bundle_name)
+            session.add(dm)
+            session.flush()
+
+            # Create first (old) dag_version using DagVersion.write_dag 
directly
+            old_dag_version = DagVersion.write_dag(dag_id=dag_id, 
bundle_name=bundle_name, session=session)
+            old_dag_version.created_at = base_date  # Old date
+            session.flush()
+
+            # Create ONLY an old task_instance referencing the OLD dag_version
+            old_start_date = base_date
+            old_dag_run = DagRun(
+                dag.dag_id,
+                run_id="old_run",
+                run_type=DagRunType.MANUAL,
+                start_date=old_start_date,
+            )
+            old_ti = create_task_instance(
+                PythonOperator(task_id="old-task", python_callable=print),
+                run_id=old_dag_run.run_id,
+                dag_version_id=old_dag_version.id,
+            )
+            old_ti.dag_id = dag.dag_id
+            old_ti.start_date = old_start_date
+            session.add(old_dag_run)
+            session.add(old_ti)
+
+            # Create second (new) dag_version - no TI references this one
+            new_dag_version = DagVersion.write_dag(dag_id=dag_id, 
bundle_name=bundle_name, session=session)
+            new_dag_version.created_at = base_date.add(days=10)  # New date, 
kept by keep_last
+            session.commit()
+
+            # Verify we have 2 dag_versions
+            dag_version_count = session.scalar(
+                
select(func.count()).select_from(DagVersion).where(DagVersion.dag_id == dag_id)
+            )
+            assert dag_version_count == 2, f"Expected 2 dag_versions, got 
{dag_version_count}"
+
+            # Clean before timestamp that would delete the old TI
+            clean_before_date = base_date.add(days=5)
+
+            # Build query for dag_version cleanup
+            query = _build_query(
+                **config_dict["dag_version"].__dict__,
+                clean_before_timestamp=clean_before_date,
+                session=session,
+            )
+
+            # The OLD dag_version should be marked for deletion since:
+            # 1. Its created_at is old (base_date < clean_before_date)
+            # 2. The only task_instance referencing it is old (will also be 
deleted)
+            # 3. It's not the latest dag_version (keep_last keeps the new one)
+            rows_to_delete = 
session.scalar(select(func.count()).select_from(query.subquery()))
+            assert rows_to_delete == 1, (
+                f"Expected 1 dag_version row to be marked for deletion, got 
{rows_to_delete}. "
+                "The dag_version referenced only by old TIs should be deleted."
+            )
+
+    def test_dag_version_cleanup_no_fk_violation_integration(self):
+        """
+        Integration test to verify that the full cleanup process doesn't fail
+        with FK violations when task_instance and dag_version have different
+        recency timestamps.
+
+        This tests the actual run_cleanup flow, not just the query building.
+        """
+        base_date = pendulum.DateTime(2022, 1, 1, 
tzinfo=pendulum.timezone("UTC"))
+
+        with create_session() as session:
+            bundle_name = "testing"
+            session.add(DagBundleModel(name=bundle_name))
+            session.flush()
+
+            dag_id = "test-dag-fk-integration"
+            dag = DAG(dag_id=dag_id)
+            dm = DagModel(dag_id=dag_id, bundle_name=bundle_name)
+            session.add(dm)
+            SerializedDagModel.write_dag(LazyDeserializedDAG.from_dag(dag), 
bundle_name=bundle_name)
+            dag_version = DagVersion.get_latest_version(dag.dag_id)
+            # Set dag_version created_at to old date (eligible for cleanup by 
timestamp)
+            dag_version.created_at = base_date
+
+            # Create a NEW task_instance that references this OLD dag_version
+            # This is the scenario that would cause FK violation without 
proper handling
+            new_start_date = base_date.add(days=10)
+            dag_run = DagRun(
+                dag.dag_id,
+                run_id="new_run",
+                run_type=DagRunType.MANUAL,
+                start_date=new_start_date,
+            )
+            ti = create_task_instance(
+                PythonOperator(task_id="new-task", python_callable=print),
+                run_id=dag_run.run_id,
+                dag_version_id=dag_version.id,
+            )
+            ti.dag_id = dag.dag_id
+            ti.start_date = new_start_date
+            session.add(dag_run)
+            session.add(ti)
+            session.commit()
+
+            # Clean before timestamp that keeps the new TI but would normally
+            # try to delete the old dag_version (based on created_at)
+            clean_before_date = base_date.add(days=5)
+
+            # Count dag_versions before cleanup
+            dag_versions_before = session.scalar(
+                
select(func.count()).select_from(DagVersion).where(DagVersion.dag_id == dag_id)
+            )
+
+            # Run the full cleanup - this should NOT raise FK violation errors
+            # The dag_version table cleanup should skip this row because it's
+            # referenced by a kept task_instance
+            run_cleanup(
+                clean_before_timestamp=clean_before_date,
+                table_names=["dag_version"],
+                dry_run=False,
+                confirm=False,
+                session=session,
+            )

Review Comment:
   This integration test is unlikely to catch the FK constraint regression 
because it only creates a single `dag_version` for the DAG. With 
`keep_last=True`, the only version is always kept, so 
`run_cleanup(table_names=["dag_version"])` won’t attempt to delete it 
regardless of the FK-aware filtering. To meaningfully validate the fix 
end-to-end, create two `dag_version` rows and ensure the older one would be 
deleted by timestamp/keep_last logic, while a kept `task_instance` still 
references that older version; then assert the cleanup completes and the 
referenced older version remains.



##########
airflow-core/src/airflow/utils/db_cleanup.py:
##########
@@ -372,6 +373,49 @@ def _build_query(
             ),
         )
         conditions.append(column(max_date_col_name).is_(None))
+
+    # Special handling for dag_version: exclude rows referenced by 
task_instance rows that are NOT being deleted.
+    #
+    # dag_version has two dependent tables (see config_list):
+    #   - task_instance.dag_version_id: uses ondelete="RESTRICT" - will BLOCK 
deletion if referenced
+    #   - dag_run.created_dag_version_id: uses ondelete="SET NULL" - 
automatically sets to NULL on delete
+    #
+    # We only need to handle task_instance here because dag_run's FK 
constraint won't cause violations
+    # (the database handles it automatically by setting the FK to NULL when 
dag_version is deleted).
+    if table_name == "dag_version":
+        try:
+            # Reflect the task_instance table to get proper column access
+            metadata = reflect_tables(["task_instance", "dag_version"], 
session)
+            ti_table_reflected = metadata.tables["task_instance"]
+            dv_table_reflected = metadata.tables["dag_version"]
+            ti_dag_version_id_col = ti_table_reflected.c.dag_version_id
+            ti_start_date_col = ti_table_reflected.c.start_date
+            dv_id_col = dv_table_reflected.c.id
+
+            # Find dag_version_ids that are referenced by task_instance rows 
that are kept (not deleted)
+            # These are task_instance rows with start_date >= 
clean_before_timestamp
+            # Use EXISTS for better performance and NULL handling
+            base_table_id_col = base_table.c[dv_id_col.name]
+            kept_tis_exists = exists(
+                select(1)
+                .select_from(ti_table_reflected)
+                .where(ti_dag_version_id_col == base_table_id_col)
+                .where(ti_start_date_col >= clean_before_timestamp)
+                .where(ti_dag_version_id_col.isnot(None))
+            )
+
+            # Exclude dag_version rows that are referenced by kept 
task_instance rows
+            # Negate EXISTS to get NOT EXISTS behavior
+            conditions.append(~kept_tis_exists)
+        except (KeyError, AttributeError, OperationalError, ProgrammingError) 
as e:
+            # If we can't add the FK constraint filter, continue without it
+            # This prevents the cleanup from failing, though it may still hit 
FK violations
+            logger.warning(
+                "Failed to add foreign key constraint filter for dag_version 
table cleanup: %s. "
+                "Continuing without the filter. Note that FK violations may 
still be encountered during cleanup.",
+                type(e).__name__,
+            )

Review Comment:
   Continuing without the FK filter can reintroduce the exact FK violation this 
PR is addressing, causing `db clean` to fail later during delete execution. A 
safer fallback would be to skip `dag_version` deletions in this run (e.g., add 
a condition that yields no rows) when the filter can’t be constructed. Also, 
the warning logs only the exception type; including the exception message 
and/or `exc_info=True` would make failures diagnosable.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to