Copilot commented on code in PR #60390:
URL: https://github.com/apache/airflow/pull/60390#discussion_r2692143845


##########
airflow-core/src/airflow/utils/db_cleanup.py:
##########
@@ -365,6 +366,39 @@ def _build_query(
             ),
         )
         conditions.append(column(max_date_col_name).is_(None))
+
+    # Special handling for dag_version: exclude rows referenced by task_instance rows that are NOT being deleted
+    if table_name == "dag_version":
+        try:
+            from airflow.utils.db import reflect_tables
+
+            # Reflect the task_instance table to get proper column access
+            metadata = reflect_tables(["task_instance", "dag_version"], session)
+            ti_table_reflected = metadata.tables["task_instance"]
+            dv_table_reflected = metadata.tables["dag_version"]
+            ti_dag_version_id_col = ti_table_reflected.c.dag_version_id
+            ti_start_date_col = ti_table_reflected.c.start_date
+            dv_id_col = dv_table_reflected.c.id
+
+            # Find dag_version_ids that are referenced by task_instance rows that are kept (not deleted)
+            # These are task_instance rows with start_date >= clean_before_timestamp
+            kept_ti_subquery = (
+                select(ti_dag_version_id_col)
+                .select_from(ti_table_reflected)
+                .where(ti_start_date_col >= clean_before_timestamp)
+                .where(ti_dag_version_id_col.isnot(None))
+                .distinct()
+            )
+
+            # Exclude dag_version rows that are referenced by kept task_instance rows
+            # Use the reflected table's id column and join it with base_table
+            base_table_id_col = base_table.c[dv_id_col.name]
+            conditions.append(base_table_id_col.not_in(kept_ti_subquery))

Review Comment:
   According to the dag_version configuration in config_list (line 167), dag_version has dependent_tables including both "task_instance" and "dag_run". While dag_run.created_dag_version_id has ondelete="set null" (which won't cause FK violations), the current implementation only handles the task_instance constraint. Consider whether dag_run also needs to be checked, or add a comment explaining why only task_instance requires special handling (i.e., because dag_run uses ondelete="set null").
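   A minimal sketch of the clarifying comment the reviewer asks for, assuming the ondelete behaviors are exactly as this review describes them (verify against the actual FK definitions on task_instance.dag_version_id and dag_run.created_dag_version_id before committing):
   ```python
   # Of dag_version's dependent tables, only task_instance needs special
   # handling here: task_instance.dag_version_id is a plain FK, so deleting
   # a dag_version row still referenced by a surviving TI would violate it.
   # dag_run.created_dag_version_id uses ondelete="set null", so deleting
   # the dag_version row just nulls that column instead of failing.
   if table_name == "dag_version":
   ```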



##########
scripts/ci/prek/check_secrets_search_path_sync.py:
##########
@@ -42,9 +42,9 @@ def extract_from_file(file_path: Path, constant_name: str) -> list[str] | None:
                        if isinstance(node.value, ast.List):
                            values = []
                            for elt in node.value.elts:
-                                if isinstance(elt, ast.Constant):
+                                if isinstance(elt, ast.Constant) and isinstance(elt.value, str):
                                     values.append(elt.value)
-                            return values
+                            return values if values else None

Review Comment:
   This change appears unrelated to the PR's stated purpose of fixing FK constraint violations in airflow db clean for the dag_version table. While the changes are valid improvements (filtering non-string constants and returning None for empty lists), they should ideally be in a separate PR focused on improving the check_secrets_search_path_sync script.
   ```suggestion
                               return values
   ```



##########
airflow-core/src/airflow/utils/db_cleanup.py:
##########
@@ -365,6 +366,39 @@ def _build_query(
             ),
         )
         conditions.append(column(max_date_col_name).is_(None))
+
+    # Special handling for dag_version: exclude rows referenced by task_instance rows that are NOT being deleted
+    if table_name == "dag_version":
+        try:
+            from airflow.utils.db import reflect_tables
+
+            # Reflect the task_instance table to get proper column access
+            metadata = reflect_tables(["task_instance", "dag_version"], session)
+            ti_table_reflected = metadata.tables["task_instance"]
+            dv_table_reflected = metadata.tables["dag_version"]
+            ti_dag_version_id_col = ti_table_reflected.c.dag_version_id
+            ti_start_date_col = ti_table_reflected.c.start_date
+            dv_id_col = dv_table_reflected.c.id
+
+            # Find dag_version_ids that are referenced by task_instance rows that are kept (not deleted)
+            # These are task_instance rows with start_date >= clean_before_timestamp
+            kept_ti_subquery = (
+                select(ti_dag_version_id_col)
+                .select_from(ti_table_reflected)
+                .where(ti_start_date_col >= clean_before_timestamp)
+                .where(ti_dag_version_id_col.isnot(None))
+                .distinct()
+            )
+
+            # Exclude dag_version rows that are referenced by kept task_instance rows
+            # Use the reflected table's id column and join it with base_table
+            base_table_id_col = base_table.c[dv_id_col.name]
+            conditions.append(base_table_id_col.not_in(kept_ti_subquery))
+        except Exception:
+            # If we can't add the FK constraint filter, continue without it
+            # This prevents the cleanup from failing, though it may still hit FK violations
+            pass
+

Review Comment:
   The new dag_version foreign key constraint handling logic lacks test coverage. Consider adding test cases that verify: 1) dag_version rows referenced by recent task_instance rows are NOT deleted, 2) dag_version rows not referenced by any task_instance (or only by old task_instance rows) ARE deleted, and 3) the cleanup doesn't fail with FK violations when task_instance and dag_version have different recency timestamps.
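   A hedged pytest sketch of cases 1 and 2 (and, implicitly, case 3, since run_cleanup must complete without an IntegrityError). The make_ti_with_dag_version fixture is hypothetical — it stands in for whatever helper inserts one dag_version row plus one task_instance referencing it; Airflow's existing test_db_cleanup.py utilities would be the real starting point:
   ```python
   import pendulum
   import pytest

   from airflow.models.dag_version import DagVersion
   from airflow.utils.db_cleanup import run_cleanup

   CUTOFF = pendulum.datetime(2025, 1, 1, tz="UTC")


   @pytest.mark.parametrize(
       "ti_start_date, expect_kept",
       [
           (CUTOFF.add(days=1), True),        # case 1: referenced by a kept TI -> survives
           (CUTOFF.subtract(days=1), False),  # case 2: only referenced by purged TIs -> deleted
       ],
   )
   def test_dag_version_cleanup_respects_ti_fk(session, make_ti_with_dag_version, ti_start_date, expect_kept):
       # Hypothetical fixture: inserts one dag_version and one task_instance
       # pointing at it with the given start_date; returns the dag_version id.
       dv_id = make_ti_with_dag_version(start_date=ti_start_date)

       # Case 3: must not raise an FK IntegrityError even though the
       # task_instance and dag_version rows have different recency timestamps.
       run_cleanup(
           clean_before_timestamp=CUTOFF,
           table_names=["task_instance", "dag_version"],
           dry_run=False,
           confirm=False,
           session=session,
       )

       found = session.query(DagVersion).filter(DagVersion.id == dv_id).count() == 1
       assert found is expect_kept
   ```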



##########
airflow-core/src/airflow/utils/db_cleanup.py:
##########
@@ -365,6 +366,39 @@ def _build_query(
             ),
         )
         conditions.append(column(max_date_col_name).is_(None))
+
+    # Special handling for dag_version: exclude rows referenced by task_instance rows that are NOT being deleted
+    if table_name == "dag_version":
+        try:
+            from airflow.utils.db import reflect_tables
+

Review Comment:
   The import statement for reflect_tables is redundant since it's already imported at the top of the file (line 43). The local import inside the try block should be removed.
   ```suggestion
   
   ```



##########
airflow-core/src/airflow/utils/db_cleanup.py:
##########
@@ -365,6 +366,39 @@ def _build_query(
             ),
         )
         conditions.append(column(max_date_col_name).is_(None))
+
+    # Special handling for dag_version: exclude rows referenced by task_instance rows that are NOT being deleted
+    if table_name == "dag_version":
+        try:
+            from airflow.utils.db import reflect_tables
+
+            # Reflect the task_instance table to get proper column access
+            metadata = reflect_tables(["task_instance", "dag_version"], session)
+            ti_table_reflected = metadata.tables["task_instance"]
+            dv_table_reflected = metadata.tables["dag_version"]
+            ti_dag_version_id_col = ti_table_reflected.c.dag_version_id
+            ti_start_date_col = ti_table_reflected.c.start_date
+            dv_id_col = dv_table_reflected.c.id
+
+            # Find dag_version_ids that are referenced by task_instance rows that are kept (not deleted)
+            # These are task_instance rows with start_date >= clean_before_timestamp
+            kept_ti_subquery = (
+                select(ti_dag_version_id_col)
+                .select_from(ti_table_reflected)
+                .where(ti_start_date_col >= clean_before_timestamp)
+                .where(ti_dag_version_id_col.isnot(None))
+                .distinct()
+            )
+
+            # Exclude dag_version rows that are referenced by kept task_instance rows
+            # Use the reflected table's id column and join it with base_table
+            base_table_id_col = base_table.c[dv_id_col.name]
+            conditions.append(base_table_id_col.not_in(kept_ti_subquery))
+        except Exception:

Review Comment:
   The broad except Exception clause catches every non-exiting exception, so a genuine bug in this block would be silently swallowed. It should be narrowed to catch only the relevant exceptions (e.g., SQLAlchemyError, KeyError) that might occur during table reflection or column access.
   ```suggestion
           except (KeyError, AttributeError, OperationalError, ProgrammingError):
   ```
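   If this suggestion is applied, OperationalError and ProgrammingError would also need to be imported in db_cleanup.py (assuming they are not already; both live in sqlalchemy.exc):
   ```python
   from sqlalchemy.exc import OperationalError, ProgrammingError
   ```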



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
