hussein-awala commented on issue #34023:
URL: https://github.com/apache/airflow/issues/34023#issuecomment-1705761692

   @ephraimbuddy the problem is in the method `get_relevant_upstream_map_indexes`. When we try to get the relevant map index for the upstream task `deliver_record`, we call this method:
   ```python
   ti.get_relevant_upstream_map_indexes(
       upstream=ti.task.dag.task_dict[upstream_id],
       ti_count=expanded_ti_count,
       session=session,
   )
   ```
   Concretely, it is called with these values:
   ```python
   ti.get_relevant_upstream_map_indexes(
       upstream=ti.task.dag.task_dict["deliver_records.deliver_record"],
       ti_count=3,  # 3 TIs because the task is in a mapped task group
       session=session,
   )
   ```
   This method does not take the mapped task group into account, so it returns -1 instead of the map index of the TI being checked.
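The sketch below illustrates the expected behavior using a hypothetical, simplified model. The `TI` class, `expand_group` and `expected_upstream` are for illustration only and are not Airflow's API, and `deliver_records.confirm_record` is a made-up downstream task in the same group. Expanding a mapped task group three times produces three TIs per task. The TI with map index `i` should then depend only on the upstream TI with the same map index, rather than on the -1 reported above.

```python
from dataclasses import dataclass


# Hypothetical, simplified model for illustration only; this is NOT Airflow's API.
@dataclass(frozen=True)
class TI:
    task_id: str
    map_index: int


def expand_group(task_ids, count):
    """Expanding a mapped task group `count` times yields one TI per task per index."""
    return [TI(task_id, i) for i in range(count) for task_id in task_ids]


def expected_upstream(ti, upstream_id):
    """Inside one mapped task group expansion, TI i depends only on upstream TI i."""
    return TI(upstream_id, ti.map_index)


# "deliver_records.confirm_record" is a hypothetical downstream task in the same group.
tis = expand_group(["deliver_records.deliver_record", "deliver_records.confirm_record"], 3)
downstream = [t for t in tis if t.task_id == "deliver_records.confirm_record"]
for ti in downstream:
    up = expected_upstream(ti, "deliver_records.deliver_record")
    print(ti.map_index, "->", up.map_index)
# prints "0 -> 0", then "1 -> 1", then "2 -> 2"
```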
   
   So we have two options:
   1. Update `get_relevant_upstream_map_indexes` so that it handles mapped task groups.
   2. Detect that the two tasks are in the same mapped task group without calling this method; in that case, we can return the map index of the checked TI.
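Below is a minimal sketch of option 2 under a simplified task/group model. `Group`, `Task`, `closest_mapped_group` and `relevant_upstream_map_index` are hypothetical names, not Airflow's API. The sketch walks up each task's group hierarchy to find the nearest mapped task group. If both tasks share that group, it reuses the checked TI's map index. Otherwise it returns `None` so the caller can fall back to the existing `get_relevant_upstream_map_indexes` logic.

```python
from dataclasses import dataclass
from typing import Optional


# Hypothetical, simplified model for illustration only; this is NOT Airflow's API.
@dataclass
class Group:
    group_id: str
    is_mapped: bool = False
    parent: Optional["Group"] = None


@dataclass
class Task:
    task_id: str
    group: Optional[Group] = None


def closest_mapped_group(task: Task) -> Optional[Group]:
    """Walk up the group hierarchy and return the nearest mapped ancestor group."""
    group = task.group
    while group is not None:
        if group.is_mapped:
            return group
        group = group.parent
    return None


def relevant_upstream_map_index(ti_map_index: int, task: Task, upstream: Task) -> Optional[int]:
    """If both tasks share the same closest mapped task group, reuse the checked
    TI's map index; otherwise return None so the caller falls back to existing logic."""
    group = closest_mapped_group(task)
    if group is not None and group is closest_mapped_group(upstream):
        return ti_map_index
    return None


deliver_records = Group("deliver_records", is_mapped=True)
upstream = Task("deliver_records.deliver_record", deliver_records)
checked = Task("deliver_records.confirm_record", deliver_records)  # hypothetical task
outside = Task("summarize")  # hypothetical task outside any group
print(relevant_upstream_map_index(2, checked, upstream))  # 2
print(relevant_upstream_map_index(2, checked, outside))  # None
```

The sketch compares the *closest* mapped group rather than any shared ancestor. If the checked task sits inside a nested mapped group, its map index may not correspond one-to-one with the upstream's, so the sketch falls back in that case.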
   
   I'm already working on refactoring some queries in this class, including the one that has this bug:
   ```python
   task_id_counts = session.execute(
       select(TaskInstance.task_id, func.count(TaskInstance.task_id))
       .where(TaskInstance.dag_id == ti.dag_id, TaskInstance.run_id == ti.run_id)
       .where(or_(*_iter_upstream_conditions(relevant_tasks=upstream_tasks)))
       .group_by(TaskInstance.task_id)
   ).all()
   ```
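To show what this grouped count returns, here is a standard-library sketch using `sqlite3`. It uses a hypothetical, trimmed-down `task_instance` table (the real table has many more columns), and the DAG/run ids and the `confirm_record`/`summarize` task ids are made up. It also replaces `_iter_upstream_conditions` with a plain `task_id IN (...)` filter. That is a simplification, since the real conditions may also constrain `map_index`. For an upstream task inside a mapped task group expanded three times, the count for that task id is 3.

```python
import sqlite3

# Hypothetical, trimmed-down table; Airflow's real task_instance table has more columns.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE task_instance (dag_id TEXT, run_id TEXT, task_id TEXT, map_index INTEGER)"
)
# Two tasks in a mapped task group expanded 3 times, plus one unmapped task (map_index -1).
rows = [("my_dag", "run_1", "deliver_records.deliver_record", i) for i in range(3)]
rows += [("my_dag", "run_1", "deliver_records.confirm_record", i) for i in range(3)]
rows.append(("my_dag", "run_1", "summarize", -1))
conn.executemany("INSERT INTO task_instance VALUES (?, ?, ?, ?)", rows)


def count_upstream_tis(conn, dag_id, run_id, upstream_ids):
    """Count TIs per upstream task id within one DAG run (GROUP BY task_id)."""
    placeholders = ", ".join("?" for _ in upstream_ids)
    query = (
        "SELECT task_id, COUNT(task_id) FROM task_instance "
        "WHERE dag_id = ? AND run_id = ? "
        f"AND task_id IN ({placeholders}) "
        "GROUP BY task_id"
    )
    return dict(conn.execute(query, (dag_id, run_id, *upstream_ids)).fetchall())


print(count_upstream_tis(conn, "my_dag", "run_1", ["deliver_records.deliver_record"]))
# {'deliver_records.deliver_record': 3}
```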