amoghrajesh commented on code in PR #61021:
URL: https://github.com/apache/airflow/pull/61021#discussion_r2730378812


##########
airflow-ctl/src/airflowctl/api/operations.py:
##########
@@ -697,3 +702,125 @@ def get(self) -> VersionInfo | ServerResponseError:
             return VersionInfo.model_validate_json(self.response.content)
         except ServerResponseError as e:
             raise e
+
+
+class XComOperations(BaseOperations):
+    """XCom operations."""
+
+    def get(
+        self,
+        dag_id: str,
+        dag_run_id: str,
+        task_id: str,
+        key: str,
+        map_index: int = None,  # type: ignore
+    ) -> XComResponseNative | ServerResponseError:
+        """Get an XCom entry."""
+        try:
+            params: dict[str, Any] = {}
+            if map_index is not None:
+                params["map_index"] = map_index
+            self.response = self.client.get(
+                f"dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/xcomEntries/{key}",
+                params=params,
+            )
+            return XComResponseNative.model_validate_json(self.response.content)
+        except ServerResponseError as e:
+            raise e
+
+    def list(
+        self,
+        dag_id: str,
+        dag_run_id: str,
+        task_id: str,
+        map_index: int = None,  # type: ignore
+        key: str = None,  # type: ignore
+    ) -> XComCollectionResponse | ServerResponseError:
+        """List XCom entries."""
+        params: dict[str, Any] = {}
+        if map_index is not None:
+            params["map_index"] = map_index
+        if key is not None:
+            params["xcom_key"] = key
+        return super().execute_list(
+            path=f"dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/xcomEntries",
+            data_model=XComCollectionResponse,
+            params=params,
+        )
+
+    def add(
+        self,
+        dag_id: str,
+        dag_run_id: str,
+        task_id: str,
+        key: str,
+        value: str,
+        map_index: int = None,  # type: ignore
+    ) -> XComResponseNative | ServerResponseError:
+        """Add an XCom entry."""
+        try:
+            parsed_value = json.loads(value)
+        except (ValueError, TypeError):
+            parsed_value = value
+
+        body_dict: dict[str, Any] = {"key": key, "value": parsed_value}
+        if map_index is not None:
+            body_dict["map_index"] = map_index
+        body = XComCreateBody(**body_dict)
+        try:
+            self.response = self.client.post(
+                f"dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/xcomEntries",
+                json=body.model_dump(mode="json", exclude_unset=True),
+            )
+            return XComResponseNative.model_validate_json(self.response.content)
+        except ServerResponseError as e:
+            raise e
+
+    def edit(
+        self,
+        dag_id: str,
+        dag_run_id: str,
+        task_id: str,
+        key: str,
+        value: str,
+        map_index: int = None,  # type: ignore
+    ) -> XComResponseNative | ServerResponseError:
+        """Edit an XCom entry."""
+        try:
+            parsed_value = json.loads(value)
+        except (ValueError, TypeError):
+            parsed_value = value
+
+        body_dict: dict[str, Any] = {"value": parsed_value}
+        if map_index is not None:
+            body_dict["map_index"] = map_index
+        body = XComUpdateBody(**body_dict)
+        try:
+            self.response = self.client.patch(
+                f"dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/xcomEntries/{key}",
+                json=body.model_dump(mode="json", exclude_unset=True),
+            )
+            return XComResponseNative.model_validate_json(self.response.content)
+        except ServerResponseError as e:
+            raise e
+
+    def delete(
+        self,
+        dag_id: str,
+        dag_run_id: str,
+        task_id: str,
+        key: str,
+        map_index: int = None,  # type: ignore
+    ) -> str | ServerResponseError:
+        """Delete an XCom entry."""
+        try:
+            params: dict[str, Any] = {}
+            if map_index is not None:
+                params["map_index"] = map_index
+            self.client.delete(
+                f"dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/xcomEntries/{key}",
+                params=params,
+            )
+            return key
+        except ServerResponseError as e:
+            raise e

Review Comment:
   What's the use case for these? I can understand why we need read / list,
   but I'm trying to understand the use case for manipulating XComs outside
   the task execution context.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to