This is an automated email from the ASF dual-hosted git repository.
pierrejeambrun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 9813bf5cb11 Prevent Session from staying open between yields (#65179)
9813bf5cb11 is described below
commit 9813bf5cb1109a8e31461d8b2a087ac86c6ee9f8
Author: Pierre Jeambrun <[email protected]>
AuthorDate: Tue Apr 14 09:39:36 2026 +0200
Prevent Session from staying open between yields (#65179)
---
.../airflow/api_fastapi/core_api/routes/ui/grid.py | 52 ++++++++++++----------
.../tests/unit/api_fastapi/core_api/test_app.py | 10 +++--
2 files changed, 36 insertions(+), 26 deletions(-)
diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/grid.py
b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/grid.py
index 2316282e0ea..2708cbc5325 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/grid.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/grid.py
@@ -27,7 +27,7 @@ from sqlalchemy import exists, select
from sqlalchemy.orm import Session, joinedload, load_only
from airflow.api_fastapi.auth.managers.models.resource_details import
DagAccessEntity
-from airflow.api_fastapi.common.db.common import SessionDep, _get_session,
paginated_select
+from airflow.api_fastapi.common.db.common import SessionDep, paginated_select
from airflow.api_fastapi.common.db.dag_runs import attach_dag_versions_to_runs
from airflow.api_fastapi.common.parameters import (
QueryDagRunRunTypesFilter,
@@ -66,6 +66,7 @@ from airflow.models.dagrun import DagRun
from airflow.models.deadline import Deadline
from airflow.models.serialized_dag import SerializedDagModel
from airflow.models.taskinstance import TaskInstance
+from airflow.utils.session import create_session
log = structlog.get_logger(logger_name=__name__)
grid_router = AirflowRouter(prefix="/grid", tags=["Grid"])
@@ -453,7 +454,6 @@ def _build_ti_summaries(
)
def get_grid_ti_summaries_stream(
dag_id: str,
- session: Annotated[Session, Depends(_get_session)],
run_ids: Annotated[list[str] | None, Query()] = None,
) -> StreamingResponse:
"""
@@ -468,30 +468,36 @@ def get_grid_ti_summaries_stream(
"""
def _generate() -> Generator[str, None, None]:
+ # Each iteration opens and closes its own DB session so the connection
is
+ # released between yields. This prevents a slow client from holding a
+ # database connection open for the entire stream duration.
+ # See https://github.com/apache/airflow/issues/65010.
+
serdag_cache: dict[Any, SerializedDagModel | None] = {}
for run_id in run_ids or []:
- tis = session.execute(
- select(
- TaskInstance.task_id,
- TaskInstance.state,
- TaskInstance.dag_version_id,
- TaskInstance.start_date,
- TaskInstance.end_date,
- DagVersion.version_number,
+ with create_session(scoped=False) as session:
+ tis = session.execute(
+ select(
+ TaskInstance.task_id,
+ TaskInstance.state,
+ TaskInstance.dag_version_id,
+ TaskInstance.start_date,
+ TaskInstance.end_date,
+ DagVersion.version_number,
+ )
+ .outerjoin(DagVersion, TaskInstance.dag_version_id ==
DagVersion.id)
+ .where(TaskInstance.dag_id == dag_id)
+ .where(TaskInstance.run_id == run_id)
+ .order_by(TaskInstance.task_id)
+ .execution_options(yield_per=1000)
+ )
+ summary = _build_ti_summaries(
+ dag_id,
+ run_id,
+ tis,
+ session,
+ serdag_cache=serdag_cache,
)
- .outerjoin(DagVersion, TaskInstance.dag_version_id ==
DagVersion.id)
- .where(TaskInstance.dag_id == dag_id)
- .where(TaskInstance.run_id == run_id)
- .order_by(TaskInstance.task_id)
- .execution_options(yield_per=1000)
- )
- summary = _build_ti_summaries(
- dag_id,
- run_id,
- tis,
- session,
- serdag_cache=serdag_cache,
- )
if summary is None:
continue
yield GridTISummaries.model_validate(summary).model_dump_json() +
"\n"
diff --git a/airflow-core/tests/unit/api_fastapi/core_api/test_app.py
b/airflow-core/tests/unit/api_fastapi/core_api/test_app.py
index 75e44245b39..a30c803f50c 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/test_app.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/test_app.py
@@ -93,9 +93,13 @@ class TestStreamingEndpointSessionScope:
assert not violations, (
"Streaming endpoints must not use function-scoped dependencies
like "
- "SessionDep. Use Annotated[Session, Depends(_get_session)]
(default "
- "request scope) instead — function-scoped cleanup runs before the "
- "response body is streamed, leaking database connections.\n"
+ "SessionDep — function-scoped cleanup runs before the response
body "
+ "is streamed, leaking database connections.\n"
+ "Do NOT use Annotated[Session, Depends(_get_session)] or other
session dependencies "
+ "either, as this holds the DB connection open for the entire
stream "
+ "duration.\n"
+ "Instead, use create_session() inside the generator to open/close
a "
+ "connection for each iteration, releasing it between yields.\n"
+ "\n".join(f" - {v}" for v in violations)
)