Thanks Jarek - some replies to those questions: 1) In general, I'm envisioning Triggers as a thing that are generally abstracted away from DAG authors - instead, they would come in a provider package (or core Airflow) and so we would be expecting the same higher level of quality and testing.
That said, you're right that python's async support comes with a giant caveat in the form of a badly-written trigger blocking the entire loop. Python has some built-in protections to detect and alert on these (e.g. PYTHONASYNCIODEBUG=1), but in general it is a core design flaw of the async style that core Python has chosen. So: a) I do think it should all be fixed as asyncio, mostly because the library support is there and you don't suddenly want to make people have to run multiple "flavours" of triggerer process based on how many triggers they're using and what runtime loops they demand. If trio were more popular, I'd pick that as it's safer and (in my opinion) better-designed, but we are unfortunately nowhere near that, and in addition this is not something that is impossible to add in at a later stage. b) There's definitely some hooks we can use to detect long-running triggers, and I'll see if I can grab some of them and implement them. At very least, there's the SIGALRM watchdog method, and I believe it's possible to put nice timeouts around asyncio tasks, which we could use to enforce a max runtime specified in the airflow.cfg file. 2) This is why there's no actual _state_ that is persisted - instead, you pass the method you want to call next and its keyword arguments. Obviously we'll need to be quite clear about this in the docs, but I feel this is better than persisting _some_ state. Again, though, this is an implementation detail that would likely be hidden inside an Operator or Sensor from the average DAG user; I'm not proposing we expose this to things like PythonOperator or TaskFlow yet, for the reasons you describe. I wish I had a better API to present for Operator authors, but with the fundamental fact that the Operator/Task is going to run on different machines for different phases of its life, I think having explicit "give me this when you revive me" arguments is the best tradeoff we can go for. 3) I do think we should limit the size of the payload, as well as the kwargs that pass between deferred phases of the task instance - something pretty meaty, like 500KB, would seem reasonable to me. I've also run into the problem in the past that if you design a messaging system without a limit, people _will_ push the size of the things sent up to eyebrow-raisingly-large sizes. Overall, I think it's important to stress that the average DAG author should not even know that Triggers really exist; instead, they should just be able to switch Sensors or Operators (e.g. DateTimeSensor -> AsyncDateTimeSensor in my prototype) and get the benefits of deferred operators with no extra thought required. Obviously, this isn't as flexible as I'd like immediately, and it would be more exposed if/when we wanted to add the ability to start DAGs based on triggers, but as a first phase to get it in, tested, and running, I think it's preferable than going all-out trying to invent a safe DAG-author-facing API to use triggers within their own Python code. Andrew On Mon, Apr 26, 2021 at 12:45 PM Jarek Potiuk <ja...@potiuk.com> wrote: > Sorry I missed the original thread, it is very interesting :). > > I love the Trigger concept, and I have two comments/questions, mostly > ones that might make the concept more "fool-proof" for even casual > users, who might not understand the intrinsic details of Triggers and > not understand how they impact Task behaviour. Since Airflow is a pure > python, we know that sometimes the users of Airflow might (often > unknowingly) misuse certain features of Airflow. For example I am > myself guilty (when I first started to use Airflow) of running > multiple Variable retrievals while parsing DAGs (which as we know > might lead to a loot of DB queries). I think we should be careful here > to be gently guiding and helping our users to not fall into some > potential traps they might not be aware of. > > 1) Execution/threading model and potential of blocking the mechanism > by "badly written" triggers. > > My first point is the proposed threading model we are going to use to > execute the async-compliant methods. From what I understand (and > please correct me if I am wrong) how async coroutines work, a single > event loop runs all the coroutines in a single thread. This basically > means that the whole system can get in trouble if any coroutine starts > running a loooooooong operation, or even worse - when it mistakenly > runs a blocking (non-async-io for example) operation. If we are > running all coroutines in a single event loop, my understanding is > that no other coroutines from the same event loop will run while the > long running operation finishes in the "culprit" coroutine. Of course > - there are different ways you can run the coroutines - they can run > in threads, they can run in a number of event loops, etc. etc. But > those have costs and they need to be explicitly started. Sharing a > single event queue is fine if you have the same "application" where > all the "developers" of that application are aware that all > async/await coroutines should not run blocking operations and behave > "nicely". But I am not sure this will be that obvious to Dag > developers and they might not be aware of consequences of badly > written triggers. They might (even unknowingly or by mistake) execute > a blocking or long lasting operation that might stop all other > triggers from running. > > The problem with Airflow is that often DAGs in single installation are > written by multiple teams (data scientists etc.) and then operated by > yet another team (Data engineers/devops). If we are running a single > event loop for all triggers written by different teams, we might > easily end up with the situation that the operations team will have to > diagnose and debug delays or even blocking behaviour introduced by one > team but impacting another team. This is not nice to debug/diagnose. > > I think we should be very careful here and provide the right guidance, > prevention and possibly even detection of badly written triggers to > all the teams for that feature, and possibly even some level of > isolation between different groups of triggers (running in different > event loops). I guess I have two questions there: > > a) what is the threading/event loop model we want to implement, and > whether it should be configurable. If configurable - how? Should the > user be able to choose the event loop that is used for the task? > Should it be per DAG/TaskGroup/Otherwise?). Any thoughts here? > b) is this possible to implement some ways to prevent, detect (and > possibly automatically disable) or at the very least diagnose (log!) > badly written triggers? In a number of async solutions I saw (mainly > coming from the mobile development world), there are some modes that > you can run async operations and get logs/reports/cancelled coroutines > when such a coroutine executes too long or enters a blocking > operation. Should we do something similar here? Any thoughts about it > ? > > 2) Sharing state between execute()/execute_complete() methods and > pre_execute/post_execute contract. > > I am not sure if we can do something about it, but I think there is a > potential problem with keeping "state" of such tasks and expectations > of the DAG developers about it. I think users (DAG writers) might not > be entirely aware that .execute(), .execute_complete() might be > executed on a different object, and they might as well run > `self.external_id=<something>` hoping that they will find it when > execute_complete() runs. This is not what happens for example > currently with pre_execute(), execute() and post_execute() methods. > From what I know there is a guarantee those three methods will always > execute on the same object. This is of course not the way it should > be with triggers (payload of the trigger should take care of the state > passing). But this might be pretty unexpected behaviour for some of > the more complex use cases and operators, where > pre_execute/post_execute might be used to implement some cross-cutting > concerns between operators. Any thoughts here? > > 3) Should we set some limits on the "payload" size and content ? > > We already had past experiences where people tried to use XCom to pass > actual data between tasks. Undoubtedly our users will try to do the > same with the event payload. Should we set an explicit limit on the > size of the payload (same as for Xcom?). Should it be the size of the > serialized form of the Dictionary ? Should we put the same > restrictions as we have on XCom for the data being serialized there? > > J. > > > > On Mon, Apr 26, 2021 at 7:15 PM Jarek Potiuk <ja...@potiuk.com> wrote: > > > > Not that soon - December 2021 is the EOL for Python 3.6, so I while it > could be only 3.7+ feature, but warnings would still be nice. I don't think > we discussed either timeline or scope for 2.1 or 2.2 but I think having > them sooner both before December might be a good idea :) > > > > J. > > > > > > On Mon, Apr 26, 2021 at 7:09 PM Daniel Imberman < > daniel.imber...@gmail.com> wrote: > >> > >> @ash since we’re dropping 3.6 support soon anyways, would we even need > this warning? Based on the POC it seems that this involves core changes, so > I imagine if this is only 2.2.X compatible then ending py3.6 support for > 2.2.X would be sufficient? (unless truly ending 3.6 support would need to > wait for a 3.X release?) > >> > >> On Mon, Apr 26, 2021 at 9:08 AM, Andrew Godwin < > andrew.god...@astronomer.io.invalid> wrote: > >> > >> Yes - the prototype currently has a `airflow.utils.asyncio` module that > tries to adapt things for 3.6, which I will likely just replace with a > series of very polite errors about needing 3.7+. > >> > >> Andrew > >> > >> On Mon, Apr 26, 2021 at 9:50 AM Ash Berlin-Taylor <a...@apache.org> > wrote: > >>> > >>> Cool! > >>> > >>> I see that you are using async code for it in the draft PR, and I > remember you saying somewhere that Python 3.6 (which is still supported) > doesn't have great support for this. > >>> > >>> Should we have an explicit check for Py 3.6 and refuse to allow > triggers to be run there? (and not let `trigger` process run either. > >>> > >>> -ash > >>> > >>> On Mon, Apr 26 2021 at 14:37:23 +0100, Kaxil Naik <kaxiln...@gmail.com> > wrote: > >>> > >>> Thanks Andrew, that answers my questions. > >>> > >>> If we don't have any other questions by the end of the week we should > start a VOTE. > >>> > >>> Regards, > >>> Kaxil > >>> > >>> On Tue, Apr 20, 2021 at 2:24 AM Andrew Godwin < > andrew.god...@astronomer.io.invalid> wrote: > >>>> > >>>> Thanks Kaxil - notes on those two things: > >>>> > >>>> - A timeout is probably a reasonable thing to have in most > situations, as it gives you that little bit of self-healing ability (I put > a created timestamp into the Trigger schema partially out of this kind of > caution). I'll update the AIP to mention that triggers will come with an > optional timeout. > >>>> > >>>> - Correct, users who don't want the new process shouldn't be affected > at all. The main bad failure case here is that if you don't have the > process and then try to use a deferred operator, it would silently hang > forever; my plan was to take a cue from the code in the webserver that > warns you the scheduler isn't running, and do something similar for > triggers, somewhere (it can't be an immediate error when you try to run the > task, as the triggerer process may just be down for a few seconds for a > redeploy, or similar). > >>>> > >>>> Andrew > >>>> > >>>> On Mon, Apr 19, 2021 at 6:10 PM Kaxil Naik <kaxiln...@gmail.com> > wrote: > >>>>> > >>>>> +1 > >>>>> > >>>>> This is awesome Andrew, this is going to be a huge cost saver. > >>>>> > >>>>> The AIP is quite detailed and the Draft PR certainly helps. > >>>>> > >>>>> Just minor comments: > >>>>> > >>>>> Should we have a timeout on publishing a Trigger / Task to the > triggerer similar to > https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#operation-timeout > >>>>> In How are users affected by the change? section, do the users who > don't want to use the new process affected by it. Just confirming that if a > user does not opt-in they are unaffected. > >>>>> > >>>>> Regards, > >>>>> Kaxil > >>>>> > >>>>> On Thu, Apr 15, 2021 at 9:34 PM Andrew Godwin < > andrew.god...@astronomer.io.invalid> wrote: > >>>>>> > >>>>>> Hi all, > >>>>>> > >>>>>> After a period of designing and prototyping, I've completed a first > draft of AIP-40 that I'd love to get some feedback on from the community: > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=177050929 > >>>>>> > >>>>>> This AIP proposes a way of adding what I'm calling "deferrable" > Operators into Airflow - essentially, taking the goal of Smart Sensors and > making it much more generalisable, where any Sensor or Operator can choose > to "defer" its execution based on an asynchronous trigger, and where all > the triggers run in one (or more) processes for efficiency. > >>>>>> > >>>>>> It also means that any Operator can be made deferrable in a > backwards-compatible way should we wish to in future, though I'm not > proposing that for a first release. > >>>>>> > >>>>>> It comes with a working prototype, too, should you wish to see what > kind of code footprint this would have: > https://github.com/apache/airflow/pull/15389 > >>>>>> > >>>>>> I personally think this would be a huge improvement for Airflow's > efficiency - I suspect we might be able to reduce the amount of resources > some Airflow installs use by over 50% if all their idling operators were > ported to be deferrable - but I would welcome further opinions. > >>>>>> > >>>>>> Thanks, > >>>>>> Andrew > > > > > > > > -- > > +48 660 796 129 > > > > -- > +48 660 796 129 >