This is an automated email from the ASF dual-hosted git repository. spricoder pushed a commit to branch feature/memory_auto in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit ca8687b48e068225fe9ae8956f7db2f0073aa93c Author: spricoder <[email protected]> AuthorDate: Fri Feb 21 15:37:02 2025 +0800 add simple auto --- .../db/consensus/DataRegionConsensusImpl.java | 2 +- .../db/pipe/resource/memory/PipeMemoryManager.java | 7 +- .../queryengine/execution/memory/MemoryPool.java | 2 +- .../analyze/cache/partition/PartitionCache.java | 2 +- .../cache/schema/DataNodeDevicePathCache.java | 2 +- .../plan/planner/LocalExecutionPlanner.java | 2 +- .../fetcher/cache/TableDeviceSchemaCache.java | 2 +- .../rescon/MemSchemaEngineStatistics.java | 2 +- .../db/storageengine/buffer/BloomFilterCache.java | 2 +- .../iotdb/db/storageengine/buffer/ChunkCache.java | 4 +- .../buffer/TimeSeriesMetadataCache.java | 2 +- .../rescon/memory/PrimitiveArrayManager.java | 2 +- .../db/storageengine/rescon/memory/SystemInfo.java | 6 +- .../rescon/memory/TimePartitionManager.java | 2 +- .../rescon/memory/TsFileResourceManager.java | 2 +- .../iotdb/commons/concurrent/ThreadName.java | 1 + .../apache/iotdb/commons/memory/IMemoryBlock.java | 4 + .../iotdb/commons/memory/MemoryBlockType.java | 8 +- .../apache/iotdb/commons/memory/MemoryManager.java | 83 ++++++++++++++- .../memory/MemoryPeriodicalJobExecutor.java | 111 +++++++++++++++++++++ .../iotdb/commons/memory/MemoryManagerTest.java | 20 ++-- 21 files changed, 230 insertions(+), 38 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java index 84466cc7598..5c7f5ad7242 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java @@ -113,7 +113,7 @@ public class DataRegionConsensusImpl { private static ConsensusConfig buildConsensusConfig() { IMemoryBlock memoryBlock = - CONF.getConsensusMemoryManager().forceAllocate("Consensus", MemoryBlockType.FUNCTION); + CONF.getConsensusMemoryManager().forceAllocate("Consensus", MemoryBlockType.DYNAMIC); return ConsensusConfig.newBuilder() .setThisNodeId(CONF.getDataNodeId()) .setThisNode(new TEndPoint(CONF.getInternalAddress(), CONF.getDataRegionConsensusPort())) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java index f60606cc83b..1e441a7e8d4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java @@ -56,7 +56,7 @@ public class PipeMemoryManager { IoTDBDescriptor.getInstance() .getConfig() .getPipeMemoryManager() - .forceAllocate("Stream", MemoryBlockType.FUNCTION); + .forceAllocate("Stream", MemoryBlockType.DYNAMIC); private static final double EXCEED_PROTECT_THRESHOLD = 0.95; @@ -388,8 +388,9 @@ public class PipeMemoryManager { return new PipeMemoryBlock(sizeInBytes); } - if (sizeInBytes == 0 || memoryBlock.getTotalMemorySizeInBytes() - memoryBlock.getUsedMemoryInBytes() - >= sizeInBytes) { + if (sizeInBytes == 0 + || memoryBlock.getTotalMemorySizeInBytes() - memoryBlock.getUsedMemoryInBytes() + >= sizeInBytes) { return registerMemoryBlock(sizeInBytes); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/memory/MemoryPool.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/memory/MemoryPool.java index 9323519e664..1ea742dbbf7 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/memory/MemoryPool.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/memory/MemoryPool.java @@ -127,7 +127,7 @@ public class MemoryPool { public MemoryPool(String id, MemoryManager memoryManager, long maxBytesPerFragmentInstance) { this.id = Validate.notNull(id, "id can not be null."); this.memoryBlock = - memoryManager.forceAllocate(memoryManager.getName(), MemoryBlockType.FUNCTION); + memoryManager.forceAllocate(memoryManager.getName(), MemoryBlockType.DYNAMIC); Validate.isTrue( this.memoryBlock.getTotalMemorySizeInBytes() > 0L, "max bytes should be greater than zero: %d", diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/partition/PartitionCache.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/partition/PartitionCache.java index 88acac5e701..143d92e62fa 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/partition/PartitionCache.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/partition/PartitionCache.java @@ -121,7 +121,7 @@ public class PartitionCache { this.memoryBlock = config .getPartitionCacheMemoryManager() - .forceAllocate("PartitionCache", MemoryBlockType.FUNCTION); + .forceAllocate("PartitionCache", MemoryBlockType.STATIC); this.memoryBlock.allocate(this.memoryBlock.getTotalMemorySizeInBytes()); // TODO @spricoder: PartitionCache need to be controlled according to memory this.schemaPartitionCache = diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DataNodeDevicePathCache.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DataNodeDevicePathCache.java index 269dc34ad88..fca17a11430 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DataNodeDevicePathCache.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DataNodeDevicePathCache.java @@ -45,7 +45,7 @@ public class DataNodeDevicePathCache { devicePathCacheMemoryBlock = config .getDevicePathCacheMemoryManager() - .forceAllocate("DevicePathCache", MemoryBlockType.PERFORMANCE); + .forceAllocate("DevicePathCache", MemoryBlockType.STATIC); // TODO @spricoder: later we can find a way to get the byte size of cache devicePathCacheMemoryBlock.allocate(devicePathCacheMemoryBlock.getTotalMemorySizeInBytes()); devicePathCache = diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java index b3dd13b40ad..93f45bebb5e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java @@ -69,7 +69,7 @@ public class LocalExecutionPlanner { IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig(); OPERATORS_MEMORY_BLOCK = - CONFIG.getOperatorsMemoryManager().forceAllocate("Operators", MemoryBlockType.FUNCTION); + CONFIG.getOperatorsMemoryManager().forceAllocate("Operators", MemoryBlockType.DYNAMIC); MIN_REST_MEMORY_FOR_QUERY_AFTER_LOAD = (long) ((OPERATORS_MEMORY_BLOCK.getTotalMemorySizeInBytes()) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceSchemaCache.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceSchemaCache.java index 9004c43ef5d..604208b1edc 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceSchemaCache.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceSchemaCache.java @@ -108,7 +108,7 @@ public class TableDeviceSchemaCache { memoryBlock = config .getSchemaCacheMemoryManager() - .forceAllocate("TableDeviceSchemaCache", MemoryBlockType.FUNCTION); + .forceAllocate("TableDeviceSchemaCache", MemoryBlockType.STATIC); dualKeyCache = new DualKeyCacheBuilder<TableId, IDeviceID, TableDeviceCacheEntry>() .cacheEvictionPolicy( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/rescon/MemSchemaEngineStatistics.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/rescon/MemSchemaEngineStatistics.java index f34e8a6f898..51d6ad337a7 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/rescon/MemSchemaEngineStatistics.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/rescon/MemSchemaEngineStatistics.java @@ -59,7 +59,7 @@ public class MemSchemaEngineStatistics implements ISchemaEngineStatistics { IoTDBDescriptor.getInstance() .getConfig() .getSchemaRegionMemoryManager() - .forceAllocate("SchemaRegion", MemoryBlockType.FUNCTION); + .forceAllocate("SchemaRegion", MemoryBlockType.DYNAMIC); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/buffer/BloomFilterCache.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/buffer/BloomFilterCache.java index 5a396b85a4c..be2a48c19aa 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/buffer/BloomFilterCache.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/buffer/BloomFilterCache.java @@ -60,7 +60,7 @@ public class BloomFilterCache { CACHE_MEMORY_BLOCK = CONFIG .getBloomFilterCacheMemoryManager() - .forceAllocate("BloomFilterCache", MemoryBlockType.PERFORMANCE); + .forceAllocate("BloomFilterCache", MemoryBlockType.STATIC); // TODO @spricoder: find a way to get the size of the BloomFilterCache CACHE_MEMORY_BLOCK.allocate(CACHE_MEMORY_BLOCK.getTotalMemorySizeInBytes()); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/buffer/ChunkCache.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/buffer/ChunkCache.java index dc0ada9f8ea..d964a635b8e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/buffer/ChunkCache.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/buffer/ChunkCache.java @@ -73,9 +73,7 @@ public class ChunkCache { static { CACHE_MEMORY_BLOCK = - CONFIG - .getChunkCacheMemoryManager() - .forceAllocate("ChunkCache", MemoryBlockType.PERFORMANCE); + CONFIG.getChunkCacheMemoryManager().forceAllocate("ChunkCache", MemoryBlockType.STATIC); // TODO @spricoder: find a way to get the size of the ChunkCache CACHE_MEMORY_BLOCK.allocate(CACHE_MEMORY_BLOCK.getTotalMemorySizeInBytes()); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/buffer/TimeSeriesMetadataCache.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/buffer/TimeSeriesMetadataCache.java index 2b02bd300c6..3ba8e8a4af3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/buffer/TimeSeriesMetadataCache.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/buffer/TimeSeriesMetadataCache.java @@ -87,7 +87,7 @@ public class TimeSeriesMetadataCache { CACHE_MEMORY_BLOCK = config .getTimeSeriesMetaDataCacheMemoryManager() - .forceAllocate("TimeSeriesMetadataCache", MemoryBlockType.PERFORMANCE); + .forceAllocate("TimeSeriesMetadataCache", MemoryBlockType.STATIC); // TODO @spricoder find a better way to get the size of cache CACHE_MEMORY_BLOCK.allocate(CACHE_MEMORY_BLOCK.getTotalMemorySizeInBytes()); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/PrimitiveArrayManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/PrimitiveArrayManager.java index 9e198a1f601..1b9b523dd12 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/PrimitiveArrayManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/PrimitiveArrayManager.java @@ -55,7 +55,7 @@ public class PrimitiveArrayManager { private static final IMemoryBlock POOLED_ARRAYS_MEMORY_BLOCK = CONFIG .getBufferedArraysMemoryManager() - .forceAllocate("BufferedArrays", MemoryBlockType.FUNCTION); + .forceAllocate("BufferedArrays", MemoryBlockType.DYNAMIC); /** threshold total size of arrays for all data types */ private static final double POOLED_ARRAYS_MEMORY_THRESHOLD = diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/SystemInfo.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/SystemInfo.java index a079e01bd9a..7819e9b5a53 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/SystemInfo.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/SystemInfo.java @@ -78,13 +78,13 @@ public class SystemInfo { private SystemInfo() { compactionMemoryBlock = - config.getCompactionMemoryManager().forceAllocate("Compaction", MemoryBlockType.FUNCTION); + config.getCompactionMemoryManager().forceAllocate("Compaction", MemoryBlockType.DYNAMIC); walBufferQueueMemoryBlock = - config.getWalBufferQueueManager().forceAllocate("WalBufferQueue", MemoryBlockType.FUNCTION); + config.getWalBufferQueueManager().forceAllocate("WalBufferQueue", MemoryBlockType.DYNAMIC); directBufferMemoryBlock = config .getDirectBufferMemoryManager() - .forceAllocate("DirectBuffer", MemoryBlockType.FUNCTION); + .forceAllocate("DirectBuffer", MemoryBlockType.DYNAMIC); loadWriteMemory(); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/TimePartitionManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/TimePartitionManager.java index 65194d0fbe8..85e96683413 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/TimePartitionManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/TimePartitionManager.java @@ -51,7 +51,7 @@ public class TimePartitionManager { IoTDBDescriptor.getInstance() .getConfig() .getTimePartitionInfoMemoryManager() - .forceAllocate("TimePartitionInfoMemoryBlock", MemoryBlockType.FUNCTION); + .forceAllocate("TimePartitionInfoMemoryBlock", MemoryBlockType.DYNAMIC); } public void registerTimePartitionInfo(TimePartitionInfo timePartitionInfo) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/TsFileResourceManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/TsFileResourceManager.java index 62bb52a45c6..2cecc1ed3a6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/TsFileResourceManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/TsFileResourceManager.java @@ -49,7 +49,7 @@ public class TsFileResourceManager { private TsFileResourceManager() { memoryBlock = - CONFIG.getTimeIndexMemoryManager().forceAllocate("TimeIndex", MemoryBlockType.FUNCTION); + CONFIG.getTimeIndexMemoryManager().forceAllocate("TimeIndex", MemoryBlockType.DYNAMIC); } @TestOnly diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java index ec4a1a7c2cd..3965622078c 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java @@ -149,6 +149,7 @@ public enum ThreadName { SUBSCRIPTION_RUNTIME_META_SYNCER("Subscription-Runtime-Meta-Syncer"), WINDOW_EVALUATION_SERVICE("WindowEvaluationTaskPoolManager"), STATEFUL_TRIGGER_INFORMATION_UPDATER("Stateful-Trigger-Information-Updater"), + MEMORY_PERIODICAL_JOB_EXECUTOR("Memory-Periodical-Job-Executor"), // -------------------------- JVM -------------------------- // NOTICE: The thread name of jvm cannot be edited here! // We list the thread name here just for distinguishing what module the thread belongs to. diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/memory/IMemoryBlock.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/memory/IMemoryBlock.java index 8e6437460e1..829b27292b3 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/memory/IMemoryBlock.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/memory/IMemoryBlock.java @@ -106,6 +106,10 @@ public abstract class IMemoryBlock implements AutoCloseable { this.totalMemorySizeInBytes = totalMemorySizeInBytes; } + public void resizeByRatio(double ratio) { + totalMemorySizeInBytes = (long) (totalMemorySizeInBytes * ratio); + } + /** Get the maximum memory size in byte of this memory block */ public long getTotalMemorySizeInBytes() { return totalMemorySizeInBytes; diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/memory/MemoryBlockType.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/memory/MemoryBlockType.java index 50d22eee50d..6a20600713e 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/memory/MemoryBlockType.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/memory/MemoryBlockType.java @@ -21,8 +21,8 @@ package org.apache.iotdb.commons.memory; public enum MemoryBlockType { NONE, - // function related memory - FUNCTION, - // performance related memory - PERFORMANCE, + // static memory + STATIC, + // dynamic memory + DYNAMIC, } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/memory/MemoryManager.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/memory/MemoryManager.java index 58142d9f490..e591d618fbe 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/memory/MemoryManager.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/memory/MemoryManager.java @@ -19,6 +19,8 @@ package org.apache.iotdb.commons.memory; +import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory; +import org.apache.iotdb.commons.concurrent.ThreadName; import org.apache.iotdb.commons.utils.TestOnly; import org.slf4j.Logger; @@ -46,6 +48,9 @@ public class MemoryManager { /** Whether memory management is enabled */ private final boolean enable; + /** The total allocate memory size in byte of memory manager */ + private final long allocateTotalMemorySizeInBytes; + /** The total memory size in byte of memory manager */ private long totalMemorySizeInBytes; @@ -65,6 +70,7 @@ public class MemoryManager { public MemoryManager(long totalMemorySizeInBytes) { this.name = "Test"; this.parentMemoryManager = null; + this.allocateTotalMemorySizeInBytes = totalMemorySizeInBytes; this.totalMemorySizeInBytes = totalMemorySizeInBytes; this.enable = false; } @@ -73,6 +79,7 @@ public class MemoryManager { String name, MemoryManager parentMemoryManager, long totalMemorySizeInBytes) { this.name = name; this.parentMemoryManager = parentMemoryManager; + this.allocateTotalMemorySizeInBytes = totalMemorySizeInBytes; this.totalMemorySizeInBytes = totalMemorySizeInBytes; this.enable = false; } @@ -81,6 +88,7 @@ public class MemoryManager { String name, MemoryManager parentMemoryManager, long totalMemorySizeInBytes, boolean enable) { this.name = name; this.parentMemoryManager = parentMemoryManager; + this.allocateTotalMemorySizeInBytes = totalMemorySizeInBytes; this.totalMemorySizeInBytes = totalMemorySizeInBytes; this.enable = enable; } @@ -446,6 +454,10 @@ public class MemoryManager { this.totalMemorySizeInBytes = totalMemorySizeInBytes; } + public void expandTotalMemorySizeInBytes(long totalMemorySizeInBytes) { + this.totalMemorySizeInBytes += totalMemorySizeInBytes; + } + public void setTotalMemorySizeInBytesWithReload(long totalMemorySizeInBytes) { reAllocateMemoryAccordingToRatio((double) totalMemorySizeInBytes / this.totalMemorySizeInBytes); } @@ -460,6 +472,11 @@ public class MemoryManager { return allocatedMemorySizeInBytes; } + /** Get used memory ratio */ + public double getUsedMemoryRatio() { + return (double) getUsedMemorySizeInBytes() / totalMemorySizeInBytes; + } + /** Get actual used memory size in bytes of memory manager */ public long getUsedMemorySizeInBytes() { long memorySize = @@ -482,8 +499,16 @@ public class MemoryManager { private static final MemoryManager GLOBAL = new MemoryManager("GlobalMemoryManager", null, Runtime.getRuntime().totalMemory()); - - private MemoryManagerHolder() {} + private static final MemoryPeriodicalJobExecutor EXECUTOR = + new MemoryPeriodicalJobExecutor( + IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor( + ThreadName.MEMORY_PERIODICAL_JOB_EXECUTOR.getName()), + 20); + + private MemoryManagerHolder() { + EXECUTOR.register( + "GlobalMemoryManager#updateAllocate()", MemoryManagerHolder.GLOBAL::updateAllocate, 60); + } } // endregion @@ -514,10 +539,62 @@ public class MemoryManager { sb.append(this); LOGGER.info(sb.toString()); for (IMemoryBlock block : allocatedMemoryBlocks.values()) { - block.print(index + 1); + block.print(index + 2); } for (MemoryManager child : children.values()) { child.print(index + 1); } } + + /** Whether is able to shrink */ + public synchronized long shrink() { + long shrinkSize = + Math.min( + getAvailableMemorySizeInBytes() / 10, + totalMemorySizeInBytes - allocatedMemorySizeInBytes * 9 / 10); + totalMemorySizeInBytes -= shrinkSize; + return shrinkSize; + } + + /** Whether is available to shrink */ + public boolean isAvailableToShrink() { + return allocateTotalMemorySizeInBytes - totalMemorySizeInBytes + < allocateTotalMemorySizeInBytes / 10; + } + + public void updateAllocate() { + if (children.isEmpty()) { + double ratio = (double) totalMemorySizeInBytes / allocateTotalMemorySizeInBytes; + for (IMemoryBlock memoryBlock : allocatedMemoryBlocks.values()) { + memoryBlock.resizeByRatio(ratio); + } + } else { + MemoryManager higherMemoryManager = null; + MemoryManager lowerMemoryManager = null; + // search the highest and lowest memory manager + for (MemoryManager child : children.values()) { + if (higherMemoryManager == null) { + higherMemoryManager = child; + lowerMemoryManager = child; + } else { + if (child.getUsedMemorySizeInBytes() > higherMemoryManager.getUsedMemorySizeInBytes()) { + higherMemoryManager = child; + } + if (lowerMemoryManager.isAvailableToShrink() + && child.getUsedMemorySizeInBytes() < lowerMemoryManager.getUsedMemorySizeInBytes()) { + lowerMemoryManager = child; + } + } + } + if (higherMemoryManager != null && !higherMemoryManager.equals(lowerMemoryManager)) { + // transfer + long transferSize = lowerMemoryManager.shrink(); + higherMemoryManager.expandTotalMemorySizeInBytes(transferSize); + LOGGER.info("Transfer Memory Size from {} to {}", higherMemoryManager, lowerMemoryManager); + } + for (MemoryManager memoryManager : children.values()) { + memoryManager.updateAllocate(); + } + } + } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/memory/MemoryPeriodicalJobExecutor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/memory/MemoryPeriodicalJobExecutor.java new file mode 100644 index 00000000000..59662dc5a29 --- /dev/null +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/memory/MemoryPeriodicalJobExecutor.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.commons.memory; + +import org.apache.iotdb.commons.concurrent.WrappedRunnable; +import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil; +import org.apache.iotdb.commons.utils.TestOnly; + +import org.apache.tsfile.utils.Pair; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +public class MemoryPeriodicalJobExecutor { + private static final Logger LOGGER = LoggerFactory.getLogger(MemoryPeriodicalJobExecutor.class); + + private final ScheduledExecutorService executorService; + private final long minIntervalSeconds; + + private long rounds; + private Future<?> executorFuture; + + private final List<Pair<WrappedRunnable, Long>> periodicalJobs = new CopyOnWriteArrayList<>(); + + public MemoryPeriodicalJobExecutor( + final ScheduledExecutorService executorService, final long minIntervalSeconds) { + this.executorService = executorService; + this.minIntervalSeconds = minIntervalSeconds; + } + + public void register(String id, Runnable periodicalJob, long intervalInSeconds) { + periodicalJobs.add( + new Pair<>( + new WrappedRunnable() { + @Override + public void runMayThrow() { + try { + periodicalJob.run(); + } catch (Exception e) { + LOGGER.warn("Periodical job {} failed.", id, e); + } + } + }, + Math.max(intervalInSeconds / minIntervalSeconds, 1))); + LOGGER.info( + "Memory periodical job {} is registered successfully. Interval: {} seconds.", + id, + Math.max(intervalInSeconds / minIntervalSeconds, 1) * minIntervalSeconds); + } + + public synchronized void start() { + if (executorFuture == null) { + rounds = 0; + + executorFuture = + ScheduledExecutorUtil.safelyScheduleWithFixedDelay( + executorService, + this::execute, + minIntervalSeconds, + minIntervalSeconds, + TimeUnit.SECONDS); + LOGGER.info("Memory periodical job executor is started successfully."); + } + } + + protected void execute() { + ++rounds; + + for (final Pair<WrappedRunnable, Long> periodicalJob : periodicalJobs) { + if (rounds % periodicalJob.right == 0) { + periodicalJob.left.run(); + } + } + } + + public synchronized void stop() { + if (executorFuture != null) { + executorFuture.cancel(false); + executorFuture = null; + LOGGER.info("Memory periodical job executor is stopped successfully."); + } + } + + @TestOnly + public void clear() { + periodicalJobs.clear(); + LOGGER.info("All memory periodical jobs are cleared successfully."); + } +} diff --git a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/memory/MemoryManagerTest.java b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/memory/MemoryManagerTest.java index 7630d86f911..fa777526120 100644 --- a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/memory/MemoryManagerTest.java +++ b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/memory/MemoryManagerTest.java @@ -49,22 +49,22 @@ public class MemoryManagerTest { // create memoryBlock1 in the size of 20 from globalMemoryManager IMemoryBlock memoryBlock1 = - GLOBAL_MEMORY_MANAGER.forceAllocate("Block1", 20, MemoryBlockType.FUNCTION); + GLOBAL_MEMORY_MANAGER.forceAllocate("Block1", 20, MemoryBlockType.STATIC); Assert.assertEquals(80, GLOBAL_MEMORY_MANAGER.getAvailableMemorySizeInBytes()); Assert.assertEquals(20, GLOBAL_MEMORY_MANAGER.getAllocatedMemorySizeInBytes()); Assert.assertEquals("Block1", memoryBlock1.getName()); Assert.assertEquals(20, memoryBlock1.getTotalMemorySizeInBytes()); - Assert.assertEquals(MemoryBlockType.FUNCTION, memoryBlock1.getMemoryBlockType()); + Assert.assertEquals(MemoryBlockType.STATIC, memoryBlock1.getMemoryBlockType()); // create memoryBlock2 in the size of 10 from globalMemoryManager IMemoryBlock memoryBlock2 = GLOBAL_MEMORY_MANAGER.forceAllocateIfSufficient( - "Block2", 10, 0.9f, MemoryBlockType.PERFORMANCE); + "Block2", 10, 0.9f, MemoryBlockType.DYNAMIC); Assert.assertEquals(70, GLOBAL_MEMORY_MANAGER.getAvailableMemorySizeInBytes()); Assert.assertEquals(30, GLOBAL_MEMORY_MANAGER.getAllocatedMemorySizeInBytes()); Assert.assertEquals("Block2", memoryBlock2.getName()); Assert.assertEquals(10, memoryBlock2.getTotalMemorySizeInBytes()); - Assert.assertEquals(MemoryBlockType.PERFORMANCE, memoryBlock2.getMemoryBlockType()); + Assert.assertEquals(MemoryBlockType.DYNAMIC, memoryBlock2.getMemoryBlockType()); // create subMemoryManager in the size of 50 from globalMemoryManager MemoryManager subMemoryManager = @@ -83,11 +83,11 @@ public class MemoryManagerTest { // create memoryBlock4 in the size of 50 from subMemoryManager IMemoryBlock memoryBlock4 = - subMemoryManager.tryAllocate("Block4", 100, size -> size / 2, MemoryBlockType.FUNCTION); + subMemoryManager.tryAllocate("Block4", 100, size -> size / 2, MemoryBlockType.STATIC); Assert.assertEquals(0, GLOBAL_MEMORY_MANAGER.getAvailableMemorySizeInBytes()); Assert.assertEquals("Block4", memoryBlock4.getName()); Assert.assertEquals(50, memoryBlock4.getTotalMemorySizeInBytes()); - Assert.assertEquals(MemoryBlockType.FUNCTION, memoryBlock4.getMemoryBlockType()); + Assert.assertEquals(MemoryBlockType.STATIC, memoryBlock4.getMemoryBlockType()); Assert.assertEquals(0, GLOBAL_MEMORY_MANAGER.getUsedMemorySizeInBytes()); @@ -143,17 +143,17 @@ public class MemoryManagerTest { // create memoryBlock when enable IMemoryBlock memoryBlock5 = - subMemoryManager.forceAllocate("Block5", 10, MemoryBlockType.FUNCTION); + subMemoryManager.forceAllocate("Block5", 10, MemoryBlockType.DYNAMIC); Assert.assertEquals(10, memoryBlock5.getTotalMemorySizeInBytes()); Assert.assertEquals(20, subMemoryManager.getAvailableMemorySizeInBytes()); Assert.assertNull( - subMemoryManager.forceAllocateIfSufficient("Block6", 5, 0.6f, MemoryBlockType.FUNCTION)); + subMemoryManager.forceAllocateIfSufficient("Block6", 5, 0.6f, MemoryBlockType.STATIC)); IMemoryBlock memoryBlock6 = - subMemoryManager.forceAllocateIfSufficient("Block6", 5, 0.8f, MemoryBlockType.FUNCTION); + subMemoryManager.forceAllocateIfSufficient("Block6", 5, 0.8f, MemoryBlockType.DYNAMIC); Assert.assertEquals(5, memoryBlock6.getTotalMemorySizeInBytes()); Assert.assertEquals(15, subMemoryManager.getAvailableMemorySizeInBytes()); IMemoryBlock memoryBlock7 = - subMemoryManager2.tryAllocate("Block7", 5, size -> size / 2, MemoryBlockType.FUNCTION); + subMemoryManager2.tryAllocate("Block7", 5, size -> size / 2, MemoryBlockType.STATIC); Assert.assertEquals(5, memoryBlock7.getTotalMemorySizeInBytes()); Assert.assertEquals(15, subMemoryManager2.getAvailableMemorySizeInBytes());
