This is an automated email from the ASF dual-hosted git repository. haonan pushed a commit to branch rc/1.3.1 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 8866b04e09056dc9ddf77f4df7b24d0115d75994 Author: Beyyes <[email protected]> AuthorDate: Wed Mar 13 20:52:19 2024 +0800 Print native request api in datanode_slow_sql.log --- .../protocol/thrift/impl/ClientRPCServiceImpl.java | 27 +++++--- .../iotdb/db/queryengine/plan/Coordinator.java | 10 +-- .../analyze/schema/ClusterSchemaFetchExecutor.java | 2 +- .../org/apache/iotdb/db/utils/CommonUtils.java | 78 ++++++++++++++++++++++ 4 files changed, 101 insertions(+), 16 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java index 645159b75ae..93a21bb5931 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java @@ -198,6 +198,7 @@ import static org.apache.iotdb.commons.partition.DataPartition.NOT_ASSIGNED; import static org.apache.iotdb.db.queryengine.common.DataNodeEndPoints.isSameNode; import static org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext; import static org.apache.iotdb.db.queryengine.execution.operator.AggregationUtil.initTimeRangeIterator; +import static org.apache.iotdb.db.utils.CommonUtils.getContentOfRequest; import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onIoTDBException; import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onNpeOrUnexpectedException; import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onQueryException; @@ -351,7 +352,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { // record total time cost for one query long executionTime = COORDINATOR.getTotalExecutionTime(queryId); addQueryLatency(statementType, executionTime > 0 ? executionTime : currentOperationCost); - COORDINATOR.cleanupQueryExecution(queryId, t); + COORDINATOR.cleanupQueryExecution(queryId, req, t); } SESSION_MANAGER.updateIdleTime(); if (quota != null) { @@ -439,7 +440,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { long executionTime = COORDINATOR.getTotalExecutionTime(queryId); addQueryLatency( StatementType.QUERY, executionTime > 0 ? executionTime : currentOperationCost); - COORDINATOR.cleanupQueryExecution(queryId, t); + COORDINATOR.cleanupQueryExecution(queryId, req, t); } SESSION_MANAGER.updateIdleTime(); @@ -529,7 +530,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { long executionTime = COORDINATOR.getTotalExecutionTime(queryId); addQueryLatency( StatementType.QUERY, executionTime > 0 ? executionTime : currentOperationCost); - COORDINATOR.cleanupQueryExecution(queryId, t); + COORDINATOR.cleanupQueryExecution(queryId, req, t); } SESSION_MANAGER.updateIdleTime(); @@ -616,7 +617,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { long executionTime = COORDINATOR.getTotalExecutionTime(queryId); addQueryLatency( StatementType.QUERY, executionTime > 0 ? executionTime : currentOperationCost); - COORDINATOR.cleanupQueryExecution(queryId, t); + COORDINATOR.cleanupQueryExecution(queryId, req, t); } SESSION_MANAGER.updateIdleTime(); @@ -942,7 +943,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { long executionTime = COORDINATOR.getTotalExecutionTime(queryId); addQueryLatency( StatementType.QUERY, executionTime > 0 ? executionTime : currentOperationCost); - COORDINATOR.cleanupQueryExecution(queryId, t); + COORDINATOR.cleanupQueryExecution(queryId, req, t); } SESSION_MANAGER.updateIdleTime(); if (quota != null) { @@ -1042,6 +1043,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { boolean finished = false; StatementType statementType = null; Throwable t = null; + IQueryExecution queryExecution = null; try { IClientSession clientSession = SESSION_MANAGER.getCurrSessionAndUpdateIdleTime(); if (!SESSION_MANAGER.checkLogin(clientSession)) { @@ -1049,7 +1051,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { } TSFetchResultsResp resp = RpcUtils.getTSFetchResultsResp(TSStatusCode.SUCCESS_STATUS); - IQueryExecution queryExecution = COORDINATOR.getQueryExecution(req.queryId); + queryExecution = COORDINATOR.getQueryExecution(req.queryId); if (queryExecution == null) { resp.setHasResultSet(false); @@ -1073,7 +1075,8 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { } catch (Exception e) { finished = true; t = e; - return RpcUtils.getTSFetchResultsResp(onQueryException(e, OperationType.FETCH_RESULTS)); + return RpcUtils.getTSFetchResultsResp( + onQueryException(e, getContentOfRequest(req, queryExecution))); } catch (Error error) { finished = true; t = error; @@ -1092,7 +1095,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { long executionTime = COORDINATOR.getTotalExecutionTime(req.queryId); addQueryLatency( StatementType.QUERY, executionTime > 0 ? executionTime : currentOperationCost); - COORDINATOR.cleanupQueryExecution(req.queryId, t); + COORDINATOR.cleanupQueryExecution(req.queryId, req, t); } SESSION_MANAGER.updateIdleTime(); @@ -1562,6 +1565,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { long startTime = System.nanoTime(); StatementType statementType = null; Throwable t = null; + IQueryExecution queryExecution = null; try { IClientSession clientSession = SESSION_MANAGER.getCurrSessionAndUpdateIdleTime(); if (!SESSION_MANAGER.checkLogin(clientSession)) { @@ -1570,7 +1574,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { TSFetchResultsResp resp = RpcUtils.getTSFetchResultsResp(TSStatusCode.SUCCESS_STATUS); - IQueryExecution queryExecution = COORDINATOR.getQueryExecution(req.queryId); + queryExecution = COORDINATOR.getQueryExecution(req.queryId); if (queryExecution == null) { resp.setHasResultSet(false); resp.setMoreData(true); @@ -1593,7 +1597,8 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { } catch (Exception e) { finished = true; t = e; - return RpcUtils.getTSFetchResultsResp(onQueryException(e, OperationType.FETCH_RESULTS)); + return RpcUtils.getTSFetchResultsResp( + onQueryException(e, getContentOfRequest(req, queryExecution))); } catch (Error error) { t = error; throw error; @@ -1611,7 +1616,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { long executionTime = COORDINATOR.getTotalExecutionTime(req.queryId); addQueryLatency( StatementType.QUERY, executionTime > 0 ? executionTime : currentOperationCost); - COORDINATOR.cleanupQueryExecution(req.queryId, t); + COORDINATOR.cleanupQueryExecution(req.queryId, req, t); } SESSION_MANAGER.updateIdleTime(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java index 6c8c2d15bf9..bda9e5a6acd 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java @@ -55,6 +55,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; import static org.apache.iotdb.commons.utils.StatusUtils.needRetry; +import static org.apache.iotdb.db.utils.CommonUtils.getContentOfRequest; /** * The coordinator for MPP. It manages all the queries which are executed in current Node. And it @@ -225,7 +226,8 @@ public class Coordinator { return queryIdGenerator.createNextQueryId(); } - public void cleanupQueryExecution(Long queryId, Throwable t) { + public void cleanupQueryExecution( + Long queryId, org.apache.thrift.TBase<?, ?> nativeApiRequest, Throwable t) { IQueryExecution queryExecution = getQueryExecution(queryId); if (queryExecution != null) { try (SetThreadName threadName = new SetThreadName(queryExecution.getQueryId())) { @@ -236,9 +238,9 @@ public class Coordinator { long costTime = queryExecution.getTotalExecutionTime(); if (costTime / 1_000_000 >= CONFIG.getSlowQueryThreshold()) { SLOW_SQL_LOGGER.info( - "Cost: {} ms, sql is {}", + "Cost: {} ms, {}", costTime / 1_000_000, - queryExecution.getExecuteSQL().orElse("UNKNOWN")); + getContentOfRequest(nativeApiRequest, queryExecution)); } } } @@ -246,7 +248,7 @@ public class Coordinator { } public void cleanupQueryExecution(Long queryId) { - cleanupQueryExecution(queryId, null); + cleanupQueryExecution(queryId, null, null); } public IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetchExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetchExecutor.java index 0adbca34ad5..4f0256f8e88 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetchExecutor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetchExecutor.java @@ -245,7 +245,7 @@ class ClusterSchemaFetchExecutor { t = throwable; throw throwable; } finally { - coordinator.cleanupQueryExecution(queryId, t); + coordinator.cleanupQueryExecution(queryId, null, t); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/CommonUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/CommonUtils.java index 4237ddc1fad..6796431e049 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/CommonUtils.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/CommonUtils.java @@ -20,7 +20,13 @@ package org.apache.iotdb.db.utils; import org.apache.iotdb.commons.utils.TestOnly; import org.apache.iotdb.db.exception.query.QueryProcessException; +import org.apache.iotdb.db.queryengine.plan.execution.IQueryExecution; import org.apache.iotdb.db.utils.constant.SqlConstant; +import org.apache.iotdb.service.rpc.thrift.TSAggregationQueryReq; +import org.apache.iotdb.service.rpc.thrift.TSFastLastDataQueryForOneDeviceReq; +import org.apache.iotdb.service.rpc.thrift.TSFetchResultsReq; +import org.apache.iotdb.service.rpc.thrift.TSLastDataQueryReq; +import org.apache.iotdb.service.rpc.thrift.TSRawDataQueryReq; import org.apache.iotdb.tsfile.common.conf.TSFileConfig; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.utils.Binary; @@ -44,6 +50,10 @@ import java.util.Objects; @SuppressWarnings("java:S106") // for console outputs public class CommonUtils { + private static final int MAX_SLOW_NATIVE_API_OUTPUT_NUM = 10; + + private static final String UNKNOWN_RESULT = "UNKNOWN"; + private CommonUtils() {} public static Object parseValue(TSDataType dataType, String value) throws QueryProcessException { @@ -232,6 +242,74 @@ public class CommonUtils { throw new QueryProcessException("The BOOLEAN should be true/TRUE, false/FALSE or 0/1"); } + public static String getContentOfRequest( + org.apache.thrift.TBase<?, ?> request, IQueryExecution queryExecution) { + if (queryExecution == null) { + return UNKNOWN_RESULT; + } + + String executeSql = queryExecution.getExecuteSQL().orElse(""); + if (!executeSql.isEmpty()) { + return executeSql; + } else if (request == null) { + return UNKNOWN_RESULT; + } else if (request instanceof TSRawDataQueryReq) { + TSRawDataQueryReq req = (TSRawDataQueryReq) request; + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < Math.min(req.getPathsSize(), MAX_SLOW_NATIVE_API_OUTPUT_NUM); i++) { + sb.append(i == 0 ? "" : ",").append(req.getPaths().get(i)); + } + return String.format( + "Request name: TSRawDataQueryReq, paths size: %s, starTime: %s, " + + "endTime: %s, some paths: %s", + req.getPathsSize(), req.getStartTime(), req.getEndTime(), sb); + } else if (request instanceof TSLastDataQueryReq) { + TSLastDataQueryReq req = (TSLastDataQueryReq) request; + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < Math.min(req.getPathsSize(), MAX_SLOW_NATIVE_API_OUTPUT_NUM); i++) { + sb.append(i == 0 ? "" : ",").append(req.getPaths().get(i)); + } + return String.format( + "Request name: TSLastDataQueryReq, paths size: %s, some paths: %s", + req.getPathsSize(), sb); + } else if (request instanceof TSAggregationQueryReq) { + TSAggregationQueryReq req = (TSAggregationQueryReq) request; + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < Math.min(req.getPathsSize(), MAX_SLOW_NATIVE_API_OUTPUT_NUM); i++) { + sb.append(i == 0 ? "" : ",") + .append(req.getAggregations().get(i)) + .append(":") + .append(req.getPaths().get(i)); + } + return String.format( + "Request name: TSAggregationQueryReq, startTime: %s, endTime: %s, " + + "paths size: %s, some paths: %s", + req.getStartTime(), req.getEndTime(), req.getPathsSize(), sb); + } else if (request instanceof TSFastLastDataQueryForOneDeviceReq) { + TSFastLastDataQueryForOneDeviceReq req = (TSFastLastDataQueryForOneDeviceReq) request; + return String.format( + "Request name: TSFastLastDataQueryForOneDeviceReq, " + + "db: %s, deviceId: %s, sensorSize: %s, sensors: %s", + req.getDb(), req.getDeviceId(), req.getSensorsSize(), req.getSensors()); + } else if (request instanceof TSFetchResultsReq) { + TSFetchResultsReq req = (TSFetchResultsReq) request; + StringBuilder sb = new StringBuilder(); + for (int i = 0; + i < Math.min(queryExecution.getOutputValueColumnCount(), MAX_SLOW_NATIVE_API_OUTPUT_NUM); + i++) { + sb.append(i == 0 ? "" : ",") + .append(queryExecution.getDatasetHeader().getRespColumns().get(i)); + } + return String.format( + "Request name: TSFetchResultsReq, " + + "queryId: %s, output value column count: %s, fetchSize: %s, " + + "some response headers: %s", + req.getQueryId(), queryExecution.getOutputValueColumnCount(), req.getFetchSize(), sb); + } else { + return UNKNOWN_RESULT; + } + } + public static int runCli( List<Class<? extends Runnable>> commands, String[] args,
