This is an automated email from the ASF dual-hosted git repository.

taragolis pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 277cfcb6cd Use `NOT EXISTS` subquery instead of `tuple_not_in_condition` (#33527)
277cfcb6cd is described below

commit 277cfcb6cd1761386b1607d95e43fc1b1bd100c9
Author: Andrey Anshin <[email protected]>
AuthorDate: Tue Aug 22 01:08:53 2023 +0400

    Use `NOT EXISTS` subquery instead of `tuple_not_in_condition` (#33527)
    
    * Use `NOT EXISTS` subquery instead of `tuple_not_in_condition`
    
    * Remove deprecation from `tuple_not_in_condition`
---
 airflow/models/renderedtifields.py | 26 ++++++++++++++++++--------
 airflow/utils/sqlalchemy.py        |  4 +++-
 2 files changed, 21 insertions(+), 9 deletions(-)

diff --git a/airflow/models/renderedtifields.py b/airflow/models/renderedtifields.py
index 269af8276a..22bb536d81 100644
--- a/airflow/models/renderedtifields.py
+++ b/airflow/models/renderedtifields.py
@@ -22,7 +22,16 @@ import os
 from typing import TYPE_CHECKING
 
 import sqlalchemy_jsonfield
-from sqlalchemy import Column, ForeignKeyConstraint, Integer, 
PrimaryKeyConstraint, delete, select, text
+from sqlalchemy import (
+    Column,
+    ForeignKeyConstraint,
+    Integer,
+    PrimaryKeyConstraint,
+    delete,
+    exists,
+    select,
+    text,
+)
 from sqlalchemy.ext.associationproxy import association_proxy
 from sqlalchemy.orm import Session, relationship
 
@@ -33,7 +42,6 @@ from airflow.serialization.helpers import serialize_template_field
 from airflow.settings import json
 from airflow.utils.retries import retry_db_transaction
 from airflow.utils.session import NEW_SESSION, provide_session
-from airflow.utils.sqlalchemy import tuple_not_in_condition
 
 if TYPE_CHECKING:
     from sqlalchemy.sql import FromClause
@@ -201,11 +209,11 @@ class RenderedTaskInstanceFields(Base):
         :param num_to_keep: Number of Records to keep
         :param session: SqlAlchemy Session
         """
-        from airflow.models.dagrun import DagRun
-
         if num_to_keep <= 0:
             return
 
+        from airflow.models.dagrun import DagRun
+
         tis_to_keep_query = (
             select(cls.dag_id, cls.task_id, cls.run_id, DagRun.execution_date)
             .where(cls.dag_id == dag_id, cls.task_id == task_id)
@@ -234,17 +242,19 @@ class RenderedTaskInstanceFields(Base):
         session: Session,
     ) -> None:
         # This query might deadlock occasionally and it should be retried if 
fails (see decorator)
+
         stmt = (
             delete(cls)
             .where(
                 cls.dag_id == dag_id,
                 cls.task_id == task_id,
-                tuple_not_in_condition(
-                    (cls.dag_id, cls.task_id, cls.run_id),
-                    select(ti_clause.c.dag_id, ti_clause.c.task_id, 
ti_clause.c.run_id),
-                    session=session,
+                ~exists(1).where(
+                    ti_clause.c.dag_id == cls.dag_id,
+                    ti_clause.c.task_id == cls.task_id,
+                    ti_clause.c.run_id == cls.run_id,
                 ),
             )
             .execution_options(synchronize_session=False)
         )
+
         session.execute(stmt)
diff --git a/airflow/utils/sqlalchemy.py b/airflow/utils/sqlalchemy.py
index bb2277e4ed..1a1331427f 100644
--- a/airflow/utils/sqlalchemy.py
+++ b/airflow/utils/sqlalchemy.py
@@ -590,7 +590,9 @@ def tuple_not_in_condition(
 
     :meta private:
     """
-    if settings.engine.dialect.name != "mssql":
+    dialect = session.bind.dialect if session else settings.engine.dialect
+
+    if dialect.name != "mssql":
         return tuple_(*columns).not_in(collection)
     if not isinstance(collection, Select):
         rows = collection

Reply via email to