This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 71f765b727c Pipe: add timely consistency check for pipe memory control (#13354)
71f765b727c is described below

commit 71f765b727cbb6cb5e390a21ea263e7cc10caeb0
Author: Zikun Ma <[email protected]>
AuthorDate: Mon Sep 2 18:28:45 2024 +0800

    Pipe: add timely consistency check for pipe memory control (#13354)
---
 .../db/pipe/resource/memory/PipeMemoryManager.java | 27 ++++++++++++++++++++--
 1 file changed, 25 insertions(+), 2 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
index 8c7bc2c8ed2..bf5db9d0bb2 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
@@ -66,7 +66,7 @@ public class PipeMemoryManager {
     PipeDataNodeAgent.runtime()
         .registerPeriodicalJob(
             "PipeMemoryManager#tryExpandAll()",
-            this::tryExpandAll,
+            this::tryExpandAllAndCheckConsistency,
             PipeConfig.getInstance().getPipeMemoryExpanderIntervalSeconds());
   }
 
@@ -290,8 +290,31 @@ public class PipeMemoryManager {
     }
   }
 
-  public synchronized void tryExpandAll() {
+  public synchronized void tryExpandAllAndCheckConsistency() {
     allocatedBlocks.forEach(PipeMemoryBlock::expand);
+
+    long blockSum =
+        allocatedBlocks.stream().mapToLong(PipeMemoryBlock::getMemoryUsageInBytes).sum();
+    if (blockSum != usedMemorySizeInBytes) {
+      LOGGER.warn(
+          "tryExpandAllAndCheckConsistency: memory usage is not consistent with allocated blocks,"
+              + " usedMemorySizeInBytes is {} but sum of all blocks is {}",
+          usedMemorySizeInBytes,
+          blockSum);
+    }
+
+    long tabletBlockSum =
+        allocatedBlocks.stream()
+            .filter(PipeTabletMemoryBlock.class::isInstance)
+            .mapToLong(PipeMemoryBlock::getMemoryUsageInBytes)
+            .sum();
+    if (tabletBlockSum != usedMemorySizeInBytesOfTablets) {
+      LOGGER.warn(
+          "tryExpandAllAndCheckConsistency: memory usage of tablets is not consistent with allocated blocks,"
+              + " usedMemorySizeInBytesOfTablets is {} but sum of all tablet blocks is {}",
+          usedMemorySizeInBytesOfTablets,
+          tabletBlockSum);
+    }
   }
 
   public synchronized void release(PipeMemoryBlock block) {

Reply via email to