Copilot commented on code in PR #60108:
URL: https://github.com/apache/airflow/pull/60108#discussion_r2957077744


##########
airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py:
##########
@@ -246,6 +246,36 @@ def test_ti_run_state_to_running(
         )
         assert response.status_code == 409
 
+    def test_ti_run_returns_execution_token(self, client, session, 
create_task_instance, time_machine):
+        """PATCH /run should return an X-Execution-Token header on success."""
+        instant = timezone.parse("2024-10-31T12:00:00Z")
+        time_machine.move_to(instant, tick=False)
+
+        ti = create_task_instance(
+            task_id="test_exec_token",
+            state=State.QUEUED,
+            dagrun_state=DagRunState.RUNNING,
+            session=session,
+            start_date=instant,
+            dag_id=str(uuid4()),
+        )
+        session.commit()
+
+        response = client.patch(
+            f"/execution/task-instances/{ti.id}/run",
+            json={
+                "state": "running",
+                "hostname": "test-host",
+                "unixname": "test-user",
+                "pid": 100,
+                "start_date": "2024-10-31T12:00:00Z",
+            },
+        )
+
+        assert response.status_code == 200
+        assert "X-Execution-Token" in response.headers
+        assert response.headers["X-Execution-Token"] == "mock-execution-token"
+

Review Comment:
   New behavior allows workload-scoped tokens to call `PATCH /run`, but there 
is no regression test asserting that a token with `scope="workload"` is 
accepted on `/run` (only that workload tokens are rejected elsewhere). Adding a 
test that uses the real JWT bearer/validator to return `scope="workload"` and 
expects 200 from `/run` would prevent accidental removal of the 
`token:workload` allowance.
   



##########
devel-common/src/tests_common/test_utils/mock_executor.py:
##########
@@ -59,6 +59,7 @@ def __init__(self, do_update=True, *args, **kwargs):
         # Mock JWT generator for token generation
         mock_jwt_generator = MagicMock()
         mock_jwt_generator.generate.return_value = "mock-token"
+        mock_jwt_generator.generate_workload_token.return_value = 
"mock-workload-token"
 

Review Comment:
   `MagicMock()` is created without `spec`/`autospec` for the JWT generator. 
With the new `generate_workload_token` call path, an unspec'd mock can hide 
attribute/method mismatches and make tests less reliable. Prefer 
`MagicMock(spec=JWTGenerator)` (or `create_autospec`) so incorrect API usage 
fails fast.



##########
airflow-core/src/airflow/api_fastapi/auth/tokens.py:
##########
@@ -447,15 +447,33 @@ def signing_arg(self) -> AllowedPrivateKeys | str:
             assert self._secret_key
         return self._secret_key
 
-    def generate(self, extras: dict[str, Any] | None = None, headers: 
dict[str, Any] | None = None) -> str:
+    def generate_workload_token(self, sub: str) -> str:
+        """Generate a long-lived workload token for executor queues."""
+        from airflow.configuration import conf
+
+        workload_valid_for = conf.getint(
+            "execution_api", "jwt_workload_token_expiration_time", 
fallback=86400
+        )
+        return self.generate(
+            extras={"sub": sub, "scope": "workload"},
+            valid_for=workload_valid_for,
+        )

Review Comment:
   The PR description mentions adding `JWTBearerWorkloadDep`, 
`TOKEN_SCOPE_WORKLOAD`, and `SCOPE_MAPPING`, but these identifiers don't exist 
in the code changes. The implementation here uses a `scope="workload"` claim 
plus `token:*` FastAPI Security scopes/`ExecutionAPIRoute` logic instead. 
Please update the PR description to reflect the actual approach so 
reviewers/users aren’t looking for non-existent components.



##########
airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py:
##########
@@ -92,18 +94,21 @@
 @ti_id_router.patch(
     "/{task_instance_id}/run",
     status_code=status.HTTP_200_OK,
+    dependencies=[Security(require_auth, scopes=["token:execution", 
"token:workload"])],
     responses={

Review Comment:
   This endpoint adds an extra `Security(require_auth, scopes=[...])` 
dependency solely to widen allowed token types, but `require_auth` is already 
applied at the router level. This likely causes `require_auth` to run twice per 
request for `/run`, which is redundant and can be confusing. Consider using a 
non-executing marker dependency (or another mechanism) to add the `token:*` 
scopes for `ExecutionAPIRoute` without re-running the full auth dependency.



##########
airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py:
##########
@@ -92,18 +94,21 @@
 @ti_id_router.patch(
     "/{task_instance_id}/run",
     status_code=status.HTTP_200_OK,
+    dependencies=[Security(require_auth, scopes=["token:execution", 
"token:workload"])],
     responses={
         status.HTTP_404_NOT_FOUND: {"description": "Task Instance not found"},
         status.HTTP_409_CONFLICT: {"description": "The TI is already in the 
requested state"},
         HTTP_422_UNPROCESSABLE_CONTENT: {"description": "Invalid payload for 
the state transition"},
     },
     response_model_exclude_unset=True,
 )
-def ti_run(
+async def ti_run(
     task_instance_id: UUID,
     ti_run_payload: Annotated[TIEnterRunningPayload, Body()],
+    response: Response,
     session: SessionDep,
     dag_bag: DagBagDep,
+    services=DepContainer,

Review Comment:
   `ti_run` was changed to `async def` but still uses `SessionDep` (sync 
SQLAlchemy session) and performs many blocking DB operations directly in the 
event loop. This can reduce throughput for the Execution API under load. 
Consider keeping this endpoint sync (and avoid awaiting the svcs container 
directly), or migrate the DB access to `AsyncSessionDep`/async SQLAlchemy so 
the endpoint can safely be async.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to