eskarimov opened a new issue #19382:
URL: https://github.com/apache/airflow/issues/19382


   ### Apache Airflow version
   
   2.2.0
   
   ### Operating System
   
   Debian GNU/Linux 10 (buster)
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   Docker-Compose
   
   ### Deployment details
   
   ```bash
   ./breeze start-airflow --python 3.7 --backend postgres
   ```
   
   ### What happened
   
   When a task is resumed after being deferred, its `start_date` is not equal to the original `start_date`; instead it is set to the timestamp at which the task resumed.
   
   If a task has `execution_timeout` set and runs longer than that, it might not raise a timeout error, because technically a brand-new task run starts after the deferral.
   I know it's expected that it'd be a brand-new task instance, but the documentation describes the behaviour with `execution_timeout` set differently (see "What you expected to happen" below).
   
   This is especially noticeable if an Operator needs to be deferred multiple times: every time it resumes after `defer`, the clock starts counting from zero again.
   
   Some task instance details after an example task has completed:
   | Attribute  | Value |
   | ------------- | ------------- |
   | execution_date | 2021-11-03, 14:45:29 |
   | trigger_timeout | 2021-11-03, 14:46:30 |
   | start_date | 2021-11-03, 14:46:32 |
   | end_date | 2021-11-03, 14:47:02 |
   | execution_timeout | 0:01:00 |
   | duration | 30.140004 |
   | state | success |
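
   To make the numbers concrete, here is a small sketch (my own illustration, not Airflow code; the first attempt's start time is an assumption, since the table only shows the `start_date` that was reset after the deferral):
   ```python
   from datetime import datetime, timedelta

   original_start = datetime(2021, 11, 3, 14, 45, 31)  # assumed first start, not in the table
   resumed_start = datetime(2021, 11, 3, 14, 46, 32)   # start_date from the table
   end_date = datetime(2021, 11, 3, 14, 47, 2)         # end_date from the table
   execution_timeout = timedelta(seconds=60)

   total_runtime = end_date - original_start    # ~91 s, well over the 60 s budget
   measured_runtime = end_date - resumed_start  # 30 s, what Airflow effectively measures

   print(total_runtime > execution_timeout)     # True  -> per the docs, should time out
   print(measured_runtime > execution_timeout)  # False -> so the task succeeds instead
   ```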
   
   ### What you expected to happen
   
   
   The [documentation](https://airflow.apache.org/docs/apache-airflow/2.2.0/concepts/deferring.html#triggering-deferral) says:
   - Note that ``execution_timeout`` on Operators is considered over the *total 
runtime*, not individual executions in-between deferrals - this means that if 
``execution_timeout`` is set, an Operator may fail while it's deferred or while 
it's running after a deferral, even if it's only been resumed for a few seconds.
   
   Also, I see the [following code part](https://github.com/apache/airflow/blob/94a0a0e8ce4d2b54cd6a08301684e299ca3c36cb/airflow/models/taskinstance.py#L1495-L1500) that tries to check the timeout value when a task comes back from the deferred state:
   ```python
            # If we are coming in with a next_method (i.e. from a deferral),
            # calculate the timeout from our start_date.
            if self.next_method:
                timeout_seconds = (
                    task_copy.execution_timeout - (timezone.utcnow() - self.start_date)
                ).total_seconds()
   ```
   But the issue is that `self.start_date` isn't equal to the original task's `start_date`.
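
   Plugging the values from the table above into that check shows the problem: because `self.start_date` was reset on resume, the computed `timeout_seconds` is almost the full budget again (a sketch; the "now" timestamp is an assumed `utcnow()` just after the resume):
   ```python
   from datetime import datetime, timedelta

   execution_timeout = timedelta(seconds=60)
   start_date = datetime(2021, 11, 3, 14, 46, 32)  # the *reset* start_date from the table
   now = datetime(2021, 11, 3, 14, 46, 33)         # assumed utcnow() just after resume

   # Mirrors the check above: the task gets nearly the full 60 s budget again,
   # even though roughly a minute has already elapsed since the original start.
   timeout_seconds = (execution_timeout - (now - start_date)).total_seconds()
   print(timeout_seconds)  # 59.0
   ```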
   
   ### How to reproduce
   
   DAG:
   ```python
   from datetime import datetime, timedelta
   
   from airflow import DAG
   from airflow.sensors.time_delta import TimeDeltaSensorAsync
   
   with DAG(
       dag_id='time_delta_async_bug',
       schedule_interval=None,
       start_date=datetime(2021, 1, 1),
       catchup=False,
   ) as dag:
       time_delta_async_sensor = TimeDeltaSensorAsync(
           task_id='time_delta_task_id',
           delta=timedelta(seconds=60),
           execution_timeout=timedelta(seconds=60),
       )
   ```
   Since there aren't many async Operators at the moment, I slightly modified `TimeDeltaSensorAsync` to simulate task work after the deferral.
   Here is the full code of the `TimeDeltaSensorAsync` class I used to reproduce the issue; the only difference is the `time.sleep(30)` line, which simulates processing the event after the trigger has completed.
   ```python
   import time

   from airflow.sensors.time_delta import TimeDeltaSensor
   from airflow.triggers.temporal import DateTimeTrigger


   class TimeDeltaSensorAsync(TimeDeltaSensor):
       """
       A drop-in replacement for TimeDeltaSensor that defers itself to avoid
       taking up a worker slot while it is waiting.

       :param delta: time length to wait after the data interval before succeeding.
       :type delta: datetime.timedelta
       """

       def execute(self, context):
           target_dttm = context['data_interval_end']
           target_dttm += self.delta
           self.defer(trigger=DateTimeTrigger(moment=target_dttm), method_name="execute_complete")

       def execute_complete(self, context, event=None):  # pylint: disable=unused-argument
           """Callback for when the trigger fires - returns immediately."""
           time.sleep(30)  # Simulate processing the event after the trigger completed
           return None
   ```
   
   ### Anything else
   
   I've ticked the "I'm willing to submit a PR" checkbox, but I'm not sure where to start. I'd be happy if someone could give me some guidance on which direction to look in.
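
   One direction I could imagine (purely a sketch of the idea, not a tested patch; `first_start_date` is a made-up name for illustration) is to persist the first attempt's start date across deferrals and compute the remaining timeout from it:
   ```python
   from datetime import datetime, timedelta

   def remaining_timeout(execution_timeout: timedelta,
                         first_start_date: datetime,
                         now: datetime) -> float:
       """Remaining budget measured from the FIRST attempt's start date.

       `first_start_date` is hypothetical: it would have to be persisted on the
       task instance before the first deferral, which doesn't happen today.
       """
       return (execution_timeout - (now - first_start_date)).total_seconds()

   # With the values from the table this comes out negative, i.e. already timed out:
   print(remaining_timeout(timedelta(seconds=60),
                           datetime(2021, 11, 3, 14, 45, 31),   # assumed first start
                           datetime(2021, 11, 3, 14, 46, 33)))  # -2.0
   ```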
   
   ### Are you willing to submit PR?
   
   - [X] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

