This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-7-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 8754b509bd09f21c8395aa4ded5edaf2e5f2868c
Author: Wei Lee <[email protected]>
AuthorDate: Sat Aug 26 22:28:24 2023 +0800

    Respect "soft_fail" for core async sensors (#33403)
    
    * fix(sensors): ensure that DateTimeSensorAsync, TimeDeltaSensorAsync, 
TimeSensorAsync respect soft_fail
    
    * refactor(sensors): move the soft_fail checking logic from 
DateTimeSensorAsync, TimeDeltaSensorAsync, TimeSensorAsync to DateTimeTrigger
    
    * test(triggers/temporal): add test case for DateTimeSensorAsync respects 
soft_fail
    
    * fix(triggers/temporal): use the original error message with skipping 
postfix as message for AirflowSkipException
    
    * Revert "fix(triggers/temporal): use the original error message with 
skipping postfix as message for AirflowSkipException"
    
    This reverts commit a6d803303bf71a84e9e59e94d9c088e3120bedb5.
    
    * Revert "test(triggers/temporal): add test case for DateTimeSensorAsync 
respects soft_fail"
    
    This reverts commit 50e39e08a415685ace788ae728397a199c21e82b.
    
    * Revert "refactor(sensors): move the soft_fail checking logic from 
DateTimeSensorAsync, TimeDeltaSensorAsync, TimeSensorAsync to DateTimeTrigger"
    
    This reverts commit 985981a269cea68da719d6fd1c60bedd9a7e5225.
    
    * Revert "fix(sensors): ensure that DateTimeSensorAsync, 
TimeDeltaSensorAsync, TimeSensorAsync respect soft_fail"
    
    This reverts commit b2f2662ae1a11ea928aad57acd2892c763c2db25.
    
    * fix(sensors): move core async sensor trigger initialization to __init__ 
if possible
    
    (cherry picked from commit 9ce76e321f9792690a8d93d5ecb4df9bdaf8fac9)
---
 airflow/sensors/date_time.py   |  6 +++++-
 airflow/sensors/time_delta.py  | 10 +++++++++-
 airflow/sensors/time_sensor.py |  3 ++-
 3 files changed, 16 insertions(+), 3 deletions(-)

diff --git a/airflow/sensors/date_time.py b/airflow/sensors/date_time.py
index 1425028870..2ac17ca1b6 100644
--- a/airflow/sensors/date_time.py
+++ b/airflow/sensors/date_time.py
@@ -85,9 +85,13 @@ class DateTimeSensorAsync(DateTimeSensor):
     :param target_time: datetime after which the job succeeds. (templated)
     """
 
+    def __init__(self, **kwargs) -> None:
+        super().__init__(**kwargs)
+        self.trigger = DateTimeTrigger(moment=timezone.parse(self.target_time))
+
     def execute(self, context: Context):
         self.defer(
-            trigger=DateTimeTrigger(moment=timezone.parse(self.target_time)),
+            trigger=self.trigger,
             method_name="execute_complete",
         )
 
diff --git a/airflow/sensors/time_delta.py b/airflow/sensors/time_delta.py
index 1571334757..dfedcd706f 100644
--- a/airflow/sensors/time_delta.py
+++ b/airflow/sensors/time_delta.py
@@ -17,6 +17,7 @@
 # under the License.
 from __future__ import annotations
 
+from airflow.exceptions import AirflowSkipException
 from airflow.sensors.base import BaseSensorOperator
 from airflow.triggers.temporal import DateTimeTrigger
 from airflow.utils import timezone
@@ -64,7 +65,14 @@ class TimeDeltaSensorAsync(TimeDeltaSensor):
     def execute(self, context: Context):
         target_dttm = context["data_interval_end"]
         target_dttm += self.delta
-        self.defer(trigger=DateTimeTrigger(moment=target_dttm), 
method_name="execute_complete")
+        try:
+            trigger = DateTimeTrigger(moment=target_dttm)
+        except (TypeError, ValueError) as e:
+            if self.soft_fail:
+                raise AirflowSkipException("Skipping due to soft_fail is set 
to True.") from e
+            raise
+
+        self.defer(trigger=trigger, method_name="execute_complete")
 
     def execute_complete(self, context, event=None):
         """Execute for when the trigger fires - return immediately."""
diff --git a/airflow/sensors/time_sensor.py b/airflow/sensors/time_sensor.py
index 7f6809851a..c459003090 100644
--- a/airflow/sensors/time_sensor.py
+++ b/airflow/sensors/time_sensor.py
@@ -68,10 +68,11 @@ class TimeSensorAsync(BaseSensorOperator):
         )
 
         self.target_datetime = timezone.convert_to_utc(aware_time)
+        self.trigger = DateTimeTrigger(moment=self.target_datetime)
 
     def execute(self, context: Context):
         self.defer(
-            trigger=DateTimeTrigger(moment=self.target_datetime),
+            trigger=self.trigger,
             method_name="execute_complete",
         )
 

Reply via email to