tirkarthi commented on code in PR #49996:
URL: https://github.com/apache/airflow/pull/49996#discussion_r2071634966
##########
airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py:
##########
@@ -247,6 +261,26 @@ def ti_run(
)
+def _get_upstream_map_indexes(
+ task: Operator, ti_map_index: int
+) -> Iterator[tuple[str, int | list[int] | None]]:
+ for upstream_task in task.upstream_list:
+ map_indexes: int | list[int] | None
+ if upstream_task.task_group is None:
+ # regular tasks
+ map_indexes = None
+ elif task.task_group == upstream_task.task_group:
+ # tasks in the same mapped task group
+ map_indexes = ti_map_index
+ else:
+ # tasks not in the same mapped task group
+ # the upstream mapped task group should combine the xcom as a list
and return it
+ mapped_ti_count: int =
upstream_task.task_group.get_parse_time_mapped_ti_count()
Review Comment:
`get_parse_time_mapped_ti_count` is present only in `MappedTaskGroup` and
not in `TaskGroup` . I was testing this PR and sample dag like below could
cause the traceback since here `TaskGroup` is passed which works in Airflow 2.
I was also wondering since this is only for parse time count whether there is
similar method for dynamic input like `[1, 2, 3]` returned by another `@task`.
This could be taken as another PR too but just thought of adding it since I
found it during testing.
```python
from datetime import datetime
try:
from airflow.sdk import DAG, task, task_group
except ImportError:
from airflow import DAG
from airflow.decorators import task, task_group
with DAG(
dag_id="gh49714_1",
start_date=datetime(2025, 1, 1),
schedule=None,
):
@task_group
def add_one_and_double():
@task
def add_one(value: int):
return value + 1
@task
def double(value: int):
return value * 2
new_values = add_one.expand(value=[1, 2, 3])
return double.expand(value=new_values)
@task
def consumer(values: list[int]):
print(f"New values are: {list(values)}")
results = add_one_and_double()
consumer(results)
```
```
│
/home/karthikeyan/stuff/python/airflow/airflow-core/src/airflow/api_fastapi/execution_api/routes
│
│ /task_instances.py:278 in _get_upstream_map_indexes
│
│
│
│ 275 │ │ else:
│
│ 276 │ │ │ # tasks not in the same mapped task group
│
│ 277 │ │ │ # the upstream mapped task group should combine the xcom
as a list and retur │
│ ❱ 278 │ │ │ mapped_ti_count: int =
upstream_task.task_group.get_parse_time_mapped_ti_cou │
│ 279 │ │ │ map_indexes = list(range(mapped_ti_count)) if
mapped_ti_count is not None el │
│ 280 │ │
│
│ 281 │ │ yield upstream_task.task_id, map_indexes
│
│
│
│ ╭─────────────────────────────────────────── locals
───────────────────────────────────────────╮ │
│ │ task = <Task(_PythonDecoratedOperator): consumer>
│ │
│ │ ti_map_index = -1
│ │
│ │ upstream_task = MappedOperator(
│ │
│ │ │ operator_class={
│ │
│ │ │ │ 'ui_color': '#ffefeb',
│ │
│ │ │ │ 'task_id': 'add_one_and_double.double',
│ │
│ │ │ │ 'task_type': '_PythonDecoratedOperator',
│ │
│ │ │ │ 'start_from_trigger': False,
│ │
│ │ │ │ 'template_ext': [],
│ │
│ │ │ │ 'template_fields': ['templates_dict', 'op_args',
'op_kwargs'], │ │
│ │ │ │ 'template_fields_renderers': {
│ │
│ │ │ │ │ 'templates_dict': 'json',
│ │
│ │ │ │ │ 'op_args': 'py',
│ │
│ │ │ │ │ 'op_kwargs': 'py'
│ │
│ │ │ │ },
│ │
│ │ │ │ 'ui_fgcolor': '#000',
│ │
│ │ │ │ 'downstream_task_ids': ['consumer'],
│ │
│ │ │ │ 'start_trigger_args': None
│ │
│ │ │ },
│ │
│ │ │ _is_mapped=True,
│ │
│ │ │ expand_input=DictOfListsExpandInput(value={}),
│ │
│ │ │ partial_kwargs={
│ │
│ │ │ │ 'is_setup': False,
│ │
│ │ │ │ 'is_teardown': False,
│ │
│ │ │ │ 'on_failure_fail_dagrun': False,
│ │
│ │ │ │ 'op_args': [],
│ │
│ │ │ │ 'op_kwargs': {}
│ │
│ │ │ },
│ │
│ │ │ task_id='add_one_and_double.double',
│ │
│ │ │ params={},
│ │
│ │ │ deps=frozenset({
│ │
│ │ │ │ <TIDep(Mapped dependencies have succeeded)>,
│ │
│ │ │ │ <TIDep(Previous Dagrun State)>,
│ │
│ │ │ │ <TIDep(Not Previously Skipped)>,
│ │
│ │ │ │ <TIDep(Not In Retry Period)>,
│ │
│ │ │ │ <TIDep(Trigger Rule)>
│ │
│ │ │ }),
│ │
│ │ │ operator_extra_links=[],
│ │
│ │ │ template_ext=[],
│ │
│ │ │ template_fields=['templates_dict', 'op_args',
'op_kwargs'], │ │
│ │ │ template_fields_renderers={
│ │
│ │ │ │ 'templates_dict': 'json',
│ │
│ │ │ │ 'op_args': 'py',
│ │
│ │ │ │ 'op_kwargs': 'py'
│ │
│ │ │ },
│ │
│ │ │ ui_color='#ffefeb',
│ │
│ │ │ ui_fgcolor='#000',
│ │
│ │ │ _is_empty=False,
│ │
│ │ │ _can_skip_downstream=False,
│ │
│ │ │ _is_sensor=False,
│ │
│ │ │
_task_module='airflow.providers.standard.decorators.python', │ │
│ │ │ _task_type='_PythonDecoratedOperator',
│ │
│ │ │ _operator_name='@task',
│ │
│ │ │ start_trigger_args=None,
│ │
│ │ │ start_from_trigger=False,
│ │
│ │ │ _needs_expansion=True,
│ │
│ │ │ dag=<SerializedDAG dag_id='gh49714_1' schedule=None
#tasks=3>, │ │
│ │ │ task_group=<weakproxy at 0x7f26a48cb1f0 to TaskGroup
at 0x7f26a6513590>, │ │
│ │ │ start_date=DateTime(2025, 1, 1, 0, 0, 0,
tzinfo=Timezone('UTC')), │ │
│ │ │ end_date=None,
│ │
│ │ │ upstream_task_ids={'add_one_and_double.add_one'},
│ │
│ │ │ downstream_task_ids={'consumer'},
│ │
│ │ │ _disallow_kwargs_override=False,
│ │
│ │ │ _expand_input_attr='op_kwargs_expand_input'
│ │
│ │ )
│ │
│
╰──────────────────────────────────────────────────────────────────────────────────────────────╯
│
╰──────────────────────────────────────────────────────────────────────────────────────────────────╯
AttributeError: 'TaskGroup' object has no attribute
'get_parse_time_mapped_ti_count'
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]