>
> File(url=“http://www.google.com”) | task1 | task2 > File(url=“
> file:///tmp/out”)
>

I love the UNIXY pipeline approach - a lot more than my operator
overloading proposal :).
And I think combining it with the builder pattern for more complex
cases works
very nicely (those complex cases will be rare I think). The | pattern will
only really
work "after" the tasks are defined though, not "when" they are defined so
they are
closer to dependencies rather than builder but I think it's good to have
it.

At the end I think my proposal with "set_inlet" rather than "inlet()" is
really a question of an emphasis - do we see it as something as we "build"
operator
with (hence builder pattern) or something that we "set" afterwards.
Implementation -
wise I think they are pretty much the same :). And I am fine to put the
emphasis on
"building"/"builderPattern" and name the methods "inlet()" "outlet()". It's
at the end
the matter of how users will use it - and what will be the most popular way
of defining
inlets/outlets. I think we agree in this case that "builder" + unixy
pipeline is something
we both can agree with :).

really do not want to have the mapping needed to be kept in sync by the DAG
> developer. It is okay for non lineage aware operators, but for lineage
> aware operators this shouldn’t be there - it should be inside the operator.
>

Agree. If we have lineage-aware operators we can add lineage as first-class
init option.
I do not think it should be a default approach (like "always add lineage to
all operators
when you update it") but it should be an option to build your own custom
lineage-aware
operator if you do not want to use builder pattern. Some "standard"
operators might
come with lineage built-in when they are pretty much "requiring" or
"strongly encouraging"
having lineage - for example where you have services that deal with
privacy.

I am still however not 100% convinced about the implicit kwargs passing
from Inlet/Outlet classes.
It sounds too implicit and too hacky for me - I have a gut feeling that
there will be cases where
it will break initialisation code in some cases (and for sure it breaks IDE
autocompletion "cleannes").
On the positive side I see how it saves on not having to define your own
custom lineage-aware
variants of the operators when they are not lineage-aware. This is quite a
value on its own - I fully agree.

But on the other hand, it feels like a bit of duct-tape solution :). Not
that I am all against duct-tape and
zip-ties - sometimes they are the best engineering solution you can come up
with and that's OK.

This opinion is not super-strong, I'd love to know  what others think about
it. If it's ok for others
and it's only me who has this concern, I am quite OK with having all three
options available:

   - custom Inlet/Outlet args with kwargs passing
   - inlet()/outlet() builder pattern
   - UNIXY | operator.

J.


