This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch 3.9
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.9 by this push:
new 542bb9d4aaa KAFKA-18063: SnapshotRegistry should not leak memory (#17898)
542bb9d4aaa is described below
commit 542bb9d4aaad716d18fb36b3b1c704aa7fb2001e
Author: Colin Patrick McCabe <[email protected]>
AuthorDate: Thu Nov 21 13:27:53 2024 -0800
KAFKA-18063: SnapshotRegistry should not leak memory (#17898)
SnapshotRegistry needs to have a reference to all snapshot data structures. However, this should
not be a strong reference, but a weak reference, so that these data structures can be garbage
collected as needed. This PR also adds a scrub mechanism so that we can eventually reclaim the
slots used by GC'ed Revertable objects in the SnapshotRegistry.revertables array.
Reviewers: David Jacot <[email protected]>
---
.../apache/kafka/timeline/SnapshotRegistry.java | 87 +++++++++++++++++++---
.../kafka/timeline/SnapshotRegistryTest.java | 24 ++++++
2 files changed, 101 insertions(+), 10 deletions(-)
diff --git a/server-common/src/main/java/org/apache/kafka/timeline/SnapshotRegistry.java b/server-common/src/main/java/org/apache/kafka/timeline/SnapshotRegistry.java
index b35670a0bcc..c2b950f7fbe 100644
--- a/server-common/src/main/java/org/apache/kafka/timeline/SnapshotRegistry.java
+++ b/server-common/src/main/java/org/apache/kafka/timeline/SnapshotRegistry.java
@@ -21,17 +21,18 @@ import org.apache.kafka.common.utils.LogContext;
import org.slf4j.Logger;
+import java.lang.ref.WeakReference;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;
-
/**
- * A registry containing snapshots of timeline data structures.
- * We generally expect a small number of snapshots-- perhaps 1 or 2 at a time.
- * Therefore, we use ArrayLists here rather than a data structure with higher
overhead.
+ * A registry containing snapshots of timeline data structures. All timeline
data structures must
+ * be registered here, so that they can be reverted to the expected state when
desired.
+ * Because the registry only keeps a weak reference to each timeline data
structure, it does not
+ * prevent them from being garbage collected.
*/
public class SnapshotRegistry {
public static final long LATEST_EPOCH = Long.MAX_VALUE;
@@ -107,12 +108,39 @@ public class SnapshotRegistry {
private final Snapshot head = new Snapshot(Long.MIN_VALUE);
/**
- * Collection of all Revertable registered with this registry
+ * A collection of all Revertable objects registered here. Since we store
only weak
+ * references, every time we access a revertable through this list, we
must check to
+ * see if it has been garbage collected. If so, WeakReference.get will
return null.
+ *
+ * Although the garbage collector handles freeing the underlying
Revertables, over
+ * time slots in the ArrayList will fill up with expired references.
Therefore, after
+ * enough registrations, we scrub the ArrayList of the expired references
by creating
+ * a new arraylist.
*/
- private final List<Revertable> revertables = new ArrayList<>();
+ private List<WeakReference<Revertable>> revertables = new ArrayList<>();
+
+ /**
+ * The maximum number of registrations to allow before we compact the
revertable list.
+ */
+ private final int maxRegistrationsSinceScrub;
+
+ /**
+ * The number of registrations we have done since removing all expired
weak references.
+ */
+ private int numRegistrationsSinceScrub = 0;
+
+ /**
+ * The number of scrubs that we have done.
+ */
+ private long numScrubs = 0;
public SnapshotRegistry(LogContext logContext) {
+ this(logContext, 10_000);
+ }
+
+ public SnapshotRegistry(LogContext logContext, int
maxRegistrationsSinceScrub) {
this.log = logContext.logger(SnapshotRegistry.class);
+ this.maxRegistrationsSinceScrub = maxRegistrationsSinceScrub;
}
/**
@@ -271,21 +299,60 @@ public class SnapshotRegistry {
return head.prev().epoch();
}
+ /**
+ * Return the number of scrub operations that we have done.
+ */
+ public long numScrubs() {
+ return numScrubs;
+ }
+
/**
* Associate a revertable with this registry.
*/
public void register(Revertable revertable) {
- revertables.add(revertable);
+ numRegistrationsSinceScrub++;
+ if (numRegistrationsSinceScrub > maxRegistrationsSinceScrub) {
+ scrub();
+ }
+ revertables.add(new WeakReference<>(revertable));
+ }
+
+ /**
+ * Remove all expired weak references from the revertable list.
+ */
+ void scrub() {
+ ArrayList<WeakReference<Revertable>> newRevertables =
+ new ArrayList<>(revertables.size() / 2);
+ for (WeakReference<Revertable> ref : revertables) {
+ if (ref.get() != null) {
+ newRevertables.add(ref);
+ }
+ }
+ numScrubs++;
+ this.revertables = newRevertables;
+ numRegistrationsSinceScrub = 0;
}
/**
- * Delete all snapshots and resets all of the Revertable object registered.
+ * Delete all snapshots and reset all of the Revertable objects.
*/
public void reset() {
deleteSnapshotsUpTo(LATEST_EPOCH);
- for (Revertable revertable : revertables) {
- revertable.reset();
+ ArrayList<WeakReference<Revertable>> newRevertables = new
ArrayList<>();
+ for (WeakReference<Revertable> ref : revertables) {
+ Revertable revertable = ref.get();
+ if (revertable != null) {
+ try {
+ revertable.reset();
+ } catch (Exception e) {
+ log.error("Error reverting {}", revertable, e);
+ }
+ newRevertables.add(ref);
+ }
}
+ numScrubs++;
+ this.revertables = newRevertables;
+ numRegistrationsSinceScrub = 0;
}
}
diff --git a/server-common/src/test/java/org/apache/kafka/timeline/SnapshotRegistryTest.java b/server-common/src/test/java/org/apache/kafka/timeline/SnapshotRegistryTest.java
index 264c9231f9c..dacc91fa931 100644
--- a/server-common/src/test/java/org/apache/kafka/timeline/SnapshotRegistryTest.java
+++ b/server-common/src/test/java/org/apache/kafka/timeline/SnapshotRegistryTest.java
@@ -94,4 +94,28 @@ public class SnapshotRegistryTest {
assertEquals(latest, duplicate);
}
+
+ @Test
+ public void testScrub() {
+ SnapshotRegistry registry = new SnapshotRegistry(new LogContext(), 2);
+ new TimelineInteger(registry).set(123);
+ new TimelineInteger(registry).set(123);
+ assertEquals(0, registry.numScrubs());
+ new TimelineInteger(registry).set(123);
+ assertEquals(1, registry.numScrubs());
+ new TimelineInteger(registry).set(123);
+ new TimelineInteger(registry).set(123);
+ new TimelineInteger(registry).set(123);
+ assertEquals(2, registry.numScrubs());
+ }
+
+ @Test
+ public void testReset() {
+ SnapshotRegistry registry = new SnapshotRegistry(new LogContext(), 2);
+ TimelineInteger integer = new TimelineInteger(registry);
+ integer.set(123);
+ registry.reset();
+ assertEquals(0, integer.get());
+ assertEquals(1, registry.numScrubs());
+ }
}