This is an automated email from the ASF dual-hosted git repository. ephraimanierobi pushed a commit to branch v2-6-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit c00001508315999f1b0f9e8077daafc270ff0f4c Author: Hussein Awala <[email protected]> AuthorDate: Wed Jun 21 11:55:45 2023 +0200 Fix ExternalTaskSensor when there is not task group TIs for the current execution date (#32009) * Add a check on none TIs for the current execution date Signed-off-by: Hussein Awala <[email protected]> * replace inline if-else by old one Signed-off-by: Hussein Awala <[email protected]> --------- Signed-off-by: Hussein Awala <[email protected]> (cherry picked from commit 14eb1d3116ecef15be7be9a8f9d08757e74f981c) --- airflow/sensors/external_task.py | 13 ++++++++----- tests/sensors/test_external_task_sensor.py | 20 ++++++++++++++++++++ 2 files changed, 28 insertions(+), 5 deletions(-) diff --git a/airflow/sensors/external_task.py b/airflow/sensors/external_task.py index 959ebe5131..69ba41ef09 100644 --- a/airflow/sensors/external_task.py +++ b/airflow/sensors/external_task.py @@ -366,11 +366,14 @@ class ExternalTaskSensor(BaseSensorOperator): ) / len(self.external_task_ids) elif self.external_task_group_id: external_task_group_task_ids = self.get_external_task_group_task_ids(session, dttm_filter) - count = ( - self._count_query(TI, session, states, dttm_filter) - .filter(tuple_in_condition((TI.task_id, TI.map_index), external_task_group_task_ids)) - .scalar() - ) / len(external_task_group_task_ids) + if not external_task_group_task_ids: + count = 0 + else: + count = ( + self._count_query(TI, session, states, dttm_filter) + .filter(tuple_in_condition((TI.task_id, TI.map_index), external_task_group_task_ids)) + .scalar() + ) / len(external_task_group_task_ids) else: count = self._count_query(DR, session, states, dttm_filter).scalar() return count diff --git a/tests/sensors/test_external_task_sensor.py b/tests/sensors/test_external_task_sensor.py index 966c3a2139..7c62357e9e 100644 --- a/tests/sensors/test_external_task_sensor.py +++ b/tests/sensors/test_external_task_sensor.py @@ -808,6 +808,26 @@ exit 0 ): op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True) + def test_external_task_group_when_there_is_no_TIs(self): + """Test that the sensor does not fail when there are no TIs to check.""" + self.add_time_sensor() + self.add_dummy_task_group_with_dynamic_tasks(State.FAILED) + op = ExternalTaskSensor( + task_id="test_external_task_sensor_check", + external_dag_id=TEST_DAG_ID, + external_task_group_id=TEST_TASK_GROUP_ID, + failed_states=[State.FAILED], + dag=self.dag, + poke_interval=1, + timeout=3, + ) + with pytest.raises(AirflowSensorTimeout): + op.run( + start_date=DEFAULT_DATE + timedelta(hours=1), + end_date=DEFAULT_DATE + timedelta(hours=1), + ignore_ti_state=True, + ) + def test_external_task_sensor_check_zipped_dag_existence(dag_zip_maker): with dag_zip_maker("test_external_task_sensor_check_existense.py") as dagbag:
