This is an automated email from the ASF dual-hosted git repository.
taragolis pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 66d68db536 Rename "dataset event" in context to use "outlet" (#39397)
66d68db536 is described below
commit 66d68db5361e7f7c99abc9864787226a8fb65deb
Author: Tzu-ping Chung <[email protected]>
AuthorDate: Sun May 5 03:21:27 2024 +0800
Rename "dataset event" in context to use "outlet" (#39397)
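For DAG authors, the user-visible effect of this change is that the accessor previously exposed in the execution context as "dataset_events" is now named "outlet_events" (the old key is dropped from KNOWN_CONTEXT_KEYS). A minimal sketch of the renamed usage, modelled on the example DAG touched in this patch (the DAG id, dataset URI, and extra payload below are placeholders, not part of the commit):

    import datetime

    from airflow import DAG
    from airflow.datasets import Dataset
    from airflow.decorators import task

    example_dataset = Dataset("s3://bucket/example")  # placeholder URI

    with DAG(dag_id="outlet_event_extra_sketch", start_date=datetime.datetime(2024, 1, 1), schedule=None):

        @task(outlets=[example_dataset])
        def write(*, outlet_events):
            # The accessor formerly injected as "dataset_events" is now "outlet_events";
            # extras attached here are recorded on the dataset event emitted for this outlet.
            outlet_events[example_dataset].extra = {"row_count": 42}

        write()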
---
...vent_extra.py => example_outlet_event_extra.py} | 6 ++---
airflow/models/baseoperator.py | 6 ++---
airflow/models/taskinstance.py | 12 +++++-----
airflow/operators/python.py | 4 ++--
airflow/serialization/serialized_objects.py | 10 ++++----
airflow/utils/context.py | 28 +++++++++++++---------
airflow/utils/context.pyi | 10 ++++----
airflow/utils/operator_helpers.py | 8 +++----
.../authoring-and-scheduling/datasets.rst | 6 ++---
docs/apache-airflow/templates-ref.rst | 4 ++--
tests/models/test_taskinstance.py | 16 ++++++-------
tests/operators/test_python.py | 2 +-
tests/serialization/test_serialized_objects.py | 4 ++--
13 files changed, 61 insertions(+), 55 deletions(-)
diff --git a/airflow/example_dags/example_dataset_event_extra.py b/airflow/example_dags/example_outlet_event_extra.py
similarity index 92%
rename from airflow/example_dags/example_dataset_event_extra.py
rename to airflow/example_dags/example_outlet_event_extra.py
index 4ec3b2bcc7..a5a3e23ae2 100644
--- a/airflow/example_dags/example_dataset_event_extra.py
+++ b/airflow/example_dags/example_outlet_event_extra.py
@@ -54,8 +54,8 @@ with DAG(
):
@task(outlets=[ds])
- def dataset_with_extra_by_context(*, dataset_events=None):
- dataset_events[ds].extra = {"hi": "bye"}
+ def dataset_with_extra_by_context(*, outlet_events=None):
+ outlet_events[ds].extra = {"hi": "bye"}
dataset_with_extra_by_context()
@@ -68,7 +68,7 @@ with DAG(
):
def _dataset_with_extra_from_classic_operator_post_execute(context):
- context["dataset_events"].extra = {"hi": "bye"}
+ context["outlet_events"].extra = {"hi": "bye"}
BashOperator(
task_id="dataset_with_extra_from_classic_operator",
diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py
index ded7d2861e..5fc5e90b43 100644
--- a/airflow/models/baseoperator.py
+++ b/airflow/models/baseoperator.py
@@ -91,7 +91,7 @@ from airflow.ti_deps.deps.not_previously_skipped_dep import NotPreviouslySkipped
from airflow.ti_deps.deps.prev_dagrun_dep import PrevDagrunDep
from airflow.ti_deps.deps.trigger_rule_dep import TriggerRuleDep
from airflow.utils import timezone
-from airflow.utils.context import Context, context_get_dataset_events
+from airflow.utils.context import Context, context_get_outlet_events
from airflow.utils.decorators import fixup_decorator_warning_stack
from airflow.utils.edgemodifier import EdgeModifier
from airflow.utils.helpers import validate_key
@@ -1279,7 +1279,7 @@ class BaseOperator(AbstractOperator, metaclass=BaseOperatorMeta):
return
ExecutionCallableRunner(
self._pre_execute_hook,
- context_get_dataset_events(context),
+ context_get_outlet_events(context),
logger=self.log,
).run(context)
@@ -1304,7 +1304,7 @@ class BaseOperator(AbstractOperator, metaclass=BaseOperatorMeta):
return
ExecutionCallableRunner(
self._post_execute_hook,
- context_get_dataset_events(context),
+ context_get_outlet_events(context),
logger=self.log,
).run(context, result)
diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index 112b239653..4e095e7743 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -109,10 +109,10 @@ from airflow.utils import timezone
from airflow.utils.context import (
ConnectionAccessor,
Context,
- DatasetEventAccessors,
InletEventsAccessors,
+ OutletEventAccessors,
VariableAccessor,
- context_get_dataset_events,
+ context_get_outlet_events,
context_merge,
)
from airflow.utils.email import send_email
@@ -440,7 +440,7 @@ def _execute_task(task_instance: TaskInstance | TaskInstancePydantic, context: C
return ExecutionCallableRunner(
execute_callable,
- context_get_dataset_events(context),
+ context_get_outlet_events(context),
logger=log,
).run(context=context, **execute_callable_kwargs)
except SystemExit as e:
@@ -799,7 +799,7 @@ def _get_template_context(
"dag_run": dag_run,
"data_interval_end": timezone.coerce_datetime(data_interval.end),
"data_interval_start": timezone.coerce_datetime(data_interval.start),
- "dataset_events": DatasetEventAccessors(),
+ "outlet_events": OutletEventAccessors(),
"ds": ds,
"ds_nodash": ds_nodash,
"execution_date": logical_date,
@@ -2641,7 +2641,7 @@ class TaskInstance(Base, LoggingMixin):
session.add(Log(self.state, self))
session.merge(self).task = self.task
if self.state == TaskInstanceState.SUCCESS:
- self._register_dataset_changes(events=context["dataset_events"], session=session)
+ self._register_dataset_changes(events=context["outlet_events"], session=session)
session.commit()
if self.state == TaskInstanceState.SUCCESS:
@@ -2651,7 +2651,7 @@ class TaskInstance(Base, LoggingMixin):
return None
- def _register_dataset_changes(self, *, events: DatasetEventAccessors, session: Session) -> None:
+ def _register_dataset_changes(self, *, events: OutletEventAccessors, session: Session) -> None:
if TYPE_CHECKING:
assert self.task
diff --git a/airflow/operators/python.py b/airflow/operators/python.py
index 977ef54ecb..554a27a444 100644
--- a/airflow/operators/python.py
+++ b/airflow/operators/python.py
@@ -52,7 +52,7 @@ from airflow.models.variable import Variable
from airflow.operators.branch import BranchMixIn
from airflow.typing_compat import Literal
from airflow.utils import hashlib_wrapper
-from airflow.utils.context import context_copy_partial, context_get_dataset_events, context_merge
+from airflow.utils.context import context_copy_partial, context_get_outlet_events, context_merge
from airflow.utils.file import get_unique_dag_module_name
from airflow.utils.operator_helpers import ExecutionCallableRunner, KeywordParameters
from airflow.utils.process_utils import execute_in_subprocess
@@ -233,7 +233,7 @@ class PythonOperator(BaseOperator):
def execute(self, context: Context) -> Any:
context_merge(context, self.op_kwargs, templates_dict=self.templates_dict)
self.op_kwargs = self.determine_kwargs(context)
- self._dataset_events = context_get_dataset_events(context)
+ self._dataset_events = context_get_outlet_events(context)
return_value = self.execute_callable()
if self.show_return_value_in_logs:
diff --git a/airflow/serialization/serialized_objects.py b/airflow/serialization/serialized_objects.py
index 5d3088fa4c..6692043766 100644
--- a/airflow/serialization/serialized_objects.py
+++ b/airflow/serialization/serialized_objects.py
@@ -68,7 +68,7 @@ from airflow.task.priority_strategy import (
)
from airflow.triggers.base import BaseTrigger
from airflow.utils.code_utils import get_python_source
-from airflow.utils.context import Context, DatasetEventAccessor, DatasetEventAccessors
+from airflow.utils.context import Context, OutletEventAccessor, OutletEventAccessors
from airflow.utils.docs import get_docs_url
from airflow.utils.helpers import exactly_one
from airflow.utils.module_loading import import_string, qualname
@@ -536,12 +536,12 @@ class BaseSerialization:
elif var.__class__.__name__ == "V1Pod" and _has_kubernetes() and isinstance(var, k8s.V1Pod):
json_pod = PodGenerator.serialize_pod(var)
return cls._encode(json_pod, type_=DAT.POD)
- elif isinstance(var, DatasetEventAccessors):
+ elif isinstance(var, OutletEventAccessors):
return cls._encode(
cls.serialize(var._dict, strict=strict, use_pydantic_models=use_pydantic_models),  # type: ignore[attr-defined]
type_=DAT.DATASET_EVENT_ACCESSORS,
)
- elif isinstance(var, DatasetEventAccessor):
+ elif isinstance(var, OutletEventAccessor):
return cls._encode(
cls.serialize(var.extra, strict=strict, use_pydantic_models=use_pydantic_models),
type_=DAT.DATASET_EVENT_ACCESSOR,
@@ -693,11 +693,11 @@ class BaseSerialization:
elif type_ == DAT.DICT:
return {k: cls.deserialize(v, use_pydantic_models) for k, v in var.items()}
elif type_ == DAT.DATASET_EVENT_ACCESSORS:
- d = DatasetEventAccessors() # type: ignore[assignment]
+ d = OutletEventAccessors() # type: ignore[assignment]
d._dict = cls.deserialize(var) # type: ignore[attr-defined]
return d
elif type_ == DAT.DATASET_EVENT_ACCESSOR:
- return DatasetEventAccessor(extra=cls.deserialize(var))
+ return OutletEventAccessor(extra=cls.deserialize(var))
elif type_ == DAT.DAG:
return SerializedDAG.deserialize_dag(var)
elif type_ == DAT.OP:
diff --git a/airflow/utils/context.py b/airflow/utils/context.py
index 6af778b7ff..2b7302098d 100644
--- a/airflow/utils/context.py
+++ b/airflow/utils/context.py
@@ -62,7 +62,6 @@ KNOWN_CONTEXT_KEYS: set[str] = {
"dag_run",
"data_interval_end",
"data_interval_start",
- "dataset_events",
"ds",
"ds_nodash",
"execution_date",
@@ -77,6 +76,7 @@ KNOWN_CONTEXT_KEYS: set[str] = {
"next_ds_nodash",
"next_execution_date",
"outlets",
+ "outlet_events",
"params",
"prev_data_interval_start_success",
"prev_data_interval_end_success",
@@ -157,17 +157,23 @@ class ConnectionAccessor:
@attrs.define()
-class DatasetEventAccessor:
- """Wrapper to access a DatasetEvent instance in template."""
+class OutletEventAccessor:
+ """Wrapper to access an outlet dataset event in template.
+
+ :meta private:
+ """
extra: dict[str, Any]
-class DatasetEventAccessors(Mapping[str, DatasetEventAccessor]):
- """Lazy mapping of dataset event accessors."""
+class OutletEventAccessors(Mapping[str, OutletEventAccessor]):
+ """Lazy mapping of outlet dataset event accessors.
+
+ :meta private:
+ """
def __init__(self) -> None:
- self._dict: dict[str, DatasetEventAccessor] = {}
+ self._dict: dict[str, OutletEventAccessor] = {}
def __iter__(self) -> Iterator[str]:
return iter(self._dict)
@@ -175,9 +181,9 @@ class DatasetEventAccessors(Mapping[str, DatasetEventAccessor]):
def __len__(self) -> int:
return len(self._dict)
- def __getitem__(self, key: str | Dataset) -> DatasetEventAccessor:
+ def __getitem__(self, key: str | Dataset) -> OutletEventAccessor:
if (uri := coerce_to_uri(key)) not in self._dict:
- self._dict[uri] = DatasetEventAccessor({})
+ self._dict[uri] = OutletEventAccessor({})
return self._dict[uri]
@@ -448,8 +454,8 @@ def lazy_mapping_from_context(source: Context) -> Mapping[str, Any]:
return {k: _create_value(k, v) for k, v in source._context.items()}
-def context_get_dataset_events(context: Context) -> DatasetEventAccessors:
+def context_get_outlet_events(context: Context) -> OutletEventAccessors:
try:
- return context["dataset_events"]
+ return context["outlet_events"]
except KeyError:
- return DatasetEventAccessors()
+ return OutletEventAccessors()
diff --git a/airflow/utils/context.pyi b/airflow/utils/context.pyi
index 9cdc0a3d6f..d358ee5d37 100644
--- a/airflow/utils/context.pyi
+++ b/airflow/utils/context.pyi
@@ -57,14 +57,14 @@ class VariableAccessor:
class ConnectionAccessor:
def get(self, key: str, default_conn: Any = None) -> Any: ...
-class DatasetEventAccessor:
+class OutletEventAccessor:
def __init__(self, *, extra: dict[str, Any]) -> None: ...
extra: dict[str, Any]
-class DatasetEventAccessors(Mapping[str, DatasetEventAccessor]):
+class OutletEventAccessors(Mapping[str, OutletEventAccessor]):
def __iter__(self) -> Iterator[str]: ...
def __len__(self) -> int: ...
- def __getitem__(self, key: str | Dataset) -> DatasetEventAccessor: ...
+ def __getitem__(self, key: str | Dataset) -> OutletEventAccessor: ...
class InletEventsAccessor(Sequence[DatasetEvent]):
@overload
@@ -89,7 +89,7 @@ class Context(TypedDict, total=False):
dag_run: DagRun | DagRunPydantic
data_interval_end: DateTime
data_interval_start: DateTime
- dataset_events: DatasetEventAccessors
+ outlet_events: OutletEventAccessors
ds: str
ds_nodash: str
exception: BaseException | str | None
@@ -143,4 +143,4 @@ def context_merge(context: Context, **kwargs: Any) -> None: ...
def context_update_for_unmapped(context: Context, task: BaseOperator) -> None: ...
def context_copy_partial(source: Context, keys: Container[str]) -> Context: ...
def lazy_mapping_from_context(source: Context) -> Mapping[str, Any]: ...
-def context_get_dataset_events(context: Context) -> DatasetEventAccessors: ...
+def context_get_outlet_events(context: Context) -> OutletEventAccessors: ...
diff --git a/airflow/utils/operator_helpers.py b/airflow/utils/operator_helpers.py
index e92078ac30..7f06d65c84 100644
--- a/airflow/utils/operator_helpers.py
+++ b/airflow/utils/operator_helpers.py
@@ -25,7 +25,7 @@ from airflow import settings
from airflow.utils.context import Context, lazy_mapping_from_context
if TYPE_CHECKING:
- from airflow.utils.context import DatasetEventAccessors
+ from airflow.utils.context import OutletEventAccessors
R = TypeVar("R")
@@ -232,12 +232,12 @@ class ExecutionCallableRunner:
def __init__(
self,
func: Callable,
- dataset_events: DatasetEventAccessors,
+ outlet_events: OutletEventAccessors,
*,
logger: logging.Logger | None,
) -> None:
self.func = func
- self.dataset_events = dataset_events
+ self.outlet_events = outlet_events
self.logger = logger or logging.getLogger(__name__)
def run(self, *args, **kwargs) -> Any:
@@ -257,7 +257,7 @@ class ExecutionCallableRunner:
for metadata in _run():
if isinstance(metadata, Metadata):
- self.dataset_events[metadata.uri].extra.update(metadata.extra)
+ self.outlet_events[metadata.uri].extra.update(metadata.extra)
continue
self.logger.warning("Ignoring unknown data of %r received from task", type(metadata))
if self.logger.isEnabledFor(logging.DEBUG):
diff --git a/docs/apache-airflow/authoring-and-scheduling/datasets.rst b/docs/apache-airflow/authoring-and-scheduling/datasets.rst
index 87c31ce96f..c08b5ea0d8 100644
--- a/docs/apache-airflow/authoring-and-scheduling/datasets.rst
+++ b/docs/apache-airflow/authoring-and-scheduling/datasets.rst
@@ -251,13 +251,13 @@ Airflow automatically collects all yielded metadata, and populates dataset event
This can also be done in classic operators. The best way is to subclass the
operator and override ``execute``. Alternatively, extras can also be added in a
task's ``pre_execute`` or ``post_execute`` hook. If you choose to use hooks,
however, remember that they are not rerun when a task is retried, and may cause
the extra information to not match actual data in certain scenarios.
-Another way to achieve the same is by accessing ``dataset_events`` in a task's execution context directly:
+Another way to achieve the same is by accessing ``outlet_events`` in a task's execution context directly:
.. code-block:: python
@task(outlets=[example_s3_dataset])
- def write_to_s3(*, dataset_events):
- dataset_events[example_s3_dataset].extras = {"row_count": len(df)}
+ def write_to_s3(*, outlet_events):
+ outlet_events[example_s3_dataset].extras = {"row_count": len(df)}
There's minimal magic here---Airflow simply writes the yielded values to the
exact same accessor. This also works in classic operators, including
``execute``, ``pre_execute``, and ``post_execute``.
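A hedged aside on the classic-operator path described in the docs hunk above: the same renamed accessor is reachable through the execution context in a task's pre_execute/post_execute hook. A sketch only; the operator, task id, URI, and extra values are illustrative and follow the pattern used in tests/models/test_taskinstance.py later in this patch:

    import datetime

    from airflow import DAG
    from airflow.datasets import Dataset
    from airflow.operators.bash import BashOperator

    example_s3_dataset = Dataset("s3://bucket/example")  # placeholder URI

    def _attach_extra_post_execute(context, result):
        # post_execute hooks receive the execution context; "outlet_events" is the
        # renamed key that replaces "dataset_events" for attaching event extras.
        context["outlet_events"][example_s3_dataset].extra = {"status": "ok"}

    with DAG(dag_id="classic_operator_outlet_sketch", start_date=datetime.datetime(2024, 1, 1), schedule=None):
        BashOperator(
            task_id="write_to_s3",
            outlets=[example_s3_dataset],
            bash_command="echo writing",
            post_execute=_attach_extra_post_execute,
        )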
diff --git a/docs/apache-airflow/templates-ref.rst b/docs/apache-airflow/templates-ref.rst
index e44f1121a4..05d4b10acc 100644
--- a/docs/apache-airflow/templates-ref.rst
+++ b/docs/apache-airflow/templates-ref.rst
@@ -64,6 +64,8 @@ Variable                                   Type                 Description
``{{ inlets }}``                        list                 List of inlets declared on the task.
``{{ inlet_events }}``                  dict[str, ...]       Access past events of inlet datasets. See :doc:`Datasets <authoring-and-scheduling/datasets>`. Added in version 2.10.
``{{ outlets }}``                       list                 List of outlets declared on the task.
+``{{ outlet_events }}``                 dict[str, ...]       | Accessors to attach information to dataset events that will be emitted by the current task.
+                                                             | See :doc:`Datasets <authoring-and-scheduling/datasets>`. Added in version 2.10.
``{{ dag }}``                           DAG                  The currently running :class:`~airflow.models.dag.DAG`. You can read more about DAGs in :doc:`DAGs <core-concepts/dags>`.
``{{ task }}``                          BaseOperator         | The currently running :class:`~airflow.models.baseoperator.BaseOperator`. You can read more about Tasks in :doc:`core-concepts/operators`
``{{ macros }}``                                             | A reference to the macros package. See Macros_ below.
@@ -75,8 +77,6 @@ Variable                                   Type                 Description
``{{ var.value }}``                                          Airflow variables. See `Airflow Variables in Templates`_ below.
``{{ var.json }}``                                           Airflow variables. See `Airflow Variables in Templates`_ below.
``{{ conn }}``                                               Airflow connections. See `Airflow Connections in Templates`_ below.
-``{{ dataset_events }}``                dict[str, ...]       | Accessors to attach information to dataset events that will be emitted by the current task.
-                                                             | See :doc:`Datasets <authoring-and-scheduling/datasets>`. Added in version 2.10.
``{{ task_instance_key_str }}``         str                  | A unique, human-readable key to the task instance. The format is
                                                             | ``{dag_id}__{task_id}__{ds_nodash}``.
``{{ conf }}``                          AirflowConfigParser  | The full configuration object representing the content of your
diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py
index 0635a5f053..8afc1abaa1 100644
--- a/tests/models/test_taskinstance.py
+++ b/tests/models/test_taskinstance.py
@@ -2321,13 +2321,13 @@ class TestTaskInstance:
with dag_maker(schedule=None, session=session) as dag:
@task(outlets=Dataset("test_outlet_dataset_extra_1"))
- def write1(*, dataset_events):
- dataset_events["test_outlet_dataset_extra_1"].extra = {"foo": "bar"}
+ def write1(*, outlet_events):
+ outlet_events["test_outlet_dataset_extra_1"].extra = {"foo": "bar"}
write1()
def _write2_post_execute(context, _):
- context["dataset_events"]["test_outlet_dataset_extra_2"].extra = {"x": 1}
+ context["outlet_events"]["test_outlet_dataset_extra_2"].extra = {"x": 1}
BashOperator(
task_id="write2",
@@ -2362,9 +2362,9 @@ class TestTaskInstance:
with dag_maker(schedule=None, session=session):
@task(outlets=Dataset("test_outlet_dataset_extra"))
- def write(*, dataset_events):
- dataset_events["test_outlet_dataset_extra"].extra = {"one": 1}
- dataset_events["different_uri"].extra = {"foo": "bar"} # Will be silently dropped.
+ def write(*, outlet_events):
+ outlet_events["test_outlet_dataset_extra"].extra = {"one": 1}
+ outlet_events["different_uri"].extra = {"foo": "bar"} # Will be silently dropped.
write()
@@ -2434,8 +2434,8 @@ class TestTaskInstance:
with dag_maker(schedule=None, session=session):
@task(outlets=Dataset("test_inlet_dataset_extra"))
- def write(*, ti, dataset_events):
- dataset_events["test_inlet_dataset_extra"].extra = {"from": ti.task_id}
+ def write(*, ti, outlet_events):
+ outlet_events["test_inlet_dataset_extra"].extra = {"from": ti.task_id}
@task(inlets=Dataset("test_inlet_dataset_extra"))
def read(*, inlet_events):
diff --git a/tests/operators/test_python.py b/tests/operators/test_python.py
index 2ab71be86e..3d7b23c415 100644
--- a/tests/operators/test_python.py
+++ b/tests/operators/test_python.py
@@ -849,8 +849,8 @@ class BaseTestPythonVirtualenvOperator(BasePythonTest):
"ti",
"var", # Accessor for Variable; var->json and var->value.
"conn", # Accessor for Connection.
- "dataset_events", # Accessor for outlet DatasetEvent.
"inlet_events", # Accessor for inlet DatasetEvent.
+ "outlet_events", # Accessor for outlet DatasetEvent.
]
ti = create_task_instance(dag_id=self.dag_id, task_id=self.task_id, schedule=None)
diff --git a/tests/serialization/test_serialized_objects.py b/tests/serialization/test_serialized_objects.py
index a27812615d..9e14cb35fe 100644
--- a/tests/serialization/test_serialized_objects.py
+++ b/tests/serialization/test_serialized_objects.py
@@ -53,7 +53,7 @@ from airflow.serialization.serialized_objects import BaseSerialization
from airflow.settings import _ENABLE_AIP_44
from airflow.triggers.base import BaseTrigger
from airflow.utils import timezone
-from airflow.utils.context import DatasetEventAccessors
+from airflow.utils.context import OutletEventAccessors
from airflow.utils.operator_resources import Resources
from airflow.utils.pydantic import BaseModel
from airflow.utils.state import DagRunState, State
@@ -421,7 +421,7 @@ def test_serialized_mapped_operator_unmap(dag_maker):
def test_ser_of_dataset_event_accessor():
# todo: (Airflow 3.0) we should force reserialization on upgrade
- d = DatasetEventAccessors()
+ d = OutletEventAccessors()
d["hi"].extra = "blah1" # todo: this should maybe be forbidden? i.e. can extra be any json or just dict?
d["yo"].extra = {"this": "that", "the": "other"}
ser = BaseSerialization.serialize(var=d)