amoghrajesh commented on code in PR #58992:
URL: https://github.com/apache/airflow/pull/58992#discussion_r2597432869
##########
airflow-core/src/airflow/api_fastapi/core_api/routes/public/xcom.py:
##########
@@ -101,16 +101,25 @@ def get_xcom_entry(
     item = copy.copy(result)
     if deserialize:
-        # We use `airflow.serialization.serde` for deserialization here because custom XCom backends (with their own
-        # serializers/deserializers) are only used on the worker side during task execution.
-
-        # However, the XCom value is *always* stored in the metadata database as a valid JSON object.
-        # Therefore, for purposes such as UI display or returning API responses, deserializing with
-        # `airflow.serialization.serde` is safe and recommended.
-        from airflow.serialization.serde import deserialize as serde_deserialize
-
-        # full=False ensures that the `item` is deserialized without loading the classes, and it returns a stringified version
-        item.value = serde_deserialize(XComModel.deserialize_value(item), full=False)
+        # Custom XCom backends may store references (eg: object storage paths) in the database.
+        # The custom XCom backend's deserialize_value() resolves these to actual values, but that is only
+        # used on workers during task execution. The API reads directly from the database and uses
+        # stringify() to convert DB values (references or serialized data) to human readable
+        # format for UI display or for API users.
+        import json
+
+        from airflow.serialization.stringify import stringify as stringify_xcom
+
+        try:
+            parsed_value = json.loads(result.value)
+        except (ValueError, TypeError):
+            # Already deserialized (e.g., set via Task Execution API)
+            parsed_value = result.value
Review Comment:
This is the same approach as:
https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/models/xcom.py#L354-L392

Basically, it accounts for two cases:

1. When XComs are pushed via the API, they go through `serialize_value` before being stored in the DB.
2. When XComs are pushed during task execution, they are already serialized by the SDK before being sent, so they do not go through `serialize_value` again. This avoids having to serialize SDK objects in the API server.

So `json.loads` is needed for values pushed via the API, but not for values pushed during task execution.
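
A minimal sketch of that fallback, for illustration only. `parse_stored_value` is a hypothetical helper name and is not part of the PR or of Airflow; the sample values are made up.

```python
import json
from typing import Any


def parse_stored_value(raw: Any) -> Any:
    """Decode `raw` if it is a JSON string; otherwise return it unchanged."""
    try:
        return json.loads(raw)
    except (ValueError, TypeError):
        # ValueError: a string that is not valid JSON.
        # TypeError: not a str/bytes at all (for example, already a dict).
        return raw


# Value stored as a JSON string (the `serialize_value` case).
from_api = parse_stored_value('{"path": "s3://bucket/key"}')

# Value stored as an already-decoded object (the task execution case).
from_task = parse_stored_value({"path": "s3://bucket/key"})
```

Both calls end up with the same dict, which is why the route can hand either result to the next step. One caveat of this pattern: a plain string that happens to be valid JSON, such as `"123"`, is decoded as well.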
##########
airflow-core/src/airflow/api_fastapi/core_api/routes/public/xcom.py:
##########
@@ -100,16 +100,25 @@ def get_xcom_entry(
     item = copy.copy(result)
     if deserialize:
-        # We use `airflow.serialization.serde` for deserialization here because custom XCom backends (with their own
-        # serializers/deserializers) are only used on the worker side during task execution.
-
-        # However, the XCom value is *always* stored in the metadata database as a valid JSON object.
-        # Therefore, for purposes such as UI display or returning API responses, deserializing with
-        # `airflow.serialization.serde` is safe and recommended.
-        from airflow.serialization.serde import deserialize as serde_deserialize
-
-        # full=False ensures that the `item` is deserialized without loading the classes, and it returns a stringified version
-        item.value = serde_deserialize(XComModel.deserialize_value(item), full=False)
+        # Custom XCom backends may store references (eg: object storage paths) in the database.
+        # The custom XCom backend's deserialize_value() resolves these to actual values, but that is only
+        # used on workers during task execution. The API reads directly from the database and uses
+        # stringify() to convert DB values (references or serialized data) to human readable
+        # format for UI display or for API users.
+        import json
+
+        from airflow.serialization.stringify import stringify as stringify_xcom
+
+        try:
+            parsed_value = json.loads(result.value)
+        except (ValueError, TypeError):
+            # Already deserialized (e.g., set via Task Execution API)
+            parsed_value = result.value
+
+        try:
+            item.value = stringify_xcom(parsed_value)
+        except ValueError:
Review Comment:
Yeah sure.