This is an automated email from the ASF dual-hosted git repository.
jt2594838 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 1073715a081 Add RPC auto resizing buffer memory control (#17911)
1073715a081 is described below
commit 1073715a081f4e67d3b2963694a8fca049805d6d
Author: Jiang Tian <[email protected]>
AuthorDate: Thu Jun 25 15:39:10 2026 +0800
Add RPC auto resizing buffer memory control (#17911)
* ver1
* ver2
* ver3
* ver4
* ver4
* update default config value
* Disable when proportion <= 0
* fix review
* disable mem-control in ITs
* Replace auto_resizing_buffer_memory_proportion with
datanode_memory_proportion
* Add AutoResizingBuffer memory metrics
* Fix it
---
.../db/it/IoTDBAutoResizingBufferMemoryIT.java | 274 +++++++++++++++++++++
.../org/apache/iotdb/db/it/IoTDBLoadTsFileIT.java | 2 +-
.../manual/AbstractPipeTableModelDualManualIT.java | 2 +
.../manual/basic/IoTDBPipePermissionIT.java | 2 +
.../manual/basic/IoTDBPipeProtocolIT.java | 6 +
.../tablemodel/manual/basic/IoTDBPipeSourceIT.java | 2 +
.../manual/basic/IoTDBPipeWithLoadIT.java | 2 +
.../manual/enhanced/IoTDBPipeAutoConflictIT.java | 2 +
.../manual/enhanced/IoTDBPipeAutoDropIT.java | 2 +
.../manual/enhanced/IoTDBPipeClusterIT.java | 2 +
.../enhanced/IoTDBPipeSinkCompressionIT.java | 4 +-
.../auto/AbstractPipeDualTreeModelAutoIT.java | 6 +-
.../treemodel/auto/basic/IoTDBPipeAutoSplitIT.java | 2 +
.../treemodel/auto/basic/IoTDBPipeProcessorIT.java | 2 +
.../treemodel/auto/basic/IoTDBPipeProtocolIT.java | 6 +
.../IoTDBPipeReceiverAutoCreateDisabledIT.java | 1 +
.../treemodel/auto/basic/IoTDBPipeSourceIT.java | 2 +
.../auto/enhanced/IoTDBPipeAutoConflictIT.java | 2 +
.../auto/enhanced/IoTDBPipeClusterIT.java | 2 +
.../auto/enhanced/IoTDBPipeIdempotentIT.java | 2 +
.../auto/enhanced/IoTDBPipeSinkCompressionIT.java | 2 +
.../auto/enhanced/IoTDBPipeWithLoadIT.java | 2 +
.../manual/AbstractPipeDualTreeModelManualIT.java | 4 +-
.../manual/IoTDBPipeMetaHistoricalIT.java | 4 +-
.../manual/IoTDBPipeMetaLeaderChangeIT.java | 7 +-
.../treemodel/manual/IoTDBPipePermissionIT.java | 4 +-
.../iotdb/pipe/it/single/AbstractPipeSingleIT.java | 1 +
.../single/IoTDBLegacyPipeReceiverSecurityIT.java | 1 +
.../iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java | 1 +
.../pipe/it/single/IoTDBPipePermissionIT.java | 1 +
.../pipe/it/triple/AbstractPipeTripleManualIT.java | 5 +-
.../relational/it/db/it/IoTDBLoadTsFileIT.java | 1 +
iotdb-client/client-go | 2 +-
.../en/org/apache/iotdb/rpc/i18n/RpcMessages.java | 6 +
.../zh/org/apache/iotdb/rpc/i18n/RpcMessages.java | 6 +
.../org/apache/iotdb/rpc/AutoResizingBuffer.java | 53 +++-
.../iotdb/rpc/AutoResizingBufferMemoryControl.java | 25 +-
.../iotdb/rpc/AutoResizingBufferMemoryManager.java | 110 +++++++++
.../iotdb/rpc/AutoScalingBufferReadTransport.java | 18 +-
.../iotdb/rpc/AutoScalingBufferWriteTransport.java | 20 +-
.../rpc/TCompressedElasticFramedTransport.java | 29 ++-
.../apache/iotdb/rpc/TElasticFramedTransport.java | 38 ++-
.../iotdb/rpc/TSnappyElasticFramedTransport.java | 8 +-
.../rpc/TimeoutChangeableTFastFramedTransport.java | 6 +-
.../TimeoutChangeableTSnappyFramedTransport.java | 6 +-
.../apache/iotdb/rpc/AutoResizingBufferTest.java | 193 +++++++++++++++
.../iotdb/confignode/service/ConfigNode.java | 2 +
.../apache/iotdb/consensus/ratis/utils/Utils.java | 23 +-
.../apache/iotdb/db/conf/DataNodeMemoryConfig.java | 9 +
.../conf/iotdb-system.properties.template | 1 +
.../iotdb/commons/conf/CommonDescriptor.java | 1 -
.../memory/AutoResizingBufferMemoryMetrics.java | 112 +++++++++
.../apache/iotdb/commons/memory/MemoryConfig.java | 87 ++++++-
pom.xml | 1 +
54 files changed, 1048 insertions(+), 66 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBAutoResizingBufferMemoryIT.java
b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBAutoResizingBufferMemoryIT.java
new file mode 100644
index 00000000000..ead744f5497
--- /dev/null
+++
b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBAutoResizingBufferMemoryIT.java
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.it;
+
+import org.apache.iotdb.commons.conf.CommonConfig;
+import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.isession.SessionConfig;
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.env.cluster.node.AbstractNodeWrapper;
+import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.LocalStandaloneIT;
+import org.apache.iotdb.rpc.DeepCopyRpcTransportFactory;
+import org.apache.iotdb.rpc.RpcUtils;
+import org.apache.iotdb.service.rpc.thrift.IClientRPCService;
+import org.apache.iotdb.service.rpc.thrift.TSCloseOperationReq;
+import org.apache.iotdb.service.rpc.thrift.TSCloseSessionReq;
+import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementReq;
+import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementResp;
+import org.apache.iotdb.service.rpc.thrift.TSOpenSessionReq;
+import org.apache.iotdb.service.rpc.thrift.TSOpenSessionResp;
+
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.transport.TTransport;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.io.File;
+import java.nio.file.Files;
+import java.sql.Connection;
+import java.sql.Statement;
+import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({LocalStandaloneIT.class})
+public class IoTDBAutoResizingBufferMemoryIT {
+
+ private static final int DATANODE_MAX_HEAP_SIZE_IN_MB = 256;
+ private static final int AUTO_RESIZING_BUFFER_COUNT_PER_CONNECTION = 2;
+ private static final int CONNECTION_COUNT_OVERFLOW_MARGIN = 1;
+ private static final int INITIAL_GROWING_REQUEST_PAYLOAD_SIZE = 16 * 1024;
+ private static final int MAX_GROWING_REQUEST_PAYLOAD_SIZE =
+ calculateNextPowerOfTwo(calculateAutoResizingBufferMemorySizeInBytes());
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ EnvFactory.getEnv()
+ .getConfig()
+ .getDataNodeJVMConfig()
+ .setMaxHeapSize(DATANODE_MAX_HEAP_SIZE_IN_MB);
+ EnvFactory.getEnv().initClusterEnvironment();
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ EnvFactory.getEnv().cleanClusterEnvironment();
+ }
+
+ @Test
+ public void testNewConnectionsWithWritesAreRejectedWhenBufferMemoryIsExhausted()
+ throws Exception {
+ List<Connection> heldConnections = new ArrayList<>();
+ boolean rejected = false;
+
+ try {
+ try (Connection connection = EnvFactory.getEnv().getConnection();
+ Statement statement = connection.createStatement()) {
+ statement.execute("CREATE DATABASE root.auto_resizing_buffer_reject");
+ statement.execute(
+ "CREATE TIMESERIES root.auto_resizing_buffer_reject.d1.s1 WITH DATATYPE=INT32, ENCODING=PLAIN");
+ }
+
+ int connectionCountToExhaustBufferMemory =
+ calculateConnectionCountToExhaustAutoResizingBufferMemory();
+ for (int i = 0; i < connectionCountToExhaustBufferMemory; i++) {
+ try {
+ Connection connection = EnvFactory.getEnv().getConnection();
+ heldConnections.add(connection);
+ try (Statement statement = connection.createStatement()) {
+ statement.execute(
+ String.format(
+ "INSERT INTO root.auto_resizing_buffer_reject.d1(time, s1) VALUES (%d, %d)",
+ i + 1, i));
+ }
+ } catch (Exception e) {
+ rejected = true;
+ break;
+ }
+ }
+ } finally {
+ for (Connection connection : heldConnections) {
+ closeQuietly(connection);
+ }
+ }
+
+ Assert.assertTrue(
+ "Expected new connections with writes to be rejected after AutoResizingBuffer memory is exhausted",
+ rejected);
+ }
+
+ @Test
+ public void testGrowingRequestsAreRejectedWhenBufferMemoryIsExhausted() throws Exception {
+ boolean rejected = false;
+
+ try (Connection connection = EnvFactory.getEnv().getConnection();
+ Statement statement = connection.createStatement()) {
+ statement.execute("CREATE DATABASE root.auto_resizing_buffer_growing_request");
+ statement.execute(
+ "CREATE TIMESERIES root.auto_resizing_buffer_growing_request.d1.s1 WITH DATATYPE=TEXT, ENCODING=PLAIN");
+ }
+
+ // do not use connection because its inevitable retry will slow down the test
+ try (ThriftClientContext clientContext = ThriftClientContext.open()) {
+ int payloadSize = INITIAL_GROWING_REQUEST_PAYLOAD_SIZE;
+ while (payloadSize <= MAX_GROWING_REQUEST_PAYLOAD_SIZE) {
+ try {
+ clientContext.executeStatement(
+ String.format(
+ "INSERT INTO root.auto_resizing_buffer_growing_request.d1(time, s1) VALUES (%d, '%s')",
+ payloadSize, repeat('a', payloadSize)));
+ } catch (Exception e) {
+ rejected = true;
+ clientContext.markBroken();
+ break;
+ }
+ payloadSize = payloadSize << 1;
+ }
+ }
+
+ Assert.assertTrue(
+ "Expected a growing request to be rejected after AutoResizingBuffer memory is exhausted",
+ rejected);
+ }
+
+ private static void closeQuietly(AutoCloseable closeable) {
+ if (closeable == null) {
+ return;
+ }
+ try {
+ closeable.close();
+ } catch (Exception ignored) {
+ // ignored
+ }
+ }
+
+ private static int calculateConnectionCountToExhaustAutoResizingBufferMemory() {
+ int autoResizingBufferInitialSizePerConnection =
+ AUTO_RESIZING_BUFFER_COUNT_PER_CONNECTION * RpcUtils.THRIFT_DEFAULT_BUF_CAPACITY;
+ return (int)
+ (calculateAutoResizingBufferMemorySizeInBytes() / autoResizingBufferInitialSizePerConnection
+ + CONNECTION_COUNT_OVERFLOW_MARGIN);
+ }
+
+ private static long calculateAutoResizingBufferMemorySizeInBytes() {
+ return (DATANODE_MAX_HEAP_SIZE_IN_MB * 1024L * 1024L * 5 / 100); // 5% of the max heap size;
+ }
+
+ private static int calculateNextPowerOfTwo(long value) {
+ int result = 1;
+ while (result < value) {
+ result <<= 1;
+ }
+ return result;
+ }
+
+ private static String repeat(char character, int count) {
+ char[] chars = new char[count];
+ Arrays.fill(chars, character);
+ return new String(chars);
+ }
+
+ private static TSOpenSessionReq createOpenSessionReq() {
+ TSOpenSessionReq req = new TSOpenSessionReq();
+ req.setUsername(SessionConfig.DEFAULT_USER);
+ req.setPassword(SessionConfig.DEFAULT_PASSWORD);
+ req.setZoneId(ZoneId.systemDefault().toString());
+ req.putToConfiguration("version", IoTDBConstant.ClientVersion.V_1_0.toString());
+ req.putToConfiguration("sql_dialect", "tree");
+ return req;
+ }
+
+ private static class ThriftClientContext implements AutoCloseable {
+
+ private final TTransport transport;
+ private final IClientRPCService.Client client;
+ private final long sessionId;
+ private final long statementId;
+ private boolean broken;
+
+ private ThriftClientContext(
+ TTransport transport, IClientRPCService.Client client, long sessionId, long statementId) {
+ this.transport = transport;
+ this.client = client;
+ this.sessionId = sessionId;
+ this.statementId = statementId;
+ }
+
+ private static ThriftClientContext open() throws Exception {
+ DataNodeWrapper dataNode = EnvFactory.getEnv().getDataNodeWrapper(0);
+ TTransport transport =
+ DeepCopyRpcTransportFactory.INSTANCE.getTransport(
+ dataNode.getIp(), dataNode.getPort(), 0);
+ transport.open();
+ IClientRPCService.Client client =
+ new IClientRPCService.Client(new TBinaryProtocol(transport));
+ TSOpenSessionResp openSessionResp = client.openSession(createOpenSessionReq());
+ RpcUtils.verifySuccess(openSessionResp.getStatus());
+ long sessionId = openSessionResp.getSessionId();
+ return new ThriftClientContext(
+ transport, client, sessionId, client.requestStatementId(sessionId));
+ }
+
+ private void executeStatement(String sql) throws Exception {
+ TSExecuteStatementResp resp =
+ client.executeStatementV2(new TSExecuteStatementReq(sessionId, sql, statementId));
+ RpcUtils.verifySuccess(resp.getStatus());
+ }
+
+ private void markBroken() {
+ broken = true;
+ }
+
+ @Override
+ public void close() throws Exception {
+ try {
+ if (!broken) {
+ client.closeOperation(new TSCloseOperationReq(sessionId).setStatementId(statementId));
+ client.closeSession(new TSCloseSessionReq(sessionId));
+ }
+ } finally {
+ transport.close();
+ }
+ }
+ }
+
+ private static boolean checkConfigFileContains(AbstractNodeWrapper nodeWrapper, String content) {
+ try {
+ String systemPropertiesPath =
+ nodeWrapper.getNodePath()
+ + File.separator
+ + "conf"
+ + File.separator
+ + CommonConfig.SYSTEM_CONFIG_NAME;
+ return new String(Files.readAllBytes(new File(systemPropertiesPath).toPath()))
+ .contains(content);
+ } catch (Exception ignore) {
+ return false;
+ }
+ }
+}
diff --git
a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadTsFileIT.java
b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadTsFileIT.java
index 19a4214bed2..1eda0f06226 100644
---
a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadTsFileIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadTsFileIT.java
@@ -86,7 +86,7 @@ public class IoTDBLoadTsFileIT {
EnvFactory.getEnv().getConfig().getCommonConfig().setTimePartitionInterval(PARTITION_INTERVAL);
EnvFactory.getEnv().getConfig().getCommonConfig().setEnforceStrongPassword(false);
EnvFactory.getEnv().getConfig().getCommonConfig().setPipeMemoryManagementEnabled(false);
-
EnvFactory.getEnv().getConfig().getCommonConfig().setDatanodeMemoryProportion("1:10:1:1:1:1");
+
EnvFactory.getEnv().getConfig().getCommonConfig().setDatanodeMemoryProportion("1:10:1:1:1:0");
EnvFactory.getEnv()
.getConfig()
.getDataNodeConfig()
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/AbstractPipeTableModelDualManualIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/AbstractPipeTableModelDualManualIT.java
index 3b3fae80902..e304fc81d96 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/AbstractPipeTableModelDualManualIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/AbstractPipeTableModelDualManualIT.java
@@ -46,6 +46,7 @@ public abstract class AbstractPipeTableModelDualManualIT {
.getConfig()
.getCommonConfig()
.setAutoCreateSchemaEnabled(true)
+ .setDatanodeMemoryProportion("3:3:1:1:1:0")
.setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setEnforceStrongPassword(false)
@@ -56,6 +57,7 @@ public abstract class AbstractPipeTableModelDualManualIT {
.getConfig()
.getCommonConfig()
.setAutoCreateSchemaEnabled(true)
+ .setDatanodeMemoryProportion("3:3:1:1:1:0")
.setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setEnforceStrongPassword(false)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipePermissionIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipePermissionIT.java
index f233596fc60..dc5b3218be7 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipePermissionIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipePermissionIT.java
@@ -65,6 +65,7 @@ public class IoTDBPipePermissionIT extends
AbstractPipeTableModelDualManualIT {
.getConfig()
.getCommonConfig()
.setAutoCreateSchemaEnabled(false)
+ .setDatanodeMemoryProportion("3:3:1:1:1:0")
.setDefaultSchemaRegionGroupNumPerDatabase(1)
.setTimestampPrecision("ms")
.setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
@@ -77,6 +78,7 @@ public class IoTDBPipePermissionIT extends
AbstractPipeTableModelDualManualIT {
.getConfig()
.getCommonConfig()
.setAutoCreateSchemaEnabled(false)
+ .setDatanodeMemoryProportion("3:3:1:1:1:0")
.setTimestampPrecision("ms")
.setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeProtocolIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeProtocolIT.java
index 3fce11bf51f..647d5ab1515 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeProtocolIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeProtocolIT.java
@@ -71,6 +71,7 @@ public class IoTDBPipeProtocolIT extends
AbstractPipeTableModelDualManualIT {
.getConfig()
.getCommonConfig()
.setAutoCreateSchemaEnabled(true)
+ .setDatanodeMemoryProportion("3:3:1:1:1:0")
.setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setDataRegionConsensusProtocolClass(dataRegionConsensus)
@@ -83,6 +84,7 @@ public class IoTDBPipeProtocolIT extends
AbstractPipeTableModelDualManualIT {
.getConfig()
.getCommonConfig()
.setAutoCreateSchemaEnabled(true)
+ .setDatanodeMemoryProportion("3:3:1:1:1:0")
.setConfigNodeConsensusProtocolClass(configNodeConsensus)
.setSchemaRegionConsensusProtocolClass(schemaRegionConsensus)
.setDataRegionConsensusProtocolClass(dataRegionConsensus)
@@ -167,6 +169,7 @@ public class IoTDBPipeProtocolIT extends
AbstractPipeTableModelDualManualIT {
.getConfig()
.getCommonConfig()
.setAutoCreateSchemaEnabled(true)
+ .setDatanodeMemoryProportion("3:3:1:1:1:0")
.setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS)
@@ -179,6 +182,7 @@ public class IoTDBPipeProtocolIT extends
AbstractPipeTableModelDualManualIT {
.getConfig()
.getCommonConfig()
.setAutoCreateSchemaEnabled(true)
+ .setDatanodeMemoryProportion("3:3:1:1:1:0")
.setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS)
@@ -361,6 +365,7 @@ public class IoTDBPipeProtocolIT extends
AbstractPipeTableModelDualManualIT {
.getConfig()
.getCommonConfig()
.setAutoCreateSchemaEnabled(true)
+ .setDatanodeMemoryProportion("3:3:1:1:1:0")
.setPipeAirGapReceiverEnabled(true)
.setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
@@ -377,6 +382,7 @@ public class IoTDBPipeProtocolIT extends
AbstractPipeTableModelDualManualIT {
.getConfig()
.getCommonConfig()
.setAutoCreateSchemaEnabled(true)
+ .setDatanodeMemoryProportion("3:3:1:1:1:0")
.setPipeAirGapReceiverEnabled(true)
.setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeSourceIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeSourceIT.java
index 022d71d5487..b47a3540635 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeSourceIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeSourceIT.java
@@ -65,6 +65,7 @@ public class IoTDBPipeSourceIT extends
AbstractPipeTableModelDualManualIT {
.getConfig()
.getCommonConfig()
.setAutoCreateSchemaEnabled(true)
+ .setDatanodeMemoryProportion("3:3:1:1:1:0")
.setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
// Disable sender compaction for tsfile determination in loose range
test
@@ -80,6 +81,7 @@ public class IoTDBPipeSourceIT extends
AbstractPipeTableModelDualManualIT {
.getConfig()
.getCommonConfig()
.setAutoCreateSchemaEnabled(true)
+ .setDatanodeMemoryProportion("3:3:1:1:1:0")
.setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setDnConnectionTimeoutMs(600000)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeWithLoadIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeWithLoadIT.java
index 57b5846fc4b..c1f466d9a72 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeWithLoadIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeWithLoadIT.java
@@ -63,6 +63,7 @@ public class IoTDBPipeWithLoadIT extends
AbstractPipeTableModelDualManualIT {
.getConfig()
.getCommonConfig()
.setAutoCreateSchemaEnabled(true)
+ .setDatanodeMemoryProportion("3:3:1:1:1:0")
.setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
// Disable sender compaction to test mods
@@ -77,6 +78,7 @@ public class IoTDBPipeWithLoadIT extends
AbstractPipeTableModelDualManualIT {
.getConfig()
.getCommonConfig()
.setAutoCreateSchemaEnabled(true)
+ .setDatanodeMemoryProportion("3:3:1:1:1:0")
.setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setDnConnectionTimeoutMs(600000)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeAutoConflictIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeAutoConflictIT.java
index de0f0c460ee..404f74058dc 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeAutoConflictIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeAutoConflictIT.java
@@ -59,6 +59,7 @@ public class IoTDBPipeAutoConflictIT extends
AbstractPipeTableModelDualManualIT
.getConfig()
.getCommonConfig()
.setAutoCreateSchemaEnabled(true)
+ .setDatanodeMemoryProportion("3:3:1:1:1:0")
.setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS)
@@ -69,6 +70,7 @@ public class IoTDBPipeAutoConflictIT extends
AbstractPipeTableModelDualManualIT
.getConfig()
.getCommonConfig()
.setAutoCreateSchemaEnabled(true)
+ .setDatanodeMemoryProportion("3:3:1:1:1:0")
.setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeAutoDropIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeAutoDropIT.java
index ff906d8a5af..22d756a8b89 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeAutoDropIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeAutoDropIT.java
@@ -70,6 +70,7 @@ public class IoTDBPipeAutoDropIT extends
AbstractPipeTableModelDualManualIT {
.getConfig()
.getCommonConfig()
.setAutoCreateSchemaEnabled(true)
+ .setDatanodeMemoryProportion("3:3:1:1:1:0")
.setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setEnforceStrongPassword(false)
@@ -79,6 +80,7 @@ public class IoTDBPipeAutoDropIT extends
AbstractPipeTableModelDualManualIT {
.getConfig()
.getCommonConfig()
.setAutoCreateSchemaEnabled(true)
+ .setDatanodeMemoryProportion("3:3:1:1:1:0")
.setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setEnforceStrongPassword(false)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeClusterIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeClusterIT.java
index 27297c06471..6fe48f1d866 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeClusterIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeClusterIT.java
@@ -83,6 +83,7 @@ public class IoTDBPipeClusterIT extends
AbstractPipeTableModelDualManualIT {
.getConfig()
.getCommonConfig()
.setAutoCreateSchemaEnabled(true)
+ .setDatanodeMemoryProportion("3:3:1:1:1:0")
.setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS)
@@ -98,6 +99,7 @@ public class IoTDBPipeClusterIT extends
AbstractPipeTableModelDualManualIT {
.getConfig()
.getCommonConfig()
.setAutoCreateSchemaEnabled(true)
+ .setDatanodeMemoryProportion("3:3:1:1:1:0")
.setDataReplicationFactor(2)
.setSchemaReplicationFactor(3)
.setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeSinkCompressionIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeSinkCompressionIT.java
index 547db349e2f..25495364e25 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeSinkCompressionIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeSinkCompressionIT.java
@@ -71,17 +71,19 @@ public class IoTDBPipeSinkCompressionIT extends
AbstractPipeTableModelDualManual
.getConfig()
.getCommonConfig()
.setAutoCreateSchemaEnabled(true)
+ .setDatanodeMemoryProportion("3:3:1:1:1:0")
.setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setDnConnectionTimeoutMs(600000)
.setPipeMemoryManagementEnabled(false)
.setIsPipeEnableMemoryCheck(false)
.setPipeAutoSplitFullEnabled(false);
-
senderEnv.getConfig().getDataNodeConfig().setDataNodeMemoryProportion("3:3:1:1:3:1");
+
senderEnv.getConfig().getDataNodeConfig().setDataNodeMemoryProportion("3:3:1:1:3:0");
receiverEnv
.getConfig()
.getCommonConfig()
.setAutoCreateSchemaEnabled(true)
+ .setDatanodeMemoryProportion("3:3:1:1:1:0")
.setPipeAirGapReceiverEnabled(true)
.setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/AbstractPipeDualTreeModelAutoIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/AbstractPipeDualTreeModelAutoIT.java
index a7fae02f6d1..24e9b04d6ff 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/AbstractPipeDualTreeModelAutoIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/AbstractPipeDualTreeModelAutoIT.java
@@ -54,23 +54,25 @@ public abstract class AbstractPipeDualTreeModelAutoIT {
.getConfig()
.getCommonConfig()
.setAutoCreateSchemaEnabled(true)
+ .setDatanodeMemoryProportion("3:3:1:1:1:0")
.setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setEnforceStrongPassword(false)
.setPipeMemoryManagementEnabled(false)
.setIsPipeEnableMemoryCheck(false)
.setPipeAutoSplitFullEnabled(false);
-
senderEnv.getConfig().getDataNodeConfig().setDataNodeMemoryProportion("3:3:1:1:3:1");
+
senderEnv.getConfig().getDataNodeConfig().setDataNodeMemoryProportion("3:3:1:1:3:0");
receiverEnv
.getConfig()
.getCommonConfig()
.setAutoCreateSchemaEnabled(true)
+ .setDatanodeMemoryProportion("3:3:1:1:1:0")
.setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setPipeMemoryManagementEnabled(false)
.setIsPipeEnableMemoryCheck(false)
.setPipeAutoSplitFullEnabled(false);
-
receiverEnv.getConfig().getDataNodeConfig().setDataNodeMemoryProportion("3:3:1:1:9:1");
+
receiverEnv.getConfig().getDataNodeConfig().setDataNodeMemoryProportion("3:3:1:1:9:0");
// 10 min, assert that the operations will not time out
senderEnv.getConfig().getCommonConfig().setDnConnectionTimeoutMs(600000);
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeAutoSplitIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeAutoSplitIT.java
index 97e18346836..048f9a19180 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeAutoSplitIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeAutoSplitIT.java
@@ -61,6 +61,7 @@ public class IoTDBPipeAutoSplitIT extends
AbstractPipeDualTreeModelAutoIT {
.getConfig()
.getCommonConfig()
.setAutoCreateSchemaEnabled(true)
+ .setDatanodeMemoryProportion("3:3:1:1:1:0")
.setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setEnforceStrongPassword(false)
@@ -71,6 +72,7 @@ public class IoTDBPipeAutoSplitIT extends
AbstractPipeDualTreeModelAutoIT {
.getConfig()
.getCommonConfig()
.setAutoCreateSchemaEnabled(true)
+ .setDatanodeMemoryProportion("3:3:1:1:1:0")
.setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setPipeMemoryManagementEnabled(false)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeProcessorIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeProcessorIT.java
index dfa7957a574..d17d73516fa 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeProcessorIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeProcessorIT.java
@@ -58,6 +58,7 @@ public class IoTDBPipeProcessorIT extends
AbstractPipeDualTreeModelAutoIT {
.getConfig()
.getCommonConfig()
.setAutoCreateSchemaEnabled(true)
+ .setDatanodeMemoryProportion("3:3:1:1:1:0")
.setTimestampPrecision("ms")
.setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
@@ -67,6 +68,7 @@ public class IoTDBPipeProcessorIT extends
AbstractPipeDualTreeModelAutoIT {
.getConfig()
.getCommonConfig()
.setAutoCreateSchemaEnabled(true)
+ .setDatanodeMemoryProportion("3:3:1:1:1:0")
.setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setPipeMemoryManagementEnabled(false)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeProtocolIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeProtocolIT.java
index 8a7ac4f1947..92874b6d539 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeProtocolIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeProtocolIT.java
@@ -71,6 +71,7 @@ public class IoTDBPipeProtocolIT extends
AbstractPipeDualTreeModelAutoIT {
.getConfig()
.getCommonConfig()
.setAutoCreateSchemaEnabled(true)
+ .setDatanodeMemoryProportion("3:3:1:1:1:0")
.setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setDataRegionConsensusProtocolClass(dataRegionConsensus)
@@ -82,6 +83,7 @@ public class IoTDBPipeProtocolIT extends
AbstractPipeDualTreeModelAutoIT {
.getConfig()
.getCommonConfig()
.setAutoCreateSchemaEnabled(true)
+ .setDatanodeMemoryProportion("3:3:1:1:1:0")
.setConfigNodeConsensusProtocolClass(configNodeConsensus)
.setSchemaRegionConsensusProtocolClass(schemaRegionConsensus)
.setDataRegionConsensusProtocolClass(dataRegionConsensus)
@@ -169,6 +171,7 @@ public class IoTDBPipeProtocolIT extends
AbstractPipeDualTreeModelAutoIT {
.getConfig()
.getCommonConfig()
.setAutoCreateSchemaEnabled(true)
+ .setDatanodeMemoryProportion("3:3:1:1:1:0")
.setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS)
@@ -178,6 +181,7 @@ public class IoTDBPipeProtocolIT extends
AbstractPipeDualTreeModelAutoIT {
.getConfig()
.getCommonConfig()
.setAutoCreateSchemaEnabled(true)
+ .setDatanodeMemoryProportion("3:3:1:1:1:0")
.setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS)
@@ -355,6 +359,7 @@ public class IoTDBPipeProtocolIT extends
AbstractPipeDualTreeModelAutoIT {
.getConfig()
.getCommonConfig()
.setAutoCreateSchemaEnabled(true)
+ .setDatanodeMemoryProportion("3:3:1:1:1:0")
.setPipeAirGapReceiverEnabled(true)
.setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
@@ -368,6 +373,7 @@ public class IoTDBPipeProtocolIT extends
AbstractPipeDualTreeModelAutoIT {
.getConfig()
.getCommonConfig()
.setAutoCreateSchemaEnabled(true)
+ .setDatanodeMemoryProportion("3:3:1:1:1:0")
.setPipeAirGapReceiverEnabled(true)
.setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeReceiverAutoCreateDisabledIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeReceiverAutoCreateDisabledIT.java
index 72646f81b86..158dd552c20 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeReceiverAutoCreateDisabledIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeReceiverAutoCreateDisabledIT.java
@@ -66,6 +66,7 @@ public class IoTDBPipeReceiverAutoCreateDisabledIT extends
AbstractPipeDualTreeM
.getConfig()
.getCommonConfig()
.setAutoCreateSchemaEnabled(false)
+ .setDatanodeMemoryProportion("3:3:1:1:1:0")
.setDataReplicationFactor(1)
.setSchemaReplicationFactor(1);
}
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSourceIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSourceIT.java
index d93cb5e42d4..9af7cc84316 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSourceIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeSourceIT.java
@@ -67,6 +67,7 @@ public class IoTDBPipeSourceIT extends
AbstractPipeDualTreeModelAutoIT {
.getConfig()
.getCommonConfig()
.setAutoCreateSchemaEnabled(true)
+ .setDatanodeMemoryProportion("3:3:1:1:1:0")
.setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
// Disable sender compaction for tsfile determination in loose range
test
@@ -81,6 +82,7 @@ public class IoTDBPipeSourceIT extends
AbstractPipeDualTreeModelAutoIT {
.getConfig()
.getCommonConfig()
.setAutoCreateSchemaEnabled(true)
+ .setDatanodeMemoryProportion("3:3:1:1:1:0")
.setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setPipeMemoryManagementEnabled(false)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeAutoConflictIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeAutoConflictIT.java
index f1a623143aa..6953a5a23f4 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeAutoConflictIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeAutoConflictIT.java
@@ -59,6 +59,7 @@ public class IoTDBPipeAutoConflictIT extends
AbstractPipeDualTreeModelAutoIT {
.getConfig()
.getCommonConfig()
.setAutoCreateSchemaEnabled(true)
+ .setDatanodeMemoryProportion("3:3:1:1:1:0")
.setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS)
@@ -68,6 +69,7 @@ public class IoTDBPipeAutoConflictIT extends
AbstractPipeDualTreeModelAutoIT {
.getConfig()
.getCommonConfig()
.setAutoCreateSchemaEnabled(true)
+ .setDatanodeMemoryProportion("3:3:1:1:1:0")
.setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeClusterIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeClusterIT.java
index 22b6d07f44a..15ae9fca4c5 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeClusterIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeClusterIT.java
@@ -84,6 +84,7 @@ public class IoTDBPipeClusterIT extends
AbstractPipeDualTreeModelAutoIT {
.getConfig()
.getCommonConfig()
.setAutoCreateSchemaEnabled(true)
+ .setDatanodeMemoryProportion("3:3:1:1:1:0")
.setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS)
@@ -99,6 +100,7 @@ public class IoTDBPipeClusterIT extends
AbstractPipeDualTreeModelAutoIT {
.getConfig()
.getCommonConfig()
.setAutoCreateSchemaEnabled(true)
+ .setDatanodeMemoryProportion("3:3:1:1:1:0")
.setDataReplicationFactor(2)
.setSchemaReplicationFactor(3)
.setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeIdempotentIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeIdempotentIT.java
index 33a68d43ff2..fb8945471b7 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeIdempotentIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeIdempotentIT.java
@@ -62,6 +62,7 @@ public class IoTDBPipeIdempotentIT extends
AbstractPipeDualTreeModelAutoIT {
.getConfig()
.getCommonConfig()
.setAutoCreateSchemaEnabled(true)
+ .setDatanodeMemoryProportion("3:3:1:1:1:0")
// Limit the schemaRegion number to 1 to guarantee the after sql
executed on the same region
// of the tested idempotent sql.
.setDefaultSchemaRegionGroupNumPerDatabase(1)
@@ -77,6 +78,7 @@ public class IoTDBPipeIdempotentIT extends
AbstractPipeDualTreeModelAutoIT {
.getConfig()
.getCommonConfig()
.setAutoCreateSchemaEnabled(true)
+ .setDatanodeMemoryProportion("3:3:1:1:1:0")
.setSchemaRegionGroupExtensionPolicy("CUSTOM")
.setDataRegionGroupExtensionPolicy("CUSTOM")
.setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeSinkCompressionIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeSinkCompressionIT.java
index 3900506a658..739148f1523 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeSinkCompressionIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeSinkCompressionIT.java
@@ -69,6 +69,7 @@ public class IoTDBPipeSinkCompressionIT extends
AbstractPipeDualTreeModelAutoIT
.getConfig()
.getCommonConfig()
.setAutoCreateSchemaEnabled(true)
+ .setDatanodeMemoryProportion("3:3:1:1:1:0")
.setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setPipeMemoryManagementEnabled(false)
@@ -79,6 +80,7 @@ public class IoTDBPipeSinkCompressionIT extends
AbstractPipeDualTreeModelAutoIT
.getConfig()
.getCommonConfig()
.setAutoCreateSchemaEnabled(true)
+ .setDatanodeMemoryProportion("3:3:1:1:1:0")
.setPipeAirGapReceiverEnabled(true)
.setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeWithLoadIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeWithLoadIT.java
index 26790479f22..0a5e3b0da74 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeWithLoadIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeWithLoadIT.java
@@ -56,6 +56,7 @@ public class IoTDBPipeWithLoadIT extends
AbstractPipeDualTreeModelAutoIT {
.getConfig()
.getCommonConfig()
.setAutoCreateSchemaEnabled(true)
+ .setDatanodeMemoryProportion("3:3:1:1:1:0")
.setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
// Disable sender compaction to test mods
@@ -68,6 +69,7 @@ public class IoTDBPipeWithLoadIT extends
AbstractPipeDualTreeModelAutoIT {
.getConfig()
.getCommonConfig()
.setAutoCreateSchemaEnabled(true)
+ .setDatanodeMemoryProportion("3:3:1:1:1:0")
.setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setPipeMemoryManagementEnabled(false)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/AbstractPipeDualTreeModelManualIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/AbstractPipeDualTreeModelManualIT.java
index 11f70d944b6..0fec803c2d3 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/AbstractPipeDualTreeModelManualIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/AbstractPipeDualTreeModelManualIT.java
@@ -54,17 +54,19 @@ public abstract class AbstractPipeDualTreeModelManualIT {
.getConfig()
.getCommonConfig()
.setAutoCreateSchemaEnabled(false)
+ .setDatanodeMemoryProportion("3:3:1:1:1:0")
.setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setPipeMemoryManagementEnabled(false)
.setIsPipeEnableMemoryCheck(false)
.setPipeAutoSplitFullEnabled(false);
-
senderEnv.getConfig().getDataNodeConfig().setDataNodeMemoryProportion("3:3:1:1:3:1");
+
senderEnv.getConfig().getDataNodeConfig().setDataNodeMemoryProportion("3:3:1:1:3:0");
receiverEnv
.getConfig()
.getCommonConfig()
.setAutoCreateSchemaEnabled(false)
+ .setDatanodeMemoryProportion("3:3:1:1:1:0")
.setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setPipeMemoryManagementEnabled(false)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipeMetaHistoricalIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipeMetaHistoricalIT.java
index 5f936cda516..06b4a72ccf7 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipeMetaHistoricalIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipeMetaHistoricalIT.java
@@ -57,6 +57,7 @@ public class IoTDBPipeMetaHistoricalIT extends
AbstractPipeDualTreeModelManualIT
.getConfig()
.getCommonConfig()
.setAutoCreateSchemaEnabled(false)
+ .setDatanodeMemoryProportion("3:3:1:1:1:0")
.setDefaultSchemaRegionGroupNumPerDatabase(1)
.setTimestampPrecision("ms")
.setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
@@ -64,11 +65,12 @@ public class IoTDBPipeMetaHistoricalIT extends
AbstractPipeDualTreeModelManualIT
.setDnConnectionTimeoutMs(600000)
.setPipeMemoryManagementEnabled(false)
.setIsPipeEnableMemoryCheck(false);
-
senderEnv.getConfig().getDataNodeConfig().setDataNodeMemoryProportion("3:3:1:1:3:1");
+
senderEnv.getConfig().getDataNodeConfig().setDataNodeMemoryProportion("3:3:1:1:3:0");
receiverEnv
.getConfig()
.getCommonConfig()
.setAutoCreateSchemaEnabled(false)
+ .setDatanodeMemoryProportion("3:3:1:1:1:0")
.setTimestampPrecision("ms")
.setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipeMetaLeaderChangeIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipeMetaLeaderChangeIT.java
index 119f7ed59c2..9645ff718df 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipeMetaLeaderChangeIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipeMetaLeaderChangeIT.java
@@ -53,6 +53,7 @@ public class IoTDBPipeMetaLeaderChangeIT extends
AbstractPipeDualTreeModelManual
senderEnv
.getConfig()
.getCommonConfig()
+ .setDatanodeMemoryProportion("3:3:1:1:1:0")
.setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS)
@@ -60,7 +61,11 @@ public class IoTDBPipeMetaLeaderChangeIT extends
AbstractPipeDualTreeModelManual
// 10 min, assert that the operations will not time out
senderEnv.getConfig().getCommonConfig().setDnConnectionTimeoutMs(600000);
- receiverEnv.getConfig().getCommonConfig().setDnConnectionTimeoutMs(600000);
+ receiverEnv
+ .getConfig()
+ .getCommonConfig()
+ .setDatanodeMemoryProportion("3:3:1:1:1:0")
+ .setDnConnectionTimeoutMs(600000);
senderEnv.initClusterEnvironment(3, 3, 180);
receiverEnv.initClusterEnvironment();
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipePermissionIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipePermissionIT.java
index 23d0b48d440..4548c12cef3 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipePermissionIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/manual/IoTDBPipePermissionIT.java
@@ -63,6 +63,7 @@ public class IoTDBPipePermissionIT extends
AbstractPipeDualTreeModelManualIT {
.getConfig()
.getCommonConfig()
.setAutoCreateSchemaEnabled(false)
+ .setDatanodeMemoryProportion("3:3:1:1:1:0")
.setDefaultSchemaRegionGroupNumPerDatabase(1)
.setTimestampPrecision("ms")
.setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
@@ -71,11 +72,12 @@ public class IoTDBPipePermissionIT extends
AbstractPipeDualTreeModelManualIT {
.setPipeMemoryManagementEnabled(false)
.setIsPipeEnableMemoryCheck(false)
.setPipeAutoSplitFullEnabled(false);
-
senderEnv.getConfig().getDataNodeConfig().setDataNodeMemoryProportion("3:3:1:1:3:1");
+
senderEnv.getConfig().getDataNodeConfig().setDataNodeMemoryProportion("3:3:1:1:3:0");
receiverEnv
.getConfig()
.getCommonConfig()
.setAutoCreateSchemaEnabled(false)
+ .setDatanodeMemoryProportion("3:3:1:1:1:0")
.setTimestampPrecision("ms")
.setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/AbstractPipeSingleIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/AbstractPipeSingleIT.java
index 3ade13c7209..61d4f0157e4 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/AbstractPipeSingleIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/AbstractPipeSingleIT.java
@@ -36,6 +36,7 @@ abstract class AbstractPipeSingleIT {
env.getConfig()
.getCommonConfig()
.setAutoCreateSchemaEnabled(true)
+ .setDatanodeMemoryProportion("3:3:1:1:1:0")
.setPipeMemoryManagementEnabled(false)
.setIsPipeEnableMemoryCheck(false)
.setPipeAutoSplitFullEnabled(false);
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBLegacyPipeReceiverSecurityIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBLegacyPipeReceiverSecurityIT.java
index 8647e776b50..2b1ce913770 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBLegacyPipeReceiverSecurityIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBLegacyPipeReceiverSecurityIT.java
@@ -54,6 +54,7 @@ public class IoTDBLegacyPipeReceiverSecurityIT {
@BeforeClass
public static void setUp() {
+
EnvFactory.getEnv().getConfig().getCommonConfig().setDatanodeMemoryProportion("3:3:1:1:1:0");
EnvFactory.getEnv().initClusterEnvironment();
}
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java
index b1d0a4dda73..fdee0e30c88 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java
@@ -74,6 +74,7 @@ public class IoTDBPipeOPCUAIT extends AbstractPipeSingleIT {
env.getConfig()
.getCommonConfig()
.setAutoCreateSchemaEnabled(true)
+ .setDatanodeMemoryProportion("3:3:1:1:1:0")
.setPipeMemoryManagementEnabled(false)
.setDataReplicationFactor(1)
.setSchemaReplicationFactor(1)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipePermissionIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipePermissionIT.java
index c7fa9d12c4e..45fa762b02b 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipePermissionIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipePermissionIT.java
@@ -51,6 +51,7 @@ public class IoTDBPipePermissionIT extends
AbstractPipeSingleIT {
env.getConfig()
.getCommonConfig()
.setAutoCreateSchemaEnabled(true)
+ .setDatanodeMemoryProportion("3:3:1:1:1:0")
.setPipeMemoryManagementEnabled(false)
.setDataReplicationFactor(1)
.setSchemaReplicationFactor(1)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/triple/AbstractPipeTripleManualIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/triple/AbstractPipeTripleManualIT.java
index f4e63e1d2f8..c1d92c7916b 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/triple/AbstractPipeTripleManualIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/triple/AbstractPipeTripleManualIT.java
@@ -48,16 +48,18 @@ abstract class AbstractPipeTripleManualIT {
env1.getConfig()
.getCommonConfig()
.setAutoCreateSchemaEnabled(false)
+ .setDatanodeMemoryProportion("3:3:1:1:1:0")
.setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setPipeMemoryManagementEnabled(false)
.setIsPipeEnableMemoryCheck(false)
.setPipeAutoSplitFullEnabled(false);
-
env1.getConfig().getDataNodeConfig().setDataNodeMemoryProportion("3:3:1:1:3:1");
+
env1.getConfig().getDataNodeConfig().setDataNodeMemoryProportion("3:3:1:1:3:0");
env2.getConfig()
.getCommonConfig()
.setAutoCreateSchemaEnabled(false)
+ .setDatanodeMemoryProportion("3:3:1:1:1:0")
.setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setPipeMemoryManagementEnabled(false)
@@ -67,6 +69,7 @@ abstract class AbstractPipeTripleManualIT {
env3.getConfig()
.getCommonConfig()
.setAutoCreateSchemaEnabled(false)
+ .setDatanodeMemoryProportion("3:3:1:1:1:0")
.setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setPipeMemoryManagementEnabled(false)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBLoadTsFileIT.java
b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBLoadTsFileIT.java
index b351ace5a4d..c4444c4bf55 100644
---
a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBLoadTsFileIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBLoadTsFileIT.java
@@ -71,6 +71,7 @@ public class IoTDBLoadTsFileIT {
tmpDir = new File(Files.createTempDirectory("load").toUri());
EnvFactory.getEnv().getConfig().getCommonConfig().setTimePartitionInterval(PARTITION_INTERVAL);
EnvFactory.getEnv().getConfig().getCommonConfig().setEnforceStrongPassword(false);
+
EnvFactory.getEnv().getConfig().getCommonConfig().setDatanodeMemoryProportion("3:3:1:1:1:0");
EnvFactory.getEnv()
.getConfig()
.getDataNodeConfig()
diff --git a/iotdb-client/client-go b/iotdb-client/client-go
index dc64b1a7648..2ea2655e090 160000
--- a/iotdb-client/client-go
+++ b/iotdb-client/client-go
@@ -1 +1 @@
-Subproject commit dc64b1a7648d3c505c10eed5419f422bb49f1def
+Subproject commit 2ea2655e090dcefd12bf1a789a51c8df9a28fa24
diff --git
a/iotdb-client/service-rpc/src/main/i18n/en/org/apache/iotdb/rpc/i18n/RpcMessages.java
b/iotdb-client/service-rpc/src/main/i18n/en/org/apache/iotdb/rpc/i18n/RpcMessages.java
index 0e545164e8d..5c89472be5b 100644
---
a/iotdb-client/service-rpc/src/main/i18n/en/org/apache/iotdb/rpc/i18n/RpcMessages.java
+++
b/iotdb-client/service-rpc/src/main/i18n/en/org/apache/iotdb/rpc/i18n/RpcMessages.java
@@ -50,6 +50,12 @@ public final class RpcMessages {
public static final String COULD_NOT_LOAD_KEYSTORE =
"Could not load keystore or truststore file";
+ // AutoResizingBuffer
+ public static final String AUTO_RESIZING_BUFFER_ALLOCATE_INTERRUPTED =
+ "AutoResizingBuffer was interrupted while allocating %d bytes";
+ public static final String AUTO_RESIZING_BUFFER_ALLOCATE_FAILED =
+ "AutoResizingBuffer failed to allocate %d bytes after %d retries";
+
// IoTDBRpcDataSet / IoTDBJDBCDataSet
public static final String CLOSE_OPERATION_SERVER_ERROR =
"Error occurs for close operation in server side because ";
diff --git
a/iotdb-client/service-rpc/src/main/i18n/zh/org/apache/iotdb/rpc/i18n/RpcMessages.java
b/iotdb-client/service-rpc/src/main/i18n/zh/org/apache/iotdb/rpc/i18n/RpcMessages.java
index 03faa1dbb83..d12d4593a39 100644
---
a/iotdb-client/service-rpc/src/main/i18n/zh/org/apache/iotdb/rpc/i18n/RpcMessages.java
+++
b/iotdb-client/service-rpc/src/main/i18n/zh/org/apache/iotdb/rpc/i18n/RpcMessages.java
@@ -44,6 +44,12 @@ public final class RpcMessages {
// BaseRpcTransportFactory
public static final String COULD_NOT_LOAD_KEYSTORE = "无法加载密钥库或信任库文件";
+ // AutoResizingBuffer
+ public static final String AUTO_RESIZING_BUFFER_ALLOCATE_INTERRUPTED =
+ "AutoResizingBuffer 分配 %d 字节内存时被中断";
+ public static final String AUTO_RESIZING_BUFFER_ALLOCATE_FAILED =
+ "AutoResizingBuffer 在 %2$d 次重试后仍无法分配 %1$d 字节内存"; // explicit indexes keep the English (bytes, retries) order
+
// IoTDBRpcDataSet / IoTDBJDBCDataSet
public static final String CLOSE_OPERATION_SERVER_ERROR = "服务端关闭操作失败,原因:";
public static final String CLOSE_OPERATION_CONNECTION_ERROR =
"连接服务端执行关闭操作时出错 ";
diff --git
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/AutoResizingBuffer.java
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/AutoResizingBuffer.java
index 180d05b3d93..397a5843171 100644
---
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/AutoResizingBuffer.java
+++
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/AutoResizingBuffer.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.rpc;
+import java.io.IOException;
import java.util.Arrays;
/**
@@ -31,33 +32,51 @@ import java.util.Arrays;
* required size < current capacity * 0.6, and such small requests last for
more than 5 times,
* shrink to the middle of the required size and current capacity.
*/
-class AutoResizingBuffer {
+class AutoResizingBuffer implements AutoCloseable {
private byte[] array;
private int bufTooLargeCounter = RpcUtils.MAX_BUFFER_OVERSIZE_TIME;
private final int initialCapacity;
private long lastShrinkTime;
+ private int accountedCapacity;
+ private boolean closed;
- public AutoResizingBuffer(int initialCapacity) {
- this.array = new byte[initialCapacity];
+ /**
+ * Reserves {@code initialCapacity} bytes through {@link AutoResizingBufferMemoryManager} and then
+ * creates the backing array; the reservation is returned if creating the array fails.
+ *
+ * @param initialCapacity size in bytes of the initial backing array
+ * @throws IOException propagated from {@link AutoResizingBufferMemoryManager#allocate}
+ */
+ public AutoResizingBuffer(int initialCapacity) throws IOException {
+ AutoResizingBufferMemoryManager.allocate(initialCapacity);
+ try {
+ this.array = new byte[initialCapacity];
+ } catch (RuntimeException | Error e) {
+ // Mirror resize(): hand the reservation back when the array cannot be created.
+ AutoResizingBufferMemoryManager.release(initialCapacity);
+ throw e;
+ }
this.initialCapacity = initialCapacity;
+ this.accountedCapacity = initialCapacity;
}
- public void resizeIfNecessary(int size) {
+ public void resizeIfNecessary(int size) throws IOException {
+ reserveCurrentCapacityIfReleased();
final int currentCapacity = this.array.length;
final double loadFactor = 0.6;
if (currentCapacity < size) {
// Increase by a factor of 1.5x
int growCapacity = currentCapacity + (currentCapacity >> 1);
int newCapacity = Math.max(growCapacity, size);
- this.array = Arrays.copyOf(array, newCapacity);
+ resize(newCapacity);
bufTooLargeCounter = RpcUtils.MAX_BUFFER_OVERSIZE_TIME;
} else if (size > initialCapacity
&& currentCapacity * loadFactor > size
&& bufTooLargeCounter-- <= 0
&& System.currentTimeMillis() - lastShrinkTime >
RpcUtils.MIN_SHRINK_INTERVAL) {
// do not resize if it is reading the request size and do not shrink too
often
- array = Arrays.copyOf(array, size + (currentCapacity - size) / 2);
+ resize(size + (currentCapacity - size) / 2);
bufTooLargeCounter = RpcUtils.MAX_BUFFER_OVERSIZE_TIME;
lastShrinkTime = System.currentTimeMillis();
}
@@ -66,4 +85,51 @@ class AutoResizingBuffer {
public byte[] array() {
return this.array;
}
+
+ /**
+ * Hands the accounted capacity back to {@link AutoResizingBufferMemoryManager}. Further calls do
+ * nothing until {@link #resizeIfNecessary(int)} reserves the capacity again.
+ */
+ @Override
+ public void close() {
+ if (!closed) {
+ AutoResizingBufferMemoryManager.release(accountedCapacity);
+ accountedCapacity = 0;
+ closed = true;
+ }
+ }
+
+ /**
+ * Swaps the backing array for a copy of {@code newCapacity} bytes and keeps the accounting in
+ * step. Growth reserves the extra bytes first and releases them again if the copy fails;
+ * shrinking releases the surplus only after the smaller copy exists.
+ */
+ private void resize(int newCapacity) throws IOException {
+ final int currentCapacity = array.length;
+ if (newCapacity > currentCapacity) {
+ final int delta = newCapacity - currentCapacity;
+ AutoResizingBufferMemoryManager.allocate(delta);
+ try {
+ array = Arrays.copyOf(array, newCapacity);
+ accountedCapacity += delta;
+ } catch (RuntimeException | Error e) {
+ AutoResizingBufferMemoryManager.release(delta);
+ throw e;
+ }
+ } else if (newCapacity < currentCapacity) {
+ array = Arrays.copyOf(array, newCapacity);
+ final int delta = currentCapacity - newCapacity;
+ AutoResizingBufferMemoryManager.release(delta);
+ accountedCapacity -= delta;
+ }
+ }
+
+ /** Reserves the current array length again if {@link #close()} released it earlier. */
+ private void reserveCurrentCapacityIfReleased() throws IOException {
+ if (closed) {
+ AutoResizingBufferMemoryManager.allocate(array.length);
+ accountedCapacity = array.length;
+ closed = false;
+ }
+ }
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/memory/MemoryConfig.java
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/AutoResizingBufferMemoryControl.java
similarity index 57%
copy from
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/memory/MemoryConfig.java
copy to
iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/AutoResizingBufferMemoryControl.java
index dc1ca4e5473..49128727a0b 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/memory/MemoryConfig.java
+++
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/AutoResizingBufferMemoryControl.java
@@ -17,27 +17,12 @@
* under the License.
*/
-package org.apache.iotdb.commons.memory;
+package org.apache.iotdb.rpc;
-public class MemoryConfig {
- private final MemoryManager globalMemoryManager =
- new MemoryManager("GlobalMemoryManager", null,
Runtime.getRuntime().totalMemory());
+/** Memory accounting hook for {@link AutoResizingBuffer}. */
+public interface AutoResizingBufferMemoryControl {
- private MemoryConfig() {
- // singleton
- }
+ boolean allocate(long sizeInBytes);
- public static MemoryManager global() {
- return MemoryConfigHolder.INSTANCE.globalMemoryManager;
- }
-
- public static MemoryConfig getInstance() {
- return MemoryConfigHolder.INSTANCE;
- }
-
- private static class MemoryConfigHolder {
- private static final MemoryConfig INSTANCE = new MemoryConfig();
-
- private MemoryConfigHolder() {}
- }
+ void release(long sizeInBytes);
}
diff --git
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/AutoResizingBufferMemoryManager.java
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/AutoResizingBufferMemoryManager.java
new file mode 100644
index 00000000000..d95d51adb48
--- /dev/null
+++
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/AutoResizingBufferMemoryManager.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.rpc;
+
+import org.apache.iotdb.rpc.i18n.RpcMessages;
+
+import java.io.IOException;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicLong;
+
+public final class AutoResizingBufferMemoryManager {
+ private static final int MEMORY_ALLOCATE_MAX_RETRIES = 5;
+ private static final long DEFAULT_MEMORY_ALLOCATE_RETRY_INTERVAL_IN_MS =
2_000L;
+
+ private static final AutoResizingBufferMemoryControl NO_OP_MEMORY_CONTROL =
+ new AutoResizingBufferMemoryControl() {
+ @Override
+ public boolean allocate(long sizeInBytes) {
+ return true;
+ }
+
+ @Override
+ public void release(long sizeInBytes) {
+ // Do nothing.
+ }
+ };
+
+ private static volatile AutoResizingBufferMemoryControl memoryControl =
NO_OP_MEMORY_CONTROL;
+ private static volatile long memoryAllocateRetryIntervalInMs =
+ DEFAULT_MEMORY_ALLOCATE_RETRY_INTERVAL_IN_MS;
+ private static final AtomicLong memoryAllocationCount = new AtomicLong();
+ private static final AtomicLong memoryAllocationFailureCount = new
AtomicLong();
+
+ private AutoResizingBufferMemoryManager() {
+ // Utility class.
+ }
+
+ public static void setMemoryControl(AutoResizingBufferMemoryControl
memoryControl) {
+ AutoResizingBufferMemoryManager.memoryControl =
Objects.requireNonNull(memoryControl);
+ }
+
+ static void resetMemoryControl() {
+ memoryControl = NO_OP_MEMORY_CONTROL;
+ memoryAllocateRetryIntervalInMs =
DEFAULT_MEMORY_ALLOCATE_RETRY_INTERVAL_IN_MS;
+ memoryAllocationCount.set(0);
+ memoryAllocationFailureCount.set(0);
+ }
+
+ static void setMemoryAllocateRetryIntervalInMs(long
memoryAllocateRetryIntervalInMs) {
+ AutoResizingBufferMemoryManager.memoryAllocateRetryIntervalInMs =
+ memoryAllocateRetryIntervalInMs;
+ }
+
+ static void allocate(long sizeInBytes) throws IOException {
+ if (sizeInBytes <= 0) {
+ return;
+ }
+ for (int i = 0; i < MEMORY_ALLOCATE_MAX_RETRIES; i++) {
+ if (memoryControl.allocate(sizeInBytes)) {
+ memoryAllocationCount.incrementAndGet();
+ return;
+ }
+ try {
+ Thread.sleep(memoryAllocateRetryIntervalInMs);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new IOException(
+
String.format(RpcMessages.AUTO_RESIZING_BUFFER_ALLOCATE_INTERRUPTED,
sizeInBytes), e);
+ }
+ }
+ memoryAllocationFailureCount.incrementAndGet();
+ throw new IOException(
+ String.format(
+ RpcMessages.AUTO_RESIZING_BUFFER_ALLOCATE_FAILED,
+ sizeInBytes,
+ MEMORY_ALLOCATE_MAX_RETRIES));
+ }
+
+ static void release(long sizeInBytes) {
+ if (sizeInBytes <= 0) {
+ return;
+ }
+ memoryControl.release(sizeInBytes);
+ }
+
+ public static long getMemoryAllocationCount() {
+ return memoryAllocationCount.get();
+ }
+
+ public static long getMemoryAllocationFailureCount() {
+ return memoryAllocationFailureCount.get();
+ }
+}
diff --git
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/AutoScalingBufferReadTransport.java
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/AutoScalingBufferReadTransport.java
index 35833321ba3..035f8753de1 100644
---
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/AutoScalingBufferReadTransport.java
+++
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/AutoScalingBufferReadTransport.java
@@ -23,18 +23,24 @@ import org.apache.thrift.TConfiguration;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
+import java.io.IOException;
+
public class AutoScalingBufferReadTransport extends NonOpenTransport {
private final AutoResizingBuffer buf;
private int pos = 0;
private int limit = 0;
- public AutoScalingBufferReadTransport(int initialCapacity) {
+ public AutoScalingBufferReadTransport(int initialCapacity) throws
IOException {
this.buf = new AutoResizingBuffer(initialCapacity);
}
public void fill(TTransport inTrans, int length) throws TTransportException {
- buf.resizeIfNecessary(length);
+ try {
+ buf.resizeIfNecessary(length);
+ } catch (IOException e) {
+ throw new TTransportException(e);
+ }
inTrans.readAll(buf.array(), 0, length);
pos = 0;
limit = length;
@@ -89,10 +95,16 @@ public class AutoScalingBufferReadTransport extends
NonOpenTransport {
return limit - pos;
}
- public void resizeIfNecessary(int size) {
+ public void resizeIfNecessary(int size) throws IOException {
buf.resizeIfNecessary(size);
}
+  /**
+   * Closes this transport and its backing buffer. The buffer is closed in a {@code finally} block
+   * so that it is still closed when {@code super.close()} throws.
+   */
+  @Override
+  public void close() {
+    try {
+      super.close();
+    } finally {
+      buf.close();
+    }
+  }
+
public void limit(int newLimit) {
this.limit = newLimit;
}
diff --git
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/AutoScalingBufferWriteTransport.java
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/AutoScalingBufferWriteTransport.java
index ae833fad239..24278067fcd 100644
---
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/AutoScalingBufferWriteTransport.java
+++
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/AutoScalingBufferWriteTransport.java
@@ -22,6 +22,8 @@ package org.apache.iotdb.rpc;
import org.apache.thrift.TConfiguration;
import org.apache.thrift.transport.TTransportException;
+import java.io.IOException;
+
/**
* Note that this class is mainly copied from class {@link
* org.apache.thrift.transport.AutoExpandingBufferWriteTransport}. since that
class does not support
@@ -32,7 +34,7 @@ public class AutoScalingBufferWriteTransport extends
NonOpenTransport {
private final AutoResizingBuffer buf;
private int pos;
- public AutoScalingBufferWriteTransport(int initialCapacity) {
+ public AutoScalingBufferWriteTransport(int initialCapacity) throws
IOException {
this.buf = new AutoResizingBuffer(initialCapacity);
this.pos = 0;
}
@@ -43,8 +45,12 @@ public class AutoScalingBufferWriteTransport extends
NonOpenTransport {
}
@Override
- public void write(byte[] toWrite, int off, int len) {
- buf.resizeIfNecessary(pos + len);
+ public void write(byte[] toWrite, int off, int len) throws
TTransportException {
+ try {
+ buf.resizeIfNecessary(pos + len);
+ } catch (IOException e) {
+ throw new TTransportException(e);
+ }
System.arraycopy(toWrite, off, buf.array(), pos, len);
pos += len;
}
@@ -57,7 +63,13 @@ public class AutoScalingBufferWriteTransport extends
NonOpenTransport {
pos = 0;
}
- public void resizeIfNecessary(int size) {
+  /**
+   * Closes this transport and its backing buffer. The buffer is closed in a {@code finally} block
+   * so that it is still closed when {@code super.close()} throws.
+   */
+  @Override
+  public void close() {
+    try {
+      super.close();
+    } finally {
+      buf.close();
+    }
+  }
+
+ public void resizeIfNecessary(int size) throws IOException {
buf.resizeIfNecessary(size);
}
diff --git
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TCompressedElasticFramedTransport.java
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TCompressedElasticFramedTransport.java
index 62abc28e470..88c9683db97 100644
---
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TCompressedElasticFramedTransport.java
+++
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TCompressedElasticFramedTransport.java
@@ -34,10 +34,27 @@ public abstract class TCompressedElasticFramedTransport
extends TElasticFramedTr
TTransport underlying,
int thriftDefaultBufferSize,
int thriftMaxFrameSize,
- boolean copyBinary) {
+ boolean copyBinary)
+ throws TTransportException {
super(underlying, thriftDefaultBufferSize, thriftMaxFrameSize, copyBinary);
- writeCompressBuffer = new
AutoScalingBufferWriteTransport(thriftDefaultBufferSize);
- readCompressBuffer = new
AutoScalingBufferReadTransport(thriftDefaultBufferSize);
+ try {
+ writeCompressBuffer = new
AutoScalingBufferWriteTransport(thriftDefaultBufferSize);
+ readCompressBuffer = new
AutoScalingBufferReadTransport(thriftDefaultBufferSize);
+ } catch (IOException e) {
+ closeAllocatedBuffers();
+ throw new TTransportException(e);
+ }
+ }
+
+ @Override
+ protected void closeAllocatedBuffers() { // closes the base buffers plus the two compression buffers
+ super.closeAllocatedBuffers();
+ if (writeCompressBuffer != null) { // still unassigned when a constructor's catch block calls this after a failed allocation
+ writeCompressBuffer.close();
+ }
+ if (readCompressBuffer != null) { // same: the read compress buffer is created last
+ readCompressBuffer.close();
+ }
}
@Override
@@ -80,7 +97,11 @@ public abstract class TCompressedElasticFramedTransport
extends TElasticFramedTr
writeBuffer.reset();
if (thriftDefaultBufferSize < length) {
- writeBuffer.resizeIfNecessary(thriftDefaultBufferSize);
+ try {
+ writeBuffer.resizeIfNecessary(thriftDefaultBufferSize);
+ } catch (IOException e) {
+ throw new TTransportException(e);
+ }
}
underlying.flush();
}
diff --git
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TElasticFramedTransport.java
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TElasticFramedTransport.java
index 6a174f86b6a..db3d2cc7d95 100644
---
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TElasticFramedTransport.java
+++
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TElasticFramedTransport.java
@@ -32,6 +32,7 @@ import javax.net.ssl.SSLException;
import javax.net.ssl.SSLHandshakeException;
import java.io.EOFException;
+import java.io.IOException;
import java.net.SocketAddress;
import java.net.SocketException;
import java.net.SocketTimeoutException;
@@ -74,7 +75,7 @@ public class TElasticFramedTransport extends TTransport {
}
@Override
- public TTransport getTransport(TTransport trans) {
+ public TTransport getTransport(TTransport trans) throws
TTransportException {
return new TElasticFramedTransport(
trans, thriftDefaultBufferSize, thriftMaxFrameSize, copyBinary);
}
@@ -84,13 +85,19 @@ public class TElasticFramedTransport extends TTransport {
TTransport underlying,
int thriftDefaultBufferSize,
int thriftMaxFrameSize,
- boolean copyBinary) {
+ boolean copyBinary)
+ throws TTransportException {
this.underlying = underlying;
this.thriftDefaultBufferSize = thriftDefaultBufferSize;
this.thriftMaxFrameSize = thriftMaxFrameSize;
this.copyBinary = copyBinary;
- readBuffer = new AutoScalingBufferReadTransport(thriftDefaultBufferSize);
- writeBuffer = new AutoScalingBufferWriteTransport(thriftDefaultBufferSize);
+ try {
+ readBuffer = new AutoScalingBufferReadTransport(thriftDefaultBufferSize);
+ writeBuffer = new
AutoScalingBufferWriteTransport(thriftDefaultBufferSize);
+ } catch (IOException e) {
+ closeAllocatedBuffers();
+ throw new TTransportException(e);
+ }
}
protected final int thriftDefaultBufferSize;
@@ -115,7 +122,20 @@ public class TElasticFramedTransport extends TTransport {
@Override
public void close() {
- underlying.close();
+ try {
+ underlying.close();
+ } finally {
+ closeAllocatedBuffers(); // runs even when underlying.close() throws, so the read/write buffers are always closed
+ }
+ }
+
+ protected void closeAllocatedBuffers() { // called from close() and from the constructor's catch block
+ if (readBuffer != null) { // unassigned if the constructor failed while creating it
+ readBuffer.close();
+ }
+ if (writeBuffer != null) { // unassigned if the constructor failed before or while creating it
+ writeBuffer.close();
+ }
}
@Override
@@ -263,7 +283,11 @@ public class TElasticFramedTransport extends TTransport {
underlying.write(writeBuffer.getBuffer(), 0, length);
writeBuffer.reset();
if (length > thriftDefaultBufferSize) {
- writeBuffer.resizeIfNecessary(thriftDefaultBufferSize);
+ try {
+ writeBuffer.resizeIfNecessary(thriftDefaultBufferSize);
+ } catch (IOException e) {
+ throw new TTransportException(e);
+ }
}
underlying.flush();
}
@@ -292,7 +316,7 @@ public class TElasticFramedTransport extends TTransport {
}
@Override
- public void write(byte[] buf, int off, int len) {
+ public void write(byte[] buf, int off, int len) throws TTransportException {
writeBuffer.write(buf, off, len);
}
diff --git
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSnappyElasticFramedTransport.java
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSnappyElasticFramedTransport.java
index 79ebc7983e3..fd9d4ffbba3 100644
---
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSnappyElasticFramedTransport.java
+++
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSnappyElasticFramedTransport.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.rpc;
import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
import org.xerial.snappy.Snappy;
import java.io.IOException;
@@ -41,13 +42,13 @@ public class TSnappyElasticFramedTransport extends
TCompressedElasticFramedTrans
}
@Override
- public TTransport getTransport(TTransport trans) {
+ public TTransport getTransport(TTransport trans) throws
TTransportException {
return new TSnappyElasticFramedTransport(
trans, thriftDefaultBufferSize, thriftMaxFrameSize, copyBinary);
}
}
- public TSnappyElasticFramedTransport(TTransport underlying) {
+ public TSnappyElasticFramedTransport(TTransport underlying) throws
TTransportException {
this(underlying, RpcUtils.THRIFT_DEFAULT_BUF_CAPACITY,
RpcUtils.THRIFT_FRAME_MAX_SIZE, true);
}
@@ -55,7 +56,8 @@ public class TSnappyElasticFramedTransport extends
TCompressedElasticFramedTrans
TTransport underlying,
int thriftDefaultBufferSize,
int thriftMaxFrameSize,
- boolean copyBinary) {
+ boolean copyBinary)
+ throws TTransportException {
super(underlying, thriftDefaultBufferSize, thriftMaxFrameSize, copyBinary);
}
diff --git
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TimeoutChangeableTFastFramedTransport.java
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TimeoutChangeableTFastFramedTransport.java
index 77c2bbf6b97..505e1a99405 100644
---
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TimeoutChangeableTFastFramedTransport.java
+++
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TimeoutChangeableTFastFramedTransport.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.rpc;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
import org.apache.thrift.transport.TTransportFactory;
import java.net.SocketException;
@@ -31,7 +32,8 @@ public class TimeoutChangeableTFastFramedTransport extends
TElasticFramedTranspo
private final TSocket underlyingSocket;
public TimeoutChangeableTFastFramedTransport(
- TSocket underlying, int thriftDefaultBufferSize, int thriftMaxFrameSize,
boolean copyBinary) {
+ TSocket underlying, int thriftDefaultBufferSize, int thriftMaxFrameSize,
boolean copyBinary)
+ throws TTransportException {
super(underlying, thriftDefaultBufferSize, thriftMaxFrameSize, copyBinary);
this.underlyingSocket = underlying;
}
@@ -65,7 +67,7 @@ public class TimeoutChangeableTFastFramedTransport extends
TElasticFramedTranspo
}
@Override
- public TTransport getTransport(TTransport trans) {
+ public TTransport getTransport(TTransport trans) throws
TTransportException {
if (trans instanceof TSocket) {
return new TimeoutChangeableTFastFramedTransport(
(TSocket) trans, thriftDefaultBufferSize, thriftMaxFrameSize,
copyBinary);
diff --git
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TimeoutChangeableTSnappyFramedTransport.java
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TimeoutChangeableTSnappyFramedTransport.java
index 168f52662aa..ecd807e38d6 100644
---
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TimeoutChangeableTSnappyFramedTransport.java
+++
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TimeoutChangeableTSnappyFramedTransport.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.rpc;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
import org.apache.thrift.transport.TTransportFactory;
import java.net.SocketException;
@@ -31,7 +32,8 @@ public class TimeoutChangeableTSnappyFramedTransport extends
TSnappyElasticFrame
private final TSocket underlyingSocket;
public TimeoutChangeableTSnappyFramedTransport(
- TSocket underlying, int thriftDefaultBufferSize, int thriftMaxFrameSize,
boolean copyBinary) {
+ TSocket underlying, int thriftDefaultBufferSize, int thriftMaxFrameSize,
boolean copyBinary)
+ throws TTransportException {
super(underlying, thriftDefaultBufferSize, thriftMaxFrameSize, copyBinary);
this.underlyingSocket = underlying;
}
@@ -65,7 +67,7 @@ public class TimeoutChangeableTSnappyFramedTransport extends
TSnappyElasticFrame
}
@Override
- public TTransport getTransport(TTransport trans) {
+ public TTransport getTransport(TTransport trans) throws
TTransportException {
if (trans instanceof TSocket) {
return new TimeoutChangeableTSnappyFramedTransport(
(TSocket) trans, thriftDefaultBufferSize, thriftMaxFrameSize,
copyBinary);
diff --git
a/iotdb-client/service-rpc/src/test/java/org/apache/iotdb/rpc/AutoResizingBufferTest.java
b/iotdb-client/service-rpc/src/test/java/org/apache/iotdb/rpc/AutoResizingBufferTest.java
new file mode 100644
index 00000000000..800881208ee
--- /dev/null
+++
b/iotdb-client/service-rpc/src/test/java/org/apache/iotdb/rpc/AutoResizingBufferTest.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.rpc;
+
+import org.apache.thrift.transport.TMemoryBuffer;
+import org.apache.thrift.transport.TTransportException;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class AutoResizingBufferTest { // checks the memory accounting of AutoResizingBuffer and the transports built on it
+
+ private final TestMemoryControl memoryControl = new TestMemoryControl();
+
+ @After
+ public void tearDown() {
+ AutoResizingBufferMemoryManager.resetMemoryControl(); // back to the no-op control, default interval and zeroed counters
+ }
+
+ @Test
+ public void testAllocateAndReleaseMemoryWhenResizing() throws Exception {
+ AutoResizingBufferMemoryManager.setMemoryControl(memoryControl);
+
+ AutoResizingBuffer buffer = new AutoResizingBuffer(100);
+ Assert.assertEquals(100, memoryControl.getUsedMemoryInBytes()); // the initial capacity is expected to be accounted
+
+ buffer.resizeIfNecessary(200);
+ Assert.assertEquals(200, buffer.array().length);
+ Assert.assertEquals(200, memoryControl.getUsedMemoryInBytes());
+
+ setLastShrinkTime(buffer, System.currentTimeMillis() -
RpcUtils.MIN_SHRINK_INTERVAL - 1); // presumably backdates the last shrink so the loop below may shrink - confirm in AutoResizingBuffer
+ for (int i = 0; i <= RpcUtils.MAX_BUFFER_OVERSIZE_TIME; i++) {
+ buffer.resizeIfNecessary(101);
+ }
+ Assert.assertEquals(150, buffer.array().length);
+ Assert.assertEquals(150, memoryControl.getUsedMemoryInBytes()); // accounting is expected to follow the shrunken array
+
+ buffer.close();
+ Assert.assertEquals(0, memoryControl.getUsedMemoryInBytes());
+ }
+
+ @Test
+ public void testThrowIOExceptionWhenMemoryIsInsufficient() throws Exception {
+ memoryControl.setLimit(120); // the initial 100 bytes fit, growing to 200 does not
+ AutoResizingBufferMemoryManager.setMemoryControl(memoryControl);
+ AutoResizingBufferMemoryManager.setMemoryAllocateRetryIntervalInMs(1); // keep the retry waits short
+
+ AutoResizingBuffer buffer = new AutoResizingBuffer(100);
+ try {
+ buffer.resizeIfNecessary(200);
+ Assert.fail("Expected IOException");
+ } catch (IOException e) {
+ Assert.assertTrue(e.getMessage().contains("100"));
+ Assert.assertTrue(e.getMessage().contains("5"));
+ } finally {
+ Assert.assertEquals(100, memoryControl.getUsedMemoryInBytes()); // the failed resize must leave the first reservation untouched
+ buffer.close();
+ }
+ }
+
+ @Test
+ public void testMemoryAllocationMetrics() throws Exception {
+ memoryControl.setLimit(120); // the initial 100 bytes fit, growing to 200 does not
+ AutoResizingBufferMemoryManager.setMemoryControl(memoryControl);
+ AutoResizingBufferMemoryManager.setMemoryAllocateRetryIntervalInMs(1); // keep the retry waits short
+
+ AutoResizingBuffer buffer = new AutoResizingBuffer(100);
+ Assert.assertEquals(1,
AutoResizingBufferMemoryManager.getMemoryAllocationCount());
+ Assert.assertEquals(0,
AutoResizingBufferMemoryManager.getMemoryAllocationFailureCount());
+
+ try {
+ buffer.resizeIfNecessary(200);
+ Assert.fail("Expected IOException");
+ } catch (IOException ignored) {
+ Assert.assertEquals(1,
AutoResizingBufferMemoryManager.getMemoryAllocationCount());
+ Assert.assertEquals(1,
AutoResizingBufferMemoryManager.getMemoryAllocationFailureCount()); // one failed request, however many attempts it took
+ } finally {
+ buffer.close();
+ }
+ }
+
+ @Test
+ public void testElasticFramedTransportReleasesMemoryWhenClosed() throws
Exception {
+ AutoResizingBufferMemoryManager.setMemoryControl(memoryControl);
+
+ TElasticFramedTransport transport =
+ new TElasticFramedTransport(new TMemoryBuffer(0), 100, 1024, true);
+ Assert.assertEquals(200, memoryControl.getUsedMemoryInBytes()); // read buffer + write buffer, 100 bytes each
+
+ transport.close();
+ Assert.assertEquals(0, memoryControl.getUsedMemoryInBytes());
+ }
+
+ @Test
+ public void testElasticFramedTransportReleasesMemoryWhenConstructorFails() {
+ memoryControl.setLimit(150); // room for the first 100-byte buffer only
+ AutoResizingBufferMemoryManager.setMemoryControl(memoryControl);
+ AutoResizingBufferMemoryManager.setMemoryAllocateRetryIntervalInMs(1); // keep the retry waits short
+
+ try {
+ new TElasticFramedTransport(new TMemoryBuffer(0), 100, 1024, true);
+ Assert.fail("Expected TTransportException");
+ } catch (TTransportException e) {
+ Assert.assertEquals(0, memoryControl.getUsedMemoryInBytes()); // the buffer that was reserved must have been given back
+ }
+ }
+
+ @Test
+ public void testSnappyElasticFramedTransportReleasesMemory() throws
Exception {
+ AutoResizingBufferMemoryManager.setMemoryControl(memoryControl);
+
+ TSnappyElasticFramedTransport transport =
+ new TSnappyElasticFramedTransport(new TMemoryBuffer(0), 100, 1024,
true);
+ Assert.assertEquals(400, memoryControl.getUsedMemoryInBytes()); // read, write and the two compress buffers, 100 bytes each
+
+ transport.close();
+ Assert.assertEquals(0, memoryControl.getUsedMemoryInBytes());
+ }
+
+ @Test
+ public void
testSnappyElasticFramedTransportReleasesMemoryWhenConstructorFails() {
+ memoryControl.setLimit(350); // room for three of the four 100-byte buffers
+ AutoResizingBufferMemoryManager.setMemoryControl(memoryControl);
+ AutoResizingBufferMemoryManager.setMemoryAllocateRetryIntervalInMs(1); // keep the retry waits short
+
+ try {
+ new TSnappyElasticFramedTransport(new TMemoryBuffer(0), 100, 1024, true);
+ Assert.fail("Expected TTransportException");
+ } catch (TTransportException e) {
+ Assert.assertEquals(0, memoryControl.getUsedMemoryInBytes()); // all three reserved buffers must have been given back
+ }
+ }
+
+ private void setLastShrinkTime(AutoResizingBuffer buffer, long
lastShrinkTime) throws Exception {
+ Field field = AutoResizingBuffer.class.getDeclaredField("lastShrinkTime"); // private field, set reflectively
+ field.setAccessible(true);
+ field.set(buffer, lastShrinkTime);
+ }
+
+ private static class TestMemoryControl implements
AutoResizingBufferMemoryControl { // counts reserved bytes and refuses requests beyond an optional limit
+
+ private final AtomicLong usedMemoryInBytes = new AtomicLong();
+ private long limit = Long.MAX_VALUE;
+
+ @Override
+ public boolean allocate(long sizeInBytes) {
+ while (true) { // compare-and-set loop: retried only when another update raced with this one
+ long current = usedMemoryInBytes.get();
+ long next = current + sizeInBytes;
+ if (next > limit) {
+ return false; // refuse rather than exceed the limit
+ }
+ if (usedMemoryInBytes.compareAndSet(current, next)) {
+ return true;
+ }
+ }
+ }
+
+ @Override
+ public void release(long sizeInBytes) {
+ usedMemoryInBytes.addAndGet(-sizeInBytes);
+ }
+
+ private long getUsedMemoryInBytes() {
+ return usedMemoryInBytes.get();
+ }
+
+ private void setLimit(long limit) {
+ this.limit = limit;
+ }
+ }
+}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
index da766541786..d3effb28e9b 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
@@ -34,6 +34,7 @@ import
org.apache.iotdb.commons.exception.ConfigurationException;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.exception.IoTDBException;
import org.apache.iotdb.commons.exception.StartupException;
+import org.apache.iotdb.commons.memory.MemoryConfig;
import org.apache.iotdb.commons.service.JMXService;
import org.apache.iotdb.commons.service.RegisterManager;
import org.apache.iotdb.commons.service.ServiceType;
@@ -142,6 +143,7 @@ public class ConfigNode extends ServerCommandLine
implements ConfigNodeMBean {
LOGGER.info(ConfigNodeMessages.STARTING_IOTDB,
IoTDBConstant.VERSION_WITH_BUILD);
ConfigNodeStartupCheck checks = new
ConfigNodeStartupCheck(IoTDBConstant.CN_ROLE);
checks.startUpCheck();
+ MemoryConfig.getInstance();
} catch (StartupException | ConfigurationException | IOException e) {
LOGGER.error(ConfigNodeMessages.MEET_ERROR_WHEN_DOING_START_CHECKING, e);
throw new IoTDBException(ConfigNodeMessages.ERROR_STARTING, -1);
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/utils/Utils.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/utils/Utils.java
index 442ac484064..cb71ed3fa4d 100644
---
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/utils/Utils.java
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/utils/Utils.java
@@ -58,8 +58,10 @@ import javax.net.ssl.TrustManager;
import java.io.File;
import java.io.FileNotFoundException;
+import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.AccessDeniedException;
+import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
@@ -184,10 +186,23 @@ public class Utils {
public static ByteBuffer serializeTSStatus(TSStatus status) throws
TException {
AutoScalingBufferWriteTransport byteBuffer =
- new AutoScalingBufferWriteTransport(TEMP_BUFFER_SIZE);
- TCompactProtocol protocol = new TCompactProtocol(byteBuffer);
- status.write(protocol);
- return ByteBuffer.wrap(byteBuffer.getBuffer());
+ createAutoScalingBufferWriteTransport(TEMP_BUFFER_SIZE);
+ try {
+ TCompactProtocol protocol = new TCompactProtocol(byteBuffer);
+ status.write(protocol);
+ return ByteBuffer.wrap(Arrays.copyOf(byteBuffer.getBuffer(),
byteBuffer.getPos())); // copies only the bytes written so far; the transport itself is closed in the finally block
+ } finally {
+ byteBuffer.close(); // closes the transport's backing buffer on success and on failure
+ }
+ }
+
+ private static AutoScalingBufferWriteTransport
createAutoScalingBufferWriteTransport(
+ int initialCapacity) throws TException {
+ try {
+ return new AutoScalingBufferWriteTransport(initialCapacity);
+ } catch (IOException e) {
+ throw new TException(e); // adapts the constructor's IOException to the TException that serializeTSStatus declares
+ }
}
public static TSStatus deserializeFrom(ByteBuffer buffer) throws TException {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/DataNodeMemoryConfig.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/DataNodeMemoryConfig.java
index 3fd49c707fe..07e31454691 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/DataNodeMemoryConfig.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/DataNodeMemoryConfig.java
@@ -168,6 +168,7 @@ public class DataNodeMemoryConfig {
long schemaEngineMemorySize = Runtime.getRuntime().maxMemory() / 10;
long consensusMemorySize = Runtime.getRuntime().maxMemory() / 10;
long pipeMemorySize = Runtime.getRuntime().maxMemory() / 10;
+ long autoResizingBufferMemorySize = Runtime.getRuntime().maxMemory() / 20;
if (memoryAllocateProportion != null) {
String[] proportions = memoryAllocateProportion.split(":");
int proportionSum = 0;
@@ -189,6 +190,11 @@ public class DataNodeMemoryConfig {
if (proportions.length >= 6) {
pipeMemorySize =
maxMemoryAvailable * Integer.parseInt(proportions[4].trim()) /
proportionSum;
+ autoResizingBufferMemorySize =
+ maxMemoryAvailable
+ * Integer.parseInt(proportions[proportions.length -
1].trim())
+ / proportionSum
+ / 2;
} else {
pipeMemorySize =
(maxMemoryAvailable
@@ -211,6 +217,8 @@ public class DataNodeMemoryConfig {
consensusMemoryManager =
onHeapMemoryManager.getOrCreateMemoryManager("Consensus",
consensusMemorySize);
pipeMemoryManager = onHeapMemoryManager.getOrCreateMemoryManager("Pipe",
pipeMemorySize);
+ MemoryConfig.getInstance()
+ .setAutoResizingBufferMemoryControl(onHeapMemoryManager,
autoResizingBufferMemorySize);
LOGGER.info(
"initial allocateMemoryForWrite = {}",
storageEngineMemoryManager.getTotalMemorySizeInBytes());
@@ -224,6 +232,7 @@ public class DataNodeMemoryConfig {
consensusMemoryManager.getTotalMemorySizeInBytes());
LOGGER.info(
"initial allocateMemoryForPipe = {}",
pipeMemoryManager.getTotalMemorySizeInBytes());
+ LOGGER.info("initial allocateMemoryForAutoResizingBuffer = {}",
autoResizingBufferMemorySize);
initSchemaMemoryAllocate(schemaEngineMemoryManager, properties);
initStorageEngineAllocate(storageEngineMemoryManager, properties);
diff --git
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
index facf07fb85f..f176c0e1761 100644
---
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
+++
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
@@ -796,6 +796,7 @@ partition_table_recover_max_read_megabytes_per_second=10
# Memory Allocation Ratio: StorageEngine, QueryEngine, SchemaEngine,
Consensus, StreamingEngine and Free Memory.
# The parameter form is a:b:c:d:e:f, where a, b, c, d, e and f are integers.
for example: 1:1:1:1:1:1 , 6:2:1:1:1:1
# If you have high level of writing pressure and low level of reading
pressure, please adjust it to for example 6:1:1:1:1:1
+# Half of the Free Memory share (the last value, f) is reserved as the budget for AutoResizingBuffer memory control.
# effectiveMode: restart
datanode_memory_proportion=3:3:1:1:1:1
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index 824ec639ef6..eb24b0eeef8 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -335,7 +335,6 @@ public class CommonDescriptor {
properties.getProperty(
"cluster_device_limit_threshold",
String.valueOf(config.getDeviceLimitThreshold()))));
-
config.setPathLogMaxSize(
Integer.parseInt(
properties.getProperty(
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/memory/AutoResizingBufferMemoryMetrics.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/memory/AutoResizingBufferMemoryMetrics.java
new file mode 100644
index 00000000000..4cc3fd3cd27
--- /dev/null
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/memory/AutoResizingBufferMemoryMetrics.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.memory;
+
+import org.apache.iotdb.commons.service.metric.enums.Metric;
+import org.apache.iotdb.commons.service.metric.enums.Tag;
+import org.apache.iotdb.metrics.AbstractMetricService;
+import org.apache.iotdb.metrics.metricsets.IMetricSet;
+import org.apache.iotdb.metrics.utils.MetricLevel;
+import org.apache.iotdb.metrics.utils.MetricType;
+import org.apache.iotdb.rpc.AutoResizingBufferMemoryManager;
+
+/**
+ * Metric set for AutoResizingBuffer memory control. It publishes the allocation and
+ * allocation-failure counters read from {@link AutoResizingBufferMemoryManager}, plus the total,
+ * used and available byte sizes read from the {@link MemoryConfig} given at construction.
+ *
+ * <p>All five gauges are registered under {@code Metric.THRIFT_RPC_MEMORY_USAGE} and are told
+ * apart by the value attached to {@code Tag.NAME}.
+ */
+public class AutoResizingBufferMemoryMetrics implements IMetricSet {
+
+  private static final String ALLOCATION_COUNT = "auto_resizing_buffer_allocation_count";
+  private static final String ALLOCATION_FAILURE_COUNT =
+      "auto_resizing_buffer_allocation_failure_count";
+  private static final String TOTAL_MEMORY = "auto_resizing_buffer_total_memory";
+  private static final String USED_MEMORY = "auto_resizing_buffer_used_memory";
+  private static final String AVAILABLE_MEMORY = "auto_resizing_buffer_available_memory";
+
+  private final MemoryConfig memoryConfig;
+
+  /**
+   * @param memoryConfig source of the total/used/available size gauges
+   */
+  public AutoResizingBufferMemoryMetrics(MemoryConfig memoryConfig) {
+    this.memoryConfig = memoryConfig;
+  }
+
+  /** Registers the five auto gauges on {@code metricService}. */
+  @Override
+  public void bindTo(AbstractMetricService metricService) {
+    final String metricName = Metric.THRIFT_RPC_MEMORY_USAGE.toString();
+    final String nameTag = Tag.NAME.toString();
+
+    // The two counters are read through static accessors; the lambda ignores the bound Class.
+    metricService.createAutoGauge(
+        metricName,
+        MetricLevel.IMPORTANT,
+        AutoResizingBufferMemoryManager.class,
+        ignored -> AutoResizingBufferMemoryManager.getMemoryAllocationCount(),
+        nameTag,
+        ALLOCATION_COUNT);
+    metricService.createAutoGauge(
+        metricName,
+        MetricLevel.IMPORTANT,
+        AutoResizingBufferMemoryManager.class,
+        ignored -> AutoResizingBufferMemoryManager.getMemoryAllocationFailureCount(),
+        nameTag,
+        ALLOCATION_FAILURE_COUNT);
+
+    // The three size gauges are read from the MemoryConfig instance itself.
+    metricService.createAutoGauge(
+        metricName,
+        MetricLevel.IMPORTANT,
+        memoryConfig,
+        MemoryConfig::getAutoResizingBufferMemoryTotalSizeInBytes,
+        nameTag,
+        TOTAL_MEMORY);
+    metricService.createAutoGauge(
+        metricName,
+        MetricLevel.IMPORTANT,
+        memoryConfig,
+        MemoryConfig::getAutoResizingBufferMemoryUsedSizeInBytes,
+        nameTag,
+        USED_MEMORY);
+    metricService.createAutoGauge(
+        metricName,
+        MetricLevel.IMPORTANT,
+        memoryConfig,
+        MemoryConfig::getAutoResizingBufferMemoryAvailableSizeInBytes,
+        nameTag,
+        AVAILABLE_MEMORY);
+  }
+
+  /** Removes the gauges created by {@link #bindTo}, in the order they were registered. */
+  @Override
+  public void unbindFrom(AbstractMetricService metricService) {
+    final String[] gaugeNames = {
+      ALLOCATION_COUNT, ALLOCATION_FAILURE_COUNT, TOTAL_MEMORY, USED_MEMORY, AVAILABLE_MEMORY
+    };
+    for (String gaugeName : gaugeNames) {
+      metricService.remove(
+          MetricType.AUTO_GAUGE,
+          Metric.THRIFT_RPC_MEMORY_USAGE.toString(),
+          Tag.NAME.toString(),
+          gaugeName);
+    }
+  }
+}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/memory/MemoryConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/memory/MemoryConfig.java
index dc1ca4e5473..c828120eab5 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/memory/MemoryConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/memory/MemoryConfig.java
@@ -19,12 +19,24 @@
package org.apache.iotdb.commons.memory;
+import org.apache.iotdb.commons.service.metric.MetricService;
+import org.apache.iotdb.rpc.AutoResizingBufferMemoryControl;
+import org.apache.iotdb.rpc.AutoResizingBufferMemoryManager;
+
public class MemoryConfig {
+ private static final String AUTO_RESIZING_BUFFER_MEMORY_MANAGER_NAME =
"AutoResizingBuffer";
+ private static final String AUTO_RESIZING_BUFFER_MEMORY_BLOCK_NAME =
"AutoResizingBufferBlock";
+
private final MemoryManager globalMemoryManager =
new MemoryManager("GlobalMemoryManager", null,
Runtime.getRuntime().totalMemory());
+ private MemoryManager autoResizingBufferMemoryManagerParent;
+ private IMemoryBlock autoResizingBufferMemoryBlock;
+ private boolean isAutoResizingBufferMemoryControlEnabled;
+
private MemoryConfig() {
- // singleton
+ // Install the AutoResizingBuffer memory-control hook, then register the metric set that reads from this instance.
+ initAutoResizingBufferMemoryControl();
+ MetricService.getInstance().addMetricSet(new
AutoResizingBufferMemoryMetrics(this));
}
public static MemoryManager global() {
@@ -40,4 +52,77 @@ public class MemoryConfig {
private MemoryConfigHolder() {}
}
+
+ /**
+  * Hands {@link AutoResizingBufferMemoryManager} a memory control that forwards allocate and
+  * release requests to the current AutoResizingBuffer memory block. While no usable block is
+  * available, allocations are reported as successful and releases are ignored.
+  */
+ private void initAutoResizingBufferMemoryControl() {
+   final AutoResizingBufferMemoryControl control =
+       new AutoResizingBufferMemoryControl() {
+         @Override
+         public synchronized boolean allocate(long sizeInBytes) {
+           final IMemoryBlock block = getAutoResizingBufferMemoryBlock();
+           // No usable block means nothing to charge against, so the request is let through.
+           return block == null || block.allocate(sizeInBytes);
+         }
+
+         @Override
+         public synchronized void release(long sizeInBytes) {
+           final IMemoryBlock block = getAutoResizingBufferMemoryBlock();
+           if (block == null) {
+             return;
+           }
+           block.release(sizeInBytes);
+         }
+       };
+   AutoResizingBufferMemoryManager.setMemoryControl(control);
+ }
+
+ /**
+  * (Re)configures the memory budget behind AutoResizingBuffer memory control.
+  *
+  * <p>If an earlier call recorded a parent manager, its child manager named
+  * {@code AUTO_RESIZING_BUFFER_MEMORY_MANAGER_NAME} is released first. The new parent is then
+  * recorded and the current block reference is cleared. Control ends up disabled when
+  * {@code memorySizeInBytes <= 0} or when the parent returns no child manager; otherwise a block
+  * of {@code memorySizeInBytes} is requested from the child manager and control is enabled.
+  *
+  * @param parentMemoryManager manager under which the child manager is created
+  * @param memorySizeInBytes size requested for both the child manager and its block; a
+  *     non-positive value disables control
+  */
+ public synchronized void setAutoResizingBufferMemoryControl(
+ MemoryManager parentMemoryManager, long memorySizeInBytes) {
+ // Drop the child manager created under the previously recorded parent, if any.
+ if (autoResizingBufferMemoryManagerParent != null) {
+ autoResizingBufferMemoryManagerParent.releaseChildMemoryManager(
+ AUTO_RESIZING_BUFFER_MEMORY_MANAGER_NAME);
+ }
+ autoResizingBufferMemoryManagerParent = parentMemoryManager;
+ autoResizingBufferMemoryBlock = null;
+
+ // A non-positive budget switches memory control off.
+ if (memorySizeInBytes <= 0) {
+ isAutoResizingBufferMemoryControlEnabled = false;
+ return;
+ }
+
+ // NOTE(review): parentMemoryManager is dereferenced here without a null check; a null parent
+ // combined with a positive size would throw. Confirm callers never pass null.
+ MemoryManager autoResizingBufferMemoryManager =
+ parentMemoryManager.getOrCreateMemoryManager(
+ AUTO_RESIZING_BUFFER_MEMORY_MANAGER_NAME, memorySizeInBytes, true);
+ if (autoResizingBufferMemoryManager == null) {
+ isAutoResizingBufferMemoryControlEnabled = false;
+ return;
+ }
+ // The block requests the same size that was given to the child manager.
+ autoResizingBufferMemoryBlock =
+ autoResizingBufferMemoryManager.exactAllocate(
+ AUTO_RESIZING_BUFFER_MEMORY_BLOCK_NAME, memorySizeInBytes,
MemoryBlockType.DYNAMIC);
+ isAutoResizingBufferMemoryControlEnabled = true;
+ }
+
+ /**
+  * Returns the block that requests are charged against, or {@code null} when memory control is
+  * disabled, no block has been created, or the block has already been released.
+  */
+ private synchronized IMemoryBlock getAutoResizingBufferMemoryBlock() {
+   final boolean usable =
+       isAutoResizingBufferMemoryControlEnabled
+           && autoResizingBufferMemoryBlock != null
+           && !autoResizingBufferMemoryBlock.isReleased();
+   return usable ? autoResizingBufferMemoryBlock : null;
+ }
+
+ /**
+  * Returns the total size in bytes reported by the active AutoResizingBuffer memory block, or 0
+  * when there is no usable block.
+  */
+ public synchronized long getAutoResizingBufferMemoryTotalSizeInBytes() {
+   final IMemoryBlock block = getAutoResizingBufferMemoryBlock();
+   if (block == null) {
+     return 0;
+   }
+   return block.getTotalMemorySizeInBytes();
+ }
+
+ /**
+  * Returns the used size in bytes reported by the active AutoResizingBuffer memory block, or 0
+  * when there is no usable block.
+  */
+ public synchronized long getAutoResizingBufferMemoryUsedSizeInBytes() {
+   final IMemoryBlock block = getAutoResizingBufferMemoryBlock();
+   if (block == null) {
+     return 0;
+   }
+   return block.getUsedMemoryInBytes();
+ }
+
+ /**
+  * Returns the free size in bytes reported by the active AutoResizingBuffer memory block, or 0
+  * when there is no usable block.
+  */
+ public synchronized long getAutoResizingBufferMemoryAvailableSizeInBytes() {
+   final IMemoryBlock block = getAutoResizingBufferMemoryBlock();
+   if (block == null) {
+     return 0;
+   }
+   return block.getFreeMemoryInBytes();
+ }
}
diff --git a/pom.xml b/pom.xml
index 3e0aa631a45..ac80880bd42 100644
--- a/pom.xml
+++ b/pom.xml
@@ -770,6 +770,7 @@
<exclude>**/.gitmodules</exclude>
<exclude>**/.git-blame-ignore-revs</exclude>
<exclude>**/git.properties</exclude>
+ <exclude>AGENTS.md</exclude>
<!-- Maven related files -->
<exclude>**/target/**</exclude>
<!-- Eclipse related files -->