This is an automated email from the ASF dual-hosted git repository.

weilee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 5fe1d45111 Adding Dataset Alias Example DAG with classic operators (#41302)
5fe1d45111 is described below

commit 5fe1d45111177563c4c703d35817e59172f736cc
Author: vatsrahul1001 <[email protected]>
AuthorDate: Thu Aug 8 13:05:20 2024 +0530

    Adding Dataset Alias Example DAG with classic operators (#41302)
---
 airflow/example_dags/example_dataset_alias.py      |   6 +-
 .../example_dataset_alias_with_no_taskflow.py      | 108 +++++++++++++++++++++
 tests/www/views/test_views_acl.py                  |  20 ++++
 3 files changed, 131 insertions(+), 3 deletions(-)

diff --git a/airflow/example_dags/example_dataset_alias.py b/airflow/example_dags/example_dataset_alias.py
index 37d368ccb1..c50a89e34f 100644
--- a/airflow/example_dags/example_dataset_alias.py
+++ b/airflow/example_dags/example_dataset_alias.py
@@ -22,15 +22,15 @@ Notes on usage:
 
 Turn on all the DAGs.
 
-Before running any DAG, the schedule of the "dataset-alias-consumer" DAG will show as "Unresolved DatasetAlias".
+Before running any DAG, the schedule of the "dataset_alias_example_alias_consumer" DAG will show as "Unresolved DatasetAlias".
 This is expected because the dataset alias has not been resolved into any dataset yet.
 
-Once the "dataset-alias-producer" DAG is triggered, the "dataset-consumer" DAG should be triggered upon completion.
+Once the "dataset_s3_bucket_producer" DAG is triggered, the "dataset_s3_bucket_consumer" DAG should be triggered upon completion.
 This is because the dataset alias "example-alias" is used to add a dataset event to the dataset "s3://bucket/my-task"
 during the "produce_dataset_events_through_dataset_alias" task.
 As the DAG "dataset-alias-consumer" relies on dataset alias "example-alias" which was previously unresolved,
 the DAG "dataset-alias-consumer" (along with all the DAGs in the same file) will be re-parsed and
-thus update its schedule to the dataset "s3://bucket/my-task" and will be triggered.
+thus update its schedule to the dataset "s3://bucket/my-task" and will also be triggered.
 """
 
 from __future__ import annotations
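
The docstring above refers to the "produce_dataset_events_through_dataset_alias" task, which attaches a dataset event to the alias at runtime. For reference, a minimal sketch of that producer in the TaskFlow style used by example_dataset_alias.py (assuming Airflow 2.10, where DatasetAlias and the outlet_events accessor are available; the wrapping DAG function name below is hypothetical):

    from __future__ import annotations

    import pendulum

    from airflow.datasets import Dataset, DatasetAlias
    from airflow.decorators import dag, task


    @dag(
        start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
        schedule=None,
        catchup=False,
        tags=["producer", "dataset-alias"],
    )
    def taskflow_alias_producer_sketch():  # hypothetical DAG id
        # Declaring the alias as an outlet lets the task attach dataset
        # events to it at runtime through the outlet_events accessor.
        @task(outlets=[DatasetAlias("example-alias")])
        def produce_dataset_events_through_dataset_alias(*, outlet_events=None):
            outlet_events["example-alias"].add(Dataset("s3://bucket/my-task"))

        produce_dataset_events_through_dataset_alias()


    taskflow_alias_producer_sketch()

Triggering this producer resolves "example-alias" to "s3://bucket/my-task", which is what re-parses the consumer DAG and updates its schedule as described above.
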
diff --git a/airflow/example_dags/example_dataset_alias_with_no_taskflow.py b/airflow/example_dags/example_dataset_alias_with_no_taskflow.py
new file mode 100644
index 0000000000..7d7227af39
--- /dev/null
+++ b/airflow/example_dags/example_dataset_alias_with_no_taskflow.py
@@ -0,0 +1,108 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+Example DAG for demonstrating the behavior of the DatasetAlias feature in Airflow, including conditional and
+dataset expression-based scheduling.
+
+Notes on usage:
+
+Turn on all the DAGs.
+
+Before running any DAG, the schedule of the "dataset_alias_example_alias_consumer_with_no_taskflow" DAG will show as "Unresolved DatasetAlias".
+This is expected because the dataset alias has not been resolved into any dataset yet.
+
+Once the "dataset_s3_bucket_producer_with_no_taskflow" DAG is triggered, the "dataset_s3_bucket_consumer_with_no_taskflow" DAG should be triggered upon completion.
+This is because the dataset alias "example-alias-no-taskflow" is used to add a dataset event to the dataset "s3://bucket/my-task-with-no-taskflow"
+during the "produce_dataset_events_through_dataset_alias_with_no_taskflow" task. Also, the schedule of the "dataset_alias_example_alias_consumer_with_no_taskflow" DAG should change to "Dataset" as
+the dataset alias "example-alias-no-taskflow" is now resolved to the dataset "s3://bucket/my-task-with-no-taskflow" and this DAG should also be triggered.
+"""
+
+from __future__ import annotations
+
+import pendulum
+
+from airflow import DAG
+from airflow.datasets import Dataset, DatasetAlias
+from airflow.operators.python import PythonOperator
+
+with DAG(
+    dag_id="dataset_s3_bucket_producer_with_no_taskflow",
+    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
+    schedule=None,
+    catchup=False,
+    tags=["producer", "dataset"],
+):
+
+    def produce_dataset_events():
+        pass
+
+    PythonOperator(
+        task_id="produce_dataset_events",
+        outlets=[Dataset("s3://bucket/my-task-with-no-taskflow")],
+        python_callable=produce_dataset_events,
+    )
+
+
+with DAG(
+    dag_id="dataset_alias_example_alias_producer_with_no_taskflow",
+    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
+    schedule=None,
+    catchup=False,
+    tags=["producer", "dataset-alias"],
+):
+
+    def produce_dataset_events_through_dataset_alias_with_no_taskflow(*, outlet_events=None):
+        bucket_name = "bucket"
+        object_path = "my-task-with-no-taskflow"
+        outlet_events["example-alias-no-taskflow"].add(Dataset(f"s3://{bucket_name}/{object_path}"))
+
+    PythonOperator(
+        task_id="produce_dataset_events_through_dataset_alias_with_no_taskflow",
+        outlets=[DatasetAlias("example-alias-no-taskflow")],
+        python_callable=produce_dataset_events_through_dataset_alias_with_no_taskflow,
+    )
+
+with DAG(
+    dag_id="dataset_s3_bucket_consumer_with_no_taskflow",
+    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
+    schedule=[Dataset("s3://bucket/my-task-with-no-taskflow")],
+    catchup=False,
+    tags=["consumer", "dataset"],
+):
+
+    def consume_dataset_event():
+        pass
+
+    PythonOperator(task_id="consume_dataset_event", python_callable=consume_dataset_event)
+
+with DAG(
+    dag_id="dataset_alias_example_alias_consumer_with_no_taskflow",
+    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
+    schedule=[DatasetAlias("example-alias-no-taskflow")],
+    catchup=False,
+    tags=["consumer", "dataset-alias"],
+):
+
+    def consume_dataset_event_from_dataset_alias(*, inlet_events=None):
+        for event in inlet_events[DatasetAlias("example-alias-no-taskflow")]:
+            print(event)
+
+    PythonOperator(
+        task_id="consume_dataset_event_from_dataset_alias",
+        python_callable=consume_dataset_event_from_dataset_alias,
+        inlets=[DatasetAlias("example-alias-no-taskflow")],
+    )
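
The new file's docstring mentions conditional, dataset expression-based scheduling, but none of the four DAGs above demonstrates it. A minimal sketch of what that looks like in the same classic-operator style (assuming the logical dataset expressions available since Airflow 2.9; the DAG id below is hypothetical):

    from __future__ import annotations

    import pendulum

    from airflow import DAG
    from airflow.datasets import Dataset
    from airflow.operators.python import PythonOperator

    # Hypothetical consumer: runs when either dataset receives an event,
    # using the OR ("|") dataset expression available since Airflow 2.9.
    with DAG(
        dag_id="dataset_conditional_consumer_sketch",  # hypothetical id
        start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
        schedule=(
            Dataset("s3://bucket/my-task-with-no-taskflow")
            | Dataset("s3://bucket/my-task")
        ),
        catchup=False,
        tags=["consumer", "dataset"],
    ):

        def consume_any_dataset_event():
            pass

        PythonOperator(
            task_id="consume_any_dataset_event",
            python_callable=consume_any_dataset_event,
        )
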
diff --git a/tests/www/views/test_views_acl.py b/tests/www/views/test_views_acl.py
index 51bf56adac..1f0dd0d072 100644
--- a/tests/www/views/test_views_acl.py
+++ b/tests/www/views/test_views_acl.py
@@ -254,6 +254,26 @@ def test_dag_autocomplete_success(client_all_dags):
     )
     expected = [
         {"name": "airflow", "type": "owner", "dag_display_name": None},
+        {
+            "dag_display_name": None,
+            "name": "dataset_alias_example_alias_consumer_with_no_taskflow",
+            "type": "dag",
+        },
+        {
+            "dag_display_name": None,
+            "name": "dataset_alias_example_alias_producer_with_no_taskflow",
+            "type": "dag",
+        },
+        {
+            "dag_display_name": None,
+            "name": "dataset_s3_bucket_consumer_with_no_taskflow",
+            "type": "dag",
+        },
+        {
+            "dag_display_name": None,
+            "name": "dataset_s3_bucket_producer_with_no_taskflow",
+            "type": "dag",
+        },
         {
             "name": "example_dynamic_task_mapping_with_no_taskflow_operators",
             "type": "dag",
