Jeremiah Lowin created AIRFLOW-825:
--------------------------------------
Summary: Add Dataflow semantics
Key: AIRFLOW-825
URL: https://issues.apache.org/jira/browse/AIRFLOW-825
Project: Apache Airflow
Issue Type: Improvement
Components: Dataflow
Reporter: Jeremiah Lowin
Assignee: Jeremiah Lowin
Following discussion on the dev list, this adds first-class Dataflow semantics
to Airflow.
Please see my PR for examples and unit tests. From the documentation:
A Dataflow object represents the result of an upstream task. If the upstream
task has multiple outputs contained in a tuple, dict, or other indexable form,
an index may be provided so the Dataflow only uses the appropriate output.
Dataflows are passed to downstream tasks with a key. This has two effects:
1. It sets up a dependency between the upstream and downstream tasks to
ensure that the downstream task does not run before the upstream result
is available.
2. It ensures that the [indexed] upstream result is available in the
downstream task's context as ``context['dataflows'][key]``. In addition,
the result will be passed directly to PythonOperators as a keyword
argument.
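
To illustrate the two effects above, here is a minimal, self-contained sketch. It does not use the API from the PR: the Dataflow class, its resolve() method, the build_context() helper, and the in-memory results dict (standing in for stored upstream results) are all hypothetical names chosen for this example.

```python
# Hypothetical sketch only; none of these names come from the PR.

class Dataflow:
    """Refers to an upstream task's result, optionally narrowed by an index."""

    def __init__(self, task_id, index=None):
        self.task_id = task_id
        self.index = index

    def resolve(self, results):
        """Look up the upstream result and apply the optional index."""
        result = results[self.task_id]
        if self.index is not None:
            result = result[self.index]
        return result


def build_context(dataflows, results):
    """Expose each resolved result as context['dataflows'][key]."""
    resolved = {key: flow.resolve(results) for key, flow in dataflows.items()}
    return {"dataflows": resolved}


# Stand-in for upstream results, keyed by task id.
results = {
    "extract": {"rows": [1, 2, 3], "count": 3},  # dict output
    "config": ("prod", 10),                      # tuple output
}

# Each Dataflow is attached to the downstream task under a key.
dataflows = {
    "rows": Dataflow("extract", index="rows"),
    "batch_size": Dataflow("config", index=1),
}

# Effect 1: the keys imply which upstream tasks must finish first.
upstream_task_ids = {flow.task_id for flow in dataflows.values()}

# Effect 2: results appear in the context, and a Python callable
# receives them as keyword arguments.
context = build_context(dataflows, results)


def summarize(rows, batch_size):
    return sum(rows) * batch_size


total = summarize(**context["dataflows"])
```

In this sketch the index is applied when the result is resolved; where the real implementation applies it is not stated here.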
Dataflows use the XCom mechanism to exchange data. Data is passed through the
following series of steps:
1. After the upstream task runs, data is passed to the Dataflow object's
_set_data() method.
2. The Dataflow's serialize() method is called on the data. This method
takes the data object and returns a representation that can be used to
reconstruct it later.
3. _set_data() stores the serialized result as an XCom.
4. Before the downstream task runs, it calls the Dataflow's _get_data()
method.
5. _get_data() retrieves the upstream XCom.
6. The Dataflow's deserialize() method is called. This method takes the
serialized representation and returns the data object.
7. The data object is passed to the downstream task.
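
The seven steps above can be sketched roughly as follows. This is an illustration under stated assumptions, not the code from the PR: the XCOM_STORE dict stands in for Airflow's XCom storage, the constructor signature is invented, and JsonDataflow is a hypothetical subclass added only to make the serialize/deserialize round trip visible. The method names _set_data(), _get_data(), serialize(), and deserialize() are taken from the description.

```python
import json

# Assumption: a plain dict stands in for Airflow's XCom storage.
XCOM_STORE = {}


class Dataflow:
    """Basic Dataflow with identity serialize/deserialize."""

    def __init__(self, upstream_task_id):
        self.upstream_task_id = upstream_task_id

    def serialize(self, data):
        return data

    def deserialize(self, data):
        return data

    def _set_data(self, data):
        # Steps 1-3: receive the data, serialize it, store it as an XCom.
        XCOM_STORE[self.upstream_task_id] = self.serialize(data)

    def _get_data(self):
        # Steps 4-6: fetch the upstream XCom and deserialize it.
        stored = XCOM_STORE[self.upstream_task_id]
        return self.deserialize(stored)


class JsonDataflow(Dataflow):
    """Hypothetical subclass that stores a JSON string instead of the object."""

    def serialize(self, data):
        return json.dumps(data)

    def deserialize(self, data):
        return json.loads(data)


flow = JsonDataflow("extract")

# After the upstream task runs:
flow._set_data({"rows": [1, 2, 3]})
stored_form = XCOM_STORE["extract"]

# Before the downstream task runs (step 7 hands this to the task):
received = flow._get_data()
```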
The basic Dataflow object has identity serialize and deserialize methods,
meaning data is stored directly in the Airflow database. Therefore, for
performance and practical reasons, basic Dataflows should not be used with
large or complex results.
Dataflows can easily be extended to use remote storage. In this case, the
serialize method should write the data to storage and return a URI, which
will be stored as an XCom. The URI will be passed to deserialize() so that
the data can be downloaded and reconstructed.
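
As a rough sketch of that extension pattern, the example below writes the data to a local temporary directory and returns a file:// URI. The local directory is only a stand-in for remote storage, and FileDataflow, its constructor, and the JSON encoding are assumptions made for this example rather than part of the PR.

```python
import json
import tempfile
import uuid
from pathlib import Path
from urllib.parse import urlparse
from urllib.request import url2pathname


class FileDataflow:
    """Hypothetical Dataflow variant that stores only a URI as the XCom value."""

    def __init__(self, storage_dir):
        self.storage_dir = Path(storage_dir)

    def serialize(self, data):
        # Write the data to storage and return a URI pointing at it.
        path = self.storage_dir / f"{uuid.uuid4().hex}.json"
        path.write_text(json.dumps(data))
        return path.resolve().as_uri()

    def deserialize(self, uri):
        # Use the URI to load the data and reconstruct the object.
        path = Path(url2pathname(urlparse(uri).path))
        return json.loads(path.read_text())


storage_dir = tempfile.mkdtemp()  # stand-in for a remote bucket
flow = FileDataflow(storage_dir)

# Only this short string would be stored as the XCom.
uri = flow.serialize({"rows": list(range(1000))})

# The downstream side reconstructs the full object from the URI.
restored = flow.deserialize(uri)
```

With this design the database holds only the short URI string, so the size of the result no longer affects the XCom record.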