I agree this needs a core fix in Airflow, but I'd like to highlight that
this fundamentally changes the executor contract (it moves tasks from
at-most-once submission to at-least-once). So not only would it need very
thorough testing, it would also be a breaking change to some degree, since
Airflow allows you to plug in third-party executors.

Not that I'm against it, but we'd have to have a bit of a debate about what
level of semver impact it would have. I imagine we could just about justify
it as a minor-level change?

Andrew

On Tue, Mar 15, 2022 at 3:33 PM Ping Zhang <[email protected]> wrote:

> Hi all,
>
> Currently, tasks can get stuck in the `queued` state, where they are
> neither scheduled by the scheduler nor picked up by a worker. This can
> happen when a failure occurs after a task is marked as `queued` but before
> the executor marks it as `running`.
>
> There is a fix https://github.com/apache/airflow/pull/19769/files for the
> celery executor. However, it only targets the CeleryExecutor and it leaks
> the scheduler responsibility to the executor.
>
> We propose to move the state reconciliation logic to the scheduler. The
> proposed change:
>
> 1. When fetching task instances in _executable_task_instances_to_queued (
> https://github.com/apache/airflow/blob/main/airflow/jobs/scheduler_job.py#L287),
> also include the QUEUED state, like:   .filter(TI.state.in_([
> TaskInstanceState.SCHEDULED, TaskInstanceState.QUEUED])). This way, the
> scheduler can process queued TIs.
>
> 2. In the queue_command method of the base executor, conditionally queue
> the TI even if its state is QUEUED. The condition is based on when the
> executor last enqueued it, and the threshold is configurable.
>
> This could potentially send the same task to the executor twice. However,
> since the worker side already checks whether a task can run before running
> it, this won't cause issues.
>
> We have been running this in our prod for a while and would love to
> contribute it.
>
> Please let me know your thoughts.
>
> Thanks,
>
> Ping
>
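
To make the contract change discussed above concrete, here is a minimal,
self-contained sketch of the proposed re-queue condition. It is not Airflow
code: ToyExecutor, requeue_after, last_enqueued and worker_can_run are
hypothetical names invented for illustration, and the worker-side check is an
assumed simplification of the condition checks the proposal mentions. The
current time is passed in explicitly so the behaviour is deterministic.

```python
from dataclasses import dataclass, field
from enum import Enum
from typing import Dict, List


class State(str, Enum):
    SCHEDULED = "scheduled"
    QUEUED = "queued"
    RUNNING = "running"


@dataclass
class ToyExecutor:
    """Hypothetical stand-in for a base executor (not Airflow's API)."""

    # Assumed configurable threshold, in seconds, before a QUEUED task
    # may be submitted again.
    requeue_after: float = 60.0
    # Task key -> time of the last submission by this executor.
    last_enqueued: Dict[str, float] = field(default_factory=dict)
    # Every submission made, in order; duplicates show at-least-once.
    submitted: List[str] = field(default_factory=list)

    def queue_command(self, key: str, state: State, now: float) -> bool:
        """Submit the task if allowed; return True if it was submitted."""
        if state is State.RUNNING:
            return False  # already picked up, nothing to reconcile
        last = self.last_enqueued.get(key)
        recently_sent = last is not None and now - last < self.requeue_after
        if state is State.QUEUED and recently_sent:
            return False  # give the first submission time to start
        self.last_enqueued[key] = now
        self.submitted.append(key)
        return True


def worker_can_run(state: State) -> bool:
    """Assumed worker-side guard: a duplicate submission for a task that is
    already running is dropped instead of being executed twice."""
    return state is not State.RUNNING


executor = ToyExecutor(requeue_after=60.0)
executor.queue_command("task_1", State.SCHEDULED, now=0.0)  # first submission
executor.queue_command("task_1", State.QUEUED, now=30.0)    # too soon, skipped
executor.queue_command("task_1", State.QUEUED, now=61.0)    # stuck, re-submitted
```

In this sketch the same key ends up in `submitted` twice, which is the
at-least-once behaviour a third-party executor would have to tolerate; the
worker-side guard is what keeps the duplicate from running twice.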
