This is an automated email from the ASF dual-hosted git repository.

snazy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/polaris.git


The following commit(s) were added to refs/heads/main by this push:
     new 27c3f9680 NoSQL: Metastore maintenance (#3268)
27c3f9680 is described below

commit 27c3f9680e3fe5d6ddecf9c838caf6ca8918b719
Author: Robert Stupp <[email protected]>
AuthorDate: Fri Jan 9 11:22:01 2026 +0100

    NoSQL: Metastore maintenance (#3268)
    
    Implements the NoSQL meta-store maintenance. It adds the meta-store specific
    handling to the existing NoSQL maintenance service to purge unreferenced and
    unneeded data from the database.
---
 bom/build.gradle.kts                               |   1 +
 gradle/projects.main.properties                    |   1 +
 .../nosql/maintenance/api/MaintenanceConfig.java   |   2 +-
 .../metastore-maintenance/build.gradle.kts         |  80 ++++
 .../maintenance/CatalogRetainedIdentifier.java     | 334 ++++++++++++++
 .../maintenance/CatalogsMaintenanceConfig.java     | 103 +++++
 .../src/main/resources/META-INF/beans.xml          |  24 ++
 .../maintenance/TestCatalogMaintenance.java        | 479 +++++++++++++++++++++
 .../src/test/resources/logback-test.xml            |  34 ++
 .../src/test/resources/weld.properties             |  21 +
 .../nosql/metastore/maintenance/CdiProducers.java  |  32 ++
 .../MutableCatalogsMaintenanceConfig.java          |  71 +++
 .../src/testFixtures/resources/META-INF/beans.xml  |  24 ++
 13 files changed, 1205 insertions(+), 1 deletion(-)

diff --git a/bom/build.gradle.kts b/bom/build.gradle.kts
index 213f31ed7..d5e5afc75 100644
--- a/bom/build.gradle.kts
+++ b/bom/build.gradle.kts
@@ -63,6 +63,7 @@ dependencies {
     api(project(":polaris-persistence-nosql-impl"))
     api(project(":polaris-persistence-nosql-benchmark"))
     api(project(":polaris-persistence-nosql-metastore"))
+    api(project(":polaris-persistence-nosql-metastore-maintenance"))
     api(project(":polaris-persistence-nosql-metastore-types"))
     api(project(":polaris-persistence-nosql-correctness"))
     api(project(":polaris-persistence-nosql-cdi-common"))
diff --git a/gradle/projects.main.properties b/gradle/projects.main.properties
index 85ff05b26..fadbbadea 100644
--- a/gradle/projects.main.properties
+++ b/gradle/projects.main.properties
@@ -78,6 +78,7 @@ 
polaris-persistence-nosql-api=persistence/nosql/persistence/api
 polaris-persistence-nosql-impl=persistence/nosql/persistence/impl
 polaris-persistence-nosql-benchmark=persistence/nosql/persistence/benchmark
 polaris-persistence-nosql-metastore=persistence/nosql/persistence/metastore
+polaris-persistence-nosql-metastore-maintenance=persistence/nosql/persistence/metastore-maintenance
 
polaris-persistence-nosql-metastore-types=persistence/nosql/persistence/metastore-types
 polaris-persistence-nosql-correctness=persistence/nosql/persistence/correctness
 polaris-persistence-nosql-cdi-common=persistence/nosql/persistence/cdi/common
diff --git 
a/persistence/nosql/persistence/maintenance/api/src/main/java/org/apache/polaris/persistence/nosql/maintenance/api/MaintenanceConfig.java
 
b/persistence/nosql/persistence/maintenance/api/src/main/java/org/apache/polaris/persistence/nosql/maintenance/api/MaintenanceConfig.java
index 8fb0a493f..0d46cbf48 100644
--- 
a/persistence/nosql/persistence/maintenance/api/src/main/java/org/apache/polaris/persistence/nosql/maintenance/api/MaintenanceConfig.java
+++ 
b/persistence/nosql/persistence/maintenance/api/src/main/java/org/apache/polaris/persistence/nosql/maintenance/api/MaintenanceConfig.java
@@ -35,7 +35,7 @@ import org.apache.polaris.immutables.PolarisImmutable;
 import org.immutables.value.Value;
 
 /** Maintenance service configuration. */
-@ConfigMapping(prefix = "polaris.persistence.maintenance")
+@ConfigMapping(prefix = "polaris.persistence.nosql.maintenance")
 @PolarisImmutable
 @JsonSerialize(as = ImmutableMaintenanceConfig.class)
 @JsonDeserialize(as = ImmutableMaintenanceConfig.class)
diff --git 
a/persistence/nosql/persistence/metastore-maintenance/build.gradle.kts 
b/persistence/nosql/persistence/metastore-maintenance/build.gradle.kts
new file mode 100644
index 000000000..5ba625328
--- /dev/null
+++ b/persistence/nosql/persistence/metastore-maintenance/build.gradle.kts
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+plugins {
+  id("org.kordamp.gradle.jandex")
+  id("polaris-server")
+}
+
+description = "Polaris NoSQL persistence core types"
+
+dependencies {
+  implementation(project(":polaris-core"))
+  implementation(project(":polaris-persistence-nosql-api"))
+  implementation(project(":polaris-idgen-api"))
+  implementation(project(":polaris-persistence-nosql-maintenance-api"))
+  implementation(project(":polaris-persistence-nosql-maintenance-spi"))
+  implementation(project(":polaris-persistence-nosql-maintenance-cel"))
+  implementation(project(":polaris-persistence-nosql-metastore-types"))
+
+  implementation(libs.guava)
+  implementation(libs.slf4j.api)
+
+  implementation(platform(libs.jackson.bom))
+  implementation("com.fasterxml.jackson.core:jackson-annotations")
+  implementation("com.fasterxml.jackson.core:jackson-core")
+  implementation("com.fasterxml.jackson.core:jackson-databind")
+  implementation("com.fasterxml.jackson.dataformat:jackson-dataformat-smile")
+
+  compileOnly(libs.smallrye.config.core)
+  compileOnly(platform(libs.quarkus.bom))
+  compileOnly("io.quarkus:quarkus-core")
+
+  compileOnly(project(":polaris-immutables"))
+  annotationProcessor(project(":polaris-immutables", configuration = 
"processor"))
+
+  compileOnly(libs.jakarta.annotation.api)
+  compileOnly(libs.jakarta.validation.api)
+  compileOnly(libs.jakarta.inject.api)
+  compileOnly(libs.jakarta.enterprise.cdi.api)
+
+  testFixturesImplementation(project(":polaris-core"))
+  testFixturesImplementation(project(":polaris-persistence-nosql-metastore"))
+  
testFixturesImplementation(testFixtures(project(":polaris-persistence-nosql-metastore")))
+  testRuntimeOnly(project(":polaris-persistence-nosql-authz-impl"))
+  testRuntimeOnly(project(":polaris-persistence-nosql-authz-store-nosql"))
+
+  testFixturesImplementation(libs.jakarta.annotation.api)
+  testFixturesImplementation(libs.jakarta.validation.api)
+  testFixturesImplementation(libs.jakarta.enterprise.cdi.api)
+  testImplementation(libs.smallrye.common.annotation)
+
+  testFixturesImplementation(platform(libs.jackson.bom))
+  testFixturesImplementation("com.fasterxml.jackson.core:jackson-annotations")
+  testFixturesImplementation("com.fasterxml.jackson.core:jackson-core")
+
+  testImplementation(project(":polaris-idgen-mocks"))
+  
testImplementation(testFixtures(project(":polaris-persistence-nosql-maintenance-impl")))
+  testImplementation(project(":polaris-persistence-nosql-impl"))
+
+  testRuntimeOnly(testFixtures(project(":polaris-persistence-nosql-cdi-weld")))
+  testImplementation(libs.weld.se.core)
+  testImplementation(libs.weld.junit5)
+  testRuntimeOnly(libs.smallrye.jandex)
+}
diff --git 
a/persistence/nosql/persistence/metastore-maintenance/src/main/java/org/apache/polaris/persistence/nosql/metastore/maintenance/CatalogRetainedIdentifier.java
 
b/persistence/nosql/persistence/metastore-maintenance/src/main/java/org/apache/polaris/persistence/nosql/metastore/maintenance/CatalogRetainedIdentifier.java
new file mode 100644
index 000000000..cb5524101
--- /dev/null
+++ 
b/persistence/nosql/persistence/metastore-maintenance/src/main/java/org/apache/polaris/persistence/nosql/metastore/maintenance/CatalogRetainedIdentifier.java
@@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.persistence.nosql.metastore.maintenance;
+
+import static java.lang.String.format;
+import static 
org.apache.polaris.persistence.nosql.api.obj.ObjRef.OBJ_REF_SERIALIZER;
+import static 
org.apache.polaris.persistence.nosql.coretypes.catalog.CatalogRolesObj.CATALOG_ROLES_REF_NAME_PATTERN;
+import static 
org.apache.polaris.persistence.nosql.coretypes.catalog.CatalogStateObj.CATALOG_STATE_REF_NAME_PATTERN;
+import static 
org.apache.polaris.persistence.nosql.coretypes.catalog.CatalogsObj.CATALOGS_REF_NAME;
+import static 
org.apache.polaris.persistence.nosql.coretypes.principals.PrincipalRolesObj.PRINCIPAL_ROLES_REF_NAME;
+import static 
org.apache.polaris.persistence.nosql.coretypes.principals.PrincipalsObj.PRINCIPALS_REF_NAME;
+import static 
org.apache.polaris.persistence.nosql.coretypes.realm.ImmediateTasksObj.IMMEDIATE_TASKS_REF_NAME;
+import static 
org.apache.polaris.persistence.nosql.coretypes.realm.PolicyMapping.POLICY_MAPPING_SERIALIZER;
+import static 
org.apache.polaris.persistence.nosql.coretypes.realm.PolicyMappingsObj.POLICY_MAPPINGS_REF_NAME;
+import static 
org.apache.polaris.persistence.nosql.coretypes.realm.RealmGrantsObj.REALM_GRANTS_REF_NAME;
+import static 
org.apache.polaris.persistence.nosql.coretypes.realm.RootObj.ROOT_REF_NAME;
+import static 
org.apache.polaris.persistence.nosql.metastore.maintenance.CatalogsMaintenanceConfig.DEFAULT_CATALOGS_HISTORY_RETAIN;
+import static 
org.apache.polaris.persistence.nosql.metastore.maintenance.CatalogsMaintenanceConfig.DEFAULT_CATALOG_POLICIES_RETAIN;
+import static 
org.apache.polaris.persistence.nosql.metastore.maintenance.CatalogsMaintenanceConfig.DEFAULT_CATALOG_ROLES_RETAIN;
+import static 
org.apache.polaris.persistence.nosql.metastore.maintenance.CatalogsMaintenanceConfig.DEFAULT_CATALOG_STATE_RETAIN;
+import static 
org.apache.polaris.persistence.nosql.metastore.maintenance.CatalogsMaintenanceConfig.DEFAULT_GRANTS_RETAIN;
+import static 
org.apache.polaris.persistence.nosql.metastore.maintenance.CatalogsMaintenanceConfig.DEFAULT_IMMEDIATE_TASKS_RETAIN;
+import static 
org.apache.polaris.persistence.nosql.metastore.maintenance.CatalogsMaintenanceConfig.DEFAULT_PRINCIPALS_RETAIN;
+import static 
org.apache.polaris.persistence.nosql.metastore.maintenance.CatalogsMaintenanceConfig.DEFAULT_PRINCIPAL_ROLES_RETAIN;
+
+import jakarta.annotation.Nonnull;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import org.apache.polaris.ids.api.MonotonicClock;
+import org.apache.polaris.maintenance.cel.CelReferenceContinuePredicate;
+import 
org.apache.polaris.persistence.nosql.api.exceptions.ReferenceNotFoundException;
+import org.apache.polaris.persistence.nosql.api.index.IndexContainer;
+import org.apache.polaris.persistence.nosql.api.index.IndexKey;
+import org.apache.polaris.persistence.nosql.api.obj.BaseCommitObj;
+import org.apache.polaris.persistence.nosql.api.obj.ObjRef;
+import org.apache.polaris.persistence.nosql.coretypes.ContainerObj;
+import org.apache.polaris.persistence.nosql.coretypes.catalog.CatalogObj;
+import org.apache.polaris.persistence.nosql.coretypes.catalog.CatalogRolesObj;
+import org.apache.polaris.persistence.nosql.coretypes.catalog.CatalogStateObj;
+import org.apache.polaris.persistence.nosql.coretypes.catalog.CatalogsObj;
+import 
org.apache.polaris.persistence.nosql.coretypes.principals.PrincipalRolesObj;
+import org.apache.polaris.persistence.nosql.coretypes.principals.PrincipalsObj;
+import org.apache.polaris.persistence.nosql.coretypes.realm.ImmediateTasksObj;
+import org.apache.polaris.persistence.nosql.coretypes.realm.PolicyMappingsObj;
+import org.apache.polaris.persistence.nosql.coretypes.realm.RealmGrantsObj;
+import org.apache.polaris.persistence.nosql.coretypes.realm.RootObj;
+import 
org.apache.polaris.persistence.nosql.maintenance.spi.PerRealmRetainedIdentifier;
+import org.apache.polaris.persistence.nosql.maintenance.spi.RetainedCollector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@ApplicationScoped
+class CatalogRetainedIdentifier implements PerRealmRetainedIdentifier {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(CatalogRetainedIdentifier.class);
+
+  private final CatalogsMaintenanceConfig catalogsMaintenanceConfig;
+  private final MonotonicClock monotonicClock;
+
+  @SuppressWarnings("CdiInjectionPointsInspection")
+  @Inject
+  CatalogRetainedIdentifier(
+      CatalogsMaintenanceConfig catalogsMaintenanceConfig, MonotonicClock 
monotonicClock) {
+    this.catalogsMaintenanceConfig = catalogsMaintenanceConfig;
+    this.monotonicClock = monotonicClock;
+  }
+
+  @Override
+  public String name() {
+    return "Catalog data";
+  }
+
+  @Override
+  public boolean identifyRetained(@Nonnull RetainedCollector collector) {
+
+    // Note: References & objects retrieved via the `Persistence` instance 
returned by the
+    // `RetainedCollector` are automatically retained (no need to call 
collector.retain*()
+    // explicitly).
+    var persistence = collector.realmPersistence();
+
+    // per realm
+
+    // The root object is "special" (there's only one)
+    LOGGER.info("Identifying root object...");
+    ignoreReferenceNotFound(() -> 
persistence.fetchReferenceHead(ROOT_REF_NAME, RootObj.class));
+
+    perRealmContainer(
+        "principals",
+        PRINCIPALS_REF_NAME,
+        
catalogsMaintenanceConfig.principalsRetain().orElse(DEFAULT_PRINCIPALS_RETAIN),
+        PrincipalsObj.class,
+        collector);
+
+    perRealmContainer(
+        "principal roles",
+        PRINCIPAL_ROLES_REF_NAME,
+        
catalogsMaintenanceConfig.principalRolesRetain().orElse(DEFAULT_PRINCIPAL_ROLES_RETAIN),
+        PrincipalRolesObj.class,
+        collector);
+
+    perRealm(
+        "grants",
+        REALM_GRANTS_REF_NAME,
+        catalogsMaintenanceConfig.grantsRetain().orElse(DEFAULT_GRANTS_RETAIN),
+        RealmGrantsObj.class,
+        RealmGrantsObj::acls,
+        collector);
+
+    perRealmContainer(
+        "immediate tasks",
+        IMMEDIATE_TASKS_REF_NAME,
+        
catalogsMaintenanceConfig.immediateTasksRetain().orElse(DEFAULT_IMMEDIATE_TASKS_RETAIN),
+        ImmediateTasksObj.class,
+        collector);
+
+    LOGGER.info("Identifying policy mappings...");
+    ignoreReferenceNotFound(
+        () -> {
+          var policyMappingsContinue =
+              new CelReferenceContinuePredicate<PolicyMappingsObj>(
+                  POLICY_MAPPINGS_REF_NAME,
+                  persistence,
+                  catalogsMaintenanceConfig
+                      .catalogPoliciesRetain()
+                      .orElse(DEFAULT_CATALOG_POLICIES_RETAIN));
+          // PolicyMappings are stored _INLINE_
+          collector.refRetain(
+              POLICY_MAPPINGS_REF_NAME,
+              PolicyMappingsObj.class,
+              policyMappingsContinue,
+              policyMappingsObj ->
+                  policyMappingsObj
+                      .policyMappings()
+                      .indexForRead(collector.realmPersistence(), 
POLICY_MAPPING_SERIALIZER)
+                      .forEach(
+                          e -> {
+                            var policyMapping = e.getValue();
+                            
policyMapping.externalMapping().ifPresent(collector::retainObject);
+                          }));
+        });
+
+    // per catalog
+
+    LOGGER.info("Identifying catalogs...");
+    ignoreReferenceNotFound(
+        () -> {
+          var catalogsHistoryContinue =
+              new CelReferenceContinuePredicate<CatalogsObj>(
+                  CATALOGS_REF_NAME,
+                  persistence,
+                  catalogsMaintenanceConfig
+                      .catalogsHistoryRetain()
+                      .orElse(DEFAULT_CATALOGS_HISTORY_RETAIN));
+          var currentCatalogs = new ConcurrentHashMap<IndexKey, ObjRef>();
+          collector.refRetain(
+              CATALOGS_REF_NAME,
+              CatalogsObj.class,
+              catalogsHistoryContinue,
+              catalogs -> {
+                var allCatalogsIndex =
+                    catalogs.nameToObjRef().indexForRead(persistence, 
OBJ_REF_SERIALIZER);
+                for (var entry : allCatalogsIndex) {
+                  var catalogKey = entry.getKey();
+                  var catalogObjRef = entry.getValue();
+                  currentCatalogs.putIfAbsent(catalogKey, catalogObjRef);
+                }
+                collector.indexRetain(catalogs.stableIdToName());
+              });
+
+          var catalogObjs =
+              persistence.fetchMany(
+                  CatalogObj.class, 
currentCatalogs.values().toArray(ObjRef[]::new));
+          for (var catalogObj : catalogObjs) {
+            if (catalogObj == null) {
+              // just in case...
+              continue;
+            }
+
+            perCatalog(
+                "catalog roles",
+                CATALOG_ROLES_REF_NAME_PATTERN,
+                catalogObj,
+                
catalogsMaintenanceConfig.catalogRolesRetain().orElse(DEFAULT_CATALOG_ROLES_RETAIN),
+                CatalogRolesObj.class,
+                CatalogRolesObj::nameToObjRef,
+                collector,
+                catalogRolesObj -> 
collector.indexRetain(catalogRolesObj.stableIdToName()));
+
+            LOGGER.info(
+                "Identifying catalog state for catalog '{}' ({})...",
+                catalogObj.name(),
+                catalogObj.stableId());
+            ignoreReferenceNotFound(
+                () -> {
+                  var catalogStateRefName =
+                      format(CATALOG_STATE_REF_NAME_PATTERN, 
catalogObj.stableId());
+                  var catalogStateContinue =
+                      new CelReferenceContinuePredicate<CatalogStateObj>(
+                          catalogStateRefName,
+                          persistence,
+                          catalogsMaintenanceConfig
+                              .catalogStateRetain()
+                              .orElse(DEFAULT_CATALOG_STATE_RETAIN));
+                  collector.refRetainIndexToSingleObj(
+                      catalogStateRefName,
+                      CatalogStateObj.class,
+                      catalogStateContinue,
+                      CatalogStateObj::nameToObjRef,
+                      new RetainedCollector.ProgressListener<>() {
+                        public static final long PROGRESS_LOG_INTERVAL_MS = 
2_000L;
+                        private long commit;
+                        private long nextLog =
+                            monotonicClock.currentTimeMillis() + 
PROGRESS_LOG_INTERVAL_MS;
+
+                        @Override
+                        public void onCommit(CatalogStateObj catalogStateObj, 
long commit) {
+                          
collector.indexRetain(catalogStateObj.stableIdToName());
+                          
catalogStateObj.locations().ifPresent(collector::indexRetain);
+                          
catalogStateObj.changes().ifPresent(collector::indexRetain);
+                          this.commit = commit;
+                        }
+
+                        @Override
+                        public void onIndexEntry(long inCommit, long total) {
+                          var now = monotonicClock.currentTimeMillis();
+                          if (now >= nextLog) {
+                            LOGGER.info(
+                                "... {} total index entries processed so far, 
at commit {}",
+                                total,
+                                commit);
+                            nextLog = now + PROGRESS_LOG_INTERVAL_MS;
+                          }
+                        }
+                      },
+                      x -> {});
+                });
+          }
+        });
+
+    return true;
+  }
+
+  @SuppressWarnings({"LoggingSimilarMessage", "SameParameterValue"})
+  private <O extends BaseCommitObj> void perRealm(
+      String what,
+      String refName,
+      String celRetainExpr,
+      Class<O> objClazz,
+      Function<O, IndexContainer<ObjRef>> indexContainerFunction,
+      RetainedCollector collector) {
+
+    LOGGER.info("Identifying {}...", what);
+    ignoreReferenceNotFound(
+        () -> {
+          var persistence = collector.realmPersistence();
+          var historyContinue =
+              new CelReferenceContinuePredicate<O>(refName, persistence, 
celRetainExpr);
+          collector.refRetainIndexToSingleObj(
+              refName, objClazz, historyContinue, indexContainerFunction);
+        });
+  }
+
+  @SuppressWarnings("LoggingSimilarMessage")
+  private <O extends ContainerObj> void perRealmContainer(
+      String what,
+      String refName,
+      String celRetainExpr,
+      Class<O> objClazz,
+      RetainedCollector collector) {
+
+    LOGGER.info("Identifying {}...", what);
+    ignoreReferenceNotFound(
+        () -> {
+          var persistence = collector.realmPersistence();
+          var historyContinue =
+              new CelReferenceContinuePredicate<O>(refName, persistence, 
celRetainExpr);
+          collector.refRetainIndexToSingleObj(
+              refName,
+              objClazz,
+              historyContinue,
+              ContainerObj::nameToObjRef,
+              containerObj -> 
collector.indexRetain(containerObj.stableIdToName()));
+        });
+  }
+
+  private <O extends BaseCommitObj> void perCatalog(
+      String what,
+      String refNamePattern,
+      CatalogObj catalogObj,
+      String celRetainExpr,
+      Class<O> objClazz,
+      Function<O, IndexContainer<ObjRef>> indexContainerFunction,
+      RetainedCollector collector,
+      Consumer<O> objConsumer) {
+    LOGGER.info(
+        "Identifying {} for catalog '{}' ({})...", what, catalogObj.name(), 
catalogObj.stableId());
+    ignoreReferenceNotFound(
+        () -> {
+          var persistence = collector.realmPersistence();
+          var refName = format(refNamePattern, catalogObj.stableId());
+          var historyContinue =
+              new CelReferenceContinuePredicate<O>(refName, persistence, 
celRetainExpr);
+          collector.refRetainIndexToSingleObj(
+              refName, objClazz, historyContinue, indexContainerFunction, 
objConsumer);
+        });
+  }
+
+  void ignoreReferenceNotFound(Runnable runnable) {
+    try {
+      runnable.run();
+    } catch (ReferenceNotFoundException e) {
+      LOGGER.debug("Reference not found: {}", e.getMessage());
+    }
+  }
+}
diff --git 
a/persistence/nosql/persistence/metastore-maintenance/src/main/java/org/apache/polaris/persistence/nosql/metastore/maintenance/CatalogsMaintenanceConfig.java
 
b/persistence/nosql/persistence/metastore-maintenance/src/main/java/org/apache/polaris/persistence/nosql/metastore/maintenance/CatalogsMaintenanceConfig.java
new file mode 100644
index 000000000..6392b5064
--- /dev/null
+++ 
b/persistence/nosql/persistence/metastore-maintenance/src/main/java/org/apache/polaris/persistence/nosql/metastore/maintenance/CatalogsMaintenanceConfig.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.persistence.nosql.metastore.maintenance;
+
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import io.smallrye.config.ConfigMapping;
+import io.smallrye.config.WithDefault;
+import java.util.Optional;
+import org.apache.polaris.immutables.PolarisImmutable;
+
+/**
+ * The NoSQL persistence implementation of Polaris stores a history of changes per kind of object
+ * (principals, principal roles, grants, immediate tasks, catalog roles and catalog state).
+ *
+ * <p>The rules are defined using a <a href="https://github.com/projectnessie/cel-java/">CEL
+ * script</a>. By default, only the most recent commit is retained for all kinds of objects, except
+ * policy mappings and catalog state, whose commits are retained for 30 days.
+ *
+ * <p>The scripts have access to the following declared values:
+ *
+ * <ul>
+ *   <li>{@code ref} (string) name of the reference
+ *   <li>{@code commits} (64-bit int) number of the currently processed 
commit, starting at {@code
+ *       1}
+ *   <li>{@code ageDays} (64-bit int) age of currently processed commit in days
+ *   <li>{@code ageHours} (64-bit int) age of currently processed commit in 
hours
+ *   <li>{@code ageMinutes} (64-bit int) age of currently processed commit in 
minutes
+ * </ul>
+ *
+ * <p>Scripts <em>must</em> return a {@code boolean} yielding whether the 
commit shall be retained.
+ * Note that maintenance-service implementations can keep the first 
not-to-be-retained commit.
+ *
+ * <p>Example scripts
+ *
+ * <ul>
+ *   <li>{@code ageDays < 30 || commits <= 10} retains the reference history 
with at least 10
+ *       commits and commits that are younger than 30 days
+ *   <li>{@code true} retains the whole reference history
+ *   <li>{@code false} retains the most recent commit
+ * </ul>
+ */
+@ConfigMapping(prefix = "polaris.persistence.nosql.maintenance.catalog")
+@JsonSerialize(as = ImmutableBuildableCatalogsMaintenanceConfig.class)
+@JsonDeserialize(as = ImmutableBuildableCatalogsMaintenanceConfig.class)
+public interface CatalogsMaintenanceConfig {
+
+  String DEFAULT_PRINCIPALS_RETAIN = "false";
+  String DEFAULT_PRINCIPAL_ROLES_RETAIN = "false";
+  String DEFAULT_GRANTS_RETAIN = "false";
+  String DEFAULT_IMMEDIATE_TASKS_RETAIN = "false";
+  String DEFAULT_CATALOGS_HISTORY_RETAIN = "false";
+  String DEFAULT_CATALOG_ROLES_RETAIN = "false";
+  String DEFAULT_CATALOG_POLICIES_RETAIN = "ageDays < 30 || commits <= 1";
+  String DEFAULT_CATALOG_STATE_RETAIN = "ageDays < 30 || commits <= 1";
+
+  @WithDefault(DEFAULT_PRINCIPALS_RETAIN)
+  Optional<String> principalsRetain();
+
+  @WithDefault(DEFAULT_PRINCIPAL_ROLES_RETAIN)
+  Optional<String> principalRolesRetain();
+
+  @WithDefault(DEFAULT_GRANTS_RETAIN)
+  Optional<String> grantsRetain();
+
+  @WithDefault(DEFAULT_IMMEDIATE_TASKS_RETAIN)
+  Optional<String> immediateTasksRetain();
+
+  @WithDefault(DEFAULT_CATALOGS_HISTORY_RETAIN)
+  Optional<String> catalogsHistoryRetain();
+
+  @WithDefault(DEFAULT_CATALOG_ROLES_RETAIN)
+  Optional<String> catalogRolesRetain();
+
+  @WithDefault(DEFAULT_CATALOG_POLICIES_RETAIN)
+  Optional<String> catalogPoliciesRetain();
+
+  @WithDefault(DEFAULT_CATALOG_STATE_RETAIN)
+  Optional<String> catalogStateRetain();
+
+  @PolarisImmutable
+  interface BuildableCatalogsMaintenanceConfig extends 
CatalogsMaintenanceConfig {
+    static ImmutableBuildableCatalogsMaintenanceConfig.Builder builder() {
+      return ImmutableBuildableCatalogsMaintenanceConfig.builder();
+    }
+  }
+}
diff --git 
a/persistence/nosql/persistence/metastore-maintenance/src/main/resources/META-INF/beans.xml
 
b/persistence/nosql/persistence/metastore-maintenance/src/main/resources/META-INF/beans.xml
new file mode 100644
index 000000000..a297f1aa5
--- /dev/null
+++ 
b/persistence/nosql/persistence/metastore-maintenance/src/main/resources/META-INF/beans.xml
@@ -0,0 +1,24 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~   http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+  -->
+
+<beans xmlns="https://jakarta.ee/xml/ns/jakartaee"
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="https://jakarta.ee/xml/ns/jakartaee https://jakarta.ee/xml/ns/jakartaee/beans_4_0.xsd">
+    <!-- File required by Weld (used for testing), not by Quarkus -->
+</beans>
\ No newline at end of file
diff --git 
a/persistence/nosql/persistence/metastore-maintenance/src/test/java/org/apache/polaris/persistence/nosql/metastore/maintenance/TestCatalogMaintenance.java
 
b/persistence/nosql/persistence/metastore-maintenance/src/test/java/org/apache/polaris/persistence/nosql/metastore/maintenance/TestCatalogMaintenance.java
new file mode 100644
index 000000000..d5f311054
--- /dev/null
+++ 
b/persistence/nosql/persistence/metastore-maintenance/src/test/java/org/apache/polaris/persistence/nosql/metastore/maintenance/TestCatalogMaintenance.java
@@ -0,0 +1,479 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.persistence.nosql.metastore.maintenance;
+
+import static java.lang.String.format;
+import static java.util.function.Function.identity;
+import static 
org.apache.polaris.core.entity.PolarisEntitySubType.ICEBERG_TABLE;
+import static org.apache.polaris.core.entity.PolarisEntityType.CATALOG_ROLE;
+import static org.apache.polaris.core.entity.PolarisEntityType.NAMESPACE;
+import static org.apache.polaris.core.entity.PolarisEntityType.PRINCIPAL;
+import static org.apache.polaris.core.entity.PolarisEntityType.PRINCIPAL_ROLE;
+import static org.apache.polaris.core.entity.PolarisEntityType.TABLE_LIKE;
+import static 
org.apache.polaris.persistence.nosql.api.index.IndexContainer.newUpdatableIndex;
+import static 
org.apache.polaris.persistence.nosql.api.index.IndexKey.INDEX_KEY_SERIALIZER;
+import static 
org.apache.polaris.persistence.nosql.api.obj.ObjRef.OBJ_REF_SERIALIZER;
+import static org.apache.polaris.persistence.nosql.api.obj.ObjRef.objRef;
+import static 
org.apache.polaris.persistence.nosql.coretypes.catalog.CatalogStateObj.CATALOG_STATE_REF_NAME_PATTERN;
+import static 
org.apache.polaris.persistence.nosql.coretypes.catalog.CatalogsObj.CATALOGS_REF_NAME;
+import static 
org.apache.polaris.persistence.nosql.coretypes.catalog.EntityIdSet.ENTITY_ID_SET_SERIALIZER;
+import static 
org.apache.polaris.persistence.nosql.coretypes.changes.Change.CHANGE_SERIALIZER;
+import static 
org.apache.polaris.persistence.nosql.coretypes.realm.ImmediateTasksObj.IMMEDIATE_TASKS_REF_NAME;
+import static 
org.apache.polaris.persistence.nosql.coretypes.realm.PolicyMapping.POLICY_MAPPING_SERIALIZER;
+import static 
org.apache.polaris.persistence.nosql.coretypes.realm.PolicyMappingsObj.POLICY_MAPPINGS_REF_NAME;
+import static 
org.apache.polaris.persistence.nosql.coretypes.refs.References.realmReferenceNames;
+import static 
org.apache.polaris.persistence.nosql.maintenance.impl.MutableMaintenanceConfig.GRACE_TIME;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.AssertionsForClassTypes.fail;
+
+import io.smallrye.common.annotation.Identifier;
+import jakarta.inject.Inject;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.polaris.core.PolarisCallContext;
+import org.apache.polaris.core.config.PolarisConfigurationStore;
+import org.apache.polaris.core.context.RealmContext;
+import org.apache.polaris.core.entity.CatalogEntity;
+import org.apache.polaris.core.entity.PolarisBaseEntity;
+import org.apache.polaris.core.entity.PolarisEntity;
+import org.apache.polaris.core.entity.PrincipalEntity;
+import org.apache.polaris.core.persistence.MetaStoreManagerFactory;
+import org.apache.polaris.core.persistence.PolarisMetaStoreManager;
+import org.apache.polaris.core.persistence.bootstrap.RootCredentialsSet;
+import org.apache.polaris.ids.mocks.MutableMonotonicClock;
+import org.apache.polaris.persistence.nosql.api.Persistence;
+import org.apache.polaris.persistence.nosql.api.RealmPersistenceFactory;
+import org.apache.polaris.persistence.nosql.api.cache.CacheBackend;
+import org.apache.polaris.persistence.nosql.api.ref.Reference;
+import org.apache.polaris.persistence.nosql.coretypes.catalog.CatalogStateObj;
+import org.apache.polaris.persistence.nosql.coretypes.catalog.CatalogsObj;
+import org.apache.polaris.persistence.nosql.coretypes.realm.ImmediateTasksObj;
+import org.apache.polaris.persistence.nosql.coretypes.realm.PolicyMappingsObj;
+import org.apache.polaris.persistence.nosql.coretypes.refs.References;
+import org.apache.polaris.persistence.nosql.maintenance.api.MaintenanceConfig;
+import 
org.apache.polaris.persistence.nosql.maintenance.api.MaintenanceRunInformation;
+import org.apache.polaris.persistence.nosql.maintenance.api.MaintenanceRunSpec;
+import org.apache.polaris.persistence.nosql.maintenance.api.MaintenanceService;
+import 
org.apache.polaris.persistence.nosql.maintenance.impl.MutableMaintenanceConfig;
+import org.assertj.core.api.SoftAssertions;
+import org.assertj.core.api.junit.jupiter.InjectSoftAssertions;
+import org.assertj.core.api.junit.jupiter.SoftAssertionsExtension;
+import org.jboss.weld.junit5.EnableWeld;
+import org.jboss.weld.junit5.WeldInitiator;
+import org.jboss.weld.junit5.WeldSetup;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+@ExtendWith(SoftAssertionsExtension.class)
+@EnableWeld
+@SuppressWarnings("CdiInjectionPointsInspection")
+public class TestCatalogMaintenance {
+  @InjectSoftAssertions protected SoftAssertions soft;
+
+  @WeldSetup WeldInitiator weld = WeldInitiator.performDefaultDiscovery();
+
+  String realmId;
+  RealmContext realmContext;
+
+  @Inject MaintenanceService maintenance;
+  @Inject MutableMonotonicClock mutableMonotonicClock;
+
+  @Inject PolarisConfigurationStore configurationStore;
+  @Inject CacheBackend cacheBackend;
+  @Inject RealmPersistenceFactory realmPersistenceFactory;
+
+  @Inject
+  @Identifier("nosql")
+  MetaStoreManagerFactory metaStoreManagerFactory;
+
+  @BeforeEach
+  protected void setup() {
+    // Apply GRACE_TIME as the "grace time": objects created within it must not be purged
+    MutableMaintenanceConfig.setCurrent(
+        MaintenanceConfig.builder().createdAtGraceTime(GRACE_TIME).build());
+
+    realmId = UUID.randomUUID().toString();
+    realmContext = () -> realmId;
+
+    // tell maintenance to only retain the latest commit
+    MutableCatalogsMaintenanceConfig.setCurrent(
+        CatalogsMaintenanceConfig.BuildableCatalogsMaintenanceConfig.builder()
+            .catalogRolesRetain("false")
+            .catalogsHistoryRetain("false")
+            .catalogPoliciesRetain("false")
+            .catalogStateRetain("false")
+            .grantsRetain("false")
+            .principalRolesRetain("false")
+            .principalsRetain("false")
+            .immediateTasksRetain("false")
+            .build());
+  }
+
+  @Test
+  public void catalogMaintenance() {
+
+    metaStoreManagerFactory.bootstrapRealms(List.of(realmId), 
RootCredentialsSet.fromEnvironment());
+
+    var manager = 
metaStoreManagerFactory.getOrCreateMetaStoreManager(realmContext);
+    var session = metaStoreManagerFactory.getOrCreateSession(realmContext);
+    var callCtx = new PolarisCallContext(realmContext, session, 
configurationStore);
+
+    var persistence =
+        
realmPersistenceFactory.newBuilder().realmId(realmId).skipDecorators().build();
+
+    // Some references are "empty", need to populate those to be able to bump 
the references "back"
+    // to the "real" state below.
+    mandatoryRealmObjsForTestImpl(persistence);
+
+    var initialReferenceHeads = new HashMap<String, Reference>();
+    realmReferenceNames().forEach(n -> initialReferenceHeads.put(n, 
persistence.fetchReference(n)));
+
+    var principalRole = createPrincipalRole(manager, callCtx, persistence);
+    var principal = createPrincipal(manager, callCtx, persistence);
+
+    var catalog = createCatalog(manager, callCtx, persistence);
+    var catalogBase = new PolarisBaseEntity.Builder(catalog).build();
+    var catalogId = catalog.getId();
+
+    // Some references are "empty", need to populate those to be able to bump 
the references "back"
+    // to the "real" state below.
+    mandatoryCatalogObjsForTestImpl(persistence, catalogId);
+
+    References.catalogReferenceNames(catalogId)
+        .forEach(n -> initialReferenceHeads.put(n, 
persistence.fetchReference(n)));
+
+    var catalogRole = createCatalogRole(manager, callCtx, catalog, 
persistence);
+    var namespace = createNamespace(manager, callCtx, catalog, persistence);
+    var table = createTable(manager, callCtx, catalog, namespace, persistence);
+
+    var entities = List.of(principalRole, principal, catalogBase, catalogRole, 
namespace, table);
+
+    checkEntities("sanity", entities);
+
+    // Ensure that "maintenance does not purge objects created before 
references are bumped".
+    // In other words: maintenance runs during a commit operation - those 
objects are protected by
+    // the "grace period".
+
+    // Update the references to the initial state, so the created objects 
become "unreachable" - the
+    // state before the commits' reference bumps.
+    var currentReferenceHeads =
+        Stream.concat(
+                realmReferenceNames().stream(),
+                References.catalogReferenceNames(catalogId).stream())
+            .toList()
+            .stream()
+            .collect(Collectors.toMap(identity(), 
persistence::fetchReference));
+    currentReferenceHeads.forEach(
+        (n, r) -> {
+          var initial = 
initialReferenceHeads.get(r.name()).pointer().orElseThrow();
+          if (!initial.equals(r.pointer().orElseThrow())) {
+            assertThat(persistence.updateReferencePointer(r, 
initial)).describedAs(n).isPresent();
+          }
+        });
+
+    var runInformation =
+        maintenance.performMaintenance(
+            MaintenanceRunSpec.builder()
+                .includeSystemRealm(false)
+                .realmsToProcess(Set.of(realmId))
+                .build());
+    soft.assertThat(runInformation)
+        .describedAs("%s", runInformation)
+        .extracting(
+            MaintenanceRunInformation::success,
+            MaintenanceRunInformation::purgedRealms,
+            ri -> ri.referenceStats().map(s -> s.purged().orElse(-1L)),
+            ri -> ri.objStats().map(s -> s.purged().orElse(-1L)))
+        .containsExactly(
+            true,
+            OptionalInt.of(0),
+            Optional.of(0L),
+            // Within grace-time -> nothing must be purged
+            Optional.of(0L));
+
+    // Revert the references to the "real" state
+    initialReferenceHeads.forEach(
+        (n, r) -> {
+          var real = 
currentReferenceHeads.get(r.name()).pointer().orElseThrow();
+          if (!real.equals(r.pointer().orElseThrow())) {
+            assertThat(persistence.updateReferencePointer(r, 
real)).describedAs(n).isPresent();
+          }
+        });
+
+    checkEntities("real state within grace", entities);
+
+    // Perform a maintenance run _after_ the references have been bumped 
(successful commits).
+
+    mutableMonotonicClock.advanceBoth(GRACE_TIME);
+    runInformation =
+        maintenance.performMaintenance(
+            MaintenanceRunSpec.builder()
+                .includeSystemRealm(false)
+                .realmsToProcess(Set.of(realmId))
+                .build());
+    soft.assertThat(runInformation)
+        .describedAs("%s", runInformation)
+        .extracting(
+            MaintenanceRunInformation::success,
+            MaintenanceRunInformation::purgedRealms,
+            ri -> ri.referenceStats().map(s -> s.purged().orElse(-1L)),
+            ri -> ri.objStats().map(s -> s.purged().orElse(-1L)))
+        .containsExactly(
+            true,
+            OptionalInt.of(0),
+            Optional.of(0L),
+            // 8 stale objects:
+            // - 1 namespace (catalog state)
+            // - 1 table (catalog state)
+            // - 1 grants (realm setup) - including 1 ACLs
+            // - 1 principal
+            // - 1 principal role
+            // - 1 catalog role
+            // - 1 catalog
+            Optional.of(8L));
+
+    checkEntities("real state after grace", entities);
+  }
+
+  private static PolarisBaseEntity createTable(
+      PolarisMetaStoreManager manager,
+      PolarisCallContext callCtx,
+      PolarisBaseEntity catalog,
+      PolarisBaseEntity namespace,
+      Persistence persistence) {
+    var tableResult =
+        manager.createEntityIfNotExists(
+            callCtx,
+            List.of(catalog, namespace),
+            new PolarisEntity.Builder()
+                .setType(TABLE_LIKE)
+                .setSubType(ICEBERG_TABLE)
+                .setName("table1")
+                .setId(persistence.generateId())
+                .setCatalogId(catalog.getId())
+                .setCreateTimestamp(System.currentTimeMillis())
+                .build());
+    return tableResult.getEntity();
+  }
+
+  private static PolarisBaseEntity createNamespace(
+      PolarisMetaStoreManager manager,
+      PolarisCallContext callCtx,
+      PolarisBaseEntity catalog,
+      Persistence persistence) {
+    var namespaceResult =
+        manager.createEntityIfNotExists(
+            callCtx,
+            List.of(catalog),
+            new PolarisEntity.Builder()
+                .setType(NAMESPACE)
+                .setName("ns")
+                .setId(persistence.generateId())
+                .setCatalogId(catalog.getId())
+                .setCreateTimestamp(System.currentTimeMillis())
+                .build());
+    return namespaceResult.getEntity();
+  }
+
+  private static PolarisBaseEntity createCatalogRole(
+      PolarisMetaStoreManager manager,
+      PolarisCallContext callCtx,
+      PolarisBaseEntity catalog,
+      Persistence persistence) {
+    var catalogRoleResult =
+        manager.createEntityIfNotExists(
+            callCtx,
+            List.of(catalog),
+            new PolarisEntity.Builder()
+                .setType(CATALOG_ROLE)
+                .setName("catalog-role")
+                .setId(persistence.generateId())
+                .setCatalogId(catalog.getId())
+                .setCreateTimestamp(System.currentTimeMillis())
+                .build());
+    return catalogRoleResult.getEntity();
+  }
+
+  private static PolarisBaseEntity createCatalog(
+      PolarisMetaStoreManager manager, PolarisCallContext callCtx, Persistence 
persistence) {
+    var catalogResult =
+        manager.createCatalog(
+            callCtx,
+            new PolarisEntity.Builder(
+                    new CatalogEntity.Builder()
+                        .setName("catalog")
+                        .setDefaultBaseLocation("file:///tmp/foo/bar/baz")
+                        .setCatalogType("INTERNAL")
+                        .build())
+                .setId(persistence.generateId())
+                .setCatalogId(0L)
+                .setCreateTimestamp(System.currentTimeMillis())
+                .build(),
+            List.of());
+    return catalogResult.getCatalog();
+  }
+
+  private static PolarisBaseEntity createPrincipal(
+      PolarisMetaStoreManager manager, PolarisCallContext callCtx, Persistence 
persistence) {
+    var principalResult =
+        manager.createPrincipal(
+            callCtx,
+            new PrincipalEntity.Builder()
+                .setType(PRINCIPAL)
+                .setName("principal")
+                .setId(persistence.generateId())
+                .setCatalogId(0L)
+                .setCreateTimestamp(System.currentTimeMillis())
+                .build());
+    return principalResult.getPrincipal();
+  }
+
+  private static PolarisBaseEntity createPrincipalRole(
+      PolarisMetaStoreManager manager, PolarisCallContext callCtx, Persistence 
persistence) {
+    var principalRoleResult =
+        manager.createEntityIfNotExists(
+            callCtx,
+            List.of(),
+            new PolarisEntity.Builder()
+                .setType(PRINCIPAL_ROLE)
+                .setName("principal-role")
+                .setId(persistence.generateId())
+                .setCatalogId(0L)
+                .setCreateTimestamp(System.currentTimeMillis())
+                .build());
+    return principalRoleResult.getEntity();
+  }
+
+  private static void mandatoryCatalogObjsForTestImpl(Persistence persistence, 
long catalogId) {
+    var catalogStateObj =
+        persistence.write(
+            CatalogStateObj.builder()
+                .id(persistence.generateId())
+                .stableIdToName(
+                    newUpdatableIndex(persistence, INDEX_KEY_SERIALIZER)
+                        .toIndexed("", (x, o) -> fail()))
+                .nameToObjRef(
+                    newUpdatableIndex(persistence, OBJ_REF_SERIALIZER)
+                        .toIndexed("", (x, o) -> fail()))
+                .changes(
+                    newUpdatableIndex(persistence, CHANGE_SERIALIZER)
+                        .toIndexed("", (x, o) -> fail()))
+                .locations(
+                    newUpdatableIndex(persistence, ENTITY_ID_SET_SERIALIZER)
+                        .toIndexed("", (x, o) -> fail()))
+                .createdAtMicros(persistence.currentTimeMicros())
+                .seq(1)
+                .tail()
+                .build(),
+            CatalogStateObj.class);
+    persistence.updateReferencePointer(
+        persistence.fetchReference(format(CATALOG_STATE_REF_NAME_PATTERN, 
catalogId)),
+        objRef(catalogStateObj));
+  }
+
+  private static void mandatoryRealmObjsForTestImpl(Persistence persistence) {
+    var catalogsObj =
+        persistence.write(
+            CatalogsObj.builder()
+                .id(persistence.generateId())
+                .stableIdToName(
+                    newUpdatableIndex(persistence, INDEX_KEY_SERIALIZER)
+                        .toIndexed("", (x, o) -> fail()))
+                .nameToObjRef(
+                    newUpdatableIndex(persistence, OBJ_REF_SERIALIZER)
+                        .toIndexed("", (x, o) -> fail()))
+                .createdAtMicros(persistence.currentTimeMicros())
+                .seq(1)
+                .tail()
+                .build(),
+            CatalogsObj.class);
+    persistence.updateReferencePointer(
+        persistence.fetchReference(CATALOGS_REF_NAME), objRef(catalogsObj));
+
+    var immediateTasksObj =
+        persistence.write(
+            ImmediateTasksObj.builder()
+                .id(persistence.generateId())
+                .stableIdToName(
+                    newUpdatableIndex(persistence, INDEX_KEY_SERIALIZER)
+                        .toIndexed("", (x, o) -> fail()))
+                .nameToObjRef(
+                    newUpdatableIndex(persistence, OBJ_REF_SERIALIZER)
+                        .toIndexed("", (x, o) -> fail()))
+                .createdAtMicros(persistence.currentTimeMicros())
+                .seq(1)
+                .tail()
+                .build(),
+            ImmediateTasksObj.class);
+    persistence.updateReferencePointer(
+        persistence.fetchReference(IMMEDIATE_TASKS_REF_NAME), 
objRef(immediateTasksObj));
+
+    var policyMappingsObj =
+        persistence.write(
+            PolicyMappingsObj.builder()
+                .id(persistence.generateId())
+                .policyMappings(
+                    newUpdatableIndex(persistence, POLICY_MAPPING_SERIALIZER)
+                        .toIndexed("", (x, o) -> fail()))
+                .createdAtMicros(persistence.currentTimeMicros())
+                .seq(1)
+                .tail()
+                .build(),
+            PolicyMappingsObj.class);
+    persistence.updateReferencePointer(
+        persistence.fetchReference(POLICY_MAPPINGS_REF_NAME), 
objRef(policyMappingsObj));
+  }
+
+  private void checkEntities(String step, List<PolarisBaseEntity> entities) {
+    // Purge the whole cache first, so the assertions below also catch objects/references that
+    // maintenance wrongly purged but that are still present in the cache.
+    cacheBackend.purge();
+    
soft.assertThat(cacheBackend.estimatedSize()).describedAs(step).isEqualTo(0L);
+
+    var manager = 
metaStoreManagerFactory.getOrCreateMetaStoreManager(realmContext);
+    var session = metaStoreManagerFactory.getOrCreateSession(realmContext);
+    var callCtx = new PolarisCallContext(realmContext, session, 
configurationStore);
+
+    for (var e : entities) {
+      var result =
+          manager.loadResolvedEntityById(callCtx, e.getCatalogId(), e.getId(), 
e.getType());
+      var loadedEntity = result.getEntity();
+      soft.assertThat(loadedEntity)
+          .describedAs("%s: %s", step, result.getReturnStatus())
+          .isEqualTo(e);
+    }
+
+    for (var e : entities) {
+      var result =
+          manager.loadResolvedEntityByName(
+              callCtx, e.getCatalogId(), e.getParentId(), e.getType(), 
e.getName());
+      var loadedEntity = result.getEntity();
+      soft.assertThat(loadedEntity)
+          .describedAs("%s: %s", step, result.getReturnStatus())
+          .isEqualTo(e);
+    }
+  }
+}
diff --git 
a/persistence/nosql/persistence/metastore-maintenance/src/test/resources/logback-test.xml
 
b/persistence/nosql/persistence/metastore-maintenance/src/test/resources/logback-test.xml
new file mode 100644
index 000000000..1de5ba06e
--- /dev/null
+++ 
b/persistence/nosql/persistence/metastore-maintenance/src/test/resources/logback-test.xml
@@ -0,0 +1,34 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<configuration debug="false">
+  <contextListener class="ch.qos.logback.classic.jul.LevelChangePropagator"/>
+  <appender name="console" class="ch.qos.logback.core.ConsoleAppender">
+    <encoder>
+      <pattern>%date{ISO8601} [%thread] %-5level %logger{36} - %msg%n</pattern>
+    </encoder>
+  </appender>
+  <root level="DEBUG">
+    <appender-ref ref="console"/>
+  </root>
+  <logger name="org.jboss" level="INFO"/>
+  <logger name="org.apache.polaris" level="DEBUG"/>
+</configuration>
diff --git 
a/persistence/nosql/persistence/metastore-maintenance/src/test/resources/weld.properties
 
b/persistence/nosql/persistence/metastore-maintenance/src/test/resources/weld.properties
new file mode 100644
index 000000000..c26169e0e
--- /dev/null
+++ 
b/persistence/nosql/persistence/metastore-maintenance/src/test/resources/weld.properties
@@ -0,0 +1,21 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# See https://bugs.openjdk.org/browse/JDK-8349545
+org.jboss.weld.bootstrap.concurrentDeployment=false
diff --git 
a/persistence/nosql/persistence/metastore-maintenance/src/testFixtures/java/org/apache/polaris/persistence/nosql/metastore/maintenance/CdiProducers.java
 
b/persistence/nosql/persistence/metastore-maintenance/src/testFixtures/java/org/apache/polaris/persistence/nosql/metastore/maintenance/CdiProducers.java
new file mode 100644
index 000000000..88f95915a
--- /dev/null
+++ 
b/persistence/nosql/persistence/metastore-maintenance/src/testFixtures/java/org/apache/polaris/persistence/nosql/metastore/maintenance/CdiProducers.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.persistence.nosql.metastore.maintenance;
+
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.enterprise.inject.Produces;
+
+@ApplicationScoped
+public class CdiProducers {
+  public static MutableCatalogsMaintenanceConfig config = new 
MutableCatalogsMaintenanceConfig();
+
+  @Produces
+  MutableCatalogsMaintenanceConfig produceMutableCatalogsMaintenanceConfig() {
+    return config;
+  }
+}
diff --git 
a/persistence/nosql/persistence/metastore-maintenance/src/testFixtures/java/org/apache/polaris/persistence/nosql/metastore/maintenance/MutableCatalogsMaintenanceConfig.java
 
b/persistence/nosql/persistence/metastore-maintenance/src/testFixtures/java/org/apache/polaris/persistence/nosql/metastore/maintenance/MutableCatalogsMaintenanceConfig.java
new file mode 100644
index 000000000..50e517152
--- /dev/null
+++ 
b/persistence/nosql/persistence/metastore-maintenance/src/testFixtures/java/org/apache/polaris/persistence/nosql/metastore/maintenance/MutableCatalogsMaintenanceConfig.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.persistence.nosql.metastore.maintenance;
+
+import java.util.Optional;
+
+public class MutableCatalogsMaintenanceConfig implements 
CatalogsMaintenanceConfig {
+
+  private static CatalogsMaintenanceConfig current =
+      
CatalogsMaintenanceConfig.BuildableCatalogsMaintenanceConfig.builder().build();
+
+  public static void setCurrent(CatalogsMaintenanceConfig config) {
+    current = config;
+  }
+
+  @Override
+  public Optional<String> principalsRetain() {
+    return current.principalsRetain();
+  }
+
+  @Override
+  public Optional<String> principalRolesRetain() {
+    return current.principalRolesRetain();
+  }
+
+  @Override
+  public Optional<String> grantsRetain() {
+    return current.grantsRetain();
+  }
+
+  @Override
+  public Optional<String> immediateTasksRetain() {
+    return current.immediateTasksRetain();
+  }
+
+  @Override
+  public Optional<String> catalogsHistoryRetain() {
+    return current.catalogsHistoryRetain();
+  }
+
+  @Override
+  public Optional<String> catalogRolesRetain() {
+    return current.catalogRolesRetain();
+  }
+
+  @Override
+  public Optional<String> catalogStateRetain() {
+    return current.catalogStateRetain();
+  }
+
+  @Override
+  public Optional<String> catalogPoliciesRetain() {
+    return current.catalogPoliciesRetain();
+  }
+}
diff --git 
a/persistence/nosql/persistence/metastore-maintenance/src/testFixtures/resources/META-INF/beans.xml
 
b/persistence/nosql/persistence/metastore-maintenance/src/testFixtures/resources/META-INF/beans.xml
new file mode 100644
index 000000000..a297f1aa5
--- /dev/null
+++ 
b/persistence/nosql/persistence/metastore-maintenance/src/testFixtures/resources/META-INF/beans.xml
@@ -0,0 +1,24 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~   http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+  -->
+
+<beans xmlns="https://jakarta.ee/xml/ns/jakartaee"
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="https://jakarta.ee/xml/ns/jakartaee https://jakarta.ee/xml/ns/jakartaee/beans_4_0.xsd">
+    <!-- File required by Weld (used for testing), not by Quarkus -->
+</beans>
\ No newline at end of file

Reply via email to