nathadfield opened a new issue, #36734:
URL: https://github.com/apache/airflow/issues/36734

   ### Apache Airflow version
   
   Other Airflow 2 version (please specify below)
   
   ### If "Other Airflow 2 version" selected, which one?
   
   2.7.3
   
   ### What happened?
   
   Sensors react to timeouts inconsistently depending on whether they are running in deferrable mode.
   
   The [documentation](https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/tasks.html#timeouts) states that, for sensors in `reschedule` mode, once the timeout is reached the task fails and does not retry.  This is exactly what happens when running an `S3KeySensor` and, in my opinion, is the correct behaviour for working with sensors.
   
   ```
   [2024-01-11, 14:21:06 UTC] {connection_wrapper.py:378} INFO - AWS Connection 
(conn_id='s3', conn_type='S3') credentials retrieved from login and password.
   [2024-01-11, 14:21:07 UTC] {taskinstance.py:1937} ERROR - Task failed with 
exception
   Traceback (most recent call last):
     File 
"/usr/local/lib/python3.11/site-packages/airflow/providers/amazon/aws/sensors/s3.py",
 line 144, in execute
       super().execute(context)
     File "/usr/local/lib/python3.11/site-packages/airflow/sensors/base.py", 
line 274, in execute
       raise AirflowSensorTimeout(message)
   airflow.exceptions.AirflowSensorTimeout: Sensor has timed out; run duration 
of 10.264439 seconds exceeds the specified timeout of 10.0.
   [2024-01-11, 14:21:07 UTC] {taskinstance.py:1400} INFO - Immediate failure 
requested. Marking task as FAILED. dag_id=sensor_timeout, task_id=sensor_test, 
execution_date=20240104T070000, start_date=20240111T142106, 
end_date=20240111T142107 
   ```
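   
   For context on why this path fails fast: judging by the `Immediate failure requested` log line, `AirflowSensorTimeout` appears to be handled like `AirflowFailException`, i.e. the task is failed outright without consulting `retries`.  A minimal, hypothetical sketch of that dispatch (the real logic lives in `airflow/models/taskinstance.py`; `handle_failure` here is a stand-in, not the actual API):
   
   ```
   from airflow.exceptions import AirflowFailException, AirflowSensorTimeout
   
   
   def handle_failure(exc, force_fail=False):
       # Stand-in for the task instance's failure handling.
       state = "FAILED" if force_fail else "UP_FOR_RETRY (if retries remain)"
       print(f"{exc!r} -> {state}")
   
   
   def run_task(execute):
       try:
           execute()
       except (AirflowFailException, AirflowSensorTimeout) as exc:
           # "Immediate failure requested": retries are skipped entirely.
           handle_failure(exc, force_fail=True)
       except Exception as exc:
           # Any other exception goes through normal retry accounting.
           handle_failure(exc, force_fail=False)
   ```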
   
   However, the same sensor running in deferrable mode sends the task into an `UP_FOR_RETRY` state.
   
   ```
   [2024-01-11, 14:20:59 UTC] {base.py:73} INFO - Using connection ID 's3' for 
task execution.
   [2024-01-11, 14:20:59 UTC] {connection_wrapper.py:378} INFO - AWS Connection 
(conn_id='s3', conn_type='S3') credentials retrieved from login and password.
   [2024-01-11, 14:21:16 UTC] {taskinstance.py:1159} INFO - Dependencies all 
met for dep_context=non-requeueable deps ti=<TaskInstance: 
sensor_timeout.sensor_test_defer scheduled__2024-01-04T07:00:00+00:00 [queued]>
   [2024-01-11, 14:21:16 UTC] {taskinstance.py:1159} INFO - Dependencies all 
met for dep_context=requeueable deps ti=<TaskInstance: 
sensor_timeout.sensor_test_defer scheduled__2024-01-04T07:00:00+00:00 [queued]>
   [2024-01-11, 14:21:16 UTC] {taskinstance.py:1359} INFO - Resuming after 
deferral
   [2024-01-11, 14:21:16 UTC] {taskinstance.py:1382} INFO - Executing 
<Task(S3KeySensor): sensor_test_defer> on 2024-01-04 07:00:00+00:00
   [2024-01-11, 14:21:16 UTC] {standard_task_runner.py:57} INFO - Started 
process 2301 to run task
   [2024-01-11, 14:21:16 UTC] {standard_task_runner.py:85} INFO - Job 8: 
Subtask sensor_test_defer
   [2024-01-11, 14:21:16 UTC] {task_command.py:416} INFO - Running 
<TaskInstance: sensor_timeout.sensor_test_defer 
scheduled__2024-01-04T07:00:00+00:00 [running]> on host 2403ea01d798
   [2024-01-11, 14:21:16 UTC] {taskinstance.py:1937} ERROR - Task failed with 
exception
   Traceback (most recent call last):
     File "/usr/local/lib/python3.11/site-packages/airflow/sensors/base.py", 
line 292, in resume_execution
       return super().resume_execution(next_method, next_kwargs, context)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/usr/local/lib/python3.11/site-packages/airflow/models/baseoperator.py", line 
1601, in resume_execution
       raise TaskDeferralError(next_kwargs.get("error", "Unknown"))
   airflow.exceptions.TaskDeferralError: Trigger/execution timeout
   [2024-01-11, 14:21:16 UTC] {taskinstance.py:1400} INFO - Marking task as 
UP_FOR_RETRY. dag_id=sensor_timeout, task_id=sensor_test_defer, 
execution_date=20240104T070000, start_date=20240111T142056, 
end_date=20240111T142116
   ```
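   
   The deferrable run fails differently because the trigger's timeout resumes the task with the special `__fail__` method and, as the traceback shows, `BaseOperator.resume_execution` re-raises that as a generic `TaskDeferralError`.  Unlike `AirflowSensorTimeout`, that exception goes through the normal retry handling.  A rough sketch of the resume path, simplified from the traceback rather than copied from the source:
   
   ```
   from airflow.exceptions import TaskDeferralError
   
   
   def resume_execution(next_method, next_kwargs):
       # "__fail__" is the signal a trigger sends when it times out or errors.
       if next_method == "__fail__":
           # Only a generic deferral error is raised here, so the
           # sensor-specific timeout semantics are lost and the task
           # remains eligible for retry.
           raise TaskDeferralError((next_kwargs or {}).get("error", "Unknown"))
   ```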
   
   ### What you think should happen instead?
   
   I believe that sensors should react to timeouts consistently regardless of whether the task is deferred and, if the timeout is reached, the task should be failed and not put up for retry.
   
   Please note, this is not specifically about the `S3KeySensor`; I just used it as an example.  The same behaviour applies to any sensor that implements deferrable capabilities.
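   
   One possible direction, sketched here purely to illustrate what consistent behaviour could look like (an assumption on my part, not a proposed patch), is to translate a timeout-flavoured `TaskDeferralError` back into `AirflowSensorTimeout` in the sensor's resume path, since that exception already triggers the fail-fast handling:
   
   ```
   from airflow.exceptions import AirflowSensorTimeout, TaskDeferralError
   from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
   
   
   class FailFastS3KeySensor(S3KeySensor):
       """Hypothetical subclass, for illustration only."""
   
       def resume_execution(self, next_method, next_kwargs, context):
           try:
               return super().resume_execution(next_method, next_kwargs, context)
           except TaskDeferralError as e:
               # Matching on the message is brittle; a real fix would likely
               # need a dedicated exception type for trigger timeouts.
               if "timeout" in str(e).lower():
                   raise AirflowSensorTimeout(str(e)) from e
               raise
   ```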
   
   ### How to reproduce
   
   Here is an example DAG that will replicate the problem as I see it.
   
   ```
   from datetime import datetime
   from airflow import models
   from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
   
   with models.DAG(
       dag_id='sensor_timeout',
       start_date=datetime(2018, 10, 31),
       schedule='0 7 * * 4',
       catchup=False,
       max_active_runs=5,
   ):
       sensor = S3KeySensor(
           task_id='sensor_test',
           aws_conn_id='s3',
           bucket_name='my-s3-bucket',
           bucket_key='path/to/an/object',
           retries=3,
           wildcard_match=True,
           poke_interval=2,
           timeout=10,
           mode='reschedule',
           deferrable=False
       )
   
       sensor_defer = S3KeySensor(
           task_id='sensor_test_defer',
           aws_conn_id='s3',
           bucket_name='my-s3-bucket',
           bucket_key='path/to/an/object',
           retries=3,
           wildcard_match=True,
           timeout=10,
           deferrable=True,
       )
   ```
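   
   In the meantime, a blunt workaround is to set `retries=0` on deferrable sensors so the trigger timeout fails the task on its first attempt.  The obvious caveat is that this also disables retries for genuine transient failures, so it only approximates the behaviour I am asking for.  Added inside the same `with models.DAG(...)` block as above:
   
   ```
   sensor_defer_failfast = S3KeySensor(
       task_id='sensor_test_defer_failfast',
       aws_conn_id='s3',
       bucket_name='my-s3-bucket',
       bucket_key='path/to/an/object',
       retries=0,  # caveat: also skips retries for real transient errors
       wildcard_match=True,
       timeout=10,
       deferrable=True,
   )
   ```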
   
   ### Operating System
   
   PRETTY_NAME="Debian GNU/Linux 11 (bullseye)" NAME="Debian GNU/Linux" VERSION_ID="11" VERSION="11 (bullseye)" VERSION_CODENAME=bullseye ID=debian HOME_URL="https://www.debian.org/" SUPPORT_URL="https://www.debian.org/support" BUG_REPORT_URL="https://bugs.debian.org/"
   
   ### Versions of Apache Airflow Providers
   
   apache-airflow-providers-amazon==8.13.0
   
   ### Deployment
   
   Astronomer
   
   ### Deployment details
   
   _No response_
   
   ### Anything else?
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

