This is an automated email from the ASF dual-hosted git repository.
weilee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 5fe1d45111 Adding Dataset Alias Example DAG with classic operators (#41302)
5fe1d45111 is described below
commit 5fe1d45111177563c4c703d35817e59172f736cc
Author: vatsrahul1001 <[email protected]>
AuthorDate: Thu Aug 8 13:05:20 2024 +0530
Adding Dataset Alias Example DAG with classic operators (#41302)
---
airflow/example_dags/example_dataset_alias.py | 6 +-
.../example_dataset_alias_with_no_taskflow.py | 108 +++++++++++++++++++++
tests/www/views/test_views_acl.py | 20 ++++
3 files changed, 131 insertions(+), 3 deletions(-)
diff --git a/airflow/example_dags/example_dataset_alias.py b/airflow/example_dags/example_dataset_alias.py
index 37d368ccb1..c50a89e34f 100644
--- a/airflow/example_dags/example_dataset_alias.py
+++ b/airflow/example_dags/example_dataset_alias.py
@@ -22,15 +22,15 @@ Notes on usage:
Turn on all the DAGs.
-Before running any DAG, the schedule of the "dataset-alias-consumer" DAG will show as "Unresolved DatasetAlias".
+Before running any DAG, the schedule of the "dataset_alias_example_alias_consumer" DAG will show as "Unresolved DatasetAlias".
This is expected because the dataset alias has not been resolved into any dataset yet.
-Once the "dataset-alias-producer" DAG is triggered, the "dataset-consumer" DAG should be triggered upon completion.
+Once the "dataset_s3_bucket_producer" DAG is triggered, the "dataset_s3_bucket_consumer" DAG should be triggered upon completion.
This is because the dataset alias "example-alias" is used to add a dataset event to the dataset "s3://bucket/my-task"
during the "produce_dataset_events_through_dataset_alias" task.
As the DAG "dataset-alias-consumer" relies on dataset alias "example-alias" which was previously unresolved,
the DAG "dataset-alias-consumer" (along with all the DAGs in the same file) will be re-parsed and
-thus update its schedule to the dataset "s3://bucket/my-task" and will be triggered.
+thus update its schedule to the dataset "s3://bucket/my-task" and will also be triggered.
"""
from __future__ import annotations
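
For readers comparing the two styles, the taskflow version described in the docstring above attaches dataset events to the alias at runtime roughly as follows. This is a minimal sketch, not the full contents of example_dataset_alias.py; the dag_id shown here is assumed for illustration.

    # Sketch of the taskflow-style alias producer referenced above
    # (illustrative; dag_id is assumed, not taken from this commit).
    from __future__ import annotations

    import pendulum

    from airflow.datasets import Dataset, DatasetAlias
    from airflow.decorators import dag, task


    @dag(
        dag_id="dataset_alias_example_alias_producer",  # assumed name
        start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
        schedule=None,
        catchup=False,
    )
    def alias_producer():
        # Declaring the alias as an outlet lets the task attach dataset
        # events to it at runtime through the outlet_events accessor.
        @task(outlets=[DatasetAlias("example-alias")])
        def produce_dataset_events_through_dataset_alias(*, outlet_events=None):
            # At run time the alias resolves to this concrete dataset,
            # which is what re-parses and re-schedules the consumer DAG.
            outlet_events["example-alias"].add(Dataset("s3://bucket/my-task"))

        produce_dataset_events_through_dataset_alias()


    alias_producer()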
diff --git a/airflow/example_dags/example_dataset_alias_with_no_taskflow.py b/airflow/example_dags/example_dataset_alias_with_no_taskflow.py
new file mode 100644
index 0000000000..7d7227af39
--- /dev/null
+++ b/airflow/example_dags/example_dataset_alias_with_no_taskflow.py
@@ -0,0 +1,108 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+Example DAG for demonstrating the behavior of the DatasetAlias feature in Airflow, including conditional and
+dataset expression-based scheduling.
+
+Notes on usage:
+
+Turn on all the DAGs.
+
+Before running any DAG, the schedule of the "dataset_alias_example_alias_consumer_with_no_taskflow" DAG will show as "Unresolved DatasetAlias".
+This is expected because the dataset alias has not been resolved into any dataset yet.
+
+Once the "dataset_s3_bucket_producer_with_no_taskflow" DAG is triggered, the "dataset_s3_bucket_consumer_with_no_taskflow" DAG should be triggered upon completion.
+This is because the dataset alias "example-alias-no-taskflow" is used to add a dataset event to the dataset "s3://bucket/my-task-with-no-taskflow"
+during the "produce_dataset_events_through_dataset_alias_with_no_taskflow" task. Also, the schedule of the "dataset_alias_example_alias_consumer_with_no_taskflow" DAG should change to "Dataset" as
+the dataset alias "example-alias-no-taskflow" is now resolved to the dataset "s3://bucket/my-task-with-no-taskflow" and this DAG should also be triggered.
+"""
+
+from __future__ import annotations
+
+import pendulum
+
+from airflow import DAG
+from airflow.datasets import Dataset, DatasetAlias
+from airflow.operators.python import PythonOperator
+
+with DAG(
+    dag_id="dataset_s3_bucket_producer_with_no_taskflow",
+    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
+    schedule=None,
+    catchup=False,
+    tags=["producer", "dataset"],
+):
+
+    def produce_dataset_events():
+        pass
+
+    PythonOperator(
+        task_id="produce_dataset_events",
+        outlets=[Dataset("s3://bucket/my-task-with-no-taskflow")],
+        python_callable=produce_dataset_events,
+    )
+
+
+with DAG(
+    dag_id="dataset_alias_example_alias_producer_with_no_taskflow",
+    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
+    schedule=None,
+    catchup=False,
+    tags=["producer", "dataset-alias"],
+):
+
+    def produce_dataset_events_through_dataset_alias_with_no_taskflow(*, outlet_events=None):
+        bucket_name = "bucket"
+        object_path = "my-task-with-no-taskflow"
+        outlet_events["example-alias-no-taskflow"].add(Dataset(f"s3://{bucket_name}/{object_path}"))
+
+    PythonOperator(
+        task_id="produce_dataset_events_through_dataset_alias_with_no_taskflow",
+        outlets=[DatasetAlias("example-alias-no-taskflow")],
+        python_callable=produce_dataset_events_through_dataset_alias_with_no_taskflow,
+    )
+
+with DAG(
+    dag_id="dataset_s3_bucket_consumer_with_no_taskflow",
+    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
+    schedule=[Dataset("s3://bucket/my-task-with-no-taskflow")],
+    catchup=False,
+    tags=["consumer", "dataset"],
+):
+
+    def consume_dataset_event():
+        pass
+
+    PythonOperator(task_id="consume_dataset_event", python_callable=consume_dataset_event)
+
+with DAG(
+    dag_id="dataset_alias_example_alias_consumer_with_no_taskflow",
+    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
+    schedule=[DatasetAlias("example-alias-no-taskflow")],
+    catchup=False,
+    tags=["consumer", "dataset-alias"],
+):
+
+    def consume_dataset_event_from_dataset_alias(*, inlet_events=None):
+        for event in inlet_events[DatasetAlias("example-alias-no-taskflow")]:
+            print(event)
+
+    PythonOperator(
+        task_id="consume_dataset_event_from_dataset_alias",
+        python_callable=consume_dataset_event_from_dataset_alias,
+        inlets=[DatasetAlias("example-alias-no-taskflow")],
+    )
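
The docstring above mentions conditional, dataset expression-based scheduling, but neither DAG in this file uses it directly. As a hedged illustration of that feature (the dag_id and the pairing of dataset URIs are assumed, not part of this commit), a consumer could be scheduled on an expression instead of a plain list:

    # Illustrative sketch only: conditional scheduling with dataset
    # expressions; the dag_id and dataset pairing are assumed.
    from __future__ import annotations

    import pendulum

    from airflow import DAG
    from airflow.datasets import Dataset
    from airflow.operators.python import PythonOperator

    with DAG(
        dag_id="dataset_expression_consumer_sketch",  # hypothetical
        start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
        # "|" means run when either dataset receives an event;
        # "&" would require events on both before triggering.
        schedule=(
            Dataset("s3://bucket/my-task")
            | Dataset("s3://bucket/my-task-with-no-taskflow")
        ),
        catchup=False,
        tags=["consumer", "dataset"],
    ):
        PythonOperator(task_id="consume", python_callable=lambda: None)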
diff --git a/tests/www/views/test_views_acl.py b/tests/www/views/test_views_acl.py
index 51bf56adac..1f0dd0d072 100644
--- a/tests/www/views/test_views_acl.py
+++ b/tests/www/views/test_views_acl.py
@@ -254,6 +254,26 @@ def test_dag_autocomplete_success(client_all_dags):
)
expected = [
    {"name": "airflow", "type": "owner", "dag_display_name": None},
+    {
+        "dag_display_name": None,
+        "name": "dataset_alias_example_alias_consumer_with_no_taskflow",
+        "type": "dag",
+    },
+    {
+        "dag_display_name": None,
+        "name": "dataset_alias_example_alias_producer_with_no_taskflow",
+        "type": "dag",
+    },
+    {
+        "dag_display_name": None,
+        "name": "dataset_s3_bucket_consumer_with_no_taskflow",
+        "type": "dag",
+    },
+    {
+        "dag_display_name": None,
+        "name": "dataset_s3_bucket_producer_with_no_taskflow",
+        "type": "dag",
+    },
    {
        "name": "example_dynamic_task_mapping_with_no_taskflow_operators",
        "type": "dag",