Lee-W commented on code in PR #40693:
URL: https://github.com/apache/airflow/pull/40693#discussion_r1677388698
##########
airflow/datasets/__init__.py:
##########
@@ -179,6 +183,34 @@ def __eq__(self, other: Any) -> bool:
def __hash__(self) -> int:
return hash(self.name)
+ def as_expression(self) -> Any:
+ """
+ Serialize the dataset into its scheduling expression.
+
+ :meta private:
+ """
+ return {"any": [o.as_expression() for o in self.expand_datasets()]}
Review Comment:
updated
##########
airflow/datasets/__init__.py:
##########
@@ -179,6 +183,34 @@ def __eq__(self, other: Any) -> bool:
def __hash__(self) -> int:
return hash(self.name)
+ def as_expression(self) -> Any:
+ """
+ Serialize the dataset into its scheduling expression.
+
+ :meta private:
+ """
+ return {"any": [o.as_expression() for o in self.expand_datasets()]}
+
+ def iter_datasets(self) -> Iterator[tuple[str, Dataset]]:
+ for dataset in self.expand_datasets():
+ yield dataset.uri, dataset
+
+ def evaluate(self, statuses: dict[str, bool]) -> bool:
+ return any(x.evaluate(statuses=statuses) for x in
self.expand_datasets())
+
+ @provide_session
+ def expand_datasets(self, *, session: Session = NEW_SESSION) ->
list[Dataset]:
+ """Expand the dataset alias to resolved datasets."""
+ from airflow.models.dataset import DatasetAliasModel
+
+ dataset_alias_obj = session.scalars(
+ select(DatasetAliasModel).where(DatasetAliasModel.name ==
self.name).limit(1)
+ ).one()
Review Comment:
updated
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]