Thanks Andrew for the detailed responses and Jarek for the great questions.
I really like the AIP and love the direction.

Vikram


On Thu, Apr 29, 2021 at 10:27 AM Andrew Godwin
<andrew.god...@astronomer.io.invalid> wrote:

> Yup, logging and metrics are something I didn't put in the prototype but I
> consider an essential part of feature "polishing" (along with docs and
> tests, naturally).
>
> On the trigger timeout front, I did some investigation today, and it seems
> that unfortunately we cannot get at the actual place we'd need to detect
> timeouts per-trigger, as it's buried in asyncio with no override:
> https://github.com/python/cpython/blob/e4fe303b8cca525e97d44e80c7e53bdab9dd9187/Lib/asyncio/base_events.py#L1876
>
> So, instead, I can construct an asyncio task that runs at all times in the
> triggerer at 0.2 second intervals and notices if it gets blocked for too
> long, and probably then tie that into a SIGALRM based watchdog. It won't be
> able to tell exactly which trigger was blocking, but it will at least
> detect that something was, which I think is better than nothing (we can
> then instruct users to set PYTHONASYNCIODEBUG=1 to see what's actually
> doing it)
>
> Andrew
>
> On Thu, Apr 29, 2021 at 11:23 AM Jarek Potiuk <ja...@potiuk.com> wrote:
>
>> Very clear. Thanks! that explains all my questions. Unless others have
>> more questions, - I am ok to vote on it, as long as there are no more
>> questions  - the "trigger_run_timeout" is for me a good-enough
>> "escape_hatch" and as long as we add a bit of logs and metrics (not
>> needed to be detailed in the AIP) to be able to efficiently monitor
>> triggers - I am all in:). happy to help with
>> implementation/review/testing :)
>>
>> On Thu, Apr 29, 2021 at 7:13 PM Andrew Godwin
>> <andrew.god...@astronomer.io.invalid> wrote:
>> >
>> > You're totally right, I did miss that - sorry!
>> >
>> > The kwargs are stored in a new "next_kwargs" column for the
>> TaskInstance - this bit never actually touches the triggerer. There's two
>> separate things going on:
>> >
>> > - Within a worker/local executor, a running operator raises
>> TaskDeferred, an exception saying which trigger it wishes to defer on,
>> along with the method name and kwargs to come back to. At that point, the
>> task execute wrapper catches the exception, serializes the requested
>> trigger into the new "triggers" table, and serializes the method name and
>> kwargs to resume on to the TaskInstance table ("next_method" and
>> "next_kwargs"), and then puts the task into a new state "deferred".
>> >
>> > - The Triggerer is running entirely separately from all this, and
>> watches the triggers table. When it sees a new trigger requested, it loads
>> it up and starts it running in the async loop. When a trigger fires, the
>> triggerer looks up which task instances were depending on it, and then
>> marks them for re-scheduling, adding the event payload into their
>> next_kwargs column as a kwarg "event". If nothing else depended on that
>> trigger, it then gets cleaned up.
>> >
>> > - Once the task is marked as "scheduled" again, the executor comes
>> around and starts it again like normal, only this time it honours the
>> values of next_method and next_kwargs and uses that as the entrypoint to
>> the operator rather than "execute()".
>> >
>> > Hopefully that makes it clearer, those points are all a bit separate in
>> the AIP.
>> >
>> > Andrew
>> >
>> > On Thu, Apr 29, 2021 at 10:12 AM Jarek Potiuk <ja...@potiuk.com> wrote:
>> >>
>> >> Hey Andrew,
>> >>
>> >> Possibly you missed that one :)
>> >>
>> >> And that leads me to another question, actually very important. I
>> >> think (correct me please if I am wrong)  it is missing in the current
>> >> specs. Where the kwargs are going to be stored while the task is
>> >> deferred? Are they only stored in memory of the triggerer? Or in the
>> >> DB? And what are the consequences? What happens when the tasks are
>> >> triggered and the triggerer is restarted? How do we recover? Does it
>> >> mean that all the tasks that are deferred will have to be restarted?
>> >> How? Could you please elaborate a bit on that (and I think also it
>> >> needs a chapter in the specification).
>> >>
>> >>
>> >> J.
>> >>
>> >> On Thu, Apr 29, 2021 at 6:00 PM Andrew Godwin
>> >> <andrew.god...@astronomer.io.invalid> wrote:
>> >> >
>> >> > Yup, the triggerer HA is called out in the AIP, along with a future
>> space left to make it sharded (I'll design this into its API):
>> >> >
>> >> > "The triggerer is designed to be run in a highly-available (HA)
>> manner; it should coexist with other instances of itself, running the same
>> triggers, and deduplicating events when they fire off. In future, it should
>> also be designed to run in a sharded/partitioned architecture, where the
>> set of triggers is divided across replica sets of triggerers."
>> >> >
>> >> > I think a trigger_run_timeout is a very sensible thing to ship and I
>> will add it into the AIP, and I will propose we set a default of 2-3
>> seconds so we catch the most egregious cases (ideally it would be sub-500ms
>> if you wanted to catch everything).
>> >> >
>> >> > Andrew
>> >> >
>> >> > On Thu, Apr 29, 2021 at 7:24 AM Kaxil Naik <kaxiln...@gmail.com>
>> wrote:
>> >> >>
>> >> >> Similar to trigger_timeout (which is now in the AIP) we could have
>> trigger_run_timeout to take care of that case.
>> >> >>
>> >> >> Regards,
>> >> >> Kaxil
>> >> >>
>> >> >> On Thu, Apr 29, 2021 at 1:51 PM Ash Berlin-Taylor <a...@apache.org>
>> wrote:
>> >> >>>
>> >> >>> On point one: The triggerer is HA, so you can run multiple, so the
>> concern about blocking is valid, but it wouldn't block _all_ triggers.
>> >> >>>
>> >> >>> Or I thought that was the plan, but the AIP doesn't mention that,
>> but Andrew said that _somewhere_
>> >> >>>
>> >> >>> We should explicitly mention this is the case.
>> >> >>>
>> >> >>> -ash
>> >> >>>
>> >> >>> On 29 April 2021 09:45:47 BST, Jarek Potiuk <ja...@potiuk.com>
>> wrote:
>> >> >>>>
>> >> >>>> Hey anyone from, the BIG users, any comments here ? Any thoughts
>> about
>> >> >>>> the operation side of this? I am not sure if my worries are too
>> >> >>>> "excessive", so I would love to hear your thoughts here.. I think
>> it
>> >> >>>> would be great to unblock Andrew so that he can carry on with the
>> AIP
>> >> >>>> (which I think is great feature BTW).
>> >> >>>>
>> >> >>>> I think this boils down to two questions:
>> >> >>>>
>> >> >>>> 1) Would you be worried by having a single thread running all such
>> >> >>>> triggers for the whole installation where users could potentially
>> >> >>>> develop their own "blocking" triggers by mistake and block others
>> ?
>> >> >>>> 2) What kind of monitoring/detection/prevention would you like to
>> see
>> >> >>>> to get it "under control" :)
>> >> >>>>
>> >> >>>> J,
>> >> >>>>
>> >> >>>>
>> >> >>>> On Tue, Apr 27, 2021 at 9:04 PM Andrew Godwin
>> >> >>>> <andrew.god...@astronomer.io.invalid> wrote:
>> >> >>>>>
>> >> >>>>>
>> >> >>>>>  Oh totally, I was partially wearing my SRE hat when writing up
>> parts of this (hence why HA is built-in from the start), but I'm more used
>> to running web workloads than Airflow, so any input from people who've run
>> large Airflow installs are welcome.
>> >> >>>>>
>> >> >>>>>> That will not work I am afraid :). If we open it up for users
>> to use, we have to deal with consequences. We have to be prepared for
>> people doing all kinds of weird things - at least this is what I've learned
>> during the last 2 years of working on Airflow.
>> >> >>>>>
>> >> >>>>>
>> >> >>>>>  It's maybe one of the key lessons I've had from 15 years
>> writing open source - people will always use your hidden APIs no matter
>> what!
>> >> >>>>>
>> >> >>>>>  I think in this case, it's a situation where we just have to
>> make it progressively safer as you go up the stack - if you're just using
>> Operators you are fine, if you are authoring Operators there's some new
>> things to think about if you want to go deferrable, and if you're authoring
>> Triggers then you need a bit of async experience. Plus, people who are just
>> writing standard non-deferrable operators have nothing to worry about as
>> this is purely an additive feature, which I like; letting people opt-in to
>> complexity is always nice.
>> >> >>>>>
>> >> >>>>>  Hopefully we can get enough safety guards in that all three of
>> these levels will be reasonably accessible without encouraging people to go
>> straight to Triggers; it would likely be an improvement over Smart Sensors,
>> which as far as I can tell lack a lot of them.
>> >> >>>>>
>> >> >>>>>  Andrew
>> >> >>>>>
>> >> >>>>>  On Tue, Apr 27, 2021 at 12:29 PM Jarek Potiuk <ja...@potiuk.com>
>> wrote:
>> >> >>>>>>
>> >> >>>>>>
>> >> >>>>>>  Hey Andrew,
>> >> >>>>>>
>> >> >>>>>>  Just don't get me wrong :). I love the idea/AIP proposal. Just
>> want to
>> >> >>>>>>  make sure that from day one we think about operational aspects
>> of it.
>> >> >>>>>>  I think the easier we make it for the operations people, the
>> less we
>> >> >>>>>>  will have to deal with their problems in the devlist/Github
>> issues.
>> >> >>>>>>
>> >> >>>>>>  I would love to hear what others think about it - maybe some
>> folks
>> >> >>>>>>  from the devlist who operate Airflow in scale could chime in
>> here - we
>> >> >>>>>>  have some people from AirBnB/Twitter etc... Maybe you could
>> state
>> >> >>>>>>  expectations when it comes to the operational side if you'd
>> have 1000s
>> >> >>>>>>  of Triggers that potentially interfere with each other :) ?
>> >> >>>>>>
>> >> >>>>>>
>> >> >>>>>>
>> >> >>>>>>  On Mon, Apr 26, 2021 at 9:21 PM Andrew Godwin
>> >> >>>>>>  <andrew.god...@astronomer.io.invalid> wrote:
>> >> >>>>>>>
>> >> >>>>>>> 1) In general, I'm envisioning Triggers as a thing that are
>> generally abstracted away from DAG authors - instead, they would come in a
>> provider package (or core Airflow) and so we would be expecting the same
>> higher level of quality and testing.
>> >> >>>>>>
>> >> >>>>>>
>> >> >>>>>>  I see the intention, but we would have to have pretty
>> comprehensive
>> >> >>>>>>  set of triggers from day one - similar to the different types
>> of "rich
>> >> >>>>>>  intervals" we have in
>> >> >>>>>>
>> https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-39+Richer+scheduler_interval
>> .
>> >> >>>>>>  Maybe a list (groups) of the triggers we would like to have as
>> >> >>>>>>  "built-ins" would be really helpful?
>> >> >>>>>>  But even then, I think the "extendability" of this solution is
>> what I
>> >> >>>>>>  like about it. More often than not, users will find out that
>> they need
>> >> >>>>>>  something custom. I think if we describe the interface that the
>> >> >>>>>>  Trigger should implement and make it a "first-class" citizen -
>> >> >>>>>>  similarly as all other concepts in Airflow, it is expected
>> that users
>> >> >>>>>>  will override them - Operators, Sensors, even the new "Richer
>> >> >>>>>>  schedules" are "meant" to be implemented by the users. If they
>> are
>> >> >>>>>>  not, we should not make it public but rather have (and accept)
>> only a
>> >> >>>>>>  fixed set of those - and for that we could just  implement an
>> Enum of
>> >> >>>>>>  available Triggers. By providing an interface, we invite our
>> users to
>> >> >>>>>>  implement their own custom Triggers, and I think we need to
>> deal with
>> >> >>>>>>  consequences. I think we have to think about what happens when
>> users
>> >> >>>>>>  start writing their triggers and what "toolbox" we give the
>> people
>> >> >>>>>>  operating Airflow to deal with that.
>> >> >>>>>>
>> >> >>>>>>> a) I do think it should all be fixed as asyncio, mostly
>> because the library support is there and you don't suddenly want to make
>> people have to run multiple "flavours" of triggerer process based on how
>> many triggers they're using and what runtime loops they demand. If trio
>> were more popular, I'd pick that as it's safer and (in my opinion)
>> better-designed, but we are unfortunately nowhere near that, and in
>> addition this is not something that is impossible to add in at a later
>> stage.
>> >> >>>>>>
>> >> >>>>>>
>> >> >>>>>>  Asyncio would also be my choice by far so we do not differ
>> here :)
>> >> >>>>>>
>> >> >>>>>>  From what I understand, you propose a single event loop to run
>> all the
>> >> >>>>>>  deferred tasks? Am I right?
>> >> >>>>>>  My point is that multiple event loops or Thread-based async
>> running
>> >> >>>>>>  are also part of asyncio. No problem with that. Asyncio by
>> default
>> >> >>>>>>  uses a single event loop, but there is no problem to use more.
>> This
>> >> >>>>>>  would be quite similar to "queues" we currently have with
>> celery
>> >> >>>>>>  workers. I am not telling we should, but I can see the case
>> where this
>> >> >>>>>>  might be advisable (for example to isolate groups of the
>> deferred
>> >> >>>>>>  tasks so that they do not interfere/delay the other group).
>> What do
>> >> >>>>>>  you think?
>> >> >>>>>>
>> >> >>>>>>> b) There's definitely some hooks we can use to detect
>> long-running triggers, and I'll see if I can grab some of them and
>> implement them. At very least, there's the SIGALRM watchdog method, and I
>> believe it's possible to put nice timeouts around asyncio tasks, which we
>> could use to enforce a max runtime specified in the airflow.cfg file.
>> >> >>>>>>
>> >> >>>>>>
>> >> >>>>>>  I agree it will be indeed difficult to prevent people from
>> making
>> >> >>>>>>  mistakes, but we should really think about how we should help
>> the
>> >> >>>>>>  operations people to detect and diagnose them. And I think it
>> should
>> >> >>>>>>  be part of specification:
>> >> >>>>>>
>> >> >>>>>>  1) what kind of metrics we are logging for the triggers - I
>> think we
>> >> >>>>>>  should gather and publish (using airflow's metrics system)
>> some useful
>> >> >>>>>>  metrics for the operations people (number of
>> executions/execution
>> >> >>>>>>  length, queuing/delay time vs. expectation for some trigger
>> like date
>> >> >>>>>>  trigger) etc. for all triggers from day one.
>> >> >>>>>>  2) you mentioned it - maybe we should have Debug mode turned
>> on by
>> >> >>>>>>  default:
>> https://docs.python.org/3/library/asyncio-dev.html#debug-mode
>> >> >>>>>>  . It has some useful features, mainly automated logging of too
>> long
>> >> >>>>>>  running async methods. Not sure what consequences it has
>> though.
>> >> >>>>>>  3) max execution time would be even nicer indeed
>> >> >>>>>>  4) maybe there should be some exposure in the UI/CLI/API on
>> what's
>> >> >>>>>>  going in with triggers?
>> >> >>>>>>  5) maybe there should be a way to cancel /disable some
>> triggers that
>> >> >>>>>>  are mis-behaving - using the UI/CLI/API  - until the code gets
>> fixed
>> >> >>>>>>  for those ?
>> >> >>>>>>
>> >> >>>>>>  I think we simply need to document those aspects in
>> "operations"
>> >> >>>>>>  chapter of your proposal. I am happy to propose some draft
>> changes in
>> >> >>>>>>  the AIP if you would like to, after we discuss it here.
>> >> >>>>>>
>> >> >>>>>>> 2) This is why there's no actual _state_ that is persisted -
>> instead, you pass the method you want to call next and its keyword
>> arguments. Obviously we'll need to be quite clear about this in the docs,
>> but I feel this is better than persisting _some_ state. Again, though, this
>> is an implementation detail that would likely be hidden inside an Operator
>> or Sensor from the average DAG user; I'm not proposing we expose this to
>> things like PythonOperator or TaskFlow yet, for the reasons you describe.
>> >> >>>>>>> I wish I had a better API to present for Operator authors, but
>> with the fundamental fact that the Operator/Task is going to run on
>> different machines for different phases of its life, I think having
>> explicit "give me this when you revive me" arguments is the best tradeoff
>> we can go for.
>> >> >>>>>>
>> >> >>>>>>
>> >> >>>>>>  Agree. We cannot do much about it. I think we should just be
>> very
>> >> >>>>>>  clear when we document the life cycle. Maybe we should even
>> update the
>> >> >>>>>>  semantics of pre/post so that  "pre_execute()" and
>> "post_execute()"
>> >> >>>>>>  are executed for EVERY execute - including the deferred one
>> (also
>> >> >>>>>>  execute_complete()). This way (in your example) the task
>> execution
>> >> >>>>>>  would look like : pre_execute(), execute(-> throw
>> TaskDeferred()),
>> >> >>>>>>  post_execute()   and then on another worker pre_execute(),
>> >> >>>>>>  execute_complete(), post_execute(). I think that would make
>> sense.
>> >> >>>>>>  What do you think?
>> >> >>>>>>
>> >> >>>>>>> 3) I do think we should limit the size of the payload, as well
>> as the kwargs that pass between deferred phases of the task instance -
>> something pretty meaty, like 500KB, would seem reasonable to me. I've also
>> run into the problem in the past that if you design a messaging system
>> without a limit, people _will_ push the size of the things sent up to
>> eyebrow-raisingly-large sizes.
>> >> >>>>>>
>> >> >>>>>>
>> >> >>>>>>  Yeah. I think XCom should be our benchmark. Currently we have
>> >> >>>>>>  MAX_XCOM_SIZE = 49344. Should we use the same?
>> >> >>>>>>
>> >> >>>>>>  And that leads me to another question, actually very
>> important. I
>> >> >>>>>>  think (correct me please if I am wrong)  it is missing in the
>> current
>> >> >>>>>>  specs. Where the kwargs are going to be stored while the task
>> is
>> >> >>>>>>  deferred? Are they only stored in memory of the triggerer? Or
>> in the
>> >> >>>>>>  DB? And what are the consequences? What happens when the tasks
>> are
>> >> >>>>>>  triggered and the triggerer is restarted? How do we recover?
>> Does it
>> >> >>>>>>  mean that all the tasks that are deferred will have to be
>> restarted?
>> >> >>>>>>  How? Could you please elaborate a bit on that (and I think
>> also it
>> >> >>>>>>  needs a chapter in the specification).
>> >> >>>>>>
>> >> >>>>>>> Overall, I think it's important to stress that the average DAG
>> author should not even know that Triggers really exist; instead, they
>> should just be able to switch Sensors or Operators (e.g. DateTimeSensor ->
>> AsyncDateTimeSensor in my prototype) and get the benefits of deferred
>> operators with no extra thought required.
>> >> >>>>>>
>> >> >>>>>>
>> >> >>>>>>  That will not work I am afraid :). If we open it up for users
>> to use,
>> >> >>>>>>  we have to deal with consequences. We have to be prepared for
>> people
>> >> >>>>>>  doing all kinds of weird things - at least this is what I've
>> learned
>> >> >>>>>>  during the last 2 years of working on Airflow. See the comment
>> about.
>> >> >>>>>>  If we do not want users to implement custom versions of
>> triggers we
>> >> >>>>>>  should use Enums and a closed set of those. If we create an
>> API an
>> >> >>>>>>  interface - people will use it and create their own, no matter
>> if we
>> >> >>>>>>  want or not.
>> >> >>>>>>
>> >> >>>>>>
>> >> >>>>>>  J.
>> >> >>>>
>> >> >>>>
>> >> >>>>
>> >> >>>>
>> >> >>>> --
>> >> >>>> +48 660 796 129
>> >>
>> >>
>> >>
>> >> --
>> >> +48 660 796 129
>>
>>
>>
>> --
>> +48 660 796 129
>>
>

Reply via email to