See reset_state_for_orphaned_tasks in jobs.py.
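For context, here is a rough sketch of the idea behind reset_state_for_orphaned_tasks, written as a simplified paraphrase rather than the actual Airflow code: task instances sitting in SCHEDULED/QUEUED that the executor no longer knows about get their state reset so the scheduler can pick them up again. The TaskInstance model and the helper below are stand-ins for illustration only.

```python
# Illustrative sketch only: a paraphrase of the orphaned-task reset idea,
# NOT the real reset_state_for_orphaned_tasks implementation in jobs.py.
from dataclasses import dataclass
from typing import List, Optional, Set, Tuple


@dataclass
class TaskInstance:
    dag_id: str
    task_id: str
    execution_date: str
    state: Optional[str]  # e.g. "scheduled", "queued", "running", or None


def reset_orphaned(task_instances: List[TaskInstance],
                   executor_queued_keys: Set[Tuple[str, str, str]]) -> int:
    """Reset SCHEDULED/QUEUED task instances the executor no longer tracks.

    Setting the state back to None lets the scheduler re-queue them on the
    next loop instead of leaving them stuck in QUEUED forever.
    """
    reset_count = 0
    for ti in task_instances:
        key = (ti.dag_id, ti.task_id, ti.execution_date)
        if ti.state in ("scheduled", "queued") and key not in executor_queued_keys:
            ti.state = None
            reset_count += 1
    return reset_count


# Example: a queued TI the executor no longer tracks gets reset to None.
tis = [TaskInstance("dag1", "task1", "2017-11-29", "queued")]
print(reset_orphaned(tis, executor_queued_keys=set()))  # 1
```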
On Thu, Nov 30, 2017 at 11:17 AM, Alex Guziel <[email protected]> wrote:

Right now the scheduler re-launches all QUEUED tasks on restart (there are safeguards for duplicates).

On Thu, Nov 30, 2017 at 11:13 AM, Grant Nicholas <[email protected]> wrote:

@Alex
I agree that setting the RUNNING state immediately when `airflow run` starts up would be useful on its own, but it doesn't solve all the problems. What happens if you have a task in the QUEUED state (that may or may not have been launched) and your scheduler crashes? What does the scheduler do on startup: does it launch all QUEUED tasks again (knowing that there may be duplicate tasks), or does it not launch the QUEUED tasks again (knowing that the task may be stuck in the QUEUED state forever)? Right now, Airflow does the latter, which I think is not correct, as you can potentially have tasks stuck in the QUEUED state forever.

Using a LAUNCHED state would explicitly keep track of whether tasks were submitted for execution or not. At that point it's up to your messaging system/queueing system/etc. to be crash safe, and that is something you get for free with Kubernetes and something you can tune with Celery persistent queues.

Note: another option is to NOT introduce a new state but have Airflow launch QUEUED tasks again on startup of the executor. This would mean that we may launch duplicate tasks, but this should not be an issue since we have built-in protections on worker startup to avoid having two RUNNING task instances at once.

On Thu, Nov 30, 2017 at 12:40 PM, Alex Guziel <[email protected]> wrote:

I think the more sensible thing here is just to set the state to RUNNING immediately in the airflow run process. I don't think the distinction between launched and running adds much value.

On Thu, Nov 30, 2017 at 10:36 AM, Daniel Imberman <[email protected]> wrote:

@Alex

That could potentially work, since if you have the same task launched twice then the second launch would die due to the "already running" dependency. It is still less ideal than not launching that task at all, since it still allows for race conditions. @grant, thoughts on this?

On Wed, Nov 29, 2017 at 11:00 AM, Alex Guziel <[email protected]> wrote:

It might be good enough to have RUNNING set immediately when the process runs, rather than being dependent on the DAG file being parsed. It is annoying here too when DAGs parse on the scheduler but not the worker, since queued tasks that don't heartbeat will not get retried, while running tasks will.
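To make the "built-in protections on worker startup" and the "set RUNNING immediately" ideas discussed above concrete, here is a minimal sketch of a claim-before-run guard: the task atomically flips its own row from queued to running, so a duplicate launch loses the race and exits instead of running the work twice. The sqlite table and `claim_task` helper are hypothetical illustrations, not Airflow's actual dependency check.

```python
# Illustrative sketch only (not Airflow code): a worker-side guard that makes
# "relaunch all QUEUED tasks on restart" safe. Table/column names are made up.
import sqlite3


def claim_task(conn: sqlite3.Connection, ti_key: str) -> bool:
    """Atomically move a task instance from 'queued' to 'running'.

    Returns True if this process won the claim, False if another copy of the
    same task instance already set it to 'running'.
    """
    cur = conn.execute(
        "UPDATE task_instance SET state = 'running' "
        "WHERE key = ? AND state = 'queued'",
        (ti_key,),
    )
    conn.commit()
    return cur.rowcount == 1


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE task_instance (key TEXT PRIMARY KEY, state TEXT)")
    conn.execute("INSERT INTO task_instance VALUES ('dag1.task1.2017-11-30', 'queued')")
    print(claim_task(conn, "dag1.task1.2017-11-30"))  # True: first launch wins
    print(claim_task(conn, "dag1.task1.2017-11-30"))  # False: duplicate exits
```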
On Wed, Nov 29, 2017 at 10:04 AM, Grant Nicholas <[email protected]> wrote:

---Opening up this conversation to the whole mailing list, as suggested by Bolke---

A "launched" state has been suggested in the past (see here <https://github.com/apache/incubator-airflow/blob/master/airflow/utils/state.py#L31>) but never implemented, for reasons unknown to us. Does anyone have more details about why?

There are two big reasons why adding a new "launched" state to Airflow would be useful:

1. A "launched" state would be useful for crash safety of the scheduler. If the scheduler crashes between launching the task and the task process starting up, then we lose information about whether that task was launched or not. By moving the state of the task to "launched" when it is sent off to Celery/Dask/Kubernetes/etc., you know, when crashes happen, whether you have to relaunch the task or not.

To work around this issue, on startup of the Kubernetes executor we query all "queued" tasks, and if there is no matching Kubernetes pod for a task then we set its state to "None" so it is rescheduled. See here <https://github.com/bloomberg/airflow/blob/airflow-kubernetes-executor/airflow/contrib/executors/kubernetes_executor.py#L400> for details if you are curious. While this works for the Kubernetes executor, other executors can't easily introspect launched tasks, which means the Celery executor (afaik) is not crash safe.

2. A "launched" state would allow for dynamic backpressure of tasks, not just static backpressure. Right now, Airflow only allows static backpressure (the `parallelism` config). This means you must statically say "I only want to allow N running tasks at once." Imagine you have lots of tasks being scheduled on your Celery cluster/Kubernetes cluster, and since the resource usage of each task is heterogeneous you don't know exactly how many running tasks you can tolerate at once. If instead you can say "I only want tasks to be launched while I have fewer than N tasks in the launched state", you get some adaptive backpressure.

While we have the workarounds described above for the Kubernetes executor, how do people feel about introducing a launched state into Airflow so we don't need the workarounds? I think there are benefits to be gained for all the executors.
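As a concrete reading of the dynamic-backpressure idea in point 2 above, here is a minimal sketch of a dispatch loop that only hands off work while fewer than N tasks sit in a "launched" state. The LAUNCHED state, the state store, and the `dispatch` helper are illustrative assumptions, not Airflow APIs.

```python
# Illustrative sketch only: "only launch while fewer than N tasks are in the
# launched state" as a tiny dispatch loop. Not Airflow executor code.
from collections import deque
from typing import Deque, Dict

LAUNCH_LIMIT = 5  # e.g. "no more than 5 tasks sitting in LAUNCHED at once"


def dispatch(queued: Deque[str], states: Dict[str, str]) -> None:
    """Move tasks from 'queued' to 'launched' only while the number of tasks
    that are submitted but not yet running stays under the limit."""
    while queued:
        launched_now = sum(1 for s in states.values() if s == "launched")
        if launched_now >= LAUNCH_LIMIT:
            break  # backpressure: wait until the cluster starts some of them
        key = queued.popleft()
        states[key] = "launched"  # here we would hand off to Celery/Kubernetes


# Example: 8 queued tasks, only 5 are handed off until some move to RUNNING.
states = {f"task{i}": "queued" for i in range(8)}
queue = deque(states)
dispatch(queue, states)
print(sum(1 for s in states.values() if s == "launched"))  # 5
```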
On Sun, Nov 26, 2017 at 1:46 AM, Bolke de Bruin <[email protected]> wrote:

Hi Daniel,

(BTW: I do think this discussion is better to have on the mailing list; more people might want to chime in and offer valuable opinions.)

Jumping right in: I am wondering if you are not duplicating the "queued" logic for (among others) pools. Introducing LAUNCHED with the meaning you describe attached to it would mean that we have a second place where we handle backpressure.

Isn't there a way to ask the k8s cluster how many tasks it has pending and hold off on executing queued tasks when that crosses a certain threshold? Have a look at base_executor, where it handles slots and queued tasks.

Cheers
Bolke

Sent from my iPad

On 15 Nov 2017 at 01:39, Daniel Imberman <[email protected]> wrote:

Hi Bolke and Dan!

I had a quick question WRT the launched state (https://github.com/apache/incubator-airflow/blob/master/airflow/utils/state.py#L32).

We are handling the issue of throttling the executor when the k8s cluster has more than 5 pending tasks (which usually means that the cluster is under a lot of strain), and one thought we had WRT crash safety was to use a "LAUNCHED" state for pods that have been submitted but are not running yet.

With the launched state currently being TBD, I was wondering if there was any reason you guys would not want this state? There are other workarounds we can do, but we wanted to check in with you guys first.

Thanks!

Daniel
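For the pending-pod throttling that Daniel describes and Bolke hints at, a minimal sketch using the kubernetes Python client might look like the following; the threshold, namespace, and `cluster_is_strained` helper are assumptions for illustration, not the Kubernetes executor's actual code.

```python
# Illustrative sketch only, assuming the `kubernetes` Python client is
# installed: ask the cluster how many pods are Pending and hold back queued
# work once that count crosses a (made-up) threshold.
from kubernetes import client, config

PENDING_THRESHOLD = 5


def cluster_is_strained(namespace: str = "default") -> bool:
    """Return True when the cluster has too many Pending pods to take more work."""
    config.load_kube_config()  # or config.load_incluster_config() inside a pod
    v1 = client.CoreV1Api()
    pending = v1.list_namespaced_pod(
        namespace, field_selector="status.phase=Pending"
    )
    return len(pending.items) > PENDING_THRESHOLD


# An executor loop could consult this before handing off queued tasks, e.g.:
# if not cluster_is_strained(): submit_next_queued_task()
```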
