This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch cp_iot_1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 862298865aa3242f41c61f7edea83c5d56b9582a
Author: Jiang Tian <[email protected]>
AuthorDate: Thu Aug 14 17:56:46 2025 +0800

    Fix ref count of IoTConsensus request not decreased in allocation failure (#16169)
    
    * fix IoTConsensus memory management
    
    * Fix ref count of IoTConsensus request not decreased in allocation failure
    
    * fix log level
---
 .../logdispatcher/IoTConsensusMemoryManager.java   | 134 +++++++++++++++------
 .../consensus/iot/logdispatcher/LogDispatcher.java |  26 +---
 .../consensus/iot/logdispatcher/SyncStatus.java    |  20 ++-
 .../IoTConsensusMemoryManagerTest.java             | 103 ++++++++++++++++
 4 files changed, 219 insertions(+), 64 deletions(-)

diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IoTConsensusMemoryManager.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IoTConsensusMemoryManager.java
index 3e7893b4c44..75516cb6592 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IoTConsensusMemoryManager.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IoTConsensusMemoryManager.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.consensus.iot.logdispatcher;
 
 import org.apache.iotdb.commons.service.metric.MetricService;
+import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
 
 import org.slf4j.Logger;
@@ -40,61 +41,103 @@ public class IoTConsensusMemoryManager {
     MetricService.getInstance().addMetricSet(new 
IoTConsensusMemoryManagerMetrics(this));
   }
 
-  public boolean reserve(IndexedConsensusRequest request, boolean fromQueue) {
-    synchronized (request) {
-      long prevRef = request.incRef();
-      if (prevRef == 0) {
-        return reserve(request.getMemorySize(), fromQueue);
+
+  public boolean reserve(IndexedConsensusRequest request) {
+    long prevRef = request.incRef();
+    if (prevRef == 0) {
+      boolean reserved = reserve(request.getMemorySize(), true);
+      if (reserved) {
+        if (logger.isDebugEnabled()) {
+          logger.debug(
+              "Reserving {} bytes for request {} succeeds, current total usage 
{}",
+              request.getMemorySize(),
+              request.getSearchIndex(),
+              memorySizeInByte.get());
+        }
       } else {
-        return true;
+        request.decRef();
+        if (logger.isDebugEnabled()) {
+          logger.debug(
+              "Reserving {} bytes for request {} fails, current total usage 
{}",
+              request.getMemorySize(),
+              request.getSearchIndex(),
+              memorySizeInByte.get());
+        }
       }
+      return reserved;
+    } else if (logger.isDebugEnabled()) {
+      logger.debug(
+          "Skip memory reservation for {} because its ref count is not 0",
+          request.getSearchIndex());
+    }
+    return true;
+  }
+
+  public boolean reserve(Batch batch) {
+    boolean reserved = reserve(batch.getMemorySize(), false);
+    if (reserved && logger.isDebugEnabled()) {
+      logger.debug(
+          "Reserving {} bytes for batch {}-{} succeeds, current total usage 
{}",
+          batch.getMemorySize(),
+          batch.getStartIndex(),
+          batch.getEndIndex(),
+          memorySizeInByte.get());
+    } else if (logger.isDebugEnabled()) {
+      logger.debug(
+          "Reserving {} bytes for batch {}-{} fails, current total usage {}",
+          batch.getMemorySize(),
+          batch.getStartIndex(),
+          batch.getEndIndex(),
+          memorySizeInByte.get());
     }
+    return reserved;
   }
 
-  public boolean reserve(long size, boolean fromQueue) {
-    AtomicBoolean result = new AtomicBoolean(false);
-    memorySizeInByte.updateAndGet(
-        memorySize -> {
-          long remainSize =
-              (fromQueue ? maxMemorySizeForQueueInByte : maxMemorySizeInByte) 
- memorySize;
-          if (size > remainSize) {
-            logger.debug(
-                "consensus memory limited. required: {}, used: {}, total: {}",
-                size,
-                memorySize,
-                maxMemorySizeInByte);
-            result.set(false);
-            return memorySize;
-          } else {
-            logger.debug(
-                "{} add {} bytes, total memory size: {} bytes.",
-                Thread.currentThread().getName(),
-                size,
-                memorySize + size);
-            result.set(true);
-            return memorySize + size;
-          }
-        });
-    if (result.get()) {
+  private boolean reserve(long size, boolean fromQueue) {
+    boolean result =
+        memorySizeInByte.addAndGet(size) < maxMemorySizeInByte;
+    if (result) {
       if (fromQueue) {
-        queueMemorySizeInByte.addAndGet(size);
+        result = queueMemorySizeInByte.addAndGet(size) < 
maxMemorySizeForQueueInByte;
+        if (!result) {
+          queueMemorySizeInByte.addAndGet(-size);
+        }
       } else {
         syncMemorySizeInByte.addAndGet(size);
       }
+    } else {
+      memorySizeInByte.addAndGet(-size);
     }
-    return result.get();
+    return result;
   }
 
-  public void free(IndexedConsensusRequest request, boolean fromQueue) {
-    synchronized (request) {
-      long prevRef = request.decRef();
-      if (prevRef == 0) {
-        free(request.getMemorySize(), fromQueue);
+  public void free(IndexedConsensusRequest request) {
+    long prevRef = request.decRef();
+    if (prevRef == 1) {
+      free(request.getMemorySize(), true);
+      if (logger.isDebugEnabled()) {
+        logger.debug(
+            "Freed {} bytes for request {}, current total usage {}",
+            request.getMemorySize(),
+            request.getSearchIndex(),
+            memorySizeInByte.get());
       }
     }
   }
 
-  public void free(long size, boolean fromQueue) {
+  public void free(Batch batch) {
+    free(batch.getMemorySize(), false);
+    if (logger.isDebugEnabled()) {
+      logger.debug(
+          "Freed {} bytes for batch {}-{}, current total usage {}",
+          batch.getMemorySize(),
+          batch.getStartIndex(),
+          batch.getEndIndex(),
+          getMemorySizeInByte());
+    }
+  }
+
+  private void free(long size, boolean fromQueue) {
     long currentUsedMemory = memorySizeInByte.addAndGet(-size);
     if (fromQueue) {
       queueMemorySizeInByte.addAndGet(-size);
@@ -113,6 +156,19 @@ public class IoTConsensusMemoryManager {
     this.maxMemorySizeForQueueInByte = maxMemorySizeForQueue;
   }
 
+  @TestOnly
+  public void reset() {
+    this.memorySizeInByte.set(0);
+    this.queueMemorySizeInByte.set(0);
+    this.syncMemorySizeInByte.set(0);
+  }
+
+
+  @TestOnly
+  public Double getMaxMemoryRatioForQueue() {
+    return 0.06;
+  }
+
   long getMemorySizeInByte() {
     return memorySizeInByte.get();
   }
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
index 7f1f91b1fa0..374691bf38b 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
@@ -283,7 +283,7 @@ public class LogDispatcher {
 
     /** try to offer a request into queue with memory control. */
     public boolean offer(IndexedConsensusRequest indexedConsensusRequest) {
-      if (!iotConsensusMemoryManager.reserve(indexedConsensusRequest, true)) {
+      if (!iotConsensusMemoryManager.reserve(indexedConsensusRequest)) {
         return false;
       }
       boolean success;
@@ -291,19 +291,19 @@ public class LogDispatcher {
         success = pendingEntries.offer(indexedConsensusRequest);
       } catch (Throwable t) {
         // If exception occurs during request offer, the reserved memory 
should be released
-        iotConsensusMemoryManager.free(indexedConsensusRequest, true);
+        iotConsensusMemoryManager.free(indexedConsensusRequest);
         throw t;
       }
       if (!success) {
         // If offer failed, the reserved memory should be released
-        iotConsensusMemoryManager.free(indexedConsensusRequest, true);
+        iotConsensusMemoryManager.free(indexedConsensusRequest);
       }
       return success;
     }
 
     /** try to remove a request from queue with memory control. */
     private void releaseReservedMemory(IndexedConsensusRequest 
indexedConsensusRequest) {
-      iotConsensusMemoryManager.free(indexedConsensusRequest, true);
+      iotConsensusMemoryManager.free(indexedConsensusRequest);
     }
 
     public void stop() {
@@ -323,27 +323,13 @@ public class LogDispatcher {
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
       }
-      long requestSize = 0;
       for (IndexedConsensusRequest indexedConsensusRequest : pendingEntries) {
-        synchronized (indexedConsensusRequest) {
-          long prevRef = indexedConsensusRequest.decRef();
-          if (prevRef == 1) {
-            requestSize += indexedConsensusRequest.getMemorySize();
-          }
-        }
+        iotConsensusMemoryManager.free(indexedConsensusRequest);
       }
       pendingEntries.clear();
-      iotConsensusMemoryManager.free(requestSize, true);
-      requestSize = 0;
       for (IndexedConsensusRequest indexedConsensusRequest : bufferedEntries) {
-        synchronized (indexedConsensusRequest) {
-          long prevRef = indexedConsensusRequest.decRef();
-          if (prevRef == 1) {
-            requestSize += indexedConsensusRequest.getMemorySize();
-          }
-        }
+        iotConsensusMemoryManager.free(indexedConsensusRequest);
       }
-      iotConsensusMemoryManager.free(requestSize, true);
       syncStatus.free();
       MetricService.getInstance().removeMetricSet(logDispatcherThreadMetrics);
     }
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/SyncStatus.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/SyncStatus.java
index c5a426d88b8..accc9f7667d 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/SyncStatus.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/SyncStatus.java
@@ -21,12 +21,16 @@ package org.apache.iotdb.consensus.iot.logdispatcher;
 
 import org.apache.iotdb.consensus.config.IoTConsensusConfig;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 
 public class SyncStatus {
 
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(SyncStatus.class);
   private final IoTConsensusConfig config;
   private final IndexController controller;
   private final LinkedList<Batch> pendingBatches = new LinkedList<>();
@@ -45,10 +49,18 @@ public class SyncStatus {
    */
   public synchronized void addNextBatch(Batch batch) throws 
InterruptedException {
     while ((pendingBatches.size() >= 
config.getReplication().getMaxPendingBatchesNum()
-            || !iotConsensusMemoryManager.reserve(batch.getMemorySize(), 
false))
+            || !iotConsensusMemoryManager.reserve(batch))
         && !Thread.interrupted()) {
       wait();
     }
+    if (LOGGER.isDebugEnabled()) {
+      LOGGER.debug(
+          "Reserved {} bytes for batch {}-{}, current total usage {}",
+          batch.getMemorySize(),
+          batch.getStartIndex(),
+          batch.getEndIndex(),
+          iotConsensusMemoryManager.getMemorySizeInByte());
+    }
     pendingBatches.add(batch);
   }
 
@@ -65,7 +77,7 @@ public class SyncStatus {
       while (current.isSynced()) {
         controller.update(current.getEndIndex(), false);
         iterator.remove();
-        iotConsensusMemoryManager.free(current.getMemorySize(), false);
+        iotConsensusMemoryManager.free(current);
         if (iterator.hasNext()) {
           current = iterator.next();
         } else {
@@ -78,13 +90,11 @@ public class SyncStatus {
   }
 
   public synchronized void free() {
-    long size = 0;
     for (Batch pendingBatch : pendingBatches) {
-      size += pendingBatch.getMemorySize();
+      iotConsensusMemoryManager.free(pendingBatch);
     }
     pendingBatches.clear();
     controller.update(0L, true);
-    iotConsensusMemoryManager.free(size, false);
   }
 
   /** Gets the first index that is not currently synchronized. */
diff --git a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/logdispatcher/IoTConsensusMemoryManagerTest.java b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/logdispatcher/IoTConsensusMemoryManagerTest.java
new file mode 100644
index 00000000000..3d89943772e
--- /dev/null
+++ b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/logdispatcher/IoTConsensusMemoryManagerTest.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.iot.logdispatcher;
+
+import org.apache.iotdb.commons.memory.AtomicLongMemoryBlock;
+import org.apache.iotdb.commons.memory.IMemoryBlock;
+import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
+import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class IoTConsensusMemoryManagerTest {
+
+  private IMemoryBlock previousMemoryBlock;
+  private long memoryBlockSize = 16 * 1024L;
+
+  @Before
+  public void setUp() throws Exception {
+    previousMemoryBlock = 
IoTConsensusMemoryManager.getInstance().getMemoryBlock();
+    IoTConsensusMemoryManager.getInstance()
+        .setMemoryBlock(new AtomicLongMemoryBlock("Test", null, 
memoryBlockSize));
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    
IoTConsensusMemoryManager.getInstance().setMemoryBlock(previousMemoryBlock);
+  }
+
+  @Test
+  public void testSingleReserveAndRelease() {
+    testReserveAndRelease(1);
+  }
+
+  @Test
+  public void testMultiReserveAndRelease() {
+    testReserveAndRelease(3);
+  }
+
+  private void testReserveAndRelease(int numReservation) {
+    int allocationSize = 1;
+    long allocatedSize = 0;
+    List<IndexedConsensusRequest> requestList = new ArrayList<>();
+    while (true) {
+      IndexedConsensusRequest request =
+          new IndexedConsensusRequest(
+              0,
+              Collections.singletonList(
+                  new 
ByteBufferConsensusRequest(ByteBuffer.allocate(allocationSize))));
+      request.buildSerializedRequests();
+      if (allocatedSize + allocationSize
+          <= memoryBlockSize
+              * 
IoTConsensusMemoryManager.getInstance().getMaxMemoryRatioForQueue()) {
+        for (int i = 0; i < numReservation; i++) {
+          assertTrue(IoTConsensusMemoryManager.getInstance().reserve(request));
+          requestList.add(request);
+        }
+      } else {
+        for (int i = 0; i < numReservation; i++) {
+          
assertFalse(IoTConsensusMemoryManager.getInstance().reserve(request));
+        }
+        break;
+      }
+      allocatedSize += allocationSize;
+    }
+
+    assertTrue(
+        IoTConsensusMemoryManager.getInstance().getMemorySizeInByte()
+            <= memoryBlockSize
+                * 
IoTConsensusMemoryManager.getInstance().getMaxMemoryRatioForQueue());
+    for (IndexedConsensusRequest indexedConsensusRequest : requestList) {
+      IoTConsensusMemoryManager.getInstance().free(indexedConsensusRequest);
+    }
+    assertEquals(0, 
IoTConsensusMemoryManager.getInstance().getMemorySizeInByte());
+  }
+}

Reply via email to