RNHTTR commented on code in PR #41557:
URL: https://github.com/apache/airflow/pull/41557#discussion_r1723825971
##########
airflow/sensors/time_delta.py:
##########
@@ -89,3 +92,37 @@ def execute(self, context: Context) -> bool | NoReturn:
def execute_complete(self, context: Context, event: Any = None) -> None:
"""Handle the event when the trigger fires and return immediately."""
return None
+
+
+class WaitSensor(BaseSensorOperator):
+ """
+ A sensor that waits a specified period of time before completing.
+
+ This differs from TimeDeltaSensor because the time to wait is measured
from the start of the task, not
+ the data_interval_end of the DAG run.
+
+ :param time_to_wait: time length to wait after the task starts before
succeeding.
+ :param deferrable: Run sensor in deferrable mode
+ """
+
+ def __init__(
+ self,
+ time_to_wait: timedelta | int,
+ deferrable: bool = conf.getboolean("operators", "default_deferrable",
fallback=False),
+ **kwargs,
+ ) -> None:
+ super().__init__(**kwargs)
+ self.deferrable = deferrable
+ if isinstance(time_to_wait, int):
+ self.delta = timedelta(minutes=time_to_wait)
+ else:
+ self.delta = time_to_wait
+
+ def execute(self, context: Context) -> None:
+ if self.deferrable:
+ self.defer(
+ trigger=TimeDeltaTrigger(self.delta, end_from_trigger=True),
Review Comment:
```suggestion
trigger=TimeDeltaTrigger(self.time_to_wait,
end_from_trigger=True),
```
##########
airflow/sensors/time_delta.py:
##########
@@ -89,3 +92,37 @@ def execute(self, context: Context) -> bool | NoReturn:
def execute_complete(self, context: Context, event: Any = None) -> None:
"""Handle the event when the trigger fires and return immediately."""
return None
+
+
+class WaitSensor(BaseSensorOperator):
+ """
+ A sensor that waits a specified period of time before completing.
+
+ This differs from TimeDeltaSensor because the time to wait is measured
from the start of the task, not
+ the data_interval_end of the DAG run.
+
+ :param time_to_wait: time length to wait after the task starts before
succeeding.
+ :param deferrable: Run sensor in deferrable mode
+ """
+
+ def __init__(
+ self,
+ time_to_wait: timedelta | int,
+ deferrable: bool = conf.getboolean("operators", "default_deferrable",
fallback=False),
+ **kwargs,
+ ) -> None:
+ super().__init__(**kwargs)
+ self.deferrable = deferrable
+ if isinstance(time_to_wait, int):
+ self.delta = timedelta(minutes=time_to_wait)
Review Comment:
```suggestion
self.time_to_wait = timedelta(minutes=time_to_wait)
```
##########
airflow/sensors/time_delta.py:
##########
@@ -89,3 +92,37 @@ def execute(self, context: Context) -> bool | NoReturn:
def execute_complete(self, context: Context, event: Any = None) -> None:
"""Handle the event when the trigger fires and return immediately."""
return None
+
+
+class WaitSensor(BaseSensorOperator):
+ """
+ A sensor that waits a specified period of time before completing.
+
+ This differs from TimeDeltaSensor because the time to wait is measured
from the start of the task, not
+ the data_interval_end of the DAG run.
+
+ :param time_to_wait: time length to wait after the task starts before
succeeding.
+ :param deferrable: Run sensor in deferrable mode
+ """
+
+ def __init__(
+ self,
+ time_to_wait: timedelta | int,
+ deferrable: bool = conf.getboolean("operators", "default_deferrable",
fallback=False),
+ **kwargs,
+ ) -> None:
+ super().__init__(**kwargs)
+ self.deferrable = deferrable
+ if isinstance(time_to_wait, int):
+ self.delta = timedelta(minutes=time_to_wait)
+ else:
+ self.delta = time_to_wait
Review Comment:
```suggestion
self.time_to_wait = time_to_wait
```
##########
airflow/sensors/time_delta.py:
##########
@@ -89,3 +92,37 @@ def execute(self, context: Context) -> bool | NoReturn:
def execute_complete(self, context: Context, event: Any = None) -> None:
"""Handle the event when the trigger fires and return immediately."""
return None
+
+
+class WaitSensor(BaseSensorOperator):
+ """
+ A sensor that waits a specified period of time before completing.
+
+ This differs from TimeDeltaSensor because the time to wait is measured
from the start of the task, not
+ the data_interval_end of the DAG run.
+
+ :param time_to_wait: time length to wait after the task starts before
succeeding.
+ :param deferrable: Run sensor in deferrable mode
+ """
+
+ def __init__(
+ self,
+ time_to_wait: timedelta | int,
+ deferrable: bool = conf.getboolean("operators", "default_deferrable",
fallback=False),
+ **kwargs,
+ ) -> None:
+ super().__init__(**kwargs)
+ self.deferrable = deferrable
+ if isinstance(time_to_wait, int):
+ self.delta = timedelta(minutes=time_to_wait)
+ else:
+ self.delta = time_to_wait
+
+ def execute(self, context: Context) -> None:
+ if self.deferrable:
+ self.defer(
+ trigger=TimeDeltaTrigger(self.delta, end_from_trigger=True),
+ method_name="execute_complete",
+ )
+ else:
+ sleep(int(self.delta.total_seconds()))
Review Comment:
```suggestion
sleep(int(self.time_to_wait.total_seconds()))
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]