This is an automated email from the ASF dual-hosted git repository.

caogaofei pushed a commit to branch ty/TableModelGrammar
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/ty/TableModelGrammar by this 
push:
     new 4e3e0fdb240 add temp index scan, prune column, distribute query planner
4e3e0fdb240 is described below

commit 4e3e0fdb24017f2c566095243b3f283b3b05d7bd
Author: Beyyes <[email protected]>
AuthorDate: Wed Apr 17 18:16:19 2024 +0800

    add temp index scan, prune column, distribute query planner
---
 .../planner/distribution/DistributionPlanner.java  |   3 +
 .../SimpleFragmentParallelPlanner.java             |   4 +-
 .../plan/relational/planner/LogicalPlanner.java    |  11 +-
 .../plan/relational/planner/RelationPlanner.java   |  22 +--
 .../relational/planner/RelationalModelPlanner.java |   4 +-
 .../ExchangeNodeGenerator.java}                    |  20 ++-
 .../distribute/FragmentInstanceGenerator.java}     | 179 +++++++--------------
 .../distribute/RelationalDistributionPlanner.java  |  31 +++-
 .../planner/distribute/SimplePlanRewriter.java     |  44 +++++
 .../planner/distribute/SubPlanGenerator.java       |  77 +++++++++
 .../relational/planner/node/TableScanNode.java     |  16 +-
 .../planner/optimizations/IndexScan.java           |  93 ++++++++++-
 ...Expressions.java => PruneTableScanColumns.java} |  53 ++++--
 .../optimizations/RelationalPlanOptimizer.java     |   9 +-
 .../RemoveRedundantIdentityProjections.java        |  12 +-
 .../planner/optimizations/SimplifyExpressions.java |   9 +-
 .../plan/relational/analyzer/AnalyzerTest.java     |  10 +-
 17 files changed, 434 insertions(+), 163 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/DistributionPlanner.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/DistributionPlanner.java
index 755119b2607..42920df7561 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/DistributionPlanner.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/DistributionPlanner.java
@@ -206,14 +206,17 @@ public class DistributionPlanner {
           .getRespDatasetHeader()
           
.setColumnToTsBlockIndexMap(optimizedRootWithExchange.getOutputColumnNames());
     }
+
     SubPlan subPlan = splitFragment(optimizedRootWithExchange);
     // Mark the root Fragment of root SubPlan as `root`
     subPlan.getPlanFragment().setRoot(true);
+
     List<FragmentInstance> fragmentInstances = planFragmentInstances(subPlan);
     // Only execute this step for READ operation
     if (context.getQueryType() == QueryType.READ) {
       setSinkForRootInstance(subPlan, fragmentInstances);
     }
