Forgot to answer your question: for S3 it could look like:

s3_file = File(name="s3a://bucket/key")
inlets = {"datasets": [s3_file,]}

Obviously, if you do something with the S3 file outside of Airflow, you need to
track the lineage yourself somehow.
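
To make that concrete, a minimal sketch of attaching such an S3 inlet to a
task (bucket, key and task_id are made up for illustration; the imports assume
the module layout from the PR):

from airflow.operators.bash_operator import BashOperator
from airflow.lineage.datasets import File

# Hypothetical S3 object tracked as a lineage dataset (path is illustrative).
s3_file = File(name="s3a://my-bucket/raw/events.json")

# Declaring it as an inlet tells the lineage backend that this task reads it.
process_events = BashOperator(
    task_id='process_events',
    bash_command='echo "processing events"',
    inlets={"datasets": [s3_file,]},
    dag=dag,  # assumes a DAG object defined elsewhere
)
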
B.
Sent from my iPhone
> On 6 May 2018, at 11:05, Bolke de Bruin <[email protected]> wrote:
>
> Hi Gerardo,
>
> Any lineage tracking system is dependent on how much data you can give it. So
> if you do transfers outside of the 'view' such a system has, then the lineage
> information is gone. Airflow can help in this area by tracking its internal
> lineage and providing that to those lineage systems.
>
> Apache Atlas is agnostic and can receive lineage info via a REST API (used in
> my implementation) or a Kafka topic. It also comes with a lot of connectors
> out of the box that tie into the Hadoop ecosystem and make your life easier
> there. The Airflow Atlas connector supplies Atlas with information that it
> doesn't know about yet, closing the loop further.
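>
> For illustration, pushing a dataset entity into Atlas over REST could look
> roughly like this (a sketch only; host, credentials, the type name and the
> attribute names are placeholders and depend on your Atlas version and type
> definitions):
>
> import requests
>
> # Rough sketch: register a filesystem-path entity via Atlas' v2 REST API.
> entity = {
>     "entity": {
>         "typeName": "fs_path",
>         "attributes": {
>             "qualifiedName": "s3a://bucket/key",
>             "name": "key",
>             "path": "s3a://bucket/key",
>         },
>     }
> }
> requests.post(
>     "http://atlas-host:21000/api/atlas/v2/entity",
>     json=entity,
>     auth=("admin", "admin"),
> )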
>
> Also you can write your own connector and put it on the Airflow class path
> and use that one.
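>
> A custom backend is essentially a class with a send_lineage hook that Airflow
> calls with the operator, its inlets/outlets and the task context. A bare-bones
> sketch (class and method names as in the PR, as far as I can tell; the body is
> just a placeholder):
>
> from airflow.lineage.backend import LineageBackend
>
> class MyLineageBackend(LineageBackend):
>     """Toy backend: just log whatever Airflow hands us."""
>
>     def send_lineage(self, operator=None, inlets=None, outlets=None,
>                      context=None):
>         # A real backend would forward this to your lineage system's API.
>         print("task:", operator.task_id)
>         print("inlets:", inlets)
>         print("outlets:", outlets)
>
> You would then point Airflow at it via the backend setting in the [lineage]
> section of airflow.cfg, as is done for the Atlas backend.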
>
> Bolke
>
> Sent from my iPhone
>
>> On 6 May 2018, at 09:13, Gerardo Curiel <[email protected]> wrote:
>>
>> Hi Bolke,
>>
>> Data lineage support sounds very interesting.
>>
>> I'm not very familiar with Atlas, but at first sight it seems like a tool
>> specific to the Hadoop ecosystem. What would this look like if the files
>> (inlets or outlets) were stored on S3?
>>
>> An example of a service that manages a similar use case is AWS Glue[1],
>> which creates a Hive metastore based on the schema and other metadata it
>> can get from different sources (amongst them, S3 files).
>>
>>
>>> On Sun, May 6, 2018 at 7:49 AM, Bolke de Bruin <[email protected]> wrote:
>>>
>>> Hi All,
>>>
>>> I have made a first implementation that allows tracking of lineage in
>>> Airflow and integration with Apache Atlas. It was inspired by Jeremiah’s
>>> work in the past on Data Flow pipelines, but I think I kept it a little bit
>>> simpler.
>>>
>>> Operators now have two new parameters called “inlets” and “outlets”. These
>>> can be filled with objects derived from “DataSet”, like “File” and
>>> “HadoopFile”. Parameters are jinja2 templated, which
>>> means they receive the context of the task when it is running and get
>>> rendered. So you can get definitions like this:
>>>
>>> f_final = File(name="/tmp/final")
>>> run_this_last = DummyOperator(task_id='run_this_last', dag=dag,
>>>                               inlets={"auto": True},
>>>                               outlets={"datasets": [f_final,]})
>>>
>>> f_in = File(name="/tmp/whole_directory/")
>>> outlets = []
>>> for file in FILE_CATEGORIES:
>>>     f_out = File(name="/tmp/{}/{{{{ execution_date }}}}".format(file))
>>>     outlets.append(f_out)
>>>
>>> run_this = BashOperator(
>>>     task_id='run_after_loop', bash_command='echo 1', dag=dag,
>>>     inlets={"auto": False, "task_ids": [], "datasets": [f_in,]},
>>>     outlets={"datasets": outlets}
>>> )
>>> run_this.set_downstream(run_this_last)
>>>
>>> So I am trying to keep the boilerplate work down for developers. Operators
>>> can also extend inlets and outlets automatically. This will probably be a
>>> bit harder for the BashOperator without some special magic, but an update
>>> to the DruidOperator can be relatively straightforward.
>>>
>>> In the future Operators can take advantage of the inlet/outlet definitions
>>> as they are also made available as part of the context for templating (as
>>> “inlets” and “outlets”).
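>>>
>>> For example, since they end up in the jinja2 context, a task could reference
>>> them in a templated field (purely illustrative):
>>>
>>> show_inlets = BashOperator(
>>>     task_id='show_inlets',
>>>     bash_command='echo "my inlets: {{ inlets }}"',
>>>     inlets={"auto": True},
>>>     dag=dag,
>>> )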
>>>
>>> I’m looking forward to your comments!
>>>
>>> https://github.com/apache/incubator-airflow/pull/3321
>>>
>>> Bolke.
>>
>>
>>
>> [1] https://aws.amazon.com/glue/
>>
>> Cheers,
>>
>> --
>> Gerardo Curiel // https://gerar.do