ashb commented on code in PR #44101:
URL: https://github.com/apache/airflow/pull/44101#discussion_r1937032285
##########
airflow/api_fastapi/execution_api/routes/xcoms.py:
##########
@@ -105,6 +106,75 @@ def get_xcom(
return XComResponse(key=key, value=xcom_value)
[email protected](
+ "/{dag_id}/{run_id}/{task_id}/{key}",
+ status_code=status.HTTP_201_CREATED,
+ responses={
+ status.HTTP_400_BAD_REQUEST: {"description": "Invalid request body"},
+ },
+)
+def set_xcom(
+ dag_id: str,
+ run_id: str,
+ task_id: str,
+ key: str,
+ value: Annotated[
+ Json,
+ Body(
+ description="A JSON-formatted string representing the value to set
for the XCom.",
+ openapi_examples={
+ "simple_value": {
+ "summary": "Simple value",
+ "value": '"value1"',
+ },
+ "dict_value": {
+ "summary": "Dictionary value",
+ "value": '{"key2": "value2"}',
+ },
+ "list_value": {
+ "summary": "List value",
+ "value": '["value1"]',
+ },
+ },
+ ),
+ ],
+ token: deps.TokenDep,
+ session: Annotated[Session, Depends(get_session)],
+ map_index: Annotated[int, Query()] = -1,
+):
+ """Set an Airflow XCom."""
+ if not has_xcom_access(key, token):
+ raise HTTPException(
+ status_code=status.HTTP_403_FORBIDDEN,
+ detail={
+ "reason": "access_denied",
+ "message": f"Task does not have access to set XCom key
'{key}'",
+ },
+ )
+
+ # We use `BaseXCom.set` to set XComs directly to the database, bypassing
the XCom Backend.
Review Comment:
I wonder why we actually want to do that on the server side? Yeah it would
be better if the worker/client was confused to push directly to a custom
xcombackend, but shouldn't we do it on the server if that is what the user has
configured?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]