This is an automated email from the ASF dual-hosted git repository. dimas pushed a commit to branch dmitri-tmp1 in repository https://gitbox.apache.org/repos/asf/polaris.git
commit a5e779dd77efdfc7d05d00a0ed2e2768c0bae622 Author: Michael Collado <[email protected]> AuthorDate: Tue Sep 23 15:55:48 2025 -0700 Added additional test and updated comments in EntityCache interface --- .../core/persistence/cache/EntityCache.java | 16 ++ .../persistence/cache/InMemoryEntityCache.java | 23 +- .../persistence/cache/InMemoryEntityCacheTest.java | 284 +++++++++++++++------ 3 files changed, 244 insertions(+), 79 deletions(-) diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/cache/EntityCache.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/cache/EntityCache.java index 93dc87ac9..7cfea2a84 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/persistence/cache/EntityCache.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/persistence/cache/EntityCache.java @@ -87,6 +87,17 @@ public interface EntityCache { /** * Load multiple entities by id, returning those found in the cache and loading those not found. * + * <p>Cached entity versions and grant versions must be verified against the versions returned by + * the {@link + * org.apache.polaris.core.persistence.PolarisMetaStoreManager#loadEntitiesChangeTracking(PolarisCallContext, + * List)} API to ensure the returned entities are consistent with the current state of the + * metastore. Cache implementations must never return a mix of stale entities and fresh entities, + * as authorization or table conflict decisions could be made based on inconsistent data. For + * example, a Principal may have a grant to a Principal Role in a cached entry, but that grant may + * be revoked prior to the Principal Role being granted a privilege on a Catalog. If the Principal + * record is stale, but the Principal Role is refreshed, the Principal may be incorrectly + * authorized to access the Catalog. 
+ * * @param callCtx the Polaris call context * @param entityType the entity type * @param entityIds the list of entity ids to load @@ -105,6 +116,11 @@ public interface EntityCache { * Load multiple entities by {@link EntityNameLookupRecord}, returning those found in the cache * and loading those not found. * + * <p>See the note in {@link #getOrLoadResolvedEntities(PolarisCallContext, PolarisEntityType, List)} + * regarding the need to validate cache contents against the {@link + * org.apache.polaris.core.persistence.PolarisMetaStoreManager#loadEntitiesChangeTracking(PolarisCallContext, + * List)} API to avoid invalid authorization or conflict detection decisions based on stale entries. + * * @param callCtx the Polaris call context * @param lookupRecords the list of entity name to load * @return the list of resolved entities, in the same order as the requested entity records. As in diff --git a/polaris-core/src/main/java/org/apache/polaris/core/persistence/cache/InMemoryEntityCache.java b/polaris-core/src/main/java/org/apache/polaris/core/persistence/cache/InMemoryEntityCache.java index a10a1f5ea..e8b4163e7 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/persistence/cache/InMemoryEntityCache.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/persistence/cache/InMemoryEntityCache.java @@ -58,6 +58,7 @@ import java.util.stream.Collectors; /** An in-memory entity cache with a limit of 100k entities and a 1h TTL. 
*/ public class InMemoryEntityCache implements EntityCache { private static final Logger LOGGER = LoggerFactory.getLogger(InMemoryEntityCache.class); + public static final int MAX_CACHE_REFRESH_ATTEMPTS = 100; private final PolarisDiagnostics diagnostics; private final PolarisMetaStoreManager polarisMetaStoreManager; private final Cache<Long, ResolvedPolarisEntity> byId; @@ -477,13 +478,22 @@ public class InMemoryEntityCache implements EntityCache { // trying to populate // the cache from a different snapshot Map<PolarisEntityId, ResolvedPolarisEntity> resolvedEntities = new HashMap<>(); - for (int i = 0; i < 100; i++) { + boolean stateResolved = false; + for (int i = 0; i < MAX_CACHE_REFRESH_ATTEMPTS; i++) { Function<List<PolarisEntityId>, ResolvedEntitiesResult> loaderFunc = idsToLoad -> polarisMetaStoreManager.loadResolvedEntities(callCtx, entityType, idsToLoad); if (isCacheStateValid(callCtx, resolvedEntities, entityIds, loaderFunc)) { + stateResolved = true; break; } } + if (!stateResolved) { + LOGGER.warn( + "Unable to resolve entities in cache after multiple attempts {} - resolved: {}", + entityIds, + resolvedEntities); + diagnostics.fail("cannot_resolve_all_entities", "Unable to resolve entities in cache"); + } return entityIds.stream() .map( @@ -517,11 +527,20 @@ public class InMemoryEntityCache implements EntityCache { lookupRecords.stream() .map(e -> new PolarisEntityId(e.getCatalogId(), e.getId())) .collect(Collectors.toList()); - for (int i = 0; i < 100; i++) { + boolean stateResolved = false; + for (int i = 0; i < MAX_CACHE_REFRESH_ATTEMPTS; i++) { if (isCacheStateValid(callCtx, resolvedEntities, entityIds, loaderFunc)) { + stateResolved = true; break; } } + if (!stateResolved) { + LOGGER.warn( + "Unable to resolve entities in cache after multiple attempts {} - resolved: {}", + entityIds, + resolvedEntities); + diagnostics.fail("cannot_resolve_all_entities", "Unable to resolve entities in cache"); + } return lookupRecords.stream() .map( diff --git 
a/polaris-core/src/test/java/org/apache/polaris/core/persistence/cache/InMemoryEntityCacheTest.java b/polaris-core/src/test/java/org/apache/polaris/core/persistence/cache/InMemoryEntityCacheTest.java index dd90f5027..43d9c8820 100644 --- a/polaris-core/src/test/java/org/apache/polaris/core/persistence/cache/InMemoryEntityCacheTest.java +++ b/polaris-core/src/test/java/org/apache/polaris/core/persistence/cache/InMemoryEntityCacheTest.java @@ -53,8 +53,12 @@ import org.apache.polaris.core.persistence.transactional.TransactionalMetaStoreM import org.apache.polaris.core.persistence.transactional.TransactionalPersistence; import org.apache.polaris.core.persistence.transactional.TreeMapMetaStore; import org.apache.polaris.core.persistence.transactional.TreeMapTransactionalPersistenceImpl; +import org.assertj.core.api.Assertions; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import org.mockito.Mockito; +import org.mockito.stubbing.Answer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -632,8 +636,9 @@ public class InMemoryEntityCacheTest { assertThat(results.get(2).isCacheHit()).isTrue(); } - @Test - public void testBatchLoadWithStaleVersions() { + @ParameterizedTest + @ValueSource(strings = {"id", "name"}) + public void testBatchLoadWithStaleVersions(String loadType) { // get a new cache InMemoryEntityCache cache = this.allocateNewCache(); @@ -674,12 +679,18 @@ public class InMemoryEntityCacheTest { assertThat(T6v2).isNotNull(); assertThat(T6v2.getEntityVersion()).isGreaterThan(T6v1.getEntityVersion()); - // Create entity ID list with the updated entity - List<PolarisEntityId> entityIds = List.of(getPolarisEntityId(T6v2)); - - // Call batch load - this should detect the stale version and reload - List<EntityCacheLookupResult> results = - cache.getOrLoadResolvedEntities(this.callCtx, PolarisEntityType.TABLE_LIKE, entityIds); + List<EntityCacheLookupResult> results; 
+ if (loadType.equals("id")) { + // Create entity ID list with the updated entity + List<PolarisEntityId> entityIds = List.of(getPolarisEntityId(T6v2)); + + // Call batch load - this should detect the stale version and reload + results = + cache.getOrLoadResolvedEntities(this.callCtx, PolarisEntityType.TABLE_LIKE, entityIds); + } else { + results = + cache.getOrLoadResolvedEntities(this.callCtx, List.of(new EntityNameLookupRecord(T6v2))); + } // Verify the entity was reloaded with the new version assertThat(results).hasSize(1); @@ -695,8 +706,9 @@ public class InMemoryEntityCacheTest { assertThat(cachedT6.getEntity().getEntityVersion()).isEqualTo(T6v2.getEntityVersion()); } - @Test - public void testBatchLoadWithStaleGrantVersions() { + @ParameterizedTest + @ValueSource(strings = {"id", "name"}) + public void testBatchLoadWithStaleGrantVersions(String loadType) { // get a new cache InMemoryEntityCache cache = this.allocateNewCache(); @@ -733,12 +745,19 @@ public class InMemoryEntityCacheTest { this.tm.ensureExistsByName(List.of(catalog, N1), PolarisEntityType.NAMESPACE, "N2"); assertThat(N2Updated.getGrantRecordsVersion()).isGreaterThan(originalGrantVersion); - // Create entity ID list - List<PolarisEntityId> entityIds = List.of(getPolarisEntityId(N2Updated)); - - // Call batch load - this should detect the stale grant version and reload - List<EntityCacheLookupResult> results = - cache.getOrLoadResolvedEntities(this.callCtx, PolarisEntityType.NAMESPACE, entityIds); + List<EntityCacheLookupResult> results; + if (loadType.equals("id")) { + // Create entity ID list + List<PolarisEntityId> entityIds = List.of(getPolarisEntityId(N2Updated)); + + // Call batch load - this should detect the stale grant version and reload + results = + cache.getOrLoadResolvedEntities(this.callCtx, PolarisEntityType.NAMESPACE, entityIds); + } else { + results = + cache.getOrLoadResolvedEntities( + this.callCtx, List.of(new EntityNameLookupRecord(N2Updated))); + } // Verify the entity was 
reloaded with the new grant version assertThat(results).hasSize(1); @@ -759,8 +778,9 @@ public class InMemoryEntityCacheTest { .isEqualTo(N2Updated.getGrantRecordsVersion()); } - @Test - public void testBatchLoadVersionRetryLogic() { + @ParameterizedTest + @ValueSource(strings = {"id", "name"}) + public void testBatchLoadVersionRetryLogic(String loadType) { // get a new cache PolarisMetaStoreManager metaStoreManager = Mockito.spy(this.metaStoreManager); InMemoryEntityCache cache = @@ -813,13 +833,24 @@ public class InMemoryEntityCacheTest { assertThat(N2v2.getEntityVersion()).isGreaterThan(originalEntityVersion); assertThat(N2v3.getEntityVersion()).isGreaterThan(N2v2.getEntityVersion()); - // Create entity ID list - List<PolarisEntityId> entityIds = - List.of(getPolarisEntityId(catalog), getPolarisEntityId(N1), getPolarisEntityId(N2)); - - // Call batch load - this should detect the stale versions and reload until consistent - List<EntityCacheLookupResult> results = - cache.getOrLoadResolvedEntities(this.callCtx, PolarisEntityType.NAMESPACE, entityIds); + List<EntityCacheLookupResult> results; + if (loadType.equals("id")) { + // Create entity ID list + List<PolarisEntityId> entityIds = + List.of(getPolarisEntityId(catalog), getPolarisEntityId(N1), getPolarisEntityId(N2)); + + // Call batch load - this should detect the stale versions and reload until consistent + results = + cache.getOrLoadResolvedEntities(this.callCtx, PolarisEntityType.NAMESPACE, entityIds); + } else { + results = + cache.getOrLoadResolvedEntities( + this.callCtx, + List.of( + new EntityNameLookupRecord(catalog), + new EntityNameLookupRecord(N1), + new EntityNameLookupRecord(N2))); + } // Verify the entity was reloaded with the latest version assertThat(results).hasSize(3); @@ -840,12 +871,86 @@ public class InMemoryEntityCacheTest { assertThat(cachedN2.getEntity().getEntityVersion()).isEqualTo(N2v3.getEntityVersion()); } + @ParameterizedTest + @ValueSource(strings = {"id", "name"}) + public void 
testBatchLoadVersionRetryFailsAfterMaxAttempts(String loadType) { + // get a new cache + PolarisMetaStoreManager metaStoreManager = Mockito.spy(this.metaStoreManager); + InMemoryEntityCache cache = + new InMemoryEntityCache(diagServices, callCtx.getRealmConfig(), metaStoreManager); + + // Load catalog into cache + EntityCacheLookupResult lookup = + cache.getOrLoadEntityByName( + this.callCtx, new EntityCacheByNameKey(PolarisEntityType.CATALOG, "test")); + assertThat(lookup).isNotNull(); + PolarisBaseEntity catalog = lookup.getCacheEntry().getEntity(); + + // Get entities that we can work with + PolarisBaseEntity N1 = + this.tm.ensureExistsByName(List.of(catalog), PolarisEntityType.NAMESPACE, "N1"); + PolarisBaseEntity N2 = + this.tm.ensureExistsByName(List.of(catalog, N1), PolarisEntityType.NAMESPACE, "N2"); + + // Load N2 into cache initially + cache.getOrLoadEntityByName( + this.callCtx, + new EntityCacheByNameKey(catalog.getId(), N1.getId(), PolarisEntityType.NAMESPACE, "N2")); + + // Verify it's in cache with original version + ResolvedPolarisEntity cachedN2 = cache.getEntityById(N2.getId()); + assertThat(cachedN2).isNotNull(); + int originalEntityVersion = cachedN2.getEntity().getEntityVersion(); + + // Update the entity multiple times to create version skew + PolarisBaseEntity N2v2 = + this.tm.updateEntity(List.of(catalog, N1), N2, "{\"v2\": \"value\"}", null); + + // return v2 for the change tracking + Mockito.doReturn( + new ChangeTrackingResult( + List.of( + changeTrackingFor(catalog), changeTrackingFor(N1), changeTrackingFor(N2v2)))) + .when(metaStoreManager) + .loadEntitiesChangeTracking(Mockito.any(), Mockito.any()); + + // update again to create v3, which isn't returned in the change tracking result + PolarisBaseEntity N2v3 = + this.tm.updateEntity(List.of(catalog, N1), N2v2, "{\"v3\": \"value\"}", null); + + // Verify versions increased + assertThat(N2v2.getEntityVersion()).isGreaterThan(originalEntityVersion); + 
assertThat(N2v3.getEntityVersion()).isGreaterThan(N2v2.getEntityVersion()); + + if (loadType.equals("id")) { + // Create entity ID list + List<PolarisEntityId> entityIds = + List.of(getPolarisEntityId(catalog), getPolarisEntityId(N1), getPolarisEntityId(N2)); + Assertions.assertThatThrownBy( + () -> + cache.getOrLoadResolvedEntities( + this.callCtx, PolarisEntityType.NAMESPACE, entityIds)) + .isInstanceOf(RuntimeException.class); + } else { + Assertions.assertThatThrownBy( + () -> + cache.getOrLoadResolvedEntities( + this.callCtx, + List.of( + new EntityNameLookupRecord(catalog), + new EntityNameLookupRecord(N1), + new EntityNameLookupRecord(N2)))) + .isInstanceOf(RuntimeException.class); + } + } + private static PolarisEntityId getPolarisEntityId(PolarisBaseEntity catalog) { return new PolarisEntityId(catalog.getCatalogId(), catalog.getId()); } - @Test - public void testConcurrentClientLoadingBehavior() throws Exception { + @ParameterizedTest + @ValueSource(strings = {"id", "name"}) + public void testConcurrentClientLoadingBehavior(String loadType) throws Exception { // Load catalog into cache EntityCacheLookupResult lookup = allocateNewCache() @@ -933,50 +1038,59 @@ public class InMemoryEntityCacheTest { // Mock loadResolvedEntities to control timing - client1 blocks until client2 completes // client1 receives the older version of all entities, while client2 receives the newer version // of N2 - Mockito.doAnswer( - invocation -> { - // This is client1's loadResolvedEntities call - block until client2 completes - try { - client1ChangeTrackingResult.countDown(); - LOGGER.debug("Awaiting client2 to complete resolved entities load"); - client1ResolvedEntitiesBlocker.acquire(); // Block until client2 signals completion - List<ResolvedPolarisEntity> resolvedEntities = + Answer client1Answer = + invocation -> { + // This is client1's loadResolvedEntities call - block until client2 completes + try { + client1ChangeTrackingResult.countDown(); + LOGGER.debug("Awaiting 
client2 to complete resolved entities load"); + client1ResolvedEntitiesBlocker.acquire(); // Block until client2 signals completion + List<ResolvedPolarisEntity> resolvedEntities = + List.of( + getResolvedPolarisEntity(N1), + getResolvedPolarisEntity(N2v2), + getResolvedPolarisEntity(N3), + getResolvedPolarisEntity(N5), + getResolvedPolarisEntity(N5_N6)); + LOGGER.debug("Client1 returning results {}", resolvedEntities); + return new ResolvedEntitiesResult(resolvedEntities); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + }; + Answer client2Answer = + invocation -> { + // This is client2's loadResolvedEntities call - execute normally and signal client1 + try { + LOGGER.debug("Client2 loading resolved entities"); + var result = + new ResolvedEntitiesResult( List.of( getResolvedPolarisEntity(N1), - getResolvedPolarisEntity(N2v2), + getResolvedPolarisEntity(N2v3), getResolvedPolarisEntity(N3), getResolvedPolarisEntity(N5), - getResolvedPolarisEntity(N5_N6)); - LOGGER.debug("Client1 returning results {}", resolvedEntities); - return new ResolvedEntitiesResult(resolvedEntities); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new RuntimeException(e); - } - }) - .doAnswer( - invocation -> { - // This is client2's loadResolvedEntities call - execute normally and signal client1 - try { - LOGGER.debug("Client2 loading resolved entities"); - var result = - new ResolvedEntitiesResult( - List.of( - getResolvedPolarisEntity(N1), - getResolvedPolarisEntity(N2v3), - getResolvedPolarisEntity(N3), - getResolvedPolarisEntity(N5), - getResolvedPolarisEntity(N5_N6))); - client1ResolvedEntitiesBlocker.release(); // Allow client1 to proceed - LOGGER.debug("Client2 returning results {}", result.getResolvedEntities()); - return result; - } catch (Exception e) { - client1ResolvedEntitiesBlocker.release(); // Release in case of error - throw e; - } - }) - .when(mockedMetaStoreManager) - 
.loadResolvedEntities(Mockito.any(), Mockito.any(), Mockito.any()); + getResolvedPolarisEntity(N5_N6))); + client1ResolvedEntitiesBlocker.release(); // Allow client1 to proceed + LOGGER.debug("Client2 returning results {}", result.getResolvedEntities()); + return result; + } catch (Exception e) { + client1ResolvedEntitiesBlocker.release(); // Release in case of error + throw e; + } + }; + if (loadType.equals("id")) { + Mockito.doAnswer(client1Answer) + .doAnswer(client2Answer) + .when(mockedMetaStoreManager) + .loadResolvedEntities(Mockito.any(), Mockito.any(), Mockito.any()); + } else { + Mockito.doAnswer(client1Answer) + .doAnswer(client2Answer) + .when(mockedMetaStoreManager) + .loadResolvedEntities(Mockito.any(), Mockito.any()); + } // ExecutorService isn't AutoCloseable in JDK 11 :( ExecutorService executorService = Executors.newFixedThreadPool(2); @@ -989,11 +1103,19 @@ public class InMemoryEntityCacheTest { // Client1 calls getOrLoadResolvedEntities // - loadEntitiesChangeTracking returns older version for N2 // - loadResolvedEntities will block until client2 completes - List<EntityCacheLookupResult> results = - cache.getOrLoadResolvedEntities( - this.callCtx, PolarisEntityType.NAMESPACE, entityIds); - - return results; + if (loadType.equals("id")) { + return cache.getOrLoadResolvedEntities( + this.callCtx, PolarisEntityType.NAMESPACE, entityIds); + } else { + return cache.getOrLoadResolvedEntities( + this.callCtx, + List.of( + new EntityNameLookupRecord(N1), + new EntityNameLookupRecord(N2), + new EntityNameLookupRecord(N3), + new EntityNameLookupRecord(N5), + new EntityNameLookupRecord(N5_N6))); + } } catch (Exception e) { client1Exception.set(e); return null; @@ -1009,11 +1131,19 @@ public class InMemoryEntityCacheTest { // - loadEntitiesChangeTracking returns newer version for N2 // - loadResolvedEntities executes normally and signals client1 when done client1ChangeTrackingResult.await(); - List<EntityCacheLookupResult> results = - 
cache.getOrLoadResolvedEntities( - this.callCtx, PolarisEntityType.NAMESPACE, entityIds); - - return results; + if (loadType.equals("id")) { + return cache.getOrLoadResolvedEntities( + this.callCtx, PolarisEntityType.NAMESPACE, entityIds); + } else { + return cache.getOrLoadResolvedEntities( + this.callCtx, + List.of( + new EntityNameLookupRecord(N1), + new EntityNameLookupRecord(N2), + new EntityNameLookupRecord(N3), + new EntityNameLookupRecord(N5), + new EntityNameLookupRecord(N5_N6))); + } } catch (Exception e) { client2Exception.set(e); client1ResolvedEntitiesBlocker.release(); // Release in case of error
