dheerajturaga commented on code in PR #61021:
URL: https://github.com/apache/airflow/pull/61021#discussion_r2733200722


##########
airflow-ctl/src/airflowctl/api/operations.py:
##########
@@ -697,3 +702,125 @@ def get(self) -> VersionInfo | ServerResponseError:
             return VersionInfo.model_validate_json(self.response.content)
         except ServerResponseError as e:
             raise e
+
+
+class XComOperations(BaseOperations):
+    """XCom operations."""
+
+    def get(
+        self,
+        dag_id: str,
+        dag_run_id: str,
+        task_id: str,
+        key: str,
+        map_index: int = None,  # type: ignore
+    ) -> XComResponseNative | ServerResponseError:
+        """Get an XCom entry."""
+        try:
+            params: dict[str, Any] = {}
+            if map_index is not None:
+                params["map_index"] = map_index
+            self.response = self.client.get(
+                
f"dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/xcomEntries/{key}",
+                params=params,
+            )
+            return 
XComResponseNative.model_validate_json(self.response.content)
+        except ServerResponseError as e:
+            raise e
+
+    def list(
+        self,
+        dag_id: str,
+        dag_run_id: str,
+        task_id: str,
+        map_index: int = None,  # type: ignore
+        key: str = None,  # type: ignore
+    ) -> XComCollectionResponse | ServerResponseError:
+        """List XCom entries."""
+        params: dict[str, Any] = {}
+        if map_index is not None:
+            params["map_index"] = map_index
+        if key is not None:
+            params["xcom_key"] = key
+        return super().execute_list(
+            
path=f"dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/xcomEntries",
+            data_model=XComCollectionResponse,
+            params=params,
+        )
+
+    def add(
+        self,
+        dag_id: str,
+        dag_run_id: str,
+        task_id: str,
+        key: str,
+        value: str,
+        map_index: int = None,  # type: ignore
+    ) -> XComResponseNative | ServerResponseError:
+        """Add an XCom entry."""
+        try:
+            parsed_value = json.loads(value)
+        except (ValueError, TypeError):
+            parsed_value = value
+
+        body_dict: dict[str, Any] = {"key": key, "value": parsed_value}
+        if map_index is not None:
+            body_dict["map_index"] = map_index
+        body = XComCreateBody(**body_dict)
+        try:
+            self.response = self.client.post(
+                
f"dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/xcomEntries",
+                json=body.model_dump(mode="json", exclude_unset=True),
+            )
+            return 
XComResponseNative.model_validate_json(self.response.content)
+        except ServerResponseError as e:
+            raise e
+
+    def edit(
+        self,
+        dag_id: str,
+        dag_run_id: str,
+        task_id: str,
+        key: str,
+        value: str,
+        map_index: int = None,  # type: ignore
+    ) -> XComResponseNative | ServerResponseError:
+        """Edit an XCom entry."""
+        try:
+            parsed_value = json.loads(value)
+        except (ValueError, TypeError):
+            parsed_value = value
+
+        body_dict: dict[str, Any] = {"value": parsed_value}
+        if map_index is not None:
+            body_dict["map_index"] = map_index
+        body = XComUpdateBody(**body_dict)
+        try:
+            self.response = self.client.patch(
+                
f"dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/xcomEntries/{key}",
+                json=body.model_dump(mode="json", exclude_unset=True),
+            )
+            return 
XComResponseNative.model_validate_json(self.response.content)
+        except ServerResponseError as e:
+            raise e
+
+    def delete(
+        self,
+        dag_id: str,
+        dag_run_id: str,
+        task_id: str,
+        key: str,
+        map_index: int = None,  # type: ignore
+    ) -> str | ServerResponseError:
+        """Delete an XCom entry."""
+        try:
+            params: dict[str, Any] = {}
+            if map_index is not None:
+                params["map_index"] = map_index
+            self.client.delete(
+                
f"dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/xcomEntries/{key}",
+                params=params,
+            )
+            return key
+        except ServerResponseError as e:
+            raise e

Review Comment:
   There are many scenarios. For example, you can mark a task as "Success" and 
