This is an automated email from the ASF dual-hosted git repository.
rahulvats pushed a commit to branch v3-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v3-2-test by this push:
new 2c6ed8eaaeb Prevent Session leak from StreamingResponse API endpoints.
(#65162) (#65193)
2c6ed8eaaeb is described below
commit 2c6ed8eaaeb4e0a5a9aee7aa9668e052566fa2b2
Author: Rahul Vats <[email protected]>
AuthorDate: Tue Apr 14 13:17:10 2026 +0530
Prevent Session leak from StreamingResponse API endpoints. (#65162) (#65193)
The comment in the test sums up what was going on, but essentially this
resulted in `session.close` being called when the endpoint returned, but
before the response was generated.
Then SQLA was 'helpful' and re-opened it. However, since the
teardown/post-yield code has already run, this new session is never closed.
This results in an open connection that leaks until Python's full GC runs
(i.e. `gc.collect()`) -- going out of scope alone doesn't delete it, as the
Session and the RootTransaction form a reference cycle.
(cherry picked from commit 5559365a4b016817fad3d16ec710b716f1ddd945)
Co-authored-by: Ash Berlin-Taylor <[email protected]>
---
.../airflow/api_fastapi/core_api/routes/ui/grid.py | 6 +-
.../tests/unit/api_fastapi/core_api/test_app.py | 77 ++++++++++++++++++++++
2 files changed, 80 insertions(+), 3 deletions(-)
diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/grid.py
b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/grid.py
index 0143ae81e14..bea583d5101 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/grid.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/grid.py
@@ -25,10 +25,10 @@ import structlog
from fastapi import Depends, HTTPException, Query, status
from fastapi.responses import StreamingResponse
from sqlalchemy import exists, select
-from sqlalchemy.orm import joinedload, load_only, selectinload
+from sqlalchemy.orm import Session, joinedload, load_only, selectinload
from airflow.api_fastapi.auth.managers.models.resource_details import
DagAccessEntity
-from airflow.api_fastapi.common.db.common import SessionDep, paginated_select
+from airflow.api_fastapi.common.db.common import SessionDep, _get_session,
paginated_select
from airflow.api_fastapi.common.parameters import (
QueryDagRunRunTypesFilter,
QueryDagRunStateFilter,
@@ -426,7 +426,7 @@ def _build_ti_summaries(
)
def get_grid_ti_summaries_stream(
dag_id: str,
- session: SessionDep,
+ session: Annotated[Session, Depends(_get_session)],
run_ids: Annotated[list[str] | None, Query()] = None,
) -> StreamingResponse:
"""
diff --git a/airflow-core/tests/unit/api_fastapi/core_api/test_app.py
b/airflow-core/tests/unit/api_fastapi/core_api/test_app.py
index 4881976a673..75e44245b39 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/test_app.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/test_app.py
@@ -16,13 +16,90 @@
# under the License.
from __future__ import annotations
+import inspect
+import typing
+
import pytest
+from fastapi.params import Depends as DependsClass
+from fastapi.responses import StreamingResponse
+from starlette.routing import Mount
+
+from airflow.api_fastapi.app import create_app
from tests_common.test_utils.db import clear_db_jobs
pytestmark = pytest.mark.db_test
+def _get_all_api_routes(app):
+ """Recursively yield all APIRoutes from the app and its mounted
sub-apps."""
+ for route in getattr(app, "routes", []):
+ if isinstance(route, Mount) and hasattr(route, "app"):
+ yield from _get_all_api_routes(route.app)
+ if hasattr(route, "endpoint"):
+ yield route
+
+
+class TestStreamingEndpointSessionScope:
+ def test_no_streaming_endpoint_uses_function_scoped_depends(self):
+ """Streaming endpoints must not use function-scoped generator
dependencies.
+
+ FastAPI's ``function_stack`` (used for ``scope="function"``
dependencies)
+ is torn down after the route handler returns but *before* the response
body
+ is sent. For ``StreamingResponse`` endpoints the response body is
produced
+ by a generator that runs during sending, so any generator dependency
with
+ ``scope="function"`` will have its cleanup run before the generator
+ executes. This causes the generator to silently reopen the session via
+ autobegin, and the resulting connection is never returned to the pool.
+ """
+ # These endpoints mention StreamingResponse but only use the session
+ # *before* streaming begins — the generator does not capture it.
+ # Function scope is correct for them: close the session early rather
+ # than hold it open for the entire (potentially long) stream.
+ allowed = {
+ "airflow.api_fastapi.core_api.routes.public.log.get_log",
+
"airflow.api_fastapi.core_api.routes.public.dag_run.wait_dag_run_until_finished",
+ }
+
+ app = create_app()
+ violations = []
+ for route in _get_all_api_routes(app):
+ try:
+ hints = typing.get_type_hints(route.endpoint,
include_extras=True)
+ except Exception:
+ continue
+ returns_streaming = hints.get("return") is StreamingResponse
+ if not returns_streaming:
+ try:
+ returns_streaming = "StreamingResponse" in
inspect.getsource(route.endpoint)
+ except (OSError, TypeError):
+ pass
+ if not returns_streaming:
+ continue
+ fqn = f"{route.endpoint.__module__}.{route.endpoint.__qualname__}"
+ if fqn in allowed:
+ continue
+ for param_name, hint in hints.items():
+ if param_name == "return":
+ continue
+ if typing.get_origin(hint) is not typing.Annotated:
+ continue
+ for metadata in typing.get_args(hint)[1:]:
+ if isinstance(metadata, DependsClass) and metadata.scope
== "function":
+ violations.append(
+
f"{route.endpoint.__module__}.{route.endpoint.__qualname__}"
+ f" parameter '{param_name}'"
+ )
+
+ assert not violations, (
+ "Streaming endpoints must not use function-scoped dependencies
like "
+ "SessionDep. Use Annotated[Session, Depends(_get_session)]
(default "
+ "request scope) instead — function-scoped cleanup runs before the "
+ "response body is streamed, leaking database connections.\n"
+ + "\n".join(f" - {v}" for v in violations)
+ )
+
+
class TestGzipMiddleware:
@pytest.fixture(autouse=True)
def setup(self):