This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new d91c481ac9 Respect "soft_fail" argument when "poke" is called (#33401)
d91c481ac9 is described below
commit d91c481ac9051316d0e61b02eda5a7e21bb6ac5b
Author: Wei Lee <[email protected]>
AuthorDate: Sat Aug 26 02:11:00 2023 +0800
Respect "soft_fail" argument when "poke" is called (#33401)
* feat(sensors/base): raise AirflowSkipException if soft_fail is set to
True and an exception occurs after running poke()
* test(sensor/base): add test case for respecting soft_fail option when
other kinds of exceptions are raised
---
airflow/sensors/base.py | 7 ++++++-
tests/sensors/test_base.py | 30 +++++++++++++++++++++++++-----
2 files changed, 31 insertions(+), 6 deletions(-)
diff --git a/airflow/sensors/base.py b/airflow/sensors/base.py
index 77094269a1..719bc26a1e 100644
--- a/airflow/sensors/base.py
+++ b/airflow/sensors/base.py
@@ -240,14 +240,19 @@ class BaseSensorOperator(BaseOperator, SkipMixin):
except (
AirflowSensorTimeout,
AirflowTaskTimeout,
- AirflowSkipException,
AirflowFailException,
) as e:
+ if self.soft_fail:
+ raise AirflowSkipException("Skipping due to soft_fail is set to True.") from e
+ raise e
+ except AirflowSkipException as e:
raise e
except Exception as e:
if self.silent_fail:
logging.error("Sensor poke failed: \n %s",
traceback.format_exc())
poke_return = False
+ elif self.soft_fail:
+ raise AirflowSkipException("Skipping due to soft_fail is set to True.") from e
else:
raise e
diff --git a/tests/sensors/test_base.py b/tests/sensors/test_base.py
index 4dff8222e2..2dbd5cc686 100644
--- a/tests/sensors/test_base.py
+++ b/tests/sensors/test_base.py
@@ -25,9 +25,11 @@ import time_machine
from airflow.exceptions import (
AirflowException,
+ AirflowFailException,
AirflowRescheduleException,
AirflowSensorTimeout,
AirflowSkipException,
+ AirflowTaskTimeout,
)
from airflow.executors.debug_executor import DebugExecutor
from airflow.executors.executor_constants import (
@@ -48,9 +50,7 @@ from airflow.operators.empty import EmptyOperator
from airflow.providers.celery.executors.celery_executor import CeleryExecutor
from airflow.providers.celery.executors.celery_kubernetes_executor import
CeleryKubernetesExecutor
from airflow.providers.cncf.kubernetes.executors.kubernetes_executor import
KubernetesExecutor
-from airflow.providers.cncf.kubernetes.executors.local_kubernetes_executor
import (
- LocalKubernetesExecutor,
-)
+from airflow.providers.cncf.kubernetes.executors.local_kubernetes_executor
import LocalKubernetesExecutor
from airflow.sensors.base import BaseSensorOperator, PokeReturnValue,
poke_mode_only
from airflow.ti_deps.deps.ready_to_reschedule import ReadyToRescheduleDep
from airflow.utils import timezone
@@ -176,6 +176,28 @@ class TestBaseSensor:
if ti.task_id == DUMMY_OP:
assert ti.state == State.NONE
+ @pytest.mark.parametrize(
+ "exception_cls",
+ (
+ AirflowSensorTimeout,
+ AirflowTaskTimeout,
+ AirflowFailException,
+ Exception,
+ ),
+ )
+ def test_soft_fail_with_non_skip_exception(self, make_sensor,
exception_cls):
+ sensor, dr = make_sensor(False, soft_fail=True)
+ sensor.poke = Mock(side_effect=[exception_cls(None)])
+
+ self._run(sensor)
+ tis = dr.get_task_instances()
+ assert len(tis) == 2
+ for ti in tis:
+ if ti.task_id == SENSOR_OP:
+ assert ti.state == State.SKIPPED
+ if ti.task_id == DUMMY_OP:
+ assert ti.state == State.NONE
+
def test_soft_fail_with_retries(self, make_sensor):
sensor, dr = make_sensor(
return_value=False, soft_fail=True, retries=1,
retry_delay=timedelta(milliseconds=1)
@@ -518,7 +540,6 @@ class TestBaseSensor:
assert sensor._get_next_poke_interval(started_at, run_duration, 2) ==
sensor.poke_interval
def test_sensor_with_exponential_backoff_on(self):
-
sensor = DummySensor(
task_id=SENSOR_OP, return_value=None, poke_interval=5, timeout=60,
exponential_backoff=True
)
@@ -575,7 +596,6 @@ class TestBaseSensor:
assert intervals[0] == intervals[-1]
def test_sensor_with_exponential_backoff_on_and_max_wait(self):
-
sensor = DummySensor(
task_id=SENSOR_OP,
return_value=None,