This is an automated email from the ASF dual-hosted git repository.

caogaofei pushed a commit to branch beyyes/TableModelGrammar_0627
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit c7e2a13f7e38b3b44aeee7ac92e50b3c8e55b82d
Author: Beyyes <[email protected]>
AuthorDate: Wed Jul 3 15:05:36 2024 +0800

    add more ut
---
 .../planner/distribute/AddExchangeNodes.java       |  74 ++++++++-
 .../distribute/DistributedPlanGenerator.java       | 133 ++++++++++++----
 .../distribute/TableDistributionPlanner.java       |  33 ++--
 .../plan/relational/planner/node/CollectNode.java  |   4 +
 .../plan/relational/analyzer/AnalyzerTest.java     |  32 +++-
 .../analyzer/MockTableModelDataPartition.java      | 166 ++++++++++++++++++++
 .../relational/analyzer/MockTablePartition.java    | 174 ---------------------
 .../plan/relational/analyzer/TestMatadata.java     |  29 +++-
 .../iotdb/commons/partition/DataPartition.java     |  17 +-
 9 files changed, 426 insertions(+), 236 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/AddExchangeNodes.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/AddExchangeNodes.java
index e8ff90a5598..61f993ee600 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/AddExchangeNodes.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/AddExchangeNodes.java
@@ -19,4 +19,76 @@
 
 package org.apache.iotdb.db.queryengine.plan.relational.planner.distribute;
 
