This is an automated email from the ASF dual-hosted git repository.

ash pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 933fefca27 Fix double logging with some task logging handler (#27591)
933fefca27 is described below

commit 933fefca27a5cd514c9083040344a866c7f517db
Author: Ash Berlin-Taylor <[email protected]>
AuthorDate: Thu Nov 10 17:58:06 2022 +0000

    Fix double logging with some task logging handler (#27591)
    
    A previous change to fix disappearing log messages turned on propagation
    for the `airflow.task` logger, and disabled it again after
    `set_context()` was called, but only if that function returned a special
    sentinel value.
    
    The "core" task log handlers returned this sentinel, but some provider
    handlers weren't subclassing "correctly" (e.g. they didn't pass through
    the value returned by `super().set_context()`), so they never returned it.
    
    The fix is to invert the logic: rather than disabling propagation only
    when the special value is returned, disable it by default and keep it
    enabled only when a handler returns the special
    `SetContextPropagate.MAINTAIN_PROPAGATE` value. This means that if a
    handler doesn't return the value from `super()` (or doesn't even
    subclass the default handler), propagation will still be disabled by
    default.
---
 airflow/utils/log/file_task_handler.py |  7 ++-
 airflow/utils/log/logging_mixin.py     | 41 ++++++++++++---
 tests/utils/test_logging_mixin.py      | 95 +++++++++++++++++++++++++++++-----
 3 files changed, 119 insertions(+), 24 deletions(-)

diff --git a/airflow/utils/log/file_task_handler.py 
b/airflow/utils/log/file_task_handler.py
index 692c279d0c..e20da8cebc 100644
--- a/airflow/utils/log/file_task_handler.py
+++ b/airflow/utils/log/file_task_handler.py
@@ -29,13 +29,13 @@ from airflow.configuration import AirflowConfigException, 
conf
 from airflow.exceptions import RemovedInAirflow3Warning
 from airflow.utils.context import Context
 from airflow.utils.helpers import parse_template_string, 
render_template_to_string
-from airflow.utils.log.logging_mixin import DISABLE_PROPOGATE
 from airflow.utils.log.non_caching_file_handler import NonCachingFileHandler
 from airflow.utils.session import create_session
 from airflow.utils.state import State
 
 if TYPE_CHECKING:
     from airflow.models import TaskInstance
+    from airflow.utils.log.logging_mixin import SetContextPropagate
 
 
 class FileTaskHandler(logging.Handler):
@@ -62,7 +62,7 @@ class FileTaskHandler(logging.Handler):
                 stacklevel=(2 if type(self) == FileTaskHandler else 3),
             )
 
-    def set_context(self, ti: TaskInstance):
+    def set_context(self, ti: TaskInstance) -> None | SetContextPropagate:
         """
         Provide task_instance context to airflow task handler.
 
@@ -73,8 +73,7 @@ class FileTaskHandler(logging.Handler):
         if self.formatter:
             self.handler.setFormatter(self.formatter)
         self.handler.setLevel(self.level)
-
-        return DISABLE_PROPOGATE
+        return None
 
     def emit(self, record):
         if self.handler:
diff --git a/airflow/utils/log/logging_mixin.py 
b/airflow/utils/log/logging_mixin.py
index 8127bde9f7..b8f5a0871c 100644
--- a/airflow/utils/log/logging_mixin.py
+++ b/airflow/utils/log/logging_mixin.py
@@ -18,18 +18,35 @@
 from __future__ import annotations
 
 import abc
+import enum
 import logging
 import re
 import sys
 from io import IOBase
 from logging import Handler, Logger, StreamHandler
-from typing import IO
+from typing import IO, cast
 
 # 7-bit C1 ANSI escape sequences
 ANSI_ESCAPE = re.compile(r"\x1B[@-_][0-?]*[ -/]*[@-~]")
 
-# Private: A sentinel object
-DISABLE_PROPOGATE = object()
+
+# Private: A sentinel objects
+class SetContextPropagate(enum.Enum):
+    """:meta private:"""
+
+    # If a `set_context` function wants to _keep_ propagation set on it's 
logger it needs to return this
+    # special value.
+    MAINTAIN_PROPAGATE = object()
+    # Don't use this one anymore!
+    DISABLE_PROPAGATE = object()
+
+
+def __getattr__(name):
+    if name in ("DISABLE_PROPOGATE", "DISABLE_PROPAGATE"):
+        # Compat for spelling on off chance someone is using this directly
+        # And old object that isn't needed anymore
+        return SetContextPropagate.DISABLE_PROPAGATE
+    raise AttributeError(f"module {__name__} has no attribute {name}")
 
 
 def remove_escape_codes(text: str) -> str:
@@ -183,13 +200,23 @@ def set_context(logger, value):
     :param value: value to set
     """
     while logger:
+        orig_propagate = logger.propagate
         for handler in logger.handlers:
             # Not all handlers need to have context passed in so we ignore
             # the error when handlers do not have set_context defined.
