cbuffett opened a new issue, #34497:
URL: https://github.com/apache/airflow/issues/34497
### Apache Airflow version
Other Airflow 2 version (please specify below)
### What happened
My DAG has a number of tasks, the first of which is an ExternalTaskSensor.
This sensor works correctly when the external DAG exists (normal
operation/deployment). However, when I use `dag.test()` to debug the DAG, the
ExternalTaskSensor never terminates and reschedules itself indefinitely. I
believe this happens because the external DAG doesn't exist in that situation.
Setting `check_existence` isn't an option, as it immediately throws an
exception and terminates the debugger. Setting `soft_fail` and/or `silent_fail`
results in the exception being logged instead of thrown, but the
ExternalTaskSensor still keeps rescheduling itself.
After some debugging, I noticed that `start_date` is reset to the current
time on every run, because `task_reschedules` is always empty:
```
def execute(self, context: Context) -> Any:
    started_at: datetime.datetime | float

    if self.reschedule:
        # If reschedule, use the start date of the first try (first try can be
        # either the very first execution of the task, or the first execution
        # after the task was cleared.)
        first_try_number = context["ti"].max_tries - self.retries + 1
        task_reschedules = TaskReschedule.find_for_task_instance(
            context["ti"], try_number=first_try_number
        )
        if not task_reschedules:  # This is always empty
            start_date = timezone.utcnow()
        else:
            start_date = task_reschedules[0].start_date
        started_at = start_date

        def run_duration() -> float:
            # If we are in reschedule mode, then we have to compute diff
            # based on the time in a DB, so can't use time.monotonic
            return (timezone.utcnow() - start_date).total_seconds()
```
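To illustrate why an always-empty `task_reschedules` defeats the timeout, here is a minimal simulation that does not use Airflow at all. The function name `pokes_until_timeout`, the fake clock, and the plain list standing in for `TaskReschedule` rows are assumptions made for this sketch; only the `start_date` fallback and the `run_duration > timeout` comparison are taken from the excerpt above.

```python
from datetime import datetime, timedelta, timezone
from typing import List, Optional

TIMEOUT = 300        # seconds, same value as the sensor in this report
POKE_INTERVAL = 60   # seconds, same value as the sensor in this report


def pokes_until_timeout(persist_reschedules: bool, max_pokes: int = 20) -> Optional[int]:
    """Return the poke number at which the timeout trips, or None if it never does."""
    now = datetime(2023, 9, 19, 23, 4, 30, tzinfo=timezone.utc)  # fake clock
    task_reschedules: List[datetime] = []  # stand-in for TaskReschedule rows

    for poke_number in range(1, max_pokes + 1):
        # Same fallback as in the excerpt: use "now" when no rows are found.
        start_date = task_reschedules[0] if task_reschedules else now
        run_duration = (now - start_date).total_seconds()
        if run_duration > TIMEOUT:
            return poke_number
        if persist_reschedules:
            # Assumption: a reschedule records the start of the current run.
            task_reschedules.append(now)
        now += timedelta(seconds=POKE_INTERVAL)
    return None


print(pokes_until_timeout(persist_reschedules=True))
print(pokes_until_timeout(persist_reschedules=False))
```

In this simplified model, the timeout trips on the seventh poke when reschedule rows are found, and never trips when the lookup always comes back empty, which matches the behaviour described in this report.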
### What you think should happen instead
There should be a way to ignore or skip ExternalTaskSensors when using
`dag.test()`. At the very least, the ExternalTaskSensor should respect the
`timeout` value provided.
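Until something like that exists, one possible workaround on the DAG-author side is to swap the sensor for a placeholder task when debugging. The sketch below is not an Airflow feature: the helper `sensor_or_placeholder` and the environment variable `DAG_TEST_SKIP_SENSORS` are hypothetical names invented for illustration.

```python
import os
from typing import Callable, TypeVar

T = TypeVar("T")


def sensor_or_placeholder(
    make_sensor: Callable[[], T],
    make_placeholder: Callable[[], T],
    env_var: str = "DAG_TEST_SKIP_SENSORS",  # hypothetical variable name
) -> T:
    """Build the placeholder when the debug flag is set, otherwise the real sensor."""
    if os.environ.get(env_var, "").lower() in {"1", "true", "yes"}:
        return make_placeholder()
    return make_sensor()
```

In a DAG file, `make_sensor` could be a function that constructs the ExternalTaskSensor, and `make_placeholder` one that constructs a no-op task with the same `task_id` (for example an `EmptyOperator`), so downstream dependencies stay unchanged. This only sidesteps the problem in debug runs; it does not make the sensor honour its timeout.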
### How to reproduce
Run a DAG containing the following ExternalTaskSensor with `dag.test()`:
```
external_task_sensor = ExternalTaskSensor(
    task_id='external_dag_sensor',
    poke_interval=60,
    timeout=300,
    soft_fail=True,
    retries=0,
    external_dag_id=NON_EXISTENT_DAG,
    # Since the external DAG doesn't exist, this function just returns
    # the dt passed in
    execution_date_fn=return_date,
    allowed_states=[State.SUCCESS],
    failed_states=[State.FAILED],
    mode="reschedule",
)
```
### Operating System
Ubuntu 22.04
### Versions of Apache Airflow Providers
```
apache-airflow==2.6.1
apache-airflow-providers-amazon==8.3.1
apache-airflow-providers-apache-hive==6.0.0
apache-airflow-providers-cncf-kubernetes==6.1.0
apache-airflow-providers-common-sql==1.4.0
apache-airflow-providers-ftp==3.3.1
apache-airflow-providers-google==10.0.0
apache-airflow-providers-http==4.3.0
apache-airflow-providers-imap==3.1.1
apache-airflow-providers-jdbc==3.3.0
apache-airflow-providers-microsoft-mssql==3.3.2
apache-airflow-providers-mysql==5.0.0
apache-airflow-providers-postgres==5.4.0
apache-airflow-providers-slack==7.3.1
apache-airflow-providers-snowflake==4.0.5
apache-airflow-providers-sqlite==3.3.2
apache-airflow-providers-ssh==3.6.0
```
### Deployment
Other
### Deployment details
_No response_
### Anything else
Log output showing the sensor task continuing to reschedule itself well past
the 300-second timeout:
```
[2023-09-19T23:04:30.555-0700] {dag.py:3683} INFO -
*****************************************************
[2023-09-19T23:04:30.555-0700] {dag.py:3687} INFO - Running task
external_dag_sensor
[2023-09-19 23:04:32,082] {external_task.py:247} INFO - Poking for DAG
'external_dag_sensor' on 2023-09-19T23:04:29.501673-07:00 ...
[2023-09-19T23:04:32.082-0700] {external_task.py:247} INFO - Poking for DAG
'external_dag_sensor' on 2023-09-19T23:04:29.501673-07:00 ...
[2023-09-19 23:04:32,104] {taskinstance.py:1784} INFO - Rescheduling task,
marking task as UP_FOR_RESCHEDULE
[2023-09-19T23:04:32.104-0700] {taskinstance.py:1784} INFO - Rescheduling
task, marking task as UP_FOR_RESCHEDULE
[2023-09-19T23:04:32.104-0700] {dag.py:3691} INFO - external_dag_sensor ran
successfully!
[2023-09-19T23:04:32.105-0700] {dag.py:3694} INFO -
*****************************************************
[2023-09-19T23:05:32.095-0700] {dag.py:3683} INFO -
*****************************************************
[2023-09-19T23:05:32.096-0700] {dag.py:3687} INFO - Running task
external_dag_sensor
[2023-09-19 23:05:32,995] {external_task.py:247} INFO - Poking for DAG
'external_dag_sensor' on 2023-09-19T23:04:29.501673-07:00 ...
[2023-09-19T23:05:32.995-0700] {external_task.py:247} INFO - Poking for DAG
'external_dag_sensor' on 2023-09-19T23:04:29.501673-07:00 ...
[2023-09-19 23:05:33,010] {taskinstance.py:1784} INFO - Rescheduling task,
marking task as UP_FOR_RESCHEDULE
[2023-09-19T23:05:33.010-0700] {taskinstance.py:1784} INFO - Rescheduling
task, marking task as UP_FOR_RESCHEDULE
[2023-09-19T23:05:33.011-0700] {dag.py:3691} INFO - external_dag_sensor ran
successfully!
[2023-09-19T23:05:33.011-0700] {dag.py:3694} INFO -
*****************************************************
[2023-09-19T23:06:33.013-0700] {dag.py:3683} INFO -
*****************************************************
[2023-09-19T23:06:33.014-0700] {dag.py:3687} INFO - Running task
external_dag_sensor
[2023-09-19 23:06:33,921] {external_task.py:247} INFO - Poking for DAG
'external_dag_sensor' on 2023-09-19T23:04:29.501673-07:00 ...
[2023-09-19T23:06:33.921-0700] {external_task.py:247} INFO - Poking for DAG
'external_dag_sensor' on 2023-09-19T23:04:29.501673-07:00 ...
[2023-09-19 23:06:33,936] {taskinstance.py:1784} INFO - Rescheduling task,
marking task as UP_FOR_RESCHEDULE
[2023-09-19T23:06:33.936-0700] {taskinstance.py:1784} INFO - Rescheduling
task, marking task as UP_FOR_RESCHEDULE
[2023-09-19T23:06:33.936-0700] {dag.py:3691} INFO - external_dag_sensor ran
successfully!
[2023-09-19T23:06:33.936-0700] {dag.py:3694} INFO -
*****************************************************
[2023-09-19T23:07:33.987-0700] {dag.py:3683} INFO -
*****************************************************
[2023-09-19T23:07:33.987-0700] {dag.py:3687} INFO - Running task
external_dag_sensor
[2023-09-19 23:07:34,871] {external_task.py:247} INFO - Poking for DAG
'external_dag_sensor' on 2023-09-19T23:04:29.501673-07:00 ...
[2023-09-19T23:07:34.871-0700] {external_task.py:247} INFO - Poking for DAG
'external_dag_sensor' on 2023-09-19T23:04:29.501673-07:00 ...
[2023-09-19 23:07:34,886] {taskinstance.py:1784} INFO - Rescheduling task,
marking task as UP_FOR_RESCHEDULE
[2023-09-19T23:07:34.886-0700] {taskinstance.py:1784} INFO - Rescheduling
task, marking task as UP_FOR_RESCHEDULE
[2023-09-19T23:07:34.886-0700] {dag.py:3691} INFO - external_dag_sensor ran
successfully!
[2023-09-19T23:07:34.886-0700] {dag.py:3694} INFO -
*****************************************************
[2023-09-19T23:08:34.888-0700] {dag.py:3683} INFO -
*****************************************************
[2023-09-19T23:08:34.889-0700] {dag.py:3687} INFO - Running task
external_dag_sensor
[2023-09-19 23:08:35,784] {external_task.py:247} INFO - Poking for DAG
'external_dag_sensor' on 2023-09-19T23:04:29.501673-07:00 ...
[2023-09-19T23:08:35.784-0700] {external_task.py:247} INFO - Poking for DAG
'external_dag_sensor' on 2023-09-19T23:04:29.501673-07:00 ...
[2023-09-19 23:08:35,800] {taskinstance.py:1784} INFO - Rescheduling task,
marking task as UP_FOR_RESCHEDULE
[2023-09-19T23:08:35.800-0700] {taskinstance.py:1784} INFO - Rescheduling
task, marking task as UP_FOR_RESCHEDULE
[2023-09-19T23:08:35.800-0700] {dag.py:3691} INFO - external_dag_sensor ran
successfully!
[2023-09-19T23:08:35.800-0700] {dag.py:3694} INFO -
*****************************************************
[2023-09-19T23:09:35.799-0700] {dag.py:3683} INFO -
*****************************************************
[2023-09-19T23:09:35.799-0700] {dag.py:3687} INFO - Running task
external_dag_sensor
[2023-09-19 23:09:36,706] {external_task.py:247} INFO - Poking for DAG
'external_dag_sensor' on 2023-09-19T23:04:29.501673-07:00 ...
[2023-09-19T23:09:36.706-0700] {external_task.py:247} INFO - Poking for DAG
'external_dag_sensor' on 2023-09-19T23:04:29.501673-07:00 ...
[2023-09-19 23:09:36,722] {taskinstance.py:1784} INFO - Rescheduling task,
marking task as UP_FOR_RESCHEDULE
[2023-09-19T23:09:36.722-0700] {taskinstance.py:1784} INFO - Rescheduling
task, marking task as UP_FOR_RESCHEDULE
[2023-09-19T23:09:36.723-0700] {dag.py:3691} INFO - external_dag_sensor ran
successfully!
[2023-09-19T23:09:36.723-0700] {dag.py:3694} INFO -
*****************************************************
[2023-09-19T23:10:36.720-0700] {dag.py:3683} INFO -
*****************************************************
[2023-09-19T23:10:36.720-0700] {dag.py:3687} INFO - Running task
external_dag_sensor
[2023-09-19 23:10:37,606] {external_task.py:247} INFO - Poking for DAG
'external_dag_sensor' on 2023-09-19T23:04:29.501673-07:00 ...
[2023-09-19T23:10:37.606-0700] {external_task.py:247} INFO - Poking for DAG
'external_dag_sensor' on 2023-09-19T23:04:29.501673-07:00 ...
[2023-09-19 23:10:37,621] {taskinstance.py:1784} INFO - Rescheduling task,
marking task as UP_FOR_RESCHEDULE
[2023-09-19T23:10:37.621-0700] {taskinstance.py:1784} INFO - Rescheduling
task, marking task as UP_FOR_RESCHEDULE
[2023-09-19T23:10:37.621-0700] {dag.py:3691} INFO - external_dag_sensor ran
successfully!
[2023-09-19T23:10:37.621-0700] {dag.py:3694} INFO -
*****************************************************
[2023-09-19T23:11:37.619-0700] {dag.py:3683} INFO -
*****************************************************
[2023-09-19T23:11:37.619-0700] {dag.py:3687} INFO - Running task
external_dag_sensor
[2023-09-19 23:11:38,516] {external_task.py:247} INFO - Poking for DAG
'external_dag_sensor' on 2023-09-19T23:04:29.501673-07:00 ...
[2023-09-19T23:11:38.516-0700] {external_task.py:247} INFO - Poking for DAG
'external_dag_sensor' on 2023-09-19T23:04:29.501673-07:00 ...
[2023-09-19 23:11:38,532] {taskinstance.py:1784} INFO - Rescheduling task,
marking task as UP_FOR_RESCHEDULE
[2023-09-19T23:11:38.532-0700] {taskinstance.py:1784} INFO - Rescheduling
task, marking task as UP_FOR_RESCHEDULE
[2023-09-19T23:11:38.533-0700] {dag.py:3691} INFO - external_dag_sensor ran
successfully!
[2023-09-19T23:11:38.534-0700] {dag.py:3694} INFO -
*****************************************************
```
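As a quick check on the log above, the elapsed time between the first and last "Running task" entries can be computed from their timestamps. The snippet below is only a sketch: the two timestamp strings are copied from the log, while the variable names and the `parse` helper are made up for this example.

```python
from datetime import datetime

TIMEOUT_SECONDS = 300  # timeout passed to the sensor in this report

# Timestamps of the first and last "Running task" log entries shown above.
first_run = "2023-09-19T23:04:30.555-0700"
last_run = "2023-09-19T23:11:37.619-0700"


def parse(timestamp: str) -> datetime:
    """Parse the ISO-like timestamp format used in the dag.py log lines."""
    return datetime.strptime(timestamp, "%Y-%m-%dT%H:%M:%S.%f%z")


elapsed = (parse(last_run) - parse(first_run)).total_seconds()
print(f"elapsed: {elapsed:.3f}s, timeout: {TIMEOUT_SECONDS}s")
print("past timeout:", elapsed > TIMEOUT_SECONDS)
```

The sensor is still being run roughly 427 seconds after the first poke, well beyond the configured 300 seconds.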
### Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)