amoghrajesh commented on code in PR #44605:
URL: https://github.com/apache/airflow/pull/44605#discussion_r1868920705
##########
task_sdk/src/airflow/sdk/api/client.py:
##########
@@ -169,6 +169,19 @@ def get(self, dag_id: str, run_id: str, task_id: str, key:
str, map_index: int =
resp = self.client.get(f"xcoms/{dag_id}/{run_id}/{task_id}/{key}",
params={"map_index": map_index})
return XComResponse.model_validate_json(resp.read())
+ def set(
+ self, dag_id: str, run_id: str, task_id: str, key: str, value,
map_index: int | None = None
+ ) -> dict[str, bool]:
+ """Set a XCom value via the API server."""
+ params = {}
+ if map_index:
+ params = {"map_index": map_index}
+ self.client.post(f"xcoms/{dag_id}/{run_id}/{task_id}/{key}",
params=params, json=value)
+ # Any error from the server will anyway be propagated down to the
supervisor,
+ # so we choose to send a generic response to the supervisor over the
server response to
+ # decouple from the server response string
Review Comment:
Same as https://github.com/apache/airflow/pull/44562#discussion_r1868014142
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]