jedcunningham commented on code in PR #47652:
URL: https://github.com/apache/airflow/pull/47652#discussion_r1990573749


##########
providers/standard/docs/sensors/datetime.rst:
##########
@@ -56,6 +56,8 @@ TimeSensor
 
 Use the :class:`~airflow.providers.standard.sensors.time_sensor.TimeSensor` to end sensing after time specified.
 
+Time will be evaluated against data_interval_end if present for the dag run, otherwise run_after will be used.

Review Comment:
   ```suggestion
   Time will be evaluated against ``data_interval_end`` if present for the dag run, otherwise ``run_after`` will be used.
   ```



##########
providers/standard/src/airflow/providers/standard/sensors/time_delta.py:
##########
@@ -46,29 +46,43 @@ def _get_airflow_version():
 
 class TimeDeltaSensor(BaseSensorOperator):
     """
-    Waits for a timedelta after the run's data interval.
+    Waits for a timedelta.
 
-    :param delta: time length to wait after the data interval before succeeding.
+    The delta will be evaluated against data_interval_end if present for the dag run,
+    otherwise run_after will be used.
+
+    :param delta: time to wait before succeeding.
 
     .. seealso::
         For more information on how to use this sensor, take a look at the guide:
         :ref:`howto/operator:TimeDeltaSensor`
 
-
     """
 
     def __init__(self, *, delta, **kwargs):
         super().__init__(**kwargs)
         self.delta = delta
 
-    def poke(self, context: Context):
-        data_interval_end = context["data_interval_end"]
+    def _derive_base_time(self, context: Context) -> datetime:
+        """
+        Get the "base time" against which the delta should be calculated.
 
-        if not isinstance(data_interval_end, datetime):
-            raise ValueError("`data_interval_end` returned non-datetime object")
+        If data_interval_end is populated, use it; else use run_after.
+        """
+        data_interval_end = context.get("data_interval_end")
+        if data_interval_end:
+            base_time = data_interval_end
+        else:
+            dag_run = context["dag_run"]
+            if not dag_run:
+                raise ValueError("No dag run found in task context")
+            base_time = dag_run.run_after
+        return base_time

Review Comment:
   ```suggestion
               return data_interval_end
   
           dag_run = context["dag_run"]
           if not dag_run:
               raise ValueError("No dag run found in task context")
           return dag_run.run_after
   ```
   
   nit
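
For reference, a minimal standalone sketch of how the helper might read with the early-return suggestion above applied. It is written as a free function rather than a method so it runs without Airflow installed; `FakeDagRun` and the plain-dict context are hypothetical stand-ins for illustration, not Airflow's actual `DagRun` or `Context` types.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any, Mapping


@dataclass
class FakeDagRun:
    """Hypothetical stand-in for a dag run; only ``run_after`` is modeled."""

    run_after: datetime


def derive_base_time(context: Mapping[str, Any]) -> datetime:
    """Return data_interval_end if populated, else the dag run's run_after."""
    data_interval_end = context.get("data_interval_end")
    if data_interval_end:
        # Early return removes the need for the intermediate base_time variable.
        return data_interval_end

    dag_run = context["dag_run"]
    if not dag_run:
        raise ValueError("No dag run found in task context")
    return dag_run.run_after


if __name__ == "__main__":
    run_after = datetime(2025, 1, 3, tzinfo=timezone.utc)
    # No data interval on this run, so run_after is used as the base time.
    print(derive_base_time({"data_interval_end": None, "dag_run": FakeDagRun(run_after)}))
```

The behavior is the same as the version in the diff; only the control flow is flattened.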



##########
providers/standard/docs/sensors/datetime.rst:
##########
@@ -71,6 +73,8 @@ TimeSensorAsync
 Use the :class:`~airflow.providers.standard.sensors.time_sensor.TimeSensorAsync` to end sensing after time specified.
 It is an async version of the operator and requires Triggerer to run.
 
+Time will be evaluated against data_interval_end if present for the dag run, otherwise run_after will be used.

Review Comment:
   ```suggestion
   Time will be evaluated against ``data_interval_end`` if present for the dag run, otherwise ``run_after`` will be used.
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.