I quite like the PIPE option. To me it feels more readable and easier to understand, similar to how we currently set task dependencies.
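[Illustration - a minimal, hypothetical sketch of how the PIPE option under discussion could be wired up. None of these classes exist in Airflow; the names and semantics are made up purely to show the idea:]

# Hypothetical sketch of the UNIX-style pipe syntax discussed in this thread.
class Entity:
    """A lightweight lineage object, e.g. File(url=...) or Table(name=...)."""
    def __or__(self, task):
        # Entity | task: the entity becomes an inlet of the task.
        task.inlets.append(self)
        return task

class Task:
    def __init__(self, task_id):
        self.task_id = task_id
        self.inlets = []
        self.outlets = []

    def __or__(self, other):
        if isinstance(other, Entity):
            # task | Entity: record the entity as an outlet.
            self.outlets.append(other)
            return self
        # task1 | task2: the downstream task picks up the upstream outlets,
        # similar to the "AUTO" behaviour described later in the thread.
        other.inlets.extend(self.outlets)
        return other

    def __gt__(self, entity):
        # task > Entity: UNIX-style "redirect" - the entity becomes an outlet.
        self.outlets.append(entity)
        return entity

# Usage mirroring the example quoted below ("|" binds before ">"):
t1, t2 = Task("task1"), Task("task2")
Entity() | t1 | t2 > Entity()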
The wrapper function is also fine, as long as a user doesn't have to change the way they define tasks, i.e. they can still use their old DAGs.

The main reason I like the PIPE approach over setting args/kwargs is that it keeps the current model: we can define a task in isolation (without knowledge of other tasks - there are exceptions to that too, but few, like using xcom_pull in a templated field) and then set dependencies separately. So being able to optionally set lineage info the same way would be ideal.

Regarding use cases, the two I encounter most often are: (1) tracing an error in data back to the data provider, and (2) audits (checking GDPR compliance or handling a user request), where I can easily show how we got the data and how it was modified.

Regards,
Kaxil

On Sun, Jan 26, 2020, 21:11 Jarek Potiuk <[email protected]> wrote:

> > File(url="http://www.google.com") | task1 | task2 > File(url="file:///tmp/out")
>
> I love the UNIXY pipeline approach - a lot more than my operator overloading proposal :). And I think combining it with the builder pattern for more complex cases works very nicely (those complex cases will be rare, I think). The | pattern will only really work "after" the tasks are defined though, not "when" they are defined, so it is closer to dependencies than to a builder - but I think it's good to have it.
>
> In the end, I think my proposal of "set_inlet" rather than "inlet()" is really a question of emphasis - do we see it as something we "build" the operator with (hence builder pattern), or something that we "set" afterwards? Implementation-wise I think they are pretty much the same :). And I am fine with putting the emphasis on "building"/"builder pattern" and naming the methods "inlet()" and "outlet()". It's ultimately a matter of how users will use it - and which way of defining inlets/outlets will be the most popular. I think we agree in this case that "builder" + UNIXY pipeline is something we can both live with :).
>
> > really do not want to have the mapping needed to be kept in sync by the DAG developer. It is okay for non lineage aware operators, but for lineage aware operators this shouldn't be there - it should be inside the operator.
>
> Agree. If we have lineage-aware operators, we can add lineage as a first-class init option. I do not think it should be the default approach (i.e. "always add lineage to every operator you update"), but it should be an option for building your own custom lineage-aware operator if you do not want to use the builder pattern. Some "standard" operators might come with lineage built in when they pretty much "require" or "strongly encourage" having lineage - for example where you have services that deal with privacy.
>
> I am still, however, not 100% convinced about the implicit kwargs passing from the Inlet/Outlet classes. It sounds too implicit and too hacky to me - I have a gut feeling that it will break initialisation code in some cases (and it certainly breaks IDE autocompletion "cleanness"). On the positive side, I see how it saves you from having to define your own custom lineage-aware variants of operators that are not lineage-aware. That is quite a value on its own - I fully agree.
>
> But on the other hand, it feels like a bit of a duct-tape solution :). Not that I am all against duct tape and zip ties - sometimes they are the best engineering solution you can come up with, and that's OK.
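[Illustration - the "implicit kwargs passing" being debated here could look roughly like the following sketch. These classes are hypothetical, not code from the thread; Bolke's actual wrapper proposal appears in full further below:]

# Rough sketch of implicit kwargs passing via Inlet/Outlet wrappers.
# The wrappers carry ordinary operator kwargs plus lineage meaning;
# a shim unpacks them so the operator itself stays unchanged.
class Inlet:
    def __init__(self, **kwargs):
        self.kwargs = kwargs          # e.g. mysql_conn_id=..., sql=...

class Outlet(Inlet):
    pass

def unwrap(*args):
    """Flatten wrappers into plain operator kwargs, remembering lineage roles."""
    kwargs, inlets, outlets = {}, [], []
    for arg in args:
        kwargs.update(arg.kwargs)
        (outlets if isinstance(arg, Outlet) else inlets).append(arg.kwargs)
    return kwargs, inlets, outlets

# MysqlToHiveTransfer(Inlet(...), Outlet(...)) would then expand to the
# operator's usual kwargs, with lineage recorded on the side - which is
# exactly why IDE autocompletion of the real parameters suffers.
kwargs, inlets, outlets = unwrap(
    Inlet(mysql_conn_id="conn_id", sql="select * from table"),
    Outlet(hive_cli_conn_id="hive_conn", hive_table="hive_table"),
)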
>
> This opinion is not super-strong - I'd love to know what others think about it. If it's OK for others and it's only me who has this concern, I am quite OK with having all three options available:
>
> - custom Inlet/Outlet args with kwargs passing
> - inlet()/outlet() builder pattern
> - UNIXY | operator
>
> J.
>
> > On 24 January 2020 at 14:35:00, Jarek Potiuk ([email protected]) wrote:
> >
> > After discussing with Bolke (we indeed had a very good and constructive discussion at the Polidea office this week), I am a great supporter of adding more lineage support to Airflow, and I think we should all, as a community, think about how to make it as easy as possible to use and maintain. If it is not yet, it will soon become a requirement for many businesses to record lineage data, for many reasons (GDPR and the California Consumer Privacy Act are the first that come to mind, but there are many enterprise-internal cases that are also super-important). And adding lineage data might open up Airflow deployments to a lot of new usages. So I am all for it and happy to support / take part in the effort to implement it.
> >
> > *Some context/thoughts:*
> >
> > I thought a bit about it after our discussions with Bolke, and from the implementation point of view I think of lineage mainly as "additional meta-data" - IMHO it should not interfere too much with the way current DAGs are implemented. Lineage should be easy to use, but it should also be optional, and I think it's a bad idea to make users choose how they are going to write their DAGs depending on whether they want to add lineage or not. It should be super easy to add lineage information to existing DAGs without heavily impacting how the DAGs are implemented in code. This should be "extra" information added to existing DAGs. Otherwise we are trying to mix two different things - task dependency and data dependency. Putting them together is, I think, very difficult, but keeping them as "separate layers" is quite doable. Also, we already have many millions of DAGs in total, written by different companies, and the easier we make it to add lineage information to those DAGs, the better, IMHO. Changing the whole concept of writing DAGs and dependencies is also difficult to pull off - especially if we would like people to maintain DAGs written before and after lineage in the same place. I think the transition should be simple and incremental rather than revolutionary.
> >
> > I think most of the options we have differ by "syntactic sugar" - under the hood they all try to achieve the same thing. And the syntactic sugar is really what matters most for DAG developers and operator developers. So I won't even discuss the need and scope of the API etc. - this is secondary, and I think we can agree on it after we all agree what syntax we use to add lineage information.
> >
> > *Comments on proposals:*
> >
> > Let me comment on those proposals (in this context above):
> >
> > 1) Bolke's Wrappers - point 1
> >
> > It does not impact the way operators are written (and does not require rewriting hundreds of operators). That's good.
> > And it allows existing DAGs to be changed incrementally, by replacing the parameters of some operators with Inlet/Outlet - that's good as well. It does not differ that much from existing DAGs - that's great. One thing that worries me in this case is that if you write DAGs with/without lineage, or have some operators with/without lineage in the same DAG, you get a different syntax (potentially for the same operator classes). I am not 100% sure that mixing lineage/non-lineage tasks in the same DAG is a realistic case (I think it might happen), but there is a bit of "syntactic smell" (similar to code smell), where we seem to have an inconsistent approach to how DAGs are defined. It's not a deal-breaker, just a smell. It will be quite some work to convert between one and the other, and when you want to copy & paste portions of DAGs with/without lineage it will not work.
> >
> > 2) Builder pattern: it has similar characteristics to 1), but IMHO it handles the difference between the lineage/non-lineage case much more nicely. The main parameters remain the same, autocompletion when writing the DAGs will continue to work (for operators), for example, and you can copy & paste easily between DAGs with/without lineage by just deleting/adding the builder methods. Indeed, the parameter names have to be mapped, but as Bolke mentioned, they change slowly (if at all), and I think it's not a big problem overall - especially since we can verify it very easily when setting the inlet/outlet: if a parameter name is wrong, the whole DAG would fail, so we have a safety net. I like it much better than 1).
> >
> > 3) I also see how we can extend 2) by utilising a pattern that we already use in Airflow - similar to what we do with dependencies. Currently, task dependencies are independent from the definition of the operators, and they are never defined at the place where the operators are defined. I think inlets/outlets should be similar. As with DAG dependencies, you could add the "lineage dependencies" right after the operator is defined, or "all dependencies in bulk", depending on the style of your currently defined dependencies - different people have different styles. I think we could add lineage information very similarly. It would be similar to the builder pattern but decoupled from the task definition, and it would be closer to dependency definition than to task definition. It could also be supported by Python operator overloading - similarly to what we do with << and >> for set_downstream/set_upstream. Example:
> >
> > task1 = BashOperator()  # For example
> > task2 = MysqlToHiveTransfer(mysql_conn_id='conn_id', sql='select * from table', hive_cli_conn_id='hive_conn', hive_table='hive_table')
> >
> > task1 >> task2
> >
> > task2.set_inlet('mysql_conn_id', {'sql': 'mysql'}).set_outlet('hive_cli_conn_id', 'hive_table')
> > or
> > task2 <= ('mysql_conn_id', {'sql': 'mysql'}) >= ('hive_cli_conn_id', 'hive_table')
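[Illustration - a minimal sketch of what such set_inlet/set_outlet helpers might look like. This is hypothetical, not existing Airflow API; the methods simply record which constructor parameters carry lineage meaning and return self so the calls chain:]

# Hypothetical sketch of the "set_ afterwards" style described above.
class LineageMixin:
    """Records which operator parameters are lineage inlets/outlets."""

    def _record(self, store, params):
        for p in params:
            if isinstance(p, dict):
                store.update(p)     # e.g. {'sql': 'mysql'}: name -> type hint
            else:
                store[p] = None     # bare parameter name, no type hint
        return self                 # chainable, builder-style

    def set_inlet(self, *params):
        self._lineage_in = getattr(self, "_lineage_in", {})
        return self._record(self._lineage_in, params)

    def set_outlet(self, *params):
        self._lineage_out = getattr(self, "_lineage_out", {})
        return self._record(self._lineage_out, params)

# task2.set_inlet('mysql_conn_id', {'sql': 'mysql'}) \
#      .set_outlet('hive_cli_conn_id', 'hive_table')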
> > I think this is basically the same as the builder pattern, but naming it "builder pattern" implies that it is done at creation time. With this proposal (the set_ method naming, operator overloading, and separation from the task definition), it is more of an afterthought to task creation - similar to how dependencies work. It has one advantage over the builder pattern: DAG definitions with and without lineage look exactly the same, and you can copy & paste them as they are. Lineage information is added as an extra, and you can easily add more of it without touching the original DAG definition. You can also choose your lineage definition style - either you couple it closely with the task definition (by adding it right after the task), or you have a separate "lineage" section holding the lineage dependencies for all the tasks in your DAG in one place. I like this approach most; I think it serves the same purpose as 2) and 1) but is more flexible and more "incremental" in its nature.
> >
> > 4) Gerard's functional pattern - I think it's an interesting approach, but it's a much deeper change in how we think about Airflow, and I believe we should not mix it into the lineage discussion. They might converge at some point, but @Gerard - I have a kind proposal/request: maybe you can open a separate thread for it on the devlist? It changes the whole approach to DAG writing to be more functional. In the lineage discussion, I think we are talking about what can be implemented in the 2.0 timeframe, whereas your change is much more futuristic and requires a paradigm shift for Airflow. I think the basic assumption for lineage is that we should be able to tap into both existing operators and existing DAGs, rather than rewrite them all.
> >
> > J.
> >
> > On Fri, Jan 24, 2020 at 12:10 AM Gerard Casas Saez <[email protected]> wrote:
> >
> > > Hi everyone!
> > >
> > > I think the whole data lineage proposal is great, and I would like to contribute a bit with my own thoughts on how to extend the Operators API for better lineage support.
> > >
> > > Lately, I've been experimenting with extending the Operator API to make it more "functional", to specify data dependencies and pipeline data across the DAG. My approach is backwards compatible, and it separates the way you specify operator arguments from the dynamically generated Inlets/Outlets. I used XCom as a simplification to pass around dynamic values.
> > >
> > > My proposal is to include a __call__ function that dynamically replaces class attributes before executing the pre_execute and execute functions. Tied to an XComArg - a class that points to a previous task's XCom-pushed value - this allowed me to define DAGs in a more functional style. Basically my proposal is:
> > >
> > > • Add a __call__ function in BaseOperator that accepts Inlets (in my case XComArgs)
> > > • Log their values at execution time (which would allow exposing a REST API like the one proposed before)
> > > • Resolve them before executing the main execute function
> > > • Set the attribute in the operator class
> > > • Execute the operator and return an XComArg that can later be tied into a new operator as an Inlet…
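[Illustration - mechanically, those bullets might be read as the sketch below. It is a simplified, hypothetical rendering, not Gerard's actual code, which is linked after his own usage example:]

# Simplified sketch of the __call__ + XComArg idea.
class XComArg:
    """Points at the XCom value an upstream task will push."""
    def __init__(self, task, key="return_value"):
        self.task, self.key = task, key

    def resolve(self, context):
        # Pull the upstream task's pushed value at execution time.
        return context["ti"].xcom_pull(task_ids=self.task.task_id, key=self.key)

class FunctionalMixin:
    """Sketch of the proposed BaseOperator.__call__ behaviour."""
    def __call__(self, **inlets):
        # Remember which attributes should be filled from upstream XComs...
        self._inlets = inlets
        # ...and hand back a reference a downstream operator can consume.
        return XComArg(self)

    def pre_execute(self, context):
        # Resolve (and log) inlet values, then set them on the operator
        # just before execute() runs.
        for attr, value in getattr(self, "_inlets", {}).items():
            if isinstance(value, XComArg):
                value = value.resolve(context)
            setattr(self, attr, value)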
> > > Here's what it would look like (ML example, sorry):
> > >
> > > with DAG(...) as dag:
> > >     load = LoadDatasetOperator(task_id='load_dataset')
> > >     split = SplitTrainTestOperator(task_id='split', test_perc=0.3)
> > >     train = TrainTensorflowModelOperator(task_id='train')
> > >     validate = PrecisionRecallOperator(task_id='pr')
> > >     report = EmailOperator(task_id='send_pr_report', subject='New model trained results', email='[email protected]')
> > >
> > >     dataset = load(path='hdfs://some/dataset')
> > >     splitted_ds = split(dataset=dataset)
> > >     model = train(dataset=splitted_ds['train'], model_specification='hdfs://some/dataset')
> > >     metrics = validate(model=model, dataset=splitted_ds['test'])
> > >     report(html_content=metrics)
> > >
> > > As someone wise once said, code is better than words, so here's my experimental code: https://github.com/casassg/corrent (ignore the awful name and the injection part).
> > >
> > > Gerard Casas Saez
> > > Twitter | Cortex | @casassaez
> > >
> > > On Jan 22, 2020, 8:40 PM -0700, Tao Feng wrote:
> > >
> > > > Thanks Bolke. For those who are not aware, my team is working with Bolke's team on Amundsen, a data discovery and metadata project (https://github.com/lyft/amundsen). Although it ships with an Atlas client (or it used to), the new API, per my understanding, is generic enough that it is not tied to Atlas. E.g. we (Lyft) could build a neo4j / Amundsen client in our Airflow fork to ingest the lineage info in a push fashion to build the lineage.
> > > >
> > > > Amundsen itself has put up the effort to integrate Airflow with the tool (connecting which DAG/task produces which data set, etc.). With this change, I foresee it will help to provide more enriched metadata.
> > > >
> > > > Thanks,
> > > > -Tao
> > > >
> > > > On Wed, Jan 22, 2020 at 8:46 AM Dan Davydov <[email protected]> wrote:
> > > >
> > > > > Just want to preface my reply with the fact that I haven't thought about data lineage very much.
> > > > >
> > > > > This is an awesome idea :)! I like something like 1) personally - e.g. operators could optionally define an .outlet() and .inlet() interface which would return the inlets and outlets of a given task, and then it's up to the operator how it sets these inlets/outlets, like the Papermill operator currently does. This also allows inlets/outlets to be more dynamic (e.g. in the case of an operator that might generate inlets/outlets dynamically at execution time). It seems the most extensible, with the least coupling. IMO we should strive to make DAGs easy to create with little boilerplate, but this is a lot less important for operators, since they are a lot more stable and change less frequently, so it's fine to require operators to implement some interface manually.
> > > > >
> > > > > On Wed, Jan 22, 2020 at 8:33 AM Bolke de Bruin <[email protected]> wrote:
> > > > >
> > > > > > Dear All,
> > > > > >
> > > > > > Over the last few weeks I have made serious improvements to the lineage support in Airflow. Whilst not complete, it's starting to shape up, and I think it is good to share some thoughts and directions. Much has been discussed with several organisations, like Polidea, Daily Motion and Lyft.
> > > > > > Some have already implemented some support for lineage themselves (Daily Motion) and some have a need for it (Lyft with Amundsen).
> > > > > >
> > > > > > First, a bit of a recap: what is lineage and why is it important? Lineage allows you to track the origins of data, what happens to it, and where it moves over time. Lineage is often associated with auditability of data pipelines, which is not a very sexy subject ;-). However, much more prominent and user-facing improvements are possible if you have lineage data available. Lineage greatly simplifies the ability to trace errors back to their root cause in analytics. So, instead of the user calling up the engineering team in case of a data error, they could trace back to the origin of the data and call the one who created the original data set. Lineage also greatly improves discoverability of data. Lineage information gives insight into the importance of data sets. If a new employee joins a team, he would normally go to the most senior person in that team to ask what data sources they are using and what their meaning is. If lineage information is exposed through a tool like Amundsen, this is not required, because that person can just look it up.
> > > > > >
> > > > > > To summarise, there are 3 use cases driving the need for lineage:
> > > > > >
> > > > > > 1. Discoverability of data
> > > > > > 2. Improved data operations
> > > > > > 3. Auditability of data pipelines
> > > > > >
> > > > > > So that's all great, I hear you thinking, but why don't we have it in Airflow already if it is so important? The answer to that is twofold. Firstly, adding lineage information is often associated with a lot of metadata and meta-programming. Typically, if lineage is being "slapped on", one needs to add a lot of metadata which then needs to be kept in sync. That way it does not solve a problem for the developer - rather, it creates one. Secondly, Airflow is a task-based system and by definition does not have very good infrastructure for dealing with data. In the past we had some trials by Jeremiah to add Pipelines, but it was never integrated, and I think it actually sparked him to start Prefect ;-) (correct me if I am wrong if you are reading this, Jeremiah).
> > > > > >
> > > > > > Where is lineage support now in Airflow? In the 1.10.X series there is some support for lineage, but it is buggy and difficult to use, as it is based on the metadata model of Apache Atlas. In master, the foundation has much improved (but is not fully done yet). You can now set inlets and outlets with lightweight objects like File(url="http://www.google.com") and Table(name="my_table"), and the lineage system in Airflow will figure out a lot for you. You can also have inlets pick up outlets from previous upstream tasks by passing a list of task_ids, or even by using "AUTO", which picks up outlets from direct upstream tasks.
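[Illustration - a sketch of what that could look like in a DAG, based purely on the description above. The import paths, the operator choice, and the exact "AUTO" token are assumptions, not verified against the master branch of the time:]

# Sketch of setting inlets/outlets with lightweight lineage objects.
from airflow.lineage.entities import File, Table   # import path assumed
from airflow.operators.bash_operator import BashOperator

extract = BashOperator(
    task_id="extract",
    bash_command="...",
    inlets=[File(url="http://www.google.com")],   # where the data comes from
    outlets=[Table(name="my_table")],             # what the task produces
)
load = BashOperator(
    task_id="load",
    bash_command="...",
    inlets="AUTO",   # per the description: pick up outlets of direct upstream tasks
)
extract >> load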
> > > > > > The lightweight objects are automatically templated, so you can do something like File(url="/tmp/my_data_{{ execution_date }}"), which does the right thing for you. Templating inlets and outlets gives very powerful capabilities - for example, creating a task that, based on the inlets it receives, can drop PII information from an arbitrary table and output that table somewhere else. This allows for creating generic Tasks/DAGs that can be re-used without any domain knowledge. A small example (not PII) is available with the example_papermill_operator.
> > > > > >
> > > > > > Lineage information is exposed through an API endpoint. You can query "/api/experimental/lineage/<dag_id>/<execution_date>" and you will get a list of tasks with their inlets and outlets defined. The lineage information shared through the API and the lightweight object model are very close to the model used within Lyft's Amundsen, so when that gets proper visualisation support for lineage and pulls in the information from Airflow - presto! Other systems might require some translation, but that shouldn't be too hard.
> > > > > >
> > > > > > What doesn't it do? Well, and here we get to the point of this discussion, there is still meta-programming involved to keep the normal parameters and the inlets and outlets of an operator in sync. This is because it's hard to make operators lineage aware without changing them. So while you set "inlets" and "outlets" on an Operator, the operator itself doesn't do anything with them, making them a lot less powerful. Actually, the only operator that has out-of-the-box support for lineage is the PapermillOperator.
> > > > > >
> > > > > > In discussions with the aforementioned organisations it became clear that, while we could change all the operators that Airflow comes with out of the box, this will not help with the many custom operators that are around. They will simply not get updated as part of this exercise, leaving them as technical debt. Thus we need an approach that works with the past and improves the future. The generic pattern for Airflow operators is pretty simple: you can read many (yes, we know there are exceptions!) as SourceToTarget(src_conn_id, src_xxx, src_xx, target_conn_id, target_xxx, some_other_kwarg). Hence, we came up with the following.
> > > > > >
> > > > > > For existing non lineage aware operators:
> > > > > >
> > > > > > 1. Use wrapper objects to group parameters together as inlet or as outlet. For example, usage for the MysqlToHiveTransfer could look like MysqlToHiveTransfer(Inlet(mysql_conn_id='mysql_conn', sql='select * from table'), Outlet(hive_cli_conn_id='hive_conn', hive_table='my_hive_table')). The wrapper objects would then set the right kwargs on the Operator and create the lineage information. This resolves the issue of keeping parameters in sync.
> > > > > >
> > > > > > 2. Use the builder pattern to tell the lineage system which arguments to the operator are for the Inlet and which for the Outlet, maybe with a type hint if required. E.g.
> > > > > > MysqlToHiveTransfer(mysql_conn_id='conn_id', sql='select * from table', hive_cli_conn_id='hive_conn', hive_table='hive_table').inlet('mysql_conn_id', {'sql': 'mysql'}).outlet('hive_cli_conn_id', 'hive_table')
> > > > > > This requires a bit more work from the developer, as the parameter names need to be kept in sync. However, they are slow moving.
> > > > > >
> > > > > > Future lineage aware operators:
> > > > > >
> > > > > > 1. Update the Operator to set and support inlets and outlets itself, e.g. like the current PapermillOperator.
> > > > > >
> > > > > > 2. Have a dictionary inside the operator which tells the lineage system which fields are used for the inlet and which for the outlet. This is the integrated version of pattern 2 for non lineage aware operators:
> > > > > > # dictionary of parameter name with type
> > > > > > inlet_fields = {'mysql_conn_id': 'mysql_connection', 'sql': 'sql'}
> > > > > > outlet_fields = {'hive_conn_id': 'hive_connection', 'hive_table': 'table'}
> > > > > > Updates to the operator need to be checked to ensure the field names are kept in sync.
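[Illustration - pattern 2 embedded in an operator might look like the sketch below. It is hypothetical and assumes a lineage system that reads these class attributes; the field names follow the example above:]

# Sketch of a "future lineage aware" operator carrying its own field mapping.
from airflow.models import BaseOperator

class MysqlToHiveTransfer(BaseOperator):
    # parameter name -> lineage type, declared next to the parameters themselves
    inlet_fields = {"mysql_conn_id": "mysql_connection", "sql": "sql"}
    outlet_fields = {"hive_cli_conn_id": "hive_connection", "hive_table": "table"}

    def __init__(self, mysql_conn_id, sql, hive_cli_conn_id, hive_table, **kwargs):
        super().__init__(**kwargs)
        self.mysql_conn_id = mysql_conn_id
        self.sql = sql
        self.hive_cli_conn_id = hive_cli_conn_id
        self.hive_table = hive_table

    def execute(self, context):
        ...  # the transfer itself is unchanged; lineage is derived from the dicts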
> > > > > > 3. Enforce a naming pattern for Operators, so that MysqlToHiveTransfer(…) becomes
> > > > > > MysqlToHive(mysql_conn_id, mysql_sql, hive_conn_id, hive_table) or
> > > > > > MysqlToHive(src_conn_id, src_sql, target_conn_id, target_table)
> > > > > > This would allow the lineage system to figure out what is an inlet and what is an outlet based on the naming scheme alone. It would require a pylint plugin to make sure Operators behave correctly, but it would also make operators much more predictable.
> > > > > >
> > > > > > Option number 3 for the future has the most impact. Out of the box, the lineage system in Airflow can support (and it is my intention to do so) all the above patterns, but ideally we improve the state of things so that we can deprecate what we do for non lineage aware operators: the wrapper objects and the builder pattern wouldn't be necessary anymore.
> > > > > >
> > > > > > What do you think? What are your thoughts on lineage - what kind of usages do you foresee? How would you like to be using it and have it supported in Airflow? Would you be able to work with the above ways of doing it? Pros and cons?
> > > > > >
> > > > > > Thanks
> > > > > > Bolke
> >
> > --
> > Jarek Potiuk
> > Polidea <https://www.polidea.com/> | Principal Software Engineer
> > M: +48 660 796 129
>
> --
> Jarek Potiuk
> Polidea <https://www.polidea.com/> | Principal Software Engineer
> M: +48 660 796 129
