This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch ty/PrintQuery
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 32d9b4f56d82c827c87eda6aa407b1ff4a6a30f3
Author: JackieTien97 <[email protected]>
AuthorDate: Tue Dec 10 20:20:24 2024 +0800

    Sampling queries in each DN
---
 .../assembly/resources/conf/logback-datanode.xml   | 18 ++++++++++
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  | 15 +++++++++
 .../iotdb/db/queryengine/plan/Coordinator.java     | 20 +++++++++++
 .../conf/iotdb-system.properties.template          |  7 ++++
 .../apache/iotdb/commons/conf/CommonConfig.java    | 39 ++++++++++++++++++++++
 .../apache/iotdb/commons/conf/IoTDBConstant.java   |  2 ++
 6 files changed, 101 insertions(+)

diff --git 
a/iotdb-core/datanode/src/assembly/resources/conf/logback-datanode.xml 
b/iotdb-core/datanode/src/assembly/resources/conf/logback-datanode.xml
index 698bdfc3a82..e15038ea3ef 100644
--- a/iotdb-core/datanode/src/assembly/resources/conf/logback-datanode.xml
+++ b/iotdb-core/datanode/src/assembly/resources/conf/logback-datanode.xml
@@ -177,6 +177,21 @@
             <level>INFO</level>
         </filter>
     </appender>
+    <appender class="ch.qos.logback.core.rolling.RollingFileAppender" 
name="SAMPLED_QUERIES">
+        <file>${IOTDB_HOME}/logs/log_datanode_sampled_queries.log</file>
+        <rollingPolicy 
class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
+            
<fileNamePattern>${IOTDB_HOME}/logs/log-datanode-sampled-queries-%d{yyyyMMdd}.log.gz</fileNamePattern>
+            <maxHistory>30</maxHistory>
+        </rollingPolicy>
+        <append>true</append>
+        <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
+            <pattern>%d [%t] %-5p %C{25}:%L - %m %n</pattern>
+            <charset>utf-8</charset>
+        </encoder>
+        <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
+            <level>INFO</level>
+        </filter>
+    </appender>
     <appender class="ch.qos.logback.core.rolling.RollingFileAppender" 
name="COMPACTION">
         <file>${IOTDB_HOME}/logs/log_datanode_compaction.log</file>
         <rollingPolicy 
class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
@@ -245,6 +260,9 @@
     <logger level="info" name="SLOW_SQL">
         <appender-ref ref="SLOW_SQL"/>
     </logger>
+    <logger level="info" name="SAMPLED_QUERIES">
+        <appender-ref ref="SAMPLED_QUERIES"/>
+    </logger>
     <logger level="info" name="QUERY_FREQUENCY">
         <appender-ref ref="QUERY_FREQUENCY"/>
     </logger>
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index b652acf7329..3990da8e9cd 100755
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -2870,6 +2870,21 @@ public class IoTDBDescriptor {
       } else {
         BinaryAllocator.getInstance().close(true);
       }
