This is an automated email from the ASF dual-hosted git repository.
ephraimanierobi pushed a commit to branch v2-7-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 8754b509bd09f21c8395aa4ded5edaf2e5f2868c Author: Wei Lee <[email protected]> AuthorDate: Sat Aug 26 22:28:24 2023 +0800 Respect "soft_fail" for core async sensors (#33403) * fix(sensors): ensure that DateTimeSensorAsync, TimeDeltaSensorAsync, TimeSensorAsync respect soft_fail * refactor(sensors): move the soft_fail checking logic from DateTimeSensorAsync, TimeDeltaSensorAsync, TimeSensorAsync to DateTimeTrigger * test(triggers/temporal): add test case for DateTimeSensorAsync respects soft_fail * fix(triggers/temporal): use the original error message with skipping postfix as message for AirflowSkipException * Revert "fix(triggers/temporal): use the original error message with skipping postfix as message for AirflowSkipException" This reverts commit a6d803303bf71a84e9e59e94d9c088e3120bedb5. * Revert "test(triggers/temporal): add test case for DateTimeSensorAsync respects soft_fail" This reverts commit 50e39e08a415685ace788ae728397a199c21e82b. * Revert "refactor(sensors): move the soft_fail checking logic from DateTimeSensorAsync, TimeDeltaSensorAsync, TimeSensorAsync to DateTimeTrigger" This reverts commit 985981a269cea68da719d6fd1c60bedd9a7e5225. * Revert "fix(sensors): ensure that DateTimeSensorAsync, TimeDeltaSensorAsync, TimeSensorAsync respect soft_fail" This reverts commit b2f2662ae1a11ea928aad57acd2892c763c2db25. * fix(sensors): move core async sensor trigger initialization to __init__ if possible (cherry picked from commit 9ce76e321f9792690a8d93d5ecb4df9bdaf8fac9) --- airflow/sensors/date_time.py | 6 +++++- airflow/sensors/time_delta.py | 10 +++++++++- airflow/sensors/time_sensor.py | 3 ++- 3 files changed, 16 insertions(+), 3 deletions(-) diff --git a/airflow/sensors/date_time.py b/airflow/sensors/date_time.py index 1425028870..2ac17ca1b6 100644 --- a/airflow/sensors/date_time.py +++ b/airflow/sensors/date_time.py @@ -85,9 +85,13 @@ class DateTimeSensorAsync(DateTimeSensor): :param target_time: datetime after which the job succeeds. 
(templated) """ + def __init__(self, **kwargs) -> None: + super().__init__(**kwargs) + self.trigger = DateTimeTrigger(moment=timezone.parse(self.target_time)) + def execute(self, context: Context): self.defer( - trigger=DateTimeTrigger(moment=timezone.parse(self.target_time)), + trigger=self.trigger, method_name="execute_complete", ) diff --git a/airflow/sensors/time_delta.py b/airflow/sensors/time_delta.py index 1571334757..dfedcd706f 100644 --- a/airflow/sensors/time_delta.py +++ b/airflow/sensors/time_delta.py @@ -17,6 +17,7 @@ # under the License. from __future__ import annotations +from airflow.exceptions import AirflowSkipException from airflow.sensors.base import BaseSensorOperator from airflow.triggers.temporal import DateTimeTrigger from airflow.utils import timezone @@ -64,7 +65,14 @@ class TimeDeltaSensorAsync(TimeDeltaSensor): def execute(self, context: Context): target_dttm = context["data_interval_end"] target_dttm += self.delta - self.defer(trigger=DateTimeTrigger(moment=target_dttm), method_name="execute_complete") + try: + trigger = DateTimeTrigger(moment=target_dttm) + except (TypeError, ValueError) as e: + if self.soft_fail: + raise AirflowSkipException("Skipping due to soft_fail is set to True.") from e + raise + + self.defer(trigger=trigger, method_name="execute_complete") def execute_complete(self, context, event=None): """Execute for when the trigger fires - return immediately.""" diff --git a/airflow/sensors/time_sensor.py b/airflow/sensors/time_sensor.py index 7f6809851a..c459003090 100644 --- a/airflow/sensors/time_sensor.py +++ b/airflow/sensors/time_sensor.py @@ -68,10 +68,11 @@ class TimeSensorAsync(BaseSensorOperator): ) self.target_datetime = timezone.convert_to_utc(aware_time) + self.trigger = DateTimeTrigger(moment=self.target_datetime) def execute(self, context: Context): self.defer( - trigger=DateTimeTrigger(moment=self.target_datetime), + trigger=self.trigger, method_name="execute_complete", )
