This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new de5405fef83 Print native request api in datanode_slow_sql.log
de5405fef83 is described below

commit de5405fef83e76a08982b7607a3463f27d058c55
Author: Beyyes <[email protected]>
AuthorDate: Wed Mar 13 20:52:19 2024 +0800

    Print native request api in datanode_slow_sql.log
---
 .../protocol/thrift/impl/ClientRPCServiceImpl.java | 27 +++++---
 .../iotdb/db/queryengine/plan/Coordinator.java     | 10 +--
 .../analyze/schema/ClusterSchemaFetchExecutor.java |  2 +-
 .../org/apache/iotdb/db/utils/CommonUtils.java     | 78 ++++++++++++++++++++++
 4 files changed, 101 insertions(+), 16 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
index 5612d8777e1..6d20d6f7d49 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java
@@ -201,6 +201,7 @@ import static 
org.apache.iotdb.commons.partition.DataPartition.NOT_ASSIGNED;
 import static 
org.apache.iotdb.db.queryengine.common.DataNodeEndPoints.isSameNode;
 import static 
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
 import static 
org.apache.iotdb.db.queryengine.execution.operator.AggregationUtil.initTimeRangeIterator;
+import static org.apache.iotdb.db.utils.CommonUtils.getContentOfRequest;
 import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onIoTDBException;
 import static 
org.apache.iotdb.db.utils.ErrorHandlingUtils.onNpeOrUnexpectedException;
 import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onQueryException;
@@ -354,7 +355,7 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
         // record total time cost for one query
         long executionTime = COORDINATOR.getTotalExecutionTime(queryId);
         addQueryLatency(statementType, executionTime > 0 ? executionTime : 
currentOperationCost);
-        COORDINATOR.cleanupQueryExecution(queryId, t);
+        COORDINATOR.cleanupQueryExecution(queryId, req, t);
       }
       SESSION_MANAGER.updateIdleTime();
       if (quota != null) {
@@ -442,7 +443,7 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
         long executionTime = COORDINATOR.getTotalExecutionTime(queryId);
         addQueryLatency(
             StatementType.QUERY, executionTime > 0 ? executionTime : 
currentOperationCost);
-        COORDINATOR.cleanupQueryExecution(queryId, t);
+        COORDINATOR.cleanupQueryExecution(queryId, req, t);
       }
 
       SESSION_MANAGER.updateIdleTime();
@@ -532,7 +533,7 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
         long executionTime = COORDINATOR.getTotalExecutionTime(queryId);
         addQueryLatency(
             StatementType.QUERY, executionTime > 0 ? executionTime : 
currentOperationCost);
-        COORDINATOR.cleanupQueryExecution(queryId, t);
+        COORDINATOR.cleanupQueryExecution(queryId, req, t);
       }
 
       SESSION_MANAGER.updateIdleTime();
@@ -619,7 +620,7 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
         long executionTime = COORDINATOR.getTotalExecutionTime(queryId);
         addQueryLatency(
             StatementType.QUERY, executionTime > 0 ? executionTime : 
currentOperationCost);
-        COORDINATOR.cleanupQueryExecution(queryId, t);
+        COORDINATOR.cleanupQueryExecution(queryId, req, t);
       }
 
       SESSION_MANAGER.updateIdleTime();
