anishgirianish commented on code in PR #60108:
URL: https://github.com/apache/airflow/pull/60108#discussion_r2700795602
##########
airflow-core/src/airflow/api_fastapi/execution_api/app.py:
##########
@@ -296,25 +296,74 @@ class InProcessExecutionAPI:
@cached_property
def app(self):
if not self._app:
+ import os
+ from base64 import urlsafe_b64encode
+
from airflow.api_fastapi.common.dagbag import create_dag_bag
from airflow.api_fastapi.execution_api.app import
create_task_execution_api_app
from airflow.api_fastapi.execution_api.deps import (
JWTBearerDep,
JWTBearerTIPathDep,
)
from airflow.api_fastapi.execution_api.routes.connections import
has_connection_access
+ from airflow.api_fastapi.execution_api.routes.task_instances
import JWTBearerWorkloadDep
from airflow.api_fastapi.execution_api.routes.variables import
has_variable_access
from airflow.api_fastapi.execution_api.routes.xcoms import
has_xcom_access
+ from airflow.configuration import conf
+
+ # Ensure JWT secret is available for in-process execution.
+ # The /run endpoint needs JWTGenerator to issue execution tokens.
+ # If the config option is empty, generate a random one for the
duration of this process.
+ if not conf.get("api_auth", "jwt_secret", fallback=None):
+ logger.debug(
+ "`api_auth/jwt_secret` is not set, generating a temporary
one for in-process execution"
+ )
+ conf.set("api_auth", "jwt_secret",
urlsafe_b64encode(os.urandom(16)).decode())
self._app = create_task_execution_api_app()
# Set up dag_bag in app state for dependency injection
self._app.state.dag_bag = create_dag_bag()
+ self._app.state.jwt_generator = _jwt_generator()
+ self._app.state.jwt_validator = _jwt_validator()
+
+ # Why InProcessContainer instead of lifespan.registry or
svcs.Container?
+ #
+ # The normal app uses @svcs.fastapi.lifespan which manages the
registry lifecycle.
+ # In tests (conftest.py), lifespan.registry.register_value() works
because the
+ # TestClient initializes the lifespan before requests. However, in
InProcessExecutionAPI,
+ # the lifespan runs later (when transport is accessed), but
services may be needed
+ # before that. Using lifespan.registry fails in CI with
ServiceNotFoundError.
+ #
+ # This minimal container bypasses the svcs lifecycle and directly
returns pre-created
+ # service instances from app.state. If you add new services,
update this class.
+ from airflow.api_fastapi.execution_api.deps import _container
+
+ class InProcessContainer:
+ """Minimal container for in-process execution, bypassing svcs
lifecycle."""
Review Comment:
I'm still getting familiar with the codebase, but I added this to fix
ServiceNotFoundError failures in CI. As I understand it, with
InProcessExecutionAPI the svcs lifespan only runs later (when the transport is
accessed), but services like JWTGenerator are needed before that. This
container bypasses the lifecycle and returns pre-created instances from
app.state. I may well be missing something - if there's a cleaner pattern you'd
recommend, I'd really appreciate the guidance!
##########
airflow-core/src/airflow/api_fastapi/execution_api/app.py:
##########
@@ -296,25 +296,74 @@ class InProcessExecutionAPI:
@cached_property
def app(self):
if not self._app:
+ import os
+ from base64 import urlsafe_b64encode
+
from airflow.api_fastapi.common.dagbag import create_dag_bag
from airflow.api_fastapi.execution_api.app import
create_task_execution_api_app
from airflow.api_fastapi.execution_api.deps import (
JWTBearerDep,
JWTBearerTIPathDep,
)
from airflow.api_fastapi.execution_api.routes.connections import
has_connection_access
+ from airflow.api_fastapi.execution_api.routes.task_instances
import JWTBearerWorkloadDep
from airflow.api_fastapi.execution_api.routes.variables import
has_variable_access
from airflow.api_fastapi.execution_api.routes.xcoms import
has_xcom_access
+ from airflow.configuration import conf
+
+ # Ensure JWT secret is available for in-process execution.
+ # The /run endpoint needs JWTGenerator to issue execution tokens.
+ # If the config option is empty, generate a random one for the
duration of this process.
+ if not conf.get("api_auth", "jwt_secret", fallback=None):
+ logger.debug(
+ "`api_auth/jwt_secret` is not set, generating a temporary
one for in-process execution"
+ )
+ conf.set("api_auth", "jwt_secret",
urlsafe_b64encode(os.urandom(16)).decode())
self._app = create_task_execution_api_app()
# Set up dag_bag in app state for dependency injection
self._app.state.dag_bag = create_dag_bag()
+ self._app.state.jwt_generator = _jwt_generator()
+ self._app.state.jwt_validator = _jwt_validator()
+
+ # Why InProcessContainer instead of lifespan.registry or
svcs.Container?
+ #
+ # The normal app uses @svcs.fastapi.lifespan which manages the
registry lifecycle.
+ # In tests (conftest.py), lifespan.registry.register_value() works
because the
+ # TestClient initializes the lifespan before requests. However, in
InProcessExecutionAPI,
+ # the lifespan runs later (when transport is accessed), but
services may be needed
+ # before that. Using lifespan.registry fails in CI with
ServiceNotFoundError.
+ #
+ # This minimal container bypasses the svcs lifecycle and directly
returns pre-created
+ # service instances from app.state. If you add new services,
update this class.
+ from airflow.api_fastapi.execution_api.deps import _container
+
+ class InProcessContainer:
+ """Minimal container for in-process execution, bypassing svcs
lifecycle."""
Review Comment:
I'm still getting familiar with the codebase, but I added this to fix
ServiceNotFoundError failures in CI. As I understand it, with
InProcessExecutionAPI the svcs lifespan only runs later (when the transport is
accessed), but services like JWTGenerator are needed before that. This
container bypasses the lifecycle and returns pre-created instances from
app.state. I may well be missing something - if there's a cleaner pattern you'd
recommend, I'd really appreciate the guidance.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]