uranusjr commented on code in PR #58038:
URL: https://github.com/apache/airflow/pull/58038#discussion_r2541176798


##########
airflow-core/src/airflow/models/taskinstance.py:
##########
@@ -1401,31 +1401,38 @@ def _asset_event_extras_from_aliases() -> 
dict[tuple[AssetUniqueKey, str], set[s
                 if alias_name not in outlet_alias_names:
                     continue
                 asset_key = AssetUniqueKey(**event["dest_asset_key"])
-                extra_json = json.dumps(event["extra"], sort_keys=True)
-                d[asset_key, extra_json].add(alias_name)
+                # fallback for backward compatibility
+                asset_extra_json = json.dumps(event.get("dest_asset_extra", 
{}), sort_keys=True)
+                asset_event_extra_json = json.dumps(event["extra"], 
sort_keys=True)
+                d[asset_key, asset_extra_json, 
asset_event_extra_json].add(alias_name)
             return d
 
         outlet_alias_names = {o.name for o in task_outlets if o.type == 
AssetAlias.__name__ and o.name}
         if outlet_alias_names and (event_extras_from_aliases := 
_asset_event_extras_from_aliases()):
-            for (asset_key, extra_json), event_aliase_names in 
event_extras_from_aliases.items():
-                extra = json.loads(extra_json)
+            for (
+                asset_key,
+                asset_extra_json,
+                asset_event_extras_json,
+            ), event_aliase_names in event_extras_from_aliases.items():
+                asset_event_extra = json.loads(asset_event_extras_json)
+                asset = Asset(name=asset_key.name, uri=asset_key.uri, 
extra=json.loads(asset_extra_json))
                 ti.log.debug("register event for asset %s with aliases %s", 
asset_key, event_aliase_names)
                 event = asset_manager.register_asset_change(
                     task_instance=ti,
-                    asset=asset_key,
+                    asset=asset,

Review Comment:
   asset.extra would still be lost if the asset is declared elsewhere but with 
a different extra. This is unavoidable since that other extra is in the 
database and is kept in sync to the source dag file. I don’t think there’s a 
way to fix this, but we should probably note this somewhere.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to