This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 90a5cd66ae Revert "Simplify dataset serialization code (#38089)" (#38158)
90a5cd66ae is described below
commit 90a5cd66ae19445dd7560caf3a5cff595dc43388
Author: Jarek Potiuk <[email protected]>
AuthorDate: Thu Mar 14 20:05:59 2024 +0100
Revert "Simplify dataset serialization code (#38089)" (#38158)
This reverts commit c14241b72c8057730adcd5cf7d969d8bf0b708f7.
---
airflow/datasets/__init__.py | 11 -----------
airflow/models/dag.py | 19 +++++++++++++++----
2 files changed, 15 insertions(+), 15 deletions(-)
diff --git a/airflow/datasets/__init__.py b/airflow/datasets/__init__.py
index 1edcbb946d..4f8d587727 100644
--- a/airflow/datasets/__init__.py
+++ b/airflow/datasets/__init__.py
@@ -95,9 +95,6 @@ class BaseDatasetEventInput(Protocol):
def __and__(self, other: BaseDatasetEventInput) -> DatasetAll:
return DatasetAll(self, other)
- def as_expression(self) -> dict[str, Any]:
- raise NotImplementedError
-
def evaluate(self, statuses: dict[str, bool]) -> bool:
raise NotImplementedError
@@ -129,11 +126,6 @@ class Dataset(os.PathLike, BaseDatasetEventInput):
def __hash__(self) -> int:
return hash(self.uri)
- def as_expression(self) -> dict[str, Any]:
- if self.extra is None:
- return {"uri": self.uri}
- return {"uri": self.uri, "extra": self.extra}
-
def iter_datasets(self) -> Iterator[tuple[str, Dataset]]:
yield self.uri, self
@@ -149,9 +141,6 @@ class _DatasetBooleanCondition(BaseDatasetEventInput):
def __init__(self, *objects: BaseDatasetEventInput) -> None:
self.objects = objects
- def as_expression(self) -> dict[str, Any]:
- return {"objects": [o.as_expression() for o in self.objects]}
-
def evaluate(self, statuses: dict[str, bool]) -> bool:
return self.agg_func(x.evaluate(statuses=statuses) for x in self.objects)
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 2ac247d739..ac38ae1eab 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -3034,6 +3034,16 @@ class DAG(LoggingMixin):
)
return cls.bulk_write_to_db(dags=dags, session=session)
+ def simplify_dataset_expression(self, dataset_expression) -> dict | None:
+ """Simplifies a nested dataset expression into a 'any' or 'all' format with URIs."""
+ if dataset_expression is None:
+ return None
+ if dataset_expression.get("__type") == "dataset":
+ return dataset_expression["__var"]["uri"]
+
+ new_key = "any" if dataset_expression["__type"] == "dataset_any" else "all"
+ return {new_key: [self.simplify_dataset_expression(item) for item in dataset_expression["__var"]]}
+
@classmethod
@provide_session
def bulk_write_to_db(
@@ -3053,6 +3063,8 @@ class DAG(LoggingMixin):
if not dags:
return
+ from airflow.serialization.serialized_objects import BaseSerialization  # Avoid circular import.
+
log.info("Sync %s DAGs", len(dags))
dag_by_ids = {dag.dag_id: dag for dag in dags}
@@ -3117,10 +3129,9 @@ class DAG(LoggingMixin):
)
orm_dag.schedule_interval = dag.schedule_interval
orm_dag.timetable_description = dag.timetable.description
- if (dataset_triggers := dag.dataset_triggers) is None:
- orm_dag.dataset_expression = None
- else:
- orm_dag.dataset_expression = dataset_triggers.as_expression()
+ orm_dag.dataset_expression = dag.simplify_dataset_expression(
+ BaseSerialization.serialize(dag.dataset_triggers)
+ )
orm_dag.processor_subdir = processor_subdir