sean-rose opened a new issue, #29049:
URL: https://github.com/apache/airflow/issues/29049
### Apache Airflow version
Other Airflow 2 version (please specify below)
### What happened
Using Airflow 2.3.3: when recursively clearing downstream tasks, any cleared
external task sensors in other DAGs that use reschedule mode instantly fail
with an `AirflowSensorTimeout` exception if the previous run is older than the
sensor's timeout.
### What you think should happen instead
The recursively cleared external task sensors should run normally: waiting for
the cleared upstream task to complete, retrying up to the configured number of
times, and observing the configured sensor timeout counted from the moment the
sensor was cleared.
### How to reproduce
1. Load the following DAGs:
```python
from datetime import datetime, timedelta, timezone
from time import sleep

from airflow.decorators import task
from airflow.models import DAG
from airflow.sensors.external_task import ExternalTaskMarker, ExternalTaskSensor

default_args = {
    'start_date': datetime.now(timezone.utc).replace(second=0, microsecond=0),
    'retries': 2,
    'retry_delay': timedelta(seconds=10),
}

with DAG('parent_dag', schedule_interval='* * * * *', catchup=False,
         default_args=default_args) as parent_dag:
    @task(task_id='parent_task')
    def parent_sleep():
        sleep(10)

    parent_task = parent_sleep()

    child_dag__wait_for_parent_task = ExternalTaskMarker(
        task_id='child_dag__wait_for_parent_task',
        external_dag_id='child_dag',
        external_task_id='wait_for_parent_task',
    )

    parent_task >> child_dag__wait_for_parent_task

with DAG('child_dag', schedule_interval='* * * * *', catchup=False,
         default_args=default_args) as child_dag:
    wait_for_parent_task = ExternalTaskSensor(
        task_id='wait_for_parent_task',
        external_dag_id='parent_dag',
        external_task_id='parent_task',
        mode='reschedule',
        poke_interval=15,
        timeout=60,
    )

    @task(task_id='child_task')
    def child_sleep():
        sleep(10)

    child_task = child_sleep()

    wait_for_parent_task >> child_task
```
2. Enable the `parent_dag` and `child_dag` DAGs and wait for them to
automatically run (they're scheduled to run every minute).
3. Wait for at least one additional minute (because the sensor timeout is
configured to be one minute).
4. Clear the earliest `parent_dag.parent_task` task instance with the
"Downstream" and "Recursive" options enabled.
5. When the cleared `child_dag.wait_for_parent_task` task tries to run it
will immediately fail with an `AirflowSensorTimeout` exception.
### Operating System
Debian 10.13
### Versions of Apache Airflow Providers
_No response_
### Deployment
Docker-Compose
### Deployment details
_No response_
### Anything else
This appears to be due to a bug in
`airflow.models.taskinstance.clear_task_instances()`: [it only increments a
task instance's `max_tries` property if the task is found in the DAG passed
in](https://github.com/apache/airflow/blob/2.3.3/airflow/models/taskinstance.py#L219-L223).
When clearing recursively, that lookup fails for tasks in downstream DAGs,
because all task instances to be recursively cleared are passed to
`clear_task_instances()` with [the DAG of the initial task being
cleared](https://github.com/apache/airflow/blob/2.3.3/airflow/models/dag.py#L1905).

When a cleared task instance for a reschedule-mode sensor doesn't have its
`max_tries` property incremented, the [logic in
`BaseSensorOperator.execute()`](https://github.com/apache/airflow/blob/2.3.3/airflow/sensors/base.py#L247-L264)
picks an older `first_try_number` value, computes the sensor run duration as
the total time elapsed since that previous run, and fails with an
`AirflowSensorTimeout` exception when that inflated run duration exceeds the
sensor timeout.
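To make the failure mode concrete, here is a minimal, self-contained sketch of the duration arithmetic (this is not Airflow's code; the function and the example timestamps are illustrative). In reschedule mode the sensor measures elapsed time from the start of the first reschedule record in the current "try window". If `max_tries` is not bumped on clear, that window still begins at the pre-clear run:

```python
from datetime import datetime, timedelta, timezone

def sensor_run_duration(first_reschedule_start: datetime, now: datetime) -> float:
    # Elapsed seconds since the first reschedule of the current try window;
    # this is what gets compared against the sensor's timeout.
    return (now - first_reschedule_start).total_seconds()

now = datetime(2023, 1, 20, 12, 0, tzinfo=timezone.utc)

# If max_tries was incremented on clear, the window starts at the fresh run:
fresh = sensor_run_duration(now - timedelta(seconds=15), now)

# If max_tries was NOT incremented, the window still starts at the original
# run from before the clear:
stale = sensor_run_duration(now - timedelta(minutes=5), now)

# With timeout=60, only the stale (inflated) duration trips the timeout.
assert fresh <= 60 < stale
```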
While I tested this on Airflow 2.3.3 because that's what my company is
running, I also checked the current `main` branch and the problem appears to
still be present in the latest version.
IMO the best fix is to change
`airflow.models.taskinstance.clear_task_instances()` to resolve the associated
DAG for each task instance being cleared, so that each instance's task can be
read and its `max_tries` property incremented correctly.
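A rough, self-contained sketch of that idea (not Airflow's actual code; the dataclass and the retries lookup table are illustrative stand-ins for resolving each task from its own DAG):

```python
from dataclasses import dataclass

@dataclass
class TaskInstance:
    dag_id: str
    task_id: str
    try_number: int
    max_tries: int

# Hypothetical stand-in for looking up a task's configured retries from its
# own DAG, instead of from the single DAG passed to clear_task_instances():
RETRIES_BY_DAG_TASK = {
    ("parent_dag", "parent_task"): 2,
    ("child_dag", "wait_for_parent_task"): 2,
}

def clear_task_instances(tis: list[TaskInstance]) -> None:
    for ti in tis:
        # Proposed behavior: apply the same update Airflow already uses when
        # the task IS found in the passed-in DAG, but for every instance:
        #     ti.max_tries = ti.try_number + task.retries - 1
        retries = RETRIES_BY_DAG_TASK[(ti.dag_id, ti.task_id)]
        ti.max_tries = ti.try_number + retries - 1

tis = [
    TaskInstance("parent_dag", "parent_task", try_number=1, max_tries=2),
    TaskInstance("child_dag", "wait_for_parent_task", try_number=1, max_tries=2),
]
clear_task_instances(tis)
# Every instance, including the one in the downstream DAG, now gets a fresh
# try window: max_tries = try_number + retries - 1.
assert all(ti.max_tries == ti.try_number + 1 for ti in tis)
```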
### Are you willing to submit PR?
- [X] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)