Did we reach any consensus on this topic/AIP? I think persisting the DAG is actually pretty important.
-Tao

On Tue, Mar 12, 2019 at 3:01 AM Kevin Yang <yrql...@gmail.com> wrote:

> Hi Fokko,
>
> As a large-cluster maintainer, I'm not a big fan of large DAG files either. But I'm not sure I'd consider this bad practice. We have some large frameworks, e.g. experimentation and machine learning, that are complex by nature and generate a large number of DAGs from their customers' configs to get better flexibility. I consider them advanced use cases of Airflow that open up a lot of potential for Airflow, unless we've previously set some boundaries, that I'm not aware of, around how complex DAG code can be. About resulting in an unworkable situation: yes, we are experiencing pain from having such large DAG files, mainly on the webserver side, but the system overall is running stable. We are actually hoping to improve the situation by applying solutions like making the webserver stateless. It is OK if the owners of large DAG files need to pay, but we should try to minimize the price: a longer refresh interval, extra task running time, but nothing too crazy.
>
> I think we're aligned on storing info in the DB as long as we can meet the requirements Dan mentioned earlier; we just need that balance decided, so I'm gonna skip this part (out of all the requirements, No. 1 seems to be the least clear; maybe we can expand on that). One thing about the proposed idea is that we implicitly couple DagRun with DAG version, which at first glance makes sense but IMO is not ideal. I feel full versioning should track all changes instead of tracking changes only when we create a DagRun. E.g. my task failed, I merged new code to fix the task, and I want to rerun it with the current code; if we serialize the DAG at DagRun creation time, we won't have an up-to-date snapshot. Sure, we can work around it by always keeping a current snapshot of the DAG, but this is kind of messy and confusing.
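[Editor's note] Kevin's distinction between snapshotting at DagRun creation and tracking every change can be sketched as a content-hash version store: a new version is recorded whenever the serialized DAG actually differs, independent of any run. The names `fingerprint` and `DagVersionStore` are purely illustrative, not part of Airflow.

```python
# Illustrative sketch only: version a DAG on every parse whose serialized
# form changed, not only when a DagRun is created.
import hashlib
import json


def fingerprint(serialized_dag: dict) -> str:
    """Stable content hash of a serialized DAG."""
    canonical = json.dumps(serialized_dag, sort_keys=True)
    return hashlib.sha256(canonical.encode()).hexdigest()


class DagVersionStore:
    """Keeps one snapshot per distinct DAG content, newest last."""

    def __init__(self):
        self.versions = {}  # dag_id -> list of (hash, snapshot)

    def record(self, dag_id: str, serialized_dag: dict) -> bool:
        """Store a snapshot if the content changed; return True if stored."""
        h = fingerprint(serialized_dag)
        history = self.versions.setdefault(dag_id, [])
        if history and history[-1][0] == h:
            return False  # unchanged since the last parse
        history.append((h, serialized_dag))
        return True

    def latest(self, dag_id: str) -> dict:
        """The most recent snapshot, e.g. for rerunning a fixed task."""
        return self.versions[dag_id][-1][1]
```

Under this model, a fix merged after a failure is captured by the next parse, so a rerun can use `latest(dag_id)` even though no new DagRun was created in between.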
> This is what popped off the top of my head, and without full versioning we might have some other tricky cases, e.g. your backfill case. But I have only given this a few thoughts, and you might already have a complete story that voids my concerns.
>
> Cheers,
> Kevin Y
>
> On Sun, Mar 10, 2019 at 11:29 AM Driesprong, Fokko <fo...@driesprong.frl> wrote:
>
> > Thanks Kevin for opening the discussion. I think it is important to have a clear overview of how to approach the AIP.
> >
> > First of all, how many DAGs do we have that take 30s to parse? I consider this bad practice, and it would also result in an unworkable situation with the current setup of Airflow, since it will take a lot of resources on the webserver/scheduler and the whole system will become unresponsive. It will be hard to cope with such DAGs in general.
> >
> > The idea from the AIP is to have a versioned copy of the DAG in the DB, so in the end you won't need to parse the whole thing every time; only when you trigger a DAG, or when you want to see the current status of the DAG.
> >
> > Like I stated earlier, I strongly feel we shouldn't serialize the DAGs as JSON(5) or pickles in general. For me, this is deferring the pain of setting up a structure for the DAG object itself. Having the DAG normalized in the database will give us cleaner storage of the DAG. We can, for example, enforce fields by making them not null, so we know that something is off at write time instead of at read time. Furthermore, with blobs we're missing logical types such as dates, which we could otherwise query efficiently using the database's indices. Also, with all serialization formats, evolution isn't trivial. Consider the situations when:
> > - We're introducing a new field, and it might be null, so we need to bake all kinds of logic into the Airflow code, which you don't want.
> >   With proper migration scripts, you could prefill these fields and make them not null.
> > - Changing the models: for example, you still can't change a string type into an integer without adding custom logic. In this case, the reviewer needs to be extra careful that no breaking changes are introduced. Right now we're doing minimal forward- and backward-compatibility testing.
> >
> > In case we get too many migrations, we could also squash (some of) them when preparing a release.
> >
> > Personally, I don't think the serialization is the issue here. As Max already mentioned, it is about the optimal balance of (de)normalization. From the user's perspective, the serialization won't change much of Airflow's behaviour.
> >
> > For me, having `DAG.serialize()` and `DAG.deser(version)` is not the ideal approach, but it might be that we're on the same page :-) I believe it should be something like `DagRun.find('fokkos_dag', datetime(2018, 03, 01))`, which constructs the correct version of the DAG. Since there is a uniqueness constraint on (dag_id, datetime), this will always return the same DAG: you will get the versioned DagRun as it ran at that time. Serializing the fields and storing them in the database should happen transparently when you update the DAG object. When you run a DAG, you'll parse the DAG and then run it. `Dag().create_dagrun(...)` will create a DagRun, as the name suggests; if that version of the DAG still exists in the database, it will reuse it, otherwise it will create a new version of the DAG (with all the operators etc.). In this sense the versioning of the DAGs should be done within the Dag(Run).
> >
> > The versioning will change the behaviour from a user perspective. Right now we store only a single version. For example, the poor man's backfilling won't work anymore.
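[Editor's note] The lookup-or-create semantics Fokko describes (a DagRun pinning the DAG version it ran with, uniquely keyed by dag_id and execution date) can be modelled in a few lines. This is a toy sketch with invented names (`VersionedDagStore`), not Airflow's real `DagRun` model.

```python
# Toy model of the proposal: create_dagrun reuses a stored DAG version if it
# already exists, and find() returns the DAG exactly as it was for that run.
from datetime import datetime


class VersionedDagStore:
    def __init__(self):
        self.dag_versions = {}  # (dag_id, version_hash) -> serialized dag
        self.dag_runs = {}      # (dag_id, execution_date) -> version_hash

    def create_dagrun(self, dag_id, execution_date, serialized_dag, version_hash):
        key = (dag_id, version_hash)
        if key not in self.dag_versions:
            self.dag_versions[key] = serialized_dag  # new version of the DAG
        # (dag_id, execution_date) is unique, so a run pins exactly one version.
        self.dag_runs[(dag_id, execution_date)] = version_hash

    def find(self, dag_id, execution_date):
        """Return the serialized DAG as it was for that run."""
        version_hash = self.dag_runs[(dag_id, execution_date)]
        return self.dag_versions[(dag_id, version_hash)]


store = VersionedDagStore()
store.create_dagrun("fokkos_dag", datetime(2018, 3, 1), {"tasks": ["a"]}, "v1")
store.create_dagrun("fokkos_dag", datetime(2018, 3, 2), {"tasks": ["a", "b"]}, "v2")
```

With this shape, `store.find("fokkos_dag", datetime(2018, 3, 1))` always returns the same snapshot, regardless of how the DAG file has changed since.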
> > This is clearing the state from past & future, up- and downstream, and letting it catch up again. In that case, the old version of the DAG won't exist anymore, and potentially there are tasks that aren't in the code anymore. In this case we need to clear that version of the DAG and rerun it with the latest version: `DagRun.find('fokkos_dag', datetime(2018, 03, 01)).clear()`. How we are going to do clears downstream in the middle of the DAG is something I still have to figure out, because potentially there are tasks that can't be rerun since the underlying Python code has changed, both on the user level and on the Airflow level. It will be impossible to get these features pure in that sense. I would not suggest adding a new status here, indicating that the task can't be rerun since it isn't part of the DAG anymore. We have to find the balance here between adding complexity (also to the scheduler) and features that we need to introduce to help the user.
> >
> > Cheers, Fokko
> >
> > Ps. Jarek, interesting idea. It shouldn't be too hard to make Airflow more k8s-native. You could package your DAGs within your container and do a rolling update: add the DAGs as the last layer, and then point the DAGs folder to that location. The hard part here is that you need to gracefully restart the workers. Currently, AFAIK, the signals given to the pod aren't respected. So when the scheduler/webserver/worker receives a SIGTERM, it should stop its jobs nicely and then exit the container, before k8s kills the container with a SIGKILL. This will be challenging with the workers, since they are potentially long-running. Maybe stopping kicking off new jobs and letting the old ones finish will be good enough, but then we need to increase the standard kill timeout substantially. Having this would also enable autoscaling of the workers.
> >
> > Op za 9 mrt.
> > 2019 om 19:07 schreef Maxime Beauchemin <maximebeauche...@gmail.com>:
> >
> > > I want to raise the question of the amount of normalization we want to use here, as it seems to be an area that needs more attention.
> > >
> > > The AIP suggests having DAG blobs, task blobs, and edges (call it fairly-normalized). I also like the idea of all-encompassing (call it very-denormalized) DAG blobs, as they seem easier to manage in terms of versioning. The question here is whether we go with one of these methods exclusively, something in between, or even a hybrid approach (redundant blobs that use different levels of normalization).
> > >
> > > It's nice and simple to just push or pull atomic DAG objects with a version stamp on them. It's clearly simpler than dealing with 3 versioned tables (dag, tasks, edges). There are a lot of pros/cons, and they become more apparent with the perspective of very large DAGs. If the webserver is building a "task details" page using the fairly-normalized model, it can just pull what it needs instead of pulling the large DAG blob. Similarly, if building a sub-tree view (a subset of the DAG), perhaps it can retrieve only what it needs. But if you need the whole DAG (say, for the scheduler use case), then you're dealing with more complex SQL/ORM operations (joins hopefully, or multiple DB round trips).
> > >
> > > Now maybe the right approach is more something like 2 tables: dag and task_details, where edge keys are denormalized into dag (arguably that's a few KBs at most, even for large DAGs), and maybe the dag object has most of the high-level task metadata (operator, name, key BaseOperator attrs), while task_details has the big blobs (SQL code).
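[Editor's note] Max's two-table compromise can be made concrete with a small sqlite sketch: the `dag` row carries the edge list and light task metadata, while `task_detail` carries the heavy blobs. Table and column names here are illustrative only, not Airflow's schema.

```python
# Sketch of the "2 tables" layout; reconstructing the full DAG is the
# two-round-trip cost Max mentions (one query per table).
import json
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE dag (
    dag_id   TEXT NOT NULL,
    version  INTEGER NOT NULL,
    edges    TEXT NOT NULL,   -- JSON list of [upstream, downstream] pairs
    tasks    TEXT NOT NULL,   -- JSON: task_id -> small attrs (operator, ...)
    PRIMARY KEY (dag_id, version)
);
CREATE TABLE task_detail (
    dag_id   TEXT NOT NULL,
    version  INTEGER NOT NULL,
    task_id  TEXT NOT NULL,
    blob     TEXT NOT NULL,   -- big fields, e.g. SQL source
    PRIMARY KEY (dag_id, version, task_id)
);
""")

conn.execute("INSERT INTO dag VALUES (?, ?, ?, ?)",
             ("my_dag", 1,
              json.dumps([["extract", "load"]]),
              json.dumps({"extract": {"operator": "PythonOperator"},
                          "load": {"operator": "PostgresOperator"}})))
conn.execute("INSERT INTO task_detail VALUES (?, ?, ?, ?)",
             ("my_dag", 1, "load", "INSERT INTO target SELECT ..."))

# Round trip 1: the small DAG-level row (enough for graph views).
row = conn.execute("SELECT edges, tasks FROM dag WHERE dag_id=? AND version=?",
                   ("my_dag", 1)).fetchone()
edges, tasks = json.loads(row[0]), json.loads(row[1])

# Round trip 2: the heavy per-task blobs (only when actually needed).
details = dict(conn.execute(
    "SELECT task_id, blob FROM task_detail WHERE dag_id=? AND version=?",
    ("my_dag", 1)))
```

The upside is visible in the queries: a graph view stops after the first fetch; only a full reconstruction pays for the second.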
> > > This is probably a nice compromise; the question becomes "how much task-level detail do we store in the DAG-centric blob?", probably not much, to keep the DAG objects as small as possible. The main downside here is that you cannot have the database do the join, and you have to make 2 round trips to reconstruct a DAG object (fetch the DAG, parse the object to get the list of tasks, and then run another DB query to get those task details).
> > >
> > > To sum up, I'd qualify the more normalized approach as the most proper, but also the more complex. It'll shine in specific cases around large DAGs. If we have the proper abstractions (methods like DAG.serialize(), DAG.deser(version)), then I guess that's not an issue.
> > >
> > > Max
> > >
> > > On Fri, Mar 8, 2019 at 5:21 PM Kevin Yang <yrql...@gmail.com> wrote:
> > >
> > > > Hi Julian, I'm definitely aligned with you guys on making the webserver independent of DAG parsing; the end goal to me would just be to build a complete story around serializing the DAG, and to move with that story in mind. I feel like you guys may already have a list of dynamic features we need to deprecate/change; if that is the case, feel free to open the discussion on what we do to them with DAG serialization.
> > > >
> > > > Julian, Ash, Dan: on second thought I do agree that if we can meet the requirements Dan mentioned, it would be nice to have them stored in the DB. Some combined solutions, like having a column of serialized graph in the serialized-dag table, can potentially meet all requirements. What format we end up using to represent a DAG between components is now less important IMO: it's fine to refactor those endpoints that only need DagModel to use only DagModel, and it's easy to do a batch replacement if we decide otherwise later.
> > > > More important is to define the source of truth for the serialized DAG.
> > > >
> > > > Ash, ty for the email list, I'll tune my filters accordingly :D I'm leaning towards having a separate process for the parser, so we have no scheduler dependency etc. for the parser, but we can discuss this in another thread.
> > > >
> > > > On Fri, Mar 8, 2019 at 8:57 AM Dan Davydov <ddavy...@twitter.com.invalid> wrote:
> > > >
> > > > > > Personally I don't understand why people are pushing for a JSON-based DAG representation
> > > > >
> > > > > It sounds like you agree that DAGs should be serialized (just in the DB instead of JSON), so I will only address why JSON is better than MySQL (i.e. serializing at the DAG level vs. the task level) as far as I can see, and not why we need serialization. If you zoom out and look at all the use cases of serialized DAGs, e.g. having the scheduler use them instead of parsing DAGs directly, then it becomes clear that we need all appropriate metadata in these DAGs (operator params, DAG properties, etc.), in which case it's not clear how it will fit nicely into a DB table (unless you wanted to do something like (parent_task_id, task_id, task_params)). Also keep in mind that we will need to store different versions of each DAG in the future so that we can ensure consistency in a DagRun, i.e. each task in a DagRun uses the same version of the DAG.
> > > > >
> > > > > I think some of our requirements should be:
> > > > > 1. The data model will lead to acceptable performance in all of its consumers (scheduler, webserver, workers), i.e.
> > > > > no n+1 access patterns (my biggest concern about serializing at the task level, as you propose, vs. at the DAG level)
> > > > > 2. We can have versioning of serialized DAGs
> > > > > 3. The ability to separate DAGs into their own data store (e.g. no reliance on joins between the new table and the old ones)
> > > > > 4. One source of truth/serialized representation for DAGs (currently we have SimpleDAG)
> > > > >
> > > > > If we can fulfill all of these requirements and serialize at the task level rather than the DAG level in the DB, then I agree that probably makes more sense.
> > > > >
> > > > > > In the proposed PRs we (Peter, Bas and me) aim to avoid re-parsing DAG files by querying all the required information from the database. In one or two cases this may not be possible, in which case we might either have to fall back on the DAG file or add the missing information to the database. We can tackle these problems as we encounter them.
> > > > >
> > > > > I think you would have the support of many committers in removing any use cases that stand in the way of full serialization. That being said, if we need to remove features we need to do so carefully and thoughtfully, and ideally with proposed alternatives/workarounds to cover the removals.
> > > > >
> > > > > > The counter argument: this PR removes the need for the confusing "Refresh" button from the UI, and in general you only pay the cost for the expensive DAGs when you ask about them.
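[Editor's note] Dan's requirements (one fetch per consumer, versioning, no n+1 access patterns) point at a DAG-level blob shaped roughly like the following. Every field name here is illustrative; Airflow's eventual serialization format is exactly what the thread is debating.

```python
# What a DAG-level serialized blob might look like: one row fetch gives the
# scheduler the whole DAG, including per-task params, avoiding n+1 queries.
import json

serialized_dag = {
    "dag_id": "example_dag",
    "version": 3,                    # needed for per-DagRun consistency
    "schedule_interval": "@daily",
    "tasks": {
        "extract": {"operator": "PythonOperator", "retries": 2},
        "load": {"operator": "PostgresOperator", "upstream": ["extract"]},
    },
}

blob = json.dumps(serialized_dag, sort_keys=True)  # the single column value
restored = json.loads(blob)                        # one read, whole DAG
```

The trade-off against the normalized layout is the one Max raises above: this single blob is cheap for the scheduler but forces the webserver to pull everything even for a one-task view.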
> > > > > > (I don't know what/when we call the /pickle_info endpoint off the top of my head)
> > > > >
> > > > > Probably worth splitting out into a separate thread, but I'm actually not sure the refresh button does anything; I think we should double-check... I think about 2 years ago there was a commit that made gunicorn webservers automatically rotate underneath Flask (each one would re-parse the DagBag). Even if it works, we should probably remove it, since the webserver refresh interval is pretty fast, and it just causes confusion to users and implies that the DAGs are not refreshed automatically.
> > > > >
> > > > > > Do you mean https://json5.org/ or is this a typo? That might be okay for a nicer user front end, but the "canonical" version stored in the DB should be something "plainer" like just JSON.
> > > > >
> > > > > I think he got this from my reply, and it was just an example, but you are right, I agree JSON would be better than JSON5.
> > > > >
> > > > > On Fri, Mar 8, 2019 at 8:53 AM Ash Berlin-Taylor <a...@apache.org> wrote:
> > > > >
> > > > > > Comments inline.
> > > > > >
> > > > > > > On 8 Mar 2019, at 11:28, Kevin Yang <yrql...@gmail.com> wrote:
> > > > > > >
> > > > > > > Hi all,
> > > > > > > When I was preparing some work related to this AIP I found something very concerning. I noticed this JIRA ticket <https://issues.apache.org/jira/browse/AIRFLOW-3562> is trying to remove the dependency on the DagBag from the webserver, which is awesome: we wanted it badly but never got to start work on it.
> > > > > > > However, when I looked at some of its subtasks, which try to remove the DagBag dependency from each endpoint, I found that the way we remove the dependency is not very ideal. For example, this PR <https://github.com/apache/airflow/pull/4867/files> will require us to parse the DAG file each time we hit the endpoint.
> > > > > >
> > > > > > The counter argument: this PR removes the need for the confusing "Refresh" button from the UI, and in general you only pay the cost for the expensive DAGs when you ask about them. (I don't know what/when we call the /pickle_info endpoint off the top of my head)
> > > > > >
> > > > > > This endpoint may be one to hold off on (as it can ask for multiple DAGs), but there are some that definitely don't need a full DagBag, or even to parse the DAG file; the current DAG model has enough info.
> > > > > >
> > > > > > > If we go down this path, we indeed can get rid of the DagBag dependency easily, but we will have to 1. increase the DB load (not too concerning at the moment), and 2. wait for the DAG file to be parsed before getting the page back, potentially multiple times. A DAG file can sometimes take quite a while to parse; e.g. we have some framework DAG files generating a large number of DAGs from static config files or even Jupyter notebooks, and they can take 30+ seconds to parse. Yes, we don't like large DAG files, but people do see the beauty of code as config and sometimes heavily abuse/leverage it.
> > > > > > > Assuming all users have the same nice small Python files that can be parsed fast, I'm still a bit worried about this approach. Continuing on this path means we've chosen DagModel to be the serialized representation of a DAG, with DB columns holding the different properties. It can be one candidate, but I don't know if we should settle on that now. I would personally prefer a more compact (e.g. JSON5) and easy-to-scale representation (such that serializing new fields != DB upgrade).
> > > > > >
> > > > > > Do you mean https://json5.org/ or is this a typo? That might be okay for a nicer user front end, but the "canonical" version stored in the DB should be something "plainer" like just JSON.
> > > > > >
> > > > > > I'm not sure that "serializing new fields != DB upgrade" is that big of a concern, as we don't add fields that often. One possible way of dealing with it, if we do, is to have a hybrid approach: a few distinct columns, but then a JSON blob. (If we were only to support Postgres we could just use JSONB. But I think our friends at Google may object ;) )
> > > > > >
> > > > > > Adding a new column in a DB migration with a default of NULL shouldn't be an expensive operation, or difficult to achieve.
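[Editor's note] Ash's last point can be illustrated with sqlite: adding a nullable column is a cheap, backwards-compatible migration because existing rows simply read NULL without being rewritten. (In Airflow this would be an Alembic revision, e.g. `op.add_column(...)`; the table name below is illustrative.)

```python
# Demonstrate that "new column, default NULL" leaves old rows intact.
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE serialized_dag (dag_id TEXT PRIMARY KEY, data TEXT)")
conn.execute("INSERT INTO serialized_dag VALUES ('old_dag', '{}')")

# The "migration": a new nullable column, no rewrite of existing rows.
conn.execute("ALTER TABLE serialized_dag ADD COLUMN dag_hash TEXT")

# Pre-migration rows read NULL for the new column.
row = conn.execute(
    "SELECT dag_id, dag_hash FROM serialized_dag WHERE dag_id='old_dag'"
).fetchone()
```

This is the mechanism behind the hybrid approach: the stable, queryable fields get real columns (added like this when needed), and everything else lives in the JSON blob.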
> > > > > > > In my imagination we would have to collect the list of dynamic features depending on unserializable fields of a DAG and start a discussion/vote on dropping support for them (I'm working on this, but if anyone has already done so please take over), decide on the serialized representation of a DAG, and then replace the DagBag with it in the webserver. Per previous discussion and some offline discussions with Dan, one future of DAG serialization that I like would look similar to this: https://imgur.com/ncqqQgc
> > > > > >
> > > > > > Something I've thought about before for other things is to embed an API server _into_ the scheduler. This would be useful for k8s health checks and native Prometheus metrics without needing the statsd bridge, and it could have endpoints to get information such as this directly.
> > > > > >
> > > > > > I was thinking it would be _in_ the scheduler process, using either threads (ick. Python's still got a GIL, doesn't it?) or async/twisted etc. (not a side-car process like we have with the logs webserver for `airflow worker`).
> > > > > >
> > > > > > (This is possibly an unrelated discussion, but might be worth talking about?)
> > > > > >
> > > > > > > We can still discuss/vote on which approach we want to take, but I don't want the door to the above design to be shut right now, or we'll have to spend a lot of effort switching paths later.
> > > > > > >
> > > > > > > Bas and Peter, I'm very sorry to extend the discussion, but I do think this is tightly related to the AIP and the PRs behind it.
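[Editor's note] Ash's embedded API server idea, in its simplest thread-based form, can be sketched with the standard library alone: a health endpoint served from a daemon thread inside the scheduler process. This is only a sketch of the shape; a real implementation would have to weigh the GIL/async concerns he raises.

```python
# Minimal sketch: an in-process HTTP health endpoint on a background thread.
import json
import threading
from http.server import BaseHTTPRequestHandler, HTTPServer


class HealthHandler(BaseHTTPRequestHandler):
    def do_GET(self):
        body = json.dumps({"status": "healthy"}).encode()
        self.send_response(200)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def log_message(self, *args):
        # Keep the scheduler's stdout quiet.
        pass


def start_health_server(port=0):
    """Bind (port=0 picks a free port), serve on a daemon thread,
    and return (server, bound_port)."""
    server = HTTPServer(("127.0.0.1", port), HealthHandler)
    threading.Thread(target=server.serve_forever, daemon=True).start()
    return server, server.server_address[1]
```

A k8s liveness probe would then just GET this port; because the thread is a daemon, it dies with the scheduler process, which is exactly the property a health check wants.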
> > > > > > > And my sincere apology for bringing this up so late (I only pull the open PR list occasionally; if there's a way to subscribe to new-PR events I'd love to know how).
> > > > > >
> > > > > > It's noisy, but you can subscribe to comm...@airflow.apache.org (be warned, this also includes all Jira tickets, edits of every comment on GitHub, etc.).
> > > > > >
> > > > > > > Cheers,
> > > > > > > Kevin Y
> > > > > > >
> > > > > > > On Thu, Feb 28, 2019 at 1:36 PM Peter van t Hof <pjrvant...@gmail.com <mailto:pjrvant...@gmail.com>> wrote:
> > > > > > > Hi all,
> > > > > > >
> > > > > > > Just some comments on the points Bolke gave in relation to my PR.
> > > > > > >
> > > > > > > At first, the main focus is: making the webserver stateless.
> > > > > > >
> > > > > > > > 1) Make the webserver stateless: needs the graph of the *current* dag
> > > > > > >
> > > > > > > This is the main goal, but for this a lot more PRs will be coming once my current one is merged. For the edges and the graph view, this is covered in my PR already.
> > > > > > >
> > > > > > > > 2) Version dags: for consistency mainly, and to not require parsing of the dag on every loop
> > > > > > >
> > > > > > > In my PR the historical graphs are stored for each DagRun. This means that you can see whether an older DagRun had the same graph structure, even if some tasks don't exist anymore in the current graph. Especially for dynamic DAGs this is very useful.
> > > > > > >
> > > > > > > > 3) Make the scheduler not require DAG files. This could be done if the edges contain all information on when to trigger the next task.
> > > > > > > > We can then have event-driven dag parsing outside of the scheduler loop, i.e. by the CLI. Storage can also be somewhere else (git, Artifactory, filesystem, whatever).
> > > > > > >
> > > > > > > The scheduler is almost untouched in this PR. The only thing added is that these edges are saved to the database; the scheduling itself didn't change. The scheduler still depends on the DAG object.
> > > > > > >
> > > > > > > > 4) Fully serialise the dag so it becomes transferable to workers
> > > > > > >
> > > > > > > It is nice to see that people have a lot of ideas about this. But as Fokko already mentioned, this is out of scope for the issue we are trying to solve. I also have some ideas about this, but I'd like to limit this PR/AIP to the webserver.
> > > > > > >
> > > > > > > For now my PR solves 1 and 2, and the rest of the behaviour (like scheduling) is untouched.
> > > > > > >
> > > > > > > Gr,
> > > > > > > Peter
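[Editor's note] What Peter describes, each DagRun keeping the graph it ran with, makes old runs comparable against the current DAG even after tasks were removed. A toy illustration (the `graph_diff` helper and the edge format are invented here, not taken from the PR):

```python
# Compare a historical DagRun's stored edge list with the current DAG's edges.
def graph_diff(stored_edges, current_edges):
    """Edges present in a historical DagRun but gone from the current DAG,
    and vice versa."""
    stored, current = set(stored_edges), set(current_edges)
    return {
        "removed": sorted(stored - current),
        "added": sorted(current - stored),
    }


# A dynamic DAG dropped task "c" and gained task "d" between two runs:
old_run_edges = [("a", "b"), ("b", "c")]
current_edges = [("a", "b"), ("b", "d")]
diff = graph_diff(old_run_edges, current_edges)
```

For the webserver, a non-empty `removed` set is exactly the case Fokko worries about above: the old run references tasks that no longer exist in the code, so its graph can only be rendered from the stored snapshot.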