This is an automated email from the ASF dual-hosted git repository.

xiangweiwei pushed a commit to branch aggregator
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 5843336eeb9151c303f40bc80455af8181582d66
Author: Alima777 <[email protected]>
AuthorDate: Sun May 1 16:56:33 2022 +0800

    add unit tests
---
 .../operator/aggregation/AccumulatorFactory.java   |  12 ++
 .../db/mpp/operator/aggregation/Aggregator.java    |   4 +-
 .../aggregation/FirstValueAccumulator.java         |   1 +
 .../aggregation/FirstValueDescAccumulator.java     |   2 +-
 .../operator/aggregation/LastValueAccumulator.java |   2 +-
 .../aggregation/LastValueDescAccumulator.java      |  15 ++-
 .../aggregation/MaxTimeDescAccumulator.java        |  15 ++-
 .../operator/aggregation/MinTimeAccumulator.java   |   2 +
 .../operator/aggregation/MinValueAccumulator.java  |   2 +-
 .../operator/SeriesAggregateScanOperatorTest.java  | 146 ++++++++++++++++++---
 .../apache/iotdb/tsfile/utils/TsPrimitiveType.java |   7 +
 11 files changed, 176 insertions(+), 32 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AccumulatorFactory.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AccumulatorFactory.java
index 10619b9e35..64ad9fc917 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AccumulatorFactory.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AccumulatorFactory.java
@@ -22,6 +22,9 @@ package org.apache.iotdb.db.mpp.operator.aggregation;
 import org.apache.iotdb.db.query.aggregation.AggregationType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
+import java.util.ArrayList;
+import java.util.List;
+
 public class AccumulatorFactory {
 
   // TODO: Are we going to create different seriesScanOperator based on order 
by sequence?
@@ -56,4 +59,13 @@ public class AccumulatorFactory {
         throw new IllegalArgumentException("Invalid Aggregation function: " + 
aggregationType);
     }
   }
