This is an automated email from the ASF dual-hosted git repository.

amoghdesai pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 3d8269f52a6 Honor the ti while deleting XComs via execution API (#47895)
3d8269f52a6 is described below

commit 3d8269f52a64e04367239a1e6b75e436bf5bb207
Author: Amogh Desai <[email protected]>
AuthorDate: Tue Mar 18 15:16:31 2025 +0530

    Honor the ti while deleting XComs via execution API (#47895)
---
 airflow/api_fastapi/execution_api/routes/xcoms.py  | 14 +++++++++++---
 task-sdk/src/airflow/sdk/api/client.py             |  5 ++++-
 task-sdk/src/airflow/sdk/execution_time/comms.py   |  1 +
 .../src/airflow/sdk/execution_time/supervisor.py   |  2 +-
 .../task_sdk/execution_time/test_supervisor.py     | 22 ++++++++++++++++++++++
 .../api_fastapi/execution_api/routes/test_xcoms.py | 20 ++++++++++++++++----
 6 files changed, 55 insertions(+), 9 deletions(-)

diff --git a/airflow/api_fastapi/execution_api/routes/xcoms.py b/airflow/api_fastapi/execution_api/routes/xcoms.py
index 808f6deda2b..92750835b4f 100644
--- a/airflow/api_fastapi/execution_api/routes/xcoms.py
+++ b/airflow/api_fastapi/execution_api/routes/xcoms.py
@@ -276,11 +276,13 @@ def set_xcom(
 def delete_xcom(
     session: SessionDep,
     token: deps.TokenDep,
+    key: str,
     dag_id: str,
     run_id: str,
     task_id: str,
-    key: str,
+    map_index: Annotated[int, Query()] = -1,
 ):
+    """Delete a single XCom Value."""
     if not has_xcom_access(dag_id, run_id, task_id, key, token, write=True):
         raise HTTPException(
             status_code=status.HTTP_403_FORBIDDEN,
@@ -290,8 +292,14 @@ def delete_xcom(
             },
         )
 
-    query = session.query(XComModel).where(XComModel.key == key).first()
-    session.delete(query)
+    query = delete(XComModel).where(
+        XComModel.key == key,
+        XComModel.run_id == run_id,
+        XComModel.task_id == task_id,
+        XComModel.dag_id == dag_id,
+        XComModel.map_index == map_index,
+    )
+    session.execute(query)
     session.commit()
     return {"message": f"XCom with key: {key} successfully deleted."}
 
diff --git a/task-sdk/src/airflow/sdk/api/client.py b/task-sdk/src/airflow/sdk/api/client.py
index ae22a02603c..6dfe6a5bdc7 100644
--- a/task-sdk/src/airflow/sdk/api/client.py
+++ b/task-sdk/src/airflow/sdk/api/client.py
@@ -339,9 +339,12 @@ class XComOperations:
         run_id: str,
         task_id: str,
         key: str,
+        map_index: int | None = None,
     ) -> dict[str, bool]:
         """Delete a XCom with given key via the API server."""
-        self.client.delete(f"xcoms/{dag_id}/{run_id}/{task_id}/{key}")
+        if map_index is not None and map_index >= 0:
+            params = {"map_index": map_index}
+        self.client.delete(f"xcoms/{dag_id}/{run_id}/{task_id}/{key}", params=params)
         # Any error from the server will anyway be propagated down to the supervisor,
         # so we choose to send a generic response to the supervisor over the server response to
         # decouple from the server response string
diff --git a/task-sdk/src/airflow/sdk/execution_time/comms.py b/task-sdk/src/airflow/sdk/execution_time/comms.py
index ffe73092adf..bd081b534ab 100644
--- a/task-sdk/src/airflow/sdk/execution_time/comms.py
+++ b/task-sdk/src/airflow/sdk/execution_time/comms.py
@@ -340,6 +340,7 @@ class DeleteXCom(BaseModel):
     dag_id: str
     run_id: str
     task_id: str
+    map_index: int | None = None
     type: Literal["DeleteXCom"] = "DeleteXCom"
 
 
diff --git a/task-sdk/src/airflow/sdk/execution_time/supervisor.py b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
index d986f57f3c0..e27e6314190 100644
--- a/task-sdk/src/airflow/sdk/execution_time/supervisor.py
+++ b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
@@ -903,7 +903,7 @@ class ActivitySubprocess(WatchedSubprocess):
                 msg.dag_id, msg.run_id, msg.task_id, msg.key, msg.value, msg.map_index, msg.mapped_length
             )
         elif isinstance(msg, DeleteXCom):
-            self.client.xcoms.delete(msg.dag_id, msg.run_id, msg.task_id, msg.key)
+            self.client.xcoms.delete(msg.dag_id, msg.run_id, msg.task_id, msg.key, msg.map_index)
         elif isinstance(msg, PutVariable):
             self.client.variables.set(msg.key, msg.value, msg.description)
         elif isinstance(msg, SetRenderedFields):
diff --git a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
index 965164d642a..993d4520753 100644
--- a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
+++ b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
@@ -57,6 +57,7 @@ from airflow.sdk.execution_time.comms import (
     AssetResult,
     ConnectionResult,
     DeferTask,
+    DeleteXCom,
     GetAssetByName,
     GetAssetByUri,
     GetAssetEventByAsset,
@@ -1113,6 +1114,27 @@ class TestHandleRequest:
                 {"ok": True},
                 id="set_xcom_with_map_index_and_mapped_length",
             ),
+            pytest.param(
+                DeleteXCom(
+                    dag_id="test_dag",
+                    run_id="test_run",
+                    task_id="test_task",
+                    key="test_key",
+                    map_index=2,
+                ),
+                b"",
+                "xcoms.delete",
+                (
+                    "test_dag",
+                    "test_run",
+                    "test_task",
+                    "test_key",
+                    2,
+                ),
+                {},
+                {"ok": True},
+                id="delete_xcom",
+            ),
             # we aren't adding all states under TerminalTIState here, because this test's scope is only to check
             # if it can handle TaskState message
             pytest.param(
diff --git a/tests/api_fastapi/execution_api/routes/test_xcoms.py b/tests/api_fastapi/execution_api/routes/test_xcoms.py
index b8d10538c5b..f08c8371e28 100644
--- a/tests/api_fastapi/execution_api/routes/test_xcoms.py
+++ b/tests/api_fastapi/execution_api/routes/test_xcoms.py
@@ -1,4 +1,5 @@
 # Licensed to the Apache Software Foundation (ASF) under one
+# Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
 # distributed with this work for additional information
 # regarding copyright ownership.  The ASF licenses this file
@@ -252,15 +253,26 @@ class TestXComsDeleteEndpoint:
         """Test that XCom value is deleted when Delete API is called."""
         ti = create_task_instance()
         ti.xcom_push(key="xcom_1", value='"value1"', session=session)
+
+        ti1 = create_task_instance(dag_id="my_dag_1", task_id="task_1")
+        ti1.xcom_push(key="xcom_1", value='"value2"', session=session)
         session.commit()
 
-        xcom = session.query(XComModel).filter_by(task_id=ti.task_id, dag_id=ti.dag_id, key="xcom_1").first()
-        assert xcom is not None
+        xcoms = session.query(XComModel).filter_by(key="xcom_1").all()
+        assert xcoms is not None
+        assert len(xcoms) == 2
 
         response = client.delete(f"/execution/xcoms/{ti.dag_id}/{ti.run_id}/{ti.task_id}/xcom_1")
 
         assert response.status_code == 200
         assert response.json() == {"message": "XCom with key: xcom_1 successfully deleted."}
 
-        xcom = session.query(XComModel).filter_by(task_id=ti.task_id, dag_id=ti.dag_id, key="xcom_1").first()
-        assert xcom is None
+        xcom_ti = (
+            session.query(XComModel).filter_by(task_id=ti.task_id, dag_id=ti.dag_id, key="xcom_1").first()
+        )
+        assert xcom_ti is None
+
+        xcom_ti = (
+            session.query(XComModel).filter_by(task_id=ti1.task_id, dag_id=ti1.dag_id, key="xcom_1").first()
+        )
+        assert xcom_ti is not None

Reply via email to