This is an automated email from the ASF dual-hosted git repository.

collado pushed a commit to branch mcollado-authn-resolve-roles
in repository https://gitbox.apache.org/repos/asf/polaris.git

commit 5e24f64d5d8a870e00e2aae49887899b807c2bc3
Author: Michael Collado <[email protected]>
AuthorDate: Fri Dec 6 09:36:05 2024 -0800

    Change authentication workflow to look up principal roles using security 
context
---
 dropwizard/service/build.gradle.kts                |   1 +
 .../service/dropwizard/PolarisApplication.java     |  72 +++++++++--
 ...olarisPrincipalRoleSecurityContextProvider.java | 144 +++++++++++++++++++++
 .../config/PolarisApplicationConfig.java           |   4 +-
 .../admin/PolarisAdminServiceAuthzTest.java        |   6 +-
 .../dropwizard/admin/PolarisAuthzTestBase.java     |  44 ++++++-
 .../dropwizard/catalog/BasePolarisCatalogTest.java |  23 +++-
 .../catalog/BasePolarisCatalogViewTest.java        |  11 +-
 .../PolarisCatalogHandlerWrapperAuthzTest.java     |  16 ++-
 .../catalog/PolarisPassthroughResolutionView.java  |  18 +--
 polaris-core/build.gradle.kts                      |   1 +
 .../polaris/core/auth/PolarisGrantManager.java     |  29 +++++
 .../core/persistence/PolarisEntityManager.java     |  17 +--
 .../resolver/PolarisResolutionManifest.java        |  19 ++-
 .../core/persistence/resolver/Resolver.java        | 130 ++++++++++---------
 .../polaris/core/persistence/ResolverTest.java     |  59 ++++++++-
 .../polaris/service/admin/PolarisAdminService.java |  47 ++++---
 .../polaris/service/admin/PolarisServiceImpl.java  |   2 +-
 .../service/catalog/BasePolarisCatalog.java        |   6 +-
 .../service/catalog/IcebergCatalogAdapter.java     |   5 +-
 .../catalog/PolarisCatalogHandlerWrapper.java      |  30 +++--
 .../service/context/CallContextCatalogFactory.java |   2 +
 .../context/PolarisCallContextCatalogFactory.java  |   3 +
 23 files changed, 528 insertions(+), 161 deletions(-)

