Lee-W commented on code in PR #47381:
URL: https://github.com/apache/airflow/pull/47381#discussion_r1983548912


##########
task_sdk/src/airflow/sdk/definitions/asset/__init__.py:
##########
@@ -451,11 +451,16 @@ def iter_dag_dependencies(self, *, source: str, target: str) -> Iterator[DagDepe
 
         :meta private:
         """
+        from airflow.models.asset import retrieve_asset_ids
+        from airflow.utils.session import create_session
+
+        with create_session() as session:
+            asset_id = str(retrieve_asset_ids(assets=[self], session=session)[0])
         yield DagDependency(
             source=source or "asset",
             target=target or "asset",
             dependency_type="asset",
-            dependency_id=self.name,
+            dependency_id=asset_id,

Review Comment:
   @bbovenzi Instead of `asset.id`, I'm thinking of using `AssetUniqueKey`. It would be longer, but we wouldn't need to access the DB for this.
   
   ```python
   AssetUniqueKey.from_asset(Asset(name="abc", uri="test"))
   ```
   
   If I'm not mistaken, for the asset ID to work correctly, we'd need to collect all assets and write them to the DB before we serialize the DAG. I don't think we do things this way now, and we might need to change many things to achieve it 🤔
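
To make the trade-off concrete, here is a rough sketch of deriving `dependency_id` from the asset's own fields, with no session involved. The `Asset` and `AssetUniqueKey` classes below are simplified stand-ins written for this example, not the real Airflow classes, and `to_str` and `dependency_id_for` are hypothetical helpers; the only thing taken from the comment above is the `AssetUniqueKey.from_asset(...)` call shape.

```python
import json
from dataclasses import asdict, dataclass


@dataclass(frozen=True)
class Asset:
    """Stand-in for the SDK Asset; only the identifying fields are modeled."""

    name: str
    uri: str


@dataclass(frozen=True)
class AssetUniqueKey:
    """Stand-in key: identity is assumed to be the (name, uri) pair."""

    name: str
    uri: str

    @staticmethod
    def from_asset(asset: Asset) -> "AssetUniqueKey":
        return AssetUniqueKey(name=asset.name, uri=asset.uri)

    def to_str(self) -> str:
        # Hypothetical string form; sort_keys keeps the output deterministic.
        return json.dumps(asdict(self), sort_keys=True)


def dependency_id_for(asset: Asset) -> str:
    """Build a dependency_id from the asset itself, without touching the DB."""
    return AssetUniqueKey.from_asset(asset).to_str()


dep_id = dependency_id_for(Asset(name="abc", uri="test"))
print(dep_id)
```

The resulting ID is longer than a numeric primary key, but it can be computed at DAG serialization time, before any asset row exists in the DB.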



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
