bugraoz93 commented on code in PR #61021:
URL: https://github.com/apache/airflow/pull/61021#discussion_r2743206057


##########
airflow-ctl/src/airflowctl/api/operations.py:
##########
@@ -697,3 +702,125 @@ def get(self) -> VersionInfo | ServerResponseError:
             return VersionInfo.model_validate_json(self.response.content)
         except ServerResponseError as e:
             raise e
+
+
+class XComOperations(BaseOperations):
+    """XCom operations."""
+
+    def get(
+        self,
+        dag_id: str,
+        dag_run_id: str,
+        task_id: str,
+        key: str,
+        map_index: int = None,  # type: ignore
+    ) -> XComResponseNative | ServerResponseError:
+        """Get an XCom entry."""
+        try:
+            params: dict[str, Any] = {}
+            if map_index is not None:
+                params["map_index"] = map_index
+            self.response = self.client.get(
+                
f"dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/xcomEntries/{key}",
+                params=params,
+            )
+            return 
XComResponseNative.model_validate_json(self.response.content)
+        except ServerResponseError as e:
+            raise e
+
+    def list(
+        self,
+        dag_id: str,
+        dag_run_id: str,
+        task_id: str,
+        map_index: int = None,  # type: ignore
+        key: str = None,  # type: ignore
+    ) -> XComCollectionResponse | ServerResponseError:
+        """List XCom entries."""
+        params: dict[str, Any] = {}
+        if map_index is not None:
+            params["map_index"] = map_index
+        if key is not None:
+            params["xcom_key"] = key
+        return super().execute_list(
+            
path=f"dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/xcomEntries",
+            data_model=XComCollectionResponse,
+            params=params,
+        )
+
+    def add(
+        self,
+        dag_id: str,
+        dag_run_id: str,
+        task_id: str,
+        key: str,
+        value: str,
+        map_index: int = None,  # type: ignore
+    ) -> XComResponseNative | ServerResponseError:
+        """Add an XCom entry."""
+        try:
+            parsed_value = json.loads(value)
+        except (ValueError, TypeError):
+            parsed_value = value
+
+        body_dict: dict[str, Any] = {"key": key, "value": parsed_value}
+        if map_index is not None:
+            body_dict["map_index"] = map_index
+        body = XComCreateBody(**body_dict)
+        try:
+            self.response = self.client.post(
+                
f"dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/xcomEntries",
+                json=body.model_dump(mode="json", exclude_unset=True),
+            )
+            return 
XComResponseNative.model_validate_json(self.response.content)
+        except ServerResponseError as e:
+            raise e
+
+    def edit(
+        self,
+        dag_id: str,
+        dag_run_id: str,
+        task_id: str,
+        key: str,
+        value: str,
+        map_index: int = None,  # type: ignore
+    ) -> XComResponseNative | ServerResponseError:
+        """Edit an XCom entry."""
+        try:
+            parsed_value = json.loads(value)
+        except (ValueError, TypeError):
+            parsed_value = value
+
+        body_dict: dict[str, Any] = {"value": parsed_value}
+        if map_index is not None:
+            body_dict["map_index"] = map_index
+        body = XComUpdateBody(**body_dict)
+        try:
+            self.response = self.client.patch(
+                
f"dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/xcomEntries/{key}",
+                json=body.model_dump(mode="json", exclude_unset=True),
+            )
+            return 
XComResponseNative.model_validate_json(self.response.content)
+        except ServerResponseError as e:
+            raise e
+
+    def delete(
+        self,
+        dag_id: str,
+        dag_run_id: str,
+        task_id: str,
+        key: str,
+        map_index: int = None,  # type: ignore
+    ) -> str | ServerResponseError:
+        """Delete an XCom entry."""
+        try:
+            params: dict[str, Any] = {}
+            if map_index is not None:
+                params["map_index"] = map_index
+            self.client.delete(
+                
f"dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/xcomEntries/{key}",
+                params=params,
+            )
+            return key
+        except ServerResponseError as e:
+            raise e

Review Comment:
   Anything we can do from the Public API context should be doable on ctl too. 
One day ctl could replace the Airflow Python Client package, too, which would 
be my main reason to have all possible API endpoints we can provide :pray: :) 
   I understand your concern Amogh. Thanks for raising it! Do you think we 
shouldn't have it on the Public API and make it special for the Execution 
API/TaskSDK side only :thinking: 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to