This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-3-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 88d895695a68a8a307ee86bd989673a7b6838301
Author: Daniel Standish <[email protected]>
AuthorDate: Tue May 3 11:50:23 2022 -0700

    Optimize 2.3.0 pre-upgrade check queries (#23458)
    
    We have to check for rows that are missing either corresponding TI or DR
    and move them out of table before adding FKs.  We were doing correlation
    in the JOIN condition but it appears postgres does *not* like this so
    here we move correlation to WHERE.
    
    (cherry picked from commit 8e3d6c30e2409f56f9e4a55fe16769d23f5e3012)
---
 airflow/utils/db.py | 14 +++++++-------
 1 file changed, 7 insertions(+), 7 deletions(-)

diff --git a/airflow/utils/db.py b/airflow/utils/db.py
index bc4007c279..0e1ada2bbd 100644
--- a/airflow/utils/db.py
+++ b/airflow/utils/db.py
@@ -1092,30 +1092,30 @@ def _task_instance_exists(session, source_table, dag_run, task_instance):
     """
     if 'run_id' not in task_instance.c:
         # db is < 2.2.0
-        source_to_ti_join_cond = and_(
+        where_clause = and_(
             source_table.c.dag_id == task_instance.c.dag_id,
             source_table.c.task_id == task_instance.c.task_id,
             source_table.c.execution_date == task_instance.c.execution_date,
         )
         ti_to_dr_join_cond = and_(
-            source_table.c.dag_id == task_instance.c.dag_id,
-            source_table.c.execution_date == task_instance.c.execution_date,
+            dag_run.c.dag_id == task_instance.c.dag_id,
+            dag_run.c.execution_date == task_instance.c.execution_date,
         )
     else:
         # db is 2.2.0 <= version < 2.3.0
-        source_to_ti_join_cond = and_(
+        where_clause = and_(
             source_table.c.dag_id == task_instance.c.dag_id,
             source_table.c.task_id == task_instance.c.task_id,
+            source_table.c.execution_date == dag_run.c.execution_date,
         )
         ti_to_dr_join_cond = and_(
-            source_table.c.dag_id == task_instance.c.dag_id,
+            dag_run.c.dag_id == task_instance.c.dag_id,
             dag_run.c.run_id == task_instance.c.run_id,
-            source_table.c.execution_date == dag_run.c.execution_date,
         )
     exists_subquery = (
         session.query(text('1'))
         .select_from(task_instance.join(dag_run, onclause=ti_to_dr_join_cond))
-        .filter(source_to_ti_join_cond)
+        .filter(where_clause)
     )
     return exists_subquery
 

Reply via email to