This is an automated email from the ASF dual-hosted git repository.
weilee pushed a commit to branch v3-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v3-1-test by this push:
new b2f7efecfc6 [v3-1-test] fix(asset-alias): Preserve `Asset.extra` when using `AssetAlias` (#58038) (#58712)
b2f7efecfc6 is described below
commit b2f7efecfc64d7da8179b82df6a93875bf389df8
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Wed Nov 26 17:59:44 2025 +0800
[v3-1-test] fix(asset-alias): Preserve `Asset.extra` when using `AssetAlias` (#58038) (#58712)
Co-authored-by: Wei Lee <[email protected]>
---
airflow-core/src/airflow/models/taskinstance.py | 27 ++++++++-----
.../airflow/serialization/serialized_objects.py | 2 +
.../unit/serialization/test_serialized_objects.py | 3 +-
.../src/airflow/sdk/definitions/asset/__init__.py | 1 +
task-sdk/src/airflow/sdk/execution_time/context.py | 16 ++++----
.../tests/task_sdk/execution_time/test_context.py | 44 +++++++++++++---------
6 files changed, 58 insertions(+), 35 deletions(-)
diff --git a/airflow-core/src/airflow/models/taskinstance.py
b/airflow-core/src/airflow/models/taskinstance.py
index 7b291841103..216bc9b1bca 100644
--- a/airflow-core/src/airflow/models/taskinstance.py
+++ b/airflow-core/src/airflow/models/taskinstance.py
@@ -1381,7 +1381,7 @@ class TaskInstance(Base, LoggingMixin):
session=session,
)
- def _asset_event_extras_from_aliases() -> dict[tuple[AssetUniqueKey,
str], set[str]]:
+ def _asset_event_extras_from_aliases() -> dict[tuple[AssetUniqueKey,
str, str], set[str]]:
d = defaultdict(set)
for event in outlet_events:
try:
@@ -1391,31 +1391,38 @@ class TaskInstance(Base, LoggingMixin):
if alias_name not in outlet_alias_names:
continue
asset_key = AssetUniqueKey(**event["dest_asset_key"])
- extra_json = json.dumps(event["extra"], sort_keys=True)
- d[asset_key, extra_json].add(alias_name)
+ # fallback for backward compatibility
+ asset_extra_json = json.dumps(event.get("dest_asset_extra",
{}), sort_keys=True)
+ asset_event_extra_json = json.dumps(event["extra"],
sort_keys=True)
+ d[asset_key, asset_extra_json,
asset_event_extra_json].add(alias_name)
return d
outlet_alias_names = {o.name for o in task_outlets if o.type ==
AssetAlias.__name__ and o.name}
if outlet_alias_names and (event_extras_from_aliases :=
_asset_event_extras_from_aliases()):
- for (asset_key, extra_json), event_aliase_names in
event_extras_from_aliases.items():
- extra = json.loads(extra_json)
+ for (
+ asset_key,
+ asset_extra_json,
+ asset_event_extras_json,
+ ), event_aliase_names in event_extras_from_aliases.items():
+ asset_event_extra = json.loads(asset_event_extras_json)
+ asset = Asset(name=asset_key.name, uri=asset_key.uri,
extra=json.loads(asset_extra_json))
ti.log.debug("register event for asset %s with aliases %s",
asset_key, event_aliase_names)
event = asset_manager.register_asset_change(
task_instance=ti,
- asset=asset_key,
+ asset=asset,
source_alias_names=event_aliase_names,
- extra=extra,
+ extra=asset_event_extra,
session=session,
)
if event is None:
ti.log.info("Dynamically creating AssetModel %s",
asset_key)
- session.add(AssetModel(name=asset_key.name,
uri=asset_key.uri))
+ session.add(AssetModel.from_public(asset))
session.flush() # So event can set up its asset fk.
asset_manager.register_asset_change(
task_instance=ti,
- asset=asset_key,
+ asset=asset,
source_alias_names=event_aliase_names,
- extra=extra,
+ extra=asset_event_extra,
session=session,
)
diff --git a/airflow-core/src/airflow/serialization/serialized_objects.py
b/airflow-core/src/airflow/serialization/serialized_objects.py
index 6a13ae1cdf7..fcc923ba592 100644
--- a/airflow-core/src/airflow/serialization/serialized_objects.py
+++ b/airflow-core/src/airflow/serialization/serialized_objects.py
@@ -398,6 +398,8 @@ def decode_outlet_event_accessor(var: dict[str, Any]) ->
OutletEventAccessor:
dest_asset_key=AssetUniqueKey(
name=e["dest_asset_key"]["name"],
uri=e["dest_asset_key"]["uri"]
),
+ # fallback for backward compatibility
+ dest_asset_extra=e.get("dest_asset_extra", {}),
extra=e["extra"],
)
for e in asset_alias_events
diff --git a/airflow-core/tests/unit/serialization/test_serialized_objects.py
b/airflow-core/tests/unit/serialization/test_serialized_objects.py
index 14ca6a5a2cc..bf9c4f34e44 100644
--- a/airflow-core/tests/unit/serialization/test_serialized_objects.py
+++ b/airflow-core/tests/unit/serialization/test_serialized_objects.py
@@ -409,7 +409,8 @@ class MockLazySelectSequence(LazySelectSequence):
AssetAliasEvent(
source_alias_name="test_alias",
dest_asset_key=AssetUniqueKey(name="test_name",
uri="test://asset-uri"),
- extra={},
+ dest_asset_extra={"extra": "from asset itself"},
+ extra={"extra": "from event"},
)
],
),
diff --git a/task-sdk/src/airflow/sdk/definitions/asset/__init__.py
b/task-sdk/src/airflow/sdk/definitions/asset/__init__.py
index 970e0385fd8..4e7b624051c 100644
--- a/task-sdk/src/airflow/sdk/definitions/asset/__init__.py
+++ b/task-sdk/src/airflow/sdk/definitions/asset/__init__.py
@@ -689,4 +689,5 @@ class AssetAliasEvent(attrs.AttrsInstance):
source_alias_name: str
dest_asset_key: AssetUniqueKey
+ dest_asset_extra: dict[str, JsonValue]
extra: dict[str, JsonValue]
diff --git a/task-sdk/src/airflow/sdk/execution_time/context.py
b/task-sdk/src/airflow/sdk/execution_time/context.py
index 02b9e90d04a..379a13a930e 100644
--- a/task-sdk/src/airflow/sdk/execution_time/context.py
+++ b/task-sdk/src/airflow/sdk/execution_time/context.py
@@ -425,9 +425,9 @@ class MacrosAccessor:
class _AssetRefResolutionMixin:
- _asset_ref_cache: dict[AssetRef, AssetUniqueKey] = {}
+ _asset_ref_cache: dict[AssetRef, tuple[AssetUniqueKey, dict[str,
JsonValue]]] = {}
- def _resolve_asset_ref(self, ref: AssetRef) -> AssetUniqueKey:
+ def _resolve_asset_ref(self, ref: AssetRef) -> tuple[AssetUniqueKey,
dict[str, JsonValue]]:
with contextlib.suppress(KeyError):
return self._asset_ref_cache[ref]
@@ -442,8 +442,8 @@ class _AssetRefResolutionMixin:
raise TypeError(f"Unimplemented asset ref: {type(ref)}")
unique_key = AssetUniqueKey.from_asset(asset)
for ref in refs_to_cache:
- self._asset_ref_cache[ref] = unique_key
- return unique_key
+ self._asset_ref_cache[ref] = (unique_key, asset.extra)
+ return (unique_key, asset.extra)
# TODO: This is temporary to avoid code duplication between here &
airflow/models/taskinstance.py
@staticmethod
@@ -488,14 +488,16 @@ class OutletEventAccessor(_AssetRefResolutionMixin):
return
if isinstance(asset, AssetRef):
- asset_key = self._resolve_asset_ref(asset)
+ asset_key, asset_extra = self._resolve_asset_ref(asset)
else:
asset_key = AssetUniqueKey.from_asset(asset)
+ asset_extra = asset.extra
asset_alias_name = self.key.name
event = AssetAliasEvent(
source_alias_name=asset_alias_name,
dest_asset_key=asset_key,
+ dest_asset_extra=asset_extra,
extra=extra or {},
)
self.asset_alias_events.append(event)
@@ -556,7 +558,7 @@ class OutletEventAccessors(
elif isinstance(key, AssetAlias):
hashable_key = AssetAliasUniqueKey.from_asset_alias(key)
elif isinstance(key, AssetRef):
- hashable_key = self._resolve_asset_ref(key)
+ hashable_key, _ = self._resolve_asset_ref(key)
else:
raise TypeError(f"Key should be either an asset or an asset alias,
not {type(key)}")
@@ -684,7 +686,7 @@ class TriggeringAssetEventsAccessor(
if isinstance(key, Asset):
hashable_key = AssetUniqueKey.from_asset(key)
elif isinstance(key, AssetRef):
- hashable_key = self._resolve_asset_ref(key)
+ hashable_key, _ = self._resolve_asset_ref(key)
elif isinstance(key, AssetAlias):
hashable_key = AssetAliasUniqueKey.from_asset_alias(key)
else:
diff --git a/task-sdk/tests/task_sdk/execution_time/test_context.py
b/task-sdk/tests/task_sdk/execution_time/test_context.py
index df048d1bf48..566c4ee8151 100644
--- a/task-sdk/tests/task_sdk/execution_time/test_context.py
+++ b/task-sdk/tests/task_sdk/execution_time/test_context.py
@@ -346,12 +346,13 @@ class TestCurrentContext:
class TestOutletEventAccessor:
@pytest.mark.parametrize(
- "add_arg",
+ "add_args",
[
- Asset("name", "uri"),
- Asset.ref(name="name"),
- Asset.ref(uri="uri"),
+ (Asset("name", "uri", extra={"extra": "from asset itself"}),
{"extra": "from event"}),
+ (Asset.ref(name="name"), {"extra": "from event"}),
+ (Asset.ref(uri="uri"), {"extra": "from event"}),
],
+ ids=["asset", "asset name ref", "asset uri ref"],
)
@pytest.mark.parametrize(
"key, asset_alias_events",
@@ -363,26 +364,31 @@ class TestOutletEventAccessor:
AssetAliasEvent(
source_alias_name="test_alias",
dest_asset_key=AssetUniqueKey(name="name", uri="uri"),
- extra={},
+ dest_asset_extra={"extra": "from asset itself"},
+ extra={"extra": "from event"},
)
],
),
),
+ ids=["inactive asset", "active asset"],
)
- def test_add(self, add_arg, key, asset_alias_events,
mock_supervisor_comms):
- mock_supervisor_comms.send.return_value = AssetResponse(name="name",
uri="uri", group="")
+ def test_add(self, add_args, key, asset_alias_events,
mock_supervisor_comms):
+ mock_supervisor_comms.send.return_value = AssetResponse(
+ name="name", uri="uri", group="", extra={"extra": "from asset
itself"}
+ )
outlet_event_accessor = OutletEventAccessor(key=key, extra={})
- outlet_event_accessor.add(add_arg)
+ outlet_event_accessor.add(*add_args)
assert outlet_event_accessor.asset_alias_events == asset_alias_events
@pytest.mark.parametrize(
- "add_arg",
+ "add_args",
[
- Asset("name", "uri"),
- Asset.ref(name="name"),
- Asset.ref(uri="uri"),
+ (Asset(name="name", uri="uri", extra={"extra": "from asset
itself"}), {"extra": "from event"}),
+ (Asset.ref(name="name"), {"extra": "from event"}),
+ (Asset.ref(uri="uri"), {"extra": "from event"}),
],
+ ids=["asset", "asset name ref", "asset uri ref"],
)
@pytest.mark.parametrize(
"key, asset_alias_events",
@@ -394,17 +400,21 @@ class TestOutletEventAccessor:
AssetAliasEvent(
source_alias_name="test_alias",
dest_asset_key=AssetUniqueKey(name="name", uri="uri"),
- extra={},
+ dest_asset_extra={"extra": "from asset itself"},
+ extra={"extra": "from event"},
)
],
),
),
+ ids=["inactive asset", "active asset"],
)
- def test_add_with_db(self, add_arg, key, asset_alias_events,
mock_supervisor_comms):
- mock_supervisor_comms.send.return_value = AssetResponse(name="name",
uri="uri", group="")
+ def test_add_with_db(self, add_args, key, asset_alias_events,
mock_supervisor_comms):
+ mock_supervisor_comms.send.return_value = AssetResponse(
+ name="name", uri="uri", group="", extra={"extra": "from asset
itself"}
+ )
- outlet_event_accessor = OutletEventAccessor(key=key, extra={"not": ""})
- outlet_event_accessor.add(add_arg, extra={})
+ outlet_event_accessor = OutletEventAccessor(key=key)
+ outlet_event_accessor.add(*add_args)
assert outlet_event_accessor.asset_alias_events == asset_alias_events