This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch v3-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v3-1-test by this push:
     new 44d4a3f5223 [v3-1-test] Add log record when listening dag is partitioned but run has no key (#59375) (#59582)
44d4a3f5223 is described below

commit 44d4a3f522381eaf410e67208d568fe14e6a96c1
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Wed Dec 17 20:05:59 2025 +0100

    [v3-1-test] Add log record when listening dag is partitioned but run has no key (#59375) (#59582)
    
    (cherry picked from commit e2cd7c609b1b01c08802a444d0db27b8710902fd)
    
    Co-authored-by: Henry Chen <[email protected]>
---
 airflow-core/src/airflow/assets/manager.py     | 6 +++++-
 airflow-core/tests/unit/assets/test_manager.py | 2 ++
 2 files changed, 7 insertions(+), 1 deletion(-)

diff --git a/airflow-core/src/airflow/assets/manager.py b/airflow-core/src/airflow/assets/manager.py
index a00c7cae27d..c0dce3861ec 100644
--- a/airflow-core/src/airflow/assets/manager.py
+++ b/airflow-core/src/airflow/assets/manager.py
@@ -134,7 +134,11 @@ class AssetManager(LoggingMixin):
             )
         )
         if not asset_model:
-            cls.logger().warning("AssetModel %s not found", asset)
+            msg = f"AssetModel {asset} not found; cannot create asset event."
+            cls.logger().warning(msg)
+            # if there is a task_instance, write to task log
+            if task_instance is not None and hasattr(task_instance, "log"):
+                task_instance.log.warning(msg)
             return None
 
         if not asset_model.active:
diff --git a/airflow-core/tests/unit/assets/test_manager.py b/airflow-core/tests/unit/assets/test_manager.py
index 46a59198d80..94b63e72501 100644
--- a/airflow-core/tests/unit/assets/test_manager.py
+++ b/airflow-core/tests/unit/assets/test_manager.py
@@ -69,6 +69,7 @@ def create_mock_dag():
 
 class TestAssetManager:
     def test_register_asset_change_asset_doesnt_exist(self, mock_task_instance):
+        mock_task_instance = mock.Mock()
         asset = Asset(uri="asset_doesnt_exist", name="not exist")
 
         mock_session = mock.Mock(spec=Session)
@@ -84,6 +85,7 @@ class TestAssetManager:
         # AssetDagRunQueue rows
         mock_session.add.assert_not_called()
         mock_session.merge.assert_not_called()
+        mock_task_instance.log.warning.assert_called()
 
     def test_register_asset_change(self, session, dag_maker, mock_task_instance, testing_dag_bundle):
         asset_manager = AssetManager()

Reply via email to