This is an automated email from the ASF dual-hosted git repository. ephraimanierobi pushed a commit to branch v3-1-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 5adf257d2d25cc232aa4b8e6ee5c9de06a158f7b
Author: Kaxil Naik <[email protected]>
AuthorDate: Thu Jan 22 22:02:58 2026 +0000

    Use bulk DELETE for XComModel.clear() instead of loading records (#60955)

    The clear() method was loading all matching XCom records into memory and
    deleting them one by one. For tasks with many XCom entries, this caused
    unnecessary memory usage and N+1 DELETE statements.

    This change uses a single bulk DELETE statement which is both more
    memory-efficient and faster.

    (cherry picked from commit b651403e7a6ab1b6637ff90152618254f0806538)
---
 airflow-core/src/airflow/models/xcom.py | 10 ++++------
 1 file changed, 4 insertions(+), 6 deletions(-)

diff --git a/airflow-core/src/airflow/models/xcom.py b/airflow-core/src/airflow/models/xcom.py
index 0d3236eb408..847603d6c3f 100644
--- a/airflow-core/src/airflow/models/xcom.py
+++ b/airflow-core/src/airflow/models/xcom.py
@@ -144,14 +144,12 @@ class XComModel(TaskInstanceDependencies):
         if not run_id:
             raise ValueError(f"run_id must be passed. Passed run_id={run_id}")
 
-        query = select(cls).where(cls.dag_id == dag_id, cls.task_id == task_id, cls.run_id == run_id)
+        # Use bulk delete for efficiency instead of loading and deleting one by one
+        delete_stmt = delete(cls).where(cls.dag_id == dag_id, cls.task_id == task_id, cls.run_id == run_id)
         if map_index is not None:
-            query = query.where(cls.map_index == map_index)
-
-        for xcom in session.scalars(query):
-            # print(f"Clearing XCOM {xcom} with value {xcom.value}")
-            session.delete(xcom)
+            delete_stmt = delete_stmt.where(cls.map_index == map_index)
 
+        session.execute(delete_stmt)
         session.commit()
 
     @classmethod
