This is an automated email from the ASF dual-hosted git repository.

jt2594838 pushed a commit to branch add_rpc_memory_control
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 9501c61631f59caeb3a4f30209cbe82ef27ae02b
Author: Tian Jiang <[email protected]>
AuthorDate: Thu Jun 11 18:08:04 2026 +0800

    fix review
---
 .../rpc/TCompressedElasticFramedTransport.java     | 12 +++++
 .../apache/iotdb/rpc/TElasticFramedTransport.java  | 16 ++++++-
 .../apache/iotdb/rpc/AutoResizingBufferTest.java   | 54 ++++++++++++++++++++++
 .../iotdb/confignode/service/ConfigNode.java       |  2 +
 .../apache/iotdb/consensus/ratis/utils/Utils.java  | 11 +++--
 5 files changed, 91 insertions(+), 4 deletions(-)

diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TCompressedElasticFramedTransport.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TCompressedElasticFramedTransport.java
index 3f423771dda..88c9683db97 100644
--- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TCompressedElasticFramedTransport.java
+++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TCompressedElasticFramedTransport.java
@@ -41,10 +41,22 @@ public abstract class TCompressedElasticFramedTransport extends TElasticFramedTr
       writeCompressBuffer = new AutoScalingBufferWriteTransport(thriftDefaultBufferSize);
       readCompressBuffer = new AutoScalingBufferReadTransport(thriftDefaultBufferSize);
     } catch (IOException e) {
+      closeAllocatedBuffers();
       throw new TTransportException(e);
     }
   }
 
+  @Override
+  protected void closeAllocatedBuffers() {
+    super.closeAllocatedBuffers();
+    if (writeCompressBuffer != null) {
+      writeCompressBuffer.close();
+    }
+    if (readCompressBuffer != null) {
+      readCompressBuffer.close();
+    }
+  }
+
   @Override
   protected void readFrame() throws TTransportException {
     underlying.readAll(i32buf, 0, 4);
diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TElasticFramedTransport.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TElasticFramedTransport.java
index f3208b07234..db3d2cc7d95 100644
--- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TElasticFramedTransport.java
+++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TElasticFramedTransport.java
@@ -95,6 +95,7 @@ public class TElasticFramedTransport extends TTransport {
       readBuffer = new AutoScalingBufferReadTransport(thriftDefaultBufferSize);
       writeBuffer = new AutoScalingBufferWriteTransport(thriftDefaultBufferSize);
     } catch (IOException e) {
+      closeAllocatedBuffers();
       throw new TTransportException(e);
     }
   }
@@ -121,7 +122,20 @@ public class TElasticFramedTransport extends TTransport {
 
   @Override
   public void close() {
-    underlying.close();
+    try {
+      underlying.close();
+    } finally {
+      closeAllocatedBuffers();
+    }
+  }
+
+  protected void closeAllocatedBuffers() {
+    if (readBuffer != null) {
+      readBuffer.close();
+    }
+    if (writeBuffer != null) {
+      writeBuffer.close();
+    }
   }
 
   @Override
diff --git a/iotdb-client/service-rpc/src/test/java/org/apache/iotdb/rpc/AutoResizingBufferTest.java b/iotdb-client/service-rpc/src/test/java/org/apache/iotdb/rpc/AutoResizingBufferTest.java
index f862a76733c..802b9669390 100644
--- a/iotdb-client/service-rpc/src/test/java/org/apache/iotdb/rpc/AutoResizingBufferTest.java
+++ b/iotdb-client/service-rpc/src/test/java/org/apache/iotdb/rpc/AutoResizingBufferTest.java
@@ -19,6 +19,8 @@
 
 package org.apache.iotdb.rpc;
 
+import org.apache.thrift.transport.TMemoryBuffer;
+import org.apache.thrift.transport.TTransportException;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Test;
@@ -77,6 +79,58 @@ public class AutoResizingBufferTest {
     }
   }
 
