This is an automated email from the ASF dual-hosted git repository.

pierrejeambrun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 9813bf5cb11 Prevent Session from staying opened between yields (#65179)
9813bf5cb11 is described below

commit 9813bf5cb1109a8e31461d8b2a087ac86c6ee9f8
Author: Pierre Jeambrun <[email protected]>
AuthorDate: Tue Apr 14 09:39:36 2026 +0200

    Prevent Session from staying opened between yields (#65179)
---
 .../airflow/api_fastapi/core_api/routes/ui/grid.py | 52 ++++++++++++----------
 .../tests/unit/api_fastapi/core_api/test_app.py    | 10 +++--
 2 files changed, 36 insertions(+), 26 deletions(-)

diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/grid.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/grid.py
index 2316282e0ea..2708cbc5325 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/grid.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/grid.py
@@ -27,7 +27,7 @@ from sqlalchemy import exists, select
 from sqlalchemy.orm import Session, joinedload, load_only
 
 from airflow.api_fastapi.auth.managers.models.resource_details import DagAccessEntity
-from airflow.api_fastapi.common.db.common import SessionDep, _get_session, paginated_select
+from airflow.api_fastapi.common.db.common import SessionDep, paginated_select
 from airflow.api_fastapi.common.db.dag_runs import attach_dag_versions_to_runs
 from airflow.api_fastapi.common.parameters import (
     QueryDagRunRunTypesFilter,
@@ -66,6 +66,7 @@ from airflow.models.dagrun import DagRun
 from airflow.models.deadline import Deadline
 from airflow.models.serialized_dag import SerializedDagModel
 from airflow.models.taskinstance import TaskInstance
+from airflow.utils.session import create_session
 
 log = structlog.get_logger(logger_name=__name__)
 grid_router = AirflowRouter(prefix="/grid", tags=["Grid"])
@@ -453,7 +454,6 @@ def _build_ti_summaries(
 )
 def get_grid_ti_summaries_stream(
     dag_id: str,
-    session: Annotated[Session, Depends(_get_session)],
     run_ids: Annotated[list[str] | None, Query()] = None,
 ) -> StreamingResponse:
     """
@@ -468,30 +468,36 @@ def get_grid_ti_summaries_stream(
     """
 
     def _generate() -> Generator[str, None, None]:
+        # Each iteration opens and closes its own DB session so the connection is
+        # released between yields.  This prevents a slow client from holding a
+        # database connection open for the entire stream duration.
+        # See https://github.com/apache/airflow/issues/65010.
+
         serdag_cache: dict[Any, SerializedDagModel | None] = {}
         for run_id in run_ids or []:
-            tis = session.execute(
-                select(
-                    TaskInstance.task_id,
-                    TaskInstance.state,
-                    TaskInstance.dag_version_id,
-                    TaskInstance.start_date,
-                    TaskInstance.end_date,
-                    DagVersion.version_number,
+            with create_session(scoped=False) as session:
+                tis = session.execute(
+                    select(
+                        TaskInstance.task_id,
+                        TaskInstance.state,
+                        TaskInstance.dag_version_id,
+                        TaskInstance.start_date,
+                        TaskInstance.end_date,
+                        DagVersion.version_number,
+                    )
+                    .outerjoin(DagVersion, TaskInstance.dag_version_id == DagVersion.id)
+                    .where(TaskInstance.dag_id == dag_id)
+                    .where(TaskInstance.run_id == run_id)
+                    .order_by(TaskInstance.task_id)
+                    .execution_options(yield_per=1000)
+                )
+                summary = _build_ti_summaries(
+                    dag_id,
+                    run_id,
+                    tis,
+                    session,
+                    serdag_cache=serdag_cache,
                 )
-                .outerjoin(DagVersion, TaskInstance.dag_version_id == DagVersion.id)
-                .where(TaskInstance.dag_id == dag_id)
-                .where(TaskInstance.run_id == run_id)
-                .order_by(TaskInstance.task_id)
-                .execution_options(yield_per=1000)
-            )
-            summary = _build_ti_summaries(
-                dag_id,
-                run_id,
-                tis,
-                session,
-                serdag_cache=serdag_cache,
-            )
             if summary is None:
                 continue
             yield GridTISummaries.model_validate(summary).model_dump_json() + "\n"
diff --git a/airflow-core/tests/unit/api_fastapi/core_api/test_app.py b/airflow-core/tests/unit/api_fastapi/core_api/test_app.py
index 75e44245b39..a30c803f50c 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/test_app.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/test_app.py
@@ -93,9 +93,13 @@ class TestStreamingEndpointSessionScope:
 
         assert not violations, (
             "Streaming endpoints must not use function-scoped dependencies like "
-            "SessionDep.  Use Annotated[Session, Depends(_get_session)] (default "
-            "request scope) instead — function-scoped cleanup runs before the "
-            "response body is streamed, leaking database connections.\n"
+            "SessionDep — function-scoped cleanup runs before the response body "
+            "is streamed, leaking database connections.\n"
+            "Do NOT use Annotated[Session, Depends(_get_session)] or other session dependencies "
+            "either, as this holds the DB connection open for the entire stream "
+            "duration.\n"
+            "Instead, use create_session() inside the generator to open/close a "
+            "connection for each iteration, releasing it between yields.\n"
             + "\n".join(f"  - {v}" for v in violations)
         )
 

Reply via email to