See reset_state_for_orphaned_tasks in jobs.py
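
Roughly, what that hook does is reset QUEUED/SCHEDULED task instances that the
executor no longer knows about back to no state, so the scheduler picks them up
again. A simplified sketch of the idea (not the actual jobs.py code; the
`task_instances`, `executor` and `session` parameters are just illustrative):

    from airflow.utils.state import State

    def reset_orphaned_tasks(task_instances, executor, session):
        # Task instances stuck in SCHEDULED/QUEUED that the executor does not
        # track anymore (e.g. after a scheduler crash) are reset so they
        # become eligible for scheduling again.
        for ti in task_instances:
            if ti.state not in (State.SCHEDULED, State.QUEUED):
                continue
            if ti.key in executor.queued_tasks or ti.key in executor.running:
                continue
            ti.state = State.NONE
        session.commit()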

On Thu, Nov 30, 2017 at 11:17 AM, Alex Guziel <[email protected]> wrote:

> Right now the scheduler re-launches all QUEUED tasks on restart (there are
> safeguards for duplicates).
>
> On Thu, Nov 30, 2017 at 11:13 AM, Grant Nicholas <[email protected]> wrote:
>
>> @Alex
>> I agree setting the RUNNING state immediately when `airflow run` starts up
>> would be useful on its own, but it doesn't solve all the problems. What
>> happens if you have a task in the QUEUED state (that may or may not have
>> been launched) and your scheduler crashes? What does the scheduler do on
>> startup: does it launch all QUEUED tasks again (knowing that there may be
>> duplicate tasks), or does it not launch them again (knowing that a task
>> may be stuck in the QUEUED state forever)? Right now, airflow does the
>> latter, which I think is not correct, as you can potentially have tasks
>> stuck in the QUEUED state forever.
>>
>> Using a LAUNCHED state would explicitly keep track of whether tasks were
>> submitted for execution or not. At that point it's up to your messaging
>> system/queueing system/etc to be crash safe, and that is something you get
>> for free with kubernetes and it's something you can tune with celery
>> persistent queues.
>>
>> Note: Another option is to NOT introduce a new state but have airflow
>> launch QUEUED tasks again on startup of the executor. This would mean that
>> we may launch duplicate tasks, but this should not be an issue since we
>> have built-in protections on worker startup to avoid having two RUNNING
>> task instances at once.
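>>
>> As a rough illustration of that option (an untested sketch; the executor
>> call is meant to be BaseExecutor.queue_task_instance, everything else here
>> is simplified):
>>
>>     from airflow.utils.state import State
>>
>>     def relaunch_queued_on_startup(queued_task_instances, executor):
>>         # Re-submit everything still marked QUEUED in the metadata DB.
>>         # If a task had in fact already been launched before the crash,
>>         # the worker-side "not already running" dependency check rejects
>>         # the duplicate, so at most one copy ends up RUNNING.
>>         for ti in queued_task_instances:
>>             if ti.state == State.QUEUED:
>>                 executor.queue_task_instance(ti)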
>>
>>
>>
>> On Thu, Nov 30, 2017 at 12:40 PM, Alex Guziel <[email protected]> wrote:
>>
>> > I think the more sensible thing here is just to set the state to RUNNING
>> > immediately in the airflow run process. I don't think the distinction
>> > between launched and running adds much value.
>> >
>> > On Thu, Nov 30, 2017 at 10:36 AM, Daniel Imberman <[email protected]> wrote:
>> >
>> > > @Alex
>> > >
>> > > That could potentially work, since if you have the same task launched
>> > > twice then the second time would die due to the "already running"
>> > > dependency. It's still less ideal than not launching that task at all,
>> > > since it still allows for race conditions. @grant thoughts on this?
>> > >
>> > > On Wed, Nov 29, 2017 at 11:00 AM Alex Guziel <[email protected]> wrote:
>> > >
>> > > > It might be good enough to have RUNNING set immediately on the
>> > > > process run and not be dependent on the dag file being parsed. It is
>> > > > annoying here too when dags parse on the scheduler but not the
>> > > > worker, since queued tasks that don't heartbeat will not get retried,
>> > > > while running tasks will.
>> > > >
>> > > > On Wed, Nov 29, 2017 at 10:04 AM, Grant Nicholas <[email protected]> wrote:
>> > > >
>> > > > > ---Opening up this conversation to the whole mailing list, as
>> > > > > suggested by Bolke---
>> > > > >
>> > > > >
>> > > > > A "launched" state has been suggested in the past (see here
>> > > > > <https://github.com/apache/incubator-airflow/blob/master/
>> > > > > airflow/utils/state.py#L31>)
>> > > > > but never implemented for reasons unknown to us. Does anyone have
>> > more
>> > > > > details about why?
>> > > > >
>> > > > > There are two big reasons why adding a new "launched" state to
>> > > > > airflow would be useful:
>> > > > >
>> > > > > 1. A "launched" state would be useful for crash safety of the
>> > > scheduler.
>> > > > If
>> > > > > the scheduler crashes in between the scheduler launching the task
>> and
>> > > the
>> > > > > task process starting up then we lose information about whether
>> that
>> > > task
>> > > > > was launched or not. By moving the state of the task to "launched"
>> > when
>> > > > it
>> > > > > is sent off to celery/dask/kubernetes/etc, when crashes happen you
>> > know
>> > > > > whether you have to relaunch the task or not.
>> > > > >
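>> > > > >
>> > > > > To make that concrete, the hand-off could look roughly like this
>> > > > > (a sketch only: LAUNCHED does not exist today, and the executor and
>> > > > > task-instance calls are simplified):
>> > > > >
>> > > > >     from airflow.utils.state import State
>> > > > >
>> > > > >     LAUNCHED = "launched"  # hypothetical new State value
>> > > > >
>> > > > >     def hand_off(ti, executor, session):
>> > > > >         # QUEUED -> LAUNCHED around the hand-off to the backend,
>> > > > >         # so a crash leaves a record that the task was submitted.
>> > > > >         executor.execute_async(ti.key, ti.command_as_list())
>> > > > >         ti.state = LAUNCHED
>> > > > >         session.commit()
>> > > > >
>> > > > >     def recover_after_crash(task_instances, executor):
>> > > > >         for ti in task_instances:
>> > > > >             if ti.state == State.QUEUED:
>> > > > >                 # Never handed off: safe to re-queue.
>> > > > >                 executor.queue_task_instance(ti)
>> > > > >             elif ti.state == LAUNCHED:
>> > > > >                 # Already handed off: leave it to the (crash safe)
>> > > > >                 # messaging/queueing system.
>> > > > >                 pass
>> > > > >
>> > > > > A crash between execute_async and the commit still relies on the
>> > > > > existing duplicate safeguards, but the window is much smaller than
>> > > > > with QUEUED alone.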
>> > > > > To work around this issue, on startup of the kubernetes executor
>> > > > > we query all "queued" tasks, and if there is no matching kubernetes
>> > > > > pod for a task we set its state to "None" so it is rescheduled. See
>> > > > > here
>> > > > > <https://github.com/bloomberg/airflow/blob/airflow-kubernetes-executor/airflow/contrib/executors/kubernetes_executor.py#L400>
>> > > > > for details if you are curious. While this works for the kubernetes
>> > > > > executor, other executors can't easily introspect launched tasks,
>> > > > > which means the celery executor (afaik) is not crash safe.
>> > > > >
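>> > > > >
>> > > > > In simplified form, that startup reconciliation boils down to the
>> > > > > following (the label selector and the function itself are made up
>> > > > > here; see the linked code for the real thing):
>> > > > >
>> > > > >     from airflow.utils.state import State
>> > > > >
>> > > > >     def clear_unlaunched_queued(queued_tis, kube_client, namespace, session):
>> > > > >         for ti in queued_tis:
>> > > > >             selector = "dag_id={},task_id={}".format(ti.dag_id, ti.task_id)
>> > > > >             pods = kube_client.list_namespaced_pod(
>> > > > >                 namespace, label_selector=selector)
>> > > > >             if not pods.items:
>> > > > >                 # No pod was ever created for this queued task, so
>> > > > >                 # reset it and let the scheduler re-schedule it.
>> > > > >                 ti.state = State.NONE
>> > > > >         session.commit()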
>> > > > > 2. A "launched" state would allow for dynamic backpressure of
>> tasks,
>> > > not
>> > > > > just static backpressure. Right now, airflow only allows static
>> > > > > backpressure (`parallelism` config).This means you must statically
>> > say
>> > > I
>> > > > > only want to allow N running tasks at once. Imagine you have lots
>> of
>> > > > tasks
>> > > > > being scheduled on your celery cluster/kubernetes cluster and
>> since
>> > the
>> > > > > resource usage of each task is heterogenous you don't know exactly
>> > how
>> > > > many
>> > > > > running tasks you can tolerate at once. If instead you can say "I
>> > only
>> > > > want
>> > > > > tasks to be launched while I have less than N tasks in the
>> launched
>> > > > state"
>> > > > > you get some adaptive backpressure.
>> > > > >
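>> > > > >
>> > > > > The backpressure check itself would be tiny (illustrative only;
>> > > > > "launched" is the hypothetical new state and max_launched a new,
>> > > > > made-up setting):
>> > > > >
>> > > > >     def can_launch(task_instances, max_launched):
>> > > > >         # Gate new hand-offs on how many tasks are sitting in
>> > > > >         # "launched" (submitted but not yet RUNNING), instead of
>> > > > >         # only on the static `parallelism` setting.
>> > > > >         launched = sum(1 for ti in task_instances if ti.state == "launched")
>> > > > >         return launched < max_launched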
>> > > > > While we have workarounds described above for the kubernetes
>> > > > > executor, how do people feel about introducing a launched state
>> > > > > into airflow so we don't need the workarounds? I think there are
>> > > > > benefits to be gained for all the executors.
>> > > > >
>> > > > > On Sun, Nov 26, 2017 at 1:46 AM, Bolke de Bruin <[email protected]> wrote:
>> > > > >
>> > > > > >
>> > > > > > Hi Daniel,
>> > > > > >
>> > > > > > (BTW: I do think this discussion is better to have on the
>> > > > > > mailing list; more people might want to chime in and offer
>> > > > > > valuable opinions)
>> > > > > >
>> > > > > > Jumping right in: I am wondering if you are not duplicating the
>> > > > > > “queued” logic for (among others) pools. Introducing LAUNCHED
>> > > > > > with the meaning you describe attached to it would mean that we
>> > > > > > have a second place where we handle back pressure.
>> > > > > >
>> > > > > > Isn’t there a way to ask the k8s cluster how many tasks it has
>> > > > > > pending and just execute any queued tasks until it crosses a
>> > > > > > certain threshold? Have a look at base_executor where it is
>> > > > > > handling slots and queued tasks.
>> > > > > >
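>> > > > > >
>> > > > > > For the k8s case, that could be as small as this (a sketch of the
>> > > > > > idea, not base_executor's actual slot code; MAX_PENDING and the
>> > > > > > helper are made up):
>> > > > > >
>> > > > > >     MAX_PENDING = 5
>> > > > > >
>> > > > > >     def open_slots(parallelism, running_count, pending_pod_count):
>> > > > > >         # Keep the static limit, but stop draining queued tasks
>> > > > > >         # while the cluster already has too many pending pods.
>> > > > > >         # pending_pod_count can come from list_namespaced_pod
>> > > > > >         # with field_selector="status.phase=Pending".
>> > > > > >         if pending_pod_count >= MAX_PENDING:
>> > > > > >             return 0
>> > > > > >         return max(parallelism - running_count, 0)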
>> > > > > > Cheers
>> > > > > > Bolke
>> > > > > >
>> > > > > >
>> > > > > > Sent from my iPad
>> > > > > >
>> > > > > > On 15 Nov 2017, at 01:39, Daniel Imberman <[email protected]> wrote:
>> > > > > >
>> > > > > > Hi Bolke and Dan!
>> > > > > >
>> > > > > > I had a quick question WRT the launched state
>> > > > > > (https://github.com/apache/incubator-airflow/blob/master/airflow/utils/state.py#L32).
>> > > > > >
>> > > > > > We are handling the issue of throttling the executor when the
>> > > > > > k8s cluster has more than 5 pending tasks (which usually means
>> > > > > > that the cluster is under a lot of strain), and one thought we
>> > > > > > had WRT crash safety was to use a "LAUNCHED" state for pods that
>> > > > > > have been submitted but are not running yet.
>> > > > > >
>> > > > > > With the launched state currently being TBD, I was wondering if
>> > > > > > there was any reason you guys would not want this state? There
>> > > > > > are other workarounds we can do, but we wanted to check in with
>> > > > > > you guys first.
>> > > > > >
>> > > > > > Thanks!
>> > > > > >
>> > > > > > Daniel
>> > > > > >
>> > > > > >
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>>
>
>
