Hi All,

I have made a first implementation that allows tracking of lineage in Airflow 
and integration with Apache Atlas. It was inspired by Jeremiah’s work in the 
past on Data Flow pipelines, but I think I kept it a little bit simpler. 

Operators now have two new parameters called “inlets” and “outlets”. These can 
be filled with objects derived from “DataSet”, like “File” and “HadoopFile”. 
The parameters are jinja2 templated, which means they receive the context of 
the task when it is running and get rendered. So you can get definitions like 
this:

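# Import paths assumed from the airflow.lineage package added in the PR.
from airflow.lineage.datasets import File
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator

# "dag" and FILE_CATEGORIES are assumed to be defined earlier in the DAG file.

f_final = File(name="/tmp/final")
run_this_last = DummyOperator(
    task_id='run_this_last', dag=dag,
    inlets={"auto": True},
    outlets={"datasets": [f_final]})

f_in = File(name="/tmp/whole_directory/")
outlets = []
for category in FILE_CATEGORIES:
    # Doubled braces survive str.format(), leaving "{{ execution_date }}" for jinja2.
    f_out = File(name="/tmp/{}/{{{{ execution_date }}}}".format(category))
    outlets.append(f_out)
run_this = BashOperator(
    task_id='run_after_loop', bash_command='echo 1', dag=dag,
    inlets={"auto": False, "task_ids": [], "datasets": [f_in]},
    outlets={"datasets": outlets})
run_this.set_downstream(run_this_last)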
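
A note on the quadruple braces in the name template of the loop above, since 
they are easy to misread: str.format() collapses each doubled brace into a 
single one, so what reaches the DataSet is an ordinary jinja2 expression. The 
snippet below is only a plain-Python sketch of that step; it does not touch 
Airflow, and FILE_CATEGORIES here is a made-up stand-in for whatever list the 
DAG really uses.

```python
# Plain-Python sketch of the brace escaping; no Airflow import is needed here.
# FILE_CATEGORIES is a hypothetical stand-in for the list used in the example.
FILE_CATEGORIES = ["CAT1", "CAT2"]

# "{}" is filled in by str.format(); "{{{{" and "}}}}" collapse to "{{" and "}}".
names = ["/tmp/{}/{{{{ execution_date }}}}".format(category)
         for category in FILE_CATEGORIES]

for name in names:
    print(name)
# prints:
# /tmp/CAT1/{{ execution_date }}
# /tmp/CAT2/{{ execution_date }}
```

The remaining "{{ execution_date }}" is what gets rendered against the task 
context at run time.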

So I am trying to keep the boilerplate work down for developers. Operators can 
also extend inlets and outlets automatically. This will probably be a bit 
harder for the BashOperator without some special magic, but an update to the 
DruidOperator should be relatively straightforward.

In the future Operators can take advantage of the inlet/outlet definitions as 
they are also made available as part of the context for templating (as “inlets” 
and “outlets”).

I’m looking forward to your comments!

https://github.com/apache/incubator-airflow/pull/3321

Bolke.
