Copilot commented on code in PR #60390:
URL: https://github.com/apache/airflow/pull/60390#discussion_r2733030893
##########
airflow-core/src/airflow/utils/db_cleanup.py:
##########
@@ -371,6 +372,49 @@ def _build_query(
),
)
conditions.append(column(max_date_col_name).is_(None))
+
+    # Special handling for dag_version: exclude rows referenced by task_instance rows that are NOT being deleted.
+ #
+ # dag_version has two dependent tables (see config_list):
+    # - task_instance.dag_version_id: uses ondelete="RESTRICT" - will BLOCK deletion if referenced
+    # - dag_run.created_dag_version_id: uses ondelete="SET NULL" - automatically sets to NULL on delete
+ #
+    # We only need to handle task_instance here because dag_run's FK constraint won't cause violations
+    # (the database handles it automatically by setting the FK to NULL when dag_version is deleted).
+ if table_name == "dag_version":
+ try:
+ # Reflect the task_instance table to get proper column access
+            metadata = reflect_tables(["task_instance", "dag_version"], session)
+ ti_table_reflected = metadata.tables["task_instance"]
+ dv_table_reflected = metadata.tables["dag_version"]
+ ti_dag_version_id_col = ti_table_reflected.c.dag_version_id
+ ti_start_date_col = ti_table_reflected.c.start_date
+ dv_id_col = dv_table_reflected.c.id
+
+            # Find dag_version_ids that are referenced by task_instance rows that are kept (not deleted)
+            # These are task_instance rows with start_date >= clean_before_timestamp
+ # Use EXISTS for better performance and NULL handling
+ base_table_id_col = base_table.c[dv_id_col.name]
+ kept_tis_exists = exists(
+ select(1)
+ .select_from(ti_table_reflected)
+ .where(ti_dag_version_id_col == base_table_id_col)
+ .where(ti_start_date_col >= clean_before_timestamp)
+ .where(ti_dag_version_id_col.isnot(None))
+ )
Review Comment:
The FK-protection EXISTS only treats task_instance rows with `start_date >=
clean_before_timestamp` as “kept”. But `task_instance.start_date` is nullable,
and rows with `start_date IS NULL` are also kept by the cleanup query (`NULL <
timestamp` is not true). Those NULL-start_date task_instances can still
reference dag_version and will trigger FK violations unless they’re included in
the EXISTS predicate (e.g., treat `start_date IS NULL` as kept, or mirror the
exact task_instance deletion criteria).
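A minimal sketch of the NULL-safe predicate this describes, reusing the names from the diff above (`or_` would additionally need to be imported from sqlalchemy):
```python
from sqlalchemy import exists, or_, select

# Mirror the task_instance deletion criteria: rows with start_date IS NULL or
# start_date >= clean_before_timestamp survive cleanup, so both cases must
# protect the referenced dag_version. The equality join already excludes NULL FKs.
kept_tis_exists = exists(
    select(1)
    .select_from(ti_table_reflected)
    .where(ti_dag_version_id_col == base_table_id_col)
    .where(
        or_(
            ti_start_date_col.is_(None),
            ti_start_date_col >= clean_before_timestamp,
        )
    )
)
conditions.append(~kept_tis_exists)
```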
##########
airflow-core/tests/unit/utils/test_db_cleanup.py:
##########
@@ -741,6 +741,318 @@ def test_drop_archived_tables(self, mock_input, confirm_mock, inspect_mock, capl
else:
confirm_mock.assert_not_called()
+ def test_dag_version_cleanup_respects_task_instance_fk_constraint(self):
+ """
+        Verify that dag_version cleanup does not delete dag_version rows that are
+ referenced by task_instance rows that are being kept (not deleted).
+
+ This tests the FK constraint handling added to prevent:
+ - task_instance.dag_version_id uses ondelete="RESTRICT", so deleting a
+          dag_version that's still referenced by a kept task_instance would fail.
+
+ The test creates:
+ - 2 dag_versions (old and new)
+ - 2 task_instances: one old (to be deleted) and one new (to be kept)
+ - Both TIs reference different dag_versions
+
+        When cleaning up with a timestamp that keeps the new TI, the dag_version
+ referenced by the kept TI should NOT be deleted, even if it's old.
+ """
+        base_date = pendulum.DateTime(2022, 1, 1, tzinfo=pendulum.timezone("UTC"))
+
+ with create_session() as session:
+ bundle_name = "testing"
+ session.add(DagBundleModel(name=bundle_name))
+ session.flush()
+
+ dag_id = "test-dag-fk-constraint"
+ dag = DAG(dag_id=dag_id)
+ dm = DagModel(dag_id=dag_id, bundle_name=bundle_name)
+ session.add(dm)
+            SerializedDagModel.write_dag(LazyDeserializedDAG.from_dag(dag), bundle_name=bundle_name)
+ dag_version = DagVersion.get_latest_version(dag.dag_id)
+
Review Comment:
This test doesn’t currently exercise the FK-protection behavior: it only
creates a single dag_version (`DagVersion.get_latest_version(...)`), and
`dag_version` cleanup is configured with `keep_last=True`, so the lone (latest)
version won’t be selected for deletion regardless of whether any task_instance
references it. To validate the fix, create at least two dag_versions for the
dag, make the older one eligible by `created_at < clean_before_date` and not
the latest, and have a kept task_instance reference that older version.
##########
airflow-core/tests/unit/utils/test_db_cleanup.py:
##########
@@ -741,6 +741,318 @@ def test_drop_archived_tables(self, mock_input, confirm_mock, inspect_mock, capl
else:
confirm_mock.assert_not_called()
+ def test_dag_version_cleanup_respects_task_instance_fk_constraint(self):
+ """
+        Verify that dag_version cleanup does not delete dag_version rows that are
+ referenced by task_instance rows that are being kept (not deleted).
+
+ This tests the FK constraint handling added to prevent:
+ - task_instance.dag_version_id uses ondelete="RESTRICT", so deleting a
+          dag_version that's still referenced by a kept task_instance would fail.
+
+ The test creates:
+ - 2 dag_versions (old and new)
+ - 2 task_instances: one old (to be deleted) and one new (to be kept)
+ - Both TIs reference different dag_versions
+
+        When cleaning up with a timestamp that keeps the new TI, the dag_version
+ referenced by the kept TI should NOT be deleted, even if it's old.
+ """
+        base_date = pendulum.DateTime(2022, 1, 1, tzinfo=pendulum.timezone("UTC"))
+
+ with create_session() as session:
+ bundle_name = "testing"
+ session.add(DagBundleModel(name=bundle_name))
+ session.flush()
+
+ dag_id = "test-dag-fk-constraint"
+ dag = DAG(dag_id=dag_id)
+ dm = DagModel(dag_id=dag_id, bundle_name=bundle_name)
+ session.add(dm)
+            SerializedDagModel.write_dag(LazyDeserializedDAG.from_dag(dag), bundle_name=bundle_name)
+ dag_version = DagVersion.get_latest_version(dag.dag_id)
+
+ # Create an old dag_run and task_instance (to be deleted)
+ old_start_date = base_date
+ old_dag_run = DagRun(
+ dag.dag_id,
+ run_id="old_run",
+ run_type=DagRunType.MANUAL,
+ start_date=old_start_date,
+ )
+ old_ti = create_task_instance(
+ PythonOperator(task_id="old-task", python_callable=print),
+ run_id=old_dag_run.run_id,
+ dag_version_id=dag_version.id,
+ )
+ old_ti.dag_id = dag.dag_id
+ old_ti.start_date = old_start_date
+ session.add(old_dag_run)
+ session.add(old_ti)
+
+ # Create a new dag_run and task_instance (to be kept)
+ new_start_date = base_date.add(days=10)
+ new_dag_run = DagRun(
+ dag.dag_id,
+ run_id="new_run",
+ run_type=DagRunType.MANUAL,
+ start_date=new_start_date,
+ )
+ new_ti = create_task_instance(
+ PythonOperator(task_id="new-task", python_callable=print),
+ run_id=new_dag_run.run_id,
+ dag_version_id=dag_version.id,
+ )
+ new_ti.dag_id = dag.dag_id
+ new_ti.start_date = new_start_date
+ session.add(new_dag_run)
+ session.add(new_ti)
+ session.commit()
+
+            # Clean before timestamp that would keep the new TI but delete the old one
+ clean_before_date = base_date.add(days=5)
+
+ # Count dag_versions before cleanup
+ dag_versions_before = session.scalar(
+                select(func.count()).select_from(DagVersion).where(DagVersion.dag_id == dag_id)
+ )
+ assert dag_versions_before == 1
+
+ # Run cleanup on dag_version table
+            # The dag_version is old (created_at < clean_before_date), but it's referenced
+            # by a kept task_instance (new_ti with start_date >= clean_before_date)
+ # So it should NOT be deleted due to the FK constraint handling
+ query = _build_query(
+ **config_dict["dag_version"].__dict__,
+ clean_before_timestamp=clean_before_date,
+ session=session,
+ )
+
+ # The query should return 0 rows to delete because the dag_version
+ # is referenced by a kept task_instance
+            rows_to_delete = session.scalar(select(func.count()).select_from(query.subquery()))
+ assert rows_to_delete == 0, (
+ f"Expected 0 dag_version rows to be marked for deletion, got
{rows_to_delete}. "
+ "The dag_version should be protected because it's referenced
by a kept task_instance."
+ )
+
+ def test_dag_version_cleanup_deletes_unreferenced_rows(self):
+ """
+        Verify that dag_version rows that are NOT referenced by any task_instance
+ are properly deleted during cleanup.
+
+        This ensures the FK constraint handling doesn't over-protect dag_version rows
+ that have no task_instance references at all.
+
+        Note: dag_version has keep_last=True, so the most recent dag_version per dag_id
+        is always kept. We need at least 2 dag_versions for a dag to test deletion.
+ """
+        base_date = pendulum.DateTime(2022, 1, 1, tzinfo=pendulum.timezone("UTC"))
+
+ with create_session() as session:
+ bundle_name = "testing"
+ session.add(DagBundleModel(name=bundle_name))
+ session.flush()
+
+ # Create a dag with TWO dag_versions (neither has TI references)
+ # The older one should be deleted, the newer one kept by keep_last
+ dag_id = "test-dag-no-ti"
+ dm = DagModel(dag_id=dag_id, bundle_name=bundle_name)
+ session.add(dm)
+ session.flush()
+
+            # Create first (old) dag_version using DagVersion.write_dag directly
+            old_dag_version = DagVersion.write_dag(dag_id=dag_id, bundle_name=bundle_name, session=session)
+ old_dag_version.created_at = base_date # Old date
+ session.flush()
+
+ # Create second (new) dag_version
+            new_dag_version = DagVersion.write_dag(dag_id=dag_id, bundle_name=bundle_name, session=session)
+ new_dag_version.created_at = base_date.add(days=10) # New date
+ session.commit()
+
+ # Verify we have 2 dag_versions
+ dag_version_count = session.scalar(
+                select(func.count()).select_from(DagVersion).where(DagVersion.dag_id == dag_id)
+ )
+            assert dag_version_count == 2, f"Expected 2 dag_versions, got {dag_version_count}"
+
+ # Clean before timestamp that would delete the old dag_version
+ clean_before_date = base_date.add(days=5)
+
+ # Build query for dag_version cleanup
+ query = _build_query(
+ **config_dict["dag_version"].__dict__,
+ clean_before_timestamp=clean_before_date,
+ session=session,
+ )
+
+ # The old unreferenced dag_version should be marked for deletion
+ # (the new one is kept by keep_last)
+            rows_to_delete = session.scalar(select(func.count()).select_from(query.subquery()))
+ assert rows_to_delete == 1, (
+ f"Expected 1 dag_version row to be marked for deletion, got
{rows_to_delete}. "
+ "The old unreferenced dag_version should be deleted."
+ )
+
+ def test_dag_version_cleanup_deletes_rows_only_referenced_by_old_ti(self):
+ """
+        Verify that dag_version rows that are only referenced by OLD task_instance
+ rows (that will be deleted) can be properly deleted.
+
+ After task_instance cleanup removes old TIs, the dag_version rows they
+ referenced should become eligible for deletion.
+
+        Note: dag_version has keep_last=True, so we need 2 dag_versions. The older
+        one (referenced only by old TI) should be deleted, the newer one is kept.
+ """
+        base_date = pendulum.DateTime(2022, 1, 1, tzinfo=pendulum.timezone("UTC"))
+
+ with create_session() as session:
+ bundle_name = "testing"
+ session.add(DagBundleModel(name=bundle_name))
+ session.flush()
+
+ dag_id = "test-dag-old-ti-only"
+ dag = DAG(dag_id=dag_id)
+ dm = DagModel(dag_id=dag_id, bundle_name=bundle_name)
+ session.add(dm)
+ session.flush()
+
+            # Create first (old) dag_version using DagVersion.write_dag directly
+            old_dag_version = DagVersion.write_dag(dag_id=dag_id, bundle_name=bundle_name, session=session)
+ old_dag_version.created_at = base_date # Old date
+ session.flush()
+
+ # Create ONLY an old task_instance referencing the OLD dag_version
+ old_start_date = base_date
+ old_dag_run = DagRun(
+ dag.dag_id,
+ run_id="old_run",
+ run_type=DagRunType.MANUAL,
+ start_date=old_start_date,
+ )
+ old_ti = create_task_instance(
+ PythonOperator(task_id="old-task", python_callable=print),
+ run_id=old_dag_run.run_id,
+ dag_version_id=old_dag_version.id,
+ )
+ old_ti.dag_id = dag.dag_id
+ old_ti.start_date = old_start_date
+ session.add(old_dag_run)
+ session.add(old_ti)
+
+ # Create second (new) dag_version - no TI references this one
+            new_dag_version = DagVersion.write_dag(dag_id=dag_id, bundle_name=bundle_name, session=session)
+            new_dag_version.created_at = base_date.add(days=10)  # New date, kept by keep_last
+ session.commit()
+
+ # Verify we have 2 dag_versions
+ dag_version_count = session.scalar(
+                select(func.count()).select_from(DagVersion).where(DagVersion.dag_id == dag_id)
+ )
+            assert dag_version_count == 2, f"Expected 2 dag_versions, got {dag_version_count}"
+
+ # Clean before timestamp that would delete the old TI
+ clean_before_date = base_date.add(days=5)
+
+ # Build query for dag_version cleanup
+ query = _build_query(
+ **config_dict["dag_version"].__dict__,
+ clean_before_timestamp=clean_before_date,
+ session=session,
+ )
+
+ # The OLD dag_version should be marked for deletion since:
+ # 1. Its created_at is old (base_date < clean_before_date)
+            # 2. The only task_instance referencing it is old (will also be deleted)
+ # 3. It's not the latest dag_version (keep_last keeps the new one)
+            rows_to_delete = session.scalar(select(func.count()).select_from(query.subquery()))
+ assert rows_to_delete == 1, (
+ f"Expected 1 dag_version row to be marked for deletion, got
{rows_to_delete}. "
+ "The dag_version referenced only by old TIs should be deleted."
+ )
+
+ def test_dag_version_cleanup_no_fk_violation_integration(self):
+ """
+ Integration test to verify that the full cleanup process doesn't fail
+ with FK violations when task_instance and dag_version have different
+ recency timestamps.
+
+ This tests the actual run_cleanup flow, not just the query building.
+ """
+        base_date = pendulum.DateTime(2022, 1, 1, tzinfo=pendulum.timezone("UTC"))
+
+ with create_session() as session:
+ bundle_name = "testing"
+ session.add(DagBundleModel(name=bundle_name))
+ session.flush()
+
+ dag_id = "test-dag-fk-integration"
+ dag = DAG(dag_id=dag_id)
+ dm = DagModel(dag_id=dag_id, bundle_name=bundle_name)
+ session.add(dm)
+            SerializedDagModel.write_dag(LazyDeserializedDAG.from_dag(dag), bundle_name=bundle_name)
+ dag_version = DagVersion.get_latest_version(dag.dag_id)
+            # Set dag_version created_at to old date (eligible for cleanup by timestamp)
+ dag_version.created_at = base_date
+
Review Comment:
This integration test also only creates a single dag_version for the dag.
Since dag_version cleanup uses `keep_last=True`, that latest version will be
kept even when `created_at` is old, so the cleanup won’t attempt the delete
that would trigger the FK violation. Consider creating two dag_versions and
making the older one eligible for deletion + referenced by a kept
task_instance, then assert the older one remains and no IntegrityError is
raised.
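A sketch of what that end-to-end assertion could look like once two versions exist (assuming `run_cleanup` is imported from `airflow.utils.db_cleanup`; `old_dag_version` refers to the older, FK-referenced version from the suggested setup):
```python
# Must not raise IntegrityError: the old version is timestamp-eligible but
# still referenced by a kept task_instance, so cleanup has to skip it.
run_cleanup(
    clean_before_timestamp=clean_before_date,
    table_names=["dag_version"],
    dry_run=False,
    confirm=False,
    session=session,
)
remaining_ids = set(session.scalars(select(DagVersion.id).where(DagVersion.dag_id == dag_id)))
assert old_dag_version.id in remaining_ids, "FK-protected dag_version must survive cleanup"
```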
##########
airflow-core/src/airflow/utils/db_cleanup.py:
##########
@@ -371,6 +372,49 @@ def _build_query(
),
)
conditions.append(column(max_date_col_name).is_(None))
+
+    # Special handling for dag_version: exclude rows referenced by task_instance rows that are NOT being deleted.
+ #
+ # dag_version has two dependent tables (see config_list):
+    # - task_instance.dag_version_id: uses ondelete="RESTRICT" - will BLOCK deletion if referenced
+    # - dag_run.created_dag_version_id: uses ondelete="SET NULL" - automatically sets to NULL on delete
+ #
+    # We only need to handle task_instance here because dag_run's FK constraint won't cause violations
+    # (the database handles it automatically by setting the FK to NULL when dag_version is deleted).
+ if table_name == "dag_version":
+ try:
+ # Reflect the task_instance table to get proper column access
+            metadata = reflect_tables(["task_instance", "dag_version"], session)
+ ti_table_reflected = metadata.tables["task_instance"]
+ dv_table_reflected = metadata.tables["dag_version"]
+ ti_dag_version_id_col = ti_table_reflected.c.dag_version_id
+ ti_start_date_col = ti_table_reflected.c.start_date
+ dv_id_col = dv_table_reflected.c.id
+
+            # Find dag_version_ids that are referenced by task_instance rows that are kept (not deleted)
+            # These are task_instance rows with start_date >= clean_before_timestamp
+ # Use EXISTS for better performance and NULL handling
+ base_table_id_col = base_table.c[dv_id_col.name]
+ kept_tis_exists = exists(
+ select(1)
+ .select_from(ti_table_reflected)
+ .where(ti_dag_version_id_col == base_table_id_col)
+ .where(ti_start_date_col >= clean_before_timestamp)
+ .where(ti_dag_version_id_col.isnot(None))
+ )
+
+            # Exclude dag_version rows that are referenced by kept task_instance rows
+ # Negate EXISTS to get NOT EXISTS behavior
+ conditions.append(~kept_tis_exists)
+        except (KeyError, AttributeError, OperationalError, ProgrammingError) as e:
+ # If we can't add the FK constraint filter, continue without it
+            # This prevents the cleanup from failing, though it may still hit FK violations
+ logger.warning(
+ "Failed to add foreign key constraint filter for dag_version
table cleanup: %s. "
+ "Continuing without the filter. Note that FK violations may
still be encountered during cleanup.",
+ type(e).__name__,
Review Comment:
The warning in the exception handler only logs the exception type, which
makes diagnosing real FK-protection failures difficult (and in the current code
path, `KeyError` is likely). Log the exception message and include traceback
(`exc_info=True`) so operators can see why the filter could not be applied.
```suggestion
                e,
                exc_info=True,
```
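With the suggestion applied, the handler would read roughly as follows (a sketch; message text taken verbatim from the diff):
```python
logger.warning(
    "Failed to add foreign key constraint filter for dag_version table cleanup: %s. "
    "Continuing without the filter. Note that FK violations may still be encountered during cleanup.",
    e,
    exc_info=True,  # attach the traceback so operators can see why the filter was skipped
)
```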
##########
airflow-core/src/airflow/utils/db_cleanup.py:
##########
@@ -371,6 +372,49 @@ def _build_query(
),
)
conditions.append(column(max_date_col_name).is_(None))
+
+    # Special handling for dag_version: exclude rows referenced by task_instance rows that are NOT being deleted.
+ #
+ # dag_version has two dependent tables (see config_list):
+    # - task_instance.dag_version_id: uses ondelete="RESTRICT" - will BLOCK deletion if referenced
+    # - dag_run.created_dag_version_id: uses ondelete="SET NULL" - automatically sets to NULL on delete
+ #
+    # We only need to handle task_instance here because dag_run's FK constraint won't cause violations
+    # (the database handles it automatically by setting the FK to NULL when dag_version is deleted).
+ if table_name == "dag_version":
+ try:
+ # Reflect the task_instance table to get proper column access
+            metadata = reflect_tables(["task_instance", "dag_version"], session)
+ ti_table_reflected = metadata.tables["task_instance"]
+ dv_table_reflected = metadata.tables["dag_version"]
+ ti_dag_version_id_col = ti_table_reflected.c.dag_version_id
+ ti_start_date_col = ti_table_reflected.c.start_date
+ dv_id_col = dv_table_reflected.c.id
+
+            # Find dag_version_ids that are referenced by task_instance rows that are kept (not deleted)
+            # These are task_instance rows with start_date >= clean_before_timestamp
+ # Use EXISTS for better performance and NULL handling
+ base_table_id_col = base_table.c[dv_id_col.name]
+ kept_tis_exists = exists(
+ select(1)
+ .select_from(ti_table_reflected)
+ .where(ti_dag_version_id_col == base_table_id_col)
+ .where(ti_start_date_col >= clean_before_timestamp)
+ .where(ti_dag_version_id_col.isnot(None))
+ )
+
+            # Exclude dag_version rows that are referenced by kept task_instance rows
+ # Negate EXISTS to get NOT EXISTS behavior
+ conditions.append(~kept_tis_exists)
+        except (KeyError, AttributeError, OperationalError, ProgrammingError) as e:
+ # If we can't add the FK constraint filter, continue without it
+            # This prevents the cleanup from failing, though it may still hit FK violations
Review Comment:
In the dag_version special-case, `base_table_id_col =
base_table.c[dv_id_col.name]` will raise `KeyError` because `_TableConfig` for
`dag_version` does not include the `id` column (it only declares `dag_id` +
`created_at`). Since `KeyError` is caught, the FK-protection filter is silently
skipped, so the fix won’t take effect. Add `id` to the dag_version config’s
`extra_columns` (or build `base_table` from the reflected dag_version table
when `table_name == "dag_version"`) so the correlated EXISTS can reference the
PK column reliably.
```suggestion
            base_table_id_col = base_table.c.get(dv_id_col.name)
            if base_table_id_col is None:
                # If the base_table does not expose the PK column (e.g. dag_version.id is missing
                # from its configuration), we cannot safely build the FK-protection filter here.
                # Log explicitly and continue without adding the EXISTS condition.
                logger.warning(
                    "dag_version cleanup: base table has no '%s' column; "
                    "skipping foreign key protection filter against task_instance.dag_version_id.",
                    dv_id_col.name,
                )
            else:
                kept_tis_exists = exists(
                    select(1)
                    .select_from(ti_table_reflected)
                    .where(ti_dag_version_id_col == base_table_id_col)
                    .where(ti_start_date_col >= clean_before_timestamp)
                    .where(ti_dag_version_id_col.isnot(None))
                )
                # Exclude dag_version rows that are referenced by kept task_instance rows
                # Negate EXISTS to get NOT EXISTS behavior
                conditions.append(~kept_tis_exists)
        except (OperationalError, ProgrammingError) as e:
            # If we can't add the FK constraint filter due to database/SQL issues, continue without it.
            # This prevents the cleanup from failing, though it may still hit FK violations.
```
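For the config-side alternative, the dag_version entry in config_list might gain the PK like this (a sketch only; the exact `_TableConfig` fields should be checked against the source, since the entry is described above as declaring only `dag_id` and `created_at`):
```python
_TableConfig(
    table_name="dag_version",
    recency_column_name="created_at",
    extra_columns=["dag_id", "id"],  # expose the PK so the correlated EXISTS can reference it
    keep_last=True,
    keep_last_group_by=["dag_id"],
),
```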
##########
airflow-core/tests/unit/utils/test_db_cleanup.py:
##########
@@ -741,6 +741,318 @@ def test_drop_archived_tables(self, mock_input, confirm_mock, inspect_mock, capl
else:
confirm_mock.assert_not_called()
+ def test_dag_version_cleanup_respects_task_instance_fk_constraint(self):
+ """
+        Verify that dag_version cleanup does not delete dag_version rows that are
+ referenced by task_instance rows that are being kept (not deleted).
+
+ This tests the FK constraint handling added to prevent:
+ - task_instance.dag_version_id uses ondelete="RESTRICT", so deleting a
+          dag_version that's still referenced by a kept task_instance would fail.
+
+ The test creates:
+ - 2 dag_versions (old and new)
+ - 2 task_instances: one old (to be deleted) and one new (to be kept)
+ - Both TIs reference different dag_versions
+
+        When cleaning up with a timestamp that keeps the new TI, the dag_version
+ referenced by the kept TI should NOT be deleted, even if it's old.
+ """
+        base_date = pendulum.DateTime(2022, 1, 1, tzinfo=pendulum.timezone("UTC"))
+
+ with create_session() as session:
+ bundle_name = "testing"
+ session.add(DagBundleModel(name=bundle_name))
+ session.flush()
+
+ dag_id = "test-dag-fk-constraint"
+ dag = DAG(dag_id=dag_id)
+ dm = DagModel(dag_id=dag_id, bundle_name=bundle_name)
+ session.add(dm)
+            SerializedDagModel.write_dag(LazyDeserializedDAG.from_dag(dag), bundle_name=bundle_name)
+ dag_version = DagVersion.get_latest_version(dag.dag_id)
+
+ # Create an old dag_run and task_instance (to be deleted)
+ old_start_date = base_date
+ old_dag_run = DagRun(
+ dag.dag_id,
+ run_id="old_run",
+ run_type=DagRunType.MANUAL,
+ start_date=old_start_date,
+ )
+ old_ti = create_task_instance(
+ PythonOperator(task_id="old-task", python_callable=print),
+ run_id=old_dag_run.run_id,
+ dag_version_id=dag_version.id,
+ )
+ old_ti.dag_id = dag.dag_id
+ old_ti.start_date = old_start_date
+ session.add(old_dag_run)
+ session.add(old_ti)
+
+ # Create a new dag_run and task_instance (to be kept)
+ new_start_date = base_date.add(days=10)
+ new_dag_run = DagRun(
+ dag.dag_id,
+ run_id="new_run",
+ run_type=DagRunType.MANUAL,
+ start_date=new_start_date,
+ )
+ new_ti = create_task_instance(
+ PythonOperator(task_id="new-task", python_callable=print),
+ run_id=new_dag_run.run_id,
+ dag_version_id=dag_version.id,
Review Comment:
The docstring says the two task_instances reference different dag_versions,
but both `create_task_instance(...)` calls pass
`dag_version_id=dag_version.id`. If the intent is to model the FK-violation
scenario, the kept task_instance should reference an *older* dag_version that
would otherwise be deleted (i.e., not the keep_last version).
```suggestion
            # Use an older dag_version for the kept task_instance to model the FK-violation scenario.
            older_dag_version = session.scalars(
                select(DagVersion)
                .where(DagVersion.dag_id == dag_id)
                .order_by(DagVersion.version_number)
            ).first()
            new_ti = create_task_instance(
                PythonOperator(task_id="new-task", python_callable=print),
                run_id=new_dag_run.run_id,
                dag_version_id=older_dag_version.id if older_dag_version is not None else dag_version.id,
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]