Forgot to answer your question: for S3 it could look like this:

s3_file = File(name="s3a://bucket/key")
inlets = {"datasets": [s3_file,]}

Obviously, if you do something with the S3 file outside of Airflow, you need to 
track lineage yourself somehow. 
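
For a fuller picture, a task could then declare that file as lineage explicitly.
A minimal sketch, assuming an existing dag object as in the examples further down
the thread; the import paths, task names and file paths here are just for
illustration:

from airflow.lineage.datasets import File
from airflow.operators.bash_operator import BashOperator

s3_in = File(name="s3a://bucket/raw/{{ execution_date }}/input.csv")
s3_out = File(name="s3a://bucket/processed/{{ execution_date }}/output.csv")

process_s3 = BashOperator(
    task_id='process_s3', bash_command='echo 1', dag=dag,
    # declared by hand, since Airflow cannot see what the command
    # actually does with the files on S3
    inlets={"datasets": [s3_in,]},
    outlets={"datasets": [s3_out,]})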

B.

Sent from my iPhone

> On 6 May 2018, at 11:05, Bolke de Bruin <[email protected]> wrote:
> 
> Hi Gerardo,
> 
> Any lineage tracking system is dependent on how much data you can give it. So 
> if you do transfers outside of the 'view' such a system has, then the lineage 
> information is lost. Airflow can help in this area by tracking its internal 
> lineage and providing it to those lineage systems. 
> 
> Apache Atlas is agnostic and can receive lineage info via its REST API (used 
> in my implementation) or a Kafka topic. It also comes with a lot of connectors 
> out of the box that tie into the Hadoop ecosystem and make your life easier 
> there. The Airflow Atlas connector supplies Atlas with information that Atlas 
> doesn't know about yet, closing the loop further. 
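> 
> Enabling the Atlas backend is then mostly a matter of configuration, roughly 
> along these lines in airflow.cfg (a sketch only; the exact section and key 
> names may differ in the final PR):
> 
> [lineage]
> backend = airflow.lineage.backend.atlas
> 
> [atlas]
> username = my_username
> password = my_password
> host = atlas_host
> port = 21000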
> 
> You can also write your own connector, put it on the Airflow class path, and 
> use that one. 
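> 
> A custom connector is a small class. A rough sketch, assuming the backend 
> interface boils down to a single send_lineage hook (the base class and 
> signature here may differ from the PR):
> 
> from airflow.lineage.backend import LineageBackend
> 
> class MyLineageBackend(LineageBackend):
>     def send_lineage(self, operator=None, inlets=None, outlets=None,
>                      context=None):
>         # called with the rendered inlet/outlet datasets after the task
>         # has run; ship them to whatever system you like
>         for dataset in (outlets or []):
>             print("%s produced %s" % (operator.task_id, dataset.name))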
> 
> Bolke
> 
> Sent from my iPhone
> 
>> On 6 May 2018, at 09:13, Gerardo Curiel <[email protected]> wrote:
>> 
>> Hi Bolke,
>> 
>> Data lineage support sounds very interesting.
>> 
>> I'm not very familiar with Atlas, but at first sight it seems like a tool
>> specific to the Hadoop ecosystem. What would this look like if the files
>> (inlets or outlets) were stored on S3?
>> 
>> An example of a service that manages a similar use case is AWS Glue[1],
>> which creates a Hive metastore based on the schema and other metadata it
>> can get from different sources (amongst them, S3 files).
>> 
>> 
>>> On Sun, May 6, 2018 at 7:49 AM, Bolke de Bruin <[email protected]> wrote:
>>> 
>>> Hi All,
>>> 
>>> I have made a first implementation that allows tracking of lineage in
>>> Airflow and integration with Apache Atlas. It was inspired by Jeremiah’s
>>> work in the past on Data Flow pipelines, but I think I kept it a little bit
>>> simpler.
>>> 
>>> Operators now have two new parameters called “inlets” and “outlets”. These
>>> can be filled with objects derived from “DataSet”, like “File” and
>>> “HadoopFile”. Parameters are jinja2 templated, which
>>> means they receive the context of the task when it is running and get
>>> rendered. So you can get definitions like this:
>>> 
>>> f_final = File(name="/tmp/final")
>>> run_this_last = DummyOperator(task_id='run_this_last', dag=dag,
>>>   inlets={"auto": True},
>>>   outlets={"datasets": [f_final,]})
>>> 
>>> f_in = File(name="/tmp/whole_directory/")
>>> outlets = []
>>> for file in FILE_CATEGORIES:
>>>   f_out = File(name="/tmp/{}/{{{{ execution_date }}}}".format(file))
>>>   outlets.append(f_out)
>>> run_this = BashOperator(
>>>   task_id='run_after_loop', bash_command='echo 1', dag=dag,
>>>   inlets={"auto": False, "task_ids": [], "datasets": [f_in,]},
>>>   outlets={"datasets": outlets}
>>>   )
>>> run_this.set_downstream(run_this_last)
>>> 
>>> So I am trying to keep the boilerplate work down for developers. Operators
>>> can also extend inlets and outlets automatically. This will probably be a
>>> bit harder for the BashOperator without some special magic, but an update
>>> to the DruidOperator should be relatively straightforward.
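>>> 
>>> To illustrate the idea (just a sketch; the exact hook and attribute names
>>> for this are not settled and should be treated as placeholders), such an
>>> operator could declare its own lineage from what it already knows:
>>> 
>>> class MyDruidOperator(BaseOperator):
>>>     def pre_execute(self, context):
>>>         # the operator already knows which datasource it writes to, so it
>>>         # can declare the outlet itself instead of the DAG author
>>>         self.outlets.append(File(name="druid://{}".format(self.datasource)))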
>>> 
>>> In the future Operators can take advantage of the inlet/outlet definitions
>>> as they are also made available as part of the context for templating (as
>>> “inlets” and “outlets”).
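>>> 
>>> For example (a sketch; exactly how the datasets are exposed in the context
>>> is an assumption here), a task could reference its inlets in a templated
>>> field:
>>> 
>>> copy_this = BashOperator(
>>>     task_id='copy_inlet', dag=dag,
>>>     bash_command='echo copying {{ inlets[0].name }}',
>>>     inlets={"auto": True})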
>>> 
>>> I’m looking forward to your comments!
>>> 
>>> https://github.com/apache/incubator-airflow/pull/3321
>>> 
>>> Bolke.
>> 
>> 
>> 
>> [1] https://aws.amazon.com/glue/
>> 
>> Cheers,
>> 
>> -- 
>> Gerardo Curiel // https://gerar.do
