Gollum999 opened a new issue, #30689:
URL: https://github.com/apache/airflow/issues/30689

   ### Apache Airflow version
   
   Other Airflow 2 version (please specify below)
   
   ### What happened
   
   If you have an `ExternalTaskSensor` that uses `external_task_group_id` to 
wait on a `TaskGroup`, and if that `TaskGroup` contains any [mapped 
tasks](https://airflow.apache.org/docs/apache-airflow/2.3.0/concepts/dynamic-task-mapping.html),
 the sensor will be stuck waiting forever even after the task group is 
successful.
   
   ### What you think should happen instead
   
   `ExternalTaskSensor` should be able to wait on `TaskGroup`s, regardless of 
whether or not that group contains mapped tasks.
   
   ### How to reproduce
   
   ```
   #!/usr/bin/env python3
   import datetime
   import logging
   
   from airflow.decorators import dag, task
   from airflow.operators.empty import EmptyOperator
   from airflow.sensors.external_task import ExternalTaskSensor
   from airflow.utils.task_group import TaskGroup
   
   
   logger = logging.getLogger(__name__)
   
   
   @dag(
       schedule_interval='@daily',
       start_date=datetime.datetime(2023, 4, 17),
   )
   def task_groups():
       with TaskGroup(group_id='group'):
           EmptyOperator(task_id='operator1') >> EmptyOperator(task_id='operator2')
   
       with TaskGroup(group_id='mapped_tasks'):
           @task
           def get_tasks():
               return [1, 2, 3]
   
           @task
           def process(x):
               print(x)
   
           process.expand(x=get_tasks())
   
       ExternalTaskSensor(
           task_id='wait_for_normal_task_group',
           external_dag_id='task_groups',
           external_task_group_id='group',
           poke_interval=3,
           check_existence=True,
       )
       ExternalTaskSensor(
           task_id='wait_for_mapped_task_group',
           external_dag_id='task_groups',
           external_task_group_id='mapped_tasks',
           poke_interval=3,
           check_existence=True,
       )
   
   
   task_groups()
   ```
   
   ### Operating System
   
   CentOS Stream 8
   
   ### Versions of Apache Airflow Providers
   
   N/A
   
   ### Deployment
   
   Other
   
   ### Deployment details
   
   Standalone
   
   ### Anything else
   
   I think the bug is 
[here](https://github.com/apache/airflow/blob/731ef3d692fc7472e245f39f3e3e42c2360cb769/airflow/sensors/external_task.py#L364):
   ```
           elif self.external_task_group_id:
               external_task_group_task_ids = self.get_external_task_group_task_ids(session)
               count = (
                   self._count_query(TI, session, states, dttm_filter)
                   .filter(TI.task_id.in_(external_task_group_task_ids))
                   .scalar()
               ) / len(external_task_group_task_ids)
   ```
   If the group contains mapped tasks, `external_task_group_task_ids` only 
contains a list of task IDs (not expanded to include mapped task indices), but 
the `count` query counts every mapped task instance. The result is therefore 
larger than the caller expects when it checks `count_allowed == 
len(dttm_filter)`, so `poke` always returns `False`.
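
To make the arithmetic concrete, here is a minimal, Airflow-free sketch of the mismatch for the reproduction DAG above. The helper `normalized_count` and the tuple layout are hypothetical stand-ins for the real query, and the sketch assumes every task instance in each group has already succeeded and that task IDs carry the default group prefix.

```python
# Simplified, Airflow-free model of the count computed in the snippet above.
# Each successful task instance is (task_id, map_index); Airflow uses
# map_index -1 for unmapped tasks and 0..n-1 for expanded ones.

def normalized_count(successful_tis, group_task_ids):
    """Mimic: matching TI rows divided by the number of task ids in the group."""
    matching = [ti for ti in successful_tis if ti[0] in group_task_ids]
    return len(matching) / len(group_task_ids)

# One logical date is being checked, so the caller expects a count of 1.
dttm_filter = ["2023-04-17"]

# 'group': two unmapped tasks, one task instance each.
normal_ids = ["group.operator1", "group.operator2"]
normal_tis = [("group.operator1", -1), ("group.operator2", -1)]

# 'mapped_tasks': get_tasks runs once, process expands into three instances.
mapped_ids = ["mapped_tasks.get_tasks", "mapped_tasks.process"]
mapped_tis = [("mapped_tasks.get_tasks", -1)] + [
    ("mapped_tasks.process", i) for i in range(3)
]

normal_count = normalized_count(normal_tis, normal_ids)   # 2 rows / 2 task ids
mapped_count = normalized_count(mapped_tis, mapped_ids)   # 4 rows / 2 task ids

print(normal_count == len(dttm_filter))  # True: the sensor can succeed
print(mapped_count == len(dttm_filter))  # False: the sensor keeps poking
```

With three mapped instances the mapped group yields 4 / 2 = 2.0 instead of 1, which matches the observed behaviour: the comparison never holds, so the sensor never completes.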
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

