kaxil commented on issue #49456: URL: https://github.com/apache/airflow/issues/49456#issuecomment-2912523908
Coming in late, but I disagree with parts of the assessment. We can solve this issue without deserializing through the custom XCom backend.

First, why the discrepancy / bug? Because we triple-serialize XCom values when executing tasks, as explained in https://github.com/apache/airflow/issues/45231#issuecomment-2659973296:

1. The XCom is pushed via the task's return value or `ti.xcom_push`. The object is then serialized into a JSON-serializable Python object, either by the `CustomXComBackend` or by the XCom class via the `XComEncoder` (e.g. for `set`, `Asset`, `datetime`).
   https://github.com/apache/airflow/blob/3208bbb9e08d19d02db6928fe01f99b538781641/task-sdk/src/airflow/sdk/execution_time/task_runner.py#L526
   https://github.com/apache/airflow/blob/3208bbb9e08d19d02db6928fe01f99b538781641/task-sdk/src/airflow/sdk/bases/xcom.py#L64-L71
   https://github.com/apache/airflow/blob/3208bbb9e08d19d02db6928fe01f99b538781641/task-sdk/src/airflow/sdk/bases/xcom.py#L280-L283
2. FastAPI deserializes it (`JsonValue`) when the API call reaches the API server, converting it into a Python dict.
   https://github.com/apache/airflow/blob/3208bbb9e08d19d02db6928fe01f99b538781641/task-sdk/src/airflow/sdk/api/client.py#L414
   https://github.com/apache/airflow/blob/3208bbb9e08d19d02db6928fe01f99b538781641/airflow-core/src/airflow/api_fastapi/execution_api/routes/xcoms.py#L204-L223
3. SQLAlchemy stores it (`Column(JSON)`), serializing it back into JSON in the database.
   https://github.com/apache/airflow/blob/3208bbb9e08d19d02db6928fe01f99b538781641/airflow-core/src/airflow/models/xcom.py#L79

But when deserializing the value to show it in the UI or API, we don't use our serialization module, i.e. the one that produced the value in step (1).

---

Now, because we are sure that the value stored in the DB will always be valid JSON, it is safe to deserialize it with our serializer. We don't need the custom XCom backend, at least not to display the value in the UI.
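To make the mismatch concrete, here is a minimal sketch of the three hops using plain `json`. The tagged-dict layout (`__classname__` / `__data__`) and the `encode` / `decode` / `round_trip` helpers are hypothetical stand-ins for the step-(1) serializer, not Airflow's exact wire format; `json.dumps` / `json.loads` merely model the client, FastAPI and the JSON column.

```python
import json
from datetime import datetime, timezone


# Toy encoder standing in for step (1). The tagged-dict layout is an
# illustrative assumption, not Airflow's exact serialization format.
def encode(obj):
    if isinstance(obj, set):
        # sorted() keeps the output deterministic (assumes comparable items)
        return {"__classname__": "builtins.set", "__data__": sorted(obj)}
    if isinstance(obj, datetime):
        return {"__classname__": "datetime.datetime", "__data__": obj.isoformat()}
    return obj


# Matching toy decoder: the step that is missing on the UI/API read path.
def decode(obj):
    if isinstance(obj, dict) and "__classname__" in obj:
        if obj["__classname__"] == "builtins.set":
            return set(obj["__data__"])
        if obj["__classname__"] == "datetime.datetime":
            return datetime.fromisoformat(obj["__data__"])
    return obj


def round_trip(value):
    """Model the three hops and return what a plain JSON read gives back."""
    wire = json.dumps(encode(value))  # (1) encode; the client sends JSON
    received = json.loads(wire)       # (2) API server parses it into a dict
    stored = json.dumps(received)     # (3) JSON column serializes it again
    return json.loads(stored)         # read path: plain JSON decoding only


raw = round_trip({1, 2, 3})
print(raw)          # {'__classname__': 'builtins.set', '__data__': [1, 2, 3]}
print(decode(raw))  # {1, 2, 3}

when = datetime(2024, 1, 2, tzinfo=timezone.utc)
assert decode(round_trip(when)) == when
```

Plain JSON decoding only undoes hop (3), so the reader sees the tagged dict. Applying the same serializer that ran in hop (1) is what restores the original value, and since hops (2) and (3) only ever handle JSON, that step is safe on the read side.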
There is also a `full=False` parameter in our own deserializer (`airflow.serialization.serde.deserialize`) that returns just a string, which is enough for display in the UI. So the UI can pass `deserialize=True`, and it should show something like:

<img width="1719" alt="Image" src="https://github.com/user-attachments/assets/b0288ccc-f62d-441d-b315-f0c0087daad8" />

```diff
diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/xcom.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/xcom.py
index 2da8b620d7..8c77a5140b 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/xcom.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/xcom.py
@@ -69,41 +69,46 @@ def get_xcom_entry(
     stringify: Annotated[bool, Query()] = False,
 ) -> XComResponseNative | XComResponseString:
     """Get an XCom entry."""
+
     if deserialize:
         if not conf.getboolean("api", "enable_xcom_deserialize_support", fallback=False):
             raise HTTPException(
                 status.HTTP_400_BAD_REQUEST, "XCom deserialization is disabled in configuration."
             )
-        query = select(XComModel, XComModel.value)
-    else:
-        query = select(XComModel)
 
-    query = query.where(
-        XComModel.dag_id == dag_id,
-        XComModel.task_id == task_id,
-        XComModel.key == xcom_key,
-        XComModel.map_index == map_index,
+    xcom_query = XComModel.get_many(
+        run_id=dag_run_id,
+        key=xcom_key,
+        task_ids=task_id,
+        dag_ids=dag_id,
+        map_indexes=map_index,
+        session=session,
+        limit=1,
     )
-    query = query.join(DR, and_(XComModel.dag_id == DR.dag_id, XComModel.run_id == DR.run_id))
-    query = query.where(DR.run_id == dag_run_id)
-    query = query.options(joinedload(XComModel.dag_run).joinedload(DR.dag_model))
 
-    if deserialize:
-        item = session.execute(query).one_or_none()
-    else:
-        item = session.scalars(query).one_or_none()
+    # We use `BaseXCom.get_many` to fetch XComs directly from the database, bypassing the XCom Backend.
+    # This avoids deserialization via the backend (e.g., from a remote storage like S3) and instead
+    # retrieves the raw serialized value from the database.
+    result = xcom_query.limit(1).first()
 
-    if item is None:
+    if result is None:
         raise HTTPException(status.HTTP_404_NOT_FOUND, f"XCom entry with key: `{xcom_key}` not found")
 
+    item = copy.copy(result)
+
+    # We deserialize the value here to ensure that the response is in the expected format.
+    item.value = XComModel.deserialize_value(item)
+
     if deserialize:
-        from airflow.sdk.execution_time.xcom import XCom
+        from airflow.serialization.serde import deserialize
+
+        item.value = deserialize(XComModel.deserialize_value(item), full=False)
+
+    else:
+        from airflow.serialization.serde import deserialize
 
-        xcom, value = item
-        xcom_stub = copy.copy(xcom)
-        xcom_stub.value = value
-        xcom_stub.value = XCom.deserialize_value(xcom_stub)
-        item = xcom_stub
+        item = copy.copy(result)
+        item.value = XComModel.deserialize_value(item)
 
     if stringify:
         return XComResponseString.model_validate(item)
```

This way we avoid deserializing a custom object. It can be polished further (and we should really consolidate how much deserialization we do), but at least this would work right now.
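The read path in the diff can be sketched in plain Python as below. This is a toy model under stated assumptions: `read_xcom_value`, `XComDeserializeDisabled`, the `deserialize_enabled` flag (standing in for `[api] enable_xcom_deserialize_support`), the tagged-dict format and the display string produced with `full=False` are all hypothetical; Airflow's real `serde.deserialize` output may look different.

```python
import json
from typing import Any


class XComDeserializeDisabled(Exception):
    """Hypothetical stand-in for the HTTP 400 the endpoint raises."""


def decode(obj: Any, full: bool = True) -> Any:
    """Toy decoder for {"__classname__": ..., "__data__": ...} values.

    With full=False it never imports or constructs the class; it only
    renders a display string, which is all the UI needs.
    """
    if isinstance(obj, list):
        return [decode(o, full) for o in obj]
    if isinstance(obj, dict):
        if "__classname__" in obj:
            classname = obj["__classname__"]
            data = obj.get("__data__")
            if not full:
                return f"{classname}({data!r})"
            if classname == "builtins.set":
                return set(decode(data, full))
            # A custom class would need its code on the API server.
            raise TypeError(f"cannot rebuild {classname} in this sketch")
        return {k: decode(v, full) for k, v in obj.items()}
    return obj


def read_xcom_value(stored_json: str, deserialize: bool, deserialize_enabled: bool) -> Any:
    # Raw value as the JSON column returns it; no custom XCom backend involved.
    raw = json.loads(stored_json)
    if not deserialize:
        return raw
    if not deserialize_enabled:
        raise XComDeserializeDisabled("XCom deserialization is disabled in configuration.")
    return decode(raw, full=False)


stored = json.dumps({"__classname__": "my_pkg.CustomThing", "__data__": {"x": 1}})
print(read_xcom_value(stored, deserialize=True, deserialize_enabled=True))
# my_pkg.CustomThing({'x': 1})
```

The design point is that `full=False` turns even an unknown class (`my_pkg.CustomThing` is made up) into a readable string without importing it, whereas a full decode of that value would fail on a server that lacks the class.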
