This is an automated email from the ASF dual-hosted git repository.
uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new c14241b72c Simplify dataset serialization code (#38089)
c14241b72c is described below
commit c14241b72c8057730adcd5cf7d969d8bf0b708f7
Author: Tzu-ping Chung <[email protected]>
AuthorDate: Thu Mar 14 19:51:08 2024 +0800
Simplify dataset serialization code (#38089)
---
airflow/datasets/__init__.py | 11 +++++++++++
airflow/models/dag.py | 19 ++++---------------
2 files changed, 15 insertions(+), 15 deletions(-)
diff --git a/airflow/datasets/__init__.py b/airflow/datasets/__init__.py
index 4f8d587727..1edcbb946d 100644
--- a/airflow/datasets/__init__.py
+++ b/airflow/datasets/__init__.py
@@ -95,6 +95,9 @@ class BaseDatasetEventInput(Protocol):
def __and__(self, other: BaseDatasetEventInput) -> DatasetAll:
return DatasetAll(self, other)
+ def as_expression(self) -> dict[str, Any]:
+ raise NotImplementedError
+
def evaluate(self, statuses: dict[str, bool]) -> bool:
raise NotImplementedError
@@ -126,6 +129,11 @@ class Dataset(os.PathLike, BaseDatasetEventInput):
def __hash__(self) -> int:
return hash(self.uri)
+ def as_expression(self) -> dict[str, Any]:
+ if self.extra is None:
+ return {"uri": self.uri}
+ return {"uri": self.uri, "extra": self.extra}
+
def iter_datasets(self) -> Iterator[tuple[str, Dataset]]:
yield self.uri, self
@@ -141,6 +149,9 @@ class _DatasetBooleanCondition(BaseDatasetEventInput):
def __init__(self, *objects: BaseDatasetEventInput) -> None:
self.objects = objects
+ def as_expression(self) -> dict[str, Any]:
+ return {"objects": [o.as_expression() for o in self.objects]}
+
def evaluate(self, statuses: dict[str, bool]) -> bool:
return self.agg_func(x.evaluate(statuses=statuses) for x in self.objects)
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index ac38ae1eab..2ac247d739 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -3034,16 +3034,6 @@ class DAG(LoggingMixin):
)
return cls.bulk_write_to_db(dags=dags, session=session)
- def simplify_dataset_expression(self, dataset_expression) -> dict | None:
- """Simplifies a nested dataset expression into a 'any' or 'all' format with URIs."""
- if dataset_expression is None:
- return None
- if dataset_expression.get("__type") == "dataset":
- return dataset_expression["__var"]["uri"]
-
- new_key = "any" if dataset_expression["__type"] == "dataset_any" else "all"
- return {new_key: [self.simplify_dataset_expression(item) for item in dataset_expression["__var"]]}
-
@classmethod
@provide_session
def bulk_write_to_db(
@@ -3063,8 +3053,6 @@ class DAG(LoggingMixin):
if not dags:
return
- from airflow.serialization.serialized_objects import BaseSerialization  # Avoid circular import.
-
log.info("Sync %s DAGs", len(dags))
dag_by_ids = {dag.dag_id: dag for dag in dags}
@@ -3129,9 +3117,10 @@ class DAG(LoggingMixin):
)
orm_dag.schedule_interval = dag.schedule_interval
orm_dag.timetable_description = dag.timetable.description
- orm_dag.dataset_expression = dag.simplify_dataset_expression(
- BaseSerialization.serialize(dag.dataset_triggers)
- )
+ if (dataset_triggers := dag.dataset_triggers) is None:
+ orm_dag.dataset_expression = None
+ else:
+ orm_dag.dataset_expression = dataset_triggers.as_expression()
orm_dag.processor_subdir = processor_subdir