This is an automated email from the ASF dual-hosted git repository.
vatsrahul1001 pushed a commit to branch v3-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v3-2-test by this push:
new 41733b23516 Fix XCom PATCH/POST to store native values (instead of
json.dumps) (#64220) (#67116)
41733b23516 is described below
commit 41733b23516ad200dc20e170e04facadefc0a636
Author: Rahul Vats <[email protected]>
AuthorDate: Mon May 18 21:58:45 2026 +0530
Fix XCom PATCH/POST to store native values (instead of json.dumps) (#64220)
(#67116)
(cherry picked from commit acdd9da1ce46a5b08d75b6646e78ff8c8f74fa5c)
Co-authored-by: Henry Chen <[email protected]>
---
.../api_fastapi/core_api/routes/public/xcom.py | 53 ++++++++++++++--------
.../core_api/routes/public/test_xcom.py | 26 +++++++++--
2 files changed, 55 insertions(+), 24 deletions(-)
diff --git
a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/xcom.py
b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/xcom.py
index 2d98d873278..13c7f78ed28 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/xcom.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/xcom.py
@@ -17,7 +17,6 @@
from __future__ import annotations
import copy
-import json
from typing import Annotated
from fastapi import Depends, HTTPException, Query, status
@@ -291,27 +290,24 @@ def create_xcom_entry(
)
try:
- value = json.dumps(request_body.value)
- except (ValueError, TypeError):
+ XComModel.set(
+ key=request_body.key,
+ value=request_body.value,
+ dag_id=dag_id,
+ task_id=task_id,
+ run_id=dag_run_id,
+ map_index=request_body.map_index,
+ serialize=False,
+ session=session,
+ )
+ except (ValueError, TypeError) as e:
raise HTTPException(
status.HTTP_400_BAD_REQUEST, f"Couldn't serialise the XCom with
key: `{request_body.key}`"
- )
-
- new = XComModel(
- dag_run_id=dag_run.id,
- key=request_body.key,
- value=value,
- run_id=dag_run_id,
- task_id=task_id,
- dag_id=dag_id,
- map_index=request_body.map_index,
- )
- session.add(new)
- session.flush()
+ ) from e
xcom = session.scalar(
select(XComModel)
- .filter(
+ .where(
XComModel.dag_id == dag_id,
XComModel.task_id == task_id,
XComModel.run_id == dag_run_id,
@@ -345,11 +341,12 @@ def update_xcom_entry(
dag_run_id: str,
xcom_key: str,
patch_body: XComUpdateBody,
+ *,
session: SessionDep,
) -> XComResponseNative:
"""Update an existing XCom entry."""
# Check if XCom entry exists
- xcom_entry = session.scalar(
+ xcom_query = (
select(XComModel)
.where(
XComModel.dag_id == dag_id,
@@ -361,6 +358,7 @@ def update_xcom_entry(
.limit(1)
.options(joinedload(XComModel.task),
joinedload(XComModel.dag_run).joinedload(DR.dag_model))
)
+ xcom_entry = session.scalar(xcom_query)
if not xcom_entry:
raise HTTPException(
@@ -368,9 +366,24 @@ def update_xcom_entry(
f"The XCom with key: `{xcom_key}` with mentioned task instance
doesn't exist.",
)
- # Update XCom entry
- xcom_entry.value = json.dumps(patch_body.value)
+ try:
+ XComModel.set(
+ key=xcom_key,
+ value=patch_body.value,
+ dag_id=dag_id,
+ task_id=task_id,
+ run_id=dag_run_id,
+ map_index=patch_body.map_index,
+ serialize=False,
+ session=session,
+ )
+ except (ValueError, TypeError) as e:
+ raise HTTPException(
+ status.HTTP_400_BAD_REQUEST, f"Couldn't serialise the XCom with
key: `{xcom_key}`"
+ ) from e
+ # Fetch after setting, to get fresh object for response
+ xcom_entry = session.scalar(xcom_query)
return XComResponseNative.model_validate(xcom_entry)
diff --git
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_xcom.py
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_xcom.py
index 0b2864c6e2d..bf4471be73a 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_xcom.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_xcom.py
@@ -680,7 +680,7 @@ class TestCreateXComEntry(TestXComEndpoint):
# Validate the created XCom response
current_data = response.json()
assert current_data["key"] == request_body.key
- assert current_data["value"] ==
XComModel.serialize_value(request_body.value)
+ assert current_data["value"] == request_body.value
assert current_data["dag_id"] == dag_id
assert current_data["task_id"] == task_id
assert current_data["run_id"] == dag_run_id
@@ -716,7 +716,7 @@ class TestCreateXComEntry(TestXComEndpoint):
)
assert get_resp.status_code == 200
assert get_resp.json()["key"] == slash_key
- assert get_resp.json()["value"] == json.dumps(TEST_XCOM_VALUE)
+ assert get_resp.json()["value"] == TEST_XCOM_VALUE
@pytest.mark.parametrize(
("key", "value"),
@@ -833,7 +833,7 @@ class TestPatchXComEntry(TestXComEndpoint):
assert response.status_code == expected_status
if expected_status == 200:
- assert response.json()["value"] == json.dumps(patch_body["value"])
+ assert response.json()["value"] == patch_body["value"]
else:
assert response.json()["detail"] == expected_detail
check_last_log(session, dag_id=TEST_DAG_ID, event="update_xcom_entry",
logical_date=None)
@@ -862,7 +862,25 @@ class TestPatchXComEntry(TestXComEndpoint):
)
assert response.status_code == 200
assert response.json()["key"] == slash_key
- assert response.json()["value"] == json.dumps(new_value)
+ assert response.json()["value"] == new_value
+ check_last_log(session, dag_id=TEST_DAG_ID, event="update_xcom_entry",
logical_date=None)
+
+ def test_patch_xcom_preserves_int_type(self, test_client, session):
+ """Test scenario described in #59032: if existing XCom value type is
int,
+ after patching with different value, it should still be int in the API
response.
+ """
+ key = "int_type_xcom"
+ # Create with int value
+ self._create_xcom(key, 42)
+ patch_value = 100
+ response = test_client.patch(
+
f"/dags/{TEST_DAG_ID}/dagRuns/{run_id}/taskInstances/{TEST_TASK_ID}/xcomEntries/{key}",
+ json={"value": patch_value},
+ )
+ assert response.status_code == 200
+ data = response.json()
+ assert data["value"] == patch_value
+ assert isinstance(data["value"], int), f"Expected int type but got
{type(data['value'])}"
check_last_log(session, dag_id=TEST_DAG_ID, event="update_xcom_entry",
logical_date=None)
@pytest.mark.parametrize(