dheerajturaga commented on code in PR #61021:
URL: https://github.com/apache/airflow/pull/61021#discussion_r2743258521
##########
airflow-ctl/src/airflowctl/api/operations.py:
##########
@@ -697,3 +702,125 @@ def get(self) -> VersionInfo | ServerResponseError:
return VersionInfo.model_validate_json(self.response.content)
except ServerResponseError as e:
raise e
+
+
+class XComOperations(BaseOperations):
+ """XCom operations."""
+
+ def get(
+ self,
+ dag_id: str,
+ dag_run_id: str,
+ task_id: str,
+ key: str,
+ map_index: int = None, # type: ignore
+ ) -> XComResponseNative | ServerResponseError:
+ """Get an XCom entry."""
+ try:
+ params: dict[str, Any] = {}
+ if map_index is not None:
+ params["map_index"] = map_index
+ self.response = self.client.get(
+
f"dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/xcomEntries/{key}",
+ params=params,
+ )
+ return
XComResponseNative.model_validate_json(self.response.content)
+ except ServerResponseError as e:
+ raise e
+
+ def list(
+ self,
+ dag_id: str,
+ dag_run_id: str,
+ task_id: str,
+ map_index: int = None, # type: ignore
+ key: str = None, # type: ignore
+ ) -> XComCollectionResponse | ServerResponseError:
+ """List XCom entries."""
+ params: dict[str, Any] = {}
+ if map_index is not None:
+ params["map_index"] = map_index
+ if key is not None:
+ params["xcom_key"] = key
+ return super().execute_list(
+
path=f"dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/xcomEntries",
+ data_model=XComCollectionResponse,
+ params=params,
+ )
+
+ def add(
+ self,
+ dag_id: str,
+ dag_run_id: str,
+ task_id: str,
+ key: str,
+ value: str,
+ map_index: int = None, # type: ignore
+ ) -> XComResponseNative | ServerResponseError:
+ """Add an XCom entry."""
+ try:
+ parsed_value = json.loads(value)
+ except (ValueError, TypeError):
+ parsed_value = value
+
+ body_dict: dict[str, Any] = {"key": key, "value": parsed_value}
+ if map_index is not None:
+ body_dict["map_index"] = map_index
+ body = XComCreateBody(**body_dict)
+ try:
+ self.response = self.client.post(
+
f"dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/xcomEntries",
+ json=body.model_dump(mode="json", exclude_unset=True),
+ )
+ return
XComResponseNative.model_validate_json(self.response.content)
+ except ServerResponseError as e:
+ raise e
+
+ def edit(
+ self,
+ dag_id: str,
+ dag_run_id: str,
+ task_id: str,
+ key: str,
+ value: str,
+ map_index: int = None, # type: ignore
+ ) -> XComResponseNative | ServerResponseError:
+ """Edit an XCom entry."""
+ try:
+ parsed_value = json.loads(value)
+ except (ValueError, TypeError):
+ parsed_value = value
+
+ body_dict: dict[str, Any] = {"value": parsed_value}
+ if map_index is not None:
+ body_dict["map_index"] = map_index
+ body = XComUpdateBody(**body_dict)
+ try:
+ self.response = self.client.patch(
+
f"dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/xcomEntries/{key}",
+ json=body.model_dump(mode="json", exclude_unset=True),
+ )
+ return
XComResponseNative.model_validate_json(self.response.content)
+ except ServerResponseError as e:
+ raise e
+
+ def delete(
+ self,
+ dag_id: str,
+ dag_run_id: str,
+ task_id: str,
+ key: str,
+ map_index: int = None, # type: ignore
+ ) -> str | ServerResponseError:
+ """Delete an XCom entry."""
+ try:
+ params: dict[str, Any] = {}
+ if map_index is not None:
+ params["map_index"] = map_index
+ self.client.delete(
+
f"dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/xcomEntries/{key}",
+ params=params,
+ )
+ return key
+ except ServerResponseError as e:
+ raise e
Review Comment:
These operations already exist on the UI side and are the plan of record (POR) for 3.2, so this
enhancement is in alignment with the UI.
I would like to merge this and have a discussion about it on the sidelines. IMO,
these are very useful in some workflows (especially for me :-) ).
<img width="2608" height="1056" alt="image"
src="https://github.com/user-attachments/assets/a723a74b-79b7-44a6-9d86-57d4b62400bf"
/>
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]