This is an automated email from the ASF dual-hosted git repository.

caogaofei pushed a commit to branch beyyes/agg_plan_device_cross_region
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/beyyes/agg_plan_device_cross_region by this push:
     new 5de943b495b temp
5de943b495b is described below

commit 5de943b495ba949f10ea2b4b9a1f2859a95364be
Author: Beyyes <[email protected]>
AuthorDate: Thu Feb 29 00:58:46 2024 +0800

    temp
---
 .../process/AggregationMergeSortOperator.java        | 20 +++++++++++++++-----
 1 file changed, 15 insertions(+), 5 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AggregationMergeSortOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AggregationMergeSortOperator.java
index b83e491e522..331784d1451 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AggregationMergeSortOperator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/AggregationMergeSortOperator.java
@@ -28,11 +28,13 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.TimeRange;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
 import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
 import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
 import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
 
 import com.google.common.util.concurrent.ListenableFuture;
 import org.apache.iotdb.tsfile.utils.Binary;
+import org.checkerframework.checker.units.qual.C;
 
 import java.util.ArrayList;
 import java.util.Comparator;
@@ -142,23 +144,31 @@ public class AggregationMergeSortOperator extends AbstractConsumeAllOperator {
         Binary device = tsBlock.getColumn(0).getBinary(readIndex[idx]);
         if (device.equals(currentDevice)) {
           currentTime = tsBlock.getTimeColumn().getLong(readIndex[idx]);
-          int cnt = 0;
+          int cnt = 1;
           for (int i = 0; i < accumulators.size(); i++) {
             Accumulator accumulator = accumulators.get(i);
-            if (newAggregationIdx.get(i) == 2) {
-              accumulator.addIntermediate(tsBlock.getColumns(new int[2]{cnt++, cnt+}));
+            if (accumulator.getPartialResultSize() == 2) {
+              Column[] columns = new Column[2];
+              columns[0] = tsBlock.getColumn(cnt++);
+              columns[1] = tsBlock.getColumn(cnt++);
+              accumulator.addIntermediate(columns);
             } else {
-              accumulator.addIntermediate(tsBlock.getColumns(new int[]));
+              Column[] columns = new Column[1];
+              columns[0] = tsBlock.getColumn(cnt++);
+              accumulator.addIntermediate(columns);
             }
           }
           readIndex[idx] ++;
+
+          accumulators.forEach(Accumulator::reset);
         }
       }
 
       timeBuilder.writeLong(currentTime);
       for (int i = 1; i < dataTypes.size(); i++) {
-        accumulators.get(i).outputFinal(valueColumnBuilders[i]);
+        accumulators.get(i-1).outputFinal(valueColumnBuilders[i]);
       }
+      tsBlockBuilder.declarePosition();
 
       currentDevice = null;
 

Reply via email to