hanxdatadog opened a new issue, #50982:
URL: https://github.com/apache/airflow/issues/50982

   ### Apache Airflow version
   
   3.0.1
   
   ### If "Other Airflow 2 version" selected, which one?
   
   _No response_
   
   ### What happened?
   
   When pulling XCom data from dynamically mapped tasks with
   ```
   ti.xcom_pull(task_ids="some_task")
   ```
   In Airflow 2, this call returned the XCom values of all mapped task instances, and it still does when testing the DAG with `airflow dags test my_dag`. When Airflow runs in production or standalone mode, however, it returns `None`.
   
   From reading the code, a deployed Airflow runs tasks through `RuntimeTaskInstance`, and the Execution API's XCom endpoint that it calls cannot return XCom values from multiple mapped task instances, which was the previous behavior.
   
https://github.com/apache/airflow/blob/main/task-sdk/src/airflow/sdk/execution_time/task_runner.py#L359-L375
   
   
   ### What you think should happen instead?
   
   1. It should restore the Airflow 2 behavior of returning a list of XCom values from all dynamically mapped task instances. This is quite useful because mapped tasks are often expanded from runtime data, which makes it difficult to know the number of mapped task instances in advance.
   
   2. If restoring that behavior is difficult, could we provide a way to retrieve the number of mapped task instances for a given upstream task? That would make it possible to fetch each mapped instance's XCom value afterwards.
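The two proposals above can be sketched against a simplified in-memory model of XCom storage. This is a hypothetical illustration, not Airflow's storage layer or API: `XComRows`, `pull_mapped`, and `pull_by_count` are made-up names, stored values are assumed to be keyed by `(task_id, map_index)`, and `pull_by_count` assumes the mapped instance count is already known (exposing that count is exactly what proposal 2 asks for).

```python
from typing import Any, Callable, Dict, List, Optional, Tuple

# Hypothetical model of XCom storage: one value per (task_id, map_index).
# This is an illustration only, not Airflow's storage layer or API.
XComRows = Dict[Tuple[str, int], Any]


def pull_mapped(rows: XComRows, task_id: str) -> Optional[List[Any]]:
    """Proposal 1: return the values of all mapped instances, ordered by map_index.

    Returns None when the task has no stored values.
    """
    matches = [
        (map_index, value)
        for (tid, map_index), value in rows.items()
        if tid == task_id
    ]
    if not matches:
        return None
    # Order by map_index so the result lines up with the expand() input order.
    matches.sort(key=lambda item: item[0])
    return [value for _, value in matches]


def pull_by_count(
    pull_one: Callable[[str, int], Any], task_id: str, count: int
) -> List[Any]:
    """Proposal 2: fetch map indexes 0..count-1 one at a time.

    `count` stands in for the (hypothetical) mapped instance count that
    proposal 2 asks Airflow to expose; `pull_one` stands in for a
    single-index pull.
    """
    return [pull_one(task_id, map_index) for map_index in range(count)]


# Usage with sample values, deliberately stored out of map_index order.
rows: XComRows = {
    ("push_task", 1): "data: b",
    ("push_task", 0): "data: a",
}
print(pull_mapped(rows, "push_task"))  # ['data: a', 'data: b']
print(pull_mapped(rows, "unknown_task"))  # None
print(pull_by_count(lambda t, i: rows.get((t, i)), "push_task", 2))  # ['data: a', 'data: b']
```

In a real task, `pull_one` could wrap a per-index call such as `ti.xcom_pull(task_ids=task_id, map_indexes=i)`, the same form the reproduction uses for index 0; only the count itself is missing today.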
   
   ### How to reproduce
   
   With the following DAG code
   ```python
   from datetime import datetime, timedelta

   from airflow.providers.standard.operators.python import PythonOperator
   from airflow.sdk import DAG

   with DAG(
       "my_dag",
       schedule=timedelta(days=1),
       start_date=datetime(2025, 1, 1),
       catchup=False,
   ) as dag:

       def push_data(x):
           # Each mapped instance pushes one value as its return XCom.
           return f"data: {x}"

       # Two mapped instances: map_index 0 (x="a") and map_index 1 (x="b").
       push_task = PythonOperator.partial(
           task_id="push_task",
           python_callable=push_data,
       ).expand(op_kwargs=[{"x": "a"}, {"x": "b"}])

       def pull_data(**kwargs):
           ti = kwargs["ti"]
           print(f"ti_class:{ti.__class__}")
           print(f"ti.map_index:{ti.map_index}")
           # No map_indexes given: Airflow 2 returns the values of all mapped instances.
           data = ti.xcom_pull(task_ids="push_task")
           result = list(data) if data else data
           print(f"pulled data {result}")
           return result

       pull_task = PythonOperator(
           task_id="pull_data",
           python_callable=pull_data,
       )

       def pull_data_0(**kwargs):
           ti = kwargs["ti"]
           # One task_id with one map index returns a single value, not a list,
           # so it is printed as-is rather than passed through list().
           data = ti.xcom_pull(task_ids="push_task", map_indexes=0)
           print(f"pulled data 0 {data}")
           return data

       pull_task_explicit = PythonOperator(
           task_id="pull_task_explicit",
           python_callable=pull_data_0,
       )
       push_task >> pull_task >> pull_task_explicit
   ```
   When running `airflow dags test my_dag`, the `pull_data` task prints:
   ```
   pulled data ['data: a', 'data: b']
   ```
   If we deploy Airflow (either through Kubernetes or standalone) and trigger the DAG, the `pull_data` task outputs:
   ```
   pulled data None: chan="stdout": source="task"
   ```
   
   ### Operating System
   
   macOS
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   Official Apache Airflow Helm Chart
   
   ### Deployment details
   
   _No response_
   
   ### Anything else?
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   