This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch fix_iot_memory_2
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit d1f6557132d9dd80a40c9470357e0e2c9644afb8
Author: Tian Jiang <[email protected]>
AuthorDate: Wed Aug 13 12:25:12 2025 +0800

    fix IoTConsensus memory management
---
 .../logdispatcher/IoTConsensusMemoryManager.java   | 64 ++++++++++++++++++++--
 .../consensus/iot/logdispatcher/LogDispatcher.java | 22 ++------
 .../consensus/iot/logdispatcher/SyncStatus.java    | 18 ++++--
 3 files changed, 77 insertions(+), 27 deletions(-)

diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IoTConsensusMemoryManager.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IoTConsensusMemoryManager.java
index f993ea386da..9a9d38946fe 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IoTConsensusMemoryManager.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IoTConsensusMemoryManager.java
@@ -41,16 +41,53 @@ public class IoTConsensusMemoryManager {
     MetricService.getInstance().addMetricSet(new IoTConsensusMemoryManagerMetrics(this));
   }
 
-  public boolean reserve(IndexedConsensusRequest request, boolean fromQueue) {
+  public boolean reserve(IndexedConsensusRequest request) {
     long prevRef = request.incRef();
     if (prevRef == 0) {
-      return reserve(request.getMemorySize(), fromQueue);
+      boolean reserved = reserve(request.getMemorySize(), true);
+      if (reserved) {
+        logger.info(
+            "Reserving {} bytes for request {} succeeds, current total usage {}",
+            request.getMemorySize(),
+            request.getSearchIndex(),
+            memoryBlock.getUsedMemoryInBytes());
+      } else {
+        logger.info(
+            "Reserving {} bytes for request {} fails, current total usage {}",
+            request.getMemorySize(),
+            request.getSearchIndex(),
+            memoryBlock.getUsedMemoryInBytes());
+      }
+      return reserved;
     } else {
+      logger.info(
+          "Skip memory reservation for {} because its ref count is not 0",
+          request.getSearchIndex());
       return true;
     }
   }
 
-  public boolean reserve(long size, boolean fromQueue) {
+  public boolean reserve(Batch batch) {
+    boolean reserved = reserve(batch.getMemorySize(), false);
+    if (reserved) {
+      logger.info(
+          "Reserving {} bytes for batch {}-{} succeeds, current total usage {}",
+          batch.getMemorySize(),
+          batch.getStartIndex(),
+          batch.getEndIndex(),
+          memoryBlock.getUsedMemoryInBytes());
+    } else {
+      logger.info(
+          "Reserving {} bytes for batch {}-{} fails, current total usage {}",
+          batch.getMemorySize(),
+          batch.getStartIndex(),
+          batch.getEndIndex(),
+          memoryBlock.getUsedMemoryInBytes());
+    }
+    return reserved;
+  }
+
+  private boolean reserve(long size, boolean fromQueue) {
     boolean result =
         fromQueue
             ? memoryBlock.allocateIfSufficient(size, maxMemoryRatioForQueue)
@@ -65,14 +102,29 @@ public class IoTConsensusMemoryManager {
     return result;
   }
 
-  public void free(IndexedConsensusRequest request, boolean fromQueue) {
+  public void free(IndexedConsensusRequest request) {
     long prevRef = request.decRef();
     if (prevRef == 1) {
-      free(request.getMemorySize(), fromQueue);
+      free(request.getMemorySize(), true);
+      logger.info(
+          "Freed {} bytes for request {}, current total usage {}",
+          request.getMemorySize(),
+          request.getSearchIndex(),
+          memoryBlock.getUsedMemoryInBytes());
     }
   }
 
-  public void free(long size, boolean fromQueue) {
+  public void free(Batch batch) {
+    free(batch.getMemorySize(), false);
+    logger.info(
+        "Freed {} bytes for batch {}-{}, current total usage {}",
+        batch.getMemorySize(),
+        batch.getStartIndex(),
+        batch.getEndIndex(),
+        getMemorySizeInByte());
+  }
+
+  private void free(long size, boolean fromQueue) {
     long currentUsedMemory = memoryBlock.release(size);
     if (fromQueue) {
       queueMemorySizeInByte.addAndGet(-size);
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
index 00cd6d1376f..374691bf38b 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
@@ -283,7 +283,7 @@ public class LogDispatcher {
 
     /** try to offer a request into queue with memory control. */
     public boolean offer(IndexedConsensusRequest indexedConsensusRequest) {
-      if (!iotConsensusMemoryManager.reserve(indexedConsensusRequest, true)) {
+      if (!iotConsensusMemoryManager.reserve(indexedConsensusRequest)) {
         return false;
       }
       boolean success;
@@ -291,19 +291,19 @@ public class LogDispatcher {
         success = pendingEntries.offer(indexedConsensusRequest);
       } catch (Throwable t) {
         // If exception occurs during request offer, the reserved memory should be released
-        iotConsensusMemoryManager.free(indexedConsensusRequest, true);
+        iotConsensusMemoryManager.free(indexedConsensusRequest);
         throw t;
       }
       if (!success) {
         // If offer failed, the reserved memory should be released
-        iotConsensusMemoryManager.free(indexedConsensusRequest, true);
+        iotConsensusMemoryManager.free(indexedConsensusRequest);
       }
       return success;
     }
 
     /** try to remove a request from queue with memory control. */
     private void releaseReservedMemory(IndexedConsensusRequest indexedConsensusRequest) {
-      iotConsensusMemoryManager.free(indexedConsensusRequest, true);
+      iotConsensusMemoryManager.free(indexedConsensusRequest);
     }
 
     public void stop() {
@@ -323,23 +323,13 @@ public class LogDispatcher {
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
       }
-      long requestSize = 0;
       for (IndexedConsensusRequest indexedConsensusRequest : pendingEntries) {
-        long prevRef = indexedConsensusRequest.decRef();
-        if (prevRef == 1) {
-          requestSize += indexedConsensusRequest.getMemorySize();
-        }
+        iotConsensusMemoryManager.free(indexedConsensusRequest);
       }
       pendingEntries.clear();
-      iotConsensusMemoryManager.free(requestSize, true);
-      requestSize = 0;
       for (IndexedConsensusRequest indexedConsensusRequest : bufferedEntries) {
-        long prevRef = indexedConsensusRequest.decRef();
-        if (prevRef == 1) {
-          requestSize += indexedConsensusRequest.getMemorySize();
-        }
+        iotConsensusMemoryManager.free(indexedConsensusRequest);
       }
-      iotConsensusMemoryManager.free(requestSize, true);
       syncStatus.free();
       MetricService.getInstance().removeMetricSet(logDispatcherThreadMetrics);
     }
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/SyncStatus.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/SyncStatus.java
index c5a426d88b8..3d547c6ad90 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/SyncStatus.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/SyncStatus.java
@@ -21,12 +21,16 @@ package org.apache.iotdb.consensus.iot.logdispatcher;
 
 import org.apache.iotdb.consensus.config.IoTConsensusConfig;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 
 public class SyncStatus {
 
+  private static final Logger LOGGER = LoggerFactory.getLogger(SyncStatus.class);
   private final IoTConsensusConfig config;
   private final IndexController controller;
   private final LinkedList<Batch> pendingBatches = new LinkedList<>();
@@ -45,10 +49,16 @@ public class SyncStatus {
    */
   public synchronized void addNextBatch(Batch batch) throws InterruptedException {
     while ((pendingBatches.size() >= config.getReplication().getMaxPendingBatchesNum()
-            || !iotConsensusMemoryManager.reserve(batch.getMemorySize(), false))
+            || !iotConsensusMemoryManager.reserve(batch))
         && !Thread.interrupted()) {
       wait();
     }
+    LOGGER.info(
+        "Reserved {} bytes for batch {}-{}, current total usage {}",
+        batch.getMemorySize(),
+        batch.getStartIndex(),
+        batch.getEndIndex(),
+        iotConsensusMemoryManager.getMemorySizeInByte());
     pendingBatches.add(batch);
   }
 
@@ -65,7 +75,7 @@ public class SyncStatus {
       while (current.isSynced()) {
         controller.update(current.getEndIndex(), false);
         iterator.remove();
-        iotConsensusMemoryManager.free(current.getMemorySize(), false);
+        iotConsensusMemoryManager.free(current);
         if (iterator.hasNext()) {
           current = iterator.next();
         } else {
@@ -78,13 +88,11 @@ public class SyncStatus {
   }
 
   public synchronized void free() {
-    long size = 0;
     for (Batch pendingBatch : pendingBatches) {
-      size += pendingBatch.getMemorySize();
+      iotConsensusMemoryManager.free(pendingBatch);
     }
     pendingBatches.clear();
     controller.update(0L, true);
-    iotConsensusMemoryManager.free(size, false);
   }
 
   /** Gets the first index that is not currently synchronized. */

Reply via email to