This is an automated email from the ASF dual-hosted git repository.

jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new fc892d7ccb2 Fix exceptions of positional session use in airflow-core 
leftover non-models modules (#67810)
fc892d7ccb2 is described below

commit fc892d7ccb28faac67dc296e287b8b22db039b7e
Author: Jens Scheffler <[email protected]>
AuthorDate: Sun May 31 23:29:55 2026 +0200

    Fix exceptions of positional session use in airflow-core leftover 
non-models modules (#67810)
---
 airflow-core/src/airflow/secrets/metastore.py      |  4 ++--
 .../src/airflow/serialization/definitions/dag.py   |  3 ++-
 .../ti_deps/deps/dag_ti_slots_available_dep.py     |  2 +-
 .../tests/unit/listeners/test_listeners.py         | 24 ++++++++++++++--------
 .../ci/prek/known_provide_session_positional.txt   |  3 ---
 5 files changed, 21 insertions(+), 15 deletions(-)

diff --git a/airflow-core/src/airflow/secrets/metastore.py 
b/airflow-core/src/airflow/secrets/metastore.py
index e7f25a9f917..78e124b423a 100644
--- a/airflow-core/src/airflow/secrets/metastore.py
+++ b/airflow-core/src/airflow/secrets/metastore.py
@@ -37,7 +37,7 @@ class MetastoreBackend(BaseSecretsBackend):
 
     @provide_session
     def get_connection(
-        self, conn_id: str, team_name: str | None = None, session: Session = 
NEW_SESSION
+        self, conn_id: str, team_name: str | None = None, *, session: Session 
= NEW_SESSION
     ) -> Connection | None:
         """
         Get Airflow Connection from Metadata DB.
@@ -63,7 +63,7 @@ class MetastoreBackend(BaseSecretsBackend):
 
     @provide_session
     def get_variable(
-        self, key: str, team_name: str | None = None, session: Session = 
NEW_SESSION
+        self, key: str, team_name: str | None = None, *, session: Session = 
NEW_SESSION
     ) -> str | None:
         """
         Get Airflow Variable from Metadata DB.
diff --git a/airflow-core/src/airflow/serialization/definitions/dag.py 
b/airflow-core/src/airflow/serialization/definitions/dag.py
index 6fb7bf083cf..092a3e589ce 100644
--- a/airflow-core/src/airflow/serialization/definitions/dag.py
+++ b/airflow-core/src/airflow/serialization/definitions/dag.py
@@ -183,6 +183,7 @@ class SerializedDAG:
         bundle_version: str | None,
         dags: Collection[DAG | LazyDeserializedDAG],
         parse_duration: float | None = None,
+        *,
         session: Session = NEW_SESSION,
     ) -> None:
         """
@@ -482,7 +483,7 @@ class SerializedDAG:
             )
 
     @provide_session
-    def get_concurrency_reached(self, session=NEW_SESSION) -> bool:
+    def get_concurrency_reached(self, *, session=NEW_SESSION) -> bool:
         """Return a boolean indicating whether the max_active_tasks limit for 
this DAG has been reached."""
         from airflow.models.taskinstance import TaskInstance
 
diff --git 
a/airflow-core/src/airflow/ti_deps/deps/dag_ti_slots_available_dep.py 
b/airflow-core/src/airflow/ti_deps/deps/dag_ti_slots_available_dep.py
index 6fb370eabd2..9effcd4661e 100644
--- a/airflow-core/src/airflow/ti_deps/deps/dag_ti_slots_available_dep.py
+++ b/airflow-core/src/airflow/ti_deps/deps/dag_ti_slots_available_dep.py
@@ -29,7 +29,7 @@ class DagTISlotsAvailableDep(BaseTIDep):
 
     @provide_session
     def _get_dep_statuses(self, ti, dep_context, *, session):
-        if ti.task.dag.get_concurrency_reached(session):
+        if ti.task.dag.get_concurrency_reached(session=session):
             yield self._failing_status(
                 reason=(
                     f"The maximum number of running tasks 
({ti.task.dag.max_active_tasks}) for "
diff --git a/airflow-core/tests/unit/listeners/test_listeners.py 
b/airflow-core/tests/unit/listeners/test_listeners.py
index aad2ea7b6e8..e060be0557d 100644
--- a/airflow-core/tests/unit/listeners/test_listeners.py
+++ b/airflow-core/tests/unit/listeners/test_listeners.py
@@ -19,6 +19,7 @@ from __future__ import annotations
 import contextlib
 import logging
 import os
+from typing import TYPE_CHECKING
 
 import pytest
 
@@ -38,6 +39,9 @@ from unit.listeners import (
 )
 from unit.utils.test_helpers import MockJobRunner
 
+if TYPE_CHECKING:
+    from sqlalchemy.orm import Session
+
 pytestmark = pytest.mark.db_test
 
 
@@ -65,7 +69,7 @@ def clean_listener_state():
 
 
 @provide_session
-def test_listener_gets_calls(create_task_instance, session, listener_manager):
+def test_listener_gets_calls(create_task_instance, listener_manager, *, 
session: Session):
     listener_manager(full_listener)
 
     ti = create_task_instance(session=session, state=TaskInstanceState.QUEUED)
@@ -79,7 +83,7 @@ def test_listener_gets_calls(create_task_instance, session, 
listener_manager):
 
 
 @provide_session
-def test_multiple_listeners(create_task_instance, session, listener_manager):
+def test_multiple_listeners(create_task_instance, listener_manager, *, 
session: Session):
     listener_manager(full_listener)
     listener_manager(lifecycle_listener)
     class_based_listener = class_listener.ClassBasedListener()
@@ -99,7 +103,7 @@ def test_multiple_listeners(create_task_instance, session, 
listener_manager):
 
 
 @provide_session
-def test_listener_gets_only_subscribed_calls(create_task_instance, session, 
listener_manager):
+def test_listener_gets_only_subscribed_calls(create_task_instance, 
listener_manager, *, session: Session):
     listener_manager(partial_listener)
 
     ti = create_task_instance(session=session, state=TaskInstanceState.QUEUED)
@@ -113,7 +117,9 @@ def 
test_listener_gets_only_subscribed_calls(create_task_instance, session, list
 
 
 @provide_session
-def test_listener_suppresses_exceptions(create_task_instance, session, 
cap_structlog, listener_manager):
+def test_listener_suppresses_exceptions(
+    create_task_instance, cap_structlog, listener_manager, *, session: Session
+):
     listener_manager(throwing_listener)
 
     ti = create_task_instance(session=session, state=TaskInstanceState.QUEUED)
@@ -122,7 +128,9 @@ def 
test_listener_suppresses_exceptions(create_task_instance, session, cap_struc
 
 
 @provide_session
-def 
test_listener_captures_failed_taskinstances(create_task_instance_of_operator, 
session, listener_manager):
+def test_listener_captures_failed_taskinstances(
+    create_task_instance_of_operator, listener_manager, *, session: Session
+):
     listener_manager(full_listener)
 
     ti = create_task_instance_of_operator(
@@ -137,7 +145,7 @@ def 
test_listener_captures_failed_taskinstances(create_task_instance_of_operator
 
 @provide_session
 def test_listener_captures_longrunning_taskinstances(
-    create_task_instance_of_operator, session, listener_manager
+    create_task_instance_of_operator, listener_manager, *, session: Session
 ):
     listener_manager(full_listener)
 
@@ -151,7 +159,7 @@ def test_listener_captures_longrunning_taskinstances(
 
 
 @provide_session
-def test_class_based_listener(create_task_instance, session, listener_manager):
+def test_class_based_listener(create_task_instance, listener_manager, *, 
session: Session):
     listener = class_listener.ClassBasedListener()
     listener_manager(listener)
 
@@ -161,7 +169,7 @@ def test_class_based_listener(create_task_instance, 
session, listener_manager):
     assert listener.state == [TaskInstanceState.RUNNING, 
TaskInstanceState.SUCCESS, DagRunState.SUCCESS]
 
 
-def test_listener_logs_call(caplog, create_task_instance, session, 
listener_manager):
+def test_listener_logs_call(caplog, create_task_instance, listener_manager, *, 
session: Session):
     caplog.set_level(logging.DEBUG, 
logger="airflow.sdk._shared.listeners.listener")
     listener_manager(full_listener)
 
diff --git a/scripts/ci/prek/known_provide_session_positional.txt 
b/scripts/ci/prek/known_provide_session_positional.txt
index 164938f1b8e..ec271c99df4 100644
--- a/scripts/ci/prek/known_provide_session_positional.txt
+++ b/scripts/ci/prek/known_provide_session_positional.txt
@@ -10,9 +10,6 @@ airflow-core/src/airflow/models/taskinstancehistory.py::2
 airflow-core/src/airflow/models/team.py::1
 airflow-core/src/airflow/models/trigger.py::7
 airflow-core/src/airflow/models/variable.py::2
-airflow-core/src/airflow/secrets/metastore.py::2
-airflow-core/src/airflow/serialization/definitions/dag.py::2
-airflow-core/tests/unit/listeners/test_listeners.py::7
 airflow-core/tests/unit/models/test_taskinstance.py::4
 airflow-core/tests/unit/models/test_timestamp.py::2
 providers/edge3/src/airflow/providers/edge3/executors/edge_executor.py::1

Reply via email to