dstandish commented on code in PR #26208:
URL: https://github.com/apache/airflow/pull/26208#discussion_r965179609
##########
docs/apache-airflow/concepts/datasets.rst:
##########
@@ -15,33 +15,185 @@
specific language governing permissions and limitations
under the License.
-Datasets
-========
+Data-aware scheduling
+=====================
.. versionadded:: 2.4
-With datasets, instead of running a DAG on a schedule, a DAG can be configured to run when a dataset has been updated.
+Quickstart
+----------
-To use this feature, define a dataset:
+In addition to scheduling DAGs based on time, you can also schedule DAGs to run when a task updates a dataset.
-.. exampleinclude:: /../../airflow/example_dags/example_datasets.py
- :language: python
- :start-after: [START dataset_def]
- :end-before: [END dataset_def]
+.. code-block:: python
-Then reference the dataset as a task outlet:
+    from airflow import Dataset
-.. exampleinclude:: /../../airflow/example_dags/example_datasets.py
- :language: python
- :dedent: 4
- :start-after: [START task_outlet]
- :end-before: [END task_outlet]
+    with DAG(...):
+        MyOperator(
+            # this task updates example.csv
+            outlets=[Dataset("s3://dataset-bucket/example.csv")],
+            ...,
+        )
-Finally, define a DAG and reference this dataset in the DAG's ``schedule`` argument:
-.. exampleinclude:: /../../airflow/example_dags/example_datasets.py
- :language: python
- :start-after: [START dag_dep]
- :end-before: [END dag_dep]
+    with DAG(
+        # this DAG should be run when example.csv is updated (by dag1)
+        schedule=[Dataset("s3://dataset-bucket/example.csv")],
+        ...,
+    ):
+        ...
-You can reference multiple datasets in the DAG's ``schedule`` argument. Once there has been an update to all of the upstream datasets, the DAG will be triggered. This means that the DAG will run as frequently as its least-frequently-updated dataset.
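The "trigger only after all upstream datasets have been updated" rule described above can be sketched in plain Python. This is an illustrative toy model, not Airflow's scheduler implementation, and the names and URIs are made up for the example:

```python
# Toy model of consumer-DAG scheduling over multiple datasets.
# Illustrative only -- not how Airflow's scheduler is implemented.
schedule = ["s3://bucket/orders.csv", "s3://bucket/users.csv"]


def should_trigger(updated_uris: set) -> bool:
    """The consumer DAG runs only once every scheduled dataset has an update."""
    return all(uri in updated_uris for uri in schedule)


print(should_trigger({"s3://bucket/orders.csv"}))  # False: one dataset still pending
print(should_trigger({"s3://bucket/orders.csv", "s3://bucket/users.csv"}))  # True
```

Because every dataset in the list must see an update before the DAG fires, the effective cadence is bounded by the slowest producer.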
+What is a "dataset"?
+--------------------
+
+An Airflow dataset is a stand-in for a logical grouping of data. Datasets may be updated by upstream "producer" tasks, and dataset updates contribute to scheduling downstream "consumer" DAGs.
+
+A dataset is defined by a Uniform Resource Identifier (URI):
+
+.. code-block:: python
+
+    from airflow import Dataset
+
+    example_dataset = Dataset("s3://dataset-bucket/example.csv")
+
+Airflow treats the dataset URI as an opaque value that is intended to be human-readable, and makes no assumptions about the content or location of the data it represents. The URI is treated as a plain string, so any use of regular expressions (e.g. ``input_\d+.csv``) or file glob patterns (e.g. ``input_2022*.csv``) in an attempt to create multiple datasets from one declaration will not work.
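Since URIs are compared as plain strings, a glob-looking URI is simply one dataset whose name happens to contain pattern characters. A small illustration (``fnmatch`` here only shows what Airflow does *not* do):

```python
from fnmatch import fnmatch

uri = "input_2022*.csv"          # one dataset, stored literally
concrete = "input_20220101.csv"  # a different string, hence a different dataset

# The two only "match" under glob semantics, which Airflow does not apply:
print(fnmatch(concrete, uri))  # True under glob rules...
print(concrete == uri)         # ...but False as plain strings, so no relation
```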
+
+There are two restrictions on the dataset URI:
+
+1. It must be a valid URI, which means it must be composed of only ASCII characters.
+2. The URI scheme cannot be ``airflow`` (this is reserved for future use).
+
+If you try to use either of the examples below, your code will raise a ``ValueError`` and Airflow will not import it.
+
+.. code-block:: python
+
+    # invalid datasets:
+    reserved = Dataset("airflow://example_dataset")
+    not_ascii = Dataset("èxample_datašet")
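Both restrictions can be checked with nothing beyond the standard library. The function below is a rough standalone sketch of the two rules above, not Airflow's actual validation code:

```python
from urllib.parse import urlsplit


def check_dataset_uri(uri: str) -> None:
    """Sketch of the two dataset URI restrictions (not Airflow internals)."""
    # Restriction 1: the URI must be composed of only ASCII characters.
    try:
        uri.encode("ascii")
    except UnicodeEncodeError:
        raise ValueError(f"Dataset URI must be ASCII-only: {uri!r}")
    # Restriction 2: the ``airflow`` scheme is reserved for future use.
    if urlsplit(uri).scheme == "airflow":
        raise ValueError(f"Scheme 'airflow' is reserved: {uri!r}")


check_dataset_uri("s3://dataset-bucket/example.csv")  # passes silently
```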
+
+The identifier does not have to be an absolute URI; it can be a scheme-less relative URI, or even just a simple path or string:
+
+.. code-block:: python
+
+    # valid datasets:
+    schemeless = Dataset("//example/dataset")
+    csv_file = Dataset("example.csv")
+
+If required, an extra dictionary can be included in a Dataset:
+
+.. code-block:: python
+
+    example_dataset = Dataset(
+        "s3://dataset/example.csv",
+        extra={"team": "trainees"},
+    )
+
+.. note::
+
+    Security note: Dataset URI and extra fields are not encrypted; they are stored in cleartext in Airflow's metadata database. Do NOT store any sensitive values, especially credentials, in dataset URIs or extra key values!
+
+The URI is also case sensitive throughout, so ``s3://example_dataset`` and ``s3://Example_Dataset`` are considered different, as are ``s3://example_dataset`` and ``S3://example_dataset``.
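Because dataset identity comes down to the URI string, case differences produce entirely distinct datasets. A quick plain-Python illustration of the rule above:

```python
# Dataset identity is plain string comparison on the URI, so each
# casing below is a distinct dataset (illustration, not Airflow internals).
uris = {"s3://example_dataset", "s3://Example_Dataset", "S3://example_dataset"}
print(len(uris))  # 3 distinct datasets, not 1
```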
+
+How to use datasets in your DAGs
+--------------------------------
+
+You can use datasets to specify data dependencies in your DAGs. Take the following example:
+
+.. code-block:: python
+
+    example_dataset = Dataset("s3://dataset/example.csv")
+
+    with DAG(dag_id="producer", ...):
+        BashOperator(task_id="producer", outlets=[example_dataset], ...)
+
+    with DAG(dag_id="consumer", schedule=[example_dataset], ...):
+        ...
+
+Once the ``producer`` task in the ``producer`` DAG has completed successfully, Airflow schedules the ``consumer`` DAG. Only a task's success triggers dataset updates — if the task fails or if it raises an :class:`~airflow.exceptions.AirflowSkipException`, no update occurs, and the ``consumer`` DAG will not be scheduled.
Review Comment:
```suggestion
Once the ``producer`` task in the ``producer`` DAG has completed successfully, Airflow schedules the ``consumer`` DAG. A dataset will be marked as updated only if the task completes successfully — if the task fails or if it is skipped, no update occurs, and the ``consumer`` DAG will not be scheduled.
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]