This is an automated email from the ASF dual-hosted git repository.
ash pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 78dc35e250a Expose config option to set different loggers at different
levels. (#55850)
78dc35e250a is described below
commit 78dc35e250aa962d1a639dae866739342ce05d7a
Author: Ash Berlin-Taylor <[email protected]>
AuthorDate: Tue Sep 23 13:50:09 2025 +0100
Expose config option to set different loggers at different levels. (#55850)
This is commonly very useful when debugging when you want to set _something_
at debug but not everything, especially not sqlalchemy logs (as they are so
noisy as to overwhelm everything)
This gives users an easy way to do that.
The main functionality to do this was added not long ago when we ported
Airflow over to structlog. This exposes that capability via a config option.
As part of this I needed to make a change to how logs are handled from task
processes -- they were being mistakenly level filtered _again_, meaning that
if the task decided it wanted to log anything at debug, but the main process
was at info, it would get dropped and not make it to the log file. This now
changes it so that anything sent by the subprocess makes it to the log file.
---
.../src/airflow/config_templates/config.yml | 23 +++++++++++++++
airflow-core/src/airflow/logging_config.py | 1 +
.../src/airflow_shared/logging/structlog.py | 29 +++++++++++-------
shared/logging/tests/logging/test_structlog.py | 2 +-
.../src/airflow/sdk/execution_time/supervisor.py | 34 ++++++++++++++--------
task-sdk/src/airflow/sdk/log.py | 7 +++--
.../task_sdk/execution_time/test_supervisor.py | 26 +++++++++++++++++
7 files changed, 97 insertions(+), 25 deletions(-)
diff --git a/airflow-core/src/airflow/config_templates/config.yml
b/airflow-core/src/airflow/config_templates/config.yml
index 99c5476736a..c4f10ac9624 100644
--- a/airflow-core/src/airflow/config_templates/config.yml
+++ b/airflow-core/src/airflow/config_templates/config.yml
@@ -788,11 +788,34 @@ logging:
description: |
Logging level.
+ Individual loggers can be set at a different level with the
:ref:`config:logging__namespace_levels`
+ option.
+
Supported values: ``CRITICAL``, ``ERROR``, ``WARNING``, ``INFO``,
``DEBUG``.
version_added: 2.0.0
type: string
example: ~
default: "INFO"
+ namespace_levels:
+ description: |
+ Set the log level for individual named loggers.
+
+ A very common convention in Python is to use the module name as the
name of the logger from which log
+ messages originate. This config option gives us the ability to set log
levels for those loggers
+ easily.
+
+ The format of this variable is a series of ``<logger>=<level>`` pairs,
separated by whitespace or
+ commas.
+
+ Each level is one of the possible values to ``logging_level``.
+
+ The logger names are viewable in task logs (as the "source"
attribute), or in server components by
+ including ``%(name)s`` in your format string, or in the between ``[]``
after the message in the
+ default format.
+ version_added: 3.1.0
+ type: string
+ example: "sqlalchemy=INFO sqlalchemy.engine=DEBUG, botocor"
+ default: ~
celery_logging_level:
description: |
Logging level for celery. If not set, it uses the value of
logging_level
diff --git a/airflow-core/src/airflow/logging_config.py
b/airflow-core/src/airflow/logging_config.py
index 0875ac9be90..495011d89d2 100644
--- a/airflow-core/src/airflow/logging_config.py
+++ b/airflow-core/src/airflow/logging_config.py
@@ -106,6 +106,7 @@ def configure_logging():
)
configure_logging(
log_level=level,
+ namespace_log_levels=conf.get("logging", "namespace_levels",
fallback=None),
stdlib_config=logging_config,
log_format=log_fmt,
callsite_parameters=callsite_params,
diff --git a/shared/logging/src/airflow_shared/logging/structlog.py
b/shared/logging/src/airflow_shared/logging/structlog.py
index fcba506f6a0..cded41c1563 100644
--- a/shared/logging/src/airflow_shared/logging/structlog.py
+++ b/shared/logging/src/airflow_shared/logging/structlog.py
@@ -148,7 +148,9 @@ def make_filtering_logger() -> Callable[...,
BindableLogger]:
if not logger_name and isinstance(logger, (NamedWriteLogger,
NamedBytesLogger)):
logger_name = logger.name
- if logger_name:
+ if (level_override := kwargs.get("context",
{}).pop("__level_override", None)) is not None:
+ level = level_override
+ elif logger_name:
level =
PER_LOGGER_LEVELS.longest_prefix(logger_name).get(PER_LOGGER_LEVELS[""])
else:
level = PER_LOGGER_LEVELS[""]
@@ -386,7 +388,7 @@ def configure_logging(
callsite_parameters: Iterable[CallsiteParameter] | None = None,
colors: bool | None = None,
output: LogOutputType | None = None,
- log_levels: str | dict[str, str] | None = None,
+ namespace_log_levels: str | dict[str, str] | None = None,
cache_logger_on_first_use: bool = True,
):
"""
@@ -413,7 +415,7 @@ def configure_logging(
If ``log_format`` is specified, then anything required to populate
that (such as ``%(lineno)d``) will
be automatically included.
- :param log_levels: Levels of extra loggers to configure.
+ :param namespace_log_levels: Levels of extra loggers to configure.
To make this easier to use, this can be a string consisting of pairs
of ``<logger>=<level>`` (either
string, or space delimited) which will set the level for that specific
logger.
@@ -437,11 +439,13 @@ def configure_logging(
PER_LOGGER_LEVELS[""] = NAME_TO_LEVEL[log_level.lower()]
# Extract per-logger-tree levels and set them
- if isinstance(log_levels, str):
+ if isinstance(namespace_log_levels, str):
log_from_level = partial(re.compile(r"\s*=\s*").split, maxsplit=2)
- log_levels = {log: level for log, level in map(log_from_level,
re.split(r"[\s,]+", log_levels))}
- if log_levels:
- for log, level in log_levels.items():
+ namespace_log_levels = {
+ log: level for log, level in map(log_from_level,
re.split(r"[\s,]+", namespace_log_levels))
+ }
+ if namespace_log_levels:
+ for log, level in namespace_log_levels.items():
try:
loglevel = NAME_TO_LEVEL[level.lower()]
except KeyError:
@@ -603,14 +607,19 @@ def init_log_file(
return full_path
-def logger_without_processor_of_type(logger: WrappedLogger, processor_type:
type):
+def reconfigure_logger(
+ logger: WrappedLogger, without_processor_type: type, level_override: int |
None = None
+):
procs = getattr(logger, "_processors", None)
if procs is None:
procs = structlog.get_config()["processors"]
- procs = [proc for proc in procs if not isinstance(proc, processor_type)]
+ procs = [proc for proc in procs if not isinstance(proc,
without_processor_type)]
return structlog.wrap_logger(
- getattr(logger, "_logger", None), processors=procs, **getattr(logger,
"_context", {})
+ getattr(logger, "_logger", None),
+ processors=procs,
+ **getattr(logger, "_context", {}),
+ __level_override=level_override,
)
diff --git a/shared/logging/tests/logging/test_structlog.py
b/shared/logging/tests/logging/test_structlog.py
index e201f84091f..6a5bacd73ba 100644
--- a/shared/logging/tests/logging/test_structlog.py
+++ b/shared/logging/tests/logging/test_structlog.py
@@ -306,7 +306,7 @@ def test_logger_filtering(structlog_config, levels):
colors=False,
log_format="[%(name)s] %(message)s",
log_level="DEBUG",
- log_levels=levels,
+ namespace_log_levels=levels,
) as sio:
structlog.get_logger("my").info("Hello", key1="value1")
structlog.get_logger("my.logger").info("Hello", key1="value2")
diff --git a/task-sdk/src/airflow/sdk/execution_time/supervisor.py
b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
index 797400c8508..01e3094cb59 100644
--- a/task-sdk/src/airflow/sdk/execution_time/supervisor.py
+++ b/task-sdk/src/airflow/sdk/execution_time/supervisor.py
@@ -55,6 +55,7 @@ import structlog
from pydantic import BaseModel, TypeAdapter
from airflow.configuration import conf
+from airflow.sdk._shared.logging.structlog import reconfigure_logger
from airflow.sdk.api.client import Client, ServerResponseError
from airflow.sdk.api.datamodels._generated import (
AssetResponse,
@@ -538,15 +539,11 @@ class WatchedSubprocess:
)
)
- from airflow.sdk._shared.logging.structlog import
logger_without_processor_of_type
-
- std_handle_log = logger_without_processor_of_type(
- self.process_log, structlog.processors.CallsiteParameterAdder
- )
- target_loggers: tuple[FilteringBoundLogger, ...] = (std_handle_log,)
+ target_loggers: tuple[FilteringBoundLogger, ...] = (self.process_log,)
if self.subprocess_logs_to_stdout:
target_loggers += (log,)
+
self.selector.register(
stdout, selectors.EVENT_READ,
self._create_log_forwarder(target_loggers, "task.stdout")
)
@@ -570,6 +567,13 @@ class WatchedSubprocess:
def _create_log_forwarder(self, loggers, name, log_level=logging.INFO) ->
Callable[[socket], bool]:
"""Create a socket handler that forwards logs to a logger."""
+ loggers = tuple(
+ reconfigure_logger(
+ log,
+ structlog.processors.CallsiteParameterAdder,
+ )
+ for log in loggers
+ )
return make_buffered_socket_reader(
forward_to_log(loggers, logger=name, level=log_level),
on_close=self._on_socket_closed
)
@@ -1374,12 +1378,7 @@ class ActivitySubprocess(WatchedSubprocess):
raise RuntimeError("send_fds is not available on this platform")
child_logs, read_logs = socketpair()
- from airflow.sdk._shared.logging.structlog import
logger_without_processor_of_type
-
- std_handle_log = logger_without_processor_of_type(
- self.process_log, structlog.processors.CallsiteParameterAdder
- )
- target_loggers: tuple[FilteringBoundLogger, ...] = (std_handle_log,)
+ target_loggers: tuple[FilteringBoundLogger, ...] = (self.process_log,)
if self.subprocess_logs_to_stdout:
target_loggers += (log,)
@@ -1713,6 +1712,17 @@ def process_log_messages_from_subprocess(
) -> Generator[None, bytes | bytearray, None]:
from structlog.stdlib import NAME_TO_LEVEL
+ loggers = tuple(
+ reconfigure_logger(
+ log,
+ structlog.processors.CallsiteParameterAdder,
+ # We need these logger to print _everything_ they are given. The
subprocess itself does the level
+ # filtering.
+ level_override=logging.NOTSET,
+ )
+ for log in loggers
+ )
+
while True:
# Generator receive syntax, values are "sent" in by the
`make_buffered_socket_reader` and returned to
# the yield.
diff --git a/task-sdk/src/airflow/sdk/log.py b/task-sdk/src/airflow/sdk/log.py
index 6e0d280d2d7..2539364a88d 100644
--- a/task-sdk/src/airflow/sdk/log.py
+++ b/task-sdk/src/airflow/sdk/log.py
@@ -91,6 +91,8 @@ def configure_logging(
if colored_console_log is None:
colored_console_log = conf.getboolean("logging",
"colored_console_log", fallback=True)
+ namespace_log_levels = conf.get("logging", "namespace_levels",
fallback=None)
+
from airflow.sdk._shared.logging import configure_logging,
translate_config_values
log_fmt, callsite_params = translate_config_values(
@@ -110,6 +112,7 @@ def configure_logging(
configure_logging(
json_output=json_output,
log_level=log_level,
+ namespace_log_levels=namespace_log_levels,
log_format=log_fmt,
output=output,
cache_logger_on_first_use=cache_logger_on_first_use,
@@ -278,9 +281,9 @@ def _showwarning(
if _warnings_showwarning is not None:
_warnings_showwarning(message, category, filename, lineno, file,
line)
else:
- from airflow.sdk._shared.logging.structlog import
logger_without_processor_of_type
+ from airflow.sdk._shared.logging.structlog import reconfigure_logger
- log = logger_without_processor_of_type(
+ log = reconfigure_logger(
structlog.get_logger("py.warnings").bind(),
structlog.processors.CallsiteParameterAdder
)
diff --git a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
index 94ad8d919e7..8a5e06e0e84 100644
--- a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
+++ b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py
@@ -40,6 +40,7 @@ import httpx
import msgspec
import psutil
import pytest
+import structlog
from pytest_unordered import unordered
from task_sdk import FAKE_BUNDLE, make_client
from uuid6 import uuid7
@@ -126,6 +127,7 @@ from airflow.sdk.execution_time.supervisor import (
InProcessSupervisorComms,
InProcessTestSupervisor,
_remote_logging_conn,
+ process_log_messages_from_subprocess,
set_supervisor_comms,
supervise,
)
@@ -2425,3 +2427,27 @@ def test_remote_logging_conn(remote_logging,
remote_conn, expected_env, monkeypa
f"Connection {expected_env} was not available during
upload_to_remote call"
)
assert connection_available["conn_uri"] is not None, "Connection
URI was None during upload"
+
+
+def test_process_log_messages_from_subprocess(monkeypatch, caplog):
+ from airflow.sdk._shared.logging.structlog import PER_LOGGER_LEVELS
+
+ read_end, write_end = socket.socketpair()
+
+ # Set global level at warning
+ monkeypatch.setitem(PER_LOGGER_LEVELS, "", logging.WARNING)
+ output_log = structlog.get_logger()
+
+ gen = process_log_messages_from_subprocess(loggers=(output_log,))
+
+ # We need to start up the generator to get it to the point it's at waiting
on the yield
+ next(gen)
+
+ # Now we can send in messages to it.
+ gen.send(b'{"level": "debug", "event": "A debug"}\n')
+ gen.send(b'{"level": "error", "event": "An error"}\n')
+
+ assert caplog.record_tuples == [
+ (None, logging.DEBUG, "A debug"),
+ (None, logging.ERROR, "An error"),
+ ]