This is an automated email from the ASF dual-hosted git repository.
uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 00b90fa776 Add example DAG to demonstrate emitting approaches (#38821)
00b90fa776 is described below
commit 00b90fa776154b76aec7f3c8075f53ac6d30d2e8
Author: Tzu-ping Chung <[email protected]>
AuthorDate: Fri Apr 19 09:34:08 2024 +0800
Add example DAG to demonstrate emitting approaches (#38821)
---
.../example_dags/example_dataset_event_extra.py | 78 ++++++++++++++++++++++
tests/serialization/test_dag_serialization.py | 3 +
2 files changed, 81 insertions(+)
diff --git a/airflow/example_dags/example_dataset_event_extra.py
b/airflow/example_dags/example_dataset_event_extra.py
new file mode 100644
index 0000000000..4ec3b2bcc7
--- /dev/null
+++ b/airflow/example_dags/example_dataset_event_extra.py
@@ -0,0 +1,78 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Example DAG to demonstrate annotating a dataset event with extra information.
+"""
+
+from __future__ import annotations
+
+import datetime
+
+from airflow.datasets import Dataset
+from airflow.datasets.metadata import Metadata
+from airflow.decorators import task
+from airflow.models.dag import DAG
+from airflow.operators.bash import BashOperator
+
+ds = Dataset("s3://output/1.txt")
+
+with DAG(
+ dag_id="dataset_with_extra_by_yield",
+ catchup=False,
+ start_date=datetime.datetime.min,
+ schedule="@daily",
+ tags=["produces"],
+):
+
+ @task(outlets=[ds])
+ def dataset_with_extra_by_yield():
+ yield Metadata(ds, {"hi": "bye"})
+
+ dataset_with_extra_by_yield()
+
+with DAG(
+ dag_id="dataset_with_extra_by_context",
+ catchup=False,
+ start_date=datetime.datetime.min,
+ schedule="@daily",
+ tags=["produces"],
+):
+
+ @task(outlets=[ds])
+ def dataset_with_extra_by_context(*, dataset_events=None):
+ dataset_events[ds].extra = {"hi": "bye"}
+
+ dataset_with_extra_by_context()
+
+with DAG(
+ dag_id="dataset_with_extra_from_classic_operator",
+ catchup=False,
+ start_date=datetime.datetime.min,
+ schedule="@daily",
+ tags=["produces"],
+):
+
+ def _dataset_with_extra_from_classic_operator_post_execute(context):
+ context["dataset_events"].extra = {"hi": "bye"}
+
+ BashOperator(
+ task_id="dataset_with_extra_from_classic_operator",
+ outlets=[ds],
+ bash_command=":",
+ post_execute=_dataset_with_extra_from_classic_operator_post_execute,
+ )
diff --git a/tests/serialization/test_dag_serialization.py
b/tests/serialization/test_dag_serialization.py
index 39270f51a9..ed63a565f9 100644
--- a/tests/serialization/test_dag_serialization.py
+++ b/tests/serialization/test_dag_serialization.py
@@ -640,6 +640,9 @@ class TestStringifiedDAGs:
"template_ext",
"template_fields",
# We store the string, real dag has the actual code
+ "_pre_execute_hook",
+ "_post_execute_hook",
+ "on_execute_callback",
"on_failure_callback",
"on_success_callback",
"on_retry_callback",