This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new e2cd7c609b1 Add log record when listening dag is partitioned but run has no key (#59375)
e2cd7c609b1 is described below
commit e2cd7c609b1b01c08802a444d0db27b8710902fd
Author: Henry Chen <[email protected]>
AuthorDate: Thu Dec 18 02:45:24 2025 +0800
Add log record when listening dag is partitioned but run has no key (#59375)
---
airflow-core/src/airflow/assets/manager.py | 6 +++++-
airflow-core/tests/unit/assets/test_manager.py | 2 ++
2 files changed, 7 insertions(+), 1 deletion(-)
diff --git a/airflow-core/src/airflow/assets/manager.py b/airflow-core/src/airflow/assets/manager.py
index 31a64a34761..8382e0f52b0 100644
--- a/airflow-core/src/airflow/assets/manager.py
+++ b/airflow-core/src/airflow/assets/manager.py
@@ -144,7 +144,11 @@ class AssetManager(LoggingMixin):
)
)
if not asset_model:
- cls.logger().warning("AssetModel %s not found", asset)
+ msg = f"AssetModel {asset} not found; cannot create asset event."
+ cls.logger().warning(msg)
+ # if there is a task_instance, write to task log
+ if task_instance is not None and hasattr(task_instance, "log"):
+ task_instance.log.warning(msg)
return None
if not asset_model.active:
diff --git a/airflow-core/tests/unit/assets/test_manager.py b/airflow-core/tests/unit/assets/test_manager.py
index 3d83ef34db9..d33555bb732 100644
--- a/airflow-core/tests/unit/assets/test_manager.py
+++ b/airflow-core/tests/unit/assets/test_manager.py
@@ -69,6 +69,7 @@ def create_mock_dag():
class TestAssetManager:
def test_register_asset_change_asset_doesnt_exist(self,
mock_task_instance):
+ mock_task_instance = mock.Mock()
asset = Asset(uri="asset_doesnt_exist", name="not exist")
mock_session = mock.Mock(spec=Session)
@@ -84,6 +85,7 @@ class TestAssetManager:
# AssetDagRunQueue rows
mock_session.add.assert_not_called()
mock_session.merge.assert_not_called()
+ mock_task_instance.log.warning.assert_called()
def test_register_asset_change(self, session, dag_maker,
mock_task_instance, testing_dag_bundle):
asset_manager = AssetManager()