akshith-yalla commented on issue #36734:
URL: https://github.com/apache/airflow/issues/36734#issuecomment-2709624913

   Hi maintainers,
   
   I’ve retested the behavior described in [this 
issue](https://github.com/apache/airflow/issues/36734) and observed that the 
inconsistent sensor timeout behavior persists. Specifically, while sensors in 
**reschedule mode correctly fail** when the timeout is reached, sensors running 
in **deferrable mode** are being marked as `UP_FOR_RETRY` instead of **failing 
immediately**.
   
   For clarity, I’ve documented my observations, including reproduction steps, 
log outputs, and several solution approaches, below:
   
   Bug: Inconsistent sensor timeout behavior
   
   Reproducible(Yes/No): Yes
   
   I followed the same reproduction steps as mentioned in the PR description:
   
   A sample DAG to replicate the issue:
   
   ```
   from datetime import datetime
   from airflow import models
   from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
   
   with models.DAG(
       dag_id='sensor_timeout',
       start_date=datetime(2018, 10, 31),
       schedule='0 7 * * 4',
       catchup=False,
       max_active_runs=5,
   ):
       sensor = S3KeySensor(
           task_id='sensor_test',
           aws_conn_id='s3',
           bucket_name='my-s3-bucket',  # s3 bucket name
           bucket_key='path/to/an/object',  # path to object
           retries=3,
           wildcard_match=True,
           poke_interval=2,
           timeout=10,
           mode='reschedule',
           deferrable=False
       )
   
       sensor_defer = S3KeySensor(
           task_id='sensor_test_defer',
           aws_conn_id='s3',
           bucket_name='my-s3-bucket',  # s3 bucket name
           bucket_key='path/to/an/object',  # path to object
           retries=3,
           wildcard_match=True,
           timeout=10,
           deferrable=True,
       )
   ```
   
   
   Status of the DAG runs:
   
   
![Image](https://github.com/user-attachments/assets/428dfcbb-d8d7-41ae-8d49-f9c130a37090)
   
   Logs with `deferrable` set to `False`:
   
   
![Image](https://github.com/user-attachments/assets/85accdc6-334f-4282-9053-074c90d6c7b7)
   
   Logs with `deferrable` set to `True`:
   
   
![Image](https://github.com/user-attachments/assets/6246c688-d794-4ed3-a7d7-de700bc40b1c)
   
   Solution approaches that I have tried:
   
   1. I have adjusted the **timeout logic** to prevent further retries when the 
run duration exceeds the timeout.
   ```
   if run_duration() > self.timeout:
       # If sensor is in soft fail mode but times out raise AirflowSkipException.
       message = (
           f"Sensor has timed out; run duration of {run_duration()} seconds exceeds "
           f"the specified timeout of {self.timeout}."
       )
       # Prevent further retries by setting max_tries equal to the current try number.
       context["ti"].max_tries = context["ti"].try_number
       if self.soft_fail:
           raise AirflowSkipException(message)
       else:
           raise AirflowSensorTimeout(message)
   ```
   2. Modified `resume_execution` to convert deferral-related errors into 
sensor timeouts.
   ```
   def resume_execution(self, next_method: str, next_kwargs: dict[str, Any] | None, context: Context):
      try:
          return super().resume_execution(next_method, next_kwargs, context)
      except TaskDeferralTimeout as e:
          raise AirflowSensorTimeout(*e.args) from e
      except (AirflowException, TaskDeferralError) as e:
          # Convert any deferral-related error into a sensor timeout to fail the task.
          raise AirflowSensorTimeout(str(e)) from e
   ```
   3. Added a check in `base.py` to handle trigger/execution timeouts for all 
sensors.
   ```
   def resume_execution(self, next_method: str, next_kwargs: dict[str, Any] | None, context: Context):
      try:
          return super().resume_execution(next_method, next_kwargs, context)
      except TaskDeferralTimeout as e:
          raise AirflowSensorTimeout(*e.args) from e
      except TaskDeferralError as e:
          # Handle cases where the trigger or execution times out
          if "timeout" in str(e).lower():
              message = f"Sensor has timed out: {e}"
              if self.soft_fail:
                  raise AirflowSkipException(message) from e
              else:
                  raise AirflowSensorTimeout(message) from e
          else:
              if self.soft_fail:
                  raise AirflowSkipException(str(e)) from e
              raise
      except AirflowException as e:
          if self.soft_fail:
              raise AirflowSkipException(str(e)) from e
          raise
   ```
   4. Further refined `resume_execution` to ensure a non-retryable failure on 
deferral timeout.
   ```
   def resume_execution(self, next_method: str, next_kwargs: dict[str, Any] | None, context: Context):
      try:
          return super().resume_execution(next_method, next_kwargs, context)
      except TaskDeferralTimeout as e:
          # Convert deferral timeout to a non-retryable failure
          message = f"Sensor has timed out: {e}"
          raise AirflowFailException(message) from e
      except TaskDeferralError as e:
          # Handle timeout-related deferral errors
          if "timeout" in str(e).lower():
              message = f"Sensor has timed out: {e}"
              raise AirflowFailException(message) from e
          else:
              # Handle other deferral errors based on configuration
              if self.soft_fail:
                  raise AirflowSkipException(str(e)) from e
              raise
      except AirflowException as e:
          # Handle other Airflow exceptions
          if self.soft_fail:
              raise AirflowSkipException(str(e)) from e
          raise
   ```
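   
   Approaches 3 and 4 decide whether a deferral error is a timeout by looking 
for the substring "timeout" in its message. The sketch below isolates that 
check so its behavior is easy to see. It is a minimal stand-alone model, not 
Airflow code: the two exception classes are stand-ins defined here, `BaseStub`, 
`SensorStub` and `raised_type` are hypothetical names, and the error messages 
are made up for illustration.
   
```python
class TaskDeferralError(Exception):
    """Stand-in for the error raised when a deferred task resumes in failure."""


class AirflowSensorTimeout(Exception):
    """Stand-in for the sensor timeout exception."""


class BaseStub:
    """Hypothetical base class: resuming re-raises the error the trigger reported."""

    def __init__(self, error_message):
        self.error_message = error_message

    def resume_execution(self, next_method, next_kwargs, context):
        raise TaskDeferralError(self.error_message)


class SensorStub(BaseStub):
    """Hypothetical sensor that applies the substring check from approaches 3 and 4."""

    def resume_execution(self, next_method, next_kwargs, context):
        try:
            return super().resume_execution(next_method, next_kwargs, context)
        except TaskDeferralError as e:
            # Case-insensitive substring test on the error message.
            if "timeout" in str(e).lower():
                raise AirflowSensorTimeout(f"Sensor has timed out: {e}") from e
            # Anything else propagates unchanged.
            raise


def raised_type(message):
    """Return the name of the exception that resuming with `message` raises."""
    try:
        SensorStub(message).resume_execution("execute_complete", None, {})
    except Exception as e:
        return type(e).__name__
    return None


for sample in ("Trigger/execution timeout", "Trigger timed out", "Trigger failure"):
    print(f"{sample!r} -> {raised_type(sample)}")
```
   
   In this model a message worded as "timed out" is not converted, because it 
does not contain the substring "timeout". Matching on the exception type, as 
the `TaskDeferralTimeout` branch does, would not depend on the wording of the 
message.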
   
   **PS:** I also observed that the task does eventually reach the `FAILED` 
state, but only after some time rather than immediately.
   
   Please find the screenshot and the logs below:
   
   
![Image](https://github.com/user-attachments/assets/81750619-83a8-4db6-8cb6-e5857989066e)
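   
   One possible reading of the delayed `FAILED` state is that the timeout in 
deferrable mode is handled as an ordinary retryable error, so the task only 
fails once its retries are used up. This is an assumption and has not been 
verified against the logs. The sketch below is a simplified model of that 
decision, not Airflow's actual code: the exception classes are stand-ins 
defined here, and `state_after_failure` and `states_over_attempts` are 
hypothetical helpers.
   
```python
class AirflowException(Exception):
    """Stand-in for a generic task error that is eligible for retry."""


class AirflowSensorTimeout(AirflowException):
    """Stand-in for the sensor timeout, which is expected to fail without retrying."""


def state_after_failure(exc, try_number, retries):
    """Return the state a simplified scheduler would assign after `exc`."""
    if isinstance(exc, AirflowSensorTimeout):
        # Forced failure: any remaining retries are ignored.
        return "failed"
    # try_number is 1-based; a retry is allowed while retries remain.
    return "up_for_retry" if try_number <= retries else "failed"


def states_over_attempts(make_exc, retries):
    """Collect the state after each attempt until the task ends up failed."""
    states = []
    try_number = 1
    while True:
        state = state_after_failure(make_exc(), try_number, retries)
        states.append(state)
        if state == "failed":
            return states
        try_number += 1


# Timeout surfaced as a sensor timeout: fails on the first attempt.
print(states_over_attempts(lambda: AirflowSensorTimeout("timed out"), retries=3))
# Timeout surfaced as a generic error: retried until retries are exhausted.
print(states_over_attempts(lambda: AirflowException("timed out"), retries=3))
```
   
   With `retries=3`, as in the sample DAG, the model fails immediately in the 
first case and passes through `up_for_retry` three times before failing in the 
second, which would match both the `UP_FOR_RETRY` status and the eventual 
`FAILED` state.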
   
   ---
   I believe that ensuring consistent failure behavior across both modes is 
crucial for reliable sensor operations. I’d appreciate any insights or 
suggestions on how to move forward with a fix.
   
   Thank you for your attention to this matter.
   
   ---


