Copilot commented on code in PR #64556:
URL: https://github.com/apache/airflow/pull/64556#discussion_r3025326757


##########
providers/edge3/src/airflow/providers/edge3/worker_api/routes/jobs.py:
##########
@@ -70,6 +71,10 @@ def fetch(
     session: SessionDep,
 ) -> EdgeJobFetched | None:
     """Fetch a job to execute on the edge worker."""
+    worker = 
session.scalar(select(EdgeWorkerModel).where(EdgeWorkerModel.worker_name == 
worker_name))
+    if not worker:
+        raise HTTPException(status.HTTP_404_NOT_FOUND, "Worker not found")
+

Review Comment:
   New behavior: `fetch()` raises `HTTPException(404)` when the worker name is 
not found. The unit tests currently only cover the happy path; please add a 
regression test asserting the 404 for an unknown worker.



##########
providers/edge3/src/airflow/providers/edge3/worker_api/routes/jobs.py:
##########
@@ -80,7 +85,8 @@ def fetch(
     )
     if body.queues:
         query = query.where(EdgeJobModel.queue.in_(body.queues))
-    query = query.where(EdgeJobModel.team_name == body.team_name)
+    if worker.team_name is not None:
+        query = query.where(EdgeJobModel.team_name == worker.team_name)

Review Comment:
   The team isolation now relies on `worker.team_name`, but that value is still 
writable by the worker via the worker registration/heartbeat endpoints (e.g. 
`worker_api/routes/worker.py` assigns `worker.team_name = body.team_name`). A 
client holding the shared JWT secret can therefore change its persisted team 
and fetch jobs for another team, so the authorization bypass is not fully 
addressed.
   
   Consider making `team_name` server-controlled/immutable (e.g. reject changes 
when a worker already exists, or pre-provision workers with a fixed team and 
disallow `team_name` in the request body), and add validation tests for this 
behavior.



##########
providers/edge3/src/airflow/providers/edge3/worker_api/routes/jobs.py:
##########
@@ -70,6 +71,10 @@ def fetch(
     session: SessionDep,
 ) -> EdgeJobFetched | None:
     """Fetch a job to execute on the edge worker."""
+    worker = 
session.scalar(select(EdgeWorkerModel).where(EdgeWorkerModel.worker_name == 
worker_name))
+    if not worker:
+        raise HTTPException(status.HTTP_404_NOT_FOUND, "Worker not found")
+

Review Comment:
   `fetch()` now raises an HTTP 404 when the worker is unknown, but the route’s 
documented `responses=` only includes 400/403. Please update the OpenAPI 
exception docs for this endpoint to include 404 as well, so clients can see 
this behavior in the API schema.



##########
providers/edge3/tests/unit/edge3/worker_api/routes/test_jobs.py:
##########
@@ -136,6 +138,11 @@ def test_state(self, mock_stats_incr, session: Session):
 
     def test_fetch_filters_by_team_name(self, session: Session):

Review Comment:
   The test name `test_fetch_filters_by_team_name` is now a bit misleading 
since the filter is based on the persisted worker’s team (and explicitly 
ignores `body.team_name`). Renaming it to something like 
`test_fetch_filters_by_worker_team_name` would make the intent clearer and 
reduce confusion when reading failures.
   ```suggestion
       def test_fetch_filters_by_worker_team_name(self, session: Session):
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to