I opened a PR addressing the issue I found above; please take a look when
you get a chance:

https://github.com/apache/incubator-airflow/pull/2843

I could not find a reason why we would want to exclude backfilled and
externally triggered dagruns from the "reset orphaned task instances"
check, so I removed the special casing. (Anyone with expertise in this
area, please let me know if there is a reason.)
This means that all dagruns should rightfully be crash safe again after
this PR.
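
For context, the shape of the change is roughly the following (a sketch of
the intent only, not the exact diff in the PR; the real query lives in
reset_state_for_orphaned_tasks in jobs.py):

    from airflow.models import DagRun
    from airflow.utils.state import State

    # session is an open SQLAlchemy session, as elsewhere in jobs.py
    running_dagruns = (
        session.query(DagRun)
        .filter(DagRun.state == State.RUNNING)
        # previously the query also filtered out externally triggered
        # runs (and backfill runs were skipped separately)
        .all()
    )
    for dag_run in running_dagruns:
        for ti in dag_run.get_task_instances(
                state=[State.SCHEDULED, State.QUEUED]):
            ti.state = State.NONE  # hand the task back to the scheduler
    session.commit()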

On Fri, Dec 1, 2017 at 11:57 AM, Maxime Beauchemin <
[email protected]> wrote:

> Oh Alex, I didn't know there was a task instance audit feature already.
> Curious to learn about it, do you have a pointer or a short description?
>
> Max
>
> On Fri, Dec 1, 2017 at 9:26 AM, Grant Nicholas <
> [email protected]> wrote:
>
> > @Alex just tested this and found the issue. It only resets the state of
> > tasks for dagruns with *"external_trigger=False"*. You can see where this
> > is explicitly specified in the query here:
> > https://github.com/apache/incubator-airflow/blob/master/airflow/jobs.py#L242
> >
> > Is this intended behavior? I can say that for us this caused confusion,
> > because dagruns created through the UI default to "external_trigger=True"
> > and dagruns created through the CLI with "airflow trigger_dag {dag_id} -r
> > {run_id}" also default to "external_trigger=True". This means that dags
> > that are manually triggered are *NOT* crash safe by default.
> >
> > Is there a reason why we would only want this behavior for
> > "external_trigger=False" dags?
> >
> > On Thu, Nov 30, 2017 at 1:18 PM, Alex Guziel
> > <[email protected]> wrote:
> >
> > > See reset_state_for_orphaned_tasks in jobs.py
> > >
> > > On Thu, Nov 30, 2017 at 11:17 AM, Alex Guziel <[email protected]>
> > > wrote:
> > >
> > > > Right now the scheduler re-launches all QUEUED tasks on restart
> > > > (there are safeguards for duplicates).
> > > >
> > > > On Thu, Nov 30, 2017 at 11:13 AM, Grant Nicholas
> > > > <[email protected]> wrote:
> > > >
> > > >> @Alex
> > > >> I agree setting the RUNNING state immediately when `airflow run`
> > > >> starts up would be useful on its own, but it doesn't solve all the
> > > >> problems. What happens if you have a task in the QUEUED state (that
> > > >> may or may not have been launched) and your scheduler crashes? What
> > > >> does the scheduler do on startup: does it launch all QUEUED tasks
> > > >> again (knowing that there may be duplicate tasks), or does it not
> > > >> launch the QUEUED tasks again (knowing that the task may be stuck in
> > > >> the QUEUED state forever)? Right now, airflow does the latter, which
> > > >> I think is not correct, as you can potentially have tasks stuck in
> > > >> the QUEUED state forever.
> > > >>
> > > >> Using a LAUNCHED state would explicitly keep track of whether tasks
> > > >> were submitted for execution or not. At that point it's up to your
> > > >> messaging system/queueing system/etc to be crash safe, and that is
> > > >> something you get for free with kubernetes and something you can
> > > >> tune with celery persistent queues.
> > > >>
> > > >> Note: Another option is to NOT introduce a new state but have
> > > >> airflow launch QUEUED tasks again on startup of the executor. This
> > > >> would mean that we may launch duplicate tasks, but this should not
> > > >> be an issue since we have built-in protections on worker startup to
> > > >> avoid having two RUNNING task instances at once.
> > > >>
> > > >>
> > > >>
> > > >> On Thu, Nov 30, 2017 at 12:40 PM, Alex Guziel <
> > > >> [email protected]> wrote:
> > > >>
> > > >> > I think the more sensible thing here is just to set the state to
> > > >> > RUNNING immediately in the airflow run process. I don't think the
> > > >> > distinction between launched and running adds much value.
> > > >> >
> > > >> > On Thu, Nov 30, 2017 at 10:36 AM, Daniel Imberman
> > > >> > <[email protected]> wrote:
> > > >> >
> > > >> > > @Alex
> > > >> > >
> > > >> > > That could potentially work, since if you have the same task
> > > >> > > launched twice the second launch would die due to the "already
> > > >> > > running" dependency. It's still less ideal than not launching
> > > >> > > that task at all, since it still allows for race conditions.
> > > >> > > @grant, thoughts on this?
> > > >> > >
> > > >> > > On Wed, Nov 29, 2017 at 11:00 AM Alex Guziel
> > > >> > > <[email protected]> wrote:
> > > >> > >
> > > >> > > > It might be good enough to have RUNNING set immediately when
> > > >> > > > the process runs, rather than being dependent on the dag file
> > > >> > > > being parsed. It is annoying here too when dags parse on the
> > > >> > > > scheduler but not the worker, since queued tasks that don't
> > > >> > > > heartbeat will not get retried, while running tasks will.
> > > >> > > >
> > > >> > > > On Wed, Nov 29, 2017 at 10:04 AM, Grant Nicholas <
> > > >> > > > [email protected]> wrote:
> > > >> > > >
> > > >> > > > > ---Opening up this conversation to the whole mailing list,
> > > >> > > > > as suggested by Bolke---
> > > >> > > > >
> > > >> > > > > A "launched" state has been suggested in the past (see here
> > > >> > > > > <https://github.com/apache/incubator-airflow/blob/master/airflow/utils/state.py#L31>)
> > > >> > > > > but never implemented, for reasons unknown to us. Does anyone
> > > >> > > > > have more details about why?
> > > >> > > > >
> > > >> > > > > There are two big reasons why adding a new "launched" state to
> > > >> > > > > airflow would be useful:
> > > >> > > > >
> > > >> > > > > 1. A "launched" state would be useful for crash safety of the
> > > >> > > > > scheduler. If the scheduler crashes in between launching the
> > > >> > > > > task and the task process starting up, then we lose
> > > >> > > > > information about whether that task was launched or not. By
> > > >> > > > > moving the state of the task to "launched" when it is sent
> > > >> > > > > off to celery/dask/kubernetes/etc, you know after a crash
> > > >> > > > > whether you have to relaunch the task or not.
> > > >> > > > >
> > > >> > > > > To work around this issue, on startup of the kubernetes
> > > >> > > > > executor we query all "queued" tasks, and if there is not a
> > > >> > > > > matching kubernetes pod for a task then we set its state to
> > > >> > > > > "None" so it is rescheduled. See here
> > > >> > > > > <https://github.com/bloomberg/airflow/blob/airflow-kubernetes-executor/airflow/contrib/executors/kubernetes_executor.py#L400>
> > > >> > > > > for details if you are curious. While this works for the
> > > >> > > > > kubernetes executor, other executors can't easily introspect
> > > >> > > > > launched tasks, and this means the celery executor (afaik) is
> > > >> > > > > not crash safe.
> > > >> > > > >
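> > > >> > > > > Conceptually, that startup recovery pass looks something
> > > >> > > > > like the sketch below (a rough sketch only, not the actual
> > > >> > > > > code linked above; the pod-listing helpers are made up and
> > > >> > > > > the usual imports/session setup are assumed):
> > > >> > > > >
> > > >> > > > >     # Reset queued tasks that never made it to a pod.
> > > >> > > > >     # list_airflow_pods()/pod_task_key() are hypothetical
> > > >> > > > >     # helpers standing in for the real pod query.
> > > >> > > > >     queued_tis = (
> > > >> > > > >         session.query(TaskInstance)
> > > >> > > > >         .filter(TaskInstance.state == State.QUEUED)
> > > >> > > > >         .all()
> > > >> > > > >     )
> > > >> > > > >     live_keys = {pod_task_key(p) for p in list_airflow_pods()}
> > > >> > > > >     for ti in queued_tis:
> > > >> > > > >         if ti.key not in live_keys:
> > > >> > > > >             # the launch was lost in the crash, so hand the
> > > >> > > > >             # task back to the scheduler
> > > >> > > > >             ti.state = State.NONE
> > > >> > > > >     session.commit()
> > > >> > > > >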
> > > >> > > > > 2. A "launched" state would allow for dynamic backpressure
> > > >> > > > > of tasks, not just static backpressure. Right now, airflow
> > > >> > > > > only allows static backpressure (the `parallelism` config).
> > > >> > > > > This means you must statically say "I only want to allow N
> > > >> > > > > running tasks at once". Imagine you have lots of tasks being
> > > >> > > > > scheduled on your celery cluster/kubernetes cluster, and
> > > >> > > > > since the resource usage of each task is heterogeneous you
> > > >> > > > > don't know exactly how many running tasks you can tolerate
> > > >> > > > > at once. If instead you can say "I only want tasks to be
> > > >> > > > > launched while I have fewer than N tasks in the launched
> > > >> > > > > state", you get some adaptive backpressure.
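> > > >> > > > >
> > > >> > > > > In executor terms that check would be roughly the following
> > > >> > > > > (purely illustrative; State.LAUNCHED and max_launched_tasks
> > > >> > > > > are hypothetical and don't exist in airflow today):
> > > >> > > > >
> > > >> > > > >     launched = (
> > > >> > > > >         session.query(TaskInstance)
> > > >> > > > >         .filter(TaskInstance.state == State.LAUNCHED)
> > > >> > > > >         .count()
> > > >> > > > >     )
> > > >> > > > >     if launched < max_launched_tasks:
> > > >> > > > >         executor.queue_task_instance(ti)
> > > >> > > > >     # otherwise leave the task as-is and re-check on the
> > > >> > > > >     # next scheduler loop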
> > > >> > > > >
> > > >> > > > > While we have the workarounds described above for the
> > > >> > > > > kubernetes executor, how do people feel about introducing a
> > > >> > > > > launched state into airflow so we don't need the workarounds?
> > > >> > > > > I think there are benefits to be gained for all the
> > > >> > > > > executors.
> > > >> > > > >
> > > >> > > > > On Sun, Nov 26, 2017 at 1:46 AM, Bolke de Bruin
> > > >> > > > > <[email protected]> wrote:
> > > >> > > > >
> > > >> > > > > >
> > > >> > > > > > Hi Daniel,
> > > >> > > > > >
> > > >> > > > > > (BTW: I do think this discussion is better to have on the
> > > >> > > > > > mailing list; more people might want to chime in and offer
> > > >> > > > > > valuable opinions.)
> > > >> > > > > >
> > > >> > > > > > Jumping right in: I am wondering if you are not duplicating
> > > >> > > > > > the "queued" logic for (among others) pools. Introducing
> > > >> > > > > > LAUNCHED with the meaning you describe attached to it would
> > > >> > > > > > mean that we have a second place where we handle
> > > >> > > > > > backpressure.
> > > >> > > > > >
> > > >> > > > > > Isn't there a way to ask the k8s cluster how many tasks it
> > > >> > > > > > has pending, and only execute queued tasks while that
> > > >> > > > > > number is below a certain threshold? Have a look at
> > > >> > > > > > base_executor where it is handling slots and queued tasks.
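> > > >> > > > > >
> > > >> > > > > > Something along these lines, roughly (a sketch of the idea
> > > >> > > > > > only; the pending-pod counter and the threshold value are
> > > >> > > > > > hypothetical, and the real slot handling in base_executor
> > > >> > > > > > differs in the details):
> > > >> > > > > >
> > > >> > > > > >     PENDING_POD_THRESHOLD = 5
> > > >> > > > > >
> > > >> > > > > >     def trigger_queued_tasks(self):
> > > >> > > > > >         # ask the k8s API how many of our pods are pending
> > > >> > > > > >         pending = count_pending_airflow_pods()
> > > >> > > > > >         # treating queued_tasks as a simple {key: command}
> > > >> > > > > >         # map for the purposes of this sketch
> > > >> > > > > >         for key, command in list(self.queued_tasks.items()):
> > > >> > > > > >             if pending >= PENDING_POD_THRESHOLD:
> > > >> > > > > >                 # back off; leave the rest queued until
> > > >> > > > > >                 # the next heartbeat
> > > >> > > > > >                 break
> > > >> > > > > >             self.queued_tasks.pop(key)
> > > >> > > > > >             self.execute_async(key, command)
> > > >> > > > > >             pending += 1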
> > > >> > > > > >
> > > >> > > > > > Cheers
> > > >> > > > > > Bolke
> > > >> > > > > >
> > > >> > > > > >
> > > >> > > > > > Sent from my iPad
> > > >> > > > > >
> > > >> > > > > > On 15 Nov 2017, at 01:39, Daniel Imberman
> > > >> > > > > > <[email protected]> wrote:
> > > >> > > > > >
> > > >> > > > > > Hi Bolke and Dan!
> > > >> > > > > >
> > > >> > > > > > I had a quick question WRT the launched state
> > > >> > > > > > (https://github.com/apache/incubator-airflow/blob/master/airflow/utils/state.py#L32).
> > > >> > > > > >
> > > >> > > > > > We are handling the issue of throttling the executor when
> > > >> > > > > > the k8s cluster has more than 5 pending tasks (which
> > > >> > > > > > usually means that the cluster is under a lot of strain),
> > > >> > > > > > and one thought we had WRT crash safety was to use a
> > > >> > > > > > "LAUNCHED" state for pods that have been submitted but are
> > > >> > > > > > not running yet.
> > > >> > > > > >
> > > >> > > > > > With the launched state currently being TBD, I was
> > > >> > > > > > wondering if there was any reason you guys would not want
> > > >> > > > > > this state? There are other workarounds we can do, but we
> > > >> > > > > > wanted to check in with you guys first.
> > > >> > > > > >
> > > >> > > > > > Thanks!
> > > >> > > > > >
> > > >> > > > > > Daniel
> > > >> > > > > >
> > > >> > > > > >
> > > >> > > > > >
> > > >> > > > >
> > > >> > > >
> > > >> > >
> > > >> >
> > > >>
> > > >
> > > >
> > >
> >
>
