This is an automated email from the ASF dual-hosted git repository.

eladkal pushed a commit to branch v3-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v3-2-test by this push:
     new 868b696f115 [v3-2-test] Fix memory leak in LocalExecutor caused by unreleased file descriptor locks (#65121) (#66887)
868b696f115 is described below

commit 868b696f115d261f6179d60684dce77bca889b06
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Thu May 14 07:40:16 2026 +0300

    [v3-2-test] Fix memory leak in LocalExecutor caused by unreleased file descriptor locks (#65121) (#66887)
    
    This "backports" a change we made to Structlog to make it available in earlier versions
    of Airflow.
    (cherry picked from commit 82b3c924dce923d59115d53367e61cd3ab15dcee)
    
    Co-authored-by: Jeongwoo Do <[email protected]>
---
 shared/logging/src/airflow_shared/logging/structlog.py | 12 ++++++++++++
 1 file changed, 12 insertions(+)

diff --git a/shared/logging/src/airflow_shared/logging/structlog.py b/shared/logging/src/airflow_shared/logging/structlog.py
index 7dddafe3b0a..5441913ec19 100644
--- a/shared/logging/src/airflow_shared/logging/structlog.py
+++ b/shared/logging/src/airflow_shared/logging/structlog.py
@@ -24,6 +24,7 @@ import logging
 import os
 import re
 import sys
+import weakref
 from collections.abc import Callable, Iterable, Mapping, Sequence
 from functools import cache, cached_property, partial
 from pathlib import Path
@@ -593,6 +594,17 @@ def configure_logging(
             text_output = cast("TextIO", output)
         logger_factory = LoggerFactory(NamedWriteLogger, io=text_output)
 
+    # Replace structlog's WRITE_LOCKS dict with a WeakKeyDictionary so entries
+    # for closed file descriptors are garbage-collected instead of leaking.
+    # TODO: drop once structlog ships the upstream fix (tracked for 26.1.0).
+    try:
+        from structlog import _output as _structlog_output
+
+        if isinstance(_structlog_output.WRITE_LOCKS, dict):
+            _structlog_output.WRITE_LOCKS = weakref.WeakKeyDictionary()  # type: ignore[assignment]
+    except Exception:
+        pass
+
     structlog.configure(
         processors=shared_pre_chain + [for_structlog],
         cache_logger_on_first_use=cache_logger_on_first_use,

Reply via email to