hussein-awala commented on code in PR #33718:
URL: https://github.com/apache/airflow/pull/33718#discussion_r1306170740


##########
airflow/sensors/time_delta.py:
##########
@@ -64,7 +64,11 @@ class TimeDeltaSensorAsync(TimeDeltaSensor):
     def execute(self, context: Context):
         target_dttm = context["data_interval_end"]
         target_dttm += self.delta
-        self.defer(trigger=DateTimeTrigger(moment=target_dttm), 
method_name="execute_complete")
+        self.defer(
+            trigger=DateTimeTrigger(moment=target_dttm),
+            method_name="execute_complete",
+            timeout=self.timeout,

Review Comment:
   > Can you clarify which scenario you are talking about.
   
   > Maybe just clarify, with this example, what is the desired behavior that 
we need to design for, and how this design perhaps falls short.
   
   When we add `timeout=1h` for a sensor, we expect that the total execution 
date for this sensor is 1h, even if fails and retires multiple time.
   
   From the doc:
   > timeout 
([datetime.timedelta](https://docs.python.org/3/library/datetime.html#datetime.timedelta)
 | [float](https://docs.python.org/3/library/functions.html#float)) – Time 
elapsed before the task times out and fails. Can be timedelta or float seconds. 
This should not be confused with execution_timeout of the BaseOperator class. 
timeout measures the time elapsed between the first poke and the current time 
(taking into account any reschedule delay between each poke), while 
execution_timeout checks the running time of the task (leaving out any 
reschedule delay). In case that the mode is poke (see below), both of them are 
equivalent (as the sensor is never rescheduled), which is not the case in 
reschedule mode.
   
   So currently, in reschedule mode, the duration is calculated as:
   ```python
   run_duration = (timezone.utcnow() - start_date).total_seconds()
   
   # and start_date is
   first_try_number = context["ti"].max_tries - self.retries + 1
   task_reschedules = TaskReschedule.find_for_task_instance(
       context["ti"], try_number=first_try_number
   )
   if not task_reschedules:
       start_date = timezone.utcnow()
   else:
       start_date = task_reschedules[0].start_date
   ```
   Personally, I have a Glue sensor, and I'm setting `timeout` to 4h (it wait 
for a partition in a database). Sometimes the poke method fails because there 
is a pressure on the Glue API, for that I set retries=10, and I don't care even 
if it's 100, because the sensor will timeout after 4h regardless the number of 
attempts.
   
   This behavior could be unexpected in Airflow, but it's the current behavior 
in the sync mode, and the users (including me) use it like that. So if we want 
to change something, at least we need to update the documentation and explain 
why this is different, and maybe fix the sync mode if there is an unexpected 
behavior.



##########
airflow/sensors/time_delta.py:
##########
@@ -64,7 +64,11 @@ class TimeDeltaSensorAsync(TimeDeltaSensor):
     def execute(self, context: Context):
         target_dttm = context["data_interval_end"]
         target_dttm += self.delta
-        self.defer(trigger=DateTimeTrigger(moment=target_dttm), 
method_name="execute_complete")
+        self.defer(
+            trigger=DateTimeTrigger(moment=target_dttm),
+            method_name="execute_complete",
+            timeout=self.timeout,

Review Comment:
   > Can you clarify which scenario you are talking about.
   
   > Maybe just clarify, with this example, what is the desired behavior that 
we need to design for, and how this design perhaps falls short.
   
   When we add `timeout=1h` for a sensor, we expect that the total execution 
date for this sensor is 1h, even if fails and retires multiple time.
   
   From the doc:
   > timeout 
([datetime.timedelta](https://docs.python.org/3/library/datetime.html#datetime.timedelta)
 | [float](https://docs.python.org/3/library/functions.html#float)) – Time 
elapsed before the task times out and fails. Can be timedelta or float seconds. 
This should not be confused with execution_timeout of the BaseOperator class. 
timeout measures the time elapsed between the first poke and the current time 
(taking into account any reschedule delay between each poke), while 
execution_timeout checks the running time of the task (leaving out any 
reschedule delay). In case that the mode is poke (see below), both of them are 
equivalent (as the sensor is never rescheduled), which is not the case in 
reschedule mode.
   
   So currently, in reschedule mode, the duration is calculated as:
   ```python
   run_duration = (timezone.utcnow() - start_date).total_seconds()
   
   # and start_date is
   first_try_number = context["ti"].max_tries - self.retries + 1
   task_reschedules = TaskReschedule.find_for_task_instance(
       context["ti"], try_number=first_try_number
   )
   if not task_reschedules:
       start_date = timezone.utcnow()
   else:
       start_date = task_reschedules[0].start_date
   ```
   Personally, I have a Glue sensor, and I'm setting `timeout` to 4h (it wait 
for a partition in a database). Sometimes the poke method fails because there 
is a pressure on the Glue API, for that I set retries=10, and I don't care even 
if it's 100, because the sensor will timeout after 4h regardless the number of 
attempts.
   
   This behavior could be unexpected in Airflow, but it's the current behavior 
in the sync mode, and the users (including me) use it like that. So if we want 
to change something, at least we need to update the documentation and explain 
why this is different, and maybe fix the sync mode if there is an unexpected 
behavior.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to