This is an automated email from the ASF dual-hosted git repository.

aminghadersohi pushed a commit to branch work-pr-39604
in repository https://gitbox.apache.org/repos/asf/superset.git

commit 0bc8c8de268f53d436edc2e624ee1bfa45e2cf1e
Author: Amin Ghadersohi <[email protected]>
AuthorDate: Fri May 8 14:26:05 2026 -0400

    fix(mcp): validate API keys via FastMCP AccessToken and lock down ApiKey 
perms
    
    Three independent bugs let MCP requests presenting Bearer tokens with the
    sst_ prefix authenticate as MCP_DEV_USERNAME without any validation under
    streamable-http:
    
    1. _resolve_user_from_api_key read the token from flask.request.headers,
       but the streamable-http transport never pushes a Flask request context
       — has_request_context() was always False, so the function returned
       None before validating, falling through to the dev-user fallback.
       Now reads the token from FastMCP's per-request AccessToken (which the
       CompositeTokenVerifier already populated) and fails closed when the
       key is invalid.
    
    2. CompositeTokenVerifier was only installed when MCP_AUTH_ENABLED=True.
       With FAB_API_KEY_ENABLED=True alone, no transport-level verifier
       existed at all. The factory now builds an API-key-only verifier in
       that case (jwt_verifier=None) that rejects non-API-key Bearer tokens
       at the transport instead of silently accepting them.
    
    3. The pass-through AccessToken was minted with scopes=[], which would
       make FastMCP's RequireAuthMiddleware 403 every API-key request when
       MCP_REQUIRED_SCOPES is non-empty. Pass-through now propagates
       self.required_scopes.
    
    Also addresses Daniel's review comment on superset/security/manager.py:
    adds "ApiKey" to ADMIN_ONLY_VIEW_MENUS so the FAB ApiKeyApi PVMs are
    gated to Admin instead of leaking to Alpha and Gamma.
    
    Renames the pass-through claim from _api_key_passthrough to the
    namespaced _superset_mcp_api_key_passthrough (exported as
    API_KEY_PASSTHROUGH_CLAIM) so a custom claim from an external IdP can't
    accidentally divert a JWT into the API-key validation path.
    
    Tests updated to mock get_access_token instead of app.test_request_context
    (the simulated Flask context was the reason the prior tests passed while
    production failed). New tests cover API-key-only verifier mode, scope
    propagation on pass-through, and the namespaced-claim isolation.
---
 superset/mcp_service/auth.py                       |  61 +++--
 superset/mcp_service/composite_token_verifier.py   |  36 ++-
 superset/mcp_service/mcp_config.py                 | 141 +++++++-----
 superset/mcp_service/server.py                     |   8 +-
 superset/security/manager.py                       |   4 +
 tests/unit_tests/mcp_service/test_auth_api_key.py  | 253 +++++++++++++--------
 .../mcp_service/test_composite_token_verifier.py   |  53 ++++-
 7 files changed, 365 insertions(+), 191 deletions(-)

diff --git a/superset/mcp_service/auth.py b/superset/mcp_service/auth.py
index 5320821d88a..04b2e802a66 100644
--- a/superset/mcp_service/auth.py
+++ b/superset/mcp_service/auth.py
@@ -288,8 +288,12 @@ def _resolve_user_from_jwt_context(app: Any) -> User | 
None:
     # API key pass-through: CompositeTokenVerifier accepted this token
     # at the transport layer but defers actual validation to
     # _resolve_user_from_api_key() (priority 2 in get_user_from_request).
+    from superset.mcp_service.composite_token_verifier import (
+        API_KEY_PASSTHROUGH_CLAIM,
+    )
+
     claims = getattr(access_token, "claims", None)
