Having thought about this further, I actually think resubmitting into the
executor would not be feasible, as it's not possible to tell duplicate task
requests apart from resumption after a reschedule, deferral, etc.

In these situations, the scheduler will submit the same task instance -
with the same try number and all other details identical - multiple times
over a short period, which is going to be impossible to distinguish from
the same task accidentally coming in twice without a lot of extra
accounting on the executor side.
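
To make the "extra accounting" concrete, here is a hypothetical sketch -
not existing Airflow code; the class, window and method names are all
invented for illustration:

    # Hypothetical executor-side dedup bookkeeping (illustrative only).
    import time

    class EnqueueLedger:
        """Remember when each task instance key was last enqueued."""

        def __init__(self, window_seconds: float = 60.0):
            self._last_seen: dict[tuple, float] = {}
            self._window = window_seconds

        def looks_like_duplicate(self, ti_key: tuple) -> bool:
            # Within the window, a legitimate resubmission (after a
            # reschedule/deferral) carries the same dag_id/task_id/
            # run_id/try_number as an accidental duplicate, so this
            # check can only ever be a heuristic.
            now = time.monotonic()
            last = self._last_seen.get(ti_key)
            self._last_seen[ti_key] = now
            return last is not None and (now - last) < self._window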

As for adding more to the interface - I would love to see Airflow formalise
the executor interface more rather than start exposing
private/underscore-prefixed variables. I think adding a loop that solves
this to Airflow executors that need it is probably a cleaner way of
handling it - the contract then being "if it's in QUEUED, then it is the
executor's job to do something with it".
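
For concreteness, such a loop might look roughly like this - a sketch
only; the timeout, the _queued_at bookkeeping and the _requeue helper are
invented for the example, not actual Airflow internals:

    # Illustrative executor-side reconciliation loop (not real Airflow code).
    import time

    STUCK_QUEUED_TIMEOUT = 300  # seconds; would be configurable in practice

    def reconcile_stuck_queued(self):
        """Re-drive any task that has sat in QUEUED longer than the timeout.

        Contract: once a ti is QUEUED, it is the executor's job to do
        something with it - run it, requeue it, or fail it.
        """
        now = time.monotonic()
        for key, queued_at in list(self._queued_at.items()):
            if now - queued_at > STUCK_QUEUED_TIMEOUT:
                self.log.warning("Task %s stuck in QUEUED; re-driving", key)
                self._requeue(key)  # hypothetical helper
                self._queued_at[key] = now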

Andrew


On Tue, Mar 15, 2022, 16:11 Jarek Potiuk <[email protected]> wrote:

> Shouldn't we simply make "_clear_stuck_queued_tasks" part of the executor
> interface, and have the scheduler trigger it? I guess that
> might solve it in a "portable" way without changing the "at-most-once"
> semantics?
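>
> Something like this, perhaps - a sketch only; the no-op default on
> BaseExecutor is an assumption, not existing code:
>
>     class BaseExecutor:
>         def _clear_stuck_queued_tasks(self, session) -> None:
>             """Default: do nothing; executors that can strand tasks in
>             QUEUED (e.g. Celery) override this."""
>
>     # and from the scheduler's main loop, periodically:
>     self.executor._clear_stuck_queued_tasks(session=session)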
>
> J.
>
> On Tue, Mar 15, 2022 at 10:40 PM Andrew Godwin
> <[email protected]> wrote:
> >
> > I agree this needs a core fix in Airflow, but I'd like to highlight that
> this fundamentally changes the executor contract (from at-most-once task
> submission to at-least-once), so not only would it need very close testing,
> it would also be a breaking change to some degree, since Airflow allows you
> to plug in third-party executors.
> >
> > Not that I'm against it, but we'd have to have a bit of a debate about
> what level of semver impact it would have. I imagine we could just about
> justify it as a minor-level change?
> >
> > Andrew
> >
> > On Tue, Mar 15, 2022 at 3:33 PM Ping Zhang <[email protected]> wrote:
> >>
> >> Hi all,
> >>
> >> Currently, tasks can get stuck in the `queued` state, where they are
> neither scheduled by the scheduler nor picked up by a worker. This can
> happen when a failure occurs after a task is marked as `queued` but before
> the executor marks it as `running`.
> >>
> >> There is a fix, https://github.com/apache/airflow/pull/19769/files, for
> the celery executor. However, it only targets the CeleryExecutor and it
> leaks scheduler responsibility into the executor.
> >>
> >> We propose to move the state reconciliation logic to the scheduler. The
> proposed change:
> >>
> >> 1. In _executable_task_instances_to_queued (
> https://github.com/apache/airflow/blob/main/airflow/jobs/scheduler_job.py#L287),
> also include the QUEUED state in the filter, i.e.
> `.filter(TI.state.in_([TaskInstanceState.SCHEDULED,
> TaskInstanceState.QUEUED]))`. In this way, the scheduler can also process
> queued tis.
> >>
> >> 2. In the queue_command method in the base executor, conditionally
> queue the ti even if its state is QUEUED, based on the last time the
> executor enqueued it (this interval is configurable).
> >>
> >> This could potentially send the same task to the executor twice.
> However, since the worker side checks whether a task can run before
> executing it, this won't cause issues. A condensed sketch of both changes
> follows below.
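> >>
> >> A condensed sketch of both changes (the requeue_interval and
> _last_enqueued names are illustrative; the real signatures in
> scheduler_job.py and base_executor.py may differ):
> >>
>     import time
>
>     # 1. Scheduler: also consider QUEUED tis when selecting executable tasks
>     query = query.filter(
>         TI.state.in_([TaskInstanceState.SCHEDULED, TaskInstanceState.QUEUED])
>     )
>
>     # 2. BaseExecutor.queue_command: conditionally re-enqueue a QUEUED ti
>     def queue_command(self, task_instance, command, priority=1, queue=None):
>         now = time.monotonic()
>         last = self._last_enqueued.get(task_instance.key)
>         if last is not None and now - last < self.requeue_interval:
>             return  # enqueued recently; skip to avoid a duplicate submission
>         self._last_enqueued[task_instance.key] = now
>         self.queued_tasks[task_instance.key] = (
>             command, priority, queue, task_instance,
>         )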
> >>
> >> We have been running this in our prod for a while and would love to
> contribute it.
> >>
> >> Please let me know your thoughts.
> >>
> >> Thanks,
> >>
> >> Ping
>
