This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch lmh/TypeProviderOpt
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit c9981379eed90a866b5c094e96e59592e26692e7
Author: Minghui Liu <[email protected]>
AuthorDate: Wed Sep 7 16:17:44 2022 +0800

    split types to sub plans
---
 .../iotdb/db/mpp/common/MPPQueryContext.java       |  7 +++
 .../apache/iotdb/db/mpp/plan/analyze/Analysis.java | 15 ++++--
 .../iotdb/db/mpp/plan/analyze/TypeProvider.java    | 21 +++-----
 .../db/mpp/plan/planner/LogicalPlanBuilder.java    | 59 +++++++++++++++++-----
 .../db/mpp/plan/planner/LogicalPlanVisitor.java    | 40 +++++++--------
 .../db/mpp/plan/planner/SubPlanTypeExtractor.java  | 54 ++++++++++++++++++++
 .../SimpleFragmentParallelPlanner.java             |  2 +-
 .../plan/planner/distribution/SourceRewriter.java  | 18 ++++---
 .../db/mpp/plan/planner/plan/PlanFragment.java     |  9 +++-
 .../plan/planner/plan/node/SimplePlanVisitor.java  | 30 +++++++++++
 .../mpp/plan/plan/distribution/LastQueryTest.java  | 26 ++++++----
 11 files changed, 208 insertions(+), 73 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/MPPQueryContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/MPPQueryContext.java
index 91d4d85f89..8f4e53470d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/MPPQueryContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/MPPQueryContext.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.mpp.common;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.db.mpp.plan.analyze.QueryType;
+import org.apache.iotdb.db.mpp.plan.analyze.TypeProvider;
 
 import java.util.LinkedList;
 import java.util.List;
