This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new fc965db8734 Add serialize and deserialize method for Agg-related Node
fc965db8734 is described below

commit fc965db873482dc6d139fb8d128d2fab51626bf3
Author: Weihao Li <[email protected]>
AuthorDate: Tue Oct 8 19:26:58 2024 +0800

    Add serialize and deserialize method for Agg-related Node
---
 .../db/queryengine/plan/analyze/TypeProvider.java  |  10 +-
 .../plan/planner/plan/node/PlanNodeType.java       |   8 +
 .../plan/relational/function/BoundSignature.java   |  36 +++
 .../plan/relational/function/FunctionKind.java     |  20 +-
 .../plan/relational/metadata/ColumnSchema.java     |  47 +---
 .../relational/metadata/FunctionNullability.java   |  31 +++
 .../plan/relational/metadata/ResolvedFunction.java |  32 ++-
 .../relational/planner/node/AggregationNode.java   | 219 +++++++++++++++-
 .../planner/node/AggregationTableScanNode.java     | 275 +++++++++++++++++++++
 .../plan/relational/utils/TypeUtil.java            | 105 ++++++++
 10 files changed, 730 insertions(+), 53 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TypeProvider.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TypeProvider.java
index d599e702d42..da39e926815 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TypeProvider.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TypeProvider.java
@@ -20,12 +20,11 @@
 package org.apache.iotdb.db.queryengine.plan.analyze;
 
 import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
+import org.apache.iotdb.db.queryengine.plan.relational.utils.TypeUtil;
 
 import com.google.common.collect.ImmutableMap;
 import org.apache.tsfile.enums.TSDataType;
 import org.apache.tsfile.read.common.type.Type;
-import org.apache.tsfile.read.common.type.TypeEnum;
-import org.apache.tsfile.read.common.type.TypeFactory;
 import org.apache.tsfile.utils.ReadWriteIOUtils;
 
 import java.io.DataOutputStream;
@@ -162,7 +161,7 @@ public class TypeProvider {
       ReadWriteIOUtils.write(tableModelTypes.size(), byteBuffer);
       for (Map.Entry<Symbol, Type> entry : tableModelTypes.entrySet()) {
         ReadWriteIOUtils.write(entry.getKey().getName(), byteBuffer);
-        ReadWriteIOUtils.write(entry.getValue().getTypeEnum().ordinal(), 
byteBuffer);
+        TypeUtil.serialize(entry.getValue(), byteBuffer);
       }
     }
   }
@@ -190,7 +189,7 @@ public class TypeProvider {
       ReadWriteIOUtils.write(tableModelTypes.size(), stream);
       for (Map.Entry<Symbol, Type> entry : tableModelTypes.entrySet()) {
         ReadWriteIOUtils.write(entry.getKey().getName(), stream);
-        ReadWriteIOUtils.write(entry.getValue().getTypeEnum().ordinal(), 
stream);
+        TypeUtil.serialize(entry.getValue(), stream);
       }
     }
   }
