eskarimov opened a new issue #19382: URL: https://github.com/apache/airflow/issues/19382
### Apache Airflow version

2.2.0

### Operating System

Debian GNU/Linux 10 (buster)

### Versions of Apache Airflow Providers

_No response_

### Deployment

Docker-Compose

### Deployment details

```bash
./breeze start-airflow --python 3.7 --backend postgres
```

### What happened

When a task resumes after being deferred, its `start_date` is not the original `start_date`. It is the timestamp at which the task resumed. If a task has `execution_timeout` set and its total runtime exceeds that value, the timeout may never fire, because technically a brand-new execution starts after the deferral.

I know it's expected that the task starts fresh after resuming, but the documentation describes the behaviour of `execution_timeout` differently (see "What you expected to happen" below).

This is especially problematic when an operator needs to be deferred multiple times: every time it continues after `defer`, the clock starts again from zero.

Some task instance details after an example task has completed:

| Attribute | Value |
| ------------- | ------------- |
| execution_date | 2021-11-03, 14:45:29 |
| trigger_timeout | 2021-11-03, 14:46:30 |
| start_date | 2021-11-03, 14:46:32 |
| end_date | 2021-11-03, 14:47:02 |
| execution_timeout | 0:01:00 |
| duration | 30.140004 |
| state | success |

### What you expected to happen

The [documentation](https://airflow.apache.org/docs/apache-airflow/2.2.0/concepts/deferring.html#triggering-deferral) says:

> Note that ``execution_timeout`` on Operators is considered over the *total runtime*, not individual executions in-between deferrals - this means that if ``execution_timeout`` is set, an Operator may fail while it's deferred or while it's running after a deferral, even if it's only been resumed for a few seconds.
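To make the gap concrete, here is a small back-of-the-envelope sketch using the values from the task instance table. It is plain `datetime` arithmetic, not Airflow code: `remaining_budget` is a hypothetical helper that mirrors the "timeout minus elapsed time" idea. The first execution's `start_date` does not appear in the table, so the sketch *assumes* it was about `execution_timeout` before `trigger_timeout`, i.e. roughly 14:45:30.

```python
from datetime import datetime, timedelta

# Values from the task instance table (UTC, second precision).
execution_timeout = timedelta(seconds=60)
resumed_start = datetime(2021, 11, 3, 14, 46, 32)  # start_date after resuming
end = datetime(2021, 11, 3, 14, 47, 2)             # end_date

# ASSUMPTION: the first execution started about execution_timeout before
# trigger_timeout (14:46:30); this value is not shown in the table.
original_start = datetime(2021, 11, 3, 14, 45, 30)


def remaining_budget(start, now, timeout):
    """Hypothetical helper: seconds left before `timeout`, measured from `start`."""
    return (timeout - (now - start)).total_seconds()


# Measured from the reset start_date, the task still has budget left at end_date.
print(remaining_budget(resumed_start, end, execution_timeout))   # 30.0
# Measured over the total runtime, the budget was already exhausted.
print(remaining_budget(original_start, end, execution_timeout))  # -32.0
```

A negative value means the documented "total runtime" semantics would have failed the task. The positive value is what a start time reset on every resume produces, which matches the observed `success` state.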
Also, I see that the [following code](https://github.com/apache/airflow/blob/94a0a0e8ce4d2b54cd6a08301684e299ca3c36cb/airflow/models/taskinstance.py#L1495-L1500) tries to compute the timeout when the task comes back from the deferred state:

```python
# If we are coming in with a next_method (i.e. from a deferral),
# calculate the timeout from our start_date.
if self.next_method:
    timeout_seconds = (
        task_copy.execution_timeout - (timezone.utcnow() - self.start_date)
    ).total_seconds()
```

The problem is that `self.start_date` isn't equal to the original task's `start_date`.

### How to reproduce

DAG:

```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.sensors.time_delta import TimeDeltaSensorAsync

with DAG(
    dag_id='time_delta_async_bug',
    schedule_interval=None,
    start_date=datetime(2021, 1, 1),
    catchup=False,
) as dag:
    time_delta_async_sensor = TimeDeltaSensorAsync(
        task_id='time_delta_task_id',
        delta=timedelta(seconds=60),
        execution_timeout=timedelta(seconds=60),
    )
```

Since there aren't many async operators yet, I slightly modified `TimeDeltaSensorAsync` to simulate work done after the deferral. Here is the full code of the `TimeDeltaSensorAsync` class I used to reproduce the issue. The only difference from the upstream class is the `time.sleep(30)` line, which simulates processing the event after the trigger has completed.

```python
import time

from airflow.sensors.time_delta import TimeDeltaSensor
from airflow.triggers.temporal import DateTimeTrigger


class TimeDeltaSensorAsync(TimeDeltaSensor):
    """
    A drop-in replacement for TimeDeltaSensor that defers itself to avoid
    taking up a worker slot while it is waiting.

    :param delta: time length to wait after the data interval before succeeding.
    :type delta: datetime.timedelta
    """

    def execute(self, context):
        target_dttm = context['data_interval_end']
        target_dttm += self.delta
        self.defer(trigger=DateTimeTrigger(moment=target_dttm), method_name="execute_complete")

    def execute_complete(self, context, event=None):  # pylint: disable=unused-argument
        """Callback for when the trigger fires; sleeps to simulate post-processing."""
        time.sleep(30)  # Simulate processing the event after the trigger completed
        return None
```

### Anything else

I've ticked the "I'm willing to submit a PR" box, but I'm not sure where to start. I'd be happy if someone could point me in the right direction.

### Are you willing to submit PR?

- [X] Yes I am willing to submit a PR!

### Code of Conduct

- [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
