This is an automated email from the ASF dual-hosted git repository.
eladkal pushed a commit to branch v3-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v3-2-test by this push:
new 868b696f115 [v3-2-test] Fix memory leak in LocalExecutor caused by
unreleased file descriptor locks (#65121) (#66887)
868b696f115 is described below
commit 868b696f115d261f6179d60684dce77bca889b06
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Thu May 14 07:40:16 2026 +0300
[v3-2-test] Fix memory leak in LocalExecutor caused by unreleased file
descriptor locks (#65121) (#66887)
This "backports" a change we made to structlog so that it is also available
in earlier versions of Airflow.
(cherry picked from commit 82b3c924dce923d59115d53367e61cd3ab15dcee)
Co-authored-by: Jeongwoo Do <[email protected]>
---
shared/logging/src/airflow_shared/logging/structlog.py | 12 ++++++++++++
1 file changed, 12 insertions(+)
diff --git a/shared/logging/src/airflow_shared/logging/structlog.py
b/shared/logging/src/airflow_shared/logging/structlog.py
index 7dddafe3b0a..5441913ec19 100644
--- a/shared/logging/src/airflow_shared/logging/structlog.py
+++ b/shared/logging/src/airflow_shared/logging/structlog.py
@@ -24,6 +24,7 @@ import logging
import os
import re
import sys
+import weakref
from collections.abc import Callable, Iterable, Mapping, Sequence
from functools import cache, cached_property, partial
from pathlib import Path
@@ -593,6 +594,17 @@ def configure_logging(
text_output = cast("TextIO", output)
logger_factory = LoggerFactory(NamedWriteLogger, io=text_output)
+ # Replace structlog's WRITE_LOCKS dict with a WeakKeyDictionary so that lock
+ # entries for file objects that are no longer referenced are dropped
+ # automatically instead of accumulating (and leaking) forever.
+ # TODO: drop once structlog ships the upstream fix (tracked for 26.1.0).
+ try:
+ from structlog import _output as _structlog_output
+
+ if isinstance(_structlog_output.WRITE_LOCKS, dict):
+ _structlog_output.WRITE_LOCKS = weakref.WeakKeyDictionary()  # type: ignore[assignment]
+ except Exception:
+ pass
+
structlog.configure(
processors=shared_pre_chain + [for_structlog],
cache_logger_on_first_use=cache_logger_on_first_use,