This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 4a70fba  [IOTDB-2658] Generate logical plan for query statement (#5356)
4a70fba is described below

commit 4a70fba534e241ce75a0b92d24dee8849694e392
Author: liuminghui233 <[email protected]>
AuthorDate: Mon Mar 28 23:03:08 2022 +0800

    [IOTDB-2658] Generate logical plan for query statement (#5356)
    
    * replace FilterOperator with QueryFilter
    
    * generate LogicalPlan for query statement
    
    * fill NPE
    
    * merge master
    
    * fix compile error
    
    * implement LogicalPlanPrinter
    
    * update printer format
    
    * add tests in LogicalPlannerTest
    
    * resolve conflicts
    
    * remove toString()
    
    * remove set DataRegionReplicaSet in LogicalPlanner
    
    * add TODO
---
 .../apache/iotdb/db/mpp/sql/parser/ASTVisitor.java |  19 +-
 .../db/mpp/sql/planner/LocalExecutionPlanner.java  |   4 +-
 .../iotdb/db/mpp/sql/planner/LogicalPlanner.java   | 177 ++++++++++++++++-
 .../sql/planner/plan/node/PlanNodeIdAllocator.java |   7 +
 .../db/mpp/sql/planner/plan/node/PlanVisitor.java  |   4 +
 .../planner/plan/node/process/DeviceMergeNode.java |  17 +-
 .../sql/planner/plan/node/process/FillNode.java    |  20 +-
 .../sql/planner/plan/node/process/FilterNode.java  |  22 ++-
 .../planner/plan/node/process/FilterNullNode.java  |  29 ++-
 .../plan/node/process/GroupByLevelNode.java        |  21 +-
 .../sql/planner/plan/node/process/LimitNode.java   |  11 ++
 .../sql/planner/plan/node/process/OffsetNode.java  |  11 ++
 .../sql/planner/plan/node/process/SortNode.java    |  15 ++
 .../planner/plan/node/process/TimeJoinNode.java    |  23 ++-
 .../planner/plan/node/source/CsvSourceNode.java    |  10 +
 .../plan/node/source/SeriesAggregateScanNode.java  |  21 ++
 .../planner/plan/node/source/SeriesScanNode.java   |  34 ++++
 .../sql/planner/plan/node/source/SourceNode.java   |  22 +++
 .../statement/component}/FillPolicy.java           |   2 +-
 .../statement/component/FilterNullComponent.java   |   8 +-
 .../statement/component}/FilterNullPolicy.java     |   2 +-
 .../statement/component/GroupByLevelComponent.java |   6 +
 .../component/GroupByLevelController.java          |  10 +-
 .../mpp/sql/statement/component/ResultColumn.java  |   3 +-
 .../sql/statement/component/SelectComponent.java   |   4 +-
 .../mpp/sql/statement/crud/UDAFQueryStatement.java |   2 +-
 .../iotdb/db/qp/logical/crud/SelectComponent.java  |   4 +-
 .../db/qp/logical/crud/UDAFQueryOperator.java      |   4 +-
 .../iotdb/db/qp/utils/GroupByLevelController.java  |   4 +-
 .../iotdb/db/query/expression/Expression.java      |   5 +-
 .../query/expression/binary/BinaryExpression.java  |  10 +-
 .../db/query/expression/unary/ConstantOperand.java |   6 +
 .../query/expression/unary/FunctionExpression.java |  30 ++-
 .../query/expression/unary/LogicNotExpression.java |   8 +-
 .../query/expression/unary/NegationExpression.java |   8 +-
 .../query/expression/unary/TimeSeriesOperand.java  |   8 +
 .../db/mpp/sql/plan/DistributionPlannerTest.java   |   2 +-
 .../iotdb/db/mpp/sql/plan/LogicalPlanPrinter.java  | 219 +++++++++++++++++++++
 .../iotdb/db/mpp/sql/plan/LogicalPlannerTest.java  |  79 +++++++-
 39 files changed, 815 insertions(+), 76 deletions(-)

diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/sql/parser/ASTVisitor.java 
b/server/src/main/java/org/apache/iotdb/db/mpp/sql/parser/ASTVisitor.java
index 6985a3e..d291dc8 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/parser/ASTVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/parser/ASTVisitor.java
@@ -34,16 +34,7 @@ import org.apache.iotdb.db.mpp.common.filter.QueryFilter;
 import org.apache.iotdb.db.mpp.common.filter.RegexpFilter;
 import org.apache.iotdb.db.mpp.sql.constant.FilterConstant;
 import org.apache.iotdb.db.mpp.sql.statement.Statement;
-import org.apache.iotdb.db.mpp.sql.statement.component.FillComponent;
-import org.apache.iotdb.db.mpp.sql.statement.component.FilterNullComponent;
-import org.apache.iotdb.db.mpp.sql.statement.component.FromComponent;
-import org.apache.iotdb.db.mpp.sql.statement.component.GroupByLevelComponent;
-import org.apache.iotdb.db.mpp.sql.statement.component.GroupByTimeComponent;
-import org.apache.iotdb.db.mpp.sql.statement.component.OrderBy;
-import org.apache.iotdb.db.mpp.sql.statement.component.ResultColumn;
-import org.apache.iotdb.db.mpp.sql.statement.component.ResultSetFormat;
-import org.apache.iotdb.db.mpp.sql.statement.component.SelectComponent;
-import org.apache.iotdb.db.mpp.sql.statement.component.WhereCondition;
+import org.apache.iotdb.db.mpp.sql.statement.component.*;
 import org.apache.iotdb.db.mpp.sql.statement.crud.AggregationQueryStatement;
 import org.apache.iotdb.db.mpp.sql.statement.crud.FillQueryStatement;
 import org.apache.iotdb.db.mpp.sql.statement.crud.GroupByFillQueryStatement;
@@ -277,6 +268,8 @@ public class ASTVisitor extends 
IoTDBSqlParserBaseVisitor<Statement> {
     }
   }
 
