chrisranderson opened a new issue, #30289:
URL: https://github.com/apache/airflow/issues/30289
### Apache Airflow version
Other Airflow 2 version (please specify below)
### What happened
Airflow Version: 2.2.5
If the first poke of a sensor raises an exception, `timeout` is not respected; after that, any combination of pokes returning `False` or raising exceptions shows the same behavior. My guess is that some database state is initialized incorrectly, because the following returns an empty list every time the first poke raises an exception:
```
TaskReschedule.find_for_task_instance(
    context["ti"], try_number=first_try_number
)
```
This happens here in the main branch:
https://github.com/apache/airflow/blob/main/airflow/sensors/base.py#L174-L181
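For context, the logic at those lines is roughly the following (paraphrased, not verbatim Airflow code). In reschedule mode, the elapsed duration used for the timeout check is anchored to the start date of the first `TaskReschedule` row:
```
# Paraphrased sketch of the relevant BaseSensorOperator.execute() logic,
# not verbatim Airflow code.
first_try_number = context["ti"].max_tries - self.retries + 1
task_reschedules = TaskReschedule.find_for_task_instance(
    context["ti"], try_number=first_try_number
)
if task_reschedules:
    # Anchor the sensor's clock to the first recorded reschedule.
    started_at = task_reschedules[0].start_date
else:
    # No row found: the clock effectively restarts on this try.
    started_at = timezone.utcnow()

# Later, after a poke returns False:
if (timezone.utcnow() - started_at).total_seconds() > self.timeout:
    raise AirflowSensorTimeout(...)  # the timeout path that never fires here
```
If the first try raised before the task was ever rescheduled, there is no `TaskReschedule` row for `first_try_number`, so `started_at` resets on every retry and the elapsed duration never exceeds `timeout`.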
If the first poke returns `False`, I don't see this issue.
### What you think should happen instead
The timeout should be respected whether `poke` returns successfully or not.
A related issue is that if every poke raises an uncaught exception, the
timeout will never be respected, since the timeout is checked only after a
successful poke. Maybe both issues can be fixed at once?
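For illustration, one possible shape of a fix for the second part (a hypothetical sketch, not actual Airflow code; the reschedule-mode case would additionally need `started_at` anchored correctly even when no `TaskReschedule` row exists):
```
# Hypothetical sketch: check the timeout *before* each poke, so a poke
# that raises still counts toward the timeout on the next try.
while True:
    if run_duration() > self.timeout:
        raise AirflowSensorTimeout(f"Sensor timed out: {self.task_id}")
    if self.poke(context):  # may raise; the check above still runs next try
        break
    # sleep or reschedule as before
```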
### How to reproduce
Use the code below. Run the DAG several times and check whether the total duration, including all retries, exceeds the timeout.
```
import datetime
import random

from airflow import DAG
from airflow.models import TaskReschedule
from airflow.sensors.base import BaseSensorOperator
from airflow.utils.context import Context


class RandomlyFailSensor(BaseSensorOperator):
    def poke(self, context: Context) -> bool:
        first_try_number = context["ti"].max_tries - self.retries + 1
        task_reschedules = TaskReschedule.find_for_task_instance(
            context["ti"], try_number=first_try_number
        )
        self.log.error(
            f"\n\nIf this is the first attempt, or first attempt failed, "
            f"this will be empty: \n\t{task_reschedules}\n\n"
        )
        if random.random() < .5:
            self.log.error("\n\nIf this was the very first poke, the timeout *will not* work.\n\n")
            raise Exception('Failed!')
        else:
            self.log.error("\n\nIf this was the very first poke, the timeout *will* work.\n\n")
            return False


dag = DAG(
    'sensors_test',
    schedule_interval=None,
    max_active_runs=1,
    catchup=False,
    default_args={
        "owner": "me",
        "depends_on_past": False,
        "start_date": datetime.datetime(2018, 1, 1),
        "email_on_failure": False,
        "email_on_retry": False,
        "execution_timeout": datetime.timedelta(minutes=10),
    },
)

t_always_fail_sensor = RandomlyFailSensor(
    task_id='random_fail_sensor',
    mode="reschedule",
    poke_interval=1,
    retry_delay=datetime.timedelta(seconds=1),
    timeout=15,
    retries=50,
    dag=dag,
)
```
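With these settings the timeout should end the task after about 15 seconds; as described above, when the first poke raises, the retries continue well past that.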
### Operating System
Debian 11? This Docker image:
https://hub.docker.com/layers/library/python/3.8.12/images/sha256-60d1cda1542582095795c25bff869b0c615e2a913c4026ed0313ede156b60468?context=explore
### Versions of Apache Airflow Providers
_No response_
### Deployment
Docker-Compose
### Deployment details
I use an internal tool that hides the details of the deployment. If there is
more info that would be helpful for debugging, let me know.
### Anything else
- This happens every time, based on the conditions I describe above.
- I'd be happy to submit a PR, but that depends on what my manager says.
- @yuqian90 might know more about this issue, since they contributed related
code in [this
commit](https://github.com/apache/airflow/commit/a0e6a847aa72ddb15bdc147695273fb3aec8839d#diff-62f7d8a52fefdb8e05d4f040c6d3459b4a56fe46976c24f68843dbaeb5a98487R1164).
### Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)