This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch v3-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v3-1-test by this push:
new 6254fa6a8b7 [v3-1-test] Fix deferrable sensors not respecting
soft_fail on timeout (#61132) (#61421)
6254fa6a8b7 is described below
commit 6254fa6a8b77957e0123a905dd6d90bcac1dd659
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Wed Feb 18 21:32:34 2026 +0100
[v3-1-test] Fix deferrable sensors not respecting soft_fail on timeout
(#61132) (#61421)
When a deferrable sensor with soft_fail=True times out, the task
fails with AirflowSensorTimeout instead of being marked as SKIPPED.
This is a regression from Airflow 2.x behavior.
The issue was in resume_execution() where TaskDeferralTimeout was
converted to AirflowSensorTimeout before checking soft_fail. This
fix uses nested exception handling to check soft_fail and never_fail
before the conversion, ensuring timeouts are properly skipped.
closes: #61130
(cherry picked from commit cec8ba68f19cb21293979a6f042909d5ccbaac11)
Co-authored-by: Nathan Hadfield <[email protected]>
Co-authored-by: Claude Sonnet 4.5 <[email protected]>
---
task-sdk/src/airflow/sdk/bases/sensor.py | 13 +++++--
task-sdk/tests/task_sdk/bases/test_sensor.py | 57 ++++++++++++++++++++++++++++
2 files changed, 66 insertions(+), 4 deletions(-)
diff --git a/task-sdk/src/airflow/sdk/bases/sensor.py
b/task-sdk/src/airflow/sdk/bases/sensor.py
index 0ca447189a9..66a97c10f0e 100644
--- a/task-sdk/src/airflow/sdk/bases/sensor.py
+++ b/task-sdk/src/airflow/sdk/bases/sensor.py
@@ -251,13 +251,18 @@ class BaseSensorOperator(BaseOperator):
return xcom_value
def resume_execution(self, next_method: str, next_kwargs: dict[str, Any] |
None, context: Context):
+ # Use nested try/except to convert TaskDeferralTimeout to
AirflowSensorTimeout
+ # while still allowing soft_fail/never_fail to handle both exception
types.
try:
- return super().resume_execution(next_method, next_kwargs, context)
- except TaskDeferralTimeout as e:
- raise AirflowSensorTimeout(*e.args) from e
+ try:
+ return super().resume_execution(next_method, next_kwargs,
context)
+ except TaskDeferralTimeout as e:
+ raise AirflowSensorTimeout(*e.args) from e
except (AirflowException, TaskDeferralError) as e:
if self.soft_fail:
- raise AirflowSkipException(str(e)) from e
+ raise AirflowSkipException("Skipping due to soft_fail is set
to True.") from e
+ if self.never_fail:
+ raise AirflowSkipException("Skipping due to never_fail is set
to True.") from e
raise
def _get_next_poke_interval(
diff --git a/task-sdk/tests/task_sdk/bases/test_sensor.py
b/task-sdk/tests/task_sdk/bases/test_sensor.py
index 8b9cc04a786..b37cdac3b80 100644
--- a/task-sdk/tests/task_sdk/bases/test_sensor.py
+++ b/task-sdk/tests/task_sdk/bases/test_sensor.py
@@ -31,6 +31,7 @@ from airflow.exceptions import (
AirflowSensorTimeout,
AirflowSkipException,
AirflowTaskTimeout,
+ TaskDeferralError,
)
from airflow.models.trigger import TriggerFailureReason
from airflow.providers.standard.operators.empty import EmptyOperator
@@ -677,3 +678,59 @@ class TestAsyncSensor:
async_sensor = DummyAsyncSensor(task_id="dummy_async_sensor",
soft_fail=soft_fail)
with pytest.raises(expected_exception):
async_sensor.resume_execution("execute_complete", None, {})
+
+ @pytest.mark.parametrize(
+ ("soft_fail", "expected_exception"),
+ [
+ (True, AirflowSkipException),
+ (False, AirflowSensorTimeout),
+ ],
+ )
+ def test_timeout_after_resuming_deferred_sensor_with_soft_fail(self,
soft_fail, expected_exception):
+ """Test that deferrable sensors with soft_fail skip on timeout instead
of failing."""
+ async_sensor = DummyAsyncSensor(task_id="dummy_async_sensor",
soft_fail=soft_fail)
+ with pytest.raises(expected_exception):
+ async_sensor.resume_execution(
+ next_method="__fail__",
+ next_kwargs={"error": TriggerFailureReason.TRIGGER_TIMEOUT},
+ context={},
+ )
+
+ def test_timeout_after_resuming_deferred_sensor_with_never_fail(self):
+ """Test that deferrable sensors with never_fail skip on timeout."""
+ async_sensor = DummyAsyncSensor(task_id="dummy_async_sensor",
never_fail=True)
+ with pytest.raises(AirflowSkipException):
+ async_sensor.resume_execution(
+ next_method="__fail__",
+ next_kwargs={"error": TriggerFailureReason.TRIGGER_TIMEOUT},
+ context={},
+ )
+
+ @pytest.mark.parametrize(
+ ("soft_fail", "expected_exception"),
+ [
+ (True, AirflowSkipException),
+ (False, TaskDeferralError),
+ ],
+ )
+ def test_trigger_failure_after_resuming_deferred_sensor_with_soft_fail(
+ self, soft_fail, expected_exception
+ ):
+ """Test that deferrable sensors with soft_fail skip on trigger failure
instead of failing."""
+ async_sensor = DummyAsyncSensor(task_id="dummy_async_sensor",
soft_fail=soft_fail)
+ with pytest.raises(expected_exception):
+ async_sensor.resume_execution(
+ next_method="__fail__",
+ next_kwargs={"error": TriggerFailureReason.TRIGGER_FAILURE},
+ context={},
+ )
+
+ def
test_trigger_failure_after_resuming_deferred_sensor_with_never_fail(self):
+ """Test that deferrable sensors with never_fail skip on trigger
failure."""
+ async_sensor = DummyAsyncSensor(task_id="dummy_async_sensor",
never_fail=True)
+ with pytest.raises(AirflowSkipException):
+ async_sensor.resume_execution(
+ next_method="__fail__",
+ next_kwargs={"error": TriggerFailureReason.TRIGGER_FAILURE},
+ context={},
+ )