hussein-awala commented on code in PR #33718:
URL: https://github.com/apache/airflow/pull/33718#discussion_r1306170740
##########
airflow/sensors/time_delta.py:
##########
@@ -64,7 +64,11 @@ class TimeDeltaSensorAsync(TimeDeltaSensor):
def execute(self, context: Context):
target_dttm = context["data_interval_end"]
target_dttm += self.delta
- self.defer(trigger=DateTimeTrigger(moment=target_dttm), method_name="execute_complete")
+ self.defer(
+ trigger=DateTimeTrigger(moment=target_dttm),
+ method_name="execute_complete",
+ timeout=self.timeout,
Review Comment:
> Can you clarify which scenario you are talking about.
> Maybe just clarify, with this example, what is the desired behavior that we need to design for, and how this design perhaps falls short.
When we add `timeout=1h` to a sensor, we expect the total execution duration of the sensor to be 1h, even if it fails and retries multiple times.
From the doc:
> timeout ([datetime.timedelta](https://docs.python.org/3/library/datetime.html#datetime.timedelta) | [float](https://docs.python.org/3/library/functions.html#float)) – Time elapsed before the task times out and fails. Can be timedelta or float seconds. This should not be confused with execution_timeout of the BaseOperator class. timeout measures the time elapsed between the first poke and the current time (taking into account any reschedule delay between each poke), while execution_timeout checks the running time of the task (leaving out any reschedule delay). In case that the mode is poke (see below), both of them are equivalent (as the sensor is never rescheduled), which is not the case in reschedule mode.
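The distinction between the two clocks can be made concrete with a toy calculation (the numbers and variable names here are illustrative, not from Airflow):

```python
# Hypothetical reschedule-mode run: three pokes of 10s each,
# separated by 60s reschedule gaps.
poke_seconds = [10, 10, 10]
reschedule_gap_seconds = [60, 60]

# `timeout` is checked against wall-clock time since the first poke,
# so the reschedule gaps count toward it:
timeout_clock = sum(poke_seconds) + sum(reschedule_gap_seconds)

# `execution_timeout` only counts time the task was actually running:
execution_clock = sum(poke_seconds)

print(timeout_clock, execution_clock)
```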
So currently, in reschedule mode, the duration is calculated as:
```python
run_duration = (timezone.utcnow() - start_date).total_seconds()

# ...where start_date is resolved from the first try's reschedule record,
# so it anchors the timeout to the very first poke:
first_try_number = context["ti"].max_tries - self.retries + 1
task_reschedules = TaskReschedule.find_for_task_instance(
    context["ti"], try_number=first_try_number
)
if not task_reschedules:
    start_date = timezone.utcnow()
else:
    start_date = task_reschedules[0].start_date
```
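To sketch why this matters across retries (a toy example outside Airflow; `is_timed_out` and `first_poke_at` are made-up names): because `start_date` is anchored to the first try, a retry does not reset the timeout clock:

```python
from datetime import datetime, timedelta, timezone

def is_timed_out(first_poke_at: datetime, timeout_s: float) -> bool:
    # Mimics the reschedule-mode check above: elapsed time is measured
    # from the first poke of the *first* try, not the current attempt.
    run_duration = (datetime.now(timezone.utc) - first_poke_at).total_seconds()
    return run_duration > timeout_s

# A sensor first poked 5 hours ago with timeout=4h has timed out,
# no matter how many retries happened in between.
first_poke = datetime.now(timezone.utc) - timedelta(hours=5)
print(is_timed_out(first_poke, timeout_s=4 * 3600))
```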
Personally, I have a Glue sensor with `timeout` set to 4h (it waits for a partition in a database). Sometimes the poke method fails because of pressure on the Glue API, so I set retries=10, and I don't care even if it's 100, because the sensor will time out after 4h regardless of the number of attempts.
This behavior could be unexpected in Airflow, but it's the current behavior in sync mode, and users (including me) rely on it. So if we want to change something, we at least need to update the documentation and explain why this is different, and maybe fix the sync mode if its behavior is unexpected.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]