This is an automated email from the ASF dual-hosted git repository.

vincbeck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 50e2ecb2b8 Wait sensor async (#41557)
50e2ecb2b8 is described below

commit 50e2ecb2b8693e558572cb5f073aff9d8a7a6570
Author: Collin McNulty <[email protected]>
AuthorDate: Tue Aug 20 14:50:54 2024 -0500

    Wait sensor async (#41557)
---
 airflow/sensors/time_delta.py    | 39 ++++++++++++++++++++++++++++++++++++++-
 tests/sensors/test_time_delta.py | 23 +++++++++++++++++++++--
 2 files changed, 59 insertions(+), 3 deletions(-)

diff --git a/airflow/sensors/time_delta.py b/airflow/sensors/time_delta.py
index d068fad9bf..dc78a0e33b 100644
--- a/airflow/sensors/time_delta.py
+++ b/airflow/sensors/time_delta.py
@@ -17,11 +17,14 @@
 # under the License.
 from __future__ import annotations
 
+from datetime import timedelta
+from time import sleep
 from typing import TYPE_CHECKING, Any, NoReturn
 
+from airflow.configuration import conf
 from airflow.exceptions import AirflowSkipException
 from airflow.sensors.base import BaseSensorOperator
-from airflow.triggers.temporal import DateTimeTrigger
+from airflow.triggers.temporal import DateTimeTrigger, TimeDeltaTrigger
 from airflow.utils import timezone
 
 if TYPE_CHECKING:
@@ -89,3 +92,37 @@ class TimeDeltaSensorAsync(TimeDeltaSensor):
     def execute_complete(self, context: Context, event: Any = None) -> None:
         """Handle the event when the trigger fires and return immediately."""
         return None
+
+
+class WaitSensor(BaseSensorOperator):
+    """
+    A sensor that waits a specified period of time before completing.
+
+    This differs from TimeDeltaSensor because the time to wait is measured from the start of the task, not
+    the data_interval_end of the DAG run.
+
+    :param time_to_wait: time length to wait after the task starts before succeeding.
+    :param deferrable: Run sensor in deferrable mode
+    """
+
+    def __init__(
+        self,
+        time_to_wait: timedelta | int,
+        deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.deferrable = deferrable
+        if isinstance(time_to_wait, int):
+            self.time_to_wait = timedelta(minutes=time_to_wait)
+        else:
+            self.time_to_wait = time_to_wait
+
+    def execute(self, context: Context) -> None:
+        if self.deferrable:
+            self.defer(
+                trigger=TimeDeltaTrigger(self.time_to_wait, end_from_trigger=True),
+                method_name="execute_complete",
+            )
+        else:
+            sleep(int(self.time_to_wait.total_seconds()))
diff --git a/tests/sensors/test_time_delta.py b/tests/sensors/test_time_delta.py
index b437937df2..408a3c8828 100644
--- a/tests/sensors/test_time_delta.py
+++ b/tests/sensors/test_time_delta.py
@@ -22,15 +22,15 @@ from unittest import mock
 
 import pendulum
 import pytest
+import time_machine
 
 from airflow.models import DagBag
 from airflow.models.dag import DAG
-from airflow.sensors.time_delta import TimeDeltaSensor, TimeDeltaSensorAsync
+from airflow.sensors.time_delta import TimeDeltaSensor, TimeDeltaSensorAsync, WaitSensor
 from airflow.utils.timezone import datetime
 
 pytestmark = pytest.mark.db_test
 
-
 DEFAULT_DATE = datetime(2015, 1, 1)
 DEV_NULL = "/dev/null"
 TEST_DAG_ID = "unit_tests"
@@ -71,3 +71,22 @@ class TestTimeDeltaSensorAsync:
             defer_mock.assert_called_once()
         else:
             defer_mock.assert_not_called()
+
+    @pytest.mark.parametrize(
+        "should_defer",
+        [False, True],
+    )
+    @mock.patch("airflow.models.baseoperator.BaseOperator.defer")
+    @mock.patch("airflow.sensors.time_delta.sleep")
+    def test_wait_sensor(self, sleep_mock, defer_mock, should_defer):
+        wait_time = timedelta(seconds=30)
+        op = WaitSensor(
+            task_id="wait_sensor_check", time_to_wait=wait_time, dag=self.dag, deferrable=should_defer
+        )
+        with time_machine.travel(pendulum.datetime(year=2024, month=8, day=1, tz="UTC"), tick=False):
+            op.execute({})
+            if should_defer:
+                defer_mock.assert_called_once()
+            else:
+                defer_mock.assert_not_called()
+                sleep_mock.assert_called_once_with(30)

Reply via email to