On this tangent, our scheduler occasionally crashes when the db tells 
SQLAlchemy that there's a lock on the task it's trying to set as queued or 
running.
Some (update query) retry logic in the (many) callers seems to be in order.

On 12/1/17, 2:19 PM, "Maxime Beauchemin" <[email protected]> wrote:

    Taking a tangent here:
    
    I like the idea of logging every state change to another table. Mutating
    task_instance from many places results in things that are hard to debug in
    some cases.
    
    As we need similar history-tracking of mutations on task_instances around
    retries, we may want keep track of history for anything that touches
    task_instance.
    
    It may be easy-ish to maintain this table using SQLAlchemy after-update
    hooks on the model where we'd systematically insert in a
    task_instance_history. Just a thought.
    
    Max
    
    On Thu, Nov 30, 2017 at 11:26 AM, Grant Nicholas <
    [email protected]> wrote:
    
    > Thanks, I see why that should work, I just know that from testing this
    > myself that I had to manually clear out old QUEUED task instances to get
    > them to reschedule. I'll do some more testing to confirm, it's totally
    > possible I did something wrong in our test suite setup.
    >
    > On Thu, Nov 30, 2017 at 1:18 PM, Alex Guziel <[email protected].
    > invalid
    > > wrote:
    >
    > > See reset_state_for_orphaned_tasks in jobs.py
    > >
    > > On Thu, Nov 30, 2017 at 11:17 AM, Alex Guziel <[email protected]>
    > > wrote:
    > >
    > > > Right now the scheduler re-launches all QUEUED tasks on restart (there
    > > are
    > > > safeguards for duplicates).
    > > >
    > > > On Thu, Nov 30, 2017 at 11:13 AM, Grant Nicholas <grantnicholas2015@u.
    > > > northwestern.edu> wrote:
    > > >
    > > >> @Alex
    > > >> I agree setting the RUNNING state immediately when `airflow run`
    > starts
    > > up
    > > >> would be useful on its own, but it doesn't solve all the problems.
    > What
    > > >> happens if you have a task in the QUEUED state (that may or may not
    > have
    > > >> been launched) and your scheduler crashes. What does the scheduler do
    > on
    > > >> startup, does it launch all QUEUED tasks again (knowing that there 
may
    > > be
    > > >> duplicate tasks) or does it not launch the QUEUED tasks again 
(knowing
    > > >> that
    > > >> the task may be stuck in the QUEUED state forever). Right now, 
airflow
    > > >> does
    > > >> the latter which I think is not correct, as you can potentially have
    > > tasks
    > > >> stuck in the QUEUED state forever.
    > > >>
    > > >> Using a LAUNCHED state would explicitly keep track of whether tasks
    > were
    > > >> submitted for execution or not. At that point it's up to your
    > messaging
    > > >> system/queueing system/etc to be crash safe, and that is something 
