This is an automated email from the ASF dual-hosted git repository.

taragolis pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 66d68db536 Rename "dataset event" in context to use "outlet" (#39397)
66d68db536 is described below

commit 66d68db5361e7f7c99abc9864787226a8fb65deb
Author: Tzu-ping Chung <[email protected]>
AuthorDate: Sun May 5 03:21:27 2024 +0800

    Rename "dataset event" in context to use "outlet" (#39397)
---
 ...vent_extra.py => example_outlet_event_extra.py} |  6 ++---
 airflow/models/baseoperator.py                     |  6 ++---
 airflow/models/taskinstance.py                     | 12 +++++-----
 airflow/operators/python.py                        |  4 ++--
 airflow/serialization/serialized_objects.py        | 10 ++++----
 airflow/utils/context.py                           | 28 +++++++++++++---------
 airflow/utils/context.pyi                          | 10 ++++----
 airflow/utils/operator_helpers.py                  |  8 +++----
 .../authoring-and-scheduling/datasets.rst          |  6 ++---
 docs/apache-airflow/templates-ref.rst              |  4 ++--
 tests/models/test_taskinstance.py                  | 16 ++++++-------
 tests/operators/test_python.py                     |  2 +-
 tests/serialization/test_serialized_objects.py     |  4 ++--
 13 files changed, 61 insertions(+), 55 deletions(-)
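
The rename is confined to the template context key: tasks now attach extras to the dataset events they emit through "outlet_events" instead of "dataset_events", and the accessor itself behaves as before. Below is a minimal sketch of the new spelling for a decorated task, loosely mirroring the renamed example DAG in this diff (the DAG id, dataset URI, and extra values are illustrative, not taken from the commit):

    import datetime

    from airflow import DAG
    from airflow.datasets import Dataset
    from airflow.decorators import task

    ds = Dataset("s3://bucket/output.txt")  # illustrative URI, not from this commit

    with DAG(dag_id="outlet_event_extra_sketch", schedule=None, start_date=datetime.datetime(2024, 1, 1)):

        @task(outlets=[ds])
        def write_with_extra(*, outlet_events=None):
            # Formerly received as "dataset_events"; the accessor API is unchanged.
            outlet_events[ds].extra = {"row_count": 42}

        write_with_extra()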

diff --git a/airflow/example_dags/example_dataset_event_extra.py b/airflow/example_dags/example_outlet_event_extra.py
similarity index 92%
rename from airflow/example_dags/example_dataset_event_extra.py
rename to airflow/example_dags/example_outlet_event_extra.py
index 4ec3b2bcc7..a5a3e23ae2 100644
--- a/airflow/example_dags/example_dataset_event_extra.py
+++ b/airflow/example_dags/example_outlet_event_extra.py
@@ -54,8 +54,8 @@ with DAG(
 ):
 
     @task(outlets=[ds])
-    def dataset_with_extra_by_context(*, dataset_events=None):
-        dataset_events[ds].extra = {"hi": "bye"}
+    def dataset_with_extra_by_context(*, outlet_events=None):
+        outlet_events[ds].extra = {"hi": "bye"}
 
     dataset_with_extra_by_context()
 
@@ -68,7 +68,7 @@ with DAG(
 ):
 
     def _dataset_with_extra_from_classic_operator_post_execute(context):
-        context["dataset_events"].extra = {"hi": "bye"}
+        context["outlet_events"].extra = {"hi": "bye"}
 
     BashOperator(
         task_id="dataset_with_extra_from_classic_operator",
diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py
index ded7d2861e..5fc5e90b43 100644
--- a/airflow/models/baseoperator.py
+++ b/airflow/models/baseoperator.py
@@ -91,7 +91,7 @@ from airflow.ti_deps.deps.not_previously_skipped_dep import NotPreviouslySkipped
 from airflow.ti_deps.deps.prev_dagrun_dep import PrevDagrunDep
 from airflow.ti_deps.deps.trigger_rule_dep import TriggerRuleDep
 from airflow.utils import timezone
-from airflow.utils.context import Context, context_get_dataset_events
+from airflow.utils.context import Context, context_get_outlet_events
 from airflow.utils.decorators import fixup_decorator_warning_stack
 from airflow.utils.edgemodifier import EdgeModifier
 from airflow.utils.helpers import validate_key
@@ -1279,7 +1279,7 @@ class BaseOperator(AbstractOperator, metaclass=BaseOperatorMeta):
             return
         ExecutionCallableRunner(
             self._pre_execute_hook,
-            context_get_dataset_events(context),
+            context_get_outlet_events(context),
             logger=self.log,
         ).run(context)
 
@@ -1304,7 +1304,7 @@ class BaseOperator(AbstractOperator, metaclass=BaseOperatorMeta):
             return
         ExecutionCallableRunner(
             self._post_execute_hook,
-            context_get_dataset_events(context),
+            context_get_outlet_events(context),
             logger=self.log,
         ).run(context, result)
 
diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index 112b239653..4e095e7743 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -109,10 +109,10 @@ from airflow.utils import timezone
 from airflow.utils.context import (
     ConnectionAccessor,
     Context,
-    DatasetEventAccessors,
     InletEventsAccessors,
+    OutletEventAccessors,
     VariableAccessor,
-    context_get_dataset_events,
+    context_get_outlet_events,
     context_merge,
 )
 from airflow.utils.email import send_email
@@ -440,7 +440,7 @@ def _execute_task(task_instance: TaskInstance | TaskInstancePydantic, context: C
 
             return ExecutionCallableRunner(
                 execute_callable,
-                context_get_dataset_events(context),
+                context_get_outlet_events(context),
                 logger=log,
             ).run(context=context, **execute_callable_kwargs)
         except SystemExit as e:
@@ -799,7 +799,7 @@ def _get_template_context(
         "dag_run": dag_run,
         "data_interval_end": timezone.coerce_datetime(data_interval.end),
         "data_interval_start": timezone.coerce_datetime(data_interval.start),
-        "dataset_events": DatasetEventAccessors(),
+        "outlet_events": OutletEventAccessors(),
         "ds": ds,
         "ds_nodash": ds_nodash,
         "execution_date": logical_date,
@@ -2641,7 +2641,7 @@ class TaskInstance(Base, LoggingMixin):
                 session.add(Log(self.state, self))
                 session.merge(self).task = self.task
                 if self.state == TaskInstanceState.SUCCESS:
-                    self._register_dataset_changes(events=context["dataset_events"], session=session)
+                    self._register_dataset_changes(events=context["outlet_events"], session=session)
 
                 session.commit()
                 if self.state == TaskInstanceState.SUCCESS:
@@ -2651,7 +2651,7 @@ class TaskInstance(Base, LoggingMixin):
 
             return None
 
-    def _register_dataset_changes(self, *, events: DatasetEventAccessors, session: Session) -> None:
+    def _register_dataset_changes(self, *, events: OutletEventAccessors, session: Session) -> None:
         if TYPE_CHECKING:
             assert self.task
 
diff --git a/airflow/operators/python.py b/airflow/operators/python.py
index 977ef54ecb..554a27a444 100644
--- a/airflow/operators/python.py
+++ b/airflow/operators/python.py
@@ -52,7 +52,7 @@ from airflow.models.variable import Variable
 from airflow.operators.branch import BranchMixIn
 from airflow.typing_compat import Literal
 from airflow.utils import hashlib_wrapper
-from airflow.utils.context import context_copy_partial, context_get_dataset_events, context_merge
+from airflow.utils.context import context_copy_partial, context_get_outlet_events, context_merge
 from airflow.utils.file import get_unique_dag_module_name
 from airflow.utils.operator_helpers import ExecutionCallableRunner, KeywordParameters
 from airflow.utils.process_utils import execute_in_subprocess
@@ -233,7 +233,7 @@ class PythonOperator(BaseOperator):
     def execute(self, context: Context) -> Any:
         context_merge(context, self.op_kwargs, templates_dict=self.templates_dict)
         self.op_kwargs = self.determine_kwargs(context)
-        self._dataset_events = context_get_dataset_events(context)
+        self._dataset_events = context_get_outlet_events(context)
 
         return_value = self.execute_callable()
         if self.show_return_value_in_logs:
diff --git a/airflow/serialization/serialized_objects.py b/airflow/serialization/serialized_objects.py
index 5d3088fa4c..6692043766 100644
--- a/airflow/serialization/serialized_objects.py
+++ b/airflow/serialization/serialized_objects.py
@@ -68,7 +68,7 @@ from airflow.task.priority_strategy import (
 )
 from airflow.triggers.base import BaseTrigger
 from airflow.utils.code_utils import get_python_source
-from airflow.utils.context import Context, DatasetEventAccessor, DatasetEventAccessors
+from airflow.utils.context import Context, OutletEventAccessor, OutletEventAccessors
 from airflow.utils.docs import get_docs_url
 from airflow.utils.helpers import exactly_one
 from airflow.utils.module_loading import import_string, qualname
@@ -536,12 +536,12 @@ class BaseSerialization:
         elif var.__class__.__name__ == "V1Pod" and _has_kubernetes() and isinstance(var, k8s.V1Pod):
             json_pod = PodGenerator.serialize_pod(var)
             return cls._encode(json_pod, type_=DAT.POD)
-        elif isinstance(var, DatasetEventAccessors):
+        elif isinstance(var, OutletEventAccessors):
             return cls._encode(
                 cls.serialize(var._dict, strict=strict, use_pydantic_models=use_pydantic_models),  # type: ignore[attr-defined]
                 type_=DAT.DATASET_EVENT_ACCESSORS,
             )
-        elif isinstance(var, DatasetEventAccessor):
+        elif isinstance(var, OutletEventAccessor):
             return cls._encode(
                 cls.serialize(var.extra, strict=strict, use_pydantic_models=use_pydantic_models),
                 type_=DAT.DATASET_EVENT_ACCESSOR,
@@ -693,11 +693,11 @@ class BaseSerialization:
         elif type_ == DAT.DICT:
             return {k: cls.deserialize(v, use_pydantic_models) for k, v in var.items()}
         elif type_ == DAT.DATASET_EVENT_ACCESSORS:
-            d = DatasetEventAccessors()  # type: ignore[assignment]
+            d = OutletEventAccessors()  # type: ignore[assignment]
             d._dict = cls.deserialize(var)  # type: ignore[attr-defined]
             return d
         elif type_ == DAT.DATASET_EVENT_ACCESSOR:
-            return DatasetEventAccessor(extra=cls.deserialize(var))
+            return OutletEventAccessor(extra=cls.deserialize(var))
         elif type_ == DAT.DAG:
             return SerializedDAG.deserialize_dag(var)
         elif type_ == DAT.OP:
diff --git a/airflow/utils/context.py b/airflow/utils/context.py
index 6af778b7ff..2b7302098d 100644
--- a/airflow/utils/context.py
+++ b/airflow/utils/context.py
@@ -62,7 +62,6 @@ KNOWN_CONTEXT_KEYS: set[str] = {
     "dag_run",
     "data_interval_end",
     "data_interval_start",
-    "dataset_events",
     "ds",
     "ds_nodash",
     "execution_date",
@@ -77,6 +76,7 @@ KNOWN_CONTEXT_KEYS: set[str] = {
     "next_ds_nodash",
     "next_execution_date",
     "outlets",
+    "outlet_events",
     "params",
     "prev_data_interval_start_success",
     "prev_data_interval_end_success",
@@ -157,17 +157,23 @@ class ConnectionAccessor:
 
 
 @attrs.define()
-class DatasetEventAccessor:
-    """Wrapper to access a DatasetEvent instance in template."""
+class OutletEventAccessor:
+    """Wrapper to access an outlet dataset event in template.
+
+    :meta private:
+    """
 
     extra: dict[str, Any]
 
 
-class DatasetEventAccessors(Mapping[str, DatasetEventAccessor]):
-    """Lazy mapping of dataset event accessors."""
+class OutletEventAccessors(Mapping[str, OutletEventAccessor]):
+    """Lazy mapping of outlet dataset event accessors.
+
+    :meta private:
+    """
 
     def __init__(self) -> None:
-        self._dict: dict[str, DatasetEventAccessor] = {}
+        self._dict: dict[str, OutletEventAccessor] = {}
 
     def __iter__(self) -> Iterator[str]:
         return iter(self._dict)
@@ -175,9 +181,9 @@ class DatasetEventAccessors(Mapping[str, DatasetEventAccessor]):
     def __len__(self) -> int:
         return len(self._dict)
 
-    def __getitem__(self, key: str | Dataset) -> DatasetEventAccessor:
+    def __getitem__(self, key: str | Dataset) -> OutletEventAccessor:
         if (uri := coerce_to_uri(key)) not in self._dict:
-            self._dict[uri] = DatasetEventAccessor({})
+            self._dict[uri] = OutletEventAccessor({})
         return self._dict[uri]
 
 
@@ -448,8 +454,8 @@ def lazy_mapping_from_context(source: Context) -> Mapping[str, Any]:
     return {k: _create_value(k, v) for k, v in source._context.items()}
 
 
-def context_get_dataset_events(context: Context) -> DatasetEventAccessors:
+def context_get_outlet_events(context: Context) -> OutletEventAccessors:
     try:
-        return context["dataset_events"]
+        return context["outlet_events"]
     except KeyError:
-        return DatasetEventAccessors()
+        return OutletEventAccessors()
diff --git a/airflow/utils/context.pyi b/airflow/utils/context.pyi
index 9cdc0a3d6f..d358ee5d37 100644
--- a/airflow/utils/context.pyi
+++ b/airflow/utils/context.pyi
@@ -57,14 +57,14 @@ class VariableAccessor:
 class ConnectionAccessor:
     def get(self, key: str, default_conn: Any = None) -> Any: ...
 
-class DatasetEventAccessor:
+class OutletEventAccessor:
     def __init__(self, *, extra: dict[str, Any]) -> None: ...
     extra: dict[str, Any]
 
-class DatasetEventAccessors(Mapping[str, DatasetEventAccessor]):
+class OutletEventAccessors(Mapping[str, OutletEventAccessor]):
     def __iter__(self) -> Iterator[str]: ...
     def __len__(self) -> int: ...
-    def __getitem__(self, key: str | Dataset) -> DatasetEventAccessor: ...
+    def __getitem__(self, key: str | Dataset) -> OutletEventAccessor: ...
 
 class InletEventsAccessor(Sequence[DatasetEvent]):
     @overload
@@ -89,7 +89,7 @@ class Context(TypedDict, total=False):
     dag_run: DagRun | DagRunPydantic
     data_interval_end: DateTime
     data_interval_start: DateTime
-    dataset_events: DatasetEventAccessors
+    outlet_events: OutletEventAccessors
     ds: str
     ds_nodash: str
     exception: BaseException | str | None
@@ -143,4 +143,4 @@ def context_merge(context: Context, **kwargs: Any) -> None: ...
 def context_update_for_unmapped(context: Context, task: BaseOperator) -> None: ...
 def context_copy_partial(source: Context, keys: Container[str]) -> Context: ...
 def lazy_mapping_from_context(source: Context) -> Mapping[str, Any]: ...
-def context_get_dataset_events(context: Context) -> DatasetEventAccessors: ...
+def context_get_outlet_events(context: Context) -> OutletEventAccessors: ...
diff --git a/airflow/utils/operator_helpers.py b/airflow/utils/operator_helpers.py
index e92078ac30..7f06d65c84 100644
--- a/airflow/utils/operator_helpers.py
+++ b/airflow/utils/operator_helpers.py
@@ -25,7 +25,7 @@ from airflow import settings
 from airflow.utils.context import Context, lazy_mapping_from_context
 
 if TYPE_CHECKING:
-    from airflow.utils.context import DatasetEventAccessors
+    from airflow.utils.context import OutletEventAccessors
 
 R = TypeVar("R")
 
@@ -232,12 +232,12 @@ class ExecutionCallableRunner:
     def __init__(
         self,
         func: Callable,
-        dataset_events: DatasetEventAccessors,
+        outlet_events: OutletEventAccessors,
         *,
         logger: logging.Logger | None,
     ) -> None:
         self.func = func
-        self.dataset_events = dataset_events
+        self.outlet_events = outlet_events
         self.logger = logger or logging.getLogger(__name__)
 
     def run(self, *args, **kwargs) -> Any:
@@ -257,7 +257,7 @@ class ExecutionCallableRunner:
 
         for metadata in _run():
             if isinstance(metadata, Metadata):
-                self.dataset_events[metadata.uri].extra.update(metadata.extra)
+                self.outlet_events[metadata.uri].extra.update(metadata.extra)
                 continue
             self.logger.warning("Ignoring unknown data of %r received from task", type(metadata))
             if self.logger.isEnabledFor(logging.DEBUG):
diff --git a/docs/apache-airflow/authoring-and-scheduling/datasets.rst b/docs/apache-airflow/authoring-and-scheduling/datasets.rst
index 87c31ce96f..c08b5ea0d8 100644
--- a/docs/apache-airflow/authoring-and-scheduling/datasets.rst
+++ b/docs/apache-airflow/authoring-and-scheduling/datasets.rst
@@ -251,13 +251,13 @@ Airflow automatically collects all yielded metadata, and populates dataset event
 
 This can also be done in classic operators. The best way is to subclass the operator and override ``execute``. Alternatively, extras can also be added in a task's ``pre_execute`` or ``post_execute`` hook. If you choose to use hooks, however, remember that they are not rerun when a task is retried, and may cause the extra information to not match actual data in certain scenarios.
 
-Another way to achieve the same is by accessing ``dataset_events`` in a task's execution context directly:
+Another way to achieve the same is by accessing ``outlet_events`` in a task's execution context directly:
 
 .. code-block:: python
 
     @task(outlets=[example_s3_dataset])
-    def write_to_s3(*, dataset_events):
-        dataset_events[example_s3_dataset].extras = {"row_count": len(df)}
+    def write_to_s3(*, outlet_events):
+        outlet_events[example_s3_dataset].extras = {"row_count": len(df)}
 
 There's minimal magic here---Airflow simply writes the yielded values to the exact same accessor. This also works in classic operators, including ``execute``, ``pre_execute``, and ``post_execute``.
 
diff --git a/docs/apache-airflow/templates-ref.rst b/docs/apache-airflow/templates-ref.rst
index e44f1121a4..05d4b10acc 100644
--- a/docs/apache-airflow/templates-ref.rst
+++ b/docs/apache-airflow/templates-ref.rst
@@ -64,6 +64,8 @@ Variable                                    Type                  Description
 ``{{ inlets }}``                            list                  List of inlets declared on the task.
 ``{{ inlet_events }}``                      dict[str, ...]        Access past events of inlet datasets. See :doc:`Datasets <authoring-and-scheduling/datasets>`. Added in version 2.10.
 ``{{ outlets }}``                           list                  List of outlets declared on the task.
+``{{ outlet_events }}``                     dict[str, ...]        | Accessors to attach information to dataset events that will be emitted by the current task.
+                                                                  | See :doc:`Datasets <authoring-and-scheduling/datasets>`. Added in version 2.10.
 ``{{ dag }}``                               DAG                   The currently running :class:`~airflow.models.dag.DAG`. You can read more about DAGs in :doc:`DAGs <core-concepts/dags>`.
 ``{{ task }}``                              BaseOperator          | The currently running :class:`~airflow.models.baseoperator.BaseOperator`. You can read more about Tasks in :doc:`core-concepts/operators`
 ``{{ macros }}``                                                  | A reference to the macros package. See Macros_ below.
@@ -75,8 +77,6 @@ Variable                                    Type                  Description
 ``{{ var.value }}``                                               Airflow variables. See `Airflow Variables in Templates`_ below.
 ``{{ var.json }}``                                                Airflow variables. See `Airflow Variables in Templates`_ below.
 ``{{ conn }}``                                                    Airflow connections. See `Airflow Connections in Templates`_ below.
-``{{ dataset_events }}``                    dict[str, ...]        | Accessors to attach information to dataset events that will be emitted by the current task.
-                                                                  | See :doc:`Datasets <authoring-and-scheduling/datasets>`. Added in version 2.10.
 ``{{ task_instance_key_str }}``             str                   | A unique, human-readable key to the task instance. The format is
                                                                   | ``{dag_id}__{task_id}__{ds_nodash}``.
 ``{{ conf }}``                              AirflowConfigParser   | The full configuration object representing the content of your
diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py
index 0635a5f053..8afc1abaa1 100644
--- a/tests/models/test_taskinstance.py
+++ b/tests/models/test_taskinstance.py
@@ -2321,13 +2321,13 @@ class TestTaskInstance:
         with dag_maker(schedule=None, session=session) as dag:
 
             @task(outlets=Dataset("test_outlet_dataset_extra_1"))
-            def write1(*, dataset_events):
-                dataset_events["test_outlet_dataset_extra_1"].extra = {"foo": "bar"}
+            def write1(*, outlet_events):
+                outlet_events["test_outlet_dataset_extra_1"].extra = {"foo": "bar"}
 
             write1()
 
             def _write2_post_execute(context, _):
-                context["dataset_events"]["test_outlet_dataset_extra_2"].extra = {"x": 1}
+                context["outlet_events"]["test_outlet_dataset_extra_2"].extra = {"x": 1}
 
             BashOperator(
                 task_id="write2",
@@ -2362,9 +2362,9 @@ class TestTaskInstance:
         with dag_maker(schedule=None, session=session):
 
             @task(outlets=Dataset("test_outlet_dataset_extra"))
-            def write(*, dataset_events):
-                dataset_events["test_outlet_dataset_extra"].extra = {"one": 1}
-                dataset_events["different_uri"].extra = {"foo": "bar"}  # Will be silently dropped.
+            def write(*, outlet_events):
+                outlet_events["test_outlet_dataset_extra"].extra = {"one": 1}
+                outlet_events["different_uri"].extra = {"foo": "bar"}  # Will be silently dropped.
 
             write()
 
@@ -2434,8 +2434,8 @@ class TestTaskInstance:
         with dag_maker(schedule=None, session=session):
 
             @task(outlets=Dataset("test_inlet_dataset_extra"))
-            def write(*, ti, dataset_events):
-                dataset_events["test_inlet_dataset_extra"].extra = {"from": ti.task_id}
+            def write(*, ti, outlet_events):
+                outlet_events["test_inlet_dataset_extra"].extra = {"from": ti.task_id}
 
             @task(inlets=Dataset("test_inlet_dataset_extra"))
             def read(*, inlet_events):
diff --git a/tests/operators/test_python.py b/tests/operators/test_python.py
index 2ab71be86e..3d7b23c415 100644
--- a/tests/operators/test_python.py
+++ b/tests/operators/test_python.py
@@ -849,8 +849,8 @@ class BaseTestPythonVirtualenvOperator(BasePythonTest):
             "ti",
             "var",  # Accessor for Variable; var->json and var->value.
             "conn",  # Accessor for Connection.
-            "dataset_events",  # Accessor for outlet DatasetEvent.
             "inlet_events",  # Accessor for inlet DatasetEvent.
+            "outlet_events",  # Accessor for outlet DatasetEvent.
         ]
 
         ti = create_task_instance(dag_id=self.dag_id, task_id=self.task_id, schedule=None)
diff --git a/tests/serialization/test_serialized_objects.py b/tests/serialization/test_serialized_objects.py
index a27812615d..9e14cb35fe 100644
--- a/tests/serialization/test_serialized_objects.py
+++ b/tests/serialization/test_serialized_objects.py
@@ -53,7 +53,7 @@ from airflow.serialization.serialized_objects import BaseSerialization
 from airflow.settings import _ENABLE_AIP_44
 from airflow.triggers.base import BaseTrigger
 from airflow.utils import timezone
-from airflow.utils.context import DatasetEventAccessors
+from airflow.utils.context import OutletEventAccessors
 from airflow.utils.operator_resources import Resources
 from airflow.utils.pydantic import BaseModel
 from airflow.utils.state import DagRunState, State
@@ -421,7 +421,7 @@ def test_serialized_mapped_operator_unmap(dag_maker):
 
 def test_ser_of_dataset_event_accessor():
     # todo: (Airflow 3.0) we should force reserialization on upgrade
-    d = DatasetEventAccessors()
+    d = OutletEventAccessors()
     d["hi"].extra = "blah1"  # todo: this should maybe be forbidden?  i.e. can extra be any json or just dict?
     d["yo"].extra = {"this": "that", "the": "other"}
     ser = BaseSerialization.serialize(var=d)
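
Classic operators read the same renamed key from the execution context, as the example DAG and test changes above show via pre/post-execute hooks. A rough sketch of that spelling, assuming Airflow 2.10 with this change applied (the DAG id, task_id, URI, and extra values are illustrative):

    import datetime

    from airflow import DAG
    from airflow.datasets import Dataset
    from airflow.operators.bash import BashOperator

    ds = Dataset("s3://bucket/output.txt")  # illustrative URI, not from this commit

    def _attach_extra(context, result=None):
        # Before this commit the key was context["dataset_events"].
        context["outlet_events"][ds].extra = {"row_count": 42}

    with DAG(dag_id="outlet_extra_classic_sketch", schedule=None, start_date=datetime.datetime(2024, 1, 1)):
        BashOperator(
            task_id="write_with_extra",
            bash_command="echo writing",
            outlets=[ds],
            post_execute=_attach_extra,
        )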
