This is an automated email from the ASF dual-hosted git repository.

jt2594838 pushed a commit to branch add_rpc_memory_control
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 0ec40b2af8c268bec13db7fb1950724df853bb8c
Author: Tian Jiang <[email protected]>
AuthorDate: Wed Jun 10 17:54:13 2026 +0800

    ver2
---
 .../en/org/apache/iotdb/rpc/i18n/RpcMessages.java  |   6 ++
 .../zh/org/apache/iotdb/rpc/i18n/RpcMessages.java  |   6 ++
 .../iotdb/rpc/AutoResizingBufferMemoryManager.java |  95 +++++++++++++++++
 .../apache/iotdb/rpc/AutoResizingBufferTest.java   | 118 +++++++++++++++++++++
 4 files changed, 225 insertions(+)

diff --git 
a/iotdb-client/service-rpc/src/main/i18n/en/org/apache/iotdb/rpc/i18n/RpcMessages.java
 
b/iotdb-client/service-rpc/src/main/i18n/en/org/apache/iotdb/rpc/i18n/RpcMessages.java
index 0e545164e8d..5c89472be5b 100644
--- 
a/iotdb-client/service-rpc/src/main/i18n/en/org/apache/iotdb/rpc/i18n/RpcMessages.java
+++ 
b/iotdb-client/service-rpc/src/main/i18n/en/org/apache/iotdb/rpc/i18n/RpcMessages.java
@@ -50,6 +50,12 @@ public final class RpcMessages {
   public static final String COULD_NOT_LOAD_KEYSTORE =
       "Could not load keystore or truststore file";
 
+  // AutoResizingBuffer
+  public static final String AUTO_RESIZING_BUFFER_ALLOCATE_INTERRUPTED =
+      "AutoResizingBuffer was interrupted while allocating %d bytes";
+  public static final String AUTO_RESIZING_BUFFER_ALLOCATE_FAILED =
+      "AutoResizingBuffer failed to allocate %d bytes after %d retries";
+
   // IoTDBRpcDataSet / IoTDBJDBCDataSet
   public static final String CLOSE_OPERATION_SERVER_ERROR =
       "Error occurs for close operation in server side because ";
diff --git 
a/iotdb-client/service-rpc/src/main/i18n/zh/org/apache/iotdb/rpc/i18n/RpcMessages.java
 
b/iotdb-client/service-rpc/src/main/i18n/zh/org/apache/iotdb/rpc/i18n/RpcMessages.java
index 03faa1dbb83..d12d4593a39 100644
--- 
a/iotdb-client/service-rpc/src/main/i18n/zh/org/apache/iotdb/rpc/i18n/RpcMessages.java
+++ 
b/iotdb-client/service-rpc/src/main/i18n/zh/org/apache/iotdb/rpc/i18n/RpcMessages.java
@@ -44,6 +44,12 @@ public final class RpcMessages {
   // BaseRpcTransportFactory
   public static final String COULD_NOT_LOAD_KEYSTORE = "无法加载密钥库或信任库文件";
 
+  // AutoResizingBuffer
+  public static final String AUTO_RESIZING_BUFFER_ALLOCATE_INTERRUPTED =
+      "AutoResizingBuffer 分配 %d 字节内存时被中断";
+  public static final String AUTO_RESIZING_BUFFER_ALLOCATE_FAILED =
+      "AutoResizingBuffer 在 %d 次重试后仍无法分配 %d 字节内存";
+
   // IoTDBRpcDataSet / IoTDBJDBCDataSet
   public static final String CLOSE_OPERATION_SERVER_ERROR = "服务端关闭操作失败,原因:";
   public static final String CLOSE_OPERATION_CONNECTION_ERROR = 
"连接服务端执行关闭操作时出错 ";
diff --git 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/AutoResizingBufferMemoryManager.java
 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/AutoResizingBufferMemoryManager.java
new file mode 100644
index 00000000000..8c9fb19785b
--- /dev/null
+++ 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/AutoResizingBufferMemoryManager.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.rpc;
+
+import org.apache.iotdb.rpc.i18n.RpcMessages;
+
+import java.io.IOException;
+import java.util.Objects;
+
+public final class AutoResizingBufferMemoryManager {
+  private static final int MEMORY_ALLOCATE_MAX_RETRIES = 5;
+  private static final long DEFAULT_MEMORY_ALLOCATE_RETRY_INTERVAL_IN_MS = 
2_000L;
+
+  private static final AutoResizingBufferMemoryControl NO_OP_MEMORY_CONTROL =
+      new AutoResizingBufferMemoryControl() {
+        @Override
+        public boolean allocate(long sizeInBytes) {
+          return true;
+        }
+
+        @Override
+        public void release(long sizeInBytes) {
+          // Do nothing.
+        }
+      };
+
+  private static volatile AutoResizingBufferMemoryControl memoryControl = 
NO_OP_MEMORY_CONTROL;
+  private static volatile long memoryAllocateRetryIntervalInMs =
+      DEFAULT_MEMORY_ALLOCATE_RETRY_INTERVAL_IN_MS;
+
+  private AutoResizingBufferMemoryManager() {
+    // Utility class.
+  }
+
+  public static void setMemoryControl(AutoResizingBufferMemoryControl 
memoryControl) {
+    AutoResizingBufferMemoryManager.memoryControl = 
Objects.requireNonNull(memoryControl);
+  }
+
+  static void resetMemoryControl() {
+    memoryControl = NO_OP_MEMORY_CONTROL;
+    memoryAllocateRetryIntervalInMs = 
DEFAULT_MEMORY_ALLOCATE_RETRY_INTERVAL_IN_MS;
+  }
+
+  static void setMemoryAllocateRetryIntervalInMs(long 
memoryAllocateRetryIntervalInMs) {
+    AutoResizingBufferMemoryManager.memoryAllocateRetryIntervalInMs =
+        memoryAllocateRetryIntervalInMs;
+  }
+
+  static void allocate(long sizeInBytes) throws IOException {
+    if (sizeInBytes <= 0) {
+      return;
+    }
+    for (int i = 0; i < MEMORY_ALLOCATE_MAX_RETRIES; i++) {
+      if (memoryControl.allocate(sizeInBytes)) {
+        return;
+      }
+      try {
+        Thread.sleep(memoryAllocateRetryIntervalInMs);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new IOException(
+            
String.format(RpcMessages.AUTO_RESIZING_BUFFER_ALLOCATE_INTERRUPTED, 
sizeInBytes), e);
+      }
+    }
+    throw new IOException(
+        String.format(
+            RpcMessages.AUTO_RESIZING_BUFFER_ALLOCATE_FAILED,
+            sizeInBytes,
+            MEMORY_ALLOCATE_MAX_RETRIES));
+  }
+
+  static void release(long sizeInBytes) {
+    if (sizeInBytes <= 0) {
+      return;
+    }
+    memoryControl.release(sizeInBytes);
+  }
+}
diff --git 
a/iotdb-client/service-rpc/src/test/java/org/apache/iotdb/rpc/AutoResizingBufferTest.java
 
b/iotdb-client/service-rpc/src/test/java/org/apache/iotdb/rpc/AutoResizingBufferTest.java
new file mode 100644
index 00000000000..f862a76733c
--- /dev/null
+++ 
b/iotdb-client/service-rpc/src/test/java/org/apache/iotdb/rpc/AutoResizingBufferTest.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.rpc;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class AutoResizingBufferTest {
+
+  private final TestMemoryControl memoryControl = new TestMemoryControl();
+
+  @After
+  public void tearDown() {
+    AutoResizingBufferMemoryManager.resetMemoryControl();
+  }
+
+  @Test
+  public void testAllocateAndReleaseMemoryWhenResizing() throws Exception {
+    AutoResizingBufferMemoryManager.setMemoryControl(memoryControl);
+
+    AutoResizingBuffer buffer = new AutoResizingBuffer(100);
+    Assert.assertEquals(100, memoryControl.getUsedMemoryInBytes());
+
+    buffer.resizeIfNecessary(200);
+    Assert.assertEquals(200, buffer.array().length);
+    Assert.assertEquals(200, memoryControl.getUsedMemoryInBytes());
+
+    setLastShrinkTime(buffer, System.currentTimeMillis() - 
RpcUtils.MIN_SHRINK_INTERVAL - 1);
+    for (int i = 0; i <= RpcUtils.MAX_BUFFER_OVERSIZE_TIME; i++) {
+      buffer.resizeIfNecessary(101);
+    }
+    Assert.assertEquals(150, buffer.array().length);
+    Assert.assertEquals(150, memoryControl.getUsedMemoryInBytes());
+
+    buffer.close();
+    Assert.assertEquals(0, memoryControl.getUsedMemoryInBytes());
+  }
+
+  @Test
+  public void testThrowIOExceptionWhenMemoryIsInsufficient() throws Exception {
+    memoryControl.setLimit(120);
+    AutoResizingBufferMemoryManager.setMemoryControl(memoryControl);
+    AutoResizingBufferMemoryManager.setMemoryAllocateRetryIntervalInMs(1);
+
+    AutoResizingBuffer buffer = new AutoResizingBuffer(100);
+    try {
+      buffer.resizeIfNecessary(200);
+      Assert.fail("Expected IOException");
+    } catch (IOException e) {
+      Assert.assertTrue(e.getMessage().contains("100"));
+      Assert.assertTrue(e.getMessage().contains("5"));
+    } finally {
+      Assert.assertEquals(100, memoryControl.getUsedMemoryInBytes());
+      buffer.close();
+    }
+  }
+
+  private void setLastShrinkTime(AutoResizingBuffer buffer, long 
lastShrinkTime) throws Exception {
+    Field field = AutoResizingBuffer.class.getDeclaredField("lastShrinkTime");
+    field.setAccessible(true);
+    field.set(buffer, lastShrinkTime);
+  }
+
+  private static class TestMemoryControl implements 
AutoResizingBufferMemoryControl {
+
+    private final AtomicLong usedMemoryInBytes = new AtomicLong();
+    private long limit = Long.MAX_VALUE;
+
+    @Override
+    public boolean allocate(long sizeInBytes) {
+      while (true) {
+        long current = usedMemoryInBytes.get();
+        long next = current + sizeInBytes;
+        if (next > limit) {
+          return false;
+        }
+        if (usedMemoryInBytes.compareAndSet(current, next)) {
+          return true;
+        }
+      }
+    }
+
+    @Override
+    public void release(long sizeInBytes) {
+      usedMemoryInBytes.addAndGet(-sizeInBytes);
+    }
+
+    private long getUsedMemoryInBytes() {
+      return usedMemoryInBytes.get();
+    }
+
+    private void setLimit(long limit) {
+      this.limit = limit;
+    }
+  }
+}

Reply via email to