This is an automated email from the ASF dual-hosted git repository.

caogaofei pushed a commit to branch cost_analyze
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit b77f71bdf0013f67fa5f542124063744169cb6aa
Author: Beyyes <[email protected]>
AuthorDate: Fri Nov 22 19:02:08 2024 +0800

    perfect query statistics
---
 .../iotdb/db/queryengine/plan/Coordinator.java     | 13 +++
 .../db/queryengine/plan/analyze/Analyzer.java      |  9 +-
 .../plan/execution/IQueryExecution.java            |  6 ++
 .../queryengine/plan/execution/QueryExecution.java | 31 ++++---
 .../plan/execution/config/ConfigExecution.java     | 11 +++
 .../db/queryengine/plan/planner/IPlanner.java      |  2 +
 .../queryengine/plan/planner/TreeModelPlanner.java |  8 +-
 .../plan/relational/analyzer/Analyzer.java         |  7 +-
 .../relational/planner/TableLogicalPlanner.java    | 15 +++-
 .../plan/relational/planner/TableModelPlanner.java |  8 +-
 .../distribute/TableDistributedPlanner.java        |  5 +-
 .../optimizations/PushPredicateIntoTableScan.java  | 16 ++--
 .../plan/scheduler/ClusterScheduler.java           |  5 --
 .../scheduler/FragmentInstanceDispatcherImpl.java  |  7 --
 .../FixedScheduledOutputQueryPlanStatistics.java   | 97 ++++++++++++++++++++++
 .../operator/MergeTreeSortOperatorTest.java        | 12 +++
 16 files changed, 207 insertions(+), 45 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
index 61be1471b15..0999c6831a9 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
@@ -85,6 +85,7 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.sql.parser.SqlParser;
 import 
org.apache.iotdb.db.queryengine.plan.relational.type.InternalTypeManager;
 import org.apache.iotdb.db.queryengine.plan.statement.IConfigStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.Statement;
+import 
org.apache.iotdb.db.queryengine.statistics.FixedScheduledOutputQueryPlanStatistics;
 import org.apache.iotdb.db.utils.SetThreadName;
 
 import org.slf4j.Logger;