@@ -218,8 +217,7 @@ public class TypeProvider {
       tableModelTypes = new HashMap<>(mapSize);
       while (mapSize > 0) {
         tableModelTypes.put(
-            new Symbol(ReadWriteIOUtils.readString(byteBuffer)),
-            
TypeFactory.getType(TypeEnum.values()[ReadWriteIOUtils.readInt(byteBuffer)]));
+            new Symbol(ReadWriteIOUtils.readString(byteBuffer)), 
TypeUtil.deserialize(byteBuffer));
         mapSize--;
       }
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
index 26d0d4c82ea..f6ea426a627 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
@@ -251,6 +251,8 @@ public enum PlanNodeType {
   TABLE_COLLECT_NODE((short) 1009),
   TABLE_STREAM_SORT_NODE((short) 1010),
   TABLE_JOIN_NODE((short) 1011),
+  TABLE_AGGREGATION_NODE((short) 1012),
+  TABLE_AGGREGATION_TABLE_SCAN_NODE((short) 1013),
 
   RELATIONAL_INSERT_TABLET((short) 2000),
   RELATIONAL_INSERT_ROW((short) 2001),
@@ -567,6 +569,12 @@ public enum PlanNodeType {
       case 1011:
         return 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.JoinNode.deserialize(
             buffer);
+      case 1012:
+        return 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode
+            .deserialize(buffer);
+      case 1013:
+        return 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationTableScanNode
+            .deserialize(buffer);
       case 2000:
         return RelationalInsertTabletNode.deserialize(buffer);
       case 2001:
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/BoundSignature.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/BoundSignature.java
index 883af02640c..6347209d722 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/BoundSignature.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/BoundSignature.java
@@ -19,8 +19,15 @@
 
 package org.apache.iotdb.db.queryengine.plan.relational.function;
 
+import org.apache.iotdb.db.queryengine.plan.relational.utils.TypeUtil;
+
 import org.apache.tsfile.read.common.type.Type;
+import org.apache.tsfile.utils.ReadWriteIOUtils;
 
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
 
@@ -94,4 +101,33 @@ public class BoundSignature {
         + argumentTypes.stream().map(Type::toString).collect(joining(", ", 
"(", "):"))
         + returnType;
   }
+
+  public void serialize(ByteBuffer byteBuffer) {
+    ReadWriteIOUtils.write(functionName, byteBuffer);
+    TypeUtil.serialize(returnType, byteBuffer);
+    ReadWriteIOUtils.write(argumentTypes.size(), byteBuffer);
+    for (Type type : argumentTypes) {
+      TypeUtil.serialize(type, byteBuffer);
+    }
+  }
+
+  public void serialize(DataOutputStream stream) throws IOException {
+    ReadWriteIOUtils.write(functionName, stream);
+    TypeUtil.serialize(returnType, stream);
+    ReadWriteIOUtils.write(argumentTypes.size(), stream);
+    for (Type type : argumentTypes) {
+      TypeUtil.serialize(type, stream);
+    }
+  }
+
+  public static BoundSignature deserialize(ByteBuffer byteBuffer) {
+    String functionName = ReadWriteIOUtils.readString(byteBuffer);
+    Type returnType = TypeUtil.deserialize(byteBuffer);
+    int size = ReadWriteIOUtils.readInt(byteBuffer);
+    List<Type> argumentTypes = new ArrayList<>(size);
+    while (size-- > 0) {
+      argumentTypes.add(TypeUtil.deserialize(byteBuffer));
+    }
+    return new BoundSignature(functionName, returnType, argumentTypes);
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/FunctionKind.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/FunctionKind.java
index 1e7c3da34a7..1e336502af9 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/FunctionKind.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/FunctionKind.java
@@ -19,9 +19,27 @@
 
 package org.apache.iotdb.db.queryengine.plan.relational.function;
 
+import org.apache.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
 public enum FunctionKind {
   SCALAR,
   AGGREGATE,
   WINDOW,
-  TABLE,
+  TABLE;
+
+  public void serialize(ByteBuffer byteBuffer) {
+    ReadWriteIOUtils.write(ordinal(), byteBuffer);
+  }
+
+  public void serialize(DataOutputStream stream) throws IOException {
+    ReadWriteIOUtils.write(ordinal(), stream);
+  }
+
+  public static FunctionKind deserialize(ByteBuffer byteBuffer) {
+    return FunctionKind.values()[ReadWriteIOUtils.readInt(byteBuffer)];
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/ColumnSchema.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/ColumnSchema.java
index ed224017b12..ea9e1f1f4c7 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/ColumnSchema.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/ColumnSchema.java
@@ -21,19 +21,10 @@ package 
org.apache.iotdb.db.queryengine.plan.relational.metadata;
 
 import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
 import org.apache.iotdb.commons.schema.table.column.TsTableColumnSchema;
+import org.apache.iotdb.db.queryengine.plan.relational.utils.TypeUtil;
 
-import org.apache.tsfile.read.common.type.BinaryType;
-import org.apache.tsfile.read.common.type.BlobType;
-import org.apache.tsfile.read.common.type.BooleanType;
-import org.apache.tsfile.read.common.type.DateType;
-import org.apache.tsfile.read.common.type.DoubleType;
-import org.apache.tsfile.read.common.type.FloatType;
-import org.apache.tsfile.read.common.type.StringType;
-import org.apache.tsfile.read.common.type.TimestampType;
 import org.apache.tsfile.read.common.type.Type;
-import org.apache.tsfile.read.common.type.TypeEnum;
 import org.apache.tsfile.read.common.type.TypeFactory;
-import org.apache.tsfile.read.common.type.UnknownType;
 import org.apache.tsfile.utils.ReadWriteIOUtils;
 
 import java.io.DataOutputStream;
@@ -44,8 +35,6 @@ import java.util.StringJoiner;
 
 import static java.util.Locale.ENGLISH;
 import static java.util.Objects.requireNonNull;
-import static org.apache.tsfile.read.common.type.IntType.INT32;
-import static org.apache.tsfile.read.common.type.LongType.INT64;
 
 public class ColumnSchema {
   private final String name;
@@ -107,7 +96,7 @@ public class ColumnSchema {
 
   public static void serialize(ColumnSchema columnSchema, ByteBuffer 
byteBuffer) {
     ReadWriteIOUtils.write(columnSchema.getName(), byteBuffer);
-    ReadWriteIOUtils.write(columnSchema.getType().getTypeEnum().ordinal(), 
byteBuffer);
+    TypeUtil.serialize(columnSchema.getType(), byteBuffer);
     columnSchema.getColumnCategory().serialize(byteBuffer);
     ReadWriteIOUtils.write(columnSchema.isHidden(), byteBuffer);
   }
@@ -115,48 +104,20 @@ public class ColumnSchema {
   public static void serialize(ColumnSchema columnSchema, DataOutputStream 
stream)
       throws IOException {
     ReadWriteIOUtils.write(columnSchema.getName(), stream);
-    ReadWriteIOUtils.write(columnSchema.getType().getTypeEnum().ordinal(), 
stream);
+    TypeUtil.serialize(columnSchema.getType(), stream);
     columnSchema.getColumnCategory().serialize(stream);
     ReadWriteIOUtils.write(columnSchema.isHidden(), stream);
   }
 
   public static ColumnSchema deserialize(ByteBuffer byteBuffer) {
     String name = ReadWriteIOUtils.readString(byteBuffer);
-    TypeEnum typeEnum = 
TypeEnum.values()[ReadWriteIOUtils.readInt(byteBuffer)];
-    Type type = getType(typeEnum);
+    Type type = TypeUtil.deserialize(byteBuffer);
     TsTableColumnCategory columnCategory = 
TsTableColumnCategory.deserialize(byteBuffer);
     boolean isHidden = ReadWriteIOUtils.readBool(byteBuffer);
 
     return new ColumnSchema(name, type, isHidden, columnCategory);
   }
 
-  public static Type getType(TypeEnum typeEnum) {
-    switch (typeEnum) {
-      case BOOLEAN:
-        return BooleanType.BOOLEAN;
-      case INT32:
-        return INT32;
-      case INT64:
-        return INT64;
-      case FLOAT:
-        return FloatType.FLOAT;
-      case DOUBLE:
-        return DoubleType.DOUBLE;
-      case TEXT:
-        return BinaryType.TEXT;
-      case STRING:
-        return StringType.STRING;
-      case BLOB:
-        return BlobType.BLOB;
-      case TIMESTAMP:
-        return TimestampType.TIMESTAMP;
-      case DATE:
-        return DateType.DATE;
-      default:
-        return UnknownType.UNKNOWN;
-    }
-  }
-
   public static ColumnSchema ofTsColumnSchema(TsTableColumnSchema schema) {
     return new ColumnSchema(
         schema.getColumnName(),
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/FunctionNullability.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/FunctionNullability.java
index 417b9a12126..d838d335b8d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/FunctionNullability.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/FunctionNullability.java
@@ -19,6 +19,11 @@
 
 package org.apache.iotdb.db.queryengine.plan.relational.metadata;
 
+import org.apache.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -80,4 +85,30 @@ public class FunctionNullability {
     return argumentNullable.stream().map(Objects::toString).collect(joining(", 
", "(", ")"))
         + returnNullable;
   }
+
+  public void serialize(ByteBuffer byteBuffer) {
+    ReadWriteIOUtils.write(returnNullable, byteBuffer);
+    ReadWriteIOUtils.write(argumentNullable.size(), byteBuffer);
+    for (boolean eachNullable : argumentNullable) {
+      ReadWriteIOUtils.write(eachNullable, byteBuffer);
+    }
+  }
+
+  public void serialize(DataOutputStream stream) throws IOException {
+    ReadWriteIOUtils.write(returnNullable, stream);
+    ReadWriteIOUtils.write(argumentNullable.size(), stream);
+    for (boolean eachNullable : argumentNullable) {
+      ReadWriteIOUtils.write(eachNullable, stream);
+    }
+  }
+
+  public static FunctionNullability deserialize(ByteBuffer byteBuffer) {
+    boolean returnNullable = ReadWriteIOUtils.readBool(byteBuffer);
+    int size = ReadWriteIOUtils.readInt(byteBuffer);
+    List<Boolean> argumentNullable = new ArrayList<>(size);
+    while (size-- > 0) {
+      argumentNullable.add(ReadWriteIOUtils.readBool(byteBuffer));
+    }
+    return new FunctionNullability(returnNullable, argumentNullable);
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/ResolvedFunction.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/ResolvedFunction.java
index ec30b1c78ae..432488612f2 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/ResolvedFunction.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/ResolvedFunction.java
@@ -24,6 +24,11 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.function.FunctionId;
 import org.apache.iotdb.db.queryengine.plan.relational.function.FunctionKind;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.QualifiedName;
 
+import org.apache.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.Objects;
 
 import static java.util.Objects.requireNonNull;
@@ -47,7 +52,6 @@ public class ResolvedFunction {
     this.functionKind = requireNonNull(functionKind, "functionKind is null");
     this.deterministic = deterministic;
     this.functionNullability = requireNonNull(functionNullability, 
"functionNullability is null");
-    ;
   }
 
   public BoundSignature getSignature() {
@@ -113,4 +117,30 @@ public class ResolvedFunction {
   public String toString() {
     return signature.toString();
   }
+
+  public void serialize(ByteBuffer byteBuffer) {
+    signature.serialize(byteBuffer);
+    ReadWriteIOUtils.write(functionId.toString(), byteBuffer);
+    functionKind.serialize(byteBuffer);
+    ReadWriteIOUtils.write(deterministic, byteBuffer);
+    functionNullability.serialize(byteBuffer);
+  }
+
+  public void serialize(DataOutputStream stream) throws IOException {
+    signature.serialize(stream);
+    ReadWriteIOUtils.write(functionId.toString(), stream);
+    functionKind.serialize(stream);
+    ReadWriteIOUtils.write(deterministic, stream);
+    functionNullability.serialize(stream);
+  }
+
+  public static ResolvedFunction deserialize(ByteBuffer byteBuffer) {
+    BoundSignature signature = BoundSignature.deserialize(byteBuffer);
+    FunctionId functionId = new 
FunctionId(ReadWriteIOUtils.readString(byteBuffer));
+    FunctionKind functionKind = FunctionKind.deserialize(byteBuffer);
+    boolean deterministic = ReadWriteIOUtils.readBool(byteBuffer);
+    FunctionNullability functionNullability = 
FunctionNullability.deserialize(byteBuffer);
+    return new ResolvedFunction(
+        signature, functionId, functionKind, deterministic, 
functionNullability);
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/AggregationNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/AggregationNode.java
index 831b1a3e512..9ab5ea86911 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/AggregationNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/AggregationNode.java
@@ -17,6 +17,7 @@ import org.apache.iotdb.db.queryengine.common.SessionInfo;
 import org.apache.iotdb.db.queryengine.plan.expression.multi.FunctionType;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SingleChildProcessNode;
 import org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata;
@@ -30,10 +31,13 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
+import org.apache.tsfile.utils.ReadWriteIOUtils;
 
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -213,10 +217,111 @@ public class AggregationNode extends 
SingleChildProcessNode {
   }
 
   @Override
-  protected void serializeAttributes(ByteBuffer byteBuffer) {}
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    if (!super.equals(o)) {
+      return false;
+    }
+    AggregationNode that = (AggregationNode) o;
+    return Objects.equals(aggregations, that.aggregations)
+        && Objects.equals(groupingSets, that.groupingSets)
+        && Objects.equals(step, that.step)
+        && Objects.equals(hashSymbol, that.hashSymbol)
+        && Objects.equals(groupIdSymbol, that.groupIdSymbol);
+  }
 
   @Override
-  protected void serializeAttributes(DataOutputStream stream) throws 
IOException {}
+  public int hashCode() {
+    return Objects.hash(
+        super.hashCode(), aggregations, groupingSets, step, hashSymbol, 
groupIdSymbol);
+  }
+
+  @Override
+  protected void serializeAttributes(ByteBuffer byteBuffer) {
+    PlanNodeType.TABLE_AGGREGATION_NODE.serialize(byteBuffer);
+    ReadWriteIOUtils.write(aggregations.size(), byteBuffer);
+    aggregations.forEach(
+        (k, v) -> {
+          Symbol.serialize(k, byteBuffer);
+          v.serialize(byteBuffer);
+        });
+    groupingSets.serialize(byteBuffer);
+    ReadWriteIOUtils.write(preGroupedSymbols.size(), byteBuffer);
+    for (Symbol preGroupedSymbol : preGroupedSymbols) {
+      Symbol.serialize(preGroupedSymbol, byteBuffer);
+    }
+    step.serialize(byteBuffer);
+    ReadWriteIOUtils.write(hashSymbol.isPresent(), byteBuffer);
+    if (hashSymbol.isPresent()) {
+      Symbol.serialize(hashSymbol.get(), byteBuffer);
+    }
+    ReadWriteIOUtils.write(groupIdSymbol.isPresent(), byteBuffer);
+    if (groupIdSymbol.isPresent()) {
+      Symbol.serialize(groupIdSymbol.get(), byteBuffer);
+    }
+  }
+
+  @Override
+  protected void serializeAttributes(DataOutputStream stream) throws 
IOException {
+    PlanNodeType.TABLE_AGGREGATION_NODE.serialize(stream);
+    ReadWriteIOUtils.write(aggregations.size(), stream);
+    for (Map.Entry<Symbol, Aggregation> aggregation : aggregations.entrySet()) 
{
+      Symbol.serialize(aggregation.getKey(), stream);
+      aggregation.getValue().serialize(stream);
+    }
+    groupingSets.serialize(stream);
+    ReadWriteIOUtils.write(preGroupedSymbols.size(), stream);
+    for (Symbol preGroupedSymbol : preGroupedSymbols) {
+      Symbol.serialize(preGroupedSymbol, stream);
+    }
+    step.serialize(stream);
+    ReadWriteIOUtils.write(hashSymbol.isPresent(), stream);
+    if (hashSymbol.isPresent()) {
+      Symbol.serialize(hashSymbol.get(), stream);
+    }
+    ReadWriteIOUtils.write(groupIdSymbol.isPresent(), stream);
+    if (groupIdSymbol.isPresent()) {
+      Symbol.serialize(groupIdSymbol.get(), stream);
+    }
+  }
+
+  public static AggregationNode deserialize(ByteBuffer byteBuffer) {
+    int size = ReadWriteIOUtils.readInt(byteBuffer);
+    final Map<Symbol, Aggregation> aggregations = new HashMap<>(size);
+    while (size-- > 0) {
+      aggregations.put(Symbol.deserialize(byteBuffer), 
Aggregation.deserialize(byteBuffer));
+    }
+    GroupingSetDescriptor groupingSetDescriptor = 
GroupingSetDescriptor.deserialize(byteBuffer);
+    size = ReadWriteIOUtils.readInt(byteBuffer);
+    List<Symbol> preGroupedSymbols = new ArrayList<>(size);
+    while (size-- > 0) {
+      preGroupedSymbols.add(Symbol.deserialize(byteBuffer));
+    }
+    Step step = Step.deserialize(byteBuffer);
+    Optional<Symbol> hashSymbol = Optional.empty();
+    if (ReadWriteIOUtils.readBool(byteBuffer)) {
+      hashSymbol = Optional.of(Symbol.deserialize(byteBuffer));
+    }
+    Optional<Symbol> groupIdSymbol = Optional.empty();
+    if (ReadWriteIOUtils.readBool(byteBuffer)) {
+      groupIdSymbol = Optional.of(Symbol.deserialize(byteBuffer));
+    }
+    PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
+    return new AggregationNode(
+        planNodeId,
+        null,
+        aggregations,
+        groupingSetDescriptor,
+        preGroupedSymbols,
+        step,
+        hashSymbol,
+        groupIdSymbol);
+  }
 
   @Override
   public PlanNode replaceChildren(List<PlanNode> newChildren) {
@@ -323,6 +428,45 @@ public class AggregationNode extends 
SingleChildProcessNode {
     public Set<Integer> getGlobalGroupingSets() {
       return globalGroupingSets;
     }
+
+    public void serialize(ByteBuffer byteBuffer) {
+      ReadWriteIOUtils.write(groupingKeys.size(), byteBuffer);
+      for (Symbol symbol : groupingKeys) {
+        Symbol.serialize(symbol, byteBuffer);
+      }
+      ReadWriteIOUtils.write(groupingSetCount, byteBuffer);
+      ReadWriteIOUtils.write(globalGroupingSets.size(), byteBuffer);
+      for (int globalGroupingSet : globalGroupingSets) {
+        ReadWriteIOUtils.write(globalGroupingSet, byteBuffer);
+      }
+    }
+
+    public void serialize(DataOutputStream stream) throws IOException {
+      ReadWriteIOUtils.write(groupingKeys.size(), stream);
+      for (Symbol symbol : groupingKeys) {
+        Symbol.serialize(symbol, stream);
+      }
+      ReadWriteIOUtils.write(groupingSetCount, stream);
+      ReadWriteIOUtils.write(globalGroupingSets.size(), stream);
+      for (int globalGroupingSet : globalGroupingSets) {
+        ReadWriteIOUtils.write(globalGroupingSet, stream);
+      }
+    }
+
+    public static GroupingSetDescriptor deserialize(ByteBuffer byteBuffer) {
+      int size = ReadWriteIOUtils.readInt(byteBuffer);
+      List<Symbol> groupingKeys = new ArrayList<>(size);
+      while (size-- > 0) {
+        groupingKeys.add(Symbol.deserialize(byteBuffer));
+      }
+      int groupingSetCount = ReadWriteIOUtils.readInt(byteBuffer);
+      size = ReadWriteIOUtils.readInt(byteBuffer);
+      Set<Integer> globalGroupingSets = new HashSet<>(size);
+      while (size-- > 0) {
+        globalGroupingSets.add(ReadWriteIOUtils.readInt(byteBuffer));
+      }
+      return new GroupingSetDescriptor(groupingKeys, groupingSetCount, 
globalGroupingSets);
+    }
   }
 
   public enum Step {
@@ -360,6 +504,18 @@ public class AggregationNode extends 
SingleChildProcessNode {
       }
       return Step.FINAL;
     }
+
+    public void serialize(ByteBuffer byteBuffer) {
+      ReadWriteIOUtils.write(ordinal(), byteBuffer);
+    }
+
+    public void serialize(DataOutputStream stream) throws IOException {
+      ReadWriteIOUtils.write(ordinal(), stream);
+    }
+
+    public static Step deserialize(ByteBuffer byteBuffer) {
+      return Step.values()[ReadWriteIOUtils.readInt(byteBuffer)];
+    }
   }
 
   public static class Aggregation {
@@ -459,6 +615,65 @@ public class AggregationNode extends 
SingleChildProcessNode {
           expectedArgumentCount,
           arguments.size());
     }
+
+    public void serialize(ByteBuffer byteBuffer) {
+      resolvedFunction.serialize(byteBuffer);
+      ReadWriteIOUtils.write(arguments.size(), byteBuffer);
+      for (Expression argument : arguments) {
+        Expression.serialize(argument, byteBuffer);
+      }
+      ReadWriteIOUtils.write(distinct, byteBuffer);
+      ReadWriteIOUtils.write(filter.isPresent(), byteBuffer);
+      filter.ifPresent(symbol -> Symbol.serialize(symbol, byteBuffer));
+      ReadWriteIOUtils.write(orderingScheme.isPresent(), byteBuffer);
+      orderingScheme.ifPresent(scheme -> scheme.serialize(byteBuffer));
+      ReadWriteIOUtils.write(mask.isPresent(), byteBuffer);
+      mask.ifPresent(symbol -> Symbol.serialize(symbol, byteBuffer));
+    }
+
+    public void serialize(DataOutputStream stream) throws IOException {
+      resolvedFunction.serialize(stream);
+      ReadWriteIOUtils.write(arguments.size(), stream);
+      for (Expression argument : arguments) {
+        Expression.serialize(argument, stream);
+      }
+      ReadWriteIOUtils.write(distinct, stream);
+      ReadWriteIOUtils.write(filter.isPresent(), stream);
+      if (filter.isPresent()) {
+        Symbol.serialize(filter.get(), stream);
+      }
+      ReadWriteIOUtils.write(orderingScheme.isPresent(), stream);
+      if (orderingScheme.isPresent()) {
+        orderingScheme.get().serialize(stream);
+      }
+      ReadWriteIOUtils.write(mask.isPresent(), stream);
+      if (mask.isPresent()) {
+        Symbol.serialize(mask.get(), stream);
+      }
+    }
+
+    public static Aggregation deserialize(ByteBuffer byteBuffer) {
+      ResolvedFunction function = ResolvedFunction.deserialize(byteBuffer);
+      int size = ReadWriteIOUtils.readInt(byteBuffer);
+      List<Expression> arguments = new ArrayList<>(size);
+      while (size-- > 0) {
+        arguments.add(Expression.deserialize(byteBuffer));
+      }
+      boolean distinct = ReadWriteIOUtils.readBool(byteBuffer);
+      Optional<Symbol> filter = Optional.empty();
+      if (ReadWriteIOUtils.readBool(byteBuffer)) {
+        filter = Optional.of(Symbol.deserialize(byteBuffer));
+      }
+      Optional<OrderingScheme> orderingScheme = Optional.empty();
+      if (ReadWriteIOUtils.readBool(byteBuffer)) {
+        orderingScheme = Optional.of(OrderingScheme.deserialize(byteBuffer));
+      }
+      Optional<Symbol> mask = Optional.empty();
+      if (ReadWriteIOUtils.readBool(byteBuffer)) {
+        mask = Optional.of(Symbol.deserialize(byteBuffer));
+      }
+      return new Aggregation(function, arguments, distinct, filter, 
orderingScheme, mask);
+    }
   }
 
   public static Builder builderFrom(AggregationNode node) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/AggregationTableScanNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/AggregationTableScanNode.java
index 2227360a2da..b33e9b35d4c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/AggregationTableScanNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/AggregationTableScanNode.java
@@ -19,6 +19,7 @@
 package org.apache.iotdb.db.queryengine.plan.relational.planner.node;
 
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
 import org.apache.iotdb.db.queryengine.plan.relational.metadata.ColumnSchema;
 import org.apache.iotdb.db.queryengine.plan.relational.metadata.DeviceEntry;
@@ -30,7 +31,13 @@ import 
org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import org.apache.tsfile.utils.ReadWriteIOUtils;
 
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -248,6 +255,25 @@ public class AggregationTableScanNode extends 
TableScanNode {
         aggregationNode.getGroupIdSymbol());
   }
 
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    if (!super.equals(o)) {
+      return false;
+    }
+    AggregationTableScanNode that = (AggregationTableScanNode) o;
+    return Objects.equals(projection, that.projection)
+        && Objects.equals(aggregations, that.aggregations)
+        && Objects.equals(groupingSets, that.groupingSets)
+        && Objects.equals(step, that.step)
+        && Objects.equals(groupIdSymbol, that.groupIdSymbol);
+  }
+
   @Override
   public int hashCode() {
     return Objects.hash(
@@ -258,4 +284,253 @@ public class AggregationTableScanNode extends 
TableScanNode {
   public String toString() {
     return "AggregationTableScanNode-" + this.getPlanNodeId();
   }
+

  // Serializes this node's attributes into the buffer. The field order here is
  // the wire format: it must stay in exact lockstep with deserialize(ByteBuffer)
  // and with the DataOutputStream overload below.
  @Override
  protected void serializeAttributes(ByteBuffer byteBuffer) {
    // Type tag first, so the generic plan-node reader can dispatch to this class.
    PlanNodeType.TABLE_AGGREGATION_TABLE_SCAN_NODE.serialize(byteBuffer);

    // Nullable database name: presence flag, then the optional string.
    if (qualifiedObjectName.getDatabaseName() != null) {
      ReadWriteIOUtils.write(true, byteBuffer);
      ReadWriteIOUtils.write(qualifiedObjectName.getDatabaseName(), byteBuffer);
    } else {
      ReadWriteIOUtils.write(false, byteBuffer);
    }
    ReadWriteIOUtils.write(qualifiedObjectName.getObjectName(), byteBuffer);

    // Symbol -> column-schema assignments: size, then (key, value) pairs.
    ReadWriteIOUtils.write(assignments.size(), byteBuffer);
    for (Map.Entry<Symbol, ColumnSchema> entry : assignments.entrySet()) {
      Symbol.serialize(entry.getKey(), byteBuffer);
      ColumnSchema.serialize(entry.getValue(), byteBuffer);
    }

    // Devices this scan covers.
    ReadWriteIOUtils.write(deviceEntries.size(), byteBuffer);
    for (DeviceEntry entry : deviceEntries) {
      entry.serialize(byteBuffer);
    }

    // Symbol -> attribute-index map.
    ReadWriteIOUtils.write(idAndAttributeIndexMap.size(), byteBuffer);
    for (Map.Entry<Symbol, Integer> entry : idAndAttributeIndexMap.entrySet()) {
      Symbol.serialize(entry.getKey(), byteBuffer);
      ReadWriteIOUtils.write(entry.getValue(), byteBuffer);
    }

    // Scan ordering, encoded as its enum ordinal.
    ReadWriteIOUtils.write(scanOrder.ordinal(), byteBuffer);

    // Optional time predicate: presence flag + expression.
    if (timePredicate != null) {
      ReadWriteIOUtils.write(true, byteBuffer);
      Expression.serialize(timePredicate, byteBuffer);
    } else {
      ReadWriteIOUtils.write(false, byteBuffer);
    }

    // Optional push-down predicate: presence flag + expression.
    if (pushDownPredicate != null) {
      ReadWriteIOUtils.write(true, byteBuffer);
      Expression.serialize(pushDownPredicate, byteBuffer);
    } else {
      ReadWriteIOUtils.write(false, byteBuffer);
    }

    // Push-down limit/offset and the per-device-limit flag.
    ReadWriteIOUtils.write(pushDownLimit, byteBuffer);
    ReadWriteIOUtils.write(pushDownOffset, byteBuffer);
    ReadWriteIOUtils.write(pushLimitToEachDevice, byteBuffer);

    // Projection assignments: size, then (symbol, expression) pairs.
    ReadWriteIOUtils.write(projection.getMap().size(), byteBuffer);
    for (Map.Entry<Symbol, Expression> entry : projection.getMap().entrySet()) {
      Symbol.serialize(entry.getKey(), byteBuffer);
      Expression.serialize(entry.getValue(), byteBuffer);
    }

    // Aggregation functions keyed by output symbol.
    ReadWriteIOUtils.write(aggregations.size(), byteBuffer);
    for (Map.Entry<Symbol, AggregationNode.Aggregation> aggregation : aggregations.entrySet()) {
      Symbol.serialize(aggregation.getKey(), byteBuffer);
      aggregation.getValue().serialize(byteBuffer);
    }

    // Grouping-set descriptor delegates its own encoding.
    groupingSets.serialize(byteBuffer);

    // Symbols whose input is already grouped.
    ReadWriteIOUtils.write(preGroupedSymbols.size(), byteBuffer);
    for (Symbol preGroupedSymbol : preGroupedSymbols) {
      Symbol.serialize(preGroupedSymbol, byteBuffer);
    }

    // Aggregation step (partial/final/...).
    step.serialize(byteBuffer);

    // Optional group-id symbol: presence flag + symbol.
    ReadWriteIOUtils.write(groupIdSymbol.isPresent(), byteBuffer);
    if (groupIdSymbol.isPresent()) {
      Symbol.serialize(groupIdSymbol.get(), byteBuffer);
    }
  }
+
  // Stream variant of the attribute serialization. The field layout is the wire
  // format and must stay identical to the ByteBuffer overload and to
  // deserialize(ByteBuffer).
  @Override
  protected void serializeAttributes(DataOutputStream stream) throws IOException {
    // Type tag first, so the generic plan-node reader can dispatch to this class.
    PlanNodeType.TABLE_AGGREGATION_TABLE_SCAN_NODE.serialize(stream);
    // Nullable database name: presence flag, then the optional string.
    if (qualifiedObjectName.getDatabaseName() != null) {
      ReadWriteIOUtils.write(true, stream);
      ReadWriteIOUtils.write(qualifiedObjectName.getDatabaseName(), stream);
    } else {
      ReadWriteIOUtils.write(false, stream);
    }
    ReadWriteIOUtils.write(qualifiedObjectName.getObjectName(), stream);

    // Symbol -> column-schema assignments: size, then (key, value) pairs.
    ReadWriteIOUtils.write(assignments.size(), stream);
    for (Map.Entry<Symbol, ColumnSchema> entry : assignments.entrySet()) {
      Symbol.serialize(entry.getKey(), stream);
      ColumnSchema.serialize(entry.getValue(), stream);
    }

    // Devices this scan covers.
    ReadWriteIOUtils.write(deviceEntries.size(), stream);
    for (DeviceEntry entry : deviceEntries) {
      entry.serialize(stream);
    }

    // Symbol -> attribute-index map.
    ReadWriteIOUtils.write(idAndAttributeIndexMap.size(), stream);
    for (Map.Entry<Symbol, Integer> entry : idAndAttributeIndexMap.entrySet()) {
      Symbol.serialize(entry.getKey(), stream);
      ReadWriteIOUtils.write(entry.getValue(), stream);
    }

    // Scan ordering, encoded as its enum ordinal.
    ReadWriteIOUtils.write(scanOrder.ordinal(), stream);

    // Optional time predicate: presence flag + expression.
    if (timePredicate != null) {
      ReadWriteIOUtils.write(true, stream);
      Expression.serialize(timePredicate, stream);
    } else {
      ReadWriteIOUtils.write(false, stream);
    }

    // Optional push-down predicate: presence flag + expression.
    if (pushDownPredicate != null) {
      ReadWriteIOUtils.write(true, stream);
      Expression.serialize(pushDownPredicate, stream);
    } else {
      ReadWriteIOUtils.write(false, stream);
    }

    // Push-down limit/offset and the per-device-limit flag.
    ReadWriteIOUtils.write(pushDownLimit, stream);
    ReadWriteIOUtils.write(pushDownOffset, stream);
    ReadWriteIOUtils.write(pushLimitToEachDevice, stream);

    // Projection assignments: size, then (symbol, expression) pairs.
    ReadWriteIOUtils.write(projection.getMap().size(), stream);
    for (Map.Entry<Symbol, Expression> entry : projection.getMap().entrySet()) {
      Symbol.serialize(entry.getKey(), stream);
      Expression.serialize(entry.getValue(), stream);
    }

    // Aggregation functions keyed by output symbol.
    ReadWriteIOUtils.write(aggregations.size(), stream);
    for (Map.Entry<Symbol, AggregationNode.Aggregation> aggregation : aggregations.entrySet()) {
      Symbol.serialize(aggregation.getKey(), stream);
      aggregation.getValue().serialize(stream);
    }

    // Grouping-set descriptor delegates its own encoding.
    groupingSets.serialize(stream);

    // Symbols whose input is already grouped.
    ReadWriteIOUtils.write(preGroupedSymbols.size(), stream);
    for (Symbol preGroupedSymbol : preGroupedSymbols) {
      Symbol.serialize(preGroupedSymbol, stream);
    }

    // Aggregation step (partial/final/...).
    step.serialize(stream);

    // Optional group-id symbol: presence flag + symbol.
    ReadWriteIOUtils.write(groupIdSymbol.isPresent(), stream);
    if (groupIdSymbol.isPresent()) {
      Symbol.serialize(groupIdSymbol.get(), stream);
    }
  }
+
  // Rebuilds an AggregationTableScanNode from the buffer. The read order must
  // mirror serializeAttributes field-for-field. NOTE(review): the leading
  // PlanNodeType tag is presumably consumed by the dispatching reader before
  // this method is invoked — confirm against PlanNodeType.deserialize.
  public static AggregationTableScanNode deserialize(ByteBuffer byteBuffer) {
    // Optional database name (presence flag + string), then the table name.
    boolean hasDatabaseName = ReadWriteIOUtils.readBool(byteBuffer);
    String databaseName = null;
    if (hasDatabaseName) {
      databaseName = ReadWriteIOUtils.readString(byteBuffer);
    }
    String tableName = ReadWriteIOUtils.readString(byteBuffer);
    QualifiedObjectName qualifiedObjectName = new QualifiedObjectName(databaseName, tableName);

    // Symbol -> column-schema assignments.
    int size = ReadWriteIOUtils.readInt(byteBuffer);
    Map<Symbol, ColumnSchema> assignments = new HashMap<>(size);
    for (int i = 0; i < size; i++) {
      assignments.put(Symbol.deserialize(byteBuffer), ColumnSchema.deserialize(byteBuffer));
    }

    // Devices covered by the scan.
    size = ReadWriteIOUtils.readInt(byteBuffer);
    List<DeviceEntry> deviceEntries = new ArrayList<>(size);
    while (size-- > 0) {
      deviceEntries.add(DeviceEntry.deserialize(byteBuffer));
    }

    // Symbol -> attribute-index map.
    size = ReadWriteIOUtils.readInt(byteBuffer);
    Map<Symbol, Integer> idAndAttributeIndexMap = new HashMap<>(size);
    while (size-- > 0) {
      idAndAttributeIndexMap.put(
          Symbol.deserialize(byteBuffer), ReadWriteIOUtils.readInt(byteBuffer));
    }

    // Scan ordering was written as the enum ordinal.
    Ordering scanOrder = Ordering.values()[ReadWriteIOUtils.readInt(byteBuffer)];

    // Optional time predicate.
    Expression timePredicate = null;
    boolean hasTimePredicate = ReadWriteIOUtils.readBool(byteBuffer);
    if (hasTimePredicate) {
      timePredicate = Expression.deserialize(byteBuffer);
    }

    // Optional push-down predicate.
    Expression pushDownPredicate = null;
    boolean hasPushDownPredicate = ReadWriteIOUtils.readBool(byteBuffer);
    if (hasPushDownPredicate) {
      pushDownPredicate = Expression.deserialize(byteBuffer);
    }

    // Push-down limit/offset and the per-device-limit flag.
    long pushDownLimit = ReadWriteIOUtils.readLong(byteBuffer);
    long pushDownOffset = ReadWriteIOUtils.readLong(byteBuffer);
    boolean pushLimitToEachDevice = ReadWriteIOUtils.readBool(byteBuffer);

    // Projection assignments.
    size = ReadWriteIOUtils.readInt(byteBuffer);
    Assignments.Builder projection = Assignments.builder();
    while (size-- > 0) {
      projection.put(Symbol.deserialize(byteBuffer), Expression.deserialize(byteBuffer));
    }

    // Aggregation functions keyed by output symbol.
    size = ReadWriteIOUtils.readInt(byteBuffer);
    final Map<Symbol, AggregationNode.Aggregation> aggregations = new HashMap<>(size);
    while (size-- > 0) {
      aggregations.put(
          Symbol.deserialize(byteBuffer), AggregationNode.Aggregation.deserialize(byteBuffer));
    }

    // Grouping-set descriptor.
    AggregationNode.GroupingSetDescriptor groupingSetDescriptor =
        AggregationNode.GroupingSetDescriptor.deserialize(byteBuffer);

    // Symbols whose input is already grouped.
    size = ReadWriteIOUtils.readInt(byteBuffer);
    List<Symbol> preGroupedSymbols = new ArrayList<>(size);
    while (size-- > 0) {
      preGroupedSymbols.add(Symbol.deserialize(byteBuffer));
    }

    // Aggregation step (partial/final/...).
    AggregationNode.Step step = AggregationNode.Step.deserialize(byteBuffer);

    // Optional group-id symbol.
    Optional<Symbol> groupIdSymbol = Optional.empty();
    if (ReadWriteIOUtils.readBool(byteBuffer)) {
      groupIdSymbol = Optional.of(Symbol.deserialize(byteBuffer));
    }

    // Plan-node id is appended by PlanNode.serialize after the attributes.
    PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);

    // NOTE(review): the third constructor argument is intentionally null here
    // (output symbols?) — confirm it is recomputed by the constructor/caller.
    return new AggregationTableScanNode(
        planNodeId,
        qualifiedObjectName,
        null,
        assignments,
        deviceEntries,
        idAndAttributeIndexMap,
        scanOrder,
        timePredicate,
        pushDownPredicate,
        pushDownLimit,
        pushDownOffset,
        pushLimitToEachDevice,
        projection.build(),
        aggregations,
        groupingSetDescriptor,
        preGroupedSymbols,
        step,
        groupIdSymbol);
  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/utils/TypeUtil.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/utils/TypeUtil.java
new file mode 100644
index 00000000000..21bbcd4a811
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/utils/TypeUtil.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.queryengine.plan.relational.utils;
+
+import org.apache.tsfile.read.common.type.BinaryType;
+import org.apache.tsfile.read.common.type.BlobType;
+import org.apache.tsfile.read.common.type.BooleanType;
+import org.apache.tsfile.read.common.type.DateType;
+import org.apache.tsfile.read.common.type.DoubleType;
+import org.apache.tsfile.read.common.type.FloatType;
+import org.apache.tsfile.read.common.type.RowType;
+import org.apache.tsfile.read.common.type.StringType;
+import org.apache.tsfile.read.common.type.TimestampType;
+import org.apache.tsfile.read.common.type.Type;
+import org.apache.tsfile.read.common.type.TypeEnum;
+import org.apache.tsfile.read.common.type.UnknownType;
+import org.apache.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.tsfile.read.common.type.IntType.INT32;
+import static org.apache.tsfile.read.common.type.LongType.INT64;
+
+public class TypeUtil {
+
+  public static void serialize(Type type, ByteBuffer byteBuffer) {
+    ReadWriteIOUtils.write(type.getTypeEnum().ordinal(), byteBuffer);
+    List<Type> typeParameters = type.getTypeParameters();
+    ReadWriteIOUtils.write(typeParameters.size(), byteBuffer);
+    for (Type typeParameter : typeParameters) {
+      ReadWriteIOUtils.write(typeParameter.getTypeEnum().ordinal(), 
byteBuffer);
+    }
+  }
+
+  public static void serialize(Type type, DataOutputStream stream) throws 
IOException {
+    ReadWriteIOUtils.write(type.getTypeEnum().ordinal(), stream);
+    List<Type> typeParameters = type.getTypeParameters();
+    ReadWriteIOUtils.write(typeParameters.size(), stream);
+    for (Type typeParameter : typeParameters) {
+      ReadWriteIOUtils.write(typeParameter.getTypeEnum().ordinal(), stream);
+    }
+  }
+
+  public static Type deserialize(ByteBuffer byteBuffer) {
+    TypeEnum typeEnum = 
TypeEnum.values()[ReadWriteIOUtils.readInt(byteBuffer)];
+    int size = ReadWriteIOUtils.readInt(byteBuffer);
+    List<Type> typeParameters = new ArrayList<>(size);
+    while (size-- > 0) {
+      typeParameters.add(
+          getType(
+              TypeEnum.values()[ReadWriteIOUtils.readInt(byteBuffer)], 
Collections.emptyList()));
+    }
+    return getType(typeEnum, typeParameters);
+  }
+
+  public static Type getType(TypeEnum typeEnum, List<Type> subTypes) {
+    switch (typeEnum) {
+      case BOOLEAN:
+        return BooleanType.BOOLEAN;
+      case INT32:
+        return INT32;
+      case INT64:
+        return INT64;
+      case FLOAT:
+        return FloatType.FLOAT;
+      case DOUBLE:
+        return DoubleType.DOUBLE;
+      case TEXT:
+        return BinaryType.TEXT;
+      case STRING:
+        return StringType.STRING;
+      case BLOB:
+        return BlobType.BLOB;
+      case TIMESTAMP:
+        return TimestampType.TIMESTAMP;
+      case DATE:
+        return DateType.DATE;
+      case ROW:
+        return RowType.anonymous(subTypes);
+      default:
+        return UnknownType.UNKNOWN;
+    }
+  }
+}


Reply via email to