This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 4c213313a9f Sampling queries in each DN
4c213313a9f is described below
commit 4c213313a9f16645fa2c1dd3d58e344b9ec7c167
Author: Jackie Tien <[email protected]>
AuthorDate: Fri Dec 13 08:33:09 2024 +0800
Sampling queries in each DN
---
.../assembly/resources/conf/logback-datanode.xml | 18 ++++++++
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 22 ++++++++++
.../legacy/IoTDBLegacyPipeReceiverAgent.java | 3 +-
.../protocol/legacy/loader/DeletionLoader.java | 3 +-
.../protocol/legacy/loader/TsFileLoader.java | 3 +-
.../protocol/thrift/IoTDBDataNodeReceiver.java | 3 +-
.../iotdb/db/protocol/mqtt/MPPPublishHandler.java | 3 +-
.../rest/v1/impl/GrafanaApiServiceImpl.java | 9 ++--
.../protocol/rest/v1/impl/RestApiServiceImpl.java | 9 ++--
.../rest/v2/impl/GrafanaApiServiceImpl.java | 9 ++--
.../protocol/rest/v2/impl/RestApiServiceImpl.java | 12 +++--
.../protocol/thrift/impl/ClientRPCServiceImpl.java | 51 ++++++++++++++++++----
.../impl/DataNodeInternalRPCServiceImpl.java | 3 +-
.../db/queryengine/common/MPPQueryContext.java | 14 +++---
.../iotdb/db/queryengine/plan/Coordinator.java | 37 +++++++++++++---
.../load/TreeSchemaAutoCreatorAndVerifier.java | 3 +-
.../analyze/schema/AutoCreateSchemaExecutor.java | 3 +-
.../analyze/schema/ClusterSchemaFetchExecutor.java | 3 +-
.../plan/execution/IQueryExecution.java | 2 +
.../queryengine/plan/execution/QueryExecution.java | 5 +++
.../plan/execution/config/ConfigExecution.java | 5 +++
.../config/executor/ClusterConfigTaskExecutor.java | 3 +-
.../metadata/fetcher/TableDeviceSchemaFetcher.java | 6 ++-
.../fetcher/TableDeviceSchemaValidator.java | 3 +-
.../load/active/ActiveLoadTsFileLoader.java | 3 +-
.../org/apache/iotdb/db/utils/CommonUtils.java | 13 ++++--
.../operator/MergeTreeSortOperatorTest.java | 5 +++
.../conf/iotdb-system.properties.template | 7 +++
.../apache/iotdb/commons/conf/CommonConfig.java | 39 +++++++++++++++++
.../apache/iotdb/commons/conf/IoTDBConstant.java | 2 +
30 files changed, 248 insertions(+), 53 deletions(-)
diff --git
a/iotdb-core/datanode/src/assembly/resources/conf/logback-datanode.xml
b/iotdb-core/datanode/src/assembly/resources/conf/logback-datanode.xml
index 698bdfc3a82..8902599cf8f 100644
--- a/iotdb-core/datanode/src/assembly/resources/conf/logback-datanode.xml
+++ b/iotdb-core/datanode/src/assembly/resources/conf/logback-datanode.xml
@@ -177,6 +177,21 @@
<level>INFO</level>
</filter>
</appender>
+ <appender class="ch.qos.logback.core.rolling.RollingFileAppender"
name="SAMPLED_QUERIES">
+ <file>${IOTDB_HOME}/logs/log_datanode_sampled_queries.log</file>
+ <rollingPolicy
class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
+
<fileNamePattern>${IOTDB_HOME}/logs/log-datanode-sampled-queries-%d{yyyyMMdd}.log.gz</fileNamePattern>
+ <maxHistory>30</maxHistory>
+ </rollingPolicy>
+ <append>true</append>
+ <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
+ <pattern>%d %m %n</pattern>
+ <charset>utf-8</charset>
+ </encoder>
+ <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
+ <level>INFO</level>
+ </filter>
+ </appender>
<appender class="ch.qos.logback.core.rolling.RollingFileAppender"
name="COMPACTION">
<file>${IOTDB_HOME}/logs/log_datanode_compaction.log</file>
<rollingPolicy
class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
@@ -245,6 +260,9 @@
<logger level="info" name="SLOW_SQL">
<appender-ref ref="SLOW_SQL"/>
</logger>
+ <logger level="info" name="SAMPLED_QUERIES">
+ <appender-ref ref="SAMPLED_QUERIES"/>
+ </logger>
<logger level="info" name="QUERY_FREQUENCY">
<appender-ref ref="QUERY_FREQUENCY"/>
</logger>
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 8045ad3b648..6dfd5c85ab1 100755
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -2882,6 +2882,28 @@ public class IoTDBDescriptor {
BinaryAllocator.getInstance().close(true);
}
+ // update query_sample_throughput_bytes_per_sec
+ String querySamplingRateLimitNumber =
+ Optional.ofNullable(
+ properties.getProperty(
+ "query_sample_throughput_bytes_per_sec",
+ ConfigurationFileUtils.getConfigurationDefaultValue(
+ "query_sample_throughput_bytes_per_sec")))
+ .map(String::trim)
+ .orElse(
+ ConfigurationFileUtils.getConfigurationDefaultValue(
+ "query_sample_throughput_bytes_per_sec"));
+ if (querySamplingRateLimitNumber != null) {
+ try {
+ int rateLimit = Integer.parseInt(querySamplingRateLimitNumber);
+ commonDescriptor.getConfig().setQuerySamplingRateLimit(rateLimit);
+ } catch (Exception e) {
+ LOGGER.warn(
+ "Failed to parse query_sample_throughput_bytes_per_sec {} to
integer",
+ querySamplingRateLimitNumber);
+ }
+ }
+
// update trusted_uri_pattern
String trustedUriPattern =
Optional.ofNullable(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/IoTDBLegacyPipeReceiverAgent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/IoTDBLegacyPipeReceiverAgent.java
index daecab687f9..5bdc5de9205 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/IoTDBLegacyPipeReceiverAgent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/IoTDBLegacyPipeReceiverAgent.java
@@ -145,7 +145,8 @@ public class IoTDBLegacyPipeReceiverAgent {
"",
partitionFetcher,
schemaFetcher,
-
IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold());
+
IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold(),
+ false);
if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()
&& result.status.code !=
TSStatusCode.DATABASE_ALREADY_EXISTS.getStatusCode()
&& result.status.code !=
TSStatusCode.DATABASE_CONFLICT.getStatusCode()) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/loader/DeletionLoader.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/loader/DeletionLoader.java
index 48a06c88837..32f549866f2 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/loader/DeletionLoader.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/loader/DeletionLoader.java
@@ -68,7 +68,8 @@ public class DeletionLoader implements ILoader {
"",
PARTITION_FETCHER,
SCHEMA_FETCHER,
-
IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold());
+
IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold(),
+ false);
if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
LOGGER.error("Delete {} error, statement: {}.", deletion, statement);
LOGGER.error("Delete result status : {}.", result.status);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/loader/TsFileLoader.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/loader/TsFileLoader.java
index 9b9e0705f21..0e0612ce2ca 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/loader/TsFileLoader.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/loader/TsFileLoader.java
@@ -70,7 +70,8 @@ public class TsFileLoader implements ILoader {
"",
PARTITION_FETCHER,
SCHEMA_FETCHER,
-
IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold());
+
IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold(),
+ false);
if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
LOGGER.error("Load TsFile {} error, statement: {}.", tsFile.getPath(),
statement);
LOGGER.error("Load TsFile result status : {}.", result.status);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
index e7345a45732..fcfd9881cc9 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
@@ -888,7 +888,8 @@ public class IoTDBDataNodeReceiver extends
IoTDBFileReceiver {
"",
ClusterPartitionFetcher.getInstance(),
ClusterSchemaFetcher.getInstance(),
-
IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold())
+
IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold(),
+ false)
.status;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java
index 3fc50278ff7..772f3672a14 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java
@@ -169,7 +169,8 @@ public class MPPPublishHandler extends
AbstractInterceptHandler {
"",
partitionFetcher,
schemaFetcher,
- config.getQueryTimeoutThreshold());
+ config.getQueryTimeoutThreshold(),
+ false);
tsStatus = result.status;
}
} catch (Exception e) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v1/impl/GrafanaApiServiceImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v1/impl/GrafanaApiServiceImpl.java
index 484fc5b4dca..904eabc1421 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v1/impl/GrafanaApiServiceImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v1/impl/GrafanaApiServiceImpl.java
@@ -117,7 +117,8 @@ public class GrafanaApiServiceImpl extends
GrafanaApiService {
sql.getSql(),
partitionFetcher,
schemaFetcher,
- config.getQueryTimeoutThreshold());
+ config.getQueryTimeoutThreshold(),
+ true);
if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()
&& result.status.code !=
TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
return Response.ok()
@@ -184,7 +185,8 @@ public class GrafanaApiServiceImpl extends
GrafanaApiService {
sql,
partitionFetcher,
schemaFetcher,
- config.getQueryTimeoutThreshold());
+ config.getQueryTimeoutThreshold(),
+ true);
if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()
&& result.status.code !=
TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
return Response.ok()
@@ -247,7 +249,8 @@ public class GrafanaApiServiceImpl extends
GrafanaApiService {
sql,
partitionFetcher,
schemaFetcher,
- config.getQueryTimeoutThreshold());
+ config.getQueryTimeoutThreshold(),
+ true);
if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()
&& result.status.code !=
TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
return Response.ok()
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v1/impl/RestApiServiceImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v1/impl/RestApiServiceImpl.java
index 638ea109c9f..084b7cc53e9 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v1/impl/RestApiServiceImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v1/impl/RestApiServiceImpl.java
@@ -115,7 +115,8 @@ public class RestApiServiceImpl extends RestApiService {
sql.getSql(),
partitionFetcher,
schemaFetcher,
- config.getQueryTimeoutThreshold());
+ config.getQueryTimeoutThreshold(),
+ false);
finish = true;
return Response.ok()
.entity(
@@ -190,7 +191,8 @@ public class RestApiServiceImpl extends RestApiService {
sql.getSql(),
partitionFetcher,
schemaFetcher,
- config.getQueryTimeoutThreshold());
+ config.getQueryTimeoutThreshold(),
+ true);
finish = true;
if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()
&& result.status.code !=
TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
@@ -266,7 +268,8 @@ public class RestApiServiceImpl extends RestApiService {
"",
partitionFetcher,
schemaFetcher,
- config.getQueryTimeoutThreshold());
+ config.getQueryTimeoutThreshold(),
+ false);
return Response.ok()
.entity(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v2/impl/GrafanaApiServiceImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v2/impl/GrafanaApiServiceImpl.java
index 4c89cd524a4..0c8478f31e8 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v2/impl/GrafanaApiServiceImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v2/impl/GrafanaApiServiceImpl.java
@@ -117,7 +117,8 @@ public class GrafanaApiServiceImpl extends
GrafanaApiService {
sql.getSql(),
partitionFetcher,
schemaFetcher,
- config.getQueryTimeoutThreshold());
+ config.getQueryTimeoutThreshold(),
+ true);
if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()
&& result.status.code !=
TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
return Response.ok()
@@ -184,7 +185,8 @@ public class GrafanaApiServiceImpl extends
GrafanaApiService {
sql,
partitionFetcher,
schemaFetcher,
- config.getQueryTimeoutThreshold());
+ config.getQueryTimeoutThreshold(),
+ true);
if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()
&& result.status.code !=
TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
return Response.ok()
@@ -247,7 +249,8 @@ public class GrafanaApiServiceImpl extends
GrafanaApiService {
sql,
partitionFetcher,
schemaFetcher,
- config.getQueryTimeoutThreshold());
+ config.getQueryTimeoutThreshold(),
+ true);
if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()
&& result.status.code !=
TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
return Response.ok()
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v2/impl/RestApiServiceImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v2/impl/RestApiServiceImpl.java
index 018a9d90763..6028d9d257e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v2/impl/RestApiServiceImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v2/impl/RestApiServiceImpl.java
@@ -119,7 +119,8 @@ public class RestApiServiceImpl extends RestApiService {
sql.getSql(),
partitionFetcher,
schemaFetcher,
- config.getQueryTimeoutThreshold());
+ config.getQueryTimeoutThreshold(),
+ false);
finish = true;
return responseGenerateHelper(result);
} catch (Exception e) {
@@ -187,7 +188,8 @@ public class RestApiServiceImpl extends RestApiService {
sql.getSql(),
partitionFetcher,
schemaFetcher,
- config.getQueryTimeoutThreshold());
+ config.getQueryTimeoutThreshold(),
+ true);
finish = true;
if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()
&& result.status.code !=
TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
@@ -252,7 +254,8 @@ public class RestApiServiceImpl extends RestApiService {
"",
partitionFetcher,
schemaFetcher,
- config.getQueryTimeoutThreshold());
+ config.getQueryTimeoutThreshold(),
+ false);
return responseGenerateHelper(result);
} catch (Exception e) {
@@ -307,7 +310,8 @@ public class RestApiServiceImpl extends RestApiService {
"",
partitionFetcher,
schemaFetcher,
- config.getQueryTimeoutThreshold());
+ config.getQueryTimeoutThreshold(),
+ false);
return responseGenerateHelper(result);
} catch (Exception e) {
return
Response.ok().entity(ExceptionHandler.tryCatchException(e)).build();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
index c1ae9efd173..506b962daf5 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
@@ -218,6 +218,7 @@ import static
org.apache.iotdb.db.queryengine.common.DataNodeEndPoints.isSameNod
import static
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
import static
org.apache.iotdb.db.queryengine.execution.operator.AggregationUtil.initTimeRangeIterator;
import static org.apache.iotdb.db.utils.CommonUtils.getContentOfRequest;
+import static
org.apache.iotdb.db.utils.CommonUtils.getContentOfTSFastLastDataQueryForOneDeviceReq;
import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onIoTDBException;
import static
org.apache.iotdb.db.utils.ErrorHandlingUtils.onNpeOrUnexpectedException;
import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onQueryException;
@@ -230,6 +231,11 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
private static final IoTDBConfig config =
IoTDBDescriptor.getInstance().getConfig();
+ private static final CommonConfig COMMON_CONFIG =
CommonDescriptor.getInstance().getConfig();
+
+ private static final Logger SAMPLED_QUERIES_LOGGER =
+ LoggerFactory.getLogger(IoTDBConstant.SAMPLED_QUERIES_LOGGER_NAME);
+
private static final Coordinator COORDINATOR = Coordinator.getInstance();
private static final SessionManager SESSION_MANAGER =
SessionManager.getInstance();
@@ -340,7 +346,8 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
statement,
partitionFetcher,
schemaFetcher,
- req.getTimeout());
+ req.getTimeout(),
+ true);
} else {
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Statement s =
relationSqlParser.createStatement(statement,
clientSession.getZoneId(), clientSession);
@@ -370,7 +377,8 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
SESSION_MANAGER.getSessionInfo(clientSession),
statement,
metadata,
- req.getTimeout());
+ req.getTimeout(),
+ true);
}
if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()
@@ -477,7 +485,8 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
"",
partitionFetcher,
schemaFetcher,
- req.getTimeout());
+ req.getTimeout(),
+ true);
if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
finished = true;
@@ -567,7 +576,8 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
"",
partitionFetcher,
schemaFetcher,
- req.getTimeout());
+ req.getTimeout(),
+ true);
if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
finished = true;
@@ -656,7 +666,8 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
"",
partitionFetcher,
schemaFetcher,
- req.getTimeout());
+ req.getTimeout(),
+ true);
if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
finished = true;
@@ -903,6 +914,7 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
resp.setQueryResult(Collections.emptyList());
finished = true;
resp.setMoreData(false);
+ sampleForCacheHitFastLastDataQueryForOneDevice(req);
return resp;
}
@@ -966,6 +978,7 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
}
finished = true;
resp.setMoreData(false);
+ sampleForCacheHitFastLastDataQueryForOneDevice(req);
return resp;
}
}
@@ -994,7 +1007,8 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
"",
partitionFetcher,
schemaFetcher,
- req.getTimeout());
+ req.getTimeout(),
+ true);
if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
finished = true;
@@ -1070,6 +1084,22 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
return tsLastDataQueryReq;
}
+ private static void sampleForCacheHitFastLastDataQueryForOneDevice(
+ TSFastLastDataQueryForOneDeviceReq req) {
+ // only sample successful query
+ if (COMMON_CONFIG.isEnableQuerySampling()) { // sampling is enabled
+ String queryRequest =
getContentOfTSFastLastDataQueryForOneDeviceReq(req);
+ if (COMMON_CONFIG.isQuerySamplingHasRateLimit()) {
+ if
(COMMON_CONFIG.getQuerySamplingRateLimiter().tryAcquire(queryRequest.length()))
{
+ SAMPLED_QUERIES_LOGGER.info(queryRequest);
+ }
+ } else {
+ // no limit, always sampled
+ SAMPLED_QUERIES_LOGGER.info(queryRequest);
+ }
+ }
+ }
+
@Override
public TSExecuteStatementResp
executeAggregationQueryV2(TSAggregationQueryReq req) {
return executeAggregationQueryInternal(req, SELECT_RESULT);
@@ -1668,7 +1698,8 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
statement,
partitionFetcher,
schemaFetcher,
- config.getQueryTimeoutThreshold());
+ config.getQueryTimeoutThreshold(),
+ false);
} else {
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Statement
s =
@@ -1699,7 +1730,8 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
SESSION_MANAGER.getSessionInfo(clientSession),
statement,
metadata,
- config.getQueryTimeoutThreshold());
+ config.getQueryTimeoutThreshold(),
+ false);
}
results.add(result.status);
@@ -2597,7 +2629,8 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
null,
partitionFetcher,
schemaFetcher,
- config.getQueryTimeoutThreshold());
+ config.getQueryTimeoutThreshold(),
+ true);
if (executionResult.status.code !=
TSStatusCode.SUCCESS_STATUS.getStatusCode()
&& executionResult.status.code !=
TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
index b1d2806cdc2..aa4b5ac09e0 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -1473,7 +1473,8 @@ public class DataNodeInternalRPCServiceImpl implements
IDataNodeRPCService.Iface
executedSQL,
partitionFetcher,
schemaFetcher,
- req.getTimeout());
+ req.getTimeout(),
+ false);
if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()
&& result.status.code !=
TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java
index f96f4e6b4f4..30007c6979c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java
@@ -82,7 +82,7 @@ public class MPPQueryContext {
// constructing some Expression and PlanNode.
private final MemoryReservationManager memoryReservationManager;
- private boolean isTableQuery = false;
+ private boolean userQuery = false;
public MPPQueryContext(QueryId queryId) {
this.queryId = queryId;
@@ -346,15 +346,15 @@ public class MPPQueryContext {
// endregion
- public boolean isTableQuery() {
- return isTableQuery;
+ public Optional<String> getDatabaseName() {
+ return session.getDatabaseName();
}
- public void setTableQuery(boolean tableQuery) {
- isTableQuery = tableQuery;
+ public boolean isUserQuery() {
+ return userQuery;
}
- public Optional<String> getDatabaseName() {
- return session.getDatabaseName();
+ public void setUserQuery(boolean userQuery) {
+ this.userQuery = userQuery;
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
index 05c00e5322e..9151ca14cba 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
@@ -26,6 +26,8 @@ import
org.apache.iotdb.commons.client.async.AsyncDataNodeInternalServiceClient;
import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.concurrent.ThreadName;
+import org.apache.iotdb.commons.conf.CommonConfig;
+import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -121,10 +123,14 @@ public class Coordinator {
private static final Logger LOGGER =
LoggerFactory.getLogger(Coordinator.class);
private static final int COORDINATOR_SCHEDULED_EXECUTOR_SIZE = 10;
private static final IoTDBConfig CONFIG =
IoTDBDescriptor.getInstance().getConfig();
+ private static final CommonConfig COMMON_CONFIG =
CommonDescriptor.getInstance().getConfig();
private static final Logger SLOW_SQL_LOGGER =
LoggerFactory.getLogger(IoTDBConstant.SLOW_SQL_LOGGER_NAME);
+ private static final Logger SAMPLED_QUERIES_LOGGER =
+ LoggerFactory.getLogger(IoTDBConstant.SAMPLED_QUERIES_LOGGER_NAME);
+
private static final IClientManager<TEndPoint,
SyncDataNodeInternalServiceClient>
SYNC_INTERNAL_SERVICE_CLIENT_MANAGER =
new IClientManager.Factory<TEndPoint,
SyncDataNodeInternalServiceClient>()
@@ -180,6 +186,7 @@ public class Coordinator {
long queryId,
SessionInfo session,
String sql,
+ boolean userQuery,
BiFunction<MPPQueryContext, Long, IQueryExecution>
iQueryExecutionFactory) {
long startTime = System.currentTimeMillis();
QueryId globalQueryId = queryIdGenerator.createNextQueryId();
@@ -196,6 +203,7 @@ public class Coordinator {
session,
DataNodeEndPoints.LOCAL_HOST_DATA_BLOCK_ENDPOINT,
DataNodeEndPoints.LOCAL_HOST_INTERNAL_ENDPOINT);
+ queryContext.setUserQuery(userQuery);
IQueryExecution execution = iQueryExecutionFactory.apply(queryContext,
startTime);
if (execution.isQuery()) {
queryExecutionMap.put(queryId, execution);
@@ -227,7 +235,7 @@ public class Coordinator {
IPartitionFetcher partitionFetcher,
ISchemaFetcher schemaFetcher) {
return executeForTreeModel(
- statement, queryId, session, sql, partitionFetcher, schemaFetcher,
Long.MAX_VALUE);
+ statement, queryId, session, sql, partitionFetcher, schemaFetcher,
Long.MAX_VALUE, false);
}
public ExecutionResult executeForTreeModel(
@@ -237,11 +245,13 @@ public class Coordinator {
String sql,
IPartitionFetcher partitionFetcher,
ISchemaFetcher schemaFetcher,
- long timeOut) {
+ long timeOut,
+ boolean userQuery) {
return execution(
queryId,
session,
sql,
+ userQuery,
((queryContext, startTime) ->
createQueryExecutionForTreeModel(
statement,
@@ -290,11 +300,13 @@ public class Coordinator {
SessionInfo session,
String sql,
Metadata metadata,
- long timeOut) {
+ long timeOut,
+ boolean userQuery) {
return execution(
queryId,
session,
sql,
+ userQuery,
((queryContext, startTime) ->
createQueryExecutionForTableModel(
statement,
@@ -319,6 +331,7 @@ public class Coordinator {
queryId,
session,
sql,
+ false,
((queryContext, startTime) ->
createQueryExecutionForTableModel(
statement,
@@ -338,7 +351,6 @@ public class Coordinator {
Metadata metadata,
long timeOut,
long startTime) {
- queryContext.setTableQuery(true);
queryContext.setTimeOut(timeOut);
queryContext.setStartTime(startTime);
TableModelPlanner tableModelPlanner =
@@ -367,7 +379,6 @@ public class Coordinator {
final Metadata metadata,
final long timeOut,
final long startTime) {
- queryContext.setTableQuery(true);
queryContext.setTimeOut(timeOut);
queryContext.setStartTime(startTime);
if (statement instanceof DropDB
@@ -476,14 +487,28 @@ public class Coordinator {
LOGGER.debug("[CleanUpQuery]]");
queryExecution.stopAndCleanup(t);
queryExecutionMap.remove(queryId);
- if (queryExecution.isQuery()) {
+ if (queryExecution.isQuery() && queryExecution.isUserQuery()) {
long costTime = queryExecution.getTotalExecutionTime();
+ // print slow query
if (costTime / 1_000_000 >= CONFIG.getSlowQueryThreshold()) {
SLOW_SQL_LOGGER.info(
"Cost: {} ms, {}",
costTime / 1_000_000,
getContentOfRequest(nativeApiRequest, queryExecution));
}
+
+ // only sample successful query
+ if (t == null && COMMON_CONFIG.isEnableQuerySampling()) { //
sampling is enabled
+ String queryRequest = getContentOfRequest(nativeApiRequest,
queryExecution);
+ if (COMMON_CONFIG.isQuerySamplingHasRateLimit()) {
+ if
(COMMON_CONFIG.getQuerySamplingRateLimiter().tryAcquire(queryRequest.length()))
{
+ SAMPLED_QUERIES_LOGGER.info(queryRequest);
+ }
+ } else {
+ // no limit, always sampled
+ SAMPLED_QUERIES_LOGGER.info(queryRequest);
+ }
+ }
}
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/TreeSchemaAutoCreatorAndVerifier.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/TreeSchemaAutoCreatorAndVerifier.java
index 92c847b850c..4be632b33c9 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/TreeSchemaAutoCreatorAndVerifier.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/TreeSchemaAutoCreatorAndVerifier.java
@@ -331,7 +331,8 @@ public class TreeSchemaAutoCreatorAndVerifier {
"",
loadTsFileAnalyzer.partitionFetcher,
loadTsFileAnalyzer.schemaFetcher,
-
IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold());
+
IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold(),
+ false);
if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()
&& result.status.code !=
TSStatusCode.DATABASE_ALREADY_EXISTS.getStatusCode()
// In tree model, if the user creates a conflict database
concurrently, for instance, the
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/AutoCreateSchemaExecutor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/AutoCreateSchemaExecutor.java
index 197e96016c5..83b9f1d389c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/AutoCreateSchemaExecutor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/AutoCreateSchemaExecutor.java
@@ -93,7 +93,8 @@ class AutoCreateSchemaExecutor {
schemaFetcher,
context == null || context.getQueryType().equals(QueryType.WRITE)
? config.getQueryTimeoutThreshold()
- : context.getTimeOut());
+ : context.getTimeOut(),
+ false);
}
// Auto create the missing measurements and merge them into given schemaTree
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetchExecutor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetchExecutor.java
index ed9f9b80f58..1c61bda425c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetchExecutor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetchExecutor.java
@@ -91,7 +91,8 @@ class ClusterSchemaFetchExecutor {
sql,
ClusterPartitionFetcher.getInstance(),
schemaFetcher,
- timeout);
+ timeout,
+ false);
}
/**
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/IQueryExecution.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/IQueryExecution.java
index f7c99b85f29..f1c3b96d2d3 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/IQueryExecution.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/IQueryExecution.java
@@ -53,6 +53,8 @@ public interface IQueryExecution {
boolean isQuery();
+ boolean isUserQuery();
+
String getQueryId();
long getStartExecutionTime();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java
index 87a02f16131..c1e6d725455 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java
@@ -654,6 +654,11 @@ public class QueryExecution implements IQueryExecution {
return context.getQueryType() == QueryType.READ;
}
+ @Override
+ public boolean isUserQuery() {
+ return context.isUserQuery();
+ }
+
@Override
public String getQueryId() {
return context.getQueryId().getId();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/ConfigExecution.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/ConfigExecution.java
index 965ed7868bf..6023edf58cc 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/ConfigExecution.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/ConfigExecution.java
@@ -268,6 +268,11 @@ public class ConfigExecution implements IQueryExecution {
return context.getQueryType() == QueryType.READ;
}
+ @Override
+ public boolean isUserQuery() {
+ return context.isUserQuery();
+ }
+
@Override
public String getQueryId() {
return context.getQueryId().getId();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index 4e73064e58d..3dc8af00917 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -2433,7 +2433,8 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
"",
ClusterPartitionFetcher.getInstance(),
ClusterSchemaFetcher.getInstance(),
-
IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold());
+
IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold(),
+ false);
if (executionResult.status.getCode() !=
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
future.setException(
new IoTDBException(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/TableDeviceSchemaFetcher.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/TableDeviceSchemaFetcher.java
index e8cc70a0aa2..c6d6fc3a372 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/TableDeviceSchemaFetcher.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/TableDeviceSchemaFetcher.java
@@ -115,7 +115,8 @@ public class TableDeviceSchemaFetcher {
.getSessionInfoOfTableModel(SessionManager.getInstance().getCurrSession()),
"Fetch Device for insert",
LocalExecutionPlanner.getInstance().metadata,
- config.getQueryTimeoutThreshold());
+ config.getQueryTimeoutThreshold(),
+ false);
if (executionResult.status.getCode() !=
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
throw new RuntimeException(
@@ -393,7 +394,8 @@ public class TableDeviceSchemaFetcher {
"fetch device for query %s : %s",
mppQueryContext.getQueryId(), mppQueryContext.getSql()),
LocalExecutionPlanner.getInstance().metadata,
- config.getQueryTimeoutThreshold());
+ config.getQueryTimeoutThreshold(),
+ false);
if (executionResult.status.getCode() !=
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
throw new RuntimeException(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/TableDeviceSchemaValidator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/TableDeviceSchemaValidator.java
index 558012e2ada..a823057c39e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/TableDeviceSchemaValidator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/TableDeviceSchemaValidator.java
@@ -244,7 +244,8 @@ public class TableDeviceSchemaValidator {
LocalExecutionPlanner.getInstance().metadata,
context == null || context.getQueryType().equals(QueryType.WRITE)
? config.getQueryTimeoutThreshold()
- : context.getTimeOut());
+ : context.getTimeOut(),
+ false);
if (executionResult.status.getCode() !=
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
throw new RuntimeException(
new IoTDBException(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java
index 2ab0c1ae64e..605218ae0a6 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java
@@ -206,7 +206,8 @@ public class ActiveLoadTsFileLoader {
"",
ClusterPartitionFetcher.getInstance(),
ClusterSchemaFetcher.getInstance(),
- IOTDB_CONFIG.getQueryTimeoutThreshold())
+ IOTDB_CONFIG.getQueryTimeoutThreshold(),
+ false)
.status;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/CommonUtils.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/CommonUtils.java
index 3b3e6311164..158301759fc 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/CommonUtils.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/CommonUtils.java
@@ -330,10 +330,7 @@ public class CommonUtils {
req.getStartTime(), req.getEndTime(), req.getPathsSize(), sb);
} else if (request instanceof TSFastLastDataQueryForOneDeviceReq) {
TSFastLastDataQueryForOneDeviceReq req =
(TSFastLastDataQueryForOneDeviceReq) request;
- return String.format(
- "Request name: TSFastLastDataQueryForOneDeviceReq, "
- + "db: %s, deviceId: %s, sensorSize: %s, sensors: %s",
- req.getDb(), req.getDeviceId(), req.getSensorsSize(),
req.getSensors());
+ return getContentOfTSFastLastDataQueryForOneDeviceReq(req);
} else if (request instanceof TSFetchResultsReq) {
TSFetchResultsReq req = (TSFetchResultsReq) request;
StringBuilder sb = new StringBuilder();
@@ -353,6 +350,14 @@ public class CommonUtils {
}
}
+ public static String getContentOfTSFastLastDataQueryForOneDeviceReq(
+ TSFastLastDataQueryForOneDeviceReq req) {
+ return String.format(
+ "Request name: TSFastLastDataQueryForOneDeviceReq, "
+ + "db: %s, deviceId: %s, sensorSize: %s, sensors: %s",
+ req.getDb(), req.getDeviceId(), req.getSensorsSize(),
req.getSensors());
+ }
+
public static int runCli(
List<Class<? extends Runnable>> commands,
String[] args,
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/MergeTreeSortOperatorTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/MergeTreeSortOperatorTest.java
index 486e7cbe2bd..63ae72506ae 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/MergeTreeSortOperatorTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/MergeTreeSortOperatorTest.java
@@ -1893,5 +1893,10 @@ public class MergeTreeSortOperatorTest {
public boolean isQuery() {
return false;
}
+
+ @Override
+ public boolean isUserQuery() {
+ return false;
+ }
}
}
diff --git
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
index eff49a25301..c26b6f5d6ae 100644
---
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
+++
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
@@ -1027,6 +1027,13 @@ sort_buffer_size_in_bytes=1048576
# Datatype: int
merge_threshold_of_explain_analyze=10
+# The limit of query sampling throughput merge can reach per second
+# 0 means no queries will be recorded
+# negative number means no limit, all queries will be recorded.
+# effectiveMode: hot_reload
+# Datatype: int, Unit: bytes
+query_sample_throughput_bytes_per_sec=160
+
####################
### TTL Configuration
####################
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 8d30d0c6d80..713fa7ca68d 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.commons.utils.FileUtils;
import org.apache.iotdb.commons.utils.KillPoint.KillPoint;
import org.apache.iotdb.rpc.RpcUtils;
+import com.google.common.util.concurrent.RateLimiter;
import org.apache.tsfile.fileSystem.FSType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -344,6 +345,14 @@ public class CommonConfig {
private volatile long remoteWriteMaxRetryDurationInMs = 60000;
+ private final RateLimiter querySamplingRateLimiter = RateLimiter.create(160);
+ // if querySamplingRateLimiter < 0, means that there is no rate limit, we
need to full sample all
+ // the queries
+ private volatile boolean querySamplingHasRateLimit = true;
+ // if querySamplingRateLimiter != 0, enableQuerySampling is true;
querySamplingRateLimiter = 0,
+ // enableQuerySampling is false
+ private volatile boolean enableQuerySampling = true;
+
private volatile Pattern trustedUriPattern = Pattern.compile("file:.*");
CommonConfig() {
@@ -1533,6 +1542,36 @@ public class CommonConfig {
this.log2SizeClassGroup = log2SizeClassGroup;
}
+ /**
+ * @param querySamplingRateLimit query_sample_throughput_bytes_per_sec
+ */
+ public void setQuerySamplingRateLimit(int querySamplingRateLimit) {
+ if (querySamplingRateLimit > 0) {
+ this.querySamplingRateLimiter.setRate(querySamplingRateLimit);
+ this.enableQuerySampling = true;
+ this.querySamplingHasRateLimit = true;
+ } else if (querySamplingRateLimit == 0) {
+ // querySamplingRateLimit = 0, means that we sample no queries
+ this.enableQuerySampling = false;
+ } else {
+ // querySamplingRateLimit < 0, means that we need to full sample all
queries
+ this.enableQuerySampling = true;
+ this.querySamplingHasRateLimit = false;
+ }
+ }
+
+ public boolean isQuerySamplingHasRateLimit() {
+ return querySamplingHasRateLimit;
+ }
+
+ public RateLimiter getQuerySamplingRateLimiter() {
+ return querySamplingRateLimiter;
+ }
+
+ public boolean isEnableQuerySampling() {
+ return enableQuerySampling;
+ }
+
public Pattern getTrustedUriPattern() {
return trustedUriPattern;
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java
index c86900e5f99..de7aeae6e05 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java
@@ -108,6 +108,8 @@ public class IoTDBConstant {
public static final String AUDIT_LOGGER_NAME = "IoTDB_AUDIT_LOGGER";
public static final String SLOW_SQL_LOGGER_NAME = "SLOW_SQL";
+ public static final String SAMPLED_QUERIES_LOGGER_NAME = "SAMPLED_QUERIES";
+
public static final String COMPACTION_LOGGER_NAME = "COMPACTION";
public static final String EXPLAIN_ANALYZE_LOGGER_NAME = "EXPLAIN_ANALYZE";