ashb commented on code in PR #60108:
URL: https://github.com/apache/airflow/pull/60108#discussion_r2675871772


##########
airflow-core/src/airflow/api_fastapi/execution_api/deps.py:
##########
@@ -87,13 +87,70 @@ async def __call__(  # type: ignore[override]
             else:
                 validators = self.required_claims
             claims = await validator.avalidated_claims(creds.credentials, 
validators)
+
+            # Reject queue-scoped tokens - they can only be used on /run 
endpoint
+            # Only check if scope claim is present (allows backwards 
compatibility with tests)

Review Comment:
   If this back compat is just for tests then we don't need it.



##########
airflow-core/src/airflow/api_fastapi/auth/tokens.py:
##########
@@ -458,6 +461,39 @@ def generate(self, extras: dict[str, Any] | None = None, 
headers: dict[str, Any]
             headers["kid"] = self.kid
         return jwt.encode(claims, self.signing_arg, algorithm=self.algorithm, 
headers=headers)
 
+    def generate_queue_token(self, sub: str) -> str:
+        """
+        Generate a long-lived queue token for task workloads.
+
+        Queue tokens have a special 'scope' claim that restricts them to the 
/run endpoint only.
+        They are valid for longer (default 24h) to survive queue wait times.
+        """
+        from airflow.configuration import conf
+
+        queue_expiry = conf.getint("execution_api", 
"jwt_queue_token_expiration_time", fallback=86400)
+        now = int(datetime.now(tz=timezone.utc).timestamp())
+
+        claims = {
+            "jti": uuid.uuid4().hex,
+            "iss": self.issuer,
+            "aud": self.audience,
+            "nbf": now,
+            "exp": now + queue_expiry,
+            "iat": now,
+            "sub": sub,
+            "scope": TOKEN_SCOPE_QUEUE,
+        }
+
+        if claims["iss"] is None:
+            del claims["iss"]
+        if claims["aud"] is None:
+            del claims["aud"]
+
+        headers = {"alg": self.algorithm}
+        if self._private_key:
+            headers["kid"] = self.kid
+        return jwt.encode(claims, self.signing_arg, algorithm=self.algorithm, 
headers=headers)

Review Comment:
   I don't think we need a new whole function for this -- the existing 
`generate()` could work already by doing:
   
   ```python
   generator.generate({"sub": sub, "exp": now + queue_expiry})
   ```
   
   If you think it's worth "packaging" that up, then make it call self.generate 
-- don't essentially duplicate it.



##########
airflow-core/src/airflow/api_fastapi/execution_api/app.py:
##########
@@ -296,15 +296,24 @@ class InProcessExecutionAPI:
     @cached_property
     def app(self):
         if not self._app:
+            from unittest.mock import AsyncMock, MagicMock
+
+            from airflow.api_fastapi.auth.tokens import JWTValidator
             from airflow.api_fastapi.common.dagbag import create_dag_bag
-            from airflow.api_fastapi.execution_api.app import 
create_task_execution_api_app
             from airflow.api_fastapi.execution_api.deps import (
+                DepContainer,
                 JWTBearerDep,
+                JWTBearerQueueDep,
                 JWTBearerTIPathDep,
             )
             from airflow.api_fastapi.execution_api.routes.connections import 
has_connection_access
             from airflow.api_fastapi.execution_api.routes.variables import 
has_variable_access
             from airflow.api_fastapi.execution_api.routes.xcoms import 
has_xcom_access
+            from airflow.configuration import conf
+
+            # Set a dummy JWT secret so the lifespan can create JWT services 
without failing.
+            if not conf.get("api_auth", "jwt_secret", fallback=None):
+                conf.set("api_auth", "jwt_secret", 
"in-process-test-secret-key")

Review Comment:
   Never ever ever do this in production/runtime code. The risk of it being 
picked up and every install in the world having a signing key of 
"in-process-test-secret-key" is too large.
   
   I know this is just the InProcess class, but I'm still worried about doing 
this — doubly so because config is process-global.



##########
airflow-core/src/airflow/api_fastapi/execution_api/app.py:
##########
@@ -315,28 +324,57 @@ async def always_allow(): ...
 
             self._app.dependency_overrides[JWTBearerDep.dependency] = 
always_allow
             self._app.dependency_overrides[JWTBearerTIPathDep.dependency] = 
always_allow
+            self._app.dependency_overrides[JWTBearerQueueDep.dependency] = 
always_allow
             self._app.dependency_overrides[has_connection_access] = 
always_allow
             self._app.dependency_overrides[has_variable_access] = always_allow
             self._app.dependency_overrides[has_xcom_access] = always_allow
 
+            # Create a mock container that provides mock JWT services
+            mock_jwt_generator = MagicMock(spec=JWTGenerator)
+            mock_jwt_generator.generate.return_value = "mock-execution-token"
+
+            mock_jwt_validator = AsyncMock(spec=JWTValidator)
+            mock_jwt_validator.avalidated_claims.return_value = {"sub": 
"test", "exp": 9999999999}
+
+            class MockContainer:
+                """A mock svcs container that returns mock services."""
+
+                async def aget(self, svc_type):
+                    if svc_type is JWTGenerator:
+                        return mock_jwt_generator
+                    if svc_type is JWTValidator:
+                        return mock_jwt_validator
+                    raise ValueError(f"Unknown service type: {svc_type}")
+
+            async def mock_container_dep():
+                return MockContainer()
+
+            self._app.dependency_overrides[DepContainer.dependency] = 
mock_container_dep

Review Comment:
   Why do we need a _mock_ svcs container? Why not just use a real svcs 
container?



##########
airflow-core/src/airflow/config_templates/config.yml:
##########
@@ -1822,6 +1822,17 @@ execution_api:
       type: integer
       example: ~
       default: "600"
+    jwt_queue_token_expiration_time:
+      description: |
+        Number in seconds until the queue JWT token expires. Queue tokens are 
long-lived tokens
+        sent with task workloads to executors (e.g., Celery). They can only be 
used to call
+        the /run endpoint, which then issues a short-lived execution token.
+
+        This should be set long enough to cover the maximum expected queue 
wait time.
+      version_added: 3.1.0

Review Comment:
   ```suggestion
         version_added: 3.1.7
   ```



##########
airflow-core/src/airflow/api_fastapi/execution_api/deps.py:
##########


Review Comment:
   I'd rather we wrote the JWTBearer and JWTBearerQueueScope deps in a 
layered approach (deps can depend on each other, or we can use subclassing) — that way we 
only have to write most of the validation once and just tweak the behaviour.
   
   I think the layer approach would be best, so we have a base JWTBearer dep 
that does the basic validation, but nothing of the presence/absence of the 
queue scope, and then two deps that consume that returned TIToken to do the 
next layer.
   



##########
airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py:
##########
@@ -100,12 +100,15 @@
         HTTP_422_UNPROCESSABLE_CONTENT: {"description": "Invalid payload for 
the state transition"},
     },
     response_model_exclude_unset=True,
+    dependencies=[JWTBearerQueueDep],

Review Comment:
   Since this is the one and only place we use this dep (and it's also the only 
place we ever want to use this dep), I think it would be better to move 
JWTBearerQueueDep into this file.



##########
airflow-core/src/airflow/api_fastapi/execution_api/app.py:
##########
@@ -315,28 +324,57 @@ async def always_allow(): ...
 
             self._app.dependency_overrides[JWTBearerDep.dependency] = 
always_allow
             self._app.dependency_overrides[JWTBearerTIPathDep.dependency] = 
always_allow
+            self._app.dependency_overrides[JWTBearerQueueDep.dependency] = 
always_allow
             self._app.dependency_overrides[has_connection_access] = 
always_allow
             self._app.dependency_overrides[has_variable_access] = always_allow
             self._app.dependency_overrides[has_xcom_access] = always_allow
 
+            # Create a mock container that provides mock JWT services
+            mock_jwt_generator = MagicMock(spec=JWTGenerator)
+            mock_jwt_generator.generate.return_value = "mock-execution-token"
+
+            mock_jwt_validator = AsyncMock(spec=JWTValidator)
+            mock_jwt_validator.avalidated_claims.return_value = {"sub": 
"test", "exp": 9999999999}
+
+            class MockContainer:
+                """A mock svcs container that returns mock services."""
+
+                async def aget(self, svc_type):
+                    if svc_type is JWTGenerator:
+                        return mock_jwt_generator
+                    if svc_type is JWTValidator:
+                        return mock_jwt_validator
+                    raise ValueError(f"Unknown service type: {svc_type}")
+
+            async def mock_container_dep():
+                return MockContainer()
+
+            self._app.dependency_overrides[DepContainer.dependency] = 
mock_container_dep

Review Comment:
   What is this doing here? This looks like test-only code. 



##########
airflow-core/src/airflow/api_fastapi/execution_api/app.py:
##########
@@ -315,28 +324,57 @@ async def always_allow(): ...
 
             self._app.dependency_overrides[JWTBearerDep.dependency] = 
always_allow
             self._app.dependency_overrides[JWTBearerTIPathDep.dependency] = 
always_allow
+            self._app.dependency_overrides[JWTBearerQueueDep.dependency] = 
always_allow
             self._app.dependency_overrides[has_connection_access] = 
always_allow
             self._app.dependency_overrides[has_variable_access] = always_allow
             self._app.dependency_overrides[has_xcom_access] = always_allow
 
+            # Create a mock container that provides mock JWT services
+            mock_jwt_generator = MagicMock(spec=JWTGenerator)
+            mock_jwt_generator.generate.return_value = "mock-execution-token"
+
+            mock_jwt_validator = AsyncMock(spec=JWTValidator)
+            mock_jwt_validator.avalidated_claims.return_value = {"sub": 
"test", "exp": 9999999999}
+
+            class MockContainer:
+                """A mock svcs container that returns mock services."""
+
+                async def aget(self, svc_type):
+                    if svc_type is JWTGenerator:
+                        return mock_jwt_generator
+                    if svc_type is JWTValidator:
+                        return mock_jwt_validator
+                    raise ValueError(f"Unknown service type: {svc_type}")
+
+            async def mock_container_dep():
+                return MockContainer()
+
+            self._app.dependency_overrides[DepContainer.dependency] = 
mock_container_dep
+
         return self._app
 
     @cached_property
     def transport(self) -> httpx.WSGITransport:
         import asyncio
+        import threading
 
         import httpx
         from a2wsgi import ASGIMiddleware
 
         middleware = ASGIMiddleware(self.app)
+        lifespan_started = threading.Event()
 
         # https://github.com/abersheeran/a2wsgi/discussions/64
         async def start_lifespan(cm: AsyncExitStack, app: FastAPI):
             await cm.enter_async_context(app.router.lifespan_context(app))
+            lifespan_started.set()
 
         self._cm = AsyncExitStack()
 
         asyncio.run_coroutine_threadsafe(start_lifespan(self._cm, self.app), 
middleware.loop)
+        # Wait for lifespan to complete before returning the transport
+        lifespan_started.wait(timeout=5.0)

Review Comment:
   Why do we need this?



##########
airflow-core/src/airflow/api_fastapi/execution_api/routes/__init__.py:
##########


Review Comment:
   Any reason we needed to change this order?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to