This is an automated email from the ASF dual-hosted git repository.
rusackas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/superset.git
The following commit(s) were added to refs/heads/master by this push:
new 7f54b0b13d0 test(database): regression test for sqla engine creation
(#27897) (#40237)
7f54b0b13d0 is described below
commit 7f54b0b13d0bc8a304a0ffc2284682192127c8a4
Author: Evan Rusackas <[email protected]>
AuthorDate: Fri May 29 21:47:49 2026 -0700
test(database): regression test for sqla engine creation (#27897) (#40237)
Co-authored-by: Claude <[email protected]>
Co-authored-by: Claude Sonnet 4.6 <[email protected]>
---
superset/models/core.py | 71 +++++++++++++++++++++++++--
tests/unit_tests/models/core_test.py | 94 ++++++++++++++++++++++++++++++++++++
2 files changed, 162 insertions(+), 3 deletions(-)
diff --git a/superset/models/core.py b/superset/models/core.py
index 1f99630ab3d..0d22bd9b29b 100755
--- a/superset/models/core.py
+++ b/superset/models/core.py
@@ -24,6 +24,7 @@ from __future__ import annotations
import builtins
import logging
import textwrap
+import threading
from ast import literal_eval
from contextlib import closing, contextmanager, nullcontext, suppress
from copy import deepcopy
@@ -56,7 +57,7 @@ from sqlalchemy.engine.reflection import Inspector
from sqlalchemy.engine.url import URL
from sqlalchemy.exc import NoSuchModuleError
from sqlalchemy.ext.hybrid import hybrid_property
-from sqlalchemy.orm import relationship
+from sqlalchemy.orm import Mapper, relationship
from sqlalchemy.pool import NullPool
from sqlalchemy.schema import UniqueConstraint
from sqlalchemy.sql import ColumnElement, expression, Select
@@ -94,6 +95,15 @@ from superset.utils.oauth2 import (
metadata = Model.metadata # pylint: disable=no-member
logger = logging.getLogger(__name__)
+# Per-process SQLAlchemy engine cache (#27897). Key is
+# (database_id, str(sqlalchemy_url), repr(sorted(engine_kwargs.items()))).
+# _ENGINE_CACHE_LOCK guards every read/write of the dict in
+# Database._get_sqla_engine and in the update/delete eviction hook, but it is
+# NOT held while create_engine() runs. Two threads that miss concurrently can
+# therefore each build an engine; the later write wins and the earlier engine
+# is simply not cached. Cache is per-process, per-(URL + final engine_kwargs),
+# so a host change, or a DB_CONNECTION_MUTATOR producing different kwargs,
+# falls through to a fresh engine.
+# NOTE(review): password rotation only changes the key if str(URL) renders the
+# password (SQLAlchemy 1.4 does; 2.x masks it by default) — verify on upgrade.
+_ENGINE_CACHE: dict[tuple[int, str, str], Engine] = {}
+_ENGINE_CACHE_LOCK = threading.Lock()
+
if TYPE_CHECKING:
from superset_core.queries.types import AsyncQueryHandle, QueryOptions,
QueryResult
@@ -495,7 +505,12 @@ class Database(CoreDatabase, AuditMixinNullable,
ImportExportMixin): # pylint:
cursor.close()
sqla.event.listen(engine, "connect", run_prequeries)
- yield engine
+ try:
+ yield engine
+ finally:
+ sqla.event.remove(engine, "connect",
run_prequeries)
+ else:
+ yield engine
def _get_sqla_engine( # pylint: disable=too-many-locals # noqa: C901
self,
@@ -565,10 +580,36 @@ class Database(CoreDatabase, AuditMixinNullable,
ImportExportMixin): # pylint:
security_manager,
source,
)
+ # Per-process engine cache (#27897). SQLAlchemy expects
``create_engine``
+ # to be called once per process per URL so its connection pool can do
+ # its job. Recreating the engine every call defeats the pool that
+ # operators configure via ``DB_CONNECTION_MUTATOR`` (e.g. duckdb with a
+ # size-1 queue). Cache regardless of ``nullpool``: even a NullPool
+ # engine has nontrivial construction cost (URL parsing, dialect
+ # resolution, connect_args setup, and re-running the mutator), and
+ # production callsites pass ``nullpool=True`` by default — gating the
+ # cache on ``not nullpool`` would leave it dormant everywhere it
+ # actually matters. Unsaved instances (``self.id is None``) are
+ # excluded so two distinct in-memory ``Database`` objects with the
+ # same URI can't collide on a shared cache entry.
+ cache_key: tuple[int, str, str] | None = None
+ if self.id is not None:
+ cache_key = (
+ self.id,
+ str(sqlalchemy_url),
+ repr(sorted(engine_kwargs.items())),
+ )
+ with _ENGINE_CACHE_LOCK:
+ if cached := _ENGINE_CACHE.get(cache_key):
+ return cached
try:
- return create_engine(sqlalchemy_url, **engine_kwargs)
+ engine = create_engine(sqlalchemy_url, **engine_kwargs)
except Exception as ex:
raise self.db_engine_spec.get_dbapi_mapped_exception(ex) from ex
+ if cache_key is not None:
+ with _ENGINE_CACHE_LOCK:
+ _ENGINE_CACHE[cache_key] = engine
+ return engine
def add_database_to_signature(
self,
@@ -1343,6 +1384,30 @@ sqla.event.listen(Database, "after_update",
security_manager.database_after_upda
sqla.event.listen(Database, "after_delete",
security_manager.database_after_delete)
+def _evict_engine_cache(
+    mapper: Mapper,
+    connection: Connection,
+    target: Database,
+) -> None:
+    """Evict and dispose every cached engine for a database on update/delete.
+
+    Changes to the URL or to the final engine kwargs already produce a new
+    cache key. This hook exists to release resources: without it, engines
+    built for an old host or credentials would stay in ``_ENGINE_CACHE``,
+    together with their pools' open connections, until the process restarts.
+
+    Eviction is per-process. It only affects the cache of the process that
+    flushed the change.
+
+    :param mapper: mapper for ``Database`` (unused; required by the event API)
+    :param connection: connection used for the flush (unused)
+    :param target: the ``Database`` instance being updated or deleted
+    """
+    if target.id is None:
+        # Unsaved instances are never cached, so there is nothing to evict.
+        return
+    with _ENGINE_CACHE_LOCK:
+        stale_keys = [key for key in _ENGINE_CACHE if key[0] == target.id]
+        evicted = [_ENGINE_CACHE.pop(key) for key in stale_keys]
+    # Dispose outside the lock: closing pooled connections may do network I/O
+    # and must not block other threads' cache lookups. Connections currently
+    # checked out by in-flight requests are not closed by dispose().
+    for engine in evicted:
+        try:
+            engine.dispose()
+        except Exception:  # pylint: disable=broad-except
+            # Best-effort cleanup: a failing dispose must not abort the ORM
+            # flush that triggered this hook.
+            logger.warning(
+                "Failed to dispose cached engine for database %s",
+                target.id,
+                exc_info=True,
+            )
+
+
+sqla.event.listen(Database, "after_update", _evict_engine_cache)
+sqla.event.listen(Database, "after_delete", _evict_engine_cache)
+
+
class DatabaseUserOAuth2Tokens(Model, AuditMixinNullable):
"""
Store OAuth2 tokens, for authenticating to DBs using user personal tokens.
diff --git a/tests/unit_tests/models/core_test.py
b/tests/unit_tests/models/core_test.py
index 31df840e445..8d15b23077b 100644
--- a/tests/unit_tests/models/core_test.py
+++ b/tests/unit_tests/models/core_test.py
@@ -536,6 +536,97 @@ def test_get_sqla_engine(mocker: MockerFixture) -> None:
)
+def test_get_sqla_engine_caches_engine_per_url(mocker: MockerFixture) -> None:
+    """
+    Regression for #27897: a single SQLAlchemy ``Engine`` should be created per
+    process/URL, not on every ``_get_sqla_engine`` call.
+
+    Per the SQLAlchemy docs
+    (https://docs.sqlalchemy.org/en/20/core/connections.html), the engine is
+    meant to be created once and reused so its connection pool can do its job.
+    Calling ``create_engine`` repeatedly defeats pooling, so user-configured
+    pools (e.g. via ``DB_CONNECTION_MUTATOR``) never persist state between
+    requests.
+
+    Exercises the production default path (``nullpool=True``) — every
+    in-tree callsite uses it — so the assertion would have caught a fix
+    that only engaged under ``nullpool=False``.
+    """
+    from superset.models.core import _ENGINE_CACHE, Database
+
+    # Start from an empty process-wide cache and restore its original contents
+    # on teardown, so the mocked engine cached here cannot leak into (or be
+    # poisoned by) other tests.
+    mocker.patch.dict(_ENGINE_CACHE, clear=True)
+
+    mocker.patch(
+        "superset.models.core.security_manager.find_user",
+        return_value=None,
+    )
+    create_engine = mocker.patch("superset.models.core.create_engine")
+
+    database = Database(database_name="my_db", sqlalchemy_uri="trino://")
+    database.id = 1  # Cache is keyed on id; skipped for unsaved instances.
+    database._get_sqla_engine()
+    database._get_sqla_engine()
+
+    assert create_engine.call_count == 1, (
+        "Database._get_sqla_engine should reuse the engine for the same URL "
+        f"(create_engine called {create_engine.call_count} times)"
+    )
+    # Exactly one entry, for this database, was cached.
+    assert [key[0] for key in _ENGINE_CACHE] == [1]
+
+
+def test_get_sqla_engine_does_not_cache_unsaved_instances(
+    mocker: MockerFixture,
+) -> None:
+    """
+    Two distinct unsaved ``Database`` instances (``id is None``) with the
+    same URI must not share a cache entry — they're different in-memory
+    objects and may have diverging config that isn't yet persisted.
+    """
+    from superset.models.core import _ENGINE_CACHE, Database
+
+    # Isolate the process-wide cache; original contents are restored on
+    # teardown so this test neither depends on nor pollutes other tests.
+    mocker.patch.dict(_ENGINE_CACHE, clear=True)
+    mocker.patch(
+        "superset.models.core.security_manager.find_user",
+        return_value=None,
+    )
+    create_engine = mocker.patch("superset.models.core.create_engine")
+
+    Database(database_name="db_a", sqlalchemy_uri="trino://")._get_sqla_engine()
+    Database(database_name="db_b", sqlalchemy_uri="trino://")._get_sqla_engine()
+
+    assert create_engine.call_count == 2
+    assert _ENGINE_CACHE == {}
+
+
+def test_engine_cache_evicted_on_update_and_delete(mocker: MockerFixture) -> None:
+    """
+    Regression for #27897: engines cached for a database must be evicted when
+    that database is updated or deleted so that stale connections (old
+    password, old host, old SSH tunnel) do not linger in memory across config
+    changes.
+    """
+    from superset.models.core import (
+        _ENGINE_CACHE,
+        _ENGINE_CACHE_LOCK,
+        _evict_engine_cache,
+    )
+
+    # Isolate the process-wide cache; original contents are restored on
+    # teardown so the entries seeded below do not leak into later tests.
+    mocker.patch.dict(_ENGINE_CACHE, clear=True)
+
+    # Seed the cache with two entries for database id=1 and one for id=2.
+    with _ENGINE_CACHE_LOCK:
+        _ENGINE_CACHE[(1, "postgresql://old-host/db", "")] = mocker.MagicMock()
+        _ENGINE_CACHE[(1, "postgresql://new-host/db", "")] = mocker.MagicMock()
+        _ENGINE_CACHE[(2, "postgresql://other/db", "")] = mocker.MagicMock()
+
+    db_instance = mocker.MagicMock()
+    db_instance.id = 1
+    _evict_engine_cache(mapper=None, connection=None, target=db_instance)
+
+    # Both id=1 entries gone; id=2 entry untouched.
+    assert not any(key[0] == 1 for key in _ENGINE_CACHE)
+    assert list(_ENGINE_CACHE) == [(2, "postgresql://other/db", "")]
+
+    # An unsaved instance (id is None) is a no-op.
+    unsaved = mocker.MagicMock()
+    unsaved.id = None
+    _evict_engine_cache(mapper=None, connection=None, target=unsaved)
+    assert list(_ENGINE_CACHE) == [(2, "postgresql://other/db", "")]
+
+
def test_get_sqla_engine_user_impersonation(mocker: MockerFixture) -> None:
"""
Test user impersonation in `_get_sqla_engine`.
@@ -637,6 +728,7 @@ def test_get_sqla_engine_registers_prequery_event_listener(
db_engine_spec = mocker.patch.object(Database, "db_engine_spec")
db_engine_spec.get_prequeries.return_value = ['SET search_path =
"my_schema"']
event_listen = mocker.patch("superset.models.core.sqla.event.listen")
+ mocker.patch("superset.models.core.sqla.event.remove")
database = Database(database_name="my_db", sqlalchemy_uri="postgresql://")
with database.get_sqla_engine(catalog="my_catalog", schema="my_schema"):
@@ -671,6 +763,7 @@ def
test_get_sqla_engine_prequery_cursor_closed_on_exception(
db_engine_spec = mocker.patch.object(Database, "db_engine_spec")
db_engine_spec.get_prequeries.return_value = ['SET search_path =
"bad_schema"']
event_listen = mocker.patch("superset.models.core.sqla.event.listen")
+ mocker.patch("superset.models.core.sqla.event.remove")
database = Database(database_name="my_db", sqlalchemy_uri="postgresql://")
with database.get_sqla_engine(catalog=None, schema="bad_schema"):
@@ -734,6 +827,7 @@ def
test_get_raw_connection_executes_prequeries_exactly_once(
original_listen.side_effect = lambda engine, event, fn:
captured_listeners.append(
fn
)
+ mocker.patch("superset.models.core.sqla.event.remove")
# Simulate SQLAlchemy firing the "connect" event when raw_connection() is
called.
mock_dbapi_conn = mocker.MagicMock()