This is an automated email from the ASF dual-hosted git repository.
uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 0a94867e4a Add logging around listener (#42301)
0a94867e4a is described below
commit 0a94867e4aab220930fa87978dc8564d84576262
Author: Tzu-ping Chung <[email protected]>
AuthorDate: Wed Sep 18 00:23:53 2024 -0700
Add logging around listener (#42301)
---
airflow/listeners/listener.py | 10 ++++++++++
tests/listeners/test_listeners.py | 20 ++++++++++++++++++++
2 files changed, 30 insertions(+)
diff --git a/airflow/listeners/listener.py b/airflow/listeners/listener.py
index cdabfebb75..57d0360487 100644
--- a/airflow/listeners/listener.py
+++ b/airflow/listeners/listener.py
@@ -33,6 +33,15 @@ log = logging.getLogger(__name__)
_listener_manager: ListenerManager | None = None
+def _before_hookcall(hook_name, hook_impls, kwargs):
+    """Emit DEBUG records describing a hook call that is about to run.
+
+    Two records are written, in order: the hook name together with the
+    keyword arguments it is called with, then the hook implementations
+    that were passed in.
+    """
+    debug_records = (
+        ("Calling %r with %r", (hook_name, kwargs)),
+        ("Hook impls: %s", (hook_impls,)),
+    )
+    for template, values in debug_records:
+        log.debug(template, *values)
+
+
+def _after_hookcall(outcome, hook_name, hook_impls, kwargs):
+    """Log the outcome of a finished hook call at DEBUG level.
+
+    ``outcome.get_result()`` re-raises whatever the hook implementations
+    raised, so a failed call is reported from ``outcome.excinfo`` instead.
+    That way the failure is logged, and this logging callback is not the
+    place where the exception surfaces.
+
+    NOTE(review): pluggy's hook-call monitoring is expected to re-raise the
+    stored exception itself once this callback returns -- confirm against
+    the pinned pluggy version.
+    """
+    excinfo = outcome.excinfo
+    if excinfo is not None:
+        # excinfo is a (type, value, traceback) tuple; log the exception value.
+        log.debug("Exception from %r: %r", hook_name, excinfo[1])
+        return
+    log.debug("Result from %r: %s", hook_name, outcome.get_result())
+
+
class ListenerManager:
"""Manage listener registration and provides hook property for calling
them."""
@@ -40,6 +49,7 @@ class ListenerManager:
from airflow.listeners.spec import dagrun, dataset, importerrors,
lifecycle, taskinstance
self.pm = pluggy.PluginManager("airflow")
+ self.pm.add_hookcall_monitoring(_before_hookcall, _after_hookcall)
self.pm.add_hookspecs(lifecycle)
self.pm.add_hookspecs(dagrun)
self.pm.add_hookspecs(dataset)
diff --git a/tests/listeners/test_listeners.py b/tests/listeners/test_listeners.py
index 6704521c33..3c34ab0ff8 100644
--- a/tests/listeners/test_listeners.py
+++ b/tests/listeners/test_listeners.py
@@ -17,6 +17,7 @@
from __future__ import annotations
import contextlib
+import logging
import os
import pytest
@@ -175,3 +176,22 @@ def test_class_based_listener(create_task_instance,
session=None):
assert len(listener.state) == 2
assert listener.state == [TaskInstanceState.RUNNING,
TaskInstanceState.SUCCESS]
+
+
+def test_listener_logs_call(caplog, create_task_instance, session):
+    """Check that running a task logs each listener hook call at DEBUG level."""
+    caplog.set_level(logging.DEBUG, logger="airflow.listeners.listener")
+    lm = get_listener_manager()
+    lm.add_listener(full_listener)
+
+    ti = create_task_instance(session=session, state=TaskInstanceState.QUEUED)
+    ti._run_raw_task()
+
+    # Keep only the records emitted through the listener module's logger.
+    listener_logs = [r for r in caplog.record_tuples if r[0] == "airflow.listeners.listener"]
+    # Two hook calls are expected (running, then success), three records each.
+    assert len(listener_logs) == 6
+    assert all(r[:-1] == ("airflow.listeners.listener", logging.DEBUG) for r in listener_logs)
+    assert listener_logs[0][-1].startswith("Calling 'on_task_instance_running' with {'")
+    assert listener_logs[1][-1].startswith("Hook impls: [<HookImpl plugin")
+    assert listener_logs[2][-1] == "Result from 'on_task_instance_running': []"
+    assert listener_logs[3][-1].startswith("Calling 'on_task_instance_success' with {'")
+    assert listener_logs[4][-1].startswith("Hook impls: [<HookImpl plugin")
+    assert listener_logs[5][-1] == "Result from 'on_task_instance_success': []"