This is an automated email from the ASF dual-hosted git repository. chenyz pushed a commit to branch udtf-optimize in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit bc073a0d63217b2515df97d8de22693cfdce2161 Author: Chen YZ <[email protected]> AuthorDate: Sun Feb 23 17:50:07 2025 +0800 save --- a | 0 .../org/apache/iotdb/udf/table/RepeatExample.java | 5 ++ .../iotdb/confignode/conf/ConfigNodeConfig.java | 2 +- .../plan/planner/plan/node/PlanGraphPrinter.java | 4 +- .../plan/planner/plan/node/PlanNodeType.java | 6 +- .../plan/planner/plan/node/PlanVisitor.java | 4 +- .../plan/relational/planner/QueryPlanner.java | 4 +- .../distribute/TableDistributedPlanGenerator.java | 17 +++-- .../TableModelTypeProviderExtractor.java | 4 +- .../rule/ImplementTableFunctionSource.java | 6 +- .../{AuxSortNode.java => SortBasedGroupNode.java} | 41 +++++----- .../planner/node/TableFunctionProcessorNode.java | 16 ++++ .../optimizations/LogicalOptimizeFactory.java | 2 +- ...lelizeAuxSort.java => ParallelizeGrouping.java} | 88 ++++++++++++++-------- .../PushLimitOffsetIntoTableScan.java | 4 +- .../TransformAggregationToStreamable.java | 19 +++++ .../optimizations/TransformSortToStreamSort.java | 4 +- .../optimizations/UnaliasSymbolReferences.java | 8 +- .../relational/TableBuiltinTableFunction.java | 4 + .../udf/builtin/relational/tvf}/RepeatExample.java | 7 +- 20 files changed, 166 insertions(+), 79 deletions(-) diff --git a/a b/a new file mode 100644 index 00000000000..e69de29bb2d diff --git a/example/udf/src/main/java/org/apache/iotdb/udf/table/RepeatExample.java b/example/udf/src/main/java/org/apache/iotdb/udf/table/RepeatExample.java index 8b260c59acb..7d6bab4ef4d 100644 --- a/example/udf/src/main/java/org/apache/iotdb/udf/table/RepeatExample.java +++ b/example/udf/src/main/java/org/apache/iotdb/udf/table/RepeatExample.java @@ -111,6 +111,11 @@ public class RepeatExample implements TableFunction { @Override public void finish( List<ColumnBuilder> columnBuilders, ColumnBuilder passThroughIndexBuilder) { + try { + Thread.sleep(100); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } for (int i = 1; i < n; i++) { for (int j = 0; j < recordIndex; j++) { columnBuilders.get(0).writeInt(i); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java index 183f41294e2..61077dde18c 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java @@ -104,7 +104,7 @@ public class ConfigNodeConfig { * DataRegionGroups for each Database. When set data_region_group_extension_policy=AUTO, this * parameter is the default minimal number of DataRegionGroups for each Database. */ - private int defaultDataRegionGroupNumPerDatabase = 2; + private int defaultDataRegionGroupNumPerDatabase = 3; /** * The maximum number of DataRegions expected to be managed by each DataNode. Set to 0 means that diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java index 0ad7ab155ec..03c1addab36 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java @@ -67,7 +67,6 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.DeviceViewInt import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.IntoPathDescriptor; import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationTreeDeviceViewScanNode; -import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AuxSortNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.DeviceTableScanNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.EnforceSingleRowNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ExchangeNode; @@ -76,6 +75,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.planner.node.LinearFillNo import org.apache.iotdb.db.queryengine.plan.relational.planner.node.MarkDistinctNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.PreviousFillNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SemiJoinNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SortBasedGroupNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableFunctionProcessorNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableScanNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TreeDeviceViewScanNode; @@ -928,7 +928,7 @@ public class PlanGraphPrinter extends PlanVisitor<List<String>, PlanGraphPrinter } @Override - public List<String> visitAuxSort(AuxSortNode node, GraphContext context) { + public List<String> visitSortBasedGroup(SortBasedGroupNode node, GraphContext context) { List<String> boxValue = new ArrayList<>(); boxValue.add(String.format("AuxSort-%s", node.getPlanNodeId().getId())); boxValue.add(String.format("EnableParalleled: %s", node.isEnableParalleled())); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java index 04308a71323..085bac9e2ff 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java @@ -117,7 +117,6 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalIn import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertRowsNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertTabletNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationTreeDeviceViewScanNode; -import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AuxSortNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.EnforceSingleRowNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.GapFillNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.InformationSchemaTableScanNode; @@ -125,6 +124,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.planner.node.LinearFillNo import org.apache.iotdb.db.queryengine.plan.relational.planner.node.MarkDistinctNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.PreviousFillNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SemiJoinNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SortBasedGroupNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableFunctionNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableFunctionProcessorNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TreeAlignedDeviceViewScanNode; @@ -297,7 +297,7 @@ public enum PlanNodeType { MARK_DISTINCT_NODE((short) 1026), TABLE_FUNCTION_NODE((short) 1027), TABLE_FUNCTION_PROCESSOR_NODE((short) 1028), - TABLE_AUX_SORT_NODE((short) 1029), + TABLE_SORT_BASED_GROUP_NODE((short) 1029), RELATIONAL_INSERT_TABLET((short) 2000), RELATIONAL_INSERT_ROW((short) 2001), @@ -672,7 +672,7 @@ public enum PlanNodeType { case 1028: return TableFunctionProcessorNode.deserialize(buffer); case 1029: - return AuxSortNode.deserialize(buffer); + return SortBasedGroupNode.deserialize(buffer); case 2000: return RelationalInsertTabletNode.deserialize(buffer); case 2001: diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java index da861abf15f..e3ccdc4d42a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java @@ -129,6 +129,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.planner.node.LinearFillNo import org.apache.iotdb.db.queryengine.plan.relational.planner.node.MarkDistinctNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.PreviousFillNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SemiJoinNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SortBasedGroupNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableFunctionNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableFunctionProcessorNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableScanNode; @@ -748,8 +749,7 @@ public abstract class PlanVisitor<R, C> { return visitSingleChildProcess(node, context); } - public R visitAuxSort( - org.apache.iotdb.db.queryengine.plan.relational.planner.node.AuxSortNode node, C context) { + public R visitSortBasedGroup(SortBasedGroupNode node, C context) { return visitSingleChildProcess(node, context); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/QueryPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/QueryPlanner.java index 26e57030fe1..de858008a45 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/QueryPlanner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/QueryPlanner.java @@ -31,7 +31,6 @@ import org.apache.iotdb.db.queryengine.plan.relational.analyzer.NodeRef; import org.apache.iotdb.db.queryengine.plan.relational.planner.ir.GapFillStartAndEndTimeExtractVisitor; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode.Aggregation; -import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AuxSortNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.FilterNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.GapFillNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.LimitNode; @@ -39,6 +38,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.planner.node.LinearFillNo import org.apache.iotdb.db.queryengine.plan.relational.planner.node.OffsetNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.PreviousFillNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ProjectNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SortBasedGroupNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SortNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ValueFillNode; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Cast; @@ -822,7 +822,7 @@ public class QueryPlanner { OrderingScheme orderingScheme = new OrderingScheme(orderBySymbols.build(), orderings); analysis.setSortNode(true); return subPlan.withNewRoot( - new AuxSortNode( + new SortBasedGroupNode( queryIdAllocator.genPlanNodeId(), subPlan.getRoot(), orderingScheme, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java index 501a5b9c1a1..278824a9466 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java @@ -43,7 +43,6 @@ import org.apache.iotdb.db.queryengine.plan.relational.planner.SymbolAllocator; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationTableScanNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationTreeDeviceViewScanNode; -import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AuxSortNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.CollectNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.DeviceTableScanNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.EnforceSingleRowNode; @@ -60,6 +59,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.planner.node.OffsetNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.OutputNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ProjectNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SemiJoinNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SortBasedGroupNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SortNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.StreamSortNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableFunctionProcessorNode; @@ -190,8 +190,8 @@ public class TableDistributedPlanGenerator context.clearExpectedOrderingScheme(); } boolean parallel = - (node.getChild() instanceof AuxSortNode) - && ((AuxSortNode) node.getChild()).isEnableParalleled(); + (node.getChild() instanceof SortBasedGroupNode) + && ((SortBasedGroupNode) node.getChild()).isEnableParalleled(); List<PlanNode> childrenNodes = node.getChild().accept(this, context); OrderingScheme childOrdering = nodeOrderingMap.get(childrenNodes.get(0).getPlanNodeId()); if (childOrdering != null) { @@ -315,7 +315,7 @@ public class TableDistributedPlanGenerator } @Override - public List<PlanNode> visitAuxSort(AuxSortNode node, PlanContext context) { + public List<PlanNode> visitSortBasedGroup(SortBasedGroupNode node, PlanContext context) { boolean pushDown = context.pushDownAuxSort; try { context.setPushDownAuxSort(node.isEnableParalleled()); @@ -973,14 +973,15 @@ public class TableDistributedPlanGenerator if (node.getChildren().isEmpty()) { return Collections.singletonList(node); } - boolean parallel = - (node.getChild() instanceof AuxSortNode) - && ((AuxSortNode) node.getChild()).isEnableParalleled(); + boolean canSplitPushDown = + node.isRowSemantic() + || (node.getChild() instanceof SortBasedGroupNode) + && ((SortBasedGroupNode) node.getChild()).isEnableParalleled(); List<PlanNode> childrenNodes = node.getChild().accept(this, context); if (childrenNodes.size() == 1) { node.setChild(childrenNodes.get(0)); return Collections.singletonList(node); - } else if (!parallel) { + } else if (!canSplitPushDown) { CollectNode collectNode = new CollectNode(queryId.genPlanNodeId(), node.getChildren().get(0).getOutputSymbols()); childrenNodes.forEach(collectNode::addChild); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelTypeProviderExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelTypeProviderExtractor.java index d2596b45593..c12c9341986 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelTypeProviderExtractor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelTypeProviderExtractor.java @@ -27,7 +27,6 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.sink.IdentitySinkN import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationTableScanNode; -import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AuxSortNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.CollectNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ExchangeNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.FillNode; @@ -39,6 +38,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.planner.node.MergeSortNod import org.apache.iotdb.db.queryengine.plan.relational.planner.node.OffsetNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.OutputNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ProjectNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SortBasedGroupNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SortNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.StreamSortNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableScanNode; @@ -205,7 +205,7 @@ public class TableModelTypeProviderExtractor { } @Override - public Void visitAuxSort(AuxSortNode node, Void context) { + public Void visitSortBasedGroup(SortBasedGroupNode node, Void context) { node.getChild().accept(this, context); return null; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/ImplementTableFunctionSource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/ImplementTableFunctionSource.java index b189d2d5e97..ff6d7514c68 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/ImplementTableFunctionSource.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/ImplementTableFunctionSource.java @@ -24,8 +24,8 @@ import org.apache.iotdb.db.queryengine.plan.relational.planner.OrderingScheme; import org.apache.iotdb.db.queryengine.plan.relational.planner.SortOrder; import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol; import org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.Rule; -import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AuxSortNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.Patterns; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SortBasedGroupNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableFunctionNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableFunctionProcessorNode; import org.apache.iotdb.db.queryengine.plan.relational.utils.matching.Captures; @@ -112,6 +112,7 @@ public class ImplementTableFunctionSource implements Rule<TableFunctionNode> { Optional.empty(), ImmutableList.of(), Optional.empty(), + false, node.getArguments())); } else if (node.getChildren().size() == 1) { // Single source does not require pre-processing. @@ -147,7 +148,7 @@ public class ImplementTableFunctionSource implements Rule<TableFunctionNode> { } }); child.set( - new AuxSortNode( + new SortBasedGroupNode( context.getIdAllocator().genPlanNodeId(), child.get(), new OrderingScheme(sortSymbols, sortOrderings), @@ -163,6 +164,7 @@ public class ImplementTableFunctionSource implements Rule<TableFunctionNode> { Optional.ofNullable(sourceProperties.getPassThroughSpecification()), sourceProperties.getRequiredColumns(), sourceProperties.getDataOrganizationSpecification(), + sourceProperties.isRowSemantics(), node.getArguments())); } else { // TODO(UDF): we dont support multiple source now. diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/AuxSortNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/SortBasedGroupNode.java similarity index 74% rename from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/AuxSortNode.java rename to iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/SortBasedGroupNode.java index 3c72a0ee8ba..77bc98e454c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/AuxSortNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/SortBasedGroupNode.java @@ -34,26 +34,33 @@ import java.nio.ByteBuffer; import java.util.List; /** - * Auxiliary node for sorting. Not generated by user queries, it is used by special process such as - * table function. + * SortBasedGroupNode is a auxiliary node that is used to group data. Currently, it is implemented + * based on SortNode. It will only be generated some special node that required grouping source, + * such as FillNode and TableFunctionNode. + * + * <p>SortBasedGroupNode's ordering schema consists of two parts: PartitionKey and SortKey. It + * guarantees to return data grouped by PartitionKey and sorted by SortKey. For example, PARTITION + * BY device_id ORDER BY time will return data grouped by device_id, and in each group, data will be + * sorted by time. */ -public class AuxSortNode extends SortNode { - - private boolean enableParalleled = false; +public class SortBasedGroupNode extends SortNode { /** - * AuxSort#orderingScheme may include two parts: partition key and sort key. For example, - * PARTITION BY device_id, color ORDER BY time will construct an AuxSortNode with - * OrderingScheme[device_id, color, time] and partitionColumnCount = 2. + * orderingScheme may include two parts: PartitionKey and SortKey. It marks the number of + * PartitionKey. */ private int partitionKeyCount; - public AuxSortNode(PlanNodeId id, PlanNode child, OrderingScheme scheme, int partitionKeyCount) { + /** SortBasedGroupNode can be pushed down for paralleled execution. */ + private boolean enableParalleled = false; + + public SortBasedGroupNode( + PlanNodeId id, PlanNode child, OrderingScheme scheme, int partitionKeyCount) { super(id, child, scheme, false, false); this.partitionKeyCount = partitionKeyCount; } - public AuxSortNode( + public SortBasedGroupNode( PlanNodeId id, PlanNode child, OrderingScheme scheme, @@ -68,7 +75,7 @@ public class AuxSortNode extends SortNode { @Override public PlanNode replaceChildren(List<PlanNode> newChildren) { - return new AuxSortNode( + return new SortBasedGroupNode( id, Iterables.getOnlyElement(newChildren), orderingScheme, @@ -92,12 +99,12 @@ public class AuxSortNode extends SortNode { @Override public <R, C> R accept(PlanVisitor<R, C> visitor, C context) { - return visitor.visitAuxSort(this, context); + return visitor.visitSortBasedGroup(this, context); } @Override public PlanNode clone() { - return new AuxSortNode( + return new SortBasedGroupNode( id, null, orderingScheme, @@ -109,7 +116,7 @@ public class AuxSortNode extends SortNode { @Override protected void serializeAttributes(ByteBuffer byteBuffer) { - PlanNodeType.TABLE_AUX_SORT_NODE.serialize(byteBuffer); + PlanNodeType.TABLE_SORT_BASED_GROUP_NODE.serialize(byteBuffer); orderingScheme.serialize(byteBuffer); ReadWriteIOUtils.write(enableParalleled, byteBuffer); ReadWriteIOUtils.write(partitionKeyCount, byteBuffer); @@ -117,18 +124,18 @@ public class AuxSortNode extends SortNode { @Override protected void serializeAttributes(DataOutputStream stream) throws IOException { - PlanNodeType.TABLE_AUX_SORT_NODE.serialize(stream); + PlanNodeType.TABLE_SORT_BASED_GROUP_NODE.serialize(stream); orderingScheme.serialize(stream); ReadWriteIOUtils.write(enableParalleled, stream); ReadWriteIOUtils.write(partitionKeyCount, stream); } - public static AuxSortNode deserialize(ByteBuffer byteBuffer) { + public static SortBasedGroupNode deserialize(ByteBuffer byteBuffer) { OrderingScheme orderingScheme = OrderingScheme.deserialize(byteBuffer); boolean enableParalleled = ReadWriteIOUtils.readBoolean(byteBuffer); int partitionColumnCount = ReadWriteIOUtils.readInt(byteBuffer); PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer); - return new AuxSortNode( + return new SortBasedGroupNode( planNodeId, null, orderingScheme, false, false, enableParalleled, partitionColumnCount); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TableFunctionProcessorNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TableFunctionProcessorNode.java index 6479678f260..b3b8ac1d11c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TableFunctionProcessorNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TableFunctionProcessorNode.java @@ -65,6 +65,8 @@ public class TableFunctionProcessorNode extends SingleChildProcessNode { // partitioning and ordering combined from sources private final Optional<DataOrganizationSpecification> dataOrganizationSpecification; + private final boolean rowSemantic; + private final Map<String, Argument> arguments; public TableFunctionProcessorNode( @@ -76,6 +78,7 @@ public class TableFunctionProcessorNode extends SingleChildProcessNode { Optional<TableFunctionNode.PassThroughSpecification> passThroughSpecification, List<Symbol> requiredSymbols, Optional<DataOrganizationSpecification> dataOrganizationSpecification, + boolean rowSemantic, Map<String, Argument> arguments) { super(id, source.orElse(null)); this.name = requireNonNull(name, "name is null"); @@ -85,6 +88,7 @@ public class TableFunctionProcessorNode extends SingleChildProcessNode { this.requiredSymbols = ImmutableList.copyOf(requiredSymbols); this.dataOrganizationSpecification = requireNonNull(dataOrganizationSpecification, "specification is null"); + this.rowSemantic = rowSemantic; this.arguments = ImmutableMap.copyOf(arguments); } @@ -96,6 +100,7 @@ public class TableFunctionProcessorNode extends SingleChildProcessNode { Optional<TableFunctionNode.PassThroughSpecification> passThroughSpecification, List<Symbol> requiredSymbols, Optional<DataOrganizationSpecification> dataOrganizationSpecification, + boolean rowSemantic, Map<String, Argument> arguments) { super(id); this.name = requireNonNull(name, "name is null"); @@ -105,6 +110,7 @@ public class TableFunctionProcessorNode extends SingleChildProcessNode { this.requiredSymbols = ImmutableList.copyOf(requiredSymbols); this.dataOrganizationSpecification = requireNonNull(dataOrganizationSpecification, "specification is null"); + this.rowSemantic = rowSemantic; this.arguments = ImmutableMap.copyOf(arguments); } @@ -116,6 +122,10 @@ public class TableFunctionProcessorNode extends SingleChildProcessNode { return properOutputs; } + public boolean isRowSemantic() { + return rowSemantic; + } + public boolean isPruneWhenEmpty() { return pruneWhenEmpty; } @@ -146,6 +156,7 @@ public class TableFunctionProcessorNode extends SingleChildProcessNode { passThroughSpecification, requiredSymbols, dataOrganizationSpecification, + rowSemantic, arguments); } @@ -195,6 +206,7 @@ public class TableFunctionProcessorNode extends SingleChildProcessNode { if (dataOrganizationSpecification.isPresent()) { dataOrganizationSpecification.get().serialize(byteBuffer); } + ReadWriteIOUtils.write(rowSemantic, byteBuffer); ReadWriteIOUtils.write(arguments.size(), byteBuffer); arguments.forEach( (key, value) -> { @@ -224,6 +236,7 @@ public class TableFunctionProcessorNode extends SingleChildProcessNode { if (dataOrganizationSpecification.isPresent()) { dataOrganizationSpecification.get().serialize(stream); } + ReadWriteIOUtils.write(rowSemantic, stream); ReadWriteIOUtils.write(arguments.size(), stream); for (Map.Entry<String, Argument> entry : arguments.entrySet()) { ReadWriteIOUtils.write(entry.getKey(), stream); @@ -254,6 +267,7 @@ public class TableFunctionProcessorNode extends SingleChildProcessNode { hasDataOrganizationSpecification ? Optional.of(DataOrganizationSpecification.deserialize(byteBuffer)) : Optional.empty(); + boolean rowSemantic = ReadWriteIOUtils.readBoolean(byteBuffer); size = ReadWriteIOUtils.readInt(byteBuffer); Map<String, Argument> arguments = new HashMap<>(size); while (size-- > 0) { @@ -271,6 +285,7 @@ public class TableFunctionProcessorNode extends SingleChildProcessNode { passThroughSpecification, requiredSymbols, dataOrganizationSpecification, + rowSemantic, arguments); } @@ -287,6 +302,7 @@ public class TableFunctionProcessorNode extends SingleChildProcessNode { passThroughSpecification, requiredSymbols, dataOrganizationSpecification, + rowSemantic, arguments); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/LogicalOptimizeFactory.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/LogicalOptimizeFactory.java index ae3dff9d838..adff9b3d119 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/LogicalOptimizeFactory.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/LogicalOptimizeFactory.java @@ -261,7 +261,7 @@ public class LogicalOptimizeFactory { plannerContext, ruleStats, ImmutableSet.of(new MergeLimitWithSort(), new MergeLimitOverProjectWithSort())), - new ParallelizeAuxSort()); + new ParallelizeGrouping()); this.planOptimizers = optimizerBuilder.build(); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/ParallelizeAuxSort.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/ParallelizeGrouping.java similarity index 78% rename from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/ParallelizeAuxSort.java rename to iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/ParallelizeGrouping.java index 0acbfc504bd..bb5f04350da 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/ParallelizeAuxSort.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/ParallelizeGrouping.java @@ -29,12 +29,12 @@ import org.apache.iotdb.db.queryengine.plan.relational.planner.OrderingScheme; import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationTableScanNode; -import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AuxSortNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.CorrelatedJoinNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.DeviceTableScanNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.JoinNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ProjectNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SemiJoinNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SortBasedGroupNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SortNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.StreamSortNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableFunctionProcessorNode; @@ -48,20 +48,44 @@ import java.util.stream.Collectors; import static org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory.TAG; import static org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory.TIME; -import static org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.ParallelizeAuxSort.CanPushDown.ENABLE; -import static org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.ParallelizeAuxSort.CanPushDown.PENDING; -import static org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.ParallelizeAuxSort.CanPushDown.UNABLE; +import static org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.ParallelizeGrouping.CanParalleled.ENABLE; +import static org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.ParallelizeGrouping.CanParalleled.PENDING; +import static org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations.ParallelizeGrouping.CanParalleled.UNABLE; -public class ParallelizeAuxSort implements PlanOptimizer { +/** + * This rule is used to determine whether the SortBasedGroupNode can be parallelized during Logical + * + * <p>Optimization phase: Logical plan planning. + * + * <p>The SortBasedGroupNode can be parallelized if the following conditions are met: + * + * <ul> + * SortingKey is empty and the result child node has been pre-grouped. In the other world, the + * PartitionKey matches the lasted offspring that guarantees the data is grouped by PartitionKey. + * For example: + * <li>SortBasedGroupNode[tag1,tag2] -> SortNode[sort=tag1] + * <li>SortBasedGroupNode[tag1,tag2] -> TopKNode[sort=tag1,tag2] + * <li>SortBasedGroupNode[tag1,tag2] -> AggregationNode[group=tag1] + * <li>SortBasedGroupNode[tag1,tag2] -> TableFunctionNode[partition=tag1] + * </ul> + * + * <ul> + * SortingKey is time column and the lasted offspring that guarantees the data is grouped by + * PartitionKey is TableDeviceScanNode. For example: + * <li>SortBasedGroupNode[device_id,time] -> ... -> TableDeviceScanNode + * </ul> + */ +public class ParallelizeGrouping implements PlanOptimizer { @Override public PlanNode optimize(PlanNode plan, PlanOptimizer.Context context) { if (!(context.getAnalysis().isQuery())) { return plan; } - System.out.println("before optimize ParallelizeAuxSort =========================="); + // TODO: remove println + System.out.println("before optimize ParallelizeGrouping =========================="); PlanGraphPrinter.print(plan); PlanNode res = plan.accept(new Rewriter(context.getAnalysis()), new Context(null, 0)); - System.out.println("after optimize ParallelizeAuxSort =========================="); + System.out.println("after optimize ParallelizeGrouping =========================="); PlanGraphPrinter.print(res); return res; // return plan.accept(new Rewriter(context.getAnalysis()), new Context()); @@ -98,31 +122,31 @@ public class ParallelizeAuxSort implements PlanOptimizer { } OrderingScheme prefix = context.orderKey; if (prefix.getOrderBy().size() != context.partitionKeyCount) { - context.canPushDown = UNABLE; + context.canParalleled = UNABLE; return; } if (prefix.getOrderBy().size() > childOrder.size()) { - context.canPushDown = UNABLE; + context.canParalleled = UNABLE; return; } for (int i = 0; i < prefix.getOrderBy().size(); i++) { Symbol lhs = prefix.getOrderBy().get(i); Symbol rhs = childOrder.get(i); if (!lhs.equals(rhs)) { - context.canPushDown = UNABLE; + context.canParalleled = UNABLE; return; } } - context.canPushDown = ENABLE; + context.canParalleled = ENABLE; } @Override - public PlanNode visitAuxSort(AuxSortNode node, Context context) { + public PlanNode visitSortBasedGroup(SortBasedGroupNode node, Context context) { checkPrefixMatch(context, node.getOrderingScheme().getOrderBy()); Context newContext = new Context(node.getOrderingScheme(), node.getPartitionKeyCount()); - AuxSortNode newNode = (AuxSortNode) node.clone(); + SortBasedGroupNode newNode = (SortBasedGroupNode) node.clone(); newNode.addChild(node.getChild().accept(this, newContext)); - if (newContext.canPushDown.equals(ENABLE)) { + if (newContext.canParalleled.equals(ENABLE)) { newNode.setEnableParalleled(true); } return newNode; @@ -148,19 +172,19 @@ public class ParallelizeAuxSort implements PlanOptimizer { @Override public PlanNode visitJoin(JoinNode node, Context context) { - context.canPushDown = UNABLE; + context.canParalleled = UNABLE; return visitPlan(node, context); } @Override public PlanNode visitCorrelatedJoin(CorrelatedJoinNode node, Context context) { - context.canPushDown = UNABLE; + context.canParalleled = UNABLE; return visitPlan(node, context); } @Override public PlanNode visitSemiJoin(SemiJoinNode node, Context context) { - context.canPushDown = UNABLE; + context.canParalleled = UNABLE; return visitPlan(node, context); } @@ -169,13 +193,13 @@ public class ParallelizeAuxSort implements PlanOptimizer { if (!context.canSkip()) { if (node.getChildren().isEmpty()) { // leaf node - context.canPushDown = UNABLE; + context.canParalleled = UNABLE; return node; } Optional<DataOrganizationSpecification> dataOrganizationSpecification = node.getDataOrganizationSpecification(); if (!dataOrganizationSpecification.isPresent()) { - context.canPushDown = UNABLE; + context.canParalleled = UNABLE; } else { checkPrefixMatch(context, dataOrganizationSpecification.get().getPartitionBy()); } @@ -189,7 +213,7 @@ public class ParallelizeAuxSort implements PlanOptimizer { OrderingScheme orderKey = context.orderKey; for (int i = 0; i < orderKey.getOrderBy().size(); i++) { if (!node.getAssignments().contains(orderKey.getOrderBy().get(i))) { - context.canPushDown = UNABLE; + context.canParalleled = UNABLE; break; } } @@ -205,13 +229,13 @@ public class ParallelizeAuxSort implements PlanOptimizer { analysis.getTableColumnSchema(node.getQualifiedObjectName()); // 1. It is possible for the last sort key to be a time column if (orderKey.getOrderBy().size() > context.partitionKeyCount + 1) { - context.canPushDown = UNABLE; + context.canParalleled = UNABLE; return node; } else if (orderKey.getOrderBy().size() == context.partitionKeyCount + 1) { Symbol lastSymbol = orderKey.getOrderBy().get(context.partitionKeyCount); if (!tableColumnSchema.containsKey(lastSymbol) || tableColumnSchema.get(lastSymbol).getColumnCategory() != TIME) { - context.canPushDown = UNABLE; + context.canParalleled = UNABLE; return node; } } @@ -224,7 +248,7 @@ public class ParallelizeAuxSort implements PlanOptimizer { for (int i = 0; i < context.partitionKeyCount; i++) { Symbol symbol = orderKey.getOrderBy().get(i); if (!tableColumnSchema.containsKey(symbol)) { - context.canPushDown = UNABLE; + context.canParalleled = UNABLE; return node; } switch (tableColumnSchema.get(symbol).getColumnCategory()) { @@ -235,34 +259,36 @@ public class ParallelizeAuxSort implements PlanOptimizer { // If all tags in partition key, attributes must be the same in one partition. break; default: - context.canPushDown = UNABLE; + context.canParalleled = UNABLE; return node; } } if (!tagSymbols.isEmpty()) { - context.canPushDown = UNABLE; + context.canParalleled = UNABLE; return node; } - context.canPushDown = ENABLE; + context.canParalleled = ENABLE; } return node; } @Override public PlanNode visitAggregation(AggregationNode node, Context context) { - return super.visitAggregation(node, context); + checkPrefixMatch(context, node.getGroupingKeys()); + return visitPlan(node, context); } @Override public PlanNode visitAggregationTableScan(AggregationTableScanNode node, Context context) { - return super.visitAggregationTableScan(node, context); + checkPrefixMatch(context, node.getGroupingKeys()); + return node; } } private static class Context { private final OrderingScheme orderKey; private final int partitionKeyCount; - private CanPushDown canPushDown = PENDING; + private CanParalleled canParalleled = PENDING; private Context(OrderingScheme orderKey, int sortKeyOffset) { this.orderKey = orderKey; @@ -270,11 +296,11 @@ public class ParallelizeAuxSort implements PlanOptimizer { } private boolean canSkip() { - return orderKey == null || canPushDown != PENDING; + return orderKey == null || canParalleled != PENDING; } } - protected enum CanPushDown { + protected enum CanParalleled { ENABLE, UNABLE, PENDING diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushLimitOffsetIntoTableScan.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushLimitOffsetIntoTableScan.java index f3cb9f99776..fa20100c9db 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushLimitOffsetIntoTableScan.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushLimitOffsetIntoTableScan.java @@ -27,7 +27,6 @@ import org.apache.iotdb.db.queryengine.plan.relational.metadata.ColumnSchema; import org.apache.iotdb.db.queryengine.plan.relational.planner.OrderingScheme; import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode; -import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AuxSortNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.DeviceTableScanNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.FilterNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.GapFillNode; @@ -37,6 +36,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.planner.node.LimitNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.LinearFillNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.PreviousFillNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ProjectNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SortBasedGroupNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SortNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.StreamSortNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableFunctionProcessorNode; @@ -221,7 +221,7 @@ public class PushLimitOffsetIntoTableScan implements PlanOptimizer { } @Override - public PlanNode visitAuxSort(AuxSortNode node, Context context) { + public PlanNode visitSortBasedGroup(SortBasedGroupNode node, Context context) { return visitSort(node, context); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/TransformAggregationToStreamable.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/TransformAggregationToStreamable.java index 340d9ebf592..7e9d27253ba 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/TransformAggregationToStreamable.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/TransformAggregationToStreamable.java @@ -22,18 +22,21 @@ package org.apache.iotdb.db.queryengine.plan.relational.planner.optimizations; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor; import org.apache.iotdb.db.queryengine.plan.relational.metadata.ColumnSchema; +import org.apache.iotdb.db.queryengine.plan.relational.planner.DataOrganizationSpecification; import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationTableScanNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.DeviceTableScanNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.MergeSortNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SortNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableFunctionProcessorNode; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; @@ -121,6 +124,22 @@ public class TransformAggregationToStreamable implements PlanOptimizer { return ImmutableList.of(); } + @Override + public List<Symbol> visitTableFunctionProcessor( + TableFunctionProcessorNode node, GroupContext context) { + if (node.getChildren().isEmpty()) { + // leaf node + return ImmutableList.of(); + } + Optional<DataOrganizationSpecification> dataOrganizationSpecification = + node.getDataOrganizationSpecification(); + return dataOrganizationSpecification + .<List<Symbol>>map( + organizationSpecification -> + ImmutableList.copyOf(organizationSpecification.getPartitionBy())) + .orElseGet(ImmutableList::of); + } + @Override public List<Symbol> visitDeviceTableScan(DeviceTableScanNode node, GroupContext context) { Set<Symbol> expectedGroupingKeys = context.groupingKeys; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/TransformSortToStreamSort.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/TransformSortToStreamSort.java index f66bd9f25f6..01fd477925b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/TransformSortToStreamSort.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/TransformSortToStreamSort.java @@ -29,9 +29,9 @@ import org.apache.iotdb.db.queryengine.plan.relational.planner.OrderingScheme; import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationTableScanNode; -import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AuxSortNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.DeviceTableScanNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.InformationSchemaTableScanNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SortBasedGroupNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SortNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.StreamSortNode; @@ -122,7 +122,7 @@ public class TransformSortToStreamSort implements PlanOptimizer { } @Override - public PlanNode visitAuxSort(AuxSortNode node, Context context) { + public PlanNode visitSortBasedGroup(SortBasedGroupNode node, Context context) { return visitSingleChildProcess(node, context); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/UnaliasSymbolReferences.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/UnaliasSymbolReferences.java index 341add0ab31..e796bf77123 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/UnaliasSymbolReferences.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/UnaliasSymbolReferences.java @@ -32,7 +32,6 @@ import org.apache.iotdb.db.queryengine.plan.relational.planner.SymbolAllocator; import org.apache.iotdb.db.queryengine.plan.relational.planner.ir.DeterminismEvaluator; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ApplyNode; -import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AuxSortNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.CorrelatedJoinNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.DeviceTableScanNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.EnforceSingleRowNode; @@ -49,6 +48,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.planner.node.OutputNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.PreviousFillNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ProjectNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SemiJoinNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SortBasedGroupNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SortNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableFunctionNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableFunctionProcessorNode; @@ -425,7 +425,7 @@ public class UnaliasSymbolReferences implements PlanOptimizer { } @Override - public PlanAndMappings visitAuxSort(AuxSortNode node, UnaliasContext context) { + public PlanAndMappings visitSortBasedGroup(SortBasedGroupNode node, UnaliasContext context) { PlanAndMappings rewrittenSource = node.getChild().accept(this, context); Map<Symbol, Symbol> mapping = new HashMap<>(rewrittenSource.getMappings()); SymbolMapper mapper = symbolMapper(mapping); @@ -433,7 +433,7 @@ public class UnaliasSymbolReferences implements PlanOptimizer { OrderingScheme newOrderingScheme = mapper.map(node.getOrderingScheme()); return new PlanAndMappings( - new AuxSortNode( + new SortBasedGroupNode( node.getPlanNodeId(), rewrittenSource.getRoot(), newOrderingScheme, @@ -872,6 +872,7 @@ public class UnaliasSymbolReferences implements PlanOptimizer { Optional.empty(), ImmutableList.of(), Optional.empty(), + false, node.getArguments()), mapping); } @@ -908,6 +909,7 @@ public class UnaliasSymbolReferences implements PlanOptimizer { newPassThroughSpecification, newRequiredSymbols, newSpecification, + node.isRowSemantic(), node.getArguments()); return new PlanAndMappings(rewrittenTableFunctionProcessor, mapping); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/TableBuiltinTableFunction.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/TableBuiltinTableFunction.java index 2981ecae0dd..830ffa8733c 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/TableBuiltinTableFunction.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/TableBuiltinTableFunction.java @@ -20,6 +20,7 @@ package org.apache.iotdb.commons.udf.builtin.relational; import org.apache.iotdb.commons.udf.builtin.relational.tvf.HOPTableFunction; +import org.apache.iotdb.commons.udf.builtin.relational.tvf.RepeatExample; import org.apache.iotdb.udf.api.relational.TableFunction; import java.util.Arrays; @@ -29,6 +30,7 @@ import java.util.stream.Collectors; public enum TableBuiltinTableFunction { HOP("hop"), + REPEAT("repeat"), ; private final String functionName; @@ -59,6 +61,8 @@ public enum TableBuiltinTableFunction { switch (functionName.toLowerCase()) { case "hop": return new HOPTableFunction(); + case "repeat": + return new RepeatExample(); default: throw new UnsupportedOperationException("Unsupported table function: " + functionName); } diff --git a/example/udf/src/main/java/org/apache/iotdb/udf/table/RepeatExample.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/RepeatExample.java similarity index 95% copy from example/udf/src/main/java/org/apache/iotdb/udf/table/RepeatExample.java copy to iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/RepeatExample.java index 8b260c59acb..e140b1de42d 100644 --- a/example/udf/src/main/java/org/apache/iotdb/udf/table/RepeatExample.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/RepeatExample.java @@ -17,7 +17,7 @@ * under the License. */ -package org.apache.iotdb.udf.table; +package org.apache.iotdb.commons.udf.builtin.relational.tvf; import org.apache.iotdb.udf.api.exception.UDFArgumentNotValidException; import org.apache.iotdb.udf.api.exception.UDFException; @@ -111,6 +111,11 @@ public class RepeatExample implements TableFunction { @Override public void finish( List<ColumnBuilder> columnBuilders, ColumnBuilder passThroughIndexBuilder) { + try { + Thread.sleep(100); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } for (int i = 1; i < n; i++) { for (int j = 0; j < recordIndex; j++) { columnBuilders.get(0).writeInt(i);
