This is an automated email from the ASF dual-hosted git repository. ephraimanierobi pushed a commit to branch v2-3-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 7b548b0e6942c11288ec4cc30682362aad93d7ba Author: Andrew Gibbs <[email protected]> AuthorDate: Fri Jun 24 14:50:13 2022 +0100 ExternalTaskSensor respects soft_fail if the external task enters a failed_state (#23647) * Respecting soft_fail in ExternalTaskSensor when the upstream tasks are in the failed state (#19754) - Changed behaviour of sensor to as above to respect soft_fail - Added tests of new soft_fail behaviour (#19754) - Added newsfragment and improved sensor docstring (cherry picked from commit 1b345981f6e8e910b3542ec53829e39e6c9b6dba) --- airflow/sensors/external_task.py | 29 ++++++++++++++- newsfragments/23647.bugfix.rst | 1 + tests/sensors/test_external_task_sensor.py | 57 +++++++++++++++++++++++++----- 3 files changed, 78 insertions(+), 9 deletions(-) diff --git a/airflow/sensors/external_task.py b/airflow/sensors/external_task.py index 30c27c7214..bd66c8da29 100644 --- a/airflow/sensors/external_task.py +++ b/airflow/sensors/external_task.py @@ -22,7 +22,7 @@ from typing import TYPE_CHECKING, Any, Callable, Collection, FrozenSet, Iterable from sqlalchemy import func -from airflow.exceptions import AirflowException +from airflow.exceptions import AirflowException, AirflowSkipException from airflow.models.baseoperator import BaseOperatorLink from airflow.models.dag import DagModel from airflow.models.dagbag import DagBag @@ -55,6 +55,24 @@ class ExternalTaskSensor(BaseSensorOperator): Waits for a different DAG or a task in a different DAG to complete for a specific logical date. + By default the ExternalTaskSensor will wait for the external task to + succeed, at which point it will also succeed. However, by default it will + *not* fail if the external task fails, but will continue to check the status + until the sensor times out (thus giving you time to retry the external task + without also having to clear the sensor). + + It is possible to alter the default behavior by setting states which + cause the sensor to fail, e.g. 
by setting ``allowed_states=[State.FAILED]`` + and ``failed_states=[State.SUCCESS]`` you will flip the behaviour to get a + sensor which goes green when the external task *fails* and immediately goes + red if the external task *succeeds*! + + Note that ``soft_fail`` is respected when examining the failed_states. Thus + if the external task enters a failed state and ``soft_fail == True`` the + sensor will *skip* rather than fail. As a result, setting ``soft_fail=True`` + and ``failed_states=[State.SKIPPED]`` will result in the sensor skipping if + the external task skips. + :param external_dag_id: The dag_id that contains the task you want to wait for :param external_task_id: The task_id that contains the task you want to @@ -182,11 +200,20 @@ class ExternalTaskSensor(BaseSensorOperator): if count_failed == len(dttm_filter): if self.external_task_ids: + if self.soft_fail: + raise AirflowSkipException( + f'Some of the external tasks {self.external_task_ids} ' + f'in DAG {self.external_dag_id} failed. Skipping due to soft_fail.' + ) raise AirflowException( f'Some of the external tasks {self.external_task_ids} ' f'in DAG {self.external_dag_id} failed.' ) else: + if self.soft_fail: + raise AirflowSkipException( + f'The external DAG {self.external_dag_id} failed. Skipping due to soft_fail.' + ) raise AirflowException(f'The external DAG {self.external_dag_id} failed.') return count_allowed == len(dttm_filter) diff --git a/newsfragments/23647.bugfix.rst b/newsfragments/23647.bugfix.rst new file mode 100644 index 0000000000..d12c1d7046 --- /dev/null +++ b/newsfragments/23647.bugfix.rst @@ -0,0 +1 @@ +``ExternalTaskSensor`` now supports the ``soft_fail`` flag to skip if external task or DAG enters a failed state.
diff --git a/tests/sensors/test_external_task_sensor.py b/tests/sensors/test_external_task_sensor.py index 8725d76081..8a2a571160 100644 --- a/tests/sensors/test_external_task_sensor.py +++ b/tests/sensors/test_external_task_sensor.py @@ -134,6 +134,28 @@ class TestExternalTaskSensor(unittest.TestCase): "unit_test_dag failed." ) + def test_external_task_sensor_soft_fail_failed_states_as_skipped(self, session=None): + self.test_time_sensor() + op = ExternalTaskSensor( + task_id='test_external_task_sensor_check', + external_dag_id=TEST_DAG_ID, + external_task_id=TEST_TASK_ID, + allowed_states=[State.FAILED], + failed_states=[State.SUCCESS], + soft_fail=True, + dag=self.dag, + ) + + # when + op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True) + + # then + session = settings.Session() + TI = TaskInstance + task_instances: list[TI] = session.query(TI).filter(TI.task_id == op.task_id).all() + assert len(task_instances) == 1, "Unexpected number of task instances" + assert task_instances[0].state == State.SKIPPED, "Unexpected external task state" + def test_external_task_sensor_external_task_id_param(self): """Test external_task_ids is set properly when external_task_id is passed as a template""" self.test_time_sensor() @@ -141,10 +163,7 @@ class TestExternalTaskSensor(unittest.TestCase): task_id='test_external_task_sensor_check', external_dag_id='{{ params.dag_id }}', external_task_id='{{ params.task_id }}', - params={ - 'dag_id': TEST_DAG_ID, - 'task_id': TEST_TASK_ID, - }, + params={'dag_id': TEST_DAG_ID, 'task_id': TEST_TASK_ID}, dag=self.dag, ) @@ -162,10 +181,7 @@ class TestExternalTaskSensor(unittest.TestCase): task_id='test_external_task_sensor_check', external_dag_id='{{ params.dag_id }}', external_task_ids=['{{ params.task_id }}'], - params={ - 'dag_id': TEST_DAG_ID, - 'task_id': TEST_TASK_ID, - }, + params={'dag_id': TEST_DAG_ID, 'task_id': TEST_TASK_ID}, dag=self.dag, ) @@ -214,6 +230,31 @@ class 
TestExternalTaskSensor(unittest.TestCase): ) op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True) + def test_external_dag_sensor_soft_fail_as_skipped(self): + other_dag = DAG('other_dag', default_args=self.args, end_date=DEFAULT_DATE, schedule_interval='@once') + other_dag.create_dagrun( + run_id='test', start_date=DEFAULT_DATE, execution_date=DEFAULT_DATE, state=State.SUCCESS + ) + op = ExternalTaskSensor( + task_id='test_external_dag_sensor_check', + external_dag_id='other_dag', + external_task_id=None, + allowed_states=[State.FAILED], + failed_states=[State.SUCCESS], + soft_fail=True, + dag=self.dag, + ) + + # when + op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True) + + # then + session = settings.Session() + TI = TaskInstance + task_instances: list[TI] = session.query(TI).filter(TI.task_id == op.task_id).all() + assert len(task_instances) == 1, "Unexpected number of task instances" + assert task_instances[0].state == State.SKIPPED, "Unexpected external task state" + def test_external_task_sensor_fn_multiple_execution_dates(self): bash_command_code = """ {% set s=logical_date.time().second %}
