This is an automated email from the ASF dual-hosted git repository. jt2594838 pushed a commit to branch add_rpc_memory_control in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 0ec40b2af8c268bec13db7fb1950724df853bb8c Author: Tian Jiang <[email protected]> AuthorDate: Wed Jun 10 17:54:13 2026 +0800 ver2 --- .../en/org/apache/iotdb/rpc/i18n/RpcMessages.java | 6 ++ .../zh/org/apache/iotdb/rpc/i18n/RpcMessages.java | 6 ++ .../iotdb/rpc/AutoResizingBufferMemoryManager.java | 95 +++++++++++++++++ .../apache/iotdb/rpc/AutoResizingBufferTest.java | 118 +++++++++++++++++++++ 4 files changed, 225 insertions(+) diff --git a/iotdb-client/service-rpc/src/main/i18n/en/org/apache/iotdb/rpc/i18n/RpcMessages.java b/iotdb-client/service-rpc/src/main/i18n/en/org/apache/iotdb/rpc/i18n/RpcMessages.java index 0e545164e8d..5c89472be5b 100644 --- a/iotdb-client/service-rpc/src/main/i18n/en/org/apache/iotdb/rpc/i18n/RpcMessages.java +++ b/iotdb-client/service-rpc/src/main/i18n/en/org/apache/iotdb/rpc/i18n/RpcMessages.java @@ -50,6 +50,12 @@ public final class RpcMessages { public static final String COULD_NOT_LOAD_KEYSTORE = "Could not load keystore or truststore file"; + // AutoResizingBuffer + public static final String AUTO_RESIZING_BUFFER_ALLOCATE_INTERRUPTED = + "AutoResizingBuffer was interrupted while allocating %d bytes"; + public static final String AUTO_RESIZING_BUFFER_ALLOCATE_FAILED = + "AutoResizingBuffer failed to allocate %d bytes after %d retries"; + // IoTDBRpcDataSet / IoTDBJDBCDataSet public static final String CLOSE_OPERATION_SERVER_ERROR = "Error occurs for close operation in server side because "; diff --git a/iotdb-client/service-rpc/src/main/i18n/zh/org/apache/iotdb/rpc/i18n/RpcMessages.java b/iotdb-client/service-rpc/src/main/i18n/zh/org/apache/iotdb/rpc/i18n/RpcMessages.java index 03faa1dbb83..d12d4593a39 100644 --- a/iotdb-client/service-rpc/src/main/i18n/zh/org/apache/iotdb/rpc/i18n/RpcMessages.java +++ b/iotdb-client/service-rpc/src/main/i18n/zh/org/apache/iotdb/rpc/i18n/RpcMessages.java @@ -44,6 +44,12 @@ public final class RpcMessages { // BaseRpcTransportFactory public static final String COULD_NOT_LOAD_KEYSTORE = 
"无法加载密钥库或信任库文件"; + // AutoResizingBuffer + public static final String AUTO_RESIZING_BUFFER_ALLOCATE_INTERRUPTED = + "AutoResizingBuffer 分配 %d 字节内存时被中断"; + public static final String AUTO_RESIZING_BUFFER_ALLOCATE_FAILED = + "AutoResizingBuffer 在 %2$d 次重试后仍无法分配 %1$d 字节内存"; + // IoTDBRpcDataSet / IoTDBJDBCDataSet public static final String CLOSE_OPERATION_SERVER_ERROR = "服务端关闭操作失败,原因:"; public static final String CLOSE_OPERATION_CONNECTION_ERROR = "连接服务端执行关闭操作时出错 "; diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/AutoResizingBufferMemoryManager.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/AutoResizingBufferMemoryManager.java new file mode 100644 index 00000000000..8c9fb19785b --- /dev/null +++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/AutoResizingBufferMemoryManager.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.rpc; + +import org.apache.iotdb.rpc.i18n.RpcMessages; + +import java.io.IOException; +import java.util.Objects; + +public final class AutoResizingBufferMemoryManager { + private static final int MEMORY_ALLOCATE_MAX_RETRIES = 5; + private static final long DEFAULT_MEMORY_ALLOCATE_RETRY_INTERVAL_IN_MS = 2_000L; + + private static final AutoResizingBufferMemoryControl NO_OP_MEMORY_CONTROL = + new AutoResizingBufferMemoryControl() { + @Override + public boolean allocate(long sizeInBytes) { + return true; + } + + @Override + public void release(long sizeInBytes) { + // Do nothing. + } + }; + + private static volatile AutoResizingBufferMemoryControl memoryControl = NO_OP_MEMORY_CONTROL; + private static volatile long memoryAllocateRetryIntervalInMs = + DEFAULT_MEMORY_ALLOCATE_RETRY_INTERVAL_IN_MS; + + private AutoResizingBufferMemoryManager() { + // Utility class. + } + + public static void setMemoryControl(AutoResizingBufferMemoryControl memoryControl) { + AutoResizingBufferMemoryManager.memoryControl = Objects.requireNonNull(memoryControl); + } + + static void resetMemoryControl() { + memoryControl = NO_OP_MEMORY_CONTROL; + memoryAllocateRetryIntervalInMs = DEFAULT_MEMORY_ALLOCATE_RETRY_INTERVAL_IN_MS; + } + + static void setMemoryAllocateRetryIntervalInMs(long memoryAllocateRetryIntervalInMs) { + AutoResizingBufferMemoryManager.memoryAllocateRetryIntervalInMs = + memoryAllocateRetryIntervalInMs; + } + + static void allocate(long sizeInBytes) throws IOException { + if (sizeInBytes <= 0) { + return; + } + for (int i = 0; i < MEMORY_ALLOCATE_MAX_RETRIES; i++) { + if (memoryControl.allocate(sizeInBytes)) { + return; + } + try { + Thread.sleep(memoryAllocateRetryIntervalInMs); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IOException( + String.format(RpcMessages.AUTO_RESIZING_BUFFER_ALLOCATE_INTERRUPTED, sizeInBytes), e); + } + } + throw new IOException( + String.format( + 
RpcMessages.AUTO_RESIZING_BUFFER_ALLOCATE_FAILED, + sizeInBytes, + MEMORY_ALLOCATE_MAX_RETRIES)); + } + + static void release(long sizeInBytes) { + if (sizeInBytes <= 0) { + return; + } + memoryControl.release(sizeInBytes); + } +} diff --git a/iotdb-client/service-rpc/src/test/java/org/apache/iotdb/rpc/AutoResizingBufferTest.java b/iotdb-client/service-rpc/src/test/java/org/apache/iotdb/rpc/AutoResizingBufferTest.java new file mode 100644 index 00000000000..f862a76733c --- /dev/null +++ b/iotdb-client/service-rpc/src/test/java/org/apache/iotdb/rpc/AutoResizingBufferTest.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.rpc; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; +import java.lang.reflect.Field; +import java.util.concurrent.atomic.AtomicLong; + +public class AutoResizingBufferTest { + + private final TestMemoryControl memoryControl = new TestMemoryControl(); + + @After + public void tearDown() { + AutoResizingBufferMemoryManager.resetMemoryControl(); + } + + @Test + public void testAllocateAndReleaseMemoryWhenResizing() throws Exception { + AutoResizingBufferMemoryManager.setMemoryControl(memoryControl); + + AutoResizingBuffer buffer = new AutoResizingBuffer(100); + Assert.assertEquals(100, memoryControl.getUsedMemoryInBytes()); + + buffer.resizeIfNecessary(200); + Assert.assertEquals(200, buffer.array().length); + Assert.assertEquals(200, memoryControl.getUsedMemoryInBytes()); + + setLastShrinkTime(buffer, System.currentTimeMillis() - RpcUtils.MIN_SHRINK_INTERVAL - 1); + for (int i = 0; i <= RpcUtils.MAX_BUFFER_OVERSIZE_TIME; i++) { + buffer.resizeIfNecessary(101); + } + Assert.assertEquals(150, buffer.array().length); + Assert.assertEquals(150, memoryControl.getUsedMemoryInBytes()); + + buffer.close(); + Assert.assertEquals(0, memoryControl.getUsedMemoryInBytes()); + } + + @Test + public void testThrowIOExceptionWhenMemoryIsInsufficient() throws Exception { + memoryControl.setLimit(120); + AutoResizingBufferMemoryManager.setMemoryControl(memoryControl); + AutoResizingBufferMemoryManager.setMemoryAllocateRetryIntervalInMs(1); + + AutoResizingBuffer buffer = new AutoResizingBuffer(100); + try { + buffer.resizeIfNecessary(200); + Assert.fail("Expected IOException"); + } catch (IOException e) { + Assert.assertTrue(e.getMessage().contains("100")); + Assert.assertTrue(e.getMessage().contains("5")); + } finally { + Assert.assertEquals(100, memoryControl.getUsedMemoryInBytes()); + buffer.close(); + } + } + + private void setLastShrinkTime(AutoResizingBuffer buffer, long lastShrinkTime) 
throws Exception { + Field field = AutoResizingBuffer.class.getDeclaredField("lastShrinkTime"); + field.setAccessible(true); + field.set(buffer, lastShrinkTime); + } + + private static class TestMemoryControl implements AutoResizingBufferMemoryControl { + + private final AtomicLong usedMemoryInBytes = new AtomicLong(); + private long limit = Long.MAX_VALUE; + + @Override + public boolean allocate(long sizeInBytes) { + while (true) { + long current = usedMemoryInBytes.get(); + long next = current + sizeInBytes; + if (next > limit) { + return false; + } + if (usedMemoryInBytes.compareAndSet(current, next)) { + return true; + } + } + } + + @Override + public void release(long sizeInBytes) { + usedMemoryInBytes.addAndGet(-sizeInBytes); + } + + private long getUsedMemoryInBytes() { + return usedMemoryInBytes.get(); + } + + private void setLimit(long limit) { + this.limit = limit; + } + } +}
