This is an automated email from the ASF dual-hosted git repository.

uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 0a94867e4a Add logging around listener (#42301)
0a94867e4a is described below

commit 0a94867e4aab220930fa87978dc8564d84576262
Author: Tzu-ping Chung <[email protected]>
AuthorDate: Wed Sep 18 00:23:53 2024 -0700

    Add logging around listener (#42301)
---
 airflow/listeners/listener.py     | 10 ++++++++++
 tests/listeners/test_listeners.py | 20 ++++++++++++++++++++
 2 files changed, 30 insertions(+)

diff --git a/airflow/listeners/listener.py b/airflow/listeners/listener.py
index cdabfebb75..57d0360487 100644
--- a/airflow/listeners/listener.py
+++ b/airflow/listeners/listener.py
@@ -33,6 +33,15 @@ log = logging.getLogger(__name__)
 _listener_manager: ListenerManager | None = None
 
 
+def _before_hookcall(hook_name, hook_impls, kwargs):
+    log.debug("Calling %r with %r", hook_name, kwargs)
+    log.debug("Hook impls: %s", hook_impls)
+
+
+def _after_hookcall(outcome, hook_name, hook_impls, kwargs):
+    log.debug("Result from %r: %s", hook_name, outcome.get_result())
+
+
 class ListenerManager:
     """Manage listener registration and provides hook property for calling them."""
 
@@ -40,6 +49,7 @@ class ListenerManager:
         from airflow.listeners.spec import dagrun, dataset, importerrors, lifecycle, taskinstance
 
         self.pm = pluggy.PluginManager("airflow")
+        self.pm.add_hookcall_monitoring(_before_hookcall, _after_hookcall)
         self.pm.add_hookspecs(lifecycle)
         self.pm.add_hookspecs(dagrun)
         self.pm.add_hookspecs(dataset)
diff --git a/tests/listeners/test_listeners.py b/tests/listeners/test_listeners.py
index 6704521c33..3c34ab0ff8 100644
--- a/tests/listeners/test_listeners.py
+++ b/tests/listeners/test_listeners.py
@@ -17,6 +17,7 @@
 from __future__ import annotations
 
 import contextlib
+import logging
 import os
 
 import pytest
@@ -175,3 +176,22 @@ def test_class_based_listener(create_task_instance, session=None):
 
     assert len(listener.state) == 2
     assert listener.state == [TaskInstanceState.RUNNING, TaskInstanceState.SUCCESS]
+
+
+def test_listener_logs_call(caplog, create_task_instance, session):
+    caplog.set_level(logging.DEBUG, logger="airflow.listeners.listener")
+    lm = get_listener_manager()
+    lm.add_listener(full_listener)
+
+    ti = create_task_instance(session=session, state=TaskInstanceState.QUEUED)
+    ti._run_raw_task()
+
+    listener_logs = [r for r in caplog.record_tuples if r[0] == "airflow.listeners.listener"]
+    assert len(listener_logs) == 6
+    assert all(r[:-1] == ("airflow.listeners.listener", logging.DEBUG) for r in listener_logs)
+    assert listener_logs[0][-1].startswith("Calling 'on_task_instance_running' with {'")
+    assert listener_logs[1][-1].startswith("Hook impls: [<HookImpl plugin")
+    assert listener_logs[2][-1] == "Result from 'on_task_instance_running': []"
+    assert listener_logs[3][-1].startswith("Calling 'on_task_instance_success' with {'")
+    assert listener_logs[4][-1].startswith("Hook impls: [<HookImpl plugin")
+    assert listener_logs[5][-1] == "Result from 'on_task_instance_success': []"

Reply via email to