sean-rose opened a new issue, #29049:
URL: https://github.com/apache/airflow/issues/29049

   ### Apache Airflow version
   
   Other Airflow 2 version (please specify below)
   
   ### What happened
   
   Using Airflow 2.3.3: when recursively clearing downstream tasks, any cleared 
external task sensors in other DAGs that use reschedule mode will instantly 
fail with an `AirflowSensorTimeout` exception if the previous run is older 
than the sensor's timeout.
   
   ### What you think should happen instead
   
   The recursively cleared external task sensors should run normally: wait for 
the cleared upstream task to complete, retry up to the configured number of 
times, and honor the configured sensor timeout counted from the point in time 
at which the sensor was cleared.
   
   ### How to reproduce
   
   1. Load the following DAGs:
       ```python
        from datetime import datetime, timedelta, timezone
        from time import sleep

        from airflow.decorators import task
        from airflow.models import DAG
        from airflow.sensors.external_task import ExternalTaskMarker, ExternalTaskSensor

        default_args = {
            'start_date': datetime.now(timezone.utc).replace(second=0, microsecond=0),
            'retries': 2,
            'retry_delay': timedelta(seconds=10),
        }

        with DAG('parent_dag', schedule_interval='* * * * *', catchup=False, default_args=default_args) as parent_dag:
           @task(task_id='parent_task')
           def parent_sleep():
               sleep(10)
           parent_task = parent_sleep()
   
           child_dag__wait_for_parent_task = ExternalTaskMarker(
               task_id='child_dag__wait_for_parent_task',
               external_dag_id='child_dag',
               external_task_id='wait_for_parent_task',
           )
           parent_task >> child_dag__wait_for_parent_task
   
        with DAG('child_dag', schedule_interval='* * * * *', catchup=False, default_args=default_args) as child_dag:
           wait_for_parent_task = ExternalTaskSensor(
               task_id='wait_for_parent_task',
               external_dag_id='parent_dag',
               external_task_id='parent_task',
               mode='reschedule',
               poke_interval=15,
               timeout=60,
           )
   
           @task(task_id='child_task')
           def child_sleep():
               sleep(10)
           child_task = child_sleep()
           wait_for_parent_task >> child_task
       ```
   2. Enable the `parent_dag` and `child_dag` DAGs and wait for them to 
automatically run (they're scheduled to run every minute).
   3. Wait for at least one additional minute (because the sensor timeout is 
configured to be one minute).
   4. Clear the earliest `parent_dag.parent_task` task instance with the 
"Downstream" and "Recursive" options enabled.
   5. When the cleared `child_dag.wait_for_parent_task` task tries to run, it 
will immediately fail with an `AirflowSensorTimeout` exception.
   
   ### Operating System
   
   Debian 10.13
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   Docker-Compose
   
   ### Deployment details
   
   _No response_
   
   ### Anything else
   
   This appears to be due to a bug in 
`airflow.models.taskinstance.clear_task_instances()`: [it only increments 
the task instance's `max_tries` property if the task is found in the DAG passed 
in](https://github.com/apache/airflow/blob/2.3.3/airflow/models/taskinstance.py#L219-L223),
but when recursively clearing tasks this doesn't work for tasks in downstream 
DAGs, because every task instance to be recursively cleared is passed to 
`clear_task_instances()` with [the DAG of the initial task being 
cleared](https://github.com/apache/airflow/blob/2.3.3/airflow/models/dag.py#L1905).
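   To make the failure mode concrete, here is a minimal pure-Python sketch of 
that gate. The `Task`/`Dag`/`TaskInstance` classes below are simplified 
stand-ins for illustration, not Airflow's real models, and the arithmetic only 
approximates what `clear_task_instances()` does when it does find the task:

```python
from dataclasses import dataclass

# Simplified stand-ins for Airflow's models, for illustration only.
@dataclass
class Task:
    retries: int

@dataclass
class Dag:
    dag_id: str
    task_dict: dict  # task_id -> Task

@dataclass
class TaskInstance:
    dag_id: str
    task_id: str
    try_number: int
    max_tries: int

def clear_task_instances_current(tis, dag):
    """Mirrors the gate in clear_task_instances(): max_tries is only
    recomputed when the task is found in the single DAG passed in."""
    for ti in tis:
        if dag and ti.task_id in dag.task_dict:
            task = dag.task_dict[ti.task_id]
            ti.max_tries = ti.try_number + task.retries - 1
        # else: max_tries is (mostly) left at its stale value

parent = Dag('parent_dag', {'parent_task': Task(retries=2)})
tis = [
    TaskInstance('parent_dag', 'parent_task', try_number=3, max_tries=2),
    TaskInstance('child_dag', 'wait_for_parent_task', try_number=3, max_tries=2),
]
# A recursive clear passes *parent_dag* for every TI, including the child's:
clear_task_instances_current(tis, parent)
assert tis[0].max_tries == 4   # parent task: recomputed as 3 + 2 - 1
assert tis[1].max_tries == 2   # child sensor: gate skipped, stale value kept
```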
   
   When a cleared task instance for a reschedule-mode sensor doesn't have its 
`max_tries` property incremented, the [logic in 
`BaseSensorOperator.execute()`](https://github.com/apache/airflow/blob/2.3.3/airflow/sensors/base.py#L247-L264)
chooses an older `first_try_number`, calculates the sensor run duration as the 
total time elapsed since that earlier attempt, and fails with an 
`AirflowSensorTimeout` exception if that inflated run duration exceeds the 
sensor timeout.
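   That arithmetic can be illustrated in isolation. The constants below mirror 
the example DAGs above; the reschedule start dates are made up, and 
`first_try_number`/`run_duration` are simplified stand-ins for the 
calculations in `BaseSensorOperator.execute()`:

```python
from datetime import datetime, timedelta, timezone

RETRIES = 2
SENSOR_TIMEOUT = 60  # seconds, matching the example DAGs above

def first_try_number(max_tries: int) -> int:
    # Mirrors BaseSensorOperator.execute(): the sensor measures its run
    # duration from the reschedule record of this try number.
    return max_tries - RETRIES + 1

def run_duration(started_at: datetime) -> float:
    return (datetime.now(timezone.utc) - started_at).total_seconds()

now = datetime.now(timezone.utc)
stale_start = now - timedelta(minutes=5)    # reschedule from before the clear
fresh_start = now - timedelta(seconds=5)    # reschedule after the clear

# max_tries NOT incremented by the clear: first_try_number still points at
# the stale pre-clear reschedule, so the duration blows past the timeout.
assert first_try_number(max_tries=2) == 1
assert run_duration(stale_start) > SENSOR_TIMEOUT   # -> AirflowSensorTimeout

# max_tries correctly bumped (e.g. to try_number + retries - 1 = 4): the
# sensor finds a post-clear reschedule and the duration is within the timeout.
assert first_try_number(max_tries=4) == 3
assert run_duration(fresh_start) < SENSOR_TIMEOUT
```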
   
   While I tested this in Airflow 2.3.3 because that's what my company is 
running, I also looked at the current `main` branch code and this appears to 
still be a problem in the latest version.
   
   IMO the best solution would be to change 
`airflow.models.taskinstance.clear_task_instances()` to look up the associated 
DAG for each task instance being cleared, so each instance's task can be 
resolved and its `max_tries` property incremented correctly.
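   A minimal sketch of that idea, again using simplified stand-ins for 
Airflow's models (in practice `get_dag` would be something like 
`DagBag.get_dag`, and the fallback branch approximates what the current code 
does when the task can't be found):

```python
from dataclasses import dataclass
from typing import Callable, Optional

# Simplified stand-ins for Airflow's models, for illustration only.
@dataclass
class Task:
    retries: int

@dataclass
class Dag:
    task_dict: dict  # task_id -> Task

@dataclass
class TaskInstance:
    dag_id: str
    task_id: str
    try_number: int
    max_tries: int

def clear_task_instances_fixed(tis, get_dag: Callable[[str], Optional[Dag]]):
    """Proposed behavior: resolve each task instance's *own* DAG instead of
    assuming every TI belongs to the DAG that initiated the recursive clear."""
    for ti in tis:
        dag = get_dag(ti.dag_id)
        task = dag.task_dict.get(ti.task_id) if dag else None
        if task is not None:
            # Allow `retries` more attempts counting from the current try.
            ti.max_tries = ti.try_number + task.retries - 1
        else:
            # Task can no longer be resolved (e.g. removed from its DAG):
            # keep at least the current value.
            ti.max_tries = max(ti.max_tries, ti.try_number - 1)

dags = {
    'parent_dag': Dag({'parent_task': Task(retries=2)}),
    'child_dag': Dag({'wait_for_parent_task': Task(retries=2)}),
}
tis = [
    TaskInstance('parent_dag', 'parent_task', try_number=3, max_tries=2),
    TaskInstance('child_dag', 'wait_for_parent_task', try_number=3, max_tries=2),
]
clear_task_instances_fixed(tis, dags.get)
assert all(ti.max_tries == 4 for ti in tis)  # both bumped to 3 + 2 - 1
```

With the per-instance lookup, the cross-DAG sensor's `max_tries` is bumped 
just like the tasks in the initiating DAG, so `first_try_number` points at the 
post-clear reschedule.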
   
   ### Are you willing to submit PR?
   
   - [X] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

