amoghrajesh commented on PR #47339:
URL: https://github.com/apache/airflow/pull/47339#issuecomment-2728666074
Did a final round of regression tests with a few example DAGs, both with the
XCom DB backend and with a custom XCom backend (described in the PR description).
DAG1:
```
from airflow import DAG
from airflow.providers.standard.operators.python import PythonOperator
def push_to_xcom(**kwargs):
    """Return a list of ten identical strings to be stored as the task's XCom.

    The keyword arguments passed in by the operator are accepted but unused.
    """
    value = ["This is a long message"] * 10
    # Alternate payload used during testing:
    # value = ("key", "value")
    return value
#
# def push_to_xcom2(**kwargs):
# value = ["Hello, XCom2!"]
# return value
def pull_from_xcom(**kwargs):
    """Pull the XCom of ``push_xcom_task``, extend it, and return the result.

    Reads the task instance from ``kwargs['ti']``, pulls with a one-element
    ``task_ids`` list, prints what was pulled, appends ten ``"modified"``
    entries, prints the new value and returns it.

    Raises:
        KeyError: if ``ti`` is not present in ``kwargs``.
    """
    ti = kwargs['ti']
    xcom_value1 = ti.xcom_pull(task_ids=["push_xcom_task"])
    print("Pulled value", xcom_value1)
    topush = xcom_value1 + ["modified"] * 10
    print("Pushing value", topush)
    # xcom_value2 = ti.xcom_pull(task_ids=["push_xcom_task2"])
    return topush
# DAG with two Python tasks: one returns a value, the other pulls and extends it.
with DAG(
    'xcom_example',
    schedule=None,
    catchup=False,
) as dag:
    push_xcom_task = PythonOperator(
        task_id='push_xcom_task',
        python_callable=push_to_xcom,
    )
    # push_xcom_task2 = PythonOperator(
    #     task_id='push_xcom_task2',
    #     python_callable=push_to_xcom2,
    # )
    pull_xcom_task = PythonOperator(
        task_id='pull_xcom_task',
        python_callable=pull_from_xcom,
    )
    # The pull task is declared downstream of the push task.
    push_xcom_task >> pull_xcom_task
```
DAG2:
```
from airflow import DAG
from airflow.providers.standard.operators.python import PythonOperator
def push_to_xcom(**kwargs):
    """Return a two-element tuple to be stored as the task's XCom.

    The keyword arguments passed in by the operator are accepted but unused.
    """
    value = ("Hello", "XCom!")
    return value
# Single-task DAG whose callable returns a tuple.
with DAG(
    'xcom_tuple_return',
    schedule=None,
    catchup=False,
) as dag:
    push_xcom_task = PythonOperator(
        task_id='tuple_task',
        python_callable=push_to_xcom,
    )
```
DAG3:
```
from airflow import DAG
from airflow.providers.standard.operators.python import PythonOperator
from datetime import datetime
# Task to push an XCom
def push_xcom(**kwargs):
    """Push ``"example_value"`` under the key ``"example_key"``.

    Uses the task instance found at ``kwargs['ti']``; returns ``None``.

    Raises:
        KeyError: if ``ti`` is not present in ``kwargs``.
    """
    kwargs['ti'].xcom_push(key="example_key", value="example_value")
# Task to pull an XCom
def pull_xcom(**kwargs):
    """Pull ``"example_key"`` from ``push_task`` and print the value.

    Uses the task instance found at ``kwargs['ti']``; returns ``None``.

    Raises:
        KeyError: if ``ti`` is not present in ``kwargs``.
    """
    pulled_value = kwargs['ti'].xcom_pull(task_ids="push_task", key="example_key")
    print(f"Pulled XCom Value: {pulled_value}")
# Define the DAG
with DAG(
    dag_id="simple_xcom_pull_example",
    default_args={"owner": "airflow"},
    schedule=None,
    start_date=datetime(2023, 1, 1),
    catchup=False,
) as dag:
    # Task 1: Push XCom
    push_task = PythonOperator(
        task_id="push_task",
        python_callable=push_xcom,
    )
    # Task 2: Pull XCom
    pull_task = PythonOperator(
        task_id="pull_task",
        python_callable=pull_xcom,
    )
    # Define task dependencies: pull_task is downstream of push_task
    push_task >> pull_task
```
DAG4:
```
"""Example DAG demonstrating the usage of XComs."""
from airflow import DAG
from airflow.providers.standard.operators.python import PythonOperator
from datetime import datetime, timedelta
# DAG object that the operators further down attach to via their dag= argument.
dag = DAG(
"example_xcom",
start_date=datetime(2023, 11, 28),
default_args={"owner": "airflow"},
schedule="@daily",
catchup=False,
tags=["core"],
)
# Reference payloads: push() and push_by_returning() publish these, and
# puller() compares what it pulls against them.
value_1 = [1, 2, 3]
value_2 = {"a": "b"}
def push(**kwargs):
    """Pushes an XCom without a specific target.

    Publishes the module-level ``value_1`` under the key
    ``"value from pusher 1"`` using the task instance at ``kwargs["ti"]``.
    """
    kwargs["ti"].xcom_push(key="value from pusher 1", value=value_1)
def push_by_returning(**kwargs):
    """Pushes an XCom without a specific target, just by returning it.

    Returns the module-level ``value_2``; the keyword arguments are unused.
    """
    return value_2
def puller(**kwargs):
    """Pull all previously pushed XComs and check that they match what was pushed.

    Pulls from ``push`` and ``push_by_returning`` one at a time, then pulls
    both in a single call, and compares each result with the module-level
    ``value_1`` / ``value_2``.

    Raises:
        ValueError: if any pulled value differs from the expected one.
        KeyError: if ``ti`` is not present in ``kwargs``.
    """
    ti = kwargs["ti"]

    # get value_1
    pulled_value_1 = ti.xcom_pull(key=None, task_ids="push")
    if pulled_value_1 != value_1:
        raise ValueError(f"The two values differ {pulled_value_1} and {value_1}")

    # get value_2
    pulled_value_2 = ti.xcom_pull(task_ids="push_by_returning")
    if pulled_value_2 != value_2:
        raise ValueError(f"The two values differ {pulled_value_2} and {value_2}")

    # get both value_1 and value_2
    pulled_value_1, pulled_value_2 = ti.xcom_pull(
        key=None, task_ids=["push", "push_by_returning"]
    )
    print(f"pulled_value_1 is {pulled_value_1}")
    print(f"pulled_value_2 is {pulled_value_2}")
    if pulled_value_1 != value_1:
        raise ValueError(f"The two values differ {pulled_value_1} and {value_1}")
    if pulled_value_2 != value_2:
        raise ValueError(f"The two values differ {pulled_value_2} and {value_2}")
# Task running push(); the only task here that sets depends_on_past=True.
push1 = PythonOperator(
task_id="push",
dag=dag,
python_callable=push,
depends_on_past=True,
)
# Task running push_by_returning().
push2 = PythonOperator(
task_id="push_by_returning",
dag=dag,
python_callable=push_by_returning,
)
# Task running puller(), which checks the values published by the two tasks above.
pull = PythonOperator(
task_id="puller",
dag=dag,
python_callable=puller,
)
# Declare both push tasks as upstream of the puller.
pull << [push1, push2]
```
These all work as expected, both with the custom XCom backend and with
the XCom DB backend.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]