Have we reached any consensus on this topic/AIP? I think persisting the DAG
is actually pretty important.

-Tao

On Tue, Mar 12, 2019 at 3:01 AM Kevin Yang <yrql...@gmail.com> wrote:

> Hi Fokko,
>
> As a large cluster maintainer, I’m not a big fan of large DAG files
> either, but I’m not sure I’d consider this bad practice. We have some
> large frameworks, e.g. experimentation and machine learning, that are
> complex by nature and generate a large number of DAGs from their customer
> configs to get better flexibility. I consider them advanced use cases of
> Airflow that open up a lot of potential for Airflow, unless we’ve
> previously set some boundaries around how complex DAG code can be that I’m
> not aware of. About resulting in an unworkable situation: yes, we are
> experiencing pain from having such large DAG files, mainly on the
> webserver side, but the system overall is running stably. We are actually
> hoping to improve the situation by applying solutions like making the
> webserver stateless. It is OK if the owners of large DAG files need to
> pay a price, but we should try to minimize it: a longer refresh interval,
> extra task running time, but nothing too crazy.
>
>
> I think we’re aligned on storing info in the DB as long as we can meet the
> requirements Dan mentioned earlier; we just need that balance decided, so
> I’m going to skip this part (out of all the requirements, No. 1 seems to
> be the least clear, maybe we can expand on that). One thing about the
> proposed idea is that we implicitly couple DagRun with DAG version, which
> at first glance makes sense but IMO is not ideal. I feel full versioning
> should track all changes instead of tracking changes only when we create
> a DagRun. E.g. my task failed, I merged new code to fix it, and I want to
> rerun it with the current code. If we serialize the DAG only at DagRun
> creation time, we won’t have the up-to-date snapshot. Sure, we can work
> around it by always keeping a current snapshot of the DAG, but this is
> kind of messy and confusing. This is what popped up off the top of my
> head, and without full versioning we might have some other tricky cases,
> e.g. your backfill case. But I have only given this a little thought, and
> you might already have a complete story that will void my concerns.
>
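To make Kevin's "full versioning" point concrete, here is a minimal sketch
in which a new version is recorded whenever the parsed DAG changes, whether
or not a DagRun is created. Everything in it (`DagVersionStore`, the dict
layout, the DAG id) is a hypothetical illustration and not Airflow API.

```python
import hashlib
import json


class DagVersionStore:
    """Hypothetical in-memory store: one entry per distinct serialized DAG."""

    def __init__(self):
        # dag_id -> list of (fingerprint, serialized_dag), oldest first
        self.versions = {}

    def record(self, dag_id, dag_dict):
        """Store dag_dict if it differs from the latest version.

        Returns the index of the version that now represents dag_dict.
        """
        serialized = json.dumps(dag_dict, sort_keys=True)
        fingerprint = hashlib.sha256(serialized.encode("utf-8")).hexdigest()
        history = self.versions.setdefault(dag_id, [])
        if not history or history[-1][0] != fingerprint:
            history.append((fingerprint, serialized))
        return len(history) - 1

    def latest(self, dag_id):
        return len(self.versions[dag_id]) - 1

    def load(self, dag_id, version):
        return json.loads(self.versions[dag_id][version][1])


store = DagVersionStore()

# First parse: a DagRun created now would pin version 0.
pinned_version = store.record("example_dag", {"tasks": ["extract", "load"]})

# A fix is merged and re-parsed; it is recorded without any new DagRun.
fixed_version = store.record(
    "example_dag", {"tasks": ["extract", "clean", "load"]}
)

# Re-parsing unchanged code does not create another version.
reparsed_version = store.record(
    "example_dag", {"tasks": ["extract", "clean", "load"]}
)

# A rerun of the failed run can choose the pinned or the latest version.
rerun_dag = store.load("example_dag", store.latest("example_dag"))
```

With this shape the failed run can still be inspected at `pinned_version`,
while a rerun picks up the merged fix, without keeping a separate "current
snapshot" on the side.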
>
> Cheers,
> Kevin Y
>
> On Sun, Mar 10, 2019 at 11:29 AM Driesprong, Fokko <fo...@driesprong.frl>
> wrote:
>
> > Thanks Kevin for opening the discussion. I think it is important to have
> > a clear overview on how to approach the AIP.
> >
> > First of all, how many DAGs do we have that take 30s to parse? I consider
> > this bad practice, and this would also result in an unworkable situation
> > with the current setup of Airflow, since it will take a lot of resources
> > on the webserver/scheduler, and the whole system will become unresponsive.
> > It will be hard to cope with such DAGs in general.
> >
> > The idea from the AIP is to have the versioned version of the dag in the
> > DB, so in the end, you won't need to parse the whole thing every time,
> > only when you trigger a DAG or when you want to see the current status of
> > the dag.
> >
> > As stated earlier, I strongly feel we shouldn't serialize the DAGs as
> > JSON(5) or pickles in general. For me, this is deferring the pain of
> > setting up a structure of the DAG object itself.
> > Having the DAG denormalized in the database will give us cleaner storage
> > of our DAG. We can, for example, enforce fields by making them not null,
> > so we know that something is off at write time instead of at read time.
> > Furthermore, we're missing logical types such as dates, which we can
> > query efficiently using the indices of the database.
> > Also, with all serialization formats, evolution isn't trivial. Consider
> > the situations when:
> > - We're introducing a new field, and it might be null, therefore we need
> > to bake in all kinds of logic into the Airflow code, which you don't want.
> > With proper migration scripts, you could prefill these fields, and make
> > them not null.
> > - Changing the models: for example, you still can't change a string type
> > into an integer without adding custom logic. In this case, the reviewer
> > needs to be extra careful that there are no breaking changes introduced.
> > Right now we're doing minimal forward- and backward-compatibility testing.
> >
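A small sketch of the migration argument above, using the standard-library
`sqlite3` module: a new field is added by a migration that prefills existing
rows and declares the column NOT NULL, so a bad write fails immediately. The
`task` table and the `pool` column are assumptions for illustration only,
not the actual Airflow schema.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE task (dag_id TEXT NOT NULL, task_id TEXT NOT NULL)"
)
conn.execute("INSERT INTO task VALUES ('example_dag', 'extract')")

# "Migration": add the new field, prefilled for existing rows and NOT NULL,
# so application code never has to handle a missing value.
conn.execute(
    "ALTER TABLE task ADD COLUMN pool TEXT NOT NULL DEFAULT 'default_pool'"
)

existing_pool = conn.execute(
    "SELECT pool FROM task WHERE task_id = 'extract'"
).fetchone()[0]

# A write that omits a required value is rejected at write time,
# not discovered later at read time.
try:
    conn.execute(
        "INSERT INTO task (dag_id, task_id, pool) "
        "VALUES ('example_dag', 'load', NULL)"
    )
    rejected_at_write = False
except sqlite3.IntegrityError:
    rejected_at_write = True
```

With a JSON or pickle blob, the equivalent check would have to live in the
deserialization code of every consumer.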
> > In case we get too many migrations, we could also squash (some of) them
> > when preparing the release.
> >
> > Personally, I don't think the serialization is the issue here. As Max
> > already mentioned, it is the optimal balance of (de)normalization. From
> > the user perspective, the serialization won't change much of the behaviour
> > of Airflow.
> >
> > For me, having `DAG.serialize()` and `DAG.deser(version)` is not the
> > ideal approach. But it might be that we're on the same page :-) I
> > believe it should be something like `DagRun.find('fokkos_dag',
> > datetime(2018, 3, 1))`, which constructs the correct version of the dag.
> > Since there is a uniqueness constraint on (dag_id, datetime), this will
> > always return the same dag. You will get the versioned DagRun as it ran
> > at that time. Serializing the fields and storing them in the database
> > should happen transparently when you update the DAG object. When you run
> > a dag, you'll parse the dag, and then run it. `Dag().create_dagrun(...)`
> > will create a DagRun, as the name suggests: if the version of the dag
> > still exists in the database, it will reuse that one, otherwise it will
> > create a new version of the DAG (with all the operators etc.). In this
> > sense the versioning of the DAGs should be done within the Dag(Run).
> >
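The reuse-or-create behaviour described above could look roughly like the
sketch below. It is only an illustration under assumed names: the
`dag_version` and `dag_run` tables and the functions `create_dagrun` and
`find_dag_for_run` are hypothetical stand-ins, not the proposed Airflow
models.

```python
import hashlib
import json
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE dag_version (
        id INTEGER PRIMARY KEY,
        dag_id TEXT NOT NULL,
        fingerprint TEXT NOT NULL,
        definition TEXT NOT NULL,
        UNIQUE (dag_id, fingerprint)
    );
    CREATE TABLE dag_run (
        dag_id TEXT NOT NULL,
        execution_date TEXT NOT NULL,
        dag_version_id INTEGER NOT NULL REFERENCES dag_version (id),
        UNIQUE (dag_id, execution_date)
    );
    """
)


def create_dagrun(dag_id, execution_date, definition):
    """Create a run, reusing the stored DAG version if it already exists."""
    body = json.dumps(definition, sort_keys=True)
    fingerprint = hashlib.sha256(body.encode("utf-8")).hexdigest()
    row = conn.execute(
        "SELECT id FROM dag_version WHERE dag_id = ? AND fingerprint = ?",
        (dag_id, fingerprint),
    ).fetchone()
    if row is None:
        cursor = conn.execute(
            "INSERT INTO dag_version (dag_id, fingerprint, definition) "
            "VALUES (?, ?, ?)",
            (dag_id, fingerprint, body),
        )
        version_id = cursor.lastrowid
    else:
        version_id = row[0]
    conn.execute(
        "INSERT INTO dag_run VALUES (?, ?, ?)",
        (dag_id, execution_date, version_id),
    )
    return version_id


def find_dag_for_run(dag_id, execution_date):
    """Return the DAG definition exactly as it was for that run."""
    row = conn.execute(
        "SELECT v.definition FROM dag_run r "
        "JOIN dag_version v ON v.id = r.dag_version_id "
        "WHERE r.dag_id = ? AND r.execution_date = ?",
        (dag_id, execution_date),
    ).fetchone()
    return json.loads(row[0])


old_definition = {"tasks": ["extract", "load"]}
new_definition = {"tasks": ["extract", "clean", "load"]}

first = create_dagrun("fokkos_dag", "2018-03-01", old_definition)
second = create_dagrun("fokkos_dag", "2018-03-02", old_definition)
third = create_dagrun("fokkos_dag", "2018-03-03", new_definition)
as_it_ran = find_dag_for_run("fokkos_dag", "2018-03-01")
```

Here the first two runs share one stored version and only the third creates
a new one; the unique constraint on (dag_id, execution_date) is what makes
the lookup return a single, stable result.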
> > The versioning will change the behaviour from a user perspective. Right
> > now we store only a single version. For example, the poor man's
> > backfilling won't work anymore. This is clearing the state from
> > past & future, up- and downstream, and letting it catch up again.
> > In this case, the old version of the DAG won't exist anymore, and
> > potentially there are tasks that aren't in the code anymore. In this case
> > we need to clear the version of the dag, and rerun it with the latest
> > version: `DagRun.find('fokkos_dag', datetime(2018, 3, 1)).clear()`. How
> > we are going to clear downstream in the middle of the dag is something I
> > still have to figure out, because potentially there are tasks that can't
> > be rerun because the underlying Python code has changed, both on user
> > level and on Airflow level. It will be impossible to get these features
> > pure in that sense.
> > I would not suggest adding a new status in here, indicating that the task
> > can't be rerun since it isn't part of the DAG anymore. We have to find
> > the balance here between adding complexity (also to the scheduler) and
> > the features that we need to introduce to help the user.
> >
> > Cheers, Fokko
> >
> > P.S. Jarek, interesting idea. It shouldn't be too hard to make Airflow
> > more k8s native. You could package your dags within your container, and
> > do a rolling update. Add the DAGs as the last layer, and then point the
> > DAGs folder to the same location. The hard part here is that you need to
> > gracefully restart the workers. Currently, AFAIK, the signals given to
> > the pod aren't respected. So when the scheduler/webserver/worker receives
> > a SIGTERM, it should stop the jobs nicely and then exit the container,
> > before k8s kills the container using a SIGKILL. This will be challenging
> > with the workers, since they are potentially long-running. Maybe stopping
> > kicking off new jobs and letting the old ones finish will be good enough,
> > but then we need to increase the standard kill timeout substantially.
> > Having this would also enable the autoscaling of the workers.
> >
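The "stop kicking off new jobs and let the old ones finish" idea can be
sketched with the standard `signal` module. `GracefulWorker` and `install`
are hypothetical names for illustration and do not describe how Airflow
workers are implemented; the demo calls the handler directly instead of
sending a real signal.

```python
import signal


class GracefulWorker:
    """Toy worker that drains on SIGTERM instead of dying mid-job."""

    def __init__(self, jobs):
        self.pending = list(jobs)  # (name, callable) pairs not yet started
        self.finished = []
        self.accepting = True

    def handle_sigterm(self, signum, frame):
        # Only flip a flag; the job that is currently running completes.
        self.accepting = False

    def run(self):
        while self.accepting and self.pending:
            name, job = self.pending.pop(0)
            job()
            self.finished.append(name)


def install(worker):
    """Register the handler; must be called from the main thread."""
    signal.signal(signal.SIGTERM, worker.handle_sigterm)


worker = GracefulWorker([])
worker.pending = [
    ("job_1", lambda: None),
    # Simulate SIGTERM arriving while job_2 is running.
    ("job_2", lambda: worker.handle_sigterm(signal.SIGTERM, None)),
    ("job_3", lambda: None),
]
worker.run()

finished_jobs = worker.finished
not_started = [name for name, _ in worker.pending]
```

In a Kubernetes pod the time allowed between SIGTERM and SIGKILL is set by
`terminationGracePeriodSeconds`, so for long-running jobs that value would
need to be raised well above its 30-second default.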
> >
> >
> > On Sat, Mar 9, 2019 at 19:07, Maxime Beauchemin <
> > maximebeauche...@gmail.com> wrote:
> >
> > > I want to raise the question of the amount of normalization we want to
> > > use here, as it seems to be an area that needs more attention.
> > >
> > > The SIP suggests having DAG blobs, task blobs and edges (call it the
> > > fairly-normalized). I also like the idea of all-encompassing (call it
> > > very-denormalized) DAG blobs as it seems easier to manage in terms of
> > > versioning. The question here is whether we go with one of these methods
> > > exclusively, something in-between or even a hybrid approach (redundant
> > > blobs that use different levels of normalization).
> > >
> > > It's nice and simple to just push or pull atomic DAG objects with a
> > > version stamp on them. It's clearly simpler than dealing with 3
> > > versioned tables (dag, tasks, edges). There are a lot of pros/cons, and
> > > they become more apparent with the perspective of very large DAGs. If
> > > the web server is building a "task details page", using the
> > > "fairly-normalized" model, it can just pull what it needs instead of
> > > pulling the large DAG blob. Similarly, if building a sub-tree view (a
> > > subset of the DAG), perhaps it can retrieve only what it needs. But if
> > > you need the whole DAG (say for the scheduler use case) then you're
> > > dealing with more complex SQL/ORM operations (joins hopefully, or
> > > multiple db round trips).
> > >
> > > Now maybe the right approach is more something like 2 tables: DAG and
> > > task_details, where edge keys are denormalized into DAG (arguably
> > > that's a few KBs at most, even for large DAGs), and maybe the DAG object
> > > has most of the high-level task metadata information (operator, name,
> > > baseoperator key attrs), and task_details has the big blobs (SQL code).
> > > This is probably a nice compromise; the question becomes "how much
> > > task-level detail do we store in the DAG-centric blob?", probably not
> > > much, to keep the DAG objects as small as possible. The main downside
> > > here is that you cannot have the database join and have to do 2 round
> > > trips to reconstruct a DAG object (fetch the DAG, parse the object to
> > > get the list of tasks, and then run another db query to get those task
> > > details).
> > >
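The two-table compromise and its two round trips can be sketched as
follows. The table layout, the JSON shape and `load_dag` are assumptions
made for this example, not a proposed schema.

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE dag (dag_id TEXT PRIMARY KEY, summary TEXT NOT NULL);
    CREATE TABLE task_details (
        dag_id TEXT NOT NULL,
        task_id TEXT NOT NULL,
        details TEXT NOT NULL,
        PRIMARY KEY (dag_id, task_id)
    );
    """
)

# Small DAG-centric blob: edges plus high-level task metadata only.
summary = {
    "tasks": {
        "extract": {"operator": "BashOperator"},
        "load": {"operator": "PythonOperator"},
    },
    "edges": [["extract", "load"]],
}
conn.execute(
    "INSERT INTO dag VALUES (?, ?)", ("example_dag", json.dumps(summary))
)

# Big per-task blobs live in their own table.
conn.executemany(
    "INSERT INTO task_details VALUES (?, ?, ?)",
    [
        ("example_dag", "extract", json.dumps({"bash_command": "echo 1"})),
        ("example_dag", "load", json.dumps({"sql": "SELECT 1"})),
    ],
)


def load_dag(dag_id):
    """Rebuild the full DAG in two round trips, as described above."""
    # Round trip 1: the DAG blob, which tells us which tasks exist.
    row = conn.execute(
        "SELECT summary FROM dag WHERE dag_id = ?", (dag_id,)
    ).fetchone()
    dag = json.loads(row[0])

    # Round trip 2: one batched query for all task details.
    task_ids = sorted(dag["tasks"])
    placeholders = ", ".join("?" for _ in task_ids)
    rows = conn.execute(
        "SELECT task_id, details FROM task_details "
        f"WHERE dag_id = ? AND task_id IN ({placeholders})",
        [dag_id, *task_ids],
    ).fetchall()
    for task_id, details in rows:
        dag["tasks"][task_id]["details"] = json.loads(details)
    return dag


full_dag = load_dag("example_dag")
```

A graph view would only need the first query, while a task details page
could query `task_details` directly for a single row.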
> > > To summarize, I'd qualify the more normalized approach as the most
> > > proper, but also the most complex. It'll shine in specific cases around
> > > large DAGs. If we have the proper abstractions (methods like
> > > DAG.serialize(), DAG.deser(version)) then I guess that's not an issue.
> > >
> > > Max
> > >
> > > On Fri, Mar 8, 2019 at 5:21 PM Kevin Yang <yrql...@gmail.com> wrote:
> > >
> > > > Hi Julian, I'm definitely aligned with you guys on making the
> > > > webserver independent of DAG parsing; the end goal to me would just be
> > > > to build a complete story around serializing DAGs, and to move with
> > > > that story in mind. I feel like you guys may already have a list of
> > > > dynamic features we need to deprecate/change; if that is the case,
> > > > feel free to open the discussion on what we do to them with DAG
> > > > serialization.
> > > >
> > > > Julian, Ash, Dan, on second thought I do agree that if we can meet the
> > > > requirements Dan mentioned, it would be nice to have them stored in
> > > > the DB. Some combined solutions, like having a column of serialized
> > > > graph in the serialized dag table, can potentially meet all
> > > > requirements. What format we end up using to represent DAG between
> > > > components is now less important IMO: it is fine to refactor those
> > > > endpoints that only need DagModel to use only DagModel, and easy to
> > > > do a batch replacement if we decide otherwise later. More important
> > > > is to define this source of truth for the serialized DAG.
> > > >
> > > > Ash, thank you for the email list, I'll tune my filters accordingly :D
> > > > I'm leaning towards having a separate process for the parser so we
> > > > have no scheduler dependency etc. for this parser, but we can discuss
> > > > this in another thread.
> > > >
> > > > On Fri, Mar 8, 2019 at 8:57 AM Dan Davydov
> > > > <ddavy...@twitter.com.invalid> wrote:
> > > >
> > > > > >
> > > > > > Personally I don’t understand why people are pushing for a
> > > > > > JSON-based DAG representation
> > > > >
> > > > > It sounds like you agree that DAGs should be serialized (just in the
> > > > > DB instead of JSON), so I will only address why JSON is better than
> > > > > MySQL (AKA serializing at the DAG level vs the task level) as far as
> > > > > I can see, and not why we need serialization. If you zoom out and
> > > > > look at all the use cases of serialized DAGs, e.g. having the
> > > > > scheduler use them instead of parsing DAGs directly, then it becomes
> > > > > clear that we need all appropriate metadata in these DAGs (operator
> > > > > params, DAG properties, etc.), in which case it's not clear how it
> > > > > will fit nicely into a DB table (unless you wanted to do something
> > > > > like (parent_task_id, task_id, task_params)). Also keep in mind that
> > > > > we will need to store different versions of each DAG in the future
> > > > > so that we can ensure consistency in a dagrun, i.e. each task in a
> > > > > dagrun uses the same version of a DAG.
> > > > >
> > > > > I think some of our requirements should be:
> > > > > 1. The data model will lead to acceptable performance in all of its
> > > > > consumers (scheduler, webserver, workers), i.e. no n+1 access
> > > > > patterns (my biggest concern about serializing at task level as you
> > > > > propose vs at DAG level)
> > > > > 2. We can have versioning of serialized DAGs
> > > > > 3. The ability to separate DAGs into their own data store (e.g. no
> > > > > reliance on joins between the new table and the old one)
> > > > > 4. One source of truth/serialized representation for DAGs (currently
> > > > > we have SimpleDAG)
> > > > >
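For readers unfamiliar with the "n+1 access pattern" named in requirement
1, here is a minimal sketch that counts queries for a task-level table read
two ways. The `task` table and both loader functions are hypothetical and
exist only for this comparison.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE task (dag_id TEXT, task_id TEXT, params TEXT)")
conn.executemany(
    "INSERT INTO task VALUES (?, ?, ?)",
    [("example_dag", f"task_{i}", "{}") for i in range(100)],
)

query_count = 0


def query(sql, args=()):
    """Run one SELECT and count it as one database round trip."""
    global query_count
    query_count += 1
    return conn.execute(sql, args).fetchall()


def load_tasks_n_plus_one(dag_id):
    # 1 query for the task ids, then 1 more query per task.
    ids = [
        row[0]
        for row in query("SELECT task_id FROM task WHERE dag_id = ?", (dag_id,))
    ]
    return [
        query(
            "SELECT task_id, params FROM task "
            "WHERE dag_id = ? AND task_id = ?",
            (dag_id, task_id),
        )[0]
        for task_id in ids
    ]


def load_tasks_batched(dag_id):
    # A single query returns every task of the DAG.
    return query(
        "SELECT task_id, params FROM task WHERE dag_id = ?", (dag_id,)
    )


query_count = 0
slow_result = load_tasks_n_plus_one("example_dag")
n_plus_one_queries = query_count

query_count = 0
fast_result = load_tasks_batched("example_dag")
batched_queries = query_count
```

For the 100-task DAG above the first loader issues 101 queries and the
second issues 1, which is why task-level storage only meets the requirement
if every consumer reads in batches.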
> > > > > If we can fulfill all of these requirements and serialize at the
> > > > > task level rather than the DAG level in the DB, then I agree that
> > > > > probably makes more sense.
> > > > >
> > > > >
> > > > > > In the proposed PR’s we (Peter, Bas and me) aim to avoid
> > > > > > re-parsing DAG files by querying all the required information from
> > > > > > the database. In one or two cases this may however not be
> > > > > > possible, in which case we might either have to fall back on the
> > > > > > DAG file or add the missing information into the database. We can
> > > > > > tackle these problems as we encounter them.
> > > > >
> > > > > I think you would have the support of many of the committers in
> > > > > removing any use cases that stand in the way of full serialization.
> > > > > That being said, if we need to remove features we need to do this
> > > > > carefully and thoughtfully, and ideally with proposed
> > > > > alternatives/workarounds to cover the removals.
> > > > >
> > > > > > The counter argument: this PR removes the need for the confusing
> > > > > > "Refresh" button from the UI, and in general you only pay the cost
> > > > > > for the expensive DAGs when you ask about them. (I don't know
> > > > > > what/when we call the /pickle_info endpoint off the top of my head)
> > > > >
> > > > > Probably worth splitting out into a separate thread, but I'm
> > > > > actually not sure the refresh button does anything; I think we
> > > > > should double check... I think about 2 years ago there was a commit
> > > > > that made gunicorn webservers automatically rotate underneath flask
> > > > > (each one would reparse the DAGbag). Even if it works we should
> > > > > probably remove it, since the webserver refresh interval is pretty
> > > > > fast, and it just causes confusion to users and implies that the
> > > > > DAGs are not refreshed automatically.
> > > > >
> > > > > > Do you mean https://json5.org/ or is this a typo? That might be
> > > > > > okay for a nicer user front end, but the "canonical" version
> > > > > > stored in the DB should be something "plainer" like just JSON.
> > > > >
> > > > > I think he got this from my reply, and it was just an example, but
> > > > > you are right, I agree JSON would be better than JSON5.
> > > > >
> > > > > On Fri, Mar 8, 2019 at 8:53 AM Ash Berlin-Taylor <a...@apache.org>
> > > > > wrote:
> > > > >
> > > > > > Comments inline.
> > > > > >
> > > > > > > On 8 Mar 2019, at 11:28, Kevin Yang <yrql...@gmail.com> wrote:
> > > > > > >
> > > > > > > Hi all,
> > > > > > > When I was preparing some work related to this AIP I found
> > > > > > > something very concerning. I noticed this JIRA ticket
> > > > > > > <https://issues.apache.org/jira/browse/AIRFLOW-3562> is trying
> > > > > > > to remove the dependency on dagbag from the webserver, which is
> > > > > > > awesome; we wanted it badly but never got to start work on it.
> > > > > > > However, when I looked at some subtasks of it, which try to
> > > > > > > remove the dagbag dependency from each endpoint, I found the
> > > > > > > way we remove the dependency on dagbag is not ideal. For
> > > > > > > example, this PR
> > > > > > > <https://github.com/apache/airflow/pull/4867/files> will require
> > > > > > > us to parse the dag file each time we hit the endpoint.
> > > > > >
> > > > > > The counter argument: this PR removes the need for the confusing
> > > > > > "Refresh" button from the UI, and in general you only pay the cost
> > > > > > for the expensive DAGs when you ask about them. (I don't know
> > > > > > what/when we call the /pickle_info endpoint off the top of my head)
> > > > > >
> > > > > > This endpoint may be one to hold off on (as it can ask for
> > > > > > multiple dags), but there are some that definitely don't need a
> > > > > > full dag bag or to even parse the dag file; the current DAG model
> > > > > > has enough info.
> > > > > >
> > > > > > >
> > > > > > >
> > > > > > > If we go down this path, we indeed can get rid of the dagbag
> > > > > > > dependency easily, but we will have to 1. increase the DB load
> > > > > > > (not too concerning at the moment), 2. wait for the DAG file to
> > > > > > > be parsed before getting the page back, potentially multiple
> > > > > > > times. A DAG file can sometimes take quite a while to parse,
> > > > > > > e.g. we have some framework DAG files generating a large number
> > > > > > > of DAGs from some static config files or even jupyter notebooks
> > > > > > > and they can take 30+ seconds to parse. Yes, we don't like large
> > > > > > > DAG files but people do see the beauty of code as config and
> > > > > > > sometimes heavily abuse/leverage it. Even assuming all users
> > > > > > > have the same nice small python file that can be parsed fast,
> > > > > > > I'm still a bit worried about this approach. Continuing on this
> > > > > > > path means we've chosen DagModel to be the serialized
> > > > > > > representation of DAG and DB columns to hold different
> > > > > > > properties. It can be one candidate but I don't know if we
> > > > > > > should settle on that now. I would personally prefer a more
> > > > > > > compact, e.g. JSON5, and easy to scale representation (such that
> > > > > > > serializing new fields != DB upgrade).
> > > > > >
> > > > > > Do you mean https://json5.org/ or is this a typo? That might be
> > > > > > okay for a nicer user front end, but the "canonical" version
> > > > > > stored in the DB should be something "plainer" like just JSON.
> > > > > >
> > > > > > I'm not sure that "serializing new fields != DB upgrade" is that
> > > > > > big of a concern, as we don't add fields that often. One possible
> > > > > > way of dealing with it if we do is to have a hybrid approach: a
> > > > > > few distinct columns, but then a JSON blob. (And if we were only
> > > > > > to support postgres we could just use JSONB. But I think our
> > > > > > friends at Google may object ;) )
> > > > > >
> > > > > > Adding a new column in a DB migration with a default NULL
> > > > > > shouldn't be an expensive operation, or difficult to achieve.
> > > > > >
> > > > > >
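A rough sketch of the hybrid approach mentioned above: a few real columns
for the fields that get filtered on, plus a JSON text blob for everything
else. The `serialized_dag` table, its column names and the helper functions
are assumptions for illustration, and plain TEXT is used rather than a
database-specific JSON type.

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE serialized_dag (
        dag_id TEXT PRIMARY KEY,
        is_paused INTEGER NOT NULL,
        schedule_interval TEXT,
        extra TEXT NOT NULL
    )
    """
)


def save_dag(dag_id, is_paused, schedule_interval, **extra):
    """Store queryable fields as columns and the rest as a JSON blob."""
    conn.execute(
        "INSERT OR REPLACE INTO serialized_dag VALUES (?, ?, ?, ?)",
        (dag_id, int(is_paused), schedule_interval, json.dumps(extra)),
    )


def load_dag(dag_id):
    row = conn.execute(
        "SELECT dag_id, is_paused, schedule_interval, extra "
        "FROM serialized_dag WHERE dag_id = ?",
        (dag_id,),
    ).fetchone()
    record = {
        "dag_id": row[0],
        "is_paused": bool(row[1]),
        "schedule_interval": row[2],
    }
    record.update(json.loads(row[3]))
    return record


# A field that did not exist when the table was created ("owner_team")
# goes into the blob, so no migration is needed to start storing it.
save_dag("example_dag", False, "@daily", owner_team="data-eng", tags=["etl"])
save_dag("paused_dag", True, None)

loaded = load_dag("example_dag")
active_ids = [
    row[0]
    for row in conn.execute(
        "SELECT dag_id FROM serialized_dag WHERE is_paused = 0"
    )
]
```

The trade-off is that anything inside `extra` cannot be constrained or
indexed portably, so a field that later needs filtering would still be
promoted to a column through a migration.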
> > > > > > >
> > > > > > > In my imagination we would have to collect the list of dynamic
> > > > > > > features depending on unserializable fields of a DAG and start a
> > > > > > > discussion/vote on dropping support for them (I'm working on
> > > > > > > this but if anyone has already done so please take over), decide
> > > > > > > on the serialized representation of a DAG and then replace
> > > > > > > dagbag with it in the webserver. Per previous discussion and
> > > > > > > some offline discussions with Dan, one future of DAG
> > > > > > > serialization that I like would look similar to this:
> > > > > > >
> > > > > >
> > > > > > > https://imgur.com/ncqqQgc
> > > > > >
> > > > > > Something I've thought about before for other things was to embed
> > > > > > an API server _into_ the scheduler. This would be useful for k8s
> > > > > > healthchecks and native Prometheus metrics without needing a
> > > > > > statsd bridge, and could have endpoints to get information such
> > > > > > as this directly.
> > > > > >
> > > > > > I was thinking it would be _in_ the scheduler process, using
> > > > > > either threads (ick. Python's still got a GIL, doesn't it?) or
> > > > > > async/twisted etc. (not a side-car process like we have with the
> > > > > > logs webserver for `airflow worker`).
> > > > > >
> > > > > > (This is possibly an unrelated discussion, but might be worth
> > > > > > talking about?)
> > > > > >
> > > > > > > We can still discuss/vote on which approach we want to take,
> > > > > > > but I don't want the door to the above design to be shut right
> > > > > > > now, or we will have to spend a lot of effort switching paths
> > > > > > > later.
> > > > > > >
> > > > > > > Bas and Peter, I'm very sorry to extend the discussion but I do
> > > > > > > think this is tightly related to the AIP and the PRs behind it.
> > > > > > > And my sincere apologies for bringing this up so late (I only
> > > > > > > pull the open PR list occasionally; if there's a way to
> > > > > > > subscribe to new PR events I'd love to know how).
> > > > > >
> > > > > > It's noisy, but you can subscribe to comm...@airflow.apache.org
> > > > > > (but be warned, this also includes all Jira tickets, edits of
> > > > > > every comment on github etc.).
> > > > > >
> > > > > >
> > > > > > >
> > > > > > > Cheers,
> > > > > > > Kevin Y
> > > > > > >
> > > > > > > On Thu, Feb 28, 2019 at 1:36 PM Peter van t Hof
> > > > > > > <pjrvant...@gmail.com <mailto:pjrvant...@gmail.com>> wrote:
> > > > > > > Hi all,
> > > > > > >
> > > > > > > Just some comments on the points Bolke gave in relation to my
> > > > > > > PR.
> > > > > > >
> > > > > > > At first, the main focus is: making the webserver stateless.
> > > > > > >
> > > > > > > > 1) Make the webserver stateless: needs the graph of the
> > > > > > > > *current* dag
> > > > > > >
> > > > > > > This is the main goal, but for this a lot more PRs will be
> > > > > > > coming once my current one is merged. For edges and graph view
> > > > > > > this is already covered in my PR.
> > > > > > >
> > > > > > > > 2) Version dags: for consistency mainly and not requiring
> > > > > > > > parsing of the dag on every loop
> > > > > > >
> > > > > > > In my PR the historical graphs will be stored for each DagRun.
> > > > > > > This means that you can see whether an older DagRun had the same
> > > > > > > graph structure, even if some tasks do not exist anymore in the
> > > > > > > current graph. Especially for dynamic DAGs this is very useful.
> > > > > > >
> > > > > > > > 3) Make the scheduler not require DAG files. This could be
> > > > > > > > done if the edges contain all information when to trigger the
> > > > > > > > next task. We can then have event driven dag parsing outside
> > > > > > > > of the scheduler loop, i.e. by the cli. Storage can also be
> > > > > > > > somewhere else (git, artifactory, filesystem, whatever).
> > > > > > >
> > > > > > > The scheduler is almost untouched in this PR. The only thing
> > > > > > > that is added is that these edges are saved to the database,
> > > > > > > but the scheduling itself didn’t change. The scheduler still
> > > > > > > depends on the DAG object.
> > > > > > >
> > > > > > > > 4) Fully serialise the dag so it becomes transferable to
> > > > > > > > workers
> > > > > > >
> > > > > > > It's nice to see that people have a lot of ideas about this. But
> > > > > > > as Fokko already mentioned, this is out of scope for the issue
> > > > > > > that we are trying to solve. I also have some ideas about this
> > > > > > > but I'd like to limit this PR/AIP to the webserver.
> > > > > > >
> > > > > > > For now my PR does solve 1 and 2, and the rest of the behaviour
> > > > > > > (like scheduling) is untouched.
> > > > > > >
> > > > > > > Gr,
> > > > > > > Peter
> > > > > > >
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