diff --git a/dropwizard/service/build.gradle.kts 
b/dropwizard/service/build.gradle.kts
index 7381e7f6..b1357f29 100644
--- a/dropwizard/service/build.gradle.kts
+++ b/dropwizard/service/build.gradle.kts
@@ -37,6 +37,7 @@ dependencies {
   implementation("io.dropwizard:dropwizard-core")
   implementation("io.dropwizard:dropwizard-auth")
   implementation("io.dropwizard:dropwizard-json-logging")
+  implementation("org.glassfish.jersey.inject:jersey-hk2:3.0.16")
 
   implementation(platform(libs.iceberg.bom))
   implementation("org.apache.iceberg:iceberg-api")
diff --git 
a/dropwizard/service/src/main/java/org/apache/polaris/service/dropwizard/PolarisApplication.java
 
b/dropwizard/service/src/main/java/org/apache/polaris/service/dropwizard/PolarisApplication.java
index 98c852a3..90b46fcb 100644
--- 
a/dropwizard/service/src/main/java/org/apache/polaris/service/dropwizard/PolarisApplication.java
+++ 
b/dropwizard/service/src/main/java/org/apache/polaris/service/dropwizard/PolarisApplication.java
@@ -37,8 +37,6 @@ import com.fasterxml.jackson.databind.module.SimpleModule;
 import com.fasterxml.jackson.databind.module.SimpleValueInstantiators;
 import com.fasterxml.jackson.databind.type.TypeFactory;
 import io.dropwizard.auth.AuthDynamicFeature;
-import io.dropwizard.auth.AuthFilter;
-import io.dropwizard.auth.oauth.OAuthCredentialAuthFilter;
 import io.dropwizard.configuration.EnvironmentVariableSubstitutor;
 import io.dropwizard.configuration.SubstitutingSourceProvider;
 import io.dropwizard.core.Application;
@@ -59,6 +57,7 @@ import io.opentelemetry.sdk.trace.SdkTracerProvider;
 import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor;
 import io.opentelemetry.semconv.ServiceAttributes;
 import io.prometheus.metrics.exporter.servlet.jakarta.PrometheusMetricsServlet;
+import jakarta.annotation.Priority;
 import jakarta.inject.Inject;
 import jakarta.inject.Provider;
 import jakarta.inject.Singleton;
@@ -70,17 +69,24 @@ import jakarta.servlet.ServletException;
 import jakarta.servlet.ServletRequest;
 import jakarta.servlet.ServletResponse;
 import jakarta.servlet.http.HttpServletRequest;
+import jakarta.ws.rs.Priorities;
+import jakarta.ws.rs.container.ContainerRequestContext;
+import jakarta.ws.rs.container.ContainerRequestFilter;
+import jakarta.ws.rs.core.SecurityContext;
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.URL;
+import java.security.Principal;
 import java.util.Collections;
 import java.util.EnumSet;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.concurrent.Executors;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
+import org.apache.iceberg.exceptions.NotAuthorizedException;
 import org.apache.iceberg.rest.RESTSerializers;
 import org.apache.polaris.core.PolarisConfigurationStore;
 import org.apache.polaris.core.auth.AuthenticatedPolarisPrincipal;
@@ -116,6 +122,7 @@ import 
org.apache.polaris.service.context.CallContextCatalogFactory;
 import org.apache.polaris.service.context.CallContextResolver;
 import org.apache.polaris.service.context.PolarisCallContextCatalogFactory;
 import org.apache.polaris.service.context.RealmContextResolver;
+import 
org.apache.polaris.service.dropwizard.auth.PolarisPrincipalRoleSecurityContextProvider;
 import org.apache.polaris.service.dropwizard.config.PolarisApplicationConfig;
 import org.apache.polaris.service.dropwizard.context.RealmScopeContext;
 import 
org.apache.polaris.service.dropwizard.exception.JerseyViolationExceptionMapper;
@@ -297,12 +304,9 @@ public class PolarisApplication extends 
Application<PolarisApplicationConfig> {
                     .in(RealmScoped.class)
                     .to(PolarisRemoteCache.class);
 
-                // factory to use a cache delegating grant cache
                 // currently depends explicitly on the metaStoreManager as the 
delegate grant
                 // manager
-                bindFactory(PolarisMetaStoreManagerFactory.class)
-                    .in(RealmScoped.class)
-                    .to(PolarisGrantManager.class);
+                
bindFactory(PolarisMetaStoreManagerFactory.class).to(PolarisGrantManager.class);
                 polarisMetricRegistry.init(
                     IcebergRestCatalogApi.class,
                     IcebergRestConfigurationApi.class,
@@ -396,14 +400,8 @@ public class PolarisApplication extends 
Application<PolarisApplicationConfig> {
     if (configuration.hasRateLimiter()) {
       environment.jersey().register(RateLimiterFilter.class);
     }
-    Authenticator<String, AuthenticatedPolarisPrincipal> authenticator =
-        configuration.findService(new TypeLiteral<>() {});
-    AuthFilter<String, AuthenticatedPolarisPrincipal> 
oauthCredentialAuthFilter =
-        new OAuthCredentialAuthFilter.Builder<AuthenticatedPolarisPrincipal>()
-            .setAuthenticator(authenticator::authenticate)
-            .setPrefix("Bearer")
-            .buildAuthFilter();
-    environment.jersey().register(new 
AuthDynamicFeature(oauthCredentialAuthFilter));
+    environment.jersey().register(new 
AuthDynamicFeature(PolarisPrincipalAuthenticator.class));
+    
environment.jersey().register(PolarisPrincipalRoleSecurityContextProvider.class);
     environment.healthChecks().register("polaris", new PolarisHealthCheck());
 
     environment.jersey().register(IcebergRestOAuth2Api.class);
@@ -478,6 +476,52 @@ public class PolarisApplication extends 
Application<PolarisApplicationConfig> {
         .build();
   }
 
+  @jakarta.ws.rs.ext.Provider
+  @Priority(Priorities.AUTHENTICATION)
+  private static class PolarisPrincipalAuthenticator implements 
ContainerRequestFilter {
+    @Inject private Authenticator<String, AuthenticatedPolarisPrincipal> 
authenticator;
+
+    @Override
+    public void filter(ContainerRequestContext requestContext) throws 
IOException {
+      String authHeader = requestContext.getHeaderString("Authorization");
+      if (authHeader == null) {
+        throw new IOException("Authorization header is missing");
+      }
+      int spaceIdx = authHeader.indexOf(' ');
+      if (spaceIdx <= 0 || !authHeader.substring(0, 
spaceIdx).equalsIgnoreCase("Bearer")) {
+        throw new IOException("Authorization header is not a Bearer token");
+      }
+      String credential = authHeader.substring(spaceIdx + 1);
+      Optional<AuthenticatedPolarisPrincipal> principal = 
authenticator.authenticate(credential);
+      if (principal.isEmpty()) {
+        throw new NotAuthorizedException("Unable to authenticate");
+      }
+      SecurityContext securityContext = requestContext.getSecurityContext();
+      requestContext.setSecurityContext(
+          new SecurityContext() {
+            @Override
+            public Principal getUserPrincipal() {
+              return principal.get();
+            }
+
+            @Override
+            public boolean isUserInRole(String role) {
+              return securityContext.isUserInRole(role);
+            }
+
+            @Override
+            public boolean isSecure() {
+              return securityContext.isSecure();
+            }
+
+            @Override
+            public String getAuthenticationScheme() {
+              return securityContext.getAuthenticationScheme();
+            }
+          });
+    }
+  }
+
   /** Resolves and sets ThreadLocal CallContext/RealmContext based on the 
request contents. */
   private static class ContextResolverFilter implements Filter {
     private final RealmContextResolver realmContextResolver;
diff --git 
a/dropwizard/service/src/main/java/org/apache/polaris/service/dropwizard/auth/PolarisPrincipalRoleSecurityContextProvider.java
 
b/dropwizard/service/src/main/java/org/apache/polaris/service/dropwizard/auth/PolarisPrincipalRoleSecurityContextProvider.java
new file mode 100644
index 00000000..9e288bf8
--- /dev/null
+++ 
b/dropwizard/service/src/main/java/org/apache/polaris/service/dropwizard/auth/PolarisPrincipalRoleSecurityContextProvider.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.dropwizard.auth;
+
+import jakarta.annotation.Priority;
+import jakarta.inject.Inject;
+import jakarta.inject.Provider;
+import jakarta.ws.rs.Priorities;
+import jakarta.ws.rs.container.ContainerRequestContext;
+import jakarta.ws.rs.container.ContainerRequestFilter;
+import jakarta.ws.rs.core.SecurityContext;
+import java.io.IOException;
+import java.security.Principal;
+import java.util.List;
+import java.util.Set;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+import org.apache.iceberg.exceptions.NotAuthorizedException;
+import org.apache.polaris.core.PolarisCallContext;
+import org.apache.polaris.core.auth.AuthenticatedPolarisPrincipal;
+import org.apache.polaris.core.auth.PolarisGrantManager;
+import org.apache.polaris.core.context.CallContext;
+import org.apache.polaris.core.context.RealmContext;
+import org.apache.polaris.core.entity.PolarisEntity;
+import org.apache.polaris.core.entity.PrincipalRoleEntity;
+import org.apache.polaris.core.persistence.MetaStoreManagerFactory;
+import org.apache.polaris.core.persistence.PolarisMetaStoreManager;
+import org.apache.polaris.service.auth.BasePolarisAuthenticator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Priority(Priorities.AUTHENTICATION + 1)
+public class PolarisPrincipalRoleSecurityContextProvider implements 
ContainerRequestFilter {
+  private static final Logger LOGGER =
+      
LoggerFactory.getLogger(PolarisPrincipalRoleSecurityContextProvider.class);
+  @Inject Provider<SecurityContext> securityContextProvider;
+  @Inject MetaStoreManagerFactory metaStoreManager;
+  @Inject Provider<PolarisGrantManager> polarisGrantManagerProvider;
+
+  @Override
+  public void filter(ContainerRequestContext requestContext) throws 
IOException {
+    AuthenticatedPolarisPrincipal polarisPrincipal =
+        (AuthenticatedPolarisPrincipal) 
securityContextProvider.get().getUserPrincipal();
+    if (polarisPrincipal == null) {
+      return;
+    }
+    SecurityContext securityContext =
+        createSecurityContext(requestContext.getSecurityContext(), 
polarisPrincipal);
+    requestContext.setSecurityContext(securityContext);
+  }
+
+  public SecurityContext createSecurityContext(
+      SecurityContext ctx, AuthenticatedPolarisPrincipal principal) {
+    RealmContext realmContext = 
CallContext.getCurrentContext().getRealmContext();
+    List<PrincipalRoleEntity> activeRoles =
+        loadActivePrincipalRoles(
+            principal.getActivatedPrincipalRoleNames(),
+            principal.getPrincipalEntity(),
+            metaStoreManager.getOrCreateMetaStoreManager(realmContext));
+    Set<String> validRoleNames =
+        
activeRoles.stream().map(PrincipalRoleEntity::getName).collect(Collectors.toSet());
+    return new SecurityContext() {
+      @Override
+      public Principal getUserPrincipal() {
+        return principal;
+      }
+
+      @Override
+      public boolean isUserInRole(String role) {
+        return validRoleNames.contains(role);
+      }
+
+      @Override
+      public boolean isSecure() {
+        return ctx.isSecure();
+      }
+
+      @Override
+      public String getAuthenticationScheme() {
+        return ctx.getAuthenticationScheme();
+      }
+    };
+  }
+
+  protected List<PrincipalRoleEntity> loadActivePrincipalRoles(
+      Set<String> tokenRoles, PolarisEntity principal, PolarisMetaStoreManager 
metaStoreManager) {
+    PolarisCallContext polarisContext = 
CallContext.getCurrentContext().getPolarisCallContext();
+    PolarisGrantManager.LoadGrantsResult principalGrantResults =
+        polarisGrantManagerProvider.get().loadGrantsToGrantee(polarisContext, 
principal);
+    polarisContext
+        .getDiagServices()
+        .check(
+            principalGrantResults.isSuccess(),
+            "Failed to resolve principal roles for principal name={} id={}",
+            principal.getName(),
+            principal.getId());
+    if (!principalGrantResults.isSuccess()) {
+      LOGGER.warn(
+          "Failed to resolve principal roles for principal name={} id={}",
+          principal.getName(),
+          principal.getId());
+      throw new NotAuthorizedException("Unable to authenticate");
+    }
+    boolean allRoles = 
tokenRoles.contains(BasePolarisAuthenticator.PRINCIPAL_ROLE_ALL);
+    Predicate<PrincipalRoleEntity> includeRoleFilter =
+        allRoles ? r -> true : r -> tokenRoles.contains(r.getName());
+    List<PrincipalRoleEntity> activeRoles =
+        principalGrantResults.getGrantRecords().stream()
+            .map(
+                gr ->
+                    metaStoreManager.loadEntity(
+                        polarisContext, gr.getSecurableCatalogId(), 
gr.getSecurableId()))
+            .filter(PolarisMetaStoreManager.EntityResult::isSuccess)
+            .map(PolarisMetaStoreManager.EntityResult::getEntity)
+            .map(PrincipalRoleEntity::of)
+            .filter(includeRoleFilter)
+            .toList();
+    if (activeRoles.size() != principalGrantResults.getGrantRecords().size()) {
+      LOGGER
+          .atWarn()
+          .addKeyValue("principal", principal.getName())
+          .addKeyValue("scopes", tokenRoles)
+          .addKeyValue("roles", activeRoles)
+          .log("Some principal roles were not found in the principal's 
grants");
+    }
+    return activeRoles;
+  }
+}
diff --git 
a/dropwizard/service/src/main/java/org/apache/polaris/service/dropwizard/config/PolarisApplicationConfig.java
 
b/dropwizard/service/src/main/java/org/apache/polaris/service/dropwizard/config/PolarisApplicationConfig.java
index 2c351d0f..c5952b14 100644
--- 
a/dropwizard/service/src/main/java/org/apache/polaris/service/dropwizard/config/PolarisApplicationConfig.java
+++ 
b/dropwizard/service/src/main/java/org/apache/polaris/service/dropwizard/config/PolarisApplicationConfig.java
@@ -71,7 +71,7 @@ public class PolarisApplicationConfig extends Configuration {
   /**
    * Override the default binding of registered services so that the 
configured instances are used.
    */
-  private static final int OVERRIDE_BINDING_RANK = 10;
+  private static final int OVERRIDE_BINDING_RANK = 20;
 
   private MetaStoreManagerFactory metaStoreManagerFactory;
   private String defaultRealm = "default-realm";
@@ -125,7 +125,7 @@ public class PolarisApplicationConfig extends Configuration 
{
             .to(FileIOFactory.class)
             .ranked(OVERRIDE_BINDING_RANK);
         bindFactory(SupplierFactory.create(serviceLocator, 
config::getPolarisAuthenticator))
-            .to(Authenticator.class)
+            .to(new TypeLiteral<Authenticator<String, 
AuthenticatedPolarisPrincipal>>() {})
             .ranked(OVERRIDE_BINDING_RANK);
         bindFactory(SupplierFactory.create(serviceLocator, () -> tokenBroker))
             .to(TokenBrokerFactoryConfig.class);
diff --git 
a/dropwizard/service/src/test/java/org/apache/polaris/service/dropwizard/admin/PolarisAdminServiceAuthzTest.java
 
b/dropwizard/service/src/test/java/org/apache/polaris/service/dropwizard/admin/PolarisAdminServiceAuthzTest.java
index 292b9bc5..05e923ff 100644
--- 
a/dropwizard/service/src/test/java/org/apache/polaris/service/dropwizard/admin/PolarisAdminServiceAuthzTest.java
+++ 
b/dropwizard/service/src/test/java/org/apache/polaris/service/dropwizard/admin/PolarisAdminServiceAuthzTest.java
@@ -45,7 +45,11 @@ public class PolarisAdminServiceAuthzTest extends 
PolarisAuthzTestBase {
     final AuthenticatedPolarisPrincipal authenticatedPrincipal =
         new AuthenticatedPolarisPrincipal(principalEntity, 
activatedPrincipalRoles);
     return new PolarisAdminService(
-        callContext, entityManager, metaStoreManager, authenticatedPrincipal, 
polarisAuthorizer);
+        callContext,
+        entityManager,
+        metaStoreManager,
+        securityContext(authenticatedPrincipal, activatedPrincipalRoles),
+        polarisAuthorizer);
   }
 
   private void doTestSufficientPrivileges(
diff --git 
a/dropwizard/service/src/test/java/org/apache/polaris/service/dropwizard/admin/PolarisAuthzTestBase.java
 
b/dropwizard/service/src/test/java/org/apache/polaris/service/dropwizard/admin/PolarisAuthzTestBase.java
index 58a2e6ee..c7e6ae30 100644
--- 
a/dropwizard/service/src/test/java/org/apache/polaris/service/dropwizard/admin/PolarisAuthzTestBase.java
+++ 
b/dropwizard/service/src/test/java/org/apache/polaris/service/dropwizard/admin/PolarisAuthzTestBase.java
@@ -25,6 +25,7 @@ import com.google.auth.oauth2.GoogleCredentials;
 import com.google.common.collect.ImmutableMap;
 import jakarta.annotation.Nonnull;
 import jakarta.annotation.Nullable;
+import jakarta.ws.rs.core.SecurityContext;
 import java.io.IOException;
 import java.time.Clock;
 import java.util.Date;
@@ -32,6 +33,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.function.Function;
+import java.util.stream.Collectors;
 import org.apache.iceberg.CatalogProperties;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.catalog.Catalog;
@@ -199,7 +201,11 @@ public abstract class PolarisAuthzTestBase {
 
     this.adminService =
         new PolarisAdminService(
-            callContext, entityManager, metaStoreManager, authenticatedRoot, 
polarisAuthorizer);
+            callContext,
+            entityManager,
+            metaStoreManager,
+            securityContext(authenticatedRoot, Set.of()),
+            polarisAuthorizer);
 
     String storageLocation = "file:///tmp/authz";
     FileStorageConfigInfo storageConfigModel =
@@ -305,6 +311,32 @@ public abstract class PolarisAuthzTestBase {
     }
   }
 
+  protected @Nonnull SecurityContext securityContext(
+      AuthenticatedPolarisPrincipal p, Set<String> roles) {
+    SecurityContext securityContext = Mockito.mock(SecurityContext.class);
+    Mockito.when(securityContext.getUserPrincipal()).thenReturn(p);
+    Set<String> principalRoleNames = loadPrincipalRolesNames(p);
+    Mockito.when(securityContext.isUserInRole(Mockito.anyString()))
+        .thenAnswer(invocation -> 
principalRoleNames.contains(invocation.getArgument(0)));
+    return securityContext;
+  }
+
+  protected @Nonnull Set<String> 
loadPrincipalRolesNames(AuthenticatedPolarisPrincipal p) {
+    return metaStoreManager
+        .loadGrantsToGrantee(
+            callContext.getPolarisCallContext(), 0L, 
p.getPrincipalEntity().getId())
+        .getGrantRecords()
+        .stream()
+        .filter(gr -> gr.getPrivilegeCode() == 
PolarisPrivilege.PRINCIPAL_ROLE_USAGE.getCode())
+        .map(
+            gr ->
+                metaStoreManager.loadEntity(
+                    callContext.getPolarisCallContext(), 0L, 
gr.getSecurableId()))
+        .map(PolarisMetaStoreManager.EntityResult::getEntity)
+        .map(PolarisBaseEntity::getName)
+        .collect(Collectors.toSet());
+  }
+
   protected @Nonnull PrincipalEntity rotateAndRefreshPrincipal(
       PolarisMetaStoreManager metaStoreManager,
       String principalName,
@@ -350,15 +382,19 @@ public abstract class PolarisAuthzTestBase {
         throw new RuntimeException(e);
       }
     }
+    SecurityContext securityContext = Mockito.mock(SecurityContext.class);
+    
Mockito.when(securityContext.getUserPrincipal()).thenReturn(authenticatedRoot);
+    
Mockito.when(securityContext.isUserInRole(Mockito.anyString())).thenReturn(true);
     PolarisPassthroughResolutionView passthroughView =
         new PolarisPassthroughResolutionView(
-            callContext, entityManager, authenticatedRoot, CATALOG_NAME);
+            callContext, entityManager, securityContext, CATALOG_NAME);
     this.baseCatalog =
         new BasePolarisCatalog(
             entityManager,
             metaStoreManager,
             callContext,
             passthroughView,
+            securityContext,
             authenticatedRoot,
             Mockito.mock(),
             new DefaultFileIOFactory());
@@ -386,11 +422,13 @@ public abstract class PolarisAuthzTestBase {
     public Catalog createCallContextCatalog(
         CallContext context,
         AuthenticatedPolarisPrincipal authenticatedPolarisPrincipal,
+        SecurityContext securityContext,
         final PolarisResolutionManifest resolvedManifest) {
       // This depends on the BasePolarisCatalog allowing calling initialize 
multiple times
       // to override the previous config.
       Catalog catalog =
-          super.createCallContextCatalog(context, 
authenticatedPolarisPrincipal, resolvedManifest);
+          super.createCallContextCatalog(
+              context, authenticatedPolarisPrincipal, securityContext, 
resolvedManifest);
       catalog.initialize(
           CATALOG_NAME,
           ImmutableMap.of(
diff --git 
a/dropwizard/service/src/test/java/org/apache/polaris/service/dropwizard/catalog/BasePolarisCatalogTest.java
 
b/dropwizard/service/src/test/java/org/apache/polaris/service/dropwizard/catalog/BasePolarisCatalogTest.java
index 1b7c59e2..96fca279 100644
--- 
a/dropwizard/service/src/test/java/org/apache/polaris/service/dropwizard/catalog/BasePolarisCatalogTest.java
+++ 
b/dropwizard/service/src/test/java/org/apache/polaris/service/dropwizard/catalog/BasePolarisCatalogTest.java
@@ -26,6 +26,7 @@ import static org.mockito.Mockito.when;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterators;
 import jakarta.annotation.Nullable;
+import jakarta.ws.rs.core.SecurityContext;
 import java.io.IOException;
 import java.time.Clock;
 import java.util.Arrays;
@@ -133,6 +134,7 @@ public class BasePolarisCatalogTest extends 
CatalogTests<BasePolarisCatalog> {
   private PolarisEntityManager entityManager;
   private AuthenticatedPolarisPrincipal authenticatedRoot;
   private PolarisEntity catalogEntity;
+  private SecurityContext securityContext;
 
   @BeforeEach
   @SuppressWarnings("unchecked")
@@ -179,13 +181,17 @@ public class BasePolarisCatalogTest extends 
CatalogTests<BasePolarisCatalog> {
 
     authenticatedRoot = new AuthenticatedPolarisPrincipal(rootEntity, 
Set.of());
 
+    securityContext = Mockito.mock(SecurityContext.class);
+    when(securityContext.getUserPrincipal()).thenReturn(authenticatedRoot);
+    when(securityContext.isUserInRole(isA(String.class))).thenReturn(true);
     adminService =
         new PolarisAdminService(
             callContext,
             entityManager,
             metaStoreManager,
-            authenticatedRoot,
+            securityContext,
             new PolarisAuthorizerImpl(new PolarisConfigurationStore() {}));
+
     String storageLocation = "s3://my-bucket/path/to/data";
     storageConfigModel =
         AwsStorageConfigInfo.builder()
@@ -210,7 +216,7 @@ public class BasePolarisCatalogTest extends 
CatalogTests<BasePolarisCatalog> {
 
     PolarisPassthroughResolutionView passthroughView =
         new PolarisPassthroughResolutionView(
-            callContext, entityManager, authenticatedRoot, CATALOG_NAME);
+            callContext, entityManager, securityContext, CATALOG_NAME);
     TaskExecutor taskExecutor = Mockito.mock();
     this.catalog =
         new BasePolarisCatalog(
@@ -218,6 +224,7 @@ public class BasePolarisCatalogTest extends 
CatalogTests<BasePolarisCatalog> {
             metaStoreManager,
             callContext,
             passthroughView,
+            securityContext,
             authenticatedRoot,
             taskExecutor,
             new DefaultFileIOFactory());
@@ -786,7 +793,7 @@ public class BasePolarisCatalogTest extends 
CatalogTests<BasePolarisCatalog> {
     CallContext callContext = CallContext.getCurrentContext();
     PolarisPassthroughResolutionView passthroughView =
         new PolarisPassthroughResolutionView(
-            callContext, entityManager, authenticatedRoot, 
catalogWithoutStorage);
+            callContext, entityManager, securityContext, 
catalogWithoutStorage);
     TaskExecutor taskExecutor = Mockito.mock();
     BasePolarisCatalog catalog =
         new BasePolarisCatalog(
@@ -794,6 +801,7 @@ public class BasePolarisCatalogTest extends 
CatalogTests<BasePolarisCatalog> {
             metaStoreManager,
             callContext,
             passthroughView,
+            securityContext,
             authenticatedRoot,
             taskExecutor,
             new DefaultFileIOFactory());
@@ -851,7 +859,7 @@ public class BasePolarisCatalogTest extends 
CatalogTests<BasePolarisCatalog> {
     CallContext callContext = CallContext.getCurrentContext();
     PolarisPassthroughResolutionView passthroughView =
         new PolarisPassthroughResolutionView(
-            callContext, entityManager, authenticatedRoot, catalogName);
+            callContext, entityManager, securityContext, catalogName);
     TaskExecutor taskExecutor = Mockito.mock();
     BasePolarisCatalog catalog =
         new BasePolarisCatalog(
@@ -859,6 +867,7 @@ public class BasePolarisCatalogTest extends 
CatalogTests<BasePolarisCatalog> {
             metaStoreManager,
             callContext,
             passthroughView,
+            securityContext,
             authenticatedRoot,
             taskExecutor,
             new DefaultFileIOFactory());
@@ -1392,13 +1401,14 @@ public class BasePolarisCatalogTest extends 
CatalogTests<BasePolarisCatalog> {
     CallContext callContext = CallContext.of(realmContext, polarisContext);
     PolarisPassthroughResolutionView passthroughView =
         new PolarisPassthroughResolutionView(
-            callContext, entityManager, authenticatedRoot, noPurgeCatalogName);
+            callContext, entityManager, securityContext, noPurgeCatalogName);
     BasePolarisCatalog noPurgeCatalog =
         new BasePolarisCatalog(
             entityManager,
             metaStoreManager,
             callContext,
             passthroughView,
+            securityContext,
             authenticatedRoot,
             Mockito.mock(),
             new DefaultFileIOFactory());
@@ -1474,7 +1484,7 @@ public class BasePolarisCatalogTest extends 
CatalogTests<BasePolarisCatalog> {
     CallContext.setCurrentContext(callContext);
     PolarisPassthroughResolutionView passthroughView =
         new PolarisPassthroughResolutionView(
-            callContext, entityManager, authenticatedRoot, CATALOG_NAME);
+            callContext, entityManager, securityContext, CATALOG_NAME);
 
     TestFileIOFactory measured = new TestFileIOFactory();
     BasePolarisCatalog catalog =
@@ -1483,6 +1493,7 @@ public class BasePolarisCatalogTest extends 
CatalogTests<BasePolarisCatalog> {
             metaStoreManager,
             callContext,
             passthroughView,
+            securityContext,
             authenticatedRoot,
             Mockito.mock(),
             measured);
diff --git 
a/dropwizard/service/src/test/java/org/apache/polaris/service/dropwizard/catalog/BasePolarisCatalogViewTest.java
 
b/dropwizard/service/src/test/java/org/apache/polaris/service/dropwizard/catalog/BasePolarisCatalogViewTest.java
index 17f1cf01..f547e592 100644
--- 
a/dropwizard/service/src/test/java/org/apache/polaris/service/dropwizard/catalog/BasePolarisCatalogViewTest.java
+++ 
b/dropwizard/service/src/test/java/org/apache/polaris/service/dropwizard/catalog/BasePolarisCatalogViewTest.java
@@ -18,10 +18,13 @@
  */
 package org.apache.polaris.service.dropwizard.catalog;
 
+import static org.mockito.Mockito.when;
+
 import com.google.auth.oauth2.AccessToken;
 import com.google.auth.oauth2.GoogleCredentials;
 import com.google.common.collect.ImmutableMap;
 import jakarta.annotation.Nullable;
+import jakarta.ws.rs.core.SecurityContext;
 import java.time.Clock;
 import java.util.Date;
 import java.util.HashMap;
@@ -111,12 +114,15 @@ public class BasePolarisCatalogViewTest extends 
ViewCatalogTests<BasePolarisCata
     AuthenticatedPolarisPrincipal authenticatedRoot =
         new AuthenticatedPolarisPrincipal(rootEntity, Set.of());
 
+    SecurityContext securityContext = Mockito.mock(SecurityContext.class);
+    when(securityContext.getUserPrincipal()).thenReturn(authenticatedRoot);
+    when(securityContext.isUserInRole(Mockito.anyString())).thenReturn(true);
     PolarisAdminService adminService =
         new PolarisAdminService(
             callContext,
             entityManager,
             metaStoreManager,
-            authenticatedRoot,
+            securityContext,
             new PolarisAuthorizerImpl(new PolarisConfigurationStore() {}));
     adminService.createCatalog(
         new CatalogEntity.Builder()
@@ -133,13 +139,14 @@ public class BasePolarisCatalogViewTest extends 
ViewCatalogTests<BasePolarisCata
 
     PolarisPassthroughResolutionView passthroughView =
         new PolarisPassthroughResolutionView(
-            callContext, entityManager, authenticatedRoot, CATALOG_NAME);
+            callContext, entityManager, securityContext, CATALOG_NAME);
     this.catalog =
         new BasePolarisCatalog(
             entityManager,
             metaStoreManager,
             callContext,
             passthroughView,
+            securityContext,
             authenticatedRoot,
             Mockito.mock(),
             new DefaultFileIOFactory());
diff --git 
a/dropwizard/service/src/test/java/org/apache/polaris/service/dropwizard/catalog/PolarisCatalogHandlerWrapperAuthzTest.java
 
b/dropwizard/service/src/test/java/org/apache/polaris/service/dropwizard/catalog/PolarisCatalogHandlerWrapperAuthzTest.java
index db80ac1f..f7cdc375 100644
--- 
a/dropwizard/service/src/test/java/org/apache/polaris/service/dropwizard/catalog/PolarisCatalogHandlerWrapperAuthzTest.java
+++ 
b/dropwizard/service/src/test/java/org/apache/polaris/service/dropwizard/catalog/PolarisCatalogHandlerWrapperAuthzTest.java
@@ -19,6 +19,7 @@
 package org.apache.polaris.service.dropwizard.catalog;
 
 import com.google.common.collect.ImmutableMap;
+import jakarta.ws.rs.core.SecurityContext;
 import java.time.Instant;
 import java.util.List;
 import java.util.Map;
@@ -92,7 +93,7 @@ public class PolarisCatalogHandlerWrapperAuthzTest extends 
PolarisAuthzTestBase
         callContext,
         entityManager,
         metaStoreManager,
-        authenticatedPrincipal,
+        securityContext(authenticatedPrincipal, activatedPrincipalRoles),
         factory,
         catalogName,
         polarisAuthorizer);
@@ -223,13 +224,14 @@ public class PolarisCatalogHandlerWrapperAuthzTest 
extends PolarisAuthzTestBase
 
     final AuthenticatedPolarisPrincipal authenticatedPrincipal =
         new AuthenticatedPolarisPrincipal(
-            PrincipalEntity.of(newPrincipal.getPrincipal()), Set.of());
+            PrincipalEntity.of(newPrincipal.getPrincipal()),
+            Set.of(PRINCIPAL_ROLE1, PRINCIPAL_ROLE2));
     PolarisCatalogHandlerWrapper wrapper =
         new PolarisCatalogHandlerWrapper(
             callContext,
             entityManager,
             metaStoreManager,
-            authenticatedPrincipal,
+            securityContext(authenticatedPrincipal, Set.of(PRINCIPAL_ROLE1, 
PRINCIPAL_ROLE2)),
             new TestPolarisCallContextCatalogFactory(),
             CATALOG_NAME,
             polarisAuthorizer);
@@ -254,13 +256,14 @@ public class PolarisCatalogHandlerWrapperAuthzTest 
extends PolarisAuthzTestBase
         rotateAndRefreshPrincipal(
             metaStoreManager, principalName, credentials, 
callContext.getPolarisCallContext());
     final AuthenticatedPolarisPrincipal authenticatedPrincipal1 =
-        new 
AuthenticatedPolarisPrincipal(PrincipalEntity.of(refreshPrincipal), Set.of());
+        new AuthenticatedPolarisPrincipal(
+            PrincipalEntity.of(refreshPrincipal), Set.of(PRINCIPAL_ROLE1, 
PRINCIPAL_ROLE2));
     PolarisCatalogHandlerWrapper refreshedWrapper =
         new PolarisCatalogHandlerWrapper(
             callContext,
             entityManager,
             metaStoreManager,
-            authenticatedPrincipal1,
+            securityContext(authenticatedPrincipal1, Set.of(PRINCIPAL_ROLE1, 
PRINCIPAL_ROLE2)),
             new TestPolarisCallContextCatalogFactory(),
             CATALOG_NAME,
             polarisAuthorizer);
@@ -1689,10 +1692,11 @@ public class PolarisCatalogHandlerWrapperAuthzTest 
extends PolarisAuthzTestBase
           public Catalog createCallContextCatalog(
               CallContext context,
               AuthenticatedPolarisPrincipal authenticatedPolarisPrincipal,
+              SecurityContext securityContext,
               PolarisResolutionManifest resolvedManifest) {
             Catalog catalog =
                 super.createCallContextCatalog(
-                    context, authenticatedPolarisPrincipal, resolvedManifest);
+                    context, authenticatedPolarisPrincipal, securityContext, 
resolvedManifest);
             String fileIoImpl = "org.apache.iceberg.inmemory.InMemoryFileIO";
             catalog.initialize(
                 externalCatalog, 
ImmutableMap.of(CatalogProperties.FILE_IO_IMPL, fileIoImpl));
diff --git 
a/dropwizard/service/src/test/java/org/apache/polaris/service/dropwizard/catalog/PolarisPassthroughResolutionView.java
 
b/dropwizard/service/src/test/java/org/apache/polaris/service/dropwizard/catalog/PolarisPassthroughResolutionView.java
index 549184c5..8b93ef62 100644
--- 
a/dropwizard/service/src/test/java/org/apache/polaris/service/dropwizard/catalog/PolarisPassthroughResolutionView.java
+++ 
b/dropwizard/service/src/test/java/org/apache/polaris/service/dropwizard/catalog/PolarisPassthroughResolutionView.java
@@ -18,10 +18,10 @@
  */
 package org.apache.polaris.service.dropwizard.catalog;
 
+import jakarta.ws.rs.core.SecurityContext;
 import java.util.Arrays;
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.TableIdentifier;
-import org.apache.polaris.core.auth.AuthenticatedPolarisPrincipal;
 import org.apache.polaris.core.catalog.PolarisCatalogHelpers;
 import org.apache.polaris.core.context.CallContext;
 import org.apache.polaris.core.entity.PolarisEntitySubType;
@@ -42,24 +42,24 @@ import 
org.apache.polaris.core.persistence.resolver.ResolverPath;
 public class PolarisPassthroughResolutionView implements 
PolarisResolutionManifestCatalogView {
   private final PolarisEntityManager entityManager;
   private final CallContext callContext;
-  private final AuthenticatedPolarisPrincipal authenticatedPrincipal;
+  private final SecurityContext securityContext;
   private final String catalogName;
 
   public PolarisPassthroughResolutionView(
       CallContext callContext,
       PolarisEntityManager entityManager,
-      AuthenticatedPolarisPrincipal authenticatedPrincipal,
+      SecurityContext securityContext,
       String catalogName) {
     this.entityManager = entityManager;
     this.callContext = callContext;
-    this.authenticatedPrincipal = authenticatedPrincipal;
+    this.securityContext = securityContext;
     this.catalogName = catalogName;
   }
 
   @Override
   public PolarisResolvedPathWrapper getResolvedReferenceCatalogEntity() {
     PolarisResolutionManifest manifest =
-        entityManager.prepareResolutionManifest(callContext, 
authenticatedPrincipal, catalogName);
+        entityManager.prepareResolutionManifest(callContext, securityContext, 
catalogName);
     manifest.resolveAll();
     return manifest.getResolvedReferenceCatalogEntity();
   }
@@ -67,7 +67,7 @@ public class PolarisPassthroughResolutionView implements 
PolarisResolutionManife
   @Override
   public PolarisResolvedPathWrapper getResolvedPath(Object key) {
     PolarisResolutionManifest manifest =
-        entityManager.prepareResolutionManifest(callContext, 
authenticatedPrincipal, catalogName);
+        entityManager.prepareResolutionManifest(callContext, securityContext, 
catalogName);
 
     if (key instanceof Namespace namespace) {
       manifest.addPath(
@@ -85,7 +85,7 @@ public class PolarisPassthroughResolutionView implements 
PolarisResolutionManife
   @Override
   public PolarisResolvedPathWrapper getResolvedPath(Object key, 
PolarisEntitySubType subType) {
     PolarisResolutionManifest manifest =
-        entityManager.prepareResolutionManifest(callContext, 
authenticatedPrincipal, catalogName);
+        entityManager.prepareResolutionManifest(callContext, securityContext, 
catalogName);
 
     if (key instanceof TableIdentifier identifier) {
       manifest.addPath(
@@ -106,7 +106,7 @@ public class PolarisPassthroughResolutionView implements 
PolarisResolutionManife
   @Override
   public PolarisResolvedPathWrapper getPassthroughResolvedPath(Object key) {
     PolarisResolutionManifest manifest =
-        entityManager.prepareResolutionManifest(callContext, 
authenticatedPrincipal, catalogName);
+        entityManager.prepareResolutionManifest(callContext, securityContext, 
catalogName);
 
     if (key instanceof Namespace namespace) {
       manifest.addPassthroughPath(
@@ -124,7 +124,7 @@ public class PolarisPassthroughResolutionView implements 
PolarisResolutionManife
   public PolarisResolvedPathWrapper getPassthroughResolvedPath(
       Object key, PolarisEntitySubType subType) {
     PolarisResolutionManifest manifest =
-        entityManager.prepareResolutionManifest(callContext, 
authenticatedPrincipal, catalogName);
+        entityManager.prepareResolutionManifest(callContext, securityContext, 
catalogName);
 
     if (key instanceof TableIdentifier identifier) {
       manifest.addPassthroughPath(
diff --git a/polaris-core/build.gradle.kts b/polaris-core/build.gradle.kts
index 3ebffd9c..320a0868 100644
--- a/polaris-core/build.gradle.kts
+++ b/polaris-core/build.gradle.kts
@@ -67,6 +67,7 @@ dependencies {
   implementation(libs.swagger.jaxrs)
   implementation(libs.jakarta.inject.api)
   implementation(libs.jakarta.validation.api)
+  implementation(libs.jakarta.ws.rs.api)
   implementation(libs.smallrye.common.annotation)
 
   implementation("org.apache.iceberg:iceberg-aws")
diff --git 
a/polaris-core/src/main/java/org/apache/polaris/core/auth/PolarisGrantManager.java
 
b/polaris-core/src/main/java/org/apache/polaris/core/auth/PolarisGrantManager.java
index 1cf52142..35291a2a 100644
--- 
a/polaris-core/src/main/java/org/apache/polaris/core/auth/PolarisGrantManager.java
+++ 
b/polaris-core/src/main/java/org/apache/polaris/core/auth/PolarisGrantManager.java
@@ -115,6 +115,20 @@ public interface PolarisGrantManager {
       @Nonnull PolarisEntityCore securable,
       @Nonnull PolarisPrivilege privilege);
 
+  /**
+   * This method should be used by the Polaris app to cache all grant records on a securable.
+   * Convenience overload that delegates to the id-based variant using the catalog id and id of
+   * the given entity.
+   *
+   * @param callCtx call context
+   * @param securable the securable entity; must not be null since its ids are read
+   * @return the list of grants and the version of the grant records. We will return
+   *     ENTITY_NOT_FOUND if the securable cannot be found
+   */
+  @Nonnull
+  default LoadGrantsResult loadGrantsOnSecurable(
+      @Nonnull PolarisCallContext callCtx, @Nonnull PolarisBaseEntity securable) {
+    return loadGrantsOnSecurable(callCtx, securable.getCatalogId(), securable.getId());
+  }
+
   /**
    * This method should be used by the Polaris app to cache all grant records 
on a securable.
    *
@@ -128,6 +142,21 @@ public interface PolarisGrantManager {
   LoadGrantsResult loadGrantsOnSecurable(
       @Nonnull PolarisCallContext callCtx, long securableCatalogId, long 
securableId);
 
+  /**
+   * This method should be used by the Polaris app to load all grants made to a grantee, either a
+   * role or a principal. Convenience overload that delegates to the id-based variant using the
+   * catalog id and id of the given entity.
+   *
+   * @param callCtx call context
+   * @param grantee the grantee entity; must not be null since its ids are read
+   * @return the list of grants and the version of the grant records. We will return NULL if the
+   *     grantee does not exist
+   */
+  @Nonnull
+  default LoadGrantsResult loadGrantsToGrantee(
+      @Nonnull PolarisCallContext callCtx, @Nonnull PolarisBaseEntity grantee) {
+    return loadGrantsToGrantee(callCtx, grantee.getCatalogId(), grantee.getId());
+  }
+
   /**
    * This method should be used by the Polaris app to load all grants made to 
a grantee, either a
    * role or a principal.
diff --git 
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/PolarisEntityManager.java
 
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/PolarisEntityManager.java
index 215e600a..5cc36a84 100644
--- 
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/PolarisEntityManager.java
+++ 
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/PolarisEntityManager.java
@@ -20,8 +20,8 @@ package org.apache.polaris.core.persistence;
 
 import jakarta.annotation.Nonnull;
 import jakarta.annotation.Nullable;
+import jakarta.ws.rs.core.SecurityContext;
 import java.util.List;
-import org.apache.polaris.core.auth.AuthenticatedPolarisPrincipal;
 import org.apache.polaris.core.context.CallContext;
 import org.apache.polaris.core.entity.PolarisEntity;
 import org.apache.polaris.core.entity.PolarisEntityConstants;
@@ -58,33 +58,28 @@ public class PolarisEntityManager {
       @Nonnull StorageCredentialCache credentialCache,
       @Nonnull EntityCache entityCache) {
     this.metaStoreManager = metaStoreManager;
-    this.entityCache = entityCache;
     this.credentialCache = credentialCache;
+    this.entityCache = entityCache;
   }
 
   public Resolver prepareResolver(
       @Nonnull CallContext callContext,
-      @Nonnull AuthenticatedPolarisPrincipal authenticatedPrincipal,
+      @Nonnull SecurityContext securityContext,
       @Nullable String referenceCatalogName) {
     return new Resolver(
         callContext.getPolarisCallContext(),
         metaStoreManager,
-        authenticatedPrincipal.getPrincipalEntity().getId(),
-        null, /* callerPrincipalName */
-        authenticatedPrincipal.getActivatedPrincipalRoleNames().isEmpty()
-            ? null
-            : authenticatedPrincipal.getActivatedPrincipalRoleNames(),
+        securityContext,
         entityCache,
         referenceCatalogName);
   }
 
   public PolarisResolutionManifest prepareResolutionManifest(
       @Nonnull CallContext callContext,
-      @Nonnull AuthenticatedPolarisPrincipal authenticatedPrincipal,
+      @Nonnull SecurityContext securityContext,
       @Nullable String referenceCatalogName) {
     PolarisResolutionManifest manifest =
-        new PolarisResolutionManifest(
-            callContext, this, authenticatedPrincipal, referenceCatalogName);
+        new PolarisResolutionManifest(callContext, this, securityContext, 
referenceCatalogName);
     manifest.setSimulatedResolvedRootContainerEntity(
         getSimulatedResolvedRootContainerEntity(callContext));
     return manifest;
diff --git 
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/resolver/PolarisResolutionManifest.java
 
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/resolver/PolarisResolutionManifest.java
index 629e282e..2ab0140a 100644
--- 
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/resolver/PolarisResolutionManifest.java
+++ 
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/resolver/PolarisResolutionManifest.java
@@ -20,6 +20,7 @@ package org.apache.polaris.core.persistence.resolver;
 
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Multimap;
+import jakarta.ws.rs.core.SecurityContext;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -55,6 +56,7 @@ public class PolarisResolutionManifest implements 
PolarisResolutionManifestCatal
 
   private final PolarisEntityManager entityManager;
   private final CallContext callContext;
+  private final SecurityContext securityContext;
   private final AuthenticatedPolarisPrincipal authenticatedPrincipal;
   private final String catalogName;
   private final Resolver primaryResolver;
@@ -81,15 +83,22 @@ public class PolarisResolutionManifest implements 
PolarisResolutionManifestCatal
   public PolarisResolutionManifest(
       CallContext callContext,
       PolarisEntityManager entityManager,
-      AuthenticatedPolarisPrincipal authenticatedPrincipal,
+      SecurityContext securityContext,
       String catalogName) {
     this.entityManager = entityManager;
     this.callContext = callContext;
-    this.authenticatedPrincipal = authenticatedPrincipal;
     this.catalogName = catalogName;
-    this.primaryResolver =
-        entityManager.prepareResolver(callContext, authenticatedPrincipal, 
catalogName);
+    this.primaryResolver = entityManager.prepareResolver(callContext, 
securityContext, catalogName);
     this.diagnostics = callContext.getPolarisCallContext().getDiagServices();
+    this.diagnostics.checkNotNull(securityContext, 
"null_security_context_for_resolution_manifest");
+    this.securityContext = securityContext;
+    diagnostics.check(
+        securityContext.getUserPrincipal() instanceof 
AuthenticatedPolarisPrincipal,
+        "invalid_principal_type_for_resolution_manifest",
+        "principal={}",
+        securityContext.getUserPrincipal());
+    this.authenticatedPrincipal =
+        (AuthenticatedPolarisPrincipal) securityContext.getUserPrincipal();
 
     // TODO: Make the rootContainer lookup no longer optional in the 
persistence store.
     // For now, we'll try to resolve the rootContainer as "optional", and only 
if we fail to find
@@ -193,7 +202,7 @@ public class PolarisResolutionManifest implements 
PolarisResolutionManifestCatal
 
     // Run a single-use Resolver for this path.
     Resolver passthroughResolver =
-        entityManager.prepareResolver(callContext, authenticatedPrincipal, 
catalogName);
+        entityManager.prepareResolver(callContext, securityContext, 
catalogName);
     passthroughResolver.addPath(requestedPath);
     ResolverStatus status = passthroughResolver.resolveAll();
 
diff --git 
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/resolver/Resolver.java
 
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/resolver/Resolver.java
index ebefa158..81412482 100644
--- 
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/resolver/Resolver.java
+++ 
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/resolver/Resolver.java
@@ -20,6 +20,7 @@ package org.apache.polaris.core.persistence.resolver;
 
 import jakarta.annotation.Nonnull;
 import jakarta.annotation.Nullable;
+import jakarta.ws.rs.core.SecurityContext;
 import java.util.AbstractSet;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -31,6 +32,7 @@ import java.util.Set;
 import java.util.stream.Collectors;
 import org.apache.polaris.core.PolarisCallContext;
 import org.apache.polaris.core.PolarisDiagnostics;
+import org.apache.polaris.core.auth.AuthenticatedPolarisPrincipal;
 import org.apache.polaris.core.entity.PolarisBaseEntity;
 import org.apache.polaris.core.entity.PolarisChangeTrackingVersions;
 import org.apache.polaris.core.entity.PolarisEntityConstants;
@@ -64,18 +66,12 @@ public class Resolver {
   private final @Nonnull EntityCache cache;
 
   // the id of the principal making the call or 0 if unknown
-  private final long callerPrincipalId;
-
-  // the name of the principal making the call or null if unknown. If 0, the 
principal name will be
-  // not null
-  private final String callerPrincipalName;
+  private final @Nonnull AuthenticatedPolarisPrincipal polarisPrincipal;
+  private final @Nonnull SecurityContext securityContext;
 
   // reference catalog name for name resolution
   private final String referenceCatalogName;
 
-  // if not null, subset of principal roles to activate
-  private final @Nullable Set<String> callerPrincipalRoleNamesScope;
-
   // set of entities to resolve given their name. This does not include 
namespaces or table_like
   // entities which are
   // part of a path
@@ -112,10 +108,7 @@ public class Resolver {
    *
    * @param polarisCallContext the polaris call context
    * @param polarisRemoteCache meta store manager
-   * @param callerPrincipalId if not 0, the id of the principal calling the 
service
-   * @param callerPrincipalName if callerPrincipalId is 0, the name of the 
principal calling the
-   *     service
-   * @param callerPrincipalRoleNamesScope if not null, scope principal roles
+   * @param securityContext the security context of the current request; its user principal must be
+   *     an {@link AuthenticatedPolarisPrincipal}
    * @param cache shared entity cache
    * @param referenceCatalogName if not null, specifies the name of the 
reference catalog. The
    *     reference catalog is the catalog used to resolve catalog roles and 
catalog path. Also, if a
@@ -128,28 +121,30 @@ public class Resolver {
   public Resolver(
       @Nonnull PolarisCallContext polarisCallContext,
       @Nonnull PolarisRemoteCache polarisRemoteCache,
-      long callerPrincipalId,
-      @Nullable String callerPrincipalName,
-      @Nullable Set<String> callerPrincipalRoleNamesScope,
+      @Nonnull SecurityContext securityContext,
       @Nonnull EntityCache cache,
       @Nullable String referenceCatalogName) {
     this.polarisCallContext = polarisCallContext;
     this.diagnostics = polarisCallContext.getDiagServices();
     this.polarisRemoteCache = polarisRemoteCache;
     this.cache = cache;
-    this.callerPrincipalName = callerPrincipalName;
-    this.callerPrincipalId = callerPrincipalId;
+    this.securityContext = securityContext;
     this.referenceCatalogName = referenceCatalogName;
 
-    // scoped principal role names
-    this.callerPrincipalRoleNamesScope = callerPrincipalRoleNamesScope;
-
     // validate inputs
+    this.diagnostics.checkNotNull(polarisCallContext, 
"unexpected_null_polarisCallContext");
     this.diagnostics.checkNotNull(polarisRemoteCache, 
"unexpected_null_polarisRemoteCache");
     this.diagnostics.checkNotNull(cache, "unexpected_null_cache");
+    this.diagnostics.checkNotNull(securityContext, 
"security_context_must_be_specified");
+    this.diagnostics.checkNotNull(
+        securityContext.getUserPrincipal(), "principal_must_be_specified");
     this.diagnostics.check(
-        callerPrincipalId != 0 || callerPrincipalName != null, 
"principal_must_be_specified");
+        securityContext.getUserPrincipal() instanceof 
AuthenticatedPolarisPrincipal,
+        "unexpected_principal_type",
+        "class={}",
+        securityContext.getUserPrincipal().getClass().getName());
 
+    this.polarisPrincipal = (AuthenticatedPolarisPrincipal) 
securityContext.getUserPrincipal();
     // paths to resolve
     this.pathsToResolve = new ArrayList<>();
     this.resolvedPaths = new ArrayList<>();
@@ -409,12 +404,7 @@ public class Resolver {
     List<EntityCacheEntry> toValidate = new ArrayList<>();
 
     // first resolve the principal and determine the set of activated 
principal roles
-    ResolverStatus status =
-        this.resolveCallerPrincipalAndPrincipalRoles(
-            toValidate,
-            this.callerPrincipalId,
-            this.callerPrincipalName,
-            this.callerPrincipalRoleNamesScope);
+    ResolverStatus status = 
this.resolveCallerPrincipalAndPrincipalRoles(toValidate);
 
     // if success, continue resolving
     if (status.getStatus() == ResolverStatus.StatusEnum.SUCCESS) {
@@ -703,25 +693,18 @@ public class Resolver {
    *
    * @param toValidate all entities we have resolved from the cache, hence we 
will have to verify
    *     that these entities have not changed in the backend
-   * @param callerPrincipalId the id of the principal which made the call
-   * @param callerPrincipalRoleNamesScope if not null, subset of roles 
activated by this call
    * @return the status of resolution
    */
   private ResolverStatus resolveCallerPrincipalAndPrincipalRoles(
-      List<EntityCacheEntry> toValidate,
-      long callerPrincipalId,
-      String callerPrincipalName,
-      Set<String> callerPrincipalRoleNamesScope) {
+      List<EntityCacheEntry> toValidate) {
 
     // resolve the principal, by name or id
     this.resolvedCallerPrincipal =
-        (callerPrincipalId != PolarisEntityConstants.getNullId())
-            ? this.resolveById(
-                toValidate,
-                PolarisEntityType.PRINCIPAL,
-                PolarisEntityConstants.getNullId(),
-                callerPrincipalId)
-            : this.resolveByName(toValidate, PolarisEntityType.PRINCIPAL, 
callerPrincipalName);
+        this.resolveById(
+            toValidate,
+            PolarisEntityType.PRINCIPAL,
+            PolarisEntityConstants.getNullId(),
+            polarisPrincipal.getPrincipalEntity().getId());
 
     // if the principal was not found, we can end right there
     if (this.resolvedCallerPrincipal == null
@@ -729,35 +712,54 @@ public class Resolver {
       return new 
ResolverStatus(ResolverStatus.StatusEnum.CALLER_PRINCIPAL_DOES_NOT_EXIST);
     }
 
-    // activate all principal roles which still exist
-    for (PolarisGrantRecord grantRecord : 
this.resolvedCallerPrincipal.getGrantRecordsAsGrantee()) {
-      if (grantRecord.getPrivilegeCode() == 
PolarisPrivilege.PRINCIPAL_ROLE_USAGE.getCode()) {
-
-        // resolve principal role granted to that principal
-        EntityCacheEntry principalRole =
-            this.resolveById(
-                toValidate,
-                PolarisEntityType.PRINCIPAL_ROLE,
-                PolarisEntityConstants.getNullId(),
-                grantRecord.getSecurableId());
-
-        // skip if purged or has been dropped
-        if (principalRole != null && !principalRole.getEntity().isDropped()) {
-          // add it to the activated list if no scoped principal role or this 
principal role is
-          // activated
-          if (callerPrincipalRoleNamesScope == null
-              || 
callerPrincipalRoleNamesScope.contains(principalRole.getEntity().getName())) {
-            // this principal role is activated
-            this.resolvedCallerPrincipalRoles.add(principalRole);
-          }
-        }
-      }
-    }
+    // activate all principal roles specified in the authenticated principal
+    resolvedCallerPrincipalRoles =
+        this.polarisPrincipal.getActivatedPrincipalRoleNames().isEmpty()
+            ? resolveAllPrincipalRoles(toValidate, resolvedCallerPrincipal)
+            : resolvePrincipalRolesByName(
+                toValidate, 
this.polarisPrincipal.getActivatedPrincipalRoleNames());
 
     // total success
     return new ResolverStatus(ResolverStatus.StatusEnum.SUCCESS);
   }
 
+  /**
+   * Resolve every principal role the caller principal holds a {@code PRINCIPAL_ROLE_USAGE} grant
+   * for. Roles that can no longer be resolved (purged) or that have been dropped are skipped, so
+   * the returned list never contains null entries.
+   *
+   * @param toValidate all entities we have resolved from the cache, hence we will have to verify
+   *     that these entities have not changed in the backend
+   * @param resolvedCallerPrincipal1 the resolved caller principal whose grant records are scanned
+   * @return the list of resolved principal roles the principal has grants for
+   */
+  private List<EntityCacheEntry> resolveAllPrincipalRoles(
+      List<EntityCacheEntry> toValidate, EntityCacheEntry resolvedCallerPrincipal1) {
+    return resolvedCallerPrincipal1.getGrantRecordsAsGrantee().stream()
+        .filter(gr -> gr.getPrivilegeCode() == PolarisPrivilege.PRINCIPAL_ROLE_USAGE.getCode())
+        .map(
+            gr ->
+                resolveById(
+                    toValidate,
+                    PolarisEntityType.PRINCIPAL_ROLE,
+                    PolarisEntityConstants.getRootEntityId(),
+                    gr.getSecurableId()))
+        // skip if purged or has been dropped; only roles which still exist are activated
+        .filter(role -> role != null && !role.getEntity().isDropped())
+        .collect(Collectors.toList());
+  }
+
+  /**
+   * Resolve the specified list of principal roles. The SecurityContext is used to determine whether
+   * the principal actually has the roles specified; names that do not resolve to an existing,
+   * non-dropped principal role are skipped, so the returned list never contains null entries.
+   *
+   * @param toValidate all entities we have resolved from the cache, hence we will have to verify
+   *     that these entities have not changed in the backend
+   * @param roleNames names of the principal roles requested for activation
+   * @return the filtered list of resolved principal roles
+   */
+  private List<EntityCacheEntry> resolvePrincipalRolesByName(
+      List<EntityCacheEntry> toValidate, Set<String> roleNames) {
+    return roleNames.stream()
+        .filter(securityContext::isUserInRole)
+        .map(roleName -> resolveByName(toValidate, PolarisEntityType.PRINCIPAL_ROLE, roleName))
+        // a lookup can come back empty or dropped; never activate such a role
+        .filter(role -> role != null && !role.getEntity().isDropped())
+        .collect(Collectors.toList());
+  }
+
   /**
    * Resolve the reference catalog and determine all activated role. The 
principal and principal
    * roles should have already been resolved
diff --git 
a/polaris-core/src/test/java/org/apache/polaris/core/persistence/ResolverTest.java
 
b/polaris-core/src/test/java/org/apache/polaris/core/persistence/ResolverTest.java
index f05cd789..9bc6cabf 100644
--- 
a/polaris-core/src/test/java/org/apache/polaris/core/persistence/ResolverTest.java
+++ 
b/polaris-core/src/test/java/org/apache/polaris/core/persistence/ResolverTest.java
@@ -22,21 +22,28 @@ import static 
org.apache.polaris.core.persistence.PrincipalSecretsGenerator.RAND
 
 import jakarta.annotation.Nonnull;
 import jakarta.annotation.Nullable;
+import jakarta.ws.rs.core.SecurityContext;
+import java.security.Principal;
 import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
+import java.util.stream.Collectors;
 import org.apache.polaris.core.PolarisCallContext;
 import org.apache.polaris.core.PolarisDefaultDiagServiceImpl;
 import org.apache.polaris.core.PolarisDiagnostics;
+import org.apache.polaris.core.auth.AuthenticatedPolarisPrincipal;
 import org.apache.polaris.core.entity.PolarisBaseEntity;
 import org.apache.polaris.core.entity.PolarisEntityCore;
 import org.apache.polaris.core.entity.PolarisEntitySubType;
 import org.apache.polaris.core.entity.PolarisEntityType;
 import org.apache.polaris.core.entity.PolarisGrantRecord;
 import org.apache.polaris.core.entity.PolarisPrivilege;
+import org.apache.polaris.core.entity.PrincipalEntity;
+import org.apache.polaris.core.entity.PrincipalRoleEntity;
 import org.apache.polaris.core.persistence.cache.EntityCache;
 import org.apache.polaris.core.persistence.cache.EntityCacheEntry;
 import 
org.apache.polaris.core.persistence.cache.PolarisRemoteCache.CachedEntryResult;
@@ -439,12 +446,53 @@ public class ResolverTest {
     if (cache == null) {
       this.cache = new EntityCache(this.metaStoreManager);
     }
+    boolean allRoles = principalRolesScope == null;
+    Optional<List<PrincipalRoleEntity>> roleEntities =
+        Optional.ofNullable(principalRolesScope)
+            .map(
+                scopes ->
+                    scopes.stream()
+                        .map(
+                            role ->
+                                metaStoreManager.readEntityByName(
+                                    callCtx,
+                                    null,
+                                    PolarisEntityType.PRINCIPAL_ROLE,
+                                    PolarisEntitySubType.NULL_SUBTYPE,
+                                    role))
+                        
.filter(PolarisMetaStoreManager.EntityResult::isSuccess)
+                        .map(PolarisMetaStoreManager.EntityResult::getEntity)
+                        .map(PrincipalRoleEntity::of)
+                        .collect(Collectors.toList()));
+    AuthenticatedPolarisPrincipal authenticatedPrincipal =
+        new AuthenticatedPolarisPrincipal(
+            PrincipalEntity.of(P1), 
Optional.ofNullable(principalRolesScope).orElse(Set.of()));
     return new Resolver(
         this.callCtx,
-        this.metaStoreManager,
-        this.P1.getId(),
-        null,
-        principalRolesScope,
+        metaStoreManager,
+        new SecurityContext() {
+          @Override
+          public Principal getUserPrincipal() {
+            return authenticatedPrincipal;
+          }
+
+          @Override
+          public boolean isUserInRole(String role) {
+            return roleEntities
+                .map(l -> 
l.stream().map(PrincipalRoleEntity::getName).anyMatch(role::equals))
+                .orElse(allRoles);
+          }
+
+          @Override
+          public boolean isSecure() {
+            return false;
+          }
+
+          @Override
+          public String getAuthenticationScheme() {
+            return "";
+          }
+        },
         this.cache,
         referenceCatalogName);
   }
@@ -725,9 +773,6 @@ public class ResolverTest {
         }
       }
 
-      // validate that we were able to resolve the caller principal
-      this.ensureResolved(resolver.getResolvedCallerPrincipal(), 
PolarisEntityType.PRINCIPAL, "P1");
-
       // validate that the correct set if principal roles have been activated
       List<EntityCacheEntry> principalRolesResolved = 
resolver.getResolvedCallerPrincipalRoles();
       principalRolesResolved.sort(Comparator.comparing(p -> 
p.getEntity().getName()));
diff --git 
a/service/common/src/main/java/org/apache/polaris/service/admin/PolarisAdminService.java
 
b/service/common/src/main/java/org/apache/polaris/service/admin/PolarisAdminService.java
index b8447528..6a90e2ab 100644
--- 
a/service/common/src/main/java/org/apache/polaris/service/admin/PolarisAdminService.java
+++ 
b/service/common/src/main/java/org/apache/polaris/service/admin/PolarisAdminService.java
@@ -22,6 +22,8 @@ import static 
com.google.common.base.Preconditions.checkArgument;
 
 import jakarta.annotation.Nonnull;
 import jakarta.annotation.Nullable;
+import jakarta.validation.constraints.NotNull;
+import jakarta.ws.rs.core.SecurityContext;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashSet;
@@ -44,6 +46,7 @@ import org.apache.iceberg.exceptions.NotFoundException;
 import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.polaris.core.PolarisCallContext;
 import org.apache.polaris.core.PolarisConfiguration;
+import org.apache.polaris.core.PolarisDiagnostics;
 import org.apache.polaris.core.admin.model.CatalogGrant;
 import org.apache.polaris.core.admin.model.CatalogPrivilege;
 import org.apache.polaris.core.admin.model.GrantResource;
@@ -106,6 +109,7 @@ public class PolarisAdminService {
 
   private final CallContext callContext;
   private final PolarisEntityManager entityManager;
+  private final SecurityContext securityContext;
   private final AuthenticatedPolarisPrincipal authenticatedPrincipal;
   private final PolarisAuthorizer authorizer;
   private final PolarisMetaStoreManager metaStoreManager;
@@ -114,15 +118,25 @@ public class PolarisAdminService {
   private PolarisResolutionManifest resolutionManifest = null;
 
   public PolarisAdminService(
-      CallContext callContext,
-      PolarisEntityManager entityManager,
-      PolarisMetaStoreManager metaStoreManager,
-      AuthenticatedPolarisPrincipal authenticatedPrincipal,
-      PolarisAuthorizer authorizer) {
+      @NotNull CallContext callContext,
+      @NotNull PolarisEntityManager entityManager,
+      @NotNull PolarisMetaStoreManager metaStoreManager,
+      @NotNull SecurityContext securityContext,
+      @NotNull PolarisAuthorizer authorizer) {
     this.callContext = callContext;
     this.entityManager = entityManager;
     this.metaStoreManager = metaStoreManager;
-    this.authenticatedPrincipal = authenticatedPrincipal;
+    this.securityContext = securityContext;
+    PolarisDiagnostics diagServices = 
callContext.getPolarisCallContext().getDiagServices();
+    diagServices.checkNotNull(securityContext, "null_security_context");
+    diagServices.checkNotNull(securityContext.getUserPrincipal(), 
"null_user_principal");
+    diagServices.check(
+        securityContext.getUserPrincipal() instanceof 
AuthenticatedPolarisPrincipal,
+        "unexpected_principal_type",
+        "class={}",
+        securityContext.getUserPrincipal().getClass().getName());
+    this.authenticatedPrincipal =
+        (AuthenticatedPolarisPrincipal) securityContext.getUserPrincipal();
     this.authorizer = authorizer;
   }
 
@@ -155,7 +169,7 @@ public class PolarisAdminService {
   private void authorizeBasicRootOperationOrThrow(PolarisAuthorizableOperation 
op) {
     resolutionManifest =
         entityManager.prepareResolutionManifest(
-            callContext, authenticatedPrincipal, null /* referenceCatalogName 
*/);
+            callContext, securityContext, null /* referenceCatalogName */);
     resolutionManifest.resolveAll();
     PolarisResolvedPathWrapper rootContainerWrapper =
         resolutionManifest.getResolvedRootContainerEntityAsPath();
@@ -181,8 +195,7 @@ public class PolarisAdminService {
       PolarisEntityType entityType,
       @Nullable String referenceCatalogName) {
     resolutionManifest =
-        entityManager.prepareResolutionManifest(
-            callContext, authenticatedPrincipal, referenceCatalogName);
+        entityManager.prepareResolutionManifest(callContext, securityContext, 
referenceCatalogName);
     resolutionManifest.addTopLevelName(topLevelEntityName, entityType, false 
/* isOptional */);
     ResolverStatus status = resolutionManifest.resolveAll();
     if (status.getStatus() == 
ResolverStatus.StatusEnum.ENTITY_COULD_NOT_BE_RESOLVED) {
@@ -215,7 +228,7 @@ public class PolarisAdminService {
   private void authorizeBasicCatalogRoleOperationOrThrow(
       PolarisAuthorizableOperation op, String catalogName, String 
catalogRoleName) {
     resolutionManifest =
-        entityManager.prepareResolutionManifest(callContext, 
authenticatedPrincipal, catalogName);
+        entityManager.prepareResolutionManifest(callContext, securityContext, 
catalogName);
     resolutionManifest.addPath(
         new ResolverPath(List.of(catalogRoleName), 
PolarisEntityType.CATALOG_ROLE),
         catalogRoleName);
@@ -235,7 +248,7 @@ public class PolarisAdminService {
   private void authorizeGrantOnRootContainerToPrincipalRoleOperationOrThrow(
       PolarisAuthorizableOperation op, String principalRoleName) {
     resolutionManifest =
-        entityManager.prepareResolutionManifest(callContext, 
authenticatedPrincipal, null);
+        entityManager.prepareResolutionManifest(callContext, securityContext, 
null);
     resolutionManifest.addTopLevelName(
         principalRoleName, PolarisEntityType.PRINCIPAL_ROLE, false /* 
isOptional */);
     ResolverStatus status = resolutionManifest.resolveAll();
@@ -268,7 +281,7 @@ public class PolarisAdminService {
       PolarisEntityType topLevelEntityType,
       String principalRoleName) {
     resolutionManifest =
-        entityManager.prepareResolutionManifest(callContext, 
authenticatedPrincipal, null);
+        entityManager.prepareResolutionManifest(callContext, securityContext, 
null);
     resolutionManifest.addTopLevelName(
         topLevelEntityName, topLevelEntityType, false /* isOptional */);
     resolutionManifest.addTopLevelName(
@@ -301,7 +314,7 @@ public class PolarisAdminService {
   private void authorizeGrantOnPrincipalRoleToPrincipalOperationOrThrow(
       PolarisAuthorizableOperation op, String principalRoleName, String 
principalName) {
     resolutionManifest =
-        entityManager.prepareResolutionManifest(callContext, 
authenticatedPrincipal, null);
+        entityManager.prepareResolutionManifest(callContext, securityContext, 
null);
     resolutionManifest.addTopLevelName(
         principalRoleName, PolarisEntityType.PRINCIPAL_ROLE, false /* 
isOptional */);
     resolutionManifest.addTopLevelName(
@@ -334,7 +347,7 @@ public class PolarisAdminService {
       String catalogRoleName,
       String principalRoleName) {
     resolutionManifest =
-        entityManager.prepareResolutionManifest(callContext, 
authenticatedPrincipal, catalogName);
+        entityManager.prepareResolutionManifest(callContext, securityContext, 
catalogName);
     resolutionManifest.addPath(
         new ResolverPath(List.of(catalogRoleName), 
PolarisEntityType.CATALOG_ROLE),
         catalogRoleName);
@@ -369,7 +382,7 @@ public class PolarisAdminService {
   private void authorizeGrantOnCatalogOperationOrThrow(
       PolarisAuthorizableOperation op, String catalogName, String 
catalogRoleName) {
     resolutionManifest =
-        entityManager.prepareResolutionManifest(callContext, 
authenticatedPrincipal, catalogName);
+        entityManager.prepareResolutionManifest(callContext, securityContext, 
catalogName);
     resolutionManifest.addTopLevelName(
         catalogName, PolarisEntityType.CATALOG, false /* isOptional */);
     resolutionManifest.addPath(
@@ -401,7 +414,7 @@ public class PolarisAdminService {
       Namespace namespace,
       String catalogRoleName) {
     resolutionManifest =
-        entityManager.prepareResolutionManifest(callContext, 
authenticatedPrincipal, catalogName);
+        entityManager.prepareResolutionManifest(callContext, securityContext, 
catalogName);
     resolutionManifest.addPath(
         new ResolverPath(Arrays.asList(namespace.levels()), 
PolarisEntityType.NAMESPACE),
         namespace);
@@ -441,7 +454,7 @@ public class PolarisAdminService {
       TableIdentifier identifier,
       String catalogRoleName) {
     resolutionManifest =
-        entityManager.prepareResolutionManifest(callContext, 
authenticatedPrincipal, catalogName);
+        entityManager.prepareResolutionManifest(callContext, securityContext, 
catalogName);
     resolutionManifest.addPath(
         new ResolverPath(
             PolarisCatalogHelpers.tableIdentifierToList(identifier), 
PolarisEntityType.TABLE_LIKE),
diff --git 
a/service/common/src/main/java/org/apache/polaris/service/admin/PolarisServiceImpl.java
 
b/service/common/src/main/java/org/apache/polaris/service/admin/PolarisServiceImpl.java
index 72ea099c..f67f9da5 100644
--- 
a/service/common/src/main/java/org/apache/polaris/service/admin/PolarisServiceImpl.java
+++ 
b/service/common/src/main/java/org/apache/polaris/service/admin/PolarisServiceImpl.java
@@ -111,7 +111,7 @@ public class PolarisServiceImpl
         CallContext.getCurrentContext(),
         entityManager,
         metaStoreManager,
-        authenticatedPrincipal,
+        securityContext,
         polarisAuthorizer);
   }
 
diff --git 
a/service/common/src/main/java/org/apache/polaris/service/catalog/BasePolarisCatalog.java
 
b/service/common/src/main/java/org/apache/polaris/service/catalog/BasePolarisCatalog.java
index 3958e79e..4b20d95f 100644
--- 
a/service/common/src/main/java/org/apache/polaris/service/catalog/BasePolarisCatalog.java
+++ 
b/service/common/src/main/java/org/apache/polaris/service/catalog/BasePolarisCatalog.java
@@ -25,6 +25,7 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
 import jakarta.annotation.Nonnull;
 import jakarta.annotation.Nullable;
+import jakarta.ws.rs.core.SecurityContext;
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.Arrays;
@@ -152,6 +153,7 @@ public class BasePolarisCatalog extends 
BaseMetastoreViewCatalog
   private final PolarisResolutionManifestCatalogView resolvedEntityView;
   private final CatalogEntity catalogEntity;
   private final TaskExecutor taskExecutor;
+  private final SecurityContext securityContext;
   private final AuthenticatedPolarisPrincipal authenticatedPrincipal;
   private String ioImplClassName;
   private FileIO catalogFileIO;
@@ -177,6 +179,7 @@ public class BasePolarisCatalog extends 
BaseMetastoreViewCatalog
       PolarisMetaStoreManager metaStoreManager,
       CallContext callContext,
       PolarisResolutionManifestCatalogView resolvedEntityView,
+      SecurityContext securityContext,
       AuthenticatedPolarisPrincipal authenticatedPrincipal,
       TaskExecutor taskExecutor,
       FileIOFactory fileIOFactory) {
@@ -185,6 +188,7 @@ public class BasePolarisCatalog extends 
BaseMetastoreViewCatalog
     this.resolvedEntityView = resolvedEntityView;
     this.catalogEntity =
         
CatalogEntity.of(resolvedEntityView.getResolvedReferenceCatalogEntity().getRawLeafEntity());
+    this.securityContext = securityContext;
     this.authenticatedPrincipal = authenticatedPrincipal;
     this.taskExecutor = taskExecutor;
     this.catalogId = catalogEntity.getId();
@@ -1126,7 +1130,7 @@ public class BasePolarisCatalog extends 
BaseMetastoreViewCatalog
         siblingTables.size() + siblingNamespaces.size());
     PolarisResolutionManifest resolutionManifest =
         new PolarisResolutionManifest(
-            callContext, entityManager, authenticatedPrincipal, 
parentPath.getFirst().getName());
+            callContext, entityManager, securityContext, 
parentPath.getFirst().getName());
     siblingTables.forEach(
         tbl ->
             resolutionManifest.addPath(
diff --git 
a/service/common/src/main/java/org/apache/polaris/service/catalog/IcebergCatalogAdapter.java
 
b/service/common/src/main/java/org/apache/polaris/service/catalog/IcebergCatalogAdapter.java
index 483778cc..b0d29140 100644
--- 
a/service/common/src/main/java/org/apache/polaris/service/catalog/IcebergCatalogAdapter.java
+++ 
b/service/common/src/main/java/org/apache/polaris/service/catalog/IcebergCatalogAdapter.java
@@ -142,7 +142,7 @@ public class IcebergCatalogAdapter
         CallContext.getCurrentContext(),
         entityManager,
         metaStoreManagerFactory.getOrCreateMetaStoreManager(realmContext),
-        authenticatedPrincipal,
+        securityContext,
         catalogFactory,
         catalogName,
         polarisAuthorizer);
@@ -542,8 +542,7 @@ public class IcebergCatalogAdapter
     }
     // FIXME remove call to CallContext.getCurrentContext()
     CallContext callContext = CallContext.getCurrentContext();
-    Resolver resolver =
-        entityManager.prepareResolver(callContext, authenticatedPrincipal, 
warehouse);
+    Resolver resolver = entityManager.prepareResolver(callContext, 
securityContext, warehouse);
     ResolverStatus resolverStatus = resolver.resolveAll();
     if (!resolverStatus.getStatus().equals(ResolverStatus.StatusEnum.SUCCESS)) 
{
       throw new NotFoundException("Unable to find warehouse %s", warehouse);
diff --git 
a/service/common/src/main/java/org/apache/polaris/service/catalog/PolarisCatalogHandlerWrapper.java
 
b/service/common/src/main/java/org/apache/polaris/service/catalog/PolarisCatalogHandlerWrapper.java
index 3d12f75a..823cb0db 100644
--- 
a/service/common/src/main/java/org/apache/polaris/service/catalog/PolarisCatalogHandlerWrapper.java
+++ 
b/service/common/src/main/java/org/apache/polaris/service/catalog/PolarisCatalogHandlerWrapper.java
@@ -20,6 +20,7 @@ package org.apache.polaris.service.catalog;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Maps;
+import jakarta.ws.rs.core.SecurityContext;
 import java.io.Closeable;
 import java.io.IOException;
 import java.time.OffsetDateTime;
@@ -72,6 +73,7 @@ import org.apache.iceberg.rest.responses.LoadViewResponse;
 import org.apache.iceberg.rest.responses.UpdateNamespacePropertiesResponse;
 import org.apache.polaris.core.PolarisConfiguration;
 import org.apache.polaris.core.PolarisConfigurationStore;
+import org.apache.polaris.core.PolarisDiagnostics;
 import org.apache.polaris.core.auth.AuthenticatedPolarisPrincipal;
 import org.apache.polaris.core.auth.PolarisAuthorizableOperation;
 import org.apache.polaris.core.auth.PolarisAuthorizer;
@@ -116,6 +118,7 @@ public class PolarisCatalogHandlerWrapper {
   private final PolarisMetaStoreManager metaStoreManager;
   private final String catalogName;
   private final AuthenticatedPolarisPrincipal authenticatedPrincipal;
+  private final SecurityContext securityContext;
   private final PolarisAuthorizer authorizer;
   private final CallContextCatalogFactory catalogFactory;
 
@@ -132,7 +135,7 @@ public class PolarisCatalogHandlerWrapper {
       CallContext callContext,
       PolarisEntityManager entityManager,
       PolarisMetaStoreManager metaStoreManager,
-      AuthenticatedPolarisPrincipal authenticatedPrincipal,
+      SecurityContext securityContext,
       CallContextCatalogFactory catalogFactory,
       String catalogName,
       PolarisAuthorizer authorizer) {
@@ -140,7 +143,16 @@ public class PolarisCatalogHandlerWrapper {
     this.entityManager = entityManager;
     this.metaStoreManager = metaStoreManager;
     this.catalogName = catalogName;
-    this.authenticatedPrincipal = authenticatedPrincipal;
+    PolarisDiagnostics diagServices = 
callContext.getPolarisCallContext().getDiagServices();
+    diagServices.checkNotNull(securityContext, "null_security_context");
+    diagServices.checkNotNull(securityContext.getUserPrincipal(), 
"null_user_principal");
+    diagServices.check(
+        securityContext.getUserPrincipal() instanceof 
AuthenticatedPolarisPrincipal,
+        "invalid_principal_type",
+        "Principal must be an AuthenticatedPolarisPrincipal");
+    this.securityContext = securityContext;
+    this.authenticatedPrincipal =
+        (AuthenticatedPolarisPrincipal) securityContext.getUserPrincipal();
     this.authorizer = authorizer;
     this.catalogFactory = catalogFactory;
   }
@@ -169,7 +181,7 @@ public class PolarisCatalogHandlerWrapper {
   private void initializeCatalog() {
     this.baseCatalog =
         catalogFactory.createCallContextCatalog(
-            callContext, authenticatedPrincipal, resolutionManifest);
+            callContext, authenticatedPrincipal, securityContext, 
resolutionManifest);
     this.namespaceCatalog =
         (baseCatalog instanceof SupportsNamespaces) ? (SupportsNamespaces) 
baseCatalog : null;
     this.viewCatalog = (baseCatalog instanceof ViewCatalog) ? (ViewCatalog) 
baseCatalog : null;
@@ -186,7 +198,7 @@ public class PolarisCatalogHandlerWrapper {
       List<Namespace> extraPassthroughNamespaces,
       List<TableIdentifier> extraPassthroughTableLikes) {
     resolutionManifest =
-        entityManager.prepareResolutionManifest(callContext, 
authenticatedPrincipal, catalogName);
+        entityManager.prepareResolutionManifest(callContext, securityContext, 
catalogName);
     resolutionManifest.addPath(
         new ResolverPath(Arrays.asList(namespace.levels()), 
PolarisEntityType.NAMESPACE),
         namespace);
@@ -227,7 +239,7 @@ public class PolarisCatalogHandlerWrapper {
   private void authorizeCreateNamespaceUnderNamespaceOperationOrThrow(
       PolarisAuthorizableOperation op, Namespace namespace) {
     resolutionManifest =
-        entityManager.prepareResolutionManifest(callContext, 
authenticatedPrincipal, catalogName);
+        entityManager.prepareResolutionManifest(callContext, securityContext, 
catalogName);
 
     Namespace parentNamespace = 
PolarisCatalogHelpers.getParentNamespace(namespace);
     resolutionManifest.addPath(
@@ -262,7 +274,7 @@ public class PolarisCatalogHandlerWrapper {
     Namespace namespace = identifier.namespace();
 
     resolutionManifest =
-        entityManager.prepareResolutionManifest(callContext, 
authenticatedPrincipal, catalogName);
+        entityManager.prepareResolutionManifest(callContext, securityContext, 
catalogName);
     resolutionManifest.addPath(
         new ResolverPath(Arrays.asList(namespace.levels()), 
PolarisEntityType.NAMESPACE),
         namespace);
@@ -297,7 +309,7 @@ public class PolarisCatalogHandlerWrapper {
   private void authorizeBasicTableLikeOperationOrThrow(
       PolarisAuthorizableOperation op, PolarisEntitySubType subType, 
TableIdentifier identifier) {
     resolutionManifest =
-        entityManager.prepareResolutionManifest(callContext, 
authenticatedPrincipal, catalogName);
+        entityManager.prepareResolutionManifest(callContext, securityContext, 
catalogName);
 
     // The underlying Catalog is also allowed to fetch "fresh" versions of the 
target entity.
     resolutionManifest.addPassthroughPath(
@@ -331,7 +343,7 @@ public class PolarisCatalogHandlerWrapper {
       final PolarisEntitySubType subType,
       List<TableIdentifier> ids) {
     resolutionManifest =
-        entityManager.prepareResolutionManifest(callContext, 
authenticatedPrincipal, catalogName);
+        entityManager.prepareResolutionManifest(callContext, securityContext, 
catalogName);
     ids.forEach(
         identifier ->
             resolutionManifest.addPassthroughPath(
@@ -385,7 +397,7 @@ public class PolarisCatalogHandlerWrapper {
       TableIdentifier src,
       TableIdentifier dst) {
     resolutionManifest =
-        entityManager.prepareResolutionManifest(callContext, 
authenticatedPrincipal, catalogName);
+        entityManager.prepareResolutionManifest(callContext, securityContext, 
catalogName);
     // Add src, dstParent, and dst(optional)
     resolutionManifest.addPath(
         new ResolverPath(
diff --git 
a/service/common/src/main/java/org/apache/polaris/service/context/CallContextCatalogFactory.java
 
b/service/common/src/main/java/org/apache/polaris/service/context/CallContextCatalogFactory.java
index 24551a2d..7c812d6d 100644
--- 
a/service/common/src/main/java/org/apache/polaris/service/context/CallContextCatalogFactory.java
+++ 
b/service/common/src/main/java/org/apache/polaris/service/context/CallContextCatalogFactory.java
@@ -18,6 +18,7 @@
  */
 package org.apache.polaris.service.context;
 
+import jakarta.ws.rs.core.SecurityContext;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.polaris.core.auth.AuthenticatedPolarisPrincipal;
 import org.apache.polaris.core.context.CallContext;
@@ -27,5 +28,6 @@ public interface CallContextCatalogFactory {
   Catalog createCallContextCatalog(
       CallContext context,
       AuthenticatedPolarisPrincipal authenticatedPrincipal,
+      SecurityContext securityContext,
       PolarisResolutionManifest resolvedManifest);
 }
diff --git 
a/service/common/src/main/java/org/apache/polaris/service/context/PolarisCallContextCatalogFactory.java
 
b/service/common/src/main/java/org/apache/polaris/service/context/PolarisCallContextCatalogFactory.java
index a72f7143..41dbec81 100644
--- 
a/service/common/src/main/java/org/apache/polaris/service/context/PolarisCallContextCatalogFactory.java
+++ 
b/service/common/src/main/java/org/apache/polaris/service/context/PolarisCallContextCatalogFactory.java
@@ -19,6 +19,7 @@
 package org.apache.polaris.service.context;
 
 import jakarta.inject.Inject;
+import jakarta.ws.rs.core.SecurityContext;
 import java.nio.file.Paths;
 import java.util.HashMap;
 import java.util.Map;
@@ -67,6 +68,7 @@ public class PolarisCallContextCatalogFactory implements 
CallContextCatalogFacto
   public Catalog createCallContextCatalog(
       CallContext context,
       AuthenticatedPolarisPrincipal authenticatedPrincipal,
+      SecurityContext securityContext,
       final PolarisResolutionManifest resolvedManifest) {
     PolarisBaseEntity baseCatalogEntity =
         
resolvedManifest.getResolvedReferenceCatalogEntity().getRawLeafEntity();
@@ -85,6 +87,7 @@ public class PolarisCallContextCatalogFactory implements 
CallContextCatalogFacto
             
metaStoreManagerFactory.getOrCreateMetaStoreManager(context.getRealmContext()),
             context,
             resolvedManifest,
+            securityContext,
             authenticatedPrincipal,
             taskExecutor,
             fileIOFactory);

Reply via email to