Lee-W commented on code in PR #58038:
URL: https://github.com/apache/airflow/pull/58038#discussion_r2563991948


##########
airflow-core/src/airflow/models/taskinstance.py:
##########
@@ -1401,31 +1401,38 @@ def _asset_event_extras_from_aliases() -> 
dict[tuple[AssetUniqueKey, str], set[s
                 if alias_name not in outlet_alias_names:
                     continue
                 asset_key = AssetUniqueKey(**event["dest_asset_key"])
-                extra_json = json.dumps(event["extra"], sort_keys=True)
-                d[asset_key, extra_json].add(alias_name)
+                # fallback for backward compatibility
+                asset_extra_json = json.dumps(event.get("dest_asset_extra", 
{}), sort_keys=True)
+                asset_event_extra_json = json.dumps(event["extra"], 
sort_keys=True)
+                d[asset_key, asset_extra_json, 
asset_event_extra_json].add(alias_name)
             return d
 
         outlet_alias_names = {o.name for o in task_outlets if o.type == 
AssetAlias.__name__ and o.name}
         if outlet_alias_names and (event_extras_from_aliases := 
_asset_event_extras_from_aliases()):
-            for (asset_key, extra_json), event_aliase_names in 
event_extras_from_aliases.items():
-                extra = json.loads(extra_json)
+            for (
+                asset_key,
+                asset_extra_json,
+                asset_event_extras_json,
+            ), event_aliase_names in event_extras_from_aliases.items():
+                asset_event_extra = json.loads(asset_event_extras_json)
+                asset = Asset(name=asset_key.name, uri=asset_key.uri, 
extra=json.loads(asset_extra_json))
                 ti.log.debug("register event for asset %s with aliases %s", 
asset_key, event_aliase_names)
                 event = asset_manager.register_asset_change(
                     task_instance=ti,
-                    asset=asset_key,
+                    asset=asset,

Review Comment:
   https://github.com/apache/airflow/issues/58711



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to