+  @Test
+  public void testElasticFramedTransportReleasesMemoryWhenClosed() throws Exception {
+    AutoResizingBufferMemoryManager.setMemoryControl(memoryControl);
+
+    TElasticFramedTransport transport =
+        new TElasticFramedTransport(new TMemoryBuffer(0), 100, 1024, true);
+    Assert.assertEquals(200, memoryControl.getUsedMemoryInBytes());
+
+    transport.close();
+    Assert.assertEquals(0, memoryControl.getUsedMemoryInBytes());
+  }
+
+  @Test
+  public void testElasticFramedTransportReleasesMemoryWhenConstructorFails() {
+    memoryControl.setLimit(150);
+    AutoResizingBufferMemoryManager.setMemoryControl(memoryControl);
+    AutoResizingBufferMemoryManager.setMemoryAllocateRetryIntervalInMs(1);
+
+    try {
+      new TElasticFramedTransport(new TMemoryBuffer(0), 100, 1024, true);
+      Assert.fail("Expected TTransportException");
+    } catch (TTransportException e) {
+      Assert.assertEquals(0, memoryControl.getUsedMemoryInBytes());
+    }
+  }
+
+  @Test
+  public void testSnappyElasticFramedTransportReleasesMemory() throws Exception {
+    AutoResizingBufferMemoryManager.setMemoryControl(memoryControl);
+
+    TSnappyElasticFramedTransport transport =
+        new TSnappyElasticFramedTransport(new TMemoryBuffer(0), 100, 1024, true);
+    Assert.assertEquals(400, memoryControl.getUsedMemoryInBytes());
+
+    transport.close();
+    Assert.assertEquals(0, memoryControl.getUsedMemoryInBytes());
+  }
+
+  @Test
+  public void testSnappyElasticFramedTransportReleasesMemoryWhenConstructorFails() {
+    memoryControl.setLimit(350);
+    AutoResizingBufferMemoryManager.setMemoryControl(memoryControl);
+    AutoResizingBufferMemoryManager.setMemoryAllocateRetryIntervalInMs(1);
+
+    try {
+      new TSnappyElasticFramedTransport(new TMemoryBuffer(0), 100, 1024, true);
+      Assert.fail("Expected TTransportException");
+    } catch (TTransportException e) {
+      Assert.assertEquals(0, memoryControl.getUsedMemoryInBytes());
+    }
+  }
+
   private void setLastShrinkTime(AutoResizingBuffer buffer, long lastShrinkTime) throws Exception {
     Field field = AutoResizingBuffer.class.getDeclaredField("lastShrinkTime");
     field.setAccessible(true);
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
index 037f138286a..1d3c5128599 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
@@ -34,6 +34,7 @@ import org.apache.iotdb.commons.exception.ConfigurationException;
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.exception.IoTDBException;
 import org.apache.iotdb.commons.exception.StartupException;
+import org.apache.iotdb.commons.memory.MemoryConfig;
 import org.apache.iotdb.commons.service.JMXService;
 import org.apache.iotdb.commons.service.RegisterManager;
 import org.apache.iotdb.commons.service.ServiceType;
@@ -142,6 +143,7 @@ public class ConfigNode extends ServerCommandLine implements ConfigNodeMBean {
       LOGGER.info(ConfigNodeMessages.STARTING_IOTDB, IoTDBConstant.VERSION_WITH_BUILD);
       ConfigNodeStartupCheck checks = new ConfigNodeStartupCheck(IoTDBConstant.CN_ROLE);
       checks.startUpCheck();
+      MemoryConfig.getInstance();
     } catch (StartupException | ConfigurationException | IOException e) {
       LOGGER.error(ConfigNodeMessages.MEET_ERROR_WHEN_DOING_START_CHECKING, e);
       throw new IoTDBException(ConfigNodeMessages.ERROR_STARTING, -1);
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/utils/Utils.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/utils/Utils.java
index 8244cdf1bc5..7a832b53067 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/utils/Utils.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/utils/Utils.java
@@ -67,6 +67,7 @@ import java.nio.file.AccessDeniedException;
 import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.security.KeyStore;
+import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
@@ -192,9 +193,13 @@ public class Utils {
   public static ByteBuffer serializeTSStatus(TSStatus status) throws TException {
     AutoScalingBufferWriteTransport byteBuffer =
         createAutoScalingBufferWriteTransport(TEMP_BUFFER_SIZE);
-    TCompactProtocol protocol = new TCompactProtocol(byteBuffer);
-    status.write(protocol);
-    return ByteBuffer.wrap(byteBuffer.getBuffer());
+    try {
+      TCompactProtocol protocol = new TCompactProtocol(byteBuffer);
+      status.write(protocol);
+      return ByteBuffer.wrap(Arrays.copyOf(byteBuffer.getBuffer(), byteBuffer.getPos()));
+    } finally {
+      byteBuffer.close();
+    }
   }
 
   private static AutoScalingBufferWriteTransport createAutoScalingBufferWriteTransport(

Reply via email to