SameerMesiah97 commented on code in PR #60390:
URL: https://github.com/apache/airflow/pull/60390#discussion_r2694257187


##########
airflow-core/src/airflow/utils/db_cleanup.py:
##########
@@ -365,6 +366,39 @@ def _build_query(
             ),
         )
         conditions.append(column(max_date_col_name).is_(None))
+
+    # Special handling for dag_version: exclude rows referenced by task_instance rows that are NOT being deleted
+    if table_name == "dag_version":
+        try:
+            from airflow.utils.db import reflect_tables
+
+            # Reflect the task_instance table to get proper column access
+            metadata = reflect_tables(["task_instance", "dag_version"], session)
+            ti_table_reflected = metadata.tables["task_instance"]
+            dv_table_reflected = metadata.tables["dag_version"]
+            ti_dag_version_id_col = ti_table_reflected.c.dag_version_id
+            ti_start_date_col = ti_table_reflected.c.start_date
+            dv_id_col = dv_table_reflected.c.id
+
+            # Find dag_version_ids that are referenced by task_instance rows that are kept (not deleted)
+            # These are task_instance rows with start_date >= clean_before_timestamp
+            kept_ti_subquery = (
+                select(ti_dag_version_id_col)
+                .select_from(ti_table_reflected)
+                .where(ti_start_date_col >= clean_before_timestamp)
+                .where(ti_dag_version_id_col.isnot(None))
+                .distinct()
+            )
+
+            # Exclude dag_version rows that are referenced by kept task_instance rows
+            # Use the reflected table's id column and join it with base_table
+            base_table_id_col = base_table.c[dv_id_col.name]
+            conditions.append(base_table_id_col.not_in(kept_ti_subquery))
+        except Exception:
+            # If we can't add the FK constraint filter, continue without it
+            # This prevents the cleanup from failing, though it may still hit FK violations
+            pass

Review Comment:
   > Is there any reason that we can fail here and it is not a problem? What cause can it have to fail? I would rather prefer to fail "in general" or "fail fast" to fix bugs and not swallowing the exception.
   
   On second thought, I think we should only be swallowing classes of exceptions (with a logged warning, of course) that we can expect to arise during normal operation and that are not indicative of more critical issues, such as an unhealthy DB or logic errors in the code (broad exceptions can mask regressions). The latter class of errors does need to fail loudly and stop the operation.
   
   I believe the auto-reviewer's suggestion below (except `SQLAlchemyError` and `KeyError`) is in the right direction, but I think `SQLAlchemyError` should be further narrowed to exclude `OperationalError` and the like.
   
   I have no issues with the 'fail fast' and 'in general' approach you are suggesting, but your comment made me re-evaluate whether catching all exceptions is acceptable.
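   A minimal sketch of the narrowed handling I have in mind (the helper name is hypothetical; the stand-in exception classes mirror the `sqlalchemy.exc` hierarchy so the snippet runs standalone, but in `db_cleanup.py` they would be imported from `sqlalchemy.exc`):
   
   ```python
   import logging
   
   log = logging.getLogger(__name__)
   
   
   # Stand-ins for sqlalchemy.exc classes so this sketch runs without SQLAlchemy.
   # In real code: from sqlalchemy.exc import SQLAlchemyError, OperationalError
   class SQLAlchemyError(Exception):
       pass
   
   
   class OperationalError(SQLAlchemyError):
       pass
   
   
   def append_optional_condition(build_condition, conditions):
       """Hypothetical helper: append a best-effort filter condition.
   
       Recoverable reflection/lookup failures are logged and skipped;
       anything pointing at an unhealthy DB is re-raised.
       """
       try:
           conditions.append(build_condition())
       except OperationalError:
           # Connectivity/DB-health problems must fail loudly, so this
           # subclass is re-raised before the broader SQLAlchemyError handler.
           raise
       except (SQLAlchemyError, KeyError) as exc:
           # Expected during normal operation (e.g. a table missing from the
           # reflected metadata); warn and continue without the extra filter.
           log.warning("Skipping dag_version FK filter: %s", exc)
   ```
   
   Note the handler order: because `OperationalError` subclasses `SQLAlchemyError`, the narrow re-raise has to come before the broader handler, or it would be swallowed along with the recoverable errors.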



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
