This is an automated email from the ASF dual-hosted git repository.
jscheffl pushed a commit to branch v3-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v3-1-test by this push:
new f402617f907 [v3-1-test] Sqlalchemy 2.0 changes (test_types.py,
test_manager.py, test_processor.py) (#59748) (#59784)
f402617f907 is described below
commit f402617f907a82ae1c9946e2124aa8c3418c6ca2
Author: Jens Scheffler <[email protected]>
AuthorDate: Wed Dec 24 11:56:19 2025 +0100
[v3-1-test] Sqlalchemy 2.0 changes (test_types.py, test_manager.py,
test_processor.py) (#59748) (#59784)
* Update sqlalchemy compatible change for test_types.py
* Compatibility fixes
* Update sqlalchemy compatible change for test_manager.py
* Update sqlalchemy compatible change for test_processor.py
* Review fix
(cherry picked from commit 9e9874fd524f9c72258e5b360ac7ed30edeb2f7c)
Co-authored-by: Kunal Bhattacharya <[email protected]>
---
.../tests/unit/dag_processing/test_manager.py | 32 +++++++++-------------
.../tests/unit/dag_processing/test_processor.py | 5 ++--
airflow-core/tests/unit/utils/test_types.py | 9 +++---
3 files changed, 21 insertions(+), 25 deletions(-)
diff --git a/airflow-core/tests/unit/dag_processing/test_manager.py
b/airflow-core/tests/unit/dag_processing/test_manager.py
index 91692e6d0fa..456d925935b 100644
--- a/airflow-core/tests/unit/dag_processing/test_manager.py
+++ b/airflow-core/tests/unit/dag_processing/test_manager.py
@@ -172,7 +172,7 @@ class TestDagFileProcessorManager:
manager.run()
with create_session() as session:
- import_errors = session.query(ParseImportError).all()
+ import_errors = session.scalars(select(ParseImportError)).all()
assert len(import_errors) == 1
path_to_parse.unlink()
@@ -181,7 +181,7 @@ class TestDagFileProcessorManager:
manager.run()
with create_session() as session:
- import_errors = session.query(ParseImportError).all()
+ import_errors = session.scalars(select(ParseImportError)).all()
assert len(import_errors) == 0
session.rollback()
@@ -435,7 +435,7 @@ class TestDagFileProcessorManager:
manager._queue_requested_files_for_parsing()
assert manager._file_queue == deque([file1])
with create_session() as session2:
- parsing_request_after =
session2.query(DagPriorityParsingRequest).all()
+ parsing_request_after =
session2.scalars(select(DagPriorityParsingRequest)).all()
assert len(parsing_request_after) == 1
assert parsing_request_after[0].relative_fileloc == "file_x.py"
@@ -480,34 +480,28 @@ class TestDagFileProcessorManager:
manager._files = [test_dag_path]
manager._file_stats[test_dag_path] = stat
- active_dag_count = (
- session.query(func.count(DagModel.dag_id))
- .filter(
+ active_dag_count = session.scalar(
+ select(func.count(DagModel.dag_id)).where(
~DagModel.is_stale,
DagModel.relative_fileloc == str(test_dag_path.rel_path),
DagModel.bundle_name == test_dag_path.bundle_name,
)
- .scalar()
)
assert active_dag_count == 1
manager._scan_stale_dags()
- active_dag_count = (
- session.query(func.count(DagModel.dag_id))
- .filter(
+ active_dag_count = session.scalar(
+ select(func.count(DagModel.dag_id)).where(
~DagModel.is_stale,
DagModel.relative_fileloc == str(test_dag_path.rel_path),
DagModel.bundle_name == test_dag_path.bundle_name,
)
- .scalar()
)
assert active_dag_count == 0
- serialized_dag_count = (
- session.query(func.count(SerializedDagModel.dag_id))
- .filter(SerializedDagModel.dag_id == dag.dag_id)
- .scalar()
+ serialized_dag_count = session.scalar(
+
select(func.count(SerializedDagModel.dag_id)).where(SerializedDagModel.dag_id
== dag.dag_id)
)
# Deactivating the DagModel should not delete the SerializedDagModel
# SerializedDagModel gives history about Dags
@@ -776,7 +770,7 @@ class TestDagFileProcessorManager:
assert callbacks[0].run_id == "123"
assert callbacks[1].run_id == "456"
- assert session.query(DbCallbackRequest).count() == 0
+ assert len(session.scalars(select(DbCallbackRequest)).all())
== 0
@conf_vars(
{
@@ -805,11 +799,11 @@ class TestDagFileProcessorManager:
with create_session() as session:
manager.run()
- assert session.query(DbCallbackRequest).count() == 3
+ assert len(session.scalars(select(DbCallbackRequest)).all())
== 3
with create_session() as session:
manager.run()
- assert session.query(DbCallbackRequest).count() == 1
+ assert len(session.scalars(select(DbCallbackRequest)).all())
== 1
@conf_vars({("core", "load_examples"): "False"})
def test_fetch_callbacks_ignores_other_bundles(self,
configure_testing_dag_bundle):
@@ -850,7 +844,7 @@ class TestDagFileProcessorManager:
assert [c.run_id for c in callbacks] == ["match"]
# The non-matching callback should remain in the DB
- remaining = session.query(DbCallbackRequest).all()
+ remaining = session.scalars(select(DbCallbackRequest)).all()
assert len(remaining) == 1
# Decode remaining request and verify it's for the other bundle
remaining_req = remaining[0].get_callback_request()
diff --git a/airflow-core/tests/unit/dag_processing/test_processor.py
b/airflow-core/tests/unit/dag_processing/test_processor.py
index fc95923cf02..63e63a57643 100644
--- a/airflow-core/tests/unit/dag_processing/test_processor.py
+++ b/airflow-core/tests/unit/dag_processing/test_processor.py
@@ -32,6 +32,7 @@ from unittest.mock import MagicMock, patch
import pytest
import structlog
from pydantic import TypeAdapter
+from sqlalchemy import select
from structlog.typing import FilteringBoundLogger
from airflow._shared.timezones import timezone
@@ -242,7 +243,7 @@ class TestDagFileProcessor:
assert result.import_errors == {}
assert result.serialized_dags[0].dag_id == "test_myvalue"
- all_vars = session.query(VariableORM).all()
+ all_vars = session.scalars(select(VariableORM)).all()
assert len(all_vars) == 1
assert all_vars[0].key == "mykey"
@@ -285,7 +286,7 @@ class TestDagFileProcessor:
assert result.import_errors == {}
assert result.serialized_dags[0].dag_id == "not-found"
- all_vars = session.query(VariableORM).all()
+ all_vars = session.scalars(select(VariableORM)).all()
assert len(all_vars) == 0
def test_top_level_connection_access(
diff --git a/airflow-core/tests/unit/utils/test_types.py
b/airflow-core/tests/unit/utils/test_types.py
index 4a6831f4035..277a0d84607 100644
--- a/airflow-core/tests/unit/utils/test_types.py
+++ b/airflow-core/tests/unit/utils/test_types.py
@@ -19,6 +19,7 @@ from __future__ import annotations
from datetime import timedelta
import pytest
+from sqlalchemy import select
from airflow.models.dagrun import DagRun
from airflow.utils.state import State
@@ -36,22 +37,22 @@ def test_runtype_enum_escape(dag_maker, session):
pass
dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED)
- query = session.query(
+ query = select(
DagRun.dag_id,
DagRun.state,
DagRun.run_type,
- ).filter(
+ ).where(
DagRun.dag_id == "test_enum_dags",
# make sure enum value can be used in filter queries
DagRun.run_type == DagRunType.SCHEDULED,
)
- assert str(query.statement.compile(compile_kwargs={"literal_binds":
True})) == (
+ rows = session.execute(query).all()
+ assert str(query.compile(compile_kwargs={"literal_binds": True})) == (
"SELECT dag_run.dag_id, dag_run.state, dag_run.run_type \n"
"FROM dag_run \n"
"WHERE dag_run.dag_id = 'test_enum_dags' AND dag_run.run_type =
'scheduled'"
)
- rows = query.all()
assert len(rows) == 1
assert rows[0].dag_id == "test_enum_dags"
assert rows[0].state == State.RUNNING