Taking a tangent here: I like the idea of logging every state change to another table. Mutating task_instance from many places makes some issues hard to debug.

As we need similar history-tracking of mutations on task_instance around retries, we may want to keep track of history for anything that touches task_instance. It may be easy-ish to maintain this table using SQLAlchemy after-update hooks on the model, where we'd systematically insert into a task_instance_history table. Just a thought.

Max
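A minimal sketch of the kind of hook Max describes, assuming a hypothetical task_instance_history table (the table name and columns are illustrative, not an existing Airflow model):

    from datetime import datetime

    from sqlalchemy import Column, DateTime, MetaData, String, Table, event

    from airflow.models import TaskInstance

    metadata = MetaData()

    # Hypothetical audit table; name and columns are illustrative only.
    task_instance_history = Table(
        "task_instance_history", metadata,
        Column("dag_id", String(250)),
        Column("task_id", String(250)),
        Column("execution_date", DateTime),
        Column("state", String(20)),
        Column("recorded_at", DateTime),
    )

    @event.listens_for(TaskInstance, "after_update")
    def record_task_instance_change(mapper, connection, target):
        # Mapper events fire mid-flush, so write through the low-level
        # connection rather than touching the Session.
        connection.execute(
            task_instance_history.insert().values(
                dag_id=target.dag_id,
                task_id=target.task_id,
                execution_date=target.execution_date,
                state=target.state,
                recorded_at=datetime.utcnow(),
            )
        )

The same listener could also be registered for "after_insert" so the very first state a task instance takes is captured as well.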
On Thu, Nov 30, 2017 at 11:26 AM, Grant Nicholas <[email protected]> wrote:

Thanks, I see why that should work; I just know from testing this myself that I had to manually clear out old QUEUED task instances to get them to reschedule. I'll do some more testing to confirm, it's totally possible I did something wrong in our test suite setup.

On Thu, Nov 30, 2017 at 1:18 PM, Alex Guziel <[email protected]> wrote:

See reset_state_for_orphaned_tasks in jobs.py.

On Thu, Nov 30, 2017 at 11:17 AM, Alex Guziel <[email protected]> wrote:

Right now the scheduler re-launches all QUEUED tasks on restart (there are safeguards for duplicates).

On Thu, Nov 30, 2017 at 11:13 AM, Grant Nicholas <[email protected]> wrote:

@Alex
I agree that setting the RUNNING state immediately when `airflow run` starts up would be useful on its own, but it doesn't solve all the problems. What happens if you have a task in the QUEUED state (that may or may not have been launched) and your scheduler crashes? What does the scheduler do on startup: does it launch all QUEUED tasks again (knowing that there may be duplicate tasks), or does it not launch the QUEUED tasks again (knowing that the task may be stuck in the QUEUED state forever)? Right now, Airflow does the latter, which I think is not correct, as you can potentially have tasks stuck in the QUEUED state forever.

Using a LAUNCHED state would explicitly keep track of whether tasks were submitted for execution or not. At that point it's up to your messaging system/queueing system/etc. to be crash safe, and that is something you get for free with Kubernetes and something you can tune with Celery persistent queues.

Note: Another option is to NOT introduce a new state but have Airflow launch QUEUED tasks again on startup of the executor. This would mean that we may launch duplicate tasks, but this should not be an issue since we have built-in protections on worker startup to avoid having two RUNNING task instances at once.

On Thu, Nov 30, 2017 at 12:40 PM, Alex Guziel <[email protected]> wrote:

I think the more sensible thing here is just to set the state to RUNNING immediately in the airflow run process. I don't think the distinction between launched and running adds much value.

On Thu, Nov 30, 2017 at 10:36 AM, Daniel Imberman <[email protected]> wrote:

@Alex

That could potentially work, since if you have the same task launched twice then the second one would die due to the "already running" dependency. Still less ideal than not launching that task at all, since it still allows for race conditions. @grant, thoughts on this?
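For context, the orphan handling Alex points at (reset_state_for_orphaned_tasks) amounts to roughly the following. This is an illustrative sketch, not the actual jobs.py code, and it assumes the executor exposes the queued_tasks/running collections that BaseExecutor keeps:

    from airflow import settings
    from airflow.models import TaskInstance
    from airflow.utils.state import State

    def reset_orphaned_queued_tasks(executor):
        """On scheduler startup, clear QUEUED task instances the executor
        has no record of, so the scheduler picks them up again."""
        session = settings.Session()
        queued = (
            session.query(TaskInstance)
            .filter(TaskInstance.state == State.QUEUED)
            .all()
        )
        for ti in queued:
            # A crash between "queued" and "running" leaves the row QUEUED with
            # nothing actually in flight; reset it so it gets rescheduled.
            if ti.key not in executor.queued_tasks and ti.key not in executor.running:
                ti.state = State.NONE
                session.merge(ti)
        session.commit()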
On Wed, Nov 29, 2017 at 11:00 AM, Alex Guziel <[email protected]> wrote:

It might be good enough to have RUNNING set immediately on the process run and not be dependent on the DAG file being parsed. It is annoying here too when DAGs parse on the scheduler but not the worker, since queued tasks that don't heartbeat will not get retried, while running tasks will.

On Wed, Nov 29, 2017 at 10:04 AM, Grant Nicholas <[email protected]> wrote:

---Opening up this conversation to the whole mailing list, as suggested by Bolke---

A "launched" state has been suggested in the past (see here <https://github.com/apache/incubator-airflow/blob/master/airflow/utils/state.py#L31>) but never implemented, for reasons unknown to us. Does anyone have more details about why?

There are two big reasons why adding a new "launched" state to Airflow would be useful:

1. A "launched" state would be useful for crash safety of the scheduler. If the scheduler crashes between launching the task and the task process starting up, then we lose information about whether that task was launched or not. By moving the state of the task to "launched" when it is sent off to Celery/Dask/Kubernetes/etc., when crashes happen you know whether you have to relaunch the task or not.

To work around this issue, on startup of the Kubernetes executor we query all "queued" tasks, and if there is not a matching Kubernetes pod for a task then we set its state to "None" so it is rescheduled. See here <https://github.com/bloomberg/airflow/blob/airflow-kubernetes-executor/airflow/contrib/executors/kubernetes_executor.py#L400> for details if you are curious. While this works for the Kubernetes executor, other executors can't easily introspect launched tasks, and this means the Celery executor (afaik) is not crash safe.

2. A "launched" state would allow for dynamic backpressure of tasks, not just static backpressure. Right now, Airflow only allows static backpressure (the `parallelism` config). This means you must statically say "I only want to allow N running tasks at once." Imagine you have lots of tasks being scheduled on your Celery or Kubernetes cluster: since the resource usage of each task is heterogeneous, you don't know exactly how many running tasks you can tolerate at once. If instead you can say "I only want tasks to be launched while I have fewer than N tasks in the launched state," you get some adaptive backpressure.

While we have the workarounds described above for the Kubernetes executor, how do people feel about introducing a launched state into Airflow so we don't need the workarounds? I think there are benefits to be gained for all the executors.
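The adaptive backpressure Grant describes in point 2 might look roughly like the check below, called from an executor's heartbeat before it drains more queued work. "launched" is the proposed (hypothetical) state and MAX_LAUNCHED is an arbitrary illustrative threshold, not an existing config option:

    from airflow import settings
    from airflow.models import TaskInstance

    MAX_LAUNCHED = 32  # illustrative threshold, not a real config option

    def has_launch_capacity():
        """Only hand new work to the executor while fewer than MAX_LAUNCHED
        tasks sit in the proposed "launched" state."""
        session = settings.Session()
        launched_count = (
            session.query(TaskInstance)
            .filter(TaskInstance.state == "launched")  # hypothetical new state
            .count()
        )
        session.close()
        return launched_count < MAX_LAUNCHED

Because the count shrinks as workers flip tasks from launched to running, the effective limit tracks what the cluster can actually start, rather than a static `parallelism` number.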
On Sun, Nov 26, 2017 at 1:46 AM, Bolke de Bruin <[email protected]> wrote:

Hi Daniel,

(BTW: I do think this discussion is better to have on the mailing list; more people might want to chime in and offer valuable opinions.)

Jumping right in: I am wondering if you are not duplicating the "queued" logic for (a.o.) pools. Introducing LAUNCHED with the meaning you attach to it would mean that we have a second place where we handle backpressure.

Isn't there a way to ask the k8s cluster how many tasks it has pending and just execute any queued tasks when it crosses a certain threshold? Have a look at base_executor, where it handles slots and queued tasks.

Cheers
Bolke

Sent from my iPad

On 15 Nov 2017, at 01:39, Daniel Imberman <[email protected]> wrote:

Hi Bolke and Dan!

I had a quick question WRT the launched state (https://github.com/apache/incubator-airflow/blob/master/airflow/utils/state.py#L32).

We are handling the issue of throttling the executor when the k8s cluster has more than 5 pending tasks (which usually means that the cluster is under a lot of strain), and one thought we had WRT crash safety was to use a "LAUNCHED" state for pods that have been submitted but are not running yet.

With the launched state currently being TBD, I was wondering if there was any reason you guys would not want this state? There are other workarounds we can do, but we wanted to check in with you guys first.

Thanks!

Daniel
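Bolke's alternative, asking the cluster directly rather than adding a state, could be approximated with the official Kubernetes Python client along these lines; the label selector is a placeholder for whatever labels the executor stamps on its worker pods, and the limit is the "5 pending tasks" threshold Daniel mentions:

    from kubernetes import client, config

    PENDING_LIMIT = 5  # the threshold Daniel mentions

    def cluster_has_room(namespace="default"):
        """Return True when fewer than PENDING_LIMIT worker pods are Pending."""
        config.load_incluster_config()  # use load_kube_config() outside the cluster
        pending = client.CoreV1Api().list_namespaced_pod(
            namespace,
            label_selector="airflow-worker=true",  # placeholder label
            field_selector="status.phase=Pending",
        )
        return len(pending.items) < PENDING_LIMIT

The executor's heartbeat could consult this before releasing more queued slots, which keeps the backpressure handling in one place (base_executor's slot logic), as Bolke suggests.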
