anishgirianish commented on code in PR #60108:
URL: https://github.com/apache/airflow/pull/60108#discussion_r2700795602


##########
airflow-core/src/airflow/api_fastapi/execution_api/app.py:
##########
@@ -296,25 +296,74 @@ class InProcessExecutionAPI:
     @cached_property
     def app(self):
         if not self._app:
+            import os
+            from base64 import urlsafe_b64encode
+
             from airflow.api_fastapi.common.dagbag import create_dag_bag
             from airflow.api_fastapi.execution_api.app import 
create_task_execution_api_app
             from airflow.api_fastapi.execution_api.deps import (
                 JWTBearerDep,
                 JWTBearerTIPathDep,
             )
             from airflow.api_fastapi.execution_api.routes.connections import 
has_connection_access
+            from airflow.api_fastapi.execution_api.routes.task_instances 
import JWTBearerWorkloadDep
             from airflow.api_fastapi.execution_api.routes.variables import 
has_variable_access
             from airflow.api_fastapi.execution_api.routes.xcoms import 
has_xcom_access
+            from airflow.configuration import conf
+
+            # Ensure JWT secret is available for in-process execution.
+            # The /run endpoint needs JWTGenerator to issue execution tokens.
+            # If the config option is empty, generate a random one for the 
duration of this process.
+            if not conf.get("api_auth", "jwt_secret", fallback=None):
+                logger.debug(
+                    "`api_auth/jwt_secret` is not set, generating a temporary 
one for in-process execution"
+                )
+                conf.set("api_auth", "jwt_secret", 
urlsafe_b64encode(os.urandom(16)).decode())
 
             self._app = create_task_execution_api_app()
 
             # Set up dag_bag in app state for dependency injection
             self._app.state.dag_bag = create_dag_bag()
 
+            self._app.state.jwt_generator = _jwt_generator()
+            self._app.state.jwt_validator = _jwt_validator()
+
+            # Why InProcessContainer instead of lifespan.registry or 
svcs.Container?
+            #
+            # The normal app uses @svcs.fastapi.lifespan which manages the 
registry lifecycle.
+            # In tests (conftest.py), lifespan.registry.register_value() works 
because the
+            # TestClient initializes the lifespan before requests. However, in 
InProcessExecutionAPI,
+            # the lifespan runs later (when transport is accessed), but 
services may be needed
+            # before that. Using lifespan.registry fails in CI with 
ServiceNotFoundError.
+            #
+            # This minimal container bypasses the svcs lifecycle and directly 
returns pre-created
+            # service instances from app.state. If you add new services, 
update this class.
+            from airflow.api_fastapi.execution_api.deps import _container
+
+            class InProcessContainer:
+                """Minimal container for in-process execution, bypassing svcs 
lifecycle."""

Review Comment:
   I'm still getting familiar with the codebase, so I may well be missing 
something. I added this to fix ServiceNotFoundError failures in CI. As far as 
I understand, with InProcessExecutionAPI the svcs lifespan runs later (when 
the transport is accessed), but services like JWTGenerator are needed before 
that. This container bypasses the svcs lifecycle and returns pre-created 
instances from app.state. If there's a cleaner pattern you'd recommend, I'd 
really appreciate the guidance!



##########
airflow-core/src/airflow/api_fastapi/execution_api/app.py:
##########
@@ -296,25 +296,74 @@ class InProcessExecutionAPI:
     @cached_property
     def app(self):
         if not self._app:
+            import os
+            from base64 import urlsafe_b64encode
+
             from airflow.api_fastapi.common.dagbag import create_dag_bag
             from airflow.api_fastapi.execution_api.app import 
create_task_execution_api_app
             from airflow.api_fastapi.execution_api.deps import (
                 JWTBearerDep,
                 JWTBearerTIPathDep,
             )
             from airflow.api_fastapi.execution_api.routes.connections import 
has_connection_access
+            from airflow.api_fastapi.execution_api.routes.task_instances 
import JWTBearerWorkloadDep
             from airflow.api_fastapi.execution_api.routes.variables import 
has_variable_access
             from airflow.api_fastapi.execution_api.routes.xcoms import 
has_xcom_access
+            from airflow.configuration import conf
+
+            # Ensure JWT secret is available for in-process execution.
+            # The /run endpoint needs JWTGenerator to issue execution tokens.
+            # If the config option is empty, generate a random one for the 
duration of this process.
+            if not conf.get("api_auth", "jwt_secret", fallback=None):
+                logger.debug(
+                    "`api_auth/jwt_secret` is not set, generating a temporary 
one for in-process execution"
+                )
+                conf.set("api_auth", "jwt_secret", 
urlsafe_b64encode(os.urandom(16)).decode())
 
             self._app = create_task_execution_api_app()
 
             # Set up dag_bag in app state for dependency injection
             self._app.state.dag_bag = create_dag_bag()
 
+            self._app.state.jwt_generator = _jwt_generator()
+            self._app.state.jwt_validator = _jwt_validator()
+
+            # Why InProcessContainer instead of lifespan.registry or 
svcs.Container?
+            #
+            # The normal app uses @svcs.fastapi.lifespan which manages the 
registry lifecycle.
+            # In tests (conftest.py), lifespan.registry.register_value() works 
because the
+            # TestClient initializes the lifespan before requests. However, in 
InProcessExecutionAPI,
+            # the lifespan runs later (when transport is accessed), but 
services may be needed
+            # before that. Using lifespan.registry fails in CI with 
ServiceNotFoundError.
+            #
+            # This minimal container bypasses the svcs lifecycle and directly 
returns pre-created
+            # service instances from app.state. If you add new services, 
update this class.
+            from airflow.api_fastapi.execution_api.deps import _container
+
+            class InProcessContainer:
+                """Minimal container for in-process execution, bypassing svcs 
lifecycle."""

Review Comment:
   I'm still getting familiar with the codebase, so I may well be missing 
something. I added this to fix ServiceNotFoundError failures in CI. As far as 
I understand, with InProcessExecutionAPI the svcs lifespan runs later (when 
the transport is accessed), but services like JWTGenerator are needed before 
that. This container bypasses the svcs lifecycle and returns pre-created 
instances from app.state. If there's a cleaner pattern you'd recommend, I'd 
really appreciate the guidance.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to