This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new fc965db8734 Add serialize and deserialize method for Agg-related Node
fc965db8734 is described below
commit fc965db873482dc6d139fb8d128d2fab51626bf3
Author: Weihao Li <[email protected]>
AuthorDate: Tue Oct 8 19:26:58 2024 +0800
Add serialize and deserialize method for Agg-related Node
---
.../db/queryengine/plan/analyze/TypeProvider.java | 10 +-
.../plan/planner/plan/node/PlanNodeType.java | 8 +
.../plan/relational/function/BoundSignature.java | 36 +++
.../plan/relational/function/FunctionKind.java | 20 +-
.../plan/relational/metadata/ColumnSchema.java | 47 +---
.../relational/metadata/FunctionNullability.java | 31 +++
.../plan/relational/metadata/ResolvedFunction.java | 32 ++-
.../relational/planner/node/AggregationNode.java | 219 +++++++++++++++-
.../planner/node/AggregationTableScanNode.java | 275 +++++++++++++++++++++
.../plan/relational/utils/TypeUtil.java | 105 ++++++++
10 files changed, 730 insertions(+), 53 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TypeProvider.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TypeProvider.java
index d599e702d42..da39e926815 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TypeProvider.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TypeProvider.java
@@ -20,12 +20,11 @@
package org.apache.iotdb.db.queryengine.plan.analyze;
import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
+import org.apache.iotdb.db.queryengine.plan.relational.utils.TypeUtil;
import com.google.common.collect.ImmutableMap;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.read.common.type.Type;
-import org.apache.tsfile.read.common.type.TypeEnum;
-import org.apache.tsfile.read.common.type.TypeFactory;
import org.apache.tsfile.utils.ReadWriteIOUtils;
import java.io.DataOutputStream;
@@ -162,7 +161,7 @@ public class TypeProvider {
ReadWriteIOUtils.write(tableModelTypes.size(), byteBuffer);
for (Map.Entry<Symbol, Type> entry : tableModelTypes.entrySet()) {
ReadWriteIOUtils.write(entry.getKey().getName(), byteBuffer);
- ReadWriteIOUtils.write(entry.getValue().getTypeEnum().ordinal(),
byteBuffer);
+ TypeUtil.serialize(entry.getValue(), byteBuffer);
}
}
}
@@ -190,7 +189,7 @@ public class TypeProvider {
ReadWriteIOUtils.write(tableModelTypes.size(), stream);
for (Map.Entry<Symbol, Type> entry : tableModelTypes.entrySet()) {
ReadWriteIOUtils.write(entry.getKey().getName(), stream);
- ReadWriteIOUtils.write(entry.getValue().getTypeEnum().ordinal(),
stream);
+ TypeUtil.serialize(entry.getValue(), stream);
}
}
}
@@ -218,8 +217,7 @@ public class TypeProvider {
tableModelTypes = new HashMap<>(mapSize);
while (mapSize > 0) {
tableModelTypes.put(
- new Symbol(ReadWriteIOUtils.readString(byteBuffer)),
-
TypeFactory.getType(TypeEnum.values()[ReadWriteIOUtils.readInt(byteBuffer)]));
+ new Symbol(ReadWriteIOUtils.readString(byteBuffer)),
TypeUtil.deserialize(byteBuffer));
mapSize--;
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
index 26d0d4c82ea..f6ea426a627 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java
@@ -251,6 +251,8 @@ public enum PlanNodeType {
TABLE_COLLECT_NODE((short) 1009),
TABLE_STREAM_SORT_NODE((short) 1010),
TABLE_JOIN_NODE((short) 1011),
+ TABLE_AGGREGATION_NODE((short) 1012),
+ TABLE_AGGREGATION_TABLE_SCAN_NODE((short) 1013),
RELATIONAL_INSERT_TABLET((short) 2000),
RELATIONAL_INSERT_ROW((short) 2001),
@@ -567,6 +569,12 @@ public enum PlanNodeType {
case 1011:
return
org.apache.iotdb.db.queryengine.plan.relational.planner.node.JoinNode.deserialize(
buffer);
+ case 1012:
+ return
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode
+ .deserialize(buffer);
+ case 1013:
+ return
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationTableScanNode
+ .deserialize(buffer);
case 2000:
return RelationalInsertTabletNode.deserialize(buffer);
case 2001:
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/BoundSignature.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/BoundSignature.java
index 883af02640c..6347209d722 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/BoundSignature.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/BoundSignature.java
@@ -19,8 +19,15 @@
package org.apache.iotdb.db.queryengine.plan.relational.function;
+import org.apache.iotdb.db.queryengine.plan.relational.utils.TypeUtil;
+
import org.apache.tsfile.read.common.type.Type;
+import org.apache.tsfile.utils.ReadWriteIOUtils;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
@@ -94,4 +101,33 @@ public class BoundSignature {
+ argumentTypes.stream().map(Type::toString).collect(joining(", ",
"(", "):"))
+ returnType;
}
+
+ public void serialize(ByteBuffer byteBuffer) {
+ ReadWriteIOUtils.write(functionName, byteBuffer);
+ TypeUtil.serialize(returnType, byteBuffer);
+ ReadWriteIOUtils.write(argumentTypes.size(), byteBuffer);
+ for (Type type : argumentTypes) {
+ TypeUtil.serialize(type, byteBuffer);
+ }
+ }
+
+ public void serialize(DataOutputStream stream) throws IOException {
+ ReadWriteIOUtils.write(functionName, stream);
+ TypeUtil.serialize(returnType, stream);
+ ReadWriteIOUtils.write(argumentTypes.size(), stream);
+ for (Type type : argumentTypes) {
+ TypeUtil.serialize(type, stream);
+ }
+ }
+
+ public static BoundSignature deserialize(ByteBuffer byteBuffer) {
+ String functionName = ReadWriteIOUtils.readString(byteBuffer);
+ Type returnType = TypeUtil.deserialize(byteBuffer);
+ int size = ReadWriteIOUtils.readInt(byteBuffer);
+ List<Type> argumentTypes = new ArrayList<>(size);
+ while (size-- > 0) {
+ argumentTypes.add(TypeUtil.deserialize(byteBuffer));
+ }
+ return new BoundSignature(functionName, returnType, argumentTypes);
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/FunctionKind.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/FunctionKind.java
index 1e7c3da34a7..1e336502af9 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/FunctionKind.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/FunctionKind.java
@@ -19,9 +19,27 @@
package org.apache.iotdb.db.queryengine.plan.relational.function;
+import org.apache.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
public enum FunctionKind {
SCALAR,
AGGREGATE,
WINDOW,
- TABLE,
+ TABLE;
+
+ public void serialize(ByteBuffer byteBuffer) {
+ ReadWriteIOUtils.write(ordinal(), byteBuffer);
+ }
+
+ public void serialize(DataOutputStream stream) throws IOException {
+ ReadWriteIOUtils.write(ordinal(), stream);
+ }
+
+ public static FunctionKind deserialize(ByteBuffer byteBuffer) {
+ return FunctionKind.values()[ReadWriteIOUtils.readInt(byteBuffer)];
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/ColumnSchema.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/ColumnSchema.java
index ed224017b12..ea9e1f1f4c7 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/ColumnSchema.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/ColumnSchema.java
@@ -21,19 +21,10 @@ package
org.apache.iotdb.db.queryengine.plan.relational.metadata;
import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
import org.apache.iotdb.commons.schema.table.column.TsTableColumnSchema;
+import org.apache.iotdb.db.queryengine.plan.relational.utils.TypeUtil;
-import org.apache.tsfile.read.common.type.BinaryType;
-import org.apache.tsfile.read.common.type.BlobType;
-import org.apache.tsfile.read.common.type.BooleanType;
-import org.apache.tsfile.read.common.type.DateType;
-import org.apache.tsfile.read.common.type.DoubleType;
-import org.apache.tsfile.read.common.type.FloatType;
-import org.apache.tsfile.read.common.type.StringType;
-import org.apache.tsfile.read.common.type.TimestampType;
import org.apache.tsfile.read.common.type.Type;
-import org.apache.tsfile.read.common.type.TypeEnum;
import org.apache.tsfile.read.common.type.TypeFactory;
-import org.apache.tsfile.read.common.type.UnknownType;
import org.apache.tsfile.utils.ReadWriteIOUtils;
import java.io.DataOutputStream;
@@ -44,8 +35,6 @@ import java.util.StringJoiner;
import static java.util.Locale.ENGLISH;
import static java.util.Objects.requireNonNull;
-import static org.apache.tsfile.read.common.type.IntType.INT32;
-import static org.apache.tsfile.read.common.type.LongType.INT64;
public class ColumnSchema {
private final String name;
@@ -107,7 +96,7 @@ public class ColumnSchema {
public static void serialize(ColumnSchema columnSchema, ByteBuffer
byteBuffer) {
ReadWriteIOUtils.write(columnSchema.getName(), byteBuffer);
- ReadWriteIOUtils.write(columnSchema.getType().getTypeEnum().ordinal(),
byteBuffer);
+ TypeUtil.serialize(columnSchema.getType(), byteBuffer);
columnSchema.getColumnCategory().serialize(byteBuffer);
ReadWriteIOUtils.write(columnSchema.isHidden(), byteBuffer);
}
@@ -115,48 +104,20 @@ public class ColumnSchema {
public static void serialize(ColumnSchema columnSchema, DataOutputStream
stream)
throws IOException {
ReadWriteIOUtils.write(columnSchema.getName(), stream);
- ReadWriteIOUtils.write(columnSchema.getType().getTypeEnum().ordinal(),
stream);
+ TypeUtil.serialize(columnSchema.getType(), stream);
columnSchema.getColumnCategory().serialize(stream);
ReadWriteIOUtils.write(columnSchema.isHidden(), stream);
}
public static ColumnSchema deserialize(ByteBuffer byteBuffer) {
String name = ReadWriteIOUtils.readString(byteBuffer);
- TypeEnum typeEnum =
TypeEnum.values()[ReadWriteIOUtils.readInt(byteBuffer)];
- Type type = getType(typeEnum);
+ Type type = TypeUtil.deserialize(byteBuffer);
TsTableColumnCategory columnCategory =
TsTableColumnCategory.deserialize(byteBuffer);
boolean isHidden = ReadWriteIOUtils.readBool(byteBuffer);
return new ColumnSchema(name, type, isHidden, columnCategory);
}
- public static Type getType(TypeEnum typeEnum) {
- switch (typeEnum) {
- case BOOLEAN:
- return BooleanType.BOOLEAN;
- case INT32:
- return INT32;
- case INT64:
- return INT64;
- case FLOAT:
- return FloatType.FLOAT;
- case DOUBLE:
- return DoubleType.DOUBLE;
- case TEXT:
- return BinaryType.TEXT;
- case STRING:
- return StringType.STRING;
- case BLOB:
- return BlobType.BLOB;
- case TIMESTAMP:
- return TimestampType.TIMESTAMP;
- case DATE:
- return DateType.DATE;
- default:
- return UnknownType.UNKNOWN;
- }
- }
-
public static ColumnSchema ofTsColumnSchema(TsTableColumnSchema schema) {
return new ColumnSchema(
schema.getColumnName(),
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/FunctionNullability.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/FunctionNullability.java
index 417b9a12126..d838d335b8d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/FunctionNullability.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/FunctionNullability.java
@@ -19,6 +19,11 @@
package org.apache.iotdb.db.queryengine.plan.relational.metadata;
+import org.apache.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -80,4 +85,30 @@ public class FunctionNullability {
return argumentNullable.stream().map(Objects::toString).collect(joining(",
", "(", ")"))
+ returnNullable;
}
+
+ public void serialize(ByteBuffer byteBuffer) {
+ ReadWriteIOUtils.write(returnNullable, byteBuffer);
+ ReadWriteIOUtils.write(argumentNullable.size(), byteBuffer);
+ for (boolean eachNullable : argumentNullable) {
+ ReadWriteIOUtils.write(eachNullable, byteBuffer);
+ }
+ }
+
+ public void serialize(DataOutputStream stream) throws IOException {
+ ReadWriteIOUtils.write(returnNullable, stream);
+ ReadWriteIOUtils.write(argumentNullable.size(), stream);
+ for (boolean eachNullable : argumentNullable) {
+ ReadWriteIOUtils.write(eachNullable, stream);
+ }
+ }
+
+ public static FunctionNullability deserialize(ByteBuffer byteBuffer) {
+ boolean returnNullable = ReadWriteIOUtils.readBool(byteBuffer);
+ int size = ReadWriteIOUtils.readInt(byteBuffer);
+ List<Boolean> argumentNullable = new ArrayList<>(size);
+ while (size-- > 0) {
+ argumentNullable.add(ReadWriteIOUtils.readBool(byteBuffer));
+ }
+ return new FunctionNullability(returnNullable, argumentNullable);
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/ResolvedFunction.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/ResolvedFunction.java
index ec30b1c78ae..432488612f2 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/ResolvedFunction.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/ResolvedFunction.java
@@ -24,6 +24,11 @@ import
org.apache.iotdb.db.queryengine.plan.relational.function.FunctionId;
import org.apache.iotdb.db.queryengine.plan.relational.function.FunctionKind;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.QualifiedName;
+import org.apache.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.Objects;
import static java.util.Objects.requireNonNull;
@@ -47,7 +52,6 @@ public class ResolvedFunction {
this.functionKind = requireNonNull(functionKind, "functionKind is null");
this.deterministic = deterministic;
this.functionNullability = requireNonNull(functionNullability,
"functionNullability is null");
- ;
}
public BoundSignature getSignature() {
@@ -113,4 +117,30 @@ public class ResolvedFunction {
public String toString() {
return signature.toString();
}
+
+ public void serialize(ByteBuffer byteBuffer) {
+ signature.serialize(byteBuffer);
+ ReadWriteIOUtils.write(functionId.toString(), byteBuffer);
+ functionKind.serialize(byteBuffer);
+ ReadWriteIOUtils.write(deterministic, byteBuffer);
+ functionNullability.serialize(byteBuffer);
+ }
+
+ public void serialize(DataOutputStream stream) throws IOException {
+ signature.serialize(stream);
+ ReadWriteIOUtils.write(functionId.toString(), stream);
+ functionKind.serialize(stream);
+ ReadWriteIOUtils.write(deterministic, stream);
+ functionNullability.serialize(stream);
+ }
+
+ public static ResolvedFunction deserialize(ByteBuffer byteBuffer) {
+ BoundSignature signature = BoundSignature.deserialize(byteBuffer);
+ FunctionId functionId = new
FunctionId(ReadWriteIOUtils.readString(byteBuffer));
+ FunctionKind functionKind = FunctionKind.deserialize(byteBuffer);
+ boolean deterministic = ReadWriteIOUtils.readBool(byteBuffer);
+ FunctionNullability functionNullability =
FunctionNullability.deserialize(byteBuffer);
+ return new ResolvedFunction(
+ signature, functionId, functionKind, deterministic,
functionNullability);
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/AggregationNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/AggregationNode.java
index 831b1a3e512..9ab5ea86911 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/AggregationNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/AggregationNode.java
@@ -17,6 +17,7 @@ import org.apache.iotdb.db.queryengine.common.SessionInfo;
import org.apache.iotdb.db.queryengine.plan.expression.multi.FunctionType;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SingleChildProcessNode;
import org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata;
@@ -30,10 +31,13 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
+import org.apache.tsfile.utils.ReadWriteIOUtils;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -213,10 +217,111 @@ public class AggregationNode extends
SingleChildProcessNode {
}
@Override
- protected void serializeAttributes(ByteBuffer byteBuffer) {}
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ if (!super.equals(o)) {
+ return false;
+ }
+ AggregationNode that = (AggregationNode) o;
+ return Objects.equals(aggregations, that.aggregations)
+ && Objects.equals(groupingSets, that.groupingSets)
+ && Objects.equals(step, that.step)
+ && Objects.equals(hashSymbol, that.hashSymbol)
+ && Objects.equals(groupIdSymbol, that.groupIdSymbol);
+ }
@Override
- protected void serializeAttributes(DataOutputStream stream) throws
IOException {}
+ public int hashCode() {
+ return Objects.hash(
+ super.hashCode(), aggregations, groupingSets, step, hashSymbol,
groupIdSymbol);
+ }
+
+ @Override
+ protected void serializeAttributes(ByteBuffer byteBuffer) {
+ PlanNodeType.TABLE_AGGREGATION_NODE.serialize(byteBuffer);
+ ReadWriteIOUtils.write(aggregations.size(), byteBuffer);
+ aggregations.forEach(
+ (k, v) -> {
+ Symbol.serialize(k, byteBuffer);
+ v.serialize(byteBuffer);
+ });
+ groupingSets.serialize(byteBuffer);
+ ReadWriteIOUtils.write(preGroupedSymbols.size(), byteBuffer);
+ for (Symbol preGroupedSymbol : preGroupedSymbols) {
+ Symbol.serialize(preGroupedSymbol, byteBuffer);
+ }
+ step.serialize(byteBuffer);
+ ReadWriteIOUtils.write(hashSymbol.isPresent(), byteBuffer);
+ if (hashSymbol.isPresent()) {
+ Symbol.serialize(hashSymbol.get(), byteBuffer);
+ }
+ ReadWriteIOUtils.write(groupIdSymbol.isPresent(), byteBuffer);
+ if (groupIdSymbol.isPresent()) {
+ Symbol.serialize(groupIdSymbol.get(), byteBuffer);
+ }
+ }
+
+ @Override
+ protected void serializeAttributes(DataOutputStream stream) throws
IOException {
+ PlanNodeType.TABLE_AGGREGATION_NODE.serialize(stream);
+ ReadWriteIOUtils.write(aggregations.size(), stream);
+ for (Map.Entry<Symbol, Aggregation> aggregation : aggregations.entrySet())
{
+ Symbol.serialize(aggregation.getKey(), stream);
+ aggregation.getValue().serialize(stream);
+ }
+ groupingSets.serialize(stream);
+ ReadWriteIOUtils.write(preGroupedSymbols.size(), stream);
+ for (Symbol preGroupedSymbol : preGroupedSymbols) {
+ Symbol.serialize(preGroupedSymbol, stream);
+ }
+ step.serialize(stream);
+ ReadWriteIOUtils.write(hashSymbol.isPresent(), stream);
+ if (hashSymbol.isPresent()) {
+ Symbol.serialize(hashSymbol.get(), stream);
+ }
+ ReadWriteIOUtils.write(groupIdSymbol.isPresent(), stream);
+ if (groupIdSymbol.isPresent()) {
+ Symbol.serialize(groupIdSymbol.get(), stream);
+ }
+ }
+
+ public static AggregationNode deserialize(ByteBuffer byteBuffer) {
+ int size = ReadWriteIOUtils.readInt(byteBuffer);
+ final Map<Symbol, Aggregation> aggregations = new HashMap<>(size);
+ while (size-- > 0) {
+ aggregations.put(Symbol.deserialize(byteBuffer),
Aggregation.deserialize(byteBuffer));
+ }
+ GroupingSetDescriptor groupingSetDescriptor =
GroupingSetDescriptor.deserialize(byteBuffer);
+ size = ReadWriteIOUtils.readInt(byteBuffer);
+ List<Symbol> preGroupedSymbols = new ArrayList<>(size);
+ while (size-- > 0) {
+ preGroupedSymbols.add(Symbol.deserialize(byteBuffer));
+ }
+ Step step = Step.deserialize(byteBuffer);
+ Optional<Symbol> hashSymbol = Optional.empty();
+ if (ReadWriteIOUtils.readBool(byteBuffer)) {
+ hashSymbol = Optional.of(Symbol.deserialize(byteBuffer));
+ }
+ Optional<Symbol> groupIdSymbol = Optional.empty();
+ if (ReadWriteIOUtils.readBool(byteBuffer)) {
+ groupIdSymbol = Optional.of(Symbol.deserialize(byteBuffer));
+ }
+ PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
+ return new AggregationNode(
+ planNodeId,
+ null,
+ aggregations,
+ groupingSetDescriptor,
+ preGroupedSymbols,
+ step,
+ hashSymbol,
+ groupIdSymbol);
+ }
@Override
public PlanNode replaceChildren(List<PlanNode> newChildren) {
@@ -323,6 +428,45 @@ public class AggregationNode extends
SingleChildProcessNode {
public Set<Integer> getGlobalGroupingSets() {
return globalGroupingSets;
}
+
+ public void serialize(ByteBuffer byteBuffer) {
+ ReadWriteIOUtils.write(groupingKeys.size(), byteBuffer);
+ for (Symbol symbol : groupingKeys) {
+ Symbol.serialize(symbol, byteBuffer);
+ }
+ ReadWriteIOUtils.write(groupingSetCount, byteBuffer);
+ ReadWriteIOUtils.write(globalGroupingSets.size(), byteBuffer);
+ for (int globalGroupingSet : globalGroupingSets) {
+ ReadWriteIOUtils.write(globalGroupingSet, byteBuffer);
+ }
+ }
+
+ public void serialize(DataOutputStream stream) throws IOException {
+ ReadWriteIOUtils.write(groupingKeys.size(), stream);
+ for (Symbol symbol : groupingKeys) {
+ Symbol.serialize(symbol, stream);
+ }
+ ReadWriteIOUtils.write(groupingSetCount, stream);
+ ReadWriteIOUtils.write(globalGroupingSets.size(), stream);
+ for (int globalGroupingSet : globalGroupingSets) {
+ ReadWriteIOUtils.write(globalGroupingSet, stream);
+ }
+ }
+
+ public static GroupingSetDescriptor deserialize(ByteBuffer byteBuffer) {
+ int size = ReadWriteIOUtils.readInt(byteBuffer);
+ List<Symbol> groupingKeys = new ArrayList<>(size);
+ while (size-- > 0) {
+ groupingKeys.add(Symbol.deserialize(byteBuffer));
+ }
+ int groupingSetCount = ReadWriteIOUtils.readInt(byteBuffer);
+ size = ReadWriteIOUtils.readInt(byteBuffer);
+ Set<Integer> globalGroupingSets = new HashSet<>(size);
+ while (size-- > 0) {
+ globalGroupingSets.add(ReadWriteIOUtils.readInt(byteBuffer));
+ }
+ return new GroupingSetDescriptor(groupingKeys, groupingSetCount,
globalGroupingSets);
+ }
}
public enum Step {
@@ -360,6 +504,18 @@ public class AggregationNode extends
SingleChildProcessNode {
}
return Step.FINAL;
}
+
+ public void serialize(ByteBuffer byteBuffer) {
+ ReadWriteIOUtils.write(ordinal(), byteBuffer);
+ }
+
+ public void serialize(DataOutputStream stream) throws IOException {
+ ReadWriteIOUtils.write(ordinal(), stream);
+ }
+
+ public static Step deserialize(ByteBuffer byteBuffer) {
+ return Step.values()[ReadWriteIOUtils.readInt(byteBuffer)];
+ }
}
public static class Aggregation {
@@ -459,6 +615,65 @@ public class AggregationNode extends
SingleChildProcessNode {
expectedArgumentCount,
arguments.size());
}
+
+ public void serialize(ByteBuffer byteBuffer) {
+ resolvedFunction.serialize(byteBuffer);
+ ReadWriteIOUtils.write(arguments.size(), byteBuffer);
+ for (Expression argument : arguments) {
+ Expression.serialize(argument, byteBuffer);
+ }
+ ReadWriteIOUtils.write(distinct, byteBuffer);
+ ReadWriteIOUtils.write(filter.isPresent(), byteBuffer);
+ filter.ifPresent(symbol -> Symbol.serialize(symbol, byteBuffer));
+ ReadWriteIOUtils.write(orderingScheme.isPresent(), byteBuffer);
+ orderingScheme.ifPresent(scheme -> scheme.serialize(byteBuffer));
+ ReadWriteIOUtils.write(mask.isPresent(), byteBuffer);
+ mask.ifPresent(symbol -> Symbol.serialize(symbol, byteBuffer));
+ }
+
+ public void serialize(DataOutputStream stream) throws IOException {
+ resolvedFunction.serialize(stream);
+ ReadWriteIOUtils.write(arguments.size(), stream);
+ for (Expression argument : arguments) {
+ Expression.serialize(argument, stream);
+ }
+ ReadWriteIOUtils.write(distinct, stream);
+ ReadWriteIOUtils.write(filter.isPresent(), stream);
+ if (filter.isPresent()) {
+ Symbol.serialize(filter.get(), stream);
+ }
+ ReadWriteIOUtils.write(orderingScheme.isPresent(), stream);
+ if (orderingScheme.isPresent()) {
+ orderingScheme.get().serialize(stream);
+ }
+ ReadWriteIOUtils.write(mask.isPresent(), stream);
+ if (mask.isPresent()) {
+ Symbol.serialize(mask.get(), stream);
+ }
+ }
+
+ public static Aggregation deserialize(ByteBuffer byteBuffer) {
+ ResolvedFunction function = ResolvedFunction.deserialize(byteBuffer);
+ int size = ReadWriteIOUtils.readInt(byteBuffer);
+ List<Expression> arguments = new ArrayList<>(size);
+ while (size-- > 0) {
+ arguments.add(Expression.deserialize(byteBuffer));
+ }
+ boolean distinct = ReadWriteIOUtils.readBool(byteBuffer);
+ Optional<Symbol> filter = Optional.empty();
+ if (ReadWriteIOUtils.readBool(byteBuffer)) {
+ filter = Optional.of(Symbol.deserialize(byteBuffer));
+ }
+ Optional<OrderingScheme> orderingScheme = Optional.empty();
+ if (ReadWriteIOUtils.readBool(byteBuffer)) {
+ orderingScheme = Optional.of(OrderingScheme.deserialize(byteBuffer));
+ }
+ Optional<Symbol> mask = Optional.empty();
+ if (ReadWriteIOUtils.readBool(byteBuffer)) {
+ mask = Optional.of(Symbol.deserialize(byteBuffer));
+ }
+ return new Aggregation(function, arguments, distinct, filter,
orderingScheme, mask);
+ }
}
public static Builder builderFrom(AggregationNode node) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/AggregationTableScanNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/AggregationTableScanNode.java
index 2227360a2da..b33e9b35d4c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/AggregationTableScanNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/AggregationTableScanNode.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.queryengine.plan.relational.planner.node;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
import org.apache.iotdb.db.queryengine.plan.relational.metadata.ColumnSchema;
import org.apache.iotdb.db.queryengine.plan.relational.metadata.DeviceEntry;
@@ -30,7 +31,13 @@ import
org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
+import org.apache.tsfile.utils.ReadWriteIOUtils;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -248,6 +255,25 @@ public class AggregationTableScanNode extends
TableScanNode {
aggregationNode.getGroupIdSymbol());
}
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ if (!super.equals(o)) {
+ return false;
+ }
+ AggregationTableScanNode that = (AggregationTableScanNode) o;
+ return Objects.equals(projection, that.projection)
+ && Objects.equals(aggregations, that.aggregations)
+ && Objects.equals(groupingSets, that.groupingSets)
+ && Objects.equals(step, that.step)
+ && Objects.equals(groupIdSymbol, that.groupIdSymbol);
+ }
+
@Override
public int hashCode() {
return Objects.hash(
@@ -258,4 +284,253 @@ public class AggregationTableScanNode extends
TableScanNode {
public String toString() {
return "AggregationTableScanNode-" + this.getPlanNodeId();
}
+
+ @Override
+ protected void serializeAttributes(ByteBuffer byteBuffer) {
+ PlanNodeType.TABLE_AGGREGATION_TABLE_SCAN_NODE.serialize(byteBuffer);
+
+ if (qualifiedObjectName.getDatabaseName() != null) {
+ ReadWriteIOUtils.write(true, byteBuffer);
+ ReadWriteIOUtils.write(qualifiedObjectName.getDatabaseName(),
byteBuffer);
+ } else {
+ ReadWriteIOUtils.write(false, byteBuffer);
+ }
+ ReadWriteIOUtils.write(qualifiedObjectName.getObjectName(), byteBuffer);
+
+ ReadWriteIOUtils.write(assignments.size(), byteBuffer);
+ for (Map.Entry<Symbol, ColumnSchema> entry : assignments.entrySet()) {
+ Symbol.serialize(entry.getKey(), byteBuffer);
+ ColumnSchema.serialize(entry.getValue(), byteBuffer);
+ }
+
+ ReadWriteIOUtils.write(deviceEntries.size(), byteBuffer);
+ for (DeviceEntry entry : deviceEntries) {
+ entry.serialize(byteBuffer);
+ }
+
+ ReadWriteIOUtils.write(idAndAttributeIndexMap.size(), byteBuffer);
+ for (Map.Entry<Symbol, Integer> entry : idAndAttributeIndexMap.entrySet())
{
+ Symbol.serialize(entry.getKey(), byteBuffer);
+ ReadWriteIOUtils.write(entry.getValue(), byteBuffer);
+ }
+
+ ReadWriteIOUtils.write(scanOrder.ordinal(), byteBuffer);
+
+ if (timePredicate != null) {
+ ReadWriteIOUtils.write(true, byteBuffer);
+ Expression.serialize(timePredicate, byteBuffer);
+ } else {
+ ReadWriteIOUtils.write(false, byteBuffer);
+ }
+
+ if (pushDownPredicate != null) {
+ ReadWriteIOUtils.write(true, byteBuffer);
+ Expression.serialize(pushDownPredicate, byteBuffer);
+ } else {
+ ReadWriteIOUtils.write(false, byteBuffer);
+ }
+
+ ReadWriteIOUtils.write(pushDownLimit, byteBuffer);
+ ReadWriteIOUtils.write(pushDownOffset, byteBuffer);
+ ReadWriteIOUtils.write(pushLimitToEachDevice, byteBuffer);
+
+ ReadWriteIOUtils.write(projection.getMap().size(), byteBuffer);
+ for (Map.Entry<Symbol, Expression> entry : projection.getMap().entrySet())
{
+ Symbol.serialize(entry.getKey(), byteBuffer);
+ Expression.serialize(entry.getValue(), byteBuffer);
+ }
+
+ ReadWriteIOUtils.write(aggregations.size(), byteBuffer);
+ for (Map.Entry<Symbol, AggregationNode.Aggregation> aggregation :
aggregations.entrySet()) {
+ Symbol.serialize(aggregation.getKey(), byteBuffer);
+ aggregation.getValue().serialize(byteBuffer);
+ }
+
+ groupingSets.serialize(byteBuffer);
+
+ ReadWriteIOUtils.write(preGroupedSymbols.size(), byteBuffer);
+ for (Symbol preGroupedSymbol : preGroupedSymbols) {
+ Symbol.serialize(preGroupedSymbol, byteBuffer);
+ }
+
+ step.serialize(byteBuffer);
+
+ ReadWriteIOUtils.write(groupIdSymbol.isPresent(), byteBuffer);
+ if (groupIdSymbol.isPresent()) {
+ Symbol.serialize(groupIdSymbol.get(), byteBuffer);
+ }
+ }
+
+ @Override
+ protected void serializeAttributes(DataOutputStream stream) throws
IOException {
+ PlanNodeType.TABLE_AGGREGATION_TABLE_SCAN_NODE.serialize(stream);
+ if (qualifiedObjectName.getDatabaseName() != null) {
+ ReadWriteIOUtils.write(true, stream);
+ ReadWriteIOUtils.write(qualifiedObjectName.getDatabaseName(), stream);
+ } else {
+ ReadWriteIOUtils.write(false, stream);
+ }
+ ReadWriteIOUtils.write(qualifiedObjectName.getObjectName(), stream);
+
+ ReadWriteIOUtils.write(assignments.size(), stream);
+ for (Map.Entry<Symbol, ColumnSchema> entry : assignments.entrySet()) {
+ Symbol.serialize(entry.getKey(), stream);
+ ColumnSchema.serialize(entry.getValue(), stream);
+ }
+
+ ReadWriteIOUtils.write(deviceEntries.size(), stream);
+ for (DeviceEntry entry : deviceEntries) {
+ entry.serialize(stream);
+ }
+
+ ReadWriteIOUtils.write(idAndAttributeIndexMap.size(), stream);
+ for (Map.Entry<Symbol, Integer> entry : idAndAttributeIndexMap.entrySet())
{
+ Symbol.serialize(entry.getKey(), stream);
+ ReadWriteIOUtils.write(entry.getValue(), stream);
+ }
+
+ ReadWriteIOUtils.write(scanOrder.ordinal(), stream);
+
+ if (timePredicate != null) {
+ ReadWriteIOUtils.write(true, stream);
+ Expression.serialize(timePredicate, stream);
+ } else {
+ ReadWriteIOUtils.write(false, stream);
+ }
+
+ if (pushDownPredicate != null) {
+ ReadWriteIOUtils.write(true, stream);
+ Expression.serialize(pushDownPredicate, stream);
+ } else {
+ ReadWriteIOUtils.write(false, stream);
+ }
+
+ ReadWriteIOUtils.write(pushDownLimit, stream);
+ ReadWriteIOUtils.write(pushDownOffset, stream);
+ ReadWriteIOUtils.write(pushLimitToEachDevice, stream);
+
+ ReadWriteIOUtils.write(projection.getMap().size(), stream);
+ for (Map.Entry<Symbol, Expression> entry : projection.getMap().entrySet())
{
+ Symbol.serialize(entry.getKey(), stream);
+ Expression.serialize(entry.getValue(), stream);
+ }
+
+ ReadWriteIOUtils.write(aggregations.size(), stream);
+ for (Map.Entry<Symbol, AggregationNode.Aggregation> aggregation :
aggregations.entrySet()) {
+ Symbol.serialize(aggregation.getKey(), stream);
+ aggregation.getValue().serialize(stream);
+ }
+
+ groupingSets.serialize(stream);
+
+ ReadWriteIOUtils.write(preGroupedSymbols.size(), stream);
+ for (Symbol preGroupedSymbol : preGroupedSymbols) {
+ Symbol.serialize(preGroupedSymbol, stream);
+ }
+
+ step.serialize(stream);
+
+ ReadWriteIOUtils.write(groupIdSymbol.isPresent(), stream);
+ if (groupIdSymbol.isPresent()) {
+ Symbol.serialize(groupIdSymbol.get(), stream);
+ }
+ }
+
+ public static AggregationTableScanNode deserialize(ByteBuffer byteBuffer) {
+ boolean hasDatabaseName = ReadWriteIOUtils.readBool(byteBuffer);
+ String databaseName = null;
+ if (hasDatabaseName) {
+ databaseName = ReadWriteIOUtils.readString(byteBuffer);
+ }
+ String tableName = ReadWriteIOUtils.readString(byteBuffer);
+ QualifiedObjectName qualifiedObjectName = new
QualifiedObjectName(databaseName, tableName);
+
+ int size = ReadWriteIOUtils.readInt(byteBuffer);
+ Map<Symbol, ColumnSchema> assignments = new HashMap<>(size);
+ for (int i = 0; i < size; i++) {
+ assignments.put(Symbol.deserialize(byteBuffer),
ColumnSchema.deserialize(byteBuffer));
+ }
+
+ size = ReadWriteIOUtils.readInt(byteBuffer);
+ List<DeviceEntry> deviceEntries = new ArrayList<>(size);
+ while (size-- > 0) {
+ deviceEntries.add(DeviceEntry.deserialize(byteBuffer));
+ }
+
+ size = ReadWriteIOUtils.readInt(byteBuffer);
+ Map<Symbol, Integer> idAndAttributeIndexMap = new HashMap<>(size);
+ while (size-- > 0) {
+ idAndAttributeIndexMap.put(
+ Symbol.deserialize(byteBuffer),
ReadWriteIOUtils.readInt(byteBuffer));
+ }
+
+ Ordering scanOrder =
Ordering.values()[ReadWriteIOUtils.readInt(byteBuffer)];
+
+ Expression timePredicate = null;
+ boolean hasTimePredicate = ReadWriteIOUtils.readBool(byteBuffer);
+ if (hasTimePredicate) {
+ timePredicate = Expression.deserialize(byteBuffer);
+ }
+
+ Expression pushDownPredicate = null;
+ boolean hasPushDownPredicate = ReadWriteIOUtils.readBool(byteBuffer);
+ if (hasPushDownPredicate) {
+ pushDownPredicate = Expression.deserialize(byteBuffer);
+ }
+
+ long pushDownLimit = ReadWriteIOUtils.readLong(byteBuffer);
+ long pushDownOffset = ReadWriteIOUtils.readLong(byteBuffer);
+ boolean pushLimitToEachDevice = ReadWriteIOUtils.readBool(byteBuffer);
+
+ size = ReadWriteIOUtils.readInt(byteBuffer);
+ Assignments.Builder projection = Assignments.builder();
+ while (size-- > 0) {
+ projection.put(Symbol.deserialize(byteBuffer),
Expression.deserialize(byteBuffer));
+ }
+
+ size = ReadWriteIOUtils.readInt(byteBuffer);
+ final Map<Symbol, AggregationNode.Aggregation> aggregations = new
HashMap<>(size);
+ while (size-- > 0) {
+ aggregations.put(
+ Symbol.deserialize(byteBuffer),
AggregationNode.Aggregation.deserialize(byteBuffer));
+ }
+
+ AggregationNode.GroupingSetDescriptor groupingSetDescriptor =
+ AggregationNode.GroupingSetDescriptor.deserialize(byteBuffer);
+
+ size = ReadWriteIOUtils.readInt(byteBuffer);
+ List<Symbol> preGroupedSymbols = new ArrayList<>(size);
+ while (size-- > 0) {
+ preGroupedSymbols.add(Symbol.deserialize(byteBuffer));
+ }
+
+ AggregationNode.Step step = AggregationNode.Step.deserialize(byteBuffer);
+
+ Optional<Symbol> groupIdSymbol = Optional.empty();
+ if (ReadWriteIOUtils.readBool(byteBuffer)) {
+ groupIdSymbol = Optional.of(Symbol.deserialize(byteBuffer));
+ }
+
+ PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
+
+ return new AggregationTableScanNode(
+ planNodeId,
+ qualifiedObjectName,
+ null,
+ assignments,
+ deviceEntries,
+ idAndAttributeIndexMap,
+ scanOrder,
+ timePredicate,
+ pushDownPredicate,
+ pushDownLimit,
+ pushDownOffset,
+ pushLimitToEachDevice,
+ projection.build(),
+ aggregations,
+ groupingSetDescriptor,
+ preGroupedSymbols,
+ step,
+ groupIdSymbol);
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/utils/TypeUtil.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/utils/TypeUtil.java
new file mode 100644
index 00000000000..21bbcd4a811
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/utils/TypeUtil.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.queryengine.plan.relational.utils;
+
+import org.apache.tsfile.read.common.type.BinaryType;
+import org.apache.tsfile.read.common.type.BlobType;
+import org.apache.tsfile.read.common.type.BooleanType;
+import org.apache.tsfile.read.common.type.DateType;
+import org.apache.tsfile.read.common.type.DoubleType;
+import org.apache.tsfile.read.common.type.FloatType;
+import org.apache.tsfile.read.common.type.RowType;
+import org.apache.tsfile.read.common.type.StringType;
+import org.apache.tsfile.read.common.type.TimestampType;
+import org.apache.tsfile.read.common.type.Type;
+import org.apache.tsfile.read.common.type.TypeEnum;
+import org.apache.tsfile.read.common.type.UnknownType;
+import org.apache.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.tsfile.read.common.type.IntType.INT32;
+import static org.apache.tsfile.read.common.type.LongType.INT64;
+
+public class TypeUtil {
+
+ public static void serialize(Type type, ByteBuffer byteBuffer) {
+ ReadWriteIOUtils.write(type.getTypeEnum().ordinal(), byteBuffer);
+ List<Type> typeParameters = type.getTypeParameters();
+ ReadWriteIOUtils.write(typeParameters.size(), byteBuffer);
+ for (Type typeParameter : typeParameters) {
+ ReadWriteIOUtils.write(typeParameter.getTypeEnum().ordinal(),
byteBuffer);
+ }
+ }
+
+ public static void serialize(Type type, DataOutputStream stream) throws
IOException {
+ ReadWriteIOUtils.write(type.getTypeEnum().ordinal(), stream);
+ List<Type> typeParameters = type.getTypeParameters();
+ ReadWriteIOUtils.write(typeParameters.size(), stream);
+ for (Type typeParameter : typeParameters) {
+ ReadWriteIOUtils.write(typeParameter.getTypeEnum().ordinal(), stream);
+ }
+ }
+
+ public static Type deserialize(ByteBuffer byteBuffer) {
+ TypeEnum typeEnum =
TypeEnum.values()[ReadWriteIOUtils.readInt(byteBuffer)];
+ int size = ReadWriteIOUtils.readInt(byteBuffer);
+ List<Type> typeParameters = new ArrayList<>(size);
+ while (size-- > 0) {
+ typeParameters.add(
+ getType(
+ TypeEnum.values()[ReadWriteIOUtils.readInt(byteBuffer)],
Collections.emptyList()));
+ }
+ return getType(typeEnum, typeParameters);
+ }
+
+ public static Type getType(TypeEnum typeEnum, List<Type> subTypes) {
+ switch (typeEnum) {
+ case BOOLEAN:
+ return BooleanType.BOOLEAN;
+ case INT32:
+ return INT32;
+ case INT64:
+ return INT64;
+ case FLOAT:
+ return FloatType.FLOAT;
+ case DOUBLE:
+ return DoubleType.DOUBLE;
+ case TEXT:
+ return BinaryType.TEXT;
+ case STRING:
+ return StringType.STRING;
+ case BLOB:
+ return BlobType.BLOB;
+ case TIMESTAMP:
+ return TimestampType.TIMESTAMP;
+ case DATE:
+ return DateType.DATE;
+ case ROW:
+ return RowType.anonymous(subTypes);
+ default:
+ return UnknownType.UNKNOWN;
+ }
+ }
+}