This is an automated email from the ASF dual-hosted git repository.

uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new e979eccf63 Shorten base dataset protocol name (#39320)
e979eccf63 is described below

commit e979eccf63bcc3757681a17376d101a62e7d87cf
Author: Tzu-ping Chung <[email protected]>
AuthorDate: Tue Apr 30 16:46:02 2024 +0800

    Shorten base dataset protocol name (#39320)
---
 airflow/datasets/__init__.py   | 26 +++++++++++++-------------
 airflow/models/dag.py          | 12 ++++++------
 airflow/timetables/datasets.py |  8 ++++----
 tests/datasets/test_dataset.py |  4 ++--
 4 files changed, 25 insertions(+), 25 deletions(-)

diff --git a/airflow/datasets/__init__.py b/airflow/datasets/__init__.py
index 3364185225..37720314d3 100644
--- a/airflow/datasets/__init__.py
+++ b/airflow/datasets/__init__.py
@@ -105,19 +105,19 @@ def coerce_to_uri(value: str | Dataset) -> str:
     return _sanitize_uri(str(value))
 
 
-class BaseDatasetEventInput:
+class BaseDataset:
     """Protocol for all dataset triggers to use in ``DAG(schedule=...)``.
 
     :meta private:
     """
 
-    def __or__(self, other: BaseDatasetEventInput) -> DatasetAny:
-        if not isinstance(other, BaseDatasetEventInput):
+    def __or__(self, other: BaseDataset) -> DatasetAny:
+        if not isinstance(other, BaseDataset):
             return NotImplemented
         return DatasetAny(self, other)
 
-    def __and__(self, other: BaseDatasetEventInput) -> DatasetAll:
-        if not isinstance(other, BaseDatasetEventInput):
+    def __and__(self, other: BaseDataset) -> DatasetAll:
+        if not isinstance(other, BaseDataset):
             return NotImplemented
         return DatasetAll(self, other)
 
@@ -139,7 +139,7 @@ class BaseDatasetEventInput:
 
 
 @attr.define()
-class Dataset(os.PathLike, BaseDatasetEventInput):
+class Dataset(os.PathLike, BaseDataset):
     """A representation of data dependencies between workflows."""
 
     uri: str = attr.field(
@@ -175,13 +175,13 @@ class Dataset(os.PathLike, BaseDatasetEventInput):
         return statuses.get(self.uri, False)
 
 
-class _DatasetBooleanCondition(BaseDatasetEventInput):
+class _DatasetBooleanCondition(BaseDataset):
     """Base class for dataset boolean logic."""
 
     agg_func: Callable[[Iterable], bool]
 
-    def __init__(self, *objects: BaseDatasetEventInput) -> None:
-        if not all(isinstance(o, BaseDatasetEventInput) for o in objects):
+    def __init__(self, *objects: BaseDataset) -> None:
+        if not all(isinstance(o, BaseDataset) for o in objects):
             raise TypeError("expect dataset expressions in condition")
         self.objects = objects
 
@@ -203,8 +203,8 @@ class DatasetAny(_DatasetBooleanCondition):
 
     agg_func = any
 
-    def __or__(self, other: BaseDatasetEventInput) -> DatasetAny:
-        if not isinstance(other, BaseDatasetEventInput):
+    def __or__(self, other: BaseDataset) -> DatasetAny:
+        if not isinstance(other, BaseDataset):
             return NotImplemented
         # Optimization: X | (Y | Z) is equivalent to X | Y | Z.
         return DatasetAny(*self.objects, other)
@@ -225,8 +225,8 @@ class DatasetAll(_DatasetBooleanCondition):
 
     agg_func = all
 
-    def __and__(self, other: BaseDatasetEventInput) -> DatasetAll:
-        if not isinstance(other, BaseDatasetEventInput):
+    def __and__(self, other: BaseDataset) -> DatasetAll:
+        if not isinstance(other, BaseDataset):
             return NotImplemented
         # Optimization: X & (Y & Z) is equivalent to X & Y & Z.
         return DatasetAll(*self.objects, other)
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 4e94432aef..111757b016 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -81,7 +81,7 @@ import airflow.templates
 from airflow import settings, utils
 from airflow.api_internal.internal_api_call import internal_api_call
 from airflow.configuration import conf as airflow_conf, secrets_backend_list
-from airflow.datasets import BaseDatasetEventInput, Dataset, DatasetAll
+from airflow.datasets import BaseDataset, Dataset, DatasetAll
 from airflow.datasets.manager import dataset_manager
 from airflow.exceptions import (
     AirflowDagInconsistent,
@@ -177,7 +177,7 @@ ScheduleInterval = Union[None, str, timedelta, relativedelta]
 # but Mypy cannot handle that right now. Track progress of PEP 661 for progress.
 # See also: https://discuss.python.org/t/9126/7
 ScheduleIntervalArg = Union[ArgNotSet, ScheduleInterval]
-ScheduleArg = Union[ArgNotSet, ScheduleInterval, Timetable, BaseDatasetEventInput, Collection["Dataset"]]
+ScheduleArg = Union[ArgNotSet, ScheduleInterval, Timetable, BaseDataset, Collection["Dataset"]]
 
 SLAMissCallback = Callable[["DAG", str, str, List["SlaMiss"], List[TaskInstance]], None]
 
@@ -633,8 +633,8 @@ class DAG(LoggingMixin):
 
         self.timetable: Timetable
         self.schedule_interval: ScheduleInterval
-        self.dataset_triggers: BaseDatasetEventInput | None = None
-        if isinstance(schedule, BaseDatasetEventInput):
+        self.dataset_triggers: BaseDataset | None = None
+        if isinstance(schedule, BaseDataset):
             self.dataset_triggers = schedule
         elif isinstance(schedule, Collection) and not isinstance(schedule, str):
             if not all(isinstance(x, Dataset) for x in schedule):
@@ -642,7 +642,7 @@ class DAG(LoggingMixin):
             self.dataset_triggers = DatasetAll(*schedule)
         elif isinstance(schedule, Timetable):
             timetable = schedule
-        elif schedule is not NOTSET and not isinstance(schedule, BaseDatasetEventInput):
+        elif schedule is not NOTSET and not isinstance(schedule, BaseDataset):
             schedule_interval = schedule
 
         if isinstance(schedule, DatasetOrTimeSchedule):
@@ -3864,7 +3864,7 @@ class DagModel(Base):
         """
         from airflow.models.serialized_dag import SerializedDagModel
 
-        def dag_ready(dag_id: str, cond: BaseDatasetEventInput, statuses: dict) -> bool | None:
+        def dag_ready(dag_id: str, cond: BaseDataset, statuses: dict) -> bool | None:
             # if dag was serialized before 2.9 and we *just* upgraded,
             # we may be dealing with old version.  In that case,
             # just wait for the dag to be reserialized.
diff --git a/airflow/timetables/datasets.py b/airflow/timetables/datasets.py
index 30b268fdb9..a8f4a7f22f 100644
--- a/airflow/timetables/datasets.py
+++ b/airflow/timetables/datasets.py
@@ -19,7 +19,7 @@ from __future__ import annotations
 
 import typing
 
-from airflow.datasets import BaseDatasetEventInput, DatasetAll
+from airflow.datasets import BaseDataset, DatasetAll
 from airflow.exceptions import AirflowTimetableInvalid
 from airflow.timetables.simple import DatasetTriggeredTimetable as DatasetTriggeredSchedule
 from airflow.utils.types import DagRunType
@@ -40,10 +40,10 @@ class DatasetOrTimeSchedule(DatasetTriggeredSchedule):
         self,
         *,
         timetable: Timetable,
-        datasets: Collection[Dataset] | BaseDatasetEventInput,
+        datasets: Collection[Dataset] | BaseDataset,
     ) -> None:
         self.timetable = timetable
-        if isinstance(datasets, BaseDatasetEventInput):
+        if isinstance(datasets, BaseDataset):
             self.datasets = datasets
         else:
             self.datasets = DatasetAll(*datasets)
@@ -73,7 +73,7 @@ class DatasetOrTimeSchedule(DatasetTriggeredSchedule):
     def validate(self) -> None:
         if isinstance(self.timetable, DatasetTriggeredSchedule):
             raise AirflowTimetableInvalid("cannot nest dataset timetables")
-        if not isinstance(self.datasets, BaseDatasetEventInput):
+        if not isinstance(self.datasets, BaseDataset):
             raise AirflowTimetableInvalid("all elements in 'datasets' must be datasets")
 
     @property
diff --git a/tests/datasets/test_dataset.py b/tests/datasets/test_dataset.py
index c1d68c0933..31aebff9fa 100644
--- a/tests/datasets/test_dataset.py
+++ b/tests/datasets/test_dataset.py
@@ -24,7 +24,7 @@ from typing import Callable
 import pytest
 from sqlalchemy.sql import select
 
-from airflow.datasets import BaseDatasetEventInput, Dataset, DatasetAll, DatasetAny
+from airflow.datasets import BaseDataset, Dataset, DatasetAll, DatasetAny
 from airflow.models.dataset import DatasetDagRunQueue, DatasetModel
 from airflow.models.serialized_dag import SerializedDagModel
 from airflow.operators.empty import EmptyOperator
@@ -346,7 +346,7 @@ def test_dag_with_complex_dataset_triggers(session, dag_maker):
     ), "Serialized 'dataset_triggers' should be a dict"
 
 
-def datasets_equal(d1: BaseDatasetEventInput, d2: BaseDatasetEventInput) -> bool:
+def datasets_equal(d1: BaseDataset, d2: BaseDataset) -> bool:
     if type(d1) != type(d2):
         return False
 

Reply via email to