This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch ty/TableModelGrammar in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 9e3e9d6cda88e2ac7eedcd071d4e9eeb0e502544 Author: JackieTien97 <[email protected]> AuthorDate: Fri Apr 26 18:15:45 2024 +0800 support SortNode, MergeSortNode and TopKNode --- .../process/join/merge/MergeSortComparator.java | 20 +++ .../plan/planner/TableOperatorGenerator.java | 162 +++++++++++++++++++-- .../plan/relational/planner/node/LimitNode.java | 8 +- .../relational/planner/node/MergeSortNode.java | 29 ++-- .../plan/relational/planner/node/OffsetNode.java | 8 +- .../plan/relational/planner/node/SortNode.java | 12 +- .../plan/relational/planner/node/TopKNode.java | 43 +++++- 7 files changed, 251 insertions(+), 31 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/merge/MergeSortComparator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/merge/MergeSortComparator.java index c299a368e73..db6f36206f9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/merge/MergeSortComparator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/join/merge/MergeSortComparator.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.queryengine.execution.operator.process.join.merge; +import org.apache.iotdb.db.queryengine.plan.relational.planner.SortOrder; import org.apache.iotdb.db.queryengine.plan.statement.component.NullOrdering; import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering; import org.apache.iotdb.db.queryengine.plan.statement.component.SortItem; @@ -66,6 +67,25 @@ public class MergeSortComparator { return list.size() == 1 ? list.get(0) : new ComparatorChain<>(list); } + public static Comparator<SortKey> getComparatorForTable( + List<SortOrder> sortOrderList, List<Integer> indexList, List<TSDataType> dataTypeList) { + + // use code-gen compile this comparator + List<Comparator<SortKey>> list = new ArrayList<>(indexList.size()); + for (int i = 0; i < indexList.size(); i++) { + int index = indexList.get(i); + if (index == -2) { + continue; + } + TSDataType dataType = dataTypeList.get(i); + SortOrder sortOrder = sortOrderList.get(i); + list.add( + genSingleComparator(sortOrder.isAscending(), index, dataType, sortOrder.isNullsFirst())); + } + + return list.size() == 1 ? list.get(0) : new ComparatorChain<>(list); + } + public static Comparator<SortKey> getComparator(TSDataType dataType, int index, boolean asc) { Comparator<SortKey> comparator; switch (dataType) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java index 767325bb302..09e7b9cae42 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.queryengine.plan.planner; import org.apache.iotdb.commons.path.AlignedPath; +import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.queryengine.common.FragmentInstanceId; import org.apache.iotdb.db.queryengine.execution.driver.DataDriverContext; import org.apache.iotdb.db.queryengine.execution.exchange.MPPDataExchangeManager; @@ -33,6 +34,8 @@ import org.apache.iotdb.db.queryengine.execution.operator.process.FilterAndProje import org.apache.iotdb.db.queryengine.execution.operator.process.LimitOperator; import org.apache.iotdb.db.queryengine.execution.operator.process.MergeSortOperator; import org.apache.iotdb.db.queryengine.execution.operator.process.OffsetOperator; +import org.apache.iotdb.db.queryengine.execution.operator.process.SortOperator; +import org.apache.iotdb.db.queryengine.execution.operator.process.TopKOperator; import org.apache.iotdb.db.queryengine.execution.operator.sink.IdentitySinkOperator; import org.apache.iotdb.db.queryengine.execution.operator.source.AlignedSeriesScanOperator; import org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableScanOperator; @@ -47,6 +50,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.InputLocation import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.SeriesScanOptions; import org.apache.iotdb.db.queryengine.plan.relational.metadata.ColumnSchema; import org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata; +import org.apache.iotdb.db.queryengine.plan.relational.planner.OrderingScheme; import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.FilterNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.LimitNode; @@ -69,6 +73,7 @@ import org.apache.tsfile.read.filter.basic.Filter; import org.apache.tsfile.write.schema.IMeasurementSchema; import org.apache.tsfile.write.schema.MeasurementSchema; +import java.io.File; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -81,6 +86,7 @@ import java.util.stream.Collectors; import static com.google.common.base.Preconditions.checkArgument; import static java.util.Objects.requireNonNull; +import static org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.MergeSortComparator.getComparatorForTable; import static org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableScanOperator.constructAlignedPath; import static org.apache.iotdb.db.queryengine.plan.analyze.PredicateUtils.convertPredicateToFilter; import static org.apache.iotdb.db.queryengine.plan.expression.leaf.TimestampOperand.TIMESTAMP_EXPRESSION_STRING; @@ -477,18 +483,6 @@ public class TableOperatorGenerator extends PlanVisitor<Operator, LocalExecution return new OffsetOperator(operatorContext, node.getCount(), child); } - @Override - public Operator visitMergeSort(MergeSortNode node, LocalExecutionPlanContext context) { - OperatorContext operatorContext = context.getDriverContext().addOperatorContext(context.getNextOperatorId(), node.getPlanNodeId(), - MergeSortOperator.class.getSimpleName()); - List<Operator> children = new ArrayList<>(node.getChildren().size()); - for (PlanNode child : node.getChildren()) { - children.add(this.process(child, context)); - } - - return super.visitMergeSort(node, context); - } - @Override public Operator visitOutput(OutputNode node, LocalExecutionPlanContext context) { TypeProvider typeProvider = context.getTypeProvider(); @@ -522,13 +516,153 @@ public class TableOperatorGenerator extends PlanVisitor<Operator, LocalExecution context); } + @Override + public Operator visitMergeSort(MergeSortNode node, LocalExecutionPlanContext context) { + OperatorContext operatorContext = + context + .getDriverContext() + .addOperatorContext( + context.getNextOperatorId(), + node.getPlanNodeId(), + MergeSortOperator.class.getSimpleName()); + List<Operator> children = new ArrayList<>(node.getChildren().size()); + for (PlanNode child : node.getChildren()) { + children.add(this.process(child, context)); + } + List<TSDataType> dataTypes = getOutputColumnTypes(node, context.getTypeProvider()); + int sortItemsCount = node.getOrderingScheme().getOrderBy().size(); + + List<Integer> sortItemIndexList = new ArrayList<>(sortItemsCount); + List<TSDataType> sortItemDataTypeList = new ArrayList<>(sortItemsCount); + genSortInformation( + node.getOutputSymbols(), + node.getOrderingScheme(), + sortItemIndexList, + sortItemDataTypeList, + context.getTypeProvider()); + + return new MergeSortOperator( + operatorContext, + children, + dataTypes, + getComparatorForTable( + node.getOrderingScheme().getOrderingList(), sortItemIndexList, sortItemDataTypeList)); + } + @Override public Operator visitSort(SortNode node, LocalExecutionPlanContext context) { - return super.visitSort(node, context); + OperatorContext operatorContext = + context + .getDriverContext() + .addOperatorContext( + context.getNextOperatorId(), + node.getPlanNodeId(), + SortOperator.class.getSimpleName()); + List<TSDataType> dataTypes = getOutputColumnTypes(node, context.getTypeProvider()); + int sortItemsCount = node.getOrderingScheme().getOrderBy().size(); + + List<Integer> sortItemIndexList = new ArrayList<>(sortItemsCount); + List<TSDataType> sortItemDataTypeList = new ArrayList<>(sortItemsCount); + genSortInformation( + node.getOutputSymbols(), + node.getOrderingScheme(), + sortItemIndexList, + sortItemDataTypeList, + context.getTypeProvider()); + + String filePrefix = + IoTDBDescriptor.getInstance().getConfig().getSortTmpDir() + + File.separator + + operatorContext.getDriverContext().getFragmentInstanceContext().getId().getFullId() + + File.separator + + operatorContext.getDriverContext().getPipelineId() + + File.separator; + + context.getDriverContext().setHaveTmpFile(true); + context.getDriverContext().getFragmentInstanceContext().setMayHaveTmpFile(true); + + Operator child = node.getChild().accept(this, context); + + return new SortOperator( + operatorContext, + child, + dataTypes, + filePrefix, + getComparatorForTable( + node.getOrderingScheme().getOrderingList(), sortItemIndexList, sortItemDataTypeList)); } @Override public Operator visitTopK(TopKNode node, LocalExecutionPlanContext context) { - return super.visitTopK(node, context); + OperatorContext operatorContext = + context + .getDriverContext() + .addOperatorContext( + context.getNextOperatorId(), + node.getPlanNodeId(), + TopKOperator.class.getSimpleName()); + List<Operator> children = new ArrayList<>(node.getChildren().size()); + for (PlanNode child : node.getChildren()) { + children.add(this.process(child, context)); + } + List<TSDataType> dataTypes = getOutputColumnTypes(node, context.getTypeProvider()); + int sortItemsCount = node.getOrderingScheme().getOrderBy().size(); + + List<Integer> sortItemIndexList = new ArrayList<>(sortItemsCount); + List<TSDataType> sortItemDataTypeList = new ArrayList<>(sortItemsCount); + genSortInformation( + node.getOutputSymbols(), + node.getOrderingScheme(), + sortItemIndexList, + sortItemDataTypeList, + context.getTypeProvider()); + return new TopKOperator( + operatorContext, + children, + dataTypes, + getComparatorForTable( + node.getOrderingScheme().getOrderingList(), sortItemIndexList, sortItemDataTypeList), + node.getCount(), + node.isChildrenDataInOrder()); + } + + private List<TSDataType> getOutputColumnTypes(PlanNode node, TypeProvider typeProvider) { + return node.getOutputSymbols().stream() + .filter(s -> !TIMESTAMP_EXPRESSION_STRING.equalsIgnoreCase(s.getName())) + .map(s -> getTSDataType(typeProvider.getTableModelType(s))) + .collect(Collectors.toList()); + } + + private void genSortInformation( + List<Symbol> outputSymbols, + OrderingScheme orderingScheme, + List<Integer> sortItemIndexList, + List<TSDataType> sortItemDataTypeList, + TypeProvider typeProvider) { + Map<Symbol, Integer> columnIndex = new HashMap<>(); + int index = 0; + for (Symbol symbol : outputSymbols) { + if (TIMESTAMP_EXPRESSION_STRING.equalsIgnoreCase(symbol.getName())) { + continue; + } + columnIndex.put(symbol, index++); + } + orderingScheme + .getOrderBy() + .forEach( + sortItem -> { + if (TIMESTAMP_EXPRESSION_STRING.equalsIgnoreCase(sortItem.getName())) { + sortItemIndexList.add(-1); + sortItemDataTypeList.add(TSDataType.INT64); + } else { + Integer i = columnIndex.get(sortItem); + if (i == null) { + throw new IllegalStateException( + "Sort Item %s is not included in children's output columns"); + } + sortItemIndexList.add(i); + sortItemDataTypeList.add(getTSDataType(typeProvider.getTableModelType(sortItem))); + } + }); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/LimitNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/LimitNode.java index e05545a8f97..d3399cb1e16 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/LimitNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/LimitNode.java @@ -16,6 +16,7 @@ package org.apache.iotdb.db.queryengine.plan.relational.planner.node; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SingleChildProcessNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.OrderingScheme; import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol; @@ -45,9 +46,14 @@ public class LimitNode extends SingleChildProcessNode { return new LimitNode(id, child, count, tiesResolvingScheme); } + @Override + public <R, C> R accept(PlanVisitor<R, C> visitor, C context) { + return visitor.visitLimit(this, context); + } + @Override public List<String> getOutputColumnNames() { - return null; + throw new UnsupportedOperationException(); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/MergeSortNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/MergeSortNode.java index 680d3812bac..68c1ffdc17e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/MergeSortNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/MergeSortNode.java @@ -20,8 +20,9 @@ package org.apache.iotdb.db.queryengine.plan.relational.planner.node; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.MultiChildProcessNode; -import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.OrderByParameter; +import org.apache.iotdb.db.queryengine.plan.relational.planner.OrderingScheme; import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol; import java.io.DataOutputStream; @@ -30,25 +31,29 @@ import java.nio.ByteBuffer; import java.util.List; public class MergeSortNode extends MultiChildProcessNode { - private final OrderByParameter mergeOrderParameter; + private final OrderingScheme orderingScheme; - private final List<String> outputColumns; + private final List<Symbol> outputSymbols; - public MergeSortNode( - PlanNodeId id, OrderByParameter mergeOrderParameter, List<String> outputColumns) { + public MergeSortNode(PlanNodeId id, OrderingScheme orderingScheme, List<Symbol> outputSymbols) { super(id); - this.mergeOrderParameter = mergeOrderParameter; - this.outputColumns = outputColumns; + this.orderingScheme = orderingScheme; + this.outputSymbols = outputSymbols; } @Override public PlanNode clone() { - return null; + return new MergeSortNode(getPlanNodeId(), orderingScheme, outputSymbols); + } + + @Override + public <R, C> R accept(PlanVisitor<R, C> visitor, C context) { + return visitor.visitMergeSort(this, context); } @Override public List<String> getOutputColumnNames() { - return children.get(0).getOutputColumnNames(); + throw new UnsupportedOperationException(); } @Override @@ -59,6 +64,10 @@ public class MergeSortNode extends MultiChildProcessNode { @Override public List<Symbol> getOutputSymbols() { - return super.getOutputSymbols(); + return outputSymbols; + } + + public OrderingScheme getOrderingScheme() { + return orderingScheme; } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/OffsetNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/OffsetNode.java index 806ff99d317..feca446b9d0 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/OffsetNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/OffsetNode.java @@ -16,6 +16,7 @@ package org.apache.iotdb.db.queryengine.plan.relational.planner.node; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SingleChildProcessNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol; @@ -39,7 +40,12 @@ public class OffsetNode extends SingleChildProcessNode { @Override public List<String> getOutputColumnNames() { - return null; + throw new UnsupportedOperationException(); + } + + @Override + public <R, C> R accept(PlanVisitor<R, C> visitor, C context) { + return visitor.visitOffset(this, context); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/SortNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/SortNode.java index d51b131683f..474101ddd72 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/SortNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/SortNode.java @@ -16,6 +16,7 @@ package org.apache.iotdb.db.queryengine.plan.relational.planner.node; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SingleChildProcessNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.OrderingScheme; import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol; @@ -42,7 +43,12 @@ public class SortNode extends SingleChildProcessNode { @Override public List<String> getOutputColumnNames() { - return null; + throw new UnsupportedOperationException(); + } + + @Override + public <R, C> R accept(PlanVisitor<R, C> visitor, C context) { + return visitor.visitSort(this, context); } @Override @@ -55,4 +61,8 @@ public class SortNode extends SingleChildProcessNode { public List<Symbol> getOutputSymbols() { return child.getOutputSymbols(); } + + public OrderingScheme getOrderingScheme() { + return orderingScheme; + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TopKNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TopKNode.java index d356d25f1d0..b554e852933 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TopKNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TopKNode.java @@ -21,8 +21,10 @@ package org.apache.iotdb.db.queryengine.plan.relational.planner.node; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.MultiChildProcessNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.OrderingScheme; +import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol; import java.io.DataOutputStream; import java.io.IOException; @@ -33,22 +35,38 @@ public class TopKNode extends MultiChildProcessNode { private final OrderingScheme orderingScheme; - private final long count; + private final int count; - public TopKNode(PlanNodeId id, OrderingScheme scheme, long count) { + private final List<Symbol> outputSymbols; + + private final boolean childrenDataInOrder; + + public TopKNode( + PlanNodeId id, + OrderingScheme scheme, + int count, + List<Symbol> outputSymbols, + boolean childrenDataInOrder) { super(id); this.orderingScheme = scheme; this.count = count; + this.outputSymbols = outputSymbols; + this.childrenDataInOrder = childrenDataInOrder; } @Override public PlanNode clone() { - return null; + return new TopKNode(getPlanNodeId(), orderingScheme, count, outputSymbols, childrenDataInOrder); + } + + @Override + public <R, C> R accept(PlanVisitor<R, C> visitor, C context) { + return visitor.visitTopK(this, context); } @Override public List<String> getOutputColumnNames() { - return null; + throw new UnsupportedOperationException(); } @Override @@ -56,4 +74,21 @@ public class TopKNode extends MultiChildProcessNode { @Override protected void serializeAttributes(DataOutputStream stream) throws IOException {} + + @Override + public List<Symbol> getOutputSymbols() { + return outputSymbols; + } + + public OrderingScheme getOrderingScheme() { + return orderingScheme; + } + + public int getCount() { + return count; + } + + public boolean isChildrenDataInOrder() { + return childrenDataInOrder; + } }
