This is an automated email from the ASF dual-hosted git repository.

collado pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/polaris.git


The following commit(s) were added to refs/heads/main by this push:
     new 0e6151422 Add loadEntities batch call and rename listFullEntities 
(#2508)
0e6151422 is described below

commit 0e6151422eb10ea902d2df79843360cc66012a30
Author: Michael Collado <[email protected]>
AuthorDate: Fri Nov 7 09:12:06 2025 -0800

    Add loadEntities batch call and rename listFullEntities (#2508)
    
    * Add loadEntities batch call and rename listFullEntities
    
    * Changed batch call to implement loadResolvedEntities instead
    
    * Add loadResolvedEntities by id and entity cache support
    
    * Add additional test for loadResolvedEntities by id
    
    * Added additional test and updated comments in EntityCache interface
    
    * Add additional constructor to ResolvedEntitiesResult
    
    * Fixed unused method reference
    
    * Removed loadResolvedEntities method with lookup record param
    
    * Pulled out toResolvedPolarisEntity method per PR comment
---
 .../relational/jdbc/JdbcBasePersistenceImpl.java   |  11 +-
 .../AtomicOperationMetaStoreManager.java           |  60 +-
 .../polaris/core/persistence/BasePersistence.java  |   4 +-
 .../core/persistence/PolarisMetaStoreManager.java  |  31 +-
 .../TransactionWorkspaceMetaStoreManager.java      |  13 +-
 .../core/persistence/cache/EntityCache.java        |  30 +
 .../persistence/cache/InMemoryEntityCache.java     | 134 +++-
 .../dao/entity/ResolvedEntitiesResult.java         |  56 ++
 .../AbstractTransactionalPersistence.java          |   2 +-
 .../TransactionalMetaStoreManagerImpl.java         |  64 +-
 .../transactional/TransactionalPersistence.java    |   2 +-
 .../persistence/cache/InMemoryEntityCacheTest.java | 846 +++++++++++++++++----
 .../BasePolarisMetaStoreManagerTest.java           |   8 +-
 .../persistence/PolarisTestMetaStoreManager.java   | 112 +++
 .../polaris/service/admin/PolarisAdminService.java |   8 +-
 .../service/catalog/policy/PolicyCatalog.java      |   2 +-
 16 files changed, 1222 insertions(+), 161 deletions(-)

diff --git 
a/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/JdbcBasePersistenceImpl.java
 
b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/JdbcBasePersistenceImpl.java
index 8d6201453..9401df2dd 100644
--- 
a/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/JdbcBasePersistenceImpl.java
+++ 
b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/JdbcBasePersistenceImpl.java
@@ -31,6 +31,7 @@ import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
@@ -463,7 +464,12 @@ public class JdbcBasePersistenceImpl implements 
BasePersistence, IntegrationPers
     PreparedQuery query =
         QueryGenerator.generateSelectQueryWithEntityIds(realmId, 
schemaVersion, entityIds);
     try {
-      return datasourceOperations.executeSelect(query, new 
ModelEntity(schemaVersion));
+      Map<PolarisEntityId, PolarisBaseEntity> idMap =
+          datasourceOperations.executeSelect(query, new 
ModelEntity(schemaVersion)).stream()
+              .collect(
+                  Collectors.toMap(
+                      e -> new PolarisEntityId(e.getCatalogId(), e.getId()), 
Function.identity()));
+      return entityIds.stream().map(idMap::get).collect(Collectors.toList());
     } catch (SQLException e) {
       throw new RuntimeException(
           String.format("Failed to retrieve polaris entities due to %s", 
e.getMessage()), e);
@@ -476,6 +482,7 @@ public class JdbcBasePersistenceImpl implements 
BasePersistence, IntegrationPers
       @Nonnull PolarisCallContext callCtx, List<PolarisEntityId> entityIds) {
     Map<PolarisEntityId, ModelEntity> idToEntityMap =
         lookupEntities(callCtx, entityIds).stream()
+            .filter(Objects::nonNull)
             .collect(
                 Collectors.toMap(
                     entry -> new PolarisEntityId(entry.getCatalogId(), 
entry.getId()),
@@ -570,7 +577,7 @@ public class JdbcBasePersistenceImpl implements 
BasePersistence, IntegrationPers
 
   @Nonnull
   @Override
-  public <T> Page<T> loadEntities(
+  public <T> Page<T> listFullEntities(
       @Nonnull PolarisCallContext callCtx,
       long catalogId,
       long parentId,
diff --git 
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/AtomicOperationMetaStoreManager.java
 
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/AtomicOperationMetaStoreManager.java
index c3841486a..1ec8c89d4 100644
--- 
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/AtomicOperationMetaStoreManager.java
+++ 
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/AtomicOperationMetaStoreManager.java
@@ -32,6 +32,7 @@ import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Function;
 import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 import org.apache.polaris.core.PolarisCallContext;
 import org.apache.polaris.core.PolarisDiagnostics;
 import org.apache.polaris.core.config.FeatureConfiguration;
@@ -67,6 +68,7 @@ import 
org.apache.polaris.core.persistence.dao.entity.LoadPolicyMappingsResult;
 import org.apache.polaris.core.persistence.dao.entity.PolicyAttachmentResult;
 import org.apache.polaris.core.persistence.dao.entity.PrincipalSecretsResult;
 import org.apache.polaris.core.persistence.dao.entity.PrivilegeResult;
+import org.apache.polaris.core.persistence.dao.entity.ResolvedEntitiesResult;
 import org.apache.polaris.core.persistence.dao.entity.ResolvedEntityResult;
 import org.apache.polaris.core.persistence.dao.entity.ScopedCredentialsResult;
 import org.apache.polaris.core.persistence.pagination.Page;
@@ -705,7 +707,7 @@ public class AtomicOperationMetaStoreManager extends 
BaseMetaStoreManager {
 
   /** {@inheritDoc} */
   @Override
-  public @Nonnull Page<PolarisBaseEntity> loadEntities(
+  public @Nonnull Page<PolarisBaseEntity> listFullEntities(
       @Nonnull PolarisCallContext callCtx,
       @Nullable List<PolarisEntityCore> catalogPath,
       @Nonnull PolarisEntityType entityType,
@@ -728,7 +730,7 @@ public class AtomicOperationMetaStoreManager extends 
BaseMetaStoreManager {
     // with sensitive data; but the window of inconsistency is only the 
duration of a single
     // in-flight request (the cache-based resolution follows a different path 
entirely).
 
-    return ms.loadEntities(
+    return ms.listFullEntities(
         callCtx,
         catalogId,
         parentId,
@@ -1200,7 +1202,7 @@ public class AtomicOperationMetaStoreManager extends 
BaseMetaStoreManager {
 
       // get the list of catalog roles, at most 2
       List<PolarisBaseEntity> catalogRoles =
-          ms.loadEntities(
+          ms.listFullEntities(
                   callCtx,
                   catalogId,
                   catalogId,
@@ -1520,7 +1522,7 @@ public class AtomicOperationMetaStoreManager extends 
BaseMetaStoreManager {
 
     // find all available tasks
     Page<PolarisBaseEntity> availableTasks =
-        ms.loadEntities(
+        ms.listFullEntities(
             callCtx,
             PolarisEntityConstants.getRootEntityId(),
             PolarisEntityConstants.getRootEntityId(),
@@ -1760,6 +1762,56 @@ public class AtomicOperationMetaStoreManager extends 
BaseMetaStoreManager {
     return result;
   }
 
+  /**
+   * {@inheritDoc}
+   *
+   * <p>Every id in {@code entityIds} is checked against the single {@code entityType}.
+   */
+  @Nonnull
+  @Override
+  public ResolvedEntitiesResult loadResolvedEntities(
+      @Nonnull PolarisCallContext callCtx,
+      @Nonnull PolarisEntityType entityType,
+      @Nonnull List<PolarisEntityId> entityIds) {
+    // the expected type does not depend on the position within the batch
+    Function<Integer, PolarisEntityType> sameTypeForAll = index -> entityType;
+    return getResolvedEntitiesResult(callCtx, callCtx.getMetaStore(), entityIds, sameTypeForAll);
+  }
+
+  /**
+   * Looks up the given ids in one batch and resolves each hit together with its grant records.
+   *
+   * <p>The result list has one slot per requested id, in request order. Mimicking loadEntity, a
+   * slot is {@code null} when the lookup returned nothing for that id or when the stored
+   * entity's type differs from the type expected at that index.
+   */
+  private static ResolvedEntitiesResult getResolvedEntitiesResult(
+      PolarisCallContext callCtx,
+      BasePersistence ms,
+      List<PolarisEntityId> entityIds,
+      Function<Integer, PolarisEntityType> entityTypeForIndex) {
+    List<PolarisBaseEntity> found = ms.lookupEntities(callCtx, entityIds);
+    List<ResolvedPolarisEntity> resolved =
+        IntStream.range(0, entityIds.size())
+            .mapToObj(
+                idx -> {
+                  PolarisBaseEntity candidate = found.get(idx);
+                  if (candidate == null) {
+                    // not found: the expected type is not even consulted
+                    return null;
+                  }
+                  PolarisEntityType expected = entityTypeForIndex.apply(idx);
+                  return candidate.getType().equals(expected)
+                      ? toResolvedPolarisEntity(callCtx, candidate, ms)
+                      : null;
+                })
+            .collect(Collectors.toList());
+    return new ResolvedEntitiesResult(resolved);
+  }
+
+  /**
+   * Pairs an entity with its grant records. A {@code null} entity yields {@code null}.
+   *
+   * <p>Grants on the entity as a securable are always loaded; grants held by the entity as a
+   * grantee are only loaded when its type reports {@code isGrantee()}.
+   */
+  private static ResolvedPolarisEntity toResolvedPolarisEntity(
+      PolarisCallContext callCtx, PolarisBaseEntity e, BasePersistence ms) {
+    if (e == null) {
+      return null;
+    }
+
+    // grant records where this entity is the securable
+    List<PolarisGrantRecord> onSecurable =
+        ms.loadAllGrantRecordsOnSecurable(callCtx, e.getCatalogId(), e.getId());
+
+    // grant records where this entity is the grantee; skipped for non-grantee types
+    List<PolarisGrantRecord> onGrantee;
+    if (e.getType().isGrantee()) {
+      onGrantee = ms.loadAllGrantRecordsOnGrantee(callCtx, e.getCatalogId(), e.getId());
+    } else {
+      onGrantee = List.of();
+    }
+
+    return new ResolvedPolarisEntity(PolarisEntity.of(e), onGrantee, onSecurable);
+  }
+
   /** {@inheritDoc} */
   @Override
   public @Nonnull ResolvedEntityResult refreshResolvedEntity(
diff --git 
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/BasePersistence.java
 
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/BasePersistence.java
index d238e490e..afea79fda 100644
--- 
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/BasePersistence.java
+++ 
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/BasePersistence.java
@@ -279,7 +279,7 @@ public interface BasePersistence extends 
PolicyMappingPersistence {
 
   /**
    * List lightweight information of entities matching the given criteria with 
pagination. If all
-   * properties of the entity are required,use {@link #loadEntities} instead.
+   * properties of the entity are required, use {@link #listFullEntities} instead.
    *
    * @param callCtx call context
    * @param catalogId catalog id for that entity, NULL_ID if the entity is 
top-level
@@ -314,7 +314,7 @@ public interface BasePersistence extends 
PolicyMappingPersistence {
    * @return the paged list of matching entities after transformation
    */
   @Nonnull
-  <T> Page<T> loadEntities(
+  <T> Page<T> listFullEntities(
       @Nonnull PolarisCallContext callCtx,
       long catalogId,
       long parentId,
diff --git 
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/PolarisMetaStoreManager.java
 
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/PolarisMetaStoreManager.java
index cf3912fa9..efa73a60b 100644
--- 
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/PolarisMetaStoreManager.java
+++ 
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/PolarisMetaStoreManager.java
@@ -47,6 +47,7 @@ import 
org.apache.polaris.core.persistence.dao.entity.EntityResult;
 import org.apache.polaris.core.persistence.dao.entity.EntityWithPath;
 import org.apache.polaris.core.persistence.dao.entity.GenerateEntityIdResult;
 import org.apache.polaris.core.persistence.dao.entity.ListEntitiesResult;
+import org.apache.polaris.core.persistence.dao.entity.ResolvedEntitiesResult;
 import org.apache.polaris.core.persistence.dao.entity.ResolvedEntityResult;
 import org.apache.polaris.core.persistence.pagination.Page;
 import org.apache.polaris.core.persistence.pagination.PageToken;
@@ -114,7 +115,7 @@ public interface PolarisMetaStoreManager
 
   /**
    * List lightweight information about entities matching the given criteria. 
If all properties of
-   * the entity are required,use {@link #loadEntities} instead.
+   * the entity are required, use {@link #listFullEntities} instead.
    *
    * @param callCtx call context
    * @param catalogPath path inside a catalog. If null or empty, the entities 
to list are top-level,
@@ -135,7 +136,7 @@ public interface PolarisMetaStoreManager
   /**
    * Load full entities matching the given criteria with pagination. If only 
the entity name/id/type
    * is required, use {@link #listEntities} instead. If no pagination is 
required, use {@link
-   * #loadEntitiesAll} instead.
+   * #listFullEntitiesAll} instead.
    *
    * @param callCtx call context
    * @param catalogPath path inside a catalog. If null or empty, the entities 
to list are top-level,
@@ -145,7 +146,7 @@ public interface PolarisMetaStoreManager
    * @return paged list of matching entities
    */
   @Nonnull
-  Page<PolarisBaseEntity> loadEntities(
+  Page<PolarisBaseEntity> listFullEntities(
       @Nonnull PolarisCallContext callCtx,
       @Nullable List<PolarisEntityCore> catalogPath,
       @Nonnull PolarisEntityType entityType,
@@ -154,7 +155,7 @@ public interface PolarisMetaStoreManager
 
   /**
    * Load full entities matching the given criteria into an unpaged list. If 
pagination is required
-   * use {@link #loadEntities} instead. If only the entity name/id/type is 
required, use {@link
+   * use {@link #listFullEntities} instead. If only the entity name/id/type is 
required, use {@link
    * #listEntities} instead.
    *
    * @param callCtx call context
@@ -164,12 +165,13 @@ public interface PolarisMetaStoreManager
    * @param entitySubType subType of entities to list (or ANY_SUBTYPE)
    * @return list of all matching entities
    */
-  default @Nonnull List<PolarisBaseEntity> loadEntitiesAll(
+  default @Nonnull List<PolarisBaseEntity> listFullEntitiesAll(
       @Nonnull PolarisCallContext callCtx,
       @Nullable List<PolarisEntityCore> catalogPath,
       @Nonnull PolarisEntityType entityType,
       @Nonnull PolarisEntitySubType entitySubType) {
-    return loadEntities(callCtx, catalogPath, entityType, entitySubType, 
PageToken.readEverything())
+    return listFullEntities(
+            callCtx, catalogPath, entityType, entitySubType, 
PageToken.readEverything())
         .items();
   }
 
@@ -416,6 +418,23 @@ public interface PolarisMetaStoreManager
       @Nonnull PolarisEntityType entityType,
       @Nonnull String entityName);
 
+  /**
+   * Load a batch of resolved entities of a specified entity type given their {@link
+   * PolarisEntityId}. The returned {@link ResolvedEntitiesResult} wraps the list of resolved
+   * entities: that list is empty if the input list is empty, and its order is the same as the
+   * input list. Some elements might be NULL if the entity has been dropped.
+   *
+   * @param callCtx call context
+   * @param entityType the type of entities to load
+   * @param entityIds the list of entity ids to load
+   * @return a non-null result holding the entities corresponding to the lookup keys, in the same
+   *     order. Some elements might be NULL if the entity has been dropped.
+   */
+  @Nonnull
+  ResolvedEntitiesResult loadResolvedEntities(
+      @Nonnull PolarisCallContext callCtx,
+      @Nonnull PolarisEntityType entityType,
+      @Nonnull List<PolarisEntityId> entityIds);
+
   /**
    * Refresh a resolved entity from the backend store. Will return NULL if the 
entity does not
    * exist, i.e. has been purged or dropped. Else, will determine what has 
changed based on the
diff --git 
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/TransactionWorkspaceMetaStoreManager.java
 
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/TransactionWorkspaceMetaStoreManager.java
index 872955893..99c1f8162 100644
--- 
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/TransactionWorkspaceMetaStoreManager.java
+++ 
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/TransactionWorkspaceMetaStoreManager.java
@@ -53,6 +53,7 @@ import 
org.apache.polaris.core.persistence.dao.entity.LoadPolicyMappingsResult;
 import org.apache.polaris.core.persistence.dao.entity.PolicyAttachmentResult;
 import org.apache.polaris.core.persistence.dao.entity.PrincipalSecretsResult;
 import org.apache.polaris.core.persistence.dao.entity.PrivilegeResult;
+import org.apache.polaris.core.persistence.dao.entity.ResolvedEntitiesResult;
 import org.apache.polaris.core.persistence.dao.entity.ResolvedEntityResult;
 import org.apache.polaris.core.persistence.dao.entity.ScopedCredentialsResult;
 import org.apache.polaris.core.persistence.pagination.Page;
@@ -133,7 +134,7 @@ public class TransactionWorkspaceMetaStoreManager 
implements PolarisMetaStoreMan
   }
 
   @Override
-  public @Nonnull Page<PolarisBaseEntity> loadEntities(
+  public @Nonnull Page<PolarisBaseEntity> listFullEntities(
       @Nonnull PolarisCallContext callCtx,
       @Nullable List<PolarisEntityCore> catalogPath,
       @Nonnull PolarisEntityType entityType,
@@ -379,6 +380,16 @@ public class TransactionWorkspaceMetaStoreManager 
implements PolarisMetaStoreMan
     return null;
   }
 
+  /**
+   * Not supported in the transaction workspace: the call is reported through {@code
+   * diagnostics.fail} with the name of this method.
+   *
+   * <p>NOTE(review): the trailing {@code return null} is presumably unreachable because {@code
+   * fail} is expected to throw - confirm, since the method is annotated {@code @Nonnull}.
+   */
+  @Nonnull
+  @Override
+  public ResolvedEntitiesResult loadResolvedEntities(
+      @Nonnull PolarisCallContext callCtx,
+      @Nonnull PolarisEntityType entityType,
+      @Nonnull List<PolarisEntityId> entityIds) {
+    diagnostics.fail("illegal_method_in_transaction_workspace", 
"loadResolvedEntities");
+    return null;
+  }
+
   @Override
   public ResolvedEntityResult refreshResolvedEntity(
       @Nonnull PolarisCallContext callCtx,
diff --git 
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/cache/EntityCache.java
 
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/cache/EntityCache.java
index cd438c995..481fa7a9e 100644
--- 
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/cache/EntityCache.java
+++ 
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/cache/EntityCache.java
@@ -20,8 +20,10 @@ package org.apache.polaris.core.persistence.cache;
 
 import jakarta.annotation.Nonnull;
 import jakarta.annotation.Nullable;
+import java.util.List;
 import org.apache.polaris.core.PolarisCallContext;
 import org.apache.polaris.core.entity.PolarisBaseEntity;
+import org.apache.polaris.core.entity.PolarisEntityId;
 import org.apache.polaris.core.entity.PolarisEntityType;
 import org.apache.polaris.core.persistence.ResolvedPolarisEntity;
 
@@ -80,4 +82,32 @@ public interface EntityCache {
   @Nullable
   EntityCacheLookupResult getOrLoadEntityByName(
       @Nonnull PolarisCallContext callContext, @Nonnull EntityCacheByNameKey 
entityNameKey);
+
+  /**
+   * Load multiple entities by id, returning those found in the cache and loading those not found.
+   *
+   * <p>Cached entity versions and grant versions must be verified against the versions returned by
+   * the {@link
+   * org.apache.polaris.core.persistence.PolarisMetaStoreManager#loadEntitiesChangeTracking(PolarisCallContext,
+   * List)} API to ensure the returned entities are consistent with the current state of the
+   * metastore. Cache implementations must never return a mix of stale entities and fresh entities,
+   * as authorization or table conflict decisions could be made based on inconsistent data. For
+   * example, a Principal may have a grant to a Principal Role in a cached entry, but that grant may
+   * be revoked prior to the Principal Role being granted a privilege on a Catalog. If the Principal
+   * record is stale, but the Principal Role is refreshed, the Principal may be incorrectly
+   * authorized to access the Catalog.
+   *
+   * @param callCtx the Polaris call context
+   * @param entityType the entity type
+   * @param entityIds the list of entity ids to load
+   * @return the list of resolved entities, in the same order as the requested entity ids. As in
+   *     {@link
+   *     org.apache.polaris.core.persistence.PolarisMetaStoreManager#loadResolvedEntities(PolarisCallContext,
+   *     PolarisEntityType, List)}, elements in the returned list may be null if the corresponding
+   *     entity id does not exist.
+   */
+  List<EntityCacheLookupResult> getOrLoadResolvedEntities(
+      @Nonnull PolarisCallContext callCtx,
+      @Nonnull PolarisEntityType entityType,
+      @Nonnull List<PolarisEntityId> entityIds);
 }
diff --git 
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/cache/InMemoryEntityCache.java
 
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/cache/InMemoryEntityCache.java
index c30b996f1..cf5ad0c21 100644
--- 
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/cache/InMemoryEntityCache.java
+++ 
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/cache/InMemoryEntityCache.java
@@ -24,24 +24,39 @@ import com.github.benmanes.caffeine.cache.RemovalListener;
 import jakarta.annotation.Nonnull;
 import jakarta.annotation.Nullable;
 import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 import org.apache.polaris.core.PolarisCallContext;
 import org.apache.polaris.core.PolarisDiagnostics;
 import org.apache.polaris.core.config.BehaviorChangeConfiguration;
 import org.apache.polaris.core.config.FeatureConfiguration;
 import org.apache.polaris.core.config.RealmConfig;
 import org.apache.polaris.core.entity.PolarisBaseEntity;
+import org.apache.polaris.core.entity.PolarisChangeTrackingVersions;
+import org.apache.polaris.core.entity.PolarisEntityId;
 import org.apache.polaris.core.entity.PolarisEntityType;
 import org.apache.polaris.core.entity.PolarisGrantRecord;
 import org.apache.polaris.core.persistence.PolarisMetaStoreManager;
 import org.apache.polaris.core.persistence.ResolvedPolarisEntity;
+import org.apache.polaris.core.persistence.dao.entity.ChangeTrackingResult;
+import org.apache.polaris.core.persistence.dao.entity.ResolvedEntitiesResult;
 import org.apache.polaris.core.persistence.dao.entity.ResolvedEntityResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /** An in-memory entity cache with a limit of 100k entities and a 1h TTL. */
 public class InMemoryEntityCache implements EntityCache {
-
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(InMemoryEntityCache.class);
+  public static final int MAX_CACHE_REFRESH_ATTEMPTS = 100;
   private final PolarisDiagnostics diagnostics;
   private final PolarisMetaStoreManager polarisMetaStoreManager;
   private final Cache<Long, ResolvedPolarisEntity> byId;
@@ -451,4 +466,121 @@ public class InMemoryEntityCache implements EntityCache {
     // return what we found
     return new EntityCacheLookupResult(entry, cacheHit);
   }
+
+  /**
+   * {@inheritDoc}
+   *
+   * <p>Runs the validate-and-load pass ({@code isCacheStateValid}) until it reports a consistent
+   * state, at most {@link #MAX_CACHE_REFRESH_ATTEMPTS} times. If no pass succeeds, a warning is
+   * logged and {@code diagnostics.fail} is invoked.
+   *
+   * <p>NOTE(review): every non-null result is flagged as a cache hit, including entries that
+   * were loaded from the metastore during this call - confirm this is intended.
+   */
+  @Override
+  public List<EntityCacheLookupResult> getOrLoadResolvedEntities(
+      @Nonnull PolarisCallContext callCtx,
+      @Nonnull PolarisEntityType entityType,
+      @Nonnull List<PolarisEntityId> entityIds) {
+    // Entries are collected in a local map to avoid concurrency problems in case a second thread
+    // is trying to populate the cache from a different snapshot.
+    Map<PolarisEntityId, ResolvedPolarisEntity> resolvedEntities = new HashMap<>();
+    Function<List<PolarisEntityId>, ResolvedEntitiesResult> batchLoader =
+        ids -> polarisMetaStoreManager.loadResolvedEntities(callCtx, entityType, ids);
+
+    boolean stateResolved = false;
+    int attempt = 0;
+    while (!stateResolved && attempt < MAX_CACHE_REFRESH_ATTEMPTS) {
+      stateResolved = isCacheStateValid(callCtx, resolvedEntities, entityIds, batchLoader);
+      attempt++;
+    }
+    if (!stateResolved) {
+      LOGGER.warn(
+          "Unable to resolve entities in cache after multiple attempts {} - resolved: {}",
+          entityIds,
+          resolvedEntities);
+      diagnostics.fail("cannot_resolve_all_entities", "Unable to resolve entities in cache");
+    }
+
+    // one slot per requested id, in request order; null where nothing was resolved
+    List<EntityCacheLookupResult> results = new ArrayList<>(entityIds.size());
+    for (PolarisEntityId id : entityIds) {
+      ResolvedPolarisEntity entity = resolvedEntities.get(id);
+      if (entity == null) {
+        results.add(null);
+      } else {
+        results.add(new EntityCacheLookupResult(entity, true));
+      }
+    }
+    return results;
+  }
+
+  /**
+   * Runs one validate-and-refresh pass over the requested ids.
+   *
+   * <p>The pass first asks the metastore for the current change-tracking versions. If that call
+   * succeeds, only the ids whose cached entry is missing or has different versions are loaded;
+   * if it fails, the whole batch is loaded. Every non-null loaded entity is stored both in the
+   * cache and in {@code resolvedEntities}.
+   *
+   * @param callCtx the Polaris call context
+   * @param resolvedEntities the caller's working map, updated in place
+   * @param entityIds the ids being resolved
+   * @param loaderFunc loads a batch of ids from the metastore
+   * @return {@code true} if, after this pass, no requested id needs to be reloaded; {@code
+   *     false} if the caller should run another pass, including when the batch load itself did
+   *     not succeed
+   */
+  private boolean isCacheStateValid(
+      @Nonnull PolarisCallContext callCtx,
+      @Nonnull Map<PolarisEntityId, ResolvedPolarisEntity> resolvedEntities,
+      @Nonnull List<PolarisEntityId> entityIds,
+      @Nonnull Function<List<PolarisEntityId>, ResolvedEntitiesResult> loaderFunc) {
+    ChangeTrackingResult changeTrackingResult =
+        polarisMetaStoreManager.loadEntitiesChangeTracking(callCtx, entityIds);
+    boolean versionsKnown = changeTrackingResult.isSuccess();
+    List<PolarisEntityId> idsToLoad =
+        versionsKnown
+            ? validateCacheEntries(entityIds, resolvedEntities, changeTrackingResult)
+            : new ArrayList<>(entityIds);
+    if (!idsToLoad.isEmpty()) {
+      ResolvedEntitiesResult resolvedEntitiesResult = loaderFunc.apply(idsToLoad);
+      if (!resolvedEntitiesResult.isSuccess()) {
+        // Nothing was refreshed. Without this check, a failed change-tracking call followed by
+        // a failed load would fall through to the "no revalidation needed" branch below and
+        // report an unrefreshed map as valid.
+        LOGGER.debug("Failed to load entities {} - cache state not validated", idsToLoad);
+        return false;
+      }
+      LOGGER.debug("Resolved entities - validating cache");
+      resolvedEntitiesResult.getResolvedEntities().stream()
+          .filter(Objects::nonNull)
+          .forEach(
+              e -> {
+                this.cacheNewEntry(e);
+                resolvedEntities.put(
+                    new PolarisEntityId(e.getEntity().getCatalogId(), e.getEntity().getId()), e);
+              });
+    }
+
+    // The loader function should always return a batch of results from the same "snapshot" of
+    // the persistence, so if the changeTracking call above failed, we should have loaded the
+    // entire batch in one shot. There should be no need to revalidate the entities.
+    if (!versionsKnown) {
+      return true;
+    }
+    return validateCacheEntries(entityIds, resolvedEntities, changeTrackingResult).isEmpty();
+  }
+
+  /**
+   * Compares the known entries against the change-tracking versions and lists the ids that must
+   * be (re)loaded.
+   *
+   * <p>Ids and versions are walked in parallel. A {@code null} version means the entity has been
+   * purged: it is removed from the cache and from {@code resolvedEntities} and is not reported
+   * for reload. Otherwise the entry from {@code resolvedEntities} (falling back to the cache) is
+   * kept only when both its entity version and its grant records version match exactly.
+   *
+   * @return the ids whose entry is missing or whose versions differ
+   */
+  private List<PolarisEntityId> validateCacheEntries(
+      List<PolarisEntityId> entityIds,
+      Map<PolarisEntityId, ResolvedPolarisEntity> resolvedEntities,
+      ChangeTrackingResult changeTrackingResult) {
+    List<PolarisEntityId> staleIds = new ArrayList<>();
+    Iterator<PolarisEntityId> ids = entityIds.iterator();
+    Iterator<PolarisChangeTrackingVersions> versions =
+        changeTrackingResult.getChangeTrackingVersions().iterator();
+    while (ids.hasNext() && versions.hasNext()) {
+      PolarisEntityId entityId = ids.next();
+      PolarisChangeTrackingVersions current = versions.next();
+      if (current == null) {
+        // entity has been purged
+        ResolvedPolarisEntity cached = getEntityById(entityId.getId());
+        if (cached != null || resolvedEntities.containsKey(entityId)) {
+          LOGGER.debug("Entity {} has been purged, removing from cache", entityId);
+          if (cached != null) {
+            removeCacheEntry(cached);
+          }
+          resolvedEntities.remove(entityId);
+        }
+      } else {
+        // Versions are compared with equals rather than less-than so the same function can also
+        // verify that all entries are consistent with a single call to the change tracking
+        // table, rather than some grants ahead and some grants behind.
+        ResolvedPolarisEntity known =
+            resolvedEntities.computeIfAbsent(entityId, id -> getEntityById(id.getId()));
+        boolean upToDate =
+            known != null
+                && known.getEntity().getEntityVersion() == current.getEntityVersion()
+                && known.getEntity().getGrantRecordsVersion() == current.getGrantRecordsVersion();
+        if (upToDate) {
+          resolvedEntities.put(entityId, known);
+        } else {
+          staleIds.add(entityId);
+        }
+      }
+    }
+    LOGGER.debug("Cache entries {} need to be reloaded", staleIds);
+    return staleIds;
+  }
 }
diff --git 
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/dao/entity/ResolvedEntitiesResult.java
 
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/dao/entity/ResolvedEntitiesResult.java
new file mode 100644
index 000000000..61eb27da1
--- /dev/null
+++ 
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/dao/entity/ResolvedEntitiesResult.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.core.persistence.dao.entity;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import jakarta.annotation.Nonnull;
+import jakarta.annotation.Nullable;
+import java.util.List;
+import org.apache.polaris.core.persistence.ResolvedPolarisEntity;
+
+/**
+ * Response object for the loadResolvedEntities call.
+ *
+ * <p>Carries the list of resolved entities when built from a list, or only a status and extra
+ * information when built from a return status (the list is then {@code null}).
+ */
+public class ResolvedEntitiesResult extends BaseResult {
+  private final List<ResolvedPolarisEntity> resolvedEntities;
+
+  /**
+   * Builds a successful result.
+   *
+   * @param resolvedEntities the resolved entities to carry
+   */
+  public ResolvedEntitiesResult(List<ResolvedPolarisEntity> resolvedEntities) {
+    this(ReturnStatus.SUCCESS, null, resolvedEntities);
+  }
+
+  /**
+   * Builds a result that carries no entity list.
+   *
+   * @param returnStatus the status to report
+   * @param extraInformation optional details about the status
+   */
+  public ResolvedEntitiesResult(
+      @Nonnull ReturnStatus returnStatus, @Nullable String extraInformation) {
+    this(returnStatus, extraInformation, null);
+  }
+
+  /** Full constructor, also used for JSON deserialization. */
+  @JsonCreator
+  private ResolvedEntitiesResult(
+      @JsonProperty("returnStatus") ReturnStatus returnStatus,
+      @JsonProperty("extraInformation") String extraInformation,
+      @JsonProperty("resolvedEntities") List<ResolvedPolarisEntity> resolvedEntities) {
+    super(returnStatus, extraInformation);
+    this.resolvedEntities = resolvedEntities;
+  }
+
+  /** Returns the resolved entities, or {@code null} if this result was built without a list. */
+  public List<ResolvedPolarisEntity> getResolvedEntities() {
+    return resolvedEntities;
+  }
+}
diff --git 
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/AbstractTransactionalPersistence.java
 
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/AbstractTransactionalPersistence.java
index 902e2c8d6..4eaf40fe5 100644
--- 
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/AbstractTransactionalPersistence.java
+++ 
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/AbstractTransactionalPersistence.java
@@ -389,7 +389,7 @@ public abstract class AbstractTransactionalPersistence 
implements TransactionalP
   /** {@inheritDoc} */
   @Override
   @Nonnull
-  public <T> Page<T> loadEntities(
+  public <T> Page<T> listFullEntities(
       @Nonnull PolarisCallContext callCtx,
       long catalogId,
       long parentId,
diff --git 
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TransactionalMetaStoreManagerImpl.java
 
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TransactionalMetaStoreManagerImpl.java
index 402cdc280..db3ccd0f3 100644
--- 
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TransactionalMetaStoreManagerImpl.java
+++ 
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TransactionalMetaStoreManagerImpl.java
@@ -31,6 +31,7 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.function.Function;
 import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 import org.apache.polaris.core.PolarisCallContext;
 import org.apache.polaris.core.PolarisDiagnostics;
 import org.apache.polaris.core.config.FeatureConfiguration;
@@ -56,6 +57,7 @@ import 
org.apache.polaris.core.persistence.BaseMetaStoreManager;
 import org.apache.polaris.core.persistence.PolarisMetaStoreManager;
 import org.apache.polaris.core.persistence.PolarisObjectMapperUtil;
 import org.apache.polaris.core.persistence.PolicyMappingAlreadyExistsException;
+import org.apache.polaris.core.persistence.ResolvedPolarisEntity;
 import org.apache.polaris.core.persistence.RetryOnConcurrencyException;
 import org.apache.polaris.core.persistence.dao.entity.BaseResult;
 import org.apache.polaris.core.persistence.dao.entity.ChangeTrackingResult;
@@ -71,6 +73,7 @@ import 
org.apache.polaris.core.persistence.dao.entity.LoadPolicyMappingsResult;
 import org.apache.polaris.core.persistence.dao.entity.PolicyAttachmentResult;
 import org.apache.polaris.core.persistence.dao.entity.PrincipalSecretsResult;
 import org.apache.polaris.core.persistence.dao.entity.PrivilegeResult;
+import org.apache.polaris.core.persistence.dao.entity.ResolvedEntitiesResult;
 import org.apache.polaris.core.persistence.dao.entity.ResolvedEntityResult;
 import org.apache.polaris.core.persistence.dao.entity.ScopedCredentialsResult;
 import org.apache.polaris.core.persistence.pagination.Page;
@@ -723,10 +726,10 @@ public class TransactionalMetaStoreManagerImpl extends 
BaseMetaStoreManager {
   }
 
   /**
-   * See {@link PolarisMetaStoreManager#loadEntities(PolarisCallContext, List, 
PolarisEntityType,
-   * PolarisEntitySubType, PageToken)}
+   * See {@link PolarisMetaStoreManager#listFullEntities(PolarisCallContext, 
List,
+   * PolarisEntityType, PolarisEntitySubType, PageToken)}
    */
-  private @Nonnull Page<PolarisBaseEntity> loadEntities(
+  private @Nonnull Page<PolarisBaseEntity> listFullEntities(
       @Nonnull PolarisCallContext callCtx,
       @Nonnull TransactionalPersistence ms,
       @Nullable List<PolarisEntityCore> catalogPath,
@@ -756,7 +759,7 @@ public class TransactionalMetaStoreManagerImpl extends 
BaseMetaStoreManager {
 
   /** {@inheritDoc} */
   @Override
-  public @Nonnull Page<PolarisBaseEntity> loadEntities(
+  public @Nonnull Page<PolarisBaseEntity> listFullEntities(
       @Nonnull PolarisCallContext callCtx,
       @Nullable List<PolarisEntityCore> catalogPath,
       @Nonnull PolarisEntityType entityType,
@@ -768,7 +771,7 @@ public class TransactionalMetaStoreManagerImpl extends 
BaseMetaStoreManager {
     // run operation in a read transaction
     return ms.runInReadTransaction(
         callCtx,
-        () -> loadEntities(callCtx, ms, catalogPath, entityType, 
entitySubType, pageToken));
+        () -> listFullEntities(callCtx, ms, catalogPath, entityType, 
entitySubType, pageToken));
   }
 
   /** {@link #createPrincipal(PolarisCallContext, PrincipalEntity)} */
@@ -2293,6 +2296,57 @@ public class TransactionalMetaStoreManagerImpl extends 
BaseMetaStoreManager {
     return result;
   }
 
+  /**
+   * Looks up the entities for {@code entityIds} and resolves each one, keeping the result list
+   * aligned by position with {@code entityIds}.
+   *
+   * @param callCtx call context passed to the persistence calls
+   * @param ms persistence used for the lookup and for loading grant records
+   * @param entityIds ids of the entities to look up
+   * @param entityTypeForIndex expected entity type for the id at a given position
+   * @return a result whose list holds {@code null} at every position where the entity was not
+   *     found or its type differs from the expected one
+   */
+  private static ResolvedEntitiesResult getResolvedEntitiesResult(
+      PolarisCallContext callCtx,
+      TransactionalPersistence ms,
+      List<PolarisEntityId> entityIds,
+      Function<Integer, PolarisEntityType> entityTypeForIndex) {
+    final List<PolarisBaseEntity> found = ms.lookupEntitiesInCurrentTxn(callCtx, entityIds);
+    final List<ResolvedPolarisEntity> resolved =
+        IntStream.range(0, entityIds.size())
+            .mapToObj(
+                idx -> {
+                  // same contract as loadEntity: a missing entity or a type mismatch yields null
+                  final PolarisBaseEntity candidate = found.get(idx);
+                  final boolean usable =
+                      candidate == null
+                          || candidate.getType().equals(entityTypeForIndex.apply(idx));
+                  return toResolvedPolarisEntity(callCtx, usable ? candidate : null, ms);
+                })
+            .collect(Collectors.toList());
+    return new ResolvedEntitiesResult(resolved);
+  }
+
+  /**
+   * Converts a base entity into a {@link ResolvedPolarisEntity} by loading its grant records
+   * through {@code ms}.
+   *
+   * @param callCtx call context passed to the persistence calls
+   * @param e entity to resolve, may be {@code null}
+   * @param ms persistence used to load the grant records
+   * @return the resolved entity, or {@code null} when {@code e} is {@code null}
+   */
+  private static ResolvedPolarisEntity toResolvedPolarisEntity(
+      PolarisCallContext callCtx, PolarisBaseEntity e, TransactionalPersistence ms) {
+    if (e == null) {
+      return null;
+    }
+    // grant records where this entity is the securable
+    final List<PolarisGrantRecord> grantRecordsAsSecurable =
+        ms.loadAllGrantRecordsOnSecurableInCurrentTxn(callCtx, e.getCatalogId(), e.getId());
+    // grant records where this entity is the grantee; skipped for non-grantee types. This has to
+    // be the on-grantee lookup: repeating the on-securable call would report the securable
+    // grants a second time as grantee grants.
+    final List<PolarisGrantRecord> grantRecordsAsGrantee =
+        e.getType().isGrantee()
+            ? ms.loadAllGrantRecordsOnGranteeInCurrentTxn(callCtx, e.getCatalogId(), e.getId())
+            : List.of();
+    return new ResolvedPolarisEntity(
+        PolarisEntity.of(e), grantRecordsAsGrantee, grantRecordsAsSecurable);
+  }
+
+  /** {@inheritDoc} */
+  @Nonnull
+  @Override
+  public ResolvedEntitiesResult loadResolvedEntities(
+      @Nonnull PolarisCallContext callCtx,
+      @Nonnull PolarisEntityType entityType,
+      @Nonnull List<PolarisEntityId> entityIds) {
+    final TransactionalPersistence store = (TransactionalPersistence) callCtx.getMetaStore();
+    // every requested id is checked against the same expected type
+    final Function<Integer, PolarisEntityType> sameTypeForAll = position -> entityType;
+    // resolve the whole batch inside one read transaction
+    return store.runInReadTransaction(
+        callCtx, () -> getResolvedEntitiesResult(callCtx, store, entityIds, sameTypeForAll));
+  }
+
   /** {@inheritDoc} */
   private @Nonnull ResolvedEntityResult refreshResolvedEntity(
       @Nonnull PolarisCallContext callCtx,
diff --git 
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TransactionalPersistence.java
 
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TransactionalPersistence.java
index 3802908b8..dfcd5f925 100644
--- 
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TransactionalPersistence.java
+++ 
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TransactionalPersistence.java
@@ -225,7 +225,7 @@ public interface TransactionalPersistence
         pageToken);
   }
 
-  /** See {@link 
org.apache.polaris.core.persistence.BasePersistence#loadEntities} */
+  /** See {@link 
org.apache.polaris.core.persistence.BasePersistence#listFullEntities} */
   @Nonnull
   <T> Page<T> loadEntitiesInCurrentTxn(
       @Nonnull PolarisCallContext callCtx,
diff --git 
a/polaris-core/src/test/java/org/apache/polaris/core/persistence/cache/InMemoryEntityCacheTest.java
 
b/polaris-core/src/test/java/org/apache/polaris/core/persistence/cache/InMemoryEntityCacheTest.java
index ce3432050..95774364d 100644
--- 
a/polaris-core/src/test/java/org/apache/polaris/core/persistence/cache/InMemoryEntityCacheTest.java
+++ 
b/polaris-core/src/test/java/org/apache/polaris/core/persistence/cache/InMemoryEntityCacheTest.java
@@ -19,16 +19,25 @@
 package org.apache.polaris.core.persistence.cache;
 
 import static 
org.apache.polaris.core.persistence.PrincipalSecretsGenerator.RANDOM_SECRETS;
+import static org.assertj.core.api.Assertions.assertThat;
 
 import java.time.Clock;
 import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.polaris.core.PolarisCallContext;
 import org.apache.polaris.core.PolarisDefaultDiagServiceImpl;
 import org.apache.polaris.core.PolarisDiagnostics;
 import org.apache.polaris.core.entity.PolarisBaseEntity;
+import org.apache.polaris.core.entity.PolarisChangeTrackingVersions;
 import org.apache.polaris.core.entity.PolarisEntity;
+import org.apache.polaris.core.entity.PolarisEntityId;
 import org.apache.polaris.core.entity.PolarisEntitySubType;
 import org.apache.polaris.core.entity.PolarisEntityType;
 import org.apache.polaris.core.entity.PolarisGrantRecord;
@@ -37,6 +46,8 @@ import 
org.apache.polaris.core.entity.table.IcebergTableLikeEntity;
 import org.apache.polaris.core.persistence.PolarisMetaStoreManager;
 import org.apache.polaris.core.persistence.PolarisTestMetaStoreManager;
 import org.apache.polaris.core.persistence.ResolvedPolarisEntity;
+import org.apache.polaris.core.persistence.dao.entity.ChangeTrackingResult;
+import org.apache.polaris.core.persistence.dao.entity.ResolvedEntitiesResult;
 import 
org.apache.polaris.core.persistence.transactional.TransactionalMetaStoreManagerImpl;
 import 
org.apache.polaris.core.persistence.transactional.TransactionalPersistence;
 import org.apache.polaris.core.persistence.transactional.TreeMapMetaStore;
@@ -44,10 +55,14 @@ import 
org.apache.polaris.core.persistence.transactional.TreeMapTransactionalPer
 import org.assertj.core.api.Assertions;
 import org.junit.jupiter.api.Test;
 import org.mockito.Mockito;
+import org.mockito.stubbing.Answer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /** Unit testing of the entity cache */
 public class InMemoryEntityCacheTest {
 
+  // Logger named after this test class rather than the class under test, so test output is
+  // attributed to the test.
+  public static final Logger LOGGER = LoggerFactory.getLogger(InMemoryEntityCacheTest.class);
   private final PolarisDiagnostics diagServices;
   private final PolarisCallContext callCtx;
   private final PolarisTestMetaStoreManager tm;
@@ -105,31 +120,31 @@ public class InMemoryEntityCacheTest {
     EntityCacheLookupResult lookup =
         cache.getOrLoadEntityByName(
             this.callCtx, new EntityCacheByNameKey(PolarisEntityType.CATALOG, 
"test"));
-    Assertions.assertThat(lookup).isNotNull();
-    Assertions.assertThat(lookup.isCacheHit()).isFalse();
-    Assertions.assertThat(lookup.getCacheEntry()).isNotNull();
+    assertThat(lookup).isNotNull();
+    assertThat(lookup.isCacheHit()).isFalse();
+    assertThat(lookup.getCacheEntry()).isNotNull();
 
     // validate the cache entry
     PolarisBaseEntity catalog = lookup.getCacheEntry().getEntity();
-    Assertions.assertThat(catalog).isNotNull();
-    
Assertions.assertThat(catalog.getType()).isEqualTo(PolarisEntityType.CATALOG);
+    assertThat(catalog).isNotNull();
+    assertThat(catalog.getType()).isEqualTo(PolarisEntityType.CATALOG);
 
     // do it again, should be found in the cache
     lookup =
         cache.getOrLoadEntityByName(
             this.callCtx, new EntityCacheByNameKey(PolarisEntityType.CATALOG, 
"test"));
-    Assertions.assertThat(lookup).isNotNull();
-    Assertions.assertThat(lookup.isCacheHit()).isTrue();
+    assertThat(lookup).isNotNull();
+    assertThat(lookup.isCacheHit()).isTrue();
 
     // do it again by id, should be found in the cache
     lookup =
         cache.getOrLoadEntityById(
             this.callCtx, catalog.getCatalogId(), catalog.getId(), 
catalog.getType());
-    Assertions.assertThat(lookup).isNotNull();
-    Assertions.assertThat(lookup.isCacheHit()).isTrue();
-    Assertions.assertThat(lookup.getCacheEntry()).isNotNull();
-    Assertions.assertThat(lookup.getCacheEntry().getEntity()).isNotNull();
-    
Assertions.assertThat(lookup.getCacheEntry().getGrantRecordsAsSecurable()).isNotNull();
+    assertThat(lookup).isNotNull();
+    assertThat(lookup.isCacheHit()).isTrue();
+    assertThat(lookup.getCacheEntry()).isNotNull();
+    assertThat(lookup.getCacheEntry().getEntity()).isNotNull();
+    
assertThat(lookup.getCacheEntry().getGrantRecordsAsSecurable()).isNotNull();
 
     // get N1
     PolarisBaseEntity N1 =
@@ -140,128 +155,119 @@ public class InMemoryEntityCacheTest {
         new EntityCacheByNameKey(
             catalog.getId(), catalog.getId(), PolarisEntityType.NAMESPACE, 
"N1");
     ResolvedPolarisEntity cacheEntry = cache.getEntityByName(N1_name);
-    Assertions.assertThat(cacheEntry).isNull();
+    assertThat(cacheEntry).isNull();
 
     // try to find it in the cache by id. Should not be there, i.e. no cache 
hit
     lookup = cache.getOrLoadEntityById(this.callCtx, N1.getCatalogId(), 
N1.getId(), N1.getType());
-    Assertions.assertThat(lookup).isNotNull();
-    Assertions.assertThat(lookup.isCacheHit()).isFalse();
+    assertThat(lookup).isNotNull();
+    assertThat(lookup.isCacheHit()).isFalse();
 
     // should be there now, by name
     cacheEntry = cache.getEntityByName(N1_name);
-    Assertions.assertThat(cacheEntry).isNotNull();
-    Assertions.assertThat(cacheEntry.getEntity()).isNotNull();
-    Assertions.assertThat(cacheEntry.getGrantRecordsAsSecurable()).isNotNull();
+    assertThat(cacheEntry).isNotNull();
+    assertThat(cacheEntry.getEntity()).isNotNull();
+    assertThat(cacheEntry.getGrantRecordsAsSecurable()).isNotNull();
 
     // should be there now, by id
     cacheEntry = cache.getEntityById(N1.getId());
-    Assertions.assertThat(cacheEntry).isNotNull();
-    Assertions.assertThat(cacheEntry.getEntity()).isNotNull();
-    Assertions.assertThat(cacheEntry.getGrantRecordsAsSecurable()).isNotNull();
+    assertThat(cacheEntry).isNotNull();
+    assertThat(cacheEntry.getEntity()).isNotNull();
+    assertThat(cacheEntry.getGrantRecordsAsSecurable()).isNotNull();
 
     // lookup N1
     ResolvedPolarisEntity N1_entry = cache.getEntityById(N1.getId());
-    Assertions.assertThat(N1_entry).isNotNull();
-    Assertions.assertThat(N1_entry.getEntity()).isNotNull();
-    Assertions.assertThat(N1_entry.getGrantRecordsAsSecurable()).isNotNull();
+    assertThat(N1_entry).isNotNull();
+    assertThat(N1_entry.getEntity()).isNotNull();
+    assertThat(N1_entry.getGrantRecordsAsSecurable()).isNotNull();
 
     // negative tests, load an entity which does not exist
     lookup = cache.getOrLoadEntityById(this.callCtx, N1.getCatalogId(), 10000, 
N1.getType());
-    Assertions.assertThat(lookup).isNull();
+    assertThat(lookup).isNull();
     lookup =
         cache.getOrLoadEntityByName(
             this.callCtx,
             new EntityCacheByNameKey(PolarisEntityType.CATALOG, 
"non_existant_catalog"));
-    Assertions.assertThat(lookup).isNull();
+    assertThat(lookup).isNull();
 
     // lookup N2 to validate grants
     EntityCacheByNameKey N2_name =
         new EntityCacheByNameKey(catalog.getId(), N1.getId(), 
PolarisEntityType.NAMESPACE, "N2");
     lookup = cache.getOrLoadEntityByName(callCtx, N2_name);
-    Assertions.assertThat(lookup).isNotNull();
+    assertThat(lookup).isNotNull();
     ResolvedPolarisEntity cacheEntry_N1 = lookup.getCacheEntry();
-    Assertions.assertThat(cacheEntry_N1).isNotNull();
-    Assertions.assertThat(cacheEntry_N1.getEntity()).isNotNull();
-    
Assertions.assertThat(cacheEntry_N1.getGrantRecordsAsSecurable()).isNotNull();
+    assertThat(cacheEntry_N1).isNotNull();
+    assertThat(cacheEntry_N1.getEntity()).isNotNull();
+    assertThat(cacheEntry_N1.getGrantRecordsAsSecurable()).isNotNull();
 
     // lookup catalog role R1
     EntityCacheByNameKey R1_name =
         new EntityCacheByNameKey(
             catalog.getId(), catalog.getId(), PolarisEntityType.CATALOG_ROLE, 
"R1");
     lookup = cache.getOrLoadEntityByName(callCtx, R1_name);
-    Assertions.assertThat(lookup).isNotNull();
+    assertThat(lookup).isNotNull();
     ResolvedPolarisEntity cacheEntry_R1 = lookup.getCacheEntry();
-    Assertions.assertThat(cacheEntry_R1).isNotNull();
-    Assertions.assertThat(cacheEntry_R1.getEntity()).isNotNull();
-    
Assertions.assertThat(cacheEntry_R1.getGrantRecordsAsSecurable()).isNotNull();
-    
Assertions.assertThat(cacheEntry_R1.getGrantRecordsAsGrantee()).isNotNull();
+    assertThat(cacheEntry_R1).isNotNull();
+    assertThat(cacheEntry_R1.getEntity()).isNotNull();
+    assertThat(cacheEntry_R1.getGrantRecordsAsSecurable()).isNotNull();
+    assertThat(cacheEntry_R1.getGrantRecordsAsGrantee()).isNotNull();
 
     // we expect one TABLE_READ grant on that securable granted to the catalog 
role R1
-    
Assertions.assertThat(cacheEntry_N1.getGrantRecordsAsSecurable()).hasSize(1);
+    assertThat(cacheEntry_N1.getGrantRecordsAsSecurable()).hasSize(1);
     PolarisGrantRecord gr = cacheEntry_N1.getGrantRecordsAsSecurable().get(0);
 
     // securable is N1, grantee is R1
-    
Assertions.assertThat(gr.getGranteeId()).isEqualTo(cacheEntry_R1.getEntity().getId());
-    Assertions.assertThat(gr.getGranteeCatalogId())
-        .isEqualTo(cacheEntry_R1.getEntity().getCatalogId());
-    
Assertions.assertThat(gr.getSecurableId()).isEqualTo(cacheEntry_N1.getEntity().getId());
-    Assertions.assertThat(gr.getSecurableCatalogId())
-        .isEqualTo(cacheEntry_N1.getEntity().getCatalogId());
-    Assertions.assertThat(gr.getPrivilegeCode())
-        .isEqualTo(PolarisPrivilege.TABLE_READ_DATA.getCode());
+    assertThat(gr.getGranteeId()).isEqualTo(cacheEntry_R1.getEntity().getId());
+    
assertThat(gr.getGranteeCatalogId()).isEqualTo(cacheEntry_R1.getEntity().getCatalogId());
+    
assertThat(gr.getSecurableId()).isEqualTo(cacheEntry_N1.getEntity().getId());
+    
assertThat(gr.getSecurableCatalogId()).isEqualTo(cacheEntry_N1.getEntity().getCatalogId());
+    
assertThat(gr.getPrivilegeCode()).isEqualTo(PolarisPrivilege.TABLE_READ_DATA.getCode());
 
     // R1 should have 4 privileges granted to it
-    Assertions.assertThat(cacheEntry_R1.getGrantRecordsAsGrantee()).hasSize(4);
+    assertThat(cacheEntry_R1.getGrantRecordsAsGrantee()).hasSize(4);
     List<PolarisGrantRecord> matchPriv =
         cacheEntry_R1.getGrantRecordsAsGrantee().stream()
             .filter(
                 grantRecord ->
                     grantRecord.getPrivilegeCode() == 
PolarisPrivilege.TABLE_READ_DATA.getCode())
             .collect(Collectors.toList());
-    Assertions.assertThat(matchPriv).hasSize(1);
+    assertThat(matchPriv).hasSize(1);
     gr = matchPriv.get(0);
-    
Assertions.assertThat(gr.getGranteeId()).isEqualTo(cacheEntry_R1.getEntity().getId());
-    Assertions.assertThat(gr.getGranteeCatalogId())
-        .isEqualTo(cacheEntry_R1.getEntity().getCatalogId());
-    
Assertions.assertThat(gr.getSecurableId()).isEqualTo(cacheEntry_N1.getEntity().getId());
-    Assertions.assertThat(gr.getSecurableCatalogId())
-        .isEqualTo(cacheEntry_N1.getEntity().getCatalogId());
-    Assertions.assertThat(gr.getPrivilegeCode())
-        .isEqualTo(PolarisPrivilege.TABLE_READ_DATA.getCode());
+    assertThat(gr.getGranteeId()).isEqualTo(cacheEntry_R1.getEntity().getId());
+    
assertThat(gr.getGranteeCatalogId()).isEqualTo(cacheEntry_R1.getEntity().getCatalogId());
+    
assertThat(gr.getSecurableId()).isEqualTo(cacheEntry_N1.getEntity().getId());
+    
assertThat(gr.getSecurableCatalogId()).isEqualTo(cacheEntry_N1.getEntity().getCatalogId());
+    
assertThat(gr.getPrivilegeCode()).isEqualTo(PolarisPrivilege.TABLE_READ_DATA.getCode());
 
     // lookup principal role PR1
     EntityCacheByNameKey PR1_name =
         new EntityCacheByNameKey(PolarisEntityType.PRINCIPAL_ROLE, "PR1");
     lookup = cache.getOrLoadEntityByName(callCtx, PR1_name);
-    Assertions.assertThat(lookup).isNotNull();
+    assertThat(lookup).isNotNull();
     ResolvedPolarisEntity cacheEntry_PR1 = lookup.getCacheEntry();
-    Assertions.assertThat(cacheEntry_PR1).isNotNull();
-    Assertions.assertThat(cacheEntry_PR1.getEntity()).isNotNull();
-    
Assertions.assertThat(cacheEntry_PR1.getGrantRecordsAsSecurable()).isNotNull();
-    
Assertions.assertThat(cacheEntry_PR1.getGrantRecordsAsGrantee()).isNotNull();
+    assertThat(cacheEntry_PR1).isNotNull();
+    assertThat(cacheEntry_PR1.getEntity()).isNotNull();
+    assertThat(cacheEntry_PR1.getGrantRecordsAsSecurable()).isNotNull();
+    assertThat(cacheEntry_PR1.getGrantRecordsAsGrantee()).isNotNull();
 
     // R1 should have 1 CATALOG_ROLE_USAGE privilege granted *on* it to PR1
-    
Assertions.assertThat(cacheEntry_R1.getGrantRecordsAsSecurable()).hasSize(1);
+    assertThat(cacheEntry_R1.getGrantRecordsAsSecurable()).hasSize(1);
     gr = cacheEntry_R1.getGrantRecordsAsSecurable().get(0);
-    
Assertions.assertThat(gr.getSecurableId()).isEqualTo(cacheEntry_R1.getEntity().getId());
-    Assertions.assertThat(gr.getSecurableCatalogId())
-        .isEqualTo(cacheEntry_R1.getEntity().getCatalogId());
-    
Assertions.assertThat(gr.getGranteeId()).isEqualTo(cacheEntry_PR1.getEntity().getId());
-    Assertions.assertThat(gr.getGranteeCatalogId())
-        .isEqualTo(cacheEntry_PR1.getEntity().getCatalogId());
-    Assertions.assertThat(gr.getPrivilegeCode())
-        .isEqualTo(PolarisPrivilege.CATALOG_ROLE_USAGE.getCode());
+    
assertThat(gr.getSecurableId()).isEqualTo(cacheEntry_R1.getEntity().getId());
+    
assertThat(gr.getSecurableCatalogId()).isEqualTo(cacheEntry_R1.getEntity().getCatalogId());
+    
assertThat(gr.getGranteeId()).isEqualTo(cacheEntry_PR1.getEntity().getId());
+    
assertThat(gr.getGranteeCatalogId()).isEqualTo(cacheEntry_PR1.getEntity().getCatalogId());
+    
assertThat(gr.getPrivilegeCode()).isEqualTo(PolarisPrivilege.CATALOG_ROLE_USAGE.getCode());
 
     // PR1 should have 1 grant on it to P1.
-    
Assertions.assertThat(cacheEntry_PR1.getGrantRecordsAsSecurable()).hasSize(1);
-    
Assertions.assertThat(cacheEntry_PR1.getGrantRecordsAsSecurable().get(0).getPrivilegeCode())
+    assertThat(cacheEntry_PR1.getGrantRecordsAsSecurable()).hasSize(1);
+    
assertThat(cacheEntry_PR1.getGrantRecordsAsSecurable().get(0).getPrivilegeCode())
         .isEqualTo(PolarisPrivilege.PRINCIPAL_ROLE_USAGE.getCode());
 
     // PR1 should have 2 grants to it, on R1 and R2
-    
Assertions.assertThat(cacheEntry_PR1.getGrantRecordsAsGrantee()).hasSize(2);
-    
Assertions.assertThat(cacheEntry_PR1.getGrantRecordsAsGrantee().get(0).getPrivilegeCode())
+    assertThat(cacheEntry_PR1.getGrantRecordsAsGrantee()).hasSize(2);
+    
assertThat(cacheEntry_PR1.getGrantRecordsAsGrantee().get(0).getPrivilegeCode())
         .isEqualTo(PolarisPrivilege.CATALOG_ROLE_USAGE.getCode());
-    
Assertions.assertThat(cacheEntry_PR1.getGrantRecordsAsGrantee().get(1).getPrivilegeCode())
+    
assertThat(cacheEntry_PR1.getGrantRecordsAsGrantee().get(1).getPrivilegeCode())
         .isEqualTo(PolarisPrivilege.CATALOG_ROLE_USAGE.getCode());
   }
 
@@ -274,14 +280,14 @@ public class InMemoryEntityCacheTest {
     EntityCacheLookupResult lookup =
         cache.getOrLoadEntityByName(
             this.callCtx, new EntityCacheByNameKey(PolarisEntityType.CATALOG, 
"test"));
-    Assertions.assertThat(lookup).isNotNull();
-    Assertions.assertThat(lookup.isCacheHit()).isFalse();
+    assertThat(lookup).isNotNull();
+    assertThat(lookup.isCacheHit()).isFalse();
 
     // the catalog
-    Assertions.assertThat(lookup.getCacheEntry()).isNotNull();
+    assertThat(lookup.getCacheEntry()).isNotNull();
     PolarisBaseEntity catalog = lookup.getCacheEntry().getEntity();
-    Assertions.assertThat(catalog).isNotNull();
-    
Assertions.assertThat(catalog.getType()).isEqualTo(PolarisEntityType.CATALOG);
+    assertThat(catalog).isNotNull();
+    assertThat(catalog.getType()).isEqualTo(PolarisEntityType.CATALOG);
 
     // find table N5/N6/T6
     PolarisBaseEntity N5 =
@@ -294,23 +300,23 @@ public class InMemoryEntityCacheTest {
             PolarisEntityType.TABLE_LIKE,
             PolarisEntitySubType.ICEBERG_TABLE,
             "T6");
-    Assertions.assertThat(T6v1).isNotNull();
+    assertThat(T6v1).isNotNull();
 
     // that table is not in the cache
     ResolvedPolarisEntity cacheEntry = cache.getEntityById(T6v1.getId());
-    Assertions.assertThat(cacheEntry).isNull();
+    assertThat(cacheEntry).isNull();
 
     // now load that table in the cache
     cacheEntry =
         cache.getAndRefreshIfNeeded(
             this.callCtx, T6v1, T6v1.getEntityVersion(), 
T6v1.getGrantRecordsVersion());
-    Assertions.assertThat(cacheEntry).isNotNull();
-    Assertions.assertThat(cacheEntry.getEntity()).isNotNull();
-    Assertions.assertThat(cacheEntry.getGrantRecordsAsSecurable()).isNotNull();
+    assertThat(cacheEntry).isNotNull();
+    assertThat(cacheEntry.getEntity()).isNotNull();
+    assertThat(cacheEntry.getGrantRecordsAsSecurable()).isNotNull();
     PolarisBaseEntity table = cacheEntry.getEntity();
-    Assertions.assertThat(table.getId()).isEqualTo(T6v1.getId());
-    
Assertions.assertThat(table.getEntityVersion()).isEqualTo(T6v1.getEntityVersion());
-    
Assertions.assertThat(table.getGrantRecordsVersion()).isEqualTo(T6v1.getGrantRecordsVersion());
+    assertThat(table.getId()).isEqualTo(T6v1.getId());
+    assertThat(table.getEntityVersion()).isEqualTo(T6v1.getEntityVersion());
+    
assertThat(table.getGrantRecordsVersion()).isEqualTo(T6v1.getGrantRecordsVersion());
 
     // update the entity
     PolarisBaseEntity T6v2 =
@@ -319,31 +325,31 @@ public class InMemoryEntityCacheTest {
             T6v1,
             "{\"v2_properties\": \"some value\"}",
             "{\"v2_internal_properties\": \"internal value\"}");
-    Assertions.assertThat(T6v2).isNotNull();
+    assertThat(T6v2).isNotNull();
 
     // now refresh that entity. But because we don't change the versions, 
nothing should be reloaded
     cacheEntry =
         cache.getAndRefreshIfNeeded(
             this.callCtx, T6v1, T6v1.getEntityVersion(), 
T6v1.getGrantRecordsVersion());
-    Assertions.assertThat(cacheEntry).isNotNull();
-    Assertions.assertThat(cacheEntry.getEntity()).isNotNull();
-    Assertions.assertThat(cacheEntry.getGrantRecordsAsSecurable()).isNotNull();
+    assertThat(cacheEntry).isNotNull();
+    assertThat(cacheEntry.getEntity()).isNotNull();
+    assertThat(cacheEntry.getGrantRecordsAsSecurable()).isNotNull();
     table = cacheEntry.getEntity();
-    Assertions.assertThat(table.getId()).isEqualTo(T6v1.getId());
-    
Assertions.assertThat(table.getEntityVersion()).isEqualTo(T6v1.getEntityVersion());
-    
Assertions.assertThat(table.getGrantRecordsVersion()).isEqualTo(T6v1.getGrantRecordsVersion());
+    assertThat(table.getId()).isEqualTo(T6v1.getId());
+    assertThat(table.getEntityVersion()).isEqualTo(T6v1.getEntityVersion());
+    
assertThat(table.getGrantRecordsVersion()).isEqualTo(T6v1.getGrantRecordsVersion());
 
     // now refresh again, this time with the new versions. Should be reloaded
     cacheEntry =
         cache.getAndRefreshIfNeeded(
             this.callCtx, T6v2, T6v2.getEntityVersion(), 
T6v2.getGrantRecordsVersion());
-    Assertions.assertThat(cacheEntry).isNotNull();
-    Assertions.assertThat(cacheEntry.getEntity()).isNotNull();
-    Assertions.assertThat(cacheEntry.getGrantRecordsAsSecurable()).isNotNull();
+    assertThat(cacheEntry).isNotNull();
+    assertThat(cacheEntry.getEntity()).isNotNull();
+    assertThat(cacheEntry.getGrantRecordsAsSecurable()).isNotNull();
     table = cacheEntry.getEntity();
-    Assertions.assertThat(table.getId()).isEqualTo(T6v2.getId());
-    
Assertions.assertThat(table.getEntityVersion()).isEqualTo(T6v2.getEntityVersion());
-    
Assertions.assertThat(table.getGrantRecordsVersion()).isEqualTo(T6v2.getGrantRecordsVersion());
+    assertThat(table.getId()).isEqualTo(T6v2.getId());
+    assertThat(table.getEntityVersion()).isEqualTo(T6v2.getEntityVersion());
+    
assertThat(table.getGrantRecordsVersion()).isEqualTo(T6v2.getGrantRecordsVersion());
 
     // update it again
     PolarisBaseEntity T6v3 =
@@ -352,7 +358,7 @@ public class InMemoryEntityCacheTest {
             T6v2,
             "{\"v3_properties\": \"some value\"}",
             "{\"v3_internal_properties\": \"internal value\"}");
-    Assertions.assertThat(T6v3).isNotNull();
+    assertThat(T6v3).isNotNull();
 
     // the two catalog roles
     PolarisBaseEntity R1 =
@@ -368,9 +374,9 @@ public class InMemoryEntityCacheTest {
             this.callCtx, N2, N2.getEntityVersion(), 
N2.getGrantRecordsVersion());
 
     // should have one single grant
-    Assertions.assertThat(cacheEntry).isNotNull();
-    Assertions.assertThat(cacheEntry.getGrantRecordsAsSecurable()).isNotNull();
-    Assertions.assertThat(cacheEntry.getGrantRecordsAsSecurable()).hasSize(1);
+    assertThat(cacheEntry).isNotNull();
+    assertThat(cacheEntry.getGrantRecordsAsSecurable()).isNotNull();
+    assertThat(cacheEntry.getGrantRecordsAsSecurable()).hasSize(1);
 
     // perform an additional grant to R1
     this.tm.grantPrivilege(R1, List.of(catalog, N1), N2, 
PolarisPrivilege.NAMESPACE_FULL_METADATA);
@@ -380,8 +386,8 @@ public class InMemoryEntityCacheTest {
         this.tm.ensureExistsByName(List.of(catalog, N1), 
PolarisEntityType.NAMESPACE, "N2");
 
     // same entity version but different grant records
-    Assertions.assertThat(N2v2).isNotNull();
-    
Assertions.assertThat(N2v2.getGrantRecordsVersion()).isEqualTo(N2.getGrantRecordsVersion()
 + 1);
+    assertThat(N2v2).isNotNull();
+    
assertThat(N2v2.getGrantRecordsVersion()).isEqualTo(N2.getGrantRecordsVersion() 
+ 1);
 
     // the cache is outdated now
     lookup =
@@ -389,24 +395,24 @@ public class InMemoryEntityCacheTest {
             this.callCtx,
             new EntityCacheByNameKey(
                 catalog.getId(), N1.getId(), PolarisEntityType.NAMESPACE, 
"N2"));
-    Assertions.assertThat(lookup).isNotNull();
+    assertThat(lookup).isNotNull();
     cacheEntry = lookup.getCacheEntry();
-    Assertions.assertThat(cacheEntry).isNotNull();
-    Assertions.assertThat(cacheEntry.getEntity()).isNotNull();
-    Assertions.assertThat(cacheEntry.getGrantRecordsAsSecurable()).isNotNull();
-    Assertions.assertThat(cacheEntry.getGrantRecordsAsSecurable()).hasSize(1);
-    Assertions.assertThat(cacheEntry.getEntity().getGrantRecordsVersion())
+    assertThat(cacheEntry).isNotNull();
+    assertThat(cacheEntry.getEntity()).isNotNull();
+    assertThat(cacheEntry.getGrantRecordsAsSecurable()).isNotNull();
+    assertThat(cacheEntry.getGrantRecordsAsSecurable()).hasSize(1);
+    assertThat(cacheEntry.getEntity().getGrantRecordsVersion())
         .isEqualTo(N2.getGrantRecordsVersion());
 
     // now refresh
     cacheEntry =
         cache.getAndRefreshIfNeeded(
             this.callCtx, N2, N2v2.getEntityVersion(), 
N2v2.getGrantRecordsVersion());
-    Assertions.assertThat(cacheEntry).isNotNull();
-    Assertions.assertThat(cacheEntry.getEntity()).isNotNull();
-    Assertions.assertThat(cacheEntry.getGrantRecordsAsSecurable()).isNotNull();
-    Assertions.assertThat(cacheEntry.getGrantRecordsAsSecurable()).hasSize(2);
-    Assertions.assertThat(cacheEntry.getEntity().getGrantRecordsVersion())
+    assertThat(cacheEntry).isNotNull();
+    assertThat(cacheEntry.getEntity()).isNotNull();
+    assertThat(cacheEntry.getGrantRecordsAsSecurable()).isNotNull();
+    assertThat(cacheEntry.getGrantRecordsAsSecurable()).hasSize(2);
+    assertThat(cacheEntry.getEntity().getGrantRecordsVersion())
         .isEqualTo(N2v2.getGrantRecordsVersion());
   }
 
@@ -418,23 +424,23 @@ public class InMemoryEntityCacheTest {
     EntityCacheLookupResult lookup =
         cache.getOrLoadEntityByName(
             this.callCtx, new EntityCacheByNameKey(PolarisEntityType.CATALOG, 
"test"));
-    Assertions.assertThat(lookup).isNotNull();
-    Assertions.assertThat(lookup.getCacheEntry()).isNotNull();
+    assertThat(lookup).isNotNull();
+    assertThat(lookup.getCacheEntry()).isNotNull();
     PolarisBaseEntity catalog = lookup.getCacheEntry().getEntity();
 
     PolarisBaseEntity N1 =
         this.tm.ensureExistsByName(List.of(catalog), 
PolarisEntityType.NAMESPACE, "N1");
     lookup = cache.getOrLoadEntityById(this.callCtx, N1.getCatalogId(), 
N1.getId(), N1.getType());
-    Assertions.assertThat(lookup).isNotNull();
+    assertThat(lookup).isNotNull();
 
     EntityCacheByNameKey T4_name =
         new EntityCacheByNameKey(N1.getCatalogId(), N1.getId(), 
PolarisEntityType.TABLE_LIKE, "T4");
     lookup = cache.getOrLoadEntityByName(callCtx, T4_name);
-    Assertions.assertThat(lookup).isNotNull();
+    assertThat(lookup).isNotNull();
     ResolvedPolarisEntity cacheEntry_T4 = lookup.getCacheEntry();
-    Assertions.assertThat(cacheEntry_T4).isNotNull();
-    Assertions.assertThat(cacheEntry_T4.getEntity()).isNotNull();
-    
Assertions.assertThat(cacheEntry_T4.getGrantRecordsAsSecurable()).isNotNull();
+    assertThat(cacheEntry_T4).isNotNull();
+    assertThat(cacheEntry_T4.getEntity()).isNotNull();
+    assertThat(cacheEntry_T4.getGrantRecordsAsSecurable()).isNotNull();
 
     PolarisBaseEntity T4 = cacheEntry_T4.getEntity();
 
@@ -445,18 +451,17 @@ public class InMemoryEntityCacheTest {
         new EntityCacheByNameKey(
             N1.getCatalogId(), N1.getId(), PolarisEntityType.TABLE_LIKE, 
"T4_renamed");
     lookup = cache.getOrLoadEntityByName(callCtx, T4_renamed);
-    Assertions.assertThat(lookup).isNotNull();
+    assertThat(lookup).isNotNull();
     ResolvedPolarisEntity cacheEntry_T4_renamed = lookup.getCacheEntry();
-    Assertions.assertThat(cacheEntry_T4_renamed).isNotNull();
+    assertThat(cacheEntry_T4_renamed).isNotNull();
     PolarisBaseEntity T4_renamed_entity = cacheEntry_T4_renamed.getEntity();
 
     // new entry if lookup by id
     EntityCacheLookupResult lookupResult =
         cache.getOrLoadEntityById(callCtx, T4.getCatalogId(), T4.getId(), 
T4.getType());
-    Assertions.assertThat(lookupResult).isNotNull();
-    Assertions.assertThat(lookupResult.getCacheEntry()).isNotNull();
-    Assertions.assertThat(lookupResult.getCacheEntry().getEntity().getName())
-        .isEqualTo("T4_renamed");
+    assertThat(lookupResult).isNotNull();
+    assertThat(lookupResult.getCacheEntry()).isNotNull();
+    
assertThat(lookupResult.getCacheEntry().getEntity().getName()).isEqualTo("T4_renamed");
 
     // old name is gone, replaced by new name
     // Assertions.assertNull(cache.getOrLoadEntityByName(callCtx, T4_name));
@@ -469,7 +474,7 @@ public class InMemoryEntityCacheTest {
         T4_renamed_entity.getGrantRecordsVersion());
 
     // now the loading by the old name should return null
-    Assertions.assertThat(cache.getOrLoadEntityByName(callCtx, 
T4_name)).isNull();
+    assertThat(cache.getOrLoadEntityByName(callCtx, T4_name)).isNull();
   }
 
   /* Helper for `testEntityWeigher` */
@@ -495,7 +500,584 @@ public class InMemoryEntityCacheTest {
             .setMetadataLocation("a".repeat(1000 * 1000))
             .build();
 
-    
Assertions.assertThat(getEntityWeight(smallEntity)).isLessThan(getEntityWeight(mediumEntity));
-    
Assertions.assertThat(getEntityWeight(mediumEntity)).isLessThan(getEntityWeight(largeEntity));
+    
assertThat(getEntityWeight(smallEntity)).isLessThan(getEntityWeight(mediumEntity));
+    
assertThat(getEntityWeight(mediumEntity)).isLessThan(getEntityWeight(largeEntity));
+  }
+
+  @Test
+  public void testBatchLoadByEntityIds() {
+    // get a new cache
+    InMemoryEntityCache cache = this.allocateNewCache();
+
+    // Load catalog into cache
+    EntityCacheLookupResult lookup =
+        cache.getOrLoadEntityByName(
+            this.callCtx, new EntityCacheByNameKey(PolarisEntityType.CATALOG, 
"test"));
+    assertThat(lookup).isNotNull();
+    PolarisBaseEntity catalog = lookup.getCacheEntry().getEntity();
+
+    // Get some entities that exist in the test setup
+    PolarisBaseEntity N1 =
+        this.tm.ensureExistsByName(List.of(catalog), 
PolarisEntityType.NAMESPACE, "N1");
+    PolarisBaseEntity N5 =
+        this.tm.ensureExistsByName(List.of(catalog), 
PolarisEntityType.NAMESPACE, "N5");
+    PolarisBaseEntity N5_N6 =
+        this.tm.ensureExistsByName(List.of(catalog, N5), 
PolarisEntityType.NAMESPACE, "N6");
+
+    // Pre-load N1 into cache
+    cache.getOrLoadEntityById(this.callCtx, N1.getCatalogId(), N1.getId(), 
N1.getType());
+
+    // Create list of entity IDs - N1 is already cached, N5, N5_N6 are not
+    List<PolarisEntityId> entityIds =
+        List.of(getPolarisEntityId(N1), getPolarisEntityId(N5), 
getPolarisEntityId(N5_N6));
+
+    // Test batch loading by entity IDs (all are namespaces)
+    List<EntityCacheLookupResult> results =
+        cache.getOrLoadResolvedEntities(this.callCtx, 
PolarisEntityType.NAMESPACE, entityIds);
+
+    // Verify all entities were found
+    assertThat(results).hasSize(3);
+    assertThat(results.get(0)).isNotNull(); // N1 - was cached
+    assertThat(results.get(1)).isNotNull(); // N5 - was loaded
+    assertThat(results.get(2)).isNotNull(); // N5_N6 - was loaded
+
+    // Verify the entities are correct
+    
assertThat(results.get(0).getCacheEntry().getEntity().getId()).isEqualTo(N1.getId());
+    
assertThat(results.get(1).getCacheEntry().getEntity().getId()).isEqualTo(N5.getId());
+    
assertThat(results.get(2).getCacheEntry().getEntity().getId()).isEqualTo(N5_N6.getId());
+
+    // All should be cache hits now since they were loaded in the previous call
+    assertThat(results.get(0).isCacheHit()).isTrue();
+    assertThat(results.get(1).isCacheHit()).isTrue();
+    assertThat(results.get(2).isCacheHit()).isTrue();
+
+    // Test with a non-existent entity ID
+    List<PolarisEntityId> nonExistentIds =
+        List.of(new PolarisEntityId(catalog.getCatalogId(), 99999L));
+    List<EntityCacheLookupResult> nonExistentResults =
+        cache.getOrLoadResolvedEntities(this.callCtx, 
PolarisEntityType.NAMESPACE, nonExistentIds);
+
+    assertThat(nonExistentResults).hasSize(1);
+    assertThat(nonExistentResults.get(0)).isNull();
+
+    // Test with table entities separately
+    PolarisBaseEntity T6 =
+        this.tm.ensureExistsByName(
+            List.of(catalog, N5, N5_N6),
+            PolarisEntityType.TABLE_LIKE,
+            PolarisEntitySubType.ICEBERG_TABLE,
+            "T6");
+
+    List<PolarisEntityId> tableIds = List.of(getPolarisEntityId(T6));
+
+    List<EntityCacheLookupResult> tableResults =
+        cache.getOrLoadResolvedEntities(this.callCtx, 
PolarisEntityType.TABLE_LIKE, tableIds);
+
+    assertThat(tableResults).hasSize(1);
+    assertThat(tableResults.get(0)).isNotNull();
+    
assertThat(tableResults.get(0).getCacheEntry().getEntity().getId()).isEqualTo(T6.getId());
+  }
+
+  @Test
+  public void testBatchLoadWithStaleVersions() {
+    // get a new cache
+    InMemoryEntityCache cache = this.allocateNewCache();
+
+    // Load catalog into cache
+    EntityCacheLookupResult lookup =
+        cache.getOrLoadEntityByName(
+            this.callCtx, new EntityCacheByNameKey(PolarisEntityType.CATALOG, 
"test"));
+    assertThat(lookup).isNotNull();
+    PolarisBaseEntity catalog = lookup.getCacheEntry().getEntity();
+
+    // Get table T6 that we can update
+    PolarisBaseEntity N5 =
+        this.tm.ensureExistsByName(List.of(catalog), 
PolarisEntityType.NAMESPACE, "N5");
+    PolarisBaseEntity N5_N6 =
+        this.tm.ensureExistsByName(List.of(catalog, N5), 
PolarisEntityType.NAMESPACE, "N6");
+    PolarisBaseEntity T6v1 =
+        this.tm.ensureExistsByName(
+            List.of(catalog, N5, N5_N6),
+            PolarisEntityType.TABLE_LIKE,
+            PolarisEntitySubType.ICEBERG_TABLE,
+            "T6");
+
+    // Load T6 into cache initially
+    cache.getOrLoadEntityById(this.callCtx, T6v1.getCatalogId(), T6v1.getId(), 
T6v1.getType());
+
+    // Verify it's in cache with original version
+    ResolvedPolarisEntity cachedT6 = cache.getEntityById(T6v1.getId());
+    assertThat(cachedT6).isNotNull();
+    
assertThat(cachedT6.getEntity().getEntityVersion()).isEqualTo(T6v1.getEntityVersion());
+
+    // Update the entity to create a new version
+    PolarisBaseEntity T6v2 =
+        this.tm.updateEntity(
+            List.of(catalog, N5, N5_N6),
+            T6v1,
+            "{\"v2_properties\": \"some value\"}",
+            "{\"v2_internal_properties\": \"internal value\"}");
+    assertThat(T6v2).isNotNull();
+    assertThat(T6v2.getEntityVersion()).isGreaterThan(T6v1.getEntityVersion());
+
+    List<EntityCacheLookupResult> results;
+    // Create entity ID list with the updated entity
+    List<PolarisEntityId> entityIds = List.of(getPolarisEntityId(T6v2));
+
+    // Call batch load - this should detect the stale version and reload
+    results =
+        cache.getOrLoadResolvedEntities(this.callCtx, 
PolarisEntityType.TABLE_LIKE, entityIds);
+
+    // Verify the entity was reloaded with the new version
+    assertThat(results).hasSize(1);
+    assertThat(results.get(0)).isNotNull();
+
+    ResolvedPolarisEntity reloadedT6 = results.get(0).getCacheEntry();
+    assertThat(reloadedT6.getEntity().getId()).isEqualTo(T6v2.getId());
+    
assertThat(reloadedT6.getEntity().getEntityVersion()).isEqualTo(T6v2.getEntityVersion());
+
+    // Verify the cache now contains the updated version
+    cachedT6 = cache.getEntityById(T6v2.getId());
+    assertThat(cachedT6).isNotNull();
+    
assertThat(cachedT6.getEntity().getEntityVersion()).isEqualTo(T6v2.getEntityVersion());
+  }
+
+  @Test
+  public void testBatchLoadWithStaleGrantVersions() {
+    // get a new cache
+    InMemoryEntityCache cache = this.allocateNewCache();
+
+    // Load catalog into cache
+    EntityCacheLookupResult lookup =
+        cache.getOrLoadEntityByName(
+            this.callCtx, new EntityCacheByNameKey(PolarisEntityType.CATALOG, 
"test"));
+    assertThat(lookup).isNotNull();
+    PolarisBaseEntity catalog = lookup.getCacheEntry().getEntity();
+
+    // Get entities we'll work with
+    PolarisBaseEntity N1 =
+        this.tm.ensureExistsByName(List.of(catalog), 
PolarisEntityType.NAMESPACE, "N1");
+    PolarisBaseEntity N2 =
+        this.tm.ensureExistsByName(List.of(catalog, N1), 
PolarisEntityType.NAMESPACE, "N2");
+    PolarisBaseEntity R1 =
+        this.tm.ensureExistsByName(List.of(catalog), 
PolarisEntityType.CATALOG_ROLE, "R1");
+
+    // Load N2 into cache initially
+    cache.getOrLoadEntityByName(
+        this.callCtx,
+        new EntityCacheByNameKey(catalog.getId(), N1.getId(), 
PolarisEntityType.NAMESPACE, "N2"));
+
+    // Verify it's in cache with original grant version
+    ResolvedPolarisEntity cachedN2 = cache.getEntityById(N2.getId());
+    assertThat(cachedN2).isNotNull();
+    int originalGrantVersion = cachedN2.getEntity().getGrantRecordsVersion();
+
+    // Grant additional privilege to change grant version
+    this.tm.grantPrivilege(R1, List.of(catalog, N1), N2, 
PolarisPrivilege.NAMESPACE_FULL_METADATA);
+
+    // Get the updated entity with new grant version
+    PolarisBaseEntity N2Updated =
+        this.tm.ensureExistsByName(List.of(catalog, N1), 
PolarisEntityType.NAMESPACE, "N2");
+    
assertThat(N2Updated.getGrantRecordsVersion()).isGreaterThan(originalGrantVersion);
+
+    // Create entity ID list
+    List<PolarisEntityId> entityIds = List.of(getPolarisEntityId(N2Updated));
+
+    // Call batch load - this should detect the stale grant version and reload
+    List<EntityCacheLookupResult> results =
+        cache.getOrLoadResolvedEntities(this.callCtx, 
PolarisEntityType.NAMESPACE, entityIds);
+
+    // Verify the entity was reloaded with the new grant version
+    assertThat(results).hasSize(1);
+    assertThat(results.get(0)).isNotNull();
+
+    ResolvedPolarisEntity reloadedN2 = results.get(0).getCacheEntry();
+    assertThat(reloadedN2.getEntity().getId()).isEqualTo(N2Updated.getId());
+    assertThat(reloadedN2.getEntity().getGrantRecordsVersion())
+        .isEqualTo(N2Updated.getGrantRecordsVersion());
+
+    // Should now have more grant records
+    
assertThat(reloadedN2.getGrantRecordsAsSecurable().size()).isGreaterThan(1);
+
+    // Verify the cache now contains the updated grant version
+    cachedN2 = cache.getEntityById(N2Updated.getId());
+    assertThat(cachedN2).isNotNull();
+    assertThat(cachedN2.getEntity().getGrantRecordsVersion())
+        .isEqualTo(N2Updated.getGrantRecordsVersion());
+  }
+
+  @Test
+  public void testBatchLoadVersionRetryLogic() {
+    // get a new cache
+    PolarisMetaStoreManager metaStoreManager = 
Mockito.spy(this.metaStoreManager);
+    InMemoryEntityCache cache =
+        new InMemoryEntityCache(diagServices, callCtx.getRealmConfig(), 
metaStoreManager);
+
+    // Load catalog into cache
+    EntityCacheLookupResult lookup =
+        cache.getOrLoadEntityByName(
+            this.callCtx, new EntityCacheByNameKey(PolarisEntityType.CATALOG, 
"test"));
+    assertThat(lookup).isNotNull();
+    PolarisBaseEntity catalog = lookup.getCacheEntry().getEntity();
+
+    // Get entities that we can work with
+    PolarisBaseEntity N1 =
+        this.tm.ensureExistsByName(List.of(catalog), 
PolarisEntityType.NAMESPACE, "N1");
+    PolarisBaseEntity N2 =
+        this.tm.ensureExistsByName(List.of(catalog, N1), 
PolarisEntityType.NAMESPACE, "N2");
+
+    // Load N2 into cache initially
+    cache.getOrLoadEntityByName(
+        this.callCtx,
+        new EntityCacheByNameKey(catalog.getId(), N1.getId(), 
PolarisEntityType.NAMESPACE, "N2"));
+
+    // Verify it's in cache with original version
+    ResolvedPolarisEntity cachedN2 = cache.getEntityById(N2.getId());
+    assertThat(cachedN2).isNotNull();
+    int originalEntityVersion = cachedN2.getEntity().getEntityVersion();
+
+    // Update the entity multiple times to create version skew
+    PolarisBaseEntity N2v2 =
+        this.tm.updateEntity(List.of(catalog, N1), N2, "{\"v2\": \"value\"}", 
null);
+
+    // the first call should return the first version, then we call the real 
method to get the
+    // latest
+    Mockito.doReturn(
+            new ChangeTrackingResult(
+                List.of(
+                    changeTrackingFor(catalog), changeTrackingFor(N1), 
changeTrackingFor(N2v2))))
+        .when(metaStoreManager)
+        .loadEntitiesChangeTracking(Mockito.any(), Mockito.any());
+    Mockito.doCallRealMethod()
+        .when(metaStoreManager)
+        .loadEntitiesChangeTracking(Mockito.any(), Mockito.any());
+
+    // update again to create v3, which isn't returned in the change tracking 
result
+    PolarisBaseEntity N2v3 =
+        this.tm.updateEntity(List.of(catalog, N1), N2v2, "{\"v3\": 
\"value\"}", null);
+
+    // Verify versions increased
+    assertThat(N2v2.getEntityVersion()).isGreaterThan(originalEntityVersion);
+    assertThat(N2v3.getEntityVersion()).isGreaterThan(N2v2.getEntityVersion());
+
+    // Create entity ID list
+    List<PolarisEntityId> entityIds =
+        List.of(getPolarisEntityId(catalog), getPolarisEntityId(N1), 
getPolarisEntityId(N2));
+
+    // Call batch load - this should detect the stale versions and reload 
until consistent
+    List<EntityCacheLookupResult> results =
+        cache.getOrLoadResolvedEntities(this.callCtx, 
PolarisEntityType.NAMESPACE, entityIds);
+
+    // Verify the entity was reloaded with the latest version
+    assertThat(results).hasSize(3);
+    assertThat(results)
+        .doesNotContainNull()
+        .extracting(EntityCacheLookupResult::getCacheEntry)
+        .doesNotContainNull()
+        .extracting(e -> e.getEntity().getId())
+        .containsExactly(catalog.getId(), N1.getId(), N2.getId());
+
+    ResolvedPolarisEntity reloadedN2 = results.get(2).getCacheEntry();
+    assertThat(reloadedN2.getEntity().getId()).isEqualTo(N2v3.getId());
+    
assertThat(reloadedN2.getEntity().getEntityVersion()).isEqualTo(N2v3.getEntityVersion());
+
+    // Verify the cache now contains the latest version
+    cachedN2 = cache.getEntityById(N2v3.getId());
+    assertThat(cachedN2).isNotNull();
+    
assertThat(cachedN2.getEntity().getEntityVersion()).isEqualTo(N2v3.getEntityVersion());
+  }
+
+  @Test
+  public void testBatchLoadVersionRetryFailsAfterMaxAttempts() {
+    // get a new cache
+    PolarisMetaStoreManager metaStoreManager = 
Mockito.spy(this.metaStoreManager);
+    InMemoryEntityCache cache =
+        new InMemoryEntityCache(diagServices, callCtx.getRealmConfig(), 
metaStoreManager);
+
+    // Load catalog into cache
+    EntityCacheLookupResult lookup =
+        cache.getOrLoadEntityByName(
+            this.callCtx, new EntityCacheByNameKey(PolarisEntityType.CATALOG, 
"test"));
+    assertThat(lookup).isNotNull();
+    PolarisBaseEntity catalog = lookup.getCacheEntry().getEntity();
+
+    // Get entities that we can work with
+    PolarisBaseEntity N1 =
+        this.tm.ensureExistsByName(List.of(catalog), 
PolarisEntityType.NAMESPACE, "N1");
+    PolarisBaseEntity N2 =
+        this.tm.ensureExistsByName(List.of(catalog, N1), 
PolarisEntityType.NAMESPACE, "N2");
+
+    // Load N2 into cache initially
+    cache.getOrLoadEntityByName(
+        this.callCtx,
+        new EntityCacheByNameKey(catalog.getId(), N1.getId(), 
PolarisEntityType.NAMESPACE, "N2"));
+
+    // Verify it's in cache with original version
+    ResolvedPolarisEntity cachedN2 = cache.getEntityById(N2.getId());
+    assertThat(cachedN2).isNotNull();
+    int originalEntityVersion = cachedN2.getEntity().getEntityVersion();
+
+    // Update the entity multiple times to create version skew
+    PolarisBaseEntity N2v2 =
+        this.tm.updateEntity(List.of(catalog, N1), N2, "{\"v2\": \"value\"}", 
null);
+
+    // return v2 for the change tracking
+    Mockito.doReturn(
+            new ChangeTrackingResult(
+                List.of(
+                    changeTrackingFor(catalog), changeTrackingFor(N1), 
changeTrackingFor(N2v2))))
+        .when(metaStoreManager)
+        .loadEntitiesChangeTracking(Mockito.any(), Mockito.any());
+
+    // update again to create v3, which isn't returned in the change tracking 
result
+    PolarisBaseEntity N2v3 =
+        this.tm.updateEntity(List.of(catalog, N1), N2v2, "{\"v3\": 
\"value\"}", null);
+
+    // Verify versions increased
+    assertThat(N2v2.getEntityVersion()).isGreaterThan(originalEntityVersion);
+    assertThat(N2v3.getEntityVersion()).isGreaterThan(N2v2.getEntityVersion());
+
+    // Create entity ID list
+    List<PolarisEntityId> entityIds =
+        List.of(getPolarisEntityId(catalog), getPolarisEntityId(N1), 
getPolarisEntityId(N2));
+    Assertions.assertThatThrownBy(
+            () ->
+                cache.getOrLoadResolvedEntities(
+                    this.callCtx, PolarisEntityType.NAMESPACE, entityIds))
+        .isInstanceOf(RuntimeException.class);
+  }
+
+  private static PolarisEntityId getPolarisEntityId(PolarisBaseEntity catalog) 
{
+    return new PolarisEntityId(catalog.getCatalogId(), catalog.getId());
+  }
+
+  @Test
+  public void testConcurrentClientLoadingBehavior() throws Exception {
+    // Load catalog into cache
+    EntityCacheLookupResult lookup =
+        allocateNewCache()
+            .getOrLoadEntityByName(
+                this.callCtx, new 
EntityCacheByNameKey(PolarisEntityType.CATALOG, "test"));
+    assertThat(lookup).isNotNull();
+    PolarisBaseEntity catalog = lookup.getCacheEntry().getEntity();
+
+    // Get multiple entities to create a larger list for concurrent processing
+    PolarisBaseEntity N1 =
+        this.tm.ensureExistsByName(List.of(catalog), 
PolarisEntityType.NAMESPACE, "N1");
+    PolarisBaseEntity N2 =
+        this.tm.ensureExistsByName(List.of(catalog, N1), 
PolarisEntityType.NAMESPACE, "N2");
+    PolarisBaseEntity N3 =
+        this.tm.ensureExistsByName(List.of(catalog, N1), 
PolarisEntityType.NAMESPACE, "N3");
+    PolarisBaseEntity N5 =
+        this.tm.ensureExistsByName(List.of(catalog), 
PolarisEntityType.NAMESPACE, "N5");
+    PolarisBaseEntity N5_N6 =
+        this.tm.ensureExistsByName(List.of(catalog, N5), 
PolarisEntityType.NAMESPACE, "N6");
+
+    // Create entity IDs list for both clients
+    List<PolarisEntityId> entityIds =
+        List.of(
+            getPolarisEntityId(N1),
+            getPolarisEntityId(N2),
+            getPolarisEntityId(N3),
+            getPolarisEntityId(N5),
+            getPolarisEntityId(N5_N6));
+
+    // Update one of the entities to create version differences
+    PolarisBaseEntity N2v2 =
+        this.tm.updateEntity(
+            List.of(catalog, N1), N2, "{\"concurrent_test\": 
\"client1_version\"}", null);
+    PolarisBaseEntity N2v3 =
+        this.tm.updateEntity(
+            List.of(catalog, N1), N2v2, "{\"concurrent_test\": 
\"client2_version\"}", null);
+
+    // Mock the metastore manager to control the timing of method calls
+    PolarisMetaStoreManager mockedMetaStoreManager = 
Mockito.spy(this.metaStoreManager);
+
+    // Create caches with the mocked metastore manager
+    InMemoryEntityCache cache =
+        new InMemoryEntityCache(diagServices, callCtx.getRealmConfig(), 
mockedMetaStoreManager);
+
+    // Synchronization primitives for controlling execution order
+    CountDownLatch client1ChangeTrackingResult = new CountDownLatch(1);
+    Semaphore client1ResolvedEntitiesBlocker = new Semaphore(0);
+
+    // Atomic references to capture results from both threads
+    AtomicReference<Exception> client1Exception = new AtomicReference<>();
+    AtomicReference<Exception> client2Exception = new AtomicReference<>();
+
+    // Configure mock behavior:
+    // 1. Allow both threads to call loadEntitiesChangeTracking() with 
different versions
+    // 2. Block client1's loadResolvedEntities() until client2's 
loadResolvedEntities() completes
+
+    // Mock loadEntitiesChangeTracking to return different versions for each 
client
+    Mockito.doAnswer(
+            invocation -> {
+              // First call (client1) - returns older version for N2
+              LOGGER.debug("Returning change tracking for client1");
+              return new ChangeTrackingResult(
+                  List.of(
+                      changeTrackingFor(N1),
+                      changeTrackingFor(N2v2), // older version
+                      changeTrackingFor(N3),
+                      changeTrackingFor(N5),
+                      changeTrackingFor(N5_N6)));
+            })
+        .doAnswer(
+            invocation -> {
+              // Second call (client2) - returns newer version for N2
+              LOGGER.debug("Returning change tracking for client2");
+              return new ChangeTrackingResult(
+                  List.of(
+                      changeTrackingFor(N1),
+                      changeTrackingFor(N2v3), // newer version
+                      changeTrackingFor(N3),
+                      changeTrackingFor(N5),
+                      changeTrackingFor(N5_N6)));
+            })
+        .when(mockedMetaStoreManager)
+        .loadEntitiesChangeTracking(Mockito.any(), Mockito.any());
+
+    // Mock loadResolvedEntities to control timing - client1 blocks until 
client2 completes
+    // client1 receives the older version of all entities, while client2 
receives the newer version
+    // of N2
+    Answer client1Answer =
+        invocation -> {
+          // This is client1's loadResolvedEntities call - block until client2 
completes
+          try {
+            client1ChangeTrackingResult.countDown();
+            LOGGER.debug("Awaiting client2 to complete resolved entities 
load");
+            client1ResolvedEntitiesBlocker.acquire(); // Block until client2 
signals completion
+            List<ResolvedPolarisEntity> resolvedEntities =
+                List.of(
+                    getResolvedPolarisEntity(N1),
+                    getResolvedPolarisEntity(N2v2),
+                    getResolvedPolarisEntity(N3),
+                    getResolvedPolarisEntity(N5),
+                    getResolvedPolarisEntity(N5_N6));
+            LOGGER.debug("Client1 returning results {}", resolvedEntities);
+            return new ResolvedEntitiesResult(resolvedEntities);
+          } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new RuntimeException(e);
+          }
+        };
+    Answer client2Answer =
+        invocation -> {
+          // This is client2's loadResolvedEntities call - execute normally 
and signal client1
+          try {
+            LOGGER.debug("Client2 loading resolved entities");
+            var result =
+                new ResolvedEntitiesResult(
+                    List.of(
+                        getResolvedPolarisEntity(N1),
+                        getResolvedPolarisEntity(N2v3),
+                        getResolvedPolarisEntity(N3),
+                        getResolvedPolarisEntity(N5),
+                        getResolvedPolarisEntity(N5_N6)));
+            client1ResolvedEntitiesBlocker.release(); // Allow client1 to 
proceed
+            LOGGER.debug("Client2 returning results {}", 
result.getResolvedEntities());
+            return result;
+          } catch (Exception e) {
+            client1ResolvedEntitiesBlocker.release(); // Release in case of 
error
+            throw e;
+          }
+        };
+    Mockito.doAnswer(client1Answer)
+        .doAnswer(client2Answer)
+        .when(mockedMetaStoreManager)
+        .loadResolvedEntities(Mockito.any(), Mockito.any(), Mockito.any());
+
+    // ExecutorService isn't AutoCloseable in JDK 11 :(
+    ExecutorService executorService = Executors.newFixedThreadPool(2);
+    try {
+      // Client 1 task - should get older version and be blocked during 
loadResolvedEntities
+      Future<List<EntityCacheLookupResult>> client1Task =
+          executorService.submit(
+              () -> {
+                try {
+                  // Client1 calls getOrLoadResolvedEntities
+                  // - loadEntitiesChangeTracking returns older version for N2
+                  // - loadResolvedEntities will block until client2 completes
+                  return cache.getOrLoadResolvedEntities(
+                      this.callCtx, PolarisEntityType.NAMESPACE, entityIds);
+                } catch (Exception e) {
+                  client1Exception.set(e);
+                  return null;
+                }
+              });
+
+      // Client 2 task - should get newer version and complete first
+      Future<List<EntityCacheLookupResult>> client2Task =
+          executorService.submit(
+              () -> {
+                try {
+                  // Client2 calls getOrLoadResolvedEntities
+                  // - loadEntitiesChangeTracking returns newer version for N2
+                  // - loadResolvedEntities executes normally and signals 
client1 when done
+                  client1ChangeTrackingResult.await();
+                  return cache.getOrLoadResolvedEntities(
+                      this.callCtx, PolarisEntityType.NAMESPACE, entityIds);
+                } catch (Exception e) {
+                  client2Exception.set(e);
+                  client1ResolvedEntitiesBlocker.release(); // Release in case 
of error
+                  return null;
+                }
+              });
+
+      // Wait for both tasks to complete
+      List<EntityCacheLookupResult> client1Results = client1Task.get();
+      List<EntityCacheLookupResult> client2Results = client2Task.get();
+
+      // Verify no exceptions occurred
+      assertThat(client1Exception.get()).isNull();
+      assertThat(client2Exception.get()).isNull();
+
+      // Verify both clients got results
+      assertThat(client1Results).isNotNull();
+      assertThat(client2Results).isNotNull();
+      assertThat(client1Results).hasSize(5);
+      assertThat(client2Results).hasSize(5);
+
+      // All entities should be found
+      assertThat(client1Results).doesNotContainNull();
+      assertThat(client2Results).doesNotContainNull();
+
+      // Verify that client1 got the older version of N2 (index 1 in the list)
+      ResolvedPolarisEntity client1N2 = client1Results.get(1).getCacheEntry();
+      assertThat(client1N2.getEntity().getId()).isEqualTo(N2.getId());
+      
assertThat(client1N2.getEntity().getEntityVersion()).isEqualTo(N2v2.getEntityVersion());
+
+      // Verify that client2 got the newer version of N2
+      ResolvedPolarisEntity client2N2 = client2Results.get(1).getCacheEntry();
+      assertThat(client2N2.getEntity().getId()).isEqualTo(N2.getId());
+      
assertThat(client2N2.getEntity().getEntityVersion()).isEqualTo(N2v3.getEntityVersion());
+
+      // Verify that both clients got consistent versions for other entities
+      for (int i = 0; i < 5; i++) {
+        if (i != 1) { // Skip N2 which we expect to be different
+          ResolvedPolarisEntity client1Entity = 
client1Results.get(i).getCacheEntry();
+          ResolvedPolarisEntity client2Entity = 
client2Results.get(i).getCacheEntry();
+
+          assertThat(client1Entity.getEntity().getId())
+              .isEqualTo(client2Entity.getEntity().getId());
+          assertThat(client1Entity.getEntity().getEntityVersion())
+              .isEqualTo(client2Entity.getEntity().getEntityVersion());
+          assertThat(client1Entity.getEntity().getGrantRecordsVersion())
+              .isEqualTo(client2Entity.getEntity().getGrantRecordsVersion());
+        }
+      }
+      assertThat(entityIds).extracting(id -> 
cache.getEntityById(id.getId())).doesNotContainNull();
+    } finally {
+      executorService.shutdown();
+    }
+  }
+
+  private static ResolvedPolarisEntity 
getResolvedPolarisEntity(PolarisBaseEntity catalog) {
+    return new ResolvedPolarisEntity(PolarisEntity.of(catalog), List.of(), 
List.of());
+  }
+
+  private static PolarisChangeTrackingVersions 
changeTrackingFor(PolarisBaseEntity entity) {
+    return new PolarisChangeTrackingVersions(
+        entity.getEntityVersion(), entity.getGrantRecordsVersion());
   }
 }
diff --git 
a/polaris-core/src/testFixtures/java/org/apache/polaris/core/persistence/BasePolarisMetaStoreManagerTest.java
 
b/polaris-core/src/testFixtures/java/org/apache/polaris/core/persistence/BasePolarisMetaStoreManagerTest.java
index 43a0c5581..f8816260a 100644
--- 
a/polaris-core/src/testFixtures/java/org/apache/polaris/core/persistence/BasePolarisMetaStoreManagerTest.java
+++ 
b/polaris-core/src/testFixtures/java/org/apache/polaris/core/persistence/BasePolarisMetaStoreManagerTest.java
@@ -118,7 +118,7 @@ public abstract class BasePolarisMetaStoreManagerTest {
         .containsExactly(PolarisEntity.toCore(task1), 
PolarisEntity.toCore(task2));
 
     List<PolarisBaseEntity> listedEntities =
-        metaStoreManager.loadEntitiesAll(
+        metaStoreManager.listFullEntitiesAll(
             polarisTestMetaStoreManager.polarisCallContext,
             null,
             PolarisEntityType.TASK,
@@ -238,6 +238,12 @@ public abstract class BasePolarisMetaStoreManagerTest {
     polarisTestMetaStoreManager.testLookup();
   }
 
+  /** test batch entity load */
+  @Test
+  protected void testLoadResolvedEntitiesById() {
+    polarisTestMetaStoreManager.testLoadResolvedEntitiesById();
+  }
+
   /** Test the set of functions for the entity cache */
   @Test
   protected void testEntityCache() {
diff --git 
a/polaris-core/src/testFixtures/java/org/apache/polaris/core/persistence/PolarisTestMetaStoreManager.java
 
b/polaris-core/src/testFixtures/java/org/apache/polaris/core/persistence/PolarisTestMetaStoreManager.java
index c8e486ee6..b32a4059d 100644
--- 
a/polaris-core/src/testFixtures/java/org/apache/polaris/core/persistence/PolarisTestMetaStoreManager.java
+++ 
b/polaris-core/src/testFixtures/java/org/apache/polaris/core/persistence/PolarisTestMetaStoreManager.java
@@ -23,6 +23,7 @@ import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.polaris.core.PolarisCallContext;
@@ -48,6 +49,7 @@ import org.apache.polaris.core.persistence.dao.entity.EntityResult;
 import org.apache.polaris.core.persistence.dao.entity.LoadGrantsResult;
 import org.apache.polaris.core.persistence.dao.entity.LoadPolicyMappingsResult;
 import org.apache.polaris.core.persistence.dao.entity.PolicyAttachmentResult;
+import org.apache.polaris.core.persistence.dao.entity.ResolvedEntitiesResult;
 import org.apache.polaris.core.persistence.dao.entity.ResolvedEntityResult;
 import org.apache.polaris.core.persistence.pagination.PageToken;
 import org.apache.polaris.core.policy.PolarisPolicyMappingRecord;
@@ -55,6 +57,7 @@ import org.apache.polaris.core.policy.PolicyEntity;
 import org.apache.polaris.core.policy.PolicyType;
 import org.apache.polaris.core.policy.PredefinedPolicyTypes;
 import org.assertj.core.api.Assertions;
+import org.assertj.core.api.InstanceOfAssertFactories;
 
 /** Test the Polaris persistence layer */
 public class PolarisTestMetaStoreManager {
@@ -650,6 +653,7 @@ public class PolarisTestMetaStoreManager {
             .parentId(parentId)
             .name(name)
             .propertiesAsMap(properties)
+            .internalPropertiesAsMap(Map.of())
             .build();
     PolarisBaseEntity entity =
         polarisMetaStoreManager
@@ -2685,6 +2689,114 @@ public class PolarisTestMetaStoreManager {
     this.ensureNotExistsById(catalog.getId(), T1.getId(), PolarisEntityType.NAMESPACE);
   }
 
+  public void testLoadResolvedEntitiesById() {
+    // load all principals
+    List<EntityNameLookupRecord> principals =
+        polarisMetaStoreManager
+            .listEntities(
+                this.polarisCallContext,
+                null,
+                PolarisEntityType.PRINCIPAL,
+                PolarisEntitySubType.NULL_SUBTYPE,
+                PageToken.readEverything())
+            .getEntities();
+
+    // create new catalog
+    PolarisBaseEntity catalog =
+        new PolarisBaseEntity(
+            PolarisEntityConstants.getNullId(),
+            polarisMetaStoreManager.generateNewEntityId(this.polarisCallContext).getId(),
+            PolarisEntityType.CATALOG,
+            PolarisEntitySubType.NULL_SUBTYPE,
+            PolarisEntityConstants.getRootEntityId(),
+            "test");
+    CreateCatalogResult catalogCreated =
+        polarisMetaStoreManager.createCatalog(this.polarisCallContext, catalog, List.of());
+    Assertions.assertThat(catalogCreated).isNotNull();
+
+    // load the catalog again, since the grant versions are different
+    catalog =
+        polarisMetaStoreManager
+            .loadEntity(
+                polarisCallContext,
+                0L,
+                catalogCreated.getCatalog().getId(),
+                PolarisEntityType.CATALOG)
+            .getEntity();
+
+    // now create all objects
+    PolarisBaseEntity N1 = this.createEntity(List.of(catalog), PolarisEntityType.NAMESPACE, "N1");
+    PolarisBaseEntity N1_N2 =
+        this.createEntity(List.of(catalog, N1), PolarisEntityType.NAMESPACE, "N2");
+    PolarisBaseEntity T1 =
+        this.createEntity(
+            List.of(catalog, N1, N1_N2),
+            PolarisEntityType.TABLE_LIKE,
+            PolarisEntitySubType.ICEBERG_TABLE,
+            "T1");
+
+    // batch load all entities. They should all be present and non-null
+    ResolvedEntitiesResult entitiesResult =
+        polarisMetaStoreManager.loadResolvedEntities(
+            polarisCallContext,
+            PolarisEntityType.NAMESPACE,
+            List.of(
+                new PolarisEntityId(N1.getCatalogId(), N1.getId()),
+                new PolarisEntityId(N1_N2.getCatalogId(), N1_N2.getId())));
+    Assertions.assertThat(entitiesResult)
+        .isNotNull()
+        .returns(BaseResult.ReturnStatus.SUCCESS, ResolvedEntitiesResult::getReturnStatus)
+        .extracting(
+            ResolvedEntitiesResult::getResolvedEntities,
+            InstanceOfAssertFactories.list(ResolvedPolarisEntity.class))
+        .hasSize(2)
+        .allSatisfy(entity -> Assertions.assertThat(entity).isNotNull())
+        .extracting(r -> getEntityCore(r.getEntity()))
+        .containsExactly(getEntityCore(N1), getEntityCore(N1_N2));
+
+    // try entities which do not exist
+    entitiesResult =
+        polarisMetaStoreManager.loadResolvedEntities(
+            polarisCallContext,
+            PolarisEntityType.CATALOG,
+            List.of(
+                new PolarisEntityId(catalog.getId(), 27),
+                new PolarisEntityId(catalog.getId(), 35)));
+    Assertions.assertThat(entitiesResult)
+        .isNotNull()
+        .returns(BaseResult.ReturnStatus.SUCCESS, ResolvedEntitiesResult::getReturnStatus)
+        .extracting(
+            ResolvedEntitiesResult::getResolvedEntities,
+            InstanceOfAssertFactories.list(ResolvedPolarisEntity.class))
+        .hasSize(2)
+        .allSatisfy(entity -> Assertions.assertThat(entity).isNull());
+
+    // existing entities, some with wrong type
+    entitiesResult =
+        polarisMetaStoreManager.loadResolvedEntities(
+            polarisCallContext,
+            PolarisEntityType.NAMESPACE,
+            List.of(
+                new PolarisEntityId(catalog.getCatalogId(), catalog.getId()),
+                new PolarisEntityId(catalog.getId(), N1_N2.getId()),
+                new PolarisEntityId(catalog.getId(), T1.getId())));
+    Assertions.assertThat(entitiesResult)
+        .isNotNull()
+        .returns(BaseResult.ReturnStatus.SUCCESS, ResolvedEntitiesResult::getReturnStatus)
+        .extracting(
+            ResolvedEntitiesResult::getResolvedEntities,
+            InstanceOfAssertFactories.list(ResolvedPolarisEntity.class))
+        .hasSize(3)
+        .filteredOn(Objects::nonNull)
+        .hasSize(1)
+        .extracting(r -> getEntityCore(r.getEntity()))
+        .containsExactly(getEntityCore(N1_N2));
+  }
+
+  private static PolarisEntityCore getEntityCore(PolarisBaseEntity entity) {
+    return new PolarisEntityCore.Builder<>(entity).build();
+  }
+
   /** Test the set of functions for the entity cache */
   public void testEntityCache() {
     // create test catalog
diff --git a/runtime/service/src/main/java/org/apache/polaris/service/admin/PolarisAdminService.java b/runtime/service/src/main/java/org/apache/polaris/service/admin/PolarisAdminService.java
index b64394844..a80d28965 100644
--- a/runtime/service/src/main/java/org/apache/polaris/service/admin/PolarisAdminService.java
+++ b/runtime/service/src/main/java/org/apache/polaris/service/admin/PolarisAdminService.java
@@ -965,7 +965,7 @@ public class PolarisAdminService {
   /** List all catalogs without checking for permission. */
   private Stream<CatalogEntity> listCatalogsUnsafe() {
     return metaStoreManager
-        .loadEntitiesAll(
+        .listFullEntitiesAll(
             getCurrentPolarisContext(),
             null,
             PolarisEntityType.CATALOG,
@@ -1212,7 +1212,7 @@ public class PolarisAdminService {
     authorizeBasicRootOperationOrThrow(op);
 
     return metaStoreManager
-        .loadEntitiesAll(
+        .listFullEntitiesAll(
             getCurrentPolarisContext(),
             null,
             PolarisEntityType.PRINCIPAL,
@@ -1319,7 +1319,7 @@ public class PolarisAdminService {
     authorizeBasicRootOperationOrThrow(op);
 
     return metaStoreManager
-        .loadEntitiesAll(
+        .listFullEntitiesAll(
             getCurrentPolarisContext(),
             null,
             PolarisEntityType.PRINCIPAL_ROLE,
@@ -1443,7 +1443,7 @@ public class PolarisAdminService {
     CatalogEntity catalogEntity = getCatalogByName(resolutionManifest, catalogName);
     List<PolarisEntityCore> catalogPath = PolarisEntity.toCoreList(List.of(catalogEntity));
     return metaStoreManager
-        .loadEntitiesAll(
+        .listFullEntitiesAll(
             getCurrentPolarisContext(),
             catalogPath,
             PolarisEntityType.CATALOG_ROLE,
diff --git a/runtime/service/src/main/java/org/apache/polaris/service/catalog/policy/PolicyCatalog.java b/runtime/service/src/main/java/org/apache/polaris/service/catalog/policy/PolicyCatalog.java
index 8e4063b87..9b2f7c8a6 100644
--- a/runtime/service/src/main/java/org/apache/polaris/service/catalog/policy/PolicyCatalog.java
+++ b/runtime/service/src/main/java/org/apache/polaris/service/catalog/policy/PolicyCatalog.java
@@ -188,7 +188,7 @@ public class PolicyCatalog {
     }
     // with a policyType filter we need to load the full PolicyEntity to apply the filter
     return metaStoreManager
-        .loadEntitiesAll(
+        .listFullEntitiesAll(
             callContext.getPolarisCallContext(),
             catalogPath,
             PolarisEntityType.POLICY,

Reply via email to