This is an automated email from the ASF dual-hosted git repository.
vincbeck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 50e2ecb2b8 Wait sensor async (#41557)
50e2ecb2b8 is described below
commit 50e2ecb2b8693e558572cb5f073aff9d8a7a6570
Author: Collin McNulty <[email protected]>
AuthorDate: Tue Aug 20 14:50:54 2024 -0500
Wait sensor async (#41557)
---
airflow/sensors/time_delta.py | 39 ++++++++++++++++++++++++++++++++++++++-
tests/sensors/test_time_delta.py | 23 +++++++++++++++++++++--
2 files changed, 59 insertions(+), 3 deletions(-)
diff --git a/airflow/sensors/time_delta.py b/airflow/sensors/time_delta.py
index d068fad9bf..dc78a0e33b 100644
--- a/airflow/sensors/time_delta.py
+++ b/airflow/sensors/time_delta.py
@@ -17,11 +17,14 @@
# under the License.
from __future__ import annotations
+from datetime import timedelta
+from time import sleep
from typing import TYPE_CHECKING, Any, NoReturn
+from airflow.configuration import conf
from airflow.exceptions import AirflowSkipException
from airflow.sensors.base import BaseSensorOperator
-from airflow.triggers.temporal import DateTimeTrigger
+from airflow.triggers.temporal import DateTimeTrigger, TimeDeltaTrigger
from airflow.utils import timezone
if TYPE_CHECKING:
@@ -89,3 +92,37 @@ class TimeDeltaSensorAsync(TimeDeltaSensor):
def execute_complete(self, context: Context, event: Any = None) -> None:
"""Handle the event when the trigger fires and return immediately."""
return None
+
+
+class WaitSensor(BaseSensorOperator):
+ """
+ A sensor that waits a specified period of time before completing.
+
+ This differs from TimeDeltaSensor because the time to wait is measured
from the start of the task, not
+ the data_interval_end of the DAG run.
+
+ :param time_to_wait: time length to wait after the task starts before
succeeding.
+ :param deferrable: Run sensor in deferrable mode
+ """
+
+ def __init__(
+ self,
+ time_to_wait: timedelta | int,
+ deferrable: bool = conf.getboolean("operators", "default_deferrable",
fallback=False),
+ **kwargs,
+ ) -> None:
+ super().__init__(**kwargs)
+ self.deferrable = deferrable
+ if isinstance(time_to_wait, int):
+ self.time_to_wait = timedelta(minutes=time_to_wait)
+ else:
+ self.time_to_wait = time_to_wait
+
+ def execute(self, context: Context) -> None:
+ if self.deferrable:
+ self.defer(
+ trigger=TimeDeltaTrigger(self.time_to_wait,
end_from_trigger=True),
+ method_name="execute_complete",
+ )
+ else:
+ sleep(int(self.time_to_wait.total_seconds()))
diff --git a/tests/sensors/test_time_delta.py b/tests/sensors/test_time_delta.py
index b437937df2..408a3c8828 100644
--- a/tests/sensors/test_time_delta.py
+++ b/tests/sensors/test_time_delta.py
@@ -22,15 +22,15 @@ from unittest import mock
import pendulum
import pytest
+import time_machine
from airflow.models import DagBag
from airflow.models.dag import DAG
-from airflow.sensors.time_delta import TimeDeltaSensor, TimeDeltaSensorAsync
+from airflow.sensors.time_delta import TimeDeltaSensor, TimeDeltaSensorAsync,
WaitSensor
from airflow.utils.timezone import datetime
pytestmark = pytest.mark.db_test
-
DEFAULT_DATE = datetime(2015, 1, 1)
DEV_NULL = "/dev/null"
TEST_DAG_ID = "unit_tests"
@@ -71,3 +71,22 @@ class TestTimeDeltaSensorAsync:
defer_mock.assert_called_once()
else:
defer_mock.assert_not_called()
+
+ @pytest.mark.parametrize(
+ "should_defer",
+ [False, True],
+ )
+ @mock.patch("airflow.models.baseoperator.BaseOperator.defer")
+ @mock.patch("airflow.sensors.time_delta.sleep")
+ def test_wait_sensor(self, sleep_mock, defer_mock, should_defer):
+ wait_time = timedelta(seconds=30)
+ op = WaitSensor(
+ task_id="wait_sensor_check", time_to_wait=wait_time, dag=self.dag,
deferrable=should_defer
+ )
+ with time_machine.travel(pendulum.datetime(year=2024, month=8, day=1,
tz="UTC"), tick=False):
+ op.execute({})
+ if should_defer:
+ defer_mock.assert_called_once()
+ else:
+ defer_mock.assert_not_called()
+ sleep_mock.assert_called_once_with(30)