This is an automated email from the ASF dual-hosted git repository.
jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new fc892d7ccb2 Fix exceptions of positional session use in airflow-core leftover non-models modules (#67810)
fc892d7ccb2 is described below
commit fc892d7ccb28faac67dc296e287b8b22db039b7e
Author: Jens Scheffler <[email protected]>
AuthorDate: Sun May 31 23:29:55 2026 +0200
Fix exceptions of positional session use in airflow-core leftover non-models modules (#67810)
---
airflow-core/src/airflow/secrets/metastore.py | 4 ++--
.../src/airflow/serialization/definitions/dag.py | 3 ++-
.../ti_deps/deps/dag_ti_slots_available_dep.py | 2 +-
.../tests/unit/listeners/test_listeners.py | 24 ++++++++++++++--------
.../ci/prek/known_provide_session_positional.txt | 3 ---
5 files changed, 21 insertions(+), 15 deletions(-)
diff --git a/airflow-core/src/airflow/secrets/metastore.py b/airflow-core/src/airflow/secrets/metastore.py
index e7f25a9f917..78e124b423a 100644
--- a/airflow-core/src/airflow/secrets/metastore.py
+++ b/airflow-core/src/airflow/secrets/metastore.py
@@ -37,7 +37,7 @@ class MetastoreBackend(BaseSecretsBackend):
@provide_session
def get_connection(
- self, conn_id: str, team_name: str | None = None, session: Session = NEW_SESSION
+ self, conn_id: str, team_name: str | None = None, *, session: Session = NEW_SESSION
) -> Connection | None:
"""
Get Airflow Connection from Metadata DB.
@@ -63,7 +63,7 @@ class MetastoreBackend(BaseSecretsBackend):
@provide_session
def get_variable(
- self, key: str, team_name: str | None = None, session: Session = NEW_SESSION
+ self, key: str, team_name: str | None = None, *, session: Session = NEW_SESSION
) -> str | None:
"""
Get Airflow Variable from Metadata DB.
diff --git a/airflow-core/src/airflow/serialization/definitions/dag.py b/airflow-core/src/airflow/serialization/definitions/dag.py
index 6fb7bf083cf..092a3e589ce 100644
--- a/airflow-core/src/airflow/serialization/definitions/dag.py
+++ b/airflow-core/src/airflow/serialization/definitions/dag.py
@@ -183,6 +183,7 @@ class SerializedDAG:
bundle_version: str | None,
dags: Collection[DAG | LazyDeserializedDAG],
parse_duration: float | None = None,
+ *,
session: Session = NEW_SESSION,
) -> None:
"""
@@ -482,7 +483,7 @@ class SerializedDAG:
)
@provide_session
- def get_concurrency_reached(self, session=NEW_SESSION) -> bool:
+ def get_concurrency_reached(self, *, session=NEW_SESSION) -> bool:
"""Return a boolean indicating whether the max_active_tasks limit for this DAG has been reached."""
from airflow.models.taskinstance import TaskInstance
diff --git a/airflow-core/src/airflow/ti_deps/deps/dag_ti_slots_available_dep.py b/airflow-core/src/airflow/ti_deps/deps/dag_ti_slots_available_dep.py
index 6fb370eabd2..9effcd4661e 100644
--- a/airflow-core/src/airflow/ti_deps/deps/dag_ti_slots_available_dep.py
+++ b/airflow-core/src/airflow/ti_deps/deps/dag_ti_slots_available_dep.py
@@ -29,7 +29,7 @@ class DagTISlotsAvailableDep(BaseTIDep):
@provide_session
def _get_dep_statuses(self, ti, dep_context, *, session):
- if ti.task.dag.get_concurrency_reached(session):
+ if ti.task.dag.get_concurrency_reached(session=session):
yield self._failing_status(
reason=(
f"The maximum number of running tasks ({ti.task.dag.max_active_tasks}) for "
diff --git a/airflow-core/tests/unit/listeners/test_listeners.py b/airflow-core/tests/unit/listeners/test_listeners.py
index aad2ea7b6e8..e060be0557d 100644
--- a/airflow-core/tests/unit/listeners/test_listeners.py
+++ b/airflow-core/tests/unit/listeners/test_listeners.py
@@ -19,6 +19,7 @@ from __future__ import annotations
import contextlib
import logging
import os
+from typing import TYPE_CHECKING
import pytest
@@ -38,6 +39,9 @@ from unit.listeners import (
)
from unit.utils.test_helpers import MockJobRunner
+if TYPE_CHECKING:
+ from sqlalchemy.orm import Session
+
pytestmark = pytest.mark.db_test
@@ -65,7 +69,7 @@ def clean_listener_state():
@provide_session
-def test_listener_gets_calls(create_task_instance, session, listener_manager):
+def test_listener_gets_calls(create_task_instance, listener_manager, *, session: Session):
listener_manager(full_listener)
ti = create_task_instance(session=session, state=TaskInstanceState.QUEUED)
@@ -79,7 +83,7 @@ def test_listener_gets_calls(create_task_instance, session, listener_manager):
@provide_session
-def test_multiple_listeners(create_task_instance, session, listener_manager):
+def test_multiple_listeners(create_task_instance, listener_manager, *, session: Session):
listener_manager(full_listener)
listener_manager(lifecycle_listener)
class_based_listener = class_listener.ClassBasedListener()
@@ -99,7 +103,7 @@ def test_multiple_listeners(create_task_instance, session, listener_manager):
@provide_session
-def test_listener_gets_only_subscribed_calls(create_task_instance, session, listener_manager):
+def test_listener_gets_only_subscribed_calls(create_task_instance, listener_manager, *, session: Session):
listener_manager(partial_listener)
ti = create_task_instance(session=session, state=TaskInstanceState.QUEUED)
@@ -113,7 +117,9 @@ def test_listener_gets_only_subscribed_calls(create_task_instance, session, list
@provide_session
-def test_listener_suppresses_exceptions(create_task_instance, session, cap_structlog, listener_manager):
+def test_listener_suppresses_exceptions(
+ create_task_instance, cap_structlog, listener_manager, *, session: Session
+):
listener_manager(throwing_listener)
ti = create_task_instance(session=session, state=TaskInstanceState.QUEUED)
@@ -122,7 +128,9 @@ def test_listener_suppresses_exceptions(create_task_instance, session, cap_struc
@provide_session
-def test_listener_captures_failed_taskinstances(create_task_instance_of_operator, session, listener_manager):
+def test_listener_captures_failed_taskinstances(
+ create_task_instance_of_operator, listener_manager, *, session: Session
+):
listener_manager(full_listener)
ti = create_task_instance_of_operator(
@@ -137,7 +145,7 @@ def test_listener_captures_failed_taskinstances(create_task_instance_of_operator
@provide_session
def test_listener_captures_longrunning_taskinstances(
- create_task_instance_of_operator, session, listener_manager
+ create_task_instance_of_operator, listener_manager, *, session: Session
):
listener_manager(full_listener)
@@ -151,7 +159,7 @@ def test_listener_captures_longrunning_taskinstances(
@provide_session
-def test_class_based_listener(create_task_instance, session, listener_manager):
+def test_class_based_listener(create_task_instance, listener_manager, *, session: Session):
listener = class_listener.ClassBasedListener()
listener_manager(listener)
@@ -161,7 +169,7 @@ def test_class_based_listener(create_task_instance, session, listener_manager):
assert listener.state == [TaskInstanceState.RUNNING, TaskInstanceState.SUCCESS, DagRunState.SUCCESS]
-def test_listener_logs_call(caplog, create_task_instance, session, listener_manager):
+def test_listener_logs_call(caplog, create_task_instance, listener_manager, *, session: Session):
caplog.set_level(logging.DEBUG, logger="airflow.sdk._shared.listeners.listener")
listener_manager(full_listener)
diff --git a/scripts/ci/prek/known_provide_session_positional.txt b/scripts/ci/prek/known_provide_session_positional.txt
index 164938f1b8e..ec271c99df4 100644
--- a/scripts/ci/prek/known_provide_session_positional.txt
+++ b/scripts/ci/prek/known_provide_session_positional.txt
@@ -10,9 +10,6 @@ airflow-core/src/airflow/models/taskinstancehistory.py::2
airflow-core/src/airflow/models/team.py::1
airflow-core/src/airflow/models/trigger.py::7
airflow-core/src/airflow/models/variable.py::2
-airflow-core/src/airflow/secrets/metastore.py::2
-airflow-core/src/airflow/serialization/definitions/dag.py::2
-airflow-core/tests/unit/listeners/test_listeners.py::7
airflow-core/tests/unit/models/test_taskinstance.py::4
airflow-core/tests/unit/models/test_timestamp.py::2
providers/edge3/src/airflow/providers/edge3/executors/edge_executor.py::1