Hey Andrew,

Possibly you missed that one :)
And that leads me to another question, actually a very important one. I think (please correct me if I am wrong) it is missing from the current spec. Where are the kwargs going to be stored while the task is deferred? Are they only kept in the memory of the triggerer, or in the DB? And what are the consequences? What happens when tasks are deferred and the triggerer is restarted? How do we recover? Does it mean that all the deferred tasks will have to be restarted? How? Could you please elaborate a bit on that (and I think it also needs a chapter in the specification)?

J.

On Thu, Apr 29, 2021 at 6:00 PM Andrew Godwin <andrew.god...@astronomer.io.invalid> wrote:
>
> Yup, the triggerer HA is called out in the AIP, along with a future space left to make it sharded (I'll design this into its API):
>
> "The triggerer is designed to be run in a highly-available (HA) manner; it should coexist with other instances of itself, running the same triggers, and deduplicating events when they fire off. In future, it should also be designed to run in a sharded/partitioned architecture, where the set of triggers is divided across replica sets of triggerers."
>
> I think a trigger_run_timeout is a very sensible thing to ship and I will add it into the AIP, and I will propose we set a default of 2-3 seconds so we catch the most egregious cases (ideally it would be sub-500ms if you wanted to catch everything).
>
> Andrew
>
> On Thu, Apr 29, 2021 at 7:24 AM Kaxil Naik <kaxiln...@gmail.com> wrote:
>>
>> Similar to trigger_timeout (which is now in the AIP) we could have trigger_run_timeout to take care of that case.
>>
>> Regards,
>> Kaxil
>>
>> On Thu, Apr 29, 2021 at 1:51 PM Ash Berlin-Taylor <a...@apache.org> wrote:
>>>
>>> On point one: The triggerer is HA, so you can run multiple, so the concern about blocking is valid, but it wouldn't block _all_ triggers.
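[The trigger_run_timeout discussed above could be sketched roughly like this. All names here are hypothetical illustrations, not the AIP's actual API. One caveat worth noting: asyncio.wait_for can only cut off a trigger that cooperatively yields control to the event loop, so a trigger stuck in a synchronous call would still need something like the SIGALRM watchdog mentioned later in the thread.]

```python
import asyncio

# Hypothetical illustration of trigger_run_timeout; no real Airflow APIs here.
TRIGGER_RUN_TIMEOUT = 2.5  # seconds, per the suggested 2-3s default

async def well_behaved_trigger():
    # A trigger sketched as an async generator that yields events
    await asyncio.sleep(0.01)
    yield "event"

async def misbehaving_trigger():
    # Cooperatively slow: takes far too long before yielding its event
    await asyncio.sleep(60)
    yield "event"

async def run_one_turn(trigger, timeout):
    # Cap how long a single turn of the trigger may take; on timeout the
    # awaitable is cancelled and the triggerer could log/disable the trigger.
    # Caveat: this only works for code that yields to the event loop; a
    # trigger blocked in e.g. time.sleep() never gets cut off this way.
    try:
        return await asyncio.wait_for(trigger.__anext__(), timeout)
    except asyncio.TimeoutError:
        return None

async def main():
    ok = await run_one_turn(well_behaved_trigger(), TRIGGER_RUN_TIMEOUT)
    bad = await run_one_turn(misbehaving_trigger(), 0.05)
    return ok, bad

ok, bad = asyncio.run(main())
```

[Under this sketch a 2-3 second default would catch grossly slow but cooperative triggers, while the truly blocking case still argues for the watchdog and debug-mode ideas raised elsewhere in the thread.]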
>>>
>>> Or I thought that was the plan, but the AIP doesn't mention that, though Andrew said it _somewhere_.
>>>
>>> We should explicitly mention that this is the case.
>>>
>>> -ash
>>>
>>> On 29 April 2021 09:45:47 BST, Jarek Potiuk <ja...@potiuk.com> wrote:
>>>>
>>>> Hey, anyone from the BIG users, any comments here? Any thoughts about the operational side of this? I am not sure if my worries are too "excessive", so I would love to hear your thoughts here. I think it would be great to unblock Andrew so that he can carry on with the AIP (which I think is a great feature, BTW).
>>>>
>>>> I think this boils down to two questions:
>>>>
>>>> 1) Would you be worried by having a single thread running all such triggers for the whole installation, where users could potentially develop their own "blocking" triggers by mistake and block others?
>>>> 2) What kind of monitoring/detection/prevention would you like to see to get it "under control"? :)
>>>>
>>>> J,
>>>>
>>>> On Tue, Apr 27, 2021 at 9:04 PM Andrew Godwin <andrew.god...@astronomer.io.invalid> wrote:
>>>>>
>>>>> Oh totally, I was partially wearing my SRE hat when writing up parts of this (hence why HA is built in from the start), but I'm more used to running web workloads than Airflow, so any input from people who've run large Airflow installs is welcome.
>>>>>
>>>>>> That will not work, I am afraid :). If we open it up for users to use, we have to deal with the consequences. We have to be prepared for people doing all kinds of weird things - at least this is what I've learned during the last two years of working on Airflow.
>>>>>
>>>>> It's maybe one of the key lessons I've had from 15 years writing open source - people will always use your hidden APIs, no matter what!
>>>>>
>>>>> I think in this case, it's a situation where we just have to make it progressively safer as you go up the stack: if you're just using Operators you are fine; if you are authoring Operators there are some new things to think about if you want to go deferrable; and if you're authoring Triggers then you need a bit of async experience. Plus, people who are just writing standard non-deferrable operators have nothing to worry about, as this is purely an additive feature, which I like; letting people opt in to complexity is always nice.
>>>>>
>>>>> Hopefully we can get enough safety guards in that all three of these levels will be reasonably accessible without encouraging people to go straight to Triggers; it would likely be an improvement over Smart Sensors, which as far as I can tell lack a lot of them.
>>>>>
>>>>> Andrew
>>>>>
>>>>> On Tue, Apr 27, 2021 at 12:29 PM Jarek Potiuk <ja...@potiuk.com> wrote:
>>>>>>
>>>>>> Hey Andrew,
>>>>>>
>>>>>> Just don't get me wrong :). I love the idea/AIP proposal. I just want to make sure that from day one we think about the operational aspects of it. I think the easier we make it for the operations people, the fewer of their problems we will have to deal with on the devlist and in GitHub issues.
>>>>>>
>>>>>> I would love to hear what others think about it - maybe some folks from the devlist who operate Airflow at scale could chime in here - we have some people from Airbnb/Twitter etc. Maybe you could state expectations for the operational side if you had 1000s of Triggers that potentially interfere with each other :) ?
>>>>>>
>>>>>> On Mon, Apr 26, 2021 at 9:21 PM Andrew Godwin <andrew.god...@astronomer.io.invalid> wrote:
>>>>>>>
>>>>>>> 1) In general, I'm envisioning Triggers as a thing that is generally abstracted away from DAG authors - instead, they would come in a provider package (or core Airflow) and so we would be expecting the same higher level of quality and testing.
>>>>>>
>>>>>> I see the intention, but we would have to have a pretty comprehensive set of triggers from day one - similar to the different types of "rich intervals" we have in https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-39+Richer+scheduler_interval. Maybe a list (or groups) of the triggers we would like to have as "built-ins" would be really helpful?
>>>>>>
>>>>>> But even then, I think the "extendability" of this solution is what I like about it. More often than not, users will find out that they need something custom. If we describe the interface that a Trigger should implement and make it a "first-class" citizen - like all other concepts in Airflow - it is expected that users will override them: Operators, Sensors, even the new "richer schedules" are "meant" to be implemented by users. If they are not, we should not make the interface public but rather have (and accept) only a fixed set of triggers - and for that we could just implement an Enum of the available ones. By providing an interface, we invite our users to implement their own custom Triggers, and then we need to deal with the consequences. We have to think about what happens when users start writing their own triggers and what "toolbox" we give the people operating Airflow to deal with that.
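[To make the "interface" idea above concrete, here is one way a first-class, user-implementable Trigger contract could look. This is purely a sketch: BaseTrigger, serialize, run, and the module path are illustrative names, not the AIP's settled API.]

```python
import asyncio
from datetime import datetime, timedelta, timezone

# Illustrative sketch of a user-implementable Trigger interface;
# all names are assumptions, not the AIP's final API.

class BaseTrigger:
    def serialize(self):
        """Return (classpath, kwargs) so the trigger can be re-created
        inside the triggerer process after being persisted."""
        raise NotImplementedError

    async def run(self):
        """Async generator yielding events when the condition is met."""
        raise NotImplementedError
        yield  # unreachable; marks this method as an async generator

class DateTimeTrigger(BaseTrigger):
    """Fires once a given moment in time has passed."""

    def __init__(self, moment):
        self.moment = moment

    def serialize(self):
        # Hypothetical classpath, for illustration only
        return ("example.triggers.DateTimeTrigger",
                {"moment": self.moment.isoformat()})

    async def run(self):
        # Sleep cooperatively; a Trigger must never block the shared loop
        while datetime.now(timezone.utc) < self.moment:
            await asyncio.sleep(0.01)
        yield {"moment": self.moment.isoformat()}

async def main():
    trigger = DateTimeTrigger(datetime.now(timezone.utc) + timedelta(seconds=0.05))
    async for event in trigger.run():
        return event

event = asyncio.run(main())
```

[The key design constraint the sketch tries to capture is the one discussed in this thread: a trigger must be serializable (so it can move to the triggerer and survive restarts) and fully cooperative (so one bad trigger cannot starve the shared event loop).]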
>>>>>>
>>>>>>> a) I do think it should all be fixed as asyncio, mostly because the library support is there and you don't suddenly want to make people run multiple "flavours" of triggerer process based on how many triggers they're using and what runtime loops they demand. If trio were more popular, I'd pick that, as it's safer and (in my opinion) better designed, but we are unfortunately nowhere near that; in addition, this is not something that would be impossible to add at a later stage.
>>>>>>
>>>>>> Asyncio would also be my choice by far, so we do not differ here :)
>>>>>>
>>>>>> From what I understand, you propose a single event loop to run all the deferred tasks? Am I right? My point is that multiple event loops, or thread-based async running, are also part of asyncio. No problem with that: asyncio uses a single event loop by default, but there is no problem using more. This would be quite similar to the "queues" we currently have with celery workers. I am not saying we should, but I can see cases where this might be advisable (for example, to isolate groups of deferred tasks so that they do not interfere with or delay another group). What do you think?
>>>>>>
>>>>>>> b) There are definitely some hooks we can use to detect long-running triggers, and I'll see if I can grab some of them and implement them. At the very least, there's the SIGALRM watchdog method, and I believe it's possible to put nice timeouts around asyncio tasks, which we could use to enforce a max runtime specified in the airflow.cfg file.
>>>>>>
>>>>>> I agree it will indeed be difficult to prevent people from making mistakes, but we should really think about how to help the operations people detect and diagnose them.
>>>>>> And I think it should be part of the specification:
>>>>>>
>>>>>> 1) What kind of metrics we log for the triggers - I think we should gather and publish (using Airflow's metrics system) some useful metrics for the operations people (number of executions, execution length, queuing/delay time vs. expectation for triggers such as a date trigger, etc.) for all triggers from day one.
>>>>>> 2) You mentioned it - maybe we should have debug mode turned on by default: https://docs.python.org/3/library/asyncio-dev.html#debug-mode. It has some useful features, mainly automated logging of async methods that run too long. Not sure what consequences it has, though.
>>>>>> 3) A max execution time would be even nicer indeed.
>>>>>> 4) Maybe there should be some exposure in the UI/CLI/API of what's going on with triggers?
>>>>>> 5) Maybe there should be a way to cancel/disable triggers that are misbehaving - using the UI/CLI/API - until the code gets fixed for them?
>>>>>>
>>>>>> I think we simply need to document those aspects in an "operations" chapter of your proposal. I am happy to propose some draft changes to the AIP, if you would like, after we discuss it here.
>>>>>>
>>>>>>> 2) This is why there's no actual _state_ that is persisted - instead, you pass the method you want to call next and its keyword arguments. Obviously we'll need to be quite clear about this in the docs, but I feel this is better than persisting _some_ state. Again, though, this is an implementation detail that would likely be hidden inside an Operator or Sensor from the average DAG user; I'm not proposing we expose this to things like PythonOperator or TaskFlow yet, for the reasons you describe.
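[The asyncio debug mode linked in the operations wish-list above can be demonstrated in a few lines: with debug enabled, the event loop warns about any callback or task step that runs longer than loop.slow_callback_duration, which is exactly the kind of signal an operator would want for a blocking trigger. The threshold and the blocking sleep below are arbitrary demo values.]

```python
import asyncio
import logging
import time

# Demonstration of asyncio debug mode flagging a blocking "trigger".
captured = []

class CaptureHandler(logging.Handler):
    # Collect asyncio's own log messages so we can inspect them
    def emit(self, record):
        captured.append(record.getMessage())

logging.getLogger("asyncio").addHandler(CaptureHandler())

async def blocking_trigger():
    time.sleep(0.3)  # synchronous sleep: blocks the whole event loop

async def main():
    loop = asyncio.get_running_loop()
    loop.slow_callback_duration = 0.1  # warn if any step takes >100ms
    await blocking_trigger()

# debug=True enables the mode described at
# https://docs.python.org/3/library/asyncio-dev.html#debug-mode
# (equivalently, set PYTHONASYNCIODEBUG=1 in the environment)
asyncio.run(main(), debug=True)

slow_warnings = [msg for msg in captured if "took" in msg]
```

[In a triggerer this could feed the metrics system from point 1 of the list, turning "a trigger blocked the loop" into an alertable signal rather than a silent stall.]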
>>>>>>> I wish I had a better API to present for Operator authors, but given the fundamental fact that the Operator/Task is going to run on different machines for different phases of its life, I think having explicit "give me this when you revive me" arguments is the best tradeoff we can go for.
>>>>>>
>>>>>> Agree. We cannot do much about it. I think we should just be very clear when we document the life cycle. Maybe we should even update the semantics of pre/post so that "pre_execute()" and "post_execute()" are executed for EVERY execute - including the deferred one (and also execute_complete()). This way (in your example) the task execution would look like: pre_execute(), execute() (-> throws TaskDeferred()), post_execute(), and then on another worker pre_execute(), execute_complete(), post_execute(). I think that would make sense. What do you think?
>>>>>>
>>>>>>> 3) I do think we should limit the size of the payload, as well as the kwargs that pass between deferred phases of the task instance - something pretty meaty, like 500KB, would seem reasonable to me. I've also run into the problem in the past that if you design a messaging system without a limit, people _will_ push the size of the things sent up to eyebrow-raisingly large sizes.
>>>>>>
>>>>>> Yeah. I think XCom should be our benchmark. Currently we have MAX_XCOM_SIZE = 49344. Should we use the same?
>>>>>>
>>>>>> And that leads me to another question, actually a very important one. I think (please correct me if I am wrong) it is missing from the current spec. Where are the kwargs going to be stored while the task is deferred? Are they only kept in the memory of the triggerer, or in the DB? And what are the consequences? What happens when tasks are deferred and the triggerer is restarted? How do we recover?
>>>>>> Does it mean that all the deferred tasks will have to be restarted? How? Could you please elaborate a bit on that (and I think it also needs a chapter in the specification)?
>>>>>>
>>>>>>> Overall, I think it's important to stress that the average DAG author should not even know that Triggers exist; instead, they should just be able to switch Sensors or Operators (e.g. DateTimeSensor -> AsyncDateTimeSensor in my prototype) and get the benefits of deferred operators with no extra thought required.
>>>>>>
>>>>>> That will not work, I am afraid :). If we open it up for users to use, we have to deal with the consequences. We have to be prepared for people doing all kinds of weird things - at least this is what I've learned during the last two years of working on Airflow. See the comment above: if we do not want users to implement custom versions of triggers, we should use Enums and a closed set of those. If we create an API, an interface, people will use it and create their own, whether we want them to or not.
>>>>>>
>>>>>> J.
>>>>
>>>> --
>>>> +48 660 796 129

--
+48 660 796 129