dimas-b commented on code in PR #4526:
URL: https://github.com/apache/polaris/pull/4526#discussion_r3311545423


##########
persistence/nosql/persistence/metastore-maintenance/src/main/java/org/apache/polaris/persistence/nosql/metastore/maintenance/CatalogRetainedIdentifier.java:
##########
@@ -302,25 +344,27 @@ private <O extends ContainerObj> void perRealmContainer(
         });
   }
 
-  private <O extends BaseCommitObj> void perCatalog(
-      String what,
-      String refNamePattern,
+  private void perCatalogRoles(
       CatalogObj catalogObj,
       String celRetainExpr,
-      Class<O> objClazz,
-      Function<O, IndexContainer<ObjRef>> indexContainerFunction,
+      Function<CatalogRolesObj, IndexContainer<ObjRef>> indexContainerFunction,
       RetainedCollector collector,
-      Consumer<O> objConsumer) {
+      Consumer<CatalogRolesObj> objConsumer) {
     LOGGER.info(
-        "Identifying {} for catalog '{}' ({})...", what, catalogObj.name(), 
catalogObj.stableId());
+        "Identifying {} for catalog '{}' ({})...",
+        "catalog roles",
+        catalogObj.name(),
+        catalogObj.stableId());
     ignoreReferenceNotFound(
         () -> {
           var persistence = collector.realmPersistence();
-          var refName = format(refNamePattern, catalogObj.stableId());
+          var refName =
+              format(CatalogRolesObj.CATALOG_ROLES_REF_NAME_PATTERN, 
catalogObj.stableId());
           var historyContinue =
-              new CelReferenceContinuePredicate<O>(refName, persistence, 
celRetainExpr);
+              new CelReferenceContinuePredicate<CatalogRolesObj>(
+                  refName, persistence, celRetainExpr);
           collector.refRetainIndexToSingleObj(
-              refName, objClazz, historyContinue, indexContainerFunction, 
objConsumer);
+              refName, CatalogRolesObj.class, historyContinue, 
indexContainerFunction, objConsumer);

Review Comment:
   Since we inline `CatalogRolesObj.class`, why not also inline
`indexContainerFunction`, which is always `CatalogRolesObj::nameToObjRef`?



##########
persistence/nosql/persistence/metastore-maintenance/src/main/java/org/apache/polaris/persistence/nosql/metastore/maintenance/CatalogRetainedIdentifier.java:
##########
@@ -331,4 +375,351 @@ void ignoreReferenceNotFound(Runnable runnable) {
       LOGGER.debug("Reference not found: {}", e.getMessage());
     }
   }
+
+  private void cleanupStaleGrantRecords(Persistence persistence, 
EntityLookupCache entityLookup) {
+    LOGGER.info("Cleaning stale grant records...");
+    ignoreReferenceNotFound(
+        () -> {
+          IndexKey continuation = null;
+          while (true) {
+            var refObj =
+                persistence.fetchReferenceHead(REALM_GRANTS_REF_NAME, 
RealmGrantsObj.class);
+            if (refObj.isEmpty()) {
+              return;
+            }
+
+            var batch =
+                scanStaleGrantRecords(
+                    persistence,
+                    refObj.get().acls().indexForRead(persistence, 
OBJ_REF_SERIALIZER),
+                    continuation,
+                    entityLookup);
+            if (batch.candidates().isEmpty()) {
+              return;
+            }
+
+            var cleaned = commitGrantCleanup(persistence, batch.candidates());
+            LOGGER.debug("Removed {} stale grant ACL entries in this batch", 
cleaned);
+
+            if (batch.exhausted()) {
+              return;
+            }
+            continuation = batch.lastScannedKey();
+          }
+        });
+  }
+
+  private GrantCleanupBatch scanStaleGrantRecords(
+      Persistence persistence,
+      Index<ObjRef> index,
+      IndexKey continuation,
+      EntityLookupCache entityLookup) {
+    var candidates = new ArrayList<GrantCleanupCandidate>();
+    var iter = index.iterator(continuation, null, false);
+    var skipContinuation = continuation != null;
+    IndexKey lastScannedKey = null;
+    while (iter.hasNext()) {
+      var elem = iter.next();
+      var aclKey = elem.key();
+      if (skipContinuation && aclKey.equals(continuation)) {
+        skipContinuation = false;
+        continue;
+      }
+      skipContinuation = false;
+      lastScannedKey = aclKey;
+
+      var anchor = GrantTriplet.fromRoleName(aclKey.toString());
+      if (!entityLookup.entityExists(anchor.catalogId(), anchor.id(), 
anchor.typeCode())) {
+        candidates.add(new GrantCleanupCandidate(aclKey, List.of(), true));
+      } else {
+        var aclObj = persistence.fetch(elem.value(), AclObj.class);
+        if (aclObj == null) {
+          continue;
+        }
+
+        var staleRoleIds = new ArrayList<String>();
+        aclObj
+            .acl()
+            .forEach(
+                (roleId, entry) -> {
+                  var role = GrantTriplet.fromRoleName(roleId);
+                  if (!entityLookup.entityExists(role.catalogId(), role.id(), 
role.typeCode())) {
+                    staleRoleIds.add(roleId);
+                  }
+                });
+        if (!staleRoleIds.isEmpty()) {
+          candidates.add(new GrantCleanupCandidate(aclKey, staleRoleIds, 
false));
+        }
+      }
+
+      if (candidates.size() >= STALE_CLEANUP_BATCH_SIZE) {
+        return new GrantCleanupBatch(candidates, lastScannedKey, false);
+      }
+    }
+    return new GrantCleanupBatch(candidates, lastScannedKey, true);
+  }
+
+  private int commitGrantCleanup(
+      Persistence persistence, List<GrantCleanupCandidate> 
grantCleanupCandidates) {
+    return persistence
+        .createCommitter(REALM_GRANTS_REF_NAME, RealmGrantsObj.class, 
Integer.class)
+        .synchronizingLocally()
+        .commitRuntimeException(
+            (state, refObjSupplier) -> {
+              var refObj = refObjSupplier.get();
+              if (refObj.isEmpty()) {
+                return state.noCommit(0);
+              }
+
+              var aclIndex =
+                  refObj.get().acls().asUpdatableIndex(state.persistence(), 
OBJ_REF_SERIALIZER);
+              var changed = false;
+              var cleaned = 0;
+              for (var candidate : grantCleanupCandidates) {
+                if (candidate.removeAcl()) {
+                  if (aclIndex.remove(candidate.aclKey())) {
+                    changed = true;
+                    cleaned++;
+                  }
+                  continue;
+                }
+
+                var aclRef = aclIndex.get(candidate.aclKey());
+                if (aclRef == null) {
+                  continue;
+                }
+
+                var currentAclObj = state.persistence().fetch(aclRef, 
AclObj.class);
+                if (currentAclObj == null) {
+                  continue;
+                }
+
+                var aclBuilder = 
privileges.newAclBuilder().from(currentAclObj.acl());
+                candidate.staleRoleIds().forEach(aclBuilder::removeEntry);
+                var updatedAcl = aclBuilder.build();
+                if (currentAclObj.acl().equals(updatedAcl)) {
+                  continue;
+                }
+
+                if (aclIsEmpty(updatedAcl)) {
+                  if (aclIndex.remove(candidate.aclKey())) {
+                    changed = true;
+                    cleaned++;
+                  }
+                  continue;
+                }
+
+                AclObj updatedAclObj =
+                    AclObj.builder()
+                        .from(currentAclObj)
+                        .id(state.persistence().generateId())
+                        .acl(updatedAcl)
+                        .build();
+                updatedAclObj =
+                    state.writeOrReplace(
+                        "acl-" + currentAclObj.securableId(), updatedAclObj, 
AclObj.class);
+                aclIndex.put(candidate.aclKey(), objRef(updatedAclObj));
+                changed = true;
+                cleaned++;
+              }
+
+              if (!changed) {
+                return state.noCommit(0);
+              }
+
+              var builder = RealmGrantsObj.builder().from(refObj.get());
+              builder.acls(aclIndex.toIndexed("idx-sec-", 
state::writeOrReplace));
+              return state.commitResult(cleaned, builder, refObj);
+            })
+        .orElse(0);
+  }
+
+  private void cleanupStalePolicyMappings(Persistence persistence, 
EntityLookupCache entityLookup) {
+    LOGGER.info("Cleaning stale policy mappings...");
+    ignoreReferenceNotFound(
+        () -> {
+          IndexKey continuation = null;
+          while (true) {
+            var refObj =
+                persistence.fetchReferenceHead(POLICY_MAPPINGS_REF_NAME, 
PolicyMappingsObj.class);
+            if (refObj.isEmpty()) {
+              return;
+            }
+
+            var batch =
+                scanStalePolicyMappings(
+                    refObj
+                        .get()
+                        .policyMappings()
+                        .indexForRead(persistence, POLICY_MAPPING_SERIALIZER),
+                    continuation,
+                    entityLookup);
+            if (batch.candidates().isEmpty()) {
+              return;
+            }
+
+            var cleaned = commitPolicyMappingsCleanup(persistence, 
batch.candidates());
+            LOGGER.debug("Removed {} stale policy mappings in this batch", 
cleaned);
+
+            if (batch.exhausted()) {
+              return;
+            }
+            continuation = batch.lastScannedKey();
+          }
+        });
+  }
+
+  private PolicyMappingsCleanupBatch scanStalePolicyMappings(
+      Index<PolicyMapping> index, IndexKey continuation, EntityLookupCache 
entityLookup) {
+    var candidates = new ArrayList<PolicyMappingsObj.KeyByEntity>();
+    var lower = continuation != null ? continuation : 
POLICY_MAPPINGS_ENTITY_PREFIX_KEY;
+    var iter = index.iterator(lower, null, false);
+    var skipContinuation = continuation != null;
+    IndexKey lastScannedKey = null;
+    while (iter.hasNext()) {
+      var elem = iter.next();
+      var key = elem.key();
+      if (skipContinuation && key.equals(continuation)) {
+        skipContinuation = false;
+        continue;
+      }
+      skipContinuation = false;
+
+      var mappingKey = PolicyMappingsObj.PolicyMappingKey.fromIndexKey(key);
+      if (!(mappingKey instanceof PolicyMappingsObj.KeyByEntity entityKey)) {
+        break;
+      }
+
+      lastScannedKey = key;
+      if (!entityLookup.policyExists(entityKey.policyCatalogId(), 
entityKey.policyId())
+          || !entityLookup.policyTargetExists(entityKey.entityCatalogId(), 
entityKey.entityId())) {
+        candidates.add(entityKey);
+      }
+
+      if (candidates.size() >= STALE_CLEANUP_BATCH_SIZE) {
+        return new PolicyMappingsCleanupBatch(candidates, lastScannedKey, 
false);
+      }
+    }
+    return new PolicyMappingsCleanupBatch(candidates, lastScannedKey, true);
+  }
+
+  private int commitPolicyMappingsCleanup(
+      Persistence persistence, List<PolicyMappingsObj.KeyByEntity> 
staleMappings) {
+    return persistence
+        .createCommitter(POLICY_MAPPINGS_REF_NAME, PolicyMappingsObj.class, 
Integer.class)
+        .synchronizingLocally()
+        .commitRuntimeException(
+            (state, refObjSupplier) -> {
+              var refObj = refObjSupplier.get();
+              if (refObj.isEmpty()) {
+                return state.noCommit(0);
+              }
+
+              var mappingsIndex =
+                  refObj
+                      .get()
+                      .policyMappings()
+                      .asUpdatableIndex(state.persistence(), 
POLICY_MAPPING_SERIALIZER);
+              var changed = false;
+              var cleaned = 0;
+              for (var keyByEntity : staleMappings) {
+                var removedForward = 
mappingsIndex.remove(keyByEntity.toIndexKey());
+                var removedReverse = 
mappingsIndex.remove(keyByEntity.reverse().toIndexKey());
+                if (removedForward || removedReverse) {
+                  changed = true;
+                  cleaned++;
+                }
+              }
+
+              if (!changed) {
+                return state.noCommit(0);
+              }
+
+              var builder = PolicyMappingsObj.builder().from(refObj.get());
+              builder.policyMappings(mappingsIndex.toIndexed("mappings", 
state::writeOrReplace));
+              return state.commitResult(cleaned, builder, refObj);
+            })
+        .orElse(0);
+  }
+
+  private static boolean aclIsEmpty(Acl acl) {
+    var hasEntries = new boolean[1];
+    acl.forEach((roleId, entry) -> hasEntries[0] = true);

Review Comment:
   Why not add an `Acl.isEmpty()` method instead of this helper? There is only one `Acl` implementation, so the LOC change would be about the same.



##########
persistence/nosql/persistence/metastore-maintenance/src/main/java/org/apache/polaris/persistence/nosql/metastore/maintenance/CatalogRetainedIdentifier.java:
##########
@@ -331,4 +375,351 @@ void ignoreReferenceNotFound(Runnable runnable) {
       LOGGER.debug("Reference not found: {}", e.getMessage());
     }
   }
+
+  private void cleanupStaleGrantRecords(Persistence persistence, 
EntityLookupCache entityLookup) {
+    LOGGER.info("Cleaning stale grant records...");
+    ignoreReferenceNotFound(
+        () -> {
+          IndexKey continuation = null;
+          while (true) {
+            var refObj =
+                persistence.fetchReferenceHead(REALM_GRANTS_REF_NAME, 
RealmGrantsObj.class);
+            if (refObj.isEmpty()) {
+              return;
+            }
+
+            var batch =
+                scanStaleGrantRecords(
+                    persistence,
+                    refObj.get().acls().indexForRead(persistence, 
OBJ_REF_SERIALIZER),
+                    continuation,
+                    entityLookup);
+            if (batch.candidates().isEmpty()) {
+              return;
+            }
+
+            var cleaned = commitGrantCleanup(persistence, batch.candidates());
+            LOGGER.debug("Removed {} stale grant ACL entries in this batch", 
cleaned);
+
+            if (batch.exhausted()) {
+              return;
+            }
+            continuation = batch.lastScannedKey();
+          }
+        });
+  }
+
+  private GrantCleanupBatch scanStaleGrantRecords(
+      Persistence persistence,
+      Index<ObjRef> index,
+      IndexKey continuation,
+      EntityLookupCache entityLookup) {
+    var candidates = new ArrayList<GrantCleanupCandidate>();
+    var iter = index.iterator(continuation, null, false);
+    var skipContinuation = continuation != null;
+    IndexKey lastScannedKey = null;
+    while (iter.hasNext()) {
+      var elem = iter.next();
+      var aclKey = elem.key();
+      if (skipContinuation && aclKey.equals(continuation)) {
+        skipContinuation = false;
+        continue;
+      }
+      skipContinuation = false;
+      lastScannedKey = aclKey;
+
+      var anchor = GrantTriplet.fromRoleName(aclKey.toString());
+      if (!entityLookup.entityExists(anchor.catalogId(), anchor.id(), 
anchor.typeCode())) {
+        candidates.add(new GrantCleanupCandidate(aclKey, List.of(), true));
+      } else {
+        var aclObj = persistence.fetch(elem.value(), AclObj.class);
+        if (aclObj == null) {
+          continue;
+        }
+
+        var staleRoleIds = new ArrayList<String>();
+        aclObj
+            .acl()
+            .forEach(
+                (roleId, entry) -> {
+                  var role = GrantTriplet.fromRoleName(roleId);
+                  if (!entityLookup.entityExists(role.catalogId(), role.id(), 
role.typeCode())) {
+                    staleRoleIds.add(roleId);
+                  }
+                });
+        if (!staleRoleIds.isEmpty()) {
+          candidates.add(new GrantCleanupCandidate(aclKey, staleRoleIds, 
false));
+        }
+      }
+
+      if (candidates.size() >= STALE_CLEANUP_BATCH_SIZE) {
+        return new GrantCleanupBatch(candidates, lastScannedKey, false);
+      }
+    }
+    return new GrantCleanupBatch(candidates, lastScannedKey, true);
+  }
+
+  private int commitGrantCleanup(
+      Persistence persistence, List<GrantCleanupCandidate> 
grantCleanupCandidates) {
+    return persistence
+        .createCommitter(REALM_GRANTS_REF_NAME, RealmGrantsObj.class, 
Integer.class)
+        .synchronizingLocally()
+        .commitRuntimeException(
+            (state, refObjSupplier) -> {
+              var refObj = refObjSupplier.get();
+              if (refObj.isEmpty()) {
+                return state.noCommit(0);
+              }
+
+              var aclIndex =
+                  refObj.get().acls().asUpdatableIndex(state.persistence(), 
OBJ_REF_SERIALIZER);
+              var changed = false;
+              var cleaned = 0;
+              for (var candidate : grantCleanupCandidates) {
+                if (candidate.removeAcl()) {
+                  if (aclIndex.remove(candidate.aclKey())) {
+                    changed = true;
+                    cleaned++;
+                  }
+                  continue;
+                }
+
+                var aclRef = aclIndex.get(candidate.aclKey());
+                if (aclRef == null) {
+                  continue;
+                }
+
+                var currentAclObj = state.persistence().fetch(aclRef, 
AclObj.class);
+                if (currentAclObj == null) {
+                  continue;
+                }
+
+                var aclBuilder = 
privileges.newAclBuilder().from(currentAclObj.acl());
+                candidate.staleRoleIds().forEach(aclBuilder::removeEntry);
+                var updatedAcl = aclBuilder.build();
+                if (currentAclObj.acl().equals(updatedAcl)) {
+                  continue;
+                }
+
+                if (aclIsEmpty(updatedAcl)) {
+                  if (aclIndex.remove(candidate.aclKey())) {
+                    changed = true;
+                    cleaned++;
+                  }
+                  continue;
+                }
+
+                AclObj updatedAclObj =
+                    AclObj.builder()
+                        .from(currentAclObj)
+                        .id(state.persistence().generateId())
+                        .acl(updatedAcl)
+                        .build();
+                updatedAclObj =
+                    state.writeOrReplace(
+                        "acl-" + currentAclObj.securableId(), updatedAclObj, 
AclObj.class);
+                aclIndex.put(candidate.aclKey(), objRef(updatedAclObj));
+                changed = true;
+                cleaned++;
+              }
+
+              if (!changed) {
+                return state.noCommit(0);
+              }
+
+              var builder = RealmGrantsObj.builder().from(refObj.get());
+              builder.acls(aclIndex.toIndexed("idx-sec-", 
state::writeOrReplace));
+              return state.commitResult(cleaned, builder, refObj);
+            })
+        .orElse(0);
+  }
+
+  private void cleanupStalePolicyMappings(Persistence persistence, 
EntityLookupCache entityLookup) {
+    LOGGER.info("Cleaning stale policy mappings...");
+    ignoreReferenceNotFound(
+        () -> {
+          IndexKey continuation = null;
+          while (true) {
+            var refObj =
+                persistence.fetchReferenceHead(POLICY_MAPPINGS_REF_NAME, 
PolicyMappingsObj.class);
+            if (refObj.isEmpty()) {
+              return;
+            }
+
+            var batch =
+                scanStalePolicyMappings(
+                    refObj
+                        .get()
+                        .policyMappings()
+                        .indexForRead(persistence, POLICY_MAPPING_SERIALIZER),
+                    continuation,
+                    entityLookup);
+            if (batch.candidates().isEmpty()) {
+              return;
+            }
+
+            var cleaned = commitPolicyMappingsCleanup(persistence, 
batch.candidates());
+            LOGGER.debug("Removed {} stale policy mappings in this batch", 
cleaned);
+
+            if (batch.exhausted()) {
+              return;
+            }
+            continuation = batch.lastScannedKey();
+          }
+        });
+  }
+
+  private PolicyMappingsCleanupBatch scanStalePolicyMappings(
+      Index<PolicyMapping> index, IndexKey continuation, EntityLookupCache 
entityLookup) {
+    var candidates = new ArrayList<PolicyMappingsObj.KeyByEntity>();
+    var lower = continuation != null ? continuation : 
POLICY_MAPPINGS_ENTITY_PREFIX_KEY;
+    var iter = index.iterator(lower, null, false);
+    var skipContinuation = continuation != null;
+    IndexKey lastScannedKey = null;
+    while (iter.hasNext()) {
+      var elem = iter.next();
+      var key = elem.key();
+      if (skipContinuation && key.equals(continuation)) {
+        skipContinuation = false;
+        continue;
+      }
+      skipContinuation = false;
+
+      var mappingKey = PolicyMappingsObj.PolicyMappingKey.fromIndexKey(key);
+      if (!(mappingKey instanceof PolicyMappingsObj.KeyByEntity entityKey)) {
+        break;
+      }
+
+      lastScannedKey = key;
+      if (!entityLookup.policyExists(entityKey.policyCatalogId(), 
entityKey.policyId())
+          || !entityLookup.policyTargetExists(entityKey.entityCatalogId(), 
entityKey.entityId())) {
+        candidates.add(entityKey);
+      }
+
+      if (candidates.size() >= STALE_CLEANUP_BATCH_SIZE) {
+        return new PolicyMappingsCleanupBatch(candidates, lastScannedKey, 
false);
+      }
+    }
+    return new PolicyMappingsCleanupBatch(candidates, lastScannedKey, true);
+  }
+
+  private int commitPolicyMappingsCleanup(
+      Persistence persistence, List<PolicyMappingsObj.KeyByEntity> 
staleMappings) {
+    return persistence
+        .createCommitter(POLICY_MAPPINGS_REF_NAME, PolicyMappingsObj.class, 
Integer.class)
+        .synchronizingLocally()
+        .commitRuntimeException(
+            (state, refObjSupplier) -> {
+              var refObj = refObjSupplier.get();
+              if (refObj.isEmpty()) {
+                return state.noCommit(0);
+              }
+
+              var mappingsIndex =
+                  refObj
+                      .get()
+                      .policyMappings()
+                      .asUpdatableIndex(state.persistence(), 
POLICY_MAPPING_SERIALIZER);
+              var changed = false;
+              var cleaned = 0;
+              for (var keyByEntity : staleMappings) {
+                var removedForward = 
mappingsIndex.remove(keyByEntity.toIndexKey());
+                var removedReverse = 
mappingsIndex.remove(keyByEntity.reverse().toIndexKey());
+                if (removedForward || removedReverse) {
+                  changed = true;
+                  cleaned++;
+                }
+              }
+
+              if (!changed) {
+                return state.noCommit(0);
+              }
+
+              var builder = PolicyMappingsObj.builder().from(refObj.get());
+              builder.policyMappings(mappingsIndex.toIndexed("mappings", 
state::writeOrReplace));
+              return state.commitResult(cleaned, builder, refObj);
+            })
+        .orElse(0);
+  }
+
+  private static boolean aclIsEmpty(Acl acl) {
+    var hasEntries = new boolean[1];
+    acl.forEach((roleId, entry) -> hasEntries[0] = true);
+    return !hasEntries[0];
+  }
+
+  private record PolicyMappingsCleanupBatch(
+      List<PolicyMappingsObj.KeyByEntity> candidates, IndexKey lastScannedKey, 
boolean exhausted) {}
+
+  private record GrantCleanupBatch(
+      List<GrantCleanupCandidate> candidates, IndexKey lastScannedKey, boolean 
exhausted) {}
+
+  private record GrantCleanupCandidate(
+      IndexKey aclKey, List<String> staleRoleIds, boolean removeAcl) {}

Review Comment:
   `removeAcl` is always equal to `staleRoleIds.isEmpty()`, so the flag looks redundant and could be derived from the list.



##########
persistence/nosql/persistence/metastore-maintenance/src/main/java/org/apache/polaris/persistence/nosql/metastore/maintenance/CatalogRetainedIdentifier.java:
##########
@@ -331,4 +375,351 @@ void ignoreReferenceNotFound(Runnable runnable) {
       LOGGER.debug("Reference not found: {}", e.getMessage());
     }
   }
+
+  private void cleanupStaleGrantRecords(Persistence persistence, 
EntityLookupCache entityLookup) {
+    LOGGER.info("Cleaning stale grant records...");
+    ignoreReferenceNotFound(
+        () -> {
+          IndexKey continuation = null;
+          while (true) {
+            var refObj =
+                persistence.fetchReferenceHead(REALM_GRANTS_REF_NAME, 
RealmGrantsObj.class);
+            if (refObj.isEmpty()) {
+              return;
+            }
+
+            var batch =
+                scanStaleGrantRecords(
+                    persistence,
+                    refObj.get().acls().indexForRead(persistence, 
OBJ_REF_SERIALIZER),
+                    continuation,
+                    entityLookup);
+            if (batch.candidates().isEmpty()) {
+              return;
+            }
+
+            var cleaned = commitGrantCleanup(persistence, batch.candidates());
+            LOGGER.debug("Removed {} stale grant ACL entries in this batch", 
cleaned);
+
+            if (batch.exhausted()) {
+              return;
+            }
+            continuation = batch.lastScannedKey();
+          }
+        });
+  }
+
+  private GrantCleanupBatch scanStaleGrantRecords(
+      Persistence persistence,
+      Index<ObjRef> index,
+      IndexKey continuation,
+      EntityLookupCache entityLookup) {
+    var candidates = new ArrayList<GrantCleanupCandidate>();
+    var iter = index.iterator(continuation, null, false);
+    var skipContinuation = continuation != null;
+    IndexKey lastScannedKey = null;
+    while (iter.hasNext()) {
+      var elem = iter.next();
+      var aclKey = elem.key();
+      if (skipContinuation && aclKey.equals(continuation)) {
+        skipContinuation = false;
+        continue;
+      }
+      skipContinuation = false;
+      lastScannedKey = aclKey;
+
+      var anchor = GrantTriplet.fromRoleName(aclKey.toString());
+      if (!entityLookup.entityExists(anchor.catalogId(), anchor.id(), 
anchor.typeCode())) {
+        candidates.add(new GrantCleanupCandidate(aclKey, List.of(), true));
+      } else {
+        var aclObj = persistence.fetch(elem.value(), AclObj.class);
+        if (aclObj == null) {
+          continue;
+        }
+
+        var staleRoleIds = new ArrayList<String>();
+        aclObj
+            .acl()
+            .forEach(
+                (roleId, entry) -> {
+                  var role = GrantTriplet.fromRoleName(roleId);
+                  if (!entityLookup.entityExists(role.catalogId(), role.id(), 
role.typeCode())) {
+                    staleRoleIds.add(roleId);
+                  }
+                });
+        if (!staleRoleIds.isEmpty()) {
+          candidates.add(new GrantCleanupCandidate(aclKey, staleRoleIds, 
false));
+        }
+      }
+
+      if (candidates.size() >= STALE_CLEANUP_BATCH_SIZE) {
+        return new GrantCleanupBatch(candidates, lastScannedKey, false);
+      }
+    }
+    return new GrantCleanupBatch(candidates, lastScannedKey, true);
+  }
+
+  private int commitGrantCleanup(
+      Persistence persistence, List<GrantCleanupCandidate> 
grantCleanupCandidates) {
+    return persistence
+        .createCommitter(REALM_GRANTS_REF_NAME, RealmGrantsObj.class, 
Integer.class)
+        .synchronizingLocally()

Review Comment:
   Side note: `ExclusiveCommitSynchronizer.tryAcquire()` may return without
actually acquiring the `Semaphore`: `Semaphore.tryAcquire` does not throw on
timeout, but returns `false`. The subsequent `.release()` call then releases a
permit that was never acquired, which can skew the `Semaphore`'s state.



##########
persistence/nosql/persistence/metastore-maintenance/src/test/java/org/apache/polaris/persistence/nosql/metastore/maintenance/TestCatalogMaintenance.java:
##########
@@ -261,20 +268,400 @@ public void catalogMaintenance() {
     checkEntities("real state after grace", entities);
   }
 
+  @Test
+  public void maintenanceRemovesStalePolicyMappings() {
+    var testSetup = bootstrapRealm();
+    var manager = testSetup.manager();
+    var callCtx = testSetup.callCtx();
+    var persistence = testSetup.persistence();
+
+    var catalog = createCatalog(manager, callCtx, persistence);
+    mandatoryCatalogObjsForTestImpl(persistence, catalog.getId());
+    var namespace = createNamespace(manager, callCtx, catalog, persistence, 
"policy-ns");
+    var policy =
+        createPolicy(
+            manager,
+            callCtx,
+            catalog,
+            namespace,
+            persistence,
+            "cleanup-policy",
+            PredefinedPolicyTypes.DATA_COMPACTION);
+    var tables =
+        createTables(
+            manager,
+            callCtx,
+            catalog,
+            namespace,
+            persistence,
+            CatalogRetainedIdentifier.STALE_CLEANUP_BATCH_SIZE + 1,
+            "policy-table-");
+
+    for (var table : tables) {
+      assertThat(
+              manager.attachPolicyToEntity(
+                  callCtx,
+                  List.of(catalog, namespace),
+                  table,
+                  List.of(catalog, namespace),
+                  policy,
+                  Map.of()))
+          .extracting(BaseResult::isSuccess)
+          .isEqualTo(true);
+    }
+
+    assertThat(countPolicyMappingsForPolicy(persistence, 
policy.getCatalogId(), policy.getId()))
+        .isEqualTo(2L * tables.size());
+
+    assertThat(manager.dropEntityIfExists(callCtx, List.of(catalog, 
namespace), policy, null, true))
+        .extracting(BaseResult::isSuccess)
+        .isEqualTo(true);
+
+    LoadPolicyMappingsResult staleMappings =
+        manager.loadPoliciesOnEntity(callCtx, tables.getFirst());
+    assertThat(staleMappings.isSuccess()).isTrue();
+    assertThat(staleMappings.getPolicyMappingRecords()).hasSize(1);
+    assertThat(staleMappings.getEntities()).isEmpty();
+
+    assertThat(runMaintenance().success()).isTrue();
+
+    assertThat(countPolicyMappingsForPolicy(persistence, 
policy.getCatalogId(), policy.getId()))
+        .isZero();
+  }
+
+  @Test
+  public void maintenanceRemovesStalePolicyMappingsForDeletedTargets() {
+    var testSetup = bootstrapRealm();
+    var manager = testSetup.manager();
+    var callCtx = testSetup.callCtx();
+    var persistence = testSetup.persistence();
+
+    var catalog = createCatalog(manager, callCtx, persistence);
+    mandatoryCatalogObjsForTestImpl(persistence, catalog.getId());
+    var namespace = createNamespace(manager, callCtx, catalog, persistence, 
"policy-target-ns");
+    var policy =
+        createPolicy(
+            manager,
+            callCtx,
+            catalog,
+            namespace,
+            persistence,
+            "cleanup-target-policy",
+            PredefinedPolicyTypes.DATA_COMPACTION);
+    var staleTables =
+        createTables(
+            manager,
+            callCtx,
+            catalog,
+            namespace,
+            persistence,
+            CatalogRetainedIdentifier.STALE_CLEANUP_BATCH_SIZE + 1,
+            "stale-policy-table-");
+    var liveTable =
+        createTable(manager, callCtx, catalog, namespace, persistence, 
"live-policy-table");
+
+    for (var table : staleTables) {
+      assertThat(
+              manager.attachPolicyToEntity(
+                  callCtx,
+                  List.of(catalog, namespace),
+                  table,
+                  List.of(catalog, namespace),
+                  policy,
+                  Map.of()))
+          .extracting(BaseResult::isSuccess)
+          .isEqualTo(true);
+    }
+    assertThat(
+            manager.attachPolicyToEntity(
+                callCtx,
+                List.of(catalog, namespace),
+                liveTable,
+                List.of(catalog, namespace),
+                policy,
+                Map.of()))
+        .extracting(BaseResult::isSuccess)
+        .isEqualTo(true);
+
+    assertThat(countPolicyMappingsForPolicy(persistence, 
policy.getCatalogId(), policy.getId()))
+        .isEqualTo(2L * (staleTables.size() + 1L));
+
+    for (var table : staleTables) {
+      assertThat(
+              manager.dropEntityIfExists(callCtx, List.of(catalog, namespace), 
table, null, false))
+          .extracting(BaseResult::isSuccess)
+          .isEqualTo(true);
+    }
+
+    assertThat(countPolicyMappingsForPolicy(persistence, 
policy.getCatalogId(), policy.getId()))
+        .isEqualTo(2L * (staleTables.size() + 1L));
+    assertThat(manager.loadPoliciesOnEntity(callCtx, 
liveTable).getPolicyMappingRecords())
+        .hasSize(1);
+
+    assertThat(runMaintenance().success()).isTrue();
+
+    assertThat(countPolicyMappingsForPolicy(persistence, 
policy.getCatalogId(), policy.getId()))
+        .isEqualTo(2L);

Review Comment:
   Should we also assert that the two remaining mappings point to `liveTable`?



##########
persistence/nosql/persistence/metastore-maintenance/src/main/java/org/apache/polaris/persistence/nosql/metastore/maintenance/CatalogRetainedIdentifier.java:
##########
@@ -331,4 +375,351 @@ void ignoreReferenceNotFound(Runnable runnable) {
       LOGGER.debug("Reference not found: {}", e.getMessage());
     }
   }
+
+  private void cleanupStaleGrantRecords(Persistence persistence, 
EntityLookupCache entityLookup) {
+    LOGGER.info("Cleaning stale grant records...");
+    ignoreReferenceNotFound(
+        () -> {
+          IndexKey continuation = null;
+          while (true) {
+            var refObj =
+                persistence.fetchReferenceHead(REALM_GRANTS_REF_NAME, 
RealmGrantsObj.class);
+            if (refObj.isEmpty()) {
+              return;
+            }
+
+            var batch =
+                scanStaleGrantRecords(
+                    persistence,
+                    refObj.get().acls().indexForRead(persistence, 
OBJ_REF_SERIALIZER),
+                    continuation,
+                    entityLookup);
+            if (batch.candidates().isEmpty()) {
+              return;
+            }
+
+            var cleaned = commitGrantCleanup(persistence, batch.candidates());
+            LOGGER.debug("Removed {} stale grant ACL entries in this batch", 
cleaned);
+
+            if (batch.exhausted()) {
+              return;
+            }
+            continuation = batch.lastScannedKey();
+          }
+        });
+  }
+
+  private GrantCleanupBatch scanStaleGrantRecords(
+      Persistence persistence,
+      Index<ObjRef> index,
+      IndexKey continuation,
+      EntityLookupCache entityLookup) {
+    var candidates = new ArrayList<GrantCleanupCandidate>();
+    var iter = index.iterator(continuation, null, false);
+    var skipContinuation = continuation != null;
+    IndexKey lastScannedKey = null;
+    while (iter.hasNext()) {
+      var elem = iter.next();
+      var aclKey = elem.key();
+      if (skipContinuation && aclKey.equals(continuation)) {
+        skipContinuation = false;
+        continue;
+      }
+      skipContinuation = false;
+      lastScannedKey = aclKey;
+
+      var anchor = GrantTriplet.fromRoleName(aclKey.toString());
+      if (!entityLookup.entityExists(anchor.catalogId(), anchor.id(), 
anchor.typeCode())) {
+        candidates.add(new GrantCleanupCandidate(aclKey, List.of(), true));
+      } else {
+        var aclObj = persistence.fetch(elem.value(), AclObj.class);
+        if (aclObj == null) {
+          continue;
+        }
+
+        var staleRoleIds = new ArrayList<String>();
+        aclObj
+            .acl()
+            .forEach(
+                (roleId, entry) -> {
+                  var role = GrantTriplet.fromRoleName(roleId);
+                  if (!entityLookup.entityExists(role.catalogId(), role.id(), 
role.typeCode())) {
+                    staleRoleIds.add(roleId);
+                  }
+                });
+        if (!staleRoleIds.isEmpty()) {
+          candidates.add(new GrantCleanupCandidate(aclKey, staleRoleIds, 
false));
+        }
+      }
+
+      if (candidates.size() >= STALE_CLEANUP_BATCH_SIZE) {
+        return new GrantCleanupBatch(candidates, lastScannedKey, false);
+      }
+    }
+    return new GrantCleanupBatch(candidates, lastScannedKey, true);
+  }
+
+  private int commitGrantCleanup(
+      Persistence persistence, List<GrantCleanupCandidate> 
grantCleanupCandidates) {
+    return persistence
+        .createCommitter(REALM_GRANTS_REF_NAME, RealmGrantsObj.class, 
Integer.class)
+        .synchronizingLocally()
+        .commitRuntimeException(
+            (state, refObjSupplier) -> {
+              var refObj = refObjSupplier.get();
+              if (refObj.isEmpty()) {
+                return state.noCommit(0);
+              }
+
+              var aclIndex =
+                  refObj.get().acls().asUpdatableIndex(state.persistence(), 
OBJ_REF_SERIALIZER);
+              var changed = false;
+              var cleaned = 0;
+              for (var candidate : grantCleanupCandidates) {
+                if (candidate.removeAcl()) {
+                  if (aclIndex.remove(candidate.aclKey())) {
+                    changed = true;
+                    cleaned++;
+                  }
+                  continue;
+                }
+
+                var aclRef = aclIndex.get(candidate.aclKey());
+                if (aclRef == null) {
+                  continue;
+                }
+
+                var currentAclObj = state.persistence().fetch(aclRef, 
AclObj.class);
+                if (currentAclObj == null) {
+                  continue;
+                }
+
+                var aclBuilder = 
privileges.newAclBuilder().from(currentAclObj.acl());
+                candidate.staleRoleIds().forEach(aclBuilder::removeEntry);
+                var updatedAcl = aclBuilder.build();
+                if (currentAclObj.acl().equals(updatedAcl)) {
+                  continue;
+                }
+
+                if (aclIsEmpty(updatedAcl)) {
+                  if (aclIndex.remove(candidate.aclKey())) {
+                    changed = true;
+                    cleaned++;
+                  }
+                  continue;
+                }
+
+                AclObj updatedAclObj =
+                    AclObj.builder()
+                        .from(currentAclObj)
+                        .id(state.persistence().generateId())
+                        .acl(updatedAcl)
+                        .build();
+                updatedAclObj =
+                    state.writeOrReplace(
+                        "acl-" + currentAclObj.securableId(), updatedAclObj, 
AclObj.class);
+                aclIndex.put(candidate.aclKey(), objRef(updatedAclObj));
+                changed = true;
+                cleaned++;
+              }
+
+              if (!changed) {

Review Comment:
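   Isn't `changed` equivalent to `cleaned > 0`? Both are updated together in
   every branch above, so the separate flag looks redundant.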



##########
persistence/nosql/persistence/metastore-maintenance/src/test/java/org/apache/polaris/persistence/nosql/metastore/maintenance/TestCatalogMaintenance.java:
##########
@@ -261,20 +268,400 @@ public void catalogMaintenance() {
     checkEntities("real state after grace", entities);
   }
 
+  @Test
+  public void maintenanceRemovesStalePolicyMappings() {
+    var testSetup = bootstrapRealm();
+    var manager = testSetup.manager();
+    var callCtx = testSetup.callCtx();
+    var persistence = testSetup.persistence();
+
+    var catalog = createCatalog(manager, callCtx, persistence);
+    mandatoryCatalogObjsForTestImpl(persistence, catalog.getId());
+    var namespace = createNamespace(manager, callCtx, catalog, persistence, 
"policy-ns");
+    var policy =
+        createPolicy(
+            manager,
+            callCtx,
+            catalog,
+            namespace,
+            persistence,
+            "cleanup-policy",
+            PredefinedPolicyTypes.DATA_COMPACTION);
+    var tables =
+        createTables(
+            manager,
+            callCtx,
+            catalog,
+            namespace,
+            persistence,
+            CatalogRetainedIdentifier.STALE_CLEANUP_BATCH_SIZE + 1,
+            "policy-table-");
+
+    for (var table : tables) {
+      assertThat(
+              manager.attachPolicyToEntity(
+                  callCtx,
+                  List.of(catalog, namespace),
+                  table,
+                  List.of(catalog, namespace),
+                  policy,
+                  Map.of()))
+          .extracting(BaseResult::isSuccess)
+          .isEqualTo(true);
+    }
+
+    assertThat(countPolicyMappingsForPolicy(persistence, 
policy.getCatalogId(), policy.getId()))
+        .isEqualTo(2L * tables.size());
+
+    assertThat(manager.dropEntityIfExists(callCtx, List.of(catalog, 
namespace), policy, null, true))
+        .extracting(BaseResult::isSuccess)
+        .isEqualTo(true);
+
+    LoadPolicyMappingsResult staleMappings =
+        manager.loadPoliciesOnEntity(callCtx, tables.getFirst());
+    assertThat(staleMappings.isSuccess()).isTrue();
+    assertThat(staleMappings.getPolicyMappingRecords()).hasSize(1);
+    assertThat(staleMappings.getEntities()).isEmpty();
+
+    assertThat(runMaintenance().success()).isTrue();
+
+    assertThat(countPolicyMappingsForPolicy(persistence, 
policy.getCatalogId(), policy.getId()))

Review Comment:
   Should we also assert that `manager.loadPoliciesOnEntity()` returns an empty 
list of mappings?



##########
persistence/nosql/persistence/metastore-maintenance/src/main/java/org/apache/polaris/persistence/nosql/metastore/maintenance/CatalogRetainedIdentifier.java:
##########
@@ -331,4 +375,351 @@ void ignoreReferenceNotFound(Runnable runnable) {
       LOGGER.debug("Reference not found: {}", e.getMessage());
     }
   }
+
+  private void cleanupStaleGrantRecords(Persistence persistence, 
EntityLookupCache entityLookup) {
+    LOGGER.info("Cleaning stale grant records...");
+    ignoreReferenceNotFound(
+        () -> {
+          IndexKey continuation = null;
+          while (true) {
+            var refObj =
+                persistence.fetchReferenceHead(REALM_GRANTS_REF_NAME, 
RealmGrantsObj.class);
+            if (refObj.isEmpty()) {
+              return;
+            }
+
+            var batch =
+                scanStaleGrantRecords(
+                    persistence,
+                    refObj.get().acls().indexForRead(persistence, 
OBJ_REF_SERIALIZER),
+                    continuation,
+                    entityLookup);
+            if (batch.candidates().isEmpty()) {
+              return;
+            }
+
+            var cleaned = commitGrantCleanup(persistence, batch.candidates());
+            LOGGER.debug("Removed {} stale grant ACL entries in this batch", 
cleaned);
+
+            if (batch.exhausted()) {
+              return;
+            }
+            continuation = batch.lastScannedKey();
+          }
+        });
+  }
+
+  private GrantCleanupBatch scanStaleGrantRecords(
+      Persistence persistence,
+      Index<ObjRef> index,
+      IndexKey continuation,
+      EntityLookupCache entityLookup) {
+    var candidates = new ArrayList<GrantCleanupCandidate>();
+    var iter = index.iterator(continuation, null, false);
+    var skipContinuation = continuation != null;
+    IndexKey lastScannedKey = null;
+    while (iter.hasNext()) {
+      var elem = iter.next();
+      var aclKey = elem.key();
+      if (skipContinuation && aclKey.equals(continuation)) {
+        skipContinuation = false;
+        continue;
+      }
+      skipContinuation = false;
+      lastScannedKey = aclKey;
+
+      var anchor = GrantTriplet.fromRoleName(aclKey.toString());
+      if (!entityLookup.entityExists(anchor.catalogId(), anchor.id(), 
anchor.typeCode())) {
+        candidates.add(new GrantCleanupCandidate(aclKey, List.of(), true));
+      } else {
+        var aclObj = persistence.fetch(elem.value(), AclObj.class);
+        if (aclObj == null) {
+          continue;
+        }
+
+        var staleRoleIds = new ArrayList<String>();
+        aclObj
+            .acl()
+            .forEach(
+                (roleId, entry) -> {
+                  var role = GrantTriplet.fromRoleName(roleId);
+                  if (!entityLookup.entityExists(role.catalogId(), role.id(), 
role.typeCode())) {
+                    staleRoleIds.add(roleId);
+                  }
+                });
+        if (!staleRoleIds.isEmpty()) {
+          candidates.add(new GrantCleanupCandidate(aclKey, staleRoleIds, 
false));
+        }
+      }
+
+      if (candidates.size() >= STALE_CLEANUP_BATCH_SIZE) {
+        return new GrantCleanupBatch(candidates, lastScannedKey, false);
+      }
+    }
+    return new GrantCleanupBatch(candidates, lastScannedKey, true);
+  }
+
+  private int commitGrantCleanup(
+      Persistence persistence, List<GrantCleanupCandidate> 
grantCleanupCandidates) {
+    return persistence
+        .createCommitter(REALM_GRANTS_REF_NAME, RealmGrantsObj.class, 
Integer.class)
+        .synchronizingLocally()
+        .commitRuntimeException(
+            (state, refObjSupplier) -> {
+              var refObj = refObjSupplier.get();
+              if (refObj.isEmpty()) {
+                return state.noCommit(0);
+              }
+
+              var aclIndex =
+                  refObj.get().acls().asUpdatableIndex(state.persistence(), 
OBJ_REF_SERIALIZER);
+              var changed = false;
+              var cleaned = 0;
+              for (var candidate : grantCleanupCandidates) {
+                if (candidate.removeAcl()) {
+                  if (aclIndex.remove(candidate.aclKey())) {
+                    changed = true;
+                    cleaned++;
+                  }
+                  continue;
+                }
+
+                var aclRef = aclIndex.get(candidate.aclKey());
+                if (aclRef == null) {
+                  continue;
+                }
+
+                var currentAclObj = state.persistence().fetch(aclRef, 
AclObj.class);
+                if (currentAclObj == null) {
+                  continue;
+                }
+
+                var aclBuilder = 
privileges.newAclBuilder().from(currentAclObj.acl());
+                candidate.staleRoleIds().forEach(aclBuilder::removeEntry);
+                var updatedAcl = aclBuilder.build();
+                if (currentAclObj.acl().equals(updatedAcl)) {
+                  continue;
+                }
+
+                if (aclIsEmpty(updatedAcl)) {
+                  if (aclIndex.remove(candidate.aclKey())) {
+                    changed = true;
+                    cleaned++;
+                  }
+                  continue;
+                }
+
+                AclObj updatedAclObj =
+                    AclObj.builder()
+                        .from(currentAclObj)
+                        .id(state.persistence().generateId())
+                        .acl(updatedAcl)
+                        .build();
+                updatedAclObj =
+                    state.writeOrReplace(
+                        "acl-" + currentAclObj.securableId(), updatedAclObj, 
AclObj.class);
+                aclIndex.put(candidate.aclKey(), objRef(updatedAclObj));
+                changed = true;
+                cleaned++;
+              }
+
+              if (!changed) {
+                return state.noCommit(0);
+              }
+
+              var builder = RealmGrantsObj.builder().from(refObj.get());
+              builder.acls(aclIndex.toIndexed("idx-sec-", 
state::writeOrReplace));
+              return state.commitResult(cleaned, builder, refObj);
+            })
+        .orElse(0);
+  }
+
+  private void cleanupStalePolicyMappings(Persistence persistence, 
EntityLookupCache entityLookup) {
+    LOGGER.info("Cleaning stale policy mappings...");
+    ignoreReferenceNotFound(
+        () -> {
+          IndexKey continuation = null;
+          while (true) {
+            var refObj =
+                persistence.fetchReferenceHead(POLICY_MAPPINGS_REF_NAME, 
PolicyMappingsObj.class);
+            if (refObj.isEmpty()) {
+              return;
+            }
+
+            var batch =
+                scanStalePolicyMappings(
+                    refObj
+                        .get()
+                        .policyMappings()
+                        .indexForRead(persistence, POLICY_MAPPING_SERIALIZER),
+                    continuation,
+                    entityLookup);
+            if (batch.candidates().isEmpty()) {
+              return;
+            }
+
+            var cleaned = commitPolicyMappingsCleanup(persistence, 
batch.candidates());
+            LOGGER.debug("Removed {} stale policy mappings in this batch", 
cleaned);
+
+            if (batch.exhausted()) {
+              return;
+            }
+            continuation = batch.lastScannedKey();
+          }
+        });
+  }
+
+  private PolicyMappingsCleanupBatch scanStalePolicyMappings(
+      Index<PolicyMapping> index, IndexKey continuation, EntityLookupCache 
entityLookup) {
+    var candidates = new ArrayList<PolicyMappingsObj.KeyByEntity>();
+    var lower = continuation != null ? continuation : 
POLICY_MAPPINGS_ENTITY_PREFIX_KEY;
+    var iter = index.iterator(lower, null, false);
+    var skipContinuation = continuation != null;
+    IndexKey lastScannedKey = null;
+    while (iter.hasNext()) {
+      var elem = iter.next();
+      var key = elem.key();
+      if (skipContinuation && key.equals(continuation)) {
+        skipContinuation = false;
+        continue;
+      }
+      skipContinuation = false;
+
+      var mappingKey = PolicyMappingsObj.PolicyMappingKey.fromIndexKey(key);
+      if (!(mappingKey instanceof PolicyMappingsObj.KeyByEntity entityKey)) {
+        break;

Review Comment:
   Why `break` here and ignore all subsequent keys once a non-`KeyByEntity` key
   is seen?



##########
persistence/nosql/persistence/metastore-maintenance/src/test/java/org/apache/polaris/persistence/nosql/metastore/maintenance/TestCatalogMaintenance.java:
##########
@@ -261,20 +268,400 @@ public void catalogMaintenance() {
     checkEntities("real state after grace", entities);
   }
 
+  @Test
+  public void maintenanceRemovesStalePolicyMappings() {
+    var testSetup = bootstrapRealm();
+    var manager = testSetup.manager();
+    var callCtx = testSetup.callCtx();
+    var persistence = testSetup.persistence();
+
+    var catalog = createCatalog(manager, callCtx, persistence);
+    mandatoryCatalogObjsForTestImpl(persistence, catalog.getId());
+    var namespace = createNamespace(manager, callCtx, catalog, persistence, 
"policy-ns");
+    var policy =
+        createPolicy(
+            manager,
+            callCtx,
+            catalog,
+            namespace,
+            persistence,
+            "cleanup-policy",
+            PredefinedPolicyTypes.DATA_COMPACTION);
+    var tables =
+        createTables(
+            manager,
+            callCtx,
+            catalog,
+            namespace,
+            persistence,
+            CatalogRetainedIdentifier.STALE_CLEANUP_BATCH_SIZE + 1,
+            "policy-table-");
+
+    for (var table : tables) {
+      assertThat(
+              manager.attachPolicyToEntity(
+                  callCtx,
+                  List.of(catalog, namespace),
+                  table,
+                  List.of(catalog, namespace),
+                  policy,
+                  Map.of()))
+          .extracting(BaseResult::isSuccess)
+          .isEqualTo(true);
+    }
+
+    assertThat(countPolicyMappingsForPolicy(persistence, 
policy.getCatalogId(), policy.getId()))
+        .isEqualTo(2L * tables.size());
+
+    assertThat(manager.dropEntityIfExists(callCtx, List.of(catalog, 
namespace), policy, null, true))
+        .extracting(BaseResult::isSuccess)
+        .isEqualTo(true);
+
+    LoadPolicyMappingsResult staleMappings =
+        manager.loadPoliciesOnEntity(callCtx, tables.getFirst());
+    assertThat(staleMappings.isSuccess()).isTrue();
+    assertThat(staleMappings.getPolicyMappingRecords()).hasSize(1);
+    assertThat(staleMappings.getEntities()).isEmpty();
+
+    assertThat(runMaintenance().success()).isTrue();
+
+    assertThat(countPolicyMappingsForPolicy(persistence, 
policy.getCatalogId(), policy.getId()))
+        .isZero();
+  }
+
+  @Test
+  public void maintenanceRemovesStalePolicyMappingsForDeletedTargets() {
+    var testSetup = bootstrapRealm();
+    var manager = testSetup.manager();
+    var callCtx = testSetup.callCtx();
+    var persistence = testSetup.persistence();
+
+    var catalog = createCatalog(manager, callCtx, persistence);
+    mandatoryCatalogObjsForTestImpl(persistence, catalog.getId());
+    var namespace = createNamespace(manager, callCtx, catalog, persistence, 
"policy-target-ns");
+    var policy =
+        createPolicy(
+            manager,
+            callCtx,
+            catalog,
+            namespace,
+            persistence,
+            "cleanup-target-policy",
+            PredefinedPolicyTypes.DATA_COMPACTION);
+    var staleTables =
+        createTables(
+            manager,
+            callCtx,
+            catalog,
+            namespace,
+            persistence,
+            CatalogRetainedIdentifier.STALE_CLEANUP_BATCH_SIZE + 1,
+            "stale-policy-table-");
+    var liveTable =
+        createTable(manager, callCtx, catalog, namespace, persistence, 
"live-policy-table");
+
+    for (var table : staleTables) {
+      assertThat(
+              manager.attachPolicyToEntity(
+                  callCtx,
+                  List.of(catalog, namespace),
+                  table,
+                  List.of(catalog, namespace),
+                  policy,
+                  Map.of()))
+          .extracting(BaseResult::isSuccess)
+          .isEqualTo(true);
+    }
+    assertThat(
+            manager.attachPolicyToEntity(
+                callCtx,
+                List.of(catalog, namespace),
+                liveTable,
+                List.of(catalog, namespace),
+                policy,
+                Map.of()))
+        .extracting(BaseResult::isSuccess)
+        .isEqualTo(true);
+
+    assertThat(countPolicyMappingsForPolicy(persistence, 
policy.getCatalogId(), policy.getId()))
+        .isEqualTo(2L * (staleTables.size() + 1L));
+
+    for (var table : staleTables) {
+      assertThat(
+              manager.dropEntityIfExists(callCtx, List.of(catalog, namespace), 
table, null, false))
+          .extracting(BaseResult::isSuccess)
+          .isEqualTo(true);
+    }
+
+    assertThat(countPolicyMappingsForPolicy(persistence, 
policy.getCatalogId(), policy.getId()))
+        .isEqualTo(2L * (staleTables.size() + 1L));
+    assertThat(manager.loadPoliciesOnEntity(callCtx, 
liveTable).getPolicyMappingRecords())
+        .hasSize(1);
+
+    assertThat(runMaintenance().success()).isTrue();
+
+    assertThat(countPolicyMappingsForPolicy(persistence, 
policy.getCatalogId(), policy.getId()))
+        .isEqualTo(2L);
+    assertThat(manager.loadPoliciesOnEntity(callCtx, 
liveTable).getPolicyMappingRecords())
+        .hasSize(1);
+  }
+
+  @Test
+  public void maintenanceRemovesStaleGrantRecords() {
+    var testSetup = bootstrapRealm();
+    var manager = testSetup.manager();
+    var callCtx = testSetup.callCtx();
+    var persistence = testSetup.persistence();
+
+    var catalog = createCatalog(manager, callCtx, persistence);
+    mandatoryCatalogObjsForTestImpl(persistence, catalog.getId());
+    var namespace = createNamespace(manager, callCtx, catalog, persistence, 
"grant-ns");
+    var catalogRole = createCatalogRole(manager, callCtx, catalog, 
persistence);
+    var tables =
+        createTables(
+            manager,
+            callCtx,
+            catalog,
+            namespace,
+            persistence,
+            CatalogRetainedIdentifier.STALE_CLEANUP_BATCH_SIZE + 1,
+            "grant-table-");
+
+    for (var table : tables) {
+      assertThat(
+              manager.grantPrivilegeOnSecurableToRole(
+                  callCtx,
+                  catalogRole,
+                  List.of(catalog, namespace),
+                  table,
+                  PolarisPrivilege.TABLE_READ_DATA))
+          .extracting(BaseResult::isSuccess)
+          .isEqualTo(true);
+    }
+
+    var staleAclNames = new ArrayList<String>();
+    staleAclNames.add(grantAclName(catalogRole));
+    tables.forEach(table -> staleAclNames.add(grantAclName(table)));
+    assertThat(countGrantAclHeads(persistence, 
staleAclNames)).isEqualTo(tables.size() + 1L);
+
+    assertThat(manager.dropEntityIfExists(callCtx, List.of(catalog), 
catalogRole, null, false))
+        .extracting(BaseResult::isSuccess)
+        .isEqualTo(true);
+
+    var staleGrants = manager.loadGrantsOnSecurable(callCtx, 
tables.getFirst());
+    assertThat(staleGrants.isSuccess()).isTrue();
+    assertThat(staleGrants.getGrantRecords()).isEmpty();
+    assertThat(staleGrants.getEntities()).isEmpty();
+
+    assertThat(runMaintenance().success()).isTrue();
+
+    assertThat(countGrantAclHeads(persistence, staleAclNames)).isZero();
+
+    var cleanedGrants = manager.loadGrantsOnSecurable(callCtx, 
tables.getFirst());
+    assertThat(cleanedGrants.isSuccess()).isTrue();
+    assertThat(cleanedGrants.getGrantRecords()).isEmpty();
+    assertThat(cleanedGrants.getEntities()).isEmpty();

Review Comment:
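   The `loadGrantsOnSecurable()` assertions are identical before and after
   cleanup (grant records and entities are empty in both cases). What
   difference is this test meant to show? Are `staleGrants` really stale?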



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to