This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch ty/PrintQuery in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 32d9b4f56d82c827c87eda6aa407b1ff4a6a30f3 Author: JackieTien97 <[email protected]> AuthorDate: Tue Dec 10 20:20:24 2024 +0800 Sampling queries in each DN --- .../assembly/resources/conf/logback-datanode.xml | 18 ++++++++++ .../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 15 +++++++++ .../iotdb/db/queryengine/plan/Coordinator.java | 20 +++++++++++ .../conf/iotdb-system.properties.template | 7 ++++ .../apache/iotdb/commons/conf/CommonConfig.java | 39 ++++++++++++++++++++++ .../apache/iotdb/commons/conf/IoTDBConstant.java | 2 ++ 6 files changed, 101 insertions(+) diff --git a/iotdb-core/datanode/src/assembly/resources/conf/logback-datanode.xml b/iotdb-core/datanode/src/assembly/resources/conf/logback-datanode.xml index 698bdfc3a82..e15038ea3ef 100644 --- a/iotdb-core/datanode/src/assembly/resources/conf/logback-datanode.xml +++ b/iotdb-core/datanode/src/assembly/resources/conf/logback-datanode.xml @@ -177,6 +177,21 @@ <level>INFO</level> </filter> </appender> + <appender class="ch.qos.logback.core.rolling.RollingFileAppender" name="SAMPLED_QUERIES"> + <file>${IOTDB_HOME}/logs/log_datanode_sampled_queries.log</file> + <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy"> + <fileNamePattern>${IOTDB_HOME}/logs/log-datanode-sampled-queries-%d{yyyyMMdd}.log.gz</fileNamePattern> + <maxHistory>30</maxHistory> + </rollingPolicy> + <append>true</append> + <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder"> + <pattern>%d [%t] %-5p %C{25}:%L - %m %n</pattern> + <charset>utf-8</charset> + </encoder> + <filter class="ch.qos.logback.classic.filter.ThresholdFilter"> + <level>INFO</level> + </filter> + </appender> <appender class="ch.qos.logback.core.rolling.RollingFileAppender" name="COMPACTION"> <file>${IOTDB_HOME}/logs/log_datanode_compaction.log</file> <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy"> @@ -245,6 +260,9 @@ <logger level="info" name="SLOW_SQL"> <appender-ref ref="SLOW_SQL"/> </logger> + <logger 
level="info" name="SAMPLED_QUERIES"> + <appender-ref ref="SAMPLED_QUERIES"/> + </logger> <logger level="info" name="QUERY_FREQUENCY"> <appender-ref ref="QUERY_FREQUENCY"/> </logger> diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java index b652acf7329..3990da8e9cd 100755 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java @@ -2870,6 +2870,21 @@ public class IoTDBDescriptor { } else { BinaryAllocator.getInstance().close(true); } + + // update query_sample_throughput_bytes_per_sec + commonDescriptor + .getConfig() + .setQuerySamplingRateLimit( + Integer.parseInt( + Optional.ofNullable( + properties.getProperty( + "query_sample_throughput_bytes_per_sec", + ConfigurationFileUtils.getConfigurationDefaultValue( + "query_sample_throughput_bytes_per_sec"))) + .map(String::trim) + .orElse( + ConfigurationFileUtils.getConfigurationDefaultValue( + "query_sample_throughput_bytes_per_sec")))); } catch (Exception e) { if (e instanceof InterruptedException) { Thread.currentThread().interrupt(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java index 05c00e5322e..098f9abd415 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java @@ -26,6 +26,8 @@ import org.apache.iotdb.commons.client.async.AsyncDataNodeInternalServiceClient; import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient; import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory; import org.apache.iotdb.commons.concurrent.ThreadName; +import 
org.apache.iotdb.commons.conf.CommonConfig; +import org.apache.iotdb.commons.conf.CommonDescriptor; import org.apache.iotdb.commons.conf.IoTDBConstant; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; @@ -121,10 +123,14 @@ public class Coordinator { private static final Logger LOGGER = LoggerFactory.getLogger(Coordinator.class); private static final int COORDINATOR_SCHEDULED_EXECUTOR_SIZE = 10; private static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig(); + private static final CommonConfig COMMON_CONFIG = CommonDescriptor.getInstance().getConfig(); private static final Logger SLOW_SQL_LOGGER = LoggerFactory.getLogger(IoTDBConstant.SLOW_SQL_LOGGER_NAME); + private static final Logger SAMPLED_QUERIES_LOGGER = + LoggerFactory.getLogger(IoTDBConstant.SAMPLED_QUERIES_LOGGER_NAME); + private static final IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> SYNC_INTERNAL_SERVICE_CLIENT_MANAGER = new IClientManager.Factory<TEndPoint, SyncDataNodeInternalServiceClient>() @@ -478,12 +484,26 @@ public class Coordinator { queryExecutionMap.remove(queryId); if (queryExecution.isQuery()) { long costTime = queryExecution.getTotalExecutionTime(); + // print slow query if (costTime / 1_000_000 >= CONFIG.getSlowQueryThreshold()) { SLOW_SQL_LOGGER.info( "Cost: {} ms, {}", costTime / 1_000_000, getContentOfRequest(nativeApiRequest, queryExecution)); } + + // sample query + if (COMMON_CONFIG.isEnableQuerySampling()) { // sampling is enabled + String queryRequest = getContentOfRequest(nativeApiRequest, queryExecution); + if (COMMON_CONFIG.isQuerySamplingHasRateLimit()) { + if (COMMON_CONFIG.getQuerySamplingRateLimiter().tryAcquire(queryRequest.length())) { + SAMPLED_QUERIES_LOGGER.info(queryRequest); + } + } else { + // no limit, always sampled + SAMPLED_QUERIES_LOGGER.info(queryRequest); + } + } } } } diff --git a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template 
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template index 2efae33aae9..7edd2bc8bd4 100644 --- a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template +++ b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template @@ -1022,6 +1022,13 @@ sort_buffer_size_in_bytes=1048576 # Datatype: int merge_threshold_of_explain_analyze=10 +# The maximum throughput of sampled query text that can be logged per second +# 0 means no queries will be recorded +# negative number means no limit, all queries will be recorded. +# effectiveMode: hot_reload +# Datatype: int, Unit: bytes +query_sample_throughput_bytes_per_sec=16 + #################### ### TTL Configuration #################### diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java index 2d1585d6525..8ba0eed9107 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java @@ -28,6 +28,7 @@ import org.apache.iotdb.commons.utils.FileUtils; import org.apache.iotdb.commons.utils.KillPoint.KillPoint; import org.apache.iotdb.rpc.RpcUtils; +import com.google.common.util.concurrent.RateLimiter; import org.apache.tsfile.fileSystem.FSType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -343,6 +344,14 @@ public class CommonConfig { private volatile long remoteWriteMaxRetryDurationInMs = 60000; + private final RateLimiter querySamplingRateLimiter = RateLimiter.create(16); + // if query_sample_throughput_bytes_per_sec < 0, there is no rate limit and every query is + // sampled + private volatile boolean querySamplingHasRateLimit = true; + // if query_sample_throughput_bytes_per_sec != 0, enableQuerySampling is true; if it is 0, + // enableQuerySampling is false + private 
volatile boolean enableQuerySampling = true; + CommonConfig() { // Empty constructor } @@ -1529,4 +1538,34 @@ public class CommonConfig { public void setLog2SizeClassGroup(int log2SizeClassGroup) { this.log2SizeClassGroup = log2SizeClassGroup; } + + /** + * @param querySamplingRateLimit query_sample_throughput_bytes_per_sec + */ + public void setQuerySamplingRateLimit(int querySamplingRateLimit) { + if (querySamplingRateLimit > 0) { + this.querySamplingRateLimiter.setRate(querySamplingRateLimit); + this.enableQuerySampling = true; + this.querySamplingHasRateLimit = true; + } else if (querySamplingRateLimit == 0) { + // querySamplingRateLimit = 0, means that we sample no queries + this.enableQuerySampling = false; + } else { + // querySamplingRateLimit < 0, means that we need to full sample all queries + this.enableQuerySampling = true; + this.querySamplingHasRateLimit = false; + } + } + + public boolean isQuerySamplingHasRateLimit() { + return querySamplingHasRateLimit; + } + + public RateLimiter getQuerySamplingRateLimiter() { + return querySamplingRateLimiter; + } + + public boolean isEnableQuerySampling() { + return enableQuerySampling; + } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java index c86900e5f99..de7aeae6e05 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java @@ -108,6 +108,8 @@ public class IoTDBConstant { public static final String AUDIT_LOGGER_NAME = "IoTDB_AUDIT_LOGGER"; public static final String SLOW_SQL_LOGGER_NAME = "SLOW_SQL"; + public static final String SAMPLED_QUERIES_LOGGER_NAME = "SAMPLED_QUERIES"; + public static final String COMPACTION_LOGGER_NAME = "COMPACTION"; public static final String EXPLAIN_ANALYZE_LOGGER_NAME = "EXPLAIN_ANALYZE";
