amoghrajesh commented on code in PR #58992:
URL: https://github.com/apache/airflow/pull/58992#discussion_r2597432869


##########
airflow-core/src/airflow/api_fastapi/core_api/routes/public/xcom.py:
##########
@@ -101,16 +101,25 @@ def get_xcom_entry(
     item = copy.copy(result)
 
     if deserialize:
-        # We use `airflow.serialization.serde` for deserialization here 
because custom XCom backends (with their own
-        # serializers/deserializers) are only used on the worker side during 
task execution.
-
-        # However, the XCom value is *always* stored in the metadata database 
as a valid JSON object.
-        # Therefore, for purposes such as UI display or returning API 
responses, deserializing with
-        # `airflow.serialization.serde` is safe and recommended.
-        from airflow.serialization.serde import deserialize as 
serde_deserialize
-
-        # full=False ensures that the `item` is deserialized without loading 
the classes, and it returns a stringified version
-        item.value = serde_deserialize(XComModel.deserialize_value(item), 
full=False)
+        # Custom XCom backends may store references (eg: object storage paths) 
in the database.
+        # The custom XCom backend's deserialize_value() resolves these to 
actual values, but that is only
+        # used on workers during task execution. The API reads directly from 
the database and uses
+        # stringify() to convert DB values (references or serialized data) to 
human readable
+        # format for UI display or for API users.
+        import json
+
+        from airflow.serialization.stringify import stringify as stringify_xcom
+
+        try:
+            parsed_value = json.loads(result.value)
+        except (ValueError, TypeError):
+            # Already deserialized (e.g., set via Task Execution API)
+            parsed_value = result.value

Review Comment:
   This is same as: 
https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/models/xcom.py#L354-L392
   
   Basically its to account for this:
   1. When xcoms are pushed via API, they undergo: `serialize_value` before 
storing in DB
   2. When xcoms are pushed in task execution, they are serialized and sent 
from sdk and hence they do not undergo `serialize_value` again to not having to 
serialize sdk objects in API server
   
   So for API values, `json.loads` is needed but not for the task execution 
ones.



##########
airflow-core/src/airflow/api_fastapi/core_api/routes/public/xcom.py:
##########
@@ -100,16 +100,25 @@ def get_xcom_entry(
     item = copy.copy(result)
 
     if deserialize:
-        # We use `airflow.serialization.serde` for deserialization here 
because custom XCom backends (with their own
-        # serializers/deserializers) are only used on the worker side during 
task execution.
-
-        # However, the XCom value is *always* stored in the metadata database 
as a valid JSON object.
-        # Therefore, for purposes such as UI display or returning API 
responses, deserializing with
-        # `airflow.serialization.serde` is safe and recommended.
-        from airflow.serialization.serde import deserialize as 
serde_deserialize
-
-        # full=False ensures that the `item` is deserialized without loading 
the classes, and it returns a stringified version
-        item.value = serde_deserialize(XComModel.deserialize_value(item), 
full=False)
+        # Custom XCom backends may store references (eg: object storage paths) 
in the database.
+        # The custom XCom backend's deserialize_value() resolves these to 
actual values, but that is only
+        # used on workers during task execution. The API reads directly from 
the database and uses
+        # stringify() to convert DB values (references or serialized data) to 
human readable
+        # format for UI display or for API users.
+        import json
+
+        from airflow.serialization.stringify import stringify as stringify_xcom
+
+        try:
+            parsed_value = json.loads(result.value)
+        except (ValueError, TypeError):
+            # Already deserialized (e.g., set via Task Execution API)
+            parsed_value = result.value
+
+        try:
+            item.value = stringify_xcom(parsed_value)
+        except ValueError:

Review Comment:
   Yeah sure.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to