This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch allocator_auto_release
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit c793b59e6095222adaf6e9a95286494c3b930236
Author: MrQuansy <[email protected]>
AuthorDate: Wed Dec 25 22:23:42 2024 +0800

    add auto release logic
---
 .../commons/binaryallocator/BinaryAllocator.java   | 40 +++++++++++++++++++---
 .../PooledBinaryPhantomReference.java              | 23 +++++++++++++
 .../iotdb/commons/binaryallocator/arena/Arena.java | 36 +++++++++++++++----
 .../binaryallocator/BinaryAllocatorTest.java       | 35 +++++++++++++++----
 4 files changed, 116 insertions(+), 18 deletions(-)

diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/binaryallocator/BinaryAllocator.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/binaryallocator/BinaryAllocator.java
index ca676176a60..12d11eb6344 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/binaryallocator/BinaryAllocator.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/binaryallocator/BinaryAllocator.java
@@ -33,7 +33,11 @@ import org.apache.tsfile.utils.PooledBinary;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.lang.ref.ReferenceQueue;
 import java.time.Duration;
+import java.util.Collections;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicReference;
 
 public class BinaryAllocator {
@@ -49,14 +53,23 @@ public class BinaryAllocator {
 
   private final BinaryAllocatorMetrics metrics;
   private Evictor sampleEvictor;
+  // private TinyGC tinyGC;
   private static final ThreadLocal<ThreadArenaRegistry> arenaRegistry =
       ThreadLocal.withInitial(ThreadArenaRegistry::new);
 
-  private static final int WARNING_GC_TIME_PERCENTAGE = 10;
-  private static final int HALF_GC_TIME_PERCENTAGE = 20;
+  private static final int WARNING_GC_TIME_PERCENTAGE = 20;
+  private static final int HALF_GC_TIME_PERCENTAGE = 25;
   private static final int SHUTDOWN_GC_TIME_PERCENTAGE = 30;
   private static final int RESTART_GC_TIME_PERCENTAGE = 5;
 
+  private final AutoReleaseThread autoReleaseThread = new AutoReleaseThread();
+  public final ReferenceQueue<PooledBinary> referenceQueue = new ReferenceQueue<>();
+
+  // JDK 9+ Cleaner uses double-linked list and synchronized to manage references, which has worse
+  // performance than lock-free hash set
+  public final Set<PooledBinaryPhantomReference> phantomRefs =
+      Collections.newSetFromMap(new ConcurrentHashMap<>());
+
   public BinaryAllocator(AllocatorConfig allocatorConfig) {
     this.allocatorConfig = allocatorConfig;
 
@@ -89,6 +102,7 @@ public class BinaryAllocator {
             ThreadName.BINARY_ALLOCATOR_SAMPLE_EVICTOR.getName(),
             allocatorConfig.durationEvictorShutdownTimeout);
     sampleEvictor.startEvictor(allocatorConfig.durationBetweenEvictorRuns);
+    autoReleaseThread.start();
   }
 
   public synchronized void close(boolean forceClose) {
@@ -105,7 +119,7 @@ public class BinaryAllocator {
     }
   }
 
-  public PooledBinary allocateBinary(int reqCapacity) {
+  public PooledBinary allocateBinary(int reqCapacity, boolean autoRelease) {
     if (reqCapacity < allocatorConfig.minAllocateSize
         || reqCapacity > allocatorConfig.maxAllocateSize
         || state.get() != BinaryAllocatorState.OPEN) {
@@ -114,7 +128,7 @@ public class BinaryAllocator {
 
     Arena arena = arenaStrategy.choose(heapArenas);
 
-    return new PooledBinary(arena.allocate(reqCapacity), reqCapacity, arena.getArenaID());
+    return arena.allocate(reqCapacity, autoRelease);
   }
 
   public void deallocateBinary(PooledBinary binary) {
@@ -125,7 +139,7 @@ public class BinaryAllocator {
       int arenaIndex = binary.getArenaIndex();
       if (arenaIndex != -1) {
         Arena arena = heapArenas[arenaIndex];
-        arena.deallocate(binary.getValues());
+        arena.deallocate(binary);
       }
     }
   }
@@ -263,4 +277,20 @@ public class BinaryAllocator {
       metrics.updateSampleEvictionCounter(evictedSize);
     }
   }
+
+  /** Process phantomly reachable objects and return their byte arrays to pool. */
+  public class AutoReleaseThread extends Thread {
+    @Override
+    public void run() {
+      PooledBinaryPhantomReference ref;
+      try {
+        while ((ref = (PooledBinaryPhantomReference) referenceQueue.remove()) != null) {
+          phantomRefs.remove(ref);
+          ref.slabRegion.deallocate(ref.byteArray);
+        }
+      } catch (InterruptedException e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
 }
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/binaryallocator/PooledBinaryPhantomReference.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/binaryallocator/PooledBinaryPhantomReference.java
new file mode 100644
index 00000000000..5ec8e1c6f0a
--- /dev/null
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/binaryallocator/PooledBinaryPhantomReference.java
@@ -0,0 +1,23 @@
+package org.apache.iotdb.commons.binaryallocator;
+
+import org.apache.iotdb.commons.binaryallocator.arena.Arena;
+
+import org.apache.tsfile.utils.PooledBinary;
+
+import java.lang.ref.PhantomReference;
+import java.lang.ref.ReferenceQueue;
+
+public class PooledBinaryPhantomReference extends PhantomReference<PooledBinary> {
+  public final byte[] byteArray;
+  public Arena.SlabRegion slabRegion;
+
+  public PooledBinaryPhantomReference(
+      PooledBinary referent,
+      ReferenceQueue<? super PooledBinary> q,
+      byte[] byteArray,
+      Arena.SlabRegion region) {
+    super(referent, q);
+    this.byteArray = byteArray;
+    this.slabRegion = region;
+  }
+}
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/binaryallocator/arena/Arena.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/binaryallocator/arena/Arena.java
index 9a38cb1fe58..1a8a85ec370 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/binaryallocator/arena/Arena.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/binaryallocator/arena/Arena.java
@@ -20,10 +20,15 @@
 package org.apache.iotdb.commons.binaryallocator.arena;
 
 import org.apache.iotdb.commons.binaryallocator.BinaryAllocator;
+import org.apache.iotdb.commons.binaryallocator.PooledBinaryPhantomReference;
 import org.apache.iotdb.commons.binaryallocator.config.AllocatorConfig;
 import org.apache.iotdb.commons.binaryallocator.ema.AdaptiveWeightedAverage;
 import org.apache.iotdb.commons.binaryallocator.utils.SizeClasses;
 
+import org.apache.tsfile.utils.PooledBinary;
+
+import java.lang.ref.ReferenceQueue;
+import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -39,6 +44,9 @@ public class Arena {
 
   private int sampleCount;
 
+  private final ReferenceQueue<PooledBinary> referenceQueue;
+  private final Set<PooledBinaryPhantomReference> phantomRefs;
+
   public Arena(
       BinaryAllocator allocator, SizeClasses sizeClasses, int id, AllocatorConfig allocatorConfig) {
     this.binaryAllocator = allocator;
@@ -52,20 +60,31 @@ public class Arena {
     }
 
     sampleCount = 0;
+    referenceQueue = binaryAllocator.referenceQueue;
+    phantomRefs = binaryAllocator.phantomRefs;
   }
 
   public int getArenaID() {
     return arenaID;
   }
 
-  public byte[] allocate(int reqCapacity) {
+  public PooledBinary allocate(int reqCapacity, boolean autoRelease) {
     final int sizeIdx = sizeClasses.size2SizeIdx(reqCapacity);
-    return regions[sizeIdx].allocate();
+    byte[] data = regions[sizeIdx].allocate();
+    if (autoRelease) {
+      PooledBinary binary = new PooledBinary(data, reqCapacity, -1);
+      PooledBinaryPhantomReference ref =
+          new PooledBinaryPhantomReference(binary, referenceQueue, data, regions[sizeIdx]);
+      phantomRefs.add(ref);
+      return binary;
+    } else {
+      return new PooledBinary(data, reqCapacity, arenaID);
+    }
   }
 
-  public void deallocate(byte[] bytes) {
-    final int sizeIdx = sizeClasses.size2SizeIdx(bytes.length);
-    regions[sizeIdx].deallocate(bytes);
+  public void deallocate(PooledBinary binary) {
+    final int sizeIdx = sizeClasses.size2SizeIdx(binary.getLength());
+    regions[sizeIdx].deallocate(binary.getValues());
   }
 
   public long evict(double ratio) {
@@ -146,8 +165,13 @@ public class Arena {
     return evictedSize;
   }
 
-  private static class SlabRegion {
+  public static class SlabRegion {
     private final int byteArraySize;
+
+    // Current implementation uses ConcurrentLinkedQueue for simplicity
+    // TODO: Can be optimized with more efficient lock-free approaches:
+    // 1. No need for strict FIFO, it's just an object pool
+    // 2. Use segmented arrays/queues with per-segment counters to reduce contention
     private final ConcurrentLinkedQueue<byte[]> queue;
 
     private final AtomicInteger allocationsFromAllocator;
diff --git a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/binaryallocator/BinaryAllocatorTest.java b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/binaryallocator/BinaryAllocatorTest.java
index 0fb4f0d96b5..3314f26199c 100644
--- a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/binaryallocator/BinaryAllocatorTest.java
+++ b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/binaryallocator/BinaryAllocatorTest.java
@@ -42,19 +42,19 @@ public class BinaryAllocatorTest {
     BinaryAllocator binaryAllocator = new BinaryAllocator(config);
     binaryAllocator.resetArenaBinding();
 
-    PooledBinary binary = binaryAllocator.allocateBinary(255);
+    PooledBinary binary = binaryAllocator.allocateBinary(255, false);
     assertNotNull(binary);
     assertEquals(binary.getArenaIndex(), -1);
     assertEquals(binary.getLength(), 255);
     binaryAllocator.deallocateBinary(binary);
 
-    binary = binaryAllocator.allocateBinary(65536);
+    binary = binaryAllocator.allocateBinary(65536, false);
     assertNotNull(binary);
     assertEquals(binary.getArenaIndex(), 0);
     assertEquals(binary.getLength(), 65536);
     binaryAllocator.deallocateBinary(binary);
 
-    binary = binaryAllocator.allocateBinary(65535);
+    binary = binaryAllocator.allocateBinary(65535, false);
     assertNotNull(binary);
     assertEquals(binary.getArenaIndex(), 0);
     assertEquals(binary.getLength(), 65535);
@@ -67,8 +67,8 @@ public class BinaryAllocatorTest {
     BinaryAllocator binaryAllocator = new BinaryAllocator(AllocatorConfig.DEFAULT_CONFIG);
     binaryAllocator.resetArenaBinding();
 
-    PooledBinary binary1 = binaryAllocator.allocateBinary(4096);
-    PooledBinary binary2 = binaryAllocator.allocateBinary(4096);
+    PooledBinary binary1 = binaryAllocator.allocateBinary(4096, false);
+    PooledBinary binary2 = binaryAllocator.allocateBinary(4096, false);
     assertEquals(binary1.getArenaIndex(), binary2.getArenaIndex());
     binaryAllocator.deallocateBinary(binary1);
     binaryAllocator.deallocateBinary(binary2);
@@ -81,7 +81,7 @@ public class BinaryAllocatorTest {
           new Thread(
               () -> {
                 try {
-                  PooledBinary firstBinary = binaryAllocator.allocateBinary(2048);
+                  PooledBinary firstBinary = binaryAllocator.allocateBinary(2048, false);
                   int arenaId = firstBinary.getArenaIndex();
                   arenaUsageCount.merge(arenaId, 1, Integer::sum);
                   binaryAllocator.deallocateBinary(firstBinary);
@@ -107,7 +107,7 @@ public class BinaryAllocatorTest {
     BinaryAllocator binaryAllocator = new BinaryAllocator(config);
     binaryAllocator.resetArenaBinding();
 
-    PooledBinary binary = binaryAllocator.allocateBinary(4096);
+    PooledBinary binary = binaryAllocator.allocateBinary(4096, false);
     binaryAllocator.deallocateBinary(binary);
     assertEquals(binaryAllocator.getTotalUsedMemory(), 4096);
     Thread.sleep(200);
@@ -136,4 +136,25 @@ public class BinaryAllocatorTest {
       }
     }
   }
+
+  @Test
+  public void testAutoRelease() throws InterruptedException {
+    AllocatorConfig config = new AllocatorConfig();
+    config.minAllocateSize = 4096;
+    config.maxAllocateSize = 65536;
+    BinaryAllocator binaryAllocator = new BinaryAllocator(config);
+    binaryAllocator.resetArenaBinding();
+
+    PooledBinary binary = binaryAllocator.allocateBinary(4096, true);
+    assertNotNull(binary);
+    assertEquals(binary.getArenaIndex(), -1);
+    assertEquals(binary.getLength(), 4096);
+    assertEquals(binaryAllocator.getTotalUsedMemory(), 0);
+
+    // reference count is 0
+    binary = null;
+    System.gc();
+    Thread.sleep(100);
+    assertEquals(binaryAllocator.getTotalUsedMemory(), 4096);
+  }
 }

Reply via email to