This is an automated email from the ASF dual-hosted git repository. ephraimanierobi pushed a commit to branch v2-3-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 88d895695a68a8a307ee86bd989673a7b6838301
Author: Daniel Standish <[email protected]>
AuthorDate: Tue May 3 11:50:23 2022 -0700

    Optimize 2.3.0 pre-upgrade check queries (#23458)

    We have to check for rows that are missing either corresponding TI or DR and move them out of table before adding FKs. We were doing correlation in the JOIN condition but it appears postgres does *not* like this so here we move correlation to WHERE.

    (cherry picked from commit 8e3d6c30e2409f56f9e4a55fe16769d23f5e3012)
---
 airflow/utils/db.py | 14 +++++++-------
 1 file changed, 7 insertions(+), 7 deletions(-)

diff --git a/airflow/utils/db.py b/airflow/utils/db.py
index bc4007c279..0e1ada2bbd 100644
--- a/airflow/utils/db.py
+++ b/airflow/utils/db.py
@@ -1092,30 +1092,30 @@ def _task_instance_exists(session, source_table, dag_run, task_instance):
     """
     if 'run_id' not in task_instance.c:
         # db is < 2.2.0
-        source_to_ti_join_cond = and_(
+        where_clause = and_(
             source_table.c.dag_id == task_instance.c.dag_id,
             source_table.c.task_id == task_instance.c.task_id,
             source_table.c.execution_date == task_instance.c.execution_date,
         )
         ti_to_dr_join_cond = and_(
-            source_table.c.dag_id == task_instance.c.dag_id,
-            source_table.c.execution_date == task_instance.c.execution_date,
+            dag_run.c.dag_id == task_instance.c.dag_id,
+            dag_run.c.execution_date == task_instance.c.execution_date,
         )
     else:
         # db is 2.2.0 <= version < 2.3.0
-        source_to_ti_join_cond = and_(
+        where_clause = and_(
             source_table.c.dag_id == task_instance.c.dag_id,
             source_table.c.task_id == task_instance.c.task_id,
+            source_table.c.execution_date == dag_run.c.execution_date,
         )
         ti_to_dr_join_cond = and_(
-            source_table.c.dag_id == task_instance.c.dag_id,
+            dag_run.c.dag_id == task_instance.c.dag_id,
             dag_run.c.run_id == task_instance.c.run_id,
-            source_table.c.execution_date == dag_run.c.execution_date,
         )
     exists_subquery = (
         session.query(text('1'))
         .select_from(task_instance.join(dag_run, onclause=ti_to_dr_join_cond))
-        .filter(source_to_ti_join_cond)
+        .filter(where_clause)
     )
     return exists_subquery
