This is an automated email from the ASF dual-hosted git repository.
uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 9d7c3e7d9c7 Add asset event emission listener event (#61718)
9d7c3e7d9c7 is described below
commit 9d7c3e7d9c7365c10f3267b660ff6e01f9b1a240
Author: Tzu-ping Chung <[email protected]>
AuthorDate: Wed Feb 11 12:46:16 2026 +0800
Add asset event emission listener event (#61718)
---
airflow-core/src/airflow/assets/manager.py | 32 +++++++++++---
airflow-core/src/airflow/listeners/spec/asset.py | 11 +++++
.../airflow/listeners/{spec/asset.py => types.py} | 29 ++++++-------
.../tests/unit/listeners/asset_listener.py | 11 ++++-
.../tests/unit/listeners/test_asset_listener.py | 50 ++++++++++++++++------
5 files changed, 96 insertions(+), 37 deletions(-)
diff --git a/airflow-core/src/airflow/assets/manager.py b/airflow-core/src/airflow/assets/manager.py
index cfb02b096b0..b7b0ce5c239 100644
--- a/airflow-core/src/airflow/assets/manager.py
+++ b/airflow-core/src/airflow/assets/manager.py
@@ -28,6 +28,7 @@ from sqlalchemy.orm import joinedload
from airflow._shared.observability.metrics.stats import Stats
from airflow.configuration import conf
from airflow.listeners.listener import get_listener_manager
+from airflow.listeners.types import AssetEvent as ListenerAssetEvent
from airflow.models.asset import (
AssetAliasModel,
AssetDagRunQueue,
@@ -234,7 +235,7 @@ class AssetManager(LoggingMixin):
dags_to_queue_from_asset_alias = set()
if source_alias_names:
- asset_alias_models = session.scalars(
+ asset_alias_models: Iterable[AssetAliasModel] = session.scalars(
select(AssetAliasModel)
.where(AssetAliasModel.name.in_(source_alias_names))
.options(
@@ -251,6 +252,8 @@ class AssetManager(LoggingMixin):
for alias_ref in asset_alias_model.scheduled_dags
if not alias_ref.dag.is_paused
}
+ else:
+ asset_alias_models = []
dags_to_queue_from_asset_ref = set(
session.scalars(
@@ -267,7 +270,20 @@ class AssetManager(LoggingMixin):
)
)
- cls.notify_asset_changed(asset=asset_model.to_serialized())
+ asset = asset_model.to_serialized()
+ cls.notify_asset_changed(asset=asset)
+        cls.notify_asset_event_emitted(
+ asset_event=ListenerAssetEvent(
+ asset=asset,
+ extra=asset_event.extra,
+ source_dag_id=asset_event.source_dag_id,
+ source_task_id=asset_event.source_task_id,
+ source_run_id=asset_event.source_run_id,
+ source_map_index=asset_event.source_map_index,
+                source_aliases=[aam.to_serialized() for aam in asset_alias_models],
+ partition_key=partition_key,
+ )
+ )
Stats.incr("asset.updates")
@@ -304,14 +320,18 @@ class AssetManager(LoggingMixin):
def notify_asset_changed(asset: SerializedAsset) -> None:
"""Run applicable notification actions when an asset is changed."""
try:
-            # TODO: AIP-76 this will have to change. needs to know *what* happened to the asset (e.g. partition key)
- # maybe we should just add the event to the signature
- # or add a new hook `on_asset_event`
- # https://github.com/apache/airflow/issues/58290
get_listener_manager().hook.on_asset_changed(asset=asset)
except Exception:
log.exception("error calling listener")
+ @staticmethod
+    def notify_asset_event_emitted(asset_event: ListenerAssetEvent) -> None:
+        """Run applicable notification actions when an asset event is emitted."""
+        try:
+            get_listener_manager().hook.on_asset_event_emitted(asset_event=asset_event)
+ except Exception:
+ log.exception("error calling listener")
+
@classmethod
def _queue_dagruns(
cls,
diff --git a/airflow-core/src/airflow/listeners/spec/asset.py b/airflow-core/src/airflow/listeners/spec/asset.py
index 05ba0809bcd..ad53e08629f 100644
--- a/airflow-core/src/airflow/listeners/spec/asset.py
+++ b/airflow-core/src/airflow/listeners/spec/asset.py
@@ -23,6 +23,7 @@ from typing import TYPE_CHECKING
from pluggy import HookspecMarker
if TYPE_CHECKING:
+ from airflow.listeners.types import AssetEvent
from airflow.serialization.definitions.assets import SerializedAsset, SerializedAssetAlias
hookspec = HookspecMarker("airflow")
@@ -41,3 +42,13 @@ def on_asset_alias_created(asset_alias: SerializedAssetAlias):
@hookspec
def on_asset_changed(asset: SerializedAsset):
"""Execute when asset change is registered."""
+
+
+@hookspec
+def on_asset_event_emitted(asset_event: AssetEvent):
+ """
+ Execute when an asset event is emitted.
+
+ This is generally called together with ``on_asset_changed``, but with
+ information on the emitted event instead.
+ """
diff --git a/airflow-core/src/airflow/listeners/spec/asset.py b/airflow-core/src/airflow/listeners/types.py
similarity index 70%
copy from airflow-core/src/airflow/listeners/spec/asset.py
copy to airflow-core/src/airflow/listeners/types.py
index 05ba0809bcd..120b8ef503a 100644
--- a/airflow-core/src/airflow/listeners/spec/asset.py
+++ b/airflow-core/src/airflow/listeners/types.py
@@ -20,24 +20,23 @@ from __future__ import annotations
from typing import TYPE_CHECKING
-from pluggy import HookspecMarker
+import attrs
if TYPE_CHECKING:
-    from airflow.serialization.definitions.assets import SerializedAsset, SerializedAssetAlias
-
-hookspec = HookspecMarker("airflow")
-
-
-@hookspec
-def on_asset_created(asset: SerializedAsset):
- """Execute when a new asset is created."""
+ from pydantic import JsonValue
+    from airflow.serialization.definitions.assets import SerializedAsset, SerializedAssetAlias
-@hookspec
-def on_asset_alias_created(asset_alias: SerializedAssetAlias):
- """Execute when a new asset alias is created."""
[email protected]
+class AssetEvent:
+ """Asset event representation for asset listener hooks."""
-@hookspec
-def on_asset_changed(asset: SerializedAsset):
- """Execute when asset change is registered."""
+ asset: SerializedAsset
+ extra: dict[str, JsonValue]
+ source_dag_id: str | None
+ source_task_id: str | None
+ source_run_id: str | None
+ source_map_index: int | None
+ source_aliases: list[SerializedAssetAlias]
+ partition_key: str | None
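As a sketch of how this value object gets populated, a consumer could build one the same way AssetManager does above and the test below does for its assertion (the field values here are placeholders, not from the commit):

    from airflow.listeners.types import AssetEvent

    event = AssetEvent(
        asset=serialized_asset,  # a SerializedAsset, e.g. asset_model.to_serialized()
        extra={"note": "example"},  # placeholder extra payload
        source_dag_id="producing_dag",
        source_task_id="test_task",
        source_run_id="manual__2026-02-11",
        source_map_index=-1,
        source_aliases=[],
        partition_key=None,
    )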
diff --git a/airflow-core/tests/unit/listeners/asset_listener.py b/airflow-core/tests/unit/listeners/asset_listener.py
index e03b34a773a..bdd5551018c 100644
--- a/airflow-core/tests/unit/listeners/asset_listener.py
+++ b/airflow-core/tests/unit/listeners/asset_listener.py
@@ -23,6 +23,7 @@ from airflow.listeners import hookimpl
changed = []
created = []
+emitted = []
@hookimpl
@@ -30,11 +31,17 @@ def on_asset_changed(asset):
changed.append(copy.deepcopy(asset))
+@hookimpl
+def on_asset_event_emitted(asset_event):
+ emitted.append(copy.deepcopy(asset_event))
+
+
@hookimpl
def on_asset_created(asset):
created.append(copy.deepcopy(asset))
def clear():
- global changed, created
- changed, created = [], []
+ changed.clear()
+ created.clear()
+ emitted.clear()
diff --git a/airflow-core/tests/unit/listeners/test_asset_listener.py b/airflow-core/tests/unit/listeners/test_asset_listener.py
index b2ce78c2443..0de2588d331 100644
--- a/airflow-core/tests/unit/listeners/test_asset_listener.py
+++ b/airflow-core/tests/unit/listeners/test_asset_listener.py
@@ -14,15 +14,18 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
+
from __future__ import annotations
import pytest
+from airflow.listeners.types import AssetEvent
from airflow.models.asset import AssetModel
from airflow.providers.standard.operators.empty import EmptyOperator
-from airflow.sdk.definitions.asset import Asset
-from airflow.utils.session import provide_session
+from airflow.sdk import Asset
+from airflow.serialization.encoders import ensure_serialized_asset
+from tests_common.test_utils.db import clear_db_assets
from unit.listeners import asset_listener
@@ -33,31 +36,50 @@ def clean_listener_state():
asset_listener.clear()
[email protected]_test
-@provide_session
-def test_asset_listener_on_asset_changed_gets_calls(
- create_task_instance_of_operator, session, listener_manager
-):
- listener_manager(asset_listener)
[email protected]
+def asset(session):
asset_uri = "test://asset/"
asset_name = "test_asset_uri"
asset_group = "test-group"
asset = Asset(uri=asset_uri, name=asset_name, group=asset_group)
asset_model = AssetModel(uri=asset_uri, name=asset_name, group=asset_group)
session.add(asset_model)
-
session.flush()
+ yield asset
+ clear_db_assets()
- ti = create_task_instance_of_operator(
+
[email protected]
+def ti(create_task_instance_of_operator, asset, session):
+ return create_task_instance_of_operator(
operator_class=EmptyOperator,
dag_id="producing_dag",
task_id="test_task",
session=session,
outlets=[asset],
)
+
+
[email protected]_test
+def test_asset_listener_on_asset_changed(asset, ti, listener_manager):
+ listener_manager(asset_listener)
ti.run()
+ assert asset_listener.changed == [ensure_serialized_asset(asset)]
+
- assert len(asset_listener.changed) == 1
- assert asset_listener.changed[0].uri == asset_uri
- assert asset_listener.changed[0].name == asset_name
- assert asset_listener.changed[0].group == asset_group
[email protected]_test
+def test_asset_listener_on_asset_event_emitted(asset, ti, listener_manager):
+ listener_manager(asset_listener)
+ ti.run()
+ assert asset_listener.emitted == [
+ AssetEvent(
+ asset=ensure_serialized_asset(asset),
+ extra={},
+ source_dag_id=ti.dag_id,
+ source_task_id=ti.task_id,
+ source_run_id=ti.run_id,
+ source_map_index=ti.map_index,
+ source_aliases=[],
+ partition_key=None,
+ )
+ ]
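Outside the test suite, a listener module like the sketch earlier is normally registered through a plugin; a minimal sketch (the plugin class and the listener module name are hypothetical, shown only to illustrate the standard listener registration path):

    from airflow.plugins_manager import AirflowPlugin

    import my_asset_listener  # hypothetical module implementing on_asset_event_emitted


    class MyAssetListenerPlugin(AirflowPlugin):
        name = "my_asset_listener_plugin"
        listeners = [my_asset_listener]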