This is an automated email from the ASF dual-hosted git repository.

caogaofei pushed a commit to branch beyyes/TableModelGrammar_0627
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 47d2b5248e03b5b5bc6b4a816dad430fe5967e7c
Merge: c7e2a13f7e3 751420c9eb9
Author: Beyyes <[email protected]>
AuthorDate: Wed Jul 3 17:30:37 2024 +0800

    merge

 .../iotdb/itbase/runtime/ClusterTestStatement.java |  11 +-
 .../relational/it/schema/IoTDBDatabaseIT.java      |   4 +-
 .../iotdb/relational/it/schema/IoTDBTableIT.java   | 203 +++++++++++++++++++++
 .../CnToDnInternalServiceAsyncRequestManager.java  |   5 +
 .../load/subscriber/NodeStatisticsChangeEvent.java |  31 ----
 .../impl/schema/table/CreateTableProcedure.java    |   8 +-
 .../relational/ColumnTransformerBuilder.java       |  55 +++---
 .../execution/config/TableConfigTaskVisitor.java   |  65 ++++---
 .../config/executor/ClusterConfigTaskExecutor.java |  13 +-
 .../relational/AlterTableAddColumnTask.java        |  14 +-
 .../config/metadata/relational/ShowTablesTask.java |   7 +-
 .../plan/planner/plan/node/PlanNodeType.java       |   3 +-
 .../plan/planner/plan/node/PlanVisitor.java        |   5 +
 .../relational/analyzer/ExpressionAnalyzer.java    |  43 ++---
 .../predicate/ConvertPredicateToFilterVisitor.java |   8 +
 .../plan/relational/metadata/ColumnSchema.java     |  12 ++
 .../relational/metadata/TableMetadataImpl.java     |  39 ++--
 .../metadata/fetcher/TableDeviceSchemaFetcher.java |  15 +-
 .../fetcher/TableHeaderSchemaValidator.java        |  78 +++++++-
 .../plan/relational/planner/LogicalPlanner.java    |  27 +--
 .../distribute/DistributedPlanGenerator.java       |   6 +-
 .../plan/relational/planner/node/CollectNode.java  |  73 +++++++-
 .../plan/relational/planner/node/OutputNode.java   |   1 +
 .../plan/relational/type/InternalTypeManager.java  |  16 ++
 .../dag/column/CaseWhenThenColumnTransformer.java  |  13 +-
 .../dag/column/ColumnTransformer.java              |   8 +-
 .../binary/CompareBinaryColumnTransformer.java     |   4 +-
 .../binary/CompareEqualToColumnTransformer.java    |  12 +-
 .../binary/CompareNonEqualColumnTransformer.java   |  12 +-
 .../column/ternary/BetweenColumnTransformer.java   |   5 +-
 .../ternary/CompareTernaryColumnTransformer.java   |   9 +-
 .../dag/column/unary/InColumnTransformer.java      |   8 +
 .../dag/column/unary/RegularColumnTransformer.java |   5 +-
 .../scalar/CastFunctionColumnTransformer.java      |  38 ++++
 .../db/schemaengine/table/DataNodeTableCache.java  |  10 +-
 .../compaction/io/CompactionTsFileWriter.java      |   1 +
 .../apache/iotdb/db/utils/ErrorHandlingUtils.java  |   6 +-
 .../execution/operator/OperatorMemoryTest.java     |  19 +-
 .../plan/relational/analyzer/AnalyzerTest.java     |   4 +-
 .../plan/relational/analyzer/TestMatadata.java     |  55 +-----
 .../compaction/CompactionValidationTest.java       |   6 +-
 .../apache/iotdb/commons/schema/table/TsTable.java |   4 +
 42 files changed, 686 insertions(+), 275 deletions(-)

