This is an automated email from the ASF dual-hosted git repository.
spricoder pushed a commit to branch feature/memory_transfer
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/feature/memory_transfer by
this push:
new c8af61f857f Add Runtime() Adaption
c8af61f857f is described below
commit c8af61f857f462e1385b69010f956845bfc299fb
Author: spricoder <[email protected]>
AuthorDate: Sat Feb 22 18:11:13 2025 +0800
Add Runtime() Adaption
---
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 4 ++--
.../memory/LoadTsFileDataCacheMemoryBlock.java | 2 +-
.../load/memory/LoadTsFileMemoryManager.java | 2 +-
.../apache/iotdb/commons/conf/CommonConfig.java | 9 ++++++++
.../iotdb/commons/conf/CommonDescriptor.java | 4 ++++
.../apache/iotdb/commons/memory/MemoryManager.java | 25 ++++++++++++++++----
.../iotdb/commons/memory/MemoryRuntimeAgent.java | 27 +++++++++++++++++++++-
.../iotdb/commons/memory/MemoryManagerTest.java | 2 +-
8 files changed, 64 insertions(+), 11 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index ec1f2c7b795..8fce6c352ad 100755
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -2999,8 +2999,8 @@ public class IoTDBDescriptor {
long newSize =
storageEngineMemoryManager.getTotalMemorySizeInBytes()
+ consensusMemoryManager.getTotalMemorySizeInBytes();
- consensusMemoryManager.setTotalMemorySizeInBytes(0);
- storageEngineMemoryManager.setTotalMemorySizeInBytesWithReload(newSize);
+ consensusMemoryManager.setTotalAllocatedMemorySizeInBytes(0);
+ storageEngineMemoryManager.setTotalAllocatedMemorySizeInBytesWithReload(newSize);
SystemInfo.getInstance().loadWriteMemory();
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileDataCacheMemoryBlock.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileDataCacheMemoryBlock.java
index 4611df37aa8..6c65949e879 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileDataCacheMemoryBlock.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileDataCacheMemoryBlock.java
@@ -74,7 +74,7 @@ public class LoadTsFileDataCacheMemoryBlock extends LoadTsFileAbstractMemoryBloc
@Override
public synchronized void forceResize(long newSizeInBytes) {
throw new UnsupportedOperationException(
- "setTotalMemorySizeInBytes is not supported for LoadTsFileDataCacheMemoryBlock");
+ "setTotalAllocatedMemorySizeInBytes is not supported for LoadTsFileDataCacheMemoryBlock");
}
@Override
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileMemoryManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileMemoryManager.java
index 095c24a8cc7..42c6846b11b 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileMemoryManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileMemoryManager.java
@@ -117,7 +117,7 @@ public class LoadTsFileMemoryManager {
if (memoryBlock.getMemoryUsageInBytes() > newSizeInBytes) {
LOGGER.error(
- "Load: Failed to setTotalMemorySizeInBytes memory block {} to {} bytes, current memory usage {} bytes",
+ "Load: Failed to setTotalAllocatedMemorySizeInBytes memory block {} to {} bytes, current memory usage {} bytes",
memoryBlock,
newSizeInBytes,
memoryBlock.getMemoryUsageInBytes());
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 0b92e90bc2d..fe376bc6366 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -373,6 +373,7 @@ public class CommonConfig {
// memory management
private boolean enableMemoryTransfer = false;
+ private boolean enableMemoryAdapt = false;
private long memoryCheckIntervalInS = 20;
CommonConfig() {
@@ -1696,4 +1697,12 @@ public class CommonConfig {
public void setEnableMemoryTransfer(boolean enableMemoryTransfer) {
this.enableMemoryTransfer = enableMemoryTransfer;
}
+
+ public boolean isEnableMemoryAdapt() {
+ return enableMemoryAdapt;
+ }
+
+ public void setEnableMemoryAdapt(boolean enableMemoryAdapt) {
+ this.enableMemoryAdapt = enableMemoryAdapt;
+ }
}
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index d787035f064..760935dde93 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -263,6 +263,10 @@ public class CommonDescriptor {
Integer.parseInt(
properties.getProperty(
"memory_check_interval",
String.valueOf(config.getMemoryCheckIntervalInS()))));
+ config.setEnableMemoryAdapt(
+ Boolean.parseBoolean(
+ properties.getProperty(
+ "enable_memory_adapt", Boolean.toString(config.isEnableMemoryAdapt()))));
}
private void loadPipeProps(TrimProperties properties) {
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/memory/MemoryManager.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/memory/MemoryManager.java
index be4210a6226..d577d2e90bc 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/memory/MemoryManager.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/memory/MemoryManager.java
@@ -47,7 +47,7 @@ public class MemoryManager {
private final boolean enable;
/** The total allocate memory size in byte of memory manager */
- private final long totalAllocatedMemorySizeInBytes;
+ private long totalAllocatedMemorySizeInBytes;
/** The total memory size in byte of memory manager */
private long totalMemorySizeInBytes;
@@ -354,7 +354,10 @@ public class MemoryManager {
*/
private void reAllocateMemoryAccordingToRatio(double ratio) {
// first increase the total memory size of this memory manager
+ long beforeTotalMemorySizeInBytes = this.totalMemorySizeInBytes;
this.totalMemorySizeInBytes *= ratio;
+ this.totalAllocatedMemorySizeInBytes +=
+ (this.totalMemorySizeInBytes - beforeTotalMemorySizeInBytes);
// then re-allocate memory for all memory blocks
for (IMemoryBlock block : allocatedMemoryBlocks.values()) {
block.setTotalMemorySizeInBytes((long) (block.getTotalMemorySizeInBytes() * ratio));
@@ -448,16 +451,28 @@ public class MemoryManager {
return totalMemorySizeInBytes;
}
- public void setTotalMemorySizeInBytes(long totalMemorySizeInBytes) {
- this.totalMemorySizeInBytes = totalMemorySizeInBytes;
+ public void setTotalAllocatedMemorySizeInBytes(long totalAllocatedMemorySizeInBytes) {
+ this.totalMemorySizeInBytes +=
+ (totalAllocatedMemorySizeInBytes - this.totalAllocatedMemorySizeInBytes);
+ this.totalAllocatedMemorySizeInBytes = totalAllocatedMemorySizeInBytes;
+ }
+
+ public void setTotalAllocatedMemorySizeInBytesWithReload(long totalAllocatedMemorySizeInBytes) {
+ long beforeTotalMemorySizeInBytes = this.totalMemorySizeInBytes;
+ long afterTotalMemorySizeInBytes =
+ this.totalMemorySizeInBytes
+ + totalAllocatedMemorySizeInBytes
+ - this.totalAllocatedMemorySizeInBytes;
+ reAllocateMemoryAccordingToRatio(
+ (double) afterTotalMemorySizeInBytes / beforeTotalMemorySizeInBytes);
}
public void expandTotalMemorySizeInBytes(long totalMemorySizeInBytes) {
this.totalMemorySizeInBytes += totalMemorySizeInBytes;
}
- public void setTotalMemorySizeInBytesWithReload(long totalMemorySizeInBytes) {
- reAllocateMemoryAccordingToRatio((double) totalMemorySizeInBytes / this.totalMemorySizeInBytes);
+ public long getTotalAllocatedMemorySizeInBytes() {
+ return totalAllocatedMemorySizeInBytes;
}
/** Get available memory size in bytes of memory manager */
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/memory/MemoryRuntimeAgent.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/memory/MemoryRuntimeAgent.java
index 40e5e5aa478..850d7cb94a7 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/memory/MemoryRuntimeAgent.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/memory/MemoryRuntimeAgent.java
@@ -36,8 +36,11 @@ public class MemoryRuntimeAgent implements IService {
private static final Logger LOGGER = LoggerFactory.getLogger(MemoryRuntimeAgent.class);
private static final CommonConfig CONFIG = CommonDescriptor.getInstance().getConfig();
private static final boolean ENABLE_MEMORY_TRANSFER = CONFIG.isEnableMemoryTransfer();
+ private static final boolean ENABLE_MEMORY_ADAPT = CONFIG.isEnableMemoryAdapt();
private static final long MEMORY_CHECK_INTERVAL_IN_S = CONFIG.getMemoryCheckIntervalInS();
- private final AtomicBoolean isShutdown = new AtomicBoolean(false);
+
+ private static final double ratio = 0.05;
+ private static final AtomicBoolean isShutdown = new AtomicBoolean(false);
private static final MemoryPeriodicalJobExecutor memoryPeriodicalJobExecutor =
new MemoryPeriodicalJobExecutor(
@@ -58,10 +61,32 @@ public class MemoryRuntimeAgent implements IService {
MemoryManager.global()::updateAllocate,
MEMORY_CHECK_INTERVAL_IN_S);
}
+ if (ENABLE_MEMORY_ADAPT) {
+ LOGGER.info(
+ "Enable automatic memory adapt with an interval of {} s", MEMORY_CHECK_INTERVAL_IN_S);
+ MemoryRuntimeAgent.getInstance()
+ .registerPeriodicalJob(
+ "MemoryRuntimeAgent#adaptTotalMemory()",
+ this::adaptTotalMemory,
+ MEMORY_CHECK_INTERVAL_IN_S);
+ }
isShutdown.set(false);
}
+ private void adaptTotalMemory() {
+ long totalMemory = Runtime.getRuntime().totalMemory();
+ MemoryManager memoryManager = MemoryManager.global().getMemoryManager("OnHeap");
+ if (memoryManager != null) {
+ long originMemorySize = memoryManager.getTotalAllocatedMemorySizeInBytes();
+ if (totalMemory >= (1 + ratio) * originMemorySize
+ || totalMemory <= (1 - ratio) * originMemorySize) {
+ LOGGER.info("Total memory size changed from {} to {}", originMemorySize, totalMemory);
+ memoryManager.setTotalAllocatedMemorySizeInBytesWithReload(totalMemory);
+ }
+ }
+ }
+
@Override
public void stop() {
if (isShutdown.get()) {
diff --git a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/memory/MemoryManagerTest.java b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/memory/MemoryManagerTest.java
index fa777526120..35cc5006e7b 100644
--- a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/memory/MemoryManagerTest.java
+++ b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/memory/MemoryManagerTest.java
@@ -28,7 +28,7 @@ public class MemoryManagerTest {
@Before
public void reset() {
GLOBAL_MEMORY_MANAGER.clearAll();
- GLOBAL_MEMORY_MANAGER.setTotalMemorySizeInBytes(100);
+ GLOBAL_MEMORY_MANAGER.setTotalAllocatedMemorySizeInBytes(100);
}
@Test