This is an automated email from the ASF dual-hosted git repository. jt2594838 pushed a commit to branch add_rpc_memory_control in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 9501c61631f59caeb3a4f30209cbe82ef27ae02b Author: Tian Jiang <[email protected]> AuthorDate: Thu Jun 11 18:08:04 2026 +0800 fix review --- .../rpc/TCompressedElasticFramedTransport.java | 12 +++++ .../apache/iotdb/rpc/TElasticFramedTransport.java | 16 ++++++- .../apache/iotdb/rpc/AutoResizingBufferTest.java | 54 ++++++++++++++++++++++ .../iotdb/confignode/service/ConfigNode.java | 2 + .../apache/iotdb/consensus/ratis/utils/Utils.java | 11 +++-- 5 files changed, 91 insertions(+), 4 deletions(-) diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TCompressedElasticFramedTransport.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TCompressedElasticFramedTransport.java index 3f423771dda..88c9683db97 100644 --- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TCompressedElasticFramedTransport.java +++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TCompressedElasticFramedTransport.java @@ -41,10 +41,22 @@ public abstract class TCompressedElasticFramedTransport extends TElasticFramedTr writeCompressBuffer = new AutoScalingBufferWriteTransport(thriftDefaultBufferSize); readCompressBuffer = new AutoScalingBufferReadTransport(thriftDefaultBufferSize); } catch (IOException e) { + closeAllocatedBuffers(); throw new TTransportException(e); } } + @Override + protected void closeAllocatedBuffers() { + super.closeAllocatedBuffers(); + if (writeCompressBuffer != null) { + writeCompressBuffer.close(); + } + if (readCompressBuffer != null) { + readCompressBuffer.close(); + } + } + @Override protected void readFrame() throws TTransportException { underlying.readAll(i32buf, 0, 4); diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TElasticFramedTransport.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TElasticFramedTransport.java index f3208b07234..db3d2cc7d95 100644 --- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TElasticFramedTransport.java +++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TElasticFramedTransport.java @@ -95,6 +95,7 @@ public class TElasticFramedTransport extends TTransport { readBuffer = new AutoScalingBufferReadTransport(thriftDefaultBufferSize); writeBuffer = new AutoScalingBufferWriteTransport(thriftDefaultBufferSize); } catch (IOException e) { + closeAllocatedBuffers(); throw new TTransportException(e); } } @@ -121,7 +122,20 @@ public class TElasticFramedTransport extends TTransport { @Override public void close() { - underlying.close(); + try { + underlying.close(); + } finally { + closeAllocatedBuffers(); + } + } + + protected void closeAllocatedBuffers() { + if (readBuffer != null) { + readBuffer.close(); + } + if (writeBuffer != null) { + writeBuffer.close(); + } } @Override diff --git a/iotdb-client/service-rpc/src/test/java/org/apache/iotdb/rpc/AutoResizingBufferTest.java b/iotdb-client/service-rpc/src/test/java/org/apache/iotdb/rpc/AutoResizingBufferTest.java index f862a76733c..802b9669390 100644 --- a/iotdb-client/service-rpc/src/test/java/org/apache/iotdb/rpc/AutoResizingBufferTest.java +++ b/iotdb-client/service-rpc/src/test/java/org/apache/iotdb/rpc/AutoResizingBufferTest.java @@ -19,6 +19,8 @@ package org.apache.iotdb.rpc; +import org.apache.thrift.transport.TMemoryBuffer; +import org.apache.thrift.transport.TTransportException; import org.junit.After; import org.junit.Assert; import org.junit.Test; @@ -77,6 +79,58 @@ public class AutoResizingBufferTest { } } + @Test + public void testElasticFramedTransportReleasesMemoryWhenClosed() throws Exception { + AutoResizingBufferMemoryManager.setMemoryControl(memoryControl); + + TElasticFramedTransport transport = + new TElasticFramedTransport(new TMemoryBuffer(0), 100, 1024, true); + Assert.assertEquals(200, memoryControl.getUsedMemoryInBytes()); + + transport.close(); + Assert.assertEquals(0, memoryControl.getUsedMemoryInBytes()); + } + + @Test + public void testElasticFramedTransportReleasesMemoryWhenConstructorFails() { + memoryControl.setLimit(150); + AutoResizingBufferMemoryManager.setMemoryControl(memoryControl); + AutoResizingBufferMemoryManager.setMemoryAllocateRetryIntervalInMs(1); + + try { + new TElasticFramedTransport(new TMemoryBuffer(0), 100, 1024, true); + Assert.fail("Expected TTransportException"); + } catch (TTransportException e) { + Assert.assertEquals(0, memoryControl.getUsedMemoryInBytes()); + } + } + + @Test + public void testSnappyElasticFramedTransportReleasesMemory() throws Exception { + AutoResizingBufferMemoryManager.setMemoryControl(memoryControl); + + TSnappyElasticFramedTransport transport = + new TSnappyElasticFramedTransport(new TMemoryBuffer(0), 100, 1024, true); + Assert.assertEquals(400, memoryControl.getUsedMemoryInBytes()); + + transport.close(); + Assert.assertEquals(0, memoryControl.getUsedMemoryInBytes()); + } + + @Test + public void testSnappyElasticFramedTransportReleasesMemoryWhenConstructorFails() { + memoryControl.setLimit(350); + AutoResizingBufferMemoryManager.setMemoryControl(memoryControl); + AutoResizingBufferMemoryManager.setMemoryAllocateRetryIntervalInMs(1); + + try { + new TSnappyElasticFramedTransport(new TMemoryBuffer(0), 100, 1024, true); + Assert.fail("Expected TTransportException"); + } catch (TTransportException e) { + Assert.assertEquals(0, memoryControl.getUsedMemoryInBytes()); + } + } + private void setLastShrinkTime(AutoResizingBuffer buffer, long lastShrinkTime) throws Exception { Field field = AutoResizingBuffer.class.getDeclaredField("lastShrinkTime"); field.setAccessible(true); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java index 037f138286a..1d3c5128599 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java @@ -34,6 +34,7 @@ import org.apache.iotdb.commons.exception.ConfigurationException; import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.commons.exception.IoTDBException; import org.apache.iotdb.commons.exception.StartupException; +import org.apache.iotdb.commons.memory.MemoryConfig; import org.apache.iotdb.commons.service.JMXService; import org.apache.iotdb.commons.service.RegisterManager; import org.apache.iotdb.commons.service.ServiceType; @@ -142,6 +143,7 @@ public class ConfigNode extends ServerCommandLine implements ConfigNodeMBean { LOGGER.info(ConfigNodeMessages.STARTING_IOTDB, IoTDBConstant.VERSION_WITH_BUILD); ConfigNodeStartupCheck checks = new ConfigNodeStartupCheck(IoTDBConstant.CN_ROLE); checks.startUpCheck(); + MemoryConfig.getInstance(); } catch (StartupException | ConfigurationException | IOException e) { LOGGER.error(ConfigNodeMessages.MEET_ERROR_WHEN_DOING_START_CHECKING, e); throw new IoTDBException(ConfigNodeMessages.ERROR_STARTING, -1); diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/utils/Utils.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/utils/Utils.java index 8244cdf1bc5..7a832b53067 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/utils/Utils.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/utils/Utils.java @@ -67,6 +67,7 @@ import java.nio.file.AccessDeniedException; import java.nio.file.Files; import java.nio.file.Paths; import java.security.KeyStore; +import java.util.Arrays; import java.util.List; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -192,9 +193,13 @@ public class Utils { public static ByteBuffer serializeTSStatus(TSStatus status) throws TException { AutoScalingBufferWriteTransport byteBuffer = createAutoScalingBufferWriteTransport(TEMP_BUFFER_SIZE); - TCompactProtocol protocol = new TCompactProtocol(byteBuffer); - status.write(protocol); - return ByteBuffer.wrap(byteBuffer.getBuffer()); + try { + TCompactProtocol protocol = new TCompactProtocol(byteBuffer); + status.write(protocol); + return ByteBuffer.wrap(Arrays.copyOf(byteBuffer.getBuffer(), byteBuffer.getPos())); + } finally { + byteBuffer.close(); + } } private static AutoScalingBufferWriteTransport createAutoScalingBufferWriteTransport(
