This is an automated email from the ASF dual-hosted git repository.

snazy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/polaris.git


The following commit(s) were added to refs/heads/main by this push:
     new df8442296 NoSQL: Add maintenance implementation (#3077)
df8442296 is described below

commit df84422960d2ee61231c6870bcc3d805db3305a1
Author: Robert Stupp <[email protected]>
AuthorDate: Thu Nov 20 16:10:35 2025 +0100

    NoSQL: Add maintenance implementation (#3077)
---
 bom/build.gradle.kts                               |   1 +
 gradle/projects.main.properties                    |   1 +
 .../persistence/maintenance/impl/build.gradle.kts  |  83 +++
 .../impl/AbstractScanItemStatsCollector.java       | 113 ++++
 .../nosql/maintenance/impl/AllRetained.java        | 105 ++++
 .../nosql/maintenance/impl/MaintenanceRunObj.java  |  67 +++
 .../nosql/maintenance/impl/MaintenanceRunsObj.java |  65 +++
 .../maintenance/impl/MaintenanceServiceImpl.java   | 617 +++++++++++++++++++++
 .../MaintenanceServiceRealmRetainedIdentifier.java |  57 ++
 .../nosql/maintenance/impl/RateLimit.java          |  53 ++
 .../maintenance/impl/RetainedCollectorImpl.java    | 363 ++++++++++++
 .../nosql/maintenance/impl/ScanHandler.java        | 201 +++++++
 .../nosql/maintenance/impl/ScanItemCallback.java   |  26 +
 .../nosql/maintenance/impl/ScanItemOutcome.java    |  34 ++
 .../nosql/maintenance/impl/package-info.java       |  28 +
 .../impl/src/main/resources/META-INF/beans.xml     |  24 +
 ...pache.polaris.persistence.nosql.api.obj.ObjType |  21 +
 .../nosql/maintenance/impl/TestMaintenance.java    | 393 +++++++++++++
 .../impl/src/test/resources/weld.properties        |  21 +
 .../impl/MaintenanceConfigurationProducer.java     |  33 ++
 .../maintenance/impl/MutableMaintenanceConfig.java | 102 ++++
 .../persistence/nosql/maintenance/impl/ObjOne.java |  53 ++
 .../persistence/nosql/maintenance/impl/ObjTwo.java |  53 ++
 .../nosql/maintenance/impl/ObjTypeIdentOne.java    |  51 ++
 .../nosql/maintenance/impl/ObjTypeIdentTwo.java    |  50 ++
 .../nosql/maintenance/impl/RealmIdentOne.java      |  40 ++
 .../nosql/maintenance/impl/RealmIdentTwo.java      |  40 ++
 .../nosql/maintenance/impl/package-info.java       |  20 +
 .../src/testFixtures/resources/META-INF/beans.xml  |  24 +
 ...pache.polaris.persistence.nosql.api.obj.ObjType |  21 +
 30 files changed, 2760 insertions(+)

diff --git a/bom/build.gradle.kts b/bom/build.gradle.kts
index ae4ff4378..83faf336b 100644
--- a/bom/build.gradle.kts
+++ b/bom/build.gradle.kts
@@ -67,6 +67,7 @@ dependencies {
     api(project(":polaris-persistence-nosql-mongodb"))
 
     api(project(":polaris-persistence-nosql-maintenance-api"))
+    api(project(":polaris-persistence-nosql-maintenance-impl"))
     api(project(":polaris-persistence-nosql-maintenance-cel"))
     api(project(":polaris-persistence-nosql-maintenance-spi"))
 
diff --git a/gradle/projects.main.properties b/gradle/projects.main.properties
index 244e647cd..d6ecdb830 100644
--- a/gradle/projects.main.properties
+++ b/gradle/projects.main.properties
@@ -80,6 +80,7 @@ 
polaris-persistence-nosql-testextension=persistence/nosql/persistence/testextens
 polaris-persistence-nosql-varint=persistence/nosql/persistence/varint
 # persistence / maintenance
 
polaris-persistence-nosql-maintenance-api=persistence/nosql/persistence/maintenance/api
+polaris-persistence-nosql-maintenance-impl=persistence/nosql/persistence/maintenance/impl
 
polaris-persistence-nosql-maintenance-cel=persistence/nosql/persistence/maintenance/retain-cel
 
polaris-persistence-nosql-maintenance-spi=persistence/nosql/persistence/maintenance/spi
 # persistence / database specific implementations
diff --git a/persistence/nosql/persistence/maintenance/impl/build.gradle.kts 
b/persistence/nosql/persistence/maintenance/impl/build.gradle.kts
new file mode 100644
index 000000000..058a0483d
--- /dev/null
+++ b/persistence/nosql/persistence/maintenance/impl/build.gradle.kts
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+plugins {
+  id("org.kordamp.gradle.jandex")
+  id("polaris-server")
+}
+
+description = "Polaris NoSQL persistence maintenance - service implementation"
+
+dependencies {
+  implementation(project(":polaris-persistence-nosql-api"))
+  implementation(project(":polaris-persistence-nosql-maintenance-api"))
+  implementation(project(":polaris-persistence-nosql-maintenance-spi"))
+  implementation(project(":polaris-persistence-nosql-realms-api"))
+  implementation(project(":polaris-idgen-api"))
+  runtimeOnly(project(":polaris-persistence-nosql-realms-impl"))
+  runtimeOnly(project(":polaris-persistence-nosql-realms-store-nosql"))
+
+  implementation(libs.guava)
+  implementation(libs.slf4j.api)
+
+  compileOnly(platform(libs.jackson.bom))
+  compileOnly("com.fasterxml.jackson.core:jackson-annotations")
+  compileOnly("com.fasterxml.jackson.core:jackson-databind")
+
+  compileOnly(libs.jakarta.annotation.api)
+  compileOnly(libs.jakarta.validation.api)
+  compileOnly(libs.jakarta.inject.api)
+  compileOnly(libs.jakarta.enterprise.cdi.api)
+
+  compileOnly(project(":polaris-immutables"))
+  annotationProcessor(project(":polaris-immutables", configuration = 
"processor"))
+
+  testFixturesApi(project(":polaris-persistence-nosql-api"))
+  testFixturesApi(project(":polaris-persistence-nosql-maintenance-api"))
+  testFixturesApi(project(":polaris-persistence-nosql-maintenance-spi"))
+  testFixturesApi(project(":polaris-persistence-nosql-testextension"))
+
+  testFixturesCompileOnly(project(":polaris-immutables"))
+  testFixturesAnnotationProcessor(project(":polaris-immutables", configuration 
= "processor"))
+
+  testFixturesCompileOnly(platform(libs.jackson.bom))
+  testFixturesCompileOnly("com.fasterxml.jackson.core:jackson-annotations")
+  testFixturesCompileOnly("com.fasterxml.jackson.core:jackson-databind")
+
+  testFixturesImplementation(libs.jakarta.annotation.api)
+  testFixturesImplementation(libs.jakarta.validation.api)
+  testFixturesCompileOnly(libs.jakarta.enterprise.cdi.api)
+
+  testCompileOnly(platform(libs.jackson.bom))
+  testCompileOnly("com.fasterxml.jackson.core:jackson-annotations")
+  testCompileOnly("com.fasterxml.jackson.core:jackson-databind")
+
+  testRuntimeOnly(libs.logback.classic)
+
+  testImplementation(project(":polaris-idgen-mocks"))
+  testRuntimeOnly(testFixtures(project(":polaris-persistence-nosql-cdi-weld")))
+  testImplementation(libs.weld.se.core)
+  testImplementation(libs.weld.junit5)
+  testRuntimeOnly(libs.smallrye.jandex)
+
+  testRuntimeOnly(project(":polaris-persistence-nosql-realms-impl"))
+  testRuntimeOnly(project(":polaris-persistence-nosql-realms-store-nosql"))
+  testRuntimeOnly(project(":polaris-persistence-nosql-inmemory"))
+  
testImplementation(testFixtures(project(":polaris-persistence-nosql-inmemory")))
+}
diff --git 
a/persistence/nosql/persistence/maintenance/impl/src/main/java/org/apache/polaris/persistence/nosql/maintenance/impl/AbstractScanItemStatsCollector.java
 
b/persistence/nosql/persistence/maintenance/impl/src/main/java/org/apache/polaris/persistence/nosql/maintenance/impl/AbstractScanItemStatsCollector.java
new file mode 100644
index 000000000..5fde8273b
--- /dev/null
+++ 
b/persistence/nosql/persistence/maintenance/impl/src/main/java/org/apache/polaris/persistence/nosql/maintenance/impl/AbstractScanItemStatsCollector.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.persistence.nosql.maintenance.impl;
+
+import jakarta.annotation.Nonnull;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.polaris.persistence.nosql.api.obj.ObjRef;
+import 
org.apache.polaris.persistence.nosql.maintenance.api.MaintenanceRunInformation.MaintenanceStats;
+
+abstract class AbstractScanItemStatsCollector<I> implements 
ScanItemCallback<I> {
+  final StatsHolder stats = new StatsHolder();
+
+  /** Collect maintenance-run stats for objects per realm. */
+  static final class ScanRefStatsCollector extends 
AbstractScanItemStatsCollector<String> {
+    final Map<String, StatsHolder> perRealm = new HashMap<>();
+
+    /** Handles the maintenance-run outcome for a reference in a realm. */
+    @Override
+    public void itemOutcome(
+        @Nonnull String realm, @Nonnull String ref, @Nonnull ScanItemOutcome 
outcome) {
+      stats.add(outcome);
+      perRealm.computeIfAbsent(realm, realmId -> new 
StatsHolder()).add(outcome);
+    }
+
+    /** Retrieve maintenance-run reference stats per realm. */
+    Map<String, ? extends MaintenanceStats> toRealmObjTypeStatsMap() {
+      return perRealm.entrySet().stream()
+          .collect(Collectors.toMap(Map.Entry::getKey, r -> 
r.getValue().toMaintenanceStats()));
+    }
+  }
+
+  /**
+   * Collect maintenance-run stats for objects per realm and {@linkplain
+   * org.apache.polaris.persistence.nosql.api.obj.ObjType object type}.
+   */
+  static final class ScanObjStatsCollector extends 
AbstractScanItemStatsCollector<ObjRef> {
+    final Map<String, Map<String, StatsHolder>> perRealmAndObjType = new 
HashMap<>();
+
+    /** Handles the maintenance-run outcome for an object in a realm. */
+    @Override
+    public void itemOutcome(
+        @Nonnull String realm, @Nonnull ObjRef id, @Nonnull ScanItemOutcome 
outcome) {
+      stats.add(outcome);
+      perRealmAndObjType
+          .computeIfAbsent(realm, realmId -> new HashMap<>())
+          .computeIfAbsent(id.type(), objType -> new StatsHolder())
+          .add(outcome);
+    }
+
+    /**
+     * Retrieve maintenance-run reference stats per realm and {@linkplain
+     * org.apache.polaris.persistence.nosql.api.obj.ObjType object type}.
+     */
+    Map<String, ? extends Map<String, MaintenanceStats>> 
toRealmObjTypeStatsMap() {
+      return perRealmAndObjType.entrySet().stream()
+          .collect(
+              Collectors.toMap(
+                  Map.Entry::getKey,
+                  r ->
+                      r.getValue().entrySet().stream()
+                          .collect(
+                              Collectors.toMap(
+                                  Map.Entry::getKey, ot -> 
ot.getValue().toMaintenanceStats()))));
+    }
+  }
+
+  /** Maintenance stats holder. */
+  static final class StatsHolder {
+    long scanned;
+    long newer;
+    long retained;
+    long purged;
+
+    /** Consume the outcome for a reference or object-type. */
+    void add(ScanItemOutcome outcome) {
+      scanned++;
+      switch (outcome) {
+        case REALM_PURGE, PURGED -> purged++;
+        case TOO_NEW_RETAINED -> newer++;
+        case RETAINED, UNHANDLED_RETAINED -> retained++;
+        default -> throw new IllegalStateException("Unknown outcome " + 
outcome);
+      }
+    }
+
+    /** Produce the serializable-stats container. */
+    MaintenanceStats toMaintenanceStats() {
+      return MaintenanceStats.builder()
+          .scanned(scanned)
+          .newer(newer)
+          .retained(retained)
+          .purged(purged)
+          .build();
+    }
+  }
+}
diff --git 
a/persistence/nosql/persistence/maintenance/impl/src/main/java/org/apache/polaris/persistence/nosql/maintenance/impl/AllRetained.java
 
b/persistence/nosql/persistence/maintenance/impl/src/main/java/org/apache/polaris/persistence/nosql/maintenance/impl/AllRetained.java
new file mode 100644
index 000000000..f49cf06cb
--- /dev/null
+++ 
b/persistence/nosql/persistence/maintenance/impl/src/main/java/org/apache/polaris/persistence/nosql/maintenance/impl/AllRetained.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.persistence.nosql.maintenance.impl;
+
+import static java.util.Map.entry;
+
+import com.google.common.hash.BloomFilter;
+import com.google.common.hash.PrimitiveSink;
+import jakarta.annotation.Nonnull;
+import java.util.Map;
+import java.util.function.Predicate;
+import org.apache.polaris.persistence.nosql.api.obj.ObjRef;
+import 
org.apache.polaris.persistence.nosql.maintenance.impl.ScanHandler.RetainCheck;
+import org.jspecify.annotations.NonNull;
+
+/**
+ * Collects reference names and objects to retain.
+ *
+ * <p>The implementation uses bloom-filters to limit the heap usage when a 
huge number of
+ * references/objects is being used.
+ */
+@SuppressWarnings("UnstableApiUsage")
+final class AllRetained {
+
+  /**
+   * Some "salt" to make the bloom filters non-deterministic, in case there 
are false-positives, to
+   * reduce the number of false positives over time.
+   */
+  private final int salt;
+
+  // @NonNull is the jspecify variant, which allows type-usage.
+  // Jakarta's @Nonnull does not allow type-usage.
+  private final BloomFilter<Map.@NonNull Entry<String, String>> refsFilter;
+  private final BloomFilter<Map.@NonNull Entry<String, Long>> objsFilter;
+  private long refAdds;
+  private long objAdds;
+
+  AllRetained(long expectedReferenceCount, long expectedObjCount, double fpp, 
int salt) {
+    this.salt = salt;
+    this.refsFilter = BloomFilter.create(this::refFunnel, 
expectedReferenceCount, fpp);
+    this.objsFilter = BloomFilter.create(this::objFunnel, expectedObjCount, 
fpp);
+  }
+
+  private void refFunnel(Map.Entry<String, String> realmRef, @Nonnull 
PrimitiveSink primitiveSink) {
+    primitiveSink.putInt(salt);
+    primitiveSink.putUnencodedChars(realmRef.getKey());
+    primitiveSink.putUnencodedChars(realmRef.getValue());
+  }
+
+  private void objFunnel(Map.Entry<String, Long> realmObj, @Nonnull 
PrimitiveSink primitiveSink) {
+    primitiveSink.putInt(salt);
+    primitiveSink.putUnencodedChars(realmObj.getKey());
+    var id = realmObj.getValue();
+    primitiveSink.putLong(id);
+  }
+
+  void addRetainedRef(String realm, String ref) {
+    refsFilter.put(entry(realm, ref));
+    refAdds++;
+  }
+
+  void addRetainedObj(String realm, long id) {
+    objsFilter.put(entry(realm, id));
+    objAdds++;
+  }
+
+  /** The number of {@link #addRetainedRef(String, String)} invocations. */
+  long refAdds() {
+    return refAdds;
+  }
+
+  /** The number of {@link #addRetainedObj(String, long)} invocations. */
+  long objAdds() {
+    return objAdds;
+  }
+
+  boolean withinExpectedFpp(double expectedFpp) {
+    return refsFilter.expectedFpp() < expectedFpp && objsFilter.expectedFpp() 
< expectedFpp;
+  }
+
+  RetainCheck<String> referenceRetainCheck() {
+    return (realm, ref) -> refsFilter.mightContain(entry(realm, ref));
+  }
+
+  RetainCheck<ObjRef> objRetainCheck(Predicate<String> objTypeIdPredicate) {
+    return (realm, id) ->
+        objTypeIdPredicate.test(id.type()) || 
objsFilter.mightContain(entry(realm, id.id()));
+  }
+}
diff --git 
a/persistence/nosql/persistence/maintenance/impl/src/main/java/org/apache/polaris/persistence/nosql/maintenance/impl/MaintenanceRunObj.java
 
b/persistence/nosql/persistence/maintenance/impl/src/main/java/org/apache/polaris/persistence/nosql/maintenance/impl/MaintenanceRunObj.java
new file mode 100644
index 000000000..6ce50e348
--- /dev/null
+++ 
b/persistence/nosql/persistence/maintenance/impl/src/main/java/org/apache/polaris/persistence/nosql/maintenance/impl/MaintenanceRunObj.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.persistence.nosql.maintenance.impl;
+
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import java.util.function.LongSupplier;
+import org.apache.polaris.immutables.PolarisImmutable;
+import org.apache.polaris.persistence.nosql.api.obj.AbstractObjType;
+import org.apache.polaris.persistence.nosql.api.obj.Obj;
+import org.apache.polaris.persistence.nosql.api.obj.ObjType;
+import 
org.apache.polaris.persistence.nosql.maintenance.api.MaintenanceRunInformation;
+
+/**
+ * Holds information about one maintenance run.
+ *
+ * <p>This object is <em>eventually overwritten</em> with the final result of 
the maintenance run.
+ */
+@PolarisImmutable
+@JsonSerialize(as = ImmutableMaintenanceRunObj.class)
+@JsonDeserialize(as = ImmutableMaintenanceRunObj.class)
+public interface MaintenanceRunObj extends Obj {
+
+  ObjType TYPE = new MaintenanceRunObjType();
+
+  @Override
+  default ObjType type() {
+    return TYPE;
+  }
+
+  MaintenanceRunInformation runInformation();
+
+  static ImmutableMaintenanceRunObj.Builder builder() {
+    return ImmutableMaintenanceRunObj.builder();
+  }
+
+  final class MaintenanceRunObjType extends AbstractObjType<MaintenanceRunObj> 
{
+    public MaintenanceRunObjType() {
+      super("mtr", "Maintenance Run", MaintenanceRunObj.class);
+    }
+
+    @Override
+    public long cachedObjectExpiresAtMicros(Obj obj, LongSupplier clockMicros) 
{
+      var mo = (MaintenanceRunObj) obj;
+      if (mo.runInformation().finished().isPresent()) {
+        return CACHE_UNLIMITED;
+      }
+      return NOT_CACHED;
+    }
+  }
+}
diff --git 
a/persistence/nosql/persistence/maintenance/impl/src/main/java/org/apache/polaris/persistence/nosql/maintenance/impl/MaintenanceRunsObj.java
 
b/persistence/nosql/persistence/maintenance/impl/src/main/java/org/apache/polaris/persistence/nosql/maintenance/impl/MaintenanceRunsObj.java
new file mode 100644
index 000000000..ca476351d
--- /dev/null
+++ 
b/persistence/nosql/persistence/maintenance/impl/src/main/java/org/apache/polaris/persistence/nosql/maintenance/impl/MaintenanceRunsObj.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.persistence.nosql.maintenance.impl;
+
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import org.apache.polaris.immutables.PolarisImmutable;
+import org.apache.polaris.persistence.nosql.api.obj.AbstractObjType;
+import org.apache.polaris.persistence.nosql.api.obj.BaseCommitObj;
+import org.apache.polaris.persistence.nosql.api.obj.ObjRef;
+import org.apache.polaris.persistence.nosql.api.obj.ObjType;
+
+@PolarisImmutable
+@JsonSerialize(as = ImmutableMaintenanceRunsObj.class)
+@JsonDeserialize(as = ImmutableMaintenanceRunsObj.class)
+public interface MaintenanceRunsObj extends BaseCommitObj {
+
+  String MAINTENANCE_RUNS_REF_NAME = "maintenance-runs";
+
+  ObjType TYPE = new MaintenanceRunsObjType();
+
+  static ImmutableMaintenanceRunsObj.Builder builder() {
+    return ImmutableMaintenanceRunsObj.builder();
+  }
+
+  /**
+   * The ID of the object holding the maintenance run information.
+   *
+   * <p>The {@linkplain MaintenanceRunObj#runInformation() maintenance run 
information} is
+   * <em>not</em> included in this object, because {@link MaintenanceRunObj} 
is initially written as
+   * "currently running" and then updated with the final state of the 
maintenance run. Updating the
+   * {@link MaintenanceRunObj} is not great but okay, but updating a {@link 
BaseCommitObj} is an
+   * absolute no-go.
+   */
+  ObjRef maintenanceRunId();
+
+  @Override
+  default ObjType type() {
+    return TYPE;
+  }
+
+  final class MaintenanceRunsObjType extends 
AbstractObjType<MaintenanceRunsObj> {
+    public MaintenanceRunsObjType() {
+      super("mtrs", "Maintenance Runs", MaintenanceRunsObj.class);
+    }
+  }
+
+  interface Builder extends BaseCommitObj.Builder<MaintenanceRunsObj, Builder> 
{}
+}
diff --git 
a/persistence/nosql/persistence/maintenance/impl/src/main/java/org/apache/polaris/persistence/nosql/maintenance/impl/MaintenanceServiceImpl.java
 
b/persistence/nosql/persistence/maintenance/impl/src/main/java/org/apache/polaris/persistence/nosql/maintenance/impl/MaintenanceServiceImpl.java
new file mode 100644
index 000000000..897156b18
--- /dev/null
+++ 
b/persistence/nosql/persistence/maintenance/impl/src/main/java/org/apache/polaris/persistence/nosql/maintenance/impl/MaintenanceServiceImpl.java
@@ -0,0 +1,617 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.persistence.nosql.maintenance.impl;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static java.lang.String.format;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.polaris.persistence.nosql.api.Realms.SYSTEM_REALM_ID;
+import static 
org.apache.polaris.persistence.nosql.api.Realms.SYSTEM_REALM_PREFIX;
+import static 
org.apache.polaris.persistence.nosql.api.backend.PersistId.persistId;
+import static org.apache.polaris.persistence.nosql.api.obj.ObjRef.objRef;
+import static 
org.apache.polaris.persistence.nosql.api.obj.ObjTypes.objTypeById;
+import static 
org.apache.polaris.persistence.nosql.maintenance.api.MaintenanceConfig.DEFAULT_COUNT_FROM_LAST_RUN_MULTIPLIER;
+import static 
org.apache.polaris.persistence.nosql.maintenance.api.MaintenanceConfig.DEFAULT_DELETE_BATCH_SIZE;
+import static 
org.apache.polaris.persistence.nosql.maintenance.api.MaintenanceConfig.DEFAULT_INITIALIZED_FPP;
+import static 
org.apache.polaris.persistence.nosql.maintenance.api.MaintenanceConfig.DEFAULT_MAX_ACCEPTABLE_FPP;
+import static 
org.apache.polaris.persistence.nosql.maintenance.impl.MaintenanceRunsObj.MAINTENANCE_RUNS_REF_NAME;
+import static 
org.apache.polaris.persistence.nosql.realms.api.RealmDefinition.RealmStatus.ACTIVE;
+import static 
org.apache.polaris.persistence.nosql.realms.api.RealmDefinition.RealmStatus.INACTIVE;
+import static 
org.apache.polaris.persistence.nosql.realms.api.RealmDefinition.RealmStatus.PURGED;
+import static 
org.apache.polaris.persistence.nosql.realms.api.RealmDefinition.RealmStatus.PURGING;
+
+import com.google.common.collect.Streams;
+import com.google.common.math.LongMath;
+import jakarta.annotation.Nonnull;
+import jakarta.annotation.PostConstruct;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.enterprise.inject.Instance;
+import jakarta.inject.Inject;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.math.RoundingMode;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.OptionalLong;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.polaris.ids.api.MonotonicClock;
+import org.apache.polaris.persistence.nosql.api.Persistence;
+import org.apache.polaris.persistence.nosql.api.RealmPersistenceFactory;
+import org.apache.polaris.persistence.nosql.api.SystemPersistence;
+import org.apache.polaris.persistence.nosql.api.backend.Backend;
+import org.apache.polaris.persistence.nosql.api.commit.Committer;
+import org.apache.polaris.persistence.nosql.api.obj.ObjRef;
+import org.apache.polaris.persistence.nosql.api.obj.ObjTypes;
+import org.apache.polaris.persistence.nosql.maintenance.api.MaintenanceConfig;
+import 
org.apache.polaris.persistence.nosql.maintenance.api.MaintenanceRunInformation;
+import org.apache.polaris.persistence.nosql.maintenance.api.MaintenanceRunSpec;
+import org.apache.polaris.persistence.nosql.maintenance.api.MaintenanceService;
+import 
org.apache.polaris.persistence.nosql.maintenance.spi.ObjTypeRetainedIdentifier;
+import 
org.apache.polaris.persistence.nosql.maintenance.spi.PerRealmRetainedIdentifier;
+import org.apache.polaris.persistence.nosql.realms.api.RealmDefinition;
+import 
org.apache.polaris.persistence.nosql.realms.api.RealmExpectedStateMismatchException;
+import org.apache.polaris.persistence.nosql.realms.api.RealmManagement;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Retrieve an instance of this class by adding the {@code
+ * polaris-persistence-nosql-maintenance-impl} artifact to the runtime class 
path and then use CDI
+ * via {@code @Inject MaintenanceService} to access it.
+ */
+@ApplicationScoped
+class MaintenanceServiceImpl implements MaintenanceService {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(MaintenanceServiceImpl.class);
+
+  static final int MIN_GRACE_TIME_MINUTES = 5;
+
+  private static final long MIN_GRACE_TIME_MICROS =
+      TimeUnit.MINUTES.toMicros(MIN_GRACE_TIME_MINUTES);
+
+  private final Backend backend;
+  private final Persistence systemPersistence;
+  private final Committer<MaintenanceRunsObj, MaintenanceRunObj> committer;
+  private final RealmPersistenceFactory realmPersistenceFactory;
+  private final RealmManagement realmManagement;
+  private final List<PerRealmRetainedIdentifier> perRealmRetainedIdentifiers;
+  private final Map<String, List<ObjTypeRetainedIdentifier>> 
objTypeRetainedIdentifiers;
+  private final MaintenanceConfig maintenanceConfig;
+  private final MonotonicClock monotonicClock;
+
+  @SuppressWarnings("CdiInjectionPointsInspection")
+  @Inject
+  MaintenanceServiceImpl(
+      Backend backend,
+      @SystemPersistence Persistence systemPersistence,
+      RealmPersistenceFactory realmPersistenceFactory,
+      RealmManagement realmManagement,
+      Instance<PerRealmRetainedIdentifier> realmRetainedIdentifiers,
+      Instance<ObjTypeRetainedIdentifier> objTypeRetainedIdentifiers,
+      MaintenanceConfig maintenanceConfig,
+      MonotonicClock monotonicClock) {
+    checkArgument(
+        SYSTEM_REALM_ID.equals(systemPersistence.realmId()),
+        "Realms management must happen in the %s realm",
+        SYSTEM_REALM_ID);
+
+    this.backend = backend;
+    this.systemPersistence = systemPersistence;
+    this.realmManagement = realmManagement;
+    this.realmPersistenceFactory = realmPersistenceFactory;
+    this.maintenanceConfig = maintenanceConfig;
+    this.monotonicClock = monotonicClock;
+
+    this.perRealmRetainedIdentifiers = 
realmRetainedIdentifiers.stream().toList();
+    this.objTypeRetainedIdentifiers =
+        objTypeRetainedIdentifiers.stream()
+            .collect(Collectors.groupingBy(oti -> oti.handledObjType().id()));
+    this.committer =
+        systemPersistence.createCommitter(
+            MAINTENANCE_RUNS_REF_NAME, MaintenanceRunsObj.class, 
MaintenanceRunObj.class);
+
+    maintenanceServiceReport();
+  }
+
+  @PostConstruct
+  void init() {
+    // Do this in a @PostConstruct method as it involves I/O, which isn't a 
good thing to do in a
+    // constructor, especially in CDI
+    systemPersistence.createReferenceSilent(MAINTENANCE_RUNS_REF_NAME);
+  }
+
+  @Override
+  @Nonnull
+  public List<MaintenanceRunInformation> maintenanceRunLog() {
+    var runIds =
+        Streams.stream(
+                systemPersistence
+                    .commits()
+                    .commitLog(
+                        MAINTENANCE_RUNS_REF_NAME, OptionalLong.empty(), 
MaintenanceRunsObj.class))
+            .filter(Objects::nonNull)
+            
.limit(maintenanceConfig.retainedRuns().orElse(MaintenanceConfig.DEFAULT_RETAINED_RUNS))
+            .map(MaintenanceRunsObj::maintenanceRunId)
+            .toArray(ObjRef[]::new);
+    return Stream.of(systemPersistence.fetchMany(MaintenanceRunObj.class, 
runIds))
+        .filter(Objects::nonNull)
+        .map(MaintenanceRunObj::runInformation)
+        .toList();
+  }
+
+  @Nonnull
+  @Override
+  public MaintenanceRunSpec buildMaintenanceRunSpec() {
+    try (var realms = realmManagement.list()) {
+      var specBuilder = MaintenanceRunSpec.builder();
+      realms.forEach(
+          realm -> {
+            switch (realm.status()) {
+              case CREATED, INITIALIZING, LOADING, PURGED -> {
+                // Don't handle these states, those are either final or known 
to contain
+                // inconsistent data
+              }
+              case PURGING -> specBuilder.addRealmsToPurge(realm.id());
+              case ACTIVE, INACTIVE -> 
specBuilder.addRealmsToProces(realm.id());
+              default ->
+                  throw new IllegalStateException(
+                      "Unexpected realm status " + realm.status() + " for 
realm " + realm.id());
+            }
+          });
+      return specBuilder.build();
+    }
+  }
+
+  @Override
+  @Nonnull
+  public MaintenanceRunInformation performMaintenance(
+      @Nonnull MaintenanceRunSpec maintenanceRunSpec) {
+    LOGGER.info(
+        "Triggering maintenance run with {} realms to purge and {} realms to 
process",
+        maintenanceRunSpec.realmsToPurge().size(),
+        maintenanceRunSpec.realmsToProcess().size());
+
+    checkArgument(
+        maintenanceRunSpec.realmsToPurge().stream()
+                .noneMatch(maintenanceRunSpec.realmsToProcess()::contains)
+            && maintenanceRunSpec.realmsToProcess().stream()
+                .noneMatch(maintenanceRunSpec.realmsToPurge()::contains),
+        "No realm ID must be included in both the set of realms to process and 
the set of realms to purge");
+    checkArgument(
+        Stream.concat(
+                maintenanceRunSpec.realmsToPurge().stream(),
+                maintenanceRunSpec.realmsToProcess().stream())
+            .noneMatch(id -> id.startsWith(SYSTEM_REALM_PREFIX)),
+        "System realm IDs must not be present in the maintenance run 
specification");
+
+    var config = maintenanceConfig;
+    checkConfig(config);
+
+    // TODO follow-up: some safeguard that checks the run-log for an 
unfinished run, outside of this
+    //  function!
+
+    var allRetained = constructAllRetained(config);
+
+    var runObj = initMaintenanceRunObj();
+    var runInfo = 
MaintenanceRunInformation.builder().from(runObj.runInformation());
+
+    var maxCreatedAtMicros = calcMaxCreatedAtMicros(config);
+
+    var description = new StringWriter();
+    var descriptionWriter = new PrintWriter(description);
+
+    var info = (MaintenanceRunInformation) null;
+    try {
+      try {
+        var realmsToProcess = processRealms(maintenanceRunSpec, allRetained, 
descriptionWriter);
+        var realmsToPurge = purgeRealms(maintenanceRunSpec, descriptionWriter);
+
+        if (maintenanceRunSpec.includeSystemRealm()) {
+          realmsToProcess.add(SYSTEM_REALM_ID);
+          identifyAgainstRealm(SYSTEM_REALM_ID, allRetained);
+        }
+
+        if (!maintenanceRunSpec.realmsToPurge().isEmpty() && 
backend.supportsRealmDeletion()) {
+          LOGGER.info(
+              "Purging realms {} directly against the backend database...",
+              String.join(", ", maintenanceRunSpec.realmsToPurge()));
+          backend.deleteRealms(maintenanceRunSpec.realmsToPurge());
+          runInfo.purgedRealms(maintenanceRunSpec.realmsToPurge().size());
+        } else {
+          runInfo.purgedRealms(0);
+        }
+
+        var seenRealmsToPurge = new HashSet<String>();
+        var expectFpp = 
config.maxAcceptableFilterFpp().orElse(DEFAULT_MAX_ACCEPTABLE_FPP);
+
+        var refStats = new 
AbstractScanItemStatsCollector.ScanRefStatsCollector();
+        var objStats = new 
AbstractScanItemStatsCollector.ScanObjStatsCollector();
+
+        // Ensures that objects with unknown obj-types do not get purged.
+        // The assumption here is that if the obj-type is not known, there's 
also no
+        // ObjTypeRetainedIdentifier/RealmRetainedIdentifier, which could 
handle these object types.
+        // As a follow-up, it might be appropriate to add an advanced 
configuration option to define
+        // the object types that shall be purged.
+        // Even if the obj-type is unknown, to clean up after an 
extension/plugin that used those
+        // object-types is no longer being used.
+        var nonGenericObjTypeIds = ObjTypes.nonGenericObjTypes().keySet();
+        var objTypeIdPredicate =
+            (Predicate<String>) objTypeId -> 
!nonGenericObjTypeIds.contains(objTypeId);
+
+        var canDelete = allRetained.withinExpectedFpp(expectFpp);
+        try (var refHandler =
+                new ScanHandler<>(
+                    "reference",
+                    config.referenceScanRateLimitPerSecond(),
+                    maxCreatedAtMicros,
+                    realmsToProcess,
+                    realmsToPurge,
+                    seenRealmsToPurge::add,
+                    allRetained.referenceRetainCheck(),
+                    config.deleteBatchSize().orElse(DEFAULT_DELETE_BATCH_SIZE),
+                    realmRefs -> {
+                      if (canDelete) {
+                        backend.batchDeleteRefs(realmRefs);
+                      }
+                    },
+                    refStats);
+            var objHandler =
+                new ScanHandler<>(
+                    "object",
+                    config.objectScanRateLimitPerSecond(),
+                    maxCreatedAtMicros,
+                    realmsToProcess,
+                    realmsToPurge,
+                    seenRealmsToPurge::add,
+                    allRetained.objRetainCheck(objTypeIdPredicate),
+                    config.deleteBatchSize().orElse(DEFAULT_DELETE_BATCH_SIZE),
+                    realmObjs -> {
+                      if (canDelete) {
+                        backend.batchDeleteObjs(
+                            realmObjs.entrySet().stream()
+                                .collect(
+                                    Collectors.toMap(
+                                        Map.Entry::getKey,
+                                        e ->
+                                            e.getValue().stream()
+                                                .map(oid -> 
persistId(oid.id(), oid.numParts()))
+                                                
.collect(Collectors.toSet()))));
+                      }
+                    },
+                    objStats)) {
+
+          LOGGER.info("Start scanning backend database, too-new: {} ...", 
maxCreatedAtMicros);
+          backend.scanBackend(
+              
refHandler.asReferenceScanCallback(monotonicClock::currentTimeMillis),
+              objHandler.asObjScanCallback(monotonicClock::currentTimeMillis));
+
+          LOGGER.info("Finished scanning backend database");
+
+          runInfo
+              .referenceStats(refStats.stats.toMaintenanceStats())
+              .objStats(objStats.stats.toMaintenanceStats())
+              .perRealmPerObjTypeStats(objStats.toRealmObjTypeStatsMap())
+              .perRealmReferenceStats(refStats.toRealmObjTypeStatsMap())
+              .identifiedObjs(allRetained.objAdds())
+              .identifiedReferences(allRetained.refAdds());
+
+          if (!canDelete) {
+            var warn =
+                "Maintenance run finished but did not purge all unreferenced 
objects, "
+                    + "because the probabilistic filter was not properly 
sized. "
+                    + "The next maintenance run will be sized according to 
current run.";
+            if (!seenRealmsToPurge.isEmpty()) {
+              warn +=
+                  format("\nRealms %s NOT marked as purged.", String.join(", 
", seenRealmsToPurge));
+            }
+            descriptionWriter.println(warn);
+            LOGGER.warn(warn);
+          }
+        }
+
+        if (canDelete) {
+          updateRealmsAsPurged(maintenanceRunSpec, seenRealmsToPurge);
+        }
+
+        runInfo.success(true);
+
+        LOGGER.info("Maintenance run completed successfully");
+      } catch (Exception e) {
+        LOGGER.info("Maintenance run failed", e);
+
+        runInfo.success(false).statusMessage("Maintenance run did not finish 
successfully.");
+
+        // Add stack trace as detailed information
+        descriptionWriter.printf("FAILURE:%n");
+        e.printStackTrace(descriptionWriter);
+      } finally {
+        descriptionWriter.flush();
+        runInfo
+            .finished(monotonicClock.currentInstant())
+            .detailedInformation(description.toString());
+        info = runInfo.build();
+      }
+    } finally {
+      LOGGER.info("Persisting maintenance result {}", info);
+      systemPersistence.write(
+          
MaintenanceRunObj.builder().from(runObj).runInformation(info).build(),
+          MaintenanceRunObj.class);
+    }
+
+    return info;
+  }
+
+  private void updateRealmsAsPurged(
+      MaintenanceRunSpec maintenanceRunSpec, HashSet<String> 
seenRealmsToPurge) {
+    // Update the realm status of the realms that were specified to be purged 
as `PURGED` if no
+    // data for those realms has been seen.
+    maintenanceRunSpec.realmsToPurge().stream()
+        .filter(r -> !seenRealmsToPurge.contains(r))
+        .map(realmManagement::get)
+        .filter(Optional::isPresent)
+        .map(Optional::get)
+        .forEach(
+            purgedRealm -> {
+              try {
+                realmManagement.update(
+                    purgedRealm,
+                    
RealmDefinition.builder().from(purgedRealm).status(PURGED).build());
+              } catch (RealmExpectedStateMismatchException e) {
+                // ignore, do the state transition during the next maintenance 
run
+              }
+            });
+  }
+
+  private void maintenanceServiceReport() {
+    LOGGER.info("Using {} realm retained identifiers", 
perRealmRetainedIdentifiers.size());
+    perRealmRetainedIdentifiers.forEach(
+        realmRetainedIdentifier ->
+            LOGGER.info("Realm retained identifier: {}", 
realmRetainedIdentifier.name()));
+    LOGGER.info(
+        "Using {} object type identifiers:",
+        
objTypeRetainedIdentifiers.values().stream().mapToInt(List::size).sum());
+    objTypeRetainedIdentifiers.forEach(
+        (type, idents) -> {
+          LOGGER.info(
+              "Using {} identifiers for object type '{}' {}",
+              idents.size(),
+              type,
+              objTypeById(type).name());
+          idents.forEach(
+              objTypeRetainedIdentifier ->
+                  LOGGER.info(
+                      "Object type '{}' identifier: {}", type, 
objTypeRetainedIdentifier.name()));
+        });
+  }
+
+  private Set<String> processRealms(
+      MaintenanceRunSpec maintenanceRunSpec,
+      AllRetained allRetained,
+      PrintWriter descriptionWriter) {
+    var realmsToProcess = new HashSet<String>();
+    for (var realmId : maintenanceRunSpec.realmsToProcess()) {
+      var currentRealmStatus =
+          
realmManagement.get(realmId).map(RealmDefinition::status).orElse(ACTIVE);
+      if ((currentRealmStatus == ACTIVE || currentRealmStatus == INACTIVE)
+          && identifyAgainstRealm(realmId, allRetained)) {
+        realmsToProcess.add(realmId);
+      }
+    }
+    maintenanceRunSpec.realmsToProcess().stream()
+        .filter(r -> !realmsToProcess.contains(r))
+        .forEach(
+            r -> {
+              var msg =
+                  format(
+                      "No realm retained identifier was able to handle the 
realm '%s' or the realm is not in status ACTIVE or INACTIVE, no references or 
objects will be purged from this realm.",
+                      r);
+              descriptionWriter.println(msg);
+              LOGGER.warn(msg);
+            });
+    return realmsToProcess;
+  }
+
+  private Set<String> purgeRealms(
+      MaintenanceRunSpec maintenanceRunSpec, PrintWriter descriptionWriter) {
+    var realmsToPurge = new HashSet<String>();
+    for (var realmId : maintenanceRunSpec.realmsToPurge()) {
+      var currentRealmStatus =
+          
realmManagement.get(realmId).map(RealmDefinition::status).orElse(PURGED);
+      if (currentRealmStatus == PURGING || currentRealmStatus == PURGED) {
+        realmsToPurge.add(realmId);
+      }
+    }
+    maintenanceRunSpec.realmsToPurge().stream()
+        .filter(r -> !realmsToPurge.contains(r))
+        .forEach(
+            r -> {
+              var msg =
+                  format(
+                      "The realm '%s' is not in state PURGING, will therefore 
not be purged.", r);
+              descriptionWriter.println(msg);
+              LOGGER.warn(msg);
+            });
+    return realmsToPurge;
+  }
+
+  private long calcMaxCreatedAtMicros(MaintenanceConfig effectiveConfig) {
+    var now = monotonicClock.currentTimeMicros();
+    var grace =
+        effectiveConfig
+            .createdAtGraceTime()
+            .map(
+                d -> {
+                  var micros = SECONDS.toMicros(d.toSeconds());
+                  micros += TimeUnit.NANOSECONDS.toMicros(d.toNanosPart());
+                  return Math.max(micros, MIN_GRACE_TIME_MICROS);
+                })
+            .orElse(MIN_GRACE_TIME_MICROS);
+    return now - grace;
+  }
+
+  private AllRetained constructAllRetained(MaintenanceConfig effectiveConfig) {
+    var expectedReferenceCount =
+        effectiveConfig
+            .expectedReferenceCount()
+            .orElse(MaintenanceConfig.DEFAULT_EXPECTED_REFERENCE_COUNT);
+    var expectedObjCount =
+        
effectiveConfig.expectedObjCount().orElse(MaintenanceConfig.DEFAULT_EXPECTED_OBJ_COUNT);
+
+    for (var lastRunIter =
+            Streams.stream(
+                    systemPersistence
+                        .commits()
+                        .commitLog(
+                            MAINTENANCE_RUNS_REF_NAME,
+                            OptionalLong.empty(),
+                            MaintenanceRunsObj.class))
+                .filter(Objects::nonNull)
+                .map(r -> systemPersistence.fetch(r.maintenanceRunId(), 
MaintenanceRunObj.class))
+                .filter(Objects::nonNull)
+                .map(MaintenanceRunObj::runInformation)
+                .iterator();
+        lastRunIter.hasNext(); ) {
+      var ri = lastRunIter.next();
+      var refs =
+          Math.max(
+              ri.referenceStats().map(st -> 
st.scanned().orElse(0L)).orElse(0L),
+              ri.identifiedReferences().orElse(0L));
+      var objs =
+          Math.max(
+              ri.objStats().map(st -> st.scanned().orElse(0L)).orElse(0L),
+              ri.identifiedObjs().orElse(0L));
+      if (refs == 0L || objs == 0L) {
+        continue;
+      }
+      if (refs > expectedReferenceCount) {
+        // Add 10% to account for newly created references
+        expectedReferenceCount =
+            (long)
+                (refs
+                    * effectiveConfig
+                        .countFromLastRunMultiplier()
+                        .orElse(DEFAULT_COUNT_FROM_LAST_RUN_MULTIPLIER));
+      }
+      if (objs > expectedObjCount) {
+        // Add 10% to account for newly created objects
+        expectedObjCount =
+            (long)
+                (objs
+                    * effectiveConfig
+                        .countFromLastRunMultiplier()
+                        .orElse(DEFAULT_COUNT_FROM_LAST_RUN_MULTIPLIER));
+      }
+    }
+
+    expectedReferenceCount = Math.max(expectedReferenceCount, 1_000);
+    expectedObjCount = Math.max(expectedObjCount, 100_000);
+    var configFpp = 
effectiveConfig.filterInitializedFpp().orElse(DEFAULT_INITIALIZED_FPP);
+    LOGGER.info(
+        "Sized retained collector for {} references and {} objects with an fpp 
of {}, approximate bloom filter heap sizes: {} and {} bytes",
+        expectedReferenceCount,
+        expectedObjCount,
+        configFpp,
+        bloomFilterBytes(expectedReferenceCount, configFpp),
+        bloomFilterBytes(expectedObjCount, configFpp));
+
+    return new AllRetained(
+        expectedReferenceCount, expectedObjCount, configFpp, 
ThreadLocalRandom.current().nextInt());
+  }
+
+  private static final double LOG2_SQUARED = Math.log(2) * Math.log(2);
+
+  static long bloomFilterBytes(long elements, double fpp) {
+    var bits = (long) (-elements * Math.log(fpp) / LOG2_SQUARED);
+    return LongMath.divide(bits, 64, RoundingMode.CEILING);
+  }
+
+  private boolean identifyAgainstRealm(String realmId, AllRetained 
allRetained) {
+    LOGGER.info("Identifying referenced data in realm '{}'", realmId);
+
+    var pers = realmPersistenceFactory.newBuilder().realmId(realmId).build();
+    var collector = new RetainedCollectorImpl(pers, allRetained, 
objTypeRetainedIdentifiers);
+
+    boolean any = false;
+    for (var realmRetainedIdentifier : perRealmRetainedIdentifiers) {
+      LOGGER.info(
+          "Running maintenance for realm '{}' via '{}'", realmId, 
realmRetainedIdentifier.name());
+      var handled = realmRetainedIdentifier.identifyRetained(collector);
+      LOGGER.info(
+          "Realm identifier '{}' {} {}",
+          realmRetainedIdentifier.name(),
+          handled ? "handled" : "did not handle",
+          realmId);
+      any |= handled;
+    }
+    return any;
+  }
+
+  private MaintenanceRunObj initMaintenanceRunObj() {
+    return committer
+        .commitRuntimeException(
+            (state, refObjSupplier) -> {
+              var refObj = refObjSupplier.get();
+              var res = MaintenanceRunsObj.builder();
+
+              var ro =
+                  MaintenanceRunObj.builder()
+                      .id(systemPersistence.generateId())
+                      .runInformation(
+                          MaintenanceRunInformation.builder()
+                              .started(monotonicClock.currentInstant())
+                              .build())
+                      .build();
+              res.maintenanceRunId(objRef(ro));
+
+              state.writeIfNew("ro", ro);
+
+              return state.commitResult(ro, res, refObj);
+            })
+        .orElseThrow();
+  }
+
+  private static void checkConfig(MaintenanceConfig config) {
+    config
+        .retainedRuns()
+        .ifPresent(v -> checkArgument(v > 1, "Number of maintenance runs must 
be at least 2"));
+    config
+        .expectedReferenceCount()
+        .ifPresent(
+            v ->
+                checkArgument(
+                    v >= MaintenanceConfig.DEFAULT_EXPECTED_REFERENCE_COUNT,
+                    "Expected reference count runs must be greater than or 
equal to %s",
+                    MaintenanceConfig.DEFAULT_EXPECTED_REFERENCE_COUNT));
+    config
+        .expectedObjCount()
+        .ifPresent(
+            v ->
+                checkArgument(
+                    v >= MaintenanceConfig.DEFAULT_EXPECTED_OBJ_COUNT,
+                    "Expected object count runs must be greater than or equal 
to %s",
+                    MaintenanceConfig.DEFAULT_EXPECTED_OBJ_COUNT));
+  }
+}
diff --git 
a/persistence/nosql/persistence/maintenance/impl/src/main/java/org/apache/polaris/persistence/nosql/maintenance/impl/MaintenanceServiceRealmRetainedIdentifier.java
 
b/persistence/nosql/persistence/maintenance/impl/src/main/java/org/apache/polaris/persistence/nosql/maintenance/impl/MaintenanceServiceRealmRetainedIdentifier.java
new file mode 100644
index 000000000..568e0ed86
--- /dev/null
+++ 
b/persistence/nosql/persistence/maintenance/impl/src/main/java/org/apache/polaris/persistence/nosql/maintenance/impl/MaintenanceServiceRealmRetainedIdentifier.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.persistence.nosql.maintenance.impl;
+
+import static 
org.apache.polaris.persistence.nosql.maintenance.api.MaintenanceConfig.DEFAULT_RETAINED_RUNS;
+import static 
org.apache.polaris.persistence.nosql.maintenance.impl.MaintenanceRunsObj.MAINTENANCE_RUNS_REF_NAME;
+
+import jakarta.annotation.Nonnull;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
+import org.apache.polaris.persistence.nosql.maintenance.api.MaintenanceConfig;
+import org.apache.polaris.persistence.nosql.maintenance.spi.CountDownPredicate;
+import 
org.apache.polaris.persistence.nosql.maintenance.spi.PerRealmRetainedIdentifier;
+import org.apache.polaris.persistence.nosql.maintenance.spi.RetainedCollector;
+
+/** Retained-identifier for the maintenance service's own reference and 
objects. */
+@SuppressWarnings("CdiInjectionPointsInspection")
+@ApplicationScoped
+class MaintenanceServiceRealmRetainedIdentifier implements 
PerRealmRetainedIdentifier {
+  @Inject MaintenanceConfig maintenanceConfig;
+
+  @Override
+  public String name() {
+    return "Maintenance service";
+  }
+
+  @Override
+  public boolean identifyRetained(@Nonnull RetainedCollector collector) {
+    if (!collector.isSystemRealm()) {
+      return false;
+    }
+
+    collector.refRetain(
+        MAINTENANCE_RUNS_REF_NAME,
+        MaintenanceRunsObj.class,
+        new 
CountDownPredicate<>(maintenanceConfig.retainedRuns().orElse(DEFAULT_RETAINED_RUNS)),
+        maintenance -> collector.retainObject(maintenance.maintenanceRunId()));
+
+    return true;
+  }
+}
diff --git 
a/persistence/nosql/persistence/maintenance/impl/src/main/java/org/apache/polaris/persistence/nosql/maintenance/impl/RateLimit.java
 
b/persistence/nosql/persistence/maintenance/impl/src/main/java/org/apache/polaris/persistence/nosql/maintenance/impl/RateLimit.java
new file mode 100644
index 000000000..bd525f6b7
--- /dev/null
+++ 
b/persistence/nosql/persistence/maintenance/impl/src/main/java/org/apache/polaris/persistence/nosql/maintenance/impl/RateLimit.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.persistence.nosql.maintenance.impl;
+
+import com.google.common.util.concurrent.RateLimiter;
+
+interface RateLimit {
+  void acquire();
+
+  @SuppressWarnings("UnstableApiUsage")
+  static RateLimit create(int ratePerSecond) {
+    if (ratePerSecond <= 0 || ratePerSecond == Integer.MAX_VALUE) {
+      return new RateLimit() {
+        @Override
+        public void acquire() {}
+
+        @Override
+        public String toString() {
+          return "unlimited";
+        }
+      };
+    }
+    return new RateLimit() {
+      final RateLimiter limiter = RateLimiter.create(ratePerSecond);
+
+      @Override
+      public void acquire() {
+        limiter.acquire();
+      }
+
+      @Override
+      public String toString() {
+        return "up to " + ratePerSecond;
+      }
+    };
+  }
+}
diff --git 
a/persistence/nosql/persistence/maintenance/impl/src/main/java/org/apache/polaris/persistence/nosql/maintenance/impl/RetainedCollectorImpl.java
 
b/persistence/nosql/persistence/maintenance/impl/src/main/java/org/apache/polaris/persistence/nosql/maintenance/impl/RetainedCollectorImpl.java
new file mode 100644
index 000000000..6590de77f
--- /dev/null
+++ 
b/persistence/nosql/persistence/maintenance/impl/src/main/java/org/apache/polaris/persistence/nosql/maintenance/impl/RetainedCollectorImpl.java
@@ -0,0 +1,363 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.persistence.nosql.maintenance.impl;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static java.util.Collections.emptyIterator;
+import static org.apache.polaris.persistence.nosql.api.obj.ObjRef.objRef;
+
+import com.google.common.collect.AbstractIterator;
+import jakarta.annotation.Nonnull;
+import jakarta.annotation.Nullable;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalLong;
+import java.util.Set;
+import java.util.function.Supplier;
+import org.apache.polaris.ids.api.IdGenerator;
+import org.apache.polaris.ids.api.MonotonicClock;
+import org.apache.polaris.persistence.nosql.api.Persistence;
+import org.apache.polaris.persistence.nosql.api.PersistenceParams;
+import org.apache.polaris.persistence.nosql.api.commit.Commits;
+import org.apache.polaris.persistence.nosql.api.commit.Committer;
+import 
org.apache.polaris.persistence.nosql.api.exceptions.ReferenceAlreadyExistsException;
+import 
org.apache.polaris.persistence.nosql.api.exceptions.ReferenceNotFoundException;
+import org.apache.polaris.persistence.nosql.api.index.Index;
+import org.apache.polaris.persistence.nosql.api.index.IndexContainer;
+import org.apache.polaris.persistence.nosql.api.index.IndexValueSerializer;
+import org.apache.polaris.persistence.nosql.api.index.UpdatableIndex;
+import org.apache.polaris.persistence.nosql.api.obj.BaseCommitObj;
+import org.apache.polaris.persistence.nosql.api.obj.Obj;
+import org.apache.polaris.persistence.nosql.api.obj.ObjRef;
+import org.apache.polaris.persistence.nosql.api.obj.ObjType;
+import org.apache.polaris.persistence.nosql.api.ref.Reference;
+import 
org.apache.polaris.persistence.nosql.maintenance.spi.ObjTypeRetainedIdentifier;
+import org.apache.polaris.persistence.nosql.maintenance.spi.RetainedCollector;
+
+/** {@link RetainedCollector} implementation, per realm. */
+final class RetainedCollectorImpl implements Persistence, RetainedCollector {
+  private final Persistence persistence;
+  private final AllRetained allRetained;
+  private final String realmId;
+  private final Map<String, List<ObjTypeRetainedIdentifier>> 
objTypeRetainedIdentifiers;
+
+  private final Set<Long> currentNesting = new HashSet<>();
+
+  RetainedCollectorImpl(
+      Persistence persistence,
+      AllRetained allRetained,
+      Map<String, List<ObjTypeRetainedIdentifier>> objTypeRetainedIdentifiers) 
{
+    this.persistence = persistence;
+    this.allRetained = allRetained;
+    this.realmId = persistence.realmId();
+    this.objTypeRetainedIdentifiers = objTypeRetainedIdentifiers;
+  }
+
+  @Nonnull
+  @Override
+  public String realm() {
+    return realmId;
+  }
+
+  @Nonnull
+  @Override
+  public Persistence realmPersistence() {
+    return this;
+  }
+
+  @Override
+  public void retainObject(@Nonnull ObjRef objRef) {
+    if (!currentNesting.add(objRef.id())) {
+      return;
+    }
+    try {
+      allRetained.addRetainedObj(realmId, objRef.id());
+
+      var otIdents = objTypeRetainedIdentifiers.get(objRef.type());
+      if (otIdents != null) {
+        for (var otIdent : otIdents) {
+          otIdent.identifyRelatedObj(this, objRef);
+        }
+      }
+    } finally {
+      currentNesting.remove(objRef.id());
+    }
+  }
+
+  @Override
+  public void retainReference(@Nonnull String name) {
+    allRetained.addRetainedRef(realmId, name);
+  }
+
+  // Persistence delegate
+
+  @Nonnull
+  @Override
+  public Reference createReference(@Nonnull String name, @Nonnull 
Optional<ObjRef> pointer)
+      throws ReferenceAlreadyExistsException {
+    retainReference(name);
+    pointer.ifPresent(this::retainObject);
+    return persistence.createReference(name, pointer);
+  }
+
+  @Override
+  public void createReferenceSilent(@Nonnull String name) {
+    retainReference(name);
+    persistence.createReferenceSilent(name);
+  }
+
+  @Override
+  public void createReferencesSilent(Set<String> referenceNames) {
+    referenceNames.forEach(this::retainReference);
+    persistence.createReferencesSilent(referenceNames);
+  }
+
+  @Nonnull
+  @Override
+  public Reference fetchOrCreateReference(
+      @Nonnull String name, @Nonnull Supplier<Optional<ObjRef>> 
pointerForCreate) {
+    try {
+      return fetchReference(name);
+    } catch (ReferenceNotFoundException e) {
+      try {
+        var objRef = pointerForCreate.get();
+        objRef.ifPresent(this::retainObject);
+        return createReference(name, objRef);
+      } catch (ReferenceAlreadyExistsException x) {
+        // Unlikely that we ever get here (ref does not exist (but then 
concurrently created)
+        return fetchReference(name);
+      }
+    }
+  }
+
+  @Nonnull
+  @Override
+  public Optional<Reference> updateReferencePointer(
+      @Nonnull Reference reference, @Nonnull ObjRef newPointer) throws 
ReferenceNotFoundException {
+    retainReference(reference.name());
+    retainObject(newPointer);
+    return persistence.updateReferencePointer(reference, newPointer);
+  }
+
+  @Nonnull
+  @Override
+  public Reference fetchReference(@Nonnull String name) throws 
ReferenceNotFoundException {
+    retainReference(name);
+    var ref = persistence.fetchReference(name);
+    ref.pointer().ifPresent(this::retainObject);
+    return ref;
+  }
+
+  @Nonnull
+  @Override
+  public Reference fetchReferenceForUpdate(@Nonnull String name) throws 
ReferenceNotFoundException {
+    retainReference(name);
+    var ref = persistence.fetchReferenceForUpdate(name);
+    ref.pointer().ifPresent(this::retainObject);
+    return ref;
+  }
+
+  @Override
+  public <T extends Obj> Optional<T> fetchReferenceHead(
+      @Nonnull String name, @Nonnull Class<T> clazz) throws 
ReferenceNotFoundException {
+    retainReference(name);
+    var ref = persistence.fetchReferenceHead(name, clazz);
+    ref.ifPresent(head -> retainObject(objRef(head)));
+    return ref;
+  }
+
+  @Nullable
+  @Override
+  public <T extends Obj> T fetch(@Nonnull ObjRef id, @Nonnull Class<T> clazz) {
+    retainObject(id);
+    return persistence.fetch(id, clazz);
+  }
+
+  @Nonnull
+  @Override
+  public <T extends Obj> T[] fetchMany(@Nonnull Class<T> clazz, @Nonnull 
ObjRef... ids) {
+    for (var id : ids) {
+      if (id != null) {
+        retainObject(id);
+      }
+    }
+    return persistence.fetchMany(clazz, ids);
+  }
+
+  @Nonnull
+  @Override
+  public <T extends Obj> T write(@Nonnull T obj, @Nonnull Class<T> clazz) {
+    retainObject(objRef(obj));
+    return persistence.write(obj, clazz);
+  }
+
+  @SafeVarargs
+  @Nonnull
+  @Override
+  public final <T extends Obj> T[] writeMany(@Nonnull Class<T> clazz, @Nonnull 
T... objs) {
+    for (var obj : objs) {
+      if (obj != null) {
+        retainObject(objRef(obj));
+      }
+    }
+    return persistence.writeMany(clazz, objs);
+  }
+
+  @Override
+  public void delete(@Nonnull ObjRef id) {
+    persistence.delete(id);
+  }
+
+  @Override
+  public void deleteMany(@Nonnull ObjRef... ids) {
+    persistence.deleteMany(ids);
+  }
+
+  @Nullable
+  @Override
+  public <T extends Obj> T conditionalInsert(@Nonnull T obj, @Nonnull Class<T> 
clazz) {
+    retainObject(objRef(obj));
+    return persistence.conditionalInsert(obj, clazz);
+  }
+
+  @Nullable
+  @Override
+  public <T extends Obj> T conditionalUpdate(
+      @Nonnull T expected, @Nonnull T update, @Nonnull Class<T> clazz) {
+    retainObject(objRef(update));
+    return persistence.conditionalUpdate(expected, update, clazz);
+  }
+
+  @Override
+  public <T extends Obj> boolean conditionalDelete(@Nonnull T expected, 
Class<T> clazz) {
+    retainObject(objRef(expected));
+    return persistence.conditionalDelete(expected, clazz);
+  }
+
+  @Override
+  public PersistenceParams params() {
+    return persistence.params();
+  }
+
+  @Override
+  public int maxSerializedValueSize() {
+    return persistence.maxSerializedValueSize();
+  }
+
+  @Override
+  public long generateId() {
+    return persistence.generateId();
+  }
+
+  @Override
+  public ObjRef generateObjId(ObjType type) {
+    return persistence.generateObjId(type);
+  }
+
+  @Nullable
+  @Override
+  public <T extends Obj> T getImmediate(@Nonnull ObjRef id, @Nonnull Class<T> 
clazz) {
+    retainObject(id);
+    return persistence.getImmediate(id, clazz);
+  }
+
+  @Override
+  public String realmId() {
+    return persistence.realmId();
+  }
+
+  @Override
+  public MonotonicClock monotonicClock() {
+    return persistence.monotonicClock();
+  }
+
+  @Override
+  public IdGenerator idGenerator() {
+    return persistence.idGenerator();
+  }
+
+  @Override
+  public <V> UpdatableIndex<V> buildWriteIndex(
+      @Nullable IndexContainer<V> indexContainer,
+      @Nonnull IndexValueSerializer<V> indexValueSerializer) {
+    return persistence.buildWriteIndex(indexContainer, indexValueSerializer);
+  }
+
+  @Override
+  public <V> Index<V> buildReadIndex(
+      @Nullable IndexContainer<V> indexContainer,
+      @Nonnull IndexValueSerializer<V> indexValueSerializer) {
+    return persistence.buildReadIndex(indexContainer, indexValueSerializer);
+  }
+
+  @Override
+  public <REF_OBJ extends BaseCommitObj, RESULT> Committer<REF_OBJ, RESULT> 
createCommitter(
+      @Nonnull String refName,
+      @Nonnull Class<REF_OBJ> referencedObjType,
+      @Nonnull Class<RESULT> resultType) {
+    throw new UnsupportedOperationException(
+        "Committing operations not supported during retained-objects 
identification");
+  }
+
+  @Override
+  public Commits commits() {
+    return new Commits() {
+      @Override
+      public <C extends BaseCommitObj> Iterator<C> commitLog(
+          String refName, OptionalLong offset, Class<C> clazz) {
+        checkArgument(
+            offset.isEmpty(), "Commit offset must be empty during 
retained-objects identification");
+
+        var ref = fetchReference(refName);
+
+        return ref.pointer()
+            .map(
+                head ->
+                    (Iterator<C>)
+                        new AbstractIterator<C>() {
+                          private ObjRef next = head;
+
+                          @Override
+                          protected C computeNext() {
+                            if (next == null) {
+                              return endOfData();
+                            }
+                            var r = fetch(next, clazz);
+                            if (r == null) {
+                              return endOfData();
+                            }
+                            next = r.directParent().orElse(null);
+                            return r;
+                          }
+                        })
+            .orElse(emptyIterator());
+      }
+
+      @Override
+      public <C extends BaseCommitObj> Iterator<C> commitLogReversed(
+          String refName, long offset, Class<C> clazz) {
+        throw new UnsupportedOperationException(
+            "Reversed commit scanning not supported during retained-objects 
identification");
+      }
+    };
+  }
+}
diff --git 
a/persistence/nosql/persistence/maintenance/impl/src/main/java/org/apache/polaris/persistence/nosql/maintenance/impl/ScanHandler.java
 
b/persistence/nosql/persistence/maintenance/impl/src/main/java/org/apache/polaris/persistence/nosql/maintenance/impl/ScanHandler.java
new file mode 100644
index 000000000..502ee3492
--- /dev/null
+++ 
b/persistence/nosql/persistence/maintenance/impl/src/main/java/org/apache/polaris/persistence/nosql/maintenance/impl/ScanHandler.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.persistence.nosql.maintenance.impl;
+
+import static 
org.apache.polaris.persistence.nosql.api.Realms.SYSTEM_REALM_PREFIX;
+import static org.apache.polaris.persistence.nosql.api.obj.ObjRef.objRef;
+
+import jakarta.annotation.Nonnull;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.OptionalInt;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.function.LongSupplier;
+import org.apache.polaris.persistence.nosql.api.backend.Backend;
+import org.apache.polaris.persistence.nosql.api.backend.PersistId;
+import org.apache.polaris.persistence.nosql.api.obj.ObjRef;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+final class ScanHandler<I> implements AutoCloseable {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(ScanHandler.class);
+
+  final String name;
+  final RateLimit rateLimit;
+  final long maxCreatedAtMicros;
+  final Set<String> realmsToRetain;
+  final Set<String> realmsToPurge;
+  final Consumer<String> seenRealmsToPurge;
+  final RetainCheck<I> retainCheck;
+  final int deleteBatchSize;
+  final Consumer<Map<String, Set<I>>> batchDelete;
+  final ScanItemCallback<I> itemCallback;
+
+  final Map<String, Set<I>> deletions = new HashMap<>();
+  int numDeletes;
+
+  ScanHandler(
+      String name,
+      OptionalInt rateLimit,
+      long maxCreatedAtMicros,
+      Set<String> realmsToRetain,
+      Set<String> realmsToPurge,
+      Consumer<String> seenRealmsToPurge,
+      RetainCheck<I> retainCheck,
+      int deleteBatchSize,
+      Consumer<Map<String, Set<I>>> batchDelete,
+      ScanItemCallback<I> itemCallback) {
+    this.name = name;
+    this.rateLimit = RateLimit.create(rateLimit.orElse(-1));
+    this.maxCreatedAtMicros = maxCreatedAtMicros;
+    this.realmsToRetain = realmsToRetain;
+    this.realmsToPurge = realmsToPurge;
+    this.seenRealmsToPurge = seenRealmsToPurge;
+    this.retainCheck = retainCheck;
+    this.deleteBatchSize = deleteBatchSize;
+    this.batchDelete = batchDelete;
+    this.itemCallback = itemCallback;
+  }
+
+  void scanned(String realmId, I id, long createdAtMicros) {
+    if (realmId.startsWith(SYSTEM_REALM_PREFIX) && 
!realmsToRetain.contains(realmId)) {
+      // some system realm, ignore
+      return;
+    }
+
+    rateLimit.acquire();
+    ScanItemOutcome outcome;
+    if (realmsToPurge.contains(realmId)) {
+      outcome = ScanItemOutcome.REALM_PURGE;
+      purge(realmId, id);
+      seenRealmsToPurge.accept(realmId);
+    } else if (createdAtMicros > maxCreatedAtMicros) {
+      outcome = ScanItemOutcome.TOO_NEW_RETAINED;
+    } else if (realmsToRetain.contains(realmId)) {
+      if (retainCheck.check(realmId, id)) {
+        outcome = ScanItemOutcome.RETAINED;
+      } else {
+        outcome = ScanItemOutcome.PURGED;
+        purge(realmId, id);
+      }
+    } else {
+      outcome = ScanItemOutcome.UNHANDLED_RETAINED;
+    }
+    itemCallback.itemOutcome(realmId, id, outcome);
+    LOGGER.debug(
+        "Got '{}' {} {} -> {}, createdAtMicros = {}",
+        realmId,
+        name,
+        id,
+        outcome.message,
+        createdAtMicros);
+  }
+
+  private void purge(String realmId, I id) {
+    LOGGER.debug("Enqueuing delete for '{}' {}", realmId, id);
+    deletions.computeIfAbsent(realmId, k -> new HashSet<>()).add(id);
+    numDeletes++;
+    if (numDeletes == deleteBatchSize) {
+      flushDeletes();
+    }
+  }
+
+  private void flushDeletes() {
+    LOGGER.debug("Flushing {} {} deletions", numDeletes, name);
+    batchDelete.accept(deletions);
+    deletions.clear();
+    numDeletes = 0;
+  }
+
+  @Override
+  public void close() {
+    if (numDeletes > 0) {
+      flushDeletes();
+    }
+  }
+
+  public Backend.ObjScanCallback asObjScanCallback(LongSupplier clock) {
+    return new ProgressObjScanCallback(clock);
+  }
+
+  public Backend.ReferenceScanCallback asReferenceScanCallback(LongSupplier 
clock) {
+    return new ProgressReferenceScanCallback(clock);
+  }
+
+  @FunctionalInterface
+  interface RetainCheck<I> {
+    boolean check(String realm, I id);
+  }
+
+  private abstract static class ProgressCallback {
+    private final LongSupplier clock;
+    private long nextLog;
+    private long scanned;
+
+    ProgressCallback(LongSupplier clock) {
+      this.clock = clock;
+      nextLog = clock.getAsLong() + 2_000L;
+    }
+
+    protected void called(String what) {
+      var s = scanned++;
+      var now = clock.getAsLong();
+      if (now >= nextLog) {
+        LOGGER.info("... scanned {} {} so far", s, what);
+        nextLog = now + 2_000L;
+      }
+    }
+  }
+
+  private class ProgressReferenceScanCallback extends ProgressCallback
+      implements Backend.ReferenceScanCallback {
+
+    ProgressReferenceScanCallback(LongSupplier clock) {
+      super(clock);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void call(@Nonnull String realmId, @Nonnull String refName, long 
createdAtMicros) {
+      called("references");
+      ((ScanHandler<String>) ScanHandler.this).scanned(realmId, refName, 
createdAtMicros);
+    }
+  }
+
+  private class ProgressObjScanCallback extends ProgressCallback
+      implements Backend.ObjScanCallback {
+    ProgressObjScanCallback(LongSupplier clock) {
+      super(clock);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void call(
+        @Nonnull String realmId,
+        @Nonnull String type,
+        @Nonnull PersistId id,
+        long createdAtMicros) {
+      called("objects");
+      ((ScanHandler<ObjRef>) ScanHandler.this)
+          .scanned(realmId, objRef(type, id.id(), id.part()), createdAtMicros);
+    }
+  }
+}
diff --git 
a/persistence/nosql/persistence/maintenance/impl/src/main/java/org/apache/polaris/persistence/nosql/maintenance/impl/ScanItemCallback.java
 
b/persistence/nosql/persistence/maintenance/impl/src/main/java/org/apache/polaris/persistence/nosql/maintenance/impl/ScanItemCallback.java
new file mode 100644
index 000000000..8d197cbb5
--- /dev/null
+++ 
b/persistence/nosql/persistence/maintenance/impl/src/main/java/org/apache/polaris/persistence/nosql/maintenance/impl/ScanItemCallback.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.persistence.nosql.maintenance.impl;
+
+import jakarta.annotation.Nonnull;
+
+@FunctionalInterface
+interface ScanItemCallback<I> {
+  void itemOutcome(@Nonnull String realm, @Nonnull I id, @Nonnull 
ScanItemOutcome outcome);
+}
diff --git 
a/persistence/nosql/persistence/maintenance/impl/src/main/java/org/apache/polaris/persistence/nosql/maintenance/impl/ScanItemOutcome.java
 
b/persistence/nosql/persistence/maintenance/impl/src/main/java/org/apache/polaris/persistence/nosql/maintenance/impl/ScanItemOutcome.java
new file mode 100644
index 000000000..f1a1565a2
--- /dev/null
+++ 
b/persistence/nosql/persistence/maintenance/impl/src/main/java/org/apache/polaris/persistence/nosql/maintenance/impl/ScanItemOutcome.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.persistence.nosql.maintenance.impl;
+
+enum ScanItemOutcome {
+  REALM_PURGE("realm purge"),
+  TOO_NEW_RETAINED("too new"),
+  RETAINED("retained"),
+  PURGED("purged"),
+  UNHANDLED_RETAINED("unhandled/retained"),
+  ;
+
+  final String message;
+
+  ScanItemOutcome(String message) {
+    this.message = message;
+  }
+}
diff --git 
a/persistence/nosql/persistence/maintenance/impl/src/main/java/org/apache/polaris/persistence/nosql/maintenance/impl/package-info.java
 
b/persistence/nosql/persistence/maintenance/impl/src/main/java/org/apache/polaris/persistence/nosql/maintenance/impl/package-info.java
new file mode 100644
index 000000000..9247f45ba
--- /dev/null
+++ 
b/persistence/nosql/persistence/maintenance/impl/src/main/java/org/apache/polaris/persistence/nosql/maintenance/impl/package-info.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Maintenance service implementation: do not directly use the types in this 
package.
+ *
+ * <p>Uses bloom filters to "collect" the references and objects to retain. 
The sizing of both
+ * filters uses the values of scanned references/objects of the last 
<em>successful</em> maintenance
+ * run, plus 10%. If no successful maintenance service run is present, the 
values of the maintenance
+ * configuration will be used.
+ */
+package org.apache.polaris.persistence.nosql.maintenance.impl;
diff --git 
a/persistence/nosql/persistence/maintenance/impl/src/main/resources/META-INF/beans.xml
 
b/persistence/nosql/persistence/maintenance/impl/src/main/resources/META-INF/beans.xml
new file mode 100644
index 000000000..a297f1aa5
--- /dev/null
+++ 
b/persistence/nosql/persistence/maintenance/impl/src/main/resources/META-INF/beans.xml
@@ -0,0 +1,24 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~   http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+  -->
+
+<beans xmlns="https://jakarta.ee/xml/ns/jakartaee";
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+  xsi:schemaLocation="https://jakarta.ee/xml/ns/jakartaee 
https://jakarta.ee/xml/ns/jakartaee/beans_4_0.xsd";>
+    <!-- File required by Weld (used for testing), not by Quarkus -->
+</beans>
\ No newline at end of file
diff --git 
a/persistence/nosql/persistence/maintenance/impl/src/main/resources/META-INF/services/org.apache.polaris.persistence.nosql.api.obj.ObjType
 
b/persistence/nosql/persistence/maintenance/impl/src/main/resources/META-INF/services/org.apache.polaris.persistence.nosql.api.obj.ObjType
new file mode 100644
index 000000000..e1050243b
--- /dev/null
+++ 
b/persistence/nosql/persistence/maintenance/impl/src/main/resources/META-INF/services/org.apache.polaris.persistence.nosql.api.obj.ObjType
@@ -0,0 +1,21 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+org.apache.polaris.persistence.nosql.maintenance.impl.MaintenanceRunsObj$MaintenanceRunsObjType
+org.apache.polaris.persistence.nosql.maintenance.impl.MaintenanceRunObj$MaintenanceRunObjType
diff --git 
a/persistence/nosql/persistence/maintenance/impl/src/test/java/org/apache/polaris/persistence/nosql/maintenance/impl/TestMaintenance.java
 
b/persistence/nosql/persistence/maintenance/impl/src/test/java/org/apache/polaris/persistence/nosql/maintenance/impl/TestMaintenance.java
new file mode 100644
index 000000000..131065817
--- /dev/null
+++ 
b/persistence/nosql/persistence/maintenance/impl/src/test/java/org/apache/polaris/persistence/nosql/maintenance/impl/TestMaintenance.java
@@ -0,0 +1,393 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.persistence.nosql.maintenance.impl;
+
+import static org.apache.polaris.persistence.nosql.api.obj.ObjRef.objRef;
+import static 
org.apache.polaris.persistence.nosql.maintenance.impl.MutableMaintenanceConfig.GRACE_TIME;
+
+import jakarta.inject.Inject;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.polaris.ids.api.SnowflakeIdGenerator;
+import org.apache.polaris.ids.mocks.MutableMonotonicClock;
+import org.apache.polaris.persistence.nosql.api.Persistence;
+import org.apache.polaris.persistence.nosql.api.RealmPersistenceFactory;
+import 
org.apache.polaris.persistence.nosql.api.exceptions.ReferenceNotFoundException;
+import org.apache.polaris.persistence.nosql.maintenance.api.MaintenanceConfig;
+import 
org.apache.polaris.persistence.nosql.maintenance.api.MaintenanceRunInformation.MaintenanceStats;
+import org.apache.polaris.persistence.nosql.maintenance.api.MaintenanceRunSpec;
+import org.apache.polaris.persistence.nosql.maintenance.api.MaintenanceService;
+import org.assertj.core.api.SoftAssertions;
+import org.assertj.core.api.junit.jupiter.InjectSoftAssertions;
+import org.assertj.core.api.junit.jupiter.SoftAssertionsExtension;
+import org.jboss.weld.junit5.EnableWeld;
+import org.jboss.weld.junit5.WeldInitiator;
+import org.jboss.weld.junit5.WeldSetup;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+@SuppressWarnings("CdiInjectionPointsInspection")
+@ExtendWith(SoftAssertionsExtension.class)
+@EnableWeld
+public class TestMaintenance {
+  @InjectSoftAssertions protected SoftAssertions soft;
+
+  @WeldSetup WeldInitiator weld = WeldInitiator.performDefaultDiscovery();
+
+  String realmOne;
+  String realmTwo;
+  Persistence persOne;
+  Persistence persTwo;
+
+  @Inject MaintenanceService maintenance;
+  @Inject RealmPersistenceFactory realmPersistenceFactory;
+  @Inject MutableMonotonicClock mutableMonotonicClock;
+
+  @BeforeEach
+  protected void setup() {
+    RealmIdentOne.testCallback = c -> true;
+    RealmIdentTwo.testCallback = c -> true;
+    ObjTypeIdentOne.testCallback = (c, id) -> {};
+    ObjTypeIdentTwo.testCallback = (c, id) -> {};
+
+    // Set the "grace time" to 0 so tests can write refs+objs and get those 
purged
+    MutableMaintenanceConfig.setCurrent(
+        MaintenanceConfig.builder().createdAtGraceTime(GRACE_TIME).build());
+
+    realmOne = UUID.randomUUID().toString();
+    realmTwo = UUID.randomUUID().toString();
+
+    // 'skipDecorators' is used to bypass the cache, which cannot be 
consistent after maintenance
+    // purged some references/objects
+    persOne = 
realmPersistenceFactory.newBuilder().realmId(realmOne).skipDecorators().build();
+    persTwo = 
realmPersistenceFactory.newBuilder().realmId(realmTwo).skipDecorators().build();
+  }
+
+  @AfterEach
+  protected void cleanup() {
+    mutableMonotonicClock.advanceBoth(GRACE_TIME);
+
+    // Use maintenance to clean the backend for the next test
+    maintenance.performMaintenance(
+        MaintenanceRunSpec.builder()
+            .includeSystemRealm(false)
+            .realmsToPurge(Set.of(realmOne, realmTwo))
+            .build());
+  }
+
+  @Test
+  public void noRealmsSpecified() {
+    
persOne.write(ObjOne.builder().text("foo").id(persOne.generateId()).build(), 
ObjOne.class);
+    
persTwo.write(ObjTwo.builder().text("bar").id(persTwo.generateId()).build(), 
ObjTwo.class);
+
+    persOne.createReference("ref1", Optional.empty());
+    persTwo.createReference("ref1", Optional.empty());
+
+    mutableMonotonicClock.advanceBoth(GRACE_TIME);
+
+    // Run maintenance, no realm given to retain or purge, must not purge 
anything
+    var runInfo =
+        maintenance.performMaintenance(
+            MaintenanceRunSpec.builder().includeSystemRealm(false).build());
+
+    soft.assertThat(runInfo.referenceStats())
+        
.contains(MaintenanceStats.builder().scanned(2L).purged(0L).retained(2L).newer(0L).build());
+    soft.assertThat(runInfo.objStats())
+        
.contains(MaintenanceStats.builder().scanned(2L).purged(0L).retained(2L).newer(0L).build());
+    soft.assertThat(runInfo.identifiedReferences().orElse(-1)).isEqualTo(0L);
+    soft.assertThat(runInfo.identifiedObjs().orElse(-1)).isEqualTo(0L);
+  }
+
+  @Test
+  public void simple() {
+    var rOneObj1 =
+        
persOne.write(ObjOne.builder().text("foo").id(persOne.generateId()).build(), 
ObjOne.class);
+    var rTwoObj2 =
+        
persTwo.write(ObjTwo.builder().text("bar").id(persTwo.generateId()).build(), 
ObjTwo.class);
+
+    var systemRealmCalled = new AtomicInteger();
+    RealmIdentOne.testCallback =
+        collector -> {
+          if (collector.isSystemRealm()) {
+            systemRealmCalled.incrementAndGet();
+          }
+          return true;
+        };
+
+    persOne.createReference("ref1", Optional.empty());
+    persOne.createReference("ref2", Optional.empty());
+    persTwo.createReference("ref1", Optional.empty());
+    persTwo.createReference("ref2", Optional.empty());
+
+    mutableMonotonicClock.advanceBoth(GRACE_TIME);
+
+    // Run maintenance, provide realms, must purge unidentified
+    var runInfo =
+        maintenance.performMaintenance(
+            MaintenanceRunSpec.builder()
+                .includeSystemRealm(false)
+                .realmsToProcess(Set.of(realmOne, realmTwo))
+                .build());
+
+    soft.assertThat(runInfo.referenceStats())
+        
.contains(MaintenanceStats.builder().scanned(4L).purged(4L).retained(0L).newer(0L).build());
+    soft.assertThat(runInfo.objStats())
+        
.contains(MaintenanceStats.builder().scanned(2L).purged(2L).retained(0L).newer(0L).build());
+    soft.assertThat(runInfo.identifiedReferences().orElse(-1)).isEqualTo(0L);
+    soft.assertThat(runInfo.identifiedObjs().orElse(-1)).isEqualTo(0L);
+
+    soft.assertThatExceptionOfType(ReferenceNotFoundException.class)
+        .isThrownBy(() -> persOne.fetchReference("ref1"));
+    soft.assertThatExceptionOfType(ReferenceNotFoundException.class)
+        .isThrownBy(() -> persOne.fetchReference("ref2"));
+    soft.assertThatExceptionOfType(ReferenceNotFoundException.class)
+        .isThrownBy(() -> persTwo.fetchReference("ref1"));
+    soft.assertThatExceptionOfType(ReferenceNotFoundException.class)
+        .isThrownBy(() -> persTwo.fetchReference("ref2"));
+
+    soft.assertThat(persOne.fetch(objRef(rOneObj1), ObjOne.class)).isNull();
+    soft.assertThat(persTwo.fetch(objRef(rTwoObj2), ObjTwo.class)).isNull();
+  }
+
+  @Test
+  public void systemRealm() {
+    var systemRealmCalled = new AtomicInteger();
+    RealmIdentOne.testCallback =
+        collector -> {
+          if (collector.isSystemRealm()) {
+            systemRealmCalled.incrementAndGet();
+          }
+          return true;
+        };
+
+    mutableMonotonicClock.advanceBoth(GRACE_TIME);
+
+    // Run maintenance, provide realms, must purge unidentified
+    var runInfo =
+        maintenance.performMaintenance(
+            MaintenanceRunSpec.builder()
+                .includeSystemRealm(true) // default
+                .build());
+
+    soft.assertThat(systemRealmCalled).hasValue(1);
+
+    soft.assertThat(runInfo.referenceStats())
+        
.contains(MaintenanceStats.builder().scanned(1L).purged(0L).retained(0L).newer(1L).build());
+    soft.assertThat(runInfo.objStats())
+        
.contains(MaintenanceStats.builder().scanned(4L).purged(0L).retained(0L).newer(4L).build());
+    soft.assertThat(runInfo.identifiedReferences().orElse(-1)).isEqualTo(2L);
+    soft.assertThat(runInfo.identifiedObjs().orElse(-1))
+        .isEqualTo((1 << SnowflakeIdGenerator.DEFAULT_NODE_ID_BITS) + 4L);
+  }
+
+  @Test
+  public void simpleRetainViaRealmIdentifier() {
+    var rOneObj1 =
+        
persOne.write(ObjOne.builder().text("foo").id(persOne.generateId()).build(), 
ObjOne.class);
+    var rTwoObj2 =
+        
persTwo.write(ObjTwo.builder().text("bar").id(persTwo.generateId()).build(), 
ObjTwo.class);
+
+    persOne.createReference("ref1", Optional.empty());
+    persOne.createReference("ref2", Optional.empty());
+    persTwo.createReference("ref1", Optional.empty());
+    persTwo.createReference("ref2", Optional.empty());
+
+    // identify rOneObj1 as "live"
+    RealmIdentOne.testCallback =
+        c -> {
+          if (c.realm().equals(realmOne)) {
+            c.retainObject(objRef(rOneObj1));
+            c.retainReference("ref1");
+          }
+          return true;
+        };
+
+    mutableMonotonicClock.advanceBoth(GRACE_TIME);
+
+    // Run maintenance, provide realms, must purge unidentified
+    var runInfo =
+        maintenance.performMaintenance(
+            MaintenanceRunSpec.builder()
+                .includeSystemRealm(false)
+                .realmsToProcess(Set.of(realmOne, realmTwo))
+                .build());
+
+    soft.assertThat(runInfo.referenceStats())
+        
.contains(MaintenanceStats.builder().scanned(4L).purged(3L).retained(1L).newer(0L).build());
+    soft.assertThat(runInfo.objStats())
+        
.contains(MaintenanceStats.builder().scanned(2L).purged(1L).retained(1L).newer(0L).build());
+    soft.assertThat(runInfo.identifiedReferences().orElse(-1)).isEqualTo(1L);
+    soft.assertThat(runInfo.identifiedObjs().orElse(-1)).isEqualTo(1L);
+
+    soft.assertThatCode(() -> 
persOne.fetchReference("ref1")).doesNotThrowAnyException();
+    soft.assertThatExceptionOfType(ReferenceNotFoundException.class)
+        .isThrownBy(() -> persOne.fetchReference("ref2"));
+    soft.assertThatExceptionOfType(ReferenceNotFoundException.class)
+        .isThrownBy(() -> persTwo.fetchReference("ref1"));
+    soft.assertThatExceptionOfType(ReferenceNotFoundException.class)
+        .isThrownBy(() -> persTwo.fetchReference("ref2"));
+
+    soft.assertThat(persOne.fetch(objRef(rOneObj1), 
ObjOne.class)).isEqualTo(rOneObj1);
+    soft.assertThat(persTwo.fetch(objRef(rTwoObj2), ObjTwo.class)).isNull();
+  }
+
+  @Test
+  public void simpleRetainViaRealmIdentifierPersistence() {
+    var rOneObj1 =
+        
persOne.write(ObjOne.builder().text("foo").id(persOne.generateId()).build(), 
ObjOne.class);
+    var rTwoObj2 =
+        
persTwo.write(ObjTwo.builder().text("bar").id(persTwo.generateId()).build(), 
ObjTwo.class);
+
+    persOne.createReference("ref1", Optional.empty());
+    persOne.createReference("ref2", Optional.empty());
+    persTwo.createReference("ref1", Optional.empty());
+    persTwo.createReference("ref2", Optional.empty());
+
+    // identify rOneObj1 as "live"
+    RealmIdentOne.testCallback =
+        c -> {
+          if (c.realm().equals(realmOne)) {
+            c.realmPersistence().fetch(objRef(rOneObj1), ObjOne.class);
+            c.realmPersistence().fetchReference("ref1");
+          }
+          return true;
+        };
+
+    mutableMonotonicClock.advanceBoth(GRACE_TIME);
+
+    // Run maintenance, provide realms, must purge unidentified
+    var runInfo =
+        maintenance.performMaintenance(
+            MaintenanceRunSpec.builder()
+                .includeSystemRealm(false)
+                .realmsToProcess(Set.of(realmOne, realmTwo))
+                .build());
+
+    soft.assertThat(runInfo.referenceStats())
+        
.contains(MaintenanceStats.builder().scanned(4L).purged(3L).retained(1L).newer(0L).build());
+    soft.assertThat(runInfo.objStats())
+        
.contains(MaintenanceStats.builder().scanned(2L).purged(1L).retained(1L).newer(0L).build());
+    soft.assertThat(runInfo.identifiedReferences().orElse(-1)).isEqualTo(1L);
+    soft.assertThat(runInfo.identifiedObjs().orElse(-1)).isEqualTo(1L);
+
+    soft.assertThatCode(() -> 
persOne.fetchReference("ref1")).doesNotThrowAnyException();
+    soft.assertThatExceptionOfType(ReferenceNotFoundException.class)
+        .isThrownBy(() -> persOne.fetchReference("ref2"));
+    soft.assertThatExceptionOfType(ReferenceNotFoundException.class)
+        .isThrownBy(() -> persTwo.fetchReference("ref1"));
+    soft.assertThatExceptionOfType(ReferenceNotFoundException.class)
+        .isThrownBy(() -> persTwo.fetchReference("ref2"));
+
+    soft.assertThat(persOne.fetch(objRef(rOneObj1), 
ObjOne.class)).isEqualTo(rOneObj1);
+    soft.assertThat(persTwo.fetch(objRef(rTwoObj2), ObjTwo.class)).isNull();
+  }
+
+  @Test
+  public void simpleRetainViaObjTypeIdentifier() {
+    var rOneObj1 =
+        
persOne.write(ObjOne.builder().text("foo1").id(persOne.generateId()).build(), 
ObjOne.class);
+    var rOneObj2 =
+        
persOne.write(ObjTwo.builder().text("foo2").id(persOne.generateId()).build(), 
ObjTwo.class);
+    var rTwoObj1 =
+        
persTwo.write(ObjOne.builder().text("bar2").id(persTwo.generateId()).build(), 
ObjOne.class);
+    var rTwoObj2 =
+        
persTwo.write(ObjTwo.builder().text("bar2").id(persTwo.generateId()).build(), 
ObjTwo.class);
+
+    // identify rOneObj1 as "live"
+    RealmIdentOne.testCallback =
+        c -> {
+          c.retainObject(objRef(rOneObj1));
+          return true;
+        };
+    // identify rObjObj2 via obj-type callback
+    ObjTypeIdentOne.testCallback =
+        (c, id) -> {
+          if (id.equals(objRef(rOneObj1))) {
+            c.retainObject(objRef(rOneObj2));
+          }
+        };
+    // identify rTwoObj1
+    RealmIdentTwo.testCallback =
+        c -> {
+          c.retainObject(objRef(rTwoObj1));
+          return true;
+        };
+
+    mutableMonotonicClock.advanceBoth(GRACE_TIME);
+
+    // Run maintenance, provide realms, must purge unidentified
+    var runInfo =
+        maintenance.performMaintenance(
+            MaintenanceRunSpec.builder()
+                .includeSystemRealm(false)
+                .realmsToProcess(Set.of(realmOne, realmTwo))
+                .build());
+
+    soft.assertThat(runInfo.referenceStats())
+        
.contains(MaintenanceStats.builder().scanned(0L).purged(0L).retained(0L).newer(0L).build());
+    soft.assertThat(runInfo.objStats())
+        
.contains(MaintenanceStats.builder().scanned(4L).purged(1L).retained(3L).newer(0L).build());
+    soft.assertThat(runInfo.identifiedReferences().orElse(-1)).isEqualTo(0L);
+    soft.assertThat(runInfo.identifiedObjs().orElse(-1)).isEqualTo(6L);
+
+    soft.assertThat(persOne.fetch(objRef(rOneObj1), 
ObjOne.class)).isEqualTo(rOneObj1);
+    soft.assertThat(persOne.fetch(objRef(rOneObj2), 
ObjTwo.class)).isEqualTo(rOneObj2);
+    soft.assertThat(persTwo.fetch(objRef(rTwoObj1), 
ObjOne.class)).isEqualTo(rTwoObj1);
+    soft.assertThat(persTwo.fetch(objRef(rTwoObj2), ObjTwo.class)).isNull();
+  }
+
+  @Test
+  public void noRealmIdentifierHandlesRealms() {
+    var rOneObj1 =
+        
persOne.write(ObjOne.builder().text("foo").id(persOne.generateId()).build(), 
ObjOne.class);
+    var rTwoObj2 =
+        
persTwo.write(ObjTwo.builder().text("bar").id(persTwo.generateId()).build(), 
ObjTwo.class);
+    persOne.createReference("ref1", Optional.empty());
+    persOne.createReference("ref2", Optional.empty());
+    persTwo.createReference("ref1", Optional.empty());
+    persTwo.createReference("ref2", Optional.empty());
+
+    RealmIdentOne.testCallback = c -> false;
+    RealmIdentTwo.testCallback = c -> false;
+
+    mutableMonotonicClock.advanceBoth(GRACE_TIME);
+
+    // Run maintenance, provide realms, no realm-identifier handles realm, 
must NOT purge
+    var runInfo =
+        maintenance.performMaintenance(
+            MaintenanceRunSpec.builder()
+                .includeSystemRealm(false)
+                .realmsToProcess(Set.of(realmOne, realmTwo))
+                .build());
+
+    soft.assertThat(runInfo.referenceStats())
+        
.contains(MaintenanceStats.builder().scanned(4L).purged(0L).retained(4L).newer(0L).build());
+    soft.assertThat(runInfo.objStats())
+        
.contains(MaintenanceStats.builder().scanned(2L).purged(0L).retained(2L).newer(0L).build());
+    soft.assertThat(runInfo.identifiedReferences().orElse(-1)).isEqualTo(0L);
+    soft.assertThat(runInfo.identifiedObjs().orElse(-1)).isEqualTo(0L);
+
+    soft.assertThat(persOne.fetch(objRef(rOneObj1), 
ObjOne.class)).isEqualTo(rOneObj1);
+    soft.assertThat(persTwo.fetch(objRef(rTwoObj2), 
ObjTwo.class)).isEqualTo(rTwoObj2);
+  }
+}
diff --git 
a/persistence/nosql/persistence/maintenance/impl/src/test/resources/weld.properties
 
b/persistence/nosql/persistence/maintenance/impl/src/test/resources/weld.properties
new file mode 100644
index 000000000..c26169e0e
--- /dev/null
+++ 
b/persistence/nosql/persistence/maintenance/impl/src/test/resources/weld.properties
@@ -0,0 +1,21 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# See https://bugs.openjdk.org/browse/JDK-8349545
+org.jboss.weld.bootstrap.concurrentDeployment=false
diff --git 
a/persistence/nosql/persistence/maintenance/impl/src/testFixtures/java/org/apache/polaris/persistence/nosql/maintenance/impl/MaintenanceConfigurationProducer.java
 
b/persistence/nosql/persistence/maintenance/impl/src/testFixtures/java/org/apache/polaris/persistence/nosql/maintenance/impl/MaintenanceConfigurationProducer.java
new file mode 100644
index 000000000..3e283e955
--- /dev/null
+++ 
b/persistence/nosql/persistence/maintenance/impl/src/testFixtures/java/org/apache/polaris/persistence/nosql/maintenance/impl/MaintenanceConfigurationProducer.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.persistence.nosql.maintenance.impl;
+
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.enterprise.inject.Produces;
+import org.apache.polaris.persistence.nosql.maintenance.api.MaintenanceConfig;
+
+@ApplicationScoped
+public class MaintenanceConfigurationProducer {
+  public static MutableMaintenanceConfig config = new 
MutableMaintenanceConfig();
+
+  @Produces
+  MaintenanceConfig produceMaintenanceConfig() {
+    return config;
+  }
+}
diff --git 
a/persistence/nosql/persistence/maintenance/impl/src/testFixtures/java/org/apache/polaris/persistence/nosql/maintenance/impl/MutableMaintenanceConfig.java
 
b/persistence/nosql/persistence/maintenance/impl/src/testFixtures/java/org/apache/polaris/persistence/nosql/maintenance/impl/MutableMaintenanceConfig.java
new file mode 100644
index 000000000..d2bbc31ad
--- /dev/null
+++ 
b/persistence/nosql/persistence/maintenance/impl/src/testFixtures/java/org/apache/polaris/persistence/nosql/maintenance/impl/MutableMaintenanceConfig.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.persistence.nosql.maintenance.impl;
+
+import static 
org.apache.polaris.persistence.nosql.maintenance.impl.MaintenanceServiceImpl.MIN_GRACE_TIME_MINUTES;
+
+import com.fasterxml.jackson.annotation.JsonFormat;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import java.time.Duration;
+import java.util.Optional;
+import java.util.OptionalDouble;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
+import org.apache.polaris.persistence.nosql.maintenance.api.MaintenanceConfig;
+
+public class MutableMaintenanceConfig implements MaintenanceConfig {
+  /** Minimum allowed by MaintenanceServiceImpl. */
+  public static final Duration GRACE_TIME = 
Duration.ofMinutes(MIN_GRACE_TIME_MINUTES);
+
+  private static MaintenanceConfig current = 
MaintenanceConfig.builder().build();
+
+  public static void setCurrent(MaintenanceConfig config) {
+    current = config;
+  }
+
+  @JsonInclude(JsonInclude.Include.NON_ABSENT)
+  @Override
+  public OptionalLong expectedReferenceCount() {
+    return current.expectedReferenceCount();
+  }
+
+  @JsonInclude(JsonInclude.Include.NON_ABSENT)
+  @Override
+  public OptionalLong expectedObjCount() {
+    return current.expectedObjCount();
+  }
+
+  @JsonInclude(JsonInclude.Include.NON_ABSENT)
+  @Override
+  public OptionalDouble countFromLastRunMultiplier() {
+    return current.countFromLastRunMultiplier();
+  }
+
+  @JsonInclude(JsonInclude.Include.NON_ABSENT)
+  @Override
+  public OptionalDouble filterInitializedFpp() {
+    return current.filterInitializedFpp();
+  }
+
+  @JsonInclude(JsonInclude.Include.NON_ABSENT)
+  @Override
+  public OptionalDouble maxAcceptableFilterFpp() {
+    return current.maxAcceptableFilterFpp();
+  }
+
+  @JsonInclude(JsonInclude.Include.NON_ABSENT)
+  @Override
+  public OptionalInt retainedRuns() {
+    return current.retainedRuns();
+  }
+
+  @JsonInclude(JsonInclude.Include.NON_ABSENT)
+  @JsonFormat(shape = JsonFormat.Shape.STRING)
+  @Override
+  public Optional<Duration> createdAtGraceTime() {
+    return current.createdAtGraceTime();
+  }
+
+  @JsonInclude(JsonInclude.Include.NON_ABSENT)
+  @Override
+  public OptionalInt objectScanRateLimitPerSecond() {
+    return current.objectScanRateLimitPerSecond();
+  }
+
+  @JsonInclude(JsonInclude.Include.NON_ABSENT)
+  @Override
+  public OptionalInt referenceScanRateLimitPerSecond() {
+    return current.referenceScanRateLimitPerSecond();
+  }
+
+  @JsonInclude(JsonInclude.Include.NON_ABSENT)
+  @Override
+  public OptionalInt deleteBatchSize() {
+    return current.deleteBatchSize();
+  }
+}
diff --git 
a/persistence/nosql/persistence/maintenance/impl/src/testFixtures/java/org/apache/polaris/persistence/nosql/maintenance/impl/ObjOne.java
 
b/persistence/nosql/persistence/maintenance/impl/src/testFixtures/java/org/apache/polaris/persistence/nosql/maintenance/impl/ObjOne.java
new file mode 100644
index 000000000..72afcc076
--- /dev/null
+++ 
b/persistence/nosql/persistence/maintenance/impl/src/testFixtures/java/org/apache/polaris/persistence/nosql/maintenance/impl/ObjOne.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.persistence.nosql.maintenance.impl;
+
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import jakarta.annotation.Nullable;
+import org.apache.polaris.immutables.PolarisImmutable;
+import org.apache.polaris.persistence.nosql.api.obj.AbstractObjType;
+import org.apache.polaris.persistence.nosql.api.obj.Obj;
+import org.apache.polaris.persistence.nosql.api.obj.ObjType;
+
+@PolarisImmutable
+@JsonSerialize(as = ImmutableObjOne.class)
+@JsonDeserialize(as = ImmutableObjOne.class)
+public interface ObjOne extends Obj {
+
+  ObjType TYPE = new ObjOneType();
+
+  @Override
+  default ObjType type() {
+    return TYPE;
+  }
+
+  @Nullable
+  String text();
+
+  static ImmutableObjOne.Builder builder() {
+    return ImmutableObjOne.builder();
+  }
+
+  final class ObjOneType extends AbstractObjType<ObjOne> {
+    public ObjOneType() {
+      super("maint-test-one", "maint-one", ObjOne.class);
+    }
+  }
+}
diff --git 
a/persistence/nosql/persistence/maintenance/impl/src/testFixtures/java/org/apache/polaris/persistence/nosql/maintenance/impl/ObjTwo.java
 
b/persistence/nosql/persistence/maintenance/impl/src/testFixtures/java/org/apache/polaris/persistence/nosql/maintenance/impl/ObjTwo.java
new file mode 100644
index 000000000..571e5d20c
--- /dev/null
+++ 
b/persistence/nosql/persistence/maintenance/impl/src/testFixtures/java/org/apache/polaris/persistence/nosql/maintenance/impl/ObjTwo.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.persistence.nosql.maintenance.impl;
+
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import jakarta.annotation.Nullable;
+import org.apache.polaris.immutables.PolarisImmutable;
+import org.apache.polaris.persistence.nosql.api.obj.AbstractObjType;
+import org.apache.polaris.persistence.nosql.api.obj.Obj;
+import org.apache.polaris.persistence.nosql.api.obj.ObjType;
+
+@PolarisImmutable
+@JsonSerialize(as = ImmutableObjTwo.class)
+@JsonDeserialize(as = ImmutableObjTwo.class)
+public interface ObjTwo extends Obj {
+
+  ObjType TYPE = new ObjTwoType();
+
+  @Override
+  default ObjType type() {
+    return TYPE;
+  }
+
+  @Nullable
+  String text();
+
+  static ImmutableObjTwo.Builder builder() {
+    return ImmutableObjTwo.builder();
+  }
+
+  final class ObjTwoType extends AbstractObjType<ObjTwo> {
+    public ObjTwoType() {
+      super("maint-test-two", "maint-two", ObjTwo.class);
+    }
+  }
+}
diff --git 
a/persistence/nosql/persistence/maintenance/impl/src/testFixtures/java/org/apache/polaris/persistence/nosql/maintenance/impl/ObjTypeIdentOne.java
 
b/persistence/nosql/persistence/maintenance/impl/src/testFixtures/java/org/apache/polaris/persistence/nosql/maintenance/impl/ObjTypeIdentOne.java
new file mode 100644
index 000000000..be9ef1cd3
--- /dev/null
+++ 
b/persistence/nosql/persistence/maintenance/impl/src/testFixtures/java/org/apache/polaris/persistence/nosql/maintenance/impl/ObjTypeIdentOne.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.persistence.nosql.maintenance.impl;
+
+import jakarta.annotation.Nonnull;
+import jakarta.enterprise.context.ApplicationScoped;
+import java.util.function.BiConsumer;
+import org.apache.polaris.persistence.nosql.api.obj.ObjRef;
+import org.apache.polaris.persistence.nosql.api.obj.ObjType;
+import 
org.apache.polaris.persistence.nosql.maintenance.spi.ObjTypeRetainedIdentifier;
+import org.apache.polaris.persistence.nosql.maintenance.spi.RetainedCollector;
+
+@ApplicationScoped
+public class ObjTypeIdentOne implements ObjTypeRetainedIdentifier {
+
+  static BiConsumer<RetainedCollector, ObjRef> testCallback;
+
+  @Override
+  public String name() {
+    return "TEST ObjTypeRetainedIdentifier ONE";
+  }
+
+  @Nonnull
+  @Override
+  public ObjType handledObjType() {
+    return ObjOne.TYPE;
+  }
+
+  @Override
+  public void identifyRelatedObj(@Nonnull RetainedCollector collector, 
@Nonnull ObjRef objRef) {
+    if (testCallback != null) {
+      testCallback.accept(collector, objRef);
+    }
+  }
+}
diff --git 
a/persistence/nosql/persistence/maintenance/impl/src/testFixtures/java/org/apache/polaris/persistence/nosql/maintenance/impl/ObjTypeIdentTwo.java
 
b/persistence/nosql/persistence/maintenance/impl/src/testFixtures/java/org/apache/polaris/persistence/nosql/maintenance/impl/ObjTypeIdentTwo.java
new file mode 100644
index 000000000..9052a3882
--- /dev/null
+++ 
b/persistence/nosql/persistence/maintenance/impl/src/testFixtures/java/org/apache/polaris/persistence/nosql/maintenance/impl/ObjTypeIdentTwo.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.persistence.nosql.maintenance.impl;
+
+import jakarta.annotation.Nonnull;
+import jakarta.enterprise.context.ApplicationScoped;
+import java.util.function.BiConsumer;
+import org.apache.polaris.persistence.nosql.api.obj.ObjRef;
+import org.apache.polaris.persistence.nosql.api.obj.ObjType;
+import 
org.apache.polaris.persistence.nosql.maintenance.spi.ObjTypeRetainedIdentifier;
+import org.apache.polaris.persistence.nosql.maintenance.spi.RetainedCollector;
+
+@ApplicationScoped
+public class ObjTypeIdentTwo implements ObjTypeRetainedIdentifier {
+  static BiConsumer<RetainedCollector, ObjRef> testCallback;
+
+  @Override
+  public String name() {
+    return "TEST ObjTypeRetainedIdentifier TWO";
+  }
+
+  @Nonnull
+  @Override
+  public ObjType handledObjType() {
+    return ObjTwo.TYPE;
+  }
+
+  @Override
+  public void identifyRelatedObj(@Nonnull RetainedCollector collector, 
@Nonnull ObjRef objRef) {
+    if (testCallback != null) {
+      testCallback.accept(collector, objRef);
+    }
+  }
+}
diff --git 
a/persistence/nosql/persistence/maintenance/impl/src/testFixtures/java/org/apache/polaris/persistence/nosql/maintenance/impl/RealmIdentOne.java
 
b/persistence/nosql/persistence/maintenance/impl/src/testFixtures/java/org/apache/polaris/persistence/nosql/maintenance/impl/RealmIdentOne.java
new file mode 100644
index 000000000..229aa2ac4
--- /dev/null
+++ 
b/persistence/nosql/persistence/maintenance/impl/src/testFixtures/java/org/apache/polaris/persistence/nosql/maintenance/impl/RealmIdentOne.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.persistence.nosql.maintenance.impl;
+
+import jakarta.annotation.Nonnull;
+import jakarta.enterprise.context.ApplicationScoped;
+import java.util.function.Function;
+import 
org.apache.polaris.persistence.nosql.maintenance.spi.PerRealmRetainedIdentifier;
+import org.apache.polaris.persistence.nosql.maintenance.spi.RetainedCollector;
+
+@ApplicationScoped
+public class RealmIdentOne implements PerRealmRetainedIdentifier {
+  static Function<RetainedCollector, Boolean> testCallback;
+
+  @Override
+  public String name() {
+    return "TEST RealmRetainedIdentifier ONE";
+  }
+
+  @Override
+  public boolean identifyRetained(@Nonnull RetainedCollector collector) {
+    return testCallback != null ? testCallback.apply(collector) : false;
+  }
+}
diff --git 
a/persistence/nosql/persistence/maintenance/impl/src/testFixtures/java/org/apache/polaris/persistence/nosql/maintenance/impl/RealmIdentTwo.java
 
b/persistence/nosql/persistence/maintenance/impl/src/testFixtures/java/org/apache/polaris/persistence/nosql/maintenance/impl/RealmIdentTwo.java
new file mode 100644
index 000000000..22f2a61db
--- /dev/null
+++ 
b/persistence/nosql/persistence/maintenance/impl/src/testFixtures/java/org/apache/polaris/persistence/nosql/maintenance/impl/RealmIdentTwo.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.persistence.nosql.maintenance.impl;
+
+import jakarta.annotation.Nonnull;
+import jakarta.enterprise.context.ApplicationScoped;
+import java.util.function.Function;
+import 
org.apache.polaris.persistence.nosql.maintenance.spi.PerRealmRetainedIdentifier;
+import org.apache.polaris.persistence.nosql.maintenance.spi.RetainedCollector;
+
+@ApplicationScoped
+public class RealmIdentTwo implements PerRealmRetainedIdentifier {
+  static Function<RetainedCollector, Boolean> testCallback;
+
+  @Override
+  public String name() {
+    return "TEST RealmRetainedIdentifier TWO";
+  }
+
+  @Override
+  public boolean identifyRetained(@Nonnull RetainedCollector collector) {
+    return testCallback != null ? testCallback.apply(collector) : false;
+  }
+}
diff --git 
a/persistence/nosql/persistence/maintenance/impl/src/testFixtures/java/org/apache/polaris/persistence/nosql/maintenance/impl/package-info.java
 
b/persistence/nosql/persistence/maintenance/impl/src/testFixtures/java/org/apache/polaris/persistence/nosql/maintenance/impl/package-info.java
new file mode 100644
index 000000000..b486d52fa
--- /dev/null
+++ 
b/persistence/nosql/persistence/maintenance/impl/src/testFixtures/java/org/apache/polaris/persistence/nosql/maintenance/impl/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.persistence.nosql.maintenance.impl;
diff --git 
a/persistence/nosql/persistence/maintenance/impl/src/testFixtures/resources/META-INF/beans.xml
 
b/persistence/nosql/persistence/maintenance/impl/src/testFixtures/resources/META-INF/beans.xml
new file mode 100644
index 000000000..a297f1aa5
--- /dev/null
+++ 
b/persistence/nosql/persistence/maintenance/impl/src/testFixtures/resources/META-INF/beans.xml
@@ -0,0 +1,24 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~   http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+  -->
+
+<beans xmlns="https://jakarta.ee/xml/ns/jakartaee";
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+  xsi:schemaLocation="https://jakarta.ee/xml/ns/jakartaee 
https://jakarta.ee/xml/ns/jakartaee/beans_4_0.xsd";>
+    <!-- File required by Weld (used for testing), not by Quarkus -->
+</beans>
\ No newline at end of file
diff --git 
a/persistence/nosql/persistence/maintenance/impl/src/testFixtures/resources/META-INF/services/org.apache.polaris.persistence.nosql.api.obj.ObjType
 
b/persistence/nosql/persistence/maintenance/impl/src/testFixtures/resources/META-INF/services/org.apache.polaris.persistence.nosql.api.obj.ObjType
new file mode 100644
index 000000000..c2c5d5f95
--- /dev/null
+++ 
b/persistence/nosql/persistence/maintenance/impl/src/testFixtures/resources/META-INF/services/org.apache.polaris.persistence.nosql.api.obj.ObjType
@@ -0,0 +1,21 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+org.apache.polaris.persistence.nosql.maintenance.impl.ObjOne$ObjOneType
+org.apache.polaris.persistence.nosql.maintenance.impl.ObjTwo$ObjTwoType

Reply via email to