Copilot commented on code in PR #64556:
URL: https://github.com/apache/airflow/pull/64556#discussion_r3025326757
##########
providers/edge3/src/airflow/providers/edge3/worker_api/routes/jobs.py:
##########
@@ -70,6 +71,10 @@ def fetch(
session: SessionDep,
) -> EdgeJobFetched | None:
"""Fetch a job to execute on the edge worker."""
+ worker =
session.scalar(select(EdgeWorkerModel).where(EdgeWorkerModel.worker_name ==
worker_name))
+ if not worker:
+ raise HTTPException(status.HTTP_404_NOT_FOUND, "Worker not found")
+
Review Comment:
New behavior: `fetch()` raises `HTTPException(404)` when the worker name is
not found. The unit tests currently only cover the happy path; please add a
regression test asserting the 404 for an unknown worker.
##########
providers/edge3/src/airflow/providers/edge3/worker_api/routes/jobs.py:
##########
@@ -80,7 +85,8 @@ def fetch(
)
if body.queues:
query = query.where(EdgeJobModel.queue.in_(body.queues))
- query = query.where(EdgeJobModel.team_name == body.team_name)
+ if worker.team_name is not None:
+ query = query.where(EdgeJobModel.team_name == worker.team_name)
Review Comment:
The team isolation now relies on `worker.team_name`, but that value is still
writable by the worker via the worker registration/heartbeat endpoints (e.g.
`worker_api/routes/worker.py` assigns `worker.team_name = body.team_name`). A
client holding the shared JWT secret can therefore change its persisted team
and fetch jobs for another team, so the authorization bypass is not fully
addressed.
Consider making `team_name` server-controlled/immutable (e.g. reject changes
when a worker already exists, or pre-provision workers with a fixed team and
disallow `team_name` in the request body), and add validation tests for this
behavior.
##########
providers/edge3/src/airflow/providers/edge3/worker_api/routes/jobs.py:
##########
@@ -70,6 +71,10 @@ def fetch(
session: SessionDep,
) -> EdgeJobFetched | None:
"""Fetch a job to execute on the edge worker."""
+ worker =
session.scalar(select(EdgeWorkerModel).where(EdgeWorkerModel.worker_name ==
worker_name))
+ if not worker:
+ raise HTTPException(status.HTTP_404_NOT_FOUND, "Worker not found")
+
Review Comment:
`fetch()` now raises an HTTP 404 when the worker is unknown, but the route’s
documented `responses=` only includes 400/403. Please update the OpenAPI
exception docs for this endpoint to include 404 as well, so clients can see
this behavior in the API schema.
##########
providers/edge3/tests/unit/edge3/worker_api/routes/test_jobs.py:
##########
@@ -136,6 +138,11 @@ def test_state(self, mock_stats_incr, session: Session):
def test_fetch_filters_by_team_name(self, session: Session):
Review Comment:
The test name `test_fetch_filters_by_team_name` is now a bit misleading
since the filter is based on the persisted worker’s team (and explicitly
ignores `body.team_name`). Renaming it to something like
`test_fetch_filters_by_worker_team_name` would make the intent clearer and
reduce confusion when reading failures.
```suggestion
def test_fetch_filters_by_worker_team_name(self, session: Session):
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]