Copilot commented on code in PR #60108:
URL: https://github.com/apache/airflow/pull/60108#discussion_r2957077744
##########
airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py:
##########
@@ -246,6 +246,36 @@ def test_ti_run_state_to_running(
)
assert response.status_code == 409
+ def test_ti_run_returns_execution_token(self, client, session,
create_task_instance, time_machine):
+ """PATCH /run should return an X-Execution-Token header on success."""
+ instant = timezone.parse("2024-10-31T12:00:00Z")
+ time_machine.move_to(instant, tick=False)
+
+ ti = create_task_instance(
+ task_id="test_exec_token",
+ state=State.QUEUED,
+ dagrun_state=DagRunState.RUNNING,
+ session=session,
+ start_date=instant,
+ dag_id=str(uuid4()),
+ )
+ session.commit()
+
+ response = client.patch(
+ f"/execution/task-instances/{ti.id}/run",
+ json={
+ "state": "running",
+ "hostname": "test-host",
+ "unixname": "test-user",
+ "pid": 100,
+ "start_date": "2024-10-31T12:00:00Z",
+ },
+ )
+
+ assert response.status_code == 200
+ assert "X-Execution-Token" in response.headers
+ assert response.headers["X-Execution-Token"] == "mock-execution-token"
+
Review Comment:
New behavior allows workload-scoped tokens to call `PATCH /run`, but there
is no regression test asserting that a token with `scope="workload"` is
accepted on `/run` (only that workload tokens are rejected elsewhere). Adding a
test that uses the real JWT bearer/validator to return `scope="workload"` and
expects 200 from `/run` would prevent accidental removal of the
`token:workload` allowance.
##########
devel-common/src/tests_common/test_utils/mock_executor.py:
##########
@@ -59,6 +59,7 @@ def __init__(self, do_update=True, *args, **kwargs):
# Mock JWT generator for token generation
mock_jwt_generator = MagicMock()
mock_jwt_generator.generate.return_value = "mock-token"
+ mock_jwt_generator.generate_workload_token.return_value =
"mock-workload-token"
Review Comment:
`MagicMock()` is created without `spec`/`autospec` for the JWT generator.
With the new `generate_workload_token` call path, an unspec'd mock can hide
attribute/method mismatches and make tests less reliable. Prefer
`MagicMock(spec=JWTGenerator)` (or `create_autospec`) so incorrect API usage
fails fast.
##########
airflow-core/src/airflow/api_fastapi/auth/tokens.py:
##########
@@ -447,15 +447,33 @@ def signing_arg(self) -> AllowedPrivateKeys | str:
assert self._secret_key
return self._secret_key
- def generate(self, extras: dict[str, Any] | None = None, headers:
dict[str, Any] | None = None) -> str:
+ def generate_workload_token(self, sub: str) -> str:
+ """Generate a long-lived workload token for executor queues."""
+ from airflow.configuration import conf
+
+ workload_valid_for = conf.getint(
+ "execution_api", "jwt_workload_token_expiration_time",
fallback=86400
+ )
+ return self.generate(
+ extras={"sub": sub, "scope": "workload"},
+ valid_for=workload_valid_for,
+ )
Review Comment:
The PR description mentions adding `JWTBearerWorkloadDep`,
`TOKEN_SCOPE_WORKLOAD`, and `SCOPE_MAPPING`, but these identifiers don't exist
in the code changes. The implementation here uses a `scope="workload"` claim
plus `token:*` FastAPI Security scopes/`ExecutionAPIRoute` logic instead.
Please update the PR description to reflect the actual approach so
reviewers/users aren’t looking for non-existent components.
##########
airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py:
##########
@@ -92,18 +94,21 @@
@ti_id_router.patch(
"/{task_instance_id}/run",
status_code=status.HTTP_200_OK,
+ dependencies=[Security(require_auth, scopes=["token:execution",
"token:workload"])],
responses={
Review Comment:
This endpoint adds an extra `Security(require_auth, scopes=[...])`
dependency solely to widen allowed token types, but `require_auth` is already
applied at the router level. This likely causes `require_auth` to run twice per
request for `/run`, which is redundant and can be confusing. Consider using a
non-executing marker dependency (or another mechanism) to add the `token:*`
scopes for `ExecutionAPIRoute` without re-running the full auth dependency.
##########
airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py:
##########
@@ -92,18 +94,21 @@
@ti_id_router.patch(
"/{task_instance_id}/run",
status_code=status.HTTP_200_OK,
+ dependencies=[Security(require_auth, scopes=["token:execution",
"token:workload"])],
responses={
status.HTTP_404_NOT_FOUND: {"description": "Task Instance not found"},
status.HTTP_409_CONFLICT: {"description": "The TI is already in the
requested state"},
HTTP_422_UNPROCESSABLE_CONTENT: {"description": "Invalid payload for
the state transition"},
},
response_model_exclude_unset=True,
)
-def ti_run(
+async def ti_run(
task_instance_id: UUID,
ti_run_payload: Annotated[TIEnterRunningPayload, Body()],
+ response: Response,
session: SessionDep,
dag_bag: DagBagDep,
+ services=DepContainer,
Review Comment:
`ti_run` was changed to `async def` but still uses `SessionDep` (sync
SQLAlchemy session) and performs many blocking DB operations directly in the
event loop. This can reduce throughput for the Execution API under load.
Consider keeping this endpoint sync (and avoid awaiting the svcs container
directly), or migrate the DB access to `AsyncSessionDep`/async SQLAlchemy so
the endpoint can safely be async.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]