This is an automated email from the ASF dual-hosted git repository.
collado pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/polaris.git
The following commit(s) were added to refs/heads/main by this push:
new 0e6151422 Add loadEntities batch call and rename listFullEntities
(#2508)
0e6151422 is described below
commit 0e6151422eb10ea902d2df79843360cc66012a30
Author: Michael Collado <[email protected]>
AuthorDate: Fri Nov 7 09:12:06 2025 -0800
Add loadEntities batch call and rename listFullEntities (#2508)
* Add loadEntities batch call and rename listFullEntities
* Changed batch call to implement loadResolvedEntities instead
* Add loadResolvedEntities by id and entity cache support
* Add additional test for loadResolvedEntities by id
* Added additional test and updated comments in EntityCache interface
* Add additional constructor to ResolvedEntitiesResult
* Fixed unused method reference
* Removed loadResolvedEntities method with lookup record param
* Pulled out toResolvedPolarisEntity method per PR comment
---
.../relational/jdbc/JdbcBasePersistenceImpl.java | 11 +-
.../AtomicOperationMetaStoreManager.java | 60 +-
.../polaris/core/persistence/BasePersistence.java | 4 +-
.../core/persistence/PolarisMetaStoreManager.java | 31 +-
.../TransactionWorkspaceMetaStoreManager.java | 13 +-
.../core/persistence/cache/EntityCache.java | 30 +
.../persistence/cache/InMemoryEntityCache.java | 134 +++-
.../dao/entity/ResolvedEntitiesResult.java | 56 ++
.../AbstractTransactionalPersistence.java | 2 +-
.../TransactionalMetaStoreManagerImpl.java | 64 +-
.../transactional/TransactionalPersistence.java | 2 +-
.../persistence/cache/InMemoryEntityCacheTest.java | 846 +++++++++++++++++----
.../BasePolarisMetaStoreManagerTest.java | 8 +-
.../persistence/PolarisTestMetaStoreManager.java | 112 +++
.../polaris/service/admin/PolarisAdminService.java | 8 +-
.../service/catalog/policy/PolicyCatalog.java | 2 +-
16 files changed, 1222 insertions(+), 161 deletions(-)
diff --git
a/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/JdbcBasePersistenceImpl.java
b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/JdbcBasePersistenceImpl.java
index 8d6201453..9401df2dd 100644
---
a/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/JdbcBasePersistenceImpl.java
+++
b/persistence/relational-jdbc/src/main/java/org/apache/polaris/persistence/relational/jdbc/JdbcBasePersistenceImpl.java
@@ -31,6 +31,7 @@ import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
@@ -463,7 +464,12 @@ public class JdbcBasePersistenceImpl implements
BasePersistence, IntegrationPers
PreparedQuery query =
QueryGenerator.generateSelectQueryWithEntityIds(realmId,
schemaVersion, entityIds);
try {
- return datasourceOperations.executeSelect(query, new
ModelEntity(schemaVersion));
+ Map<PolarisEntityId, PolarisBaseEntity> idMap =
+ datasourceOperations.executeSelect(query, new
ModelEntity(schemaVersion)).stream()
+ .collect(
+ Collectors.toMap(
+ e -> new PolarisEntityId(e.getCatalogId(), e.getId()),
Function.identity()));
+ return entityIds.stream().map(idMap::get).collect(Collectors.toList());
} catch (SQLException e) {
throw new RuntimeException(
String.format("Failed to retrieve polaris entities due to %s",
e.getMessage()), e);
@@ -476,6 +482,7 @@ public class JdbcBasePersistenceImpl implements
BasePersistence, IntegrationPers
@Nonnull PolarisCallContext callCtx, List<PolarisEntityId> entityIds) {
Map<PolarisEntityId, ModelEntity> idToEntityMap =
lookupEntities(callCtx, entityIds).stream()
+ .filter(Objects::nonNull)
.collect(
Collectors.toMap(
entry -> new PolarisEntityId(entry.getCatalogId(),
entry.getId()),
@@ -570,7 +577,7 @@ public class JdbcBasePersistenceImpl implements
BasePersistence, IntegrationPers
@Nonnull
@Override
- public <T> Page<T> loadEntities(
+ public <T> Page<T> listFullEntities(
@Nonnull PolarisCallContext callCtx,
long catalogId,
long parentId,
diff --git
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/AtomicOperationMetaStoreManager.java
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/AtomicOperationMetaStoreManager.java
index c3841486a..1ec8c89d4 100644
---
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/AtomicOperationMetaStoreManager.java
+++
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/AtomicOperationMetaStoreManager.java
@@ -32,6 +32,7 @@ import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import org.apache.polaris.core.PolarisCallContext;
import org.apache.polaris.core.PolarisDiagnostics;
import org.apache.polaris.core.config.FeatureConfiguration;
@@ -67,6 +68,7 @@ import
org.apache.polaris.core.persistence.dao.entity.LoadPolicyMappingsResult;
import org.apache.polaris.core.persistence.dao.entity.PolicyAttachmentResult;
import org.apache.polaris.core.persistence.dao.entity.PrincipalSecretsResult;
import org.apache.polaris.core.persistence.dao.entity.PrivilegeResult;
+import org.apache.polaris.core.persistence.dao.entity.ResolvedEntitiesResult;
import org.apache.polaris.core.persistence.dao.entity.ResolvedEntityResult;
import org.apache.polaris.core.persistence.dao.entity.ScopedCredentialsResult;
import org.apache.polaris.core.persistence.pagination.Page;
@@ -705,7 +707,7 @@ public class AtomicOperationMetaStoreManager extends
BaseMetaStoreManager {
/** {@inheritDoc} */
@Override
- public @Nonnull Page<PolarisBaseEntity> loadEntities(
+ public @Nonnull Page<PolarisBaseEntity> listFullEntities(
@Nonnull PolarisCallContext callCtx,
@Nullable List<PolarisEntityCore> catalogPath,
@Nonnull PolarisEntityType entityType,
@@ -728,7 +730,7 @@ public class AtomicOperationMetaStoreManager extends
BaseMetaStoreManager {
// with sensitive data; but the window of inconsistency is only the
duration of a single
// in-flight request (the cache-based resolution follows a different path
entirely).
- return ms.loadEntities(
+ return ms.listFullEntities(
callCtx,
catalogId,
parentId,
@@ -1200,7 +1202,7 @@ public class AtomicOperationMetaStoreManager extends
BaseMetaStoreManager {
// get the list of catalog roles, at most 2
List<PolarisBaseEntity> catalogRoles =
- ms.loadEntities(
+ ms.listFullEntities(
callCtx,
catalogId,
catalogId,
@@ -1520,7 +1522,7 @@ public class AtomicOperationMetaStoreManager extends
BaseMetaStoreManager {
// find all available tasks
Page<PolarisBaseEntity> availableTasks =
- ms.loadEntities(
+ ms.listFullEntities(
callCtx,
PolarisEntityConstants.getRootEntityId(),
PolarisEntityConstants.getRootEntityId(),
@@ -1760,6 +1762,56 @@ public class AtomicOperationMetaStoreManager extends
BaseMetaStoreManager {
return result;
}
+ @Nonnull
+ @Override
+ public ResolvedEntitiesResult loadResolvedEntities(
+ @Nonnull PolarisCallContext callCtx,
+ @Nonnull PolarisEntityType entityType,
+ @Nonnull List<PolarisEntityId> entityIds) {
+ BasePersistence ms = callCtx.getMetaStore();
+ return getResolvedEntitiesResult(callCtx, ms, entityIds, i -> entityType);
+ }
+
+ private static ResolvedEntitiesResult getResolvedEntitiesResult(
+ PolarisCallContext callCtx,
+ BasePersistence ms,
+ List<PolarisEntityId> entityIds,
+ Function<Integer, PolarisEntityType> entityTypeForIndex) {
+ List<PolarisBaseEntity> entities = ms.lookupEntities(callCtx, entityIds);
+ // mimic the behavior of loadEntity above, return null if not found or
type mismatch
+ List<ResolvedPolarisEntity> ret =
+ IntStream.range(0, entityIds.size())
+ .mapToObj(
+ i -> {
+ if (entities.get(i) != null
+ &&
!entities.get(i).getType().equals(entityTypeForIndex.apply(i))) {
+ return null;
+ } else {
+ return entities.get(i);
+ }
+ })
+ .map(e -> toResolvedPolarisEntity(callCtx, e, ms))
+ .collect(Collectors.toList());
+ return new ResolvedEntitiesResult(ret);
+ }
+
+ private static ResolvedPolarisEntity toResolvedPolarisEntity(
+ PolarisCallContext callCtx, PolarisBaseEntity e, BasePersistence ms) {
+ if (e == null) {
+ return null;
+ } else {
+ // load the grant records
+ final List<PolarisGrantRecord> grantRecordsAsSecurable =
+ ms.loadAllGrantRecordsOnSecurable(callCtx, e.getCatalogId(),
e.getId());
+ final List<PolarisGrantRecord> grantRecordsAsGrantee =
+ e.getType().isGrantee()
+ ? ms.loadAllGrantRecordsOnGrantee(callCtx, e.getCatalogId(),
e.getId())
+ : List.of();
+ return new ResolvedPolarisEntity(
+ PolarisEntity.of(e), grantRecordsAsGrantee, grantRecordsAsSecurable);
+ }
+ }
+
/** {@inheritDoc} */
@Override
public @Nonnull ResolvedEntityResult refreshResolvedEntity(
diff --git
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/BasePersistence.java
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/BasePersistence.java
index d238e490e..afea79fda 100644
---
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/BasePersistence.java
+++
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/BasePersistence.java
@@ -279,7 +279,7 @@ public interface BasePersistence extends
PolicyMappingPersistence {
/**
* List lightweight information of entities matching the given criteria with
pagination. If all
- * properties of the entity are required,use {@link #loadEntities} instead.
+ * properties of the entity are required,use {@link #listFullEntities}
instead.
*
* @param callCtx call context
* @param catalogId catalog id for that entity, NULL_ID if the entity is
top-level
@@ -314,7 +314,7 @@ public interface BasePersistence extends
PolicyMappingPersistence {
* @return the paged list of matching entities after transformation
*/
@Nonnull
- <T> Page<T> loadEntities(
+ <T> Page<T> listFullEntities(
@Nonnull PolarisCallContext callCtx,
long catalogId,
long parentId,
diff --git
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/PolarisMetaStoreManager.java
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/PolarisMetaStoreManager.java
index cf3912fa9..efa73a60b 100644
---
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/PolarisMetaStoreManager.java
+++
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/PolarisMetaStoreManager.java
@@ -47,6 +47,7 @@ import
org.apache.polaris.core.persistence.dao.entity.EntityResult;
import org.apache.polaris.core.persistence.dao.entity.EntityWithPath;
import org.apache.polaris.core.persistence.dao.entity.GenerateEntityIdResult;
import org.apache.polaris.core.persistence.dao.entity.ListEntitiesResult;
+import org.apache.polaris.core.persistence.dao.entity.ResolvedEntitiesResult;
import org.apache.polaris.core.persistence.dao.entity.ResolvedEntityResult;
import org.apache.polaris.core.persistence.pagination.Page;
import org.apache.polaris.core.persistence.pagination.PageToken;
@@ -114,7 +115,7 @@ public interface PolarisMetaStoreManager
/**
* List lightweight information about entities matching the given criteria.
If all properties of
- * the entity are required,use {@link #loadEntities} instead.
+ * the entity are required,use {@link #listFullEntities} instead.
*
* @param callCtx call context
* @param catalogPath path inside a catalog. If null or empty, the entities
to list are top-level,
@@ -135,7 +136,7 @@ public interface PolarisMetaStoreManager
/**
* Load full entities matching the given criteria with pagination. If only
the entity name/id/type
* is required, use {@link #listEntities} instead. If no pagination is
required, use {@link
- * #loadEntitiesAll} instead.
+ * #listFullEntitiesAll} instead.
*
* @param callCtx call context
* @param catalogPath path inside a catalog. If null or empty, the entities
to list are top-level,
@@ -145,7 +146,7 @@ public interface PolarisMetaStoreManager
* @return paged list of matching entities
*/
@Nonnull
- Page<PolarisBaseEntity> loadEntities(
+ Page<PolarisBaseEntity> listFullEntities(
@Nonnull PolarisCallContext callCtx,
@Nullable List<PolarisEntityCore> catalogPath,
@Nonnull PolarisEntityType entityType,
@@ -154,7 +155,7 @@ public interface PolarisMetaStoreManager
/**
* Load full entities matching the given criteria into an unpaged list. If
pagination is required
- * use {@link #loadEntities} instead. If only the entity name/id/type is
required, use {@link
+ * use {@link #listFullEntities} instead. If only the entity name/id/type is
required, use {@link
* #listEntities} instead.
*
* @param callCtx call context
@@ -164,12 +165,13 @@ public interface PolarisMetaStoreManager
* @param entitySubType subType of entities to list (or ANY_SUBTYPE)
* @return list of all matching entities
*/
- default @Nonnull List<PolarisBaseEntity> loadEntitiesAll(
+ default @Nonnull List<PolarisBaseEntity> listFullEntitiesAll(
@Nonnull PolarisCallContext callCtx,
@Nullable List<PolarisEntityCore> catalogPath,
@Nonnull PolarisEntityType entityType,
@Nonnull PolarisEntitySubType entitySubType) {
- return loadEntities(callCtx, catalogPath, entityType, entitySubType,
PageToken.readEverything())
+ return listFullEntities(
+ callCtx, catalogPath, entityType, entitySubType,
PageToken.readEverything())
.items();
}
@@ -416,6 +418,23 @@ public interface PolarisMetaStoreManager
@Nonnull PolarisEntityType entityType,
@Nonnull String entityName);
+ /**
+ * Load a batch of resolved entities of a specified entity type given their
{@link
+ * PolarisEntityId}. Will return an empty list if the input list is empty.
Order in that returned
+ * list is the same as the input list. Some elements might be NULL if the
entity has been dropped.
+ *
+ * @param callCtx call context
+ * @param entityType the type of entities to load
+ * @param entityIds the list of entity ids to load
+ * @return a non-null list of entities corresponding to the lookup keys.
Some elements might be
+ * NULL if the entity has been dropped.
+ */
+ @Nonnull
+ ResolvedEntitiesResult loadResolvedEntities(
+ @Nonnull PolarisCallContext callCtx,
+ @Nonnull PolarisEntityType entityType,
+ @Nonnull List<PolarisEntityId> entityIds);
+
/**
* Refresh a resolved entity from the backend store. Will return NULL if the
entity does not
* exist, i.e. has been purged or dropped. Else, will determine what has
changed based on the
diff --git
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/TransactionWorkspaceMetaStoreManager.java
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/TransactionWorkspaceMetaStoreManager.java
index 872955893..99c1f8162 100644
---
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/TransactionWorkspaceMetaStoreManager.java
+++
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/TransactionWorkspaceMetaStoreManager.java
@@ -53,6 +53,7 @@ import
org.apache.polaris.core.persistence.dao.entity.LoadPolicyMappingsResult;
import org.apache.polaris.core.persistence.dao.entity.PolicyAttachmentResult;
import org.apache.polaris.core.persistence.dao.entity.PrincipalSecretsResult;
import org.apache.polaris.core.persistence.dao.entity.PrivilegeResult;
+import org.apache.polaris.core.persistence.dao.entity.ResolvedEntitiesResult;
import org.apache.polaris.core.persistence.dao.entity.ResolvedEntityResult;
import org.apache.polaris.core.persistence.dao.entity.ScopedCredentialsResult;
import org.apache.polaris.core.persistence.pagination.Page;
@@ -133,7 +134,7 @@ public class TransactionWorkspaceMetaStoreManager
implements PolarisMetaStoreMan
}
@Override
- public @Nonnull Page<PolarisBaseEntity> loadEntities(
+ public @Nonnull Page<PolarisBaseEntity> listFullEntities(
@Nonnull PolarisCallContext callCtx,
@Nullable List<PolarisEntityCore> catalogPath,
@Nonnull PolarisEntityType entityType,
@@ -379,6 +380,16 @@ public class TransactionWorkspaceMetaStoreManager
implements PolarisMetaStoreMan
return null;
}
+ @Nonnull
+ @Override
+ public ResolvedEntitiesResult loadResolvedEntities(
+ @Nonnull PolarisCallContext callCtx,
+ @Nonnull PolarisEntityType entityType,
+ @Nonnull List<PolarisEntityId> entityIds) {
+ diagnostics.fail("illegal_method_in_transaction_workspace",
"loadResolvedEntities");
+ return null;
+ }
+
@Override
public ResolvedEntityResult refreshResolvedEntity(
@Nonnull PolarisCallContext callCtx,
diff --git
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/cache/EntityCache.java
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/cache/EntityCache.java
index cd438c995..481fa7a9e 100644
---
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/cache/EntityCache.java
+++
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/cache/EntityCache.java
@@ -20,8 +20,10 @@ package org.apache.polaris.core.persistence.cache;
import jakarta.annotation.Nonnull;
import jakarta.annotation.Nullable;
+import java.util.List;
import org.apache.polaris.core.PolarisCallContext;
import org.apache.polaris.core.entity.PolarisBaseEntity;
+import org.apache.polaris.core.entity.PolarisEntityId;
import org.apache.polaris.core.entity.PolarisEntityType;
import org.apache.polaris.core.persistence.ResolvedPolarisEntity;
@@ -80,4 +82,32 @@ public interface EntityCache {
@Nullable
EntityCacheLookupResult getOrLoadEntityByName(
@Nonnull PolarisCallContext callContext, @Nonnull EntityCacheByNameKey
entityNameKey);
+
+ /**
+ * Load multiple entities by id, returning those found in the cache and
loading those not found.
+ *
+ * <p>Cached entity versions and grant versions must be verified against the
versions returned by
+ * the {@link
+ *
org.apache.polaris.core.persistence.PolarisMetaStoreManager#loadEntitiesChangeTracking(PolarisCallContext,
+ * List)} API to ensure the returned entities are consistent with the
current state of the
+ * metastore. Cache implementations must never return a mix of stale
entities and fresh entities,
+ * as authorization or table conflict decisions could be made based on
inconsistent data. For
+ * example, a Principal may have a grant to a Principal Role in a cached
entry, but that grant may
+ * be revoked prior to the Principal Role being granted a privilege on a
Catalog. If the Principal
+ * record is stale, but the Principal Role is refreshed, the Principal may
be incorrectly
+ * authorized to access the Catalog.
+ *
+ * @param callCtx the Polaris call context
+ * @param entityType the entity type
+ * @param entityIds the list of entity ids to load
+ * @return the list of resolved entities, in the same order as the requested
entity ids. As in
+ * {@link
+ *
org.apache.polaris.core.persistence.PolarisMetaStoreManager#loadResolvedEntities(PolarisCallContext,
+ * PolarisEntityType, List)}, elements in the returned list may be null
if the corresponding
+ * entity id does not exist.
+ */
+ List<EntityCacheLookupResult> getOrLoadResolvedEntities(
+ @Nonnull PolarisCallContext callCtx,
+ @Nonnull PolarisEntityType entityType,
+ @Nonnull List<PolarisEntityId> entityIds);
}
diff --git
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/cache/InMemoryEntityCache.java
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/cache/InMemoryEntityCache.java
index c30b996f1..cf5ad0c21 100644
---
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/cache/InMemoryEntityCache.java
+++
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/cache/InMemoryEntityCache.java
@@ -24,24 +24,39 @@ import com.github.benmanes.caffeine.cache.RemovalListener;
import jakarta.annotation.Nonnull;
import jakarta.annotation.Nullable;
import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import java.util.stream.Collectors;
import org.apache.polaris.core.PolarisCallContext;
import org.apache.polaris.core.PolarisDiagnostics;
import org.apache.polaris.core.config.BehaviorChangeConfiguration;
import org.apache.polaris.core.config.FeatureConfiguration;
import org.apache.polaris.core.config.RealmConfig;
import org.apache.polaris.core.entity.PolarisBaseEntity;
+import org.apache.polaris.core.entity.PolarisChangeTrackingVersions;
+import org.apache.polaris.core.entity.PolarisEntityId;
import org.apache.polaris.core.entity.PolarisEntityType;
import org.apache.polaris.core.entity.PolarisGrantRecord;
import org.apache.polaris.core.persistence.PolarisMetaStoreManager;
import org.apache.polaris.core.persistence.ResolvedPolarisEntity;
+import org.apache.polaris.core.persistence.dao.entity.ChangeTrackingResult;
+import org.apache.polaris.core.persistence.dao.entity.ResolvedEntitiesResult;
import org.apache.polaris.core.persistence.dao.entity.ResolvedEntityResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/** An in-memory entity cache with a limit of 100k entities and a 1h TTL. */
public class InMemoryEntityCache implements EntityCache {
-
+ private static final Logger LOGGER =
LoggerFactory.getLogger(InMemoryEntityCache.class);
+ public static final int MAX_CACHE_REFRESH_ATTEMPTS = 100;
private final PolarisDiagnostics diagnostics;
private final PolarisMetaStoreManager polarisMetaStoreManager;
private final Cache<Long, ResolvedPolarisEntity> byId;
@@ -451,4 +466,121 @@ public class InMemoryEntityCache implements EntityCache {
// return what we found
return new EntityCacheLookupResult(entry, cacheHit);
}
+
+ @Override
+ public List<EntityCacheLookupResult> getOrLoadResolvedEntities(
+ @Nonnull PolarisCallContext callCtx,
+ @Nonnull PolarisEntityType entityType,
+ @Nonnull List<PolarisEntityId> entityIds) {
+ // use a map to collect cached entries to avoid concurrency problems in
case a second thread is
+ // trying to populate
+ // the cache from a different snapshot
+ Map<PolarisEntityId, ResolvedPolarisEntity> resolvedEntities = new
HashMap<>();
+ boolean stateResolved = false;
+ for (int i = 0; i < MAX_CACHE_REFRESH_ATTEMPTS; i++) {
+ Function<List<PolarisEntityId>, ResolvedEntitiesResult> loaderFunc =
+ idsToLoad -> polarisMetaStoreManager.loadResolvedEntities(callCtx,
entityType, idsToLoad);
+ if (isCacheStateValid(callCtx, resolvedEntities, entityIds, loaderFunc))
{
+ stateResolved = true;
+ break;
+ }
+ }
+ if (!stateResolved) {
+ LOGGER.warn(
+ "Unable to resolve entities in cache after multiple attempts {} -
resolved: {}",
+ entityIds,
+ resolvedEntities);
+ diagnostics.fail("cannot_resolve_all_entities", "Unable to resolve
entities in cache");
+ }
+
+ return entityIds.stream()
+ .map(
+ id -> {
+ ResolvedPolarisEntity entity = resolvedEntities.get(id);
+ return entity == null ? null : new
EntityCacheLookupResult(entity, true);
+ })
+ .collect(Collectors.toList());
+ }
+
+ private boolean isCacheStateValid(
+ @Nonnull PolarisCallContext callCtx,
+ @Nonnull Map<PolarisEntityId, ResolvedPolarisEntity> resolvedEntities,
+ @Nonnull List<PolarisEntityId> entityIds,
+ @Nonnull Function<List<PolarisEntityId>, ResolvedEntitiesResult>
loaderFunc) {
+ ChangeTrackingResult changeTrackingResult =
+ polarisMetaStoreManager.loadEntitiesChangeTracking(callCtx, entityIds);
+ List<PolarisEntityId> idsToLoad = new ArrayList<>();
+ if (changeTrackingResult.isSuccess()) {
+ idsToLoad.addAll(validateCacheEntries(entityIds, resolvedEntities,
changeTrackingResult));
+ } else {
+ idsToLoad.addAll(entityIds);
+ }
+ if (!idsToLoad.isEmpty()) {
+ ResolvedEntitiesResult resolvedEntitiesResult =
loaderFunc.apply(idsToLoad);
+ if (resolvedEntitiesResult.isSuccess()) {
+ LOGGER.debug("Resolved entities - validating cache");
+ resolvedEntitiesResult.getResolvedEntities().stream()
+ .filter(Objects::nonNull)
+ .forEach(
+ e -> {
+ this.cacheNewEntry(e);
+ resolvedEntities.put(
+ new PolarisEntityId(e.getEntity().getCatalogId(),
e.getEntity().getId()), e);
+ });
+ }
+ }
+
+ // the loader function should always return a batch of results from the
same "snapshot" of the
+ // persistence, so
+ // if the changeTracking call above failed, we should have loaded the
entire batch in one shot.
+ // There should be no
+ // need to revalidate the entities.
+ List<PolarisEntityId> idsToReload =
+ changeTrackingResult.isSuccess()
+ ? validateCacheEntries(entityIds, resolvedEntities,
changeTrackingResult)
+ : List.of();
+ return idsToReload.isEmpty();
+ }
+
+ private List<PolarisEntityId> validateCacheEntries(
+ List<PolarisEntityId> entityIds,
+ Map<PolarisEntityId, ResolvedPolarisEntity> resolvedEntities,
+ ChangeTrackingResult changeTrackingResult) {
+ List<PolarisEntityId> idsToReload = new ArrayList<>();
+ Iterator<PolarisEntityId> idIterator = entityIds.iterator();
+ Iterator<PolarisChangeTrackingVersions> changeTrackingIterator =
+ changeTrackingResult.getChangeTrackingVersions().iterator();
+ while (idIterator.hasNext() && changeTrackingIterator.hasNext()) {
+ PolarisEntityId entityId = idIterator.next();
+ PolarisChangeTrackingVersions changeTrackingVersions =
changeTrackingIterator.next();
+ if (changeTrackingVersions == null) {
+ // entity has been purged
+ ResolvedPolarisEntity cachedEntity = getEntityById(entityId.getId());
+ if (cachedEntity != null || resolvedEntities.containsKey(entityId)) {
+ LOGGER.debug("Entity {} has been purged, removing from cache",
entityId);
+ Optional.ofNullable(cachedEntity).ifPresent(this::removeCacheEntry);
+ resolvedEntities.remove(entityId);
+ }
+ continue;
+ }
+ // compare versions using equals rather than less than so we can use the
same function to
+ // validate that the cache
+ // entries are consistent with a single call to the change tracking
table, rather than some
+ // grants ahead and some
+ // grants behind
+ ResolvedPolarisEntity cachedEntity =
+ resolvedEntities.computeIfAbsent(entityId, id ->
this.getEntityById(id.getId()));
+ if (cachedEntity == null
+ || cachedEntity.getEntity().getEntityVersion()
+ != changeTrackingVersions.getEntityVersion()
+ || cachedEntity.getEntity().getGrantRecordsVersion()
+ != changeTrackingVersions.getGrantRecordsVersion()) {
+ idsToReload.add(entityId);
+ } else {
+ resolvedEntities.put(entityId, cachedEntity);
+ }
+ }
+ LOGGER.debug("Cache entries {} need to be reloaded", idsToReload);
+ return idsToReload;
+ }
}
diff --git
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/dao/entity/ResolvedEntitiesResult.java
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/dao/entity/ResolvedEntitiesResult.java
new file mode 100644
index 000000000..61eb27da1
--- /dev/null
+++
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/dao/entity/ResolvedEntitiesResult.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.core.persistence.dao.entity;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import jakarta.annotation.Nonnull;
+import jakarta.annotation.Nullable;
+import java.util.List;
+import org.apache.polaris.core.persistence.ResolvedPolarisEntity;
+
+/** Response object for the loadResolvedEntities call. */
+public class ResolvedEntitiesResult extends BaseResult {
+ private final List<ResolvedPolarisEntity> resolvedEntities;
+
+ public ResolvedEntitiesResult(List<ResolvedPolarisEntity> resolvedEntities) {
+ super(ReturnStatus.SUCCESS, null);
+ this.resolvedEntities = resolvedEntities;
+ }
+
+ public ResolvedEntitiesResult(
+ @Nonnull ReturnStatus returnStatus, @Nullable String extraInformation) {
+ super(returnStatus, extraInformation);
+ this.resolvedEntities = null;
+ }
+
+ @JsonCreator
+ private ResolvedEntitiesResult(
+ @JsonProperty("returnStatus") ReturnStatus returnStatus,
+ @JsonProperty("extraInformation") String extraInformation,
+ @JsonProperty("resolvedEntities") List<ResolvedPolarisEntity>
resolvedEntities) {
+ super(returnStatus, extraInformation);
+ this.resolvedEntities = resolvedEntities;
+ }
+
+ public List<ResolvedPolarisEntity> getResolvedEntities() {
+ return resolvedEntities;
+ }
+}
diff --git
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/AbstractTransactionalPersistence.java
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/AbstractTransactionalPersistence.java
index 902e2c8d6..4eaf40fe5 100644
---
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/AbstractTransactionalPersistence.java
+++
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/AbstractTransactionalPersistence.java
@@ -389,7 +389,7 @@ public abstract class AbstractTransactionalPersistence
implements TransactionalP
/** {@inheritDoc} */
@Override
@Nonnull
- public <T> Page<T> loadEntities(
+ public <T> Page<T> listFullEntities(
@Nonnull PolarisCallContext callCtx,
long catalogId,
long parentId,
diff --git
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TransactionalMetaStoreManagerImpl.java
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TransactionalMetaStoreManagerImpl.java
index 402cdc280..db3ccd0f3 100644
---
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TransactionalMetaStoreManagerImpl.java
+++
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TransactionalMetaStoreManagerImpl.java
@@ -31,6 +31,7 @@ import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import org.apache.polaris.core.PolarisCallContext;
import org.apache.polaris.core.PolarisDiagnostics;
import org.apache.polaris.core.config.FeatureConfiguration;
@@ -56,6 +57,7 @@ import
org.apache.polaris.core.persistence.BaseMetaStoreManager;
import org.apache.polaris.core.persistence.PolarisMetaStoreManager;
import org.apache.polaris.core.persistence.PolarisObjectMapperUtil;
import org.apache.polaris.core.persistence.PolicyMappingAlreadyExistsException;
+import org.apache.polaris.core.persistence.ResolvedPolarisEntity;
import org.apache.polaris.core.persistence.RetryOnConcurrencyException;
import org.apache.polaris.core.persistence.dao.entity.BaseResult;
import org.apache.polaris.core.persistence.dao.entity.ChangeTrackingResult;
@@ -71,6 +73,7 @@ import
org.apache.polaris.core.persistence.dao.entity.LoadPolicyMappingsResult;
import org.apache.polaris.core.persistence.dao.entity.PolicyAttachmentResult;
import org.apache.polaris.core.persistence.dao.entity.PrincipalSecretsResult;
import org.apache.polaris.core.persistence.dao.entity.PrivilegeResult;
+import org.apache.polaris.core.persistence.dao.entity.ResolvedEntitiesResult;
import org.apache.polaris.core.persistence.dao.entity.ResolvedEntityResult;
import org.apache.polaris.core.persistence.dao.entity.ScopedCredentialsResult;
import org.apache.polaris.core.persistence.pagination.Page;
@@ -723,10 +726,10 @@ public class TransactionalMetaStoreManagerImpl extends
BaseMetaStoreManager {
}
/**
- * See {@link PolarisMetaStoreManager#loadEntities(PolarisCallContext, List,
PolarisEntityType,
- * PolarisEntitySubType, PageToken)}
+ * See {@link PolarisMetaStoreManager#listFullEntities(PolarisCallContext,
List,
+ * PolarisEntityType, PolarisEntitySubType, PageToken)}
*/
- private @Nonnull Page<PolarisBaseEntity> loadEntities(
+ private @Nonnull Page<PolarisBaseEntity> listFullEntities(
@Nonnull PolarisCallContext callCtx,
@Nonnull TransactionalPersistence ms,
@Nullable List<PolarisEntityCore> catalogPath,
@@ -756,7 +759,7 @@ public class TransactionalMetaStoreManagerImpl extends
BaseMetaStoreManager {
/** {@inheritDoc} */
@Override
- public @Nonnull Page<PolarisBaseEntity> loadEntities(
+ public @Nonnull Page<PolarisBaseEntity> listFullEntities(
@Nonnull PolarisCallContext callCtx,
@Nullable List<PolarisEntityCore> catalogPath,
@Nonnull PolarisEntityType entityType,
@@ -768,7 +771,7 @@ public class TransactionalMetaStoreManagerImpl extends
BaseMetaStoreManager {
// run operation in a read transaction
return ms.runInReadTransaction(
callCtx,
- () -> loadEntities(callCtx, ms, catalogPath, entityType,
entitySubType, pageToken));
+ () -> listFullEntities(callCtx, ms, catalogPath, entityType,
entitySubType, pageToken));
}
/** {@link #createPrincipal(PolarisCallContext, PrincipalEntity)} */
@@ -2293,6 +2296,57 @@ public class TransactionalMetaStoreManagerImpl extends
BaseMetaStoreManager {
return result;
}
+ private static ResolvedEntitiesResult getResolvedEntitiesResult(
+ PolarisCallContext callCtx,
+ TransactionalPersistence ms,
+ List<PolarisEntityId> entityIds,
+ Function<Integer, PolarisEntityType> entityTypeForIndex) {
+ List<PolarisBaseEntity> entities = ms.lookupEntitiesInCurrentTxn(callCtx,
entityIds);
+ // mimic the behavior of loadEntity above, return null if not found or
type mismatch
+ List<ResolvedPolarisEntity> ret =
+ IntStream.range(0, entityIds.size())
+ .mapToObj(
+ i -> {
+ if (entities.get(i) != null
+ &&
!entities.get(i).getType().equals(entityTypeForIndex.apply(i))) {
+ return null;
+ } else {
+ return entities.get(i);
+ }
+ })
+ .map(e -> toResolvedPolarisEntity(callCtx, e, ms))
+ .collect(Collectors.toList());
+ return new ResolvedEntitiesResult(ret);
+ }
+
+ private static ResolvedPolarisEntity toResolvedPolarisEntity(
+ PolarisCallContext callCtx, PolarisBaseEntity e,
TransactionalPersistence ms) {
+ if (e == null) {
+ return null;
+ } else {
+ // load the grant records
+ final List<PolarisGrantRecord> grantRecordsAsSecurable =
+ ms.loadAllGrantRecordsOnSecurableInCurrentTxn(callCtx,
e.getCatalogId(), e.getId());
+ final List<PolarisGrantRecord> grantRecordsAsGrantee =
+ e.getType().isGrantee()
+ ? ms.loadAllGrantRecordsOnSecurableInCurrentTxn(callCtx,
e.getCatalogId(), e.getId())
+ : List.of();
+ return new ResolvedPolarisEntity(
+ PolarisEntity.of(e), grantRecordsAsGrantee, grantRecordsAsSecurable);
+ }
+ }
+
+ @Nonnull
+ @Override
+ public ResolvedEntitiesResult loadResolvedEntities(
+ @Nonnull PolarisCallContext callCtx,
+ @Nonnull PolarisEntityType entityType,
+ @Nonnull List<PolarisEntityId> entityIds) {
+ TransactionalPersistence ms = ((TransactionalPersistence)
callCtx.getMetaStore());
+ return ms.runInReadTransaction(
+ callCtx, () -> getResolvedEntitiesResult(callCtx, ms, entityIds, i ->
entityType));
+ }
+
/** {@inheritDoc} */
private @Nonnull ResolvedEntityResult refreshResolvedEntity(
@Nonnull PolarisCallContext callCtx,
diff --git
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TransactionalPersistence.java
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TransactionalPersistence.java
index 3802908b8..dfcd5f925 100644
---
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TransactionalPersistence.java
+++
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TransactionalPersistence.java
@@ -225,7 +225,7 @@ public interface TransactionalPersistence
pageToken);
}
- /** See {@link
org.apache.polaris.core.persistence.BasePersistence#loadEntities} */
+ /** See {@link
org.apache.polaris.core.persistence.BasePersistence#listFullEntities} */
@Nonnull
<T> Page<T> loadEntitiesInCurrentTxn(
@Nonnull PolarisCallContext callCtx,
diff --git
a/polaris-core/src/test/java/org/apache/polaris/core/persistence/cache/InMemoryEntityCacheTest.java
b/polaris-core/src/test/java/org/apache/polaris/core/persistence/cache/InMemoryEntityCacheTest.java
index ce3432050..95774364d 100644
---
a/polaris-core/src/test/java/org/apache/polaris/core/persistence/cache/InMemoryEntityCacheTest.java
+++
b/polaris-core/src/test/java/org/apache/polaris/core/persistence/cache/InMemoryEntityCacheTest.java
@@ -19,16 +19,25 @@
package org.apache.polaris.core.persistence.cache;
import static
org.apache.polaris.core.persistence.PrincipalSecretsGenerator.RANDOM_SECRETS;
+import static org.assertj.core.api.Assertions.assertThat;
import java.time.Clock;
import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.polaris.core.PolarisCallContext;
import org.apache.polaris.core.PolarisDefaultDiagServiceImpl;
import org.apache.polaris.core.PolarisDiagnostics;
import org.apache.polaris.core.entity.PolarisBaseEntity;
+import org.apache.polaris.core.entity.PolarisChangeTrackingVersions;
import org.apache.polaris.core.entity.PolarisEntity;
+import org.apache.polaris.core.entity.PolarisEntityId;
import org.apache.polaris.core.entity.PolarisEntitySubType;
import org.apache.polaris.core.entity.PolarisEntityType;
import org.apache.polaris.core.entity.PolarisGrantRecord;
@@ -37,6 +46,8 @@ import
org.apache.polaris.core.entity.table.IcebergTableLikeEntity;
import org.apache.polaris.core.persistence.PolarisMetaStoreManager;
import org.apache.polaris.core.persistence.PolarisTestMetaStoreManager;
import org.apache.polaris.core.persistence.ResolvedPolarisEntity;
+import org.apache.polaris.core.persistence.dao.entity.ChangeTrackingResult;
+import org.apache.polaris.core.persistence.dao.entity.ResolvedEntitiesResult;
import
org.apache.polaris.core.persistence.transactional.TransactionalMetaStoreManagerImpl;
import
org.apache.polaris.core.persistence.transactional.TransactionalPersistence;
import org.apache.polaris.core.persistence.transactional.TreeMapMetaStore;
@@ -44,10 +55,14 @@ import
org.apache.polaris.core.persistence.transactional.TreeMapTransactionalPer
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
+import org.mockito.stubbing.Answer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/** Unit testing of the entity cache */
public class InMemoryEntityCacheTest {
+ public static final Logger LOGGER =
LoggerFactory.getLogger(InMemoryEntityCache.class);
private final PolarisDiagnostics diagServices;
private final PolarisCallContext callCtx;
private final PolarisTestMetaStoreManager tm;
@@ -105,31 +120,31 @@ public class InMemoryEntityCacheTest {
EntityCacheLookupResult lookup =
cache.getOrLoadEntityByName(
this.callCtx, new EntityCacheByNameKey(PolarisEntityType.CATALOG,
"test"));
- Assertions.assertThat(lookup).isNotNull();
- Assertions.assertThat(lookup.isCacheHit()).isFalse();
- Assertions.assertThat(lookup.getCacheEntry()).isNotNull();
+ assertThat(lookup).isNotNull();
+ assertThat(lookup.isCacheHit()).isFalse();
+ assertThat(lookup.getCacheEntry()).isNotNull();
// validate the cache entry
PolarisBaseEntity catalog = lookup.getCacheEntry().getEntity();
- Assertions.assertThat(catalog).isNotNull();
-
Assertions.assertThat(catalog.getType()).isEqualTo(PolarisEntityType.CATALOG);
+ assertThat(catalog).isNotNull();
+ assertThat(catalog.getType()).isEqualTo(PolarisEntityType.CATALOG);
// do it again, should be found in the cache
lookup =
cache.getOrLoadEntityByName(
this.callCtx, new EntityCacheByNameKey(PolarisEntityType.CATALOG,
"test"));
- Assertions.assertThat(lookup).isNotNull();
- Assertions.assertThat(lookup.isCacheHit()).isTrue();
+ assertThat(lookup).isNotNull();
+ assertThat(lookup.isCacheHit()).isTrue();
// do it again by id, should be found in the cache
lookup =
cache.getOrLoadEntityById(
this.callCtx, catalog.getCatalogId(), catalog.getId(),
catalog.getType());
- Assertions.assertThat(lookup).isNotNull();
- Assertions.assertThat(lookup.isCacheHit()).isTrue();
- Assertions.assertThat(lookup.getCacheEntry()).isNotNull();
- Assertions.assertThat(lookup.getCacheEntry().getEntity()).isNotNull();
-
Assertions.assertThat(lookup.getCacheEntry().getGrantRecordsAsSecurable()).isNotNull();
+ assertThat(lookup).isNotNull();
+ assertThat(lookup.isCacheHit()).isTrue();
+ assertThat(lookup.getCacheEntry()).isNotNull();
+ assertThat(lookup.getCacheEntry().getEntity()).isNotNull();
+
assertThat(lookup.getCacheEntry().getGrantRecordsAsSecurable()).isNotNull();
// get N1
PolarisBaseEntity N1 =
@@ -140,128 +155,119 @@ public class InMemoryEntityCacheTest {
new EntityCacheByNameKey(
catalog.getId(), catalog.getId(), PolarisEntityType.NAMESPACE,
"N1");
ResolvedPolarisEntity cacheEntry = cache.getEntityByName(N1_name);
- Assertions.assertThat(cacheEntry).isNull();
+ assertThat(cacheEntry).isNull();
// try to find it in the cache by id. Should not be there, i.e. no cache
hit
lookup = cache.getOrLoadEntityById(this.callCtx, N1.getCatalogId(),
N1.getId(), N1.getType());
- Assertions.assertThat(lookup).isNotNull();
- Assertions.assertThat(lookup.isCacheHit()).isFalse();
+ assertThat(lookup).isNotNull();
+ assertThat(lookup.isCacheHit()).isFalse();
// should be there now, by name
cacheEntry = cache.getEntityByName(N1_name);
- Assertions.assertThat(cacheEntry).isNotNull();
- Assertions.assertThat(cacheEntry.getEntity()).isNotNull();
- Assertions.assertThat(cacheEntry.getGrantRecordsAsSecurable()).isNotNull();
+ assertThat(cacheEntry).isNotNull();
+ assertThat(cacheEntry.getEntity()).isNotNull();
+ assertThat(cacheEntry.getGrantRecordsAsSecurable()).isNotNull();
// should be there now, by id
cacheEntry = cache.getEntityById(N1.getId());
- Assertions.assertThat(cacheEntry).isNotNull();
- Assertions.assertThat(cacheEntry.getEntity()).isNotNull();
- Assertions.assertThat(cacheEntry.getGrantRecordsAsSecurable()).isNotNull();
+ assertThat(cacheEntry).isNotNull();
+ assertThat(cacheEntry.getEntity()).isNotNull();
+ assertThat(cacheEntry.getGrantRecordsAsSecurable()).isNotNull();
// lookup N1
ResolvedPolarisEntity N1_entry = cache.getEntityById(N1.getId());
- Assertions.assertThat(N1_entry).isNotNull();
- Assertions.assertThat(N1_entry.getEntity()).isNotNull();
- Assertions.assertThat(N1_entry.getGrantRecordsAsSecurable()).isNotNull();
+ assertThat(N1_entry).isNotNull();
+ assertThat(N1_entry.getEntity()).isNotNull();
+ assertThat(N1_entry.getGrantRecordsAsSecurable()).isNotNull();
// negative tests, load an entity which does not exist
lookup = cache.getOrLoadEntityById(this.callCtx, N1.getCatalogId(), 10000,
N1.getType());
- Assertions.assertThat(lookup).isNull();
+ assertThat(lookup).isNull();
lookup =
cache.getOrLoadEntityByName(
this.callCtx,
new EntityCacheByNameKey(PolarisEntityType.CATALOG,
"non_existant_catalog"));
- Assertions.assertThat(lookup).isNull();
+ assertThat(lookup).isNull();
// lookup N2 to validate grants
EntityCacheByNameKey N2_name =
new EntityCacheByNameKey(catalog.getId(), N1.getId(),
PolarisEntityType.NAMESPACE, "N2");
lookup = cache.getOrLoadEntityByName(callCtx, N2_name);
- Assertions.assertThat(lookup).isNotNull();
+ assertThat(lookup).isNotNull();
ResolvedPolarisEntity cacheEntry_N1 = lookup.getCacheEntry();
- Assertions.assertThat(cacheEntry_N1).isNotNull();
- Assertions.assertThat(cacheEntry_N1.getEntity()).isNotNull();
-
Assertions.assertThat(cacheEntry_N1.getGrantRecordsAsSecurable()).isNotNull();
+ assertThat(cacheEntry_N1).isNotNull();
+ assertThat(cacheEntry_N1.getEntity()).isNotNull();
+ assertThat(cacheEntry_N1.getGrantRecordsAsSecurable()).isNotNull();
// lookup catalog role R1
EntityCacheByNameKey R1_name =
new EntityCacheByNameKey(
catalog.getId(), catalog.getId(), PolarisEntityType.CATALOG_ROLE,
"R1");
lookup = cache.getOrLoadEntityByName(callCtx, R1_name);
- Assertions.assertThat(lookup).isNotNull();
+ assertThat(lookup).isNotNull();
ResolvedPolarisEntity cacheEntry_R1 = lookup.getCacheEntry();
- Assertions.assertThat(cacheEntry_R1).isNotNull();
- Assertions.assertThat(cacheEntry_R1.getEntity()).isNotNull();
-
Assertions.assertThat(cacheEntry_R1.getGrantRecordsAsSecurable()).isNotNull();
-
Assertions.assertThat(cacheEntry_R1.getGrantRecordsAsGrantee()).isNotNull();
+ assertThat(cacheEntry_R1).isNotNull();
+ assertThat(cacheEntry_R1.getEntity()).isNotNull();
+ assertThat(cacheEntry_R1.getGrantRecordsAsSecurable()).isNotNull();
+ assertThat(cacheEntry_R1.getGrantRecordsAsGrantee()).isNotNull();
// we expect one TABLE_READ grant on that securable granted to the catalog
role R1
-
Assertions.assertThat(cacheEntry_N1.getGrantRecordsAsSecurable()).hasSize(1);
+ assertThat(cacheEntry_N1.getGrantRecordsAsSecurable()).hasSize(1);
PolarisGrantRecord gr = cacheEntry_N1.getGrantRecordsAsSecurable().get(0);
// securable is N1, grantee is R1
-
Assertions.assertThat(gr.getGranteeId()).isEqualTo(cacheEntry_R1.getEntity().getId());
- Assertions.assertThat(gr.getGranteeCatalogId())
- .isEqualTo(cacheEntry_R1.getEntity().getCatalogId());
-
Assertions.assertThat(gr.getSecurableId()).isEqualTo(cacheEntry_N1.getEntity().getId());
- Assertions.assertThat(gr.getSecurableCatalogId())
- .isEqualTo(cacheEntry_N1.getEntity().getCatalogId());
- Assertions.assertThat(gr.getPrivilegeCode())
- .isEqualTo(PolarisPrivilege.TABLE_READ_DATA.getCode());
+ assertThat(gr.getGranteeId()).isEqualTo(cacheEntry_R1.getEntity().getId());
+
assertThat(gr.getGranteeCatalogId()).isEqualTo(cacheEntry_R1.getEntity().getCatalogId());
+
assertThat(gr.getSecurableId()).isEqualTo(cacheEntry_N1.getEntity().getId());
+
assertThat(gr.getSecurableCatalogId()).isEqualTo(cacheEntry_N1.getEntity().getCatalogId());
+
assertThat(gr.getPrivilegeCode()).isEqualTo(PolarisPrivilege.TABLE_READ_DATA.getCode());
// R1 should have 4 privileges granted to it
- Assertions.assertThat(cacheEntry_R1.getGrantRecordsAsGrantee()).hasSize(4);
+ assertThat(cacheEntry_R1.getGrantRecordsAsGrantee()).hasSize(4);
List<PolarisGrantRecord> matchPriv =
cacheEntry_R1.getGrantRecordsAsGrantee().stream()
.filter(
grantRecord ->
grantRecord.getPrivilegeCode() ==
PolarisPrivilege.TABLE_READ_DATA.getCode())
.collect(Collectors.toList());
- Assertions.assertThat(matchPriv).hasSize(1);
+ assertThat(matchPriv).hasSize(1);
gr = matchPriv.get(0);
-
Assertions.assertThat(gr.getGranteeId()).isEqualTo(cacheEntry_R1.getEntity().getId());
- Assertions.assertThat(gr.getGranteeCatalogId())
- .isEqualTo(cacheEntry_R1.getEntity().getCatalogId());
-
Assertions.assertThat(gr.getSecurableId()).isEqualTo(cacheEntry_N1.getEntity().getId());
- Assertions.assertThat(gr.getSecurableCatalogId())
- .isEqualTo(cacheEntry_N1.getEntity().getCatalogId());
- Assertions.assertThat(gr.getPrivilegeCode())
- .isEqualTo(PolarisPrivilege.TABLE_READ_DATA.getCode());
+ assertThat(gr.getGranteeId()).isEqualTo(cacheEntry_R1.getEntity().getId());
+
assertThat(gr.getGranteeCatalogId()).isEqualTo(cacheEntry_R1.getEntity().getCatalogId());
+
assertThat(gr.getSecurableId()).isEqualTo(cacheEntry_N1.getEntity().getId());
+
assertThat(gr.getSecurableCatalogId()).isEqualTo(cacheEntry_N1.getEntity().getCatalogId());
+
assertThat(gr.getPrivilegeCode()).isEqualTo(PolarisPrivilege.TABLE_READ_DATA.getCode());
// lookup principal role PR1
EntityCacheByNameKey PR1_name =
new EntityCacheByNameKey(PolarisEntityType.PRINCIPAL_ROLE, "PR1");
lookup = cache.getOrLoadEntityByName(callCtx, PR1_name);
- Assertions.assertThat(lookup).isNotNull();
+ assertThat(lookup).isNotNull();
ResolvedPolarisEntity cacheEntry_PR1 = lookup.getCacheEntry();
- Assertions.assertThat(cacheEntry_PR1).isNotNull();
- Assertions.assertThat(cacheEntry_PR1.getEntity()).isNotNull();
-
Assertions.assertThat(cacheEntry_PR1.getGrantRecordsAsSecurable()).isNotNull();
-
Assertions.assertThat(cacheEntry_PR1.getGrantRecordsAsGrantee()).isNotNull();
+ assertThat(cacheEntry_PR1).isNotNull();
+ assertThat(cacheEntry_PR1.getEntity()).isNotNull();
+ assertThat(cacheEntry_PR1.getGrantRecordsAsSecurable()).isNotNull();
+ assertThat(cacheEntry_PR1.getGrantRecordsAsGrantee()).isNotNull();
// R1 should have 1 CATALOG_ROLE_USAGE privilege granted *on* it to PR1
-
Assertions.assertThat(cacheEntry_R1.getGrantRecordsAsSecurable()).hasSize(1);
+ assertThat(cacheEntry_R1.getGrantRecordsAsSecurable()).hasSize(1);
gr = cacheEntry_R1.getGrantRecordsAsSecurable().get(0);
-
Assertions.assertThat(gr.getSecurableId()).isEqualTo(cacheEntry_R1.getEntity().getId());
- Assertions.assertThat(gr.getSecurableCatalogId())
- .isEqualTo(cacheEntry_R1.getEntity().getCatalogId());
-
Assertions.assertThat(gr.getGranteeId()).isEqualTo(cacheEntry_PR1.getEntity().getId());
- Assertions.assertThat(gr.getGranteeCatalogId())
- .isEqualTo(cacheEntry_PR1.getEntity().getCatalogId());
- Assertions.assertThat(gr.getPrivilegeCode())
- .isEqualTo(PolarisPrivilege.CATALOG_ROLE_USAGE.getCode());
+
assertThat(gr.getSecurableId()).isEqualTo(cacheEntry_R1.getEntity().getId());
+
assertThat(gr.getSecurableCatalogId()).isEqualTo(cacheEntry_R1.getEntity().getCatalogId());
+
assertThat(gr.getGranteeId()).isEqualTo(cacheEntry_PR1.getEntity().getId());
+
assertThat(gr.getGranteeCatalogId()).isEqualTo(cacheEntry_PR1.getEntity().getCatalogId());
+
assertThat(gr.getPrivilegeCode()).isEqualTo(PolarisPrivilege.CATALOG_ROLE_USAGE.getCode());
// PR1 should have 1 grant on it to P1.
-
Assertions.assertThat(cacheEntry_PR1.getGrantRecordsAsSecurable()).hasSize(1);
-
Assertions.assertThat(cacheEntry_PR1.getGrantRecordsAsSecurable().get(0).getPrivilegeCode())
+ assertThat(cacheEntry_PR1.getGrantRecordsAsSecurable()).hasSize(1);
+
assertThat(cacheEntry_PR1.getGrantRecordsAsSecurable().get(0).getPrivilegeCode())
.isEqualTo(PolarisPrivilege.PRINCIPAL_ROLE_USAGE.getCode());
// PR1 should have 2 grants to it, on R1 and R2
-
Assertions.assertThat(cacheEntry_PR1.getGrantRecordsAsGrantee()).hasSize(2);
-
Assertions.assertThat(cacheEntry_PR1.getGrantRecordsAsGrantee().get(0).getPrivilegeCode())
+ assertThat(cacheEntry_PR1.getGrantRecordsAsGrantee()).hasSize(2);
+
assertThat(cacheEntry_PR1.getGrantRecordsAsGrantee().get(0).getPrivilegeCode())
.isEqualTo(PolarisPrivilege.CATALOG_ROLE_USAGE.getCode());
-
Assertions.assertThat(cacheEntry_PR1.getGrantRecordsAsGrantee().get(1).getPrivilegeCode())
+
assertThat(cacheEntry_PR1.getGrantRecordsAsGrantee().get(1).getPrivilegeCode())
.isEqualTo(PolarisPrivilege.CATALOG_ROLE_USAGE.getCode());
}
@@ -274,14 +280,14 @@ public class InMemoryEntityCacheTest {
EntityCacheLookupResult lookup =
cache.getOrLoadEntityByName(
this.callCtx, new EntityCacheByNameKey(PolarisEntityType.CATALOG,
"test"));
- Assertions.assertThat(lookup).isNotNull();
- Assertions.assertThat(lookup.isCacheHit()).isFalse();
+ assertThat(lookup).isNotNull();
+ assertThat(lookup.isCacheHit()).isFalse();
// the catalog
- Assertions.assertThat(lookup.getCacheEntry()).isNotNull();
+ assertThat(lookup.getCacheEntry()).isNotNull();
PolarisBaseEntity catalog = lookup.getCacheEntry().getEntity();
- Assertions.assertThat(catalog).isNotNull();
-
Assertions.assertThat(catalog.getType()).isEqualTo(PolarisEntityType.CATALOG);
+ assertThat(catalog).isNotNull();
+ assertThat(catalog.getType()).isEqualTo(PolarisEntityType.CATALOG);
// find table N5/N6/T6
PolarisBaseEntity N5 =
@@ -294,23 +300,23 @@ public class InMemoryEntityCacheTest {
PolarisEntityType.TABLE_LIKE,
PolarisEntitySubType.ICEBERG_TABLE,
"T6");
- Assertions.assertThat(T6v1).isNotNull();
+ assertThat(T6v1).isNotNull();
// that table is not in the cache
ResolvedPolarisEntity cacheEntry = cache.getEntityById(T6v1.getId());
- Assertions.assertThat(cacheEntry).isNull();
+ assertThat(cacheEntry).isNull();
// now load that table in the cache
cacheEntry =
cache.getAndRefreshIfNeeded(
this.callCtx, T6v1, T6v1.getEntityVersion(),
T6v1.getGrantRecordsVersion());
- Assertions.assertThat(cacheEntry).isNotNull();
- Assertions.assertThat(cacheEntry.getEntity()).isNotNull();
- Assertions.assertThat(cacheEntry.getGrantRecordsAsSecurable()).isNotNull();
+ assertThat(cacheEntry).isNotNull();
+ assertThat(cacheEntry.getEntity()).isNotNull();
+ assertThat(cacheEntry.getGrantRecordsAsSecurable()).isNotNull();
PolarisBaseEntity table = cacheEntry.getEntity();
- Assertions.assertThat(table.getId()).isEqualTo(T6v1.getId());
-
Assertions.assertThat(table.getEntityVersion()).isEqualTo(T6v1.getEntityVersion());
-
Assertions.assertThat(table.getGrantRecordsVersion()).isEqualTo(T6v1.getGrantRecordsVersion());
+ assertThat(table.getId()).isEqualTo(T6v1.getId());
+ assertThat(table.getEntityVersion()).isEqualTo(T6v1.getEntityVersion());
+
assertThat(table.getGrantRecordsVersion()).isEqualTo(T6v1.getGrantRecordsVersion());
// update the entity
PolarisBaseEntity T6v2 =
@@ -319,31 +325,31 @@ public class InMemoryEntityCacheTest {
T6v1,
"{\"v2_properties\": \"some value\"}",
"{\"v2_internal_properties\": \"internal value\"}");
- Assertions.assertThat(T6v2).isNotNull();
+ assertThat(T6v2).isNotNull();
// now refresh that entity. But because we don't change the versions,
nothing should be reloaded
cacheEntry =
cache.getAndRefreshIfNeeded(
this.callCtx, T6v1, T6v1.getEntityVersion(),
T6v1.getGrantRecordsVersion());
- Assertions.assertThat(cacheEntry).isNotNull();
- Assertions.assertThat(cacheEntry.getEntity()).isNotNull();
- Assertions.assertThat(cacheEntry.getGrantRecordsAsSecurable()).isNotNull();
+ assertThat(cacheEntry).isNotNull();
+ assertThat(cacheEntry.getEntity()).isNotNull();
+ assertThat(cacheEntry.getGrantRecordsAsSecurable()).isNotNull();
table = cacheEntry.getEntity();
- Assertions.assertThat(table.getId()).isEqualTo(T6v1.getId());
-
Assertions.assertThat(table.getEntityVersion()).isEqualTo(T6v1.getEntityVersion());
-
Assertions.assertThat(table.getGrantRecordsVersion()).isEqualTo(T6v1.getGrantRecordsVersion());
+ assertThat(table.getId()).isEqualTo(T6v1.getId());
+ assertThat(table.getEntityVersion()).isEqualTo(T6v1.getEntityVersion());
+
assertThat(table.getGrantRecordsVersion()).isEqualTo(T6v1.getGrantRecordsVersion());
// now refresh again, this time with the new versions. Should be reloaded
cacheEntry =
cache.getAndRefreshIfNeeded(
this.callCtx, T6v2, T6v2.getEntityVersion(),
T6v2.getGrantRecordsVersion());
- Assertions.assertThat(cacheEntry).isNotNull();
- Assertions.assertThat(cacheEntry.getEntity()).isNotNull();
- Assertions.assertThat(cacheEntry.getGrantRecordsAsSecurable()).isNotNull();
+ assertThat(cacheEntry).isNotNull();
+ assertThat(cacheEntry.getEntity()).isNotNull();
+ assertThat(cacheEntry.getGrantRecordsAsSecurable()).isNotNull();
table = cacheEntry.getEntity();
- Assertions.assertThat(table.getId()).isEqualTo(T6v2.getId());
-
Assertions.assertThat(table.getEntityVersion()).isEqualTo(T6v2.getEntityVersion());
-
Assertions.assertThat(table.getGrantRecordsVersion()).isEqualTo(T6v2.getGrantRecordsVersion());
+ assertThat(table.getId()).isEqualTo(T6v2.getId());
+ assertThat(table.getEntityVersion()).isEqualTo(T6v2.getEntityVersion());
+
assertThat(table.getGrantRecordsVersion()).isEqualTo(T6v2.getGrantRecordsVersion());
// update it again
PolarisBaseEntity T6v3 =
@@ -352,7 +358,7 @@ public class InMemoryEntityCacheTest {
T6v2,
"{\"v3_properties\": \"some value\"}",
"{\"v3_internal_properties\": \"internal value\"}");
- Assertions.assertThat(T6v3).isNotNull();
+ assertThat(T6v3).isNotNull();
// the two catalog roles
PolarisBaseEntity R1 =
@@ -368,9 +374,9 @@ public class InMemoryEntityCacheTest {
this.callCtx, N2, N2.getEntityVersion(),
N2.getGrantRecordsVersion());
// should have one single grant
- Assertions.assertThat(cacheEntry).isNotNull();
- Assertions.assertThat(cacheEntry.getGrantRecordsAsSecurable()).isNotNull();
- Assertions.assertThat(cacheEntry.getGrantRecordsAsSecurable()).hasSize(1);
+ assertThat(cacheEntry).isNotNull();
+ assertThat(cacheEntry.getGrantRecordsAsSecurable()).isNotNull();
+ assertThat(cacheEntry.getGrantRecordsAsSecurable()).hasSize(1);
// perform an additional grant to R1
this.tm.grantPrivilege(R1, List.of(catalog, N1), N2,
PolarisPrivilege.NAMESPACE_FULL_METADATA);
@@ -380,8 +386,8 @@ public class InMemoryEntityCacheTest {
this.tm.ensureExistsByName(List.of(catalog, N1),
PolarisEntityType.NAMESPACE, "N2");
// same entity version but different grant records
- Assertions.assertThat(N2v2).isNotNull();
-
Assertions.assertThat(N2v2.getGrantRecordsVersion()).isEqualTo(N2.getGrantRecordsVersion()
+ 1);
+ assertThat(N2v2).isNotNull();
+
assertThat(N2v2.getGrantRecordsVersion()).isEqualTo(N2.getGrantRecordsVersion()
+ 1);
// the cache is outdated now
lookup =
@@ -389,24 +395,24 @@ public class InMemoryEntityCacheTest {
this.callCtx,
new EntityCacheByNameKey(
catalog.getId(), N1.getId(), PolarisEntityType.NAMESPACE,
"N2"));
- Assertions.assertThat(lookup).isNotNull();
+ assertThat(lookup).isNotNull();
cacheEntry = lookup.getCacheEntry();
- Assertions.assertThat(cacheEntry).isNotNull();
- Assertions.assertThat(cacheEntry.getEntity()).isNotNull();
- Assertions.assertThat(cacheEntry.getGrantRecordsAsSecurable()).isNotNull();
- Assertions.assertThat(cacheEntry.getGrantRecordsAsSecurable()).hasSize(1);
- Assertions.assertThat(cacheEntry.getEntity().getGrantRecordsVersion())
+ assertThat(cacheEntry).isNotNull();
+ assertThat(cacheEntry.getEntity()).isNotNull();
+ assertThat(cacheEntry.getGrantRecordsAsSecurable()).isNotNull();
+ assertThat(cacheEntry.getGrantRecordsAsSecurable()).hasSize(1);
+ assertThat(cacheEntry.getEntity().getGrantRecordsVersion())
.isEqualTo(N2.getGrantRecordsVersion());
// now refresh
cacheEntry =
cache.getAndRefreshIfNeeded(
this.callCtx, N2, N2v2.getEntityVersion(),
N2v2.getGrantRecordsVersion());
- Assertions.assertThat(cacheEntry).isNotNull();
- Assertions.assertThat(cacheEntry.getEntity()).isNotNull();
- Assertions.assertThat(cacheEntry.getGrantRecordsAsSecurable()).isNotNull();
- Assertions.assertThat(cacheEntry.getGrantRecordsAsSecurable()).hasSize(2);
- Assertions.assertThat(cacheEntry.getEntity().getGrantRecordsVersion())
+ assertThat(cacheEntry).isNotNull();
+ assertThat(cacheEntry.getEntity()).isNotNull();
+ assertThat(cacheEntry.getGrantRecordsAsSecurable()).isNotNull();
+ assertThat(cacheEntry.getGrantRecordsAsSecurable()).hasSize(2);
+ assertThat(cacheEntry.getEntity().getGrantRecordsVersion())
.isEqualTo(N2v2.getGrantRecordsVersion());
}
@@ -418,23 +424,23 @@ public class InMemoryEntityCacheTest {
EntityCacheLookupResult lookup =
cache.getOrLoadEntityByName(
this.callCtx, new EntityCacheByNameKey(PolarisEntityType.CATALOG,
"test"));
- Assertions.assertThat(lookup).isNotNull();
- Assertions.assertThat(lookup.getCacheEntry()).isNotNull();
+ assertThat(lookup).isNotNull();
+ assertThat(lookup.getCacheEntry()).isNotNull();
PolarisBaseEntity catalog = lookup.getCacheEntry().getEntity();
PolarisBaseEntity N1 =
this.tm.ensureExistsByName(List.of(catalog),
PolarisEntityType.NAMESPACE, "N1");
lookup = cache.getOrLoadEntityById(this.callCtx, N1.getCatalogId(),
N1.getId(), N1.getType());
- Assertions.assertThat(lookup).isNotNull();
+ assertThat(lookup).isNotNull();
EntityCacheByNameKey T4_name =
new EntityCacheByNameKey(N1.getCatalogId(), N1.getId(),
PolarisEntityType.TABLE_LIKE, "T4");
lookup = cache.getOrLoadEntityByName(callCtx, T4_name);
- Assertions.assertThat(lookup).isNotNull();
+ assertThat(lookup).isNotNull();
ResolvedPolarisEntity cacheEntry_T4 = lookup.getCacheEntry();
- Assertions.assertThat(cacheEntry_T4).isNotNull();
- Assertions.assertThat(cacheEntry_T4.getEntity()).isNotNull();
-
Assertions.assertThat(cacheEntry_T4.getGrantRecordsAsSecurable()).isNotNull();
+ assertThat(cacheEntry_T4).isNotNull();
+ assertThat(cacheEntry_T4.getEntity()).isNotNull();
+ assertThat(cacheEntry_T4.getGrantRecordsAsSecurable()).isNotNull();
PolarisBaseEntity T4 = cacheEntry_T4.getEntity();
@@ -445,18 +451,17 @@ public class InMemoryEntityCacheTest {
new EntityCacheByNameKey(
N1.getCatalogId(), N1.getId(), PolarisEntityType.TABLE_LIKE,
"T4_renamed");
lookup = cache.getOrLoadEntityByName(callCtx, T4_renamed);
- Assertions.assertThat(lookup).isNotNull();
+ assertThat(lookup).isNotNull();
ResolvedPolarisEntity cacheEntry_T4_renamed = lookup.getCacheEntry();
- Assertions.assertThat(cacheEntry_T4_renamed).isNotNull();
+ assertThat(cacheEntry_T4_renamed).isNotNull();
PolarisBaseEntity T4_renamed_entity = cacheEntry_T4_renamed.getEntity();
// new entry if lookup by id
EntityCacheLookupResult lookupResult =
cache.getOrLoadEntityById(callCtx, T4.getCatalogId(), T4.getId(),
T4.getType());
- Assertions.assertThat(lookupResult).isNotNull();
- Assertions.assertThat(lookupResult.getCacheEntry()).isNotNull();
- Assertions.assertThat(lookupResult.getCacheEntry().getEntity().getName())
- .isEqualTo("T4_renamed");
+ assertThat(lookupResult).isNotNull();
+ assertThat(lookupResult.getCacheEntry()).isNotNull();
+
assertThat(lookupResult.getCacheEntry().getEntity().getName()).isEqualTo("T4_renamed");
// old name is gone, replaced by new name
// Assertions.assertNull(cache.getOrLoadEntityByName(callCtx, T4_name));
@@ -469,7 +474,7 @@ public class InMemoryEntityCacheTest {
T4_renamed_entity.getGrantRecordsVersion());
// now the loading by the old name should return null
- Assertions.assertThat(cache.getOrLoadEntityByName(callCtx,
T4_name)).isNull();
+ assertThat(cache.getOrLoadEntityByName(callCtx, T4_name)).isNull();
}
/* Helper for `testEntityWeigher` */
@@ -495,7 +500,584 @@ public class InMemoryEntityCacheTest {
.setMetadataLocation("a".repeat(1000 * 1000))
.build();
-
Assertions.assertThat(getEntityWeight(smallEntity)).isLessThan(getEntityWeight(mediumEntity));
-
Assertions.assertThat(getEntityWeight(mediumEntity)).isLessThan(getEntityWeight(largeEntity));
+
assertThat(getEntityWeight(smallEntity)).isLessThan(getEntityWeight(mediumEntity));
+
assertThat(getEntityWeight(mediumEntity)).isLessThan(getEntityWeight(largeEntity));
+ }
+
+ @Test
+ public void testBatchLoadByEntityIds() {
+ // get a new cache
+ InMemoryEntityCache cache = this.allocateNewCache();
+
+ // Load catalog into cache
+ EntityCacheLookupResult lookup =
+ cache.getOrLoadEntityByName(
+ this.callCtx, new EntityCacheByNameKey(PolarisEntityType.CATALOG,
"test"));
+ assertThat(lookup).isNotNull();
+ PolarisBaseEntity catalog = lookup.getCacheEntry().getEntity();
+
+ // Get some entities that exist in the test setup
+ PolarisBaseEntity N1 =
+ this.tm.ensureExistsByName(List.of(catalog),
PolarisEntityType.NAMESPACE, "N1");
+ PolarisBaseEntity N5 =
+ this.tm.ensureExistsByName(List.of(catalog),
PolarisEntityType.NAMESPACE, "N5");
+ PolarisBaseEntity N5_N6 =
+ this.tm.ensureExistsByName(List.of(catalog, N5),
PolarisEntityType.NAMESPACE, "N6");
+
+ // Pre-load N1 into cache
+ cache.getOrLoadEntityById(this.callCtx, N1.getCatalogId(), N1.getId(),
N1.getType());
+
+ // Create list of entity IDs - N1 is already cached, N5, N5_N6 are not
+ List<PolarisEntityId> entityIds =
+ List.of(getPolarisEntityId(N1), getPolarisEntityId(N5),
getPolarisEntityId(N5_N6));
+
+ // Test batch loading by entity IDs (all are namespaces)
+ List<EntityCacheLookupResult> results =
+ cache.getOrLoadResolvedEntities(this.callCtx,
PolarisEntityType.NAMESPACE, entityIds);
+
+ // Verify all entities were found
+ assertThat(results).hasSize(3);
+ assertThat(results.get(0)).isNotNull(); // N1 - was cached
+ assertThat(results.get(1)).isNotNull(); // N5 - was loaded
+ assertThat(results.get(2)).isNotNull(); // N5_N6 - was loaded
+
+ // Verify the entities are correct
+
assertThat(results.get(0).getCacheEntry().getEntity().getId()).isEqualTo(N1.getId());
+
assertThat(results.get(1).getCacheEntry().getEntity().getId()).isEqualTo(N5.getId());
+
assertThat(results.get(2).getCacheEntry().getEntity().getId()).isEqualTo(N5_N6.getId());
+
+ // All should be cache hits now since they were loaded in the previous call
+ assertThat(results.get(0).isCacheHit()).isTrue();
+ assertThat(results.get(1).isCacheHit()).isTrue();
+ assertThat(results.get(2).isCacheHit()).isTrue();
+
+ // Test with a non-existent entity ID
+ List<PolarisEntityId> nonExistentIds =
+ List.of(new PolarisEntityId(catalog.getCatalogId(), 99999L));
+ List<EntityCacheLookupResult> nonExistentResults =
+ cache.getOrLoadResolvedEntities(this.callCtx,
PolarisEntityType.NAMESPACE, nonExistentIds);
+
+ assertThat(nonExistentResults).hasSize(1);
+ assertThat(nonExistentResults.get(0)).isNull();
+
+ // Test with table entities separately
+ PolarisBaseEntity T6 =
+ this.tm.ensureExistsByName(
+ List.of(catalog, N5, N5_N6),
+ PolarisEntityType.TABLE_LIKE,
+ PolarisEntitySubType.ICEBERG_TABLE,
+ "T6");
+
+ List<PolarisEntityId> tableIds = List.of(getPolarisEntityId(T6));
+
+ List<EntityCacheLookupResult> tableResults =
+ cache.getOrLoadResolvedEntities(this.callCtx,
PolarisEntityType.TABLE_LIKE, tableIds);
+
+ assertThat(tableResults).hasSize(1);
+ assertThat(tableResults.get(0)).isNotNull();
+
assertThat(tableResults.get(0).getCacheEntry().getEntity().getId()).isEqualTo(T6.getId());
+ }
+
+ @Test
+ public void testBatchLoadWithStaleVersions() {
+ // get a new cache
+ InMemoryEntityCache cache = this.allocateNewCache();
+
+ // Load catalog into cache
+ EntityCacheLookupResult lookup =
+ cache.getOrLoadEntityByName(
+ this.callCtx, new EntityCacheByNameKey(PolarisEntityType.CATALOG,
"test"));
+ assertThat(lookup).isNotNull();
+ PolarisBaseEntity catalog = lookup.getCacheEntry().getEntity();
+
+ // Get table T6 that we can update
+ PolarisBaseEntity N5 =
+ this.tm.ensureExistsByName(List.of(catalog),
PolarisEntityType.NAMESPACE, "N5");
+ PolarisBaseEntity N5_N6 =
+ this.tm.ensureExistsByName(List.of(catalog, N5),
PolarisEntityType.NAMESPACE, "N6");
+ PolarisBaseEntity T6v1 =
+ this.tm.ensureExistsByName(
+ List.of(catalog, N5, N5_N6),
+ PolarisEntityType.TABLE_LIKE,
+ PolarisEntitySubType.ICEBERG_TABLE,
+ "T6");
+
+ // Load T6 into cache initially
+ cache.getOrLoadEntityById(this.callCtx, T6v1.getCatalogId(), T6v1.getId(),
T6v1.getType());
+
+ // Verify it's in cache with original version
+ ResolvedPolarisEntity cachedT6 = cache.getEntityById(T6v1.getId());
+ assertThat(cachedT6).isNotNull();
+
assertThat(cachedT6.getEntity().getEntityVersion()).isEqualTo(T6v1.getEntityVersion());
+
+ // Update the entity to create a new version
+ PolarisBaseEntity T6v2 =
+ this.tm.updateEntity(
+ List.of(catalog, N5, N5_N6),
+ T6v1,
+ "{\"v2_properties\": \"some value\"}",
+ "{\"v2_internal_properties\": \"internal value\"}");
+ assertThat(T6v2).isNotNull();
+ assertThat(T6v2.getEntityVersion()).isGreaterThan(T6v1.getEntityVersion());
+
+ List<EntityCacheLookupResult> results;
+ // Create entity ID list with the updated entity
+ List<PolarisEntityId> entityIds = List.of(getPolarisEntityId(T6v2));
+
+ // Call batch load - this should detect the stale version and reload
+ results =
+ cache.getOrLoadResolvedEntities(this.callCtx,
PolarisEntityType.TABLE_LIKE, entityIds);
+
+ // Verify the entity was reloaded with the new version
+ assertThat(results).hasSize(1);
+ assertThat(results.get(0)).isNotNull();
+
+ ResolvedPolarisEntity reloadedT6 = results.get(0).getCacheEntry();
+ assertThat(reloadedT6.getEntity().getId()).isEqualTo(T6v2.getId());
+
assertThat(reloadedT6.getEntity().getEntityVersion()).isEqualTo(T6v2.getEntityVersion());
+
+ // Verify the cache now contains the updated version
+ cachedT6 = cache.getEntityById(T6v2.getId());
+ assertThat(cachedT6).isNotNull();
+
assertThat(cachedT6.getEntity().getEntityVersion()).isEqualTo(T6v2.getEntityVersion());
+ }
+
+ @Test
+ public void testBatchLoadWithStaleGrantVersions() {
+ // get a new cache
+ InMemoryEntityCache cache = this.allocateNewCache();
+
+ // Load catalog into cache
+ EntityCacheLookupResult lookup =
+ cache.getOrLoadEntityByName(
+ this.callCtx, new EntityCacheByNameKey(PolarisEntityType.CATALOG,
"test"));
+ assertThat(lookup).isNotNull();
+ PolarisBaseEntity catalog = lookup.getCacheEntry().getEntity();
+
+ // Get entities we'll work with
+ PolarisBaseEntity N1 =
+ this.tm.ensureExistsByName(List.of(catalog),
PolarisEntityType.NAMESPACE, "N1");
+ PolarisBaseEntity N2 =
+ this.tm.ensureExistsByName(List.of(catalog, N1),
PolarisEntityType.NAMESPACE, "N2");
+ PolarisBaseEntity R1 =
+ this.tm.ensureExistsByName(List.of(catalog),
PolarisEntityType.CATALOG_ROLE, "R1");
+
+ // Load N2 into cache initially
+ cache.getOrLoadEntityByName(
+ this.callCtx,
+ new EntityCacheByNameKey(catalog.getId(), N1.getId(),
PolarisEntityType.NAMESPACE, "N2"));
+
+ // Verify it's in cache with original grant version
+ ResolvedPolarisEntity cachedN2 = cache.getEntityById(N2.getId());
+ assertThat(cachedN2).isNotNull();
+ int originalGrantVersion = cachedN2.getEntity().getGrantRecordsVersion();
+
+ // Grant additional privilege to change grant version
+ this.tm.grantPrivilege(R1, List.of(catalog, N1), N2,
PolarisPrivilege.NAMESPACE_FULL_METADATA);
+
+ // Get the updated entity with new grant version
+ PolarisBaseEntity N2Updated =
+ this.tm.ensureExistsByName(List.of(catalog, N1),
PolarisEntityType.NAMESPACE, "N2");
+
assertThat(N2Updated.getGrantRecordsVersion()).isGreaterThan(originalGrantVersion);
+
+ // Create entity ID list
+ List<PolarisEntityId> entityIds = List.of(getPolarisEntityId(N2Updated));
+
+ // Call batch load - this should detect the stale grant version and reload
+ List<EntityCacheLookupResult> results =
+ cache.getOrLoadResolvedEntities(this.callCtx,
PolarisEntityType.NAMESPACE, entityIds);
+
+ // Verify the entity was reloaded with the new grant version
+ assertThat(results).hasSize(1);
+ assertThat(results.get(0)).isNotNull();
+
+ ResolvedPolarisEntity reloadedN2 = results.get(0).getCacheEntry();
+ assertThat(reloadedN2.getEntity().getId()).isEqualTo(N2Updated.getId());
+ assertThat(reloadedN2.getEntity().getGrantRecordsVersion())
+ .isEqualTo(N2Updated.getGrantRecordsVersion());
+
+ // Should now have more grant records
+
assertThat(reloadedN2.getGrantRecordsAsSecurable().size()).isGreaterThan(1);
+
+ // Verify the cache now contains the updated grant version
+ cachedN2 = cache.getEntityById(N2Updated.getId());
+ assertThat(cachedN2).isNotNull();
+ assertThat(cachedN2.getEntity().getGrantRecordsVersion())
+ .isEqualTo(N2Updated.getGrantRecordsVersion());
+ }
+
+ @Test
+ public void testBatchLoadVersionRetryLogic() {
+ // get a new cache
+ PolarisMetaStoreManager metaStoreManager =
Mockito.spy(this.metaStoreManager);
+ InMemoryEntityCache cache =
+ new InMemoryEntityCache(diagServices, callCtx.getRealmConfig(),
metaStoreManager);
+
+ // Load catalog into cache
+ EntityCacheLookupResult lookup =
+ cache.getOrLoadEntityByName(
+ this.callCtx, new EntityCacheByNameKey(PolarisEntityType.CATALOG,
"test"));
+ assertThat(lookup).isNotNull();
+ PolarisBaseEntity catalog = lookup.getCacheEntry().getEntity();
+
+ // Get entities that we can work with
+ PolarisBaseEntity N1 =
+ this.tm.ensureExistsByName(List.of(catalog),
PolarisEntityType.NAMESPACE, "N1");
+ PolarisBaseEntity N2 =
+ this.tm.ensureExistsByName(List.of(catalog, N1),
PolarisEntityType.NAMESPACE, "N2");
+
+ // Load N2 into cache initially
+ cache.getOrLoadEntityByName(
+ this.callCtx,
+ new EntityCacheByNameKey(catalog.getId(), N1.getId(),
PolarisEntityType.NAMESPACE, "N2"));
+
+ // Verify it's in cache with original version
+ ResolvedPolarisEntity cachedN2 = cache.getEntityById(N2.getId());
+ assertThat(cachedN2).isNotNull();
+ int originalEntityVersion = cachedN2.getEntity().getEntityVersion();
+
+ // Update the entity multiple times to create version skew
+ PolarisBaseEntity N2v2 =
+ this.tm.updateEntity(List.of(catalog, N1), N2, "{\"v2\": \"value\"}",
null);
+
+ // the first call should return the first version, then we call the real
method to get the
+ // latest
+ Mockito.doReturn(
+ new ChangeTrackingResult(
+ List.of(
+ changeTrackingFor(catalog), changeTrackingFor(N1),
changeTrackingFor(N2v2))))
+ .when(metaStoreManager)
+ .loadEntitiesChangeTracking(Mockito.any(), Mockito.any());
+ Mockito.doCallRealMethod()
+ .when(metaStoreManager)
+ .loadEntitiesChangeTracking(Mockito.any(), Mockito.any());
+
+ // update again to create v3, which isn't returned in the change tracking
result
+ PolarisBaseEntity N2v3 =
+ this.tm.updateEntity(List.of(catalog, N1), N2v2, "{\"v3\":
\"value\"}", null);
+
+ // Verify versions increased
+ assertThat(N2v2.getEntityVersion()).isGreaterThan(originalEntityVersion);
+ assertThat(N2v3.getEntityVersion()).isGreaterThan(N2v2.getEntityVersion());
+
+ // Create entity ID list
+ List<PolarisEntityId> entityIds =
+ List.of(getPolarisEntityId(catalog), getPolarisEntityId(N1),
getPolarisEntityId(N2));
+
+ // Call batch load - this should detect the stale versions and reload
until consistent
+ List<EntityCacheLookupResult> results =
+ cache.getOrLoadResolvedEntities(this.callCtx,
PolarisEntityType.NAMESPACE, entityIds);
+
+ // Verify the entity was reloaded with the latest version
+ assertThat(results).hasSize(3);
+ assertThat(results)
+ .doesNotContainNull()
+ .extracting(EntityCacheLookupResult::getCacheEntry)
+ .doesNotContainNull()
+ .extracting(e -> e.getEntity().getId())
+ .containsExactly(catalog.getId(), N1.getId(), N2.getId());
+
+ ResolvedPolarisEntity reloadedN2 = results.get(2).getCacheEntry();
+ assertThat(reloadedN2.getEntity().getId()).isEqualTo(N2v3.getId());
+
assertThat(reloadedN2.getEntity().getEntityVersion()).isEqualTo(N2v3.getEntityVersion());
+
+ // Verify the cache now contains the latest version
+ cachedN2 = cache.getEntityById(N2v3.getId());
+ assertThat(cachedN2).isNotNull();
+
assertThat(cachedN2.getEntity().getEntityVersion()).isEqualTo(N2v3.getEntityVersion());
+ }
+
+ @Test
+ public void testBatchLoadVersionRetryFailsAfterMaxAttempts() {
+ // get a new cache
+ PolarisMetaStoreManager metaStoreManager =
Mockito.spy(this.metaStoreManager);
+ InMemoryEntityCache cache =
+ new InMemoryEntityCache(diagServices, callCtx.getRealmConfig(),
metaStoreManager);
+
+ // Load catalog into cache
+ EntityCacheLookupResult lookup =
+ cache.getOrLoadEntityByName(
+ this.callCtx, new EntityCacheByNameKey(PolarisEntityType.CATALOG,
"test"));
+ assertThat(lookup).isNotNull();
+ PolarisBaseEntity catalog = lookup.getCacheEntry().getEntity();
+
+ // Get entities that we can work with
+ PolarisBaseEntity N1 =
+ this.tm.ensureExistsByName(List.of(catalog),
PolarisEntityType.NAMESPACE, "N1");
+ PolarisBaseEntity N2 =
+ this.tm.ensureExistsByName(List.of(catalog, N1),
PolarisEntityType.NAMESPACE, "N2");
+
+ // Load N2 into cache initially
+ cache.getOrLoadEntityByName(
+ this.callCtx,
+ new EntityCacheByNameKey(catalog.getId(), N1.getId(),
PolarisEntityType.NAMESPACE, "N2"));
+
+ // Verify it's in cache with original version
+ ResolvedPolarisEntity cachedN2 = cache.getEntityById(N2.getId());
+ assertThat(cachedN2).isNotNull();
+ int originalEntityVersion = cachedN2.getEntity().getEntityVersion();
+
+ // Update the entity multiple times to create version skew
+ PolarisBaseEntity N2v2 =
+ this.tm.updateEntity(List.of(catalog, N1), N2, "{\"v2\": \"value\"}",
null);
+
+ // return v2 for the change tracking
+ Mockito.doReturn(
+ new ChangeTrackingResult(
+ List.of(
+ changeTrackingFor(catalog), changeTrackingFor(N1),
changeTrackingFor(N2v2))))
+ .when(metaStoreManager)
+ .loadEntitiesChangeTracking(Mockito.any(), Mockito.any());
+
+ // update again to create v3, which isn't returned in the change tracking
result
+ PolarisBaseEntity N2v3 =
+ this.tm.updateEntity(List.of(catalog, N1), N2v2, "{\"v3\":
\"value\"}", null);
+
+ // Verify versions increased
+ assertThat(N2v2.getEntityVersion()).isGreaterThan(originalEntityVersion);
+ assertThat(N2v3.getEntityVersion()).isGreaterThan(N2v2.getEntityVersion());
+
+ // Create entity ID list
+ List<PolarisEntityId> entityIds =
+ List.of(getPolarisEntityId(catalog), getPolarisEntityId(N1),
getPolarisEntityId(N2));
+ Assertions.assertThatThrownBy(
+ () ->
+ cache.getOrLoadResolvedEntities(
+ this.callCtx, PolarisEntityType.NAMESPACE, entityIds))
+ .isInstanceOf(RuntimeException.class);
+ }
+
+ private static PolarisEntityId getPolarisEntityId(PolarisBaseEntity catalog)
{
+ return new PolarisEntityId(catalog.getCatalogId(), catalog.getId());
+ }
+
+ @Test
+ public void testConcurrentClientLoadingBehavior() throws Exception {
+ // Load catalog into cache
+ EntityCacheLookupResult lookup =
+ allocateNewCache()
+ .getOrLoadEntityByName(
+ this.callCtx, new
EntityCacheByNameKey(PolarisEntityType.CATALOG, "test"));
+ assertThat(lookup).isNotNull();
+ PolarisBaseEntity catalog = lookup.getCacheEntry().getEntity();
+
+ // Get multiple entities to create a larger list for concurrent processing
+ PolarisBaseEntity N1 =
+ this.tm.ensureExistsByName(List.of(catalog),
PolarisEntityType.NAMESPACE, "N1");
+ PolarisBaseEntity N2 =
+ this.tm.ensureExistsByName(List.of(catalog, N1),
PolarisEntityType.NAMESPACE, "N2");
+ PolarisBaseEntity N3 =
+ this.tm.ensureExistsByName(List.of(catalog, N1),
PolarisEntityType.NAMESPACE, "N3");
+ PolarisBaseEntity N5 =
+ this.tm.ensureExistsByName(List.of(catalog),
PolarisEntityType.NAMESPACE, "N5");
+ PolarisBaseEntity N5_N6 =
+ this.tm.ensureExistsByName(List.of(catalog, N5),
PolarisEntityType.NAMESPACE, "N6");
+
+ // Create entity IDs list for both clients
+ List<PolarisEntityId> entityIds =
+ List.of(
+ getPolarisEntityId(N1),
+ getPolarisEntityId(N2),
+ getPolarisEntityId(N3),
+ getPolarisEntityId(N5),
+ getPolarisEntityId(N5_N6));
+
+ // Update one of the entities to create version differences
+ PolarisBaseEntity N2v2 =
+ this.tm.updateEntity(
+ List.of(catalog, N1), N2, "{\"concurrent_test\":
\"client1_version\"}", null);
+ PolarisBaseEntity N2v3 =
+ this.tm.updateEntity(
+ List.of(catalog, N1), N2v2, "{\"concurrent_test\":
\"client2_version\"}", null);
+
+ // Mock the metastore manager to control the timing of method calls
+ PolarisMetaStoreManager mockedMetaStoreManager =
Mockito.spy(this.metaStoreManager);
+
+ // Create caches with the mocked metastore manager
+ InMemoryEntityCache cache =
+ new InMemoryEntityCache(diagServices, callCtx.getRealmConfig(),
mockedMetaStoreManager);
+
+ // Synchronization primitives for controlling execution order
+ CountDownLatch client1ChangeTrackingResult = new CountDownLatch(1);
+ Semaphore client1ResolvedEntitiesBlocker = new Semaphore(0);
+
+ // Atomic references to capture results from both threads
+ AtomicReference<Exception> client1Exception = new AtomicReference<>();
+ AtomicReference<Exception> client2Exception = new AtomicReference<>();
+
+ // Configure mock behavior:
+ // 1. Allow both threads to call loadEntitiesChangeTracking() with
different versions
+ // 2. Block client1's loadResolvedEntities() until client2's
loadResolvedEntities() completes
+
+ // Mock loadEntitiesChangeTracking to return different versions for each
client
+ Mockito.doAnswer(
+ invocation -> {
+ // First call (client1) - returns older version for N2
+ LOGGER.debug("Returning change tracking for client1");
+ return new ChangeTrackingResult(
+ List.of(
+ changeTrackingFor(N1),
+ changeTrackingFor(N2v2), // older version
+ changeTrackingFor(N3),
+ changeTrackingFor(N5),
+ changeTrackingFor(N5_N6)));
+ })
+ .doAnswer(
+ invocation -> {
+ // Second call (client2) - returns newer version for N2
+ LOGGER.debug("Returning change tracking for client2");
+ return new ChangeTrackingResult(
+ List.of(
+ changeTrackingFor(N1),
+ changeTrackingFor(N2v3), // newer version
+ changeTrackingFor(N3),
+ changeTrackingFor(N5),
+ changeTrackingFor(N5_N6)));
+ })
+ .when(mockedMetaStoreManager)
+ .loadEntitiesChangeTracking(Mockito.any(), Mockito.any());
+
+ // Mock loadResolvedEntities to control timing - client1 blocks until
client2 completes
+ // client1 receives the older version of all entities, while client2
receives the newer version
+ // of N2
+ Answer client1Answer =
+ invocation -> {
+ // This is client1's loadResolvedEntities call - block until client2
completes
+ try {
+ client1ChangeTrackingResult.countDown();
+ LOGGER.debug("Awaiting client2 to complete resolved entities
load");
+ client1ResolvedEntitiesBlocker.acquire(); // Block until client2
signals completion
+ List<ResolvedPolarisEntity> resolvedEntities =
+ List.of(
+ getResolvedPolarisEntity(N1),
+ getResolvedPolarisEntity(N2v2),
+ getResolvedPolarisEntity(N3),
+ getResolvedPolarisEntity(N5),
+ getResolvedPolarisEntity(N5_N6));
+ LOGGER.debug("Client1 returning results {}", resolvedEntities);
+ return new ResolvedEntitiesResult(resolvedEntities);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ }
+ };
+ Answer client2Answer =
+ invocation -> {
+ // This is client2's loadResolvedEntities call - execute normally
and signal client1
+ try {
+ LOGGER.debug("Client2 loading resolved entities");
+ var result =
+ new ResolvedEntitiesResult(
+ List.of(
+ getResolvedPolarisEntity(N1),
+ getResolvedPolarisEntity(N2v3),
+ getResolvedPolarisEntity(N3),
+ getResolvedPolarisEntity(N5),
+ getResolvedPolarisEntity(N5_N6)));
+ client1ResolvedEntitiesBlocker.release(); // Allow client1 to
proceed
+ LOGGER.debug("Client2 returning results {}",
result.getResolvedEntities());
+ return result;
+ } catch (Exception e) {
+ client1ResolvedEntitiesBlocker.release(); // Release in case of
error
+ throw e;
+ }
+ };
+ Mockito.doAnswer(client1Answer)
+ .doAnswer(client2Answer)
+ .when(mockedMetaStoreManager)
+ .loadResolvedEntities(Mockito.any(), Mockito.any(), Mockito.any());
+
+ // ExecutorService isn't AutoCloseable in JDK 11 :(
+ ExecutorService executorService = Executors.newFixedThreadPool(2);
+ try {
+ // Client 1 task - should get older version and be blocked during
loadResolvedEntities
+ Future<List<EntityCacheLookupResult>> client1Task =
+ executorService.submit(
+ () -> {
+ try {
+ // Client1 calls getOrLoadResolvedEntities
+ // - loadEntitiesChangeTracking returns older version for N2
+ // - loadResolvedEntities will block until client2 completes
+ return cache.getOrLoadResolvedEntities(
+ this.callCtx, PolarisEntityType.NAMESPACE, entityIds);
+ } catch (Exception e) {
+ client1Exception.set(e);
+ return null;
+ }
+ });
+
+ // Client 2 task - should get newer version and complete first
+ Future<List<EntityCacheLookupResult>> client2Task =
+ executorService.submit(
+ () -> {
+ try {
+ // Client2 calls getOrLoadResolvedEntities
+ // - loadEntitiesChangeTracking returns newer version for N2
+ // - loadResolvedEntities executes normally and signals
client1 when done
+ client1ChangeTrackingResult.await();
+ return cache.getOrLoadResolvedEntities(
+ this.callCtx, PolarisEntityType.NAMESPACE, entityIds);
+ } catch (Exception e) {
+ client2Exception.set(e);
+ client1ResolvedEntitiesBlocker.release(); // Release in case
of error
+ return null;
+ }
+ });
+
+ // Wait for both tasks to complete
+ List<EntityCacheLookupResult> client1Results = client1Task.get();
+ List<EntityCacheLookupResult> client2Results = client2Task.get();
+
+ // Verify no exceptions occurred
+ assertThat(client1Exception.get()).isNull();
+ assertThat(client2Exception.get()).isNull();
+
+ // Verify both clients got results
+ assertThat(client1Results).isNotNull();
+ assertThat(client2Results).isNotNull();
+ assertThat(client1Results).hasSize(5);
+ assertThat(client2Results).hasSize(5);
+
+ // All entities should be found
+ assertThat(client1Results).doesNotContainNull();
+ assertThat(client2Results).doesNotContainNull();
+
+ // Verify that client1 got the older version of N2 (index 1 in the list)
+ ResolvedPolarisEntity client1N2 = client1Results.get(1).getCacheEntry();
+ assertThat(client1N2.getEntity().getId()).isEqualTo(N2.getId());
+
assertThat(client1N2.getEntity().getEntityVersion()).isEqualTo(N2v2.getEntityVersion());
+
+ // Verify that client2 got the newer version of N2
+ ResolvedPolarisEntity client2N2 = client2Results.get(1).getCacheEntry();
+ assertThat(client2N2.getEntity().getId()).isEqualTo(N2.getId());
+
assertThat(client2N2.getEntity().getEntityVersion()).isEqualTo(N2v3.getEntityVersion());
+
+ // Verify that both clients got consistent versions for other entities
+ for (int i = 0; i < 5; i++) {
+ if (i != 1) { // Skip N2 which we expect to be different
+ ResolvedPolarisEntity client1Entity =
client1Results.get(i).getCacheEntry();
+ ResolvedPolarisEntity client2Entity =
client2Results.get(i).getCacheEntry();
+
+ assertThat(client1Entity.getEntity().getId())
+ .isEqualTo(client2Entity.getEntity().getId());
+ assertThat(client1Entity.getEntity().getEntityVersion())
+ .isEqualTo(client2Entity.getEntity().getEntityVersion());
+ assertThat(client1Entity.getEntity().getGrantRecordsVersion())
+ .isEqualTo(client2Entity.getEntity().getGrantRecordsVersion());
+ }
+ }
+ assertThat(entityIds).extracting(id ->
cache.getEntityById(id.getId())).doesNotContainNull();
+ } finally {
+ executorService.shutdown();
+ }
+ }
+
+ private static ResolvedPolarisEntity
getResolvedPolarisEntity(PolarisBaseEntity catalog) {
+ return new ResolvedPolarisEntity(PolarisEntity.of(catalog), List.of(),
List.of());
+ }
+
+ private static PolarisChangeTrackingVersions
changeTrackingFor(PolarisBaseEntity entity) {
+ return new PolarisChangeTrackingVersions(
+ entity.getEntityVersion(), entity.getGrantRecordsVersion());
}
}
diff --git
a/polaris-core/src/testFixtures/java/org/apache/polaris/core/persistence/BasePolarisMetaStoreManagerTest.java
b/polaris-core/src/testFixtures/java/org/apache/polaris/core/persistence/BasePolarisMetaStoreManagerTest.java
index 43a0c5581..f8816260a 100644
---
a/polaris-core/src/testFixtures/java/org/apache/polaris/core/persistence/BasePolarisMetaStoreManagerTest.java
+++
b/polaris-core/src/testFixtures/java/org/apache/polaris/core/persistence/BasePolarisMetaStoreManagerTest.java
@@ -118,7 +118,7 @@ public abstract class BasePolarisMetaStoreManagerTest {
.containsExactly(PolarisEntity.toCore(task1),
PolarisEntity.toCore(task2));
List<PolarisBaseEntity> listedEntities =
- metaStoreManager.loadEntitiesAll(
+ metaStoreManager.listFullEntitiesAll(
polarisTestMetaStoreManager.polarisCallContext,
null,
PolarisEntityType.TASK,
@@ -238,6 +238,12 @@ public abstract class BasePolarisMetaStoreManagerTest {
polarisTestMetaStoreManager.testLookup();
}
+ /** test batch entity load */
+ @Test
+ protected void testLoadResolvedEntitiesById() {
+ polarisTestMetaStoreManager.testLoadResolvedEntitiesById();
+ }
+
/** Test the set of functions for the entity cache */
@Test
protected void testEntityCache() {
diff --git
a/polaris-core/src/testFixtures/java/org/apache/polaris/core/persistence/PolarisTestMetaStoreManager.java
b/polaris-core/src/testFixtures/java/org/apache/polaris/core/persistence/PolarisTestMetaStoreManager.java
index c8e486ee6..b32a4059d 100644
---
a/polaris-core/src/testFixtures/java/org/apache/polaris/core/persistence/PolarisTestMetaStoreManager.java
+++
b/polaris-core/src/testFixtures/java/org/apache/polaris/core/persistence/PolarisTestMetaStoreManager.java
@@ -23,6 +23,7 @@ import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.polaris.core.PolarisCallContext;
@@ -48,6 +49,7 @@ import
org.apache.polaris.core.persistence.dao.entity.EntityResult;
import org.apache.polaris.core.persistence.dao.entity.LoadGrantsResult;
import org.apache.polaris.core.persistence.dao.entity.LoadPolicyMappingsResult;
import org.apache.polaris.core.persistence.dao.entity.PolicyAttachmentResult;
+import org.apache.polaris.core.persistence.dao.entity.ResolvedEntitiesResult;
import org.apache.polaris.core.persistence.dao.entity.ResolvedEntityResult;
import org.apache.polaris.core.persistence.pagination.PageToken;
import org.apache.polaris.core.policy.PolarisPolicyMappingRecord;
@@ -55,6 +57,7 @@ import org.apache.polaris.core.policy.PolicyEntity;
import org.apache.polaris.core.policy.PolicyType;
import org.apache.polaris.core.policy.PredefinedPolicyTypes;
import org.assertj.core.api.Assertions;
+import org.assertj.core.api.InstanceOfAssertFactories;
/** Test the Polaris persistence layer */
public class PolarisTestMetaStoreManager {
@@ -650,6 +653,7 @@ public class PolarisTestMetaStoreManager {
.parentId(parentId)
.name(name)
.propertiesAsMap(properties)
+ .internalPropertiesAsMap(Map.of())
.build();
PolarisBaseEntity entity =
polarisMetaStoreManager
@@ -2685,6 +2689,114 @@ public class PolarisTestMetaStoreManager {
this.ensureNotExistsById(catalog.getId(), T1.getId(),
PolarisEntityType.NAMESPACE);
}
+ public void testLoadResolvedEntitiesById() {
+ // load all principals
+ List<EntityNameLookupRecord> principals =
+ polarisMetaStoreManager
+ .listEntities(
+ this.polarisCallContext,
+ null,
+ PolarisEntityType.PRINCIPAL,
+ PolarisEntitySubType.NULL_SUBTYPE,
+ PageToken.readEverything())
+ .getEntities();
+
+ // create new catalog
+ PolarisBaseEntity catalog =
+ new PolarisBaseEntity(
+ PolarisEntityConstants.getNullId(),
+
polarisMetaStoreManager.generateNewEntityId(this.polarisCallContext).getId(),
+ PolarisEntityType.CATALOG,
+ PolarisEntitySubType.NULL_SUBTYPE,
+ PolarisEntityConstants.getRootEntityId(),
+ "test");
+ CreateCatalogResult catalogCreated =
+ polarisMetaStoreManager.createCatalog(this.polarisCallContext,
catalog, List.of());
+ Assertions.assertThat(catalogCreated).isNotNull();
+
+ // load the catalog again, since the grant versions are different
+ catalog =
+ polarisMetaStoreManager
+ .loadEntity(
+ polarisCallContext,
+ 0L,
+ catalogCreated.getCatalog().getId(),
+ PolarisEntityType.CATALOG)
+ .getEntity();
+
+ // now create all objects
+ PolarisBaseEntity N1 = this.createEntity(List.of(catalog),
PolarisEntityType.NAMESPACE, "N1");
+ PolarisBaseEntity N1_N2 =
+ this.createEntity(List.of(catalog, N1), PolarisEntityType.NAMESPACE,
"N2");
+ PolarisBaseEntity T1 =
+ this.createEntity(
+ List.of(catalog, N1, N1_N2),
+ PolarisEntityType.TABLE_LIKE,
+ PolarisEntitySubType.ICEBERG_TABLE,
+ "T1");
+
+ // batch load all entities. They should all be present and non-null
+ ResolvedEntitiesResult entitiesResult =
+ polarisMetaStoreManager.loadResolvedEntities(
+ polarisCallContext,
+ PolarisEntityType.NAMESPACE,
+ List.of(
+ new PolarisEntityId(N1.getCatalogId(), N1.getId()),
+ new PolarisEntityId(N1_N2.getCatalogId(), N1_N2.getId())));
+ Assertions.assertThat(entitiesResult)
+ .isNotNull()
+ .returns(BaseResult.ReturnStatus.SUCCESS,
ResolvedEntitiesResult::getReturnStatus)
+ .extracting(
+ ResolvedEntitiesResult::getResolvedEntities,
+ InstanceOfAssertFactories.list(ResolvedPolarisEntity.class))
+ .hasSize(2)
+ .allSatisfy(entity -> Assertions.assertThat(entity).isNotNull())
+ .extracting(r -> getEntityCore(r.getEntity()))
+ .containsExactly(getEntityCore(N1), getEntityCore(N1_N2));
+
+ // try entities which do not exist
+ entitiesResult =
+ polarisMetaStoreManager.loadResolvedEntities(
+ polarisCallContext,
+ PolarisEntityType.CATALOG,
+ List.of(
+ new PolarisEntityId(catalog.getId(), 27),
+ new PolarisEntityId(catalog.getId(), 35)));
+ Assertions.assertThat(entitiesResult)
+ .isNotNull()
+ .returns(BaseResult.ReturnStatus.SUCCESS,
ResolvedEntitiesResult::getReturnStatus)
+ .extracting(
+ ResolvedEntitiesResult::getResolvedEntities,
+ InstanceOfAssertFactories.list(ResolvedPolarisEntity.class))
+ .hasSize(2)
+ .allSatisfy(entity -> Assertions.assertThat(entity).isNull());
+
+ // existing entities, some with wrong type
+ entitiesResult =
+ polarisMetaStoreManager.loadResolvedEntities(
+ polarisCallContext,
+ PolarisEntityType.NAMESPACE,
+ List.of(
+ new PolarisEntityId(catalog.getCatalogId(), catalog.getId()),
+ new PolarisEntityId(catalog.getId(), N1_N2.getId()),
+ new PolarisEntityId(catalog.getId(), T1.getId())));
+ Assertions.assertThat(entitiesResult)
+ .isNotNull()
+ .returns(BaseResult.ReturnStatus.SUCCESS,
ResolvedEntitiesResult::getReturnStatus)
+ .extracting(
+ ResolvedEntitiesResult::getResolvedEntities,
+ InstanceOfAssertFactories.list(ResolvedPolarisEntity.class))
+ .hasSize(3)
+ .filteredOn(Objects::nonNull)
+ .hasSize(1)
+ .extracting(r -> getEntityCore(r.getEntity()))
+ .containsExactly(getEntityCore(N1_N2));
+ }
+
+ private static PolarisEntityCore getEntityCore(PolarisBaseEntity entity) {
+ return new PolarisEntityCore.Builder<>(entity).build();
+ }
+
/** Test the set of functions for the entity cache */
public void testEntityCache() {
// create test catalog
diff --git
a/runtime/service/src/main/java/org/apache/polaris/service/admin/PolarisAdminService.java
b/runtime/service/src/main/java/org/apache/polaris/service/admin/PolarisAdminService.java
index b64394844..a80d28965 100644
---
a/runtime/service/src/main/java/org/apache/polaris/service/admin/PolarisAdminService.java
+++
b/runtime/service/src/main/java/org/apache/polaris/service/admin/PolarisAdminService.java
@@ -965,7 +965,7 @@ public class PolarisAdminService {
/** List all catalogs without checking for permission. */
private Stream<CatalogEntity> listCatalogsUnsafe() {
return metaStoreManager
- .loadEntitiesAll(
+ .listFullEntitiesAll(
getCurrentPolarisContext(),
null,
PolarisEntityType.CATALOG,
@@ -1212,7 +1212,7 @@ public class PolarisAdminService {
authorizeBasicRootOperationOrThrow(op);
return metaStoreManager
- .loadEntitiesAll(
+ .listFullEntitiesAll(
getCurrentPolarisContext(),
null,
PolarisEntityType.PRINCIPAL,
@@ -1319,7 +1319,7 @@ public class PolarisAdminService {
authorizeBasicRootOperationOrThrow(op);
return metaStoreManager
- .loadEntitiesAll(
+ .listFullEntitiesAll(
getCurrentPolarisContext(),
null,
PolarisEntityType.PRINCIPAL_ROLE,
@@ -1443,7 +1443,7 @@ public class PolarisAdminService {
CatalogEntity catalogEntity = getCatalogByName(resolutionManifest,
catalogName);
List<PolarisEntityCore> catalogPath =
PolarisEntity.toCoreList(List.of(catalogEntity));
return metaStoreManager
- .loadEntitiesAll(
+ .listFullEntitiesAll(
getCurrentPolarisContext(),
catalogPath,
PolarisEntityType.CATALOG_ROLE,
diff --git
a/runtime/service/src/main/java/org/apache/polaris/service/catalog/policy/PolicyCatalog.java
b/runtime/service/src/main/java/org/apache/polaris/service/catalog/policy/PolicyCatalog.java
index 8e4063b87..9b2f7c8a6 100644
---
a/runtime/service/src/main/java/org/apache/polaris/service/catalog/policy/PolicyCatalog.java
+++
b/runtime/service/src/main/java/org/apache/polaris/service/catalog/policy/PolicyCatalog.java
@@ -188,7 +188,7 @@ public class PolicyCatalog {
}
// with a policyType filter we need to load the full PolicyEntity to apply
the filter
return metaStoreManager
- .loadEntitiesAll(
+ .listFullEntitiesAll(
callContext.getPolarisCallContext(),
catalogPath,
PolarisEntityType.POLICY,