This is an automated email from the ASF dual-hosted git repository.
uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 816594e6cdb Remove global from Task SDK log (#59695)
816594e6cdb is described below
commit 816594e6cdb51c0057d9059fe8a68c144a3f7ded
Author: Jens Scheffler <[email protected]>
AuthorDate: Mon Dec 22 05:14:39 2025 +0100
Remove global from Task SDK log (#59695)
---
task-sdk/src/airflow/sdk/log.py | 43 ++++++++++++++++++++++++++---------------
1 file changed, 27 insertions(+), 16 deletions(-)
diff --git a/task-sdk/src/airflow/sdk/log.py b/task-sdk/src/airflow/sdk/log.py
index 2dacf0a611c..0dc6298182a 100644
--- a/task-sdk/src/airflow/sdk/log.py
+++ b/task-sdk/src/airflow/sdk/log.py
@@ -18,6 +18,7 @@
from __future__ import annotations
import warnings
+from collections.abc import Callable
from functools import cache
from pathlib import Path
from typing import TYPE_CHECKING, Any, BinaryIO, TextIO
@@ -39,6 +40,29 @@ if TYPE_CHECKING:
__all__ = ["configure_logging", "reset_logging", "mask_secret"]
+class _WarningsInterceptor:
+ """A class to hold the reference to the original warnings.showwarning function."""
+
+ _original_showwarning: Callable | None = None
+
+ @staticmethod
+ def register(new_callable: Callable) -> None:
+ if _WarningsInterceptor._original_showwarning is None:
+ _WarningsInterceptor._original_showwarning = warnings.showwarning
+ warnings.showwarning = new_callable
+
+ @staticmethod
+ def reset() -> None:
+ if _WarningsInterceptor._original_showwarning is not None:
+ warnings.showwarning = _WarningsInterceptor._original_showwarning
+ _WarningsInterceptor._original_showwarning = None
+
+ @staticmethod
+ def emit_warning(*args: Any) -> None:
+ if _WarningsInterceptor._original_showwarning is not None:
+ _WarningsInterceptor._original_showwarning(*args)
+
+
def mask_logs(logger: Any, method_name: str, event_dict: EventDict) -> EventDict:
from airflow.sdk._shared.secrets_masker import redact
@@ -121,12 +145,7 @@ def configure_logging(
callsite_parameters=callsite_params,
)
- global _warnings_showwarning
-
- if _warnings_showwarning is None:
- _warnings_showwarning = warnings.showwarning
- # Capture warnings and show them via structlog -- i.e. in task logs
- warnings.showwarning = _showwarning
+ _WarningsInterceptor.register(_showwarning)
def logger_at_level(name: str, level: int) -> Logger:
@@ -258,18 +277,11 @@ def reset_logging():
"""
from airflow.sdk._shared.logging.structlog import structlog_processors
- global _warnings_showwarning
- if _warnings_showwarning is not None:
- warnings.showwarning = _warnings_showwarning
- _warnings_showwarning = None
-
+ _WarningsInterceptor.reset()
structlog_processors.cache_clear()
logging_processors.cache_clear()
-_warnings_showwarning: Any = None
-
-
def _showwarning(
message: Warning | str,
category: type[Warning],
@@ -288,8 +300,7 @@ def _showwarning(
warnings logger named "py.warnings" with level logging.WARNING.
"""
if file is not None:
- if _warnings_showwarning is not None:
- _warnings_showwarning(message, category, filename, lineno, file, line)
+ _WarningsInterceptor.emit_warning(message, category, filename, lineno, file, line)
else:
from airflow.sdk._shared.logging.structlog import reconfigure_logger