This is an automated email from the ASF dual-hosted git repository.

jerryshao pushed a commit to branch branch-1.2
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/branch-1.2 by this push:
     new b719c25cc0 [#10907] improvement(auth): Optimize JCasbin policy lookup 
(#10908)
b719c25cc0 is described below

commit b719c25cc08ef86da3344b46a28a77ce50602c9a
Author: Qi Yu <[email protected]>
AuthorDate: Wed May 6 15:32:49 2026 +0800

    [#10907] improvement(auth): Optimize JCasbin policy lookup (#10908)
    
    ### What changes were proposed in this pull request?
    
    This PR optimizes the JCasbin authorization path by adding a per-role
    policy index in JcasbinAuthorizer.
    
    The main changes are:
    
    - Build and maintain a per-role policy index when role privileges are
    loaded.
    - Cache the current request's user role IDs in
    AuthorizationRequestContext and use them for authorization checks.
    - Resolve allow/deny authorization with hash lookups across the user's
    current roles instead of calling JCasbin enforce for every privilege
    probe.
    - Remove the role's policy index when the role privilege cache entry is
    invalidated.
    - Update TestJcasbinAuthorizer to verify that changed user-role
    assignments take effect immediately.
    
    ### Why are the changes needed?
    
    The previous JCasbin authorization path called Enforcer.enforce() for each
    authorization probe, which scans the loaded policy set and becomes
    expensive when the number of roles and policies grows.
    
    It also depended on role links cached inside the enforcer. If a user's
    role assignment changed while a role's policies were still cached,
    authorization could still be evaluated against stale role links.
    
    With this change, each authorization request loads the user's current
    role list once, then checks the indexed policies for only those roles.
    This reduces the hot-path lookup cost and makes user-role assignment
    changes visible without waiting for role privilege cache invalidation.
    
    Closes: #10907
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    ./gradlew :server-common:test --tests
    org.apache.gravitino.server.authorization.jcasbin.TestJcasbinAuthorizer
    -PskipITs -PskipDockerTests=false
---
 .../authorization/AuthorizationRequestContext.java |  44 ++
 .../TestAuthorizationRequestContext.java           |  40 +
 .../authorization/jcasbin/JcasbinAuthorizer.java   | 280 ++++---
 .../jcasbin/TestJcasbinAuthorizer.java             | 845 ++++++++++++++++++++-
 4 files changed, 1094 insertions(+), 115 deletions(-)

diff --git 
a/core/src/main/java/org/apache/gravitino/authorization/AuthorizationRequestContext.java
 
b/core/src/main/java/org/apache/gravitino/authorization/AuthorizationRequestContext.java
index 392eb13c27..050c093ce3 100644
--- 
a/core/src/main/java/org/apache/gravitino/authorization/AuthorizationRequestContext.java
+++ 
b/core/src/main/java/org/apache/gravitino/authorization/AuthorizationRequestContext.java
@@ -18,11 +18,15 @@
 package org.apache.gravitino.authorization;
 
 import java.security.Principal;
+import java.util.Collections;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Function;
+import javax.annotation.Nullable;
 import org.apache.gravitino.MetadataObject;
 
 public class AuthorizationRequestContext {
@@ -36,6 +40,13 @@ public class AuthorizationRequestContext {
   /** Used to determine whether the role has already been loaded. */
   private final AtomicBoolean hasLoadRole = new AtomicBoolean();
 
+  /**
+   * Role IDs of the current principal in the metalake under check. Populated 
once per request from
+   * {@code loadRole} and reused by every {@code authorize}/{@code deny} call 
so we don't re-derive
+   * the user→role linkage on each enforcer probe.
+   */
+  private volatile Set<Long> userRoleIds = Collections.emptySet();
+
   private volatile String originalAuthorizationExpression;
 
   /**
@@ -103,6 +114,39 @@ public class AuthorizationRequestContext {
     this.originalAuthorizationExpression = originalAuthorizationExpression;
   }
 
+  /**
+   * Returns the user role IDs associated with the current authorization 
request.
+   *
+   * <p>This context is request-scoped and is expected to be used only for the 
lifetime of a single
+   * authorization request.
+   *
+   * <p>The returned set is immutable and safe to iterate without defensive 
copying.
+   *
+   * @return the user role IDs for the current request, or an empty set if 
none have been set
+   */
+  public Set<Long> getUserRoleIds() {
+    return userRoleIds;
+  }
+
+  /**
+   * Sets the user role IDs associated with the current authorization request.
+   *
+   * <p>This context is request-scoped and is expected to be used only for the 
lifetime of a single
+   * authorization request.
+   *
+   * <p>The provided set is defensively copied before being stored, so 
subsequent caller-side
+   * mutations are not reflected in this context.
+   *
+   * @param userRoleIds the user role IDs for the current request; if {@code 
null}, an empty set is
+   *     stored
+   */
+  public void setUserRoleIds(@Nullable Set<Long> userRoleIds) {
+    this.userRoleIds =
+        userRoleIds == null || userRoleIds.isEmpty()
+            ? Collections.emptySet()
+            : Collections.unmodifiableSet(new HashSet<>(userRoleIds));
+  }
+
   public static class AuthorizationKey {
     private Principal principal;
     private String metalake;
diff --git 
a/core/src/test/java/org/apache/gravitino/authorization/TestAuthorizationRequestContext.java
 
b/core/src/test/java/org/apache/gravitino/authorization/TestAuthorizationRequestContext.java
index 2216a7dcfe..619cd77c22 100644
--- 
a/core/src/test/java/org/apache/gravitino/authorization/TestAuthorizationRequestContext.java
+++ 
b/core/src/test/java/org/apache/gravitino/authorization/TestAuthorizationRequestContext.java
@@ -19,8 +19,14 @@ package org.apache.gravitino.authorization;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
+import com.google.common.collect.ImmutableSet;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -101,4 +107,38 @@ public class TestAuthorizationRequestContext {
     context.loadRole(counter::incrementAndGet);
     assertEquals(2, counter.get(), "After a successful loadRole, further calls 
must be ignored.");
   }
+
+  @Test
+  public void testUserRoleIdsDefaultsToEmptySet() {
+    AuthorizationRequestContext context = new AuthorizationRequestContext();
+    assertEquals(Collections.emptySet(), context.getUserRoleIds());
+    assertTrue(context.getUserRoleIds().isEmpty());
+  }
+
+  @Test
+  public void testSetUserRoleIdsWithNullNormalizesToEmptySet() {
+    AuthorizationRequestContext context = new AuthorizationRequestContext();
+    context.setUserRoleIds(ImmutableSet.of(1L, 2L));
+    context.setUserRoleIds(null);
+    // Null must not propagate; downstream callers iterate the set without a 
null check.
+    assertSame(Collections.emptySet(), context.getUserRoleIds());
+  }
+
+  @Test
+  public void testSetUserRoleIdsRoundTrip() {
+    AuthorizationRequestContext context = new AuthorizationRequestContext();
+    Set<Long> roles = ImmutableSet.of(7L, 11L, 13L);
+    context.setUserRoleIds(roles);
+    assertEquals(roles, context.getUserRoleIds());
+  }
+
+  @Test
+  public void testSetUserRoleIdsDefensivelyCopiesAndReturnsImmutableSet() {
+    AuthorizationRequestContext context = new AuthorizationRequestContext();
+    Set<Long> roles = new HashSet<>(ImmutableSet.of(17L, 19L));
+    context.setUserRoleIds(roles);
+    roles.add(23L);
+    assertEquals(ImmutableSet.of(17L, 19L), context.getUserRoleIds());
+    assertThrows(UnsupportedOperationException.class, () -> 
context.getUserRoleIds().add(29L));
+  }
 }
diff --git 
a/server-common/src/main/java/org/apache/gravitino/server/authorization/jcasbin/JcasbinAuthorizer.java
 
b/server-common/src/main/java/org/apache/gravitino/server/authorization/jcasbin/JcasbinAuthorizer.java
index cea47e353b..3a2b1847eb 100644
--- 
a/server-common/src/main/java/org/apache/gravitino/server/authorization/jcasbin/JcasbinAuthorizer.java
+++ 
b/server-common/src/main/java/org/apache/gravitino/server/authorization/jcasbin/JcasbinAuthorizer.java
@@ -27,14 +27,20 @@ import java.nio.charset.StandardCharsets;
 import java.security.Principal;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Locale;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executor;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.gravitino.Configs;
@@ -75,17 +81,11 @@ public class JcasbinAuthorizer implements 
GravitinoAuthorizer {
   /** Jcasbin deny enforcer is used for metadata authorization. */
   private Enforcer denyEnforcer;
 
-  /** allow internal authorizer */
-  private InternalAuthorizer allowInternalAuthorizer;
-
-  /** deny internal authorizer */
-  private InternalAuthorizer denyInternalAuthorizer;
-
   /**
-   * loadedRoles is used to cache roles that have loaded permissions. When the 
permissions of a role
-   * are updated, they should be removed from it.
+   * loadedRoles caches the indexed privileges for each loaded role. When a 
role's privileges are
+   * updated, the role should be removed from this cache.
    */
-  private Cache<Long, Boolean> loadedRoles;
+  private Cache<Long, Map<PolicyKey, Effect>> loadedRoles;
 
   private Cache<Long, Optional<Long>> ownerRel;
 
@@ -104,9 +104,7 @@ public class JcasbinAuthorizer implements 
GravitinoAuthorizer {
 
     // Initialize enforcers before the caches that reference them in removal 
listeners
     allowEnforcer = new SyncedEnforcer(getModel("/jcasbin_model.conf"), new 
GravitinoAdapter());
-    allowInternalAuthorizer = new InternalAuthorizer(allowEnforcer);
     denyEnforcer = new SyncedEnforcer(getModel("/jcasbin_model.conf"), new 
GravitinoAdapter());
-    denyInternalAuthorizer = new InternalAuthorizer(denyEnforcer);
 
     loadedRoles =
         Caffeine.newBuilder()
@@ -164,12 +162,13 @@ public class JcasbinAuthorizer implements 
GravitinoAuthorizer {
             metadataObject,
             privilege,
             (authorizationKey) ->
-                allowInternalAuthorizer.authorizeInternal(
-                    authorizationKey.getPrincipal().getName(),
-                    authorizationKey.getMetalake(),
-                    authorizationKey.getMetadataObject(),
-                    authorizationKey.getPrivilege().name(),
-                    requestContext));
+                loadAndResolveEffect(
+                        authorizationKey.getPrincipal().getName(),
+                        authorizationKey.getMetalake(),
+                        authorizationKey.getMetadataObject(),
+                        authorizationKey.getPrivilege().name(),
+                        requestContext)
+                    == Effect.ALLOW);
     LOG.debug(
         "Authorization expression: {},privilege {}, result {}\n, principal 
{},metalake {},metadata object {}",
         requestContext.getOriginalAuthorizationExpression(),
@@ -195,12 +194,13 @@ public class JcasbinAuthorizer implements 
GravitinoAuthorizer {
             metadataObject,
             privilege,
             (authorizationKey) ->
-                denyInternalAuthorizer.authorizeInternal(
-                    authorizationKey.getPrincipal().getName(),
-                    authorizationKey.getMetalake(),
-                    authorizationKey.getMetadataObject(),
-                    authorizationKey.getPrivilege().name(),
-                    requestContext));
+                loadAndResolveEffect(
+                        authorizationKey.getPrincipal().getName(),
+                        authorizationKey.getMetalake(),
+                        authorizationKey.getMetadataObject(),
+                        authorizationKey.getPrivilege().name(),
+                        requestContext)
+                    == Effect.DENY);
     LOG.debug(
         "Authorization expression: {},privilege {},deny result {}\n, principal 
{},metalake {},metadata object {}",
         requestContext.getOriginalAuthorizationExpression(),
@@ -405,56 +405,81 @@ public class JcasbinAuthorizer implements 
GravitinoAuthorizer {
     }
   }
 
-  private class InternalAuthorizer {
-
-    Enforcer enforcer;
-
-    public InternalAuthorizer(Enforcer enforcer) {
-      this.enforcer = enforcer;
+  /**
+   * Loads role privileges (if not yet loaded for this request) and resolves 
the effect of a single
+   * privilege probe. Returns {@code null} when the user/metadata cannot be 
looked up, so the caller
+   * treats it as "no rule applies".
+   */
+  @Nullable
+  private Effect loadAndResolveEffect(
+      String username,
+      String metalake,
+      MetadataObject metadataObject,
+      String privilege,
+      AuthorizationRequestContext requestContext) {
+    Long metadataId;
+    Long userId;
+    try {
+      UserEntity userEntity = getUserEntity(username, metalake);
+      userId = userEntity.id();
+      metadataId = MetadataIdConverter.getID(metadataObject, metalake);
+    } catch (Exception e) {
+      LOG.debug("Can not get entity id", e);
+      return null;
     }
+    loadRolePrivilege(metalake, username, userId, requestContext);
+    return resolveEffect(userId, metadataObject, metadataId, privilege, 
requestContext);
+  }
 
-    private boolean authorizeInternal(
-        String username,
-        String metalake,
-        MetadataObject metadataObject,
-        String privilege,
-        AuthorizationRequestContext requestContext) {
-      return loadPrivilegeAndAuthorize(
-          username, metalake, metadataObject, privilege, requestContext);
+  /**
+   * Resolve a single privilege probe against the per-role policy index. 
Replaces the previous
+   * {@code enforcer.enforce} call, which scanned every policy line in the 
enforcer for each probe.
+   * Per-probe cost goes from {@code O(total_policies)} to {@code 
O(roles_per_user)} hash lookups.
+   *
+   * <p>Returns {@link Effect#ALLOW} or {@link Effect#DENY} when a matching 
rule is found, or {@code
+   * null} when no role grants or denies this key. The top-level {@code 
authorize}/{@code deny}
+   * entrypoints share this resolver and only differ in how they compare the 
returned effect to a
+   * boolean.
+   *
+   * <p>Cross-role priority: any role with {@link Effect#DENY} short-circuits 
and beats {@link
+   * Effect#ALLOW} from other roles. Within a single role DENY also beats 
ALLOW; that ordering is
+   * enforced by {@link #loadPolicyByRoleEntity} when building the index.
+   *
+   * <p>OWNER is resolved against {@link #ownerRel} rather than the role 
index: being the owner maps
+   * to {@link Effect#ALLOW}, otherwise {@code null} (so {@code deny(OWNER)} 
stays {@code false} for
+   * owners — only an explicit DENY policy can deny a privilege).
+   */
+  @Nullable
+  private Effect resolveEffect(
+      Long userId,
+      MetadataObject metadataObject,
+      Long metadataId,
+      String privilege,
+      AuthorizationRequestContext requestContext) {
+    if (AuthConstants.OWNER.equals(privilege)) {
+      Optional<Long> owner = ownerRel.getIfPresent(metadataId);
+      return Objects.equals(Optional.of(userId), owner) ? Effect.ALLOW : null;
     }
-
-    private boolean loadPrivilegeAndAuthorize(
-        String username,
-        String metalake,
-        MetadataObject metadataObject,
-        String privilege,
-        AuthorizationRequestContext requestContext) {
-      Long metadataId;
-      Long userId;
-      try {
-        UserEntity userEntity = getUserEntity(username, metalake);
-        userId = userEntity.id();
-        metadataId = MetadataIdConverter.getID(metadataObject, metalake);
-      } catch (Exception e) {
-        LOG.debug("Can not get entity id", e);
-        return false;
-      }
-      loadRolePrivilege(metalake, username, userId, requestContext);
-      return authorizeByJcasbin(userId, metadataObject, metadataId, privilege);
+    Set<Long> roleIds = requestContext.getUserRoleIds();
+    if (roleIds.isEmpty()) {
+      return null;
     }
-
-    private boolean authorizeByJcasbin(
-        Long userId, MetadataObject metadataObject, Long metadataId, String 
privilege) {
-      if (AuthConstants.OWNER.equals(privilege)) {
-        Optional<Long> owner = ownerRel.getIfPresent(metadataId);
-        return Objects.equals(Optional.of(userId), owner);
+    PolicyKey key = new PolicyKey(metadataObject.type().name(), metadataId, 
privilege);
+    Effect resolved = null;
+    for (Long roleId : roleIds) {
+      Map<PolicyKey, Effect> idx = loadedRoles.getIfPresent(roleId);
+      if (idx == null) {
+        continue;
+      }
+      Effect effect = idx.get(key);
+      if (effect == Effect.DENY) {
+        return Effect.DENY;
+      }
+      if (effect == Effect.ALLOW) {
+        resolved = Effect.ALLOW;
       }
-      return enforcer.enforce(
-          String.valueOf(userId),
-          String.valueOf(metadataObject.type()),
-          String.valueOf(metadataId),
-          privilege);
     }
+    return resolved;
   }
 
   private static UserEntity getUserEntity(String username, String metalake) 
throws IOException {
@@ -482,36 +507,40 @@ public class JcasbinAuthorizer implements 
GravitinoAuthorizer {
                         SupportsRelationOperations.Type.ROLE_USER_REL,
                         userNameIdentifier,
                         Entity.EntityType.USER);
+            Set<Long> roleIds = new HashSet<>(entities.size());
             List<CompletableFuture<Void>> loadRoleFutures = new ArrayList<>();
             for (RoleEntity role : entities) {
               Long roleId = role.id();
+              roleIds.add(roleId);
               allowEnforcer.addRoleForUser(String.valueOf(userId), 
String.valueOf(roleId));
               denyEnforcer.addRoleForUser(String.valueOf(userId), 
String.valueOf(roleId));
               if (loadedRoles.getIfPresent(roleId) != null) {
                 continue;
               }
               CompletableFuture<Void> loadRoleFuture =
-                  CompletableFuture.supplyAsync(
-                          () -> {
-                            try {
-                              return entityStore.get(
-                                  NameIdentifierUtil.ofRole(metalake, 
role.name()),
-                                  Entity.EntityType.ROLE,
-                                  RoleEntity.class);
-                            } catch (Exception e) {
-                              throw new RuntimeException("Failed to load role: 
" + role.name(), e);
-                            }
-                          },
-                          executor)
-                      .thenAcceptAsync(
-                          roleEntity -> {
-                            loadPolicyByRoleEntity(roleEntity);
-                            loadedRoles.put(roleId, true);
-                          },
-                          executor);
+                  CompletableFuture.runAsync(
+                      () -> {
+                        loadedRoles.get(
+                            roleId,
+                            unused -> {
+                              try {
+                                RoleEntity roleEntity =
+                                    entityStore.get(
+                                        NameIdentifierUtil.ofRole(metalake, 
role.name()),
+                                        Entity.EntityType.ROLE,
+                                        RoleEntity.class);
+                                return loadPolicyByRoleEntity(roleEntity);
+                              } catch (Exception e) {
+                                throw new RuntimeException(
+                                    "Failed to load role: " + role.name(), e);
+                              }
+                            });
+                      },
+                      executor);
               loadRoleFutures.add(loadRoleFuture);
             }
             CompletableFuture.allOf(loadRoleFutures.toArray(new 
CompletableFuture[0])).join();
+            requestContext.setUserRoleIds(roleIds);
           } catch (IOException e) {
             throw new RuntimeException(e);
           }
@@ -548,22 +577,27 @@ public class JcasbinAuthorizer implements 
GravitinoAuthorizer {
     }
   }
 
-  private void loadPolicyByRoleEntity(RoleEntity roleEntity) {
+  private Map<PolicyKey, Effect> loadPolicyByRoleEntity(RoleEntity roleEntity) 
{
     String metalake = 
NameIdentifierUtil.getMetalake(roleEntity.nameIdentifier());
     List<SecurableObject> securableObjects = roleEntity.securableObjects();
+    Long roleId = roleEntity.id();
+    String roleIdStr = String.valueOf(roleId);
+    Map<PolicyKey, Effect> index = new ConcurrentHashMap<>();
 
     for (SecurableObject securableObject : securableObjects) {
+      Long metadataId = MetadataIdConverter.getID(securableObject, metalake);
+      String metadataIdStr = String.valueOf(metadataId);
+      String typeName = securableObject.type().name();
       for (Privilege privilege : securableObject.privileges()) {
         Privilege.Condition condition = privilege.condition();
-        if (AuthConstants.DENY.equalsIgnoreCase(condition.name())) {
+        String privilegeName =
+            AuthorizationUtils.replaceLegacyPrivilegeName(privilege.name())
+                .name()
+                .toUpperCase(Locale.ROOT);
+        boolean isDeny = AuthConstants.DENY.equalsIgnoreCase(condition.name());
+        if (isDeny) {
           denyEnforcer.addPolicy(
-              String.valueOf(roleEntity.id()),
-              securableObject.type().name(),
-              String.valueOf(MetadataIdConverter.getID(securableObject, 
metalake)),
-              AuthorizationUtils.replaceLegacyPrivilegeName(privilege.name())
-                  .name()
-                  .toUpperCase(java.util.Locale.ROOT),
-              AuthConstants.ALLOW);
+              roleIdStr, typeName, metadataIdStr, privilegeName, 
AuthConstants.ALLOW);
         }
         // Since different roles of a user may simultaneously hold both 
"allow" and "deny"
         // permissions
@@ -574,14 +608,58 @@ public class JcasbinAuthorizer implements 
GravitinoAuthorizer {
         // roles should receive a false result when calling the authorize 
method.
 
         allowEnforcer.addPolicy(
-            String.valueOf(roleEntity.id()),
-            securableObject.type().name(),
-            String.valueOf(MetadataIdConverter.getID(securableObject, 
metalake)),
-            AuthorizationUtils.replaceLegacyPrivilegeName(privilege.name())
-                .name()
-                .toUpperCase(java.util.Locale.ROOT),
-            condition.name().toLowerCase(java.util.Locale.ROOT));
+            roleIdStr,
+            typeName,
+            metadataIdStr,
+            privilegeName,
+            condition.name().toLowerCase(Locale.ROOT));
+
+        // Populate the per-role index. Within a single role DENY wins over 
ALLOW so that the
+        // index agrees with the allowEnforcer's policy_effect (some allow && 
!some deny).
+        PolicyKey key = new PolicyKey(typeName, metadataId, privilegeName);
+        Effect effect = isDeny ? Effect.DENY : Effect.ALLOW;
+        index.merge(
+            key, effect, (existing, incoming) -> existing == Effect.DENY ? 
existing : incoming);
       }
     }
+    return index;
+  }
+
+  /** Composite key for the per-role policy index. */
+  static final class PolicyKey {
+    private final String type;
+    private final Long metadataId;
+    private final String privilege;
+    private final int hash;
+
+    PolicyKey(String type, Long metadataId, String privilege) {
+      this.type = type;
+      this.metadataId = metadataId;
+      this.privilege = privilege;
+      this.hash = Objects.hash(type, metadataId, privilege);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (!(o instanceof PolicyKey)) {
+        return false;
+      }
+      PolicyKey other = (PolicyKey) o;
+      return hash == other.hash
+          && Objects.equals(metadataId, other.metadataId)
+          && Objects.equals(type, other.type)
+          && Objects.equals(privilege, other.privilege);
+    }
+
+    @Override
+    public int hashCode() {
+      return hash;
+    }
+  }
+
+  /** Per-role per-key effect; DENY beats ALLOW within a role and across 
roles. */
+  enum Effect {
+    ALLOW,
+    DENY
   }
 }
diff --git 
a/server-common/src/test/java/org/apache/gravitino/server/authorization/jcasbin/TestJcasbinAuthorizer.java
 
b/server-common/src/test/java/org/apache/gravitino/server/authorization/jcasbin/TestJcasbinAuthorizer.java
index 4388ef0952..fa0c86fee2 100644
--- 
a/server-common/src/test/java/org/apache/gravitino/server/authorization/jcasbin/TestJcasbinAuthorizer.java
+++ 
b/server-common/src/test/java/org/apache/gravitino/server/authorization/jcasbin/TestJcasbinAuthorizer.java
@@ -17,8 +17,13 @@
 
 package org.apache.gravitino.server.authorization.jcasbin;
 
+import static org.apache.gravitino.authorization.Privilege.Name.RUN_JOB;
+import static org.apache.gravitino.authorization.Privilege.Name.SELECT_TABLE;
 import static org.apache.gravitino.authorization.Privilege.Name.USE_CATALOG;
+import static org.apache.gravitino.authorization.Privilege.Name.USE_SCHEMA;
+import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -37,9 +42,17 @@ import java.io.IOException;
 import java.lang.reflect.Field;
 import java.security.Principal;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.gravitino.Entity;
@@ -70,6 +83,7 @@ import org.apache.gravitino.utils.PrincipalUtils;
 import org.casbin.jcasbin.main.Enforcer;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.mockito.MockedStatic;
 
@@ -149,6 +163,13 @@ public class TestJcasbinAuthorizer {
         .thenReturn(baseMetalake);
   }
 
+  @BeforeEach
+  public void resetUserRoleStubBetweenTests() throws IOException {
+    // Restore the default empty role assignment for "tester" so that one 
test's stubbed roles
+    // don't leak into the next test. Each test re-stubs its own role list 
before asserting.
+    mockUserRoles(NameIdentifierUtil.ofUser(METALAKE, USERNAME));
+  }
+
   @AfterAll
   public static void stop() {
     if (principalUtilsMockedStatic != null) {
@@ -185,9 +206,10 @@ public class TestJcasbinAuthorizer {
             eq(Entity.EntityType.USER)))
         .thenReturn(ImmutableList.of(allowRole));
     assertTrue(doAuthorize(currentPrincipal));
-    // Test role cache.
-    // When permissions are changed but handleRolePrivilegeChange is not 
executed, the system will
-    // use the cached permissions in JCasbin, so authorize can succeed.
+    // After re-assigning the user from allowRole to a role with no 
privileges, authorize must
+    // return false even though allowRole's policies are still cached in the 
enforcer. Each
+    // request iterates the user's fresh role list, so removed role 
assignments take effect
+    // immediately without waiting for a handleRolePrivilegeChange call.
     Long newRoleId = -1L;
     RoleEntity tempNewRole = getRoleEntity(newRoleId, "tempNewRole", 
ImmutableList.of());
     when(entityStore.get(
@@ -200,10 +222,11 @@ public class TestJcasbinAuthorizer {
             eq(userNameIdentifier),
             eq(Entity.EntityType.USER)))
         .thenReturn(ImmutableList.of(tempNewRole));
-    assertTrue(doAuthorize(currentPrincipal));
-    // After clearing the cache, authorize will fail
+    assertFalse(doAuthorize(currentPrincipal));
+    // Invalidating the role cache does not change the result in this 
scenario; the call is kept
+    // only to exercise the invalidation path.
     jcasbinAuthorizer.handleRolePrivilegeChange(ALLOW_ROLE_ID);
-    //    assertFalse(doAuthorize(currentPrincipal));
+    assertFalse(doAuthorize(currentPrincipal));
     // When the user is re-assigned the correct role, the authorization will 
succeed.
     when(supportsRelationOperations.listEntitiesByRelation(
             eq(SupportsRelationOperations.Type.ROLE_USER_REL),
@@ -363,11 +386,12 @@ public class TestJcasbinAuthorizer {
     makeCompletableFutureUseCurrentThread(jcasbinAuthorizer);
 
     // Get the loadedRoles cache via reflection
-    Cache<Long, Boolean> loadedRoles = getLoadedRolesCache(jcasbinAuthorizer);
+    Cache<Long, Map<JcasbinAuthorizer.PolicyKey, JcasbinAuthorizer.Effect>> 
loadedRoles =
+        getLoadedRolesCache(jcasbinAuthorizer);
 
     // Manually add a role to the cache
     Long testRoleId = 100L;
-    loadedRoles.put(testRoleId, true);
+    loadedRoles.put(testRoleId, Collections.emptyMap());
 
     // Verify it's in the cache
     assertNotNull(loadedRoles.getIfPresent(testRoleId));
@@ -410,7 +434,8 @@ public class TestJcasbinAuthorizer {
     Enforcer denyEnforcer = getDenyEnforcer(jcasbinAuthorizer);
 
     // Get the loadedRoles cache
-    Cache<Long, Boolean> loadedRoles = getLoadedRolesCache(jcasbinAuthorizer);
+    Cache<Long, Map<JcasbinAuthorizer.PolicyKey, JcasbinAuthorizer.Effect>> 
loadedRoles =
+        getLoadedRolesCache(jcasbinAuthorizer);
 
     // Add a role and its policy to the enforcer
     Long testRoleId = 300L;
@@ -421,7 +446,7 @@ public class TestJcasbinAuthorizer {
     denyEnforcer.addPolicy(roleIdStr, "CATALOG", "999", "USE_CATALOG", 
"allow");
 
     // Add role to cache
-    loadedRoles.put(testRoleId, true);
+    loadedRoles.put(testRoleId, Collections.emptyMap());
 
     // Verify role exists in enforcer (has policy)
     assertTrue(allowEnforcer.hasPolicy(roleIdStr, "CATALOG", "999", 
"USE_CATALOG", "allow"));
@@ -436,10 +461,759 @@ public class TestJcasbinAuthorizer {
     assertFalse(denyEnforcer.hasPolicy(roleIdStr, "CATALOG", "999", 
"USE_CATALOG", "allow"));
   }
 
+  @Test
+  public void testAuthorizeAndDenyReturnFalseForUserWithNoRoles() throws Exception {
+    // mockUserRoles is called without any role, so neither endpoint has a policy to match
+    // (presumably the helper stubs an empty role list for the user -- confirm in the helper).
+    makeCompletableFutureUseCurrentThread(jcasbinAuthorizer);
+    Principal principal = PrincipalUtils.getCurrentPrincipal();
+    mockUserRoles(NameIdentifierUtil.ofUser(METALAKE, USERNAME));
+
+    AuthorizationRequestContext requestContext = new AuthorizationRequestContext();
+    MetadataObject target = MetadataObjects.of(null, "testCatalog", MetadataObject.Type.CATALOG);
+
+    boolean allowed =
+        jcasbinAuthorizer.authorize(principal, METALAKE, target, USE_CATALOG, requestContext);
+    assertFalse(allowed);
+    boolean denied =
+        jcasbinAuthorizer.deny(principal, METALAKE, target, USE_CATALOG, requestContext);
+    assertFalse(denied);
+  }
+
+  @Test
+  public void testDenyEndpointReturnsTrueForExplicitDenyRole() throws Exception {
+    // The user's only role carries DENY USE_CATALOG on testCatalog: deny() must report it and
+    // authorize() must refuse, both within the same request context.
+    makeCompletableFutureUseCurrentThread(jcasbinAuthorizer);
+    Principal principal = PrincipalUtils.getCurrentPrincipal();
+    NameIdentifier user = NameIdentifierUtil.ofUser(METALAKE, USERNAME);
+
+    Long denyRoleId = 1001L;
+    SecurableObject deniedCatalog =
+        makeSecurableObject(
+            "testCatalog", MetadataObject.Type.CATALOG, denyRoleId, USE_CATALOG, "DENY");
+    RoleEntity denyOnlyRole =
+        getRoleEntity(denyRoleId, "denyOnlyRole" + denyRoleId, ImmutableList.of(deniedCatalog));
+    mockRoleEntity(denyOnlyRole);
+    mockUserRoles(user, denyOnlyRole);
+
+    MetadataObject target = MetadataObjects.of(null, "testCatalog", MetadataObject.Type.CATALOG);
+    AuthorizationRequestContext requestContext = new AuthorizationRequestContext();
+    boolean denied =
+        jcasbinAuthorizer.deny(principal, METALAKE, target, USE_CATALOG, requestContext);
+    assertTrue(denied);
+    boolean allowed =
+        jcasbinAuthorizer.authorize(principal, METALAKE, target, USE_CATALOG, requestContext);
+    assertFalse(allowed);
+  }
+
+  @Test
+  public void testDenyEndpointReturnsFalseForAllowOnlyRole() throws Exception {
+    // The user's only role carries ALLOW USE_CATALOG on testCatalog: authorize() must succeed
+    // and deny() must stay false, both within the same request context.
+    makeCompletableFutureUseCurrentThread(jcasbinAuthorizer);
+    Principal principal = PrincipalUtils.getCurrentPrincipal();
+    NameIdentifier user = NameIdentifierUtil.ofUser(METALAKE, USERNAME);
+
+    Long allowRoleId = 1002L;
+    SecurableObject allowedCatalog =
+        makeSecurableObject(
+            "testCatalog", MetadataObject.Type.CATALOG, allowRoleId, USE_CATALOG, "ALLOW");
+    RoleEntity allowOnlyRole =
+        getRoleEntity(
+            allowRoleId, "allowOnlyRole" + allowRoleId, ImmutableList.of(allowedCatalog));
+    mockRoleEntity(allowOnlyRole);
+    mockUserRoles(user, allowOnlyRole);
+
+    MetadataObject target = MetadataObjects.of(null, "testCatalog", MetadataObject.Type.CATALOG);
+    AuthorizationRequestContext requestContext = new AuthorizationRequestContext();
+    boolean allowed =
+        jcasbinAuthorizer.authorize(principal, METALAKE, target, USE_CATALOG, requestContext);
+    assertTrue(allowed);
+    boolean denied =
+        jcasbinAuthorizer.deny(principal, METALAKE, target, USE_CATALOG, requestContext);
+    assertFalse(denied);
+  }
+
+  @Test
+  public void testLoadPolicyByRoleEntityAddsAllowOnlyToAllowEnforcer() throws Exception {
+    // An ALLOW grant must show up as an "allow" policy in the allow enforcer, stay out of the
+    // deny enforcer, and be recorded as ALLOW in the role's cached policy index.
+    makeCompletableFutureUseCurrentThread(jcasbinAuthorizer);
+    Principal currentPrincipal = PrincipalUtils.getCurrentPrincipal();
+    NameIdentifier userIdent = NameIdentifierUtil.ofUser(METALAKE, USERNAME);
+    Long roleId = 2010L;
+    RoleEntity allowRole =
+        getRoleEntity(
+            roleId,
+            "allowLoadRole" + roleId,
+            ImmutableList.of(
+                makeSecurableObject(
+                    "testCatalog", MetadataObject.Type.CATALOG, roleId, USE_CATALOG, "ALLOW")));
+    mockRoleEntity(allowRole);
+    mockUserRoles(userIdent, allowRole);
+
+    // The checks below inspect the state left behind by this authorize() call.
+    MetadataObject catalog = MetadataObjects.of(null, "testCatalog", MetadataObject.Type.CATALOG);
+    assertTrue(
+        jcasbinAuthorizer.authorize(
+            currentPrincipal, METALAKE, catalog, USE_CATALOG, new AuthorizationRequestContext()));
+
+    String roleIdStr = String.valueOf(roleId);
+    String metadataIdStr = String.valueOf(CATALOG_ID);
+    Enforcer allowEnforcer = getAllowEnforcer(jcasbinAuthorizer);
+    Enforcer denyEnforcer = getDenyEnforcer(jcasbinAuthorizer);
+    assertTrue(
+        allowEnforcer.hasPolicy(roleIdStr, "CATALOG", metadataIdStr, "USE_CATALOG", "allow"));
+    assertFalse(
+        denyEnforcer.hasPolicy(roleIdStr, "CATALOG", metadataIdStr, "USE_CATALOG", "allow"));
+
+    Map<JcasbinAuthorizer.PolicyKey, JcasbinAuthorizer.Effect> policyIndex =
+        getLoadedRolesCache(jcasbinAuthorizer).getIfPresent(roleId);
+    // Fail with an assertion rather than a NullPointerException if the role was never cached.
+    assertNotNull(policyIndex);
+    assertEquals(
+        JcasbinAuthorizer.Effect.ALLOW,
+        policyIndex.get(new JcasbinAuthorizer.PolicyKey("CATALOG", CATALOG_ID, "USE_CATALOG")));
+  }
+
+  @Test
+  public void testLoadPolicyByRoleEntityAddsDenyToBothEnforcersWithExpectedEffects()
+      throws Exception {
+    // A DENY grant must make authorize() false and deny() true, appear as a "deny" policy in
+    // the allow enforcer and as an "allow" policy in the deny enforcer, and be indexed as DENY.
+    makeCompletableFutureUseCurrentThread(jcasbinAuthorizer);
+    Principal currentPrincipal = PrincipalUtils.getCurrentPrincipal();
+    NameIdentifier userIdent = NameIdentifierUtil.ofUser(METALAKE, USERNAME);
+    Long roleId = 2011L;
+    RoleEntity denyRole =
+        getRoleEntity(
+            roleId,
+            "denyLoadRole" + roleId,
+            ImmutableList.of(
+                makeSecurableObject(
+                    "testCatalog", MetadataObject.Type.CATALOG, roleId, USE_CATALOG, "DENY")));
+    mockRoleEntity(denyRole);
+    mockUserRoles(userIdent, denyRole);
+
+    MetadataObject catalog = MetadataObjects.of(null, "testCatalog", MetadataObject.Type.CATALOG);
+    assertFalse(
+        jcasbinAuthorizer.authorize(
+            currentPrincipal, METALAKE, catalog, USE_CATALOG, new AuthorizationRequestContext()));
+    assertTrue(
+        jcasbinAuthorizer.deny(
+            currentPrincipal, METALAKE, catalog, USE_CATALOG, new AuthorizationRequestContext()));
+
+    String roleIdStr = String.valueOf(roleId);
+    String metadataIdStr = String.valueOf(CATALOG_ID);
+    Enforcer allowEnforcer = getAllowEnforcer(jcasbinAuthorizer);
+    Enforcer denyEnforcer = getDenyEnforcer(jcasbinAuthorizer);
+    assertTrue(allowEnforcer.hasPolicy(roleIdStr, "CATALOG", metadataIdStr, "USE_CATALOG", "deny"));
+    assertTrue(denyEnforcer.hasPolicy(roleIdStr, "CATALOG", metadataIdStr, "USE_CATALOG", "allow"));
+
+    Map<JcasbinAuthorizer.PolicyKey, JcasbinAuthorizer.Effect> policyIndex =
+        getLoadedRolesCache(jcasbinAuthorizer).getIfPresent(roleId);
+    // Fail with an assertion rather than a NullPointerException if the role was never cached.
+    assertNotNull(policyIndex);
+    assertEquals(
+        JcasbinAuthorizer.Effect.DENY,
+        policyIndex.get(new JcasbinAuthorizer.PolicyKey("CATALOG", CATALOG_ID, "USE_CATALOG")));
+  }
+
+  @Test
+  public void testCrossRoleDenyBeatsAllowOnBothEndpoints() throws Exception {
+    // The user holds one role that ALLOWs and another that DENYs USE_CATALOG on testCatalog;
+    // the DENY must win on both endpoints.
+    makeCompletableFutureUseCurrentThread(jcasbinAuthorizer);
+    Principal principal = PrincipalUtils.getCurrentPrincipal();
+    NameIdentifier user = NameIdentifierUtil.ofUser(METALAKE, USERNAME);
+    Long grantingRoleId = 1007L;
+    Long denyingRoleId = 1008L;
+
+    SecurableObject allowedCatalog =
+        makeSecurableObject(
+            "testCatalog", MetadataObject.Type.CATALOG, grantingRoleId, USE_CATALOG, "ALLOW");
+    RoleEntity grantingRole =
+        getRoleEntity(
+            grantingRoleId, "mixedAllow" + grantingRoleId, ImmutableList.of(allowedCatalog));
+    SecurableObject deniedCatalog =
+        makeSecurableObject(
+            "testCatalog", MetadataObject.Type.CATALOG, denyingRoleId, USE_CATALOG, "DENY");
+    RoleEntity denyingRole =
+        getRoleEntity(
+            denyingRoleId, "mixedDeny" + denyingRoleId, ImmutableList.of(deniedCatalog));
+    mockRoleEntity(grantingRole);
+    mockRoleEntity(denyingRole);
+    mockUserRoles(user, grantingRole, denyingRole);
+
+    MetadataObject target = MetadataObjects.of(null, "testCatalog", MetadataObject.Type.CATALOG);
+    AuthorizationRequestContext requestContext = new AuthorizationRequestContext();
+    boolean allowed =
+        jcasbinAuthorizer.authorize(principal, METALAKE, target, USE_CATALOG, requestContext);
+    assertFalse(allowed);
+    boolean denied =
+        jcasbinAuthorizer.deny(principal, METALAKE, target, USE_CATALOG, requestContext);
+    assertTrue(denied);
+  }
+
+  @Test
+  public void testRoleAssignmentChangeImmediatelyVisibleToDenyEndpoint() throws Exception {
+    makeCompletableFutureUseCurrentThread(jcasbinAuthorizer);
+    Principal principal = PrincipalUtils.getCurrentPrincipal();
+    NameIdentifier user = NameIdentifierUtil.ofUser(METALAKE, USERNAME);
+
+    Long revokedRoleId = 1009L;
+    SecurableObject deniedCatalog =
+        makeSecurableObject(
+            "testCatalog", MetadataObject.Type.CATALOG, revokedRoleId, USE_CATALOG, "DENY");
+    RoleEntity revokedRole =
+        getRoleEntity(
+            revokedRoleId, "denyAssignRole" + revokedRoleId, ImmutableList.of(deniedCatalog));
+    mockRoleEntity(revokedRole);
+    mockUserRoles(user, revokedRole);
+
+    // While the deny role is assigned, the deny endpoint reports the DENY.
+    MetadataObject target = MetadataObjects.of(null, "testCatalog", MetadataObject.Type.CATALOG);
+    boolean deniedWhileAssigned =
+        jcasbinAuthorizer.deny(
+            principal, METALAKE, target, USE_CATALOG, new AuthorizationRequestContext());
+    assertTrue(deniedWhileAssigned);
+
+    // Re-stub the user with no roles and probe again with a brand-new request context. The
+    // role was never invalidated, so anything cached for it by the first request is still
+    // around; the second request must nevertheless resolve the user's roles afresh and stop
+    // seeing the deny.
+    mockUserRoles(user);
+    boolean deniedAfterRevoke =
+        jcasbinAuthorizer.deny(
+            principal, METALAKE, target, USE_CATALOG, new AuthorizationRequestContext());
+    assertFalse(deniedAfterRevoke);
+  }
+
+  @Test
+  public void testHandleRolePrivilegeChangeRemovesRoleFromIndex() throws Exception {
+    // After a successful authorize() the role has an entry in the loadedRoles cache;
+    // handleRolePrivilegeChange must remove that entry.
+    makeCompletableFutureUseCurrentThread(jcasbinAuthorizer);
+    Principal principal = PrincipalUtils.getCurrentPrincipal();
+    NameIdentifier user = NameIdentifierUtil.ofUser(METALAKE, USERNAME);
+
+    Long indexedRoleId = 1003L;
+    SecurableObject allowedCatalog =
+        makeSecurableObject(
+            "testCatalog", MetadataObject.Type.CATALOG, indexedRoleId, USE_CATALOG, "ALLOW");
+    RoleEntity indexedRole =
+        getRoleEntity(
+            indexedRoleId, "indexRole" + indexedRoleId, ImmutableList.of(allowedCatalog));
+    mockRoleEntity(indexedRole);
+    mockUserRoles(user, indexedRole);
+
+    MetadataObject target = MetadataObjects.of(null, "testCatalog", MetadataObject.Type.CATALOG);
+    boolean allowed =
+        jcasbinAuthorizer.authorize(
+            principal, METALAKE, target, USE_CATALOG, new AuthorizationRequestContext());
+    assertTrue(allowed);
+
+    Cache<Long, Map<JcasbinAuthorizer.PolicyKey, JcasbinAuthorizer.Effect>> roleCache =
+        getLoadedRolesCache(jcasbinAuthorizer);
+    assertNotNull(roleCache.getIfPresent(indexedRoleId));
+    jcasbinAuthorizer.handleRolePrivilegeChange(indexedRoleId);
+    assertNull(roleCache.getIfPresent(indexedRoleId));
+  }
+
+  @Test
+  public void testRoleIndexReloadAfterPrivilegeChange() throws Exception {
+    makeCompletableFutureUseCurrentThread(jcasbinAuthorizer);
+    Principal principal = PrincipalUtils.getCurrentPrincipal();
+    NameIdentifier user = NameIdentifierUtil.ofUser(METALAKE, USERNAME);
+
+    Long reloadedRoleId = 1004L;
+    SecurableObject allowedCatalog =
+        makeSecurableObject(
+            "testCatalog", MetadataObject.Type.CATALOG, reloadedRoleId, USE_CATALOG, "ALLOW");
+    RoleEntity grantingVersion =
+        getRoleEntity(
+            reloadedRoleId, "reloadRole" + reloadedRoleId, ImmutableList.of(allowedCatalog));
+    mockRoleEntity(grantingVersion);
+    mockUserRoles(user, grantingVersion);
+
+    // First request: the role still grants USE_CATALOG.
+    MetadataObject target = MetadataObjects.of(null, "testCatalog", MetadataObject.Type.CATALOG);
+    boolean allowedBefore =
+        jcasbinAuthorizer.authorize(
+            principal, METALAKE, target, USE_CATALOG, new AuthorizationRequestContext());
+    assertTrue(allowedBefore);
+
+    // Swap in a version of the same role with no securable objects and signal the change. The
+    // next request must be answered from the new (empty) policy set, not the old ALLOW entry.
+    RoleEntity emptyVersion =
+        getRoleEntity(reloadedRoleId, "reloadRole" + reloadedRoleId, ImmutableList.of());
+    mockRoleEntity(emptyVersion);
+    jcasbinAuthorizer.handleRolePrivilegeChange(reloadedRoleId);
+    boolean allowedAfter =
+        jcasbinAuthorizer.authorize(
+            principal, METALAKE, target, USE_CATALOG, new AuthorizationRequestContext());
+    assertFalse(allowedAfter);
+  }
+
+  @Test
+  public void testConcurrentRoleLoadsShareOnePolicyIndex() throws Exception {
+    // Two threads ask the loadedRoles cache for the same role id at the same time. The mapping
+    // function must run exactly once and both callers must receive the same policy index.
+    Long roleId = 1010L;
+    JcasbinAuthorizer.PolicyKey policyKey =
+        new JcasbinAuthorizer.PolicyKey("CATALOG", CATALOG_ID, "USE_CATALOG");
+    Map<JcasbinAuthorizer.PolicyKey, JcasbinAuthorizer.Effect> expectedIndex =
+        Collections.singletonMap(policyKey, JcasbinAuthorizer.Effect.ALLOW);
+    Cache<Long, Map<JcasbinAuthorizer.PolicyKey, JcasbinAuthorizer.Effect>> loadedRoles =
+        getLoadedRolesCache(jcasbinAuthorizer);
+    AtomicInteger roleLoadCount = new AtomicInteger();
+    CountDownLatch ready = new CountDownLatch(2);
+    CountDownLatch start = new CountDownLatch(1);
+    // Both workers run the same task, so it is defined once instead of duplicating the lambda.
+    java.util.concurrent.Callable<Map<JcasbinAuthorizer.PolicyKey, JcasbinAuthorizer.Effect>>
+        loadRole =
+            () -> {
+              ready.countDown();
+              // Abort instead of silently running without the barrier if the start signal
+              // never arrives; the failure surfaces through Future.get() below.
+              if (!start.await(5, TimeUnit.SECONDS)) {
+                throw new IllegalStateException("Timed out waiting for the start signal");
+              }
+              return loadedRoles.get(
+                  roleId,
+                  unused -> {
+                    roleLoadCount.incrementAndGet();
+                    try {
+                      // Presumably keeps the loader busy long enough for the other thread to
+                      // arrive while the load is still in flight.
+                      Thread.sleep(100L);
+                    } catch (InterruptedException e) {
+                      Thread.currentThread().interrupt();
+                      throw new RuntimeException(e);
+                    }
+                    return expectedIndex;
+                  });
+            };
+    ExecutorService requestExecutor = Executors.newFixedThreadPool(2);
+    try {
+      Future<Map<JcasbinAuthorizer.PolicyKey, JcasbinAuthorizer.Effect>> first =
+          requestExecutor.submit(loadRole);
+      Future<Map<JcasbinAuthorizer.PolicyKey, JcasbinAuthorizer.Effect>> second =
+          requestExecutor.submit(loadRole);
+      // Release both workers only once each has reached the barrier.
+      assertTrue(ready.await(5, TimeUnit.SECONDS));
+      start.countDown();
+      assertEquals(expectedIndex, first.get(5, TimeUnit.SECONDS));
+      assertEquals(expectedIndex, second.get(5, TimeUnit.SECONDS));
+    } finally {
+      requestExecutor.shutdownNow();
+    }
+    assertEquals(1, roleLoadCount.get());
+    assertEquals(expectedIndex, loadedRoles.getIfPresent(roleId));
+  }
+
+  @Test
+  public void testLoadedRoleCacheStoresPolicyIndexValue() throws Exception {
+    // After authorize() succeeds, the loadedRoles cache must hold a policy index for the role
+    // that maps the granted (CATALOG, CATALOG_ID, USE_CATALOG) key to ALLOW.
+    makeCompletableFutureUseCurrentThread(jcasbinAuthorizer);
+    Principal principal = PrincipalUtils.getCurrentPrincipal();
+    NameIdentifier user = NameIdentifierUtil.ofUser(METALAKE, USERNAME);
+
+    Long cachedRoleId = 1011L;
+    SecurableObject allowedCatalog =
+        makeSecurableObject(
+            "testCatalog", MetadataObject.Type.CATALOG, cachedRoleId, USE_CATALOG, "ALLOW");
+    RoleEntity cachedRole =
+        getRoleEntity(
+            cachedRoleId, "cachedIndexRole" + cachedRoleId, ImmutableList.of(allowedCatalog));
+    mockRoleEntity(cachedRole);
+    mockUserRoles(user, cachedRole);
+
+    MetadataObject target = MetadataObjects.of(null, "testCatalog", MetadataObject.Type.CATALOG);
+    boolean allowed =
+        jcasbinAuthorizer.authorize(
+            principal, METALAKE, target, USE_CATALOG, new AuthorizationRequestContext());
+    assertTrue(allowed);
+
+    Cache<Long, Map<JcasbinAuthorizer.PolicyKey, JcasbinAuthorizer.Effect>> roleCache =
+        getLoadedRolesCache(jcasbinAuthorizer);
+    Map<JcasbinAuthorizer.PolicyKey, JcasbinAuthorizer.Effect> cachedIndex =
+        roleCache.getIfPresent(cachedRoleId);
+    assertNotNull(cachedIndex);
+    JcasbinAuthorizer.PolicyKey grantedKey =
+        new JcasbinAuthorizer.PolicyKey("CATALOG", CATALOG_ID, "USE_CATALOG");
+    assertEquals(JcasbinAuthorizer.Effect.ALLOW, cachedIndex.get(grantedKey));
+  }
+
+  @Test
+  public void testCachedRolePolicyIndexSkipsRoleEntityLoad() throws Exception {
+    // The cache is pre-populated with an ALLOW index for the role, and the entity store is
+    // stubbed to throw if that role is ever looked up. authorize() can therefore only return
+    // true when it is answered from the cached index.
+    makeCompletableFutureUseCurrentThread(jcasbinAuthorizer);
+    Principal principal = PrincipalUtils.getCurrentPrincipal();
+    NameIdentifier user = NameIdentifierUtil.ofUser(METALAKE, USERNAME);
+
+    Long cachedRoleId = 1012L;
+    RoleEntity cachedRole =
+        getRoleEntity(cachedRoleId, "cachedRole" + cachedRoleId, ImmutableList.of());
+    mockUserRoles(user, cachedRole);
+
+    Cache<Long, Map<JcasbinAuthorizer.PolicyKey, JcasbinAuthorizer.Effect>> roleCache =
+        getLoadedRolesCache(jcasbinAuthorizer);
+    JcasbinAuthorizer.PolicyKey grantedKey =
+        new JcasbinAuthorizer.PolicyKey("CATALOG", CATALOG_ID, "USE_CATALOG");
+    roleCache.put(
+        cachedRoleId, Collections.singletonMap(grantedKey, JcasbinAuthorizer.Effect.ALLOW));
+
+    NameIdentifier roleIdent = NameIdentifierUtil.ofRole(METALAKE, cachedRole.name());
+    when(entityStore.get(eq(roleIdent), eq(Entity.EntityType.ROLE), eq(RoleEntity.class)))
+        .thenThrow(new AssertionError("Cached roles should not be loaded again."));
+
+    MetadataObject target = MetadataObjects.of(null, "testCatalog", MetadataObject.Type.CATALOG);
+    boolean allowed =
+        jcasbinAuthorizer.authorize(
+            principal, METALAKE, target, USE_CATALOG, new AuthorizationRequestContext());
+    assertTrue(allowed);
+  }
+
+  @Test
+  public void testPolicyKeyEqualityAndHash() {
+    // Two keys built from the same (type, id, privilege) are equal and share a hash code.
+    JcasbinAuthorizer.PolicyKey reference =
+        new JcasbinAuthorizer.PolicyKey("CATALOG", 1L, "USE_CATALOG");
+    JcasbinAuthorizer.PolicyKey duplicate =
+        new JcasbinAuthorizer.PolicyKey("CATALOG", 1L, "USE_CATALOG");
+    // Each variant differs from the reference in exactly one component.
+    JcasbinAuthorizer.PolicyKey otherType =
+        new JcasbinAuthorizer.PolicyKey("SCHEMA", 1L, "USE_CATALOG");
+    JcasbinAuthorizer.PolicyKey otherId =
+        new JcasbinAuthorizer.PolicyKey("CATALOG", 2L, "USE_CATALOG");
+    JcasbinAuthorizer.PolicyKey otherPrivilege =
+        new JcasbinAuthorizer.PolicyKey("CATALOG", 1L, "SELECT_TABLE");
+
+    assertEquals(reference, duplicate);
+    assertEquals(reference.hashCode(), duplicate.hashCode());
+
+    assertNotEquals(reference, otherType);
+    assertNotEquals(reference, otherId);
+    assertNotEquals(reference, otherPrivilege);
+
+    // A key must not compare equal to null or to an object of an unrelated type.
+    assertNotEquals(reference, null);
+    assertNotEquals(reference, "not a policy key");
+  }
+
+  @Test
+  public void testIcebergTableLevelAuthorizeMatchesByEntityType() throws Exception {
+    // A role granting SELECT_TABLE on a TABLE object must authorize exactly that probe. A
+    // different privilege on the same table, or the same privilege probed on a CATALOG object,
+    // must not match. NOTE(review): this is meant to mirror how Iceberg API auth probes
+    // SELECT_TABLE on a TABLE-typed MetadataObject -- confirm against the Iceberg auth path.
+    makeCompletableFutureUseCurrentThread(jcasbinAuthorizer);
+    Principal principal = PrincipalUtils.getCurrentPrincipal();
+    NameIdentifier user = NameIdentifierUtil.ofUser(METALAKE, USERNAME);
+
+    Long tableRoleId = 1005L;
+    SecurableObject selectableTable =
+        makeSecurableObject(
+            "icebergCat.icebergSchema.icebergTable",
+            MetadataObject.Type.TABLE,
+            tableRoleId,
+            SELECT_TABLE,
+            "ALLOW");
+    RoleEntity tableRole =
+        getRoleEntity(
+            tableRoleId, "icebergTableRole" + tableRoleId, ImmutableList.of(selectableTable));
+    mockRoleEntity(tableRole);
+    mockUserRoles(user, tableRole);
+
+    // Granted privilege on the granted table: allowed.
+    MetadataObject table =
+        MetadataObjects.of(
+            ImmutableList.of("icebergCat", "icebergSchema", "icebergTable"),
+            MetadataObject.Type.TABLE);
+    boolean selectOnTable =
+        jcasbinAuthorizer.authorize(
+            principal, METALAKE, table, SELECT_TABLE, new AuthorizationRequestContext());
+    assertTrue(selectOnTable);
+
+    // Same table, but a privilege the role does not grant: no policy, so not allowed.
+    MetadataObject sameTable =
+        MetadataObjects.of(
+            ImmutableList.of("icebergCat", "icebergSchema", "icebergTable"),
+            MetadataObject.Type.TABLE);
+    boolean useCatalogOnTable =
+        jcasbinAuthorizer.authorize(
+            principal, METALAKE, sameTable, USE_CATALOG, new AuthorizationRequestContext());
+    assertFalse(useCatalogOnTable);
+
+    // Same privilege, but probed on a CATALOG-typed object: the entity type differs, so the
+    // TABLE-level grant must not apply.
+    MetadataObject catalog = MetadataObjects.of(null, "icebergCat", MetadataObject.Type.CATALOG);
+    boolean selectOnCatalog =
+        jcasbinAuthorizer.authorize(
+            principal, METALAKE, catalog, SELECT_TABLE, new AuthorizationRequestContext());
+    assertFalse(selectOnCatalog);
+  }
+
+  @Test
+  public void testGravitinoApiSchemaLevelAuthorize() throws Exception {
+    // A role granting USE_SCHEMA on cat1.schema1 must authorize that schema and nothing else;
+    // in particular it must not grant USE_CATALOG on the parent catalog. NOTE(review): this is
+    // meant to cover the SCHEMA-typed probes issued for schema-scoped Gravitino API operations
+    // via AuthorizationExpressionEvaluator -- confirm against that class.
+    makeCompletableFutureUseCurrentThread(jcasbinAuthorizer);
+    Principal principal = PrincipalUtils.getCurrentPrincipal();
+    NameIdentifier user = NameIdentifierUtil.ofUser(METALAKE, USERNAME);
+
+    Long schemaRoleId = 1006L;
+    SecurableObject usableSchema =
+        makeSecurableObject(
+            "cat1.schema1", MetadataObject.Type.SCHEMA, schemaRoleId, USE_SCHEMA, "ALLOW");
+    RoleEntity schemaRole =
+        getRoleEntity(schemaRoleId, "schemaRole" + schemaRoleId, ImmutableList.of(usableSchema));
+    mockRoleEntity(schemaRole);
+    mockUserRoles(user, schemaRole);
+
+    MetadataObject schema =
+        MetadataObjects.of(ImmutableList.of("cat1", "schema1"), MetadataObject.Type.SCHEMA);
+    boolean schemaAllowed =
+        jcasbinAuthorizer.authorize(
+            principal, METALAKE, schema, USE_SCHEMA, new AuthorizationRequestContext());
+    assertTrue(schemaAllowed);
+
+    // The parent catalog has no matching USE_CATALOG policy.
+    MetadataObject parentCatalog = MetadataObjects.of(null, "cat1", MetadataObject.Type.CATALOG);
+    boolean catalogAllowed =
+        jcasbinAuthorizer.authorize(
+            principal, METALAKE, parentCatalog, USE_CATALOG, new AuthorizationRequestContext());
+    assertFalse(catalogAllowed);
+  }
+
+  @Test
+  public void testIntraRoleDenyBeatsAllowForSameKey() throws Exception {
+    // A single role that grants both ALLOW and DENY for the same (type, metadataId, privilege)
+    // must resolve to DENY: authorize() is false, deny() is true, and the cached index holds
+    // DENY for that key. NOTE(review): the original comment attributes this to the merge
+    // function in loadPolicyByRoleEntity (existing == DENY ? existing : incoming) -- confirm
+    // there; only the ALLOW-then-DENY insertion order is exercised here.
+    makeCompletableFutureUseCurrentThread(jcasbinAuthorizer);
+    Principal currentPrincipal = PrincipalUtils.getCurrentPrincipal();
+    NameIdentifier userIdent = NameIdentifierUtil.ofUser(METALAKE, USERNAME);
+    Long roleId = 2020L;
+    SecurableObject allow =
+        makeSecurableObject(
+            "testCatalog", MetadataObject.Type.CATALOG, roleId, USE_CATALOG, "ALLOW");
+    SecurableObject deny =
+        makeSecurableObject(
+            "testCatalog", MetadataObject.Type.CATALOG, roleId, USE_CATALOG, "DENY");
+    RoleEntity mixedRole =
+        getRoleEntity(roleId, "mixedSameKey" + roleId, ImmutableList.of(allow, deny));
+    mockRoleEntity(mixedRole);
+    mockUserRoles(userIdent, mixedRole);
+
+    MetadataObject catalog = MetadataObjects.of(null, "testCatalog", MetadataObject.Type.CATALOG);
+    AuthorizationRequestContext ctx = new AuthorizationRequestContext();
+    assertFalse(jcasbinAuthorizer.authorize(currentPrincipal, METALAKE, catalog, USE_CATALOG, ctx));
+    assertTrue(jcasbinAuthorizer.deny(currentPrincipal, METALAKE, catalog, USE_CATALOG, ctx));
+
+    Map<JcasbinAuthorizer.PolicyKey, JcasbinAuthorizer.Effect> policyIndex =
+        getLoadedRolesCache(jcasbinAuthorizer).getIfPresent(roleId);
+    // Fail with an assertion rather than a NullPointerException if the role was never cached.
+    assertNotNull(policyIndex);
+    assertEquals(
+        JcasbinAuthorizer.Effect.DENY,
+        policyIndex.get(new JcasbinAuthorizer.PolicyKey("CATALOG", CATALOG_ID, "USE_CATALOG")));
+  }
+
+  @Test
+  public void testAuthorizeAndDenyBothFalseWhenRoleHasNoRuleForKey() throws Exception {
+    // The role only grants USE_CATALOG, but the request probes SELECT_TABLE on the same
+    // catalog. With no rule for that key, "no rule" must not be treated as either ALLOW or
+    // DENY: both endpoints have to return false.
+    makeCompletableFutureUseCurrentThread(jcasbinAuthorizer);
+    Principal principal = PrincipalUtils.getCurrentPrincipal();
+    NameIdentifier user = NameIdentifierUtil.ofUser(METALAKE, USERNAME);
+
+    Long unrelatedRoleId = 2021L;
+    SecurableObject allowedCatalog =
+        makeSecurableObject(
+            "testCatalog", MetadataObject.Type.CATALOG, unrelatedRoleId, USE_CATALOG, "ALLOW");
+    RoleEntity unrelatedRole =
+        getRoleEntity(
+            unrelatedRoleId, "noRuleRole" + unrelatedRoleId, ImmutableList.of(allowedCatalog));
+    mockRoleEntity(unrelatedRole);
+    mockUserRoles(user, unrelatedRole);
+
+    MetadataObject target = MetadataObjects.of(null, "testCatalog", MetadataObject.Type.CATALOG);
+    AuthorizationRequestContext requestContext = new AuthorizationRequestContext();
+    boolean allowed =
+        jcasbinAuthorizer.authorize(principal, METALAKE, target, SELECT_TABLE, requestContext);
+    assertFalse(allowed);
+    boolean denied =
+        jcasbinAuthorizer.deny(principal, METALAKE, target, SELECT_TABLE, requestContext);
+    assertFalse(denied);
+  }
+
+  @Test
+  public void testAuthorizeAndDenyShareSameResolveAcrossPrivileges() throws Exception {
+    // One role grants ALLOW on USE_CATALOG and DENY on USE_SCHEMA for the same catalog. Each
+    // privilege must be resolved on its own: authorize/deny answer (true, false) for
+    // USE_CATALOG and (false, true) for USE_SCHEMA, rather than one verdict for the whole role.
+    makeCompletableFutureUseCurrentThread(jcasbinAuthorizer);
+    Principal principal = PrincipalUtils.getCurrentPrincipal();
+    NameIdentifier user = NameIdentifierUtil.ofUser(METALAKE, USERNAME);
+
+    Long mixedRoleId = 2022L;
+    SecurableObject allowedUseCatalog =
+        makeSecurableObject(
+            "testCatalog", MetadataObject.Type.CATALOG, mixedRoleId, USE_CATALOG, "ALLOW");
+    SecurableObject deniedUseSchema =
+        makeSecurableObject(
+            "testCatalog", MetadataObject.Type.CATALOG, mixedRoleId, USE_SCHEMA, "DENY");
+    RoleEntity mixedRole =
+        getRoleEntity(
+            mixedRoleId,
+            "mixedPrivRole" + mixedRoleId,
+            ImmutableList.of(allowedUseCatalog, deniedUseSchema));
+    mockRoleEntity(mixedRole);
+    mockUserRoles(user, mixedRole);
+
+    MetadataObject target = MetadataObjects.of(null, "testCatalog", MetadataObject.Type.CATALOG);
+    AuthorizationRequestContext requestContext = new AuthorizationRequestContext();
+
+    // USE_CATALOG carries the ALLOW.
+    boolean useCatalogAllowed =
+        jcasbinAuthorizer.authorize(principal, METALAKE, target, USE_CATALOG, requestContext);
+    assertTrue(useCatalogAllowed);
+    boolean useCatalogDenied =
+        jcasbinAuthorizer.deny(principal, METALAKE, target, USE_CATALOG, requestContext);
+    assertFalse(useCatalogDenied);
+
+    // USE_SCHEMA carries the DENY.
+    boolean useSchemaAllowed =
+        jcasbinAuthorizer.authorize(principal, METALAKE, target, USE_SCHEMA, requestContext);
+    assertFalse(useSchemaAllowed);
+    boolean useSchemaDenied =
+        jcasbinAuthorizer.deny(principal, METALAKE, target, USE_SCHEMA, requestContext);
+    assertTrue(useSchemaDenied);
+  }
+
+  @Test
+  public void testAuthorizeReturnsFalseForDirectPrivilegeDenyOnly() throws Exception {
+    // A role that only DENYs RUN_JOB on the metalake must make authorize() return false on its
+    // own, without relying on a separate deny() probe. NOTE(review): the motivating case is an
+    // expression such as `METALAKE::RUN_JOB`, which AuthorizationExpressionConverter reportedly
+    // rewrites into a single authorize() call (only ANY_xxx aliases add a deny() call) --
+    // confirm against the converter.
+    makeCompletableFutureUseCurrentThread(jcasbinAuthorizer);
+    Principal principal = PrincipalUtils.getCurrentPrincipal();
+    NameIdentifier user = NameIdentifierUtil.ofUser(METALAKE, USERNAME);
+
+    Long denyRoleId = 3001L;
+    SecurableObject deniedRunJob =
+        makeSecurableObject(
+            "testMetalake", MetadataObject.Type.METALAKE, denyRoleId, RUN_JOB, "DENY");
+    RoleEntity denyRunJobRole =
+        getRoleEntity(denyRoleId, "denyRunJobRole" + denyRoleId, ImmutableList.of(deniedRunJob));
+    mockRoleEntity(denyRunJobRole);
+    mockUserRoles(user, denyRunJobRole);
+
+    MetadataObject target = MetadataObjects.of(null, METALAKE, MetadataObject.Type.METALAKE);
+    AuthorizationRequestContext requestContext = new AuthorizationRequestContext();
+    boolean allowed =
+        jcasbinAuthorizer.authorize(principal, METALAKE, target, RUN_JOB, requestContext);
+    assertFalse(allowed);
+    boolean denied = jcasbinAuthorizer.deny(principal, METALAKE, target, RUN_JOB, requestContext);
+    assertTrue(denied);
+  }
+
+  @Test
+  public void testAuthorizeReturnsFalseForDirectPrivilegeDenyBeatsAllowRole() throws Exception {
+    // The user holds one role that DENYs RUN_JOB on the metalake and a sibling role that ALLOWs
+    // it. DENY must beat ALLOW on the authorize endpoint itself, since (per the original
+    // comment -- confirm against AuthorizationExpressionConverter) `METALAKE::RUN_JOB` emits
+    // only an authorize() call and no deny() probe. The role ids are distinct from those used
+    // by the other tests so cached state from them cannot interfere.
+    makeCompletableFutureUseCurrentThread(jcasbinAuthorizer);
+    Principal principal = PrincipalUtils.getCurrentPrincipal();
+    NameIdentifier user = NameIdentifierUtil.ofUser(METALAKE, USERNAME);
+    Long denyingRoleId = 3010L;
+    Long grantingRoleId = 3011L;
+
+    SecurableObject deniedRunJob =
+        makeSecurableObject(
+            "testMetalake", MetadataObject.Type.METALAKE, denyingRoleId, RUN_JOB, "DENY");
+    RoleEntity denyingRole =
+        getRoleEntity(
+            denyingRoleId, "denyRunJobRole" + denyingRoleId, ImmutableList.of(deniedRunJob));
+    SecurableObject allowedRunJob =
+        makeSecurableObject(
+            "testMetalake", MetadataObject.Type.METALAKE, grantingRoleId, RUN_JOB, "ALLOW");
+    RoleEntity grantingRole =
+        getRoleEntity(
+            grantingRoleId, "allowRunJobRole" + grantingRoleId, ImmutableList.of(allowedRunJob));
+    mockRoleEntity(denyingRole);
+    mockRoleEntity(grantingRole);
+    mockUserRoles(user, denyingRole, grantingRole);
+
+    MetadataObject target = MetadataObjects.of(null, METALAKE, MetadataObject.Type.METALAKE);
+    AuthorizationRequestContext requestContext = new AuthorizationRequestContext();
+    boolean allowed =
+        jcasbinAuthorizer.authorize(principal, METALAKE, target, RUN_JOB, requestContext);
+    assertFalse(allowed);
+    boolean denied = jcasbinAuthorizer.deny(principal, METALAKE, target, RUN_JOB, requestContext);
+    assertTrue(denied);
+  }
+
+  @Test
+  public void testAuthorizeReturnsFalseForDirectPrivilegeIntraRoleDeny() 
throws Exception { // Purpose: a single role holding both ALLOW and DENY for RUN_JOB must yield authorize() == false and deny() == true.
+    // Same role grants both ALLOW and DENY for METALAKE::RUN_JOB. Even though 
the expression
+    // converter only emits a single authorize() call (no deny() probe), DENY 
must still beat
+    // ALLOW inside the role and propagate through resolveEffect to the 
authorize endpoint.
+    makeCompletableFutureUseCurrentThread(jcasbinAuthorizer); // NOTE(review): helper is outside this chunk; presumably makes async work run on the calling thread - confirm
+    Principal currentPrincipal = PrincipalUtils.getCurrentPrincipal();
+    NameIdentifier userIdent = NameIdentifierUtil.ofUser(METALAKE, USERNAME);
+    Long roleId = 3002L; // both securable objects below are tagged with this one role id
+    SecurableObject allow =
+        makeSecurableObject("testMetalake", MetadataObject.Type.METALAKE, 
roleId, RUN_JOB, "ALLOW");
+    SecurableObject deny =
+        makeSecurableObject("testMetalake", MetadataObject.Type.METALAKE, 
roleId, RUN_JOB, "DENY");
+    RoleEntity mixedRole =
+        getRoleEntity(roleId, "mixedRunJobRole" + roleId, 
ImmutableList.of(allow, deny));
+    mockRoleEntity(mixedRole);
+    mockUserRoles(userIdent, mixedRole); // role list for the user is exactly [mixedRole]
+
+    MetadataObject metalake = MetadataObjects.of(null, METALAKE, 
MetadataObject.Type.METALAKE);
+    AuthorizationRequestContext ctx = new AuthorizationRequestContext(); // one context instance is shared by both probes below
+    assertFalse(jcasbinAuthorizer.authorize(currentPrincipal, METALAKE, 
metalake, RUN_JOB, ctx));
+    assertTrue(jcasbinAuthorizer.deny(currentPrincipal, METALAKE, metalake, 
RUN_JOB, ctx));
+  }
+
+  @Test
+  public void testAuthorizeReturnsFalseForDirectPrivilegeCrossRoleDeny() 
throws Exception { // Purpose: an ALLOW role plus a separate DENY role for RUN_JOB must yield authorize() == false and deny() == true.
+    // One role grants ALLOW RUN_JOB on metalake, another role grants DENY 
RUN_JOB on the same
+    // metalake. The direct-privilege expression `METALAKE::RUN_JOB` only 
triggers authorize();
+    // cross-role DENY priority must still take effect from the authorize side 
without relying on
+    // a paired deny() call from an ANY_xxx expansion.
+    makeCompletableFutureUseCurrentThread(jcasbinAuthorizer); // NOTE(review): helper is outside this chunk; presumably makes async work run on the calling thread - confirm
+    Principal currentPrincipal = PrincipalUtils.getCurrentPrincipal();
+    NameIdentifier userIdent = NameIdentifierUtil.ofUser(METALAKE, USERNAME);
+    Long allowRoleId = 3003L;
+    Long denyRoleId = 3004L;
+    RoleEntity allowRole =
+        getRoleEntity(
+            allowRoleId,
+            "allowRunJob" + allowRoleId,
+            ImmutableList.of(
+                makeSecurableObject(
+                    "testMetalake", MetadataObject.Type.METALAKE, allowRoleId, 
RUN_JOB, "ALLOW")));
+    RoleEntity denyRole =
+        getRoleEntity(
+            denyRoleId,
+            "denyRunJob" + denyRoleId,
+            ImmutableList.of(
+                makeSecurableObject(
+                    "testMetalake", MetadataObject.Type.METALAKE, denyRoleId, 
RUN_JOB, "DENY")));
+    mockRoleEntity(allowRole);
+    mockRoleEntity(denyRole);
+    mockUserRoles(userIdent, allowRole, denyRole); // allow role is listed first here; the DenyBeatsAllowRole test lists the deny role first
+
+    MetadataObject metalake = MetadataObjects.of(null, METALAKE, 
MetadataObject.Type.METALAKE);
+    AuthorizationRequestContext ctx = new AuthorizationRequestContext(); // one context instance is shared by both probes below
+    assertFalse(jcasbinAuthorizer.authorize(currentPrincipal, METALAKE, 
metalake, RUN_JOB, ctx));
+    assertTrue(jcasbinAuthorizer.deny(currentPrincipal, METALAKE, metalake, 
RUN_JOB, ctx));
+  }
+
+  @Test
+  public void testIsOwnerReturnsFalseWhenAnotherUserIsOwner() throws Exception 
{ // Purpose: seed the owner cache with a different user id for CATALOG_ID and expect the owner check to return false.
+    // Existing testAuthorizeByOwner only covers "user is owner" and "no owner 
cached". This case
+    // pins the third branch: ownerRel resolves to a *different* user, so 
isOwner must return
+    // false even though the cache entry exists.
+    Principal currentPrincipal = PrincipalUtils.getCurrentPrincipal();
+    Long otherUserId = USER_ID + 1000L; // any id that differs from USER_ID
+    Cache<Long, Optional<Long>> ownerRel = getOwnerRelCache(jcasbinAuthorizer);
+    ownerRel.invalidateAll(); // drop every previously cached owner entry before seeding
+    ownerRel.put(CATALOG_ID, Optional.of(otherUserId)); // CATALOG_ID now maps to the other user
+
+    NameIdentifier catalogIdent = NameIdentifierUtil.ofCatalog(METALAKE, 
"testCatalog");
+    // Stub the relation lookup so loadOwnerPolicy doesn't overwrite our 
manually-injected entry
+    // when isOwner walks through. We pre-seed ownerRel above, so 
loadOwnerPolicy's
+    // `getIfPresent != null` short-circuit fires and the relation lookup is 
never reached — but
+    // stub it defensively so a future refactor that drops the short-circuit 
doesn't silently
+    // change this test's semantics.
+    doReturn(ImmutableList.of())
+        .when(supportsRelationOperations)
+        .listEntitiesByRelation(
+            eq(SupportsRelationOperations.Type.OWNER_REL),
+            eq(catalogIdent),
+            eq(Entity.EntityType.CATALOG));
+
+    assertFalse(doAuthorizeOwner(currentPrincipal)); // doAuthorizeOwner is defined outside this chunk
+  }
+
+  @Test
+  public void testAuthorizeReturnsFalseWhenMetadataIdLookupFails() throws 
Exception { // Purpose: make MetadataIdConverter.getID throw for one object and expect both authorize() and deny() to return false.
+    // loadAndResolveEffect swallows exceptions from the user/metadata id 
lookup and returns null
+    // (treated as "no rule"). This guards against an unauthenticated user or 
a transient
+    // catalog-resolution error inadvertently leaking through as an ALLOW or 
DENY verdict — both
+    // endpoints must return false.
+    makeCompletableFutureUseCurrentThread(jcasbinAuthorizer); // NOTE(review): helper is outside this chunk; presumably makes async work run on the calling thread - confirm
+    Principal currentPrincipal = PrincipalUtils.getCurrentPrincipal();
+    MetadataObject unknown =
+        MetadataObjects.of(null, "missingCatalog", 
MetadataObject.Type.CATALOG);
+    metadataIdConverterMockedStatic
+        .when(() -> MetadataIdConverter.getID(eq(unknown), eq(METALAKE)))
+        .thenThrow(new RuntimeException("simulated id lookup failure")); // stub is scoped to the (unknown, METALAKE) argument pair
+    try {
+      AuthorizationRequestContext ctx = new AuthorizationRequestContext(); // one context instance is shared by both probes below
+      assertFalse(
+          jcasbinAuthorizer.authorize(currentPrincipal, METALAKE, unknown, 
USE_CATALOG, ctx));
+      assertFalse(jcasbinAuthorizer.deny(currentPrincipal, METALAKE, unknown, 
USE_CATALOG, ctx));
+    } finally {
+      // Restore the default stub so subsequent tests still resolve to 
CATALOG_ID.
+      metadataIdConverterMockedStatic
+          .when(() -> MetadataIdConverter.getID(eq(unknown), eq(METALAKE)))
+          .thenReturn(CATALOG_ID); // runs even when an assertion above fails
+    }
+  }
+
   @Test
   public void testCacheInitialization() throws Exception {
     // Verify that caches are initialized
-    Cache<Long, Boolean> loadedRoles = getLoadedRolesCache(jcasbinAuthorizer);
+    Cache<Long, Map<JcasbinAuthorizer.PolicyKey, JcasbinAuthorizer.Effect>> 
loadedRoles =
+        getLoadedRolesCache(jcasbinAuthorizer);
     Cache<Long, Optional<Long>> ownerRel = getOwnerRelCache(jcasbinAuthorizer);
 
     assertNotNull(loadedRoles, "loadedRoles cache should be initialized");
@@ -447,11 +1221,12 @@ public class TestJcasbinAuthorizer {
   }
 
   @SuppressWarnings("unchecked")
-  private static Cache<Long, Boolean> getLoadedRolesCache(JcasbinAuthorizer 
authorizer)
-      throws Exception {
+  private static Cache<Long, Map<JcasbinAuthorizer.PolicyKey, 
JcasbinAuthorizer.Effect>>
+      getLoadedRolesCache(JcasbinAuthorizer authorizer) throws Exception { // Reads the loadedRoles field of the given authorizer via reflection and returns it as a role-id keyed cache of policy maps.
     Field field = JcasbinAuthorizer.class.getDeclaredField("loadedRoles");
     field.setAccessible(true);
-    return (Cache<Long, Boolean>) field.get(authorizer);
+    return (Cache<Long, Map<JcasbinAuthorizer.PolicyKey, 
JcasbinAuthorizer.Effect>>)
+        field.get(authorizer); // unchecked cast: Field.get returns Object, so the generic type is not verified at runtime
   }
 
   @SuppressWarnings("unchecked")
@@ -473,4 +1248,46 @@ public class TestJcasbinAuthorizer {
     field.setAccessible(true);
     return (Enforcer) field.get(authorizer);
   }
+
+  private static SecurableObject makeSecurableObject( // Builds a SecurableObject with one privilege and one condition by filling a SecurableObjectPO and converting it.
+      String name,
+      MetadataObject.Type type,
+      Long roleId,
+      Privilege.Name privilege,
+      String condition) { // condition is serialized as given; callers in this chunk pass ALLOW or DENY
+    try {
+      SecurableObjectPO po =
+          SecurableObjectPO.builder()
+              .withType(String.valueOf(type))
+              .withMetadataObjectId(CATALOG_ID) // NOTE(review): always CATALOG_ID, independent of the type argument - confirm this matches the id stubbing used by callers
+              .withRoleId(roleId)
+              .withPrivilegeNames(
+                  
objectMapper.writeValueAsString(ImmutableList.of(privilege.name())))
+              
.withPrivilegeConditions(objectMapper.writeValueAsString(ImmutableList.of(condition)))
+              .withDeletedAt(0L)
+              .withCurrentVersion(1L)
+              .withLastVersion(1L)
+              .build();
+      return POConverters.fromSecurableObjectPO(name, po, type);
+    } catch (JsonProcessingException e) {
+      throw new RuntimeException(e); // rethrown unchecked with the cause kept, so callers need no throws clause
+    }
+  }
+
+  private static void mockUserRoles(NameIdentifier userIdent, RoleEntity... 
roles)
+      throws IOException { // Stubs the ROLE_USER_REL relation lookup for userIdent so it returns exactly the given roles.
+    when(supportsRelationOperations.listEntitiesByRelation(
+            eq(SupportsRelationOperations.Type.ROLE_USER_REL),
+            eq(userIdent),
+            eq(Entity.EntityType.USER)))
+        .thenReturn(ImmutableList.copyOf(roles)); // immutable copy of the varargs, in argument order
+  }
+
+  private static void mockRoleEntity(RoleEntity role) throws IOException { // Stubs entityStore.get so a lookup of this role by name under METALAKE returns the given entity.
+    when(entityStore.get(
+            eq(NameIdentifierUtil.ofRole(METALAKE, role.name())), // identifier is derived from the role name, not from its id
+            eq(Entity.EntityType.ROLE),
+            eq(RoleEntity.class)))
+        .thenReturn(role);
+  }
 }

Reply via email to