This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch xingtanzjr/write_opt
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/xingtanzjr/write_opt by this 
push:
     new a20bc0fe5a add stepTracker
a20bc0fe5a is described below

commit a20bc0fe5a39e6c1928162c45f53d69131c6d4a0
Author: Jinrui.Zhang <[email protected]>
AuthorDate: Mon Jun 20 15:48:55 2022 +0800

    add stepTracker
---
 .../org/apache/iotdb/db/mpp/plan/Coordinator.java  |  6 +-
 .../org/apache/iotdb/db/mpp/plan/StepTracker.java  | 73 ++++++++++++++++++++
 .../db/mpp/plan/execution/QueryExecution.java      | 77 ++++++++++++----------
 .../planner/distribution/DistributionPlanner.java  |  3 -
 .../db/mpp/plan/scheduler/ClusterScheduler.java    |  4 +-
 5 files changed, 120 insertions(+), 43 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java
index 111a9222a6..610b586ceb 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java
@@ -112,9 +112,9 @@ public class Coordinator {
 
     QueryId globalQueryId = queryIdGenerator.createNextQueryId();
     try (SetThreadName queryName = new SetThreadName(globalQueryId.getId())) {
-      if (sql != null) {
-        LOGGER.info("start executing sql: {}", sql);
-      }
+      //      if (sql != null) {
+      //        LOGGER.info("start executing sql: {}", sql);
+      //      }
       IQueryExecution execution =
           createQueryExecution(
               statement,
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/StepTracker.java 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/StepTracker.java
new file mode 100644
index 0000000000..1fe7df7be0
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/StepTracker.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+class Metric {
+  private static final Logger logger = LoggerFactory.getLogger(Metric.class);
+  public String stepName;
+  public long invokeCount;
+  public long totalTime;
+  public long last100Time;
+
+  public Metric(String stepName) {
+    this.stepName = stepName;
+    this.invokeCount = 0;
+    this.totalTime = 0;
+    this.last100Time = 0;
+  }
+
+  public void trace(long startTime, long endTime) {
+    this.invokeCount++;
+    this.totalTime += (endTime - startTime);
+  }
+
+  public void tryPrint() {
+    if (invokeCount % 100 == 0) {
+      logger.info(
+          String.format(
+              "step metrics [%d]-[%s] - Total: %d, SUM: %.2fms, AVG: %fms, 
Last100AVG: %fms",
+              Thread.currentThread().getId(),
+              stepName,
+              invokeCount,
+              totalTime * 1.0 / 1000000,
+              totalTime * 1.0 / 1000000 / invokeCount,
+              (totalTime * 1.0 - last100Time) / 1000000 / 100));
+      last100Time = totalTime;
+    }
+  }
+}
+
+public class StepTracker {
+  private static final ThreadLocal<Map<String, Metric>> metrics = new 
ThreadLocal<>();
+
+  public static void trace(String stepName, long startTime, long endTime) {
+    if (metrics.get() == null) {
+      metrics.set(new HashMap<>());
+    }
+    metrics.get().computeIfAbsent(stepName, Metric::new).trace(startTime, 
endTime);
+    metrics.get().get(stepName).tryPrint();
+  }
+}
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
index 5f4ce1ae3f..48f018b19c 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
@@ -31,6 +31,7 @@ import org.apache.iotdb.db.mpp.execution.QueryState;
 import org.apache.iotdb.db.mpp.execution.QueryStateMachine;
 import org.apache.iotdb.db.mpp.execution.datatransfer.DataBlockService;
 import org.apache.iotdb.db.mpp.execution.datatransfer.ISourceHandle;
+import org.apache.iotdb.db.mpp.plan.StepTracker;
 import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
 import org.apache.iotdb.db.mpp.plan.analyze.Analyzer;
 import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher;
@@ -46,7 +47,6 @@ import 
org.apache.iotdb.db.mpp.plan.planner.distribution.DistributionPlanner;
 import org.apache.iotdb.db.mpp.plan.planner.plan.DistributedQueryPlan;
 import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
 import org.apache.iotdb.db.mpp.plan.planner.plan.LogicalQueryPlan;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeUtil;
 import org.apache.iotdb.db.mpp.plan.scheduler.ClusterScheduler;
 import org.apache.iotdb.db.mpp.plan.scheduler.IScheduler;
 import org.apache.iotdb.db.mpp.plan.scheduler.StandaloneScheduler;
@@ -189,52 +189,59 @@ public class QueryExecution implements IQueryExecution {
       IPartitionFetcher partitionFetcher,
       ISchemaFetcher schemaFetcher) {
     // initialize the variable `analysis`
-    logger.info("start to analyze query");
+    //    logger.info("start to analyze query");
     return new Analyzer(context, partitionFetcher, 
schemaFetcher).analyze(statement);
   }
 
   private void schedule() {
-    // TODO: (xingtanzjr) initialize the query scheduler according to 
configuration
-    this.scheduler =
-        config.isClusterMode()
-            ? new ClusterScheduler(
-                context,
-                stateMachine,
-                distributedPlan.getInstances(),
-                context.getQueryType(),
-                executor,
-                writeOperationExecutor,
-                scheduledExecutor,
-                internalServiceClientManager)
-            : new StandaloneScheduler(
-                context,
-                stateMachine,
-                distributedPlan.getInstances(),
-                context.getQueryType(),
-                executor,
-                scheduledExecutor,
-                internalServiceClientManager);
-    this.scheduler.start();
+    long startTime = System.nanoTime();
+    try {
+      // TODO: (xingtanzjr) initialize the query scheduler according to 
configuration
+      this.scheduler =
+          config.isClusterMode()
+              ? new ClusterScheduler(
+                  context,
+                  stateMachine,
+                  distributedPlan.getInstances(),
+                  context.getQueryType(),
+                  executor,
+                  writeOperationExecutor,
+                  scheduledExecutor,
+                  internalServiceClientManager)
+              : new StandaloneScheduler(
+                  context,
+                  stateMachine,
+                  distributedPlan.getInstances(),
+                  context.getQueryType(),
+                  executor,
+                  scheduledExecutor,
+                  internalServiceClientManager);
+      this.scheduler.start();
+    } finally {
+      StepTracker.trace("dispatch", startTime, System.nanoTime());
+    }
   }
 
   // Use LogicalPlanner to do the logical query plan and logical optimization
   public void doLogicalPlan() {
-    logger.info("do logical plan...");
-    LogicalPlanner planner = new LogicalPlanner(this.context, 
this.planOptimizers);
-    this.logicalPlan = planner.plan(this.analysis);
-    logger.info(
-        "logical plan is: \n {}", 
PlanNodeUtil.nodeToString(this.logicalPlan.getRootNode()));
+    long startTime = System.nanoTime();
+    try {
+      LogicalPlanner planner = new LogicalPlanner(this.context, 
this.planOptimizers);
+      this.logicalPlan = planner.plan(this.analysis);
+    } finally {
+      StepTracker.trace("doLogicalPlan", startTime, System.nanoTime());
+    }
   }
 
   // Generate the distributed plan and split it into fragments
   public void doDistributedPlan() {
-    logger.info("do distribution plan...");
-    DistributionPlanner planner = new DistributionPlanner(this.analysis, 
this.logicalPlan);
-    this.distributedPlan = planner.planFragments();
-    logger.info(
-        "distribution plan done. Fragment instance count is {}, details is: \n 
{}",
-        distributedPlan.getInstances().size(),
-        printFragmentInstances(distributedPlan.getInstances()));
+    long startTime = System.nanoTime();
+    try {
+      DistributionPlanner planner = new DistributionPlanner(this.analysis, 
this.logicalPlan);
+      this.distributedPlan = planner.planFragments();
+    } finally {
+      StepTracker.trace("doDistributionPlan", startTime, System.nanoTime());
+    }
   }
 
   private String printFragmentInstances(List<FragmentInstance> instances) {
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanner.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanner.java
index 421de275da..4dfe1b968f 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanner.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanner.java
@@ -29,7 +29,6 @@ import 
org.apache.iotdb.db.mpp.plan.planner.plan.LogicalQueryPlan;
 import org.apache.iotdb.db.mpp.plan.planner.plan.PlanFragment;
 import org.apache.iotdb.db.mpp.plan.planner.plan.SubPlan;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeUtil;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.WritePlanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.ExchangeNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.sink.FragmentSinkNode;
@@ -67,9 +66,7 @@ public class DistributionPlanner {
 
   public DistributedQueryPlan planFragments() {
     PlanNode rootAfterRewrite = rewriteSource();
-    System.out.println(PlanNodeUtil.nodeToString(rootAfterRewrite));
     PlanNode rootWithExchange = addExchangeNode(rootAfterRewrite);
-    System.out.println(PlanNodeUtil.nodeToString(rootWithExchange));
     if (analysis.getStatement() instanceof QueryStatement) {
       analysis
           .getRespDatasetHeader()
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java
index 9437c22536..8390ac6fe5 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java
@@ -86,7 +86,7 @@ public class ClusterScheduler implements IScheduler {
   @Override
   public void start() {
     stateMachine.transitionToDispatching();
-    logger.info("transit to DISPATCHING");
+    //    logger.info("transit to DISPATCHING");
     Future<FragInstanceDispatchResult> dispatchResultFuture = 
dispatcher.dispatch(instances);
 
     // NOTICE: the FragmentInstance may be dispatched to another Host due to 
consensus redirect.
@@ -116,7 +116,7 @@ public class ClusterScheduler implements IScheduler {
     // The FragmentInstances has been dispatched successfully to corresponding 
host, we mark the
     // QueryState to Running
     stateMachine.transitionToRunning();
-    logger.info("transit to RUNNING");
+    //    logger.info("transit to RUNNING");
     instances.forEach(
         instance -> {
           stateMachine.initialFragInstanceState(instance.getId(), 
FragmentInstanceState.RUNNING);

Reply via email to the mailing list.