This is an automated email from the ASF dual-hosted git repository.

yuqi1129 pushed a commit to branch feat/mcp-governance-task3-6
in repository https://gitbox.apache.org/repos/asf/gravitino.git

commit 4e27ce044d9899f615beee4837295ed3b150ccae
Author: yuqi <[email protected]>
AuthorDate: Wed Jun 10 22:50:51 2026 +0800

    [#11575] refactor(mcp-server): forward raw Authorization header instead of assuming Bearer
    
    To support Gravitino simple authentication (Authorization: Basic base64(user:dummy))
    in addition to OAuth2 Bearer tokens, the per-request HTTP path now forwards the
    incoming Authorization header verbatim to Gravitino rather than extracting and
    re-wrapping a Bearer token. The auth scheme is chosen by the agent and preserved.
    
    - PlainRESTClientOperation / factory: rename the `token` param to `authorization`
      (the full header value, forwarded verbatim).
    - GravitinoContext: the static --token is still wrapped as `Bearer {token}` (OAuth2);
      the per-request path forwards the raw header unchanged (Basic, Bearer, and
      Negotiate values are all passed through).
    - audit._extract_principal: decode `Basic base64(user:secret)` to the username,
      so audit records attribute simple-auth requests to the user named in the
      header instead of `anonymous`.
    - Update unit tests accordingly (110 passing).
    
    This is a prerequisite for the live-Gravitino authorization integration test.
---
 mcp-server/mcp_server/client/factory.py            |  8 +-
 .../client/plain/plain_rest_client_operation.py    | 16 +++-
 mcp-server/mcp_server/core/audit.py                | 22 ++++--
 mcp-server/mcp_server/core/context.py              | 43 +++++------
 mcp-server/tests/unit/test_audit.py                | 17 ++++
 mcp-server/tests/unit/test_auth_flow.py            | 32 +++++---
 mcp-server/tests/unit/test_per_request_token.py    | 90 ++++++++++------------
 mcp-server/tests/unit/tools/mock_operation.py      |  2 +-
 8 files changed, 136 insertions(+), 94 deletions(-)

diff --git a/mcp-server/mcp_server/client/factory.py 
b/mcp-server/mcp_server/client/factory.py
index 2e5489d053..201625540d 100644
--- a/mcp-server/mcp_server/client/factory.py
+++ b/mcp-server/mcp_server/client/factory.py
@@ -29,7 +29,7 @@ class RESTClientFactory:
 
     @classmethod
     def create_rest_client(
-        cls, metalake_name: str, uri: str, token: str = ""
+        cls, metalake_name: str, uri: str, authorization: str = ""
     ) -> "PlainRESTClientOperation":
         """
         Create a new rest client instance with the specified parameters.
@@ -37,12 +37,14 @@ class RESTClientFactory:
         Args:
             metalake_name: Name of the metalake
             uri: URI of the Gravitino server endpoint
-            token: Bearer token for Authorization header (empty = anonymous)
+            authorization: Full Authorization header value forwarded verbatim
+                (e.g. "Bearer <token>" or "Basic <base64(user:dummy)>").
+                Empty string means anonymous.
 
         Returns:
             New instance of the configured rest client class
         """
-        return cls._rest_client_class(metalake_name, uri, token)
+        return cls._rest_client_class(metalake_name, uri, authorization)
 
     @classmethod
     def set_rest_client(cls, rest_client_class: type) -> None:
diff --git a/mcp-server/mcp_server/client/plain/plain_rest_client_operation.py 
b/mcp-server/mcp_server/client/plain/plain_rest_client_operation.py
index 8bb8b51aa3..6a612dd946 100644
--- a/mcp-server/mcp_server/client/plain/plain_rest_client_operation.py
+++ b/mcp-server/mcp_server/client/plain/plain_rest_client_operation.py
@@ -62,10 +62,20 @@ from mcp_server.client.topic_operation import TopicOperation
 
 # pylint: disable=too-many-instance-attributes
 class PlainRESTClientOperation(GravitinoOperation):
-    def __init__(self, metalake_name: str, uri: str, token: str = ""):
+    def __init__(self, metalake_name: str, uri: str, authorization: str = ""):
+        """Create a REST client for one identity.
+
+        Args:
+            metalake_name: Name of the metalake.
+            uri: Gravitino server URI.
+            authorization: Full ``Authorization`` header value forwarded 
verbatim
+                on every request (for example ``"Bearer <token>"`` for OAuth2 
or
+                ``"Basic <base64(user:dummy)>"`` for simple auth). Empty string
+                means anonymous (no header sent).
+        """
         headers = {}
-        if token:
-            headers["Authorization"] = f"Bearer {token}"
+        if authorization:
+            headers["Authorization"] = authorization
         _rest_client = httpx.AsyncClient(base_url=uri, headers=headers)
         self._catalog_operation = PlainRESTClientCatalogOperation(
             metalake_name, _rest_client
diff --git a/mcp-server/mcp_server/core/audit.py 
b/mcp-server/mcp_server/core/audit.py
index f65b71cdff..fffd3592f2 100644
--- a/mcp-server/mcp_server/core/audit.py
+++ b/mcp-server/mcp_server/core/audit.py
@@ -15,6 +15,8 @@
 # specific language governing permissions and limitations
 # under the License.
 
+import base64
+import binascii
 import json
 import logging
 from datetime import datetime, timezone
@@ -25,15 +27,25 @@ _audit_logger = logging.getLogger("gravitino.mcp.audit")
 def _extract_principal(authorization: str) -> str:
     """Derive a display principal from a raw Authorization header value.
 
-    - "Bearer <token>" → "bearer:<first-8-chars-of-token>"
-    - empty / missing  → "anonymous"
+    - "Basic <base64(user:secret)>" → "<user>"  (Gravitino simple auth)
+    - "Bearer <token>"              → "bearer:<first-8-chars-of-token>"
+    - empty / missing / unparsable  → "anonymous"
     """
     if not authorization:
         return "anonymous"
     parts = authorization.split()
-    if len(parts) == 2 and parts[0].lower() == "bearer":
-        token = parts[1]
-        return f"bearer:{token[:8]}"
+    if len(parts) != 2:
+        return "anonymous"
+    scheme, credential = parts[0].lower(), parts[1]
+    if scheme == "basic":
+        try:
+            decoded = base64.b64decode(credential).decode("utf-8")
+        except (binascii.Error, UnicodeDecodeError, ValueError):
+            return "anonymous"
+        user = decoded.split(":", 1)[0]
+        return user if user else "anonymous"
+    if scheme == "bearer":
+        return f"bearer:{credential[:8]}"
     return "anonymous"
 
 
diff --git a/mcp-server/mcp_server/core/context.py 
b/mcp-server/mcp_server/core/context.py
index f2cde66692..48e4c83614 100644
--- a/mcp-server/mcp_server/core/context.py
+++ b/mcp-server/mcp_server/core/context.py
@@ -19,24 +19,18 @@ from mcp_server.client.factory import RESTClientFactory
 from mcp_server.core.setting import Setting
 
 
-def _extract_bearer_token(authorization: str) -> str:
-    """Parse a Bearer token from a raw Authorization header value."""
-    parts = authorization.split()
-    if len(parts) == 2 and parts[0].lower() == "bearer":
-        return parts[1]
-    return ""
+def _get_request_authorization() -> str:
+    """Return the raw ``Authorization`` header of the current HTTP request.
 
-
-def _get_request_token() -> str:
-    """Extract the Bearer token from the current HTTP request, if any.
-
-    Returns an empty string in stdio mode or when the header is absent.
+    The header is forwarded to Gravitino verbatim so the auth scheme chosen by
+    the agent (``Basic`` for simple auth, ``Bearer`` for OAuth2, ``Negotiate``
+    for Kerberos) is preserved. Returns an empty string in stdio mode or when
+    the header is absent.
     """
     try:
         from fastmcp.server.dependencies import get_http_request
 
-        authorization = get_http_request().headers.get("authorization", "")
-        return _extract_bearer_token(authorization)
+        return get_http_request().headers.get("authorization", "")
     except Exception:  # noqa: BLE001 – stdio mode or missing request context
         return ""
 
@@ -44,28 +38,33 @@ def _get_request_token() -> str:
 class GravitinoContext:
     def __init__(self, setting: Setting):
         self._setting = setting
-        # Fallback client for stdio mode or when no per-request token is 
present.
+        # Static startup identity: the --token CLI value is treated as a Bearer
+        # token (OAuth2). Used in stdio mode or as the fallback when an HTTP
+        # request carries no Authorization header.
+        default_authorization = (
+            f"Bearer {setting.token}" if setting.token else ""
+        )
         self._default_client = RESTClientFactory.create_rest_client(
-            setting.metalake, setting.gravitino_uri, setting.token
+            setting.metalake, setting.gravitino_uri, default_authorization
         )
 
     def rest_client(self):
         """Return a REST client carrying the correct identity for this request.
 
-        In HTTP transport mode the Bearer token from the incoming MCP request's
-        Authorization header takes priority over the static startup token.
-        This ensures concurrent sessions with different principals are fully
-        isolated — one principal's token never leaks into another's Gravitino 
calls.
+        In HTTP transport mode the incoming request's ``Authorization`` header 
is
+        forwarded verbatim to Gravitino, taking priority over the static 
startup
+        token. This keeps concurrent sessions with different principals fully
+        isolated — one principal's identity never leaks into another's calls.
 
         Falls back to the shared default client (static startup token) when:
         - running in stdio mode (no HTTP request context), or
         - the incoming request carries no Authorization header.
         """
-        request_token = _get_request_token()
-        if request_token:
+        authorization = _get_request_authorization()
+        if authorization:
             return RESTClientFactory.create_rest_client(
                 self._setting.metalake,
                 self._setting.gravitino_uri,
-                request_token,
+                authorization,
             )
         return self._default_client
diff --git a/mcp-server/tests/unit/test_audit.py 
b/mcp-server/tests/unit/test_audit.py
index c52a09a391..24bca0efee 100644
--- a/mcp-server/tests/unit/test_audit.py
+++ b/mcp-server/tests/unit/test_audit.py
@@ -108,6 +108,23 @@ class TestExtractPrincipal(unittest.TestCase):
     def test_short_token_uses_full_token(self):
         self.assertEqual(audit._extract_principal("Bearer abc"), "bearer:abc")
 
+    def test_basic_auth_decodes_user(self):
+        """Simple auth header 'Basic base64(alice:dummy)' -> principal 
'alice'."""
+        # base64("alice:dummy") == "YWxpY2U6ZHVtbXk="
+        self.assertEqual(
+            audit._extract_principal("Basic YWxpY2U6ZHVtbXk="), "alice"
+        )
+
+    def test_basic_auth_invalid_base64_returns_anonymous(self):
+        self.assertEqual(
+            audit._extract_principal("Basic not-valid-base64!!"), "anonymous"
+        )
+
+    def test_unknown_scheme_returns_anonymous(self):
+        self.assertEqual(
+            audit._extract_principal("Negotiate abc123"), "anonymous"
+        )
+
 
 class TestAuditMiddlewareIntegration(unittest.TestCase):
     """Integration tests: AuditMiddleware emits records via the full MCP tool 
path."""
diff --git a/mcp-server/tests/unit/test_auth_flow.py 
b/mcp-server/tests/unit/test_auth_flow.py
index ca6b59b671..174e93407f 100644
--- a/mcp-server/tests/unit/test_auth_flow.py
+++ b/mcp-server/tests/unit/test_auth_flow.py
@@ -24,27 +24,39 @@ from mcp_server.core.context import GravitinoContext
 from mcp_server.core.setting import Setting
 
 
-class TestTokenInjection(unittest.TestCase):
-    """Verify Bearer token is correctly injected into the httpx client."""
+class TestAuthorizationInjection(unittest.TestCase):
+    """Verify the Authorization header is forwarded verbatim to the httpx 
client."""
 
-    def test_token_sets_authorization_header(self):
-        """When a token is provided, the httpx client carries Authorization: 
Bearer."""
+    def test_bearer_authorization_header(self):
+        """A Bearer authorization value is forwarded unchanged."""
         client = PlainRESTClientOperation(
-            "my_metalake", "http://localhost:8090";, token="my-secret-token"
+            "my_metalake",
+            "http://localhost:8090";,
+            authorization="Bearer my-secret-token",
         )
         headers = dict(client._catalog_operation.rest_client.headers)
         self.assertEqual(headers.get("authorization"), "Bearer 
my-secret-token")
 
-    def test_empty_token_no_authorization_header(self):
-        """When token is empty, no Authorization header is added."""
+    def test_basic_authorization_header(self):
+        """A Basic authorization value (simple auth) is forwarded unchanged."""
         client = PlainRESTClientOperation(
-            "my_metalake", "http://localhost:8090";, token=""
+            "my_metalake",
+            "http://localhost:8090";,
+            authorization="Basic YWxpY2U6ZHVtbXk=",
+        )
+        headers = dict(client._catalog_operation.rest_client.headers)
+        self.assertEqual(headers.get("authorization"), "Basic 
YWxpY2U6ZHVtbXk=")
+
+    def test_empty_authorization_no_header(self):
+        """When authorization is empty, no Authorization header is added."""
+        client = PlainRESTClientOperation(
+            "my_metalake", "http://localhost:8090";, authorization=""
         )
         headers = dict(client._catalog_operation.rest_client.headers)
         self.assertNotIn("authorization", headers)
 
-    def test_no_token_argument_no_authorization_header(self):
-        """When token argument is omitted entirely, no Authorization header is 
added."""
+    def test_no_authorization_argument_no_header(self):
+        """When authorization argument is omitted, no Authorization header is 
added."""
         client = PlainRESTClientOperation(
             "my_metalake", "http://localhost:8090";
         )
diff --git a/mcp-server/tests/unit/test_per_request_token.py 
b/mcp-server/tests/unit/test_per_request_token.py
index 77ad709ce6..d5e0f0aef2 100644
--- a/mcp-server/tests/unit/test_per_request_token.py
+++ b/mcp-server/tests/unit/test_per_request_token.py
@@ -15,12 +15,11 @@
 # specific language governing permissions and limitations
 # under the License.
 
-"""Tests for per-request token isolation (Task 6).
+"""Tests for per-request identity isolation (Task 6).
 
-GravitinoContext.rest_client() must return a client carrying the token from
-the current HTTP request, not the shared startup token, when an Authorization
-header is present.  This ensures concurrent multi-principal sessions are fully
-isolated in HTTP transport mode.
+GravitinoContext.rest_client() must forward the current HTTP request's raw
+Authorization header (any scheme) to Gravitino, not the shared startup token,
+so concurrent multi-principal sessions stay fully isolated in HTTP mode.
 """
 
 import unittest
@@ -28,47 +27,38 @@ from unittest.mock import MagicMock, patch
 
 from mcp_server.core.context import (
     GravitinoContext,
-    _extract_bearer_token,
-    _get_request_token,
+    _get_request_authorization,
 )
 from mcp_server.core.setting import Setting
 
 
-class TestExtractBearerToken(unittest.TestCase):
-    """Unit tests for the token extraction helper."""
+class TestGetRequestAuthorization(unittest.TestCase):
+    """Unit tests for _get_request_authorization() (HTTP context 
extraction)."""
 
-    def test_well_formed_bearer_header(self):
-        self.assertEqual(
-            _extract_bearer_token("Bearer mytoken123"), "mytoken123"
-        )
-
-    def test_case_insensitive_bearer(self):
-        self.assertEqual(_extract_bearer_token("bearer MYTOKEN"), "MYTOKEN")
-
-    def test_empty_header_returns_empty(self):
-        self.assertEqual(_extract_bearer_token(""), "")
-
-    def test_non_bearer_scheme_returns_empty(self):
-        self.assertEqual(_extract_bearer_token("Basic dXNlcjpwYXNz"), "")
-
-    def test_only_scheme_no_token_returns_empty(self):
-        self.assertEqual(_extract_bearer_token("Bearer"), "")
+    def test_returns_raw_bearer_header(self):
+        mock_request = MagicMock()
+        mock_request.headers.get.return_value = "Bearer request-token-xyz"
 
+        with patch(
+            "fastmcp.server.dependencies.get_http_request",
+            return_value=mock_request,
+        ):
+            authorization = _get_request_authorization()
 
-class TestGetRequestToken(unittest.TestCase):
-    """Unit tests for _get_request_token() (HTTP context extraction)."""
+        self.assertEqual(authorization, "Bearer request-token-xyz")
 
-    def test_returns_token_when_http_request_available(self):
+    def test_returns_raw_basic_header_verbatim(self):
+        """Basic (simple auth) headers must pass through unchanged, not be 
dropped."""
         mock_request = MagicMock()
-        mock_request.headers.get.return_value = "Bearer request-token-xyz"
+        mock_request.headers.get.return_value = "Basic YWxpY2U6ZHVtbXk="
 
         with patch(
             "fastmcp.server.dependencies.get_http_request",
             return_value=mock_request,
         ):
-            token = _get_request_token()
+            authorization = _get_request_authorization()
 
-        self.assertEqual(token, "request-token-xyz")
+        self.assertEqual(authorization, "Basic YWxpY2U6ZHVtbXk=")
 
     def test_returns_empty_when_no_http_context(self):
         """Simulates stdio mode where get_http_request raises LookupError."""
@@ -76,9 +66,9 @@ class TestGetRequestToken(unittest.TestCase):
             "fastmcp.server.dependencies.get_http_request",
             side_effect=LookupError("no request context"),
         ):
-            token = _get_request_token()
+            authorization = _get_request_authorization()
 
-        self.assertEqual(token, "")
+        self.assertEqual(authorization, "")
 
     def test_returns_empty_when_no_authorization_header(self):
         mock_request = MagicMock()
@@ -88,13 +78,13 @@ class TestGetRequestToken(unittest.TestCase):
             "fastmcp.server.dependencies.get_http_request",
             return_value=mock_request,
         ):
