This is an automated email from the ASF dual-hosted git repository.

jscheffl pushed a commit to branch v3-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v3-1-test by this push:
     new f402617f907 [v3-1-test] Sqlalchemy 2.0 changes (test_types.py, test_manager.py, test_processor.py) (#59748) (#59784)
f402617f907 is described below

commit f402617f907a82ae1c9946e2124aa8c3418c6ca2
Author: Jens Scheffler <[email protected]>
AuthorDate: Wed Dec 24 11:56:19 2025 +0100

    [v3-1-test] Sqlalchemy 2.0 changes (test_types.py, test_manager.py, test_processor.py) (#59748) (#59784)
    
    * Update sqlalchemy compatible change for test_types.py
    
    * Compatibility fixes
    
    * Update sqlalchemy compatible change for test_manager.py
    
    * Update sqlalchemy compatible change for test_processor.py
    
    * Review fix
    (cherry picked from commit 9e9874fd524f9c72258e5b360ac7ed30edeb2f7c)
    
    Co-authored-by: Kunal Bhattacharya <[email protected]>
---
 .../tests/unit/dag_processing/test_manager.py      | 32 +++++++++-------------
 .../tests/unit/dag_processing/test_processor.py    |  5 ++--
 airflow-core/tests/unit/utils/test_types.py        |  9 +++---
 3 files changed, 21 insertions(+), 25 deletions(-)
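
Editor's note: for readers less familiar with the SQLAlchemy 2.0 query style, the diff below consistently replaces the legacy `session.query(...)` / `.filter(...)` / `.count()` / `.scalar()` calls with `select(...)` statements executed via `session.scalars(...)` and `session.scalar(...)`. The following is a minimal, self-contained sketch of that pattern only; the `Widget` model and in-memory SQLite engine are illustrative and not part of the Airflow code touched by this commit.

    # Sketch of the session.query -> select() migration pattern used in this diff.
    # Widget and the sqlite engine are toy stand-ins for Airflow ORM models such as
    # ParseImportError, DagModel and DbCallbackRequest.
    from sqlalchemy import create_engine, func, select
    from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, Session


    class Base(DeclarativeBase):
        pass


    class Widget(Base):
        __tablename__ = "widget"
        id: Mapped[int] = mapped_column(primary_key=True)
        name: Mapped[str]


    engine = create_engine("sqlite://")
    Base.metadata.create_all(engine)

    with Session(engine) as session:
        session.add(Widget(name="a"))
        session.commit()

        # Legacy 1.x style (what the tests used before this commit):
        #   rows = session.query(Widget).filter(Widget.name == "a").all()
        #   count = session.query(func.count(Widget.id)).scalar()

        # 2.0 style used throughout this commit:
        rows = session.scalars(select(Widget).where(Widget.name == "a")).all()
        count = session.scalar(select(func.count(Widget.id)))
        assert len(rows) == 1 and count == 1
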

diff --git a/airflow-core/tests/unit/dag_processing/test_manager.py b/airflow-core/tests/unit/dag_processing/test_manager.py
index 91692e6d0fa..456d925935b 100644
--- a/airflow-core/tests/unit/dag_processing/test_manager.py
+++ b/airflow-core/tests/unit/dag_processing/test_manager.py
@@ -172,7 +172,7 @@ class TestDagFileProcessorManager:
             manager.run()
 
             with create_session() as session:
-                import_errors = session.query(ParseImportError).all()
+                import_errors = session.scalars(select(ParseImportError)).all()
                 assert len(import_errors) == 1
 
                 path_to_parse.unlink()
@@ -181,7 +181,7 @@ class TestDagFileProcessorManager:
             manager.run()
 
             with create_session() as session:
-                import_errors = session.query(ParseImportError).all()
+                import_errors = session.scalars(select(ParseImportError)).all()
 
                 assert len(import_errors) == 0
                 session.rollback()
@@ -435,7 +435,7 @@ class TestDagFileProcessorManager:
         manager._queue_requested_files_for_parsing()
         assert manager._file_queue == deque([file1])
         with create_session() as session2:
-            parsing_request_after = session2.query(DagPriorityParsingRequest).all()
+            parsing_request_after = session2.scalars(select(DagPriorityParsingRequest)).all()
         assert len(parsing_request_after) == 1
         assert parsing_request_after[0].relative_fileloc == "file_x.py"
 
@@ -480,34 +480,28 @@ class TestDagFileProcessorManager:
         manager._files = [test_dag_path]
         manager._file_stats[test_dag_path] = stat
 
-        active_dag_count = (
-            session.query(func.count(DagModel.dag_id))
-            .filter(
+        active_dag_count = session.scalar(
+            select(func.count(DagModel.dag_id)).where(
                 ~DagModel.is_stale,
                 DagModel.relative_fileloc == str(test_dag_path.rel_path),
                 DagModel.bundle_name == test_dag_path.bundle_name,
             )
-            .scalar()
         )
         assert active_dag_count == 1
 
         manager._scan_stale_dags()
 
-        active_dag_count = (
-            session.query(func.count(DagModel.dag_id))
-            .filter(
+        active_dag_count = session.scalar(
+            select(func.count(DagModel.dag_id)).where(
                 ~DagModel.is_stale,
                 DagModel.relative_fileloc == str(test_dag_path.rel_path),
                 DagModel.bundle_name == test_dag_path.bundle_name,
             )
-            .scalar()
         )
         assert active_dag_count == 0
 
-        serialized_dag_count = (
-            session.query(func.count(SerializedDagModel.dag_id))
-            .filter(SerializedDagModel.dag_id == dag.dag_id)
-            .scalar()
+        serialized_dag_count = session.scalar(
+            select(func.count(SerializedDagModel.dag_id)).where(SerializedDagModel.dag_id == dag.dag_id)
         )
         # Deactivating the DagModel should not delete the SerializedDagModel
         # SerializedDagModel gives history about Dags
@@ -776,7 +770,7 @@ class TestDagFileProcessorManager:
                 assert callbacks[0].run_id == "123"
                 assert callbacks[1].run_id == "456"
 
-                assert session.query(DbCallbackRequest).count() == 0
+                assert len(session.scalars(select(DbCallbackRequest)).all()) == 0
 
     @conf_vars(
         {
@@ -805,11 +799,11 @@ class TestDagFileProcessorManager:
 
             with create_session() as session:
                 manager.run()
-                assert session.query(DbCallbackRequest).count() == 3
+                assert len(session.scalars(select(DbCallbackRequest)).all()) == 3
 
             with create_session() as session:
                 manager.run()
-                assert session.query(DbCallbackRequest).count() == 1
+                assert len(session.scalars(select(DbCallbackRequest)).all()) == 1
 
     @conf_vars({("core", "load_examples"): "False"})
     def test_fetch_callbacks_ignores_other_bundles(self, configure_testing_dag_bundle):
@@ -850,7 +844,7 @@ class TestDagFileProcessorManager:
                 assert [c.run_id for c in callbacks] == ["match"]
 
                 # The non-matching callback should remain in the DB
-                remaining = session.query(DbCallbackRequest).all()
+                remaining = session.scalars(select(DbCallbackRequest)).all()
                 assert len(remaining) == 1
                 # Decode remaining request and verify it's for the other bundle
                 remaining_req = remaining[0].get_callback_request()
diff --git a/airflow-core/tests/unit/dag_processing/test_processor.py b/airflow-core/tests/unit/dag_processing/test_processor.py
index fc95923cf02..63e63a57643 100644
--- a/airflow-core/tests/unit/dag_processing/test_processor.py
+++ b/airflow-core/tests/unit/dag_processing/test_processor.py
@@ -32,6 +32,7 @@ from unittest.mock import MagicMock, patch
 import pytest
 import structlog
 from pydantic import TypeAdapter
+from sqlalchemy import select
 from structlog.typing import FilteringBoundLogger
 
 from airflow._shared.timezones import timezone
@@ -242,7 +243,7 @@ class TestDagFileProcessor:
             assert result.import_errors == {}
             assert result.serialized_dags[0].dag_id == "test_myvalue"
 
-            all_vars = session.query(VariableORM).all()
+            all_vars = session.scalars(select(VariableORM)).all()
             assert len(all_vars) == 1
             assert all_vars[0].key == "mykey"
 
@@ -285,7 +286,7 @@ class TestDagFileProcessor:
             assert result.import_errors == {}
             assert result.serialized_dags[0].dag_id == "not-found"
 
-            all_vars = session.query(VariableORM).all()
+            all_vars = session.scalars(select(VariableORM)).all()
             assert len(all_vars) == 0
 
     def test_top_level_connection_access(
diff --git a/airflow-core/tests/unit/utils/test_types.py b/airflow-core/tests/unit/utils/test_types.py
index 4a6831f4035..277a0d84607 100644
--- a/airflow-core/tests/unit/utils/test_types.py
+++ b/airflow-core/tests/unit/utils/test_types.py
@@ -19,6 +19,7 @@ from __future__ import annotations
 from datetime import timedelta
 
 import pytest
+from sqlalchemy import select
 
 from airflow.models.dagrun import DagRun
 from airflow.utils.state import State
@@ -36,22 +37,22 @@ def test_runtype_enum_escape(dag_maker, session):
         pass
     dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED)
 
-    query = session.query(
+    query = select(
         DagRun.dag_id,
         DagRun.state,
         DagRun.run_type,
-    ).filter(
+    ).where(
         DagRun.dag_id == "test_enum_dags",
         # make sure enum value can be used in filter queries
         DagRun.run_type == DagRunType.SCHEDULED,
     )
-    assert str(query.statement.compile(compile_kwargs={"literal_binds": True})) == (
+    rows = session.execute(query).all()
+    assert str(query.compile(compile_kwargs={"literal_binds": True})) == (
         "SELECT dag_run.dag_id, dag_run.state, dag_run.run_type \n"
         "FROM dag_run \n"
         "WHERE dag_run.dag_id = 'test_enum_dags' AND dag_run.run_type = 
'scheduled'"
     )
 
-    rows = query.all()
     assert len(rows) == 1
     assert rows[0].dag_id == "test_enum_dags"
     assert rows[0].state == State.RUNNING

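Editor's note on the test_types.py hunk: a 2.0-style `select()` has no `.statement` attribute to unwrap, so the construct is compiled directly and the rows are fetched with `session.execute(query).all()` instead of `query.all()`. A rough standalone illustration of the literal-binds compilation (using a hypothetical `thing` table, not Airflow's `dag_run`):

    # Rough illustration of compiling a select() with literal binds, as done in the
    # updated test_types.py. The "thing" table is hypothetical.
    from sqlalchemy import Column, Integer, MetaData, String, Table, select

    metadata = MetaData()
    thing = Table("thing", metadata, Column("id", Integer), Column("name", String))

    query = select(thing.c.id, thing.c.name).where(thing.c.name == "x")
    sql = str(query.compile(compile_kwargs={"literal_binds": True}))
    print(sql)
    # SELECT thing.id, thing.name
    # FROM thing
    # WHERE thing.name = 'x'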