This is an automated email from the ASF dual-hosted git repository.

dimas pushed a commit to branch dmitri-tmp1
in repository https://gitbox.apache.org/repos/asf/polaris.git

commit a5e779dd77efdfc7d05d00a0ed2e2768c0bae622
Author: Michael Collado <[email protected]>
AuthorDate: Tue Sep 23 15:55:48 2025 -0700

    Added additional test and updated comments in EntityCache interface
---
 .../core/persistence/cache/EntityCache.java        |  16 ++
 .../persistence/cache/InMemoryEntityCache.java     |  23 +-
 .../persistence/cache/InMemoryEntityCacheTest.java | 284 +++++++++++++++------
 3 files changed, 244 insertions(+), 79 deletions(-)

diff --git 
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/cache/EntityCache.java
 
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/cache/EntityCache.java
index 93dc87ac9..7cfea2a84 100644
--- 
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/cache/EntityCache.java
+++ 
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/cache/EntityCache.java
@@ -87,6 +87,17 @@ public interface EntityCache {
   /**
    * Load multiple entities by id, returning those found in the cache and 
loading those not found.
    *
+   * <p>Cached entity versions and grant versions must be verified against the 
versions returned by
+   * the {@link
+   * 
org.apache.polaris.core.persistence.PolarisMetaStoreManager#loadEntitiesChangeTracking(PolarisCallContext,
+   * List)} API to ensure the returned entities are consistent with the 
current state of the
+   * metastore. Cache implementations must never return a mix of stale 
entities and fresh entities,
+   * as authorization or table conflict decisions could be made based on 
inconsistent data. For
+   * example, a Principal may have a grant to a Principal Role in a cached 
entry, but that grant may
+   * be revoked prior to the Principal Role being granted a privilege on a 
Catalog. If the Principal
+   * record is stale, but the Principal Role is refreshed, the Principal may 
be incorrectly
+   * authorized to access the Catalog.
+   *
    * @param callCtx the Polaris call context
    * @param entityType the entity type
    * @param entityIds the list of entity ids to load
@@ -105,6 +116,11 @@ public interface EntityCache {
    * Load multiple entities by {@link EntityNameLookupRecord}, returning those 
found in the cache
    * and loading those not found.
    *
+   * <p>See the note in {@link #getOrLoadResolvedEntities(PolarisCallContext, 
PolarisEntityType, List)}
+   * regarding the need to validate cache contents against the {@link
+   * 
org.apache.polaris.core.persistence.PolarisMetaStoreManager#loadEntitiesChangeTracking(PolarisCallContext,
+   * List)} API to avoid incorrect authorization or conflict-detection 
decisions based on stale entries.
+   *
    * @param callCtx the Polaris call context
    * @param lookupRecords the list of entity name to load
    * @return the list of resolved entities, in the same order as the requested 
entity records. As in
diff --git 
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/cache/InMemoryEntityCache.java
 
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/cache/InMemoryEntityCache.java
index a10a1f5ea..e8b4163e7 100644
--- 
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/cache/InMemoryEntityCache.java
+++ 
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/cache/InMemoryEntityCache.java
@@ -58,6 +58,7 @@ import java.util.stream.Collectors;
 /** An in-memory entity cache with a limit of 100k entities and a 1h TTL. */
 public class InMemoryEntityCache implements EntityCache {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(InMemoryEntityCache.class);
+  public static final int MAX_CACHE_REFRESH_ATTEMPTS = 100;
   private final PolarisDiagnostics diagnostics;
   private final PolarisMetaStoreManager polarisMetaStoreManager;
   private final Cache<Long, ResolvedPolarisEntity> byId;
@@ -477,13 +478,22 @@ public class InMemoryEntityCache implements EntityCache {
     // trying to populate
     // the cache from a different snapshot
     Map<PolarisEntityId, ResolvedPolarisEntity> resolvedEntities = new 
HashMap<>();
-    for (int i = 0; i < 100; i++) {
+    boolean stateResolved = false;
+    for (int i = 0; i < MAX_CACHE_REFRESH_ATTEMPTS; i++) {
       Function<List<PolarisEntityId>, ResolvedEntitiesResult> loaderFunc =
           idsToLoad -> polarisMetaStoreManager.loadResolvedEntities(callCtx, 
entityType, idsToLoad);
       if (isCacheStateValid(callCtx, resolvedEntities, entityIds, loaderFunc)) 
{
+        stateResolved = true;
         break;
       }
     }
+    if (!stateResolved) {
+      LOGGER.warn(
+          "Unable to resolve entities in cache after multiple attempts {} - 
resolved: {}",
+          entityIds,
+          resolvedEntities);
+      diagnostics.fail("cannot_resolve_all_entities", "Unable to resolve 
entities in cache");
+    }
 
     return entityIds.stream()
         .map(
@@ -517,11 +527,20 @@ public class InMemoryEntityCache implements EntityCache {
         lookupRecords.stream()
             .map(e -> new PolarisEntityId(e.getCatalogId(), e.getId()))
             .collect(Collectors.toList());
-    for (int i = 0; i < 100; i++) {
+    boolean stateResolved = false;
+    for (int i = 0; i < MAX_CACHE_REFRESH_ATTEMPTS; i++) {
       if (isCacheStateValid(callCtx, resolvedEntities, entityIds, loaderFunc)) 
{
+        stateResolved = true;
         break;
       }
     }
+    if (!stateResolved) {
+      LOGGER.warn(
+          "Unable to resolve entities in cache after multiple attempts {} - 
resolved: {}",
+          entityIds,
+          resolvedEntities);
+      diagnostics.fail("cannot_resolve_all_entities", "Unable to resolve 
entities in cache");
+    }
 
     return lookupRecords.stream()
         .map(
diff --git 
a/polaris-core/src/test/java/org/apache/polaris/core/persistence/cache/InMemoryEntityCacheTest.java
 
b/polaris-core/src/test/java/org/apache/polaris/core/persistence/cache/InMemoryEntityCacheTest.java
index dd90f5027..43d9c8820 100644
--- 
a/polaris-core/src/test/java/org/apache/polaris/core/persistence/cache/InMemoryEntityCacheTest.java
+++ 
b/polaris-core/src/test/java/org/apache/polaris/core/persistence/cache/InMemoryEntityCacheTest.java
@@ -53,8 +53,12 @@ import 
org.apache.polaris.core.persistence.transactional.TransactionalMetaStoreM
 import 
org.apache.polaris.core.persistence.transactional.TransactionalPersistence;
 import org.apache.polaris.core.persistence.transactional.TreeMapMetaStore;
 import 
org.apache.polaris.core.persistence.transactional.TreeMapTransactionalPersistenceImpl;
+import org.assertj.core.api.Assertions;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 import org.mockito.Mockito;
+import org.mockito.stubbing.Answer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -632,8 +636,9 @@ public class InMemoryEntityCacheTest {
     assertThat(results.get(2).isCacheHit()).isTrue();
   }
 
-  @Test
-  public void testBatchLoadWithStaleVersions() {
+  @ParameterizedTest
+  @ValueSource(strings = {"id", "name"})
+  public void testBatchLoadWithStaleVersions(String loadType) {
     // get a new cache
     InMemoryEntityCache cache = this.allocateNewCache();
 
@@ -674,12 +679,18 @@ public class InMemoryEntityCacheTest {
     assertThat(T6v2).isNotNull();
     assertThat(T6v2.getEntityVersion()).isGreaterThan(T6v1.getEntityVersion());
 
-    // Create entity ID list with the updated entity
-    List<PolarisEntityId> entityIds = List.of(getPolarisEntityId(T6v2));
-
-    // Call batch load - this should detect the stale version and reload
-    List<EntityCacheLookupResult> results =
-        cache.getOrLoadResolvedEntities(this.callCtx, 
PolarisEntityType.TABLE_LIKE, entityIds);
+    List<EntityCacheLookupResult> results;
+    if (loadType.equals("id")) {
+      // Create entity ID list with the updated entity
+      List<PolarisEntityId> entityIds = List.of(getPolarisEntityId(T6v2));
+
+      // Call batch load - this should detect the stale version and reload
+      results =
+          cache.getOrLoadResolvedEntities(this.callCtx, 
PolarisEntityType.TABLE_LIKE, entityIds);
+    } else {
+      results =
+          cache.getOrLoadResolvedEntities(this.callCtx, List.of(new 
EntityNameLookupRecord(T6v2)));
+    }
 
     // Verify the entity was reloaded with the new version
     assertThat(results).hasSize(1);
@@ -695,8 +706,9 @@ public class InMemoryEntityCacheTest {
     
assertThat(cachedT6.getEntity().getEntityVersion()).isEqualTo(T6v2.getEntityVersion());
   }
 
-  @Test
-  public void testBatchLoadWithStaleGrantVersions() {
+  @ParameterizedTest
+  @ValueSource(strings = {"id", "name"})
+  public void testBatchLoadWithStaleGrantVersions(String loadType) {
     // get a new cache
     InMemoryEntityCache cache = this.allocateNewCache();
 
@@ -733,12 +745,19 @@ public class InMemoryEntityCacheTest {
         this.tm.ensureExistsByName(List.of(catalog, N1), 
PolarisEntityType.NAMESPACE, "N2");
     
assertThat(N2Updated.getGrantRecordsVersion()).isGreaterThan(originalGrantVersion);
 
-    // Create entity ID list
-    List<PolarisEntityId> entityIds = List.of(getPolarisEntityId(N2Updated));
-
-    // Call batch load - this should detect the stale grant version and reload
-    List<EntityCacheLookupResult> results =
-        cache.getOrLoadResolvedEntities(this.callCtx, 
PolarisEntityType.NAMESPACE, entityIds);
+    List<EntityCacheLookupResult> results;
+    if (loadType.equals("id")) {
+      // Create entity ID list
+      List<PolarisEntityId> entityIds = List.of(getPolarisEntityId(N2Updated));
+
+      // Call batch load - this should detect the stale grant version and 
reload
+      results =
+          cache.getOrLoadResolvedEntities(this.callCtx, 
PolarisEntityType.NAMESPACE, entityIds);
+    } else {
+      results =
+          cache.getOrLoadResolvedEntities(
+              this.callCtx, List.of(new EntityNameLookupRecord(N2Updated)));
+    }
 
     // Verify the entity was reloaded with the new grant version
     assertThat(results).hasSize(1);
@@ -759,8 +778,9 @@ public class InMemoryEntityCacheTest {
         .isEqualTo(N2Updated.getGrantRecordsVersion());
   }
 
-  @Test
-  public void testBatchLoadVersionRetryLogic() {
+  @ParameterizedTest
+  @ValueSource(strings = {"id", "name"})
+  public void testBatchLoadVersionRetryLogic(String loadType) {
     // get a new cache
     PolarisMetaStoreManager metaStoreManager = 
Mockito.spy(this.metaStoreManager);
     InMemoryEntityCache cache =
@@ -813,13 +833,24 @@ public class InMemoryEntityCacheTest {
     assertThat(N2v2.getEntityVersion()).isGreaterThan(originalEntityVersion);
     assertThat(N2v3.getEntityVersion()).isGreaterThan(N2v2.getEntityVersion());
 
-    // Create entity ID list
-    List<PolarisEntityId> entityIds =
-        List.of(getPolarisEntityId(catalog), getPolarisEntityId(N1), 
getPolarisEntityId(N2));
-
-    // Call batch load - this should detect the stale versions and reload 
until consistent
-    List<EntityCacheLookupResult> results =
-        cache.getOrLoadResolvedEntities(this.callCtx, 
PolarisEntityType.NAMESPACE, entityIds);
+    List<EntityCacheLookupResult> results;
+    if (loadType.equals("id")) {
+      // Create entity ID list
+      List<PolarisEntityId> entityIds =
+          List.of(getPolarisEntityId(catalog), getPolarisEntityId(N1), 
getPolarisEntityId(N2));
+
+      // Call batch load - this should detect the stale versions and reload 
until consistent
+      results =
+          cache.getOrLoadResolvedEntities(this.callCtx, 
PolarisEntityType.NAMESPACE, entityIds);
+    } else {
+      results =
+          cache.getOrLoadResolvedEntities(
+              this.callCtx,
+              List.of(
+                  new EntityNameLookupRecord(catalog),
+                  new EntityNameLookupRecord(N1),
+                  new EntityNameLookupRecord(N2)));
+    }
 
     // Verify the entity was reloaded with the latest version
     assertThat(results).hasSize(3);
@@ -840,12 +871,86 @@ public class InMemoryEntityCacheTest {
     
assertThat(cachedN2.getEntity().getEntityVersion()).isEqualTo(N2v3.getEntityVersion());
   }
 
+  @ParameterizedTest
+  @ValueSource(strings = {"id", "name"})
+  public void testBatchLoadVersionRetryFailsAfterMaxAttempts(String loadType) {
+    // get a new cache
+    PolarisMetaStoreManager metaStoreManager = 
Mockito.spy(this.metaStoreManager);
+    InMemoryEntityCache cache =
+        new InMemoryEntityCache(diagServices, callCtx.getRealmConfig(), 
metaStoreManager);
+
+    // Load catalog into cache
+    EntityCacheLookupResult lookup =
+        cache.getOrLoadEntityByName(
+            this.callCtx, new EntityCacheByNameKey(PolarisEntityType.CATALOG, 
"test"));
+    assertThat(lookup).isNotNull();
+    PolarisBaseEntity catalog = lookup.getCacheEntry().getEntity();
+
+    // Get entities that we can work with
+    PolarisBaseEntity N1 =
+        this.tm.ensureExistsByName(List.of(catalog), 
PolarisEntityType.NAMESPACE, "N1");
+    PolarisBaseEntity N2 =
+        this.tm.ensureExistsByName(List.of(catalog, N1), 
PolarisEntityType.NAMESPACE, "N2");
+
+    // Load N2 into cache initially
+    cache.getOrLoadEntityByName(
+        this.callCtx,
+        new EntityCacheByNameKey(catalog.getId(), N1.getId(), 
PolarisEntityType.NAMESPACE, "N2"));
+
+    // Verify it's in cache with original version
+    ResolvedPolarisEntity cachedN2 = cache.getEntityById(N2.getId());
+    assertThat(cachedN2).isNotNull();
+    int originalEntityVersion = cachedN2.getEntity().getEntityVersion();
+
+    // Update the entity multiple times to create version skew
+    PolarisBaseEntity N2v2 =
+        this.tm.updateEntity(List.of(catalog, N1), N2, "{\"v2\": \"value\"}", 
null);
+
+    // return v2 for the change tracking
+    Mockito.doReturn(
+            new ChangeTrackingResult(
+                List.of(
+                    changeTrackingFor(catalog), changeTrackingFor(N1), 
changeTrackingFor(N2v2))))
+        .when(metaStoreManager)
+        .loadEntitiesChangeTracking(Mockito.any(), Mockito.any());
+
+    // update again to create v3, which isn't returned in the change tracking 
result
+    PolarisBaseEntity N2v3 =
+        this.tm.updateEntity(List.of(catalog, N1), N2v2, "{\"v3\": 
\"value\"}", null);
+
+    // Verify versions increased
+    assertThat(N2v2.getEntityVersion()).isGreaterThan(originalEntityVersion);
+    assertThat(N2v3.getEntityVersion()).isGreaterThan(N2v2.getEntityVersion());
+
+    if (loadType.equals("id")) {
+      // Create entity ID list
+      List<PolarisEntityId> entityIds =
+          List.of(getPolarisEntityId(catalog), getPolarisEntityId(N1), 
getPolarisEntityId(N2));
+      Assertions.assertThatThrownBy(
+              () ->
+                  cache.getOrLoadResolvedEntities(
+                      this.callCtx, PolarisEntityType.NAMESPACE, entityIds))
+          .isInstanceOf(RuntimeException.class);
+    } else {
+      Assertions.assertThatThrownBy(
+              () ->
+                  cache.getOrLoadResolvedEntities(
+                      this.callCtx,
+                      List.of(
+                          new EntityNameLookupRecord(catalog),
+                          new EntityNameLookupRecord(N1),
+                          new EntityNameLookupRecord(N2))))
+          .isInstanceOf(RuntimeException.class);
+    }
+  }
+
   private static PolarisEntityId getPolarisEntityId(PolarisBaseEntity catalog) 
{
     return new PolarisEntityId(catalog.getCatalogId(), catalog.getId());
   }
 
-  @Test
-  public void testConcurrentClientLoadingBehavior() throws Exception {
+  @ParameterizedTest
+  @ValueSource(strings = {"id", "name"})
+  public void testConcurrentClientLoadingBehavior(String loadType) throws 
Exception {
     // Load catalog into cache
     EntityCacheLookupResult lookup =
         allocateNewCache()
@@ -933,50 +1038,59 @@ public class InMemoryEntityCacheTest {
     // Mock loadResolvedEntities to control timing - client1 blocks until 
client2 completes
     // client1 receives the older version of all entities, while client2 
receives the newer version
     // of N2
-    Mockito.doAnswer(
-            invocation -> {
-              // This is client1's loadResolvedEntities call - block until 
client2 completes
-              try {
-                client1ChangeTrackingResult.countDown();
-                LOGGER.debug("Awaiting client2 to complete resolved entities 
load");
-                client1ResolvedEntitiesBlocker.acquire(); // Block until 
client2 signals completion
-                List<ResolvedPolarisEntity> resolvedEntities =
+    Answer client1Answer =
+        invocation -> {
+          // This is client1's loadResolvedEntities call - block until client2 
completes
+          try {
+            client1ChangeTrackingResult.countDown();
+            LOGGER.debug("Awaiting client2 to complete resolved entities 
load");
+            client1ResolvedEntitiesBlocker.acquire(); // Block until client2 
signals completion
+            List<ResolvedPolarisEntity> resolvedEntities =
+                List.of(
+                    getResolvedPolarisEntity(N1),
+                    getResolvedPolarisEntity(N2v2),
+                    getResolvedPolarisEntity(N3),
+                    getResolvedPolarisEntity(N5),
+                    getResolvedPolarisEntity(N5_N6));
+            LOGGER.debug("Client1 returning results {}", resolvedEntities);
+            return new ResolvedEntitiesResult(resolvedEntities);
+          } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new RuntimeException(e);
+          }
+        };
+    Answer client2Answer =
+        invocation -> {
+          // This is client2's loadResolvedEntities call - execute normally 
and signal client1
+          try {
+            LOGGER.debug("Client2 loading resolved entities");
+            var result =
+                new ResolvedEntitiesResult(
                     List.of(
                         getResolvedPolarisEntity(N1),
-                        getResolvedPolarisEntity(N2v2),
+                        getResolvedPolarisEntity(N2v3),
                         getResolvedPolarisEntity(N3),
                         getResolvedPolarisEntity(N5),
-                        getResolvedPolarisEntity(N5_N6));
-                LOGGER.debug("Client1 returning results {}", resolvedEntities);
-                return new ResolvedEntitiesResult(resolvedEntities);
-              } catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
-                throw new RuntimeException(e);
-              }
-            })
-        .doAnswer(
-            invocation -> {
-              // This is client2's loadResolvedEntities call - execute 
normally and signal client1
-              try {
-                LOGGER.debug("Client2 loading resolved entities");
-                var result =
-                    new ResolvedEntitiesResult(
-                        List.of(
-                            getResolvedPolarisEntity(N1),
-                            getResolvedPolarisEntity(N2v3),
-                            getResolvedPolarisEntity(N3),
-                            getResolvedPolarisEntity(N5),
-                            getResolvedPolarisEntity(N5_N6)));
-                client1ResolvedEntitiesBlocker.release(); // Allow client1 to 
proceed
-                LOGGER.debug("Client2 returning results {}", 
result.getResolvedEntities());
-                return result;
-              } catch (Exception e) {
-                client1ResolvedEntitiesBlocker.release(); // Release in case 
of error
-                throw e;
-              }
-            })
-        .when(mockedMetaStoreManager)
-        .loadResolvedEntities(Mockito.any(), Mockito.any(), Mockito.any());
+                        getResolvedPolarisEntity(N5_N6)));
+            client1ResolvedEntitiesBlocker.release(); // Allow client1 to 
proceed
+            LOGGER.debug("Client2 returning results {}", 
result.getResolvedEntities());
+            return result;
+          } catch (Exception e) {
+            client1ResolvedEntitiesBlocker.release(); // Release in case of 
error
+            throw e;
+          }
+        };
+    if (loadType.equals("id")) {
+      Mockito.doAnswer(client1Answer)
+          .doAnswer(client2Answer)
+          .when(mockedMetaStoreManager)
+          .loadResolvedEntities(Mockito.any(), Mockito.any(), Mockito.any());
+    } else {
+      Mockito.doAnswer(client1Answer)
+          .doAnswer(client2Answer)
+          .when(mockedMetaStoreManager)
+          .loadResolvedEntities(Mockito.any(), Mockito.any());
+    }
 
     // ExecutorService isn't AutoCloseable in JDK 11 :(
     ExecutorService executorService = Executors.newFixedThreadPool(2);
@@ -989,11 +1103,19 @@ public class InMemoryEntityCacheTest {
                   // Client1 calls getOrLoadResolvedEntities
                   // - loadEntitiesChangeTracking returns older version for N2
                   // - loadResolvedEntities will block until client2 completes
-                  List<EntityCacheLookupResult> results =
-                      cache.getOrLoadResolvedEntities(
-                          this.callCtx, PolarisEntityType.NAMESPACE, 
entityIds);
-
-                  return results;
+                  if (loadType.equals("id")) {
+                    return cache.getOrLoadResolvedEntities(
+                        this.callCtx, PolarisEntityType.NAMESPACE, entityIds);
+                  } else {
+                    return cache.getOrLoadResolvedEntities(
+                        this.callCtx,
+                        List.of(
+                            new EntityNameLookupRecord(N1),
+                            new EntityNameLookupRecord(N2),
+                            new EntityNameLookupRecord(N3),
+                            new EntityNameLookupRecord(N5),
+                            new EntityNameLookupRecord(N5_N6)));
+                  }
                 } catch (Exception e) {
                   client1Exception.set(e);
                   return null;
@@ -1009,11 +1131,19 @@ public class InMemoryEntityCacheTest {
                   // - loadEntitiesChangeTracking returns newer version for N2
                   // - loadResolvedEntities executes normally and signals 
client1 when done
                   client1ChangeTrackingResult.await();
-                  List<EntityCacheLookupResult> results =
-                      cache.getOrLoadResolvedEntities(
-                          this.callCtx, PolarisEntityType.NAMESPACE, 
entityIds);
-
-                  return results;
+                  if (loadType.equals("id")) {
+                    return cache.getOrLoadResolvedEntities(
+                        this.callCtx, PolarisEntityType.NAMESPACE, entityIds);
+                  } else {
+                    return cache.getOrLoadResolvedEntities(
+                        this.callCtx,
+                        List.of(
+                            new EntityNameLookupRecord(N1),
+                            new EntityNameLookupRecord(N2),
+                            new EntityNameLookupRecord(N3),
+                            new EntityNameLookupRecord(N5),
+                            new EntityNameLookupRecord(N5_N6)));
+                  }
                 } catch (Exception e) {
                   client2Exception.set(e);
                   client1ResolvedEntitiesBlocker.release(); // Release in case 
of error

Reply via email to