This is an automated email from the ASF dual-hosted git repository.

beto pushed a commit to branch engine-manager
in repository https://gitbox.apache.org/repos/asf/superset.git

commit e82e06891b0f7a6b88df2b9508d628e1a0c3635f
Author: Beto Dealmeida <[email protected]>
AuthorDate: Tue Jul 29 20:07:14 2025 -0400

    Cleanup locks
---
 superset/engines/manager.py | 115 +++++++++++++++++++++++++++++++++++++++++++-
 1 file changed, 114 insertions(+), 1 deletion(-)

diff --git a/superset/engines/manager.py b/superset/engines/manager.py
index 04ebe48b0b7..01eae105635 100644
--- a/superset/engines/manager.py
+++ b/superset/engines/manager.py
@@ -71,8 +71,13 @@ class EngineManager:
     engines, as well as configuring the pool through the database settings.
     """
 
-    def __init__(self, mode: EngineModes = EngineModes.NEW) -> None:
+    def __init__(
+        self,
+        mode: EngineModes = EngineModes.NEW,
+        cleanup_interval: float = 300.0,  # 5 minutes default
+    ) -> None:
         self.mode = mode
+        self.cleanup_interval = cleanup_interval
 
         self._engines: dict[EngineKey, Engine] = {}
         self._engine_locks: dict[EngineKey, threading.Lock] = defaultdict(
@@ -84,6 +89,25 @@ class EngineManager:
             threading.Lock
         )
 
+        # Background cleanup thread management
+        self._cleanup_thread: threading.Thread | None = None
+        self._cleanup_stop_event = threading.Event()
+        self._cleanup_thread_lock = threading.Lock()
+
+    def __del__(self) -> None:
+        """
+        Ensure cleanup thread is stopped when the manager is destroyed.
+        """
+        try:
+            self.stop_cleanup_thread()
+        except Exception as ex:
+            # Avoid exceptions during garbage collection, but log if possible
+            try:
+                logger.warning(f"Error stopping cleanup thread: {ex}")
+            except Exception:  # noqa: S110
+                # If logging fails during destruction, we can't do anything
+                pass
+
     @contextmanager
     def get_engine(
         self,
@@ -387,6 +411,95 @@ class EngineManager:
 
         return kwargs
 
+    def start_cleanup_thread(self) -> None:
+        """
+        Start the background cleanup thread.
+
+        The thread will periodically clean up abandoned locks at the configured
+        interval. This is safe to call multiple times - subsequent calls are no-ops.
+        """
+        with self._cleanup_thread_lock:
+            if self._cleanup_thread is None or not self._cleanup_thread.is_alive():
+                self._cleanup_stop_event.clear()
+                self._cleanup_thread = threading.Thread(
+                    target=self._cleanup_worker,
+                    name=f"EngineManager-cleanup-{id(self)}",
+                    daemon=True,
+                )
+                self._cleanup_thread.start()
+                logger.info(
+                    f"Started cleanup thread with {self.cleanup_interval}s interval"
+                )
+
+    def stop_cleanup_thread(self) -> None:
+        """
+        Stop the background cleanup thread gracefully.
+
+        This will signal the thread to stop and wait for it to finish.
+        Safe to call even if no thread is running.
+        """
+        with self._cleanup_thread_lock:
+            if self._cleanup_thread is not None and self._cleanup_thread.is_alive():
+                self._cleanup_stop_event.set()
+                self._cleanup_thread.join(timeout=5.0)  # 5 second timeout
+                if self._cleanup_thread.is_alive():
+                    logger.warning("Cleanup thread did not stop within timeout")
+                else:
+                    logger.info("Cleanup thread stopped")
+                self._cleanup_thread = None
+
+    def _cleanup_worker(self) -> None:
+        """
+        Background thread worker that periodically cleans up abandoned locks.
+        """
+        while not self._cleanup_stop_event.is_set():
+            try:
+                self._cleanup_abandoned_locks()
+            except Exception:
+                logger.exception("Error during background cleanup")
+
+            # Use wait() instead of sleep() to allow for immediate shutdown
+            if self._cleanup_stop_event.wait(timeout=self.cleanup_interval):
+                break  # Stop event was set
+
+    def cleanup(self) -> None:
+        """
+        Public method to manually trigger cleanup of abandoned locks.
+
+        This can be called periodically by external systems to prevent
+        memory leaks from accumulating locks.
+        """
+        self._cleanup_abandoned_locks()
+
+    def _cleanup_abandoned_locks(self) -> None:
+        """
+        Remove locks for engines and tunnels that no longer exist.
+
+        This prevents memory leaks from accumulating locks in defaultdict
+        when engines/tunnels are disposed outside of normal cleanup paths.
+        """
+        # Clean up engine locks
+        active_engine_keys = set(self._engines.keys())
+        abandoned_engine_locks = set(self._engine_locks.keys()) - active_engine_keys
+        for key in abandoned_engine_locks:
+            self._engine_locks.pop(key, None)
+
+        if abandoned_engine_locks:
+            logger.debug(
+                f"Cleaned up {len(abandoned_engine_locks)} abandoned engine locks"
+            )
+
+        # Clean up tunnel locks
+        active_tunnel_keys = set(self._tunnels.keys())
+        abandoned_tunnel_locks = set(self._tunnel_locks.keys()) - active_tunnel_keys
+        for key in abandoned_tunnel_locks:
+            self._tunnel_locks.pop(key, None)
+
+        if abandoned_tunnel_locks:
+            logger.debug(
+                f"Cleaned up {len(abandoned_tunnel_locks)} abandoned tunnel locks"
+            )
+
     def _add_disposal_listener(self, engine: Engine, engine_key: EngineKey) -> None:
         @event.listens_for(engine, "engine_disposed")
         def on_engine_disposed(engine_instance: Engine) -> None:

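Below is a minimal usage sketch of the API introduced by this change, assuming only what is visible in the diff (the cleanup_interval constructor argument and the start_cleanup_thread, stop_cleanup_thread, and cleanup methods); the atexit registration is purely illustrative and not part of the commit.

    # Sketch only: wiring the new background lock cleanup into an app lifecycle.
    import atexit

    from superset.engines.manager import EngineManager

    # Run the background lock cleanup every 60 seconds instead of the 300 s default.
    manager = EngineManager(cleanup_interval=60.0)

    # Start the daemon thread that periodically drops abandoned locks.
    # Repeated calls are no-ops per the docstring above.
    manager.start_cleanup_thread()

    # Stop the thread gracefully at interpreter shutdown (safe if it never started).
    atexit.register(manager.stop_cleanup_thread)

    # Cleanup can also be triggered on demand, e.g. from an external scheduler.
    manager.cleanup()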