This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch rc/1.3.1
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 8866b04e09056dc9ddf77f4df7b24d0115d75994
Author: Beyyes <[email protected]>
AuthorDate: Wed Mar 13 20:52:19 2024 +0800

    Print native request api in datanode_slow_sql.log
---
 .../protocol/thrift/impl/ClientRPCServiceImpl.java | 27 +++++---
 .../iotdb/db/queryengine/plan/Coordinator.java     | 10 +--
 .../analyze/schema/ClusterSchemaFetchExecutor.java |  2 +-
 .../org/apache/iotdb/db/utils/CommonUtils.java     | 78 ++++++++++++++++++++++
 4 files changed, 101 insertions(+), 16 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
index 645159b75ae..93a21bb5931 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
@@ -198,6 +198,7 @@ import static 
org.apache.iotdb.commons.partition.DataPartition.NOT_ASSIGNED;
 import static 
org.apache.iotdb.db.queryengine.common.DataNodeEndPoints.isSameNode;
 import static 
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
 import static 
org.apache.iotdb.db.queryengine.execution.operator.AggregationUtil.initTimeRangeIterator;
+import static org.apache.iotdb.db.utils.CommonUtils.getContentOfRequest;
 import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onIoTDBException;
 import static 
org.apache.iotdb.db.utils.ErrorHandlingUtils.onNpeOrUnexpectedException;
 import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onQueryException;
@@ -351,7 +352,7 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
         // record total time cost for one query
         long executionTime = COORDINATOR.getTotalExecutionTime(queryId);
         addQueryLatency(statementType, executionTime > 0 ? executionTime : 
currentOperationCost);
-        COORDINATOR.cleanupQueryExecution(queryId, t);
+        COORDINATOR.cleanupQueryExecution(queryId, req, t);
       }
       SESSION_MANAGER.updateIdleTime();
       if (quota != null) {
@@ -439,7 +440,7 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
         long executionTime = COORDINATOR.getTotalExecutionTime(queryId);
         addQueryLatency(
             StatementType.QUERY, executionTime > 0 ? executionTime : 
currentOperationCost);
-        COORDINATOR.cleanupQueryExecution(queryId, t);
+        COORDINATOR.cleanupQueryExecution(queryId, req, t);
       }
 
       SESSION_MANAGER.updateIdleTime();
@@ -529,7 +530,7 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
         long executionTime = COORDINATOR.getTotalExecutionTime(queryId);
         addQueryLatency(
             StatementType.QUERY, executionTime > 0 ? executionTime : 
currentOperationCost);
-        COORDINATOR.cleanupQueryExecution(queryId, t);
+        COORDINATOR.cleanupQueryExecution(queryId, req, t);
       }
 
       SESSION_MANAGER.updateIdleTime();
@@ -616,7 +617,7 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
         long executionTime = COORDINATOR.getTotalExecutionTime(queryId);
         addQueryLatency(
             StatementType.QUERY, executionTime > 0 ? executionTime : 
currentOperationCost);
-        COORDINATOR.cleanupQueryExecution(queryId, t);
+        COORDINATOR.cleanupQueryExecution(queryId, req, t);
       }
 
       SESSION_MANAGER.updateIdleTime();
