Lee-W commented on code in PR #47381:
URL: https://github.com/apache/airflow/pull/47381#discussion_r1983548912
##########
task_sdk/src/airflow/sdk/definitions/asset/__init__.py:
##########
@@ -451,11 +451,16 @@ def iter_dag_dependencies(self, *, source: str, target:
str) -> Iterator[DagDepe
:meta private:
"""
+ from airflow.models.asset import retrieve_asset_ids
+ from airflow.utils.session import create_session
+
+ with create_session() as session:
+ asset_id = str(retrieve_asset_ids(assets=[self],
session=session)[0])
yield DagDependency(
source=source or "asset",
target=target or "asset",
dependency_type="asset",
- dependency_id=self.name,
+ dependency_id=asset_id,
Review Comment:
@bbovenzi Instead of asset.id, I'm thinking of using `AsestUniqueKey`. It
would be longer, but we won't need to access the DB for this.
```python
AssetUniqueKey.from_asset(Asset(name="abc", uri="test"))
```
If I'm not mistaken, to make the asset ID work correctly, we'll need to
collect all assets and write them into DB first before we serialize dag. I feel
we're not doing things this way now and might need to change many things to
achieve it 🤔
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]