This is an automated email from the ASF dual-hosted git repository.
pierrejeambrun pushed a commit to branch v3-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v3-2-test by this push:
new 1f1581d0e21 [v3-2-test] Fix N+1 query pattern in task instance states
and count endpoints (#60352) (#64695)
1f1581d0e21 is described below
commit 1f1581d0e21de2c8d924f6a5da911f5ff6c5ad85
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Wed Apr 8 11:39:39 2026 +0200
[v3-2-test] Fix N+1 query pattern in task instance states and count
endpoints (#60352) (#64695)
* Fix inefficient fetch-all-then-filter: apply the map_index filter in the SQL query instead of in Python
* Add unit test case: map_index set but no task group
(cherry picked from commit f5ccebc8362e12e8283ea51d8fabbd7a5df9cf87)
Co-authored-by: Steve Ahn <[email protected]>
---
.../execution_api/routes/task_instances.py | 26 ++++++++++++++--------
.../versions/head/test_task_instances.py | 10 +++++++++
2 files changed, 27 insertions(+), 9 deletions(-)
diff --git
a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
index 5f5073c916b..644b0f5fa2a 100644
---
a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
+++
b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
@@ -892,15 +892,13 @@ def get_task_instance_count(
query = query.where(TI.run_id.in_(run_ids))
if task_group_id:
- group_tasks = _get_group_tasks(dag_id, task_group_id, session,
dag_bag, logical_dates, run_ids)
+ group_tasks = _get_group_tasks(
+ dag_id, task_group_id, session, dag_bag, logical_dates, run_ids,
map_index
+ )
# Get unique (task_id, map_index) pairs
-
task_map_pairs = [(ti.task_id, ti.map_index) for ti in group_tasks]
- if map_index is not None:
- task_map_pairs = [(ti.task_id, ti.map_index) for ti in group_tasks
if ti.map_index == map_index]
-
if not task_map_pairs:
# If no task group tasks found, default to checking the task group
ID itself
# This matches the behavior in _get_external_task_group_task_ids
@@ -1000,15 +998,18 @@ def get_task_instance_states(
if run_ids:
query = query.where(TI.run_id.in_(run_ids))
+ if map_index is not None:
+ query = query.where(TI.map_index == map_index)
+
results = session.scalars(query).all()
if task_group_id:
- group_tasks = _get_group_tasks(dag_id, task_group_id, session,
dag_bag, logical_dates, run_ids)
+ group_tasks = _get_group_tasks(
+ dag_id, task_group_id, session, dag_bag, logical_dates, run_ids,
map_index
+ )
results = results + group_tasks if task_ids else group_tasks
- if map_index is not None:
- results = [task for task in results if task.map_index == map_index]
[
run_id_task_state_map[task.run_id].update(
{task.task_id: task.state}
@@ -1049,7 +1050,13 @@ def _is_eligible_to_retry(state: str, try_number: int,
max_tries: int) -> bool:
def _get_group_tasks(
- dag_id: str, task_group_id: str, session: SessionDep, dag_bag: DagBagDep,
logical_dates=None, run_ids=None
+ dag_id: str,
+ task_group_id: str,
+ session: SessionDep,
+ dag_bag: DagBagDep,
+ logical_dates=None,
+ run_ids=None,
+ map_index: int | None = None,
):
# Get all tasks in the task group
dag = get_latest_version_of_dag(dag_bag, dag_id, session,
include_reason=True)
@@ -1070,6 +1077,7 @@ def _get_group_tasks(
TI.task_id.in_(task.task_id for task in task_group.iter_tasks()),
*([TI.logical_date.in_(logical_dates)] if logical_dates else []),
*([TI.run_id.in_(run_ids)] if run_ids else []),
+ *([TI.map_index == map_index] if map_index is not None else []),
)
).all()
diff --git
a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py
b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py
index 7f766ede71e..1ab690239fe 100644
---
a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py
+++
b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py
@@ -2336,6 +2336,7 @@ class TestGetCount:
("map_index", "dynamic_task_args", "task_ids", "task_group_name",
"expected_count"),
(
pytest.param(None, [1, 2, 3], None, None, 5,
id="use-default-map-index-None"),
+ pytest.param(0, [1, 2, 3], None, None, 1,
id="with-map-index-0-no-task-group"),
pytest.param(-1, [1, 2, 3], ["task1"], None, 1,
id="with-task-ids-and-map-index-(-1)"),
pytest.param(None, [1, 2, 3], None, "group1", 4,
id="with-task-group-id-and-map-index-None"),
pytest.param(0, [1, 2, 3], None, "group1", 1,
id="with-task-group-id-and-map-index-0"),
@@ -2853,6 +2854,15 @@ class TestGetTaskStates:
},
id="with-default-map-index-None",
),
+ pytest.param(
+ 0,
+ [1, 2, 3],
+ None,
+ None,
+ {"-1": State.SUCCESS, "0": State.FAILED, "1": State.SUCCESS,
"2": State.SUCCESS},
+ {"group1.add_one_0": "failed"},
+ id="with-map-index-0-no-task-group",
+ ),
pytest.param(
-1,
[1, 2, 3],