This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 9ee12577f36 fix(asset): allow extra in Metadata to be optional (#47997)
9ee12577f36 is described below
commit 9ee12577f369866912ebca276b18a0ca763e10b7
Author: Wei Lee <[email protected]>
AuthorDate: Thu Mar 20 20:36:13 2025 +0800
fix(asset): allow extra in Metadata to be optional (#47997)
close: #47779
---
task-sdk/src/airflow/sdk/definitions/asset/metadata.py | 2 +-
tests/models/test_taskinstance.py | 17 ++++++++++++++++-
2 files changed, 17 insertions(+), 2 deletions(-)
diff --git a/task-sdk/src/airflow/sdk/definitions/asset/metadata.py b/task-sdk/src/airflow/sdk/definitions/asset/metadata.py
index 6639389c7ee..88e88645429 100644
--- a/task-sdk/src/airflow/sdk/definitions/asset/metadata.py
+++ b/task-sdk/src/airflow/sdk/definitions/asset/metadata.py
@@ -32,5 +32,5 @@ class Metadata:
"""Metadata to attach to an AssetEvent."""
asset: Asset
- extra: dict[str, Any]
+ extra: dict[str, Any] = attrs.field(factory=dict)
alias: AssetAlias | None = None
diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py
index f9a7d6fe12c..ffa0753e5f5 100644
--- a/tests/models/test_taskinstance.py
+++ b/tests/models/test_taskinstance.py
@@ -2404,6 +2404,14 @@ class TestTaskInstance:
post_execute=_write2_post_execute,
)
+ @task(outlets=Asset("test_outlet_asset_extra_3"))
+ def write3():
+ result = "write_3 result"
+ yield Metadata(Asset(name="test_outlet_asset_extra_3"))
+ return result
+
+ write3()
+
dr: DagRun = dag_maker.create_dagrun()
for ti in dr.get_task_instances(session=session):
ti.run(session=session)
@@ -2416,7 +2424,7 @@ class TestTaskInstance:
assert xcom.value == json.dumps("write_1 result")
events = dict(iter(session.execute(select(AssetEvent.source_task_id, AssetEvent))))
- assert set(events) == {"write1", "write2"}
+ assert set(events) == {"write1", "write2", "write3"}
assert events["write1"].source_dag_id == dr.dag_id
assert events["write1"].source_run_id == dr.run_id
@@ -2432,6 +2440,13 @@ class TestTaskInstance:
assert events["write2"].asset.name == "test_outlet_asset_extra_2"
assert events["write2"].extra == {"x": 1}
+ assert events["write3"].source_dag_id == dr.dag_id
+ assert events["write3"].source_run_id == dr.run_id
+ assert events["write3"].source_task_id == "write3"
+ assert events["write3"].asset.uri == "test_outlet_asset_extra_3"
+ assert events["write3"].asset.name == "test_outlet_asset_extra_3"
+ assert events["write3"].extra == {}
+
@pytest.mark.want_activate_assets(True)
def test_outlet_asset_alias(self, dag_maker, session):
from airflow.sdk.definitions.asset import Asset, AssetAlias