amoghrajesh commented on code in PR #44605:
URL: https://github.com/apache/airflow/pull/44605#discussion_r1873396736


##########
task_sdk/tests/api/test_client.py:
##########
@@ -148,3 +148,110 @@ def handle_request(request: httpx.Request) -> 
httpx.Response:
 
         result = client.variables.set(key="test_key", value="test_value", 
description="test_description")
         assert result == {"ok": True}
+
+
+class TestXCOMOperations:
+    """
+    Test that the XComOperations class works as expected. While the operations 
are simple, it
+    still catches the basic functionality of the client for xcoms including 
endpoint and
+    response parsing.
+    """
+
+    def test_xcom_get_success(self):
+        # Simulate a successful response from the server when getting an xcom
+        def handle_request(request: httpx.Request) -> httpx.Response:
+            if request.url.path == "/xcoms/dag_id/run_id/task_id/key":
+                return httpx.Response(
+                    status_code=201,
+                    json={"key": "test_key", "value": "test_value"},
+                )
+            return httpx.Response(status_code=400, json={"detail": "Bad 
Request"})
+
+        client = make_client(transport=httpx.MockTransport(handle_request))
+        result = client.xcoms.get(
+            dag_id="dag_id",
+            run_id="run_id",
+            task_id="task_id",
+            key="key",
+        )
+        assert isinstance(result, XComResponse)
+        assert result.key == "test_key"
+        assert result.value == "test_value"
+
+    def test_xcom_get_success_with_map_index(self):
+        # Simulate a successful response from the server when getting an xcom 
with map_index passed
+        def handle_request(request: httpx.Request) -> httpx.Response:
+            if (
+                request.url.path == "/xcoms/dag_id/run_id/task_id/key"
+                and request.url.params.get("map_index") == "2"
+            ):
+                return httpx.Response(
+                    status_code=201,
+                    json={"key": "test_key", "value": "test_value"},
+                )
+            return httpx.Response(status_code=400, json={"detail": "Bad 
Request"})
+
+        client = make_client(transport=httpx.MockTransport(handle_request))
+        result = client.xcoms.get(
+            dag_id="dag_id",
+            run_id="run_id",
+            task_id="task_id",
+            key="key",
+            map_index=2,
+        )
+        assert isinstance(result, XComResponse)
+        assert result.key == "test_key"
+        assert result.value == "test_value"
+
+    @pytest.mark.parametrize(
+        "values",
+        [
+            pytest.param("value1", id="string-value"),
+            pytest.param({"key1": "value1"}, id="dict-value"),
+            pytest.param(["value1", "value2"], id="list-value"),
+            pytest.param({"key": "test_key", "value": {"key2": "value2"}}, 
id="nested-dict-value"),
+        ],
+    )
+    def test_xcom_set_success(self, values):
+        # Simulate a successful response from the server when setting an xcom
+        def handle_request(request: httpx.Request) -> httpx.Response:
+            if request.url.path == "/xcoms/dag_id/run_id/task_id/key":
+                return httpx.Response(

Review Comment:
   Yeah makes sense. Added it



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to