This is an automated email from the ASF dual-hosted git repository.
husseinawala pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 14eb1d3116 Fix ExternalTaskSensor when there is not task group TIs for
the current execution date (#32009)
14eb1d3116 is described below
commit 14eb1d3116ecef15be7be9a8f9d08757e74f981c
Author: Hussein Awala <[email protected]>
AuthorDate: Wed Jun 21 11:55:45 2023 +0200
Fix ExternalTaskSensor when there is not task group TIs for the current
execution date (#32009)
* Add a check on none TIs for the current execution date
Signed-off-by: Hussein Awala <[email protected]>
* replace inline if-else by old one
Signed-off-by: Hussein Awala <[email protected]>
---------
Signed-off-by: Hussein Awala <[email protected]>
---
airflow/sensors/external_task.py | 13 ++++++++-----
tests/sensors/test_external_task_sensor.py | 20 ++++++++++++++++++++
2 files changed, 28 insertions(+), 5 deletions(-)
diff --git a/airflow/sensors/external_task.py b/airflow/sensors/external_task.py
index 959ebe5131..69ba41ef09 100644
--- a/airflow/sensors/external_task.py
+++ b/airflow/sensors/external_task.py
@@ -366,11 +366,14 @@ class ExternalTaskSensor(BaseSensorOperator):
) / len(self.external_task_ids)
elif self.external_task_group_id:
external_task_group_task_ids =
self.get_external_task_group_task_ids(session, dttm_filter)
- count = (
- self._count_query(TI, session, states, dttm_filter)
- .filter(tuple_in_condition((TI.task_id, TI.map_index),
external_task_group_task_ids))
- .scalar()
- ) / len(external_task_group_task_ids)
+ if not external_task_group_task_ids:
+ count = 0
+ else:
+ count = (
+ self._count_query(TI, session, states, dttm_filter)
+ .filter(tuple_in_condition((TI.task_id, TI.map_index),
external_task_group_task_ids))
+ .scalar()
+ ) / len(external_task_group_task_ids)
else:
count = self._count_query(DR, session, states,
dttm_filter).scalar()
return count
diff --git a/tests/sensors/test_external_task_sensor.py
b/tests/sensors/test_external_task_sensor.py
index 616c9c5cab..a5259084b1 100644
--- a/tests/sensors/test_external_task_sensor.py
+++ b/tests/sensors/test_external_task_sensor.py
@@ -808,6 +808,26 @@ exit 0
):
op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE,
ignore_ti_state=True)
+ def test_external_task_group_when_there_is_no_TIs(self):
+ """Test that the sensor does not fail when there are no TIs to
check."""
+ self.add_time_sensor()
+ self.add_dummy_task_group_with_dynamic_tasks(State.FAILED)
+ op = ExternalTaskSensor(
+ task_id="test_external_task_sensor_check",
+ external_dag_id=TEST_DAG_ID,
+ external_task_group_id=TEST_TASK_GROUP_ID,
+ failed_states=[State.FAILED],
+ dag=self.dag,
+ poke_interval=1,
+ timeout=3,
+ )
+ with pytest.raises(AirflowSensorTimeout):
+ op.run(
+ start_date=DEFAULT_DATE + timedelta(hours=1),
+ end_date=DEFAULT_DATE + timedelta(hours=1),
+ ignore_ti_state=True,
+ )
+
def test_external_task_sensor_check_zipped_dag_existence(dag_zip_maker):
with dag_zip_maker("test_external_task_sensor_check_existense.py") as
dagbag: