This is an automated email from the ASF dual-hosted git repository. caogaofei pushed a commit to branch beyyes/agg_mergesort in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit bcb42793358d879c736bc4a656612df82edb3b27 Author: Beyyes <[email protected]> AuthorDate: Sat Feb 3 16:24:54 2024 +0800 add AggMergeSort impl --- .../operator/process/AggMergeSortOperator.java | 193 +++++++++++++++++++++ .../plan/planner/LogicalPlanBuilder.java | 32 +++- .../plan/planner/OperatorTreeGenerator.java | 22 +++ .../planner/distribution/ExchangeNodeAdder.java | 62 ++++++- .../plan/planner/distribution/SourceRewriter.java | 132 ++++++++------ .../plan/planner/plan/node/PlanGraphPrinter.java | 9 + .../plan/planner/plan/node/PlanNodeType.java | 7 +- .../plan/planner/plan/node/PlanVisitor.java | 5 + .../{DeviceViewNode.java => AggMergeSortNode.java} | 73 ++------ .../planner/plan/node/process/DeviceViewNode.java | 8 +- .../AlignByDeviceOrderByLimitOffsetTest.java | 3 +- 11 files changed, 412 insertions(+), 134 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AggMergeSortOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AggMergeSortOperator.java new file mode 100644 index 00000000000..8616079e069 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AggMergeSortOperator.java @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.execution.operator.process; + +import org.apache.iotdb.db.queryengine.execution.operator.Operator; +import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext; +import org.apache.iotdb.tsfile.common.conf.TSFileConfig; +import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.read.common.block.TsBlock; +import org.apache.iotdb.tsfile.read.common.block.column.BinaryColumnBuilder; +import org.apache.iotdb.tsfile.read.common.block.column.Column; +import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder; +import org.apache.iotdb.tsfile.read.common.block.column.NullColumn; +import org.apache.iotdb.tsfile.read.common.block.column.RunLengthEncodedColumn; +import org.apache.iotdb.tsfile.utils.Binary; + +import com.google.common.util.concurrent.ListenableFuture; + +import java.util.List; + +/** + * Since devices have been sorted by the merge order as expected, what {@link AggMergeSortOperator} + * need to do is traversing the device child operators, get all tsBlocks of one device and transform + * it to the form we need, adding the device column and allocating value column to its expected + * location, then get the next device operator until no next device. + * + * <p>The deviceOperators can be timeJoinOperator or seriesScanOperator that have not transformed + * the result form. + * + * <p>Attention! If some columns are not existing in one device, those columns will be null. e.g. + * [s1,s2,s3] is query, but only [s1, s3] exists in device1, then the column of s2 will be filled + * with NullColumn. + */ +public class AggMergeSortOperator implements ProcessOperator { + + private final OperatorContext operatorContext; + // The size devices and deviceOperators should be the same. + private final List<String> devices; + private final List<Operator> deviceOperators; + // Used to fill columns and leave null columns which doesn't exist in some devices. + // e.g. [s1,s2,s3] is query, but [s1, s3] exists in device1, then device1 -> [1, 3], s1 is 1 but + // not 0 because device is the first column + private final List<List<Integer>> deviceColumnIndex; + // Column dataTypes that includes device column + private final List<TSDataType> dataTypes; + + private int deviceIndex; + + public AggMergeSortOperator( + OperatorContext operatorContext, + List<String> devices, + List<Operator> deviceOperators, + List<List<Integer>> deviceColumnIndex, + List<TSDataType> dataTypes) { + this.operatorContext = operatorContext; + this.devices = devices; + this.deviceOperators = deviceOperators; + this.deviceColumnIndex = deviceColumnIndex; + this.dataTypes = dataTypes; + + this.deviceIndex = 0; + } + + private String getCurDeviceName() { + return devices.get(deviceIndex); + } + + private Operator getCurDeviceOperator() { + return deviceOperators.get(deviceIndex); + } + + private List<Integer> getCurDeviceIndexes() { + return deviceColumnIndex.get(deviceIndex); + } + + @Override + public OperatorContext getOperatorContext() { + return operatorContext; + } + + @Override + public ListenableFuture<?> isBlocked() { + if (deviceIndex >= deviceOperators.size()) { + return NOT_BLOCKED; + } + ListenableFuture<?> blocked = getCurDeviceOperator().isBlocked(); + if (!blocked.isDone()) { + return blocked; + } + return NOT_BLOCKED; + } + + @Override + public TsBlock next() throws Exception { + if (!getCurDeviceOperator().hasNextWithTimer()) { + // close finished child + getCurDeviceOperator().close(); + deviceOperators.set(deviceIndex, null); + // increment index, move to next child + deviceIndex++; + return null; + } + + TsBlock tsBlock = getCurDeviceOperator().nextWithTimer(); + if (tsBlock == null) { + return null; + } + List<Integer> indexes = getCurDeviceIndexes(); + + // fill existing columns + Column[] newValueColumns = new Column[dataTypes.size()]; + for (int i = 0; i < indexes.size(); i++) { + newValueColumns[indexes.get(i)] = tsBlock.getColumn(i); + } + // construct device column + ColumnBuilder deviceColumnBuilder = new BinaryColumnBuilder(null, 1); + deviceColumnBuilder.writeBinary(new Binary(getCurDeviceName(), TSFileConfig.STRING_CHARSET)); + newValueColumns[0] = + new RunLengthEncodedColumn(deviceColumnBuilder.build(), tsBlock.getPositionCount()); + // construct other null columns + for (int i = 0; i < dataTypes.size(); i++) { + if (newValueColumns[i] == null) { + newValueColumns[i] = NullColumn.create(dataTypes.get(i), tsBlock.getPositionCount()); + } + } + return new TsBlock(tsBlock.getPositionCount(), tsBlock.getTimeColumn(), newValueColumns); + } + + @Override + public boolean hasNext() throws Exception { + return deviceIndex < devices.size(); + } + + @Override + public void close() throws Exception { + for (int i = deviceIndex, n = deviceOperators.size(); i < n; i++) { + Operator currentChild = deviceOperators.get(i); + if (currentChild != null) { + deviceOperators.get(i).close(); + } + } + } + + @Override + public boolean isFinished() throws Exception { + return !this.hasNextWithTimer(); + } + + @Override + public long calculateMaxPeekMemory() { + long maxPeekMemory = calculateMaxReturnSize() + calculateRetainedSizeAfterCallingNext(); + for (Operator child : deviceOperators) { + maxPeekMemory = Math.max(maxPeekMemory, child.calculateMaxPeekMemory()); + } + return maxPeekMemory; + } + + @Override + public long calculateMaxReturnSize() { + // null columns would be filled, so return size equals to + // (numberOfValueColumns(dataTypes.size() - 1) + 1(timeColumn)) * columnSize + deviceColumnSize + // size of device name column is ignored + return (long) (dataTypes.size()) + * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte(); + } + + @Override + public long calculateRetainedSizeAfterCallingNext() { + long max = 0; + for (Operator operator : deviceOperators) { + max = Math.max(max, operator.calculateRetainedSizeAfterCallingNext()); + } + return max; + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java index f4e95050210..4871df6af9d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java @@ -55,6 +55,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.Sche import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.SchemaQueryOrderByHeatNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.TimeSeriesCountNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.TimeSeriesSchemaScanNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.AggMergeSortNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.AggregationNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ColumnInjectNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.DeviceViewIntoNode; @@ -811,7 +812,8 @@ public class LogicalPlanBuilder { outputColumnNames, deviceToMeasurementIndexesMap, deviceNameToSourceNodesMap, - valueFilterLimit)); + valueFilterLimit, + queryStatement.isAggregationQuery())); } analysis.setUseTopKNode(); @@ -836,7 +838,8 @@ public class LogicalPlanBuilder { outputColumnNames, deviceToMeasurementIndexesMap, deviceNameToSourceNodesMap, - -1); + -1, + queryStatement.isAggregationQuery()); } context.getTypeProvider().setType(DEVICE, TSDataType.TEXT); @@ -916,13 +919,24 @@ public class LogicalPlanBuilder { List<String> outputColumnNames, Map<String, List<Integer>> deviceToMeasurementIndexesMap, Map<String, PlanNode> deviceNameToSourceNodesMap, - long valueFilterLimit) { - DeviceViewNode deviceViewNode = - new DeviceViewNode( - context.getQueryId().genPlanNodeId(), - orderByParameter, - outputColumnNames, - deviceToMeasurementIndexesMap); + long valueFilterLimit, + boolean isAggregation) { + DeviceViewNode deviceViewNode; + if (isAggregation) { + deviceViewNode = + new AggMergeSortNode( + context.getQueryId().genPlanNodeId(), + orderByParameter, + outputColumnNames, + deviceToMeasurementIndexesMap); + } else { + deviceViewNode = + new DeviceViewNode( + context.getQueryId().genPlanNodeId(), + orderByParameter, + outputColumnNames, + deviceToMeasurementIndexesMap); + } for (Map.Entry<String, PlanNode> entry : deviceNameToSourceNodesMap.entrySet()) { String deviceName = entry.getKey(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java index 7d2b88bb9cc..8661406f2a6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java @@ -45,6 +45,7 @@ import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceManage import org.apache.iotdb.db.queryengine.execution.operator.AggregationUtil; import org.apache.iotdb.db.queryengine.execution.operator.Operator; import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext; +import org.apache.iotdb.db.queryengine.execution.operator.process.AggMergeSortOperator; import org.apache.iotdb.db.queryengine.execution.operator.process.AggregationOperator; import org.apache.iotdb.db.queryengine.execution.operator.process.ColumnInjectOperator; import org.apache.iotdb.db.queryengine.execution.operator.process.DeviceViewIntoOperator; @@ -166,6 +167,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.Sche import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.SchemaQueryScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.TimeSeriesCountNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.TimeSeriesSchemaScanNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.AggMergeSortNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.AggregationNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ColumnInjectNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.DeviceViewIntoNode; @@ -822,6 +824,26 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP operatorContext, node.getDevices(), children, deviceColumnIndex, outputColumnTypes); } + @Override + public Operator visitAggMergeSort(AggMergeSortNode node, LocalExecutionPlanContext context) { + OperatorContext operatorContext = + context + .getDriverContext() + .addOperatorContext( + context.getNextOperatorId(), + node.getPlanNodeId(), + DeviceViewOperator.class.getSimpleName()); + List<Operator> children = dealWithConsumeChildrenOneByOneNode(node, context); + List<List<Integer>> deviceColumnIndex = + node.getDevices().stream() + .map(deviceName -> node.getDeviceToMeasurementIndexesMap().get(deviceName)) + .collect(Collectors.toList()); + List<TSDataType> outputColumnTypes = getOutputColumnTypes(node, context.getTypeProvider()); + + return new AggMergeSortOperator( + operatorContext, node.getDevices(), children, deviceColumnIndex, outputColumnTypes); + } + @Override public Operator visitMergeSort(MergeSortNode node, LocalExecutionPlanContext context) { OperatorContext operatorContext = diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java index 06043bd2e41..8e6fed5a02a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java @@ -32,6 +32,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.Sche import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.SchemaQueryMergeNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.SchemaQueryOrderByHeatNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.SchemaQueryScanNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.AggMergeSortNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.AggregationNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.DeviceMergeNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.DeviceViewNode; @@ -200,6 +201,11 @@ public class ExchangeNodeAdder extends PlanVisitor<PlanNode, NodeGroupContext> { return processMultiChildNode(node, context); } + @Override + public PlanNode visitAggMergeSort(AggMergeSortNode node, NodeGroupContext context) { + return processMultiChildNode(node, context); + } + @Override public PlanNode visitDeviceMerge(DeviceMergeNode node, NodeGroupContext context) { return processMultiChildNode(node, context); @@ -377,7 +383,7 @@ public class ExchangeNodeAdder extends PlanVisitor<PlanNode, NodeGroupContext> { .map(child -> visit(child, context)) .collect(Collectors.toList()); - // DataRegion which node locates + // DataRegion in which node locates TRegionReplicaSet dataRegion; boolean isChildrenDistributionSame = nodeDistributionIsSame(visitedChildren, context); NodeDistributionType distributionType = @@ -406,10 +412,14 @@ public class ExchangeNodeAdder extends PlanVisitor<PlanNode, NodeGroupContext> { return newNode; } + if (node instanceof AggMergeSortNode) { + return processAggMergeSortNode(node, visitedChildren, context, newNode, dataRegion); + } + // optimize `order by time|expression limit N align by device` query, // to ensure that the number of ExchangeNode equals to DataRegionNum but not equals to DeviceNum if (node instanceof TopKNode) { - return processTopNode(node, visitedChildren, context, newNode, dataRegion); + return processTopKNode(node, visitedChildren, context, newNode, dataRegion); } // Otherwise, we need to add ExchangeNode for the child whose DataRegion is different from the @@ -450,7 +460,7 @@ public class ExchangeNodeAdder extends PlanVisitor<PlanNode, NodeGroupContext> { return newNode; } - private PlanNode processTopNode( + private PlanNode processTopKNode( MultiChildProcessNode node, List<PlanNode> visitedChildren, NodeGroupContext context, @@ -496,6 +506,52 @@ public class ExchangeNodeAdder extends PlanVisitor<PlanNode, NodeGroupContext> { return newNode; } + private PlanNode processAggMergeSortNode( + MultiChildProcessNode node, + List<PlanNode> visitedChildren, + NodeGroupContext context, + MultiChildProcessNode newNode, + TRegionReplicaSet dataRegion) { + AggMergeSortNode aggMergeSortNode = (AggMergeSortNode) node; + Map<TRegionReplicaSet, DeviceViewNode> regionTopKNodeMap = new HashMap<>(); + for (PlanNode child : visitedChildren) { + TRegionReplicaSet region = context.getNodeDistribution(child.getPlanNodeId()).region; + regionTopKNodeMap + .computeIfAbsent( + region, + k -> { + DeviceViewNode childDeviceViewNode = + new DeviceViewNode( + context.queryContext.getQueryId().genPlanNodeId(), + aggMergeSortNode.getMergeOrderParameter(), + aggMergeSortNode.getOutputColumnNames(), + aggMergeSortNode.getDeviceToMeasurementIndexesMap()); + context.putNodeDistribution( + childDeviceViewNode.getPlanNodeId(), + new NodeDistribution(NodeDistributionType.SAME_WITH_ALL_CHILDREN, region)); + return childDeviceViewNode; + }) + .addChild(child); + } + + for (Map.Entry<TRegionReplicaSet, DeviceViewNode> entry : regionTopKNodeMap.entrySet()) { + TRegionReplicaSet deviceViewNodeLocatedRegion = entry.getKey(); + DeviceViewNode deviceViewNode = entry.getValue(); + + if (!dataRegion.equals(deviceViewNodeLocatedRegion)) { + ExchangeNode exchangeNode = + new ExchangeNode(context.queryContext.getQueryId().genPlanNodeId()); + exchangeNode.setChild(deviceViewNode); + exchangeNode.setOutputColumnNames(deviceViewNode.getOutputColumnNames()); + context.hasExchangeNode = true; + newNode.addChild(exchangeNode); + } else { + newNode.addChild(deviceViewNode); + } + } + return newNode; + } + @Override public PlanNode visitSlidingWindowAggregation( SlidingWindowAggregationNode node, NodeGroupContext context) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java index 7b5a82062f9..3661d93b252 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java @@ -38,6 +38,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.Sche import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.SchemaFetchScanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.SchemaQueryMergeNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.SchemaQueryScanNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.AggMergeSortNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.AggregationNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.DeviceViewNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.GroupByLevelNode; @@ -232,6 +233,30 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte return Collections.singletonList(mergeSortNode); } + @Override + public List<PlanNode> visitAggMergeSort(AggMergeSortNode node, DistributionPlanContext context) { + AggMergeSortNode newRoot = + new AggMergeSortNode( + context.queryContext.getQueryId().genPlanNodeId(), + node.getMergeOrderParameter(), + node.getOutputColumnNames(), + node.getDeviceToMeasurementIndexesMap()); + for (int i = 0; i < node.getDevices().size(); i++) { + List<PlanNode> rewroteNode = rewrite(node.getChildren().get(i), context); + for (PlanNode planNode : rewroteNode) { + if (planNode instanceof AggregationNode) { + // if children of AggregationNode can only be AggScanNode? + for (PlanNode aggScanNode : planNode.getChildren()) { + newRoot.addChildDeviceNode(node.getDevices().get(i), aggScanNode); + } + } else { + newRoot.addChildDeviceNode(node.getDevices().get(i), planNode); + } + } + } + return Collections.singletonList(newRoot); + } + private List<PlanNode> processSpecialDeviceView( DeviceViewNode node, DistributionPlanContext context) { DeviceViewNode newRoot = cloneDeviceViewNodeWithoutChild(node, context); @@ -347,23 +372,21 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte } }); schemaRegions.forEach( - region -> { - addSchemaSourceNode( - root, - seed.getPath(), - region, - context.queryContext.getQueryId().genPlanNodeId(), - seed); - }); + region -> + addSchemaSourceNode( + root, + seed.getPath(), + region, + context.queryContext.getQueryId().genPlanNodeId(), + seed)); regionsOfSystemDatabase.forEach( - region -> { - addSchemaSourceNode( - root, - seed.getPath(), - region, - context.queryContext.getQueryId().genPlanNodeId(), - seed); - }); + region -> + addSchemaSourceNode( + root, + seed.getPath(), + region, + context.queryContext.getQueryId().genPlanNodeId(), + seed)); } else { // the path pattern may only overlap with part of storageGroup or storageGroup.**, need filter PathPatternTree patternTree = new PathPatternTree(); @@ -394,31 +417,29 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte List<PartialPath> filteredPathPatternList = filterPathPattern(patternTree, storageGroup); schemaRegionSet.forEach( - region -> { - addSchemaSourceNode( - root, - filteredPathPatternList.size() == 1 - ? filteredPathPatternList.get(0) - : seed.getPath(), - region, - context.queryContext.getQueryId().genPlanNodeId(), - seed); - }); + region -> + addSchemaSourceNode( + root, + filteredPathPatternList.size() == 1 + ? filteredPathPatternList.get(0) + : seed.getPath(), + region, + context.queryContext.getQueryId().genPlanNodeId(), + seed)); }); if (!regionsOfSystemDatabase.isEmpty()) { List<PartialPath> filteredPathPatternList = filterPathPattern(patternTree, SchemaConstant.SYSTEM_DATABASE); regionsOfSystemDatabase.forEach( - region -> { - addSchemaSourceNode( - root, - filteredPathPatternList.size() == 1 - ? filteredPathPatternList.get(0) - : seed.getPath(), - region, - context.queryContext.getQueryId().genPlanNodeId(), - seed); - }); + region -> + addSchemaSourceNode( + root, + filteredPathPatternList.size() == 1 + ? filteredPathPatternList.get(0) + : seed.getPath(), + region, + context.queryContext.getQueryId().genPlanNodeId(), + seed)); } } return Collections.singletonList(root); @@ -462,11 +483,10 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte .getSchemaPartitionInfo() .getSchemaPartitionMap() .forEach( - (storageGroup, deviceGroup) -> { - deviceGroup.forEach( - (deviceGroupId, schemaRegionReplicaSet) -> - schemaRegions.add(schemaRegionReplicaSet)); - }); + (storageGroup, deviceGroup) -> + deviceGroup.forEach( + (deviceGroupId, schemaRegionReplicaSet) -> + schemaRegions.add(schemaRegionReplicaSet))); schemaRegions.forEach( region -> { SchemaQueryScanNode schemaQueryScanNode = (SchemaQueryScanNode) seed.clone(); @@ -564,14 +584,13 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte List<AggregationDescriptor> leafAggDescriptorList = new ArrayList<>(); node.getAggregationDescriptorList() .forEach( - descriptor -> { - leafAggDescriptorList.add( - new AggregationDescriptor( - descriptor.getAggregationFuncName(), - AggregationStep.PARTIAL, - descriptor.getInputExpressions(), - descriptor.getInputAttributes())); - }); + descriptor -> + leafAggDescriptorList.add( + new AggregationDescriptor( + descriptor.getAggregationFuncName(), + AggregationStep.PARTIAL, + descriptor.getInputExpressions(), + descriptor.getInputAttributes()))); leafAggDescriptorList.forEach( d -> LogicalPlanBuilder.updateTypeProviderByPartialAggregation( @@ -579,14 +598,13 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte List<AggregationDescriptor> rootAggDescriptorList = new ArrayList<>(); node.getAggregationDescriptorList() .forEach( - descriptor -> { - rootAggDescriptorList.add( - new AggregationDescriptor( - descriptor.getAggregationFuncName(), - context.isRoot ? AggregationStep.FINAL : AggregationStep.INTERMEDIATE, - descriptor.getInputExpressions(), - descriptor.getInputAttributes())); - }); + descriptor -> + rootAggDescriptorList.add( + new AggregationDescriptor( + descriptor.getAggregationFuncName(), + context.isRoot ? AggregationStep.FINAL : AggregationStep.INTERMEDIATE, + descriptor.getInputExpressions(), + descriptor.getInputAttributes()))); AggregationNode aggregationNode = new AggregationNode( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java index fcebd9b0c66..72ac2a5a581 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java @@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; import org.apache.iotdb.commons.partition.DataPartition; import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.db.queryengine.plan.expression.Expression; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.AggMergeSortNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.AggregationNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ColumnInjectNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.DeviceMergeNode; @@ -188,6 +189,14 @@ public class PlanGraphPrinter extends PlanVisitor<List<String>, PlanGraphPrinter return render(node, boxValue, context); } + @Override + public List<String> visitAggMergeSort(AggMergeSortNode node, GraphContext context) { + List<String> boxValue = new ArrayList<>(); + boxValue.add(String.format("AggMergeSort-%s", node.getPlanNodeId().getId())); + boxValue.add(String.format("AggDeviceCount: %d", node.getDevices().size())); + return render(node, boxValue, context); + } + @Override public List<String> visitMergeSort(MergeSortNode node, GraphContext context) { List<String> boxValue = new ArrayList<>(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java index 5eb722c7422..1cf585eeb7d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java @@ -60,6 +60,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedC import org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedDeleteDataNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedInsertNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedWriteSchemaNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.AggMergeSortNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.AggregationNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ColumnInjectNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.DeviceMergeNode; @@ -200,7 +201,9 @@ public enum PlanNodeType { PIPE_ENRICHED_DELETE_SCHEMA((short) 86), INNER_TIME_JOIN((short) 87), - LEFT_OUTER_TIME_JOIN((short) 88); + LEFT_OUTER_TIME_JOIN((short) 88), + + AGG_MERGE_SORT((short) 89); public static final int BYTES = Short.BYTES; @@ -425,6 +428,8 @@ public enum PlanNodeType { return InnerTimeJoinNode.deserialize(buffer); case 88: return LeftOuterTimeJoinNode.deserialize(buffer); + case 89: + return AggMergeSortNode.deserialize(buffer); default: throw new IllegalArgumentException("Invalid node type: " + nodeType); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java index ed7072a76eb..8b67a9a98bc 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanVisitor.java @@ -57,6 +57,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedC import org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedDeleteDataNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedInsertNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeEnrichedWriteSchemaNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.AggMergeSortNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.AggregationNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.ColumnInjectNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.DeviceMergeNode; @@ -231,6 +232,10 @@ public abstract class PlanVisitor<R, C> { return visitMultiChildProcess(node, context); } + public R visitAggMergeSort(AggMergeSortNode node, C context) { + return visitMultiChildProcess(node, context); + } + public R visitDeviceMerge(DeviceMergeNode node, C context) { return visitMultiChildProcess(node, context); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/DeviceViewNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/AggMergeSortNode.java similarity index 72% copy from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/DeviceViewNode.java copy to iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/AggMergeSortNode.java index 4d4b9b67049..ad1f2797db7 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/DeviceViewNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/AggMergeSortNode.java @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an @@ -16,6 +16,7 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.iotdb.db.queryengine.plan.planner.plan.node.process; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; @@ -34,70 +35,33 @@ import java.util.List; import java.util.Map; import java.util.Objects; -/** - * DeviceViewNode is responsible for constructing a device-based view of a set of series. And output - * the result with specific order. The order could be 'order by device' or 'order by timestamp' - * - * <p>Each output from its children should have the same schema. That means, the columns should be - * same between these TsBlocks. If the input TsBlock contains n columns, the device-based view will - * contain n+1 columns where the new column is Device column. - */ -public class DeviceViewNode extends MultiChildProcessNode { - - // The result output order, which could sort by device and time. - // The size of this list is 2 and the first SortItem in this list has higher priority. - private final OrderByParameter mergeOrderParameter; - - // The size devices and children should be the same. - private final List<String> devices = new ArrayList<>(); +public class AggMergeSortNode extends DeviceViewNode { - // Device column and measurement columns in result output - private final List<String> outputColumnNames; - - // e.g. [s1,s2,s3] is query, but [s1, s3] exists in device1, then device1 -> [1, 3], s1 is 1 but - // not 0 because device is the first column - private final Map<String, List<Integer>> deviceToMeasurementIndexesMap; - - public DeviceViewNode( + public AggMergeSortNode( PlanNodeId id, OrderByParameter mergeOrderParameter, List<String> outputColumnNames, Map<String, List<Integer>> deviceToMeasurementIndexesMap) { - super(id); - this.mergeOrderParameter = mergeOrderParameter; - this.outputColumnNames = outputColumnNames; - this.deviceToMeasurementIndexesMap = deviceToMeasurementIndexesMap; + super(id, mergeOrderParameter, outputColumnNames, deviceToMeasurementIndexesMap); } - public DeviceViewNode( + public AggMergeSortNode( PlanNodeId id, OrderByParameter mergeOrderParameter, List<String> outputColumnNames, List<String> devices, Map<String, List<Integer>> deviceToMeasurementIndexesMap) { - super(id); - this.mergeOrderParameter = mergeOrderParameter; - this.outputColumnNames = outputColumnNames; - this.devices.addAll(devices); - this.deviceToMeasurementIndexesMap = deviceToMeasurementIndexesMap; - } - - public void addChildDeviceNode(String deviceName, PlanNode childNode) { - this.devices.add(deviceName); - this.children.add(childNode); - } - - public List<String> getDevices() { - return devices; + super(id, mergeOrderParameter, outputColumnNames, devices, deviceToMeasurementIndexesMap); } - public Map<String, List<Integer>> getDeviceToMeasurementIndexesMap() { - return deviceToMeasurementIndexesMap; + @Override + public <R, C> R accept(PlanVisitor<R, C> visitor, C context) { + return visitor.visitAggMergeSort(this, context); } @Override public PlanNode clone() { - return new DeviceViewNode( + return new AggMergeSortNode( getPlanNodeId(), mergeOrderParameter, outputColumnNames, @@ -105,23 +69,14 @@ public class DeviceViewNode extends MultiChildProcessNode { deviceToMeasurementIndexesMap); } - public OrderByParameter getMergeOrderParameter() { - return mergeOrderParameter; - } - @Override public List<String> getOutputColumnNames() { return outputColumnNames; } - @Override - public <R, C> R accept(PlanVisitor<R, C> visitor, C context) { - return visitor.visitDeviceView(this, context); - } - @Override protected void serializeAttributes(ByteBuffer byteBuffer) { - PlanNodeType.DEVICE_VIEW.serialize(byteBuffer); + PlanNodeType.AGG_MERGE_SORT.serialize(byteBuffer); mergeOrderParameter.serializeAttributes(byteBuffer); ReadWriteIOUtils.write(outputColumnNames.size(), byteBuffer); for (String column : outputColumnNames) { @@ -143,7 +98,7 @@ public class DeviceViewNode extends MultiChildProcessNode { @Override protected void serializeAttributes(DataOutputStream stream) throws IOException { - PlanNodeType.DEVICE_VIEW.serialize(stream); + PlanNodeType.AGG_MERGE_SORT.serialize(stream); mergeOrderParameter.serializeAttributes(stream); ReadWriteIOUtils.write(outputColumnNames.size(), stream); for (String column : outputColumnNames) { @@ -225,6 +180,6 @@ public class DeviceViewNode extends MultiChildProcessNode { @Override public String toString() { - return "DeviceView-" + this.getPlanNodeId(); + return "AggMergeSort-" + this.getPlanNodeId(); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/DeviceViewNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/DeviceViewNode.java index 4d4b9b67049..72fddc56896 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/DeviceViewNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/DeviceViewNode.java @@ -46,17 +46,17 @@ public class DeviceViewNode extends MultiChildProcessNode { // The result output order, which could sort by device and time. // The size of this list is 2 and the first SortItem in this list has higher priority. - private final OrderByParameter mergeOrderParameter; + protected final OrderByParameter mergeOrderParameter; // The size devices and children should be the same. - private final List<String> devices = new ArrayList<>(); + protected final List<String> devices = new ArrayList<>(); // Device column and measurement columns in result output - private final List<String> outputColumnNames; + protected final List<String> outputColumnNames; // e.g. [s1,s2,s3] is query, but [s1, s3] exists in device1, then device1 -> [1, 3], s1 is 1 but // not 0 because device is the first column - private final Map<String, List<Integer>> deviceToMeasurementIndexesMap; + protected final Map<String, List<Integer>> deviceToMeasurementIndexesMap; public DeviceViewNode( PlanNodeId id, diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/AlignByDeviceOrderByLimitOffsetTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/AlignByDeviceOrderByLimitOffsetTest.java index 750a86c4b3a..bf9ac3e9861 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/AlignByDeviceOrderByLimitOffsetTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/AlignByDeviceOrderByLimitOffsetTest.java @@ -90,7 +90,8 @@ public class AlignByDeviceOrderByLimitOffsetTest { @Test public void orderByDeviceTest1() { // no order by - sql = "select * from root.sg.d1, root.sg.d22 LIMIT 10 align by device"; + // sql = "select * from root.sg.d1, root.sg.d22 LIMIT 10 align by device"; + sql = "select first_value(s1) from root.sg.d1, root.sg.d22 align by device"; analysis = Util.analyze(sql, context); logicalPlanNode = Util.genLogicalPlan(analysis, context); planner = new DistributionPlanner(analysis, new LogicalQueryPlan(context, logicalPlanNode));
