hussein-awala commented on issue #34023:
URL: https://github.com/apache/airflow/issues/34023#issuecomment-1705761692
@ephraimbuddy the problem is in the method
`get_relevant_upstream_map_indexes`. When we try to get the relevant map
index for the upstream `deliver_record`, we call this method:
```python
ti.get_relevant_upstream_map_indexes(
    upstream=ti.task.dag.task_dict[upstream_id],
    ti_count=expanded_ti_count,
    session=session,
)
```
Concretely, the values are:
```python
ti.get_relevant_upstream_map_indexes(
    upstream="deliver_records.deliver_record",
    ti_count=3,  # we have 3 TIs because it's a mapped task group
    session=session,
)
```
However, this method doesn't take the mapped task group into account, so it
returns -1 instead of the same map index as the checked TI.
So we have two options:
1. Update `get_relevant_upstream_map_indexes` so that it handles mapped
task groups.
2. Detect that the two tasks are in the same mapped task group without
calling this method; in that case we can return the map index of the checked TI.
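Option 2 can be sketched in isolation from Airflow. This is a minimal, hypothetical model and not Airflow's API: `Task`, `TaskInstance`, `mapped_group_id`, and `relevant_upstream_map_index` are names invented for illustration, and the sketch assumes each task records the ID of its enclosing mapped task group (or `None` if it has none).

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class Task:
    task_id: str
    # Hypothetical field: ID of the enclosing mapped task group, or None.
    mapped_group_id: Optional[str] = None


@dataclass(frozen=True)
class TaskInstance:
    task: Task
    map_index: int  # -1 means "not mapped" in this sketch


def relevant_upstream_map_index(ti: TaskInstance, upstream: Task) -> Optional[int]:
    """Return the upstream map index that `ti` depends on.

    If both tasks sit in the same mapped task group, the upstream TI with
    the same map index is the relevant one. Otherwise return None, which
    in this sketch means "no single index; depend on all upstream TIs".
    """
    group = ti.task.mapped_group_id
    if group is not None and group == upstream.mapped_group_id:
        return ti.map_index
    return None


# "downstream_task" and "start" are made-up task names for the example.
deliver = Task("deliver_records.deliver_record", mapped_group_id="deliver_records")
downstream = Task("deliver_records.downstream_task", mapped_group_id="deliver_records")
outside = Task("start")

same_group_index = relevant_upstream_map_index(TaskInstance(downstream, 2), deliver)
outside_index = relevant_upstream_map_index(TaskInstance(downstream, 2), outside)
```

Here `same_group_index` is the checked TI's own map index (2), while `outside_index` is `None` because `start` is not in the group. A real fix would also need to handle nested mapped task groups, which this sketch ignores.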
I'm already working on refactoring some queries in this class, including the
one that has this bug:
```python
task_id_counts = session.execute(
    select(TaskInstance.task_id, func.count(TaskInstance.task_id))
    .where(TaskInstance.dag_id == ti.dag_id, TaskInstance.run_id == ti.run_id)
    .where(or_(*_iter_upstream_conditions(relevant_tasks=upstream_tasks)))
    .group_by(TaskInstance.task_id)
).all()
```
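To make the shape of that query's result concrete, here is a rough stand-in using the standard-library `sqlite3` module instead of SQLAlchemy. The table layout, the `count_task_instances` helper, and the sample rows are assumptions for illustration only. A plain `task_id IN (...)` filter stands in for the conditions produced by `_iter_upstream_conditions`, whose exact output is not shown here.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE task_instance ("
    "dag_id TEXT, run_id TEXT, task_id TEXT, map_index INTEGER)"
)

# Three expanded TIs for the task inside the mapped task group,
# plus one unmapped task ("start" is a made-up name).
rows = [("dag", "run_1", "deliver_records.deliver_record", i) for i in range(3)]
rows.append(("dag", "run_1", "start", -1))
conn.executemany("INSERT INTO task_instance VALUES (?, ?, ?, ?)", rows)


def count_task_instances(conn, dag_id, run_id, task_ids):
    """Count TIs per task_id for one DAG run, limited to the given task IDs."""
    placeholders = ",".join("?" for _ in task_ids)
    query = (
        "SELECT task_id, COUNT(task_id) FROM task_instance "
        "WHERE dag_id = ? AND run_id = ? "
        f"AND task_id IN ({placeholders}) "
        "GROUP BY task_id"
    )
    return dict(conn.execute(query, (dag_id, run_id, *task_ids)).fetchall())


task_id_counts = count_task_instances(
    conn, "dag", "run_1", ["deliver_records.deliver_record"]
)
```

With the sample rows above, `task_id_counts` maps `deliver_records.deliver_record` to 3, one per expanded TI in the mapped task group.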