This is an automated email from the ASF dual-hosted git repository.

uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 00b90fa776 Add example DAG to demonstrate emitting approaches (#38821)
00b90fa776 is described below

commit 00b90fa776154b76aec7f3c8075f53ac6d30d2e8
Author: Tzu-ping Chung <[email protected]>
AuthorDate: Fri Apr 19 09:34:08 2024 +0800

    Add example DAG to demonstrate emitting approaches (#38821)
---
 .../example_dags/example_dataset_event_extra.py    | 78 ++++++++++++++++++++++
 tests/serialization/test_dag_serialization.py      |  3 +
 2 files changed, 81 insertions(+)

diff --git a/airflow/example_dags/example_dataset_event_extra.py 
b/airflow/example_dags/example_dataset_event_extra.py
new file mode 100644
index 0000000000..4ec3b2bcc7
--- /dev/null
+++ b/airflow/example_dags/example_dataset_event_extra.py
@@ -0,0 +1,78 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Example DAG to demonstrate annotating a dataset event with extra information.
+"""
+
+from __future__ import annotations
+
+import datetime
+
+from airflow.datasets import Dataset
+from airflow.datasets.metadata import Metadata
+from airflow.decorators import task
+from airflow.models.dag import DAG
+from airflow.operators.bash import BashOperator
+
+ds = Dataset("s3://output/1.txt")
+
+with DAG(
+    dag_id="dataset_with_extra_by_yield",
+    catchup=False,
+    start_date=datetime.datetime.min,
+    schedule="@daily",
+    tags=["produces"],
+):
+
+    @task(outlets=[ds])
+    def dataset_with_extra_by_yield():
+        yield Metadata(ds, {"hi": "bye"})
+
+    dataset_with_extra_by_yield()
+
+with DAG(
+    dag_id="dataset_with_extra_by_context",
+    catchup=False,
+    start_date=datetime.datetime.min,
+    schedule="@daily",
+    tags=["produces"],
+):
+
+    @task(outlets=[ds])
+    def dataset_with_extra_by_context(*, dataset_events=None):
+        dataset_events[ds].extra = {"hi": "bye"}
+
+    dataset_with_extra_by_context()
+
+with DAG(
+    dag_id="dataset_with_extra_from_classic_operator",
+    catchup=False,
+    start_date=datetime.datetime.min,
+    schedule="@daily",
+    tags=["produces"],
+):
+
+    def _dataset_with_extra_from_classic_operator_post_execute(context):
+        context["dataset_events"].extra = {"hi": "bye"}
+
+    BashOperator(
+        task_id="dataset_with_extra_from_classic_operator",
+        outlets=[ds],
+        bash_command=":",
+        post_execute=_dataset_with_extra_from_classic_operator_post_execute,
+    )
diff --git a/tests/serialization/test_dag_serialization.py 
b/tests/serialization/test_dag_serialization.py
index 39270f51a9..ed63a565f9 100644
--- a/tests/serialization/test_dag_serialization.py
+++ b/tests/serialization/test_dag_serialization.py
@@ -640,6 +640,9 @@ class TestStringifiedDAGs:
                 "template_ext",
                 "template_fields",
                 # We store the string, real dag has the actual code
+                "_pre_execute_hook",
+                "_post_execute_hook",
+                "on_execute_callback",
                 "on_failure_callback",
                 "on_success_callback",
                 "on_retry_callback",

Reply via email to