Lee-W commented on code in PR #49996:
URL: https://github.com/apache/airflow/pull/49996#discussion_r2071438137
##########
airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py:
##########
@@ -247,6 +251,31 @@ def ti_run(
)
+def _get_upstream_map_indexes(
+ request: Request, dag_id: str, task_id: str, ti_map_index
+) -> dict[str, int | list[int] | None] | None:
+ dag = request.app.state.dag_bag.get_dag(dag_id)
+ if not dag:
+ return None
+
+ task = dag.get_task(task_id)
+ upstream_map_indexes = {
+ upstream_task.task_id: (
+ # regular tasks
+ None
+ if upstream_task.task_group is None
+ # tasks in the same mapped task group
+ else ti_map_index
+ if task.task_group == upstream_task.task_group
+ # tasks not in the same mapped task group
+ # the upstream mapped task group should combine the xcom as a list
and return it
+ else
list(range(upstream_task.task_group.get_parse_time_mapped_ti_count()))
+ )
+ for upstream_task in task.upstream_list
+ }
+ return upstream_map_indexes
Review Comment:
Just updated. Thanks!
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]