This is an automated email from the ASF dual-hosted git repository. haonan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 5cb15970ef7421a5cd319d6920bd1cb0d8f1ba3a Author: YangCaiyin <[email protected]> AuthorDate: Fri Mar 15 09:03:08 2024 +0800 Optimize the result of EXPLAIN ANALYZE --- .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 10 ++++++++++ .../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 14 ++++++++++++++ .../fragment/FragmentInstanceExecution.java | 7 ++++--- .../execution/operator/ExplainAnalyzeOperator.java | 18 +++++++++++++----- .../queryengine/execution/operator/Operator.java | 10 +++++----- .../execution/operator/OperatorContext.java | 10 +++++----- .../plan/planner/LogicalPlanVisitor.java | 3 ++- .../plan/planner/OperatorTreeGenerator.java | 2 +- .../plan/planner/plan/node/ExplainAnalyzeNode.java | 22 +++++++++++++++++----- .../FragmentInstanceStatisticsDrawer.java | 9 ++++++--- .../statistics/StatisticsMergeUtil.java | 2 +- .../resources/conf/iotdb-common.properties | 4 ++++ .../src/main/thrift/datanode.thrift | 3 ++- 13 files changed, 84 insertions(+), 30 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java index 08420029988..682d3d00022 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java @@ -349,6 +349,8 @@ public class IoTDBConfig { private int degreeOfParallelism = Math.max(1, Runtime.getRuntime().availableProcessors() / 2); + private int mergeThresholdOfExplainAnalyze = 10; + private int modeMapSizeThreshold = 10000; /** How many queries can be concurrently executed. When <= 0, use 1000. 
*/ @@ -1604,6 +1606,14 @@ public class IoTDBConfig { return degreeOfParallelism; } + public void setMergeThresholdOfExplainAnalyze(int mergeThresholdOfExplainAnalyze) { + this.mergeThresholdOfExplainAnalyze = mergeThresholdOfExplainAnalyze; + } + + public int getMergeThresholdOfExplainAnalyze() { + return mergeThresholdOfExplainAnalyze; + } + public int getMaxAllowedConcurrentQueries() { return maxAllowedConcurrentQueries; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java index 9160cb8b0b1..ba8a5f41c40 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java @@ -552,6 +552,12 @@ public class IoTDBDescriptor { conf.setDegreeOfParallelism(Runtime.getRuntime().availableProcessors() / 2); } + conf.setMergeThresholdOfExplainAnalyze( + Integer.parseInt( + properties.getProperty( + "merge_threshold_of_explain_analyze", + Integer.toString(conf.getMergeThresholdOfExplainAnalyze())))); + conf.setModeMapSizeThreshold( Integer.parseInt( properties.getProperty( @@ -1613,6 +1619,14 @@ public class IoTDBDescriptor { properties.getProperty( "load_clean_up_task_execution_delay_time_seconds", String.valueOf(conf.getLoadCleanupTaskExecutionDelayTimeSeconds())))); + + // update merge_threshold_of_explain_analyze + conf.setMergeThresholdOfExplainAnalyze( + Integer.parseInt( + properties.getProperty( + "merge_threshold_of_explain_analyze", + String.valueOf(conf.getMergeThresholdOfExplainAnalyze())))); + } catch (Exception e) { throw new QueryProcessException(String.format("Fail to reload configuration because %s", e)); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecution.java 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecution.java index f76e54a9409..dac8bdf9080 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecution.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecution.java @@ -149,7 +149,7 @@ public class FragmentInstanceExecution { FragmentInstanceContext context, TFetchFragmentInstanceStatisticsResp statistics) { statistics.setFragmentInstanceId(context.getId().toThrift()); statistics.setQueryStatistics(context.getQueryStatistics().toThrift()); - + statistics.setState(getInstanceState().toString()); IDataRegionForQuery dataRegionForQuery = context.getDataRegion(); if (dataRegionForQuery instanceof VirtualDataRegion) { // We don't need to output the region having ExplainAnalyzeOperator only. @@ -206,7 +206,8 @@ public class FragmentInstanceExecution { setOperatorStatistics(operatorStatistics, operatorContext); operatorStatisticsMap.put(operatorContext.getPlanNodeId().toString(), operatorStatistics); operatorCoutMap.put(operatorType, operatorCoutMap.getOrDefault(operatorType, 0) + 1); - if (operatorCoutMap.get(operatorType) >= 10) { + if (operatorCoutMap.get(operatorType) + >= IoTDBDescriptor.getInstance().getConfig().getMergeThresholdOfExplainAnalyze()) { needMerge = true; // merge all the operatorStatistics with the overload type and remain only one in // operatorStatisticsMap @@ -224,7 +225,7 @@ public class FragmentInstanceExecution { operatorStatistics.setTotalExecutionTimeInNanos(operatorContext.getTotalExecutionTimeInNanos()); operatorStatistics.setNextCalledCount(operatorContext.getNextCalledCount()); operatorStatistics.setHasNextCalledCount(operatorContext.getHasNextCalledCount()); - operatorStatistics.setInputRows(operatorContext.getInputRows()); + operatorStatistics.setOutputRows(operatorContext.getOutputRows()); 
operatorStatistics.setSpecifiedInfo(operatorContext.getSpecifiedInfo()); operatorStatistics.setMemoryUsage(operatorContext.getEstimatedMemorySize()); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/ExplainAnalyzeOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/ExplainAnalyzeOperator.java index ae8fb9ccdcc..a5ab5a47c06 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/ExplainAnalyzeOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/ExplainAnalyzeOperator.java @@ -59,18 +59,21 @@ public class ExplainAnalyzeOperator implements ProcessOperator { private final boolean verbose; private boolean outputResult = false; private final List<FragmentInstance> instances; - private static final long LOG_INTERNAL_IN_MS = 10000; private static final Logger logger = LoggerFactory.getLogger(IoTDBConstant.EXPLAIN_ANALYZE_LOGGER_NAME); private final FragmentInstanceStatisticsDrawer fragmentInstanceStatisticsDrawer = new FragmentInstanceStatisticsDrawer(); private static final String LOG_TITLE = - "---------------------Intermediate result of EXPLAIN ANALYZE---------------------:"; + "---------------------Intermediate Results of EXPLAIN ANALYZE---------------------:"; private final ScheduledFuture<?> logRecordTask; private final IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> clientManager; public ExplainAnalyzeOperator( - OperatorContext operatorContext, Operator child, long queryId, boolean verbose) { + OperatorContext operatorContext, + Operator child, + long queryId, + boolean verbose, + long timeout) { this.operatorContext = operatorContext; this.child = child; this.verbose = verbose; @@ -81,12 +84,17 @@ public class ExplainAnalyzeOperator implements ProcessOperator { QueryExecution queryExecution = (QueryExecution) coordinator.getQueryExecution(queryId); 
this.instances = queryExecution.getDistributedPlan().getInstances(); fragmentInstanceStatisticsDrawer.renderPlanStatistics(queryExecution.getContext()); + + // The time interval guarantees the result of EXPLAIN ANALYZE will be printed at least three + // times. + // And the maximum time interval is 15s. + long logIntervalInMs = Math.min(timeout / 3, 15000); this.logRecordTask = ScheduledExecutorUtil.safelyScheduleAtFixedRate( queryExecution.getScheduledExecutor(), this::logIntermediateResultIfTimeout, - LOG_INTERNAL_IN_MS, - LOG_INTERNAL_IN_MS, + logIntervalInMs, + logIntervalInMs, TimeUnit.MILLISECONDS); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/Operator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/Operator.java index 26665559721..0af295e3efa 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/Operator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/Operator.java @@ -43,17 +43,17 @@ public interface Operator extends AutoCloseable { default TsBlock nextWithTimer() throws Exception { OperatorContext context = getOperatorContext(); long startTime = System.nanoTime(); - TsBlock input = null; + TsBlock output = null; try { - input = next(); + output = next(); } finally { context.recordExecutionTime(System.nanoTime() - startTime); - if (input != null) { - context.addInputRows(input.getPositionCount()); + if (output != null) { + context.addOutputRows(output.getPositionCount()); } context.recordNextCalled(); } - return input; + return output; } /** diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorContext.java index 64bb52259e9..00a39e418ca 100644 --- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorContext.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/OperatorContext.java @@ -58,7 +58,7 @@ public class OperatorContext { // SpecifiedInfo is used to record some custom information for the operator, // which will be shown in the result of EXPLAIN ANALYZE to analyze the query. private Map<String, String> specifiedInfo = null; - private long inputRows = 0; + private long output = 0; private long estimatedMemorySize; public OperatorContext( @@ -149,12 +149,12 @@ public class OperatorContext { return estimatedMemorySize; } - public void addInputRows(long inputRows) { - this.inputRows += inputRows; + public void addOutputRows(long outputRows) { + this.output += outputRows; } - public long getInputRows() { - return inputRows; + public long getOutputRows() { + return output; } public void recordSpecifiedInfo(String key, String value) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java index c18ae28479e..87c04426010 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java @@ -134,7 +134,8 @@ public class LogicalPlanVisitor extends StatementVisitor<PlanNode, MPPQueryConte context.getQueryId().genPlanNodeId(), root, explainAnalyzeStatement.isVerbose(), - context.getLocalQueryId()); + context.getLocalQueryId(), + context.getTimeOut()); context.getTypeProvider().setType(ColumnHeaderConstant.EXPLAIN_ANALYZE, TSDataType.TEXT); return root; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java index db47d9dbf56..a6d043f5ad2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java @@ -3260,6 +3260,6 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP node.getPlanNodeId(), ExplainAnalyzeOperator.class.getSimpleName()); return new ExplainAnalyzeOperator( - operatorContext, operator, node.getQueryId(), node.isVerbose()); + operatorContext, operator, node.getQueryId(), node.isVerbose(), node.getTimeout()); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/ExplainAnalyzeNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/ExplainAnalyzeNode.java index 14bb402df75..a8f8783d203 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/ExplainAnalyzeNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/ExplainAnalyzeNode.java @@ -33,16 +33,19 @@ public class ExplainAnalyzeNode extends SingleChildProcessNode { private final boolean verbose; private final long queryId; + private final long timeout; - public ExplainAnalyzeNode(PlanNodeId id, PlanNode child, boolean verbose, long queryId) { + public ExplainAnalyzeNode( + PlanNodeId id, PlanNode child, boolean verbose, long queryId, long timeout) { super(id, child); this.verbose = verbose; this.queryId = queryId; + this.timeout = timeout; } @Override public PlanNode clone() { - return new ExplainAnalyzeNode(getPlanNodeId(), child, verbose, queryId); + return new ExplainAnalyzeNode(getPlanNodeId(), child, verbose, queryId, timeout); } @Override @@ -66,13 +69,15 @@ public class ExplainAnalyzeNode extends 
SingleChildProcessNode { PlanNodeType.EXPLAIN_ANALYZE.serialize(stream); ReadWriteIOUtils.write(verbose, stream); ReadWriteIOUtils.write(queryId, stream); + ReadWriteIOUtils.write(timeout, stream); } public static ExplainAnalyzeNode deserialize(ByteBuffer byteBuffer) { boolean verbose = ReadWriteIOUtils.readBool(byteBuffer); long queryId = ReadWriteIOUtils.readLong(byteBuffer); PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer); - return new ExplainAnalyzeNode(planNodeId, null, verbose, queryId); + long timeout = ReadWriteIOUtils.readLong(byteBuffer); + return new ExplainAnalyzeNode(planNodeId, null, verbose, queryId, timeout); } public boolean isVerbose() { @@ -83,16 +88,23 @@ public class ExplainAnalyzeNode extends SingleChildProcessNode { return queryId; } + public long getTimeout() { + return timeout; + } + @Override public boolean equals(Object o) { if (this == o) return true; if (!(o instanceof ExplainAnalyzeNode)) return false; ExplainAnalyzeNode that = (ExplainAnalyzeNode) o; - return verbose == that.verbose; + return verbose == that.verbose && queryId == that.queryId && timeout == that.timeout; } @Override public int hashCode() { - return super.hashCode() + Boolean.hashCode(verbose); + return super.hashCode() + + Boolean.hashCode(verbose) + + Long.hashCode(queryId) + + Long.hashCode(timeout); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/statistics/FragmentInstanceStatisticsDrawer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/statistics/FragmentInstanceStatisticsDrawer.java index 55e085b6cc1..e7ca8ab887e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/statistics/FragmentInstanceStatisticsDrawer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/statistics/FragmentInstanceStatisticsDrawer.java @@ -89,8 +89,11 @@ public class FragmentInstanceStatisticsDrawer { singleFragmentInstanceArea, 0, String.format( - "FRAGMENT-INSTANCE[Id: 
%s][IP: %s][DataRegion: %s]", - instance.getId().toString(), statistics.getIp(), statistics.getDataRegion())); + "FRAGMENT-INSTANCE[Id: %s][IP: %s][DataRegion: %s][State: %s]", + instance.getId().toString(), + statistics.getIp(), + statistics.getDataRegion(), + statistics.getState())); addLine( singleFragmentInstanceArea, 1, @@ -362,7 +365,7 @@ public class FragmentInstanceStatisticsDrawer { addLine( singleFragmentInstanceArea, indentNum + 2, - String.format("input: %s rows", operatorStatistic.getInputRows())); + String.format("output: %s rows", operatorStatistic.getOutputRows())); addLine( singleFragmentInstanceArea, indentNum + 2, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/statistics/StatisticsMergeUtil.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/statistics/StatisticsMergeUtil.java index 88679fe8905..b2f39d12672 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/statistics/StatisticsMergeUtil.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/statistics/StatisticsMergeUtil.java @@ -84,7 +84,7 @@ public class StatisticsMergeUtil { first.getTotalExecutionTimeInNanos() + second.getTotalExecutionTimeInNanos()); first.setNextCalledCount(first.getNextCalledCount() + second.getNextCalledCount()); first.setHasNextCalledCount(first.getHasNextCalledCount() + second.getHasNextCalledCount()); - first.setInputRows(first.getInputRows() + second.getInputRows()); + first.setOutputRows(first.getOutputRows() + second.getOutputRows()); first.setMemoryUsage(first.getMemoryUsage() + second.getMemoryUsage()); first.setCount(first.getCount() + 1); first.setSpecifiedInfo( diff --git a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties index 1a626c25aa2..425872440b7 100644 --- a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties +++ 
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties @@ -436,6 +436,10 @@ data_replication_factor=1 # Datatype: long # sort_buffer_size_in_bytes=1048576 +# The threshold of operator count in the result set of EXPLAIN ANALYZE. If the number of operators of the same type in the result set reaches this threshold, those operators will be merged. +# Datatype: int +# merge_threshold_of_explain_analyze=10 + #################### ### Storage Engine Configuration #################### diff --git a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift index f8b9a691005..ec8df721a94 100644 --- a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift +++ b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift @@ -471,7 +471,7 @@ struct TOperatorStatistics{ 4: required i64 nextCalledCount 5: required i64 hasNextCalledCount 6: required map<string,string> specifiedInfo - 7: required i64 inputRows + 7: required i64 outputRows 8: required i64 memoryUsage 9: optional i64 count } @@ -536,6 +536,7 @@ struct TFetchFragmentInstanceStatisticsResp { 13: optional i64 readyQueuedTime 14: optional i64 blockQueuedTime 15: optional string ip + 16: optional string state } /** * END: Used for EXPLAIN ANALYZE