want to run the downstream pipeline; however, downstream tasks may depend on an 
XCom that the parent task would have provided if it had passed. Being able to 
modify these is critical in many scenarios.



##########
airflow-ctl/src/airflowctl/api/operations.py:
##########
@@ -697,3 +702,125 @@ def get(self) -> VersionInfo | ServerResponseError:
             return VersionInfo.model_validate_json(self.response.content)
         except ServerResponseError as e:
             raise e
+
+
+class XComOperations(BaseOperations):
+    """XCom operations."""
+
+    def get(
+        self,
+        dag_id: str,
+        dag_run_id: str,
+        task_id: str,
+        key: str,
+        map_index: int = None,  # type: ignore
+    ) -> XComResponseNative | ServerResponseError:
+        """Get an XCom entry."""
+        try:
+            params: dict[str, Any] = {}
+            if map_index is not None:
+                params["map_index"] = map_index
+            self.response = self.client.get(
+                
f"dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/xcomEntries/{key}",
+                params=params,
+            )
+            return 
XComResponseNative.model_validate_json(self.response.content)
+        except ServerResponseError as e:
+            raise e
+
+    def list(
+        self,
+        dag_id: str,
+        dag_run_id: str,
+        task_id: str,
+        map_index: int = None,  # type: ignore
+        key: str = None,  # type: ignore
+    ) -> XComCollectionResponse | ServerResponseError:
+        """List XCom entries."""
+        params: dict[str, Any] = {}
+        if map_index is not None:
+            params["map_index"] = map_index
+        if key is not None:
+            params["xcom_key"] = key
+        return super().execute_list(
+            
path=f"dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/xcomEntries",
+            data_model=XComCollectionResponse,
+            params=params,
+        )
+
+    def add(
+        self,
+        dag_id: str,
+        dag_run_id: str,
+        task_id: str,
+        key: str,
+        value: str,
+        map_index: int = None,  # type: ignore
+    ) -> XComResponseNative | ServerResponseError:
+        """Add an XCom entry."""
+        try:
+            parsed_value = json.loads(value)
+        except (ValueError, TypeError):
+            parsed_value = value
+
+        body_dict: dict[str, Any] = {"key": key, "value": parsed_value}
+        if map_index is not None:
+            body_dict["map_index"] = map_index
+        body = XComCreateBody(**body_dict)
+        try:
+            self.response = self.client.post(
+                
f"dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/xcomEntries",
+                json=body.model_dump(mode="json", exclude_unset=True),
+            )
+            return 
XComResponseNative.model_validate_json(self.response.content)
+        except ServerResponseError as e:
+            raise e
+
+    def edit(
+        self,
+        dag_id: str,
+        dag_run_id: str,
+        task_id: str,
+        key: str,
+        value: str,
+        map_index: int = None,  # type: ignore
+    ) -> XComResponseNative | ServerResponseError:
+        """Edit an XCom entry."""
+        try:
+            parsed_value = json.loads(value)
+        except (ValueError, TypeError):
+            parsed_value = value
+
+        body_dict: dict[str, Any] = {"value": parsed_value}
+        if map_index is not None:
+            body_dict["map_index"] = map_index
+        body = XComUpdateBody(**body_dict)
+        try:
+            self.response = self.client.patch(
+                
f"dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/xcomEntries/{key}",
+                json=body.model_dump(mode="json", exclude_unset=True),
+            )
+            return 
XComResponseNative.model_validate_json(self.response.content)
+        except ServerResponseError as e:
+            raise e
+
+    def delete(
+        self,
+        dag_id: str,
+        dag_run_id: str,
+        task_id: str,
+        key: str,
+        map_index: int = None,  # type: ignore
+    ) -> str | ServerResponseError:
+        """Delete an XCom entry."""
+        try:
+            params: dict[str, Any] = {}
+            if map_index is not None:
+                params["map_index"] = map_index
+            self.client.delete(
+                
f"dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/xcomEntries/{key}",
+                params=params,
+            )
+            return key
+        except ServerResponseError as e:
+            raise e

Review Comment:
   There are many scenarios. For example, you can mark a task as "Success" and 
want to run the downstream pipeline; however, downstream tasks may depend on an 
XCom that the parent task would have provided if it had passed. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to