AlexisBRENON opened a new issue, #32512:
URL: https://github.com/apache/airflow/issues/32512

   ### Apache Airflow version
   
   2.6.2
   
   ### What happened
   
   I created a sensor in reschedule mode with a given execution timeout.
   However, the sensor never changes to the failed state, even after the
   execution timeout has passed.
   
   ### What you think should happen instead
   
   I expect the sensor to be poked a given number of times and then to change
   to the failed state once the execution timeout has elapsed.
   
   ### How to reproduce
   
   Define a DAG with a single sensor that checks for a nonexistent file.
   The sensor should poke every 30 seconds and time out after 1 minute.
   
   ```python
   import datetime
   from airflow.models.dag import DAG
   from airflow.sensors.filesystem import FileSensor
   
   with DAG(dag_id="foo",
            schedule_interval="@once",
            start_date=datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(days=1),
            catchup=False) as dag:
       FileSensor(task_id="wait_for_file",
                  filepath="/tmp/temporary_file_for_testing",
                  poke_interval=30,
                  mode="reschedule",
                  execution_timeout=datetime.timedelta(minutes=1))
   ```
   
   ### Operating System
   
   Arch Linux
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   Other Docker-based deployment
   
   ### Deployment details
   
   ```bash
   docker run --rm -it -v /tmp/dags:/opt/airflow/dags -p 8080:8080 \
     apache/airflow:2.6.2-python3.11 airflow standalone
   ```
   
   I see similar behavior in my production deployment, which is based on the Helm chart.
   
   ### Anything else
   
   Logs of the task. You can see that the log covers more than 1 minute.
   ```
   [2023-07-11, 08:26:38 UTC] {taskinstance.py:1103} INFO - Dependencies all 
met for dep_context=non-requeueable deps ti=<TaskInstance: foo.wait_for_file 
scheduled__2023-07-10T08:26:07.521239+00:00 [queued]>
   [2023-07-11, 08:26:38 UTC] {taskinstance.py:1103} INFO - Dependencies all 
met for dep_context=requeueable deps ti=<TaskInstance: foo.wait_for_file 
scheduled__2023-07-10T08:26:07.521239+00:00 [queued]>
   [2023-07-11, 08:26:38 UTC] {taskinstance.py:1308} INFO - Starting attempt 1 
of 1
   [2023-07-11, 08:26:38 UTC] {taskinstance.py:1327} INFO - Executing 
<Task(FileSensor): wait_for_file> on 2023-07-10 08:26:07.521239+00:00
   [2023-07-11, 08:26:38 UTC] {standard_task_runner.py:57} INFO - Started 
process 55 to run task
   [2023-07-11, 08:26:38 UTC] {standard_task_runner.py:84} INFO - Running: 
['airflow', 'tasks', 'run', 'foo', 'wait_for_file', 
'scheduled__2023-07-10T08:26:07.521239+00:00', '--job-id', '3', '--raw', 
'--subdir', 'DAGS_FOLDER/test.py', '--cfg-path', '/tmp/tmpkxpv9y3q']
   [2023-07-11, 08:26:38 UTC] {standard_task_runner.py:85} INFO - Job 3: 
Subtask wait_for_file
   [2023-07-11, 08:26:38 UTC] {task_command.py:410} INFO - Running 
<TaskInstance: foo.wait_for_file scheduled__2023-07-10T08:26:07.521239+00:00 
[running]> on host 686510c65d84
   [2023-07-11, 08:26:38 UTC] {taskinstance.py:1545} INFO - Exporting env vars: 
AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='foo' 
AIRFLOW_CTX_TASK_ID='wait_for_file' 
AIRFLOW_CTX_EXECUTION_DATE='2023-07-10T08:26:07.521239+00:00' 
AIRFLOW_CTX_TRY_NUMBER='1' 
AIRFLOW_CTX_DAG_RUN_ID='scheduled__2023-07-10T08:26:07.521239+00:00'
   [2023-07-11, 08:26:38 UTC] {base.py:73} INFO - Using connection ID 
'fs_default' for task execution.
   [2023-07-11, 08:26:38 UTC] {filesystem.py:64} INFO - Poking for file 
/tmp/temporary_file_for_testing
   [2023-07-11, 08:26:38 UTC] {taskinstance.py:1784} INFO - Rescheduling task, 
marking task as UP_FOR_RESCHEDULE
   [2023-07-11, 08:26:38 UTC] {local_task_job_runner.py:225} INFO - Task exited 
with return code 0
   [2023-07-11, 08:26:38 UTC] {taskinstance.py:2653} INFO - 0 downstream tasks 
scheduled from follow-on schedule check
   [2023-07-11, 08:27:10 UTC] {taskinstance.py:1103} INFO - Dependencies all 
met for dep_context=non-requeueable deps ti=<TaskInstance: foo.wait_for_file 
scheduled__2023-07-10T08:26:07.521239+00:00 [queued]>
   [2023-07-11, 08:27:10 UTC] {taskinstance.py:1103} INFO - Dependencies all 
met for dep_context=requeueable deps ti=<TaskInstance: foo.wait_for_file 
scheduled__2023-07-10T08:26:07.521239+00:00 [queued]>
   [2023-07-11, 08:27:10 UTC] {taskinstance.py:1308} INFO - Starting attempt 1 
of 1
   [2023-07-11, 08:27:11 UTC] {taskinstance.py:1327} INFO - Executing 
<Task(FileSensor): wait_for_file> on 2023-07-10 08:26:07.521239+00:00
   [2023-07-11, 08:27:11 UTC] {standard_task_runner.py:57} INFO - Started 
process 58 to run task
   [2023-07-11, 08:27:11 UTC] {standard_task_runner.py:84} INFO - Running: 
['airflow', 'tasks', 'run', 'foo', 'wait_for_file', 
'scheduled__2023-07-10T08:26:07.521239+00:00', '--job-id', '4', '--raw', 
'--subdir', 'DAGS_FOLDER/test.py', '--cfg-path', '/tmp/tmpaq2r2iaf']
   [2023-07-11, 08:27:11 UTC] {standard_task_runner.py:85} INFO - Job 4: 
Subtask wait_for_file
   [2023-07-11, 08:27:11 UTC] {task_command.py:410} INFO - Running 
<TaskInstance: foo.wait_for_file scheduled__2023-07-10T08:26:07.521239+00:00 
[running]> on host 686510c65d84
   [2023-07-11, 08:27:11 UTC] {taskinstance.py:1545} INFO - Exporting env vars: 
AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='foo' 
AIRFLOW_CTX_TASK_ID='wait_for_file' 
AIRFLOW_CTX_EXECUTION_DATE='2023-07-10T08:26:07.521239+00:00' 
AIRFLOW_CTX_TRY_NUMBER='1' 
AIRFLOW_CTX_DAG_RUN_ID='scheduled__2023-07-10T08:26:07.521239+00:00'
   [2023-07-11, 08:27:11 UTC] {base.py:73} INFO - Using connection ID 
'fs_default' for task execution.
   [2023-07-11, 08:27:11 UTC] {filesystem.py:64} INFO - Poking for file 
/tmp/temporary_file_for_testing
   [2023-07-11, 08:27:11 UTC] {taskinstance.py:1784} INFO - Rescheduling task, 
marking task as UP_FOR_RESCHEDULE
   [2023-07-11, 08:27:11 UTC] {local_task_job_runner.py:225} INFO - Task exited 
with return code 0
   [2023-07-11, 08:27:11 UTC] {taskinstance.py:2653} INFO - 0 downstream tasks 
scheduled from follow-on schedule check
   [2023-07-11, 08:27:43 UTC] {taskinstance.py:1103} INFO - Dependencies all 
met for dep_context=non-requeueable deps ti=<TaskInstance: foo.wait_for_file 
scheduled__2023-07-10T08:26:07.521239+00:00 [queued]>
   [2023-07-11, 08:27:43 UTC] {taskinstance.py:1103} INFO - Dependencies all 
met for dep_context=requeueable deps ti=<TaskInstance: foo.wait_for_file 
scheduled__2023-07-10T08:26:07.521239+00:00 [queued]>
   [2023-07-11, 08:27:43 UTC] {taskinstance.py:1308} INFO - Starting attempt 1 
of 1
   [2023-07-11, 08:27:43 UTC] {taskinstance.py:1327} INFO - Executing 
<Task(FileSensor): wait_for_file> on 2023-07-10 08:26:07.521239+00:00
   [2023-07-11, 08:27:43 UTC] {standard_task_runner.py:57} INFO - Started 
process 61 to run task
   [2023-07-11, 08:27:43 UTC] {standard_task_runner.py:84} INFO - Running: 
['airflow', 'tasks', 'run', 'foo', 'wait_for_file', 
'scheduled__2023-07-10T08:26:07.521239+00:00', '--job-id', '5', '--raw', 
'--subdir', 'DAGS_FOLDER/test.py', '--cfg-path', '/tmp/tmpn89kv31n']
   [2023-07-11, 08:27:43 UTC] {standard_task_runner.py:85} INFO - Job 5: 
Subtask wait_for_file
   [2023-07-11, 08:27:43 UTC] {task_command.py:410} INFO - Running 
<TaskInstance: foo.wait_for_file scheduled__2023-07-10T08:26:07.521239+00:00 
[running]> on host 686510c65d84
   [2023-07-11, 08:27:44 UTC] {taskinstance.py:1545} INFO - Exporting env vars: 
AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='foo' 
AIRFLOW_CTX_TASK_ID='wait_for_file' 
AIRFLOW_CTX_EXECUTION_DATE='2023-07-10T08:26:07.521239+00:00' 
AIRFLOW_CTX_TRY_NUMBER='1' 
AIRFLOW_CTX_DAG_RUN_ID='scheduled__2023-07-10T08:26:07.521239+00:00'
   [2023-07-11, 08:27:44 UTC] {base.py:73} INFO - Using connection ID 
'fs_default' for task execution.
   [2023-07-11, 08:27:44 UTC] {filesystem.py:64} INFO - Poking for file 
/tmp/temporary_file_for_testing
   [2023-07-11, 08:27:44 UTC] {taskinstance.py:1784} INFO - Rescheduling task, 
marking task as UP_FOR_RESCHEDULE
   [2023-07-11, 08:27:44 UTC] {local_task_job_runner.py:225} INFO - Task exited 
with return code 0
   [2023-07-11, 08:27:44 UTC] {taskinstance.py:2653} INFO - 0 downstream tasks 
scheduled from follow-on schedule check
   [2023-07-11, 08:28:16 UTC] {taskinstance.py:1103} INFO - Dependencies all 
met for dep_context=non-requeueable deps ti=<TaskInstance: foo.wait_for_file 
scheduled__2023-07-10T08:26:07.521239+00:00 [queued]>
   [2023-07-11, 08:28:16 UTC] {taskinstance.py:1103} INFO - Dependencies all 
met for dep_context=requeueable deps ti=<TaskInstance: foo.wait_for_file 
scheduled__2023-07-10T08:26:07.521239+00:00 [queued]>
   [2023-07-11, 08:28:16 UTC] {taskinstance.py:1308} INFO - Starting attempt 1 
of 1
   [2023-07-11, 08:28:16 UTC] {taskinstance.py:1327} INFO - Executing 
<Task(FileSensor): wait_for_file> on 2023-07-10 08:26:07.521239+00:00
   [2023-07-11, 08:28:16 UTC] {standard_task_runner.py:57} INFO - Started 
process 64 to run task
   [2023-07-11, 08:28:16 UTC] {standard_task_runner.py:84} INFO - Running: 
['airflow', 'tasks', 'run', 'foo', 'wait_for_file', 
'scheduled__2023-07-10T08:26:07.521239+00:00', '--job-id', '6', '--raw', 
'--subdir', 'DAGS_FOLDER/test.py', '--cfg-path', '/tmp/tmpartoz87c']
   [2023-07-11, 08:28:16 UTC] {standard_task_runner.py:85} INFO - Job 6: 
Subtask wait_for_file
   [2023-07-11, 08:28:16 UTC] {task_command.py:410} INFO - Running 
<TaskInstance: foo.wait_for_file scheduled__2023-07-10T08:26:07.521239+00:00 
[running]> on host 686510c65d84
   [2023-07-11, 08:28:16 UTC] {taskinstance.py:1545} INFO - Exporting env vars: 
AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='foo' 
AIRFLOW_CTX_TASK_ID='wait_for_file' 
AIRFLOW_CTX_EXECUTION_DATE='2023-07-10T08:26:07.521239+00:00' 
AIRFLOW_CTX_TRY_NUMBER='1' 
AIRFLOW_CTX_DAG_RUN_ID='scheduled__2023-07-10T08:26:07.521239+00:00'
   [2023-07-11, 08:28:16 UTC] {base.py:73} INFO - Using connection ID 
'fs_default' for task execution.
   [2023-07-11, 08:28:16 UTC] {filesystem.py:64} INFO - Poking for file 
/tmp/temporary_file_for_testing
   [2023-07-11, 08:28:16 UTC] {taskinstance.py:1784} INFO - Rescheduling task, 
marking task as UP_FOR_RESCHEDULE
   [2023-07-11, 08:28:16 UTC] {local_task_job_runner.py:225} INFO - Task exited 
with return code 0
   [2023-07-11, 08:28:16 UTC] {taskinstance.py:2653} INFO - 0 downstream tasks 
scheduled from follow-on schedule check
   [2023-07-11, 08:28:48 UTC] {taskinstance.py:1103} INFO - Dependencies all 
met for dep_context=non-requeueable deps ti=<TaskInstance: foo.wait_for_file 
scheduled__2023-07-10T08:26:07.521239+00:00 [queued]>
   [2023-07-11, 08:28:48 UTC] {taskinstance.py:1103} INFO - Dependencies all 
met for dep_context=requeueable deps ti=<TaskInstance: foo.wait_for_file 
scheduled__2023-07-10T08:26:07.521239+00:00 [queued]>
   [2023-07-11, 08:28:48 UTC] {taskinstance.py:1308} INFO - Starting attempt 1 
of 1
   [2023-07-11, 08:28:48 UTC] {taskinstance.py:1327} INFO - Executing 
<Task(FileSensor): wait_for_file> on 2023-07-10 08:26:07.521239+00:00
   [2023-07-11, 08:28:48 UTC] {standard_task_runner.py:57} INFO - Started 
process 67 to run task
   [2023-07-11, 08:28:48 UTC] {standard_task_runner.py:84} INFO - Running: 
['airflow', 'tasks', 'run', 'foo', 'wait_for_file', 
'scheduled__2023-07-10T08:26:07.521239+00:00', '--job-id', '7', '--raw', 
'--subdir', 'DAGS_FOLDER/test.py', '--cfg-path', '/tmp/tmplriw9lrd']
   [2023-07-11, 08:28:48 UTC] {standard_task_runner.py:85} INFO - Job 7: 
Subtask wait_for_file
   [2023-07-11, 08:28:48 UTC] {task_command.py:410} INFO - Running 
<TaskInstance: foo.wait_for_file scheduled__2023-07-10T08:26:07.521239+00:00 
[running]> on host 686510c65d84
   [2023-07-11, 08:28:48 UTC] {taskinstance.py:1545} INFO - Exporting env vars: 
AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='foo' 
AIRFLOW_CTX_TASK_ID='wait_for_file' 
AIRFLOW_CTX_EXECUTION_DATE='2023-07-10T08:26:07.521239+00:00' 
AIRFLOW_CTX_TRY_NUMBER='1' 
AIRFLOW_CTX_DAG_RUN_ID='scheduled__2023-07-10T08:26:07.521239+00:00'
   [2023-07-11, 08:28:48 UTC] {base.py:73} INFO - Using connection ID 
'fs_default' for task execution.
   [2023-07-11, 08:28:48 UTC] {filesystem.py:64} INFO - Poking for file 
/tmp/temporary_file_for_testing
   [2023-07-11, 08:28:48 UTC] {taskinstance.py:1784} INFO - Rescheduling task, 
marking task as UP_FOR_RESCHEDULE
   [2023-07-11, 08:28:48 UTC] {local_task_job_runner.py:225} INFO - Task exited 
with return code 0
   [2023-07-11, 08:28:48 UTC] {taskinstance.py:2653} INFO - 0 downstream tasks 
scheduled from follow-on schedule check
   ...
   ```
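
To make the timing explicit, here is a small standalone sketch (plain Python, not Airflow code) that takes the start and exit timestamps of the five runs shown in the log above and compares them with the 1 minute execution timeout. The names `RUNS`, `parse` and `summarize` are made up for this illustration. It shows that every run lasts about one second, while the third run already starts more than 60 seconds after the first one. That would be consistent with the timeout being checked against each run separately, but this is only a guess and the log does not confirm it.

```python
from datetime import datetime, timedelta

# (first log line, "Task exited" line) of each run, copied from the log above
# (all times are UTC on 2023-07-11).
RUNS = [
    ("08:26:38", "08:26:38"),
    ("08:27:10", "08:27:11"),
    ("08:27:43", "08:27:44"),
    ("08:28:16", "08:28:16"),
    ("08:28:48", "08:28:48"),
]

# Same value as the execution_timeout passed to the sensor in the DAG.
EXECUTION_TIMEOUT = timedelta(minutes=1)


def parse(clock: str) -> datetime:
    """Parse an HH:MM:SS clock time on the day of the log."""
    return datetime.strptime(f"2023-07-11 {clock}", "%Y-%m-%d %H:%M:%S")


def summarize(runs):
    """Return (run duration, time since the first run started) for each run."""
    first_start = parse(runs[0][0])
    return [
        (parse(end) - parse(start), parse(start) - first_start)
        for start, end in runs
    ]


summary = summarize(RUNS)
for number, (duration, elapsed) in enumerate(summary, start=1):
    past_timeout = elapsed > EXECUTION_TIMEOUT
    print(
        f"run {number}: lasted {duration.total_seconds():.0f}s, "
        f"started {elapsed.total_seconds():.0f}s after the first run, "
        f"past timeout: {past_timeout}"
    )
```

Runs 3, 4 and 5 start 65, 98 and 130 seconds after the first run, so all of them happen after the 1 minute timeout should have failed the sensor.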
   
   ### Are you willing to submit PR?
   
   - [X] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

