This is an automated email from the ASF dual-hosted git repository.

rusackas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/superset.git


The following commit(s) were added to refs/heads/master by this push:
     new 7f54b0b13d0 test(database): regression test for sqla engine creation (#27897) (#40237)
7f54b0b13d0 is described below

commit 7f54b0b13d0bc8a304a0ffc2284682192127c8a4
Author: Evan Rusackas <[email protected]>
AuthorDate: Fri May 29 21:47:49 2026 -0700

    test(database): regression test for sqla engine creation (#27897) (#40237)
    
    Co-authored-by: Claude <[email protected]>
    Co-authored-by: Claude Sonnet 4.6 <[email protected]>
---
 superset/models/core.py              | 71 +++++++++++++++++++++++++--
 tests/unit_tests/models/core_test.py | 94 ++++++++++++++++++++++++++++++++++++
 2 files changed, 162 insertions(+), 3 deletions(-)

diff --git a/superset/models/core.py b/superset/models/core.py
index 1f99630ab3d..0d22bd9b29b 100755
--- a/superset/models/core.py
+++ b/superset/models/core.py
@@ -24,6 +24,7 @@ from __future__ import annotations
 import builtins
 import logging
 import textwrap
+import threading
 from ast import literal_eval
 from contextlib import closing, contextmanager, nullcontext, suppress
 from copy import deepcopy
@@ -56,7 +57,7 @@ from sqlalchemy.engine.reflection import Inspector
 from sqlalchemy.engine.url import URL
 from sqlalchemy.exc import NoSuchModuleError
 from sqlalchemy.ext.hybrid import hybrid_property
-from sqlalchemy.orm import relationship
+from sqlalchemy.orm import Mapper, relationship
 from sqlalchemy.pool import NullPool
 from sqlalchemy.schema import UniqueConstraint
 from sqlalchemy.sql import ColumnElement, expression, Select
@@ -94,6 +95,15 @@ from superset.utils.oauth2 import (
 metadata = Model.metadata  # pylint: disable=no-member
 logger = logging.getLogger(__name__)

+# Per-process SQLAlchemy engine cache (#27897), keyed on (database_id,
+# rendered sqlalchemy_url, repr(sorted(engine_kwargs.items()))). The lock
+# guards each dict read/write but is not held across create_engine(), so
+# concurrent first callers may each build an engine. A changed URL or final
+# engine_kwargs (e.g. from DB_CONNECTION_MUTATOR) yields a new key; note that
+# SQLAlchemy 2.x ``str(URL)`` masks the password. No size bound is applied.
+_ENGINE_CACHE: dict[tuple[int, str, str], Engine] = {}
+_ENGINE_CACHE_LOCK = threading.Lock()
+
 if TYPE_CHECKING:
     from superset_core.queries.types import AsyncQueryHandle, QueryOptions, QueryResult

@@ -495,7 +505,12 @@ class Database(CoreDatabase, AuditMixinNullable, ImportExportMixin):  # pylint:
                                 cursor.close()

                         sqla.event.listen(engine, "connect", run_prequeries)
-                    yield engine
+                        try:  # NOTE(review): listener is engine-wide, not per-caller
+                            yield engine
+                        finally:
+                            sqla.event.remove(engine, "connect", run_prequeries)
+                    else:
+                        yield engine
 
     def _get_sqla_engine(  # pylint: disable=too-many-locals  # noqa: C901
         self,
@@ -565,10 +580,36 @@ class Database(CoreDatabase, AuditMixinNullable, ImportExportMixin):  # pylint:
                 security_manager,
                 source,
             )
+        # Per-process engine cache (#27897). SQLAlchemy expects ``create_engine``
+        # to be called once per process per URL so its connection pool can do
+        # its job; recreating the engine on every call defeats pools configured
+        # via ``DB_CONNECTION_MUTATOR`` (e.g. duckdb with a size-1 queue). Cache
+        # regardless of ``nullpool``: production callsites pass ``nullpool=True``
+        # by default, and even a NullPool engine costs URL parsing, dialect
+        # resolution and connect_args setup. Unsaved instances (``self.id is
+        # None``) are skipped so two in-memory ``Database`` objects with the same
+        # URI never share an entry. The key renders the URL *with* its password,
+        # since ``str(URL)`` masks it on SQLAlchemy 2.x and a rotated password
+        # could otherwise hit the old engine. ``setdefault`` keeps the first
+        # engine stored, so threads racing past the lookup converge on one pool.
+        cache_key: tuple[int, str, str] | None = None
+        if self.id is not None:
+            cache_key = (
+                self.id,
+                sqlalchemy_url.render_as_string(hide_password=False),
+                repr(sorted(engine_kwargs.items())),
+            )
+            with _ENGINE_CACHE_LOCK:
+                if cached := _ENGINE_CACHE.get(cache_key):
+                    return cached
         try:
-            return create_engine(sqlalchemy_url, **engine_kwargs)
+            engine = create_engine(sqlalchemy_url, **engine_kwargs)
         except Exception as ex:
             raise self.db_engine_spec.get_dbapi_mapped_exception(ex) from ex
+        if cache_key is not None:
+            with _ENGINE_CACHE_LOCK:
+                engine = _ENGINE_CACHE.setdefault(cache_key, engine)
+        return engine

     def add_database_to_signature(
         self,
@@ -1343,6 +1384,30 @@ sqla.event.listen(Database, "after_update", security_manager.database_after_upda
 sqla.event.listen(Database, "after_delete", security_manager.database_after_delete)
 
 
+def _evict_engine_cache(
+    mapper: Mapper,
+    connection: Connection,
+    target: "Database",
+) -> None:
+    """Drop and dispose this process's cached engines for ``target``.
+
+    Disposal closes pooled connections now rather than leaving them to GC.
+    Other worker processes are not notified and keep their own entries.
+    """
+    if target.id is None:
+        return
+    with _ENGINE_CACHE_LOCK:
+        stale = [key for key in _ENGINE_CACHE if key[0] == target.id]
+        engines = [_ENGINE_CACHE.pop(key) for key in stale]
+    # Dispose outside the lock; checked-out connections are not interrupted.
+    for engine in engines:
+        engine.dispose()
+
+
+sqla.event.listen(Database, "after_update", _evict_engine_cache)
+sqla.event.listen(Database, "after_delete", _evict_engine_cache)
+
+
 class DatabaseUserOAuth2Tokens(Model, AuditMixinNullable):
     """
     Store OAuth2 tokens, for authenticating to DBs using user personal tokens.
diff --git a/tests/unit_tests/models/core_test.py b/tests/unit_tests/models/core_test.py
index 31df840e445..8d15b23077b 100644
--- a/tests/unit_tests/models/core_test.py
+++ b/tests/unit_tests/models/core_test.py
@@ -536,6 +536,97 @@ def test_get_sqla_engine(mocker: MockerFixture) -> None:
     )
 
 
+def test_get_sqla_engine_caches_engine_per_url(mocker: MockerFixture) -> None:
+    """
+    Regression for #27897: a single SQLAlchemy ``Engine`` should be created per
+    process/URL, not on every ``_get_sqla_engine`` call.
+
+    Per the SQLAlchemy docs (https://docs.sqlalchemy.org/en/20/core/connections.html),
+    the engine is meant to be created once and reused so its connection pool
+    can do its job. Calling ``create_engine`` repeatedly defeats pooling, so
+    user-configured pools (e.g. via ``DB_CONNECTION_MUTATOR``) never persist
+    state between requests.
+
+    Exercises the production default path (``nullpool=True``) — every
+    in-tree callsite uses it — so the assertion would have caught a fix
+    that only engaged under ``nullpool=False``.
+    """
+    from superset.models.core import _ENGINE_CACHE, Database
+
+    # Empty the process-wide cache for this test only; restored on teardown.
+    mocker.patch.dict(_ENGINE_CACHE, clear=True)
+
+    mocker.patch(
+        "superset.models.core.security_manager.find_user",
+        return_value=None,
+    )
+    create_engine = mocker.patch("superset.models.core.create_engine")
+
+    database = Database(database_name="my_db", sqlalchemy_uri="trino://")
+    database.id = 1  # Cache is keyed on id; skipped for unsaved instances.
+    database._get_sqla_engine()
+    database._get_sqla_engine()
+
+    assert create_engine.call_count == 1, (
+        "Database._get_sqla_engine should reuse the engine for the same URL "
+        f"(create_engine called {create_engine.call_count} times)"
+    )
+
+
+def test_get_sqla_engine_does_not_cache_unsaved_instances(
+    mocker: MockerFixture,
+) -> None:
+    """
+    Two distinct unsaved ``Database`` instances (``id is None``) with the
+    same URI must not share a cache entry — they're different in-memory
+    objects and may have diverging config that isn't yet persisted.
+    """
+    from superset.models.core import _ENGINE_CACHE, Database
+
+    mocker.patch.dict(_ENGINE_CACHE, clear=True)
+    mocker.patch(
+        "superset.models.core.security_manager.find_user",
+        return_value=None,
+    )
+    create_engine = mocker.patch("superset.models.core.create_engine")
+
+    Database(database_name="db_a", sqlalchemy_uri="trino://")._get_sqla_engine()
+    Database(database_name="db_b", sqlalchemy_uri="trino://")._get_sqla_engine()
+
+    assert create_engine.call_count == 2
+    assert _ENGINE_CACHE == {}
+
+
+def test_engine_cache_evicted_on_update_and_delete(mocker: MockerFixture) -> None:
+    """
+    Regression for #27897: engines cached for a database must be evicted when
+    that database is updated or deleted so that stale connections (old password,
+    old host, old SSH tunnel) do not linger in memory across config changes.
+    """
+    from unittest.mock import MagicMock
+
+    from superset.models.core import (
+        _ENGINE_CACHE,
+        _ENGINE_CACHE_LOCK,
+        _evict_engine_cache,
+    )
+
+    # Seed an isolated cache (restored on teardown): two id=1 entries, one id=2.
+    mocker.patch.dict(_ENGINE_CACHE, clear=True)
+    with _ENGINE_CACHE_LOCK:
+        _ENGINE_CACHE[(1, "postgresql://old-host/db", "")] = MagicMock()
+        _ENGINE_CACHE[(1, "postgresql://new-host/db", "")] = MagicMock()
+        _ENGINE_CACHE[(2, "postgresql://other/db", "")] = MagicMock()
+
+    db_instance = MagicMock()
+    db_instance.id = 1
+    _evict_engine_cache(mapper=None, connection=None, target=db_instance)
+
+    # Both id=1 entries gone; id=2 entry untouched.
+    assert not any(k[0] == 1 for k in _ENGINE_CACHE)
+    assert any(k[0] == 2 for k in _ENGINE_CACHE)
+
+
 def test_get_sqla_engine_user_impersonation(mocker: MockerFixture) -> None:
     """
     Test user impersonation in `_get_sqla_engine`.
@@ -637,6 +728,7 @@ def test_get_sqla_engine_registers_prequery_event_listener(
     db_engine_spec = mocker.patch.object(Database, "db_engine_spec")
     db_engine_spec.get_prequeries.return_value = ['SET search_path = "my_schema"']
     event_listen = mocker.patch("superset.models.core.sqla.event.listen")
+    mocker.patch("superset.models.core.sqla.event.remove")  # listen is mocked
 
     database = Database(database_name="my_db", sqlalchemy_uri="postgresql://")
     with database.get_sqla_engine(catalog="my_catalog", schema="my_schema"):
@@ -671,6 +763,7 @@ def test_get_sqla_engine_prequery_cursor_closed_on_exception(
     db_engine_spec = mocker.patch.object(Database, "db_engine_spec")
     db_engine_spec.get_prequeries.return_value = ['SET search_path = "bad_schema"']
     event_listen = mocker.patch("superset.models.core.sqla.event.listen")
+    mocker.patch("superset.models.core.sqla.event.remove")  # listen is mocked
 
     database = Database(database_name="my_db", sqlalchemy_uri="postgresql://")
     with database.get_sqla_engine(catalog=None, schema="bad_schema"):
@@ -734,6 +827,7 @@ def test_get_raw_connection_executes_prequeries_exactly_once(
     original_listen.side_effect = lambda engine, event, fn: captured_listeners.append(
         fn
     )
+    mocker.patch("superset.models.core.sqla.event.remove")  # listen is mocked
 
     # Simulate SQLAlchemy firing the "connect" event when raw_connection() is called.
     mock_dbapi_conn = mocker.MagicMock()

Reply via email to