Hey Andrew,

Don't get me wrong :) - I love the idea/AIP proposal. I just want to make sure that from day one we think about the operational aspects of it. The easier we make it for the operations people, the fewer of their problems we will have to deal with on the devlist and in GitHub issues.

I would love to hear what others think about it - maybe some folks from the devlist who operate Airflow at scale could chime in here - we have some people from AirBnB/Twitter etc. Maybe you could state your expectations for the operational side if you had 1000s of Triggers that could potentially interfere with each other :)?

On Mon, Apr 26, 2021 at 9:21 PM Andrew Godwin <andrew.god...@astronomer.io.invalid> wrote:

> 1) In general, I'm envisioning Triggers as a thing that are generally
> abstracted away from DAG authors - instead, they would come in a provider
> package (or core Airflow) and so we would be expecting the same higher level
> of quality and testing.

I see the intention, but then we would have to have a pretty comprehensive set of triggers from day one - similar to the different types of "rich intervals" we have in https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-39+Richer+scheduler_interval. Maybe a list (or groups) of the triggers we would like to have as "built-ins" would be really helpful?

But even then, the "extendability" of this solution is what I like about it. More often than not, users will find out that they need something custom. I think we should describe the interface that a Trigger must implement and make it a "first-class" citizen, just like all the other concepts in Airflow that users are expected to override - Operators, Sensors, even the new "Richer schedules" are "meant" to be implemented by users. If we do not want that, we should not make the interface public, but rather have (and accept) only a fixed set of triggers - and for that we could just implement an Enum of the available ones. But by providing an interface, we invite our users to implement their own custom Triggers, and we need to deal with the consequences: we have to think about what happens when users start writing their own triggers and what "toolbox" we give the people operating Airflow to deal with that.
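Just to illustrate the kind of "first-class" contract I mean, here is a rough sketch in Python - all the names and shapes are made up by me for the sake of the example, not taken from the AIP:

import asyncio
import datetime
from typing import Any, AsyncIterator, Dict, Tuple


class BaseTrigger:
    """Hypothetical interface that every custom Trigger would implement."""

    def serialize(self) -> Tuple[str, Dict[str, Any]]:
        # (importable classpath, constructor kwargs), so that a triggerer
        # process can re-create the trigger, e.g. after a restart.
        raise NotImplementedError

    async def run(self) -> AsyncIterator[Any]:
        # Async generator that yields an event once the condition fires.
        raise NotImplementedError
        yield  # makes this method an async generator


class MyDateTimeTrigger(BaseTrigger):
    """Example of a user-written trigger: fires at a given moment."""

    def __init__(self, moment: datetime.datetime):
        self.moment = moment

    def serialize(self):
        return ("my_company.triggers.MyDateTimeTrigger", {"moment": self.moment})

    async def run(self):
        while datetime.datetime.utcnow() < self.moment:
            await asyncio.sleep(1)
        yield {"fired_at": self.moment}

The point is that everything users plug in has to be async and serializable - and those two rules are exactly where the operational "toolbox" for the triggerer would have to hook in.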
> a) I do think it should all be fixed as asyncio, mostly because the
> library support is there and you don't suddenly want to make people have to
> run multiple "flavours" of triggerer process based on how many triggers
> they're using and what runtime loops they demand. If trio were more popular,
> I'd pick that as it's safer and (in my opinion) better-designed, but we are
> unfortunately nowhere near that, and in addition this is not something that
> is impossible to add in at a later stage.

Asyncio would also be my choice by far, so we do not differ here :)

From what I understand, you propose a single event loop to run all the deferred tasks? Am I right? My point is that multiple event loops, or thread-based async running, are also part of asyncio - no problem with that. Asyncio uses a single event loop by default, but there is no problem with using more. That would be quite similar to the "queues" we currently have with celery workers. I am not saying we should do it, but I can see cases where it might be advisable - for example, to isolate groups of deferred tasks so that they do not interfere with or delay another group. What do you think?
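Something like this is what I mean by the celery-queues analogy - a minimal sketch (the "queue" grouping and the example triggers are of course made up):

import asyncio
import threading
import time
from functools import partial

async def example_trigger(name: str, delay: float) -> None:
    # A well-behaved trigger coroutine (stand-in for a real one).
    await asyncio.sleep(delay)
    print(f"trigger {name} fired")

async def blocking_trigger(name: str) -> None:
    # A misbehaving trigger: blocks its event loop without awaiting.
    time.sleep(2)
    print(f"trigger {name} fired after blocking its loop")

def run_queue(queue_name: str, trigger_factories) -> None:
    # asyncio.run() creates a fresh event loop in this thread,
    # so every "queue" gets its own isolated loop.
    async def main():
        await asyncio.gather(*(factory() for factory in trigger_factories))
    asyncio.run(main())

# With a single shared loop, the blocking trigger would stall ALL
# triggers for 2 seconds; here it only stalls its own queue.
queues = {
    "fast": [partial(example_trigger, "fast-1", 0.1),
             partial(example_trigger, "fast-2", 0.2)],
    "slow": [partial(blocking_trigger, "slow-1")],
}
threads = [threading.Thread(target=run_queue, args=(name, factories))
           for name, factories in queues.items()]
for t in threads:
    t.start()
for t in threads:
    t.join()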
> b) There's definitely some hooks we can use to detect long-running
> triggers, and I'll see if I can grab some of them and implement them. At very
> least, there's the SIGALRM watchdog method, and I believe it's possible to
> put nice timeouts around asyncio tasks, which we could use to enforce a max
> runtime specified in the airflow.cfg file.

I agree it will indeed be difficult to prevent people from making mistakes, but we should really think about how we can help the operations people detect and diagnose them. And I think it should be part of the specification:

1) what kind of metrics we log for the triggers - I think we should gather and publish (using Airflow's metrics system) some useful metrics for the operations people from day one, for all triggers: number of executions, execution length, queuing/delay time vs. the expectation for e.g. a date trigger, etc.

2) you mentioned it already - maybe we should have asyncio's Debug Mode turned on by default: https://docs.python.org/3/library/asyncio-dev.html#debug-mode . It has some useful features, mainly automated logging of async methods that take too long to run. I am not sure what the consequences are, though.

3) a max execution time would be even nicer indeed - see the sketch after this list.

4) maybe there should be some exposure in the UI/CLI/API of what is going on with the triggers?

5) maybe there should be a way to cancel/disable misbehaving triggers - via the UI/CLI/API - until their code gets fixed?

I think we simply need to document those aspects in an "operations" chapter of your proposal. I am happy to propose some draft changes in the AIP if you'd like, after we discuss it here.
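To make point 3) concrete - a rough sketch of what I have in mind, with a hypothetical max-runtime value standing in for a triggerer option in airflow.cfg (none of these names exist yet):

import asyncio

# Hypothetical value - would be read from an option in airflow.cfg.
TRIGGER_MAX_RUNTIME = 60.0

async def run_trigger_with_timeout(trigger_coro, timeout: float = TRIGGER_MAX_RUNTIME):
    try:
        return await asyncio.wait_for(trigger_coro, timeout=timeout)
    except asyncio.TimeoutError:
        # The natural place to emit a metric (point 1) and flag the
        # trigger as misbehaving for the UI/CLI/API (points 4 and 5).
        print("trigger exceeded max runtime and was cancelled")
        raise

async def well_behaved_trigger():
    await asyncio.sleep(0.1)
    return "event"

print(asyncio.run(run_trigger_with_timeout(well_behaved_trigger(), timeout=1.0)))

One caveat: asyncio.wait_for() can only cancel a trigger at an await point - a trigger that blocks the loop without ever awaiting would not be stopped this way, which is where the SIGALRM watchdog you mention would still be needed.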
> 2) This is why there's no actual _state_ that is persisted - instead, you
> pass the method you want to call next and its keyword arguments. Obviously
> we'll need to be quite clear about this in the docs, but I feel this is
> better than persisting _some_ state. Again, though, this is an implementation
> detail that would likely be hidden inside an Operator or Sensor from the
> average DAG user; I'm not proposing we expose this to things like
> PythonOperator or TaskFlow yet, for the reasons you describe.
> I wish I had a better API to present for Operator authors, but with the
> fundamental fact that the Operator/Task is going to run on different machines
> for different phases of its life, I think having explicit "give me this when
> you revive me" arguments is the best tradeoff we can go for.

Agreed - we cannot do much about it. I think we should just be very clear when we document the life cycle. Maybe we should even update the semantics of pre/post so that pre_execute() and post_execute() are run for EVERY execution phase - including the deferred one (and also around execute_complete()). This way (in your example) the task execution would look like: pre_execute(), execute() (-> raises TaskDeferred), post_execute(), and then on another worker: pre_execute(), execute_complete(), post_execute(). I think that would make sense. What do you think?

> 3) I do think we should limit the size of the payload, as well as the kwargs
> that pass between deferred phases of the task instance - something pretty
> meaty, like 500KB, would seem reasonable to me. I've also run into the
> problem in the past that if you design a messaging system without a limit,
> people _will_ push the size of the things sent up to eyebrow-raisingly-large
> sizes.

Yeah, I think XCom should be our benchmark - currently we have MAX_XCOM_SIZE = 49344. Should we use the same limit?

And that leads me to another, actually very important, question, which I think (correct me please if I am wrong) is missing from the current specs. Where are the kwargs going to be stored while the task is deferred? Are they only kept in the memory of the triggerer? Or in the DB? And what are the consequences? What happens when tasks are deferred and the triggerer is restarted - how do we recover? Does it mean that all the deferred tasks will have to be restarted? How? Could you please elaborate a bit on that (I think it also needs a chapter in the specification)?

> Overall, I think it's important to stress that the average DAG author should
> not even know that Triggers really exist; instead, they should just be able
> to switch Sensors or Operators (e.g. DateTimeSensor -> AsyncDateTimeSensor in
> my prototype) and get the benefits of deferred operators with no extra
> thought required.

That will not work, I am afraid :). If we open it up for users, we have to deal with the consequences. We have to be prepared for people doing all kinds of weird things - at least this is what I've learned during the last 2 years of working on Airflow. See my comment above: if we do not want users to implement custom versions of triggers, we should use Enums and a closed set of them. If we create an API and an interface, people will use it and write their own, whether we want it or not.

J.