This is an automated email from the ASF dual-hosted git repository.
taragolis pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 277cfcb6cd Use `NOT EXISTS` subquery instead of `tuple_not_in_condition` (#33527)
277cfcb6cd is described below
commit 277cfcb6cd1761386b1607d95e43fc1b1bd100c9
Author: Andrey Anshin <[email protected]>
AuthorDate: Tue Aug 22 01:08:53 2023 +0400
Use `NOT EXISTS` subquery instead of `tuple_not_in_condition` (#33527)
* Use `NOT EXISTS` subquery instead of `tuple_not_in_condition`
* Remove deprecation from `tuple_not_in_condition`
---
airflow/models/renderedtifields.py | 26 ++++++++++++++++++--------
airflow/utils/sqlalchemy.py | 4 +++-
2 files changed, 21 insertions(+), 9 deletions(-)
diff --git a/airflow/models/renderedtifields.py
b/airflow/models/renderedtifields.py
index 269af8276a..22bb536d81 100644
--- a/airflow/models/renderedtifields.py
+++ b/airflow/models/renderedtifields.py
@@ -22,7 +22,16 @@ import os
from typing import TYPE_CHECKING
import sqlalchemy_jsonfield
-from sqlalchemy import Column, ForeignKeyConstraint, Integer,
PrimaryKeyConstraint, delete, select, text
+from sqlalchemy import (
+ Column,
+ ForeignKeyConstraint,
+ Integer,
+ PrimaryKeyConstraint,
+ delete,
+ exists,
+ select,
+ text,
+)
from sqlalchemy.ext.associationproxy import association_proxy
from sqlalchemy.orm import Session, relationship
@@ -33,7 +42,6 @@ from airflow.serialization.helpers import
serialize_template_field
from airflow.settings import json
from airflow.utils.retries import retry_db_transaction
from airflow.utils.session import NEW_SESSION, provide_session
-from airflow.utils.sqlalchemy import tuple_not_in_condition
if TYPE_CHECKING:
from sqlalchemy.sql import FromClause
@@ -201,11 +209,11 @@ class RenderedTaskInstanceFields(Base):
:param num_to_keep: Number of Records to keep
:param session: SqlAlchemy Session
"""
- from airflow.models.dagrun import DagRun
-
if num_to_keep <= 0:
return
+ from airflow.models.dagrun import DagRun
+
tis_to_keep_query = (
select(cls.dag_id, cls.task_id, cls.run_id, DagRun.execution_date)
.where(cls.dag_id == dag_id, cls.task_id == task_id)
@@ -234,17 +242,19 @@ class RenderedTaskInstanceFields(Base):
session: Session,
) -> None:
# This query might deadlock occasionally and it should be retried if
fails (see decorator)
+
stmt = (
delete(cls)
.where(
cls.dag_id == dag_id,
cls.task_id == task_id,
- tuple_not_in_condition(
- (cls.dag_id, cls.task_id, cls.run_id),
- select(ti_clause.c.dag_id, ti_clause.c.task_id,
ti_clause.c.run_id),
- session=session,
+ ~exists(1).where(
+ ti_clause.c.dag_id == cls.dag_id,
+ ti_clause.c.task_id == cls.task_id,
+ ti_clause.c.run_id == cls.run_id,
),
)
.execution_options(synchronize_session=False)
)
+
session.execute(stmt)
diff --git a/airflow/utils/sqlalchemy.py b/airflow/utils/sqlalchemy.py
index bb2277e4ed..1a1331427f 100644
--- a/airflow/utils/sqlalchemy.py
+++ b/airflow/utils/sqlalchemy.py
@@ -590,7 +590,9 @@ def tuple_not_in_condition(
:meta private:
"""
- if settings.engine.dialect.name != "mssql":
+ dialect = session.bind.dialect if session else settings.engine.dialect
+
+ if dialect.name != "mssql":
return tuple_(*columns).not_in(collection)
if not isinstance(collection, Select):
rows = collection