This is an automated email from the ASF dual-hosted git repository.
ash pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 1c104a7d8dc Once again redact JWT tokens in task logs (#55499)
1c104a7d8dc is described below
commit 1c104a7d8dc6018e4121f688c460d509cb41c61c
Author: Ash Berlin-Taylor <[email protected]>
AuthorDate: Thu Sep 11 11:18:53 2025 +0100
Once again redact JWT tokens in task logs (#55499)
The version of redact_jwt that got copied across into the shared library
was
not the working version (it only redacted jwt-like things when the string
started with them) -- I have fixed this by moving across the right version,
and also to prevent future confusion I have removed it, and all the other
now
unused processors from airflow.sdk.log
Fixes #55466
---
.../src/airflow_shared/logging/structlog.py | 5 +-
task-sdk/src/airflow/sdk/log.py | 98 +---------------------
2 files changed, 5 insertions(+), 98 deletions(-)
diff --git a/shared/logging/src/airflow_shared/logging/structlog.py
b/shared/logging/src/airflow_shared/logging/structlog.py
index 6e66c50ab60..bc381dfbc7d 100644
--- a/shared/logging/src/airflow_shared/logging/structlog.py
+++ b/shared/logging/src/airflow_shared/logging/structlog.py
@@ -53,6 +53,7 @@ __all__ = [
"structlog_processors",
]
+JWT_PATTERN = re.compile(r"eyJ[\.A-Za-z0-9-_]*")
LEVEL_TO_FILTERING_LOGGER: dict[int, type[Logger]] = {}
@@ -202,8 +203,8 @@ def logger_name(logger: Any, method_name: Any, event_dict:
EventDict) -> EventDi
# token. Better safe than sorry
def redact_jwt(logger: Any, method_name: str, event_dict: EventDict) ->
EventDict:
for k, v in event_dict.items():
- if isinstance(v, str) and v.startswith("eyJ"):
- event_dict[k] = "eyJ***"
+ if isinstance(v, str):
+ event_dict[k] = re.sub(JWT_PATTERN, "eyJ***", v)
return event_dict
diff --git a/task-sdk/src/airflow/sdk/log.py b/task-sdk/src/airflow/sdk/log.py
index 28b51d08d6d..5923848ec8d 100644
--- a/task-sdk/src/airflow/sdk/log.py
+++ b/task-sdk/src/airflow/sdk/log.py
@@ -17,14 +17,10 @@
# under the License.
from __future__ import annotations
-import itertools
-import logging.config
-import re
-import sys
import warnings
from functools import cache
from pathlib import Path
-from typing import TYPE_CHECKING, Any, BinaryIO, Generic, TextIO, TypeVar
+from typing import TYPE_CHECKING, Any, BinaryIO, TextIO
import structlog
@@ -33,9 +29,7 @@ import structlog
from pydantic import JsonValue # noqa: TC002
if TYPE_CHECKING:
- from collections.abc import Callable
-
- from structlog.typing import EventDict, ExcInfo, FilteringBoundLogger,
Processor
+ from structlog.typing import EventDict, FilteringBoundLogger, Processor
from airflow.logging_config import RemoteLogIO
from airflow.sdk.types import Logger, RuntimeTaskInstanceProtocol as
RuntimeTI
@@ -44,65 +38,6 @@ if TYPE_CHECKING:
__all__ = ["configure_logging", "reset_logging", "mask_secret"]
-JWT_PATTERN = re.compile(r"eyJ[\.A-Za-z0-9-_]*")
-
-
-def exception_group_tracebacks(
- format_exception: Callable[[ExcInfo], list[dict[str, Any]]],
-) -> Processor:
- # Make mypy happy
- if not hasattr(__builtins__, "BaseExceptionGroup"):
- T = TypeVar("T")
-
- class BaseExceptionGroup(Generic[T]):
- exceptions: list[T]
-
- def _exception_group_tracebacks(logger: Any, method_name: Any, event_dict:
EventDict) -> EventDict:
- if exc_info := event_dict.get("exc_info", None):
- group: BaseExceptionGroup[Exception] | None = None
- if exc_info is True:
- # `log.exception('mesg")` case
- exc_info = sys.exc_info()
- if exc_info[0] is None:
- exc_info = None
-
- if (
- isinstance(exc_info, tuple)
- and len(exc_info) == 3
- and isinstance(exc_info[1], BaseExceptionGroup)
- ):
- group = exc_info[1]
- elif isinstance(exc_info, BaseExceptionGroup):
- group = exc_info
-
- if group:
- # Only remove it from event_dict if we handle it
- del event_dict["exc_info"]
- event_dict["exception"] = list(
- itertools.chain.from_iterable(
- format_exception((type(exc), exc, exc.__traceback__))
# type: ignore[attr-defined,arg-type]
- for exc in (*group.exceptions, group)
- )
- )
-
- return event_dict
-
- return _exception_group_tracebacks
-
-
-def logger_name(logger: Any, method_name: Any, event_dict: EventDict) ->
EventDict:
- if logger_name := event_dict.pop("logger_name", None):
- event_dict.setdefault("logger", logger_name)
- return event_dict
-
-
-def redact_jwt(logger: Any, method_name: str, event_dict: EventDict) ->
EventDict:
- for k, v in event_dict.items():
- if isinstance(v, str):
- event_dict[k] = re.sub(JWT_PATTERN, "eyJ***", v)
- return event_dict
-
-
def mask_logs(logger: Any, method_name: str, event_dict: EventDict) ->
EventDict:
from airflow.sdk._shared.secrets_masker import redact
@@ -110,35 +45,6 @@ def mask_logs(logger: Any, method_name: str, event_dict:
EventDict) -> EventDict
return event_dict
-def drop_positional_args(logger: Any, method_name: Any, event_dict: EventDict)
-> EventDict:
- event_dict.pop("positional_args", None)
- return event_dict
-
-
-class StdBinaryStreamHandler(logging.StreamHandler):
- """A logging.StreamHandler that sends logs as binary JSON over the given
stream."""
-
- stream: BinaryIO
-
- def __init__(self, stream: BinaryIO):
- super().__init__(stream)
-
- def emit(self, record: logging.LogRecord):
- try:
- msg = self.format(record)
- buffer = bytearray(msg, "utf-8", "backslashreplace")
-
- buffer += b"\n"
-
- stream = self.stream
- stream.write(buffer)
- self.flush()
- except RecursionError: # See issue 36272
- raise
- except Exception:
- self.handleError(record)
-
-
@cache
def logging_processors(
json_output: bool,