This is an automated email from the ASF dual-hosted git repository.

dimas pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/polaris.git


The following commit(s) were added to refs/heads/main by this push:
     new ac724d3de NoSQL: Add maintenance API, SPI (#3028)
ac724d3de is described below

commit ac724d3de2b8f674fdbcb5214b542456b280efbb
Author: Robert Stupp <[email protected]>
AuthorDate: Wed Nov 12 15:18:37 2025 +0100

    NoSQL: Add maintenance API, SPI (#3028)
    
    Maintenance operations include a bunch of tasks that are regularly executed 
against a backend database.
    
    Types of maintenance operations include:
    * Purging unreferenced objects and references within a catalog
    * Purging whole catalogs that are marked to be purged
    * Purging whole realms that are marked to be purged
    
    Implementation added in a follow-up PR.
---
 bom/build.gradle.kts                               |   4 +
 codestyle/checkstyle.xml                           |   2 +-
 gradle/libs.versions.toml                          |   1 +
 gradle/projects.main.properties                    |   4 +
 .../nosql/persistence/maintenance/README.md        |  22 ++
 .../persistence/maintenance/api/build.gradle.kts   |  45 ++++
 .../nosql/maintenance/api/MaintenanceConfig.java   | 184 +++++++++++++++++
 .../maintenance/api/MaintenanceRunInformation.java | 126 ++++++++++++
 .../nosql/maintenance/api/MaintenanceRunSpec.java  |  53 +++++
 .../nosql/maintenance/api/MaintenanceService.java  |  50 +++++
 .../nosql/maintenance/api/package-info.java        | 110 ++++++++++
 .../maintenance/retain-cel/build.gradle.kts        |  45 ++++
 .../cel/CelReferenceContinuePredicateBench.java    | 124 +++++++++++
 .../cel/CelReferenceContinuePredicate.java         | 146 +++++++++++++
 .../cel/TestCelReferenceContinuePredicate.java     | 119 +++++++++++
 .../persistence/maintenance/spi/build.gradle.kts   |  38 ++++
 .../nosql/maintenance/spi/CountDownPredicate.java  |  45 ++++
 .../maintenance/spi/ObjTypeRetainedIdentifier.java |  58 ++++++
 .../spi/PerRealmRetainedIdentifier.java            |  56 +++++
 .../nosql/maintenance/spi/RetainedCollector.java   | 226 +++++++++++++++++++++
 .../nosql/maintenance/spi/package-info.java        |  24 +++
 21 files changed, 1481 insertions(+), 1 deletion(-)

diff --git a/bom/build.gradle.kts b/bom/build.gradle.kts
index 45267c888..c959cfb76 100644
--- a/bom/build.gradle.kts
+++ b/bom/build.gradle.kts
@@ -62,6 +62,10 @@ dependencies {
     api(project(":polaris-persistence-nosql-inmemory"))
     api(project(":polaris-persistence-nosql-mongodb"))
 
+    api(project(":polaris-persistence-nosql-maintenance-api"))
+    api(project(":polaris-persistence-nosql-maintenance-cel"))
+    api(project(":polaris-persistence-nosql-maintenance-spi"))
+
     api(project(":polaris-config-docs-annotations"))
     api(project(":polaris-config-docs-generator"))
 
diff --git a/codestyle/checkstyle.xml b/codestyle/checkstyle.xml
index d3986dc3e..feda236cd 100644
--- a/codestyle/checkstyle.xml
+++ b/codestyle/checkstyle.xml
@@ -43,7 +43,7 @@
     <!-- Checks for imports                            -->
     <!-- See http://checkstyle.org/config_imports.html -->
     <module name="IllegalImport">
-      <property name="illegalPkgs" value=".*\.shaded\..*, .*\.relocated\..*"/>
+      <property name="illegalPkgs" value=".*\.shaded\..*, 
(?!org\.projectnessie\.cel).*\.relocated\..*"/>
       <property name="regexp" value="true"/>
     </module>
 
diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml
index 83438c876..b3e112a78 100644
--- a/gradle/libs.versions.toml
+++ b/gradle/libs.versions.toml
@@ -48,6 +48,7 @@ awssdk-bom = { module = "software.amazon.awssdk:bom", version 
= "2.38.2" }
 awaitility = { module = "org.awaitility:awaitility", version = "4.3.0" }
 azuresdk-bom = { module = "com.azure:azure-sdk-bom", version = "1.3.2" }
 caffeine = { module = "com.github.ben-manes.caffeine:caffeine", version = 
"3.2.3" }
+cel-bom = { module = "org.projectnessie.cel:cel-bom", version = "0.5.3" }
 commons-lang3 = { module = "org.apache.commons:commons-lang3", version = 
"3.19.0" }
 commons-text = { module = "org.apache.commons:commons-text", version = 
"1.14.0" }
 errorprone = { module = "com.google.errorprone:error_prone_core", version = 
"2.44.0" }
diff --git a/gradle/projects.main.properties b/gradle/projects.main.properties
index 52f5f72a8..1b50c9ce4 100644
--- a/gradle/projects.main.properties
+++ b/gradle/projects.main.properties
@@ -74,6 +74,10 @@ 
polaris-persistence-nosql-correctness=persistence/nosql/persistence/correctness
 polaris-persistence-nosql-standalone=persistence/nosql/persistence/standalone
 
polaris-persistence-nosql-testextension=persistence/nosql/persistence/testextension
 polaris-persistence-nosql-varint=persistence/nosql/persistence/varint
+# persistence / maintenance
+polaris-persistence-nosql-maintenance-api=persistence/nosql/persistence/maintenance/api
+polaris-persistence-nosql-maintenance-cel=persistence/nosql/persistence/maintenance/retain-cel
+polaris-persistence-nosql-maintenance-spi=persistence/nosql/persistence/maintenance/spi
 # persistence / database specific implementations
 polaris-persistence-nosql-inmemory=persistence/nosql/persistence/db/inmemory
 polaris-persistence-nosql-mongodb=persistence/nosql/persistence/db/mongodb
diff --git a/persistence/nosql/persistence/maintenance/README.md 
b/persistence/nosql/persistence/maintenance/README.md
new file mode 100644
index 000000000..d16031b63
--- /dev/null
+++ b/persistence/nosql/persistence/maintenance/README.md
@@ -0,0 +1,22 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+ 
+   http://www.apache.org/licenses/LICENSE-2.0
+ 
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+
+Maintenance service,
+see [API package 
javadoc](api/src/main/java/org/apache/polaris/persistence/nosql/maintenance/api/package-info.java)
+for information. 
\ No newline at end of file
diff --git a/persistence/nosql/persistence/maintenance/api/build.gradle.kts 
b/persistence/nosql/persistence/maintenance/api/build.gradle.kts
new file mode 100644
index 000000000..27b2d2441
--- /dev/null
+++ b/persistence/nosql/persistence/maintenance/api/build.gradle.kts
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+plugins {
+  id("org.kordamp.gradle.jandex")
+  id("polaris-server")
+}
+
+description = "Polaris NoSQL persistence maintenance - service interfaces"
+
+dependencies {
+  implementation(project(":polaris-persistence-nosql-api"))
+  compileOnly(project(":polaris-persistence-nosql-realms-api"))
+  compileOnly(project(":polaris-persistence-nosql-maintenance-spi"))
+
+  compileOnly(project(":polaris-immutables"))
+  annotationProcessor(project(":polaris-immutables", configuration = 
"processor"))
+
+  implementation(libs.guava)
+
+  compileOnly(libs.smallrye.config.core)
+
+  compileOnly(libs.jakarta.annotation.api)
+  compileOnly(libs.jakarta.validation.api)
+
+  compileOnly(platform(libs.jackson.bom))
+  compileOnly("com.fasterxml.jackson.core:jackson-annotations")
+  compileOnly("com.fasterxml.jackson.core:jackson-databind")
+}
diff --git 
a/persistence/nosql/persistence/maintenance/api/src/main/java/org/apache/polaris/persistence/nosql/maintenance/api/MaintenanceConfig.java
 
b/persistence/nosql/persistence/maintenance/api/src/main/java/org/apache/polaris/persistence/nosql/maintenance/api/MaintenanceConfig.java
new file mode 100644
index 000000000..8fb0a493f
--- /dev/null
+++ 
b/persistence/nosql/persistence/maintenance/api/src/main/java/org/apache/polaris/persistence/nosql/maintenance/api/MaintenanceConfig.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.persistence.nosql.maintenance.api;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import com.fasterxml.jackson.annotation.JsonFormat;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import io.smallrye.config.ConfigMapping;
+import io.smallrye.config.WithDefault;
+import java.time.Duration;
+import java.util.Optional;
+import java.util.OptionalDouble;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
+import org.apache.polaris.immutables.PolarisImmutable;
+import org.immutables.value.Value;
+
+/** Maintenance service configuration. */
+@ConfigMapping(prefix = "polaris.persistence.maintenance")
+@PolarisImmutable
+@JsonSerialize(as = ImmutableMaintenanceConfig.class)
+@JsonDeserialize(as = ImmutableMaintenanceConfig.class)
+public interface MaintenanceConfig {
+
+  long DEFAULT_EXPECTED_REFERENCE_COUNT = 100;
+
+  /**
+   * Provides the expected number of references in all realms to retain, 
defaults to {@value
+   * #DEFAULT_EXPECTED_REFERENCE_COUNT}, must be at least {@code 100}. This 
value is used as the
+   * default if no information of a previous maintenance run is present; it is 
also the minimum
+   * number of expected references.
+   */
+  @WithDefault("" + DEFAULT_EXPECTED_REFERENCE_COUNT)
+  @JsonInclude(JsonInclude.Include.NON_ABSENT)
+  OptionalLong expectedReferenceCount();
+
+  long DEFAULT_EXPECTED_OBJ_COUNT = 100_000;
+
+  /**
+   * Provides the expected number of objects in all realms to retain, defaults 
to {@value
+   * #DEFAULT_EXPECTED_OBJ_COUNT}, must be at least {@code 100000}. This value 
is used as the
+   * default if no information of a previous maintenance run is present; it is 
also the minimum
+   * number of expected objects.
+   */
+  @WithDefault("" + DEFAULT_EXPECTED_OBJ_COUNT)
+  @JsonInclude(JsonInclude.Include.NON_ABSENT)
+  OptionalLong expectedObjCount();
+
+  double DEFAULT_COUNT_FROM_LAST_RUN_MULTIPLIER = 1.1;
+
+  /**
+   * Maintenance service sizes the bloom-filters used to hold the identified 
references and objects
+   * according to the expression {@code lastRun.numberOfIdentified * 
countFromLastRunMultiplier}.
+   * The default is to add 10% to the number of identified items.
+   */
+  @WithDefault("" + DEFAULT_COUNT_FROM_LAST_RUN_MULTIPLIER)
+  @JsonInclude(JsonInclude.Include.NON_ABSENT)
+  OptionalDouble countFromLastRunMultiplier();
+
+  double DEFAULT_INITIALIZED_FPP = 0.00001;
+
+  /**
+   * False-positive-probability (FPP) used to initialize the bloom-filters for 
identified references
+   * and objects.
+   */
+  @WithDefault("" + DEFAULT_INITIALIZED_FPP)
+  @JsonInclude(JsonInclude.Include.NON_ABSENT)
+  OptionalDouble filterInitializedFpp();
+
+  double DEFAULT_MAX_ACCEPTABLE_FPP = 0.00005;
+
+  /**
+   * Expected maximum false-positive-probability (FPP) used to check the 
bloom-filters for
+   * identified references and objects.
+   *
+   * <p>If the FPP of a bloom filter exceeds this value, no individual 
references or objects will be
+   * purged.
+   */
+  @WithDefault("" + DEFAULT_MAX_ACCEPTABLE_FPP)
+  @JsonInclude(JsonInclude.Include.NON_ABSENT)
+  OptionalDouble maxAcceptableFilterFpp();
+
+  int DEFAULT_RETAINED_RUNS = 50;
+
+  /**
+   * Number of retained {@linkplain MaintenanceRunInformation maintenance run 
objects}, must be at
+   * least {@code 2}.
+   */
+  @WithDefault("" + DEFAULT_RETAINED_RUNS)
+  @JsonInclude(JsonInclude.Include.NON_ABSENT)
+  OptionalInt retainedRuns();
+
+  String DEFAULT_CREATED_AT_GRACE_TIME_STRING = "PT3H";
+  Duration DEFAULT_CREATED_AT_GRACE_TIME = 
Duration.parse(DEFAULT_CREATED_AT_GRACE_TIME_STRING);
+
+  /**
+   * Objects and references that have been created <em>after</em> a 
maintenance run has started are
+   * never purged. This option defines an additional grace time relative to when the 
maintenance run has
+   * started.
+   *
+   * <p>This value is a safety net for two reasons:
+   *
+   * <ul>
+   *   <li>Respect the wall-clock drift between Polaris nodes.
+   *   <li>Respect the order of writes in Polaris persistence. Objects are 
written <em>before</em>
+   *       those become reachable via a commit. Commits may take a little time 
(milliseconds, up to
+   *       a few seconds, depending on the system load) to complete. 
Therefore, implementations
+   *       enforce a minimum of 5 minutes.
+   * </ul>
+   */
+  @WithDefault(DEFAULT_CREATED_AT_GRACE_TIME_STRING)
+  @JsonInclude(JsonInclude.Include.NON_ABSENT)
+  @JsonFormat(shape = JsonFormat.Shape.STRING)
+  Optional<Duration> createdAtGraceTime();
+
+  /**
+   * Optionally limit the number of objects scanned per second. Default is to 
not throttle object
+   * scanning.
+   */
+  @JsonInclude(JsonInclude.Include.NON_ABSENT)
+  OptionalInt objectScanRateLimitPerSecond();
+
+  /**
+   * Optionally limit the number of references scanned per second.
+   *
+   * <p>Default is to not throttle reference scanning.
+   */
+  @JsonInclude(JsonInclude.Include.NON_ABSENT)
+  OptionalInt referenceScanRateLimitPerSecond();
+
+  int DEFAULT_DELETE_BATCH_SIZE = 10;
+
+  /** Size of the delete-batches when purging objects. */
+  @WithDefault("" + DEFAULT_DELETE_BATCH_SIZE)
+  @JsonInclude(JsonInclude.Include.NON_ABSENT)
+  OptionalInt deleteBatchSize();
+
+  static ImmutableMaintenanceConfig.Builder builder() {
+    return ImmutableMaintenanceConfig.builder();
+  }
+
+  @Value.Check
+  default void check() {
+    expectedReferenceCount()
+        .ifPresent(v -> checkState(v > 0, "expectedReferenceCount must be 
positive"));
+    expectedObjCount().ifPresent(v -> checkState(v > 0, "expectedObjCount must 
be positive"));
+    countFromLastRunMultiplier()
+        .ifPresent(v -> checkState(v > 1d, "countFromLastRunMultiplier must be 
greater than 1.0d"));
+    filterInitializedFpp()
+        .ifPresent(
+            v -> checkState(v > 0d && v <= 1d, "filterInitializedFpp must be > 
0.0d and <= 1.0d"));
+    maxAcceptableFilterFpp()
+        .ifPresent(
+            v ->
+                checkState(v > 0d && v <= 1d, "maxAcceptableFilterFpp must be 
> 0.0d and <= 1.0d"));
+    retainedRuns().ifPresent(v -> checkState(v >= 2, "retainedRuns must 2 or 
greater"));
+    createdAtGraceTime()
+        .ifPresent(v -> checkState(!v.isNegative(), "createdAtGraceTime must 
not be negative"));
+    objectScanRateLimitPerSecond()
+        .ifPresent(v -> checkState(v >= 0, "objectScanRateLimitPerSecond must 
not be negative"));
+    referenceScanRateLimitPerSecond()
+        .ifPresent(v -> checkState(v >= 0, "referenceScanRateLimitPerSecond 
must not be negative"));
+    deleteBatchSize().ifPresent(v -> checkState(v > 0, "deleteBatchSize must 
be positive"));
+  }
+}
diff --git 
a/persistence/nosql/persistence/maintenance/api/src/main/java/org/apache/polaris/persistence/nosql/maintenance/api/MaintenanceRunInformation.java
 
b/persistence/nosql/persistence/maintenance/api/src/main/java/org/apache/polaris/persistence/nosql/maintenance/api/MaintenanceRunInformation.java
new file mode 100644
index 000000000..834ec8d08
--- /dev/null
+++ 
b/persistence/nosql/persistence/maintenance/api/src/main/java/org/apache/polaris/persistence/nosql/maintenance/api/MaintenanceRunInformation.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.persistence.nosql.maintenance.api;
+
+import com.fasterxml.jackson.annotation.JsonFormat;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import java.time.Instant;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
+import org.apache.polaris.immutables.PolarisImmutable;
+import org.apache.polaris.persistence.nosql.api.backend.Backend;
+import org.apache.polaris.persistence.nosql.api.obj.ObjRef;
+import 
org.apache.polaris.persistence.nosql.maintenance.spi.ObjTypeRetainedIdentifier;
+import 
org.apache.polaris.persistence.nosql.maintenance.spi.PerRealmRetainedIdentifier;
+import org.apache.polaris.persistence.nosql.maintenance.spi.RetainedCollector;
+import org.immutables.value.Value;
+
+@PolarisImmutable
+@JsonSerialize(as = ImmutableMaintenanceRunInformation.class)
+@JsonDeserialize(as = ImmutableMaintenanceRunInformation.class)
+public interface MaintenanceRunInformation {
+
+  Instant started();
+
+  @JsonFormat(shape = JsonFormat.Shape.STRING)
+  Optional<Instant> finished();
+
+  @Value.Default
+  default boolean success() {
+    return false;
+  }
+
+  /** Human-readable status message. */
+  @JsonInclude(JsonInclude.Include.NON_ABSENT)
+  Optional<String> statusMessage();
+
+  /** Human-readable detailed information, possibly including technical error 
information. */
+  @JsonInclude(JsonInclude.Include.NON_ABSENT)
+  Optional<String> detailedInformation();
+
+  Optional<MaintenanceStats> referenceStats();
+
+  Optional<MaintenanceStats> objStats();
+
+  Map<String, MaintenanceStats> perRealmReferenceStats();
+
+  Map<String, Map<String, MaintenanceStats>> perRealmPerObjTypeStats();
+
+  /**
+   * Number of realms that were directly purged, if the {@linkplain Backend 
backend} {@linkplain
+   * Backend#supportsRealmDeletion() supports} this.
+   */
+  OptionalInt purgedRealms();
+
+  /** Number of invocations of {@link RetainedCollector#retainObject(ObjRef)}. 
*/
+  OptionalLong identifiedObjs();
+
+  /** Number of invocations of {@link 
RetainedCollector#retainReference(String)}. */
+  OptionalLong identifiedReferences();
+
+  static ImmutableMaintenanceRunInformation.Builder builder() {
+    return ImmutableMaintenanceRunInformation.builder();
+  }
+
+  @PolarisImmutable
+  @JsonSerialize(as = ImmutableMaintenanceStats.class)
+  @JsonDeserialize(as = ImmutableMaintenanceStats.class)
+  interface MaintenanceStats {
+
+    static ImmutableMaintenanceStats.Builder builder() {
+      return ImmutableMaintenanceStats.builder();
+    }
+
+    /**
+     * Number of scanned items.
+     *
+     * <p>If a persisted object has been persisted using multiple parts, each 
part is counted.
+     */
+    OptionalLong scanned();
+
+    /**
+     * Number of scanned items that were retained, because those were 
{@linkplain
+     * RetainedCollector#retainObject(ObjRef) indicated} to be retained by a 
{@linkplain
+     * PerRealmRetainedIdentifier realm identifier} or {@linkplain 
ObjTypeRetainedIdentifier
+     * obj-type identifier}.
+     *
+     * <p>If a persisted object has been persisted using multiple parts, each 
part is counted.
+     */
+    OptionalLong retained();
+
+    /**
+     * Number of items that were written after the {@linkplain
+     * MaintenanceConfig#createdAtGraceTime() calculated grace time}.
+     *
+     * <p>If a persisted object has been persisted using multiple parts, each 
part is counted.
+     */
+    OptionalLong newer();
+
+    /**
+     * Number of scanned items that have been purged.
+     *
+     * <p>If a persisted object has been persisted using multiple parts, each 
part is counted.
+     */
+    OptionalLong purged();
+  }
+}
diff --git 
a/persistence/nosql/persistence/maintenance/api/src/main/java/org/apache/polaris/persistence/nosql/maintenance/api/MaintenanceRunSpec.java
 
b/persistence/nosql/persistence/maintenance/api/src/main/java/org/apache/polaris/persistence/nosql/maintenance/api/MaintenanceRunSpec.java
new file mode 100644
index 000000000..307f353ff
--- /dev/null
+++ 
b/persistence/nosql/persistence/maintenance/api/src/main/java/org/apache/polaris/persistence/nosql/maintenance/api/MaintenanceRunSpec.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.persistence.nosql.maintenance.api;
+
+import java.util.Set;
+import org.apache.polaris.immutables.PolarisImmutable;
+import org.apache.polaris.persistence.nosql.api.Realms;
+import org.immutables.value.Value;
+
+/**
+ * Configures a maintenance run.
+ *
+ * <p>Must specify both the realms to purge <em>and</em> the realms to retain. 
The two sets are
+ * distinct to allow certain database specific and implementation detail 
optimizations. Existing
+ * data of realms that are in neither of the sets {@linkplain #realmsToPurge() 
to purge} and
+ * {@linkplain #realmsToProcess() to process} will be ignored, not processed 
at all.
+ *
+ * <p>Reserved realms, realm IDs that start with {@code ::}, except {@value 
Realms#SYSTEM_REALM_ID},
+ * are considered to be "special" and are not processed, and all references 
and objects in those
+ * realms are retained.
+ */
+@PolarisImmutable
+public interface MaintenanceRunSpec {
+  Set<String> realmsToPurge();
+
+  Set<String> realmsToProcess();
+
+  /** Whether to run maintenance on the system realm, defaults to {@code 
true}. */
+  @Value.Default
+  default boolean includeSystemRealm() {
+    return true;
+  }
+
+  static ImmutableMaintenanceRunSpec.Builder builder() {
+    return ImmutableMaintenanceRunSpec.builder();
+  }
+}
diff --git 
a/persistence/nosql/persistence/maintenance/api/src/main/java/org/apache/polaris/persistence/nosql/maintenance/api/MaintenanceService.java
 
b/persistence/nosql/persistence/maintenance/api/src/main/java/org/apache/polaris/persistence/nosql/maintenance/api/MaintenanceService.java
new file mode 100644
index 000000000..d1b1ebe43
--- /dev/null
+++ 
b/persistence/nosql/persistence/maintenance/api/src/main/java/org/apache/polaris/persistence/nosql/maintenance/api/MaintenanceService.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.persistence.nosql.maintenance.api;
+
+import jakarta.annotation.Nonnull;
+import java.util.List;
+import 
org.apache.polaris.persistence.nosql.realms.api.RealmDefinition.RealmStatus;
+
+public interface MaintenanceService {
+  /**
+   * Generates a maintenance service run-specification containing realms in 
states {@link
+   * RealmStatus#ACTIVE ACTIVE} and {@link RealmStatus#INACTIVE INACTIVE} as 
"to retain" and realms
+   * in state {@link RealmStatus#PURGING PURGING} as "to purge".
+   */
+  @Nonnull
+  MaintenanceRunSpec buildMaintenanceRunSpec();
+
+  /**
+   * Perform maintenance.
+   *
+   * @param maintenanceRunSpec define the mandatory run-specification, see 
{@link
+   *     #buildMaintenanceRunSpec()}
+   * @return information about the maintenance run
+   */
+  @Nonnull
+  MaintenanceRunInformation performMaintenance(@Nonnull MaintenanceRunSpec 
maintenanceRunSpec);
+
+  /**
+   * Retrieve information about recent maintenance runs. The number of 
available elements is
+   * configured via {@link MaintenanceConfig#retainedRuns()}.
+   */
+  @Nonnull
+  List<MaintenanceRunInformation> maintenanceRunLog();
+}
diff --git 
a/persistence/nosql/persistence/maintenance/api/src/main/java/org/apache/polaris/persistence/nosql/maintenance/api/package-info.java
 
b/persistence/nosql/persistence/maintenance/api/src/main/java/org/apache/polaris/persistence/nosql/maintenance/api/package-info.java
new file mode 100644
index 000000000..7d6088e13
--- /dev/null
+++ 
b/persistence/nosql/persistence/maintenance/api/src/main/java/org/apache/polaris/persistence/nosql/maintenance/api/package-info.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Maintenance operations include a bunch of tasks that are regularly executed 
against a backend
+ * database.
+ *
+ * <p>Types of maintenance operations include:
+ *
+ * <ul>
+ *   <li>Purging unreferenced objects and references within a catalog
+ *   <li>Purging whole catalogs that are marked to be purged
+ *   <li>Purging whole realms that are marked to be purged
+ * </ul>
+ *
+ * <h2>Discussion
+ *
+ * <p>Not all databases offer support to perform "prefix key" deletions, which 
are, for example,
+ * necessary to purge a whole realm. Some databases do support "deleting a 
huge number of rows".
+ * Some have another API for prefix-key deletions, for example, Google's 
BigTable {@code
+ * dropRowRange} on the table-admin-client. Relational databases may require 
different
+ * configurations with respect to isolation level to run those maintenance 
operations in a "better"
+ * way. Some databases do not support such "prefix-key deletions" at all, for 
example, Apache
+ * Cassandra or RocksDb or Amazon's DynamoDb.
+ *
+ * <p>{@link org.apache.polaris.persistence.nosql.api.backend.Backend Backend} 
implementations
+ * therefore expose whether they can leverage "prefix-key deletions" when one or 
more realms are to be
+ * purged. If a {@code Backend} does not support "prefix-key deletions", the 
whole repository has to
+ * be scanned.
+ *
+ * <h2>Purging unreferenced data
+ *
+ * <p>The other maintenance operations, like purging a catalog or unreferenced 
objects or references,
+ * use a multi-step approach that works even for large multi-tenant setups:
+ *
+ * <ol>
+ *   <li>Memoize the current timestamp, subtract some amount to account for 
expected wall-clock
+ *       drifts.
+ *   <li>Identify all objects and references that must be retained, memoize 
those in a probabilistic
+ *       data structure (bloom filter). See below.
+ *   <li>Scan the whole database to identify the objects and references that 
were not identified as
+ *       being referenced in the previous step.
+ *   <li>Delete the unreferenced objects and references if, and only if, their 
{@link
+ *       org.apache.polaris.persistence.nosql.api.obj.Obj#createdAtMicros() 
createdAtMicros()}
+ *       timestamp is less than the timestamp memoized in the first step.
+ * </ol>
+ *
+ * <h2>Identifying objects and references
+ *
+ * <p>Implementations of {@link jakarta.enterprise.context.ApplicationScoped 
@ApplicationScoped}
+ * {@link 
org.apache.polaris.persistence.nosql.maintenance.spi.PerRealmRetainedIdentifier
+ * PerRealmRetainedIdentifier} are called to identify the references and 
objects that have to be
+ * retained for a realm.
+ *
+ * <p>Implementations of {@link jakarta.enterprise.context.ApplicationScoped 
@ApplicationScoped}
+ * {@link 
org.apache.polaris.persistence.nosql.maintenance.spi.ObjTypeRetainedIdentifier
+ * ObjTypeRetainedIdentifier} are called for each identified object of the 
requested object type.
+ *
+ * <h2>Realm status
+ *
+ * <p>The maintenance service implementation will check the current {@linkplain
+ * org.apache.polaris.persistence.nosql.realms.api.RealmDefinition#status() 
status} of the realm to
+ * retain and to purge, that the status is valid for being retained (valid: 
{@linkplain
+ * 
org.apache.polaris.persistence.nosql.realms.api.RealmDefinition.RealmStatus#ACTIVE
 ACTIVE} and
+ * {@linkplain 
org.apache.polaris.persistence.nosql.realms.api.RealmDefinition.RealmStatus#INACTIVE
+ * INACTIVE}) and being purged (valid: {@linkplain
+ * 
org.apache.polaris.persistence.nosql.realms.api.RealmDefinition.RealmStatus#PURGING
 PURGING}).
+ * Realms that have been asked to be purged and for which no data has been 
encountered will be
+ * state-transitioned to {@linkplain
+ * 
org.apache.polaris.persistence.nosql.realms.api.RealmDefinition.RealmStatus#PURGED
 PURGED}.
+ *
+ * <h2>System realm {@value 
org.apache.polaris.persistence.nosql.api.Realms#SYSTEM_REALM_ID}
+ *
+ * <p>The system realm is maintained like every other realm.
+ *
+ * <h2>Future export use cases (TBD/TBC)
+ *
+ * <p>These can be useful in a hosted and multi-tenant SaaS environment, when 
an export of the data
+ * for a particular realm is requested.
+ *
+ * <ul>
+ *   <li>Export live/referenced objects, filtered by realm. A possible 
implementation would hook
+ *       into the implementation of {@link
+ *       
org.apache.polaris.persistence.nosql.maintenance.spi.PerRealmRetainedIdentifier
+ *       PerRealmRetainedIdentifier} via a delegate over {@link
+ *       org.apache.polaris.persistence.nosql.api.Persistence Persistence}. 
The actual approach and
+ *       implementation is therefore out of the scope of the maintenance 
service.
+ *   <li>Low-level export, filtered by realm. This one is different from the 
one above, as it would
+ *       export references and all object-<em>parts</em>, in contrast to fully 
materialized objects.
+ *       A possible implementation would hook into the scanning-part of the 
maintenance service
+ *       implementation.
+ * </ul>
+ */
+package org.apache.polaris.persistence.nosql.maintenance.api;
diff --git 
a/persistence/nosql/persistence/maintenance/retain-cel/build.gradle.kts 
b/persistence/nosql/persistence/maintenance/retain-cel/build.gradle.kts
new file mode 100644
index 000000000..f40bd31d3
--- /dev/null
+++ b/persistence/nosql/persistence/maintenance/retain-cel/build.gradle.kts
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+plugins {
+  id("org.kordamp.gradle.jandex")
+  alias(libs.plugins.jmh) // JMH plugin; the benchmark sources live under src/jmh
+  id("polaris-server")
+}
+
+description = "Polaris NoSQL persistence maintenance - reference retain check 
using CEL"
+
+dependencies {
+  implementation(project(":polaris-persistence-nosql-api"))
+  implementation(platform(libs.cel.bom)) // BOM; cel-standalone below is declared without a version
+  implementation("org.projectnessie.cel:cel-standalone")
+  implementation(libs.caffeine) // cache of compiled CEL scripts in CelReferenceContinuePredicate
+
+  compileOnly(libs.jakarta.annotation.api)
+  compileOnly(libs.jakarta.validation.api)
+
+  compileOnly(platform(libs.jackson.bom))
+  compileOnly("com.fasterxml.jackson.core:jackson-annotations")
+
+  testCompileOnly(platform(libs.jackson.bom))
+  testCompileOnly("com.fasterxml.jackson.core:jackson-annotations")
+
+  jmhImplementation(libs.jmh.core)
+  jmhAnnotationProcessor(libs.jmh.generator.annprocess)
+}
diff --git 
a/persistence/nosql/persistence/maintenance/retain-cel/src/jmh/java/org/apache/polaris/maintenance/cel/CelReferenceContinuePredicateBench.java
 
b/persistence/nosql/persistence/maintenance/retain-cel/src/jmh/java/org/apache/polaris/maintenance/cel/CelReferenceContinuePredicateBench.java
new file mode 100644
index 000000000..480130066
--- /dev/null
+++ 
b/persistence/nosql/persistence/maintenance/retain-cel/src/jmh/java/org/apache/polaris/maintenance/cel/CelReferenceContinuePredicateBench.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.maintenance.cel;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+
+import jakarta.annotation.Nonnull;
+import jakarta.annotation.Nullable;
+import java.time.Duration;
+import org.apache.polaris.persistence.nosql.api.obj.BaseCommitObj;
+import org.apache.polaris.persistence.nosql.api.obj.Obj;
+import org.apache.polaris.persistence.nosql.api.obj.ObjType;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+
+@Warmup(iterations = 4, time = 1000, timeUnit = MILLISECONDS)
+@Measurement(iterations = 5, time = 1000, timeUnit = MILLISECONDS)
+@Fork(1)
+@Threads(4)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(NANOSECONDS)
+public class CelReferenceContinuePredicateBench {
+
+  @State(Scope.Benchmark) // NOTE(review): shared by all 4 threads; the predicates' commit counter is unsynchronized
+  public static class BenchmarkParam {
+    BaseCommitObj commitObj; // stub: every overridden accessor throws
+
+    CelReferenceContinuePredicate<BaseCommitObj> predicateConstantCondition;
+    CelReferenceContinuePredicate<BaseCommitObj> predicateOneCondition;
+    CelReferenceContinuePredicate<BaseCommitObj> predicateTwoConditions;
+
+    @Setup
+    public void init() {
+      commitObj =
+          new BaseCommitObj() {
+            @Override
+            public long seq() {
+              throw new UnsupportedOperationException();
+            }
+
+            @Override
+            public long[] tail() {
+              throw new UnsupportedOperationException();
+            }
+
+            @Override
+            public ObjType type() {
+              throw new UnsupportedOperationException();
+            }
+
+            @Override
+            public long id() {
+              throw new UnsupportedOperationException();
+            }
+
+            @Nullable
+            @Override
+            public String versionToken() {
+              throw new UnsupportedOperationException();
+            }
+
+            @Nonnull
+            @Override
+            public Obj withCreatedAtMicros(long createdAt) {
+              throw new UnsupportedOperationException();
+            }
+
+            @Nonnull
+            @Override
+            public Obj withNumParts(int numParts) {
+              throw new UnsupportedOperationException();
+            }
+          };
+      // The age function below ignores its argument, so the stub above is never queried for an age.
+      predicateConstantCondition =
+          new CelReferenceContinuePredicate<>("refName", o -> Duration.ZERO, 
"true");
+      predicateOneCondition =
+          new CelReferenceContinuePredicate<>("refName", o -> Duration.ZERO, 
"ageDays < 100");
+      predicateTwoConditions =
+          new CelReferenceContinuePredicate<>(
+              "refName", o -> Duration.ZERO, "ageDays < 100 || commits < 100");
+    }
+  }
+
+  @Benchmark
+  public boolean constantCondition(BenchmarkParam param) {
+    return param.predicateConstantCondition.test(param.commitObj);
+  }
+
+  @Benchmark
+  public boolean oneCondition(BenchmarkParam param) {
+    return param.predicateOneCondition.test(param.commitObj);
+  }
+
+  @Benchmark
+  public boolean twoConditions(BenchmarkParam param) {
+    return param.predicateTwoConditions.test(param.commitObj);
+  }
+}
diff --git 
a/persistence/nosql/persistence/maintenance/retain-cel/src/main/java/org/apache/polaris/maintenance/cel/CelReferenceContinuePredicate.java
 
b/persistence/nosql/persistence/maintenance/retain-cel/src/main/java/org/apache/polaris/maintenance/cel/CelReferenceContinuePredicate.java
new file mode 100644
index 000000000..cb78536e7
--- /dev/null
+++ 
b/persistence/nosql/persistence/maintenance/retain-cel/src/main/java/org/apache/polaris/maintenance/cel/CelReferenceContinuePredicate.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.maintenance.cel;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.Scheduler;
+import jakarta.annotation.Nonnull;
+import java.time.Duration;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import org.apache.polaris.persistence.nosql.api.Persistence;
+import org.apache.polaris.persistence.nosql.api.obj.BaseCommitObj;
+import org.projectnessie.cel.checker.Decls;
+import org.projectnessie.cel.relocated.com.google.api.expr.v1alpha1.Decl;
+import org.projectnessie.cel.tools.Script;
+import org.projectnessie.cel.tools.ScriptCreateException;
+import org.projectnessie.cel.tools.ScriptException;
+import org.projectnessie.cel.tools.ScriptHost;
+import org.projectnessie.cel.types.jackson.JacksonRegistry;
+
+/**
+ * Provides a CEL script based "retain reference commit object" predicate used 
for {@code
+ * RetainedCollector.refRetain*()} functions.
+ *
+ * <p>The predicate is coded as a <a 
href="https://github.com/google/cel-spec/">CEL</a> script,
+ * using <a href="https://github.com/projectnessie/cel-java/">cel-java</a>.
+ *
+ * <p>Micro benchmarks prove that the CEL scripts execute pretty fast, 
definitely fast enough to
+ * justify the flexibility of having scripts in the configuration.
+ *
+ * <p>The scripts have access to the following declared values:
+ *
+ * <ul>
+ *   <li>{@value #VAR_REF} (string) name of the reference
+ *   <li>{@value #VAR_COMMITS} (64-bit int) number of the currently processed 
commit, starting at
+ *       {@code 1}
+ *   <li>{@value #VAR_AGE_DAYS} (64-bit int) age of currently processed commit 
in days
+ *   <li>{@value #VAR_AGE_HOURS} (64-bit int) age of currently processed 
commit in hours
+ *   <li>{@value #VAR_AGE_MINUTES} (64-bit int) age of currently processed 
commit in minutes
+ * </ul>
+ *
+ * <p>Scripts <em>must</em> return a {@code boolean} yielding whether the 
commit shall be retained.
+ * Note that maintenance-service implementations can keep the first 
not-to-be-retained commit.
+ *
+ * <p>Example scripts
+ *
+ * <ul>
+ *   <li>{@code ageDays < 30 || commits <= 10} retains the reference history 
with at least 10
+ *       commits and commits that are younger than 30 days
+ *   <li>{@code true} retains the whole reference history
+ *   <li>{@code false} retains the most recent commit
+ * </ul>
+ *
+ * <p>A static cache retains up to 100 compiled CEL scripts, each up to 24 
hours after its last use.
+ */
+public class CelReferenceContinuePredicate<O extends BaseCommitObj> implements 
Predicate<O> {
+  private static final ScriptHost SCRIPT_HOST =
+      ScriptHost.newBuilder().registry(JacksonRegistry.newRegistry()).build();
+
+  private static final Cache<String, Script> SCRIPT_CACHE = // keyed by the script source text
+      Caffeine.newBuilder()
+          .expireAfterAccess(Duration.ofHours(24))
+          .maximumSize(100)
+          .scheduler(Scheduler.systemScheduler())
+          .build();
+
+  private static final String VAR_REF = "ref";
+  private static final String VAR_COMMITS = "commits";
+  private static final String VAR_AGE_MINUTES = "ageMinutes";
+  private static final String VAR_AGE_HOURS = "ageHours";
+  private static final String VAR_AGE_DAYS = "ageDays";
+
+  private static final List<Decl> DECLS =
+      List.of(
+          Decls.newVar(VAR_REF, Decls.String),
+          // Decls.Int == 64 bit (aka Java long)
+          Decls.newVar(VAR_COMMITS, Decls.Int),
+          Decls.newVar(VAR_AGE_MINUTES, Decls.Int),
+          Decls.newVar(VAR_AGE_HOURS, Decls.Int),
+          Decls.newVar(VAR_AGE_DAYS, Decls.Int));
+
+  static Script createScript(String source) { // compiles the source; does not consult the cache
+    try {
+      return SCRIPT_HOST.buildScript(source).withDeclarations(DECLS).build();
+    } catch (ScriptCreateException e) {
+      throw new RuntimeException(e); // unchecked rethrow; the compile error stays available as the cause
+    }
+  }
+
+  static Script getScript(String source) {
+    return SCRIPT_CACHE.get(source, 
CelReferenceContinuePredicate::createScript);
+  }
+
+  private final String refName;
+  private final Function<O, Duration> objAge;
+  private final Script script;
+  private long numCommit; // number of test() invocations so far; plain field, not synchronized
+
+  public CelReferenceContinuePredicate(
+      @Nonnull String refName, @Nonnull Persistence persistence, @Nonnull 
String script) {
+    this(refName, persistence::objAge, script);
+  }
+
+  public CelReferenceContinuePredicate(
+      @Nonnull String refName, @Nonnull Function<O, Duration> objAge, @Nonnull 
String script) {
+    this.refName = refName;
+    this.objAge = objAge;
+    this.script = getScript(script);
+  }
+
+  @Override
+  public boolean test(O o) {
+    var age = objAge.apply(o);
+    var args =
+        Map.<String, Object>of(
+            VAR_REF, refName,
+            VAR_COMMITS, ++numCommit, // pre-increment: the first tested commit is reported as 1
+            VAR_AGE_MINUTES, age.toMinutes(),
+            VAR_AGE_HOURS, age.toHours(),
+            VAR_AGE_DAYS, age.toDays());
+    try {
+      return script.execute(Boolean.class, args);
+    } catch (ScriptException e) {
+      throw new RuntimeException(e); // unchecked rethrow of script evaluation failures
+    }
+  }
+}
diff --git 
a/persistence/nosql/persistence/maintenance/retain-cel/src/test/java/org/apache/polaris/maintenance/cel/TestCelReferenceContinuePredicate.java
 
b/persistence/nosql/persistence/maintenance/retain-cel/src/test/java/org/apache/polaris/maintenance/cel/TestCelReferenceContinuePredicate.java
new file mode 100644
index 000000000..3e3086f4d
--- /dev/null
+++ 
b/persistence/nosql/persistence/maintenance/retain-cel/src/test/java/org/apache/polaris/maintenance/cel/TestCelReferenceContinuePredicate.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.maintenance.cel;
+
+import static org.junit.jupiter.params.provider.Arguments.arguments;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.time.Duration;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import java.util.stream.Stream;
+import org.apache.polaris.persistence.nosql.api.obj.BaseCommitObj;
+import org.assertj.core.api.SoftAssertions;
+import org.assertj.core.api.junit.jupiter.InjectSoftAssertions;
+import org.assertj.core.api.junit.jupiter.SoftAssertionsExtension;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.projectnessie.cel.tools.ScriptCreateException;
+
+@ExtendWith(SoftAssertionsExtension.class)
+public class TestCelReferenceContinuePredicate {
+  @InjectSoftAssertions protected SoftAssertions soft;
+
+  @Test
+  public void invalidScript() {
+    soft.assertThatRuntimeException()
+        .isThrownBy(() -> 
CelReferenceContinuePredicate.createScript("invalidScript"))
+        .withCauseInstanceOf(ScriptCreateException.class);
+    soft.assertThatRuntimeException()
+        .isThrownBy(() -> 
CelReferenceContinuePredicate.getScript("invalidScript"))
+        .withCauseInstanceOf(ScriptCreateException.class);
+  }
+
+  @Test
+  public void cacheWorks() {
+    var s1 = CelReferenceContinuePredicate.getScript("true");
+    var s2 = CelReferenceContinuePredicate.getScript("true");
+    soft.assertThat(s1).isSameAs(s2);
+    
soft.assertThat(s1).isNotSameAs(CelReferenceContinuePredicate.getScript("false"));
+  }
+
+  @ParameterizedTest
+  @MethodSource
+  public void scripts(String script, boolean[] expected, 
Iterable<BaseCommitObj> commits) {
+    // The age function reads the age back out of createdAtMicros(), matching yieldAge() below.
+    var predicate =
+        new CelReferenceContinuePredicate<>(
+            "testRefName",
+            (Function<BaseCommitObj, Duration>) // presumably needed to pick the Function-based constructor
+                o -> 
Duration.ofMillis(TimeUnit.MICROSECONDS.toMillis(o.createdAtMicros())),
+            script);
+    var iter = commits.iterator();
+    for (int i = 0; i < expected.length; i++) {
+      var exp = expected[i];
+      soft.assertThat(predicate.test(iter.next()))
+          .describedAs("test at commit %s", i + 1)
+          .isEqualTo(exp);
+    }
+  }
+
+  static Stream<Arguments> scripts() {
+    var dummy = mock(BaseCommitObj.class); // unstubbed mock; presumably createdAtMicros() yields 0 (age zero)
+    return Stream.of(
+        arguments("false", new boolean[] {false, false, false}, List.of(dummy, 
dummy, dummy)),
+        arguments(
+            "ref == 'testRefName'", new boolean[] {true, true, true}, 
List.of(dummy, dummy, dummy)),
+        arguments("commits < 3", new boolean[] {true, true, false}, 
List.of(dummy, dummy, dummy)),
+        arguments(
+            "ageMinutes < 100",
+            new boolean[] {true, true, true, false},
+            List.of(
+                yieldAge(Duration.ofMinutes(0)),
+                yieldAge(Duration.ofMinutes(1)),
+                yieldAge(Duration.ofMinutes(99)),
+                yieldAge(Duration.ofMinutes(100)))),
+        arguments(
+            "ageHours < 100",
+            new boolean[] {true, true, true, false},
+            List.of(
+                yieldAge(Duration.ofHours(0)),
+                yieldAge(Duration.ofHours(1)),
+                yieldAge(Duration.ofHours(99)),
+                yieldAge(Duration.ofHours(100)))),
+        arguments(
+            "ageDays < 100",
+            new boolean[] {true, true, true, false},
+            List.of(
+                yieldAge(Duration.ofDays(0)),
+                yieldAge(Duration.ofDays(1)),
+                yieldAge(Duration.ofDays(99)),
+                yieldAge(Duration.ofDays(100)))));
+  }
+
+  static BaseCommitObj yieldAge(Duration duration) { // mock that carries the wanted age in createdAtMicros()
+    var m = mock(BaseCommitObj.class);
+    
when(m.createdAtMicros()).thenReturn(TimeUnit.MILLISECONDS.toMicros(duration.toMillis()));
+    return m;
+  }
+}
diff --git a/persistence/nosql/persistence/maintenance/spi/build.gradle.kts 
b/persistence/nosql/persistence/maintenance/spi/build.gradle.kts
new file mode 100644
index 000000000..9e493b58e
--- /dev/null
+++ b/persistence/nosql/persistence/maintenance/spi/build.gradle.kts
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+plugins {
+  id("org.kordamp.gradle.jandex")
+  id("polaris-server")
+}
+
+description =
+  "Polaris NoSQL persistence maintenance - SPI to implement identifiers of 
references and objects to retain"
+
+dependencies {
+  implementation(project(":polaris-persistence-nosql-api"))
+  // NOTE(review): RetainedCollector imports CelReferenceContinuePredicate - confirm compileOnly suffices at runtime
+  compileOnly(project(":polaris-persistence-nosql-maintenance-cel"))
+
+  compileOnly(libs.jakarta.annotation.api)
+  compileOnly(libs.jakarta.validation.api)
+  compileOnly(libs.jakarta.enterprise.cdi.api)
+
+  compileOnly(platform(libs.jackson.bom))
+  compileOnly("com.fasterxml.jackson.core:jackson-annotations")
+}
diff --git 
a/persistence/nosql/persistence/maintenance/spi/src/main/java/org/apache/polaris/persistence/nosql/maintenance/spi/CountDownPredicate.java
 
b/persistence/nosql/persistence/maintenance/spi/src/main/java/org/apache/polaris/persistence/nosql/maintenance/spi/CountDownPredicate.java
new file mode 100644
index 000000000..f32a792ac
--- /dev/null
+++ 
b/persistence/nosql/persistence/maintenance/spi/src/main/java/org/apache/polaris/persistence/nosql/maintenance/spi/CountDownPredicate.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.persistence.nosql.maintenance.spi;
+
+import java.util.function.Predicate;
+
+/**
+ * Predicate that yields {@code true} for the first {@code remaining} {@link #test(Object)}
+ * invocations and {@code false} for every later one. Stateful and not synchronized.
+ */
+public final class CountDownPredicate<T> implements Predicate<T> {
+  private int remaining; // invocations that may still yield true
+
+  public CountDownPredicate(int remaining) {
+    if (remaining <= 0) {
+      throw new IllegalArgumentException("remaining must be > 0");
+    }
+    this.remaining = remaining;
+  }
+
+  @Override
+  public boolean test(T t) {
+    if (remaining <= 0) { // budget used up; the argument is never inspected
+      return false;
+    }
+    remaining--;
+    return true;
+  }
+}
diff --git 
a/persistence/nosql/persistence/maintenance/spi/src/main/java/org/apache/polaris/persistence/nosql/maintenance/spi/ObjTypeRetainedIdentifier.java
 
b/persistence/nosql/persistence/maintenance/spi/src/main/java/org/apache/polaris/persistence/nosql/maintenance/spi/ObjTypeRetainedIdentifier.java
new file mode 100644
index 000000000..3cc6484fc
--- /dev/null
+++ 
b/persistence/nosql/persistence/maintenance/spi/src/main/java/org/apache/polaris/persistence/nosql/maintenance/spi/ObjTypeRetainedIdentifier.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.persistence.nosql.maintenance.spi;
+
+import jakarta.annotation.Nonnull;
+import org.apache.polaris.persistence.nosql.api.obj.Obj;
+import org.apache.polaris.persistence.nosql.api.obj.ObjRef;
+import org.apache.polaris.persistence.nosql.api.obj.ObjType;
+import org.apache.polaris.persistence.nosql.api.ref.Reference;
+
+/**
+ * Implementations of this interface are called for objects that have been 
identified to be
+ * <em>retained</em> either by a {@link PerRealmRetainedIdentifier} or another 
{@link
+ * ObjTypeRetainedIdentifier}.
+ *
+ * <p>Polaris extensions and plugins that persist non-standard {@linkplain 
Reference references} or
+ * {@linkplain Obj objects} must provide an implementation of this interface 
to ensure that the
+ * required references and objects are not purged.
+ *
+ * <p>Implementations must be annotated as {@link
+ * jakarta.enterprise.context.ApplicationScoped @ApplicationScoped} for CDI 
usage.
+ */
+public interface ObjTypeRetainedIdentifier {
+  /** Human-readable name. */
+  String name();
+
+  /** The object type that the implementation handles. */
+  @Nonnull
+  ObjType handledObjType();
+
+  /**
+   * Called for every scanned object with the ID {@code objRef} having the 
object type yielded by
+   * {@link #handledObjType()}.
+   *
+   * <p>Any exception thrown from this function aborts the whole maintenance 
run. Exceptions thrown
+   * from functionality called by the implementation must be properly handled.
+   *
+   * @param collector instance that collects the objects and references to 
retain
+   * @param objRef ID of the object that has been scanned
+   */
+  void identifyRelatedObj(@Nonnull RetainedCollector collector, @Nonnull 
ObjRef objRef);
+}
diff --git 
a/persistence/nosql/persistence/maintenance/spi/src/main/java/org/apache/polaris/persistence/nosql/maintenance/spi/PerRealmRetainedIdentifier.java
 
b/persistence/nosql/persistence/maintenance/spi/src/main/java/org/apache/polaris/persistence/nosql/maintenance/spi/PerRealmRetainedIdentifier.java
new file mode 100644
index 000000000..7b56564d8
--- /dev/null
+++ 
b/persistence/nosql/persistence/maintenance/spi/src/main/java/org/apache/polaris/persistence/nosql/maintenance/spi/PerRealmRetainedIdentifier.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.persistence.nosql.maintenance.spi;
+
+import jakarta.annotation.Nonnull;
+import org.apache.polaris.persistence.nosql.api.obj.Obj;
+import org.apache.polaris.persistence.nosql.api.ref.Reference;
+
+/**
+ * Implementations of this interface are called by the maintenance service for 
every realm to
+ * retain.
+ *
+ * <p>Implementations must be annotated as {@link
+ * jakarta.enterprise.context.ApplicationScoped @ApplicationScoped} for 
CDI usage.
+ */
+public interface PerRealmRetainedIdentifier {
+  /** Human-readable name. */
+  String name();
+
+  /**
+   * Called to identify "live" references and objects for a realm.
+   *
+   * <p>The given {@linkplain RetainedCollector collector} must be invoked for 
every {@linkplain
+   * Reference reference} and {@linkplain Obj object} to retain. The 
maintenance service is allowed
+   * to purge references and objects that were not passed to the {@linkplain 
RetainedCollector
+   * collector's} {@code retain*()} functions.
+   *
+   * <p>Any exception thrown from this function aborts the whole maintenance 
run. Exceptions thrown
+   * from functionality called by the implementation must be properly handled.
+   *
+   * <p>The {@code boolean} return value is meant as a safety net to avoid accidentally
+   * purging a realm.
+   *
+   * @param collector consumer of "live" references and objects
+   * @return {@code true} if this function was able to handle the realm, 
{@code false} if the
+   *     implementation did not process the realm or wants to defer the 
decision to another
+   *     implementation.
+   */
+  boolean identifyRetained(@Nonnull RetainedCollector collector);
+}
diff --git 
a/persistence/nosql/persistence/maintenance/spi/src/main/java/org/apache/polaris/persistence/nosql/maintenance/spi/RetainedCollector.java
 
b/persistence/nosql/persistence/maintenance/spi/src/main/java/org/apache/polaris/persistence/nosql/maintenance/spi/RetainedCollector.java
new file mode 100644
index 000000000..19eaabcf8
--- /dev/null
+++ 
b/persistence/nosql/persistence/maintenance/spi/src/main/java/org/apache/polaris/persistence/nosql/maintenance/spi/RetainedCollector.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.persistence.nosql.maintenance.spi;
+
+import static org.apache.polaris.persistence.nosql.api.Realms.SYSTEM_REALM_ID;
+import static 
org.apache.polaris.persistence.nosql.api.obj.ObjRef.OBJ_REF_SERIALIZER;
+
+import jakarta.annotation.Nonnull;
+import java.util.OptionalLong;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import org.apache.polaris.maintenance.cel.CelReferenceContinuePredicate;
+import org.apache.polaris.persistence.nosql.api.Persistence;
+import org.apache.polaris.persistence.nosql.api.commit.Commits;
+import org.apache.polaris.persistence.nosql.api.index.IndexContainer;
+import org.apache.polaris.persistence.nosql.api.index.IndexStripe;
+import org.apache.polaris.persistence.nosql.api.obj.BaseCommitObj;
+import org.apache.polaris.persistence.nosql.api.obj.ObjRef;
+import org.apache.polaris.persistence.nosql.api.ref.Reference;
+
+public interface RetainedCollector {
+  /**
+   * ID of the realm being processed.
+   *
+   * @return the realm ID, declared as never {@code null}
+   */
+  @Nonnull
+  String realm();
+
+  /**
+   * Whether the realm being processed is the system realm.
+   *
+   * @return {@code true} if {@link #realm()} equals {@code Realms.SYSTEM_REALM_ID}
+   */
+  default boolean isSystemRealm() {
+    return SYSTEM_REALM_ID.equals(realm());
+  }
+
+  /**
+   * {@link Persistence Persistence} configured for the current {@linkplain #realm() realm}.
+   *
+   * <p>References and objects that are read or written via this {@link Persistence} are
+   * automatically retained.
+   *
+   * <p>The returned {@link Persistence Persistence} bypasses the cache to avoid polluting the
+   * production cache with accesses from the maintenance service.
+   *
+   * <p>If the reference name(s) and {@linkplain ObjRef object IDs} are known in advance, it is more
+   * efficient to just call the {@link #retainReference(String)}/{@link #retainObject(ObjRef)}
+   * functions, because those will not access the backend database.
+   *
+   * @return the realm-specific {@link Persistence}, declared as never {@code null}
+   */
+  @Nonnull
+  Persistence realmPersistence();
+
+  /**
+   * Instruct the maintenance service to retain the reference with the given name.
+   *
+   * <p>References that are fetched via {@link #realmPersistence()} are automatically marked to be
+   * retained.
+   *
+   * @param name name of the reference to retain
+   */
+  void retainReference(@Nonnull String name);
+
+  /**
+   * Instruct the maintenance service to retain the object with the given object ID.
+   *
+   * <p>Objects that are fetched via {@link #realmPersistence()} are automatically marked to be
+   * retained.
+   *
+   * @param objRef ID of the object to retain
+   */
+  void retainObject(@Nonnull ObjRef objRef);
+
+  /**
+   * Marks the segment object of every stripe of the given index container as to-be-retained, by
+   * passing each stripe's {@code segment()} to {@link #retainObject(ObjRef)}.
+   *
+   * @param indexContainer index container whose stripe segments are retained
+   * @param <V> index element value type
+   */
+  default <V> void indexRetain(IndexContainer<V> indexContainer) {
+    
indexContainer.stripes().stream().map(IndexStripe::segment).forEach(this::retainObject);
+  }
+
+  /**
+   * Same as {@link #refRetain(String, Class, Predicate, Consumer, ProgressListener)}, without a
+   * {@link ProgressListener}: a listener with only the no-op default callbacks is passed.
+   */
+  default <O extends BaseCommitObj> void refRetain(
+      String ref, Class<O> clazz, Predicate<O> continuePredicate, Consumer<O> 
retainedObjConsumer) {
+    refRetain(ref, clazz, continuePredicate, retainedObjConsumer, new 
ProgressListener<>() {});
+  }
+
+  /**
+   * Callbacks reporting the progress of the {@code refRetain*()} functions. All callbacks default
+   * to no-ops.
+   *
+   * @param <O> type of the commit objects being processed
+   */
+  interface ProgressListener<O extends BaseCommitObj> {
+    /**
+     * Called by {@code refRetain} for each commit object, after that object has been passed to the
+     * retained-object consumer.
+     *
+     * @param obj the commit object that was just processed
+     * @param commit running count of processed commit objects, starting at 1
+     */
+    default void onCommit(O obj, long commit) {}
+
+    /**
+     * Called by {@code refRetainIndexToSingleObj} for each index entry, after the entry's value
+     * has been marked as to-be-retained.
+     *
+     * @param inCommit running count of index entries of the current commit object, starting at 1
+     * @param total running count of index entries over all commit objects processed so far
+     */
+    default void onIndexEntry(long inCommit, long total) {}
+
+    /** Called by {@code refRetain} when the continue-predicate stopped the commit-log walk. */
+    default void cut() {}
+
+    /** Called by {@code refRetain} once the commit-log walk ended, also after {@link #cut()}. */
+    default void finished() {}
+  }
+
+  /**
+   * Functionality to identify the objects in a {@link Reference} to retain by {@linkplain
+   * Commits#commitLog(String, OptionalLong, Class) walking} the commit log.
+   *
+   * <p>Every commit object is passed to {@code retainedObjConsumer} before {@code
+   * continuePredicate} is evaluated for it. The object for which the predicate returns {@code
+   * false} is therefore still passed to the consumer; the walk stops after that object.
+   *
+   * <p>For flexibility, consider using {@link CelReferenceContinuePredicate}.
+   *
+   * @param ref reference name, automatically marked as to-be-retained
+   * @param clazz type of the {@linkplain Reference#pointer() referenced objects}
+   * @param continuePredicate predicate to test whether to continue processing the reference
+   * @param retainedObjConsumer called for every retained object
+   * @param progressListener notified via {@code onCommit} after each processed commit object, via
+   *     {@code cut} when the predicate stops the walk, and via {@code finished} when the walk ends
+   * @param <O> type of the {@linkplain Reference#pointer() referenced objects}
+   */
+  default <O extends BaseCommitObj> void refRetain(
+      String ref,
+      Class<O> clazz,
+      Predicate<O> continuePredicate,
+      Consumer<O> retainedObjConsumer,
+      ProgressListener<O> progressListener) {
+    var persistence = realmPersistence();
+    var commits = persistence.commits();
+    var numCommits = 0L;
+    for (var iter = commits.commitLog(ref, OptionalLong.empty(), clazz); 
iter.hasNext(); ) {
+      var obj = iter.next();
+      retainedObjConsumer.accept(obj);
+
+      progressListener.onCommit(obj, ++numCommits);
+
+      // WARNING! The "stop" predicate must happen AFTER all referenced 
objects have been retained,
+      // doing the test before the loop over the above index would lead to the 
inconsistent state!
+      if (!continuePredicate.test(obj)) {
+        progressListener.cut();
+        break;
+      }
+    }
+    progressListener.finished();
+  }
+
+  /**
+   * Same as {@link #refRetainIndexToSingleObj(String, Class, Predicate, Function, ProgressListener,
+   * Consumer)}, but without a {@link ProgressListener} and without a consumer for the indexed
+   * {@link ObjRef}s.
+   *
+   * <p>The given {@code retainedObjConsumer} is invoked from the listener's {@code onCommit}
+   * callback, i.e. once for every processed commit object, after the entries of that object's
+   * index have been marked as to-be-retained.
+   */
+  default <O extends BaseCommitObj> void refRetainIndexToSingleObj(
+      String ref,
+      Class<O> clazz,
+      Predicate<O> continuePredicate,
+      Function<O, IndexContainer<ObjRef>> indexToObjIdFromRetainedObj,
+      Consumer<O> retainedObjConsumer) {
+    refRetainIndexToSingleObj(
+        ref,
+        clazz,
+        continuePredicate,
+        indexToObjIdFromRetainedObj,
+        new ProgressListener<>() {
+          @Override
+          public void onCommit(O obj, long commit) {
+            retainedObjConsumer.accept(obj);
+          }
+        },
+        x -> {});
+  }
+
+  /**
+   * Same as {@link #refRetainIndexToSingleObj(String, Class, Predicate, Function, Consumer)}, with
+   * a no-op consumer for the processed commit objects.
+   */
+  default <O extends BaseCommitObj> void refRetainIndexToSingleObj(
+      String ref,
+      Class<O> clazz,
+      Predicate<O> continuePredicate,
+      Function<O, IndexContainer<ObjRef>> indexToObjIdFromRetainedObj) {
+    refRetainIndexToSingleObj(ref, clazz, continuePredicate, 
indexToObjIdFromRetainedObj, x -> {});
+  }
+
+  /**
+   * Similar to {@link #refRetain(String, Class, Predicate, Consumer)}, with convenience to iterate
+   * over an {@link IndexContainer} having {@link ObjRef} index-element values to mark those as
+   * to-be-retained.
+   *
+   * <p>For flexibility, consider using {@link CelReferenceContinuePredicate}.
+   *
+   * @param ref reference name
+   * @param clazz type of the {@linkplain Reference#pointer() referenced objects}
+   * @param continuePredicate predicate to test whether to continue processing the reference
+   * @param indexToObjIdFromRetainedObj function to extract the {@link IndexContainer} from objects
+   * @param progressListener progress callbacks; in addition to the callbacks issued by {@code
+   *     refRetain}, {@code onIndexEntry} is invoked for every index entry with the entry count of
+   *     the current commit object and the running total over all commit objects
+   * @param indexedObjRefConsumer called with every {@link ObjRef} index-element value, after it
+   *     has been marked as to-be-retained
+   * @param <O> type of the {@linkplain Reference#pointer() referenced objects}
+   */
+  default <O extends BaseCommitObj> void refRetainIndexToSingleObj(
+      String ref,
+      Class<O> clazz,
+      Predicate<O> continuePredicate,
+      Function<O, IndexContainer<ObjRef>> indexToObjIdFromRetainedObj,
+      ProgressListener<O> progressListener,
+      Consumer<ObjRef> indexedObjRefConsumer) {
+    var total = new AtomicLong();
+    refRetain(
+        ref,
+        clazz,
+        continuePredicate,
+        obj -> {
+          var elem = 0L;
+          var t = total.get();
+
+          // TODO we can save a lot of time when there is a (long) history to 
retain to skip
+          //  inspecting already seen index segments (as a performance 
optimization.), but that
+          //  requires some changes to the index APIs.
+
+          for (var entry :
+              indexToObjIdFromRetainedObj
+                  .apply(obj)
+                  .indexForRead(realmPersistence(), OBJ_REF_SERIALIZER)) {
+            ObjRef indexedObjRef = entry.getValue();
+            retainObject(indexedObjRef);
+            // ^ is for persistence.fetch(principalEntry.getValue(), 
PrincipalObj.class);
+            ++elem;
+            progressListener.onIndexEntry(elem, t + elem);
+
+            indexedObjRefConsumer.accept(indexedObjRef);
+          }
+          total.addAndGet(elem);
+        },
+        progressListener);
+  }
+
+  /*
+
+     Consumer<O> retainedObjConsumer,
+  */
+}
diff --git 
a/persistence/nosql/persistence/maintenance/spi/src/main/java/org/apache/polaris/persistence/nosql/maintenance/spi/package-info.java
 
b/persistence/nosql/persistence/maintenance/spi/src/main/java/org/apache/polaris/persistence/nosql/maintenance/spi/package-info.java
new file mode 100644
index 000000000..d7f6c4af5
--- /dev/null
+++ 
b/persistence/nosql/persistence/maintenance/spi/src/main/java/org/apache/polaris/persistence/nosql/maintenance/spi/package-info.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Interfaces needed to provide implementations to identify "live" references and objects for
+ * maintenance.
+ */
+package org.apache.polaris.persistence.nosql.maintenance.spi;

Reply via email to