Thanks, I see why that should work. I just know from testing this myself that I had to manually clear out old QUEUED task instances to get them to reschedule. I'll do some more testing to confirm; it's totally possible I did something wrong in our test suite setup.
On Thu, Nov 30, 2017 at 1:18 PM, Alex Guziel <[email protected]> wrote:

> See reset_state_for_orphaned_tasks in jobs.py

On Thu, Nov 30, 2017 at 11:17 AM, Alex Guziel <[email protected]> wrote:

> Right now the scheduler re-launches all QUEUED tasks on restart (there are
> safeguards for duplicates).

On Thu, Nov 30, 2017 at 11:13 AM, Grant Nicholas <grantnicholas2015@u.northwestern.edu> wrote:

> @Alex
> I agree setting the RUNNING state immediately when `airflow run` starts up
> would be useful on its own, but it doesn't solve all the problems. What
> happens if you have a task in the QUEUED state (that may or may not have
> been launched) and your scheduler crashes? What does the scheduler do on
> startup: does it launch all QUEUED tasks again (knowing that there may be
> duplicate tasks), or does it not launch the QUEUED tasks again (knowing
> that the task may be stuck in the QUEUED state forever)? Right now, Airflow
> does the latter, which I think is not correct, as you can potentially have
> tasks stuck in the QUEUED state forever.
>
> Using a LAUNCHED state would explicitly keep track of whether tasks were
> submitted for execution or not. At that point it's up to your messaging
> system/queueing system/etc. to be crash safe, which is something you get
> for free with Kubernetes and something you can tune with Celery persistent
> queues.
>
> Note: Another option is to NOT introduce a new state but have Airflow
> launch QUEUED tasks again on startup of the executor. This would mean that
> we may launch duplicate tasks, but this should not be an issue since we
> have built-in protections on worker startup to avoid having two RUNNING
> task instances at once.

On Thu, Nov 30, 2017 at 12:40 PM, Alex Guziel <[email protected]> wrote:

> I think the more sensible thing here is just to set the state to RUNNING
> immediately in the airflow run process. I don't think the distinction
> between launched and running adds much value.

On Thu, Nov 30, 2017 at 10:36 AM, Daniel Imberman <[email protected]> wrote:

> @Alex
> That could potentially work, since if you have the same task launched
> twice then the second one would die due to the "already running"
> dependency. Still less ideal than not launching that task at all, since it
> still allows for race conditions. @Grant, thoughts on this?

On Wed, Nov 29, 2017 at 11:00 AM, Alex Guziel <[email protected]> wrote:

> It might be good enough to have RUNNING set immediately on the process run
> and not be dependent on the DAG file being parsed. It is annoying here too
> when DAGs parse on the scheduler but not the worker, since queued tasks
> that don't heartbeat will not get retried, while running tasks will.
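For reference, the startup reset Alex points at has roughly the following shape. This is an illustrative sketch only, not the actual reset_state_for_orphaned_tasks code in jobs.py, and it assumes the executor exposes in-memory queued_tasks/running mappings keyed by (dag_id, task_id, execution_date):

    # Sketch: on scheduler startup, any task instance still SCHEDULED/QUEUED
    # that the executor is not tracking was orphaned by a crash, so reset it
    # to None and let it be scheduled (and launched) again.
    from airflow.models import TaskInstance
    from airflow.utils.db import provide_session
    from airflow.utils.state import State

    @provide_session
    def reset_orphaned_queued_tasks(executor, session=None):
        resettable = (
            session.query(TaskInstance)
            .filter(TaskInstance.state.in_([State.SCHEDULED, State.QUEUED]))
            .all()
        )
        for ti in resettable:
            key = (ti.dag_id, ti.task_id, ti.execution_date)
            if key not in executor.queued_tasks and key not in executor.running:
                ti.state = State.NONE  # unknown to the executor: reschedule it
        session.commit()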
On Wed, Nov 29, 2017 at 10:04 AM, Grant Nicholas <[email protected]> wrote:

> ---Opening up this conversation to the whole mailing list, as suggested by
> Bolke---
>
> A "launched" state has been suggested in the past (see here
> <https://github.com/apache/incubator-airflow/blob/master/airflow/utils/state.py#L31>)
> but never implemented, for reasons unknown to us. Does anyone have more
> details about why?
>
> There are two big reasons why adding a new "launched" state to Airflow
> would be useful:
>
> 1. A "launched" state would be useful for crash safety of the scheduler.
> If the scheduler crashes between launching the task and the task process
> starting up, we lose information about whether that task was launched or
> not. By moving the state of the task to "launched" when it is sent off to
> Celery/Dask/Kubernetes/etc., you know after a crash whether you have to
> relaunch the task or not.
>
> To work around this issue, on startup of the Kubernetes executor we query
> all "queued" tasks, and if there is no matching Kubernetes pod for a task
> we set its state to "None" so it is rescheduled. See here
> <https://github.com/bloomberg/airflow/blob/airflow-kubernetes-executor/airflow/contrib/executors/kubernetes_executor.py#L400>
> for details if you are curious. While this works for the Kubernetes
> executor, other executors can't easily introspect launched tasks, which
> means the Celery executor (afaik) is not crash safe.
>
> 2. A "launched" state would allow for dynamic backpressure of tasks, not
> just static backpressure. Right now, Airflow only allows static
> backpressure (the `parallelism` config). This means you must statically
> say "I only want to allow N running tasks at once." Imagine you have lots
> of tasks being scheduled on your Celery or Kubernetes cluster; since the
> resource usage of each task is heterogeneous, you don't know exactly how
> many running tasks you can tolerate at once. If instead you can say "I
> only want tasks to be launched while I have fewer than N tasks in the
> launched state," you get some adaptive backpressure.
>
> While we have the workarounds described above for the Kubernetes executor,
> how do people feel about introducing a launched state into Airflow so we
> don't need the workarounds? I think there are benefits to be gained for
> all the executors.
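The Kubernetes-executor workaround Grant describes has roughly this shape. This is an illustrative sketch, not the code from the linked Bloomberg branch; it assumes each worker pod carries dag_id/task_id labels set by the executor:

    # Sketch: on executor startup, reset QUEUED task instances that have no
    # matching worker pod, so the scheduler picks them up again.
    from kubernetes import client, config
    from airflow.models import TaskInstance
    from airflow.utils.db import provide_session
    from airflow.utils.state import State

    @provide_session
    def clear_queued_tasks_without_pods(namespace="default", session=None):
        config.load_incluster_config()
        kube = client.CoreV1Api()
        pods = kube.list_namespaced_pod(namespace).items
        # Assumes the executor labels each worker pod with its dag_id/task_id.
        launched = {
            (p.metadata.labels.get("dag_id"), p.metadata.labels.get("task_id"))
            for p in pods
        }
        queued = (
            session.query(TaskInstance)
            .filter(TaskInstance.state == State.QUEUED)
            .all()
        )
        for ti in queued:
            if (ti.dag_id, ti.task_id) not in launched:
                ti.state = State.NONE  # no pod exists, so it was never launched
        session.commit()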
On Sun, Nov 26, 2017 at 1:46 AM, Bolke de Bruin <[email protected]> wrote:

> Hi Daniel,
>
> (BTW: I do think this discussion is better to have on the mailing list;
> more people might want to chime in and offer valuable opinions.)
>
> Jumping right in: I am wondering if you are not duplicating the "queued"
> logic for (among others) pools. Introducing LAUNCHED with the meaning you
> describe attached to it would mean that we have a second place where we
> handle back pressure.
>
> Isn't there a way to ask the k8s cluster how many tasks it has pending and
> only execute queued tasks until it crosses a certain threshold? Have a
> look at base_executor, where it handles slots and queued tasks.
>
> Cheers
> Bolke
>
> Sent from my iPad

On 15 Nov 2017 at 01:39, Daniel Imberman <[email protected]> wrote:

> Hi Bolke and Dan!
>
> I had a quick question WRT the launched state
> (https://github.com/apache/incubator-airflow/blob/master/airflow/utils/state.py#L32).
>
> We are handling the issue of throttling the executor when the k8s cluster
> has more than 5 pending tasks (which usually means that the cluster is
> under a lot of strain), and one thought we had WRT crash safety was to use
> a "LAUNCHED" state for pods that have been submitted but are not running
> yet.
>
> With the launched state currently being TBD, I was wondering if there was
> any reason you guys would not want this state? There are other workarounds
> we can do, but we wanted to check in with you guys first.
>
> Thanks!
>
> Daniel
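To make Bolke's alternative concrete: the executor could derive back pressure from the cluster itself rather than from a new state. A minimal sketch under stated assumptions (the function and names are illustrative, the 5-pod threshold is the one Daniel mentions, and the queued_tasks layout is assumed to match BaseExecutor's key -> (command, priority, queue, task_instance) mapping):

    # Sketch: hold back queued tasks while the k8s cluster has too many
    # pending pods, mirroring the slot handling in BaseExecutor.
    from kubernetes import client, config

    MAX_PENDING_PODS = 5  # threshold from Daniel's message above

    def sync_queued_tasks(executor, namespace="default"):
        config.load_incluster_config()
        kube = client.CoreV1Api()
        pending = kube.list_namespaced_pod(
            namespace, field_selector="status.phase=Pending"
        ).items
        open_slots = MAX_PENDING_PODS - len(pending)
        # Only hand tasks to the cluster while it has room; the rest stay
        # queued and are retried on the next heartbeat.
        while open_slots > 0 and executor.queued_tasks:
            key, (command, _prio, _queue, _ti) = executor.queued_tasks.popitem()
            executor.execute_async(key, command)
            open_slots -= 1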
