RNHTTR commented on code in PR #41557:
URL: https://github.com/apache/airflow/pull/41557#discussion_r1723825971


##########
airflow/sensors/time_delta.py:
##########
@@ -89,3 +92,37 @@ def execute(self, context: Context) -> bool | NoReturn:
     def execute_complete(self, context: Context, event: Any = None) -> None:
         """Handle the event when the trigger fires and return immediately."""
         return None
+
+
+class WaitSensor(BaseSensorOperator):
+    """
+    A sensor that waits a specified period of time before completing.
+
+    This differs from TimeDeltaSensor because the time to wait is measured 
from the start of the task, not
+    the data_interval_end of the DAG run.
+
+    :param time_to_wait: time length to wait after the task starts before 
succeeding.
+    :param deferrable: Run sensor in deferrable mode
+    """
+
+    def __init__(
+        self,
+        time_to_wait: timedelta | int,
+        deferrable: bool = conf.getboolean("operators", "default_deferrable", 
fallback=False),
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.deferrable = deferrable
+        if isinstance(time_to_wait, int):
+            self.delta = timedelta(minutes=time_to_wait)
+        else:
+            self.delta = time_to_wait
+
+    def execute(self, context: Context) -> None:
+        if self.deferrable:
+            self.defer(
+                trigger=TimeDeltaTrigger(self.delta, end_from_trigger=True),

Review Comment:
   ```suggestion
                   trigger=TimeDeltaTrigger(self.time_to_wait, 
end_from_trigger=True),
   ```



##########
airflow/sensors/time_delta.py:
##########
@@ -89,3 +92,37 @@ def execute(self, context: Context) -> bool | NoReturn:
     def execute_complete(self, context: Context, event: Any = None) -> None:
         """Handle the event when the trigger fires and return immediately."""
         return None
+
+
+class WaitSensor(BaseSensorOperator):
+    """
+    A sensor that waits a specified period of time before completing.
+
+    This differs from TimeDeltaSensor because the time to wait is measured 
from the start of the task, not
+    the data_interval_end of the DAG run.
+
+    :param time_to_wait: time length to wait after the task starts before 
succeeding.
+    :param deferrable: Run sensor in deferrable mode
+    """
+
+    def __init__(
+        self,
+        time_to_wait: timedelta | int,
+        deferrable: bool = conf.getboolean("operators", "default_deferrable", 
fallback=False),
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.deferrable = deferrable
+        if isinstance(time_to_wait, int):
+            self.delta = timedelta(minutes=time_to_wait)

Review Comment:
   ```suggestion
               self.time_to_wait = timedelta(minutes=time_to_wait)
   ```



##########
airflow/sensors/time_delta.py:
##########
@@ -89,3 +92,37 @@ def execute(self, context: Context) -> bool | NoReturn:
     def execute_complete(self, context: Context, event: Any = None) -> None:
         """Handle the event when the trigger fires and return immediately."""
         return None
+
+
+class WaitSensor(BaseSensorOperator):
+    """
+    A sensor that waits a specified period of time before completing.
+
+    This differs from TimeDeltaSensor because the time to wait is measured 
from the start of the task, not
+    the data_interval_end of the DAG run.
+
+    :param time_to_wait: time length to wait after the task starts before 
succeeding.
+    :param deferrable: Run sensor in deferrable mode
+    """
+
+    def __init__(
+        self,
+        time_to_wait: timedelta | int,
+        deferrable: bool = conf.getboolean("operators", "default_deferrable", 
fallback=False),
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.deferrable = deferrable
+        if isinstance(time_to_wait, int):
+            self.delta = timedelta(minutes=time_to_wait)
+        else:
+            self.delta = time_to_wait

Review Comment:
   ```suggestion
               self.time_to_wait = time_to_wait
   ```



##########
airflow/sensors/time_delta.py:
##########
@@ -89,3 +92,37 @@ def execute(self, context: Context) -> bool | NoReturn:
     def execute_complete(self, context: Context, event: Any = None) -> None:
         """Handle the event when the trigger fires and return immediately."""
         return None
+
+
+class WaitSensor(BaseSensorOperator):
+    """
+    A sensor that waits a specified period of time before completing.
+
+    This differs from TimeDeltaSensor because the time to wait is measured 
from the start of the task, not
+    the data_interval_end of the DAG run.
+
+    :param time_to_wait: time length to wait after the task starts before 
succeeding.
+    :param deferrable: Run sensor in deferrable mode
+    """
+
+    def __init__(
+        self,
+        time_to_wait: timedelta | int,
+        deferrable: bool = conf.getboolean("operators", "default_deferrable", 
fallback=False),
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.deferrable = deferrable
+        if isinstance(time_to_wait, int):
+            self.delta = timedelta(minutes=time_to_wait)
+        else:
+            self.delta = time_to_wait
+
+    def execute(self, context: Context) -> None:
+        if self.deferrable:
+            self.defer(
+                trigger=TimeDeltaTrigger(self.delta, end_from_trigger=True),
+                method_name="execute_complete",
+            )
+        else:
+            sleep(int(self.delta.total_seconds()))

Review Comment:
   ```suggestion
               sleep(int(self.time_to_wait.total_seconds()))
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to