This is an automated email from the ASF dual-hosted git repository.
ash pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 933fefca27 Fix double logging with some task logging handler (#27591)
933fefca27 is described below
commit 933fefca27a5cd514c9083040344a866c7f517db
Author: Ash Berlin-Taylor <[email protected]>
AuthorDate: Thu Nov 10 17:58:06 2022 +0000
Fix double logging with some task logging handler (#27591)
A previous change to fix disappearing log messages turned on propagation
for the `airflow.task` logger, and disabled it again after
`set_context()` was called, but only if that function returned a special
sentinel value.
For the "core" task log handlers we returned them, but some providers
weren't "correctly" subclassing and weren't returning this sentinel
value.
The fix here is to change the logic from disable only on special value
to disable by default and maintain propagation on special value; this
means that if a handler doesn't return the value from `super()` (or if
they don't even subclass the default handler) propagation will still be
disabled by default.
---
airflow/utils/log/file_task_handler.py | 7 ++-
airflow/utils/log/logging_mixin.py | 41 ++++++++++++---
tests/utils/test_logging_mixin.py | 95 +++++++++++++++++++++++++++++-----
3 files changed, 119 insertions(+), 24 deletions(-)
diff --git a/airflow/utils/log/file_task_handler.py
b/airflow/utils/log/file_task_handler.py
index 692c279d0c..e20da8cebc 100644
--- a/airflow/utils/log/file_task_handler.py
+++ b/airflow/utils/log/file_task_handler.py
@@ -29,13 +29,13 @@ from airflow.configuration import AirflowConfigException,
conf
from airflow.exceptions import RemovedInAirflow3Warning
from airflow.utils.context import Context
from airflow.utils.helpers import parse_template_string,
render_template_to_string
-from airflow.utils.log.logging_mixin import DISABLE_PROPOGATE
from airflow.utils.log.non_caching_file_handler import NonCachingFileHandler
from airflow.utils.session import create_session
from airflow.utils.state import State
if TYPE_CHECKING:
from airflow.models import TaskInstance
+ from airflow.utils.log.logging_mixin import SetContextPropagate
class FileTaskHandler(logging.Handler):
@@ -62,7 +62,7 @@ class FileTaskHandler(logging.Handler):
stacklevel=(2 if type(self) == FileTaskHandler else 3),
)
- def set_context(self, ti: TaskInstance):
+ def set_context(self, ti: TaskInstance) -> None | SetContextPropagate:
"""
Provide task_instance context to airflow task handler.
@@ -73,8 +73,7 @@ class FileTaskHandler(logging.Handler):
if self.formatter:
self.handler.setFormatter(self.formatter)
self.handler.setLevel(self.level)
-
- return DISABLE_PROPOGATE
+ return None
def emit(self, record):
if self.handler:
diff --git a/airflow/utils/log/logging_mixin.py
b/airflow/utils/log/logging_mixin.py
index 8127bde9f7..b8f5a0871c 100644
--- a/airflow/utils/log/logging_mixin.py
+++ b/airflow/utils/log/logging_mixin.py
@@ -18,18 +18,35 @@
from __future__ import annotations
import abc
+import enum
import logging
import re
import sys
from io import IOBase
from logging import Handler, Logger, StreamHandler
-from typing import IO
+from typing import IO, cast
# 7-bit C1 ANSI escape sequences
ANSI_ESCAPE = re.compile(r"\x1B[@-_][0-?]*[ -/]*[@-~]")
-# Private: A sentinel object
-DISABLE_PROPOGATE = object()
+
+# Private: A sentinel objects
+class SetContextPropagate(enum.Enum):
+ """:meta private:"""
+
+ # If a `set_context` function wants to _keep_ propagation set on it's
logger it needs to return this
+ # special value.
+ MAINTAIN_PROPAGATE = object()
+ # Don't use this one anymore!
+ DISABLE_PROPAGATE = object()
+
+
+def __getattr__(name):
+ if name in ("DISABLE_PROPOGATE", "DISABLE_PROPAGATE"):
+ # Compat for spelling on off chance someone is using this directly
+ # And old object that isn't needed anymore
+ return SetContextPropagate.DISABLE_PROPAGATE
+ raise AttributeError(f"module {__name__} has no attribute {name}")
def remove_escape_codes(text: str) -> str:
@@ -183,13 +200,23 @@ def set_context(logger, value):
:param value: value to set
"""
while logger:
+ orig_propagate = logger.propagate
for handler in logger.handlers:
# Not all handlers need to have context passed in so we ignore
# the error when handlers do not have set_context defined.
- set_context = getattr(handler, "set_context", None)
- if set_context and set_context(value) is DISABLE_PROPOGATE:
- logger.propagate = False
- if logger.propagate is True:
+
+ # Don't use getatrr so we have type checking. And we don't care if
handler is actually a
+ # FileTaskHandler, it just needs to have a set_context function!
+ if hasattr(handler, "set_context"):
+ from airflow.utils.log.file_task_handler import FileTaskHandler
+
+ flag = cast(FileTaskHandler, handler).set_context(value)
+ # By default we disable propagate once we have configured the
logger, unless that handler
+ # explicitly asks us to keep it on.
+ if flag is not SetContextPropagate.MAINTAIN_PROPAGATE:
+ logger.propagate = False
+ if orig_propagate is True:
+ # If we were set to propagate before we turned if off, then keep
passing set_context up
logger = logger.parent
else:
break
diff --git a/tests/utils/test_logging_mixin.py
b/tests/utils/test_logging_mixin.py
index 464c5773d3..a1ffa3d629 100644
--- a/tests/utils/test_logging_mixin.py
+++ b/tests/utils/test_logging_mixin.py
@@ -17,29 +17,64 @@
# under the License.
from __future__ import annotations
+import logging
+import sys
import warnings
from unittest import mock
-from airflow.utils.log.logging_mixin import StreamLogWriter, set_context
+import pytest
+
+from airflow.utils.log.logging_mixin import SetContextPropagate,
StreamLogWriter, set_context
+
+
[email protected]
+def logger():
+ parent = logging.getLogger(__name__)
+ parent.propagate = False
+ yield parent
+
+ parent.propagate = True
+
+
[email protected]
+def child_logger(logger):
+ yield logger.getChild("child")
+
+
[email protected]
+def parent_child_handlers(child_logger):
+ parent_handler = logging.NullHandler()
+ parent_handler.handle = mock.MagicMock(name="parent_handler.handle")
+
+ child_handler = logging.NullHandler()
+ child_handler.handle = mock.MagicMock(name="handler.handle")
+
+ logger = child_logger.parent
+ logger.addHandler(parent_handler)
+
+ child_logger.addHandler(child_handler),
+ child_logger.propagate = True
+
+ yield parent_handler, child_handler
+
+ logger.removeHandler(parent_handler)
+ child_logger.removeHandler(child_handler)
class TestLoggingMixin:
def setup_method(self):
warnings.filterwarnings(action="always")
- def test_set_context(self):
- handler1 = mock.MagicMock()
- handler2 = mock.MagicMock()
- parent = mock.MagicMock()
+ def test_set_context(self, child_logger, parent_child_handlers):
+ handler1, handler2 = parent_child_handlers
+ handler1.set_context = mock.MagicMock()
+ handler2.set_context = mock.MagicMock()
+
+ parent = logging.getLogger(__name__)
parent.propagate = False
- parent.handlers = [
- handler1,
- ]
- log = mock.MagicMock()
- log.handlers = [
- handler2,
- ]
- log.parent = parent
+ parent.addHandler(handler1)
+ log = parent.getChild("child")
+ log.addHandler(handler2),
log.propagate = True
value = "test"
@@ -105,3 +140,37 @@ class TestStreamLogWriter:
assert not log.closed
# has no specific effect
log.close()
+
+
[email protected](["maintain_propagate"],
[[SetContextPropagate.MAINTAIN_PROPAGATE], [None]])
+def test_set_context_propagation(parent_child_handlers, child_logger,
maintain_propagate):
+ # Test the behaviour of set_context and logger propagation and the
MAINTAIN_PROPAGATE return
+
+ parent_handler, handler = parent_child_handlers
+ handler.set_context = mock.MagicMock(return_value=maintain_propagate)
+
+ # Before settting_context, ensure logs make it to the parent
+ line = sys._getframe().f_lineno + 1
+ record = child_logger.makeRecord(
+ child_logger.name, logging.INFO, __file__, line, "test message", [],
None
+ )
+ child_logger.handle(record)
+
+ handler.handle.assert_called_once_with(record)
+ # Should call the parent handler too in the default/unconfigured case
+ parent_handler.handle.assert_called_once_with(record)
+
+ parent_handler.handle.reset_mock()
+ handler.handle.reset_mock()
+
+ # Ensure that once we've called set_context on the handler we disable
propagation to parent loggers by
+ # default!
+ set_context(child_logger, {})
+
+ child_logger.handle(record)
+
+ handler.handle.assert_called_once_with(record)
+ if maintain_propagate is SetContextPropagate.MAINTAIN_PROPAGATE:
+ parent_handler.handle.assert_called_once_with(record)
+ else:
+ parent_handler.handle.assert_not_called()