@Alex just tested this and found the issue. It only resets the state of
tasks for dagruns with "external_trigger=False". You can see where this
is explicitly specified in the query here:
https://github.com/apache/incubator-airflow/blob/master/airflow/jobs.py#L242
Is this intended behavior? I can say for us, this caused confusion because
the dagruns created through the UI default to "external_trigger=True" and
dagruns created through the CLI with "airflow trigger_dag {dag_id} -r
{run_id}" also default to "external_trigger=True". This means that dags
that are manually triggered are *NOT* crash safe by default.
Is there a reason why we would only want this behavior for
"external_trigger=False" dags?
On Thu, Nov 30, 2017 at 1:18 PM, Alex Guziel <[email protected]> wrote:
> See reset_state_for_orphaned_tasks in jobs.py
>
> On Thu, Nov 30, 2017 at 11:17 AM, Alex Guziel <[email protected]> wrote:
>
> > Right now the scheduler re-launches all QUEUED tasks on restart (there
> > are safeguards for duplicates).
> >
> > On Thu, Nov 30, 2017 at 11:13 AM, Grant Nicholas <[email protected]> wrote:
> >
> >> @Alex
> >> I agree setting the RUNNING state immediately when `airflow run` starts
> >> up would be useful on its own, but it doesn't solve all the problems.
> >> What happens if you have a task in the QUEUED state (that may or may not
> >> have been launched) and your scheduler crashes? What does the scheduler
> >> do on startup: does it launch all QUEUED tasks again (knowing that there
> >> may be duplicate tasks), or does it not launch the QUEUED tasks again
> >> (knowing that the task may be stuck in the QUEUED state forever)? Right
> >> now, airflow does the latter, which I think is not correct, as you can
> >> potentially have tasks stuck in the QUEUED state forever.
> >>
> >> Using a LAUNCHED state would explicitly keep track of whether tasks were
> >> submitted for execution or not. At that point it's up to your messaging
> >> system/queueing system/etc. to be crash safe, and that is something you
> >> get for free with kubernetes and something you can tune with celery
> >> persistent queues.
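
A rough sketch of what that could look like (hypothetical throughout: State has no LAUNCHED member today, and the submit hook below is made up for illustration):

```python
# Hypothetical: add LAUNCHED alongside the existing states in
# airflow/utils/state.py ...
class State(object):
    NONE = None
    QUEUED = "queued"
    LAUNCHED = "launched"  # submitted to celery/dask/k8s but not yet running
    RUNNING = "running"

# ... and persist the handoff at the moment the executor submits the task.
def submit_task(executor, ti, command, session):
    executor.send_to_backend(command)  # hand off to the queueing system
    ti.state = State.LAUNCHED          # record that submission happened
    session.merge(ti)
    session.commit()
    # After a crash, recovery is unambiguous: QUEUED means never submitted
    # (safe to submit now); LAUNCHED means already submitted (don't resubmit,
    # let the queueing system's own crash safety deliver it).
```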
> >>
> >> Note: Another option is to NOT introduce a new state but have airflow
> >> launch QUEUED tasks again on startup of the executor. This would mean
> >> that we may launch duplicate tasks, but this should not be an issue
> >> since we have built-in protections on worker startup to avoid having two
> >> RUNNING task instances at once.
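
That alternative is small enough to sketch here (names assumed; the double-submit is tolerated only because `airflow run` refuses to start a task instance that is already RUNNING):

```python
# Hypothetical recovery pass: on executor startup, resubmit every QUEUED
# task instance. A duplicate submission is harmless because the worker-side
# "already running" check kills the second copy.
from airflow.models import TaskInstance
from airflow.utils.state import State

def recover_queued_tasks(executor, session):
    queued = (
        session.query(TaskInstance)
        .filter(TaskInstance.state == State.QUEUED)
        .all()
    )
    for ti in queued:
        executor.queue_task_instance(ti)  # may double-submit; worker dedupes
```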
> >>
> >> On Thu, Nov 30, 2017 at 12:40 PM, Alex Guziel <[email protected]> wrote:
> >>
> >> > I think the more sensible thing here is to just set the state to
> >> > RUNNING immediately in the airflow run process. I don't think the
> >> > distinction between launched and running adds much value.
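
Concretely, that would mean something like this at the top of the `airflow run` entrypoint, before any dag parsing (a sketch, not the current code path; the lookup helper is hypothetical):

```python
# Sketch: mark the task instance RUNNING as soon as the process starts,
# instead of only after the dag file has been parsed successfully.
from datetime import datetime

from airflow import settings
from airflow.utils.state import State

def run(args):
    session = settings.Session()
    ti = fetch_task_instance(args, session)  # hypothetical lookup helper
    ti.state = State.RUNNING
    ti.start_date = datetime.utcnow()
    session.merge(ti)
    session.commit()
    # ... only now parse the dag file and actually execute the task ...
```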
> >> >
> >> > On Thu, Nov 30, 2017 at 10:36 AM, Daniel Imberman <[email protected]> wrote:
> >> >
> >> > > @Alex
> >> > >
> >> > > That could potentially work, since if you have the same task
> >> > > launched twice then the second copy would die due to the "already
> >> > > running" dependency check. Still, it's less ideal than not launching
> >> > > that task at all, since it still allows for race conditions. @grant,
> >> > > thoughts on this?
> >> > >
> >> > > On Wed, Nov 29, 2017 at 11:00 AM Alex Guziel <[email protected]> wrote:
> >> > >
> >> > > > It might be good enough to have RUNNING set immediately on the
> >> > > > process run and not dependent on the dag file being parsed. It is
> >> > > > annoying here too when dags parse on the scheduler but not the
> >> > > > worker, since queued tasks that don't heartbeat will not get
> >> > > > retried, while running tasks will.
> >> > > >
> >> > > > On Wed, Nov 29, 2017 at 10:04 AM, Grant Nicholas <[email protected]> wrote:
> >> > > >
> >> > > > > ---Opening up this conversation to the whole mailing list, as
> >> > > > > suggested by Bolke---
> >> > > > >
> >> > > > > A "launched" state has been suggested in the past (see here
> >> > > > > <https://github.com/apache/incubator-airflow/blob/master/airflow/utils/state.py#L31>)
> >> > > > > but never implemented, for reasons unknown to us. Does anyone
> >> > > > > have more details about why?
> >> > > > >
> >> > > > > There are two big reasons why adding a new "launched" state to
> >> > > > > airflow would be useful:
> >> > > > >
> >> > > > > 1. A "launched" state would be useful for crash safety of the
> >> > > scheduler.
> >> > > > If
> >> > > > > the scheduler crashes in between the scheduler launching the
> task
> >> and
> >> > > the
> >> > > > > task process starting up then we lose information about whether
> >> that
> >> > > task
> >> > > > > was launched or not. By moving the state of the task to
> "launched"
> >> > when
> >> > > > it
> >> > > > > is sent off to celery/dask/kubernetes/etc, when crashes happen
> you
> >> > know
> >> > > > > whether you have to relaunch the task or not.
> >> > > > >
> >> > > > > To work around this issue, on startup of the kubernetes executor
> >> > > > > we query all "queued" tasks, and if there is not a matching
> >> > > > > kubernetes pod for a task then we set the task state to "None" so
> >> > > > > it is rescheduled. See here
> >> > > > > <https://github.com/bloomberg/airflow/blob/airflow-kubernetes-executor/airflow/contrib/executors/kubernetes_executor.py#L400>
> >> > > > > for details if you are curious. While this works for the
> >> > > > > kubernetes executor, other executors can't easily introspect
> >> > > > > launched tasks, and this means the celery executor (afaik) is not
> >> > > > > crash safe.
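
In outline, that recovery pass looks like this (a condensed sketch of the linked branch, not a quote of it; the pod-lookup helper is assumed):

```python
# Sketch: any task still QUEUED with no matching pod was never actually
# launched, so reset it to None and let the scheduler pick it up again.
from airflow.models import TaskInstance
from airflow.utils.state import State

def clear_not_launched_queued_tasks(kube_client, session):
    queued = (
        session.query(TaskInstance)
        .filter(TaskInstance.state == State.QUEUED)
        .all()
    )
    for ti in queued:
        if not pod_exists_for(kube_client, ti):  # assumed label-based lookup
            ti.state = State.NONE  # reschedulable again
            session.merge(ti)
    session.commit()
```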
> >> > > > >
> >> > > > > 2. A "launched" state would allow for dynamic backpressure of
> >> tasks,
> >> > > not
> >> > > > > just static backpressure. Right now, airflow only allows static
> >> > > > > backpressure (`parallelism` config).This means you must
> statically
> >> > say
> >> > > I
> >> > > > > only want to allow N running tasks at once. Imagine you have
> lots
> >> of
> >> > > > tasks
> >> > > > > being scheduled on your celery cluster/kubernetes cluster and
> >> since
> >> > the
> >> > > > > resource usage of each task is heterogenous you don't know
> exactly
> >> > how
> >> > > > many
> >> > > > > running tasks you can tolerate at once. If instead you can say
> "I
> >> > only
> >> > > > want
> >> > > > > tasks to be launched while I have less than N tasks in the
> >> launched
> >> > > > state"
> >> > > > > you get some adaptive backpressure.
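
In pseudocode, the contrast is roughly this (all names illustrative; the LAUNCHED state does not exist yet):

```python
from airflow.models import TaskInstance

# Static backpressure (today): a fixed ceiling on concurrent tasks.
def open_slots_static(parallelism, num_running):
    return parallelism - num_running

# Dynamic backpressure (with a LAUNCHED state): only submit while the
# number of submitted-but-not-yet-running tasks stays under a threshold,
# which tracks how far behind the cluster actually is.
def can_launch(session, max_launched):
    num_launched = (
        session.query(TaskInstance)
        .filter(TaskInstance.state == "launched")  # hypothetical state
        .count()
    )
    return num_launched < max_launched
```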
> >> > > > >
> >> > > > > While we have workarounds described above for the kubernetes
> >> > > > > executor, how do people feel about introducing a launched state
> >> > > > > into airflow so we don't need the workarounds? I think there are
> >> > > > > benefits to be gained for all the executors.
> >> > > > >
> >> > > > > On Sun, Nov 26, 2017 at 1:46 AM, Bolke de Bruin <[email protected]> wrote:
> >> > > > >
> >> > > > > >
> >> > > > > > Hi Daniel,
> >> > > > > >
> >> > > > > > (BTW: I do think this discussion is better to have on the
> >> > > > > > mailing list; more people might want to chime in and offer
> >> > > > > > valuable opinions.)
> >> > > > > >
> >> > > > > > Jumping right in: I am wondering if you are not duplicating
> >> > > > > > the “queued” logic for (among others) pools. Introducing
> >> > > > > > LAUNCHED with the meaning you describe attached to it would
> >> > > > > > mean that we have a second place where we handle backpressure.
> >> > > > > >
> >> > > > > > Isn’t there a way to ask the k8s cluster how many tasks it has
> >> > > > > > pending, and just to execute any queued tasks when it crosses a
> >> > > > > > certain threshold? Have a look at base_executor, where it is
> >> > > > > > handling slots and queued tasks.
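
For reference, the slot handling he's pointing at has roughly this shape (paraphrased from base_executor's heartbeat, not a verbatim copy):

```python
# Paraphrase of BaseExecutor.heartbeat(): compute open slots from the
# static `parallelism` setting and drain that many queued tasks,
# highest priority first.
def heartbeat(self):
    open_slots = self.parallelism - len(self.running)
    sorted_queue = sorted(
        self.queued_tasks.items(),
        key=lambda item: item[1][1],  # sort by priority weight
        reverse=True,
    )
    for _ in range(min(open_slots, len(sorted_queue))):
        key, (command, _priority, queue, _ti) = sorted_queue.pop(0)
        del self.queued_tasks[key]
        self.running[key] = command
        self.execute_async(key, command=command, queue=queue)
```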
> >> > > > > >
> >> > > > > > Cheers
> >> > > > > > Bolke
> >> > > > > >
> >> > > > > >
> >> > > > > > Sent from my iPad
> >> > > > > >
> >> > > > > > On 15 Nov 2017, at 01:39, Daniel Imberman <[email protected]> wrote:
> >> > > > > >
> >> > > > > > Hi Bolke and Dan!
> >> > > > > >
> >> > > > > > I had a quick question WRT the launched state
> >> > > > > > (https://github.com/apache/incubator-airflow/blob/master/airflow/utils/state.py#L32).
> >> > > > > >
> >> > > > > > We are handling the issue of throttling the executor when the
> >> > > > > > k8s cluster has more than 5 pending tasks (which usually means
> >> > > > > > that the cluster is under a lot of strain), and one thought we
> >> > > > > > had WRT crash safety was to use a "LAUNCHED" state for pods
> >> > > > > > that have been submitted but are not running yet.
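
(For context, the pending-pod check itself is cheap with the kubernetes python client; the namespace and the threshold of 5 here are our own settings:)

```python
# Count Pending pods to decide whether to throttle new submissions.
from kubernetes import client, config

config.load_incluster_config()  # or config.load_kube_config() off-cluster
v1 = client.CoreV1Api()
pending_pods = v1.list_namespaced_pod(
    namespace="airflow",  # assumed namespace for worker pods
    field_selector="status.phase=Pending",
).items
should_throttle = len(pending_pods) > 5  # the threshold mentioned above
```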
> >> > > > > >
> >> > > > > > With the launched state currently being TBD, I was wondering
> >> > > > > > if there was any reason you guys would not want this state?
> >> > > > > > There are other workarounds we can do, but we wanted to check
> >> > > > > > in with you guys first.
> >> > > > > >
> >> > > > > > Thanks!
> >> > > > > >
> >> > > > > > Daniel