This is an automated email from the ASF dual-hosted git repository.
amoghdesai pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 3d8269f52a6 Honor the ti while deleting XComs via execution API
(#47895)
3d8269f52a6 is described below
commit 3d8269f52a64e04367239a1e6b75e436bf5bb207
Author: Amogh Desai <[email protected]>
AuthorDate: Tue Mar 18 15:16:31 2025 +0530
Honor the ti while deleting XComs via execution API (#47895)
---
airflow/api_fastapi/execution_api/routes/xcoms.py | 14 +++++++++++---
task-sdk/src/airflow/sdk/api/client.py | 5 ++++-
task-sdk/src/airflow/sdk/execution_time/comms.py | 1 +
.../src/airflow/sdk/execution_time/supervisor.py | 2 +-
.../task_sdk/execution_time/test_supervisor.py | 22 ++++++++++++++++++++++
.../api_fastapi/execution_api/routes/test_xcoms.py | 20 ++++++++++++++++----
6 files changed, 55 insertions(+), 9 deletions(-)
diff --git a/airflow/api_fastapi/execution_api/routes/xcoms.py
b/airflow/api_fastapi/execution_api/routes/xcoms.py
index 808f6deda2b..92750835b4f 100644
--- a/airflow/api_fastapi/execution_api/routes/xcoms.py
+++ b/airflow/api_fastapi/execution_api/routes/xcoms.py
@@ -276,11 +276,13 @@ def set_xcom(
def delete_xcom(
session: SessionDep,
token: deps.TokenDep,
+ key: str,
dag_id: str,
run_id: str,
task_id: str,
- key: str,
+ map_index: Annotated[int, Query()] = -1,
):
+ """Delete a single XCom Value."""
if not has_xcom_access(dag_id, run_id, task_id, key, token, write=True):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
@@ -290,8 +292,14 @@ def delete_xcom(
},
)
- query = session.query(XComModel).where(XComModel.key == key).first()
- session.delete(query)
+ query = delete(XComModel).where(
+ XComModel.key == key,
+ XComModel.run_id == run_id,
+ XComModel.task_id == task_id,
+ XComModel.dag_id == dag_id,
+ XComModel.map_index == map_index,
+ )
+ session.execute(query)
session.commit()
return {"message": f"XCom with key: {key} successfully deleted."}
diff --git a/task-sdk/src/airflow/sdk/api/client.py
b/task-sdk/src/airflow/sdk/api/client.py
index ae22a02603c..6dfe6a5bdc7 100644
--- a/task-sdk/src/airflow/sdk/api/client.py
+++ b/task-sdk/src/airflow/sdk/api/client.py
@@ -339,9 +339,12 @@ class XComOperations:
run_id: str,
task_id: str,
key: str,
+ map_index: int | None = None,
) -> dict[str, bool]:
"""Delete a XCom with given key via the API server."""
- self.client.delete(f"xcoms/{dag_id}/{run_id}/{task_id}/{key}")
+ if map_index is not None and map_index >= 0:
+ params = {"map_index": map_index}
+ self.client.delete(f"xcoms/{dag_id}/{run_id}/{task_id}/{key}",
params=params)
# Any error from the server will anyway be propagated down to the
supervisor,
# so we choose to send a generic response to the supervisor over the
server response to
# decouple from the server response string
diff --git a/task-sdk/src/airflow/sdk/execution_time/comms.py
b/task-sdk/src/airflow/sdk/execution_time/comms.py
index ffe73092adf..bd081b534ab 100644
--- a/task-sdk/src/airflow/sdk/execution_time/comms.py
+++ b/task-sdk/src/airflow/sdk/execution_time/comms.py
@@ -340,6 +340,7 @@ class DeleteXCom(BaseModel):
dag_id: str
run_id: str
task_id: str
+ map_index: int | None = None
type: Literal["DeleteXCom"] = "DeleteXCom"
diff --git a/task-sdk/src/airflow/sdk/execution_time/supervisor.py
b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
index d986f57f3c0..e27e6314190 100644
--- a/task-sdk/src/airflow/sdk/execution_time/supervisor.py
+++ b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
@@ -903,7 +903,7 @@ class ActivitySubprocess(WatchedSubprocess):
msg.dag_id, msg.run_id, msg.task_id, msg.key, msg.value,
msg.map_index, msg.mapped_length
)
elif isinstance(msg, DeleteXCom):
- self.client.xcoms.delete(msg.dag_id, msg.run_id, msg.task_id,
msg.key)
+ self.client.xcoms.delete(msg.dag_id, msg.run_id, msg.task_id,
msg.key, msg.map_index)
elif isinstance(msg, PutVariable):
self.client.variables.set(msg.key, msg.value, msg.description)
elif isinstance(msg, SetRenderedFields):
diff --git a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
index 965164d642a..993d4520753 100644
--- a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
+++ b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
@@ -57,6 +57,7 @@ from airflow.sdk.execution_time.comms import (
AssetResult,
ConnectionResult,
DeferTask,
+ DeleteXCom,
GetAssetByName,
GetAssetByUri,
GetAssetEventByAsset,
@@ -1113,6 +1114,27 @@ class TestHandleRequest:
{"ok": True},
id="set_xcom_with_map_index_and_mapped_length",
),
+ pytest.param(
+ DeleteXCom(
+ dag_id="test_dag",
+ run_id="test_run",
+ task_id="test_task",
+ key="test_key",
+ map_index=2,
+ ),
+ b"",
+ "xcoms.delete",
+ (
+ "test_dag",
+ "test_run",
+ "test_task",
+ "test_key",
+ 2,
+ ),
+ {},
+ {"ok": True},
+ id="delete_xcom",
+ ),
# we aren't adding all states under TerminalTIState here, because
this test's scope is only to check
# if it can handle TaskState message
pytest.param(
diff --git a/tests/api_fastapi/execution_api/routes/test_xcoms.py
b/tests/api_fastapi/execution_api/routes/test_xcoms.py
index b8d10538c5b..f08c8371e28 100644
--- a/tests/api_fastapi/execution_api/routes/test_xcoms.py
+++ b/tests/api_fastapi/execution_api/routes/test_xcoms.py
@@ -1,4 +1,5 @@
# Licensed to the Apache Software Foundation (ASF) under one
+# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
@@ -252,15 +253,26 @@ class TestXComsDeleteEndpoint:
"""Test that XCom value is deleted when Delete API is called."""
ti = create_task_instance()
ti.xcom_push(key="xcom_1", value='"value1"', session=session)
+
+ ti1 = create_task_instance(dag_id="my_dag_1", task_id="task_1")
+ ti1.xcom_push(key="xcom_1", value='"value2"', session=session)
session.commit()
- xcom = session.query(XComModel).filter_by(task_id=ti.task_id,
dag_id=ti.dag_id, key="xcom_1").first()
- assert xcom is not None
+ xcoms = session.query(XComModel).filter_by(key="xcom_1").all()
+ assert xcoms is not None
+ assert len(xcoms) == 2
response =
client.delete(f"/execution/xcoms/{ti.dag_id}/{ti.run_id}/{ti.task_id}/xcom_1")
assert response.status_code == 200
assert response.json() == {"message": "XCom with key: xcom_1
successfully deleted."}
- xcom = session.query(XComModel).filter_by(task_id=ti.task_id,
dag_id=ti.dag_id, key="xcom_1").first()
- assert xcom is None
+ xcom_ti = (
+ session.query(XComModel).filter_by(task_id=ti.task_id,
dag_id=ti.dag_id, key="xcom_1").first()
+ )
+ assert xcom_ti is None
+
+ xcom_ti = (
+ session.query(XComModel).filter_by(task_id=ti1.task_id,
dag_id=ti1.dag_id, key="xcom_1").first()
+ )
+ assert xcom_ti is not None