This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch v2-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v2-10-test by this push:
new 31c20f5b2d fix log for notifier(instance) without __name__ (#41591)
(#41699)
31c20f5b2d is described below
commit 31c20f5b2da0babeaa5259cd60f4d35537ab67a6
Author: 鐘翊修 <[email protected]>
AuthorDate: Fri Aug 23 22:27:53 2024 +0800
fix log for notifier(instance) without __name__ (#41591) (#41699)
Co-authored-by: obarisk <[email protected]>
Co-authored-by: Tzu-ping Chung <[email protected]>
---
airflow/models/taskinstance.py | 15 ++++++++++++---
tests/models/test_taskinstance.py | 33 ++++++++++++++++++++++++++++++---
2 files changed, 42 insertions(+), 6 deletions(-)
diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index baa4e3eed0..0c34d35024 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -1550,12 +1550,21 @@ def _run_finished_callback(
"""
if callbacks:
callbacks = callbacks if isinstance(callbacks, list) else [callbacks]
- for callback in callbacks:
- log.info("Executing %s callback", callback.__name__)
+
+ def get_callback_representation(callback: TaskStateChangeCallback) ->
Any:
+ with contextlib.suppress(AttributeError):
+ return callback.__name__
+ with contextlib.suppress(AttributeError):
+ return callback.__class__.__name__
+ return callback
+
+ for idx, callback in enumerate(callbacks):
+ callback_repr = get_callback_representation(callback)
+ log.info("Executing callback at index %d: %s", idx, callback_repr)
try:
callback(context)
except Exception:
- log.exception("Error when executing %s callback",
callback.__name__) # type: ignore[attr-defined]
+ log.exception("Error in callback at index %d: %s", idx,
callback_repr)
def _log_state(*, task_instance: TaskInstance | TaskInstancePydantic,
lead_msg: str = "") -> None:
diff --git a/tests/models/test_taskinstance.py
b/tests/models/test_taskinstance.py
index 70425c4004..ba5ac7c7b5 100644
--- a/tests/models/test_taskinstance.py
+++ b/tests/models/test_taskinstance.py
@@ -76,6 +76,7 @@ from airflow.models.taskmap import TaskMap
from airflow.models.taskreschedule import TaskReschedule
from airflow.models.variable import Variable
from airflow.models.xcom import LazyXComSelectSequence, XCom
+from airflow.notifications.basenotifier import BaseNotifier
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import PythonOperator
@@ -3416,7 +3417,9 @@ class TestTaskInstance:
ti.refresh_from_db()
assert ti.state == State.SUCCESS
- def test_finished_callbacks_handle_and_log_exception(self, caplog):
+ def test_finished_callbacks_callable_handle_and_log_exception(self,
caplog):
+ called = completed = False
+
def on_finish_callable(context):
nonlocal called, completed
called = True
@@ -3432,8 +3435,32 @@ class TestTaskInstance:
assert not completed
callback_name = callback_input[0] if isinstance(callback_input,
list) else callback_input
callback_name = qualname(callback_name).split(".")[-1]
- assert "Executing on_finish_callable callback" in caplog.text
- assert "Error when executing on_finish_callable callback" in
caplog.text
+ assert "Executing callback at index 0: on_finish_callable" in
caplog.text
+ assert "Error in callback at index 0: on_finish_callable" in
caplog.text
+
+ def test_finished_callbacks_notifier_handle_and_log_exception(self,
caplog):
+ class OnFinishNotifier(BaseNotifier):
+ """
+ error captured by BaseNotifier
+ """
+
+ def __init__(self, error: bool):
+ super().__init__()
+ self.raise_error = error
+
+ def notify(self, context):
+ self.execute()
+
+ def execute(self) -> None:
+ if self.raise_error:
+ raise KeyError
+
+ caplog.clear()
+ callbacks = [OnFinishNotifier(error=False),
OnFinishNotifier(error=True)]
+ _run_finished_callback(callbacks=callbacks, context={})
+ assert "Executing callback at index 0: OnFinishNotifier" in caplog.text
+ assert "Executing callback at index 1: OnFinishNotifier" in caplog.text
+ assert "KeyError" in caplog.text
@pytest.mark.skip_if_database_isolation_mode # Does not work in db
isolation mode
@provide_session