This is an automated email from the ASF dual-hosted git repository. xiangweiwei pushed a commit to branch aggregator in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 5843336eeb9151c303f40bc80455af8181582d66 Author: Alima777 <[email protected]> AuthorDate: Sun May 1 16:56:33 2022 +0800 add unit tests --- .../operator/aggregation/AccumulatorFactory.java | 12 ++ .../db/mpp/operator/aggregation/Aggregator.java | 4 +- .../aggregation/FirstValueAccumulator.java | 1 + .../aggregation/FirstValueDescAccumulator.java | 2 +- .../operator/aggregation/LastValueAccumulator.java | 2 +- .../aggregation/LastValueDescAccumulator.java | 15 ++- .../aggregation/MaxTimeDescAccumulator.java | 15 ++- .../operator/aggregation/MinTimeAccumulator.java | 2 + .../operator/aggregation/MinValueAccumulator.java | 2 +- .../operator/SeriesAggregateScanOperatorTest.java | 146 ++++++++++++++++++--- .../apache/iotdb/tsfile/utils/TsPrimitiveType.java | 7 + 11 files changed, 176 insertions(+), 32 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AccumulatorFactory.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AccumulatorFactory.java index 10619b9e35..64ad9fc917 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AccumulatorFactory.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AccumulatorFactory.java @@ -22,6 +22,9 @@ package org.apache.iotdb.db.mpp.operator.aggregation; import org.apache.iotdb.db.query.aggregation.AggregationType; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import java.util.ArrayList; +import java.util.List; + public class AccumulatorFactory { // TODO: Are we going to create different seriesScanOperator based on order by sequence? @@ -56,4 +59,13 @@ public class AccumulatorFactory { throw new IllegalArgumentException("Invalid Aggregation function: " + aggregationType); } } + + public static List<Accumulator> createAccumulators( + List<AggregationType> aggregationTypes, TSDataType tsDataType, boolean ascending) { + List<Accumulator> accumulators = new ArrayList<>(); + for (AggregationType aggregationType : aggregationTypes) { + accumulators.add(createAccumulator(aggregationType, tsDataType, ascending)); + } + return accumulators; + } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/Aggregator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/Aggregator.java index 617d89d4ae..5cad7948c3 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/Aggregator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/Aggregator.java @@ -97,9 +97,9 @@ public class Aggregator { public TSDataType[] getOutputType() { if (step.isOutputPartial()) { - return new TSDataType[] {accumulator.getFinalType()}; - } else { return accumulator.getIntermediateType(); + } else { + return new TSDataType[] {accumulator.getFinalType()}; } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/FirstValueAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/FirstValueAccumulator.java index 8fa0801faf..bdb4ff49e8 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/FirstValueAccumulator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/FirstValueAccumulator.java @@ -80,6 +80,7 @@ public class FirstValueAccumulator implements Accumulator { @Override public void reset() { + hasCandidateResult = false; this.minTime = Long.MAX_VALUE; this.firstValue.reset(); } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/FirstValueDescAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/FirstValueDescAccumulator.java index 87b939438e..674e251f6b 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/FirstValueDescAccumulator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/FirstValueDescAccumulator.java @@ -35,7 +35,7 @@ public class FirstValueDescAccumulator extends FirstValueAccumulator { for (int i = 0; i < column[0].getPositionCount(); i++) { long curTime = column[0].getLong(i); if (curTime >= timeRange.getMin() && curTime < timeRange.getMax()) { - updateFirstValue(column[1].getObject(0), curTime); + updateFirstValue(column[1].getObject(i), curTime); } } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/LastValueAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/LastValueAccumulator.java index 901759b687..8f636e5594 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/LastValueAccumulator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/LastValueAccumulator.java @@ -41,7 +41,7 @@ public class LastValueAccumulator implements Accumulator { for (int i = 0; i < column[0].getPositionCount(); i++) { long curTime = column[0].getLong(i); if (curTime >= timeRange.getMin() && curTime < timeRange.getMax()) { - updateLastValue(column[1].getObject(0), curTime); + updateLastValue(column[1].getObject(i), curTime); } } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/LastValueDescAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/LastValueDescAccumulator.java index 3d3f61644f..6cc8965251 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/LastValueDescAccumulator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/LastValueDescAccumulator.java @@ -34,9 +34,12 @@ public class LastValueDescAccumulator extends LastValueAccumulator { // Column should be like: | Time | Value | @Override public void addInput(Column[] column, TimeRange timeRange) { - long curTime = column[0].getLong(0); - if (curTime < timeRange.getMax() && curTime >= timeRange.getMin()) { - updateLastValue(column[1].getObject(0), curTime); + // Data inside tsBlock is still in ascending order, we have to traverse the first tsBlock + for (int i = 0; i < column[0].getPositionCount(); i++) { + long curTime = column[0].getLong(i); + if (curTime >= timeRange.getMin() && curTime < timeRange.getMax()) { + updateLastValue(column[1].getObject(i), curTime); + } } } @@ -45,6 +48,12 @@ public class LastValueDescAccumulator extends LastValueAccumulator { return hasCandidateResult; } + @Override + public void reset() { + hasCandidateResult = false; + super.reset(); + } + protected void updateLastValue(Object value, long curTime) { hasCandidateResult = true; super.updateLastValue(value, curTime); diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MaxTimeDescAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MaxTimeDescAccumulator.java index 01fb65f541..03f99eb61c 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MaxTimeDescAccumulator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MaxTimeDescAccumulator.java @@ -29,9 +29,12 @@ public class MaxTimeDescAccumulator extends MaxTimeAccumulator { // Column should be like: | Time | @Override public void addInput(Column[] column, TimeRange timeRange) { - long curTime = column[0].getLong(0); - if (curTime < timeRange.getMax() && curTime >= timeRange.getMin()) { - updateMaxTime(curTime); + // Data inside tsBlock is still in ascending order, we have to traverse the tsBlock + for (int i = 0; i < column[0].getPositionCount(); i++) { + long curTime = column[0].getLong(i); + if (curTime >= timeRange.getMin() && curTime < timeRange.getMax()) { + updateMaxTime(curTime); + } } } @@ -40,6 +43,12 @@ public class MaxTimeDescAccumulator extends MaxTimeAccumulator { return hasCandidateResult; } + @Override + public void reset() { + hasCandidateResult = false; + super.reset(); + } + protected void updateMaxTime(long curTime) { hasCandidateResult = true; super.updateMaxTime(curTime); diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MinTimeAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MinTimeAccumulator.java index b80adbe470..0001a9d791 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MinTimeAccumulator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MinTimeAccumulator.java @@ -74,6 +74,8 @@ public class MinTimeAccumulator implements Accumulator { @Override public void reset() { + + hasCandidateResult = false; this.minTime = Long.MAX_VALUE; } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MinValueAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MinValueAccumulator.java index 97f46724ae..7a6b7b73fe 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MinValueAccumulator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MinValueAccumulator.java @@ -30,7 +30,7 @@ import org.apache.iotdb.tsfile.utils.TsPrimitiveType; public class MinValueAccumulator implements Accumulator { private TsPrimitiveType minResult; - private boolean hasCandidateResult; + private boolean hasCandidateResult = false; public MinValueAccumulator(TSDataType seriesDataType) { this.minResult = TsPrimitiveType.getByType(seriesDataType); diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesAggregateScanOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesAggregateScanOperatorTest.java index 2b4d7c4c7c..8afac0e69b 100644 --- a/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesAggregateScanOperatorTest.java +++ b/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesAggregateScanOperatorTest.java @@ -50,6 +50,7 @@ import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; import com.google.common.collect.Sets; import org.junit.After; import org.junit.Before; +import org.junit.Ignore; import org.junit.Test; import java.io.IOException; @@ -89,16 +90,12 @@ public class SeriesAggregateScanOperatorTest { @Test public void testAggregationWithoutTimeFilter() throws IllegalPathException { + List<AggregationType> aggregationTypes = Collections.singletonList(AggregationType.COUNT); + List<Aggregator> aggregators = new ArrayList<>(); + AccumulatorFactory.createAccumulators(aggregationTypes, TSDataType.INT32, true) + .forEach(o -> aggregators.add(new Aggregator(o, AggregationStep.SINGLE))); SeriesAggregateScanOperator seriesAggregateScanOperator = - initSeriesAggregateScanOperator( - Collections.singletonList( - new Aggregator( - AccumulatorFactory.createAccumulator( - AggregationType.COUNT, TSDataType.INT32, true), - AggregationStep.SINGLE)), - null, - true, - null); + initSeriesAggregateScanOperator(aggregators, null, true, null); int count = 0; while (seriesAggregateScanOperator.hasNext()) { TsBlock resultTsBlock = seriesAggregateScanOperator.next(); @@ -113,8 +110,11 @@ public class SeriesAggregateScanOperatorTest { List<AggregationType> aggregationTypes = new ArrayList<>(); aggregationTypes.add(AggregationType.COUNT); aggregationTypes.add(AggregationType.SUM); + List<Aggregator> aggregators = new ArrayList<>(); + AccumulatorFactory.createAccumulators(aggregationTypes, TSDataType.INT32, true) + .forEach(o -> aggregators.add(new Aggregator(o, AggregationStep.SINGLE))); SeriesAggregateScanOperator seriesAggregateScanOperator = - initSeriesAggregateScanOperator(null, null, true, null); + initSeriesAggregateScanOperator(aggregators, null, true, null); int count = 0; while (seriesAggregateScanOperator.hasNext()) { TsBlock resultTsBlock = seriesAggregateScanOperator.next(); @@ -134,8 +134,41 @@ public class SeriesAggregateScanOperatorTest { aggregationTypes.add(AggregationType.MAX_TIME); aggregationTypes.add(AggregationType.MAX_VALUE); aggregationTypes.add(AggregationType.MIN_VALUE); + List<Aggregator> aggregators = new ArrayList<>(); + AccumulatorFactory.createAccumulators(aggregationTypes, TSDataType.INT32, true) + .forEach(o -> aggregators.add(new Aggregator(o, AggregationStep.SINGLE))); SeriesAggregateScanOperator seriesAggregateScanOperator = - initSeriesAggregateScanOperator(null, null, true, null); + initSeriesAggregateScanOperator(aggregators, null, true, null); + int count = 0; + while (seriesAggregateScanOperator.hasNext()) { + TsBlock resultTsBlock = seriesAggregateScanOperator.next(); + assertEquals(20000, resultTsBlock.getColumn(0).getInt(0)); + assertEquals(10499, resultTsBlock.getColumn(1).getInt(0)); + assertEquals(0, resultTsBlock.getColumn(2).getLong(0)); + assertEquals(499, resultTsBlock.getColumn(3).getLong(0)); + assertEquals(20199, resultTsBlock.getColumn(4).getInt(0)); + assertEquals(260, resultTsBlock.getColumn(5).getInt(0)); + count++; + } + assertEquals(1, count); + } + + @Ignore + @Test + public void testMultiAggregationFuncWithoutTimeFilterOrderByTimeDesc() + throws IllegalPathException { + List<AggregationType> aggregationTypes = new ArrayList<>(); + aggregationTypes.add(AggregationType.FIRST_VALUE); + aggregationTypes.add(AggregationType.LAST_VALUE); + aggregationTypes.add(AggregationType.MIN_TIME); + aggregationTypes.add(AggregationType.MAX_TIME); + aggregationTypes.add(AggregationType.MAX_VALUE); + aggregationTypes.add(AggregationType.MIN_VALUE); + List<Aggregator> aggregators = new ArrayList<>(); + AccumulatorFactory.createAccumulators(aggregationTypes, TSDataType.INT32, false) + .forEach(o -> aggregators.add(new Aggregator(o, AggregationStep.SINGLE))); + SeriesAggregateScanOperator seriesAggregateScanOperator = + initSeriesAggregateScanOperator(aggregators, null, false, null); int count = 0; while (seriesAggregateScanOperator.hasNext()) { TsBlock resultTsBlock = seriesAggregateScanOperator.next(); @@ -152,9 +185,13 @@ public class SeriesAggregateScanOperatorTest { @Test public void testAggregationWithTimeFilter1() throws IllegalPathException { + List<AggregationType> aggregationTypes = Collections.singletonList(AggregationType.COUNT); + List<Aggregator> aggregators = new ArrayList<>(); + AccumulatorFactory.createAccumulators(aggregationTypes, TSDataType.INT32, true) + .forEach(o -> aggregators.add(new Aggregator(o, AggregationStep.SINGLE))); Filter timeFilter = TimeFilter.gtEq(120); SeriesAggregateScanOperator seriesAggregateScanOperator = - initSeriesAggregateScanOperator(null, timeFilter, true, null); + initSeriesAggregateScanOperator(aggregators, timeFilter, true, null); int count = 0; while (seriesAggregateScanOperator.hasNext()) { TsBlock resultTsBlock = seriesAggregateScanOperator.next(); @@ -167,8 +204,12 @@ public class SeriesAggregateScanOperatorTest { @Test public void testAggregationWithTimeFilter2() throws IllegalPathException { Filter timeFilter = TimeFilter.ltEq(379); + List<AggregationType> aggregationTypes = Collections.singletonList(AggregationType.COUNT); + List<Aggregator> aggregators = new ArrayList<>(); + AccumulatorFactory.createAccumulators(aggregationTypes, TSDataType.INT32, true) + .forEach(o -> aggregators.add(new Aggregator(o, AggregationStep.SINGLE))); SeriesAggregateScanOperator seriesAggregateScanOperator = - initSeriesAggregateScanOperator(null, timeFilter, true, null); + initSeriesAggregateScanOperator(aggregators, timeFilter, true, null); int count = 0; while (seriesAggregateScanOperator.hasNext()) { TsBlock resultTsBlock = seriesAggregateScanOperator.next(); @@ -181,8 +222,12 @@ public class SeriesAggregateScanOperatorTest { @Test public void testAggregationWithTimeFilter3() throws IllegalPathException { Filter timeFilter = new AndFilter(TimeFilter.gtEq(100), TimeFilter.ltEq(399)); + List<AggregationType> aggregationTypes = Collections.singletonList(AggregationType.COUNT); + List<Aggregator> aggregators = new ArrayList<>(); + AccumulatorFactory.createAccumulators(aggregationTypes, TSDataType.INT32, true) + .forEach(o -> aggregators.add(new Aggregator(o, AggregationStep.SINGLE))); SeriesAggregateScanOperator seriesAggregateScanOperator = - initSeriesAggregateScanOperator(null, timeFilter, true, null); + initSeriesAggregateScanOperator(aggregators, timeFilter, true, null); int count = 0; while (seriesAggregateScanOperator.hasNext()) { TsBlock resultTsBlock = seriesAggregateScanOperator.next(); @@ -201,9 +246,12 @@ public class SeriesAggregateScanOperatorTest { aggregationTypes.add(AggregationType.MAX_TIME); aggregationTypes.add(AggregationType.MAX_VALUE); aggregationTypes.add(AggregationType.MIN_VALUE); + List<Aggregator> aggregators = new ArrayList<>(); + AccumulatorFactory.createAccumulators(aggregationTypes, TSDataType.INT32, true) + .forEach(o -> aggregators.add(new Aggregator(o, AggregationStep.SINGLE))); Filter timeFilter = new AndFilter(TimeFilter.gtEq(100), TimeFilter.ltEq(399)); SeriesAggregateScanOperator seriesAggregateScanOperator = - initSeriesAggregateScanOperator(null, timeFilter, true, null); + initSeriesAggregateScanOperator(aggregators, timeFilter, true, null); int count = 0; while (seriesAggregateScanOperator.hasNext()) { TsBlock resultTsBlock = seriesAggregateScanOperator.next(); @@ -222,8 +270,12 @@ public class SeriesAggregateScanOperatorTest { public void testGroupByWithoutGlobalTimeFilter() throws IllegalPathException { int[] result = new int[] {100, 100, 100, 100}; GroupByTimeParameter groupByTimeParameter = new GroupByTimeParameter(0, 399, 100, 100, true); + List<AggregationType> aggregationTypes = Collections.singletonList(AggregationType.COUNT); + List<Aggregator> aggregators = new ArrayList<>(); + AccumulatorFactory.createAccumulators(aggregationTypes, TSDataType.INT32, true) + .forEach(o -> aggregators.add(new Aggregator(o, AggregationStep.SINGLE))); SeriesAggregateScanOperator seriesAggregateScanOperator = - initSeriesAggregateScanOperator(null, null, true, groupByTimeParameter); + initSeriesAggregateScanOperator(aggregators, null, true, groupByTimeParameter); int count = 0; while (seriesAggregateScanOperator.hasNext()) { TsBlock resultTsBlock = seriesAggregateScanOperator.next(); @@ -239,8 +291,12 @@ public class SeriesAggregateScanOperatorTest { int[] result = new int[] {0, 80, 100, 80}; Filter timeFilter = new AndFilter(TimeFilter.gtEq(120), TimeFilter.ltEq(379)); GroupByTimeParameter groupByTimeParameter = new GroupByTimeParameter(0, 399, 100, 100, true); + List<AggregationType> aggregationTypes = Collections.singletonList(AggregationType.COUNT); + List<Aggregator> aggregators = new ArrayList<>(); + AccumulatorFactory.createAccumulators(aggregationTypes, TSDataType.INT32, true) + .forEach(o -> aggregators.add(new Aggregator(o, AggregationStep.SINGLE))); SeriesAggregateScanOperator seriesAggregateScanOperator = - initSeriesAggregateScanOperator(null, timeFilter, true, groupByTimeParameter); + initSeriesAggregateScanOperator(aggregators, timeFilter, true, groupByTimeParameter); int count = 0; while (seriesAggregateScanOperator.hasNext()) { TsBlock resultTsBlock = seriesAggregateScanOperator.next(); @@ -266,8 +322,11 @@ public class SeriesAggregateScanOperatorTest { aggregationTypes.add(AggregationType.MAX_VALUE); aggregationTypes.add(AggregationType.MIN_VALUE); GroupByTimeParameter groupByTimeParameter = new GroupByTimeParameter(0, 399, 100, 100, true); + List<Aggregator> aggregators = new ArrayList<>(); + AccumulatorFactory.createAccumulators(aggregationTypes, TSDataType.INT32, true) + .forEach(o -> aggregators.add(new Aggregator(o, AggregationStep.SINGLE))); SeriesAggregateScanOperator seriesAggregateScanOperator = - initSeriesAggregateScanOperator(null, null, true, groupByTimeParameter); + initSeriesAggregateScanOperator(aggregators, null, true, groupByTimeParameter); int count = 0; while (seriesAggregateScanOperator.hasNext()) { TsBlock resultTsBlock = seriesAggregateScanOperator.next(); @@ -281,12 +340,50 @@ public class SeriesAggregateScanOperatorTest { assertEquals(4, count); } + @Ignore + @Test + public void testGroupByWithMultiFunctionOrderByTimeDesc() throws IllegalPathException { + int[][] result = + new int[][] { + {20000, 20100, 10200, 10300}, + {20099, 20199, 299, 399}, + {20099, 20199, 10259, 10379}, + {20000, 20100, 260, 380} + }; + List<AggregationType> aggregationTypes = new ArrayList<>(); + aggregationTypes.add(AggregationType.FIRST_VALUE); + aggregationTypes.add(AggregationType.LAST_VALUE); + aggregationTypes.add(AggregationType.MAX_VALUE); + aggregationTypes.add(AggregationType.MIN_VALUE); + GroupByTimeParameter groupByTimeParameter = new GroupByTimeParameter(0, 399, 100, 100, true); + List<Aggregator> aggregators = new ArrayList<>(); + AccumulatorFactory.createAccumulators(aggregationTypes, TSDataType.INT32, false) + .forEach(o -> aggregators.add(new Aggregator(o, AggregationStep.SINGLE))); + SeriesAggregateScanOperator seriesAggregateScanOperator = + initSeriesAggregateScanOperator(aggregators, null, false, groupByTimeParameter); + int count = 0; + while (seriesAggregateScanOperator.hasNext()) { + TsBlock resultTsBlock = seriesAggregateScanOperator.next(); + assertEquals(100 * (3 - count), resultTsBlock.getTimeColumn().getLong(0)); + assertEquals(result[0][3 - count], resultTsBlock.getColumn(0).getInt(0)); + assertEquals(result[1][3 - count], resultTsBlock.getColumn(1).getInt(0)); + assertEquals(result[2][3 - count], resultTsBlock.getColumn(2).getInt(0)); + assertEquals(result[3][3 - count], resultTsBlock.getColumn(3).getInt(0)); + count++; + } + assertEquals(4, count); + } + @Test public void testGroupBySlidingTimeWindow() throws IllegalPathException { int[] result = new int[] {50, 50, 50, 50, 50, 50, 50, 50}; GroupByTimeParameter groupByTimeParameter = new GroupByTimeParameter(0, 399, 100, 50, true); + List<AggregationType> aggregationTypes = Collections.singletonList(AggregationType.COUNT); + List<Aggregator> aggregators = new ArrayList<>(); + AccumulatorFactory.createAccumulators(aggregationTypes, TSDataType.INT32, true) + .forEach(o -> aggregators.add(new Aggregator(o, AggregationStep.SINGLE))); SeriesAggregateScanOperator seriesAggregateScanOperator = - initSeriesAggregateScanOperator(null, null, true, groupByTimeParameter); + initSeriesAggregateScanOperator(aggregators, null, true, groupByTimeParameter); int count = 0; while (seriesAggregateScanOperator.hasNext()) { TsBlock resultTsBlock = seriesAggregateScanOperator.next(); @@ -302,8 +399,12 @@ public class SeriesAggregateScanOperatorTest { int[] timeColumn = new int[] {0, 20, 30, 50, 60, 80, 90, 110, 120, 140}; int[] result = new int[] {20, 10, 20, 10, 20, 10, 20, 10, 20, 9}; GroupByTimeParameter groupByTimeParameter = new GroupByTimeParameter(0, 149, 50, 30, true); + List<AggregationType> aggregationTypes = Collections.singletonList(AggregationType.COUNT); + List<Aggregator> aggregators = new ArrayList<>(); + AccumulatorFactory.createAccumulators(aggregationTypes, TSDataType.INT32, true) + .forEach(o -> aggregators.add(new Aggregator(o, AggregationStep.SINGLE))); SeriesAggregateScanOperator seriesAggregateScanOperator = - initSeriesAggregateScanOperator(null, null, true, groupByTimeParameter); + initSeriesAggregateScanOperator(aggregators, null, true, groupByTimeParameter); int count = 0; while (seriesAggregateScanOperator.hasNext()) { TsBlock resultTsBlock = seriesAggregateScanOperator.next(); @@ -330,8 +431,11 @@ public class SeriesAggregateScanOperatorTest { aggregationTypes.add(AggregationType.MAX_VALUE); aggregationTypes.add(AggregationType.MIN_VALUE); GroupByTimeParameter groupByTimeParameter = new GroupByTimeParameter(0, 149, 50, 30, true); + List<Aggregator> aggregators = new ArrayList<>(); + AccumulatorFactory.createAccumulators(aggregationTypes, TSDataType.INT32, true) + .forEach(o -> aggregators.add(new Aggregator(o, AggregationStep.SINGLE))); SeriesAggregateScanOperator seriesAggregateScanOperator = - initSeriesAggregateScanOperator(null, null, true, groupByTimeParameter); + initSeriesAggregateScanOperator(aggregators, null, true, groupByTimeParameter); int count = 0; while (seriesAggregateScanOperator.hasNext()) { TsBlock resultTsBlock = seriesAggregateScanOperator.next(); diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/TsPrimitiveType.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/TsPrimitiveType.java index 8251fe9438..938dd4fd60 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/TsPrimitiveType.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/TsPrimitiveType.java @@ -192,6 +192,7 @@ public abstract class TsPrimitiveType implements Serializable { public void setObject(Object val) { if (val instanceof Binary) { setBinary((Binary) val); + return; } throw new UnSupportedDataTypeException("TsBoolean can only be set Binary value"); } @@ -263,6 +264,7 @@ public abstract class TsPrimitiveType implements Serializable { public void setObject(Object val) { if (val instanceof Integer) { setInt((Integer) val); + return; } throw new UnSupportedDataTypeException("TsInt can only be set Integer value"); } @@ -334,6 +336,7 @@ public abstract class TsPrimitiveType implements Serializable { public void setObject(Object val) { if (val instanceof Long) { setLong((Long) val); + return; } throw new UnSupportedDataTypeException("TsLong can only be set Long value"); } @@ -405,6 +408,7 @@ public abstract class TsPrimitiveType implements Serializable { public void setObject(Object val) { if (val instanceof Float) { setFloat((Float) val); + return; } throw new UnSupportedDataTypeException("TsFloat can only be set float value"); } @@ -476,6 +480,7 @@ public abstract class TsPrimitiveType implements Serializable { public void setObject(Object val) { if (val instanceof Double) { setDouble((Double) val); + return; } throw new UnSupportedDataTypeException("TsDouble can only be set Double value"); } @@ -547,6 +552,7 @@ public abstract class TsPrimitiveType implements Serializable { public void setObject(Object val) { if (val instanceof Binary) { setBinary((Binary) val); + return; } throw new UnSupportedDataTypeException("TsBinary can only be set Binary value"); } @@ -618,6 +624,7 @@ public abstract class TsPrimitiveType implements Serializable { public void setObject(Object val) { if (val instanceof TsPrimitiveType[]) { setVector((TsPrimitiveType[]) val); + return; } throw new UnSupportedDataTypeException("TsVector can only be set TsPrimitiveType[] value"); }
