zameji commented on issue #40321: URL: https://github.com/apache/airflow/issues/40321#issuecomment-3480709795
The [suggested workaround](https://github.com/apache/airflow/issues/40321#issuecomment-2191430731) has the issue of introducing dependecies between tasks and bloating the data you are propagating downstream. Until a proper fix arrives, we work around the issue by getting the statuses of all the mapped tasks and then remapping the result array before getting the map_index value out of it. In **pseudocode** (can't publish actual code): ```python xcom_values = ti.xcom_pull(task_ids=task_id, key=key_id) mapped_tasks = [ti for ti in dag_run.get_task_instances() if is_task_instance(ti, xcom_task_id)] sorted_mapped_tasks = sort_mapped_tasks(mapped_tasks) mapped_tasks_statuses = [get_status(ti) for ti in sorted_mapped_tasks] xcom_value_corrected = insert_placeholder_for_unsuccessful_ti(xcom_values, mapped_tasks_statuses) return xcom_value_corrected[ti.map_index] ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
