This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new de5405fef83 Print native request api in datanode_slow_sql.log
de5405fef83 is described below
commit de5405fef83e76a08982b7607a3463f27d058c55
Author: Beyyes <[email protected]>
AuthorDate: Wed Mar 13 20:52:19 2024 +0800
Print native request api in datanode_slow_sql.log
---
.../protocol/thrift/impl/ClientRPCServiceImpl.java | 27 +++++---
.../iotdb/db/queryengine/plan/Coordinator.java | 10 +--
.../analyze/schema/ClusterSchemaFetchExecutor.java | 2 +-
.../org/apache/iotdb/db/utils/CommonUtils.java | 78 ++++++++++++++++++++++
4 files changed, 101 insertions(+), 16 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
index 5612d8777e1..6d20d6f7d49 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
@@ -201,6 +201,7 @@ import static
org.apache.iotdb.commons.partition.DataPartition.NOT_ASSIGNED;
import static
org.apache.iotdb.db.queryengine.common.DataNodeEndPoints.isSameNode;
import static
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
import static
org.apache.iotdb.db.queryengine.execution.operator.AggregationUtil.initTimeRangeIterator;
+import static org.apache.iotdb.db.utils.CommonUtils.getContentOfRequest;
import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onIoTDBException;
import static
org.apache.iotdb.db.utils.ErrorHandlingUtils.onNpeOrUnexpectedException;
import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onQueryException;
@@ -354,7 +355,7 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
// record total time cost for one query
long executionTime = COORDINATOR.getTotalExecutionTime(queryId);
addQueryLatency(statementType, executionTime > 0 ? executionTime :
currentOperationCost);
- COORDINATOR.cleanupQueryExecution(queryId, t);
+ COORDINATOR.cleanupQueryExecution(queryId, req, t);
}
SESSION_MANAGER.updateIdleTime();
if (quota != null) {
@@ -442,7 +443,7 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
long executionTime = COORDINATOR.getTotalExecutionTime(queryId);
addQueryLatency(
StatementType.QUERY, executionTime > 0 ? executionTime :
currentOperationCost);
- COORDINATOR.cleanupQueryExecution(queryId, t);
+ COORDINATOR.cleanupQueryExecution(queryId, req, t);
}
SESSION_MANAGER.updateIdleTime();
@@ -532,7 +533,7 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
long executionTime = COORDINATOR.getTotalExecutionTime(queryId);
addQueryLatency(
StatementType.QUERY, executionTime > 0 ? executionTime :
currentOperationCost);
- COORDINATOR.cleanupQueryExecution(queryId, t);
+ COORDINATOR.cleanupQueryExecution(queryId, req, t);
}
SESSION_MANAGER.updateIdleTime();
@@ -619,7 +620,7 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
long executionTime = COORDINATOR.getTotalExecutionTime(queryId);
addQueryLatency(
StatementType.QUERY, executionTime > 0 ? executionTime :
currentOperationCost);
- COORDINATOR.cleanupQueryExecution(queryId, t);
+ COORDINATOR.cleanupQueryExecution(queryId, req, t);
}
SESSION_MANAGER.updateIdleTime();
@@ -953,7 +954,7 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
long executionTime = COORDINATOR.getTotalExecutionTime(queryId);
addQueryLatency(
StatementType.QUERY, executionTime > 0 ? executionTime :
currentOperationCost);
- COORDINATOR.cleanupQueryExecution(queryId, t);
+ COORDINATOR.cleanupQueryExecution(queryId, req, t);
}
SESSION_MANAGER.updateIdleTime();
if (quota != null) {
@@ -1053,6 +1054,7 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
boolean finished = false;
String statementType = null;
Throwable t = null;
+ IQueryExecution queryExecution = null;
try {
IClientSession clientSession =
SESSION_MANAGER.getCurrSessionAndUpdateIdleTime();
if (!SESSION_MANAGER.checkLogin(clientSession)) {
@@ -1060,7 +1062,7 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
}
TSFetchResultsResp resp =
RpcUtils.getTSFetchResultsResp(TSStatusCode.SUCCESS_STATUS);
- IQueryExecution queryExecution =
COORDINATOR.getQueryExecution(req.queryId);
+ queryExecution = COORDINATOR.getQueryExecution(req.queryId);
if (queryExecution == null) {
resp.setHasResultSet(false);
@@ -1084,7 +1086,8 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
} catch (Exception e) {
finished = true;
t = e;
- return RpcUtils.getTSFetchResultsResp(onQueryException(e,
OperationType.FETCH_RESULTS));
+ return RpcUtils.getTSFetchResultsResp(
+ onQueryException(e, getContentOfRequest(req, queryExecution)));
} catch (Error error) {
finished = true;
t = error;
@@ -1103,7 +1106,7 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
long executionTime = COORDINATOR.getTotalExecutionTime(req.queryId);
addQueryLatency(
StatementType.QUERY, executionTime > 0 ? executionTime :
currentOperationCost);
- COORDINATOR.cleanupQueryExecution(req.queryId, t);
+ COORDINATOR.cleanupQueryExecution(req.queryId, req, t);
}
SESSION_MANAGER.updateIdleTime();
@@ -1573,6 +1576,7 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
long startTime = System.nanoTime();
String statementType = null;
Throwable t = null;
+ IQueryExecution queryExecution = null;
try {
IClientSession clientSession =
SESSION_MANAGER.getCurrSessionAndUpdateIdleTime();
if (!SESSION_MANAGER.checkLogin(clientSession)) {
@@ -1581,7 +1585,7 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
TSFetchResultsResp resp =
RpcUtils.getTSFetchResultsResp(TSStatusCode.SUCCESS_STATUS);
- IQueryExecution queryExecution =
COORDINATOR.getQueryExecution(req.queryId);
+ queryExecution = COORDINATOR.getQueryExecution(req.queryId);
if (queryExecution == null) {
resp.setHasResultSet(false);
resp.setMoreData(true);
@@ -1604,7 +1608,8 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
} catch (Exception e) {
finished = true;
t = e;
- return RpcUtils.getTSFetchResultsResp(onQueryException(e,
OperationType.FETCH_RESULTS));
+ return RpcUtils.getTSFetchResultsResp(
+ onQueryException(e, getContentOfRequest(req, queryExecution)));
} catch (Error error) {
t = error;
throw error;
@@ -1622,7 +1627,7 @@ public class ClientRPCServiceImpl implements
IClientRPCServiceWithHandler {
long executionTime = COORDINATOR.getTotalExecutionTime(req.queryId);
addQueryLatency(
StatementType.QUERY, executionTime > 0 ? executionTime :
currentOperationCost);
- COORDINATOR.cleanupQueryExecution(req.queryId, t);
+ COORDINATOR.cleanupQueryExecution(req.queryId, req, t);
}
SESSION_MANAGER.updateIdleTime();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
index 8e9cedf2862..d68f022c6dd 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
@@ -58,6 +58,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.function.BiFunction;
import static org.apache.iotdb.commons.utils.StatusUtils.needRetry;
+import static org.apache.iotdb.db.utils.CommonUtils.getContentOfRequest;
/**
* The coordinator for MPP. It manages all the queries which are executed in
current Node. And it
@@ -247,7 +248,8 @@ public class Coordinator {
return queryIdGenerator.createNextQueryId();
}
- public void cleanupQueryExecution(Long queryId, Throwable t) {
+ public void cleanupQueryExecution(
+ Long queryId, org.apache.thrift.TBase<?, ?> nativeApiRequest, Throwable
t) {
IQueryExecution queryExecution = getQueryExecution(queryId);
if (queryExecution != null) {
try (SetThreadName threadName = new
SetThreadName(queryExecution.getQueryId())) {
@@ -258,9 +260,9 @@ public class Coordinator {
long costTime = queryExecution.getTotalExecutionTime();
if (costTime / 1_000_000 >= CONFIG.getSlowQueryThreshold()) {
SLOW_SQL_LOGGER.info(
- "Cost: {} ms, sql is {}",
+ "Cost: {} ms, {}",
costTime / 1_000_000,
- queryExecution.getExecuteSQL().orElse("UNKNOWN"));
+ getContentOfRequest(nativeApiRequest, queryExecution));
}
}
}
@@ -268,7 +270,7 @@ public class Coordinator {
}
public void cleanupQueryExecution(Long queryId) {
- cleanupQueryExecution(queryId, null);
+ cleanupQueryExecution(queryId, null, null);
}
public IClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetchExecutor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetchExecutor.java
index 75e087307ff..e9f2cdf307d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetchExecutor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetchExecutor.java
@@ -245,7 +245,7 @@ class ClusterSchemaFetchExecutor {
t = throwable;
throw throwable;
} finally {
- coordinator.cleanupQueryExecution(queryId, t);
+ coordinator.cleanupQueryExecution(queryId, null, t);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/CommonUtils.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/CommonUtils.java
index 4237ddc1fad..6796431e049 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/CommonUtils.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/CommonUtils.java
@@ -20,7 +20,13 @@ package org.apache.iotdb.db.utils;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.queryengine.plan.execution.IQueryExecution;
import org.apache.iotdb.db.utils.constant.SqlConstant;
+import org.apache.iotdb.service.rpc.thrift.TSAggregationQueryReq;
+import org.apache.iotdb.service.rpc.thrift.TSFastLastDataQueryForOneDeviceReq;
+import org.apache.iotdb.service.rpc.thrift.TSFetchResultsReq;
+import org.apache.iotdb.service.rpc.thrift.TSLastDataQueryReq;
+import org.apache.iotdb.service.rpc.thrift.TSRawDataQueryReq;
import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.utils.Binary;
@@ -44,6 +50,10 @@ import java.util.Objects;
@SuppressWarnings("java:S106") // for console outputs
public class CommonUtils {
+ private static final int MAX_SLOW_NATIVE_API_OUTPUT_NUM = 10;
+
+ private static final String UNKNOWN_RESULT = "UNKNOWN";
+
private CommonUtils() {}
public static Object parseValue(TSDataType dataType, String value) throws
QueryProcessException {
@@ -232,6 +242,74 @@ public class CommonUtils {
throw new QueryProcessException("The BOOLEAN should be true/TRUE,
false/FALSE or 0/1");
}
+ public static String getContentOfRequest(
+ org.apache.thrift.TBase<?, ?> request, IQueryExecution queryExecution) {
+ if (queryExecution == null) {
+ return UNKNOWN_RESULT;
+ }
+
+ String executeSql = queryExecution.getExecuteSQL().orElse("");
+ if (!executeSql.isEmpty()) {
+ return executeSql;
+ } else if (request == null) {
+ return UNKNOWN_RESULT;
+ } else if (request instanceof TSRawDataQueryReq) {
+ TSRawDataQueryReq req = (TSRawDataQueryReq) request;
+ StringBuilder sb = new StringBuilder();
+ for (int i = 0; i < Math.min(req.getPathsSize(),
MAX_SLOW_NATIVE_API_OUTPUT_NUM); i++) {
+ sb.append(i == 0 ? "" : ",").append(req.getPaths().get(i));
+ }
+ return String.format(
+ "Request name: TSRawDataQueryReq, paths size: %s, starTime: %s, "
+ + "endTime: %s, some paths: %s",
+ req.getPathsSize(), req.getStartTime(), req.getEndTime(), sb);
+ } else if (request instanceof TSLastDataQueryReq) {
+ TSLastDataQueryReq req = (TSLastDataQueryReq) request;
+ StringBuilder sb = new StringBuilder();
+ for (int i = 0; i < Math.min(req.getPathsSize(),
MAX_SLOW_NATIVE_API_OUTPUT_NUM); i++) {
+ sb.append(i == 0 ? "" : ",").append(req.getPaths().get(i));
+ }
+ return String.format(
+ "Request name: TSLastDataQueryReq, paths size: %s, some paths: %s",
+ req.getPathsSize(), sb);
+ } else if (request instanceof TSAggregationQueryReq) {
+ TSAggregationQueryReq req = (TSAggregationQueryReq) request;
+ StringBuilder sb = new StringBuilder();
+ for (int i = 0; i < Math.min(req.getPathsSize(),
MAX_SLOW_NATIVE_API_OUTPUT_NUM); i++) {
+ sb.append(i == 0 ? "" : ",")
+ .append(req.getAggregations().get(i))
+ .append(":")
+ .append(req.getPaths().get(i));
+ }
+ return String.format(
+ "Request name: TSAggregationQueryReq, startTime: %s, endTime: %s, "
+ + "paths size: %s, some paths: %s",
+ req.getStartTime(), req.getEndTime(), req.getPathsSize(), sb);
+ } else if (request instanceof TSFastLastDataQueryForOneDeviceReq) {
+ TSFastLastDataQueryForOneDeviceReq req =
(TSFastLastDataQueryForOneDeviceReq) request;
+ return String.format(
+ "Request name: TSFastLastDataQueryForOneDeviceReq, "
+ + "db: %s, deviceId: %s, sensorSize: %s, sensors: %s",
+ req.getDb(), req.getDeviceId(), req.getSensorsSize(),
req.getSensors());
+ } else if (request instanceof TSFetchResultsReq) {
+ TSFetchResultsReq req = (TSFetchResultsReq) request;
+ StringBuilder sb = new StringBuilder();
+ for (int i = 0;
+ i < Math.min(queryExecution.getOutputValueColumnCount(),
MAX_SLOW_NATIVE_API_OUTPUT_NUM);
+ i++) {
+ sb.append(i == 0 ? "" : ",")
+ .append(queryExecution.getDatasetHeader().getRespColumns().get(i));
+ }
+ return String.format(
+ "Request name: TSFetchResultsReq, "
+ + "queryId: %s, output value column count: %s, fetchSize: %s, "
+ + "some response headers: %s",
+ req.getQueryId(), queryExecution.getOutputValueColumnCount(),
req.getFetchSize(), sb);
+ } else {
+ return UNKNOWN_RESULT;
+ }
+ }
+
public static int runCli(
List<Class<? extends Runnable>> commands,
String[] args,