This is an automated email from the ASF dual-hosted git repository.

jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new a5f73bb5439 Fix exceptions of positional session use in Edge provider (#67661)
a5f73bb5439 is described below

commit a5f73bb54399f015aeb520d404133525d0ffdb5b
Author: Jens Scheffler <[email protected]>
AuthorDate: Fri May 29 13:45:24 2026 +0200

    Fix exceptions of positional session use in Edge provider (#67661)
    
    * Fix exceptions of positional session use in Edge provider
    
    * Revert interface-based limits
---
 .../tests/unit/always/test_project_structure.py       |  2 --
 .../providers/edge3/executors/edge_executor.py        |  4 ++--
 .../src/airflow/providers/edge3/models/edge_worker.py | 19 ++++++++++---------
 .../providers/edge3/plugins/edge_executor_plugin.py   |  2 +-
 .../airflow/providers/edge3/worker_api/routes/logs.py |  2 +-
 scripts/ci/prek/known_provide_session_positional.txt  |  5 +----
 6 files changed, 15 insertions(+), 19 deletions(-)

diff --git a/airflow-core/tests/unit/always/test_project_structure.py b/airflow-core/tests/unit/always/test_project_structure.py
index 750925072d2..c4300a19226 100644
--- a/airflow-core/tests/unit/always/test_project_structure.py
+++ b/airflow-core/tests/unit/always/test_project_structure.py
@@ -111,8 +111,6 @@ class TestProjectStructure:
             "providers/edge3/tests/unit/edge3/models/test_edge_job.py",
             "providers/edge3/tests/unit/edge3/models/test_edge_logs.py",
             "providers/edge3/tests/unit/edge3/models/test_edge_worker.py",
-            "providers/edge3/tests/unit/edge3/worker_api/routes/test__v2_compat.py",
-            "providers/edge3/tests/unit/edge3/worker_api/routes/test__v2_routes.py",
             "providers/edge3/tests/unit/edge3/worker_api/test_app.py",
             "providers/edge3/tests/unit/edge3/worker_api/test_auth.py",
             "providers/edge3/tests/unit/edge3/worker_api/test_datamodels.py",
diff --git a/providers/edge3/src/airflow/providers/edge3/executors/edge_executor.py b/providers/edge3/src/airflow/providers/edge3/executors/edge_executor.py
index aacf9e65e61..dda48f49786 100644
--- a/providers/edge3/src/airflow/providers/edge3/executors/edge_executor.py
+++ b/providers/edge3/src/airflow/providers/edge3/executors/edge_executor.py
@@ -74,7 +74,7 @@ class EdgeExecutor(BaseExecutor):
             self.team_name = None
 
     @provide_session
-    def start(self, session: Session = NEW_SESSION):
+    def start(self, *, session: Session = NEW_SESSION):
         """If EdgeExecutor provider is loaded first time, ensure table exists."""
         check_db_manager_config()
         edge_db_manager = EdgeDBManager(session)
@@ -309,7 +309,7 @@ class EdgeExecutor(BaseExecutor):
         return purged_marker
 
     @provide_session
-    def sync(self, session: Session = NEW_SESSION) -> None:
+    def sync(self, *, session: Session = NEW_SESSION) -> None:
         """Sync will get called periodically by the heartbeat method."""
         with Stats.timer("edge_executor.sync.duration"):
             orphaned = self._update_orphaned_jobs(session)
diff --git a/providers/edge3/src/airflow/providers/edge3/models/edge_worker.py b/providers/edge3/src/airflow/providers/edge3/models/edge_worker.py
index 0098f17e7d8..78c60307ea1 100644
--- a/providers/edge3/src/airflow/providers/edge3/models/edge_worker.py
+++ b/providers/edge3/src/airflow/providers/edge3/models/edge_worker.py
@@ -225,6 +225,7 @@ def get_query_filter_by_worker_name(worker_name: str):
 def _fetch_edge_hosts_from_db(
     hostname: str | None = None,
     states: list | None = None,
+    *,
     session: Session = NEW_SESSION,
 ) -> Sequence[EdgeWorkerModel]:
     query = select(EdgeWorkerModel)
@@ -238,13 +239,13 @@ def _fetch_edge_hosts_from_db(
 
 @providers_configuration_loaded
 @provide_session
-def get_registered_edge_hosts(states: list | None = None, session: Session = NEW_SESSION):
+def get_registered_edge_hosts(*, states: list | None = None, session: Session = NEW_SESSION):
     return _fetch_edge_hosts_from_db(states=states, session=session)
 
 
 @provide_session
 def request_maintenance(
-    worker_name: str, maintenance_comment: str | None, session: Session = NEW_SESSION
+    worker_name: str, maintenance_comment: str | None, *, session: Session = NEW_SESSION
 ) -> None:
     """Write maintenance request to the db."""
     query = get_query_filter_by_worker_name(worker_name=worker_name)
@@ -256,7 +257,7 @@ def request_maintenance(
 
 
 @provide_session
-def exit_maintenance(worker_name: str, session: Session = NEW_SESSION) -> None:
+def exit_maintenance(worker_name: str, *, session: Session = NEW_SESSION) -> None:
     """Write maintenance exit to the db."""
     query = get_query_filter_by_worker_name(worker_name)
     worker: EdgeWorkerModel | None = session.scalar(query)
@@ -267,7 +268,7 @@ def exit_maintenance(worker_name: str, session: Session = NEW_SESSION) -> None:
 
 
 @provide_session
-def remove_worker(worker_name: str, session: Session = NEW_SESSION) -> None:
+def remove_worker(worker_name: str, *, session: Session = NEW_SESSION) -> None:
     """Remove a worker that is offline or just gone from DB."""
     query = get_query_filter_by_worker_name(worker_name)
     worker: EdgeWorkerModel | None = session.scalar(query)
@@ -287,7 +288,7 @@ def remove_worker(worker_name: str, session: Session = NEW_SESSION) -> None:
 
 @provide_session
 def change_maintenance_comment(
-    worker_name: str, maintenance_comment: str | None, session: Session = NEW_SESSION
+    worker_name: str, maintenance_comment: str | None, *, session: Session = NEW_SESSION
 ) -> None:
     """Write maintenance comment in the db."""
     query = get_query_filter_by_worker_name(worker_name)
@@ -308,7 +309,7 @@ def change_maintenance_comment(
 
 
 @provide_session
-def request_shutdown(worker_name: str, session: Session = NEW_SESSION) -> None:
+def request_shutdown(worker_name: str, *, session: Session = NEW_SESSION) -> None:
     """Request to shutdown the edge worker."""
     query = get_query_filter_by_worker_name(worker_name)
     worker: EdgeWorkerModel | None = session.scalar(query)
@@ -323,7 +324,7 @@ def request_shutdown(worker_name: str, session: Session = NEW_SESSION) -> None:
 
 
 @provide_session
-def add_worker_queues(worker_name: str, queues: list[str], session: Session = NEW_SESSION) -> None:
+def add_worker_queues(worker_name: str, queues: list[str], *, session: Session = NEW_SESSION) -> None:
     """Add queues to an edge worker."""
     query = get_query_filter_by_worker_name(worker_name)
     worker: EdgeWorkerModel | None = session.scalar(query)
@@ -341,7 +342,7 @@ def add_worker_queues(worker_name: str, queues: list[str], session: Session = NE
 
 
 @provide_session
-def remove_worker_queues(worker_name: str, queues: list[str], session: Session = NEW_SESSION) -> None:
+def remove_worker_queues(worker_name: str, queues: list[str], *, session: Session = NEW_SESSION) -> None:
     """Remove queues from an edge worker."""
     query = get_query_filter_by_worker_name(worker_name)
     worker: EdgeWorkerModel | None = session.scalar(query)
@@ -361,7 +362,7 @@ def remove_worker_queues(worker_name: str, queues: list[str], session: Session =
 
 
 @provide_session
-def set_worker_concurrency(worker_name: str, concurrency: int, session: Session = NEW_SESSION) -> None:
+def set_worker_concurrency(worker_name: str, concurrency: int, *, session: Session = NEW_SESSION) -> None:
     """Set the concurrency of an edge worker."""
     query = select(EdgeWorkerModel).where(EdgeWorkerModel.worker_name == worker_name)
     worker: EdgeWorkerModel | None = session.scalar(query)
diff --git a/providers/edge3/src/airflow/providers/edge3/plugins/edge_executor_plugin.py b/providers/edge3/src/airflow/providers/edge3/plugins/edge_executor_plugin.py
index 8a23cf7231e..f18b438fa65 100644
--- a/providers/edge3/src/airflow/providers/edge3/plugins/edge_executor_plugin.py
+++ b/providers/edge3/src/airflow/providers/edge3/plugins/edge_executor_plugin.py
@@ -32,7 +32,7 @@ from airflow.utils.db import DBLocks, create_global_lock
 
 
 @provide_session
-def _get_api_endpoint(session: Session = NEW_SESSION) -> dict[str, Any]:
+def _get_api_endpoint(*, session: Session = NEW_SESSION) -> dict[str, Any]:
     # Ensure all required DB modeals are created before starting the API
     with create_global_lock(session=session, lock=DBLocks.MIGRATIONS):
         engine = session.get_bind().engine
diff --git a/providers/edge3/src/airflow/providers/edge3/worker_api/routes/logs.py b/providers/edge3/src/airflow/providers/edge3/worker_api/routes/logs.py
index 33fe9dceee1..520000e64dd 100644
--- a/providers/edge3/src/airflow/providers/edge3/worker_api/routes/logs.py
+++ b/providers/edge3/src/airflow/providers/edge3/worker_api/routes/logs.py
@@ -39,7 +39,7 @@ logs_router = AirflowRouter(tags=["Logs"], prefix="/logs")
 
 @cache
 @provide_session
-def _logfile_path(task: TaskInstanceKey, session=NEW_SESSION) -> str:
+def _logfile_path(task: TaskInstanceKey, *, session=NEW_SESSION) -> str:
     """Elaborate the (relative) path and filename to expect from task execution."""
     ti = TaskInstance.get_task_instance(
         dag_id=task.dag_id,
diff --git a/scripts/ci/prek/known_provide_session_positional.txt b/scripts/ci/prek/known_provide_session_positional.txt
index d0c84e2f6b4..f5ca5505fbf 100644
--- a/scripts/ci/prek/known_provide_session_positional.txt
+++ b/scripts/ci/prek/known_provide_session_positional.txt
@@ -75,10 +75,7 @@ providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/template_renderi
 providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/pod.py::1
 providers/common/ai/tests/unit/common/ai/plugins/test_hitl_review.py::1
 providers/databricks/src/airflow/providers/databricks/plugins/databricks_workflow.py::2
-providers/edge3/src/airflow/providers/edge3/executors/edge_executor.py::3
-providers/edge3/src/airflow/providers/edge3/models/edge_worker.py::10
-providers/edge3/src/airflow/providers/edge3/plugins/edge_executor_plugin.py::1
-providers/edge3/src/airflow/providers/edge3/worker_api/routes/logs.py::1
+providers/edge3/src/airflow/providers/edge3/executors/edge_executor.py::1
 providers/fab/src/airflow/providers/fab/auth_manager/cli_commands/permissions_command.py::1
 providers/google/src/airflow/providers/google/cloud/triggers/bigquery.py::1
 providers/google/src/airflow/providers/google/cloud/triggers/dataproc.py::3

Reply via email to