amoghrajesh commented on code in PR #47961:
URL: https://github.com/apache/airflow/pull/47961#discussion_r2004818254


##########
airflow/models/xcom.py:
##########
@@ -331,11 +331,44 @@ def serialize_value(
 
     @staticmethod
     def deserialize_value(result) -> Any:
-        """Deserialize XCom value from str objects."""
+        """
+        Deserialize XCom value from a database result.
+
+        If deserialization fails, the raw value is returned, which must still 
be a valid Python JSON-compatible
+        type (e.g., ``dict``, ``list``, ``str``, ``int``, ``float``, or 
``bool``).
+
+        XCom values are stored as JSON in the database, and SQLAlchemy 
automatically handles
+        serialization (``json.dumps``) and deserialization (``json.loads``). 
However, we
+        use a custom encoder for serialization (``serialize_value``) and 
deserialization to handle special
+        cases, such as encoding tuples via the Airflow Serialization module. 
These must be decoded
+        using ``XComDecoder`` to restore original types.
+
+        Some XCom values, such as those set via the Task Execution API, bypass 
``serialize_value``
+        and are stored directly in JSON format. Since these values are already 
deserialized
+        by SQLAlchemy, they are returned as-is.
+
+        **Example: Handling a tuple**:
+
+        .. code-block:: python
+
+            original_value = (1, 2, 3)
+            serialized_value = XComModel.serialize_value(original_value)
+            print(serialized_value)
+            # '{"__classname__": "builtins.tuple", "__version__": 1, 
"__data__": [1, 2, 3]}'
+
+        This serialized value is stored in the database. When deserialized, 
the value is restored to the original tuple.
+
+        :param result: The XCom database row or object containing a ``value`` 
attribute.
+        :return: The deserialized Python object.
+        """
         if result.value is None:
             return None
 
-        return json.loads(result.value, cls=XComDecoder)
+        try:
+            return json.loads(result.value, cls=XComDecoder)
+        except (ValueError, TypeError):
+            # Already deserialized (e.g., set via Task Execution API)
+            return result.value

Review Comment:
   I think the UI ends up calling the public APIs right?



##########
tests/api_fastapi/execution_api/routes/test_xcoms.py:
##########
@@ -163,6 +163,69 @@ def test_xcom_set(self, client, create_task_instance, 
session, value, expected_v
         task_map = session.query(TaskMap).filter_by(task_id=ti.task_id, 
dag_id=ti.dag_id).one_or_none()
         assert task_map is None, "Should not be mapped"
 
+    @pytest.mark.parametrize(
+        "orig_value, ser_value, deser_value",
+        [
+            pytest.param(1, 1, 1, id="int"),
+            pytest.param(1.0, 1.0, 1.0, id="float"),
+            pytest.param("string", "string", "string", id="str"),
+            pytest.param(True, True, True, id="bool"),
+            pytest.param({"key": "value"}, {"key": "value"}, {"key": "value"}, 
id="dict"),
+            pytest.param([1, 2], [1, 2], [1, 2], id="list"),
+            pytest.param(
+                (1, 2),
+                # Client serializes tuple as encoded list, send the encoded 
list to the API
+                {"__classname__": "builtins.tuple", "__data__": [1, 2], 
"__version__": 1},
+                # The API will send the encoded list to the DB and sends the 
same encoded list back
+                # during the response to the client as it is the clients 
responsibility to
+                # serialize it into a JSON object & deserialize value into a 
native object.
+                {"__classname__": "builtins.tuple", "__data__": [1, 2], 
"__version__": 1},
+                id="tuple",
+            ),

Review Comment:
   Good! I like how you have handled all cases!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to