Thanks Andrew for the detailed responses and Jarek for the great questions. I really like the AIP and love the direction.
Vikram

On Thu, Apr 29, 2021 at 10:27 AM Andrew Godwin <andrew.god...@astronomer.io.invalid> wrote:
> Yup, logging and metrics are something I didn't put in the prototype, but I consider them an essential part of feature "polishing" (along with docs and tests, naturally).
>
> On the trigger timeout front, I did some investigation today, and it seems that unfortunately we cannot get at the actual place we'd need to detect timeouts per-trigger, as it's buried in asyncio with no override:
> https://github.com/python/cpython/blob/e4fe303b8cca525e97d44e80c7e53bdab9dd9187/Lib/asyncio/base_events.py#L1876
>
> So, instead, I can construct an asyncio task that runs at all times in the triggerer at 0.2 second intervals and notices if it gets blocked for too long, and probably then tie that into a SIGALRM-based watchdog. It won't be able to tell exactly which trigger was blocking, but it will at least detect that something was, which I think is better than nothing (we can then instruct users to set PYTHONASYNCIODEBUG=1 to see what's actually doing it).
>
> Andrew
>
> On Thu, Apr 29, 2021 at 11:23 AM Jarek Potiuk <ja...@potiuk.com> wrote:
>
>> Very clear. Thanks! That explains all my questions. Unless others have more questions, I am OK to vote on it - the "trigger_run_timeout" is for me a good-enough "escape_hatch", and as long as we add a bit of logs and metrics (not needed to be detailed in the AIP) to be able to efficiently monitor triggers, I am all in :). Happy to help with implementation/review/testing :)
>>
>> On Thu, Apr 29, 2021 at 7:13 PM Andrew Godwin <andrew.god...@astronomer.io.invalid> wrote:
>> >
>> > You're totally right, I did miss that - sorry!
>> >
>> > The kwargs are stored in a new "next_kwargs" column for the TaskInstance - this bit never actually touches the triggerer.
There are two separate things going on:
>> >
>> > - Within a worker/local executor, a running operator raises TaskDeferred, an exception saying which trigger it wishes to defer on, along with the method name and kwargs to come back to. At that point, the task execute wrapper catches the exception, serializes the requested trigger into the new "triggers" table, serializes the method name and kwargs to resume on to the TaskInstance table ("next_method" and "next_kwargs"), and then puts the task into a new state, "deferred".
>> >
>> > - The Triggerer is running entirely separately from all this, and watches the triggers table. When it sees a new trigger requested, it loads it up and starts it running in the async loop. When a trigger fires, the triggerer looks up which task instances were depending on it, and then marks them for re-scheduling, adding the event payload into their next_kwargs column as a kwarg "event". If nothing else depended on that trigger, it then gets cleaned up.
>> >
>> > - Once the task is marked as "scheduled" again, the executor comes around and starts it again like normal, only this time it honours the values of next_method and next_kwargs and uses those as the entrypoint to the operator rather than "execute()".
>> >
>> > Hopefully that makes it clearer - those points are all a bit separate in the AIP.
>> >
>> > Andrew
>> >
>> > On Thu, Apr 29, 2021 at 10:12 AM Jarek Potiuk <ja...@potiuk.com> wrote:
>> >>
>> >> Hey Andrew,
>> >>
>> >> Possibly you missed that one :)
>> >>
>> >> And that leads me to another question, actually a very important one. I think (correct me please if I am wrong) it is missing in the current specs. Where are the kwargs going to be stored while the task is deferred? Are they only stored in the memory of the triggerer? Or in the DB? And what are the consequences? What happens when the tasks are triggered and the triggerer is restarted?
How do we recover? Does it mean that all the tasks that are deferred will have to be restarted? How? Could you please elaborate a bit on that (and I think it also needs a chapter in the specification).
>> >>
>> >> J.
>> >>
>> >> On Thu, Apr 29, 2021 at 6:00 PM Andrew Godwin <andrew.god...@astronomer.io.invalid> wrote:
>> >> >
>> >> > Yup, the triggerer HA is called out in the AIP, along with a future space left to make it sharded (I'll design this into its API):
>> >> >
>> >> > "The triggerer is designed to be run in a highly-available (HA) manner; it should coexist with other instances of itself, running the same triggers, and deduplicating events when they fire off. In future, it should also be designed to run in a sharded/partitioned architecture, where the set of triggers is divided across replica sets of triggerers."
>> >> >
>> >> > I think a trigger_run_timeout is a very sensible thing to ship and I will add it into the AIP, and I will propose we set a default of 2-3 seconds so we catch the most egregious cases (ideally it would be sub-500ms if you wanted to catch everything).
>> >> >
>> >> > Andrew
>> >> >
>> >> > On Thu, Apr 29, 2021 at 7:24 AM Kaxil Naik <kaxiln...@gmail.com> wrote:
>> >> >>
>> >> >> Similar to trigger_timeout (which is now in the AIP) we could have trigger_run_timeout to take care of that case.
>> >> >>
>> >> >> Regards,
>> >> >> Kaxil
>> >> >>
>> >> >> On Thu, Apr 29, 2021 at 1:51 PM Ash Berlin-Taylor <a...@apache.org> wrote:
>> >> >>>
>> >> >>> On point one: the triggerer is HA, so you can run multiple; the concern about blocking is valid, but it wouldn't block _all_ triggers.
>> >> >>>
>> >> >>> Or I thought that was the plan - the AIP doesn't mention it, but Andrew said it _somewhere_.
>> >> >>>
>> >> >>> We should explicitly mention this is the case.
>> >> >>>
>> >> >>> -ash
>> >> >>>
>> >> >>> On 29 April 2021 09:45:47 BST, Jarek Potiuk <ja...@potiuk.com> wrote:
>> >> >>>>
>> >> >>>> Hey, anyone from the BIG users - any comments here? Any thoughts about the operational side of this? I am not sure if my worries are too "excessive", so I would love to hear your thoughts here. I think it would be great to unblock Andrew so that he can carry on with the AIP (which I think is a great feature, BTW).
>> >> >>>>
>> >> >>>> I think this boils down to two questions:
>> >> >>>>
>> >> >>>> 1) Would you be worried by having a single thread running all such triggers for the whole installation, where users could potentially develop their own "blocking" triggers by mistake and block others?
>> >> >>>> 2) What kind of monitoring/detection/prevention would you like to see to get it "under control"? :)
>> >> >>>>
>> >> >>>> J.
>> >> >>>>
>> >> >>>> On Tue, Apr 27, 2021 at 9:04 PM Andrew Godwin <andrew.god...@astronomer.io.invalid> wrote:
>> >> >>>>>
>> >> >>>>> Oh totally, I was partially wearing my SRE hat when writing up parts of this (hence why HA is built in from the start), but I'm more used to running web workloads than Airflow, so any input from people who've run large Airflow installs is welcome.
>> >> >>>>>
>> >> >>>>>> That will not work, I am afraid :). If we open it up for users to use, we have to deal with the consequences. We have to be prepared for people doing all kinds of weird things - at least this is what I've learned during the last 2 years of working on Airflow.
>> >> >>>>>
>> >> >>>>> It's maybe one of the key lessons I've had from 15 years writing open source - people will always use your hidden APIs no matter what!
>> >> >>>>>
>> >> >>>>> I think in this case, it's a situation where we just have to make it progressively safer as you go up the stack - if you're just using Operators you are fine; if you are authoring Operators there are some new things to think about if you want to go deferrable; and if you're authoring Triggers then you need a bit of async experience. Plus, people who are just writing standard non-deferrable operators have nothing to worry about, as this is purely an additive feature, which I like; letting people opt in to complexity is always nice.
>> >> >>>>>
>> >> >>>>> Hopefully we can get enough safety guards in that all three of these levels will be reasonably accessible without encouraging people to go straight to Triggers; it would likely be an improvement over Smart Sensors, which as far as I can tell lack a lot of them.
>> >> >>>>>
>> >> >>>>> Andrew
>> >> >>>>>
>> >> >>>>> On Tue, Apr 27, 2021 at 12:29 PM Jarek Potiuk <ja...@potiuk.com> wrote:
>> >> >>>>>>
>> >> >>>>>> Hey Andrew,
>> >> >>>>>>
>> >> >>>>>> Just don't get me wrong :). I love the idea/AIP proposal. I just want to make sure that from day one we think about the operational aspects of it. I think the easier we make it for the operations people, the fewer of their problems we will have to deal with on the devlist/GitHub issues.
>> >> >>>>>>
>> >> >>>>>> I would love to hear what others think about it - maybe some folks from the devlist who operate Airflow at scale could chime in here - we have some people from AirBnB/Twitter etc. Maybe you could state expectations when it comes to the operational side if you'd have 1000s of Triggers that potentially interfere with each other :)?
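[Editor's note: Jarek's question 1) - one badly written trigger starving the shared event loop - is detectable with the heartbeat approach Andrew sketches earlier in the thread (a coroutine waking at 0.2-second intervals). A minimal illustrative sketch; all names and the 0.5 s threshold are hypothetical, not from the AIP:]

```python
import asyncio
import time

# Illustrative sketch of a blocked-event-loop detector: a heartbeat
# coroutine wakes every 0.2 s and measures how late it actually ran.
# A mis-written synchronous trigger freezes the whole loop, so the
# heartbeat's lateness reveals the stall (though not which trigger
# caused it). Hypothetical names, not the AIP's API.

HEARTBEAT_INTERVAL = 0.2   # seconds between heartbeats
BLOCK_THRESHOLD = 0.5      # extra delay considered "blocked too long"

async def heartbeat(stalls, cycles=10):
    last = time.monotonic()
    for _ in range(cycles):
        await asyncio.sleep(HEARTBEAT_INTERVAL)
        now = time.monotonic()
        lateness = now - last - HEARTBEAT_INTERVAL
        if lateness > BLOCK_THRESHOLD:
            # The real triggerer would raise an alarm / emit a metric here.
            stalls.append(lateness)
        last = now

async def badly_written_trigger():
    # Blocks the event loop with synchronous work - the failure mode
    # Jarek is worried about.
    time.sleep(1.0)

async def main():
    stalls = []
    hb = asyncio.create_task(heartbeat(stalls))
    await asyncio.sleep(0.3)          # loop healthy: no stalls recorded
    await badly_written_trigger()     # loop frozen for ~1 s
    await hb
    return stalls

stalls = asyncio.run(main())
print(len(stalls))  # 1 - a single stall of roughly one second was detected
```

[In the real triggerer this check would feed a SIGALRM-based watchdog rather than a list, and asyncio's debug mode (PYTHONASYNCIODEBUG=1) could then identify the offending callback.]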
>> >> >>>>>>
>> >> >>>>>> On Mon, Apr 26, 2021 at 9:21 PM Andrew Godwin <andrew.god...@astronomer.io.invalid> wrote:
>> >> >>>>>>>
>> >> >>>>>>> 1) In general, I'm envisioning Triggers as things that are generally abstracted away from DAG authors - instead, they would come in a provider package (or core Airflow), and so we would be expecting the same higher level of quality and testing.
>> >> >>>>>>
>> >> >>>>>> I see the intention, but we would have to have a pretty comprehensive set of triggers from day one - similar to the different types of "rich intervals" we have in https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-39+Richer+scheduler_interval. Maybe a list (groups) of the triggers we would like to have as "built-ins" would be really helpful?
>> >> >>>>>> But even then, I think the "extendability" of this solution is what I like about it. More often than not, users will find out that they need something custom. I think if we describe the interface that the Trigger should implement and make it a "first-class" citizen - similarly to all other concepts in Airflow, it is expected that users will override them; Operators, Sensors, even the new "Richer schedules" are "meant" to be implemented by the users. If they are not, we should not make it public but rather have (and accept) only a fixed set of those - and for that we could just implement an Enum of available Triggers. By providing an interface, we invite our users to implement their own custom Triggers, and I think we need to deal with the consequences.
I think we have to think about what happens when users start writing their own triggers, and what "toolbox" we give the people operating Airflow to deal with that.
>> >> >>>>>>
>> >> >>>>>>> a) I do think it should all be fixed as asyncio, mostly because the library support is there and you don't suddenly want to make people have to run multiple "flavours" of triggerer process based on how many triggers they're using and what runtime loops they demand. If trio were more popular, I'd pick that, as it's safer and (in my opinion) better-designed, but we are unfortunately nowhere near that, and in addition this is not something that is impossible to add in at a later stage.
>> >> >>>>>>
>> >> >>>>>> Asyncio would also be my choice by far, so we do not differ here :)
>> >> >>>>>>
>> >> >>>>>> From what I understand, you propose a single event loop to run all the deferred tasks? Am I right?
>> >> >>>>>> My point is that multiple event loops or thread-based async running are also part of asyncio. No problem with that. Asyncio by default uses a single event loop, but there is no problem using more. This would be quite similar to the "queues" we currently have with Celery workers. I am not saying we should, but I can see a case where this might be advisable (for example, to isolate groups of deferred tasks so that they do not interfere with/delay the other group). What do you think?
>> >> >>>>>>
>> >> >>>>>>> b) There are definitely some hooks we can use to detect long-running triggers, and I'll see if I can grab some of them and implement them. At the very least, there's the SIGALRM watchdog method, and I believe it's possible to put nice timeouts around asyncio tasks, which we could use to enforce a max runtime specified in the airflow.cfg file.
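[Editor's note: the asyncio-level timeout Andrew mentions in b) could look roughly like the sketch below. The name trigger_run_timeout comes from the thread; the wrapper and trigger coroutines are hypothetical:]

```python
import asyncio

# Hypothetical sketch: wrapping each trigger's coroutine in
# asyncio.wait_for() to enforce the per-trigger max runtime discussed
# in the thread ("trigger_run_timeout"). Illustrative only - not the
# AIP's actual API.

TRIGGER_RUN_TIMEOUT = 0.5  # would come from airflow.cfg in practice

async def run_with_timeout(trigger_coro, timeout=TRIGGER_RUN_TIMEOUT):
    try:
        return await asyncio.wait_for(trigger_coro, timeout=timeout)
    except asyncio.TimeoutError:
        # The real triggerer would log/emit a metric and clean up the
        # trigger; here we just report the overrun.
        return "timed out"

async def well_behaved():
    await asyncio.sleep(0.1)   # yields promptly
    return "fired"

async def runaway():
    await asyncio.sleep(10)    # never produces a result in time
    return "fired"

async def main():
    return (
        await run_with_timeout(well_behaved()),
        await run_with_timeout(runaway()),
    )

results = asyncio.run(main())
print(results)  # ('fired', 'timed out')
```

[Note that asyncio.wait_for() can only cancel a coroutine at an await point; a trigger that blocks the loop synchronously escapes it, which is presumably why the thread pairs this with the SIGALRM watchdog idea.]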
>> >> >>>>>>
>> >> >>>>>> I agree it will indeed be difficult to prevent people from making mistakes, but we should really think about how we can help the operations people detect and diagnose them. And I think it should be part of the specification:
>> >> >>>>>>
>> >> >>>>>> 1) What kind of metrics we are logging for the triggers - I think we should gather and publish (using Airflow's metrics system) some useful metrics for the operations people (number of executions, execution length, queuing/delay time vs. expectation for something like a date trigger, etc.) for all triggers from day one.
>> >> >>>>>> 2) You mentioned it - maybe we should have Debug Mode turned on by default: https://docs.python.org/3/library/asyncio-dev.html#debug-mode. It has some useful features, mainly automated logging of too-long-running async methods. Not sure what consequences it has, though.
>> >> >>>>>> 3) Max execution time would be even nicer indeed.
>> >> >>>>>> 4) Maybe there should be some exposure in the UI/CLI/API of what's going on with triggers?
>> >> >>>>>> 5) Maybe there should be a way to cancel/disable some triggers that are misbehaving - using the UI/CLI/API - until the code gets fixed for those?
>> >> >>>>>>
>> >> >>>>>> I think we simply need to document those aspects in an "operations" chapter of your proposal. I am happy to propose some draft changes in the AIP if you would like, after we discuss it here.
>> >> >>>>>>
>> >> >>>>>>> 2) This is why there's no actual _state_ that is persisted - instead, you pass the method you want to call next and its keyword arguments. Obviously we'll need to be quite clear about this in the docs, but I feel this is better than persisting _some_ state.
Again, though, this is an implementation detail that would likely be hidden inside an Operator or Sensor from the average DAG user; I'm not proposing we expose this to things like PythonOperator or TaskFlow yet, for the reasons you describe.
>> >> >>>>>>> I wish I had a better API to present for Operator authors, but given the fundamental fact that the Operator/Task is going to run on different machines for different phases of its life, I think having explicit "give me this when you revive me" arguments is the best tradeoff we can go for.
>> >> >>>>>>
>> >> >>>>>> Agree. We cannot do much about it. I think we should just be very clear when we document the life cycle. Maybe we should even update the semantics of pre/post so that "pre_execute()" and "post_execute()" are executed for EVERY execute - including the deferred one (also execute_complete()). This way (in your example) the task execution would look like: pre_execute(), execute() (-> throws TaskDeferred()), post_execute(), and then on another worker pre_execute(), execute_complete(), post_execute(). I think that would make sense. What do you think?
>> >> >>>>>>
>> >> >>>>>>> 3) I do think we should limit the size of the payload, as well as the kwargs that pass between deferred phases of the task instance - something pretty meaty, like 500KB, would seem reasonable to me. I've also run into the problem in the past that if you design a messaging system without a limit, people _will_ push the size of the things sent up to eyebrow-raisingly-large sizes.
>> >> >>>>>>
>> >> >>>>>> Yeah. I think XCom should be our benchmark. Currently we have MAX_XCOM_SIZE = 49344. Should we use the same?
>> >> >>>>>>
>> >> >>>>>> And that leads me to another question, actually a very important one.
I think (correct me please if I am wrong) it is missing in the current specs. Where are the kwargs going to be stored while the task is deferred? Are they only stored in the memory of the triggerer? Or in the DB? And what are the consequences? What happens when the tasks are triggered and the triggerer is restarted? How do we recover? Does it mean that all the tasks that are deferred will have to be restarted? How? Could you please elaborate a bit on that (and I think it also needs a chapter in the specification).
>> >> >>>>>>
>> >> >>>>>>> Overall, I think it's important to stress that the average DAG author should not even know that Triggers really exist; instead, they should just be able to switch Sensors or Operators (e.g. DateTimeSensor -> AsyncDateTimeSensor in my prototype) and get the benefits of deferred operators with no extra thought required.
>> >> >>>>>>
>> >> >>>>>> That will not work, I am afraid :). If we open it up for users to use, we have to deal with the consequences. We have to be prepared for people doing all kinds of weird things - at least this is what I've learned during the last 2 years of working on Airflow. See the comment above. If we do not want users to implement custom versions of triggers, we should use Enums and a closed set of those. If we create an API, an interface, people will use it and create their own, no matter whether we want it or not.
>> >> >>>>>>
>> >> >>>>>> J.
>> >> >>>>
>> >> >>>> --
>> >> >>>> +48 660 796 129
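[Editor's note: for readers of the archive, the defer/resume mechanism Andrew explains earlier in the thread - TaskDeferred carrying a trigger plus the next_method/next_kwargs to resume with, and the event payload injected as an "event" kwarg - can be caricatured in a few lines. Everything below is a toy sketch: real Airflow serializes these to the database, runs triggers asynchronously in the triggerer, and re-schedules the task on a worker.]

```python
# Toy sketch of the defer/resume flow. Names TaskDeferred, next_method,
# next_kwargs and the "event" kwarg come from the thread; the classes
# and the synchronous "triggerer" are purely illustrative.

class TaskDeferred(Exception):
    def __init__(self, trigger, method_name, kwargs):
        self.trigger = trigger
        self.method_name = method_name  # stored as "next_method" in Airflow
        self.kwargs = kwargs            # stored as "next_kwargs" in Airflow

class WaitForTimeTrigger:
    """Stand-in for a real async trigger; here it 'fires' immediately."""
    def fire(self):
        return {"moment": "2021-04-29T10:00:00"}

class ToyDateTimeSensor:
    def execute(self, context):
        # First run: ask to be deferred until the trigger fires, naming
        # the method and kwargs to come back to.
        raise TaskDeferred(WaitForTimeTrigger(), "execute_complete",
                           {"note": "resumed"})

    def execute_complete(self, context, event=None, note=None):
        # Second run (possibly on a different worker): the trigger's
        # payload arrives as the "event" kwarg.
        return (note, event["moment"])

def run_task(task):
    """Mimics the execute wrapper: catch TaskDeferred, remember the
    resume point, later re-enter via next_method with the event payload."""
    try:
        return task.execute(context={})
    except TaskDeferred as deferral:
        # In Airflow the trigger goes to the "triggers" table and the
        # resume point to the TaskInstance row; here we fire inline.
        event = deferral.trigger.fire()
        resume = getattr(task, deferral.method_name)
        return resume(context={}, event=event, **deferral.kwargs)

result = run_task(ToyDateTimeSensor())
print(result)  # ('resumed', '2021-04-29T10:00:00')
```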