amoghrajesh opened a new pull request, #47339:
URL: https://github.com/apache/airflow/pull/47339
closes: #45481
## What?
XCom backends allow us to customise where XComs are stored by routing them to
different backends. This is useful because the default XCom backend, the
[BaseXCom](https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/models/xcom/index.html#airflow.models.xcom.BaseXCom)
class, stores XComs in the Airflow database. That is fine for small values, but
can be problematic for large values or for large numbers of XComs.
Docs:
https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/xcoms.html#custom-xcom-backends
## Important changes
### Models: XCom
1. The BaseXcom model was a big model that served as the ORM, handled
operations on the table, provided utilities for custom XCom backends, and much
more.
2. This is not great: it makes things really hard to understand, and hard to
move over to the Task SDK, because everything is so tightly coupled.
3. Using this as an opportunity to do a few things (a rough, simplified sketch
of the split follows this list):
- Extract `XComModel` out of `BaseXCom`.
- `XComModel` will only have table-related code; some utilities like
`get_many`, `clear` and `set` stay for now, as they are hard to remove.
- `BaseXCom` is a class that will be moved to the Task SDK under
`airflow.sdk.execution_time` -- after some thought, it is not a definition, so
`definitions` doesn't seem like the right place for it. We can debate this one.
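A rough sketch of how the split could look (illustrative only; the column and
attribute names below are simplified assumptions, not the exact definitions in
this PR):
```
from __future__ import annotations

import json

from sqlalchemy import Column, Integer, String
from sqlalchemy.orm import declarative_base

Base = declarative_base()


class XComModel(Base):
    """ORM-only side: the xcom table plus DB helpers such as get_many/clear/set."""

    __tablename__ = "xcom"

    dag_run_id = Column(Integer, primary_key=True)
    task_id = Column(String(250), primary_key=True)
    map_index = Column(Integer, primary_key=True, default=-1)
    key = Column(String(512), primary_key=True)


class BaseXCom:
    """Execution-time side: the (de)serialization hooks custom XCom backends override."""

    @staticmethod
    def serialize_value(value, **kwargs) -> str:
        # Default behaviour: the JSON-serialised value itself goes to the DB.
        return json.dumps(value)

    @staticmethod
    def deserialize_value(result):
        # `result` stands in for whatever row/message object carries the stored value.
        return json.loads(result.value)
```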
### Execution Time XComs
1. A new module is introduced to store the definition of `BaseXCom` and serve
as the building block for custom XCom backends.
2. This uses the new Task SDK architecture, and all the utilities defined here
use the new Execution API rather than direct DB access.
3. For now, this has:
a) `set` - stores an XCom value. For `BaseXCom` the `value` itself is stored in
the DB; for XCom backends, the `path` (a reference) is stored in the DB. This
part handles it:
```
value = cls.serialize_value(
    value=value,
    key=key,
    task_id=task_id,
    dag_id=dag_id,
    run_id=run_id,
    map_index=map_index,
)
```
b) `get_value` - retrieves an XCom value, using the new Task SDK.
c) `get_one` - similar to what we had earlier in models. Uses the Task SDK to
get an XCom value, either from the DB or from the custom XCom backend if one is
configured. That part is handled here:
```
if msg.value is not None:
    return cls.deserialize_value(msg.value)
return None
```
d) `serialize_value` - for the normal (DB table) case, we serialise the value;
this is overridden by XCom backends, so we needn't worry about them here.
e) `deserialize_value` - similar to the above.
f) `purge` - used to purge an XCom entry, mainly for custom XCom backends. A
no-op for the ORM case.
g) `delete` - used to delete an XCom. This is a new utility; the reason for
adding it is explained below.
4. `resolve_xcom_backend` is used to resolve a custom XCom class if one is
provided in the config, or to return the `BaseXCom` class otherwise -- exactly
the same as the earlier behaviour (a standalone sketch of the resolution logic
follows this list).
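For reference, a minimal, self-contained sketch of what the resolution boils
down to. For illustration it reads the dotted path from the
`AIRFLOW__CORE__XCOM_BACKEND` environment variable; the real implementation
goes through Airflow's configuration machinery:
```
from __future__ import annotations

import importlib
import os


def resolve_xcom_backend_sketch(default_cls: type) -> type:
    """Return the configured XCom backend class, or the default (BaseXCom) if none is set."""
    dotted_path = os.environ.get("AIRFLOW__CORE__XCOM_BACKEND", "")
    if not dotted_path:
        return default_cls
    module_name, _, class_name = dotted_path.rpartition(".")
    clazz = getattr(importlib.import_module(module_name), class_name)
    if not issubclass(clazz, default_cls):
        raise TypeError(f"{dotted_path} is not a subclass of {default_cls.__name__}")
    return clazz
```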
### Execution Time changes
1. Comms has a new `DeleteXCom` message (a rough sketch of its likely shape
follows this list).
2. The supervisor handles `DeleteXCom` when it is called.
3. Task runner
- `xcom_pull` and `xcom_push` now use the utilities in
`execution_time/xcom.py`.
- We now get the XCom keys to clear when we first mark a task as running, and
we delete those individually instead of doing it in the Execution API server,
due to a couple of issues:
- If the /run endpoint were called multiple times, we would end up deleting
some XComs that we shouldn't, or we'd fail.
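As a rough illustration, a `DeleteXCom` message would carry the same
identifiers as `GetXCom`/`SetXCom`; the field names below are assumptions for
the sketch, not copied from the PR:
```
from __future__ import annotations

from typing import Literal

from pydantic import BaseModel


class DeleteXCom(BaseModel):
    """Sketch of a comms message asking the supervisor to delete one XCom entry."""

    key: str
    dag_id: str
    run_id: str
    task_id: str
    map_index: int = -1
    type: Literal["DeleteXCom"] = "DeleteXCom"
```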
### Execution API server changes
#### Task Instances
1. The `ti_run` endpoint now sends `xcom_keys_to_clear` which is basically:
```
query = session.query(XComModel.key).filter_by(
    dag_id=ti.dag_id, task_id=ti.task_id, run_id=ti.run_id
)
if map_index is not None:
    query = query.filter_by(map_index=map_index)
xcom_keys = session.execute(query)
```
2. This is used by the task runner to send delete requests to clear these
xcoms.
#### XComs
1. `get_xcom` and `set_xcom` now directly query the DB instead of using the
`xcom.set` or `xcom.get_many` utilities.
2. A new endpoint to delete XComs, which will be called by the task runner (a
hypothetical sketch of such an endpoint follows below).
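A minimal, hypothetical sketch of what such a delete endpoint could look like;
the route path, parameter names, and the commented-out DB call are assumptions
for illustration, not the PR's actual implementation:
```
from __future__ import annotations

from fastapi import APIRouter, status

router = APIRouter()


@router.delete("/xcoms/{dag_id}/{run_id}/{task_id}/{key}", status_code=status.HTTP_200_OK)
def delete_xcom(dag_id: str, run_id: str, task_id: str, key: str, map_index: int = -1):
    """Delete a single XCom entry; the task runner calls this to clear stale XComs."""
    # In the real server this would issue a DELETE against the xcom table, roughly:
    # session.execute(
    #     delete(XComModel).where(
    #         XComModel.dag_id == dag_id,
    #         XComModel.run_id == run_id,
    #         XComModel.task_id == task_id,
    #         XComModel.key == key,
    #         XComModel.map_index == map_index,
    #     )
    # )
    return {"message": f"XCom with key {key} deleted"}
```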
### Core API changes
1. The XCom APIs in core_api have been modified to account for the changes to
the `BaseXcom` model.
2. Mostly residual changes to keep up with the changes to the models.
### Other changes
1. Any other changes are mostly residual -- tests, devel-common, etc., adjusted
to make up for the changes to the models.
2. If anything looks unexpected, please shout.
## Testing
### Situation 1: Using the database for xcoms (no xcom backend)
Using DAG:
```
from airflow import DAG
from airflow.providers.standard.operators.python import PythonOperator


def push_to_xcom(**kwargs):
    value = ["This is a long message"] * 10
    return value


# def push_to_xcom2(**kwargs):
#     value = ["Hello, XCom2!"]
#     return value


def pull_from_xcom(**kwargs):
    ti = kwargs['ti']
    xcom_value1 = ti.xcom_pull(task_ids=["push_xcom_task"])
    print("Pulled value", xcom_value1)
    topush = xcom_value1 + ["modified"] * 10
    print("Pushing value", topush)
    # xcom_value2 = ti.xcom_pull(task_ids=["push_xcom_task2"])
    return topush


with DAG(
    'xcom_example',
    schedule=None,
    catchup=False,
) as dag:
    push_xcom_task = PythonOperator(
        task_id='push_xcom_task',
        python_callable=push_to_xcom,
    )
    # push_xcom_task2 = PythonOperator(
    #     task_id='push_xcom_task2',
    #     python_callable=push_to_xcom2,
    # )
    pull_xcom_task = PythonOperator(
        task_id='pull_xcom_task',
        python_callable=pull_from_xcom,
    )
    push_xcom_task >> pull_xcom_task
```
Success:

Xcom pushed by task 1:

Task 1 logs:

Sends the `SetXcom` call:
```
[2025-03-10, 11:21:06] DEBUG - Sending request
json="{\"key\":\"return_value\",\"value\":[\"This is a long message\",\"This is
a long message\",\"This is a long message\",\"This is a long message\",\"This
is a long message\",\"This is a long message\",\"This is a long
message\",\"This is a long message\",\"This is a long message\",\"This is a
long
message\"],\"dag_id\":\"xcom_example\",\"run_id\":\"manual__2025-03-10T11:21:05.415784+00:00_uE7ibsBB\",\"task_id\":\"push_xcom_task\",\"map_index\":-1,\"mapped_length\":null,\"type\":\"SetXCom\"}\n"
source="task"
```
Xcom pushed by task 2:

Task 2 logs:

Sends the GetXcom call:
```
Sending request
json="{\"key\":\"return_value\",\"dag_id\":\"xcom_example\",\"run_id\":\"manual__2025-03-10T11:21:05.415784+00:00_uE7ibsBB\",\"task_id\":\"push_xcom_task\",\"map_index\":-1,\"type\":\"GetXCom\"}\n"
source="task"
```
Status of the table:
```
1,push_xcom_task,-1,return_value,xcom_example,manual__2025-03-10T11:21:05.415784+00:00_uE7ibsBB,"[""This
is a long message"", ""This is a long message"", ""This is a long message"",
""This is a long message"", ""This is a long message"", ""This is a long
message"", ""This is a long message"", ""This is a long message"", ""This is a
long message"", ""This is a long message""]",2025-03-10 11:21:06.671134 +00:00
1,pull_xcom_task,-1,return_value,xcom_example,manual__2025-03-10T11:21:05.415784+00:00_uE7ibsBB,"[""This
is a long message"", ""This is a long message"", ""This is a long message"",
""This is a long message"", ""This is a long message"", ""This is a long
message"", ""This is a long message"", ""This is a long message"", ""This is a
long message"", ""This is a long message"", ""modified"", ""modified"",
""modified"", ""modified"", ""modified"", ""modified"", ""modified"",
""modified"", ""modified"", ""modified""]",2025-03-10 11:21:07.493851 +00:00
```
Observe that the data has been stored in the table as a native Python object
(JSON compliant); the actual data is stored, not a reference to it.
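A quick illustration of the difference in what ends up in the table's `value`
column in the two situations (values taken from the runs in this description):
```
import json

# Situation 1 (no custom backend): the JSON-serialised value itself is stored.
stored_without_backend = json.dumps(["This is a long message"] * 10)

# Situation 2 (custom backend, below): only a reference to the data is stored,
# here the path of the JSON file the backend writes on the worker.
stored_with_backend = json.dumps("/tmp/airflow_xcom.json")

print(stored_without_backend[:60] + "...")
print(stored_with_backend)
```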
### Situation 2: Using a custom xcom backend.
Using the same DAG as in Situation 1.
#### Setup
1. Wrote a custom XCom backend that stores XComs as JSON on the worker
filesystem (a small standalone sketch of the round-trip pattern is shown after
these setup steps):
```
from __future__ import annotations

import json
import os
from typing import Any, TypeVar

from airflow.sdk.execution_time.xcom import BaseXCom
from airflow.utils.json import XComDecoder, XComEncoder

T = TypeVar("T")


class JSONFileXComBackend(BaseXCom):
    FILE_PATH = "/tmp/airflow_xcom.json"

    @staticmethod
    def _read_xcom_file() -> dict:
        """Read the XCom JSON file."""
        if not os.path.exists(JSONFileXComBackend.FILE_PATH):
            return {}
        with open(JSONFileXComBackend.FILE_PATH, "r") as f:
            try:
                return json.load(f)
            except json.JSONDecodeError:
                return {}

    @staticmethod
    def _write_xcom_file(data: dict) -> None:
        """Write data to the XCom JSON file."""
        with open(JSONFileXComBackend.FILE_PATH, "a+") as f:
            json.dump(data, f, indent=4)

    @staticmethod
    def serialize_value(
        value: T,
        *,
        key: str | None = None,
        task_id: str | None = None,
        dag_id: str | None = None,
        run_id: str | None = None,
        map_index: int | None = None,
    ) -> str:
        # We always serialize ourselves rather than via BaseXCom, because the
        # deserialize method from BaseXCom accepts only XCom objects and not the value directly.
        s_val = json.dumps(value, cls=XComEncoder)
        s_val_encoded = s_val.encode("utf-8")
        base_path = JSONFileXComBackend.FILE_PATH
        with open(base_path, mode="wb") as f:
            f.write(s_val_encoded)
        return BaseXCom.serialize_value(base_path)

    @staticmethod
    def deserialize_value(result) -> Any:
        """
        Deserialize the value from the database or object storage.

        Compression is inferred from the file extension.
        """
        data = BaseXCom.deserialize_value(result)
        try:
            path = JSONFileXComBackend.FILE_PATH
        except (TypeError, ValueError):
            return data
        try:
            with open(path, mode="rb") as f:
                return json.load(f, cls=XComDecoder)
        except (TypeError, ValueError):
            return data
```
2. Set up Breeze to use this backend (via an `init.sh`):
```
export AIRFLOW__CORE__XCOM_BACKEND="my_xcom.JSONFileXComBackend"
```
3. Launch `breeze start-airflow` normally and run the DAG normally.
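To see the round trip the backend relies on without Airflow in the loop, here
is a small standalone sketch of the pattern (write the value to a file, hand
the DB only the path, read the file back on pull); the demo path is an
assumption, not the backend's real `FILE_PATH`:
```
import json

PATH = "/tmp/airflow_xcom_demo.json"  # demo path only

# "serialize": write the real value to the worker filesystem and keep just the path.
value = ["This is a long message"] * 10
with open(PATH, "w") as f:
    json.dump(value, f)
stored_in_db = json.dumps(PATH)  # this string is what lands in the xcom table

# "deserialize": resolve the stored reference back to the actual value.
path = json.loads(stored_in_db)
with open(path) as f:
    assert json.load(f) == value
```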
Task 1

XCom pushed from task 1:

Logs:

Logs showing what was actually pushed:
```
[2025-03-10, 11:31:03] INFO - Done. Returned value was: ['This is a long
message', 'This is a long message', 'This is a long message', 'This is a long
message', 'This is a long message', 'This is a long message', 'This is a long
message', 'This is a long message', 'This is a long message', 'This is a long
message']
source="airflow.task.operators.airflow.providers.standard.operators.python.PythonOperator"
```
The SetXcom call:
```
[2025-03-10, 11:31:03] DEBUG - Sending request
json="{\"key\":\"return_value\",\"value\":\"/tmp/airflow_xcom.json\",\"dag_id\":\"xcom_example\",\"run_id\":\"manual__2025-03-10T11:31:01.832127+00:00_IgXcmb0b\",\"task_id\":\"push_xcom_task\",\"map_index\":-1,\"mapped_length\":null,\"type\":\"SetXCom\"}\n"
source="task"
```
Task 2:

Xcom pushed by task 2:

Important logs from task 2:
```
[2025-03-10, 11:31:04] INFO - Pushing xcom
ti="RuntimeTaskInstance(id=UUID('01957fd2-9452-757c-8488-1bf67ddfa350'),
task_id='pull_xcom_task', dag_id='xcom_example',
run_id='manual__2025-03-10T11:31:01.832127+00:00_IgXcmb0b', try_number=1,
map_index=-1, hostname='0ce659e47888', task=<Task(PythonOperator):
pull_xcom_task>, bundle_instance=LocalDagBundle(name=dags-folder), max_tries=0,
start_date=datetime.datetime(2025, 3, 10, 11, 31, 3, 845298,
tzinfo=TzInfo(UTC)))" source="task"
[2025-03-10, 11:31:04] INFO - Pulled value ['This is a long message', 'This
is a long message', 'This is a long message', 'This is a long message', 'This
is a long message', 'This is a long message', 'This is a long message', 'This
is a long message', 'This is a long message', 'This is a long message']
chan="stdout" source="task"
[2025-03-10, 11:31:04] DEBUG - Sending request
json="{\"key\":\"return_value\",\"value\":\"/tmp/airflow_xcom.json\",\"dag_id\":\"xcom_example\",\"run_id\":\"manual__2025-03-10T11:31:01.832127+00:00_IgXcmb0b\",\"task_id\":\"pull_xcom_task\",\"map_index\":-1,\"mapped_length\":null,\"type\":\"SetXCom\"}\n"
source="task"
[2025-03-10, 11:31:04] INFO - Pushing value ['This is a long message', 'This
is a long message', 'This is a long message', 'This is a long message', 'This
is a long message', 'This is a long message', 'This is a long message', 'This
is a long message', 'This is a long message', 'This is a long message',
'modified', 'modified', 'modified', 'modified', 'modified', 'modified',
'modified', 'modified', 'modified', 'modified'] chan="stdout" source="task"
```
Inside the worker, check the XCom JSON file:
```
root@0ce659e47888:/opt/airflow# cat /tmp/airflow_xcom.json
["This is a long message", "This is a long message", "This is a long
message", "This is a long message", "This is a long message", "This is a long
message", "This is a long message", "This is a long message", "This is a long
message", "This is a long message", "modified", "modified", "modified",
"modified", "modified", "modified", "modified", "modified", "modified",
"modified"]root@0ce659e47888:/opt/airflow#
```
Status of the table:
```
1,push_xcom_task,-1,return_value,xcom_example,manual__2025-03-10T11:31:01.832127+00:00_IgXcmb0b,"""/tmp/airflow_xcom.json""",2025-03-10
11:31:03.391402 +00:00
1,pull_xcom_task,-1,return_value,xcom_example,manual__2025-03-10T11:31:01.832127+00:00_IgXcmb0b,"""/tmp/airflow_xcom.json""",2025-03-10
11:31:04.431473 +00:00
```
Observe that only the path is stored, not the value itself.
## TODO / What's next:
- [ ] Fix the tests
- [ ] Write new tests for my changes
- [ ] Add a significant news fragment
- [ ] Communicate with follow-ups such as
https://github.com/apache/airflow/issues/47195