This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch xingtanzjr/query_log
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit e2e19a7f057c53d7bb65bc1acba07c1f23b928ec
Author: Jinrui.Zhang <[email protected]>
AuthorDate: Wed May 4 21:10:30 2022 +0800

    tmp saved
---
 .../apache/iotdb/db/mpp/plan/analyze/Analyzer.java | 11 ++++
 .../db/mpp/plan/analyze/ClusterSchemaFetcher.java  |  5 +-
 .../db/mpp/plan/execution/QueryExecution.java      | 35 ++++++++++---
 .../plan/node/metedata/read/SchemaFetchNode.java   |  4 ++
 .../node/metedata/read/SeriesSchemaMergeNode.java  |  4 ++
 .../db/mpp/plan/scheduler/ClusterScheduler.java    | 11 +++-
 .../scheduler/SimpleFragInstanceDispatcher.java    | 58 ++++++++++------------
 7 files changed, 88 insertions(+), 40 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java
index 0b282f5df3..5e20cd729d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analyzer.java
@@ -68,6 +68,9 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.expression.IExpression;
 import org.apache.iotdb.tsfile.read.expression.util.ExpressionOptimizer;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -80,6 +83,8 @@ import java.util.stream.Collectors;
 /** Analyze the statement and generate Analysis. */
 public class Analyzer {
 
+  private static final Logger logger = LoggerFactory.getLogger(Analyzer.class);
+
   private final MPPQueryContext context;
 
   private final IPartitionFetcher partitionFetcher;
@@ -96,6 +101,10 @@ public class Analyzer {
     return new AnalyzeVisitor().process(statement, context);
   }
 
+  private String getLogHeader() {
+    return String.format("Query[%s]:", context.getQueryId());
+  }
+
   /** This visitor is used to analyze each type of Statement and returns the 
{@link Analysis}. */
   private final class AnalyzeVisitor extends StatementVisitor<Analysis, 
MPPQueryContext> {
 
@@ -118,7 +127,9 @@ public class Analyzer {
             (QueryStatement) new ConcatPathRewriter().rewrite(queryStatement, 
patternTree);
 
         // request schema fetch API
+        logger.info("{} fetch query schema...", getLogHeader());
         SchemaTree schemaTree = schemaFetcher.fetchSchema(patternTree);
+        logger.info("{} fetch schema done", getLogHeader());
         // (xingtanzjr) If there is no leaf node in the schema tree, the query 
should be completed
         // immediately
         if (schemaTree.isEmpty()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java
index 256b53779e..d0c86da11b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterSchemaFetcher.java
@@ -75,7 +75,10 @@ public class ClusterSchemaFetcher implements ISchemaFetcher {
         coordinator.execute(schemaFetchStatement, queryId, null, "", 
partitionFetcher, this);
     // TODO: (xingtanzjr) throw exception
     if (executionResult.status.getCode() != 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-      throw new RuntimeException("cannot fetch schema, status is: " + 
executionResult.status);
+      throw new RuntimeException(
+          String.format(
+              "cannot fetch schema, status is: %s, msg is: %s",
+              executionResult.status.getCode(), 
executionResult.status.getMessage()));
     }
     SchemaTree result = new SchemaTree();
     while (coordinator.getQueryExecution(queryId).hasNextResult()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
index dce5dccc71..975b7d4e16 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/QueryExecution.java
@@ -29,7 +29,6 @@ import org.apache.iotdb.db.mpp.execution.QueryState;
 import org.apache.iotdb.db.mpp.execution.QueryStateMachine;
 import org.apache.iotdb.db.mpp.execution.datatransfer.DataBlockService;
 import org.apache.iotdb.db.mpp.execution.datatransfer.ISourceHandle;
-import org.apache.iotdb.db.mpp.plan.Coordinator;
 import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
 import org.apache.iotdb.db.mpp.plan.analyze.Analyzer;
 import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher;
@@ -40,6 +39,7 @@ import org.apache.iotdb.db.mpp.plan.planner.DistributionPlanner;
 import org.apache.iotdb.db.mpp.plan.planner.LogicalPlanner;
 import org.apache.iotdb.db.mpp.plan.planner.plan.DistributedQueryPlan;
 import org.apache.iotdb.db.mpp.plan.planner.plan.LogicalQueryPlan;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeUtil;
 import org.apache.iotdb.db.mpp.plan.scheduler.ClusterScheduler;
 import org.apache.iotdb.db.mpp.plan.scheduler.IScheduler;
 import org.apache.iotdb.db.mpp.plan.scheduler.StandaloneScheduler;
@@ -70,7 +70,7 @@ import static com.google.common.base.Throwables.throwIfUnchecked;
  * corresponding physical nodes. 3. Collect and monitor the progress/states of 
this query.
  */
 public class QueryExecution implements IQueryExecution {
-  private static final Logger LOG = LoggerFactory.getLogger(Coordinator.class);
+  private static final Logger logger = 
LoggerFactory.getLogger(QueryExecution.class);
 
   private static IoTDBConfig config = 
IoTDBDescriptor.getInstance().getConfig();
 
@@ -136,6 +136,9 @@ public class QueryExecution implements IQueryExecution {
 
   public void start() {
     if (skipExecute()) {
+      logger.info(
+          "{} execution of query will be skipped. Transit to FINISHED 
immediately.",
+          getLogHeader());
       stateMachine.transitionToFinished();
       return;
     }
@@ -152,12 +155,13 @@ public class QueryExecution implements IQueryExecution {
   }
 
   // Analyze the statement in QueryContext. Generate the analysis this query 
need
-  private static Analysis analyze(
+  private Analysis analyze(
       Statement statement,
       MPPQueryContext context,
       IPartitionFetcher partitionFetcher,
       ISchemaFetcher schemaFetcher) {
     // initialize the variable `analysis`
+    logger.info("{} start to analyze query", getLogHeader());
     return new Analyzer(context, partitionFetcher, 
schemaFetcher).analyze(statement);
   }
 
@@ -186,14 +190,25 @@ public class QueryExecution implements IQueryExecution {
 
   // Use LogicalPlanner to do the logical query plan and logical optimization
   public void doLogicalPlan() {
+    logger.info("{} do logical plan...", getLogHeader());
     LogicalPlanner planner = new LogicalPlanner(this.context, 
this.planOptimizers);
     this.logicalPlan = planner.plan(this.analysis);
+    logger.info(
+        "{} logical plan is: \n {}",
+        getLogHeader(),
+        PlanNodeUtil.nodeToString(this.logicalPlan.getRootNode()));
   }
 
   // Generate the distributed plan and split it into fragments
   public void doDistributedPlan() {
+    logger.info("{} do distribution plan...", getLogHeader());
     DistributionPlanner planner = new DistributionPlanner(this.analysis, 
this.logicalPlan);
     this.distributedPlan = planner.planFragments();
+    logger.info(
+        "{} distribution plan done. Fragment instance count is {}, details is: 
\n {}",
+        getLogHeader(),
+        distributedPlan.getInstances().size(),
+        distributedPlan.getInstances());
   }
 
   // Stop the workers for this query
@@ -294,12 +309,16 @@ public class QueryExecution implements IQueryExecution {
           state == QueryState.FINISHED || state == QueryState.RUNNING
               ? TSStatusCode.SUCCESS_STATUS
               : TSStatusCode.QUERY_PROCESS_ERROR;
-      return new ExecutionResult(context.getQueryId(), 
RpcUtils.getStatus(statusCode));
+      return new ExecutionResult(
+          context.getQueryId(), RpcUtils.getStatus(statusCode, 
stateMachine.getFailureMessage()));
     } catch (InterruptedException | ExecutionException e) {
       // TODO: (xingtanzjr) use more accurate error handling
-      Thread.currentThread().interrupt();
+      if (e instanceof InterruptedException) {
+        Thread.currentThread().interrupt();
+      }
       return new ExecutionResult(
-          context.getQueryId(), 
RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR));
+          context.getQueryId(),
+          RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR, 
stateMachine.getFailureMessage()));
     }
   }
 
@@ -333,4 +352,8 @@ public class QueryExecution implements IQueryExecution {
   public String toString() {
     return String.format("QueryExecution[%s]", context.getQueryId());
   }
+
+  private String getLogHeader() {
+    return String.format("Query[%s]:", context.getQueryId());
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/read/SchemaFetchNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/read/SchemaFetchNode.java
index a43058adc4..fa62745926 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/read/SchemaFetchNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/read/SchemaFetchNode.java
@@ -75,4 +75,8 @@ public class SchemaFetchNode extends SchemaScanNode {
   public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
     return visitor.visitSchemaFetch(this, context);
   }
+
+  public String toString() {
+    return String.format("SchemaFetchNode-%s", getPlanNodeId());
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/read/SeriesSchemaMergeNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/read/SeriesSchemaMergeNode.java
index a7c4f4052e..c501dc0384 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/read/SeriesSchemaMergeNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/metedata/read/SeriesSchemaMergeNode.java
@@ -57,4 +57,8 @@ public class SeriesSchemaMergeNode extends AbstractSchemaMergeNode {
   public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
     return visitor.visitSchemaMerge(this, context);
   }
+
+  public String toString() {
+    return String.format("SchemaMergeNode-%s", getPlanNodeId());
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java
index 48c4268034..db4bcfdb84 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/ClusterScheduler.java
@@ -48,7 +48,7 @@ import java.util.concurrent.ScheduledExecutorService;
  * this scheduler.
  */
 public class ClusterScheduler implements IScheduler {
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(ClusterScheduler.class);
+  private static final Logger logger = 
LoggerFactory.getLogger(ClusterScheduler.class);
 
   private MPPQueryContext queryContext;
   // The stateMachine of the QueryExecution owned by this QueryScheduler
@@ -90,6 +90,7 @@ public class ClusterScheduler implements IScheduler {
   @Override
   public void start() {
     stateMachine.transitionToDispatching();
+    logger.info("{} transit to DISPATCHING", getLogHeader());
     Future<FragInstanceDispatchResult> dispatchResultFuture = 
dispatcher.dispatch(instances);
 
     // NOTICE: the FragmentInstance may be dispatched to another Host due to 
consensus redirect.
@@ -102,7 +103,9 @@ public class ClusterScheduler implements IScheduler {
       }
     } catch (InterruptedException | ExecutionException e) {
       // If the dispatch failed, we make the QueryState as failed, and return.
-      Thread.currentThread().interrupt();
+      if (e instanceof InterruptedException) {
+        Thread.currentThread().interrupt();
+      }
       stateMachine.transitionToFailed(e);
       return;
     }
@@ -156,4 +159,8 @@ public class ClusterScheduler implements IScheduler {
 
   // After sending, start to collect the states of these fragment instances
   private void startMonitorInstances() {}
+
+  private String getLogHeader() {
+    return String.format("Query[%s]", queryContext.getQueryId());
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleFragInstanceDispatcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleFragInstanceDispatcher.java
index fec10957bf..4c64c3b7b6 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleFragInstanceDispatcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleFragInstanceDispatcher.java
@@ -19,20 +19,14 @@
 
 package org.apache.iotdb.db.mpp.plan.scheduler;
 
-import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
 import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
-import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstance;
-import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceReq;
-import org.apache.iotdb.mpp.rpc.thrift.TSendFragmentInstanceResp;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
@@ -55,31 +49,33 @@ public class SimpleFragInstanceDispatcher implements IFragInstanceDispatcher {
   public Future<FragInstanceDispatchResult> dispatch(List<FragmentInstance> 
instances) {
     return executor.submit(
         () -> {
-          TSendFragmentInstanceResp resp = new 
TSendFragmentInstanceResp(false);
-          for (FragmentInstance instance : instances) {
-            TEndPoint endPoint = 
instance.getHostDataNode().getInternalEndPoint();
-            // TODO: (jackie tien) change the port
-            try (SyncDataNodeInternalServiceClient client =
-                internalServiceClientManager.borrowClient(endPoint)) {
-              // TODO: (xingtanzjr) consider how to handle the buffer here
-              ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
-              instance.serializeRequest(buffer);
-              buffer.flip();
-              TConsensusGroupId groupId = 
instance.getRegionReplicaSet().getRegionId();
-              TSendFragmentInstanceReq req =
-                  new TSendFragmentInstanceReq(
-                      new TFragmentInstance(buffer), groupId, 
instance.getType().toString());
-              resp = client.sendFragmentInstance(req);
-            } catch (IOException e) {
-              LOGGER.error("can't connect to node {}", endPoint, e);
-              throw e;
-            }
-
-            if (!resp.accepted) {
-              break;
-            }
-          }
-          return new FragInstanceDispatchResult(resp.accepted);
+          throw new RuntimeException("Dispatch Error");
+          //          TSendFragmentInstanceResp resp = new 
TSendFragmentInstanceResp(false);
+          //          for (FragmentInstance instance : instances) {
+          //            TEndPoint endPoint = 
instance.getHostDataNode().getInternalEndPoint();
+          //            // TODO: (jackie tien) change the port
+          //            try (SyncDataNodeInternalServiceClient client =
+          //                
internalServiceClientManager.borrowClient(endPoint)) {
+          //              // TODO: (xingtanzjr) consider how to handle the 
buffer here
+          //              ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
+          //              instance.serializeRequest(buffer);
+          //              buffer.flip();
+          //              TConsensusGroupId groupId = 
instance.getRegionReplicaSet().getRegionId();
+          //              TSendFragmentInstanceReq req =
+          //                  new TSendFragmentInstanceReq(
+          //                      new TFragmentInstance(buffer), groupId,
+          // instance.getType().toString());
+          //              resp = client.sendFragmentInstance(req);
+          //            } catch (IOException e) {
+          //              LOGGER.error("can't connect to node {}", endPoint, 
e);
+          //              throw e;
+          //            }
+          //
+          //            if (!resp.accepted) {
+          //              break;
+          //            }
+          //          }
+          //          return new FragInstanceDispatchResult(resp.accepted);
         });
   }
 

Reply via email to