This is an automated email from the ASF dual-hosted git repository.
caogaofei pushed a commit to branch beyyes/agg_mergesort
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/beyyes/agg_mergesort by this
push:
new 1351d02a509 perfect code
1351d02a509 is described below
commit 1351d02a50900b04410240fb802a91ab119c949f
Author: Beyyes <[email protected]>
AuthorDate: Sun Feb 18 20:05:32 2024 +0800
perfect code
---
.../process/AggregationMergeSortOperator.java | 283 ++++++++++++---------
.../plan/planner/LogicalPlanBuilder.java | 11 +-
.../plan/planner/OperatorTreeGenerator.java | 25 +-
.../planner/distribution/ExchangeNodeAdder.java | 8 +-
.../plan/planner/distribution/SourceRewriter.java | 12 +-
.../plan/planner/plan/node/PlanGraphPrinter.java | 1 -
.../node/process/AggregationMergeSortNode.java | 141 ++++------
.../operator/AggregationOperatorTest.java | 17 +-
.../distribution/AggregationAlignByDeviceTest.java | 117 +++++++++
9 files changed, 377 insertions(+), 238 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AggregationMergeSortOperator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AggregationMergeSortOperator.java
index 3efb8111624..c7f4b19717d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AggregationMergeSortOperator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AggregationMergeSortOperator.java
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
@@ -21,173 +21,228 @@ package
org.apache.iotdb.db.queryengine.execution.operator.process;
import org.apache.iotdb.db.queryengine.execution.operator.Operator;
import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
-import
org.apache.iotdb.db.queryengine.execution.operator.source.ExchangeOperator;
+import org.apache.iotdb.db.utils.datastructure.MergeSortHeap;
+import org.apache.iotdb.db.utils.datastructure.MergeSortKey;
+import org.apache.iotdb.db.utils.datastructure.SortKey;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
-import org.apache.iotdb.tsfile.read.common.block.column.BinaryColumnBuilder;
-import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
-import org.apache.iotdb.tsfile.read.common.block.column.NullColumn;
-import org.apache.iotdb.tsfile.read.common.block.column.RunLengthEncodedColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
import com.google.common.util.concurrent.ListenableFuture;
+import java.util.ArrayList;
+import java.util.Comparator;
import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import static com.google.common.util.concurrent.Futures.successfulAsList;
+
+public class AggregationMergeSortOperator extends AbstractConsumeAllOperator {
-/**
- * Since devices have been sorted by the merge order as expected, what {@link
- * AggregationMergeSortOperator} need to do is traversing the device child
operators, get all
- * tsBlocks of one device and transform it to the form we need, adding the
device column and
- * allocating value column to its expected location, then get the next device
operator until no next
- * device.
- *
- * <p>The deviceOperators can be aggregationSeriesScanOperator,
imeJoinOperator or
- * seriesScanOperator that have not transformed the result form.
- *
- * <p>Attention! If some columns are not existing in one device, those columns
will be null. e.g.
- * [s1,s2,s3] is query, but only [s1, s3] exists in device1, then the column
of s2 will be filled
- * with NullColumn.
- */
-public class AggregationMergeSortOperator implements ProcessOperator {
-
- private final OperatorContext operatorContext;
- // The size devices and deviceOperators should be the same.
- private final List<String> devices;
- private final List<Operator> deviceOperators;
- // Used to fill columns and leave null columns which doesn't exist in some
devices.
- // e.g. [s1,s2,s3] is query, but [s1, s3] exists in device1, then device1 ->
[1, 3], s1 is 1 but
- // not 0 because device is the first column
- private final List<List<Integer>> deviceColumnIndex;
- // Column dataTypes that includes device column
private final List<TSDataType> dataTypes;
+ private final TsBlockBuilder tsBlockBuilder;
+ private final boolean[] noMoreTsBlocks;
+ private final MergeSortHeap mergeSortHeap;
+ private final Comparator<SortKey> comparator;
- private int deviceIndex;
+ private boolean finished;
public AggregationMergeSortOperator(
OperatorContext operatorContext,
- List<String> devices,
- List<Operator> deviceOperators,
- List<List<Integer>> deviceColumnIndex,
- List<TSDataType> dataTypes) {
- this.operatorContext = operatorContext;
- this.devices = devices;
- this.deviceOperators = deviceOperators;
- this.deviceColumnIndex = deviceColumnIndex;
+ List<Operator> inputOperators,
+ List<TSDataType> dataTypes,
+ Comparator<SortKey> comparator) {
+ super(operatorContext, inputOperators);
this.dataTypes = dataTypes;
-
- this.deviceIndex = 0;
- }
-
- private Operator getCurDeviceOperator() {
- return deviceOperators.get(deviceIndex);
- }
-
- private List<Integer> getCurDeviceIndexes() {
- return deviceColumnIndex.get(deviceIndex);
- }
-
- @Override
- public OperatorContext getOperatorContext() {
- return operatorContext;
+ this.mergeSortHeap = new MergeSortHeap(inputOperatorsCount, comparator);
+ this.comparator = comparator;
+ this.noMoreTsBlocks = new boolean[inputOperatorsCount];
+ this.tsBlockBuilder = new TsBlockBuilder(dataTypes);
}
@Override
public ListenableFuture<?> isBlocked() {
- if (deviceIndex >= deviceOperators.size()) {
- return NOT_BLOCKED;
- }
- ListenableFuture<?> blocked = getCurDeviceOperator().isBlocked();
- if (!blocked.isDone()) {
- return blocked;
+ boolean hasReadyChild = false;
+ List<ListenableFuture<?>> listenableFutures = new ArrayList<>();
+ for (int i = 0; i < inputOperatorsCount; i++) {
+ if (noMoreTsBlocks[i] || !isEmpty(i) || children.get(i) == null) {
+ continue;
+ }
+ ListenableFuture<?> blocked = children.get(i).isBlocked();
+ if (blocked.isDone()) {
+ hasReadyChild = true;
+ canCallNext[i] = true;
+ } else {
+ listenableFutures.add(blocked);
+ }
}
- return NOT_BLOCKED;
+ return (hasReadyChild || listenableFutures.isEmpty())
+ ? NOT_BLOCKED
+ : successfulAsList(listenableFutures);
}
+ @SuppressWarnings({"squid:S3776", "squid:S135"})
@Override
public TsBlock next() throws Exception {
- if (!getCurDeviceOperator().hasNextWithTimer()) {
- // close finished child
- getCurDeviceOperator().close();
- deviceOperators.set(deviceIndex, null);
- // increment index, move to next child
- deviceIndex++;
- return null;
- }
-
- boolean deviceView =
- getCurDeviceOperator() instanceof DeviceViewOperator
- || getCurDeviceOperator() instanceof ExchangeOperator;
+ // start stopwatch
+ long startTime = System.nanoTime();
+ long maxRuntime =
operatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS);
- TsBlock tsBlock = getCurDeviceOperator().nextWithTimer();
- if (tsBlock == null) {
+ // 1. fill consumed up TsBlock
+ if (!prepareInput()) {
return null;
}
- List<Integer> indexes = getCurDeviceIndexes();
- // fill existing columns
- Column[] newValueColumns = new Column[dataTypes.size()];
- for (int i = 0; i < indexes.size(); i++) {
- newValueColumns[indexes.get(i)] = tsBlock.getColumn(deviceView ? i + 1 :
i);
+ // 2. check if we can directly return the original TsBlock instead of
merging way
+ MergeSortKey minMergeSortKey = mergeSortHeap.poll();
+ if (mergeSortHeap.isEmpty()
+ || comparator.compare(
+ new MergeSortKey(
+ minMergeSortKey.tsBlock,
minMergeSortKey.tsBlock.getPositionCount() - 1),
+ mergeSortHeap.peek())
+ < 0) {
+ inputTsBlocks[minMergeSortKey.inputChannelIndex] = null;
+ return minMergeSortKey.rowIndex == 0
+ ? minMergeSortKey.tsBlock
+ : minMergeSortKey.tsBlock.subTsBlock(minMergeSortKey.rowIndex);
}
- // construct device column
- ColumnBuilder deviceColumnBuilder = new BinaryColumnBuilder(null, 1);
- deviceColumnBuilder.writeBinary(tsBlock.getColumn(0).getBinary(0));
- newValueColumns[0] =
- new RunLengthEncodedColumn(deviceColumnBuilder.build(),
tsBlock.getPositionCount());
- // construct other null columns
- for (int i = 0; i < dataTypes.size(); i++) {
- if (newValueColumns[i] == null) {
- newValueColumns[i] = NullColumn.create(dataTypes.get(i),
tsBlock.getPositionCount());
+ mergeSortHeap.push(minMergeSortKey);
+
+ // 3. do merge sort until one TsBlock is consumed up
+ tsBlockBuilder.reset();
+ TimeColumnBuilder timeBuilder = tsBlockBuilder.getTimeColumnBuilder();
+ ColumnBuilder[] valueColumnBuilders =
tsBlockBuilder.getValueColumnBuilders();
+ while (!mergeSortHeap.isEmpty()) {
+ MergeSortKey mergeSortKey = mergeSortHeap.poll();
+ TsBlock targetBlock = mergeSortKey.tsBlock;
+ int rowIndex = mergeSortKey.rowIndex;
+ timeBuilder.writeLong(targetBlock.getTimeByIndex(rowIndex));
+ for (int i = 0; i < valueColumnBuilders.length; i++) {
+ if (targetBlock.getColumn(i).isNull(rowIndex)) {
+ valueColumnBuilders[i].appendNull();
+ continue;
+ }
+ valueColumnBuilders[i].write(targetBlock.getColumn(i), rowIndex);
+ }
+ tsBlockBuilder.declarePosition();
+ if (mergeSortKey.rowIndex == mergeSortKey.tsBlock.getPositionCount() -
1) {
+ inputTsBlocks[mergeSortKey.inputChannelIndex] = null;
+ break;
+ } else {
+ mergeSortKey.rowIndex++;
+ mergeSortHeap.push(mergeSortKey);
+ }
+ // break if time is out or tsBlockBuilder is full
+ if (System.nanoTime() - startTime > maxRuntime ||
tsBlockBuilder.isFull()) {
+ break;
}
}
- return new TsBlock(tsBlock.getPositionCount(), tsBlock.getTimeColumn(),
newValueColumns);
+ return tsBlockBuilder.build();
}
@Override
public boolean hasNext() throws Exception {
- return deviceIndex < deviceOperators.size();
- }
-
- @Override
- public void close() throws Exception {
- for (int i = deviceIndex, n = deviceOperators.size(); i < n; i++) {
- Operator currentChild = deviceOperators.get(i);
- if (currentChild != null) {
- deviceOperators.get(i).close();
+ if (finished) {
+ return false;
+ }
+ for (int i = 0; i < inputOperatorsCount; i++) {
+ if (!isEmpty(i)) {
+ return true;
+ } else if (!noMoreTsBlocks[i]) {
+ if (!canCallNext[i] || children.get(i).hasNextWithTimer()) {
+ return true;
+ } else {
+ children.get(i).close();
+ children.set(i, null);
+ noMoreTsBlocks[i] = true;
+ inputTsBlocks[i] = null;
+ }
}
}
+ return false;
}
@Override
public boolean isFinished() throws Exception {
- return !this.hasNextWithTimer();
+ if (finished) {
+ return true;
+ }
+ finished = true;
+
+ for (int i = 0; i < inputOperatorsCount; i++) {
+ if (!noMoreTsBlocks[i] || !isEmpty(i)) {
+ finished = false;
+ break;
+ }
+ }
+ return finished;
}
@Override
public long calculateMaxPeekMemory() {
- long maxPeekMemory = calculateMaxReturnSize() +
calculateRetainedSizeAfterCallingNext();
- for (Operator child : deviceOperators) {
- maxPeekMemory = Math.max(maxPeekMemory, child.calculateMaxPeekMemory());
+ // MergeToolKit will cache startKey and endKey
+ long maxPeekMemory =
TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
+ // inputTsBlocks will cache all the tsBlocks returned by inputOperators
+ for (Operator operator : children) {
+ maxPeekMemory += operator.calculateMaxReturnSize();
+ maxPeekMemory += operator.calculateRetainedSizeAfterCallingNext();
}
- return maxPeekMemory;
+ for (Operator operator : children) {
+ maxPeekMemory = Math.max(maxPeekMemory,
operator.calculateMaxPeekMemory());
+ }
+ return Math.max(maxPeekMemory, calculateMaxReturnSize());
}
@Override
public long calculateMaxReturnSize() {
- // null columns would be filled, so return size equals to
- // (numberOfValueColumns(dataTypes.size() - 1) + 1(timeColumn)) *
columnSize + deviceColumnSize
- // size of device name column is ignored
- return (long) (dataTypes.size())
- * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
+ return (1L + dataTypes.size()) *
TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
}
@Override
public long calculateRetainedSizeAfterCallingNext() {
- long max = 0;
- for (Operator operator : deviceOperators) {
- max = Math.max(max, operator.calculateRetainedSizeAfterCallingNext());
+ long currentRetainedSize = 0;
+ long minChildReturnSize = Long.MAX_VALUE;
+ for (Operator child : children) {
+ long maxReturnSize = child.calculateMaxReturnSize();
+ minChildReturnSize = Math.min(minChildReturnSize, maxReturnSize);
+ currentRetainedSize += (maxReturnSize +
child.calculateRetainedSizeAfterCallingNext());
}
- return max;
+ return currentRetainedSize - minChildReturnSize;
+ }
+
+ // region helper function used in prepareInput
+
+ /**
+ * @param currentChildIndex the index of the child
+ * @return true if we can skip the currentChild in prepareInput
+ */
+ @Override
+ protected boolean canSkipCurrentChild(int currentChildIndex) {
+ return noMoreTsBlocks[currentChildIndex]
+ || !isEmpty(currentChildIndex)
+ || children.get(currentChildIndex) == null;
+ }
+
+ /** @param currentInputIndex index of the input TsBlock */
+ @Override
+ protected void processCurrentInputTsBlock(int currentInputIndex) {
+ mergeSortHeap.push(new MergeSortKey(inputTsBlocks[currentInputIndex], 0,
currentInputIndex));
}
+
+ /**
+ * @param currentChildIndex the index of the child
+ * @throws Exception Potential Exception thrown by Operator.close()
+ */
+ @Override
+ protected void handleFinishedChild(int currentChildIndex) throws Exception {
+ noMoreTsBlocks[currentChildIndex] = true;
+ inputTsBlocks[currentChildIndex] = null;
+ children.get(currentChildIndex).close();
+ children.set(currentChildIndex, null);
+ }
+
+ // endregion
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java
index 3ecf47ad30a..baf0ff42407 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java
@@ -806,7 +806,6 @@ public class LogicalPlanBuilder {
valueFilterLimit);
} else {
// has order by expression, use TopKNode + DeviceViewNode
-
topKNode.addChild(
addDeviceViewNode(
orderByParameter,
@@ -837,8 +836,8 @@ public class LogicalPlanBuilder {
addAggMergeSortNode(
orderByParameter,
outputColumnNames,
- deviceToMeasurementIndexesMap,
deviceNameToSourceNodesMap,
+ deviceToMeasurementIndexesMap,
-1);
} else {
this.root =
@@ -953,8 +952,8 @@ public class LogicalPlanBuilder {
private MultiChildProcessNode addAggMergeSortNode(
OrderByParameter orderByParameter,
List<String> outputColumnNames,
- Map<String, List<Integer>> deviceToMeasurementIndexesMap,
Map<String, PlanNode> deviceNameToSourceNodesMap,
+ Map<String, List<Integer>> deviceToMeasurementIndexesMap,
long valueFilterLimit) {
AggregationMergeSortNode aggMergeSortNode =
new AggregationMergeSortNode(
@@ -964,14 +963,14 @@ public class LogicalPlanBuilder {
deviceToMeasurementIndexesMap);
for (Map.Entry<String, PlanNode> entry :
deviceNameToSourceNodesMap.entrySet()) {
- String deviceName = entry.getKey();
PlanNode subPlan = entry.getValue();
if (valueFilterLimit > 0) {
+ // TODO test this situation
LimitNode limitNode =
new LimitNode(context.getQueryId().genPlanNodeId(), subPlan,
valueFilterLimit);
- aggMergeSortNode.addChildDeviceNode(deviceName, limitNode);
+ aggMergeSortNode.addChild(limitNode);
} else {
- aggMergeSortNode.addChildDeviceNode(deviceName, subPlan);
+ aggMergeSortNode.addChild(subPlan);
}
}
return aggMergeSortNode;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
index efe5996fc56..08817fb5e52 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/OperatorTreeGenerator.java
@@ -833,16 +833,25 @@ public class OperatorTreeGenerator extends
PlanVisitor<Operator, LocalExecutionP
.addOperatorContext(
context.getNextOperatorId(),
node.getPlanNodeId(),
- DeviceViewOperator.class.getSimpleName());
- List<Operator> children = dealWithConsumeChildrenOneByOneNode(node,
context);
- List<List<Integer>> deviceColumnIndex =
- node.getDevices().stream()
- .map(deviceName ->
node.getDeviceToMeasurementIndexesMap().get(deviceName))
- .collect(Collectors.toList());
- List<TSDataType> outputColumnTypes = getOutputColumnTypes(node,
context.getTypeProvider());
+ MergeSortOperator.class.getSimpleName());
+ List<TSDataType> dataTypes = getOutputColumnTypes(node,
context.getTypeProvider());
+ context.setCachedDataTypes(dataTypes);
+ List<Operator> children = dealWithConsumeAllChildrenPipelineBreaker(node,
context);
+ List<SortItem> sortItemList =
node.getMergeOrderParameter().getSortItemList();
+ List<Integer> sortItemIndexList = new ArrayList<>(sortItemList.size());
+ List<TSDataType> sortItemDataTypeList = new
ArrayList<>(sortItemList.size());
+ genSortInformation(
+ node.getOutputColumnNames(),
+ dataTypes,
+ sortItemList,
+ sortItemIndexList,
+ sortItemDataTypeList);
return new AggregationMergeSortOperator(
- operatorContext, node.getDevices(), children, deviceColumnIndex,
outputColumnTypes);
+ operatorContext,
+ children,
+ dataTypes,
+ MergeSortComparator.getComparator(sortItemList, sortItemIndexList,
sortItemDataTypeList));
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java
index ced6ab74a0b..05901a22f5a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java
@@ -507,6 +507,7 @@ public class ExchangeNodeAdder extends
PlanVisitor<PlanNode, NodeGroupContext> {
return newNode;
}
+ // TODO the impl of this method is not clear
private PlanNode processAggMergeSortNode(
MultiChildProcessNode node,
List<PlanNode> visitedChildren,
@@ -514,11 +515,11 @@ public class ExchangeNodeAdder extends
PlanVisitor<PlanNode, NodeGroupContext> {
MultiChildProcessNode newNode,
TRegionReplicaSet dataRegion) {
AggregationMergeSortNode aggMergeSortNode = (AggregationMergeSortNode)
node;
- Map<TRegionReplicaSet, DeviceViewNode> regionTopKNodeMap = new HashMap<>();
+ Map<TRegionReplicaSet, DeviceViewNode> regionAggMergeNodeMap = new
HashMap<>();
for (PlanNode child : visitedChildren) {
TRegionReplicaSet region =
context.getNodeDistribution(child.getPlanNodeId()).region;
DeviceViewNode deviceViewNode =
- regionTopKNodeMap.computeIfAbsent(
+ regionAggMergeNodeMap.computeIfAbsent(
region,
k -> {
DeviceViewNode childDeviceViewNode =
@@ -539,7 +540,7 @@ public class ExchangeNodeAdder extends
PlanVisitor<PlanNode, NodeGroupContext> {
deviceViewNode.addChildDeviceNode(device, child);
}
- for (Map.Entry<TRegionReplicaSet, DeviceViewNode> entry :
regionTopKNodeMap.entrySet()) {
+ for (Map.Entry<TRegionReplicaSet, DeviceViewNode> entry :
regionAggMergeNodeMap.entrySet()) {
TRegionReplicaSet deviceViewNodeLocatedRegion = entry.getKey();
DeviceViewNode deviceViewNode = entry.getValue();
@@ -564,6 +565,7 @@ public class ExchangeNodeAdder extends
PlanVisitor<PlanNode, NodeGroupContext> {
} else if (child instanceof AlignedSeriesAggregationScanNode) {
device = ((AlignedSeriesAggregationScanNode)
child).getAlignedPath().getDevice();
} else {
+ // TODO can be other Node?
throw new UnsupportedOperationException(
String.format("Unsupported child node of AggMergeSortNode, node:
%s", child.getClass()));
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java
index d152fcd4288..6d0cc848c24 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SourceRewriter.java
@@ -236,23 +236,17 @@ public class SourceRewriter extends
SimplePlanNodeRewriter<DistributionPlanConte
@Override
public List<PlanNode> visitAggregationMergeSort(
AggregationMergeSortNode node, DistributionPlanContext context) {
+ // TODO what's the meaning of newRoot
AggregationMergeSortNode newRoot =
new AggregationMergeSortNode(
context.queryContext.getQueryId().genPlanNodeId(),
node.getMergeOrderParameter(),
node.getOutputColumnNames(),
node.getDeviceToMeasurementIndexesMap());
- for (int i = 0; i < node.getDevices().size(); i++) {
+ for (int i = 0; i < node.getChildren().size(); i++) {
List<PlanNode> rewroteNode = rewrite(node.getChildren().get(i), context);
for (PlanNode planNode : rewroteNode) {
- if (planNode instanceof AggregationNode) {
- // if children of AggregationNode can only be AggScanNode?
- for (PlanNode aggScanNode : planNode.getChildren()) {
- newRoot.addChildDeviceNode(node.getDevices().get(i), aggScanNode);
- }
- } else {
- newRoot.addChildDeviceNode(node.getDevices().get(i), planNode);
- }
+ newRoot.addChild(planNode);
}
}
return Collections.singletonList(newRoot);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java
index 6db8bb75fbd..df3d42d4d44 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java
@@ -194,7 +194,6 @@ public class PlanGraphPrinter extends
PlanVisitor<List<String>, PlanGraphPrinter
AggregationMergeSortNode node, GraphContext context) {
List<String> boxValue = new ArrayList<>();
boxValue.add(String.format("AggMergeSort-%s",
node.getPlanNodeId().getId()));
- boxValue.add(String.format("AggDeviceCount: %d",
node.getDevices().size()));
return render(node, boxValue, context);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/AggregationMergeSortNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/AggregationMergeSortNode.java
index 58e1b9aed12..d18f408a599 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/AggregationMergeSortNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/AggregationMergeSortNode.java
@@ -37,109 +37,82 @@ import java.util.Objects;
public class AggregationMergeSortNode extends MultiChildProcessNode {
- // The result output order, which could sort by device and time.
- // The size of this list is 2 and the first SortItem in this list has higher
priority.
- protected final OrderByParameter mergeOrderParameter;
+ private final OrderByParameter mergeOrderParameter;
- // The size devices and children should be the same.
- protected final List<String> devices = new ArrayList<>();
-
- // Device column and measurement columns in result output
- protected final List<String> outputColumnNames;
+ private final List<String> outputColumns;
// e.g. [s1,s2,s3] is query, but [s1, s3] exists in device1, then device1 ->
[1, 3], s1 is 1 but
// not 0 because device is the first column
- final Map<String, List<Integer>> deviceToMeasurementIndexesMap;
+ private final Map<String, List<Integer>> deviceToMeasurementIndexesMap;
public AggregationMergeSortNode(
PlanNodeId id,
OrderByParameter mergeOrderParameter,
- List<String> outputColumnNames,
+ List<String> outputColumns,
Map<String, List<Integer>> deviceToMeasurementIndexesMap) {
super(id);
this.mergeOrderParameter = mergeOrderParameter;
- this.outputColumnNames = outputColumnNames;
+ this.outputColumns = outputColumns;
this.deviceToMeasurementIndexesMap = deviceToMeasurementIndexesMap;
}
public AggregationMergeSortNode(
PlanNodeId id,
+ List<PlanNode> children,
OrderByParameter mergeOrderParameter,
- List<String> outputColumnNames,
- List<String> devices,
+ List<String> outputColumns,
Map<String, List<Integer>> deviceToMeasurementIndexesMap) {
- super(id);
+ super(id, children);
this.mergeOrderParameter = mergeOrderParameter;
- this.outputColumnNames = outputColumnNames;
- this.devices.addAll(devices);
+ this.outputColumns = outputColumns;
this.deviceToMeasurementIndexesMap = deviceToMeasurementIndexesMap;
}
- @Override
- public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
- return visitor.visitAggregationMergeSort(this, context);
+ public OrderByParameter getMergeOrderParameter() {
+ return mergeOrderParameter;
+ }
+
+ public Map<String, List<Integer>> getDeviceToMeasurementIndexesMap() {
+ return deviceToMeasurementIndexesMap;
}
@Override
public PlanNode clone() {
return new AggregationMergeSortNode(
getPlanNodeId(),
- mergeOrderParameter,
- outputColumnNames,
- devices,
- deviceToMeasurementIndexesMap);
- }
-
- @Override
- public List<String> getOutputColumnNames() {
- return outputColumnNames;
+ getMergeOrderParameter(),
+ outputColumns,
+ getDeviceToMeasurementIndexesMap());
}
@Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- if (!super.equals(o)) {
- return false;
- }
- AggregationMergeSortNode that = (AggregationMergeSortNode) o;
- return mergeOrderParameter.equals(that.mergeOrderParameter)
- && devices.equals(that.devices)
- && outputColumnNames.equals(that.outputColumnNames)
- &&
deviceToMeasurementIndexesMap.equals(that.deviceToMeasurementIndexesMap);
+ public PlanNode createSubNode(int subNodeId, int startIndex, int endIndex) {
+ return new AggregationMergeSortNode(
+ new PlanNodeId(String.format("%s-%s", getPlanNodeId(), subNodeId)),
+ new ArrayList<>(children.subList(startIndex, endIndex)),
+ getMergeOrderParameter(),
+ outputColumns,
+ getDeviceToMeasurementIndexesMap());
}
@Override
- public int hashCode() {
- return Objects.hash(
- super.hashCode(),
- mergeOrderParameter,
- devices,
- outputColumnNames,
- deviceToMeasurementIndexesMap);
+ public List<String> getOutputColumnNames() {
+ return outputColumns;
}
@Override
- public String toString() {
- return "AggMergeSort-" + this.getPlanNodeId();
+ public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+ return visitor.visitAggregationMergeSort(this, context);
}
@Override
protected void serializeAttributes(ByteBuffer byteBuffer) {
PlanNodeType.AGG_MERGE_SORT.serialize(byteBuffer);
mergeOrderParameter.serializeAttributes(byteBuffer);
- ReadWriteIOUtils.write(outputColumnNames.size(), byteBuffer);
- for (String column : outputColumnNames) {
+ ReadWriteIOUtils.write(outputColumns.size(), byteBuffer);
+ for (String column : outputColumns) {
ReadWriteIOUtils.write(column, byteBuffer);
}
- ReadWriteIOUtils.write(devices.size(), byteBuffer);
- for (String deviceName : devices) {
- ReadWriteIOUtils.write(deviceName, byteBuffer);
- }
ReadWriteIOUtils.write(deviceToMeasurementIndexesMap.size(), byteBuffer);
for (Map.Entry<String, List<Integer>> entry :
deviceToMeasurementIndexesMap.entrySet()) {
ReadWriteIOUtils.write(entry.getKey(), byteBuffer);
@@ -154,14 +127,10 @@ public class AggregationMergeSortNode extends
MultiChildProcessNode {
protected void serializeAttributes(DataOutputStream stream) throws
IOException {
PlanNodeType.AGG_MERGE_SORT.serialize(stream);
mergeOrderParameter.serializeAttributes(stream);
- ReadWriteIOUtils.write(outputColumnNames.size(), stream);
- for (String column : outputColumnNames) {
+ ReadWriteIOUtils.write(outputColumns.size(), stream);
+ for (String column : outputColumns) {
ReadWriteIOUtils.write(column, stream);
}
- ReadWriteIOUtils.write(devices.size(), stream);
- for (String deviceName : devices) {
- ReadWriteIOUtils.write(deviceName, stream);
- }
ReadWriteIOUtils.write(deviceToMeasurementIndexesMap.size(), stream);
for (Map.Entry<String, List<Integer>> entry :
deviceToMeasurementIndexesMap.entrySet()) {
ReadWriteIOUtils.write(entry.getKey(), stream);
@@ -173,19 +142,13 @@ public class AggregationMergeSortNode extends
MultiChildProcessNode {
}
public static AggregationMergeSortNode deserialize(ByteBuffer byteBuffer) {
- OrderByParameter mergeOrderParameter =
OrderByParameter.deserialize(byteBuffer);
+ OrderByParameter orderByParameter =
OrderByParameter.deserialize(byteBuffer);
int columnSize = ReadWriteIOUtils.readInt(byteBuffer);
- List<String> outputColumnNames = new ArrayList<>();
+ List<String> outputColumns = new ArrayList<>();
while (columnSize > 0) {
- outputColumnNames.add(ReadWriteIOUtils.readString(byteBuffer));
+ outputColumns.add(ReadWriteIOUtils.readString(byteBuffer));
columnSize--;
}
- int devicesSize = ReadWriteIOUtils.readInt(byteBuffer);
- List<String> devices = new ArrayList<>();
- while (devicesSize > 0) {
- devices.add(ReadWriteIOUtils.readString(byteBuffer));
- devicesSize--;
- }
int mapSize = ReadWriteIOUtils.readInt(byteBuffer);
Map<String, List<Integer>> deviceToMeasurementIndexesMap = new
HashMap<>(mapSize);
while (mapSize > 0) {
@@ -201,23 +164,31 @@ public class AggregationMergeSortNode extends
MultiChildProcessNode {
}
PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
return new AggregationMergeSortNode(
- planNodeId, mergeOrderParameter, outputColumnNames, devices,
deviceToMeasurementIndexesMap);
- }
-
- public void addChildDeviceNode(String deviceName, PlanNode childNode) {
- this.devices.add(deviceName);
- this.children.add(childNode);
+ planNodeId, orderByParameter, outputColumns,
deviceToMeasurementIndexesMap);
}
- public List<String> getDevices() {
- return devices;
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ if (!super.equals(o)) {
+ return false;
+ }
+ AggregationMergeSortNode that = (AggregationMergeSortNode) o;
+ return Objects.equals(mergeOrderParameter, that.getMergeOrderParameter());
}
- public Map<String, List<Integer>> getDeviceToMeasurementIndexesMap() {
- return deviceToMeasurementIndexesMap;
+ @Override
+ public int hashCode() {
+ return Objects.hash(super.hashCode(), mergeOrderParameter);
}
- public OrderByParameter getMergeOrderParameter() {
- return mergeOrderParameter;
+ @Override
+ public String toString() {
+ return "AggregationMergeSort-" + this.getPlanNodeId();
}
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/AggregationOperatorTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/AggregationOperatorTest.java
index 6880e08951f..17211459d65 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/AggregationOperatorTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/AggregationOperatorTest.java
@@ -124,9 +124,8 @@ public class AggregationOperatorTest {
if (!aggregationOperator.hasNext()) {
break;
}
- TsBlock resultTsBlock = null;
- resultTsBlock = aggregationOperator.next();
+ TsBlock resultTsBlock = aggregationOperator.next();
if (resultTsBlock == null) {
continue;
}
@@ -166,8 +165,7 @@ public class AggregationOperatorTest {
if (!aggregationOperator.hasNext()) {
break;
}
- TsBlock resultTsBlock = null;
- resultTsBlock = aggregationOperator.next();
+ TsBlock resultTsBlock = aggregationOperator.next();
if (resultTsBlock == null) {
continue;
}
@@ -215,8 +213,7 @@ public class AggregationOperatorTest {
if (!aggregationOperator.hasNext()) {
break;
}
- TsBlock resultTsBlock = null;
- resultTsBlock = aggregationOperator.next();
+ TsBlock resultTsBlock = aggregationOperator.next();
if (resultTsBlock == null) {
continue;
}
@@ -267,8 +264,7 @@ public class AggregationOperatorTest {
if (!aggregationOperator.hasNext()) {
break;
}
- TsBlock resultTsBlock = null;
- resultTsBlock = aggregationOperator.next();
+ TsBlock resultTsBlock = aggregationOperator.next();
if (resultTsBlock == null) {
continue;
}
@@ -313,10 +309,7 @@ public class AggregationOperatorTest {
driverContext.addOperatorContext(3, planNodeId3,
AggregationOperator.class.getSimpleName());
driverContext
.getOperatorContexts()
- .forEach(
- operatorContext -> {
- operatorContext.setMaxRunTime(TEST_TIME_SLICE);
- });
+ .forEach(operatorContext ->
OperatorContext.setMaxRunTime(TEST_TIME_SLICE));
MeasurementPath measurementPath1 =
new MeasurementPath(AGGREGATION_OPERATOR_TEST_SG + ".device0.sensor0",
TSDataType.INT32);
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/AggregationAlignByDeviceTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/AggregationAlignByDeviceTest.java
new file mode 100644
index 00000000000..052032d2c28
--- /dev/null
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/AggregationAlignByDeviceTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.planner.distribution;
+
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
+import org.apache.iotdb.db.queryengine.common.QueryId;
+import org.apache.iotdb.db.queryengine.plan.analyze.Analysis;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.DistributedQueryPlan;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.LogicalQueryPlan;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.AggregationNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.DeviceViewNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.FilterNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.LimitNode;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.sink.IdentitySinkNode;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class AggregationAlignByDeviceTest {
+ private static final long LIMIT_VALUE = 10;
+
+ QueryId queryId = new QueryId("test");
+ MPPQueryContext context =
+ new MPPQueryContext("", queryId, null, new TEndPoint(), new TEndPoint());
+ String sql;
+ Analysis analysis;
+ PlanNode logicalPlanNode;
+ DistributionPlanner planner;
+ DistributedQueryPlan plan;
+ PlanNode firstFiRoot;
+ PlanNode firstFiTopNode;
+ PlanNode mergeSortNode;
+
+ /*
+ * IdentitySinkNode-27
+ * └──LimitNode-22
+ * └──FilterNode-12
+ * └──DeviceView-14
+ * ├──AggregationNode-5
+ * │ └──FilterNode-4
+ * │ └──FullOuterTimeJoinNode-3
+ * │ ├──SeriesScanNode-15:[SeriesPath:
root.sg.d1.s1, DataRegion: TConsensusGroupId(type:DataRegion, id:1)]
+ * │ ├──SeriesScanNode-17:[SeriesPath:
root.sg.d1.s2, DataRegion: TConsensusGroupId(type:DataRegion, id:1)]
+ * │ └──ExchangeNode-23:
[SourceAddress:192.0.2.1/test.2.0/25]
+ * └──ExchangeNode-24: [SourceAddress:192.0.3.1/test.3.0/26]
+ *
+ * IdentitySinkNode-25
+ * └──FullOuterTimeJoinNode-19
+ * ├──SeriesScanNode-16:[SeriesPath: root.sg.d1.s1, DataRegion:
TConsensusGroupId(type:DataRegion, id:2)]
+ * └──SeriesScanNode-18:[SeriesPath: root.sg.d1.s2, DataRegion:
TConsensusGroupId(type:DataRegion, id:2)]
+ *
+ * IdentitySinkNode-26
+ * └──AggregationNode-10
+ * └──FilterNode-9
+ * └──FullOuterTimeJoinNode-8
+ * ├──SeriesScanNode-20:[SeriesPath: root.sg.d22.s1,
DataRegion: TConsensusGroupId(type:DataRegion, id:3)]
+ * └──SeriesScanNode-21:[SeriesPath: root.sg.d22.s2,
DataRegion: TConsensusGroupId(type:DataRegion, id:3)]
+ */
+ @Test
+ public void oneMeasurementOneRegionTest() {
+ // aggregation + align by device, no value filter
+ sql = "select first_value(s1) from root.sg.d22 align by device";
+ analysis = Util.analyze(sql, context);
+ logicalPlanNode = Util.genLogicalPlan(analysis, context);
+ planner = new DistributionPlanner(analysis, new LogicalQueryPlan(context,
logicalPlanNode));
+ plan = planner.planFragments();
+ assertEquals(3, plan.getInstances().size());
+ firstFiRoot = plan.getInstances().get(0).getFragment().getPlanNodeTree();
+ assertTrue(firstFiRoot instanceof IdentitySinkNode);
+ assertTrue(firstFiRoot.getChildren().get(0) instanceof LimitNode);
+ PlanNode filterNode = ((LimitNode)
firstFiRoot.getChildren().get(0)).getChild();
+ assertTrue(filterNode instanceof FilterNode);
+ assertTrue(filterNode.getChildren().get(0) instanceof DeviceViewNode);
+ assertTrue(filterNode.getChildren().get(0).getChildren().get(0) instanceof
AggregationNode);
+ assertTrue(
+
filterNode.getChildren().get(0).getChildren().get(0).getChildren().get(0)
+ instanceof FilterNode);
+ PlanNode thirdFiRoot =
plan.getInstances().get(2).getFragment().getPlanNodeTree();
+ assertTrue(thirdFiRoot instanceof IdentitySinkNode);
+ assertTrue(thirdFiRoot.getChildren().get(0) instanceof AggregationNode);
+ assertTrue(thirdFiRoot.getChildren().get(0).getChildren().get(0)
instanceof FilterNode);
+ }
+
+ @Test
+ public void twoMeasurementMultiRegionTest() {
+ // aggregation + align by device, no value filter
+ sql = "select count(s1), first_value(s2) from root.sg.d1,root.sg.d333
align by device";
+ analysis = Util.analyze(sql, context);
+ logicalPlanNode = Util.genLogicalPlan(analysis, context);
+ planner = new DistributionPlanner(analysis, new LogicalQueryPlan(context,
logicalPlanNode));
+ plan = planner.planFragments();
+ assertEquals(3, plan.getInstances().size());
+ firstFiRoot = plan.getInstances().get(0).getFragment().getPlanNodeTree();
+ System.out.println("aa");
+ }
+}