This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch preallocate_array_list
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/preallocate_array_list by this
push:
new 319ecca62a4 add gc throttle
319ecca62a4 is described below
commit 319ecca62a475cbdc288155fd1fa453b968ff5e7
Author: Tian Jiang <[email protected]>
AuthorDate: Tue Apr 22 16:53:48 2025 +0800
add gc throttle
---
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 19 ++++++++++++
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 15 +++++++++
.../protocol/thrift/impl/ClientRPCServiceImpl.java | 36 ++++++++++++++++++++++
.../db/utils/datastructure/AlignedTVList.java | 2 +-
4 files changed, 71 insertions(+), 1 deletion(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 00310772704..b5d5d186953 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -1151,6 +1151,9 @@ public class IoTDBConfig {
private CompressionType WALCompressionAlgorithm = CompressionType.LZ4;
+ // Master switch for GC-based request throttling; off by default. Loaded from the
+ // "enable_gc_throttle" property and checked first by ClientRPCServiceImpl.gcThrottle().
+ private boolean enableGCThrottle = false;
+ // Length of one throttling pause in milliseconds (default 1000). Loaded from the
+ // "gc_throttle_time_ms" property and handed to Thread.sleep by gcThrottle().
+ private int gcThrottleTimeMs = 1000;
+
IoTDBConfig() {}
public int getMaxLogEntriesNumPerBatch() {
@@ -4079,4 +4082,20 @@ public class IoTDBConfig {
public void setWALCompressionAlgorithm(CompressionType
WALCompressionAlgorithm) {
this.WALCompressionAlgorithm = WALCompressionAlgorithm;
}
+
+ /** Returns the length, in milliseconds, of the pause applied to a throttled request. */
+ public int getGcThrottleTimeMs() {
+ return gcThrottleTimeMs;
+ }
+
+ /**
+ * Sets the length, in milliseconds, of the pause applied to a throttled request. The value is
+ * stored as given; no range check is performed here.
+ */
+ public void setGcThrottleTimeMs(int gcThrottleTimeMs) {
+ this.gcThrottleTimeMs = gcThrottleTimeMs;
+ }
+
+ /** Returns whether GC-based request throttling is switched on. */
+ public boolean isEnableGCThrottle() {
+ return enableGCThrottle;
+ }
+
+ /** Switches GC-based request throttling on or off. */
+ public void setEnableGCThrottle(boolean enableGCThrottle) {
+ this.enableGCThrottle = enableGCThrottle;
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 2ac818de64c..e6e5377009d 100755
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -1074,6 +1074,21 @@ public class IoTDBDescriptor {
"detail_container_min_degrade_memory_in_bytes",
String.valueOf(conf.getDetailContainerMinDegradeMemoryInBytes()))));
+ // GC throttle settings. Each key falls back to the value conf already holds when the
+ // property is absent, so the defaults declared in IoTDBConfig stay in effect.
+ conf.setEnableGCThrottle(Boolean.parseBoolean(
+ properties.getProperty(
+ "enable_gc_throttle",
+ String.valueOf(conf.isEnableGCThrottle())
+ )
+ ));
+ // Integer.parseInt throws NumberFormatException for a non-numeric value; how that is
+ // handled depends on the enclosing method, which is not visible in this hunk.
+ conf.setGcThrottleTimeMs(
+ Integer.parseInt(
+ properties.getProperty(
+ "gc_throttle_time_ms",
+ String.valueOf(conf.getGcThrottleTimeMs())
+ )
+ )
+ );
+
loadIoTConsensusProps(properties);
loadIoTConsensusV2Props(properties);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
index 75581413082..e4b9f4514dc 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.protocol.thrift.impl;
+import java.util.Random;
import org.apache.iotdb.common.rpc.thrift.TAggregationType;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
@@ -42,6 +43,7 @@ import org.apache.iotdb.commons.path.MeasurementPath;
import org.apache.iotdb.commons.path.NonAlignedFullPath;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.schema.column.ColumnHeader;
+import org.apache.iotdb.commons.service.metric.JvmGcMonitorMetrics;
import org.apache.iotdb.commons.utils.PathUtils;
import org.apache.iotdb.commons.utils.TimePartitionUtils;
import org.apache.iotdb.db.audit.AuditLogger;
@@ -338,6 +340,8 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
quota =
DataNodeThrottleQuotaManager.getInstance()
.checkQuota(SESSION_MANAGER.getCurrSession().getUsername(), s);
+ gcThrottle();
+
statementType = s.getType();
if (ENABLE_AUDIT_LOG) {
AuditLogger.log(statement, s);
@@ -496,6 +500,7 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
quota =
DataNodeThrottleQuotaManager.getInstance()
.checkQuota(SESSION_MANAGER.getCurrSession().getUsername(), s);
+ gcThrottle();
if (ENABLE_AUDIT_LOG) {
AuditLogger.log(String.format("execute Raw Data Query: %s", req), s);
@@ -588,6 +593,7 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
quota =
DataNodeThrottleQuotaManager.getInstance()
.checkQuota(SESSION_MANAGER.getCurrSession().getUsername(), s);
+ gcThrottle();
if (ENABLE_AUDIT_LOG) {
AuditLogger.log(String.format("Last Data Query: %s", req), s);
@@ -682,6 +688,7 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
quota =
DataNodeThrottleQuotaManager.getInstance()
.checkQuota(SESSION_MANAGER.getCurrSession().getUsername(), s);
+ gcThrottle();
queryId = SESSION_MANAGER.requestQueryId(clientSession, req.statementId);
// create and cache dataset
@@ -1022,6 +1029,7 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
quota =
DataNodeThrottleQuotaManager.getInstance()
.checkQuota(SESSION_MANAGER.getCurrSession().getUsername(), s);
+ gcThrottle();
if (ENABLE_AUDIT_LOG) {
AuditLogger.log(String.format("Last Data Query: %s", req), s);
@@ -1713,6 +1721,7 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
quota =
DataNodeThrottleQuotaManager.getInstance()
.checkQuota(SESSION_MANAGER.getCurrSession().getUsername(), s);
+ gcThrottle();
if (ENABLE_AUDIT_LOG) {
AuditLogger.log(statement, s);
@@ -1916,6 +1925,7 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
quota =
DataNodeThrottleQuotaManager.getInstance()
.checkQuota(SESSION_MANAGER.getCurrSession().getUsername(),
statement);
+ gcThrottle();
// Step 2: call the coordinator
long queryId = SESSION_MANAGER.requestQueryId();
@@ -1985,6 +1995,7 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
quota =
DataNodeThrottleQuotaManager.getInstance()
.checkQuota(SESSION_MANAGER.getCurrSession().getUsername(),
statement);
+ gcThrottle();
// Step 2: call the coordinator
long queryId = SESSION_MANAGER.requestQueryId();
@@ -2055,6 +2066,7 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
quota =
DataNodeThrottleQuotaManager.getInstance()
.checkQuota(SESSION_MANAGER.getCurrSession().getUsername(),
statement);
+ gcThrottle();
// Step 2: call the coordinator
long queryId = SESSION_MANAGER.requestQueryId();
@@ -2127,6 +2139,7 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
quota =
DataNodeThrottleQuotaManager.getInstance()
.checkQuota(SESSION_MANAGER.getCurrSession().getUsername(),
statement);
+ gcThrottle();
// Step 2: call the coordinator
long queryId = SESSION_MANAGER.requestQueryId();
@@ -2197,6 +2210,7 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
quota =
DataNodeThrottleQuotaManager.getInstance()
.checkQuota(SESSION_MANAGER.getCurrSession().getUsername(),
statement);
+ gcThrottle();
// Step 2: call the coordinator
long queryId = SESSION_MANAGER.requestQueryId();
@@ -2261,6 +2275,7 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
quota =
DataNodeThrottleQuotaManager.getInstance()
.checkQuota(SESSION_MANAGER.getCurrSession().getUsername(),
statement);
+ gcThrottle();
// Step 2: call the coordinator
long queryId = SESSION_MANAGER.requestQueryId();
@@ -2340,6 +2355,7 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
quota =
DataNodeThrottleQuotaManager.getInstance()
.checkQuota(SESSION_MANAGER.getCurrSession().getUsername(),
statement);
+ gcThrottle();
long queryId = SESSION_MANAGER.requestQueryId();
ExecutionResult result =
@@ -2657,6 +2673,7 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
quota =
DataNodeThrottleQuotaManager.getInstance()
.checkQuota(SESSION_MANAGER.getCurrSession().getUsername(),
statement);
+ gcThrottle();
if (ENABLE_AUDIT_LOG) {
AuditLogger.log(String.format("execute Query: %s", statement),
statement);
@@ -2979,6 +2996,7 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
quota =
DataNodeThrottleQuotaManager.getInstance()
.checkQuota(SESSION_MANAGER.getCurrSession().getUsername(),
statement);
+ gcThrottle();
// Step 2: Call the coordinator
final long queryId = SESSION_MANAGER.requestQueryId();
@@ -3045,4 +3063,22 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
PipeDataNodeAgent.receiver().legacy().handleClientExit();
SubscriptionAgent.receiver().handleClientExit();
}
+
+  /** Source of the throttling coin flip; one instance is shared by every caller of gcThrottle(). */
+  private static final Random throttleRandom = new Random();
+
+  /** GC time percentage that must be exceeded before any request is throttled. */
+  private static final int MINIMUM_GC_TIME_PERCENT_TO_THROTTLE = 20;
+
+  /**
+   * Probabilistically pauses the calling thread while the JVM reports a high GC time percentage.
+   *
+   * <p>Returns immediately when GC throttling is disabled in the configuration. Otherwise, once
+   * the reported GC time percentage is above {@link #MINIMUM_GC_TIME_PERCENT_TO_THROTTLE}, the
+   * caller sleeps for the configured throttle time with a probability equal to that percentage.
+   * A non-positive throttle time disables the pause instead of letting {@link Thread#sleep(long)}
+   * reject a negative duration with an {@link IllegalArgumentException}.
+   *
+   * <p>If the thread is interrupted while sleeping, the interrupt flag is restored and the method
+   * returns early.
+   */
+  public static void gcThrottle() {
+    if (!IoTDBDescriptor.getInstance().getConfig().isEnableGCThrottle()) {
+      return;
+    }
+    long gcTimePercentage = JvmGcMonitorMetrics.getInstance().getGcData().getGcTimePercentage();
+    // Above the threshold, pause the request with a probability equal to the GC time percentage.
+    // The random draw only happens once the threshold has been exceeded.
+    if (gcTimePercentage <= MINIMUM_GC_TIME_PERCENT_TO_THROTTLE
+        || throttleRandom.nextInt(100) >= gcTimePercentage) {
+      return;
+    }
+    long pauseMs = IoTDBDescriptor.getInstance().getConfig().getGcThrottleTimeMs();
+    if (pauseMs <= 0) {
+      // Thread.sleep throws on a negative duration; treat a non-positive setting as "no pause".
+      return;
+    }
+    try {
+      Thread.sleep(pauseMs);
+    } catch (InterruptedException e) {
+      // Keep the interrupt visible to the caller instead of swallowing it.
+      Thread.currentThread().interrupt();
+    }
+  }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
index 80649eb182d..bd27e34ced2 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
@@ -686,8 +686,8 @@ public abstract class AlignedTVList extends TVList {
}
columnValues.clear();
}
- memoryBinaryChunkSize[i] = 0;
}
+ Arrays.fill(memoryBinaryChunkSize, 0);
}
@Override