diff --cc 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/DistributedPlanGenerator.java
index 3c605c78640,00000000000..458bc53ca1c
mode 100644,000000..100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/DistributedPlanGenerator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/DistributedPlanGenerator.java
@@@ -1,445 -1,0 +1,447 @@@
 +/*
 + * Licensed under the Apache License, Version 2.0 (the "License");
 + * you may not use this file except in compliance with the License.
 + * You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.iotdb.db.queryengine.plan.relational.planner.distribute;
 +
 +import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 +import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
 +import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
 +import 
org.apache.iotdb.db.queryengine.plan.planner.distribution.NodeDistribution;
 +import 
org.apache.iotdb.db.queryengine.plan.planner.distribution.NodeDistributionType;
 +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
 +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
 +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
 +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode;
 +import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.AbstractSchemaMergeNode;
 +import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.SchemaQueryMergeNode;
 +import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.TableDeviceSourceNode;
 +import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ExchangeNode;
 +import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SingleChildProcessNode;
 +import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SourceNode;
 +import org.apache.iotdb.db.queryengine.plan.relational.analyzer.Analysis;
 +import org.apache.iotdb.db.queryengine.plan.relational.metadata.DeviceEntry;
 +import org.apache.iotdb.db.queryengine.plan.relational.planner.OrderingScheme;
 +import org.apache.iotdb.db.queryengine.plan.relational.planner.SortOrder;
 +import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
 +import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.FilterNode;
 +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.LimitNode;
 +import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.MergeSortNode;
 +import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.OffsetNode;
 +import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.OutputNode;
 +import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.ProjectNode;
 +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SortNode;
 +import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableScanNode;
 +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression;
 +
 +import java.util.ArrayList;
 +import java.util.Collections;
 +import java.util.Comparator;
 +import java.util.HashMap;
 +import java.util.HashSet;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Set;
 +import java.util.function.Function;
 +import java.util.stream.Collectors;
 +import java.util.stream.IntStream;
 +
 +import static com.google.common.collect.ImmutableList.toImmutableList;
 +import static org.apache.iotdb.commons.conf.IoTDBConstant.TIME;
 +import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.PushPredicateIntoTableScan.containsDiffFunction;
 +
 +public class DistributedPlanGenerator
 +    extends PlanVisitor<List<PlanNode>, DistributedPlanGenerator.PlanContext> 
{
 +  private final MPPQueryContext queryContext;
 +  private final Analysis analysis;
 +  Map<PlanNodeId, OrderingScheme> planNodeOrderingSchemeMap = new HashMap<>();
 +
 +  public DistributedPlanGenerator(MPPQueryContext queryContext, Analysis 
analysis) {
 +    this.queryContext = queryContext;
 +    this.analysis = analysis;
 +  }
 +
 +  public List<PlanNode> genResult(PlanNode node, PlanContext context) {
 +    return node.accept(this, context);
 +  }
 +
 +  @Override
 +  public List<PlanNode> visitPlan(PlanNode node, 
DistributedPlanGenerator.PlanContext context) {
 +    if (node instanceof WritePlanNode) {
 +      return Collections.singletonList(node);
 +    }
 +
 +    List<List<PlanNode>> children =
 +        node.getChildren().stream()
 +            .map(child -> child.accept(this, context))
 +            .collect(toImmutableList());
 +
 +    PlanNode newNode = node.clone();
 +    for (List<PlanNode> planNodes : children) {
 +      planNodes.forEach(newNode::addChild);
 +    }
 +    return Collections.singletonList(newNode);
 +  }
 +
 +  @Override
 +  public List<PlanNode> visitOutput(OutputNode outputNode, PlanContext 
context) {
 +    // TODO only consider the order of IDs
 +    context.expectedOrderingScheme =
 +        new OrderingScheme(
 +            outputNode.getOutputSymbols(),
 +            outputNode.getOutputSymbols().stream()
 +                .collect(Collectors.toMap(symbol -> symbol, symbol -> 
SortOrder.ASC_NULLS_LAST)));
 +
 +    List<PlanNode> childrenNodes = outputNode.getChild().accept(this, 
context);
 +    if (childrenNodes.size() == 1) {
 +      outputNode.setChild(childrenNodes.get(0));
 +      return Collections.singletonList(outputNode);
 +    }
 +
 +    return connectViaMergeSort(outputNode, childrenNodes);
 +  }
 +
 +  @Override
 +  public List<PlanNode> visitLimit(LimitNode limitNode, PlanContext context) {
 +    List<PlanNode> childrenNodes = limitNode.getChild().accept(this, context);
 +    if (childrenNodes.size() == 1) {
 +      limitNode.setChild(childrenNodes.get(0));
 +      return Collections.singletonList(limitNode);
 +    }
 +
 +    return connectViaMergeSort(limitNode, childrenNodes);
 +  }
 +
 +  @Override
 +  public List<PlanNode> visitOffset(OffsetNode offsetNode, PlanContext 
context) {
 +    List<PlanNode> childrenNodes = offsetNode.getChild().accept(this, 
context);
 +    if (childrenNodes.size() == 1) {
 +      offsetNode.setChild(childrenNodes.get(0));
 +      return Collections.singletonList(offsetNode);
 +    }
 +
 +    return connectViaMergeSort(offsetNode, childrenNodes);
 +  }
 +
 +  @Override
 +  public List<PlanNode> visitProject(ProjectNode projectNode, PlanContext 
context) {
 +    List<PlanNode> childrenNodes = projectNode.getChild().accept(this, 
context);
 +    if (childrenNodes.size() == 1) {
 +      projectNode.setChild(childrenNodes.get(0));
 +      return Collections.singletonList(projectNode);
 +    }
 +
 +    for (Expression expression : 
projectNode.getAssignments().getMap().values()) {
 +      if (containsDiffFunction(expression)) {
 +        return connectViaMergeSort(projectNode, childrenNodes);
 +      }
 +    }
 +
 +    return childrenNodes.stream()
 +        .map(
 +            child ->
 +                new ProjectNode(
 +                    queryContext.getQueryId().genPlanNodeId(), child, 
projectNode.getAssignments()))
 +        .collect(Collectors.toList());
 +  }
 +
 +  @Override
 +  public List<PlanNode> visitSort(SortNode sortNode, PlanContext context) {
 +    context.expectedOrderingScheme = sortNode.getOrderingScheme();
 +    context.hasSortNode = true;
 +
 +    List<PlanNode> childrenNodes = sortNode.getChild().accept(this, context);
 +    if (childrenNodes.size() == 1) {
 +      sortNode.setChild(childrenNodes.get(0));
 +      return Collections.singletonList(sortNode);
 +    }
 +
 +    MergeSortNode mergeSortNode =
 +        new MergeSortNode(
 +            queryContext.getQueryId().genPlanNodeId(),
 +            sortNode.getOrderingScheme(),
 +            sortNode.getOutputSymbols());
 +    for (PlanNode child : childrenNodes) {
 +      SortNode subSortNode =
 +          new SortNode(
 +              queryContext.getQueryId().genPlanNodeId(),
 +              child,
 +              sortNode.getOrderingScheme(),
 +              false);
 +      mergeSortNode.addChild(subSortNode);
 +      planNodeOrderingSchemeMap.put(subSortNode.getPlanNodeId(), 
sortNode.getOrderingScheme());
 +    }
 +    planNodeOrderingSchemeMap.put(mergeSortNode.getPlanNodeId(), 
sortNode.getOrderingScheme());
 +    return Collections.singletonList(mergeSortNode);
 +  }
 +
 +  @Override
 +  public List<PlanNode> visitFilter(FilterNode filterNode, PlanContext 
context) {
 +    List<PlanNode> childrenNodes = filterNode.getChild().accept(this, 
context);
 +    if (childrenNodes.size() == 1) {
 +      filterNode.setChild(childrenNodes.get(0));
 +      return Collections.singletonList(filterNode);
 +    }
 +
 +    if (containsDiffFunction(filterNode.getPredicate())) {
 +      return connectViaMergeSort(filterNode, childrenNodes);
 +    }
 +
 +    return childrenNodes.stream()
 +        .map(
 +            child ->
 +                new FilterNode(
 +                    queryContext.getQueryId().genPlanNodeId(), child, 
filterNode.getPredicate()))
 +        .collect(Collectors.toList());
 +  }
 +
 +  @Override
 +  public List<PlanNode> visitTableScan(TableScanNode node, PlanContext 
context) {
 +
 +    Map<TRegionReplicaSet, TableScanNode> tableScanNodeMap = new HashMap<>();
 +
 +    for (DeviceEntry deviceEntry : node.getDeviceEntries()) {
 +      List<TRegionReplicaSet> regionReplicaSets =
 +          analysis
 +              .getDataPartitionInfo()
 +              .getDataRegionReplicaSetWithTimeFilter(
 +                  node.getQualifiedObjectName().getDatabaseName(),
 +                  deviceEntry.getDeviceID(),
 +                  node.getTimeFilter());
 +      for (TRegionReplicaSet regionReplicaSet : regionReplicaSets) {
 +        TableScanNode tableScanNode =
 +            tableScanNodeMap.computeIfAbsent(
 +                regionReplicaSet,
 +                k -> {
 +                  TableScanNode scanNode =
 +                      new TableScanNode(
 +                          queryContext.getQueryId().genPlanNodeId(),
 +                          node.getQualifiedObjectName(),
 +                          node.getOutputSymbols(),
 +                          node.getAssignments(),
 +                          new ArrayList<>(),
 +                          node.getIdAndAttributeIndexMap(),
 +                          node.getScanOrder(),
 +                          node.getTimePredicate().orElse(null),
 +                          node.getPushDownPredicate());
 +                  scanNode.setRegionReplicaSet(regionReplicaSet);
 +                  return scanNode;
 +                });
 +        tableScanNode.appendDeviceEntry(deviceEntry);
 +      }
 +    }
 +
 +    context.hasExchangeNode = tableScanNodeMap.size() > 1;
 +
 +    List<PlanNode> tableScanNodeList = new ArrayList<>();
 +    TRegionReplicaSet mostUsedDataRegion = null;
 +    int maxDeviceEntrySizeOfTableScan = 0;
 +    for (Map.Entry<TRegionReplicaSet, TableScanNode> entry : 
tableScanNodeMap.entrySet()) {
 +      TRegionReplicaSet regionReplicaSet = entry.getKey();
 +      TableScanNode subTableScanNode = entry.getValue();
 +      
subTableScanNode.setPlanNodeId(queryContext.getQueryId().genPlanNodeId());
 +      subTableScanNode.setRegionReplicaSet(regionReplicaSet);
 +      tableScanNodeList.add(subTableScanNode);
 +
 +      if (mostUsedDataRegion == null
 +          || subTableScanNode.getDeviceEntries().size() > 
maxDeviceEntrySizeOfTableScan) {
 +        mostUsedDataRegion = regionReplicaSet;
 +        maxDeviceEntrySizeOfTableScan = 
subTableScanNode.getDeviceEntries().size();
 +      }
 +    }
 +    context.mostUsedDataRegion = mostUsedDataRegion;
 +
 +    List<Symbol> newOrderingSymbols = new ArrayList<>();
 +    List<SortOrder> newSortOrders = new ArrayList<>();
 +    OrderingScheme expectedOrderingScheme = context.expectedOrderingScheme;
 +
 +    for (Symbol symbol : expectedOrderingScheme.getOrderBy()) {
 +      if (!context.hasSortNode && TIME.equalsIgnoreCase(symbol.getName())) {
 +        continue;
 +      }
 +
 +      if (!node.getIdAndAttributeIndexMap().containsKey(symbol)) {
 +        break;
 +      }
 +
 +      newOrderingSymbols.add(symbol);
 +      newSortOrders.add(expectedOrderingScheme.getOrdering(symbol));
 +    }
 +
 +    List<Function<DeviceEntry, String>> orderingRules = new ArrayList<>();
 +    for (Symbol symbol : newOrderingSymbols) {
 +      int idx = node.getIdAndAttributeIndexMap().get(symbol);
 +      if (node.getAssignments().get(symbol).getColumnCategory() == 
TsTableColumnCategory.ID) {
 +        // segments[0] is always tableName
 +        orderingRules.add(deviceEntry -> (String) 
deviceEntry.getDeviceID().getSegments()[idx + 1]);
 +      } else {
 +        orderingRules.add(deviceEntry -> 
deviceEntry.getAttributeColumnValues().get(idx));
 +      }
 +    }
 +
 +    Comparator<DeviceEntry> comparator;
 +    if (newSortOrders.get(0).isNullsFirst()) {
 +      if (newSortOrders.get(0).isAscending()) {
 +        comparator = 
Comparator.nullsFirst(Comparator.comparing(orderingRules.get(0)));
 +      } else {
 +        comparator = 
Comparator.nullsFirst(Comparator.comparing(orderingRules.get(0))).reversed();
 +      }
 +    } else {
 +      if (newSortOrders.get(0).isAscending()) {
 +        comparator = 
Comparator.nullsLast(Comparator.comparing(orderingRules.get(0)));
 +      } else {
 +        comparator = 
Comparator.nullsLast(Comparator.comparing(orderingRules.get(0))).reversed();
 +      }
 +    }
 +    for (int i = 1; i < orderingRules.size(); i++) {
 +      Comparator<DeviceEntry> thenComparator;
 +      if (newSortOrders.get(i).isNullsFirst()) {
 +        if (newSortOrders.get(i).isAscending()) {
 +          thenComparator = 
Comparator.nullsFirst(Comparator.comparing(orderingRules.get(i)));
 +        } else {
-           thenComparator = 
Comparator.nullsFirst(Comparator.comparing(orderingRules.get(i))).reversed();
++          thenComparator =
++              
Comparator.nullsFirst(Comparator.comparing(orderingRules.get(i))).reversed();
 +        }
 +      } else {
 +        if (newSortOrders.get(i).isAscending()) {
 +          thenComparator = 
Comparator.nullsLast(Comparator.comparing(orderingRules.get(i)));
 +        } else {
-           thenComparator = 
Comparator.nullsLast(Comparator.comparing(orderingRules.get(i))).reversed();
++          thenComparator =
++              
Comparator.nullsLast(Comparator.comparing(orderingRules.get(i))).reversed();
 +        }
 +      }
 +      comparator = comparator.thenComparing(thenComparator);
 +    }
 +
 +    OrderingScheme newOrderingScheme =
 +        new OrderingScheme(
 +            newOrderingSymbols,
 +            IntStream.range(0, newOrderingSymbols.size())
 +                .boxed()
 +                .collect(Collectors.toMap(newOrderingSymbols::get, 
newSortOrders::get)));
 +    for (PlanNode planNode : tableScanNodeList) {
 +      TableScanNode tableScanNode = (TableScanNode) planNode;
 +      planNodeOrderingSchemeMap.put(tableScanNode.getPlanNodeId(), 
newOrderingScheme);
 +      List<DeviceEntry> deviceEntries = tableScanNode.getDeviceEntries();
 +      deviceEntries.sort(comparator);
 +    }
 +
 +    return tableScanNodeList;
 +  }
 +
 +  private List<PlanNode> connectViaMergeSort(
 +      SingleChildProcessNode node, List<PlanNode> childrenNodes) {
 +    OrderingScheme childrenOrderingScheme =
 +        planNodeOrderingSchemeMap.get(childrenNodes.get(0).getPlanNodeId());
 +    MergeSortNode mergeSortNode =
 +        new MergeSortNode(
 +            queryContext.getQueryId().genPlanNodeId(),
 +            childrenOrderingScheme,
 +            node.getOutputSymbols());
 +    childrenNodes.forEach(mergeSortNode::addChild);
 +    node.setChild(mergeSortNode);
 +    planNodeOrderingSchemeMap.put(node.getPlanNodeId(), 
childrenOrderingScheme);
 +    return Collections.singletonList(node);
 +  }
 +
 +  // ------------------- schema related interface 
---------------------------------------------
 +
 +  @Override
 +  public List<PlanNode> visitSchemaQueryMerge(SchemaQueryMergeNode node, 
PlanContext context) {
 +    return Collections.singletonList(
 +        addExchangeNodeForSchemaMerge(rewriteSchemaQuerySource(node, 
context), context));
 +  }
 +
 +  private SchemaQueryMergeNode rewriteSchemaQuerySource(
 +      SchemaQueryMergeNode node, PlanContext context) {
 +    SchemaQueryMergeNode root = (SchemaQueryMergeNode) node.clone();
 +
 +    String database = ((TableDeviceSourceNode) 
node.getChildren().get(0)).getDatabase();
 +    Set<TRegionReplicaSet> schemaRegionSet = new HashSet<>();
 +    analysis
 +        .getSchemaPartitionInfo()
 +        .getSchemaPartitionMap()
 +        .get(database)
 +        .forEach(
 +            (deviceGroupId, schemaRegionReplicaSet) -> 
schemaRegionSet.add(schemaRegionReplicaSet));
 +
 +    for (PlanNode child : node.getChildren()) {
 +      for (TRegionReplicaSet schemaRegion : schemaRegionSet) {
 +        SourceNode clonedChild = (SourceNode) child.clone();
 +        clonedChild.setPlanNodeId(queryContext.getQueryId().genPlanNodeId());
 +        clonedChild.setRegionReplicaSet(schemaRegion);
 +        root.addChild(clonedChild);
 +      }
 +    }
 +    return root;
 +  }
 +
 +  private PlanNode addExchangeNodeForSchemaMerge(
 +      AbstractSchemaMergeNode node, PlanContext context) {
 +    node.getChildren()
 +        .forEach(
 +            child ->
 +                context.putNodeDistribution(
 +                    child.getPlanNodeId(),
 +                    new NodeDistribution(
 +                        NodeDistributionType.NO_CHILD,
 +                        ((SourceNode) child).getRegionReplicaSet())));
 +    NodeDistribution nodeDistribution =
 +        new 
NodeDistribution(NodeDistributionType.DIFFERENT_FROM_ALL_CHILDREN);
 +    PlanNode newNode = node.clone();
 +    
nodeDistribution.setRegion(calculateSchemaRegionByChildren(node.getChildren(), 
context));
 +    context.putNodeDistribution(newNode.getPlanNodeId(), nodeDistribution);
 +    node.getChildren()
 +        .forEach(
 +            child -> {
 +              if (!nodeDistribution
 +                  .getRegion()
 +                  
.equals(context.getNodeDistribution(child.getPlanNodeId()).getRegion())) {
 +                ExchangeNode exchangeNode =
 +                    new 
ExchangeNode(queryContext.getQueryId().genPlanNodeId());
 +                exchangeNode.setChild(child);
 +                
exchangeNode.setOutputColumnNames(child.getOutputColumnNames());
 +                context.hasExchangeNode = true;
 +                newNode.addChild(exchangeNode);
 +              } else {
 +                newNode.addChild(child);
 +              }
 +            });
 +    return newNode;
 +  }
 +
 +  private TRegionReplicaSet calculateSchemaRegionByChildren(
 +      List<PlanNode> children, PlanContext context) {
 +    // We always make the schemaRegion of SchemaMergeNode to be the same as 
its first child.
 +    return 
context.getNodeDistribution(children.get(0).getPlanNodeId()).getRegion();
 +  }
 +
 +  public static class PlanContext {
 +    final Map<PlanNodeId, NodeDistribution> nodeDistributionMap;
 +    boolean hasExchangeNode = false;
 +    boolean hasSortNode = false;
 +    OrderingScheme expectedOrderingScheme;
 +    TRegionReplicaSet mostUsedDataRegion;
 +
 +    public PlanContext() {
 +      this.nodeDistributionMap = new HashMap<>();
 +    }
 +
 +    public NodeDistribution getNodeDistribution(PlanNodeId nodeId) {
 +      return this.nodeDistributionMap.get(nodeId);
 +    }
 +
 +    public void putNodeDistribution(PlanNodeId nodeId, NodeDistribution 
distribution) {
 +      this.nodeDistributionMap.put(nodeId, distribution);
 +    }
 +  }
 +}
diff --cc 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/CollectNode.java
index 85444b62264,87d3ce34afa..332277013ba
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/CollectNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/CollectNode.java
@@@ -1,4 -1,56 +1,75 @@@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *     http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied.  See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ */
++
  package org.apache.iotdb.db.queryengine.plan.relational.planner.node;
  
