This is an automated email from the ASF dual-hosted git repository. caogaofei pushed a commit to branch beyyes/debug-table in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit da1662de077a88600fefe3f7b0c23ee444d99e53 Author: Beyyes <[email protected]> AuthorDate: Tue May 21 21:01:49 2024 +0800 add multi datanode multi dataregion impl for table model --- .../iotdb/confignode/conf/ConfigNodeConfig.java | 4 +- .../db/queryengine/plan/analyze/TypeProvider.java | 23 ++-- .../plan/planner/TableOperatorGenerator.java | 46 +++++++ .../plan/planner/plan/node/PlanNodeType.java | 42 +++++- .../plan/planner/plan/node/PlanVisitor.java | 9 +- .../plan/relational/metadata/ColumnSchema.java | 56 ++++++++ .../plan/relational/metadata/DeviceEntry.java | 32 +++++ .../plan/relational/planner/OrderingScheme.java | 49 +++++++ .../plan/relational/planner/Symbol.java | 18 +++ .../planner/distribute/ExchangeNodeGenerator.java | 77 ++++++++++- .../distribute/RelationalDistributionPlanner.java | 77 ++++++++--- .../planner/distribute/SubPlanGenerator.java | 8 +- .../TableModelTypeProviderExtractor.java | 21 +++ .../plan/relational/planner/node/FilterNode.java | 4 + .../plan/relational/planner/node/LimitNode.java | 4 + .../relational/planner/node/MergeSortNode.java | 33 ++++- .../plan/relational/planner/node/OffsetNode.java | 4 + .../plan/relational/planner/node/OutputNode.java | 45 ++++++- .../plan/relational/planner/node/ProjectNode.java | 4 + .../plan/relational/planner/node/SortNode.java | 4 + .../relational/planner/node/TableScanNode.java | 144 ++++++++++++++++++++- .../plan/relational/planner/node/TopKNode.java | 4 + .../planner/optimizations/IndexScan.java | 28 ++-- .../schema/table/column/TsTableColumnCategory.java | 6 +- 24 files changed, 670 insertions(+), 72 deletions(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java index 3177bf0c0fd..4b02421f253 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java @@ -94,14 +94,14 @@ public class ConfigNodeConfig { /** The policy of extension DataRegionGroup for each Database. */ private RegionGroupExtensionPolicy dataRegionGroupExtensionPolicy = - RegionGroupExtensionPolicy.AUTO; + RegionGroupExtensionPolicy.CUSTOM; /** * When set data_region_group_extension_policy=CUSTOM, this parameter is the default number of * DataRegionGroups for each Database. When set data_region_group_extension_policy=AUTO, this * parameter is the default minimal number of DataRegionGroups for each Database. */ - private int defaultDataRegionGroupNumPerDatabase = 1; + private int defaultDataRegionGroupNumPerDatabase = 3; /** The maximum number of DataRegions expected to be managed by each DataNode. */ private double dataRegionPerDataNode = 5.0; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TypeProvider.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TypeProvider.java index 39945fc27ca..fc6c56ae599 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TypeProvider.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/TypeProvider.java @@ -164,10 +164,12 @@ public class TypeProvider { } public void serialize(DataOutputStream stream) throws IOException { - ReadWriteIOUtils.write(treeModelTypeMap.size(), stream); - for (Map.Entry<String, TSDataType> entry : treeModelTypeMap.entrySet()) { - ReadWriteIOUtils.write(entry.getKey(), stream); - ReadWriteIOUtils.write(entry.getValue().ordinal(), stream); + ReadWriteIOUtils.write(treeModelTypeMap == null ? 0 : treeModelTypeMap.size(), stream); + if (treeModelTypeMap != null) { + for (Map.Entry<String, TSDataType> entry : treeModelTypeMap.entrySet()) { + ReadWriteIOUtils.write(entry.getKey(), stream); + ReadWriteIOUtils.write(entry.getValue().ordinal(), stream); + } } if (templatedInfo == null) { @@ -180,6 +182,7 @@ public class TypeProvider { if (tableModelTypes == null) { ReadWriteIOUtils.write((byte) 0, stream); } else { + ReadWriteIOUtils.write((byte) 1, stream); ReadWriteIOUtils.write(tableModelTypes.size(), stream); for (Map.Entry<Symbol, Type> entry : tableModelTypes.entrySet()) { ReadWriteIOUtils.write(entry.getKey().getName(), stream); @@ -204,20 +207,20 @@ public class TypeProvider { templatedInfo = TemplatedInfo.deserialize(byteBuffer); } - Map<Symbol, Type> types = null; - byte hasTypes = ReadWriteIOUtils.readByte(byteBuffer); - if (hasTypes == 1) { + Map<Symbol, Type> tableModelTypes = null; + byte hasTableModelTypes = ReadWriteIOUtils.readByte(byteBuffer); + if (hasTableModelTypes == 1) { mapSize = ReadWriteIOUtils.readInt(byteBuffer); - types = new HashMap<>(mapSize); + tableModelTypes = new HashMap<>(mapSize); while (mapSize > 0) { - types.put( + tableModelTypes.put( new Symbol(ReadWriteIOUtils.readString(byteBuffer)), TypeFactory.getType(TypeEnum.values()[ReadWriteIOUtils.readInt(byteBuffer)])); mapSize--; } } - return new TypeProvider(typeMap, templatedInfo, types); + return new TypeProvider(typeMap, templatedInfo, tableModelTypes); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java index 09e7b9cae42..69d6cb305b6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.queryengine.plan.planner; +import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.commons.path.AlignedPath; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.queryengine.common.FragmentInstanceId; @@ -28,6 +29,7 @@ import org.apache.iotdb.db.queryengine.execution.exchange.MPPDataExchangeService import org.apache.iotdb.db.queryengine.execution.exchange.sink.DownStreamChannelIndex; import org.apache.iotdb.db.queryengine.execution.exchange.sink.ISinkHandle; import org.apache.iotdb.db.queryengine.execution.exchange.sink.ShuffleSinkHandle; +import org.apache.iotdb.db.queryengine.execution.exchange.source.ISourceHandle; import org.apache.iotdb.db.queryengine.execution.operator.Operator; import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext; import org.apache.iotdb.db.queryengine.execution.operator.process.FilterAndProjectOperator; @@ -38,6 +40,7 @@ import org.apache.iotdb.db.queryengine.execution.operator.process.SortOperator; import org.apache.iotdb.db.queryengine.execution.operator.process.TopKOperator; import org.apache.iotdb.db.queryengine.execution.operator.sink.IdentitySinkOperator; import org.apache.iotdb.db.queryengine.execution.operator.source.AlignedSeriesScanOperator; +import org.apache.iotdb.db.queryengine.execution.operator.source.ExchangeOperator; import org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableScanOperator; import org.apache.iotdb.db.queryengine.execution.relational.ColumnTransformerBuilder; import org.apache.iotdb.db.queryengine.plan.analyze.PredicateUtils; @@ -45,6 +48,7 @@ import org.apache.iotdb.db.queryengine.plan.analyze.TypeProvider; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ExchangeNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.sink.IdentitySinkNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.InputLocation; import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.SeriesScanOptions; @@ -86,6 +90,7 @@ import java.util.stream.Collectors; import static com.google.common.base.Preconditions.checkArgument; import static java.util.Objects.requireNonNull; +import static org.apache.iotdb.db.queryengine.common.DataNodeEndPoints.isSameNode; import static org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.MergeSortComparator.getComparatorForTable; import static org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableScanOperator.constructAlignedPath; import static org.apache.iotdb.db.queryengine.plan.analyze.PredicateUtils.convertPredicateToFilter; @@ -142,6 +147,47 @@ public class TableOperatorGenerator extends PlanVisitor<Operator, LocalExecution return new IdentitySinkOperator(operatorContext, children, downStreamChannelIndex, sinkHandle); } + @Override + public Operator visitExchange(ExchangeNode node, LocalExecutionPlanContext context) { + OperatorContext operatorContext = + context + .getDriverContext() + .addOperatorContext( + context.getNextOperatorId(), + node.getPlanNodeId(), + ExchangeOperator.class.getSimpleName()); + + FragmentInstanceId localInstanceId = context.getInstanceContext().getId(); + FragmentInstanceId remoteInstanceId = node.getUpstreamInstanceId(); + + TEndPoint upstreamEndPoint = node.getUpstreamEndpoint(); + boolean isSameNode = isSameNode(upstreamEndPoint); + ISourceHandle sourceHandle = + isSameNode + ? MPP_DATA_EXCHANGE_MANAGER.createLocalSourceHandleForFragment( + localInstanceId.toThrift(), + node.getPlanNodeId().getId(), + node.getUpstreamPlanNodeId().getId(), + remoteInstanceId.toThrift(), + node.getIndexOfUpstreamSinkHandle(), + context.getInstanceContext()::failed) + : MPP_DATA_EXCHANGE_MANAGER.createSourceHandle( + localInstanceId.toThrift(), + node.getPlanNodeId().getId(), + node.getIndexOfUpstreamSinkHandle(), + upstreamEndPoint, + remoteInstanceId.toThrift(), + context.getInstanceContext()::failed); + if (!isSameNode) { + context.addExchangeSumNum(1); + } + sourceHandle.setMaxBytesCanReserve(context.getMaxBytesOneHandleCanReserve()); + ExchangeOperator exchangeOperator = + new ExchangeOperator(operatorContext, sourceHandle, node.getUpstreamPlanNodeId()); + context.addExchangeOperator(exchangeOperator); + return exchangeOperator; + } + @Override public Operator visitTableScan(TableScanNode node, LocalExecutionPlanContext context) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java index 1152d9b0c6e..0e52538126b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java @@ -79,7 +79,6 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.GroupByTag import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.HorizontallyConcatNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.IntoNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.LimitNode; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.MergeSortNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.OffsetNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ProjectNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SingleDeviceViewNode; @@ -218,7 +217,17 @@ public enum PlanNodeType { CREATE_TABLE_DEVICE((short) 92), TABLE_DEVICE_SCAN((short) 93), TABLE_DEVICE_FETCH((short) 94), - DELETE_TABLE_DEVICE((short) 95); + DELETE_TABLE_DEVICE((short) 95), + + TABLE_SCAN_NODE((short) 1000), + TABLE_FILTER_NODE((short) 1001), + TABLE_PROJECT_NODE((short) 1002), + TABLE_OUTPUT_NODE((short) 1003), + TABLE_LIMIT_NODE((short) 1004), + TABLE_OFFSET_NODE((short) 1005), + TABLE_SORT_NODE((short) 1006), + TABLE_MERGESORT_NODE((short) 1007), + TABLE_TOPK_NODE((short) 1008); public static final int BYTES = Short.BYTES; @@ -400,7 +409,8 @@ public enum PlanNodeType { case 65: return SingleDeviceViewNode.deserialize(buffer); case 66: - return MergeSortNode.deserialize(buffer); + return org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.MergeSortNode + .deserialize(buffer); case 67: return ShowQueriesNode.deserialize(buffer); case 68: @@ -457,6 +467,32 @@ public enum PlanNodeType { return TableDeviceFetchNode.deserialize(buffer); case 95: return DeleteTableDeviceNode.deserialize(buffer); + case 1000: + return org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableScanNode + .deserialize(buffer); + case 1001: + return org.apache.iotdb.db.queryengine.plan.relational.planner.node.FilterNode.deserialize( + buffer); + case 1002: + return org.apache.iotdb.db.queryengine.plan.relational.planner.node.ProjectNode.deserialize( + buffer); + case 1003: + return org.apache.iotdb.db.queryengine.plan.relational.planner.node.OutputNode.deserialize( + buffer); + case 1004: + return org.apache.iotdb.db.queryengine.plan.relational.planner.node.LimitNode.deserialize( + buffer); + case 1005: + return org.apache.iotdb.db.queryengine.plan.relational.planner.node.OffsetNode.deserialize( + buffer); + case 1006: + return org.apache.iotdb.db.queryengine.plan.relational.planner.node.SortNode.deserialize( + buffer); + case 1007: + return org.apache.iotdb.db.queryengine.plan.relational.planner.node.MergeSortNode + .deserialize(buffer); + case 1008: + return TopKNode.deserialize(buffer); default: throw new IllegalArgumentException("Invalid node type: " + nodeType); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java index 03b28177027..34fd19bd7b3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java @@ -76,7 +76,6 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.GroupByTag import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.HorizontallyConcatNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.IntoNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.LimitNode; -import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.MergeSortNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.MultiChildProcessNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.OffsetNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ProjectNode; @@ -112,6 +111,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNod import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsOfOneDeviceNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.MergeSortNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableScanNode; @SuppressWarnings("java:S6539") // suppress "Monster class" warning @@ -291,7 +291,9 @@ public abstract class PlanVisitor<R, C> { return visitSingleChildProcess(node, context); } - public R visitMergeSort(MergeSortNode node, C context) { + public R visitMergeSort( + org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.MergeSortNode node, + C context) { return visitMultiChildProcess(node, context); } @@ -538,8 +540,7 @@ public abstract class PlanVisitor<R, C> { return visitPlan(node, context); } - public R visitMergeSort( - org.apache.iotdb.db.queryengine.plan.relational.planner.node.MergeSortNode node, C context) { + public R visitMergeSort(MergeSortNode node, C context) { return visitPlan(node, context); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/ColumnSchema.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/ColumnSchema.java index bf8027339b4..bb460c61423 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/ColumnSchema.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/ColumnSchema.java @@ -21,13 +21,25 @@ package org.apache.iotdb.db.queryengine.plan.relational.metadata; import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory; +import org.apache.tsfile.read.common.type.BinaryType; +import org.apache.tsfile.read.common.type.BooleanType; +import org.apache.tsfile.read.common.type.DoubleType; +import org.apache.tsfile.read.common.type.FloatType; import org.apache.tsfile.read.common.type.Type; +import org.apache.tsfile.read.common.type.TypeEnum; +import org.apache.tsfile.read.common.type.UnknownType; +import org.apache.tsfile.utils.ReadWriteIOUtils; +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; import java.util.Objects; import java.util.StringJoiner; import static java.util.Locale.ENGLISH; import static java.util.Objects.requireNonNull; +import static org.apache.tsfile.read.common.type.IntType.INT32; +import static org.apache.tsfile.read.common.type.LongType.INT64; public class ColumnSchema { private final String name; @@ -87,6 +99,50 @@ public class ColumnSchema { .toString(); } + public static void serialize(ColumnSchema columnSchema, ByteBuffer byteBuffer) { + ReadWriteIOUtils.write(columnSchema.getName(), byteBuffer); + ReadWriteIOUtils.write(columnSchema.getType().getTypeEnum().ordinal(), byteBuffer); + columnSchema.getColumnCategory().serialize(byteBuffer); + ReadWriteIOUtils.write(columnSchema.isHidden(), byteBuffer); + } + + public static void serialize(ColumnSchema columnSchema, DataOutputStream stream) + throws IOException { + ReadWriteIOUtils.write(columnSchema.getName(), stream); + ReadWriteIOUtils.write(columnSchema.getType().getTypeEnum().ordinal(), stream); + columnSchema.getColumnCategory().serialize(stream); + ReadWriteIOUtils.write(columnSchema.isHidden(), stream); + } + + public static ColumnSchema deserialize(ByteBuffer byteBuffer) { + String name = ReadWriteIOUtils.readString(byteBuffer); + TypeEnum typeEnum = TypeEnum.values()[ReadWriteIOUtils.readInt(byteBuffer)]; + Type type = getType(typeEnum); + TsTableColumnCategory columnCategory = TsTableColumnCategory.deserialize(byteBuffer); + boolean isHidden = ReadWriteIOUtils.readBool(byteBuffer); + + return new ColumnSchema(name, type, isHidden, columnCategory); + } + + public static Type getType(TypeEnum typeEnum) { + switch (typeEnum) { + case BOOLEAN: + return BooleanType.BOOLEAN; + case INT32: + return INT32; + case INT64: + return INT64; + case FLOAT: + return FloatType.FLOAT; + case DOUBLE: + return DoubleType.DOUBLE; + case TEXT: + return BinaryType.TEXT; + default: + return UnknownType.UNKNOWN; + } + } + public static Builder builder() { return new Builder(); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/DeviceEntry.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/DeviceEntry.java index cac441ef1f1..396352ab78d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/DeviceEntry.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/DeviceEntry.java @@ -20,7 +20,13 @@ package org.apache.iotdb.db.queryengine.plan.relational.metadata; import org.apache.tsfile.file.metadata.IDeviceID; +import org.apache.tsfile.file.metadata.StringArrayDeviceID; +import org.apache.tsfile.utils.ReadWriteIOUtils; +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.List; public class DeviceEntry { @@ -41,6 +47,32 @@ public class DeviceEntry { return attributeColumnValues; } + public void serialize(ByteBuffer byteBuffer) { + deviceID.serialize(byteBuffer); + ReadWriteIOUtils.write(attributeColumnValues.size(), byteBuffer); + for (String value : attributeColumnValues) { + ReadWriteIOUtils.write(value, byteBuffer); + } + } + + public void serialize(DataOutputStream stream) throws IOException { + deviceID.serialize(stream); + ReadWriteIOUtils.write(attributeColumnValues.size(), stream); + for (String value : attributeColumnValues) { + ReadWriteIOUtils.write(value, stream); + } + } + + public static DeviceEntry deserialize(ByteBuffer byteBuffer) { + IDeviceID iDeviceID = StringArrayDeviceID.deserialize(byteBuffer); + int size = ReadWriteIOUtils.readInt(byteBuffer); + List<String> attributeColumnValues = new ArrayList<>(size); + while (size-- > 0) { + attributeColumnValues.add(ReadWriteIOUtils.readString(byteBuffer)); + } + return new DeviceEntry(iDeviceID, attributeColumnValues); + } + @Override public String toString() { return "DeviceEntry{" diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/OrderingScheme.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/OrderingScheme.java index b7d57d5f056..9edc2ad3c54 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/OrderingScheme.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/OrderingScheme.java @@ -17,7 +17,13 @@ package org.apache.iotdb.db.queryengine.plan.relational.planner; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; +import org.apache.tsfile.utils.ReadWriteIOUtils; +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; @@ -59,6 +65,49 @@ public class OrderingScheme { return orderings.get(symbol); } + public void serialize(ByteBuffer byteBuffer) { + ReadWriteIOUtils.write(orderBy.size(), byteBuffer); + for (Symbol symbol : orderBy) { + Symbol.serialize(symbol, byteBuffer); + } + + ReadWriteIOUtils.write(orderings.size(), byteBuffer); + for (Map.Entry<Symbol, SortOrder> entry : orderings.entrySet()) { + Symbol.serialize(entry.getKey(), byteBuffer); + ReadWriteIOUtils.write(entry.getValue().ordinal(), byteBuffer); + } + } + + public void serialize(DataOutputStream stream) throws IOException { + ReadWriteIOUtils.write(orderBy.size(), stream); + for (Symbol symbol : orderBy) { + Symbol.serialize(symbol, stream); + } + + ReadWriteIOUtils.write(orderings.size(), stream); + for (Map.Entry<Symbol, SortOrder> entry : orderings.entrySet()) { + Symbol.serialize(entry.getKey(), stream); + ReadWriteIOUtils.write(entry.getValue().ordinal(), stream); + } + } + + public static OrderingScheme deserialize(ByteBuffer byteBuffer) { + int size = ReadWriteIOUtils.readInt(byteBuffer); + List<Symbol> orderBy = new ArrayList<>(size); + while (size-- > 0) { + orderBy.add(Symbol.deserialize(byteBuffer)); + } + + size = ReadWriteIOUtils.readInt(byteBuffer); + Map<Symbol, SortOrder> orderings = new HashMap<>(size); + while (size-- > 0) { + orderings.put( + Symbol.deserialize(byteBuffer), SortOrder.values()[ReadWriteIOUtils.readInt(byteBuffer)]); + } + + return new OrderingScheme(orderBy, orderings); + } + @Override public boolean equals(Object o) { if (this == o) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/Symbol.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/Symbol.java index 847197ba6b1..946e90d8cbe 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/Symbol.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/Symbol.java @@ -22,6 +22,12 @@ package org.apache.iotdb.db.queryengine.plan.relational.planner; import org.apache.iotdb.db.relational.sql.tree.Expression; import org.apache.iotdb.db.relational.sql.tree.SymbolReference; +import org.apache.tsfile.utils.ReadWriteIOUtils; + +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; + import static com.google.common.base.Preconditions.checkArgument; import static java.util.Objects.requireNonNull; @@ -74,4 +80,16 @@ public class Symbol implements Comparable<Symbol> { public int compareTo(Symbol o) { return name.compareTo(o.name); } + + public static void serialize(Symbol symbol, ByteBuffer byteBuffer) { + ReadWriteIOUtils.write(symbol.getName(), byteBuffer); + } + + public static void serialize(Symbol symbol, DataOutputStream stream) throws IOException { + ReadWriteIOUtils.write(symbol.getName(), stream); + } + + public static Symbol deserialize(ByteBuffer byteBuffer) { + return new Symbol(ReadWriteIOUtils.readString(byteBuffer)); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/ExchangeNodeGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/ExchangeNodeGenerator.java index d11f3a1ee82..63b7a4896ab 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/ExchangeNodeGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/ExchangeNodeGenerator.java @@ -13,21 +13,84 @@ */ package org.apache.iotdb.db.queryengine.plan.relational.planner.distribute; +import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; +import org.apache.iotdb.db.queryengine.common.MPPQueryContext; +import org.apache.iotdb.db.queryengine.plan.planner.distribution.NodeDistribution; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ExchangeNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.OrderingScheme; +import org.apache.iotdb.db.queryengine.plan.relational.planner.SortOrder; +import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.MergeSortNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableScanNode; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; -public class ExchangeNodeGenerator - extends SimplePlanRewriter<ExchangeNodeGenerator.DistributionPlanContext> { +import static org.apache.iotdb.db.queryengine.plan.planner.distribution.NodeDistributionType.SAME_WITH_ALL_CHILDREN; + +public class ExchangeNodeGenerator extends SimplePlanRewriter<ExchangeNodeGenerator.PlanContext> { @Override - public List<PlanNode> visitTableScan( - TableScanNode node, ExchangeNodeGenerator.DistributionPlanContext context) { - // TODO process that the data of TableScanNode locates in multi data regions - return Collections.singletonList(node); + public List<PlanNode> visitTableScan(TableScanNode node, PlanContext context) { + + if (node.getRegionReplicaSetList().size() > 1) { + context.hasExchangeNode = true; + List<Symbol> orderBy = node.getOutputSymbols().subList(0, 1); + Map<Symbol, SortOrder> orderings = + Collections.singletonMap(node.getOutputSymbols().get(0), SortOrder.ASC_NULLS_LAST); + OrderingScheme orderingScheme = new OrderingScheme(orderBy, orderings); + MergeSortNode mergeSortNode = + new MergeSortNode( + context.queryContext.getQueryId().genPlanNodeId(), + orderingScheme, + node.getOutputSymbols()); + + for (int i = 0; i < node.getRegionReplicaSetList().size(); i++) { + TRegionReplicaSet regionReplicaSet = node.getRegionReplicaSetList().get(i); + TableScanNode subTableScanNode = node.clone(); + subTableScanNode.setPlanNodeId(context.queryContext.getQueryId().genPlanNodeId()); + subTableScanNode.setRegionReplicaSet(regionReplicaSet); + context.nodeDistributionMap.put( + subTableScanNode.getPlanNodeId(), + new NodeDistribution(SAME_WITH_ALL_CHILDREN, regionReplicaSet)); + + // TODO not use 0 replica set as root replica? + if (i == 0) { + mergeSortNode.addChild(subTableScanNode); + context.nodeDistributionMap.put( + mergeSortNode.getPlanNodeId(), + new NodeDistribution(SAME_WITH_ALL_CHILDREN, regionReplicaSet)); + } else { + ExchangeNode exchangeNode = + new ExchangeNode(context.queryContext.getQueryId().genPlanNodeId()); + exchangeNode.addChild(subTableScanNode); + mergeSortNode.addChild(exchangeNode); + } + } + return Collections.singletonList(mergeSortNode); + } else { + node.setRegionReplicaSet(node.getRegionReplicaSetList().get(0)); + return Collections.singletonList(node); + } } - public static class DistributionPlanContext {} + public static class PlanContext { + final MPPQueryContext queryContext; + final Map<PlanNodeId, NodeDistribution> nodeDistributionMap; + TRegionReplicaSet mostlyUsedDataRegion; + boolean hasExchangeNode = false; + + public PlanContext(MPPQueryContext queryContext) { + this.queryContext = queryContext; + this.nodeDistributionMap = new HashMap<>(); + } + + public NodeDistribution getNodeDistribution(PlanNodeId nodeId) { + return this.nodeDistributionMap.get(nodeId); + } + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/RelationalDistributionPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/RelationalDistributionPlanner.java index e3cd4e9f8e6..f9dd7e85cec 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/RelationalDistributionPlanner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/RelationalDistributionPlanner.java @@ -13,6 +13,7 @@ */ package org.apache.iotdb.db.queryengine.plan.relational.planner.distribute; +import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; import org.apache.iotdb.db.queryengine.common.MPPQueryContext; import org.apache.iotdb.db.queryengine.execution.exchange.sink.DownStreamChannelLocation; import org.apache.iotdb.db.queryengine.plan.analyze.QueryType; @@ -21,13 +22,16 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance; import org.apache.iotdb.db.queryengine.plan.planner.plan.LogicalQueryPlan; import org.apache.iotdb.db.queryengine.plan.planner.plan.SubPlan; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ExchangeNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.sink.IdentitySinkNode; import org.apache.iotdb.db.queryengine.plan.relational.analyzer.Analysis; import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol; import org.apache.iotdb.db.relational.sql.tree.Query; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.stream.Collectors; import static org.apache.iotdb.db.queryengine.plan.expression.leaf.TimestampOperand.TIMESTAMP_EXPRESSION_STRING; @@ -35,45 +39,47 @@ import static org.apache.iotdb.db.queryengine.plan.expression.leaf.TimestampOper public class RelationalDistributionPlanner { private final Analysis analysis; private final LogicalQueryPlan logicalQueryPlan; - private final MPPQueryContext context; + private final MPPQueryContext mppQueryContext; public RelationalDistributionPlanner( - Analysis analysis, LogicalQueryPlan logicalQueryPlan, MPPQueryContext context) { + Analysis analysis, LogicalQueryPlan logicalQueryPlan, MPPQueryContext mppQueryContext) { this.analysis = analysis; this.logicalQueryPlan = logicalQueryPlan; - this.context = context; + this.mppQueryContext = mppQueryContext; } public DistributedQueryPlan plan() { + ExchangeNodeGenerator.PlanContext exchangeContext = + new ExchangeNodeGenerator.PlanContext(mppQueryContext); List<PlanNode> distributedPlanNodeResult = - new ExchangeNodeGenerator() - .visitPlan( - logicalQueryPlan.getRootNode(), - new ExchangeNodeGenerator.DistributionPlanContext()); + new ExchangeNodeGenerator().visitPlan(logicalQueryPlan.getRootNode(), exchangeContext); if (distributedPlanNodeResult.size() != 1) { throw new IllegalStateException("root node must return only one"); } - PlanNode outputNode = distributedPlanNodeResult.get(0); - if (analysis.getStatement() != null && analysis.getStatement() instanceof Query) { + PlanNode outputNodeWithExchange = distributedPlanNodeResult.get(0); + if (analysis.getStatement() instanceof Query) { analysis .getRespDatasetHeader() .setColumnToTsBlockIndexMap( - outputNode.getOutputSymbols().stream() + outputNodeWithExchange.getOutputSymbols().stream() .map(Symbol::getName) .filter(e -> !TIMESTAMP_EXPRESSION_STRING.equalsIgnoreCase(e)) .collect(Collectors.toList())); } + adjustUpStream(outputNodeWithExchange, exchangeContext); - SubPlan subPlan = new SubPlanGenerator().splitToSubPlan(logicalQueryPlan); + SubPlan subPlan = + new SubPlanGenerator() + .splitToSubPlan(logicalQueryPlan.getContext().getQueryId(), outputNodeWithExchange); subPlan.getPlanFragment().setRoot(true); List<FragmentInstance> fragmentInstances = - new FragmentInstanceGenerator(subPlan, analysis, context).plan(); + new FragmentInstanceGenerator(subPlan, analysis, mppQueryContext).plan(); // Only execute this step for READ operation - if (context.getQueryType() == QueryType.READ) { + if (mppQueryContext.getQueryType() == QueryType.READ) { setSinkForRootInstance(subPlan, fragmentInstances); } @@ -96,14 +102,17 @@ public class RelationalDistributionPlanner { IdentitySinkNode sinkNode = new IdentitySinkNode( - context.getQueryId().genPlanNodeId(), + mppQueryContext.getQueryId().genPlanNodeId(), Collections.singletonList(rootInstance.getFragment().getPlanNodeTree()), Collections.singletonList( new DownStreamChannelLocation( - context.getLocalDataBlockEndpoint(), - context.getResultNodeContext().getVirtualFragmentInstanceId().toThrift(), - context.getResultNodeContext().getVirtualResultNodeId().getId()))); - context + mppQueryContext.getLocalDataBlockEndpoint(), + mppQueryContext + .getResultNodeContext() + .getVirtualFragmentInstanceId() + .toThrift(), + mppQueryContext.getResultNodeContext().getVirtualResultNodeId().getId()))); + mppQueryContext .getResultNodeContext() .setUpStream( rootInstance.getHostDataNode().mPPDataExchangeEndPoint, @@ -111,4 +120,36 @@ public class RelationalDistributionPlanner { sinkNode.getPlanNodeId()); rootInstance.getFragment().setPlanNodeTree(sinkNode); } + + private void adjustUpStream(PlanNode root, ExchangeNodeGenerator.PlanContext exchangeContext) { + if (!exchangeContext.hasExchangeNode) { + return; + } + + adjustUpStreamHelper(root, exchangeContext, new HashMap<>()); + } + + private void adjustUpStreamHelper( + PlanNode root, + ExchangeNodeGenerator.PlanContext exchangeContext, + Map<TRegionReplicaSet, IdentitySinkNode> regionNodemap) { + for (PlanNode child : root.getChildren()) { + adjustUpStreamHelper(child, exchangeContext, regionNodemap); + + if (child instanceof ExchangeNode) { + ExchangeNode exchangeNode = (ExchangeNode) child; + IdentitySinkNode identitySinkNode = + regionNodemap.computeIfAbsent( + exchangeContext + .getNodeDistribution(exchangeNode.getChild().getPlanNodeId()) + .getRegion(), + k -> new IdentitySinkNode(mppQueryContext.getQueryId().genPlanNodeId())); + identitySinkNode.addChild(exchangeNode.getChild()); + identitySinkNode.addDownStreamChannelLocation( + new DownStreamChannelLocation(exchangeNode.getPlanNodeId().toString())); + exchangeNode.setChild(identitySinkNode); + exchangeNode.setIndexOfUpstreamSinkHandle(identitySinkNode.getCurrentLastIndex()); + } + } + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/SubPlanGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/SubPlanGenerator.java index de160e61145..8f7027dc7bc 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/SubPlanGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/SubPlanGenerator.java @@ -15,7 +15,6 @@ package org.apache.iotdb.db.queryengine.plan.relational.planner.distribute; import org.apache.iotdb.db.queryengine.common.QueryId; -import org.apache.iotdb.db.queryengine.plan.planner.plan.LogicalQueryPlan; import org.apache.iotdb.db.queryengine.plan.planner.plan.PlanFragment; import org.apache.iotdb.db.queryengine.plan.planner.plan.SubPlan; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; @@ -32,11 +31,10 @@ import java.util.Set; /** Split SubPlan according to ExchangeNode. */ public class SubPlanGenerator { - public SubPlan splitToSubPlan(LogicalQueryPlan logicalQueryPlan) { - QueryId queryId = logicalQueryPlan.getContext().getQueryId(); - SubPlan rootSubPlan = createSubPlan(logicalQueryPlan.getRootNode(), queryId); + public SubPlan splitToSubPlan(QueryId queryId, PlanNode rootPlanNode) { + SubPlan rootSubPlan = createSubPlan(rootPlanNode, queryId); Set<PlanNodeId> visitedSinkNode = new HashSet<>(); - splitToSubPlan(logicalQueryPlan.getRootNode(), rootSubPlan, visitedSinkNode, queryId); + splitToSubPlan(rootPlanNode, rootSubPlan, visitedSinkNode, queryId); return rootSubPlan; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelTypeProviderExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelTypeProviderExtractor.java index 60b1b564898..5fe412ca3f3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelTypeProviderExtractor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelTypeProviderExtractor.java @@ -17,8 +17,11 @@ package org.apache.iotdb.db.queryengine.plan.relational.planner.distribute; import org.apache.iotdb.db.queryengine.plan.analyze.TypeProvider; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.SimplePlanVisitor; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ExchangeNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.sink.IdentitySinkNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.FilterNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.MergeSortNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.OutputNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ProjectNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableScanNode; @@ -97,5 +100,23 @@ public class TableModelTypeProviderExtractor { node.getChild().accept(this, context); return null; } + + @Override + public Void visitMergeSort(MergeSortNode node, Void context) { + node.getChildren().forEach(c -> c.accept(this, context)); + return null; + } + + @Override + public Void visitExchange(ExchangeNode node, Void context) { + node.getChildren().forEach(c -> c.accept(this, context)); + return null; + } + + @Override + public Void visitIdentitySink(IdentitySinkNode node, Void context) { + node.getChildren().forEach(c -> c.accept(this, context)); + return null; + } } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/FilterNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/FilterNode.java index 68289d5a83d..87cbfe2edee 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/FilterNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/FilterNode.java @@ -60,6 +60,10 @@ public class FilterNode extends SingleChildProcessNode { @Override protected void serializeAttributes(DataOutputStream stream) throws IOException {} + public static FilterNode deserialize(ByteBuffer byteBuffer) { + return null; + } + public Expression getPredicate() { return predicate; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/LimitNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/LimitNode.java index d3399cb1e16..e73519c2c17 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/LimitNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/LimitNode.java @@ -62,6 +62,10 @@ public class LimitNode extends SingleChildProcessNode { @Override protected void serializeAttributes(DataOutputStream stream) throws IOException {} + public static LimitNode deserialize(ByteBuffer byteBuffer) { + return null; + } + @Override public List<Symbol> getOutputSymbols() { return child.getOutputSymbols(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/MergeSortNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/MergeSortNode.java index 68c1ffdc17e..4bce78f7c9e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/MergeSortNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/MergeSortNode.java @@ -20,14 +20,18 @@ package org.apache.iotdb.db.queryengine.plan.relational.planner.node; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.MultiChildProcessNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.OrderingScheme; import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol; +import org.apache.tsfile.utils.ReadWriteIOUtils; + import java.io.DataOutputStream; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.List; public class MergeSortNode extends MultiChildProcessNode { @@ -57,10 +61,35 @@ public class MergeSortNode extends MultiChildProcessNode { } @Override - protected void serializeAttributes(ByteBuffer byteBuffer) {} + protected void serializeAttributes(ByteBuffer byteBuffer) { + PlanNodeType.TABLE_MERGESORT_NODE.serialize(byteBuffer); + orderingScheme.serialize(byteBuffer); + ReadWriteIOUtils.write(outputSymbols.size(), byteBuffer); + for (Symbol symbol : outputSymbols) { + Symbol.serialize(symbol, byteBuffer); + } + } @Override - protected void serializeAttributes(DataOutputStream stream) throws IOException {} + protected void serializeAttributes(DataOutputStream stream) throws IOException { + PlanNodeType.TABLE_MERGESORT_NODE.serialize(stream); + orderingScheme.serialize(stream); + ReadWriteIOUtils.write(outputSymbols.size(), stream); + for (Symbol symbol : outputSymbols) { + Symbol.serialize(symbol, stream); + } + } + + public static MergeSortNode deserialize(ByteBuffer byteBuffer) { + OrderingScheme orderingScheme = OrderingScheme.deserialize(byteBuffer); + int size = ReadWriteIOUtils.readInt(byteBuffer); + List<Symbol> outputSymbols = new ArrayList<>(size); + while (size-- > 0) { + outputSymbols.add(Symbol.deserialize(byteBuffer)); + } + PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer); + return new MergeSortNode(planNodeId, orderingScheme, outputSymbols); + } @Override public List<Symbol> getOutputSymbols() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/OffsetNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/OffsetNode.java index feca446b9d0..a491f02d874 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/OffsetNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/OffsetNode.java @@ -54,6 +54,10 @@ public class OffsetNode extends SingleChildProcessNode { @Override protected void serializeAttributes(DataOutputStream stream) throws IOException {} + public static OffsetNode deserialize(ByteBuffer byteBuffer) { + return null; + } + @Override public List<Symbol> getOutputSymbols() { return child.getOutputSymbols(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/OutputNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/OutputNode.java index fb76b4590f2..6a707a30000 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/OutputNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/OutputNode.java @@ -16,15 +16,18 @@ package org.apache.iotdb.db.queryengine.plan.relational.planner.node; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SingleChildProcessNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol; import com.google.common.collect.ImmutableList; +import org.apache.tsfile.utils.ReadWriteIOUtils; import java.io.DataOutputStream; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.List; public class OutputNode extends SingleChildProcessNode { @@ -42,6 +45,13 @@ public class OutputNode extends SingleChildProcessNode { this.outputs = ImmutableList.copyOf(outputs); } + public OutputNode(PlanNodeId id, List<String> columnNames, List<Symbol> outputs) { + super(id); + this.id = id; + this.columnNames = ImmutableList.copyOf(columnNames); + this.outputs = ImmutableList.copyOf(outputs); + } + @Override public <R, C> R accept(PlanVisitor<R, C> visitor, C context) { return visitor.visitOutput(this, context); @@ -58,10 +68,41 @@ public class OutputNode extends SingleChildProcessNode { } @Override - protected void serializeAttributes(ByteBuffer byteBuffer) {} + protected void serializeAttributes(ByteBuffer byteBuffer) { + PlanNodeType.TABLE_OUTPUT_NODE.serialize(byteBuffer); + ReadWriteIOUtils.write(columnNames.size(), byteBuffer); + columnNames.forEach(columnName -> ReadWriteIOUtils.write(columnName, byteBuffer)); + ReadWriteIOUtils.write(outputs.size(), byteBuffer); + outputs.forEach(symbol -> Symbol.serialize(symbol, byteBuffer)); + } @Override - protected void serializeAttributes(DataOutputStream stream) throws IOException {} + protected void serializeAttributes(DataOutputStream stream) throws IOException { + PlanNodeType.TABLE_OUTPUT_NODE.serialize(stream); + ReadWriteIOUtils.write(columnNames.size(), stream); + for (String columnName : columnNames) { + ReadWriteIOUtils.write(columnName, stream); + } + ReadWriteIOUtils.write(outputs.size(), stream); + for (Symbol symbol : outputs) { + Symbol.serialize(symbol, stream); + } + } + + public static OutputNode deserialize(ByteBuffer byteBuffer) { + int size = ReadWriteIOUtils.readInt(byteBuffer); + List<String> columnNames = new ArrayList<>(size); + while (size-- > 0) { + columnNames.add(ReadWriteIOUtils.readString(byteBuffer)); + } + size = ReadWriteIOUtils.readInt(byteBuffer); + List<Symbol> outputs = new ArrayList<>(size); + while (size-- > 0) { + outputs.add(Symbol.deserialize(byteBuffer)); + } + PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer); + return new OutputNode(planNodeId, columnNames, outputs); + } public List<String> getColumnNames() { return this.columnNames; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/ProjectNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/ProjectNode.java index 15167478367..5465979bd56 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/ProjectNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/ProjectNode.java @@ -59,6 +59,10 @@ public class ProjectNode extends SingleChildProcessNode { @Override protected void serializeAttributes(DataOutputStream stream) throws IOException {} + public static ProjectNode deserialize(ByteBuffer byteBuffer) { + return null; + } + @Override public List<Symbol> getOutputSymbols() { return assignments.getOutputs(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/SortNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/SortNode.java index 474101ddd72..c2dd7754d01 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/SortNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/SortNode.java @@ -57,6 +57,10 @@ public class SortNode extends SingleChildProcessNode { @Override protected void serializeAttributes(DataOutputStream stream) throws IOException {} + public static SortNode deserialize(ByteBuffer byteBuffer) { + return null; + } + @Override public List<Symbol> getOutputSymbols() { return child.getOutputSymbols(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TableScanNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TableScanNode.java index cd29d50eb68..3d560e9efce 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TableScanNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TableScanNode.java @@ -17,6 +17,7 @@ package org.apache.iotdb.db.queryengine.plan.relational.planner.node; import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SourceNode; import org.apache.iotdb.db.queryengine.plan.relational.metadata.ColumnSchema; @@ -26,12 +27,15 @@ import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering; import org.apache.iotdb.db.relational.sql.tree.Expression; import com.google.common.collect.ImmutableList; +import org.apache.tsfile.utils.ReadWriteIOUtils; import javax.annotation.Nullable; import java.io.DataOutputStream; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; @@ -64,6 +68,8 @@ public class TableScanNode extends SourceNode { // The id of DataRegion where the node will run private TRegionReplicaSet regionReplicaSet; + private List<TRegionReplicaSet> regionReplicaSetList; + public TableScanNode( PlanNodeId id, String qualifiedTableName, @@ -75,6 +81,25 @@ public class TableScanNode extends SourceNode { this.assignments = assignments; } + public TableScanNode( + PlanNodeId id, + String qualifiedTableName, + List<Symbol> outputSymbols, + Map<Symbol, ColumnSchema> assignments, + List<DeviceEntry> deviceEntries, + Map<Symbol, Integer> idAndAttributeIndexMap, + Ordering scanOrder, + Expression pushDownPredicate) { + super(id); + this.qualifiedTableName = qualifiedTableName; + this.outputSymbols = outputSymbols; + this.assignments = assignments; + this.deviceEntries = deviceEntries; + this.idAndAttributeIndexMap = idAndAttributeIndexMap; + this.scanOrder = scanOrder; + this.pushDownPredicate = pushDownPredicate; + } + @Override public <R, C> R accept(PlanVisitor<R, C> visitor, C context) { return visitor.visitTableScan(this, context); @@ -89,8 +114,16 @@ public class TableScanNode extends SourceNode { public void addChild(PlanNode child) {} @Override - public PlanNode clone() { - return null; + public TableScanNode clone() { + return new TableScanNode( + getPlanNodeId(), + qualifiedTableName, + outputSymbols, + assignments, + deviceEntries, + idAndAttributeIndexMap, + scanOrder, + pushDownPredicate); } @Override @@ -104,10 +137,105 @@ public class TableScanNode extends SourceNode { } @Override - protected void serializeAttributes(ByteBuffer byteBuffer) {} + protected void serializeAttributes(ByteBuffer byteBuffer) { + PlanNodeType.TABLE_SCAN_NODE.serialize(byteBuffer); + ReadWriteIOUtils.write(qualifiedTableName, byteBuffer); + + ReadWriteIOUtils.write(outputSymbols.size(), byteBuffer); + outputSymbols.forEach(symbol -> ReadWriteIOUtils.write(symbol.getName(), byteBuffer)); + + ReadWriteIOUtils.write(assignments.size(), byteBuffer); + for (Map.Entry<Symbol, ColumnSchema> entry : assignments.entrySet()) { + Symbol.serialize(entry.getKey(), byteBuffer); + ColumnSchema.serialize(entry.getValue(), byteBuffer); + } + + ReadWriteIOUtils.write(deviceEntries.size(), byteBuffer); + for (DeviceEntry entry : deviceEntries) { + entry.serialize(byteBuffer); + } + + ReadWriteIOUtils.write(idAndAttributeIndexMap.size(), byteBuffer); + for (Map.Entry<Symbol, Integer> entry : idAndAttributeIndexMap.entrySet()) { + Symbol.serialize(entry.getKey(), byteBuffer); + ReadWriteIOUtils.write(entry.getValue(), byteBuffer); + } + + ReadWriteIOUtils.write(scanOrder.ordinal(), byteBuffer); + } @Override - protected void serializeAttributes(DataOutputStream stream) throws IOException {} + protected void serializeAttributes(DataOutputStream stream) throws IOException { + PlanNodeType.TABLE_SCAN_NODE.serialize(stream); + ReadWriteIOUtils.write(qualifiedTableName, stream); + + ReadWriteIOUtils.write(outputSymbols.size(), stream); + for (Symbol symbol : outputSymbols) { + ReadWriteIOUtils.write(symbol.getName(), stream); + } + + ReadWriteIOUtils.write(assignments.size(), stream); + for (Map.Entry<Symbol, ColumnSchema> entry : assignments.entrySet()) { + Symbol.serialize(entry.getKey(), stream); + ColumnSchema.serialize(entry.getValue(), stream); + } + + ReadWriteIOUtils.write(deviceEntries.size(), stream); + for (DeviceEntry entry : deviceEntries) { + entry.serialize(stream); + } + + ReadWriteIOUtils.write(idAndAttributeIndexMap.size(), stream); + for (Map.Entry<Symbol, Integer> entry : idAndAttributeIndexMap.entrySet()) { + Symbol.serialize(entry.getKey(), stream); + ReadWriteIOUtils.write(entry.getValue(), stream); + } + + ReadWriteIOUtils.write(scanOrder.ordinal(), stream); + } + + public static TableScanNode deserialize(ByteBuffer byteBuffer) { + String qualifiedTableName = ReadWriteIOUtils.readString(byteBuffer); + int size = ReadWriteIOUtils.readInt(byteBuffer); + + List<Symbol> outputSymbols = new ArrayList<>(size); + for (int i = 0; i < size; i++) { + outputSymbols.add(Symbol.deserialize(byteBuffer)); + } + + size = ReadWriteIOUtils.readInt(byteBuffer); + Map<Symbol, ColumnSchema> assignments = new HashMap<>(size); + for (int i = 0; i < size; i++) { + assignments.put(Symbol.deserialize(byteBuffer), ColumnSchema.deserialize(byteBuffer)); + } + + size = ReadWriteIOUtils.readInt(byteBuffer); + List<DeviceEntry> deviceEntries = new ArrayList<>(size); + while (size-- > 0) { + deviceEntries.add(DeviceEntry.deserialize(byteBuffer)); + } + + size = ReadWriteIOUtils.readInt(byteBuffer); + Map<Symbol, Integer> idAndAttributeIndexMap = new HashMap<>(size); + while (size-- > 0) { + idAndAttributeIndexMap.put( + Symbol.deserialize(byteBuffer), ReadWriteIOUtils.readInt(byteBuffer)); + } + + Ordering scanOrder = Ordering.values()[ReadWriteIOUtils.readInt(byteBuffer)]; + + PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer); + + return new TableScanNode( + planNodeId, + qualifiedTableName, + outputSymbols, + assignments, + deviceEntries, + idAndAttributeIndexMap, + scanOrder, + null); + } @Override public List<Symbol> getOutputSymbols() { @@ -186,6 +314,14 @@ public class TableScanNode extends SourceNode { return this.regionReplicaSet; } + public List<TRegionReplicaSet> getRegionReplicaSetList() { + return regionReplicaSetList; + } + + public void setRegionReplicaSetList(List<TRegionReplicaSet> regionReplicaSetList) { + this.regionReplicaSetList = regionReplicaSetList; + } + public void setRegionReplicaSet(TRegionReplicaSet regionReplicaSet) { this.regionReplicaSet = regionReplicaSet; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TopKNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TopKNode.java index b554e852933..6a78ac84e95 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TopKNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TopKNode.java @@ -75,6 +75,10 @@ public class TopKNode extends MultiChildProcessNode { @Override protected void serializeAttributes(DataOutputStream stream) throws IOException {} + public static TopKNode deserialize(ByteBuffer byteBuffer) { + return null; + } + @Override public List<Symbol> getOutputSymbols() { return outputSymbols; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/IndexScan.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/IndexScan.java index 67b11eb95fd..0e1ccefde5d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/IndexScan.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/IndexScan.java @@ -17,6 +17,7 @@ package org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations; import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; +import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot; import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot; import org.apache.iotdb.commons.partition.DataPartition; import org.apache.iotdb.commons.partition.DataPartitionQueryParam; @@ -135,20 +136,19 @@ public class IndexScan implements RelationalPlanOptimizer { context.getAnalysis().setFinishQueryAfterAnalyze(); } else { // TODO add the real impl - TRegionReplicaSet regionReplicaSet = - dataPartition - .getDataPartitionMap() - .values() - .iterator() - .next() - .values() - .iterator() - .next() - .values() - .iterator() - .next() - .get(0); - node.setRegionReplicaSet(regionReplicaSet); + Set<TRegionReplicaSet> regionReplicaSet = new HashSet<>(); + for (Map.Entry< + String, Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TRegionReplicaSet>>>> + e1 : dataPartition.getDataPartitionMap().entrySet()) { + for (Map.Entry<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TRegionReplicaSet>>> + e2 : e1.getValue().entrySet()) { + for (Map.Entry<TTimePartitionSlot, List<TRegionReplicaSet>> e3 : + e2.getValue().entrySet()) { + regionReplicaSet.addAll(e3.getValue()); + } + } + } + node.setRegionReplicaSetList(new ArrayList<>(regionReplicaSet)); } return node; diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/column/TsTableColumnCategory.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/column/TsTableColumnCategory.java index c516cc17f19..b7d6ca02dae 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/column/TsTableColumnCategory.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/column/TsTableColumnCategory.java @@ -34,7 +34,7 @@ public enum TsTableColumnCategory { private final byte category; - private TsTableColumnCategory(byte category) { + TsTableColumnCategory(byte category) { this.category = category; } @@ -46,6 +46,10 @@ public enum TsTableColumnCategory { ReadWriteIOUtils.write(category, stream); } + public void serialize(ByteBuffer byteBuffer) { + ReadWriteIOUtils.write(category, byteBuffer); + } + public static TsTableColumnCategory deserialize(InputStream stream) throws IOException { byte category = (byte) stream.read(); return deserialize(category);
