You're totally right, I did miss that - sorry!

The kwargs are stored in a new "next_kwargs" column for the TaskInstance -
this bit never actually touches the triggerer. There's two separate things
going on:

- Within a worker/local executor, a running operator raises TaskDeferred,
an exception saying which trigger it wishes to defer on, along with the
method name and kwargs to come back to. At that point, the task execute
wrapper catches the exception, serializes the requested trigger into the
new "triggers" table, and serializes the method name and kwargs to resume
on to the TaskInstance table ("next_method" and "next_kwargs"), and then
puts the task into a new state "deferred".

- The Triggerer is running entirely separately from all this, and watches
the triggers table. When it sees a new trigger requested, it loads it up
and starts it running in the async loop. When a trigger fires, the
triggerer looks up which task instances were depending on it, and then
marks them for re-scheduling, adding the event payload into their
next_kwargs column as a kwarg "event". If nothing else depended on that
trigger, it then gets cleaned up.

- Once the task is marked as "scheduled" again, the executor comes around
and starts it again like normal, only this time it honours the values of
next_method and next_kwargs and uses that as the entrypoint to the operator
rather than "execute()".

Hopefully that makes it clearer, those points are all a bit separate in the
AIP.

Andrew

On Thu, Apr 29, 2021 at 10:12 AM Jarek Potiuk <ja...@potiuk.com> wrote:

