This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new d91c481ac9 Respect "soft_fail" argument when "poke" is called (#33401)
d91c481ac9 is described below

commit d91c481ac9051316d0e61b02eda5a7e21bb6ac5b
Author: Wei Lee <[email protected]>
AuthorDate: Sat Aug 26 02:11:00 2023 +0800

    Respect "soft_fail" argument when "poke" is called (#33401)
    
    * feat(sensors/base): raise AirflowSkipException if soft_fail is set to
      True and an exception occurs while running poke()
    
    * test(sensors/base): add a test case for respecting the soft_fail option
      when other kinds of exceptions are raised
---
 airflow/sensors/base.py    |  7 ++++++-
 tests/sensors/test_base.py | 30 +++++++++++++++++++++++++-----
 2 files changed, 31 insertions(+), 6 deletions(-)

diff --git a/airflow/sensors/base.py b/airflow/sensors/base.py
index 77094269a1..719bc26a1e 100644
--- a/airflow/sensors/base.py
+++ b/airflow/sensors/base.py
@@ -240,14 +240,19 @@ class BaseSensorOperator(BaseOperator, SkipMixin):
             except (
                 AirflowSensorTimeout,
                 AirflowTaskTimeout,
-                AirflowSkipException,
                 AirflowFailException,
             ) as e:
+                if self.soft_fail:
+                    raise AirflowSkipException("Skipping due to soft_fail is 
set to True.") from e
+                raise e
+            except AirflowSkipException as e:
                 raise e
             except Exception as e:
                 if self.silent_fail:
                     logging.error("Sensor poke failed: \n %s", 
traceback.format_exc())
                     poke_return = False
+                elif self.soft_fail:
+                    raise AirflowSkipException("Skipping due to soft_fail is 
set to True.") from e
                 else:
                     raise e
 
diff --git a/tests/sensors/test_base.py b/tests/sensors/test_base.py
index 4dff8222e2..2dbd5cc686 100644
--- a/tests/sensors/test_base.py
+++ b/tests/sensors/test_base.py
@@ -25,9 +25,11 @@ import time_machine
 
 from airflow.exceptions import (
     AirflowException,
+    AirflowFailException,
     AirflowRescheduleException,
     AirflowSensorTimeout,
     AirflowSkipException,
+    AirflowTaskTimeout,
 )
 from airflow.executors.debug_executor import DebugExecutor
 from airflow.executors.executor_constants import (
@@ -48,9 +50,7 @@ from airflow.operators.empty import EmptyOperator
 from airflow.providers.celery.executors.celery_executor import CeleryExecutor
 from airflow.providers.celery.executors.celery_kubernetes_executor import 
CeleryKubernetesExecutor
 from airflow.providers.cncf.kubernetes.executors.kubernetes_executor import 
KubernetesExecutor
-from airflow.providers.cncf.kubernetes.executors.local_kubernetes_executor 
import (
-    LocalKubernetesExecutor,
-)
+from airflow.providers.cncf.kubernetes.executors.local_kubernetes_executor 
import LocalKubernetesExecutor
 from airflow.sensors.base import BaseSensorOperator, PokeReturnValue, 
poke_mode_only
 from airflow.ti_deps.deps.ready_to_reschedule import ReadyToRescheduleDep
 from airflow.utils import timezone
@@ -176,6 +176,28 @@ class TestBaseSensor:
             if ti.task_id == DUMMY_OP:
                 assert ti.state == State.NONE
 
+    @pytest.mark.parametrize(
+        "exception_cls",
+        (
+            AirflowSensorTimeout,
+            AirflowTaskTimeout,
+            AirflowFailException,
+            Exception,
+        ),
+    )
+    def test_soft_fail_with_non_skip_exception(self, make_sensor, 
exception_cls):
+        sensor, dr = make_sensor(False, soft_fail=True)
+        sensor.poke = Mock(side_effect=[exception_cls(None)])
+
+        self._run(sensor)
+        tis = dr.get_task_instances()
+        assert len(tis) == 2
+        for ti in tis:
+            if ti.task_id == SENSOR_OP:
+                assert ti.state == State.SKIPPED
+            if ti.task_id == DUMMY_OP:
+                assert ti.state == State.NONE
+
     def test_soft_fail_with_retries(self, make_sensor):
         sensor, dr = make_sensor(
             return_value=False, soft_fail=True, retries=1, 
retry_delay=timedelta(milliseconds=1)
@@ -518,7 +540,6 @@ class TestBaseSensor:
         assert sensor._get_next_poke_interval(started_at, run_duration, 2) == 
sensor.poke_interval
 
     def test_sensor_with_exponential_backoff_on(self):
-
         sensor = DummySensor(
             task_id=SENSOR_OP, return_value=None, poke_interval=5, timeout=60, 
exponential_backoff=True
         )
@@ -575,7 +596,6 @@ class TestBaseSensor:
                 assert intervals[0] == intervals[-1]
 
     def test_sensor_with_exponential_backoff_on_and_max_wait(self):
-
         sensor = DummySensor(
             task_id=SENSOR_OP,
             return_value=None,

Reply via email to