Very clear, thanks! That explains all my questions. Unless others have more questions, I am OK to vote on it - the "trigger_run_timeout" is a good-enough escape hatch for me, and as long as we add a bit of logging and metrics (they do not need to be detailed in the AIP) so that triggers can be monitored efficiently, I am all in :). Happy to help with implementation/review/testing :)
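[Editor's note: to make the "logs and metrics" point above concrete, here is a minimal sketch of the kind of per-trigger timing instrumentation being asked for. The metric names and the `emit_timing` stand-in are illustrative assumptions, not anything specified in the AIP or Airflow's real metrics API.]

```python
import asyncio
import logging
import time

log = logging.getLogger("airflow.triggerer")

# Illustrative in-memory metrics store - a stand-in for a StatsD-style
# backend such as Airflow's metrics system.
metrics = {}

def emit_timing(name, seconds):
    # Record one timing sample under a metric name (assumed name scheme).
    metrics.setdefault(name, []).append(seconds)

async def run_trigger(trigger_name, trigger_coro, warn_after=0.5):
    """Run a single trigger coroutine, recording its total runtime and
    logging a warning if it ran longer than `warn_after` seconds."""
    start = time.monotonic()
    try:
        return await trigger_coro
    finally:
        elapsed = time.monotonic() - start
        emit_timing(f"triggers.run_duration.{trigger_name}", elapsed)
        if elapsed > warn_after:
            log.warning("Trigger %s ran for %.2fs (threshold %.2fs)",
                        trigger_name, elapsed, warn_after)

async def demo():
    async def quick():
        await asyncio.sleep(0.01)  # a well-behaved, short async wait
        return "fired"
    return await run_trigger("datetime_trigger", quick())

result = asyncio.run(demo())
print(result)  # -> fired
```

An operator of a large install could then alert on the run-duration metric rather than discovering a stuck triggerer after the fact.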
On Thu, Apr 29, 2021 at 7:13 PM Andrew Godwin <andrew.god...@astronomer.io.invalid> wrote:
>
> You're totally right, I did miss that - sorry!
>
> The kwargs are stored in a new "next_kwargs" column for the TaskInstance - this bit never actually touches the triggerer. There are two separate things going on:
>
> - Within a worker/local executor, a running operator raises TaskDeferred, an exception saying which trigger it wishes to defer on, along with the method name and kwargs to come back to. At that point, the task execute wrapper catches the exception, serializes the requested trigger into the new "triggers" table, serializes the method name and kwargs to resume on into the TaskInstance table ("next_method" and "next_kwargs"), and then puts the task into a new state, "deferred".
>
> - The Triggerer runs entirely separately from all this, and watches the triggers table. When it sees a new trigger requested, it loads it up and starts it running in the async loop. When a trigger fires, the triggerer looks up which task instances were depending on it, and then marks them for re-scheduling, adding the event payload into their next_kwargs column as a kwarg "event". If nothing else depends on that trigger, it then gets cleaned up.
>
> - Once the task is marked as "scheduled" again, the executor comes around and starts it again as normal, only this time it honours the values of next_method and next_kwargs and uses those as the entrypoint to the operator rather than "execute()".
>
> Hopefully that makes it clearer; those points are all a bit separate in the AIP.
>
> Andrew
>
> On Thu, Apr 29, 2021 at 10:12 AM Jarek Potiuk <ja...@potiuk.com> wrote:
>>
>> Hey Andrew,
>>
>> Possibly you missed that one :)
>>
>> And that leads me to another question, actually a very important one. I think (correct me please if I am wrong) it is missing in the current specs. Where are the kwargs going to be stored while the task is deferred? Are they only stored in memory of the triggerer? Or in the DB? And what are the consequences? What happens when the tasks are triggered and the triggerer is restarted? How do we recover? Does it mean that all the tasks that are deferred will have to be restarted? How? Could you please elaborate a bit on that (and I think it also needs a chapter in the specification).
>>
>> J.
>>
>> On Thu, Apr 29, 2021 at 6:00 PM Andrew Godwin <andrew.god...@astronomer.io.invalid> wrote:
>> >
>> > Yup, the triggerer HA is called out in the AIP, along with a future space left to make it sharded (I'll design this into its API):
>> >
>> > "The triggerer is designed to be run in a highly-available (HA) manner; it should coexist with other instances of itself, running the same triggers, and deduplicating events when they fire off. In future, it should also be designed to run in a sharded/partitioned architecture, where the set of triggers is divided across replica sets of triggerers."
>> >
>> > I think a trigger_run_timeout is a very sensible thing to ship and I will add it to the AIP, and I will propose we set a default of 2-3 seconds so we catch the most egregious cases (ideally it would be sub-500ms if you wanted to catch everything).
>> >
>> > Andrew
>> >
>> > On Thu, Apr 29, 2021 at 7:24 AM Kaxil Naik <kaxiln...@gmail.com> wrote:
>> >>
>> >> Similar to trigger_timeout (which is now in the AIP) we could have trigger_run_timeout to take care of that case.
>> >>
>> >> Regards,
>> >> Kaxil
>> >>
>> >> On Thu, Apr 29, 2021 at 1:51 PM Ash Berlin-Taylor <a...@apache.org> wrote:
>> >>>
>> >>> On point one: the triggerer is HA, so you can run multiple instances; the concern about blocking is valid, but it wouldn't block _all_ triggers.
>> >>>
>> >>> Or I thought that was the plan - the AIP doesn't mention it, but Andrew said that _somewhere_.
>> >>>
>> >>> We should explicitly mention that this is the case.
>> >>>
>> >>> -ash
>> >>>
>> >>> On 29 April 2021 09:45:47 BST, Jarek Potiuk <ja...@potiuk.com> wrote:
>> >>>>
>> >>>> Hey, anyone from the BIG users - any comments here? Any thoughts about the operational side of this? I am not sure if my worries are too "excessive", so I would love to hear your thoughts. I think it would be great to unblock Andrew so that he can carry on with the AIP (which I think is a great feature, BTW).
>> >>>>
>> >>>> I think this boils down to two questions:
>> >>>>
>> >>>> 1) Would you be worried by having a single thread running all such triggers for the whole installation, where users could potentially develop their own "blocking" triggers by mistake and block others?
>> >>>> 2) What kind of monitoring/detection/prevention would you like to see to get it "under control"? :)
>> >>>>
>> >>>> J.
>> >>>>
>> >>>> On Tue, Apr 27, 2021 at 9:04 PM Andrew Godwin <andrew.god...@astronomer.io.invalid> wrote:
>> >>>>>
>> >>>>> Oh totally, I was partially wearing my SRE hat when writing up parts of this (hence why HA is built in from the start), but I'm more used to running web workloads than Airflow, so any input from people who've run large Airflow installs is welcome.
>> >>>>>
>> >>>>>> That will not work, I am afraid :). If we open it up for users to use, we have to deal with the consequences. We have to be prepared for people doing all kinds of weird things - at least this is what I've learned during the last 2 years of working on Airflow.
>> >>>>>
>> >>>>> It's maybe one of the key lessons I've had from 15 years writing open source - people will always use your hidden APIs, no matter what!
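[Editor's note: the "blocking trigger by mistake" worry in question 1 above is easy to demonstrate. The sketch below runs two pairs of toy triggers on one asyncio event loop: the well-behaved pair awaits `asyncio.sleep` and their waits overlap, while the buggy pair calls synchronous `time.sleep` and each one freezes the shared loop in turn. The trigger names are invented for illustration.]

```python
import asyncio
import time

# A well-behaved trigger: yields control back to the event loop while
# waiting, so other triggers keep running.
async def good_trigger():
    await asyncio.sleep(0.1)  # non-blocking wait
    return "fired"

# A mistakenly written trigger: a synchronous sleep inside an async
# function blocks the single shared event loop for everyone.
async def bad_trigger():
    time.sleep(0.1)  # BUG: blocks the whole loop
    return "fired"

async def main():
    start = time.monotonic()
    await asyncio.gather(good_trigger(), good_trigger())
    concurrent = time.monotonic() - start  # ~0.1s: the waits overlap

    start = time.monotonic()
    await asyncio.gather(bad_trigger(), bad_trigger())
    serial = time.monotonic() - start  # ~0.2s: each sleep stalls the loop
    return concurrent, serial

concurrent, serial = asyncio.run(main())
print(f"async waits: {concurrent:.2f}s, blocking waits: {serial:.2f}s")
```

With thousands of triggers, one such `time.sleep` (or a blocking network call) would delay every other trigger on that loop, which is exactly why the thread converges on a trigger_run_timeout plus monitoring.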
>> >>>>>
>> >>>>> I think in this case it's a situation where we just have to make it progressively safer as you go up the stack - if you're just using Operators, you are fine; if you are authoring Operators, there are some new things to think about if you want to go deferrable; and if you're authoring Triggers, then you need a bit of async experience. Plus, people who are just writing standard non-deferrable operators have nothing to worry about, as this is purely an additive feature, which I like; letting people opt in to complexity is always nice.
>> >>>>>
>> >>>>> Hopefully we can get enough safety guards in that all three of these levels will be reasonably accessible without encouraging people to go straight to Triggers; it would likely be an improvement over Smart Sensors, which as far as I can tell lack a lot of them.
>> >>>>>
>> >>>>> Andrew
>> >>>>>
>> >>>>> On Tue, Apr 27, 2021 at 12:29 PM Jarek Potiuk <ja...@potiuk.com> wrote:
>> >>>>>>
>> >>>>>> Hey Andrew,
>> >>>>>>
>> >>>>>> Just don't get me wrong :). I love the idea/AIP proposal. I just want to make sure that from day one we think about the operational aspects of it. I think the easier we make it for the operations people, the fewer of their problems we will have to deal with on the devlist/in GitHub issues.
>> >>>>>>
>> >>>>>> I would love to hear what others think about it - maybe some folks from the devlist who operate Airflow at scale could chime in here - we have some people from AirBnB/Twitter etc. Maybe you could state expectations for the operational side if you had 1000s of Triggers that potentially interfere with each other :) ?
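[Editor's note: the "authoring Operators ... if you want to go deferrable" level maps onto the deferral flow Andrew described near the top of the thread: the operator raises an exception naming a trigger plus the method and kwargs to resume with, and the execute wrapper persists those instead of any state. The sketch below uses minimal stand-in classes to show that flow end-to-end; the class and method names other than TaskDeferred, execute, next_method, and next_kwargs are invented for illustration, and none of this is the real Airflow API.]

```python
class TaskDeferred(Exception):
    """Raised by an operator to say: run this trigger, then revive me
    via `method_name` with `kwargs` (plus the trigger's event)."""
    def __init__(self, trigger, method_name, kwargs):
        self.trigger = trigger
        self.method_name = method_name
        self.kwargs = kwargs

class WaitForTimestamp:
    """A stand-in Trigger that would fire at a given timestamp."""
    def __init__(self, ts):
        self.ts = ts

class AsyncDateTimeSensor:
    def execute(self, context):
        # First run: defer until the trigger fires.
        raise TaskDeferred(
            trigger=WaitForTimestamp(ts="2021-05-01T00:00:00"),
            method_name="execute_complete",
            kwargs={},
        )

    def execute_complete(self, context, event=None):
        # Second run, possibly on a different worker: the triggerer put
        # the event payload into next_kwargs as the "event" kwarg.
        return f"resumed with event={event!r}"

def run_task(op, context, next_method=None, next_kwargs=None):
    """Tiny execute wrapper: honours next_method/next_kwargs if set."""
    entrypoint = getattr(op, next_method) if next_method else op.execute
    try:
        return entrypoint(context, **(next_kwargs or {}))
    except TaskDeferred as d:
        # In Airflow this would serialize the trigger into the triggers
        # table and the method/kwargs into the TaskInstance columns.
        return ("deferred", d.method_name, d.kwargs)

state = run_task(AsyncDateTimeSensor(), context={})
print(state[0])  # -> deferred
# ...later, after the trigger fires, the task is re-run from the stored
# entrypoint with the event merged into its kwargs:
result = run_task(AsyncDateTimeSensor(), context={},
                  next_method=state[1],
                  next_kwargs={**state[2], "event": "timestamp_reached"})
print(result)  # -> resumed with event='timestamp_reached'
```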
>> >>>>>>
>> >>>>>> On Mon, Apr 26, 2021 at 9:21 PM Andrew Godwin <andrew.god...@astronomer.io.invalid> wrote:
>> >>>>>>>
>> >>>>>>> 1) In general, I'm envisioning Triggers as a thing that is generally abstracted away from DAG authors - instead, they would come in a provider package (or core Airflow) and so we would be expecting the same higher level of quality and testing.
>> >>>>>>
>> >>>>>> I see the intention, but we would have to have a pretty comprehensive set of triggers from day one - similar to the different types of "rich intervals" we have in https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-39+Richer+scheduler_interval. Maybe a list (with groups) of the triggers we would like to have as "built-ins" would be really helpful?
>> >>>>>> But even then, I think the "extendability" of this solution is what I like about it. More often than not, users will find out that they need something custom. I think we should describe the interface that a Trigger must implement and make it a "first-class" citizen - similarly to all the other concepts in Airflow, it is expected that users will override them: Operators, Sensors, even the new "Richer schedules" are "meant" to be implemented by users. If they are not, we should not make the concept public, but rather have (and accept) only a fixed set of triggers - and for that we could just implement an Enum of the available Triggers. By providing an interface, we invite our users to implement their own custom Triggers, and then we need to deal with the consequences. We have to think about what happens when users start writing their own triggers and what "toolbox" we give the people operating Airflow to deal with that.
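[Editor's note: one plausible shape for the "interface that a Trigger must implement" being discussed - an illustrative sketch only, not the AIP's definitive API. It captures the two properties the thread implies: a trigger must be serializable (so the triggerer, or any HA replica of it, can re-create it from the DB) and must do all its waiting asynchronously. The class names and the `(classpath, kwargs)` serialization convention are assumptions.]

```python
import asyncio
from typing import Any, AsyncIterator, Dict, Tuple

class BaseTrigger:
    def serialize(self) -> Tuple[str, Dict[str, Any]]:
        """Return (classpath, kwargs) so the trigger can be stored in
        the triggers table and re-instantiated by any triggerer."""
        raise NotImplementedError

    async def run(self) -> AsyncIterator[Any]:
        """Async generator that yields an event payload when it fires."""
        raise NotImplementedError
        yield  # makes this an async generator in subclass signatures

class IntervalTrigger(BaseTrigger):
    """Example trigger: fires once after `seconds` have elapsed."""
    def __init__(self, seconds: float):
        self.seconds = seconds

    def serialize(self):
        return ("example.IntervalTrigger", {"seconds": self.seconds})

    async def run(self):
        await asyncio.sleep(self.seconds)  # non-blocking wait
        yield {"status": "elapsed", "after": self.seconds}

async def demo():
    trigger = IntervalTrigger(0.01)
    async for event in trigger.run():
        return event  # first event is enough for this one-shot trigger

event = asyncio.run(demo())
print(event)
```

Making the interface this small is what enables the operational "toolbox": anything conforming to it can be timed, timed out, listed in the UI, or cancelled generically.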
>> >>>>>>
>> >>>>>>> a) I do think it should all be fixed as asyncio, mostly because the library support is there and you don't suddenly want to make people have to run multiple "flavours" of triggerer process based on how many triggers they're using and what runtime loops they demand. If trio were more popular, I'd pick that, as it's safer and (in my opinion) better designed, but we are unfortunately nowhere near that, and in addition this is not something that is impossible to add at a later stage.
>> >>>>>>
>> >>>>>> Asyncio would also be my choice by far, so we do not differ here :)
>> >>>>>>
>> >>>>>> From what I understand, you propose a single event loop to run all the deferred tasks? Am I right? My point is that multiple event loops, or thread-based async running, are also possible with asyncio. No problem with that. Asyncio by default uses a single event loop, but there is no problem using more. This would be quite similar to the "queues" we currently have with Celery workers. I am not saying we should, but I can see cases where this might be advisable (for example, to isolate groups of deferred tasks so that they do not interfere with/delay another group). What do you think?
>> >>>>>>
>> >>>>>>> b) There are definitely some hooks we can use to detect long-running triggers, and I'll see if I can grab some of them and implement them. At the very least, there's the SIGALRM watchdog method, and I believe it's possible to put nice timeouts around asyncio tasks, which we could use to enforce a max runtime specified in the airflow.cfg file.
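[Editor's note: the "timeouts around asyncio tasks" idea in (b) can be sketched with the standard-library `asyncio.wait_for`, which cancels a coroutine that exceeds a deadline. The supervisor function and the tiny timeout value are illustrative; in practice the budget would be the `trigger_run_timeout` from airflow.cfg that the thread proposes. Note the caveat: this only works at `await` points - a fully synchronous `time.sleep` never yields to the loop, which is where the SIGALRM watchdog mentioned above would still be needed.]

```python
import asyncio

TRIGGER_RUN_TIMEOUT = 0.05  # seconds; would come from airflow.cfg

async def supervise(name, coro):
    """Run one trigger coroutine under a hard runtime budget."""
    try:
        return await asyncio.wait_for(coro, timeout=TRIGGER_RUN_TIMEOUT)
    except asyncio.TimeoutError:
        # The runaway trigger is cancelled instead of stalling the
        # shared event loop indefinitely.
        return f"{name}: cancelled after {TRIGGER_RUN_TIMEOUT}s"

async def main():
    async def well_behaved():
        await asyncio.sleep(0.01)  # finishes within the budget
        return "ok"

    async def runaway():
        await asyncio.sleep(10)  # would hold a slot far too long
        return "ok"

    return await asyncio.gather(
        supervise("good", well_behaved()),
        supervise("bad", runaway()),
    )

results = asyncio.run(main())
print(results)  # -> ['ok', 'bad: cancelled after 0.05s']
```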
>> >>>>>>
>> >>>>>> I agree it will indeed be difficult to prevent people from making mistakes, but we should really think about how we can help the operations people detect and diagnose them. And I think it should be part of the specification:
>> >>>>>>
>> >>>>>> 1) What kind of metrics we log for the triggers - I think we should gather and publish (using Airflow's metrics system) some useful metrics for the operations people (number of executions, execution length, queuing/delay time vs. expectation for something like a date trigger, etc.) for all triggers from day one.
>> >>>>>> 2) You mentioned it - maybe we should have debug mode turned on by default: https://docs.python.org/3/library/asyncio-dev.html#debug-mode . It has some useful features, mainly automated logging of async methods that run for too long. I am not sure what consequences it has, though.
>> >>>>>> 3) A max execution time would be even nicer indeed.
>> >>>>>> 4) Maybe there should be some exposure in the UI/CLI/API of what's going on with triggers?
>> >>>>>> 5) Maybe there should be a way to cancel/disable triggers that are misbehaving - using the UI/CLI/API - until the code gets fixed?
>> >>>>>>
>> >>>>>> I think we simply need to document those aspects in an "operations" chapter of your proposal. I am happy to propose some draft changes to the AIP if you would like, after we discuss it here.
>> >>>>>>
>> >>>>>>> 2) This is why there's no actual _state_ that is persisted - instead, you pass the method you want to call next and its keyword arguments. Obviously we'll need to be quite clear about this in the docs, but I feel this is better than persisting _some_ state. Again, though, this is an implementation detail that would likely be hidden inside an Operator or Sensor from the average DAG user; I'm not proposing we expose this to things like PythonOperator or TaskFlow yet, for the reasons you describe.
>> >>>>>>> I wish I had a better API to present for Operator authors, but given the fundamental fact that the Operator/Task is going to run on different machines for different phases of its life, I think having explicit "give me this when you revive me" arguments is the best tradeoff we can go for.
>> >>>>>>
>> >>>>>> Agreed. We cannot do much about it. I think we should just be very clear when we document the life cycle. Maybe we should even update the semantics of pre/post so that "pre_execute()" and "post_execute()" are executed for EVERY execute - including the deferred one (and also execute_complete()). This way (in your example) the task execution would look like: pre_execute(), execute() (-> throws TaskDeferred), post_execute(), and then on another worker pre_execute(), execute_complete(), post_execute(). I think that would make sense. What do you think?
>> >>>>>>
>> >>>>>>> 3) I do think we should limit the size of the payload, as well as the kwargs that pass between deferred phases of the task instance - something pretty meaty, like 500KB, would seem reasonable to me. I've also run into the problem in the past that if you design a messaging system without a limit, people _will_ push the sizes of the things sent up to eyebrow-raisingly large values.
>> >>>>>>
>> >>>>>> Yeah, I think XCom should be our benchmark. Currently we have MAX_XCOM_SIZE = 49344. Should we use the same?
>> >>>>>>
>> >>>>>> And that leads me to another question, actually a very important one. I think (correct me please if I am wrong) it is missing in the current specs. Where are the kwargs going to be stored while the task is deferred? Are they only stored in memory of the triggerer? Or in the DB? And what are the consequences? What happens when the tasks are triggered and the triggerer is restarted? How do we recover? Does it mean that all the tasks that are deferred will have to be restarted? How? Could you please elaborate a bit on that (and I think it also needs a chapter in the specification).
>> >>>>>>
>> >>>>>>> Overall, I think it's important to stress that the average DAG author should not even know that Triggers really exist; instead, they should just be able to switch Sensors or Operators (e.g. DateTimeSensor -> AsyncDateTimeSensor in my prototype) and get the benefits of deferred operators with no extra thought required.
>> >>>>>>
>> >>>>>> That will not work, I am afraid :). If we open it up for users to use, we have to deal with the consequences. We have to be prepared for people doing all kinds of weird things - at least this is what I've learned during the last 2 years of working on Airflow. See the comment above. If we do not want users to implement custom versions of triggers, we should use Enums and a closed set of those. If we create an API or an interface, people will use it and create their own, no matter whether we want them to or not.
>> >>>>>>
>> >>>>>> J.
>> >>>>
>> >>>> --
>> >>>> +48 660 796 129
>>
>> --
>> +48 660 796 129

--
+48 660 796 129