This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new fd105e030d6 Add Binary Allocator (#14321)
fd105e030d6 is described below
commit fd105e030d6bd754e767590b5c207557c4751e07
Author: Potato <[email protected]>
AuthorDate: Thu Dec 5 13:21:48 2024 +0800
Add Binary Allocator (#14321)
* add binary allocator
* fix
* add gc evictor and hot load configuration
* fix
* fix
* add license
* resolve comments
* add comments
* modified adaptive weighted average
* fix unit test
* add necessary comments
* restruct package
Signed-off-by: OneSizeFitQuorum <[email protected]>
* restruct SizeClasses
Signed-off-by: OneSizeFitQuorum <[email protected]>
* enhance
Signed-off-by: OneSizeFitQuorum <[email protected]>
* refactor eviction thread and remove evict timer
* almost finish
Signed-off-by: OneSizeFitQuorum <[email protected]>
* start binary allocator with hot loading
* add config check
* fix compile error
* enhance
Signed-off-by: OneSizeFitQuorum <[email protected]>
* Remove duplicate test
Signed-off-by: OneSizeFitQuorum <[email protected]>
---------
Signed-off-by: OneSizeFitQuorum <[email protected]>
Co-authored-by: MrQuansy <[email protected]>
---
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 21 ++
.../conf/iotdb-system.properties.template | 34 +++
.../commons/binaryallocator/BinaryAllocator.java | 260 +++++++++++++++++++++
.../binaryallocator/BinaryAllocatorState.java | 71 ++++++
.../iotdb/commons/binaryallocator/arena/Arena.java | 233 ++++++++++++++++++
.../binaryallocator/arena/ArenaStrategy.java | 35 +++
.../binaryallocator/config/AllocatorConfig.java | 53 +++++
.../ema/AdaptiveWeightedAverage.java | 100 ++++++++
.../commons/binaryallocator/evictor/Evictor.java | 96 ++++++++
.../metric/BinaryAllocatorMetrics.java | 104 +++++++++
.../commons/binaryallocator/utils/SizeClasses.java | 146 ++++++++++++
.../iotdb/commons/concurrent/ThreadName.java | 1 +
.../apache/iotdb/commons/conf/CommonConfig.java | 50 ++++
.../iotdb/commons/conf/CommonDescriptor.java | 25 ++
.../service/metric/JvmGcMonitorMetrics.java | 6 +
.../iotdb/commons/service/metric/enums/Metric.java | 1 +
.../binaryallocator/BinaryAllocatorTest.java | 139 +++++++++++
17 files changed, 1375 insertions(+)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 37490ec97f2..b652acf7329 100755
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.db.conf;
+import org.apache.iotdb.commons.binaryallocator.BinaryAllocator;
import org.apache.iotdb.commons.conf.CommonConfig;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.conf.ConfigurationFileUtils;
@@ -2849,6 +2850,26 @@ public class IoTDBDescriptor {
// update retry config
commonDescriptor.loadRetryProperties(properties);
+
+ // update binary allocator
+ commonDescriptor
+ .getConfig()
+ .setEnableBinaryAllocator(
+ Boolean.parseBoolean(
+ Optional.ofNullable(
+ properties.getProperty(
+ "enable_binary_allocator",
+
ConfigurationFileUtils.getConfigurationDefaultValue(
+ "enable_binary_allocator")))
+ .map(String::trim)
+ .orElse(
+ ConfigurationFileUtils.getConfigurationDefaultValue(
+ "enable_binary_allocator"))));
+ if (commonDescriptor.getConfig().isEnableBinaryAllocator()) {
+ BinaryAllocator.getInstance().start();
+ } else {
+ BinaryAllocator.getInstance().close(true);
+ }
} catch (Exception e) {
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
diff --git
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
index cdcd5a8571a..103f26df3ce 100644
---
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
+++
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
@@ -1493,6 +1493,40 @@ data_region_iot_max_memory_ratio_for_queue = 0.6
# Datatype: long
region_migration_speed_limit_bytes_per_second = 33554432
+####################
+### Binary Allocator Configuration
+####################
+# Whether to enable the binary allocator.
+# In scenarios where allocating large binary values causes heavy GC pressure, enabling this
+# parameter can significantly improve performance.
+# effectiveMode: hot_reload
+enable_binary_allocator=true
+
+# Lower boundary (inclusive) of the allocation sizes handled by the allocator.
+# Smaller requests are allocated directly from the JVM heap.
+# unit: bytes
+# Datatype: int
+# effectiveMode: restart
+small_binary_object=4096
+
+# Upper boundary (inclusive) of the allocation sizes handled by the allocator.
+# Larger requests are allocated directly from the JVM heap.
+# unit: bytes
+# Datatype: int
+# effectiveMode: restart
+huge_binary_object=1048576
+
+# Number of arenas in the binary allocator. Threads are spread across arenas, so this controls allocation concurrency.
+# Datatype: int
+# effectiveMode: restart
+arena_num=4
+
+# Controls the number of size classes (slab regions) in each arena.
+# Each power-of-2 size interval is divided into 2^log2_size_class_group size classes.
+# For example, with log2_size_class_group=3 there are 8 different sizes between 1024 and 2048 bytes.
+# Datatype: int
+# effectiveMode: restart
+log2_size_class_group=3
+
####################
### TsFile Configurations
####################
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/binaryallocator/BinaryAllocator.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/binaryallocator/BinaryAllocator.java
new file mode 100644
index 00000000000..3193fd6abd2
--- /dev/null
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/binaryallocator/BinaryAllocator.java
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.binaryallocator;
+
+import org.apache.iotdb.commons.binaryallocator.arena.Arena;
+import org.apache.iotdb.commons.binaryallocator.arena.ArenaStrategy;
+import org.apache.iotdb.commons.binaryallocator.config.AllocatorConfig;
+import org.apache.iotdb.commons.binaryallocator.evictor.Evictor;
+import org.apache.iotdb.commons.binaryallocator.metric.BinaryAllocatorMetrics;
+import org.apache.iotdb.commons.binaryallocator.utils.SizeClasses;
+import org.apache.iotdb.commons.concurrent.ThreadName;
+import org.apache.iotdb.commons.service.metric.MetricService;
+import org.apache.iotdb.commons.utils.TestOnly;
+
+import org.apache.tsfile.utils.PooledBinary;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class BinaryAllocator {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(BinaryAllocator.class);
+
+ private final Arena[] heapArenas;
+ private final AllocatorConfig allocatorConfig;
+
+ private final ArenaStrategy arenaStrategy = new LeastUsedArenaStrategy();
+ private final AtomicReference<BinaryAllocatorState> state =
+ new AtomicReference<>(BinaryAllocatorState.UNINITIALIZED);
+
+ private final BinaryAllocatorMetrics metrics;
+ private Evictor sampleEvictor;
+ private static final ThreadLocal<ThreadArenaRegistry> arenaRegistry =
+ ThreadLocal.withInitial(ThreadArenaRegistry::new);
+
+ private static final int WARNING_GC_TIME_PERCENTAGE = 10;
+ private static final int HALF_GC_TIME_PERCENTAGE = 20;
+ private static final int SHUTDOWN_GC_TIME_PERCENTAGE = 30;
+ private static final int RESTART_GC_TIME_PERCENTAGE = 5;
+
+ public BinaryAllocator(AllocatorConfig allocatorConfig) {
+ this.allocatorConfig = allocatorConfig;
+
+ heapArenas = new Arena[allocatorConfig.arenaNum];
+ SizeClasses sizeClasses = new SizeClasses(allocatorConfig);
+
+ for (int i = 0; i < heapArenas.length; i++) {
+ Arena arena = new Arena(this, sizeClasses, i, allocatorConfig);
+ heapArenas[i] = arena;
+ }
+
+ this.metrics = new BinaryAllocatorMetrics(this);
+
+ if (allocatorConfig.enableBinaryAllocator) {
+ start();
+ } else {
+ state.set(BinaryAllocatorState.CLOSE);
+ }
+ }
+
+ public synchronized void start() {
+ if (state.get() == BinaryAllocatorState.OPEN) {
+ return;
+ }
+
+ state.set(BinaryAllocatorState.OPEN);
+ MetricService.getInstance().addMetricSet(this.metrics);
+ sampleEvictor =
+ new SampleEvictor(
+ ThreadName.BINARY_ALLOCATOR_SAMPLE_EVICTOR.getName(),
+ allocatorConfig.durationEvictorShutdownTimeout);
+ sampleEvictor.startEvictor(allocatorConfig.durationBetweenEvictorRuns);
+ }
+
+ public synchronized void close(boolean forceClose) {
+ if (forceClose) {
+ state.set(BinaryAllocatorState.CLOSE);
+ MetricService.getInstance().removeMetricSet(this.metrics);
+ } else {
+ state.set(BinaryAllocatorState.PENDING);
+ }
+
+ sampleEvictor.stopEvictor();
+ for (Arena arena : heapArenas) {
+ arena.close();
+ }
+ }
+
+ public PooledBinary allocateBinary(int reqCapacity) {
+ if (reqCapacity < allocatorConfig.minAllocateSize
+ || reqCapacity > allocatorConfig.maxAllocateSize
+ || state.get() != BinaryAllocatorState.OPEN) {
+ return new PooledBinary(new byte[reqCapacity]);
+ }
+
+ Arena arena = arenaStrategy.choose(heapArenas);
+
+ return new PooledBinary(arena.allocate(reqCapacity), reqCapacity,
arena.getArenaID());
+ }
+
+ public void deallocateBinary(PooledBinary binary) {
+ if (binary != null
+ && binary.getLength() >= allocatorConfig.minAllocateSize
+ && binary.getLength() <= allocatorConfig.maxAllocateSize
+ && state.get() == BinaryAllocatorState.OPEN) {
+ int arenaIndex = binary.getArenaIndex();
+ if (arenaIndex != -1) {
+ Arena arena = heapArenas[arenaIndex];
+ arena.deallocate(binary.getValues());
+ }
+ }
+ }
+
+ public long getTotalUsedMemory() {
+ long totalUsedMemory = 0;
+ for (Arena arena : heapArenas) {
+ totalUsedMemory += arena.getTotalUsedMemory();
+ }
+ return totalUsedMemory;
+ }
+
+ public long getTotalActiveMemory() {
+ long totalActiveMemory = 0;
+ for (Arena arena : heapArenas) {
+ totalActiveMemory += arena.getActiveMemory();
+ }
+ return totalActiveMemory;
+ }
+
+ @TestOnly
+ public void resetArenaBinding() {
+ arenaRegistry.get().unbindArena();
+ }
+
+ public BinaryAllocatorMetrics getMetrics() {
+ return metrics;
+ }
+
+ private void evict(double ratio) {
+ for (Arena arena : heapArenas) {
+ arena.evict(ratio);
+ }
+ }
+
+ public static BinaryAllocator getInstance() {
+ return BinaryAllocatorHolder.INSTANCE;
+ }
+
+ private static class BinaryAllocatorHolder {
+ private static final BinaryAllocator INSTANCE =
+ new BinaryAllocator(AllocatorConfig.DEFAULT_CONFIG);
+ }
+
+ private static class ThreadArenaRegistry {
+ private Arena threadArenaBinding = null;
+
+ public Arena getArena() {
+ return threadArenaBinding;
+ }
+
+ public void bindArena(Arena arena) {
+ threadArenaBinding = arena;
+ arena.incRegisteredThread();
+ }
+
+ public void unbindArena() {
+ Arena arena = threadArenaBinding;
+ if (arena != null) {
+ arena.decRegisteredThread();
+ threadArenaBinding = null;
+ }
+ }
+
+ @Override
+ protected void finalize() {
+ unbindArena();
+ }
+ }
+
+ private static class LeastUsedArenaStrategy implements ArenaStrategy {
+ @Override
+ public Arena choose(Arena[] arenas) {
+ Arena boundArena = arenaRegistry.get().getArena();
+ if (boundArena != null) {
+ return boundArena;
+ }
+
+ Arena minArena = arenas[0];
+
+ for (int i = 1; i < arenas.length; i++) {
+ Arena arena = arenas[i];
+ if (arena.getNumRegisteredThread() <
minArena.getNumRegisteredThread()) {
+ minArena = arena;
+ }
+ }
+
+ arenaRegistry.get().bindArena(minArena);
+ return minArena;
+ }
+ }
+
+ public void runGcEviction(long curGcTimePercent) {
+ if (state.get() == BinaryAllocatorState.CLOSE) {
+ return;
+ }
+
+ LOGGER.debug("Binary allocator running GC eviction");
+ if (state.get() == BinaryAllocatorState.PENDING) {
+ if (curGcTimePercent <= RESTART_GC_TIME_PERCENTAGE) {
+ start();
+ }
+ return;
+ }
+
+ if (curGcTimePercent > SHUTDOWN_GC_TIME_PERCENTAGE) {
+ LOGGER.info(
+ "Binary allocator is shutting down because of high GC time
percentage {}%.",
+ curGcTimePercent);
+ evict(1.0);
+ close(false);
+ } else if (curGcTimePercent > HALF_GC_TIME_PERCENTAGE) {
+ evict(0.5);
+ } else if (curGcTimePercent > WARNING_GC_TIME_PERCENTAGE) {
+ evict(0.2);
+ }
+ }
+
+ public class SampleEvictor extends Evictor {
+
+ public SampleEvictor(String name, Duration evictorShutdownTimeoutDuration)
{
+ super(name, evictorShutdownTimeoutDuration);
+ }
+
+ @Override
+ public void run() {
+ for (Arena arena : heapArenas) {
+ arena.runSampleEviction();
+ }
+ }
+ }
+}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/binaryallocator/BinaryAllocatorState.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/binaryallocator/BinaryAllocatorState.java
new file mode 100644
index 00000000000..6812bc84b11
--- /dev/null
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/binaryallocator/BinaryAllocatorState.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.binaryallocator;
+
+/**
+ * The state transmission of a binary allocator.
+ *
+ * <pre>
+ * ----------------------------------------
+ * | |
+ * | ---------- |
+ * | | | |
+ * | v | v
+ * UNINITIALIZED --> OPEN ---> PENDING --> CLOSE
+ * ^ ^
+ * | |
+ * -------------------------
+ * </pre>
+ *
+ * State Transition Logic:
+ *
+ * <ul>
+ * <li><b>UNINITIALIZED -> CLOSE</b>: When enable_binary_allocator = false
+ * <li><b>UNINITIALIZED -> OPEN</b>: When enable_binary_allocator = true
+ * <li><b>OPEN -> CLOSE</b>: When enable_binary_allocator is hot reload to
false
+ * <li><b>CLOSE -> OPEN</b>: When enable_binary_allocator is hot reload to
true
+ * <li><b>PENDING -> CLOSE</b>: When enable_binary_allocator is hot reload
to false
+ * <li><b>OPEN -> PENDING</b>: When in OPEN state and GC time percentage
exceeds 30%, indicating
+ * allocator ineffectiveness
+ * <li><b>PENDING -> OPEN</b>: When GC time percentage drops below 5%,
returning to normal state
+ * and re-enabling the allocator. Or when enable_binary_allocator is hot
reload to true.
+ * </ul>
+ */
+public enum BinaryAllocatorState {
+ /** Binary allocator is open for allocation. */
+ OPEN,
+
+ /** Binary allocator is close. All allocations are from the JVM heap. */
+ CLOSE,
+
+ /**
+ * Binary allocator is temporarily closed by GC evictor. All allocations are
from the JVM heap.
+ * Allocator can be restarted afterward.
+ */
+ PENDING,
+
+ /** The initial state of the allocator. */
+ UNINITIALIZED;
+
+ @Override
+ public String toString() {
+ return name();
+ }
+}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/binaryallocator/arena/Arena.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/binaryallocator/arena/Arena.java
new file mode 100644
index 00000000000..cfb83ed0708
--- /dev/null
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/binaryallocator/arena/Arena.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.binaryallocator.arena;
+
+import org.apache.iotdb.commons.binaryallocator.BinaryAllocator;
+import org.apache.iotdb.commons.binaryallocator.config.AllocatorConfig;
+import org.apache.iotdb.commons.binaryallocator.ema.AdaptiveWeightedAverage;
+import org.apache.iotdb.commons.binaryallocator.utils.SizeClasses;
+
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class Arena {
+
+  /** Number of sampling rounds between two resize passes over the slab regions. */
+  private static final int EVICT_SAMPLE_COUNT = 100;
+
+  private final BinaryAllocator binaryAllocator;
+  private final SizeClasses sizeClasses;
+  private final int arenaID;
+  private final AtomicInteger numRegisteredThread;
+
+  /** One region per size class, indexed by size-class index. */
+  private final SlabRegion[] regions;
+
+  /** Sampling rounds since the last resize; advanced by runSampleEviction, reset by close. */
+  private int sampleCount;
+
+  public Arena(
+      BinaryAllocator allocator, SizeClasses sizeClasses, int id, AllocatorConfig allocatorConfig) {
+    this.binaryAllocator = allocator;
+    this.sizeClasses = sizeClasses;
+    this.arenaID = id;
+    this.numRegisteredThread = new AtomicInteger(0);
+    regions = new SlabRegion[sizeClasses.getSizeClassNum()];
+
+    for (int i = 0; i < regions.length; i++) {
+      regions[i] = new SlabRegion(sizeClasses.sizeIdx2size(i), allocatorConfig);
+    }
+
+    sampleCount = 0;
+  }
+
+  public int getArenaID() {
+    return arenaID;
+  }
+
+  /**
+   * Returns an array from the size class that {@code reqCapacity} maps to, reusing a cached array
+   * when one is available and creating a new one otherwise.
+   */
+  public byte[] allocate(int reqCapacity) {
+    final int sizeIdx = sizeClasses.size2SizeIdx(reqCapacity);
+    return regions[sizeIdx].allocate();
+  }
+
+  /**
+   * Caches {@code bytes} in the region of its size class. Assumes {@code bytes} was obtained from
+   * {@link #allocate(int)}, so its length is a size-class size.
+   */
+  public void deallocate(byte[] bytes) {
+    final int sizeIdx = sizeClasses.size2SizeIdx(bytes.length);
+    regions[sizeIdx].deallocate(bytes);
+  }
+
+  /** Drops the given fraction (0.0 - 1.0) of cached arrays from every region. */
+  public void evict(double ratio) {
+    for (SlabRegion region : regions) {
+      region.evict(ratio);
+    }
+  }
+
+  /** Clears all cached arrays and resets every counter and average. */
+  public void close() {
+    sampleCount = 0;
+    for (SlabRegion region : regions) {
+      region.close();
+    }
+  }
+
+  /** Returns the bytes currently held in the caches of all regions. */
+  public long getTotalUsedMemory() {
+    long totalUsedMemory = 0;
+    for (SlabRegion region : regions) {
+      totalUsedMemory += region.getTotalUsedMemory();
+    }
+    return totalUsedMemory;
+  }
+
+  /** Returns the bytes of arrays handed out by this arena and not yet returned. */
+  public long getActiveMemory() {
+    long totalActiveMemory = 0;
+    for (SlabRegion region : regions) {
+      totalActiveMemory += region.getActiveUsedMemory();
+    }
+    return totalActiveMemory;
+  }
+
+  public int getNumRegisteredThread() {
+    return numRegisteredThread.get();
+  }
+
+  public void incRegisteredThread() {
+    this.numRegisteredThread.incrementAndGet();
+  }
+
+  public void decRegisteredThread() {
+    this.numRegisteredThread.decrementAndGet();
+  }
+
+  /**
+   * Runs one sampling round. It reports the bytes allocated since the previous round to the metrics,
+   * samples each region's active size, and resizes every region once per
+   * {@value #EVICT_SAMPLE_COUNT} rounds.
+   */
+  public void runSampleEviction() {
+    // Each counter is read exactly once, so allocations that land between two reads are not lost.
+    // The products are summed as long because byteArraySize * count can exceed Integer.MAX_VALUE
+    // for the large size classes.
+    long allocateFromSlabDelta = 0;
+    long allocateFromJVMDelta = 0;
+    for (SlabRegion region : regions) {
+      int slabAllocations = region.allocationsFromAllocator.get();
+      int jvmAllocations = region.allocationsFromJVM.get();
+      allocateFromSlabDelta +=
+          (long) region.byteArraySize * (slabAllocations - region.prevAllocations);
+      allocateFromJVMDelta +=
+          (long) region.byteArraySize * (jvmAllocations - region.prevAllocationsFromJVM);
+      region.prevAllocations = slabAllocations;
+      region.prevAllocationsFromJVM = jvmAllocations;
+    }
+    binaryAllocator
+        .getMetrics()
+        .updateCounter(saturatedToInt(allocateFromSlabDelta), saturatedToInt(allocateFromJVMDelta));
+
+    // Start sampling
+    for (SlabRegion region : regions) {
+      region.updateSample();
+    }
+
+    sampleCount++;
+    if (sampleCount == EVICT_SAMPLE_COUNT) {
+      // Evict
+      for (SlabRegion region : regions) {
+        region.resize();
+      }
+      sampleCount = 0;
+    }
+  }
+
+  /** Clamps {@code value} into the int range so an oversized delta cannot wrap to a negative one. */
+  private static int saturatedToInt(long value) {
+    return (int) Math.max(Integer.MIN_VALUE, Math.min(Integer.MAX_VALUE, value));
+  }
+
+  /** Cache of byte arrays of one fixed size, plus the counters used for sizing and metrics. */
+  private static class SlabRegion {
+    private final int byteArraySize;
+    private final ConcurrentLinkedQueue<byte[]> queue;
+
+    private final AtomicInteger allocationsFromAllocator;
+    private final AtomicInteger allocationsFromJVM;
+    private final AtomicInteger deAllocationsToAllocator;
+    private final AtomicInteger evictions;
+
+    // Counter snapshots taken by the previous runSampleEviction round.
+    public int prevAllocations;
+    public int prevAllocationsFromJVM;
+    AdaptiveWeightedAverage average;
+
+    SlabRegion(int byteArraySize, AllocatorConfig allocatorConfig) {
+      this.byteArraySize = byteArraySize;
+      this.average = new AdaptiveWeightedAverage(allocatorConfig.arenaPredictionWeight);
+      queue = new ConcurrentLinkedQueue<>();
+      allocationsFromAllocator = new AtomicInteger(0);
+      allocationsFromJVM = new AtomicInteger(0);
+      deAllocationsToAllocator = new AtomicInteger(0);
+      evictions = new AtomicInteger(0);
+      prevAllocations = 0;
+      prevAllocationsFromJVM = 0;
+    }
+
+    /** Takes a cached array if one exists, otherwise allocates a new one on the heap. */
+    public final byte[] allocate() {
+      byte[] bytes = queue.poll();
+      if (bytes == null) {
+        allocationsFromJVM.incrementAndGet();
+        return new byte[this.byteArraySize];
+      }
+      allocationsFromAllocator.incrementAndGet();
+      return bytes;
+    }
+
+    public void deallocate(byte[] bytes) {
+      deAllocationsToAllocator.incrementAndGet();
+      queue.add(bytes);
+    }
+
+    private void updateSample() {
+      average.sample(getActiveSize());
+    }
+
+    // Keep only as many cached arrays as the predicted peak needs beyond the currently active ones.
+    private void resize() {
+      average.update();
+      int needRemain = (int) Math.ceil(average.average()) - getActiveSize();
+      evict(getQueueSize() - needRemain);
+    }
+
+    private void evict(double ratio) {
+      evict((int) (getQueueSize() * ratio));
+    }
+
+    private void evict(int num) {
+      while (num > 0) {
+        // poll() rather than isEmpty()+poll(): a concurrent allocate() may drain the queue between
+        // the two calls, and counting an eviction that removed nothing corrupts getQueueSize().
+        if (queue.poll() == null) {
+          break;
+        }
+        evictions.incrementAndGet();
+        num--;
+      }
+    }
+
+    private long getTotalUsedMemory() {
+      return (long) byteArraySize * getQueueSize();
+    }
+
+    private long getActiveUsedMemory() {
+      return (long) byteArraySize * getActiveSize();
+    }
+
+    // Derived from counters because ConcurrentLinkedQueue::size() is O(n).
+    private int getQueueSize() {
+      return deAllocationsToAllocator.get() - allocationsFromAllocator.get() - evictions.get();
+    }
+
+    // Arrays handed out and not yet returned.
+    private int getActiveSize() {
+      return allocationsFromAllocator.get()
+          + allocationsFromJVM.get()
+          - deAllocationsToAllocator.get();
+    }
+
+    private void close() {
+      queue.clear();
+      allocationsFromAllocator.set(0);
+      allocationsFromJVM.set(0);
+      deAllocationsToAllocator.set(0);
+      evictions.set(0);
+      prevAllocations = 0;
+      prevAllocationsFromJVM = 0;
+      average.clear();
+    }
+  }
+}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/binaryallocator/arena/ArenaStrategy.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/binaryallocator/arena/ArenaStrategy.java
new file mode 100644
index 00000000000..ec4af3fbe0a
--- /dev/null
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/binaryallocator/arena/ArenaStrategy.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.binaryallocator.arena;
+
+/**
+ * Policy for picking the {@link Arena} that should serve an allocation.
+ *
+ * <p>Implementations receive every arena owned by an allocator and return the one to allocate
+ * from, so different selection policies can be plugged in.
+ */
+@FunctionalInterface
+public interface ArenaStrategy {
+
+  /**
+   * Picks one arena out of {@code arenas}.
+   *
+   * @param arenas the candidate arenas; must be non-null and non-empty
+   * @return the arena to allocate from
+   */
+  Arena choose(Arena[] arenas);
+}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/binaryallocator/config/AllocatorConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/binaryallocator/config/AllocatorConfig.java
new file mode 100644
index 00000000000..53f20ac0da1
--- /dev/null
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/binaryallocator/config/AllocatorConfig.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.binaryallocator.config;
+
+import org.apache.iotdb.commons.conf.CommonDescriptor;
+
+import java.time.Duration;
+
+public class AllocatorConfig {
+
+  /** Configuration built once, when this class is loaded, from the node's common configuration. */
+  public static final AllocatorConfig DEFAULT_CONFIG = new AllocatorConfig();
+
+  /** Requests smaller than this many bytes are not served from the pool. */
+  public int minAllocateSize;
+
+  /** Requests larger than this many bytes are not served from the pool. */
+  public int maxAllocateSize;
+
+  /** Number of arenas the allocator creates. */
+  public int arenaNum;
+
+  /** Log2 of the number of size classes in each power-of-two size interval. */
+  public int log2ClassSizeGroup;
+
+  /** Whether the allocator starts pooling as soon as it is constructed. */
+  public boolean enableBinaryAllocator;
+
+  /** Maximum time to wait for the evictor to shut down (1000 ms unless changed). */
+  public Duration durationEvictorShutdownTimeout;
+
+  /** Interval between two consecutive evictor runs (1000 ms unless changed). */
+  public Duration durationBetweenEvictorRuns;
+
+  /** Percentage weight each region's AdaptiveWeightedAverage gives to the newest peak sample. */
+  public int arenaPredictionWeight;
+
+  /** Reads the allocator settings from {@link CommonDescriptor} and applies built-in defaults. */
+  public AllocatorConfig() {
+    minAllocateSize = CommonDescriptor.getInstance().getConfig().getMinAllocateSize();
+    maxAllocateSize = CommonDescriptor.getInstance().getConfig().getMaxAllocateSize();
+    arenaNum = CommonDescriptor.getInstance().getConfig().getArenaNum();
+    log2ClassSizeGroup = CommonDescriptor.getInstance().getConfig().getLog2SizeClassGroup();
+    enableBinaryAllocator =
+        CommonDescriptor.getInstance().getConfig().isEnableBinaryAllocator();
+    durationEvictorShutdownTimeout = Duration.ofMillis(1000L);
+    durationBetweenEvictorRuns = Duration.ofMillis(1000L);
+    arenaPredictionWeight = 35;
+  }
+
+  /**
+   * Replaces {@link #durationBetweenEvictorRuns}.
+   *
+   * @param time interval between evictor runs, in milliseconds
+   */
+  public void setTimeBetweenEvictorRunsMillis(long time) {
+    durationBetweenEvictorRuns = Duration.ofMillis(time);
+  }
+}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/binaryallocator/ema/AdaptiveWeightedAverage.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/binaryallocator/ema/AdaptiveWeightedAverage.java
new file mode 100644
index 00000000000..201f2f63ae1
--- /dev/null
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/binaryallocator/ema/AdaptiveWeightedAverage.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.binaryallocator.ema;
+
+import static java.lang.Math.max;
+
+/**
+ * This file is modified from <a
+ * href="https://github.com/openjdk/jdk17/blob/master/src/hotspot/share/gc/shared/gcUtil.hpp">JDK17
+ * AdaptiveWeightedAverage</a>, with the modifications needed by the binary allocator.
+ *
+ * <p>Adaptive weighted average used to track memory allocation. Within each eviction cycle only
+ * the peak of the values passed to {@link #sample(int)} is kept. {@link #update()} then folds that
+ * peak into a weighted moving average.
+ */
+public class AdaptiveWeightedAverage {
+
+  /** Number of updates after which the start-up smoothing is no longer computed. */
+  private static final int OLD_THRESHOLD = 100;
+
+  private float average;
+  private int sampleCount;
+  private int tmpMaxSample;
+
+  /** Percentage (0-100) of the new peak that enters the average once enough history exists. */
+  private final int weight;
+
+  private boolean isOld; // Set once there is enough historical data
+
+  /**
+   * @param weight percentage weight given to each new peak sample; expected to be in [0, 100]
+   */
+  public AdaptiveWeightedAverage(int weight) {
+    this.weight = weight;
+    average = 0f;
+    sampleCount = 0;
+    tmpMaxSample = 0;
+    isOld = false;
+  }
+
+  /** Records a sample; only the largest sample of the current cycle is kept. */
+  public void sample(int newSample) {
+    tmpMaxSample = max(tmpMaxSample, newSample);
+  }
+
+  /** Ends the current cycle: folds the cycle's peak into the average and resets the peak. */
+  public void update() {
+    incrementCount();
+
+    // Compute the new weighted average
+    int newSample = tmpMaxSample;
+    tmpMaxSample = 0;
+    average = computeAdaptiveAverage(newSample, average);
+  }
+
+  public float average() {
+    return average;
+  }
+
+  /** Resets the average, the update count and the pending peak to their initial state. */
+  public void clear() {
+    average = 0f;
+    sampleCount = 0;
+    tmpMaxSample = 0;
+    isOld = false;
+  }
+
+  void incrementCount() {
+    sampleCount++;
+
+    if (!isOld && sampleCount > OLD_THRESHOLD) {
+      isOld = true;
+    }
+  }
+
+  float computeAdaptiveAverage(int newSample, float average) {
+    // We smooth the samples by not using weight directly until we've had enough data to make it
+    // meaningful. We'd like the first weight used to be 100 (percent), the second to be 50, etc.,
+    // until we have OLD_THRESHOLD/weight samples.
+    int countWeight = 0;
+
+    // update() increments sampleCount first, but this method is package-visible, so guard against
+    // a direct call before any update (division by zero).
+    if (!isOld && sampleCount > 0) {
+      countWeight = OLD_THRESHOLD / sampleCount;
+    }
+
+    int adaptiveWeight = max(weight, countWeight);
+
+    // Multiply in float: adaptiveWeight * newSample in int arithmetic overflows for samples above
+    // roughly 21 million (Integer.MAX_VALUE / 100).
+    return (100.0f - adaptiveWeight) * average / 100.0f
+        + adaptiveWeight * (float) newSample / 100.0f;
+  }
+}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/binaryallocator/evictor/Evictor.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/binaryallocator/evictor/Evictor.java
new file mode 100644
index 00000000000..686e7e73d8f
--- /dev/null
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/binaryallocator/evictor/Evictor.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.binaryallocator.evictor;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Base class for periodic eviction tasks. Subclasses implement {@link #run()}; {@link
+ * #startEvictor(Duration)} schedules it at a fixed rate on a dedicated single-thread executor named
+ * after the evictor, and {@link #stopEvictor()} cancels it and shuts that executor down.
+ *
+ * <p>NOTE(review): start/stop are not synchronized; callers are assumed not to invoke them
+ * concurrently — confirm.
+ */
+public abstract class Evictor implements Runnable {
+  private static final Logger LOGGER = LoggerFactory.getLogger(Evictor.class);
+
+  // Handle of the currently scheduled periodic run; null when nothing is scheduled.
+  private ScheduledFuture<?> scheduledFuture;
+  private final String name;
+  // How long stopEvictor() waits for a running eviction to finish.
+  private final Duration evictorShutdownTimeoutDuration;
+
+  // Created lazily by startEvictor() and discarded by stopEvictor().
+  private ScheduledExecutorService executor;
+
+  public Evictor(String name, Duration evictorShutdownTimeoutDuration) {
+    this.name = name;
+    this.evictorShutdownTimeoutDuration = evictorShutdownTimeoutDuration;
+  }
+
+  /** Cancels the scheduled future, if any, without interrupting a run already in progress. */
+  void cancel() {
+    if (scheduledFuture != null) {
+      scheduledFuture.cancel(false);
+    }
+  }
+
+  @Override
+  public abstract void run();
+
+  void setScheduledFuture(final ScheduledFuture<?> scheduledFuture) {
+    this.scheduledFuture = scheduledFuture;
+  }
+
+  @Override
+  public String toString() {
+    return getClass().getName() + " [scheduledFuture=" + scheduledFuture + "]";
+  }
+
+  /**
+   * Schedules {@link #run()} at a fixed rate, with both the initial delay and the period set to
+   * {@code delay}. Calling this again replaces the previous schedule instead of adding a second
+   * periodic task.
+   *
+   * @param delay initial delay and period between runs
+   */
+  public void startEvictor(final Duration delay) {
+    if (null == executor) {
+      executor = IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(name);
+    }
+    // Without this, a second call would keep the old periodic task running next to the new one
+    // and lose its handle, so it could only be stopped by shutting down the executor.
+    cancel();
+    final ScheduledFuture<?> future =
+        ScheduledExecutorUtil.safelyScheduleAtFixedRate(
+            executor, this, delay.toMillis(), delay.toMillis(), TimeUnit.MILLISECONDS);
+    this.setScheduledFuture(future);
+  }
+
+  /**
+   * Cancels the periodic task and shuts the executor down, waiting up to the configured timeout
+   * for a running eviction to finish. If the wait times out or is interrupted, the executor is
+   * forced down with {@link ScheduledExecutorService#shutdownNow()}. Does nothing if the evictor
+   * was never started.
+   */
+  public void stopEvictor() {
+    if (executor == null) {
+      return;
+    }
+
+    LOGGER.info("Stopping {}", name);
+
+    cancel();
+    executor.shutdown();
+    try {
+      boolean terminated =
+          executor.awaitTermination(
+              evictorShutdownTimeoutDuration.toMillis(), TimeUnit.MILLISECONDS);
+      if (!terminated) {
+        LOGGER.warn(
+            "unable to stop evictor after {} ms", evictorShutdownTimeoutDuration.toMillis());
+        // Interrupt the run that is still in progress so its thread does not linger.
+        executor.shutdownNow();
+      }
+    } catch (final InterruptedException ignored) {
+      executor.shutdownNow();
+      Thread.currentThread().interrupt();
+    } finally {
+      executor = null;
+      scheduledFuture = null;
+    }
+  }
+}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/binaryallocator/metric/BinaryAllocatorMetrics.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/binaryallocator/metric/BinaryAllocatorMetrics.java
new file mode 100644
index 00000000000..475f98860ac
--- /dev/null
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/binaryallocator/metric/BinaryAllocatorMetrics.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.binaryallocator.metric;
+
+import org.apache.iotdb.commons.binaryallocator.BinaryAllocator;
+import org.apache.iotdb.commons.service.metric.enums.Metric;
+import org.apache.iotdb.commons.service.metric.enums.Tag;
+import org.apache.iotdb.metrics.AbstractMetricService;
+import org.apache.iotdb.metrics.metricsets.IMetricSet;
+import org.apache.iotdb.metrics.type.Counter;
+import org.apache.iotdb.metrics.utils.MetricLevel;
+import org.apache.iotdb.metrics.utils.MetricType;
+
+/**
+ * Metric set for a {@link BinaryAllocator}. It exposes two auto gauges (total and active memory)
+ * and two counters (allocations served from slabs and from the JVM) under the {@code
+ * binary_allocator} metric.
+ */
+public class BinaryAllocatorMetrics implements IMetricSet {
+
+  private static final String TOTAL_MEMORY = "total-memory";
+  private static final String ALLOCATE_FROM_SLAB = "allocate-from-slab";
+  private static final String ALLOCATE_FROM_JVM = "allocate-from-jvm";
+  private static final String ACTIVE_MEMORY = "active-memory";
+
+  private final BinaryAllocator binaryAllocator;
+
+  // Created in bindTo() and cleared in unbindFrom(); null while the metric set is not bound.
+  private Counter allocateFromSlab;
+  private Counter allocateFromJVM;
+
+  public BinaryAllocatorMetrics(final BinaryAllocator binaryAllocator) {
+    this.binaryAllocator = binaryAllocator;
+  }
+
+  @Override
+  public void bindTo(AbstractMetricService metricService) {
+    metricService.createAutoGauge(
+        Metric.BINARY_ALLOCATOR.toString(),
+        MetricLevel.IMPORTANT,
+        binaryAllocator,
+        BinaryAllocator::getTotalUsedMemory,
+        Tag.NAME.toString(),
+        TOTAL_MEMORY);
+    metricService.createAutoGauge(
+        Metric.BINARY_ALLOCATOR.toString(),
+        MetricLevel.IMPORTANT,
+        binaryAllocator,
+        BinaryAllocator::getTotalActiveMemory,
+        Tag.NAME.toString(),
+        ACTIVE_MEMORY);
+    allocateFromSlab =
+        metricService.getOrCreateCounter(
+            Metric.BINARY_ALLOCATOR.toString(),
+            MetricLevel.IMPORTANT,
+            Tag.NAME.toString(),
+            ALLOCATE_FROM_SLAB);
+    allocateFromJVM =
+        metricService.getOrCreateCounter(
+            Metric.BINARY_ALLOCATOR.toString(),
+            MetricLevel.IMPORTANT,
+            Tag.NAME.toString(),
+            ALLOCATE_FROM_JVM);
+  }
+
+  @Override
+  public void unbindFrom(AbstractMetricService metricService) {
+    for (String gauge : new String[] {TOTAL_MEMORY, ACTIVE_MEMORY}) {
+      metricService.remove(
+          MetricType.AUTO_GAUGE, Metric.BINARY_ALLOCATOR.toString(), Tag.NAME.toString(), gauge);
+    }
+    for (String counter : new String[] {ALLOCATE_FROM_SLAB, ALLOCATE_FROM_JVM}) {
+      metricService.remove(
+          MetricType.COUNTER, Metric.BINARY_ALLOCATOR.toString(), Tag.NAME.toString(), counter);
+    }
+    // Stop feeding counters that are no longer registered.
+    allocateFromSlab = null;
+    allocateFromJVM = null;
+  }
+
+  /**
+   * Adds the given deltas to the slab and JVM allocation counters. Updates that arrive while the
+   * metric set is not bound are dropped instead of failing with a NullPointerException.
+   *
+   * @param allocateFromSlabDelta number of allocations served from slabs since the last update
+   * @param allocateFromJVMDelta number of allocations served by the JVM since the last update
+   */
+  public void updateCounter(int allocateFromSlabDelta, int allocateFromJVMDelta) {
+    // Read each field once so a concurrent unbind cannot null it between check and use.
+    Counter slabCounter = allocateFromSlab;
+    if (slabCounter != null) {
+      slabCounter.inc(allocateFromSlabDelta);
+    }
+    Counter jvmCounter = allocateFromJVM;
+    if (jvmCounter != null) {
+      jvmCounter.inc(allocateFromJVMDelta);
+    }
+  }
+}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/binaryallocator/utils/SizeClasses.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/binaryallocator/utils/SizeClasses.java
new file mode 100644
index 00000000000..e13cc54a9f1
--- /dev/null
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/binaryallocator/utils/SizeClasses.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.binaryallocator.utils;
+
+import org.apache.iotdb.commons.binaryallocator.config.AllocatorConfig;
+
+/**
+ * SizeClasses manages the size classes of memory blocks in a memory allocator. It precomputes the
+ * block size of every class and maps requested sizes to class indices.
+ *
+ * <p>The first class is {@code 2^floor(log2(minAllocateSize))}. Every following power-of-two range
+ * {@code (2^k, 2^(k+1)]} is split into {@code 2^log2ClassSizeGroup} equally spaced classes.
+ * Classes are generated up to and including the first one that is at least {@code
+ * maxAllocateSize}.
+ */
+public final class SizeClasses {
+
+  // Integer size in bits minus 1, used for log2 calculations
+  private static final int INTEGER_SIZE_MINUS_ONE = Integer.SIZE - 1;
+
+  // Mapping from size class index to memory block size, in ascending order
+  private final int[] sizeIdx2sizeTab;
+
+  // Log2 (rounded down) of the minimum memory block size
+  private final int log2MinSize;
+
+  // Log2 of the number of size classes in each power-of-two size group
+  private final int log2SizeClassGroup;
+
+  /**
+   * Constructor that initializes the size class table based on the allocator configuration.
+   *
+   * <p>{@code maxAllocateSize} does not have to be a power of two: the table then ends with the
+   * first class above it. A maximum at or below the first class yields a single-class table.
+   *
+   * @param allocatorConfig The allocator configuration containing minimum and maximum allocation
+   *     sizes.
+   */
+  public SizeClasses(AllocatorConfig allocatorConfig) {
+    this.log2SizeClassGroup = allocatorConfig.log2ClassSizeGroup;
+    this.log2MinSize = log2(allocatorConfig.minAllocateSize);
+    this.sizeIdx2sizeTab =
+        buildSizeTable(log2MinSize, log2SizeClassGroup, allocatorConfig.maxAllocateSize);
+  }
+
+  /**
+   * Returns the memory block size for a given size class index.
+   *
+   * @param sizeIdx The index of the size class.
+   * @return The memory block size corresponding to the size class index.
+   */
+  public int sizeIdx2size(int sizeIdx) {
+    return sizeIdx2sizeTab[sizeIdx];
+  }
+
+  /**
+   * Returns the index of the smallest size class that can hold {@code size} bytes.
+   *
+   * <p>Sizes not larger than the first class map to index 0. {@code size} must not exceed the
+   * largest class, {@code sizeIdx2size(getSizeClassNum() - 1)}; larger sizes yield an index
+   * outside the table.
+   *
+   * @param size The memory block size.
+   * @return The corresponding size class index.
+   */
+  public int size2SizeIdx(int size) {
+    if (size <= sizeIdx2sizeTab[0]) {
+      // The bit arithmetic below assumes size lies above the first class; without this guard,
+      // sizes well below it produce negative indices.
+      return 0;
+    }
+    // ceil(log2(size)): 2^log2Size is the smallest power of two >= size
+    int log2Size = log2((size << 1) - 1);
+    int shift = log2Size - log2MinSize - 1;
+
+    // Index of the last class before the group (2^(log2Size-1), 2^log2Size]
+    int group = shift << log2SizeClassGroup;
+    int log2Delta = log2Size - 1 - log2SizeClassGroup;
+
+    // Position within the group, rounded up to the next class boundary
+    int mod = ((size - 1) >> log2Delta) & ((1 << log2SizeClassGroup) - 1);
+    return group + mod + 1;
+  }
+
+  /**
+   * Returns the total number of size classes.
+   *
+   * @return The total number of size classes.
+   */
+  public int getSizeClassNum() {
+    return sizeIdx2sizeTab.length;
+  }
+
+  /**
+   * Calculates the memory block size for a given log2 group, delta, and log2 delta.
+   *
+   * @param log2Group The log2 value of the current size class group.
+   * @param delta The delta value for the size class.
+   * @param log2Delta The log2 value of the delta.
+   * @return The calculated memory block size.
+   */
+  private static int calculateSize(int log2Group, int delta, int log2Delta) {
+    return (1 << log2Group) + (delta << log2Delta);
+  }
+
+  /**
+   * Calculates the log2 value (rounded down) of a given integer.
+   *
+   * @param val The value to calculate the log2 for.
+   * @return The log2 value of the given integer.
+   */
+  private static int log2(int val) {
+    return INTEGER_SIZE_MINUS_ONE - Integer.numberOfLeadingZeros(val);
+  }
+
+  /**
+   * Builds the size class table. The table starts at {@code 2^log2MinSize} and ends with the
+   * first class that is at least {@code maxSize}.
+   *
+   * @param log2MinSize log2 of the first (smallest) class
+   * @param log2SizeClassGroup log2 of the number of classes per power-of-two group
+   * @param maxSize The maximum memory block size; need not be a power of two.
+   * @return the ascending class sizes, sized exactly to the number of classes
+   */
+  private static int[] buildSizeTable(int log2MinSize, int log2SizeClassGroup, int maxSize) {
+    int nDeltaLimit = 1 << log2SizeClassGroup;
+
+    // Round log2(maxSize) up so that a non power-of-two maximum still fits in the table.
+    int log2MaxSize = log2(maxSize) + (Integer.bitCount(maxSize) == 1 ? 0 : 1);
+    int sizeClassGroupCount = Math.max(0, log2MaxSize - log2MinSize);
+    int[] table = new int[(sizeClassGroupCount << log2SizeClassGroup) + 1];
+
+    int log2Group = log2MinSize;
+    int log2Delta = log2MinSize - log2SizeClassGroup;
+
+    int sizeCount = 0;
+    int size = calculateSize(log2Group, 0, log2Delta);
+    table[sizeCount++] = size; // Initial size
+
+    // Each group ends at the next power of two, so after sizeClassGroupCount groups the size is
+    // 2^log2MaxSize >= maxSize; the table capacity is therefore never exceeded.
+    for (; size < maxSize; log2Group++, log2Delta++) {
+      for (int nDelta = 1; nDelta <= nDeltaLimit && size < maxSize; nDelta++) {
+        size = calculateSize(log2Group, nDelta, log2Delta);
+        table[sizeCount++] = size;
+      }
+    }
+
+    if (sizeCount == table.length) {
+      return table;
+    }
+    int[] trimmed = new int[sizeCount];
+    System.arraycopy(table, 0, trimmed, 0, sizeCount);
+    return trimmed;
+  }
+}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
index cd3416f6baf..fee09496730 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
@@ -190,6 +190,7 @@ public enum ThreadName {
STORAGE_ENGINE_RECOVER_TRIGGER("StorageEngine-RecoverTrigger"),
REPAIR_DATA("RepairData"),
FILE_TIME_INDEX_RECORD("FileTimeIndexRecord"),
+ BINARY_ALLOCATOR_SAMPLE_EVICTOR("BinaryAllocator-SampleEvictor"),
// the unknown thread name is used for metrics
UNKOWN("UNKNOWN");
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 74314cacdfa..2d1585d6525 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -320,6 +320,16 @@ public class CommonConfig {
private long seriesLimitThreshold = -1;
private long deviceLimitThreshold = -1;
+  /** Whether the binary allocator is used (property {@code enable_binary_allocator}). */
+  private boolean enableBinaryAllocator = true;
+
+  /** Number of arenas (property {@code arena_num}). */
+  private int arenaNum = 4;
+
+  /** Minimum allocation size in bytes (property {@code small_blob_object}). */
+  private int minAllocateSize = 4 * 1024;
+
+  /** Maximum allocation size in bytes (property {@code huge_blob_object}). */
+  private int maxAllocateSize = 1 << 20;
+
+  /** Log2 of the size-class count per size group ({@code log2_size_class_group}). */
+  private int log2SizeClassGroup = 3;
+
// time in nanosecond precision when starting up
private final long startUpNanosecond = System.nanoTime();
@@ -1479,4 +1489,44 @@ public class CommonConfig {
public void setRemoteWriteMaxRetryDurationInMs(long
remoteWriteMaxRetryDurationInMs) {
this.remoteWriteMaxRetryDurationInMs = remoteWriteMaxRetryDurationInMs;
}
+
+  /** Returns whether the binary allocator is enabled. */
+  public boolean isEnableBinaryAllocator() {
+    return enableBinaryAllocator;
+  }
+
+  /** Enables or disables the binary allocator. */
+  public void setEnableBinaryAllocator(boolean enableBinaryAllocator) {
+    this.enableBinaryAllocator = enableBinaryAllocator;
+  }
+
+  /** Returns the configured number of arenas. */
+  public int getArenaNum() {
+    return arenaNum;
+  }
+
+  /** Sets the number of arenas; the value is stored as given. */
+  public void setArenaNum(int arenaNum) {
+    this.arenaNum = arenaNum;
+  }
+
+  /** Returns the minimum allocation size. */
+  public int getMinAllocateSize() {
+    return minAllocateSize;
+  }
+
+  /** Sets the minimum allocation size; the value is stored as given. */
+  public void setMinAllocateSize(int minAllocateSize) {
+    this.minAllocateSize = minAllocateSize;
+  }
+
+  /** Returns the maximum allocation size. */
+  public int getMaxAllocateSize() {
+    return maxAllocateSize;
+  }
+
+  /** Sets the maximum allocation size; the value is stored as given. */
+  public void setMaxAllocateSize(int maxAllocateSize) {
+    this.maxAllocateSize = maxAllocateSize;
+  }
+
+  /** Returns log2 of the number of size classes per size group. */
+  public int getLog2SizeClassGroup() {
+    return log2SizeClassGroup;
+  }
+
+  /** Sets log2 of the number of size classes per size group; the value is stored as given. */
+  public void setLog2SizeClassGroup(int log2SizeClassGroup) {
+    this.log2SizeClassGroup = log2SizeClassGroup;
+  }
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index 26ace0305c2..6acbd732857 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -244,6 +244,7 @@ public class CommonDescriptor {
String.valueOf(config.getDeviceLimitThreshold()))));
loadRetryProperties(properties);
+ loadBinaryAllocatorProps(properties);
}
private void loadPipeProps(TrimProperties properties) {
@@ -736,6 +737,30 @@ public class CommonDescriptor {
"enable_retry_for_unknown_error"))));
}
+  /**
+   * Loads the binary allocator settings from {@code properties} into the common config. A missing
+   * property keeps the current value.
+   *
+   * <p>Numeric values outside their valid range are ignored and the current value is kept. This
+   * was previously done only for {@code arena_num}. Sizes must be positive and {@code
+   * log2_size_class_group} must be non-negative. NOTE(review): the relation {@code
+   * small_blob_object <= huge_blob_object} is not checked here.
+   *
+   * @param properties the loaded system properties
+   * @throws NumberFormatException if a numeric property is not a valid integer
+   */
+  public void loadBinaryAllocatorProps(TrimProperties properties) {
+    config.setEnableBinaryAllocator(
+        Boolean.parseBoolean(
+            properties.getProperty(
+                "enable_binary_allocator", Boolean.toString(config.isEnableBinaryAllocator()))));
+
+    int minAllocateSize =
+        Integer.parseInt(
+            properties.getProperty(
+                "small_blob_object", String.valueOf(config.getMinAllocateSize())));
+    if (minAllocateSize > 0) {
+      config.setMinAllocateSize(minAllocateSize);
+    }
+
+    int maxAllocateSize =
+        Integer.parseInt(
+            properties.getProperty(
+                "huge_blob_object", String.valueOf(config.getMaxAllocateSize())));
+    if (maxAllocateSize > 0) {
+      config.setMaxAllocateSize(maxAllocateSize);
+    }
+
+    int arenaNum =
+        Integer.parseInt(properties.getProperty("arena_num", String.valueOf(config.getArenaNum())));
+    if (arenaNum > 0) {
+      config.setArenaNum(arenaNum);
+    }
+
+    int log2SizeClassGroup =
+        Integer.parseInt(
+            properties.getProperty(
+                "log2_size_class_group", String.valueOf(config.getLog2SizeClassGroup())));
+    if (log2SizeClassGroup >= 0) {
+      config.setLog2SizeClassGroup(log2SizeClassGroup);
+    }
+  }
+
public void loadGlobalConfig(TGlobalConfig globalConfig) {
config.setTimestampPrecision(globalConfig.timestampPrecision);
config.setTimePartitionOrigin(
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/JvmGcMonitorMetrics.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/JvmGcMonitorMetrics.java
index fd7e500808c..0dbbba5e99f 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/JvmGcMonitorMetrics.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/JvmGcMonitorMetrics.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.commons.service.metric;
+import org.apache.iotdb.commons.binaryallocator.BinaryAllocator;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.concurrent.ThreadName;
import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
@@ -114,9 +115,14 @@ public class JvmGcMonitorMetrics implements IMetricSet {
 private void scheduledMonitoring() {
+    // Presumably refreshes curData for the latest observation interval; both steps below read
+    // curData.getGcTimePercentage() after this call.
 calculateGCTimePercentageWithinObservedInterval();
+
+    // Alert if necessary: notify the handler, when one is set, if the GC time share exceeds
+    // MAX_GC_TIME_PERCENTAGE. The handler receives a copy of curData.
 if (alertHandler != null && curData.getGcTimePercentage() >
MAX_GC_TIME_PERCENTAGE) {
 alertHandler.alert(curData.clone());
 }
+
+    // Run GC eviction: pass the current GC time percentage to the BinaryAllocator returned by
+    // getInstance() on every run, whether or not an alert handler is registered. Whether anything
+    // is evicted is decided inside runGcEviction (not visible here).
+    // NOTE(review): an exception from runGcEviction escapes this method; confirm the scheduling
+    // wrapper logs it and keeps the monitoring task alive.
+ BinaryAllocator.getInstance().runGcEviction(curData.getGcTimePercentage());
 }
private long getTotalGCTime() {
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
index 5812fac36f2..04757610064 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
@@ -184,6 +184,7 @@ public enum Metric {
LOAD_TIME_COST("load_time_cost"),
LOAD_POINT_COUNT("load_point_count"),
MEMTABLE_POINT_COUNT("memtable_point_count"),
+ BINARY_ALLOCATOR("binary_allocator"),
;
final String value;
diff --git
a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/binaryallocator/BinaryAllocatorTest.java
b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/binaryallocator/BinaryAllocatorTest.java
new file mode 100644
index 00000000000..0fb4f0d96b5
--- /dev/null
+++
b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/binaryallocator/BinaryAllocatorTest.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.binaryallocator;
+
+import org.apache.iotdb.commons.binaryallocator.config.AllocatorConfig;
+import org.apache.iotdb.commons.binaryallocator.utils.SizeClasses;
+
+import org.apache.tsfile.utils.PooledBinary;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+public class BinaryAllocatorTest {
+
+  /** Upper bound on how long testEviction waits for the background evictor to free memory. */
+  private static final long EVICTION_TIMEOUT_NANOS = 10_000_000_000L;
+
+  @Test
+  public void testAllocateBinary() {
+    AllocatorConfig config = new AllocatorConfig();
+    config.arenaNum = 1;
+    BinaryAllocator binaryAllocator = new BinaryAllocator(config);
+    binaryAllocator.resetArenaBinding();
+
+    // A small request is expected to bypass the arenas (arena index -1).
+    PooledBinary binary = binaryAllocator.allocateBinary(255);
+    assertNotNull(binary);
+    assertEquals(-1, binary.getArenaIndex());
+    assertEquals(255, binary.getLength());
+    binaryAllocator.deallocateBinary(binary);
+
+    // A request that matches a size class is served by the single arena.
+    binary = binaryAllocator.allocateBinary(65536);
+    assertNotNull(binary);
+    assertEquals(0, binary.getArenaIndex());
+    assertEquals(65536, binary.getLength());
+    binaryAllocator.deallocateBinary(binary);
+
+    // A request between size classes reports the requested length but is backed by a buffer of
+    // the next size class.
+    binary = binaryAllocator.allocateBinary(65535);
+    assertNotNull(binary);
+    assertEquals(0, binary.getArenaIndex());
+    assertEquals(65535, binary.getLength());
+    assertEquals(65536, binary.getValues().length);
+    binaryAllocator.deallocateBinary(binary);
+  }
+
+  @Test
+  public void testStrategy() throws InterruptedException {
+    BinaryAllocator binaryAllocator = new BinaryAllocator(AllocatorConfig.DEFAULT_CONFIG);
+    binaryAllocator.resetArenaBinding();
+
+    // Allocations from the same thread are served by the same arena.
+    PooledBinary binary1 = binaryAllocator.allocateBinary(4096);
+    PooledBinary binary2 = binaryAllocator.allocateBinary(4096);
+    assertEquals(binary1.getArenaIndex(), binary2.getArenaIndex());
+    binaryAllocator.deallocateBinary(binary1);
+    binaryAllocator.deallocateBinary(binary2);
+
+    // Each thread records which arena served its allocation; usage must be evenly spread.
+    // NOTE(review): 2048 is below the 4096-byte minAllocateSize default in CommonConfig. If
+    // DEFAULT_CONFIG uses that default, these requests bypass the arenas (index -1) and the
+    // balance check passes trivially. Confirm the intended request size.
+    int threadCount = 4;
+    CountDownLatch latch = new CountDownLatch(threadCount);
+    Map<Integer, Integer> arenaUsageCount = new ConcurrentHashMap<>();
+    for (int i = 0; i < threadCount; i++) {
+      Thread thread =
+          new Thread(
+              () -> {
+                try {
+                  PooledBinary firstBinary = binaryAllocator.allocateBinary(2048);
+                  int arenaId = firstBinary.getArenaIndex();
+                  arenaUsageCount.merge(arenaId, 1, Integer::sum);
+                  binaryAllocator.deallocateBinary(firstBinary);
+                } finally {
+                  latch.countDown();
+                }
+              });
+      thread.start();
+    }
+
+    latch.await();
+    int maxUsage = Collections.max(arenaUsageCount.values());
+    int minUsage = Collections.min(arenaUsageCount.values());
+    assertEquals(minUsage, maxUsage);
+  }
+
+  @Test
+  public void testEviction() throws InterruptedException {
+    AllocatorConfig config = new AllocatorConfig();
+    config.arenaNum = 1;
+    config.minAllocateSize = config.maxAllocateSize = 4096;
+    config.setTimeBetweenEvictorRunsMillis(1);
+    BinaryAllocator binaryAllocator = new BinaryAllocator(config);
+    binaryAllocator.resetArenaBinding();
+
+    PooledBinary binary = binaryAllocator.allocateBinary(4096);
+    binaryAllocator.deallocateBinary(binary);
+    assertEquals(4096, binaryAllocator.getTotalUsedMemory());
+
+    // Poll instead of sleeping a fixed 200 ms, so that a slow or loaded machine does not fail the
+    // test while the evictor has simply not run yet.
+    long start = System.nanoTime();
+    while (binaryAllocator.getTotalUsedMemory() != 0
+        && System.nanoTime() - start < EVICTION_TIMEOUT_NANOS) {
+      Thread.sleep(10);
+    }
+    assertEquals(0, binaryAllocator.getTotalUsedMemory());
+  }
+
+  @Test
+  public void testSizeMapping() {
+    AllocatorConfig config = new AllocatorConfig();
+    config.minAllocateSize = 4096;
+    config.maxAllocateSize = 65536;
+    SizeClasses sizeClasses = new SizeClasses(config);
+
+    assertEquals(33, sizeClasses.getSizeClassNum());
+    // Each size is one byte below a size-class boundary.
+    int[] testSizes = {4607, 8191, 16383, 32767, 65535};
+
+    for (int size : testSizes) {
+      int sizeIdx = sizeClasses.size2SizeIdx(size);
+      int mappedSize = sizeClasses.sizeIdx2size(sizeIdx);
+
+      assertTrue("Mapped size should be >= original size", mappedSize >= size);
+      // ...so it must round up to exactly that boundary.
+      assertEquals(size + 1, mappedSize);
+
+      if (sizeIdx > 0) {
+        int previousSize = sizeClasses.sizeIdx2size(sizeIdx - 1);
+        assertTrue("Previous size should be < original size", previousSize < size);
+      }
+    }
+  }
+}