This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch ty/RefineSchemaNode
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 4a37a9aadb6e9b567aa7fc9f2e157d6e6bb630e1
Author: JackieTien97 <[email protected]>
AuthorDate: Tue Jul 9 18:39:06 2024 +0800

    Correct the way Schema-related PlanNodes are handled in DistributionPlanner
---
 .../plan/planner/LocalExecutionPlanner.java        |  36 ++---
 .../plan/planner/TableOperatorGenerator.java       |  19 ---
 .../node/metedata/read/TableDeviceFetchNode.java   |   6 +-
 .../node/metedata/read/TableDeviceSourceNode.java  |  23 ++++
 .../fetcher/TableDeviceSchemaValidator.java        |   2 +-
 .../planner/distribute/AddExchangeNodes.java       |  24 ++++
 .../distribute/DistributedPlanGenerator.java       | 147 ++++++++++++---------
 .../apache/iotdb/commons/partition/Partition.java  |   2 +-
 8 files changed, 159 insertions(+), 100 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java
index 3b74f1b4a08..1f794df63fb 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LocalExecutionPlanner.java
@@ -92,20 +92,7 @@ public class LocalExecutionPlanner {
     LocalExecutionPlanContext context =
         new LocalExecutionPlanContext(types, instanceContext, 
dataNodeQueryContext);
 
-    // Generate pipelines, return the last pipeline data structure
-    // TODO Replace operator with operatorFactory to build multiple driver for 
one pipeline
-    Operator root;
-    IClientSession.SqlDialect sqlDialect = 
instanceContext.getSessionInfo().getSqlDialect();
-    switch (sqlDialect) {
-      case TREE:
-        root = plan.accept(new OperatorTreeGenerator(), context);
-        break;
-      case TABLE:
-        root = plan.accept(new TableOperatorGenerator(metadata), context);
-        break;
-      default:
-        throw new IllegalArgumentException(String.format("Unknown sql dialect: 
%s", sqlDialect));
-    }
+    Operator root = generateOperator(instanceContext, context, plan);
 
     PipelineMemoryEstimator memoryEstimator =
         context.constructPipelineMemoryEstimator(root, null, plan, -1);
@@ -136,7 +123,7 @@ public class LocalExecutionPlanner {
     LocalExecutionPlanContext context =
         new LocalExecutionPlanContext(instanceContext, schemaRegion);
 
-    Operator root = plan.accept(new OperatorTreeGenerator(), context);
+    Operator root = generateOperator(instanceContext, context, plan);
 
     PipelineMemoryEstimator memoryEstimator =
         context.constructPipelineMemoryEstimator(root, null, plan, -1);
@@ -154,6 +141,25 @@ public class LocalExecutionPlanner {
     return context.getPipelineDriverFactories();
   }
 
+  private Operator generateOperator(
+      FragmentInstanceContext instanceContext, LocalExecutionPlanContext 
context, PlanNode node) {
+    // Generate pipelines, return the last pipeline data structure
+    // TODO Replace operator with operatorFactory to build multiple driver for 
one pipeline
+    Operator root;
+    IClientSession.SqlDialect sqlDialect = 
instanceContext.getSessionInfo().getSqlDialect();
+    switch (sqlDialect) {
+      case TREE:
+        root = node.accept(new OperatorTreeGenerator(), context);
+        break;
+      case TABLE:
+        root = node.accept(new TableOperatorGenerator(metadata), context);
+        break;
+      default:
+        throw new IllegalArgumentException(String.format("Unknown sql dialect: 
%s", sqlDialect));
+    }
+    return root;
+  }
+
   private long checkMemory(
       final PipelineMemoryEstimator memoryEstimator, 
FragmentInstanceStateMachine stateMachine)
       throws MemoryNotEnoughException {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
index a26f284e0d1..b1ba6ded7f0 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
@@ -40,7 +40,6 @@ import 
org.apache.iotdb.db.queryengine.execution.operator.process.OffsetOperator
 import org.apache.iotdb.db.queryengine.execution.operator.process.SortOperator;
 import 
org.apache.iotdb.db.queryengine.execution.operator.process.StreamSortOperator;
 import org.apache.iotdb.db.queryengine.execution.operator.process.TopKOperator;
-import 
org.apache.iotdb.db.queryengine.execution.operator.schema.SchemaQueryMergeOperator;
 import 
org.apache.iotdb.db.queryengine.execution.operator.schema.SchemaQueryScanOperator;
 import 
org.apache.iotdb.db.queryengine.execution.operator.schema.source.SchemaSourceFactory;
 import 
org.apache.iotdb.db.queryengine.execution.operator.sink.IdentitySinkOperator;
@@ -53,7 +52,6 @@ import 
org.apache.iotdb.db.queryengine.plan.analyze.TypeProvider;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
-import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.SchemaQueryMergeNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.TableDeviceFetchNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.TableDeviceQueryNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ExchangeNode;
@@ -779,23 +777,6 @@ public class TableOperatorGenerator extends 
PlanVisitor<Operator, LocalExecution
         TSFileDescriptor.getInstance().getConfig().getMaxTsBlockLineNumber());
   }
 
-  @Override
-  public Operator visitSchemaQueryMerge(
-      SchemaQueryMergeNode node, LocalExecutionPlanContext context) {
-    List<Operator> children = new ArrayList<>(node.getChildren().size());
-    for (PlanNode child : node.getChildren()) {
-      children.add(child.accept(this, context));
-    }
-    OperatorContext operatorContext =
-        context
-            .getDriverContext()
-            .addOperatorContext(
-                context.getNextOperatorId(),
-                node.getPlanNodeId(),
-                SchemaQueryMergeOperator.class.getSimpleName());
-    return new SchemaQueryMergeOperator(operatorContext, children);
-  }
-
   @Override
   public Operator visitTableDeviceFetch(
       TableDeviceFetchNode node, LocalExecutionPlanContext context) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/read/TableDeviceFetchNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/read/TableDeviceFetchNode.java
index 28a06fda562..d8e0f5e9028 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/read/TableDeviceFetchNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/read/TableDeviceFetchNode.java
@@ -38,7 +38,7 @@ import java.util.stream.Collectors;
 
 public class TableDeviceFetchNode extends TableDeviceSourceNode {
 
-  private List<Object[]> deviceIdList;
+  private final List<Object[]> deviceIdList;
 
   public TableDeviceFetchNode(
       PlanNodeId id,
@@ -51,6 +51,10 @@ public class TableDeviceFetchNode extends 
TableDeviceSourceNode {
     this.deviceIdList = deviceIdList;
   }
 
+  public void addDeviceId(Object[] deviceId) {
+    deviceIdList.add(deviceId);
+  }
+
   public List<Object[]> getDeviceIdList() {
     return deviceIdList;
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/read/TableDeviceSourceNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/read/TableDeviceSourceNode.java
index 0bc6ee2da94..f2cb19041c2 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/read/TableDeviceSourceNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/metedata/read/TableDeviceSourceNode.java
@@ -27,6 +27,7 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SourceNode;
 
 import java.util.Collections;
 import java.util.List;
+import java.util.Objects;
 import java.util.stream.Collectors;
 
 public abstract class TableDeviceSourceNode extends SourceNode {
@@ -97,4 +98,26 @@ public abstract class TableDeviceSourceNode extends 
SourceNode {
   public int allowedChildCount() {
     return 0;
   }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    if (!super.equals(o)) {
+      return false;
+    }
+    TableDeviceSourceNode that = (TableDeviceSourceNode) o;
+    return Objects.equals(database, that.database)
+        && Objects.equals(tableName, that.tableName)
+        && Objects.equals(schemaRegionReplicaSet, that.schemaRegionReplicaSet);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(super.hashCode(), database, tableName, 
schemaRegionReplicaSet);
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/TableDeviceSchemaValidator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/TableDeviceSchemaValidator.java
index 8dd2f5885c1..9f179937934 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/TableDeviceSchemaValidator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/TableDeviceSchemaValidator.java
@@ -119,7 +119,7 @@ public class TableDeviceSchemaValidator {
   }
 
   // we need to truncate the tailing null
-  private String[] parseDeviceIdArray(Object[] objects) {
+  public static String[] parseDeviceIdArray(Object[] objects) {
     String[] strings = new String[objects.length];
     int lastNonNullIndex = -1;
     for (int i = 0; i < objects.length; i++) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/AddExchangeNodes.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/AddExchangeNodes.java
index 61f993ee600..f8c12841ee9 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/AddExchangeNodes.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/AddExchangeNodes.java
@@ -25,6 +25,9 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.distribution.NodeDistributio
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.TableDeviceFetchNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.TableDeviceQueryNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.TableDeviceSourceNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ExchangeNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableScanNode;
 
@@ -72,6 +75,7 @@ public class AddExchangeNodes extends PlanVisitor<PlanNode, 
DistributedPlanGener
         ExchangeNode exchangeNode = new 
ExchangeNode(queryContext.getQueryId().genPlanNodeId());
         exchangeNode.addChild(rewriteNode);
         newNode.addChild(exchangeNode);
+        context.hasExchangeNode = true;
       } else {
         newNode.addChild(rewriteNode);
       }
@@ -91,4 +95,24 @@ public class AddExchangeNodes extends PlanVisitor<PlanNode, 
DistributedPlanGener
         new NodeDistribution(SAME_WITH_ALL_CHILDREN, 
node.getRegionReplicaSet()));
     return node;
   }
+
+  @Override
+  public PlanNode visitTableDeviceFetch(
+      TableDeviceFetchNode node, DistributedPlanGenerator.PlanContext context) 
{
+    return processTableDeviceSourceNode(node, context);
+  }
+
+  @Override
+  public PlanNode visitTableDeviceQuery(
+      TableDeviceQueryNode node, DistributedPlanGenerator.PlanContext context) 
{
+    return processTableDeviceSourceNode(node, context);
+  }
+
+  private PlanNode processTableDeviceSourceNode(
+      TableDeviceSourceNode node, DistributedPlanGenerator.PlanContext 
context) {
+    context.nodeDistributionMap.put(
+        node.getPlanNodeId(),
+        new NodeDistribution(SAME_WITH_ALL_CHILDREN, 
node.getRegionReplicaSet()));
+    return node;
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/DistributedPlanGenerator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/DistributedPlanGenerator.java
index 0bf5bb32a1b..61543eda7d1 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/DistributedPlanGenerator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/DistributedPlanGenerator.java
@@ -14,20 +14,19 @@
 package org.apache.iotdb.db.queryengine.plan.relational.planner.distribute;
 
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
+import org.apache.iotdb.commons.partition.SchemaPartition;
 import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
 import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
 import org.apache.iotdb.db.queryengine.common.QueryId;
 import 
org.apache.iotdb.db.queryengine.plan.planner.distribution.NodeDistribution;
-import 
org.apache.iotdb.db.queryengine.plan.planner.distribution.NodeDistributionType;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode;
-import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.AbstractSchemaMergeNode;
-import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.SchemaQueryMergeNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.TableDeviceFetchNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.TableDeviceQueryNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.TableDeviceSourceNode;
-import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ExchangeNode;
-import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SourceNode;
 import org.apache.iotdb.db.queryengine.plan.relational.analyzer.Analysis;
 import org.apache.iotdb.db.queryengine.plan.relational.metadata.DeviceEntry;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.OrderingScheme;
@@ -45,6 +44,8 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableScanNod
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression;
 import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
 
+import org.apache.tsfile.file.metadata.IDeviceID;
+
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
@@ -58,6 +59,8 @@ import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
 import static com.google.common.collect.ImmutableList.toImmutableList;
+import static org.apache.iotdb.commons.schema.SchemaConstant.ROOT;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.TableDeviceSchemaValidator.parseDeviceIdArray;
 import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.PushPredicateIntoTableScan.containsDiffFunction;
 import static org.apache.iotdb.db.utils.constant.TestConstant.TIMESTAMP_STR;
 
@@ -74,7 +77,16 @@ public class DistributedPlanGenerator
   }
 
   public List<PlanNode> genResult(PlanNode node, PlanContext context) {
-    return node.accept(this, context);
+    List<PlanNode> res = node.accept(this, context);
+    if (res.size() == 1) {
+      return res;
+    } else if (res.size() > 1) {
+      CollectNode collectNode = new CollectNode(queryId.genPlanNodeId());
+      res.forEach(collectNode::addChild);
+      return Collections.singletonList(collectNode);
+    } else {
+      throw new IllegalStateException("List<PlanNode>.size should >= 1, but 
now is 0");
+    }
   }
 
   @Override
@@ -273,8 +285,6 @@ public class DistributedPlanGenerator
       }
     }
 
-    context.hasExchangeNode = tableScanNodeMap.size() > 1;
-
     List<PlanNode> resultTableScanNodeList = new ArrayList<>();
     TRegionReplicaSet mostUsedDataRegion = null;
     int maxDeviceEntrySizeOfTableScan = 0;
@@ -401,18 +411,10 @@ public class DistributedPlanGenerator
   }
 
   // ------------------- schema related interface 
---------------------------------------------
-
   @Override
-  public List<PlanNode> visitSchemaQueryMerge(SchemaQueryMergeNode node, 
PlanContext context) {
-    return Collections.singletonList(
-        addExchangeNodeForSchemaMerge(rewriteSchemaQuerySource(node, context), 
context));
-  }
-
-  private SchemaQueryMergeNode rewriteSchemaQuerySource(
-      SchemaQueryMergeNode node, PlanContext context) {
-    SchemaQueryMergeNode root = (SchemaQueryMergeNode) node.clone();
-
-    String database = ((TableDeviceSourceNode) 
node.getChildren().get(0)).getDatabase();
+  public List<PlanNode> visitTableDeviceQuery(TableDeviceQueryNode node, 
PlanContext context) {
+    String database =
+        ROOT + "." + ((TableDeviceSourceNode) 
node.getChildren().get(0)).getDatabase();
     Set<TRegionReplicaSet> schemaRegionSet = new HashSet<>();
     analysis
         .getSchemaPartitionInfo()
@@ -421,54 +423,77 @@ public class DistributedPlanGenerator
         .forEach(
             (deviceGroupId, schemaRegionReplicaSet) -> 
schemaRegionSet.add(schemaRegionReplicaSet));
 
-    for (PlanNode child : node.getChildren()) {
+    context.mostUsedDataRegion = schemaRegionSet.iterator().next();
+    if (schemaRegionSet.size() == 1) {
+      node.setRegionReplicaSet(schemaRegionSet.iterator().next());
+      return Collections.singletonList(node);
+    } else {
+      List<PlanNode> res = new ArrayList<>(schemaRegionSet.size());
       for (TRegionReplicaSet schemaRegion : schemaRegionSet) {
-        SourceNode clonedChild = (SourceNode) child.clone();
+        TableDeviceQueryNode clonedChild = (TableDeviceQueryNode) node.clone();
         clonedChild.setPlanNodeId(queryId.genPlanNodeId());
         clonedChild.setRegionReplicaSet(schemaRegion);
-        root.addChild(clonedChild);
+        res.add(clonedChild);
       }
+      return res;
     }
-    return root;
   }
 
-  private PlanNode addExchangeNodeForSchemaMerge(
-      AbstractSchemaMergeNode node, PlanContext context) {
-    node.getChildren()
-        .forEach(
-            child ->
-                context.putNodeDistribution(
-                    child.getPlanNodeId(),
-                    new NodeDistribution(
-                        NodeDistributionType.NO_CHILD,
-                        ((SourceNode) child).getRegionReplicaSet())));
-    NodeDistribution nodeDistribution =
-        new NodeDistribution(NodeDistributionType.DIFFERENT_FROM_ALL_CHILDREN);
-    PlanNode newNode = node.clone();
-    
nodeDistribution.setRegion(calculateSchemaRegionByChildren(node.getChildren(), 
context));
-    context.putNodeDistribution(newNode.getPlanNodeId(), nodeDistribution);
-    node.getChildren()
-        .forEach(
-            child -> {
-              if (!nodeDistribution
-                  .getRegion()
-                  
.equals(context.getNodeDistribution(child.getPlanNodeId()).getRegion())) {
-                ExchangeNode exchangeNode = new 
ExchangeNode(queryId.genPlanNodeId());
-                exchangeNode.setChild(child);
-                
exchangeNode.setOutputColumnNames(child.getOutputColumnNames());
-                context.hasExchangeNode = true;
-                newNode.addChild(exchangeNode);
-              } else {
-                newNode.addChild(child);
-              }
-            });
-    return newNode;
-  }
+  @Override
+  public List<PlanNode> visitTableDeviceFetch(TableDeviceFetchNode node, 
PlanContext context) {
+    String database =
+        ROOT + "." + ((TableDeviceSourceNode) 
node.getChildren().get(0)).getDatabase();
+    Set<TRegionReplicaSet> schemaRegionSet = new HashSet<>();
+    SchemaPartition schemaPartition = analysis.getSchemaPartitionInfo();
+    Map<TSeriesPartitionSlot, TRegionReplicaSet> databaseMap =
+        schemaPartition.getSchemaPartitionMap().get(database);
 
-  private TRegionReplicaSet calculateSchemaRegionByChildren(
-      List<PlanNode> children, PlanContext context) {
-    // We always make the schemaRegion of SchemaMergeNode to be the same as 
its first child.
-    return 
context.getNodeDistribution(children.get(0).getPlanNodeId()).getRegion();
+    databaseMap.forEach(
+        (deviceGroupId, schemaRegionReplicaSet) -> 
schemaRegionSet.add(schemaRegionReplicaSet));
+
+    if (schemaRegionSet.size() == 1) {
+      context.mostUsedDataRegion = schemaRegionSet.iterator().next();
+      node.setRegionReplicaSet(context.mostUsedDataRegion);
+      return Collections.singletonList(node);
+    } else {
+      Map<TRegionReplicaSet, TableDeviceFetchNode> tableDeviceFetchMap = new 
HashMap<>();
+
+      for (Object[] deviceIdArray : node.getDeviceIdList()) {
+        IDeviceID deviceID =
+            
IDeviceID.Factory.DEFAULT_FACTORY.create(parseDeviceIdArray(deviceIdArray));
+        TRegionReplicaSet regionReplicaSet =
+            databaseMap.get(schemaPartition.calculateDeviceGroupId(deviceID));
+        tableDeviceFetchMap
+            .computeIfAbsent(
+                regionReplicaSet,
+                k ->
+                    new TableDeviceFetchNode(
+                        queryId.genPlanNodeId(),
+                        node.getDatabase(),
+                        node.getTableName(),
+                        new ArrayList<>(),
+                        node.getColumnHeaderList(),
+                        regionReplicaSet))
+            .addDeviceId(deviceIdArray);
+      }
+
+      List<PlanNode> res = new ArrayList<>();
+      TRegionReplicaSet mostUsedDataRegion = null;
+      int maxDeviceEntrySizeOfTableScan = 0;
+      for (Map.Entry<TRegionReplicaSet, TableDeviceFetchNode> entry :
+          tableDeviceFetchMap.entrySet()) {
+        TRegionReplicaSet regionReplicaSet = entry.getKey();
+        TableDeviceFetchNode subTableDeviceFetchNode = entry.getValue();
+        res.add(subTableDeviceFetchNode);
+
+        if (subTableDeviceFetchNode.getDeviceIdList().size() > 
maxDeviceEntrySizeOfTableScan) {
+          mostUsedDataRegion = regionReplicaSet;
+          maxDeviceEntrySizeOfTableScan = 
subTableDeviceFetchNode.getDeviceIdList().size();
+        }
+      }
+      context.mostUsedDataRegion = mostUsedDataRegion;
+      return res;
+    }
   }
 
   public static class PlanContext {
@@ -485,9 +510,5 @@ public class DistributedPlanGenerator
     public NodeDistribution getNodeDistribution(PlanNodeId nodeId) {
       return this.nodeDistributionMap.get(nodeId);
     }
-
-    public void putNodeDistribution(PlanNodeId nodeId, NodeDistribution 
distribution) {
-      this.nodeDistributionMap.put(nodeId, distribution);
-    }
   }
 }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/Partition.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/Partition.java
index 3bb5a292a94..b074e5bae16 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/Partition.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/Partition.java
@@ -41,7 +41,7 @@ public abstract class Partition {
             seriesSlotExecutorName, seriesPartitionSlotNum);
   }
 
-  protected TSeriesPartitionSlot calculateDeviceGroupId(IDeviceID deviceID) {
+  public TSeriesPartitionSlot calculateDeviceGroupId(IDeviceID deviceID) {
     return executor.getSeriesPartitionSlot(deviceID);
   }
 

Reply via email to