I think the more sensible thing here is just to set the state to RUNNING immediately in the airflow run process. I don't think the distinction between launched and running adds much value.
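(A minimal, hypothetical sketch of what this proposal could look like: mark the task instance RUNNING as soon as the `airflow run` process starts, before the DAG file is parsed. The function name and call site are illustrative, not the actual Airflow code path.)

```python
# Illustrative only: record RUNNING at process start, before DAG parsing,
# so a crash between launch and parse is still visible in the metadata DB.
from airflow.utils import timezone
from airflow.utils.db import provide_session
from airflow.utils.state import State


@provide_session
def mark_running_on_process_start(task_instance, session=None):
    """Set the task instance to RUNNING immediately in the run process."""
    task_instance.state = State.RUNNING
    task_instance.start_date = timezone.utcnow()
    session.merge(task_instance)
    session.commit()
```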
On Thu, Nov 30, 2017 at 10:36 AM, Daniel Imberman <[email protected]> wrote:

> @Alex
>
> That could potentially work, since if you have the same task launched twice then the second time would die due to the "already running" dependency. Still less ideal than not launching that task at all, since it still allows for race conditions. @grant, thoughts on this?
>
> On Wed, Nov 29, 2017 at 11:00 AM Alex Guziel <[email protected]> wrote:
>
> > It might be good enough to have RUNNING set immediately on the process run and not be dependent on the dag file being parsed. It is annoying here too when dags parse on the scheduler but not the worker, since queued tasks that don't heartbeat will not get retried, while running tasks will.
> >
> > On Wed, Nov 29, 2017 at 10:04 AM, Grant Nicholas <[email protected]> wrote:
> >
> > > ---Opening up this conversation to the whole mailing list, as suggested by Bolke---
> > >
> > > A "launched" state has been suggested in the past (see here: <https://github.com/apache/incubator-airflow/blob/master/airflow/utils/state.py#L31>) but never implemented, for reasons unknown to us. Does anyone have more details about why?
> > >
> > > There are two big reasons why adding a new "launched" state to airflow would be useful:
> > >
> > > 1. A "launched" state would be useful for crash safety of the scheduler. If the scheduler crashes between launching the task and the task process starting up, then we lose information about whether that task was launched or not. By moving the state of the task to "launched" when it is sent off to celery/dask/kubernetes/etc., you know after a crash whether you have to relaunch the task or not.
> > >
> > > To work around this issue, on startup of the kubernetes executor we query all "queued" tasks, and if there is no matching kubernetes pod for a task we set its state to "None" so it is rescheduled. See here for details if you are curious: <https://github.com/bloomberg/airflow/blob/airflow-kubernetes-executor/airflow/contrib/executors/kubernetes_executor.py#L400>. While this works for the kubernetes executor, other executors can't easily introspect launched tasks, which means the celery executor (afaik) is not crash safe.
> > >
> > > 2. A "launched" state would allow for dynamic backpressure of tasks, not just static backpressure. Right now, airflow only allows static backpressure (the `parallelism` config). This means you must statically say "I only want to allow N running tasks at once." Imagine you have lots of tasks being scheduled on your celery cluster/kubernetes cluster; since the resource usage of each task is heterogeneous, you don't know exactly how many running tasks you can tolerate at once. If instead you can say "I only want tasks to be launched while I have fewer than N tasks in the launched state," you get some adaptive backpressure.
> > >
> > > While we have the workarounds described above for the kubernetes executor, how do people feel about introducing a launched state into airflow so we don't need the workarounds? I think there are benefits to be gained for all the executors.
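(For readers unfamiliar with the crash-safety workaround Grant describes above, here is a rough, hypothetical sketch of the idea, not the actual bloomberg/airflow code: on executor startup, any task stuck in QUEUED without a matching worker pod is reset to None so the scheduler re-launches it. The `list_worker_pods` helper and the pod-labelling scheme are assumptions.)

```python
# Illustrative sketch of the startup recovery described in the message above.
from airflow.models import TaskInstance
from airflow.utils.db import provide_session
from airflow.utils.state import State


@provide_session
def clear_orphaned_queued_tasks(list_worker_pods, session=None):
    """Reset QUEUED tasks that have no matching pod so they get rescheduled."""
    queued = session.query(TaskInstance).filter(
        TaskInstance.state == State.QUEUED).all()

    # Assumed helper: returns one label dict per live worker pod.
    pod_keys = {
        (pod['dag_id'], pod['task_id'], pod['execution_date'])
        for pod in list_worker_pods()
    }

    for ti in queued:
        key = (ti.dag_id, ti.task_id, ti.execution_date)
        if key not in pod_keys:
            # No pod exists for this queued task, so the launch was lost
            # (e.g. the scheduler crashed after queuing). Reset to None.
            ti.state = State.NONE
            session.merge(ti)
    session.commit()
```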
> > > On Sun, Nov 26, 2017 at 1:46 AM, Bolke de Bruin <[email protected]> wrote:
> > >
> > > > Hi Daniel,
> > > >
> > > > (BTW: I do think this discussion is better to have on the mailing list; more people might want to chime in and offer valuable opinions.)
> > > >
> > > > Jumping right in: I am wondering if you are not duplicating the "queued" logic for (a.o.) pools. Introducing LAUNCHED, with the meaning you describe attached to it, would mean that we have a second place where we handle back pressure.
> > > >
> > > > Isn't there a way to ask the k8s cluster how many tasks it has pending and just execute any queued tasks when it crosses a certain threshold? Have a look at base_executor, where it handles slots and queued tasks.
> > > >
> > > > Cheers
> > > > Bolke
> > > >
> > > > Sent from my iPad
> > > >
> > > > On 15 Nov 2017, at 01:39, Daniel Imberman <[email protected]> wrote:
> > > >
> > > > Hi Bolke and Dan!
> > > >
> > > > I had a quick question WRT the launched state (https://github.com/apache/incubator-airflow/blob/master/airflow/utils/state.py#L32).
> > > >
> > > > We are handling the issue of throttling the executor when the k8s cluster has more than 5 pending tasks (which usually means that the cluster is under a lot of strain), and one thought we had WRT crash safety was to use a "LAUNCHED" state for pods that have been submitted but are not running yet.
> > > >
> > > > With the launched state currently being TBD, I was wondering if there was any reason you guys would not want this state? There are other workarounds we can do, but we wanted to check in with you guys first.
> > > >
> > > > Thanks!
> > > >
> > > > Daniel
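(An illustrative sketch of the alternative Bolke suggests above, assuming the kubernetes Python client and a label-based way to find airflow worker pods; the names, label selector, and wiring are assumptions rather than actual executor code: instead of a new LAUNCHED state, ask the k8s API how many worker pods are still Pending and only hand more queued tasks to the cluster while that count is under a threshold.)

```python
# Illustrative only: throttle queued-task submission on the number of
# Pending worker pods rather than introducing a new LAUNCHED state.
from kubernetes import client, config

MAX_PENDING_PODS = 5  # threshold mentioned earlier in the thread


def count_pending_worker_pods(namespace='airflow'):
    """Ask the k8s API how many airflow worker pods are still Pending."""
    config.load_incluster_config()  # or load_kube_config() outside the cluster
    core_v1 = client.CoreV1Api()
    pods = core_v1.list_namespaced_pod(
        namespace,
        label_selector='airflow-worker',       # assumed worker-pod label
        field_selector='status.phase=Pending',
    )
    return len(pods.items)


def open_launch_slots():
    """How many more queued tasks the executor may send this heartbeat."""
    return max(0, MAX_PENDING_PODS - count_pending_worker_pods())
```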
