Having thought about this further, I actually think resubmitting into the executor would not be feasible, as it's not possible to tell duplicate task requests apart from tasks resuming after a reschedule, deferral, etc.
In these situations, the scheduler will submit the same task instance - with the same try number and all other details the same - multiple times over a short period, which is going to be impossible to tell apart from the same task accidentally coming in twice without a lot of extra accounting on the executor side.

As for adding more to the interface - I would love to see Airflow formalise the executor interface more rather than start exposing private/underscore-prefixed variables. I think adding a loop that solves this to the Airflow executors that need it is probably a cleaner way of handling it - the contract then being "if it's in QUEUED, then it is the executor's job to do something with it". (A rough sketch of what that could look like is appended below the quoted thread.)

Andrew

On Tue, Mar 15, 2022, 16:11 Jarek Potiuk <[email protected]> wrote:

> Shouldn't we simply make "_clear_stuck_queued_tasks" part of the executor
> interface, and trigger it from the scheduler? I guess that might solve it
> in a "portable" way without changing the "at-most-once" semantics?
>
> J.
>
> On Tue, Mar 15, 2022 at 10:40 PM Andrew Godwin
> <[email protected]> wrote:
> >
> > I agree this needs a core fix in Airflow, but I'd like to highlight that
> > this fundamentally changes the executor contract (it changes tasks from
> > at-most-once submission to at-least-once), so not only would it need a
> > very close level of testing, it would also be some level of breaking
> > change - since Airflow allows you to plug in third-party executors.
> >
> > Not that I'm against it, but we'd have to have a bit of a debate about
> > what level of semver impact it would have. I imagine we could just about
> > justify it as a minor-level change?
> >
> > Andrew
> >
> > On Tue, Mar 15, 2022 at 3:33 PM Ping Zhang <[email protected]> wrote:
> >>
> >> Hi all,
> >>
> >> Currently, tasks can get stuck in the `queued` state and never be
> >> scheduled by the scheduler or picked up by a worker. This can happen
> >> when a failure occurs after a task is marked as `queued` but before the
> >> executor marks it as `running`.
> >>
> >> There is a fix, https://github.com/apache/airflow/pull/19769/files, for
> >> the celery executor. However, it only targets the CeleryExecutor and it
> >> leaks scheduler responsibility into the executor.
> >>
> >> We propose to move the state reconciliation logic to the scheduler. The
> >> proposed change:
> >>
> >> 1. When building _executable_task_instances_to_queued
> >> (https://github.com/apache/airflow/blob/main/airflow/jobs/scheduler_job.py#L287),
> >> also include the QUEUED state, e.g.
> >> .filter(TI.state.in_([TaskInstanceState.SCHEDULED, TaskInstanceState.QUEUED])).
> >> This way, the scheduler can also process queued task instances.
> >>
> >> 2. In the queue_command method of the base executor, conditionally queue
> >> the task instance even if its state is already QUEUED, based on the last
> >> time the executor enqueued it (this threshold is configurable).
> >>
> >> This could send the same task to the executor twice. Since the worker
> >> side checks whether a task can run before executing it, this should not
> >> cause issues.
> >>
> >> We have been running this in production for a while and would love to
> >> contribute it.
> >>
> >> Please let me know your thoughts.
> >>
> >> Thanks,
> >>
> >> Ping
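For concreteness, a minimal sketch of step 1 of Ping's proposal - widening the scheduler query so QUEUED task instances are considered again. The function name and query shape are illustrative only, not the actual body of _executable_task_instances_to_queued:

    from airflow.models.taskinstance import TaskInstance as TI
    from airflow.utils.state import TaskInstanceState


    def executable_tis_sketch(session, max_tis):
        # Today only SCHEDULED task instances are considered; also pulling in
        # QUEUED ones gives stuck-in-queued tasks another chance to reach the
        # executor on a later scheduler loop.
        return (
            session.query(TI)
            .filter(TI.state.in_([TaskInstanceState.SCHEDULED, TaskInstanceState.QUEUED]))
            .limit(max_tis)
            .all()
        )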
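And a sketch of step 2 - letting queue_command accept a re-submission of an already-QUEUED task instance once it has been stuck longer than a configurable window. The requeue window and the _last_enqueued bookkeeping are hypothetical, not part of today's BaseExecutor; only the queue_command signature mirrors the real one:

    import time


    class RequeueingExecutorSketch:
        def __init__(self, requeue_after_seconds=600):
            self.queued_tasks = {}
            self.running = set()
            self.requeue_after_seconds = requeue_after_seconds
            self._last_enqueued = {}  # task instance key -> monotonic seconds

        def queue_command(self, task_instance, command, priority=1, queue=None):
            key = task_instance.key
            now = time.monotonic()
            recently_enqueued = (
                key in self._last_enqueued
                and now - self._last_enqueued[key] < self.requeue_after_seconds
            )
            if key in self.running or recently_enqueued:
                # Keep today's at-most-once behaviour for anything the executor
                # is running or enqueued recently.
                return
            # Accept the (possibly repeated) submission; the worker-side checks
            # decide whether the task actually runs.
            self.queued_tasks[key] = (command, priority, queue, task_instance)
            self._last_enqueued[key] = now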
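Finally, a sketch of the alternative raised at the top of this message and in Jarek's reply: make handling of stuck QUEUED task instances an explicit part of the executor interface and let the scheduler trigger it. The method name and the scheduler hook below are hypothetical, not existing Airflow APIs:

    class BaseExecutorInterfaceSketch:
        def cleanup_stuck_queued_tasks(self, task_instances):
            """Called periodically by the scheduler with task instances that
            have been in QUEUED longer than a timeout; each executor decides
            whether to resubmit, fail, or ignore them."""
            raise NotImplementedError


    # In the scheduler loop (also a sketch):
    #     stuck = session.query(TI).filter(
    #         TI.state == TaskInstanceState.QUEUED,
    #         TI.queued_dttm < timezone.utcnow() - task_queued_timeout,
    #     ).all()
    #     executor.cleanup_stuck_queued_tasks(stuck)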
