This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new 345efb7d6eb Fix memory leak when allocation failure in IoTConsensus 
queue (#16966)
345efb7d6eb is described below

commit 345efb7d6eba48bed188d868ad082d825c777da4
Author: Jiang Tian <[email protected]>
AuthorDate: Mon Dec 29 20:47:49 2025 +0800

    Fix memory leak when allocation failure in IoTConsensus queue (#16966)
    
    * Fix memory leak when allocation failure in IoTConsensus queue.
    
    * spotless
---
 .../logdispatcher/IoTConsensusMemoryManager.java   |  11 ++
 .../IoTConsensusMemoryManagerTest.java             | 207 +++++++++++++++++++++
 2 files changed, 218 insertions(+)

diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IoTConsensusMemoryManager.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IoTConsensusMemoryManager.java
index bb46f20486f..22e5484f5a2 100644
--- 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IoTConsensusMemoryManager.java
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IoTConsensusMemoryManager.java
@@ -98,6 +98,7 @@ public class IoTConsensusMemoryManager {
         result = queueMemorySizeInByte.addAndGet(size) < 
maxMemorySizeForQueueInByte;
         if (!result) {
           queueMemorySizeInByte.addAndGet(-size);
+          memorySizeInByte.addAndGet(-size);
         }
       } else {
         syncMemorySizeInByte.addAndGet(size);
@@ -172,6 +173,16 @@ public class IoTConsensusMemoryManager {
     return syncMemorySizeInByte.get();
   }
 
+  @TestOnly
+  public Long getMaxMemorySizeInByte() {
+    return maxMemorySizeInByte;
+  }
+
+  @TestOnly
+  public Long getMaxMemorySizeForQueueInByte() {
+    return maxMemorySizeForQueueInByte;
+  }
+
   private static final IoTConsensusMemoryManager INSTANCE = new 
IoTConsensusMemoryManager();
 
   public static IoTConsensusMemoryManager getInstance() {
diff --git 
a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/logdispatcher/IoTConsensusMemoryManagerTest.java
 
b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/logdispatcher/IoTConsensusMemoryManagerTest.java
new file mode 100644
index 00000000000..f87d8cd7f98
--- /dev/null
+++ 
b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/logdispatcher/IoTConsensusMemoryManagerTest.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.iot.logdispatcher;
+
+import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
+import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
+import org.apache.iotdb.consensus.config.IoTConsensusConfig;
+import org.apache.iotdb.consensus.iot.thrift.TLogEntry;
+
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class IoTConsensusMemoryManagerTest {
+
+  @Test
+  public void testAllocateQueue() {
+    IoTConsensusMemoryManager memoryManager = 
IoTConsensusMemoryManager.getInstance();
+    long maxMemory = memoryManager.getMaxMemorySizeForQueueInByte();
+
+    long occupiedMemory = 0;
+    IndexedConsensusRequest request;
+    List<IndexedConsensusRequest> requestList = new ArrayList<>();
+    while (occupiedMemory <= maxMemory) {
+      request =
+          new IndexedConsensusRequest(
+              0,
+              Collections.singletonList(
+                  new ByteBufferConsensusRequest(ByteBuffer.wrap(new byte[4 * 
1024 * 1024]))));
+      request.buildSerializedRequests();
+      long requestSize = request.getMemorySize();
+      if (occupiedMemory + requestSize < maxMemory) {
+        boolean reserved = memoryManager.reserve(request);
+        assertTrue(reserved);
+        occupiedMemory += requestSize;
+        assertEquals(occupiedMemory, memoryManager.getQueueMemorySizeInByte());
+        assertEquals(occupiedMemory, memoryManager.getMemorySizeInByte());
+        requestList.add(request);
+      } else {
+        assertFalse(memoryManager.reserve(request));
+        break;
+      }
+    }
+    assertTrue(memoryManager.getMemorySizeInByte() <= maxMemory);
+
+    for (IndexedConsensusRequest indexedConsensusRequest : requestList) {
+      memoryManager.free(indexedConsensusRequest);
+      occupiedMemory -= indexedConsensusRequest.getMemorySize();
+      assertEquals(occupiedMemory, memoryManager.getMemorySizeInByte());
+      assertEquals(occupiedMemory, memoryManager.getQueueMemorySizeInByte());
+    }
+  }
+
+  @Test
+  public void testAllocateBatch() {
+    IoTConsensusMemoryManager memoryManager = 
IoTConsensusMemoryManager.getInstance();
+    long maxMemory = memoryManager.getQueueMemorySizeInByte();
+
+    long occupiedMemory = 0;
+
+    Batch batch;
+    int batchSize = 5;
+    List<Batch> batchList = new ArrayList<>();
+    while (occupiedMemory < maxMemory) {
+      batch = new Batch(IoTConsensusConfig.newBuilder().build());
+      for (int i = 0; i < batchSize; i++) {
+        IndexedConsensusRequest request;
+        request =
+            new IndexedConsensusRequest(
+                0,
+                Collections.singletonList(
+                    new ByteBufferConsensusRequest(ByteBuffer.wrap(new 
byte[1024 * 1024]))));
+        batch.addTLogEntry(
+            new TLogEntry(
+                request.getSerializedRequests(),
+                request.getSearchIndex(),
+                false,
+                request.getMemorySize()));
+      }
+
+      long requestSize = batch.getMemorySize();
+      if (occupiedMemory + requestSize < maxMemory) {
+        assertTrue(memoryManager.reserve(batch));
+        occupiedMemory += requestSize;
+        assertEquals(occupiedMemory, memoryManager.getMemorySizeInByte());
+        batchList.add(batch);
+      } else {
+        assertFalse(memoryManager.reserve(batch));
+      }
+    }
+    assertTrue(memoryManager.getMemorySizeInByte() <= maxMemory);
+
+    for (Batch b : batchList) {
+      memoryManager.free(b);
+      occupiedMemory -= b.getMemorySize();
+      assertEquals(occupiedMemory, memoryManager.getMemorySizeInByte());
+    }
+  }
+
+  @Test
+  public void testAllocateMixed() {
+    IoTConsensusMemoryManager memoryManager = 
IoTConsensusMemoryManager.getInstance();
+    long maxMemory = memoryManager.getMaxMemorySizeForQueueInByte();
+
+    long occupiedMemory = 0;
+    IndexedConsensusRequest request;
+    List<IndexedConsensusRequest> requestList = new ArrayList<>();
+    Batch batch;
+    int batchSize = 5;
+    List<Batch> batchList = new ArrayList<>();
+
+    int i = 0;
+    while (occupiedMemory <= maxMemory) {
+      if (i % 2 == 0) {
+        request =
+            new IndexedConsensusRequest(
+                0,
+                Collections.singletonList(
+                    new ByteBufferConsensusRequest(ByteBuffer.wrap(new byte[4 
* 1024 * 1024]))));
+        request.buildSerializedRequests();
+        long requestSize = request.getMemorySize();
+        if (occupiedMemory + requestSize < maxMemory) {
+          boolean reserved = memoryManager.reserve(request);
+          assertTrue(reserved);
+          occupiedMemory += requestSize;
+          assertEquals(occupiedMemory, memoryManager.getMemorySizeInByte());
+          requestList.add(request);
+        } else {
+          assertFalse(memoryManager.reserve(request));
+          break;
+        }
+      } else {
+        batch = new Batch(IoTConsensusConfig.newBuilder().build());
+        for (int j = 0; j < batchSize; j++) {
+          IndexedConsensusRequest batchRequest;
+          batchRequest =
+              new IndexedConsensusRequest(
+                  0,
+                  Collections.singletonList(
+                      new ByteBufferConsensusRequest(ByteBuffer.wrap(new 
byte[1024 * 1024]))));
+          batch.addTLogEntry(
+              new TLogEntry(
+                  batchRequest.getSerializedRequests(),
+                  batchRequest.getSearchIndex(),
+                  false,
+                  batchRequest.getMemorySize()));
+        }
+
+        long requestSize = batch.getMemorySize();
+        if (occupiedMemory + requestSize < maxMemory) {
+          assertTrue(memoryManager.reserve(batch));
+          occupiedMemory += requestSize;
+          assertEquals(occupiedMemory, memoryManager.getMemorySizeInByte());
+          batchList.add(batch);
+        } else {
+          assertFalse(memoryManager.reserve(batch));
+        }
+      }
+      i++;
+    }
+    assertTrue(memoryManager.getMemorySizeInByte() <= maxMemory);
+
+    while (!requestList.isEmpty() || !batchList.isEmpty()) {
+      if (!requestList.isEmpty()) {
+        request = requestList.remove(0);
+        memoryManager.free(request);
+        occupiedMemory -= request.getMemorySize();
+        assertEquals(occupiedMemory, memoryManager.getMemorySizeInByte());
+        i--;
+      }
+      if (!batchList.isEmpty()) {
+        batch = batchList.remove(0);
+        memoryManager.free(batch);
+        occupiedMemory -= batch.getMemorySize();
+        assertEquals(occupiedMemory, memoryManager.getMemorySizeInByte());
+        i--;
+      }
+    }
+    assertEquals(0, i);
+    assertEquals(0, memoryManager.getMemorySizeInByte());
+    assertEquals(0, memoryManager.getQueueMemorySizeInByte());
+  }
+}

Reply via email to