nathadfield opened a new issue, #36734: URL: https://github.com/apache/airflow/issues/36734
### Apache Airflow version

Other Airflow 2 version (please specify below)

### If "Other Airflow 2 version" selected, which one?

2.7.3

### What happened?

The behaviour of how sensors react to timeouts seems to be inconsistent when running in deferrable mode. The [documentation](https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/tasks.html#timeouts) states that, for sensors in `reschedule` mode, once the timeout is reached the task is failed and does not retry. This is exactly what occurs when running an `S3KeySensor` and, in my opinion, is the correct behaviour for how we want to work with sensors.

```
[2024-01-11, 14:21:06 UTC] {connection_wrapper.py:378} INFO - AWS Connection (conn_id='s3', conn_type='S3') credentials retrieved from login and password.
[2024-01-11, 14:21:07 UTC] {taskinstance.py:1937} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/airflow/providers/amazon/aws/sensors/s3.py", line 144, in execute
    super().execute(context)
  File "/usr/local/lib/python3.11/site-packages/airflow/sensors/base.py", line 274, in execute
    raise AirflowSensorTimeout(message)
airflow.exceptions.AirflowSensorTimeout: Sensor has timed out; run duration of 10.264439 seconds exceeds the specified timeout of 10.0.
[2024-01-11, 14:21:07 UTC] {taskinstance.py:1400} INFO - Immediate failure requested. Marking task as FAILED. dag_id=sensor_timeout, task_id=sensor_test, execution_date=20240104T070000, start_date=20240111T142106, end_date=20240111T142107
```

However, the same sensor running in deferrable mode sends the task into an `UP_FOR_RETRY` state.

```
[2024-01-11, 14:20:59 UTC] {base.py:73} INFO - Using connection ID 's3' for task execution.
[2024-01-11, 14:20:59 UTC] {connection_wrapper.py:378} INFO - AWS Connection (conn_id='s3', conn_type='S3') credentials retrieved from login and password.
[2024-01-11, 14:21:16 UTC] {taskinstance.py:1159} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: sensor_timeout.sensor_test_defer scheduled__2024-01-04T07:00:00+00:00 [queued]>
[2024-01-11, 14:21:16 UTC] {taskinstance.py:1159} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: sensor_timeout.sensor_test_defer scheduled__2024-01-04T07:00:00+00:00 [queued]>
[2024-01-11, 14:21:16 UTC] {taskinstance.py:1359} INFO - Resuming after deferral
[2024-01-11, 14:21:16 UTC] {taskinstance.py:1382} INFO - Executing <Task(S3KeySensor): sensor_test_defer> on 2024-01-04 07:00:00+00:00
[2024-01-11, 14:21:16 UTC] {standard_task_runner.py:57} INFO - Started process 2301 to run task
[2024-01-11, 14:21:16 UTC] {standard_task_runner.py:85} INFO - Job 8: Subtask sensor_test_defer
[2024-01-11, 14:21:16 UTC] {task_command.py:416} INFO - Running <TaskInstance: sensor_timeout.sensor_test_defer scheduled__2024-01-04T07:00:00+00:00 [running]> on host 2403ea01d798
[2024-01-11, 14:21:16 UTC] {taskinstance.py:1937} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/airflow/sensors/base.py", line 292, in resume_execution
    return super().resume_execution(next_method, next_kwargs, context)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/models/baseoperator.py", line 1601, in resume_execution
    raise TaskDeferralError(next_kwargs.get("error", "Unknown"))
airflow.exceptions.TaskDeferralError: Trigger/execution timeout
[2024-01-11, 14:21:16 UTC] {taskinstance.py:1400} INFO - Marking task as UP_FOR_RETRY. dag_id=sensor_timeout, task_id=sensor_test_defer, execution_date=20240104T070000, start_date=20240111T142056, end_date=20240111T142116
```

### What you think should happen instead?

I believe that the behaviour of how sensors react to a timeout should be consistent regardless of whether the task is deferred or not: if the timeout is reached, the task should be failed and not put up for retry.

Please note, this is not specifically about the `S3KeySensor`; I just used it as an example. The same behaviour is true of any sensor that implements deferrable capabilities.
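As the logs show, the reschedule path raises `AirflowSensorTimeout`, which Airflow fails immediately, while the deferrable path surfaces the trigger timeout as a generic `TaskDeferralError`, which is retried. Until that is unified, a rough, untested workaround sketch is to subclass the sensor and re-raise the deferral error as `AirflowSensorTimeout` (the class name below is hypothetical):

```
from airflow.exceptions import AirflowSensorTimeout, TaskDeferralError
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor


class FailOnTimeoutS3KeySensor(S3KeySensor):
    """Hypothetical workaround: make a deferrable timeout fail like a reschedule timeout."""

    def resume_execution(self, next_method, next_kwargs, context):
        try:
            return super().resume_execution(next_method, next_kwargs, context)
        except TaskDeferralError as err:
            # Simplification: any error raised while resuming after deferral is
            # treated as a sensor timeout. AirflowSensorTimeout is marked FAILED
            # immediately rather than UP_FOR_RETRY.
            raise AirflowSensorTimeout(str(err)) from err
```

Since `resume_execution` is inherited from `BaseSensorOperator`/`BaseOperator`, the same kind of override should be possible for other deferrable sensors as well.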
### How to reproduce

Here is an example DAG that will replicate the problem as I see it.

```
from datetime import datetime

from airflow import models
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor

with models.DAG(
    dag_id='sensor_timeout',
    start_date=datetime(2018, 10, 31),
    schedule='0 7 * * 4',
    catchup=False,
    max_active_runs=5,
):
    sensor = S3KeySensor(
        task_id='sensor_test',
        aws_conn_id='s3',
        bucket_name='my-s3-bucket',
        bucket_key='path/to/an/object',
        retries=3,
        wildcard_match=True,
        poke_interval=2,
        timeout=10,
        mode='reschedule',
        deferrable=False,
    )

    sensor_defer = S3KeySensor(
        task_id='sensor_test_defer',
        aws_conn_id='s3',
        bucket_name='my-s3-bucket',
        bucket_key='path/to/an/object',
        retries=3,
        wildcard_match=True,
        timeout=10,
        deferrable=True,
    )
```

### Operating System

PRETTY_NAME="Debian GNU/Linux 11 (bullseye)"
NAME="Debian GNU/Linux"
VERSION_ID="11"
VERSION="11 (bullseye)"
VERSION_CODENAME=bullseye
ID=debian
HOME_URL="https://www.debian.org/"
SUPPORT_URL="https://www.debian.org/support"
BUG_REPORT_URL="https://bugs.debian.org/"

### Versions of Apache Airflow Providers

apache-airflow-providers-amazon==8.13.0

### Deployment

Astronomer

### Deployment details

_No response_

### Anything else?

_No response_

### Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!

### Code of Conduct

- [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at: [email protected]
