This is an automated email from the ASF dual-hosted git repository.

collado pushed a commit to branch mcollado-loadentities-batch
in repository https://gitbox.apache.org/repos/asf/polaris.git

commit abe9627ab09fa309e9779a1cc6409ffd07aa3832
Author: Michael Collado <[email protected]>
AuthorDate: Tue Sep 23 13:25:42 2025 -0700

    Add loadResolvedEntities by id and entity cache support
---
 .../apache/polaris/core/entity/PolarisEntity.java  |   2 +
 .../AtomicOperationMetaStoreManager.java           |  51 +-
 .../core/persistence/PolarisMetaStoreManager.java  |  21 +-
 .../TransactionWorkspaceMetaStoreManager.java      |  10 +
 .../core/persistence/cache/EntityCache.java        |  35 +
 .../persistence/cache/InMemoryEntityCache.java     | 165 +++-
 .../TransactionalMetaStoreManagerImpl.java         |  43 +-
 .../persistence/cache/InMemoryEntityCacheTest.java | 845 +++++++++++++++++----
 .../BasePolarisMetaStoreManagerTest.java           |   2 +-
 .../polaris/service/admin/PolarisAdminService.java |   8 +-
 .../service/catalog/policy/PolicyCatalog.java      |   2 +-
 11 files changed, 1013 insertions(+), 171 deletions(-)

diff --git 
a/polaris-core/src/main/java/org/apache/polaris/core/entity/PolarisEntity.java 
b/polaris-core/src/main/java/org/apache/polaris/core/entity/PolarisEntity.java
index 98701af45..60a358538 100644
--- 
a/polaris-core/src/main/java/org/apache/polaris/core/entity/PolarisEntity.java
+++ 
b/polaris-core/src/main/java/org/apache/polaris/core/entity/PolarisEntity.java
@@ -234,6 +234,8 @@ public class PolarisEntity extends PolarisBaseEntity {
         + getParentId()
         + ";entityVersion="
         + getEntityVersion()
+        + ";grantVersion="
+        + getGrantRecordsVersion()
         + ";type="
         + getType()
         + ";subType="
diff --git 
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/AtomicOperationMetaStoreManager.java
 
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/AtomicOperationMetaStoreManager.java
index de298b70a..22b45c6e8 100644
--- 
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/AtomicOperationMetaStoreManager.java
+++ 
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/AtomicOperationMetaStoreManager.java
@@ -1804,26 +1804,53 @@ public class AtomicOperationMetaStoreManager extends 
BaseMetaStoreManager {
                     return entities.get(i);
                   }
                 })
