Right now the scheduler re-launches all QUEUED tasks on restart (there are safeguards for duplicates).
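For readers following along, a minimal sketch of that "re-launch QUEUED tasks on restart and rely on the duplicate guard" behaviour. This is an illustration only, not the actual scheduler code: the function name, the way the executor is passed in, and the relaunch loop are all assumptions.

```python
# Sketch (assumption, not the real scheduler code): on restart, re-enqueue
# every task instance still marked QUEUED and rely on the worker-side
# "task instance not already running" check to stop duplicates.
from airflow.models import TaskInstance
from airflow.utils.db import provide_session
from airflow.utils.state import State


@provide_session
def relaunch_queued_tasks(executor, session=None):
    """Re-submit all QUEUED task instances after a scheduler/executor restart."""
    queued = (
        session.query(TaskInstance)
        .filter(TaskInstance.state == State.QUEUED)
        .all()
    )
    for ti in queued:
        # A duplicate launch is tolerated: a second `airflow run` for the
        # same task instance fails its dependency checks and exits.
        executor.queue_task_instance(ti)
```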
On Thu, Nov 30, 2017 at 11:13 AM, Grant Nicholas <[email protected]> wrote:

@Alex
I agree setting the RUNNING state immediately when `airflow run` starts up would be useful on its own, but it doesn't solve all the problems. What happens if you have a task in the QUEUED state (that may or may not have been launched) and your scheduler crashes? What does the scheduler do on startup: does it launch all QUEUED tasks again (knowing that there may be duplicate tasks), or does it not launch the QUEUED tasks again (knowing that a task may be stuck in the QUEUED state forever)? Right now, airflow does the latter, which I think is not correct, as you can potentially have tasks stuck in the QUEUED state forever.

Using a LAUNCHED state would explicitly keep track of whether tasks were submitted for execution or not. At that point it's up to your messaging system/queueing system/etc to be crash safe, which is something you get for free with kubernetes and something you can tune with celery persistent queues.

Note: Another option is to NOT introduce a new state but to have airflow launch QUEUED tasks again on startup of the executor. This would mean that we may launch duplicate tasks, but this should not be an issue since we have built-in protections on worker startup to avoid having two RUNNING task instances at once.

On Thu, Nov 30, 2017 at 12:40 PM, Alex Guziel <[email protected]> wrote:

I think the more sensible thing here is just to set the state to RUNNING immediately in the airflow run process. I don't think the distinction between launched and running adds much value.

On Thu, Nov 30, 2017 at 10:36 AM, Daniel Imberman <[email protected]> wrote:

@Alex

That could potentially work, since if you have the same task launched twice, the second launch would die due to the "already running" dependency. Still less ideal than not launching that task at all, since it still allows for race conditions. @grant, thoughts on this?

On Wed, Nov 29, 2017 at 11:00 AM, Alex Guziel <[email protected]> wrote:

It might be good enough to have RUNNING set immediately when the process runs, rather than being dependent on the dag file being parsed. It is annoying here too when dags parse on the scheduler but not the worker, since queued tasks that don't heartbeat will not get retried, while running tasks will.

On Wed, Nov 29, 2017 at 10:04 AM, Grant Nicholas <[email protected]> wrote:

---Opening up this conversation to the whole mailing list, as suggested by Bolke---

A "launched" state has been suggested in the past (see here <https://github.com/apache/incubator-airflow/blob/master/airflow/utils/state.py#L31>) but never implemented, for reasons unknown to us. Does anyone have more details about why?

There are two big reasons why adding a new "launched" state to airflow would be useful:

1. A "launched" state would be useful for crash safety of the scheduler. If the scheduler crashes in between launching the task and the task process starting up, then we lose information about whether that task was launched or not.
By moving the state of the task to "launched" when it is sent off to celery/dask/kubernetes/etc, you know after a crash whether you have to relaunch the task or not.

To work around this issue, on startup of the kubernetes executor we query all "queued" tasks and, if there is no matching kubernetes pod for a task, we set its state to "None" so it is rescheduled. See here <https://github.com/bloomberg/airflow/blob/airflow-kubernetes-executor/airflow/contrib/executors/kubernetes_executor.py#L400> for details if you are curious. While this works for the kubernetes executor, other executors can't easily introspect launched tasks, which means the celery executor (afaik) is not crash safe.

2. A "launched" state would allow for dynamic backpressure of tasks, not just static backpressure. Right now, airflow only allows static backpressure (the `parallelism` config). This means you must statically say "I only want to allow N running tasks at once." Imagine you have lots of tasks being scheduled on your celery cluster/kubernetes cluster; since the resource usage of each task is heterogeneous, you don't know exactly how many running tasks you can tolerate at once. If instead you can say "I only want tasks to be launched while I have fewer than N tasks in the launched state", you get some adaptive backpressure.

While we have the workarounds described above for the kubernetes executor, how do people feel about introducing a launched state into airflow so we don't need the workarounds? I think there are benefits to be gained for all the executors.

On Sun, Nov 26, 2017 at 1:46 AM, Bolke de Bruin <[email protected]> wrote:

Hi Daniel,

(BTW: I do think this discussion is better to have on the mailing list; more people might want to chime in and offer valuable opinions.)

Jumping right in: I am wondering if you are not duplicating the "queued" logic for (among others) pools. Introducing LAUNCHED, with the meaning you describe attached to it, would mean that we have a second place where we handle backpressure.

Isn't there a way to ask the k8s cluster how many tasks it has pending and just execute any queued tasks until that crosses a certain threshold? Have a look at base_executor, where it handles slots and queued tasks.

Cheers
Bolke

Sent from my iPad

On 15 Nov 2017 at 01:39, Daniel Imberman <[email protected]> wrote:

Hi Bolke and Dan!

I had a quick question WRT the launched state (https://github.com/apache/incubator-airflow/blob/master/airflow/utils/state.py#L32).
We are handling the issue of throttling the executor when the k8s cluster has more than 5 pending tasks (which usually means that the cluster is under a lot of strain), and one thought we had WRT crash safety was to use a "LAUNCHED" state for pods that have been submitted but are not running yet.

With the launched state currently being TBD, I was wondering if there was any reason you guys would not want this state? There are other workarounds we can do, but we wanted to check in with you guys first.

Thanks!

Daniel
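Grant's kubernetes-executor workaround above amounts to a reconciliation pass at executor startup. Below is a rough sketch of the idea, assuming worker pods carry an "airflow-worker" label plus dag_id/task_id/execution_date labels; the label names, the matching scheme, and the function name are illustrative, not the code at the link above.

```python
# Sketch of the startup reconciliation described above (assumed pod labels,
# not the actual bloomberg/airflow kubernetes_executor implementation).
from airflow.models import TaskInstance
from airflow.utils.db import provide_session
from airflow.utils.state import State
from kubernetes import client, config


@provide_session
def clear_orphaned_queued_tasks(namespace="default", session=None):
    config.load_kube_config()
    kube = client.CoreV1Api()
    pods = kube.list_namespaced_pod(namespace, label_selector="airflow-worker")
    # Build the set of task instances that actually have a pod.
    launched = {
        (p.metadata.labels.get("dag_id"),
         p.metadata.labels.get("task_id"),
         p.metadata.labels.get("execution_date"))
        for p in pods.items
    }
    queued = (
        session.query(TaskInstance)
        .filter(TaskInstance.state == State.QUEUED)
        .all()
    )
    for ti in queued:
        key = (ti.dag_id, ti.task_id, ti.execution_date.isoformat())
        if key not in launched:
            # No pod exists for this QUEUED task, so it was never launched:
            # reset it to NONE so the scheduler picks it up again.
            ti.state = State.NONE
    session.commit()
```

The celery executor has no equivalent of this pod listing, which is why the thread argues it cannot be made crash safe the same way.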

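To make the dynamic-backpressure idea concrete, here is a sketch written as if a LAUNCHED state existed (it does not in Airflow today; `LAUNCHED`, `MAX_LAUNCHED_TASKS`, and `has_launch_capacity` are all hypothetical).

```python
# Hypothetical: gate new submissions on how many tasks sit in a proposed
# LAUNCHED (submitted but not yet running) state.
from airflow.models import TaskInstance
from airflow.utils.db import provide_session

LAUNCHED = "launched"      # proposed state, not in airflow.utils.state today
MAX_LAUNCHED_TASKS = 32    # illustrative threshold


@provide_session
def has_launch_capacity(session=None):
    """True while fewer than MAX_LAUNCHED_TASKS tasks are awaiting startup."""
    launched_count = (
        session.query(TaskInstance)
        .filter(TaskInstance.state == LAUNCHED)
        .count()
    )
    return launched_count < MAX_LAUNCHED_TASKS
```

An executor heartbeat could then drain its queue only while `has_launch_capacity()` is true, which is the adaptive alternative to the static `parallelism` limit that Grant contrasts it with.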