-            token = _get_request_token()
+            authorization = _get_request_authorization()
 
-        self.assertEqual(token, "")
+        self.assertEqual(authorization, "")
 
 
-class TestGravitinoContextPerRequestToken(unittest.TestCase):
-    """GravitinoContext.rest_client() isolates per-request tokens."""
+class TestGravitinoContextPerRequestAuthorization(unittest.TestCase):
+    """GravitinoContext.rest_client() isolates per-request identities."""
 
     def _make_context(self, startup_token: str = "") -> GravitinoContext:
         return GravitinoContext(
@@ -105,12 +95,12 @@ class 
TestGravitinoContextPerRequestToken(unittest.TestCase):
             )
         )
 
-    def test_per_request_token_overrides_startup_token(self):
-        """When an HTTP request carries a token, it takes priority over the 
startup token."""
+    def test_per_request_header_overrides_startup_token(self):
+        """An HTTP request's Authorization header takes priority over the 
startup token."""
         ctx = self._make_context(startup_token="startup-token")
 
         mock_request = MagicMock()
-        mock_request.headers.get.return_value = "Bearer request-token"
+        mock_request.headers.get.return_value = "Basic YWxpY2U6ZHVtbXk="
 
         with patch(
             "fastmcp.server.dependencies.get_http_request",
@@ -119,10 +109,10 @@ class 
TestGravitinoContextPerRequestToken(unittest.TestCase):
             client = ctx.rest_client()
 
         headers = dict(client._catalog_operation.rest_client.headers)
-        self.assertEqual(headers.get("authorization"), "Bearer request-token")
+        self.assertEqual(headers.get("authorization"), "Basic 
YWxpY2U6ZHVtbXk=")
 
-    def test_falls_back_to_default_client_when_no_request_token(self):
-        """When no per-request token exists, the shared default client 
(startup token) is used."""
+    def test_falls_back_to_default_client_when_no_request_header(self):
+        """With no per-request header, the shared default client (startup 
token) is used."""
         ctx = self._make_context(startup_token="startup-token")
 
         with patch(
@@ -135,23 +125,23 @@ class 
TestGravitinoContextPerRequestToken(unittest.TestCase):
         self.assertIs(client, ctx._default_client)
 
     def test_two_concurrent_requests_get_different_clients(self):
-        """Different request tokens must produce different client instances."""
+        """Different request identities must produce different client 
instances."""
         ctx = self._make_context()
 
-        def make_mock(token: str) -> MagicMock:
+        def make_mock(authorization: str) -> MagicMock:
             m = MagicMock()
-            m.headers.get.return_value = f"Bearer {token}"
+            m.headers.get.return_value = authorization
             return m
 
         with patch(
             "fastmcp.server.dependencies.get_http_request",
-            return_value=make_mock("alice-token"),
+            return_value=make_mock("Basic YWxpY2U6ZHVtbXk="),
         ):
             client_alice = ctx.rest_client()
 
         with patch(
             "fastmcp.server.dependencies.get_http_request",
-            return_value=make_mock("bob-token"),
+            return_value=make_mock("Basic Ym9iOmR1bW15"),
         ):
             client_bob = ctx.rest_client()
 
@@ -160,7 +150,7 @@ class 
TestGravitinoContextPerRequestToken(unittest.TestCase):
         )
         bob_headers = dict(client_bob._catalog_operation.rest_client.headers)
         self.assertEqual(
-            alice_headers.get("authorization"), "Bearer alice-token"
+            alice_headers.get("authorization"), "Basic YWxpY2U6ZHVtbXk="
         )
-        self.assertEqual(bob_headers.get("authorization"), "Bearer bob-token")
+        self.assertEqual(bob_headers.get("authorization"), "Basic 
Ym9iOmR1bW15")
         self.assertIsNot(client_alice, client_bob)
diff --git a/mcp-server/tests/unit/tools/mock_operation.py 
b/mcp-server/tests/unit/tools/mock_operation.py
index eb6c1f2cff..1f69e9afa7 100644
--- a/mcp-server/tests/unit/tools/mock_operation.py
+++ b/mcp-server/tests/unit/tools/mock_operation.py
@@ -31,7 +31,7 @@ from mcp_server.client.statistic_operation import 
StatisticOperation
 
 
 class MockOperation(GravitinoOperation):
-    def __init__(self, metalake, uri, token=""):
+    def __init__(self, metalake, uri, authorization=""):
         pass
 
     def as_table_operation(self) -> TableOperation:

Reply via email to