This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 8ee8eb654b1 Add Binary Allocator (#14201)
8ee8eb654b1 is described below

commit 8ee8eb654b1df77f89e2f1077fe96485142db46b
Author: Mrquan <[email protected]>
AuthorDate: Thu Dec 5 13:17:15 2024 +0800

    Add Binary Allocator (#14201)
---
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |  21 ++
 .../conf/iotdb-system.properties.template          |  34 +++
 .../commons/binaryallocator/BinaryAllocator.java   | 260 +++++++++++++++++++++
 .../binaryallocator/BinaryAllocatorState.java      |  71 ++++++
 .../iotdb/commons/binaryallocator/arena/Arena.java | 233 ++++++++++++++++++
 .../binaryallocator/arena/ArenaStrategy.java       |  35 +++
 .../binaryallocator/config/AllocatorConfig.java    |  53 +++++
 .../ema/AdaptiveWeightedAverage.java               | 100 ++++++++
 .../commons/binaryallocator/evictor/Evictor.java   |  96 ++++++++
 .../metric/BinaryAllocatorMetrics.java             | 104 +++++++++
 .../commons/binaryallocator/utils/SizeClasses.java | 146 ++++++++++++
 .../iotdb/commons/concurrent/ThreadName.java       |   1 +
 .../apache/iotdb/commons/conf/CommonConfig.java    |  50 ++++
 .../iotdb/commons/conf/CommonDescriptor.java       |  25 ++
 .../service/metric/JvmGcMonitorMetrics.java        |   6 +
 .../iotdb/commons/service/metric/enums/Metric.java |   1 +
 .../binaryallocator/BinaryAllocatorTest.java       | 139 +++++++++++
 17 files changed, 1375 insertions(+)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 37490ec97f2..b652acf7329 100755
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.conf;
 
+import org.apache.iotdb.commons.binaryallocator.BinaryAllocator;
 import org.apache.iotdb.commons.conf.CommonConfig;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.commons.conf.ConfigurationFileUtils;
@@ -2849,6 +2850,26 @@ public class IoTDBDescriptor {
 
       // update retry config
       commonDescriptor.loadRetryProperties(properties);
+
+      // update binary allocator
+      commonDescriptor
+          .getConfig()
+          .setEnableBinaryAllocator(
+              Boolean.parseBoolean(
+                  Optional.ofNullable(
+                          properties.getProperty(
+                              "enable_binary_allocator",
+                              
ConfigurationFileUtils.getConfigurationDefaultValue(
+                                  "enable_binary_allocator")))
+                      .map(String::trim)
+                      .orElse(
+                          ConfigurationFileUtils.getConfigurationDefaultValue(
+                              "enable_binary_allocator"))));
+      if (commonDescriptor.getConfig().isEnableBinaryAllocator()) {
+        BinaryAllocator.getInstance().start();
+      } else {
+        BinaryAllocator.getInstance().close(true);
+      }
     } catch (Exception e) {
       if (e instanceof InterruptedException) {
         Thread.currentThread().interrupt();
diff --git 
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
 
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
index cdcd5a8571a..103f26df3ce 100644
--- 
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
+++ 
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
@@ -1493,6 +1493,40 @@ data_region_iot_max_memory_ratio_for_queue = 0.6
 # Datatype: long
 region_migration_speed_limit_bytes_per_second = 33554432
 
+####################
+### Binary Allocator Configuration
+####################
+# Whether to enable binary allocator.
+# For scenarios where large binary streams cause severe GC pressure, enabling this parameter can significantly improve performance.
+# effectiveMode: hot_reload
+enable_binary_allocator=true
+
+# The range of allocation sizes that the allocator is responsible for:
+# lower boundary of the allocation size
+# unit: bytes
+# Datatype: int
+# effectiveMode: restart
+small_binary_object=4096
+
+# The range of allocation sizes that the allocator is responsible for:
+# upper boundary of the allocation size
+# unit: bytes
+# Datatype: int
+# effectiveMode: restart
+huge_binary_object=1048576
+
+# Number of arenas in the binary allocator, used to control concurrent performance
+# Datatype: int
+# effectiveMode: restart
+arena_num=4
+
+# Controls the number of slabs in the allocator.
+# The number of different sizes in each power-of-2 interval is 2^log2_size_class_group.
+# For example: if log2_size_class_group=3, there will be 8 different sizes between 1024 and 2048.
+# Datatype: int
+# effectiveMode: restart
+log2_size_class_group=3
+
 ####################
 ### TsFile Configurations
 ####################
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/binaryallocator/BinaryAllocator.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/binaryallocator/BinaryAllocator.java
new file mode 100644
index 00000000000..3193fd6abd2
--- /dev/null
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/binaryallocator/BinaryAllocator.java
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.binaryallocator;
+
+import org.apache.iotdb.commons.binaryallocator.arena.Arena;
+import org.apache.iotdb.commons.binaryallocator.arena.ArenaStrategy;
+import org.apache.iotdb.commons.binaryallocator.config.AllocatorConfig;
+import org.apache.iotdb.commons.binaryallocator.evictor.Evictor;
+import org.apache.iotdb.commons.binaryallocator.metric.BinaryAllocatorMetrics;
+import org.apache.iotdb.commons.binaryallocator.utils.SizeClasses;
+import org.apache.iotdb.commons.concurrent.ThreadName;
+import org.apache.iotdb.commons.service.metric.MetricService;
+import org.apache.iotdb.commons.utils.TestOnly;
+
+import org.apache.tsfile.utils.PooledBinary;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Allocator that recycles {@code byte[]} buffers for binary values whose size lies between
+ * {@code allocatorConfig.minAllocateSize} and {@code allocatorConfig.maxAllocateSize} (inclusive).
+ *
+ * <p>Pooled buffers are managed by a fixed array of {@link Arena}s. A thread is bound to one arena
+ * the first time it performs a pooled allocation and keeps using it afterwards. Requests outside
+ * the size range, or issued while the allocator is not {@link BinaryAllocatorState#OPEN}, are
+ * served with a newly created array that is not tied to any arena.
+ */
+public class BinaryAllocator {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(BinaryAllocator.class);
+
+  private final Arena[] heapArenas;
+  private final AllocatorConfig allocatorConfig;
+
+  private final ArenaStrategy arenaStrategy = new LeastUsedArenaStrategy();
+  private final AtomicReference<BinaryAllocatorState> state =
+      new AtomicReference<>(BinaryAllocatorState.UNINITIALIZED);
+
+  private final BinaryAllocatorMetrics metrics;
+
+  /** Periodic evictor; {@code null} until {@link #start()} has been called at least once. */
+  private Evictor sampleEvictor;
+
+  /** Per-thread arena binding. Static, hence shared by every allocator instance. */
+  private static final ThreadLocal<ThreadArenaRegistry> arenaRegistry =
+      ThreadLocal.withInitial(ThreadArenaRegistry::new);
+
+  // GC time percentage thresholds evaluated by runGcEviction().
+  private static final int WARNING_GC_TIME_PERCENTAGE = 10;
+  private static final int HALF_GC_TIME_PERCENTAGE = 20;
+  private static final int SHUTDOWN_GC_TIME_PERCENTAGE = 30;
+  private static final int RESTART_GC_TIME_PERCENTAGE = 5;
+
+  /**
+   * Creates the arenas described by {@code allocatorConfig} and then either starts the allocator
+   * (when {@code allocatorConfig.enableBinaryAllocator} is true) or marks it as CLOSE.
+   *
+   * @param allocatorConfig configuration for the arenas, size classes and evictor
+   */
+  public BinaryAllocator(AllocatorConfig allocatorConfig) {
+    this.allocatorConfig = allocatorConfig;
+
+    heapArenas = new Arena[allocatorConfig.arenaNum];
+    SizeClasses sizeClasses = new SizeClasses(allocatorConfig);
+
+    for (int i = 0; i < heapArenas.length; i++) {
+      Arena arena = new Arena(this, sizeClasses, i, allocatorConfig);
+      heapArenas[i] = arena;
+    }
+
+    this.metrics = new BinaryAllocatorMetrics(this);
+
+    if (allocatorConfig.enableBinaryAllocator) {
+      start();
+    } else {
+      state.set(BinaryAllocatorState.CLOSE);
+    }
+  }
+
+  /**
+   * Switches the allocator to OPEN, registers its metric set and schedules a new sample evictor.
+   * Does nothing when the allocator is already OPEN.
+   */
+  public synchronized void start() {
+    if (state.get() == BinaryAllocatorState.OPEN) {
+      return;
+    }
+
+    state.set(BinaryAllocatorState.OPEN);
+    MetricService.getInstance().addMetricSet(this.metrics);
+    sampleEvictor =
+        new SampleEvictor(
+            ThreadName.BINARY_ALLOCATOR_SAMPLE_EVICTOR.getName(),
+            allocatorConfig.durationEvictorShutdownTimeout);
+    sampleEvictor.startEvictor(allocatorConfig.durationBetweenEvictorRuns);
+  }
+
+  /**
+   * Stops pooling, stops the sample evictor (if one was ever started) and closes every arena.
+   *
+   * @param forceClose if true the state becomes CLOSE and the metric set is removed; otherwise the
+   *     state becomes PENDING, from which {@link #runGcEviction(long)} may restart the allocator
+   */
+  public synchronized void close(boolean forceClose) {
+    if (forceClose) {
+      state.set(BinaryAllocatorState.CLOSE);
+      MetricService.getInstance().removeMetricSet(this.metrics);
+    } else {
+      state.set(BinaryAllocatorState.PENDING);
+    }
+
+    // The evictor is only created in start(). An allocator that was constructed disabled and is
+    // then closed again (e.g. a hot reload with enable_binary_allocator=false) has none, and
+    // closing it must not throw a NullPointerException.
+    if (sampleEvictor != null) {
+      sampleEvictor.stopEvictor();
+    }
+    for (Arena arena : heapArenas) {
+      arena.close();
+    }
+  }
+
+  /**
+   * Returns a binary backed by a buffer that can hold {@code reqCapacity} bytes.
+   *
+   * @param reqCapacity requested capacity in bytes
+   * @return a binary wrapping a new {@code byte[reqCapacity]} when the request is outside the
+   *     configured size range or the allocator is not OPEN; otherwise a binary wrapping a buffer
+   *     obtained from the calling thread's arena, tagged with that arena's id
+   */
+  public PooledBinary allocateBinary(int reqCapacity) {
+    if (reqCapacity < allocatorConfig.minAllocateSize
+        || reqCapacity > allocatorConfig.maxAllocateSize
+        || state.get() != BinaryAllocatorState.OPEN) {
+      return new PooledBinary(new byte[reqCapacity]);
+    }
+
+    Arena arena = arenaStrategy.choose(heapArenas);
+
+    return new PooledBinary(arena.allocate(reqCapacity), reqCapacity, arena.getArenaID());
+  }
+
+  /**
+   * Hands the buffer of {@code binary} back to the arena recorded in it. Nothing happens when
+   * {@code binary} is null, its length is outside the configured size range, the allocator is not
+   * OPEN, or its arena index is -1.
+   *
+   * @param binary the binary whose buffer should be returned, may be null
+   */
+  public void deallocateBinary(PooledBinary binary) {
+    if (binary != null
+        && binary.getLength() >= allocatorConfig.minAllocateSize
+        && binary.getLength() <= allocatorConfig.maxAllocateSize
+        && state.get() == BinaryAllocatorState.OPEN) {
+      int arenaIndex = binary.getArenaIndex();
+      if (arenaIndex != -1) {
+        Arena arena = heapArenas[arenaIndex];
+        arena.deallocate(binary.getValues());
+      }
+    }
+  }
+
+  /** Returns the sum of {@link Arena#getTotalUsedMemory()} over all arenas. */
+  public long getTotalUsedMemory() {
+    long totalUsedMemory = 0;
+    for (Arena arena : heapArenas) {
+      totalUsedMemory += arena.getTotalUsedMemory();
+    }
+    return totalUsedMemory;
+  }
+
+  /** Returns the sum of {@link Arena#getActiveMemory()} over all arenas. */
+  public long getTotalActiveMemory() {
+    long totalActiveMemory = 0;
+    for (Arena arena : heapArenas) {
+      totalActiveMemory += arena.getActiveMemory();
+    }
+    return totalActiveMemory;
+  }
+
+  /** Drops the calling thread's arena binding so that the next allocation picks an arena anew. */
+  @TestOnly
+  public void resetArenaBinding() {
+    arenaRegistry.get().unbindArena();
+  }
+
+  public BinaryAllocatorMetrics getMetrics() {
+    return metrics;
+  }
+
+  /** Asks every arena to evict the given ratio of its pooled buffers. */
+  private void evict(double ratio) {
+    for (Arena arena : heapArenas) {
+      arena.evict(ratio);
+    }
+  }
+
+  /** Returns the process-wide instance, created lazily with {@link AllocatorConfig#DEFAULT_CONFIG}. */
+  public static BinaryAllocator getInstance() {
+    return BinaryAllocatorHolder.INSTANCE;
+  }
+
+  /** Holder class: the singleton is created when this class is first initialized. */
+  private static class BinaryAllocatorHolder {
+    private static final BinaryAllocator INSTANCE =
+        new BinaryAllocator(AllocatorConfig.DEFAULT_CONFIG);
+  }
+
+  /** Per-thread record of the arena a thread is bound to; keeps the arena's thread count in sync. */
+  private static class ThreadArenaRegistry {
+    private Arena threadArenaBinding = null;
+
+    public Arena getArena() {
+      return threadArenaBinding;
+    }
+
+    public void bindArena(Arena arena) {
+      threadArenaBinding = arena;
+      arena.incRegisteredThread();
+    }
+
+    public void unbindArena() {
+      Arena arena = threadArenaBinding;
+      if (arena != null) {
+        arena.decRegisteredThread();
+        threadArenaBinding = null;
+      }
+    }
+
+    // Releases the binding when the registry object itself is finalized.
+    // NOTE(review): Object.finalize() is deprecated since Java 9 - consider a different cleanup hook.
+    @Override
+    protected void finalize() {
+      unbindArena();
+    }
+  }
+
+  /**
+   * Returns the arena already bound to the calling thread, or binds the thread to the arena with
+   * the fewest registered threads (the first such arena on ties) and returns it.
+   */
+  private static class LeastUsedArenaStrategy implements ArenaStrategy {
+    @Override
+    public Arena choose(Arena[] arenas) {
+      Arena boundArena = arenaRegistry.get().getArena();
+      if (boundArena != null) {
+        return boundArena;
+      }
+
+      Arena minArena = arenas[0];
+
+      for (int i = 1; i < arenas.length; i++) {
+        Arena arena = arenas[i];
+        if (arena.getNumRegisteredThread() < minArena.getNumRegisteredThread()) {
+          minArena = arena;
+        }
+      }
+
+      arenaRegistry.get().bindArena(minArena);
+      return minArena;
+    }
+  }
+
+  /**
+   * Reacts to the current GC time percentage.
+   *
+   * <ul>
+   *   <li>CLOSE: nothing is done.
+   *   <li>PENDING: the allocator is restarted once the percentage is at most
+   *       {@code RESTART_GC_TIME_PERCENTAGE}.
+   *   <li>OPEN: above {@code SHUTDOWN_GC_TIME_PERCENTAGE} everything is evicted and the allocator
+   *       goes to PENDING; above {@code HALF_GC_TIME_PERCENTAGE} half of the pooled buffers are
+   *       evicted; above {@code WARNING_GC_TIME_PERCENTAGE} 20% are evicted.
+   * </ul>
+   *
+   * @param curGcTimePercent current GC time percentage
+   */
+  public void runGcEviction(long curGcTimePercent) {
+    if (state.get() == BinaryAllocatorState.CLOSE) {
+      return;
+    }
+
+    LOGGER.debug("Binary allocator running GC eviction");
+    if (state.get() == BinaryAllocatorState.PENDING) {
+      if (curGcTimePercent <= RESTART_GC_TIME_PERCENTAGE) {
+        start();
+      }
+      return;
+    }
+
+    if (curGcTimePercent > SHUTDOWN_GC_TIME_PERCENTAGE) {
+      LOGGER.info(
+          "Binary allocator is shutting down because of high GC time percentage {}%.",
+          curGcTimePercent);
+      evict(1.0);
+      close(false);
+    } else if (curGcTimePercent > HALF_GC_TIME_PERCENTAGE) {
+      evict(0.5);
+    } else if (curGcTimePercent > WARNING_GC_TIME_PERCENTAGE) {
+      evict(0.2);
+    }
+  }
+
+  /** Evictor task that runs {@link Arena#runSampleEviction()} on every arena of this allocator. */
+  public class SampleEvictor extends Evictor {
+
+    public SampleEvictor(String name, Duration evictorShutdownTimeoutDuration) {
+      super(name, evictorShutdownTimeoutDuration);
+    }
+
+    @Override
+    public void run() {
+      for (Arena arena : heapArenas) {
+        arena.runSampleEviction();
+      }
+    }
+  }
+}
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/binaryallocator/BinaryAllocatorState.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/binaryallocator/BinaryAllocatorState.java
new file mode 100644
index 00000000000..6812bc84b11
--- /dev/null
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/binaryallocator/BinaryAllocatorState.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.binaryallocator;
+
+/**
+ * The state transitions of a binary allocator.
+ *
+ * <pre>
+ *     ----------------------------------------
+ *     |                                      |
+ *     |              ----------              |
+ *     |              |        |              |
+ *     |              v        |              v
+ * UNINITIALIZED --> OPEN ---> PENDING -->  CLOSE
+ *                    ^                       ^
+ *                    |                       |
+ *                    -------------------------
+ * </pre>
+ *
+ * State Transition Logic:
+ *
+ * <ul>
+ *   <li><b>UNINITIALIZED -> CLOSE</b>: When enable_binary_allocator = false
+ *   <li><b>UNINITIALIZED -> OPEN</b>: When enable_binary_allocator = true
+ *   <li><b>OPEN -> CLOSE</b>: When enable_binary_allocator is hot-reloaded to false
+ *   <li><b>CLOSE -> OPEN</b>: When enable_binary_allocator is hot-reloaded to true
+ *   <li><b>PENDING -> CLOSE</b>: When enable_binary_allocator is hot-reloaded to false
+ *   <li><b>OPEN -> PENDING</b>: When in OPEN state and GC time percentage exceeds 30%, indicating
+ *       allocator ineffectiveness
+ *   <li><b>PENDING -> OPEN</b>: When GC time percentage drops to 5% or below, returning to normal
+ *       state and re-enabling the allocator. Or when enable_binary_allocator is hot-reloaded to
+ *       true.
+ * </ul>
+ */
+public enum BinaryAllocatorState {
+  /** Binary allocator is open for allocation. */
+  OPEN,
+
+  /** Binary allocator is closed. All allocations are from the JVM heap. */
+  CLOSE,
+
+  /**
+   * Binary allocator is temporarily closed by the GC evictor. All allocations are from the JVM
+   * heap. The allocator can be restarted afterward.
+   */
+  PENDING,
+
+  /** The initial state of the allocator. */
+  UNINITIALIZED;
+
+  /** Returns the constant's declared name, i.e. the same value as {@link #name()}. */
+  @Override
+  public String toString() {
+    return name();
+  }
+}
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/binaryallocator/arena/Arena.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/binaryallocator/arena/Arena.java
new file mode 100644
index 00000000000..cfb83ed0708
--- /dev/null
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/binaryallocator/arena/Arena.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.binaryallocator.arena;
+
+import org.apache.iotdb.commons.binaryallocator.BinaryAllocator;
+import org.apache.iotdb.commons.binaryallocator.config.AllocatorConfig;
+import org.apache.iotdb.commons.binaryallocator.ema.AdaptiveWeightedAverage;
+import org.apache.iotdb.commons.binaryallocator.utils.SizeClasses;
+
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * A set of slab regions, one per size class of the given {@link SizeClasses}, from which pooled
+ * {@code byte[]} buffers are taken and to which they are returned.
+ */
+public class Arena {
+
+  /** Number of sampling rounds after which every region is resized. */
+  private static final int EVICT_SAMPLE_COUNT = 100;
+
+  private final BinaryAllocator binaryAllocator;
+  private final SizeClasses sizeClasses;
+  private final int arenaID;
+  private final AtomicInteger numRegisteredThread;
+  private final SlabRegion[] regions;
+
+  /** Sampling rounds completed since the last resize (or since the last {@link #close()}). */
+  private int sampleCount;
+
+  /**
+   * Creates an arena with one region for every size index of {@code sizeClasses}.
+   *
+   * @param allocator the owning allocator, used to report metrics
+   * @param sizeClasses mapping between requested sizes and size indexes
+   * @param id identifier reported by {@link #getArenaID()}
+   * @param allocatorConfig configuration passed on to each region
+   */
+  public Arena(
+      BinaryAllocator allocator, SizeClasses sizeClasses, int id, AllocatorConfig allocatorConfig) {
+    this.binaryAllocator = allocator;
+    this.sizeClasses = sizeClasses;
+    this.arenaID = id;
+    this.numRegisteredThread = new AtomicInteger(0);
+    regions = new SlabRegion[sizeClasses.getSizeClassNum()];
+
+    for (int i = 0; i < regions.length; i++) {
+      regions[i] = new SlabRegion(sizeClasses.sizeIdx2size(i), allocatorConfig);
+    }
+
+    sampleCount = 0;
+  }
+
+  public int getArenaID() {
+    return arenaID;
+  }
+
+  /** Returns a buffer from the region whose size class matches {@code reqCapacity}. */
+  public byte[] allocate(int reqCapacity) {
+    final int sizeIdx = sizeClasses.size2SizeIdx(reqCapacity);
+    return regions[sizeIdx].allocate();
+  }
+
+  /** Puts {@code bytes} back into the region selected by the array's length. */
+  public void deallocate(byte[] bytes) {
+    final int sizeIdx = sizeClasses.size2SizeIdx(bytes.length);
+    regions[sizeIdx].deallocate(bytes);
+  }
+
+  /** Evicts the given ratio of the pooled buffers of every region. */
+  public void evict(double ratio) {
+    for (SlabRegion region : regions) {
+      region.evict(ratio);
+    }
+  }
+
+  /** Resets the sample counter and clears every region, including its counters. */
+  public void close() {
+    sampleCount = 0;
+    for (SlabRegion region : regions) {
+      region.close();
+    }
+  }
+
+  /** Returns the total size in bytes of the buffers currently held in the regions' queues. */
+  public long getTotalUsedMemory() {
+    long totalUsedMemory = 0;
+    for (SlabRegion region : regions) {
+      totalUsedMemory += region.getTotalUsedMemory();
+    }
+    return totalUsedMemory;
+  }
+
+  /** Returns the total size in bytes of the buffers handed out and not yet returned. */
+  public long getActiveMemory() {
+    long totalActiveMemory = 0;
+    for (SlabRegion region : regions) {
+      totalActiveMemory += region.getActiveUsedMemory();
+    }
+    return totalActiveMemory;
+  }
+
+  public int getNumRegisteredThread() {
+    return numRegisteredThread.get();
+  }
+
+  public void incRegisteredThread() {
+    this.numRegisteredThread.incrementAndGet();
+  }
+
+  public void decRegisteredThread() {
+    this.numRegisteredThread.decrementAndGet();
+  }
+
+  /**
+   * One evictor round: reports the bytes allocated since the previous round to the allocator's
+   * metrics, samples the active size of every region and, every {@code EVICT_SAMPLE_COUNT} rounds,
+   * resizes every region.
+   */
+  public void runSampleEviction() {
+    // update metric
+    // Accumulate in long: byteArraySize * delta easily exceeds the int range under heavy load.
+    long allocateFromSlabDelta = 0;
+    long allocateFromJVMDelta = 0;
+    for (SlabRegion region : regions) {
+      // Read each counter exactly once. Reading it a second time for the "previous" value would
+      // silently drop every allocation that happens between the two reads.
+      final int allocations = region.allocationsFromAllocator.get();
+      allocateFromSlabDelta +=
+          (long) region.byteArraySize * (allocations - region.prevAllocations);
+      region.prevAllocations = allocations;
+
+      final int allocationsFromJVM = region.allocationsFromJVM.get();
+      allocateFromJVMDelta +=
+          (long) region.byteArraySize * (allocationsFromJVM - region.prevAllocationsFromJVM);
+      region.prevAllocationsFromJVM = allocationsFromJVM;
+    }
+    // NOTE(review): updateCounter is assumed to take int deltas, as the original call passed
+    // ints - confirm against BinaryAllocatorMetrics and pass the long values if it accepts them.
+    binaryAllocator
+        .getMetrics()
+        .updateCounter(saturateToInt(allocateFromSlabDelta), saturateToInt(allocateFromJVMDelta));
+
+    // Start sampling
+    for (SlabRegion region : regions) {
+      region.updateSample();
+    }
+
+    sampleCount++;
+    if (sampleCount == EVICT_SAMPLE_COUNT) {
+      // Evict
+      for (SlabRegion region : regions) {
+        region.resize();
+      }
+      sampleCount = 0;
+    }
+  }
+
+  /** Clamps {@code value} to the int range instead of letting it wrap around. */
+  private static int saturateToInt(long value) {
+    return (int) Math.max(Integer.MIN_VALUE, Math.min(Integer.MAX_VALUE, value));
+  }
+
+  /** Pool of equally sized buffers together with the counters describing its usage. */
+  private static class SlabRegion {
+    private final int byteArraySize;
+    private final ConcurrentLinkedQueue<byte[]> queue;
+
+    private final AtomicInteger allocationsFromAllocator;
+    private final AtomicInteger allocationsFromJVM;
+    private final AtomicInteger deAllocationsToAllocator;
+    private final AtomicInteger evictions;
+
+    // Counter values seen by the previous runSampleEviction() round.
+    public int prevAllocations;
+    public int prevAllocationsFromJVM;
+    AdaptiveWeightedAverage average;
+
+    SlabRegion(int byteArraySize, AllocatorConfig allocatorConfig) {
+      this.byteArraySize = byteArraySize;
+      this.average = new AdaptiveWeightedAverage(allocatorConfig.arenaPredictionWeight);
+      queue = new ConcurrentLinkedQueue<>();
+      allocationsFromAllocator = new AtomicInteger(0);
+      allocationsFromJVM = new AtomicInteger(0);
+      deAllocationsToAllocator = new AtomicInteger(0);
+      evictions = new AtomicInteger(0);
+      prevAllocations = 0;
+      prevAllocationsFromJVM = 0;
+    }
+
+    /** Returns a pooled buffer if one is queued, otherwise a new {@code byte[byteArraySize]}. */
+    public final byte[] allocate() {
+      byte[] bytes = queue.poll();
+      if (bytes == null) {
+        allocationsFromJVM.incrementAndGet();
+        return new byte[this.byteArraySize];
+      }
+      allocationsFromAllocator.incrementAndGet();
+      return bytes;
+    }
+
+    /** Queues {@code bytes} for reuse. */
+    public void deallocate(byte[] bytes) {
+      deAllocationsToAllocator.incrementAndGet();
+      queue.add(bytes);
+    }
+
+    private void updateSample() {
+      average.sample(getActiveSize());
+    }
+
+    /**
+     * Folds the samples of the finished cycle into the average and evicts the queued buffers that
+     * exceed the difference between that average and the current active size.
+     */
+    private void resize() {
+      average.update();
+      int needRemain = (int) Math.ceil(average.average()) - getActiveSize();
+      evict(getQueueSize() - needRemain);
+    }
+
+    private void evict(double ratio) {
+      evict((int) (getQueueSize() * ratio));
+    }
+
+    /** Removes up to {@code num} buffers from the queue; does nothing when {@code num <= 0}. */
+    private void evict(int num) {
+      while (num > 0) {
+        // allocate() may drain the queue concurrently, so a separate isEmpty() check followed by
+        // poll() is racy. Count an eviction only when a buffer was really removed; otherwise the
+        // evictions counter, and with it getQueueSize(), would drift.
+        if (queue.poll() == null) {
+          break;
+        }
+        evictions.incrementAndGet();
+        num--;
+      }
+    }
+
+    private long getTotalUsedMemory() {
+      return (long) byteArraySize * getQueueSize();
+    }
+
+    private long getActiveUsedMemory() {
+      return (long) byteArraySize * getActiveSize();
+    }
+
+    // ConcurrentLinkedQueue::size() is O(n), so the queue length is derived from the counters:
+    // buffers returned minus buffers handed out again minus buffers evicted.
+    private int getQueueSize() {
+      return deAllocationsToAllocator.get() - allocationsFromAllocator.get() - evictions.get();
+    }
+
+    // Buffers handed out (from the pool or newly created) that have not been returned yet.
+    private int getActiveSize() {
+      return allocationsFromAllocator.get()
+          + allocationsFromJVM.get()
+          - deAllocationsToAllocator.get();
+    }
+
+    /** Drops all queued buffers and resets every counter and the running average. */
+    private void close() {
+      queue.clear();
+      allocationsFromAllocator.set(0);
+      allocationsFromJVM.set(0);
+      deAllocationsToAllocator.set(0);
+      evictions.set(0);
+      prevAllocations = 0;
+      prevAllocationsFromJVM = 0;
+      average.clear();
+    }
+  }
+}
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/binaryallocator/arena/ArenaStrategy.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/binaryallocator/arena/ArenaStrategy.java
new file mode 100644
index 00000000000..ec4af3fbe0a
--- /dev/null
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/binaryallocator/arena/ArenaStrategy.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.binaryallocator.arena;
+
+/**
+ * This interface defines a strategy for choosing an {@link Arena} from an array of {@link
+ * Arena}s. Implementations of this interface can provide various strategies for selection based
+ * on specific criteria.
+ */
+public interface ArenaStrategy {
+  /**
+   * Chooses an {@link Arena} from the given array of {@link Arena}s.
+   *
+   * @param arenas an array of {@link Arena}s to choose from, should not be null or length == 0
+   * @return the selected {@link Arena}
+   */
+  Arena choose(Arena[] arenas);
+}
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/binaryallocator/config/AllocatorConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/binaryallocator/config/AllocatorConfig.java
new file mode 100644
index 00000000000..53f20ac0da1
--- /dev/null
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/binaryallocator/config/AllocatorConfig.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.binaryallocator.config;
+
+import org.apache.iotdb.commons.conf.CommonDescriptor;
+
+import java.time.Duration;
+
+/**
+ * Settings of the binary allocator. The first five fields take their initial value from the
+ * common configuration at the moment an instance is created; the remaining ones are fixed
+ * defaults. All fields are public and mutable.
+ */
+public class AllocatorConfig {
+
+  /** Smallest request size (inclusive) that is served from the pool. */
+  public int minAllocateSize = 
CommonDescriptor.getInstance().getConfig().getMinAllocateSize();
+
+  /** Largest request size (inclusive) that is served from the pool. */
+  public int maxAllocateSize = 
CommonDescriptor.getInstance().getConfig().getMaxAllocateSize();
+
+  /** Number of arenas the allocator creates. */
+  public int arenaNum = 
CommonDescriptor.getInstance().getConfig().getArenaNum();
+
+  /** Value of the common configuration's {@code getLog2SizeClassGroup()}. */
+  public int log2ClassSizeGroup =
+      CommonDescriptor.getInstance().getConfig().getLog2SizeClassGroup();
+
+  /** Whether the allocator starts in the OPEN state when it is constructed. */
+  public boolean enableBinaryAllocator =
+      CommonDescriptor.getInstance().getConfig().isEnableBinaryAllocator();
+
+  /** Maximum wait time when shutting down the evictor (1000 ms by default) */
+  public Duration durationEvictorShutdownTimeout = Duration.ofMillis(1000L);
+
+  /** Time interval between two consecutive evictor runs (1000 ms by default) */
+  public Duration durationBetweenEvictorRuns = Duration.ofMillis(1000L);
+
+  /** Weight handed to the adaptive weighted average of every slab region. */
+  public int arenaPredictionWeight = 35;
+
+  /** Shared instance, created when this class is initialized. */
+  public static final AllocatorConfig DEFAULT_CONFIG = new AllocatorConfig();
+
+  /**
+   * Sets the interval between two consecutive evictor runs.
+   *
+   * @param time interval in milliseconds
+   */
+  public void setTimeBetweenEvictorRunsMillis(long time) {
+    this.durationBetweenEvictorRuns = Duration.ofMillis(time);
+  }
+}
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/binaryallocator/ema/AdaptiveWeightedAverage.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/binaryallocator/ema/AdaptiveWeightedAverage.java
new file mode 100644
index 00000000000..201f2f63ae1
--- /dev/null
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/binaryallocator/ema/AdaptiveWeightedAverage.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.binaryallocator.ema;
+
+import static java.lang.Math.max;
+
+/**
+ * This file is modified from <a
+ * href="https://github.com/openjdk/jdk17/blob/master/src/hotspot/share/gc/shared/gcUtil.hpp">JDK17
+ * AdaptiveWeightedAverage</a>. But some necessary modifications are made to adapt to the usage of
+ * binary allocator:
+ *
+ * <p>Adaptive weighted average implementation for memory allocation tracking. During each eviction
+ * cycle, records the peak memory allocation size via sampling, then uses this peak to calculate a
+ * weighted moving average.
+ */
+public class AdaptiveWeightedAverage {
+
+  /** Number of updates after which only the configured weight is used. */
+  private static final int OLD_THRESHOLD = 100;
+
+  private float average;
+  private int sampleCount;
+  private int tmpMaxSample;
+  private final int weight;
+  private boolean isOld; // Enable to have enough historical data
+
+  /**
+   * @param weight percentage (0-100) given to the newest sample once enough history exists
+   */
+  public AdaptiveWeightedAverage(int weight) {
+    this.weight = weight;
+    average = 0f;
+    sampleCount = 0;
+    tmpMaxSample = 0;
+  }
+
+  /** Records {@code newSample}; only the maximum of the current cycle is kept. */
+  public void sample(int newSample) {
+    tmpMaxSample = max(tmpMaxSample, newSample);
+  }
+
+  // called at the end of each eviction cycle
+  /** Folds the maximum sample of the finished cycle into the average and resets that maximum. */
+  public void update() {
+    incrementCount();
+
+    // Compute the new weighted average
+    int newSample = tmpMaxSample;
+    tmpMaxSample = 0;
+    average = computeAdaptiveAverage(newSample, average);
+  }
+
+  public float average() {
+    return average;
+  }
+
+  /** Resets the average, the counters and the history flag to their initial values. */
+  public void clear() {
+    average = 0f;
+    sampleCount = 0;
+    tmpMaxSample = 0;
+    isOld = false;
+  }
+
+  void incrementCount() {
+    sampleCount++;
+
+    if (!isOld && sampleCount > OLD_THRESHOLD) {
+      isOld = true;
+    }
+  }
+
+  /**
+   * Blends {@code newSample} into {@code average}.
+   *
+   * @param newSample the sample to fold in
+   * @param average the previous average
+   * @return the new weighted average
+   */
+  float computeAdaptiveAverage(int newSample, float average) {
+    // We smooth the samples by not using weight() directly until we've
+    // had enough data to make it meaningful. We'd like the first weight
+    // used to be 1, the second to be 1/2, etc until we have
+    // OLD_THRESHOLD/weight samples.
+    int countWeight = 0;
+
+    // Avoid division by zero: sampleCount is still 0 when this is invoked before any update(),
+    // and once isOld is set the division is skipped even if the counter wraps.
+    if (!isOld && sampleCount > 0) {
+      countWeight = OLD_THRESHOLD / sampleCount;
+    }
+
+    int adaptiveWeight = max(weight, countWeight);
+
+    // Multiply in float: the int product adaptiveWeight * newSample overflows as soon as
+    // newSample exceeds Integer.MAX_VALUE / adaptiveWeight.
+    return (100.0f - adaptiveWeight) * average / 100.0f
+        + adaptiveWeight * (float) newSample / 100.0f;
+  }
+}
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/binaryallocator/evictor/Evictor.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/binaryallocator/evictor/Evictor.java
new file mode 100644
index 00000000000..686e7e73d8f
--- /dev/null
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/binaryallocator/evictor/Evictor.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.binaryallocator.evictor;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Base class for a periodic eviction task. Subclasses implement
+ * {@link #run()}; {@link #startEvictor(Duration)} schedules it at a fixed
+ * rate on a single-thread scheduled executor and {@link #stopEvictor()}
+ * cancels the schedule and shuts that executor down.
+ *
+ * <p>The lifecycle methods do no locking of their own.
+ */
+public abstract class Evictor implements Runnable {
+  private static final Logger LOGGER = LoggerFactory.getLogger(Evictor.class);
+
+  private ScheduledFuture<?> scheduledFuture;
+  private final String name;
+  private final Duration evictorShutdownTimeoutDuration;
+
+  private ScheduledExecutorService executor;
+
+  /**
+   * @param name name passed to the executor factory and used in log output
+   * @param evictorShutdownTimeoutDuration how long {@link #stopEvictor()}
+   *     waits for the executor to terminate
+   */
+  public Evictor(String name, Duration evictorShutdownTimeoutDuration) {
+    this.name = name;
+    this.evictorShutdownTimeoutDuration = evictorShutdownTimeoutDuration;
+  }
+
+  /**
+   * Cancels the scheduled future, if there is one, without interrupting a
+   * run that is already in progress.
+   */
+  void cancel() {
+    // Nothing to cancel when the evictor was never (successfully) started.
+    if (scheduledFuture != null) {
+      scheduledFuture.cancel(false);
+    }
+  }
+
+  @Override
+  public abstract void run();
+
+  void setScheduledFuture(final ScheduledFuture<?> scheduledFuture) {
+    this.scheduledFuture = scheduledFuture;
+  }
+
+  @Override
+  public String toString() {
+    return getClass().getName() + " [scheduledFuture=" + scheduledFuture + "]";
+  }
+
+  /**
+   * Schedules {@link #run()} at a fixed rate, creating the executor on
+   * first use. Calling this again replaces the previous schedule.
+   *
+   * @param delay both the initial delay and the period between runs
+   */
+  public void startEvictor(final Duration delay) {
+    if (null == executor) {
+      executor = IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(name);
+    }
+    // Drop any earlier schedule first; otherwise a repeated start would
+    // stack a second periodic task and lose the handle to the first one.
+    cancel();
+    final long periodMillis = delay.toMillis();
+    this.setScheduledFuture(
+        ScheduledExecutorUtil.safelyScheduleAtFixedRate(
+            executor, this, periodMillis, periodMillis, TimeUnit.MILLISECONDS));
+  }
+
+  /**
+   * Cancels the schedule, shuts the executor down and waits up to the
+   * configured timeout for it to terminate. Does nothing when the evictor
+   * is not running. If the wait is interrupted, the interrupt flag is
+   * restored and the executor reference is still dropped.
+   */
+  public void stopEvictor() {
+    if (executor == null) {
+      return;
+    }
+
+    LOGGER.info("Stopping {}", name);
+
+    cancel();
+    executor.shutdown();
+    try {
+      final long timeoutMillis = evictorShutdownTimeoutDuration.toMillis();
+      if (!executor.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS)) {
+        LOGGER.info("unable to stop evictor after {} ms", timeoutMillis);
+      }
+    } catch (final InterruptedException e) {
+      // Preserve the interrupt for the caller instead of swallowing it.
+      Thread.currentThread().interrupt();
+    }
+    executor = null;
+  }
+}
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/binaryallocator/metric/BinaryAllocatorMetrics.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/binaryallocator/metric/BinaryAllocatorMetrics.java
new file mode 100644
index 00000000000..475f98860ac
--- /dev/null
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/binaryallocator/metric/BinaryAllocatorMetrics.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.binaryallocator.metric;
+
+import org.apache.iotdb.commons.binaryallocator.BinaryAllocator;
+import org.apache.iotdb.commons.service.metric.enums.Metric;
+import org.apache.iotdb.commons.service.metric.enums.Tag;
+import org.apache.iotdb.metrics.AbstractMetricService;
+import org.apache.iotdb.metrics.metricsets.IMetricSet;
+import org.apache.iotdb.metrics.type.Counter;
+import org.apache.iotdb.metrics.utils.MetricLevel;
+import org.apache.iotdb.metrics.utils.MetricType;
+
+public class BinaryAllocatorMetrics implements IMetricSet {
+
+  private static final String TOTAL_MEMORY = "total-memory";
+  private static final String ALLOCATE_FROM_SLAB = "allocate-from-slab";
+  private static final String ALLOCATE_FROM_JVM = "allocate-from-jvm";
+  private static final String ACTIVE_MEMORY = "active-memory";
+
+  private final BinaryAllocator binaryAllocator;
+  private Counter allocateFromSlab;
+  private Counter allocateFromJVM;
+
+  public BinaryAllocatorMetrics(final BinaryAllocator binaryAllocator) {
+    this.binaryAllocator = binaryAllocator;
+  }
+
+  @Override
+  public void bindTo(AbstractMetricService metricService) {
+    metricService.createAutoGauge(
+        Metric.BINARY_ALLOCATOR.toString(),
+        MetricLevel.IMPORTANT,
+        binaryAllocator,
+        BinaryAllocator::getTotalUsedMemory,
+        Tag.NAME.toString(),
+        TOTAL_MEMORY);
+    metricService.createAutoGauge(
+        Metric.BINARY_ALLOCATOR.toString(),
+        MetricLevel.IMPORTANT,
+        binaryAllocator,
+        BinaryAllocator::getTotalActiveMemory,
+        Tag.NAME.toString(),
+        ACTIVE_MEMORY);
+    allocateFromSlab =
+        metricService.getOrCreateCounter(
+            Metric.BINARY_ALLOCATOR.toString(),
+            MetricLevel.IMPORTANT,
+            Tag.NAME.toString(),
+            ALLOCATE_FROM_SLAB);
+    allocateFromJVM =
+        metricService.getOrCreateCounter(
+            Metric.BINARY_ALLOCATOR.toString(),
+            MetricLevel.IMPORTANT,
+            Tag.NAME.toString(),
+            ALLOCATE_FROM_JVM);
+  }
+
+  @Override
+  public void unbindFrom(AbstractMetricService metricService) {
+    metricService.remove(
+        MetricType.AUTO_GAUGE,
+        Metric.BINARY_ALLOCATOR.toString(),
+        Tag.NAME.toString(),
+        TOTAL_MEMORY);
+    metricService.remove(
+        MetricType.AUTO_GAUGE,
+        Metric.BINARY_ALLOCATOR.toString(),
+        Tag.NAME.toString(),
+        ACTIVE_MEMORY);
+    metricService.remove(
+        MetricType.COUNTER,
+        Metric.BINARY_ALLOCATOR.toString(),
+        Tag.NAME.toString(),
+        ALLOCATE_FROM_SLAB);
+    metricService.remove(
+        MetricType.COUNTER,
+        Metric.BINARY_ALLOCATOR.toString(),
+        Tag.NAME.toString(),
+        ALLOCATE_FROM_JVM);
+  }
+
+  public void updateCounter(int allocateFromSlabDelta, int 
allocateFromJVMDelta) {
+    allocateFromSlab.inc(allocateFromSlabDelta);
+    allocateFromJVM.inc(allocateFromJVMDelta);
+  }
+}
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/binaryallocator/utils/SizeClasses.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/binaryallocator/utils/SizeClasses.java
new file mode 100644
index 00000000000..e13cc54a9f1
--- /dev/null
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/binaryallocator/utils/SizeClasses.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.binaryallocator.utils;
+
+import org.apache.iotdb.commons.binaryallocator.config.AllocatorConfig;
+
+/**
+ * Manages the size classes of memory blocks in a memory allocator. The
+ * block size of every class is precomputed, and a requested size is
+ * mapped to the index of a class by bit arithmetic.
+ *
+ * <p>Classes are organised in groups that each span one power of two
+ * (for example 4096..8192); every group is split into
+ * {@code 1 << log2SizeClassGroup} evenly spaced classes.
+ */
+public final class SizeClasses {
+
+  // Integer size in bits minus 1, used for log2 calculations
+  private static final int INTEGER_SIZE_MINUS_ONE = Integer.SIZE - 1;
+
+  // Largest power of two that fits in a positive int
+  private static final int MAX_SUPPORTED_SIZE = 1 << (Integer.SIZE - 2);
+
+  // Mapping from size class index to actual memory block size
+  private final int[] sizeIdx2sizeTab;
+
+  // Log2 value of the minimum memory block size
+  private final int log2MinSize;
+
+  // Log2 value of the size class group
+  private final int log2SizeClassGroup;
+
+  /**
+   * Builds the size class table from the allocator configuration.
+   *
+   * <p>The table starts at the largest power of two not above
+   * {@code minAllocateSize} and ends at the smallest power of two not
+   * below {@code maxAllocateSize}. For power-of-two settings these are
+   * the configured values themselves.
+   *
+   * @param allocatorConfig supplies {@code minAllocateSize},
+   *     {@code maxAllocateSize} and {@code log2ClassSizeGroup}
+   * @throws IllegalArgumentException if the size range is empty, not
+   *     positive or above 2^30, or if {@code log2ClassSizeGroup} is
+   *     negative or larger than log2 of the minimum size
+   */
+  public SizeClasses(AllocatorConfig allocatorConfig) {
+    final int minSize = allocatorConfig.minAllocateSize;
+    final int maxSize = allocatorConfig.maxAllocateSize;
+    final int log2Group = allocatorConfig.log2ClassSizeGroup;
+
+    if (minSize <= 0 || maxSize < minSize || maxSize > MAX_SUPPORTED_SIZE) {
+      throw new IllegalArgumentException(
+          "Invalid allocate size range [" + minSize + ", " + maxSize + "]");
+    }
+    this.log2MinSize = log2(minSize);
+    // A group cannot be split into more classes than it has bytes;
+    // otherwise the per-class delta would need a negative shift.
+    if (log2Group < 0 || log2Group > log2MinSize) {
+      throw new IllegalArgumentException(
+          "Invalid log2 size class group "
+              + log2Group
+              + " for min allocate size "
+              + minSize);
+    }
+    this.log2SizeClassGroup = log2Group;
+
+    // ceil(log2(maxSize)): round up so the table is sized for the same
+    // upper bound that initializeSizeClasses fills it to.
+    final int log2MaxSize =
+        Integer.SIZE - Integer.numberOfLeadingZeros(maxSize - 1);
+    final int sizeClassGroupCount = log2MaxSize - log2MinSize;
+
+    // One entry for the minimum size plus a full set per group
+    sizeIdx2sizeTab =
+        new int[(sizeClassGroupCount << log2SizeClassGroup) + 1];
+
+    // Calculate the size of each size class and populate the table
+    initializeSizeClasses(minSize, 1 << log2MaxSize);
+  }
+
+  /**
+   * Returns the memory block size for a given size class index.
+   *
+   * @param sizeIdx The index of the size class.
+   * @return The memory block size corresponding to the size class index.
+   */
+  public int sizeIdx2size(int sizeIdx) {
+    return sizeIdx2sizeTab[sizeIdx];
+  }
+
+  /**
+   * Returns the size class index for a given memory block size.
+   *
+   * <p>The argument is not range checked. NOTE(review): callers are
+   * presumably expected to pass only sizes between the table's minimum
+   * and maximum block size; confirm against the allocator.
+   *
+   * @param size The memory block size.
+   * @return The corresponding size class index.
+   */
+  public int size2SizeIdx(int size) {
+    // log2 of size rounded up to the next power of two
+    final int log2Size = log2((size << 1) - 1);
+    final int shift = log2Size - log2MinSize - 1;
+
+    // First index of the group that contains the size
+    final int group = shift << log2SizeClassGroup;
+    final int log2Delta = log2Size - 1 - log2SizeClassGroup;
+
+    // Position inside the group
+    final int mask = (1 << log2SizeClassGroup) - 1;
+    final int mod = ((size - 1) >> log2Delta) & mask;
+    return group + mod + 1;
+  }
+
+  /**
+   * Returns the total number of size classes.
+   *
+   * @return The total number of size classes.
+   */
+  public int getSizeClassNum() {
+    return sizeIdx2sizeTab.length;
+  }
+
+  /**
+   * Calculates {@code (1 << log2Group) + (delta << log2Delta)}.
+   *
+   * @param log2Group The log2 value of the current size class group.
+   * @param delta The delta value for the size class.
+   * @param log2Delta The log2 value of the delta.
+   * @return The calculated memory block size.
+   */
+  private static int calculateSize(int log2Group, int delta, int log2Delta) {
+    return (1 << log2Group) + (delta << log2Delta);
+  }
+
+  /**
+   * Calculates the log2 value of a given integer, rounded down.
+   *
+   * @param val The value to calculate the log2 for.
+   * @return The log2 value of the given integer.
+   */
+  private static int log2(int val) {
+    return INTEGER_SIZE_MINUS_ONE - Integer.numberOfLeadingZeros(val);
+  }
+
+  /**
+   * Fills the size class table, one group per power of two, starting at
+   * {@code 1 << log2MinSize}.
+   *
+   * @param minSize The minimum memory block size (the table itself
+   *     starts at {@code 1 << log2MinSize}).
+   * @param maxSize The size at which filling stops; must be the power of
+   *     two the table was sized for.
+   */
+  private void initializeSizeClasses(int minSize, int maxSize) {
+    final int nDeltaLimit = 1 << log2SizeClassGroup;
+    int log2Group = log2MinSize;
+    int log2Delta = log2MinSize - log2SizeClassGroup;
+
+    int sizeCount = 0;
+    int size = calculateSize(log2Group, 0, log2Delta);
+    sizeIdx2sizeTab[sizeCount++] = size; // Initial size
+
+    // Each pass of the outer loop fills one group
+    for (; size < maxSize; log2Group++, log2Delta++) {
+      for (int nDelta = 1;
+          nDelta <= nDeltaLimit && size <= maxSize;
+          nDelta++) {
+        size = calculateSize(log2Group, nDelta, log2Delta);
+        sizeIdx2sizeTab[sizeCount++] = size;
+      }
+    }
+  }
+}
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
index cd3416f6baf..fee09496730 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
@@ -190,6 +190,7 @@ public enum ThreadName {
   STORAGE_ENGINE_RECOVER_TRIGGER("StorageEngine-RecoverTrigger"),
   REPAIR_DATA("RepairData"),
   FILE_TIME_INDEX_RECORD("FileTimeIndexRecord"),
+  BINARY_ALLOCATOR_SAMPLE_EVICTOR("BinaryAllocator-SampleEvictor"),
 
   // the unknown thread name is used for metrics
   UNKOWN("UNKNOWN");
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 74314cacdfa..2d1585d6525 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -320,6 +320,16 @@ public class CommonConfig {
   private long seriesLimitThreshold = -1;
   private long deviceLimitThreshold = -1;
 
+  private boolean enableBinaryAllocator = true; // enable_binary_allocator
+
+  private int arenaNum = 4; // property key: arena_num
+
+  private int minAllocateSize = 4096; // property key: small_blob_object
+
+  private int maxAllocateSize = 1024 * 1024; // key: huge_blob_object
+
+  private int log2SizeClassGroup = 3; // key: log2_size_class_group
+
   // time in nanosecond precision when starting up
   private final long startUpNanosecond = System.nanoTime();
 
@@ -1479,4 +1489,44 @@ public class CommonConfig {
   public void setRemoteWriteMaxRetryDurationInMs(long 
remoteWriteMaxRetryDurationInMs) {
     this.remoteWriteMaxRetryDurationInMs = remoteWriteMaxRetryDurationInMs;
   }
+
+  public int getArenaNum() {
+    return arenaNum; // property key: arena_num
+  }
+
+  public void setArenaNum(int arenaNum) {
+    this.arenaNum = arenaNum;
+  }
+
+  public int getMinAllocateSize() {
+    return minAllocateSize; // property key: small_blob_object
+  }
+
+  public void setMinAllocateSize(int minAllocateSize) {
+    this.minAllocateSize = minAllocateSize;
+  }
+
+  public int getMaxAllocateSize() {
+    return maxAllocateSize; // property key: huge_blob_object
+  }
+
+  public void setMaxAllocateSize(int maxAllocateSize) {
+    this.maxAllocateSize = maxAllocateSize;
+  }
+
+  public boolean isEnableBinaryAllocator() {
+    return enableBinaryAllocator; // key: enable_binary_allocator
+  }
+
+  public void setEnableBinaryAllocator(boolean enableBinaryAllocator) {
+    this.enableBinaryAllocator = enableBinaryAllocator;
+  }
+
+  public int getLog2SizeClassGroup() {
+    return log2SizeClassGroup; // property key: log2_size_class_group
+  }
+
+  public void setLog2SizeClassGroup(int log2SizeClassGroup) {
+    this.log2SizeClassGroup = log2SizeClassGroup;
+  }
 }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index 26ace0305c2..6acbd732857 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -244,6 +244,7 @@ public class CommonDescriptor {
                 String.valueOf(config.getDeviceLimitThreshold()))));
 
     loadRetryProperties(properties);
+    loadBinaryAllocatorProps(properties);
   }
 
   private void loadPipeProps(TrimProperties properties) {
@@ -736,6 +737,30 @@ public class CommonDescriptor {
                     "enable_retry_for_unknown_error"))));
   }
 
+  /**
+   * Reads the binary allocator settings from {@code properties} into the
+   * config, falling back to the current config value for every key that
+   * is absent. A non-positive {@code arena_num} is ignored.
+   */
+  public void loadBinaryAllocatorProps(TrimProperties properties) {
+    final String enabledText =
+        properties.getProperty(
+            "enable_binary_allocator",
+            Boolean.toString(config.isEnableBinaryAllocator()));
+    config.setEnableBinaryAllocator(Boolean.parseBoolean(enabledText));
+
+    final String minSizeText =
+        properties.getProperty(
+            "small_blob_object",
+            String.valueOf(config.getMinAllocateSize()));
+    config.setMinAllocateSize(Integer.parseInt(minSizeText));
+
+    final String maxSizeText =
+        properties.getProperty(
+            "huge_blob_object",
+            String.valueOf(config.getMaxAllocateSize()));
+    config.setMaxAllocateSize(Integer.parseInt(maxSizeText));
+
+    final String arenaNumText =
+        properties.getProperty(
+            "arena_num", String.valueOf(config.getArenaNum()));
+    final int arenaNum = Integer.parseInt(arenaNumText);
+    if (arenaNum >= 1) {
+      config.setArenaNum(arenaNum);
+    }
+
+    final String groupText =
+        properties.getProperty(
+            "log2_size_class_group",
+            String.valueOf(config.getLog2SizeClassGroup()));
+    config.setLog2SizeClassGroup(Integer.parseInt(groupText));
+  }
+
   public void loadGlobalConfig(TGlobalConfig globalConfig) {
     config.setTimestampPrecision(globalConfig.timestampPrecision);
     config.setTimePartitionOrigin(
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/JvmGcMonitorMetrics.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/JvmGcMonitorMetrics.java
index fd7e500808c..0dbbba5e99f 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/JvmGcMonitorMetrics.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/JvmGcMonitorMetrics.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.commons.service.metric;
 
+import org.apache.iotdb.commons.binaryallocator.BinaryAllocator;
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.commons.concurrent.ThreadName;
 import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
@@ -114,9 +115,14 @@ public class JvmGcMonitorMetrics implements IMetricSet {
 
   private void scheduledMonitoring() {
     calculateGCTimePercentageWithinObservedInterval();
+
+    // Alert when the GC time percentage exceeds MAX_GC_TIME_PERCENTAGE
     if (alertHandler != null && curData.getGcTimePercentage() > 
MAX_GC_TIME_PERCENTAGE) {
       alertHandler.alert(curData.clone());
     }
+
+    // Pass the same GC time percentage to the allocator's GC eviction
+    BinaryAllocator.getInstance().runGcEviction(curData.getGcTimePercentage());
   }
 
   private long getTotalGCTime() {
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
index 5812fac36f2..04757610064 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
@@ -184,6 +184,7 @@ public enum Metric {
   LOAD_TIME_COST("load_time_cost"),
   LOAD_POINT_COUNT("load_point_count"),
   MEMTABLE_POINT_COUNT("memtable_point_count"),
+  BINARY_ALLOCATOR("binary_allocator"),
   ;
 
   final String value;
diff --git 
a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/binaryallocator/BinaryAllocatorTest.java
 
b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/binaryallocator/BinaryAllocatorTest.java
new file mode 100644
index 00000000000..0fb4f0d96b5
--- /dev/null
+++ 
b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/binaryallocator/BinaryAllocatorTest.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.binaryallocator;
+
+import org.apache.iotdb.commons.binaryallocator.config.AllocatorConfig;
+import org.apache.iotdb.commons.binaryallocator.utils.SizeClasses;
+
+import org.apache.tsfile.utils.PooledBinary;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/** Tests for {@link BinaryAllocator} and {@link SizeClasses}. */
+public class BinaryAllocatorTest {
+  /** Allocations below the minimum size report arena index -1. */
+  @Test
+  public void testAllocateBinary() {
+    AllocatorConfig config = new AllocatorConfig();
+    config.arenaNum = 1;
+    BinaryAllocator binaryAllocator = new BinaryAllocator(config);
+    binaryAllocator.resetArenaBinding();
+
+    PooledBinary binary = binaryAllocator.allocateBinary(255);
+    assertNotNull(binary);
+    assertEquals(-1, binary.getArenaIndex());
+    assertEquals(255, binary.getLength());
+    binaryAllocator.deallocateBinary(binary);
+
+    binary = binaryAllocator.allocateBinary(65536);
+    assertNotNull(binary);
+    assertEquals(0, binary.getArenaIndex());
+    assertEquals(65536, binary.getLength());
+    binaryAllocator.deallocateBinary(binary);
+
+    // One byte short of a class boundary: the backing array is the
+    // full class size while the logical length stays as requested.
+    binary = binaryAllocator.allocateBinary(65535);
+    assertNotNull(binary);
+    assertEquals(0, binary.getArenaIndex());
+    assertEquals(65535, binary.getLength());
+    assertEquals(65536, binary.getValues().length);
+    binaryAllocator.deallocateBinary(binary);
+  }
+
+  /** One thread keeps its arena; several threads spread evenly. */
+  @Test
+  public void testStrategy() throws InterruptedException {
+    BinaryAllocator binaryAllocator =
+        new BinaryAllocator(AllocatorConfig.DEFAULT_CONFIG);
+    binaryAllocator.resetArenaBinding();
+
+    PooledBinary binary1 = binaryAllocator.allocateBinary(4096);
+    PooledBinary binary2 = binaryAllocator.allocateBinary(4096);
+    assertEquals(binary1.getArenaIndex(), binary2.getArenaIndex());
+    binaryAllocator.deallocateBinary(binary1);
+    binaryAllocator.deallocateBinary(binary2);
+
+    int threadCount = 4;
+    CountDownLatch latch = new CountDownLatch(threadCount);
+    Map<Integer, Integer> arenaUsageCount = new ConcurrentHashMap<>();
+    for (int i = 0; i < threadCount; i++) {
+      Thread thread =
+          new Thread(
+              () -> {
+                try {
+                  PooledBinary firstBinary =
+                      binaryAllocator.allocateBinary(2048);
+                  int arenaId = firstBinary.getArenaIndex();
+                  arenaUsageCount.merge(arenaId, 1, Integer::sum);
+                  binaryAllocator.deallocateBinary(firstBinary);
+                } finally {
+                  latch.countDown();
+                }
+              });
+      thread.start();
+    }
+
+    latch.await();
+    int maxUsage = Collections.max(arenaUsageCount.values());
+    int minUsage = Collections.min(arenaUsageCount.values());
+    assertEquals(maxUsage, minUsage);
+  }
+
+  /** A returned block is released once the evictor has had time to run. */
+  @Test
+  public void testEviction() throws InterruptedException {
+    AllocatorConfig config = new AllocatorConfig();
+    config.arenaNum = 1;
+    config.minAllocateSize = config.maxAllocateSize = 4096;
+    config.setTimeBetweenEvictorRunsMillis(1);
+    BinaryAllocator binaryAllocator = new BinaryAllocator(config);
+    binaryAllocator.resetArenaBinding();
+
+    PooledBinary binary = binaryAllocator.allocateBinary(4096);
+    binaryAllocator.deallocateBinary(binary);
+    assertEquals(4096, binaryAllocator.getTotalUsedMemory());
+    // Timing dependent: waits for evictor runs scheduled every 1 ms.
+    Thread.sleep(200);
+    assertEquals(0, binaryAllocator.getTotalUsedMemory());
+  }
+
+  /** Sizes one byte below a class boundary map to that boundary. */
+  @Test
+  public void testSizeMapping() {
+    AllocatorConfig config = new AllocatorConfig();
+    config.minAllocateSize = 4096;
+    config.maxAllocateSize = 65536;
+    SizeClasses sizeClasses = new SizeClasses(config);
+
+    assertEquals(33, sizeClasses.getSizeClassNum());
+    int[] testSizes = {4607, 8191, 16383, 32767, 65535};
+
+    for (int size : testSizes) {
+      int sizeIdx = sizeClasses.size2SizeIdx(size);
+      int mappedSize = sizeClasses.sizeIdx2size(sizeIdx);
+
+      assertEquals(
+          "Mapped size should be the next size class boundary",
+          size + 1,
+          mappedSize);
+
+      if (sizeIdx > 0) {
+        int previousSize = sizeClasses.sizeIdx2size(sizeIdx - 1);
+        assertTrue(
+            "Previous size should be < original size", previousSize < size);
+      }
+    }
+  }
+}

Reply via email to