snazy commented on code in PR #4526:
URL: https://github.com/apache/polaris/pull/4526#discussion_r3312065741


##########
persistence/nosql/persistence/metastore-maintenance/src/main/java/org/apache/polaris/persistence/nosql/metastore/maintenance/CatalogRetainedIdentifier.java:
##########
@@ -331,4 +375,351 @@ void ignoreReferenceNotFound(Runnable runnable) {
       LOGGER.debug("Reference not found: {}", e.getMessage());
     }
   }
+
+  private void cleanupStaleGrantRecords(Persistence persistence, 
EntityLookupCache entityLookup) {
+    LOGGER.info("Cleaning stale grant records...");
+    ignoreReferenceNotFound(
+        () -> {
+          IndexKey continuation = null;
+          while (true) {
+            var refObj =
+                persistence.fetchReferenceHead(REALM_GRANTS_REF_NAME, 
RealmGrantsObj.class);
+            if (refObj.isEmpty()) {
+              return;
+            }
+
+            var batch =
+                scanStaleGrantRecords(
+                    persistence,
+                    refObj.get().acls().indexForRead(persistence, 
OBJ_REF_SERIALIZER),
+                    continuation,
+                    entityLookup);
+            if (batch.candidates().isEmpty()) {
+              return;
+            }
+
+            var cleaned = commitGrantCleanup(persistence, batch.candidates());
+            LOGGER.debug("Removed {} stale grant ACL entries in this batch", 
cleaned);
+
+            if (batch.exhausted()) {
+              return;
+            }
+            continuation = batch.lastScannedKey();
+          }
+        });
+  }
+
+  private GrantCleanupBatch scanStaleGrantRecords(
+      Persistence persistence,
+      Index<ObjRef> index,
+      IndexKey continuation,
+      EntityLookupCache entityLookup) {
+    var candidates = new ArrayList<GrantCleanupCandidate>();
+    var iter = index.iterator(continuation, null, false);
+    var skipContinuation = continuation != null;
+    IndexKey lastScannedKey = null;
+    while (iter.hasNext()) {
+      var elem = iter.next();
+      var aclKey = elem.key();
+      if (skipContinuation && aclKey.equals(continuation)) {
+        skipContinuation = false;
+        continue;
+      }
+      skipContinuation = false;
+      lastScannedKey = aclKey;
+
+      var anchor = GrantTriplet.fromRoleName(aclKey.toString());
+      if (!entityLookup.entityExists(anchor.catalogId(), anchor.id(), 
anchor.typeCode())) {
+        candidates.add(new GrantCleanupCandidate(aclKey, List.of(), true));
+      } else {
+        var aclObj = persistence.fetch(elem.value(), AclObj.class);
+        if (aclObj == null) {
+          continue;
+        }
+
+        var staleRoleIds = new ArrayList<String>();
+        aclObj
+            .acl()
+            .forEach(
+                (roleId, entry) -> {
+                  var role = GrantTriplet.fromRoleName(roleId);
+                  if (!entityLookup.entityExists(role.catalogId(), role.id(), 
role.typeCode())) {
+                    staleRoleIds.add(roleId);
+                  }
+                });
+        if (!staleRoleIds.isEmpty()) {
+          candidates.add(new GrantCleanupCandidate(aclKey, staleRoleIds, 
false));
+        }
+      }
+
+      if (candidates.size() >= STALE_CLEANUP_BATCH_SIZE) {
+        return new GrantCleanupBatch(candidates, lastScannedKey, false);
+      }
+    }
+    return new GrantCleanupBatch(candidates, lastScannedKey, true);
+  }
+
+  private int commitGrantCleanup(
+      Persistence persistence, List<GrantCleanupCandidate> 
grantCleanupCandidates) {
+    return persistence
+        .createCommitter(REALM_GRANTS_REF_NAME, RealmGrantsObj.class, 
Integer.class)
+        .synchronizingLocally()
+        .commitRuntimeException(
+            (state, refObjSupplier) -> {
+              var refObj = refObjSupplier.get();
+              if (refObj.isEmpty()) {
+                return state.noCommit(0);
+              }
+
+              var aclIndex =
+                  refObj.get().acls().asUpdatableIndex(state.persistence(), 
OBJ_REF_SERIALIZER);
+              var changed = false;
+              var cleaned = 0;
+              for (var candidate : grantCleanupCandidates) {
+                if (candidate.removeAcl()) {
+                  if (aclIndex.remove(candidate.aclKey())) {
+                    changed = true;
+                    cleaned++;
+                  }
+                  continue;
+                }
+
+                var aclRef = aclIndex.get(candidate.aclKey());
+                if (aclRef == null) {
+                  continue;
+                }
+
+                var currentAclObj = state.persistence().fetch(aclRef, 
AclObj.class);
+                if (currentAclObj == null) {
+                  continue;
+                }
+
+                var aclBuilder = 
privileges.newAclBuilder().from(currentAclObj.acl());
+                candidate.staleRoleIds().forEach(aclBuilder::removeEntry);
+                var updatedAcl = aclBuilder.build();
+                if (currentAclObj.acl().equals(updatedAcl)) {
+                  continue;
+                }
+
+                if (aclIsEmpty(updatedAcl)) {
+                  if (aclIndex.remove(candidate.aclKey())) {
+                    changed = true;
+                    cleaned++;
+                  }
+                  continue;
+                }
+
+                AclObj updatedAclObj =
+                    AclObj.builder()
+                        .from(currentAclObj)
+                        .id(state.persistence().generateId())
+                        .acl(updatedAcl)
+                        .build();
+                updatedAclObj =
+                    state.writeOrReplace(
+                        "acl-" + currentAclObj.securableId(), updatedAclObj, 
AclObj.class);
+                aclIndex.put(candidate.aclKey(), objRef(updatedAclObj));
+                changed = true;
+                cleaned++;
+              }
+
+              if (!changed) {
+                return state.noCommit(0);
+              }
+
+              var builder = RealmGrantsObj.builder().from(refObj.get());
+              builder.acls(aclIndex.toIndexed("idx-sec-", 
state::writeOrReplace));
+              return state.commitResult(cleaned, builder, refObj);
+            })
+        .orElse(0);
+  }
+
+  private void cleanupStalePolicyMappings(Persistence persistence, 
EntityLookupCache entityLookup) {
+    LOGGER.info("Cleaning stale policy mappings...");
+    ignoreReferenceNotFound(
+        () -> {
+          IndexKey continuation = null;
+          while (true) {
+            var refObj =
+                persistence.fetchReferenceHead(POLICY_MAPPINGS_REF_NAME, 
PolicyMappingsObj.class);
+            if (refObj.isEmpty()) {
+              return;
+            }
+
+            var batch =
+                scanStalePolicyMappings(
+                    refObj
+                        .get()
+                        .policyMappings()
+                        .indexForRead(persistence, POLICY_MAPPING_SERIALIZER),
+                    continuation,
+                    entityLookup);
+            if (batch.candidates().isEmpty()) {
+              return;
+            }
+
+            var cleaned = commitPolicyMappingsCleanup(persistence, 
batch.candidates());
+            LOGGER.debug("Removed {} stale policy mappings in this batch", 
cleaned);
+
+            if (batch.exhausted()) {
+              return;
+            }
+            continuation = batch.lastScannedKey();
+          }
+        });
+  }
+
+  private PolicyMappingsCleanupBatch scanStalePolicyMappings(
+      Index<PolicyMapping> index, IndexKey continuation, EntityLookupCache 
entityLookup) {
+    var candidates = new ArrayList<PolicyMappingsObj.KeyByEntity>();
+    var lower = continuation != null ? continuation : 
POLICY_MAPPINGS_ENTITY_PREFIX_KEY;
+    var iter = index.iterator(lower, null, false);
+    var skipContinuation = continuation != null;
+    IndexKey lastScannedKey = null;
+    while (iter.hasNext()) {
+      var elem = iter.next();
+      var key = elem.key();
+      if (skipContinuation && key.equals(continuation)) {
+        skipContinuation = false;
+        continue;
+      }
+      skipContinuation = false;
+
+      var mappingKey = PolicyMappingsObj.PolicyMappingKey.fromIndexKey(key);
+      if (!(mappingKey instanceof PolicyMappingsObj.KeyByEntity entityKey)) {
+        break;
+      }
+
+      lastScannedKey = key;
+      if (!entityLookup.policyExists(entityKey.policyCatalogId(), 
entityKey.policyId())
+          || !entityLookup.policyTargetExists(entityKey.entityCatalogId(), 
entityKey.entityId())) {
+        candidates.add(entityKey);
+      }
+
+      if (candidates.size() >= STALE_CLEANUP_BATCH_SIZE) {
+        return new PolicyMappingsCleanupBatch(candidates, lastScannedKey, 
false);
+      }
+    }
+    return new PolicyMappingsCleanupBatch(candidates, lastScannedKey, true);
+  }
+
+  private int commitPolicyMappingsCleanup(
+      Persistence persistence, List<PolicyMappingsObj.KeyByEntity> 
staleMappings) {
+    return persistence
+        .createCommitter(POLICY_MAPPINGS_REF_NAME, PolicyMappingsObj.class, 
Integer.class)
+        .synchronizingLocally()
+        .commitRuntimeException(
+            (state, refObjSupplier) -> {
+              var refObj = refObjSupplier.get();
+              if (refObj.isEmpty()) {
+                return state.noCommit(0);
+              }
+
+              var mappingsIndex =
+                  refObj
+                      .get()
+                      .policyMappings()
+                      .asUpdatableIndex(state.persistence(), 
POLICY_MAPPING_SERIALIZER);
+              var changed = false;
+              var cleaned = 0;
+              for (var keyByEntity : staleMappings) {
+                var removedForward = 
mappingsIndex.remove(keyByEntity.toIndexKey());
+                var removedReverse = 
mappingsIndex.remove(keyByEntity.reverse().toIndexKey());
+                if (removedForward || removedReverse) {
+                  changed = true;
+                  cleaned++;
+                }
+              }
+
+              if (!changed) {
+                return state.noCommit(0);
+              }
+
+              var builder = PolicyMappingsObj.builder().from(refObj.get());
+              builder.policyMappings(mappingsIndex.toIndexed("mappings", 
state::writeOrReplace));
+              return state.commitResult(cleaned, builder, refObj);
+            })
+        .orElse(0);
+  }
+
+  private static boolean aclIsEmpty(Acl acl) {
+    var hasEntries = new boolean[1];
+    acl.forEach((roleId, entry) -> hasEntries[0] = true);

Review Comment:
   Even more efficient; added.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to