+  // Alter Timeseries 
========================================================================
+
   @Override
   public Statement visitAlterTimeseries(IoTDBSqlParser.AlterTimeseriesContext 
ctx) {
     AlterTimeSeriesStatement alterTimeSeriesStatement = new 
AlterTimeSeriesStatement();
@@ -1089,11 +1082,11 @@ public class ASTVisitor extends 
IoTDBSqlParserBaseVisitor<Statement> {
 
     // set without null policy
     if (ctx.ANY() != null) {
-      
filterNullComponent.setWithoutPolicyType(FilterNullComponent.FilterNullPolicy.CONTAINS_NULL);
+      filterNullComponent.setWithoutPolicyType(FilterNullPolicy.CONTAINS_NULL);
     } else if (ctx.ALL() != null) {
-      
filterNullComponent.setWithoutPolicyType(FilterNullComponent.FilterNullPolicy.ALL_NULL);
+      filterNullComponent.setWithoutPolicyType(FilterNullPolicy.ALL_NULL);
     } else {
-      
filterNullComponent.setWithoutPolicyType(FilterNullComponent.FilterNullPolicy.NULL);
+      filterNullComponent.setWithoutPolicyType(FilterNullPolicy.NO_FILTER);
     }
 
     queryStatement.setFilterNullComponent(filterNullComponent);
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java
index 2c14ff1..c25ce55 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LocalExecutionPlanner.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.mpp.sql.planner;
 
+import org.apache.iotdb.db.mpp.common.filter.QueryFilter;
 import org.apache.iotdb.db.mpp.execution.FragmentInstanceContext;
 import org.apache.iotdb.db.mpp.operator.Operator;
 import org.apache.iotdb.db.mpp.operator.process.LimitOperator;
@@ -35,7 +36,6 @@ import 
org.apache.iotdb.db.mpp.sql.planner.plan.node.process.SortNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.TimeJoinNode;
 import 
org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SeriesAggregateScanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SeriesScanNode;
-import org.apache.iotdb.db.qp.logical.crud.FilterOperator;
 
 import java.util.List;
 
@@ -79,7 +79,7 @@ public class LocalExecutionPlanner {
     public Operator visitFilter(FilterNode node, LocalExecutionPlanContext 
context) {
       PlanNode child = node.getChild();
 
-      FilterOperator filterExpression = node.getPredicate();
+      QueryFilter filterExpression = node.getPredicate();
       List<String> outputSymbols = node.getOutputColumnNames();
       return super.visitFilter(node, context);
     }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LogicalPlanner.java 
b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LogicalPlanner.java
index e1f5d1d..379a90f 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LogicalPlanner.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/LogicalPlanner.java
@@ -19,6 +19,7 @@
 package org.apache.iotdb.db.mpp.sql.planner;
 
 import org.apache.iotdb.db.mpp.common.MPPQueryContext;
+import org.apache.iotdb.db.mpp.common.filter.QueryFilter;
 import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
 import org.apache.iotdb.db.mpp.sql.optimization.PlanOptimizer;
 import org.apache.iotdb.db.mpp.sql.planner.plan.LogicalQueryPlan;
@@ -27,15 +28,22 @@ import 
org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeIdAllocator;
 import 
org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.write.AlterTimeSeriesNode;
 import 
org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.write.CreateAlignedTimeSeriesNode;
 import 
org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.write.CreateTimeSeriesNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.*;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SourceNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.write.InsertTabletNode;
 import org.apache.iotdb.db.mpp.sql.statement.StatementVisitor;
+import org.apache.iotdb.db.mpp.sql.statement.component.*;
+import org.apache.iotdb.db.mpp.sql.statement.crud.AggregationQueryStatement;
+import org.apache.iotdb.db.mpp.sql.statement.crud.FillQueryStatement;
 import org.apache.iotdb.db.mpp.sql.statement.crud.InsertTabletStatement;
 import org.apache.iotdb.db.mpp.sql.statement.crud.QueryStatement;
 import org.apache.iotdb.db.mpp.sql.statement.metadata.AlterTimeSeriesStatement;
 import 
org.apache.iotdb.db.mpp.sql.statement.metadata.CreateAlignedTimeSeriesStatement;
 import 
org.apache.iotdb.db.mpp.sql.statement.metadata.CreateTimeSeriesStatement;
+import org.apache.iotdb.db.query.expression.Expression;
 
-import java.util.List;
+import java.util.*;
+import java.util.stream.Collectors;
 
 /** Generate a logical plan for the statement. */
 public class LogicalPlanner {
@@ -65,8 +73,7 @@ public class LogicalPlanner {
    * This visitor is used to generate a logical plan for the statement and 
returns the {@link
    * PlanNode}.
    */
-  private static final class LogicalPlanVisitor
-      extends StatementVisitor<PlanNode, MPPQueryContext> {
+  private class LogicalPlanVisitor extends StatementVisitor<PlanNode, 
MPPQueryContext> {
 
     private final Analysis analysis;
 
@@ -76,8 +83,151 @@ public class LogicalPlanner {
 
     @Override
     public PlanNode visitQuery(QueryStatement queryStatement, MPPQueryContext 
context) {
-      // TODO: Generate logical planNode tree for query statement
-      return null;
+      PlanBuilder planBuilder = planSelectComponent(queryStatement);
+
+      if (queryStatement.getWhereCondition() != null) {
+        planBuilder =
+            planQueryFilter(planBuilder, 
queryStatement.getWhereCondition().getQueryFilter());
+      }
+
+      if (queryStatement.isGroupByLevel()) {
+        planBuilder =
+            planGroupByLevel(
+                planBuilder,
+                ((AggregationQueryStatement) 
queryStatement).getGroupByLevelComponent());
+      }
+
+      if (queryStatement instanceof FillQueryStatement) {
+        planBuilder =
+            planFill(planBuilder, ((FillQueryStatement) 
queryStatement).getFillComponent());
+      }
+
+      planBuilder = planFilterNull(planBuilder, 
queryStatement.getFilterNullComponent());
+      planBuilder = planSort(planBuilder, queryStatement.getResultOrder());
+      planBuilder = planLimit(planBuilder, queryStatement.getRowLimit());
+      planBuilder = planOffset(planBuilder, queryStatement.getRowOffset());
+      return planBuilder.getRoot();
+    }
+
+    private PlanBuilder planSelectComponent(QueryStatement queryStatement) {
+      // TODO: generate SourceNode for QueryFilter
+      Map<String, Set<SourceNode>> deviceNameToSourceNodesMap = new 
HashMap<>();
+
+      for (ResultColumn resultColumn : 
queryStatement.getSelectComponent().getResultColumns()) {
+        Set<SourceNode> sourceNodes = planResultColumn(resultColumn);
+        for (SourceNode sourceNode : sourceNodes) {
+          String deviceName = sourceNode.getDeviceName();
+          deviceNameToSourceNodesMap
+              .computeIfAbsent(deviceName, k -> new HashSet<>())
+              .add(sourceNode);
+        }
+      }
+
+      if (queryStatement.isAlignByDevice()) {
+        DeviceMergeNode deviceMergeNode = new 
DeviceMergeNode(PlanNodeIdAllocator.generateId());
+        for (Map.Entry<String, Set<SourceNode>> entry : 
deviceNameToSourceNodesMap.entrySet()) {
+          String deviceName = entry.getKey();
+          List<PlanNode> planNodes = new ArrayList<>(entry.getValue());
+          if (planNodes.size() == 1) {
+            deviceMergeNode.addChildDeviceNode(deviceName, planNodes.get(0));
+          } else {
+            TimeJoinNode timeJoinNode =
+                new TimeJoinNode(
+                    PlanNodeIdAllocator.generateId(),
+                    queryStatement.getResultOrder(),
+                    null,
+                    planNodes);
+            deviceMergeNode.addChildDeviceNode(deviceName, timeJoinNode);
+          }
+        }
+        return new PlanBuilder(deviceMergeNode);
+      }
+
+      List<PlanNode> planNodes =
+          deviceNameToSourceNodesMap.entrySet().stream()
+              .flatMap(entry -> entry.getValue().stream())
+              .collect(Collectors.toList());
+      TimeJoinNode timeJoinNode =
+          new TimeJoinNode(
+              PlanNodeIdAllocator.generateId(), 
queryStatement.getResultOrder(), null, planNodes);
+      return new PlanBuilder(timeJoinNode);
+    }
+
+    private Set<SourceNode> planResultColumn(ResultColumn resultColumn) {
+      Set<SourceNode> resultSourceNodeSet = new HashSet<>();
+      resultColumn.getExpression().collectPlanNode(resultSourceNodeSet);
+      return resultSourceNodeSet;
+    }
+
+    private PlanBuilder planQueryFilter(PlanBuilder planBuilder, QueryFilter 
queryFilter) {
+      if (queryFilter == null) {
+        return planBuilder;
+      }
+
+      return planBuilder.withNewRoot(
+          new FilterNode(PlanNodeIdAllocator.generateId(), 
planBuilder.getRoot(), queryFilter));
+    }
+
+    private PlanBuilder planGroupByLevel(
+        PlanBuilder planBuilder, GroupByLevelComponent groupByLevelComponent) {
+      if (groupByLevelComponent == null) {
+        return planBuilder;
+      }
+
+      return planBuilder.withNewRoot(
+          new GroupByLevelNode(
+              PlanNodeIdAllocator.generateId(),
+              planBuilder.getRoot(),
+              groupByLevelComponent.getLevels(),
+              groupByLevelComponent.getGroupedPathMap()));
+    }
+
+    private PlanBuilder planFill(PlanBuilder planBuilder, FillComponent 
fillComponent) {
+      // TODO: support Fill
+      return planBuilder;
+    }
+
+    private PlanBuilder planFilterNull(
+        PlanBuilder planBuilder, FilterNullComponent filterNullComponent) {
+      if (filterNullComponent == null) {
+        return planBuilder;
+      }
+
+      return planBuilder.withNewRoot(
+          new FilterNullNode(
+              PlanNodeIdAllocator.generateId(),
+              planBuilder.getRoot(),
+              filterNullComponent.getWithoutPolicyType(),
+              filterNullComponent.getWithoutNullColumns().stream()
+                  .map(Expression::getExpressionString)
+                  .collect(Collectors.toList())));
+    }
+
+    private PlanBuilder planSort(PlanBuilder planBuilder, OrderBy resultOrder) 
{
+      if (resultOrder == null || resultOrder == OrderBy.TIMESTAMP_ASC) {
+        return planBuilder;
+      }
+
+      return planBuilder.withNewRoot(
+          new SortNode(PlanNodeIdAllocator.generateId(), 
planBuilder.getRoot(), null, resultOrder));
+    }
+
+    private PlanBuilder planLimit(PlanBuilder planBuilder, int rowLimit) {
+      if (rowLimit == 0) {
+        return planBuilder;
+      }
+
+      return planBuilder.withNewRoot(
+          new LimitNode(PlanNodeIdAllocator.generateId(), rowLimit, 
planBuilder.getRoot()));
+    }
+
+    private PlanBuilder planOffset(PlanBuilder planBuilder, int rowOffset) {
+      if (rowOffset == 0) {
+        return planBuilder;
+      }
+
+      return planBuilder.withNewRoot(
+          new OffsetNode(PlanNodeIdAllocator.generateId(), 
planBuilder.getRoot(), rowOffset));
     }
 
     @Override
@@ -134,4 +284,21 @@ public class LogicalPlanner {
       return node;
     }
   }
+
+  private class PlanBuilder {
+
+    private PlanNode root;
+
+    public PlanBuilder(PlanNode root) {
+      this.root = root;
+    }
+
+    public PlanNode getRoot() {
+      return root;
+    }
+
+    public PlanBuilder withNewRoot(PlanNode newRoot) {
+      return new PlanBuilder(newRoot);
+    }
+  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeIdAllocator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeIdAllocator.java
index 52b925c..8176239 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeIdAllocator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanNodeIdAllocator.java
@@ -19,6 +19,8 @@
 
 package org.apache.iotdb.db.mpp.sql.planner.plan.node;
 
+import org.apache.iotdb.commons.utils.TestOnly;
+
 public class PlanNodeIdAllocator {
   public static int initialId = 0;
 
@@ -26,4 +28,9 @@ public class PlanNodeIdAllocator {
     initialId++;
     return new PlanNodeId(String.valueOf(initialId));
   }
+
+  @TestOnly
+  public static synchronized void reset() {
+    initialId = 0;
+  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanVisitor.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanVisitor.java
index 58f88af..916f516 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanVisitor.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanVisitor.java
@@ -34,6 +34,10 @@ import 
org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SeriesScanNode;
 
 public abstract class PlanVisitor<R, C> {
 
+  public R process(PlanNode node, C context) {
+    return node.accept(this, context);
+  }
+
   public abstract R visitPlan(PlanNode node, C context);
 
   public R visitSeriesScan(SeriesScanNode node, C context) {
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/DeviceMergeNode.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/DeviceMergeNode.java
index 96dc317..5d98d33 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/DeviceMergeNode.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/DeviceMergeNode.java
@@ -18,13 +18,16 @@
  */
 package org.apache.iotdb.db.mpp.sql.planner.plan.node.process;
 
-import org.apache.iotdb.db.mpp.common.FilterNullPolicy;
+import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
+import org.apache.iotdb.db.mpp.sql.statement.component.FilterNullPolicy;
 import org.apache.iotdb.db.mpp.sql.statement.component.OrderBy;
+import org.apache.iotdb.tsfile.utils.Pair;
 
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
@@ -82,6 +85,10 @@ public class DeviceMergeNode extends ProcessNode {
     return columnNames;
   }
 
+  public OrderBy getMergeOrder() {
+    return mergeOrder;
+  }
+
   @Override
   public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
     return visitor.visitDeviceMerge(this, context);
@@ -104,4 +111,12 @@ public class DeviceMergeNode extends ProcessNode {
     this.childDeviceNodeMap.put(deviceName, childNode);
     this.children.add(childNode);
   }
+
+  @TestOnly
+  public Pair<String, List<String>> print() {
+    String title = String.format("[DeviceMergeNode (%s)]", this.getId());
+    List<String> attributes = new ArrayList<>();
+    attributes.add("MergeOrder: " + (this.getMergeOrder() == null ? "null" : 
this.getMergeOrder()));
+    return new Pair<>(title, attributes);
+  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FillNode.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FillNode.java
index 99a2718..f79a1c0 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FillNode.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FillNode.java
@@ -18,14 +18,17 @@
  */
 package org.apache.iotdb.db.mpp.sql.planner.plan.node.process;
 
-import org.apache.iotdb.db.mpp.common.FillPolicy;
+import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
+import org.apache.iotdb.db.mpp.sql.statement.component.FillPolicy;
+import org.apache.iotdb.tsfile.utils.Pair;
 
 import com.google.common.collect.ImmutableList;
 
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.List;
 
 /** FillNode is used to fill the empty field in one row. */
@@ -63,6 +66,10 @@ public class FillNode extends ProcessNode {
     return child.getOutputColumnNames();
   }
 
+  public FillPolicy getFillPolicy() {
+    return fillPolicy;
+  }
+
   @Override
   public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
     return visitor.visitFill(this, context);
@@ -75,8 +82,17 @@ public class FillNode extends ProcessNode {
   @Override
   public void serialize(ByteBuffer byteBuffer) {}
 
-  public FillNode(PlanNodeId id, FillPolicy fillPolicy) {
+  public FillNode(PlanNodeId id, PlanNode child, FillPolicy fillPolicy) {
     this(id);
+    this.child = child;
     this.fillPolicy = fillPolicy;
   }
+
+  @TestOnly
+  public Pair<String, List<String>> print() {
+    String title = String.format("[FillNode (%s)]", this.getId());
+    List<String> attributes = new ArrayList<>();
+    attributes.add("FillPolicy: " + this.getFillPolicy());
+    return new Pair<>(title, attributes);
+  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FilterNode.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FilterNode.java
index 123948e..46a26cc 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FilterNode.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FilterNode.java
@@ -18,25 +18,27 @@
  */
 package org.apache.iotdb.db.mpp.sql.planner.plan.node.process;
 
+import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.db.mpp.common.filter.QueryFilter;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
-import org.apache.iotdb.db.qp.logical.crud.FilterOperator;
+import org.apache.iotdb.tsfile.utils.Pair;
 
 import com.google.common.collect.ImmutableList;
 
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.List;
 
 /** The FilterNode is responsible to filter the RowRecord from TsBlock. */
 public class FilterNode extends ProcessNode {
 
   private final PlanNode child;
-  // TODO we need to rename it to something like expression in order to 
distinguish from Operator
-  // class
-  private final FilterOperator predicate;
 
-  public FilterNode(PlanNodeId id, PlanNode child, FilterOperator predicate) {
+  private final QueryFilter predicate;
+
+  public FilterNode(PlanNodeId id, PlanNode child, QueryFilter predicate) {
     super(id);
     this.child = child;
     this.predicate = predicate;
@@ -77,11 +79,19 @@ public class FilterNode extends ProcessNode {
   @Override
   public void serialize(ByteBuffer byteBuffer) {}
 
-  public FilterOperator getPredicate() {
+  public QueryFilter getPredicate() {
     return predicate;
   }
 
   public PlanNode getChild() {
     return child;
   }
+
+  @TestOnly
+  public Pair<String, List<String>> print() {
+    String title = String.format("[FilterNode (%s)]", this.getId());
+    List<String> attributes = new ArrayList<>();
+    attributes.add("QueryFilter: " + this.getPredicate());
+    return new Pair<>(title, attributes);
+  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FilterNullNode.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FilterNullNode.java
index c51cecb..661d735 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FilterNullNode.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/FilterNullNode.java
@@ -18,14 +18,17 @@
  */
 package org.apache.iotdb.db.mpp.sql.planner.plan.node.process;
 
-import org.apache.iotdb.db.mpp.common.FilterNullPolicy;
+import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
+import org.apache.iotdb.db.mpp.sql.statement.component.FilterNullPolicy;
+import org.apache.iotdb.tsfile.utils.Pair;
 
 import com.google.common.collect.ImmutableList;
 
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.List;
 
 /** WithoutNode is used to discard specific rows from upstream node. */
@@ -43,9 +46,14 @@ public class FilterNullNode extends ProcessNode {
     this.child = child;
   }
 
-  public FilterNullNode(PlanNodeId id, PlanNode child, List<String> 
filterNullColumnNames) {
+  public FilterNullNode(
+      PlanNodeId id,
+      PlanNode child,
+      FilterNullPolicy discardPolicy,
+      List<String> filterNullColumnNames) {
     super(id);
     this.child = child;
+    this.discardPolicy = discardPolicy;
     this.filterNullColumnNames = filterNullColumnNames;
   }
 
@@ -72,6 +80,14 @@ public class FilterNullNode extends ProcessNode {
     return child.getOutputColumnNames();
   }
 
+  public FilterNullPolicy getDiscardPolicy() {
+    return discardPolicy;
+  }
+
+  public List<String> getFilterNullColumnNames() {
+    return filterNullColumnNames;
+  }
+
   @Override
   public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
     return visitor.visitFilterNull(this, context);
@@ -87,4 +103,13 @@ public class FilterNullNode extends ProcessNode {
   public void setFilterNullColumnNames(List<String> filterNullColumnNames) {
     this.filterNullColumnNames = filterNullColumnNames;
   }
+
+  @TestOnly
+  public Pair<String, List<String>> print() {
+    String title = String.format("[FilterNullNode (%s)]", this.getId());
+    List<String> attributes = new ArrayList<>();
+    attributes.add("FilterNullPolicy: " + this.getDiscardPolicy());
+    attributes.add("FilterNullColumnNames: " + 
this.getFilterNullColumnNames());
+    return new Pair<>(title, attributes);
+  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/GroupByLevelNode.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/GroupByLevelNode.java
index a8a3629..ed49bb1 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/GroupByLevelNode.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/GroupByLevelNode.java
@@ -18,12 +18,17 @@
  */
 package org.apache.iotdb.db.mpp.sql.planner.plan.node.process;
 
+import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
+import org.apache.iotdb.tsfile.utils.Pair;
 
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
 
 /**
  * This node is responsible for the final aggregation merge operation. It will 
process the data from
@@ -43,12 +48,15 @@ public class GroupByLevelNode extends ProcessNode {
 
   private List<String> columnNames;
 
+  private Map<String, String> groupedPathMap;
+
   public GroupByLevelNode(
-      PlanNodeId id, PlanNode child, int[] groupByLevels, List<String> 
columnNames) {
+      PlanNodeId id, PlanNode child, int[] groupByLevels, Map<String, String> 
groupedPathMap) {
     super(id);
     this.child = child;
     this.groupByLevels = groupByLevels;
-    this.columnNames = columnNames;
+    this.groupedPathMap = groupedPathMap;
+    this.columnNames = new ArrayList<>(groupedPathMap.values());
   }
 
   @Override
@@ -101,4 +109,13 @@ public class GroupByLevelNode extends ProcessNode {
   public void setColumnNames(List<String> columnNames) {
     this.columnNames = columnNames;
   }
+
+  @TestOnly
+  public Pair<String, List<String>> print() {
+    String title = String.format("[GroupByLevelNode (%s)]", this.getId());
+    List<String> attributes = new ArrayList<>();
+    attributes.add("GroupByLevels: " + 
Arrays.toString(this.getGroupByLevels()));
+    attributes.add("ColumnNames: " + this.getOutputColumnNames());
+    return new Pair<>(title, attributes);
+  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/LimitNode.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/LimitNode.java
index 0771572..2b15a0f 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/LimitNode.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/LimitNode.java
@@ -18,14 +18,17 @@
  */
 package org.apache.iotdb.db.mpp.sql.planner.plan.node.process;
 
+import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeIdAllocator;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
+import org.apache.iotdb.tsfile.utils.Pair;
 
 import com.google.common.collect.ImmutableList;
 
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.List;
 
 /** LimitNode is used to select top n result. It uses the default order of 
upstream nodes */
@@ -97,4 +100,12 @@ public class LimitNode extends ProcessNode {
   public String toString() {
     return "LimitNode-" + this.getId();
   }
+
+  @TestOnly
+  public Pair<String, List<String>> print() {
+    String title = String.format("[LimitNode (%s)]", this.getId());
+    List<String> attributes = new ArrayList<>();
+    attributes.add("RowLimit: " + this.getLimit());
+    return new Pair<>(title, attributes);
+  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/OffsetNode.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/OffsetNode.java
index 734c0d9..9f9fd1f 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/OffsetNode.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/OffsetNode.java
@@ -18,11 +18,14 @@
  */
 package org.apache.iotdb.db.mpp.sql.planner.plan.node.process;
 
+import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
+import org.apache.iotdb.tsfile.utils.Pair;
 
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.List;
 
 /**
@@ -83,4 +86,12 @@ public class OffsetNode extends ProcessNode {
   public int getOffset() {
     return offset;
   }
+
+  @TestOnly
+  public Pair<String, List<String>> print() {
+    String title = String.format("[OffsetNode (%s)]", this.getId());
+    List<String> attributes = new ArrayList<>();
+    attributes.add("RowOffset: " + this.getOffset());
+    return new Pair<>(title, attributes);
+  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/SortNode.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/SortNode.java
index c3073d4..25a98da 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/SortNode.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/SortNode.java
@@ -18,14 +18,17 @@
  */
 package org.apache.iotdb.db.mpp.sql.planner.plan.node.process;
 
+import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
 import org.apache.iotdb.db.mpp.sql.statement.component.OrderBy;
+import org.apache.iotdb.tsfile.utils.Pair;
 
 import com.google.common.collect.ImmutableList;
 
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.List;
 
 /**
@@ -70,6 +73,10 @@ public class SortNode extends ProcessNode {
     return child.getOutputColumnNames();
   }
 
+  public OrderBy getSortOrder() {
+    return sortOrder;
+  }
+
   @Override
   public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
     return visitor.visitSort(this, context);
@@ -81,4 +88,12 @@ public class SortNode extends ProcessNode {
 
   @Override
   public void serialize(ByteBuffer byteBuffer) {}
+
+  @TestOnly
+  public Pair<String, List<String>> print() {
+    String title = String.format("[SortNode (%s)]", this.getId());
+    List<String> attributes = new ArrayList<>();
+    attributes.add("SortOrder: " + (this.getSortOrder() == null ? "null" : 
this.getSortOrder()));
+    return new Pair<>(title, attributes);
+  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/TimeJoinNode.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/TimeJoinNode.java
index a0ec54e..cb40125 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/TimeJoinNode.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/process/TimeJoinNode.java
@@ -18,12 +18,14 @@
  */
 package org.apache.iotdb.db.mpp.sql.planner.plan.node.process;
 
-import org.apache.iotdb.db.mpp.common.FilterNullPolicy;
+import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeIdAllocator;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
+import org.apache.iotdb.db.mpp.sql.statement.component.FilterNullPolicy;
 import org.apache.iotdb.db.mpp.sql.statement.component.OrderBy;
+import org.apache.iotdb.tsfile.utils.Pair;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -113,6 +115,14 @@ public class TimeJoinNode extends ProcessNode {
     this.children = children;
   }
 
+  public OrderBy getMergeOrder() {
+    return mergeOrder;
+  }
+
+  public FilterNullPolicy getFilterNullPolicy() {
+    return filterNullPolicy;
+  }
+
   public void setMergeOrder(OrderBy mergeOrder) {
     this.mergeOrder = mergeOrder;
   }
@@ -124,4 +134,15 @@ public class TimeJoinNode extends ProcessNode {
   public String toString() {
     return "TimeJoinNode-" + this.getId();
   }
+
+  @TestOnly
+  public Pair<String, List<String>> print() {
+    String title = String.format("[TimeJoinNode (%s)]", this.getId());
+    List<String> attributes = new ArrayList<>();
+    attributes.add("MergeOrder: " + (this.getMergeOrder() == null ? "null" : 
this.getMergeOrder()));
+    attributes.add(
+        "FilterNullPolicy: "
+            + (this.getFilterNullPolicy() == null ? "null" : 
this.getFilterNullPolicy()));
+    return new Pair<>(title, attributes);
+  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/CsvSourceNode.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/CsvSourceNode.java
index e28c4d1..17d8be3 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/CsvSourceNode.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/CsvSourceNode.java
@@ -77,4 +77,14 @@ public class CsvSourceNode extends SourceNode {
   public void setDataRegionReplicaSet(DataRegionReplicaSet 
dataRegionReplicaSet) {
     throw new UnsupportedOperationException();
   }
+
+  @Override
+  public String getDeviceName() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  protected String getExpressionString() {
+    throw new UnsupportedOperationException();
+  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesAggregateScanNode.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesAggregateScanNode.java
index 0a45487d..e9414ef 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesAggregateScanNode.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesAggregateScanNode.java
@@ -19,16 +19,19 @@
 package org.apache.iotdb.db.mpp.sql.planner.plan.node.source;
 
 import org.apache.iotdb.commons.partition.DataRegionReplicaSet;
+import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.mpp.common.GroupByTimeParameter;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
 import org.apache.iotdb.db.query.expression.unary.FunctionExpression;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.utils.Pair;
 
 import com.google.common.collect.ImmutableList;
 
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.List;
 
 /**
@@ -116,6 +119,16 @@ public class SeriesAggregateScanNode extends SourceNode {
   }
 
   @Override
+  public String getDeviceName() {
+    return aggregateFunc.getPaths().get(0).getDevice();
+  }
+
+  @Override
+  protected String getExpressionString() {
+    return aggregateFunc.getExpressionString();
+  }
+
+  @Override
   public void close() throws Exception {}
 
   @Override
@@ -136,4 +149,12 @@ public class SeriesAggregateScanNode extends SourceNode {
   public void setFilter(Filter filter) {
     this.filter = filter;
   }
+
+  @TestOnly
+  public Pair<String, List<String>> print() {
+    String title = String.format("[SeriesAggregateScanNode (%s)]", 
this.getId());
+    List<String> attributes = new ArrayList<>();
+    attributes.add("AggregateFunction: " + this.getExpressionString());
+    return new Pair<>(title, attributes);
+  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesScanNode.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesScanNode.java
index cd33b51..d4d54e6 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesScanNode.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SeriesScanNode.java
@@ -19,16 +19,19 @@
 package org.apache.iotdb.db.mpp.sql.planner.plan.node.source;
 
 import org.apache.iotdb.commons.partition.DataRegionReplicaSet;
+import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
 import org.apache.iotdb.db.mpp.sql.statement.component.OrderBy;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.utils.Pair;
 
 import com.google.common.collect.ImmutableList;
 
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.List;
 
 /**
@@ -99,6 +102,28 @@ public class SeriesScanNode extends SourceNode {
     this.dataRegionReplicaSet = dataRegion;
   }
 
+  @Override
+  public String getDeviceName() {
+    return seriesPath.getDevice();
+  }
+
+  @Override
+  protected String getExpressionString() {
+    return seriesPath.getFullPath();
+  }
+
+  public OrderBy getScanOrder() {
+    return scanOrder;
+  }
+
+  public int getLimit() {
+    return limit;
+  }
+
+  public int getOffset() {
+    return offset;
+  }
+
   public void setScanOrder(OrderBy scanOrder) {
     this.scanOrder = scanOrder;
   }
@@ -159,4 +184,13 @@ public class SeriesScanNode extends SourceNode {
         "SeriesScanNode-%s:[SeriesPath: %s, DataRegion: %s]",
         this.getId(), this.getSeriesPath(), this.getDataRegionReplicaSet());
   }
+
+  @TestOnly
+  public Pair<String, List<String>> print() {
+    String title = String.format("[SeriesScanNode (%s)]", this.getId());
+    List<String> attributes = new ArrayList<>();
+    attributes.add("SeriesPath: " + this.getSeriesPath());
+    attributes.add("scanOrder: " + this.getScanOrder());
+    return new Pair<>(title, attributes);
+  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SourceNode.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SourceNode.java
index 67c9e2b..758c087 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SourceNode.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/source/SourceNode.java
@@ -33,4 +33,26 @@ public abstract class SourceNode extends PlanNode implements 
AutoCloseable {
   public abstract DataRegionReplicaSet getDataRegionReplicaSet();
 
   public abstract void setDataRegionReplicaSet(DataRegionReplicaSet 
dataRegionReplicaSet);
+
+  public abstract String getDeviceName();
+
+  protected abstract String getExpressionString();
+
+  @Override
+  public final int hashCode() {
+    return getExpressionString().hashCode();
+  }
+
+  @Override
+  public final boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    // Only another SourceNode can be equal; instanceof also rejects null.
+    if (!(o instanceof SourceNode)) {
+      return false;
+    }
+    // Compare the same key that hashCode() hashes, not the other node's toString().
+    return getExpressionString().equals(((SourceNode) o).getExpressionString());
+  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/common/FillPolicy.java 
b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/component/FillPolicy.java
similarity index 93%
rename from server/src/main/java/org/apache/iotdb/db/mpp/common/FillPolicy.java
rename to 
server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/component/FillPolicy.java
index 1b753d5..6be0347 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/FillPolicy.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/component/FillPolicy.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.common;
+package org.apache.iotdb.db.mpp.sql.statement.component;
 
 public enum FillPolicy {
   PREVIOUS,
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/component/FilterNullComponent.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/component/FilterNullComponent.java
index 566c859..30ffb95 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/component/FilterNullComponent.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/component/FilterNullComponent.java
@@ -28,7 +28,7 @@ import java.util.List;
 /** This class maintains information of {@code WITHOUT NULL} clause. */
 public class FilterNullComponent extends StatementNode {
 
-  FilterNullPolicy filterNullPolicy = FilterNullPolicy.NULL;
+  FilterNullPolicy filterNullPolicy = FilterNullPolicy.NO_FILTER;
 
   List<Expression> withoutNullColumns = new ArrayList<>();
 
@@ -51,10 +51,4 @@ public class FilterNullComponent extends StatementNode {
   public void setWithoutNullColumns(List<Expression> withoutNullColumns) {
     this.withoutNullColumns = withoutNullColumns;
   }
-
-  public enum FilterNullPolicy {
-    NULL,
-    CONTAINS_NULL,
-    ALL_NULL
-  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/common/FilterNullPolicy.java 
b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/component/FilterNullPolicy.java
similarity index 93%
rename from 
server/src/main/java/org/apache/iotdb/db/mpp/common/FilterNullPolicy.java
rename to 
server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/component/FilterNullPolicy.java
index 9de26fa..1bdddb9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/FilterNullPolicy.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/component/FilterNullPolicy.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.mpp.common;
+package org.apache.iotdb.db.mpp.sql.statement.component;
 
 public enum FilterNullPolicy {
   NO_FILTER,
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/component/GroupByLevelComponent.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/component/GroupByLevelComponent.java
index 52e31db..5b91a7b 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/component/GroupByLevelComponent.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/component/GroupByLevelComponent.java
@@ -21,6 +21,8 @@ package org.apache.iotdb.db.mpp.sql.statement.component;
 
 import org.apache.iotdb.db.mpp.sql.statement.StatementNode;
 
+import java.util.Map;
+
 /** This class maintains information of {@code GROUP BY LEVEL} clause. */
 public class GroupByLevelComponent extends StatementNode {
 
@@ -42,4 +44,8 @@ public class GroupByLevelComponent extends StatementNode {
   public GroupByLevelController getGroupByLevelController() {
     return groupByLevelController;
   }
+
+  public Map<String, String> getGroupedPathMap() {
+    return groupByLevelController.getGroupedPathMap();
+  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/component/GroupByLevelController.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/component/GroupByLevelController.java
index 99d3eab..f00b67d 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/component/GroupByLevelController.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/component/GroupByLevelController.java
@@ -48,7 +48,7 @@ public class GroupByLevelController {
   private final int[] levels;
   int prevSize = 0;
   /** count(root.sg.d1.s1) with level = 1 -> count(root.*.d1.s1) */
-  private Map<String, String> groupedPathMap;
+  private final Map<String, String> groupedPathMap;
   /** count(root.*.d1.s1) -> alias */
   private Map<String, String> columnToAliasMap;
   /**
@@ -96,7 +96,7 @@ public class GroupByLevelController {
       for (Iterator<Expression> it = rootExpression.iterator(); it.hasNext(); 
) {
         Expression expression = it.next();
         if (expression instanceof FunctionExpression
-            && expression.isPlainAggregationFunctionExpression()) {
+            && expression.isBuiltInAggregationFunctionExpression()) {
           hasAggregation = true;
           List<PartialPath> paths = ((FunctionExpression) 
expression).getPaths();
           String functionName = ((FunctionExpression) 
expression).getFunctionName();
@@ -149,7 +149,7 @@ public class GroupByLevelController {
     for (Iterator<Expression> it = rawColumn.getExpression().iterator(); 
it.hasNext(); ) {
       Expression expression = it.next();
       if (expression instanceof FunctionExpression
-          && expression.isPlainAggregationFunctionExpression()
+          && expression.isBuiltInAggregationFunctionExpression()
           && ((FunctionExpression) expression).isCountStar()) {
         countWildcardIterIndices.add(idx);
       }
@@ -218,4 +218,8 @@ public class GroupByLevelController {
     }
     return transformedPath.toString();
   }
+
+  public Map<String, String> getGroupedPathMap() {
+    return groupedPathMap;
+  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/component/ResultColumn.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/component/ResultColumn.java
index 81a376c..d741852 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/component/ResultColumn.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/component/ResultColumn.java
@@ -23,6 +23,7 @@ import 
org.apache.iotdb.db.exception.sql.StatementAnalyzeException;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
 import org.apache.iotdb.db.mpp.sql.rewriter.WildcardsRemover;
+import org.apache.iotdb.db.mpp.sql.statement.StatementNode;
 import org.apache.iotdb.db.query.expression.Expression;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
@@ -68,7 +69,7 @@ import java.util.Set;
  *       udf(udf(root.sg.d.a)), udf(udf(root.sg.d.b))]
  * </ul>
  */
-public class ResultColumn {
+public class ResultColumn extends StatementNode {
 
   private final Expression expression;
   private final String alias;
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/component/SelectComponent.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/component/SelectComponent.java
index 263cb41..5662881 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/component/SelectComponent.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/component/SelectComponent.java
@@ -81,7 +81,7 @@ public class SelectComponent extends StatementNode {
     if 
(resultColumn.getExpression().isUserDefinedAggregationFunctionExpression()) {
       hasUserDefinedAggregationFunction = true;
     }
-    if (resultColumn.getExpression().isPlainAggregationFunctionExpression()) {
+    if (resultColumn.getExpression().isBuiltInAggregationFunctionExpression()) 
{
       hasBuiltInAggregationFunction = true;
     }
     if 
(resultColumn.getExpression().isTimeSeriesGeneratingFunctionExpression()) {
@@ -113,7 +113,7 @@ public class SelectComponent extends StatementNode {
         if (expression instanceof TimeSeriesOperand) {
           pathsCache.add(((TimeSeriesOperand) expression).getPath());
         } else if (expression instanceof FunctionExpression
-            && expression.isPlainAggregationFunctionExpression()) {
+            && expression.isBuiltInAggregationFunctionExpression()) {
           pathsCache.add(((TimeSeriesOperand) 
expression.getExpressions().get(0)).getPath());
         } else {
           pathsCache.add(null);
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/UDAFQueryStatement.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/UDAFQueryStatement.java
index 865b1dd..d2475e1 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/UDAFQueryStatement.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/crud/UDAFQueryStatement.java
@@ -75,7 +75,7 @@ public class UDAFQueryStatement extends QueryStatement {
           "Common queries and aggregated queries are not allowed to appear at 
the same time.");
     }
     // Currently, the aggregation function expression can only contain a 
timeseries operand.
-    if (expression.isPlainAggregationFunctionExpression()) {
+    if (expression.isBuiltInAggregationFunctionExpression()) {
       if (expression.getExpressions().size() == 1
           && expression.getExpressions().get(0) instanceof TimeSeriesOperand) {
         return;
diff --git 
a/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/SelectComponent.java 
b/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/SelectComponent.java
index 4b955eb..d4b3c8e 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/SelectComponent.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/SelectComponent.java
@@ -86,7 +86,7 @@ public class SelectComponent {
     if 
(resultColumn.getExpression().isUserDefinedAggregationFunctionExpression()) {
       hasUserDefinedAggregationFunction = true;
     }
-    if (resultColumn.getExpression().isPlainAggregationFunctionExpression()) {
+    if (resultColumn.getExpression().isBuiltInAggregationFunctionExpression()) 
{
       hasPlainAggregationFunction = true;
     }
     if 
(resultColumn.getExpression().isTimeSeriesGeneratingFunctionExpression()) {
@@ -113,7 +113,7 @@ public class SelectComponent {
         if (expression instanceof TimeSeriesOperand) {
           pathsCache.add(((TimeSeriesOperand) expression).getPath());
         } else if (expression instanceof FunctionExpression
-            && expression.isPlainAggregationFunctionExpression()) {
+            && expression.isBuiltInAggregationFunctionExpression()) {
           pathsCache.add(
               ((TimeSeriesOperand) ((FunctionExpression) 
expression).getExpressions().get(0))
                   .getPath());
diff --git 
a/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/UDAFQueryOperator.java
 
b/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/UDAFQueryOperator.java
index 5f61fdb..7c0fdab 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/UDAFQueryOperator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/UDAFQueryOperator.java
@@ -92,7 +92,7 @@ public class UDAFQueryOperator extends QueryOperator {
   private void addInnerResultColumn(Expression expression) {
     for (Iterator<Expression> it = expression.iterator(); it.hasNext(); ) {
       Expression currentExp = it.next();
-      if (currentExp.isPlainAggregationFunctionExpression()) {
+      if (currentExp.isBuiltInAggregationFunctionExpression()) {
         innerResultColumnsCache.add(new ResultColumn(currentExp));
       }
     }
@@ -163,7 +163,7 @@ public class UDAFQueryOperator extends QueryOperator {
       throw new 
LogicalOperatorException(AggregationQueryOperator.ERROR_MESSAGE1);
     }
     // Currently, the aggregation function expression can only contain a 
timeseries operand.
-    if (expression.isPlainAggregationFunctionExpression()) {
+    if (expression.isBuiltInAggregationFunctionExpression()) {
       if (expression.getExpressions().size() == 1
           && expression.getExpressions().get(0) instanceof TimeSeriesOperand) {
         return;
diff --git 
a/server/src/main/java/org/apache/iotdb/db/qp/utils/GroupByLevelController.java 
b/server/src/main/java/org/apache/iotdb/db/qp/utils/GroupByLevelController.java
index fb301dd..8b3b699 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/qp/utils/GroupByLevelController.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/qp/utils/GroupByLevelController.java
@@ -101,7 +101,7 @@ public class GroupByLevelController {
       for (Iterator<Expression> it = rootExpression.iterator(); it.hasNext(); 
) {
         Expression expression = it.next();
         if (expression instanceof FunctionExpression
-            && expression.isPlainAggregationFunctionExpression()) {
+            && expression.isBuiltInAggregationFunctionExpression()) {
           hasAggregation = true;
           List<PartialPath> paths = ((FunctionExpression) 
expression).getPaths();
           String functionName = ((FunctionExpression) 
expression).getFunctionName();
@@ -154,7 +154,7 @@ public class GroupByLevelController {
     for (Iterator<Expression> it = rawColumn.getExpression().iterator(); 
it.hasNext(); ) {
       Expression expression = it.next();
       if (expression instanceof FunctionExpression
-          && expression.isPlainAggregationFunctionExpression()
+          && expression.isBuiltInAggregationFunctionExpression()
           && ((FunctionExpression) expression).isCountStar()) {
         countWildcardIterIndices.add(idx);
       }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java 
b/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java
index 84a4f1e..47e80ba 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/Expression.java
@@ -24,6 +24,7 @@ import 
org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.exception.sql.StatementAnalyzeException;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SourceNode;
 import org.apache.iotdb.db.mpp.sql.rewriter.WildcardsRemover;
 import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
 import org.apache.iotdb.db.query.expression.unary.ConstantOperand;
@@ -48,7 +49,7 @@ public abstract class Expression {
   private String expressionStringCache;
   protected Boolean isConstantOperandCache = null;
 
-  public boolean isPlainAggregationFunctionExpression() {
+  public boolean isBuiltInAggregationFunctionExpression() {
     return false;
   }
 
@@ -83,6 +84,8 @@ public abstract class Expression {
   public abstract void constructUdfExecutors(
       Map<String, UDTFExecutor> expressionName2Executor, ZoneId zoneId);
 
+  public abstract void collectPlanNode(Set<SourceNode> planNodeSet);
+
   public abstract void updateStatisticsForMemoryAssigner(LayerMemoryAssigner 
memoryAssigner);
 
   public abstract IntermediateLayer constructIntermediateLayer(
diff --git 
a/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java
 
b/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java
index 0fee73b..7d61695 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/query/expression/binary/BinaryExpression.java
@@ -24,6 +24,7 @@ import 
org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.exception.sql.StatementAnalyzeException;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SourceNode;
 import org.apache.iotdb.db.mpp.sql.rewriter.WildcardsRemover;
 import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
 import org.apache.iotdb.db.query.expression.Expression;
@@ -76,8 +77,8 @@ public abstract class BinaryExpression extends Expression {
 
   @Override
   public boolean isUserDefinedAggregationFunctionExpression() {
-    return leftExpression.isPlainAggregationFunctionExpression()
-        || rightExpression.isPlainAggregationFunctionExpression()
+    return leftExpression.isBuiltInAggregationFunctionExpression()
+        || rightExpression.isBuiltInAggregationFunctionExpression()
         || leftExpression.isUserDefinedAggregationFunctionExpression()
         || rightExpression.isUserDefinedAggregationFunctionExpression();
   }
@@ -199,6 +200,11 @@ public abstract class BinaryExpression extends Expression {
   }
 
   @Override
+  public void collectPlanNode(Set<SourceNode> planNodeSet) {
+    // TODO: support nested expressions
+  }
+
+  @Override
   public void constructUdfExecutors(
       Map<String, UDTFExecutor> expressionName2Executor, ZoneId zoneId) {
     leftExpression.constructUdfExecutors(expressionName2Executor, zoneId);
diff --git 
a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/ConstantOperand.java
 
b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/ConstantOperand.java
index 0d32cfb..70fe842 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/ConstantOperand.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/ConstantOperand.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.query.expression.unary;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SourceNode;
 import org.apache.iotdb.db.mpp.sql.rewriter.WildcardsRemover;
 import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
 import org.apache.iotdb.db.query.expression.Expression;
@@ -103,6 +104,11 @@ public class ConstantOperand extends Expression {
   }
 
   @Override
+  public void collectPlanNode(Set<SourceNode> planNodeSet) {
+    // Do nothing
+  }
+
+  @Override
   public void updateStatisticsForMemoryAssigner(LayerMemoryAssigner 
memoryAssigner) {
     // Do nothing
   }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java
 
b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java
index f0d3067..3611cbe 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/FunctionExpression.java
@@ -25,6 +25,9 @@ import 
org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.exception.sql.StatementAnalyzeException;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeIdAllocator;
+import 
org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SeriesAggregateScanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SourceNode;
 import org.apache.iotdb.db.mpp.sql.rewriter.WildcardsRemover;
 import org.apache.iotdb.db.qp.constant.SQLConstant;
 import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
@@ -62,7 +65,7 @@ public class FunctionExpression extends Expression {
    * true: aggregation function<br>
    * false: time series generating function
    */
-  private final boolean isPlainAggregationFunctionExpression;
+  private final boolean isBuiltInAggregationFunctionExpression;
 
   private boolean isUserDefinedAggregationFunctionExpression;
 
@@ -84,7 +87,7 @@ public class FunctionExpression extends Expression {
     this.functionName = functionName;
     functionAttributes = new LinkedHashMap<>();
     expressions = new ArrayList<>();
-    isPlainAggregationFunctionExpression =
+    isBuiltInAggregationFunctionExpression =
         
SQLConstant.getNativeFunctionNames().contains(functionName.toLowerCase());
     isConstantOperandCache = true;
   }
@@ -94,7 +97,7 @@ public class FunctionExpression extends Expression {
     this.functionName = functionName;
     this.functionAttributes = functionAttributes;
     this.expressions = expressions;
-    isPlainAggregationFunctionExpression =
+    isBuiltInAggregationFunctionExpression =
         
SQLConstant.getNativeFunctionNames().contains(functionName.toLowerCase());
     isConstantOperandCache = 
expressions.stream().anyMatch(Expression::isConstantOperand);
     isUserDefinedAggregationFunctionExpression =
@@ -102,12 +105,12 @@ public class FunctionExpression extends Expression {
             .anyMatch(
                 v ->
                     v.isUserDefinedAggregationFunctionExpression()
-                        || v.isPlainAggregationFunctionExpression());
+                        || v.isBuiltInAggregationFunctionExpression());
   }
 
   @Override
-  public boolean isPlainAggregationFunctionExpression() {
-    return isPlainAggregationFunctionExpression;
+  public boolean isBuiltInAggregationFunctionExpression() {
+    return isBuiltInAggregationFunctionExpression;
   }
 
   @Override
@@ -117,7 +120,8 @@ public class FunctionExpression extends Expression {
 
   @Override
   public boolean isTimeSeriesGeneratingFunctionExpression() {
-    return !isPlainAggregationFunctionExpression() && 
!isUserDefinedAggregationFunctionExpression();
+    return !isBuiltInAggregationFunctionExpression()
+        && !isUserDefinedAggregationFunctionExpression();
   }
 
   @Override
@@ -141,7 +145,7 @@ public class FunctionExpression extends Expression {
     isUserDefinedAggregationFunctionExpression =
         isUserDefinedAggregationFunctionExpression
             || expression.isUserDefinedAggregationFunctionExpression()
-            || expression.isPlainAggregationFunctionExpression();
+            || expression.isBuiltInAggregationFunctionExpression();
     expressions.add(expression);
   }
 
@@ -232,6 +236,14 @@ public class FunctionExpression extends Expression {
   }
 
   @Override
+  public void collectPlanNode(Set<SourceNode> planNodeSet) {
+    if (isBuiltInAggregationFunctionExpression) {
+      planNodeSet.add(new 
SeriesAggregateScanNode(PlanNodeIdAllocator.generateId(), this));
+    }
+    // TODO: support UDF
+  }
+
+  @Override
   public void constructUdfExecutors(
       Map<String, UDTFExecutor> expressionName2Executor, ZoneId zoneId) {
     String expressionString = getExpressionString();
@@ -265,7 +277,7 @@ public class FunctionExpression extends Expression {
     if (!expressionIntermediateLayerMap.containsKey(this)) {
       float memoryBudgetInMB = memoryAssigner.assign();
       Transformer transformer;
-      if (isPlainAggregationFunctionExpression) {
+      if (isBuiltInAggregationFunctionExpression) {
         transformer =
             new TransparentTransformer(
                 rawTimeSeriesInputLayer.constructPointReader(
diff --git 
a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/LogicNotExpression.java
 
b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/LogicNotExpression.java
index 2603bae..983f58c 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/LogicNotExpression.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/LogicNotExpression.java
@@ -24,6 +24,7 @@ import 
org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.exception.sql.StatementAnalyzeException;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SourceNode;
 import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
 import org.apache.iotdb.db.qp.utils.WildcardsRemover;
 import org.apache.iotdb.db.query.expression.Expression;
@@ -70,7 +71,7 @@ public class LogicNotExpression extends Expression {
   @Override
   public boolean isUserDefinedAggregationFunctionExpression() {
     return expression.isUserDefinedAggregationFunctionExpression()
-        || expression.isPlainAggregationFunctionExpression();
+        || expression.isBuiltInAggregationFunctionExpression();
   }
 
   @Override
@@ -128,6 +129,11 @@ public class LogicNotExpression extends Expression {
   }
 
   @Override
+  public void collectPlanNode(Set<SourceNode> planNodeSet) {
+    // TODO: support LogicNotExpression
+  }
+
+  @Override
   public void updateStatisticsForMemoryAssigner(LayerMemoryAssigner 
memoryAssigner) {
     expression.updateStatisticsForMemoryAssigner(memoryAssigner);
     memoryAssigner.increaseExpressionReference(this);
diff --git 
a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/NegationExpression.java
 
b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/NegationExpression.java
index 7be5a4e..e5ecde9 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/NegationExpression.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/NegationExpression.java
@@ -24,6 +24,7 @@ import 
org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.exception.sql.StatementAnalyzeException;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SourceNode;
 import org.apache.iotdb.db.mpp.sql.rewriter.WildcardsRemover;
 import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
 import org.apache.iotdb.db.query.expression.Expression;
@@ -75,7 +76,7 @@ public class NegationExpression extends Expression {
   @Override
   public boolean isUserDefinedAggregationFunctionExpression() {
     return expression.isUserDefinedAggregationFunctionExpression()
-        || expression.isPlainAggregationFunctionExpression();
+        || expression.isBuiltInAggregationFunctionExpression();
   }
 
   @Override
@@ -127,6 +128,11 @@ public class NegationExpression extends Expression {
   }
 
   @Override
+  public void collectPlanNode(Set<SourceNode> planNodeSet) {
+    // TODO: support nested expressions
+  }
+
+  @Override
   public void constructUdfExecutors(
       Map<String, UDTFExecutor> expressionName2Executor, ZoneId zoneId) {
     expression.constructUdfExecutors(expressionName2Executor, zoneId);
diff --git 
a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java
 
b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java
index a9c1052..6294372 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/query/expression/unary/TimeSeriesOperand.java
@@ -24,6 +24,9 @@ import 
org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.exception.sql.StatementAnalyzeException;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeIdAllocator;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SeriesScanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SourceNode;
 import org.apache.iotdb.db.mpp.sql.rewriter.WildcardsRemover;
 import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
 import org.apache.iotdb.db.query.expression.Expression;
@@ -111,6 +114,11 @@ public class TimeSeriesOperand extends Expression {
   }
 
   @Override
+  public void collectPlanNode(Set<SourceNode> planNodeSet) {
+    planNodeSet.add(new SeriesScanNode(PlanNodeIdAllocator.generateId(), 
path));
+  }
+
+  @Override
   public void constructUdfExecutors(
       Map<String, UDTFExecutor> expressionName2Executor, ZoneId zoneId) {
     // nothing to do
diff --git 
a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java
 
b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java
index 79c5939..40f78fb 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/DistributionPlannerTest.java
@@ -26,7 +26,6 @@ import org.apache.iotdb.commons.partition.DeviceGroupId;
 import org.apache.iotdb.commons.partition.TimePartitionId;
 import org.apache.iotdb.db.exception.metadata.IllegalPathException;
 import org.apache.iotdb.db.metadata.path.PartialPath;
-import org.apache.iotdb.db.mpp.common.FilterNullPolicy;
 import org.apache.iotdb.db.mpp.common.MPPQueryContext;
 import org.apache.iotdb.db.mpp.common.QueryId;
 import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
@@ -40,6 +39,7 @@ import 
org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeIdAllocator;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.LimitNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.TimeJoinNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SeriesScanNode;
+import org.apache.iotdb.db.mpp.sql.statement.component.FilterNullPolicy;
 import org.apache.iotdb.db.mpp.sql.statement.component.OrderBy;
 import org.apache.iotdb.service.rpc.thrift.EndPoint;
 
diff --git 
a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/LogicalPlanPrinter.java 
b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/LogicalPlanPrinter.java
new file mode 100644
index 0000000..6992aff
--- /dev/null
+++ 
b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/LogicalPlanPrinter.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.sql.plan;
+
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.*;
+import 
org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SeriesAggregateScanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.source.SeriesScanNode;
+import org.apache.iotdb.tsfile.utils.Pair;
+
+import java.util.List;
+
+public class LogicalPlanPrinter {
+
+  private static final String INDENT = "   ";
+  private static final String CORNER = " └─";
+  private static final String LINE = " │";
+
+  public String print(PlanNode root) {
+    LogicalPlanPrintVisitor printer = new LogicalPlanPrintVisitor();
+    printer.process(root, new PrinterContext(0, false));
+    return printer.getOutput();
+  }
+
+  private static class LogicalPlanPrintVisitor extends PlanVisitor<Void, 
PrinterContext> {
+
+    private final StringBuilder stringBuilder = new StringBuilder();
+
+    @Override
+    public Void visitPlan(PlanNode node, PrinterContext context) {
+      throw new UnsupportedOperationException("not yet implemented: " + node);
+    }
+
+    @Override
+    public Void visitSeriesScan(SeriesScanNode node, PrinterContext context) {
+      print(context.getIndentLevel(), context.isShowCorner(), node.print());
+      return null;
+    }
+
+    @Override
+    public Void visitSeriesAggregate(SeriesAggregateScanNode node, 
PrinterContext context) {
+      print(context.getIndentLevel(), context.isShowCorner(), node.print());
+      return null;
+    }
+
+    @Override
+    public Void visitDeviceMerge(DeviceMergeNode node, PrinterContext context) 
{
+      print(context.getIndentLevel(), context.isShowCorner(), node.print());
+      context.incIndentLevel();
+      context.setShowCorner(true);
+      for (int i = 0; i < node.getChildren().size(); i++) {
+        if (i > 0) {
+          context.setShowCorner(false);
+        }
+        process(node.getChildren().get(i), context);
+      }
+      return null;
+    }
+
+    @Override
+    public Void visitFill(FillNode node, PrinterContext context) {
+      print(context.getIndentLevel(), context.isShowCorner(), node.print());
+      context.incIndentLevel();
+      context.setShowCorner(true);
+      process(node.getChildren().get(0), context);
+      return null;
+    }
+
+    @Override
+    public Void visitFilter(FilterNode node, PrinterContext context) {
+      print(context.getIndentLevel(), context.isShowCorner(), node.print());
+      context.incIndentLevel();
+      context.setShowCorner(true);
+      process(node.getChildren().get(0), context);
+      return null;
+    }
+
+    @Override
+    public Void visitFilterNull(FilterNullNode node, PrinterContext context) {
+      print(context.getIndentLevel(), context.isShowCorner(), node.print());
+      context.incIndentLevel();
+      context.setShowCorner(true);
+      process(node.getChildren().get(0), context);
+      return null;
+    }
+
+    @Override
+    public Void visitGroupByLevel(GroupByLevelNode node, PrinterContext 
context) {
+      print(context.getIndentLevel(), context.isShowCorner(), node.print());
+      context.incIndentLevel();
+      context.setShowCorner(true);
+      for (int i = 0; i < node.getChildren().size(); i++) {
+        if (i > 0) {
+          context.setShowCorner(false);
+        }
+        process(node.getChildren().get(i), context);
+      }
+      return null;
+    }
+
+    @Override
+    public Void visitLimit(LimitNode node, PrinterContext context) {
+      print(context.getIndentLevel(), context.isShowCorner(), node.print());
+      context.incIndentLevel();
+      context.setShowCorner(true);
+      process(node.getChildren().get(0), context);
+      return null;
+    }
+
+    @Override
+    public Void visitOffset(OffsetNode node, PrinterContext context) {
+      print(context.getIndentLevel(), context.isShowCorner(), node.print());
+      context.incIndentLevel();
+      context.setShowCorner(true);
+      process(node.getChild(), context);
+      return null;
+    }
+
+    @Override
+    public Void visitRowBasedSeriesAggregate(AggregateNode node, 
PrinterContext context) {
+      return visitPlan(node, context);
+    }
+
+    @Override
+    public Void visitSort(SortNode node, PrinterContext context) {
+      print(context.getIndentLevel(), context.isShowCorner(), node.print());
+      context.incIndentLevel();
+      context.setShowCorner(true);
+      process(node.getChildren().get(0), context);
+      return null;
+    }
+
+    @Override
+    public Void visitTimeJoin(TimeJoinNode node, PrinterContext context) {
+      print(context.getIndentLevel(), context.isShowCorner(), node.print());
+      context.incIndentLevel();
+      context.setShowCorner(true);
+      for (int i = 0; i < node.getChildren().size(); i++) {
+        if (i > 0) {
+          context.setShowCorner(false);
+        }
+        process(node.getChildren().get(i), context);
+      }
+      return null;
+    }
+
+    private void print(Integer indentLevel, boolean showCorner, Pair<String, 
List<String>> value) {
+      stringBuilder.append(repeatIndent(indentLevel));
+      if (indentLevel > 0) {
+        stringBuilder.append(showCorner ? CORNER : INDENT);
+      }
+      stringBuilder.append(value.left).append('\n');
+      for (String attribute : value.right) {
+        stringBuilder.append(repeatIndent(indentLevel + 1));
+        stringBuilder.append(LINE).append(INDENT);
+        stringBuilder.append(attribute).append('\n');
+      }
+    }
+
+    private String repeatIndent(int indentLevel) {
+      if (indentLevel < 2) {
+        return "";
+      }
+      StringBuilder res = new StringBuilder();
+      for (int i = 0; i < indentLevel - 1; i++) {
+        res.append(INDENT);
+      }
+      return res.toString();
+    }
+
+    public String getOutput() {
+      return stringBuilder.toString();
+    }
+  }
+
+  private static class PrinterContext {
+    private int indentLevel;
+    private boolean showCorner;
+
+    public PrinterContext(int indentLevel, boolean showCorner) {
+      this.indentLevel = indentLevel;
+      this.showCorner = showCorner;
+    }
+
+    public int getIndentLevel() {
+      return indentLevel;
+    }
+
+    public void incIndentLevel() {
+      indentLevel++;
+    }
+
+    public boolean isShowCorner() {
+      return showCorner;
+    }
+
+    public void setShowCorner(boolean showCorner) {
+      this.showCorner = showCorner;
+    }
+  }
+}
diff --git 
a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/LogicalPlannerTest.java 
b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/LogicalPlannerTest.java
index 154aa03..1f8ee23 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/LogicalPlannerTest.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/mpp/sql/plan/LogicalPlannerTest.java
@@ -23,19 +23,22 @@ import 
org.apache.iotdb.db.exception.metadata.IllegalPathException;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.mpp.common.MPPQueryContext;
 import org.apache.iotdb.db.mpp.sql.analyze.Analysis;
-import org.apache.iotdb.db.mpp.sql.analyze.Analyzer;
 import org.apache.iotdb.db.mpp.sql.parser.StatementGenerator;
 import org.apache.iotdb.db.mpp.sql.planner.LogicalPlanner;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeIdAllocator;
 import 
org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.write.AlterTimeSeriesNode;
 import 
org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.write.CreateAlignedTimeSeriesNode;
 import 
org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.write.CreateTimeSeriesNode;
+import org.apache.iotdb.db.mpp.sql.statement.Statement;
 import org.apache.iotdb.db.mpp.sql.statement.metadata.AlterTimeSeriesStatement;
 import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
 
 import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Ignore;
 import org.junit.Test;
 
 import java.time.ZonedDateTime;
@@ -46,6 +49,69 @@ import java.util.Map;
 import static org.junit.Assert.fail;
 
 public class LogicalPlannerTest {
+
+  LogicalPlanPrinter planPrinter = new LogicalPlanPrinter();
+
+  @Before
+  public void setUp() {
+    PlanNodeIdAllocator.reset();
+  }
+
+  @Test
+  @Ignore
+  public void rawDataQueryTest() {
+    PlanNode root =
+        parseSQLToPlanNode(
+            "SELECT s1,s2 FROM root.sg1.d1 WHERE time > 10 and s2 > 100 
WITHOUT NULL ANY(s1) LIMIT 1 OFFSET 10");
+    System.out.println(planPrinter.print(root));
+    // TODO: replace all paths with full paths
+    Assert.assertEquals(
+        "[OffsetNode (7)]\n"
+            + " │   RowOffset: 10\n"
+            + " └─[LimitNode (6)]\n"
+            + "    │   RowLimit: 1\n"
+            + "    └─[FilterNullNode (5)]\n"
+            + "       │   FilterNullPolicy: CONTAINS_NULL\n"
+            + "       │   FilterNullColumnNames: [s1]\n"
+            + "       └─[FilterNode (4)]\n"
+            + "          │   QueryFilter: [and [time>10][s2>100]]\n"
+            + "          └─[TimeJoinNode (3)]\n"
+            + "             │   MergeOrder: TIMESTAMP_ASC\n"
+            + "             │   FilterNullPolicy: null\n"
+            + "             └─[SeriesScanNode (1)]\n"
+            + "                │   SeriesPath: s1\n"
+            + "                │   scanOrder: TIMESTAMP_ASC\n"
+            + "               [SeriesScanNode (2)]\n"
+            + "                │   SeriesPath: s2\n"
+            + "                │   scanOrder: TIMESTAMP_ASC\n",
+        planPrinter.print(root));
+  }
+
+  @Test
+  @Ignore
+  public void aggregationQueryTest() {
+    PlanNode root =
+        parseSQLToPlanNode(
+            "SELECT sum(s1), avg(s2) FROM root.sg1.d1 WHERE time > 10 LIMIT 1 
OFFSET 10");
+    System.out.println(planPrinter.print(root));
+    // TODO: replace all paths with full paths
+    Assert.assertEquals(
+        "[OffsetNode (6)]\n"
+            + " │   RowOffset: 10\n"
+            + " └─[LimitNode (5)]\n"
+            + "    │   RowLimit: 1\n"
+            + "    └─[FilterNode (4)]\n"
+            + "       │   QueryFilter: [time>10]\n"
+            + "       └─[TimeJoinNode (3)]\n"
+            + "          │   MergeOrder: TIMESTAMP_ASC\n"
+            + "          │   FilterNullPolicy: null\n"
+            + "          └─[SeriesAggregateScanNode (2)]\n"
+            + "             │   AggregateFunction: avg(s2)\n"
+            + "            [SeriesAggregateScanNode (1)]\n"
+            + "             │   AggregateFunction: sum(s1)\n",
+        planPrinter.print(root));
+  }
+
   @Test
   public void createTimeseriesPlanTest() {
     String sql =
@@ -289,11 +355,14 @@ public class LogicalPlannerTest {
   private PlanNode parseSQLToPlanNode(String sql) {
     PlanNode planNode = null;
     try {
+      Statement statement =
+          StatementGenerator.createStatement(sql, 
ZonedDateTime.now().getOffset());
       MPPQueryContext context = new MPPQueryContext();
-      Analyzer analyzer = new Analyzer(context);
-      Analysis analysis =
-          analyzer.analyze(
-              StatementGenerator.createStatement(sql, 
ZonedDateTime.now().getOffset()));
+      // TODO: do analyze after implementing ISchemaFetcher and 
IPartitionFetcher
+      //      Analyzer analyzer = new Analyzer(context);
+      //      Analysis analysis = analyzer.analyze(statement);
+      Analysis analysis = new Analysis();
+      analysis.setStatement(statement);
       LogicalPlanner planner = new LogicalPlanner(context, new ArrayList<>());
       planNode = planner.plan(analysis).getRootNode();
     } catch (Exception e) {

Reply via email to