This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 71f765b727c Pipe: add timely consistency check for pipe memory control
(#13354)
71f765b727c is described below
commit 71f765b727cbb6cb5e390a21ea263e7cc10caeb0
Author: Zikun Ma <[email protected]>
AuthorDate: Mon Sep 2 18:28:45 2024 +0800
Pipe: add timely consistency check for pipe memory control (#13354)
---
.../db/pipe/resource/memory/PipeMemoryManager.java | 27 ++++++++++++++++++++--
1 file changed, 25 insertions(+), 2 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
index 8c7bc2c8ed2..bf5db9d0bb2 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
@@ -66,7 +66,7 @@ public class PipeMemoryManager {
PipeDataNodeAgent.runtime()
.registerPeriodicalJob(
"PipeMemoryManager#tryExpandAll()",
- this::tryExpandAll,
+ this::tryExpandAllAndCheckConsistency,
PipeConfig.getInstance().getPipeMemoryExpanderIntervalSeconds());
}
@@ -290,8 +290,31 @@ public class PipeMemoryManager {
}
}
- public synchronized void tryExpandAll() {
+ public synchronized void tryExpandAllAndCheckConsistency() {
allocatedBlocks.forEach(PipeMemoryBlock::expand);
+
+ long blockSum =
+
allocatedBlocks.stream().mapToLong(PipeMemoryBlock::getMemoryUsageInBytes).sum();
+ if (blockSum != usedMemorySizeInBytes) {
+ LOGGER.warn(
+ "tryExpandAllAndCheckConsistency: memory usage is not consistent
with allocated blocks,"
+ + " usedMemorySizeInBytes is {} but sum of all blocks is {}",
+ usedMemorySizeInBytes,
+ blockSum);
+ }
+
+ long tabletBlockSum =
+ allocatedBlocks.stream()
+ .filter(PipeTabletMemoryBlock.class::isInstance)
+ .mapToLong(PipeMemoryBlock::getMemoryUsageInBytes)
+ .sum();
+ if (tabletBlockSum != usedMemorySizeInBytesOfTablets) {
+ LOGGER.warn(
+ "tryExpandAllAndCheckConsistency: memory usage of tablets is not
consistent with allocated blocks,"
+ + " usedMemorySizeInBytesOfTablets is {} but sum of all tablet
blocks is {}",
+ usedMemorySizeInBytesOfTablets,
+ tabletBlockSum);
+ }
}
public synchronized void release(PipeMemoryBlock block) {