@@ -140,6 +141,9 @@ public class Coordinator {
   private final List<PlanOptimizer> logicalPlanOptimizers;
   private final List<PlanOptimizer> distributionPlanOptimizers;
 
+  FixedScheduledOutputQueryPlanStatistics 
fixedScheduledOutputQueryPlanStatistics =
+      new FixedScheduledOutputQueryPlanStatistics();
+
   private Coordinator() {
     this.queryExecutionMap = new ConcurrentHashMap<>();
     this.executor = getQueryExecutor();
@@ -443,6 +447,15 @@ public class Coordinator {
       try (SetThreadName threadName = new 
SetThreadName(queryExecution.getQueryId())) {
         LOGGER.debug("[CleanUpQuery]]");
         queryExecution.stopAndCleanup(t);
+
+        // TODO(beyyes) add fe statistic output
+        IQueryExecution queryExecution1 = queryExecutionMap.get(queryId);
+        if (queryExecution1.getPlanner() != null
+            && queryExecution1.getPlanner().isQueryStatement()) {
+          MPPQueryContext queryContext = queryExecution1.getQueryContext();
+          fixedScheduledOutputQueryPlanStatistics.recordCost(queryContext);
+        }
+
         queryExecutionMap.remove(queryId);
         if (queryExecution.isQuery()) {
           long costTime = queryExecution.getTotalExecutionTime();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analyzer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analyzer.java
index 238620b6f48..6b2739034d7 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analyzer.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/Analyzer.java
@@ -52,8 +52,13 @@ public class Analyzer {
     }
 
     if (statement.isQuery()) {
-      QueryPlanCostMetricSet.getInstance()
-          .recordPlanCost(TREE_TYPE, ANALYZER, System.nanoTime() - startTime);
+      long cost =
+          System.nanoTime()
+              - startTime
+              - context.getFetchSchemaCost()
+              - context.getFetchPartitionCost();
+      QueryPlanCostMetricSet.getInstance().recordPlanCost(TREE_TYPE, ANALYZER, 
cost);
+      context.setAnalyzeCost(cost);
     }
     return analysis;
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/IQueryExecution.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/IQueryExecution.java
index b35123e8f70..d2517c02b57 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/IQueryExecution.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/IQueryExecution.java
@@ -20,7 +20,9 @@
 package org.apache.iotdb.db.queryengine.plan.execution;
 
 import org.apache.iotdb.commons.exception.IoTDBException;
+import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
 import org.apache.iotdb.db.queryengine.common.header.DatasetHeader;
+import org.apache.iotdb.db.queryengine.plan.planner.IPlanner;
 
 import org.apache.tsfile.read.common.block.TsBlock;
 
@@ -67,4 +69,8 @@ public interface IQueryExecution {
   Optional<String> getExecuteSQL();
 
   String getStatementType();
+
+  MPPQueryContext getQueryContext();
+
+  IPlanner getPlanner();
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java
index 4ecddf1a80e..c9fd577fad3 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java
@@ -267,16 +267,17 @@ public class QueryExecution implements IQueryExecution {
 
   // Analyze the statement in QueryContext. Generate the analysis this query 
need
   private IAnalysis analyze(MPPQueryContext context) {
-    final long startTime = System.nanoTime();
-    IAnalysis result;
-    try {
-      result = planner.analyze(context);
-    } finally {
-      long analyzeCost = System.nanoTime() - startTime;
-      context.setAnalyzeCost(analyzeCost);
-      PERFORMANCE_OVERVIEW_METRICS.recordAnalyzeCost(analyzeCost);
-    }
-    return result;
+    return planner.analyze(context);
+    //    final long startTime = System.nanoTime();
+    //    IAnalysis result;
+    //    try {
+    //      result = planner.analyze(context);
+    //    } finally {
+    //      long analyzeCost = System.nanoTime() - startTime;
+    //      context.setAnalyzeCost(analyzeCost);
+    //      PERFORMANCE_OVERVIEW_METRICS.recordAnalyzeCost(analyzeCost);
+    //    }
+    //    return result;
   }
 
   private void schedule() {
@@ -693,6 +694,16 @@ public class QueryExecution implements IQueryExecution {
     return analysis.getStatementType();
   }
 
+  @Override
+  public MPPQueryContext getQueryContext() {
+    return context;
+  }
+
+  @Override
+  public IPlanner getPlanner() {
+    return planner;
+  }
+
   public MPPQueryContext getContext() {
     return context;
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/ConfigExecution.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/ConfigExecution.java
index c0886d91842..9cd94461117 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/ConfigExecution.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/ConfigExecution.java
@@ -29,6 +29,7 @@ import 
org.apache.iotdb.db.queryengine.plan.execution.ExecutionResult;
 import org.apache.iotdb.db.queryengine.plan.execution.IQueryExecution;
 import 
org.apache.iotdb.db.queryengine.plan.execution.config.executor.ClusterConfigTaskExecutor;
 import 
org.apache.iotdb.db.queryengine.plan.execution.config.executor.IConfigTaskExecutor;
+import org.apache.iotdb.db.queryengine.plan.planner.IPlanner;
 import org.apache.iotdb.db.queryengine.plan.statement.StatementType;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.StatementExecutionException;
@@ -248,4 +249,14 @@ public class ConfigExecution implements IQueryExecution {
   public String getStatementType() {
     return statementType.name();
   }
+
+  @Override
+  public MPPQueryContext getQueryContext() {
+    return context;
+  }
+
+  @Override
+  public IPlanner getPlanner() {
+    return null;
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/IPlanner.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/IPlanner.java
index 616cca3efb7..c53294b1a1d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/IPlanner.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/IPlanner.java
@@ -51,4 +51,6 @@ public interface IPlanner {
 
   void setRedirectInfo(
       IAnalysis analysis, TEndPoint localEndPoint, TSStatus tsstatus, 
TSStatusCode statusCode);
+
+  boolean isQueryStatement();
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TreeModelPlanner.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TreeModelPlanner.java
index a819fa950de..2e0289c0781 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TreeModelPlanner.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TreeModelPlanner.java
@@ -42,6 +42,7 @@ import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertBaseStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertMultiTabletsStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
+import org.apache.iotdb.db.queryengine.plan.statement.crud.QueryStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.pipe.PipeEnrichedStatement;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
@@ -134,8 +135,6 @@ public class TreeModelPlanner implements IPlanner {
               stateMachine,
               distributedPlan.getInstances(),
               context.getQueryType(),
-              executor,
-              writeOperationExecutor,
               scheduledExecutor,
               syncInternalServiceClientManager,
               asyncInternalServiceClientManager);
@@ -201,4 +200,9 @@ public class TreeModelPlanner implements IPlanner {
       }
     }
   }
+
+  @Override
+  public boolean isQueryStatement() {
+    return statement instanceof QueryStatement;
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Analyzer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Analyzer.java
index 8d25c7a1e85..978e2b0682d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Analyzer.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Analyzer.java
@@ -67,6 +67,7 @@ public class Analyzer {
   }
 
   public Analysis analyze(Statement statement) {
+    long startTime = System.nanoTime();
     Analysis analysis = new Analysis(statement, parameterLookup);
     Statement innerStatement =
         statement instanceof PipeEnriched
@@ -85,15 +86,15 @@ public class Analyzer {
       analysis.setDatabaseName(session.getDatabaseName().get());
     }
 
-    long startTime = System.nanoTime();
     StatementAnalyzer analyzer =
         statementAnalyzerFactory.createStatementAnalyzer(
             analysis, context, session, warningCollector, 
CorrelationSupport.ALLOWED);
 
     analyzer.analyze(statement);
     if (statement instanceof Query) {
-      QueryPlanCostMetricSet.getInstance()
-          .recordPlanCost(TABLE_TYPE, ANALYZER, System.nanoTime() - startTime);
+      long cost = System.nanoTime() - startTime;
+      QueryPlanCostMetricSet.getInstance().recordPlanCost(TABLE_TYPE, 
ANALYZER, cost);
+      context.setAnalyzeCost(cost);
     }
 
     // TODO access control
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/TableLogicalPlanner.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/TableLogicalPlanner.java
index 2692690c82f..bf7af33c341 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/TableLogicalPlanner.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/TableLogicalPlanner.java
@@ -124,8 +124,9 @@ public class TableLogicalPlanner {
     PlanNode planNode = planStatement(analysis, statement);
 
     if (statement instanceof Query) {
-      QueryPlanCostMetricSet.getInstance()
-          .recordPlanCost(TABLE_TYPE, LOGICAL_PLANNER, System.nanoTime() - 
startTime);
+      long cost = System.nanoTime() - startTime;
+      QueryPlanCostMetricSet.getInstance().recordPlanCost(TABLE_TYPE, 
LOGICAL_PLANNER, cost);
+      queryContext.setLogicalPlanCost(cost);
       startTime = System.nanoTime();
 
       for (PlanOptimizer optimizer : planOptimizers) {
@@ -142,8 +143,14 @@ public class TableLogicalPlanner {
                     warningCollector,
                     
PlanOptimizersStatsCollector.createPlanOptimizersStatsCollector()));
       }
-      QueryPlanCostMetricSet.getInstance()
-          .recordPlanCost(TABLE_TYPE, LOGICAL_PLAN_OPTIMIZE, System.nanoTime() 
- startTime);
+
+      cost =
+          System.nanoTime()
+              - startTime
+              - queryContext.getFetchSchemaCost()
+              - queryContext.getFetchPartitionCost();
+      QueryPlanCostMetricSet.getInstance().recordPlanCost(TABLE_TYPE, 
LOGICAL_PLAN_OPTIMIZE, cost);
+      queryContext.setLogicalOptimizationCost(cost);
     }
 
     return new LogicalQueryPlan(queryContext, planNode);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/TableModelPlanner.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/TableModelPlanner.java
index a7f0878041e..41d456d9ab5 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/TableModelPlanner.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/TableModelPlanner.java
@@ -41,6 +41,7 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.Pla
 import org.apache.iotdb.db.queryengine.plan.relational.security.AccessControl;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.LoadTsFile;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.PipeEnriched;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Query;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Statement;
 import 
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.WrappedInsertStatement;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.parser.SqlParser;
@@ -167,8 +168,6 @@ public class TableModelPlanner implements IPlanner {
               stateMachine,
               distributedPlan.getInstances(),
               context.getQueryType(),
-              executor,
-              writeOperationExecutor,
               scheduledExecutor,
               syncInternalServiceClientManager,
               asyncInternalServiceClientManager);
@@ -228,5 +227,10 @@ public class TableModelPlanner implements IPlanner {
     }
   }
 
+  @Override
+  public boolean isQueryStatement() {
+    return statement instanceof Query;
+  }
+
   public static class NopAccessControl implements AccessControl {}
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanner.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanner.java
index a485f3814f4..592b93d87fa 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanner.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanner.java
@@ -104,8 +104,9 @@ public class TableDistributedPlanner {
     DistributedQueryPlan resultDistributedPlan = 
generateDistributedPlan(outputNodeWithExchange);
 
     if (analysis.getStatement() instanceof Query) {
-      QueryPlanCostMetricSet.getInstance()
-          .recordPlanCost(TABLE_TYPE, DISTRIBUTION_PLANNER, System.nanoTime() 
- startTime);
+      long cost = System.nanoTime() - startTime;
+      QueryPlanCostMetricSet.getInstance().recordPlanCost(TABLE_TYPE, 
DISTRIBUTION_PLANNER, cost);
+      mppQueryContext.setDistributionPlanCost(cost);
     }
     return resultDistributedPlan;
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushPredicateIntoTableScan.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushPredicateIntoTableScan.java
index f9d2d5edb1a..4baa98c6796 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushPredicateIntoTableScan.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushPredicateIntoTableScan.java
@@ -303,7 +303,7 @@ public class PushPredicateIntoTableScan implements 
PlanOptimizer {
 
       // no predicate, just scan all matched deviceEntries
       if (TRUE_LITERAL.equals(context.inheritedPredicate)) {
-        getDeviceEntriesWithDataPartitions(tableScanNode, 
Collections.emptyList(), null);
+        getDeviceEntriesWithDataPartitions(tableScanNode, 
Collections.emptyList());
         return tableScanNode;
       }
 
@@ -342,10 +342,7 @@ public class PushPredicateIntoTableScan implements 
PlanOptimizer {
       }
 
       // do index scan after expressionCanPushDown is processed
-      getDeviceEntriesWithDataPartitions(
-          tableScanNode,
-          splitExpression.getMetadataExpressions(),
-          splitExpression.getTimeColumnName());
+      getDeviceEntriesWithDataPartitions(tableScanNode, 
splitExpression.getMetadataExpressions());
 
       // exist expressions can not push down to scan operator
       if (!splitExpression.getExpressionsCannotPushDown().isEmpty()) {
@@ -413,7 +410,7 @@ public class PushPredicateIntoTableScan implements 
PlanOptimizer {
     }
 
     private void getDeviceEntriesWithDataPartitions(
-        TableScanNode tableScanNode, List<Expression> metadataExpressions, 
String timeColumnName) {
+        TableScanNode tableScanNode, List<Expression> metadataExpressions) {
 
       List<String> attributeColumns = new ArrayList<>();
       int attributeIndex = 0;
@@ -439,8 +436,9 @@ public class PushPredicateIntoTableScan implements 
PlanOptimizer {
               attributeColumns,
               queryContext);
       tableScanNode.setDeviceEntries(deviceEntries);
-      QueryPlanCostMetricSet.getInstance()
-          .recordPlanCost(TABLE_TYPE, SCHEMA_FETCHER, System.nanoTime() - 
startTime);
+      long cost = System.nanoTime() - startTime;
+      QueryPlanCostMetricSet.getInstance().recordPlanCost(TABLE_TYPE, 
SCHEMA_FETCHER, cost);
+      queryContext.setFetchSchemaCost(cost);
 
       if (deviceEntries.isEmpty()) {
         if (analysis.noAggregates()) {
@@ -478,8 +476,10 @@ public class PushPredicateIntoTableScan implements 
PlanOptimizer {
           analysis.upsertDataPartition(dataPartition);
         }
 
+        cost = System.nanoTime() - startTime;
         QueryPlanCostMetricSet.getInstance()
             .recordPlanCost(TABLE_TYPE, PARTITION_FETCHER, System.nanoTime() - 
startTime);
+        queryContext.setFetchPartitionCost(cost);
       }
     }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/ClusterScheduler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/ClusterScheduler.java
index 448f0829b5a..4835321aeda 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/ClusterScheduler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/ClusterScheduler.java
@@ -37,7 +37,6 @@ import org.slf4j.LoggerFactory;
 
 import java.util.List;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 
@@ -71,8 +70,6 @@ public class ClusterScheduler implements IScheduler {
       QueryStateMachine stateMachine,
       List<FragmentInstance> instances,
       QueryType queryType,
-      ExecutorService executor,
-      ExecutorService writeOperationExecutor,
       ScheduledExecutorService scheduledExecutor,
       IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> 
syncInternalServiceClientManager,
       IClientManager<TEndPoint, AsyncDataNodeInternalServiceClient>
@@ -84,8 +81,6 @@ public class ClusterScheduler implements IScheduler {
         new FragmentInstanceDispatcherImpl(
             queryType,
             queryContext,
-            executor,
-            writeOperationExecutor,
             syncInternalServiceClientManager,
             asyncInternalServiceClientManager);
     if (queryType == QueryType.READ) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
index 6bdc5c20f64..0ce548351e5 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
@@ -63,7 +63,6 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
@@ -77,8 +76,6 @@ public class FragmentInstanceDispatcherImpl implements 
IFragInstanceDispatcher {
 
   private static final CommonConfig COMMON_CONFIG = 
CommonDescriptor.getInstance().getConfig();
 
-  private final ExecutorService executor;
-  private final ExecutorService writeOperationExecutor;
   private final QueryType type;
   private final MPPQueryContext queryContext;
   private final String localhostIpAddr;
@@ -100,15 +97,11 @@ public class FragmentInstanceDispatcherImpl implements 
IFragInstanceDispatcher {
   public FragmentInstanceDispatcherImpl(
       QueryType type,
       MPPQueryContext queryContext,
-      ExecutorService executor,
-      ExecutorService writeOperationExecutor,
       IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> 
syncInternalServiceClientManager,
       IClientManager<TEndPoint, AsyncDataNodeInternalServiceClient>
           asyncInternalServiceClientManager) {
     this.type = type;
     this.queryContext = queryContext;
-    this.executor = executor;
-    this.writeOperationExecutor = writeOperationExecutor;
     this.syncInternalServiceClientManager = syncInternalServiceClientManager;
     this.asyncInternalServiceClientManager = asyncInternalServiceClientManager;
     this.localhostIpAddr = 
IoTDBDescriptor.getInstance().getConfig().getInternalAddress();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/statistics/FixedScheduledOutputQueryPlanStatistics.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/statistics/FixedScheduledOutputQueryPlanStatistics.java
new file mode 100644
index 00000000000..7188fc24a8b
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/statistics/FixedScheduledOutputQueryPlanStatistics.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.statistics;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
+import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class FixedScheduledOutputQueryPlanStatistics {
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(FixedScheduledOutputQueryPlanStatistics.class);
+
+  AtomicLong analyzeCost = new AtomicLong(0);
+  AtomicLong fetchPartitionCost = new AtomicLong(0);
+  AtomicLong fetchSchemaCost = new AtomicLong(0);
+  AtomicLong logicalPlanCost = new AtomicLong(0);
+  AtomicLong logicalOptimizationCost = new AtomicLong(0);
+  AtomicLong distributionPlanCost = new AtomicLong(0);
+  AtomicLong dispatchCost = new AtomicLong(0);
+  AtomicLong num = new AtomicLong(0); // queries recorded since the last output()
+
+  public FixedScheduledOutputQueryPlanStatistics() {
+    ScheduledExecutorService scheduledExecutor =
+        IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
+            "FixedScheduledOutputQueryPlanStatistics");
+    ScheduledExecutorUtil.safelyScheduleWithFixedDelay(
+        scheduledExecutor, this::output, 0, 60_000, TimeUnit.MILLISECONDS);
+  }
+
+  private synchronized void output() {
+    long count = num.get();
+    if (count == 0) {
+      return;
+    }
+    // Log one average per stage, in the same order as the placeholders, then reset all counters.
+    LOGGER.info(
+        "\r\n======ScheduledOutputQueryPlanStatistics, num: {}, avg analyzeCost: {}, "
+            + "avg fetchPartitionCost: {}, avg fetchSchemaCost: {}, avg logicalPlanCost: {}, "
+            + "avg logicalOptimizationCost: {}, avg distributionPlanCost: {}, avg dispatchCost: {}",
+        count,
+        format(analyzeCost, count),
+        format(fetchPartitionCost, count),
+        format(fetchSchemaCost, count),
+        format(logicalPlanCost, count),
+        format(logicalOptimizationCost, count),
+        format(distributionPlanCost, count),
+        format(dispatchCost, count));
+    num.set(0);
+    analyzeCost.set(0);
+    fetchPartitionCost.set(0);
+    fetchSchemaCost.set(0);
+    logicalPlanCost.set(0);
+    logicalOptimizationCost.set(0);
+    distributionPlanCost.set(0);
+    dispatchCost.set(0);
+  }
+
+  private String format(AtomicLong time, long count) {
+    return time.get() / count + "ns"; // integer division; callers pass a non-zero count
+  }
+
+  public synchronized void recordCost(MPPQueryContext queryContext) {
+    num.incrementAndGet();
+    // Add this query's per-stage costs to the running sums that output() averages over num.
+    analyzeCost.getAndAdd(queryContext.getAnalyzeCost());
+    fetchPartitionCost.getAndAdd(queryContext.getFetchPartitionCost());
+    fetchSchemaCost.getAndAdd(queryContext.getFetchSchemaCost());
+    logicalPlanCost.getAndAdd(queryContext.getLogicalPlanCost());
+    logicalOptimizationCost.getAndAdd(queryContext.getLogicalOptimizationCost());
+    distributionPlanCost.getAndAdd(queryContext.getDistributionPlanCost());
+    dispatchCost.getAndAdd(queryContext.getDispatchCost());
+  }
+}
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/MergeTreeSortOperatorTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/MergeTreeSortOperatorTest.java
index 8f89b41b92a..be6cb2a5b88 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/MergeTreeSortOperatorTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/MergeTreeSortOperatorTest.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.commons.exception.MetadataException;
 import org.apache.iotdb.commons.path.NonAlignedFullPath;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.queryengine.common.FragmentInstanceId;
+import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
 import org.apache.iotdb.db.queryengine.common.PlanFragmentId;
 import org.apache.iotdb.db.queryengine.common.QueryId;
 import org.apache.iotdb.db.queryengine.common.header.DatasetHeader;
@@ -44,6 +45,7 @@ import 
org.apache.iotdb.db.queryengine.execution.operator.source.ShowQueriesOper
 import org.apache.iotdb.db.queryengine.plan.Coordinator;
 import org.apache.iotdb.db.queryengine.plan.execution.ExecutionResult;
 import org.apache.iotdb.db.queryengine.plan.execution.IQueryExecution;
+import org.apache.iotdb.db.queryengine.plan.planner.IPlanner;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.InputLocation;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.SeriesScanOptions;
@@ -1838,6 +1840,16 @@ public class MergeTreeSortOperatorTest {
       return null;
     }
 
+    @Override
+    public MPPQueryContext getQueryContext() {
+      return null;
+    }
+
+    @Override
+    public IPlanner getPlanner() {
+      return null;
+    }
+
     @Override
     public void start() {}
 

Reply via email to