-public class AddExchangeNodes {}
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
+import 
org.apache.iotdb.db.queryengine.plan.planner.distribution.NodeDistribution;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ExchangeNode;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableScanNode;
+
+import static 
org.apache.iotdb.db.queryengine.plan.planner.distribution.NodeDistributionType.SAME_WITH_ALL_CHILDREN;
+import static 
org.apache.iotdb.db.queryengine.plan.planner.distribution.NodeDistributionType.SAME_WITH_SOME_CHILD;
+
+public class AddExchangeNodes extends PlanVisitor<PlanNode, 
DistributedPlanGenerator.PlanContext> {
+
+  private final MPPQueryContext queryContext;
+
+  public AddExchangeNodes(MPPQueryContext queryContext) {
+    this.queryContext = queryContext;
+  }
+
+  public PlanNode addExchangeNodes(PlanNode node, 
DistributedPlanGenerator.PlanContext context) {
+    return node.accept(this, context);
+  }
+
+  @Override
+  public PlanNode visitPlan(PlanNode node, 
DistributedPlanGenerator.PlanContext context) {
+    if (node instanceof WritePlanNode) {
+      return node;
+    }
+
+    PlanNode newNode = node.clone();
+    if (node.getChildren().size() == 1) {
+      newNode.addChild(node.getChildren().get(0).accept(this, context));
+      context.nodeDistributionMap.put(
+          node.getPlanNodeId(),
+          new NodeDistribution(
+              SAME_WITH_ALL_CHILDREN,
+              context
+                  .nodeDistributionMap
+                  .get(node.getChildren().get(0).getPlanNodeId())
+                  .getRegion()));
+      return newNode;
+    }
+
+    for (PlanNode child : node.getChildren()) {
+      PlanNode rewriteNode = child.accept(this, context);
+
+      TRegionReplicaSet region =
+          
context.nodeDistributionMap.get(rewriteNode.getPlanNodeId()).getRegion();
+      if (!region.equals(context.mostUsedDataRegion)) {
+        ExchangeNode exchangeNode = new 
ExchangeNode(queryContext.getQueryId().genPlanNodeId());
+        exchangeNode.addChild(rewriteNode);
+        newNode.addChild(exchangeNode);
+      } else {
+        newNode.addChild(rewriteNode);
+      }
+    }
+
+    context.nodeDistributionMap.put(
+        node.getPlanNodeId(),
+        new NodeDistribution(SAME_WITH_SOME_CHILD, 
context.mostUsedDataRegion));
+
+    return newNode;
+  }
+
+  @Override
+  public PlanNode visitTableScan(TableScanNode node, 
DistributedPlanGenerator.PlanContext context) {
+    context.nodeDistributionMap.put(
+        node.getPlanNodeId(),
+        new NodeDistribution(SAME_WITH_ALL_CHILDREN, 
node.getRegionReplicaSet()));
+    return node;
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/DistributedPlanGenerator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/DistributedPlanGenerator.java
index 49f0b1257eb..3c605c78640 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/DistributedPlanGenerator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/DistributedPlanGenerator.java
@@ -14,6 +14,7 @@
 package org.apache.iotdb.db.queryengine.plan.relational.planner.distribute;
 
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
 import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
 import 
org.apache.iotdb.db.queryengine.plan.planner.distribution.NodeDistribution;
 import 
org.apache.iotdb.db.queryengine.plan.planner.distribution.NodeDistributionType;
@@ -44,15 +45,18 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression;
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.function.Function;
 import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 import static com.google.common.collect.ImmutableList.toImmutableList;
-import static 
org.apache.iotdb.db.queryengine.plan.planner.distribution.NodeDistributionType.SAME_WITH_ALL_CHILDREN;
+import static org.apache.iotdb.commons.conf.IoTDBConstant.TIME;
 import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.PushPredicateIntoTableScan.containsDiffFunction;
 
 public class DistributedPlanGenerator
@@ -90,6 +94,7 @@ public class DistributedPlanGenerator
 
   @Override
   public List<PlanNode> visitOutput(OutputNode outputNode, PlanContext 
context) {
+    // TODO only consider the order of IDs
     context.expectedOrderingScheme =
         new OrderingScheme(
             outputNode.getOutputSymbols(),
@@ -152,6 +157,7 @@ public class DistributedPlanGenerator
   @Override
   public List<PlanNode> visitSort(SortNode sortNode, PlanContext context) {
     context.expectedOrderingScheme = sortNode.getOrderingScheme();
+    context.hasSortNode = true;
 
     List<PlanNode> childrenNodes = sortNode.getChild().accept(this, context);
     if (childrenNodes.size() == 1) {
@@ -234,43 +240,100 @@ public class DistributedPlanGenerator
       }
     }
 
-    int i = 0;
-    if (tableScanNodeMap.size() > 1) {
-      context.hasExchangeNode = true;
-      List<Symbol> orderBy = node.getOutputSymbols().subList(0, 1);
-      Map<Symbol, SortOrder> orderings =
-          Collections.singletonMap(node.getOutputSymbols().get(0), 
SortOrder.ASC_NULLS_LAST);
-      OrderingScheme orderingScheme = new OrderingScheme(orderBy, orderings);
-      MergeSortNode mergeSortNode =
-          new MergeSortNode(
-              queryContext.getQueryId().genPlanNodeId(), orderingScheme, 
node.getOutputSymbols());
-
-      for (Map.Entry<TRegionReplicaSet, TableScanNode> entry : 
tableScanNodeMap.entrySet()) {
-        TRegionReplicaSet regionReplicaSet = entry.getKey();
-        TableScanNode subTableScanNode = entry.getValue();
-        
subTableScanNode.setPlanNodeId(queryContext.getQueryId().genPlanNodeId());
-        subTableScanNode.setRegionReplicaSet(regionReplicaSet);
-        context.nodeDistributionMap.put(
-            subTableScanNode.getPlanNodeId(),
-            new NodeDistribution(SAME_WITH_ALL_CHILDREN, regionReplicaSet));
-
-        // TODO not use 0 replica set as root replica?
-        if (i == 0) {
-          mergeSortNode.addChild(subTableScanNode);
-          context.nodeDistributionMap.put(
-              mergeSortNode.getPlanNodeId(),
-              new NodeDistribution(SAME_WITH_ALL_CHILDREN, regionReplicaSet));
+    context.hasExchangeNode = tableScanNodeMap.size() > 1;
+
+    List<PlanNode> tableScanNodeList = new ArrayList<>();
+    TRegionReplicaSet mostUsedDataRegion = null;
+    int maxDeviceEntrySizeOfTableScan = 0;
+    for (Map.Entry<TRegionReplicaSet, TableScanNode> entry : 
tableScanNodeMap.entrySet()) {
+      TRegionReplicaSet regionReplicaSet = entry.getKey();
+      TableScanNode subTableScanNode = entry.getValue();
+      
subTableScanNode.setPlanNodeId(queryContext.getQueryId().genPlanNodeId());
+      subTableScanNode.setRegionReplicaSet(regionReplicaSet);
+      tableScanNodeList.add(subTableScanNode);
+
+      if (mostUsedDataRegion == null
+          || subTableScanNode.getDeviceEntries().size() > 
maxDeviceEntrySizeOfTableScan) {
+        mostUsedDataRegion = regionReplicaSet;
+        maxDeviceEntrySizeOfTableScan = 
subTableScanNode.getDeviceEntries().size();
+      }
+    }
+    context.mostUsedDataRegion = mostUsedDataRegion;
+
+    List<Symbol> newOrderingSymbols = new ArrayList<>();
+    List<SortOrder> newSortOrders = new ArrayList<>();
+    OrderingScheme expectedOrderingScheme = context.expectedOrderingScheme;
+
+    for (Symbol symbol : expectedOrderingScheme.getOrderBy()) {
+      if (!context.hasSortNode && TIME.equalsIgnoreCase(symbol.getName())) {
+        continue;
+      }
+
+      if (!node.getIdAndAttributeIndexMap().containsKey(symbol)) {
+        break;
+      }
+
+      newOrderingSymbols.add(symbol);
+      newSortOrders.add(expectedOrderingScheme.getOrdering(symbol));
+    }
+
+    List<Function<DeviceEntry, String>> orderingRules = new ArrayList<>();
+    for (Symbol symbol : newOrderingSymbols) {
+      int idx = node.getIdAndAttributeIndexMap().get(symbol);
+      if (node.getAssignments().get(symbol).getColumnCategory() == 
TsTableColumnCategory.ID) {
+        // segments[0] is always tableName
+        orderingRules.add(deviceEntry -> (String) 
deviceEntry.getDeviceID().getSegments()[idx + 1]);
+      } else {
+        orderingRules.add(deviceEntry -> 
deviceEntry.getAttributeColumnValues().get(idx));
+      }
+    }
+
+    Comparator<DeviceEntry> comparator;
+    if (newSortOrders.get(0).isNullsFirst()) {
+      if (newSortOrders.get(0).isAscending()) {
+        comparator = 
Comparator.nullsFirst(Comparator.comparing(orderingRules.get(0)));
+      } else {
+        comparator = 
Comparator.nullsFirst(Comparator.comparing(orderingRules.get(0))).reversed();
+      }
+    } else {
+      if (newSortOrders.get(0).isAscending()) {
+        comparator = 
Comparator.nullsLast(Comparator.comparing(orderingRules.get(0)));
+      } else {
+        comparator = 
Comparator.nullsLast(Comparator.comparing(orderingRules.get(0))).reversed();
+      }
+    }
+    for (int i = 1; i < orderingRules.size(); i++) {
+      Comparator<DeviceEntry> thenComparator;
+      if (newSortOrders.get(i).isNullsFirst()) {
+        if (newSortOrders.get(i).isAscending()) {
+          thenComparator = 
Comparator.nullsFirst(Comparator.comparing(orderingRules.get(i)));
+        } else {
+          thenComparator = 
Comparator.nullsFirst(Comparator.comparing(orderingRules.get(i))).reversed();
+        }
+      } else {
+        if (newSortOrders.get(i).isAscending()) {
+          thenComparator = 
Comparator.nullsLast(Comparator.comparing(orderingRules.get(i)));
         } else {
-          ExchangeNode exchangeNode = new 
ExchangeNode(queryContext.getQueryId().genPlanNodeId());
-          exchangeNode.addChild(subTableScanNode);
-          mergeSortNode.addChild(exchangeNode);
+          thenComparator = 
Comparator.nullsLast(Comparator.comparing(orderingRules.get(i))).reversed();
         }
-        i++;
       }
-      return Collections.singletonList(mergeSortNode);
-    } else {
-      return 
Collections.singletonList(tableScanNodeMap.entrySet().iterator().next().getValue());
+      comparator = comparator.thenComparing(thenComparator);
+    }
+
+    OrderingScheme newOrderingScheme =
+        new OrderingScheme(
+            newOrderingSymbols,
+            IntStream.range(0, newOrderingSymbols.size())
+                .boxed()
+                .collect(Collectors.toMap(newOrderingSymbols::get, 
newSortOrders::get)));
+    for (PlanNode planNode : tableScanNodeList) {
+      TableScanNode tableScanNode = (TableScanNode) planNode;
+      planNodeOrderingSchemeMap.put(tableScanNode.getPlanNodeId(), 
newOrderingScheme);
+      List<DeviceEntry> deviceEntries = tableScanNode.getDeviceEntries();
+      deviceEntries.sort(comparator);
     }
+
+    return tableScanNodeList;
   }
 
   private List<PlanNode> connectViaMergeSort(
@@ -363,7 +426,9 @@ public class DistributedPlanGenerator
   public static class PlanContext {
     final Map<PlanNodeId, NodeDistribution> nodeDistributionMap;
     boolean hasExchangeNode = false;
+    boolean hasSortNode = false;
     OrderingScheme expectedOrderingScheme;
+    TRegionReplicaSet mostUsedDataRegion;
 
     public PlanContext() {
       this.nodeDistributionMap = new HashMap<>();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributionPlanner.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributionPlanner.java
index 8eb0f0c90fd..c72f2594043 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributionPlanner.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributionPlanner.java
@@ -35,6 +35,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
+import static com.google.common.base.Preconditions.checkArgument;
 import static 
org.apache.iotdb.db.queryengine.plan.expression.leaf.TimestampOperand.TIMESTAMP_EXPRESSION_STRING;
 
 public class TableDistributionPlanner {
@@ -50,16 +51,18 @@ public class TableDistributionPlanner {
   }
 
   public DistributedQueryPlan plan() {
+
+    // generate table model distributed plan
     DistributedPlanGenerator.PlanContext planContext = new 
DistributedPlanGenerator.PlanContext();
     List<PlanNode> distributedPlanResult =
         new DistributedPlanGenerator(mppQueryContext, analysis)
             .genResult(logicalQueryPlan.getRootNode(), planContext);
+    checkArgument(distributedPlanResult.size() == 1, "Root node must return 
only one");
 
-    if (distributedPlanResult.size() != 1) {
-      throw new IllegalStateException("root node must return only one");
-    }
-
-    PlanNode outputNodeWithExchange = distributedPlanResult.get(0);
+    // add exchange node for distributed plan
+    PlanNode outputNodeWithExchange =
+        new AddExchangeNodes(mppQueryContext)
+            .addExchangeNodes(distributedPlanResult.get(0), planContext);
     if (analysis.getStatement() instanceof Query) {
       analysis
           .getRespDatasetHeader()
@@ -71,17 +74,19 @@ public class TableDistributionPlanner {
     }
     adjustUpStream(outputNodeWithExchange, planContext);
 
+    // generate subPlan
     SubPlan subPlan =
         new SubPlanGenerator()
             .splitToSubPlan(logicalQueryPlan.getContext().getQueryId(), 
outputNodeWithExchange);
     subPlan.getPlanFragment().setRoot(true);
 
+    // generate fragment instances
     List<FragmentInstance> fragmentInstances =
         mppQueryContext.getQueryType() == QueryType.READ
             ? new TableModelQueryFragmentPlanner(subPlan, analysis, 
mppQueryContext).plan()
             : new WriteFragmentParallelPlanner(subPlan, analysis, 
mppQueryContext).parallelPlan();
 
-    // Only execute this step for READ operation
+    // only execute this step for READ operation
     if (mppQueryContext.getQueryType() == QueryType.READ) {
       setSinkForRootInstance(subPlan, fragmentInstances);
     }
@@ -124,32 +129,32 @@ public class TableDistributionPlanner {
     rootInstance.getFragment().setPlanNodeTree(sinkNode);
   }
 
-  private void adjustUpStream(PlanNode root, 
DistributedPlanGenerator.PlanContext exchangeContext) {
-    if (!exchangeContext.hasExchangeNode) {
+  private void adjustUpStream(PlanNode root, 
DistributedPlanGenerator.PlanContext context) {
+    if (!context.hasExchangeNode) {
       return;
     }
 
-    adjustUpStreamHelper(root, exchangeContext, new HashMap<>());
+    adjustUpStreamHelper(root, context, new HashMap<>());
   }
 
   private void adjustUpStreamHelper(
       PlanNode root,
-      DistributedPlanGenerator.PlanContext exchangeContext,
+      DistributedPlanGenerator.PlanContext context,
       Map<TRegionReplicaSet, IdentitySinkNode> regionNodemap) {
     for (PlanNode child : root.getChildren()) {
-      adjustUpStreamHelper(child, exchangeContext, regionNodemap);
+      adjustUpStreamHelper(child, context, regionNodemap);
 
       if (child instanceof ExchangeNode) {
         ExchangeNode exchangeNode = (ExchangeNode) child;
+
         IdentitySinkNode identitySinkNode =
             regionNodemap.computeIfAbsent(
-                exchangeContext
-                    
.getNodeDistribution(exchangeNode.getChild().getPlanNodeId())
-                    .getRegion(),
+                
context.getNodeDistribution(exchangeNode.getChild().getPlanNodeId()).getRegion(),
                 k -> new 
IdentitySinkNode(mppQueryContext.getQueryId().genPlanNodeId()));
         identitySinkNode.addChild(exchangeNode.getChild());
         identitySinkNode.addDownStreamChannelLocation(
             new 
DownStreamChannelLocation(exchangeNode.getPlanNodeId().toString()));
+
         exchangeNode.setChild(identitySinkNode);
         
exchangeNode.setIndexOfUpstreamSinkHandle(identitySinkNode.getCurrentLastIndex());
       }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/CollectNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/CollectNode.java
new file mode 100644
index 00000000000..85444b62264
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/CollectNode.java
@@ -0,0 +1,4 @@
+package org.apache.iotdb.db.queryengine.plan.relational.planner.node;
+
+public class CollectNode {
+}
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/AnalyzerTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/AnalyzerTest.java
index 77162d0a330..d10b3354605 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/AnalyzerTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/AnalyzerTest.java
@@ -28,6 +28,7 @@ import 
org.apache.iotdb.db.queryengine.execution.warnings.WarningCollector;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.DistributedQueryPlan;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.LogicalQueryPlan;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ExchangeNode;
 import org.apache.iotdb.db.queryengine.plan.relational.function.OperatorType;
 import org.apache.iotdb.db.queryengine.plan.relational.metadata.ColumnHandle;
 import org.apache.iotdb.db.queryengine.plan.relational.metadata.ColumnSchema;
@@ -41,11 +42,13 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.distribute.TableDistributionPlanner;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.FilterNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.LimitNode;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.MergeSortNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.OffsetNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.OutputNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.ProjectNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableScanNode;
 import org.apache.iotdb.db.queryengine.plan.relational.security.AccessControl;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression;
 import 
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.LogicalExpression;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Statement;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.parser.SqlParser;
@@ -61,6 +64,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import static 
org.apache.iotdb.db.queryengine.execution.warnings.WarningCollector.NOOP;
@@ -214,12 +218,34 @@ public class AnalyzerTest {
         Arrays.asList("time", "tag1", "tag2", "tag3", "attr1", "attr2", "s1", 
"s2", "s3"),
         tableScanNode.getOutputColumnNames());
     assertEquals(9, tableScanNode.getAssignments().size());
-    assertEquals(1, tableScanNode.getDeviceEntries().size());
+    assertEquals(6, tableScanNode.getDeviceEntries().size());
     assertEquals(5, tableScanNode.getIdAndAttributeIndexMap().size());
-    assertEquals("(\"time\" > 1)", 
tableScanNode.getTimePredicate().get().toString());
+    assertEquals(
+        "(\"time\" > 1)", 
tableScanNode.getTimePredicate().map(Expression::toString).orElse(null));
     assertNull(tableScanNode.getPushDownPredicate());
     assertEquals(ASC, tableScanNode.getScanOrder());
     distributionPlanner = new TableDistributionPlanner(actualAnalysis, 
logicalQueryPlan, context);
+    distributedQueryPlan = distributionPlanner.plan();
+    assertEquals(3, distributedQueryPlan.getFragments().size());
+    assertTrue(
+        
distributedQueryPlan.getFragments().get(0).getPlanNodeTree().getChildren().get(0)
+            instanceof OutputNode);
+    OutputNode outputNode =
+        (OutputNode)
+            
distributedQueryPlan.getFragments().get(0).getPlanNodeTree().getChildren().get(0);
+    assertTrue(outputNode.getChildren().get(0) instanceof MergeSortNode);
+    MergeSortNode mergeSortNode = (MergeSortNode) 
outputNode.getChildren().get(0);
+    assertEquals(
+        Arrays.asList("tag1", "tag2", "tag3", "attr1", "attr2"),
+        mergeSortNode.getOrderingScheme().getOrderBy().stream()
+            .map(Symbol::getName)
+            .collect(Collectors.toList()));
+    assertTrue(mergeSortNode.getChildren().get(0) instanceof ExchangeNode);
+    assertTrue(mergeSortNode.getChildren().get(1) instanceof TableScanNode);
+    assertTrue(mergeSortNode.getChildren().get(2) instanceof ExchangeNode);
+    TableScanNode tableScanNode = (TableScanNode) 
mergeSortNode.getChildren().get(1);
+    assertEquals(4, tableScanNode.getDeviceEntries().size());
+    assertEquals(Arrays.asList(), 
tableScanNode.getDeviceEntries().stream().map(d -> d.getDeviceID().toString()));
   }
 
   @Test
@@ -378,7 +404,7 @@ public class AnalyzerTest {
   @Test
   public void singleTableProjectTest() {
     // 1. project without filter
-    sql = "SELECT tag1, attr1, s1 FROM table1";
+    sql = "SELECT time, tag1, attr1, s1 FROM table1";
     actualAnalysis = analyzeSQL(sql, metadata);
     assertNotNull(actualAnalysis);
     assertEquals(1, actualAnalysis.getTables().size());
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/MockTableModelDataPartition.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/MockTableModelDataPartition.java
new file mode 100644
index 00000000000..90e9d71f46b
--- /dev/null
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/MockTableModelDataPartition.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.relational.analyzer;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
+import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
+import org.apache.iotdb.commons.partition.DataPartition;
+import org.apache.iotdb.commons.partition.SchemaPartition;
+import org.apache.iotdb.commons.partition.executor.SeriesPartitionExecutor;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+
+import com.google.common.collect.ImmutableMap;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class MockTableModelDataPartition {
+
+  private static final SeriesPartitionExecutor EXECUTOR =
+      SeriesPartitionExecutor.getSeriesPartitionExecutor(
+          
IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionExecutorClass(),
+          
IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionSlotNum());
+
+  private static final String DB_NAME = "testdb";
+
+  static final String DEVICE_1 = "table1.beijing.A1.ZZ";
+  static final String DEVICE_2 = "table1.beijing.A2.XX";
+  static final String DEVICE_3 = "table1.shanghai.A3.YY";
+  static final String DEVICE_4 = "table1.shanghai.B3.YY";
+  static final String DEVICE_5 = "table1.shenzhen.B2.ZZ";
+  static final String DEVICE_6 = "table1.shenzhen.B1.XX";
+
+  static final List<String> DEVICE_1_ATTRIBUTES = Arrays.asList("high", "big");
+  static final List<String> DEVICE_2_ATTRIBUTES = Arrays.asList("high", 
"small");
+  static final List<String> DEVICE_3_ATTRIBUTES = Arrays.asList("low", 
"small");
+  static final List<String> DEVICE_4_ATTRIBUTES = Arrays.asList("low", "big");
+  static final List<String> DEVICE_5_ATTRIBUTES = Arrays.asList("mid", "big");
+  static final List<String> DEVICE_6_ATTRIBUTES = Arrays.asList("mid", 
"small");
+
+  private static final TRegionReplicaSet DATA_REGION_GROUP_1 = 
genDataRegionGroup(10, 1, 2);
+  private static final TRegionReplicaSet DATA_REGION_GROUP_2 = 
genDataRegionGroup(11, 3, 2);
+  private static final TRegionReplicaSet DATA_REGION_GROUP_3 = 
genDataRegionGroup(12, 2, 1);
+  private static final TRegionReplicaSet DATA_REGION_GROUP_4 = 
genDataRegionGroup(13, 1, 3);
+
+  /*
+   * DataPartition:
+   *
+   * device1(startTime:0): DataRegionGroup_1,
+   * device2(startTime:0): DataRegionGroup_1,
+   * device3(startTime:0): DataRegionGroup_2,
+   * device4(startTime:0): DataRegionGroup_2,
+   * device5(startTime:0): DataRegionGroup_2,
+   * device5(startTime:100): DataRegionGroup_3,
+   * device6(startTime:0): DataRegionGroup_2,
+   * device6(startTime:100): DataRegionGroup_3,
+   */
+  public static DataPartition constructDataPartition() {
+    DataPartition dataPartition =
+        new DataPartition(
+            
IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionExecutorClass(),
+            
IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionSlotNum());
+
+    Map<String, Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, 
List<TRegionReplicaSet>>>>
+        dbPartitionMap = new HashMap<>();
+    Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, 
List<TRegionReplicaSet>>> devicePartitionMap =
+        new HashMap<>();
+
+    List<TRegionReplicaSet> regionGroup1 = 
Collections.singletonList(DATA_REGION_GROUP_1);
+    Map<TTimePartitionSlot, List<TRegionReplicaSet>> dataRegionMap1 =
+        Collections.singletonMap(new TTimePartitionSlot(0L), regionGroup1);
+    devicePartitionMap.put(EXECUTOR.getSeriesPartitionSlot(DEVICE_1), 
dataRegionMap1);
+    devicePartitionMap.put(EXECUTOR.getSeriesPartitionSlot(DEVICE_2), 
dataRegionMap1);
+
+    List<TRegionReplicaSet> regionGroup2 = 
Collections.singletonList(DATA_REGION_GROUP_2);
+    Map<TTimePartitionSlot, List<TRegionReplicaSet>> dataRegionMap2 =
+        Collections.singletonMap(new TTimePartitionSlot(0L), regionGroup2);
+    devicePartitionMap.put(EXECUTOR.getSeriesPartitionSlot(DEVICE_3), 
dataRegionMap2);
+    devicePartitionMap.put(EXECUTOR.getSeriesPartitionSlot(DEVICE_4), 
dataRegionMap2);
+
+    List<TRegionReplicaSet> regionGroup3 = 
Collections.singletonList(DATA_REGION_GROUP_2);
+    List<TRegionReplicaSet> regionGroup4 = 
Collections.singletonList(DATA_REGION_GROUP_3);
+    Map<TTimePartitionSlot, List<TRegionReplicaSet>> dataRegionMap3 =
+        ImmutableMap.<TTimePartitionSlot, List<TRegionReplicaSet>>builder()
+            .put(new TTimePartitionSlot(0L), regionGroup3)
+            .put(new TTimePartitionSlot(100L), regionGroup4)
+            .build();
+    devicePartitionMap.put(EXECUTOR.getSeriesPartitionSlot(DEVICE_5), 
dataRegionMap3);
+    devicePartitionMap.put(EXECUTOR.getSeriesPartitionSlot(DEVICE_6), 
dataRegionMap3);
+
+    dbPartitionMap.put(DB_NAME, devicePartitionMap);
+    dataPartition.setDataPartitionMap(dbPartitionMap);
+
+    return dataPartition;
+  }
+
+  public static SchemaPartition constructSchemaPartition() {
+    SchemaPartition schemaPartition =
+        new SchemaPartition(
+            
IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionExecutorClass(),
+            
IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionSlotNum());
+    Map<String, Map<TSeriesPartitionSlot, TRegionReplicaSet>> 
schemaPartitionMap = new HashMap<>();
+
+    TRegionReplicaSet schemaRegion1 =
+        new TRegionReplicaSet(
+            new TConsensusGroupId(TConsensusGroupType.SchemaRegion, 11),
+            Arrays.asList(
+                genDataNodeLocation(11, "192.0.1.1"), genDataNodeLocation(12, 
"192.0.1.2")));
+
+    TRegionReplicaSet schemaRegion2 =
+        new TRegionReplicaSet(
+            new TConsensusGroupId(TConsensusGroupType.SchemaRegion, 21),
+            Arrays.asList(
+                genDataNodeLocation(21, "192.0.2.1"), genDataNodeLocation(22, 
"192.0.2.2")));
+
+    Map<TSeriesPartitionSlot, TRegionReplicaSet> schemaRegionMap = new 
HashMap<>();
+    schemaRegionMap.put(EXECUTOR.getSeriesPartitionSlot(DEVICE_1), 
schemaRegion1);
+    schemaRegionMap.put(EXECUTOR.getSeriesPartitionSlot(DEVICE_2), 
schemaRegion2);
+    schemaRegionMap.put(EXECUTOR.getSeriesPartitionSlot(DEVICE_3), 
schemaRegion2);
+    schemaPartitionMap.put(DB_NAME, schemaRegionMap);
+    schemaPartition.setSchemaPartitionMap(schemaPartitionMap);
+
+    return schemaPartition;
+  }
+
+  private static TRegionReplicaSet genDataRegionGroup(
+      int regionGroupId, int dataNodeId1, int dataNodeId2) {
+    return new TRegionReplicaSet(
+        new TConsensusGroupId(TConsensusGroupType.DataRegion, regionGroupId),
+        Arrays.asList(
+            genDataNodeLocation(dataNodeId1, String.format("192.0.%s.1", 
regionGroupId)),
+            genDataNodeLocation(dataNodeId2, String.format("192.0.%s.2", 
regionGroupId))));
+  }
+
+  private static TDataNodeLocation genDataNodeLocation(int dataNodeId, String 
ip) {
+    return new TDataNodeLocation()
+        .setDataNodeId(dataNodeId)
+        .setClientRpcEndPoint(new TEndPoint(ip, 9000))
+        .setMPPDataExchangeEndPoint(new TEndPoint(ip, 9001))
+        .setInternalEndPoint(new TEndPoint(ip, 9002));
+  }
+}
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/MockTablePartition.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/MockTablePartition.java
deleted file mode 100644
index ae24f4a219e..00000000000
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/MockTablePartition.java
+++ /dev/null
@@ -1,174 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.queryengine.plan.relational.analyzer;
-
-import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
-import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
-import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
-import org.apache.iotdb.common.rpc.thrift.TEndPoint;
-import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
-import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
-import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
-import org.apache.iotdb.commons.partition.DataPartition;
-import org.apache.iotdb.commons.partition.SchemaPartition;
-import org.apache.iotdb.commons.partition.executor.SeriesPartitionExecutor;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public class MockTablePartition {
-
-  private static final SeriesPartitionExecutor EXECUTOR =
-      SeriesPartitionExecutor.getSeriesPartitionExecutor(
-          
IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionExecutorClass(),
-          
IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionSlotNum());
-
-  private static final String DB_NAME = "root.testdb";
-
-  private static final String device1 = "root.testdb.d1";
-  private static final String device2 = "root.testdb.d22";
-  private static final String device3 = "root.testdb.d333";
-  private static final String device4 = "root.testdb.d4444";
-  private static final String device5 = "root.testdb.d55555";
-  private static final String device6 = "root.testdb.d666666";
-
-  public static DataPartition constructDataPartition() {
-    TRegionReplicaSet dataRegion1 =
-        new TRegionReplicaSet(
-            new TConsensusGroupId(TConsensusGroupType.DataRegion, 1),
-            Arrays.asList(
-                genDataNodeLocation(11, "192.0.1.1"), genDataNodeLocation(12, 
"192.0.1.2")));
-
-    TRegionReplicaSet dataRegion2 =
-        new TRegionReplicaSet(
-            new TConsensusGroupId(TConsensusGroupType.DataRegion, 2),
-            Arrays.asList(
-                genDataNodeLocation(21, "192.0.2.1"), genDataNodeLocation(22, 
"192.0.2.2")));
-
-    TRegionReplicaSet dataRegion3 =
-        new TRegionReplicaSet(
-            new TConsensusGroupId(TConsensusGroupType.DataRegion, 3),
-            Arrays.asList(
-                genDataNodeLocation(31, "192.0.3.1"), genDataNodeLocation(32, 
"192.0.3.2")));
-
-    TRegionReplicaSet dataRegion4 =
-        new TRegionReplicaSet(
-            new TConsensusGroupId(TConsensusGroupType.DataRegion, 4),
-            Arrays.asList(
-                genDataNodeLocation(41, "192.0.4.1"), genDataNodeLocation(42, 
"192.0.4.2")));
-
-    DataPartition dataPartition =
-        new DataPartition(
-            
IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionExecutorClass(),
-            
IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionSlotNum());
-
-    Map<String, Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, 
List<TRegionReplicaSet>>>>
-        dataPartitionMap = new HashMap<>();
-    Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, 
List<TRegionReplicaSet>>> sgPartitionMap =
-        new HashMap<>();
-
-    List<TRegionReplicaSet> d1DataRegions = new ArrayList<>();
-    d1DataRegions.add(dataRegion1);
-    d1DataRegions.add(dataRegion2);
-    Map<TTimePartitionSlot, List<TRegionReplicaSet>> d1DataRegionMap = new 
HashMap<>();
-    d1DataRegionMap.put(new TTimePartitionSlot(), d1DataRegions);
-
-    List<TRegionReplicaSet> d2DataRegions = new ArrayList<>();
-    d2DataRegions.add(dataRegion3);
-    Map<TTimePartitionSlot, List<TRegionReplicaSet>> d2DataRegionMap = new 
HashMap<>();
-    d2DataRegionMap.put(new TTimePartitionSlot(), d2DataRegions);
-
-    List<TRegionReplicaSet> d3DataRegions = new ArrayList<>();
-    d3DataRegions.add(dataRegion1);
-    d3DataRegions.add(dataRegion4);
-    Map<TTimePartitionSlot, List<TRegionReplicaSet>> d3DataRegionMap = new 
HashMap<>();
-    d3DataRegionMap.put(new TTimePartitionSlot(), d3DataRegions);
-
-    List<TRegionReplicaSet> d4DataRegions = new ArrayList<>();
-    d4DataRegions.add(dataRegion1);
-    d4DataRegions.add(dataRegion4);
-    Map<TTimePartitionSlot, List<TRegionReplicaSet>> d4DataRegionMap = new 
HashMap<>();
-    d4DataRegionMap.put(new TTimePartitionSlot(), d4DataRegions);
-
-    List<TRegionReplicaSet> d5DataRegions = new ArrayList<>();
-    d5DataRegions.add(dataRegion4);
-    Map<TTimePartitionSlot, List<TRegionReplicaSet>> d5DataRegionMap = new 
HashMap<>();
-    d5DataRegionMap.put(new TTimePartitionSlot(), d5DataRegions);
-
-    List<TRegionReplicaSet> d6DataRegions = new ArrayList<>();
-    d6DataRegions.add(dataRegion1);
-    d6DataRegions.add(dataRegion2);
-    Map<TTimePartitionSlot, List<TRegionReplicaSet>> d6DataRegionMap = new 
HashMap<>();
-    d6DataRegionMap.put(new TTimePartitionSlot(), d6DataRegions);
-
-    sgPartitionMap.put(EXECUTOR.getSeriesPartitionSlot(device1), 
d1DataRegionMap);
-    sgPartitionMap.put(EXECUTOR.getSeriesPartitionSlot(device2), 
d2DataRegionMap);
-    sgPartitionMap.put(EXECUTOR.getSeriesPartitionSlot(device3), 
d3DataRegionMap);
-    sgPartitionMap.put(EXECUTOR.getSeriesPartitionSlot(device4), 
d4DataRegionMap);
-    sgPartitionMap.put(EXECUTOR.getSeriesPartitionSlot(device5), 
d5DataRegionMap);
-    sgPartitionMap.put(EXECUTOR.getSeriesPartitionSlot(device6), 
d6DataRegionMap);
-
-    dataPartitionMap.put(DB_NAME, sgPartitionMap);
-    dataPartition.setDataPartitionMap(dataPartitionMap);
-
-    return dataPartition;
-  }
-
-  public static SchemaPartition constructSchemaPartition() {
-    SchemaPartition schemaPartition =
-        new SchemaPartition(
-            
IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionExecutorClass(),
-            
IoTDBDescriptor.getInstance().getConfig().getSeriesPartitionSlotNum());
-    Map<String, Map<TSeriesPartitionSlot, TRegionReplicaSet>> 
schemaPartitionMap = new HashMap<>();
-
-    TRegionReplicaSet schemaRegion1 =
-        new TRegionReplicaSet(
-            new TConsensusGroupId(TConsensusGroupType.SchemaRegion, 11),
-            Arrays.asList(
-                genDataNodeLocation(11, "192.0.1.1"), genDataNodeLocation(12, 
"192.0.1.2")));
-
-    TRegionReplicaSet schemaRegion2 =
-        new TRegionReplicaSet(
-            new TConsensusGroupId(TConsensusGroupType.SchemaRegion, 21),
-            Arrays.asList(
-                genDataNodeLocation(21, "192.0.2.1"), genDataNodeLocation(22, 
"192.0.2.2")));
-
-    Map<TSeriesPartitionSlot, TRegionReplicaSet> schemaRegionMap = new 
HashMap<>();
-    schemaRegionMap.put(EXECUTOR.getSeriesPartitionSlot(device1), 
schemaRegion1);
-    schemaRegionMap.put(EXECUTOR.getSeriesPartitionSlot(device2), 
schemaRegion2);
-    schemaRegionMap.put(EXECUTOR.getSeriesPartitionSlot(device3), 
schemaRegion2);
-    schemaPartitionMap.put(DB_NAME, schemaRegionMap);
-    schemaPartition.setSchemaPartitionMap(schemaPartitionMap);
-
-    return schemaPartition;
-  }
-
-  private static TDataNodeLocation genDataNodeLocation(int dataNodeId, String 
ip) {
-    return new TDataNodeLocation()
-        .setDataNodeId(dataNodeId)
-        .setClientRpcEndPoint(new TEndPoint(ip, 9000))
-        .setMPPDataExchangeEndPoint(new TEndPoint(ip, 9001))
-        .setInternalEndPoint(new TEndPoint(ip, 9002));
-  }
-}
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/TestMatadata.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/TestMatadata.java
index 55fbeacb86c..b20d1e0dc83 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/TestMatadata.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/TestMatadata.java
@@ -47,12 +47,23 @@ import org.apache.tsfile.read.common.type.BinaryType;
 import org.apache.tsfile.read.common.type.Type;
 
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Optional;
 
+import static 
org.apache.iotdb.db.queryengine.plan.relational.analyzer.MockTableModelDataPartition.DEVICE_1;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.analyzer.MockTableModelDataPartition.DEVICE_1_ATTRIBUTES;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.analyzer.MockTableModelDataPartition.DEVICE_2;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.analyzer.MockTableModelDataPartition.DEVICE_2_ATTRIBUTES;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.analyzer.MockTableModelDataPartition.DEVICE_3;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.analyzer.MockTableModelDataPartition.DEVICE_3_ATTRIBUTES;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.analyzer.MockTableModelDataPartition.DEVICE_4;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.analyzer.MockTableModelDataPartition.DEVICE_4_ATTRIBUTES;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.analyzer.MockTableModelDataPartition.DEVICE_5;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.analyzer.MockTableModelDataPartition.DEVICE_5_ATTRIBUTES;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.analyzer.MockTableModelDataPartition.DEVICE_6;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.analyzer.MockTableModelDataPartition.DEVICE_6_ATTRIBUTES;
 import static 
org.apache.iotdb.db.queryengine.plan.relational.metadata.TableMetadataImpl.getFunctionType;
 import static org.apache.tsfile.read.common.type.BinaryType.TEXT;
 import static org.apache.tsfile.read.common.type.BooleanType.BOOLEAN;
@@ -195,10 +206,13 @@ public class TestMatadata implements Metadata {
       QualifiedObjectName tableName,
       List<Expression> expressionList,
       List<String> attributeColumns) {
-    return Collections.singletonList(
-        new DeviceEntry(
-            new StringArrayDeviceID("root.testdb", "table1", "t1", "t2", "t3"),
-            Arrays.asList("a1", "a2")));
+    return Arrays.asList(
+        new DeviceEntry(new StringArrayDeviceID(DEVICE_4.split("\\.")), 
DEVICE_4_ATTRIBUTES),
+        new DeviceEntry(new StringArrayDeviceID(DEVICE_1.split("\\.")), 
DEVICE_1_ATTRIBUTES),
+        new DeviceEntry(new StringArrayDeviceID(DEVICE_6.split("\\.")), 
DEVICE_6_ATTRIBUTES),
+        new DeviceEntry(new StringArrayDeviceID(DEVICE_5.split("\\.")), 
DEVICE_5_ATTRIBUTES),
+        new DeviceEntry(new StringArrayDeviceID(DEVICE_3.split("\\.")), 
DEVICE_3_ATTRIBUTES),
+        new DeviceEntry(new StringArrayDeviceID(DEVICE_2.split("\\.")), 
DEVICE_2_ATTRIBUTES));
   }
 
   @Override
@@ -277,9 +291,10 @@ public class TestMatadata implements Metadata {
     return isNumericType(left) && isNumericType(right);
   }
 
-  private static final DataPartition DATA_PARTITION = 
MockTablePartition.constructDataPartition();
+  private static final DataPartition DATA_PARTITION =
+      MockTableModelDataPartition.constructDataPartition();
   private static final SchemaPartition SCHEMA_PARTITION =
-      MockTablePartition.constructSchemaPartition();
+      MockTableModelDataPartition.constructSchemaPartition();
 
   private static IPartitionFetcher getFakePartitionFetcher() {
 
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java
index edeed746f13..1fe12378834 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java
@@ -140,9 +140,20 @@ public class DataPartition extends Partition {
    * <p>The device id shall be [table, seg1, ....]
    */
   public List<TRegionReplicaSet> getDataRegionReplicaSetWithTimeFilter(
-      String database, IDeviceID deviceID, Filter timeFilter) {
-    // TODO implement this interface, @Potato
-    throw new UnsupportedOperationException();
+      String database, IDeviceID deviceId, Filter timeFilter) {
+    // TODO perfect this interface, @Potato
+    TSeriesPartitionSlot seriesPartitionSlot = 
calculateDeviceGroupId(deviceId);
+    if (!dataPartitionMap.containsKey(database)
+        || !dataPartitionMap.get(database).containsKey(seriesPartitionSlot)) {
+      return Collections.singletonList(NOT_ASSIGNED);
+    }
+    return 
dataPartitionMap.get(database).get(seriesPartitionSlot).entrySet().stream()
+        .filter(
+            entry ->
+                TimePartitionUtils.satisfyPartitionStartTime(timeFilter, 
entry.getKey().startTime))
+        .flatMap(entry -> entry.getValue().stream())
+        .distinct()
+        .collect(toList());
   }
 
   public List<TRegionReplicaSet> getDataRegionReplicaSet(


Reply via email to