I dug a bit into the Airflow code and I think I found a possible solution; see the draft at [1]. Add a "reschedule" flag to BaseSensorOperator. When it is set, the sensor doesn't sleep but raises an AirflowRescheduleTask exception. Within the TaskInstance this exception is handled similarly to a failure: the task state is set to UP_FOR_RETRY, and the task is rescheduled by the scheduler.
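
To make the idea concrete, here is a minimal standalone sketch of the control flow. It does not use Airflow's actual classes and is not the code in the draft: `SketchSensor` and `run_task` are hypothetical stand-ins for BaseSensorOperator and the TaskInstance handling, and the exception class is redefined locally so the snippet runs on its own.

```python
import time


class AirflowRescheduleTask(Exception):
    """Hypothetical signal: the condition is not met yet, try again later."""


class SketchSensor:
    """Stand-in for BaseSensorOperator; not the real Airflow class."""

    def __init__(self, poke_fn, poke_interval=0.01, reschedule=False):
        self.poke_fn = poke_fn
        self.poke_interval = poke_interval
        self.reschedule = reschedule

    def execute(self):
        while not self.poke_fn():
            if self.reschedule:
                # Give the worker slot back instead of sleeping in it.
                raise AirflowRescheduleTask()
            time.sleep(self.poke_interval)


def run_task(sensor):
    """Stand-in for the TaskInstance handling; returns the resulting state."""
    try:
        sensor.execute()
    except AirflowRescheduleTask:
        # Handled like a failure: the scheduler picks the task up again later.
        return "up_for_retry"
    return "success"


# Usage: the condition becomes true on the third poke.
calls = {"n": 0}


def poke():
    calls["n"] += 1
    return calls["n"] >= 3


sensor = SketchSensor(poke, reschedule=True)
states = [run_task(sensor) for _ in range(3)]
print(states)
```

With reschedule=True each run pokes exactly once and then returns, so the worker slot is free between pokes; with reschedule=False the same sensor would block inside execute() until the condition holds.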

Advantages:
* Only small code changes are required
* Leverages the existing retry mechanism, including delay and exponential backoff

Tradeoff:
* Overhead from running the task again and again

I'd like to ask the community whether this is a valuable change to include in Airflow. If so, I'll create a Jira, improve the branch, and send a PR:
* Find a solution for timeout and soft_fail
* Add tests
* Add a task_reschedule table (similar to the task_fail table)

Also some open questions:
* Should a separate state (e.g. UP_FOR_RESCHEDULE) be used to differentiate between an error and an intended reschedule? Then different parameters for delay and exponential backoff would probably also make sense.
* Maybe it's feasible to execute the sensor poke code directly in the scheduler to avoid executing a mini task? But that can be done in a separate change.

Kind Regards,
Stefan

[1] https://github.com/seelmann/incubator-airflow/commit/379bf0fdf9bbb9f26f74ac5fa6325ba5a0e975a2