AlexisBRENON opened a new issue, #32512:
URL: https://github.com/apache/airflow/issues/32512
### Apache Airflow version
2.6.2
### What happened
I created a sensor in reschedule mode with a given execution timeout.
However, the sensor never changes to failed test, even after the execution
timeout passed.
### What you think should happen instead
I expect the sensor to be poked a given number of times and then changing to
failed state while the execution timeout elapsed.
### How to reproduce
Define a DAG with a single sensor, checking for an innexistent file.
The sensor should poke every 30s and timeout after 1 minutes.
```python
import datetime
from airflow.models.dag import DAG
from airflow.sensors.filesystem import FileSensor
with DAG(dag_id="foo",
schedule_interval="@once",
start_date=datetime.datetime.now(datetime.timezone.utc) -
datetime.timedelta(days=1),
catchup=False) as dag:
FileSensor(task_id="wait_for_file",
filepath="/tmp/temporary_file_for_testing",
poke_interval=30,
mode="reschedule",
execution_timeout=datetime.timedelta(minutes=1))
```
### Operating System
Arch Linux
### Versions of Apache Airflow Providers
_No response_
### Deployment
Other Docker-based deployment
### Deployment details
```bash
docker run --rm -it -v /tmp/dags:/opt/airflow/dags -p 8080:8080
apache/airflow:2.6.2-python3.11 airflow standalone
```
Similar behavior encountered in my production deployment based on Helm chart.
### Anything else
Logs of the task. You can see that the log covers more than 1 minute.
```
[2023-07-11, 08:26:38 UTC] {taskinstance.py:1103} INFO - Dependencies all
met for dep_context=non-requeueable deps ti=<TaskInstance: foo.wait_for_file
scheduled__2023-07-10T08:26:07.521239+00:00 [queued]>
[2023-07-11, 08:26:38 UTC] {taskinstance.py:1103} INFO - Dependencies all
met for dep_context=requeueable deps ti=<TaskInstance: foo.wait_for_file
scheduled__2023-07-10T08:26:07.521239+00:00 [queued]>
[2023-07-11, 08:26:38 UTC] {taskinstance.py:1308} INFO - Starting attempt 1
of 1
[2023-07-11, 08:26:38 UTC] {taskinstance.py:1327} INFO - Executing
<Task(FileSensor): wait_for_file> on 2023-07-10 08:26:07.521239+00:00
[2023-07-11, 08:26:38 UTC] {standard_task_runner.py:57} INFO - Started
process 55 to run task
[2023-07-11, 08:26:38 UTC] {standard_task_runner.py:84} INFO - Running:
['airflow', 'tasks', 'run', 'foo', 'wait_for_file',
'scheduled__2023-07-10T08:26:07.521239+00:00', '--job-id', '3', '--raw',
'--subdir', 'DAGS_FOLDER/test.py', '--cfg-path', '/tmp/tmpkxpv9y3q']
[2023-07-11, 08:26:38 UTC] {standard_task_runner.py:85} INFO - Job 3:
Subtask wait_for_file
[2023-07-11, 08:26:38 UTC] {task_command.py:410} INFO - Running
<TaskInstance: foo.wait_for_file scheduled__2023-07-10T08:26:07.521239+00:00
[running]> on host 686510c65d84
[2023-07-11, 08:26:38 UTC] {taskinstance.py:1545} INFO - Exporting env vars:
AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='foo'
AIRFLOW_CTX_TASK_ID='wait_for_file'
AIRFLOW_CTX_EXECUTION_DATE='2023-07-10T08:26:07.521239+00:00'
AIRFLOW_CTX_TRY_NUMBER='1'
AIRFLOW_CTX_DAG_RUN_ID='scheduled__2023-07-10T08:26:07.521239+00:00'
[2023-07-11, 08:26:38 UTC] {base.py:73} INFO - Using connection ID
'fs_default' for task execution.
[2023-07-11, 08:26:38 UTC] {filesystem.py:64} INFO - Poking for file
/tmp/temporary_file_for_testing
[2023-07-11, 08:26:38 UTC] {taskinstance.py:1784} INFO - Rescheduling task,
marking task as UP_FOR_RESCHEDULE
[2023-07-11, 08:26:38 UTC] {local_task_job_runner.py:225} INFO - Task exited
with return code 0
[2023-07-11, 08:26:38 UTC] {taskinstance.py:2653} INFO - 0 downstream tasks
scheduled from follow-on schedule check
[2023-07-11, 08:27:10 UTC] {taskinstance.py:1103} INFO - Dependencies all
met for dep_context=non-requeueable deps ti=<TaskInstance: foo.wait_for_file
scheduled__2023-07-10T08:26:07.521239+00:00 [queued]>
[2023-07-11, 08:27:10 UTC] {taskinstance.py:1103} INFO - Dependencies all
met for dep_context=requeueable deps ti=<TaskInstance: foo.wait_for_file
scheduled__2023-07-10T08:26:07.521239+00:00 [queued]>
[2023-07-11, 08:27:10 UTC] {taskinstance.py:1308} INFO - Starting attempt 1
of 1
[2023-07-11, 08:27:11 UTC] {taskinstance.py:1327} INFO - Executing
<Task(FileSensor): wait_for_file> on 2023-07-10 08:26:07.521239+00:00
[2023-07-11, 08:27:11 UTC] {standard_task_runner.py:57} INFO - Started
process 58 to run task
[2023-07-11, 08:27:11 UTC] {standard_task_runner.py:84} INFO - Running:
['airflow', 'tasks', 'run', 'foo', 'wait_for_file',
'scheduled__2023-07-10T08:26:07.521239+00:00', '--job-id', '4', '--raw',
'--subdir', 'DAGS_FOLDER/test.py', '--cfg-path', '/tmp/tmpaq2r2iaf']
[2023-07-11, 08:27:11 UTC] {standard_task_runner.py:85} INFO - Job 4:
Subtask wait_for_file
[2023-07-11, 08:27:11 UTC] {task_command.py:410} INFO - Running
<TaskInstance: foo.wait_for_file scheduled__2023-07-10T08:26:07.521239+00:00
[running]> on host 686510c65d84
[2023-07-11, 08:27:11 UTC] {taskinstance.py:1545} INFO - Exporting env vars:
AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='foo'
AIRFLOW_CTX_TASK_ID='wait_for_file'
AIRFLOW_CTX_EXECUTION_DATE='2023-07-10T08:26:07.521239+00:00'
AIRFLOW_CTX_TRY_NUMBER='1'
AIRFLOW_CTX_DAG_RUN_ID='scheduled__2023-07-10T08:26:07.521239+00:00'
[2023-07-11, 08:27:11 UTC] {base.py:73} INFO - Using connection ID
'fs_default' for task execution.
[2023-07-11, 08:27:11 UTC] {filesystem.py:64} INFO - Poking for file
/tmp/temporary_file_for_testing
[2023-07-11, 08:27:11 UTC] {taskinstance.py:1784} INFO - Rescheduling task,
marking task as UP_FOR_RESCHEDULE
[2023-07-11, 08:27:11 UTC] {local_task_job_runner.py:225} INFO - Task exited
with return code 0
[2023-07-11, 08:27:11 UTC] {taskinstance.py:2653} INFO - 0 downstream tasks
scheduled from follow-on schedule check
[2023-07-11, 08:27:43 UTC] {taskinstance.py:1103} INFO - Dependencies all
met for dep_context=non-requeueable deps ti=<TaskInstance: foo.wait_for_file
scheduled__2023-07-10T08:26:07.521239+00:00 [queued]>
[2023-07-11, 08:27:43 UTC] {taskinstance.py:1103} INFO - Dependencies all
met for dep_context=requeueable deps ti=<TaskInstance: foo.wait_for_file
scheduled__2023-07-10T08:26:07.521239+00:00 [queued]>
[2023-07-11, 08:27:43 UTC] {taskinstance.py:1308} INFO - Starting attempt 1
of 1
[2023-07-11, 08:27:43 UTC] {taskinstance.py:1327} INFO - Executing
<Task(FileSensor): wait_for_file> on 2023-07-10 08:26:07.521239+00:00
[2023-07-11, 08:27:43 UTC] {standard_task_runner.py:57} INFO - Started
process 61 to run task
[2023-07-11, 08:27:43 UTC] {standard_task_runner.py:84} INFO - Running:
['airflow', 'tasks', 'run', 'foo', 'wait_for_file',
'scheduled__2023-07-10T08:26:07.521239+00:00', '--job-id', '5', '--raw',
'--subdir', 'DAGS_FOLDER/test.py', '--cfg-path', '/tmp/tmpn89kv31n']
[2023-07-11, 08:27:43 UTC] {standard_task_runner.py:85} INFO - Job 5:
Subtask wait_for_file
[2023-07-11, 08:27:43 UTC] {task_command.py:410} INFO - Running
<TaskInstance: foo.wait_for_file scheduled__2023-07-10T08:26:07.521239+00:00
[running]> on host 686510c65d84
[2023-07-11, 08:27:44 UTC] {taskinstance.py:1545} INFO - Exporting env vars:
AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='foo'
AIRFLOW_CTX_TASK_ID='wait_for_file'
AIRFLOW_CTX_EXECUTION_DATE='2023-07-10T08:26:07.521239+00:00'
AIRFLOW_CTX_TRY_NUMBER='1'
AIRFLOW_CTX_DAG_RUN_ID='scheduled__2023-07-10T08:26:07.521239+00:00'
[2023-07-11, 08:27:44 UTC] {base.py:73} INFO - Using connection ID
'fs_default' for task execution.
[2023-07-11, 08:27:44 UTC] {filesystem.py:64} INFO - Poking for file
/tmp/temporary_file_for_testing
[2023-07-11, 08:27:44 UTC] {taskinstance.py:1784} INFO - Rescheduling task,
marking task as UP_FOR_RESCHEDULE
[2023-07-11, 08:27:44 UTC] {local_task_job_runner.py:225} INFO - Task exited
with return code 0
[2023-07-11, 08:27:44 UTC] {taskinstance.py:2653} INFO - 0 downstream tasks
scheduled from follow-on schedule check
[2023-07-11, 08:28:16 UTC] {taskinstance.py:1103} INFO - Dependencies all
met for dep_context=non-requeueable deps ti=<TaskInstance: foo.wait_for_file
scheduled__2023-07-10T08:26:07.521239+00:00 [queued]>
[2023-07-11, 08:28:16 UTC] {taskinstance.py:1103} INFO - Dependencies all
met for dep_context=requeueable deps ti=<TaskInstance: foo.wait_for_file
scheduled__2023-07-10T08:26:07.521239+00:00 [queued]>
[2023-07-11, 08:28:16 UTC] {taskinstance.py:1308} INFO - Starting attempt 1
of 1
[2023-07-11, 08:28:16 UTC] {taskinstance.py:1327} INFO - Executing
<Task(FileSensor): wait_for_file> on 2023-07-10 08:26:07.521239+00:00
[2023-07-11, 08:28:16 UTC] {standard_task_runner.py:57} INFO - Started
process 64 to run task
[2023-07-11, 08:28:16 UTC] {standard_task_runner.py:84} INFO - Running:
['airflow', 'tasks', 'run', 'foo', 'wait_for_file',
'scheduled__2023-07-10T08:26:07.521239+00:00', '--job-id', '6', '--raw',
'--subdir', 'DAGS_FOLDER/test.py', '--cfg-path', '/tmp/tmpartoz87c']
[2023-07-11, 08:28:16 UTC] {standard_task_runner.py:85} INFO - Job 6:
Subtask wait_for_file
[2023-07-11, 08:28:16 UTC] {task_command.py:410} INFO - Running
<TaskInstance: foo.wait_for_file scheduled__2023-07-10T08:26:07.521239+00:00
[running]> on host 686510c65d84
[2023-07-11, 08:28:16 UTC] {taskinstance.py:1545} INFO - Exporting env vars:
AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='foo'
AIRFLOW_CTX_TASK_ID='wait_for_file'
AIRFLOW_CTX_EXECUTION_DATE='2023-07-10T08:26:07.521239+00:00'
AIRFLOW_CTX_TRY_NUMBER='1'
AIRFLOW_CTX_DAG_RUN_ID='scheduled__2023-07-10T08:26:07.521239+00:00'
[2023-07-11, 08:28:16 UTC] {base.py:73} INFO - Using connection ID
'fs_default' for task execution.
[2023-07-11, 08:28:16 UTC] {filesystem.py:64} INFO - Poking for file
/tmp/temporary_file_for_testing
[2023-07-11, 08:28:16 UTC] {taskinstance.py:1784} INFO - Rescheduling task,
marking task as UP_FOR_RESCHEDULE
[2023-07-11, 08:28:16 UTC] {local_task_job_runner.py:225} INFO - Task exited
with return code 0
[2023-07-11, 08:28:16 UTC] {taskinstance.py:2653} INFO - 0 downstream tasks
scheduled from follow-on schedule check
[2023-07-11, 08:28:48 UTC] {taskinstance.py:1103} INFO - Dependencies all
met for dep_context=non-requeueable deps ti=<TaskInstance: foo.wait_for_file
scheduled__2023-07-10T08:26:07.521239+00:00 [queued]>
[2023-07-11, 08:28:48 UTC] {taskinstance.py:1103} INFO - Dependencies all
met for dep_context=requeueable deps ti=<TaskInstance: foo.wait_for_file
scheduled__2023-07-10T08:26:07.521239+00:00 [queued]>
[2023-07-11, 08:28:48 UTC] {taskinstance.py:1308} INFO - Starting attempt 1
of 1
[2023-07-11, 08:28:48 UTC] {taskinstance.py:1327} INFO - Executing
<Task(FileSensor): wait_for_file> on 2023-07-10 08:26:07.521239+00:00
[2023-07-11, 08:28:48 UTC] {standard_task_runner.py:57} INFO - Started
process 67 to run task
[2023-07-11, 08:28:48 UTC] {standard_task_runner.py:84} INFO - Running:
['airflow', 'tasks', 'run', 'foo', 'wait_for_file',
'scheduled__2023-07-10T08:26:07.521239+00:00', '--job-id', '7', '--raw',
'--subdir', 'DAGS_FOLDER/test.py', '--cfg-path', '/tmp/tmplriw9lrd']
[2023-07-11, 08:28:48 UTC] {standard_task_runner.py:85} INFO - Job 7:
Subtask wait_for_file
[2023-07-11, 08:28:48 UTC] {task_command.py:410} INFO - Running
<TaskInstance: foo.wait_for_file scheduled__2023-07-10T08:26:07.521239+00:00
[running]> on host 686510c65d84
[2023-07-11, 08:28:48 UTC] {taskinstance.py:1545} INFO - Exporting env vars:
AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='foo'
AIRFLOW_CTX_TASK_ID='wait_for_file'
AIRFLOW_CTX_EXECUTION_DATE='2023-07-10T08:26:07.521239+00:00'
AIRFLOW_CTX_TRY_NUMBER='1'
AIRFLOW_CTX_DAG_RUN_ID='scheduled__2023-07-10T08:26:07.521239+00:00'
[2023-07-11, 08:28:48 UTC] {base.py:73} INFO - Using connection ID
'fs_default' for task execution.
[2023-07-11, 08:28:48 UTC] {filesystem.py:64} INFO - Poking for file
/tmp/temporary_file_for_testing
[2023-07-11, 08:28:48 UTC] {taskinstance.py:1784} INFO - Rescheduling task,
marking task as UP_FOR_RESCHEDULE
[2023-07-11, 08:28:48 UTC] {local_task_job_runner.py:225} INFO - Task exited
with return code 0
[2023-07-11, 08:28:48 UTC] {taskinstance.py:2653} INFO - 0 downstream tasks
scheduled from follow-on schedule check
...
```
### Are you willing to submit PR?
- [X] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]