This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch v2-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v2-10-test by this push:
     new 31c20f5b2d fix log for notifier(instance) without __name__ (#41591) (#41699)
31c20f5b2d is described below

commit 31c20f5b2da0babeaa5259cd60f4d35537ab67a6
Author: 鐘翊修 <[email protected]>
AuthorDate: Fri Aug 23 22:27:53 2024 +0800

    fix log for notifier(instance) without __name__ (#41591) (#41699)
    
    Co-authored-by: obarisk <[email protected]>
    Co-authored-by: Tzu-ping Chung <[email protected]>
---
 airflow/models/taskinstance.py    | 15 ++++++++++++---
 tests/models/test_taskinstance.py | 33 ++++++++++++++++++++++++++++++---
 2 files changed, 42 insertions(+), 6 deletions(-)

diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index baa4e3eed0..0c34d35024 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -1550,12 +1550,21 @@ def _run_finished_callback(
     """
     if callbacks:
         callbacks = callbacks if isinstance(callbacks, list) else [callbacks]
-        for callback in callbacks:
-            log.info("Executing %s callback", callback.__name__)
+
+        def get_callback_representation(callback: TaskStateChangeCallback) -> Any:
+            with contextlib.suppress(AttributeError):
+                return callback.__name__
+            with contextlib.suppress(AttributeError):
+                return callback.__class__.__name__
+            return callback
+
+        for idx, callback in enumerate(callbacks):
+            callback_repr = get_callback_representation(callback)
+            log.info("Executing callback at index %d: %s", idx, callback_repr)
             try:
                 callback(context)
             except Exception:
-                log.exception("Error when executing %s callback", callback.__name__)  # type: ignore[attr-defined]
+                log.exception("Error in callback at index %d: %s", idx, callback_repr)
 
 
 def _log_state(*, task_instance: TaskInstance | TaskInstancePydantic, lead_msg: str = "") -> None:
diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py
index 70425c4004..ba5ac7c7b5 100644
--- a/tests/models/test_taskinstance.py
+++ b/tests/models/test_taskinstance.py
@@ -76,6 +76,7 @@ from airflow.models.taskmap import TaskMap
 from airflow.models.taskreschedule import TaskReschedule
 from airflow.models.variable import Variable
 from airflow.models.xcom import LazyXComSelectSequence, XCom
+from airflow.notifications.basenotifier import BaseNotifier
 from airflow.operators.bash import BashOperator
 from airflow.operators.empty import EmptyOperator
 from airflow.operators.python import PythonOperator
@@ -3416,7 +3417,9 @@ class TestTaskInstance:
             ti.refresh_from_db()
             assert ti.state == State.SUCCESS
 
-    def test_finished_callbacks_handle_and_log_exception(self, caplog):
+    def test_finished_callbacks_callable_handle_and_log_exception(self, caplog):
+        called = completed = False
+
         def on_finish_callable(context):
             nonlocal called, completed
             called = True
@@ -3432,8 +3435,32 @@ class TestTaskInstance:
             assert not completed
             callback_name = callback_input[0] if isinstance(callback_input, list) else callback_input
             callback_name = qualname(callback_name).split(".")[-1]
-            assert "Executing on_finish_callable callback" in caplog.text
-            assert "Error when executing on_finish_callable callback" in caplog.text
+            assert "Executing callback at index 0: on_finish_callable" in caplog.text
+            assert "Error in callback at index 0: on_finish_callable" in caplog.text
+
+    def test_finished_callbacks_notifier_handle_and_log_exception(self, caplog):
+        class OnFinishNotifier(BaseNotifier):
+            """
+            error captured by BaseNotifier
+            """
+
+            def __init__(self, error: bool):
+                super().__init__()
+                self.raise_error = error
+
+            def notify(self, context):
+                self.execute()
+
+            def execute(self) -> None:
+                if self.raise_error:
+                    raise KeyError
+
+        caplog.clear()
+        callbacks = [OnFinishNotifier(error=False), OnFinishNotifier(error=True)]
+        _run_finished_callback(callbacks=callbacks, context={})
+        assert "Executing callback at index 0: OnFinishNotifier" in caplog.text
+        assert "Executing callback at index 1: OnFinishNotifier" in caplog.text
+        assert "KeyError" in caplog.text
 
     @pytest.mark.skip_if_database_isolation_mode  # Does not work in db isolation mode
     @provide_session

Reply via email to