I opened a PR addressing the issue I found above; please take a look when you get a chance:
https://github.com/apache/incubator-airflow/pull/2843

I could not find a reason why we would want to exclude backfilled and externally triggered dagruns from the "reset orphaned task instances" check, so I removed the special casing. (Anyone with expertise in this area, please let me know if there is a reason.) This means that all dagruns should rightfully be crash safe again after this PR.
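For context, here is a simplified sketch of the query behind the change, assuming Airflow's ORM models and an open SQLAlchemy session; this is not the literal jobs.py code, and the backfill run_id prefix is only illustrative:

    # Simplified sketch, not the literal reset_state_for_orphaned_tasks code.
    # The two commented-out filters are the special casing the PR removes, so
    # that backfilled and externally triggered dagruns are also reset.
    from sqlalchemy import and_

    from airflow.models import DagRun, TaskInstance
    from airflow.utils.state import State


    def find_resettable_task_instances(session, backfill_prefix="backfill_"):
        resettable_states = [State.SCHEDULED, State.QUEUED]
        return (
            session.query(TaskInstance)
            .join(DagRun, and_(TaskInstance.dag_id == DagRun.dag_id,
                               TaskInstance.execution_date == DagRun.execution_date))
            .filter(DagRun.state == State.RUNNING)
            # .filter(DagRun.external_trigger == False)             # removed by the PR
            # .filter(~DagRun.run_id.like(backfill_prefix + "%"))   # removed by the PR
            .filter(TaskInstance.state.in_(resettable_states))
            .all()
        )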
On Fri, Dec 1, 2017 at 11:57 AM, Maxime Beauchemin <[email protected]> wrote:

Oh Alex, I didn't know there was a task instance audit feature already. Curious to learn about it, do you have a pointer or a short description?

Max

On Fri, Dec 1, 2017 at 9:26 AM, Grant Nicholas <[email protected]> wrote:

@Alex just tested this and found the issue. It only resets the state of tasks for dagruns with "external_trigger=False". You can see where this is explicitly specified in the query here:
https://github.com/apache/incubator-airflow/blob/master/airflow/jobs.py#L242

Is this intended behavior? I can say for us, this caused confusion because the dagruns created through the UI default to "external_trigger=True" and dagruns created through the CLI with "airflow trigger_dag {dag_id} -r {run_id}" also default to "external_trigger=True". This means that dags that are manually triggered are NOT crash safe by default.

Is there a reason why we would only want this behavior for "external_trigger=False" dags?

On Thu, Nov 30, 2017 at 1:18 PM, Alex Guziel <[email protected]> wrote:

See reset_state_for_orphaned_tasks in jobs.py.

On Thu, Nov 30, 2017 at 11:17 AM, Alex Guziel <[email protected]> wrote:

Right now the scheduler re-launches all QUEUED tasks on restart (there are safeguards for duplicates).

On Thu, Nov 30, 2017 at 11:13 AM, Grant Nicholas <grantnicholas2015@u.northwestern.edu> wrote:

@Alex
I agree setting the RUNNING state immediately when `airflow run` starts up would be useful on its own, but it doesn't solve all the problems. What happens if you have a task in the QUEUED state (that may or may not have been launched) and your scheduler crashes? What does the scheduler do on startup: does it launch all QUEUED tasks again (knowing that there may be duplicate tasks), or does it not launch the QUEUED tasks again (knowing that the task may be stuck in the QUEUED state forever)? Right now, airflow does the latter, which I think is not correct, as you can potentially have tasks stuck in the QUEUED state forever.

Using a LAUNCHED state would explicitly keep track of whether tasks were submitted for execution or not. At that point it's up to your messaging system/queueing system/etc. to be crash safe, and that is something you get for free with kubernetes and something you can tune with celery persistent queues.

Note: Another option is to NOT introduce a new state but have airflow launch QUEUED tasks again on startup of the executor. This would mean that we may launch duplicate tasks, but this should not be an issue since we have built-in protections on worker startup to avoid having two RUNNING task instances at once.
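A minimal sketch of that last option, re-enqueueing anything stuck in QUEUED on startup and leaning on the worker-side double-run protection; the function is hypothetical and assumes the executor exposes BaseExecutor's queue_task_instance entry point:

    # Hypothetical sketch: on scheduler/executor startup, re-submit every
    # QUEUED task instance instead of adding a LAUNCHED state. Duplicate
    # submissions are assumed to be caught by the worker-side "already
    # running" dependency check, so a second copy exits without running.
    from airflow.models import TaskInstance
    from airflow.utils.state import State


    def requeue_stuck_queued_tasks(session, executor):
        queued_tis = (
            session.query(TaskInstance)
            .filter(TaskInstance.state == State.QUEUED)
            .all()
        )
        for ti in queued_tis:
            # Hand the task back to the executor's queue; if it was already
            # launched before the crash, the duplicate should be rejected
            # by the worker's double-run protection.
            executor.queue_task_instance(ti)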
On Thu, Nov 30, 2017 at 12:40 PM, Alex Guziel <[email protected]> wrote:

I think the more sensible thing here is just to set the state to RUNNING immediately in the airflow run process. I don't think the distinction between launched and running adds much value.

On Thu, Nov 30, 2017 at 10:36 AM, Daniel Imberman <[email protected]> wrote:

@Alex

That could potentially work, since if you have the same task launched twice then the second time would die due to the "already running" dependency. Still less ideal than not launching that task at all, since it still allows for race conditions. @grant thoughts on this?

On Wed, Nov 29, 2017 at 11:00 AM, Alex Guziel <[email protected]> wrote:

It might be good enough to have RUNNING set immediately on the process run and not be dependent on the dag file being parsed. It is annoying here too when dags parse on the scheduler but not the worker, since queued tasks that don't heartbeat will not get retried, while running tasks will.

On Wed, Nov 29, 2017 at 10:04 AM, Grant Nicholas <[email protected]> wrote:

---Opening up this conversation to the whole mailing list, as suggested by Bolke---

A "launched" state has been suggested in the past (see here
<https://github.com/apache/incubator-airflow/blob/master/airflow/utils/state.py#L31>)
but never implemented, for reasons unknown to us. Does anyone have more details about why?

There are two big reasons why adding a new "launched" state to airflow would be useful:

1. A "launched" state would be useful for crash safety of the scheduler. If the scheduler crashes between launching the task and the task process starting up, then we lose information about whether that task was launched or not. By moving the state of the task to "launched" when it is sent off to celery/dask/kubernetes/etc, you know after a crash whether you have to relaunch the task or not.

To work around this issue, on startup of the kubernetes executor we query all "queued" tasks and, if there is not a matching kubernetes pod for a task, we set the task state to "None" so it is rescheduled. See here
<https://github.com/bloomberg/airflow/blob/airflow-kubernetes-executor/airflow/contrib/executors/kubernetes_executor.py#L400>
for details if you are curious. While this works for the kubernetes executor, other executors can't easily introspect launched tasks, and this means the celery executor (afaik) is not crash safe.

2. A "launched" state would allow for dynamic backpressure of tasks, not just static backpressure. Right now, airflow only allows static backpressure (the `parallelism` config). This means you must statically say "I only want to allow N running tasks at once." Imagine you have lots of tasks being scheduled on your celery cluster/kubernetes cluster; since the resource usage of each task is heterogeneous, you don't know exactly how many running tasks you can tolerate at once. If instead you can say "I only want tasks to be launched while I have fewer than N tasks in the launched state", you get some adaptive backpressure.

While we have workarounds described above for the kubernetes executor, how do people feel about introducing a launched state into airflow so we don't need the workarounds? I think there are benefits to be gained for all the executors.
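Roughly, the kubernetes executor workaround described above looks like this; a sketch only, with an illustrative label selector rather than the executor's actual pod naming:

    # Rough sketch of the Kubernetes executor workaround: on executor startup,
    # any QUEUED task instance with no matching pod is assumed to have been
    # lost in the crash, so its state is cleared back to None and the
    # scheduler will pick it up again.
    from kubernetes import client, config

    from airflow.models import TaskInstance
    from airflow.utils.state import State


    def clear_unlaunched_queued_tasks(session, namespace="default"):
        config.load_incluster_config()
        kube = client.CoreV1Api()

        queued_tis = (
            session.query(TaskInstance)
            .filter(TaskInstance.state == State.QUEUED)
            .all()
        )
        for ti in queued_tis:
            # Illustrative labels; the real executor derives pod labels/names
            # from the task instance itself.
            selector = "dag_id={},task_id={}".format(ti.dag_id, ti.task_id)
            pods = kube.list_namespaced_pod(namespace, label_selector=selector)
            if not pods.items:
                # No pod was ever created for this QUEUED task instance.
                ti.state = State.NONE
        session.commit()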
On Sun, Nov 26, 2017 at 1:46 AM, Bolke de Bruin <[email protected]> wrote:

Hi Daniel,

(BTW: I do think this discussion is better to have on the mailing list; more people might want to chime in and offer valuable opinions.)

Jumping right in: I am wondering if you are not duplicating the "queued" logic used for (among other things) pools. Introducing LAUNCHED with the meaning you describe attached to it would mean that we have a second place where we handle backpressure.

Isn't there a way to ask the k8s cluster how many tasks it has pending and just execute any queued tasks when it crosses a certain threshold? Have a look at base_executor, where it is handling slots and queued tasks.

Cheers
Bolke

Sent from my iPad
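Sketching Bolke's suggestion, assuming the Kubernetes Python client; the threshold and function name are illustrative:

    # Minimal sketch of throttling on the cluster's own pending count instead
    # of a new LAUNCHED state: ask the Kubernetes API how many pods are still
    # Pending and only hand queued work to the executor while that number is
    # under a threshold (compare base_executor's open-slots handling).
    from kubernetes import client, config


    def cluster_has_capacity(namespace="default", max_pending=5):
        config.load_incluster_config()
        kube = client.CoreV1Api()
        pending = kube.list_namespaced_pod(
            namespace, field_selector="status.phase=Pending"
        )
        return len(pending.items) < max_pending

An executor's heartbeat loop could call this before draining its queued tasks and simply leave everything queued until the next heartbeat while the cluster is saturated.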
On 15 Nov 2017, at 01:39, Daniel Imberman <[email protected]> wrote:

Hi Bolke and Dan!

I had a quick question WRT the launched state (https://github.com/apache/incubator-airflow/blob/master/airflow/utils/state.py#L32).

We are handling the issue of throttling the executor when the k8s cluster has more than 5 pending tasks (which usually means that the cluster is under a lot of strain), and one thought we had WRT crash safety was to use a "LAUNCHED" state for pods that have been submitted but are not running yet.

With the launched state currently being TBD, I was wondering if there was any reason you guys would not want this state? There are other workarounds we can do, but we wanted to check in with you guys first.

Thanks!

Daniel
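For completeness, the LAUNCHED-based backpressure check would amount to something like the sketch below; State.LAUNCHED does not exist today, so the state value is hypothetical:

    # Hypothetical sketch of adaptive backpressure with a LAUNCHED state:
    # only send more work to the cluster while fewer than N task instances
    # are sitting in LAUNCHED (submitted but not yet RUNNING).
    from airflow.models import TaskInstance

    LAUNCHED = "launched"  # hypothetical value, not in airflow.utils.state.State


    def can_launch_more(session, max_launched=5):
        launched_count = (
            session.query(TaskInstance)
            .filter(TaskInstance.state == LAUNCHED)
            .count()
        )
        return launched_count < max_launched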