-            .map(
-                e -> {
-                  if (e == null) {
+            .map(e -> toResolvedPolarisEntity(callCtx, e, ms))
+            .collect(Collectors.toList());
+    return new ResolvedEntitiesResult(ret);
+  }
+
+  @Nonnull
+  @Override
+  public ResolvedEntitiesResult loadResolvedEntities(
+      @Nonnull PolarisCallContext callCtx,
+      @Nonnull PolarisEntityType entityType,
+      @Nonnull List<PolarisEntityId> entityIds) {
+    BasePersistence ms = callCtx.getMetaStore();
+    List<PolarisBaseEntity> entities = ms.lookupEntities(callCtx, entityIds);
+
+    // mimic the behavior of loadEntity above, return null if not found or 
type mismatch
+    List<ResolvedPolarisEntity> ret =
+        IntStream.range(0, entityIds.size())
+            .mapToObj(
+                i -> {
+                  if (entities.get(i) != null && 
!entities.get(i).getType().equals(entityType)) {
                     return null;
                   } else {
-                    // load the grant records
-                    final List<PolarisGrantRecord> grantRecordsAsSecurable =
-                        ms.loadAllGrantRecordsOnSecurable(callCtx, 
e.getCatalogId(), e.getId());
-                    final List<PolarisGrantRecord> grantRecordsAsGrantee =
-                        e.getType().isGrantee()
-                            ? ms.loadAllGrantRecordsOnGrantee(callCtx, 
e.getCatalogId(), e.getId())
-                            : List.of();
-                    return new ResolvedPolarisEntity(
-                        PolarisEntity.of(e), grantRecordsAsGrantee, 
grantRecordsAsSecurable);
+                    return entities.get(i);
                   }
                 })
+            .map(e -> toResolvedPolarisEntity(callCtx, e, ms))
             .collect(Collectors.toList());
     return new ResolvedEntitiesResult(ret);
   }
 
+  private static ResolvedPolarisEntity toResolvedPolarisEntity(
+      PolarisCallContext callCtx, PolarisBaseEntity e, BasePersistence ms) {
+    if (e == null) {
+      return null;
+    } else {
+      // load the grant records
+      final List<PolarisGrantRecord> grantRecordsAsSecurable =
+          ms.loadAllGrantRecordsOnSecurable(callCtx, e.getCatalogId(), 
e.getId());
+      final List<PolarisGrantRecord> grantRecordsAsGrantee =
+          e.getType().isGrantee()
+              ? ms.loadAllGrantRecordsOnGrantee(callCtx, e.getCatalogId(), 
e.getId())
+              : List.of();
+      return new ResolvedPolarisEntity(
+          PolarisEntity.of(e), grantRecordsAsGrantee, grantRecordsAsSecurable);
+    }
+  }
+
   /** {@inheritDoc} */
   @Override
   public @Nonnull ResolvedEntityResult refreshResolvedEntity(
diff --git 
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/PolarisMetaStoreManager.java
 
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/PolarisMetaStoreManager.java
index 0d2857743..245cd6dcc 100644
--- 
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/PolarisMetaStoreManager.java
+++ 
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/PolarisMetaStoreManager.java
@@ -137,7 +137,7 @@ public interface PolarisMetaStoreManager
   /**
    * Load full entities matching the given criteria with pagination. If only 
the entity name/id/type
    * is required, use {@link #listEntities} instead. If no pagination is 
required, use {@link
-   * #loadFullEntitiesAll} instead.
+   * #listFullEntitiesAll} instead.
    *
    * @param callCtx call context
    * @param catalogPath path inside a catalog. If null or empty, the entities 
to list are top-level,
@@ -166,7 +166,7 @@ public interface PolarisMetaStoreManager
    * @param entitySubType subType of entities to list (or ANY_SUBTYPE)
    * @return list of all matching entities
    */
-  default @Nonnull List<PolarisBaseEntity> loadFullEntitiesAll(
+  default @Nonnull List<PolarisBaseEntity> listFullEntitiesAll(
       @Nonnull PolarisCallContext callCtx,
       @Nullable List<PolarisEntityCore> catalogPath,
       @Nonnull PolarisEntityType entityType,
@@ -434,6 +434,23 @@ public interface PolarisMetaStoreManager
       @Nonnull PolarisCallContext callCtx,
       @Nonnull List<EntityNameLookupRecord> entityLookupRecords);
 
+  /**
+   * Load a batch of resolved entities of a specified entity type given their 
{@link
+   * PolarisEntityId}. Will return an empty list if the input list is empty. 
Order in that returned
+   * list is the same as the input list. Some elements might be NULL if the 
entity has been dropped.
+   *
+   * @param callCtx call context
+   * @param entityType the type of entities to load
+   * @param entityIds the list of entity ids to load
+   * @return a non-null list of entities corresponding to the lookup keys. 
Some elements might be
+   *     NULL if the entity has been dropped.
+   */
+  @Nonnull
+  ResolvedEntitiesResult loadResolvedEntities(
+      @Nonnull PolarisCallContext callCtx,
+      @Nonnull PolarisEntityType entityType,
+      @Nonnull List<PolarisEntityId> entityIds);
+
   /**
    * Refresh a resolved entity from the backend store. Will return NULL if the 
entity does not
    * exist, i.e. has been purged or dropped. Else, will determine what has 
changed based on the
diff --git 
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/TransactionWorkspaceMetaStoreManager.java
 
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/TransactionWorkspaceMetaStoreManager.java
index 238d29a89..add217f05 100644
--- 
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/TransactionWorkspaceMetaStoreManager.java
+++ 
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/TransactionWorkspaceMetaStoreManager.java
@@ -389,6 +389,16 @@ public class TransactionWorkspaceMetaStoreManager 
implements PolarisMetaStoreMan
     return null;
   }
 
+  @Nonnull
+  @Override
+  public ResolvedEntitiesResult loadResolvedEntities(
+      @Nonnull PolarisCallContext callCtx,
+      @Nonnull PolarisEntityType entityType,
+      @Nonnull List<PolarisEntityId> entityIds) {
+    diagnostics.fail("illegal_method_in_transaction_workspace", 
"loadResolvedEntities");
+    return null;
+  }
+
   @Override
   public ResolvedEntityResult refreshResolvedEntity(
       @Nonnull PolarisCallContext callCtx,
diff --git 
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/cache/EntityCache.java
 
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/cache/EntityCache.java
index cd438c995..93dc87ac9 100644
--- 
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/cache/EntityCache.java
+++ 
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/cache/EntityCache.java
@@ -20,8 +20,11 @@ package org.apache.polaris.core.persistence.cache;
 
 import jakarta.annotation.Nonnull;
 import jakarta.annotation.Nullable;
+import java.util.List;
 import org.apache.polaris.core.PolarisCallContext;
+import org.apache.polaris.core.entity.EntityNameLookupRecord;
 import org.apache.polaris.core.entity.PolarisBaseEntity;
+import org.apache.polaris.core.entity.PolarisEntityId;
 import org.apache.polaris.core.entity.PolarisEntityType;
 import org.apache.polaris.core.persistence.ResolvedPolarisEntity;
 
@@ -80,4 +83,36 @@ public interface EntityCache {
   @Nullable
   EntityCacheLookupResult getOrLoadEntityByName(
       @Nonnull PolarisCallContext callContext, @Nonnull EntityCacheByNameKey 
entityNameKey);
+
+  /**
+   * Load multiple entities by id, returning those found in the cache and 
loading those not found.
+   *
+   * @param callCtx the Polaris call context
+   * @param entityType the entity type
+   * @param entityIds the list of entity ids to load
+   * @return the list of resolved entities, in the same order as the requested 
entity ids. As in
+   *     {@link
+   *     
org.apache.polaris.core.persistence.PolarisMetaStoreManager#loadResolvedEntities(PolarisCallContext,
+   *     PolarisEntityType, List)}, elements in the returned list may be null 
if the corresponding
+   *     entity id does not exist.
+   */
+  List<EntityCacheLookupResult> getOrLoadResolvedEntities(
+      @Nonnull PolarisCallContext callCtx,
+      @Nonnull PolarisEntityType entityType,
+      @Nonnull List<PolarisEntityId> entityIds);
+
+  /**
+   * Load multiple entities by {@link EntityNameLookupRecord}, returning those 
found in the cache
+   * and loading those not found.
+   *
+   * @param callCtx the Polaris call context
+   * @param lookupRecords the list of entity name to load
+   * @return the list of resolved entities, in the same order as the requested 
entity records. As in
+   *     {@link
+   *     
org.apache.polaris.core.persistence.PolarisMetaStoreManager#loadResolvedEntities(PolarisCallContext,
+   *     PolarisEntityType, List)}, elements in the returned list may be null 
if the corresponding
+   *     entity id does not exist.
+   */
+  List<EntityCacheLookupResult> getOrLoadResolvedEntities(
+      @Nonnull PolarisCallContext callCtx, @Nonnull 
List<EntityNameLookupRecord> lookupRecords);
 }
diff --git 
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/cache/InMemoryEntityCache.java
 
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/cache/InMemoryEntityCache.java
index efac734ed..59ecb1977 100644
--- 
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/cache/InMemoryEntityCache.java
+++ 
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/cache/InMemoryEntityCache.java
@@ -24,24 +24,39 @@ import com.github.benmanes.caffeine.cache.RemovalListener;
 import jakarta.annotation.Nonnull;
 import jakarta.annotation.Nullable;
 import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 import org.apache.polaris.core.PolarisCallContext;
 import org.apache.polaris.core.PolarisDiagnostics;
 import org.apache.polaris.core.config.BehaviorChangeConfiguration;
 import org.apache.polaris.core.config.FeatureConfiguration;
 import org.apache.polaris.core.config.RealmConfig;
+import org.apache.polaris.core.entity.EntityNameLookupRecord;
 import org.apache.polaris.core.entity.PolarisBaseEntity;
+import org.apache.polaris.core.entity.PolarisChangeTrackingVersions;
+import org.apache.polaris.core.entity.PolarisEntityId;
 import org.apache.polaris.core.entity.PolarisEntityType;
 import org.apache.polaris.core.entity.PolarisGrantRecord;
 import org.apache.polaris.core.persistence.PolarisMetaStoreManager;
 import org.apache.polaris.core.persistence.ResolvedPolarisEntity;
+import org.apache.polaris.core.persistence.dao.entity.ChangeTrackingResult;
+import org.apache.polaris.core.persistence.dao.entity.ResolvedEntitiesResult;
 import org.apache.polaris.core.persistence.dao.entity.ResolvedEntityResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /** An in-memory entity cache with a limit of 100k entities and a 1h TTL. */
 public class InMemoryEntityCache implements EntityCache {
-
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(InMemoryEntityCache.class);
   private EntityCacheMode cacheMode;
   private final PolarisDiagnostics diagnostics;
   private final PolarisMetaStoreManager polarisMetaStoreManager;
@@ -473,4 +488,152 @@ public class InMemoryEntityCache implements EntityCache {
     // return what we found
     return new EntityCacheLookupResult(entry, cacheHit);
   }
+
+  @Override
+  public List<EntityCacheLookupResult> getOrLoadResolvedEntities(
+      @Nonnull PolarisCallContext callCtx,
+      @Nonnull PolarisEntityType entityType,
+      @Nonnull List<PolarisEntityId> entityIds) {
+    // use a map to collect cached entries to avoid concurrency problems in 
case a second thread is
+    // trying to populate
+    // the cache from a different snapshot
+    Map<PolarisEntityId, ResolvedPolarisEntity> resolvedEntities = new 
HashMap<>();
+    for (int i = 0; i < 100; i++) {
+      Function<List<PolarisEntityId>, ResolvedEntitiesResult> loaderFunc =
+          idsToLoad -> polarisMetaStoreManager.loadResolvedEntities(callCtx, 
entityType, idsToLoad);
+      if (isCacheStateValid(callCtx, resolvedEntities, entityIds, loaderFunc)) 
{
+        break;
+      }
+    }
+
+    return entityIds.stream()
+        .map(
+            id -> {
+              ResolvedPolarisEntity entity = resolvedEntities.get(id);
+              return entity == null ? null : new 
EntityCacheLookupResult(entity, true);
+            })
+        .collect(Collectors.toList());
+  }
+
+  @Override
+  public List<EntityCacheLookupResult> getOrLoadResolvedEntities(
+      @Nonnull PolarisCallContext callCtx, @Nonnull 
List<EntityNameLookupRecord> lookupRecords) {
+    Map<PolarisEntityId, EntityNameLookupRecord> entityIdMap =
+        lookupRecords.stream()
+            .collect(
+                Collectors.toMap(
+                    e -> new PolarisEntityId(e.getCatalogId(), e.getId()),
+                    Function.identity(),
+                    (a, b) -> a));
+    Function<List<PolarisEntityId>, ResolvedEntitiesResult> loaderFunc =
+        idsToLoad ->
+            polarisMetaStoreManager.loadResolvedEntities(
+                callCtx, 
idsToLoad.stream().map(entityIdMap::get).collect(Collectors.toList()));
+
+    // use a map to collect cached entries to avoid concurrency problems in 
case a second thread is
+    // trying to populate
+    // the cache from a different snapshot
+    Map<PolarisEntityId, ResolvedPolarisEntity> resolvedEntities = new 
HashMap<>();
+    List<PolarisEntityId> entityIds =
+        lookupRecords.stream()
+            .map(e -> new PolarisEntityId(e.getCatalogId(), e.getId()))
+            .collect(Collectors.toList());
+    for (int i = 0; i < 100; i++) {
+      if (isCacheStateValid(callCtx, resolvedEntities, entityIds, loaderFunc)) 
{
+        break;
+      }
+    }
+
+    return lookupRecords.stream()
+        .map(
+            lookupRecord -> {
+              ResolvedPolarisEntity entity =
+                  resolvedEntities.get(
+                      new PolarisEntityId(lookupRecord.getCatalogId(), 
lookupRecord.getId()));
+              return entity == null ? null : new 
EntityCacheLookupResult(entity, true);
+            })
+        .collect(Collectors.toList());
+  }
+
+  private boolean isCacheStateValid(
+      @Nonnull PolarisCallContext callCtx,
+      @Nonnull Map<PolarisEntityId, ResolvedPolarisEntity> resolvedEntities,
+      @Nonnull List<PolarisEntityId> entityIds,
+      @Nonnull Function<List<PolarisEntityId>, ResolvedEntitiesResult> 
loaderFunc) {
+    ChangeTrackingResult changeTrackingResult =
+        polarisMetaStoreManager.loadEntitiesChangeTracking(callCtx, entityIds);
+    List<PolarisEntityId> idsToLoad = new ArrayList<>();
+    if (changeTrackingResult.isSuccess()) {
+      idsToLoad.addAll(validateCacheEntries(entityIds, resolvedEntities, 
changeTrackingResult));
+    } else {
+      idsToLoad.addAll(entityIds);
+    }
+    if (!idsToLoad.isEmpty()) {
+      ResolvedEntitiesResult resolvedEntitiesResult = 
loaderFunc.apply(idsToLoad);
+      if (resolvedEntitiesResult.isSuccess()) {
+        LOGGER.debug("Resolved entities - validating cache");
+        resolvedEntitiesResult.getResolvedEntities().stream()
+            .filter(Objects::nonNull)
+            .forEach(
+                e -> {
+                  this.cacheNewEntry(e);
+                  resolvedEntities.put(
+                      new PolarisEntityId(e.getEntity().getCatalogId(), 
e.getEntity().getId()), e);
+                });
+      }
+    }
+
+    // the loader function should always return a batch of results from the 
same "snapshot" of the
+    // persistence, so
+    // if the changeTracking call above failed, we should have loaded the 
entire batch in one shot.
+    // There should be no
+    // need to revalidate the entities.
+    List<PolarisEntityId> idsToReload =
+        changeTrackingResult.isSuccess()
+            ? validateCacheEntries(entityIds, resolvedEntities, 
changeTrackingResult)
+            : List.of();
+    return idsToReload.isEmpty();
+  }
+
+  private List<PolarisEntityId> validateCacheEntries(
+      List<PolarisEntityId> entityIds,
+      Map<PolarisEntityId, ResolvedPolarisEntity> resolvedEntities,
+      ChangeTrackingResult changeTrackingResult) {
+    List<PolarisEntityId> idsToReload = new ArrayList<>();
+    Iterator<PolarisEntityId> idIterator = entityIds.iterator();
+    Iterator<PolarisChangeTrackingVersions> changeTrackingIterator =
+        changeTrackingResult.getChangeTrackingVersions().iterator();
+    while (idIterator.hasNext() && changeTrackingIterator.hasNext()) {
+      PolarisEntityId entityId = idIterator.next();
+      PolarisChangeTrackingVersions changeTrackingVersions = 
changeTrackingIterator.next();
+      if (changeTrackingVersions == null) {
+        // entity has been purged
+        ResolvedPolarisEntity cachedEntity = getEntityById(entityId.getId());
+        if (cachedEntity != null || resolvedEntities.containsKey(entityId)) {
+          LOGGER.debug("Entity {} has been purged, removing from cache", 
entityId);
+          Optional.ofNullable(cachedEntity).ifPresent(this::removeCacheEntry);
+          resolvedEntities.remove(entityId);
+        }
+        continue;
+      }
+      // compare versions using equals rather than less than so we can use the 
same function to
+      // validate that the cache
+      // entries are consistent with a single call to the change tracking 
table, rather than some
+      // grants ahead and some
+      // grants behind
+      ResolvedPolarisEntity cachedEntity =
+          resolvedEntities.computeIfAbsent(entityId, id -> 
this.getEntityById(id.getId()));
+      if (cachedEntity == null
+          || cachedEntity.getEntity().getEntityVersion()
+              != changeTrackingVersions.getEntityVersion()
+          || cachedEntity.getEntity().getGrantRecordsVersion()
+              != changeTrackingVersions.getGrantRecordsVersion()) {
+        idsToReload.add(entityId);
+      } else {
+        resolvedEntities.put(entityId, cachedEntity);
+      }
+    }
+    LOGGER.debug("Cache entries {} need to be reloaded", idsToReload);
+    return idsToReload;
+  }
 }
diff --git 
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TransactionalMetaStoreManagerImpl.java
 
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TransactionalMetaStoreManagerImpl.java
index 45a02027e..899a82016 100644
--- 
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TransactionalMetaStoreManagerImpl.java
+++ 
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TransactionalMetaStoreManagerImpl.java
@@ -2310,24 +2310,19 @@ public class TransactionalMetaStoreManagerImpl extends 
BaseMetaStoreManager {
     return result;
   }
 
-  /** Refer to {@link #loadEntity(PolarisCallContext, long, long, 
PolarisEntityType)} */
-  private @Nonnull ResolvedEntitiesResult loadResolvedEntities(
-      @Nonnull PolarisCallContext callCtx,
-      @Nonnull TransactionalPersistence ms,
-      @Nonnull List<EntityNameLookupRecord> entityLookupRecords) {
-    List<PolarisBaseEntity> entities =
-        ms.lookupEntitiesInCurrentTxn(
-            callCtx,
-            entityLookupRecords.stream()
-                .map(r -> new PolarisEntityId(r.getCatalogId(), r.getId()))
-                .collect(Collectors.toList()));
+  private static ResolvedEntitiesResult getResolvedEntitiesResult(
+      PolarisCallContext callCtx,
+      TransactionalPersistence ms,
+      List<PolarisEntityId> entityIds,
+      Function<Integer, PolarisEntityType> entityTypeForIndex) {
+    List<PolarisBaseEntity> entities = ms.lookupEntitiesInCurrentTxn(callCtx, 
entityIds);
     // mimic the behavior of loadEntity above, return null if not found or 
type mismatch
     List<ResolvedPolarisEntity> ret =
-        IntStream.range(0, entityLookupRecords.size())
+        IntStream.range(0, entityIds.size())
             .mapToObj(
                 i -> {
                   if (entities.get(i) != null
-                      && 
!entities.get(i).getType().equals(entityLookupRecords.get(i).getType())) {
+                      && 
!entities.get(i).getType().equals(entityTypeForIndex.apply(i))) {
                     return null;
                   } else {
                     return entities.get(i);
@@ -2361,11 +2356,25 @@ public class TransactionalMetaStoreManagerImpl extends 
BaseMetaStoreManager {
       @Nonnull PolarisCallContext callCtx,
       @Nonnull List<EntityNameLookupRecord> entityLookupRecords) {
     TransactionalPersistence ms = ((TransactionalPersistence) 
callCtx.getMetaStore());
+    List<PolarisEntityId> entityIds =
+        entityLookupRecords.stream()
+            .map(r -> new PolarisEntityId(r.getCatalogId(), r.getId()))
+            .collect(Collectors.toList());
+    Function<Integer, PolarisEntityType> entityTypeForIndex =
+        i -> entityLookupRecords.get(i).getType();
     return ms.runInReadTransaction(
-        callCtx,
-        () ->
-            this.loadResolvedEntities(
-                callCtx, (TransactionalPersistence) callCtx.getMetaStore(), 
entityLookupRecords));
+        callCtx, () -> getResolvedEntitiesResult(callCtx, ms, entityIds, 
entityTypeForIndex));
+  }
+
+  @Nonnull
+  @Override
+  public ResolvedEntitiesResult loadResolvedEntities(
+      @Nonnull PolarisCallContext callCtx,
+      @Nonnull PolarisEntityType entityType,
+      @Nonnull List<PolarisEntityId> entityIds) {
+    TransactionalPersistence ms = ((TransactionalPersistence) 
callCtx.getMetaStore());
+    return ms.runInReadTransaction(
+        callCtx, () -> getResolvedEntitiesResult(callCtx, ms, entityIds, i -> 
entityType));
   }
 
   /** {@inheritDoc} */
diff --git 
a/polaris-core/src/test/java/org/apache/polaris/core/persistence/cache/InMemoryEntityCacheTest.java
 
b/polaris-core/src/test/java/org/apache/polaris/core/persistence/cache/InMemoryEntityCacheTest.java
index d750c4821..baf20c86a 100644
--- 
a/polaris-core/src/test/java/org/apache/polaris/core/persistence/cache/InMemoryEntityCacheTest.java
+++ 
b/polaris-core/src/test/java/org/apache/polaris/core/persistence/cache/InMemoryEntityCacheTest.java
@@ -19,16 +19,26 @@
 package org.apache.polaris.core.persistence.cache;
 
 import static 
org.apache.polaris.core.persistence.PrincipalSecretsGenerator.RANDOM_SECRETS;
+import static org.assertj.core.api.Assertions.assertThat;
 
 import java.time.Clock;
 import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.polaris.core.PolarisCallContext;
 import org.apache.polaris.core.PolarisDefaultDiagServiceImpl;
 import org.apache.polaris.core.PolarisDiagnostics;
+import org.apache.polaris.core.entity.EntityNameLookupRecord;
 import org.apache.polaris.core.entity.PolarisBaseEntity;
+import org.apache.polaris.core.entity.PolarisChangeTrackingVersions;
 import org.apache.polaris.core.entity.PolarisEntity;
+import org.apache.polaris.core.entity.PolarisEntityId;
 import org.apache.polaris.core.entity.PolarisEntitySubType;
 import org.apache.polaris.core.entity.PolarisEntityType;
 import org.apache.polaris.core.entity.PolarisGrantRecord;
@@ -37,17 +47,21 @@ import 
org.apache.polaris.core.entity.table.IcebergTableLikeEntity;
 import org.apache.polaris.core.persistence.PolarisMetaStoreManager;
 import org.apache.polaris.core.persistence.PolarisTestMetaStoreManager;
 import org.apache.polaris.core.persistence.ResolvedPolarisEntity;
+import org.apache.polaris.core.persistence.dao.entity.ChangeTrackingResult;
+import org.apache.polaris.core.persistence.dao.entity.ResolvedEntitiesResult;
 import 
org.apache.polaris.core.persistence.transactional.TransactionalMetaStoreManagerImpl;
 import 
org.apache.polaris.core.persistence.transactional.TransactionalPersistence;
 import org.apache.polaris.core.persistence.transactional.TreeMapMetaStore;
 import 
org.apache.polaris.core.persistence.transactional.TreeMapTransactionalPersistenceImpl;
-import org.assertj.core.api.Assertions;
 import org.junit.jupiter.api.Test;
 import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /** Unit testing of the entity cache */
 public class InMemoryEntityCacheTest {
 
+  public static final Logger LOGGER = 
LoggerFactory.getLogger(InMemoryEntityCache.class);
   private final PolarisDiagnostics diagServices;
   private final PolarisCallContext callCtx;
   private final PolarisTestMetaStoreManager tm;
@@ -105,31 +119,31 @@ public class InMemoryEntityCacheTest {
     EntityCacheLookupResult lookup =
         cache.getOrLoadEntityByName(
             this.callCtx, new EntityCacheByNameKey(PolarisEntityType.CATALOG, 
"test"));
-    Assertions.assertThat(lookup).isNotNull();
-    Assertions.assertThat(lookup.isCacheHit()).isFalse();
-    Assertions.assertThat(lookup.getCacheEntry()).isNotNull();
+    assertThat(lookup).isNotNull();
+    assertThat(lookup.isCacheHit()).isFalse();
+    assertThat(lookup.getCacheEntry()).isNotNull();
 
     // validate the cache entry
     PolarisBaseEntity catalog = lookup.getCacheEntry().getEntity();
-    Assertions.assertThat(catalog).isNotNull();
-    
Assertions.assertThat(catalog.getType()).isEqualTo(PolarisEntityType.CATALOG);
+    assertThat(catalog).isNotNull();
+    assertThat(catalog.getType()).isEqualTo(PolarisEntityType.CATALOG);
 
     // do it again, should be found in the cache
     lookup =
         cache.getOrLoadEntityByName(
             this.callCtx, new EntityCacheByNameKey(PolarisEntityType.CATALOG, 
"test"));
-    Assertions.assertThat(lookup).isNotNull();
-    Assertions.assertThat(lookup.isCacheHit()).isTrue();
+    assertThat(lookup).isNotNull();
+    assertThat(lookup.isCacheHit()).isTrue();
 
     // do it again by id, should be found in the cache
     lookup =
         cache.getOrLoadEntityById(
             this.callCtx, catalog.getCatalogId(), catalog.getId(), 
catalog.getType());
-    Assertions.assertThat(lookup).isNotNull();
-    Assertions.assertThat(lookup.isCacheHit()).isTrue();
-    Assertions.assertThat(lookup.getCacheEntry()).isNotNull();
-    Assertions.assertThat(lookup.getCacheEntry().getEntity()).isNotNull();
-    
Assertions.assertThat(lookup.getCacheEntry().getGrantRecordsAsSecurable()).isNotNull();
+    assertThat(lookup).isNotNull();
+    assertThat(lookup.isCacheHit()).isTrue();
+    assertThat(lookup.getCacheEntry()).isNotNull();
+    assertThat(lookup.getCacheEntry().getEntity()).isNotNull();
+    
assertThat(lookup.getCacheEntry().getGrantRecordsAsSecurable()).isNotNull();
 
     // get N1
     PolarisBaseEntity N1 =
@@ -140,128 +154,119 @@ public class InMemoryEntityCacheTest {
         new EntityCacheByNameKey(
             catalog.getId(), catalog.getId(), PolarisEntityType.NAMESPACE, 
"N1");
     ResolvedPolarisEntity cacheEntry = cache.getEntityByName(N1_name);
-    Assertions.assertThat(cacheEntry).isNull();
+    assertThat(cacheEntry).isNull();
 
     // try to find it in the cache by id. Should not be there, i.e. no cache 
hit
     lookup = cache.getOrLoadEntityById(this.callCtx, N1.getCatalogId(), 
N1.getId(), N1.getType());
-    Assertions.assertThat(lookup).isNotNull();
-    Assertions.assertThat(lookup.isCacheHit()).isFalse();
+    assertThat(lookup).isNotNull();
+    assertThat(lookup.isCacheHit()).isFalse();
 
     // should be there now, by name
     cacheEntry = cache.getEntityByName(N1_name);
-    Assertions.assertThat(cacheEntry).isNotNull();
-    Assertions.assertThat(cacheEntry.getEntity()).isNotNull();
-    Assertions.assertThat(cacheEntry.getGrantRecordsAsSecurable()).isNotNull();
+    assertThat(cacheEntry).isNotNull();
+    assertThat(cacheEntry.getEntity()).isNotNull();
+    assertThat(cacheEntry.getGrantRecordsAsSecurable()).isNotNull();
 
     // should be there now, by id
     cacheEntry = cache.getEntityById(N1.getId());
-    Assertions.assertThat(cacheEntry).isNotNull();
-    Assertions.assertThat(cacheEntry.getEntity()).isNotNull();
-    Assertions.assertThat(cacheEntry.getGrantRecordsAsSecurable()).isNotNull();
+    assertThat(cacheEntry).isNotNull();
+    assertThat(cacheEntry.getEntity()).isNotNull();
+    assertThat(cacheEntry.getGrantRecordsAsSecurable()).isNotNull();
 
     // lookup N1
     ResolvedPolarisEntity N1_entry = cache.getEntityById(N1.getId());
-    Assertions.assertThat(N1_entry).isNotNull();
-    Assertions.assertThat(N1_entry.getEntity()).isNotNull();
-    Assertions.assertThat(N1_entry.getGrantRecordsAsSecurable()).isNotNull();
+    assertThat(N1_entry).isNotNull();
+    assertThat(N1_entry.getEntity()).isNotNull();
+    assertThat(N1_entry.getGrantRecordsAsSecurable()).isNotNull();
 
     // negative tests, load an entity which does not exist
     lookup = cache.getOrLoadEntityById(this.callCtx, N1.getCatalogId(), 10000, 
N1.getType());
-    Assertions.assertThat(lookup).isNull();
+    assertThat(lookup).isNull();
     lookup =
         cache.getOrLoadEntityByName(
             this.callCtx,
             new EntityCacheByNameKey(PolarisEntityType.CATALOG, 
"non_existant_catalog"));
-    Assertions.assertThat(lookup).isNull();
+    assertThat(lookup).isNull();
 
     // lookup N2 to validate grants
     EntityCacheByNameKey N2_name =
         new EntityCacheByNameKey(catalog.getId(), N1.getId(), 
PolarisEntityType.NAMESPACE, "N2");
     lookup = cache.getOrLoadEntityByName(callCtx, N2_name);
-    Assertions.assertThat(lookup).isNotNull();
+    assertThat(lookup).isNotNull();
     ResolvedPolarisEntity cacheEntry_N1 = lookup.getCacheEntry();
-    Assertions.assertThat(cacheEntry_N1).isNotNull();
-    Assertions.assertThat(cacheEntry_N1.getEntity()).isNotNull();
-    
Assertions.assertThat(cacheEntry_N1.getGrantRecordsAsSecurable()).isNotNull();
+    assertThat(cacheEntry_N1).isNotNull();
+    assertThat(cacheEntry_N1.getEntity()).isNotNull();
+    assertThat(cacheEntry_N1.getGrantRecordsAsSecurable()).isNotNull();
 
     // lookup catalog role R1
     EntityCacheByNameKey R1_name =
         new EntityCacheByNameKey(
             catalog.getId(), catalog.getId(), PolarisEntityType.CATALOG_ROLE, 
"R1");
     lookup = cache.getOrLoadEntityByName(callCtx, R1_name);
-    Assertions.assertThat(lookup).isNotNull();
+    assertThat(lookup).isNotNull();
     ResolvedPolarisEntity cacheEntry_R1 = lookup.getCacheEntry();
-    Assertions.assertThat(cacheEntry_R1).isNotNull();
-    Assertions.assertThat(cacheEntry_R1.getEntity()).isNotNull();
-    
Assertions.assertThat(cacheEntry_R1.getGrantRecordsAsSecurable()).isNotNull();
-    
Assertions.assertThat(cacheEntry_R1.getGrantRecordsAsGrantee()).isNotNull();
+    assertThat(cacheEntry_R1).isNotNull();
+    assertThat(cacheEntry_R1.getEntity()).isNotNull();
+    assertThat(cacheEntry_R1.getGrantRecordsAsSecurable()).isNotNull();
+    assertThat(cacheEntry_R1.getGrantRecordsAsGrantee()).isNotNull();
 
     // we expect one TABLE_READ grant on that securable granted to the catalog 
role R1
-    
Assertions.assertThat(cacheEntry_N1.getGrantRecordsAsSecurable()).hasSize(1);
+    assertThat(cacheEntry_N1.getGrantRecordsAsSecurable()).hasSize(1);
     PolarisGrantRecord gr = cacheEntry_N1.getGrantRecordsAsSecurable().get(0);
 
     // securable is N1, grantee is R1
-    
Assertions.assertThat(gr.getGranteeId()).isEqualTo(cacheEntry_R1.getEntity().getId());
-    Assertions.assertThat(gr.getGranteeCatalogId())
-        .isEqualTo(cacheEntry_R1.getEntity().getCatalogId());
-    
Assertions.assertThat(gr.getSecurableId()).isEqualTo(cacheEntry_N1.getEntity().getId());
-    Assertions.assertThat(gr.getSecurableCatalogId())
-        .isEqualTo(cacheEntry_N1.getEntity().getCatalogId());
-    Assertions.assertThat(gr.getPrivilegeCode())
-        .isEqualTo(PolarisPrivilege.TABLE_READ_DATA.getCode());
+    assertThat(gr.getGranteeId()).isEqualTo(cacheEntry_R1.getEntity().getId());
+    
assertThat(gr.getGranteeCatalogId()).isEqualTo(cacheEntry_R1.getEntity().getCatalogId());
+    
assertThat(gr.getSecurableId()).isEqualTo(cacheEntry_N1.getEntity().getId());
+    
assertThat(gr.getSecurableCatalogId()).isEqualTo(cacheEntry_N1.getEntity().getCatalogId());
+    
assertThat(gr.getPrivilegeCode()).isEqualTo(PolarisPrivilege.TABLE_READ_DATA.getCode());
 
     // R1 should have 4 privileges granted to it
-    Assertions.assertThat(cacheEntry_R1.getGrantRecordsAsGrantee()).hasSize(4);
+    assertThat(cacheEntry_R1.getGrantRecordsAsGrantee()).hasSize(4);
     List<PolarisGrantRecord> matchPriv =
         cacheEntry_R1.getGrantRecordsAsGrantee().stream()
             .filter(
                 grantRecord ->
                     grantRecord.getPrivilegeCode() == 
PolarisPrivilege.TABLE_READ_DATA.getCode())
             .collect(Collectors.toList());
-    Assertions.assertThat(matchPriv).hasSize(1);
+    assertThat(matchPriv).hasSize(1);
     gr = matchPriv.get(0);
-    
Assertions.assertThat(gr.getGranteeId()).isEqualTo(cacheEntry_R1.getEntity().getId());
-    Assertions.assertThat(gr.getGranteeCatalogId())
-        .isEqualTo(cacheEntry_R1.getEntity().getCatalogId());
-    
Assertions.assertThat(gr.getSecurableId()).isEqualTo(cacheEntry_N1.getEntity().getId());
-    Assertions.assertThat(gr.getSecurableCatalogId())
-        .isEqualTo(cacheEntry_N1.getEntity().getCatalogId());
-    Assertions.assertThat(gr.getPrivilegeCode())
-        .isEqualTo(PolarisPrivilege.TABLE_READ_DATA.getCode());
+    assertThat(gr.getGranteeId()).isEqualTo(cacheEntry_R1.getEntity().getId());
+    
assertThat(gr.getGranteeCatalogId()).isEqualTo(cacheEntry_R1.getEntity().getCatalogId());
+    
assertThat(gr.getSecurableId()).isEqualTo(cacheEntry_N1.getEntity().getId());
+    
assertThat(gr.getSecurableCatalogId()).isEqualTo(cacheEntry_N1.getEntity().getCatalogId());
+    
assertThat(gr.getPrivilegeCode()).isEqualTo(PolarisPrivilege.TABLE_READ_DATA.getCode());
 
     // lookup principal role PR1
     EntityCacheByNameKey PR1_name =
         new EntityCacheByNameKey(PolarisEntityType.PRINCIPAL_ROLE, "PR1");
     lookup = cache.getOrLoadEntityByName(callCtx, PR1_name);
-    Assertions.assertThat(lookup).isNotNull();
+    assertThat(lookup).isNotNull();
     ResolvedPolarisEntity cacheEntry_PR1 = lookup.getCacheEntry();
-    Assertions.assertThat(cacheEntry_PR1).isNotNull();
-    Assertions.assertThat(cacheEntry_PR1.getEntity()).isNotNull();
-    
Assertions.assertThat(cacheEntry_PR1.getGrantRecordsAsSecurable()).isNotNull();
-    
Assertions.assertThat(cacheEntry_PR1.getGrantRecordsAsGrantee()).isNotNull();
+    assertThat(cacheEntry_PR1).isNotNull();
+    assertThat(cacheEntry_PR1.getEntity()).isNotNull();
+    assertThat(cacheEntry_PR1.getGrantRecordsAsSecurable()).isNotNull();
+    assertThat(cacheEntry_PR1.getGrantRecordsAsGrantee()).isNotNull();
 
     // R1 should have 1 CATALOG_ROLE_USAGE privilege granted *on* it to PR1
-    
Assertions.assertThat(cacheEntry_R1.getGrantRecordsAsSecurable()).hasSize(1);
+    assertThat(cacheEntry_R1.getGrantRecordsAsSecurable()).hasSize(1);
     gr = cacheEntry_R1.getGrantRecordsAsSecurable().get(0);
-    
Assertions.assertThat(gr.getSecurableId()).isEqualTo(cacheEntry_R1.getEntity().getId());
-    Assertions.assertThat(gr.getSecurableCatalogId())
-        .isEqualTo(cacheEntry_R1.getEntity().getCatalogId());
-    
Assertions.assertThat(gr.getGranteeId()).isEqualTo(cacheEntry_PR1.getEntity().getId());
-    Assertions.assertThat(gr.getGranteeCatalogId())
-        .isEqualTo(cacheEntry_PR1.getEntity().getCatalogId());
-    Assertions.assertThat(gr.getPrivilegeCode())
-        .isEqualTo(PolarisPrivilege.CATALOG_ROLE_USAGE.getCode());
+    
assertThat(gr.getSecurableId()).isEqualTo(cacheEntry_R1.getEntity().getId());
+    
assertThat(gr.getSecurableCatalogId()).isEqualTo(cacheEntry_R1.getEntity().getCatalogId());
+    
assertThat(gr.getGranteeId()).isEqualTo(cacheEntry_PR1.getEntity().getId());
+    
assertThat(gr.getGranteeCatalogId()).isEqualTo(cacheEntry_PR1.getEntity().getCatalogId());
+    
assertThat(gr.getPrivilegeCode()).isEqualTo(PolarisPrivilege.CATALOG_ROLE_USAGE.getCode());
 
     // PR1 should have 1 grant on it to P1.
-    
Assertions.assertThat(cacheEntry_PR1.getGrantRecordsAsSecurable()).hasSize(1);
-    
Assertions.assertThat(cacheEntry_PR1.getGrantRecordsAsSecurable().get(0).getPrivilegeCode())
+    assertThat(cacheEntry_PR1.getGrantRecordsAsSecurable()).hasSize(1);
+    
assertThat(cacheEntry_PR1.getGrantRecordsAsSecurable().get(0).getPrivilegeCode())
         .isEqualTo(PolarisPrivilege.PRINCIPAL_ROLE_USAGE.getCode());
 
     // PR1 should have 2 grants to it, on R1 and R2
-    
Assertions.assertThat(cacheEntry_PR1.getGrantRecordsAsGrantee()).hasSize(2);
-    
Assertions.assertThat(cacheEntry_PR1.getGrantRecordsAsGrantee().get(0).getPrivilegeCode())
+    assertThat(cacheEntry_PR1.getGrantRecordsAsGrantee()).hasSize(2);
+    
assertThat(cacheEntry_PR1.getGrantRecordsAsGrantee().get(0).getPrivilegeCode())
         .isEqualTo(PolarisPrivilege.CATALOG_ROLE_USAGE.getCode());
-    
Assertions.assertThat(cacheEntry_PR1.getGrantRecordsAsGrantee().get(1).getPrivilegeCode())
+    
assertThat(cacheEntry_PR1.getGrantRecordsAsGrantee().get(1).getPrivilegeCode())
         .isEqualTo(PolarisPrivilege.CATALOG_ROLE_USAGE.getCode());
   }
 
@@ -274,14 +279,14 @@ public class InMemoryEntityCacheTest {
     EntityCacheLookupResult lookup =
         cache.getOrLoadEntityByName(
             this.callCtx, new EntityCacheByNameKey(PolarisEntityType.CATALOG, 
"test"));
-    Assertions.assertThat(lookup).isNotNull();
-    Assertions.assertThat(lookup.isCacheHit()).isFalse();
+    assertThat(lookup).isNotNull();
+    assertThat(lookup.isCacheHit()).isFalse();
 
     // the catalog
-    Assertions.assertThat(lookup.getCacheEntry()).isNotNull();
+    assertThat(lookup.getCacheEntry()).isNotNull();
     PolarisBaseEntity catalog = lookup.getCacheEntry().getEntity();
-    Assertions.assertThat(catalog).isNotNull();
-    
Assertions.assertThat(catalog.getType()).isEqualTo(PolarisEntityType.CATALOG);
+    assertThat(catalog).isNotNull();
+    assertThat(catalog.getType()).isEqualTo(PolarisEntityType.CATALOG);
 
     // find table N5/N6/T6
     PolarisBaseEntity N5 =
@@ -294,23 +299,23 @@ public class InMemoryEntityCacheTest {
             PolarisEntityType.TABLE_LIKE,
             PolarisEntitySubType.ICEBERG_TABLE,
             "T6");
-    Assertions.assertThat(T6v1).isNotNull();
+    assertThat(T6v1).isNotNull();
 
     // that table is not in the cache
     ResolvedPolarisEntity cacheEntry = cache.getEntityById(T6v1.getId());
-    Assertions.assertThat(cacheEntry).isNull();
+    assertThat(cacheEntry).isNull();
 
     // now load that table in the cache
     cacheEntry =
         cache.getAndRefreshIfNeeded(
             this.callCtx, T6v1, T6v1.getEntityVersion(), 
T6v1.getGrantRecordsVersion());
-    Assertions.assertThat(cacheEntry).isNotNull();
-    Assertions.assertThat(cacheEntry.getEntity()).isNotNull();
-    Assertions.assertThat(cacheEntry.getGrantRecordsAsSecurable()).isNotNull();
+    assertThat(cacheEntry).isNotNull();
+    assertThat(cacheEntry.getEntity()).isNotNull();
+    assertThat(cacheEntry.getGrantRecordsAsSecurable()).isNotNull();
     PolarisBaseEntity table = cacheEntry.getEntity();
-    Assertions.assertThat(table.getId()).isEqualTo(T6v1.getId());
-    
Assertions.assertThat(table.getEntityVersion()).isEqualTo(T6v1.getEntityVersion());
-    
Assertions.assertThat(table.getGrantRecordsVersion()).isEqualTo(T6v1.getGrantRecordsVersion());
+    assertThat(table.getId()).isEqualTo(T6v1.getId());
+    assertThat(table.getEntityVersion()).isEqualTo(T6v1.getEntityVersion());
+    
assertThat(table.getGrantRecordsVersion()).isEqualTo(T6v1.getGrantRecordsVersion());
 
     // update the entity
     PolarisBaseEntity T6v2 =
@@ -319,31 +324,31 @@ public class InMemoryEntityCacheTest {
             T6v1,
             "{\"v2_properties\": \"some value\"}",
             "{\"v2_internal_properties\": \"internal value\"}");
-    Assertions.assertThat(T6v2).isNotNull();
+    assertThat(T6v2).isNotNull();
 
     // now refresh that entity. But because we don't change the versions, 
nothing should be reloaded
     cacheEntry =
         cache.getAndRefreshIfNeeded(
             this.callCtx, T6v1, T6v1.getEntityVersion(), 
T6v1.getGrantRecordsVersion());
-    Assertions.assertThat(cacheEntry).isNotNull();
-    Assertions.assertThat(cacheEntry.getEntity()).isNotNull();
-    Assertions.assertThat(cacheEntry.getGrantRecordsAsSecurable()).isNotNull();
+    assertThat(cacheEntry).isNotNull();
+    assertThat(cacheEntry.getEntity()).isNotNull();
+    assertThat(cacheEntry.getGrantRecordsAsSecurable()).isNotNull();
     table = cacheEntry.getEntity();
-    Assertions.assertThat(table.getId()).isEqualTo(T6v1.getId());
-    
Assertions.assertThat(table.getEntityVersion()).isEqualTo(T6v1.getEntityVersion());
-    
Assertions.assertThat(table.getGrantRecordsVersion()).isEqualTo(T6v1.getGrantRecordsVersion());
+    assertThat(table.getId()).isEqualTo(T6v1.getId());
+    assertThat(table.getEntityVersion()).isEqualTo(T6v1.getEntityVersion());
+    
assertThat(table.getGrantRecordsVersion()).isEqualTo(T6v1.getGrantRecordsVersion());
 
     // now refresh again, this time with the new versions. Should be reloaded
     cacheEntry =
         cache.getAndRefreshIfNeeded(
             this.callCtx, T6v2, T6v2.getEntityVersion(), 
T6v2.getGrantRecordsVersion());
-    Assertions.assertThat(cacheEntry).isNotNull();
-    Assertions.assertThat(cacheEntry.getEntity()).isNotNull();
-    Assertions.assertThat(cacheEntry.getGrantRecordsAsSecurable()).isNotNull();
+    assertThat(cacheEntry).isNotNull();
+    assertThat(cacheEntry.getEntity()).isNotNull();
+    assertThat(cacheEntry.getGrantRecordsAsSecurable()).isNotNull();
     table = cacheEntry.getEntity();
-    Assertions.assertThat(table.getId()).isEqualTo(T6v2.getId());
-    
Assertions.assertThat(table.getEntityVersion()).isEqualTo(T6v2.getEntityVersion());
-    
Assertions.assertThat(table.getGrantRecordsVersion()).isEqualTo(T6v2.getGrantRecordsVersion());
+    assertThat(table.getId()).isEqualTo(T6v2.getId());
+    assertThat(table.getEntityVersion()).isEqualTo(T6v2.getEntityVersion());
+    
assertThat(table.getGrantRecordsVersion()).isEqualTo(T6v2.getGrantRecordsVersion());
 
     // update it again
     PolarisBaseEntity T6v3 =
@@ -352,7 +357,7 @@ public class InMemoryEntityCacheTest {
             T6v2,
             "{\"v3_properties\": \"some value\"}",
             "{\"v3_internal_properties\": \"internal value\"}");
-    Assertions.assertThat(T6v3).isNotNull();
+    assertThat(T6v3).isNotNull();
 
     // the two catalog roles
     PolarisBaseEntity R1 =
@@ -368,9 +373,9 @@ public class InMemoryEntityCacheTest {
             this.callCtx, N2, N2.getEntityVersion(), 
N2.getGrantRecordsVersion());
 
     // should have one single grant
-    Assertions.assertThat(cacheEntry).isNotNull();
-    Assertions.assertThat(cacheEntry.getGrantRecordsAsSecurable()).isNotNull();
-    Assertions.assertThat(cacheEntry.getGrantRecordsAsSecurable()).hasSize(1);
+    assertThat(cacheEntry).isNotNull();
+    assertThat(cacheEntry.getGrantRecordsAsSecurable()).isNotNull();
+    assertThat(cacheEntry.getGrantRecordsAsSecurable()).hasSize(1);
 
     // perform an additional grant to R1
     this.tm.grantPrivilege(R1, List.of(catalog, N1), N2, 
PolarisPrivilege.NAMESPACE_FULL_METADATA);
@@ -380,8 +385,8 @@ public class InMemoryEntityCacheTest {
         this.tm.ensureExistsByName(List.of(catalog, N1), 
PolarisEntityType.NAMESPACE, "N2");
 
     // same entity version but different grant records
-    Assertions.assertThat(N2v2).isNotNull();
-    
Assertions.assertThat(N2v2.getGrantRecordsVersion()).isEqualTo(N2.getGrantRecordsVersion()
 + 1);
+    assertThat(N2v2).isNotNull();
+    
assertThat(N2v2.getGrantRecordsVersion()).isEqualTo(N2.getGrantRecordsVersion() 
+ 1);
 
     // the cache is outdated now
     lookup =
@@ -389,24 +394,24 @@ public class InMemoryEntityCacheTest {
             this.callCtx,
             new EntityCacheByNameKey(
                 catalog.getId(), N1.getId(), PolarisEntityType.NAMESPACE, 
"N2"));
-    Assertions.assertThat(lookup).isNotNull();
+    assertThat(lookup).isNotNull();
     cacheEntry = lookup.getCacheEntry();
-    Assertions.assertThat(cacheEntry).isNotNull();
-    Assertions.assertThat(cacheEntry.getEntity()).isNotNull();
-    Assertions.assertThat(cacheEntry.getGrantRecordsAsSecurable()).isNotNull();
-    Assertions.assertThat(cacheEntry.getGrantRecordsAsSecurable()).hasSize(1);
-    Assertions.assertThat(cacheEntry.getEntity().getGrantRecordsVersion())
+    assertThat(cacheEntry).isNotNull();
+    assertThat(cacheEntry.getEntity()).isNotNull();
+    assertThat(cacheEntry.getGrantRecordsAsSecurable()).isNotNull();
+    assertThat(cacheEntry.getGrantRecordsAsSecurable()).hasSize(1);
+    assertThat(cacheEntry.getEntity().getGrantRecordsVersion())
         .isEqualTo(N2.getGrantRecordsVersion());
 
     // now refresh
     cacheEntry =
         cache.getAndRefreshIfNeeded(
             this.callCtx, N2, N2v2.getEntityVersion(), 
N2v2.getGrantRecordsVersion());
-    Assertions.assertThat(cacheEntry).isNotNull();
-    Assertions.assertThat(cacheEntry.getEntity()).isNotNull();
-    Assertions.assertThat(cacheEntry.getGrantRecordsAsSecurable()).isNotNull();
-    Assertions.assertThat(cacheEntry.getGrantRecordsAsSecurable()).hasSize(2);
-    Assertions.assertThat(cacheEntry.getEntity().getGrantRecordsVersion())
+    assertThat(cacheEntry).isNotNull();
+    assertThat(cacheEntry.getEntity()).isNotNull();
+    assertThat(cacheEntry.getGrantRecordsAsSecurable()).isNotNull();
+    assertThat(cacheEntry.getGrantRecordsAsSecurable()).hasSize(2);
+    assertThat(cacheEntry.getEntity().getGrantRecordsVersion())
         .isEqualTo(N2v2.getGrantRecordsVersion());
   }
 
@@ -418,23 +423,23 @@ public class InMemoryEntityCacheTest {
     EntityCacheLookupResult lookup =
         cache.getOrLoadEntityByName(
             this.callCtx, new EntityCacheByNameKey(PolarisEntityType.CATALOG, 
"test"));
-    Assertions.assertThat(lookup).isNotNull();
-    Assertions.assertThat(lookup.getCacheEntry()).isNotNull();
+    assertThat(lookup).isNotNull();
+    assertThat(lookup.getCacheEntry()).isNotNull();
     PolarisBaseEntity catalog = lookup.getCacheEntry().getEntity();
 
     PolarisBaseEntity N1 =
         this.tm.ensureExistsByName(List.of(catalog), 
PolarisEntityType.NAMESPACE, "N1");
     lookup = cache.getOrLoadEntityById(this.callCtx, N1.getCatalogId(), 
N1.getId(), N1.getType());
-    Assertions.assertThat(lookup).isNotNull();
+    assertThat(lookup).isNotNull();
 
     EntityCacheByNameKey T4_name =
         new EntityCacheByNameKey(N1.getCatalogId(), N1.getId(), 
PolarisEntityType.TABLE_LIKE, "T4");
     lookup = cache.getOrLoadEntityByName(callCtx, T4_name);
-    Assertions.assertThat(lookup).isNotNull();
+    assertThat(lookup).isNotNull();
     ResolvedPolarisEntity cacheEntry_T4 = lookup.getCacheEntry();
-    Assertions.assertThat(cacheEntry_T4).isNotNull();
-    Assertions.assertThat(cacheEntry_T4.getEntity()).isNotNull();
-    
Assertions.assertThat(cacheEntry_T4.getGrantRecordsAsSecurable()).isNotNull();
+    assertThat(cacheEntry_T4).isNotNull();
+    assertThat(cacheEntry_T4.getEntity()).isNotNull();
+    assertThat(cacheEntry_T4.getGrantRecordsAsSecurable()).isNotNull();
 
     PolarisBaseEntity T4 = cacheEntry_T4.getEntity();
 
@@ -445,18 +450,17 @@ public class InMemoryEntityCacheTest {
         new EntityCacheByNameKey(
             N1.getCatalogId(), N1.getId(), PolarisEntityType.TABLE_LIKE, 
"T4_renamed");
     lookup = cache.getOrLoadEntityByName(callCtx, T4_renamed);
-    Assertions.assertThat(lookup).isNotNull();
+    assertThat(lookup).isNotNull();
     ResolvedPolarisEntity cacheEntry_T4_renamed = lookup.getCacheEntry();
-    Assertions.assertThat(cacheEntry_T4_renamed).isNotNull();
+    assertThat(cacheEntry_T4_renamed).isNotNull();
     PolarisBaseEntity T4_renamed_entity = cacheEntry_T4_renamed.getEntity();
 
     // new entry if lookup by id
     EntityCacheLookupResult lookupResult =
         cache.getOrLoadEntityById(callCtx, T4.getCatalogId(), T4.getId(), 
T4.getType());
-    Assertions.assertThat(lookupResult).isNotNull();
-    Assertions.assertThat(lookupResult.getCacheEntry()).isNotNull();
-    Assertions.assertThat(lookupResult.getCacheEntry().getEntity().getName())
-        .isEqualTo("T4_renamed");
+    assertThat(lookupResult).isNotNull();
+    assertThat(lookupResult.getCacheEntry()).isNotNull();
+    
assertThat(lookupResult.getCacheEntry().getEntity().getName()).isEqualTo("T4_renamed");
 
     // old name is gone, replaced by new name
     // Assertions.assertNull(cache.getOrLoadEntityByName(callCtx, T4_name));
@@ -469,7 +473,7 @@ public class InMemoryEntityCacheTest {
         T4_renamed_entity.getGrantRecordsVersion());
 
     // now the loading by the old name should return null
-    Assertions.assertThat(cache.getOrLoadEntityByName(callCtx, 
T4_name)).isNull();
+    assertThat(cache.getOrLoadEntityByName(callCtx, T4_name)).isNull();
   }
 
   /* Helper for `testEntityWeigher` */
@@ -490,7 +494,582 @@ public class InMemoryEntityCacheTest {
             .setMetadataLocation("a".repeat(1000 * 1000))
             .build();
 
-    
Assertions.assertThat(getEntityWeight(smallEntity)).isLessThan(getEntityWeight(mediumEntity));
-    
Assertions.assertThat(getEntityWeight(mediumEntity)).isLessThan(getEntityWeight(largeEntity));
+    
assertThat(getEntityWeight(smallEntity)).isLessThan(getEntityWeight(mediumEntity));
+    
assertThat(getEntityWeight(mediumEntity)).isLessThan(getEntityWeight(largeEntity));
+  }
+
+  @Test
+  public void testBatchLoadByEntityIds() {
+    // get a new cache
+    InMemoryEntityCache cache = this.allocateNewCache();
+
+    // Load catalog into cache
+    EntityCacheLookupResult lookup =
+        cache.getOrLoadEntityByName(
+            this.callCtx, new EntityCacheByNameKey(PolarisEntityType.CATALOG, 
"test"));
+    assertThat(lookup).isNotNull();
+    PolarisBaseEntity catalog = lookup.getCacheEntry().getEntity();
+
+    // Get some entities that exist in the test setup
+    PolarisBaseEntity N1 =
+        this.tm.ensureExistsByName(List.of(catalog), 
PolarisEntityType.NAMESPACE, "N1");
+    PolarisBaseEntity N5 =
+        this.tm.ensureExistsByName(List.of(catalog), 
PolarisEntityType.NAMESPACE, "N5");
+    PolarisBaseEntity N5_N6 =
+        this.tm.ensureExistsByName(List.of(catalog, N5), 
PolarisEntityType.NAMESPACE, "N6");
+
+    // Pre-load N1 into cache
+    cache.getOrLoadEntityById(this.callCtx, N1.getCatalogId(), N1.getId(), 
N1.getType());
+
+    // Create list of entity IDs - N1 is already cached, N5, N5_N6 are not
+    List<PolarisEntityId> entityIds =
+        List.of(getPolarisEntityId(N1), getPolarisEntityId(N5), 
getPolarisEntityId(N5_N6));
+
+    // Test batch loading by entity IDs (all are namespaces)
+    List<EntityCacheLookupResult> results =
+        cache.getOrLoadResolvedEntities(this.callCtx, 
PolarisEntityType.NAMESPACE, entityIds);
+
+    // Verify all entities were found
+    assertThat(results).hasSize(3);
+    assertThat(results.get(0)).isNotNull(); // N1 - was cached
+    assertThat(results.get(1)).isNotNull(); // N5 - was loaded
+    assertThat(results.get(2)).isNotNull(); // N5_N6 - was loaded
+
+    // Verify the entities are correct
+    
assertThat(results.get(0).getCacheEntry().getEntity().getId()).isEqualTo(N1.getId());
+    
assertThat(results.get(1).getCacheEntry().getEntity().getId()).isEqualTo(N5.getId());
+    
assertThat(results.get(2).getCacheEntry().getEntity().getId()).isEqualTo(N5_N6.getId());
+
+    // All should be cache hits now since they were loaded in the previous call
+    assertThat(results.get(0).isCacheHit()).isTrue();
+    assertThat(results.get(1).isCacheHit()).isTrue();
+    assertThat(results.get(2).isCacheHit()).isTrue();
+
+    // Test with a non-existent entity ID
+    List<PolarisEntityId> nonExistentIds =
+        List.of(new PolarisEntityId(catalog.getCatalogId(), 99999L));
+    List<EntityCacheLookupResult> nonExistentResults =
+        cache.getOrLoadResolvedEntities(this.callCtx, 
PolarisEntityType.NAMESPACE, nonExistentIds);
+
+    assertThat(nonExistentResults).hasSize(1);
+    assertThat(nonExistentResults.get(0)).isNull();
+
+    // Test with table entities separately
+    PolarisBaseEntity T6 =
+        this.tm.ensureExistsByName(
+            List.of(catalog, N5, N5_N6),
+            PolarisEntityType.TABLE_LIKE,
+            PolarisEntitySubType.ICEBERG_TABLE,
+            "T6");
+
+    List<PolarisEntityId> tableIds = List.of(getPolarisEntityId(T6));
+
+    List<EntityCacheLookupResult> tableResults =
+        cache.getOrLoadResolvedEntities(this.callCtx, 
PolarisEntityType.TABLE_LIKE, tableIds);
+
+    assertThat(tableResults).hasSize(1);
+    assertThat(tableResults.get(0)).isNotNull();
+    
assertThat(tableResults.get(0).getCacheEntry().getEntity().getId()).isEqualTo(T6.getId());
+  }
+
+  @Test
+  public void testBatchLoadByNameLookupRecords() {
+    // get a new cache
+    InMemoryEntityCache cache = this.allocateNewCache();
+
+    // Load catalog into cache
+    EntityCacheLookupResult lookup =
+        cache.getOrLoadEntityByName(
+            this.callCtx, new EntityCacheByNameKey(PolarisEntityType.CATALOG, 
"test"));
+    assertThat(lookup).isNotNull();
+    PolarisBaseEntity catalog = lookup.getCacheEntry().getEntity();
+
+    // Get some entities that exist in the test setup
+    PolarisBaseEntity N1 =
+        this.tm.ensureExistsByName(List.of(catalog), 
PolarisEntityType.NAMESPACE, "N1");
+    PolarisBaseEntity N2 =
+        this.tm.ensureExistsByName(List.of(catalog, N1), 
PolarisEntityType.NAMESPACE, "N2");
+    PolarisBaseEntity R1 =
+        this.tm.ensureExistsByName(List.of(catalog), 
PolarisEntityType.CATALOG_ROLE, "R1");
+
+    // Pre-load N1 into cache by name
+    cache.getOrLoadEntityByName(
+        this.callCtx,
+        new EntityCacheByNameKey(
+            catalog.getId(), catalog.getId(), PolarisEntityType.NAMESPACE, 
"N1"));
+
+    // Create list of EntityNameLookupRecords
+    List<EntityNameLookupRecord> lookupRecords =
+        List.of(
+            new EntityNameLookupRecord(N1), // already cached
+            new EntityNameLookupRecord(N2), // not cached
+            new EntityNameLookupRecord(R1) // not cached
+            );
+
+    // Test batch loading by name lookup records
+    List<EntityCacheLookupResult> results =
+        cache.getOrLoadResolvedEntities(this.callCtx, lookupRecords);
+
+    // Verify all entities were found
+    assertThat(results).hasSize(3);
+    assertThat(results.get(0)).isNotNull(); // N1 - was cached
+    assertThat(results.get(1)).isNotNull(); // N2 - was loaded
+    assertThat(results.get(2)).isNotNull(); // R1 - was loaded
+
+    // Verify the entities are correct
+    
assertThat(results.get(0).getCacheEntry().getEntity().getId()).isEqualTo(N1.getId());
+    
assertThat(results.get(1).getCacheEntry().getEntity().getId()).isEqualTo(N2.getId());
+    
assertThat(results.get(2).getCacheEntry().getEntity().getId()).isEqualTo(R1.getId());
+
+    // All should be cache hits now
+    assertThat(results.get(0).isCacheHit()).isTrue();
+    assertThat(results.get(1).isCacheHit()).isTrue();
+    assertThat(results.get(2).isCacheHit()).isTrue();
+  }
+
+  @Test
+  public void testBatchLoadWithStaleVersions() {
+    // get a new cache
+    InMemoryEntityCache cache = this.allocateNewCache();
+
+    // Load catalog into cache
+    EntityCacheLookupResult lookup =
+        cache.getOrLoadEntityByName(
+            this.callCtx, new EntityCacheByNameKey(PolarisEntityType.CATALOG, 
"test"));
+    assertThat(lookup).isNotNull();
+    PolarisBaseEntity catalog = lookup.getCacheEntry().getEntity();
+
+    // Get table T6 that we can update
+    PolarisBaseEntity N5 =
+        this.tm.ensureExistsByName(List.of(catalog), 
PolarisEntityType.NAMESPACE, "N5");
+    PolarisBaseEntity N5_N6 =
+        this.tm.ensureExistsByName(List.of(catalog, N5), 
PolarisEntityType.NAMESPACE, "N6");
+    PolarisBaseEntity T6v1 =
+        this.tm.ensureExistsByName(
+            List.of(catalog, N5, N5_N6),
+            PolarisEntityType.TABLE_LIKE,
+            PolarisEntitySubType.ICEBERG_TABLE,
+            "T6");
+
+    // Load T6 into cache initially
+    cache.getOrLoadEntityById(this.callCtx, T6v1.getCatalogId(), T6v1.getId(), 
T6v1.getType());
+
+    // Verify it's in cache with original version
+    ResolvedPolarisEntity cachedT6 = cache.getEntityById(T6v1.getId());
+    assertThat(cachedT6).isNotNull();
+    
assertThat(cachedT6.getEntity().getEntityVersion()).isEqualTo(T6v1.getEntityVersion());
+
+    // Update the entity to create a new version
+    PolarisBaseEntity T6v2 =
+        this.tm.updateEntity(
+            List.of(catalog, N5, N5_N6),
+            T6v1,
+            "{\"v2_properties\": \"some value\"}",
+            "{\"v2_internal_properties\": \"internal value\"}");
+    assertThat(T6v2).isNotNull();
+    assertThat(T6v2.getEntityVersion()).isGreaterThan(T6v1.getEntityVersion());
+
+    // Create entity ID list with the updated entity
+    List<PolarisEntityId> entityIds = List.of(getPolarisEntityId(T6v2));
+
+    // Call batch load - this should detect the stale version and reload
+    List<EntityCacheLookupResult> results =
+        cache.getOrLoadResolvedEntities(this.callCtx, 
PolarisEntityType.TABLE_LIKE, entityIds);
+
+    // Verify the entity was reloaded with the new version
+    assertThat(results).hasSize(1);
+    assertThat(results.get(0)).isNotNull();
+
+    ResolvedPolarisEntity reloadedT6 = results.get(0).getCacheEntry();
+    assertThat(reloadedT6.getEntity().getId()).isEqualTo(T6v2.getId());
+    
assertThat(reloadedT6.getEntity().getEntityVersion()).isEqualTo(T6v2.getEntityVersion());
+
+    // Verify the cache now contains the updated version
+    cachedT6 = cache.getEntityById(T6v2.getId());
+    assertThat(cachedT6).isNotNull();
+    
assertThat(cachedT6.getEntity().getEntityVersion()).isEqualTo(T6v2.getEntityVersion());
+  }
+
+  @Test
+  public void testBatchLoadWithStaleGrantVersions() {
+    // get a new cache
+    InMemoryEntityCache cache = this.allocateNewCache();
+
+    // Load catalog into cache
+    EntityCacheLookupResult lookup =
+        cache.getOrLoadEntityByName(
+            this.callCtx, new EntityCacheByNameKey(PolarisEntityType.CATALOG, 
"test"));
+    assertThat(lookup).isNotNull();
+    PolarisBaseEntity catalog = lookup.getCacheEntry().getEntity();
+
+    // Get entities we'll work with
+    PolarisBaseEntity N1 =
+        this.tm.ensureExistsByName(List.of(catalog), 
PolarisEntityType.NAMESPACE, "N1");
+    PolarisBaseEntity N2 =
+        this.tm.ensureExistsByName(List.of(catalog, N1), 
PolarisEntityType.NAMESPACE, "N2");
+    PolarisBaseEntity R1 =
+        this.tm.ensureExistsByName(List.of(catalog), 
PolarisEntityType.CATALOG_ROLE, "R1");
+
+    // Load N2 into cache initially
+    cache.getOrLoadEntityByName(
+        this.callCtx,
+        new EntityCacheByNameKey(catalog.getId(), N1.getId(), 
PolarisEntityType.NAMESPACE, "N2"));
+
+    // Verify it's in cache with original grant version
+    ResolvedPolarisEntity cachedN2 = cache.getEntityById(N2.getId());
+    assertThat(cachedN2).isNotNull();
+    int originalGrantVersion = cachedN2.getEntity().getGrantRecordsVersion();
+
+    // Grant additional privilege to change grant version
+    this.tm.grantPrivilege(R1, List.of(catalog, N1), N2, 
PolarisPrivilege.NAMESPACE_FULL_METADATA);
+
+    // Get the updated entity with new grant version
+    PolarisBaseEntity N2Updated =
+        this.tm.ensureExistsByName(List.of(catalog, N1), 
PolarisEntityType.NAMESPACE, "N2");
+    
assertThat(N2Updated.getGrantRecordsVersion()).isGreaterThan(originalGrantVersion);
+
+    // Create entity ID list
+    List<PolarisEntityId> entityIds = List.of(getPolarisEntityId(N2Updated));
+
+    // Call batch load - this should detect the stale grant version and reload
+    List<EntityCacheLookupResult> results =
+        cache.getOrLoadResolvedEntities(this.callCtx, 
PolarisEntityType.NAMESPACE, entityIds);
+
+    // Verify the entity was reloaded with the new grant version
+    assertThat(results).hasSize(1);
+    assertThat(results.get(0)).isNotNull();
+
+    ResolvedPolarisEntity reloadedN2 = results.get(0).getCacheEntry();
+    assertThat(reloadedN2.getEntity().getId()).isEqualTo(N2Updated.getId());
+    assertThat(reloadedN2.getEntity().getGrantRecordsVersion())
+        .isEqualTo(N2Updated.getGrantRecordsVersion());
+
+    // Should now have more grant records
+    
assertThat(reloadedN2.getGrantRecordsAsSecurable().size()).isGreaterThan(1);
+
+    // Verify the cache now contains the updated grant version
+    cachedN2 = cache.getEntityById(N2Updated.getId());
+    assertThat(cachedN2).isNotNull();
+    assertThat(cachedN2.getEntity().getGrantRecordsVersion())
+        .isEqualTo(N2Updated.getGrantRecordsVersion());
+  }
+
+  @Test
+  public void testBatchLoadVersionRetryLogic() {
+    // get a new cache
+    PolarisMetaStoreManager metaStoreManager = 
Mockito.spy(this.metaStoreManager);
+    InMemoryEntityCache cache =
+        new InMemoryEntityCache(diagServices, callCtx.getRealmConfig(), 
metaStoreManager);
+
+    // Load catalog into cache
+    EntityCacheLookupResult lookup =
+        cache.getOrLoadEntityByName(
+            this.callCtx, new EntityCacheByNameKey(PolarisEntityType.CATALOG, 
"test"));
+    assertThat(lookup).isNotNull();
+    PolarisBaseEntity catalog = lookup.getCacheEntry().getEntity();
+
+    // Get entities that we can work with
+    PolarisBaseEntity N1 =
+        this.tm.ensureExistsByName(List.of(catalog), 
PolarisEntityType.NAMESPACE, "N1");
+    PolarisBaseEntity N2 =
+        this.tm.ensureExistsByName(List.of(catalog, N1), 
PolarisEntityType.NAMESPACE, "N2");
+
+    // Load N2 into cache initially
+    cache.getOrLoadEntityByName(
+        this.callCtx,
+        new EntityCacheByNameKey(catalog.getId(), N1.getId(), 
PolarisEntityType.NAMESPACE, "N2"));
+
+    // Verify it's in cache with original version
+    ResolvedPolarisEntity cachedN2 = cache.getEntityById(N2.getId());
+    assertThat(cachedN2).isNotNull();
+    int originalEntityVersion = cachedN2.getEntity().getEntityVersion();
+
+    // Update the entity multiple times to create version skew
+    PolarisBaseEntity N2v2 =
+        this.tm.updateEntity(List.of(catalog, N1), N2, "{\"v2\": \"value\"}", 
null);
+
+    // the first call should return the first version, then we call the real 
method to get the
+    // latest
+    Mockito.doReturn(
+            new ChangeTrackingResult(
+                List.of(
+                    changeTrackingFor(catalog), changeTrackingFor(N1), 
changeTrackingFor(N2v2))))
+        .when(metaStoreManager)
+        .loadEntitiesChangeTracking(Mockito.any(), Mockito.any());
+    Mockito.doCallRealMethod()
+        .when(metaStoreManager)
+        .loadEntitiesChangeTracking(Mockito.any(), Mockito.any());
+
+    // update again to create v3, which isn't returned in the change tracking 
result
+    PolarisBaseEntity N2v3 =
+        this.tm.updateEntity(List.of(catalog, N1), N2v2, "{\"v3\": 
\"value\"}", null);
+
+    // Verify versions increased
+    assertThat(N2v2.getEntityVersion()).isGreaterThan(originalEntityVersion);
+    assertThat(N2v3.getEntityVersion()).isGreaterThan(N2v2.getEntityVersion());
+
+    // Create entity ID list
+    List<PolarisEntityId> entityIds =
+        List.of(getPolarisEntityId(catalog), getPolarisEntityId(N1), 
getPolarisEntityId(N2));
+
+    // Call batch load - this should detect the stale versions and reload 
until consistent
+    List<EntityCacheLookupResult> results =
+        cache.getOrLoadResolvedEntities(this.callCtx, 
PolarisEntityType.NAMESPACE, entityIds);
+
+    // Verify the entity was reloaded with the latest version
+    assertThat(results).hasSize(3);
+    assertThat(results)
+        .doesNotContainNull()
+        .extracting(EntityCacheLookupResult::getCacheEntry)
+        .doesNotContainNull()
+        .extracting(e -> e.getEntity().getId())
+        .containsExactly(catalog.getId(), N1.getId(), N2.getId());
+
+    ResolvedPolarisEntity reloadedN2 = results.get(2).getCacheEntry();
+    assertThat(reloadedN2.getEntity().getId()).isEqualTo(N2v3.getId());
+    
assertThat(reloadedN2.getEntity().getEntityVersion()).isEqualTo(N2v3.getEntityVersion());
+
+    // Verify the cache now contains the latest version
+    cachedN2 = cache.getEntityById(N2v3.getId());
+    assertThat(cachedN2).isNotNull();
+    
assertThat(cachedN2.getEntity().getEntityVersion()).isEqualTo(N2v3.getEntityVersion());
+  }
+
+  private static PolarisEntityId getPolarisEntityId(PolarisBaseEntity catalog) 
{
+    return new PolarisEntityId(catalog.getCatalogId(), catalog.getId());
+  }
+
+  @Test
+  public void testConcurrentClientLoadingBehavior() throws Exception {
+    // Load catalog into cache
+    EntityCacheLookupResult lookup =
+        allocateNewCache()
+            .getOrLoadEntityByName(
+                this.callCtx, new 
EntityCacheByNameKey(PolarisEntityType.CATALOG, "test"));
+    assertThat(lookup).isNotNull();
+    PolarisBaseEntity catalog = lookup.getCacheEntry().getEntity();
+
+    // Get multiple entities to create a larger list for concurrent processing
+    PolarisBaseEntity N1 =
+        this.tm.ensureExistsByName(List.of(catalog), 
PolarisEntityType.NAMESPACE, "N1");
+    PolarisBaseEntity N2 =
+        this.tm.ensureExistsByName(List.of(catalog, N1), 
PolarisEntityType.NAMESPACE, "N2");
+    PolarisBaseEntity N3 =
+        this.tm.ensureExistsByName(List.of(catalog, N1), 
PolarisEntityType.NAMESPACE, "N3");
+    PolarisBaseEntity N5 =
+        this.tm.ensureExistsByName(List.of(catalog), 
PolarisEntityType.NAMESPACE, "N5");
+    PolarisBaseEntity N5_N6 =
+        this.tm.ensureExistsByName(List.of(catalog, N5), 
PolarisEntityType.NAMESPACE, "N6");
+
+    // Create entity IDs list for both clients
+    List<PolarisEntityId> entityIds =
+        List.of(
+            getPolarisEntityId(N1),
+            getPolarisEntityId(N2),
+            getPolarisEntityId(N3),
+            getPolarisEntityId(N5),
+            getPolarisEntityId(N5_N6));
+
+    // Update one of the entities to create version differences
+    PolarisBaseEntity N2v2 =
+        this.tm.updateEntity(
+            List.of(catalog, N1), N2, "{\"concurrent_test\": 
\"client1_version\"}", null);
+    PolarisBaseEntity N2v3 =
+        this.tm.updateEntity(
+            List.of(catalog, N1), N2v2, "{\"concurrent_test\": 
\"client2_version\"}", null);
+
+    // Mock the metastore manager to control the timing of method calls
+    PolarisMetaStoreManager mockedMetaStoreManager = 
Mockito.spy(this.metaStoreManager);
+
+    // Create caches with the mocked metastore manager
+    InMemoryEntityCache cache =
+        new InMemoryEntityCache(diagServices, callCtx.getRealmConfig(), 
mockedMetaStoreManager);
+
+    // Synchronization primitives for controlling execution order
+    CountDownLatch client1ChangeTrackingResult = new CountDownLatch(1);
+    Semaphore client1ResolvedEntitiesBlocker = new Semaphore(0);
+
+    // Atomic references to capture results from both threads
+    AtomicReference<Exception> client1Exception = new AtomicReference<>();
+    AtomicReference<Exception> client2Exception = new AtomicReference<>();
+
+    // Configure mock behavior:
+    // 1. Allow both threads to call loadEntitiesChangeTracking() with 
different versions
+    // 2. Block client1's loadResolvedEntities() until client2's 
loadResolvedEntities() completes
+
+    // Mock loadEntitiesChangeTracking to return different versions for each 
client
+    Mockito.doAnswer(
+            invocation -> {
+              // First call (client1) - returns older version for N2
+              LOGGER.debug("Returning change tracking for client1");
+              return new ChangeTrackingResult(
+                  List.of(
+                      changeTrackingFor(N1),
+                      changeTrackingFor(N2v2), // older version
+                      changeTrackingFor(N3),
+                      changeTrackingFor(N5),
+                      changeTrackingFor(N5_N6)));
+            })
+        .doAnswer(
+            invocation -> {
+              // Second call (client2) - returns newer version for N2
+              LOGGER.debug("Returning change tracking for client2");
+              return new ChangeTrackingResult(
+                  List.of(
+                      changeTrackingFor(N1),
+                      changeTrackingFor(N2v3), // newer version
+                      changeTrackingFor(N3),
+                      changeTrackingFor(N5),
+                      changeTrackingFor(N5_N6)));
+            })
+        .when(mockedMetaStoreManager)
+        .loadEntitiesChangeTracking(Mockito.any(), Mockito.any());
+
+    // Mock loadResolvedEntities to control timing - client1 blocks until 
client2 completes
+    // client1 receives the older version of all entities, while client2 
receives the newer version
+    // of N2
+    Mockito.doAnswer(
+            invocation -> {
+              // This is client1's loadResolvedEntities call - block until 
client2 completes
+              try {
+                client1ChangeTrackingResult.countDown();
+                LOGGER.debug("Awaiting client2 to complete resolved entities 
load");
+                client1ResolvedEntitiesBlocker.acquire(); // Block until 
client2 signals completion
+                List<ResolvedPolarisEntity> resolvedEntities =
+                    List.of(
+                        getResolvedPolarisEntity(N1),
+                        getResolvedPolarisEntity(N2v2),
+                        getResolvedPolarisEntity(N3),
+                        getResolvedPolarisEntity(N5),
+                        getResolvedPolarisEntity(N5_N6));
+                LOGGER.debug("Client1 returning results {}", resolvedEntities);
+                return new ResolvedEntitiesResult(resolvedEntities);
+              } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                throw new RuntimeException(e);
+              }
+            })
+        .doAnswer(
+            invocation -> {
+              // This is client2's loadResolvedEntities call - execute 
normally and signal client1
+              try {
+                LOGGER.debug("Client2 loading resolved entities");
+                var result =
+                    new ResolvedEntitiesResult(
+                        List.of(
+                            getResolvedPolarisEntity(N1),
+                            getResolvedPolarisEntity(N2v3),
+                            getResolvedPolarisEntity(N3),
+                            getResolvedPolarisEntity(N5),
+                            getResolvedPolarisEntity(N5_N6)));
+                client1ResolvedEntitiesBlocker.release(); // Allow client1 to 
proceed
+                LOGGER.debug("Client2 returning results {}", 
result.getResolvedEntities());
+                return result;
+              } catch (Exception e) {
+                client1ResolvedEntitiesBlocker.release(); // Release in case 
of error
+                throw e;
+              }
+            })
+        .when(mockedMetaStoreManager)
+        .loadResolvedEntities(Mockito.any(), Mockito.any(), Mockito.any());
+
+    // ExecutorService isn't AutoCloseable in JDK 11 :(
+    ExecutorService executorService = Executors.newFixedThreadPool(2);
+    try {
+      // Client 1 task - should get older version and be blocked during 
loadResolvedEntities
+      Future<List<EntityCacheLookupResult>> client1Task =
+          executorService.submit(
+              () -> {
+                try {
+                  // Client1 calls getOrLoadResolvedEntities
+                  // - loadEntitiesChangeTracking returns older version for N2
+                  // - loadResolvedEntities will block until client2 completes
+                  List<EntityCacheLookupResult> results =
+                      cache.getOrLoadResolvedEntities(
+                          this.callCtx, PolarisEntityType.NAMESPACE, 
entityIds);
+
+                  return results;
+                } catch (Exception e) {
+                  client1Exception.set(e);
+                  return null;
+                }
+              });
+
+      // Client 2 task - should get newer version and complete first
+      Future<List<EntityCacheLookupResult>> client2Task =
+          executorService.submit(
+              () -> {
+                try {
+                  // Client2 calls getOrLoadResolvedEntities
+                  // - loadEntitiesChangeTracking returns newer version for N2
+                  // - loadResolvedEntities executes normally and signals 
client1 when done
+                  client1ChangeTrackingResult.await();
+                  List<EntityCacheLookupResult> results =
+                      cache.getOrLoadResolvedEntities(
+                          this.callCtx, PolarisEntityType.NAMESPACE, 
entityIds);
+
+                  return results;
+                } catch (Exception e) {
+                  client2Exception.set(e);
+                  client1ResolvedEntitiesBlocker.release(); // Release in case 
of error
+                  return null;
+                }
+              });
+
+      // Wait for both tasks to complete
+      List<EntityCacheLookupResult> client1Results = client1Task.get();
+      List<EntityCacheLookupResult> client2Results = client2Task.get();
+
+      // Verify no exceptions occurred
+      assertThat(client1Exception.get()).isNull();
+      assertThat(client2Exception.get()).isNull();
+
+      // Verify both clients got results
+      assertThat(client1Results).isNotNull();
+      assertThat(client2Results).isNotNull();
+      assertThat(client1Results).hasSize(5);
+      assertThat(client2Results).hasSize(5);
+
+      // All entities should be found
+      assertThat(client1Results).doesNotContainNull();
+      assertThat(client2Results).doesNotContainNull();
+
+      // Verify that client1 got the older version of N2 (index 1 in the list)
+      ResolvedPolarisEntity client1N2 = client1Results.get(1).getCacheEntry();
+      assertThat(client1N2.getEntity().getId()).isEqualTo(N2.getId());
+      
assertThat(client1N2.getEntity().getEntityVersion()).isEqualTo(N2v2.getEntityVersion());
+
+      // Verify that client2 got the newer version of N2
+      ResolvedPolarisEntity client2N2 = client2Results.get(1).getCacheEntry();
+      assertThat(client2N2.getEntity().getId()).isEqualTo(N2.getId());
+      
assertThat(client2N2.getEntity().getEntityVersion()).isEqualTo(N2v3.getEntityVersion());
+
+      // Verify that both clients got consistent versions for other entities
+      for (int i = 0; i < 5; i++) {
+        if (i != 1) { // Skip N2 which we expect to be different
+          ResolvedPolarisEntity client1Entity = 
client1Results.get(i).getCacheEntry();
+          ResolvedPolarisEntity client2Entity = 
client2Results.get(i).getCacheEntry();
+
+          assertThat(client1Entity.getEntity().getId())
+              .isEqualTo(client2Entity.getEntity().getId());
+          assertThat(client1Entity.getEntity().getEntityVersion())
+              .isEqualTo(client2Entity.getEntity().getEntityVersion());
+          assertThat(client1Entity.getEntity().getGrantRecordsVersion())
+              .isEqualTo(client2Entity.getEntity().getGrantRecordsVersion());
+        }
+      }
+      assertThat(entityIds).extracting(id -> 
cache.getEntityById(id.getId())).doesNotContainNull();
+    } finally {
+      executorService.shutdown();
+    }
+  }
+
+  private static ResolvedPolarisEntity 
getResolvedPolarisEntity(PolarisBaseEntity catalog) {
+    return new ResolvedPolarisEntity(PolarisEntity.of(catalog), List.of(), 
List.of());
+  }
+
+  private static PolarisChangeTrackingVersions 
changeTrackingFor(PolarisBaseEntity entity) {
+    return new PolarisChangeTrackingVersions(
+        entity.getEntityVersion(), entity.getGrantRecordsVersion());
   }
 }
diff --git 
a/polaris-core/src/testFixtures/java/org/apache/polaris/core/persistence/BasePolarisMetaStoreManagerTest.java
 
b/polaris-core/src/testFixtures/java/org/apache/polaris/core/persistence/BasePolarisMetaStoreManagerTest.java
index 0eb7414b6..222e36a47 100644
--- 
a/polaris-core/src/testFixtures/java/org/apache/polaris/core/persistence/BasePolarisMetaStoreManagerTest.java
+++ 
b/polaris-core/src/testFixtures/java/org/apache/polaris/core/persistence/BasePolarisMetaStoreManagerTest.java
@@ -118,7 +118,7 @@ public abstract class BasePolarisMetaStoreManagerTest {
         .containsExactly(PolarisEntity.toCore(task1), 
PolarisEntity.toCore(task2));
 
     List<PolarisBaseEntity> listedEntities =
-        metaStoreManager.loadFullEntitiesAll(
+        metaStoreManager.listFullEntitiesAll(
             polarisTestMetaStoreManager.polarisCallContext,
             null,
             PolarisEntityType.TASK,
diff --git 
a/runtime/service/src/main/java/org/apache/polaris/service/admin/PolarisAdminService.java
 
b/runtime/service/src/main/java/org/apache/polaris/service/admin/PolarisAdminService.java
index 656323647..70b7f3624 100644
--- 
a/runtime/service/src/main/java/org/apache/polaris/service/admin/PolarisAdminService.java
+++ 
b/runtime/service/src/main/java/org/apache/polaris/service/admin/PolarisAdminService.java
@@ -953,7 +953,7 @@ public class PolarisAdminService {
   /** List all catalogs without checking for permission. */
   private Stream<CatalogEntity> listCatalogsUnsafe() {
     return metaStoreManager
-        .loadFullEntitiesAll(
+        .listFullEntitiesAll(
             getCurrentPolarisContext(),
             null,
             PolarisEntityType.CATALOG,
@@ -1195,7 +1195,7 @@ public class PolarisAdminService {
     authorizeBasicRootOperationOrThrow(op);
 
     return metaStoreManager
-        .loadFullEntitiesAll(
+        .listFullEntitiesAll(
             getCurrentPolarisContext(),
             null,
             PolarisEntityType.PRINCIPAL,
@@ -1303,7 +1303,7 @@ public class PolarisAdminService {
     authorizeBasicRootOperationOrThrow(op);
 
     return metaStoreManager
-        .loadFullEntitiesAll(
+        .listFullEntitiesAll(
             getCurrentPolarisContext(),
             null,
             PolarisEntityType.PRINCIPAL_ROLE,
@@ -1431,7 +1431,7 @@ public class PolarisAdminService {
             .orElseThrow(() -> new NotFoundException("Parent catalog %s not 
found", catalogName));
     List<PolarisEntityCore> catalogPath = 
PolarisEntity.toCoreList(List.of(catalogEntity));
     return metaStoreManager
-        .loadFullEntitiesAll(
+        .listFullEntitiesAll(
             getCurrentPolarisContext(),
             catalogPath,
             PolarisEntityType.CATALOG_ROLE,
diff --git 
a/runtime/service/src/main/java/org/apache/polaris/service/catalog/policy/PolicyCatalog.java
 
b/runtime/service/src/main/java/org/apache/polaris/service/catalog/policy/PolicyCatalog.java
index 9cd061ab6..b3fbd0317 100644
--- 
a/runtime/service/src/main/java/org/apache/polaris/service/catalog/policy/PolicyCatalog.java
+++ 
b/runtime/service/src/main/java/org/apache/polaris/service/catalog/policy/PolicyCatalog.java
@@ -189,7 +189,7 @@ public class PolicyCatalog {
     }
     // with a policyType filter we need to load the full PolicyEntity to apply 
the filter
     return metaStoreManager
-        .loadFullEntitiesAll(
+        .listFullEntitiesAll(
             callContext.getPolarisCallContext(),
             catalogPath,
             PolarisEntityType.POLICY,

Reply via email to