@@ -953,7 +954,7 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
         long executionTime = COORDINATOR.getTotalExecutionTime(queryId);
         addQueryLatency(
             StatementType.QUERY, executionTime > 0 ? executionTime : 
currentOperationCost);
-        COORDINATOR.cleanupQueryExecution(queryId, t);
+        COORDINATOR.cleanupQueryExecution(queryId, req, t);
       }
       SESSION_MANAGER.updateIdleTime();
       if (quota != null) {
@@ -1053,6 +1054,7 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
     boolean finished = false;
     String statementType = null;
     Throwable t = null;
+    IQueryExecution queryExecution = null;
     try {
       IClientSession clientSession = 
SESSION_MANAGER.getCurrSessionAndUpdateIdleTime();
       if (!SESSION_MANAGER.checkLogin(clientSession)) {
@@ -1060,7 +1062,7 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
       }
       TSFetchResultsResp resp = 
RpcUtils.getTSFetchResultsResp(TSStatusCode.SUCCESS_STATUS);
 
-      IQueryExecution queryExecution = 
COORDINATOR.getQueryExecution(req.queryId);
+      queryExecution = COORDINATOR.getQueryExecution(req.queryId);
 
       if (queryExecution == null) {
         resp.setHasResultSet(false);
@@ -1084,7 +1086,8 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
     } catch (Exception e) {
       finished = true;
       t = e;
-      return RpcUtils.getTSFetchResultsResp(onQueryException(e, 
OperationType.FETCH_RESULTS));
+      return RpcUtils.getTSFetchResultsResp(
+          onQueryException(e, getContentOfRequest(req, queryExecution)));
     } catch (Error error) {
       finished = true;
       t = error;
@@ -1103,7 +1106,7 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
         long executionTime = COORDINATOR.getTotalExecutionTime(req.queryId);
         addQueryLatency(
             StatementType.QUERY, executionTime > 0 ? executionTime : 
currentOperationCost);
-        COORDINATOR.cleanupQueryExecution(req.queryId, t);
+        COORDINATOR.cleanupQueryExecution(req.queryId, req, t);
       }
 
       SESSION_MANAGER.updateIdleTime();
@@ -1573,6 +1576,7 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
     long startTime = System.nanoTime();
     String statementType = null;
     Throwable t = null;
+    IQueryExecution queryExecution = null;
     try {
       IClientSession clientSession = 
SESSION_MANAGER.getCurrSessionAndUpdateIdleTime();
       if (!SESSION_MANAGER.checkLogin(clientSession)) {
@@ -1581,7 +1585,7 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
 
       TSFetchResultsResp resp = 
RpcUtils.getTSFetchResultsResp(TSStatusCode.SUCCESS_STATUS);
 
-      IQueryExecution queryExecution = 
COORDINATOR.getQueryExecution(req.queryId);
+      queryExecution = COORDINATOR.getQueryExecution(req.queryId);
       if (queryExecution == null) {
         resp.setHasResultSet(false);
         resp.setMoreData(true);
@@ -1604,7 +1608,8 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
     } catch (Exception e) {
       finished = true;
       t = e;
-      return RpcUtils.getTSFetchResultsResp(onQueryException(e, 
OperationType.FETCH_RESULTS));
+      return RpcUtils.getTSFetchResultsResp(
+          onQueryException(e, getContentOfRequest(req, queryExecution)));
     } catch (Error error) {
       t = error;
       throw error;
@@ -1622,7 +1627,7 @@ public class ClientRPCServiceImpl implements 
IClientRPCServiceWithHandler {
         long executionTime = COORDINATOR.getTotalExecutionTime(req.queryId);
         addQueryLatency(
             StatementType.QUERY, executionTime > 0 ? executionTime : 
currentOperationCost);
-        COORDINATOR.cleanupQueryExecution(req.queryId, t);
+        COORDINATOR.cleanupQueryExecution(req.queryId, req, t);
       }
 
       SESSION_MANAGER.updateIdleTime();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
index 8e9cedf2862..d68f022c6dd 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
@@ -58,6 +58,7 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.function.BiFunction;
 
 import static org.apache.iotdb.commons.utils.StatusUtils.needRetry;
+import static org.apache.iotdb.db.utils.CommonUtils.getContentOfRequest;
 
 /**
  * The coordinator for MPP. It manages all the queries which are executed in 
current Node. And it
@@ -247,7 +248,8 @@ public class Coordinator {
     return queryIdGenerator.createNextQueryId();
   }
 
-  public void cleanupQueryExecution(Long queryId, Throwable t) {
+  public void cleanupQueryExecution(
+      Long queryId, org.apache.thrift.TBase<?, ?> nativeApiRequest, Throwable 
t) {
     IQueryExecution queryExecution = getQueryExecution(queryId);
     if (queryExecution != null) {
       try (SetThreadName threadName = new 
SetThreadName(queryExecution.getQueryId())) {
@@ -258,9 +260,9 @@ public class Coordinator {
           long costTime = queryExecution.getTotalExecutionTime();
           if (costTime / 1_000_000 >= CONFIG.getSlowQueryThreshold()) {
             SLOW_SQL_LOGGER.info(
-                "Cost: {} ms, sql is {}",
+                "Cost: {} ms, {}",
                 costTime / 1_000_000,
-                queryExecution.getExecuteSQL().orElse("UNKNOWN"));
+                getContentOfRequest(nativeApiRequest, queryExecution));
           }
         }
       }
@@ -268,7 +270,7 @@ public class Coordinator {
   }
 
   public void cleanupQueryExecution(Long queryId) {
-    cleanupQueryExecution(queryId, null);
+    cleanupQueryExecution(queryId, null, null);
   }
 
   public IClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetchExecutor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetchExecutor.java
index 75e087307ff..e9f2cdf307d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetchExecutor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetchExecutor.java
@@ -245,7 +245,7 @@ class ClusterSchemaFetchExecutor {
       t = throwable;
       throw throwable;
     } finally {
-      coordinator.cleanupQueryExecution(queryId, t);
+      coordinator.cleanupQueryExecution(queryId, null, t);
     }
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/CommonUtils.java 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/CommonUtils.java
index 4237ddc1fad..6796431e049 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/CommonUtils.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/CommonUtils.java
@@ -20,7 +20,13 @@ package org.apache.iotdb.db.utils;
 
 import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.queryengine.plan.execution.IQueryExecution;
 import org.apache.iotdb.db.utils.constant.SqlConstant;
+import org.apache.iotdb.service.rpc.thrift.TSAggregationQueryReq;
+import org.apache.iotdb.service.rpc.thrift.TSFastLastDataQueryForOneDeviceReq;
+import org.apache.iotdb.service.rpc.thrift.TSFetchResultsReq;
+import org.apache.iotdb.service.rpc.thrift.TSLastDataQueryReq;
+import org.apache.iotdb.service.rpc.thrift.TSRawDataQueryReq;
 import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.utils.Binary;
@@ -44,6 +50,10 @@ import java.util.Objects;
 @SuppressWarnings("java:S106") // for console outputs
 public class CommonUtils {
 
+  private static final int MAX_SLOW_NATIVE_API_OUTPUT_NUM = 10;
+
+  private static final String UNKNOWN_RESULT = "UNKNOWN";
+
   private CommonUtils() {}
 
   public static Object parseValue(TSDataType dataType, String value) throws 
QueryProcessException {
@@ -232,6 +242,74 @@ public class CommonUtils {
     throw new QueryProcessException("The BOOLEAN should be true/TRUE, 
false/FALSE or 0/1");
   }
 
+  /**
+   * Builds a human-readable description of a query, for use in the slow SQL log and in query
+   * error handling.
+   *
+   * <p>The SQL text recorded on {@code queryExecution} is returned when it is non-empty. Otherwise
+   * the native API request is summarized by its type and key fields, listing at most {@code
+   * MAX_SLOW_NATIVE_API_OUTPUT_NUM} paths or response columns.
+   *
+   * @param request the native API request that triggered the query, may be {@code null}
+   * @param queryExecution the execution of the query, may be {@code null}
+   * @return the SQL text or the request summary; {@code "UNKNOWN"} when {@code queryExecution} is
+   *     {@code null}, or when there is no SQL text and the request is {@code null} or of an
+   *     unsupported type
+   */
+  public static String getContentOfRequest(
+      org.apache.thrift.TBase<?, ?> request, IQueryExecution queryExecution) {
+    if (queryExecution == null) {
+      return UNKNOWN_RESULT;
+    }
+
+    String executeSql = queryExecution.getExecuteSQL().orElse("");
+    if (!executeSql.isEmpty()) {
+      return executeSql;
+    }
+
+    // A null request matches none of the instanceof checks and ends up at UNKNOWN_RESULT.
+    if (request instanceof TSRawDataQueryReq) {
+      TSRawDataQueryReq req = (TSRawDataQueryReq) request;
+      String somePaths = joinLeading(req.getPaths(), MAX_SLOW_NATIVE_API_OUTPUT_NUM);
+      return String.format(
+          "Request name: TSRawDataQueryReq, paths size: %s, startTime: %s, "
+              + "endTime: %s, some paths: %s",
+          req.getPathsSize(), req.getStartTime(), req.getEndTime(), somePaths);
+    } else if (request instanceof TSLastDataQueryReq) {
+      TSLastDataQueryReq req = (TSLastDataQueryReq) request;
+      String somePaths = joinLeading(req.getPaths(), MAX_SLOW_NATIVE_API_OUTPUT_NUM);
+      return String.format(
+          "Request name: TSLastDataQueryReq, paths size: %s, some paths: %s",
+          req.getPathsSize(), somePaths);
+    } else if (request instanceof TSAggregationQueryReq) {
+      TSAggregationQueryReq req = (TSAggregationQueryReq) request;
+      // Pair each path with its aggregation. A malformed request may carry fewer aggregations
+      // than paths, so stop at the shorter list instead of throwing while building a log line.
+      int pairCount = Math.min(req.getPathsSize(), req.getAggregationsSize());
+      StringBuilder sb = new StringBuilder();
+      for (int i = 0; i < Math.min(pairCount, MAX_SLOW_NATIVE_API_OUTPUT_NUM); i++) {
+        sb.append(i == 0 ? "" : ",")
+            .append(req.getAggregations().get(i))
+            .append(":")
+            .append(req.getPaths().get(i));
+      }
+      return String.format(
+          "Request name: TSAggregationQueryReq, startTime: %s, endTime: %s, "
+              + "paths size: %s, some paths: %s",
+          req.getStartTime(), req.getEndTime(), req.getPathsSize(), sb);
+    } else if (request instanceof TSFastLastDataQueryForOneDeviceReq) {
+      TSFastLastDataQueryForOneDeviceReq req = (TSFastLastDataQueryForOneDeviceReq) request;
+      return String.format(
+          "Request name: TSFastLastDataQueryForOneDeviceReq, "
+              + "db: %s, deviceId: %s, sensorSize: %s, sensors: %s",
+          req.getDb(), req.getDeviceId(), req.getSensorsSize(), req.getSensors());
+    } else if (request instanceof TSFetchResultsReq) {
+      TSFetchResultsReq req = (TSFetchResultsReq) request;
+      int columnCount = queryExecution.getOutputValueColumnCount();
+      // The column count and the header list come from different accessors, so the number of
+      // headers printed is additionally bounded by the list's own size inside joinLeading.
+      String someHeaders =
+          joinLeading(
+              queryExecution.getDatasetHeader().getRespColumns(),
+              Math.min(columnCount, MAX_SLOW_NATIVE_API_OUTPUT_NUM));
+      return String.format(
+          "Request name: TSFetchResultsReq, "
+              + "queryId: %s, output value column count: %s, fetchSize: %s, "
+              + "some response headers: %s",
+          req.getQueryId(), columnCount, req.getFetchSize(), someHeaders);
+    }
+    return UNKNOWN_RESULT;
+  }
+
+  /**
+   * Joins the leading elements of {@code items} with commas, never reading past the end of the
+   * list.
+   *
+   * @param items the elements to join, may be {@code null}
+   * @param limit the maximum number of elements to include; non-positive values yield no elements
+   * @return the joined text, empty when {@code items} is {@code null} or empty
+   */
+  private static String joinLeading(List<?> items, int limit) {
+    StringBuilder sb = new StringBuilder();
+    if (items != null) {
+      for (int i = 0; i < Math.min(limit, items.size()); i++) {
+        sb.append(i == 0 ? "" : ",").append(items.get(i));
+      }
+    }
+    return sb.toString();
+  }
+
   public static int runCli(
       List<Class<? extends Runnable>> commands,
       String[] args,

Reply via email to