-            set_context = getattr(handler, "set_context", None)
-            if set_context and set_context(value) is DISABLE_PROPOGATE:
-                logger.propagate = False
-        if logger.propagate is True:
+
+            # Don't use getatrr so we have type checking. And we don't care if 
handler is actually a
+            # FileTaskHandler, it just needs to have a set_context function!
+            if hasattr(handler, "set_context"):
+                from airflow.utils.log.file_task_handler import FileTaskHandler
+
+                flag = cast(FileTaskHandler, handler).set_context(value)
+                # By default we disable propagate once we have configured the 
logger, unless that handler
+                # explicitly asks us to keep it on.
+                if flag is not SetContextPropagate.MAINTAIN_PROPAGATE:
+                    logger.propagate = False
+        if orig_propagate is True:
+            # If we were set to propagate before we turned if off, then keep 
passing set_context up
             logger = logger.parent
         else:
             break
diff --git a/tests/utils/test_logging_mixin.py 
b/tests/utils/test_logging_mixin.py
index 464c5773d3..a1ffa3d629 100644
--- a/tests/utils/test_logging_mixin.py
+++ b/tests/utils/test_logging_mixin.py
@@ -17,29 +17,64 @@
 # under the License.
 from __future__ import annotations
 
+import logging
+import sys
 import warnings
 from unittest import mock
 
-from airflow.utils.log.logging_mixin import StreamLogWriter, set_context
+import pytest
+
+from airflow.utils.log.logging_mixin import SetContextPropagate, 
StreamLogWriter, set_context
+
+
[email protected]
+def logger():
+    parent = logging.getLogger(__name__)
+    parent.propagate = False
+    yield parent
+
+    parent.propagate = True
+
+
[email protected]
+def child_logger(logger):
+    yield logger.getChild("child")
+
+
[email protected]
+def parent_child_handlers(child_logger):
+    parent_handler = logging.NullHandler()
+    parent_handler.handle = mock.MagicMock(name="parent_handler.handle")
+
+    child_handler = logging.NullHandler()
+    child_handler.handle = mock.MagicMock(name="handler.handle")
+
+    logger = child_logger.parent
+    logger.addHandler(parent_handler)
+
+    child_logger.addHandler(child_handler),
+    child_logger.propagate = True
+
+    yield parent_handler, child_handler
+
+    logger.removeHandler(parent_handler)
+    child_logger.removeHandler(child_handler)
 
 
 class TestLoggingMixin:
     def setup_method(self):
         warnings.filterwarnings(action="always")
 
-    def test_set_context(self):
-        handler1 = mock.MagicMock()
-        handler2 = mock.MagicMock()
-        parent = mock.MagicMock()
+    def test_set_context(self, child_logger, parent_child_handlers):
+        handler1, handler2 = parent_child_handlers
+        handler1.set_context = mock.MagicMock()
+        handler2.set_context = mock.MagicMock()
+
+        parent = logging.getLogger(__name__)
         parent.propagate = False
-        parent.handlers = [
-            handler1,
-        ]
-        log = mock.MagicMock()
-        log.handlers = [
-            handler2,
-        ]
-        log.parent = parent
+        parent.addHandler(handler1)
+        log = parent.getChild("child")
+        log.addHandler(handler2),
         log.propagate = True
 
         value = "test"
@@ -105,3 +140,37 @@ class TestStreamLogWriter:
         assert not log.closed
         # has no specific effect
         log.close()
+
+
[email protected](["maintain_propagate"], 
[[SetContextPropagate.MAINTAIN_PROPAGATE], [None]])
+def test_set_context_propagation(parent_child_handlers, child_logger, 
maintain_propagate):
+    # Test the behaviour of set_context and logger propagation and the 
MAINTAIN_PROPAGATE return
+
+    parent_handler, handler = parent_child_handlers
+    handler.set_context = mock.MagicMock(return_value=maintain_propagate)
+
+    # Before settting_context, ensure logs make it to the parent
+    line = sys._getframe().f_lineno + 1
+    record = child_logger.makeRecord(
+        child_logger.name, logging.INFO, __file__, line, "test message", [], 
None
+    )
+    child_logger.handle(record)
+
+    handler.handle.assert_called_once_with(record)
+    # Should call the parent handler too in the default/unconfigured case
+    parent_handler.handle.assert_called_once_with(record)
+
+    parent_handler.handle.reset_mock()
+    handler.handle.reset_mock()
+
+    # Ensure that once we've called set_context on the handler we disable 
propagation to parent loggers by
+    # default!
+    set_context(child_logger, {})
+
+    child_logger.handle(record)
+
+    handler.handle.assert_called_once_with(record)
+    if maintain_propagate is SetContextPropagate.MAINTAIN_PROPAGATE:
+        parent_handler.handle.assert_called_once_with(record)
+    else:
+        parent_handler.handle.assert_not_called()

Reply via email to