Very clear. Thanks! that explains all my questions. Unless others have
more questions, - I am ok to vote on it, as long as there are no more
questions  - the "trigger_run_timeout" is for me a good-enough
"escape_hatch" and as long as we add a bit of logs and metrics (not
needed to be detailed in the AIP) to be able to efficiently monitor
triggers - I am all in:). happy to help with
implementation/review/testing :)

On Thu, Apr 29, 2021 at 7:13 PM Andrew Godwin
<andrew.god...@astronomer.io.invalid> wrote:
>
> You're totally right, I did miss that - sorry!
>
> The kwargs are stored in a new "next_kwargs" column for the TaskInstance - 
> this bit never actually touches the triggerer. There's two separate things 
> going on:
>
> - Within a worker/local executor, a running operator raises TaskDeferred, an 
> exception saying which trigger it wishes to defer on, along with the method 
> name and kwargs to come back to. At that point, the task execute wrapper 
> catches the exception, serializes the requested trigger into the new 
> "triggers" table, and serializes the method name and kwargs to resume on to 
> the TaskInstance table ("next_method" and "next_kwargs"), and then puts the 
> task into a new state "deferred".
>
> - The Triggerer is running entirely separately from all this, and watches the 
> triggers table. When it sees a new trigger requested, it loads it up and 
> starts it running in the async loop. When a trigger fires, the triggerer 
> looks up which task instances were depending on it, and then marks them for 
> re-scheduling, adding the event payload into their next_kwargs column as a 
> kwarg "event". If nothing else depended on that trigger, it then gets cleaned 
> up.
>
> - Once the task is marked as "scheduled" again, the executor comes around and 
> starts it again like normal, only this time it honours the values of 
> next_method and next_kwargs and uses that as the entrypoint to the operator 
> rather than "execute()".
>
> Hopefully that makes it clearer, those points are all a bit separate in the 
> AIP.
>
> Andrew
>
> On Thu, Apr 29, 2021 at 10:12 AM Jarek Potiuk <ja...@potiuk.com> wrote:
>>
>> Hey Andrew,
>>
>> Possibly you missed that one :)
>>
>> And that leads me to another question, actually very important. I
>> think (correct me please if I am wrong)  it is missing in the current
>> specs. Where the kwargs are going to be stored while the task is
>> deferred? Are they only stored in memory of the triggerer? Or in the
>> DB? And what are the consequences? What happens when the tasks are
>> triggered and the triggerer is restarted? How do we recover? Does it
>> mean that all the tasks that are deferred will have to be restarted?
>> How? Could you please elaborate a bit on that (and I think also it
>> needs a chapter in the specification).
>>
>>
>> J.
>>
>> On Thu, Apr 29, 2021 at 6:00 PM Andrew Godwin
>> <andrew.god...@astronomer.io.invalid> wrote:
>> >
>> > Yup, the triggerer HA is called out in the AIP, along with a future space 
>> > left to make it sharded (I'll design this into its API):
>> >
>> > "The triggerer is designed to be run in a highly-available (HA) manner; it 
>> > should coexist with other instances of itself, running the same triggers, 
>> > and deduplicating events when they fire off. In future, it should also be 
>> > designed to run in a sharded/partitioned architecture, where the set of 
>> > triggers is divided across replica sets of triggerers."
>> >
>> > I think a trigger_run_timeout is a very sensible thing to ship and I will 
>> > add it into the AIP, and I will propose we set a default of 2-3 seconds so 
>> > we catch the most egregious cases (ideally it would be sub-500ms if you 
>> > wanted to catch everything).
>> >
>> > Andrew
>> >
>> > On Thu, Apr 29, 2021 at 7:24 AM Kaxil Naik <kaxiln...@gmail.com> wrote:
>> >>
>> >> Similar to trigger_timeout (which is now in the AIP) we could have 
>> >> trigger_run_timeout to take care of that case.
>> >>
>> >> Regards,
>> >> Kaxil
>> >>
>> >> On Thu, Apr 29, 2021 at 1:51 PM Ash Berlin-Taylor <a...@apache.org> wrote:
>> >>>
>> >>> On point one: The triggerer is HA, so you can run multiple, so the 
>> >>> concern about blocking is valid, but it wouldn't block _all_ triggers.
>> >>>
>> >>> Or I thought that was the plan, but the AIP doesn't mention that, but 
>> >>> Andrew said that _somewhere_
>> >>>
>> >>> We should explicitly mention this is the case.
>> >>>
>> >>> -ash
>> >>>
>> >>> On 29 April 2021 09:45:47 BST, Jarek Potiuk <ja...@potiuk.com> wrote:
>> >>>>
>> >>>> Hey anyone from, the BIG users, any comments here ? Any thoughts about
>> >>>> the operation side of this? I am not sure if my worries are too
>> >>>> "excessive", so I would love to hear your thoughts here.. I think it
>> >>>> would be great to unblock Andrew so that he can carry on with the AIP
>> >>>> (which I think is great feature BTW).
>> >>>>
>> >>>> I think this boils down to two questions:
>> >>>>
>> >>>> 1) Would you be worried by having a single thread running all such
>> >>>> triggers for the whole installation where users could potentially
>> >>>> develop their own "blocking" triggers by mistake and block others ?
>> >>>> 2) What kind of monitoring/detection/prevention would you like to see
>> >>>> to get it "under control" :)
>> >>>>
>> >>>> J,
>> >>>>
>> >>>>
>> >>>> On Tue, Apr 27, 2021 at 9:04 PM Andrew Godwin
>> >>>> <andrew.god...@astronomer.io.invalid> wrote:
>> >>>>>
>> >>>>>
>> >>>>>  Oh totally, I was partially wearing my SRE hat when writing up parts 
>> >>>>> of this (hence why HA is built-in from the start), but I'm more used 
>> >>>>> to running web workloads than Airflow, so any input from people who've 
>> >>>>> run large Airflow installs are welcome.
>> >>>>>
>> >>>>>> That will not work I am afraid :). If we open it up for users to use, 
>> >>>>>> we have to deal with consequences. We have to be prepared for people 
>> >>>>>> doing all kinds of weird things - at least this is what I've learned 
>> >>>>>> during the last 2 years of working on Airflow.
>> >>>>>
>> >>>>>
>> >>>>>  It's maybe one of the key lessons I've had from 15 years writing open 
>> >>>>> source - people will always use your hidden APIs no matter what!
>> >>>>>
>> >>>>>  I think in this case, it's a situation where we just have to make it 
>> >>>>> progressively safer as you go up the stack - if you're just using 
>> >>>>> Operators you are fine, if you are authoring Operators there's some 
>> >>>>> new things to think about if you want to go deferrable, and if you're 
>> >>>>> authoring Triggers then you need a bit of async experience. Plus, 
>> >>>>> people who are just writing standard non-deferrable operators have 
>> >>>>> nothing to worry about as this is purely an additive feature, which I 
>> >>>>> like; letting people opt-in to complexity is always nice.
>> >>>>>
>> >>>>>  Hopefully we can get enough safety guards in that all three of these 
>> >>>>> levels will be reasonably accessible without encouraging people to go 
>> >>>>> straight to Triggers; it would likely be an improvement over Smart 
>> >>>>> Sensors, which as far as I can tell lack a lot of them.
>> >>>>>
>> >>>>>  Andrew
>> >>>>>
>> >>>>>  On Tue, Apr 27, 2021 at 12:29 PM Jarek Potiuk <ja...@potiuk.com> 
>> >>>>> wrote:
>> >>>>>>
>> >>>>>>
>> >>>>>>  Hey Andrew,
>> >>>>>>
>> >>>>>>  Just don't get me wrong :). I love the idea/AIP proposal. Just want 
>> >>>>>> to
>> >>>>>>  make sure that from day one we think about operational aspects of it.
>> >>>>>>  I think the easier we make it for the operations people, the less we
>> >>>>>>  will have to deal with their problems in the devlist/Github issues.
>> >>>>>>
>> >>>>>>  I would love to hear what others think about it - maybe some folks
>> >>>>>>  from the devlist who operate Airflow in scale could chime in here - 
>> >>>>>> we
>> >>>>>>  have some people from AirBnB/Twitter etc... Maybe you could state
>> >>>>>>  expectations when it comes to the operational side if you'd have 
>> >>>>>> 1000s
>> >>>>>>  of Triggers that potentially interfere with each other :) ?
>> >>>>>>
>> >>>>>>
>> >>>>>>
>> >>>>>>  On Mon, Apr 26, 2021 at 9:21 PM Andrew Godwin
>> >>>>>>  <andrew.god...@astronomer.io.invalid> wrote:
>> >>>>>>>
>> >>>>>>> 1) In general, I'm envisioning Triggers as a thing that are 
>> >>>>>>> generally abstracted away from DAG authors - instead, they would 
>> >>>>>>> come in a provider package (or core Airflow) and so we would be 
>> >>>>>>> expecting the same higher level of quality and testing.
>> >>>>>>
>> >>>>>>
>> >>>>>>  I see the intention, but we would have to have pretty comprehensive
>> >>>>>>  set of triggers from day one - similar to the different types of 
>> >>>>>> "rich
>> >>>>>>  intervals" we have in
>> >>>>>>  
>> >>>>>> https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-39+Richer+scheduler_interval.
>> >>>>>>  Maybe a list (groups) of the triggers we would like to have as
>> >>>>>>  "built-ins" would be really helpful?
>> >>>>>>  But even then, I think the "extendability" of this solution is what I
>> >>>>>>  like about it. More often than not, users will find out that they 
>> >>>>>> need
>> >>>>>>  something custom. I think if we describe the interface that the
>> >>>>>>  Trigger should implement and make it a "first-class" citizen -
>> >>>>>>  similarly as all other concepts in Airflow, it is expected that users
>> >>>>>>  will override them - Operators, Sensors, even the new "Richer
>> >>>>>>  schedules" are "meant" to be implemented by the users. If they are
>> >>>>>>  not, we should not make it public but rather have (and accept) only a
>> >>>>>>  fixed set of those - and for that we could just  implement an Enum of
>> >>>>>>  available Triggers. By providing an interface, we invite our users to
>> >>>>>>  implement their own custom Triggers, and I think we need to deal with
>> >>>>>>  consequences. I think we have to think about what happens when users
>> >>>>>>  start writing their triggers and what "toolbox" we give the people
>> >>>>>>  operating Airflow to deal with that.
>> >>>>>>
>> >>>>>>> a) I do think it should all be fixed as asyncio, mostly because the 
>> >>>>>>> library support is there and you don't suddenly want to make people 
>> >>>>>>> have to run multiple "flavours" of triggerer process based on how 
>> >>>>>>> many triggers they're using and what runtime loops they demand. If 
>> >>>>>>> trio were more popular, I'd pick that as it's safer and (in my 
>> >>>>>>> opinion) better-designed, but we are unfortunately nowhere near 
>> >>>>>>> that, and in addition this is not something that is impossible to 
>> >>>>>>> add in at a later stage.
>> >>>>>>
>> >>>>>>
>> >>>>>>  Asyncio would also be my choice by far so we do not differ here :)
>> >>>>>>
>> >>>>>>  From what I understand, you propose a single event loop to run all 
>> >>>>>> the
>> >>>>>>  deferred tasks? Am I right?
>> >>>>>>  My point is that multiple event loops or Thread-based async running
>> >>>>>>  are also part of asyncio. No problem with that. Asyncio by default
>> >>>>>>  uses a single event loop, but there is no problem to use more. This
>> >>>>>>  would be quite similar to "queues" we currently have with celery
>> >>>>>>  workers. I am not telling we should, but I can see the case where 
>> >>>>>> this
>> >>>>>>  might be advisable (for example to isolate groups of the deferred
>> >>>>>>  tasks so that they do not interfere/delay the other group). What do
>> >>>>>>  you think?
>> >>>>>>
>> >>>>>>> b) There's definitely some hooks we can use to detect long-running 
>> >>>>>>> triggers, and I'll see if I can grab some of them and implement 
>> >>>>>>> them. At very least, there's the SIGALRM watchdog method, and I 
>> >>>>>>> believe it's possible to put nice timeouts around asyncio tasks, 
>> >>>>>>> which we could use to enforce a max runtime specified in the 
>> >>>>>>> airflow.cfg file.
>> >>>>>>
>> >>>>>>
>> >>>>>>  I agree it will be indeed difficult to prevent people from making
>> >>>>>>  mistakes, but we should really think about how we should help the
>> >>>>>>  operations people to detect and diagnose them. And I think it should
>> >>>>>>  be part of specification:
>> >>>>>>
>> >>>>>>  1) what kind of metrics we are logging for the triggers - I think we
>> >>>>>>  should gather and publish (using airflow's metrics system) some 
>> >>>>>> useful
>> >>>>>>  metrics for the operations people (number of executions/execution
>> >>>>>>  length, queuing/delay time vs. expectation for some trigger like date
>> >>>>>>  trigger) etc. for all triggers from day one.
>> >>>>>>  2) you mentioned it - maybe we should have Debug mode turned on by
>> >>>>>>  default: 
>> >>>>>> https://docs.python.org/3/library/asyncio-dev.html#debug-mode
>> >>>>>>  . It has some useful features, mainly automated logging of too long
>> >>>>>>  running async methods. Not sure what consequences it has though.
>> >>>>>>  3) max execution time would be even nicer indeed
>> >>>>>>  4) maybe there should be some exposure in the UI/CLI/API on what's
>> >>>>>>  going in with triggers?
>> >>>>>>  5) maybe there should be a way to cancel /disable some triggers that
>> >>>>>>  are mis-behaving - using the UI/CLI/API  - until the code gets fixed
>> >>>>>>  for those ?
>> >>>>>>
>> >>>>>>  I think we simply need to document those aspects in "operations"
>> >>>>>>  chapter of your proposal. I am happy to propose some draft changes in
>> >>>>>>  the AIP if you would like to, after we discuss it here.
>> >>>>>>
>> >>>>>>> 2) This is why there's no actual _state_ that is persisted - 
>> >>>>>>> instead, you pass the method you want to call next and its keyword 
>> >>>>>>> arguments. Obviously we'll need to be quite clear about this in the 
>> >>>>>>> docs, but I feel this is better than persisting _some_ state. Again, 
>> >>>>>>> though, this is an implementation detail that would likely be hidden 
>> >>>>>>> inside an Operator or Sensor from the average DAG user; I'm not 
>> >>>>>>> proposing we expose this to things like PythonOperator or TaskFlow 
>> >>>>>>> yet, for the reasons you describe.
>> >>>>>>> I wish I had a better API to present for Operator authors, but with 
>> >>>>>>> the fundamental fact that the Operator/Task is going to run on 
>> >>>>>>> different machines for different phases of its life, I think having 
>> >>>>>>> explicit "give me this when you revive me" arguments is the best 
>> >>>>>>> tradeoff we can go for.
>> >>>>>>
>> >>>>>>
>> >>>>>>  Agree. We cannot do much about it. I think we should just be very
>> >>>>>>  clear when we document the life cycle. Maybe we should even update 
>> >>>>>> the
>> >>>>>>  semantics of pre/post so that  "pre_execute()" and "post_execute()"
>> >>>>>>  are executed for EVERY execute - including the deferred one (also
>> >>>>>>  execute_complete()). This way (in your example) the task execution
>> >>>>>>  would look like : pre_execute(), execute(-> throw TaskDeferred()),
>> >>>>>>  post_execute()   and then on another worker pre_execute(),
>> >>>>>>  execute_complete(), post_execute(). I think that would make sense.
>> >>>>>>  What do you think?
>> >>>>>>
>> >>>>>>> 3) I do think we should limit the size of the payload, as well as 
>> >>>>>>> the kwargs that pass between deferred phases of the task instance - 
>> >>>>>>> something pretty meaty, like 500KB, would seem reasonable to me. 
>> >>>>>>> I've also run into the problem in the past that if you design a 
>> >>>>>>> messaging system without a limit, people _will_ push the size of the 
>> >>>>>>> things sent up to eyebrow-raisingly-large sizes.
>> >>>>>>
>> >>>>>>
>> >>>>>>  Yeah. I think XCom should be our benchmark. Currently we have
>> >>>>>>  MAX_XCOM_SIZE = 49344. Should we use the same?
>> >>>>>>
>> >>>>>>  And that leads me to another question, actually very important. I
>> >>>>>>  think (correct me please if I am wrong)  it is missing in the current
>> >>>>>>  specs. Where the kwargs are going to be stored while the task is
>> >>>>>>  deferred? Are they only stored in memory of the triggerer? Or in the
>> >>>>>>  DB? And what are the consequences? What happens when the tasks are
>> >>>>>>  triggered and the triggerer is restarted? How do we recover? Does it
>> >>>>>>  mean that all the tasks that are deferred will have to be restarted?
>> >>>>>>  How? Could you please elaborate a bit on that (and I think also it
>> >>>>>>  needs a chapter in the specification).
>> >>>>>>
>> >>>>>>> Overall, I think it's important to stress that the average DAG 
>> >>>>>>> author should not even know that Triggers really exist; instead, 
>> >>>>>>> they should just be able to switch Sensors or Operators (e.g. 
>> >>>>>>> DateTimeSensor -> AsyncDateTimeSensor in my prototype) and get the 
>> >>>>>>> benefits of deferred operators with no extra thought required.
>> >>>>>>
>> >>>>>>
>> >>>>>>  That will not work I am afraid :). If we open it up for users to use,
>> >>>>>>  we have to deal with consequences. We have to be prepared for people
>> >>>>>>  doing all kinds of weird things - at least this is what I've learned
>> >>>>>>  during the last 2 years of working on Airflow. See the comment about.
>> >>>>>>  If we do not want users to implement custom versions of triggers we
>> >>>>>>  should use Enums and a closed set of those. If we create an API an
>> >>>>>>  interface - people will use it and create their own, no matter if we
>> >>>>>>  want or not.
>> >>>>>>
>> >>>>>>
>> >>>>>>  J.
>> >>>>
>> >>>>
>> >>>>
>> >>>>
>> >>>> --
>> >>>> +48 660 796 129
>>
>>
>>
>> --
>> +48 660 796 129



-- 
+48 660 796 129

Reply via email to