> Hey Andrew,
>
> Possibly you missed that one :)
>
> And that leads me to another question, actually very important. I
> think (correct me please if I am wrong)  it is missing in the current
> specs. Where the kwargs are going to be stored while the task is
> deferred? Are they only stored in memory of the triggerer? Or in the
> DB? And what are the consequences? What happens when the tasks are
> triggered and the triggerer is restarted? How do we recover? Does it
> mean that all the tasks that are deferred will have to be restarted?
> How? Could you please elaborate a bit on that (and I think also it
> needs a chapter in the specification).
>
>
> J.
>
> On Thu, Apr 29, 2021 at 6:00 PM Andrew Godwin
> <andrew.god...@astronomer.io.invalid> wrote:
> >
> > Yup, the triggerer HA is called out in the AIP, along with a future
> space left to make it sharded (I'll design this into its API):
> >
> > "The triggerer is designed to be run in a highly-available (HA) manner;
> it should coexist with other instances of itself, running the same
> triggers, and deduplicating events when they fire off. In future, it should
> also be designed to run in a sharded/partitioned architecture, where the
> set of triggers is divided across replica sets of triggerers."
> >
> > I think a trigger_run_timeout is a very sensible thing to ship and I
> will add it into the AIP, and I will propose we set a default of 2-3
> seconds so we catch the most egregious cases (ideally it would be sub-500ms
> if you wanted to catch everything).
> >
> > Andrew
> >
> > On Thu, Apr 29, 2021 at 7:24 AM Kaxil Naik <kaxiln...@gmail.com> wrote:
> >>
> >> Similar to trigger_timeout (which is now in the AIP) we could have
> trigger_run_timeout to take care of that case.
> >>
> >> Regards,
> >> Kaxil
> >>
> >> On Thu, Apr 29, 2021 at 1:51 PM Ash Berlin-Taylor <a...@apache.org>
> wrote:
> >>>
> >>> On point one: The triggerer is HA, so you can run multiple, so the
> concern about blocking is valid, but it wouldn't block _all_ triggers.
> >>>
> >>> Or I thought that was the plan, but the AIP doesn't mention that, but
> Andrew said that _somewhere_
> >>>
> >>> We should explicitly mention this is the case.
> >>>
> >>> -ash
> >>>
> >>> On 29 April 2021 09:45:47 BST, Jarek Potiuk <ja...@potiuk.com> wrote:
> >>>>
> >>>> Hey anyone from, the BIG users, any comments here ? Any thoughts about
> >>>> the operation side of this? I am not sure if my worries are too
> >>>> "excessive", so I would love to hear your thoughts here.. I think it
> >>>> would be great to unblock Andrew so that he can carry on with the AIP
> >>>> (which I think is great feature BTW).
> >>>>
> >>>> I think this boils down to two questions:
> >>>>
> >>>> 1) Would you be worried by having a single thread running all such
> >>>> triggers for the whole installation where users could potentially
> >>>> develop their own "blocking" triggers by mistake and block others ?
> >>>> 2) What kind of monitoring/detection/prevention would you like to see
> >>>> to get it "under control" :)
> >>>>
> >>>> J,
> >>>>
> >>>>
> >>>> On Tue, Apr 27, 2021 at 9:04 PM Andrew Godwin
> >>>> <andrew.god...@astronomer.io.invalid> wrote:
> >>>>>
> >>>>>
> >>>>>  Oh totally, I was partially wearing my SRE hat when writing up
> parts of this (hence why HA is built-in from the start), but I'm more used
> to running web workloads than Airflow, so any input from people who've run
> large Airflow installs are welcome.
> >>>>>
> >>>>>> That will not work I am afraid :). If we open it up for users to
> use, we have to deal with consequences. We have to be prepared for people
> doing all kinds of weird things - at least this is what I've learned during
> the last 2 years of working on Airflow.
> >>>>>
> >>>>>
> >>>>>  It's maybe one of the key lessons I've had from 15 years writing
> open source - people will always use your hidden APIs no matter what!
> >>>>>
> >>>>>  I think in this case, it's a situation where we just have to make
> it progressively safer as you go up the stack - if you're just using
> Operators you are fine, if you are authoring Operators there's some new
> things to think about if you want to go deferrable, and if you're authoring
> Triggers then you need a bit of async experience. Plus, people who are just
> writing standard non-deferrable operators have nothing to worry about as
> this is purely an additive feature, which I like; letting people opt-in to
> complexity is always nice.
> >>>>>
> >>>>>  Hopefully we can get enough safety guards in that all three of
> these levels will be reasonably accessible without encouraging people to go
> straight to Triggers; it would likely be an improvement over Smart Sensors,
> which as far as I can tell lack a lot of them.
> >>>>>
> >>>>>  Andrew
> >>>>>
> >>>>>  On Tue, Apr 27, 2021 at 12:29 PM Jarek Potiuk <ja...@potiuk.com>
> wrote:
> >>>>>>
> >>>>>>
> >>>>>>  Hey Andrew,
> >>>>>>
> >>>>>>  Just don't get me wrong :). I love the idea/AIP proposal. Just
> want to
> >>>>>>  make sure that from day one we think about operational aspects of
> it.
> >>>>>>  I think the easier we make it for the operations people, the less
> we
> >>>>>>  will have to deal with their problems in the devlist/Github issues.
> >>>>>>
> >>>>>>  I would love to hear what others think about it - maybe some folks
> >>>>>>  from the devlist who operate Airflow in scale could chime in here
> - we
> >>>>>>  have some people from AirBnB/Twitter etc... Maybe you could state
> >>>>>>  expectations when it comes to the operational side if you'd have
> 1000s
> >>>>>>  of Triggers that potentially interfere with each other :) ?
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>>  On Mon, Apr 26, 2021 at 9:21 PM Andrew Godwin
> >>>>>>  <andrew.god...@astronomer.io.invalid> wrote:
> >>>>>>>
> >>>>>>> 1) In general, I'm envisioning Triggers as a thing that are
> generally abstracted away from DAG authors - instead, they would come in a
> provider package (or core Airflow) and so we would be expecting the same
> higher level of quality and testing.
> >>>>>>
> >>>>>>
> >>>>>>  I see the intention, but we would have to have pretty comprehensive
> >>>>>>  set of triggers from day one - similar to the different types of
> "rich
> >>>>>>  intervals" we have in
> >>>>>>
> https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-39+Richer+scheduler_interval
> .
> >>>>>>  Maybe a list (groups) of the triggers we would like to have as
> >>>>>>  "built-ins" would be really helpful?
> >>>>>>  But even then, I think the "extendability" of this solution is
> what I
> >>>>>>  like about it. More often than not, users will find out that they
> need
> >>>>>>  something custom. I think if we describe the interface that the
> >>>>>>  Trigger should implement and make it a "first-class" citizen -
> >>>>>>  similarly as all other concepts in Airflow, it is expected that
> users
> >>>>>>  will override them - Operators, Sensors, even the new "Richer
> >>>>>>  schedules" are "meant" to be implemented by the users. If they are
> >>>>>>  not, we should not make it public but rather have (and accept)
> only a
> >>>>>>  fixed set of those - and for that we could just  implement an Enum
> of
> >>>>>>  available Triggers. By providing an interface, we invite our users
> to
> >>>>>>  implement their own custom Triggers, and I think we need to deal
> with
> >>>>>>  consequences. I think we have to think about what happens when
> users
> >>>>>>  start writing their triggers and what "toolbox" we give the people
> >>>>>>  operating Airflow to deal with that.
> >>>>>>
> >>>>>>> a) I do think it should all be fixed as asyncio, mostly because
> the library support is there and you don't suddenly want to make people
> have to run multiple "flavours" of triggerer process based on how many
> triggers they're using and what runtime loops they demand. If trio were
> more popular, I'd pick that as it's safer and (in my opinion)
> better-designed, but we are unfortunately nowhere near that, and in
> addition this is not something that is impossible to add in at a later
> stage.
> >>>>>>
> >>>>>>
> >>>>>>  Asyncio would also be my choice by far so we do not differ here :)
> >>>>>>
> >>>>>>  From what I understand, you propose a single event loop to run all
> the
> >>>>>>  deferred tasks? Am I right?
> >>>>>>  My point is that multiple event loops or Thread-based async running
> >>>>>>  are also part of asyncio. No problem with that. Asyncio by default
> >>>>>>  uses a single event loop, but there is no problem to use more. This
> >>>>>>  would be quite similar to "queues" we currently have with celery
> >>>>>>  workers. I am not telling we should, but I can see the case where
> this
> >>>>>>  might be advisable (for example to isolate groups of the deferred
> >>>>>>  tasks so that they do not interfere/delay the other group). What do
> >>>>>>  you think?
> >>>>>>
> >>>>>>> b) There's definitely some hooks we can use to detect long-running
> triggers, and I'll see if I can grab some of them and implement them. At
> very least, there's the SIGALRM watchdog method, and I believe it's
> possible to put nice timeouts around asyncio tasks, which we could use to
> enforce a max runtime specified in the airflow.cfg file.
> >>>>>>
> >>>>>>
> >>>>>>  I agree it will be indeed difficult to prevent people from making
> >>>>>>  mistakes, but we should really think about how we should help the
> >>>>>>  operations people to detect and diagnose them. And I think it
> should
> >>>>>>  be part of specification:
> >>>>>>
> >>>>>>  1) what kind of metrics we are logging for the triggers - I think
> we
> >>>>>>  should gather and publish (using airflow's metrics system) some
> useful
> >>>>>>  metrics for the operations people (number of executions/execution
> >>>>>>  length, queuing/delay time vs. expectation for some trigger like
> date
> >>>>>>  trigger) etc. for all triggers from day one.
> >>>>>>  2) you mentioned it - maybe we should have Debug mode turned on by
> >>>>>>  default:
> https://docs.python.org/3/library/asyncio-dev.html#debug-mode
> >>>>>>  . It has some useful features, mainly automated logging of too long
> >>>>>>  running async methods. Not sure what consequences it has though.
> >>>>>>  3) max execution time would be even nicer indeed
> >>>>>>  4) maybe there should be some exposure in the UI/CLI/API on what's
> >>>>>>  going in with triggers?
> >>>>>>  5) maybe there should be a way to cancel /disable some triggers
> that
> >>>>>>  are mis-behaving - using the UI/CLI/API  - until the code gets
> fixed
> >>>>>>  for those ?
> >>>>>>
> >>>>>>  I think we simply need to document those aspects in "operations"
> >>>>>>  chapter of your proposal. I am happy to propose some draft changes
> in
> >>>>>>  the AIP if you would like to, after we discuss it here.
> >>>>>>
> >>>>>>> 2) This is why there's no actual _state_ that is persisted -
> instead, you pass the method you want to call next and its keyword
> arguments. Obviously we'll need to be quite clear about this in the docs,
> but I feel this is better than persisting _some_ state. Again, though, this
> is an implementation detail that would likely be hidden inside an Operator
> or Sensor from the average DAG user; I'm not proposing we expose this to
> things like PythonOperator or TaskFlow yet, for the reasons you describe.
> >>>>>>> I wish I had a better API to present for Operator authors, but
> with the fundamental fact that the Operator/Task is going to run on
> different machines for different phases of its life, I think having
> explicit "give me this when you revive me" arguments is the best tradeoff
> we can go for.
> >>>>>>
> >>>>>>
> >>>>>>  Agree. We cannot do much about it. I think we should just be very
> >>>>>>  clear when we document the life cycle. Maybe we should even update
> the
> >>>>>>  semantics of pre/post so that  "pre_execute()" and "post_execute()"
> >>>>>>  are executed for EVERY execute - including the deferred one (also
> >>>>>>  execute_complete()). This way (in your example) the task execution
> >>>>>>  would look like : pre_execute(), execute(-> throw TaskDeferred()),
> >>>>>>  post_execute()   and then on another worker pre_execute(),
> >>>>>>  execute_complete(), post_execute(). I think that would make sense.
> >>>>>>  What do you think?
> >>>>>>
> >>>>>>> 3) I do think we should limit the size of the payload, as well as
> the kwargs that pass between deferred phases of the task instance -
> something pretty meaty, like 500KB, would seem reasonable to me. I've also
> run into the problem in the past that if you design a messaging system
> without a limit, people _will_ push the size of the things sent up to
> eyebrow-raisingly-large sizes.
> >>>>>>
> >>>>>>
> >>>>>>  Yeah. I think XCom should be our benchmark. Currently we have
> >>>>>>  MAX_XCOM_SIZE = 49344. Should we use the same?
> >>>>>>
> >>>>>>  And that leads me to another question, actually very important. I
> >>>>>>  think (correct me please if I am wrong)  it is missing in the
> current
> >>>>>>  specs. Where the kwargs are going to be stored while the task is
> >>>>>>  deferred? Are they only stored in memory of the triggerer? Or in
> the
> >>>>>>  DB? And what are the consequences? What happens when the tasks are
> >>>>>>  triggered and the triggerer is restarted? How do we recover? Does
> it
> >>>>>>  mean that all the tasks that are deferred will have to be
> restarted?
> >>>>>>  How? Could you please elaborate a bit on that (and I think also it
> >>>>>>  needs a chapter in the specification).
> >>>>>>
> >>>>>>> Overall, I think it's important to stress that the average DAG
> author should not even know that Triggers really exist; instead, they
> should just be able to switch Sensors or Operators (e.g. DateTimeSensor ->
> AsyncDateTimeSensor in my prototype) and get the benefits of deferred
> operators with no extra thought required.
> >>>>>>
> >>>>>>
> >>>>>>  That will not work I am afraid :). If we open it up for users to
> use,
> >>>>>>  we have to deal with consequences. We have to be prepared for
> people
> >>>>>>  doing all kinds of weird things - at least this is what I've
> learned
> >>>>>>  during the last 2 years of working on Airflow. See the comment
> about.
> >>>>>>  If we do not want users to implement custom versions of triggers we
> >>>>>>  should use Enums and a closed set of those. If we create an API an
> >>>>>>  interface - people will use it and create their own, no matter if
> we
> >>>>>>  want or not.
> >>>>>>
> >>>>>>
> >>>>>>  J.
> >>>>
> >>>>
> >>>>
> >>>>
> >>>> --
> >>>> +48 660 796 129
>
>
>
> --
> +48 660 796 129
>

Reply via email to