amoghrajesh commented on PR #47339:
URL: https://github.com/apache/airflow/pull/47339#issuecomment-2728666074

   Did a final round of regression tests with a few example dags both with Xcom 
DB backend and custom XCom backend (described in PR desc)
   
   
   DAG1:
   ```
   from airflow import DAG
   from airflow.providers.standard.operators.python import PythonOperator
   
   def push_to_xcom(**kwargs):
       value = ["This is a long message"] * 10
       # value = ("key", "value")
       return value
   #
   # def push_to_xcom2(**kwargs):
   #     value = ["Hello, XCom2!"]
   #     return value
   
   def pull_from_xcom(**kwargs):
       ti = kwargs['ti']
       xcom_value1 = ti.xcom_pull(task_ids=["push_xcom_task"])
   
       print("Pulled value", xcom_value1)
   
       topush = xcom_value1 + ["modified"] * 10
       print("Pushing value", topush)
   
       # xcom_value2 = ti.xcom_pull(task_ids=["push_xcom_task2"])
       return topush
   
   with DAG(
       'xcom_example',
       schedule=None,
       catchup=False,
   ) as dag:
   
       push_xcom_task = PythonOperator(
           task_id='push_xcom_task',
           python_callable=push_to_xcom,
       )
   
       # push_xcom_task2 = PythonOperator(
       #     task_id='push_xcom_task2',
       #     python_callable=push_to_xcom2,
       # )
   
       pull_xcom_task = PythonOperator(
           task_id='pull_xcom_task',
           python_callable=pull_from_xcom,
       )
   
       push_xcom_task >> pull_xcom_task
   
   ```
   
   
   DAG2:
   ```
   from airflow import DAG
   from airflow.providers.standard.operators.python import PythonOperator
   
   def push_to_xcom(**kwargs):
       value = ("Hello", "XCom!")
       return value
   
   with DAG(
       'xcom_tuple_return',
       schedule=None,
       catchup=False,
   ) as dag:
   
       push_xcom_task = PythonOperator(
           task_id='tuple_task',
           python_callable=push_to_xcom,
       )
   ```
   
   DAG3:
   ```
   from airflow import DAG
   from airflow.providers.standard.operators.python import PythonOperator
   from datetime import datetime
   
   # Task to push an XCom
   def push_xcom(**kwargs):
       kwargs['ti'].xcom_push(key="example_key", value="example_value")
   
   # Task to pull an XCom
   def pull_xcom(**kwargs):
       pulled_value = kwargs['ti'].xcom_pull(task_ids="push_task", 
key="example_key")
       print(f"Pulled XCom Value: {pulled_value}")
   
   # Define the DAG
   with DAG(
       dag_id="simple_xcom_pull_example",
       default_args={"owner": "airflow"},
       schedule=None,
       start_date=datetime(2023, 1, 1),
       catchup=False,
   ) as dag:
   
       # Task 1: Push XCom
       push_task = PythonOperator(
           task_id="push_task",
           python_callable=push_xcom,
       )
   
       # Task 2: Pull XCom
       pull_task = PythonOperator(
           task_id="pull_task",
           python_callable=pull_xcom,
       )
   
       # Define task dependencies
       push_task >> pull_task
   ```
   
   
   DAG4:
   ```
   """Example DAG demonstrating the usage of XComs."""
   from airflow import DAG
   from airflow.providers.standard.operators.python import PythonOperator
   from datetime import datetime, timedelta
   
   dag = DAG(
           "example_xcom",
       start_date=datetime(2023, 11, 28),
       default_args={"owner": "airflow"},
       schedule="@daily",
       catchup=False,
       tags=["core"],
   )
   
   value_1 = [1, 2, 3]
   value_2 = {"a": "b"}
   
   
   def push(**kwargs):
       """Pushes an XCom without a specific target"""
       kwargs["ti"].xcom_push(key="value from pusher 1", value=value_1)
   
   
   def push_by_returning(**kwargs):
       """Pushes an XCom without a specific target, just by returning it"""
       return value_2
   
   
   def puller(**kwargs):
       """Pull all previously pushed XComs and check if the pushed values match 
the pulled values."""
       ti = kwargs["ti"]
   
       # get value_1
       pulled_value_1 = ti.xcom_pull(key=None, task_ids="push")
       if pulled_value_1 != value_1:
           raise ValueError(f"The two values differ {pulled_value_1} and 
{value_1}")
   
       # get value_2
       pulled_value_2 = ti.xcom_pull(task_ids="push_by_returning")
       if pulled_value_2 != value_2:
           raise ValueError(f"The two values differ {pulled_value_2} and 
{value_2}")
   
       # get both value_1 and value_2
       pulled_value_1, pulled_value_2 = ti.xcom_pull(
           key=None, task_ids=["push", "push_by_returning"]
       )
       print(f"pulled_value_1 is {pulled_value_1}")
       print(f"pulled_value_2 is {pulled_value_2}")
       if pulled_value_1 != value_1:
           raise ValueError(f"The two values differ {pulled_value_1} and 
{value_1}")
       if pulled_value_2 != value_2:
           raise ValueError(f"The two values differ {pulled_value_2} and 
{value_2}")
   
   
   push1 = PythonOperator(
       task_id="push",
       dag=dag,
       python_callable=push,
       depends_on_past=True,
   )
   
   push2 = PythonOperator(
       task_id="push_by_returning",
       dag=dag,
       python_callable=push_by_returning,
   )
   
   pull = PythonOperator(
       task_id="puller",
       dag=dag,
       python_callable=puller,
   )
   
   pull << [push1, push2]
   ```
   
   These works as expected, both with the custom xcom backend as well as with 
the XCom db backend.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to