This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch SampleQuery-1.3 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit cd4d6126ee5164fa349ca965b46b1501c003e3f5 Author: Jackie Tien <[email protected]> AuthorDate: Fri Dec 13 08:33:09 2024 +0800 Sampling queries in each DN (cherry picked from commit 4c213313a9f16645fa2c1dd3d58e344b9ec7c167) --- .../assembly/resources/conf/logback-datanode.xml | 18 +++++++++ .../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 22 +++++++++++ .../protocol/writeback/WriteBackConnector.java | 3 +- .../legacy/IoTDBLegacyPipeReceiverAgent.java | 3 +- .../protocol/legacy/loader/DeletionLoader.java | 3 +- .../protocol/legacy/loader/TsFileLoader.java | 3 +- .../protocol/thrift/IoTDBDataNodeReceiver.java | 3 +- .../iotdb/db/protocol/mqtt/MPPPublishHandler.java | 3 +- .../rest/v1/impl/GrafanaApiServiceImpl.java | 9 +++-- .../protocol/rest/v1/impl/RestApiServiceImpl.java | 9 +++-- .../rest/v2/impl/GrafanaApiServiceImpl.java | 9 +++-- .../protocol/rest/v2/impl/RestApiServiceImpl.java | 12 ++++-- .../protocol/thrift/impl/ClientRPCServiceImpl.java | 45 ++++++++++++++++++---- .../impl/DataNodeInternalRPCServiceImpl.java | 3 +- .../db/queryengine/common/MPPQueryContext.java | 10 +++++ .../iotdb/db/queryengine/plan/Coordinator.java | 30 +++++++++++++-- .../plan/analyze/LoadTsFileAnalyzer.java | 3 +- .../analyze/schema/AutoCreateSchemaExecutor.java | 3 +- .../analyze/schema/ClusterSchemaFetchExecutor.java | 3 +- .../plan/execution/IQueryExecution.java | 2 + .../queryengine/plan/execution/QueryExecution.java | 5 +++ .../plan/execution/config/ConfigExecution.java | 5 +++ .../config/executor/ClusterConfigTaskExecutor.java | 3 +- .../load/active/ActiveLoadTsFileLoader.java | 3 +- .../org/apache/iotdb/db/utils/CommonUtils.java | 13 +++++-- .../execution/operator/MergeSortOperatorTest.java | 5 +++ .../conf/iotdb-system.properties.template | 7 ++++ .../apache/iotdb/commons/conf/CommonConfig.java | 39 +++++++++++++++++++ .../apache/iotdb/commons/conf/IoTDBConstant.java | 2 + 29 files changed, 239 insertions(+), 39 deletions(-) diff --git a/iotdb-core/datanode/src/assembly/resources/conf/logback-datanode.xml b/iotdb-core/datanode/src/assembly/resources/conf/logback-datanode.xml index 698bdfc3a82..8902599cf8f 100644 --- a/iotdb-core/datanode/src/assembly/resources/conf/logback-datanode.xml +++ b/iotdb-core/datanode/src/assembly/resources/conf/logback-datanode.xml @@ -177,6 +177,21 @@ <level>INFO</level> </filter> </appender> + <appender class="ch.qos.logback.core.rolling.RollingFileAppender" name="SAMPLED_QUERIES"> + <file>${IOTDB_HOME}/logs/log_datanode_sampled_queries.log</file> + <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy"> + <fileNamePattern>${IOTDB_HOME}/logs/log-datanode-sampled-queries-%d{yyyyMMdd}.log.gz</fileNamePattern> + <maxHistory>30</maxHistory> + </rollingPolicy> + <append>true</append> + <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder"> + <pattern>%d %m %n</pattern> + <charset>utf-8</charset> + </encoder> + <filter class="ch.qos.logback.classic.filter.ThresholdFilter"> + <level>INFO</level> + </filter> + </appender> <appender class="ch.qos.logback.core.rolling.RollingFileAppender" name="COMPACTION"> <file>${IOTDB_HOME}/logs/log_datanode_compaction.log</file> <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy"> @@ -245,6 +260,9 @@ <logger level="info" name="SLOW_SQL"> <appender-ref ref="SLOW_SQL"/> </logger> + <logger level="info" name="SAMPLED_QUERIES"> + <appender-ref ref="SAMPLED_QUERIES"/> + </logger> <logger level="info" name="QUERY_FREQUENCY"> <appender-ref ref="QUERY_FREQUENCY"/> </logger> diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java index 0052cc82140..930c005fc82 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java @@ -2005,6 +2005,28 @@ public class IoTDBDescriptor { BinaryAllocator.getInstance().close(true); } + // update query_sample_throughput_bytes_per_sec + String querySamplingRateLimitNumber = + Optional.ofNullable( + properties.getProperty( + "query_sample_throughput_bytes_per_sec", + ConfigurationFileUtils.getConfigurationDefaultValue( + "query_sample_throughput_bytes_per_sec"))) + .map(String::trim) + .orElse( + ConfigurationFileUtils.getConfigurationDefaultValue( + "query_sample_throughput_bytes_per_sec")); + if (querySamplingRateLimitNumber != null) { + try { + int rateLimit = Integer.parseInt(querySamplingRateLimitNumber); + commonDescriptor.getConfig().setQuerySamplingRateLimit(rateLimit); + } catch (Exception e) { + LOGGER.warn( + "Failed to parse query_sample_throughput_bytes_per_sec {} to integer", + querySamplingRateLimitNumber); + } + } + // update trusted_uri_pattern String trustedUriPattern = Optional.ofNullable( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/writeback/WriteBackConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/writeback/WriteBackConnector.java index 7889b778d86..7d28f8609ad 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/writeback/WriteBackConnector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/writeback/WriteBackConnector.java @@ -193,7 +193,8 @@ public class WriteBackConnector implements PipeConnector { "", ClusterPartitionFetcher.getInstance(), ClusterSchemaFetcher.getInstance(), - IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold()) + IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold(), + false) .status; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/IoTDBLegacyPipeReceiverAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/IoTDBLegacyPipeReceiverAgent.java index 3fb132523d8..a67d3e55815 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/IoTDBLegacyPipeReceiverAgent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/IoTDBLegacyPipeReceiverAgent.java @@ -143,7 +143,8 @@ public class IoTDBLegacyPipeReceiverAgent { "", partitionFetcher, schemaFetcher, - IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold()); + IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold(), + false); if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode() && result.status.code != TSStatusCode.DATABASE_ALREADY_EXISTS.getStatusCode()) { LOGGER.error( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/loader/DeletionLoader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/loader/DeletionLoader.java index 19dc268acec..24a69403457 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/loader/DeletionLoader.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/loader/DeletionLoader.java @@ -68,7 +68,8 @@ public class DeletionLoader implements ILoader { "", PARTITION_FETCHER, SCHEMA_FETCHER, - IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold()); + IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold(), + false); if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { LOGGER.error("Delete {} error, statement: {}.", deletion, statement); LOGGER.error("Delete result status : {}.", result.status); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/loader/TsFileLoader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/loader/TsFileLoader.java index 9b9e0705f21..0e0612ce2ca 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/loader/TsFileLoader.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/loader/TsFileLoader.java @@ -70,7 +70,8 @@ public class TsFileLoader implements ILoader { "", PARTITION_FETCHER, SCHEMA_FETCHER, - IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold()); + IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold(), + false); if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { LOGGER.error("Load TsFile {} error, statement: {}.", tsFile.getPath(), statement); LOGGER.error("Load TsFile result status : {}.", result.status); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java index b6fea173df2..2734a808eb5 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java @@ -679,7 +679,8 @@ public class IoTDBDataNodeReceiver extends IoTDBFileReceiver { "", ClusterPartitionFetcher.getInstance(), ClusterSchemaFetcher.getInstance(), - IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold()) + IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold(), + false) .status; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java index 3fc50278ff7..772f3672a14 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java @@ -169,7 +169,8 @@ public class MPPPublishHandler extends AbstractInterceptHandler { "", partitionFetcher, schemaFetcher, - config.getQueryTimeoutThreshold()); + config.getQueryTimeoutThreshold(), + false); tsStatus = result.status; } } catch (Exception e) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v1/impl/GrafanaApiServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v1/impl/GrafanaApiServiceImpl.java index 07c735faa7f..bd955fd00da 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v1/impl/GrafanaApiServiceImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v1/impl/GrafanaApiServiceImpl.java @@ -117,7 +117,8 @@ public class GrafanaApiServiceImpl extends GrafanaApiService { sql.getSql(), partitionFetcher, schemaFetcher, - config.getQueryTimeoutThreshold()); + config.getQueryTimeoutThreshold(), + true); if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode() && result.status.code != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) { return Response.ok() @@ -184,7 +185,8 @@ public class GrafanaApiServiceImpl extends GrafanaApiService { sql, partitionFetcher, schemaFetcher, - config.getQueryTimeoutThreshold()); + config.getQueryTimeoutThreshold(), + true); if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode() && result.status.code != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) { return Response.ok() @@ -246,7 +248,8 @@ public class GrafanaApiServiceImpl extends GrafanaApiService { sql, partitionFetcher, schemaFetcher, - config.getQueryTimeoutThreshold()); + config.getQueryTimeoutThreshold(), + true); if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode() && result.status.code != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) { return Response.ok() diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v1/impl/RestApiServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v1/impl/RestApiServiceImpl.java index 638ea109c9f..084b7cc53e9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v1/impl/RestApiServiceImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v1/impl/RestApiServiceImpl.java @@ -115,7 +115,8 @@ public class RestApiServiceImpl extends RestApiService { sql.getSql(), partitionFetcher, schemaFetcher, - config.getQueryTimeoutThreshold()); + config.getQueryTimeoutThreshold(), + false); finish = true; return Response.ok() .entity( @@ -190,7 +191,8 @@ public class RestApiServiceImpl extends RestApiService { sql.getSql(), partitionFetcher, schemaFetcher, - config.getQueryTimeoutThreshold()); + config.getQueryTimeoutThreshold(), + true); finish = true; if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode() && result.status.code != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) { @@ -266,7 +268,8 @@ public class RestApiServiceImpl extends RestApiService { "", partitionFetcher, schemaFetcher, - config.getQueryTimeoutThreshold()); + config.getQueryTimeoutThreshold(), + false); return Response.ok() .entity( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v2/impl/GrafanaApiServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v2/impl/GrafanaApiServiceImpl.java index e3385885a01..6a5ae13a0cb 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v2/impl/GrafanaApiServiceImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v2/impl/GrafanaApiServiceImpl.java @@ -117,7 +117,8 @@ public class GrafanaApiServiceImpl extends GrafanaApiService { sql.getSql(), partitionFetcher, schemaFetcher, - config.getQueryTimeoutThreshold()); + config.getQueryTimeoutThreshold(), + true); if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode() && result.status.code != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) { return Response.ok() @@ -184,7 +185,8 @@ public class GrafanaApiServiceImpl extends GrafanaApiService { sql, partitionFetcher, schemaFetcher, - config.getQueryTimeoutThreshold()); + config.getQueryTimeoutThreshold(), + true); if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode() && result.status.code != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) { return Response.ok() @@ -246,7 +248,8 @@ public class GrafanaApiServiceImpl extends GrafanaApiService { sql, partitionFetcher, schemaFetcher, - config.getQueryTimeoutThreshold()); + config.getQueryTimeoutThreshold(), + true); if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode() && result.status.code != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) { return Response.ok() diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v2/impl/RestApiServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v2/impl/RestApiServiceImpl.java index 018a9d90763..6028d9d257e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v2/impl/RestApiServiceImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v2/impl/RestApiServiceImpl.java @@ -119,7 +119,8 @@ public class RestApiServiceImpl extends RestApiService { sql.getSql(), partitionFetcher, schemaFetcher, - config.getQueryTimeoutThreshold()); + config.getQueryTimeoutThreshold(), + false); finish = true; return responseGenerateHelper(result); } catch (Exception e) { @@ -187,7 +188,8 @@ public class RestApiServiceImpl extends RestApiService { sql.getSql(), partitionFetcher, schemaFetcher, - config.getQueryTimeoutThreshold()); + config.getQueryTimeoutThreshold(), + true); finish = true; if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode() && result.status.code != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) { @@ -252,7 +254,8 @@ public class RestApiServiceImpl extends RestApiService { "", partitionFetcher, schemaFetcher, - config.getQueryTimeoutThreshold()); + config.getQueryTimeoutThreshold(), + false); return responseGenerateHelper(result); } catch (Exception e) { @@ -307,7 +310,8 @@ public class RestApiServiceImpl extends RestApiService { "", partitionFetcher, schemaFetcher, - config.getQueryTimeoutThreshold()); + config.getQueryTimeoutThreshold(), + false); return responseGenerateHelper(result); } catch (Exception e) { return Response.ok().entity(ExceptionHandler.tryCatchException(e)).build(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java index 20a80ce683d..4c2c4a4e654 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java @@ -208,6 +208,7 @@ import static org.apache.iotdb.db.queryengine.common.DataNodeEndPoints.isSameNod import static org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext; import static org.apache.iotdb.db.queryengine.execution.operator.AggregationUtil.initTimeRangeIterator; import static org.apache.iotdb.db.utils.CommonUtils.getContentOfRequest; +import static org.apache.iotdb.db.utils.CommonUtils.getContentOfTSFastLastDataQueryForOneDeviceReq; import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onIoTDBException; import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onNpeOrUnexpectedException; import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onQueryException; @@ -219,6 +220,11 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig(); + private static final CommonConfig COMMON_CONFIG = CommonDescriptor.getInstance().getConfig(); + + private static final Logger SAMPLED_QUERIES_LOGGER = + LoggerFactory.getLogger(IoTDBConstant.SAMPLED_QUERIES_LOGGER_NAME); + private static final Coordinator COORDINATOR = Coordinator.getInstance(); private static final SessionManager SESSION_MANAGER = SessionManager.getInstance(); @@ -316,7 +322,8 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { statement, partitionFetcher, schemaFetcher, - req.getTimeout()); + req.getTimeout(), + true); if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode() && result.status.code != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) { @@ -410,7 +417,8 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { "", partitionFetcher, schemaFetcher, - req.getTimeout()); + req.getTimeout(), + true); if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { finished = true; @@ -500,7 +508,8 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { "", partitionFetcher, schemaFetcher, - req.getTimeout()); + req.getTimeout(), + true); if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { finished = true; @@ -589,7 +598,8 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { "", partitionFetcher, schemaFetcher, - req.getTimeout()); + req.getTimeout(), + true); if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { finished = true; @@ -834,6 +844,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { resp.setQueryResult(Collections.emptyList()); finished = true; resp.setMoreData(false); + sampleForCacheHitFastLastDataQueryForOneDevice(req); return resp; } @@ -897,6 +908,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { } finished = true; resp.setMoreData(false); + sampleForCacheHitFastLastDataQueryForOneDevice(req); return resp; } } @@ -925,7 +937,8 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { "", partitionFetcher, schemaFetcher, - req.getTimeout()); + req.getTimeout(), + true); if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { finished = true; @@ -1001,6 +1014,22 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { return tsLastDataQueryReq; } + private static void sampleForCacheHitFastLastDataQueryForOneDevice( + TSFastLastDataQueryForOneDeviceReq req) { + // only sample successful query + if (COMMON_CONFIG.isEnableQuerySampling()) { // sampling is enabled + String queryRequest = getContentOfTSFastLastDataQueryForOneDeviceReq(req); + if (COMMON_CONFIG.isQuerySamplingHasRateLimit()) { + if (COMMON_CONFIG.getQuerySamplingRateLimiter().tryAcquire(queryRequest.length())) { + SAMPLED_QUERIES_LOGGER.info(queryRequest); + } + } else { + // no limit, always sampled + SAMPLED_QUERIES_LOGGER.info(queryRequest); + } + } + } + @Override public TSExecuteStatementResp executeAggregationQueryV2(TSAggregationQueryReq req) { return executeAggregationQueryInternal(req, SELECT_RESULT); @@ -1556,7 +1585,8 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { statement, partitionFetcher, schemaFetcher, - config.getQueryTimeoutThreshold()); + config.getQueryTimeoutThreshold(), + false); results.add(result.status); } catch (Exception e) { LOGGER.warn("Error occurred when executing executeBatchStatement: ", e); @@ -2410,7 +2440,8 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { null, partitionFetcher, schemaFetcher, - config.getQueryTimeoutThreshold()); + config.getQueryTimeoutThreshold(), + true); if (executionResult.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode() && executionResult.status.code != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java index 3850bad073c..74d68128f99 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java @@ -1414,7 +1414,8 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface executedSQL, partitionFetcher, schemaFetcher, - req.getTimeout()); + req.getTimeout(), + false); if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode() && result.status.code != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java index ccd7536919f..1621327f9f7 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java @@ -81,6 +81,8 @@ public class MPPQueryContext { // constructing some Expression and PlanNode. private final MemoryReservationManager memoryReservationManager; + private boolean userQuery = false; + public MPPQueryContext(QueryId queryId) { this.queryId = queryId; this.endPointBlackList = new LinkedList<>(); @@ -330,4 +332,12 @@ public class MPPQueryContext { } // endregion + + public boolean isUserQuery() { + return userQuery; + } + + public void setUserQuery(boolean userQuery) { + this.userQuery = userQuery; + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java index db56a6ec695..8561a6e4188 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java @@ -26,6 +26,8 @@ import org.apache.iotdb.commons.client.async.AsyncDataNodeInternalServiceClient; import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient; import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory; import org.apache.iotdb.commons.concurrent.ThreadName; +import org.apache.iotdb.commons.conf.CommonConfig; +import org.apache.iotdb.commons.conf.CommonDescriptor; import org.apache.iotdb.commons.conf.IoTDBConstant; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; @@ -70,10 +72,14 @@ public class Coordinator { private static final Logger LOGGER = LoggerFactory.getLogger(Coordinator.class); private static final int COORDINATOR_SCHEDULED_EXECUTOR_SIZE = 10; private static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig(); + private static final CommonConfig COMMON_CONFIG = CommonDescriptor.getInstance().getConfig(); private static final Logger SLOW_SQL_LOGGER = LoggerFactory.getLogger(IoTDBConstant.SLOW_SQL_LOGGER_NAME); + private static final Logger SAMPLED_QUERIES_LOGGER = + LoggerFactory.getLogger(IoTDBConstant.SAMPLED_QUERIES_LOGGER_NAME); + private static final IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> SYNC_INTERNAL_SERVICE_CLIENT_MANAGER = new IClientManager.Factory<TEndPoint, SyncDataNodeInternalServiceClient>() @@ -108,6 +114,7 @@ public class Coordinator { long queryId, SessionInfo session, String sql, + boolean userQuery, BiFunction<MPPQueryContext, Long, IQueryExecution> iQueryExecutionFactory) { long startTime = System.currentTimeMillis(); QueryId globalQueryId = queryIdGenerator.createNextQueryId(); @@ -124,6 +131,7 @@ public class Coordinator { session, DataNodeEndPoints.LOCAL_HOST_DATA_BLOCK_ENDPOINT, DataNodeEndPoints.LOCAL_HOST_INTERNAL_ENDPOINT); + queryContext.setUserQuery(userQuery); IQueryExecution execution = iQueryExecutionFactory.apply(queryContext, startTime); if (execution.isQuery()) { queryExecutionMap.put(queryId, execution); @@ -155,7 +163,7 @@ public class Coordinator { IPartitionFetcher partitionFetcher, ISchemaFetcher schemaFetcher) { return executeForTreeModel( - statement, queryId, session, sql, partitionFetcher, schemaFetcher, Long.MAX_VALUE); + statement, queryId, session, sql, partitionFetcher, schemaFetcher, Long.MAX_VALUE, false); } public ExecutionResult executeForTreeModel( @@ -165,11 +173,13 @@ public class Coordinator { String sql, IPartitionFetcher partitionFetcher, ISchemaFetcher schemaFetcher, - long timeOut) { + long timeOut, + boolean userQuery) { return execution( queryId, session, sql, + userQuery, ((queryContext, startTime) -> createQueryExecutionForTreeModel( statement, @@ -254,14 +264,28 @@ public class Coordinator { LOGGER.debug("[CleanUpQuery]]"); queryExecution.stopAndCleanup(t); queryExecutionMap.remove(queryId); - if (queryExecution.isQuery()) { + if (queryExecution.isQuery() && queryExecution.isUserQuery()) { long costTime = queryExecution.getTotalExecutionTime(); + // print slow query if (costTime / 1_000_000 >= CONFIG.getSlowQueryThreshold()) { SLOW_SQL_LOGGER.info( "Cost: {} ms, {}", costTime / 1_000_000, getContentOfRequest(nativeApiRequest, queryExecution)); } + + // only sample successful query + if (t == null && COMMON_CONFIG.isEnableQuerySampling()) { // sampling is enabled + String queryRequest = getContentOfRequest(nativeApiRequest, queryExecution); + if (COMMON_CONFIG.isQuerySamplingHasRateLimit()) { + if (COMMON_CONFIG.getQuerySamplingRateLimiter().tryAcquire(queryRequest.length())) { + SAMPLED_QUERIES_LOGGER.info(queryRequest); + } + } else { + // no limit, always sampled + SAMPLED_QUERIES_LOGGER.info(queryRequest); + } + } } } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsFileAnalyzer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsFileAnalyzer.java index af8239c3104..97065145fe1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsFileAnalyzer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsFileAnalyzer.java @@ -539,7 +539,8 @@ public class LoadTsFileAnalyzer implements AutoCloseable { "", partitionFetcher, schemaFetcher, - IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold()); + IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold(), + false); if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode() && result.status.code != TSStatusCode.DATABASE_ALREADY_EXISTS.getStatusCode()) { LOGGER.warn( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/AutoCreateSchemaExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/AutoCreateSchemaExecutor.java index 608b78db0e5..38d0f8942e5 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/AutoCreateSchemaExecutor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/AutoCreateSchemaExecutor.java @@ -93,7 +93,8 @@ class AutoCreateSchemaExecutor { schemaFetcher, context == null || context.getQueryType().equals(QueryType.WRITE) ? config.getQueryTimeoutThreshold() - : context.getTimeOut()); + : context.getTimeOut(), + false); } // Auto create the missing measurements and merge them into given schemaTree diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetchExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetchExecutor.java index d5b6b27f0a7..d62b5640d9d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetchExecutor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetchExecutor.java @@ -90,7 +90,8 @@ class ClusterSchemaFetchExecutor { sql, ClusterPartitionFetcher.getInstance(), schemaFetcher, - timeout); + timeout, + false); } /** diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/IQueryExecution.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/IQueryExecution.java index b35123e8f70..5cb2d4b449c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/IQueryExecution.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/IQueryExecution.java @@ -53,6 +53,8 @@ public interface IQueryExecution { boolean isQuery(); + boolean isUserQuery(); + String getQueryId(); long getStartExecutionTime(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java index 8cb8ea48ea8..29dbd7e3492 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java @@ -629,6 +629,11 @@ public class QueryExecution implements IQueryExecution { return context.getQueryType() == QueryType.READ; } + @Override + public boolean isUserQuery() { + return context.isUserQuery(); + } + @Override public String getQueryId() { return context.getQueryId().getId(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/ConfigExecution.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/ConfigExecution.java index c0886d91842..3076e87d5b4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/ConfigExecution.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/ConfigExecution.java @@ -219,6 +219,11 @@ public class ConfigExecution implements IQueryExecution { return context.getQueryType() == QueryType.READ; } + @Override + public boolean isUserQuery() { + return context.isUserQuery(); + } + @Override public String getQueryId() { return context.getQueryId().getId(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java index f9e824a87b6..80e2a716b43 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java @@ -2339,7 +2339,8 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor { "", ClusterPartitionFetcher.getInstance(), ClusterSchemaFetcher.getInstance(), - IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold()); + IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold(), + false); if (executionResult.status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { future.setException( new IoTDBException( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java index 2ab0c1ae64e..605218ae0a6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java @@ -206,7 +206,8 @@ public class ActiveLoadTsFileLoader { "", ClusterPartitionFetcher.getInstance(), ClusterSchemaFetcher.getInstance(), - IOTDB_CONFIG.getQueryTimeoutThreshold()) + IOTDB_CONFIG.getQueryTimeoutThreshold(), + false) .status; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/CommonUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/CommonUtils.java index e0819843cc2..261ca957492 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/CommonUtils.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/CommonUtils.java @@ -324,10 +324,7 @@ public class CommonUtils { req.getStartTime(), req.getEndTime(), req.getPathsSize(), sb); } else if (request instanceof TSFastLastDataQueryForOneDeviceReq) { TSFastLastDataQueryForOneDeviceReq req = (TSFastLastDataQueryForOneDeviceReq) request; - return String.format( - "Request name: TSFastLastDataQueryForOneDeviceReq, " - + "db: %s, deviceId: %s, sensorSize: %s, sensors: %s", - req.getDb(), req.getDeviceId(), req.getSensorsSize(), req.getSensors()); + return getContentOfTSFastLastDataQueryForOneDeviceReq(req); } else if (request instanceof TSFetchResultsReq) { TSFetchResultsReq req = (TSFetchResultsReq) request; StringBuilder sb = new StringBuilder(); @@ -347,6 +344,14 @@ public class CommonUtils { } } + public static String getContentOfTSFastLastDataQueryForOneDeviceReq( + TSFastLastDataQueryForOneDeviceReq req) { + return String.format( + "Request name: TSFastLastDataQueryForOneDeviceReq, " + + "db: %s, deviceId: %s, sensorSize: %s, sensors: %s", + req.getDb(), req.getDeviceId(), req.getSensorsSize(), req.getSensors()); + } + public static int runCli( List<Class<? extends Runnable>> commands, String[] args, diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/MergeSortOperatorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/MergeSortOperatorTest.java index e1a6c59c9d6..604d3eb3442 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/MergeSortOperatorTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/MergeSortOperatorTest.java @@ -1862,5 +1862,10 @@ public class MergeSortOperatorTest { public boolean isQuery() { return false; } + + @Override + public boolean isUserQuery() { + return false; + } } } diff --git a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template index 6253e50c85c..d6ca096864f 100644 --- a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template +++ b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template @@ -1012,6 +1012,13 @@ sort_buffer_size_in_bytes=1048576 # Datatype: int merge_threshold_of_explain_analyze=10 +# The limit of query sampling throughput merge can reach per second +# 0 means no queries will be recorded +# negative number means no limit, all queries will be recorded. +# effectiveMode: hot_reload +# Datatype: int, Unit: bytes +query_sample_throughput_bytes_per_sec=160 + #################### ### TTL Configuration #################### diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java index 8d30d0c6d80..713fa7ca68d 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java @@ -28,6 +28,7 @@ import org.apache.iotdb.commons.utils.FileUtils; import org.apache.iotdb.commons.utils.KillPoint.KillPoint; import org.apache.iotdb.rpc.RpcUtils; +import com.google.common.util.concurrent.RateLimiter; import org.apache.tsfile.fileSystem.FSType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -344,6 +345,14 @@ public class CommonConfig { private volatile long remoteWriteMaxRetryDurationInMs = 60000; + private final RateLimiter querySamplingRateLimiter = RateLimiter.create(160); + // if querySamplingRateLimiter < 0, means that there is no rate limit, we need to full sample all + // the queries + private volatile boolean querySamplingHasRateLimit = true; + // if querySamplingRateLimiter != 0, enableQuerySampling is true; querySamplingRateLimiter = 0, + // enableQuerySampling is false + private volatile boolean enableQuerySampling = true; + private volatile Pattern trustedUriPattern = Pattern.compile("file:.*"); CommonConfig() { @@ -1533,6 +1542,36 @@ public class CommonConfig { this.log2SizeClassGroup = log2SizeClassGroup; } + /** + * @param querySamplingRateLimit query_sample_throughput_bytes_per_sec + */ + public void setQuerySamplingRateLimit(int querySamplingRateLimit) { + if (querySamplingRateLimit > 0) { + this.querySamplingRateLimiter.setRate(querySamplingRateLimit); + this.enableQuerySampling = true; + this.querySamplingHasRateLimit = true; + } else if (querySamplingRateLimit == 0) { + // querySamplingRateLimit = 0, means that we sample no queries + this.enableQuerySampling = false; + } else { + // querySamplingRateLimit < 0, means that we need to full sample all queries + this.enableQuerySampling = true; + this.querySamplingHasRateLimit = false; + } + } + + public boolean isQuerySamplingHasRateLimit() { + return querySamplingHasRateLimit; + } + + public RateLimiter getQuerySamplingRateLimiter() { + return querySamplingRateLimiter; + } + + public boolean isEnableQuerySampling() { + return enableQuerySampling; + } + public Pattern getTrustedUriPattern() { return trustedUriPattern; } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java index cbcedd497ad..18ac41409ba 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java @@ -108,6 +108,8 @@ public class IoTDBConstant { public static final String AUDIT_LOGGER_NAME = "IoTDB_AUDIT_LOGGER"; public static final String SLOW_SQL_LOGGER_NAME = "SLOW_SQL"; + public static final String SAMPLED_QUERIES_LOGGER_NAME = "SAMPLED_QUERIES"; + public static final String COMPACTION_LOGGER_NAME = "COMPACTION"; public static final String EXPLAIN_ANALYZE_LOGGER_NAME = "EXPLAIN_ANALYZE";