@@ -45,6 +46,8 @@ public class MPPQueryContext {
   // onto this node.
   private final List<TEndPoint> endPointBlackList;
 
+  private final TypeProvider typeProvider = new TypeProvider();
+
   public MPPQueryContext(QueryId queryId) {
     this.queryId = queryId;
     this.endPointBlackList = new LinkedList<>();
@@ -129,4 +132,8 @@ public class MPPQueryContext {
   public List<TEndPoint> getEndPointBlackList() {
     return endPointBlackList;
   }
+
+  public TypeProvider getTypeProvider() {
+    return typeProvider;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analysis.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analysis.java
index c106572837..d45d98e22c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analysis.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/Analysis.java
@@ -65,7 +65,6 @@ public class Analysis {
 
   // map from output column name (for every node) to its datatype
   private final Map<NodeRef<Expression>, TSDataType> expressionTypes = new LinkedHashMap<>();
-  private TypeProvider typeProvider;
 
   private boolean finishQueryAfterAnalyze;
 
@@ -129,6 +128,8 @@ public class Analysis {
   // Query Common Analysis (above DeviceView)
   
/////////////////////////////////////////////////////////////////////////////////////////////////
 
+  private List<Pair<Expression, String>> outputExpressions;
+
   // indicate is there a value filter
   private boolean hasValueFilter = false;
 
@@ -223,10 +224,6 @@ public class Analysis {
     this.respDatasetHeader = respDatasetHeader;
   }
 
-  public TypeProvider getTypeProvider() {
-    return typeProvider;
-  }
-
   public TSDataType getType(Expression expression) {
     TSDataType type = expressionTypes.get(NodeRef.of(expression));
     checkArgument(type != null, "Expression not analyzed: %s", expression);
@@ -460,4 +457,12 @@ public class Analysis {
   public void addTypes(Map<NodeRef<Expression>, TSDataType> types) {
     this.expressionTypes.putAll(types);
   }
+
+  public List<Pair<Expression, String>> getOutputExpressions() {
+    return outputExpressions;
+  }
+
+  public void setOutputExpressions(List<Pair<Expression, String>> outputExpressions) {
+    this.outputExpressions = outputExpressions;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/TypeProvider.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/TypeProvider.java
index 27ec021ff4..1c3bdd3b56 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/TypeProvider.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/TypeProvider.java
@@ -22,8 +22,6 @@ package org.apache.iotdb.db.mpp.plan.analyze;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
-import com.google.common.collect.ImmutableMap;
-
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -37,10 +35,6 @@ public class TypeProvider {
 
   private final Map<String, TSDataType> typeMap;
 
-  public static TypeProvider empty() {
-    return new TypeProvider(ImmutableMap.of());
-  }
-
   public TypeProvider() {
     this.typeMap = new HashMap<>();
   }
@@ -49,16 +43,17 @@ public class TypeProvider {
     this.typeMap = typeMap;
   }
 
-  public TSDataType getType(String path) {
-    checkState(typeMap.containsKey(path), String.format("no data type found for path: %s", path));
-    return typeMap.get(path);
+  public TSDataType getType(String symbol) {
+    checkState(
+        typeMap.containsKey(symbol), String.format("no data type found for symbol: %s", symbol));
+    return typeMap.get(symbol);
   }
 
-  public void setType(String path, TSDataType dataType) {
+  public void setType(String symbol, TSDataType dataType) {
     checkState(
-        !typeMap.containsKey(path) || typeMap.get(path) == dataType,
-        String.format("inconsistent data type for path: %s", path));
-    this.typeMap.put(path, dataType);
+        !typeMap.containsKey(symbol) || typeMap.get(symbol) == dataType,
+        String.format("inconsistent data type for symbol: %s", symbol));
+    this.typeMap.put(symbol, dataType);
   }
 
   public boolean containsTypeInfoOf(String path) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
index afa7e34806..fd221a0b43 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
@@ -27,7 +27,9 @@ import org.apache.iotdb.db.metadata.path.MeasurementPath;
 import org.apache.iotdb.db.metadata.template.Template;
 import org.apache.iotdb.db.metadata.utils.MetaUtils;
 import org.apache.iotdb.db.mpp.common.MPPQueryContext;
+import org.apache.iotdb.db.mpp.common.header.ColumnHeaderConstant;
 import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
+import org.apache.iotdb.db.mpp.plan.analyze.Analysis;
 import org.apache.iotdb.db.mpp.plan.analyze.ExpressionAnalyzer;
 import org.apache.iotdb.db.mpp.plan.analyze.TypeProvider;
 import org.apache.iotdb.db.mpp.plan.expression.Expression;
@@ -78,13 +80,17 @@ import org.apache.iotdb.db.mpp.plan.statement.component.SortItem;
 import org.apache.iotdb.db.mpp.plan.statement.component.SortKey;
 import org.apache.iotdb.db.query.aggregation.AggregationType;
 import org.apache.iotdb.db.utils.SchemaUtils;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.utils.Pair;
 
+import com.google.common.base.Function;
 import org.apache.commons.lang.Validate;
 
 import java.time.ZoneId;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -101,7 +107,10 @@ public class LogicalPlanBuilder {
 
   private final MPPQueryContext context;
 
-  public LogicalPlanBuilder(MPPQueryContext context) {
+  private final Function<Expression, TSDataType> getPreAnalyzedType;
+
+  public LogicalPlanBuilder(Analysis analysis, MPPQueryContext context) {
+    this.getPreAnalyzedType = analysis::getType;
     this.context = context;
   }
 
@@ -114,6 +123,14 @@ public class LogicalPlanBuilder {
     return this;
   }
 
+  private void updateTypeProvider(Collection<Expression> expressions) {
+    expressions.forEach(
+        expression ->
+            context
+                .getTypeProvider()
+                .setType(expression.toString(), getPreAnalyzedType.apply(expression)));
+  }
+
   public LogicalPlanBuilder planRawDataSource(
       Set<Expression> sourceExpressions, Ordering scanOrder, Filter 
timeFilter) {
     List<PlanNode> sourceNodeList = new ArrayList<>();
@@ -140,6 +157,8 @@ public class LogicalPlanBuilder {
       }
     }
 
+    updateTypeProvider(sourceExpressions);
+
     this.root = convergeWithTimeJoin(sourceNodeList, scanOrder);
     return this;
   }
@@ -160,6 +179,7 @@ public class LogicalPlanBuilder {
         sourceNodeList.add(new 
LastQueryScanNode(context.getQueryId().genPlanNodeId(), selectPath));
       }
     }
+    updateTypeProvider(sourceExpressions);
 
     this.root =
         new LastQueryNode(
@@ -167,6 +187,12 @@ public class LogicalPlanBuilder {
             sourceNodeList,
             globalTimeFilter,
             mergeOrderParameter);
+    ColumnHeaderConstant.lastQueryColumnHeaders.forEach(
+        columnHeader ->
+            context
+                .getTypeProvider()
+                .setType(columnHeader.getColumnName(), columnHeader.getColumnType()));
+
     return this;
   }
 
@@ -177,9 +203,7 @@ public class LogicalPlanBuilder {
       Filter timeFilter,
       GroupByTimeParameter groupByTimeParameter,
       Set<Expression> aggregationExpressions,
-      Map<Expression, Set<Expression>> groupByLevelExpressions,
-      TypeProvider typeProvider) {
-
+      Map<Expression, Set<Expression>> groupByLevelExpressions) {
     boolean needCheckAscending = groupByTimeParameter == null;
     Map<PartialPath, List<AggregationDescriptor>> ascendingAggregations = new 
HashMap<>();
     Map<PartialPath, List<AggregationDescriptor>> descendingAggregations = new 
HashMap<>();
@@ -189,7 +213,6 @@ public class LogicalPlanBuilder {
           curStep,
           scanOrder,
           needCheckAscending,
-          typeProvider,
           ascendingAggregations,
           descendingAggregations);
     }
@@ -201,6 +224,7 @@ public class LogicalPlanBuilder {
             scanOrder,
             timeFilter,
             groupByTimeParameter);
+    updateTypeProvider(sourceExpressions);
 
     return convergeAggregationSource(
         sourceNodeList,
@@ -219,8 +243,7 @@ public class LogicalPlanBuilder {
       GroupByTimeParameter groupByTimeParameter,
       Set<Expression> aggregationExpressions,
       List<Integer> measurementIndexes,
-      Map<Expression, Set<Expression>> groupByLevelExpressions,
-      TypeProvider typeProvider) {
+      Map<Expression, Set<Expression>> groupByLevelExpressions) {
     checkArgument(
         sourceExpressions.size() == measurementIndexes.size(),
         "Each aggregate should correspond to a column of output.");
@@ -238,7 +261,6 @@ public class LogicalPlanBuilder {
               curStep,
               scanOrder,
               needCheckAscending,
-              typeProvider,
               ascendingAggregations,
               descendingAggregations);
       aggregationToMeasurementIndexMap.put(aggregationDescriptor, 
measurementIndexes.get(index));
@@ -252,6 +274,7 @@ public class LogicalPlanBuilder {
             scanOrder,
             timeFilter,
             groupByTimeParameter);
+    updateTypeProvider(sourceExpressions);
 
     if (!curStep.isOutputPartial()) {
       // update measurementIndexes
@@ -280,14 +303,13 @@ public class LogicalPlanBuilder {
       AggregationStep curStep,
       Ordering scanOrder,
       boolean needCheckAscending,
-      TypeProvider typeProvider,
       Map<PartialPath, List<AggregationDescriptor>> ascendingAggregations,
       Map<PartialPath, List<AggregationDescriptor>> descendingAggregations) {
     AggregationDescriptor aggregationDescriptor =
         new AggregationDescriptor(
             sourceExpression.getFunctionName(), curStep, 
sourceExpression.getExpressions());
     if (curStep.isOutputPartial()) {
-      updateTypeProviderByPartialAggregation(aggregationDescriptor, 
typeProvider);
+      updateTypeProviderByPartialAggregation(aggregationDescriptor, 
context.getTypeProvider());
     }
     PartialPath selectPath =
         ((TimeSeriesOperand) 
sourceExpression.getExpressions().get(0)).getPath();
@@ -416,9 +438,13 @@ public class LogicalPlanBuilder {
 
   public LogicalPlanBuilder planDeviceView(
       Map<String, PlanNode> deviceNameToSourceNodesMap,
-      List<String> outputColumnNames,
+      List<Pair<Expression, String>> outputExpressions,
       Map<String, List<Integer>> deviceToMeasurementIndexesMap,
       Ordering mergeOrder) {
+    List<String> outputColumnNames =
+        outputExpressions.stream()
+            .map(pair -> pair.getLeft().toString())
+            .collect(Collectors.toList());
     DeviceViewNode deviceViewNode =
         new DeviceViewNode(
             context.getQueryId().genPlanNodeId(),
@@ -434,6 +460,9 @@ public class LogicalPlanBuilder {
       deviceViewNode.addChildDeviceNode(deviceName, subPlan);
     }
 
+    context.getTypeProvider().setType(ColumnHeaderConstant.COLUMN_DEVICE, TSDataType.TEXT);
+    updateTypeProvider(outputExpressions.stream().map(Pair::getLeft).collect(Collectors.toList()));
+
     this.root = deviceViewNode;
     return this;
   }
@@ -461,7 +490,6 @@ public class LogicalPlanBuilder {
       Set<Expression> aggregationExpressions,
       GroupByTimeParameter groupByTimeParameter,
       AggregationStep curStep,
-      TypeProvider typeProvider,
       Ordering scanOrder) {
     if (aggregationExpressions == null) {
       return this;
@@ -469,10 +497,12 @@ public class LogicalPlanBuilder {
 
     List<AggregationDescriptor> aggregationDescriptorList =
         constructAggregationDescriptorList(aggregationExpressions, curStep);
+    updateTypeProvider(aggregationExpressions);
     if (curStep.isOutputPartial()) {
       aggregationDescriptorList.forEach(
           aggregationDescriptor ->
-              updateTypeProviderByPartialAggregation(aggregationDescriptor, 
typeProvider));
+              updateTypeProviderByPartialAggregation(
+                  aggregationDescriptor, context.getTypeProvider()));
     }
     this.root =
         new AggregationNode(
@@ -533,6 +563,7 @@ public class LogicalPlanBuilder {
                   .collect(Collectors.toList()),
               groupedExpression.getExpressions().get(0)));
     }
+    updateTypeProvider(groupByLevelExpressions.keySet());
     return new GroupByLevelNode(
         context.getQueryId().genPlanNodeId(),
         children,
@@ -605,6 +636,7 @@ public class LogicalPlanBuilder {
             isGroupByTime,
             zoneId,
             scanOrder);
+    updateTypeProvider(selectExpressions);
     return this;
   }
 
@@ -632,6 +664,7 @@ public class LogicalPlanBuilder {
             isGroupByTime,
             zoneId,
             scanOrder);
+    updateTypeProvider(transformExpressions);
     return this;
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java
index 87d59a3e2e..d1d5d46e20 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java
@@ -69,7 +69,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
-import java.util.stream.Collectors;
 
 /**
  * This visitor is used to generate a logical plan for the statement and 
returns the {@link
@@ -91,7 +90,7 @@ public class LogicalPlanVisitor extends 
StatementVisitor<PlanNode, MPPQueryConte
 
   @Override
   public PlanNode visitQuery(QueryStatement queryStatement, MPPQueryContext 
context) {
-    LogicalPlanBuilder planBuilder = new LogicalPlanBuilder(context);
+    LogicalPlanBuilder planBuilder = new LogicalPlanBuilder(analysis, context);
 
     if (queryStatement.isLastQuery()) {
       return planBuilder
@@ -105,7 +104,7 @@ public class LogicalPlanVisitor extends 
StatementVisitor<PlanNode, MPPQueryConte
     if (queryStatement.isAlignByDevice()) {
       Map<String, PlanNode> deviceToSubPlanMap = new TreeMap<>();
       for (String deviceName : 
analysis.getDeviceToSourceExpressions().keySet()) {
-        LogicalPlanBuilder subPlanBuilder = new LogicalPlanBuilder(context);
+        LogicalPlanBuilder subPlanBuilder = new LogicalPlanBuilder(analysis, 
context);
         subPlanBuilder =
             subPlanBuilder.withNewRoot(
                 visitQueryBody(
@@ -129,9 +128,7 @@ public class LogicalPlanVisitor extends 
StatementVisitor<PlanNode, MPPQueryConte
       planBuilder =
           planBuilder.planDeviceView(
               deviceToSubPlanMap,
-              
analysis.getRespDatasetHeader().getColumnNameWithoutAlias().stream()
-                  .distinct()
-                  .collect(Collectors.toList()),
+              analysis.getOutputExpressions(),
               analysis.getDeviceToMeasurementIndexesMap(),
               queryStatement.getResultTimeOrder());
     } else {
@@ -171,7 +168,7 @@ public class LogicalPlanVisitor extends 
StatementVisitor<PlanNode, MPPQueryConte
       Expression havingExpression,
       List<Integer> measurementIndexes, // only used in ALIGN BY DEVICE
       MPPQueryContext context) {
-    LogicalPlanBuilder planBuilder = new LogicalPlanBuilder(context);
+    LogicalPlanBuilder planBuilder = new LogicalPlanBuilder(analysis, context);
 
     // plan data source node
     if (isRawDataSource) {
@@ -209,7 +206,6 @@ public class LogicalPlanVisitor extends 
StatementVisitor<PlanNode, MPPQueryConte
                 aggregationExpressions,
                 analysis.getGroupByTimeParameter(),
                 curStep,
-                analysis.getTypeProvider(),
                 queryStatement.getResultTimeOrder());
 
         if (curStep.isOutputPartial()) {
@@ -307,8 +303,7 @@ public class LogicalPlanVisitor extends 
StatementVisitor<PlanNode, MPPQueryConte
                 analysis.getGroupByTimeParameter(),
                 aggregationExpressions,
                 measurementIndexes,
-                analysis.getGroupByLevelExpressions(),
-                analysis.getTypeProvider());
+                analysis.getGroupByLevelExpressions());
         if (queryStatement.isGroupByLevel()) {
           planBuilder = // plan Having with GroupByLevel
               planBuilder.planFilterAndTransform(
@@ -335,8 +330,7 @@ public class LogicalPlanVisitor extends 
StatementVisitor<PlanNode, MPPQueryConte
                 analysis.getGlobalTimeFilter(),
                 analysis.getGroupByTimeParameter(),
                 aggregationExpressions,
-                analysis.getGroupByLevelExpressions(),
-                analysis.getTypeProvider());
+                analysis.getGroupByLevelExpressions());
 
         if (queryStatement.isGroupByLevel()) {
           planBuilder = // plan Having with GroupByLevel
@@ -483,7 +477,7 @@ public class LogicalPlanVisitor extends 
StatementVisitor<PlanNode, MPPQueryConte
   @Override
   public PlanNode visitShowTimeSeries(
       ShowTimeSeriesStatement showTimeSeriesStatement, MPPQueryContext 
context) {
-    LogicalPlanBuilder planBuilder = new LogicalPlanBuilder(context);
+    LogicalPlanBuilder planBuilder = new LogicalPlanBuilder(analysis, context);
 
     // If there is only one region, we can push down the offset and limit 
operation to
     // source operator.
@@ -519,7 +513,7 @@ public class LogicalPlanVisitor extends 
StatementVisitor<PlanNode, MPPQueryConte
         && null != analysis.getDataPartitionInfo()
         && 0 != analysis.getDataPartitionInfo().getDataPartitionMap().size()) {
       PlanNode lastPlanNode =
-          new LogicalPlanBuilder(context)
+          new LogicalPlanBuilder(analysis, context)
               .planLast(
                   analysis.getSourceExpressions(),
                   analysis.getGlobalTimeFilter(),
@@ -541,7 +535,7 @@ public class LogicalPlanVisitor extends 
StatementVisitor<PlanNode, MPPQueryConte
   @Override
   public PlanNode visitShowDevices(
       ShowDevicesStatement showDevicesStatement, MPPQueryContext context) {
-    LogicalPlanBuilder planBuilder = new LogicalPlanBuilder(context);
+    LogicalPlanBuilder planBuilder = new LogicalPlanBuilder(analysis, context);
 
     // If there is only one region, we can push down the offset and limit 
operation to
     // source operator.
@@ -578,7 +572,7 @@ public class LogicalPlanVisitor extends 
StatementVisitor<PlanNode, MPPQueryConte
   @Override
   public PlanNode visitCountDevices(
       CountDevicesStatement countDevicesStatement, MPPQueryContext context) {
-    LogicalPlanBuilder planBuilder = new LogicalPlanBuilder(context);
+    LogicalPlanBuilder planBuilder = new LogicalPlanBuilder(analysis, context);
     return planBuilder
         .planDevicesCountSource(
             countDevicesStatement.getPathPattern(), 
countDevicesStatement.isPrefixPath())
@@ -589,7 +583,7 @@ public class LogicalPlanVisitor extends 
StatementVisitor<PlanNode, MPPQueryConte
   @Override
   public PlanNode visitCountTimeSeries(
       CountTimeSeriesStatement countTimeSeriesStatement, MPPQueryContext 
context) {
-    LogicalPlanBuilder planBuilder = new LogicalPlanBuilder(context);
+    LogicalPlanBuilder planBuilder = new LogicalPlanBuilder(analysis, context);
     return planBuilder
         .planTimeSeriesCountSource(
             countTimeSeriesStatement.getPathPattern(),
@@ -605,7 +599,7 @@ public class LogicalPlanVisitor extends 
StatementVisitor<PlanNode, MPPQueryConte
   @Override
   public PlanNode visitCountLevelTimeSeries(
       CountLevelTimeSeriesStatement countLevelTimeSeriesStatement, 
MPPQueryContext context) {
-    LogicalPlanBuilder planBuilder = new LogicalPlanBuilder(context);
+    LogicalPlanBuilder planBuilder = new LogicalPlanBuilder(analysis, context);
     return planBuilder
         .planLevelTimeSeriesCountSource(
             countLevelTimeSeriesStatement.getPathPattern(),
@@ -620,7 +614,7 @@ public class LogicalPlanVisitor extends 
StatementVisitor<PlanNode, MPPQueryConte
 
   @Override
   public PlanNode visitCountNodes(CountNodesStatement countStatement, 
MPPQueryContext context) {
-    LogicalPlanBuilder planBuilder = new LogicalPlanBuilder(context);
+    LogicalPlanBuilder planBuilder = new LogicalPlanBuilder(analysis, context);
     return planBuilder
         .planNodePathsSchemaSource(countStatement.getPathPattern(), 
countStatement.getLevel())
         .planSchemaQueryMerge(false)
@@ -710,7 +704,7 @@ public class LogicalPlanVisitor extends 
StatementVisitor<PlanNode, MPPQueryConte
   @Override
   public PlanNode visitSchemaFetch(
       SchemaFetchStatement schemaFetchStatement, MPPQueryContext context) {
-    LogicalPlanBuilder planBuilder = new LogicalPlanBuilder(context);
+    LogicalPlanBuilder planBuilder = new LogicalPlanBuilder(analysis, context);
     List<String> storageGroupList =
         new 
ArrayList<>(analysis.getSchemaPartitionInfo().getSchemaPartitionMap().keySet());
     return planBuilder
@@ -725,7 +719,7 @@ public class LogicalPlanVisitor extends 
StatementVisitor<PlanNode, MPPQueryConte
   @Override
   public PlanNode visitShowChildPaths(
       ShowChildPathsStatement showChildPathsStatement, MPPQueryContext 
context) {
-    LogicalPlanBuilder planBuilder = new LogicalPlanBuilder(context);
+    LogicalPlanBuilder planBuilder = new LogicalPlanBuilder(analysis, context);
     return planBuilder
         .planNodePathsSchemaSource(showChildPathsStatement.getPartialPath(), 
-1)
         .planSchemaQueryMerge(false)
@@ -736,7 +730,7 @@ public class LogicalPlanVisitor extends 
StatementVisitor<PlanNode, MPPQueryConte
   @Override
   public PlanNode visitShowChildNodes(
       ShowChildNodesStatement showChildNodesStatement, MPPQueryContext 
context) {
-    LogicalPlanBuilder planBuilder = new LogicalPlanBuilder(context);
+    LogicalPlanBuilder planBuilder = new LogicalPlanBuilder(analysis, context);
     return planBuilder
         .planNodePathsSchemaSource(showChildNodesStatement.getPartialPath(), 
-1)
         .planSchemaQueryMerge(false)
@@ -768,7 +762,7 @@ public class LogicalPlanVisitor extends 
StatementVisitor<PlanNode, MPPQueryConte
   @Override
   public PlanNode visitShowPathsUsingTemplate(
       ShowPathsUsingTemplateStatement showPathsUsingTemplateStatement, 
MPPQueryContext context) {
-    LogicalPlanBuilder planBuilder = new LogicalPlanBuilder(context);
+    LogicalPlanBuilder planBuilder = new LogicalPlanBuilder(analysis, context);
     planBuilder =
         planBuilder
             
.planPathsUsingTemplateSource(analysis.getTemplateSetInfo().left.getId())
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/SubPlanTypeExtractor.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/SubPlanTypeExtractor.java
new file mode 100644
index 0000000000..0c78183794
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/SubPlanTypeExtractor.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.planner;
+
+import org.apache.iotdb.db.mpp.plan.analyze.TypeProvider;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.SimplePlanVisitor;
+
+public class SubPlanTypeExtractor {
+
+  public static TypeProvider extractor(PlanNode root, TypeProvider allTypes) {
+    TypeProvider typeProvider = new TypeProvider();
+    root.accept(new Visitor(typeProvider, allTypes), null);
+    return typeProvider;
+  }
+
+  private static class Visitor extends SimplePlanVisitor<Void> {
+
+    private final TypeProvider typeProvider;
+    private final TypeProvider allTypes;
+
+    public Visitor(TypeProvider typeProvider, TypeProvider allTypes) {
+      this.typeProvider = typeProvider;
+      this.allTypes = allTypes;
+    }
+
+    @Override
+    public Void visitPlan(PlanNode node, Void context) {
+      node.getOutputColumnNames()
+          .forEach(name -> typeProvider.setType(name, allTypes.getType(name)));
+      for (PlanNode source : node.getChildren()) {
+        source.accept(this, context);
+      }
+      return null;
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java
index d5c73f6822..2559f46e4b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SimpleFragmentParallelPlanner.java
@@ -112,7 +112,7 @@ public class SimpleFragmentParallelPlanner implements IFragmentParallelPlaner {
     fragmentInstance.setDataRegionAndHost(regionReplicaSet);
     fragmentInstance.setHostDataNode(selectTargetDataNode(regionReplicaSet));
 
-    fragmentInstance.getFragment().setTypeProvider(analysis.getTypeProvider());
+    fragmentInstance.getFragment().generateTypeProvider(queryContext.getTypeProvider());
     instanceMap.putIfAbsent(fragment.getId(), fragmentInstance);
     fragmentInstanceList.add(fragmentInstance);
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
index b063c4cb62..c608e09e1b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
@@ -315,7 +315,7 @@ public class SourceRewriter extends 
SimplePlanNodeRewriter<DistributionPlanConte
     leafAggDescriptorList.forEach(
         d ->
             LogicalPlanBuilder.updateTypeProviderByPartialAggregation(
-                d, analysis.getTypeProvider()));
+                d, context.queryContext.getTypeProvider()));
     List<AggregationDescriptor> rootAggDescriptorList = new ArrayList<>();
     node.getAggregationDescriptorList()
         .forEach(
@@ -532,7 +532,7 @@ public class SourceRewriter extends 
SimplePlanNodeRewriter<DistributionPlanConte
     rootAggDescriptorList.forEach(
         d ->
             LogicalPlanBuilder.updateTypeProviderByPartialAggregation(
-                d, analysis.getTypeProvider()));
+                d, context.queryContext.getTypeProvider()));
     checkArgument(
         sources.size() > 0, "Aggregation sources should not be empty when distribution planning");
     SeriesAggregationSourceNode seed = sources.get(0);
@@ -590,7 +590,7 @@ public class SourceRewriter extends 
SimplePlanNodeRewriter<DistributionPlanConte
             : groupSourcesForGroupByLevel(root, sourceGroup, context);
 
     // Then, we calculate the attributes for GroupByLevelNode in each level
-    calculateGroupByLevelNodeAttributes(newRoot, 0);
+    calculateGroupByLevelNodeAttributes(newRoot, 0, context);
     return newRoot;
   }
 
@@ -672,11 +672,13 @@ public class SourceRewriter extends 
SimplePlanNodeRewriter<DistributionPlanConte
   }
 
   // TODO: (xingtanzjr) consider to implement the descriptor construction in 
every class
-  private void calculateGroupByLevelNodeAttributes(PlanNode node, int level) {
+  private void calculateGroupByLevelNodeAttributes(
+      PlanNode node, int level, DistributionPlanContext context) {
     if (node == null) {
       return;
     }
-    node.getChildren().forEach(child -> 
calculateGroupByLevelNodeAttributes(child, level + 1));
+    node.getChildren()
+        .forEach(child -> calculateGroupByLevelNodeAttributes(child, level + 
1, context));
 
     // Construct all outputColumns from children. Using Set here to avoid 
duplication
     Set<String> childrenOutputColumns = new HashSet<>();
@@ -697,7 +699,7 @@ public class SourceRewriter extends 
SimplePlanNodeRewriter<DistributionPlanConte
         if (keep) {
           descriptorList.add(originalDescriptor);
           LogicalPlanBuilder.updateTypeProviderByPartialAggregation(
-              originalDescriptor, analysis.getTypeProvider());
+              originalDescriptor, context.queryContext.getTypeProvider());
         }
       }
       handle.setAggregationDescriptorList(descriptorList);
@@ -730,7 +732,7 @@ public class SourceRewriter extends 
SimplePlanNodeRewriter<DistributionPlanConte
         descriptor.setInputExpressions(new ArrayList<>(descriptorExpressions));
         descriptorList.add(descriptor);
         LogicalPlanBuilder.updateTypeProviderByPartialAggregation(
-            descriptor, analysis.getTypeProvider());
+            descriptor, context.queryContext.getTypeProvider());
       }
       handle.setGroupByLevelDescriptors(descriptorList);
     }
@@ -779,7 +781,7 @@ public class SourceRewriter extends 
SimplePlanNodeRewriter<DistributionPlanConte
               d -> {
                 d.setStep(isFinal ? AggregationStep.FINAL : 
AggregationStep.PARTIAL);
                 LogicalPlanBuilder.updateTypeProviderByPartialAggregation(
-                    d, analysis.getTypeProvider());
+                    d, context.queryContext.getTypeProvider());
               });
     }
     return sources;
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/PlanFragment.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/PlanFragment.java
index 000d94e9eb..c21cfa28fc 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/PlanFragment.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/PlanFragment.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.commons.partition.DataPartition;
 import org.apache.iotdb.db.mpp.common.PlanFragmentId;
 import org.apache.iotdb.db.mpp.plan.analyze.TypeProvider;
+import org.apache.iotdb.db.mpp.plan.planner.SubPlanTypeExtractor;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.IPartitionRelatedNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
@@ -36,8 +37,10 @@ import java.util.Objects;
 /** PlanFragment contains a sub-query of distributed query. */
 public class PlanFragment {
   // TODO once you add field for this class you need to change the serialize 
and deserialize methods
-  private PlanFragmentId id;
+  private final PlanFragmentId id;
   private PlanNode planNodeTree;
+
+  // map from output column name (for every node) to its datatype
   private TypeProvider typeProvider;
 
   // indicate whether this PlanFragment is the root of the whole 
Fragment-Plan-Tree or not
@@ -69,6 +72,10 @@ public class PlanFragment {
     this.typeProvider = typeProvider;
   }
 
+  public void generateTypeProvider(TypeProvider allTypes) { // replaces the current typeProvider
+    this.typeProvider = SubPlanTypeExtractor.extractor(planNodeTree, allTypes);
+  } // NOTE(review): presumably keeps only the types planNodeTree needs - confirm in extractor
+
   public boolean isRoot() {
     return isRoot;
   }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/SimplePlanVisitor.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/SimplePlanVisitor.java
new file mode 100644
index 0000000000..7b26611ee4
--- /dev/null
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/SimplePlanVisitor.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.planner.plan.node;
+
+public class SimplePlanVisitor<C> extends PlanVisitor<Void, C> { // C: caller-defined context type
+  @Override
+  public Void visitPlan(PlanNode node, C context) { // visits each child of node, returns null
+    for (PlanNode source : node.getChildren()) { // iterates in getChildren() order
+      source.accept(this, context); // same visitor and same context object for every child
+    }
+    return null; // Void result: nothing is collected from the children
+  }
+}
diff --git 
a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/LastQueryTest.java
 
b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/LastQueryTest.java
index b150d8d6f8..9361038cae 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/LastQueryTest.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/LastQueryTest.java
@@ -21,12 +21,10 @@ package org.apache.iotdb.db.mpp.plan.plan.distribution;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.db.metadata.path.AlignedPath;
 import org.apache.iotdb.db.metadata.path.MeasurementPath;
 import org.apache.iotdb.db.mpp.common.MPPQueryContext;
 import org.apache.iotdb.db.mpp.common.QueryId;
-import org.apache.iotdb.db.mpp.plan.expression.Expression;
-import org.apache.iotdb.db.mpp.plan.expression.leaf.TimeSeriesOperand;
-import org.apache.iotdb.db.mpp.plan.planner.LogicalPlanBuilder;
 import org.apache.iotdb.db.mpp.plan.planner.distribution.DistributionPlanner;
 import org.apache.iotdb.db.mpp.plan.planner.plan.DistributedQueryPlan;
 import org.apache.iotdb.db.mpp.plan.planner.plan.LogicalQueryPlan;
@@ -35,16 +33,17 @@ import 
org.apache.iotdb.db.mpp.plan.planner.plan.node.process.ExchangeNode;
 import 
org.apache.iotdb.db.mpp.plan.planner.plan.node.process.last.LastQueryCollectNode;
 import 
org.apache.iotdb.db.mpp.plan.planner.plan.node.process.last.LastQueryMergeNode;
 import 
org.apache.iotdb.db.mpp.plan.planner.plan.node.process.last.LastQueryNode;
+import 
org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedLastQueryScanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.LastQueryScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.OrderByParameter;
 
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.HashSet;
 import java.util.List;
-import java.util.Set;
 
 public class LastQueryTest {
 
@@ -197,12 +196,21 @@ public class LastQueryTest {
 
   private LogicalQueryPlan constructLastQuery(List<String> paths, 
MPPQueryContext context)
       throws IllegalPathException {
-    LogicalPlanBuilder builder = new LogicalPlanBuilder(context);
-    Set<Expression> expressions = new HashSet<>();
+    List<PlanNode> sourceNodeList = new ArrayList<>(); // receives one scan node per entry of paths
     for (String path : paths) {
-      expressions.add(new TimeSeriesOperand(new MeasurementPath(path)));
+      MeasurementPath selectPath = new MeasurementPath(path);
+      if (selectPath.isUnderAlignedEntity()) { // aligned paths are wrapped in an AlignedPath
+        sourceNodeList.add(
+            new AlignedLastQueryScanNode(
+                context.getQueryId().genPlanNodeId(), new 
AlignedPath(selectPath)));
+      } else { // any other path is scanned with a plain LastQueryScanNode
+        sourceNodeList.add(new 
LastQueryScanNode(context.getQueryId().genPlanNodeId(), selectPath));
+      }
     }
-    PlanNode root = builder.planLast(expressions, null, new 
OrderByParameter()).getRoot();
+
+    PlanNode root = // the LastQueryNode is constructed directly over the collected scan nodes
+        new LastQueryNode(
+            context.getQueryId().genPlanNodeId(), sourceNodeList, null, new 
OrderByParameter());
     return new LogicalQueryPlan(context, root);
   }
 }

Reply via email to