This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch 3.9
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.9 by this push:
     new 542bb9d4aaa KAFKA-18063: SnapshotRegistry should not leak memory (#17898)
542bb9d4aaa is described below

commit 542bb9d4aaad716d18fb36b3b1c704aa7fb2001e
Author: Colin Patrick McCabe <[email protected]>
AuthorDate: Thu Nov 21 13:27:53 2024 -0800

    KAFKA-18063: SnapshotRegistry should not leak memory (#17898)
    
    SnapshotRegistry needs to have a reference to all snapshot data structures. However, this should
    not be a strong reference, but a weak reference, so that these data structures can be garbage
    collected as needed. This PR also adds a scrub mechanism so that we can eventually reclaim the
    slots used by GC'ed Revertable objects in the SnapshotRegistry.revertables array.
    
    Reviewers: David Jacot <[email protected]>
---
 .../apache/kafka/timeline/SnapshotRegistry.java    | 87 +++++++++++++++++++---
 .../kafka/timeline/SnapshotRegistryTest.java       | 24 ++++++
 2 files changed, 101 insertions(+), 10 deletions(-)

diff --git a/server-common/src/main/java/org/apache/kafka/timeline/SnapshotRegistry.java b/server-common/src/main/java/org/apache/kafka/timeline/SnapshotRegistry.java
index b35670a0bcc..c2b950f7fbe 100644
--- a/server-common/src/main/java/org/apache/kafka/timeline/SnapshotRegistry.java
+++ b/server-common/src/main/java/org/apache/kafka/timeline/SnapshotRegistry.java
@@ -21,17 +21,18 @@ import org.apache.kafka.common.utils.LogContext;
 
 import org.slf4j.Logger;
 
+import java.lang.ref.WeakReference;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.stream.Collectors;
 
-
 /**
- * A registry containing snapshots of timeline data structures.
- * We generally expect a small number of snapshots-- perhaps 1 or 2 at a time.
- * Therefore, we use ArrayLists here rather than a data structure with higher 
overhead.
+ * A registry containing snapshots of timeline data structures. All timeline 
data structures must
+ * be registered here, so that they can be reverted to the expected state when 
desired.
+ * Because the registry only keeps a weak reference to each timeline data 
structure, it does not
+ * prevent them from being garbage collected.
  */
 public class SnapshotRegistry {
     public static final long LATEST_EPOCH = Long.MAX_VALUE;
@@ -107,12 +108,39 @@ public class SnapshotRegistry {
     private final Snapshot head = new Snapshot(Long.MIN_VALUE);
 
     /**
-     * Collection of all Revertable registered with this registry
+     * A collection of all Revertable objects registered here. Since we store 
only weak
+     * references, every time we access a revertable through this list, we 
must check to
+     * see if it has been garbage collected. If so, WeakReference.get will 
return null.
+     *
+     * Although the garbage collector handles freeing the underlying 
Revertables, over
+     * time slots in the ArrayList will fill up with expired references. 
Therefore, after
+     * enough registrations, we scrub the ArrayList of the expired references 
by creating
+     * a new arraylist.
      */
-    private final List<Revertable> revertables = new ArrayList<>();
+    private List<WeakReference<Revertable>> revertables = new ArrayList<>();
+
+    /**
+     * The maximum number of registrations to allow before we compact the 
revertable list.
+     */
+    private final int maxRegistrationsSinceScrub;
+
+    /**
+     * The number of registrations we have done since removing all expired 
weak references.
+     */
+    private int numRegistrationsSinceScrub = 0;
+
+    /**
+     * The number of scrubs that we have done.
+     */
+    private long numScrubs = 0;
 
     public SnapshotRegistry(LogContext logContext) {
+        this(logContext, 10_000);
+    }
+
+    public SnapshotRegistry(LogContext logContext, int 
maxRegistrationsSinceScrub) {
         this.log = logContext.logger(SnapshotRegistry.class);
+        this.maxRegistrationsSinceScrub = maxRegistrationsSinceScrub;
     }
 
     /**
@@ -271,21 +299,60 @@ public class SnapshotRegistry {
         return head.prev().epoch();
     }
 
+    /**
+     * Return the number of scrub operations that we have done.
+     */
+    public long numScrubs() {
+        return numScrubs;
+    }
+
     /**
      * Associate a revertable with this registry.
      */
     public void register(Revertable revertable) {
-        revertables.add(revertable);
+        numRegistrationsSinceScrub++;
+        if (numRegistrationsSinceScrub > maxRegistrationsSinceScrub) {
+            scrub();
+        }
+        revertables.add(new WeakReference<>(revertable));
+    }
+
+    /**
+     * Remove all expired weak references from the revertable list.
+     */
+    void scrub() {
+        ArrayList<WeakReference<Revertable>> newRevertables =
+            new ArrayList<>(revertables.size() / 2);
+        for (WeakReference<Revertable> ref : revertables) {
+            if (ref.get() != null) {
+                newRevertables.add(ref);
+            }
+        }
+        numScrubs++;
+        this.revertables = newRevertables;
+        numRegistrationsSinceScrub = 0;
     }
 
     /**
-     * Delete all snapshots and resets all of the Revertable object registered.
+     * Delete all snapshots and reset all of the Revertable objects.
      */
     public void reset() {
         deleteSnapshotsUpTo(LATEST_EPOCH);
 
-        for (Revertable revertable : revertables) {
-            revertable.reset();
+        ArrayList<WeakReference<Revertable>> newRevertables = new 
ArrayList<>();
+        for (WeakReference<Revertable> ref : revertables) {
+            Revertable revertable = ref.get();
+            if (revertable != null) {
+                try {
+                    revertable.reset();
+                } catch (Exception e) {
+                    log.error("Error reverting {}", revertable, e);
+                }
+                newRevertables.add(ref);
+            }
         }
+        numScrubs++;
+        this.revertables = newRevertables;
+        numRegistrationsSinceScrub = 0;
     }
 }
diff --git a/server-common/src/test/java/org/apache/kafka/timeline/SnapshotRegistryTest.java b/server-common/src/test/java/org/apache/kafka/timeline/SnapshotRegistryTest.java
index 264c9231f9c..dacc91fa931 100644
--- a/server-common/src/test/java/org/apache/kafka/timeline/SnapshotRegistryTest.java
+++ b/server-common/src/test/java/org/apache/kafka/timeline/SnapshotRegistryTest.java
@@ -94,4 +94,28 @@ public class SnapshotRegistryTest {
 
         assertEquals(latest, duplicate);
     }
+
+    @Test
+    public void testScrub() {
+        SnapshotRegistry registry = new SnapshotRegistry(new LogContext(), 2);
+        new TimelineInteger(registry).set(123);
+        new TimelineInteger(registry).set(123);
+        assertEquals(0, registry.numScrubs());
+        new TimelineInteger(registry).set(123);
+        assertEquals(1, registry.numScrubs());
+        new TimelineInteger(registry).set(123);
+        new TimelineInteger(registry).set(123);
+        new TimelineInteger(registry).set(123);
+        assertEquals(2, registry.numScrubs());
+    }
+
+    @Test
+    public void testReset() {
+        SnapshotRegistry registry = new SnapshotRegistry(new LogContext(), 2);
+        TimelineInteger integer = new TimelineInteger(registry);
+        integer.set(123);
+        registry.reset();
+        assertEquals(0, integer.get());
+        assertEquals(1, registry.numScrubs());
+    }
 }

Reply via email to