amoghrajesh opened a new pull request, #47339:
URL: https://github.com/apache/airflow/pull/47339

   <!--
    Licensed to the Apache Software Foundation (ASF) under one
    or more contributor license agreements.  See the NOTICE file
    distributed with this work for additional information
    regarding copyright ownership.  The ASF licenses this file
    to you under the Apache License, Version 2.0 (the
    "License"); you may not use this file except in compliance
    with the License.  You may obtain a copy of the License at
   
      http://www.apache.org/licenses/LICENSE-2.0
   
    Unless required by applicable law or agreed to in writing,
    software distributed under the License is distributed on an
    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    KIND, either express or implied.  See the License for the
    specific language governing permissions and limitations
    under the License.
    -->
   
   
   closes: #45481
   
   
   ## What?
   
   XCom backends let us customise where XComs are stored. This is useful because the default XCom backend, the [BaseXCom](https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/models/xcom/index.html#airflow.models.xcom.BaseXCom) class, stores XComs in the Airflow database. That is fine for small values, but can be problematic for large values or for large numbers of XComs.
   
   Docs: https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/xcoms.html#custom-xcom-backends
   
   
   ## Important changes
   
   ### Models XCom
   1. `BaseXCom` was a large model that served as the ORM model, handled operations on the table, provided utilities for custom XCom backends, and much more.
   2. This is not great: it makes the code hard to understand, and hard to move over to the Task SDK, because everything is so tightly coupled.
   3. Using this as an opportunity to:
   - Extract `XComModel` out of `BaseXCom`.
   - Keep only table-related code in `XComModel`, plus a few utilities (`get_many`, `clear`, `set`) that are hard to remove for now.
   - Move `BaseXCom` to the Task SDK under `airflow.sdk.execution_time`. I considered `definitions`, but since it is not a definition, that did not seem like the right place for it. Open to discussion on this one.
   
   
   ### Execution Time XComs
   1. A new module is introduced to hold the definition of `BaseXCom` and to serve as the building block for custom XCom backends.
   2. It uses the new Task SDK architecture: all the utilities defined here go through the new execution API rather than accessing the DB directly.
   3. For now, it has:
   
   a) `set` - store an XCom value. For `BaseXCom` the `value` itself is stored in the DB; for custom XCom backends, the `path` is stored in the DB instead. This part handles it:
   ```python
   value = cls.serialize_value(
       value=value,
       key=key,
       task_id=task_id,
       dag_id=dag_id,
       run_id=run_id,
       map_index=map_index,
   )
   ```
   
   b) `get_value` - retrieve an XCom value using the new Task SDK.
   
   c) `get_one` - similar to what we had earlier in models. Uses the Task SDK to get an XCom value, either from the DB or from the custom XCom backend if one is configured. That part is handled here:
   ```python
   if msg.value is not None:
       return cls.deserialize_value(msg.value)
   return None
   ```
   
   
   d) `serialize_value` - in the default case, we serialise the value so it can be stored in the DB table. XCom backends override this, so we needn't worry about that case here.
   
   e) `deserialize_value` - the counterpart of the above: turns the stored value back into a Python object, and is likewise overridden by XCom backends.
   
   f) `purge` - used to purge an XCom entry, mainly for custom XCom backends. It is empty for the ORM (default) case.
   
   g) `delete` - used to delete an XCom. This is a new utility; the reason it was added is explained below.
   
   
   4. `resolve_xcom_backend` resolves the custom XCom class if one is configured, or returns the `BaseXCom` class otherwise, exactly as before.
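To make the `set` / `get_one` round trip concrete, here is a minimal, self-contained sketch. It is not the Task SDK implementation: `FakeSupervisor` is a hypothetical in-memory stand-in for the execution API, and `SketchBaseXCom` / `InMemoryReferenceBackend` only mirror the method names described above (`purge` and `delete` are left out). It illustrates why overriding the two serialization hooks is enough for a custom backend: `set` and `get_one` stay the same, while what lands in the "table" changes from the value itself to a reference.

```python
from __future__ import annotations

import json
from typing import Any


class FakeSupervisor:
    """Hypothetical stand-in for the execution API: a dict keyed like the xcom table."""

    def __init__(self) -> None:
        self.rows: dict[tuple[str, str, str, int, str], str] = {}

    def set_xcom(self, dag_id: str, run_id: str, task_id: str, map_index: int, key: str, value: str) -> None:
        self.rows[(dag_id, run_id, task_id, map_index, key)] = value

    def get_xcom(self, dag_id: str, run_id: str, task_id: str, map_index: int, key: str) -> str | None:
        return self.rows.get((dag_id, run_id, task_id, map_index, key))


SUPERVISOR = FakeSupervisor()


class SketchBaseXCom:
    """Client-side sketch: no ORM, only serialization hooks plus calls to the 'API'."""

    @classmethod
    def serialize_value(cls, value: Any, **context: Any) -> str:
        # Default behaviour: the value itself is stored in the DB as JSON.
        return json.dumps(value)

    @classmethod
    def deserialize_value(cls, stored: str) -> Any:
        return json.loads(stored)

    @classmethod
    def set(cls, key: str, value: Any, *, dag_id: str, task_id: str, run_id: str, map_index: int = -1) -> None:
        stored = cls.serialize_value(
            value, key=key, task_id=task_id, dag_id=dag_id, run_id=run_id, map_index=map_index
        )
        SUPERVISOR.set_xcom(dag_id, run_id, task_id, map_index, key, stored)

    @classmethod
    def get_one(cls, key: str, *, dag_id: str, task_id: str, run_id: str, map_index: int = -1) -> Any:
        stored = SUPERVISOR.get_xcom(dag_id, run_id, task_id, map_index, key)
        if stored is not None:
            return cls.deserialize_value(stored)
        return None


class InMemoryReferenceBackend(SketchBaseXCom):
    """Hypothetical custom backend: the DB keeps a reference, the payload lives elsewhere."""

    BLOBS: dict[str, str] = {}

    @classmethod
    def serialize_value(cls, value: Any, **context: Any) -> str:
        ref = "mem://{dag_id}/{run_id}/{task_id}/{map_index}/{key}".format(**context)
        cls.BLOBS[ref] = json.dumps(value)
        # Only the reference goes through the base serializer, i.e. into the DB.
        return super().serialize_value(ref)

    @classmethod
    def deserialize_value(cls, stored: str) -> Any:
        ref = super().deserialize_value(stored)
        return json.loads(cls.BLOBS[ref])


if __name__ == "__main__":
    InMemoryReferenceBackend.set("return_value", ["x"] * 3, dag_id="d", task_id="t", run_id="r")
    print(SUPERVISOR.rows)
    print(InMemoryReferenceBackend.get_one("return_value", dag_id="d", task_id="t", run_id="r"))
```

Running it prints a "table" that holds only the `mem://...` reference, followed by the original list returned by `get_one`.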
   
   
   ### Execution Time changes
   
   1. Comms has a new `DeleteXCom` message.
   2. The supervisor handles `DeleteXCom` requests.
   3. Task runner:
   
   - `xcom_pull` and `xcom_push` now use the utilities in `execution_time/xcom.py`.
   - When a task is first marked as running, we now get the XCom keys to clear and delete each of them individually, instead of clearing them in the execution API server. Doing it server-side caused a couple of issues:
     - If the `/run` endpoint was called multiple times, we would end up deleting XComs that we shouldn't, or the call would fail.
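A minimal sketch of the runner-side clearing described above. `DeleteXCom` here is a plain dataclass stand-in (field names borrowed from the `SetXCom`/`GetXCom` requests shown in the logs further down), not the actual comms model, and `send` is a hypothetical callable for the channel to the supervisor. Because each request targets a single key, repeating the loop is harmless: a key that is already gone is simply not found.

```python
from __future__ import annotations

import json
from dataclasses import asdict, dataclass, field
from typing import Callable, Literal


@dataclass
class DeleteXCom:
    """Stand-in for the comms message (not the real Task SDK model)."""

    key: str
    dag_id: str
    run_id: str
    task_id: str
    map_index: int = -1
    type: Literal["DeleteXCom"] = field(default="DeleteXCom")


def clear_xcoms_on_start(
    xcom_keys_to_clear: list[str],
    *,
    dag_id: str,
    run_id: str,
    task_id: str,
    map_index: int,
    send: Callable[[str], None],
) -> int:
    """Send one DeleteXCom request per key returned when the task is marked running."""
    for key in xcom_keys_to_clear:
        msg = DeleteXCom(key=key, dag_id=dag_id, run_id=run_id, task_id=task_id, map_index=map_index)
        # One JSON line per request, similar in shape to the SetXCom/GetXCom requests in the logs.
        send(json.dumps(asdict(msg)) + "\n")
    return len(xcom_keys_to_clear)


class FakeSupervisorStore:
    """Hypothetical supervisor side: applies DeleteXCom requests to an in-memory set of rows."""

    def __init__(self, rows: set[tuple[str, str, str, int, str]]) -> None:
        self.rows = set(rows)

    def handle(self, line: str) -> None:
        msg = json.loads(line)
        if msg["type"] == "DeleteXCom":
            # discard() is a no-op for a missing row, so repeated deletes are harmless.
            self.rows.discard((msg["dag_id"], msg["run_id"], msg["task_id"], msg["map_index"], msg["key"]))
```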
   
   ### Execution API server changes
   
   #### Task Instances
   1. The `ti_run` endpoint now returns `xcom_keys_to_clear`, which is computed as:
   ```python
   query = session.query(XComModel.key).filter_by(
       dag_id=ti.dag_id, task_id=ti.task_id, run_id=ti.run_id
   )
   if map_index is not None:
       query = query.filter_by(map_index=map_index)
   
   xcom_keys = session.execute(query)
   ```
   2. The task runner uses this to send delete requests that clear these XComs.
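The key lookup can be tried out against a throwaway SQLite table to see how the optional `map_index` filter narrows the result. This is an illustration using the stdlib `sqlite3` module instead of SQLAlchemy, with a simplified schema that is an assumption (only the columns the query touches); the `ORDER BY` is added only to make the output deterministic.

```python
from __future__ import annotations

import sqlite3


def xcom_keys_to_clear(
    conn: sqlite3.Connection, dag_id: str, task_id: str, run_id: str, map_index: int | None = None
) -> list[str]:
    """Return the XCom keys stored for a task instance, optionally narrowed to one map_index."""
    sql = "SELECT key FROM xcom WHERE dag_id = ? AND task_id = ? AND run_id = ?"
    params: list[object] = [dag_id, task_id, run_id]
    if map_index is not None:
        sql += " AND map_index = ?"
        params.append(map_index)
    return [row[0] for row in conn.execute(sql + " ORDER BY key", params)]


def make_demo_db() -> sqlite3.Connection:
    """Build an in-memory table with a few rows for an unmapped and a mapped task."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE xcom (dag_id TEXT, task_id TEXT, run_id TEXT, map_index INTEGER, key TEXT, value TEXT)"
    )
    conn.executemany(
        "INSERT INTO xcom VALUES (?, ?, ?, ?, ?, ?)",
        [
            ("xcom_example", "push_xcom_task", "run_1", -1, "return_value", "[]"),
            ("xcom_example", "push_xcom_task", "run_1", -1, "extra", "1"),
            ("xcom_example", "mapped_task", "run_1", 0, "return_value", "0"),
            ("xcom_example", "mapped_task", "run_1", 1, "return_value", "1"),
        ],
    )
    return conn
```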
   
   #### XComs
   
   1. `get_xcom` and `set_xcom` now query the DB directly instead of going through the `xcom.set` or `xcom.get_many` utilities.
   2. A new endpoint to delete XComs, which is called by the task runner.
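A rough sketch of what querying the DB directly can look like for the three endpoints, again with `sqlite3` and a simplified table standing in for `XComModel` and the SQLAlchemy session. The function names are hypothetical, not the actual route handlers. In this sketch, set deletes any existing row before inserting so that re-pushing a key replaces it, and delete reports whether a row was actually removed.

```python
from __future__ import annotations

import json
import sqlite3
from typing import Any

# Simplified stand-in for the xcom table (assumption: only the columns used here).
SCHEMA = (
    "CREATE TABLE xcom (dag_id TEXT, run_id TEXT, task_id TEXT, "
    "map_index INTEGER, key TEXT, value TEXT)"
)
WHERE = "dag_id = ? AND run_id = ? AND task_id = ? AND map_index = ? AND key = ?"


def set_xcom(
    conn: sqlite3.Connection, dag_id: str, run_id: str, task_id: str, map_index: int, key: str, value: Any
) -> None:
    ident = (dag_id, run_id, task_id, map_index, key)
    with conn:  # single transaction: drop any previous row for this key, then insert
        conn.execute(f"DELETE FROM xcom WHERE {WHERE}", ident)
        conn.execute("INSERT INTO xcom VALUES (?, ?, ?, ?, ?, ?)", (*ident, json.dumps(value)))


def get_xcom(
    conn: sqlite3.Connection, dag_id: str, run_id: str, task_id: str, map_index: int, key: str
) -> Any | None:
    row = conn.execute(
        f"SELECT value FROM xcom WHERE {WHERE}", (dag_id, run_id, task_id, map_index, key)
    ).fetchone()
    return None if row is None else json.loads(row[0])


def delete_xcom(
    conn: sqlite3.Connection, dag_id: str, run_id: str, task_id: str, map_index: int, key: str
) -> bool:
    with conn:
        cur = conn.execute(f"DELETE FROM xcom WHERE {WHERE}", (dag_id, run_id, task_id, map_index, key))
    # rowcount is 0 when the key was already gone, so repeated deletes do no harm.
    return cur.rowcount > 0
```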
   
   ### Core API changes
   
   1. The XCom APIs in core_api have been modified to account for the changes to the `BaseXCom` model.
   2. These are mostly residual changes to keep up with the model changes.
   
   ### Other changes
   
   1. Any other changes are mostly residual (tests, devel-common, etc.) to adjust for the changes to the models.
   2. If anything looks unexpected, please shout.
   
   
   ## Testing
   
   ### Situation 1: Using the database for xcoms (no xcom backend)
   
   Using DAG:
   ```python
   from airflow import DAG
   from airflow.providers.standard.operators.python import PythonOperator
   
   
   def push_to_xcom(**kwargs):
       value = ["This is a long message"] * 10
       return value
   
   
   def pull_from_xcom(**kwargs):
       ti = kwargs['ti']
       xcom_value1 = ti.xcom_pull(task_ids=["push_xcom_task"])
   
       print("Pulled value", xcom_value1)
   
       topush = xcom_value1 + ["modified"] * 10
       print("Pushing value", topush)
   
       return topush
   
   
   with DAG(
       'xcom_example',
       schedule=None,
       catchup=False,
   ) as dag:
       push_xcom_task = PythonOperator(
           task_id='push_xcom_task',
           python_callable=push_to_xcom,
       )
   
       pull_xcom_task = PythonOperator(
           task_id='pull_xcom_task',
           python_callable=pull_from_xcom,
       )
   
       push_xcom_task >> pull_xcom_task
   ```
   
   
   Success:
   
![image](https://github.com/user-attachments/assets/083277f6-f3ff-46e5-bf61-997f0e7b79ad)
   
   Xcom pushed by task 1:
   
   
![image](https://github.com/user-attachments/assets/bfa066b3-231d-48a1-9b8f-c0c394878e55)
   
   Task 1 logs:
   
![image](https://github.com/user-attachments/assets/4dcd5261-4edb-4a3f-9402-4b64acb0ab3d)
   
   Sends the `SetXCom` call:
   ```
   [2025-03-10, 11:21:06] DEBUG - Sending request json="{\"key\":\"return_value\",\"value\":[\"This is a long message\",\"This is a long message\",\"This is a long message\",\"This is a long message\",\"This is a long message\",\"This is a long message\",\"This is a long message\",\"This is a long message\",\"This is a long message\",\"This is a long message\"],\"dag_id\":\"xcom_example\",\"run_id\":\"manual__2025-03-10T11:21:05.415784+00:00_uE7ibsBB\",\"task_id\":\"push_xcom_task\",\"map_index\":-1,\"mapped_length\":null,\"type\":\"SetXCom\"}\n" source="task"
   ```
   
   
   
   Xcom pushed by task 2:
   
   
![image](https://github.com/user-attachments/assets/66b13c05-4cd5-4904-8371-f5f574e72004)
   
   Task 2 logs:
   
   
![image](https://github.com/user-attachments/assets/6229aa78-60ae-451f-be86-ec9350b0ae17)
   
   
   Sends the `GetXCom` call:
   ```
   Sending request json="{\"key\":\"return_value\",\"dag_id\":\"xcom_example\",\"run_id\":\"manual__2025-03-10T11:21:05.415784+00:00_uE7ibsBB\",\"task_id\":\"push_xcom_task\",\"map_index\":-1,\"type\":\"GetXCom\"}\n" source="task"
   ```
   
   
   Status of the table:
   
   ```
   
   1,push_xcom_task,-1,return_value,xcom_example,manual__2025-03-10T11:21:05.415784+00:00_uE7ibsBB,"[""This is a long message"", ""This is a long message"", ""This is a long message"", ""This is a long message"", ""This is a long message"", ""This is a long message"", ""This is a long message"", ""This is a long message"", ""This is a long message"", ""This is a long message""]",2025-03-10 11:21:06.671134 +00:00
   
   1,pull_xcom_task,-1,return_value,xcom_example,manual__2025-03-10T11:21:05.415784+00:00_uE7ibsBB,"[""This is a long message"", ""This is a long message"", ""This is a long message"", ""This is a long message"", ""This is a long message"", ""This is a long message"", ""This is a long message"", ""This is a long message"", ""This is a long message"", ""This is a long message"", ""modified"", ""modified"", ""modified"", ""modified"", ""modified"", ""modified"", ""modified"", ""modified"", ""modified"", ""modified""]",2025-03-10 11:21:07.493851 +00:00
   
   ```
   
   Observe that the actual data is stored in the table as native, JSON-compliant Python values, not a reference to it.
   
   
   ### Situation 2: Using a custom xcom backend.
   
   Using the same DAG as in Situation 1.
   
   
   #### Setup
   
   1. Wrote a custom XCom backend that stores XComs as JSON in the worker's file system:
   ```python
   from __future__ import annotations
   
   import json
   import os
   from typing import Any, TypeVar
   
   from airflow.sdk.execution_time.xcom import BaseXCom
   from airflow.utils.json import XComDecoder, XComEncoder
   
   T = TypeVar("T")
   
   
   class JSONFileXComBackend(BaseXCom):
       # Single fixed path: every XCom pushed on this worker overwrites the same file,
       # so the file always holds the most recently pushed value.
       FILE_PATH = "/tmp/airflow_xcom.json"
   
       @staticmethod
       def _read_xcom_file() -> dict:
           """Read the XCom JSON file."""
           if not os.path.exists(JSONFileXComBackend.FILE_PATH):
               return {}
           with open(JSONFileXComBackend.FILE_PATH, "r") as f:
               try:
                   return json.load(f)
               except json.JSONDecodeError:
                   return {}
   
       @staticmethod
       def _write_xcom_file(data: dict) -> None:
           """Write data to the XCom JSON file."""
           # "w" rather than "a+": appending would leave several JSON documents in one file.
           with open(JSONFileXComBackend.FILE_PATH, "w") as f:
               json.dump(data, f, indent=4)
   
       @staticmethod
       def serialize_value(
           value: T,
           *,
           key: str | None = None,
           task_id: str | None = None,
           dag_id: str | None = None,
           run_id: str | None = None,
           map_index: int | None = None,
       ) -> str:
           # We always serialize the payload ourselves rather than via BaseXCom, because
           # BaseXCom's deserialize method accepts only XCom objects, not the value directly.
           s_val = json.dumps(value, cls=XComEncoder)
           s_val_encoded = s_val.encode("utf-8")
   
           # Write the payload to the file; only the file path is stored in the DB.
           base_path = JSONFileXComBackend.FILE_PATH
           with open(base_path, mode="wb") as f:
               f.write(s_val_encoded)
           return BaseXCom.serialize_value(base_path)
   
       @staticmethod
       def deserialize_value(result) -> Any:
           """
           Deserialize the value: the DB holds the file path, the file holds the payload.
   
           Falls back to the value stored in the DB if the file cannot be decoded.
           """
           data = BaseXCom.deserialize_value(result)
           try:
               with open(JSONFileXComBackend.FILE_PATH, mode="rb") as f:
                   return json.load(f, cls=XComDecoder)
           except (TypeError, ValueError):
               return data
   ```
   
   2. Set up Breeze to use this backend (via an `init.sh`):
   
   ```
   export AIRFLOW__CORE__XCOM_BACKEND="my_xcom.JSONFileXComBackend"
   ```
   
   3. Launched Airflow with `breeze start-airflow` as usual.
   
   
   Ran the DAG normally.
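The `AIRFLOW__CORE__XCOM_BACKEND` value from step 2 is a dotted import path. Below is a minimal sketch of how such a path can be resolved to a class with `importlib`, falling back to the default when the variable is unset. It illustrates the `resolve_xcom_backend` behaviour described earlier but is not its code; the local `BaseXCom` placeholder, the `default` parameter and the subclass check are assumptions for this example.

```python
from __future__ import annotations

import importlib
import os


class BaseXCom:
    """Placeholder for airflow.sdk.execution_time.xcom.BaseXCom in this sketch."""


def resolve_xcom_backend_sketch(
    default: type = BaseXCom, env_var: str = "AIRFLOW__CORE__XCOM_BACKEND"
) -> type:
    """Import the class named by the env var, or fall back to the default class."""
    path = os.environ.get(env_var)
    if not path:
        return default
    module_name, _, class_name = path.rpartition(".")
    if not module_name:
        raise ValueError(f"{env_var} must be a dotted path like 'my_xcom.JSONFileXComBackend', got {path!r}")
    cls = getattr(importlib.import_module(module_name), class_name)
    # Reject anything that is not a subclass of the default backend class.
    if not (isinstance(cls, type) and issubclass(cls, default)):
        raise TypeError(f"{path} must be a subclass of {default.__name__}")
    return cls
```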
   
   Task 1
   
   
![image](https://github.com/user-attachments/assets/6e037e8a-dcc0-43c0-ad2f-705a3b73dc4e)
   
   
   XCom pushed by task 1:
   
![image](https://github.com/user-attachments/assets/5058fcbf-9624-45cf-b0e0-d332a7c2157b)
   
   
   Logs:
   
   
![image](https://github.com/user-attachments/assets/bd38b95c-61a9-4072-95da-49359a0c8dfc)
   
   
   Logs showing what was actually pushed:
   
   ```
   [2025-03-10, 11:31:03] INFO - Done. Returned value was: ['This is a long message', 'This is a long message', 'This is a long message', 'This is a long message', 'This is a long message', 'This is a long message', 'This is a long message', 'This is a long message', 'This is a long message', 'This is a long message'] source="airflow.task.operators.airflow.providers.standard.operators.python.PythonOperator"
   
   ```
   
   
   The `SetXCom` call:
   ```
   [2025-03-10, 11:31:03] DEBUG - Sending request json="{\"key\":\"return_value\",\"value\":\"/tmp/airflow_xcom.json\",\"dag_id\":\"xcom_example\",\"run_id\":\"manual__2025-03-10T11:31:01.832127+00:00_IgXcmb0b\",\"task_id\":\"push_xcom_task\",\"map_index\":-1,\"mapped_length\":null,\"type\":\"SetXCom\"}\n" source="task"
   ```
   
   
   Task 2:
   
![image](https://github.com/user-attachments/assets/b1aa6061-f097-46fa-955e-e3908a5d0147)
   
   
   Xcom pushed by task 2:
   
   
![image](https://github.com/user-attachments/assets/05475953-1b25-4e94-bb3d-5918e4047570)
   
   
   Important logs from task 2:
   ```
   [2025-03-10, 11:31:04] INFO - Pushing xcom ti="RuntimeTaskInstance(id=UUID('01957fd2-9452-757c-8488-1bf67ddfa350'), task_id='pull_xcom_task', dag_id='xcom_example', run_id='manual__2025-03-10T11:31:01.832127+00:00_IgXcmb0b', try_number=1, map_index=-1, hostname='0ce659e47888', task=<Task(PythonOperator): pull_xcom_task>, bundle_instance=LocalDagBundle(name=dags-folder), max_tries=0, start_date=datetime.datetime(2025, 3, 10, 11, 31, 3, 845298, tzinfo=TzInfo(UTC)))" source="task"
   [2025-03-10, 11:31:04] INFO - Pulled value ['This is a long message', 'This is a long message', 'This is a long message', 'This is a long message', 'This is a long message', 'This is a long message', 'This is a long message', 'This is a long message', 'This is a long message', 'This is a long message'] chan="stdout" source="task"
   [2025-03-10, 11:31:04] DEBUG - Sending request json="{\"key\":\"return_value\",\"value\":\"/tmp/airflow_xcom.json\",\"dag_id\":\"xcom_example\",\"run_id\":\"manual__2025-03-10T11:31:01.832127+00:00_IgXcmb0b\",\"task_id\":\"pull_xcom_task\",\"map_index\":-1,\"mapped_length\":null,\"type\":\"SetXCom\"}\n" source="task"
   [2025-03-10, 11:31:04] INFO - Pushing value ['This is a long message', 'This is a long message', 'This is a long message', 'This is a long message', 'This is a long message', 'This is a long message', 'This is a long message', 'This is a long message', 'This is a long message', 'This is a long message', 'modified', 'modified', 'modified', 'modified', 'modified', 'modified', 'modified', 'modified', 'modified', 'modified'] chan="stdout" source="task"
   ```
   
   
   Inside the worker, check the XCom JSON file:
   ```
   root@0ce659e47888:/opt/airflow# cat /tmp/airflow_xcom.json
   ["This is a long message", "This is a long message", "This is a long message", "This is a long message", "This is a long message", "This is a long message", "This is a long message", "This is a long message", "This is a long message", "modified", "modified", "modified", "modified", "modified", "modified", "modified", "modified", "modified", "modified"]
   ```
   
   
   Status of the table:
   ```
   
   1,push_xcom_task,-1,return_value,xcom_example,manual__2025-03-10T11:31:01.832127+00:00_IgXcmb0b,"""/tmp/airflow_xcom.json""",2025-03-10 11:31:03.391402 +00:00
   
   1,pull_xcom_task,-1,return_value,xcom_example,manual__2025-03-10T11:31:01.832127+00:00_IgXcmb0b,"""/tmp/airflow_xcom.json""",2025-03-10 11:31:04.431473 +00:00
   ```
   
   
   Observe that only the file path is stored in the table, not the value itself.
   
   
   ## TODO / What's next:
   
   - [ ] Fix the tests
   - [ ] Write new tests for my changes
   - [ ] Add a significant news fragment
   - [ ] Communicate the changes to follow-ups like https://github.com/apache/airflow/issues/47195
   
   
   

