This is an automated email from the ASF dual-hosted git repository.
jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new a5f73bb5439 Fix exceptions of positional session use in Edge provider
(#67661)
a5f73bb5439 is described below
commit a5f73bb54399f015aeb520d404133525d0ffdb5b
Author: Jens Scheffler <[email protected]>
AuthorDate: Fri May 29 13:45:24 2026 +0200
Fix exceptions of positional session use in Edge provider (#67661)
* Fix exceptions of positional session use in Edge provider
* Revert interface-based limits
---
.../tests/unit/always/test_project_structure.py | 2 --
.../providers/edge3/executors/edge_executor.py | 4 ++--
.../src/airflow/providers/edge3/models/edge_worker.py | 19 ++++++++++---------
.../providers/edge3/plugins/edge_executor_plugin.py | 2 +-
.../airflow/providers/edge3/worker_api/routes/logs.py | 2 +-
scripts/ci/prek/known_provide_session_positional.txt | 5 +----
6 files changed, 15 insertions(+), 19 deletions(-)
diff --git a/airflow-core/tests/unit/always/test_project_structure.py
b/airflow-core/tests/unit/always/test_project_structure.py
index 750925072d2..c4300a19226 100644
--- a/airflow-core/tests/unit/always/test_project_structure.py
+++ b/airflow-core/tests/unit/always/test_project_structure.py
@@ -111,8 +111,6 @@ class TestProjectStructure:
"providers/edge3/tests/unit/edge3/models/test_edge_job.py",
"providers/edge3/tests/unit/edge3/models/test_edge_logs.py",
"providers/edge3/tests/unit/edge3/models/test_edge_worker.py",
-
"providers/edge3/tests/unit/edge3/worker_api/routes/test__v2_compat.py",
-
"providers/edge3/tests/unit/edge3/worker_api/routes/test__v2_routes.py",
"providers/edge3/tests/unit/edge3/worker_api/test_app.py",
"providers/edge3/tests/unit/edge3/worker_api/test_auth.py",
"providers/edge3/tests/unit/edge3/worker_api/test_datamodels.py",
diff --git
a/providers/edge3/src/airflow/providers/edge3/executors/edge_executor.py
b/providers/edge3/src/airflow/providers/edge3/executors/edge_executor.py
index aacf9e65e61..dda48f49786 100644
--- a/providers/edge3/src/airflow/providers/edge3/executors/edge_executor.py
+++ b/providers/edge3/src/airflow/providers/edge3/executors/edge_executor.py
@@ -74,7 +74,7 @@ class EdgeExecutor(BaseExecutor):
self.team_name = None
@provide_session
- def start(self, session: Session = NEW_SESSION):
+ def start(self, *, session: Session = NEW_SESSION):
"""If EdgeExecutor provider is loaded first time, ensure table
exists."""
check_db_manager_config()
edge_db_manager = EdgeDBManager(session)
@@ -309,7 +309,7 @@ class EdgeExecutor(BaseExecutor):
return purged_marker
@provide_session
- def sync(self, session: Session = NEW_SESSION) -> None:
+ def sync(self, *, session: Session = NEW_SESSION) -> None:
"""Sync will get called periodically by the heartbeat method."""
with Stats.timer("edge_executor.sync.duration"):
orphaned = self._update_orphaned_jobs(session)
diff --git a/providers/edge3/src/airflow/providers/edge3/models/edge_worker.py
b/providers/edge3/src/airflow/providers/edge3/models/edge_worker.py
index 0098f17e7d8..78c60307ea1 100644
--- a/providers/edge3/src/airflow/providers/edge3/models/edge_worker.py
+++ b/providers/edge3/src/airflow/providers/edge3/models/edge_worker.py
@@ -225,6 +225,7 @@ def get_query_filter_by_worker_name(worker_name: str):
def _fetch_edge_hosts_from_db(
hostname: str | None = None,
states: list | None = None,
+ *,
session: Session = NEW_SESSION,
) -> Sequence[EdgeWorkerModel]:
query = select(EdgeWorkerModel)
@@ -238,13 +239,13 @@ def _fetch_edge_hosts_from_db(
@providers_configuration_loaded
@provide_session
-def get_registered_edge_hosts(states: list | None = None, session: Session =
NEW_SESSION):
+def get_registered_edge_hosts(*, states: list | None = None, session: Session
= NEW_SESSION):
return _fetch_edge_hosts_from_db(states=states, session=session)
@provide_session
def request_maintenance(
- worker_name: str, maintenance_comment: str | None, session: Session =
NEW_SESSION
+ worker_name: str, maintenance_comment: str | None, *, session: Session =
NEW_SESSION
) -> None:
"""Write maintenance request to the db."""
query = get_query_filter_by_worker_name(worker_name=worker_name)
@@ -256,7 +257,7 @@ def request_maintenance(
@provide_session
-def exit_maintenance(worker_name: str, session: Session = NEW_SESSION) -> None:
+def exit_maintenance(worker_name: str, *, session: Session = NEW_SESSION) ->
None:
"""Write maintenance exit to the db."""
query = get_query_filter_by_worker_name(worker_name)
worker: EdgeWorkerModel | None = session.scalar(query)
@@ -267,7 +268,7 @@ def exit_maintenance(worker_name: str, session: Session =
NEW_SESSION) -> None:
@provide_session
-def remove_worker(worker_name: str, session: Session = NEW_SESSION) -> None:
+def remove_worker(worker_name: str, *, session: Session = NEW_SESSION) -> None:
"""Remove a worker that is offline or just gone from DB."""
query = get_query_filter_by_worker_name(worker_name)
worker: EdgeWorkerModel | None = session.scalar(query)
@@ -287,7 +288,7 @@ def remove_worker(worker_name: str, session: Session =
NEW_SESSION) -> None:
@provide_session
def change_maintenance_comment(
- worker_name: str, maintenance_comment: str | None, session: Session =
NEW_SESSION
+ worker_name: str, maintenance_comment: str | None, *, session: Session =
NEW_SESSION
) -> None:
"""Write maintenance comment in the db."""
query = get_query_filter_by_worker_name(worker_name)
@@ -308,7 +309,7 @@ def change_maintenance_comment(
@provide_session
-def request_shutdown(worker_name: str, session: Session = NEW_SESSION) -> None:
+def request_shutdown(worker_name: str, *, session: Session = NEW_SESSION) ->
None:
"""Request to shutdown the edge worker."""
query = get_query_filter_by_worker_name(worker_name)
worker: EdgeWorkerModel | None = session.scalar(query)
@@ -323,7 +324,7 @@ def request_shutdown(worker_name: str, session: Session =
NEW_SESSION) -> None:
@provide_session
-def add_worker_queues(worker_name: str, queues: list[str], session: Session =
NEW_SESSION) -> None:
+def add_worker_queues(worker_name: str, queues: list[str], *, session: Session
= NEW_SESSION) -> None:
"""Add queues to an edge worker."""
query = get_query_filter_by_worker_name(worker_name)
worker: EdgeWorkerModel | None = session.scalar(query)
@@ -341,7 +342,7 @@ def add_worker_queues(worker_name: str, queues: list[str],
session: Session = NE
@provide_session
-def remove_worker_queues(worker_name: str, queues: list[str], session: Session
= NEW_SESSION) -> None:
+def remove_worker_queues(worker_name: str, queues: list[str], *, session:
Session = NEW_SESSION) -> None:
"""Remove queues from an edge worker."""
query = get_query_filter_by_worker_name(worker_name)
worker: EdgeWorkerModel | None = session.scalar(query)
@@ -361,7 +362,7 @@ def remove_worker_queues(worker_name: str, queues:
list[str], session: Session =
@provide_session
-def set_worker_concurrency(worker_name: str, concurrency: int, session:
Session = NEW_SESSION) -> None:
+def set_worker_concurrency(worker_name: str, concurrency: int, *, session:
Session = NEW_SESSION) -> None:
"""Set the concurrency of an edge worker."""
query = select(EdgeWorkerModel).where(EdgeWorkerModel.worker_name ==
worker_name)
worker: EdgeWorkerModel | None = session.scalar(query)
diff --git
a/providers/edge3/src/airflow/providers/edge3/plugins/edge_executor_plugin.py
b/providers/edge3/src/airflow/providers/edge3/plugins/edge_executor_plugin.py
index 8a23cf7231e..f18b438fa65 100644
---
a/providers/edge3/src/airflow/providers/edge3/plugins/edge_executor_plugin.py
+++
b/providers/edge3/src/airflow/providers/edge3/plugins/edge_executor_plugin.py
@@ -32,7 +32,7 @@ from airflow.utils.db import DBLocks, create_global_lock
@provide_session
-def _get_api_endpoint(session: Session = NEW_SESSION) -> dict[str, Any]:
+def _get_api_endpoint(*, session: Session = NEW_SESSION) -> dict[str, Any]:
# Ensure all required DB modeals are created before starting the API
with create_global_lock(session=session, lock=DBLocks.MIGRATIONS):
engine = session.get_bind().engine
diff --git
a/providers/edge3/src/airflow/providers/edge3/worker_api/routes/logs.py
b/providers/edge3/src/airflow/providers/edge3/worker_api/routes/logs.py
index 33fe9dceee1..520000e64dd 100644
--- a/providers/edge3/src/airflow/providers/edge3/worker_api/routes/logs.py
+++ b/providers/edge3/src/airflow/providers/edge3/worker_api/routes/logs.py
@@ -39,7 +39,7 @@ logs_router = AirflowRouter(tags=["Logs"], prefix="/logs")
@cache
@provide_session
-def _logfile_path(task: TaskInstanceKey, session=NEW_SESSION) -> str:
+def _logfile_path(task: TaskInstanceKey, *, session=NEW_SESSION) -> str:
"""Elaborate the (relative) path and filename to expect from task
execution."""
ti = TaskInstance.get_task_instance(
dag_id=task.dag_id,
diff --git a/scripts/ci/prek/known_provide_session_positional.txt
b/scripts/ci/prek/known_provide_session_positional.txt
index d0c84e2f6b4..f5ca5505fbf 100644
--- a/scripts/ci/prek/known_provide_session_positional.txt
+++ b/scripts/ci/prek/known_provide_session_positional.txt
@@ -75,10 +75,7 @@
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/template_renderi
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/pod.py::1
providers/common/ai/tests/unit/common/ai/plugins/test_hitl_review.py::1
providers/databricks/src/airflow/providers/databricks/plugins/databricks_workflow.py::2
-providers/edge3/src/airflow/providers/edge3/executors/edge_executor.py::3
-providers/edge3/src/airflow/providers/edge3/models/edge_worker.py::10
-providers/edge3/src/airflow/providers/edge3/plugins/edge_executor_plugin.py::1
-providers/edge3/src/airflow/providers/edge3/worker_api/routes/logs.py::1
+providers/edge3/src/airflow/providers/edge3/executors/edge_executor.py::1
providers/fab/src/airflow/providers/fab/auth_manager/cli_commands/permissions_command.py::1
providers/google/src/airflow/providers/google/cloud/triggers/bigquery.py::1
providers/google/src/airflow/providers/google/cloud/triggers/dataproc.py::3