SameerMesiah97 commented on code in PR #60390:
URL: https://github.com/apache/airflow/pull/60390#discussion_r2684029069


##########
airflow-core/src/airflow/utils/db_cleanup.py:
##########
@@ -365,6 +366,39 @@ def _build_query(
             ),
         )
         conditions.append(column(max_date_col_name).is_(None))
+
+    # Special handling for dag_version: exclude rows referenced by task_instance rows that are NOT being deleted
+    if table_name == "dag_version":
+        try:
+            from airflow.utils.db import reflect_tables
+
+            # Reflect the task_instance table to get proper column access
+            metadata = reflect_tables(["task_instance", "dag_version"], session)
+            ti_table_reflected = metadata.tables["task_instance"]
+            dv_table_reflected = metadata.tables["dag_version"]
+            ti_dag_version_id_col = ti_table_reflected.c.dag_version_id
+            ti_start_date_col = ti_table_reflected.c.start_date
+            dv_id_col = dv_table_reflected.c.id
+
+            # Find dag_version_ids that are referenced by task_instance rows that are kept (not deleted)
+            # These are task_instance rows with start_date >= clean_before_timestamp
+            kept_ti_subquery = (
+                select(ti_dag_version_id_col)
+                .select_from(ti_table_reflected)
+                .where(ti_start_date_col >= clean_before_timestamp)
+                .where(ti_dag_version_id_col.isnot(None))
+                .distinct()
+            )
+
+            # Exclude dag_version rows that are referenced by kept task_instance rows
+            # Use the reflected table's id column and join it with base_table
+            base_table_id_col = base_table.c[dv_id_col.name]
+            conditions.append(base_table_id_col.not_in(kept_ti_subquery))

Review Comment:
   I understand you used DISTINCT because multiple task instances can reference 
the same `dag_version_id` and you wanted to de-duplicate them but I believe it 
would be better to use a NOT EXISTS approach here for better performance and 
NULL handling. kept_ti_subquery (you can rename it to kept_tis_exists for more 
clarity) could be instantiated with the EXISTS ORM query which you can then 
negate using '~' when you append it to the conditions (which would be evaluated 
as NOT EXISTS). This would have the same effect as de-depulicating using 
DISTINCT and using NOT IN. 



##########
airflow-core/src/airflow/utils/db_cleanup.py:
##########
@@ -365,6 +366,39 @@ def _build_query(
             ),
         )
         conditions.append(column(max_date_col_name).is_(None))
+
+    # Special handling for dag_version: exclude rows referenced by task_instance rows that are NOT being deleted
+    if table_name == "dag_version":
+        try:
+            from airflow.utils.db import reflect_tables
+
+            # Reflect the task_instance table to get proper column access
+            metadata = reflect_tables(["task_instance", "dag_version"], session)
+            ti_table_reflected = metadata.tables["task_instance"]
+            dv_table_reflected = metadata.tables["dag_version"]
+            ti_dag_version_id_col = ti_table_reflected.c.dag_version_id
+            ti_start_date_col = ti_table_reflected.c.start_date
+            dv_id_col = dv_table_reflected.c.id
+
+            # Find dag_version_ids that are referenced by task_instance rows that are kept (not deleted)
+            # These are task_instance rows with start_date >= clean_before_timestamp
+            kept_ti_subquery = (
+                select(ti_dag_version_id_col)
+                .select_from(ti_table_reflected)
+                .where(ti_start_date_col >= clean_before_timestamp)
+                .where(ti_dag_version_id_col.isnot(None))
+                .distinct()
+            )
+
+            # Exclude dag_version rows that are referenced by kept task_instance rows
+            # Use the reflected table's id column and join it with base_table
+            base_table_id_col = base_table.c[dv_id_col.name]
+            conditions.append(base_table_id_col.not_in(kept_ti_subquery))
+        except Exception:
+            # If we can't add the FK constraint filter, continue without it
+            # This prevents the cleanup from failing, though it may still hit FK violations
+            pass

Review Comment:
   The Exception is being swallowed silently. I would suggest naming it in a 
`logger.warning` message (please see other examples in this module) alongside 
text that clearly informs the user that FK violations may still be encountered. 
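A minimal sketch of the suggested pattern, using only the stdlib `logging` module. `_add_dag_version_fk_filter` is a hypothetical stand-in for the reflection and subquery code in the `try` block, made to fail on purpose; the logger name and message wording are assumptions, not the module's actual conventions.

```python
import logging

# Hypothetical logger name; the real module defines its own logger.
logger = logging.getLogger("db_cleanup_sketch")


def _add_dag_version_fk_filter(conditions: list) -> None:
    """Hypothetical stand-in for the reflection + subquery code; fails on purpose."""
    raise KeyError("task_instance")


def build_conditions() -> list:
    conditions: list = []
    try:
        _add_dag_version_fk_filter(conditions)
    except Exception as e:
        # Name the exception and tell the user what skipping the filter means for them.
        logger.warning(
            "Could not add the task_instance reference filter for dag_version (%s: %s); "
            "continuing without it, so the cleanup may still hit foreign key violations.",
            type(e).__name__,
            e,
        )
    return conditions


class _ListHandler(logging.Handler):
    """Collects formatted log messages so the warning can be inspected."""

    def __init__(self) -> None:
        super().__init__()
        self.messages: list = []

    def emit(self, record: logging.LogRecord) -> None:
        self.messages.append(record.getMessage())


handler = _ListHandler()
logger.addHandler(handler)
conditions = build_conditions()
print(handler.messages[0])
```

Passing `exc_info=True` to `logger.warning` would additionally attach the traceback, if that level of detail is wanted.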



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
