Lee-W commented on code in PR #49996:
URL: https://github.com/apache/airflow/pull/49996#discussion_r2073133313
##########
airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py:
##########
@@ -247,6 +261,26 @@ def ti_run(
)
+def _get_upstream_map_indexes(
+    task: Operator, ti_map_index: int
+) -> Iterator[tuple[str, int | list[int] | None]]:
+    for upstream_task in task.upstream_list:
+        map_indexes: int | list[int] | None
+        if upstream_task.task_group is None:
+            # regular tasks
+            map_indexes = None
+        elif task.task_group == upstream_task.task_group:
+            # tasks in the same mapped task group
+            map_indexes = ti_map_index
+        else:
+            # tasks not in the same mapped task group
+            # the upstream mapped task group should combine the xcom as a list and return it
+            mapped_ti_count: int = upstream_task.task_group.get_parse_time_mapped_ti_count()
Review Comment:
This is fixed in the latest version.