Right now the scheduler re-launches all QUEUED tasks on restart (there are
safeguards for duplicates).

On Thu, Nov 30, 2017 at 11:13 AM, Grant Nicholas <
[email protected]> wrote:

> @Alex
> I agree setting the RUNNING state immediately when `airflow run` starts up
> would be useful on its own, but it doesn't solve all the problems. What
> happens if you have a task in the QUEUED state (that may or may not have
> been launched) and your scheduler crashes. What does the scheduler do on
> startup, does it launch all QUEUED tasks again (knowing that there may be
> duplicate tasks) or does it not launch the QUEUED tasks again (knowing that
> the task may be stuck in the QUEUED state forever). Right now, airflow does
> the latter which I think is not correct, as you can potentially have tasks
> stuck in the QUEUED state forever.
>
> Using a LAUNCHED state would explicitly keep track of whether tasks were
> submitted for execution or not. At that point it's up to your messaging
> system/queueing system/etc to be crash safe, and that is something you get
> for free with kubernetes and it's something you can tune with celery
> persistent queues.
>
> Note: Another option is to NOT introduce a new state but have airflow
> launch QUEUED tasks again on startup of the executor. This would mean that
> we may launch duplicate tasks, but this should not be an issue since we
> have built in protections on worker startup to avoid having two RUNNING
> task instances at once.
>
>
>
> On Thu, Nov 30, 2017 at 12:40 PM, Alex Guziel <
> [email protected]> wrote:
>
> > I think the more sensible thing here is to just to set the state to
> RUNNING
> > immediately in the airflow run process. I don't think the distinction
> > between launched and running adds much value.
> >
> > On Thu, Nov 30, 2017 at 10:36 AM, Daniel Imberman <
> > [email protected]
> > > wrote:
> >
> > > @Alex
> > >
> > > That could potentially work since if you have the same task launched
> > twice
> > > then the second time would die due to the "already running dependency".
> > > Still less ideal than not launching that task at all since it still
> > allows
> > > for race conditions. @grant thoughts on this?
> > >
> > > On Wed, Nov 29, 2017 at 11:00 AM Alex Guziel <[email protected].
> > > invalid>
> > > wrote:
> > >
> > > > It might be good enough to have RUNNING set immediately on the
> process
> > > run
> > > > and not being dependent on the dag file being parsed. It is annoying
> > here
> > > > too when dags parse on the scheduler but not the worker, since queued
> > > tasks
> > > > that don't heartbeat will not get retried, while running tasks will.
> > > >
> > > > On Wed, Nov 29, 2017 at 10:04 AM, Grant Nicholas <
> > > > [email protected]> wrote:
> > > >
> > > > > ---Opening up this conversation to the whole mailing list, as
> > suggested
> > > > by
> > > > > Bolke---
> > > > >
> > > > >
> > > > > A "launched" state has been suggested in the past (see here
> > > > > <https://github.com/apache/incubator-airflow/blob/master/
> > > > > airflow/utils/state.py#L31>)
> > > > > but never implemented for reasons unknown to us. Does anyone have
> > more
> > > > > details about why?
> > > > >
> > > > > There are two big reasons why adding a new "launched" state to
> > airflow
> > > > > would be useful:
> > > > >
> > > > > 1. A "launched" state would be useful for crash safety of the
> > > scheduler.
> > > > If
> > > > > the scheduler crashes in between the scheduler launching the task
> and
> > > the
> > > > > task process starting up then we lose information about whether
> that
> > > task
> > > > > was launched or not. By moving the state of the task to "launched"
> > when
> > > > it
> > > > > is sent off to celery/dask/kubernetes/etc, when crashes happen you
> > know
> > > > > whether you have to relaunch the task or not.
> > > > >
> > > > > To workaround this issue, on startup of the kubernetes executor we
> > > query
> > > > > all "queued" tasks and if there is not a matching kubernetes pod
> for
> > > that
> > > > > task then we set the task state to "None" so it is rescheduled. See
> > > here
> > > > > <https://github.com/bloomberg/airflow/blob/airflow-
> > > > >
> > > > kubernetes-executor/airflow/contrib/executors/kubernetes_
> > > executor.py#L400>
> > > > > for
> > > > > details if you are curious. While this works for the kubernetes
> > > executor,
> > > > > other executors can't easily introspect launched tasks and this
> means
> > > the
> > > > > celery executor (afaik) is not crash safe.
> > > > >
> > > > > 2. A "launched" state would allow for dynamic backpressure of
> tasks,
> > > not
> > > > > just static backpressure. Right now, airflow only allows static
> > > > > backpressure (`parallelism` config).This means you must statically
> > say
> > > I
> > > > > only want to allow N running tasks at once. Imagine you have lots
> of
> > > > tasks
> > > > > being scheduled on your celery cluster/kubernetes cluster and since
> > the
> > > > > resource usage of each task is heterogenous you don't know exactly
> > how
> > > > many
> > > > > running tasks you can tolerate at once. If instead you can say "I
> > only
> > > > want
> > > > > tasks to be launched while I have less than N tasks in the launched
> > > > state"
> > > > > you get some adaptive backpressure.
> > > > >
> > > > > While we have workarounds described above for the kubernetes
> > executor,
> > > > how
> > > > > do people feel about introducing a launched state into airflow so
> we
> > > > don't
> > > > > need the workarounds? I think there are benefits to be gained for
> all
> > > the
> > > > > executors.
> > > > >
> > > > > On Sun, Nov 26, 2017 at 1:46 AM, Bolke de Bruin <[email protected]
> >
> > > > wrote:
> > > > >
> > > > > >
> > > > > > Hi Daniel,
> > > > > >
> > > > > > (BTW: I do think this discussion is better to have at the
> > > mailinglist,
> > > > > > more people might want to chime in and offer valuable opinions)
> > > > > >
> > > > > > Jumping right in: I am wondering if are you not duplicating the
> > > > “queued”
> > > > > > logic for (a.o) pools. Introducing LAUNCHED with the meaning
> > attached
> > > > to
> > > > > > it that you describe, would mean that we have a second place
> where
> > we
> > > > > > handle back pressure.
> > > > > >
> > > > > > Isn’t there a way to ask the k8s cluster how many tasks it has
> > > pending
> > > > > and
> > > > > > just to execute any queued tasks when it crosses a certain
> > threshold?
> > > > > Have
> > > > > > a look a base_executor where it is handling slots and queued
> tasks.
> > > > > >
> > > > > > Cheers
> > > > > > Bolke
> > > > > >
> > > > > >
> > > > > > Verstuurd vanaf mijn iPad
> > > > > >
> > > > > > Op 15 nov. 2017 om 01:39 heeft Daniel Imberman <
> > > > > [email protected]>
> > > > > > het volgende geschreven:
> > > > > >
> > > > > > Hi Bolke and Dan!
> > > > > >
> > > > > > I had a quick question WRT the launched state (
> > > > > > https://github.com/apache/incubator-airflow/blob/master/air
> > > > > > flow/utils/state.py#L32).
> > > > > >
> > > > > > We are handling the issue of throttling the executor when the k8s
> > > > cluster
> > > > > > has more than 5 pending tasks (which usually means that the
> cluster
> > > is
> > > > > > under a lot of strain), and one thought we had WRT crash safety
> was
> > > to
> > > > > use
> > > > > > a "LAUNCHED" state for pods that have been submitted but are not
> > > > running
> > > > > > yet.
> > > > > >
> > > > > > With the launched state currently being TBD, I was wondering if
> > there
> > > > was
> > > > > > any reason you guys would not want this state? There are other
> > > > > workarounds
> > > > > > we can do, but we wanted to check in with you guys first.
> > > > > >
> > > > > > Thanks!
> > > > > >
> > > > > > Daniel
> > > > > >
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Reply via email to