This is an automated email from the ASF dual-hosted git repository. xuekaifeng pushed a commit to branch improve_aggregation_with_value_filter in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit d7a25c2d7178044def1f1d0bccffad93b78b8fef Author: 151250176 <[email protected]> AuthorDate: Mon Mar 8 21:20:09 2021 +0800 improve performance --- .../db/query/aggregation/AggregateResult.java | 14 ++++-- .../db/query/aggregation/impl/AvgAggrResult.java | 12 +++-- .../db/query/aggregation/impl/CountAggrResult.java | 12 +++-- .../aggregation/impl/FirstValueAggrResult.java | 17 +++++-- .../aggregation/impl/FirstValueDescAggrResult.java | 9 +++- .../aggregation/impl/LastValueAggrResult.java | 13 ++++-- .../aggregation/impl/LastValueDescAggrResult.java | 13 +++++- .../query/aggregation/impl/MaxTimeAggrResult.java | 12 +++-- .../aggregation/impl/MaxTimeDescAggrResult.java | 12 ++++- .../query/aggregation/impl/MaxValueAggrResult.java | 12 +++-- .../query/aggregation/impl/MinTimeAggrResult.java | 16 +++++-- .../aggregation/impl/MinTimeDescAggrResult.java | 8 +++- .../query/aggregation/impl/MinValueAggrResult.java | 12 +++-- .../db/query/aggregation/impl/SumAggrResult.java | 12 +++-- .../db/query/executor/AggregationExecutor.java | 52 ++++++++++++++-------- 15 files changed, 159 insertions(+), 67 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateResult.java index 7b94a93..de74376 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateResult.java +++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateResult.java @@ -19,6 +19,9 @@ package org.apache.iotdb.db.query.aggregation; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.ByteBuffer; import org.apache.iotdb.db.exception.query.QueryProcessException; import org.apache.iotdb.db.query.factory.AggregateResultFactory; import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp; @@ -29,10 +32,6 @@ import org.apache.iotdb.tsfile.read.common.BatchData; import org.apache.iotdb.tsfile.utils.Binary; import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; -import java.io.IOException; -import java.io.OutputStream; -import java.nio.ByteBuffer; - public abstract class AggregateResult { private final AggregationType aggregationType; @@ -95,6 +94,13 @@ public abstract class AggregateResult { long[] timestamps, int length, IReaderByTimestamp dataReader) throws IOException; /** + * This method calculates the aggregation using common timestamps of the cross series filter. + * + * @throws IOException TsFile data read error + */ + public abstract void updateResultUsingTimestamps(long time, Object value); + + /** * Judge if aggregation results have been calculated. In other words, if the aggregated result * does not need to compute the remaining data, it returns true. * diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/AvgAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/AvgAggrResult.java index 237fa08..6039763 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/AvgAggrResult.java +++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/AvgAggrResult.java @@ -19,6 +19,9 @@ package org.apache.iotdb.db.query.aggregation.impl; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.ByteBuffer; import org.apache.iotdb.db.query.aggregation.AggregateResult; import org.apache.iotdb.db.query.aggregation.AggregationType; import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp; @@ -31,10 +34,6 @@ import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics; import org.apache.iotdb.tsfile.read.common.BatchData; import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; -import java.io.IOException; -import java.io.OutputStream; -import java.nio.ByteBuffer; - public class AvgAggrResult extends AggregateResult { private TSDataType seriesDataType; @@ -110,6 +109,11 @@ public class AvgAggrResult extends AggregateResult { } } + @Override + public void updateResultUsingTimestamps(long time, Object value) { + updateAvg(seriesDataType, value); + } + private void updateAvg(TSDataType type, Object sumVal) throws UnSupportedDataTypeException { double val; switch (type) { diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrResult.java index adf0069..d085d5b 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrResult.java +++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrResult.java @@ -19,6 +19,9 @@ package org.apache.iotdb.db.query.aggregation.impl; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.ByteBuffer; import org.apache.iotdb.db.query.aggregation.AggregateResult; import org.apache.iotdb.db.query.aggregation.AggregationType; import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp; @@ -26,10 +29,6 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics; import org.apache.iotdb.tsfile.read.common.BatchData; -import java.io.IOException; -import java.io.OutputStream; -import java.nio.ByteBuffer; - public class CountAggrResult extends AggregateResult { public CountAggrResult() { @@ -87,6 +86,11 @@ public class CountAggrResult extends AggregateResult { } @Override + public void updateResultUsingTimestamps(long time, Object value) { + setLongValue(getLongValue() + 1); + } + + @Override public boolean hasFinalResult() { return false; } diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueAggrResult.java index 0f51dc3..58e8dbd 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueAggrResult.java +++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueAggrResult.java @@ -19,6 +19,9 @@ package org.apache.iotdb.db.query.aggregation.impl; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.ByteBuffer; import org.apache.iotdb.db.query.aggregation.AggregateResult; import org.apache.iotdb.db.query.aggregation.AggregationType; import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp; @@ -27,10 +30,6 @@ import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics; import org.apache.iotdb.tsfile.read.common.BatchData; import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; -import java.io.IOException; -import java.io.OutputStream; -import java.nio.ByteBuffer; - public class FirstValueAggrResult extends AggregateResult { // timestamp of current value @@ -106,6 +105,16 @@ public class FirstValueAggrResult extends AggregateResult { } @Override + public void updateResultUsingTimestamps(long time, Object value) { + if (hasFinalResult()) { + return; + } + + setValue(value); + timestamp = time; + } + + @Override public boolean hasFinalResult() { return hasCandidateResult; } diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueDescAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueDescAggrResult.java index daf5a56..3d3a1c0 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueDescAggrResult.java +++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueDescAggrResult.java @@ -19,13 +19,12 @@ package org.apache.iotdb.db.query.aggregation.impl; +import java.io.IOException; import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics; import org.apache.iotdb.tsfile.read.common.BatchData; -import java.io.IOException; - public class FirstValueDescAggrResult extends FirstValueAggrResult { public FirstValueDescAggrResult(TSDataType dataType) { @@ -63,6 +62,12 @@ public class FirstValueDescAggrResult extends FirstValueAggrResult { } @Override + public void updateResultUsingTimestamps(long time, Object value) { + setValue(value); + timestamp = time; + } + + @Override public boolean hasFinalResult() { return false; } diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueAggrResult.java index 0726d7a..cd1c4d7 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueAggrResult.java +++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueAggrResult.java @@ -19,6 +19,9 @@ package org.apache.iotdb.db.query.aggregation.impl; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.ByteBuffer; import org.apache.iotdb.db.query.aggregation.AggregateResult; import org.apache.iotdb.db.query.aggregation.AggregationType; import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp; @@ -27,10 +30,6 @@ import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics; import org.apache.iotdb.tsfile.read.common.BatchData; import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; -import java.io.IOException; -import java.io.OutputStream; -import java.nio.ByteBuffer; - public class LastValueAggrResult extends AggregateResult { // timestamp of current value @@ -101,6 +100,12 @@ public class LastValueAggrResult extends AggregateResult { } @Override + public void updateResultUsingTimestamps(long time, Object value) { + setValue(value); + timestamp = time; + } + + @Override public boolean hasFinalResult() { return false; } diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueDescAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueDescAggrResult.java index 15af763..2ec007d 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueDescAggrResult.java +++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/LastValueDescAggrResult.java @@ -18,13 +18,12 @@ */ package org.apache.iotdb.db.query.aggregation.impl; +import java.io.IOException; import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics; import org.apache.iotdb.tsfile.read.common.BatchData; -import java.io.IOException; - public class LastValueDescAggrResult extends LastValueAggrResult { public LastValueDescAggrResult(TSDataType dataType) { @@ -85,6 +84,16 @@ public class LastValueDescAggrResult extends LastValueAggrResult { } @Override + public void updateResultUsingTimestamps(long time, Object value) { + if (hasFinalResult()) { + return; + } + + setValue(value); + timestamp = time; + } + + @Override public boolean hasFinalResult() { return hasCandidateResult; } diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeAggrResult.java index 548b249..4c3f757 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeAggrResult.java +++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeAggrResult.java @@ -19,6 +19,9 @@ package org.apache.iotdb.db.query.aggregation.impl; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.ByteBuffer; import org.apache.iotdb.db.query.aggregation.AggregateResult; import org.apache.iotdb.db.query.aggregation.AggregationType; import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp; @@ -26,10 +29,6 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics; import org.apache.iotdb.tsfile.read.common.BatchData; -import java.io.IOException; -import java.io.OutputStream; -import java.nio.ByteBuffer; - public class MaxTimeAggrResult extends AggregateResult { public MaxTimeAggrResult() { @@ -80,6 +79,11 @@ public class MaxTimeAggrResult extends AggregateResult { } @Override + public void updateResultUsingTimestamps(long time, Object value) { + updateMaxTimeResult(time); + } + + @Override public boolean hasFinalResult() { return false; } diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeDescAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeDescAggrResult.java index e29a211..9519288 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeDescAggrResult.java +++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxTimeDescAggrResult.java @@ -18,12 +18,11 @@ */ package org.apache.iotdb.db.query.aggregation.impl; +import java.io.IOException; import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp; import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics; import org.apache.iotdb.tsfile.read.common.BatchData; -import java.io.IOException; - public class MaxTimeDescAggrResult extends MaxTimeAggrResult { @Override @@ -67,6 +66,15 @@ public class MaxTimeDescAggrResult extends MaxTimeAggrResult { } @Override + public void updateResultUsingTimestamps(long timestamp, Object value) { + if (hasFinalResult()) { + return; + } + + updateMaxTimeResult(timestamp); + } + + @Override public boolean hasFinalResult() { return hasCandidateResult; } diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxValueAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxValueAggrResult.java index cd00df2..7e3d26d 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxValueAggrResult.java +++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MaxValueAggrResult.java @@ -19,6 +19,9 @@ package org.apache.iotdb.db.query.aggregation.impl; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.ByteBuffer; import org.apache.iotdb.db.query.aggregation.AggregateResult; import org.apache.iotdb.db.query.aggregation.AggregationType; import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp; @@ -26,10 +29,6 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics; import org.apache.iotdb.tsfile.read.common.BatchData; -import java.io.IOException; -import java.io.OutputStream; -import java.nio.ByteBuffer; - public class MaxValueAggrResult extends AggregateResult { public MaxValueAggrResult(TSDataType dataType) { @@ -85,6 +84,11 @@ public class MaxValueAggrResult extends AggregateResult { } @Override + public void updateResultUsingTimestamps(long time, Object value) { + updateResult((Comparable<Object>) value); + } + + @Override public boolean hasFinalResult() { return false; } diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeAggrResult.java index a0fbabd..a5aa77b 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeAggrResult.java +++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeAggrResult.java @@ -19,6 +19,9 @@ package org.apache.iotdb.db.query.aggregation.impl; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.ByteBuffer; import org.apache.iotdb.db.query.aggregation.AggregateResult; import org.apache.iotdb.db.query.aggregation.AggregationType; import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp; @@ -26,10 +29,6 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics; import org.apache.iotdb.tsfile.read.common.BatchData; -import java.io.IOException; -import java.io.OutputStream; -import java.nio.ByteBuffer; - public class MinTimeAggrResult extends AggregateResult { public MinTimeAggrResult() { @@ -84,6 +83,15 @@ public class MinTimeAggrResult extends AggregateResult { } @Override + public void updateResultUsingTimestamps(long time, Object value) { + if (hasFinalResult()) { + return; + } + + setLongValue(time); + } + + @Override public boolean hasFinalResult() { return hasCandidateResult; } diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeDescAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeDescAggrResult.java index 1bafc56..aa12acf 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeDescAggrResult.java +++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinTimeDescAggrResult.java @@ -18,12 +18,11 @@ */ package org.apache.iotdb.db.query.aggregation.impl; +import java.io.IOException; import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp; import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics; import org.apache.iotdb.tsfile.read.common.BatchData; -import java.io.IOException; - public class MinTimeDescAggrResult extends MinTimeAggrResult { @Override @@ -52,6 +51,11 @@ public class MinTimeDescAggrResult extends MinTimeAggrResult { } @Override + public void updateResultUsingTimestamps(long time, Object value) { + setLongValue(time); + } + + @Override public boolean hasFinalResult() { return false; } diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinValueAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinValueAggrResult.java index 8b17d75..9bf6d63 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinValueAggrResult.java +++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/MinValueAggrResult.java @@ -19,6 +19,9 @@ package org.apache.iotdb.db.query.aggregation.impl; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.ByteBuffer; import org.apache.iotdb.db.query.aggregation.AggregateResult; import org.apache.iotdb.db.query.aggregation.AggregationType; import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp; @@ -26,10 +29,6 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics; import org.apache.iotdb.tsfile.read.common.BatchData; -import java.io.IOException; -import java.io.OutputStream; -import java.nio.ByteBuffer; - public class MinValueAggrResult extends AggregateResult { public MinValueAggrResult(TSDataType dataType) { @@ -80,6 +79,11 @@ public class MinValueAggrResult extends AggregateResult { } @Override + public void updateResultUsingTimestamps(long time, Object value) { + updateResult((Comparable<Object>) value); + } + + @Override public boolean hasFinalResult() { return false; } diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/SumAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/SumAggrResult.java index 475e953..c8c4157 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/SumAggrResult.java +++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/SumAggrResult.java @@ -19,6 +19,9 @@ package org.apache.iotdb.db.query.aggregation.impl; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.ByteBuffer; import org.apache.iotdb.db.query.aggregation.AggregateResult; import org.apache.iotdb.db.query.aggregation.AggregationType; import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp; @@ -30,10 +33,6 @@ import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics; import org.apache.iotdb.tsfile.read.common.BatchData; import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; -import java.io.IOException; -import java.io.OutputStream; -import java.nio.ByteBuffer; - public class SumAggrResult extends AggregateResult { private TSDataType seriesDataType; @@ -88,6 +87,11 @@ public class SumAggrResult extends AggregateResult { } } + @Override + public void updateResultUsingTimestamps(long time, Object value) { + updateSum(value); + } + private void updateSum(Object sumVal) throws UnSupportedDataTypeException { double preValue = getDoubleValue(); switch (seriesDataType) { diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java index 4c54ac9..54b4faa 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java +++ b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java @@ -19,6 +19,13 @@ package org.apache.iotdb.db.query.executor; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; import org.apache.iotdb.db.conf.IoTDBConstant; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.engine.StorageEngine; @@ -52,14 +59,6 @@ import org.apache.iotdb.tsfile.read.filter.basic.Filter; import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet; import org.apache.iotdb.tsfile.read.query.timegenerator.TimeGenerator; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; - @SuppressWarnings("java:S1135") // ignore todos public class AggregationExecutor { @@ -350,26 +349,36 @@ public class AggregationExecutor { } TimeGenerator timestampGenerator = getTimeGenerator(context, queryPlan); List<IReaderByTimestamp> readersOfSelectedSeries = new ArrayList<>(); + // group by path name + Map<PartialPath, List<Integer>> pathToAggrIndexesMap = + groupAggregationsBySeries(selectedSeries); + List<AggregateResult> aggregateResults = new ArrayList<>(); + // series id -> list of result + Map<Integer, List<Integer>> readerIdToAggrIndexesMap = new HashMap<>(); + List<StorageGroupProcessor> list = StorageEngine.getInstance().mergeLock(selectedSeries); try { for (int i = 0; i < selectedSeries.size(); i++) { - PartialPath path = selectedSeries.get(i); - IReaderByTimestamp seriesReaderByTimestamp = - getReaderByTime(path, queryPlan, dataTypes.get(i), context); - readersOfSelectedSeries.add(seriesReaderByTimestamp); + List<Integer> indexes = pathToAggrIndexesMap.remove(selectedSeries.get(i)); + if (indexes != null) { + readerIdToAggrIndexesMap.put(readersOfSelectedSeries.size(), indexes); + IReaderByTimestamp seriesReaderByTimestamp = + getReaderByTime(selectedSeries.get(i), queryPlan, dataTypes.get(i), context); + readersOfSelectedSeries.add(seriesReaderByTimestamp); + } } } finally { StorageEngine.getInstance().mergeUnLock(list); } - List<AggregateResult> aggregateResults = new ArrayList<>(); for (int i = 0; i < selectedSeries.size(); i++) { TSDataType type = dataTypes.get(i); AggregateResult result = AggregateResultFactory.getAggrResultByName(aggregations.get(i), type, ascending); aggregateResults.add(result); } - aggregateWithValueFilter(aggregateResults, timestampGenerator, readersOfSelectedSeries); + aggregateWithValueFilter( + aggregateResults, timestampGenerator, readersOfSelectedSeries, readerIdToAggrIndexesMap); return constructDataSet(aggregateResults, queryPlan); } @@ -395,7 +404,8 @@ public class AggregationExecutor { private void aggregateWithValueFilter( List<AggregateResult> aggregateResults, TimeGenerator timestampGenerator, - List<IReaderByTimestamp> readersOfSelectedSeries) + List<IReaderByTimestamp> readersOfSelectedSeries, + Map<Integer, List<Integer>> readerIdToAggrIndexesMap) throws IOException { while (timestampGenerator.hasNext()) { @@ -412,10 +422,14 @@ public class AggregationExecutor { // cal part of aggregate result for (int i = 0; i < readersOfSelectedSeries.size(); i++) { - aggregateResults - .get(i) - .updateResultUsingTimestamps( - timeArray, timeArrayLength, readersOfSelectedSeries.get(i)); + for (int j = 0; j < timeArrayLength; j++) { + Object value = readersOfSelectedSeries.get(i).getValueInTimestamp(timeArray[j]); + if (value != null) { + for (int resultIndex : readerIdToAggrIndexesMap.get(i)) { + aggregateResults.get(resultIndex).updateResultUsingTimestamps(timeArray[j], value); + } + } + } } } }
