Hi Andrew and Jarek,

Thanks for the comment.

Airflow is currently designed to be more resilient with at-least-once task
delivery instead of at-most-once, as it already has singleton execution
guarantee. In fact for all CeleryExecutor users, we are already using
at-most-once delivery given the message redelivery of Celery.

Andrew, for your concern about the "multiple times over the course of a
small time period,", this won't be an issue since there is a throttle
control, only re-sending queued tasks to the executor if it is queued for a
period of time (configurable). I may miss some context. Why does the
executor need to treat task requests from the reschedule/deferred
differently?

To Jarek's point, I agree that we can also have a method
('_clear_stuck_queued_tasks', which can be renamed to something else with
more  formalized the executor interface)  in the BaseExecutor to simply
handle the case and be invoked by the scheduler.

So the revised plan will be:

1. have a method defined in the BaseExecutor to clear stuck queued tasks
for a period of time\
    1.1. rewind the queued state in the ti to scheduled state
    1.2. clear some in memory states (for example, self.running) in the
executor.
2. scheduler invokes that method periodically.


Please let me know your thoughts.

Thanks,

Ping


On Wed, Mar 16, 2022 at 9:35 AM Andrew Godwin
<[email protected]> wrote:

> Having thought on this further, I actually think resubmitting into the
> executor would not be feasible, as it's not possible to tell duplicate task
> requests from resuming after reschedule/deferred etc.
>
> In these situations, the scheduler will submit the same task instance -
> with the same try number and all other details the same - multiple times
> over the course of a small time period, which feels like it is going to be
> impossible to tell apart from the same task accidentally coming in twice
> without a lot of extra accounting on the executor side.
>
> As for adding more to the interface - I would love to see Airflow
> formalise the executor interface more rather than start exposing
> private/underscore-prefixed variables. I think adding a loop that solves
> this to Airflow executors that need it is probably a cleaner way of
> handling it - the contract then being "if it's in QUEUED, then it is the
> executor's job to do something with it"
>
> Andrew
>
>
> On Tue, Mar 15, 2022, 16:11 Jarek Potiuk <[email protected]> wrote:
>
>> Should not we make "_clear_stuck_queued_tasks" part of the executor
>> interface simply?  And trigger it by the scheduler?  I guess that
>> might solve it in a "portable" way without changing the "at-most-once"
>> semantics?
>>
>> J.
>>
>> On Tue, Mar 15, 2022 at 10:40 PM Andrew Godwin
>> <[email protected]> wrote:
>> >
>> > I agree this needs a core fix in Airflow, but I'd like to highlight
>> that this is fundamentally changing the executor contract (as it changes
>> tasks from at-most-once submission to at-least-once) and so not only would
>> it need a very close level of testing, it would also be some level of
>> breaking change - since Airflow allows you to plug in third party executors.
>> >
>> > Not that I'm against it, but we'd have to have a bit of a debate about
>> what level of semver impact it would have. I imagine we could just about
>> justify it as a minor-level change?
>> >
>> > Andrew
>> >
>> > On Tue, Mar 15, 2022 at 3:33 PM Ping Zhang <[email protected]> wrote:
>> >>
>> >> Hi all,
>> >>
>> >> Currently, the tasks could be stuck at `queued` state and could not be
>> scheduled by the scheduler or picked up by the worker. This could happen
>> when failure happens after a task is marked by `queued` before the executor
>> marks it as `running`.
>> >>
>> >> There is a fix https://github.com/apache/airflow/pull/19769/files for
>> the celery executor. However, it only targets the CeleryExecutor and it
>> leaks the scheduler responsibility to the executor.
>> >>
>> >> We propose to move the state reconciliation logic to the scheduler.
>> The proposed change:
>> >>
>> >> 1. when getting the _executable_task_instances_to_queued (
>> https://github.com/apache/airflow/blob/main/airflow/jobs/scheduler_job.py#L287),
>> it also includes QUEUED state, like:
>>  .filter(TI.state.in_([TaskInstanceState.SCHEDULED,
>> TaskInstanceState.QUEUED]). In this way, the scheduler can process queued
>> tis.
>> >>
>> >> 2. In the queue_command method in the base executor, it conditionally
>> (based on last enqueued time by the executor, this is configurable) queues
>> the ti even if its state is QUEUED.
>> >>
>> >> This could potentially send the same tasks twice to the executor.
>> Since in the worker side, there are condition checks about whether a task
>> can run or not. This won't cause issues.
>> >>
>> >> We have been running this in our prod for a while and would love to
>> contribute it.
>> >>
>> >> Please let me know your thoughts.
>> >>
>> >> Thanks,
>> >>
>> >> Ping
>>
>

Reply via email to