- public class CollectNode {
+ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
+ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
+ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
+ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.MultiChildProcessNode;
+ import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
+ 
+ import java.io.DataOutputStream;
+ import java.io.IOException;
+ import java.nio.ByteBuffer;
+ import java.util.List;
+ 
+ /** CollectNode output the content of children. */
+ public class CollectNode extends MultiChildProcessNode {
+ 
+   public CollectNode(PlanNodeId id) {
+     super(id);
+   }
+ 
+   @Override
+   public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+     return visitor.visitCollect(this, context);
+   }
+ 
+   @Override
+   public PlanNode clone() {
+     return new CollectNode(id);
+   }
+ 
+   @Override
+   public List<Symbol> getOutputSymbols() {
+     return children.get(0).getOutputSymbols();
+   }
+ 
+   @Override
+   public List<String> getOutputColumnNames() {
+     throw new UnsupportedOperationException();
+   }
+ 
+   @Override
+   protected void serializeAttributes(ByteBuffer byteBuffer) {
+     PlanNodeType.TABLE_COLLECT_NODE.serialize(byteBuffer);
+   }
+ 
+   @Override
+   protected void serializeAttributes(DataOutputStream stream) throws 
IOException {
+     PlanNodeType.TABLE_COLLECT_NODE.serialize(stream);
+   }
+ 
+   public static CollectNode deserialize(ByteBuffer byteBuffer) {
+     PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
+     return new CollectNode(planNodeId);
+   }
  }
