hanxdatadog opened a new issue, #50982: URL: https://github.com/apache/airflow/issues/50982
### Apache Airflow version

3.0.1

### If "Other Airflow 2 version" selected, which one?

_No response_

### What happened?

When pulling XCom data from dynamically mapped tasks with

```
ti.xcom_pull(task_ids="some_task")
```

Airflow 2 returned the XCom values of all mapped task instances. The call still does this when testing the DAG with `airflow dags test my_dag`. However, when Airflow is deployed (in production or standalone), the same call returns `None`.

From reading the code, a deployed Airflow uses `RuntimeTaskInstance`. The Execution API's XCom endpoint cannot reproduce the previous behavior of fetching multiple XCom values from mapped tasks:

https://github.com/apache/airflow/blob/main/task-sdk/src/airflow/sdk/execution_time/task_runner.py#L359-L375

### What you think should happen instead?

1. Airflow should restore the Airflow 2 behavior, which returns a list of XCom values from all dynamically mapped task instances. This is quite useful, because dynamically mapped tasks are often based on runtime data, which makes it difficult to know the number of mapped tasks in advance.
2. If restoring the same behavior is difficult, could we provide a way to retrieve the mapped task count for a given upstream task? That would make it possible to fetch the XCom data from every mapped task afterwards.
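To make the two requested semantics concrete, here is a minimal pure-Python sketch. It is not Airflow code. A hypothetical dict keyed by `(task_id, map_index)` stands in for the XCom table. `pull_all_mapped` models the Airflow 2 behavior described above (the values of every mapped instance, ordered by map index), and `pull_with_count` models proposal 2, where a caller that knows the mapped-task count pulls each index individually. All names here are illustrative assumptions.

```python
from typing import Any, Callable, Dict, List, Optional, Tuple

# Hypothetical stand-in for the XCom table: (task_id, map_index) -> value.
# This is NOT an Airflow API; it only models the lookup semantics.
XComStore = Dict[Tuple[str, int], Any]


def pull_all_mapped(store: XComStore, task_id: str) -> Optional[List[Any]]:
    """Model of the Airflow 2 behavior described in this issue: return the
    values of every mapped instance of `task_id`, ordered by map_index,
    or None when nothing was pushed for that task."""
    indexes = sorted(idx for (tid, idx) in store if tid == task_id)
    if not indexes:
        return None
    return [store[(task_id, idx)] for idx in indexes]


def pull_with_count(pull_one: Callable[[int], Any], mapped_count: int) -> List[Any]:
    """Model of proposal 2: if the upstream mapped-task count were exposed,
    each map index could be pulled individually (e.g. via map_indexes=i)."""
    return [pull_one(idx) for idx in range(mapped_count)]


if __name__ == "__main__":
    # Values pushed by two mapped instances, inserted out of order on purpose.
    store: XComStore = {("push_task", 1): "data: b", ("push_task", 0): "data: a"}
    print(pull_all_mapped(store, "push_task"))  # ['data: a', 'data: b']
    print(pull_with_count(lambda i: store[("push_task", i)], 2))  # ['data: a', 'data: b']
```

The first approach needs no extra information from the caller, which is why it suits mapping over runtime data. The second only works once something like a mapped-task count is available to the pulling task.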
### How to reproduce

With the following DAG code:

```python
from datetime import datetime, timedelta

from airflow.providers.standard.operators.python import PythonOperator
from airflow.sdk import DAG

with DAG(
    "my_dag",
    schedule=timedelta(days=1),
    start_date=datetime(2025, 1, 1),
    catchup=False,
) as dag:

    def push_data(x):
        return f"data: {x}"

    # Two mapped instances: map_index 0 (x="a") and map_index 1 (x="b")
    push_task = PythonOperator.partial(
        task_id="push_task",
        python_callable=push_data,
    ).expand(op_kwargs=[{"x": "a"}, {"x": "b"}])

    def pull_data(**kwargs):
        ti = kwargs["ti"]
        print(f"ti_class:{ti.__class__}")
        print(f"ti.map_index:{ti.map_index}")
        # No map_indexes given: Airflow 2 returned every mapped instance's value
        data = ti.xcom_pull(task_ids="push_task")
        result = list(data) if data else data
        print(f"pulled data {result}")
        return result

    pull_task = PythonOperator(
        task_id="pull_data",
        python_callable=pull_data,
    )

    def pull_data_0(**kwargs):
        ti = kwargs["ti"]
        # Explicitly pull only the value pushed by map_index 0
        data = ti.xcom_pull(task_ids="push_task", map_indexes=0)
        result = list(data) if data else data
        print(f"pulled data 0 {result}")
        return result

    pull_task_explicit = PythonOperator(
        task_id="pull_task_explicit",
        python_callable=pull_data_0,
    )

    push_task >> pull_task >> pull_task_explicit
```

If we run `airflow dags test my_dag`, the `pull_data` task prints:

```
pulled data ['data: a', 'data: b']
```

If we deploy Airflow (either on Kubernetes or standalone) and trigger the DAG, the `pull_data` task outputs:

```
pulled data None: chan="stdout": source="task"
```

### Operating System

MacOS

### Versions of Apache Airflow Providers

_No response_

### Deployment

Official Apache Airflow Helm Chart

### Deployment details

_No response_

### Anything else?

_No response_

### Are you willing to submit PR?

- [ ] Yes I am willing to submit a PR!

### Code of Conduct

- [x] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