-    if isinstance(claims, dict) and claims.get("_api_key_passthrough"):
+    if isinstance(claims, dict) and claims.get(API_KEY_PASSTHROUGH_CLAIM):
         logger.debug("API key pass-through token detected, deferring to API 
key auth")
         return None
 
@@ -323,37 +327,53 @@ def _resolve_user_from_jwt_context(app: Any) -> User | 
None:
 
 def _resolve_user_from_api_key(app: Any) -> User | None:
     """
-    Resolve the current user from an API key in the Authorization header.
+    Resolve the current user from an API key passed via Bearer token.
 
-    Uses FAB SecurityManager's API key validation. Only attempts when
-    FAB_API_KEY_ENABLED is True and a request context is active.
+    Reads the token from FastMCP's per-request ``AccessToken`` (set by
+    ``CompositeTokenVerifier`` when a Bearer token matches an API key
+    prefix). The streamable-http transport does not push a Flask request
+    context, so we cannot rely on ``flask.request`` headers — the verifier
+    already saw the token and stashed it on the ``AccessToken``.
 
     Returns:
-        User object with relationships loaded, or None if no API key present
-        or API key auth is not enabled/available.
+        User object with relationships loaded, or None if no API key
+        pass-through token is present or API key auth is not enabled.
 
     Raises:
-        PermissionError: If an API key is present but invalid/expired,
-            or if validation is not available in this FAB version.
+        PermissionError: If an API key pass-through token is present but
+            invalid/expired (fail closed — do NOT fall through to weaker
+            auth sources like ``MCP_DEV_USERNAME``), or if validation is
+            not available in this FAB version.
     """
-    if not app.config.get("FAB_API_KEY_ENABLED", False) or not 
has_request_context():
+    if not app.config.get("FAB_API_KEY_ENABLED", False):
         return None
 
-    sm = app.appbuilder.sm
-    # extract_api_key_from_request is FAB's method for reading
-    # the Bearer token from the Authorization header and matching prefixes.
-    # Not all FAB versions include this method, so guard with hasattr.
-    if not hasattr(sm, "extract_api_key_from_request"):
-        logger.debug(
-            "FAB SecurityManager does not have extract_api_key_from_request; "
-            "API key authentication is not available in this FAB version"
-        )
+    try:
+        from fastmcp.server.dependencies import get_access_token
+    except ImportError:
+        logger.debug("fastmcp.server.dependencies not available, skipping API 
key auth")
+        return None
+
+    access_token = get_access_token()
+    if access_token is None:
         return None
 
-    api_key_string = sm.extract_api_key_from_request()
-    if api_key_string is None:
+    # Only validate tokens that the CompositeTokenVerifier flagged as
+    # API key pass-throughs. Plain JWTs were already validated by the JWT
+    # verifier and resolved in _resolve_user_from_jwt_context.
+    from superset.mcp_service.composite_token_verifier import (
+        API_KEY_PASSTHROUGH_CLAIM,
+    )
+
+    claims = getattr(access_token, "claims", None)
+    if not (isinstance(claims, dict) and 
claims.get(API_KEY_PASSTHROUGH_CLAIM)):
+        return None
+
+    api_key_string = getattr(access_token, "token", None)
+    if not api_key_string:
         return None
 
+    sm = app.appbuilder.sm
     if not hasattr(sm, "validate_api_key"):
         logger.warning(
             "FAB SecurityManager does not have validate_api_key; "
@@ -535,7 +555,6 @@ def _setup_user_context() -> User | None:
     # tool calls when no per-request middleware refreshes it.
     # Only clear in app-context-only mode; preserve g.user when
     # a request context is active (external middleware set it).
-    from flask import has_request_context
 
     if not has_request_context():
         g.pop("user", None)
diff --git a/superset/mcp_service/composite_token_verifier.py 
b/superset/mcp_service/composite_token_verifier.py
index e4f5c6a5c43..e404e364373 100644
--- a/superset/mcp_service/composite_token_verifier.py
+++ b/superset/mcp_service/composite_token_verifier.py
@@ -22,7 +22,9 @@ Routes Bearer tokens to the appropriate verifier based on 
prefix:
 - Tokens matching FAB_API_KEY_PREFIXES (e.g. ``sst_``) are passed through
   to the Flask layer where ``_resolve_user_from_api_key()`` handles
   actual validation via FAB SecurityManager.
-- All other tokens are delegated to the wrapped JWT verifier.
+- All other tokens are delegated to the wrapped JWT verifier (when one is
+  configured); when no JWT verifier is configured, non-API-key tokens are
+  rejected at the transport layer.
 """
 
 import logging
@@ -32,6 +34,12 @@ from fastmcp.server.auth.providers.jwt import TokenVerifier
 
 logger = logging.getLogger(__name__)
 
+# Namespaced claim that flags an AccessToken as an API-key pass-through.
+# Namespacing avoids collision with custom claims an external IdP might
+# happen to mint on a JWT — a plain ``_api_key_passthrough`` claim could
+# be silently misidentified as a Superset API-key request.
+API_KEY_PASSTHROUGH_CLAIM = "_superset_mcp_api_key_passthrough"
+
 
 class CompositeTokenVerifier(TokenVerifier):
     """Routes Bearer tokens between API key pass-through and JWT verification.
@@ -43,18 +51,21 @@ class CompositeTokenVerifier(TokenVerifier):
 
     Args:
         jwt_verifier: The wrapped JWT verifier for non-API-key tokens.
+            When ``None``, only API-key tokens are accepted; all other
+            Bearer tokens are rejected at the transport layer (used when
+            ``MCP_AUTH_ENABLED=False`` but ``FAB_API_KEY_ENABLED=True``).
         api_key_prefixes: List of prefixes that identify API key tokens
             (e.g. ``["sst_"]``).
     """
 
     def __init__(
         self,
-        jwt_verifier: TokenVerifier,
+        jwt_verifier: TokenVerifier | None,
         api_key_prefixes: list[str],
     ) -> None:
         super().__init__(
             base_url=getattr(jwt_verifier, "base_url", None),
-            required_scopes=jwt_verifier.required_scopes,
+            required_scopes=getattr(jwt_verifier, "required_scopes", None) or 
[],
         )
         self._jwt_verifier = jwt_verifier
         self._api_key_prefixes = tuple(api_key_prefixes)
@@ -66,15 +77,28 @@ class CompositeTokenVerifier(TokenVerifier):
         AccessToken with a ``_api_key_passthrough`` claim. The Flask-layer
         ``_resolve_user_from_api_key()`` performs the real validation.
 
-        Otherwise, delegate to the wrapped JWT verifier.
+        Otherwise, delegate to the wrapped JWT verifier when one is
+        configured; if no JWT verifier is configured, reject the token.
         """
         if any(token.startswith(prefix) for prefix in self._api_key_prefixes):
             logger.debug("API key token detected (prefix match), passing 
through")
+            # Populate ``scopes`` from ``self.required_scopes`` so FastMCP's
+            # ``RequireAuthMiddleware`` (transport-layer scope check) is
+            # satisfied for API-key requests. Without this, MCP_REQUIRED_SCOPES
+            # being non-empty would 403 every API-key call before
+            # ``_resolve_user_from_api_key`` even runs.
             return AccessToken(
                 token=token,
                 client_id="api_key",
-                scopes=[],
-                claims={"_api_key_passthrough": True},
+                scopes=list(self.required_scopes or []),
+                claims={API_KEY_PASSTHROUGH_CLAIM: True},
+            )
+
+        if self._jwt_verifier is None:
+            logger.debug(
+                "Bearer token does not match any API key prefix and no JWT "
+                "verifier is configured; rejecting"
             )
+            return None
 
         return await self._jwt_verifier.verify_token(token)
diff --git a/superset/mcp_service/mcp_config.py 
b/superset/mcp_service/mcp_config.py
index a0d9ae89336..45b4bbd8eac 100644
--- a/superset/mcp_service/mcp_config.py
+++ b/superset/mcp_service/mcp_config.py
@@ -304,71 +304,96 @@ MCP_TOOL_SEARCH_CONFIG: Dict[str, Any] = {
 
 
 def create_default_mcp_auth_factory(app: Flask) -> Optional[Any]:
-    """Default MCP auth factory using app.config values."""
-    if not app.config.get("MCP_AUTH_ENABLED", False):
-        return None
+    """Default MCP auth factory using app.config values.
 
-    jwks_uri = app.config.get("MCP_JWKS_URI")
-    public_key = app.config.get("MCP_JWT_PUBLIC_KEY")
-    secret = app.config.get("MCP_JWT_SECRET")
+    Returns an auth provider when ``MCP_AUTH_ENABLED=True`` (JWT verifier,
+    optionally wrapped with ``CompositeTokenVerifier`` for API keys) or
+    when only ``FAB_API_KEY_ENABLED=True`` (API-key-only verifier that
+    rejects all non-API-key Bearer tokens at the transport).
+    """
+    auth_enabled = app.config.get("MCP_AUTH_ENABLED", False)
+    api_key_enabled = app.config.get("FAB_API_KEY_ENABLED", False)
 
-    if not (jwks_uri or public_key or secret):
-        logger.warning("MCP_AUTH_ENABLED is True but no JWT keys/secret 
configured")
+    if not (auth_enabled or api_key_enabled):
         return None
 
-    try:
-        debug_errors = app.config.get("MCP_JWT_DEBUG_ERRORS", False)
+    jwt_verifier: Any | None = None
 
-        common_kwargs: dict[str, Any] = {
-            "issuer": app.config.get("MCP_JWT_ISSUER"),
-            "audience": app.config.get("MCP_JWT_AUDIENCE"),
-            "required_scopes": app.config.get("MCP_REQUIRED_SCOPES", []),
-        }
+    if auth_enabled:
+        jwks_uri = app.config.get("MCP_JWKS_URI")
+        public_key = app.config.get("MCP_JWT_PUBLIC_KEY")
+        secret = app.config.get("MCP_JWT_SECRET")
 
-        # For HS256 (symmetric), use the secret as the public_key parameter
-        if app.config.get("MCP_JWT_ALGORITHM") == "HS256" and secret:
-            common_kwargs["public_key"] = secret
-            common_kwargs["algorithm"] = "HS256"
-        else:
-            # For RS256 (asymmetric), use public key or JWKS
-            common_kwargs["jwks_uri"] = jwks_uri
-            common_kwargs["public_key"] = public_key
-            common_kwargs["algorithm"] = app.config.get("MCP_JWT_ALGORITHM", 
"RS256")
-
-        if debug_errors:
-            # DetailedJWTVerifier: detailed server-side logging of JWT
-            # validation failures. HTTP responses are always generic per
-            # RFC 6750 Section 3.1.
-            from superset.mcp_service.jwt_verifier import DetailedJWTVerifier
-
-            auth_provider = DetailedJWTVerifier(**common_kwargs)
+        if not (jwks_uri or public_key or secret):
+            logger.warning("MCP_AUTH_ENABLED is True but no JWT keys/secret 
configured")
+            if not api_key_enabled:
+                return None
         else:
-            # Default JWTVerifier: minimal logging, generic error responses.
-            from fastmcp.server.auth.providers.jwt import JWTVerifier
-
-            auth_provider = JWTVerifier(**common_kwargs)
-
-        # Wrap with CompositeTokenVerifier when API key auth is enabled
-        # so that API key tokens (e.g. sst_...) pass through the transport
-        # layer instead of being rejected by the JWT verifier.
-        if app.config.get("FAB_API_KEY_ENABLED", False):
-            from superset.mcp_service.composite_token_verifier import (
-                CompositeTokenVerifier,
-            )
-
-            api_key_prefixes = app.config.get("FAB_API_KEY_PREFIXES", ["sst_"])
-            auth_provider = CompositeTokenVerifier(
-                jwt_verifier=auth_provider,
-                api_key_prefixes=api_key_prefixes,
-            )
-            logger.info("API key auth enabled for MCP")
-
-        return auth_provider
-    except Exception:
-        # Do not log the exception — it may contain the HS256 secret
-        # from common_kwargs["public_key"]
-        logger.error("Failed to create MCP auth provider")
-        return None
+            try:
+                jwt_verifier = _build_jwt_verifier(
+                    app=app,
+                    jwks_uri=jwks_uri,
+                    public_key=public_key,
+                    secret=secret,
+                )
+            except Exception:
+                # Do not log the exception — it may contain secrets
+                logger.error("Failed to create MCP JWT verifier")
+                if not api_key_enabled:
+                    return None
+
+    if api_key_enabled:
+        from superset.mcp_service.composite_token_verifier import (
+            CompositeTokenVerifier,
+        )
+
+        api_key_prefixes = app.config.get("FAB_API_KEY_PREFIXES", ["sst_"])
+        logger.info("API key auth enabled for MCP")
+        return CompositeTokenVerifier(
+            jwt_verifier=jwt_verifier,
+            api_key_prefixes=api_key_prefixes,
+        )
+
+    return jwt_verifier
+
+
+def _build_jwt_verifier(
+    app: Flask,
+    jwks_uri: Optional[str],
+    public_key: Optional[str],
+    secret: Optional[str],
+) -> Any:
+    """Construct the JWT verifier from configured keys/secret."""
+    debug_errors = app.config.get("MCP_JWT_DEBUG_ERRORS", False)
+
+    common_kwargs: Dict[str, Any] = {
+        "issuer": app.config.get("MCP_JWT_ISSUER"),
+        "audience": app.config.get("MCP_JWT_AUDIENCE"),
+        "required_scopes": app.config.get("MCP_REQUIRED_SCOPES", []),
+    }
+
+    # For HS256 (symmetric), use the secret as the public_key parameter
+    if app.config.get("MCP_JWT_ALGORITHM") == "HS256" and secret:
+        common_kwargs["public_key"] = secret
+        common_kwargs["algorithm"] = "HS256"
+    else:
+        # For RS256 (asymmetric), use public key or JWKS
+        common_kwargs["jwks_uri"] = jwks_uri
+        common_kwargs["public_key"] = public_key
+        common_kwargs["algorithm"] = app.config.get("MCP_JWT_ALGORITHM", 
"RS256")
+
+    if debug_errors:
+        # DetailedJWTVerifier: detailed server-side logging of JWT
+        # validation failures. HTTP responses are always generic per
+        # RFC 6750 Section 3.1.
+        from superset.mcp_service.jwt_verifier import DetailedJWTVerifier
+
+        return DetailedJWTVerifier(**common_kwargs)
+
+    # Default JWTVerifier: minimal logging, generic error responses.
+    from fastmcp.server.auth.providers.jwt import JWTVerifier
+
+    return JWTVerifier(**common_kwargs)
 
 
 def default_user_resolver(app: Any, access_token: Any) -> str | None:
diff --git a/superset/mcp_service/server.py b/superset/mcp_service/server.py
index dc4bffbb598..ab6fb04a2ca 100644
--- a/superset/mcp_service/server.py
+++ b/superset/mcp_service/server.py
@@ -665,7 +665,9 @@ def _create_auth_provider(flask_app: Any) -> Any | None:
     """Create an auth provider from Flask app config.
 
     Tries MCP_AUTH_FACTORY first, then falls back to the default factory
-    when MCP_AUTH_ENABLED is True.
+    when either ``MCP_AUTH_ENABLED`` (JWT auth) or ``FAB_API_KEY_ENABLED``
+    (API key auth) is True. The default factory builds a
+    ``CompositeTokenVerifier`` that handles either or both auth modes.
     """
     auth_provider = None
     if auth_factory := flask_app.config.get("MCP_AUTH_FACTORY"):
@@ -678,7 +680,9 @@ def _create_auth_provider(flask_app: Any) -> Any | None:
         except Exception:
             # Do not log the exception — it may contain secrets
             logger.error("Failed to create auth provider from 
MCP_AUTH_FACTORY")
-    elif flask_app.config.get("MCP_AUTH_ENABLED", False):
+    elif flask_app.config.get("MCP_AUTH_ENABLED", False) or 
flask_app.config.get(
+        "FAB_API_KEY_ENABLED", False
+    ):
         from superset.mcp_service.mcp_config import (
             create_default_mcp_auth_factory,
         )
diff --git a/superset/security/manager.py b/superset/security/manager.py
index 44c23f86597..19a8cf8c522 100644
--- a/superset/security/manager.py
+++ b/superset/security/manager.py
@@ -462,6 +462,10 @@ class SupersetSecurityManager(  # pylint: 
disable=too-many-public-methods
         "PermissionViewMenu",
         "ViewMenu",
         "User",
+        # FAB ApiKeyApi blueprint (active when FAB_API_KEY_ENABLED=True).
+        # Listed unconditionally — harmless when the feature is off because
+        # no PVMs exist under this view menu.
+        "ApiKey",
     } | USER_MODEL_VIEWS
 
     ALPHA_ONLY_VIEW_MENUS = {
diff --git a/tests/unit_tests/mcp_service/test_auth_api_key.py 
b/tests/unit_tests/mcp_service/test_auth_api_key.py
index b53e624e83a..11717ae0edc 100644
--- a/tests/unit_tests/mcp_service/test_auth_api_key.py
+++ b/tests/unit_tests/mcp_service/test_auth_api_key.py
@@ -15,8 +15,15 @@
 # specific language governing permissions and limitations
 # under the License.
 
-"""Tests for API key authentication in get_user_from_request()."""
+"""Tests for API key authentication in get_user_from_request().
 
+The streamable-http transport does not push a Flask request context, so
+``_resolve_user_from_api_key`` reads the token from FastMCP's per-request
+``AccessToken`` (populated by ``CompositeTokenVerifier``) rather than from
+``flask.request``. These tests mock ``get_access_token`` accordingly.
+"""
+
+from collections.abc import Generator
 from unittest.mock import MagicMock, patch
 
 import pytest
@@ -27,17 +34,34 @@ from superset.mcp_service.auth import (
     _resolve_user_from_jwt_context,
     get_user_from_request,
 )
+from superset.mcp_service.composite_token_verifier import 
API_KEY_PASSTHROUGH_CLAIM
 
 
 @pytest.fixture
-def mock_user():
+def mock_user() -> MagicMock:
     user = MagicMock()
     user.username = "api_key_user"
     return user
 
 
+def _passthrough_access_token(token: str) -> MagicMock:
+    """Build an AccessToken matching what CompositeTokenVerifier emits."""
+    access_token = MagicMock()
+    access_token.token = token
+    access_token.claims = {API_KEY_PASSTHROUGH_CLAIM: True}
+    return access_token
+
+
+def _patch_access_token(access_token: MagicMock | None):
+    """Patch get_access_token where _resolve_user_from_api_key imports it."""
+    return patch(
+        "fastmcp.server.dependencies.get_access_token",
+        return_value=access_token,
+    )
+
+
 @pytest.fixture
-def _enable_api_keys(app):
+def _enable_api_keys(app: SupersetApp) -> Generator[None, None, None]:
     """Enable FAB API key auth and clear MCP_DEV_USERNAME so the API key
     path is exercised instead of falling through to the dev-user fallback."""
     app.config["FAB_API_KEY_ENABLED"] = True
@@ -49,7 +73,7 @@ def _enable_api_keys(app):
 
 
 @pytest.fixture
-def _disable_api_keys(app):
+def _disable_api_keys(app: SupersetApp) -> Generator[None, None, None]:
     app.config["FAB_API_KEY_ENABLED"] = False
     old_dev = app.config.pop("MCP_DEV_USERNAME", None)
     yield
@@ -62,20 +86,22 @@ def _disable_api_keys(app):
 
 
 @pytest.mark.usefixtures("_enable_api_keys")
-def test_valid_api_key_returns_user(app, mock_user) -> None:
-    """A valid API key should authenticate and return the user."""
+def test_valid_api_key_returns_user(app: SupersetApp, mock_user: MagicMock) -> 
None:
+    """A valid API key pass-through token should authenticate and return the 
user."""
     mock_sm = MagicMock()
-    mock_sm.extract_api_key_from_request.return_value = "sst_abc123"
     mock_sm.validate_api_key.return_value = mock_user
 
-    with app.test_request_context(headers={"Authorization": "Bearer 
sst_abc123"}):
+    with app.app_context():
         g.user = None
         app.appbuilder = MagicMock()
         app.appbuilder.sm = mock_sm
 
-        with patch(
-            "superset.mcp_service.auth.load_user_with_relationships",
-            return_value=mock_user,
+        with (
+            _patch_access_token(_passthrough_access_token("sst_abc123")),
+            patch(
+                "superset.mcp_service.auth.load_user_with_relationships",
+                return_value=mock_user,
+            ),
         ):
             result = get_user_from_request()
 
@@ -83,54 +109,61 @@ def test_valid_api_key_returns_user(app, mock_user) -> 
None:
     mock_sm.validate_api_key.assert_called_once_with("sst_abc123")
 
 
-# -- Invalid API key -> PermissionError --
+# -- Invalid API key -> PermissionError (does not silently fall back) --
 
 
 @pytest.mark.usefixtures("_enable_api_keys")
-def test_invalid_api_key_raises(app) -> None:
-    """An invalid API key should raise PermissionError."""
+def test_invalid_api_key_raises(app: SupersetApp) -> None:
+    """An invalid API key pass-through token should raise PermissionError
+    (fail closed — do NOT fall through to MCP_DEV_USERNAME)."""
     mock_sm = MagicMock()
-    mock_sm.extract_api_key_from_request.return_value = "sst_bad_key"
     mock_sm.validate_api_key.return_value = None
 
-    with app.test_request_context(headers={"Authorization": "Bearer 
sst_bad_key"}):
-        g.user = None
-        app.appbuilder = MagicMock()
-        app.appbuilder.sm = mock_sm
+    # The dangerous fallthrough scenario: dev username IS set, but the
+    # request presented an invalid API key. The dev fallback must not
+    # mask the rejection.
+    app.config["MCP_DEV_USERNAME"] = "admin"
+    try:
+        with app.app_context():
+            g.user = None
+            app.appbuilder = MagicMock()
+            app.appbuilder.sm = mock_sm
 
-        with pytest.raises(PermissionError, match="Invalid or expired API 
key"):
-            get_user_from_request()
+            with _patch_access_token(_passthrough_access_token("sst_bad_key")):
+                with pytest.raises(PermissionError, match="Invalid or expired 
API key"):
+                    get_user_from_request()
+    finally:
+        app.config.pop("MCP_DEV_USERNAME", None)
 
 
 # -- API key disabled -> falls through to next auth method --
 
 
 @pytest.mark.usefixtures("_disable_api_keys")
-def test_api_key_disabled_skips_auth(app) -> None:
-    """When FAB_API_KEY_ENABLED is False, API key auth is skipped entirely."""
+def test_api_key_disabled_skips_auth(app: SupersetApp) -> None:
+    """When FAB_API_KEY_ENABLED is False, API key auth is skipped entirely
+    even if an AccessToken is present."""
     mock_sm = MagicMock()
 
-    with app.test_request_context(headers={"Authorization": "Bearer 
sst_abc123"}):
+    with app.app_context():
         g.user = None
         app.appbuilder = MagicMock()
         app.appbuilder.sm = mock_sm
 
-        # Without API key auth or MCP_DEV_USERNAME, should raise ValueError
-        # about no authenticated user (not about invalid API key)
-        with pytest.raises(ValueError, match="No authenticated user found"):
-            get_user_from_request()
+        with _patch_access_token(_passthrough_access_token("sst_abc123")):
+            with pytest.raises(ValueError, match="No authenticated user 
found"):
+                get_user_from_request()
 
-    # SecurityManager API key methods should never be called
-    mock_sm.extract_api_key_from_request.assert_not_called()
+    mock_sm.validate_api_key.assert_not_called()
 
 
-# -- No request context -> API key auth skipped --
+# -- No AccessToken -> API key auth skipped --
 
 
 @pytest.mark.usefixtures("_enable_api_keys")
-def test_no_request_context_skips_api_key_auth(app) -> None:
-    """Without a request context, API key auth should be skipped
-    (e.g., during MCP tool discovery with only an app context)."""
+def test_no_access_token_skips_api_key_auth(app: SupersetApp) -> None:
+    """Without a FastMCP AccessToken (e.g., MCP_AUTH_ENABLED=False and no
+    auth provider installed), API key auth is skipped."""
     mock_sm = MagicMock()
 
     with app.app_context():
@@ -138,20 +171,20 @@ def test_no_request_context_skips_api_key_auth(app) -> 
None:
         app.appbuilder = MagicMock()
         app.appbuilder.sm = mock_sm
 
-        # Explicitly mock has_request_context to False because the test
-        # framework's app fixture may implicitly provide a request context.
-        with patch("superset.mcp_service.auth.has_request_context", 
return_value=False):
+        with _patch_access_token(None):
             with pytest.raises(ValueError, match="No authenticated user 
found"):
                 get_user_from_request()
 
-    mock_sm.extract_api_key_from_request.assert_not_called()
+    mock_sm.validate_api_key.assert_not_called()
 
 
 # -- g.user fallback when no higher-priority auth succeeds --
 
 
 @pytest.mark.usefixtures("_disable_api_keys")
-def test_g_user_fallback_when_no_jwt_or_api_key(app, mock_user) -> None:
+def test_g_user_fallback_when_no_jwt_or_api_key(
+    app: SupersetApp, mock_user: MagicMock
+) -> None:
     """When no JWT or API key auth succeeds and MCP_DEV_USERNAME is not set,
     g.user (set by external middleware) is used as fallback."""
     with app.test_request_context():
@@ -162,106 +195,97 @@ def test_g_user_fallback_when_no_jwt_or_api_key(app, 
mock_user) -> None:
     assert result.username == "api_key_user"
 
 
-# -- FAB version without extract_api_key_from_request --
-
-
[email protected]("_enable_api_keys")
-def test_fab_without_extract_method_skips_gracefully(app) -> None:
-    """If FAB SecurityManager lacks extract_api_key_from_request,
-    API key auth should be skipped with a debug log, not crash."""
-    mock_sm = MagicMock(spec=[])  # empty spec = no attributes
-
-    with app.test_request_context():
-        g.user = None
-        app.appbuilder = MagicMock()
-        app.appbuilder.sm = mock_sm
-
-        with pytest.raises(ValueError, match="No authenticated user found"):
-            get_user_from_request()
-
-
 # -- FAB version without validate_api_key --
 
 
 @pytest.mark.usefixtures("_enable_api_keys")
-def test_fab_without_validate_method_raises(app) -> None:
-    """If FAB has extract_api_key_from_request but not validate_api_key,
-    should raise PermissionError about unavailable validation."""
-    mock_sm = MagicMock(spec=["extract_api_key_from_request"])
-    mock_sm.extract_api_key_from_request.return_value = "sst_abc123"
+def test_fab_without_validate_method_raises(app: SupersetApp) -> None:
+    """If FAB SecurityManager lacks validate_api_key, should raise
+    PermissionError about unavailable validation."""
+    mock_sm = MagicMock(spec=[])  # empty spec = no attributes
 
-    with app.test_request_context(headers={"Authorization": "Bearer 
sst_abc123"}):
+    with app.app_context():
         g.user = None
         app.appbuilder = MagicMock()
         app.appbuilder.sm = mock_sm
 
-        with pytest.raises(
-            PermissionError, match="API key validation is not available"
-        ):
-            get_user_from_request()
+        with _patch_access_token(_passthrough_access_token("sst_abc123")):
+            with pytest.raises(
+                PermissionError, match="API key validation is not available"
+            ):
+                get_user_from_request()
 
 
 # -- Relationship reload fallback --
 
 
 @pytest.mark.usefixtures("_enable_api_keys")
-def test_relationship_reload_failure_returns_original_user(app, mock_user) -> 
None:
+def test_relationship_reload_failure_returns_original_user(
+    app: SupersetApp, mock_user: MagicMock
+) -> None:
     """If load_user_with_relationships fails, the original user from
     validate_api_key should be returned as fallback."""
     mock_sm = MagicMock()
-    mock_sm.extract_api_key_from_request.return_value = "sst_abc123"
     mock_sm.validate_api_key.return_value = mock_user
 
-    with app.test_request_context(headers={"Authorization": "Bearer 
sst_abc123"}):
+    with app.app_context():
         g.user = None
         app.appbuilder = MagicMock()
         app.appbuilder.sm = mock_sm
 
-        with patch(
-            "superset.mcp_service.auth.load_user_with_relationships",
-            return_value=None,
+        with (
+            _patch_access_token(_passthrough_access_token("sst_abc123")),
+            patch(
+                "superset.mcp_service.auth.load_user_with_relationships",
+                return_value=None,
+            ),
         ):
             result = get_user_from_request()
 
     assert result is mock_user
 
 
-# -- Bearer token present but not matching API key prefix --
+# -- AccessToken without passthrough claim (plain JWT) -> skip API key auth --
 
 
 @pytest.mark.usefixtures("_enable_api_keys")
-def test_non_matching_bearer_token_skips_api_key_auth(app: SupersetApp) -> 
None:
-    """When a Bearer token is present but does not match FAB_API_KEY_PREFIXES
-    (e.g., a JWT token), extract_api_key_from_request returns None and API key
-    auth is skipped, falling through to the next auth method."""
+def test_jwt_access_token_skips_api_key_auth(app: SupersetApp) -> None:
+    """When the AccessToken is a plain JWT (no ``_api_key_passthrough`` claim),
+    API key auth is skipped — the JWT was already validated by the JWT
+    verifier and resolved in _resolve_user_from_jwt_context."""
     mock_sm = MagicMock()
-    mock_sm.extract_api_key_from_request.return_value = None
 
-    with app.test_request_context(
-        headers={"Authorization": "Bearer eyJhbGciOiJIUzI1NiJ9.not-an-api-key"}
-    ):
+    jwt_access_token = MagicMock()
+    jwt_access_token.token = "eyJhbGciOiJIUzI1NiJ9.not-an-api-key"  # noqa: 
S105
+    jwt_access_token.claims = {"sub": "alice"}
+
+    with app.app_context():
         g.user = None
         app.appbuilder = MagicMock()
         app.appbuilder.sm = mock_sm
 
-        with pytest.raises(ValueError, match="No authenticated user found"):
-            get_user_from_request()
+        with _patch_access_token(jwt_access_token):
+            # _resolve_user_from_jwt_context will try to resolve the user
+            # from the JWT claims and (in this isolated unit-test setup)
+            # raise ValueError because the username is not a real user.
+            # We assert that _resolve_user_from_api_key did NOT short-circuit
+            # to the API key path.
+            with pytest.raises(ValueError, match="not found"):
+                get_user_from_request()
 
-    # extract was called but returned None, so validate should NOT be called
-    mock_sm.extract_api_key_from_request.assert_called_once()
     mock_sm.validate_api_key.assert_not_called()
 
 
-# -- API key pass-through from CompositeTokenVerifier --
+# -- API key pass-through detection in JWT context resolver --
 
 
 def test_jwt_context_with_api_key_passthrough_returns_none(app: SupersetApp) 
-> None:
     """When CompositeTokenVerifier passes through an API key token,
-    _resolve_user_from_jwt_context should detect the _api_key_passthrough
-    claim and return None so get_user_from_request falls through to
-    _resolve_user_from_api_key."""
+    _resolve_user_from_jwt_context should detect the namespaced
+    pass-through claim and return None so get_user_from_request falls
+    through to _resolve_user_from_api_key."""
     mock_access_token = MagicMock()
-    mock_access_token.claims = {"_api_key_passthrough": True}
+    mock_access_token.claims = {API_KEY_PASSTHROUGH_CLAIM: True}
 
     with patch(
         "fastmcp.server.dependencies.get_access_token",
@@ -272,24 +296,51 @@ def 
test_jwt_context_with_api_key_passthrough_returns_none(app: SupersetApp) ->
     assert result is None
 
 
+# -- Plain JWT with a colliding non-namespaced claim is NOT mistaken for API 
key --
+
+
[email protected]("_enable_api_keys")
+def test_unnamespaced_passthrough_claim_does_not_trigger_api_key_path(
+    app: SupersetApp,
+) -> None:
+    """A JWT minted by an external IdP that happens to include a custom
+    ``_api_key_passthrough`` claim (legacy unnamespaced name) must NOT be
+    treated as an API-key pass-through. Only the namespaced
+    ``API_KEY_PASSTHROUGH_CLAIM`` triggers the API-key path."""
+    mock_sm = MagicMock()
+
+    rogue_token = MagicMock()
+    rogue_token.token = "eyJhbGciOiJSUzI1NiJ9.rogue_jwt"  # noqa: S105
+    rogue_token.claims = {"_api_key_passthrough": True, "sub": "alice"}
+
+    with app.app_context():
+        g.user = None
+        app.appbuilder = MagicMock()
+        app.appbuilder.sm = mock_sm
+
+        with _patch_access_token(rogue_token):
+            # JWT path tries to resolve user "alice" from DB and (in this
+            # isolated unit-test setup) raises ValueError. The assertion
+            # below confirms validate_api_key was never called — i.e., the
+            # rogue claim did NOT divert into _resolve_user_from_api_key.
+            with pytest.raises(ValueError, match="not found"):
+                get_user_from_request()
+
+    mock_sm.validate_api_key.assert_not_called()
+
+
 # -- SecurityManager method name regression test --
 
 
 def test_security_manager_has_expected_api_key_methods(app: SupersetApp) -> 
None:
-    """Regression test: verify the SecurityManager method names referenced in
-    auth._resolve_user_from_api_key() actually exist on the FAB SecurityManager
-    class.  This catches future renames before they silently break API key auth
-    at runtime (see PR #39437: _extract_api_key_from_request vs
-    extract_api_key_from_request)."""
+    """Regression test: verify the SecurityManager method name referenced in
+    auth._resolve_user_from_api_key() actually exists on the FAB
+    SecurityManager class. Catches future renames before they silently break
+    API key auth at runtime (see PR #39437)."""
     with app.app_context():
         from superset import security_manager
 
         sm = security_manager
-        assert hasattr(sm, "extract_api_key_from_request"), (
-            "FAB SecurityManager is missing 'extract_api_key_from_request'. "
-            "auth._resolve_user_from_api_key() references this method by name 
— "
-            "update auth.py if the FAB API changed."
-        )
         assert hasattr(sm, "validate_api_key"), (
             "FAB SecurityManager is missing 'validate_api_key'. "
             "auth._resolve_user_from_api_key() references this method by name 
— "
diff --git a/tests/unit_tests/mcp_service/test_composite_token_verifier.py 
b/tests/unit_tests/mcp_service/test_composite_token_verifier.py
index 8f496305a1c..d493c07ea03 100644
--- a/tests/unit_tests/mcp_service/test_composite_token_verifier.py
+++ b/tests/unit_tests/mcp_service/test_composite_token_verifier.py
@@ -22,7 +22,10 @@ from unittest.mock import AsyncMock, MagicMock
 import pytest
 from fastmcp.server.auth import AccessToken
 
-from superset.mcp_service.composite_token_verifier import 
CompositeTokenVerifier
+from superset.mcp_service.composite_token_verifier import (
+    API_KEY_PASSTHROUGH_CLAIM,
+    CompositeTokenVerifier,
+)
 
 
 @pytest.fixture
@@ -52,7 +55,7 @@ async def test_api_key_token_returns_passthrough(
     assert result is not None
     assert result.token == api_key
     assert result.client_id == "api_key"
-    assert result.claims.get("_api_key_passthrough") is True
+    assert result.claims.get(API_KEY_PASSTHROUGH_CLAIM) is True
 
 
 @pytest.mark.asyncio
@@ -63,7 +66,7 @@ async def test_second_prefix_matches(
     result = await composite_verifier.verify_token("pat_mytoken")
 
     assert result is not None
-    assert result.claims.get("_api_key_passthrough") is True
+    assert result.claims.get(API_KEY_PASSTHROUGH_CLAIM) is True
 
 
 @pytest.mark.asyncio
@@ -109,3 +112,47 @@ async def test_api_key_does_not_call_jwt_verifier(
     await composite_verifier.verify_token("sst_test_key")
 
     mock_jwt_verifier.verify_token.assert_not_awaited()
+
+
+# -- API-key-only mode (no JWT verifier configured) --
+
+
[email protected]
+async def test_api_key_only_mode_accepts_api_keys() -> None:
+    """When jwt_verifier is None, API key tokens are still passed through."""
+    verifier = CompositeTokenVerifier(jwt_verifier=None, 
api_key_prefixes=["sst_"])
+
+    result = await verifier.verify_token("sst_abc123")
+
+    assert result is not None
+    assert result.claims.get(API_KEY_PASSTHROUGH_CLAIM) is True
+
+
[email protected]
+async def test_api_key_only_mode_rejects_non_api_key_tokens() -> None:
+    """When jwt_verifier is None, non-API-key Bearer tokens are rejected at
+    the transport instead of being silently accepted."""
+    verifier = CompositeTokenVerifier(jwt_verifier=None, 
api_key_prefixes=["sst_"])
+
+    result = await verifier.verify_token("eyJhbGciOiJSUzI1NiJ9.jwt_payload")
+
+    assert result is None
+
+
[email protected]
+async def test_api_key_passthrough_propagates_required_scopes() -> None:
+    """The pass-through AccessToken must carry the verifier's required_scopes
+    so FastMCP's transport-level ``RequireAuthMiddleware`` does not 403 the
+    request before ``_resolve_user_from_api_key`` runs."""
+    jwt_verifier = MagicMock()
+    jwt_verifier.required_scopes = ["read", "write"]
+    jwt_verifier.verify_token = AsyncMock()
+
+    verifier = CompositeTokenVerifier(
+        jwt_verifier=jwt_verifier, api_key_prefixes=["sst_"]
+    )
+
+    result = await verifier.verify_token("sst_abc123")
+
+    assert result is not None
+    assert result.scopes == ["read", "write"]


Reply via email to