Lee-W commented on code in PR #37016:
URL: https://github.com/apache/airflow/pull/37016#discussion_r1497150676


##########
airflow/models/dataset.py:
##########
@@ -336,3 +337,49 @@ def __repr__(self) -> str:
         ]:
             args.append(f"{attr}={getattr(self, attr)!r}")
         return f"{self.__class__.__name__}({', '.join(args)})"
+
+
+class DatasetBooleanCondition:

Review Comment:
   It looks like this class is not meant to be used directly. Should we make
it an abstract class?
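
To make the question concrete, here is a minimal sketch of one way the base class could become abstract. It is not taken from the PR: the `Dataset` class is a hypothetical stand-in reduced to a URI, and turning `agg_func` into an abstract static method is an assumption about how the abstraction might be expressed. The `DatasetAny` and `DatasetAll` names come from the type hint in the diff, but their bodies here are guesses.

```python
from __future__ import annotations

from abc import ABC, abstractmethod
from typing import Iterable


class Dataset:
    """Hypothetical stand-in for airflow's Dataset, reduced to a URI."""

    def __init__(self, uri: str):
        self.uri = uri


class DatasetBooleanCondition(ABC):
    """Abstract base: cannot be instantiated until agg_func is supplied."""

    def __init__(self, *objects):
        self.objects = objects

    @staticmethod
    @abstractmethod
    def agg_func(values: Iterable[bool]) -> bool:
        """Combine the child results; subclasses must provide this."""

    def evaluate(self, statuses: dict[str, bool]) -> bool:
        return self.agg_func(self.eval_one(x, statuses) for x in self.objects)

    def eval_one(self, obj, statuses: dict[str, bool]) -> bool:
        if isinstance(obj, Dataset):
            return statuses.get(obj.uri, False)
        return obj.evaluate(statuses=statuses)


class DatasetAny(DatasetBooleanCondition):
    # Assigning the builtin overrides the abstract method (assumed design).
    agg_func = any


class DatasetAll(DatasetBooleanCondition):
    agg_func = all
```

With this shape, `DatasetBooleanCondition()` raises `TypeError`, while `DatasetAny(...)` and `DatasetAll(...)` can still be constructed and evaluated as before.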



##########
airflow/models/dataset.py:
##########
@@ -336,3 +337,49 @@ def __repr__(self) -> str:
         ]:
             args.append(f"{attr}={getattr(self, attr)!r}")
         return f"{self.__class__.__name__}({', '.join(args)})"
+
+
+class DatasetBooleanCondition:
+    """
+    Base class for boolean logic for dataset triggers.
+
+    :meta private:
+    """
+
+    agg_func: Callable

Review Comment:
   ```suggestion
       agg_func: Callable[[Iterable], bool]
   ```



##########
airflow/timetables/datasets.py:
##########
@@ -52,24 +55,20 @@ def deserialize(cls, data: dict[str, typing.Any]) -> Timetable:
         from airflow.serialization.serialized_objects import decode_timetable
 
         return cls(
-            timetable=decode_timetable(data["timetable"]), datasets=[Dataset(**d) for d in data["datasets"]]
+            timetable=decode_timetable(data["timetable"]),
+            datasets=[],  # don't need the datasets after deserialization

Review Comment:
   same question here



##########
airflow/models/dataset.py:
##########
@@ -336,3 +337,49 @@ def __repr__(self) -> str:
         ]:
             args.append(f"{attr}={getattr(self, attr)!r}")
         return f"{self.__class__.__name__}({', '.join(args)})"
+
+
+class DatasetBooleanCondition:
+    """
+    Base class for boolean logic for dataset triggers.
+
+    :meta private:
+    """
+
+    agg_func: Callable
+
+    def __init__(self, *objects):
+        self.objects = objects
+
+    def evaluate(self, statuses: dict[str, bool]):
+        return self.agg_func(self.eval_one(x, statuses) for x in self.objects)
+
+    def eval_one(self, obj: Dataset | DatasetAny | DatasetAll, statuses):
+        if isinstance(obj, Dataset):
+            return statuses.get(obj.uri, False)
+        return obj.evaluate(statuses=statuses)
+
+    def all_datasets(self) -> dict[str, Dataset]:

Review Comment:
   Should this be a property, or should we rename it to something like
`get_all_datasets`?
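
For comparison, a rough sketch of the property variant. The diff above is cut off at the method signature, so the body below is only an assumption about what `all_datasets` does (collect every `Dataset` in the condition tree, keyed by URI); `Dataset` is again a hypothetical stand-in rather than the real airflow class.

```python
from __future__ import annotations


class Dataset:
    """Hypothetical stand-in for airflow's Dataset, reduced to a URI."""

    def __init__(self, uri: str):
        self.uri = uri


class DatasetBooleanCondition:
    def __init__(self, *objects):
        self.objects = objects

    @property
    def all_datasets(self) -> dict[str, Dataset]:
        """Collect every Dataset in the tree, keyed by URI (assumed behaviour)."""
        found: dict[str, Dataset] = {}
        for obj in self.objects:
            if isinstance(obj, Dataset):
                found[obj.uri] = obj
            else:
                # Nested condition: merge its datasets into ours.
                found.update(obj.all_datasets)
        return found


cond = DatasetBooleanCondition(
    Dataset("s3://a"),
    DatasetBooleanCondition(Dataset("s3://b"), Dataset("s3://a")),
)
print(sorted(cond.all_datasets))  # attribute access; prints ['s3://a', 's3://b']
```

A property reads naturally as a noun but hides the fact that the tree is walked on every access; a `get_all_datasets()` method makes that cost visible at the call site.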



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
