This is an automated email from the ASF dual-hosted git repository.

jt2594838 pushed a commit to branch add_rpc_memory_control
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit a875fee9b3c2e57c25a21be0cbd96c31bb2a7a15
Author: Tian Jiang <[email protected]>
AuthorDate: Wed Jun 10 18:55:15 2026 +0800

    ver4
---
 .../db/it/IoTDBAutoResizingBufferMemoryIT.java     | 171 +++++++++++++++++++++
 pom.xml                                            |   1 +
 2 files changed, 172 insertions(+)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBAutoResizingBufferMemoryIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBAutoResizingBufferMemoryIT.java
new file mode 100644
index 00000000000..2c37100ae69
--- /dev/null
+++ 
b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBAutoResizingBufferMemoryIT.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.it;
+
+import org.apache.iotdb.commons.conf.CommonConfig;
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.env.cluster.node.AbstractNodeWrapper;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.LocalStandaloneIT;
+import org.apache.iotdb.rpc.RpcUtils;
+
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.io.File;
+import java.nio.file.Files;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.List;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({LocalStandaloneIT.class})
+public class IoTDBAutoResizingBufferMemoryIT {
+
+  private static final double AUTO_RESIZING_BUFFER_MEMORY_PROPORTION = 0.0003;
+  private static final String CONFIG_FILE_ENTRY = 
"auto_resizing_buffer_memory_proportion=3.0E-4";
+  private static final int DATANODE_MAX_HEAP_SIZE_IN_MB = 256;
+  private static final int AUTO_RESIZING_BUFFER_COUNT_PER_CONNECTION = 2;
+  private static final int CONNECTION_COUNT_OVERFLOW_MARGIN = 1;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    EnvFactory.getEnv()
+        .getConfig()
+        .getCommonConfig()
+        
.setAutoResizingBufferMemoryProportion(AUTO_RESIZING_BUFFER_MEMORY_PROPORTION);
+    EnvFactory.getEnv()
+        .getConfig()
+        .getDataNodeJVMConfig()
+        .setMaxHeapSize(DATANODE_MAX_HEAP_SIZE_IN_MB);
+    EnvFactory.getEnv().initClusterEnvironment();
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    EnvFactory.getEnv().cleanClusterEnvironment();
+  }
+
+  @Test
+  public void testAutoResizingBufferMemoryProportionConfigTakesEffect() throws 
Exception {
+    Assert.assertTrue(
+        EnvFactory.getEnv().getNodeWrapperList().stream()
+            .allMatch(nodeWrapper -> checkConfigFileContains(nodeWrapper, 
CONFIG_FILE_ENTRY)));
+
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      statement.execute("CREATE DATABASE root.auto_resizing_buffer_memory");
+      statement.execute(
+          "CREATE TIMESERIES root.auto_resizing_buffer_memory.d1.s1 WITH 
DATATYPE=INT32, ENCODING=PLAIN");
+      statement.execute(
+          "INSERT INTO root.auto_resizing_buffer_memory.d1(time, s1) VALUES 
(1, 100)");
+
+      try (ResultSet resultSet =
+          statement.executeQuery("SELECT s1 FROM 
root.auto_resizing_buffer_memory.d1")) {
+        Assert.assertTrue(resultSet.next());
+        Assert.assertEquals(100, 
resultSet.getInt("root.auto_resizing_buffer_memory.d1.s1"));
+        Assert.assertFalse(resultSet.next());
+      }
+    }
+  }
+
+  @Test
+  public void 
testNewConnectionsWithWritesAreRejectedWhenBufferMemoryIsExhausted()
+      throws Exception {
+    List<Connection> heldConnections = new ArrayList<>();
+    boolean rejected = false;
+
+    try {
+      try (Connection connection = EnvFactory.getEnv().getConnection();
+          Statement statement = connection.createStatement()) {
+        statement.execute("CREATE DATABASE root.auto_resizing_buffer_reject");
+        statement.execute(
+            "CREATE TIMESERIES root.auto_resizing_buffer_reject.d1.s1 WITH 
DATATYPE=INT32, ENCODING=PLAIN");
+      }
+
+      int connectionCountToExhaustBufferMemory =
+          calculateConnectionCountToExhaustAutoResizingBufferMemory();
+      for (int i = 0; i < connectionCountToExhaustBufferMemory; i++) {
+        try {
+          Connection connection = EnvFactory.getEnv().getConnection();
+          heldConnections.add(connection);
+          try (Statement statement = connection.createStatement()) {
+            statement.execute(
+                String.format(
+                    "INSERT INTO root.auto_resizing_buffer_reject.d1(time, s1) 
VALUES (%d, %d)",
+                    i + 1, i));
+          }
+        } catch (Exception e) {
+          rejected = true;
+          break;
+        }
+      }
+    } finally {
+      for (Connection connection : heldConnections) {
+        closeQuietly(connection);
+      }
+    }
+
+    Assert.assertTrue(
+        "Expected new connections with writes to be rejected after 
AutoResizingBuffer memory is exhausted",
+        rejected);
+  }
+
+  private static void closeQuietly(Connection connection) {
+    try {
+      connection.close();
+    } catch (SQLException ignored) {
+      // ignored
+    }
+  }
+
+  private static int 
calculateConnectionCountToExhaustAutoResizingBufferMemory() {
+    long autoResizingBufferMemorySizeInBytes =
+        (long)
+            (DATANODE_MAX_HEAP_SIZE_IN_MB * 1024L * 1024L * 
AUTO_RESIZING_BUFFER_MEMORY_PROPORTION);
+    int autoResizingBufferInitialSizePerConnection =
+        AUTO_RESIZING_BUFFER_COUNT_PER_CONNECTION * 
RpcUtils.THRIFT_DEFAULT_BUF_CAPACITY;
+    return (int)
+        (autoResizingBufferMemorySizeInBytes / 
autoResizingBufferInitialSizePerConnection
+            + CONNECTION_COUNT_OVERFLOW_MARGIN);
+  }
+
+  private static boolean checkConfigFileContains(AbstractNodeWrapper 
nodeWrapper, String content) {
+    try {
+      String systemPropertiesPath =
+          nodeWrapper.getNodePath()
+              + File.separator
+              + "conf"
+              + File.separator
+              + CommonConfig.SYSTEM_CONFIG_NAME;
+      return new String(Files.readAllBytes(new 
File(systemPropertiesPath).toPath()))
+          .contains(content);
+    } catch (Exception ignore) {
+      return false;
+    }
+  }
+}
diff --git a/pom.xml b/pom.xml
index b06647be538..73c3fd93c38 100644
--- a/pom.xml
+++ b/pom.xml
@@ -764,6 +764,7 @@
                             <exclude>**/.gitmodules</exclude>
                             <exclude>**/.git-blame-ignore-revs</exclude>
                             <exclude>**/git.properties</exclude>
+                            <exclude>AGENTS.md</exclude>
                             <!-- Maven related files -->
                             <exclude>**/target/**</exclude>
                             <!-- Eclipse related files -->

Reply via email to