+
     return new DistributedQueryPlan(
         logicalPlan.getContext(), subPlan, subPlan.getPlanFragmentList(), 
fragmentInstances);
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SimpleFragmentParallelPlanner.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SimpleFragmentParallelPlanner.java
index 1394ba96503..a7b475a4e3c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SimpleFragmentParallelPlanner.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SimpleFragmentParallelPlanner.java
@@ -214,12 +214,12 @@ public class SimpleFragmentParallelPlanner implements 
IFragmentParallelPlaner {
     if (availableDataNodes.isEmpty()) {
       String errorMsg =
           String.format(
-              "all replicas for region[%s] are not available in these 
DataNodes[%s]",
+              "All replicas for region[%s] are not available in these 
DataNodes[%s]",
               regionReplicaSet.getRegionId(), 
regionReplicaSet.getDataNodeLocations());
       throw new IllegalArgumentException(errorMsg);
     }
     if (regionReplicaSet.getDataNodeLocationsSize() != 
availableDataNodes.size()) {
-      logger.info("available replicas: " + availableDataNodes);
+      logger.info("available replicas: {}", availableDataNodes);
     }
     int targetIndex;
     if (!selectRandomDataNode || queryContext.getSession() == null) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/LogicalPlanner.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/LogicalPlanner.java
index 5e54848763f..843517be036 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/LogicalPlanner.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/LogicalPlanner.java
@@ -24,6 +24,8 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.analyzer.Field;
 import org.apache.iotdb.db.queryengine.plan.relational.analyzer.RelationType;
 import org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.OutputNode;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.IndexScan;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.PruneTableScanColumns;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.RelationalPlanOptimizer;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.RemoveRedundantIdentityProjections;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.SimplifyExpressions;
@@ -60,13 +62,18 @@ public class LogicalPlanner {
     this.warningCollector = requireNonNull(warningCollector, "warningCollector 
is null");
 
     this.relationalPlanOptimizers =
-        Arrays.asList(new SimplifyExpressions(), new 
RemoveRedundantIdentityProjections());
+        Arrays.asList(
+            new SimplifyExpressions(),
+            new RemoveRedundantIdentityProjections(),
+            new PruneTableScanColumns(),
+            new IndexScan());
   }
 
   public LogicalQueryPlan plan(Analysis analysis) throws IoTDBException {
     PlanNode planNode = planStatement(analysis, analysis.getStatement());
 
-    relationalPlanOptimizers.forEach(optimizer -> optimizer.optimize(planNode, 
analysis, context));
+    relationalPlanOptimizers.forEach(
+        optimizer -> optimizer.optimize(planNode, analysis, metadata, 
sessionInfo, context));
 
     return new LogicalQueryPlan(context, planNode);
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java
index 584b9c1f80b..d8526986a92 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java
@@ -49,39 +49,40 @@ public class RelationPlanner extends 
AstVisitor<RelationPlan, Void> {
   private final Analysis analysis;
   private final SymbolAllocator symbolAllocator;
   private final QueryId idAllocator;
-  private final SessionInfo session;
+  private final SessionInfo sessionInfo;
   private final Map<NodeRef<Node>, RelationPlan> recursiveSubqueries;
 
   public RelationPlanner(
       Analysis analysis,
       SymbolAllocator symbolAllocator,
       QueryId idAllocator,
-      SessionInfo session,
+      SessionInfo sessionInfo,
       Map<NodeRef<Node>, RelationPlan> recursiveSubqueries) {
     requireNonNull(analysis, "analysis is null");
     requireNonNull(symbolAllocator, "symbolAllocator is null");
     requireNonNull(idAllocator, "idAllocator is null");
-    requireNonNull(session, "session is null");
+    requireNonNull(sessionInfo, "sessionInfo is null");
     requireNonNull(recursiveSubqueries, "recursiveSubqueries is null");
 
     this.analysis = analysis;
     this.symbolAllocator = symbolAllocator;
     this.idAllocator = idAllocator;
-    this.session = session;
+    this.sessionInfo = sessionInfo;
     this.recursiveSubqueries = recursiveSubqueries;
   }
 
   @Override
   protected RelationPlan visitQuery(Query node, Void context) {
-    return new QueryPlanner(analysis, symbolAllocator, idAllocator, session, 
recursiveSubqueries)
+    return new QueryPlanner(
+            analysis, symbolAllocator, idAllocator, sessionInfo, 
recursiveSubqueries)
         .plan(node);
   }
 
   @Override
-  protected RelationPlan visitTable(Table node, Void context) {
+  protected RelationPlan visitTable(Table table, Void context) {
     // is this a recursive reference in expandable named query? If so, there's 
base relation already
     // planned.
-    RelationPlan expansion = recursiveSubqueries.get(NodeRef.of(node));
+    RelationPlan expansion = recursiveSubqueries.get(NodeRef.of(table));
     if (expansion != null) {
       // put the pre-planned recursive subquery in the actual outer context to 
enable resolving
       // correlation
@@ -89,7 +90,7 @@ public class RelationPlanner extends AstVisitor<RelationPlan, 
Void> {
           expansion.getRoot(), expansion.getScope(), 
expansion.getFieldMappings());
     }
 
-    Scope scope = analysis.getScope(node);
+    Scope scope = analysis.getScope(table);
     ImmutableList.Builder<Symbol> outputSymbolsBuilder = 
ImmutableList.builder();
     ImmutableMap.Builder<Symbol, ColumnSchema> symbolToColumnSchema = 
ImmutableMap.builder();
     Collection<Field> fields = scope.getRelationType().getAllFields();
@@ -106,7 +107,7 @@ public class RelationPlanner extends 
AstVisitor<RelationPlan, Void> {
     PlanNode root =
         new TableScanNode(
             idAllocator.genPlanNodeId(),
-            node.getName().toString(),
+            table.getName().toString(),
             outputSymbols,
             symbolToColumnSchema.build());
 
@@ -121,7 +122,8 @@ public class RelationPlanner extends 
AstVisitor<RelationPlan, Void> {
 
   @Override
   protected RelationPlan visitQuerySpecification(QuerySpecification node, Void 
context) {
-    return new QueryPlanner(analysis, symbolAllocator, idAllocator, session, 
recursiveSubqueries)
+    return new QueryPlanner(
+            analysis, symbolAllocator, idAllocator, sessionInfo, 
recursiveSubqueries)
         .plan(node);
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationalModelPlanner.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationalModelPlanner.java
index 713be1450bd..b9a6604e708 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationalModelPlanner.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationalModelPlanner.java
@@ -117,7 +117,9 @@ public class RelationalModelPlanner implements IPlanner {
 
   @Override
   public DistributedQueryPlan doDistributionPlan(IAnalysis analysis, 
LogicalQueryPlan logicalPlan) {
-    return new RelationalDistributionPlanner((Analysis) analysis, 
logicalPlan).planFragments();
+    return new RelationalDistributionPlanner(
+            (Analysis) analysis, logicalPlan, logicalPlan.getContext())
+        .plan();
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/IndexScan.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/ExchangeNodeGenerator.java
similarity index 56%
copy from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/IndexScan.java
copy to 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/ExchangeNodeGenerator.java
index 21a54da4e73..d11f3a1ee82 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/IndexScan.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/ExchangeNodeGenerator.java
@@ -11,17 +11,23 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.iotdb.db.queryengine.plan.relational.planner.distribute;
 
-package org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations;
-
-import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
-import org.apache.iotdb.db.queryengine.plan.relational.analyzer.Analysis;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableScanNode;
+
+import java.util.Collections;
+import java.util.List;
 
-public class IndexScan implements RelationalPlanOptimizer {
+public class ExchangeNodeGenerator
+    extends SimplePlanRewriter<ExchangeNodeGenerator.DistributionPlanContext> {
 
   @Override
-  public PlanNode optimize(PlanNode planNode, Analysis analysis, 
MPPQueryContext context) {
-    return null;
+  public List<PlanNode> visitTableScan(
+      TableScanNode node, ExchangeNodeGenerator.DistributionPlanContext 
context) {
+    // TODO: handle a TableScanNode whose data spans multiple data regions
+    return Collections.singletonList(node);
   }
+
+  public static class DistributionPlanContext {}
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SimpleFragmentParallelPlanner.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/FragmentInstanceGenerator.java
similarity index 54%
copy from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SimpleFragmentParallelPlanner.java
copy to 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/FragmentInstanceGenerator.java
index 1394ba96503..e15e8ec00ca 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SimpleFragmentParallelPlanner.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/FragmentInstanceGenerator.java
@@ -1,22 +1,17 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
  *
  *     http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
-package org.apache.iotdb.db.queryengine.plan.planner.distribution;
+package org.apache.iotdb.db.queryengine.plan.relational.planner.distribute;
 
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
@@ -27,9 +22,9 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.queryengine.common.DataNodeEndPoints;
 import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
 import org.apache.iotdb.db.queryengine.common.PlanFragmentId;
-import org.apache.iotdb.db.queryengine.plan.analyze.Analysis;
+import 
org.apache.iotdb.db.queryengine.execution.exchange.sink.DownStreamChannelLocation;
+import org.apache.iotdb.db.queryengine.plan.analyze.QueryType;
 import org.apache.iotdb.db.queryengine.plan.expression.Expression;
-import org.apache.iotdb.db.queryengine.plan.planner.IFragmentParallelPlaner;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.PlanFragment;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.SubPlan;
@@ -38,12 +33,8 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ExchangeNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.sink.MultiChildrenSinkNode;
-import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.LastSeriesSourceNode;
-import org.apache.iotdb.db.queryengine.plan.statement.crud.QueryStatement;
-import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.ShowTimeSeriesStatement;
-import 
org.apache.iotdb.db.queryengine.plan.statement.sys.ExplainAnalyzeStatement;
-import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowQueriesStatement;
-import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.db.queryengine.plan.relational.analyzer.Analysis;
+import org.apache.iotdb.db.relational.sql.tree.Query;
 import org.apache.iotdb.tsfile.utils.Pair;
 
 import org.slf4j.Logger;
@@ -54,106 +45,75 @@ import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.atomic.AtomicInteger;
 
-/**
- * A simple implementation of IFragmentParallelPlaner. This planner will 
transform one PlanFragment
- * into only one FragmentInstance.
- */
-public class SimpleFragmentParallelPlanner implements IFragmentParallelPlaner {
-  private static final Logger logger = 
LoggerFactory.getLogger(SimpleFragmentParallelPlanner.class);
+public class FragmentInstanceGenerator {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(FragmentInstanceGenerator.class);
 
   private final SubPlan subPlan;
+
   private final Analysis analysis;
+
+  private final List<FragmentInstance> fragmentInstanceList = new 
ArrayList<>();
+
   private final MPPQueryContext queryContext;
 
   // Record all the FragmentInstances belonged to same PlanFragment
-  private final Map<PlanFragmentId, FragmentInstance> instanceMap;
+  private final Map<PlanFragmentId, FragmentInstance> instanceMap = new 
HashMap<>();
+
   // Record which PlanFragment the PlanNode belongs
-  private final Map<PlanNodeId, Pair<PlanFragmentId, PlanNode>> planNodeMap;
-  private final List<FragmentInstance> fragmentInstanceList;
+  private final Map<PlanNodeId, Pair<PlanFragmentId, PlanNode>> planNodeMap = 
new HashMap<>();
 
   // Record FragmentInstances dispatched to same DataNode
-  private final Map<TDataNodeLocation, List<FragmentInstance>> dataNodeFIMap;
+  private final Map<TDataNodeLocation, List<FragmentInstance>> dataNodeFIMap = 
new HashMap<>();
 
-  public SimpleFragmentParallelPlanner(
-      SubPlan subPlan, Analysis analysis, MPPQueryContext context) {
+  FragmentInstanceGenerator(SubPlan subPlan, Analysis analysis, 
MPPQueryContext queryContext) {
     this.subPlan = subPlan;
     this.analysis = analysis;
-    this.queryContext = context;
-    this.instanceMap = new HashMap<>();
-    this.planNodeMap = new HashMap<>();
-    this.fragmentInstanceList = new ArrayList<>();
-    this.dataNodeFIMap = new HashMap<>();
+    this.queryContext = queryContext;
   }
 
-  @Override
-  public List<FragmentInstance> parallelPlan() {
+  public List<FragmentInstance> plan() {
     prepare();
     calculateNodeTopologyBetweenInstance();
     return fragmentInstanceList;
   }
 
   private void prepare() {
-    List<PlanFragment> fragments = subPlan.getPlanFragmentList();
-    for (PlanFragment fragment : fragments) {
+    for (PlanFragment fragment : subPlan.getPlanFragmentList()) {
       recordPlanNodeRelation(fragment.getPlanNodeTree(), fragment.getId());
       produceFragmentInstance(fragment);
     }
-    fragmentInstanceList.forEach(
-        fragmentInstance ->
-            fragmentInstance.setDataNodeFINum(
-                dataNodeFIMap.get(fragmentInstance.getHostDataNode()).size()));
 
-    // compute dataNodeSeriesScanNum in LastQueryScanNode
-    if (analysis.getStatement() instanceof QueryStatement
-        && ((QueryStatement) analysis.getStatement()).isLastQuery()) {
-      final Map<Path, AtomicInteger> pathSumMap = new HashMap<>();
-      dataNodeFIMap
-          .values()
-          .forEach(
-              fragmentInstances -> {
-                fragmentInstances.forEach(
-                    fragmentInstance ->
-                        updateScanNum(
-                            fragmentInstance.getFragment().getPlanNodeTree(), 
pathSumMap));
-                pathSumMap.clear();
-              });
-    }
+    fragmentInstanceList.forEach(
+        fi -> 
fi.setDataNodeFINum(dataNodeFIMap.get(fi.getHostDataNode()).size()));
   }
 
-  private void updateScanNum(PlanNode planNode, Map<Path, AtomicInteger> 
pathSumMap) {
-    if (planNode instanceof LastSeriesSourceNode) {
-      LastSeriesSourceNode lastSeriesSourceNode = (LastSeriesSourceNode) 
planNode;
-      pathSumMap.merge(
-          lastSeriesSourceNode.getSeriesPath(),
-          lastSeriesSourceNode.getDataNodeSeriesScanNum(),
-          (k, v) -> {
-            v.incrementAndGet();
-            return v;
-          });
-    }
-    planNode.getChildren().forEach(node -> updateScanNum(node, pathSumMap));
+  private void recordPlanNodeRelation(PlanNode root, PlanFragmentId 
planFragmentId) {
+    planNodeMap.put(root.getPlanNodeId(), new Pair<>(planFragmentId, root));
+    root.getChildren().forEach(child -> recordPlanNodeRelation(child, 
planFragmentId));
   }
 
   private void produceFragmentInstance(PlanFragment fragment) {
-    Expression globalTimePredicate = analysis.getGlobalTimePredicate();
+    // TODO fix globalTimePredicate
+    // Expression globalTimePredicate = analysis.getGlobalTimePredicate();
+    Expression globalTimePredicate = null;
     FragmentInstance fragmentInstance =
         new FragmentInstance(
             fragment,
             fragment.getId().genFragmentInstanceId(),
             globalTimePredicate == null ? null : new 
TreeModelTimePredicate(globalTimePredicate),
-            queryContext.getQueryType(),
+            QueryType.READ,
             queryContext.getTimeOut(),
             queryContext.getSession(),
-            queryContext.isExplainAnalyze(),
+            false,
             fragment.isRoot());
 
     // Get the target region for origin PlanFragment, then its instance will 
be distributed one
     // of them.
     TRegionReplicaSet regionReplicaSet = fragment.getTargetRegion();
 
-    // Set ExecutorType and target host for the instance
+    // Set ExecutorType and target host for the instance,
     // We need to store all the replica host in case of the scenario that the 
instance need to be
     // redirected
     // to another host when scheduling
@@ -183,11 +143,7 @@ public class SimpleFragmentParallelPlanner implements 
IFragmentParallelPlaner {
           return v;
         });
 
-    if (analysis.getStatement() instanceof QueryStatement
-        || analysis.getStatement() instanceof ExplainAnalyzeStatement
-        || analysis.getStatement() instanceof ShowQueriesStatement
-        || (analysis.getStatement() instanceof ShowTimeSeriesStatement
-            && ((ShowTimeSeriesStatement) 
analysis.getStatement()).isOrderByHeat())) {
+    if (analysis.getStatement() instanceof Query) {
       
fragmentInstance.getFragment().generateTypeProvider(queryContext.getTypeProvider());
     }
     instanceMap.putIfAbsent(fragment.getId(), fragmentInstance);
@@ -199,12 +155,10 @@ public class SimpleFragmentParallelPlanner implements 
IFragmentParallelPlaner {
         || regionReplicaSet.getDataNodeLocations() == null
         || regionReplicaSet.getDataNodeLocations().isEmpty()) {
       throw new IllegalArgumentException(
-          String.format("regionReplicaSet is invalid: %s", regionReplicaSet));
+          String.format("RegionReplicaSet is invalid: %s", regionReplicaSet));
     }
     String readConsistencyLevel =
         IoTDBDescriptor.getInstance().getConfig().getReadConsistencyLevel();
-    // TODO: (Chen Rongzhao) need to make the values of ReadConsistencyLevel 
as static variable or
-    // enums
     boolean selectRandomDataNode = "weak".equals(readConsistencyLevel);
 
     // When planning fragment onto specific DataNode, the DataNode whose 
endPoint is in
@@ -214,12 +168,12 @@ public class SimpleFragmentParallelPlanner implements 
IFragmentParallelPlaner {
     if (availableDataNodes.isEmpty()) {
       String errorMsg =
           String.format(
-              "all replicas for region[%s] are not available in these 
DataNodes[%s]",
+              "All replicas for region[%s] are not available in these 
DataNodes[%s]",
               regionReplicaSet.getRegionId(), 
regionReplicaSet.getDataNodeLocations());
       throw new IllegalArgumentException(errorMsg);
     }
     if (regionReplicaSet.getDataNodeLocationsSize() != 
availableDataNodes.size()) {
-      logger.info("available replicas: " + availableDataNodes);
+      LOGGER.info("Available replicas: {}", availableDataNodes);
     }
     int targetIndex;
     if (!selectRandomDataNode || queryContext.getSession() == null) {
@@ -255,27 +209,25 @@ public class SimpleFragmentParallelPlanner implements 
IFragmentParallelPlaner {
       PlanNode rootNode = instance.getFragment().getPlanNodeTree();
       if (rootNode instanceof MultiChildrenSinkNode) {
         MultiChildrenSinkNode sinkNode = (MultiChildrenSinkNode) rootNode;
-        sinkNode
-            .getDownStreamChannelLocationList()
-            .forEach(
-                downStreamChannelLocation -> {
-                  // Set target Endpoint for FragmentSinkNode
-                  PlanNodeId downStreamNodeId =
-                      new 
PlanNodeId(downStreamChannelLocation.getRemotePlanNodeId());
-                  FragmentInstance downStreamInstance = 
findDownStreamInstance(downStreamNodeId);
-                  downStreamChannelLocation.setRemoteEndpoint(
-                      
downStreamInstance.getHostDataNode().getMPPDataExchangeEndPoint());
-                  downStreamChannelLocation.setRemoteFragmentInstanceId(
-                      downStreamInstance.getId().toThrift());
-
-                  // Set upstream info for corresponding ExchangeNode in 
downstream FragmentInstance
-                  PlanNode downStreamExchangeNode = 
planNodeMap.get(downStreamNodeId).right;
-                  ((ExchangeNode) downStreamExchangeNode)
-                      .setUpstream(
-                          
instance.getHostDataNode().getMPPDataExchangeEndPoint(),
-                          instance.getId(),
-                          sinkNode.getPlanNodeId());
-                });
+        for (DownStreamChannelLocation downStreamChannelLocation :
+            sinkNode.getDownStreamChannelLocationList()) {
+          // Set target Endpoint for FragmentSinkNode
+          PlanNodeId downStreamNodeId =
+              new PlanNodeId(downStreamChannelLocation.getRemotePlanNodeId());
+          FragmentInstance downStreamInstance = 
findDownStreamInstance(downStreamNodeId);
+          downStreamChannelLocation.setRemoteEndpoint(
+              
downStreamInstance.getHostDataNode().getMPPDataExchangeEndPoint());
+          downStreamChannelLocation.setRemoteFragmentInstanceId(
+              downStreamInstance.getId().toThrift());
+
+          // Set upstream info for corresponding ExchangeNode in downstream 
FragmentInstance
+          PlanNode downStreamExchangeNode = 
planNodeMap.get(downStreamNodeId).right;
+          ((ExchangeNode) downStreamExchangeNode)
+              .setUpstream(
+                  instance.getHostDataNode().getMPPDataExchangeEndPoint(),
+                  instance.getId(),
+                  sinkNode.getPlanNodeId());
+        }
       }
     }
   }
@@ -283,11 +235,4 @@ public class SimpleFragmentParallelPlanner implements 
IFragmentParallelPlaner {
   private FragmentInstance findDownStreamInstance(PlanNodeId exchangeNodeId) {
     return instanceMap.get(planNodeMap.get(exchangeNodeId).left);
   }
-
-  private void recordPlanNodeRelation(PlanNode root, PlanFragmentId 
planFragmentId) {
-    planNodeMap.put(root.getPlanNodeId(), new Pair<>(planFragmentId, root));
-    for (PlanNode child : root.getChildren()) {
-      recordPlanNodeRelation(child, planFragmentId);
-    }
-  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/RelationalDistributionPlanner.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/RelationalDistributionPlanner.java
index de946fb8c33..64d2aa70599 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/RelationalDistributionPlanner.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/RelationalDistributionPlanner.java
@@ -15,21 +15,44 @@ package 
org.apache.iotdb.db.queryengine.plan.relational.planner.distribute;
 
 import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.DistributedQueryPlan;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.LogicalQueryPlan;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.SubPlan;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.queryengine.plan.relational.analyzer.Analysis;
 
+import java.util.List;
+
 public class RelationalDistributionPlanner {
   private final Analysis analysis;
   private final LogicalQueryPlan logicalQueryPlan;
   private final MPPQueryContext context;
 
-  public RelationalDistributionPlanner(Analysis analysis, LogicalQueryPlan 
logicalQueryPlan) {
+  public RelationalDistributionPlanner(
+      Analysis analysis, LogicalQueryPlan logicalQueryPlan, MPPQueryContext 
context) {
     this.analysis = analysis;
     this.logicalQueryPlan = logicalQueryPlan;
-    this.context = null;
+    this.context = context;
   }
 
-  public DistributedQueryPlan planFragments() {
-    return null;
+  public DistributedQueryPlan plan() {
+    List<PlanNode> distributedPlanNodeResult =
+        new ExchangeNodeGenerator()
+            .visitPlan(
+                logicalQueryPlan.getRootNode(),
+                new ExchangeNodeGenerator.DistributionPlanContext());
+
+    if (distributedPlanNodeResult.size() != 1) {
+      throw new IllegalStateException("root node must return only one");
+    }
+
+    SubPlan subPlan = new SubPlanGenerator().splitToSubPlan(logicalQueryPlan);
+    subPlan.getPlanFragment().setRoot(true);
+
+    List<FragmentInstance> fragmentInstances =
+        new FragmentInstanceGenerator(subPlan, analysis, context).plan();
+
+    return new DistributedQueryPlan(
+        logicalQueryPlan.getContext(), subPlan, subPlan.getPlanFragmentList(), 
fragmentInstances);
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/SimplePlanRewriter.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/SimplePlanRewriter.java
new file mode 100644
index 00000000000..9c5ac404c6b
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/SimplePlanRewriter.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.iotdb.db.queryengine.plan.relational.planner.distribute;
+
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode;
+
+import java.util.Collections;
+import java.util.List;
+
+import static com.google.common.collect.ImmutableList.toImmutableList;
+
+public class SimplePlanRewriter<C> extends PlanVisitor<List<PlanNode>, C> {
+
+  @Override
+  public List<PlanNode> visitPlan(PlanNode node, C context) {
+    if (node instanceof WritePlanNode) {
+      return Collections.singletonList(node);
+    }
+
+    List<List<PlanNode>> children =
+        node.getChildren().stream()
+            .map(child -> child.accept(this, context))
+            .collect(toImmutableList());
+
+    PlanNode newNode = node.clone();
+    for (List<PlanNode> planNodes : children) {
+      planNodes.forEach(newNode::addChild);
+    }
+    return Collections.singletonList(newNode);
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/SubPlanGenerator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/SubPlanGenerator.java
new file mode 100644
index 00000000000..de160e61145
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/SubPlanGenerator.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.relational.planner.distribute;
+
+import org.apache.iotdb.db.queryengine.common.QueryId;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.LogicalQueryPlan;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.PlanFragment;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.SubPlan;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ExchangeNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.sink.MultiChildrenSinkNode;
+
+import org.apache.commons.lang3.Validate;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/** Split SubPlan according to ExchangeNode. */
+public class SubPlanGenerator {
+
+  public SubPlan splitToSubPlan(LogicalQueryPlan logicalQueryPlan) {
+    QueryId queryId = logicalQueryPlan.getContext().getQueryId();
+    SubPlan rootSubPlan = createSubPlan(logicalQueryPlan.getRootNode(), 
queryId);
+    Set<PlanNodeId> visitedSinkNode = new HashSet<>();
+    splitToSubPlan(logicalQueryPlan.getRootNode(), rootSubPlan, 
visitedSinkNode, queryId);
+    return rootSubPlan;
+  }
+
+  private void splitToSubPlan(
+      PlanNode root, SubPlan subPlan, Set<PlanNodeId> visitedSinkNode, QueryId 
queryId) {
+    if (root instanceof WritePlanNode) {
+      return;
+    }
+    if (root instanceof ExchangeNode) {
+      // The child of an ExchangeNode must be the sink node that becomes the root of the new PlanFragment
+      ExchangeNode exchangeNode = (ExchangeNode) root;
+      Validate.isTrue(
+          exchangeNode.getChild() instanceof MultiChildrenSinkNode,
+          "child of ExchangeNode must be MultiChildrenSinkNode");
+      MultiChildrenSinkNode sinkNode = (MultiChildrenSinkNode) 
(exchangeNode.getChild());
+
+      // We cut off the subtree so that the ExchangeNode becomes a leaf node of the 
current PlanFragment
+      exchangeNode.cleanChildren();
+
+      // If the SinkNode hasn't been visited yet, build the child SubPlan tree
+      if (!visitedSinkNode.contains(sinkNode.getPlanNodeId())) {
+        visitedSinkNode.add(sinkNode.getPlanNodeId());
+        SubPlan childSubPlan = createSubPlan(sinkNode, queryId);
+        splitToSubPlan(sinkNode, childSubPlan, visitedSinkNode, queryId);
+        subPlan.addChild(childSubPlan);
+      }
+      return;
+    }
+    for (PlanNode child : root.getChildren()) {
+      splitToSubPlan(child, subPlan, visitedSinkNode, queryId);
+    }
+  }
+
+  private SubPlan createSubPlan(PlanNode root, QueryId queryId) {
+    PlanFragment fragment = new PlanFragment(queryId.genPlanFragmentId(), 
root);
+    return new SubPlan(fragment);
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TableScanNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TableScanNode.java
index 978978b2088..6804c056cac 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TableScanNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TableScanNode.java
@@ -10,6 +10,8 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
 import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
 import org.apache.iotdb.db.relational.sql.tree.Expression;
 
+import com.google.common.collect.ImmutableList;
+
 import javax.annotation.Nullable;
 
 import java.io.DataOutputStream;
@@ -65,7 +67,7 @@ public class TableScanNode extends PlanNode {
 
   @Override
   public List<PlanNode> getChildren() {
-    return null;
+    return ImmutableList.of();
   }
 
   @Override
@@ -123,6 +125,18 @@ public class TableScanNode extends PlanNode {
     return this.qualifiedTableName;
   }
 
+  public List<DeviceEntry> getDeviceEntries() {
+    return this.deviceEntries;
+  }
+
+  public void setDeviceEntries(List<DeviceEntry> deviceEntries) {
+    this.deviceEntries = deviceEntries;
+  }
+
+  public Map<Symbol, Integer> getIdAndAttributeIndexMap() {
+    return this.idAndAttributeIndexMap;
+  }
+
   public Map<Symbol, ColumnSchema> getAssignments() {
     return this.assignments;
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/IndexScan.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/IndexScan.java
index 21a54da4e73..c52131813c9 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/IndexScan.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/IndexScan.java
@@ -15,13 +15,102 @@
 package org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations;
 
 import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
+import org.apache.iotdb.db.queryengine.common.SessionInfo;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
 import org.apache.iotdb.db.queryengine.plan.relational.analyzer.Analysis;
+import org.apache.iotdb.db.queryengine.plan.relational.metadata.DeviceEntry;
+import org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata;
+import 
org.apache.iotdb.db.queryengine.plan.relational.metadata.QualifiedObjectName;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.FilterNode;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableScanNode;
+import org.apache.iotdb.db.relational.sql.tree.Expression;
 
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static 
org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory.ATTRIBUTE;
+
+/** Extract the DeviceEntry list (IDeviceID and attribute values) of each TableScanNode via Metadata#indexScan. */
 public class IndexScan implements RelationalPlanOptimizer {
 
   @Override
-  public PlanNode optimize(PlanNode planNode, Analysis analysis, 
MPPQueryContext context) {
-    return null;
+  public PlanNode optimize(
+      PlanNode planNode,
+      Analysis analysis,
+      Metadata metadata,
+      SessionInfo sessionInfo,
+      MPPQueryContext context) {
+    return planNode.accept(new Rewriter(), new RewriterContext(null, metadata, 
sessionInfo));
+  }
+
+  private static class Rewriter extends PlanVisitor<PlanNode, RewriterContext> 
{
+    @Override
+    public PlanNode visitPlan(PlanNode node, RewriterContext context) {
+      for (PlanNode child : node.getChildren()) {
+        child.accept(this, context);
+      }
+      return node;
+    }
+
+    @Override
+    public PlanNode visitFilter(FilterNode node, RewriterContext context) {
+      context.setPredicate(node.getPredicate());
+      return node;
+    }
+
+    @Override
+    public PlanNode visitTableScan(TableScanNode node, RewriterContext 
context) {
+      List<String> attributeColumns =
+          node.getAssignments().entrySet().stream()
+              .filter(e -> e.getValue().getColumnCategory().equals(ATTRIBUTE))
+              .map(e -> e.getKey().getName())
+              .collect(Collectors.toList());
+      // TODO extract predicate to expression list
+      List<DeviceEntry> deviceEntries =
+          context
+              .getMetadata()
+              .indexScan(
+                  new QualifiedObjectName(
+                      context.getSessionInfo().getDatabaseName().get(),
+                      node.getQualifiedTableName()),
+                  Collections.singletonList(context.getPredicate()),
+                  attributeColumns);
+      node.setDeviceEntries(deviceEntries);
+      return node;
+    }
+  }
+
+  private static class RewriterContext {
+    private Expression predicate;
+    private Metadata metadata;
+    private final SessionInfo sessionInfo;
+
+    RewriterContext(Expression predicate, Metadata metadata, SessionInfo 
sessionInfo) {
+      this.predicate = predicate;
+      this.metadata = metadata;
+      this.sessionInfo = sessionInfo;
+    }
+
+    public Expression getPredicate() {
+      return this.predicate;
+    }
+
+    public void setPredicate(Expression predicate) {
+      this.predicate = predicate;
+    }
+
+    public Metadata getMetadata() {
+      return this.metadata;
+    }
+
+    public void setMetadata(Metadata metadata) {
+      this.metadata = metadata;
+    }
+
+    public SessionInfo getSessionInfo() {
+      return this.sessionInfo;
+    }
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/SimplifyExpressions.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PruneTableScanColumns.java
similarity index 50%
copy from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/SimplifyExpressions.java
copy to 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PruneTableScanColumns.java
index 1e5839fe32a..dc75af69b26 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/SimplifyExpressions.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PruneTableScanColumns.java
@@ -15,26 +15,36 @@
 package org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations;
 
 import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
+import org.apache.iotdb.db.queryengine.common.SessionInfo;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
 import org.apache.iotdb.db.queryengine.plan.relational.analyzer.Analysis;
+import org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.FilterNode;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.ProjectNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableScanNode;
-import org.apache.iotdb.db.relational.sql.tree.Expression;
+import org.apache.iotdb.db.relational.sql.tree.DefaultTraversalVisitor;
+import org.apache.iotdb.db.relational.sql.tree.SymbolReference;
 
-import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.ir.ExtractCommonPredicatesExpressionRewriter.extractCommonPredicates;
-import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.ir.NormalizeOrExpressionRewriter.normalizeOrExpression;
+import com.google.common.collect.ImmutableList;
 
-public class SimplifyExpressions implements RelationalPlanOptimizer {
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
 
+public class PruneTableScanColumns implements RelationalPlanOptimizer {
   @Override
-  public PlanNode optimize(PlanNode planNode, Analysis analysis, 
MPPQueryContext context) {
-    // TODO add query statement pruning
+  public PlanNode optimize(
+      PlanNode planNode,
+      Analysis analysis,
+      Metadata metadata,
+      SessionInfo sessionInfo,
+      MPPQueryContext context) {
     return planNode.accept(new Rewriter(), new RewriterContext());
   }
 
   private static class Rewriter extends PlanVisitor<PlanNode, RewriterContext> 
{
-
     @Override
     public PlanNode visitPlan(PlanNode node, RewriterContext context) {
       for (PlanNode child : node.getChildren()) {
@@ -43,11 +53,20 @@ public class SimplifyExpressions implements 
RelationalPlanOptimizer {
       return node;
     }
 
+    @Override
+    public PlanNode visitProject(ProjectNode node, RewriterContext context) {
+      context.symbolHashSet.addAll(node.getOutputSymbols());
+      node.getChild().accept(this, context);
+      return node;
+    }
+
     @Override
     public PlanNode visitFilter(FilterNode node, RewriterContext context) {
-      Expression predicate = normalizeOrExpression(node.getPredicate());
-      predicate = extractCommonPredicates(predicate);
-      node.setPredicate(predicate);
+      ImmutableList.Builder<Symbol> symbolBuilder = ImmutableList.builder();
+      new SymbolBuilderVisitor().process(node.getPredicate(), 
ImmutableList.builder());
+      List<Symbol> ret = symbolBuilder.build();
+      context.symbolHashSet.addAll(ret);
+      node.getChild().accept(this, context);
       return node;
     }
 
@@ -57,5 +76,17 @@ public class SimplifyExpressions implements 
RelationalPlanOptimizer {
     }
   }
 
-  private static class RewriterContext {}
+  private static class SymbolBuilderVisitor
+      extends DefaultTraversalVisitor<ImmutableList.Builder<Symbol>> {
+    @Override
+    protected Void visitSymbolReference(
+        SymbolReference node, ImmutableList.Builder<Symbol> builder) {
+      builder.add(Symbol.from(node));
+      return null;
+    }
+  }
+
+  private static class RewriterContext {
+    Set<Symbol> symbolHashSet = new HashSet<>();
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/RelationalPlanOptimizer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/RelationalPlanOptimizer.java
index df7cb945167..37611e6a35c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/RelationalPlanOptimizer.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/RelationalPlanOptimizer.java
@@ -15,9 +15,16 @@
 package org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations;
 
 import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
+import org.apache.iotdb.db.queryengine.common.SessionInfo;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.queryengine.plan.relational.analyzer.Analysis;
+import org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata;
 
 public interface RelationalPlanOptimizer {
-  PlanNode optimize(PlanNode planNode, Analysis analysis, MPPQueryContext 
context);
+  PlanNode optimize(
+      PlanNode planNode,
+      Analysis analysis,
+      Metadata metadata,
+      SessionInfo sessionInfo,
+      MPPQueryContext context);
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/RemoveRedundantIdentityProjections.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/RemoveRedundantIdentityProjections.java
index 34da737fd93..67027079a17 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/RemoveRedundantIdentityProjections.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/RemoveRedundantIdentityProjections.java
@@ -15,10 +15,12 @@
 package org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations;
 
 import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
+import org.apache.iotdb.db.queryengine.common.SessionInfo;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SingleChildProcessNode;
 import org.apache.iotdb.db.queryengine.plan.relational.analyzer.Analysis;
+import org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.ProjectNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableScanNode;
 
@@ -28,7 +30,12 @@ import java.util.List;
 public class RemoveRedundantIdentityProjections implements 
RelationalPlanOptimizer {
 
   @Override
-  public PlanNode optimize(PlanNode planNode, Analysis analysis, 
MPPQueryContext context) {
+  public PlanNode optimize(
+      PlanNode planNode,
+      Analysis analysis,
+      Metadata metadata,
+      SessionInfo sessionInfo,
+      MPPQueryContext context) {
     return planNode.accept(new Rewriter(), new RewriterContext());
   }
 
@@ -60,8 +67,9 @@ public class RemoveRedundantIdentityProjections implements 
RelationalPlanOptimiz
             }
           }
         }
-        return projectNode.getChild();
+        return projectNode.getChild().accept(this, context);
       } else {
+        projectNode.getChild().accept(this, context);
         return projectNode;
       }
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/SimplifyExpressions.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/SimplifyExpressions.java
index 1e5839fe32a..782c9705136 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/SimplifyExpressions.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/SimplifyExpressions.java
@@ -15,9 +15,11 @@
 package org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations;
 
 import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
+import org.apache.iotdb.db.queryengine.common.SessionInfo;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
 import org.apache.iotdb.db.queryengine.plan.relational.analyzer.Analysis;
+import org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.FilterNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableScanNode;
 import org.apache.iotdb.db.relational.sql.tree.Expression;
@@ -28,7 +30,12 @@ import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.ir.Normali
 public class SimplifyExpressions implements RelationalPlanOptimizer {
 
   @Override
-  public PlanNode optimize(PlanNode planNode, Analysis analysis, 
MPPQueryContext context) {
+  public PlanNode optimize(
+      PlanNode planNode,
+      Analysis analysis,
+      Metadata metadata,
+      SessionInfo sessionInfo,
+      MPPQueryContext context) {
     // TODO add query statement pruning
     return planNode.accept(new Rewriter(), new RewriterContext());
   }
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/AnalyzerTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/AnalyzerTest.java
index 34eca515305..8e562f742e6 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/AnalyzerTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/AnalyzerTest.java
@@ -34,6 +34,7 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.metadata.QualifiedObjectN
 import org.apache.iotdb.db.queryengine.plan.relational.metadata.TableHandle;
 import org.apache.iotdb.db.queryengine.plan.relational.metadata.TableSchema;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.LogicalPlanner;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.distribute.RelationalDistributionPlanner;
 import org.apache.iotdb.db.queryengine.plan.relational.security.AccessControl;
 import org.apache.iotdb.db.relational.sql.parser.SqlParser;
 import org.apache.iotdb.db.relational.sql.tree.Statement;
@@ -145,8 +146,13 @@ public class AnalyzerTest {
     WarningCollector warningCollector = WarningCollector.NOOP;
     LogicalPlanner logicalPlanner =
         new LogicalPlanner(context, metadata, sessionInfo, warningCollector);
-    LogicalQueryPlan result = logicalPlanner.plan(actualAnalysis);
-    System.out.println(result);
+    LogicalQueryPlan logicalQueryPlan = logicalPlanner.plan(actualAnalysis);
+    System.out.println(logicalQueryPlan);
+
+    RelationalDistributionPlanner distributionPlanner =
+        new RelationalDistributionPlanner(actualAnalysis, logicalQueryPlan, 
context);
+    // DistributedQueryPlan distributedQueryPlan = distributionPlanner.plan();
+    // System.out.println(distributedQueryPlan);
   }
 
   public static Analysis analyzeSQL(String sql, Metadata metadata) {

Reply via email to