This is an automated email from the ASF dual-hosted git repository.

caogaofei pushed a commit to branch beyyes/debug-table
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit da1662de077a88600fefe3f7b0c23ee444d99e53
Author: Beyyes <[email protected]>
AuthorDate: Tue May 21 21:01:49 2024 +0800

    add multi datanode multi dataregion impl for table model
---
 .../iotdb/confignode/conf/ConfigNodeConfig.java    |   4 +-
 .../db/queryengine/plan/analyze/TypeProvider.java  |  23 ++--
 .../plan/planner/TableOperatorGenerator.java       |  46 +++++++
 .../plan/planner/plan/node/PlanNodeType.java       |  42 +++++-
 .../plan/planner/plan/node/PlanVisitor.java        |   9 +-
 .../plan/relational/metadata/ColumnSchema.java     |  56 ++++++++
 .../plan/relational/metadata/DeviceEntry.java      |  32 +++++
 .../plan/relational/planner/OrderingScheme.java    |  49 +++++++
 .../plan/relational/planner/Symbol.java            |  18 +++
 .../planner/distribute/ExchangeNodeGenerator.java  |  77 ++++++++++-
 .../distribute/RelationalDistributionPlanner.java  |  77 ++++++++---
 .../planner/distribute/SubPlanGenerator.java       |   8 +-
 .../TableModelTypeProviderExtractor.java           |  21 +++
 .../plan/relational/planner/node/FilterNode.java   |   4 +
 .../plan/relational/planner/node/LimitNode.java    |   4 +
 .../relational/planner/node/MergeSortNode.java     |  33 ++++-
 .../plan/relational/planner/node/OffsetNode.java   |   4 +
 .../plan/relational/planner/node/OutputNode.java   |  45 ++++++-
 .../plan/relational/planner/node/ProjectNode.java  |   4 +
 .../plan/relational/planner/node/SortNode.java     |   4 +
 .../relational/planner/node/TableScanNode.java     | 144 ++++++++++++++++++++-
 .../plan/relational/planner/node/TopKNode.java     |   4 +
 .../planner/optimizations/IndexScan.java           |  28 ++--
 .../schema/table/column/TsTableColumnCategory.java |   6 +-
 24 files changed, 670 insertions(+), 72 deletions(-)

diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
index 3177bf0c0fd..4b02421f253 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
@@ -94,14 +94,14 @@ public class ConfigNodeConfig {
 
   /** The policy of extension DataRegionGroup for each Database. */
   private RegionGroupExtensionPolicy dataRegionGroupExtensionPolicy =
-      RegionGroupExtensionPolicy.AUTO;
+      RegionGroupExtensionPolicy.CUSTOM;
 
   /**
    * When set data_region_group_extension_policy=CUSTOM, this parameter is the default number of
    * DataRegionGroups for each Database. When set data_region_group_extension_policy=AUTO, this
    * parameter is the default minimal number of DataRegionGroups for each Database.
    */
-  private int defaultDataRegionGroupNumPerDatabase = 1;
+  private int defaultDataRegionGroupNumPerDatabase = 3;
 
   /** The maximum number of DataRegions expected to be managed by each DataNode. */
   private double dataRegionPerDataNode = 5.0;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TypeProvider.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TypeProvider.java
index 39945fc27ca..fc6c56ae599 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TypeProvider.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TypeProvider.java
@@ -164,10 +164,12 @@ public class TypeProvider {
   }
 
   public void serialize(DataOutputStream stream) throws IOException {
-    ReadWriteIOUtils.write(treeModelTypeMap.size(), stream);
-    for (Map.Entry<String, TSDataType> entry : treeModelTypeMap.entrySet()) {
-      ReadWriteIOUtils.write(entry.getKey(), stream);
-      ReadWriteIOUtils.write(entry.getValue().ordinal(), stream);
+    ReadWriteIOUtils.write(treeModelTypeMap == null ? 0 : treeModelTypeMap.size(), stream);
+    if (treeModelTypeMap != null) {
+      for (Map.Entry<String, TSDataType> entry : treeModelTypeMap.entrySet()) {
+        ReadWriteIOUtils.write(entry.getKey(), stream);
+        ReadWriteIOUtils.write(entry.getValue().ordinal(), stream);
+      }
     }
 
     if (templatedInfo == null) {
@@ -180,6 +182,7 @@ public class TypeProvider {
     if (tableModelTypes == null) {
       ReadWriteIOUtils.write((byte) 0, stream);
     } else {
+      ReadWriteIOUtils.write((byte) 1, stream);
       ReadWriteIOUtils.write(tableModelTypes.size(), stream);
       for (Map.Entry<Symbol, Type> entry : tableModelTypes.entrySet()) {
         ReadWriteIOUtils.write(entry.getKey().getName(), stream);
@@ -204,20 +207,20 @@ public class TypeProvider {
       templatedInfo = TemplatedInfo.deserialize(byteBuffer);
     }
 
-    Map<Symbol, Type> types = null;
-    byte hasTypes = ReadWriteIOUtils.readByte(byteBuffer);
-    if (hasTypes == 1) {
+    Map<Symbol, Type> tableModelTypes = null;
+    byte hasTableModelTypes = ReadWriteIOUtils.readByte(byteBuffer);
+    if (hasTableModelTypes == 1) {
       mapSize = ReadWriteIOUtils.readInt(byteBuffer);
-      types = new HashMap<>(mapSize);
+      tableModelTypes = new HashMap<>(mapSize);
       while (mapSize > 0) {
-        types.put(
+        tableModelTypes.put(
             new Symbol(ReadWriteIOUtils.readString(byteBuffer)),
             TypeFactory.getType(TypeEnum.values()[ReadWriteIOUtils.readInt(byteBuffer)]));
         mapSize--;
       }
     }
 
-    return new TypeProvider(typeMap, templatedInfo, types);
+    return new TypeProvider(typeMap, templatedInfo, tableModelTypes);
   }
 
   @Override
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
index 09e7b9cae42..69d6cb305b6 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.queryengine.plan.planner;
 
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.commons.path.AlignedPath;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.queryengine.common.FragmentInstanceId;
@@ -28,6 +29,7 @@ import org.apache.iotdb.db.queryengine.execution.exchange.MPPDataExchangeService
 import org.apache.iotdb.db.queryengine.execution.exchange.sink.DownStreamChannelIndex;
 import org.apache.iotdb.db.queryengine.execution.exchange.sink.ISinkHandle;
 import org.apache.iotdb.db.queryengine.execution.exchange.sink.ShuffleSinkHandle;
+import org.apache.iotdb.db.queryengine.execution.exchange.source.ISourceHandle;
 import org.apache.iotdb.db.queryengine.execution.operator.Operator;
 import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
 import org.apache.iotdb.db.queryengine.execution.operator.process.FilterAndProjectOperator;
@@ -38,6 +40,7 @@ import org.apache.iotdb.db.queryengine.execution.operator.process.SortOperator;
 import org.apache.iotdb.db.queryengine.execution.operator.process.TopKOperator;
 import org.apache.iotdb.db.queryengine.execution.operator.sink.IdentitySinkOperator;
 import org.apache.iotdb.db.queryengine.execution.operator.source.AlignedSeriesScanOperator;
+import org.apache.iotdb.db.queryengine.execution.operator.source.ExchangeOperator;
 import org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableScanOperator;
 import org.apache.iotdb.db.queryengine.execution.relational.ColumnTransformerBuilder;
 import org.apache.iotdb.db.queryengine.plan.analyze.PredicateUtils;
@@ -45,6 +48,7 @@ import org.apache.iotdb.db.queryengine.plan.analyze.TypeProvider;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ExchangeNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.sink.IdentitySinkNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.InputLocation;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.SeriesScanOptions;
@@ -86,6 +90,7 @@ import java.util.stream.Collectors;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static java.util.Objects.requireNonNull;
+import static org.apache.iotdb.db.queryengine.common.DataNodeEndPoints.isSameNode;
 import static org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.MergeSortComparator.getComparatorForTable;
 import static org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableScanOperator.constructAlignedPath;
 import static org.apache.iotdb.db.queryengine.plan.analyze.PredicateUtils.convertPredicateToFilter;
@@ -142,6 +147,47 @@ public class TableOperatorGenerator extends PlanVisitor<Operator, LocalExecution
     return new IdentitySinkOperator(operatorContext, children, downStreamChannelIndex, sinkHandle);
   }
 
+  @Override
+  public Operator visitExchange(ExchangeNode node, LocalExecutionPlanContext context) {
+    OperatorContext operatorContext =
+        context
+            .getDriverContext()
+            .addOperatorContext(
+                context.getNextOperatorId(),
+                node.getPlanNodeId(),
+                ExchangeOperator.class.getSimpleName());
+
+    FragmentInstanceId localInstanceId = context.getInstanceContext().getId();
+    FragmentInstanceId remoteInstanceId = node.getUpstreamInstanceId();
+
+    TEndPoint upstreamEndPoint = node.getUpstreamEndpoint();
+    boolean isSameNode = isSameNode(upstreamEndPoint);
+    ISourceHandle sourceHandle =
+        isSameNode
+            ? MPP_DATA_EXCHANGE_MANAGER.createLocalSourceHandleForFragment(
+                localInstanceId.toThrift(),
+                node.getPlanNodeId().getId(),
+                node.getUpstreamPlanNodeId().getId(),
+                remoteInstanceId.toThrift(),
+                node.getIndexOfUpstreamSinkHandle(),
+                context.getInstanceContext()::failed)
+            : MPP_DATA_EXCHANGE_MANAGER.createSourceHandle(
+                localInstanceId.toThrift(),
+                node.getPlanNodeId().getId(),
+                node.getIndexOfUpstreamSinkHandle(),
+                upstreamEndPoint,
+                remoteInstanceId.toThrift(),
+                context.getInstanceContext()::failed);
+    if (!isSameNode) {
+      context.addExchangeSumNum(1);
+    }
+    sourceHandle.setMaxBytesCanReserve(context.getMaxBytesOneHandleCanReserve());
+    ExchangeOperator exchangeOperator =
+        new ExchangeOperator(operatorContext, sourceHandle, node.getUpstreamPlanNodeId());
+    context.addExchangeOperator(exchangeOperator);
+    return exchangeOperator;
+  }
+
   @Override
   public Operator visitTableScan(TableScanNode node, LocalExecutionPlanContext context) {
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
index 1152d9b0c6e..0e52538126b 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
@@ -79,7 +79,6 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.GroupByTag
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.HorizontallyConcatNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.IntoNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.LimitNode;
-import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.MergeSortNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.OffsetNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ProjectNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SingleDeviceViewNode;
@@ -218,7 +217,17 @@ public enum PlanNodeType {
   CREATE_TABLE_DEVICE((short) 92),
   TABLE_DEVICE_SCAN((short) 93),
   TABLE_DEVICE_FETCH((short) 94),
-  DELETE_TABLE_DEVICE((short) 95);
+  DELETE_TABLE_DEVICE((short) 95),
+
+  TABLE_SCAN_NODE((short) 1000),
+  TABLE_FILTER_NODE((short) 1001),
+  TABLE_PROJECT_NODE((short) 1002),
+  TABLE_OUTPUT_NODE((short) 1003),
+  TABLE_LIMIT_NODE((short) 1004),
+  TABLE_OFFSET_NODE((short) 1005),
+  TABLE_SORT_NODE((short) 1006),
+  TABLE_MERGESORT_NODE((short) 1007),
+  TABLE_TOPK_NODE((short) 1008);
 
   public static final int BYTES = Short.BYTES;
 
@@ -400,7 +409,8 @@ public enum PlanNodeType {
       case 65:
         return SingleDeviceViewNode.deserialize(buffer);
       case 66:
-        return MergeSortNode.deserialize(buffer);
+        return org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.MergeSortNode
+            .deserialize(buffer);
       case 67:
         return ShowQueriesNode.deserialize(buffer);
       case 68:
@@ -457,6 +467,32 @@ public enum PlanNodeType {
         return TableDeviceFetchNode.deserialize(buffer);
       case 95:
         return DeleteTableDeviceNode.deserialize(buffer);
+      case 1000:
+        return org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableScanNode
+            .deserialize(buffer);
+      case 1001:
+        return org.apache.iotdb.db.queryengine.plan.relational.planner.node.FilterNode.deserialize(
+            buffer);
+      case 1002:
+        return org.apache.iotdb.db.queryengine.plan.relational.planner.node.ProjectNode.deserialize(
+            buffer);
+      case 1003:
+        return org.apache.iotdb.db.queryengine.plan.relational.planner.node.OutputNode.deserialize(
+            buffer);
+      case 1004:
+        return org.apache.iotdb.db.queryengine.plan.relational.planner.node.LimitNode.deserialize(
+            buffer);
+      case 1005:
+        return org.apache.iotdb.db.queryengine.plan.relational.planner.node.OffsetNode.deserialize(
+            buffer);
+      case 1006:
+        return org.apache.iotdb.db.queryengine.plan.relational.planner.node.SortNode.deserialize(
+            buffer);
+      case 1007:
+        return org.apache.iotdb.db.queryengine.plan.relational.planner.node.MergeSortNode
+            .deserialize(buffer);
+      case 1008:
+        return TopKNode.deserialize(buffer);
       default:
         throw new IllegalArgumentException("Invalid node type: " + nodeType);
     }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java
index 03b28177027..34fd19bd7b3 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java
@@ -76,7 +76,6 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.GroupByTag
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.HorizontallyConcatNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.IntoNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.LimitNode;
-import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.MergeSortNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.MultiChildProcessNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.OffsetNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ProjectNode;
@@ -112,6 +111,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNod
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsOfOneDeviceNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.MergeSortNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableScanNode;
 
 @SuppressWarnings("java:S6539") // suppress "Monster class" warning
@@ -291,7 +291,9 @@ public abstract class PlanVisitor<R, C> {
     return visitSingleChildProcess(node, context);
   }
 
-  public R visitMergeSort(MergeSortNode node, C context) {
+  public R visitMergeSort(
+      org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.MergeSortNode node,
+      C context) {
     return visitMultiChildProcess(node, context);
   }
 
@@ -538,8 +540,7 @@ public abstract class PlanVisitor<R, C> {
     return visitPlan(node, context);
   }
 
-  public R visitMergeSort(
-      org.apache.iotdb.db.queryengine.plan.relational.planner.node.MergeSortNode node, C context) {
+  public R visitMergeSort(MergeSortNode node, C context) {
     return visitPlan(node, context);
   }
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/ColumnSchema.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/ColumnSchema.java
index bf8027339b4..bb460c61423 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/ColumnSchema.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/ColumnSchema.java
@@ -21,13 +21,25 @@ package org.apache.iotdb.db.queryengine.plan.relational.metadata;
 
 import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
 
+import org.apache.tsfile.read.common.type.BinaryType;
+import org.apache.tsfile.read.common.type.BooleanType;
+import org.apache.tsfile.read.common.type.DoubleType;
+import org.apache.tsfile.read.common.type.FloatType;
 import org.apache.tsfile.read.common.type.Type;
+import org.apache.tsfile.read.common.type.TypeEnum;
+import org.apache.tsfile.read.common.type.UnknownType;
+import org.apache.tsfile.utils.ReadWriteIOUtils;
 
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.Objects;
 import java.util.StringJoiner;
 
 import static java.util.Locale.ENGLISH;
 import static java.util.Objects.requireNonNull;
+import static org.apache.tsfile.read.common.type.IntType.INT32;
+import static org.apache.tsfile.read.common.type.LongType.INT64;
 
 public class ColumnSchema {
   private final String name;
@@ -87,6 +99,50 @@ public class ColumnSchema {
         .toString();
   }
 
+  public static void serialize(ColumnSchema columnSchema, ByteBuffer byteBuffer) {
+    ReadWriteIOUtils.write(columnSchema.getName(), byteBuffer);
+    ReadWriteIOUtils.write(columnSchema.getType().getTypeEnum().ordinal(), byteBuffer);
+    columnSchema.getColumnCategory().serialize(byteBuffer);
+    ReadWriteIOUtils.write(columnSchema.isHidden(), byteBuffer);
+  }
+
+  public static void serialize(ColumnSchema columnSchema, DataOutputStream stream)
+      throws IOException {
+    ReadWriteIOUtils.write(columnSchema.getName(), stream);
+    ReadWriteIOUtils.write(columnSchema.getType().getTypeEnum().ordinal(), stream);
+    columnSchema.getColumnCategory().serialize(stream);
+    ReadWriteIOUtils.write(columnSchema.isHidden(), stream);
+  }
+
+  public static ColumnSchema deserialize(ByteBuffer byteBuffer) {
+    String name = ReadWriteIOUtils.readString(byteBuffer);
+    TypeEnum typeEnum = TypeEnum.values()[ReadWriteIOUtils.readInt(byteBuffer)];
+    Type type = getType(typeEnum);
+    TsTableColumnCategory columnCategory = TsTableColumnCategory.deserialize(byteBuffer);
+    boolean isHidden = ReadWriteIOUtils.readBool(byteBuffer);
+
+    return new ColumnSchema(name, type, isHidden, columnCategory);
+  }
+
+  public static Type getType(TypeEnum typeEnum) {
+    switch (typeEnum) {
+      case BOOLEAN:
+        return BooleanType.BOOLEAN;
+      case INT32:
+        return INT32;
+      case INT64:
+        return INT64;
+      case FLOAT:
+        return FloatType.FLOAT;
+      case DOUBLE:
+        return DoubleType.DOUBLE;
+      case TEXT:
+        return BinaryType.TEXT;
+      default:
+        return UnknownType.UNKNOWN;
+    }
+  }
+
   public static Builder builder() {
     return new Builder();
   }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/DeviceEntry.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/DeviceEntry.java
index cac441ef1f1..396352ab78d 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/DeviceEntry.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/DeviceEntry.java
@@ -20,7 +20,13 @@
 package org.apache.iotdb.db.queryengine.plan.relational.metadata;
 
 import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.file.metadata.StringArrayDeviceID;
+import org.apache.tsfile.utils.ReadWriteIOUtils;
 
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.List;
 
 public class DeviceEntry {
@@ -41,6 +47,32 @@ public class DeviceEntry {
     return attributeColumnValues;
   }
 
+  public void serialize(ByteBuffer byteBuffer) {
+    deviceID.serialize(byteBuffer);
+    ReadWriteIOUtils.write(attributeColumnValues.size(), byteBuffer);
+    for (String value : attributeColumnValues) {
+      ReadWriteIOUtils.write(value, byteBuffer);
+    }
+  }
+
+  public void serialize(DataOutputStream stream) throws IOException {
+    deviceID.serialize(stream);
+    ReadWriteIOUtils.write(attributeColumnValues.size(), stream);
+    for (String value : attributeColumnValues) {
+      ReadWriteIOUtils.write(value, stream);
+    }
+  }
+
+  public static DeviceEntry deserialize(ByteBuffer byteBuffer) {
+    IDeviceID iDeviceID = StringArrayDeviceID.deserialize(byteBuffer);
+    int size = ReadWriteIOUtils.readInt(byteBuffer);
+    List<String> attributeColumnValues = new ArrayList<>(size);
+    while (size-- > 0) {
+      attributeColumnValues.add(ReadWriteIOUtils.readString(byteBuffer));
+    }
+    return new DeviceEntry(iDeviceID, attributeColumnValues);
+  }
+
   @Override
   public String toString() {
     return "DeviceEntry{"
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/OrderingScheme.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/OrderingScheme.java
index b7d57d5f056..9edc2ad3c54 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/OrderingScheme.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/OrderingScheme.java
@@ -17,7 +17,13 @@ package org.apache.iotdb.db.queryengine.plan.relational.planner;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
+import org.apache.tsfile.utils.ReadWriteIOUtils;
 
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -59,6 +65,49 @@ public class OrderingScheme {
     return orderings.get(symbol);
   }
 
+  public void serialize(ByteBuffer byteBuffer) {
+    ReadWriteIOUtils.write(orderBy.size(), byteBuffer);
+    for (Symbol symbol : orderBy) {
+      Symbol.serialize(symbol, byteBuffer);
+    }
+
+    ReadWriteIOUtils.write(orderings.size(), byteBuffer);
+    for (Map.Entry<Symbol, SortOrder> entry : orderings.entrySet()) {
+      Symbol.serialize(entry.getKey(), byteBuffer);
+      ReadWriteIOUtils.write(entry.getValue().ordinal(), byteBuffer);
+    }
+  }
+
+  public void serialize(DataOutputStream stream) throws IOException {
+    ReadWriteIOUtils.write(orderBy.size(), stream);
+    for (Symbol symbol : orderBy) {
+      Symbol.serialize(symbol, stream);
+    }
+
+    ReadWriteIOUtils.write(orderings.size(), stream);
+    for (Map.Entry<Symbol, SortOrder> entry : orderings.entrySet()) {
+      Symbol.serialize(entry.getKey(), stream);
+      ReadWriteIOUtils.write(entry.getValue().ordinal(), stream);
+    }
+  }
+
+  public static OrderingScheme deserialize(ByteBuffer byteBuffer) {
+    int size = ReadWriteIOUtils.readInt(byteBuffer);
+    List<Symbol> orderBy = new ArrayList<>(size);
+    while (size-- > 0) {
+      orderBy.add(Symbol.deserialize(byteBuffer));
+    }
+
+    size = ReadWriteIOUtils.readInt(byteBuffer);
+    Map<Symbol, SortOrder> orderings = new HashMap<>(size);
+    while (size-- > 0) {
+      orderings.put(
+          Symbol.deserialize(byteBuffer), SortOrder.values()[ReadWriteIOUtils.readInt(byteBuffer)]);
+    }
+
+    return new OrderingScheme(orderBy, orderings);
+  }
+
   @Override
   public boolean equals(Object o) {
     if (this == o) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/Symbol.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/Symbol.java
index 847197ba6b1..946e90d8cbe 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/Symbol.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/Symbol.java
@@ -22,6 +22,12 @@ package org.apache.iotdb.db.queryengine.plan.relational.planner;
 import org.apache.iotdb.db.relational.sql.tree.Expression;
 import org.apache.iotdb.db.relational.sql.tree.SymbolReference;
 
+import org.apache.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
 import static com.google.common.base.Preconditions.checkArgument;
 import static java.util.Objects.requireNonNull;
 
@@ -74,4 +80,16 @@ public class Symbol implements Comparable<Symbol> {
   public int compareTo(Symbol o) {
     return name.compareTo(o.name);
   }
+
+  public static void serialize(Symbol symbol, ByteBuffer byteBuffer) {
+    ReadWriteIOUtils.write(symbol.getName(), byteBuffer);
+  }
+
+  public static void serialize(Symbol symbol, DataOutputStream stream) throws IOException {
+    ReadWriteIOUtils.write(symbol.getName(), stream);
+  }
+
+  public static Symbol deserialize(ByteBuffer byteBuffer) {
+    return new Symbol(ReadWriteIOUtils.readString(byteBuffer));
+  }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/ExchangeNodeGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/ExchangeNodeGenerator.java
index d11f3a1ee82..63b7a4896ab 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/ExchangeNodeGenerator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/ExchangeNodeGenerator.java
@@ -13,21 +13,84 @@
  */
 package org.apache.iotdb.db.queryengine.plan.relational.planner.distribute;
 
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
+import org.apache.iotdb.db.queryengine.plan.planner.distribution.NodeDistribution;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ExchangeNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.OrderingScheme;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.SortOrder;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.MergeSortNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableScanNode;
 
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
-public class ExchangeNodeGenerator
-    extends SimplePlanRewriter<ExchangeNodeGenerator.DistributionPlanContext> {
+import static org.apache.iotdb.db.queryengine.plan.planner.distribution.NodeDistributionType.SAME_WITH_ALL_CHILDREN;
+
+public class ExchangeNodeGenerator extends SimplePlanRewriter<ExchangeNodeGenerator.PlanContext> {
 
   @Override
-  public List<PlanNode> visitTableScan(
-      TableScanNode node, ExchangeNodeGenerator.DistributionPlanContext context) {
-    // TODO process that the data of TableScanNode locates in multi data regions
-    return Collections.singletonList(node);
+  public List<PlanNode> visitTableScan(TableScanNode node, PlanContext context) {
+
+    if (node.getRegionReplicaSetList().size() > 1) {
+      context.hasExchangeNode = true;
+      List<Symbol> orderBy = node.getOutputSymbols().subList(0, 1);
+      Map<Symbol, SortOrder> orderings =
+          Collections.singletonMap(node.getOutputSymbols().get(0), SortOrder.ASC_NULLS_LAST);
+      OrderingScheme orderingScheme = new OrderingScheme(orderBy, orderings);
+      MergeSortNode mergeSortNode =
+          new MergeSortNode(
+              context.queryContext.getQueryId().genPlanNodeId(),
+              orderingScheme,
+              node.getOutputSymbols());
+
+      for (int i = 0; i < node.getRegionReplicaSetList().size(); i++) {
+        TRegionReplicaSet regionReplicaSet = node.getRegionReplicaSetList().get(i);
+        TableScanNode subTableScanNode = node.clone();
+        subTableScanNode.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId());
+        subTableScanNode.setRegionReplicaSet(regionReplicaSet);
+        context.nodeDistributionMap.put(
+            subTableScanNode.getPlanNodeId(),
+            new NodeDistribution(SAME_WITH_ALL_CHILDREN, regionReplicaSet));
+
+        // TODO not use 0 replica set as root replica?
+        if (i == 0) {
+          mergeSortNode.addChild(subTableScanNode);
+          context.nodeDistributionMap.put(
+              mergeSortNode.getPlanNodeId(),
+              new NodeDistribution(SAME_WITH_ALL_CHILDREN, regionReplicaSet));
+        } else {
+          ExchangeNode exchangeNode =
+              new ExchangeNode(context.queryContext.getQueryId().genPlanNodeId());
+          exchangeNode.addChild(subTableScanNode);
+          mergeSortNode.addChild(exchangeNode);
+        }
+      }
+      return Collections.singletonList(mergeSortNode);
+    } else {
+      node.setRegionReplicaSet(node.getRegionReplicaSetList().get(0));
+      return Collections.singletonList(node);
+    }
   }
 
-  public static class DistributionPlanContext {}
+  public static class PlanContext {
+    final MPPQueryContext queryContext;
+    final Map<PlanNodeId, NodeDistribution> nodeDistributionMap;
+    TRegionReplicaSet mostlyUsedDataRegion;
+    boolean hasExchangeNode = false;
+
+    public PlanContext(MPPQueryContext queryContext) {
+      this.queryContext = queryContext;
+      this.nodeDistributionMap = new HashMap<>();
+    }
+
+    public NodeDistribution getNodeDistribution(PlanNodeId nodeId) {
+      return this.nodeDistributionMap.get(nodeId);
+    }
+  }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/RelationalDistributionPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/RelationalDistributionPlanner.java
index e3cd4e9f8e6..f9dd7e85cec 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/RelationalDistributionPlanner.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/RelationalDistributionPlanner.java
@@ -13,6 +13,7 @@
  */
 package org.apache.iotdb.db.queryengine.plan.relational.planner.distribute;
 
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
 import 
org.apache.iotdb.db.queryengine.execution.exchange.sink.DownStreamChannelLocation;
 import org.apache.iotdb.db.queryengine.plan.analyze.QueryType;
@@ -21,13 +22,16 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.LogicalQueryPlan;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.SubPlan;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ExchangeNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.sink.IdentitySinkNode;
 import org.apache.iotdb.db.queryengine.plan.relational.analyzer.Analysis;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
 import org.apache.iotdb.db.relational.sql.tree.Query;
 
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.stream.Collectors;
 
 import static 
org.apache.iotdb.db.queryengine.plan.expression.leaf.TimestampOperand.TIMESTAMP_EXPRESSION_STRING;
@@ -35,45 +39,47 @@ import static 
org.apache.iotdb.db.queryengine.plan.expression.leaf.TimestampOper
 public class RelationalDistributionPlanner {
   private final Analysis analysis;
   private final LogicalQueryPlan logicalQueryPlan;
-  private final MPPQueryContext context;
+  private final MPPQueryContext mppQueryContext;
 
+  /**
+   * Creates a distribution planner for one analyzed statement.
+   *
+   * @param analysis analysis result; its statement and response dataset header are used by plan()
+   * @param logicalQueryPlan logical plan whose root node is handed to ExchangeNodeGenerator
+   * @param mppQueryContext query context; supplies the query id and the query type
+   */
   public RelationalDistributionPlanner(
-      Analysis analysis, LogicalQueryPlan logicalQueryPlan, MPPQueryContext 
context) {
+      Analysis analysis, LogicalQueryPlan logicalQueryPlan, MPPQueryContext 
mppQueryContext) {
     this.analysis = analysis;
     this.logicalQueryPlan = logicalQueryPlan;
-    this.context = context;
+    this.mppQueryContext = mppQueryContext;
   }
 
   public DistributedQueryPlan plan() {
+    ExchangeNodeGenerator.PlanContext exchangeContext =
+        new ExchangeNodeGenerator.PlanContext(mppQueryContext);
     List<PlanNode> distributedPlanNodeResult =
-        new ExchangeNodeGenerator()
-            .visitPlan(
-                logicalQueryPlan.getRootNode(),
-                new ExchangeNodeGenerator.DistributionPlanContext());
+        new ExchangeNodeGenerator().visitPlan(logicalQueryPlan.getRootNode(), 
exchangeContext);
 
     if (distributedPlanNodeResult.size() != 1) {
       throw new IllegalStateException("root node must return only one");
     }
 
-    PlanNode outputNode = distributedPlanNodeResult.get(0);
-    if (analysis.getStatement() != null && analysis.getStatement() instanceof 
Query) {
+    PlanNode outputNodeWithExchange = distributedPlanNodeResult.get(0);
+    if (analysis.getStatement() instanceof Query) {
       analysis
           .getRespDatasetHeader()
           .setColumnToTsBlockIndexMap(
-              outputNode.getOutputSymbols().stream()
+              outputNodeWithExchange.getOutputSymbols().stream()
                   .map(Symbol::getName)
                   .filter(e -> 
!TIMESTAMP_EXPRESSION_STRING.equalsIgnoreCase(e))
                   .collect(Collectors.toList()));
     }
+    adjustUpStream(outputNodeWithExchange, exchangeContext);
 
-    SubPlan subPlan = new SubPlanGenerator().splitToSubPlan(logicalQueryPlan);
+    SubPlan subPlan =
+        new SubPlanGenerator()
+            .splitToSubPlan(logicalQueryPlan.getContext().getQueryId(), 
outputNodeWithExchange);
     subPlan.getPlanFragment().setRoot(true);
 
     List<FragmentInstance> fragmentInstances =
-        new FragmentInstanceGenerator(subPlan, analysis, context).plan();
+        new FragmentInstanceGenerator(subPlan, analysis, 
mppQueryContext).plan();
 
     // Only execute this step for READ operation
-    if (context.getQueryType() == QueryType.READ) {
+    if (mppQueryContext.getQueryType() == QueryType.READ) {
       setSinkForRootInstance(subPlan, fragmentInstances);
     }
 
@@ -96,14 +102,17 @@ public class RelationalDistributionPlanner {
 
     IdentitySinkNode sinkNode =
         new IdentitySinkNode(
-            context.getQueryId().genPlanNodeId(),
+            mppQueryContext.getQueryId().genPlanNodeId(),
             
Collections.singletonList(rootInstance.getFragment().getPlanNodeTree()),
             Collections.singletonList(
                 new DownStreamChannelLocation(
-                    context.getLocalDataBlockEndpoint(),
-                    
context.getResultNodeContext().getVirtualFragmentInstanceId().toThrift(),
-                    
context.getResultNodeContext().getVirtualResultNodeId().getId())));
-    context
+                    mppQueryContext.getLocalDataBlockEndpoint(),
+                    mppQueryContext
+                        .getResultNodeContext()
+                        .getVirtualFragmentInstanceId()
+                        .toThrift(),
+                    
mppQueryContext.getResultNodeContext().getVirtualResultNodeId().getId())));
+    mppQueryContext
         .getResultNodeContext()
         .setUpStream(
             rootInstance.getHostDataNode().mPPDataExchangeEndPoint,
@@ -111,4 +120,36 @@ public class RelationalDistributionPlanner {
             sinkNode.getPlanNodeId());
     rootInstance.getFragment().setPlanNodeTree(sinkNode);
   }
+
+  /**
+   * Inserts sink nodes below the exchange nodes of the distributed plan rooted at {@code root}.
+   * Does nothing when the exchange context reports that no exchange node exists.
+   */
+  private void adjustUpStream(
+      PlanNode root, ExchangeNodeGenerator.PlanContext exchangeContext) {
+    if (exchangeContext.hasExchangeNode) {
+      // Start with an empty region-to-sink map; the helper fills it while it walks the plan.
+      adjustUpStreamHelper(root, exchangeContext, new HashMap<>());
+    }
+  }
+
+  /**
+   * Walks the plan below {@code root} depth-first and places an IdentitySinkNode between every
+   * ExchangeNode and its child.
+   *
+   * <p>Sink nodes are shared per region: the first exchange whose child is distributed to a
+   * region creates the sink; later exchanges for the same region add their child and one more
+   * downstream channel to that sink. Each exchange then stores the sink's current last index as
+   * its upstream sink handle index.
+   *
+   * @param root node whose descendants are rewired; {@code root} itself is never replaced
+   * @param exchangeContext supplies the NodeDistribution (and thus the region) of each child
+   * @param regionNodemap sinks created so far, keyed by region; updated in place
+   * @throws IllegalStateException if no distribution was recorded for an exchange's child
+   */
+  private void adjustUpStreamHelper(
+      PlanNode root,
+      ExchangeNodeGenerator.PlanContext exchangeContext,
+      Map<TRegionReplicaSet, IdentitySinkNode> regionNodemap) {
+    for (PlanNode child : root.getChildren()) {
+      // Rewire the subtree first so nested exchanges are handled before this level.
+      adjustUpStreamHelper(child, exchangeContext, regionNodemap);
+
+      if (!(child instanceof ExchangeNode)) {
+        continue;
+      }
+
+      ExchangeNode exchangeNode = (ExchangeNode) child;
+      PlanNode upstream = exchangeNode.getChild();
+      // Fail with a descriptive message instead of a bare NullPointerException when the
+      // generator did not record where the exchange's child runs.
+      if (exchangeContext.getNodeDistribution(upstream.getPlanNodeId()) == null) {
+        throw new IllegalStateException(
+            "No node distribution recorded for child "
+                + upstream.getPlanNodeId()
+                + " of ExchangeNode "
+                + exchangeNode.getPlanNodeId());
+      }
+
+      IdentitySinkNode identitySinkNode =
+          regionNodemap.computeIfAbsent(
+              exchangeContext.getNodeDistribution(upstream.getPlanNodeId()).getRegion(),
+              k -> new IdentitySinkNode(mppQueryContext.getQueryId().genPlanNodeId()));
+      identitySinkNode.addChild(upstream);
+      identitySinkNode.addDownStreamChannelLocation(
+          new DownStreamChannelLocation(exchangeNode.getPlanNodeId().toString()));
+      exchangeNode.setChild(identitySinkNode);
+      exchangeNode.setIndexOfUpstreamSinkHandle(identitySinkNode.getCurrentLastIndex());
+    }
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/SubPlanGenerator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/SubPlanGenerator.java
index de160e61145..8f7027dc7bc 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/SubPlanGenerator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/SubPlanGenerator.java
@@ -15,7 +15,6 @@
 package org.apache.iotdb.db.queryengine.plan.relational.planner.distribute;
 
 import org.apache.iotdb.db.queryengine.common.QueryId;
-import org.apache.iotdb.db.queryengine.plan.planner.plan.LogicalQueryPlan;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.PlanFragment;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.SubPlan;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
@@ -32,11 +31,10 @@ import java.util.Set;
 /** Split SubPlan according to ExchangeNode. */
 public class SubPlanGenerator {
 
+  /**
+   * Splits the plan rooted at {@code rootPlanNode} into sub plans.
+   *
+   * <p>Creates the root SubPlan for {@code rootPlanNode}, then delegates to the four-argument
+   * overload with a fresh, empty set of visited sink node ids.
+   *
+   * @param queryId id of the query the plan belongs to
+   * @param rootPlanNode root of the plan to split
+   * @return the root SubPlan
+   */
-  public SubPlan splitToSubPlan(LogicalQueryPlan logicalQueryPlan) {
-    QueryId queryId = logicalQueryPlan.getContext().getQueryId();
-    SubPlan rootSubPlan = createSubPlan(logicalQueryPlan.getRootNode(), 
queryId);
+  public SubPlan splitToSubPlan(QueryId queryId, PlanNode rootPlanNode) {
+    SubPlan rootSubPlan = createSubPlan(rootPlanNode, queryId);
     Set<PlanNodeId> visitedSinkNode = new HashSet<>();
-    splitToSubPlan(logicalQueryPlan.getRootNode(), rootSubPlan, 
visitedSinkNode, queryId);
+    splitToSubPlan(rootPlanNode, rootSubPlan, visitedSinkNode, queryId);
     return rootSubPlan;
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelTypeProviderExtractor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelTypeProviderExtractor.java
index 60b1b564898..5fe412ca3f3 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelTypeProviderExtractor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelTypeProviderExtractor.java
@@ -17,8 +17,11 @@ package 
org.apache.iotdb.db.queryengine.plan.relational.planner.distribute;
 import org.apache.iotdb.db.queryengine.plan.analyze.TypeProvider;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.SimplePlanVisitor;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ExchangeNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.sink.IdentitySinkNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.FilterNode;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.MergeSortNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.OutputNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.ProjectNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableScanNode;
@@ -97,5 +100,23 @@ public class TableModelTypeProviderExtractor {
       node.getChild().accept(this, context);
       return null;
     }
+
+    /** Visits every child of the merge sort node with this visitor; always returns null. */
+    @Override
+    public Void visitMergeSort(MergeSortNode node, Void context) {
+      for (PlanNode child : node.getChildren()) {
+        child.accept(this, context);
+      }
+      return null;
+    }
+
+    /** Visits every child of the exchange node with this visitor; always returns null. */
+    @Override
+    public Void visitExchange(ExchangeNode node, Void context) {
+      for (PlanNode child : node.getChildren()) {
+        child.accept(this, context);
+      }
+      return null;
+    }
+
+    /** Visits every child of the identity sink node with this visitor; always returns null. */
+    @Override
+    public Void visitIdentitySink(IdentitySinkNode node, Void context) {
+      for (PlanNode child : node.getChildren()) {
+        child.accept(this, context);
+      }
+      return null;
+    }
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/FilterNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/FilterNode.java
index 68289d5a83d..87cbfe2edee 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/FilterNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/FilterNode.java
@@ -60,6 +60,10 @@ public class FilterNode extends SingleChildProcessNode {
   @Override
   protected void serializeAttributes(DataOutputStream stream) throws 
IOException {}
 
+  /**
+   * Placeholder: FilterNode cannot be deserialized yet because its attributes are not
+   * serialized either.
+   *
+   * @param byteBuffer ignored; nothing is read from it
+   * @throws UnsupportedOperationException always, so callers fail here rather than on a null node
+   */
+  public static FilterNode deserialize(ByteBuffer byteBuffer) {
+    throw new UnsupportedOperationException("FilterNode deserialization is not implemented yet");
+  }
+
   public Expression getPredicate() {
     return predicate;
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/LimitNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/LimitNode.java
index d3399cb1e16..e73519c2c17 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/LimitNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/LimitNode.java
@@ -62,6 +62,10 @@ public class LimitNode extends SingleChildProcessNode {
   @Override
   protected void serializeAttributes(DataOutputStream stream) throws 
IOException {}
 
+  /**
+   * Placeholder: LimitNode cannot be deserialized yet because its attributes are not
+   * serialized either.
+   *
+   * @param byteBuffer ignored; nothing is read from it
+   * @throws UnsupportedOperationException always, so callers fail here rather than on a null node
+   */
+  public static LimitNode deserialize(ByteBuffer byteBuffer) {
+    throw new UnsupportedOperationException("LimitNode deserialization is not implemented yet");
+  }
+
   @Override
   public List<Symbol> getOutputSymbols() {
     return child.getOutputSymbols();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/MergeSortNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/MergeSortNode.java
index 68c1ffdc17e..4bce78f7c9e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/MergeSortNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/MergeSortNode.java
@@ -20,14 +20,18 @@ package 
org.apache.iotdb.db.queryengine.plan.relational.planner.node;
 
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.MultiChildProcessNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.OrderingScheme;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
 
+import org.apache.tsfile.utils.ReadWriteIOUtils;
+
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.List;
 
 public class MergeSortNode extends MultiChildProcessNode {
@@ -57,10 +61,35 @@ public class MergeSortNode extends MultiChildProcessNode {
   }
 
+  /**
+   * Writes this node's attributes to {@code byteBuffer}: the TABLE_MERGESORT_NODE type marker,
+   * the ordering scheme, the number of output symbols, then each output symbol in list order.
+   * The static {@code deserialize} method reads the fields after the type marker in this order.
+   */
   @Override
-  protected void serializeAttributes(ByteBuffer byteBuffer) {}
+  protected void serializeAttributes(ByteBuffer byteBuffer) {
+    PlanNodeType.TABLE_MERGESORT_NODE.serialize(byteBuffer);
+    orderingScheme.serialize(byteBuffer);
+    // Length prefix so the reader knows how many symbols follow.
+    ReadWriteIOUtils.write(outputSymbols.size(), byteBuffer);
+    for (Symbol symbol : outputSymbols) {
+      Symbol.serialize(symbol, byteBuffer);
+    }
+  }
 
+  /**
+   * Stream variant of attribute serialization; writes the same fields in the same order as the
+   * ByteBuffer overload: type marker, ordering scheme, symbol count, then each output symbol.
+   *
+   * @throws IOException if writing to {@code stream} fails
+   */
   @Override
-  protected void serializeAttributes(DataOutputStream stream) throws 
IOException {}
+  protected void serializeAttributes(DataOutputStream stream) throws 
IOException {
+    PlanNodeType.TABLE_MERGESORT_NODE.serialize(stream);
+    orderingScheme.serialize(stream);
+    // Length prefix so the reader knows how many symbols follow.
+    ReadWriteIOUtils.write(outputSymbols.size(), stream);
+    for (Symbol symbol : outputSymbols) {
+      Symbol.serialize(symbol, stream);
+    }
+  }
+
+  /**
+   * Rebuilds a MergeSortNode from {@code byteBuffer}.
+   *
+   * <p>Reads, in order: the ordering scheme, the output symbol count, that many symbols, and
+   * finally the plan node id. The type marker written by serializeAttributes is not read here.
+   * No children are attached by this method.
+   */
+  public static MergeSortNode deserialize(ByteBuffer byteBuffer) {
+    OrderingScheme orderingScheme = OrderingScheme.deserialize(byteBuffer);
+    int size = ReadWriteIOUtils.readInt(byteBuffer);
+    List<Symbol> outputSymbols = new ArrayList<>(size);
+    while (size-- > 0) {
+      outputSymbols.add(Symbol.deserialize(byteBuffer));
+    }
+    // The plan node id comes after the node-specific attributes in the buffer.
+    PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
+    return new MergeSortNode(planNodeId, orderingScheme, outputSymbols);
+  }
 
   @Override
   public List<Symbol> getOutputSymbols() {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/OffsetNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/OffsetNode.java
index feca446b9d0..a491f02d874 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/OffsetNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/OffsetNode.java
@@ -54,6 +54,10 @@ public class OffsetNode extends SingleChildProcessNode {
   @Override
   protected void serializeAttributes(DataOutputStream stream) throws 
IOException {}
 
+  /**
+   * Placeholder: OffsetNode cannot be deserialized yet because its attributes are not
+   * serialized either.
+   *
+   * @param byteBuffer ignored; nothing is read from it
+   * @throws UnsupportedOperationException always, so callers fail here rather than on a null node
+   */
+  public static OffsetNode deserialize(ByteBuffer byteBuffer) {
+    throw new UnsupportedOperationException("OffsetNode deserialization is not implemented yet");
+  }
+
   @Override
   public List<Symbol> getOutputSymbols() {
     return child.getOutputSymbols();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/OutputNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/OutputNode.java
index fb76b4590f2..6a707a30000 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/OutputNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/OutputNode.java
@@ -16,15 +16,18 @@ package 
org.apache.iotdb.db.queryengine.plan.relational.planner.node;
 
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SingleChildProcessNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
 
 import com.google.common.collect.ImmutableList;
+import org.apache.tsfile.utils.ReadWriteIOUtils;
 
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.List;
 
 public class OutputNode extends SingleChildProcessNode {
@@ -42,6 +45,13 @@ public class OutputNode extends SingleChildProcessNode {
     this.outputs = ImmutableList.copyOf(outputs);
   }
 
+  /**
+   * Creates an OutputNode from its id, column names and output symbols only; no child is passed
+   * in. Used by {@code deserialize}. Both lists are copied into immutable lists.
+   */
+  public OutputNode(PlanNodeId id, List<String> columnNames, List<Symbol> 
outputs) {
+    super(id);
+    this.id = id;
+    this.columnNames = ImmutableList.copyOf(columnNames);
+    this.outputs = ImmutableList.copyOf(outputs);
+  }
+
   @Override
   public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
     return visitor.visitOutput(this, context);
@@ -58,10 +68,41 @@ public class OutputNode extends SingleChildProcessNode {
   }
 
+  /**
+   * Writes this node's attributes to {@code byteBuffer}: the TABLE_OUTPUT_NODE type marker, the
+   * column name count followed by each column name, then the output symbol count followed by
+   * each output symbol. The static {@code deserialize} method reads the fields after the type
+   * marker in this order.
+   */
   @Override
-  protected void serializeAttributes(ByteBuffer byteBuffer) {}
+  protected void serializeAttributes(ByteBuffer byteBuffer) {
+    PlanNodeType.TABLE_OUTPUT_NODE.serialize(byteBuffer);
+    // Each list is length-prefixed so the reader knows how many entries follow.
+    ReadWriteIOUtils.write(columnNames.size(), byteBuffer);
+    columnNames.forEach(columnName -> ReadWriteIOUtils.write(columnName, 
byteBuffer));
+    ReadWriteIOUtils.write(outputs.size(), byteBuffer);
+    outputs.forEach(symbol -> Symbol.serialize(symbol, byteBuffer));
+  }
 
+  /**
+   * Stream variant of attribute serialization; writes the same fields in the same order as the
+   * ByteBuffer overload: type marker, column names (length-prefixed), output symbols
+   * (length-prefixed).
+   *
+   * @throws IOException if writing to {@code stream} fails
+   */
   @Override
-  protected void serializeAttributes(DataOutputStream stream) throws 
IOException {}
+  protected void serializeAttributes(DataOutputStream stream) throws 
IOException {
+    PlanNodeType.TABLE_OUTPUT_NODE.serialize(stream);
+    ReadWriteIOUtils.write(columnNames.size(), stream);
+    // Plain loops rather than forEach: the stream writes throw the checked IOException.
+    for (String columnName : columnNames) {
+      ReadWriteIOUtils.write(columnName, stream);
+    }
+    ReadWriteIOUtils.write(outputs.size(), stream);
+    for (Symbol symbol : outputs) {
+      Symbol.serialize(symbol, stream);
+    }
+  }
+
+  /**
+   * Rebuilds an OutputNode from {@code byteBuffer}.
+   *
+   * <p>Reads, in order: the column name count and names, the output symbol count and symbols,
+   * and finally the plan node id. The type marker written by serializeAttributes is not read
+   * here, and no child is attached by this method.
+   */
+  public static OutputNode deserialize(ByteBuffer byteBuffer) {
+    int size = ReadWriteIOUtils.readInt(byteBuffer);
+    List<String> columnNames = new ArrayList<>(size);
+    while (size-- > 0) {
+      columnNames.add(ReadWriteIOUtils.readString(byteBuffer));
+    }
+    // size is reused as the counter for the second length-prefixed list.
+    size = ReadWriteIOUtils.readInt(byteBuffer);
+    List<Symbol> outputs = new ArrayList<>(size);
+    while (size-- > 0) {
+      outputs.add(Symbol.deserialize(byteBuffer));
+    }
+    // The plan node id comes after the node-specific attributes in the buffer.
+    PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
+    return new OutputNode(planNodeId, columnNames, outputs);
+  }
 
   public List<String> getColumnNames() {
     return this.columnNames;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/ProjectNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/ProjectNode.java
index 15167478367..5465979bd56 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/ProjectNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/ProjectNode.java
@@ -59,6 +59,10 @@ public class ProjectNode extends SingleChildProcessNode {
   @Override
   protected void serializeAttributes(DataOutputStream stream) throws 
IOException {}
 
+  /**
+   * Placeholder: ProjectNode cannot be deserialized yet because its attributes are not
+   * serialized either.
+   *
+   * @param byteBuffer ignored; nothing is read from it
+   * @throws UnsupportedOperationException always, so callers fail here rather than on a null node
+   */
+  public static ProjectNode deserialize(ByteBuffer byteBuffer) {
+    throw new UnsupportedOperationException("ProjectNode deserialization is not implemented yet");
+  }
+
   @Override
   public List<Symbol> getOutputSymbols() {
     return assignments.getOutputs();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/SortNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/SortNode.java
index 474101ddd72..c2dd7754d01 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/SortNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/SortNode.java
@@ -57,6 +57,10 @@ public class SortNode extends SingleChildProcessNode {
   @Override
   protected void serializeAttributes(DataOutputStream stream) throws 
IOException {}
 
+  /**
+   * Placeholder: SortNode cannot be deserialized yet because its attributes are not
+   * serialized either.
+   *
+   * @param byteBuffer ignored; nothing is read from it
+   * @throws UnsupportedOperationException always, so callers fail here rather than on a null node
+   */
+  public static SortNode deserialize(ByteBuffer byteBuffer) {
+    throw new UnsupportedOperationException("SortNode deserialization is not implemented yet");
+  }
+
   @Override
   public List<Symbol> getOutputSymbols() {
     return child.getOutputSymbols();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TableScanNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TableScanNode.java
index cd29d50eb68..3d560e9efce 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TableScanNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TableScanNode.java
@@ -17,6 +17,7 @@ package 
org.apache.iotdb.db.queryengine.plan.relational.planner.node;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SourceNode;
 import org.apache.iotdb.db.queryengine.plan.relational.metadata.ColumnSchema;
@@ -26,12 +27,15 @@ import 
org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
 import org.apache.iotdb.db.relational.sql.tree.Expression;
 
 import com.google.common.collect.ImmutableList;
+import org.apache.tsfile.utils.ReadWriteIOUtils;
 
 import javax.annotation.Nullable;
 
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -64,6 +68,8 @@ public class TableScanNode extends SourceNode {
   // The id of DataRegion where the node will run
   private TRegionReplicaSet regionReplicaSet;
 
+  private List<TRegionReplicaSet> regionReplicaSetList;
+
   public TableScanNode(
       PlanNodeId id,
       String qualifiedTableName,
@@ -75,6 +81,25 @@ public class TableScanNode extends SourceNode {
     this.assignments = assignments;
   }
 
+  /**
+   * Creates a scan node with all scan attributes supplied up front. Used by {@code clone} and
+   * {@code deserialize}.
+   *
+   * <p>Every argument is stored by reference (no defensive copies). {@code regionReplicaSet}
+   * and {@code regionReplicaSetList} are not assigned here.
+   */
+  public TableScanNode(
+      PlanNodeId id,
+      String qualifiedTableName,
+      List<Symbol> outputSymbols,
+      Map<Symbol, ColumnSchema> assignments,
+      List<DeviceEntry> deviceEntries,
+      Map<Symbol, Integer> idAndAttributeIndexMap,
+      Ordering scanOrder,
+      Expression pushDownPredicate) {
+    super(id);
+    this.qualifiedTableName = qualifiedTableName;
+    this.outputSymbols = outputSymbols;
+    this.assignments = assignments;
+    this.deviceEntries = deviceEntries;
+    this.idAndAttributeIndexMap = idAndAttributeIndexMap;
+    this.scanOrder = scanOrder;
+    this.pushDownPredicate = pushDownPredicate;
+  }
+
   @Override
   public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
     return visitor.visitTableScan(this, context);
@@ -89,8 +114,16 @@ public class TableScanNode extends SourceNode {
   public void addChild(PlanNode child) {}
 
+  /**
+   * Returns a new TableScanNode with the same plan node id as this node.
+   *
+   * <p>The copy is shallow: the symbol list, assignments, device entries and index map are
+   * shared with this node. {@code regionReplicaSet} and {@code regionReplicaSetList} are not
+   * carried over, because the constructor used here does not assign them.
+   */
   @Override
-  public PlanNode clone() {
-    return null;
+  public TableScanNode clone() {
+    return new TableScanNode(
+        getPlanNodeId(),
+        qualifiedTableName,
+        outputSymbols,
+        assignments,
+        deviceEntries,
+        idAndAttributeIndexMap,
+        scanOrder,
+        pushDownPredicate);
   }
 
   @Override
@@ -104,10 +137,105 @@ public class TableScanNode extends SourceNode {
   }
 
   @Override
-  protected void serializeAttributes(ByteBuffer byteBuffer) {}
+  protected void serializeAttributes(ByteBuffer byteBuffer) {
+    PlanNodeType.TABLE_SCAN_NODE.serialize(byteBuffer);
+    ReadWriteIOUtils.write(qualifiedTableName, byteBuffer);
+
+    ReadWriteIOUtils.write(outputSymbols.size(), byteBuffer);
+    outputSymbols.forEach(symbol -> ReadWriteIOUtils.write(symbol.getName(), 
byteBuffer));
+
+    ReadWriteIOUtils.write(assignments.size(), byteBuffer);
+    for (Map.Entry<Symbol, ColumnSchema> entry : assignments.entrySet()) {
+      Symbol.serialize(entry.getKey(), byteBuffer);
+      ColumnSchema.serialize(entry.getValue(), byteBuffer);
+    }
+
+    ReadWriteIOUtils.write(deviceEntries.size(), byteBuffer);
+    for (DeviceEntry entry : deviceEntries) {
+      entry.serialize(byteBuffer);
+    }
+
+    ReadWriteIOUtils.write(idAndAttributeIndexMap.size(), byteBuffer);
+    for (Map.Entry<Symbol, Integer> entry : idAndAttributeIndexMap.entrySet()) 
{
+      Symbol.serialize(entry.getKey(), byteBuffer);
+      ReadWriteIOUtils.write(entry.getValue(), byteBuffer);
+    }
+
+    ReadWriteIOUtils.write(scanOrder.ordinal(), byteBuffer);
+  }
 
   @Override
-  protected void serializeAttributes(DataOutputStream stream) throws 
IOException {}
+  protected void serializeAttributes(DataOutputStream stream) throws 
IOException {
+    PlanNodeType.TABLE_SCAN_NODE.serialize(stream);
+    ReadWriteIOUtils.write(qualifiedTableName, stream);
+
+    ReadWriteIOUtils.write(outputSymbols.size(), stream);
+    for (Symbol symbol : outputSymbols) {
+      ReadWriteIOUtils.write(symbol.getName(), stream);
+    }
+
+    ReadWriteIOUtils.write(assignments.size(), stream);
+    for (Map.Entry<Symbol, ColumnSchema> entry : assignments.entrySet()) {
+      Symbol.serialize(entry.getKey(), stream);
+      ColumnSchema.serialize(entry.getValue(), stream);
+    }
+
+    ReadWriteIOUtils.write(deviceEntries.size(), stream);
+    for (DeviceEntry entry : deviceEntries) {
+      entry.serialize(stream);
+    }
+
+    ReadWriteIOUtils.write(idAndAttributeIndexMap.size(), stream);
+    for (Map.Entry<Symbol, Integer> entry : idAndAttributeIndexMap.entrySet()) 
{
+      Symbol.serialize(entry.getKey(), stream);
+      ReadWriteIOUtils.write(entry.getValue(), stream);
+    }
+
+    ReadWriteIOUtils.write(scanOrder.ordinal(), stream);
+  }
+
+  /**
+   * Rebuilds a TableScanNode from {@code byteBuffer}.
+   *
+   * <p>Reads, in order: the qualified table name, the output symbols, the assignments, the
+   * device entries, the id/attribute index map, the scan order ordinal and finally the plan
+   * node id. Every collection is length-prefixed. The type marker written by
+   * serializeAttributes is not read here.
+   *
+   * <p>The returned node has a null {@code pushDownPredicate}; nothing is read for it.
+   */
+  public static TableScanNode deserialize(ByteBuffer byteBuffer) {
+    String qualifiedTableName = ReadWriteIOUtils.readString(byteBuffer);
+    // size is reused as the element counter for each length-prefixed collection below.
+    int size = ReadWriteIOUtils.readInt(byteBuffer);
+
+    List<Symbol> outputSymbols = new ArrayList<>(size);
+    for (int i = 0; i < size; i++) {
+      outputSymbols.add(Symbol.deserialize(byteBuffer));
+    }
+
+    size = ReadWriteIOUtils.readInt(byteBuffer);
+    Map<Symbol, ColumnSchema> assignments = new HashMap<>(size);
+    for (int i = 0; i < size; i++) {
+      assignments.put(Symbol.deserialize(byteBuffer), 
ColumnSchema.deserialize(byteBuffer));
+    }
+
+    size = ReadWriteIOUtils.readInt(byteBuffer);
+    List<DeviceEntry> deviceEntries = new ArrayList<>(size);
+    while (size-- > 0) {
+      deviceEntries.add(DeviceEntry.deserialize(byteBuffer));
+    }
+
+    size = ReadWriteIOUtils.readInt(byteBuffer);
+    Map<Symbol, Integer> idAndAttributeIndexMap = new HashMap<>(size);
+    while (size-- > 0) {
+      idAndAttributeIndexMap.put(
+          Symbol.deserialize(byteBuffer), 
ReadWriteIOUtils.readInt(byteBuffer));
+    }
+
+    // The scan order travels as its enum ordinal.
+    Ordering scanOrder = 
Ordering.values()[ReadWriteIOUtils.readInt(byteBuffer)];
+
+    // The plan node id comes after the node-specific attributes in the buffer.
+    PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
+
+    return new TableScanNode(
+        planNodeId,
+        qualifiedTableName,
+        outputSymbols,
+        assignments,
+        deviceEntries,
+        idAndAttributeIndexMap,
+        scanOrder,
+        null);
+  }
 
   @Override
   public List<Symbol> getOutputSymbols() {
@@ -186,6 +314,14 @@ public class TableScanNode extends SourceNode {
     return this.regionReplicaSet;
   }
 
+  /**
+   * Returns the list last passed to {@code setRegionReplicaSetList}. The field has no
+   * initializer, so this may be null if the setter was never called.
+   */
+  public List<TRegionReplicaSet> getRegionReplicaSetList() {
+    return regionReplicaSetList;
+  }
+
+  /** Stores the given list by reference (no copy) as this node's region replica set list. */
+  public void setRegionReplicaSetList(List<TRegionReplicaSet> 
regionReplicaSetList) {
+    this.regionReplicaSetList = regionReplicaSetList;
+  }
+
   public void setRegionReplicaSet(TRegionReplicaSet regionReplicaSet) {
     this.regionReplicaSet = regionReplicaSet;
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TopKNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TopKNode.java
index b554e852933..6a78ac84e95 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TopKNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TopKNode.java
@@ -75,6 +75,10 @@ public class TopKNode extends MultiChildProcessNode {
   @Override
   protected void serializeAttributes(DataOutputStream stream) throws 
IOException {}
 
+  /**
+   * Placeholder: TopKNode cannot be deserialized yet because its attributes are not
+   * serialized either.
+   *
+   * @param byteBuffer ignored; nothing is read from it
+   * @throws UnsupportedOperationException always, so callers fail here rather than on a null node
+   */
+  public static TopKNode deserialize(ByteBuffer byteBuffer) {
+    throw new UnsupportedOperationException("TopKNode deserialization is not implemented yet");
+  }
+
   @Override
   public List<Symbol> getOutputSymbols() {
     return outputSymbols;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/IndexScan.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/IndexScan.java
index 67b11eb95fd..0e1ccefde5d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/IndexScan.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/IndexScan.java
@@ -17,6 +17,7 @@ package 
org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
 import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
 import org.apache.iotdb.commons.partition.DataPartition;
 import org.apache.iotdb.commons.partition.DataPartitionQueryParam;
@@ -135,20 +136,19 @@ public class IndexScan implements RelationalPlanOptimizer 
{
         context.getAnalysis().setFinishQueryAfterAnalyze();
       } else {
         // TODO add the real impl
-        TRegionReplicaSet regionReplicaSet =
-            dataPartition
-                .getDataPartitionMap()
-                .values()
-                .iterator()
-                .next()
-                .values()
-                .iterator()
-                .next()
-                .values()
-                .iterator()
-                .next()
-                .get(0);
-        node.setRegionReplicaSet(regionReplicaSet);
+        Set<TRegionReplicaSet> regionReplicaSet = new HashSet<>();
+        for (Map.Entry<
+                String, Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, 
List<TRegionReplicaSet>>>>
+            e1 : dataPartition.getDataPartitionMap().entrySet()) {
+          for (Map.Entry<TSeriesPartitionSlot, Map<TTimePartitionSlot, 
List<TRegionReplicaSet>>>
+              e2 : e1.getValue().entrySet()) {
+            for (Map.Entry<TTimePartitionSlot, List<TRegionReplicaSet>> e3 :
+                e2.getValue().entrySet()) {
+              regionReplicaSet.addAll(e3.getValue());
+            }
+          }
+        }
+        node.setRegionReplicaSetList(new ArrayList<>(regionReplicaSet));
       }
 
       return node;
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/column/TsTableColumnCategory.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/column/TsTableColumnCategory.java
index c516cc17f19..b7d6ca02dae 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/column/TsTableColumnCategory.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/column/TsTableColumnCategory.java
@@ -34,7 +34,7 @@ public enum TsTableColumnCategory {
 
   private final byte category;
 
-  private TsTableColumnCategory(byte category) {
+  TsTableColumnCategory(byte category) {
     this.category = category;
   }
 
@@ -46,6 +46,10 @@ public enum TsTableColumnCategory {
     ReadWriteIOUtils.write(category, stream);
   }
 
+  public void serialize(ByteBuffer byteBuffer) {
+    ReadWriteIOUtils.write(category, byteBuffer);
+  }
+
   public static TsTableColumnCategory deserialize(InputStream stream) throws 
IOException {
     byte category = (byte) stream.read();
     return deserialize(category);

Reply via email to