This is an automated email from the ASF dual-hosted git repository. tanxinyu pushed a commit to branch allocator_auto_release in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 18996450158e33748038d809f6060de2a1543d35 Author: OneSizeFitQuorum <[email protected]> AuthorDate: Tue May 13 14:18:31 2025 +0800 enhance Signed-off-by: OneSizeFitQuorum <[email protected]> --- .../commons/binaryallocator/BinaryAllocator.java | 32 +++++++++---- .../Evictor.java => autoreleaser/Releaser.java} | 55 ++++++++++------------ .../binaryallocator/config/AllocatorConfig.java | 4 +- .../commons/binaryallocator/evictor/Evictor.java | 14 +++--- .../iotdb/commons/concurrent/ThreadName.java | 1 + .../binaryallocator/BinaryAllocatorTest.java | 13 ++++- 6 files changed, 69 insertions(+), 50 deletions(-) diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/binaryallocator/BinaryAllocator.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/binaryallocator/BinaryAllocator.java index 12d11eb6344..984ffafd9bb 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/binaryallocator/BinaryAllocator.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/binaryallocator/BinaryAllocator.java @@ -21,6 +21,7 @@ package org.apache.iotdb.commons.binaryallocator; import org.apache.iotdb.commons.binaryallocator.arena.Arena; import org.apache.iotdb.commons.binaryallocator.arena.ArenaStrategy; +import org.apache.iotdb.commons.binaryallocator.autoreleaser.Releaser; import org.apache.iotdb.commons.binaryallocator.config.AllocatorConfig; import org.apache.iotdb.commons.binaryallocator.evictor.Evictor; import org.apache.iotdb.commons.binaryallocator.metric.BinaryAllocatorMetrics; @@ -53,7 +54,7 @@ public class BinaryAllocator { private final BinaryAllocatorMetrics metrics; private Evictor sampleEvictor; - // private TinyGC tinyGC; + private Releaser autoReleaser; private static final ThreadLocal<ThreadArenaRegistry> arenaRegistry = ThreadLocal.withInitial(ThreadArenaRegistry::new); @@ -62,7 +63,6 @@ public class BinaryAllocator { private static final int SHUTDOWN_GC_TIME_PERCENTAGE = 30; private static final int RESTART_GC_TIME_PERCENTAGE = 5; - private final AutoReleaseThread autoReleaseThread = new AutoReleaseThread(); public final ReferenceQueue<PooledBinary> referenceQueue = new ReferenceQueue<>(); // JDK 9+ Cleaner uses double-linked list and synchronized to manage references, which has worse @@ -100,9 +100,11 @@ public class BinaryAllocator { sampleEvictor = new SampleEvictor( ThreadName.BINARY_ALLOCATOR_SAMPLE_EVICTOR.getName(), - allocatorConfig.durationEvictorShutdownTimeout); - sampleEvictor.startEvictor(allocatorConfig.durationBetweenEvictorRuns); - autoReleaseThread.start(); + allocatorConfig.durationShutdownTimeout); + sampleEvictor.start(allocatorConfig.durationBetweenEvictorRuns); + autoReleaser = new AutoReleaser(ThreadName.BINARY_ALLOCATOR_AUTO_RELEASER.getName(), + allocatorConfig.durationShutdownTimeout); + autoReleaser.start(); } public synchronized void close(boolean forceClose) { @@ -113,7 +115,8 @@ public class BinaryAllocator { state.set(BinaryAllocatorState.PENDING); } - sampleEvictor.stopEvictor(); + sampleEvictor.stop(); + autoReleaser.stop(); for (Arena arena : heapArenas) { arena.close(); } @@ -182,11 +185,13 @@ public class BinaryAllocator { } private static class BinaryAllocatorHolder { + private static final BinaryAllocator INSTANCE = new BinaryAllocator(AllocatorConfig.DEFAULT_CONFIG); } private static class ThreadArenaRegistry { + private Arena threadArenaBinding = null; public Arena getArena() { @@ -213,6 +218,7 @@ public class BinaryAllocator { } private static class LeastUsedArenaStrategy implements ArenaStrategy { + @Override public Arena choose(Arena[] arenas) { Arena boundArena = arenaRegistry.get().getArena(); @@ -278,8 +284,15 @@ public class BinaryAllocator { } } - /** Process phantomly reachable objects and return their byte arrays to pool. */ - public class AutoReleaseThread extends Thread { + /** + * Process phantomly reachable objects and return their byte arrays to pool. + */ + public class AutoReleaser extends Releaser { + + public AutoReleaser(String name, Duration shutdownTimeoutDuration) { + super(name, shutdownTimeoutDuration); + } + @Override public void run() { PooledBinaryPhantomReference ref; @@ -289,7 +302,8 @@ public class BinaryAllocator { ref.slabRegion.deallocate(ref.byteArray); } } catch (InterruptedException e) { - throw new RuntimeException(e); + LOGGER.info("{} exits due to interruptedException.", 1); + Thread.currentThread().interrupt(); } } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/binaryallocator/evictor/Evictor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/binaryallocator/autoreleaser/Releaser.java similarity index 51% copy from iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/binaryallocator/evictor/Evictor.java copy to iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/binaryallocator/autoreleaser/Releaser.java index 686e7e73d8f..6bcf31afc10 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/binaryallocator/evictor/Evictor.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/binaryallocator/autoreleaser/Releaser.java @@ -17,61 +17,56 @@ * under the License. */ -package org.apache.iotdb.commons.binaryallocator.evictor; +package org.apache.iotdb.commons.binaryallocator.autoreleaser; +import java.time.Duration; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory; -import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.time.Duration; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; - -public abstract class Evictor implements Runnable { - private static final Logger LOGGER = LoggerFactory.getLogger(Evictor.class); +public abstract class Releaser implements Runnable { + private static final Logger LOGGER = LoggerFactory.getLogger(Releaser.class); - private ScheduledFuture<?> scheduledFuture; + private Future<?> future; private final String name; - private final Duration evictorShutdownTimeoutDuration; + private final Duration shutdownTimeoutDuration; - private ScheduledExecutorService executor; + private ExecutorService executor; - public Evictor(String name, Duration evictorShutdownTimeoutDuration) { + public Releaser(String name, Duration shutdownTimeoutDuration) { this.name = name; - this.evictorShutdownTimeoutDuration = evictorShutdownTimeoutDuration; + this.shutdownTimeoutDuration = shutdownTimeoutDuration; } - /** Cancels the scheduled future. */ + /** Cancels the future. */ void cancel() { - scheduledFuture.cancel(false); + future.cancel(false); } @Override public abstract void run(); - void setScheduledFuture(final ScheduledFuture<?> scheduledFuture) { - this.scheduledFuture = scheduledFuture; + void setFuture(final Future<?> future) { + this.future = future; } @Override public String toString() { - return getClass().getName() + " [scheduledFuture=" + scheduledFuture + "]"; + return getClass().getName() + " [future=" + future + "]"; } - public void startEvictor(final Duration delay) { + public void start() { if (null == executor) { - executor = IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(name); + executor = IoTDBThreadPoolFactory.newSingleThreadExecutor(name); } - final ScheduledFuture<?> scheduledFuture = - ScheduledExecutorUtil.safelyScheduleAtFixedRate( - executor, this, delay.toMillis(), delay.toMillis(), TimeUnit.MILLISECONDS); - this.setScheduledFuture(scheduledFuture); + final Future<?> future = executor.submit(this); + this.setFuture(future); } - public void stopEvictor() { + public void stop() { if (executor == null) { return; } @@ -83,14 +78,14 @@ public abstract class Evictor implements Runnable { try { boolean result = executor.awaitTermination( - evictorShutdownTimeoutDuration.toMillis(), TimeUnit.MILLISECONDS); + shutdownTimeoutDuration.toMillis(), TimeUnit.MILLISECONDS); if (!result) { LOGGER.info( - "unable to stop evictor after {} ms", evictorShutdownTimeoutDuration.toMillis()); + "unable to stop auto releaser after {} ms", shutdownTimeoutDuration.toMillis()); } } catch (final InterruptedException ignored) { Thread.currentThread().interrupt(); } executor = null; } -} +} \ No newline at end of file diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/binaryallocator/config/AllocatorConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/binaryallocator/config/AllocatorConfig.java index 53f20ac0da1..3bbbbc9a210 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/binaryallocator/config/AllocatorConfig.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/binaryallocator/config/AllocatorConfig.java @@ -37,8 +37,8 @@ public class AllocatorConfig { public boolean enableBinaryAllocator = CommonDescriptor.getInstance().getConfig().isEnableBinaryAllocator(); - /** Maximum wait time in milliseconds when shutting down the evictor */ - public Duration durationEvictorShutdownTimeout = Duration.ofMillis(1000L); + /** Maximum wait time in milliseconds when shutting down the evictor and autoReleaser */ + public Duration durationShutdownTimeout = Duration.ofMillis(1000L); /** Time interval in milliseconds between two consecutive evictor runs */ public Duration durationBetweenEvictorRuns = Duration.ofMillis(1000L); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/binaryallocator/evictor/Evictor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/binaryallocator/evictor/Evictor.java index 686e7e73d8f..ab9286cda40 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/binaryallocator/evictor/Evictor.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/binaryallocator/evictor/Evictor.java @@ -35,13 +35,13 @@ public abstract class Evictor implements Runnable { private ScheduledFuture<?> scheduledFuture; private final String name; - private final Duration evictorShutdownTimeoutDuration; + private final Duration shutdownTimeoutDuration; private ScheduledExecutorService executor; - public Evictor(String name, Duration evictorShutdownTimeoutDuration) { + public Evictor(String name, Duration shutdownTimeoutDuration) { this.name = name; - this.evictorShutdownTimeoutDuration = evictorShutdownTimeoutDuration; + this.shutdownTimeoutDuration = shutdownTimeoutDuration; } /** Cancels the scheduled future. */ @@ -61,7 +61,7 @@ public abstract class Evictor implements Runnable { return getClass().getName() + " [scheduledFuture=" + scheduledFuture + "]"; } - public void startEvictor(final Duration delay) { + public void start(final Duration delay) { if (null == executor) { executor = IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(name); } @@ -71,7 +71,7 @@ public abstract class Evictor implements Runnable { this.setScheduledFuture(scheduledFuture); } - public void stopEvictor() { + public void stop() { if (executor == null) { return; } @@ -83,10 +83,10 @@ public abstract class Evictor implements Runnable { try { boolean result = executor.awaitTermination( - evictorShutdownTimeoutDuration.toMillis(), TimeUnit.MILLISECONDS); + shutdownTimeoutDuration.toMillis(), TimeUnit.MILLISECONDS); if (!result) { LOGGER.info( - "unable to stop evictor after {} ms", evictorShutdownTimeoutDuration.toMillis()); + "unable to stop evictor after {} ms", shutdownTimeoutDuration.toMillis()); } } catch (final InterruptedException ignored) { Thread.currentThread().interrupt(); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java index dae0bd76581..e208f0eb87a 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java @@ -193,6 +193,7 @@ public enum ThreadName { STORAGE_ENGINE_RECOVER_TRIGGER("StorageEngine-RecoverTrigger"), FILE_TIME_INDEX_RECORD("FileTimeIndexRecord"), BINARY_ALLOCATOR_SAMPLE_EVICTOR("BinaryAllocator-SampleEvictor"), + BINARY_ALLOCATOR_AUTO_RELEASER("BinaryAllocator-Auto-Releaser"), // the unknown thread name is used for metrics UNKNOWN("UNKNOWN"); diff --git a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/binaryallocator/BinaryAllocatorTest.java b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/binaryallocator/BinaryAllocatorTest.java index 3314f26199c..b920cb6d54b 100644 --- a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/binaryallocator/BinaryAllocatorTest.java +++ b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/binaryallocator/BinaryAllocatorTest.java @@ -19,6 +19,7 @@ package org.apache.iotdb.commons.binaryallocator; +import java.util.concurrent.TimeUnit; import org.apache.iotdb.commons.binaryallocator.config.AllocatorConfig; import org.apache.iotdb.commons.binaryallocator.utils.SizeClasses; @@ -33,8 +34,10 @@ import java.util.concurrent.CountDownLatch; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; public class BinaryAllocatorTest { + @Test public void testAllocateBinary() { AllocatorConfig config = new AllocatorConfig(); @@ -154,7 +157,13 @@ public class BinaryAllocatorTest { // reference count is 0 binary = null; System.gc(); - Thread.sleep(100); - assertEquals(binaryAllocator.getTotalUsedMemory(), 4096); + long startTime = System.currentTimeMillis(); + while (System.currentTimeMillis() - startTime <= TimeUnit.MINUTES.toMillis(1)) { + if (binaryAllocator.getTotalUsedMemory() == 4096) { + return; + } + Thread.sleep(100); + } + fail("Can not auto release PoolBinary in binary allocator"); } }
