kaxil commented on code in PR #60108:
URL: https://github.com/apache/airflow/pull/60108#discussion_r2991910313
##########
airflow-core/tests/unit/api_fastapi/execution_api/conftest.py:
##########
@@ -53,6 +57,12 @@ async def mock_jwt_bearer(request: Request):
exec_app.dependency_overrides[_jwt_bearer] = mock_jwt_bearer
with TestClient(app, headers={"Authorization": "Bearer fake"}) as client:
+ mock_generator = MagicMock(spec=JWTGenerator)
+ mock_generator.generate.return_value = "mock-execution-token"
+ lifespan.registry.register_value(JWTGenerator, mock_generator)
+
yield client
+ lifespan.registry.close()
+
exec_app.dependency_overrides.pop(_jwt_bearer, None)
Review Comment:
`lifespan.registry.close()` is new here (no other test file does this), and
the registry is shared across all tests via `cached_app`. Closing it could
break subsequent tests that try to look up services from the same registry. The
existing pattern in other test files (e.g., `test_task_instances.py`,
`test_router.py`) registers values on `lifespan.registry` without closing it
afterward. I'd drop this `close()` call to match what the rest of the test
suite does.
##########
airflow-core/src/airflow/api_fastapi/execution_api/app.py:
##########
@@ -142,6 +144,11 @@ async def dispatch(self, request: Request, call_next):
validator: JWTValidator = await services.aget(JWTValidator)
claims = await validator.avalidated_claims(token, {})
+ # Workload tokens are long-lived and meant to survive queue
+ # wait times so avoid refreshing them.
+ if claims.get("scope") == "workload":
+ return response
+
Review Comment:
The early return for workload tokens skips the refresh logic (correct), but
it also skips the `except` block below. If `avalidated_claims` raises for a
workload token, execution falls into the outer `except` and the response still
gets returned (with a warning log). Might be worth a comment clarifying that
workload token validation errors are handled by the outer catch.
##########
airflow-core/src/airflow/api_fastapi/execution_api/app.py:
##########
@@ -332,10 +343,29 @@ async def always_allow(request: Request):
)
return TIToken(id=ti_id, claims={"scope": "execution"})
+ # Override _container (the svcs service locator behind
DepContainer).
+ # The default _container reads request.app.state.svcs_registry, but
+ # Cadwyn's versioned sub-apps don't inherit the main app's state,
+ # so lookups raise ServiceNotFoundError. This registry provides
+ # services needed by routes called during dag.test().
+ #
Review Comment:
The stub `JWTGenerator` uses `secrets.token_urlsafe(32)` as the secret key,
so a new key is generated every time `InProcessExecutionAPI.app` is accessed.
Since `app` is a `cached_property`, the key is stable for the lifetime of the
object. But the token generated here by `ti_run` won't be validated by anything
(since `_jwt_bearer` is also overridden with `always_allow`), so this stub only
exists to satisfy the `services.get(JWTGenerator)` call. A brief comment noting
that these tokens are never validated in `dag.test()` mode would help future
readers.
##########
airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py:
##########
@@ -286,14 +291,18 @@ def ti_run(
if ti.next_method:
context.next_method = ti.next_method
context.next_kwargs = ti.next_kwargs
-
- return context
except SQLAlchemyError:
log.exception("Error marking Task Instance state as running")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Database error occurred"
)
+ generator: JWTGenerator = services.get(JWTGenerator)
+ execution_token = generator.generate(extras={"sub": str(task_instance_id)})
Review Comment:
Token generation happens outside the `try...except SQLAlchemyError` block.
If `services.get(JWTGenerator)` or `generator.generate()` raises (missing
service, crypto error, etc.), the client gets a raw 500 with no useful detail.
Worth wrapping this in its own try/except, or at minimum a log line, so
operators can tell the difference between "database error" and "token
generation failed".
##########
airflow-core/src/airflow/api_fastapi/auth/tokens.py:
##########
@@ -418,6 +418,10 @@ class JWTGenerator:
kid: str = attrs.field(default=attrs.Factory(_generate_kid,
takes_self=True))
valid_for: float
+ workload_valid_for: float = attrs.field(
Review Comment:
The `workload_valid_for` default reads from config via `_conf_factory`, and
`_jwt_generator()` in `app.py` also reads the same config key and passes it
explicitly. The explicit kwarg takes precedence, so the default factory never
runs in production. Having two code paths that reference the same config key is
easy to get out of sync -- consider dropping the attrs default (make it
required like `valid_for`) and always passing it explicitly, or drop the
explicit kwarg in `_jwt_generator()` and let the default handle it.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]