Just want to preface my reply with the fact that I haven't thought about
data lineage very much.

This is an awesome idea :)! I like something like option 1 personally: e.g.
operators could optionally define an .outlet() and .inlet() interface which
returns the inlets and outlets of a given task, and then it's up to the
operator how it wants to set these inlets/outlets, like the Papermill
operator currently does. This also keeps inlets/outlets dynamic (e.g. for an
operator that generates its inlets/outlets at execution time), and it seems
the most extensible with the least coupling. IMO we should strive to make
DAGs easy to create with little boilerplate, but this is a lot less
important for operators since they are much more stable and change less
frequently, so it's fine to require operators to implement some interface
manually.
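
To make that concrete, here is a rough sketch of what such an optional
interface could look like. This is entirely hypothetical: the operator, the
method names, and the import paths are placeholders based on the lightweight
entities that exist on master.

    from airflow.lineage.entities import File, Table  # lightweight entities on master
    from airflow.models import BaseOperator

    class MyTransferOperator(BaseOperator):
        """Hypothetical operator that knows its own lineage."""

        def __init__(self, src_table, dest_path, **kwargs):
            super().__init__(**kwargs)
            self.src_table = src_table
            self.dest_path = dest_path

        def execute(self, context):
            pass  # the actual transfer would happen here

        # optional interface the lineage system could call, even at run time
        def inlet(self):
            return [Table(name=self.src_table)]

        def outlet(self):
            return [File(url=self.dest_path)]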

On Wed, Jan 22, 2020 at 8:33 AM Bolke de Bruin <[email protected]> wrote:

> Dear All,
>
> Over the last few weeks I have made serious improvements to the lineage
> support in Airflow. Whilst not complete, it is starting to shape up and I
> think it is good to share some thoughts and directions. Much has been
> discussed with several organisations, such as Polidea, Daily Motion and
> Lyft. Some have already implemented some lineage support themselves
> (Daily Motion) and some have a need for it (Lyft with Amundsen).
>
> First a bit of a recap. What is lineage and why is it important? Lineage
> allows you to track the origins of data, what happens to it, and where it
> moves over time. Lineage is often associated with auditability of data
> pipelines, which is not a very sexy subject ;-). However, much more
> prominent and user-facing improvements are possible if you have lineage
> data available. Lineage greatly simplifies tracing errors in analytics
> back to their root cause. So, instead of a user calling up the engineering
> team in case of a data error, they could trace back to the origin of the
> data and contact the person who created the original data set. Lineage
> also greatly improves discoverability of data, and lineage information
> gives insight into the importance of data sets. A new employee joining a
> team would normally go to the most senior person on that team to ask which
> data sources they use and what they mean. If lineage information is
> exposed through a tool like Amundsen this is not required, because the new
> employee can just look it up.
>
> To summarise, there are 3 use cases driving the need for lineage:
>
> 1. Discoverability of data
> 2. Improved data operations
> 3. Auditability of data pipelines
>
> So that's all great I hear you thinking, but why don't we have it in
> Airflow already if it is so important? The answer to that is twofold.
> Firstly, adding lineage information is often associated with a lot of
> metadata and metaprogramming. Typically, if lineage is being 'slapped on',
> one needs to add a lot of metadata which then needs to be kept in sync. In
> that way it does not solve a problem for the developer; rather, it creates
> one. Secondly, Airflow is a task-based system and by definition does not
> have very good infrastructure for dealing with data. In the past we had
> some trials by Jeremiah to add Pipelines, but it was never integrated and
> I think it actually sparked him to start Prefect ;-) (correct me if I am
> wrong if you are reading this, Jeremiah).
>
> Where is lineage support now in Airflow? In the 1.10.X series there is
> some support for lineage, but it is buggy and difficult to use as it is
> based on the metadata model of Apache Atlas. In master the foundation has
> much improved (but is not fully done yet). You can now set inlets and
> outlets with lightweight objects like File(url="http://www.google.com")
> and Table(name="my_table") and the lineage system in Airflow will figure
> out a lot for you. You can also have inlets pick up outlets from previous
> upstream tasks by passing a list of task_ids, or even by using "AUTO",
> which picks up the outlets of direct upstream tasks.
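>
> To illustrate, a rough sketch of how this looks on master. The import
> paths and the DAG boilerplate are my assumptions from reading the current
> code and may differ in released versions:
>
>     from datetime import datetime
>
>     from airflow import DAG
>     from airflow.lineage import AUTO
>     from airflow.lineage.entities import File, Table
>     from airflow.operators.bash_operator import BashOperator
>
>     with DAG("lineage_example",
>              start_date=datetime(2020, 1, 1),
>              schedule_interval=None) as dag:
>         extract = BashOperator(
>             task_id="extract",
>             bash_command="echo extract",
>             inlets=[Table(name="my_table")],
>             outlets=[File(url="/tmp/extract.csv")],
>         )
>         load = BashOperator(
>             task_id="load",
>             bash_command="echo load",
>             inlets=AUTO,  # pick up the outlets of direct upstream tasks
>         )
>         extract >> load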
>
> The lightweight objects are automatically templated, so you can do
> something like File(url="/tmp/my_data_{{ execution_date }}") and it does
> the right thing for you. Templating inlets and outlets gives very powerful
> capabilities: for example, you can create a task that, based on the inlets
> it receives, drops PII information from an arbitrary table and outputs the
> table somewhere else. This allows creating generic tasks/DAGs that can be
> re-used without any domain knowledge. A small example (not PII) is
> available with the example_papermill_operator.
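>
> To make that concrete, a rough (untested) sketch of such a generic task.
> The DropPIIOperator is made up, and I am assuming the resolved, templated
> inlets are available on self.inlets by execution time:
>
>     from airflow.lineage import AUTO
>     from airflow.lineage.entities import File
>     from airflow.models import BaseOperator
>     from airflow.operators.bash_operator import BashOperator
>
>     class DropPIIOperator(BaseOperator):
>         """Generic task: works on whatever inlets it receives."""
>
>         def execute(self, context):
>             # assumption: resolved inlets are on self.inlets at this point
>             for inlet in self.inlets:
>                 self.log.info("Dropping PII from %s", inlet.url)
>                 # ... read the file, drop PII columns, write it elsewhere
>
>     producer = BashOperator(
>         task_id="producer",
>         bash_command="echo produce",
>         outlets=[File(url="/tmp/my_data_{{ execution_date }}")],
>     )
>     drop_pii = DropPIIOperator(task_id="drop_pii", inlets=AUTO)
>     producer >> drop_pii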
>
> Lineage information is exposed through an API endpoint. You can query
> "/api/experimental/lineage/<dag_id>/<execution_date>" and you will get a
> list of tasks with their inlets and outlets defined. The lineage
> information shared through the API and the lightweight object model are
> very close to the model used within Lyft's Amundsen, so once Amundsen gets
> proper visualisation support for lineage and pulls in the information from
> Airflow, presto! Other systems might require some translation, but that
> shouldn't be too hard.
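>
> For example (the host, port and date format here are just assumptions for
> the sake of illustration):
>
>     import requests
>
>     resp = requests.get(
>         "http://localhost:8080/api/experimental/lineage/"
>         "lineage_example/2020-01-22T00:00:00+00:00"
>     )
>     # a list of tasks with the inlets and outlets defined on each
>     print(resp.json())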
>
> What doesn't it do? Well, and here we get to the point of this discussion:
> there is still metaprogramming involved to keep the normal parameters and
> the inlets and outlets of an operator in sync. This is because it's hard
> to make operators lineage aware without changing them. So while you set
> "inlets" and "outlets" on an operator, the operator itself doesn't do
> anything with them, making them a lot less powerful. Actually, the only
> operator that has out-of-the-box support for lineage is the
> PapermillOperator.
>
> In discussions with the aforementioned organisations it became clear that,
> while we could change all operators that Airflow comes out of the box with,
> this will not help with the many custom operators that are around. They
> will simply not get updated as part of this exercise, leaving them as
> technical debt. Thus we need an approach that works with the past and
> improves the future. The generic pattern for Airflow operators is pretty
> simple: you can read many (yes we know there are exceptions!) as
> SourceToTarget(src_conn_id, src_xxx, src_xx, target_conn_id, target_xxx,
> some_other_kwarg). Hence, we came up with the following:
>
> For existing non-lineage-aware operators:
>
> 1. Use wrapper objects to group parameters together as inlet or as outlet
> (a rough sketch follows after this list). For example, usage for the
> MysqlToHiveTransfer could look like
> MysqlToHiveTransfer(Inlet(mysql_conn_id='mysql_conn', sql='select * from
> table'), Outlet(hive_cli_conn_id='hive_conn', hive_table='my_hive_table')).
> The wrapper objects would then set the right kwargs on the operator and
> create the lineage information. This resolves the issue of keeping
> parameters in sync.
> 2. Use the builder pattern to tell the lineage system which arguments to
> the operator are for the Inlet and for the Outlet, maybe with a type hint
> if required. E.g.
> MysqlToHiveTransfer(mysql_conn_id='conn_id', sql='select * from table',
> hive_cli_conn_id='hive_conn',
> hive_table='hive_table').inlet('mysql_conn_id', {'sql':
> 'mysql'}).outlet('hive_cli_conn_id', 'hive_table')
> This requires a bit more work from the developer as the parameter names
> need to be kept in sync. However, they are slow moving.
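>
> A purely illustrative sketch of the wrapper idea from option 1. Neither
> Inlet, Outlet nor build_with_lineage exists in Airflow today, and whether
> the operator would accept the recorded values as inlets/outlets directly
> is glossed over here:
>
>     class Inlet:
>         """Groups the operator kwargs that describe the data source."""
>         def __init__(self, **kwargs):
>             self.kwargs = kwargs
>
>     class Outlet:
>         """Groups the operator kwargs that describe the data destination."""
>         def __init__(self, **kwargs):
>             self.kwargs = kwargs
>
>     def build_with_lineage(operator_cls, *wrappers, **kwargs):
>         """Flatten Inlet/Outlet wrappers into the normal operator kwargs
>         and also hand them to the operator as lineage information."""
>         inlets, outlets = [], []
>         for wrapper in wrappers:
>             bucket = inlets if isinstance(wrapper, Inlet) else outlets
>             bucket.append(wrapper.kwargs)
>             kwargs.update(wrapper.kwargs)
>         return operator_cls(inlets=inlets, outlets=outlets, **kwargs)
>
>     # usage, with the transfer operator named as in the examples above
>     # (its import is omitted here):
>     transfer = build_with_lineage(
>         MysqlToHiveTransfer,
>         Inlet(mysql_conn_id="mysql_conn", sql="select * from table"),
>         Outlet(hive_cli_conn_id="hive_conn", hive_table="my_hive_table"),
>         task_id="mysql_to_hive",
>     )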
>
> For future lineage-aware operators:
>
> 1. Update the operator to set and support inlets and outlets itself, e.g.
> like the current PapermillOperator.
> 2. Have a dictionary inside the operator which tells the lineage system
> which fields are used for inlet and outlet (see the sketch after this
> list). This is the integrated version of pattern 2 for non-lineage-aware
> operators:
>     # dictionary of parameter name to type
>     inlet_fields = {'mysql_conn_id': 'mysql_connection', 'sql': 'sql'}
>     outlet_fields = {'hive_cli_conn_id': 'hive_connection',
>                      'hive_table': 'table'}
> Updates to the operator need to be checked to ensure the field names are
> kept in sync.
> 3. Enforce a naming pattern for operators, so that
>     MysqlToHiveTransfer(…) becomes
> MysqlToHive(mysql_conn_id, mysql_sql, hive_conn_id, hive_table) or
>     MysqlToHive(src_conn_id, src_sql, target_conn_id, target_table)
> This would allow the lineage system to figure out what is an inlet and what
> is an outlet based on the naming scheme. It would require a pylint plugin
> to make sure operators behave correctly, but it would also make operators
> much more predictable.
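>
> A minimal sketch of how the lineage layer might consume the
> inlet_fields/outlet_fields declarations from option 2. The helper function
> is hypothetical and simply looks up the declared attribute names on the
> operator instance:
>
>     def collect_lineage_fields(operator):
>         """Map each declared field name to its value on the operator."""
>         inlets = {name: getattr(operator, name)
>                   for name in getattr(operator, "inlet_fields", {})}
>         outlets = {name: getattr(operator, name)
>                    for name in getattr(operator, "outlet_fields", {})}
>         return inlets, outlets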
>
> Option number 3 for the future has the most impact. Out of the box the
> lineage system in Airflow can support (and it is my intention to do so) all
> of the above patterns, but ideally we improve the state of things so that
> we can deprecate what we do for non-lineage-aware operators in the future:
> the wrapper objects and the builder pattern would then no longer be
> necessary.
>
> What do you think? What are your thoughts on lineage, and what kinds of
> usage do you foresee? How would you like to use it and have it supported
> in Airflow? Would you be able to work with the above ways of doing it?
> Pros and cons?
>
> Thanks
> Bolke
>