diff --cc 
iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/AnalyzerTest.java
index d10b3354605,0369e96d111..06fd8dc24ef
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/AnalyzerTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/AnalyzerTest.java
@@@ -225,27 -218,6 +225,29 @@@ public class AnalyzerTest 
      assertNull(tableScanNode.getPushDownPredicate());
      assertEquals(ASC, tableScanNode.getScanOrder());
      distributionPlanner = new TableDistributionPlanner(actualAnalysis, 
logicalQueryPlan, context);
 +    distributedQueryPlan = distributionPlanner.plan();
 +    assertEquals(3, distributedQueryPlan.getFragments().size());
 +    assertTrue(
 +        
distributedQueryPlan.getFragments().get(0).getPlanNodeTree().getChildren().get(0)
 +            instanceof OutputNode);
 +    OutputNode outputNode =
 +        (OutputNode)
 +            
distributedQueryPlan.getFragments().get(0).getPlanNodeTree().getChildren().get(0);
 +    assertTrue(outputNode.getChildren().get(0) instanceof MergeSortNode);
 +    MergeSortNode mergeSortNode = (MergeSortNode) 
outputNode.getChildren().get(0);
 +    assertEquals(
 +        Arrays.asList("tag1", "tag2", "tag3", "attr1", "attr2"),
 +        mergeSortNode.getOrderingScheme().getOrderBy().stream()
 +            .map(Symbol::getName)
 +            .collect(Collectors.toList()));
 +    assertTrue(mergeSortNode.getChildren().get(0) instanceof ExchangeNode);
 +    assertTrue(mergeSortNode.getChildren().get(1) instanceof TableScanNode);
 +    assertTrue(mergeSortNode.getChildren().get(2) instanceof ExchangeNode);
 +    TableScanNode tableScanNode = (TableScanNode) 
