This is an automated email from the ASF dual-hosted git repository.
weilee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 89fd695aa8 add example dag for dataset_alias (#41037)
89fd695aa8 is described below
commit 89fd695aa8fa5d51851777718c6d0a3c4cf45fa5
Author: Wei Lee <[email protected]>
AuthorDate: Tue Jul 30 18:06:07 2024 +0800
add example dag for dataset_alias (#41037)
---
airflow/example_dags/example_dataset_alias.py | 104 ++++++++++++++++++++++++++
tests/always/test_example_dags.py | 2 +-
2 files changed, 105 insertions(+), 1 deletion(-)
diff --git a/airflow/example_dags/example_dataset_alias.py
b/airflow/example_dags/example_dataset_alias.py
new file mode 100644
index 0000000000..1e29241b8b
--- /dev/null
+++ b/airflow/example_dags/example_dataset_alias.py
@@ -0,0 +1,104 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+Example DAG for demonstrating the behavior of the DatasetAlias feature in Airflow, including conditional and
+dataset expression-based scheduling.
+
+Notes on usage:
+
+Turn on all the DAGs.
+
+Before running any DAG, the schedule of the "dataset_alias_example_alias_consumer" DAG will show as "Unresolved DatasetAlias".
+This is expected because the dataset alias has not been resolved into any dataset yet.
+
+Once the "dataset_alias_example_alias_producer" DAG is triggered, the "dataset_s3_bucket_consumer" DAG should be triggered upon completion.
+This is because the dataset alias "example-alias" is used to add a dataset event to the dataset "s3://bucket/my-task"
+during the "produce_dataset_events_through_dataset_alias" task.
+After the completion of this task, the schedule of the "dataset_alias_example_alias_consumer" DAG should change to "Dataset" as
+the dataset alias "example-alias" is now resolved to the dataset "s3://bucket/my-task".
+It's expected that the "dataset_alias_example_alias_consumer" DAG is not triggered at this point, despite also relying on
+the dataset alias "example-alias", which was initially resolved to nothing.
+Once the resolution occurs, triggering either the "dataset_s3_bucket_producer" or "dataset_alias_example_alias_producer" DAG should
+also trigger both the "dataset_s3_bucket_consumer" and "dataset_alias_example_alias_consumer" DAGs.
+"""
+
+from __future__ import annotations
+
+import pendulum
+
+from airflow import DAG
+from airflow.datasets import Dataset, DatasetAlias
+from airflow.decorators import task
+
with DAG(
    dag_id="dataset_s3_bucket_producer",
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    schedule=None,
    catchup=False,
    tags=["producer", "dataset"],
):

    # The outlet declaration is what emits the dataset event when the task
    # finishes; the task body itself has no work to do.
    @task(outlets=[Dataset("s3://bucket/my-task")])
    def produce_dataset_events():
        """No-op task; its Dataset outlet produces the dataset event."""

    produce_dataset_events()
+
with DAG(
    dag_id="dataset_alias_example_alias_producer",
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    schedule=None,
    catchup=False,
    tags=["producer", "dataset-alias"],
):

    @task(outlets=[DatasetAlias("example-alias")])
    def produce_dataset_events_through_dataset_alias(*, outlet_events=None):
        """Attach a dataset event to the alias, resolving it to a concrete dataset.

        ``outlet_events`` is injected by the Airflow runtime at execution time;
        the ``None`` default only exists so the signature is valid at parse time.
        """
        bucket_name = "bucket"
        object_path = "my-task"
        uri = f"s3://{bucket_name}/{object_path}"
        # Adding a Dataset under the alias key resolves "example-alias" to that
        # dataset and emits a dataset event for it.
        outlet_events["example-alias"].add(Dataset(uri))

    produce_dataset_events_through_dataset_alias()
+
with DAG(
    dag_id="dataset_s3_bucket_consumer",
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    # Dataset-driven schedule: this DAG runs whenever an event is recorded
    # for the dataset below, rather than on a time-based cadence.
    schedule=[Dataset("s3://bucket/my-task")],
    catchup=False,
    tags=["consumer", "dataset"],
):

    @task
    def consume_dataset_event():
        """No-op task; the DAG exists to demonstrate dataset-triggered runs."""

    consume_dataset_event()
+
with DAG(
    dag_id="dataset_alias_example_alias_consumer",
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    # Until the alias is resolved by a producer run, this schedule shows as
    # an unresolved DatasetAlias and the DAG will not be triggered.
    schedule=[DatasetAlias("example-alias")],
    catchup=False,
    tags=["consumer", "dataset-alias"],
):

    @task(inlets=[DatasetAlias("example-alias")])
    def consume_dataset_event_from_dataset_alias(*, inlet_events=None):
        """Print every dataset event delivered through the alias inlet.

        ``inlet_events`` is injected by the Airflow runtime at execution time;
        the ``None`` default only exists so the signature is valid at parse time.
        """
        events = inlet_events[DatasetAlias("example-alias")]
        for event in events:
            print(event)

    consume_dataset_event_from_dataset_alias()
diff --git a/tests/always/test_example_dags.py
b/tests/always/test_example_dags.py
index d8c97ca4f0..db06b76907 100644
--- a/tests/always/test_example_dags.py
+++ b/tests/always/test_example_dags.py
@@ -193,7 +193,7 @@ def test_should_be_importable(example: str):
@pytest.mark.db_test
@pytest.mark.parametrize("example",
example_not_excluded_dags(xfail_db_exception=True))
def test_should_not_do_database_queries(example: str):
- with assert_queries_count(0, stacklevel_from_module=example.rsplit(os.sep,
1)[-1]):
+ with assert_queries_count(1, stacklevel_from_module=example.rsplit(os.sep,
1)[-1]):
DagBag(
dag_folder=example,
include_examples=False,