ashb commented on a change in pull request #19808:
URL: https://github.com/apache/airflow/pull/19808#discussion_r756411056



##########
File path: airflow/utils/db.py
##########
@@ -794,16 +744,68 @@ def check_run_id_null(session) -> Iterable[str]:
                 reason="with a NULL dag_id, run_id, or execution_date",
             )
             return
-        _move_dangling_run_data_to_new_table(session, dagrun_table, 
dagrun_dangling_table_name)
+        _move_dangling_data_to_new_table(
+            session,
+            dagrun_table,
+            dagrun_table.select(invalid_dagrun_filter),
+            dagrun_dangling_table_name,
+        )
 
 
-def _move_dangling_task_data_to_new_table(session, source_table: "Table", 
target_table_name: str):
-    where_clause = """
-        left join dag_run as dr
-        on (source.dag_id = dr.dag_id and source.execution_date = 
dr.execution_date)
-        where dr.id is null
-    """
-    _move_dangling_table(session, source_table, target_table_name, 
where_clause)
+def _move_dangling_data_to_new_table(
+    session, source_table: "Table", source_query: "Query", target_table_name: 
str
+):
+    from sqlalchemy import column, select, table
+    from sqlalchemy.sql.selectable import Join

Review comment:
       Probably not strictly required, but this file is in the hot import path 
for `airflow`, so I want to minimize the global imports in it.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to