This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v3-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 5adf257d2d25cc232aa4b8e6ee5c9de06a158f7b
Author: Kaxil Naik <[email protected]>
AuthorDate: Thu Jan 22 22:02:58 2026 +0000

    Use bulk DELETE for XComModel.clear() instead of loading records (#60955)
    
    The clear() method was loading all matching XCom records into memory
    and deleting them one by one. For tasks with many XCom entries, this
    caused unnecessary memory usage and N+1 DELETE statements.
    
    This change uses a single bulk DELETE statement which is both more
    memory-efficient and faster.
    
    (cherry picked from commit b651403e7a6ab1b6637ff90152618254f0806538)
---
 airflow-core/src/airflow/models/xcom.py | 10 ++++------
 1 file changed, 4 insertions(+), 6 deletions(-)

diff --git a/airflow-core/src/airflow/models/xcom.py b/airflow-core/src/airflow/models/xcom.py
index 0d3236eb408..847603d6c3f 100644
--- a/airflow-core/src/airflow/models/xcom.py
+++ b/airflow-core/src/airflow/models/xcom.py
@@ -144,14 +144,12 @@ class XComModel(TaskInstanceDependencies):
         if not run_id:
             raise ValueError(f"run_id must be passed. Passed run_id={run_id}")
 
-        query = select(cls).where(cls.dag_id == dag_id, cls.task_id == task_id, cls.run_id == run_id)
+        # Use bulk delete for efficiency instead of loading and deleting one by one
+        delete_stmt = delete(cls).where(cls.dag_id == dag_id, cls.task_id == task_id, cls.run_id == run_id)
         if map_index is not None:
-            query = query.where(cls.map_index == map_index)
-
-        for xcom in session.scalars(query):
-            # print(f"Clearing XCOM {xcom} with value {xcom.value}")
-            session.delete(xcom)
+            delete_stmt = delete_stmt.where(cls.map_index == map_index)
 
+        session.execute(delete_stmt)
         session.commit()
 
     @classmethod

Reply via email to