@@ -942,7 +943,7 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
         long executionTime = COORDINATOR.getTotalExecutionTime(queryId);
         addQueryLatency(
             StatementType.QUERY, executionTime > 0 ? executionTime : 
currentOperationCost);
-        COORDINATOR.cleanupQueryExecution(queryId, t);
+        COORDINATOR.cleanupQueryExecution(queryId, req, t);
       }
       SESSION_MANAGER.updateIdleTime();
       if (quota != null) {
@@ -1042,6 +1043,7 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
     boolean finished = false;
     StatementType statementType = null;
     Throwable t = null;
+    IQueryExecution queryExecution = null;
     try {
       IClientSession clientSession = 
SESSION_MANAGER.getCurrSessionAndUpdateIdleTime();
       if (!SESSION_MANAGER.checkLogin(clientSession)) {
@@ -1049,7 +1051,7 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
       }
       TSFetchResultsResp resp = 
RpcUtils.getTSFetchResultsResp(TSStatusCode.SUCCESS_STATUS);
 
-      IQueryExecution queryExecution = 
COORDINATOR.getQueryExecution(req.queryId);
+      queryExecution = COORDINATOR.getQueryExecution(req.queryId);
 
       if (queryExecution == null) {
         resp.setHasResultSet(false);
@@ -1073,7 +1075,8 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
     } catch (Exception e) {
       finished = true;
       t = e;
-      return RpcUtils.getTSFetchResultsResp(onQueryException(e, 
OperationType.FETCH_RESULTS));
+      return RpcUtils.getTSFetchResultsResp(
+          onQueryException(e, getContentOfRequest(req, queryExecution)));
     } catch (Error error) {
       finished = true;
       t = error;
@@ -1092,7 +1095,7 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
         long executionTime = COORDINATOR.getTotalExecutionTime(req.queryId);
         addQueryLatency(
             StatementType.QUERY, executionTime > 0 ? executionTime : 
currentOperationCost);
-        COORDINATOR.cleanupQueryExecution(req.queryId, t);
+        COORDINATOR.cleanupQueryExecution(req.queryId, req, t);
       }
 
       SESSION_MANAGER.updateIdleTime();
@@ -1562,6 +1565,7 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
     long startTime = System.nanoTime();
     StatementType statementType = null;
     Throwable t = null;
+    IQueryExecution queryExecution = null;
     try {
       IClientSession clientSession = 
SESSION_MANAGER.getCurrSessionAndUpdateIdleTime();
       if (!SESSION_MANAGER.checkLogin(clientSession)) {
@@ -1570,7 +1574,7 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
 
       TSFetchResultsResp resp = 
RpcUtils.getTSFetchResultsResp(TSStatusCode.SUCCESS_STATUS);
 
-      IQueryExecution queryExecution = 
COORDINATOR.getQueryExecution(req.queryId);
+      queryExecution = COORDINATOR.getQueryExecution(req.queryId);
       if (queryExecution == null) {
         resp.setHasResultSet(false);
         resp.setMoreData(true);
@@ -1593,7 +1597,8 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
     } catch (Exception e) {
       finished = true;
       t = e;
-      return RpcUtils.getTSFetchResultsResp(onQueryException(e, 
OperationType.FETCH_RESULTS));
+      return RpcUtils.getTSFetchResultsResp(
+          onQueryException(e, getContentOfRequest(req, queryExecution)));
     } catch (Error error) {
       t = error;
       throw error;
@@ -1611,7 +1616,7 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
         long executionTime = COORDINATOR.getTotalExecutionTime(req.queryId);
         addQueryLatency(
             StatementType.QUERY, executionTime > 0 ? executionTime : 
currentOperationCost);
-        COORDINATOR.cleanupQueryExecution(req.queryId, t);
+        COORDINATOR.cleanupQueryExecution(req.queryId, req, t);
       }
 
       SESSION_MANAGER.updateIdleTime();
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
index 6c8c2d15bf9..bda9e5a6acd 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
@@ -55,6 +55,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
 
 import static org.apache.iotdb.commons.utils.StatusUtils.needRetry;
+import static org.apache.iotdb.db.utils.CommonUtils.getContentOfRequest;
 
 /**
  * The coordinator for MPP. It manages all the queries which are executed in 
current Node. And it
@@ -225,7 +226,8 @@ public class Coordinator {
     return queryIdGenerator.createNextQueryId();
   }
 
-  public void cleanupQueryExecution(Long queryId, Throwable t) {
+  public void cleanupQueryExecution(
+      Long queryId, org.apache.thrift.TBase<?, ?> nativeApiRequest, Throwable 
t) {
     IQueryExecution queryExecution = getQueryExecution(queryId);
     if (queryExecution != null) {
       try (SetThreadName threadName = new 
SetThreadName(queryExecution.getQueryId())) {
@@ -236,9 +238,9 @@ public class Coordinator {
           long costTime = queryExecution.getTotalExecutionTime();
           if (costTime / 1_000_000 >= CONFIG.getSlowQueryThreshold()) {
             SLOW_SQL_LOGGER.info(
-                "Cost: {} ms, sql is {}",
+                "Cost: {} ms, {}",
                 costTime / 1_000_000,
-                queryExecution.getExecuteSQL().orElse("UNKNOWN"));
+                getContentOfRequest(nativeApiRequest, queryExecution));
           }
         }
       }
@@ -246,7 +248,7 @@ public class Coordinator {
   }
 
   public void cleanupQueryExecution(Long queryId) {
-    cleanupQueryExecution(queryId, null);
+    cleanupQueryExecution(queryId, null, null);
   }
 
   public IClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetchExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetchExecutor.java
index 0adbca34ad5..4f0256f8e88 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetchExecutor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetchExecutor.java
@@ -245,7 +245,7 @@ class ClusterSchemaFetchExecutor {
       t = throwable;
       throw throwable;
     } finally {
-      coordinator.cleanupQueryExecution(queryId, t);
+      coordinator.cleanupQueryExecution(queryId, null, t);
     }
   }
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/CommonUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/CommonUtils.java
index 4237ddc1fad..6796431e049 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/CommonUtils.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/CommonUtils.java
@@ -20,7 +20,13 @@ package org.apache.iotdb.db.utils;
 
 import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.queryengine.plan.execution.IQueryExecution;
 import org.apache.iotdb.db.utils.constant.SqlConstant;
+import org.apache.iotdb.service.rpc.thrift.TSAggregationQueryReq;
+import org.apache.iotdb.service.rpc.thrift.TSFastLastDataQueryForOneDeviceReq;
+import org.apache.iotdb.service.rpc.thrift.TSFetchResultsReq;
+import org.apache.iotdb.service.rpc.thrift.TSLastDataQueryReq;
+import org.apache.iotdb.service.rpc.thrift.TSRawDataQueryReq;
 import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.utils.Binary;
@@ -44,6 +50,10 @@ import java.util.Objects;
 @SuppressWarnings("java:S106") // for console outputs
 public class CommonUtils {
 
+  private static final int MAX_SLOW_NATIVE_API_OUTPUT_NUM = 10;
+
+  private static final String UNKNOWN_RESULT = "UNKNOWN";
+
   private CommonUtils() {}
 
   public static Object parseValue(TSDataType dataType, String value) throws 
QueryProcessException {
@@ -232,6 +242,74 @@ public class CommonUtils {
     throw new QueryProcessException("The BOOLEAN should be true/TRUE, 
false/FALSE or 0/1");
   }
 
+  public static String getContentOfRequest(
+      org.apache.thrift.TBase<?, ?> request, IQueryExecution queryExecution) {
+    if (queryExecution == null) {
+      return UNKNOWN_RESULT;
+    }
+
+    String executeSql = queryExecution.getExecuteSQL().orElse("");
+    if (!executeSql.isEmpty()) {
+      return executeSql;
+    } else if (request == null) {
+      return UNKNOWN_RESULT;
+    } else if (request instanceof TSRawDataQueryReq) {
+      TSRawDataQueryReq req = (TSRawDataQueryReq) request;
+      StringBuilder sb = new StringBuilder();
+      for (int i = 0; i < Math.min(req.getPathsSize(), 
MAX_SLOW_NATIVE_API_OUTPUT_NUM); i++) {
+        sb.append(i == 0 ? "" : ",").append(req.getPaths().get(i));
+      }
+      return String.format(
+          "Request name: TSRawDataQueryReq, paths size: %s, starTime: %s, "
+              + "endTime: %s, some paths: %s",
+          req.getPathsSize(), req.getStartTime(), req.getEndTime(), sb);
+    } else if (request instanceof TSLastDataQueryReq) {
+      TSLastDataQueryReq req = (TSLastDataQueryReq) request;
+      StringBuilder sb = new StringBuilder();
+      for (int i = 0; i < Math.min(req.getPathsSize(), 
MAX_SLOW_NATIVE_API_OUTPUT_NUM); i++) {
+        sb.append(i == 0 ? "" : ",").append(req.getPaths().get(i));
+      }
+      return String.format(
+          "Request name: TSLastDataQueryReq, paths size: %s, some paths: %s",
+          req.getPathsSize(), sb);
+    } else if (request instanceof TSAggregationQueryReq) {
+      TSAggregationQueryReq req = (TSAggregationQueryReq) request;
+      StringBuilder sb = new StringBuilder();
+      for (int i = 0; i < Math.min(req.getPathsSize(), 
MAX_SLOW_NATIVE_API_OUTPUT_NUM); i++) {
+        sb.append(i == 0 ? "" : ",")
+            .append(req.getAggregations().get(i))
+            .append(":")
+            .append(req.getPaths().get(i));
+      }
+      return String.format(
+          "Request name: TSAggregationQueryReq, startTime: %s, endTime: %s, "
+              + "paths size: %s, some paths: %s",
+          req.getStartTime(), req.getEndTime(), req.getPathsSize(), sb);
+    } else if (request instanceof TSFastLastDataQueryForOneDeviceReq) {
+      TSFastLastDataQueryForOneDeviceReq req = 
(TSFastLastDataQueryForOneDeviceReq) request;
+      return String.format(
+          "Request name: TSFastLastDataQueryForOneDeviceReq, "
+              + "db: %s, deviceId: %s, sensorSize: %s, sensors: %s",
+          req.getDb(), req.getDeviceId(), req.getSensorsSize(), 
req.getSensors());
+    } else if (request instanceof TSFetchResultsReq) {
+      TSFetchResultsReq req = (TSFetchResultsReq) request;
+      StringBuilder sb = new StringBuilder();
+      for (int i = 0;
+          i < Math.min(queryExecution.getOutputValueColumnCount(), 
MAX_SLOW_NATIVE_API_OUTPUT_NUM);
+          i++) {
+        sb.append(i == 0 ? "" : ",")
+            .append(queryExecution.getDatasetHeader().getRespColumns().get(i));
+      }
+      return String.format(
+          "Request name: TSFetchResultsReq, "
+              + "queryId: %s, output value column count: %s, fetchSize: %s, "
+              + "some response headers: %s",
+          req.getQueryId(), queryExecution.getOutputValueColumnCount(), 
req.getFetchSize(), sb);
+    } else {
+      return UNKNOWN_RESULT;
+    }
+  }
+
   public static int runCli(
       List<Class<? extends Runnable>> commands,
       String[] args,

Reply via email to