mergeSortNode.getChildren().get(1);
 +    assertEquals(4, tableScanNode.getDeviceEntries().size());
-     assertEquals(Arrays.asList(), 
tableScanNode.getDeviceEntries().stream().map(d -> d.getDeviceID().toString()));
++    assertEquals(
++        Arrays.asList(),
++        tableScanNode.getDeviceEntries().stream().map(d -> 
d.getDeviceID().toString()));
    }
  
    @Test
diff --cc 
iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/TestMatadata.java
index b20d1e0dc83,0584cf62dfb..6942b7863ed
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/TestMatadata.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/TestMatadata.java
@@@ -52,24 -53,12 +52,24 @@@ import java.util.Locale
  import java.util.Map;
  import java.util.Optional;
  
 +import static 
org.apache.iotdb.db.queryengine.plan.relational.analyzer.MockTableModelDataPartition.DEVICE_1;
 +import static 
org.apache.iotdb.db.queryengine.plan.relational.analyzer.MockTableModelDataPartition.DEVICE_1_ATTRIBUTES;
 +import static 
org.apache.iotdb.db.queryengine.plan.relational.analyzer.MockTableModelDataPartition.DEVICE_2;
 +import static 
org.apache.iotdb.db.queryengine.plan.relational.analyzer.MockTableModelDataPartition.DEVICE_2_ATTRIBUTES;
 +import static 
