This is an automated email from the ASF dual-hosted git repository. caogaofei pushed a commit to branch beyyes/use_template_accelerate in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 55c5178433b30e9401fdeffc727231902b1ba1a6 Author: Beyyes <[email protected]> AuthorDate: Wed Nov 22 16:32:40 2023 +0800 use template to accelerate the performance of serialization --- .../IoTDBAlignByDeviceWithTemplateIT.java | 7 ++ .../IoTDBOrderByLimitOffsetAlignByDeviceIT.java | 7 ++ .../request/PipeTransferTabletBatchReq.java | 2 +- .../queryengine/plan/analyze/AnalyzeVisitor.java | 3 - .../db/queryengine/plan/analyze/TypeProvider.java | 99 ++++++++-------------- .../plan/planner/OperatorTreeGenerator.java | 12 +-- .../plan/planner/SubPlanTypeExtractor.java | 9 +- .../plan/planner/TemplatedLogicalPlan.java | 31 ++++--- .../plan/planner/TemplatedLogicalPlanBuilder.java | 3 + .../plan/planner/plan/PlanFragment.java | 36 ++++++-- .../plan/planner/plan/node/PlanNode.java | 18 +++- .../plan/planner/plan/node/PlanNodeType.java | 15 ++++ .../plan/node/source/AlignedSeriesScanNode.java | 38 +++++++++ 13 files changed, 184 insertions(+), 96 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/alignbydevice/IoTDBAlignByDeviceWithTemplateIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/alignbydevice/IoTDBAlignByDeviceWithTemplateIT.java index 9ad4ab87e3f..21ff962ad6a 100644 --- a/integration-test/src/test/java/org/apache/iotdb/db/it/alignbydevice/IoTDBAlignByDeviceWithTemplateIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/db/it/alignbydevice/IoTDBAlignByDeviceWithTemplateIT.java @@ -19,16 +19,23 @@ package org.apache.iotdb.db.it.alignbydevice; import org.apache.iotdb.it.env.EnvFactory; +import org.apache.iotdb.it.framework.IoTDBTestRunner; +import org.apache.iotdb.itbase.category.ClusterIT; +import org.apache.iotdb.itbase.category.LocalStandaloneIT; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; import java.sql.Connection; import java.sql.Statement; import static org.apache.iotdb.db.it.utils.TestUtils.resultSetEqualTest; +@RunWith(IoTDBTestRunner.class) +@Category({LocalStandaloneIT.class, ClusterIT.class}) public class IoTDBAlignByDeviceWithTemplateIT { private static final String[] sqls = new String[] { diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/alignbydevice/IoTDBOrderByLimitOffsetAlignByDeviceIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/alignbydevice/IoTDBOrderByLimitOffsetAlignByDeviceIT.java index 6aac79065e2..1b88c38b585 100644 --- a/integration-test/src/test/java/org/apache/iotdb/db/it/alignbydevice/IoTDBOrderByLimitOffsetAlignByDeviceIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/db/it/alignbydevice/IoTDBOrderByLimitOffsetAlignByDeviceIT.java @@ -19,10 +19,15 @@ package org.apache.iotdb.db.it.alignbydevice; import org.apache.iotdb.it.env.EnvFactory; +import org.apache.iotdb.it.framework.IoTDBTestRunner; +import org.apache.iotdb.itbase.category.ClusterIT; +import org.apache.iotdb.itbase.category.LocalStandaloneIT; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; import java.sql.Connection; import java.sql.Statement; @@ -31,6 +36,8 @@ import static org.apache.iotdb.db.it.alignbydevice.IoTDBOrderByWithAlignByDevice import static org.apache.iotdb.db.it.alignbydevice.IoTDBOrderByWithAlignByDeviceIT.insertData2; import static org.apache.iotdb.db.it.utils.TestUtils.resultSetEqualTest; +@RunWith(IoTDBTestRunner.class) +@Category({LocalStandaloneIT.class, ClusterIT.class}) public class IoTDBOrderByLimitOffsetAlignByDeviceIT { @BeforeClass diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletBatchReq.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletBatchReq.java index 3faafbce293..710e3cb437e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletBatchReq.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletBatchReq.java @@ -162,7 +162,7 @@ public class PipeTransferTabletBatchReq extends TPipeTransferReq { for (int i = 0; i < size; ++i) { batchReq.insertNodeReqs.add( PipeTransferTabletInsertNodeReq.toTPipeTransferReq( - (InsertNode) PlanFragment.deserializeHelper(transferReq.body))); + (InsertNode) PlanFragment.deserializeHelper(transferReq.body, null))); } size = ReadWriteIOUtils.readInt(transferReq.body); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java index d8993125300..66e0a02dd7a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java @@ -243,15 +243,12 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext> Analysis analysis = new Analysis(); analysis.setLastLevelUseWildcard(queryStatement.isLastLevelUseWildcard()); - long startTime = System.currentTimeMillis(); try { // check for semantic errors queryStatement.semanticCheck(); ISchemaTree schemaTree = analyzeSchema(queryStatement, analysis, context); - logger.warn("--- [analyzeSchema] : {}ms", System.currentTimeMillis() - startTime); - // If there is no leaf node in the schema tree, the query should be completed immediately if (schemaTree.isEmpty()) { return finishQuery(queryStatement, analysis); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TypeProvider.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TypeProvider.java index 625f1e9d770..d54bf4b1e7a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TypeProvider.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TypeProvider.java @@ -21,49 +21,27 @@ package org.apache.iotdb.db.queryengine.plan.analyze; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; -import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; import java.io.DataOutputStream; import java.io.IOException; import java.nio.ByteBuffer; import java.util.HashMap; -import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.Set; public class TypeProvider { private final Map<String, TSDataType> typeMap; - ///////////////////////////////////////////////////////////////////////////////////////////////// - // All Queries Devices Set In One Template - ///////////////////////////////////////////////////////////////////////////////////////////////// - private List<String> measurementList; - private List<IMeasurementSchema> schemaList; - private List<TSDataType> dataTypes; - private Set<String> allSensors; + private TemplatedInfo templatedInfo; public TypeProvider() { this.typeMap = new HashMap<>(); } - public TypeProvider(Map<String, TSDataType> typeMap) { + public TypeProvider(Map<String, TSDataType> typeMap, TemplatedInfo templatedInfo) { this.typeMap = typeMap; - } - - public TypeProvider( - List<String> measurementList, - List<IMeasurementSchema> schemaList, - List<TSDataType> dataTypes, - Set<String> allSensors) { - if (measurementList != null) { - this.measurementList = measurementList; - this.schemaList = schemaList; - this.dataTypes = dataTypes; - this.allSensors = allSensors; - } - this.typeMap = new HashMap<>(); + this.templatedInfo = templatedInfo; } public TSDataType getType(String symbol) { @@ -77,12 +55,31 @@ public class TypeProvider { } } + public Map<String, TSDataType> getTypeMap() { + return this.typeMap; + } + + public void setTemplatedInfo(TemplatedInfo templatedInfo) { + this.templatedInfo = templatedInfo; + } + + public TemplatedInfo getTemplatedInfo() { + return this.templatedInfo; + } + public void serialize(ByteBuffer byteBuffer) { ReadWriteIOUtils.write(typeMap.size(), byteBuffer); for (Map.Entry<String, TSDataType> entry : typeMap.entrySet()) { ReadWriteIOUtils.write(entry.getKey(), byteBuffer); ReadWriteIOUtils.write(entry.getValue().ordinal(), byteBuffer); } + + if (templatedInfo == null) { + ReadWriteIOUtils.write((byte) 0, byteBuffer); + } else { + ReadWriteIOUtils.write((byte) 1, byteBuffer); + templatedInfo.serialize(byteBuffer); + } } public void serialize(DataOutputStream stream) throws IOException { @@ -91,6 +88,13 @@ public class TypeProvider { ReadWriteIOUtils.write(entry.getKey(), stream); ReadWriteIOUtils.write(entry.getValue().ordinal(), stream); } + + if (templatedInfo == null) { + ReadWriteIOUtils.write((byte) 0, stream); + } else { + ReadWriteIOUtils.write((byte) 1, stream); + templatedInfo.serialize(stream); + } } public static TypeProvider deserialize(ByteBuffer byteBuffer) { @@ -102,7 +106,14 @@ public class TypeProvider { TSDataType.values()[ReadWriteIOUtils.readInt(byteBuffer)]); mapSize--; } - return new TypeProvider(typeMap); + + TemplatedInfo templatedInfo = null; + byte hasTemplatedInfo = ReadWriteIOUtils.readByte(byteBuffer); + if (hasTemplatedInfo == 1) { + templatedInfo = TemplatedInfo.deserialize(byteBuffer); + } + + return new TypeProvider(typeMap, templatedInfo); } @Override @@ -121,40 +132,4 @@ public class TypeProvider { public int hashCode() { return Objects.hash(typeMap); } - - ///////////////////////////////////////////////////////////////////////////////////////////////// - // All Queries Devices Set In One Template - ///////////////////////////////////////////////////////////////////////////////////////////////// - - public void setMeasurementList(List<String> measurementList) { - this.measurementList = measurementList; - } - - public List<String> getMeasurementList() { - return this.measurementList; - } - - public void setSchemaList(List<IMeasurementSchema> schemaList) { - this.schemaList = schemaList; - } - - public List<IMeasurementSchema> getSchemaList() { - return this.schemaList; - } - - public void setDataTypes(List<TSDataType> dataTypes) { - this.dataTypes = dataTypes; - } - - public List<TSDataType> getDataTypes() { - return this.dataTypes; - } - - public void setAllSensors(Set<String> allSensors) { - this.allSensors = allSensors; - } - - public Set<String> getAllSensors() { - return this.allSensors; - } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java index e78d41133cb..fba7dab5587 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java @@ -346,8 +346,8 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP seriesScanOptionsBuilder.withOffset(node.getOffset()); AlignedPath seriesPath = node.getAlignedPath(); seriesScanOptionsBuilder.withAllSensors( - context.getTypeProvider().getAllSensors() != null - ? context.getTypeProvider().getAllSensors() + context.getTypeProvider().getTemplatedInfo() != null + ? context.getTypeProvider().getTemplatedInfo().getAllSensors() : new HashSet<>(seriesPath.getMeasurementList())); OperatorContext operatorContext = @@ -365,7 +365,9 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP node.getScanOrder(), seriesScanOptionsBuilder.build(), node.isQueryAllSensors(), - context.getTypeProvider().getDataTypes()); + context.getTypeProvider().getTemplatedInfo() != null + ? context.getTypeProvider().getTemplatedInfo().getDataTypes() + : null); ((DataDriverContext) context.getDriverContext()).addSourceOperator(seriesScanOperator); ((DataDriverContext) context.getDriverContext()).addPath(seriesPath); @@ -1880,7 +1882,7 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP List<OutputColumn> outputColumns = generateOutputColumnsFromChildren(node); List<ColumnMerger> mergers = createColumnMergers(outputColumns, timeComparator); List<TSDataType> outputColumnTypes = - context.getTypeProvider().getMeasurementList() != null + context.getTypeProvider().getTemplatedInfo() != null ? getOutputColumnTypesOfTimeJoinNode(node) : getOutputColumnTypes(node, context.getTypeProvider()); @@ -2487,7 +2489,7 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP } private List<TSDataType> getInputColumnTypes(PlanNode node, TypeProvider typeProvider) { - if (typeProvider.getMeasurementList() == null) { + if (typeProvider.getTemplatedInfo() == null) { return node.getChildren().stream() .map(PlanNode::getOutputColumnNames) .flatMap(List::stream) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/SubPlanTypeExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/SubPlanTypeExtractor.java index 8d4df010115..f14582a6075 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/SubPlanTypeExtractor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/SubPlanTypeExtractor.java @@ -46,12 +46,9 @@ public class SubPlanTypeExtractor { private SubPlanTypeExtractor() {} public static TypeProvider extractor(PlanNode root, TypeProvider allTypes) { + TypeProvider typeProvider = - new TypeProvider( - allTypes.getMeasurementList(), - allTypes.getSchemaList(), - allTypes.getDataTypes(), - allTypes.getAllSensors()); + new TypeProvider(allTypes.getTypeMap(), allTypes.getTemplatedInfo()); root.accept(new Visitor(typeProvider, allTypes), null); return typeProvider; } @@ -164,7 +161,7 @@ public class SubPlanTypeExtractor { @Override public Void visitSingleDeviceView(SingleDeviceViewNode node, Void context) { - if (typeProvider.getMeasurementList() != null) { + if (typeProvider.getTemplatedInfo() != null) { return null; } return visitPlan(node, context); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TemplatedLogicalPlan.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TemplatedLogicalPlan.java index ead71d394f9..abf8eb7a51c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TemplatedLogicalPlan.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TemplatedLogicalPlan.java @@ -23,6 +23,7 @@ import org.apache.iotdb.commons.path.MeasurementPath; import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.db.queryengine.common.MPPQueryContext; import org.apache.iotdb.db.queryengine.plan.analyze.Analysis; +import org.apache.iotdb.db.queryengine.plan.analyze.TemplatedInfo; import org.apache.iotdb.db.queryengine.plan.expression.Expression; import org.apache.iotdb.db.queryengine.plan.expression.leaf.TimeSeriesOperand; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; @@ -69,9 +70,10 @@ public class TemplatedLogicalPlan { new TemplatedLogicalPlanBuilder(analysis, context, measurementList, schemaList); Map<String, PlanNode> deviceToSubPlanMap = new LinkedHashMap<>(); + long limitValue = pushDownLimitToScanNode(queryStatement, analysis); for (PartialPath devicePath : analysis.getDeviceList()) { String deviceName = devicePath.getFullPath(); - PlanNode rootNode = visitQueryBody(devicePath, analysis, queryStatement, context); + PlanNode rootNode = visitQueryBody(devicePath, analysis, queryStatement, context, limitValue); LogicalPlanBuilder subPlanBuilder = new TemplatedLogicalPlanBuilder(analysis, context, measurementList, schemaList) @@ -120,7 +122,8 @@ public class TemplatedLogicalPlan { PartialPath devicePath, Analysis analysis, QueryStatement queryStatement, - MPPQueryContext context) { + MPPQueryContext context, + long limitValue) { List<String> mergedMeasurementList = measurementList; List<IMeasurementSchema> mergedSchemaList = schemaList; @@ -157,7 +160,7 @@ public class TemplatedLogicalPlan { queryStatement.getResultTimeOrder(), analysis.getGlobalTimeFilter(), 0, - pushDownLimitToScanNode(queryStatement, analysis), + limitValue, analysis.isLastLevelUseWildcard()); if (whereExpression != null) { @@ -178,16 +181,22 @@ public class TemplatedLogicalPlan { queryStatement.getResultTimeOrder()); } - if (context.getTypeProvider().getMeasurementList() == null) { - context.getTypeProvider().setMeasurementList(mergedMeasurementList); - context.getTypeProvider().setSchemaList(mergedSchemaList); + if (context.getTypeProvider().getTemplatedInfo() == null) { context .getTypeProvider() - .setDataTypes( - mergedSchemaList.stream() - .map(IMeasurementSchema::getType) - .collect(Collectors.toList())); - context.getTypeProvider().setAllSensors(new HashSet<>(mergedMeasurementList)); + .setTemplatedInfo( + new TemplatedInfo( + mergedMeasurementList, + mergedSchemaList, + mergedSchemaList.stream() + .map(IMeasurementSchema::getType) + .collect(Collectors.toList()), + new HashSet<>(mergedMeasurementList), + analysis.getGlobalTimeFilter(), + queryStatement.getResultTimeOrder(), + analysis.isLastLevelUseWildcard(), + 0, + limitValue)); } return planBuilder.getRoot(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TemplatedLogicalPlanBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TemplatedLogicalPlanBuilder.java index 6770053fea5..465e0ca5499 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TemplatedLogicalPlanBuilder.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TemplatedLogicalPlanBuilder.java @@ -77,6 +77,9 @@ public class TemplatedLogicalPlanBuilder extends LogicalPlanBuilder { path.setMeasurementList(measurementList); path.addSchemas(schemaList); + // if value filter push down is implemented, + // the serializeUseTemplate and deserializeUseTemplate method of AlignedSeriesScanNode also + // need adapt AlignedSeriesScanNode alignedSeriesScanNode = new AlignedSeriesScanNode( context.getQueryId().genPlanNodeId(), diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/PlanFragment.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/PlanFragment.java index 658c39b26ab..f2d5d73affb 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/PlanFragment.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/PlanFragment.java @@ -27,6 +27,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.SubPlanTypeExtractor; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.IPartitionRelatedNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedSeriesScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.VirtualSourceNode; import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; @@ -145,31 +146,52 @@ public class PlanFragment { public void serialize(DataOutputStream stream) throws IOException { id.serialize(stream); - planNodeTree.serialize(stream); if (typeProvider == null) { ReadWriteIOUtils.write((byte) 0, stream); } else { ReadWriteIOUtils.write((byte) 1, stream); + + // templated device, the serialized attribute basically same, + // so there is no need to serialize all the SeriesScanNode repeated + if (typeProvider.getTemplatedInfo() != null) { + typeProvider.serialize(stream); + planNodeTree.serializeUseTemplate(stream, typeProvider); + return; + } + typeProvider.serialize(stream); } + planNodeTree.serialize(stream); } public static PlanFragment deserialize(ByteBuffer byteBuffer) { - PlanFragment planFragment = - new PlanFragment(PlanFragmentId.deserialize(byteBuffer), deserializeHelper(byteBuffer)); + PlanFragmentId planFragmentId = PlanFragmentId.deserialize(byteBuffer); byte hasTypeProvider = ReadWriteIOUtils.readByte(byteBuffer); + TypeProvider typeProvider = null; if (hasTypeProvider == 1) { - planFragment.setTypeProvider(TypeProvider.deserialize(byteBuffer)); + typeProvider = TypeProvider.deserialize(byteBuffer); } + PlanFragment planFragment = + new PlanFragment(planFragmentId, deserializeHelper(byteBuffer, typeProvider)); + planFragment.setTypeProvider(typeProvider); return planFragment; } // deserialize the plan node recursively - public static PlanNode deserializeHelper(ByteBuffer byteBuffer) { - PlanNode root = PlanNodeType.deserialize(byteBuffer); + public static PlanNode deserializeHelper(ByteBuffer byteBuffer, TypeProvider typeProvider) { + PlanNode root; + if (typeProvider != null && typeProvider.getTemplatedInfo() != null) { + root = PlanNodeType.deserializeWithTemplate(byteBuffer, typeProvider); + if (root instanceof AlignedSeriesScanNode) { + return root; + } + } else { + root = PlanNodeType.deserialize(byteBuffer); + } + int childrenCount = byteBuffer.getInt(); for (int i = 0; i < childrenCount; i++) { - root.addChild(deserializeHelper(byteBuffer)); + root.addChild(deserializeHelper(byteBuffer, typeProvider)); } return root; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNode.java index 9dac211fb02..2b73576ea03 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNode.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.queryengine.plan.planner.plan.node; import org.apache.iotdb.commons.exception.runtime.SerializationRunTimeException; import org.apache.iotdb.consensus.common.request.IConsensusRequest; +import org.apache.iotdb.db.queryengine.plan.analyze.TypeProvider; import org.apache.iotdb.tsfile.utils.PublicBAOS; import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; @@ -43,7 +44,7 @@ public abstract class PlanNode implements IConsensusRequest { protected static final int ONE_CHILD = 1; protected static final int CHILD_COUNT_NO_LIMIT = -1; - private PlanNodeId id; + protected PlanNodeId id; protected PlanNode(PlanNodeId id) { requireNonNull(id, "id is null"); @@ -134,6 +135,21 @@ public abstract class PlanNode implements IConsensusRequest { } } + public void serializeUseTemplate(DataOutputStream stream, TypeProvider typeProvider) + throws IOException { + serializeAttributes(stream); + id.serialize(stream); + List<PlanNode> planNodes = getChildren(); + if (planNodes == null) { + ReadWriteIOUtils.write(0, stream); + } else { + ReadWriteIOUtils.write(planNodes.size(), stream); + for (PlanNode planNode : planNodes) { + planNode.serializeUseTemplate(stream, typeProvider); + } + } + } + /** * Deserialize via {@link * org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType#deserialize(ByteBuffer)} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java index 19e73bfdede..fc3034f87ab 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java @@ -18,6 +18,7 @@ */ package org.apache.iotdb.db.queryengine.plan.planner.plan.node; +import org.apache.iotdb.db.queryengine.plan.analyze.TypeProvider; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadTsFilePieceNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.CountSchemaMergeNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.DevicesCountNode; @@ -238,6 +239,20 @@ public enum PlanNodeType { public static PlanNode deserialize(ByteBuffer buffer) { short nodeType = buffer.getShort(); + return deserialize(buffer, nodeType); + } + + public static PlanNode deserializeWithTemplate(ByteBuffer buffer, TypeProvider typeProvider) { + short nodeType = buffer.getShort(); + switch (nodeType) { + case 33: + return AlignedSeriesScanNode.deserializeUseTemplate(buffer, typeProvider); + default: + return deserialize(buffer, nodeType); + } + } + + public static PlanNode deserialize(ByteBuffer buffer, short nodeType) { switch (nodeType) { case 0: return AggregationNode.deserialize(buffer); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/AlignedSeriesScanNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/AlignedSeriesScanNode.java index 638c1c81e3f..37bb374f25b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/AlignedSeriesScanNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/source/AlignedSeriesScanNode.java @@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; import org.apache.iotdb.commons.path.AlignedPath; import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.commons.path.PathDeserializeUtil; +import org.apache.iotdb.db.queryengine.plan.analyze.TypeProvider; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType; @@ -287,6 +288,43 @@ public class AlignedSeriesScanNode extends SeriesSourceNode { queryAllSensors); } + @Override + public void serializeUseTemplate(DataOutputStream stream, TypeProvider typeProvider) + throws IOException { + PlanNodeType.ALIGNED_SERIES_SCAN.serialize(stream); + id.serialize(stream); + ReadWriteIOUtils.write(alignedPath.getNodes().length, stream); + for (String node : alignedPath.getNodes()) { + ReadWriteIOUtils.write(node, stream); + } + // ReadWriteIOUtils.write(getChildren().size(), stream); + } + + public static AlignedSeriesScanNode deserializeUseTemplate( + ByteBuffer byteBuffer, TypeProvider typeProvider) { + PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer); + + int nodeSize = ReadWriteIOUtils.readInt(byteBuffer); + String[] nodes = new String[nodeSize]; + for (int i = 0; i < nodeSize; i++) { + nodes[i] = ReadWriteIOUtils.readString(byteBuffer); + } + AlignedPath alignedPath = new AlignedPath(new PartialPath(nodes)); + alignedPath.setMeasurementList(typeProvider.getTemplatedInfo().getMeasurementList()); + alignedPath.addSchemas(typeProvider.getTemplatedInfo().getSchemaList()); + + return new AlignedSeriesScanNode( + planNodeId, + alignedPath, + typeProvider.getTemplatedInfo().getScanOrder(), + typeProvider.getTemplatedInfo().getTimeFilter(), + typeProvider.getTemplatedInfo().getTimeFilter(), + typeProvider.getTemplatedInfo().getLimitValue(), + typeProvider.getTemplatedInfo().getOffsetValue(), + null, + typeProvider.getTemplatedInfo().isQueryAllSensors()); + } + @Override public boolean equals(Object o) { if (this == o) {
