This is an automated email from the ASF dual-hosted git repository. beto pushed a commit to branch engine-manager in repository https://gitbox.apache.org/repos/asf/superset.git
commit e82e06891b0f7a6b88df2b9508d628e1a0c3635f Author: Beto Dealmeida <[email protected]> AuthorDate: Tue Jul 29 20:07:14 2025 -0400 Cleanup locks --- superset/engines/manager.py | 115 +++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 114 insertions(+), 1 deletion(-) diff --git a/superset/engines/manager.py b/superset/engines/manager.py index 04ebe48b0b7..01eae105635 100644 --- a/superset/engines/manager.py +++ b/superset/engines/manager.py @@ -71,8 +71,13 @@ class EngineManager: engines, as well as configuring the pool through the database settings. """ - def __init__(self, mode: EngineModes = EngineModes.NEW) -> None: + def __init__( + self, + mode: EngineModes = EngineModes.NEW, + cleanup_interval: float = 300.0, # 5 minutes default + ) -> None: self.mode = mode + self.cleanup_interval = cleanup_interval self._engines: dict[EngineKey, Engine] = {} self._engine_locks: dict[EngineKey, threading.Lock] = defaultdict( @@ -84,6 +89,25 @@ class EngineManager: threading.Lock ) + # Background cleanup thread management + self._cleanup_thread: threading.Thread | None = None + self._cleanup_stop_event = threading.Event() + self._cleanup_thread_lock = threading.Lock() + + def __del__(self) -> None: + """ + Ensure cleanup thread is stopped when the manager is destroyed. + """ + try: + self.stop_cleanup_thread() + except Exception as ex: + # Avoid exceptions during garbage collection, but log if possible + try: + logger.warning(f"Error stopping cleanup thread: {ex}") + except Exception: # noqa: S110 + # If logging fails during destruction, we can't do anything + pass + @contextmanager def get_engine( self, @@ -387,6 +411,95 @@ class EngineManager: return kwargs + def start_cleanup_thread(self) -> None: + """ + Start the background cleanup thread. + + The thread will periodically clean up abandoned locks at the configured + interval. This is safe to call multiple times - subsequent calls are no-ops. + """ + with self._cleanup_thread_lock: + if self._cleanup_thread is None or not self._cleanup_thread.is_alive(): + self._cleanup_stop_event.clear() + self._cleanup_thread = threading.Thread( + target=self._cleanup_worker, + name=f"EngineManager-cleanup-{id(self)}", + daemon=True, + ) + self._cleanup_thread.start() + logger.info( + f"Started cleanup thread with {self.cleanup_interval}s interval" + ) + + def stop_cleanup_thread(self) -> None: + """ + Stop the background cleanup thread gracefully. + + This will signal the thread to stop and wait for it to finish. + Safe to call even if no thread is running. + """ + with self._cleanup_thread_lock: + if self._cleanup_thread is not None and self._cleanup_thread.is_alive(): + self._cleanup_stop_event.set() + self._cleanup_thread.join(timeout=5.0) # 5 second timeout + if self._cleanup_thread.is_alive(): + logger.warning("Cleanup thread did not stop within timeout") + else: + logger.info("Cleanup thread stopped") + self._cleanup_thread = None + + def _cleanup_worker(self) -> None: + """ + Background thread worker that periodically cleans up abandoned locks. + """ + while not self._cleanup_stop_event.is_set(): + try: + self._cleanup_abandoned_locks() + except Exception: + logger.exception("Error during background cleanup") + + # Use wait() instead of sleep() to allow for immediate shutdown + if self._cleanup_stop_event.wait(timeout=self.cleanup_interval): + break # Stop event was set + + def cleanup(self) -> None: + """ + Public method to manually trigger cleanup of abandoned locks. + + This can be called periodically by external systems to prevent + memory leaks from accumulating locks. + """ + self._cleanup_abandoned_locks() + + def _cleanup_abandoned_locks(self) -> None: + """ + Remove locks for engines and tunnels that no longer exist. + + This prevents memory leaks from accumulating locks in defaultdict + when engines/tunnels are disposed outside of normal cleanup paths. + """ + # Clean up engine locks + active_engine_keys = set(self._engines.keys()) + abandoned_engine_locks = set(self._engine_locks.keys()) - active_engine_keys + for key in abandoned_engine_locks: + self._engine_locks.pop(key, None) + + if abandoned_engine_locks: + logger.debug( + f"Cleaned up {len(abandoned_engine_locks)} abandoned engine locks" + ) + + # Clean up tunnel locks + active_tunnel_keys = set(self._tunnels.keys()) + abandoned_tunnel_locks = set(self._tunnel_locks.keys()) - active_tunnel_keys + for key in abandoned_tunnel_locks: + self._tunnel_locks.pop(key, None) + + if abandoned_tunnel_locks: + logger.debug( + f"Cleaned up {len(abandoned_tunnel_locks)} abandoned tunnel locks" + ) + def _add_disposal_listener(self, engine: Engine, engine_key: EngineKey) -> None: @event.listens_for(engine, "engine_disposed") def on_engine_disposed(engine_instance: Engine) -> None:
