Lee-W commented on code in PR #44711:
URL: https://github.com/apache/airflow/pull/44711#discussion_r1872509260
##########
task_sdk/src/airflow/sdk/definitions/asset/decorators.py:
##########
@@ -81,41 +97,53 @@ class AssetDefinition(Asset):
_source: asset
def __attrs_post_init__(self) -> None:
- from airflow.models.dag import DAG
-
- with DAG(
- dag_id=self.name,
- schedule=self._source.schedule,
- is_paused_upon_creation=self._source.is_paused_upon_creation,
- dag_display_name=self._source.display_name or self.name,
- description=self._source.description,
- params=self._source.params,
- on_success_callback=self._source.on_success_callback,
- on_failure_callback=self._source.on_failure_callback,
- auto_register=True,
- ):
- _AssetMainOperator(
- task_id="__main__",
- inlets=[
- AssetRef(name=inlet_asset_name)
- for inlet_asset_name in
inspect.signature(self._function).parameters
- if inlet_asset_name not in ("self", "context")
- ],
- outlets=[self],
- python_callable=self._function,
- definition_name=self.name,
- uri=self.uri,
- )
+ with self._source.create_dag(dag_id=self.name):
+ _AssetMainOperator.from_definition(self)
@attrs.define(kw_only=True)
-class asset:
- """Create an asset by decorating a materialization function."""
+class MultiAssetDefinition(BaseAsset):
+ """
+ Representation from decorating a function with ``@asset.multi``.
+
+ This is implemented as an "asset-like" object that can be used in all
places
+ that accept asset-ish things (e.g. normal assets, aliases, AssetAll,
+ AssetAny).
+
+ :meta private:
+ """
+
+ _function: Callable
+ _source: asset.multi
+
+ def __attrs_post_init__(self) -> None:
+ with self._source.create_dag(dag_id=self._function.__name__):
+ _AssetMainOperator.from_definition(self)
+
+ def evaluate(self, statuses: dict[str, bool]) -> bool:
+ return all(o.evaluate(statuses=statuses) for o in self._source.outlets)
+
+ def iter_assets(self) -> Iterator[tuple[AssetUniqueKey, Asset]]:
+ for o in self._source.outlets:
+ yield from o.iter_assets()
+
+ def iter_asset_aliases(self) -> Iterator[tuple[str, AssetAlias]]:
+ for o in self._source.outlets:
+ yield from o.iter_asset_aliases()
+
+ def iter_dag_dependencies(self, *, source: str, target: str) ->
Iterator[DagDependency]:
+ for obj in self._source.outlets:
+ yield from obj.iter_dag_dependencies(source=source, target=target)
- uri: str | ObjectStoragePath | None = None
- group: str = Asset.asset_type
- extra: dict[str, Any] = attrs.field(factory=dict)
- watchers: list[BaseTrigger] = attrs.field(factory=list)
+
+@attrs.define(kw_only=True)
+class _DAGArguments:
Review Comment:
Not sure whether `_DAGLike` makes more sense. This sounds like arguments, but
it seems to be a DAG-to-be?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]