+
+  public static List<Accumulator> createAccumulators(
+      List<AggregationType> aggregationTypes, TSDataType tsDataType, boolean 
ascending) {
+    List<Accumulator> accumulators = new ArrayList<>();
+    for (AggregationType aggregationType : aggregationTypes) {
+      accumulators.add(createAccumulator(aggregationType, tsDataType, 
ascending));
+    }
+    return accumulators;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/Aggregator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/Aggregator.java
index 617d89d4ae..5cad7948c3 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/Aggregator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/Aggregator.java
@@ -97,9 +97,9 @@ public class Aggregator {
 
   public TSDataType[] getOutputType() {
     if (step.isOutputPartial()) {
-      return new TSDataType[] {accumulator.getFinalType()};
-    } else {
       return accumulator.getIntermediateType();
+    } else {
+      return new TSDataType[] {accumulator.getFinalType()};
     }
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/FirstValueAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/FirstValueAccumulator.java
index 8fa0801faf..bdb4ff49e8 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/FirstValueAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/FirstValueAccumulator.java
@@ -80,6 +80,7 @@ public class FirstValueAccumulator implements Accumulator {
 
   @Override
   public void reset() {
+    hasCandidateResult = false;
     this.minTime = Long.MAX_VALUE;
     this.firstValue.reset();
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/FirstValueDescAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/FirstValueDescAccumulator.java
index 87b939438e..674e251f6b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/FirstValueDescAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/FirstValueDescAccumulator.java
@@ -35,7 +35,7 @@ public class FirstValueDescAccumulator extends 
FirstValueAccumulator {
     for (int i = 0; i < column[0].getPositionCount(); i++) {
       long curTime = column[0].getLong(i);
       if (curTime >= timeRange.getMin() && curTime < timeRange.getMax()) {
-        updateFirstValue(column[1].getObject(0), curTime);
+        updateFirstValue(column[1].getObject(i), curTime);
       }
     }
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/LastValueAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/LastValueAccumulator.java
index 901759b687..8f636e5594 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/LastValueAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/LastValueAccumulator.java
@@ -41,7 +41,7 @@ public class LastValueAccumulator implements Accumulator {
     for (int i = 0; i < column[0].getPositionCount(); i++) {
       long curTime = column[0].getLong(i);
       if (curTime >= timeRange.getMin() && curTime < timeRange.getMax()) {
-        updateLastValue(column[1].getObject(0), curTime);
+        updateLastValue(column[1].getObject(i), curTime);
       }
     }
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/LastValueDescAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/LastValueDescAccumulator.java
index 3d3f61644f..6cc8965251 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/LastValueDescAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/LastValueDescAccumulator.java
@@ -34,9 +34,12 @@ public class LastValueDescAccumulator extends 
LastValueAccumulator {
   // Column should be like: | Time | Value |
   @Override
   public void addInput(Column[] column, TimeRange timeRange) {
-    long curTime = column[0].getLong(0);
-    if (curTime < timeRange.getMax() && curTime >= timeRange.getMin()) {
-      updateLastValue(column[1].getObject(0), curTime);
+    // Data inside tsBlock is still in ascending order, we have to traverse 
the first tsBlock
+    for (int i = 0; i < column[0].getPositionCount(); i++) {
+      long curTime = column[0].getLong(i);
+      if (curTime >= timeRange.getMin() && curTime < timeRange.getMax()) {
+        updateLastValue(column[1].getObject(i), curTime);
+      }
     }
   }
 
@@ -45,6 +48,12 @@ public class LastValueDescAccumulator extends 
LastValueAccumulator {
     return hasCandidateResult;
   }
 
+  @Override
+  public void reset() {
+    hasCandidateResult = false;
+    super.reset();
+  }
+
   protected void updateLastValue(Object value, long curTime) {
     hasCandidateResult = true;
     super.updateLastValue(value, curTime);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MaxTimeDescAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MaxTimeDescAccumulator.java
index 01fb65f541..03f99eb61c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MaxTimeDescAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MaxTimeDescAccumulator.java
@@ -29,9 +29,12 @@ public class MaxTimeDescAccumulator extends 
MaxTimeAccumulator {
   // Column should be like: | Time |
   @Override
   public void addInput(Column[] column, TimeRange timeRange) {
-    long curTime = column[0].getLong(0);
-    if (curTime < timeRange.getMax() && curTime >= timeRange.getMin()) {
-      updateMaxTime(curTime);
+    // Data inside tsBlock is still in ascending order, we have to traverse 
the tsBlock
+    for (int i = 0; i < column[0].getPositionCount(); i++) {
+      long curTime = column[0].getLong(i);
+      if (curTime >= timeRange.getMin() && curTime < timeRange.getMax()) {
+        updateMaxTime(curTime);
+      }
     }
   }
 
@@ -40,6 +43,12 @@ public class MaxTimeDescAccumulator extends 
MaxTimeAccumulator {
     return hasCandidateResult;
   }
 
+  @Override
+  public void reset() {
+    hasCandidateResult = false;
+    super.reset();
+  }
+
   protected void updateMaxTime(long curTime) {
     hasCandidateResult = true;
     super.updateMaxTime(curTime);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MinTimeAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MinTimeAccumulator.java
index b80adbe470..0001a9d791 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MinTimeAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MinTimeAccumulator.java
@@ -74,6 +74,8 @@ public class MinTimeAccumulator implements Accumulator {
 
   @Override
   public void reset() {
+
+    hasCandidateResult = false;
     this.minTime = Long.MAX_VALUE;
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MinValueAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MinValueAccumulator.java
index 97f46724ae..7a6b7b73fe 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MinValueAccumulator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MinValueAccumulator.java
@@ -30,7 +30,7 @@ import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
 public class MinValueAccumulator implements Accumulator {
 
   private TsPrimitiveType minResult;
-  private boolean hasCandidateResult;
+  private boolean hasCandidateResult = false;
 
   public MinValueAccumulator(TSDataType seriesDataType) {
     this.minResult = TsPrimitiveType.getByType(seriesDataType);
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesAggregateScanOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesAggregateScanOperatorTest.java
index 2b4d7c4c7c..8afac0e69b 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesAggregateScanOperatorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesAggregateScanOperatorTest.java
@@ -50,6 +50,7 @@ import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 import com.google.common.collect.Sets;
 import org.junit.After;
 import org.junit.Before;
+import org.junit.Ignore;
 import org.junit.Test;
 
 import java.io.IOException;
@@ -89,16 +90,12 @@ public class SeriesAggregateScanOperatorTest {
 
   @Test
   public void testAggregationWithoutTimeFilter() throws IllegalPathException {
+    List<AggregationType> aggregationTypes = 
Collections.singletonList(AggregationType.COUNT);
+    List<Aggregator> aggregators = new ArrayList<>();
+    AccumulatorFactory.createAccumulators(aggregationTypes, TSDataType.INT32, 
true)
+        .forEach(o -> aggregators.add(new Aggregator(o, 
AggregationStep.SINGLE)));
     SeriesAggregateScanOperator seriesAggregateScanOperator =
-        initSeriesAggregateScanOperator(
-            Collections.singletonList(
-                new Aggregator(
-                    AccumulatorFactory.createAccumulator(
-                        AggregationType.COUNT, TSDataType.INT32, true),
-                    AggregationStep.SINGLE)),
-            null,
-            true,
-            null);
+        initSeriesAggregateScanOperator(aggregators, null, true, null);
     int count = 0;
     while (seriesAggregateScanOperator.hasNext()) {
       TsBlock resultTsBlock = seriesAggregateScanOperator.next();
@@ -113,8 +110,11 @@ public class SeriesAggregateScanOperatorTest {
     List<AggregationType> aggregationTypes = new ArrayList<>();
     aggregationTypes.add(AggregationType.COUNT);
     aggregationTypes.add(AggregationType.SUM);
+    List<Aggregator> aggregators = new ArrayList<>();
+    AccumulatorFactory.createAccumulators(aggregationTypes, TSDataType.INT32, 
true)
+        .forEach(o -> aggregators.add(new Aggregator(o, 
AggregationStep.SINGLE)));
     SeriesAggregateScanOperator seriesAggregateScanOperator =
-        initSeriesAggregateScanOperator(null, null, true, null);
+        initSeriesAggregateScanOperator(aggregators, null, true, null);
     int count = 0;
     while (seriesAggregateScanOperator.hasNext()) {
       TsBlock resultTsBlock = seriesAggregateScanOperator.next();
@@ -134,8 +134,41 @@ public class SeriesAggregateScanOperatorTest {
     aggregationTypes.add(AggregationType.MAX_TIME);
     aggregationTypes.add(AggregationType.MAX_VALUE);
     aggregationTypes.add(AggregationType.MIN_VALUE);
+    List<Aggregator> aggregators = new ArrayList<>();
+    AccumulatorFactory.createAccumulators(aggregationTypes, TSDataType.INT32, 
true)
+        .forEach(o -> aggregators.add(new Aggregator(o, 
AggregationStep.SINGLE)));
     SeriesAggregateScanOperator seriesAggregateScanOperator =
-        initSeriesAggregateScanOperator(null, null, true, null);
+        initSeriesAggregateScanOperator(aggregators, null, true, null);
+    int count = 0;
+    while (seriesAggregateScanOperator.hasNext()) {
+      TsBlock resultTsBlock = seriesAggregateScanOperator.next();
+      assertEquals(20000, resultTsBlock.getColumn(0).getInt(0));
+      assertEquals(10499, resultTsBlock.getColumn(1).getInt(0));
+      assertEquals(0, resultTsBlock.getColumn(2).getLong(0));
+      assertEquals(499, resultTsBlock.getColumn(3).getLong(0));
+      assertEquals(20199, resultTsBlock.getColumn(4).getInt(0));
+      assertEquals(260, resultTsBlock.getColumn(5).getInt(0));
+      count++;
+    }
+    assertEquals(1, count);
+  }
+
+  @Ignore
+  @Test
+  public void testMultiAggregationFuncWithoutTimeFilterOrderByTimeDesc()
+      throws IllegalPathException {
+    List<AggregationType> aggregationTypes = new ArrayList<>();
+    aggregationTypes.add(AggregationType.FIRST_VALUE);
+    aggregationTypes.add(AggregationType.LAST_VALUE);
+    aggregationTypes.add(AggregationType.MIN_TIME);
+    aggregationTypes.add(AggregationType.MAX_TIME);
+    aggregationTypes.add(AggregationType.MAX_VALUE);
+    aggregationTypes.add(AggregationType.MIN_VALUE);
+    List<Aggregator> aggregators = new ArrayList<>();
+    AccumulatorFactory.createAccumulators(aggregationTypes, TSDataType.INT32, 
false)
+        .forEach(o -> aggregators.add(new Aggregator(o, 
AggregationStep.SINGLE)));
+    SeriesAggregateScanOperator seriesAggregateScanOperator =
+        initSeriesAggregateScanOperator(aggregators, null, false, null);
     int count = 0;
     while (seriesAggregateScanOperator.hasNext()) {
       TsBlock resultTsBlock = seriesAggregateScanOperator.next();
@@ -152,9 +185,13 @@ public class SeriesAggregateScanOperatorTest {
 
   @Test
   public void testAggregationWithTimeFilter1() throws IllegalPathException {
+    List<AggregationType> aggregationTypes = 
Collections.singletonList(AggregationType.COUNT);
+    List<Aggregator> aggregators = new ArrayList<>();
+    AccumulatorFactory.createAccumulators(aggregationTypes, TSDataType.INT32, 
true)
+        .forEach(o -> aggregators.add(new Aggregator(o, 
AggregationStep.SINGLE)));
     Filter timeFilter = TimeFilter.gtEq(120);
     SeriesAggregateScanOperator seriesAggregateScanOperator =
-        initSeriesAggregateScanOperator(null, timeFilter, true, null);
+        initSeriesAggregateScanOperator(aggregators, timeFilter, true, null);
     int count = 0;
     while (seriesAggregateScanOperator.hasNext()) {
       TsBlock resultTsBlock = seriesAggregateScanOperator.next();
@@ -167,8 +204,12 @@ public class SeriesAggregateScanOperatorTest {
   @Test
   public void testAggregationWithTimeFilter2() throws IllegalPathException {
     Filter timeFilter = TimeFilter.ltEq(379);
+    List<AggregationType> aggregationTypes = 
Collections.singletonList(AggregationType.COUNT);
+    List<Aggregator> aggregators = new ArrayList<>();
+    AccumulatorFactory.createAccumulators(aggregationTypes, TSDataType.INT32, 
true)
+        .forEach(o -> aggregators.add(new Aggregator(o, 
AggregationStep.SINGLE)));
     SeriesAggregateScanOperator seriesAggregateScanOperator =
-        initSeriesAggregateScanOperator(null, timeFilter, true, null);
+        initSeriesAggregateScanOperator(aggregators, timeFilter, true, null);
     int count = 0;
     while (seriesAggregateScanOperator.hasNext()) {
       TsBlock resultTsBlock = seriesAggregateScanOperator.next();
@@ -181,8 +222,12 @@ public class SeriesAggregateScanOperatorTest {
   @Test
   public void testAggregationWithTimeFilter3() throws IllegalPathException {
     Filter timeFilter = new AndFilter(TimeFilter.gtEq(100), 
TimeFilter.ltEq(399));
+    List<AggregationType> aggregationTypes = 
Collections.singletonList(AggregationType.COUNT);
+    List<Aggregator> aggregators = new ArrayList<>();
+    AccumulatorFactory.createAccumulators(aggregationTypes, TSDataType.INT32, 
true)
+        .forEach(o -> aggregators.add(new Aggregator(o, 
AggregationStep.SINGLE)));
     SeriesAggregateScanOperator seriesAggregateScanOperator =
-        initSeriesAggregateScanOperator(null, timeFilter, true, null);
+        initSeriesAggregateScanOperator(aggregators, timeFilter, true, null);
     int count = 0;
     while (seriesAggregateScanOperator.hasNext()) {
       TsBlock resultTsBlock = seriesAggregateScanOperator.next();
@@ -201,9 +246,12 @@ public class SeriesAggregateScanOperatorTest {
     aggregationTypes.add(AggregationType.MAX_TIME);
     aggregationTypes.add(AggregationType.MAX_VALUE);
     aggregationTypes.add(AggregationType.MIN_VALUE);
+    List<Aggregator> aggregators = new ArrayList<>();
+    AccumulatorFactory.createAccumulators(aggregationTypes, TSDataType.INT32, 
true)
+        .forEach(o -> aggregators.add(new Aggregator(o, 
AggregationStep.SINGLE)));
     Filter timeFilter = new AndFilter(TimeFilter.gtEq(100), 
TimeFilter.ltEq(399));
     SeriesAggregateScanOperator seriesAggregateScanOperator =
-        initSeriesAggregateScanOperator(null, timeFilter, true, null);
+        initSeriesAggregateScanOperator(aggregators, timeFilter, true, null);
     int count = 0;
     while (seriesAggregateScanOperator.hasNext()) {
       TsBlock resultTsBlock = seriesAggregateScanOperator.next();
@@ -222,8 +270,12 @@ public class SeriesAggregateScanOperatorTest {
   public void testGroupByWithoutGlobalTimeFilter() throws IllegalPathException 
{
     int[] result = new int[] {100, 100, 100, 100};
     GroupByTimeParameter groupByTimeParameter = new GroupByTimeParameter(0, 
399, 100, 100, true);
+    List<AggregationType> aggregationTypes = 
Collections.singletonList(AggregationType.COUNT);
+    List<Aggregator> aggregators = new ArrayList<>();
+    AccumulatorFactory.createAccumulators(aggregationTypes, TSDataType.INT32, 
true)
+        .forEach(o -> aggregators.add(new Aggregator(o, 
AggregationStep.SINGLE)));
     SeriesAggregateScanOperator seriesAggregateScanOperator =
-        initSeriesAggregateScanOperator(null, null, true, 
groupByTimeParameter);
+        initSeriesAggregateScanOperator(aggregators, null, true, 
groupByTimeParameter);
     int count = 0;
     while (seriesAggregateScanOperator.hasNext()) {
       TsBlock resultTsBlock = seriesAggregateScanOperator.next();
@@ -239,8 +291,12 @@ public class SeriesAggregateScanOperatorTest {
     int[] result = new int[] {0, 80, 100, 80};
     Filter timeFilter = new AndFilter(TimeFilter.gtEq(120), 
TimeFilter.ltEq(379));
     GroupByTimeParameter groupByTimeParameter = new GroupByTimeParameter(0, 
399, 100, 100, true);
+    List<AggregationType> aggregationTypes = 
Collections.singletonList(AggregationType.COUNT);
+    List<Aggregator> aggregators = new ArrayList<>();
+    AccumulatorFactory.createAccumulators(aggregationTypes, TSDataType.INT32, 
true)
+        .forEach(o -> aggregators.add(new Aggregator(o, 
AggregationStep.SINGLE)));
     SeriesAggregateScanOperator seriesAggregateScanOperator =
-        initSeriesAggregateScanOperator(null, timeFilter, true, 
groupByTimeParameter);
+        initSeriesAggregateScanOperator(aggregators, timeFilter, true, 
groupByTimeParameter);
     int count = 0;
     while (seriesAggregateScanOperator.hasNext()) {
       TsBlock resultTsBlock = seriesAggregateScanOperator.next();
@@ -266,8 +322,11 @@ public class SeriesAggregateScanOperatorTest {
     aggregationTypes.add(AggregationType.MAX_VALUE);
     aggregationTypes.add(AggregationType.MIN_VALUE);
     GroupByTimeParameter groupByTimeParameter = new GroupByTimeParameter(0, 
399, 100, 100, true);
+    List<Aggregator> aggregators = new ArrayList<>();
+    AccumulatorFactory.createAccumulators(aggregationTypes, TSDataType.INT32, 
true)
+        .forEach(o -> aggregators.add(new Aggregator(o, 
AggregationStep.SINGLE)));
     SeriesAggregateScanOperator seriesAggregateScanOperator =
-        initSeriesAggregateScanOperator(null, null, true, 
groupByTimeParameter);
+        initSeriesAggregateScanOperator(aggregators, null, true, 
groupByTimeParameter);
     int count = 0;
     while (seriesAggregateScanOperator.hasNext()) {
       TsBlock resultTsBlock = seriesAggregateScanOperator.next();
@@ -281,12 +340,50 @@ public class SeriesAggregateScanOperatorTest {
     assertEquals(4, count);
   }
 
+  @Ignore
+  @Test
+  public void testGroupByWithMultiFunctionOrderByTimeDesc() throws 
IllegalPathException {
+    int[][] result =
+        new int[][] {
+          {20000, 20100, 10200, 10300},
+          {20099, 20199, 299, 399},
+          {20099, 20199, 10259, 10379},
+          {20000, 20100, 260, 380}
+        };
+    List<AggregationType> aggregationTypes = new ArrayList<>();
+    aggregationTypes.add(AggregationType.FIRST_VALUE);
+    aggregationTypes.add(AggregationType.LAST_VALUE);
+    aggregationTypes.add(AggregationType.MAX_VALUE);
+    aggregationTypes.add(AggregationType.MIN_VALUE);
+    GroupByTimeParameter groupByTimeParameter = new GroupByTimeParameter(0, 
399, 100, 100, true);
+    List<Aggregator> aggregators = new ArrayList<>();
+    AccumulatorFactory.createAccumulators(aggregationTypes, TSDataType.INT32, 
false)
+        .forEach(o -> aggregators.add(new Aggregator(o, 
AggregationStep.SINGLE)));
+    SeriesAggregateScanOperator seriesAggregateScanOperator =
+        initSeriesAggregateScanOperator(aggregators, null, false, 
groupByTimeParameter);
+    int count = 0;
+    while (seriesAggregateScanOperator.hasNext()) {
+      TsBlock resultTsBlock = seriesAggregateScanOperator.next();
+      assertEquals(100 * (3 - count), 
resultTsBlock.getTimeColumn().getLong(0));
+      assertEquals(result[0][3 - count], resultTsBlock.getColumn(0).getInt(0));
+      assertEquals(result[1][3 - count], resultTsBlock.getColumn(1).getInt(0));
+      assertEquals(result[2][3 - count], resultTsBlock.getColumn(2).getInt(0));
+      assertEquals(result[3][3 - count], resultTsBlock.getColumn(3).getInt(0));
+      count++;
+    }
+    assertEquals(4, count);
+  }
+
   @Test
   public void testGroupBySlidingTimeWindow() throws IllegalPathException {
     int[] result = new int[] {50, 50, 50, 50, 50, 50, 50, 50};
     GroupByTimeParameter groupByTimeParameter = new GroupByTimeParameter(0, 
399, 100, 50, true);
+    List<AggregationType> aggregationTypes = 
Collections.singletonList(AggregationType.COUNT);
+    List<Aggregator> aggregators = new ArrayList<>();
+    AccumulatorFactory.createAccumulators(aggregationTypes, TSDataType.INT32, 
true)
+        .forEach(o -> aggregators.add(new Aggregator(o, 
AggregationStep.SINGLE)));
     SeriesAggregateScanOperator seriesAggregateScanOperator =
-        initSeriesAggregateScanOperator(null, null, true, 
groupByTimeParameter);
+        initSeriesAggregateScanOperator(aggregators, null, true, 
groupByTimeParameter);
     int count = 0;
     while (seriesAggregateScanOperator.hasNext()) {
       TsBlock resultTsBlock = seriesAggregateScanOperator.next();
@@ -302,8 +399,12 @@ public class SeriesAggregateScanOperatorTest {
     int[] timeColumn = new int[] {0, 20, 30, 50, 60, 80, 90, 110, 120, 140};
     int[] result = new int[] {20, 10, 20, 10, 20, 10, 20, 10, 20, 9};
     GroupByTimeParameter groupByTimeParameter = new GroupByTimeParameter(0, 
149, 50, 30, true);
+    List<AggregationType> aggregationTypes = 
Collections.singletonList(AggregationType.COUNT);
+    List<Aggregator> aggregators = new ArrayList<>();
+    AccumulatorFactory.createAccumulators(aggregationTypes, TSDataType.INT32, 
true)
+        .forEach(o -> aggregators.add(new Aggregator(o, 
AggregationStep.SINGLE)));
     SeriesAggregateScanOperator seriesAggregateScanOperator =
-        initSeriesAggregateScanOperator(null, null, true, 
groupByTimeParameter);
+        initSeriesAggregateScanOperator(aggregators, null, true, 
groupByTimeParameter);
     int count = 0;
     while (seriesAggregateScanOperator.hasNext()) {
       TsBlock resultTsBlock = seriesAggregateScanOperator.next();
@@ -330,8 +431,11 @@ public class SeriesAggregateScanOperatorTest {
     aggregationTypes.add(AggregationType.MAX_VALUE);
     aggregationTypes.add(AggregationType.MIN_VALUE);
     GroupByTimeParameter groupByTimeParameter = new GroupByTimeParameter(0, 
149, 50, 30, true);
+    List<Aggregator> aggregators = new ArrayList<>();
+    AccumulatorFactory.createAccumulators(aggregationTypes, TSDataType.INT32, 
true)
+        .forEach(o -> aggregators.add(new Aggregator(o, 
AggregationStep.SINGLE)));
     SeriesAggregateScanOperator seriesAggregateScanOperator =
-        initSeriesAggregateScanOperator(null, null, true, 
groupByTimeParameter);
+        initSeriesAggregateScanOperator(aggregators, null, true, 
groupByTimeParameter);
     int count = 0;
     while (seriesAggregateScanOperator.hasNext()) {
       TsBlock resultTsBlock = seriesAggregateScanOperator.next();
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/TsPrimitiveType.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/TsPrimitiveType.java
index 8251fe9438..938dd4fd60 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/TsPrimitiveType.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/TsPrimitiveType.java
@@ -192,6 +192,7 @@ public abstract class TsPrimitiveType implements 
Serializable {
     public void setObject(Object val) {
       if (val instanceof Binary) {
         setBinary((Binary) val);
+        return;
       }
       throw new UnSupportedDataTypeException("TsBoolean can only be set Binary 
value");
     }
@@ -263,6 +264,7 @@ public abstract class TsPrimitiveType implements 
Serializable {
     public void setObject(Object val) {
       if (val instanceof Integer) {
         setInt((Integer) val);
+        return;
       }
       throw new UnSupportedDataTypeException("TsInt can only be set Integer 
value");
     }
@@ -334,6 +336,7 @@ public abstract class TsPrimitiveType implements 
Serializable {
     public void setObject(Object val) {
       if (val instanceof Long) {
         setLong((Long) val);
+        return;
       }
       throw new UnSupportedDataTypeException("TsLong can only be set Long 
value");
     }
@@ -405,6 +408,7 @@ public abstract class TsPrimitiveType implements 
Serializable {
     public void setObject(Object val) {
       if (val instanceof Float) {
         setFloat((Float) val);
+        return;
       }
       throw new UnSupportedDataTypeException("TsFloat can only be set float 
value");
     }
@@ -476,6 +480,7 @@ public abstract class TsPrimitiveType implements 
Serializable {
     public void setObject(Object val) {
       if (val instanceof Double) {
         setDouble((Double) val);
+        return;
       }
       throw new UnSupportedDataTypeException("TsDouble can only be set Double 
value");
     }
@@ -547,6 +552,7 @@ public abstract class TsPrimitiveType implements 
Serializable {
     public void setObject(Object val) {
       if (val instanceof Binary) {
         setBinary((Binary) val);
+        return;
       }
       throw new UnSupportedDataTypeException("TsBinary can only be set Binary 
value");
     }
@@ -618,6 +624,7 @@ public abstract class TsPrimitiveType implements 
Serializable {
     public void setObject(Object val) {
       if (val instanceof TsPrimitiveType[]) {
         setVector((TsPrimitiveType[]) val);
+        return;
       }
       throw new UnSupportedDataTypeException("TsVector can only be set 
TsPrimitiveType[] value");
     }

Reply via email to