chrisranderson opened a new issue, #30289:
URL: https://github.com/apache/airflow/issues/30289

   ### Apache Airflow version
   
   Other Airflow 2 version (please specify below)
   
   ### What happened
   
   Airflow Version: 2.2.5
   
   If the first poke of a sensor raises an exception, `timeout` is never 
enforced, no matter whether later pokes return `False` or raise again. My 
guess is that something is initialized incorrectly in the database, because 
the following call returns an empty list every time if the first poke 
raised an exception:
   
   ```
   TaskReschedule.find_for_task_instance(
               context["ti"], try_number=first_try_number
           )
   ```
   
   This happens here in the main branch: 
https://github.com/apache/airflow/blob/main/airflow/sensors/base.py#L174-L181
   
   If the first poke returns `False`, I don't see this issue. 
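
To make the suspected failure mode concrete, here is a toy model in plain Python. It is not Airflow code: `ToySensor`, `Reschedule`, and `simulate` are hypothetical names, and the model assumes (as the guess above suggests, unconfirmed) that the start time is taken from reschedule records filed under the first try number, and that a raised exception moves the task to the next try before any record is written.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class Reschedule:
    """Hypothetical stand-in for a reschedule record."""
    try_number: int
    start: float


@dataclass
class ToySensor:
    timeout: float
    first_try_number: int = 1
    try_number: int = 1
    records: List[Reschedule] = field(default_factory=list)

    def run_once(self, now: float, outcome: str) -> str:
        # Assumption: only records filed under the FIRST try are consulted.
        first = [r for r in self.records if r.try_number == self.first_try_number]
        started_at = first[0].start if first else now

        if outcome == "raise":
            # A failed poke consumes a retry; no reschedule record is written.
            self.try_number += 1
            return "retry"

        # The timeout is only checked after a poke that returned False.
        if now - started_at > self.timeout:
            return "timeout"

        # The record is filed under the CURRENT try number.
        self.records.append(Reschedule(self.try_number, now))
        return "reschedule"


def simulate(outcomes: List[str], timeout: float = 15.0) -> Optional[float]:
    """Return the time at which the timeout fires, or None if it never does."""
    sensor = ToySensor(timeout=timeout)
    for second, outcome in enumerate(outcomes):
        if sensor.run_once(float(second), outcome) == "timeout":
            return float(second)
    return None
```

Under these assumptions, `simulate(["false"] * 30)` times out, while `simulate(["raise"] + ["false"] * 29)` never does, because every record ends up under try number 2 or later and the lookup for try number 1 stays empty.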
   
   ### What you think should happen instead
   
   The timeout should be respected whether `poke` returns successfully or not. 
   
   A related issue is that if every poke raises an uncaught exception, the 
timeout will never be respected, since the timeout is checked only after a 
successful poke. Maybe both issues can be fixed at once?
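
The requested behaviour can be sketched independently of Airflow. In this minimal sketch, `poke_with_deadline` and `SensorTimeout` are hypothetical names, not Airflow APIs, and `started_at` is assumed to be the true start of the first attempt. The only point is that the deadline is checked on both paths, whether the poke returns `False` or raises.

```python
import time
from typing import Callable


class SensorTimeout(Exception):
    """Hypothetical stand-in for a sensor timeout error."""


def poke_with_deadline(
    poke: Callable[[], bool],
    started_at: float,
    timeout: float,
    now: Callable[[], float] = time.monotonic,
) -> bool:
    try:
        done = poke()
    except Exception as exc:
        # Check the deadline even though the poke failed.
        if now() - started_at > timeout:
            raise SensorTimeout("timed out while poke kept failing") from exc
        raise  # still within the deadline: let the normal retry logic handle it

    # Usual path: the poke returned, but the condition is not met yet.
    if not done and now() - started_at > timeout:
        raise SensorTimeout("timed out waiting for the condition")
    return done
```

Passing `now` as a parameter keeps the sketch easy to test with a fake clock.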
   
   ### How to reproduce
   
   Use the code below. Run the DAG several times and check whether the total 
duration, including all retries, exceeds the timeout.
   
   ```
   import datetime
   import random
   
   from airflow import DAG
   from airflow.models import TaskReschedule
   from airflow.sensors.base import BaseSensorOperator
   from airflow.utils.context import Context
   
   
   class RandomlyFailSensor(BaseSensorOperator):
       def poke(self, context: Context) -> bool:
           first_try_number = context["ti"].max_tries - self.retries + 1
           task_reschedules = TaskReschedule.find_for_task_instance(
               context["ti"], try_number=first_try_number
           )
           self.log.error(f"\n\nIf this is the first attempt, or first attempt failed, "
                          f"this will be empty: \n\t{task_reschedules}\n\n")
   
           if random.random() < .5:
               self.log.error("\n\nIf this was the very first poke, the timeout *will not* work.\n\n")
               raise Exception('Failed!')
           else:
               self.log.error("\n\nIf this was the very first poke, the timeout *will* work.\n\n")
               return False
   
   
   dag = DAG(
       'sensors_test',
       schedule_interval=None,
       max_active_runs=1,
       catchup=False,
       default_args={
           "owner": "me",
           "depends_on_past": False,
           "start_date": datetime.datetime(2018, 1, 1),
           "email_on_failure": False,
           "email_on_retry": False,
           "execution_timeout": datetime.timedelta(minutes=10),
       }
   )
   
   t_always_fail_sensor = RandomlyFailSensor(
       task_id='random_fail_sensor',
       mode="reschedule",
       poke_interval=1,
       retry_delay=datetime.timedelta(seconds=1),
       timeout=15,
       retries=50,
       dag=dag
   )
   ```
   
   ### Operating System
   
   Debian 11? This Docker image: 
https://hub.docker.com/layers/library/python/3.8.12/images/sha256-60d1cda1542582095795c25bff869b0c615e2a913c4026ed0313ede156b60468?context=explore
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   Docker-Compose
   
   ### Deployment details
   
   I use an internal tool that hides the details of the deployment. If there is 
more info that would be helpful for debugging, let me know.
   
   ### Anything else
   
   - This happens every time, based on the conditions I describe above.
   - I'd be happy to submit a PR, but that depends on what my manager says.
   - @yuqian90 might know more about this issue, since they contributed related 
code in [this 
commit](https://github.com/apache/airflow/commit/a0e6a847aa72ddb15bdc147695273fb3aec8839d#diff-62f7d8a52fefdb8e05d4f040c6d3459b4a56fe46976c24f68843dbaeb5a98487R1164).
   
   
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

