This is an automated email from the ASF dual-hosted git repository. rahulvats pushed a commit to branch backport-65162 in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 3ac48ee9991e5594fdb0a898b61faa80168341f5 Author: Ash Berlin-Taylor <[email protected]> AuthorDate: Mon Apr 13 19:45:23 2026 +0100 Prevent Session leak from StreamingResponse API endpoints. (#65162) The comment in the test sums up what was going on, but essentially this resulted in `session.close` being called when the endpoint returned, but before the response was generated. Then SQLA was 'helpful' and re-opened it. However, since the teardown/post-yield code has already run, this new session is never closed. This results in a connection open that leaks until Python's full GC runs (i.e. `gc.collect()`) -- it going out of scope doesn't delete things as the Session and the RootTransaction form a cycle. (cherry picked from commit 5559365a4b016817fad3d16ec710b716f1ddd945) --- .../airflow/api_fastapi/core_api/routes/ui/grid.py | 6 +- .../tests/unit/api_fastapi/core_api/test_app.py | 77 ++++++++++++++++++++++ 2 files changed, 80 insertions(+), 3 deletions(-) diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/grid.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/grid.py index 0143ae81e14..bea583d5101 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/grid.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/grid.py @@ -25,10 +25,10 @@ import structlog from fastapi import Depends, HTTPException, Query, status from fastapi.responses import StreamingResponse from sqlalchemy import exists, select -from sqlalchemy.orm import joinedload, load_only, selectinload +from sqlalchemy.orm import Session, joinedload, load_only, selectinload from airflow.api_fastapi.auth.managers.models.resource_details import DagAccessEntity -from airflow.api_fastapi.common.db.common import SessionDep, paginated_select +from airflow.api_fastapi.common.db.common import SessionDep, _get_session, paginated_select from airflow.api_fastapi.common.parameters import ( QueryDagRunRunTypesFilter, QueryDagRunStateFilter, @@ -426,7 +426,7 @@ def 
_build_ti_summaries( ) def get_grid_ti_summaries_stream( dag_id: str, - session: SessionDep, + session: Annotated[Session, Depends(_get_session)], run_ids: Annotated[list[str] | None, Query()] = None, ) -> StreamingResponse: """ diff --git a/airflow-core/tests/unit/api_fastapi/core_api/test_app.py b/airflow-core/tests/unit/api_fastapi/core_api/test_app.py index 4881976a673..75e44245b39 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/test_app.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/test_app.py @@ -16,13 +16,90 @@ # under the License. from __future__ import annotations +import inspect +import typing + import pytest +from fastapi.params import Depends as DependsClass +from fastapi.responses import StreamingResponse +from starlette.routing import Mount + +from airflow.api_fastapi.app import create_app from tests_common.test_utils.db import clear_db_jobs pytestmark = pytest.mark.db_test +def _get_all_api_routes(app): + """Recursively yield all APIRoutes from the app and its mounted sub-apps.""" + for route in getattr(app, "routes", []): + if isinstance(route, Mount) and hasattr(route, "app"): + yield from _get_all_api_routes(route.app) + if hasattr(route, "endpoint"): + yield route + + +class TestStreamingEndpointSessionScope: + def test_no_streaming_endpoint_uses_function_scoped_depends(self): + """Streaming endpoints must not use function-scoped generator dependencies. + + FastAPI's ``function_stack`` (used for ``scope="function"`` dependencies) + is torn down after the route handler returns but *before* the response body + is sent. For ``StreamingResponse`` endpoints the response body is produced + by a generator that runs during sending, so any generator dependency with + ``scope="function"`` will have its cleanup run before the generator + executes. This causes the generator to silently reopen the session via + autobegin, and the resulting connection is never returned to the pool. 
+ """ + # These endpoints mention StreamingResponse but only use the session + # *before* streaming begins — the generator does not capture it. + # Function scope is correct for them: close the session early rather + # than hold it open for the entire (potentially long) stream. + allowed = { + "airflow.api_fastapi.core_api.routes.public.log.get_log", + "airflow.api_fastapi.core_api.routes.public.dag_run.wait_dag_run_until_finished", + } + + app = create_app() + violations = [] + for route in _get_all_api_routes(app): + try: + hints = typing.get_type_hints(route.endpoint, include_extras=True) + except Exception: + continue + returns_streaming = hints.get("return") is StreamingResponse + if not returns_streaming: + try: + returns_streaming = "StreamingResponse" in inspect.getsource(route.endpoint) + except (OSError, TypeError): + pass + if not returns_streaming: + continue + fqn = f"{route.endpoint.__module__}.{route.endpoint.__qualname__}" + if fqn in allowed: + continue + for param_name, hint in hints.items(): + if param_name == "return": + continue + if typing.get_origin(hint) is not typing.Annotated: + continue + for metadata in typing.get_args(hint)[1:]: + if isinstance(metadata, DependsClass) and metadata.scope == "function": + violations.append( + f"{route.endpoint.__module__}.{route.endpoint.__qualname__}" + f" parameter '{param_name}'" + ) + + assert not violations, ( + "Streaming endpoints must not use function-scoped dependencies like " + "SessionDep. Use Annotated[Session, Depends(_get_session)] (default " + "request scope) instead — function-scoped cleanup runs before the " + "response body is streamed, leaking database connections.\n" + + "\n".join(f" - {v}" for v in violations) + ) + + class TestGzipMiddleware: @pytest.fixture(autouse=True) def setup(self):
