Hi All,

Thanks for all the responses! A couple of additional thoughts.
The intention is to have all Operators be lineage aware. That way lineage is built in, so that a developer does not have to do anything: in its simplest form an operator will just record its own inlets and outlets, but you won't need to 'do' anything with them. That makes lineage within your DAG optional but readily available. It is also why I am not in favour of having a "set_inlets" and "set_outlets" as an after-the-fact meta step: it requires keeping things in sync again, which defeats DRY.

The overloading of operators I do like (and I already have a local implementation of it). My intention was to use a pipe (|) for it, so it would be something like:

File(url="http://www.google.com") | task1 | task2 > File(url="file:///tmp/out")

In this example you define your first inlet as a File, which task1 picks up. Task1 has outlets which are then made available to task2 (but unspecified by the user), and task2 has a user-defined outlet File. This format is more "unixy" for people used to the command line. Challenges arise when a task has multiple inlets/outlets that are not directly used downstream, but there we could use the builder pattern. (A rough sketch of how the overloading could look is included below.)

Finally, for the builder pattern I really do not want the mapping to be kept in sync by the DAG developer. It is okay for non-lineage-aware operators, but for lineage-aware operators this shouldn't be there - it should be inside the operator.

I agree with Jarek that Gerard's idea is a bit further down the line. Yes, it would be awesome to be able to work with data dependencies alongside task dependencies in Airflow, but we need a good foundation for that. The lineage capability will hopefully provide that foundation, and it solves other issues as well.

For the API: @Tao, yes, the new API is actually almost equal to what Amundsen expects. It is completely independent from Atlas. I do intend to re-enable the push-based API. External enrichment is also on the list, so you could do something like File(key="qualified_key", external=True), which would pick up the information it needs from the external lineage system. This should then work for both Atlas and Amundsen.

Cheers
Bolke

On 24 January 2020 at 14:35:00, Jarek Potiuk ([email protected]) wrote:

After discussing with Bolke (we had indeed a very good and constructive discussion at the Polidea office this week), I am a great supporter of adding more lineage support to Airflow, and I think we should all, as a community, think about how to make it as easy as possible to use and maintain. If it is not yet, it will soon become a requirement for many businesses to record lineage data, for many reasons (GDPR and the California Consumer Privacy Act are the first ones that come to mind, but there are many enterprise-internal cases that are also super-important). And adding lineage data might open up Airflow deployments to a lot of new usages. So I am all for it and happy to support / take part in the effort to implement it.

*Some context/thoughts:*

I thought a bit about it after our discussions with Bolke, and from the implementation point of view I think of lineage mainly as "additional metadata" - IMHO it should not interfere too much with the way current DAGs are implemented. Lineage should be easy to use but it should also be optional, and I think it's a bad idea to make users choose the way they are going to write their DAGs depending on whether they want to add lineage or not.
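A minimal sketch of the pipe overloading referenced above (Bolke's File | task1 | task2 > File example). The Entity and Task classes and their attributes here are illustrative stand-ins only, not an existing Airflow API:

    class Entity:
        """Lightweight lineage object, e.g. File(url=...) or Table(name=...)."""

        def __init__(self, **attrs):
            self.attrs = attrs

        def __or__(self, task):
            # Entity | task: register this entity as an inlet of the task.
            task.inlets.append(self)
            return task


    class Task:
        """Stand-in for an operator that records lineage."""

        def __init__(self, task_id):
            self.task_id = task_id
            self.inlets = []
            self.outlets = []

        def __or__(self, downstream):
            # task1 | task2: make task1's outlets available to the downstream
            # task; a real implementation would also call set_downstream().
            downstream.inlets.extend(self.outlets)
            return downstream

        def __gt__(self, entity):
            # ... > Entity: register a user-defined outlet on this task.
            self.outlets.append(entity)
            return self


    # ">" has lower precedence than "|" in Python, so the pipe chain is
    # built first and the final outlet is attached to task2:
    Entity(url="http://www.google.com") | Task("task1") | Task("task2") > Entity(url="file:///tmp/out")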
I think it should be super easy to add lineage information to existing DAGs without heavily impacting the way the DAGs are implemented in code. It should be "extra" information added to existing DAGs. Otherwise we try to mix two different things - task dependency and data dependency. Putting them together is, I think, very difficult, but putting them in as "separate layers" is quite doable.

Also, I think we already have in total many millions of DAGs written by different companies, and the easier we make it to add lineage information to those DAGs - the better IMHO. Totally changing the concept of writing DAGs and dependencies is a difficult thing to pull off - especially if we would like people to maintain DAGs written before/after lineage in the same place. I think the transition should be simple and incremental rather than revolutionary.

I think most of the options we have differ by "syntactic sugar" - under the hood they are all trying to achieve the same thing. And the syntactic sugar is really what is most important for DAG developers and Operator developers. So I don't even discuss the need and scope of the API etc. - this is secondary, and I think we can agree on that after we all agree on the syntax we use to add lineage information.

*Comments on proposals:*

Let me comment on those proposals (in this context above):

1) Bolke's Wrappers - point 1

It does not impact the way operators are written (and does not require rewriting hundreds of operators). That's good. And it allows existing DAGs to be changed incrementally by replacing parameters of some operators with Inlet/Outlet - that's good as well. It does not differ that much from existing DAGs - that's great. One thing that worries me is that if you write DAGs with/without lineage, or have some operators with/without lineage in the same DAG, you get a different syntax (potentially for the same operator classes). I am not 100% sure whether it is a realistic case to mix lineage/non-lineage tasks in the same DAG (I think it might happen), but there is a bit of a "syntactic smell" (similar to code smell) where we seem to have an inconsistent approach to how we define DAGs. It's not a deal-breaker, just a smell. It will be quite a bit of work to convert between one and the other, and when you want to copy&paste portions of DAGs with/without lineage it will not work.

2) Builder pattern:

It has similar characteristics to 1), but IMHO it is much nicer regarding the difference between the lineage/non-lineage case. The main parameters remain the same, and autocompletion when writing the DAGs will continue to work here (for operators), for example. You can copy&paste easily between DAGs with/without lineage by just deleting/adding the builder methods. Indeed the parameter names have to be mapped, but as Bolke mentioned, they are slowly changing (if at all) and I think it's not a big problem overall - especially since we can very easily verify that when setting the inlet/outlet: if a parameter name is wrong, the whole DAG would fail, so we have a safety net. I like it much better than 1).

3) I also see how we can extend 2) by utilising a pattern that we already use in Airflow - similar to what we do with dependencies. Currently task dependencies are independent from the definition of the operators, and they are never defined at the place where operators are defined. I think inlets/outlets should be similar. Similarly to task dependencies, you could add the "lineage dependencies" right after where the operator is defined, or add "all dependencies in bulk",
depending on your current style of defining dependencies - different people have different styles. I think we could add lineage information very similarly. It would be similar to the builder pattern, but decoupled from the task definition. And it would be closer to the dependency definition than to the task definition. It could also be supported by Python operator overloading - similarly to what we do with << >> for set_downstream/set_upstream.

Example:

task1 = BashOperator()  # For example
task2 = MysqlToHiveTransfer(mysql_conn_id='conn_id', sql='select * from table', hive_cli_conn_id='hive_conn', hive_table='hive_table')

task1 >> task2
task2.set_inlet('mysql_conn_id', {'sql': 'mysql'}).set_outlet('hive_cli_conn_id', 'hive_table')

or

task2 <= ('mysql_conn_id', {'sql': 'mysql'}) >= ('hive_cli_conn_id', 'hive_table')

I think this is basically the same as the builder pattern, but naming it the builder pattern implies that it is done at "creation time". With this proposal (set_ method naming, operator overloading, and separating it from the task definition) it is more of an afterthought to task creation - similarly to how dependencies are. It has one advantage over the builder pattern - DAG definitions with and without lineage look exactly the same, and you can copy&paste them as they are. Lineage information is added as an extra, and you can easily add more lineage information without touching the original DAG definition. You can also choose your lineage definition style - either you couple it closer to the task definition (if you add it right after the task) or you have a separate "lineage" section where you keep all the lineage dependencies for all the tasks in your DAG in the same place. I like this approach most; I think it serves the same purpose as 2) and 1) but is more flexible and more "incremental" in its nature.

4) Gerard's functional pattern - I think it's an interesting approach. But I think it's a much deeper change in how we think about Airflow. I believe we should not mix it with the lineage discussion. They might converge at some point, but @Gerard - I have a kind proposal / request - maybe you can open a separate thread about it on the devlist? It changes the whole approach to DAG writing to be more functional. In the lineage discussion, I think we talk about what can be implemented in the 2.0 timeframe, whereas your change is much more futuristic and requires a paradigm shift for Airflow. I think the basic assumption for lineage is that we should be able to tap into both existing operators and existing DAGs rather than rewrite them all.

J.

On Fri, Jan 24, 2020 at 12:10 AM Gerard Casas Saez <[email protected]> wrote:

> Hi everyone!
>
> I think the whole data lineage proposal is great and I would like to contribute a bit with my own thoughts on how to extend the Operators API for better lineage support.
>
> Lately, I've been experimenting a bit with extending the Operator API to make it more `functional` to specify data dependencies and pipeline data across the DAG. My approach is backwards compatible and it separates the way you specify operator arguments from the dynamically generated Inlets/Outlets. I used XCom as a simplification to pass around dynamic values.
>
> My proposal is to include a __call__ function that would dynamically replace class attributes before executing the `pre_execute` and `execute` functions. This, tied with an XComArg - a class that points to a previous task's XCom-pushed value - allowed me to define DAGs in a more functional approach.
> Basically my proposal is:
>
> • Add a __call__ function in BaseOperator that accepts Inlets (in my case XComArgs)
> • Log their values at execution time (which would allow exposing a REST API like the one proposed before)
> • Resolve them before executing the main `execute` function
> • Set the attribute in the operator class
> • Execute the operator and return an XComArg that can later be tied into a new operator as an Inlet…
>
> Here's what it would look like (ML example, sorry):
>
> with DAG(...) as dag:
>     load = LoadDatasetOperator(task_id='load_dataset', )
>     split = SplitTrainTestOperator(task_id='split', test_perc=0.3)
>     train = TrainTensorflowModelOperator(task_id='train')
>     validate = PrecisionRecallOperator(task_id='pr')
>     report = EmailOperator(task_id='send_pr_report', subject='New model trained results', email='[email protected]')
>
>     dataset = load(path='hdfs://some/dataset')
>     splitted_ds = split(dataset=dataset)
>     model = train(dataset=splitted_ds['train'], model_specification='hdfs://some/dataset')
>     metrics = validate(model=model, dataset=splitted_ds['test'])
>     report(html_content=metrics)
>
> As someone wise once said, code is better than words, so here's my experimental code: https://github.com/casassg/corrent (ignore the awful name and the injection part).
>
> Gerard Casas Saez
> Twitter | Cortex | @casassaez
>
> On Jan 22, 2020, 8:40 PM -0700, Tao Feng wrote:
>
> > Thanks Bolke. For those that are not aware, my team is working with Bolke's team on Amundsen, which is a data discovery and metadata project (https://github.com/lyft/amundsen). Although it ships with an Atlas client (or it used to), the new API, per my understanding, is generic enough that it is not tied to Atlas. E.g. we (Lyft) could build a neo4j / Amundsen client in our Airflow fork to ingest the lineage info in a push fashion to build the lineage.
> >
> > Amundsen itself has put in the effort to integrate Airflow with the tool (connecting which DAG/task produces which data set, etc.). With this change, I foresee it will help to provide more enriched metadata.
> >
> > Thanks,
> > -Tao
> >
> > On Wed, Jan 22, 2020 at 8:46 AM Dan Davydov <[email protected]> wrote:
> >
> > > Just want to preface my reply with the fact that I haven't thought about data lineage very much.
> > >
> > > This is an awesome idea :)! I like something like 1) personally, e.g. operators could optionally define a .outlet() and .inlet() interface which would return the inlets and outlets of a given task, and then it's up to the operator how it wants to set these inlets/outlets, like the Papermill operator currently does. This also allows inlets/outlets to be more dynamic (e.g. in the case of an operator that might generate inlets/outlets dynamically at execution time). It seems the most extensible, with the least coupling. IMO we should strive to make DAGs easy to create with little boilerplate, but this is a lot less important for operators since they are a lot more stable and change less frequently, so it's fine to require operators to implement some interface manually.
> > >
> > > On Wed, Jan 22, 2020 at 8:33 AM Bolke de Bruin <[email protected]> wrote:
> > >
> > > > Dear All,
> > > >
> > > > Over the last few weeks I made serious improvements to the lineage support that Airflow has. Whilst not complete, it's starting to shape up and I think it is good to share some thoughts and directions.
> > > > Much has been discussed with several organisations like Polidea, Daily Motion and Lyft. Some have already implemented some support for lineage themselves (Daily Motion) and some have a need for it (Lyft with Amundsen).
> > > >
> > > > First a bit of a recap. What is lineage and why is it important? Lineage allows you to track the origins of data, what happens to it, and where it moves over time. Lineage is often associated with the auditability of data pipelines, which is not a very sexy subject ;-). However, there are many more prominent and user-facing improvements possible if you have lineage data available. Lineage greatly simplifies the ability to trace errors back to their root cause in analytics. So, instead of the user calling up the engineering team in case of a data error, they could trace back to the origin of the data and call the one who created the original data set. Lineage also greatly improves the discoverability of data. Lineage information gives insights into the importance of data sets. So if a new employee joins a team, he would normally go to the most senior person in that team to ask what data sources he is using and what their meaning is. If lineage information is exposed through a tool like Amundsen, this is not required because that person can just look it up.
> > > >
> > > > To summarise, there are 3 use cases driving the need for lineage:
> > > >
> > > > 1. Discoverability of data
> > > > 2. Improved data operations
> > > > 3. Auditability of data pipelines
> > > >
> > > > So that's all great I hear you thinking, but why don't we have it in Airflow already if it is so important? The answer to that is twofold. Firstly, adding lineage information is often associated with a lot of metadata and meta programming. Typically, if lineage is being 'slapped on', one needs to add a lot of metadata which then needs to be kept in sync. In that way it does not solve a problem for the developer; rather, it creates one. Secondly, Airflow is a task based system and by definition does not have a very good infrastructure that deals with data. In the past we had some trials by Jeremiah to add Pipelines, but it never was integrated and I think it actually sparked him to start Prefect ;-) (correct me if I am wrong if you are reading this, Jeremiah).
> > > >
> > > > Where is lineage support now in Airflow? In the 1.10.X series there is some support for lineage, but it is buggy and difficult to use as it is based on the metadata model of Apache Atlas. In master the foundation has much improved (but is not fully done yet). You can now set inlets and outlets with lightweight objects like File(url="http://www.google.com") and Table(name="my_table") and the lineage system in Airflow will figure out a lot for you. You can also have inlets pick up outlets from previous upstream tasks by passing a list of task_ids, or even by using "AUTO", which picks up outlets from direct upstream tasks.
> > > >
> > > > The lightweight objects are automatically templated, so you can do something like File(url="/tmp/my_data_{{ execution_date }}") which does the right thing for you.
> > > > Templating inlets and outlets gives very powerful capabilities, for example by creating a Task that, based on the inlets it receives, can drop PII information from an arbitrary table and output this table somewhere else. This allows for creating generic Tasks/DAGs that can be re-used without any domain knowledge. A small example (not PII) is available with the example_papermill_operator.
> > > >
> > > > Lineage information is exposed through an API endpoint. You can query "/api/experimental/lineage/<dag_id>/<execution_date>" and you will get a list of tasks with their inlets and outlets defined. The lineage information shared through the API and the lightweight object model are very close to the model used within Lyft's Amundsen, so when that gets proper visualisation support for lineage and pulls in the information from Airflow, it's presto! Other systems might require some translation, but that shouldn't be too hard.
> > > >
> > > > What doesn't it do? Well, and here we get to the point of this discussion, there is still meta programming involved to keep the normal parameters and the inlets and outlets of an operator in sync. This is because it's hard to make operators lineage aware without changing them. So while you set "inlets" and "outlets" on an Operator, the operator itself doesn't do anything with them, making them a lot less powerful. Actually, the only operator that has out-of-the-box support for lineage is the PapermillOperator.
> > > >
> > > > In discussions with the aforementioned organisations it became clear that, while we could change all operators that Airflow comes with out of the box, this will not help with the many custom operators that are around. They will simply not get updated as part of this exercise, leaving them as technical debt. Thus we need an approach that works with the past and improves the future. The generic pattern for Airflow operators is pretty simple: you can read many (yes, we know there are exceptions!) as SourceToTarget(src_conn_id, src_xxx, src_xx, target_conn_id, target_xxx, some_other_kwarg). Hence, we came up with the following:
> > > >
> > > > For existing non-lineage-aware operators:
> > > >
> > > > 1. Use wrapper objects to group parameters together as inlet or as outlet. For example, usage for the MysqlToHiveTransfer could look like MysqlToHiveTransfer(Inlet(mysql_conn_id='mysql_conn', sql='select * from table'), Outlet(hive_cli_conn_id='hive_conn', hive_table='my_hive_table')). The wrapper objects would then set the right kwargs on the Operator and create the lineage information. This resolves the issue of keeping parameters in sync.
> > > > 2. Use the builder pattern to tell the lineage system which arguments to the operator are for the Inlet and which for the Outlet, maybe with a type hint if required. E.g. MysqlToHiveTransfer(mysql_conn_id='conn_id', sql='select * from table', hive_cli_conn_id='hive_conn', hive_table='hive_table').inlet('mysql_conn_id', {'sql': 'mysql'}).outlet('hive_cli_conn_id', 'hive_table'). This requires a bit more work from the developer, as the parameter names need to be kept in sync.
> > > > However, they are slow moving.
> > > >
> > > > Future lineage-aware operators:
> > > >
> > > > 1. Update the Operator to set and support inlets and outlets itself, e.g. like the current PapermillOperator.
> > > > 2. Have a dictionary inside the operator which tells the lineage system which fields are used for inlets and outlets. This is the integrated version of pattern 2 for non-lineage-aware operators:
> > > >
> > > > # dictionary of parameter name with type
> > > > inlet_fields = {'mysql_conn_id': 'mysql_connection', 'sql': 'sql'}
> > > > outlet_fields = {'hive_conn_id': 'hive_connection', 'hive_table': 'table'}
> > > >
> > > > Updates to the operator need to be checked to ensure the field names are kept in sync.
> > > > 3. Enforce a naming pattern for Operators, e.g. MysqlToHiveTransfer(…) becomes MysqlToHive(mysql_conn_id, mysql_sql, hive_conn_id, hive_table) or MysqlToHive(src_conn_id, src_sql, target_conn_id, target_table). This would allow the lineage system to figure out what is an inlet and what is an outlet based on the naming scheme. It would require a pylint plugin to make sure Operators behave correctly, but it would also make operators much more predictable.
> > > >
> > > > Option number 3 for the future has the most impact. Out of the box the lineage system in Airflow can support (and it is my intention to do so) all of the above patterns, but ideally we improve the state of things so that we can deprecate what we do for non-lineage-aware operators in the future: wrapper objects and the builder pattern wouldn't be necessary anymore.
> > > >
> > > > What do you think? What are your thoughts on lineage, and what kinds of usage do you foresee? How would you like to use it and have it supported in Airflow? Would you be able to work with the above ways of doing it? Pros and cons?
> > > >
> > > > Thanks
> > > > Bolke

--
Jarek Potiuk
Polidea <https://www.polidea.com/> | Principal Software Engineer

M: +48 660 796 129
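To make the field-mapping option (future lineage-aware operators, point 2 above) concrete, here is a minimal sketch of how such an operator might declare the mapping. The class-level inlet_fields / outlet_fields attributes and the idea that the lineage system reads them are assumptions for illustration, not an existing or agreed-upon Airflow API; the parameter names follow the MysqlToHiveTransfer example above.

    # Sketch only: assumes a lineage system that inspects class-level
    # inlet_fields / outlet_fields mappings; those attributes are hypothetical.
    from airflow.models import BaseOperator


    class MysqlToHiveTransfer(BaseOperator):
        # parameter name -> entity type, read by the (hypothetical) lineage system
        inlet_fields = {"mysql_conn_id": "mysql_connection", "sql": "sql"}
        outlet_fields = {"hive_cli_conn_id": "hive_connection", "hive_table": "table"}

        def __init__(self, mysql_conn_id, sql, hive_cli_conn_id, hive_table, **kwargs):
            super().__init__(**kwargs)
            self.mysql_conn_id = mysql_conn_id
            self.sql = sql
            self.hive_cli_conn_id = hive_cli_conn_id
            self.hive_table = hive_table

        def execute(self, context):
            # The transfer logic itself is unchanged; lineage would be derived
            # from the class-level mappings without extra work in the DAG.
            ...

In a DAG the operator would be instantiated exactly as today, and the DAG author would not need to declare inlets or outlets at all.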
