This is an automated email from the ASF dual-hosted git repository.

uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new c14241b72c Simplify dataset serialization code (#38089)
c14241b72c is described below

commit c14241b72c8057730adcd5cf7d969d8bf0b708f7
Author: Tzu-ping Chung <[email protected]>
AuthorDate: Thu Mar 14 19:51:08 2024 +0800

    Simplify dataset serialization code (#38089)
---
 airflow/datasets/__init__.py | 11 +++++++++++
 airflow/models/dag.py        | 19 ++++---------------
 2 files changed, 15 insertions(+), 15 deletions(-)

diff --git a/airflow/datasets/__init__.py b/airflow/datasets/__init__.py
index 4f8d587727..1edcbb946d 100644
--- a/airflow/datasets/__init__.py
+++ b/airflow/datasets/__init__.py
@@ -95,6 +95,9 @@ class BaseDatasetEventInput(Protocol):
     def __and__(self, other: BaseDatasetEventInput) -> DatasetAll:
         return DatasetAll(self, other)
 
+    def as_expression(self) -> dict[str, Any]:
+        raise NotImplementedError
+
     def evaluate(self, statuses: dict[str, bool]) -> bool:
         raise NotImplementedError
 
@@ -126,6 +129,11 @@ class Dataset(os.PathLike, BaseDatasetEventInput):
     def __hash__(self) -> int:
         return hash(self.uri)
 
+    def as_expression(self) -> dict[str, Any]:
+        if self.extra is None:
+            return {"uri": self.uri}
+        return {"uri": self.uri, "extra": self.extra}
+
     def iter_datasets(self) -> Iterator[tuple[str, Dataset]]:
         yield self.uri, self
 
@@ -141,6 +149,9 @@ class _DatasetBooleanCondition(BaseDatasetEventInput):
     def __init__(self, *objects: BaseDatasetEventInput) -> None:
         self.objects = objects
 
+    def as_expression(self) -> dict[str, Any]:
+        return {"objects": [o.as_expression() for o in self.objects]}
+
     def evaluate(self, statuses: dict[str, bool]) -> bool:
         return self.agg_func(x.evaluate(statuses=statuses) for x in 
self.objects)
 
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index ac38ae1eab..2ac247d739 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -3034,16 +3034,6 @@ class DAG(LoggingMixin):
         )
         return cls.bulk_write_to_db(dags=dags, session=session)
 
-    def simplify_dataset_expression(self, dataset_expression) -> dict | None:
-        """Simplifies a nested dataset expression into a 'any' or 'all' format 
with URIs."""
-        if dataset_expression is None:
-            return None
-        if dataset_expression.get("__type") == "dataset":
-            return dataset_expression["__var"]["uri"]
-
-        new_key = "any" if dataset_expression["__type"] == "dataset_any" else "all"
-        return {new_key: [self.simplify_dataset_expression(item) for item in dataset_expression["__var"]]}
-
     @classmethod
     @provide_session
     def bulk_write_to_db(
@@ -3063,8 +3053,6 @@ class DAG(LoggingMixin):
         if not dags:
             return
 
-        from airflow.serialization.serialized_objects import BaseSerialization  # Avoid circular import.
-
         log.info("Sync %s DAGs", len(dags))
         dag_by_ids = {dag.dag_id: dag for dag in dags}
 
@@ -3129,9 +3117,10 @@ class DAG(LoggingMixin):
             )
             orm_dag.schedule_interval = dag.schedule_interval
             orm_dag.timetable_description = dag.timetable.description
-            orm_dag.dataset_expression = dag.simplify_dataset_expression(
-                BaseSerialization.serialize(dag.dataset_triggers)
-            )
+            if (dataset_triggers := dag.dataset_triggers) is None:
+                orm_dag.dataset_expression = None
+            else:
+                orm_dag.dataset_expression = dataset_triggers.as_expression()
 
             orm_dag.processor_subdir = processor_subdir
 

Reply via email to