This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new de6c2864243 Optimize the result of EXPLAIN ANALYZE
de6c2864243 is described below

commit de6c2864243dbfde75396857008d86e60f30a67a
Author: YangCaiyin <[email protected]>
AuthorDate: Fri Mar 15 09:03:08 2024 +0800

    Optimize the result of EXPLAIN ANALYZE
---
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 10 ++++++++++
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  | 14 ++++++++++++++
 .../fragment/FragmentInstanceExecution.java        |  7 ++++---
 .../execution/operator/ExplainAnalyzeOperator.java | 18 +++++++++++++-----
 .../queryengine/execution/operator/Operator.java   | 10 +++++-----
 .../execution/operator/OperatorContext.java        | 10 +++++-----
 .../plan/planner/LogicalPlanVisitor.java           |  3 ++-
 .../plan/planner/OperatorTreeGenerator.java        |  2 +-
 .../plan/planner/plan/node/ExplainAnalyzeNode.java | 22 +++++++++++++++++-----
 .../FragmentInstanceStatisticsDrawer.java          |  9 ++++++---
 .../statistics/StatisticsMergeUtil.java            |  2 +-
 .../resources/conf/iotdb-common.properties         |  4 ++++
 .../src/main/thrift/datanode.thrift                |  3 ++-
 13 files changed, 84 insertions(+), 30 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 08420029988..682d3d00022 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -349,6 +349,8 @@ public class IoTDBConfig {
 
   private int degreeOfParallelism = Math.max(1, 
Runtime.getRuntime().availableProcessors() / 2);
 
+  private int mergeThresholdOfExplainAnalyze = 10;
+
   private int modeMapSizeThreshold = 10000;
 
   /** How many queries can be concurrently executed. When <= 0, use 1000. */
@@ -1604,6 +1606,14 @@ public class IoTDBConfig {
     return degreeOfParallelism;
   }
 
+  public void setMergeThresholdOfExplainAnalyze(int 
mergeThresholdOfExplainAnalyze) {
+    this.mergeThresholdOfExplainAnalyze = mergeThresholdOfExplainAnalyze;
+  }
+
+  public int getMergeThresholdOfExplainAnalyze() {
+    return mergeThresholdOfExplainAnalyze;
+  }
+
   public int getMaxAllowedConcurrentQueries() {
     return maxAllowedConcurrentQueries;
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 9160cb8b0b1..ba8a5f41c40 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -552,6 +552,12 @@ public class IoTDBDescriptor {
       conf.setDegreeOfParallelism(Runtime.getRuntime().availableProcessors() / 
2);
     }
 
+    conf.setMergeThresholdOfExplainAnalyze(
+        Integer.parseInt(
+            properties.getProperty(
+                "merge_threshold_of_explain_analyze",
+                Integer.toString(conf.getMergeThresholdOfExplainAnalyze()))));
+
     conf.setModeMapSizeThreshold(
         Integer.parseInt(
             properties.getProperty(
@@ -1613,6 +1619,14 @@ public class IoTDBDescriptor {
               properties.getProperty(
                   "load_clean_up_task_execution_delay_time_seconds",
                   
String.valueOf(conf.getLoadCleanupTaskExecutionDelayTimeSeconds()))));
+
+      // update merge_threshold_of_explain_analyze
+      conf.setMergeThresholdOfExplainAnalyze(
+          Integer.parseInt(
+              properties.getProperty(
+                  "merge_threshold_of_explain_analyze",
+                  String.valueOf(conf.getMergeThresholdOfExplainAnalyze()))));
+
     } catch (Exception e) {
       throw new QueryProcessException(String.format("Fail to reload 
configuration because %s", e));
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecution.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecution.java
index f76e54a9409..dac8bdf9080 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecution.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecution.java
@@ -149,7 +149,7 @@ public class FragmentInstanceExecution {
       FragmentInstanceContext context, TFetchFragmentInstanceStatisticsResp 
statistics) {
     statistics.setFragmentInstanceId(context.getId().toThrift());
     statistics.setQueryStatistics(context.getQueryStatistics().toThrift());
-
+    statistics.setState(getInstanceState().toString());
     IDataRegionForQuery dataRegionForQuery = context.getDataRegion();
     if (dataRegionForQuery instanceof VirtualDataRegion) {
       // We don't need to output the region having ExplainAnalyzeOperator only.
@@ -206,7 +206,8 @@ public class FragmentInstanceExecution {
         setOperatorStatistics(operatorStatistics, operatorContext);
         operatorStatisticsMap.put(operatorContext.getPlanNodeId().toString(), 
operatorStatistics);
         operatorCoutMap.put(operatorType, 
operatorCoutMap.getOrDefault(operatorType, 0) + 1);
-        if (operatorCoutMap.get(operatorType) >= 10) {
+        if (operatorCoutMap.get(operatorType)
+            >= 
IoTDBDescriptor.getInstance().getConfig().getMergeThresholdOfExplainAnalyze()) {
           needMerge = true;
           // merge all the operatorStatistics with the overload type and 
remain only one in
           // operatorStatisticsMap
@@ -224,7 +225,7 @@ public class FragmentInstanceExecution {
     
operatorStatistics.setTotalExecutionTimeInNanos(operatorContext.getTotalExecutionTimeInNanos());
     
operatorStatistics.setNextCalledCount(operatorContext.getNextCalledCount());
     
operatorStatistics.setHasNextCalledCount(operatorContext.getHasNextCalledCount());
-    operatorStatistics.setInputRows(operatorContext.getInputRows());
+    operatorStatistics.setOutputRows(operatorContext.getOutputRows());
     operatorStatistics.setSpecifiedInfo(operatorContext.getSpecifiedInfo());
     
operatorStatistics.setMemoryUsage(operatorContext.getEstimatedMemorySize());
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/ExplainAnalyzeOperator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/ExplainAnalyzeOperator.java
index ae8fb9ccdcc..a5ab5a47c06 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/ExplainAnalyzeOperator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/ExplainAnalyzeOperator.java
@@ -59,18 +59,21 @@ public class ExplainAnalyzeOperator implements 
ProcessOperator {
   private final boolean verbose;
   private boolean outputResult = false;
   private final List<FragmentInstance> instances;
-  private static final long LOG_INTERNAL_IN_MS = 10000;
   private static final Logger logger =
       LoggerFactory.getLogger(IoTDBConstant.EXPLAIN_ANALYZE_LOGGER_NAME);
   private final FragmentInstanceStatisticsDrawer 
fragmentInstanceStatisticsDrawer =
       new FragmentInstanceStatisticsDrawer();
   private static final String LOG_TITLE =
-      "---------------------Intermediate result of EXPLAIN 
ANALYZE---------------------:";
+      "---------------------Intermediate Results of EXPLAIN 
ANALYZE---------------------:";
   private final ScheduledFuture<?> logRecordTask;
   private final IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> 
clientManager;
 
   public ExplainAnalyzeOperator(
-      OperatorContext operatorContext, Operator child, long queryId, boolean 
verbose) {
+      OperatorContext operatorContext,
+      Operator child,
+      long queryId,
+      boolean verbose,
+      long timeout) {
     this.operatorContext = operatorContext;
     this.child = child;
     this.verbose = verbose;
@@ -81,12 +84,17 @@ public class ExplainAnalyzeOperator implements 
ProcessOperator {
     QueryExecution queryExecution = (QueryExecution) 
coordinator.getQueryExecution(queryId);
     this.instances = queryExecution.getDistributedPlan().getInstances();
     
fragmentInstanceStatisticsDrawer.renderPlanStatistics(queryExecution.getContext());
+
+    // The time interval guarantees the result of EXPLAIN ANALYZE will be 
printed at least three
+    // times.
+    // And the maximum time interval is 15s.
+    long logIntervalInMs = Math.min(timeout / 3, 15000);
     this.logRecordTask =
         ScheduledExecutorUtil.safelyScheduleAtFixedRate(
             queryExecution.getScheduledExecutor(),
             this::logIntermediateResultIfTimeout,
-            LOG_INTERNAL_IN_MS,
-            LOG_INTERNAL_IN_MS,
+            logIntervalInMs,
+            logIntervalInMs,
             TimeUnit.MILLISECONDS);
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/Operator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/Operator.java
index 26665559721..0af295e3efa 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/Operator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/Operator.java
@@ -43,17 +43,17 @@ public interface Operator extends AutoCloseable {
   default TsBlock nextWithTimer() throws Exception {
     OperatorContext context = getOperatorContext();
     long startTime = System.nanoTime();
-    TsBlock input = null;
+    TsBlock output = null;
     try {
-      input = next();
+      output = next();
     } finally {
       context.recordExecutionTime(System.nanoTime() - startTime);
-      if (input != null) {
-        context.addInputRows(input.getPositionCount());
+      if (output != null) {
+        context.addOutputRows(output.getPositionCount());
       }
       context.recordNextCalled();
     }
-    return input;
+    return output;
   }
 
   /**
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorContext.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorContext.java
index 64bb52259e9..00a39e418ca 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorContext.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorContext.java
@@ -58,7 +58,7 @@ public class OperatorContext {
   // SpecifiedInfo is used to record some custom information for the operator,
   // which will be shown in the result of EXPLAIN ANALYZE to analyze the query.
   private Map<String, String> specifiedInfo = null;
-  private long inputRows = 0;
+  private long output = 0;
   private long estimatedMemorySize;
 
   public OperatorContext(
@@ -149,12 +149,12 @@ public class OperatorContext {
     return estimatedMemorySize;
   }
 
-  public void addInputRows(long inputRows) {
-    this.inputRows += inputRows;
+  public void addOutputRows(long outputRows) {
+    this.output += outputRows;
   }
 
-  public long getInputRows() {
-    return inputRows;
+  public long getOutputRows() {
+    return output;
   }
 
   public void recordSpecifiedInfo(String key, String value) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java
index c18ae28479e..87c04426010 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java
@@ -134,7 +134,8 @@ public class LogicalPlanVisitor extends 
StatementVisitor<PlanNode, MPPQueryConte
             context.getQueryId().genPlanNodeId(),
             root,
             explainAnalyzeStatement.isVerbose(),
-            context.getLocalQueryId());
+            context.getLocalQueryId(),
+            context.getTimeOut());
     context.getTypeProvider().setType(ColumnHeaderConstant.EXPLAIN_ANALYZE, 
TSDataType.TEXT);
     return root;
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
index db47d9dbf56..a6d043f5ad2 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
@@ -3260,6 +3260,6 @@ public class OperatorTreeGenerator extends 
PlanVisitor<Operator, LocalExecutionP
                 node.getPlanNodeId(),
                 ExplainAnalyzeOperator.class.getSimpleName());
     return new ExplainAnalyzeOperator(
-        operatorContext, operator, node.getQueryId(), node.isVerbose());
+        operatorContext, operator, node.getQueryId(), node.isVerbose(), 
node.getTimeout());
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/ExplainAnalyzeNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/ExplainAnalyzeNode.java
index 14bb402df75..a8f8783d203 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/ExplainAnalyzeNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/ExplainAnalyzeNode.java
@@ -33,16 +33,19 @@ public class ExplainAnalyzeNode extends 
SingleChildProcessNode {
   private final boolean verbose;
 
   private final long queryId;
+  private final long timeout;
 
-  public ExplainAnalyzeNode(PlanNodeId id, PlanNode child, boolean verbose, 
long queryId) {
+  public ExplainAnalyzeNode(
+      PlanNodeId id, PlanNode child, boolean verbose, long queryId, long 
timeout) {
     super(id, child);
     this.verbose = verbose;
     this.queryId = queryId;
+    this.timeout = timeout;
   }
 
   @Override
   public PlanNode clone() {
-    return new ExplainAnalyzeNode(getPlanNodeId(), child, verbose, queryId);
+    return new ExplainAnalyzeNode(getPlanNodeId(), child, verbose, queryId, 
timeout);
   }
 
   @Override
@@ -66,13 +69,15 @@ public class ExplainAnalyzeNode extends 
SingleChildProcessNode {
     PlanNodeType.EXPLAIN_ANALYZE.serialize(stream);
     ReadWriteIOUtils.write(verbose, stream);
     ReadWriteIOUtils.write(queryId, stream);
+    ReadWriteIOUtils.write(timeout, stream);
   }
 
   public static ExplainAnalyzeNode deserialize(ByteBuffer byteBuffer) {
     boolean verbose = ReadWriteIOUtils.readBool(byteBuffer);
     long queryId = ReadWriteIOUtils.readLong(byteBuffer);
     PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
-    return new ExplainAnalyzeNode(planNodeId, null, verbose, queryId);
+    long timeout = ReadWriteIOUtils.readLong(byteBuffer);
+    return new ExplainAnalyzeNode(planNodeId, null, verbose, queryId, timeout);
   }
 
   public boolean isVerbose() {
@@ -83,16 +88,23 @@ public class ExplainAnalyzeNode extends 
SingleChildProcessNode {
     return queryId;
   }
 
+  public long getTimeout() {
+    return timeout;
+  }
+
   @Override
   public boolean equals(Object o) {
     if (this == o) return true;
     if (!(o instanceof ExplainAnalyzeNode)) return false;
     ExplainAnalyzeNode that = (ExplainAnalyzeNode) o;
-    return verbose == that.verbose;
+    return verbose == that.verbose && queryId == that.queryId && timeout == 
that.timeout;
   }
 
   @Override
   public int hashCode() {
-    return super.hashCode() + Boolean.hashCode(verbose);
+    return super.hashCode()
+        + Boolean.hashCode(verbose)
+        + Long.hashCode(queryId)
+        + Long.hashCode(timeout);
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/statistics/FragmentInstanceStatisticsDrawer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/statistics/FragmentInstanceStatisticsDrawer.java
index 55e085b6cc1..e7ca8ab887e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/statistics/FragmentInstanceStatisticsDrawer.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/statistics/FragmentInstanceStatisticsDrawer.java
@@ -89,8 +89,11 @@ public class FragmentInstanceStatisticsDrawer {
           singleFragmentInstanceArea,
           0,
           String.format(
-              "FRAGMENT-INSTANCE[Id: %s][IP: %s][DataRegion: %s]",
-              instance.getId().toString(), statistics.getIp(), 
statistics.getDataRegion()));
+              "FRAGMENT-INSTANCE[Id: %s][IP: %s][DataRegion: %s][State: %s]",
+              instance.getId().toString(),
+              statistics.getIp(),
+              statistics.getDataRegion(),
+              statistics.getState()));
       addLine(
           singleFragmentInstanceArea,
           1,
@@ -362,7 +365,7 @@ public class FragmentInstanceStatisticsDrawer {
       addLine(
           singleFragmentInstanceArea,
           indentNum + 2,
-          String.format("input: %s rows", operatorStatistic.getInputRows()));
+          String.format("output: %s rows", operatorStatistic.getOutputRows()));
       addLine(
           singleFragmentInstanceArea,
           indentNum + 2,
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/statistics/StatisticsMergeUtil.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/statistics/StatisticsMergeUtil.java
index 88679fe8905..b2f39d12672 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/statistics/StatisticsMergeUtil.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/statistics/StatisticsMergeUtil.java
@@ -84,7 +84,7 @@ public class StatisticsMergeUtil {
         first.getTotalExecutionTimeInNanos() + 
second.getTotalExecutionTimeInNanos());
     first.setNextCalledCount(first.getNextCalledCount() + 
second.getNextCalledCount());
     first.setHasNextCalledCount(first.getHasNextCalledCount() + 
second.getHasNextCalledCount());
-    first.setInputRows(first.getInputRows() + second.getInputRows());
+    first.setOutputRows(first.getOutputRows() + second.getOutputRows());
     first.setMemoryUsage(first.getMemoryUsage() + second.getMemoryUsage());
     first.setCount(first.getCount() + 1);
     first.setSpecifiedInfo(
diff --git 
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties 
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
index 1a626c25aa2..425872440b7 100644
--- 
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
+++ 
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties
@@ -436,6 +436,10 @@ data_replication_factor=1
 # Datatype: long
 # sort_buffer_size_in_bytes=1048576
 
+# The merge threshold for operators in the result set of EXPLAIN ANALYZE. Once the 
number of operators of the same type reaches this threshold, the statistics of 
those operators are merged into a single entry.
+# Datatype: int
+# merge_threshold_of_explain_analyze=10
+
 ####################
 ### Storage Engine Configuration
 ####################
diff --git a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift 
b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
index f8b9a691005..ec8df721a94 100644
--- a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
+++ b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
@@ -471,7 +471,7 @@ struct TOperatorStatistics{
   4: required i64 nextCalledCount
   5: required i64 hasNextCalledCount
   6: required map<string,string> specifiedInfo
-  7: required i64 inputRows
+  7: required i64 outputRows
   8: required i64 memoryUsage
   9: optional i64 count
 }
@@ -536,6 +536,7 @@ struct TFetchFragmentInstanceStatisticsResp {
   13: optional i64 readyQueuedTime
   14: optional i64 blockQueuedTime
   15: optional string ip
+  16: optional string state
 }
 /**
 * END: Used for EXPLAIN ANALYZE

Reply via email to