This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 9ee12577f36 fix(asset): allow extra in Metadata to be optional (#47997)
9ee12577f36 is described below

commit 9ee12577f369866912ebca276b18a0ca763e10b7
Author: Wei Lee <[email protected]>
AuthorDate: Thu Mar 20 20:36:13 2025 +0800

    fix(asset): allow extra in Metadata to be optional (#47997)
    
    close: #47779
---
 task-sdk/src/airflow/sdk/definitions/asset/metadata.py |  2 +-
 tests/models/test_taskinstance.py                      | 17 ++++++++++++++++-
 2 files changed, 17 insertions(+), 2 deletions(-)

diff --git a/task-sdk/src/airflow/sdk/definitions/asset/metadata.py 
b/task-sdk/src/airflow/sdk/definitions/asset/metadata.py
index 6639389c7ee..88e88645429 100644
--- a/task-sdk/src/airflow/sdk/definitions/asset/metadata.py
+++ b/task-sdk/src/airflow/sdk/definitions/asset/metadata.py
@@ -32,5 +32,5 @@ class Metadata:
     """Metadata to attach to an AssetEvent."""
 
     asset: Asset
-    extra: dict[str, Any]
+    extra: dict[str, Any] = attrs.field(factory=dict)
     alias: AssetAlias | None = None
diff --git a/tests/models/test_taskinstance.py 
b/tests/models/test_taskinstance.py
index f9a7d6fe12c..ffa0753e5f5 100644
--- a/tests/models/test_taskinstance.py
+++ b/tests/models/test_taskinstance.py
@@ -2404,6 +2404,14 @@ class TestTaskInstance:
                 post_execute=_write2_post_execute,
             )
 
+            @task(outlets=Asset("test_outlet_asset_extra_3"))
+            def write3():
+                result = "write_3 result"
+                yield Metadata(Asset(name="test_outlet_asset_extra_3"))
+                return result
+
+            write3()
+
         dr: DagRun = dag_maker.create_dagrun()
         for ti in dr.get_task_instances(session=session):
             ti.run(session=session)
@@ -2416,7 +2424,7 @@ class TestTaskInstance:
         assert xcom.value == json.dumps("write_1 result")
 
         events = dict(iter(session.execute(select(AssetEvent.source_task_id, 
AssetEvent))))
-        assert set(events) == {"write1", "write2"}
+        assert set(events) == {"write1", "write2", "write3"}
 
         assert events["write1"].source_dag_id == dr.dag_id
         assert events["write1"].source_run_id == dr.run_id
@@ -2432,6 +2440,13 @@ class TestTaskInstance:
         assert events["write2"].asset.name == "test_outlet_asset_extra_2"
         assert events["write2"].extra == {"x": 1}
 
+        assert events["write3"].source_dag_id == dr.dag_id
+        assert events["write3"].source_run_id == dr.run_id
+        assert events["write3"].source_task_id == "write3"
+        assert events["write3"].asset.uri == "test_outlet_asset_extra_3"
+        assert events["write3"].asset.name == "test_outlet_asset_extra_3"
+        assert events["write3"].extra == {}
+
     @pytest.mark.want_activate_assets(True)
     def test_outlet_asset_alias(self, dag_maker, session):
         from airflow.sdk.definitions.asset import Asset, AssetAlias

Reply via email to