kaxil commented on issue #49456:
URL: https://github.com/apache/airflow/issues/49456#issuecomment-2912523908

   Coming in late. But I disagree in parts with the assessment. 
   
   We can solve this issue without needing to deserialize using the Custom XCom 
backend.
   
   Firstly, why the discrepancy / bug? It is because we triple serialize XCom 
values when executing tasks as explained in 
https://github.com/apache/airflow/issues/45231#issuecomment-2659973296.
   
   
   1. XCom is pushed via task return value or `ti.xcom_push` and then uses the 
object that is serialised using `CustomXComBackend` or the XCom class via the 
`XComEncoder`  (e.g set, Asset, datetime) into Python JSON-serializable object.
   
       
https://github.com/apache/airflow/blob/3208bbb9e08d19d02db6928fe01f99b538781641/task-sdk/src/airflow/sdk/execution_time/task_runner.py#L526
   
       
https://github.com/apache/airflow/blob/3208bbb9e08d19d02db6928fe01f99b538781641/task-sdk/src/airflow/sdk/bases/xcom.py#L64-L71
   
       
https://github.com/apache/airflow/blob/3208bbb9e08d19d02db6928fe01f99b538781641/task-sdk/src/airflow/sdk/bases/xcom.py#L280-L283
   
   2. FastAPI deserializes it (JsonValue) when API call is sent to API server → 
Converts it into a Python dict.
   
       
https://github.com/apache/airflow/blob/3208bbb9e08d19d02db6928fe01f99b538781641/task-sdk/src/airflow/sdk/api/client.py#L414
   
       
https://github.com/apache/airflow/blob/3208bbb9e08d19d02db6928fe01f99b538781641/airflow-core/src/airflow/api_fastapi/execution_api/routes/xcoms.py#L204-L223
   
   
   3. SQLAlchemy stores it (Column(JSON)) → Serializes it back into JSON in the 
database.
   
       
https://github.com/apache/airflow/blob/3208bbb9e08d19d02db6928fe01f99b538781641/airflow-core/src/airflow/models/xcom.py#L79
   
   
   But when de-serializing to show in the UI or API, we don't de-serialize 
using our Serialization module which happened in (1) above.
   
   
   ---
   
   Now, because we are sure that the value stored in DB will always be a valid 
JSON object, it is safe to de-serialize it using our Serializer. We don't need 
to use Custom XCom backend to display in the UI at least.
   
   
   There is also `full=False` param that can be used in our custom deserializer 
to just return string to display it in the UI. So UI can pass 
`deserialize=True`.
   
   It should show something like:
   
   <img width="1719" alt="Image" 
src="https://github.com/user-attachments/assets/b0288ccc-f62d-441d-b315-f0c0087daad8";
 />
   
   ```diff
   diff --git 
a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/xcom.py 
b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/xcom.py
   index 2da8b620d7..8c77a5140b 100644
   --- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/xcom.py
   +++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/xcom.py
   @@ -69,41 +69,46 @@ def get_xcom_entry(
        stringify: Annotated[bool, Query()] = False,
    ) -> XComResponseNative | XComResponseString:
        """Get an XCom entry."""
   +
        if deserialize:
            if not conf.getboolean("api", "enable_xcom_deserialize_support", 
fallback=False):
                raise HTTPException(
                    status.HTTP_400_BAD_REQUEST, "XCom deserialization is 
disabled in configuration."
                )
   -        query = select(XComModel, XComModel.value)
   -    else:
   -        query = select(XComModel)
    
   -    query = query.where(
   -        XComModel.dag_id == dag_id,
   -        XComModel.task_id == task_id,
   -        XComModel.key == xcom_key,
   -        XComModel.map_index == map_index,
   +    xcom_query = XComModel.get_many(
   +        run_id=dag_run_id,
   +        key=xcom_key,
   +        task_ids=task_id,
   +        dag_ids=dag_id,
   +        map_indexes=map_index,
   +        session=session,
   +        limit=1,
        )
   -    query = query.join(DR, and_(XComModel.dag_id == DR.dag_id, 
XComModel.run_id == DR.run_id))
   -    query = query.where(DR.run_id == dag_run_id)
   -    query = 
query.options(joinedload(XComModel.dag_run).joinedload(DR.dag_model))
    
   -    if deserialize:
   -        item = session.execute(query).one_or_none()
   -    else:
   -        item = session.scalars(query).one_or_none()
   +    # We use `BaseXCom.get_many` to fetch XComs directly from the database, 
bypassing the XCom Backend.
   +    # This avoids deserialization via the backend (e.g., from a remote 
storage like S3) and instead
   +    # retrieves the raw serialized value from the database.
   +    result = xcom_query.limit(1).first()
    
   -    if item is None:
   +    if result is None:
            raise HTTPException(status.HTTP_404_NOT_FOUND, f"XCom entry with 
key: `{xcom_key}` not found")
    
   +    item = copy.copy(result)
   +
   +    # We deserialize the value here to ensure that the response is in the 
expected format.
   +    item.value = XComModel.deserialize_value(item)
   +
        if deserialize:
   -        from airflow.sdk.execution_time.xcom import XCom
   +        from airflow.serialization.serde import deserialize
   +
   +        item.value = deserialize(XComModel.deserialize_value(item), 
full=False)
   +
   +    else:
   +        from airflow.serialization.serde import deserialize
    
   -        xcom, value = item
   -        xcom_stub = copy.copy(xcom)
   -        xcom_stub.value = value
   -        xcom_stub.value = XCom.deserialize_value(xcom_stub)
   -        item = xcom_stub
   +        item = copy.copy(result)
   +        item.value = XComModel.deserialize_value(item)
    
        if stringify:
            return XComResponseString.model_validate(item)
   ```
   
   This way we can avoid deserializing a Custom object. It can be further 
polished (and we should really consolidate amount of deserialization we do) but 
at least this would work right now. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to