Gollum999 opened a new issue, #30689: URL: https://github.com/apache/airflow/issues/30689
### Apache Airflow version

Other Airflow 2 version (please specify below)

### What happened

If you have an `ExternalTaskSensor` that uses `external_task_group_id` to wait on a `TaskGroup`, and that `TaskGroup` contains any [mapped tasks](https://airflow.apache.org/docs/apache-airflow/2.3.0/concepts/dynamic-task-mapping.html), the sensor stays stuck waiting forever, even after the task group has succeeded.

### What you think should happen instead

`ExternalTaskSensor` should be able to wait on `TaskGroup`s regardless of whether the group contains mapped tasks.

### How to reproduce

```python
#!/usr/bin/env python3
import datetime
import logging

from airflow.decorators import dag, task
from airflow.operators.empty import EmptyOperator
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.task_group import TaskGroup

logger = logging.getLogger(__name__)


@dag(
    schedule_interval='@daily',
    start_date=datetime.datetime(2023, 4, 17),
)
def task_groups():
    # A plain TaskGroup: the sensor waiting on it succeeds.
    with TaskGroup(group_id='group'):
        EmptyOperator(task_id='operator1') >> EmptyOperator(task_id='operator2')

    # A TaskGroup containing a mapped task: the sensor waiting on it never succeeds.
    with TaskGroup(group_id='mapped_tasks'):
        @task
        def get_tasks():
            return [1, 2, 3]

        @task
        def process(x):
            print(x)

        process.expand(x=get_tasks())

    ExternalTaskSensor(
        task_id='wait_for_normal_task_group',
        external_dag_id='task_groups',
        external_task_group_id='group',
        poke_interval=3,
        check_existence=True,
    )

    ExternalTaskSensor(
        task_id='wait_for_mapped_task_group',
        external_dag_id='task_groups',
        external_task_group_id='mapped_tasks',
        poke_interval=3,
        check_existence=True,
    )


task_groups()
```

### Operating System

CentOS Stream 8

### Versions of Apache Airflow Providers

N/A

### Deployment

Other

### Deployment details

Standalone

### Anything else

I think the bug is [here](https://github.com/apache/airflow/blob/731ef3d692fc7472e245f39f3e3e42c2360cb769/airflow/sensors/external_task.py#L364):

```python
elif self.external_task_group_id:
    external_task_group_task_ids = self.get_external_task_group_task_ids(session)
    count = (
        self._count_query(TI, session, states, dttm_filter)
        .filter(TI.task_id.in_(external_task_group_task_ids))
        .scalar()
    ) / len(external_task_group_task_ids)
```

If the group contains mapped tasks, `external_task_group_task_ids` contains only the task IDs (not expanded to one entry per mapped task index), but `count` includes every mapped task instance. The result is therefore larger than the calling function expects when it checks `count_allowed == len(dttm_filter)`, so `poke` always returns `False`.

### Are you willing to submit PR?

- [ ] Yes I am willing to submit a PR!

### Code of Conduct

- [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
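To make the counting mismatch concrete, here is a minimal, hypothetical model in plain Python (it does not import Airflow). Task instances are simulated as `(task_id, map_index, state)` tuples for one run of the reproduction DAG. `sensor_style_count` mirrors the quoted query's division by the number of task IDs; `runs_with_group_done` is a hypothetical alternative check, not Airflow's API or its actual fix, that counts runs in which every task instance of every group task is in an allowed state, so mapped expansion cannot inflate the result.

```python
from typing import Dict, List, Set, Tuple

# Hypothetical, simplified model of task instances (TIs): (task_id, map_index, state).
# Airflow uses map_index -1 for unmapped tasks and 0..n-1 for each expanded
# instance of a mapped task. Task IDs carry the TaskGroup prefix (e.g. "group.").
TaskInstance = Tuple[str, int, str]
Runs = Dict[str, List[TaskInstance]]  # logical date -> TIs in that DAG run


def sensor_style_count(runs: Runs, group_task_ids: List[str], states: Set[str]) -> float:
    """Mimic the quoted query: count matching TIs across runs, divide by the number of task IDs."""
    matching = sum(
        1
        for tis in runs.values()
        for task_id, _map_index, state in tis
        if task_id in group_task_ids and state in states
    )
    return matching / len(group_task_ids)


def runs_with_group_done(runs: Runs, group_task_ids: List[str], states: Set[str]) -> int:
    """Hypothetical alternative: count runs where every TI of every group task is in `states`."""
    done = 0
    for tis in runs.values():
        group_tis = [ti for ti in tis if ti[0] in group_task_ids]
        seen = {task_id for task_id, _, _ in group_tis}
        if seen == set(group_task_ids) and all(state in states for _, _, state in group_tis):
            done += 1
    return done


# One fully successful run of the reproduction DAG (date and states are illustrative).
NORMAL_IDS = ["group.operator1", "group.operator2"]
MAPPED_IDS = ["mapped_tasks.get_tasks", "mapped_tasks.process"]
NORMAL_RUNS: Runs = {
    "2023-04-17": [("group.operator1", -1, "success"), ("group.operator2", -1, "success")]
}
MAPPED_RUNS: Runs = {
    "2023-04-17": [
        ("mapped_tasks.get_tasks", -1, "success"),
        ("mapped_tasks.process", 0, "success"),
        ("mapped_tasks.process", 1, "success"),
        ("mapped_tasks.process", 2, "success"),
    ]
}
ALLOWED = {"success"}

# With a single logical date, len(dttm_filter) is 1.
print(sensor_style_count(NORMAL_RUNS, NORMAL_IDS, ALLOWED))  # 1.0, equals 1
print(sensor_style_count(MAPPED_RUNS, MAPPED_IDS, ALLOWED))  # 2.0, does not equal 1
print(runs_with_group_done(MAPPED_RUNS, MAPPED_IDS, ALLOWED))  # 1
```

In this model the unmapped group yields 2 / 2 = 1.0 and matches, while the mapped group yields 4 / 2 = 2.0 because `process` expanded into three instances. A per-run check sidesteps dividing by a task-ID count that ignores how many instances each mapped task produced, which can differ from run to run.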
