Copilot commented on code in PR #60390:
URL: https://github.com/apache/airflow/pull/60390#discussion_r2692143845
##########
airflow-core/src/airflow/utils/db_cleanup.py:
##########
@@ -365,6 +366,39 @@ def _build_query(
                ),
            )
            conditions.append(column(max_date_col_name).is_(None))
+
+    # Special handling for dag_version: exclude rows referenced by task_instance rows that are NOT being deleted
+    if table_name == "dag_version":
+        try:
+            from airflow.utils.db import reflect_tables
+
+            # Reflect the task_instance table to get proper column access
+            metadata = reflect_tables(["task_instance", "dag_version"], session)
+            ti_table_reflected = metadata.tables["task_instance"]
+            dv_table_reflected = metadata.tables["dag_version"]
+            ti_dag_version_id_col = ti_table_reflected.c.dag_version_id
+            ti_start_date_col = ti_table_reflected.c.start_date
+            dv_id_col = dv_table_reflected.c.id
+
+            # Find dag_version_ids that are referenced by task_instance rows that are kept (not deleted)
+            # These are task_instance rows with start_date >= clean_before_timestamp
+            kept_ti_subquery = (
+                select(ti_dag_version_id_col)
+                .select_from(ti_table_reflected)
+                .where(ti_start_date_col >= clean_before_timestamp)
+                .where(ti_dag_version_id_col.isnot(None))
+                .distinct()
+            )
+
+            # Exclude dag_version rows that are referenced by kept task_instance rows
+            # Use the reflected table's id column and join it with base_table
+            base_table_id_col = base_table.c[dv_id_col.name]
+            conditions.append(base_table_id_col.not_in(kept_ti_subquery))
Review Comment:
According to the dag_version configuration in config_list (line 167),
dag_version has dependent_tables including both "task_instance" and "dag_run".
While dag_run.created_dag_version_id has ondelete="set null" (which won't cause
FK violations), the current implementation only handles the task_instance
constraint. Consider whether dag_run also needs to be checked, or add a comment
explaining why only task_instance requires special handling (i.e., because
dag_run uses ondelete="set null").
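The distinction can be sketched outside Airflow with a tiny sqlite3 session. The table and column names mirror the Airflow schema, but this is an illustrative sketch, not the real DDL:

```python
# Sketch (not Airflow code): why an "ON DELETE SET NULL" FK, like
# dag_run.created_dag_version_id, cannot raise an FK violation when the
# parent row is deleted, while a plain FK, like task_instance.dag_version_id, can.
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("PRAGMA foreign_keys = ON")
conn.execute("CREATE TABLE dag_version (id INTEGER PRIMARY KEY)")
conn.execute(
    "CREATE TABLE dag_run (id INTEGER PRIMARY KEY,"
    " created_dag_version_id INTEGER REFERENCES dag_version(id) ON DELETE SET NULL)"
)
conn.execute(
    "CREATE TABLE task_instance (id INTEGER PRIMARY KEY,"
    " dag_version_id INTEGER REFERENCES dag_version(id))"
)
conn.execute("INSERT INTO dag_version (id) VALUES (1)")
conn.execute("INSERT INTO dag_run VALUES (1, 1)")

# Deleting the referenced dag_version is fine: dag_run's FK is nulled out.
conn.execute("DELETE FROM dag_version WHERE id = 1")
assert conn.execute("SELECT created_dag_version_id FROM dag_run").fetchone() == (None,)

# With a plain FK, the same delete raises an IntegrityError instead, which is
# why only the task_instance reference needs the special filter.
conn.execute("INSERT INTO dag_version (id) VALUES (2)")
conn.execute("INSERT INTO task_instance VALUES (1, 2)")
try:
    conn.execute("DELETE FROM dag_version WHERE id = 2")
    raised = False
except sqlite3.IntegrityError:
    raised = True
assert raised
```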
##########
scripts/ci/prek/check_secrets_search_path_sync.py:
##########
@@ -42,9 +42,9 @@ def extract_from_file(file_path: Path, constant_name: str) -> list[str] | None:
            if isinstance(node.value, ast.List):
                values = []
                for elt in node.value.elts:
-                    if isinstance(elt, ast.Constant):
+                    if isinstance(elt, ast.Constant) and isinstance(elt.value, str):
                        values.append(elt.value)
-                return values
+                return values if values else None
Review Comment:
This change appears unrelated to the PR's stated purpose of fixing FK
constraint violations in airflow db clean for the dag_version table. While the
changes are valid improvements (filtering non-string constants and returning
None for empty lists), they should ideally be in a separate PR focused on
improving the check_secrets_search_path_sync script.
```suggestion
return values
```
##########
airflow-core/src/airflow/utils/db_cleanup.py:
##########
@@ -365,6 +366,39 @@ def _build_query(
                ),
            )
            conditions.append(column(max_date_col_name).is_(None))
+
+    # Special handling for dag_version: exclude rows referenced by task_instance rows that are NOT being deleted
+    if table_name == "dag_version":
+        try:
+            from airflow.utils.db import reflect_tables
+
+            # Reflect the task_instance table to get proper column access
+            metadata = reflect_tables(["task_instance", "dag_version"], session)
+            ti_table_reflected = metadata.tables["task_instance"]
+            dv_table_reflected = metadata.tables["dag_version"]
+            ti_dag_version_id_col = ti_table_reflected.c.dag_version_id
+            ti_start_date_col = ti_table_reflected.c.start_date
+            dv_id_col = dv_table_reflected.c.id
+
+            # Find dag_version_ids that are referenced by task_instance rows that are kept (not deleted)
+            # These are task_instance rows with start_date >= clean_before_timestamp
+            kept_ti_subquery = (
+                select(ti_dag_version_id_col)
+                .select_from(ti_table_reflected)
+                .where(ti_start_date_col >= clean_before_timestamp)
+                .where(ti_dag_version_id_col.isnot(None))
+                .distinct()
+            )
+
+            # Exclude dag_version rows that are referenced by kept task_instance rows
+            # Use the reflected table's id column and join it with base_table
+            base_table_id_col = base_table.c[dv_id_col.name]
+            conditions.append(base_table_id_col.not_in(kept_ti_subquery))
+        except Exception:
+            # If we can't add the FK constraint filter, continue without it
+            # This prevents the cleanup from failing, though it may still hit FK violations
+            pass
+
Review Comment:
The new dag_version foreign key constraint handling logic lacks test
coverage. Consider adding test cases that verify: 1) dag_version rows
referenced by recent task_instance rows are NOT deleted, 2) dag_version rows
not referenced by any task_instance (or only by old task_instance rows) ARE
deleted, and 3) the cleanup doesn't fail with FK violations when task_instance
and dag_version have different recency timestamps.
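A rough standalone sketch of scenarios 1 and 2 using plain sqlite3 (not the Airflow test harness; the schema, timestamps, and cutoff are invented for illustration):

```python
# Standalone sketch of what the suggested tests should pin down:
# old-but-still-referenced dag_version rows survive, unreferenced old rows
# are deleted, and the cleanup does not raise an FK violation.
import sqlite3

CUTOFF = "2024-01-01"

conn = sqlite3.connect(":memory:")
conn.execute("PRAGMA foreign_keys = ON")
conn.execute("CREATE TABLE dag_version (id INTEGER PRIMARY KEY, created_at TEXT)")
conn.execute(
    "CREATE TABLE task_instance (id INTEGER PRIMARY KEY, start_date TEXT,"
    " dag_version_id INTEGER REFERENCES dag_version(id))"
)
# Version 1: old, but referenced by a recent (kept) task_instance -> must survive.
# Version 2: old, and only referenced by an old (deleted) task_instance -> must go.
conn.executemany("INSERT INTO dag_version VALUES (?, ?)", [(1, "2023-06-01"), (2, "2023-06-01")])
conn.executemany(
    "INSERT INTO task_instance VALUES (?, ?, ?)",
    [(1, "2024-05-01", 1), (2, "2023-07-01", 2)],
)

# Cleanup order mirrors db clean: task_instance first, then dag_version
# filtered by the "kept task_instance" subquery from the diff above.
conn.execute("DELETE FROM task_instance WHERE start_date < ?", (CUTOFF,))
conn.execute(
    "DELETE FROM dag_version WHERE created_at < ? AND id NOT IN ("
    "  SELECT dag_version_id FROM task_instance"
    "  WHERE start_date >= ? AND dag_version_id IS NOT NULL)",
    (CUTOFF, CUTOFF),
)
surviving = [row[0] for row in conn.execute("SELECT id FROM dag_version ORDER BY id")]
assert surviving == [1]  # referenced version kept, unreferenced version deleted
```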
##########
airflow-core/src/airflow/utils/db_cleanup.py:
##########
@@ -365,6 +366,39 @@ def _build_query(
                ),
            )
            conditions.append(column(max_date_col_name).is_(None))
+
+    # Special handling for dag_version: exclude rows referenced by task_instance rows that are NOT being deleted
+    if table_name == "dag_version":
+        try:
+            from airflow.utils.db import reflect_tables
+
Review Comment:
The import statement for reflect_tables is redundant since it's already
imported at the top of the file (line 43). The local import inside the try
block should be removed.
```suggestion
```
##########
airflow-core/src/airflow/utils/db_cleanup.py:
##########
@@ -365,6 +366,39 @@ def _build_query(
                ),
            )
            conditions.append(column(max_date_col_name).is_(None))
+
+    # Special handling for dag_version: exclude rows referenced by task_instance rows that are NOT being deleted
+    if table_name == "dag_version":
+        try:
+            from airflow.utils.db import reflect_tables
+
+            # Reflect the task_instance table to get proper column access
+            metadata = reflect_tables(["task_instance", "dag_version"], session)
+            ti_table_reflected = metadata.tables["task_instance"]
+            dv_table_reflected = metadata.tables["dag_version"]
+            ti_dag_version_id_col = ti_table_reflected.c.dag_version_id
+            ti_start_date_col = ti_table_reflected.c.start_date
+            dv_id_col = dv_table_reflected.c.id
+
+            # Find dag_version_ids that are referenced by task_instance rows that are kept (not deleted)
+            # These are task_instance rows with start_date >= clean_before_timestamp
+            kept_ti_subquery = (
+                select(ti_dag_version_id_col)
+                .select_from(ti_table_reflected)
+                .where(ti_start_date_col >= clean_before_timestamp)
+                .where(ti_dag_version_id_col.isnot(None))
+                .distinct()
+            )
+
+            # Exclude dag_version rows that are referenced by kept task_instance rows
+            # Use the reflected table's id column and join it with base_table
+            base_table_id_col = base_table.c[dv_id_col.name]
+            conditions.append(base_table_id_col.not_in(kept_ti_subquery))
+        except Exception:
Review Comment:
The broad `except Exception` clause silently swallows every non-exiting error, including genuine programming bugs. It should be narrowed to the specific exceptions that can plausibly occur during table reflection or column access (e.g., `KeyError`, `AttributeError`, or SQLAlchemy's `OperationalError`/`ProgrammingError`, which would need importing from `sqlalchemy.exc`), so that unexpected failures still surface.
```suggestion
        except (KeyError, AttributeError, OperationalError, ProgrammingError):
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]