This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 74d0c476224 Refactor db_cleanup.py to use SQLA2 (#60377)
74d0c476224 is described below

commit 74d0c476224b81f01287384362b08ac496e6b92d
Author: Justin Pakzad <[email protected]>
AuthorDate: Thu Jan 15 10:23:15 2026 -0500

    Refactor db_cleanup.py to use SQLA2 (#60377)
---
 .pre-commit-config.yaml                      |  1 +
 airflow-core/src/airflow/utils/db_cleanup.py | 34 ++++++++++++++++------------
 2 files changed, 21 insertions(+), 14 deletions(-)

diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index 51ad605648c..5d872584977 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -433,6 +433,7 @@ repos:
             (?x)
             ^airflow-ctl.*\.py$|
             ^airflow-core/src/airflow/models/.*\.py$|
+            ^airflow-core/src/airflow/utils/db_cleanup.py$|
             ^airflow-core/src/airflow/migrations/versions/0015_2_9_0_update_trigger_kwargs_type.py$|
             ^airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py$|
             ^airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_assets.py$|
diff --git a/airflow-core/src/airflow/utils/db_cleanup.py b/airflow-core/src/airflow/utils/db_cleanup.py
index 8b6ce79a772..025303ce8f2 100644
--- a/airflow-core/src/airflow/utils/db_cleanup.py
+++ b/airflow-core/src/airflow/utils/db_cleanup.py
@@ -47,7 +47,8 @@ from airflow.utils.types import DagRunType
 
 if TYPE_CHECKING:
     from pendulum import DateTime
-    from sqlalchemy.orm import Query, Session
+    from sqlalchemy import Select
+    from sqlalchemy.orm import Session
 
     from airflow.models import Base
 
@@ -183,8 +184,8 @@ if (
 config_dict: dict[str, _TableConfig] = {x.orm_model.name: x for x in sorted(config_list)}
 
 
-def _check_for_rows(*, query: Query, print_rows: bool = False) -> int:
-    num_entities = query.count()
+def _check_for_rows(*, session: Session, query: Select, print_rows: bool = False) -> int:
+    num_entities = session.scalars(select(func.count()).select_from(query.subquery())).one()
     print(f"Found {num_entities} rows meeting deletion criteria.")
     if not print_rows or num_entities == 0:
         return num_entities
@@ -192,7 +193,7 @@ def _check_for_rows(*, query: Query, print_rows: bool = False) -> int:
     max_rows_to_print = 100
     print(f"Printing first {max_rows_to_print} rows.")
     logger.debug("print entities query: %s", query)
-    for entry in query.limit(max_rows_to_print):
+    for entry in session.execute(query.limit(max_rows_to_print)):
         print(entry.__dict__)
     return num_entities
 
@@ -213,7 +214,7 @@ def _dump_table_to_file(*, target_table: str, file_path: str, export_format: str
 
 
 def _do_delete(
-    *, query: Query, orm_model: Base, skip_archive: bool, session: Session, batch_size: int | None
+    *, query: Select, orm_model: Base, skip_archive: bool, session: Session, batch_size: int | None
 ) -> None:
     import itertools
     import re
@@ -224,7 +225,9 @@ def _do_delete(
 
     while True:
         limited_query = query.limit(batch_size) if batch_size else query
-        if limited_query.count() == 0:  # nothing left to delete
+        if (
+            session.scalars(select(func.count()).select_from(limited_query.subquery())).one() == 0
+        ):  # nothing left to delete
             break
 
         batch_no = next(batch_counter)
@@ -290,13 +293,17 @@ def _do_delete(
 
 
 def _subquery_keep_last(
-    *, recency_column, keep_last_filters, group_by_columns, max_date_colname, session: Session
+    *,
+    recency_column,
+    keep_last_filters,
+    group_by_columns,
+    max_date_colname,
 ):
     subquery = select(*group_by_columns, func.max(recency_column).label(max_date_colname))
 
     if keep_last_filters is not None:
         for entry in keep_last_filters:
-            subquery = subquery.filter(entry)
+            subquery = subquery.where(entry)
 
     if group_by_columns is not None:
         subquery = subquery.group_by(*group_by_columns)
@@ -332,10 +339,10 @@ def _build_query(
     dag_ids: list[str] | None = None,
     exclude_dag_ids: list[str] | None = None,
     **kwargs,
-) -> Query:
+) -> Select:
     base_table_alias = "base"
     base_table = aliased(orm_model, name=base_table_alias)
-    query = session.query(base_table).with_entities(text(f"{base_table_alias}.*"))
+    query = select(text(f"{base_table_alias}.*")).select_from(base_table)
     base_table_recency_col = base_table.c[recency_column.name]
     conditions = [base_table_recency_col < clean_before_timestamp]
 
@@ -355,9 +362,8 @@ def _build_query(
             keep_last_filters=keep_last_filters,
             group_by_columns=group_by_columns,
             max_date_colname=max_date_col_name,
-            session=session,
         )
-        query = query.select_from(base_table).outerjoin(
+        query = query.outerjoin(
             subquery,
             and_(
                 *[base_table.c[x] == subquery.c[x] for x in keep_last_group_by],  # type: ignore[attr-defined]
@@ -365,7 +371,7 @@ def _build_query(
             ),
         )
         conditions.append(column(max_date_col_name).is_(None))
-    query = query.filter(and_(*conditions))
+    query = query.where(and_(*conditions))
     return query
 
 
@@ -404,7 +410,7 @@ def _cleanup_table(
     )
     logger.debug("old rows query:\n%s", query.selectable.compile())
     print(f"Checking table {orm_model.name}")
-    num_rows = _check_for_rows(query=query, print_rows=False)
+    num_rows = _check_for_rows(session=session, query=query, print_rows=False)
 
     if num_rows and not dry_run:
         _do_delete(

Reply via email to