org.apache.iotdb.db.queryengine.plan.relational.analyzer.MockTableModelDataPartition.DEVICE_3;
 +import static 
org.apache.iotdb.db.queryengine.plan.relational.analyzer.MockTableModelDataPartition.DEVICE_3_ATTRIBUTES;
 +import static 
org.apache.iotdb.db.queryengine.plan.relational.analyzer.MockTableModelDataPartition.DEVICE_4;
 +import static 
org.apache.iotdb.db.queryengine.plan.relational.analyzer.MockTableModelDataPartition.DEVICE_4_ATTRIBUTES;
 +import static 
org.apache.iotdb.db.queryengine.plan.relational.analyzer.MockTableModelDataPartition.DEVICE_5;
 +import static 
org.apache.iotdb.db.queryengine.plan.relational.analyzer.MockTableModelDataPartition.DEVICE_5_ATTRIBUTES;
 +import static 
org.apache.iotdb.db.queryengine.plan.relational.analyzer.MockTableModelDataPartition.DEVICE_6;
 +import static 
org.apache.iotdb.db.queryengine.plan.relational.analyzer.MockTableModelDataPartition.DEVICE_6_ATTRIBUTES;
  import static 
org.apache.iotdb.db.queryengine.plan.relational.metadata.TableMetadataImpl.getFunctionType;
- import static org.apache.tsfile.read.common.type.BinaryType.TEXT;
+ import static 
org.apache.iotdb.db.queryengine.plan.relational.metadata.TableMetadataImpl.isOneNumericType;
+ import static 
org.apache.iotdb.db.queryengine.plan.relational.metadata.TableMetadataImpl.isTwoNumericType;
+ import static 
org.apache.iotdb.db.queryengine.plan.relational.metadata.TableMetadataImpl.isTwoTypeComparable;
  import static org.apache.tsfile.read.common.type.BooleanType.BOOLEAN;
  import static org.apache.tsfile.read.common.type.DoubleType.DOUBLE;
