Copilot commented on code in PR #60390:
URL: https://github.com/apache/airflow/pull/60390#discussion_r2733030893


##########
airflow-core/src/airflow/utils/db_cleanup.py:
##########
@@ -371,6 +372,49 @@ def _build_query(
             ),
         )
         conditions.append(column(max_date_col_name).is_(None))
+
+    # Special handling for dag_version: exclude rows referenced by task_instance rows that are NOT being deleted.
+    #
+    # dag_version has two dependent tables (see config_list):
+    #   - task_instance.dag_version_id: uses ondelete="RESTRICT" - will BLOCK deletion if referenced
+    #   - dag_run.created_dag_version_id: uses ondelete="SET NULL" - automatically sets to NULL on delete
+    #
+    # We only need to handle task_instance here because dag_run's FK constraint won't cause violations
+    # (the database handles it automatically by setting the FK to NULL when dag_version is deleted).
+    if table_name == "dag_version":
+        try:
+            # Reflect the task_instance table to get proper column access
+            metadata = reflect_tables(["task_instance", "dag_version"], session)
+            ti_table_reflected = metadata.tables["task_instance"]
+            dv_table_reflected = metadata.tables["dag_version"]
+            ti_dag_version_id_col = ti_table_reflected.c.dag_version_id
+            ti_start_date_col = ti_table_reflected.c.start_date
+            dv_id_col = dv_table_reflected.c.id
+
+            # Find dag_version_ids that are referenced by task_instance rows that are kept (not deleted)
+            # These are task_instance rows with start_date >= clean_before_timestamp
+            # Use EXISTS for better performance and NULL handling
+            base_table_id_col = base_table.c[dv_id_col.name]
+            kept_tis_exists = exists(
+                select(1)
+                .select_from(ti_table_reflected)
+                .where(ti_dag_version_id_col == base_table_id_col)
+                .where(ti_start_date_col >= clean_before_timestamp)
+                .where(ti_dag_version_id_col.isnot(None))
+            )

Review Comment:
   The FK-protection EXISTS only treats task_instance rows with `start_date >= clean_before_timestamp` as “kept”. But `task_instance.start_date` is nullable, and rows with `start_date IS NULL` are also kept by the cleanup query (`NULL < timestamp` is not true). Those NULL-start_date task_instances can still reference dag_version and will trigger FK violations unless they’re included in the EXISTS predicate (e.g., treat `start_date IS NULL` as kept, or mirror the exact task_instance deletion criteria).
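
   A minimal sketch of the suggested predicate, reusing the variable names from the diff above (`ti_table_reflected`, `ti_dag_version_id_col`, `ti_start_date_col`, `base_table_id_col`); this illustrates the reviewer's suggestion rather than code from the PR:

   ```python
   from sqlalchemy import exists, or_, select

   # Treat NULL start_date as "kept", mirroring the cleanup's own semantics:
   # `start_date < clean_before_timestamp` is not true for NULL, so those
   # task_instance rows survive the delete and must keep protecting their
   # dag_version. The equality join already excludes NULL dag_version_id,
   # which makes the diff's explicit isnot(None) guard redundant.
   kept_tis_exists = exists(
       select(1)
       .select_from(ti_table_reflected)
       .where(ti_dag_version_id_col == base_table_id_col)
       .where(
           or_(
               ti_start_date_col.is_(None),
               ti_start_date_col >= clean_before_timestamp,
           )
       )
   )
   ```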



##########
airflow-core/tests/unit/utils/test_db_cleanup.py:
##########
@@ -741,6 +741,318 @@ def test_drop_archived_tables(self, mock_input, confirm_mock, inspect_mock, capl
         else:
             confirm_mock.assert_not_called()
 
+    def test_dag_version_cleanup_respects_task_instance_fk_constraint(self):
+        """
+        Verify that dag_version cleanup does not delete dag_version rows that are
+        referenced by task_instance rows that are being kept (not deleted).
+
+        This tests the FK constraint handling added to prevent:
+        - task_instance.dag_version_id uses ondelete="RESTRICT", so deleting a
+          dag_version that's still referenced by a kept task_instance would fail.
+
+        The test creates:
+        - 2 dag_versions (old and new)
+        - 2 task_instances: one old (to be deleted) and one new (to be kept)
+        - Both TIs reference different dag_versions
+
+        When cleaning up with a timestamp that keeps the new TI, the dag_version
+        referenced by the kept TI should NOT be deleted, even if it's old.
+        """
+        base_date = pendulum.DateTime(2022, 1, 1, tzinfo=pendulum.timezone("UTC"))
+
+        with create_session() as session:
+            bundle_name = "testing"
+            session.add(DagBundleModel(name=bundle_name))
+            session.flush()
+
+            dag_id = "test-dag-fk-constraint"
+            dag = DAG(dag_id=dag_id)
+            dm = DagModel(dag_id=dag_id, bundle_name=bundle_name)
+            session.add(dm)
+            SerializedDagModel.write_dag(LazyDeserializedDAG.from_dag(dag), bundle_name=bundle_name)
+            dag_version = DagVersion.get_latest_version(dag.dag_id)
+

Review Comment:
   This test doesn’t currently exercise the FK-protection behavior: it only creates a single dag_version (`DagVersion.get_latest_version(...)`), and `dag_version` cleanup is configured with `keep_last=True`, so the lone (latest) version won’t be selected for deletion regardless of whether any task_instance references it. To validate the fix, create at least two dag_versions for the dag, make the older one eligible by `created_at < clean_before_date` and not the latest, and have a kept task_instance reference that older version.
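
   A sketch of that setup, borrowing the `DagVersion.write_dag(dag_id=..., bundle_name=..., session=...)` and `create_task_instance(...)` patterns the later tests in this diff already use (illustrative only; the `old_dv`, `new_dv`, and `kept_ti` names are invented here):

   ```python
   # Two versions: keep_last retains the newer one, so only the older
   # version is timestamp-eligible for deletion.
   old_dv = DagVersion.write_dag(dag_id=dag_id, bundle_name=bundle_name, session=session)
   old_dv.created_at = base_date  # created_at < clean_before_date
   session.flush()

   new_dv = DagVersion.write_dag(dag_id=dag_id, bundle_name=bundle_name, session=session)
   new_dv.created_at = base_date.add(days=10)  # retained by keep_last
   session.flush()

   # The kept task_instance must reference the OLD version; otherwise the
   # keep_last rule alone shields it and the FK filter is never exercised.
   kept_ti = create_task_instance(
       PythonOperator(task_id="kept-task", python_callable=print),
       run_id=new_dag_run.run_id,
       dag_version_id=old_dv.id,
   )
   kept_ti.start_date = base_date.add(days=10)  # >= clean_before_date, so kept
   ```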



##########
airflow-core/tests/unit/utils/test_db_cleanup.py:
##########
@@ -741,6 +741,318 @@ def test_drop_archived_tables(self, mock_input, confirm_mock, inspect_mock, capl
         else:
             confirm_mock.assert_not_called()
 
+    def test_dag_version_cleanup_respects_task_instance_fk_constraint(self):
+        """
+        Verify that dag_version cleanup does not delete dag_version rows that are
+        referenced by task_instance rows that are being kept (not deleted).
+
+        This tests the FK constraint handling added to prevent:
+        - task_instance.dag_version_id uses ondelete="RESTRICT", so deleting a
+          dag_version that's still referenced by a kept task_instance would fail.
+
+        The test creates:
+        - 2 dag_versions (old and new)
+        - 2 task_instances: one old (to be deleted) and one new (to be kept)
+        - Both TIs reference different dag_versions
+
+        When cleaning up with a timestamp that keeps the new TI, the dag_version
+        referenced by the kept TI should NOT be deleted, even if it's old.
+        """
+        base_date = pendulum.DateTime(2022, 1, 1, tzinfo=pendulum.timezone("UTC"))
+
+        with create_session() as session:
+            bundle_name = "testing"
+            session.add(DagBundleModel(name=bundle_name))
+            session.flush()
+
+            dag_id = "test-dag-fk-constraint"
+            dag = DAG(dag_id=dag_id)
+            dm = DagModel(dag_id=dag_id, bundle_name=bundle_name)
+            session.add(dm)
+            SerializedDagModel.write_dag(LazyDeserializedDAG.from_dag(dag), bundle_name=bundle_name)
+            dag_version = DagVersion.get_latest_version(dag.dag_id)
+
+            # Create an old dag_run and task_instance (to be deleted)
+            old_start_date = base_date
+            old_dag_run = DagRun(
+                dag.dag_id,
+                run_id="old_run",
+                run_type=DagRunType.MANUAL,
+                start_date=old_start_date,
+            )
+            old_ti = create_task_instance(
+                PythonOperator(task_id="old-task", python_callable=print),
+                run_id=old_dag_run.run_id,
+                dag_version_id=dag_version.id,
+            )
+            old_ti.dag_id = dag.dag_id
+            old_ti.start_date = old_start_date
+            session.add(old_dag_run)
+            session.add(old_ti)
+
+            # Create a new dag_run and task_instance (to be kept)
+            new_start_date = base_date.add(days=10)
+            new_dag_run = DagRun(
+                dag.dag_id,
+                run_id="new_run",
+                run_type=DagRunType.MANUAL,
+                start_date=new_start_date,
+            )
+            new_ti = create_task_instance(
+                PythonOperator(task_id="new-task", python_callable=print),
+                run_id=new_dag_run.run_id,
+                dag_version_id=dag_version.id,
+            )
+            new_ti.dag_id = dag.dag_id
+            new_ti.start_date = new_start_date
+            session.add(new_dag_run)
+            session.add(new_ti)
+            session.commit()
+
+            # Clean before timestamp that would keep the new TI but delete the old one
+            clean_before_date = base_date.add(days=5)
+
+            # Count dag_versions before cleanup
+            dag_versions_before = session.scalar(
+                select(func.count()).select_from(DagVersion).where(DagVersion.dag_id == dag_id)
+            )
+            assert dag_versions_before == 1
+
+            # Run cleanup on dag_version table
+            # The dag_version is old (created_at < clean_before_date), but it's referenced
+            # by a kept task_instance (new_ti with start_date >= clean_before_date)
+            # So it should NOT be deleted due to the FK constraint handling
+            query = _build_query(
+                **config_dict["dag_version"].__dict__,
+                clean_before_timestamp=clean_before_date,
+                session=session,
+            )
+
+            # The query should return 0 rows to delete because the dag_version
+            # is referenced by a kept task_instance
+            rows_to_delete = session.scalar(select(func.count()).select_from(query.subquery()))
+            assert rows_to_delete == 0, (
+                f"Expected 0 dag_version rows to be marked for deletion, got {rows_to_delete}. "
+                "The dag_version should be protected because it's referenced by a kept task_instance."
+            )
+
+    def test_dag_version_cleanup_deletes_unreferenced_rows(self):
+        """
+        Verify that dag_version rows that are NOT referenced by any task_instance
+        are properly deleted during cleanup.
+
+        This ensures the FK constraint handling doesn't over-protect dag_version rows
+        that have no task_instance references at all.
+
+        Note: dag_version has keep_last=True, so the most recent dag_version per dag_id
+        is always kept. We need at least 2 dag_versions for a dag to test deletion.
+        """
+        base_date = pendulum.DateTime(2022, 1, 1, tzinfo=pendulum.timezone("UTC"))
+
+        with create_session() as session:
+            bundle_name = "testing"
+            session.add(DagBundleModel(name=bundle_name))
+            session.flush()
+
+            # Create a dag with TWO dag_versions (neither has TI references)
+            # The older one should be deleted, the newer one kept by keep_last
+            dag_id = "test-dag-no-ti"
+            dm = DagModel(dag_id=dag_id, bundle_name=bundle_name)
+            session.add(dm)
+            session.flush()
+
+            # Create first (old) dag_version using DagVersion.write_dag directly
+            old_dag_version = DagVersion.write_dag(dag_id=dag_id, bundle_name=bundle_name, session=session)
+            old_dag_version.created_at = base_date  # Old date
+            session.flush()
+
+            # Create second (new) dag_version
+            new_dag_version = DagVersion.write_dag(dag_id=dag_id, bundle_name=bundle_name, session=session)
+            new_dag_version.created_at = base_date.add(days=10)  # New date
+            session.commit()
+
+            # Verify we have 2 dag_versions
+            dag_version_count = session.scalar(
+                select(func.count()).select_from(DagVersion).where(DagVersion.dag_id == dag_id)
+            )
+            assert dag_version_count == 2, f"Expected 2 dag_versions, got {dag_version_count}"
+
+            # Clean before timestamp that would delete the old dag_version
+            clean_before_date = base_date.add(days=5)
+
+            # Build query for dag_version cleanup
+            query = _build_query(
+                **config_dict["dag_version"].__dict__,
+                clean_before_timestamp=clean_before_date,
+                session=session,
+            )
+
+            # The old unreferenced dag_version should be marked for deletion
+            # (the new one is kept by keep_last)
+            rows_to_delete = session.scalar(select(func.count()).select_from(query.subquery()))
+            assert rows_to_delete == 1, (
+                f"Expected 1 dag_version row to be marked for deletion, got {rows_to_delete}. "
+                "The old unreferenced dag_version should be deleted."
+            )
+
+    def test_dag_version_cleanup_deletes_rows_only_referenced_by_old_ti(self):
+        """
+        Verify that dag_version rows that are only referenced by OLD task_instance
+        rows (that will be deleted) can be properly deleted.
+
+        After task_instance cleanup removes old TIs, the dag_version rows they
+        referenced should become eligible for deletion.
+
+        Note: dag_version has keep_last=True, so we need 2 dag_versions. The older
+        one (referenced only by old TI) should be deleted, the newer one is kept.
+        """
+        base_date = pendulum.DateTime(2022, 1, 1, tzinfo=pendulum.timezone("UTC"))
+
+        with create_session() as session:
+            bundle_name = "testing"
+            session.add(DagBundleModel(name=bundle_name))
+            session.flush()
+
+            dag_id = "test-dag-old-ti-only"
+            dag = DAG(dag_id=dag_id)
+            dm = DagModel(dag_id=dag_id, bundle_name=bundle_name)
+            session.add(dm)
+            session.flush()
+
+            # Create first (old) dag_version using DagVersion.write_dag directly
+            old_dag_version = DagVersion.write_dag(dag_id=dag_id, bundle_name=bundle_name, session=session)
+            old_dag_version.created_at = base_date  # Old date
+            session.flush()
+
+            # Create ONLY an old task_instance referencing the OLD dag_version
+            old_start_date = base_date
+            old_dag_run = DagRun(
+                dag.dag_id,
+                run_id="old_run",
+                run_type=DagRunType.MANUAL,
+                start_date=old_start_date,
+            )
+            old_ti = create_task_instance(
+                PythonOperator(task_id="old-task", python_callable=print),
+                run_id=old_dag_run.run_id,
+                dag_version_id=old_dag_version.id,
+            )
+            old_ti.dag_id = dag.dag_id
+            old_ti.start_date = old_start_date
+            session.add(old_dag_run)
+            session.add(old_ti)
+
+            # Create second (new) dag_version - no TI references this one
+            new_dag_version = DagVersion.write_dag(dag_id=dag_id, bundle_name=bundle_name, session=session)
+            new_dag_version.created_at = base_date.add(days=10)  # New date, kept by keep_last
+            session.commit()
+
+            # Verify we have 2 dag_versions
+            dag_version_count = session.scalar(
+                select(func.count()).select_from(DagVersion).where(DagVersion.dag_id == dag_id)
+            )
+            assert dag_version_count == 2, f"Expected 2 dag_versions, got {dag_version_count}"
+
+            # Clean before timestamp that would delete the old TI
+            clean_before_date = base_date.add(days=5)
+
+            # Build query for dag_version cleanup
+            query = _build_query(
+                **config_dict["dag_version"].__dict__,
+                clean_before_timestamp=clean_before_date,
+                session=session,
+            )
+
+            # The OLD dag_version should be marked for deletion since:
+            # 1. Its created_at is old (base_date < clean_before_date)
+            # 2. The only task_instance referencing it is old (will also be deleted)
+            # 3. It's not the latest dag_version (keep_last keeps the new one)
+            rows_to_delete = session.scalar(select(func.count()).select_from(query.subquery()))
+            assert rows_to_delete == 1, (
+                f"Expected 1 dag_version row to be marked for deletion, got {rows_to_delete}. "
+                "The dag_version referenced only by old TIs should be deleted."
+            )
+
+    def test_dag_version_cleanup_no_fk_violation_integration(self):
+        """
+        Integration test to verify that the full cleanup process doesn't fail
+        with FK violations when task_instance and dag_version have different
+        recency timestamps.
+
+        This tests the actual run_cleanup flow, not just the query building.
+        """
+        base_date = pendulum.DateTime(2022, 1, 1, tzinfo=pendulum.timezone("UTC"))
+
+        with create_session() as session:
+            bundle_name = "testing"
+            session.add(DagBundleModel(name=bundle_name))
+            session.flush()
+
+            dag_id = "test-dag-fk-integration"
+            dag = DAG(dag_id=dag_id)
+            dm = DagModel(dag_id=dag_id, bundle_name=bundle_name)
+            session.add(dm)
+            SerializedDagModel.write_dag(LazyDeserializedDAG.from_dag(dag), bundle_name=bundle_name)
+            dag_version = DagVersion.get_latest_version(dag.dag_id)
+            # Set dag_version created_at to old date (eligible for cleanup by timestamp)
+            dag_version.created_at = base_date
+

Review Comment:
   This integration test also only creates a single dag_version for the dag. Since dag_version cleanup uses `keep_last=True`, that latest version will be kept even when `created_at` is old, so the cleanup won’t attempt the delete that would trigger the FK violation. Consider creating two dag_versions and making the older one eligible for deletion + referenced by a kept task_instance, then assert the older one remains and no IntegrityError is raised.
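
   A sketch of the assertion shape the comment suggests, assuming `run_cleanup` from `airflow.utils.db_cleanup` is what the test invokes and that `old_dag_version` was set up as described above (illustrative, not the PR's actual code):

   ```python
   import pytest
   from sqlalchemy.exc import IntegrityError

   # With two dag_versions set up as above, run the real cleanup path.
   # An IntegrityError here would mean the FK-protection filter did not apply.
   try:
       run_cleanup(
           clean_before_timestamp=clean_before_date,
           table_names=["dag_version"],
           dry_run=False,
           confirm=False,
           session=session,
       )
   except IntegrityError:
       pytest.fail("dag_version cleanup hit a FK violation from task_instance.dag_version_id")

   # The older version referenced by a kept task_instance must survive.
   remaining_ids = set(
       session.scalars(select(DagVersion.id).where(DagVersion.dag_id == dag_id))
   )
   assert old_dag_version.id in remaining_ids
   ```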



##########
airflow-core/src/airflow/utils/db_cleanup.py:
##########
@@ -371,6 +372,49 @@ def _build_query(
             ),
         )
         conditions.append(column(max_date_col_name).is_(None))
+
+    # Special handling for dag_version: exclude rows referenced by task_instance rows that are NOT being deleted.
+    #
+    # dag_version has two dependent tables (see config_list):
+    #   - task_instance.dag_version_id: uses ondelete="RESTRICT" - will BLOCK deletion if referenced
+    #   - dag_run.created_dag_version_id: uses ondelete="SET NULL" - automatically sets to NULL on delete
+    #
+    # We only need to handle task_instance here because dag_run's FK constraint won't cause violations
+    # (the database handles it automatically by setting the FK to NULL when dag_version is deleted).
+    if table_name == "dag_version":
+        try:
+            # Reflect the task_instance table to get proper column access
+            metadata = reflect_tables(["task_instance", "dag_version"], session)
+            ti_table_reflected = metadata.tables["task_instance"]
+            dv_table_reflected = metadata.tables["dag_version"]
+            ti_dag_version_id_col = ti_table_reflected.c.dag_version_id
+            ti_start_date_col = ti_table_reflected.c.start_date
+            dv_id_col = dv_table_reflected.c.id
+
+            # Find dag_version_ids that are referenced by task_instance rows that are kept (not deleted)
+            # These are task_instance rows with start_date >= clean_before_timestamp
+            # Use EXISTS for better performance and NULL handling
+            base_table_id_col = base_table.c[dv_id_col.name]
+            kept_tis_exists = exists(
+                select(1)
+                .select_from(ti_table_reflected)
+                .where(ti_dag_version_id_col == base_table_id_col)
+                .where(ti_start_date_col >= clean_before_timestamp)
+                .where(ti_dag_version_id_col.isnot(None))
+            )
+
+            # Exclude dag_version rows that are referenced by kept task_instance rows
+            # Negate EXISTS to get NOT EXISTS behavior
+            conditions.append(~kept_tis_exists)
+        except (KeyError, AttributeError, OperationalError, ProgrammingError) as e:
+            # If we can't add the FK constraint filter, continue without it
+            # This prevents the cleanup from failing, though it may still hit FK violations
+            logger.warning(
+                "Failed to add foreign key constraint filter for dag_version table cleanup: %s. "
+                "Continuing without the filter. Note that FK violations may still be encountered during cleanup.",
+                type(e).__name__,

Review Comment:
   The warning in the exception handler only logs the exception type, which makes diagnosing real FK-protection failures difficult (and in the current code path, `KeyError` is likely). Log the exception message and include traceback (`exc_info=True`) so operators can see why the filter could not be applied.
   ```suggestion
                   e,
                   exc_info=True,
   ```



##########
airflow-core/src/airflow/utils/db_cleanup.py:
##########
@@ -371,6 +372,49 @@ def _build_query(
             ),
         )
         conditions.append(column(max_date_col_name).is_(None))
+
+    # Special handling for dag_version: exclude rows referenced by task_instance rows that are NOT being deleted.
+    #
+    # dag_version has two dependent tables (see config_list):
+    #   - task_instance.dag_version_id: uses ondelete="RESTRICT" - will BLOCK deletion if referenced
+    #   - dag_run.created_dag_version_id: uses ondelete="SET NULL" - automatically sets to NULL on delete
+    #
+    # We only need to handle task_instance here because dag_run's FK constraint won't cause violations
+    # (the database handles it automatically by setting the FK to NULL when dag_version is deleted).
+    if table_name == "dag_version":
+        try:
+            # Reflect the task_instance table to get proper column access
+            metadata = reflect_tables(["task_instance", "dag_version"], session)
+            ti_table_reflected = metadata.tables["task_instance"]
+            dv_table_reflected = metadata.tables["dag_version"]
+            ti_dag_version_id_col = ti_table_reflected.c.dag_version_id
+            ti_start_date_col = ti_table_reflected.c.start_date
+            dv_id_col = dv_table_reflected.c.id
+
+            # Find dag_version_ids that are referenced by task_instance rows that are kept (not deleted)
+            # These are task_instance rows with start_date >= clean_before_timestamp
+            # Use EXISTS for better performance and NULL handling
+            base_table_id_col = base_table.c[dv_id_col.name]
+            kept_tis_exists = exists(
+                select(1)
+                .select_from(ti_table_reflected)
+                .where(ti_dag_version_id_col == base_table_id_col)
+                .where(ti_start_date_col >= clean_before_timestamp)
+                .where(ti_dag_version_id_col.isnot(None))
+            )
+
+            # Exclude dag_version rows that are referenced by kept task_instance rows
+            # Negate EXISTS to get NOT EXISTS behavior
+            conditions.append(~kept_tis_exists)
+        except (KeyError, AttributeError, OperationalError, ProgrammingError) as e:
+            # If we can't add the FK constraint filter, continue without it
+            # This prevents the cleanup from failing, though it may still hit FK violations

Review Comment:
   In the dag_version special-case, `base_table_id_col = base_table.c[dv_id_col.name]` will raise `KeyError` because `_TableConfig` for `dag_version` does not include the `id` column (it only declares `dag_id` + `created_at`). Since `KeyError` is caught, the FK-protection filter is silently skipped, so the fix won’t take effect. Add `id` to the dag_version config’s `extra_columns` (or build `base_table` from the reflected dag_version table when `table_name == "dag_version"`) so the correlated EXISTS can reference the PK column reliably.
   ```suggestion
            base_table_id_col = base_table.c.get(dv_id_col.name)
            if base_table_id_col is None:
                # If the base_table does not expose the PK column (e.g. dag_version.id is missing
                # from its configuration), we cannot safely build the FK-protection filter here.
                # Log explicitly and continue without adding the EXISTS condition.
                logger.warning(
                    "dag_version cleanup: base table has no '%s' column; "
                    "skipping foreign key protection filter against task_instance.dag_version_id.",
                    dv_id_col.name,
                )
            else:
                kept_tis_exists = exists(
                    select(1)
                    .select_from(ti_table_reflected)
                    .where(ti_dag_version_id_col == base_table_id_col)
                    .where(ti_start_date_col >= clean_before_timestamp)
                    .where(ti_dag_version_id_col.isnot(None))
                )

                # Exclude dag_version rows that are referenced by kept task_instance rows
                # Negate EXISTS to get NOT EXISTS behavior
                conditions.append(~kept_tis_exists)
        except (OperationalError, ProgrammingError) as e:
            # If we can't add the FK constraint filter due to database/SQL issues, continue without it.
            # This prevents the cleanup from failing, though it may still hit FK violations.
   ```



##########
airflow-core/tests/unit/utils/test_db_cleanup.py:
##########
@@ -741,6 +741,318 @@ def test_drop_archived_tables(self, mock_input, confirm_mock, inspect_mock, capl
         else:
             confirm_mock.assert_not_called()
 
+    def test_dag_version_cleanup_respects_task_instance_fk_constraint(self):
+        """
+        Verify that dag_version cleanup does not delete dag_version rows that are
+        referenced by task_instance rows that are being kept (not deleted).
+
+        This tests the FK constraint handling added to prevent:
+        - task_instance.dag_version_id uses ondelete="RESTRICT", so deleting a
+          dag_version that's still referenced by a kept task_instance would fail.
+
+        The test creates:
+        - 2 dag_versions (old and new)
+        - 2 task_instances: one old (to be deleted) and one new (to be kept)
+        - Both TIs reference different dag_versions
+
+        When cleaning up with a timestamp that keeps the new TI, the dag_version
+        referenced by the kept TI should NOT be deleted, even if it's old.
+        """
+        base_date = pendulum.DateTime(2022, 1, 1, tzinfo=pendulum.timezone("UTC"))
+
+        with create_session() as session:
+            bundle_name = "testing"
+            session.add(DagBundleModel(name=bundle_name))
+            session.flush()
+
+            dag_id = "test-dag-fk-constraint"
+            dag = DAG(dag_id=dag_id)
+            dm = DagModel(dag_id=dag_id, bundle_name=bundle_name)
+            session.add(dm)
+            SerializedDagModel.write_dag(LazyDeserializedDAG.from_dag(dag), bundle_name=bundle_name)
+            dag_version = DagVersion.get_latest_version(dag.dag_id)
+
+            # Create an old dag_run and task_instance (to be deleted)
+            old_start_date = base_date
+            old_dag_run = DagRun(
+                dag.dag_id,
+                run_id="old_run",
+                run_type=DagRunType.MANUAL,
+                start_date=old_start_date,
+            )
+            old_ti = create_task_instance(
+                PythonOperator(task_id="old-task", python_callable=print),
+                run_id=old_dag_run.run_id,
+                dag_version_id=dag_version.id,
+            )
+            old_ti.dag_id = dag.dag_id
+            old_ti.start_date = old_start_date
+            session.add(old_dag_run)
+            session.add(old_ti)
+
+            # Create a new dag_run and task_instance (to be kept)
+            new_start_date = base_date.add(days=10)
+            new_dag_run = DagRun(
+                dag.dag_id,
+                run_id="new_run",
+                run_type=DagRunType.MANUAL,
+                start_date=new_start_date,
+            )
+            new_ti = create_task_instance(
+                PythonOperator(task_id="new-task", python_callable=print),
+                run_id=new_dag_run.run_id,
+                dag_version_id=dag_version.id,

Review Comment:
   The docstring says the two task_instances reference different dag_versions, but both `create_task_instance(...)` calls pass `dag_version_id=dag_version.id`. If the intent is to model the FK-violation scenario, the kept task_instance should reference an *older* dag_version that would otherwise be deleted (i.e., not the keep_last version).
   ```suggestion
            # Use an older dag_version for the kept task_instance to model the FK-violation scenario.
            older_dag_version = session.scalars(
                select(DagVersion)
                .where(DagVersion.dag_id == dag_id)
                .order_by(DagVersion.version_number)
            ).first()
            new_ti = create_task_instance(
                PythonOperator(task_id="new-task", python_callable=print),
                run_id=new_dag_run.run_id,
                dag_version_id=older_dag_version.id if older_dag_version is not None else dag_version.id,
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
