This is an automated email from the ASF dual-hosted git repository. xiangweiwei pushed a commit to branch aggregator in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 1065953784e6c2ed8aabf10ef255f2a0efb05394 Author: Alima777 <[email protected]> AuthorDate: Fri Apr 29 17:00:09 2022 +0800 implement part accumulator --- .../db/mpp/operator/aggregation/Accumulator.java | 27 +++- ...regatorFactory.java => AccumulatorFactory.java} | 26 +++- .../db/mpp/operator/aggregation/Aggregator.java | 18 +-- .../mpp/operator/aggregation/AvgAccumulator.java | 76 +++++++++-- .../mpp/operator/aggregation/CountAccumulator.java | 20 ++- .../operator/aggregation/ExtremeAccumulator.java | 142 +++++++++++++++++++++ ...atorFactory.java => FirstValueAccumulator.java} | 5 +- ...gatorFactory.java => LastValueAccumulator.java} | 5 +- ...regatorFactory.java => MaxTimeAccumulator.java} | 5 +- ...ntAccumulator.java => MaxValueAccumulator.java} | 52 ++++++-- ...regatorFactory.java => MinTimeAccumulator.java} | 5 +- ...ntAccumulator.java => MinValueAccumulator.java} | 54 ++++++-- .../{CountAccumulator.java => SumAccumulator.java} | 70 ++++++++-- .../source/SeriesAggregateScanOperator.java | 26 ++-- .../plan/parameter/AggregationDescriptor.java | 4 + .../operator/SeriesAggregateScanOperatorTest.java | 45 ++++--- .../apache/iotdb/tsfile/utils/TsPrimitiveType.java | 139 +++++++++++++++++++- 17 files changed, 606 insertions(+), 113 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/Accumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/Accumulator.java index f6f268aad1..5833a75959 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/Accumulator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/Accumulator.java @@ -18,6 +18,7 @@ */ package org.apache.iotdb.db.mpp.operator.aggregation; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics; import org.apache.iotdb.tsfile.read.common.TimeRange; import org.apache.iotdb.tsfile.read.common.block.column.Column; @@ -25,20 +26,44 @@ import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder; public interface Accumulator { - // Column should be like: | Time | Value | + /** Column should be like: | Time | Value | */ void addInput(Column[] column, TimeRange timeRange); + /** + * For aggregation function like COUNT, SUM, partialResult should be single; But for AVG, + * last_value, it should be double column with dictionary order. + */ void addIntermediate(Column[] partialResult); + /** + * This method can only be used in seriesAggregateScanOperator, it will use different statistics + * based on the type of Accumulator. + */ void addStatistics(Statistics statistics); + /** + * Attention: setFinal should be invoked only once, and addInput() and addIntermediate() are not + * allowed again. + */ void setFinal(Column finalResult); + /** + * For aggregation function like COUNT, SUM, partialResult should be single, so its output column + * is single too; But for AVG, last_value, it should be double column with dictionary order. + */ void outputIntermediate(ColumnBuilder[] tsBlockBuilder); + /** Final result is single column for any aggregation function. */ void outputFinal(ColumnBuilder tsBlockBuilder); void reset(); + /** + * For first_value or last_value in decreasing order, we can get final result by the first record. + */ boolean hasFinalResult(); + + TSDataType[] getIntermediateType(); + + TSDataType getFinalType(); } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AggregatorFactory.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AccumulatorFactory.java similarity index 57% copy from server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AggregatorFactory.java copy to server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AccumulatorFactory.java index fb49e8c8c1..1eb421c4e7 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AggregatorFactory.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AccumulatorFactory.java @@ -19,4 +19,28 @@ package org.apache.iotdb.db.mpp.operator.aggregation; -public class AggregatorFactory {} +import org.apache.iotdb.db.query.aggregation.AggregationType; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; + +public class AccumulatorFactory { + + public static Accumulator createAccumulator( + AggregationType aggregationType, TSDataType tsDataType) { + switch (aggregationType) { + case COUNT: + return new CountAccumulator(); + case AVG: + return new AvgAccumulator(tsDataType); + case SUM: + case EXTREME: + case MAX_TIME: + case MIN_TIME: + case MAX_VALUE: + case MIN_VALUE: + case LAST_VALUE: + case FIRST_VALUE: + default: + return null; + } + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/Aggregator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/Aggregator.java index 8a05f460c5..a808c734bc 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/Aggregator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/Aggregator.java @@ -36,22 +36,14 @@ public class Aggregator { // In some intermediate result input, inputLocation[] should include two columns private final List<InputLocation[]> inputLocationList; private final AggregationStep step; - private final TSDataType intermediateType; - private final TSDataType finalType; - private TimeRange timeRange; + private TimeRange timeRange = new TimeRange(0, Long.MAX_VALUE); public Aggregator( - Accumulator accumulator, - AggregationStep step, - List<InputLocation[]> inputLocationList, - TSDataType intermediateType, - TSDataType finalType) { + Accumulator accumulator, AggregationStep step, List<InputLocation[]> inputLocationList) { this.accumulator = accumulator; this.step = step; this.inputLocationList = inputLocationList; - this.intermediateType = intermediateType; - this.finalType = finalType; } // Used for SeriesAggregateScanOperator @@ -96,11 +88,11 @@ public class Aggregator { accumulator.addStatistics(statistics); } - public TSDataType getOutputType() { + public TSDataType[] getOutputType() { if (step.isOutputPartial()) { - return intermediateType; + return new TSDataType[] {accumulator.getFinalType()}; } else { - return finalType; + return accumulator.getIntermediateType(); } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AvgAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AvgAccumulator.java index dc993a65c5..0898eefcb4 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AvgAccumulator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AvgAccumulator.java @@ -21,10 +21,12 @@ package org.apache.iotdb.db.mpp.operator.aggregation; import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.file.metadata.statistics.IntegerStatistics; import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics; import org.apache.iotdb.tsfile.read.common.TimeRange; import org.apache.iotdb.tsfile.read.common.block.column.Column; import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder; +import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn; public class AvgAccumulator implements Accumulator { @@ -32,23 +34,60 @@ public class AvgAccumulator implements Accumulator { private long countValue; private double sumValue; + public AvgAccumulator(TSDataType seriesDataType) { + this.seriesDataType = seriesDataType; + } + @Override - public void addInput(Column[] column, TimeRange timeRange) {} + public void addInput(Column[] column, TimeRange timeRange) { + TimeColumn timeColumn = (TimeColumn) column[0]; + for (int i = 0; i < timeColumn.getPositionCount(); i++) { + long curTime = timeColumn.getLong(i); + if (curTime >= timeRange.getMax() || curTime < timeRange.getMin()) { + break; + } + countValue++; + updateSumValue(column[1].getObject(i)); + } + } + // partialResult should be like: | countValue1 | sumValue1 | @Override - public void addIntermediate(Column[] partialResult) {} + public void addIntermediate(Column[] partialResult) { + if (partialResult.length != 2) { + throw new IllegalArgumentException("partialResult of Avg should be 2"); + } + countValue += partialResult[0].getLong(0); + updateSumValue(partialResult[1].getObject(0)); + } @Override - public void addStatistics(Statistics statistics) {} + public void addStatistics(Statistics statistics) { + countValue += statistics.getCount(); + if (statistics instanceof IntegerStatistics) { + sumValue += statistics.getSumLongValue(); + } else { + sumValue += statistics.getSumDoubleValue(); + } + } + // Set sumValue to finalResult and keep countValue equals to 1 @Override - public void setFinal(Column finalResult) {} + public void setFinal(Column finalResult) { + reset(); + updateSumValue(finalResult.getObject(0)); + } @Override - public void outputIntermediate(ColumnBuilder[] tsBlockBuilder) {} + public void outputIntermediate(ColumnBuilder[] columnBuilders) { + columnBuilders[0].writeLong(countValue); + columnBuilders[1].writeDouble(sumValue); + } @Override - public void outputFinal(ColumnBuilder tsBlockBuilder) {} + public void outputFinal(ColumnBuilder columnBuilder) { + columnBuilder.writeDouble(sumValue / countValue); + } @Override public void reset() { @@ -61,26 +100,35 @@ public class AvgAccumulator implements Accumulator { return false; } - private void updateAvg(TSDataType type, Object sumVal) throws UnSupportedDataTypeException { - double val; - switch (type) { + @Override + public TSDataType[] getIntermediateType() { + return new TSDataType[] {TSDataType.INT64, TSDataType.DOUBLE}; + } + + @Override + public TSDataType getFinalType() { + return TSDataType.DOUBLE; + } + + private void updateSumValue(Object sumVal) throws UnSupportedDataTypeException { + switch (seriesDataType) { case INT32: - val = (int) sumVal; + sumValue += (int) sumVal; break; case INT64: - val = (long) sumVal; + sumValue = (long) sumVal; break; case FLOAT: - val = (float) sumVal; + sumValue = (float) sumVal; break; case DOUBLE: - val = (double) sumVal; + sumValue = (double) sumVal; break; case TEXT: case BOOLEAN: default: throw new UnSupportedDataTypeException( - String.format("Unsupported data type in aggregation AVG : %s", type)); + String.format("Unsupported data type in aggregation AVG : %s", seriesDataType)); } } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/CountAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/CountAccumulator.java index db266b55e4..84dae3f986 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/CountAccumulator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/CountAccumulator.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.mpp.operator.aggregation; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics; import org.apache.iotdb.tsfile.read.common.TimeRange; import org.apache.iotdb.tsfile.read.common.block.column.Column; @@ -29,6 +30,8 @@ public class CountAccumulator implements Accumulator { private long countValue = 0; + public CountAccumulator() {} + // Column should be like: | Time | Value | @Override public void addInput(Column[] column, TimeRange timeRange) { @@ -42,12 +45,13 @@ public class CountAccumulator implements Accumulator { } } - // partialResult should be like: | partialCountValue1 | partialCountValue2 | + // partialResult should be like: | partialCountValue1 | @Override public void addIntermediate(Column[] partialResult) { - for (int i = 0; i < partialResult.length; i++) { - countValue += partialResult[i].getLong(0); + if (partialResult.length != 1) { + throw new IllegalArgumentException("partialResult of Count should be 1"); } + countValue += partialResult[0].getLong(0); } @Override @@ -81,4 +85,14 @@ public class CountAccumulator implements Accumulator { public boolean hasFinalResult() { return false; } + + @Override + public TSDataType[] getIntermediateType() { + return new TSDataType[] {TSDataType.INT64}; + } + + @Override + public TSDataType getFinalType() { + return TSDataType.INT64; + } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/ExtremeAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/ExtremeAccumulator.java new file mode 100644 index 0000000000..599feec646 --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/ExtremeAccumulator.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.mpp.operator.aggregation; + +import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics; +import org.apache.iotdb.tsfile.read.common.TimeRange; +import org.apache.iotdb.tsfile.read.common.block.column.Column; +import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder; +import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn; +import org.apache.iotdb.tsfile.utils.TsPrimitiveType; + +public class ExtremeAccumulator implements Accumulator { + + private TsPrimitiveType extremeResult; + private boolean hasCandidateResult; + + public ExtremeAccumulator(TSDataType seriesDataType) { + this.extremeResult = TsPrimitiveType.getByType(seriesDataType); + } + + @Override + public void addInput(Column[] column, TimeRange timeRange) { + TimeColumn timeColumn = (TimeColumn) column[0]; + for (int i = 0; i < timeColumn.getPositionCount(); i++) { + long curTime = timeColumn.getLong(i); + if (curTime >= timeRange.getMax() || curTime < timeRange.getMin()) { + break; + } + updateResult((Comparable<Object>) column[1].getObject(i)); + } + } + + @Override + public void addIntermediate(Column[] partialResult) { + if (partialResult.length != 1) { + throw new IllegalArgumentException("partialResult of ExtremeValue should be 1"); + } + updateResult((Comparable<Object>) partialResult[0].getObject(0)); + } + + @Override + public void addStatistics(Statistics statistics) { + Comparable<Object> maxVal = (Comparable<Object>) statistics.getMaxValue(); + Comparable<Object> minVal = (Comparable<Object>) statistics.getMinValue(); + + Comparable<Object> absMaxVal = (Comparable<Object>) getAbsValue(maxVal); + Comparable<Object> absMinVal = (Comparable<Object>) getAbsValue(minVal); + + Comparable<Object> extVal = absMaxVal.compareTo(absMinVal) >= 0 ? maxVal : minVal; + updateResult(extVal); + } + + @Override + public void setFinal(Column finalResult) { + extremeResult.setObject(finalResult.getObject(0)); + } + + // columnBuilder should be single in ExtremeAccumulator + @Override + public void outputIntermediate(ColumnBuilder[] columnBuilders) { + columnBuilders[0].writeObject(extremeResult.getValue()); + } + + @Override + public void outputFinal(ColumnBuilder columnBuilder) { + columnBuilder.writeObject(extremeResult.getValue()); + } + + @Override + public void reset() { + hasCandidateResult = false; + extremeResult.reset(); + } + + @Override + public boolean hasFinalResult() { + return false; + } + + @Override + public TSDataType[] getIntermediateType() { + return new TSDataType[] {extremeResult.getDataType()}; + } + + @Override + public TSDataType getFinalType() { + return extremeResult.getDataType(); + } + + public Object getAbsValue(Object v) { + switch (extremeResult.getDataType()) { + case DOUBLE: + return Math.abs((Double) v); + case FLOAT: + return Math.abs((Float) v); + case INT32: + return Math.abs((Integer) v); + case INT64: + return Math.abs((Long) v); + default: + throw new UnSupportedDataTypeException(String.valueOf(extremeResult.getDataType())); + } + } + + private void updateResult(Comparable<Object> extVal) { + if (extVal == null) { + return; + } + + Comparable<Object> absExtVal = (Comparable<Object>) getAbsValue(extVal); + Comparable<Object> candidateResult = (Comparable<Object>) extremeResult.getValue(); + Comparable<Object> absCandidateResult = + (Comparable<Object>) getAbsValue(extremeResult.getValue()); + + if (!hasCandidateResult + || (absExtVal.compareTo(absCandidateResult) > 0 + || (absExtVal.compareTo(absCandidateResult) == 0 + && extVal.compareTo(candidateResult) > 0))) { + hasCandidateResult = true; + extremeResult.setObject(extVal); + } + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AggregatorFactory.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/FirstValueAccumulator.java similarity index 89% copy from server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AggregatorFactory.java copy to server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/FirstValueAccumulator.java index fb49e8c8c1..49ad6a5133 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AggregatorFactory.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/FirstValueAccumulator.java @@ -17,6 +17,5 @@ * under the License. */ -package org.apache.iotdb.db.mpp.operator.aggregation; - -public class AggregatorFactory {} +package org.apache.iotdb.db.mpp.operator.aggregation;public class FirstValueAccumulator { +} diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AggregatorFactory.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/LastValueAccumulator.java similarity index 89% copy from server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AggregatorFactory.java copy to server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/LastValueAccumulator.java index fb49e8c8c1..97183aae52 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AggregatorFactory.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/LastValueAccumulator.java @@ -17,6 +17,5 @@ * under the License. */ -package org.apache.iotdb.db.mpp.operator.aggregation; - -public class AggregatorFactory {} +package org.apache.iotdb.db.mpp.operator.aggregation;public class LastValueAccumulator { +} diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AggregatorFactory.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MaxTimeAccumulator.java similarity index 90% copy from server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AggregatorFactory.java copy to server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MaxTimeAccumulator.java index fb49e8c8c1..c5d03f8442 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AggregatorFactory.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MaxTimeAccumulator.java @@ -17,6 +17,5 @@ * under the License. */ -package org.apache.iotdb.db.mpp.operator.aggregation; - -public class AggregatorFactory {} +package org.apache.iotdb.db.mpp.operator.aggregation;public class MaxTimeAccumulator { +} diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/CountAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MaxValueAccumulator.java similarity index 59% copy from server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/CountAccumulator.java copy to server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MaxValueAccumulator.java index db266b55e4..bb3f1d880a 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/CountAccumulator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MaxValueAccumulator.java @@ -19,15 +19,22 @@ package org.apache.iotdb.db.mpp.operator.aggregation; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics; import org.apache.iotdb.tsfile.read.common.TimeRange; import org.apache.iotdb.tsfile.read.common.block.column.Column; import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder; import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn; +import org.apache.iotdb.tsfile.utils.TsPrimitiveType; -public class CountAccumulator implements Accumulator { +public class MaxValueAccumulator implements Accumulator { - private long countValue = 0; + private TsPrimitiveType maxResult; + private boolean hasCandidateResult; + + public MaxValueAccumulator(TSDataType seriesDataType) { + this.maxResult = TsPrimitiveType.getByType(seriesDataType); + } // Column should be like: | Time | Value | @Override @@ -38,47 +45,70 @@ public class CountAccumulator implements Accumulator { if (curTime >= timeRange.getMax() || curTime < timeRange.getMin()) { break; } - countValue++; + updateResult((Comparable<Object>) column[1].getObject(i)); } } - // partialResult should be like: | partialCountValue1 | partialCountValue2 | + // partialResult should be like: | partialMaxValue1 | @Override public void addIntermediate(Column[] partialResult) { - for (int i = 0; i < partialResult.length; i++) { - countValue += partialResult[i].getLong(0); + if (partialResult.length != 1) { + throw new IllegalArgumentException("partialResult of MaxValue should be 1"); } + updateResult((Comparable<Object>) partialResult[0].getObject(0)); } @Override public void addStatistics(Statistics statistics) { - countValue += statistics.getCount(); + Comparable<Object> maxValue = (Comparable<Object>) statistics.getMaxValue(); + updateResult(maxValue); } // finalResult should be single column, like: | finalCountValue | @Override public void setFinal(Column finalResult) { - countValue = finalResult.getLong(0); + maxResult.setObject(finalResult.getObject(0)); } // columnBuilder should be single in countAccumulator @Override public void outputIntermediate(ColumnBuilder[] columnBuilders) { - columnBuilders[0].writeLong(countValue); + columnBuilders[0].writeObject(maxResult.getValue()); } @Override public void outputFinal(ColumnBuilder columnBuilder) { - columnBuilder.writeLong(countValue); + columnBuilder.writeObject(maxResult.getValue()); } @Override public void reset() { - this.countValue = 0; + hasCandidateResult = false; + this.maxResult.reset(); } @Override public boolean hasFinalResult() { return false; } + + @Override + public TSDataType[] getIntermediateType() { + return new TSDataType[] {maxResult.getDataType()}; + } + + @Override + public TSDataType getFinalType() { + return maxResult.getDataType(); + } + + private void updateResult(Comparable<Object> minVal) { + if (minVal == null) { + return; + } + if (!hasCandidateResult || minVal.compareTo(maxResult.getValue()) > 0) { + hasCandidateResult = true; + maxResult.setObject(minVal); + } + } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AggregatorFactory.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MinTimeAccumulator.java similarity index 90% rename from server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AggregatorFactory.java rename to server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MinTimeAccumulator.java index fb49e8c8c1..95bf611acf 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/AggregatorFactory.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MinTimeAccumulator.java @@ -17,6 +17,5 @@ * under the License. */ -package org.apache.iotdb.db.mpp.operator.aggregation; - -public class AggregatorFactory {} +package org.apache.iotdb.db.mpp.operator.aggregation;public class MinTimeAccumulator { +} diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/CountAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MinValueAccumulator.java similarity index 58% copy from server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/CountAccumulator.java copy to server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MinValueAccumulator.java index db266b55e4..97f46724ae 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/CountAccumulator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/MinValueAccumulator.java @@ -19,15 +19,22 @@ package org.apache.iotdb.db.mpp.operator.aggregation; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics; import org.apache.iotdb.tsfile.read.common.TimeRange; import org.apache.iotdb.tsfile.read.common.block.column.Column; import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder; import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn; +import org.apache.iotdb.tsfile.utils.TsPrimitiveType; -public class CountAccumulator implements Accumulator { +public class MinValueAccumulator implements Accumulator { - private long countValue = 0; + private TsPrimitiveType minResult; + private boolean hasCandidateResult; + + public MinValueAccumulator(TSDataType seriesDataType) { + this.minResult = TsPrimitiveType.getByType(seriesDataType); + } // Column should be like: | Time | Value | @Override @@ -38,47 +45,70 @@ public class CountAccumulator implements Accumulator { if (curTime >= timeRange.getMax() || curTime < timeRange.getMin()) { break; } - countValue++; + updateResult((Comparable<Object>) column[1].getObject(i)); } } - // partialResult should be like: | partialCountValue1 | partialCountValue2 | + // partialResult should be like: | partialMinValue1 | @Override public void addIntermediate(Column[] partialResult) { - for (int i = 0; i < partialResult.length; i++) { - countValue += partialResult[i].getLong(0); + if (partialResult.length != 1) { + throw new IllegalArgumentException("partialResult of MinValue should be 1"); } + updateResult((Comparable<Object>) partialResult[0].getObject(0)); } @Override public void addStatistics(Statistics statistics) { - countValue += statistics.getCount(); + Comparable<Object> minVal = (Comparable<Object>) statistics.getMinValue(); + updateResult(minVal); } // finalResult should be single column, like: | finalCountValue | @Override public void setFinal(Column finalResult) { - countValue = finalResult.getLong(0); + minResult.setObject(finalResult.getObject(0)); } - // columnBuilder should be single in countAccumulator + // columnBuilder should be single in MinValueAccumulator @Override public void outputIntermediate(ColumnBuilder[] columnBuilders) { - columnBuilders[0].writeLong(countValue); + columnBuilders[0].writeObject(minResult.getValue()); } @Override public void outputFinal(ColumnBuilder columnBuilder) { - columnBuilder.writeLong(countValue); + columnBuilder.writeObject(minResult.getValue()); } @Override public void reset() { - this.countValue = 0; + hasCandidateResult = false; + this.minResult.reset(); } @Override public boolean hasFinalResult() { return false; } + + @Override + public TSDataType[] getIntermediateType() { + return new TSDataType[] {minResult.getDataType()}; + } + + @Override + public TSDataType getFinalType() { + return minResult.getDataType(); + } + + private void updateResult(Comparable<Object> minVal) { + if (minVal == null) { + return; + } + if (!hasCandidateResult || minVal.compareTo(minResult.getValue()) < 0) { + hasCandidateResult = true; + minResult.setObject(minVal); + } + } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/CountAccumulator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/SumAccumulator.java similarity index 52% copy from server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/CountAccumulator.java copy to server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/SumAccumulator.java index db266b55e4..7dd3896b1c 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/CountAccumulator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/aggregation/SumAccumulator.java @@ -19,15 +19,23 @@ package org.apache.iotdb.db.mpp.operator.aggregation; +import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.file.metadata.statistics.IntegerStatistics; import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics; import org.apache.iotdb.tsfile.read.common.TimeRange; import org.apache.iotdb.tsfile.read.common.block.column.Column; import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder; import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn; -public class CountAccumulator implements Accumulator { +public class SumAccumulator implements Accumulator { - private long countValue = 0; + private TSDataType seriesDataType; + private double sumValue = 0; + + public SumAccumulator(TSDataType seriesDataType) { + this.seriesDataType = seriesDataType; + } // Column should be like: | Time | Value | @Override @@ -38,47 +46,85 @@ public class CountAccumulator implements Accumulator { if (curTime >= timeRange.getMax() || curTime < timeRange.getMin()) { break; } - countValue++; + updateSumValue(column[1].getObject(i)); } } - // partialResult should be like: | partialCountValue1 | partialCountValue2 | + // partialResult should be like: | partialSumValue1 | @Override public void addIntermediate(Column[] partialResult) { - for (int i = 0; i < partialResult.length; i++) { - countValue += partialResult[i].getLong(0); + if (partialResult.length != 1) { + throw new IllegalArgumentException("partialResult of Sum should be 1"); } + updateSumValue(partialResult[0].getObject(0)); } @Override public void addStatistics(Statistics statistics) { - countValue += statistics.getCount(); + if (statistics instanceof IntegerStatistics) { + sumValue += statistics.getSumLongValue(); + } else { + sumValue += statistics.getSumDoubleValue(); + } } - // finalResult should be single column, like: | finalCountValue | + // finalResult should be single column, like: | finalSumValue | @Override public void setFinal(Column finalResult) { - countValue = finalResult.getLong(0); + reset(); + updateSumValue(finalResult.getObject(0)); } // columnBuilder should be single in countAccumulator @Override public void outputIntermediate(ColumnBuilder[] columnBuilders) { - columnBuilders[0].writeLong(countValue); + columnBuilders[0].writeDouble(sumValue); } @Override public void outputFinal(ColumnBuilder columnBuilder) { - columnBuilder.writeLong(countValue); + columnBuilder.writeDouble(sumValue); } @Override public void reset() { - this.countValue = 0; + this.sumValue = 0; } @Override public boolean hasFinalResult() { return false; } + + @Override + public TSDataType[] getIntermediateType() { + return new TSDataType[] {TSDataType.DOUBLE}; + } + + @Override + public TSDataType getFinalType() { + return TSDataType.DOUBLE; + } + + private void updateSumValue(Object sumVal) throws UnSupportedDataTypeException { + switch (seriesDataType) { + case INT32: + sumValue += (int) sumVal; + break; + case INT64: + sumValue = (long) sumVal; + break; + case FLOAT: + sumValue = (float) sumVal; + break; + case DOUBLE: + sumValue = (double) sumVal; + break; + case TEXT: + case BOOLEAN: + default: + throw new UnSupportedDataTypeException( + String.format("Unsupported data type in aggregation AVG : %s", seriesDataType)); + } + } } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesAggregateScanOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesAggregateScanOperator.java index 8966b1f57b..b679febcca 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesAggregateScanOperator.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/operator/source/SeriesAggregateScanOperator.java @@ -27,6 +27,7 @@ import org.apache.iotdb.db.mpp.sql.planner.plan.parameter.GroupByTimeParameter; import org.apache.iotdb.db.utils.timerangeiterator.ITimeRangeIterator; import org.apache.iotdb.db.utils.timerangeiterator.SingleTimeWindowIterator; import org.apache.iotdb.db.utils.timerangeiterator.TimeRangeIteratorFactory; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics; import org.apache.iotdb.tsfile.read.common.TimeRange; import org.apache.iotdb.tsfile.read.common.block.TsBlock; @@ -39,9 +40,10 @@ import org.apache.iotdb.tsfile.read.filter.basic.Filter; import com.google.common.util.concurrent.ListenableFuture; import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.Set; -import java.util.stream.Collectors; /** * This operator is responsible to do the aggregation calculation for one series based on global @@ -94,9 +96,11 @@ public class SeriesAggregateScanOperator implements DataSourceOperator { null, ascending); this.aggregators = aggregators; - tsBlockBuilder = - new TsBlockBuilder( - aggregators.stream().map(Aggregator::getOutputType).collect(Collectors.toList())); + List<TSDataType> dataTypes = new ArrayList<>(); + for (Aggregator aggregator : aggregators) { + dataTypes.addAll(Arrays.asList(aggregator.getOutputType())); + } + tsBlockBuilder = new TsBlockBuilder(dataTypes); this.timeRangeIterator = initTimeRangeIterator(groupByTimeParameter); } @@ -216,10 +220,14 @@ public class SeriesAggregateScanOperator implements DataSourceOperator { // Use start time of current time range as time column timeColumnBuilder.writeLong(curTimeRange.getMin()); ColumnBuilder[] columnBuilders = tsBlockBuilder.getValueColumnBuilders(); - for (int i = 0; i < aggregators.size(); i++) { - ColumnBuilder[] columnBuilder = new ColumnBuilder[1]; - columnBuilder[0] = columnBuilders[i]; - aggregators.get(i).outputResult(columnBuilder); + int columnIndex = 0; + for (Aggregator aggregator : aggregators) { + ColumnBuilder[] columnBuilder = new ColumnBuilder[aggregator.getOutputType().length]; + columnBuilder[0] = columnBuilders[columnIndex++]; + if (columnBuilder.length > 1) { + columnBuilder[1] = columnBuilders[columnIndex++]; + } + aggregator.outputResult(columnBuilder); } tsBlockBuilder.declarePosition(); resultTsBlock = tsBlockBuilder.build(); @@ -261,7 +269,7 @@ public class SeriesAggregateScanOperator implements DataSourceOperator { @SuppressWarnings("squid:S3776") private void calcFromBatch(TsBlock tsBlock, TimeRange curTimeRange) { // check if the batchData does not contain points in current interval - if (!satisfied(tsBlock, curTimeRange)) { + if (tsBlock == null || !satisfied(tsBlock, curTimeRange)) { return; } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/parameter/AggregationDescriptor.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/parameter/AggregationDescriptor.java index 244fbcf7b8..8f3dbff4fe 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/parameter/AggregationDescriptor.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/parameter/AggregationDescriptor.java @@ -59,6 +59,10 @@ public class AggregationDescriptor { return aggregationType; } + public AggregationStep getStep() { + return step; + } + public void serialize(ByteBuffer byteBuffer) { ReadWriteIOUtils.write(aggregationType.ordinal(), byteBuffer); step.serialize(byteBuffer); diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesAggregateScanOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesAggregateScanOperatorTest.java index bfaced0670..c21314b5d9 100644 --- a/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesAggregateScanOperatorTest.java +++ b/server/src/test/java/org/apache/iotdb/db/mpp/operator/SeriesAggregateScanOperatorTest.java @@ -30,10 +30,12 @@ import org.apache.iotdb.db.mpp.common.QueryId; import org.apache.iotdb.db.mpp.execution.FragmentInstanceContext; import org.apache.iotdb.db.mpp.execution.FragmentInstanceState; import org.apache.iotdb.db.mpp.execution.FragmentInstanceStateMachine; +import org.apache.iotdb.db.mpp.operator.aggregation.AccumulatorFactory; import org.apache.iotdb.db.mpp.operator.aggregation.Aggregator; import org.apache.iotdb.db.mpp.operator.source.SeriesAggregateScanOperator; import org.apache.iotdb.db.mpp.operator.source.SeriesScanOperator; import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId; +import org.apache.iotdb.db.mpp.sql.planner.plan.parameter.AggregationStep; import org.apache.iotdb.db.mpp.sql.planner.plan.parameter.GroupByTimeParameter; import org.apache.iotdb.db.query.aggregation.AggregationType; import org.apache.iotdb.db.query.reader.series.SeriesReaderTestUtil; @@ -89,7 +91,14 @@ public class SeriesAggregateScanOperatorTest { public void testAggregationWithoutTimeFilter() throws IllegalPathException { SeriesAggregateScanOperator seriesAggregateScanOperator = initSeriesAggregateScanOperator( - Collections.singletonList(AggregationType.COUNT), null, true, null); + Collections.singletonList( + new Aggregator( + AccumulatorFactory.createAccumulator(AggregationType.COUNT, TSDataType.INT32), + AggregationStep.SINGLE, + null)), + null, + true, + null); int count = 0; while (seriesAggregateScanOperator.hasNext()) { TsBlock resultTsBlock = seriesAggregateScanOperator.next(); @@ -105,7 +114,7 @@ public class SeriesAggregateScanOperatorTest { aggregationTypes.add(AggregationType.COUNT); aggregationTypes.add(AggregationType.SUM); SeriesAggregateScanOperator seriesAggregateScanOperator = - initSeriesAggregateScanOperator(aggregationTypes, null, true, null); + initSeriesAggregateScanOperator(null, null, true, null); int count = 0; while (seriesAggregateScanOperator.hasNext()) { TsBlock resultTsBlock = seriesAggregateScanOperator.next(); @@ -126,7 +135,7 @@ public class SeriesAggregateScanOperatorTest { aggregationTypes.add(AggregationType.MAX_VALUE); aggregationTypes.add(AggregationType.MIN_VALUE); SeriesAggregateScanOperator seriesAggregateScanOperator = - initSeriesAggregateScanOperator(aggregationTypes, null, true, null); + initSeriesAggregateScanOperator(null, null, true, null); int count = 0; while (seriesAggregateScanOperator.hasNext()) { TsBlock resultTsBlock = seriesAggregateScanOperator.next(); @@ -145,8 +154,7 @@ public class SeriesAggregateScanOperatorTest { public void testAggregationWithTimeFilter1() throws IllegalPathException { Filter timeFilter = TimeFilter.gtEq(120); SeriesAggregateScanOperator seriesAggregateScanOperator = - initSeriesAggregateScanOperator( - Collections.singletonList(AggregationType.COUNT), timeFilter, true, null); + initSeriesAggregateScanOperator(null, timeFilter, true, null); int count = 0; while (seriesAggregateScanOperator.hasNext()) { TsBlock resultTsBlock = seriesAggregateScanOperator.next(); @@ -160,8 +168,7 @@ public class SeriesAggregateScanOperatorTest { public void testAggregationWithTimeFilter2() throws IllegalPathException { Filter timeFilter = TimeFilter.ltEq(379); SeriesAggregateScanOperator seriesAggregateScanOperator = - initSeriesAggregateScanOperator( - Collections.singletonList(AggregationType.COUNT), timeFilter, true, null); + initSeriesAggregateScanOperator(null, timeFilter, true, null); int count = 0; while (seriesAggregateScanOperator.hasNext()) { TsBlock resultTsBlock = seriesAggregateScanOperator.next(); @@ -175,8 +182,7 @@ public class SeriesAggregateScanOperatorTest { public void testAggregationWithTimeFilter3() throws IllegalPathException { Filter timeFilter = new AndFilter(TimeFilter.gtEq(100), TimeFilter.ltEq(399)); SeriesAggregateScanOperator seriesAggregateScanOperator = - initSeriesAggregateScanOperator( - Collections.singletonList(AggregationType.COUNT), timeFilter, true, null); + initSeriesAggregateScanOperator(null, timeFilter, true, null); int count = 0; while (seriesAggregateScanOperator.hasNext()) { TsBlock resultTsBlock = seriesAggregateScanOperator.next(); @@ -197,7 +203,7 @@ public class SeriesAggregateScanOperatorTest { aggregationTypes.add(AggregationType.MIN_VALUE); Filter timeFilter = new AndFilter(TimeFilter.gtEq(100), TimeFilter.ltEq(399)); SeriesAggregateScanOperator seriesAggregateScanOperator = - initSeriesAggregateScanOperator(aggregationTypes, timeFilter, true, null); + initSeriesAggregateScanOperator(null, timeFilter, true, null); int count = 0; while (seriesAggregateScanOperator.hasNext()) { TsBlock resultTsBlock = seriesAggregateScanOperator.next(); @@ -217,8 +223,7 @@ public class SeriesAggregateScanOperatorTest { int[] result = new int[] {100, 100, 100, 100}; GroupByTimeParameter groupByTimeParameter = new GroupByTimeParameter(0, 399, 100, 100, true); SeriesAggregateScanOperator seriesAggregateScanOperator = - initSeriesAggregateScanOperator( - Collections.singletonList(AggregationType.COUNT), null, true, groupByTimeParameter); + initSeriesAggregateScanOperator(null, null, true, groupByTimeParameter); int count = 0; while (seriesAggregateScanOperator.hasNext()) { TsBlock resultTsBlock = seriesAggregateScanOperator.next(); @@ -235,11 +240,7 @@ public class SeriesAggregateScanOperatorTest { Filter timeFilter = new AndFilter(TimeFilter.gtEq(120), TimeFilter.ltEq(379)); GroupByTimeParameter groupByTimeParameter = new GroupByTimeParameter(0, 399, 100, 100, true); SeriesAggregateScanOperator seriesAggregateScanOperator = - initSeriesAggregateScanOperator( - Collections.singletonList(AggregationType.COUNT), - timeFilter, - true, - groupByTimeParameter); + initSeriesAggregateScanOperator(null, timeFilter, true, groupByTimeParameter); int count = 0; while (seriesAggregateScanOperator.hasNext()) { TsBlock resultTsBlock = seriesAggregateScanOperator.next(); @@ -266,7 +267,7 @@ public class SeriesAggregateScanOperatorTest { aggregationTypes.add(AggregationType.MIN_VALUE); GroupByTimeParameter groupByTimeParameter = new GroupByTimeParameter(0, 399, 100, 100, true); SeriesAggregateScanOperator seriesAggregateScanOperator = - initSeriesAggregateScanOperator(aggregationTypes, null, true, groupByTimeParameter); + initSeriesAggregateScanOperator(null, null, true, groupByTimeParameter); int count = 0; while (seriesAggregateScanOperator.hasNext()) { TsBlock resultTsBlock = seriesAggregateScanOperator.next(); @@ -285,8 +286,7 @@ public class SeriesAggregateScanOperatorTest { int[] result = new int[] {50, 50, 50, 50, 50, 50, 50, 50}; GroupByTimeParameter groupByTimeParameter = new GroupByTimeParameter(0, 399, 100, 50, true); SeriesAggregateScanOperator seriesAggregateScanOperator = - initSeriesAggregateScanOperator( - Collections.singletonList(AggregationType.COUNT), null, true, groupByTimeParameter); + initSeriesAggregateScanOperator(null, null, true, groupByTimeParameter); int count = 0; while (seriesAggregateScanOperator.hasNext()) { TsBlock resultTsBlock = seriesAggregateScanOperator.next(); @@ -303,8 +303,7 @@ public class SeriesAggregateScanOperatorTest { int[] result = new int[] {20, 10, 20, 10, 20, 10, 20, 10, 20, 9}; GroupByTimeParameter groupByTimeParameter = new GroupByTimeParameter(0, 149, 50, 30, true); SeriesAggregateScanOperator seriesAggregateScanOperator = - initSeriesAggregateScanOperator( - Collections.singletonList(AggregationType.COUNT), null, true, groupByTimeParameter); + initSeriesAggregateScanOperator(null, null, true, groupByTimeParameter); int count = 0; while (seriesAggregateScanOperator.hasNext()) { TsBlock resultTsBlock = seriesAggregateScanOperator.next(); @@ -332,7 +331,7 @@ public class SeriesAggregateScanOperatorTest { aggregationTypes.add(AggregationType.MIN_VALUE); GroupByTimeParameter groupByTimeParameter = new GroupByTimeParameter(0, 149, 50, 30, true); SeriesAggregateScanOperator seriesAggregateScanOperator = - initSeriesAggregateScanOperator(aggregationTypes, null, true, groupByTimeParameter); + initSeriesAggregateScanOperator(null, null, true, groupByTimeParameter); int count = 0; while (seriesAggregateScanOperator.hasNext()) { TsBlock resultTsBlock = seriesAggregateScanOperator.next(); diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/TsPrimitiveType.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/TsPrimitiveType.java index 7bfd218374..8251fe9438 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/TsPrimitiveType.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/TsPrimitiveType.java @@ -29,8 +29,34 @@ public abstract class TsPrimitiveType implements Serializable { /** * get tsPrimitiveType by resultDataType. * - * @param dataType -given TsDataType - * @param v - + * @param dataType given TsDataType + */ + public static TsPrimitiveType getByType(TSDataType dataType) { + switch (dataType) { + case BOOLEAN: + return new TsPrimitiveType.TsBoolean(); + case INT32: + return new TsPrimitiveType.TsInt(); + case INT64: + return new TsPrimitiveType.TsLong(); + case FLOAT: + return new TsPrimitiveType.TsFloat(); + case DOUBLE: + return new TsPrimitiveType.TsDouble(); + case TEXT: + return new TsPrimitiveType.TsBinary(); + case VECTOR: + return new TsPrimitiveType.TsVector(); + default: + throw new UnSupportedDataTypeException("Unsupported data type:" + dataType); + } + } + + /** + * get tsPrimitiveType by resultDataType and initial value. + * + * @param dataType given TsDataType + * @param v initial value */ public static TsPrimitiveType getByType(TSDataType dataType, Object v) { switch (dataType) { @@ -109,6 +135,10 @@ public abstract class TsPrimitiveType implements Serializable { throw new UnsupportedOperationException("setVector() is not supported for current sub-class"); } + public abstract void setObject(Object val); + + public abstract void reset(); + /** * get the size of one instance of current class. * @@ -142,6 +172,8 @@ public abstract class TsPrimitiveType implements Serializable { private boolean value; + public TsBoolean() {} + public TsBoolean(boolean value) { this.value = value; } @@ -156,6 +188,19 @@ public abstract class TsPrimitiveType implements Serializable { this.value = val; } + @Override + public void setObject(Object val) { + if (val instanceof Binary) { + setBinary((Binary) val); + } + throw new UnSupportedDataTypeException("TsBoolean can only be set Binary value"); + } + + @Override + public void reset() { + value = false; + } + @Override public int getSize() { return 4 + 1; @@ -198,6 +243,8 @@ public abstract class TsPrimitiveType implements Serializable { private int value; + public TsInt() {} + public TsInt(int value) { this.value = value; } @@ -212,6 +259,19 @@ public abstract class TsPrimitiveType implements Serializable { this.value = val; } + @Override + public void setObject(Object val) { + if (val instanceof Integer) { + setInt((Integer) val); + } + throw new UnSupportedDataTypeException("TsInt can only be set Integer value"); + } + + @Override + public void reset() { + value = 0; + } + @Override public int getSize() { return 4 + 4; @@ -254,6 +314,8 @@ public abstract class TsPrimitiveType implements Serializable { private long value; + public TsLong() {} + public TsLong(long value) { this.value = value; } @@ -268,6 +330,19 @@ public abstract class TsPrimitiveType implements Serializable { this.value = val; } + @Override + public void setObject(Object val) { + if (val instanceof Long) { + setLong((Long) val); + } + throw new UnSupportedDataTypeException("TsLong can only be set Long value"); + } + + @Override + public void reset() { + value = 0; + } + @Override public int getSize() { return 4 + 8; @@ -310,6 +385,8 @@ public abstract class TsPrimitiveType implements Serializable { private float value; + public TsFloat() {} + public TsFloat(float value) { this.value = value; } @@ -324,6 +401,19 @@ public abstract class TsPrimitiveType implements Serializable { this.value = val; } + @Override + public void setObject(Object val) { + if (val instanceof Float) { + setFloat((Float) val); + } + throw new UnSupportedDataTypeException("TsFloat can only be set float value"); + } + + @Override + public void reset() { + value = 0; + } + @Override public int getSize() { return 4 + 4; @@ -366,6 +456,8 @@ public abstract class TsPrimitiveType implements Serializable { private double value; + public TsDouble() {} + public TsDouble(double value) { this.value = value; } @@ -380,6 +472,19 @@ public abstract class TsPrimitiveType implements Serializable { this.value = val; } + @Override + public void setObject(Object val) { + if (val instanceof Double) { + setDouble((Double) val); + } + throw new UnSupportedDataTypeException("TsDouble can only be set Double value"); + } + + @Override + public void reset() { + value = 0.0; + } + @Override public int getSize() { return 4 + 8; @@ -422,6 +527,8 @@ public abstract class TsPrimitiveType implements Serializable { private Binary value; + public TsBinary() {} + public TsBinary(Binary value) { this.value = value; } @@ -436,6 +543,19 @@ public abstract class TsPrimitiveType implements Serializable { this.value = val; } + @Override + public void setObject(Object val) { + if (val instanceof Binary) { + setBinary((Binary) val); + } + throw new UnSupportedDataTypeException("TsBinary can only be set Binary value"); + } + + @Override + public void reset() { + value = null; + } + @Override public int getSize() { return 4 + 4 + value.getLength(); @@ -478,6 +598,8 @@ public abstract class TsPrimitiveType implements Serializable { private TsPrimitiveType[] values; + public TsVector() {} + public TsVector(TsPrimitiveType[] values) { this.values = values; } @@ -492,6 +614,19 @@ public abstract class TsPrimitiveType implements Serializable { this.values = vals; } + @Override + public void setObject(Object val) { + if (val instanceof TsPrimitiveType[]) { + setVector((TsPrimitiveType[]) val); + } + throw new UnSupportedDataTypeException("TsVector can only be set TsPrimitiveType[] value"); + } + + @Override + public void reset() { + values = null; + } + @Override public int getSize() { int size = 0;
