Shouldn't we simply make "_clear_stuck_queued_tasks" part of the executor interface, and have the scheduler trigger it? I think that might solve it in a "portable" way, without changing the "at-most-once" semantics?
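To make the suggestion concrete, here is a rough, self-contained sketch in plain Python (not Airflow code). Everything in it is hypothetical: the public `clear_stuck_queued_tasks` hook (underscore dropped, since it would become part of the interface), the `timeout` parameter, the `InMemoryExecutor` class and the scheduler-side `scheduler_tick` helper. The executor only reports TIs it has stopped tracking, and the scheduler moves those back to SCHEDULED so they are submitted again on a later loop.

```python
from abc import ABC, abstractmethod
from datetime import datetime, timedelta


class ExecutorInterface(ABC):
    """Hypothetical slice of the executor interface (not Airflow's BaseExecutor)."""

    @abstractmethod
    def clear_stuck_queued_tasks(
        self, queued: list[str], now: datetime, timeout: timedelta
    ) -> list[str]:
        """Stop tracking TIs stuck in QUEUED and return their keys."""


class InMemoryExecutor(ExecutorInterface):
    def __init__(self) -> None:
        # TI key -> time the executor handed the task to its backend.
        self.in_flight: dict[str, datetime] = {}

    def clear_stuck_queued_tasks(self, queued, now, timeout):
        stuck = []
        for key in queued:
            sent_at = self.in_flight.get(key)
            # Unknown to the executor, or handed off too long ago: treat as stuck.
            if sent_at is None or now - sent_at >= timeout:
                # A real executor would have to revoke the backend message here
                # first; only then is it safe to report the TI as released.
                self.in_flight.pop(key, None)
                stuck.append(key)
        return stuck


def scheduler_tick(executor, ti_states, now, timeout):
    """Scheduler side: move TIs the executor released back to SCHEDULED."""
    queued = [key for key, state in ti_states.items() if state == "queued"]
    for key in executor.clear_stuck_queued_tasks(queued, now, timeout):
        ti_states[key] = "scheduled"
    return ti_states
```

The at-most-once property in this sketch rests entirely on the executor being able to withdraw a task from its backend (for example, revoking it from the Celery broker) before reporting it as stuck; an executor that cannot do that would fall back to at-least-once behaviour.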
J.

On Tue, Mar 15, 2022 at 10:40 PM Andrew Godwin wrote:
>
> I agree this needs a core fix in Airflow, but I'd like to highlight that this
> fundamentally changes the executor contract (it moves tasks from
> at-most-once submission to at-least-once), so not only would it need very
> thorough testing, it would also be a breaking change to some degree, since
> Airflow allows you to plug in third-party executors.
>
> Not that I'm against it, but we'd have to have a bit of a debate about what
> level of semver impact it would have. I imagine we could just about justify
> it as a minor-level change?
>
> Andrew
>
> On Tue, Mar 15, 2022 at 3:33 PM Ping Zhang wrote:
>>
>> Hi all,
>>
>> Currently, tasks can get stuck in the `queued` state, where they are neither
>> scheduled by the scheduler nor picked up by a worker. This can happen when a
>> failure occurs after a task is marked as `queued` but before the executor
>> marks it as `running`.
>>
>> There is a fix for the CeleryExecutor,
>> https://github.com/apache/airflow/pull/19769/files. However, it only targets
>> the CeleryExecutor, and it leaks scheduler responsibility into the executor.
>>
>> We propose to move the state reconciliation logic to the scheduler. The
>> proposed change:
>>
>> 1. When selecting task instances in _executable_task_instances_to_queued
>> (https://github.com/apache/airflow/blob/main/airflow/jobs/scheduler_job.py#L287),
>> also include the QUEUED state, e.g.
>> .filter(TI.state.in_([TaskInstanceState.SCHEDULED, TaskInstanceState.QUEUED])).
>> This way, the scheduler can also process queued TIs.
>>
>> 2. In the queue_command method of the base executor, conditionally queue the
>> TI even if its state is already QUEUED, based on when the executor last
>> enqueued it (the threshold is configurable).
>>
>> This could potentially send the same task to the executor twice. Since the
>> worker side checks whether a task can run before running it, this won't
>> cause issues.
>>
>> We have been running this in production for a while and would love to
>> contribute it.
>>
>> Please let me know your thoughts.
>>
>> Thanks,
>>
>> Ping
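A minimal sketch of step 2 of the proposal, in plain Python rather than against Airflow's actual BaseExecutor. The `ExecutorSketch` class, the `requeue_after` threshold and the `last_enqueued` bookkeeping are hypothetical stand-ins for the configurable "last enqueued time" check. As an assumption, a QUEUED TI with no recorded enqueue time (for example after a scheduler restart) is re-sent immediately, which is exactly where the at-least-once behaviour Andrew points out comes from.

```python
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from enum import Enum


class TIState(str, Enum):
    SCHEDULED = "scheduled"
    QUEUED = "queued"
    RUNNING = "running"


@dataclass
class ExecutorSketch:
    # Hypothetical stand-in for the configurable re-queue threshold.
    requeue_after: timedelta = timedelta(minutes=10)
    # TI key -> last time this executor enqueued it.
    last_enqueued: dict = field(default_factory=dict)
    # Commands handed to the backend, in order (may contain duplicates).
    queued_commands: list = field(default_factory=list)

    def queue_command(self, ti_key: str, state: TIState, now: datetime) -> bool:
        """Queue a SCHEDULED TI, or re-queue a QUEUED TI that looks stuck."""
        if state == TIState.QUEUED:
            last = self.last_enqueued.get(ti_key)
            if last is not None and now - last < self.requeue_after:
                return False  # still within the grace period; assume it is in flight
        elif state != TIState.SCHEDULED:
            return False  # e.g. RUNNING: never re-send
        self.last_enqueued[ti_key] = now
        self.queued_commands.append(ti_key)
        return True
```

With a 10-minute threshold, a TI queued at 12:00 is left alone at 12:05 but sent again at 12:11, so `queued_commands` ends up holding the same key twice; the proposal relies on the worker-side checks to make the second copy harmless.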
