This is an automated email from the ASF dual-hosted git repository. caogaofei pushed a commit to branch slow_sql in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 16219bbe081e8e2d800397ad639030395f5588aa Author: Beyyes <[email protected]> AuthorDate: Mon Mar 11 20:45:39 2024 +0800 add native slow sql request --- .../main/java/org/apache/iotdb/SessionExample.java | 36 +++++++++++ .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 4 +- .../protocol/thrift/impl/ClientRPCServiceImpl.java | 14 ++-- .../iotdb/db/queryengine/plan/Coordinator.java | 74 +++++++++++++++++++--- .../analyze/schema/ClusterSchemaFetchExecutor.java | 2 +- 5 files changed, 112 insertions(+), 18 deletions(-) diff --git a/example/session/src/main/java/org/apache/iotdb/SessionExample.java b/example/session/src/main/java/org/apache/iotdb/SessionExample.java index d5f40452ea8..f1ae6d5f2c2 100644 --- a/example/session/src/main/java/org/apache/iotdb/SessionExample.java +++ b/example/session/src/main/java/org/apache/iotdb/SessionExample.java @@ -76,6 +76,42 @@ public class SessionExample { .build(); session.open(false); + // set session fetchSize + session.setFetchSize(10); + + SessionDataSet dataSet; + dataSet = + session.executeRawDataQuery( + Arrays.asList( + "root.sg2.d1.s1", + "root.sg2.d1.s2", + "root.sg2.d1.s3", + "root.sg2.d2.s1", + "root.sg2.d2.s2", + "root.sg2.d2.s3"), + 1L, + 1000000L); + System.out.println(dataSet.getColumnNames()); + dataSet.setFetchSize(1024); // default is 10000 + while (dataSet.hasNext()) { + System.out.println(dataSet.next()); + } + + dataSet = session.executeQueryStatement(SELECT_D1); + } + + public static void main1(String[] args) + throws IoTDBConnectionException, StatementExecutionException { + session = + new Session.Builder() + .host(LOCAL_HOST) + .port(6667) + .username("root") + .password("root") + .version(Version.V_1_0) + .build(); + session.open(false); + // set session fetchSize session.setFetchSize(10000); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java index a9ec1ee5f5b..e77756fceaf 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java @@ -810,7 +810,7 @@ public class IoTDBConfig { private int thriftDefaultBufferSize = RpcUtils.THRIFT_DEFAULT_BUF_CAPACITY; /** time cost(ms) threshold for slow query. Unit: millisecond */ - private long slowQueryThreshold = 30000; + private long slowQueryThreshold = 50; private int patternMatchingThreshold = 1000000; @@ -1054,7 +1054,7 @@ public class IoTDBConfig { private long schemaRatisPeriodicSnapshotInterval = 24L * 60 * 60; // 24hr /** whether to enable the audit log * */ - private boolean enableAuditLog = false; + private boolean enableAuditLog = true; /** Output location of audit logs * */ private List<AuditLogStorage> auditLogStorage = diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java index 03b28a97c6a..0c0a6091b3d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java @@ -354,7 +354,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { // record total time cost for one query long executionTime = COORDINATOR.getTotalExecutionTime(queryId); addQueryLatency(statementType, executionTime > 0 ? executionTime : currentOperationCost); - COORDINATOR.cleanupQueryExecution(queryId, t); + COORDINATOR.cleanupQueryExecution(queryId, req, t); } SESSION_MANAGER.updateIdleTime(); if (quota != null) { @@ -442,7 +442,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { long executionTime = COORDINATOR.getTotalExecutionTime(queryId); addQueryLatency( StatementType.QUERY, executionTime > 0 ? executionTime : currentOperationCost); - COORDINATOR.cleanupQueryExecution(queryId, t); + COORDINATOR.cleanupQueryExecution(queryId, req, t); } SESSION_MANAGER.updateIdleTime(); @@ -532,7 +532,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { long executionTime = COORDINATOR.getTotalExecutionTime(queryId); addQueryLatency( StatementType.QUERY, executionTime > 0 ? executionTime : currentOperationCost); - COORDINATOR.cleanupQueryExecution(queryId, t); + COORDINATOR.cleanupQueryExecution(queryId, req, t); } SESSION_MANAGER.updateIdleTime(); @@ -619,7 +619,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { long executionTime = COORDINATOR.getTotalExecutionTime(queryId); addQueryLatency( StatementType.QUERY, executionTime > 0 ? executionTime : currentOperationCost); - COORDINATOR.cleanupQueryExecution(queryId, t); + COORDINATOR.cleanupQueryExecution(queryId, req, t); } SESSION_MANAGER.updateIdleTime(); @@ -953,7 +953,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { long executionTime = COORDINATOR.getTotalExecutionTime(queryId); addQueryLatency( StatementType.QUERY, executionTime > 0 ? executionTime : currentOperationCost); - COORDINATOR.cleanupQueryExecution(queryId, t); + COORDINATOR.cleanupQueryExecution(queryId, req, t); } SESSION_MANAGER.updateIdleTime(); if (quota != null) { @@ -1103,7 +1103,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { long executionTime = COORDINATOR.getTotalExecutionTime(req.queryId); addQueryLatency( StatementType.QUERY, executionTime > 0 ? executionTime : currentOperationCost); - COORDINATOR.cleanupQueryExecution(req.queryId, t); + COORDINATOR.cleanupQueryExecution(req.queryId, req, t); } SESSION_MANAGER.updateIdleTime(); @@ -1622,7 +1622,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { long executionTime = COORDINATOR.getTotalExecutionTime(req.queryId); addQueryLatency( StatementType.QUERY, executionTime > 0 ? executionTime : currentOperationCost); - COORDINATOR.cleanupQueryExecution(req.queryId, t); + COORDINATOR.cleanupQueryExecution(req.queryId, req, t); } SESSION_MANAGER.updateIdleTime(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java index 327008e9c2a..890476c4cde 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java @@ -44,6 +44,11 @@ import org.apache.iotdb.db.queryengine.plan.execution.config.ConfigExecution; import org.apache.iotdb.db.queryengine.plan.statement.IConfigStatement; import org.apache.iotdb.db.queryengine.plan.statement.Statement; import org.apache.iotdb.db.utils.SetThreadName; +import org.apache.iotdb.service.rpc.thrift.TSAggregationQueryReq; +import org.apache.iotdb.service.rpc.thrift.TSFastLastDataQueryForOneDeviceReq; +import org.apache.iotdb.service.rpc.thrift.TSFetchResultsReq; +import org.apache.iotdb.service.rpc.thrift.TSLastDataQueryReq; +import org.apache.iotdb.service.rpc.thrift.TSRawDataQueryReq; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -226,7 +231,8 @@ public class Coordinator { return queryIdGenerator.createNextQueryId(); } - public void cleanupQueryExecution(Long queryId, Throwable t) { + public void cleanupQueryExecution( + Long queryId, org.apache.thrift.TBase nativeApiRequest, Throwable t) { IQueryExecution queryExecution = getQueryExecution(queryId); if (queryExecution != null) { try (SetThreadName threadName = new SetThreadName(queryExecution.getQueryId())) { @@ -235,19 +241,71 @@ public class Coordinator { queryExecutionMap.remove(queryId); if (queryExecution.isQuery()) { long costTime = queryExecution.getTotalExecutionTime(); - if (costTime / 1_000_000 >= CONFIG.getSlowQueryThreshold()) { - SLOW_SQL_LOGGER.info( - "Cost: {} ms, sql is {}", - costTime / 1_000_000, - queryExecution.getExecuteSQL().orElse("UNKNOWN")); - } + outputSlowSql(queryExecution, costTime, nativeApiRequest); } } } } + private void outputSlowSql( + IQueryExecution queryExecution, long costTime, org.apache.thrift.TBase request) { + if (costTime / 1_000_000 < CONFIG.getSlowQueryThreshold()) { + return; + } + + String slowContent = ""; + if (request == null || !queryExecution.getExecuteSQL().orElse("").isEmpty()) { + slowContent = queryExecution.getExecuteSQL().orElse("UNKNOWN"); + } else if (request instanceof TSRawDataQueryReq) { + TSRawDataQueryReq req = (TSRawDataQueryReq) request; + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < Math.min(req.getPathsSize(), 10); i++) { + sb.append(i == 0 ? "" : ",").append(req.getPaths().get(i)); + } + slowContent = + String.format( + "Request name: TSRawDataQueryReq, paths size: %s, starTime: %s, " + + "endTime: %s, some paths: %s", + req.getPathsSize(), req.getStartTime(), req.getEndTime(), sb); + } else if (request instanceof TSLastDataQueryReq) { + TSLastDataQueryReq req = (TSLastDataQueryReq) request; + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < Math.min(req.getPathsSize(), 10); i++) { + sb.append(i == 0 ? "" : ",").append(req.getPaths().get(i)); + } + slowContent = + String.format( + "Request name: TSLastDataQueryReq, paths size: %s, some paths: %s", + req.getPathsSize(), sb); + } else if (request instanceof TSAggregationQueryReq) { + TSAggregationQueryReq req = (TSAggregationQueryReq) request; + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < Math.min(req.getPathsSize(), 10); i++) { + sb.append(i == 0 ? "" : ",").append(req.getPaths().get(i)); + } + slowContent = + String.format( + "Request name: TSAggregationQueryReq, paths size: %s, some paths: %s", + req.getPathsSize(), sb); + } else if (request instanceof TSFastLastDataQueryForOneDeviceReq) { + TSFastLastDataQueryForOneDeviceReq req = (TSFastLastDataQueryForOneDeviceReq) request; + slowContent = + String.format( + "Request name: TSFastLastDataQueryForOneDeviceReq, db: %s, deviceId: %s, sensorSize: %s, sensors: %s", + req.getDb(), req.getDeviceId(), req.getSensorsSize(), req.getSensors()); + } else if (request instanceof TSFetchResultsReq) { + TSFetchResultsReq req = (TSFetchResultsReq) request; + slowContent = + String.format( + "Request name: TSFetchResultsReq, statement: %s, fetchSize: %s", + req.getStatement(), req.getFetchSize()); + } + + SLOW_SQL_LOGGER.info("Cost: {} ms, {}", costTime / 1_000_000, slowContent); + } + public void cleanupQueryExecution(Long queryId) { - cleanupQueryExecution(queryId, null); + cleanupQueryExecution(queryId, null, null); } public IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetchExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetchExecutor.java index 0adbca34ad5..4f0256f8e88 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetchExecutor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetchExecutor.java @@ -245,7 +245,7 @@ class ClusterSchemaFetchExecutor { t = throwable; throw throwable; } finally { - coordinator.cleanupQueryExecution(queryId, t); + coordinator.cleanupQueryExecution(queryId, null, t); } }
