dheerajturaga commented on code in PR #61021:
URL: https://github.com/apache/airflow/pull/61021#discussion_r2743272605
##########
airflow-ctl/src/airflowctl/api/operations.py:
##########
@@ -697,3 +702,125 @@ def get(self) -> VersionInfo | ServerResponseError:
return VersionInfo.model_validate_json(self.response.content)
except ServerResponseError as e:
raise e
+
+
+class XComOperations(BaseOperations):
+ """XCom operations."""
+
+ def get(
+ self,
+ dag_id: str,
+ dag_run_id: str,
+ task_id: str,
+ key: str,
+ map_index: int = None, # type: ignore
+ ) -> XComResponseNative | ServerResponseError:
+ """Get an XCom entry."""
+ try:
+ params: dict[str, Any] = {}
+ if map_index is not None:
+ params["map_index"] = map_index
+ self.response = self.client.get(
+
f"dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/xcomEntries/{key}",
+ params=params,
+ )
+ return
XComResponseNative.model_validate_json(self.response.content)
+ except ServerResponseError as e:
+ raise e
+
+ def list(
+ self,
+ dag_id: str,
+ dag_run_id: str,
+ task_id: str,
+ map_index: int = None, # type: ignore
+ key: str = None, # type: ignore
+ ) -> XComCollectionResponse | ServerResponseError:
+ """List XCom entries."""
+ params: dict[str, Any] = {}
+ if map_index is not None:
+ params["map_index"] = map_index
+ if key is not None:
+ params["xcom_key"] = key
+ return super().execute_list(
+
path=f"dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/xcomEntries",
+ data_model=XComCollectionResponse,
+ params=params,
+ )
+
+ def add(
+ self,
+ dag_id: str,
+ dag_run_id: str,
+ task_id: str,
+ key: str,
+ value: str,
+ map_index: int = None, # type: ignore
+ ) -> XComResponseNative | ServerResponseError:
+ """Add an XCom entry."""
+ try:
+ parsed_value = json.loads(value)
+ except (ValueError, TypeError):
+ parsed_value = value
+
+ body_dict: dict[str, Any] = {"key": key, "value": parsed_value}
+ if map_index is not None:
+ body_dict["map_index"] = map_index
+ body = XComCreateBody(**body_dict)
+ try:
+ self.response = self.client.post(
+
f"dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/xcomEntries",
+ json=body.model_dump(mode="json", exclude_unset=True),
+ )
+ return
XComResponseNative.model_validate_json(self.response.content)
+ except ServerResponseError as e:
+ raise e
+
+ def edit(
+ self,
+ dag_id: str,
+ dag_run_id: str,
+ task_id: str,
+ key: str,
+ value: str,
+ map_index: int = None, # type: ignore
+ ) -> XComResponseNative | ServerResponseError:
+ """Edit an XCom entry."""
+ try:
+ parsed_value = json.loads(value)
+ except (ValueError, TypeError):
+ parsed_value = value
+
+ body_dict: dict[str, Any] = {"value": parsed_value}
+ if map_index is not None:
+ body_dict["map_index"] = map_index
+ body = XComUpdateBody(**body_dict)
+ try:
+ self.response = self.client.patch(
+
f"dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/xcomEntries/{key}",
+ json=body.model_dump(mode="json", exclude_unset=True),
+ )
+ return
XComResponseNative.model_validate_json(self.response.content)
+ except ServerResponseError as e:
+ raise e
+
+ def delete(
+ self,
+ dag_id: str,
+ dag_run_id: str,
+ task_id: str,
+ key: str,
+ map_index: int = None, # type: ignore
+ ) -> str | ServerResponseError:
+ """Delete an XCom entry."""
+ try:
+ params: dict[str, Any] = {}
+ if map_index is not None:
+ params["map_index"] = map_index
+ self.client.delete(
+
f"dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/xcomEntries/{key}",
+ params=params,
+ )
+ return key
+ except ServerResponseError as e:
+ raise e
Review Comment:
@amoghrajesh , The Python Client also provides mechanism to modify xcom
outside the Task Execution context as advertised in
https://github.com/apache/airflow-client-python/blob/main/docs/XComApi.md#create_xcom_entry
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]