Lee-W commented on issue #49714: URL: https://github.com/apache/airflow/issues/49714#issuecomment-2831955099
> @Lee-W I was checking on this while debugging #49737. In the example above, when resolving `consumer(results)`, `results` from `double` is an XComArg attached to a mapped task group. No `map_indexes` are passed, so `xcom_pull` fetches XComs for `add_one_and_double.double` with `map_index` set to `-1`, taken from `self.map_index` of `consumer`, which is a normal task.
>
> One possible approach: when the normal task has a closest task group that is also mapped, get the parse-time count of `map_indexes` and fetch all their values, which would return `[4, 6, 8]`. But `get_parse_time_mapped_ti_count` will not be possible in cases where expansion depends on another task, like `add_one_and_double.expand(value=producer())`. Hope I am correct in my understanding here.
>
> ```diff
> diff --git a/task-sdk/src/airflow/sdk/definitions/xcom_arg.py b/task-sdk/src/airflow/sdk/definitions/xcom_arg.py
> index ab3a5a82b2..b85a14d5a7 100644
> --- a/task-sdk/src/airflow/sdk/definitions/xcom_arg.py
> +++ b/task-sdk/src/airflow/sdk/definitions/xcom_arg.py
> @@ -350,11 +350,26 @@ class PlainXComArg(XComArg):
>              )
>          else:
>              # task from a task group
> -            result = ti.xcom_pull(
> -                task_ids=task_id,
> -                key=self.key,
> -                default=NOTSET,
> -            )
> +            from airflow.sdk.definitions.taskgroup import MappedTaskGroup
> +
> +            # Upstream of this normal task is a mapped task group. Hence all
> +            # xcom values for the upstream map_indexes need to be collected to be passed
> +            # to the normal task.
> +            if isinstance(tg, MappedTaskGroup) and ti.map_index == -1:
> +                map_indexes = list(range(tg.get_parse_time_mapped_ti_count()))
> +
> +                result = ti.xcom_pull(
> +                    task_ids=task_id,
> +                    key=self.key,
> +                    default=NOTSET,
> +                    map_indexes=map_indexes
> +                )
> +            else:
> +                result = ti.xcom_pull(
> +                    task_ids=task_id,
> +                    key=self.key,
> +                    default=NOTSET,
> +                )
>          if not isinstance(result, ArgNotSet):
>              return result
>          if self.key == XCOM_RETURN_KEY:
> ```

Yep, it was checked by the common ancestor function in 2.10. I was wondering whether we want to do it the old way or do something similar to what you did.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
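
For anyone following the thread, here is a rough, Airflow-free sketch of the lookup behaviour discussed above. Everything in it is a toy stand-in and an assumption on my part: `XCOMS`, `pull`, and `resolve` are hypothetical names, not the Task SDK API, and the stored values simply reuse the `[4, 6, 8]` example from the quoted comment. It only illustrates why a pull that defaults to `map_index == -1` finds nothing, and how passing every upstream map index collects the full list.

```python
# Hypothetical in-memory stand-in for stored XComs (not Airflow's storage).
# Keys are (task_id, map_index); -1 would mark an unmapped task instance.
XCOMS = {
    ("add_one_and_double.double", 0): 4,
    ("add_one_and_double.double", 1): 6,
    ("add_one_and_double.double", 2): 8,
}

MISSING = object()  # sentinel playing the role of "nothing found"


def pull(task_id, map_indexes=None, caller_map_index=-1):
    """Toy pull: falls back to the caller's own map_index when none are given."""
    if map_indexes is None:
        return XCOMS.get((task_id, caller_map_index), MISSING)
    return [XCOMS[(task_id, i)] for i in map_indexes if (task_id, i) in XCOMS]


def resolve(task_id, upstream_group_is_mapped, caller_map_index, parse_time_count):
    """Toy version of the branch proposed in the quoted diff."""
    if upstream_group_is_mapped and caller_map_index == -1:
        # Normal task downstream of a mapped group: collect every upstream index.
        return pull(task_id, map_indexes=list(range(parse_time_count)))
    # Otherwise keep the existing behaviour and pull with the caller's index.
    return pull(task_id, caller_map_index=caller_map_index)
```

With the toy data, `pull("add_one_and_double.double")` finds nothing because no value is stored under index `-1`, while `resolve("add_one_and_double.double", True, -1, 3)` gathers all three values. The sketch takes `parse_time_count` as a plain argument, so it sidesteps the open question from the thread about expansions that depend on another task's output.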
