This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch xingtanzjr/write_opt_version1
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit e84d20e81b33d5bb28951488bd5a95488b88afc3
Author: Jinrui.Zhang <[email protected]>
AuthorDate: Mon Jun 20 20:29:20 2022 +0800

    stable version but has getStatus() issue
---
 .../org/apache/iotdb/db/mpp/plan/Coordinator.java  |  48 ++++----
 .../org/apache/iotdb/db/mpp/plan/StepTracker.java  |  73 ++++++++++++
 .../db/mpp/plan/execution/QueryExecution.java      | 130 ++++++++++++---------
 .../planner/distribution/DistributionPlanner.java  |   3 -
 .../db/mpp/plan/scheduler/ClusterScheduler.java    |   4 +-
 .../db/mpp/plan/scheduler/StandaloneScheduler.java |   1 -
 6 files changed, 176 insertions(+), 83 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java
index 111a9222a6..495af1f3e0 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java
@@ -109,29 +109,33 @@ public class Coordinator {
       String sql,
       IPartitionFetcher partitionFetcher,
       ISchemaFetcher schemaFetcher) {
-
-    QueryId globalQueryId = queryIdGenerator.createNextQueryId();
-    try (SetThreadName queryName = new SetThreadName(globalQueryId.getId())) {
-      if (sql != null) {
-        LOGGER.info("start executing sql: {}", sql);
-      }
-      IQueryExecution execution =
-          createQueryExecution(
-              statement,
-              new MPPQueryContext(
-                  sql,
-                  globalQueryId,
-                  session,
-                  DataNodeEndPoints.LOCAL_HOST_DATA_BLOCK_ENDPOINT,
-                  DataNodeEndPoints.LOCAL_HOST_INTERNAL_ENDPOINT),
-              partitionFetcher,
-              schemaFetcher);
-      if (execution.isQuery()) {
-        queryExecutionMap.put(queryId, execution);
+    long startTime = System.nanoTime();
+    try {
+      QueryId globalQueryId = queryIdGenerator.createNextQueryId();
+      try (SetThreadName queryName = new SetThreadName(globalQueryId.getId())) {
+        //      if (sql != null) {
+        //        LOGGER.info("start executing sql: {}", sql);
+        //      }
+        IQueryExecution execution =
+            createQueryExecution(
+                statement,
+                new MPPQueryContext(
+                    sql,
+                    globalQueryId,
+                    session,
+                    DataNodeEndPoints.LOCAL_HOST_DATA_BLOCK_ENDPOINT,
+                    DataNodeEndPoints.LOCAL_HOST_INTERNAL_ENDPOINT),
+                partitionFetcher,
+                schemaFetcher);
+        if (execution.isQuery()) {
+          queryExecutionMap.put(queryId, execution);
+        }
+        execution.start();
+
+        return execution.getStatus();
       }
-      execution.start();
-
-      return execution.getStatus();
+    } finally {
+      StepTracker.trace("statementExec", startTime, System.nanoTime());
     }
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/StepTracker.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/StepTracker.java
new file mode 100644
index 0000000000..1fe7df7be0
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/StepTracker.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+class Metric {
+  private static final Logger logger = LoggerFactory.getLogger(Metric.class);
+  public String stepName;
+  public long invokeCount;
+  public long totalTime;
+  public long last100Time;
+
+  public Metric(String stepName) {
+    this.stepName = stepName;
+    this.invokeCount = 0;
+    this.totalTime = 0;
+    this.last100Time = 0;
+  }
+
+  public void trace(long startTime, long endTime) {
+    this.invokeCount++;
+    this.totalTime += (endTime - startTime);
+  }
+
+  public void tryPrint() {
+    if (invokeCount % 100 == 0) {
+      logger.info(
+          String.format(
+              "step metrics [%d]-[%s] - Total: %d, SUM: %.2fms, AVG: %fms, Last100AVG: %fms",
+              Thread.currentThread().getId(),
+              stepName,
+              invokeCount,
+              totalTime * 1.0 / 1000000,
+              totalTime * 1.0 / 1000000 / invokeCount,
+              (totalTime * 1.0 - last100Time) / 1000000 / 100));
+      last100Time = totalTime;
+    }
+  }
+}
+
+public class StepTracker {
+  private static final ThreadLocal<Map<String, Metric>> metrics = new ThreadLocal<>();
+
+  public static void trace(String stepName, long startTime, long endTime) {
+    if (metrics.get() == null) {
+      metrics.set(new HashMap<>());
+    }
+    metrics.get().computeIfAbsent(stepName, Metric::new).trace(startTime, endTime);
+    metrics.get().get(stepName).tryPrint();
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
index 5f4ce1ae3f..a7a39564ef 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
@@ -31,6 +31,7 @@ import org.apache.iotdb.db.mpp.execution.QueryState;
 import org.apache.iotdb.db.mpp.execution.QueryStateMachine;
 import org.apache.iotdb.db.mpp.execution.datatransfer.DataBlockService;
 import org.apache.iotdb.db.mpp.execution.datatransfer.ISourceHandle;
+import org.apache.iotdb.db.mpp.plan.StepTracker;
 import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
 import org.apache.iotdb.db.mpp.plan.analyze.Analyzer;
 import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher;
@@ -46,7 +47,6 @@ import org.apache.iotdb.db.mpp.plan.planner.distribution.DistributionPlanner;
 import org.apache.iotdb.db.mpp.plan.planner.plan.DistributedQueryPlan;
 import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
 import org.apache.iotdb.db.mpp.plan.planner.plan.LogicalQueryPlan;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeUtil;
 import org.apache.iotdb.db.mpp.plan.scheduler.ClusterScheduler;
 import org.apache.iotdb.db.mpp.plan.scheduler.IScheduler;
 import org.apache.iotdb.db.mpp.plan.scheduler.StandaloneScheduler;
@@ -113,6 +113,8 @@ public class QueryExecution implements IQueryExecution {
   private final IClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
       internalServiceClientManager;
 
+  private long queryExecutionStartTime;
+
   public QueryExecution(
       Statement statement,
       MPPQueryContext context,
@@ -122,12 +124,15 @@ public class QueryExecution implements IQueryExecution {
       IPartitionFetcher partitionFetcher,
       ISchemaFetcher schemaFetcher,
       IClientManager<TEndPoint, SyncDataNodeInternalServiceClient> internalServiceClientManager) {
+    this.queryExecutionStartTime = System.nanoTime();
     this.executor = executor;
     this.writeOperationExecutor = writeOperationExecutor;
     this.scheduledExecutor = scheduledExecutor;
     this.context = context;
     this.planOptimizers = new ArrayList<>();
+    long startTime = System.nanoTime();
     this.analysis = analyze(statement, context, partitionFetcher, schemaFetcher);
+    StepTracker.trace("analyze", startTime, System.nanoTime());
     this.stateMachine = new QueryStateMachine(context.getQueryId(), executor);
     this.partitionFetcher = partitionFetcher;
     this.schemaFetcher = schemaFetcher;
@@ -189,52 +194,59 @@ public class QueryExecution implements IQueryExecution {
       IPartitionFetcher partitionFetcher,
       ISchemaFetcher schemaFetcher) {
     // initialize the variable `analysis`
-    logger.info("start to analyze query");
+    //    logger.info("start to analyze query");
     return new Analyzer(context, partitionFetcher, schemaFetcher).analyze(statement);
   }
 
   private void schedule() {
-    // TODO: (xingtanzjr) initialize the query scheduler according to configuration
-    this.scheduler =
-        config.isClusterMode()
-            ? new ClusterScheduler(
-                context,
-                stateMachine,
-                distributedPlan.getInstances(),
-                context.getQueryType(),
-                executor,
-                writeOperationExecutor,
-                scheduledExecutor,
-                internalServiceClientManager)
-            : new StandaloneScheduler(
-                context,
-                stateMachine,
-                distributedPlan.getInstances(),
-                context.getQueryType(),
-                executor,
-                scheduledExecutor,
-                internalServiceClientManager);
-    this.scheduler.start();
+    long startTime = System.nanoTime();
+    try {
+      // TODO: (xingtanzjr) initialize the query scheduler according to configuration
+      this.scheduler =
+          config.isClusterMode()
+              ? new ClusterScheduler(
+                  context,
+                  stateMachine,
+                  distributedPlan.getInstances(),
+                  context.getQueryType(),
+                  executor,
+                  writeOperationExecutor,
+                  scheduledExecutor,
+                  internalServiceClientManager)
+              : new StandaloneScheduler(
+                  context,
+                  stateMachine,
+                  distributedPlan.getInstances(),
+                  context.getQueryType(),
+                  executor,
+                  scheduledExecutor,
+                  internalServiceClientManager);
+      this.scheduler.start();
+    } finally {
+      StepTracker.trace("dispatch", startTime, System.nanoTime());
+    }
   }
 
   // Use LogicalPlanner to do the logical query plan and logical optimization
   public void doLogicalPlan() {
-    logger.info("do logical plan...");
-    LogicalPlanner planner = new LogicalPlanner(this.context, this.planOptimizers);
-    this.logicalPlan = planner.plan(this.analysis);
-    logger.info(
-        "logical plan is: \n {}", PlanNodeUtil.nodeToString(this.logicalPlan.getRootNode()));
+    long startTime = System.nanoTime();
+    try {
+      LogicalPlanner planner = new LogicalPlanner(this.context, this.planOptimizers);
+      this.logicalPlan = planner.plan(this.analysis);
+    } finally {
+      StepTracker.trace("doLogicalPlan", startTime, System.nanoTime());
+    }
   }
 
   // Generate the distributed plan and split it into fragments
   public void doDistributedPlan() {
-    logger.info("do distribution plan...");
-    DistributionPlanner planner = new DistributionPlanner(this.analysis, this.logicalPlan);
-    this.distributedPlan = planner.planFragments();
-    logger.info(
-        "distribution plan done. Fragment instance count is {}, details is: \n {}",
-        distributedPlan.getInstances().size(),
-        printFragmentInstances(distributedPlan.getInstances()));
+    long startTime = System.nanoTime();
+    try {
+      DistributionPlanner planner = new DistributionPlanner(this.analysis, this.logicalPlan);
+      this.distributedPlan = planner.planFragments();
+    } finally {
+      StepTracker.trace("doDistributionPlan", startTime, System.nanoTime());
+    }
   }
 
   private String printFragmentInstances(List<FragmentInstance> instances) {
@@ -332,28 +344,36 @@ public class QueryExecution implements IQueryExecution {
    * @return ExecutionStatus. Contains the QueryId and the TSStatus.
    */
   public ExecutionResult getStatus() {
-    // Although we monitor the state to transition to RUNNING, the future will return if any
-    // Terminated state is triggered
-    SettableFuture<QueryState> future = SettableFuture.create();
-    stateMachine.addStateChangeListener(
-        state -> {
-          if (state == QueryState.RUNNING || state.isDone()) {
-            future.set(state);
-          }
-        });
-
+    long startTime = System.nanoTime();
     try {
-      QueryState state = future.get();
-      // TODO: (xingtanzjr) use more TSStatusCode if the QueryState isn't FINISHED
-      return getExecutionResult(state);
-    } catch (InterruptedException | ExecutionException e) {
-      // TODO: (xingtanzjr) use more accurate error handling
-      if (e instanceof InterruptedException) {
-        Thread.currentThread().interrupt();
+      // Although we monitor the state to transition to RUNNING, the future will return if any
+      // Terminated state is triggered
+      try {
+        SettableFuture<QueryState> future = SettableFuture.create();
+        stateMachine.addStateChangeListener(
+            state -> {
+              if (state == QueryState.RUNNING || state.isDone()) {
+                future.set(state);
+              }
+            });
+        long startTimeGet = System.nanoTime();
+        QueryState state = future.get();
+        StepTracker.trace("get()", startTimeGet, System.nanoTime());
+        // TODO: (xingtanzjr) use more TSStatusCode if the QueryState isn't FINISHED
+        return getExecutionResult(state);
+      } catch (InterruptedException | ExecutionException e) {
+        // TODO: (xingtanzjr) use more accurate error handling
+        if (e instanceof InterruptedException) {
+          Thread.currentThread().interrupt();
+        }
+        return new ExecutionResult(
+            context.getQueryId(),
+            RpcUtils.getStatus(
+                TSStatusCode.INTERNAL_SERVER_ERROR, stateMachine.getFailureMessage()));
       }
-      return new ExecutionResult(
-          context.getQueryId(),
-          RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR, stateMachine.getFailureMessage()));
+    } finally {
+      StepTracker.trace("getStatus", startTime, System.nanoTime());
+      StepTracker.trace("QueryExecutionLifeCycle", this.queryExecutionStartTime, System.nanoTime());
     }
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanner.java
index 421de275da..4dfe1b968f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanner.java
@@ -29,7 +29,6 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.LogicalQueryPlan;
 import org.apache.iotdb.db.mpp.plan.planner.plan.PlanFragment;
 import org.apache.iotdb.db.mpp.plan.planner.plan.SubPlan;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeUtil;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.WritePlanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.ExchangeNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.sink.FragmentSinkNode;
@@ -67,9 +66,7 @@ public class DistributionPlanner {
 
   public DistributedQueryPlan planFragments() {
     PlanNode rootAfterRewrite = rewriteSource();
-    System.out.println(PlanNodeUtil.nodeToString(rootAfterRewrite));
     PlanNode rootWithExchange = addExchangeNode(rootAfterRewrite);
-    System.out.println(PlanNodeUtil.nodeToString(rootWithExchange));
     if (analysis.getStatement() instanceof QueryStatement) {
       analysis
           .getRespDatasetHeader()
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java
index 9437c22536..8390ac6fe5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java
@@ -86,7 +86,7 @@ public class ClusterScheduler implements IScheduler {
   @Override
   public void start() {
     stateMachine.transitionToDispatching();
-    logger.info("transit to DISPATCHING");
+    //    logger.info("transit to DISPATCHING");
     Future<FragInstanceDispatchResult> dispatchResultFuture = dispatcher.dispatch(instances);
 
     // NOTICE: the FragmentInstance may be dispatched to another Host due to consensus redirect.
@@ -116,7 +116,7 @@ public class ClusterScheduler implements IScheduler {
     // The FragmentInstances has been dispatched successfully to corresponding host, we mark the
     // QueryState to Running
     stateMachine.transitionToRunning();
-    logger.info("transit to RUNNING");
+    //    logger.info("transit to RUNNING");
     instances.forEach(
         instance -> {
           stateMachine.initialFragInstanceState(instance.getId(), FragmentInstanceState.RUNNING);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/StandaloneScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/StandaloneScheduler.java
index 89bacfb562..5433b07c7c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/StandaloneScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/StandaloneScheduler.java
@@ -98,7 +98,6 @@ public class StandaloneScheduler implements IScheduler {
   @Override
   public void start() {
     stateMachine.transitionToDispatching();
-    LOGGER.info("{} transit to DISPATCHING", getLogHeader());
     // For the FragmentInstance of WRITE, it will be executed directly when dispatching.
     // TODO: Other QueryTypes
     switch (queryType) {

Reply via email to