This is an automated email from the ASF dual-hosted git repository.
ephraimanierobi pushed a commit to branch v3-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit ed7d82f8694b7ec97a0975f84c0b26cc6671bbd6
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Thu Dec 11 21:48:57 2025 +0800
[v3-1-test] Fix XCom key handling when keys contain special characters like
slash (#58344) (#59311)
* Fix XCom key handling when keys contain special characters like slash
* Remove all unquote() calls
* Add test
* Remove all quote() calls in client.py
* Fix unit test
(cherry picked from commit 6e3e51201e48aa96e9072f0c56918528d479d814)
Co-authored-by: Henry Chen <[email protected]>
---
.../api_fastapi/core_api/routes/public/xcom.py | 4 +-
.../api_fastapi/execution_api/routes/xcoms.py | 212 ++++++++++-----------
.../core_api/routes/public/test_xcom.py | 48 ++++-
.../task_sdk/execution_time/test_task_runner.py | 79 ++++++++
4 files changed, 233 insertions(+), 110 deletions(-)
diff --git
a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/xcom.py
b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/xcom.py
index 6aa86d09f22..b1a4bc84414 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/xcom.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/xcom.py
@@ -60,7 +60,7 @@ xcom_router = AirflowRouter(
@xcom_router.get(
- "/{xcom_key}",
+ "/{xcom_key:path}",
responses=create_openapi_http_exception_doc(
[
status.HTTP_400_BAD_REQUEST,
@@ -292,7 +292,7 @@ def create_xcom_entry(
@xcom_router.patch(
- "/{xcom_key}",
+ "/{xcom_key:path}",
status_code=status.HTTP_200_OK,
responses=create_openapi_http_exception_doc(
[
diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/routes/xcoms.py
b/airflow-core/src/airflow/api_fastapi/execution_api/routes/xcoms.py
index b2399635499..1408adcefee 100644
--- a/airflow-core/src/airflow/api_fastapi/execution_api/routes/xcoms.py
+++ b/airflow-core/src/airflow/api_fastapi/execution_api/routes/xcoms.py
@@ -21,7 +21,7 @@ import logging
from typing import Annotated
from fastapi import APIRouter, Body, Depends, HTTPException, Path, Query,
Request, Response, status
-from pydantic import BaseModel, JsonValue, StringConstraints
+from pydantic import BaseModel, JsonValue
from sqlalchemy import delete
from sqlalchemy.sql.selectable import Select
@@ -41,7 +41,7 @@ async def has_xcom_access(
dag_id: str,
run_id: str,
task_id: str,
- xcom_key: Annotated[str, Path(alias="key")],
+ xcom_key: Annotated[str, Path(alias="key", min_length=1)],
request: Request,
token=JWTBearerDep,
) -> bool:
@@ -88,111 +88,15 @@ async def xcom_query(
return query
[email protected](
- "/{dag_id}/{run_id}/{task_id}/{key}",
- responses={
- status.HTTP_200_OK: {
- "description": "Metadata about the number of matching XCom values",
- "headers": {
- "Content-Range": {
- "schema": {"pattern": r"^map_indexes \d+$"},
- "description": "The number of (mapped) XCom values found
for this task.",
- },
- },
- },
- },
- description="Returns the count of mapped XCom values found in the
`Content-Range` response header",
-)
-def head_xcom(
- response: Response,
- session: SessionDep,
- xcom_query: Annotated[Select, Depends(xcom_query)],
- map_index: Annotated[int | None, Query()] = None,
-) -> None:
- """Get the count of XComs from database - not other XCom Backends."""
- if map_index is not None:
- raise HTTPException(
- status_code=status.HTTP_400_BAD_REQUEST,
- detail={"reason": "invalid_request", "message": "Cannot specify
map_index in a HEAD request"},
- )
-
- count = get_query_count(xcom_query, session=session)
- # Tell the caller how many items in this query. We define a custom range
unit (HTTP spec only defines
- # "bytes" but we can add our own)
- response.headers["Content-Range"] = f"map_indexes {count}"
-
-
-class GetXcomFilterParams(BaseModel):
- """Class to house the params that can optionally be set for Get XCom."""
-
- map_index: int = -1
- include_prior_dates: bool = False
- offset: int | None = None
-
-
@router.get(
- "/{dag_id}/{run_id}/{task_id}/{key}",
- description="Get a single XCom Value",
-)
-def get_xcom(
- dag_id: str,
- run_id: str,
- task_id: str,
- key: Annotated[str, StringConstraints(min_length=1)],
- session: SessionDep,
- params: Annotated[GetXcomFilterParams, Query()],
-) -> XComResponse:
- """Get an Airflow XCom from database - not other XCom Backends."""
- xcom_query = XComModel.get_many(
- run_id=run_id,
- key=key,
- task_ids=task_id,
- dag_ids=dag_id,
- include_prior_dates=params.include_prior_dates,
- )
- if params.offset is not None:
- xcom_query =
xcom_query.where(XComModel.value.is_not(None)).order_by(None)
- if params.offset >= 0:
- xcom_query =
xcom_query.order_by(XComModel.map_index.asc()).offset(params.offset)
- else:
- xcom_query =
xcom_query.order_by(XComModel.map_index.desc()).offset(-1 - params.offset)
- else:
- xcom_query = xcom_query.where(XComModel.map_index == params.map_index)
-
- # We use `BaseXCom.get_many` to fetch XComs directly from the database,
bypassing the XCom Backend.
- # This avoids deserialization via the backend (e.g., from a remote storage
like S3) and instead
- # retrieves the raw serialized value from the database. By not relying on
`XCom.get_many` or `XCom.get_one`
- # (which automatically deserializes using the backend), we avoid potential
- # performance hits from retrieving large data files into the API server.
- result = session.scalars(xcom_query).first()
- if result is None:
- if params.offset is None:
- message = (
- f"XCom with {key=} map_index={params.map_index} not found for "
- f"task {task_id!r} in DAG run {run_id!r} of {dag_id!r}"
- )
- else:
- message = (
- f"XCom with {key=} offset={params.offset} not found for "
- f"task {task_id!r} in DAG run {run_id!r} of {dag_id!r}"
- )
- raise HTTPException(
- status_code=status.HTTP_404_NOT_FOUND,
- detail={"reason": "not_found", "message": message},
- )
-
- return XComResponse(key=key, value=result.value)
-
-
[email protected](
- "/{dag_id}/{run_id}/{task_id}/{key}/item/{offset}",
+ "/{dag_id}/{run_id}/{task_id}/{key:path}/item/{offset}",
description="Get a single XCom value from a mapped task by sequence index",
)
def get_mapped_xcom_by_index(
dag_id: str,
run_id: str,
task_id: str,
- key: str,
+ key: Annotated[str, Path(min_length=1)],
offset: int,
session: SessionDep,
) -> XComSequenceIndexResponse:
@@ -229,14 +133,14 @@ class GetXComSliceFilterParams(BaseModel):
@router.get(
- "/{dag_id}/{run_id}/{task_id}/{key}/slice",
+ "/{dag_id}/{run_id}/{task_id}/{key:path}/slice",
description="Get XCom values from a mapped task by sequence slice",
)
def get_mapped_xcom_by_slice(
dag_id: str,
run_id: str,
task_id: str,
- key: str,
+ key: Annotated[str, Path(min_length=1)],
params: Annotated[GetXComSliceFilterParams, Query()],
session: SessionDep,
) -> XComSequenceSliceResponse:
@@ -310,17 +214,113 @@ def get_mapped_xcom_by_slice(
return XComSequenceSliceResponse(values)
[email protected](
+ "/{dag_id}/{run_id}/{task_id}/{key:path}",
+ responses={
+ status.HTTP_200_OK: {
+ "description": "Metadata about the number of matching XCom values",
+ "headers": {
+ "Content-Range": {
+ "schema": {"pattern": r"^map_indexes \d+$"},
+ "description": "The number of (mapped) XCom values found
for this task.",
+ },
+ },
+ },
+ },
+ description="Returns the count of mapped XCom values found in the
`Content-Range` response header",
+)
+def head_xcom(
+ response: Response,
+ session: SessionDep,
+ xcom_query: Annotated[Select, Depends(xcom_query)],
+ map_index: Annotated[int | None, Query()] = None,
+) -> None:
+ """Get the count of XComs from database - not other XCom Backends."""
+ if map_index is not None:
+ raise HTTPException(
+ status_code=status.HTTP_400_BAD_REQUEST,
+ detail={"reason": "invalid_request", "message": "Cannot specify
map_index in a HEAD request"},
+ )
+
+ count = get_query_count(xcom_query, session=session)
+ # Tell the caller how many items in this query. We define a custom range
unit (HTTP spec only defines
+ # "bytes" but we can add our own)
+ response.headers["Content-Range"] = f"map_indexes {count}"
+
+
+class GetXcomFilterParams(BaseModel):
+ """Class to house the params that can optionally be set for Get XCom."""
+
+ map_index: int = -1
+ include_prior_dates: bool = False
+ offset: int | None = None
+
+
[email protected](
+ "/{dag_id}/{run_id}/{task_id}/{key:path}",
+ description="Get a single XCom Value",
+)
+def get_xcom(
+ dag_id: str,
+ run_id: str,
+ task_id: str,
+ key: Annotated[str, Path(min_length=1)],
+ session: SessionDep,
+ params: Annotated[GetXcomFilterParams, Query()],
+) -> XComResponse:
+ """Get an Airflow XCom from database - not other XCom Backends."""
+ xcom_query = XComModel.get_many(
+ run_id=run_id,
+ key=key,
+ task_ids=task_id,
+ dag_ids=dag_id,
+ include_prior_dates=params.include_prior_dates,
+ )
+ if params.offset is not None:
+ xcom_query =
xcom_query.where(XComModel.value.is_not(None)).order_by(None)
+ if params.offset >= 0:
+ xcom_query =
xcom_query.order_by(XComModel.map_index.asc()).offset(params.offset)
+ else:
+ xcom_query =
xcom_query.order_by(XComModel.map_index.desc()).offset(-1 - params.offset)
+ else:
+ xcom_query = xcom_query.where(XComModel.map_index == params.map_index)
+
+ # We use `BaseXCom.get_many` to fetch XComs directly from the database,
bypassing the XCom Backend.
+ # This avoids deserialization via the backend (e.g., from a remote storage
like S3) and instead
+ # retrieves the raw serialized value from the database. By not relying on
`XCom.get_many` or `XCom.get_one`
+ # (which automatically deserializes using the backend), we avoid potential
+ # performance hits from retrieving large data files into the API server.
+ result = session.scalars(xcom_query).first()
+ if result is None:
+ if params.offset is None:
+ message = (
+ f"XCom with {key=} map_index={params.map_index} not found for "
+ f"task {task_id!r} in DAG run {run_id!r} of {dag_id!r}"
+ )
+ else:
+ message = (
+ f"XCom with {key=} offset={params.offset} not found for "
+ f"task {task_id!r} in DAG run {run_id!r} of {dag_id!r}"
+ )
+ raise HTTPException(
+ status_code=status.HTTP_404_NOT_FOUND,
+ detail={"reason": "not_found", "message": message},
+ )
+
+ return XComResponse(key=key, value=result.value)
+
+
# TODO: once we have JWT tokens, then remove dag_id/run_id/task_id from the
URL and just use the info in
# the token
@router.post(
- "/{dag_id}/{run_id}/{task_id}/{key}",
+ "/{dag_id}/{run_id}/{task_id}/{key:path}",
status_code=status.HTTP_201_CREATED,
)
def set_xcom(
dag_id: str,
run_id: str,
task_id: str,
- key: Annotated[str, StringConstraints(min_length=1)],
+ key: Annotated[str, Path(min_length=1)],
session: SessionDep,
value: Annotated[
JsonValue,
@@ -431,7 +431,7 @@ def set_xcom(
@router.delete(
- "/{dag_id}/{run_id}/{task_id}/{key}",
+ "/{dag_id}/{run_id}/{task_id}/{key:path}",
responses={status.HTTP_404_NOT_FOUND: {"description": "XCom not found"}},
description="Delete a single XCom Value",
)
@@ -440,7 +440,7 @@ def delete_xcom(
dag_id: str,
run_id: str,
task_id: str,
- key: str,
+ key: Annotated[str, Path(min_length=1)],
map_index: Annotated[int, Query()] = -1,
):
"""Delete a single XCom Value."""
diff --git
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_xcom.py
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_xcom.py
index d7a4837ddcc..395e45f0a9e 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_xcom.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_xcom.py
@@ -161,6 +161,18 @@ class TestGetXComEntry(TestXComEndpoint):
assert response.status_code == 404
assert response.json()["detail"] == f"XCom entry with key:
`{TEST_XCOM_KEY_2}` not found"
+ def test_should_respond_200_native_with_slash_key(self, test_client):
+ slash_key = "folder/sub/value"
+ self._create_xcom(slash_key, TEST_XCOM_VALUE)
+ # Use raw slash_key directly - FastAPI with :path converter handles it
+ response = test_client.get(
+
f"/dags/{TEST_DAG_ID}/dagRuns/{run_id}/taskInstances/{TEST_TASK_ID}/xcomEntries/{slash_key}"
+ )
+ assert response.status_code == 200
+ current_data = response.json()
+ assert current_data["key"] == slash_key
+ assert current_data["value"] == json.dumps(TEST_XCOM_VALUE)
+
@pytest.mark.parametrize(
"params, expected_value",
[
@@ -630,6 +642,23 @@ class TestCreateXComEntry(TestXComEndpoint):
)
assert response.status_code == 403
+ def test_create_xcom_entry_with_slash_key(self, test_client):
+ slash_key = "a/b/c"
+ body = XComCreateBody(key=slash_key, value=TEST_XCOM_VALUE)
+ response = test_client.post(
+
f"/dags/{TEST_DAG_ID}/dagRuns/{run_id}/taskInstances/{TEST_TASK_ID}/xcomEntries",
+ json=body.dict(),
+ )
+ assert response.status_code == 201
+ assert response.json()["key"] == slash_key
+ # Verify retrieval via the raw (unencoded) slash-containing path
+ get_resp = test_client.get(
+
f"/dags/{TEST_DAG_ID}/dagRuns/{run_id}/taskInstances/{TEST_TASK_ID}/xcomEntries/{slash_key}"
+ )
+ assert get_resp.status_code == 200
+ assert get_resp.json()["key"] == slash_key
+ assert get_resp.json()["value"] == json.dumps(TEST_XCOM_VALUE)
+
class TestPatchXComEntry(TestXComEndpoint):
@pytest.mark.parametrize(
@@ -657,7 +686,8 @@ class TestPatchXComEntry(TestXComEndpoint):
# Ensure the XCom entry exists before updating
if expected_status != 404:
self._create_xcom(TEST_XCOM_KEY, TEST_XCOM_VALUE)
- new_value = XComModel.serialize_value(patch_body["value"])
+ # The value is double-serialized: first
json.dumps(patch_body["value"]), then json.dumps() again
+ new_value = json.dumps(json.dumps(patch_body["value"]))
response = test_client.patch(
f"/dags/{TEST_DAG_ID}/dagRuns/{run_id}/taskInstances/{TEST_TASK_ID}/xcomEntries/{key}",
@@ -667,7 +697,7 @@ class TestPatchXComEntry(TestXComEndpoint):
assert response.status_code == expected_status
if expected_status == 200:
- assert response.json()["value"] ==
XComModel.serialize_value(new_value)
+ assert response.json()["value"] == new_value
else:
assert response.json()["detail"] == expected_detail
check_last_log(session, dag_id=TEST_DAG_ID, event="update_xcom_entry",
logical_date=None)
@@ -685,3 +715,17 @@ class TestPatchXComEntry(TestXComEndpoint):
json={},
)
assert response.status_code == 403
+
+ def test_patch_xcom_entry_with_slash_key(self, test_client, session):
+ slash_key = "x/y"
+ self._create_xcom(slash_key, TEST_XCOM_VALUE)
+ new_value = {"updated": True}
+ response = test_client.patch(
+
f"/dags/{TEST_DAG_ID}/dagRuns/{run_id}/taskInstances/{TEST_TASK_ID}/xcomEntries/{slash_key}",
+ json={"value": new_value},
+ )
+ assert response.status_code == 200
+ assert response.json()["key"] == slash_key
+ # The value is double-serialized: first json.dumps(new_value), then
json.dumps() again
+ assert response.json()["value"] == json.dumps(json.dumps(new_value))
+ check_last_log(session, dag_id=TEST_DAG_ID, event="update_xcom_entry",
logical_date=None)
diff --git a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
index 9728325cc23..96128bc810d 100644
--- a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
+++ b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
@@ -2076,6 +2076,85 @@ class TestRuntimeTaskInstance:
mock_delete.assert_not_called()
+ def test_xcom_push_pull_with_slash_in_key(self, create_runtime_ti,
mock_supervisor_comms):
+ """
+ Ensure that XCom keys containing slashes are passed through unquoted
+ and do not break API routes (no 400/404).
+ """
+
+ class PushOperator(BaseOperator):
+ def execute(self, context):
+ context["ti"].xcom_push(key="some/key/with/slash",
value="slash_value")
+
+ task = PushOperator(task_id="push_task")
+ runtime_ti = create_runtime_ti(task=task, dag_id="test_dag")
+
+ # Run the task (which should trigger xcom_push)
+ run(runtime_ti, context=runtime_ti.get_template_context(),
log=mock.MagicMock())
+
+ # Verify supervisor received a SetXCom with the raw (unquoted) key
+ called_args = [
+ call.kwargs.get("msg") or call.args[0] for call in
mock_supervisor_comms.send.call_args_list
+ ]
+ assert any(getattr(arg, "key", None) == "some/key/with/slash" for arg
in called_args)
+
+ ser_value = BaseXCom.serialize_value("slash_value")
+ mock_supervisor_comms.send.reset_mock()
+ mock_supervisor_comms.send.return_value = XComSequenceSliceResult(
+ key="some/key/with/slash",
+ root=[ser_value],
+ )
+
+ pulled_value = runtime_ti.xcom_pull(key="some/key/with/slash",
task_ids="push_task")
+ assert pulled_value == "slash_value"
+
+ # Key should NOT be quoted here - client API will handle encoding
+ mock_supervisor_comms.send.assert_any_call(
+ GetXComSequenceSlice(
+ key="some/key/with/slash",
+ dag_id="test_dag",
+ run_id="test_run",
+ task_id="push_task",
+ map_index=0,
+ include_prior_dates=False,
+ start=None,
+ stop=None,
+ step=None,
+ type="GetXComSequenceSlice",
+ )
+ )
+
+ def test_taskflow_dict_return_with_slash_key(self, create_runtime_ti,
mock_supervisor_comms):
+ """
+ High-level: Ensure TaskFlow returning dict with slash in key doesn't
404 during XCom push.
+ """
+
+ @dag_decorator(schedule=None, start_date=timezone.datetime(2024, 12,
3))
+ def dag_with_slash_key():
+ @task_decorator
+ def dict_task():
+ return {"key with slash /": "Some Value"}
+
+ return dict_task() # returns XComArg
+
+ dag_obj = dag_with_slash_key()
+ task_op = dag_obj.get_task("dict_task")
+ runtime_ti = create_runtime_ti(task=task_op, dag_id=dag_obj.dag_id)
+
+ # Run task instance → should trigger TaskFlow dict expansion + XCom
push
+ run(runtime_ti, context=runtime_ti.get_template_context(),
log=mock.MagicMock())
+
+ # Mock supervisor response to simulate retrieval
+ ser_value = BaseXCom.serialize_value("Some Value")
+ mock_supervisor_comms.send.reset_mock()
+ mock_supervisor_comms.send.return_value = XComSequenceSliceResult(
+ key="key/slash",
+ root=[ser_value],
+ )
+
+ pulled = runtime_ti.xcom_pull(key="key/slash", task_ids="dict_task")
+ assert pulled == "Some Value"
+
class TestXComAfterTaskExecution:
@pytest.mark.parametrize(