>
>
>
> On 24 January 2020 at 14:35:00, Jarek Potiuk ([email protected])
> wrote:
>
> After discussing with Bolke (we had indeed very good and constructive
> discussion at Polidea office this week), I am a great supporter of adding
> more lineage support to Airflow and I think we should all as community
> think about how to make it as easy as possible to use and maintain. If it
> is not yet, it will soon become a requirement for many businesses to record
> the lineage data for many reasons (GDPR/ the California Consumer Privacy
> are the first ones that come to mind but there are many enterprise-internal
> cases that are also super-important. And adding lineage data might open-up
> Airflow deployments to a lot of new usages. So I am all for it and happy to
> support / take part in the effort to implement it.
>
>
> *Some context/thoughts:*
>
> I thought a bit about it after our discussions with Bolke, and from the
> implementation point of view I think of lineage mainly as "additional
> meta-data" - IMHO it should not interfere too much with the ways current
> DAGs are implemented. Lineage should be easy to use but it also should be
> optional, and I think it's a bad idea to make the users choose the way they
> are going to write the Dags depending whether they want to add lineage or
> not. I think it should be super easy to add lineage information to existing
> Dags without heavily impacting the way how the DAGs are implemented in the
> code. This should be an "extra" information added to existing dags.
> Otherwise we try to mix two different things - task dependency and data
> dependency. Putting them together is I think very difficult, but putting
> them as "separate layers" is quite doable. Also I think we already have in
> total many millions of DAGs written by different companies and the easier
> we make it to add Lineage information to those DAGs - the better IMHO. Also
> I think changing totally concept of writing DAGs and dependencies is a
> difficult one to pull-off - especially if we would like people to maintain
> DAGs written before/after lineage in the same place. I think transition
> should be simple and incremental rather than revolutionary
>
> I think most of the options we have differ by "syntactic-sugar" - under the
> hood they are all trying to achieve the same. And the syntactic-sugar is
> really what is most important for DAG developers, and Operator developers.
> So I don't even discuss the need and scope of API/etc - this is secondary
> and I think we can agree that after we all agree what syntax we use to add
> lineage information.
>
> *Comments on proposals:*
>
> Let me comment on those proposals (in this context above):
>
> 1) Bolke's Wrappers - point 1
>
> It does not impact the way how operators are written (and does not require
> to rewrite hundreds of operators). That's good. And it allows to
> incrementally change existing DAGs by replacing parameters of some
> operators to use Inlet/Outlet - it's good as well. It does not differ that
> much from existing dags - that's great. One thing that worries me in this
> case that in case you write Dags with/without lineage or have some
> operators with/without lineage in the same DAG, you get a different syntax
> (potentially for the same operator classes). I am not 100% sure if this is
> a realistic case to mix lineage/non-lineage tasks in the same DAG (I think
> it might happen) but there is a bit of "syntactic smell" (similar to
> code-smell) where we seem to have some inconsistent approach on how to
> define DAGs. It's not a deal-breaker, just a smell. It will be quite a work
> to convert between one and the other and when you want to copy&paste
> portions of DAGs with/without lineage it will not work.
>
> 2) Builder pattern: It has similar characteristics as 1) but it IMHO it is
> much nicer for the difference between lineage/non-lineage case. Main
> parameters remain the same, autocompletion when writing the dags will
> continue to work here (for operators) for example. and You can
> copy&paste easily between dags with/without lineage with just
> deleting/adding the builder methods. Indeed the parameter name have to be
> mapped, but as Bolke mentioned, they are slowly changing (if at all) and I
> think it's not a big problem overall - especially that we can very easily
> verify that at setting inlet/outlet - if parameter name is wrong, the whole
> DAG would fail so we have a safety net. I like it much better than 2).
>
> 3) I also see how we can extend 2. by utilising the pattern that we already
> use in Airflow - similar to what we do with dependencies. Currently task
> dependencies are independent from the definition of the operators and they
> are never defined at the place where operators are defined. And I think
> inlets/outlets should be similar. Similarly as in DAG dependencies, you
> could add the "lineage dependencies" right after where the operator is
> defined or "all dependencies in bulk". Depending on your dag style of
> currently defined dependencies - different people have different styles. I
> think we could add lineage information very similarly. It would be similar
> to the builder pattern but decoupled from task definition. And it would be
> closer to dependency definition rather than to task definition. It could
> also be supported by 'python operator overloading' - similarly as we do
> with << >> for set_downstream/set_upstream. Example:
>
> task1 = BashOperator() # For example
> task2 = MysqlToHiveTransfer(mysql_conn_id=‘conn_id’, sql=’select * from
> table’, hive_cli_conn_id=‘hive_conn’, hive_table=‘hive_table’)
>
> task1 >> task2
>
> task2.set_inlet(‘mysql_conn_id’,{’sql’:‘mysql’}).set_outlet(‘hive_cli_conn_id’,
>
> ‘hive_table’)
> or
> task2 <= (‘mysql_conn_id’,{’sql’:‘mysql’}) >= (‘hive_cli_conn_id’,
> ‘hive_table’)
>
> I think this is basically the same as builder pattern, but naming it
> builder pattern implies that it is done at "creation time". With this
> proposal (set_ method naming and operator overloading and separating it
> from task definition) it is more afterthought to task creation time -
> similarly as dependencies are. It has one advantage over the builder
> pattern - DAG definition with Lineage and without it look exactly the same
> and you can copy&paste them as they are. Lineage information is added extra
> and you can easily add more of the lineage
> information without touching the original DAG definition. You cam also
> choose your lineage definition style - either you couple it closer with
> task definition (if you add it right after the task) or you have separate
> "lineage" section where you have all the lineage dependencies for all the
> tasks in your DAG in the same place. I like this approach most, I think it
> serves the same purpose as 2) and 1) but is more flexible and more
> "incremental" in its nature.
>
> 4) Gerard's functional pattern - I think it's an interesting approach. But
> I think it's much deeper change in the approach on how we think about
> Airflow. I believe we should not mix it with lineage discussion. They might
> converge at some point, but @Gerard - I have a kind proposal / request -
> maybe you can open a separate thread with that at the devlist? It changes
> the whole approach for Dag writing to be more functional. With the lineage
> discussion, I think we talk about what can be implemented in 2.0 timeframe,
> where your change is much more futuristic and requires to change the
> paradigm shift for Airflow. I think the basic assumption for lineage is
> that we should be able to tap into both - existing operators and existing
> DAGs rather than rewrite them all.
>
> J.
>
>
>
> On Fri, Jan 24, 2020 at 12:10 AM Gerard Casas Saez
> <[email protected]> wrote:
>
> > Hi everyone!
> >
> > I think the whole data lineage proposal is great and I would like to
> > contribute a bit with my own thoughts on how to extend the Operators API
> > for better lineage support.
> >
> > Lately, I’ve been experimenting a bit on extending the Operator API to
> > make it more `functional` to specify Data dependencies and pipeline data
> > across the DAG. My approach is backwards compatible and it separates the
> > way you specify operator arguments with Inlets/Outlets dynamically
> > generated. I used XCom as a simplification to pass around dynamic values.
> >
> > My proposal is to include a __call__ function that would dynamically
> > replace class attributes before executing the `pre_execute` and `execute`
> > function. This tied with a XComArg, a class that points to a previous
> task
> > XCom pushed value, allowed me to define DAGs in a more functional
> > approach. Basically my proposal is:
> >
> >
> > • Add a __call__ function in BaseOperator that accepts Inlets (in my case
> > its XComArgs)
> > • Log their values on execution time (which would allow to expose a REST
> > API like proposed before)
> > • Resolves them before executing the main `execute` function
> > • Set attribute in the operator class
> > • Executes the operator and returns an XComArgs that can later be tied in
> > a new operator as an Inlet…
> >
> >
> > Here’s what it would look like (ML example, sorry):
> >
> > with DAG(...) as dag:
> > load = LoadDatasetOperator(task_id='load_dataset', )
> > split = SplitTrainTestOperator(task_id='split', test_perc=0.3)
> > train = TrainTensorflowModelOperator(task_id='train')
> > validate = PrecisionRecallOperator(task_id='pr')
> > report = EmailOperator(task_id='send_pr_report', subject='New model
> > trained results', email='[email protected]’)
> >
> > dataset = load(path='hdfs://some/dataset')
> > splitted_ds = split(dataset=dataset)
> > model = train(dataset=splitted_ds['train'],
> > model_specification='hdfs://some/dataset')
> > metrics = validate(model=model, dataset=splitted_ds['test'])
> > report(html_content=metrics)
> >
> > As someone wise sometime said, code is better than words, so here’s my
> > experimental code: https://github.com/casassg/corrent (ignore the awful
> > name and the injection part).
> >
> > Gerard Casas Saez
> > Twitter | Cortex | @casassaez
> > On Jan 22, 2020, 8:40 PM -0700, Tao Feng , wrote:
> > > Thanks Bolke. For those that are not aware, my team is working with
> > Bolke's
> > > team on Amundsen which is a data discovery and metadata project(
> > > https://github.com/lyft/amundsen) . I think although it ships with
> Atlas
> > > client(or it used to be), the new API per my understanding is generic
> > > enough that doesn't tight with atlas. E.g we(Lyft) could build a neo4j
> /
> > > Amundsen client in our Airflow fork to ingest the lineage info in a
> push
> > > fashion to build the lineage.
> > >
> > > Amundsen itself has put up the effort to integrate Airflow with the
> > > tool(connect which DAG/task produces the data set etc). With this
> > change, I
> > > foresee it will help to provide more enriched metadata.
> > >
> > > Thanks,
> > > -Tao
> > >
> > > On Wed, Jan 22, 2020 at 8:46 AM Dan Davydov
> <[email protected]
> > >
> > > wrote:
> > >
> > > > Just want to preface my reply with the fact that I haven't thought
> > about
> > > > data lineage very much.
> > > >
> > > > This is an awesome idea :)! I like something like 1) personally, e.g.
> > > > operators could optionally define a .outlet() and .inlet() interface
> > which
> > > > would return the inlets and outlets of a given task, and then it's up
> > to
> > > > the operator how it wants to set these inlets/outlets like the
> > Papermill
> > > > operator currently does. This also keeps allows inlets/outlets more
> > dynamic
> > > > (e.g. in the case of an operator that might generate inlets/outlets
> > > > dynamically at execution time). Seems the most extensible/least
> > coupling.
> > > > IMO we should strive to make DAGs easy to create with little
> > boilerplate,
> > > > but this is a lot less important for operators since they are a lot
> > more
> > > > stable and change less frequently, so it's fine to require operators
> to
> > > > implement some interface manually.
> > > >
> > > > On Wed, Jan 22, 2020 at 8:33 AM Bolke de Bruin <[email protected]>
> > wrote:
> > > >
> > > > > Dear All,
> > > > >
> > > > > Over last few weeks I made serious improvements to the lineage
> > support
> > > > that
> > > > > Airflow has. Whilst not complete it’s starting to shape up and I
> > think it
> > > > > is good to share some thoughts and directions. Much has been
> > discussed
> > > > with
> > > > > several organisations like Polidea, Daily Motion and Lyft. Some
> have
> > > > > already implemented some support for lineage themselves (Daily
> > Motion)
> > > > and
> > > > > some have a need for it (Lyft with Amundsen).
> > > > >
> > > > > First a bit of a recap. What is lineage of why is it important?
> > Lineage
> > > > > allows you to track the origins of data what happens to it and
> where
> > it
> > > > > moves over time. Lineage is often associated with audibility of
> data
> > > > > pipelines which is not a very sexy subject ;-). However, there are
> > much
> > > > > more prominent and user facing improvements possible if you have
> > lineage
> > > > > data available. Lineage greatly simplifies the ability to trace
> back
> > > > errors
> > > > > to the root cause in analytics. So, instead of the user calling up
> > the
> > > > > engineering team in case of a data error, it could traceback to the
> > > > origin
> > > > > of the data and call the one that has created the original data
> set.
> > > > > Lineage also greatly improves discoverability of data. Lineage
> > > > information
> > > > > gives insights into the importance of data sets. So if a new
> employee
> > > > joins
> > > > > a team he would normally go to the most senior person in that team
> > to ask
> > > > > him what data sources he is using and what their meaning is. If
> > lineage
> > > > > information is exposed through a tool like Amundsen this is not
> > required
> > > > > because that person can just look it up.
> > > > >
> > > > > To summarise their are 3 use cases driving the need for lineage:
> > > > >
> > > > > 1. Discoverability of data
> > > > > 2. Improved data operations
> > > > > 3. Audibility of data pipelines
> > > > >
> > > > > So that’s all great I hear you thinking, but why don’t we have it
> in
> > > > > Airflow already if it is so important? The answer to that is two
> > fold.
> > > > > Firstly, adding lineage information is often associated with a lot
> of
> > > > > metadata and meta programming. Typically if lineage is being
> > ’slapped on’
> > > > > one needs to add a lot of metadata which then need to be kept in
> > sync. In
> > > > > that way it does not solve a problem for the developer and rather
> it
> > > > > creates one. Secondly, Airflow is a task based system and by
> > definition
> > > > > does not have a very good infrastructure that deals with data. In
> the
> > > > past
> > > > > we had some trials by Jeremiah to add Pipelines, but it never was
> > > > > integrated and I think it actually sparked him to start Prefect ;-)
> > > > > (correct me if I am wrong if you are reading this Jermiah).
> > > > >
> > > > > Where is lineage support now in Airflow? In the 1.10.X series there
> > is
> > > > some
> > > > > support for lineage, but it is buggy and difficult to use as it is
> > based
> > > > on
> > > > > the metadata model of Apache Atlas. In master the foundation has
> much
> > > > > improved (but fully done yet). You can now set inlets and outlets
> > with
> > > > > lightweight objects like File(url=“http://www.google.com”) and
> > > > > Table(name=“my_table”) and the lineage system in Airflow will
> figure
> > out
> > > > a
> > > > > lot for you. You can also have inlets pick up outlets from previous
> > > > > upstream tasks by passing a list of task_ids or even using “AUTO”
> > which
> > > > > picks up outlets from direct upstream tasks.
> > > > >
> > > > > The lightweight objects are automatically templated so you can do
> > > > something
> > > > > like File(url=“/tmp/my_data_{{ execution_date }}”) which does the
> > right
> > > > > thing for you. Templating inlets and outlets gives very powerful
> > > > > capabilities by for example creating a Task, that, based on the
> > inlets it
> > > > > receives, can drop PII information from an arbitrary table and
> output
> > > > this
> > > > > table somewhere else. This allows for creating Generic Tasks/Dags
> > that
> > > > can
> > > > > be re-used without any domain knowledge. A small example (not PII)
> is
> > > > > available with the example_papermill_operator.
> > > > >
> > > > > Lineage information is exposed through an API endpoint. You can
> query
> > > > > “/api/experimental/lineage/<dag_id>/<execution_date>” and you will
> > get a
> > > > > list of tasks with their inlets and outlets defined. The lineage
> > > > > information shared through the API and the lightweight object model
> > are
> > > > > very close to the model used within Lyft’s Amundsen so when that
> gets
> > > > > proper visualisation support for lineage and pulls in the
> information
> > > > from
> > > > > Airflow it’s presto! Other systems might require some translation
> but
> > > > that
> > > > > shouldn’t be too hard.
> > > > >
> > > > > What doesn’t it do? Well, and here we get to the point of this
> > > > discussion,
> > > > > there is still meta programming involved to keep the normal
> > parameters
> > > > and
> > > > > the inlets and outlets to an operator in sync. This is because it’s
> > hard
> > > > to
> > > > > make operators lineage aware without changing them. So while you
> set
> > > > > “inlets” and “outlets” to an Operator the operator itself doesn’t
> do
> > > > > anything with them, making them a lot less powerful. Actually,
> there
> > is
> > > > > only one operator that has out of the box support for lineage is
> the
> > > > > PapermillOperator.
> > > > >
> > > > > In discussions with the aforementioned organisations it became
> clear
> > > > that,
> > > > > while we could change all operators that Airflow comes out of the
> box
> > > > with,
> > > > > this will not help with the many custom operators that are around.
> > They
> > > > > will simply not get updated as part of this exercise, leaving them
> as
> > > > > technical debt. Thus we need an approach that works with the past
> and
> > > > > improves the future. The generic pattern for Airflow operators is
> > pretty
> > > > > simple: you can read many (yes we know there are exceptions!) as
> > > > > SourceToTarget(src_conn_id, src_xxx, src_xx, target_conn_id,
> > target_xxx,
> > > > > some_other_kwarg). Hence, we came up with the following:
> > > > >
> > > > > For existing non lineage aware operators:
> > > > >
> > > > > 1. Use wrapper objects to group parameters together as inlet or as
> > > > outlet.
> > > > > For example usage for the MysqlToHiveTransfer could look like
> > > > > MysqlToHiveTransfer(Inlet(mysql_conn_id=‘mysql_conn’, sql=’select *
> > from
> > > > > table’), Outlet(hive_cli_conn_id=‘hive_conn’,
> > > > hive_table=‘my_hive_table’)).
> > > > > The wrapper objects would then set the right kwargs to the Operator
> > and
> > > > > create the lineage information. This resolves the issue of keeping
> > > > > parameters in sync.
> > > > > 2. Use the build pattern to tell the lineage system which arguments
> > to
> > > > the
> > > > > operator are for the Inlet and for the Outlet. Maybe with a type
> > hint if
> > > > > required. E.g.
> > > > > MysqlToHiveTransfer(mysql_conn_id=‘conn_id’, sql=’select * from
> > table’,
> > > > > hive_cli_conn_id=‘hive_conn’,
> > > > > hive_table=‘hive_table’).inlet(‘mysql_conn_id’,{’sql’:
> > > > > ‘mysql’}).outlet(‘hive_cli_conn_id’, ‘hive_table’)
> > > > > This requires a bit more work from the developer as the parameter
> > names
> > > > > need to be kept in sync. However, they are slow moving.
> > > > >
> > > > > Future lineage aware operators:
> > > > >
> > > > > 1. Update the Operator to set and support inlets and outlets
> itself.
> > E.g.
> > > > > like the current PapermillOperator
> > > > > 2. Have a dictionary inside the operator which tells the lineage
> > system
> > > > > what fields are used for inlet and outlet. This is the integrated
> > pattern
> > > > > of 2 for non lineage aware operators:
> > > > > # dictionary of parameter name with type
> > > > > inlet_fields = {‘mysql_conn_id’: ‘mysql_connection’, ’sql’: ’sql’}
> > > > > outlet_fields = {‘hive_conn_id’: ‘hive_connection’, ’hive_table’’:
> > > > > ’table’}
> > > > > Updates to the operator need to be checked to ensure the fields
> > names are
> > > > > kept in sync.
> > > > > 3. Enforce a naming pattern for Operators like
> > > > > MysqlToHiveTransfer(…) becomes
> > > > > MysqlToHive(mysql_conn_id, mysql_sql, hive_conn_id, hive_table) or
> > > > > MysqlToHive(src_conn_id, src_sql, target_conn_id, target_table)
> > > > > This would allow the lineage system to figure out what is inlet and
> > what
> > > > is
> > > > > outlet based on the naming scheme. It would require pylint plugin
> to
> > make
> > > > > sure Operators to behave correctly, but would also make operators
> > much
> > > > more
> > > > > predictable.
> > > > >
> > > > > Option number 3 for the future has the most impact. Out of the box
> > the
> > > > > lineage system in Airflow can support (and its my intention to do
> > so) all
> > > > > the above patterns, but ideally we do improve the state so that we
> > can
> > > > > deprecate what we do for non lineage aware operators in the future:
> > > > wrapper
> > > > > objects and the build pattern wouldn’t be necessary anymore.
> > > > >
> > > > > What do you think? What are your thoughts on lineage, what kind of
> > usages
> > > > > do you foresee? How would you like to be using it and have it
> > supported
> > > > in
> > > > > Airflow? Would you be able to work with the above ways of doing it?
> > Pros
> > > > > and cons?
> > > > >
> > > > > Thanks
> > > > > Bolke
> > > > >
> > > >
> >
>
>
> --
>
> Jarek Potiuk
> Polidea <https://www.polidea.com/> | Principal Software Engineer
>
> M: +48 660 796 129 <+48660796129>
> [image: Polidea] <https://www.polidea.com/>
>


-- 

Jarek Potiuk
Polidea <https://www.polidea.com/> | Principal Software Engineer

M: +48 660 796 129 <+48660796129>
[image: Polidea] <https://www.polidea.com/>

Reply via email to