- import static org.apache.tsfile.read.common.type.FloatType.FLOAT;
- import static org.apache.tsfile.read.common.type.IntType.INT32;
  import static org.apache.tsfile.read.common.type.LongType.INT64;
  
  public class TestMatadata implements Metadata {
@@@ -255,46 -241,9 +255,11 @@@
      return DATA_PARTITION;
    }
  
-   public static boolean isTwoNumericType(List<? extends Type> argumentTypes) {
-     return argumentTypes.size() == 2
-         && isNumericType(argumentTypes.get(0))
-         && isNumericType(argumentTypes.get(1));
-   }
- 
-   public static boolean isOneNumericType(List<? extends Type> argumentTypes) {
-     return argumentTypes.size() == 1 && isNumericType(argumentTypes.get(0));
-   }
- 
-   public static boolean isOneBooleanType(List<? extends Type> argumentTypes) {
-     return argumentTypes.size() == 1 && BOOLEAN.equals(argumentTypes.get(0));
-   }
- 
-   public static boolean isOneTextType(List<? extends Type> argumentTypes) {
-     return argumentTypes.size() == 1 && TEXT.equals(argumentTypes.get(0));
-   }
- 
-   public static boolean isNumericType(Type type) {
-     return DOUBLE.equals(type) || FLOAT.equals(type) || INT32.equals(type) || 
INT64.equals(type);
-   }
- 
-   public static boolean isTwoTypeComparable(List<? extends Type> 
argumentTypes) {
-     if (argumentTypes.size() != 2) {
-       return false;
-     }
-     Type left = argumentTypes.get(0);
-     Type right = argumentTypes.get(1);
-     if (left.equals(right)) {
-       return true;
-     }
- 
-     // Boolean type and Binary Type can not be compared with other types
-     return isNumericType(left) && isNumericType(right);
-   }
- 
 -  private static final DataPartition DATA_PARTITION = 
MockTablePartition.constructDataPartition();
 +  private static final DataPartition DATA_PARTITION =
 +      MockTableModelDataPartition.constructDataPartition();
++
    private static final SchemaPartition SCHEMA_PARTITION =
 -      MockTablePartition.constructSchemaPartition();
 +      MockTableModelDataPartition.constructSchemaPartition();
  
    private static IPartitionFetcher getFakePartitionFetcher() {
  

Reply via email to