kaxil commented on code in PR #46719:
URL: https://github.com/apache/airflow/pull/46719#discussion_r1956479951


##########
airflow/api_fastapi/execution_api/routes/xcoms.py:
##########
@@ -70,7 +70,7 @@ async def xcom_query(
         map_indexes=map_index,
         session=session,
     )
-    return query.with_entities(BaseXCom.value)
+    return query

Review Comment:
   Curious, why this change?



##########
airflow/api_fastapi/execution_api/routes/xcoms.py:
##########
@@ -175,7 +160,7 @@ def set_xcom(
     task_id: str,
     key: str,
     value: Annotated[
-        str,
+        JsonValue,

Review Comment:
   fyi: Even after this PR, we would still do multiple 
serialization/deserialization steps that we should consolidate.
   
   **Storing Data** (using a Go client as an example):
   
   1. Go serializes the object (`json.Marshal`) → Sends it as an HTTP request.
   2. FastAPI deserializes it (`JsonValue`) → Converts it into a Python dict.
   3. SQLAlchemy stores it (`Column(JSON)`) → Serializes it back into JSON in the database.
   
   
https://github.com/apache/airflow/blob/8f63b828ace09b1095229f22b0c6d1f0f85ea81a/airflow/models/xcom.py#L83
   
   **Retrieving Data**:
   
   1. SQLAlchemy loads it as a Python dict from the DB.
   2. FastAPI serializes it back into JSON.
   3. Go receives it and deserializes it (`json.Unmarshal` or a different 
library).
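   
   A rough sketch of the six hops above, using only the standard-library 
`json` module as a stand-in for each layer (Go's `json.Marshal`, FastAPI's 
body parsing, and the SQLAlchemy JSON column are all simulated here; the 
function name `round_trip` is hypothetical, not Airflow code):

```python
import json


def round_trip(payload):
    """Simulate the store and retrieve paths; return (result, number_of_hops)."""
    hops = 0

    # Storing
    wire_in = json.dumps(payload)   # 1. client serializes (stand-in for json.Marshal)
    hops += 1
    parsed = json.loads(wire_in)    # 2. API server parses the request body
    hops += 1
    db_text = json.dumps(parsed)    # 3. JSON column serializes again for the database
    hops += 1

    # Retrieving
    loaded = json.loads(db_text)    # 1. JSON column deserializes on load
    hops += 1
    wire_out = json.dumps(loaded)   # 2. API server serializes the response
    hops += 1
    received = json.loads(wire_out) # 3. client deserializes (stand-in for json.Unmarshal)
    hops += 1

    return received, hops


result, hops = round_trip({"rows": [1, 2, 3], "ok": True})
print(result, hops)
```

   The value comes back unchanged, but only after being encoded three times 
and decoded three times, which is the overhead worth consolidating.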



##########
task_sdk/src/airflow/sdk/execution_time/task_runner.py:
##########
@@ -346,11 +348,15 @@ def get_relevant_upstream_map_indexes(
 def _xcom_push(ti: RuntimeTaskInstance, key: str, value: Any, mapped_length: int | None = None) -> None:
     # Private function, as we don't want to expose the ability to manually set `mapped_length` to SDK
     # consumers
-    from airflow.models.xcom import XCom
+    from airflow.serialization.serde import serialize
 
     # TODO: Move XCom serialization & deserialization to Task SDK
     #   https://github.com/apache/airflow/issues/45231
-    value = XCom.serialize_value(value)
+
+    # The execution API server now deals in json compliant objects.
+    # It is responsibility of the client to handle any non native object serialization.
+    # serialize does just that.
+    value = serialize(value)

Review Comment:
   fyi: `XCom.serialize_value` uses `airflow.serialization.serde.serialize` and 
does a little more on top of it; that extra behavior is also used in 
`XComObjectStorageBackend`:
   
   
https://github.com/apache/airflow/blob/8f63b828ace09b1095229f22b0c6d1f0f85ea81a/airflow/utils/json.py#L87-L105
   
   
https://github.com/apache/airflow/blob/main/providers/common/io/src/airflow/providers/common/io/xcom/backend.py#L73
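   
   To illustrate the kind of difference I mean, here is a toy sketch, not the 
Airflow implementation. It assumes the extra step is a JSON encoder that 
falls back to a serializer for non-native objects and then produces encoded 
bytes; `toy_serialize`, `ToyEncoder`, and `toy_serialize_value` are 
hypothetical names standing in for the real ones:

```python
import json
from dataclasses import asdict, dataclass, is_dataclass


def toy_serialize(obj):
    """Hypothetical stand-in for a serde-style serializer: object -> JSON-compatible dict."""
    if is_dataclass(obj) and not isinstance(obj, type):
        return {"__classname__": type(obj).__qualname__, "__data__": asdict(obj)}
    raise TypeError(f"cannot serialize {type(obj).__name__}")


class ToyEncoder(json.JSONEncoder):
    """Encoder that delegates anything json cannot handle natively to toy_serialize."""

    def default(self, o):
        return toy_serialize(o)


def toy_serialize_value(value) -> bytes:
    """The 'little more': wrap the serializer in an encoder and emit UTF-8 bytes."""
    return json.dumps(value, cls=ToyEncoder).encode("utf-8")


@dataclass
class Point:
    x: int
    y: int


print(toy_serialize(Point(1, 2)))                 # a Python dict
print(toy_serialize_value({"p": Point(1, 2)}))    # encoded JSON bytes
```

   Calling the serializer directly yields Python objects, while the wrapper 
yields an encoded string, so anything that expects the wrapper's output would 
need to be checked after this change.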



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
