This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-9-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 967d66af52a74d9a08f6a85c8e9ebae5ccec9229
Author: Jed Cunningham <[email protected]>
AuthorDate: Tue May 21 10:41:05 2024 -0600

    Better typing for BaseOperator `defer` (#39742)

    This adds typing for the `defer` method, and covers the core deferrable sensors.
    It also fixes 1 error in a databricks provider, but leaves the rest of the provider ecosystem alone.

    (cherry picked from commit f18e6340d89f9fb18850c1340434064276a95c54)
---
 airflow/models/baseoperator.py | 3 ++-
 airflow/sensors/date_time.py   | 6 +++---
 airflow/sensors/time_delta.py  | 6 +++---
 airflow/sensors/time_sensor.py | 4 ++--
 4 files changed, 10 insertions(+), 9 deletions(-)

diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py
index 44d8e4e82f..686988da08 100644
--- a/airflow/models/baseoperator.py
+++ b/airflow/models/baseoperator.py
@@ -41,6 +41,7 @@ from typing import (
     Callable,
     Collection,
     Iterable,
+    NoReturn,
     Sequence,
     TypeVar,
     Union,
@@ -1674,7 +1675,7 @@ class BaseOperator(AbstractOperator, metaclass=BaseOperatorMeta):
         method_name: str,
         kwargs: dict[str, Any] | None = None,
         timeout: timedelta | None = None,
-    ):
+    ) -> NoReturn:
         """
         Mark this Operator "deferred", suspending its execution until the provided trigger fires an event.
 
diff --git a/airflow/sensors/date_time.py b/airflow/sensors/date_time.py
index 65880ebb9e..b0763ebd40 100644
--- a/airflow/sensors/date_time.py
+++ b/airflow/sensors/date_time.py
@@ -18,7 +18,7 @@
 from __future__ import annotations
 
 import datetime
-from typing import TYPE_CHECKING, Sequence
+from typing import TYPE_CHECKING, NoReturn, Sequence
 
 from airflow.sensors.base import BaseSensorOperator
 from airflow.triggers.temporal import DateTimeTrigger
@@ -90,13 +90,13 @@ class DateTimeSensorAsync(DateTimeSensor):
     def __init__(self, **kwargs) -> None:
         super().__init__(**kwargs)
 
-    def execute(self, context: Context):
+    def execute(self, context: Context) -> NoReturn:
         trigger = DateTimeTrigger(moment=timezone.parse(self.target_time))
         self.defer(
             trigger=trigger,
             method_name="execute_complete",
         )
 
-    def execute_complete(self, context, event=None):
+    def execute_complete(self, context, event=None) -> None:
         """Execute when the trigger fires - returns immediately."""
         return None
diff --git a/airflow/sensors/time_delta.py b/airflow/sensors/time_delta.py
index 3595e551b0..82d16bbae6 100644
--- a/airflow/sensors/time_delta.py
+++ b/airflow/sensors/time_delta.py
@@ -17,7 +17,7 @@
 # under the License.
 from __future__ import annotations
 
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, NoReturn
 
 from airflow.exceptions import AirflowSkipException
 from airflow.sensors.base import BaseSensorOperator
@@ -66,7 +66,7 @@ class TimeDeltaSensorAsync(TimeDeltaSensor):
 
     """
 
-    def execute(self, context: Context):
+    def execute(self, context: Context) -> NoReturn:
         target_dttm = context["data_interval_end"]
         target_dttm += self.delta
         try:
@@ -78,6 +78,6 @@ class TimeDeltaSensorAsync(TimeDeltaSensor):
 
         self.defer(trigger=trigger, method_name="execute_complete")
 
-    def execute_complete(self, context, event=None):
+    def execute_complete(self, context, event=None) -> None:
         """Execute for when the trigger fires - return immediately."""
         return None
diff --git a/airflow/sensors/time_sensor.py b/airflow/sensors/time_sensor.py
index 6df67bc855..91c1354782 100644
--- a/airflow/sensors/time_sensor.py
+++ b/airflow/sensors/time_sensor.py
@@ -18,7 +18,7 @@
 from __future__ import annotations
 
 import datetime
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, NoReturn
 
 from airflow.sensors.base import BaseSensorOperator
 from airflow.triggers.temporal import DateTimeTrigger
@@ -72,7 +72,7 @@ class TimeSensorAsync(BaseSensorOperator):
 
         self.target_datetime = timezone.convert_to_utc(aware_time)
 
-    def execute(self, context: Context):
+    def execute(self, context: Context) -> NoReturn:
         trigger = DateTimeTrigger(moment=self.target_datetime)
         self.defer(
             trigger=trigger,
