This is an automated email from the ASF dual-hosted git repository.
uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new e979eccf63 Shorten base dataset protocol name (#39320)
e979eccf63 is described below
commit e979eccf63bcc3757681a17376d101a62e7d87cf
Author: Tzu-ping Chung <[email protected]>
AuthorDate: Tue Apr 30 16:46:02 2024 +0800
Shorten base dataset protocol name (#39320)
---
airflow/datasets/__init__.py | 26 +++++++++++++-------------
airflow/models/dag.py | 12 ++++++------
airflow/timetables/datasets.py | 8 ++++----
tests/datasets/test_dataset.py | 4 ++--
4 files changed, 25 insertions(+), 25 deletions(-)
diff --git a/airflow/datasets/__init__.py b/airflow/datasets/__init__.py
index 3364185225..37720314d3 100644
--- a/airflow/datasets/__init__.py
+++ b/airflow/datasets/__init__.py
@@ -105,19 +105,19 @@ def coerce_to_uri(value: str | Dataset) -> str:
return _sanitize_uri(str(value))
-class BaseDatasetEventInput:
+class BaseDataset:
"""Protocol for all dataset triggers to use in ``DAG(schedule=...)``.
:meta private:
"""
- def __or__(self, other: BaseDatasetEventInput) -> DatasetAny:
- if not isinstance(other, BaseDatasetEventInput):
+ def __or__(self, other: BaseDataset) -> DatasetAny:
+ if not isinstance(other, BaseDataset):
return NotImplemented
return DatasetAny(self, other)
- def __and__(self, other: BaseDatasetEventInput) -> DatasetAll:
- if not isinstance(other, BaseDatasetEventInput):
+ def __and__(self, other: BaseDataset) -> DatasetAll:
+ if not isinstance(other, BaseDataset):
return NotImplemented
return DatasetAll(self, other)
@@ -139,7 +139,7 @@ class BaseDatasetEventInput:
@attr.define()
-class Dataset(os.PathLike, BaseDatasetEventInput):
+class Dataset(os.PathLike, BaseDataset):
"""A representation of data dependencies between workflows."""
uri: str = attr.field(
@@ -175,13 +175,13 @@ class Dataset(os.PathLike, BaseDatasetEventInput):
return statuses.get(self.uri, False)
-class _DatasetBooleanCondition(BaseDatasetEventInput):
+class _DatasetBooleanCondition(BaseDataset):
"""Base class for dataset boolean logic."""
agg_func: Callable[[Iterable], bool]
- def __init__(self, *objects: BaseDatasetEventInput) -> None:
- if not all(isinstance(o, BaseDatasetEventInput) for o in objects):
+ def __init__(self, *objects: BaseDataset) -> None:
+ if not all(isinstance(o, BaseDataset) for o in objects):
raise TypeError("expect dataset expressions in condition")
self.objects = objects
@@ -203,8 +203,8 @@ class DatasetAny(_DatasetBooleanCondition):
agg_func = any
- def __or__(self, other: BaseDatasetEventInput) -> DatasetAny:
- if not isinstance(other, BaseDatasetEventInput):
+ def __or__(self, other: BaseDataset) -> DatasetAny:
+ if not isinstance(other, BaseDataset):
return NotImplemented
# Optimization: X | (Y | Z) is equivalent to X | Y | Z.
return DatasetAny(*self.objects, other)
@@ -225,8 +225,8 @@ class DatasetAll(_DatasetBooleanCondition):
agg_func = all
- def __and__(self, other: BaseDatasetEventInput) -> DatasetAll:
- if not isinstance(other, BaseDatasetEventInput):
+ def __and__(self, other: BaseDataset) -> DatasetAll:
+ if not isinstance(other, BaseDataset):
return NotImplemented
# Optimization: X & (Y & Z) is equivalent to X & Y & Z.
return DatasetAll(*self.objects, other)
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 4e94432aef..111757b016 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -81,7 +81,7 @@ import airflow.templates
from airflow import settings, utils
from airflow.api_internal.internal_api_call import internal_api_call
from airflow.configuration import conf as airflow_conf, secrets_backend_list
-from airflow.datasets import BaseDatasetEventInput, Dataset, DatasetAll
+from airflow.datasets import BaseDataset, Dataset, DatasetAll
from airflow.datasets.manager import dataset_manager
from airflow.exceptions import (
AirflowDagInconsistent,
ScheduleInterval = Union[None, str, timedelta, relativedelta]
# but Mypy cannot handle that right now. Track progress of PEP 661 for progress.
# See also: https://discuss.python.org/t/9126/7
ScheduleIntervalArg = Union[ArgNotSet, ScheduleInterval]
-ScheduleArg = Union[ArgNotSet, ScheduleInterval, Timetable, BaseDatasetEventInput, Collection["Dataset"]]
+ScheduleArg = Union[ArgNotSet, ScheduleInterval, Timetable, BaseDataset, Collection["Dataset"]]
SLAMissCallback = Callable[["DAG", str, str, List["SlaMiss"], List[TaskInstance]], None]
@@ -633,8 +633,8 @@ class DAG(LoggingMixin):
self.timetable: Timetable
self.schedule_interval: ScheduleInterval
- self.dataset_triggers: BaseDatasetEventInput | None = None
- if isinstance(schedule, BaseDatasetEventInput):
+ self.dataset_triggers: BaseDataset | None = None
+ if isinstance(schedule, BaseDataset):
self.dataset_triggers = schedule
elif isinstance(schedule, Collection) and not isinstance(schedule, str):
if not all(isinstance(x, Dataset) for x in schedule):
@@ -642,7 +642,7 @@ class DAG(LoggingMixin):
self.dataset_triggers = DatasetAll(*schedule)
elif isinstance(schedule, Timetable):
timetable = schedule
- elif schedule is not NOTSET and not isinstance(schedule, BaseDatasetEventInput):
+ elif schedule is not NOTSET and not isinstance(schedule, BaseDataset):
schedule_interval = schedule
if isinstance(schedule, DatasetOrTimeSchedule):
@@ -3864,7 +3864,7 @@ class DagModel(Base):
"""
from airflow.models.serialized_dag import SerializedDagModel
- def dag_ready(dag_id: str, cond: BaseDatasetEventInput, statuses: dict) -> bool | None:
+ def dag_ready(dag_id: str, cond: BaseDataset, statuses: dict) -> bool | None:
# if dag was serialized before 2.9 and we *just* upgraded,
# we may be dealing with old version. In that case,
# just wait for the dag to be reserialized.
diff --git a/airflow/timetables/datasets.py b/airflow/timetables/datasets.py
index 30b268fdb9..a8f4a7f22f 100644
--- a/airflow/timetables/datasets.py
+++ b/airflow/timetables/datasets.py
@@ -19,7 +19,7 @@ from __future__ import annotations
import typing
-from airflow.datasets import BaseDatasetEventInput, DatasetAll
+from airflow.datasets import BaseDataset, DatasetAll
from airflow.exceptions import AirflowTimetableInvalid
from airflow.timetables.simple import DatasetTriggeredTimetable as DatasetTriggeredSchedule
from airflow.utils.types import DagRunType
@@ -40,10 +40,10 @@ class DatasetOrTimeSchedule(DatasetTriggeredSchedule):
self,
*,
timetable: Timetable,
- datasets: Collection[Dataset] | BaseDatasetEventInput,
+ datasets: Collection[Dataset] | BaseDataset,
) -> None:
self.timetable = timetable
- if isinstance(datasets, BaseDatasetEventInput):
+ if isinstance(datasets, BaseDataset):
self.datasets = datasets
else:
self.datasets = DatasetAll(*datasets)
@@ -73,7 +73,7 @@ class DatasetOrTimeSchedule(DatasetTriggeredSchedule):
def validate(self) -> None:
if isinstance(self.timetable, DatasetTriggeredSchedule):
raise AirflowTimetableInvalid("cannot nest dataset timetables")
- if not isinstance(self.datasets, BaseDatasetEventInput):
+ if not isinstance(self.datasets, BaseDataset):
raise AirflowTimetableInvalid("all elements in 'datasets' must be datasets")
@property
diff --git a/tests/datasets/test_dataset.py b/tests/datasets/test_dataset.py
index c1d68c0933..31aebff9fa 100644
--- a/tests/datasets/test_dataset.py
+++ b/tests/datasets/test_dataset.py
@@ -24,7 +24,7 @@ from typing import Callable
import pytest
from sqlalchemy.sql import select
-from airflow.datasets import BaseDatasetEventInput, Dataset, DatasetAll, DatasetAny
+from airflow.datasets import BaseDataset, Dataset, DatasetAll, DatasetAny
from airflow.models.dataset import DatasetDagRunQueue, DatasetModel
from airflow.models.serialized_dag import SerializedDagModel
from airflow.operators.empty import EmptyOperator
@@ -346,7 +346,7 @@ def test_dag_with_complex_dataset_triggers(session, dag_maker):
), "Serialized 'dataset_triggers' should be a dict"
-def datasets_equal(d1: BaseDatasetEventInput, d2: BaseDatasetEventInput) -> bool:
+def datasets_equal(d1: BaseDataset, d2: BaseDataset) -> bool:
if type(d1) != type(d2):
return False