This is an automated email from the ASF dual-hosted git repository.
caogaofei pushed a commit to branch beyyes/agg_mergesort
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/beyyes/agg_mergesort by this
push:
new c8e5e9e1285 add AggMergeSortNode
c8e5e9e1285 is described below
commit c8e5e9e1285dde2a3cec5ad784bfd4652e37b023
Author: Beyyes <[email protected]>
AuthorDate: Tue Feb 6 00:05:45 2024 +0800
add AggMergeSortNode
---
.../operator/process/AggMergeSortOperator.java | 17 ++--
.../operator/process/DeviceViewOperator.java | 3 +
.../plan/planner/LogicalPlanBuilder.java | 83 +++++++++++------
.../planner/distribution/ExchangeNodeAdder.java | 14 ++-
.../plan/node/process/AggMergeSortNode.java | 102 ++++++++++++++-------
5 files changed, 146 insertions(+), 73 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AggMergeSortOperator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AggMergeSortOperator.java
index 8616079e069..14348f90512 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AggMergeSortOperator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AggMergeSortOperator.java
@@ -21,7 +21,7 @@ package
org.apache.iotdb.db.queryengine.execution.operator.process;
import org.apache.iotdb.db.queryengine.execution.operator.Operator;
import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
-import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
+import
org.apache.iotdb.db.queryengine.execution.operator.source.ExchangeOperator;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
@@ -30,7 +30,6 @@ import
org.apache.iotdb.tsfile.read.common.block.column.Column;
import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
import org.apache.iotdb.tsfile.read.common.block.column.NullColumn;
import org.apache.iotdb.tsfile.read.common.block.column.RunLengthEncodedColumn;
-import org.apache.iotdb.tsfile.utils.Binary;
import com.google.common.util.concurrent.ListenableFuture;
@@ -42,8 +41,8 @@ import java.util.List;
* it to the form we need, adding the device column and allocating value
column to its expected
* location, then get the next device operator until no next device.
*
- * <p>The deviceOperators can be timeJoinOperator or seriesScanOperator that
have not transformed
- * the result form.
+ * <p>The deviceOperators can be aggregationSeriesScanOperator,
timeJoinOperator or
+ * seriesScanOperator that have not transformed the result form.
*
* <p>Attention! If some columns are not existing in one device, those columns
will be null. e.g.
* [s1,s2,s3] is query, but only [s1, s3] exists in device1, then the column
of s2 will be filled
@@ -119,6 +118,10 @@ public class AggMergeSortOperator implements
ProcessOperator {
return null;
}
+ boolean deviceView =
+ getCurDeviceOperator() instanceof DeviceViewOperator
+ || getCurDeviceOperator() instanceof ExchangeOperator;
+
TsBlock tsBlock = getCurDeviceOperator().nextWithTimer();
if (tsBlock == null) {
return null;
@@ -128,11 +131,11 @@ public class AggMergeSortOperator implements
ProcessOperator {
// fill existing columns
Column[] newValueColumns = new Column[dataTypes.size()];
for (int i = 0; i < indexes.size(); i++) {
- newValueColumns[indexes.get(i)] = tsBlock.getColumn(i);
+ newValueColumns[indexes.get(i)] = tsBlock.getColumn(deviceView ? i + 1 :
i);
}
// construct device column
ColumnBuilder deviceColumnBuilder = new BinaryColumnBuilder(null, 1);
- deviceColumnBuilder.writeBinary(new Binary(getCurDeviceName(),
TSFileConfig.STRING_CHARSET));
+ deviceColumnBuilder.writeBinary(tsBlock.getColumn(0).getBinary(0));
newValueColumns[0] =
new RunLengthEncodedColumn(deviceColumnBuilder.build(),
tsBlock.getPositionCount());
// construct other null columns
@@ -146,7 +149,7 @@ public class AggMergeSortOperator implements
ProcessOperator {
@Override
public boolean hasNext() throws Exception {
- return deviceIndex < devices.size();
+ return deviceIndex < deviceOperators.size();
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/DeviceViewOperator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/DeviceViewOperator.java
index 0dd6329ac40..122b271096b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/DeviceViewOperator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/DeviceViewOperator.java
@@ -84,6 +84,9 @@ public class DeviceViewOperator implements ProcessOperator {
}
private Operator getCurDeviceOperator() {
+ if (deviceIndex >= deviceOperators.size()) {
+ System.out.printf("aaa");
+ }
return deviceOperators.get(deviceIndex);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java
index 4871df6af9d..aee63275d50 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanBuilder.java
@@ -806,14 +806,14 @@ public class LogicalPlanBuilder {
valueFilterLimit);
} else {
// has order by expression, use TopKNode + DeviceViewNode
+
topKNode.addChild(
addDeviceViewNode(
orderByParameter,
outputColumnNames,
deviceToMeasurementIndexesMap,
deviceNameToSourceNodesMap,
- valueFilterLimit,
- queryStatement.isAggregationQuery()));
+ valueFilterLimit));
}
analysis.setUseTopKNode();
@@ -832,14 +832,23 @@ public class LogicalPlanBuilder {
this.root = mergeSortNode;
} else {
// order by based on device, use DeviceViewNode
- this.root =
- addDeviceViewNode(
- orderByParameter,
- outputColumnNames,
- deviceToMeasurementIndexesMap,
- deviceNameToSourceNodesMap,
- -1,
- queryStatement.isAggregationQuery());
+ if (queryStatement.isAggregationQuery()) {
+ this.root =
+ addAggMergeSortNode(
+ orderByParameter,
+ outputColumnNames,
+ deviceToMeasurementIndexesMap,
+ deviceNameToSourceNodesMap,
+ -1);
+ } else {
+ this.root =
+ addDeviceViewNode(
+ orderByParameter,
+ outputColumnNames,
+ deviceToMeasurementIndexesMap,
+ deviceNameToSourceNodesMap,
+ -1);
+ }
}
context.getTypeProvider().setType(DEVICE, TSDataType.TEXT);
@@ -914,29 +923,18 @@ public class LogicalPlanBuilder {
}
}
- private DeviceViewNode addDeviceViewNode(
+ private MultiChildProcessNode addDeviceViewNode(
OrderByParameter orderByParameter,
List<String> outputColumnNames,
Map<String, List<Integer>> deviceToMeasurementIndexesMap,
Map<String, PlanNode> deviceNameToSourceNodesMap,
- long valueFilterLimit,
- boolean isAggregation) {
- DeviceViewNode deviceViewNode;
- if (isAggregation) {
- deviceViewNode =
- new AggMergeSortNode(
- context.getQueryId().genPlanNodeId(),
- orderByParameter,
- outputColumnNames,
- deviceToMeasurementIndexesMap);
- } else {
- deviceViewNode =
- new DeviceViewNode(
- context.getQueryId().genPlanNodeId(),
- orderByParameter,
- outputColumnNames,
- deviceToMeasurementIndexesMap);
- }
+ long valueFilterLimit) {
+ DeviceViewNode deviceViewNode =
+ new DeviceViewNode(
+ context.getQueryId().genPlanNodeId(),
+ orderByParameter,
+ outputColumnNames,
+ deviceToMeasurementIndexesMap);
for (Map.Entry<String, PlanNode> entry :
deviceNameToSourceNodesMap.entrySet()) {
String deviceName = entry.getKey();
@@ -952,6 +950,33 @@ public class LogicalPlanBuilder {
return deviceViewNode;
}
+ private MultiChildProcessNode addAggMergeSortNode(
+ OrderByParameter orderByParameter,
+ List<String> outputColumnNames,
+ Map<String, List<Integer>> deviceToMeasurementIndexesMap,
+ Map<String, PlanNode> deviceNameToSourceNodesMap,
+ long valueFilterLimit) {
+ AggMergeSortNode aggMergeSortNode =
+ new AggMergeSortNode(
+ context.getQueryId().genPlanNodeId(),
+ orderByParameter,
+ outputColumnNames,
+ deviceToMeasurementIndexesMap);
+
+ for (Map.Entry<String, PlanNode> entry :
deviceNameToSourceNodesMap.entrySet()) {
+ String deviceName = entry.getKey();
+ PlanNode subPlan = entry.getValue();
+ if (valueFilterLimit > 0) {
+ LimitNode limitNode =
+ new LimitNode(context.getQueryId().genPlanNodeId(), subPlan,
valueFilterLimit);
+ aggMergeSortNode.addChildDeviceNode(deviceName, limitNode);
+ } else {
+ aggMergeSortNode.addChildDeviceNode(deviceName, subPlan);
+ }
+ }
+ return aggMergeSortNode;
+ }
+
public LogicalPlanBuilder planGroupByLevel(
Map<Expression, Set<Expression>> groupByLevelExpressions,
GroupByTimeParameter groupByTimeParameter,
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java
index 8e6fed5a02a..03d02d58e8c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java
@@ -516,8 +516,8 @@ public class ExchangeNodeAdder extends
PlanVisitor<PlanNode, NodeGroupContext> {
Map<TRegionReplicaSet, DeviceViewNode> regionTopKNodeMap = new HashMap<>();
for (PlanNode child : visitedChildren) {
TRegionReplicaSet region =
context.getNodeDistribution(child.getPlanNodeId()).region;
- regionTopKNodeMap
- .computeIfAbsent(
+ DeviceViewNode deviceViewNode =
+ regionTopKNodeMap.computeIfAbsent(
region,
k -> {
DeviceViewNode childDeviceViewNode =
@@ -525,13 +525,17 @@ public class ExchangeNodeAdder extends
PlanVisitor<PlanNode, NodeGroupContext> {
context.queryContext.getQueryId().genPlanNodeId(),
aggMergeSortNode.getMergeOrderParameter(),
aggMergeSortNode.getOutputColumnNames(),
- aggMergeSortNode.getDeviceToMeasurementIndexesMap());
+ new HashMap<>());
context.putNodeDistribution(
childDeviceViewNode.getPlanNodeId(),
new
NodeDistribution(NodeDistributionType.SAME_WITH_ALL_CHILDREN, region));
return childDeviceViewNode;
- })
- .addChild(child);
+ });
+ String device = ((SeriesAggregationScanNode)
child).getSeriesPath().getDevice();
+ deviceViewNode
+ .getDeviceToMeasurementIndexesMap()
+ .put(device,
aggMergeSortNode.getDeviceToMeasurementIndexesMap().get(device));
+ deviceViewNode.addChildDeviceNode(device, child);
}
for (Map.Entry<TRegionReplicaSet, DeviceViewNode> entry :
regionTopKNodeMap.entrySet()) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/AggMergeSortNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/AggMergeSortNode.java
index ad1f2797db7..7ead4883973 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/AggMergeSortNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/AggMergeSortNode.java
@@ -35,14 +35,31 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
-public class AggMergeSortNode extends DeviceViewNode {
+public class AggMergeSortNode extends MultiChildProcessNode {
+
+ // The result output order, which could sort by device and time.
+ // The size of this list is 2 and the first SortItem in this list has higher
priority.
+ protected final OrderByParameter mergeOrderParameter;
+
+ // The sizes of devices and children should be the same.
+ protected final List<String> devices = new ArrayList<>();
+
+ // Device column and measurement columns in result output
+ protected final List<String> outputColumnNames;
+
+ // e.g. [s1,s2,s3] is queried, but only [s1, s3] exist in device1, then device1 ->
[1, 3]; s1 is 1 rather than
+ // 0 because device is the first column
+ final Map<String, List<Integer>> deviceToMeasurementIndexesMap;
public AggMergeSortNode(
PlanNodeId id,
OrderByParameter mergeOrderParameter,
List<String> outputColumnNames,
Map<String, List<Integer>> deviceToMeasurementIndexesMap) {
- super(id, mergeOrderParameter, outputColumnNames,
deviceToMeasurementIndexesMap);
+ super(id);
+ this.mergeOrderParameter = mergeOrderParameter;
+ this.outputColumnNames = outputColumnNames;
+ this.deviceToMeasurementIndexesMap = deviceToMeasurementIndexesMap;
}
public AggMergeSortNode(
@@ -51,7 +68,11 @@ public class AggMergeSortNode extends DeviceViewNode {
List<String> outputColumnNames,
List<String> devices,
Map<String, List<Integer>> deviceToMeasurementIndexesMap) {
- super(id, mergeOrderParameter, outputColumnNames, devices,
deviceToMeasurementIndexesMap);
+ super(id);
+ this.mergeOrderParameter = mergeOrderParameter;
+ this.outputColumnNames = outputColumnNames;
+ this.devices.addAll(devices);
+ this.deviceToMeasurementIndexesMap = deviceToMeasurementIndexesMap;
}
@Override
@@ -74,6 +95,39 @@ public class AggMergeSortNode extends DeviceViewNode {
return outputColumnNames;
}
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ if (!super.equals(o)) {
+ return false;
+ }
+ AggMergeSortNode that = (AggMergeSortNode) o;
+ return mergeOrderParameter.equals(that.mergeOrderParameter)
+ && devices.equals(that.devices)
+ && outputColumnNames.equals(that.outputColumnNames)
+ &&
deviceToMeasurementIndexesMap.equals(that.deviceToMeasurementIndexesMap);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(
+ super.hashCode(),
+ mergeOrderParameter,
+ devices,
+ outputColumnNames,
+ deviceToMeasurementIndexesMap);
+ }
+
+ @Override
+ public String toString() {
+ return "AggMergeSort-" + this.getPlanNodeId();
+ }
+
@Override
protected void serializeAttributes(ByteBuffer byteBuffer) {
PlanNodeType.AGG_MERGE_SORT.serialize(byteBuffer);
@@ -118,7 +172,7 @@ public class AggMergeSortNode extends DeviceViewNode {
}
}
- public static DeviceViewNode deserialize(ByteBuffer byteBuffer) {
+ public static AggMergeSortNode deserialize(ByteBuffer byteBuffer) {
OrderByParameter mergeOrderParameter =
OrderByParameter.deserialize(byteBuffer);
int columnSize = ReadWriteIOUtils.readInt(byteBuffer);
List<String> outputColumnNames = new ArrayList<>();
@@ -146,40 +200,24 @@ public class AggMergeSortNode extends DeviceViewNode {
mapSize--;
}
PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
- return new DeviceViewNode(
+ return new AggMergeSortNode(
planNodeId, mergeOrderParameter, outputColumnNames, devices,
deviceToMeasurementIndexesMap);
}
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- if (!super.equals(o)) {
- return false;
- }
- DeviceViewNode that = (DeviceViewNode) o;
- return mergeOrderParameter.equals(that.mergeOrderParameter)
- && devices.equals(that.devices)
- && outputColumnNames.equals(that.outputColumnNames)
- &&
deviceToMeasurementIndexesMap.equals(that.deviceToMeasurementIndexesMap);
+ public void addChildDeviceNode(String deviceName, PlanNode childNode) {
+ this.devices.add(deviceName);
+ this.children.add(childNode);
}
- @Override
- public int hashCode() {
- return Objects.hash(
- super.hashCode(),
- mergeOrderParameter,
- devices,
- outputColumnNames,
- deviceToMeasurementIndexesMap);
+ public List<String> getDevices() {
+ return devices;
}
- @Override
- public String toString() {
- return "AggMergeSort-" + this.getPlanNodeId();
+ public Map<String, List<Integer>> getDeviceToMeasurementIndexesMap() {
+ return deviceToMeasurementIndexesMap;
+ }
+
+ public OrderByParameter getMergeOrderParameter() {
+ return mergeOrderParameter;
}
}