This is an automated email from the ASF dual-hosted git repository. xiangweiwei pushed a commit to branch aggrVector2 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit c38658d785a1fd7477c05bd711b2bb15067d0538 Author: Alima777 <[email protected]> AuthorDate: Thu Sep 16 15:08:20 2021 +0800 implement update by statistics --- .../db/query/executor/AggregationExecutor.java | 238 +++++++++++++++++++-- .../db/query/reader/series/IAggregateReader.java | 2 +- .../iotdb/db/query/reader/series/SeriesReader.java | 38 ++++ .../reader/series/VectorSeriesAggregateReader.java | 169 +++++++++++++++ .../tsfile/file/metadata/VectorChunkMetadata.java | 4 + .../file/metadata/VectorTimeSeriesMetadata.java | 4 + .../tsfile/read/reader/page/VectorPageReader.java | 4 + 7 files changed, 443 insertions(+), 16 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java index 871915a..a750a62 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java +++ b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java @@ -41,6 +41,7 @@ import org.apache.iotdb.db.query.reader.series.IAggregateReader; import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp; import org.apache.iotdb.db.query.reader.series.SeriesAggregateReader; import org.apache.iotdb.db.query.reader.series.SeriesReaderByTimestamp; +import org.apache.iotdb.db.query.reader.series.VectorSeriesAggregateReader; import org.apache.iotdb.db.query.timegenerator.ServerTimeGenerator; import org.apache.iotdb.db.utils.FilePathUtils; import org.apache.iotdb.db.utils.QueryUtils; @@ -107,7 +108,7 @@ public class AggregationExecutor { StorageEngine.getInstance().mergeLock(new ArrayList<>(pathToAggrIndexesMap.keySet())); // Attention: this method will REMOVE vector path from pathToAggrIndexesMap - Map<PartialPath, Map<String, List<Integer>>> vectorPathIndexesMap = + Map<PartialPath, List<List<Integer>>> vectorPathIndexesMap = groupVectorSeries(pathToAggrIndexesMap); try { for (Map.Entry<PartialPath, List<Integer>> entry : pathToAggrIndexesMap.entrySet()) { @@ -118,8 +119,7 @@ public class AggregationExecutor { aggregationPlan.getAllMeasurementsInDevice(seriesPath.getDevice()), timeFilter); } - for (Map.Entry<PartialPath, Map<String, List<Integer>>> entry : - vectorPathIndexesMap.entrySet()) { + for (Map.Entry<PartialPath, List<List<Integer>>> entry : vectorPathIndexesMap.entrySet()) { VectorPartialPath vectorSeries = (VectorPartialPath) entry.getKey(); aggregateOneVectorSeries( vectorSeries, @@ -150,7 +150,6 @@ public class AggregationExecutor { boolean[] isAsc = new boolean[aggregateResultList.length]; TSDataType tsDataType = dataTypes.get(indexes.get(0)); - for (int i : indexes) { // construct AggregateResult AggregateResult aggregateResult = @@ -183,11 +182,56 @@ public class AggregationExecutor { } protected void aggregateOneVectorSeries( - PartialPath seriesPath, - Map<String, List<Integer>> subIndexes, + VectorPartialPath seriesPath, + List<List<Integer>> subIndexes, Set<String> allMeasurementsInDevice, Filter timeFilter) - throws IOException, QueryProcessException, StorageEngineException {} + throws IOException, QueryProcessException, StorageEngineException { + List<List<AggregateResult>> ascAggregateResultList = new ArrayList<>(); + List<List<AggregateResult>> descAggregateResultList = new ArrayList<>(); + boolean[] isAsc = new boolean[aggregateResultList.length]; + + for (List<Integer> subIndex : subIndexes) { + TSDataType tsDataType = dataTypes.get(subIndex.get(0)); + List<AggregateResult> subAscResultList = new ArrayList<>(); + List<AggregateResult> subDescResultList = new ArrayList<>(); + for (int i : subIndex) { + // construct AggregateResult + AggregateResult aggregateResult = + AggregateResultFactory.getAggrResultByName(aggregations.get(i), tsDataType); + if (aggregateResult.isAscending()) { + subAscResultList.add(aggregateResult); + isAsc[i] = true; + } else { + subDescResultList.add(aggregateResult); + } + } + ascAggregateResultList.add(subAscResultList); + descAggregateResultList.add(subDescResultList); + } + + aggregateOneVectorSeries( + seriesPath, + allMeasurementsInDevice, + context, + timeFilter, + TSDataType.VECTOR, + ascAggregateResultList, + descAggregateResultList, + null); + + for (int i = 0; i < subIndexes.size(); i++) { + List<Integer> subIndex = subIndexes.get(i); + List<AggregateResult> subAscResultList = ascAggregateResultList.get(i); + List<AggregateResult> subDescResultList = descAggregateResultList.get(i); + int ascIndex = 0; + int descIndex = 0; + for (int index : subIndex) { + aggregateResultList[index] = + isAsc[index] ? subAscResultList.get(ascIndex++) : subDescResultList.get(descIndex++); + } + } + } @SuppressWarnings("squid:S107") public static void aggregateOneSeries( @@ -240,6 +284,56 @@ public class AggregationExecutor { } } + public static void aggregateOneVectorSeries( + VectorPartialPath seriesPath, + Set<String> measurements, + QueryContext context, + Filter timeFilter, + TSDataType tsDataType, + List<List<AggregateResult>> ascAggregateResultList, + List<List<AggregateResult>> descAggregateResultList, + TsFileFilter fileFilter) + throws StorageEngineException, IOException, QueryProcessException { + + // construct series reader without value filter + QueryDataSource queryDataSource = + QueryResourceManager.getInstance().getQueryDataSource(seriesPath, context, timeFilter); + if (fileFilter != null) { + QueryUtils.filterQueryDataSource(queryDataSource, fileFilter); + } + // update filter by TTL + timeFilter = queryDataSource.updateFilterUsingTTL(timeFilter); + + if (ascAggregateResultList != null && !ascAggregateResultList.isEmpty()) { + VectorSeriesAggregateReader seriesReader = + new VectorSeriesAggregateReader( + seriesPath, + measurements, + tsDataType, + context, + queryDataSource, + timeFilter, + null, + null, + true); + aggregateFromVectorReader(seriesReader, ascAggregateResultList); + } + if (descAggregateResultList != null && !descAggregateResultList.isEmpty()) { + VectorSeriesAggregateReader seriesReader = + new VectorSeriesAggregateReader( + seriesPath, + measurements, + tsDataType, + context, + queryDataSource, + timeFilter, + null, + null, + false); + aggregateFromVectorReader(seriesReader, descAggregateResultList); + } + } + @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning private static void aggregateFromReader( IAggregateReader seriesReader, List<AggregateResult> aggregateResultList) @@ -285,6 +379,72 @@ public class AggregationExecutor { } } + private static void aggregateFromVectorReader( + VectorSeriesAggregateReader seriesReader, List<List<AggregateResult>> aggregateResultList) + throws QueryProcessException, IOException { + int remainingToCalculate = 0; + List<boolean[]> isCalculatedArray = new ArrayList<>(); + for (List<AggregateResult> subAggregateResults : aggregateResultList) { + remainingToCalculate += subAggregateResults.size(); + boolean[] subCalculatedArray = new boolean[subAggregateResults.size()]; + isCalculatedArray.add(subCalculatedArray); + } + + while (seriesReader.hasNextFile()) { + // cal by file statistics + if (seriesReader.canUseCurrentFileStatistics()) { + while (seriesReader.getCurIndex() < seriesReader.getSubSensorSize()) { + Statistics fileStatistics = seriesReader.currentFileStatistics(); + remainingToCalculate = + aggregateStatistics( + aggregateResultList.get(seriesReader.getCurIndex()), + isCalculatedArray.get(seriesReader.getCurIndex()), + remainingToCalculate, + fileStatistics); + if (remainingToCalculate == 0) { + seriesReader.resetIndex(); + return; + } + seriesReader.nextIndex(); + } + seriesReader.skipCurrentFile(); + continue; + } + + while (seriesReader.hasNextChunk()) { + // cal by chunk statistics + if (seriesReader.canUseCurrentChunkStatistics()) { + while (seriesReader.getCurIndex() < seriesReader.getSubSensorSize()) { + Statistics chunkStatistics = seriesReader.currentChunkStatistics(); + remainingToCalculate = + aggregateStatistics( + aggregateResultList.get(seriesReader.getCurIndex()), + isCalculatedArray.get(seriesReader.getCurIndex()), + remainingToCalculate, + chunkStatistics); + if (remainingToCalculate == 0) { + seriesReader.resetIndex(); + return; + } + seriesReader.nextIndex(); + } + seriesReader.skipCurrentChunk(); + continue; + } + + remainingToCalculate = + aggregateVectorPages( + seriesReader, + aggregateResultList.get(seriesReader.getCurIndex()), + isCalculatedArray.get(seriesReader.getCurIndex()), + remainingToCalculate); + if (remainingToCalculate == 0) { + return; + } + } + } + } + /** Aggregate each result in the list with the statistics */ private static int aggregateStatistics( List<AggregateResult> aggregateResultList, @@ -348,6 +508,48 @@ public class AggregationExecutor { return remainingToCalculate; } + private static int aggregateVectorPages( + VectorSeriesAggregateReader seriesReader, + List<AggregateResult> aggregateResultList, + boolean[] isCalculatedArray, + int remainingToCalculate) + throws IOException, QueryProcessException { + while (seriesReader.hasNextPage()) { + // cal by page statistics + if (seriesReader.canUseCurrentPageStatistics()) { + while (seriesReader.getCurIndex() < seriesReader.getSubSensorSize()) { + Statistics pageStatistic = seriesReader.currentPageStatistics(); + remainingToCalculate = + aggregateStatistics( + aggregateResultList, isCalculatedArray, remainingToCalculate, pageStatistic); + if (remainingToCalculate == 0) { + seriesReader.resetIndex(); + return 0; + } + seriesReader.nextIndex(); + } + seriesReader.skipCurrentPage(); + continue; + } + BatchData nextOverlappedPageData = seriesReader.nextPage(); + for (int i = 0; i < aggregateResultList.size(); i++) { + if (!isCalculatedArray[i]) { + AggregateResult aggregateResult = aggregateResultList.get(i); + aggregateResult.updateResultFromPageData(nextOverlappedPageData); + nextOverlappedPageData.resetBatchData(); + if (aggregateResult.hasFinalResult()) { + isCalculatedArray[i] = true; + remainingToCalculate--; + if (remainingToCalculate == 0) { + return 0; + } + } + } + } + } + return remainingToCalculate; + } + /** execute aggregate function with value filter. */ public QueryDataSet executeWithValueFilter(AggregationPlan queryPlan) throws StorageEngineException, IOException, QueryProcessException { @@ -521,19 +723,25 @@ public class AggregationExecutor { /** * Group all the subSensors of one vector into one VectorPartialPath and Remove vectorPartialPath - * from pathToAggrIndexesMap. - * - * @return e.g. vector[s1, s2], Map{s1 -> 1, s2 -> 2} + * from pathToAggrIndexesMap. For example, input map: vector1[s1] -> [1, 3], vector1[s2] -> [2,4], + * will return vector1[s1,s2], [[1,3], [2,4]] */ - private Map<PartialPath, Map<String, List<Integer>>> groupVectorSeries( + private Map<PartialPath, List<List<Integer>>> groupVectorSeries( Map<PartialPath, List<Integer>> pathToAggrIndexesMap) { - Map<PartialPath, Map<String, List<Integer>>> result = new HashMap<>(); + Map<PartialPath, List<List<Integer>>> result = new HashMap<>(); + Map<String, VectorPartialPath> temp = new HashMap<>(); + for (PartialPath seriesPath : pathToAggrIndexesMap.keySet()) { if (seriesPath instanceof VectorPartialPath) { List<Integer> indexes = pathToAggrIndexesMap.remove(seriesPath); - result - .computeIfAbsent(seriesPath, key -> new HashMap<>()) - .put(((VectorPartialPath) seriesPath).getSubSensor(0), indexes); + VectorPartialPath groupPath = temp.get(seriesPath.getFullPath()); + if (groupPath == null) { + groupPath = (VectorPartialPath) seriesPath.copy(); + temp.put(seriesPath.getFullPath(), groupPath); + } else { + groupPath.addSubSensor(((VectorPartialPath) seriesPath).getSubSensorsList()); + } + result.computeIfAbsent(groupPath, key -> new ArrayList<>()).add(indexes); } } return result; diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/series/IAggregateReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/series/IAggregateReader.java index 867ad9d..c48b88e 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/reader/series/IAggregateReader.java +++ b/server/src/main/java/org/apache/iotdb/db/query/reader/series/IAggregateReader.java @@ -37,7 +37,7 @@ public interface IAggregateReader { boolean canUseCurrentChunkStatistics() throws IOException; - Statistics currentChunkStatistics(); + Statistics currentChunkStatistics() throws IOException; void skipCurrentChunk(); diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java index ebd3db0..b6f2d3e 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java +++ b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java @@ -35,6 +35,8 @@ import org.apache.iotdb.db.utils.QueryUtils; import org.apache.iotdb.db.utils.TestOnly; import org.apache.iotdb.tsfile.file.metadata.IChunkMetadata; import org.apache.iotdb.tsfile.file.metadata.ITimeSeriesMetadata; +import org.apache.iotdb.tsfile.file.metadata.VectorChunkMetadata; +import org.apache.iotdb.tsfile.file.metadata.VectorTimeSeriesMetadata; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics; import org.apache.iotdb.tsfile.read.TimeValuePair; @@ -43,6 +45,7 @@ import org.apache.iotdb.tsfile.read.common.BatchDataFactory; import org.apache.iotdb.tsfile.read.filter.basic.Filter; import org.apache.iotdb.tsfile.read.filter.basic.UnaryFilter; import org.apache.iotdb.tsfile.read.reader.IPageReader; +import org.apache.iotdb.tsfile.read.reader.page.VectorPageReader; import java.io.IOException; import java.io.Serializable; @@ -273,6 +276,13 @@ public class SeriesReader { return firstTimeSeriesMetadata.getStatistics(); } + Statistics currentFileStatistics(int index) throws IOException { + if (!(firstTimeSeriesMetadata instanceof VectorTimeSeriesMetadata)) { + throw new IOException("Can only get statistics by index from vectorTimeSeriesMetaData"); + } + return ((VectorTimeSeriesMetadata) firstTimeSeriesMetadata).getStatistics(index); + } + boolean currentFileModified() throws IOException { if (firstTimeSeriesMetadata == null) { throw new IOException("no first file"); @@ -394,6 +404,13 @@ public class SeriesReader { return firstChunkMetadata.getStatistics(); } + Statistics currentChunkStatistics(int index) throws IOException { + if (!(firstChunkMetadata instanceof VectorChunkMetadata)) { + throw new IOException("Can only get statistics by index from vectorChunkMetaData"); + } + return ((VectorChunkMetadata) firstChunkMetadata).getStatistics(index); + } + boolean currentChunkModified() throws IOException { if (firstChunkMetadata == null) { throw new IOException("no first chunk"); @@ -615,6 +632,16 @@ public class SeriesReader { return firstPageReader.getStatistics(); } + Statistics currentPageStatistics(int index) throws IOException { + if (firstPageReader == null) { + return null; + } + if (!(firstPageReader.isVectorPageReader())) { + throw new IOException("Can only get statistics by index from VectorPageReader"); + } + return firstPageReader.getStatistics(index); + } + boolean currentPageModified() throws IOException { if (firstPageReader == null) { throw new IOException("no first page"); @@ -1023,10 +1050,21 @@ public class SeriesReader { this.isSeq = isSeq; } + public boolean isVectorPageReader() { + return data instanceof VectorPageReader; + } + Statistics getStatistics() { return data.getStatistics(); } + Statistics getStatistics(int index) throws IOException { + if (!(data instanceof VectorPageReader)) { + throw new IOException("Can only get statistics by index from VectorPageReader"); + } + return ((VectorPageReader) data).getStatistics(index); + } + BatchData getAllSatisfiedPageData(boolean ascending) throws IOException { return data.getAllSatisfiedPageData(ascending); } diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/series/VectorSeriesAggregateReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/series/VectorSeriesAggregateReader.java new file mode 100644 index 0000000..5f35d45 --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/query/reader/series/VectorSeriesAggregateReader.java @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.query.reader.series; + +import org.apache.iotdb.db.engine.querycontext.QueryDataSource; +import org.apache.iotdb.db.metadata.VectorPartialPath; +import org.apache.iotdb.db.query.context.QueryContext; +import org.apache.iotdb.db.query.filter.TsFileFilter; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics; +import org.apache.iotdb.tsfile.read.common.BatchData; +import org.apache.iotdb.tsfile.read.filter.basic.Filter; + +import java.io.IOException; +import java.util.Set; + +public class VectorSeriesAggregateReader implements IAggregateReader { + + private final SeriesReader seriesReader; + private int curIndex = 0; + private int subSensorSize; + + public VectorSeriesAggregateReader( + VectorPartialPath seriesPath, + Set<String> allSensors, + TSDataType dataType, + QueryContext context, + QueryDataSource dataSource, + Filter timeFilter, + Filter valueFilter, + TsFileFilter fileFilter, + boolean ascending) { + this.seriesReader = + new SeriesReader( + seriesPath, + allSensors, + dataType, + context, + dataSource, + timeFilter, + valueFilter, + fileFilter, + ascending); + this.subSensorSize = seriesPath.getSubSensorsList().size(); + } + + @Override + public boolean isAscending() { + return seriesReader.getOrderUtils().getAscending(); + } + + @Override + public boolean hasNextFile() throws IOException { + return seriesReader.hasNextFile(); + } + + @Override + public boolean canUseCurrentFileStatistics() throws IOException { + Statistics fileStatistics = currentFileStatistics(); + return !seriesReader.isFileOverlapped() + && containedByTimeFilter(fileStatistics) + && !seriesReader.currentFileModified(); + } + + @Override + public Statistics currentFileStatistics() throws IOException { + return seriesReader.currentFileStatistics(curIndex); + } + + @Override + public void skipCurrentFile() { + seriesReader.skipCurrentFile(); + } + + @Override + public boolean hasNextChunk() throws IOException { + return seriesReader.hasNextChunk(); + } + + @Override + public boolean canUseCurrentChunkStatistics() throws IOException { + Statistics chunkStatistics = currentChunkStatistics(); + return !seriesReader.isChunkOverlapped() + && containedByTimeFilter(chunkStatistics) + && !seriesReader.currentChunkModified(); + } + + @Override + public Statistics currentChunkStatistics() throws IOException { + return seriesReader.currentChunkStatistics(curIndex); + } + + @Override + public void skipCurrentChunk() { + seriesReader.skipCurrentChunk(); + } + + @Override + public boolean hasNextPage() throws IOException { + return seriesReader.hasNextPage(); + } + + @Override + public boolean canUseCurrentPageStatistics() throws IOException { + Statistics currentPageStatistics = currentPageStatistics(); + if (currentPageStatistics == null) { + return false; + } + return !seriesReader.isPageOverlapped() + && containedByTimeFilter(currentPageStatistics) + && !seriesReader.currentPageModified(); + } + + @Override + public Statistics currentPageStatistics() throws IOException { + return seriesReader.currentPageStatistics(curIndex); + } + + @Override + public void skipCurrentPage() { + seriesReader.skipCurrentPage(); + } + + @Override + public BatchData nextPage() throws IOException { + return seriesReader.nextPage().flip(); + } + + private boolean containedByTimeFilter(Statistics statistics) { + Filter timeFilter = seriesReader.getTimeFilter(); + return timeFilter == null + || timeFilter.containStartEndTime(statistics.getStartTime(), statistics.getEndTime()); + } + + public int getCurIndex() { + return curIndex; + } + + public int getSubSensorSize() { + return subSensorSize; + } + + public void nextIndex() { + curIndex++; + if (curIndex > subSensorSize) { + resetIndex(); + } + } + + public void resetIndex() { + curIndex = 0; + } +} diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/VectorChunkMetadata.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/VectorChunkMetadata.java index 6558da0..2bfda53 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/VectorChunkMetadata.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/VectorChunkMetadata.java @@ -49,6 +49,10 @@ public class VectorChunkMetadata implements IChunkMetadata { : timeChunkMetadata.getStatistics(); } + public Statistics getStatistics(int index) { + return valueChunkMetadataList.get(index).getStatistics(); + } + @Override public boolean isModified() { return timeChunkMetadata.isModified(); diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/VectorTimeSeriesMetadata.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/VectorTimeSeriesMetadata.java index 87e88af..c3d727f 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/VectorTimeSeriesMetadata.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/VectorTimeSeriesMetadata.java @@ -49,6 +49,10 @@ public class VectorTimeSeriesMetadata implements ITimeSeriesMetadata { : timeseriesMetadata.getStatistics(); } + public Statistics getStatistics(int index) { + return valueTimeseriesMetadataList.get(index).getStatistics(); + } + @Override public boolean isModified() { return timeseriesMetadata.isModified(); diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/VectorPageReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/VectorPageReader.java index 09a2ab7..5d2b368 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/VectorPageReader.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/VectorPageReader.java @@ -117,6 +117,10 @@ public class VectorPageReader implements IPageReader { : timePageReader.getStatistics(); } + public Statistics getStatistics(int index) { + return valuePageReaderList.get(index).getStatistics(); + } + @Override public void setFilter(Filter filter) { this.filter = filter;
