akshith-yalla commented on issue #36734: URL: https://github.com/apache/airflow/issues/36734#issuecomment-2709624913
Hi maintainers,

I’ve retested the behavior described in [this issue](https://github.com/apache/airflow/issues/36734) and the inconsistent sensor timeout behavior persists. Specifically, sensors in **reschedule mode correctly fail** when the timeout is reached, but sensors running in **deferrable mode** are marked as `UP_FOR_RETRY` instead of **failing immediately**.

For clarity, I’ve documented my observations below, including reproduction steps, log outputs, and the solution approaches I tried.

Bug: Inconsistent sensor timeout behavior
Reproducible (Yes/No): Yes

Below are the steps I followed to reproduce the issue, the same as those mentioned in the PR description.

A sample DAG to replicate the issue:

```
from datetime import datetime

from airflow import models
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor

with models.DAG(
    dag_id='sensor_timeout',
    start_date=datetime(2018, 10, 31),
    schedule='0 7 * * 4',
    catchup=False,
    max_active_runs=5,
):
    # Reschedule-mode sensor: fails as expected once the timeout is reached.
    sensor = S3KeySensor(
        task_id='sensor_test',
        aws_conn_id='s3',
        bucket_name='my-s3-bucket',  # s3 bucket name
        bucket_key='path/to/an/object',  # path to object
        retries=3,
        wildcard_match=True,
        poke_interval=2,
        timeout=10,
        mode='reschedule',
        deferrable=False,
    )

    # Deferrable sensor: ends up in UP_FOR_RETRY instead of failing.
    sensor_defer = S3KeySensor(
        task_id='sensor_test_defer',
        aws_conn_id='s3',
        bucket_name='my-s3-bucket',  # s3 bucket name
        bucket_key='path/to/an/object',  # path to object
        retries=3,
        wildcard_match=True,
        timeout=10,
        deferrable=True,
    )
```

Status of the DAG runs:

Logs with deferred value set to false:

Logs with deferred value set to true:

Solution approaches that I have tried:

1. Adjusted the **timeout logic** to prevent further retries when the run duration exceeds the timeout.

```
if run_duration() > self.timeout:
    # If sensor is in soft fail mode but times out raise AirflowSkipException.
    message = (
        f"Sensor has timed out; run duration of {run_duration()} seconds exceeds "
        f"the specified timeout of {self.timeout}."
    )
    # Prevent further retries by setting max_tries equal to the current try number.
    context["ti"].max_tries = context["ti"].try_number
    if self.soft_fail:
        raise AirflowSkipException(message)
    else:
        raise AirflowSensorTimeout(message)
```

2. Modified `resume_execution` to convert deferral-related errors into sensor timeouts.

```
def resume_execution(self, next_method: str, next_kwargs: dict[str, Any] | None, context: Context):
    try:
        return super().resume_execution(next_method, next_kwargs, context)
    except TaskDeferralTimeout as e:
        raise AirflowSensorTimeout(*e.args) from e
    except (AirflowException, TaskDeferralError) as e:
        # Convert any deferral-related error into a sensor timeout to fail the task.
        raise AirflowSensorTimeout(str(e)) from e
```

3. Added a check in `base.py` to handle `trigger/execution` timeouts for all sensors.

```
def resume_execution(self, next_method: str, next_kwargs: dict[str, Any] | None, context: Context):
    try:
        return super().resume_execution(next_method, next_kwargs, context)
    except TaskDeferralTimeout as e:
        raise AirflowSensorTimeout(*e.args) from e
    except TaskDeferralError as e:
        # Handle cases where the trigger or execution times out
        if "timeout" in str(e).lower():
            message = f"Sensor has timed out: {e}"
            if self.soft_fail:
                raise AirflowSkipException(message) from e
            else:
                raise AirflowSensorTimeout(message) from e
        else:
            if self.soft_fail:
                raise AirflowSkipException(str(e)) from e
            raise
    except AirflowException as e:
        if self.soft_fail:
            raise AirflowSkipException(str(e)) from e
        raise
```

4. Further refined `resume_execution` to ensure a non-retryable failure on deferral timeout.
```
def resume_execution(self, next_method: str, next_kwargs: dict[str, Any] | None, context: Context):
    try:
        return super().resume_execution(next_method, next_kwargs, context)
    except TaskDeferralTimeout as e:
        # Convert deferral timeout to a non-retryable failure
        message = f"Sensor has timed out: {e}"
        raise AirflowFailException(message) from e
    except TaskDeferralError as e:
        # Handle timeout-related deferral errors
        if "timeout" in str(e).lower():
            message = f"Sensor has timed out: {e}"
            raise AirflowFailException(message) from e
        else:
            # Handle other deferral errors based on configuration
            if self.soft_fail:
                raise AirflowSkipException(str(e)) from e
            raise
    except AirflowException as e:
        # Handle other Airflow exceptions
        if self.soft_fail:
            raise AirflowSkipException(str(e)) from e
        raise
```

**PS:** One thing I observed is that the deferrable task does eventually reach the `FAILED` state, but only after some time rather than immediately. Please find the screenshot and the logs below:

---

I believe that consistent failure behavior across both modes is crucial for reliable sensor operations. I’d appreciate any insights or suggestions on how to move forward with a fix.

Thank you for your attention to this matter.
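To make the decision logic in approach 4 easier to discuss in isolation, here is a minimal sketch that runs without Airflow installed. The exception classes are stand-ins that only mirror the names used above, and `classify_resume_error` is a hypothetical helper of mine, not an Airflow API. It returns the exception the sensor would raise, under the assumption that a timeout should always be non-retryable and that `soft_fail` only applies to other Airflow errors.

```python
# Stand-in exception types so the sketch runs without Airflow.
# They only mirror the names used in this comment; they are NOT the real classes.
class AirflowException(Exception):
    pass


class AirflowSkipException(AirflowException):
    pass


class AirflowFailException(AirflowException):
    pass


class TaskDeferralError(AirflowException):
    pass


class TaskDeferralTimeout(AirflowException):
    pass


def classify_resume_error(exc: Exception, soft_fail: bool) -> Exception:
    """Hypothetical helper: map an error seen on resume to the exception to raise."""
    # A timeout is either the dedicated timeout type, or a deferral error
    # whose message mentions "timeout" (the string check used in approach 4).
    timed_out = isinstance(exc, TaskDeferralTimeout) or (
        isinstance(exc, TaskDeferralError) and "timeout" in str(exc).lower()
    )
    if timed_out:
        # Non-retryable failure, regardless of soft_fail.
        return AirflowFailException(f"Sensor has timed out: {exc}")
    if soft_fail and isinstance(exc, AirflowException):
        # Other Airflow errors become a skip when soft_fail is enabled.
        return AirflowSkipException(str(exc))
    # Anything else propagates unchanged.
    return exc


if __name__ == "__main__":
    result = classify_resume_error(TaskDeferralTimeout("trigger timeout"), soft_fail=False)
    print(type(result).__name__, "-", result)
```

One caveat with this shape: matching on the word "timeout" in the message is fragile, so a dedicated exception type for timeouts would likely be a more robust signal if one is available on every code path.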
