This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch preallocate_array_list
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/preallocate_array_list by this 
push:
     new 319ecca62a4 add gc throttle
319ecca62a4 is described below

commit 319ecca62a475cbdc288155fd1fa453b968ff5e7
Author: Tian Jiang <[email protected]>
AuthorDate: Tue Apr 22 16:53:48 2025 +0800

    add gc throttle
---
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 19 ++++++++++++
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  | 15 +++++++++
 .../protocol/thrift/impl/ClientRPCServiceImpl.java | 36 ++++++++++++++++++++++
 .../db/utils/datastructure/AlignedTVList.java      |  2 +-
 4 files changed, 71 insertions(+), 1 deletion(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 00310772704..b5d5d186953 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -1151,6 +1151,9 @@ public class IoTDBConfig {
 
   private CompressionType WALCompressionAlgorithm = CompressionType.LZ4;
 
+  private boolean enableGCThrottle = false;
+  private int gcThrottleTimeMs = 1000;
+
   IoTDBConfig() {}
 
   public int getMaxLogEntriesNumPerBatch() {
@@ -4079,4 +4082,20 @@ public class IoTDBConfig {
   public void setWALCompressionAlgorithm(CompressionType 
WALCompressionAlgorithm) {
     this.WALCompressionAlgorithm = WALCompressionAlgorithm;
   }
+
+  public int getGcThrottleTimeMs() {
+    return gcThrottleTimeMs;
+  }
+
+  public void setGcThrottleTimeMs(int gcThrottleTimeMs) {
+    this.gcThrottleTimeMs = gcThrottleTimeMs;
+  }
+
+  public boolean isEnableGCThrottle() {
+    return enableGCThrottle;
+  }
+
+  public void setEnableGCThrottle(boolean enableGCThrottle) {
+    this.enableGCThrottle = enableGCThrottle;
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 2ac818de64c..e6e5377009d 100755
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -1074,6 +1074,21 @@ public class IoTDBDescriptor {
                 "detail_container_min_degrade_memory_in_bytes",
                 
String.valueOf(conf.getDetailContainerMinDegradeMemoryInBytes()))));
 
+    conf.setEnableGCThrottle(Boolean.parseBoolean(
+        properties.getProperty(
+            "enable_gc_throttle",
+            String.valueOf(conf.isEnableGCThrottle())
+        )
+    ));
+    conf.setGcThrottleTimeMs(
+        Integer.parseInt(
+            properties.getProperty(
+                "gc_throttle_time_ms",
+                String.valueOf(conf.getGcThrottleTimeMs())
+            )
+        )
+    );
+
     loadIoTConsensusProps(properties);
     loadIoTConsensusV2Props(properties);
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
index 75581413082..e4b9f4514dc 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.protocol.thrift.impl;
 
+import java.util.Random;
 import org.apache.iotdb.common.rpc.thrift.TAggregationType;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
@@ -42,6 +43,7 @@ import org.apache.iotdb.commons.path.MeasurementPath;
 import org.apache.iotdb.commons.path.NonAlignedFullPath;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.schema.column.ColumnHeader;
+import org.apache.iotdb.commons.service.metric.JvmGcMonitorMetrics;
 import org.apache.iotdb.commons.utils.PathUtils;
 import org.apache.iotdb.commons.utils.TimePartitionUtils;
 import org.apache.iotdb.db.audit.AuditLogger;
@@ -338,6 +340,8 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
         quota =
             DataNodeThrottleQuotaManager.getInstance()
                 .checkQuota(SESSION_MANAGER.getCurrSession().getUsername(), s);
+        gcThrottle();
+
         statementType = s.getType();
         if (ENABLE_AUDIT_LOG) {
           AuditLogger.log(statement, s);
@@ -496,6 +500,7 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
       quota =
           DataNodeThrottleQuotaManager.getInstance()
               .checkQuota(SESSION_MANAGER.getCurrSession().getUsername(), s);
+      gcThrottle();
 
       if (ENABLE_AUDIT_LOG) {
         AuditLogger.log(String.format("execute Raw Data Query: %s", req), s);
@@ -588,6 +593,7 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
       quota =
           DataNodeThrottleQuotaManager.getInstance()
               .checkQuota(SESSION_MANAGER.getCurrSession().getUsername(), s);
+      gcThrottle();
 
       if (ENABLE_AUDIT_LOG) {
         AuditLogger.log(String.format("Last Data Query: %s", req), s);
@@ -682,6 +688,7 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
       quota =
           DataNodeThrottleQuotaManager.getInstance()
               .checkQuota(SESSION_MANAGER.getCurrSession().getUsername(), s);
+      gcThrottle();
 
       queryId = SESSION_MANAGER.requestQueryId(clientSession, req.statementId);
       // create and cache dataset
@@ -1022,6 +1029,7 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
       quota =
           DataNodeThrottleQuotaManager.getInstance()
               .checkQuota(SESSION_MANAGER.getCurrSession().getUsername(), s);
+      gcThrottle();
 
       if (ENABLE_AUDIT_LOG) {
         AuditLogger.log(String.format("Last Data Query: %s", req), s);
@@ -1713,6 +1721,7 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
             quota =
                 DataNodeThrottleQuotaManager.getInstance()
                     
.checkQuota(SESSION_MANAGER.getCurrSession().getUsername(), s);
+            gcThrottle();
 
             if (ENABLE_AUDIT_LOG) {
               AuditLogger.log(statement, s);
@@ -1916,6 +1925,7 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
       quota =
           DataNodeThrottleQuotaManager.getInstance()
               .checkQuota(SESSION_MANAGER.getCurrSession().getUsername(), 
statement);
+      gcThrottle();
 
       // Step 2: call the coordinator
       long queryId = SESSION_MANAGER.requestQueryId();
@@ -1985,6 +1995,7 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
       quota =
           DataNodeThrottleQuotaManager.getInstance()
               .checkQuota(SESSION_MANAGER.getCurrSession().getUsername(), 
statement);
+      gcThrottle();
 
       // Step 2: call the coordinator
       long queryId = SESSION_MANAGER.requestQueryId();
@@ -2055,6 +2066,7 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
       quota =
           DataNodeThrottleQuotaManager.getInstance()
               .checkQuota(SESSION_MANAGER.getCurrSession().getUsername(), 
statement);
+      gcThrottle();
 
       // Step 2: call the coordinator
       long queryId = SESSION_MANAGER.requestQueryId();
@@ -2127,6 +2139,7 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
       quota =
           DataNodeThrottleQuotaManager.getInstance()
               .checkQuota(SESSION_MANAGER.getCurrSession().getUsername(), 
statement);
+      gcThrottle();
 
       // Step 2: call the coordinator
       long queryId = SESSION_MANAGER.requestQueryId();
@@ -2197,6 +2210,7 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
       quota =
           DataNodeThrottleQuotaManager.getInstance()
               .checkQuota(SESSION_MANAGER.getCurrSession().getUsername(), 
statement);
+      gcThrottle();
 
       // Step 2: call the coordinator
       long queryId = SESSION_MANAGER.requestQueryId();
@@ -2261,6 +2275,7 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
       quota =
           DataNodeThrottleQuotaManager.getInstance()
               .checkQuota(SESSION_MANAGER.getCurrSession().getUsername(), 
statement);
+      gcThrottle();
 
       // Step 2: call the coordinator
       long queryId = SESSION_MANAGER.requestQueryId();
@@ -2340,6 +2355,7 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
       quota =
           DataNodeThrottleQuotaManager.getInstance()
               .checkQuota(SESSION_MANAGER.getCurrSession().getUsername(), 
statement);
+      gcThrottle();
 
       long queryId = SESSION_MANAGER.requestQueryId();
       ExecutionResult result =
@@ -2657,6 +2673,7 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
       quota =
           DataNodeThrottleQuotaManager.getInstance()
               .checkQuota(SESSION_MANAGER.getCurrSession().getUsername(), 
statement);
+      gcThrottle();
 
       if (ENABLE_AUDIT_LOG) {
         AuditLogger.log(String.format("execute Query: %s", statement), 
statement);
@@ -2979,6 +2996,7 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
       quota =
           DataNodeThrottleQuotaManager.getInstance()
               .checkQuota(SESSION_MANAGER.getCurrSession().getUsername(), 
statement);
+      gcThrottle();
 
       // Step 2: Call the coordinator
       final long queryId = SESSION_MANAGER.requestQueryId();
@@ -3045,4 +3063,22 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
     PipeDataNodeAgent.receiver().legacy().handleClientExit();
     SubscriptionAgent.receiver().handleClientExit();
   }
+
+  private static final Random throttleRandom = new Random();
+  public static void gcThrottle() {
+    if (!IoTDBDescriptor.getInstance().getConfig().isEnableGCThrottle()) {
+      return;
+    }
+    int minimumGcTimePercentToThrottle = 20;
+    long gcTimePercentage = 
JvmGcMonitorMetrics.getInstance().getGcData().getGcTimePercentage();
+    // If GC time is above 20%, pause the request with a probability equal to 
the GC time percentage.
+    if (gcTimePercentage > minimumGcTimePercentToThrottle && 
throttleRandom.nextInt(100) < gcTimePercentage) {
+        try {
+          
Thread.sleep(IoTDBDescriptor.getInstance().getConfig().getGcThrottleTimeMs());
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+        }
+      }
+
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
index 80649eb182d..bd27e34ced2 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
@@ -686,8 +686,8 @@ public abstract class AlignedTVList extends TVList {
         }
         columnValues.clear();
       }
-      memoryBinaryChunkSize[i] = 0;
     }
+    Arrays.fill(memoryBinaryChunkSize, 0);
   }
 
   @Override

Reply via email to