cbuffett opened a new issue, #34497:
URL: https://github.com/apache/airflow/issues/34497

   ### Apache Airflow version
   
   Other Airflow 2 version (please specify below)
   
   ### What happened
   
   My DAG has a number of tasks, the first of which is an ExternalTaskSensor. 
This sensor functions correctly when the external DAG exists (normal 
operation/deployment). However, when using `dag.test()` to debug the DAG, the 
ExternalTaskSensor never terminates, rescheduling itself indefinitely. I 
believe this happens because in this situation, the external DAG doesn't exist.
   
   Using `check_existence` isn't an option, as this immediately throws an 
exception and terminates the debugger. Using `soft_fail` and/or `silent_fail` 
results in the exception being logged instead of thrown, but the 
ExternalTaskSensor continues to reschedule itself.
   
   After some debugging, I noticed that `start_date` is reset to the current 
time on every run because `task_reschedules` is always empty, so the computed 
run duration never exceeds the timeout:
   
   ```
   def execute(self, context: Context) -> Any:
       started_at: datetime.datetime | float
   
       if self.reschedule:
           # If reschedule, use the start date of the first try (first try can
           # be either the very first execution of the task, or the first
           # execution after the task was cleared.)
           first_try_number = context["ti"].max_tries - self.retries + 1
           task_reschedules = TaskReschedule.find_for_task_instance(
               context["ti"], try_number=first_try_number
           )
           if not task_reschedules:  # This is always empty
               start_date = timezone.utcnow()
           else:
               start_date = task_reschedules[0].start_date
           started_at = start_date
   
           def run_duration() -> float:
               # If we are in reschedule mode, then we have to compute diff
               # based on the time in a DB, so can't use time.monotonic
               return (timezone.utcnow() - start_date).total_seconds()
   ```
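
A minimal sketch of why the timeout never fires, assuming the excerpt above is the only place the start time is derived. It is plain Python with no Airflow imports; `simulate`, `stored`, and the constants are illustrative names rather than Airflow APIs, and the list `stored` merely stands in for the `TaskReschedule` rows.

```python
from datetime import datetime, timedelta, timezone

TIMEOUT = 300        # seconds, matching the sensor config in this report
POKE_INTERVAL = 60   # seconds


def run_duration(now, task_reschedules):
    """Mirror the reschedule branch: fall back to `now` when no rows exist."""
    start_date = task_reschedules[0] if task_reschedules else now
    return (now - start_date).total_seconds()


def simulate(persist_reschedules, runs=10):
    """Return the run index at which the timeout fires, or None if it never does."""
    clock = datetime(2023, 9, 19, 23, 4, 30, tzinfo=timezone.utc)
    stored = []  # stand-in for persisted reschedule start dates
    for i in range(runs):
        if run_duration(clock, stored) > TIMEOUT:
            return i
        if persist_reschedules:
            stored.append(clock)  # record the start date of this run
        clock += timedelta(seconds=POKE_INTERVAL)
    return None
```

With persisted rows the elapsed time grows by one poke interval per run and eventually crosses the timeout. With an always-empty list the duration is zero on every run, which matches the behaviour described here.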
   
   ### What you think should happen instead
   
   A way to ignore/skip ExternalTaskSensors when using dag.test(). At the very 
least, the ExternalTaskSensor should respect the timeout value provided.
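
One possible stopgap, sketched under assumptions and not verified against `dag.test()`: in `mode="poke"` the sensor measures elapsed time in-process with `time.monotonic` rather than from stored reschedule rows, so switching modes for debug runs may let the timeout apply. The helper `sensor_kwargs` and the environment variable `MY_DAG_DEBUG` below are hypothetical names, not part of Airflow.

```python
import os


def sensor_kwargs(base_kwargs, env=os.environ):
    """Return a copy of the sensor kwargs, forcing mode="poke" for debug runs."""
    kwargs = dict(base_kwargs)  # copy so the caller's dict is not mutated
    if env.get("MY_DAG_DEBUG") == "1":  # hypothetical flag set before dag.test()
        kwargs["mode"] = "poke"
    return kwargs
```

The result would be unpacked into the sensor constructor, for example `ExternalTaskSensor(task_id='external_dag_sensor', **sensor_kwargs({...}))`, leaving the deployed configuration in reschedule mode.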
   
   ### How to reproduce
   
   Run a DAG containing the following ExternalTaskSensor using `dag.test()`:
   ```
       external_task_sensor = ExternalTaskSensor(
           task_id='external_dag_sensor',
           poke_interval=60,
           timeout=300,
           soft_fail=True,
           retries=0,
           external_dag_id=NON_EXISTENT_DAG,
           # Since the external DAG doesn't exist, this function just returns
           # the dt passed in
           execution_date_fn=return_date,
           allowed_states=[State.SUCCESS],
           failed_states=[State.FAILED],
           mode="reschedule"
       )
   ```
   
   ### Operating System
   
   Ubuntu 22.04
   
   ### Versions of Apache Airflow Providers
   
   ```
   apache-airflow==2.6.1
   apache-airflow-providers-amazon==8.3.1
   apache-airflow-providers-apache-hive==6.0.0
   apache-airflow-providers-cncf-kubernetes==6.1.0
   apache-airflow-providers-common-sql==1.4.0
   apache-airflow-providers-ftp==3.3.1
   apache-airflow-providers-google==10.0.0
   apache-airflow-providers-http==4.3.0
   apache-airflow-providers-imap==3.1.1
   apache-airflow-providers-jdbc==3.3.0
   apache-airflow-providers-microsoft-mssql==3.3.2
   apache-airflow-providers-mysql==5.0.0
   apache-airflow-providers-postgres==5.4.0
   apache-airflow-providers-slack==7.3.1
   apache-airflow-providers-snowflake==4.0.5
   apache-airflow-providers-sqlite==3.3.2
   apache-airflow-providers-ssh==3.6.0
   ```
   
   ### Deployment
   
   Other
   
   ### Deployment details
   
   _No response_
   
   ### Anything else
   
   Log output showing the sensor task continuing to reschedule itself well 
past the 300-second timeout:
   
   ```
   [2023-09-19T23:04:30.555-0700] {dag.py:3683} INFO - 
*****************************************************
   [2023-09-19T23:04:30.555-0700] {dag.py:3687} INFO - Running task 
external_dag_sensor
   [2023-09-19 23:04:32,082] {external_task.py:247} INFO - Poking for DAG 
'external_dag_sensor' on 2023-09-19T23:04:29.501673-07:00 ... 
   [2023-09-19T23:04:32.082-0700] {external_task.py:247} INFO - Poking for DAG 
'external_dag_sensor' on 2023-09-19T23:04:29.501673-07:00 ... 
   [2023-09-19 23:04:32,104] {taskinstance.py:1784} INFO - Rescheduling task, 
marking task as UP_FOR_RESCHEDULE
   [2023-09-19T23:04:32.104-0700] {taskinstance.py:1784} INFO - Rescheduling 
task, marking task as UP_FOR_RESCHEDULE
   [2023-09-19T23:04:32.104-0700] {dag.py:3691} INFO - external_dag_sensor ran 
successfully!
   [2023-09-19T23:04:32.105-0700] {dag.py:3694} INFO - 
*****************************************************
   [2023-09-19T23:05:32.095-0700] {dag.py:3683} INFO - 
*****************************************************
   [2023-09-19T23:05:32.096-0700] {dag.py:3687} INFO - Running task 
external_dag_sensor
   [2023-09-19 23:05:32,995] {external_task.py:247} INFO - Poking for DAG 
'external_dag_sensor' on 2023-09-19T23:04:29.501673-07:00 ... 
   [2023-09-19T23:05:32.995-0700] {external_task.py:247} INFO - Poking for DAG 
'external_dag_sensor' on 2023-09-19T23:04:29.501673-07:00 ... 
   [2023-09-19 23:05:33,010] {taskinstance.py:1784} INFO - Rescheduling task, 
marking task as UP_FOR_RESCHEDULE
   [2023-09-19T23:05:33.010-0700] {taskinstance.py:1784} INFO - Rescheduling 
task, marking task as UP_FOR_RESCHEDULE
   [2023-09-19T23:05:33.011-0700] {dag.py:3691} INFO - external_dag_sensor ran 
successfully!
   [2023-09-19T23:05:33.011-0700] {dag.py:3694} INFO - 
*****************************************************
   [2023-09-19T23:06:33.013-0700] {dag.py:3683} INFO - 
*****************************************************
   [2023-09-19T23:06:33.014-0700] {dag.py:3687} INFO - Running task 
external_dag_sensor
   [2023-09-19 23:06:33,921] {external_task.py:247} INFO - Poking for DAG 
'external_dag_sensor' on 2023-09-19T23:04:29.501673-07:00 ... 
   [2023-09-19T23:06:33.921-0700] {external_task.py:247} INFO - Poking for DAG 
'external_dag_sensor' on 2023-09-19T23:04:29.501673-07:00 ... 
   [2023-09-19 23:06:33,936] {taskinstance.py:1784} INFO - Rescheduling task, 
marking task as UP_FOR_RESCHEDULE
   [2023-09-19T23:06:33.936-0700] {taskinstance.py:1784} INFO - Rescheduling 
task, marking task as UP_FOR_RESCHEDULE
   [2023-09-19T23:06:33.936-0700] {dag.py:3691} INFO - external_dag_sensor ran 
successfully!
   [2023-09-19T23:06:33.936-0700] {dag.py:3694} INFO - 
*****************************************************
   [2023-09-19T23:07:33.987-0700] {dag.py:3683} INFO - 
*****************************************************
   [2023-09-19T23:07:33.987-0700] {dag.py:3687} INFO - Running task 
external_dag_sensor
   [2023-09-19 23:07:34,871] {external_task.py:247} INFO - Poking for DAG 
'external_dag_sensor' on 2023-09-19T23:04:29.501673-07:00 ... 
   [2023-09-19T23:07:34.871-0700] {external_task.py:247} INFO - Poking for DAG 
'external_dag_sensor' on 2023-09-19T23:04:29.501673-07:00 ... 
   [2023-09-19 23:07:34,886] {taskinstance.py:1784} INFO - Rescheduling task, 
marking task as UP_FOR_RESCHEDULE
   [2023-09-19T23:07:34.886-0700] {taskinstance.py:1784} INFO - Rescheduling 
task, marking task as UP_FOR_RESCHEDULE
   [2023-09-19T23:07:34.886-0700] {dag.py:3691} INFO - external_dag_sensor ran 
successfully!
   [2023-09-19T23:07:34.886-0700] {dag.py:3694} INFO - 
*****************************************************
   [2023-09-19T23:08:34.888-0700] {dag.py:3683} INFO - 
*****************************************************
   [2023-09-19T23:08:34.889-0700] {dag.py:3687} INFO - Running task 
external_dag_sensor
   [2023-09-19 23:08:35,784] {external_task.py:247} INFO - Poking for DAG 
'external_dag_sensor' on 2023-09-19T23:04:29.501673-07:00 ... 
   [2023-09-19T23:08:35.784-0700] {external_task.py:247} INFO - Poking for DAG 
'external_dag_sensor' on 2023-09-19T23:04:29.501673-07:00 ... 
   [2023-09-19 23:08:35,800] {taskinstance.py:1784} INFO - Rescheduling task, 
marking task as UP_FOR_RESCHEDULE
   [2023-09-19T23:08:35.800-0700] {taskinstance.py:1784} INFO - Rescheduling 
task, marking task as UP_FOR_RESCHEDULE
   [2023-09-19T23:08:35.800-0700] {dag.py:3691} INFO - external_dag_sensor ran 
successfully!
   [2023-09-19T23:08:35.800-0700] {dag.py:3694} INFO - 
*****************************************************
   [2023-09-19T23:09:35.799-0700] {dag.py:3683} INFO - 
*****************************************************
   [2023-09-19T23:09:35.799-0700] {dag.py:3687} INFO - Running task 
external_dag_sensor
   [2023-09-19 23:09:36,706] {external_task.py:247} INFO - Poking for DAG 
'external_dag_sensor' on 2023-09-19T23:04:29.501673-07:00 ... 
   [2023-09-19T23:09:36.706-0700] {external_task.py:247} INFO - Poking for DAG 
'external_dag_sensor' on 2023-09-19T23:04:29.501673-07:00 ... 
   [2023-09-19 23:09:36,722] {taskinstance.py:1784} INFO - Rescheduling task, 
marking task as UP_FOR_RESCHEDULE
   [2023-09-19T23:09:36.722-0700] {taskinstance.py:1784} INFO - Rescheduling 
task, marking task as UP_FOR_RESCHEDULE
   [2023-09-19T23:09:36.723-0700] {dag.py:3691} INFO - external_dag_sensor ran 
successfully!
   [2023-09-19T23:09:36.723-0700] {dag.py:3694} INFO - 
*****************************************************
   [2023-09-19T23:10:36.720-0700] {dag.py:3683} INFO - 
*****************************************************
   [2023-09-19T23:10:36.720-0700] {dag.py:3687} INFO - Running task 
external_dag_sensor
   [2023-09-19 23:10:37,606] {external_task.py:247} INFO - Poking for DAG 
'external_dag_sensor' on 2023-09-19T23:04:29.501673-07:00 ... 
   [2023-09-19T23:10:37.606-0700] {external_task.py:247} INFO - Poking for DAG 
'external_dag_sensor' on 2023-09-19T23:04:29.501673-07:00 ... 
   [2023-09-19 23:10:37,621] {taskinstance.py:1784} INFO - Rescheduling task, 
marking task as UP_FOR_RESCHEDULE
   [2023-09-19T23:10:37.621-0700] {taskinstance.py:1784} INFO - Rescheduling 
task, marking task as UP_FOR_RESCHEDULE
   [2023-09-19T23:10:37.621-0700] {dag.py:3691} INFO - external_dag_sensor ran 
successfully!
   [2023-09-19T23:10:37.621-0700] {dag.py:3694} INFO - 
*****************************************************
   [2023-09-19T23:11:37.619-0700] {dag.py:3683} INFO - 
*****************************************************
   [2023-09-19T23:11:37.619-0700] {dag.py:3687} INFO - Running task 
external_dag_sensor
   [2023-09-19 23:11:38,516] {external_task.py:247} INFO - Poking for DAG 
'external_dag_sensor' on 2023-09-19T23:04:29.501673-07:00 ... 
   [2023-09-19T23:11:38.516-0700] {external_task.py:247} INFO - Poking for DAG 
'external_dag_sensor' on 2023-09-19T23:04:29.501673-07:00 ... 
   [2023-09-19 23:11:38,532] {taskinstance.py:1784} INFO - Rescheduling task, 
marking task as UP_FOR_RESCHEDULE
   [2023-09-19T23:11:38.532-0700] {taskinstance.py:1784} INFO - Rescheduling 
task, marking task as UP_FOR_RESCHEDULE
   [2023-09-19T23:11:38.533-0700] {dag.py:3691} INFO - external_dag_sensor ran 
successfully!
   [2023-09-19T23:11:38.534-0700] {dag.py:3694} INFO - 
*****************************************************
   ```
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