+
+      // update query_sample_throughput_bytes_per_sec
+      commonDescriptor
+          .getConfig()
+          .setQuerySamplingRateLimit(
+              Integer.parseInt(
+                  Optional.ofNullable(
+                          properties.getProperty(
+                              "query_sample_throughput_bytes_per_sec",
+                              
ConfigurationFileUtils.getConfigurationDefaultValue(
+                                  "query_sample_throughput_bytes_per_sec")))
+                      .map(String::trim)
+                      .orElse(
+                          ConfigurationFileUtils.getConfigurationDefaultValue(
+                              "query_sample_throughput_bytes_per_sec"))));
     } catch (Exception e) {
       if (e instanceof InterruptedException) {
         Thread.currentThread().interrupt();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
index 05c00e5322e..098f9abd415 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
@@ -26,6 +26,8 @@ import 
org.apache.iotdb.commons.client.async.AsyncDataNodeInternalServiceClient;
 import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.commons.concurrent.ThreadName;
+import org.apache.iotdb.commons.conf.CommonConfig;
+import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -121,10 +123,14 @@ public class Coordinator {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(Coordinator.class);
   private static final int COORDINATOR_SCHEDULED_EXECUTOR_SIZE = 10;
   private static final IoTDBConfig CONFIG = 
IoTDBDescriptor.getInstance().getConfig();
+  private static final CommonConfig COMMON_CONFIG = 
CommonDescriptor.getInstance().getConfig();
 
   private static final Logger SLOW_SQL_LOGGER =
       LoggerFactory.getLogger(IoTDBConstant.SLOW_SQL_LOGGER_NAME);
 
+  private static final Logger SAMPLED_QUERIES_LOGGER =
+      LoggerFactory.getLogger(IoTDBConstant.SAMPLED_QUERIES_LOGGER_NAME);
+
   private static final IClientManager<TEndPoint, 
SyncDataNodeInternalServiceClient>
       SYNC_INTERNAL_SERVICE_CLIENT_MANAGER =
           new IClientManager.Factory<TEndPoint, 
SyncDataNodeInternalServiceClient>()
@@ -478,12 +484,26 @@ public class Coordinator {
         queryExecutionMap.remove(queryId);
         if (queryExecution.isQuery()) {
           long costTime = queryExecution.getTotalExecutionTime();
+          // print slow query
           if (costTime / 1_000_000 >= CONFIG.getSlowQueryThreshold()) {
             SLOW_SQL_LOGGER.info(
                 "Cost: {} ms, {}",
                 costTime / 1_000_000,
                 getContentOfRequest(nativeApiRequest, queryExecution));
           }
+
+          // sample query
+          if (COMMON_CONFIG.isEnableQuerySampling()) { // sampling is enabled
+            String queryRequest = getContentOfRequest(nativeApiRequest, 
queryExecution);
+            if (COMMON_CONFIG.isQuerySamplingHasRateLimit()) {
+              if 
(COMMON_CONFIG.getQuerySamplingRateLimiter().tryAcquire(queryRequest.length())) 
{
+                SAMPLED_QUERIES_LOGGER.info(queryRequest);
+              }
+            } else {
+              // no limit, always sampled
+              SAMPLED_QUERIES_LOGGER.info(queryRequest);
+            }
+          }
         }
       }
     }
diff --git 
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
 
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
index 2efae33aae9..7edd2bc8bd4 100644
--- 
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
+++ 
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
@@ -1022,6 +1022,13 @@ sort_buffer_size_in_bytes=1048576
 # Datatype: int
 merge_threshold_of_explain_analyze=10
 
+# The maximum throughput of query text that query sampling can record per second
+# 0 means no queries will be recorded
+# negative number means no limit, all queries will be recorded.
+# effectiveMode: hot_reload
+# Datatype: int, Unit: bytes
+query_sample_throughput_bytes_per_sec=16
+
 ####################
 ### TTL Configuration
 ####################
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 2d1585d6525..8ba0eed9107 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.commons.utils.FileUtils;
 import org.apache.iotdb.commons.utils.KillPoint.KillPoint;
 import org.apache.iotdb.rpc.RpcUtils;
 
+import com.google.common.util.concurrent.RateLimiter;
 import org.apache.tsfile.fileSystem.FSType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -343,6 +344,14 @@ public class CommonConfig {
 
   private volatile long remoteWriteMaxRetryDurationInMs = 60000;
 
+  private final RateLimiter querySamplingRateLimiter = RateLimiter.create(16);
+  // if query_sample_throughput_bytes_per_sec < 0, there is no rate limit and we sample all
+  // the queries
+  private volatile boolean querySamplingHasRateLimit = true;
+  // enableQuerySampling is false only when query_sample_throughput_bytes_per_sec == 0;
+  // any other value (positive or negative) enables query sampling
+  private volatile boolean enableQuerySampling = true;
+
   CommonConfig() {
     // Empty constructor
   }
@@ -1529,4 +1538,34 @@ public class CommonConfig {
   public void setLog2SizeClassGroup(int log2SizeClassGroup) {
     this.log2SizeClassGroup = log2SizeClassGroup;
   }
+
+  /**
+   * Applies a new value of {@code query_sample_throughput_bytes_per_sec}.
+   *
+   * <ul>
+   *   <li>positive: sampling is enabled and the rate limiter is set to this rate;
+   *   <li>zero: sampling is disabled (the rate limiter and the has-rate-limit flag are left
+   *       untouched);
+   *   <li>negative: sampling is enabled and the rate limit is switched off.
+   * </ul>
+   *
+   * @param querySamplingRateLimit query_sample_throughput_bytes_per_sec
+   */
+  public void setQuerySamplingRateLimit(int querySamplingRateLimit) {
+    if (querySamplingRateLimit > 0) {
+      this.querySamplingRateLimiter.setRate(querySamplingRateLimit);
+      this.enableQuerySampling = true;
+      this.querySamplingHasRateLimit = true;
+    } else if (querySamplingRateLimit == 0) {
+      // querySamplingRateLimit = 0, means that we sample no queries
+      this.enableQuerySampling = false;
+    } else {
+      // querySamplingRateLimit < 0, means that we need to full sample all 
queries
+      this.enableQuerySampling = true;
+      this.querySamplingHasRateLimit = false;
+    }
+  }
+
+  /**
+   * @return {@code true} (the initial value) when sampled queries are subject to the rate
+   *     limiter; {@code false} once a negative rate limit has been configured
+   */
+  public boolean isQuerySamplingHasRateLimit() {
+    return querySamplingHasRateLimit;
+  }
+
+  /**
+   * @return the rate limiter for query sampling; it is created with a rate of 16 and its rate is
+   *     updated when a positive rate limit is configured
+   */
+  public RateLimiter getQuerySamplingRateLimiter() {
+    return querySamplingRateLimiter;
+  }
+
+  /**
+   * @return {@code true} (the initial value) when query sampling is enabled; {@code false} once
+   *     a rate limit of 0 has been configured
+   */
+  public boolean isEnableQuerySampling() {
+    return enableQuerySampling;
+  }
 }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java
index c86900e5f99..de7aeae6e05 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java
@@ -108,6 +108,8 @@ public class IoTDBConstant {
 
   public static final String AUDIT_LOGGER_NAME = "IoTDB_AUDIT_LOGGER";
   public static final String SLOW_SQL_LOGGER_NAME = "SLOW_SQL";
+  public static final String SAMPLED_QUERIES_LOGGER_NAME = "SAMPLED_QUERIES";
+
   public static final String COMPACTION_LOGGER_NAME = "COMPACTION";
   public static final String EXPLAIN_ANALYZE_LOGGER_NAME = "EXPLAIN_ANALYZE";
 

Reply via email to