Lee-W commented on code in PR #44711:
URL: https://github.com/apache/airflow/pull/44711#discussion_r1872530082


##########
task_sdk/src/airflow/sdk/definitions/asset/decorators.py:
##########
@@ -81,41 +97,53 @@ class AssetDefinition(Asset):
     _source: asset
 
     def __attrs_post_init__(self) -> None:
-        from airflow.models.dag import DAG
-
-        with DAG(
-            dag_id=self.name,
-            schedule=self._source.schedule,
-            is_paused_upon_creation=self._source.is_paused_upon_creation,
-            dag_display_name=self._source.display_name or self.name,
-            description=self._source.description,
-            params=self._source.params,
-            on_success_callback=self._source.on_success_callback,
-            on_failure_callback=self._source.on_failure_callback,
-            auto_register=True,
-        ):
-            _AssetMainOperator(
-                task_id="__main__",
-                inlets=[
-                    AssetRef(name=inlet_asset_name)
-                    for inlet_asset_name in inspect.signature(self._function).parameters
-                    if inlet_asset_name not in ("self", "context")
-                ],
-                outlets=[self],
-                python_callable=self._function,
-                definition_name=self.name,
-                uri=self.uri,
-            )
+        with self._source.create_dag(dag_id=self.name):
+            _AssetMainOperator.from_definition(self)
 
 
 @attrs.define(kw_only=True)
-class asset:
-    """Create an asset by decorating a materialization function."""
+class MultiAssetDefinition(BaseAsset):
+    """
+    Representation from decorating a function with ``@asset.multi``.
+
+    This is implemented as an "asset-like" object that can be used in all places
+    that accept asset-ish things (e.g. normal assets, aliases, AssetAll,
+    AssetAny).
+
+    :meta private:
+    """
+
+    _function: Callable
+    _source: asset.multi
+
+    def __attrs_post_init__(self) -> None:
+        with self._source.create_dag(dag_id=self._function.__name__):
+            _AssetMainOperator.from_definition(self)
+
+    def evaluate(self, statuses: dict[str, bool]) -> bool:
+        return all(o.evaluate(statuses=statuses) for o in self._source.outlets)
+
+    def iter_assets(self) -> Iterator[tuple[AssetUniqueKey, Asset]]:
+        for o in self._source.outlets:
+            yield from o.iter_assets()
+
+    def iter_asset_aliases(self) -> Iterator[tuple[str, AssetAlias]]:
+        for o in self._source.outlets:
+            yield from o.iter_asset_aliases()
+
+    def iter_dag_dependencies(self, *, source: str, target: str) -> Iterator[DagDependency]:
+        for obj in self._source.outlets:
+            yield from obj.iter_dag_dependencies(source=source, target=target)
 
-    uri: str | ObjectStoragePath | None = None
-    group: str = Asset.asset_type
-    extra: dict[str, Any] = attrs.field(factory=dict)
-    watchers: list[BaseTrigger] = attrs.field(factory=list)
+
+@attrs.define(kw_only=True)
+class _DAGArguments:

Review Comment:
   Yep, a DAG factory was one thing I thought of as well 🤔



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to