This is an automated email from the ASF dual-hosted git repository. xiangweiwei pushed a commit to branch aggrWithValueFilter in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 04996ef36bed10fd8cd6e01827ec00545df371db Author: Alima777 <[email protected]> AuthorDate: Thu Nov 25 11:19:58 2021 +0800 reimplement value iterator to update aggr result --- .../db/query/aggregation/AggregateResult.java | 4 +- .../db/query/aggregation/impl/AvgAggrResult.java | 9 +- .../db/query/aggregation/impl/CountAggrResult.java | 11 ++- .../query/aggregation/impl/ExtremeAggrResult.java | 7 +- .../aggregation/impl/FirstValueAggrResult.java | 11 +-- .../aggregation/impl/FirstValueDescAggrResult.java | 7 +- .../aggregation/impl/LastValueAggrResult.java | 7 +- .../aggregation/impl/LastValueDescAggrResult.java | 12 ++- .../query/aggregation/impl/MaxTimeAggrResult.java | 5 +- .../aggregation/impl/MaxTimeDescAggrResult.java | 10 +-- .../query/aggregation/impl/MaxValueAggrResult.java | 10 ++- .../query/aggregation/impl/MinTimeAggrResult.java | 10 +-- .../aggregation/impl/MinTimeDescAggrResult.java | 5 +- .../query/aggregation/impl/MinValueAggrResult.java | 10 ++- .../db/query/aggregation/impl/SumAggrResult.java | 9 +- .../db/query/executor/AggregationExecutor.java | 98 ++++++++++++++++------ .../iotdb/db/utils/AlignedValueIterator.java | 46 ++++++++++ .../org/apache/iotdb/db/utils/ValueIterator.java | 56 +++++++++++++ 18 files changed, 236 insertions(+), 91 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateResult.java index efd9aee..7daa1e8 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateResult.java +++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateResult.java @@ -22,6 +22,7 @@ package org.apache.iotdb.db.query.aggregation; import org.apache.iotdb.db.exception.query.QueryProcessException; import org.apache.iotdb.db.query.factory.AggregateResultFactory; import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp; +import org.apache.iotdb.db.utils.ValueIterator; import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics; @@ -98,7 +99,8 @@ public abstract class AggregateResult { long[] timestamps, int length, IReaderByTimestamp dataReader) throws IOException; /** This method calculates the aggregation using values that have been calculated */ - public abstract void updateResultUsingValues(long[] timestamps, int length, Object[] values); + public abstract void updateResultUsingValues( + long[] timestamps, int length, ValueIterator valueIterator); /** * Judge if aggregation results have been calculated. In other words, if the aggregated result diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/AvgAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/AvgAggrResult.java index a1fffa6..aebe259 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/AvgAggrResult.java +++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/AvgAggrResult.java @@ -22,6 +22,7 @@ package org.apache.iotdb.db.query.aggregation.impl; import org.apache.iotdb.db.query.aggregation.AggregateResult; import org.apache.iotdb.db.query.aggregation.AggregationType; import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp; +import org.apache.iotdb.db.utils.ValueIterator; import org.apache.iotdb.tsfile.exception.filter.StatisticsClassException; import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; @@ -110,11 +111,9 @@ public class AvgAggrResult extends AggregateResult { } @Override - public void updateResultUsingValues(long[] timestamps, int length, Object[] values) { - for (int i = 0; i < length; i++) { - if (values[i] != null) { - updateAvg(seriesDataType, values[i]); - } + public void updateResultUsingValues(long[] timestamps, int length, ValueIterator valueIterator) { + while (valueIterator.hasNext()) { + updateAvg(seriesDataType, valueIterator.next()); } } diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrResult.java index 57a3a7e..971c76a 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrResult.java +++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrResult.java @@ -22,6 +22,7 @@ package org.apache.iotdb.db.query.aggregation.impl; import org.apache.iotdb.db.query.aggregation.AggregateResult; import org.apache.iotdb.db.query.aggregation.AggregationType; import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp; +import org.apache.iotdb.db.utils.ValueIterator; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics; import org.apache.iotdb.tsfile.read.common.IBatchDataIterator; @@ -57,7 +58,6 @@ public class CountAggrResult extends AggregateResult { public void updateResultFromPageData( IBatchDataIterator batchIterator, long minBound, long maxBound) { int cnt = 0; - int count = batchIterator.totalLength(); while (batchIterator.hasNext()) { if (batchIterator.currentTime() >= maxBound || batchIterator.currentTime() < minBound) { break; @@ -82,12 +82,11 @@ public class CountAggrResult extends AggregateResult { } @Override - public void updateResultUsingValues(long[] timestamps, int length, Object[] values) { + public void updateResultUsingValues(long[] timestamps, int length, ValueIterator valueIterator) { int cnt = 0; - for (int i = 0; i < length; i++) { - if (values[i] != null) { - cnt++; - } + while (valueIterator.hasNext()) { + valueIterator.next(); + cnt++; } setLongValue(getLongValue() + cnt); } diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/ExtremeAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/ExtremeAggrResult.java index 144fe00..0a0c165 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/ExtremeAggrResult.java +++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/ExtremeAggrResult.java @@ -22,6 +22,7 @@ package org.apache.iotdb.db.query.aggregation.impl; import org.apache.iotdb.db.query.aggregation.AggregateResult; import org.apache.iotdb.db.query.aggregation.AggregationType; import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp; +import org.apache.iotdb.db.utils.ValueIterator; import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics; @@ -125,10 +126,10 @@ public class ExtremeAggrResult extends AggregateResult { } @Override - public void updateResultUsingValues(long[] timestamps, int length, Object[] values) { + public void updateResultUsingValues(long[] timestamps, int length, ValueIterator valueIterator) { Comparable<Object> extVal = null; - for (int i = 0; i < length; i++) { - extVal = getExtremeValue(extVal, (Comparable<Object>) values[i]); + while (valueIterator.hasNext()) { + extVal = getExtremeValue(extVal, (Comparable<Object>) valueIterator.next()); } updateResult(extVal); } diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueAggrResult.java index 233abdc..be5bd9e 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueAggrResult.java +++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueAggrResult.java @@ -22,6 +22,7 @@ package org.apache.iotdb.db.query.aggregation.impl; import org.apache.iotdb.db.query.aggregation.AggregateResult; import org.apache.iotdb.db.query.aggregation.AggregationType; import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp; +import org.apache.iotdb.db.utils.ValueIterator; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics; import org.apache.iotdb.tsfile.read.common.IBatchDataIterator; @@ -107,16 +108,12 @@ public class FirstValueAggrResult extends AggregateResult { } @Override - public void updateResultUsingValues(long[] timestamps, int length, Object[] values) { + public void updateResultUsingValues(long[] timestamps, int length, ValueIterator valueIterator) { if (hasFinalResult()) { return; } - for (int i = 0; i < length; i++) { - if (values[i] != null) { - setValue(values[i]); - timestamp = timestamps[i]; - break; - } + if (valueIterator.hasNext()) { + setValue(valueIterator.next()); } } diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueDescAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueDescAggrResult.java index 3092818..afcd23f 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueDescAggrResult.java +++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueDescAggrResult.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.query.aggregation.impl; import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp; +import org.apache.iotdb.db.utils.ValueIterator; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics; import org.apache.iotdb.tsfile.read.common.IBatchDataIterator; @@ -65,10 +66,10 @@ public class FirstValueDescAggrResult extends FirstValueAggrResult { } @Override - public void updateResultUsingValues(long[] timestamps, int length, Object[] values) { + public void updateResultUsingValues(long[] timestamps, int length, ValueIterator valueIterator) { for (int i = length - 1; i >= 0; i--) { - if (values[i] != null) { - setValue(values[i]); + if (valueIterator.get(i) != null) { + setValue(valueIterator.get(i)); timestamp = timestamps[i]; return; } diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueAggrResult.java index 443751c..d369f79 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueAggrResult.java +++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueAggrResult.java @@ -22,6 +22,7 @@ package org.apache.iotdb.db.query.aggregation.impl; import org.apache.iotdb.db.query.aggregation.AggregateResult; import org.apache.iotdb.db.query.aggregation.AggregationType; import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp; +import org.apache.iotdb.db.utils.ValueIterator; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics; import org.apache.iotdb.tsfile.read.common.IBatchDataIterator; @@ -97,11 +98,11 @@ public class LastValueAggrResult extends AggregateResult { } @Override - public void updateResultUsingValues(long[] timestamps, int length, Object[] values) { + public void updateResultUsingValues(long[] timestamps, int length, ValueIterator valueIterator) { for (int i = length - 1; i >= 0; i--) { - if (values[i] != null) { + if (valueIterator.get(i) != null) { timestamp = timestamps[i]; - setValue(values[i]); + setValue(valueIterator.get(i)); return; } } diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueDescAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueDescAggrResult.java index 981167b..7bc236c 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueDescAggrResult.java +++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueDescAggrResult.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.query.aggregation.impl; import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp; +import org.apache.iotdb.db.utils.ValueIterator; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics; import org.apache.iotdb.tsfile.read.common.IBatchDataIterator; @@ -87,16 +88,13 @@ public class LastValueDescAggrResult extends LastValueAggrResult { } @Override - public void updateResultUsingValues(long[] timestamps, int length, Object[] values) { + public void updateResultUsingValues(long[] timestamps, int length, ValueIterator valueIterator) { if (hasFinalResult()) { return; } - for (int i = 0; i < length; i++) { - if (values[i] != null) { - timestamp = timestamps[i]; - setValue(values[i]); - return; - } + if (valueIterator.hasNext()) { + timestamp = timestamps[valueIterator.getCurPos()]; + setValue(valueIterator.next()); } } diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeAggrResult.java index 46ebe15..d990b8f 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeAggrResult.java +++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeAggrResult.java @@ -22,6 +22,7 @@ package org.apache.iotdb.db.query.aggregation.impl; import org.apache.iotdb.db.query.aggregation.AggregateResult; import org.apache.iotdb.db.query.aggregation.AggregationType; import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp; +import org.apache.iotdb.db.utils.ValueIterator; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics; import org.apache.iotdb.tsfile.read.common.IBatchDataIterator; @@ -77,9 +78,9 @@ public class MaxTimeAggrResult extends AggregateResult { } @Override - public void updateResultUsingValues(long[] timestamps, int length, Object[] values) { + public void updateResultUsingValues(long[] timestamps, int length, ValueIterator valueIterator) { for (int i = length - 1; i >= 0; i--) { - if (values[i] != null) { + if (valueIterator.get(i) != null) { updateMaxTimeResult(timestamps[i]); return; } diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeDescAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeDescAggrResult.java index e867bf7..1dfc1ad 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeDescAggrResult.java +++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeDescAggrResult.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.query.aggregation.impl; import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp; +import org.apache.iotdb.db.utils.ValueIterator; import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics; import org.apache.iotdb.tsfile.read.common.IBatchDataIterator; @@ -70,15 +71,12 @@ public class MaxTimeDescAggrResult extends MaxTimeAggrResult { } @Override - public void updateResultUsingValues(long[] timestamps, int length, Object[] values) { + public void updateResultUsingValues(long[] timestamps, int length, ValueIterator valueIterator) { if (hasFinalResult()) { return; } - for (int i = 0; i < length; i++) { - if (values[i] != null) { - updateMaxTimeResult(timestamps[i]); - return; - } + if (valueIterator.hasNext()) { + updateMaxTimeResult(timestamps[valueIterator.getCurPos()]); } } diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxValueAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxValueAggrResult.java index a61583b..f939d9c 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxValueAggrResult.java +++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxValueAggrResult.java @@ -22,6 +22,7 @@ package org.apache.iotdb.db.query.aggregation.impl; import org.apache.iotdb.db.query.aggregation.AggregateResult; import org.apache.iotdb.db.query.aggregation.AggregationType; import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp; +import org.apache.iotdb.db.utils.ValueIterator; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics; import org.apache.iotdb.tsfile.read.common.IBatchDataIterator; @@ -83,11 +84,12 @@ public class MaxValueAggrResult extends AggregateResult { } @Override - public void updateResultUsingValues(long[] timestamps, int length, Object[] values) { + public void updateResultUsingValues(long[] timestamps, int length, ValueIterator valueIterator) { Comparable<Object> maxVal = null; - for (int i = 0; i < length; i++) { - if (values[i] != null && (maxVal == null || maxVal.compareTo(values[i]) < 0)) { - maxVal = (Comparable<Object>) values[i]; + while (valueIterator.hasNext()) { + Object value = valueIterator.next(); + if (maxVal == null || maxVal.compareTo(value) < 0) { + maxVal = (Comparable<Object>) value; } } updateResult(maxVal); diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeAggrResult.java index 4d0365f..0cf2205 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeAggrResult.java +++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeAggrResult.java @@ -22,6 +22,7 @@ package org.apache.iotdb.db.query.aggregation.impl; import org.apache.iotdb.db.query.aggregation.AggregateResult; import org.apache.iotdb.db.query.aggregation.AggregationType; import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp; +import org.apache.iotdb.db.utils.ValueIterator; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics; import org.apache.iotdb.tsfile.read.common.IBatchDataIterator; @@ -92,15 +93,12 @@ public class MinTimeAggrResult extends AggregateResult { } @Override - public void updateResultUsingValues(long[] timestamps, int length, Object[] values) { + public void updateResultUsingValues(long[] timestamps, int length, ValueIterator valueIterator) { if (hasFinalResult()) { return; } - for (int i = 0; i < length; i++) { - if (values[i] != null) { - setLongValue(timestamps[i]); - return; - } + if (valueIterator.hasNext()) { + setLongValue(timestamps[valueIterator.getCurPos()]); } } diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeDescAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeDescAggrResult.java index 9abceb5..aac888d 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeDescAggrResult.java +++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeDescAggrResult.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.query.aggregation.impl; import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp; +import org.apache.iotdb.db.utils.ValueIterator; import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics; import org.apache.iotdb.tsfile.read.common.IBatchDataIterator; @@ -54,9 +55,9 @@ public class MinTimeDescAggrResult extends MinTimeAggrResult { } @Override - public void updateResultUsingValues(long[] timestamps, int length, Object[] values) { + public void updateResultUsingValues(long[] timestamps, int length, ValueIterator valueIterator) { for (int i = length - 1; i >= 0; i--) { - if (values[i] != null) { + if (valueIterator.get(i) != null) { setLongValue(timestamps[i]); return; } diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinValueAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinValueAggrResult.java index f3c01ed..8b9e1e4 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinValueAggrResult.java +++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinValueAggrResult.java @@ -22,6 +22,7 @@ package org.apache.iotdb.db.query.aggregation.impl; import org.apache.iotdb.db.query.aggregation.AggregateResult; import org.apache.iotdb.db.query.aggregation.AggregationType; import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp; +import org.apache.iotdb.db.utils.ValueIterator; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics; import org.apache.iotdb.tsfile.read.common.IBatchDataIterator; @@ -78,11 +79,12 @@ public class MinValueAggrResult extends AggregateResult { } @Override - public void updateResultUsingValues(long[] timestamps, int length, Object[] values) { + public void updateResultUsingValues(long[] timestamps, int length, ValueIterator valueIterator) { Comparable<Object> minVal = null; - for (int i = 0; i < length; i++) { - if (values[i] != null && (minVal == null || minVal.compareTo(values[i]) > 0)) { - minVal = (Comparable<Object>) values[i]; + while (valueIterator.hasNext()) { + Object value = valueIterator.next(); + if (minVal == null || minVal.compareTo(value) > 0) { + minVal = (Comparable<Object>) value; } } updateResult(minVal); diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/SumAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/SumAggrResult.java index a047c33..46cd1b1 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/SumAggrResult.java +++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/SumAggrResult.java @@ -22,6 +22,7 @@ package org.apache.iotdb.db.query.aggregation.impl; import org.apache.iotdb.db.query.aggregation.AggregateResult; import org.apache.iotdb.db.query.aggregation.AggregationType; import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp; +import org.apache.iotdb.db.utils.ValueIterator; import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.file.metadata.statistics.BooleanStatistics; @@ -91,11 +92,9 @@ public class SumAggrResult extends AggregateResult { } @Override - public void updateResultUsingValues(long[] timestamps, int length, Object[] values) { - for (int i = 0; i < length; i++) { - if (values[i] != null) { - updateSum(values[i]); - } + public void updateResultUsingValues(long[] timestamps, int length, ValueIterator valueIterator) { + while (valueIterator.hasNext()) { + updateSum(valueIterator.next()); } } diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java index 53c218a..94f4f3c 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java +++ b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java @@ -44,7 +44,9 @@ import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp; import org.apache.iotdb.db.query.reader.series.SeriesAggregateReader; import org.apache.iotdb.db.query.reader.series.SeriesReaderByTimestamp; import org.apache.iotdb.db.query.timegenerator.ServerTimeGenerator; +import org.apache.iotdb.db.utils.AlignedValueIterator; import org.apache.iotdb.db.utils.QueryUtils; +import org.apache.iotdb.db.utils.ValueIterator; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics; import org.apache.iotdb.tsfile.read.common.BatchData; @@ -55,10 +57,12 @@ import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression; import org.apache.iotdb.tsfile.read.filter.basic.Filter; import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet; import org.apache.iotdb.tsfile.read.query.timegenerator.TimeGenerator; +import org.apache.iotdb.tsfile.utils.TsPrimitiveType; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -588,18 +592,25 @@ public class AggregationExecutor { // group by path name Map<PartialPath, List<Integer>> pathToAggrIndexesMap = groupAggregationsBySeries(selectedSeries); - Map<IReaderByTimestamp, List<Integer>> readerToAggrIndexesMap = new HashMap<>(); + Map<PartialPath, List<List<Integer>>> vectorPathIndexesMap = + groupVectorSeries(pathToAggrIndexesMap); + Map<IReaderByTimestamp, List<List<Integer>>> readerToAggrIndexesMap = new HashMap<>(); List<StorageGroupProcessor> list = StorageEngine.getInstance().mergeLock(selectedSeries); + try { - for (int i = 0; i < selectedSeries.size(); i++) { - PartialPath path = selectedSeries.get(i); - List<Integer> indexes = pathToAggrIndexesMap.remove(path); - if (indexes != null) { - IReaderByTimestamp seriesReaderByTimestamp = - getReaderByTime(path, queryPlan, dataTypes.get(i), context); - readerToAggrIndexesMap.put(seriesReaderByTimestamp, indexes); - } + for (PartialPath path : pathToAggrIndexesMap.keySet()) { + IReaderByTimestamp seriesReaderByTimestamp = + getReaderByTime(path, queryPlan, path.getSeriesType(), context); + readerToAggrIndexesMap.put( + seriesReaderByTimestamp, Collections.singletonList(pathToAggrIndexesMap.get(path))); } + pathToAggrIndexesMap.clear(); + for (PartialPath vectorPath : vectorPathIndexesMap.keySet()) { + IReaderByTimestamp seriesReaderByTimestamp = + getReaderByTime(vectorPath, queryPlan, vectorPath.getSeriesType(), context); + readerToAggrIndexesMap.put(seriesReaderByTimestamp, vectorPathIndexesMap.get(vectorPath)); + } + vectorPathIndexesMap.clear(); } finally { StorageEngine.getInstance().mergeUnLock(list); } @@ -649,7 +660,7 @@ public class AggregationExecutor { /** calculate aggregation result with value filter. */ private void aggregateWithValueFilter( TimeGenerator timestampGenerator, - Map<IReaderByTimestamp, List<Integer>> readerToAggrIndexesMap) + Map<IReaderByTimestamp, List<List<Integer>>> readerToAggrIndexesMap) throws IOException { List<Boolean> cached = markFilterdPaths( @@ -668,23 +679,38 @@ public class AggregationExecutor { } // cal part of aggregate result - for (Entry<IReaderByTimestamp, List<Integer>> entry : readerToAggrIndexesMap.entrySet()) { - int pathId = entry.getValue().get(0); - // cache in timeGenerator - if (cached.get(pathId)) { - Object[] values = timestampGenerator.getValues(selectedSeries.get(pathId)); - for (Integer i : entry.getValue()) { - aggregateResultList[i].updateResultUsingValues(timeArray, timeArrayLength, values); + for (Entry<IReaderByTimestamp, List<List<Integer>>> entry : + readerToAggrIndexesMap.entrySet()) { + // use cache data as much as possible + boolean[] cachedOrNot = new boolean[entry.getValue().size()]; + for (int i = 0; i < entry.getValue().size(); i++) { + List<Integer> subIndexes = entry.getValue().get(i); + int pathId = subIndexes.get(0); + // if cached in timeGenerator + if (cached.get(pathId)) { + // TODO: need to get exact path class? + Object[] values = timestampGenerator.getValues(selectedSeries.get(pathId)); + ValueIterator valueIterator = generateValueIterator(values); + for (Integer index : subIndexes) { + aggregateResultList[index].updateResultUsingValues( + timeArray, timeArrayLength, valueIterator); + } + cachedOrNot[i] = true; } - } else { - if (entry.getValue().size() == 1) { - aggregateResultList[entry.getValue().get(0)].updateResultUsingTimestamps( - timeArray, timeArrayLength, entry.getKey()); - } else { - Object[] values = entry.getKey().getValuesInTimestamps(timeArray, timeArrayLength); - if (values != null) { - for (Integer i : entry.getValue()) { - aggregateResultList[i].updateResultUsingValues(timeArray, timeArrayLength, values); + } + // TODO: if size = 1, we only need to get the exact number of values for specific aggregate + if (hasRemaining(cachedOrNot)) { + Object[] values = entry.getKey().getValuesInTimestamps(timeArray, timeArrayLength); + if (values != null) { + ValueIterator valueIterator = generateValueIterator(values); + for (int i = 0; i < entry.getValue().size(); i++) { + if (!cachedOrNot[i]) { + valueIterator.setSubMeasurementIndex(i); + for (Integer index : entry.getValue().get(i)) { + aggregateResultList[index].updateResultUsingValues( + timeArray, timeArrayLength, valueIterator); + valueIterator.reset(); + } } } } @@ -693,6 +719,24 @@ public class AggregationExecutor { } } + private ValueIterator generateValueIterator(Object[] values) { + if (values[0] instanceof TsPrimitiveType[]) { + return new AlignedValueIterator(values); + } else { + return new ValueIterator(values); + } + } + + /** Return whether there is result that has not been cached */ + private boolean hasRemaining(boolean[] cachedOrNot) { + for (int i = 0; i < cachedOrNot.length; i++) { + if (!cachedOrNot[i]) { + return true; + } + } + return false; + } + /** * using aggregate result data list construct QueryDataSet. * @@ -766,7 +810,7 @@ public class AggregationExecutor { result.computeIfAbsent(groupPath, key -> new ArrayList<>()).add(indexes); } else { // groupPath is changed here so we update it - List<List<Integer>> subIndexes = result.remove(groupPath); + List<List<Integer>> subIndexes = result.get(groupPath); subIndexes.add(indexes); groupPath.addMeasurements(exactPath.getMeasurementList()); groupPath.addSchemas(exactPath.getSchemaList()); diff --git a/server/src/main/java/org/apache/iotdb/db/utils/AlignedValueIterator.java b/server/src/main/java/org/apache/iotdb/db/utils/AlignedValueIterator.java new file mode 100644 index 0000000..b47da5c --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/utils/AlignedValueIterator.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.utils; + +import org.apache.iotdb.tsfile.utils.TsPrimitiveType; + +/** Used for value object is instance of TsPrimitiveType[] */ +public class AlignedValueIterator extends ValueIterator { + + int subMeasurementIndex; + + public AlignedValueIterator(Object[] values) { + super(values); + } + + public void setSubMeasurementIndex(int subMeasurementIndex) { + this.subMeasurementIndex = subMeasurementIndex; + } + + @Override + public Object next() { + return ((TsPrimitiveType[]) values[curPos++])[subMeasurementIndex]; + } + + @Override + public Object get(int index) { + return ((TsPrimitiveType[]) values[index])[subMeasurementIndex]; + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/utils/ValueIterator.java b/server/src/main/java/org/apache/iotdb/db/utils/ValueIterator.java new file mode 100644 index 0000000..d870f7f --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/utils/ValueIterator.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.utils; + +public class ValueIterator { + + // Object: TsPrimitiveType[] or common data type + protected Object[] values; + protected int curPos = 0; + + public ValueIterator(Object[] values) { + this.values = values; + } + + public boolean hasNext() { + while (curPos < values.length && values[curPos] == null) { + curPos++; + } + return curPos < values.length; + } + + public void setSubMeasurementIndex(int subMeasurementIndex) {} + + public Object next() { + return values[curPos++]; + } + + public Object get(int index) { + return values[index]; + } + + public int getCurPos() { + return curPos; + } + + public void reset() { + this.curPos = 0; + } +}
