This is an automated email from the ASF dual-hosted git repository.
caogaofei pushed a commit to branch beyyes/agg_plan_device_cross_region
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to
refs/heads/beyyes/agg_plan_device_cross_region by this push:
new 5de943b495b temp
5de943b495b is described below
commit 5de943b495ba949f10ea2b4b9a1f2859a95364be
Author: Beyyes <[email protected]>
AuthorDate: Thu Feb 29 00:58:46 2024 +0800
temp
---
.../process/AggregationMergeSortOperator.java | 20 +++++++++++++++-----
1 file changed, 15 insertions(+), 5 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AggregationMergeSortOperator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AggregationMergeSortOperator.java
index b83e491e522..331784d1451 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AggregationMergeSortOperator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AggregationMergeSortOperator.java
@@ -28,11 +28,13 @@ import
org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.TimeRange;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.iotdb.tsfile.utils.Binary;
+import org.checkerframework.checker.units.qual.C;
import java.util.ArrayList;
import java.util.Comparator;
@@ -142,23 +144,31 @@ public class AggregationMergeSortOperator extends
AbstractConsumeAllOperator {
Binary device = tsBlock.getColumn(0).getBinary(readIndex[idx]);
if (device.equals(currentDevice)) {
currentTime = tsBlock.getTimeColumn().getLong(readIndex[idx]);
- int cnt = 0;
+ int cnt = 1;
for (int i = 0; i < accumulators.size(); i++) {
Accumulator accumulator = accumulators.get(i);
- if (newAggregationIdx.get(i) == 2) {
- accumulator.addIntermediate(tsBlock.getColumns(new int[2]{cnt++,
cnt+}));
+ if (accumulator.getPartialResultSize() == 2) {
+ Column[] columns = new Column[2];
+ columns[0] = tsBlock.getColumn(cnt++);
+ columns[1] = tsBlock.getColumn(cnt++);
+ accumulator.addIntermediate(columns);
} else {
- accumulator.addIntermediate(tsBlock.getColumns(new int[]));
+ Column[] columns = new Column[1];
+ columns[0] = tsBlock.getColumn(cnt++);
+ accumulator.addIntermediate(columns);
}
}
readIndex[idx] ++;
+
+ accumulators.forEach(Accumulator::reset);
}
}
timeBuilder.writeLong(currentTime);
for (int i = 1; i < dataTypes.size(); i++) {
- accumulators.get(i).outputFinal(valueColumnBuilders[i]);
+ accumulators.get(i-1).outputFinal(valueColumnBuilders[i]);
}
+ tsBlockBuilder.declarePosition();
currentDevice = null;