This is an automated email from the ASF dual-hosted git repository. collado pushed a commit to branch mcollado-loadentities-batch in repository https://gitbox.apache.org/repos/asf/polaris.git
commit abe9627ab09fa309e9779a1cc6409ffd07aa3832 Author: Michael Collado <[email protected]> AuthorDate: Tue Sep 23 13:25:42 2025 -0700 Add loadResolvedEntities by id and entity cache support --- .../apache/polaris/core/entity/PolarisEntity.java | 2 + .../AtomicOperationMetaStoreManager.java | 51 +- .../core/persistence/PolarisMetaStoreManager.java | 21 +- .../TransactionWorkspaceMetaStoreManager.java | 10 + .../core/persistence/cache/EntityCache.java | 35 + .../persistence/cache/InMemoryEntityCache.java | 165 +++- .../TransactionalMetaStoreManagerImpl.java | 43 +- .../persistence/cache/InMemoryEntityCacheTest.java | 845 +++++++++++++++++---- .../BasePolarisMetaStoreManagerTest.java | 2 +- .../polaris/service/admin/PolarisAdminService.java | 8 +- .../service/catalog/policy/PolicyCatalog.java | 2 +- 11 files changed, 1013 insertions(+), 171 deletions(-) diff --git a/polaris-core/src/main/java/org/apache/polaris/core/entity/PolarisEntity.java b/polaris-core/src/main/java/org/apache/polaris/core/entity/PolarisEntity.java index 98701af45..60a358538 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/entity/PolarisEntity.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/entity/PolarisEntity.java @@ -234,6 +234,8 @@ public class PolarisEntity extends PolarisBaseEntity { + getParentId() + ";entityVersion=" + getEntityVersion() + + ";grantVersion=" + + getGrantRecordsVersion() + ";type=" + getType() + ";subType=" diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/AtomicOperationMetaStoreManager.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/AtomicOperationMetaStoreManager.java index de298b70a..22b45c6e8 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/persistence/AtomicOperationMetaStoreManager.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/persistence/AtomicOperationMetaStoreManager.java @@ -1804,26 +1804,53 @@ public class AtomicOperationMetaStoreManager extends BaseMetaStoreManager { return entities.get(i); } }) - .map( - e -> { - if (e == null) { + .map(e -> toResolvedPolarisEntity(callCtx, e, ms)) + .collect(Collectors.toList()); + return new ResolvedEntitiesResult(ret); + } + + @Nonnull + @Override + public ResolvedEntitiesResult loadResolvedEntities( + @Nonnull PolarisCallContext callCtx, + @Nonnull PolarisEntityType entityType, + @Nonnull List<PolarisEntityId> entityIds) { + BasePersistence ms = callCtx.getMetaStore(); + List<PolarisBaseEntity> entities = ms.lookupEntities(callCtx, entityIds); + + // mimic the behavior of loadEntity above, return null if not found or type mismatch + List<ResolvedPolarisEntity> ret = + IntStream.range(0, entityIds.size()) + .mapToObj( + i -> { + if (entities.get(i) != null && !entities.get(i).getType().equals(entityType)) { return null; } else { - // load the grant records - final List<PolarisGrantRecord> grantRecordsAsSecurable = - ms.loadAllGrantRecordsOnSecurable(callCtx, e.getCatalogId(), e.getId()); - final List<PolarisGrantRecord> grantRecordsAsGrantee = - e.getType().isGrantee() - ? ms.loadAllGrantRecordsOnGrantee(callCtx, e.getCatalogId(), e.getId()) - : List.of(); - return new ResolvedPolarisEntity( - PolarisEntity.of(e), grantRecordsAsGrantee, grantRecordsAsSecurable); + return entities.get(i); } }) + .map(e -> toResolvedPolarisEntity(callCtx, e, ms)) .collect(Collectors.toList()); return new ResolvedEntitiesResult(ret); } + private static ResolvedPolarisEntity toResolvedPolarisEntity( + PolarisCallContext callCtx, PolarisBaseEntity e, BasePersistence ms) { + if (e == null) { + return null; + } else { + // load the grant records + final List<PolarisGrantRecord> grantRecordsAsSecurable = + ms.loadAllGrantRecordsOnSecurable(callCtx, e.getCatalogId(), e.getId()); + final List<PolarisGrantRecord> grantRecordsAsGrantee = + e.getType().isGrantee() + ? ms.loadAllGrantRecordsOnGrantee(callCtx, e.getCatalogId(), e.getId()) + : List.of(); + return new ResolvedPolarisEntity( + PolarisEntity.of(e), grantRecordsAsGrantee, grantRecordsAsSecurable); + } + } + /** {@inheritDoc} */ @Override public @Nonnull ResolvedEntityResult refreshResolvedEntity( diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/PolarisMetaStoreManager.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/PolarisMetaStoreManager.java index 0d2857743..245cd6dcc 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/persistence/PolarisMetaStoreManager.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/persistence/PolarisMetaStoreManager.java @@ -137,7 +137,7 @@ public interface PolarisMetaStoreManager /** * Load full entities matching the given criteria with pagination. If only the entity name/id/type * is required, use {@link #listEntities} instead. If no pagination is required, use {@link - * #loadFullEntitiesAll} instead. + * #listFullEntitiesAll} instead. * * @param callCtx call context * @param catalogPath path inside a catalog. If null or empty, the entities to list are top-level, @@ -166,7 +166,7 @@ public interface PolarisMetaStoreManager * @param entitySubType subType of entities to list (or ANY_SUBTYPE) * @return list of all matching entities */ - default @Nonnull List<PolarisBaseEntity> loadFullEntitiesAll( + default @Nonnull List<PolarisBaseEntity> listFullEntitiesAll( @Nonnull PolarisCallContext callCtx, @Nullable List<PolarisEntityCore> catalogPath, @Nonnull PolarisEntityType entityType, @@ -434,6 +434,23 @@ public interface PolarisMetaStoreManager @Nonnull PolarisCallContext callCtx, @Nonnull List<EntityNameLookupRecord> entityLookupRecords); + /** + * Load a batch of resolved entities of a specified entity type given their {@link + * PolarisEntityId}. Will return an empty list if the input list is empty. Order in that returned + * list is the same as the input list. Some elements might be NULL if the entity has been dropped. + * + * @param callCtx call context + * @param entityType the type of entities to load + * @param entityIds the list of entity ids to load + * @return a non-null list of entities corresponding to the lookup keys. Some elements might be + * NULL if the entity has been dropped. + */ + @Nonnull + ResolvedEntitiesResult loadResolvedEntities( + @Nonnull PolarisCallContext callCtx, + @Nonnull PolarisEntityType entityType, + @Nonnull List<PolarisEntityId> entityIds); + /** * Refresh a resolved entity from the backend store. Will return NULL if the entity does not * exist, i.e. has been purged or dropped. Else, will determine what has changed based on the diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/TransactionWorkspaceMetaStoreManager.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/TransactionWorkspaceMetaStoreManager.java index 238d29a89..add217f05 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/persistence/TransactionWorkspaceMetaStoreManager.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/persistence/TransactionWorkspaceMetaStoreManager.java @@ -389,6 +389,16 @@ public class TransactionWorkspaceMetaStoreManager implements PolarisMetaStoreMan return null; } + @Nonnull + @Override + public ResolvedEntitiesResult loadResolvedEntities( + @Nonnull PolarisCallContext callCtx, + @Nonnull PolarisEntityType entityType, + @Nonnull List<PolarisEntityId> entityIds) { + diagnostics.fail("illegal_method_in_transaction_workspace", "loadResolvedEntities"); + return null; + } + @Override public ResolvedEntityResult refreshResolvedEntity( @Nonnull PolarisCallContext callCtx, diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/cache/EntityCache.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/cache/EntityCache.java index cd438c995..93dc87ac9 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/persistence/cache/EntityCache.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/persistence/cache/EntityCache.java @@ -20,8 +20,11 @@ package org.apache.polaris.core.persistence.cache; import jakarta.annotation.Nonnull; import jakarta.annotation.Nullable; +import java.util.List; import org.apache.polaris.core.PolarisCallContext; +import org.apache.polaris.core.entity.EntityNameLookupRecord; import org.apache.polaris.core.entity.PolarisBaseEntity; +import org.apache.polaris.core.entity.PolarisEntityId; import org.apache.polaris.core.entity.PolarisEntityType; import org.apache.polaris.core.persistence.ResolvedPolarisEntity; @@ -80,4 +83,36 @@ public interface EntityCache { @Nullable EntityCacheLookupResult getOrLoadEntityByName( @Nonnull PolarisCallContext callContext, @Nonnull EntityCacheByNameKey entityNameKey); + + /** + * Load multiple entities by id, returning those found in the cache and loading those not found. + * + * @param callCtx the Polaris call context + * @param entityType the entity type + * @param entityIds the list of entity ids to load + * @return the list of resolved entities, in the same order as the requested entity ids. As in + * {@link + * org.apache.polaris.core.persistence.PolarisMetaStoreManager#loadResolvedEntities(PolarisCallContext, + * PolarisEntityType, List)}, elements in the returned list may be null if the corresponding + * entity id does not exist. + */ + List<EntityCacheLookupResult> getOrLoadResolvedEntities( + @Nonnull PolarisCallContext callCtx, + @Nonnull PolarisEntityType entityType, + @Nonnull List<PolarisEntityId> entityIds); + + /** + * Load multiple entities by {@link EntityNameLookupRecord}, returning those found in the cache + * and loading those not found. + * + * @param callCtx the Polaris call context + * @param lookupRecords the list of entity name to load + * @return the list of resolved entities, in the same order as the requested entity records. As in + * {@link + * org.apache.polaris.core.persistence.PolarisMetaStoreManager#loadResolvedEntities(PolarisCallContext, + * PolarisEntityType, List)}, elements in the returned list may be null if the corresponding + * entity id does not exist. + */ + List<EntityCacheLookupResult> getOrLoadResolvedEntities( + @Nonnull PolarisCallContext callCtx, @Nonnull List<EntityNameLookupRecord> lookupRecords); } diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/cache/InMemoryEntityCache.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/cache/InMemoryEntityCache.java index efac734ed..59ecb1977 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/persistence/cache/InMemoryEntityCache.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/persistence/cache/InMemoryEntityCache.java @@ -24,24 +24,39 @@ import com.github.benmanes.caffeine.cache.RemovalListener; import jakarta.annotation.Nonnull; import jakarta.annotation.Nullable; import java.util.AbstractMap; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import java.util.stream.Collectors; import org.apache.polaris.core.PolarisCallContext; import org.apache.polaris.core.PolarisDiagnostics; import org.apache.polaris.core.config.BehaviorChangeConfiguration; import org.apache.polaris.core.config.FeatureConfiguration; import org.apache.polaris.core.config.RealmConfig; +import org.apache.polaris.core.entity.EntityNameLookupRecord; import org.apache.polaris.core.entity.PolarisBaseEntity; +import org.apache.polaris.core.entity.PolarisChangeTrackingVersions; +import org.apache.polaris.core.entity.PolarisEntityId; import org.apache.polaris.core.entity.PolarisEntityType; import org.apache.polaris.core.entity.PolarisGrantRecord; import org.apache.polaris.core.persistence.PolarisMetaStoreManager; import org.apache.polaris.core.persistence.ResolvedPolarisEntity; +import org.apache.polaris.core.persistence.dao.entity.ChangeTrackingResult; +import org.apache.polaris.core.persistence.dao.entity.ResolvedEntitiesResult; import org.apache.polaris.core.persistence.dao.entity.ResolvedEntityResult; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** An in-memory entity cache with a limit of 100k entities and a 1h TTL. */ public class InMemoryEntityCache implements EntityCache { - + private static final Logger LOGGER = LoggerFactory.getLogger(InMemoryEntityCache.class); private EntityCacheMode cacheMode; private final PolarisDiagnostics diagnostics; private final PolarisMetaStoreManager polarisMetaStoreManager; @@ -473,4 +488,152 @@ public class InMemoryEntityCache implements EntityCache { // return what we found return new EntityCacheLookupResult(entry, cacheHit); } + + @Override + public List<EntityCacheLookupResult> getOrLoadResolvedEntities( + @Nonnull PolarisCallContext callCtx, + @Nonnull PolarisEntityType entityType, + @Nonnull List<PolarisEntityId> entityIds) { + // use a map to collect cached entries to avoid concurrency problems in case a second thread is + // trying to populate + // the cache from a different snapshot + Map<PolarisEntityId, ResolvedPolarisEntity> resolvedEntities = new HashMap<>(); + for (int i = 0; i < 100; i++) { + Function<List<PolarisEntityId>, ResolvedEntitiesResult> loaderFunc = + idsToLoad -> polarisMetaStoreManager.loadResolvedEntities(callCtx, entityType, idsToLoad); + if (isCacheStateValid(callCtx, resolvedEntities, entityIds, loaderFunc)) { + break; + } + } + + return entityIds.stream() + .map( + id -> { + ResolvedPolarisEntity entity = resolvedEntities.get(id); + return entity == null ? null : new EntityCacheLookupResult(entity, true); + }) + .collect(Collectors.toList()); + } + + @Override + public List<EntityCacheLookupResult> getOrLoadResolvedEntities( + @Nonnull PolarisCallContext callCtx, @Nonnull List<EntityNameLookupRecord> lookupRecords) { + Map<PolarisEntityId, EntityNameLookupRecord> entityIdMap = + lookupRecords.stream() + .collect( + Collectors.toMap( + e -> new PolarisEntityId(e.getCatalogId(), e.getId()), + Function.identity(), + (a, b) -> a)); + Function<List<PolarisEntityId>, ResolvedEntitiesResult> loaderFunc = + idsToLoad -> + polarisMetaStoreManager.loadResolvedEntities( + callCtx, idsToLoad.stream().map(entityIdMap::get).collect(Collectors.toList())); + + // use a map to collect cached entries to avoid concurrency problems in case a second thread is + // trying to populate + // the cache from a different snapshot + Map<PolarisEntityId, ResolvedPolarisEntity> resolvedEntities = new HashMap<>(); + List<PolarisEntityId> entityIds = + lookupRecords.stream() + .map(e -> new PolarisEntityId(e.getCatalogId(), e.getId())) + .collect(Collectors.toList()); + for (int i = 0; i < 100; i++) { + if (isCacheStateValid(callCtx, resolvedEntities, entityIds, loaderFunc)) { + break; + } + } + + return lookupRecords.stream() + .map( + lookupRecord -> { + ResolvedPolarisEntity entity = + resolvedEntities.get( + new PolarisEntityId(lookupRecord.getCatalogId(), lookupRecord.getId())); + return entity == null ? null : new EntityCacheLookupResult(entity, true); + }) + .collect(Collectors.toList()); + } + + private boolean isCacheStateValid( + @Nonnull PolarisCallContext callCtx, + @Nonnull Map<PolarisEntityId, ResolvedPolarisEntity> resolvedEntities, + @Nonnull List<PolarisEntityId> entityIds, + @Nonnull Function<List<PolarisEntityId>, ResolvedEntitiesResult> loaderFunc) { + ChangeTrackingResult changeTrackingResult = + polarisMetaStoreManager.loadEntitiesChangeTracking(callCtx, entityIds); + List<PolarisEntityId> idsToLoad = new ArrayList<>(); + if (changeTrackingResult.isSuccess()) { + idsToLoad.addAll(validateCacheEntries(entityIds, resolvedEntities, changeTrackingResult)); + } else { + idsToLoad.addAll(entityIds); + } + if (!idsToLoad.isEmpty()) { + ResolvedEntitiesResult resolvedEntitiesResult = loaderFunc.apply(idsToLoad); + if (resolvedEntitiesResult.isSuccess()) { + LOGGER.debug("Resolved entities - validating cache"); + resolvedEntitiesResult.getResolvedEntities().stream() + .filter(Objects::nonNull) + .forEach( + e -> { + this.cacheNewEntry(e); + resolvedEntities.put( + new PolarisEntityId(e.getEntity().getCatalogId(), e.getEntity().getId()), e); + }); + } + } + + // the loader function should always return a batch of results from the same "snapshot" of the + // persistence, so + // if the changeTracking call above failed, we should have loaded the entire batch in one shot. + // There should be no + // need to revalidate the entities. + List<PolarisEntityId> idsToReload = + changeTrackingResult.isSuccess() + ? validateCacheEntries(entityIds, resolvedEntities, changeTrackingResult) + : List.of(); + return idsToReload.isEmpty(); + } + + private List<PolarisEntityId> validateCacheEntries( + List<PolarisEntityId> entityIds, + Map<PolarisEntityId, ResolvedPolarisEntity> resolvedEntities, + ChangeTrackingResult changeTrackingResult) { + List<PolarisEntityId> idsToReload = new ArrayList<>(); + Iterator<PolarisEntityId> idIterator = entityIds.iterator(); + Iterator<PolarisChangeTrackingVersions> changeTrackingIterator = + changeTrackingResult.getChangeTrackingVersions().iterator(); + while (idIterator.hasNext() && changeTrackingIterator.hasNext()) { + PolarisEntityId entityId = idIterator.next(); + PolarisChangeTrackingVersions changeTrackingVersions = changeTrackingIterator.next(); + if (changeTrackingVersions == null) { + // entity has been purged + ResolvedPolarisEntity cachedEntity = getEntityById(entityId.getId()); + if (cachedEntity != null || resolvedEntities.containsKey(entityId)) { + LOGGER.debug("Entity {} has been purged, removing from cache", entityId); + Optional.ofNullable(cachedEntity).ifPresent(this::removeCacheEntry); + resolvedEntities.remove(entityId); + } + continue; + } + // compare versions using equals rather than less than so we can use the same function to + // validate that the cache + // entries are consistent with a single call to the change tracking table, rather than some + // grants ahead and some + // grants behind + ResolvedPolarisEntity cachedEntity = + resolvedEntities.computeIfAbsent(entityId, id -> this.getEntityById(id.getId())); + if (cachedEntity == null + || cachedEntity.getEntity().getEntityVersion() + != changeTrackingVersions.getEntityVersion() + || cachedEntity.getEntity().getGrantRecordsVersion() + != changeTrackingVersions.getGrantRecordsVersion()) { + idsToReload.add(entityId); + } else { + resolvedEntities.put(entityId, cachedEntity); + } + } + LOGGER.debug("Cache entries {} need to be reloaded", idsToReload); + return idsToReload; + } } diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TransactionalMetaStoreManagerImpl.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TransactionalMetaStoreManagerImpl.java index 45a02027e..899a82016 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TransactionalMetaStoreManagerImpl.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TransactionalMetaStoreManagerImpl.java @@ -2310,24 +2310,19 @@ public class TransactionalMetaStoreManagerImpl extends BaseMetaStoreManager { return result; } - /** Refer to {@link #loadEntity(PolarisCallContext, long, long, PolarisEntityType)} */ - private @Nonnull ResolvedEntitiesResult loadResolvedEntities( - @Nonnull PolarisCallContext callCtx, - @Nonnull TransactionalPersistence ms, - @Nonnull List<EntityNameLookupRecord> entityLookupRecords) { - List<PolarisBaseEntity> entities = - ms.lookupEntitiesInCurrentTxn( - callCtx, - entityLookupRecords.stream() - .map(r -> new PolarisEntityId(r.getCatalogId(), r.getId())) - .collect(Collectors.toList())); + private static ResolvedEntitiesResult getResolvedEntitiesResult( + PolarisCallContext callCtx, + TransactionalPersistence ms, + List<PolarisEntityId> entityIds, + Function<Integer, PolarisEntityType> entityTypeForIndex) { + List<PolarisBaseEntity> entities = ms.lookupEntitiesInCurrentTxn(callCtx, entityIds); // mimic the behavior of loadEntity above, return null if not found or type mismatch List<ResolvedPolarisEntity> ret = - IntStream.range(0, entityLookupRecords.size()) + IntStream.range(0, entityIds.size()) .mapToObj( i -> { if (entities.get(i) != null - && !entities.get(i).getType().equals(entityLookupRecords.get(i).getType())) { + && !entities.get(i).getType().equals(entityTypeForIndex.apply(i))) { return null; } else { return entities.get(i); @@ -2361,11 +2356,25 @@ public class TransactionalMetaStoreManagerImpl extends BaseMetaStoreManager { @Nonnull PolarisCallContext callCtx, @Nonnull List<EntityNameLookupRecord> entityLookupRecords) { TransactionalPersistence ms = ((TransactionalPersistence) callCtx.getMetaStore()); + List<PolarisEntityId> entityIds = + entityLookupRecords.stream() + .map(r -> new PolarisEntityId(r.getCatalogId(), r.getId())) + .collect(Collectors.toList()); + Function<Integer, PolarisEntityType> entityTypeForIndex = + i -> entityLookupRecords.get(i).getType(); return ms.runInReadTransaction( - callCtx, - () -> - this.loadResolvedEntities( - callCtx, (TransactionalPersistence) callCtx.getMetaStore(), entityLookupRecords)); + callCtx, () -> getResolvedEntitiesResult(callCtx, ms, entityIds, entityTypeForIndex)); + } + + @Nonnull + @Override + public ResolvedEntitiesResult loadResolvedEntities( + @Nonnull PolarisCallContext callCtx, + @Nonnull PolarisEntityType entityType, + @Nonnull List<PolarisEntityId> entityIds) { + TransactionalPersistence ms = ((TransactionalPersistence) callCtx.getMetaStore()); + return ms.runInReadTransaction( + callCtx, () -> getResolvedEntitiesResult(callCtx, ms, entityIds, i -> entityType)); } /** {@inheritDoc} */ diff --git a/polaris-core/src/test/java/org/apache/polaris/core/persistence/cache/InMemoryEntityCacheTest.java b/polaris-core/src/test/java/org/apache/polaris/core/persistence/cache/InMemoryEntityCacheTest.java index d750c4821..baf20c86a 100644 --- a/polaris-core/src/test/java/org/apache/polaris/core/persistence/cache/InMemoryEntityCacheTest.java +++ b/polaris-core/src/test/java/org/apache/polaris/core/persistence/cache/InMemoryEntityCacheTest.java @@ -19,16 +19,26 @@ package org.apache.polaris.core.persistence.cache; import static org.apache.polaris.core.persistence.PrincipalSecretsGenerator.RANDOM_SECRETS; +import static org.assertj.core.api.Assertions.assertThat; import java.time.Clock; import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.Semaphore; +import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.polaris.core.PolarisCallContext; import org.apache.polaris.core.PolarisDefaultDiagServiceImpl; import org.apache.polaris.core.PolarisDiagnostics; +import org.apache.polaris.core.entity.EntityNameLookupRecord; import org.apache.polaris.core.entity.PolarisBaseEntity; +import org.apache.polaris.core.entity.PolarisChangeTrackingVersions; import org.apache.polaris.core.entity.PolarisEntity; +import org.apache.polaris.core.entity.PolarisEntityId; import org.apache.polaris.core.entity.PolarisEntitySubType; import org.apache.polaris.core.entity.PolarisEntityType; import org.apache.polaris.core.entity.PolarisGrantRecord; @@ -37,17 +47,21 @@ import org.apache.polaris.core.entity.table.IcebergTableLikeEntity; import org.apache.polaris.core.persistence.PolarisMetaStoreManager; import org.apache.polaris.core.persistence.PolarisTestMetaStoreManager; import org.apache.polaris.core.persistence.ResolvedPolarisEntity; +import org.apache.polaris.core.persistence.dao.entity.ChangeTrackingResult; +import org.apache.polaris.core.persistence.dao.entity.ResolvedEntitiesResult; import org.apache.polaris.core.persistence.transactional.TransactionalMetaStoreManagerImpl; import org.apache.polaris.core.persistence.transactional.TransactionalPersistence; import org.apache.polaris.core.persistence.transactional.TreeMapMetaStore; import org.apache.polaris.core.persistence.transactional.TreeMapTransactionalPersistenceImpl; -import org.assertj.core.api.Assertions; import org.junit.jupiter.api.Test; import org.mockito.Mockito; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** Unit testing of the entity cache */ public class InMemoryEntityCacheTest { + public static final Logger LOGGER = LoggerFactory.getLogger(InMemoryEntityCache.class); private final PolarisDiagnostics diagServices; private final PolarisCallContext callCtx; private final PolarisTestMetaStoreManager tm; @@ -105,31 +119,31 @@ public class InMemoryEntityCacheTest { EntityCacheLookupResult lookup = cache.getOrLoadEntityByName( this.callCtx, new EntityCacheByNameKey(PolarisEntityType.CATALOG, "test")); - Assertions.assertThat(lookup).isNotNull(); - Assertions.assertThat(lookup.isCacheHit()).isFalse(); - Assertions.assertThat(lookup.getCacheEntry()).isNotNull(); + assertThat(lookup).isNotNull(); + assertThat(lookup.isCacheHit()).isFalse(); + assertThat(lookup.getCacheEntry()).isNotNull(); // validate the cache entry PolarisBaseEntity catalog = lookup.getCacheEntry().getEntity(); - Assertions.assertThat(catalog).isNotNull(); - Assertions.assertThat(catalog.getType()).isEqualTo(PolarisEntityType.CATALOG); + assertThat(catalog).isNotNull(); + assertThat(catalog.getType()).isEqualTo(PolarisEntityType.CATALOG); // do it again, should be found in the cache lookup = cache.getOrLoadEntityByName( this.callCtx, new EntityCacheByNameKey(PolarisEntityType.CATALOG, "test")); - Assertions.assertThat(lookup).isNotNull(); - Assertions.assertThat(lookup.isCacheHit()).isTrue(); + assertThat(lookup).isNotNull(); + assertThat(lookup.isCacheHit()).isTrue(); // do it again by id, should be found in the cache lookup = cache.getOrLoadEntityById( this.callCtx, catalog.getCatalogId(), catalog.getId(), catalog.getType()); - Assertions.assertThat(lookup).isNotNull(); - Assertions.assertThat(lookup.isCacheHit()).isTrue(); - Assertions.assertThat(lookup.getCacheEntry()).isNotNull(); - Assertions.assertThat(lookup.getCacheEntry().getEntity()).isNotNull(); - Assertions.assertThat(lookup.getCacheEntry().getGrantRecordsAsSecurable()).isNotNull(); + assertThat(lookup).isNotNull(); + assertThat(lookup.isCacheHit()).isTrue(); + assertThat(lookup.getCacheEntry()).isNotNull(); + assertThat(lookup.getCacheEntry().getEntity()).isNotNull(); + assertThat(lookup.getCacheEntry().getGrantRecordsAsSecurable()).isNotNull(); // get N1 PolarisBaseEntity N1 = @@ -140,128 +154,119 @@ public class InMemoryEntityCacheTest { new EntityCacheByNameKey( catalog.getId(), catalog.getId(), PolarisEntityType.NAMESPACE, "N1"); ResolvedPolarisEntity cacheEntry = cache.getEntityByName(N1_name); - Assertions.assertThat(cacheEntry).isNull(); + assertThat(cacheEntry).isNull(); // try to find it in the cache by id. Should not be there, i.e. no cache hit lookup = cache.getOrLoadEntityById(this.callCtx, N1.getCatalogId(), N1.getId(), N1.getType()); - Assertions.assertThat(lookup).isNotNull(); - Assertions.assertThat(lookup.isCacheHit()).isFalse(); + assertThat(lookup).isNotNull(); + assertThat(lookup.isCacheHit()).isFalse(); // should be there now, by name cacheEntry = cache.getEntityByName(N1_name); - Assertions.assertThat(cacheEntry).isNotNull(); - Assertions.assertThat(cacheEntry.getEntity()).isNotNull(); - Assertions.assertThat(cacheEntry.getGrantRecordsAsSecurable()).isNotNull(); + assertThat(cacheEntry).isNotNull(); + assertThat(cacheEntry.getEntity()).isNotNull(); + assertThat(cacheEntry.getGrantRecordsAsSecurable()).isNotNull(); // should be there now, by id cacheEntry = cache.getEntityById(N1.getId()); - Assertions.assertThat(cacheEntry).isNotNull(); - Assertions.assertThat(cacheEntry.getEntity()).isNotNull(); - Assertions.assertThat(cacheEntry.getGrantRecordsAsSecurable()).isNotNull(); + assertThat(cacheEntry).isNotNull(); + assertThat(cacheEntry.getEntity()).isNotNull(); + assertThat(cacheEntry.getGrantRecordsAsSecurable()).isNotNull(); // lookup N1 ResolvedPolarisEntity N1_entry = cache.getEntityById(N1.getId()); - Assertions.assertThat(N1_entry).isNotNull(); - Assertions.assertThat(N1_entry.getEntity()).isNotNull(); - Assertions.assertThat(N1_entry.getGrantRecordsAsSecurable()).isNotNull(); + assertThat(N1_entry).isNotNull(); + assertThat(N1_entry.getEntity()).isNotNull(); + assertThat(N1_entry.getGrantRecordsAsSecurable()).isNotNull(); // negative tests, load an entity which does not exist lookup = cache.getOrLoadEntityById(this.callCtx, N1.getCatalogId(), 10000, N1.getType()); - Assertions.assertThat(lookup).isNull(); + assertThat(lookup).isNull(); lookup = cache.getOrLoadEntityByName( this.callCtx, new EntityCacheByNameKey(PolarisEntityType.CATALOG, "non_existant_catalog")); - Assertions.assertThat(lookup).isNull(); + assertThat(lookup).isNull(); // lookup N2 to validate grants EntityCacheByNameKey N2_name = new EntityCacheByNameKey(catalog.getId(), N1.getId(), PolarisEntityType.NAMESPACE, "N2"); lookup = cache.getOrLoadEntityByName(callCtx, N2_name); - Assertions.assertThat(lookup).isNotNull(); + assertThat(lookup).isNotNull(); ResolvedPolarisEntity cacheEntry_N1 = lookup.getCacheEntry(); - Assertions.assertThat(cacheEntry_N1).isNotNull(); - Assertions.assertThat(cacheEntry_N1.getEntity()).isNotNull(); - Assertions.assertThat(cacheEntry_N1.getGrantRecordsAsSecurable()).isNotNull(); + assertThat(cacheEntry_N1).isNotNull(); + assertThat(cacheEntry_N1.getEntity()).isNotNull(); + assertThat(cacheEntry_N1.getGrantRecordsAsSecurable()).isNotNull(); // lookup catalog role R1 EntityCacheByNameKey R1_name = new EntityCacheByNameKey( catalog.getId(), catalog.getId(), PolarisEntityType.CATALOG_ROLE, "R1"); lookup = cache.getOrLoadEntityByName(callCtx, R1_name); - Assertions.assertThat(lookup).isNotNull(); + assertThat(lookup).isNotNull(); ResolvedPolarisEntity cacheEntry_R1 = lookup.getCacheEntry(); - Assertions.assertThat(cacheEntry_R1).isNotNull(); - Assertions.assertThat(cacheEntry_R1.getEntity()).isNotNull(); - Assertions.assertThat(cacheEntry_R1.getGrantRecordsAsSecurable()).isNotNull(); - Assertions.assertThat(cacheEntry_R1.getGrantRecordsAsGrantee()).isNotNull(); + assertThat(cacheEntry_R1).isNotNull(); + assertThat(cacheEntry_R1.getEntity()).isNotNull(); + assertThat(cacheEntry_R1.getGrantRecordsAsSecurable()).isNotNull(); + assertThat(cacheEntry_R1.getGrantRecordsAsGrantee()).isNotNull(); // we expect one TABLE_READ grant on that securable granted to the catalog role R1 - Assertions.assertThat(cacheEntry_N1.getGrantRecordsAsSecurable()).hasSize(1); + assertThat(cacheEntry_N1.getGrantRecordsAsSecurable()).hasSize(1); PolarisGrantRecord gr = cacheEntry_N1.getGrantRecordsAsSecurable().get(0); // securable is N1, grantee is R1 - Assertions.assertThat(gr.getGranteeId()).isEqualTo(cacheEntry_R1.getEntity().getId()); - Assertions.assertThat(gr.getGranteeCatalogId()) - .isEqualTo(cacheEntry_R1.getEntity().getCatalogId()); - Assertions.assertThat(gr.getSecurableId()).isEqualTo(cacheEntry_N1.getEntity().getId()); - Assertions.assertThat(gr.getSecurableCatalogId()) - .isEqualTo(cacheEntry_N1.getEntity().getCatalogId()); - Assertions.assertThat(gr.getPrivilegeCode()) - .isEqualTo(PolarisPrivilege.TABLE_READ_DATA.getCode()); + assertThat(gr.getGranteeId()).isEqualTo(cacheEntry_R1.getEntity().getId()); + assertThat(gr.getGranteeCatalogId()).isEqualTo(cacheEntry_R1.getEntity().getCatalogId()); + assertThat(gr.getSecurableId()).isEqualTo(cacheEntry_N1.getEntity().getId()); + assertThat(gr.getSecurableCatalogId()).isEqualTo(cacheEntry_N1.getEntity().getCatalogId()); + assertThat(gr.getPrivilegeCode()).isEqualTo(PolarisPrivilege.TABLE_READ_DATA.getCode()); // R1 should have 4 privileges granted to it - Assertions.assertThat(cacheEntry_R1.getGrantRecordsAsGrantee()).hasSize(4); + assertThat(cacheEntry_R1.getGrantRecordsAsGrantee()).hasSize(4); List<PolarisGrantRecord> matchPriv = cacheEntry_R1.getGrantRecordsAsGrantee().stream() .filter( grantRecord -> grantRecord.getPrivilegeCode() == PolarisPrivilege.TABLE_READ_DATA.getCode()) .collect(Collectors.toList()); - Assertions.assertThat(matchPriv).hasSize(1); + assertThat(matchPriv).hasSize(1); gr = matchPriv.get(0); - Assertions.assertThat(gr.getGranteeId()).isEqualTo(cacheEntry_R1.getEntity().getId()); - Assertions.assertThat(gr.getGranteeCatalogId()) - .isEqualTo(cacheEntry_R1.getEntity().getCatalogId()); - Assertions.assertThat(gr.getSecurableId()).isEqualTo(cacheEntry_N1.getEntity().getId()); - Assertions.assertThat(gr.getSecurableCatalogId()) - .isEqualTo(cacheEntry_N1.getEntity().getCatalogId()); - Assertions.assertThat(gr.getPrivilegeCode()) - .isEqualTo(PolarisPrivilege.TABLE_READ_DATA.getCode()); + assertThat(gr.getGranteeId()).isEqualTo(cacheEntry_R1.getEntity().getId()); + assertThat(gr.getGranteeCatalogId()).isEqualTo(cacheEntry_R1.getEntity().getCatalogId()); + assertThat(gr.getSecurableId()).isEqualTo(cacheEntry_N1.getEntity().getId()); + assertThat(gr.getSecurableCatalogId()).isEqualTo(cacheEntry_N1.getEntity().getCatalogId()); + assertThat(gr.getPrivilegeCode()).isEqualTo(PolarisPrivilege.TABLE_READ_DATA.getCode()); // lookup principal role PR1 EntityCacheByNameKey PR1_name = new EntityCacheByNameKey(PolarisEntityType.PRINCIPAL_ROLE, "PR1"); lookup = cache.getOrLoadEntityByName(callCtx, PR1_name); - Assertions.assertThat(lookup).isNotNull(); + assertThat(lookup).isNotNull(); ResolvedPolarisEntity cacheEntry_PR1 = lookup.getCacheEntry(); - Assertions.assertThat(cacheEntry_PR1).isNotNull(); - Assertions.assertThat(cacheEntry_PR1.getEntity()).isNotNull(); - Assertions.assertThat(cacheEntry_PR1.getGrantRecordsAsSecurable()).isNotNull(); - Assertions.assertThat(cacheEntry_PR1.getGrantRecordsAsGrantee()).isNotNull(); + assertThat(cacheEntry_PR1).isNotNull(); + assertThat(cacheEntry_PR1.getEntity()).isNotNull(); + assertThat(cacheEntry_PR1.getGrantRecordsAsSecurable()).isNotNull(); + assertThat(cacheEntry_PR1.getGrantRecordsAsGrantee()).isNotNull(); // R1 should have 1 CATALOG_ROLE_USAGE privilege granted *on* it to PR1 - Assertions.assertThat(cacheEntry_R1.getGrantRecordsAsSecurable()).hasSize(1); + assertThat(cacheEntry_R1.getGrantRecordsAsSecurable()).hasSize(1); gr = cacheEntry_R1.getGrantRecordsAsSecurable().get(0); - Assertions.assertThat(gr.getSecurableId()).isEqualTo(cacheEntry_R1.getEntity().getId()); - Assertions.assertThat(gr.getSecurableCatalogId()) - .isEqualTo(cacheEntry_R1.getEntity().getCatalogId()); - Assertions.assertThat(gr.getGranteeId()).isEqualTo(cacheEntry_PR1.getEntity().getId()); - Assertions.assertThat(gr.getGranteeCatalogId()) - .isEqualTo(cacheEntry_PR1.getEntity().getCatalogId()); - Assertions.assertThat(gr.getPrivilegeCode()) - .isEqualTo(PolarisPrivilege.CATALOG_ROLE_USAGE.getCode()); + assertThat(gr.getSecurableId()).isEqualTo(cacheEntry_R1.getEntity().getId()); + assertThat(gr.getSecurableCatalogId()).isEqualTo(cacheEntry_R1.getEntity().getCatalogId()); + assertThat(gr.getGranteeId()).isEqualTo(cacheEntry_PR1.getEntity().getId()); + assertThat(gr.getGranteeCatalogId()).isEqualTo(cacheEntry_PR1.getEntity().getCatalogId()); + assertThat(gr.getPrivilegeCode()).isEqualTo(PolarisPrivilege.CATALOG_ROLE_USAGE.getCode()); // PR1 should have 1 grant on it to P1. - Assertions.assertThat(cacheEntry_PR1.getGrantRecordsAsSecurable()).hasSize(1); - Assertions.assertThat(cacheEntry_PR1.getGrantRecordsAsSecurable().get(0).getPrivilegeCode()) + assertThat(cacheEntry_PR1.getGrantRecordsAsSecurable()).hasSize(1); + assertThat(cacheEntry_PR1.getGrantRecordsAsSecurable().get(0).getPrivilegeCode()) .isEqualTo(PolarisPrivilege.PRINCIPAL_ROLE_USAGE.getCode()); // PR1 should have 2 grants to it, on R1 and R2 - Assertions.assertThat(cacheEntry_PR1.getGrantRecordsAsGrantee()).hasSize(2); - Assertions.assertThat(cacheEntry_PR1.getGrantRecordsAsGrantee().get(0).getPrivilegeCode()) + assertThat(cacheEntry_PR1.getGrantRecordsAsGrantee()).hasSize(2); + assertThat(cacheEntry_PR1.getGrantRecordsAsGrantee().get(0).getPrivilegeCode()) .isEqualTo(PolarisPrivilege.CATALOG_ROLE_USAGE.getCode()); - Assertions.assertThat(cacheEntry_PR1.getGrantRecordsAsGrantee().get(1).getPrivilegeCode()) + assertThat(cacheEntry_PR1.getGrantRecordsAsGrantee().get(1).getPrivilegeCode()) .isEqualTo(PolarisPrivilege.CATALOG_ROLE_USAGE.getCode()); } @@ -274,14 +279,14 @@ public class InMemoryEntityCacheTest { EntityCacheLookupResult lookup = cache.getOrLoadEntityByName( this.callCtx, new EntityCacheByNameKey(PolarisEntityType.CATALOG, "test")); - Assertions.assertThat(lookup).isNotNull(); - Assertions.assertThat(lookup.isCacheHit()).isFalse(); + assertThat(lookup).isNotNull(); + assertThat(lookup.isCacheHit()).isFalse(); // the catalog - Assertions.assertThat(lookup.getCacheEntry()).isNotNull(); + assertThat(lookup.getCacheEntry()).isNotNull(); PolarisBaseEntity catalog = lookup.getCacheEntry().getEntity(); - Assertions.assertThat(catalog).isNotNull(); - Assertions.assertThat(catalog.getType()).isEqualTo(PolarisEntityType.CATALOG); + assertThat(catalog).isNotNull(); + assertThat(catalog.getType()).isEqualTo(PolarisEntityType.CATALOG); // find table N5/N6/T6 PolarisBaseEntity N5 = @@ -294,23 +299,23 @@ public class InMemoryEntityCacheTest { PolarisEntityType.TABLE_LIKE, PolarisEntitySubType.ICEBERG_TABLE, "T6"); - Assertions.assertThat(T6v1).isNotNull(); + assertThat(T6v1).isNotNull(); // that table is not in the cache ResolvedPolarisEntity cacheEntry = cache.getEntityById(T6v1.getId()); - Assertions.assertThat(cacheEntry).isNull(); + assertThat(cacheEntry).isNull(); // now load that table in the cache cacheEntry = cache.getAndRefreshIfNeeded( this.callCtx, T6v1, T6v1.getEntityVersion(), T6v1.getGrantRecordsVersion()); - Assertions.assertThat(cacheEntry).isNotNull(); - Assertions.assertThat(cacheEntry.getEntity()).isNotNull(); - Assertions.assertThat(cacheEntry.getGrantRecordsAsSecurable()).isNotNull(); + assertThat(cacheEntry).isNotNull(); + assertThat(cacheEntry.getEntity()).isNotNull(); + assertThat(cacheEntry.getGrantRecordsAsSecurable()).isNotNull(); PolarisBaseEntity table = cacheEntry.getEntity(); - Assertions.assertThat(table.getId()).isEqualTo(T6v1.getId()); - Assertions.assertThat(table.getEntityVersion()).isEqualTo(T6v1.getEntityVersion()); - Assertions.assertThat(table.getGrantRecordsVersion()).isEqualTo(T6v1.getGrantRecordsVersion()); + assertThat(table.getId()).isEqualTo(T6v1.getId()); + assertThat(table.getEntityVersion()).isEqualTo(T6v1.getEntityVersion()); + assertThat(table.getGrantRecordsVersion()).isEqualTo(T6v1.getGrantRecordsVersion()); // update the entity PolarisBaseEntity T6v2 = @@ -319,31 +324,31 @@ public class InMemoryEntityCacheTest { T6v1, "{\"v2_properties\": \"some value\"}", "{\"v2_internal_properties\": \"internal value\"}"); - Assertions.assertThat(T6v2).isNotNull(); + assertThat(T6v2).isNotNull(); // now refresh that entity. But because we don't change the versions, nothing should be reloaded cacheEntry = cache.getAndRefreshIfNeeded( this.callCtx, T6v1, T6v1.getEntityVersion(), T6v1.getGrantRecordsVersion()); - Assertions.assertThat(cacheEntry).isNotNull(); - Assertions.assertThat(cacheEntry.getEntity()).isNotNull(); - Assertions.assertThat(cacheEntry.getGrantRecordsAsSecurable()).isNotNull(); + assertThat(cacheEntry).isNotNull(); + assertThat(cacheEntry.getEntity()).isNotNull(); + assertThat(cacheEntry.getGrantRecordsAsSecurable()).isNotNull(); table = cacheEntry.getEntity(); - Assertions.assertThat(table.getId()).isEqualTo(T6v1.getId()); - Assertions.assertThat(table.getEntityVersion()).isEqualTo(T6v1.getEntityVersion()); - Assertions.assertThat(table.getGrantRecordsVersion()).isEqualTo(T6v1.getGrantRecordsVersion()); + assertThat(table.getId()).isEqualTo(T6v1.getId()); + assertThat(table.getEntityVersion()).isEqualTo(T6v1.getEntityVersion()); + assertThat(table.getGrantRecordsVersion()).isEqualTo(T6v1.getGrantRecordsVersion()); // now refresh again, this time with the new versions. Should be reloaded cacheEntry = cache.getAndRefreshIfNeeded( this.callCtx, T6v2, T6v2.getEntityVersion(), T6v2.getGrantRecordsVersion()); - Assertions.assertThat(cacheEntry).isNotNull(); - Assertions.assertThat(cacheEntry.getEntity()).isNotNull(); - Assertions.assertThat(cacheEntry.getGrantRecordsAsSecurable()).isNotNull(); + assertThat(cacheEntry).isNotNull(); + assertThat(cacheEntry.getEntity()).isNotNull(); + assertThat(cacheEntry.getGrantRecordsAsSecurable()).isNotNull(); table = cacheEntry.getEntity(); - Assertions.assertThat(table.getId()).isEqualTo(T6v2.getId()); - Assertions.assertThat(table.getEntityVersion()).isEqualTo(T6v2.getEntityVersion()); - Assertions.assertThat(table.getGrantRecordsVersion()).isEqualTo(T6v2.getGrantRecordsVersion()); + assertThat(table.getId()).isEqualTo(T6v2.getId()); + assertThat(table.getEntityVersion()).isEqualTo(T6v2.getEntityVersion()); + assertThat(table.getGrantRecordsVersion()).isEqualTo(T6v2.getGrantRecordsVersion()); // update it again PolarisBaseEntity T6v3 = @@ -352,7 +357,7 @@ public class InMemoryEntityCacheTest { T6v2, "{\"v3_properties\": \"some value\"}", "{\"v3_internal_properties\": \"internal value\"}"); - Assertions.assertThat(T6v3).isNotNull(); + assertThat(T6v3).isNotNull(); // the two catalog roles PolarisBaseEntity R1 = @@ -368,9 +373,9 @@ public class InMemoryEntityCacheTest { this.callCtx, N2, N2.getEntityVersion(), N2.getGrantRecordsVersion()); // should have one single grant - Assertions.assertThat(cacheEntry).isNotNull(); - Assertions.assertThat(cacheEntry.getGrantRecordsAsSecurable()).isNotNull(); - Assertions.assertThat(cacheEntry.getGrantRecordsAsSecurable()).hasSize(1); + assertThat(cacheEntry).isNotNull(); + assertThat(cacheEntry.getGrantRecordsAsSecurable()).isNotNull(); + assertThat(cacheEntry.getGrantRecordsAsSecurable()).hasSize(1); // perform an additional grant to R1 this.tm.grantPrivilege(R1, List.of(catalog, N1), N2, PolarisPrivilege.NAMESPACE_FULL_METADATA); @@ -380,8 +385,8 @@ public class InMemoryEntityCacheTest { this.tm.ensureExistsByName(List.of(catalog, N1), PolarisEntityType.NAMESPACE, "N2"); // same entity version but different grant records - Assertions.assertThat(N2v2).isNotNull(); - Assertions.assertThat(N2v2.getGrantRecordsVersion()).isEqualTo(N2.getGrantRecordsVersion() + 1); + assertThat(N2v2).isNotNull(); + assertThat(N2v2.getGrantRecordsVersion()).isEqualTo(N2.getGrantRecordsVersion() + 1); // the cache is outdated now lookup = @@ -389,24 +394,24 @@ public class InMemoryEntityCacheTest { this.callCtx, new EntityCacheByNameKey( catalog.getId(), N1.getId(), PolarisEntityType.NAMESPACE, "N2")); - Assertions.assertThat(lookup).isNotNull(); + assertThat(lookup).isNotNull(); cacheEntry = lookup.getCacheEntry(); - Assertions.assertThat(cacheEntry).isNotNull(); - Assertions.assertThat(cacheEntry.getEntity()).isNotNull(); - Assertions.assertThat(cacheEntry.getGrantRecordsAsSecurable()).isNotNull(); - Assertions.assertThat(cacheEntry.getGrantRecordsAsSecurable()).hasSize(1); - Assertions.assertThat(cacheEntry.getEntity().getGrantRecordsVersion()) + assertThat(cacheEntry).isNotNull(); + assertThat(cacheEntry.getEntity()).isNotNull(); + assertThat(cacheEntry.getGrantRecordsAsSecurable()).isNotNull(); + assertThat(cacheEntry.getGrantRecordsAsSecurable()).hasSize(1); + assertThat(cacheEntry.getEntity().getGrantRecordsVersion()) .isEqualTo(N2.getGrantRecordsVersion()); // now refresh cacheEntry = cache.getAndRefreshIfNeeded( this.callCtx, N2, N2v2.getEntityVersion(), N2v2.getGrantRecordsVersion()); - Assertions.assertThat(cacheEntry).isNotNull(); - Assertions.assertThat(cacheEntry.getEntity()).isNotNull(); - Assertions.assertThat(cacheEntry.getGrantRecordsAsSecurable()).isNotNull(); - Assertions.assertThat(cacheEntry.getGrantRecordsAsSecurable()).hasSize(2); - Assertions.assertThat(cacheEntry.getEntity().getGrantRecordsVersion()) + assertThat(cacheEntry).isNotNull(); + assertThat(cacheEntry.getEntity()).isNotNull(); + assertThat(cacheEntry.getGrantRecordsAsSecurable()).isNotNull(); + assertThat(cacheEntry.getGrantRecordsAsSecurable()).hasSize(2); + assertThat(cacheEntry.getEntity().getGrantRecordsVersion()) .isEqualTo(N2v2.getGrantRecordsVersion()); } @@ -418,23 +423,23 @@ public class InMemoryEntityCacheTest { EntityCacheLookupResult lookup = cache.getOrLoadEntityByName( this.callCtx, new EntityCacheByNameKey(PolarisEntityType.CATALOG, "test")); - Assertions.assertThat(lookup).isNotNull(); - Assertions.assertThat(lookup.getCacheEntry()).isNotNull(); + assertThat(lookup).isNotNull(); + assertThat(lookup.getCacheEntry()).isNotNull(); PolarisBaseEntity catalog = lookup.getCacheEntry().getEntity(); PolarisBaseEntity N1 = this.tm.ensureExistsByName(List.of(catalog), PolarisEntityType.NAMESPACE, "N1"); lookup = cache.getOrLoadEntityById(this.callCtx, N1.getCatalogId(), N1.getId(), N1.getType()); - Assertions.assertThat(lookup).isNotNull(); + assertThat(lookup).isNotNull(); EntityCacheByNameKey T4_name = new EntityCacheByNameKey(N1.getCatalogId(), N1.getId(), PolarisEntityType.TABLE_LIKE, "T4"); lookup = cache.getOrLoadEntityByName(callCtx, T4_name); - Assertions.assertThat(lookup).isNotNull(); + assertThat(lookup).isNotNull(); ResolvedPolarisEntity cacheEntry_T4 = lookup.getCacheEntry(); - Assertions.assertThat(cacheEntry_T4).isNotNull(); - Assertions.assertThat(cacheEntry_T4.getEntity()).isNotNull(); - Assertions.assertThat(cacheEntry_T4.getGrantRecordsAsSecurable()).isNotNull(); + assertThat(cacheEntry_T4).isNotNull(); + assertThat(cacheEntry_T4.getEntity()).isNotNull(); + assertThat(cacheEntry_T4.getGrantRecordsAsSecurable()).isNotNull(); PolarisBaseEntity T4 = cacheEntry_T4.getEntity(); @@ -445,18 +450,17 @@ public class InMemoryEntityCacheTest { new EntityCacheByNameKey( N1.getCatalogId(), N1.getId(), PolarisEntityType.TABLE_LIKE, "T4_renamed"); lookup = cache.getOrLoadEntityByName(callCtx, T4_renamed); - Assertions.assertThat(lookup).isNotNull(); + assertThat(lookup).isNotNull(); ResolvedPolarisEntity cacheEntry_T4_renamed = lookup.getCacheEntry(); - Assertions.assertThat(cacheEntry_T4_renamed).isNotNull(); + assertThat(cacheEntry_T4_renamed).isNotNull(); PolarisBaseEntity T4_renamed_entity = cacheEntry_T4_renamed.getEntity(); // new entry if lookup by id EntityCacheLookupResult lookupResult = cache.getOrLoadEntityById(callCtx, T4.getCatalogId(), T4.getId(), T4.getType()); - Assertions.assertThat(lookupResult).isNotNull(); - Assertions.assertThat(lookupResult.getCacheEntry()).isNotNull(); - Assertions.assertThat(lookupResult.getCacheEntry().getEntity().getName()) - .isEqualTo("T4_renamed"); + assertThat(lookupResult).isNotNull(); + assertThat(lookupResult.getCacheEntry()).isNotNull(); + assertThat(lookupResult.getCacheEntry().getEntity().getName()).isEqualTo("T4_renamed"); // old name is gone, replaced by new name // Assertions.assertNull(cache.getOrLoadEntityByName(callCtx, T4_name)); @@ -469,7 +473,7 @@ public class InMemoryEntityCacheTest { T4_renamed_entity.getGrantRecordsVersion()); // now the loading by the old name should return null - Assertions.assertThat(cache.getOrLoadEntityByName(callCtx, T4_name)).isNull(); + assertThat(cache.getOrLoadEntityByName(callCtx, T4_name)).isNull(); } /* Helper for `testEntityWeigher` */ @@ -490,7 +494,582 @@ public class InMemoryEntityCacheTest { .setMetadataLocation("a".repeat(1000 * 1000)) .build(); - Assertions.assertThat(getEntityWeight(smallEntity)).isLessThan(getEntityWeight(mediumEntity)); - Assertions.assertThat(getEntityWeight(mediumEntity)).isLessThan(getEntityWeight(largeEntity)); + assertThat(getEntityWeight(smallEntity)).isLessThan(getEntityWeight(mediumEntity)); + assertThat(getEntityWeight(mediumEntity)).isLessThan(getEntityWeight(largeEntity)); + } + + @Test + public void testBatchLoadByEntityIds() { + // get a new cache + InMemoryEntityCache cache = this.allocateNewCache(); + + // Load catalog into cache + EntityCacheLookupResult lookup = + cache.getOrLoadEntityByName( + this.callCtx, new EntityCacheByNameKey(PolarisEntityType.CATALOG, "test")); + assertThat(lookup).isNotNull(); + PolarisBaseEntity catalog = lookup.getCacheEntry().getEntity(); + + // Get some entities that exist in the test setup + PolarisBaseEntity N1 = + this.tm.ensureExistsByName(List.of(catalog), PolarisEntityType.NAMESPACE, "N1"); + PolarisBaseEntity N5 = + this.tm.ensureExistsByName(List.of(catalog), PolarisEntityType.NAMESPACE, "N5"); + PolarisBaseEntity N5_N6 = + this.tm.ensureExistsByName(List.of(catalog, N5), PolarisEntityType.NAMESPACE, "N6"); + + // Pre-load N1 into cache + cache.getOrLoadEntityById(this.callCtx, N1.getCatalogId(), N1.getId(), N1.getType()); + + // Create list of entity IDs - N1 is already cached, N5, N5_N6 are not + List<PolarisEntityId> entityIds = + List.of(getPolarisEntityId(N1), getPolarisEntityId(N5), getPolarisEntityId(N5_N6)); + + // Test batch loading by entity IDs (all are namespaces) + List<EntityCacheLookupResult> results = + cache.getOrLoadResolvedEntities(this.callCtx, PolarisEntityType.NAMESPACE, entityIds); + + // Verify all entities were found + assertThat(results).hasSize(3); + assertThat(results.get(0)).isNotNull(); // N1 - was cached + assertThat(results.get(1)).isNotNull(); // N5 - was loaded + assertThat(results.get(2)).isNotNull(); // N5_N6 - was loaded + + // Verify the entities are correct + assertThat(results.get(0).getCacheEntry().getEntity().getId()).isEqualTo(N1.getId()); + assertThat(results.get(1).getCacheEntry().getEntity().getId()).isEqualTo(N5.getId()); + assertThat(results.get(2).getCacheEntry().getEntity().getId()).isEqualTo(N5_N6.getId()); + + // All should be cache hits now since they were loaded in the previous call + assertThat(results.get(0).isCacheHit()).isTrue(); + assertThat(results.get(1).isCacheHit()).isTrue(); + assertThat(results.get(2).isCacheHit()).isTrue(); + + // Test with a non-existent entity ID + List<PolarisEntityId> nonExistentIds = + List.of(new PolarisEntityId(catalog.getCatalogId(), 99999L)); + List<EntityCacheLookupResult> nonExistentResults = + cache.getOrLoadResolvedEntities(this.callCtx, PolarisEntityType.NAMESPACE, nonExistentIds); + + assertThat(nonExistentResults).hasSize(1); + assertThat(nonExistentResults.get(0)).isNull(); + + // Test with table entities separately + PolarisBaseEntity T6 = + this.tm.ensureExistsByName( + List.of(catalog, N5, N5_N6), + PolarisEntityType.TABLE_LIKE, + PolarisEntitySubType.ICEBERG_TABLE, + "T6"); + + List<PolarisEntityId> tableIds = List.of(getPolarisEntityId(T6)); + + List<EntityCacheLookupResult> tableResults = + cache.getOrLoadResolvedEntities(this.callCtx, PolarisEntityType.TABLE_LIKE, tableIds); + + assertThat(tableResults).hasSize(1); + assertThat(tableResults.get(0)).isNotNull(); + assertThat(tableResults.get(0).getCacheEntry().getEntity().getId()).isEqualTo(T6.getId()); + } + + @Test + public void testBatchLoadByNameLookupRecords() { + // get a new cache + InMemoryEntityCache cache = this.allocateNewCache(); + + // Load catalog into cache + EntityCacheLookupResult lookup = + cache.getOrLoadEntityByName( + this.callCtx, new EntityCacheByNameKey(PolarisEntityType.CATALOG, "test")); + assertThat(lookup).isNotNull(); + PolarisBaseEntity catalog = lookup.getCacheEntry().getEntity(); + + // Get some entities that exist in the test setup + PolarisBaseEntity N1 = + this.tm.ensureExistsByName(List.of(catalog), PolarisEntityType.NAMESPACE, "N1"); + PolarisBaseEntity N2 = + this.tm.ensureExistsByName(List.of(catalog, N1), PolarisEntityType.NAMESPACE, "N2"); + PolarisBaseEntity R1 = + this.tm.ensureExistsByName(List.of(catalog), PolarisEntityType.CATALOG_ROLE, "R1"); + + // Pre-load N1 into cache by name + cache.getOrLoadEntityByName( + this.callCtx, + new EntityCacheByNameKey( + catalog.getId(), catalog.getId(), PolarisEntityType.NAMESPACE, "N1")); + + // Create list of EntityNameLookupRecords + List<EntityNameLookupRecord> lookupRecords = + List.of( + new EntityNameLookupRecord(N1), // already cached + new EntityNameLookupRecord(N2), // not cached + new EntityNameLookupRecord(R1) // not cached + ); + + // Test batch loading by name lookup records + List<EntityCacheLookupResult> results = + cache.getOrLoadResolvedEntities(this.callCtx, lookupRecords); + + // Verify all entities were found + assertThat(results).hasSize(3); + assertThat(results.get(0)).isNotNull(); // N1 - was cached + assertThat(results.get(1)).isNotNull(); // N2 - was loaded + assertThat(results.get(2)).isNotNull(); // R1 - was loaded + + // Verify the entities are correct + assertThat(results.get(0).getCacheEntry().getEntity().getId()).isEqualTo(N1.getId()); + assertThat(results.get(1).getCacheEntry().getEntity().getId()).isEqualTo(N2.getId()); + assertThat(results.get(2).getCacheEntry().getEntity().getId()).isEqualTo(R1.getId()); + + // All should be cache hits now + assertThat(results.get(0).isCacheHit()).isTrue(); + assertThat(results.get(1).isCacheHit()).isTrue(); + assertThat(results.get(2).isCacheHit()).isTrue(); + } + + @Test + public void testBatchLoadWithStaleVersions() { + // get a new cache + InMemoryEntityCache cache = this.allocateNewCache(); + + // Load catalog into cache + EntityCacheLookupResult lookup = + cache.getOrLoadEntityByName( + this.callCtx, new EntityCacheByNameKey(PolarisEntityType.CATALOG, "test")); + assertThat(lookup).isNotNull(); + PolarisBaseEntity catalog = lookup.getCacheEntry().getEntity(); + + // Get table T6 that we can update + PolarisBaseEntity N5 = + this.tm.ensureExistsByName(List.of(catalog), PolarisEntityType.NAMESPACE, "N5"); + PolarisBaseEntity N5_N6 = + this.tm.ensureExistsByName(List.of(catalog, N5), PolarisEntityType.NAMESPACE, "N6"); + PolarisBaseEntity T6v1 = + this.tm.ensureExistsByName( + List.of(catalog, N5, N5_N6), + PolarisEntityType.TABLE_LIKE, + PolarisEntitySubType.ICEBERG_TABLE, + "T6"); + + // Load T6 into cache initially + cache.getOrLoadEntityById(this.callCtx, T6v1.getCatalogId(), T6v1.getId(), T6v1.getType()); + + // Verify it's in cache with original version + ResolvedPolarisEntity cachedT6 = cache.getEntityById(T6v1.getId()); + assertThat(cachedT6).isNotNull(); + assertThat(cachedT6.getEntity().getEntityVersion()).isEqualTo(T6v1.getEntityVersion()); + + // Update the entity to create a new version + PolarisBaseEntity T6v2 = + this.tm.updateEntity( + List.of(catalog, N5, N5_N6), + T6v1, + "{\"v2_properties\": \"some value\"}", + "{\"v2_internal_properties\": \"internal value\"}"); + assertThat(T6v2).isNotNull(); + assertThat(T6v2.getEntityVersion()).isGreaterThan(T6v1.getEntityVersion()); + + // Create entity ID list with the updated entity + List<PolarisEntityId> entityIds = List.of(getPolarisEntityId(T6v2)); + + // Call batch load - this should detect the stale version and reload + List<EntityCacheLookupResult> results = + cache.getOrLoadResolvedEntities(this.callCtx, PolarisEntityType.TABLE_LIKE, entityIds); + + // Verify the entity was reloaded with the new version + assertThat(results).hasSize(1); + assertThat(results.get(0)).isNotNull(); + + ResolvedPolarisEntity reloadedT6 = results.get(0).getCacheEntry(); + assertThat(reloadedT6.getEntity().getId()).isEqualTo(T6v2.getId()); + assertThat(reloadedT6.getEntity().getEntityVersion()).isEqualTo(T6v2.getEntityVersion()); + + // Verify the cache now contains the updated version + cachedT6 = cache.getEntityById(T6v2.getId()); + assertThat(cachedT6).isNotNull(); + assertThat(cachedT6.getEntity().getEntityVersion()).isEqualTo(T6v2.getEntityVersion()); + } + + @Test + public void testBatchLoadWithStaleGrantVersions() { + // get a new cache + InMemoryEntityCache cache = this.allocateNewCache(); + + // Load catalog into cache + EntityCacheLookupResult lookup = + cache.getOrLoadEntityByName( + this.callCtx, new EntityCacheByNameKey(PolarisEntityType.CATALOG, "test")); + assertThat(lookup).isNotNull(); + PolarisBaseEntity catalog = lookup.getCacheEntry().getEntity(); + + // Get entities we'll work with + PolarisBaseEntity N1 = + this.tm.ensureExistsByName(List.of(catalog), PolarisEntityType.NAMESPACE, "N1"); + PolarisBaseEntity N2 = + this.tm.ensureExistsByName(List.of(catalog, N1), PolarisEntityType.NAMESPACE, "N2"); + PolarisBaseEntity R1 = + this.tm.ensureExistsByName(List.of(catalog), PolarisEntityType.CATALOG_ROLE, "R1"); + + // Load N2 into cache initially + cache.getOrLoadEntityByName( + this.callCtx, + new EntityCacheByNameKey(catalog.getId(), N1.getId(), PolarisEntityType.NAMESPACE, "N2")); + + // Verify it's in cache with original grant version + ResolvedPolarisEntity cachedN2 = cache.getEntityById(N2.getId()); + assertThat(cachedN2).isNotNull(); + int originalGrantVersion = cachedN2.getEntity().getGrantRecordsVersion(); + + // Grant additional privilege to change grant version + this.tm.grantPrivilege(R1, List.of(catalog, N1), N2, PolarisPrivilege.NAMESPACE_FULL_METADATA); + + // Get the updated entity with new grant version + PolarisBaseEntity N2Updated = + this.tm.ensureExistsByName(List.of(catalog, N1), PolarisEntityType.NAMESPACE, "N2"); + assertThat(N2Updated.getGrantRecordsVersion()).isGreaterThan(originalGrantVersion); + + // Create entity ID list + List<PolarisEntityId> entityIds = List.of(getPolarisEntityId(N2Updated)); + + // Call batch load - this should detect the stale grant version and reload + List<EntityCacheLookupResult> results = + cache.getOrLoadResolvedEntities(this.callCtx, PolarisEntityType.NAMESPACE, entityIds); + + // Verify the entity was reloaded with the new grant version + assertThat(results).hasSize(1); + assertThat(results.get(0)).isNotNull(); + + ResolvedPolarisEntity reloadedN2 = results.get(0).getCacheEntry(); + assertThat(reloadedN2.getEntity().getId()).isEqualTo(N2Updated.getId()); + assertThat(reloadedN2.getEntity().getGrantRecordsVersion()) + .isEqualTo(N2Updated.getGrantRecordsVersion()); + + // Should now have more grant records + assertThat(reloadedN2.getGrantRecordsAsSecurable().size()).isGreaterThan(1); + + // Verify the cache now contains the updated grant version + cachedN2 = cache.getEntityById(N2Updated.getId()); + assertThat(cachedN2).isNotNull(); + assertThat(cachedN2.getEntity().getGrantRecordsVersion()) + .isEqualTo(N2Updated.getGrantRecordsVersion()); + } + + @Test + public void testBatchLoadVersionRetryLogic() { + // get a new cache + PolarisMetaStoreManager metaStoreManager = Mockito.spy(this.metaStoreManager); + InMemoryEntityCache cache = + new InMemoryEntityCache(diagServices, callCtx.getRealmConfig(), metaStoreManager); + + // Load catalog into cache + EntityCacheLookupResult lookup = + cache.getOrLoadEntityByName( + this.callCtx, new EntityCacheByNameKey(PolarisEntityType.CATALOG, "test")); + assertThat(lookup).isNotNull(); + PolarisBaseEntity catalog = lookup.getCacheEntry().getEntity(); + + // Get entities that we can work with + PolarisBaseEntity N1 = + this.tm.ensureExistsByName(List.of(catalog), PolarisEntityType.NAMESPACE, "N1"); + PolarisBaseEntity N2 = + this.tm.ensureExistsByName(List.of(catalog, N1), PolarisEntityType.NAMESPACE, "N2"); + + // Load N2 into cache initially + cache.getOrLoadEntityByName( + this.callCtx, + new EntityCacheByNameKey(catalog.getId(), N1.getId(), PolarisEntityType.NAMESPACE, "N2")); + + // Verify it's in cache with original version + ResolvedPolarisEntity cachedN2 = cache.getEntityById(N2.getId()); + assertThat(cachedN2).isNotNull(); + int originalEntityVersion = cachedN2.getEntity().getEntityVersion(); + + // Update the entity multiple times to create version skew + PolarisBaseEntity N2v2 = + this.tm.updateEntity(List.of(catalog, N1), N2, "{\"v2\": \"value\"}", null); + + // the first call should return the first version, then we call the real method to get the + // latest + Mockito.doReturn( + new ChangeTrackingResult( + List.of( + changeTrackingFor(catalog), changeTrackingFor(N1), changeTrackingFor(N2v2)))) + .when(metaStoreManager) + .loadEntitiesChangeTracking(Mockito.any(), Mockito.any()); + Mockito.doCallRealMethod() + .when(metaStoreManager) + .loadEntitiesChangeTracking(Mockito.any(), Mockito.any()); + + // update again to create v3, which isn't returned in the change tracking result + PolarisBaseEntity N2v3 = + this.tm.updateEntity(List.of(catalog, N1), N2v2, "{\"v3\": \"value\"}", null); + + // Verify versions increased + assertThat(N2v2.getEntityVersion()).isGreaterThan(originalEntityVersion); + assertThat(N2v3.getEntityVersion()).isGreaterThan(N2v2.getEntityVersion()); + + // Create entity ID list + List<PolarisEntityId> entityIds = + List.of(getPolarisEntityId(catalog), getPolarisEntityId(N1), getPolarisEntityId(N2)); + + // Call batch load - this should detect the stale versions and reload until consistent + List<EntityCacheLookupResult> results = + cache.getOrLoadResolvedEntities(this.callCtx, PolarisEntityType.NAMESPACE, entityIds); + + // Verify the entity was reloaded with the latest version + assertThat(results).hasSize(3); + assertThat(results) + .doesNotContainNull() + .extracting(EntityCacheLookupResult::getCacheEntry) + .doesNotContainNull() + .extracting(e -> e.getEntity().getId()) + .containsExactly(catalog.getId(), N1.getId(), N2.getId()); + + ResolvedPolarisEntity reloadedN2 = results.get(2).getCacheEntry(); + assertThat(reloadedN2.getEntity().getId()).isEqualTo(N2v3.getId()); + assertThat(reloadedN2.getEntity().getEntityVersion()).isEqualTo(N2v3.getEntityVersion()); + + // Verify the cache now contains the latest version + cachedN2 = cache.getEntityById(N2v3.getId()); + assertThat(cachedN2).isNotNull(); + assertThat(cachedN2.getEntity().getEntityVersion()).isEqualTo(N2v3.getEntityVersion()); + } + + private static PolarisEntityId getPolarisEntityId(PolarisBaseEntity catalog) { + return new PolarisEntityId(catalog.getCatalogId(), catalog.getId()); + } + + @Test + public void testConcurrentClientLoadingBehavior() throws Exception { + // Load catalog into cache + EntityCacheLookupResult lookup = + allocateNewCache() + .getOrLoadEntityByName( + this.callCtx, new EntityCacheByNameKey(PolarisEntityType.CATALOG, "test")); + assertThat(lookup).isNotNull(); + PolarisBaseEntity catalog = lookup.getCacheEntry().getEntity(); + + // Get multiple entities to create a larger list for concurrent processing + PolarisBaseEntity N1 = + this.tm.ensureExistsByName(List.of(catalog), PolarisEntityType.NAMESPACE, "N1"); + PolarisBaseEntity N2 = + this.tm.ensureExistsByName(List.of(catalog, N1), PolarisEntityType.NAMESPACE, "N2"); + PolarisBaseEntity N3 = + this.tm.ensureExistsByName(List.of(catalog, N1), PolarisEntityType.NAMESPACE, "N3"); + PolarisBaseEntity N5 = + this.tm.ensureExistsByName(List.of(catalog), PolarisEntityType.NAMESPACE, "N5"); + PolarisBaseEntity N5_N6 = + this.tm.ensureExistsByName(List.of(catalog, N5), PolarisEntityType.NAMESPACE, "N6"); + + // Create entity IDs list for both clients + List<PolarisEntityId> entityIds = + List.of( + getPolarisEntityId(N1), + getPolarisEntityId(N2), + getPolarisEntityId(N3), + getPolarisEntityId(N5), + getPolarisEntityId(N5_N6)); + + // Update one of the entities to create version differences + PolarisBaseEntity N2v2 = + this.tm.updateEntity( + List.of(catalog, N1), N2, "{\"concurrent_test\": \"client1_version\"}", null); + PolarisBaseEntity N2v3 = + this.tm.updateEntity( + List.of(catalog, N1), N2v2, "{\"concurrent_test\": \"client2_version\"}", null); + + // Mock the metastore manager to control the timing of method calls + PolarisMetaStoreManager mockedMetaStoreManager = Mockito.spy(this.metaStoreManager); + + // Create caches with the mocked metastore manager + InMemoryEntityCache cache = + new InMemoryEntityCache(diagServices, callCtx.getRealmConfig(), mockedMetaStoreManager); + + // Synchronization primitives for controlling execution order + CountDownLatch client1ChangeTrackingResult = new CountDownLatch(1); + Semaphore client1ResolvedEntitiesBlocker = new Semaphore(0); + + // Atomic references to capture results from both threads + AtomicReference<Exception> client1Exception = new AtomicReference<>(); + AtomicReference<Exception> client2Exception = new AtomicReference<>(); + + // Configure mock behavior: + // 1. Allow both threads to call loadEntitiesChangeTracking() with different versions + // 2. Block client1's loadResolvedEntities() until client2's loadResolvedEntities() completes + + // Mock loadEntitiesChangeTracking to return different versions for each client + Mockito.doAnswer( + invocation -> { + // First call (client1) - returns older version for N2 + LOGGER.debug("Returning change tracking for client1"); + return new ChangeTrackingResult( + List.of( + changeTrackingFor(N1), + changeTrackingFor(N2v2), // older version + changeTrackingFor(N3), + changeTrackingFor(N5), + changeTrackingFor(N5_N6))); + }) + .doAnswer( + invocation -> { + // Second call (client2) - returns newer version for N2 + LOGGER.debug("Returning change tracking for client2"); + return new ChangeTrackingResult( + List.of( + changeTrackingFor(N1), + changeTrackingFor(N2v3), // newer version + changeTrackingFor(N3), + changeTrackingFor(N5), + changeTrackingFor(N5_N6))); + }) + .when(mockedMetaStoreManager) + .loadEntitiesChangeTracking(Mockito.any(), Mockito.any()); + + // Mock loadResolvedEntities to control timing - client1 blocks until client2 completes + // client1 receives the older version of all entities, while client2 receives the newer version + // of N2 + Mockito.doAnswer( + invocation -> { + // This is client1's loadResolvedEntities call - block until client2 completes + try { + client1ChangeTrackingResult.countDown(); + LOGGER.debug("Awaiting client2 to complete resolved entities load"); + client1ResolvedEntitiesBlocker.acquire(); // Block until client2 signals completion + List<ResolvedPolarisEntity> resolvedEntities = + List.of( + getResolvedPolarisEntity(N1), + getResolvedPolarisEntity(N2v2), + getResolvedPolarisEntity(N3), + getResolvedPolarisEntity(N5), + getResolvedPolarisEntity(N5_N6)); + LOGGER.debug("Client1 returning results {}", resolvedEntities); + return new ResolvedEntitiesResult(resolvedEntities); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + }) + .doAnswer( + invocation -> { + // This is client2's loadResolvedEntities call - execute normally and signal client1 + try { + LOGGER.debug("Client2 loading resolved entities"); + var result = + new ResolvedEntitiesResult( + List.of( + getResolvedPolarisEntity(N1), + getResolvedPolarisEntity(N2v3), + getResolvedPolarisEntity(N3), + getResolvedPolarisEntity(N5), + getResolvedPolarisEntity(N5_N6))); + client1ResolvedEntitiesBlocker.release(); // Allow client1 to proceed + LOGGER.debug("Client2 returning results {}", result.getResolvedEntities()); + return result; + } catch (Exception e) { + client1ResolvedEntitiesBlocker.release(); // Release in case of error + throw e; + } + }) + .when(mockedMetaStoreManager) + .loadResolvedEntities(Mockito.any(), Mockito.any(), Mockito.any()); + + // ExecutorService isn't AutoCloseable in JDK 11 :( + ExecutorService executorService = Executors.newFixedThreadPool(2); + try { + // Client 1 task - should get older version and be blocked during loadResolvedEntities + Future<List<EntityCacheLookupResult>> client1Task = + executorService.submit( + () -> { + try { + // Client1 calls getOrLoadResolvedEntities + // - loadEntitiesChangeTracking returns older version for N2 + // - loadResolvedEntities will block until client2 completes + List<EntityCacheLookupResult> results = + cache.getOrLoadResolvedEntities( + this.callCtx, PolarisEntityType.NAMESPACE, entityIds); + + return results; + } catch (Exception e) { + client1Exception.set(e); + return null; + } + }); + + // Client 2 task - should get newer version and complete first + Future<List<EntityCacheLookupResult>> client2Task = + executorService.submit( + () -> { + try { + // Client2 calls getOrLoadResolvedEntities + // - loadEntitiesChangeTracking returns newer version for N2 + // - loadResolvedEntities executes normally and signals client1 when done + client1ChangeTrackingResult.await(); + List<EntityCacheLookupResult> results = + cache.getOrLoadResolvedEntities( + this.callCtx, PolarisEntityType.NAMESPACE, entityIds); + + return results; + } catch (Exception e) { + client2Exception.set(e); + client1ResolvedEntitiesBlocker.release(); // Release in case of error + return null; + } + }); + + // Wait for both tasks to complete + List<EntityCacheLookupResult> client1Results = client1Task.get(); + List<EntityCacheLookupResult> client2Results = client2Task.get(); + + // Verify no exceptions occurred + assertThat(client1Exception.get()).isNull(); + assertThat(client2Exception.get()).isNull(); + + // Verify both clients got results + assertThat(client1Results).isNotNull(); + assertThat(client2Results).isNotNull(); + assertThat(client1Results).hasSize(5); + assertThat(client2Results).hasSize(5); + + // All entities should be found + assertThat(client1Results).doesNotContainNull(); + assertThat(client2Results).doesNotContainNull(); + + // Verify that client1 got the older version of N2 (index 1 in the list) + ResolvedPolarisEntity client1N2 = client1Results.get(1).getCacheEntry(); + assertThat(client1N2.getEntity().getId()).isEqualTo(N2.getId()); + assertThat(client1N2.getEntity().getEntityVersion()).isEqualTo(N2v2.getEntityVersion()); + + // Verify that client2 got the newer version of N2 + ResolvedPolarisEntity client2N2 = client2Results.get(1).getCacheEntry(); + assertThat(client2N2.getEntity().getId()).isEqualTo(N2.getId()); + assertThat(client2N2.getEntity().getEntityVersion()).isEqualTo(N2v3.getEntityVersion()); + + // Verify that both clients got consistent versions for other entities + for (int i = 0; i < 5; i++) { + if (i != 1) { // Skip N2 which we expect to be different + ResolvedPolarisEntity client1Entity = client1Results.get(i).getCacheEntry(); + ResolvedPolarisEntity client2Entity = client2Results.get(i).getCacheEntry(); + + assertThat(client1Entity.getEntity().getId()) + .isEqualTo(client2Entity.getEntity().getId()); + assertThat(client1Entity.getEntity().getEntityVersion()) + .isEqualTo(client2Entity.getEntity().getEntityVersion()); + assertThat(client1Entity.getEntity().getGrantRecordsVersion()) + .isEqualTo(client2Entity.getEntity().getGrantRecordsVersion()); + } + } + assertThat(entityIds).extracting(id -> cache.getEntityById(id.getId())).doesNotContainNull(); + } finally { + executorService.shutdown(); + } + } + + private static ResolvedPolarisEntity getResolvedPolarisEntity(PolarisBaseEntity catalog) { + return new ResolvedPolarisEntity(PolarisEntity.of(catalog), List.of(), List.of()); + } + + private static PolarisChangeTrackingVersions changeTrackingFor(PolarisBaseEntity entity) { + return new PolarisChangeTrackingVersions( + entity.getEntityVersion(), entity.getGrantRecordsVersion()); } } diff --git a/polaris-core/src/testFixtures/java/org/apache/polaris/core/persistence/BasePolarisMetaStoreManagerTest.java b/polaris-core/src/testFixtures/java/org/apache/polaris/core/persistence/BasePolarisMetaStoreManagerTest.java index 0eb7414b6..222e36a47 100644 --- a/polaris-core/src/testFixtures/java/org/apache/polaris/core/persistence/BasePolarisMetaStoreManagerTest.java +++ b/polaris-core/src/testFixtures/java/org/apache/polaris/core/persistence/BasePolarisMetaStoreManagerTest.java @@ -118,7 +118,7 @@ public abstract class BasePolarisMetaStoreManagerTest { .containsExactly(PolarisEntity.toCore(task1), PolarisEntity.toCore(task2)); List<PolarisBaseEntity> listedEntities = - metaStoreManager.loadFullEntitiesAll( + metaStoreManager.listFullEntitiesAll( polarisTestMetaStoreManager.polarisCallContext, null, PolarisEntityType.TASK, diff --git a/runtime/service/src/main/java/org/apache/polaris/service/admin/PolarisAdminService.java b/runtime/service/src/main/java/org/apache/polaris/service/admin/PolarisAdminService.java index 656323647..70b7f3624 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/admin/PolarisAdminService.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/admin/PolarisAdminService.java @@ -953,7 +953,7 @@ public class PolarisAdminService { /** List all catalogs without checking for permission. */ private Stream<CatalogEntity> listCatalogsUnsafe() { return metaStoreManager - .loadFullEntitiesAll( + .listFullEntitiesAll( getCurrentPolarisContext(), null, PolarisEntityType.CATALOG, @@ -1195,7 +1195,7 @@ public class PolarisAdminService { authorizeBasicRootOperationOrThrow(op); return metaStoreManager - .loadFullEntitiesAll( + .listFullEntitiesAll( getCurrentPolarisContext(), null, PolarisEntityType.PRINCIPAL, @@ -1303,7 +1303,7 @@ public class PolarisAdminService { authorizeBasicRootOperationOrThrow(op); return metaStoreManager - .loadFullEntitiesAll( + .listFullEntitiesAll( getCurrentPolarisContext(), null, PolarisEntityType.PRINCIPAL_ROLE, @@ -1431,7 +1431,7 @@ public class PolarisAdminService { .orElseThrow(() -> new NotFoundException("Parent catalog %s not found", catalogName)); List<PolarisEntityCore> catalogPath = PolarisEntity.toCoreList(List.of(catalogEntity)); return metaStoreManager - .loadFullEntitiesAll( + .listFullEntitiesAll( getCurrentPolarisContext(), catalogPath, PolarisEntityType.CATALOG_ROLE, diff --git a/runtime/service/src/main/java/org/apache/polaris/service/catalog/policy/PolicyCatalog.java b/runtime/service/src/main/java/org/apache/polaris/service/catalog/policy/PolicyCatalog.java index 9cd061ab6..b3fbd0317 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/catalog/policy/PolicyCatalog.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/catalog/policy/PolicyCatalog.java @@ -189,7 +189,7 @@ public class PolicyCatalog { } // with a policyType filter we need to load the full PolicyEntity to apply the filter return metaStoreManager - .loadFullEntitiesAll( + .listFullEntitiesAll( callContext.getPolarisCallContext(), catalogPath, PolarisEntityType.POLICY,