you
    > > get
    > > >> for free with kubernetes and it's something you can tune with celery
    > > >> persistent queues.
    > > >>
    > > >> Note: Another option is to NOT introduce a new state but have airflow
    > > >> launch QUEUED tasks again on startup of the executor. This would mean
    > > that
    > > >> we may launch duplicate tasks, but this should not be an issue since
    > we
    > > >> have built in protections on worker startup to avoid having two
    > RUNNING
    > > >> task instances at once.
    > > >>
    > > >>
    > > >>
    > > >> On Thu, Nov 30, 2017 at 12:40 PM, Alex Guziel <
    > > >> [email protected]> wrote:
    > > >>
    > > >> > I think the more sensible thing here is to just to set the state to
    > > >> RUNNING
    > > >> > immediately in the airflow run process. I don't think the
    > distinction
    > > >> > between launched and running adds much value.
    > > >> >
    > > >> > On Thu, Nov 30, 2017 at 10:36 AM, Daniel Imberman <
    > > >> > [email protected]
    > > >> > > wrote:
    > > >> >
    > > >> > > @Alex
    > > >> > >
    > > >> > > That could potentially work since if you have the same task
    > launched
    > > >> > twice
    > > >> > > then the second time would die due to the "already running
    > > >> dependency".
    > > >> > > Still less ideal than not launching that task at all since it
    > still
    > > >> > allows
    > > >> > > for race conditions. @grant thoughts on this?
    > > >> > >
    > > >> > > On Wed, Nov 29, 2017 at 11:00 AM Alex Guziel <
    > > [email protected].
    > > >> > > invalid>
    > > >> > > wrote:
    > > >> > >
    > > >> > > > It might be good enough to have RUNNING set immediately on the
    > > >> process
    > > >> > > run
    > > >> > > > and not being dependent on the dag file being parsed. It is
    > > annoying
    > > >> > here
    > > >> > > > too when dags parse on the scheduler but not the worker, since
    > > >> queued
    > > >> > > tasks
    > > >> > > > that don't heartbeat will not get retried, while running tasks
    > > will.
    > > >> > > >
    > > >> > > > On Wed, Nov 29, 2017 at 10:04 AM, Grant Nicholas <
    > > >> > > > [email protected]> wrote:
    > > >> > > >
    > > >> > > > > ---Opening up this conversation to the whole mailing list, as
    > > >> > suggested
    > > >> > > > by
    > > >> > > > > Bolke---
    > > >> > > > >
    > > >> > > > >
    > > >> > > > > A "launched" state has been suggested in the past (see here
    > > >> > > > > <https://github.com/apache/incubator-airflow/blob/master/
    > > >> > > > > airflow/utils/state.py#L31>)
    > > >> > > > > but never implemented for reasons unknown to us. Does anyone
    > > have
    > > >> > more
    > > >> > > > > details about why?
    > > >> > > > >
    > > >> > > > > There are two big reasons why adding a new "launched" state 
to
    > > >> > airflow
    > > >> > > > > would be useful:
    > > >> > > > >
    > > >> > > > > 1. A "launched" state would be useful for crash safety of the
    > > >> > > scheduler.
    > > >> > > > If
    > > >> > > > > the scheduler crashes in between the scheduler launching the
    > > task
    > > >> and
    > > >> > > the
    > > >> > > > > task process starting up then we lose information about
    > whether
    > > >> that
    > > >> > > task
    > > >> > > > > was launched or not. By moving the state of the task to
    > > "launched"
    > > >> > when
    > > >> > > > it
    > > >> > > > > is sent off to celery/dask/kubernetes/etc, when crashes 
happen
    > > you
    > > >> > know
    > > >> > > > > whether you have to relaunch the task or not.
    > > >> > > > >
    > > >> > > > > To workaround this issue, on startup of the kubernetes
    > executor
    > > we
    > > >> > > query
    > > >> > > > > all "queued" tasks and if there is not a matching kubernetes
    > pod
    > > >> for
    > > >> > > that
    > > >> > > > > task then we set the task state to "None" so it is
    > rescheduled.
    > > >> See
    > > >> > > here
    > > >> > > > > <https://github.com/bloomberg/airflow/blob/airflow-
    > > >> > > > >
    > > >> > > > kubernetes-executor/airflow/contrib/executors/kubernetes_
    > > >> > > executor.py#L400>
    > > >> > > > > for
    > > >> > > > > details if you are curious. While this works for the
    > kubernetes
    > > >> > > executor,
    > > >> > > > > other executors can't easily introspect launched tasks and
    > this
    > > >> means
    > > >> > > the
    > > >> > > > > celery executor (afaik) is not crash safe.
    > > >> > > > >
    > > >> > > > > 2. A "launched" state would allow for dynamic backpressure of
    > > >> tasks,
    > > >> > > not
    > > >> > > > > just static backpressure. Right now, airflow only allows
    > static
    > > >> > > > > backpressure (`parallelism` config).This means you must
    > > statically
    > > >> > say
    > > >> > > I
    > > >> > > > > only want to allow N running tasks at once. Imagine you have
    > > lots
    > > >> of
    > > >> > > > tasks
    > > >> > > > > being scheduled on your celery cluster/kubernetes cluster and
    > > >> since
    > > >> > the
    > > >> > > > > resource usage of each task is heterogenous you don't know
    > > exactly
    > > >> > how
    > > >> > > > many
    > > >> > > > > running tasks you can tolerate at once. If instead you can 
say
    > > "I
    > > >> > only
    > > >> > > > want
    > > >> > > > > tasks to be launched while I have less than N tasks in the
    > > >> launched
    > > >> > > > state"
    > > >> > > > > you get some adaptive backpressure.
    > > >> > > > >
    > > >> > > > > While we have workarounds described above for the kubernetes
    > > >> > executor,
    > > >> > > > how
    > > >> > > > > do people feel about introducing a launched state into 
airflow
    > > so
    > > >> we
    > > >> > > > don't
    > > >> > > > > need the workarounds? I think there are benefits to be gained
    > > for
    > > >> all
    > > >> > > the
    > > >> > > > > executors.
    > > >> > > > >
    > > >> > > > > On Sun, Nov 26, 2017 at 1:46 AM, Bolke de Bruin <
    > > >> [email protected]>
    > > >> > > > wrote:
    > > >> > > > >
    > > >> > > > > >
    > > >> > > > > > Hi Daniel,
    > > >> > > > > >
    > > >> > > > > > (BTW: I do think this discussion is better to have at the
    > > >> > > mailinglist,
    > > >> > > > > > more people might want to chime in and offer valuable
    > > opinions)
    > > >> > > > > >
    > > >> > > > > > Jumping right in: I am wondering if are you not duplicating
    > > the
    > > >> > > > “queued”
    > > >> > > > > > logic for (a.o) pools. Introducing LAUNCHED with the 
meaning
    > > >> > attached
    > > >> > > > to
    > > >> > > > > > it that you describe, would mean that we have a second 
place
    > > >> where
    > > >> > we
    > > >> > > > > > handle back pressure.
    > > >> > > > > >
    > > >> > > > > > Isn’t there a way to ask the k8s cluster how many tasks it
    > has
    > > >> > > pending
    > > >> > > > > and
    > > >> > > > > > just to execute any queued tasks when it crosses a certain
    > > >> > threshold?
    > > >> > > > > Have
    > > >> > > > > > a look a base_executor where it is handling slots and 
queued
    > > >> tasks.
    > > >> > > > > >
    > > >> > > > > > Cheers
    > > >> > > > > > Bolke
    > > >> > > > > >
    > > >> > > > > >
    > > >> > > > > > Verstuurd vanaf mijn iPad
    > > >> > > > > >
    > > >> > > > > > Op 15 nov. 2017 om 01:39 heeft Daniel Imberman <
    > > >> > > > > [email protected]>
    > > >> > > > > > het volgende geschreven:
    > > >> > > > > >
    > > >> > > > > > Hi Bolke and Dan!
    > > >> > > > > >
    > > >> > > > > > I had a quick question WRT the launched state (
    > > >> > > > > > https://github.com/apache/incubator-airflow/blob/master/air
    > > >> > > > > > flow/utils/state.py#L32).
    > > >> > > > > >
    > > >> > > > > > We are handling the issue of throttling the executor when
    > the
    > > >> k8s
    > > >> > > > cluster
    > > >> > > > > > has more than 5 pending tasks (which usually means that the
    > > >> cluster
    > > >> > > is
    > > >> > > > > > under a lot of strain), and one thought we had WRT crash
    > > safety
    > > >> was
    > > >> > > to
    > > >> > > > > use
    > > >> > > > > > a "LAUNCHED" state for pods that have been submitted but 
are
    > > not
    > > >> > > > running
    > > >> > > > > > yet.
    > > >> > > > > >
    > > >> > > > > > With the launched state currently being TBD, I was 
wondering
    > > if
    > > >> > there
    > > >> > > > was
    > > >> > > > > > any reason you guys would not want this state? There are
    > other
    > > >> > > > > workarounds
    > > >> > > > > > we can do, but we wanted to check in with you guys first.
    > > >> > > > > >
    > > >> > > > > > Thanks!
    > > >> > > > > >
    > > >> > > > > > Daniel
    > > >> > > > > >
    > > >> > > > > >
    > > >> > > > > >
    > > >> > > > >
    > > >> > > >
    > > >> > >
    > > >> >
    > > >>
    > > >
    > > >
    > >
    >
    

Reply via email to