This is an automated email from the ASF dual-hosted git repository. leirui pushed a commit to branch research/M4-visualization in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 93118b5a8335d3b7c9a8823021082aba8fb43493 Author: Lei Rui <[email protected]> AuthorDate: Wed Jan 25 23:09:21 2023 +0800 query dev 50% --- .../dataset/groupby/LocalGroupByExecutor4CPV.java | 448 ++++++++------------- .../apache/iotdb/db/tools/TsFileSketchTool.java | 2 +- .../apache/iotdb/db/integration/m4/MyTest1.java | 9 +- .../iotdb/tsfile/common/conf/TSFileConfig.java | 2 +- .../iotdb/tsfile/file/metadata/ChunkMetadata.java | 29 ++ .../file/metadata/statistics/DoubleStatistics.java | 16 + .../file/metadata/statistics/FloatStatistics.java | 16 + .../metadata/statistics/IntegerStatistics.java | 119 ++++-- .../file/metadata/statistics/LongStatistics.java | 16 + .../file/metadata/statistics/Statistics.java | 6 +- .../file/metadata/statistics/StepRegress.java | 28 +- .../iotdb/tsfile/read/common/ChunkSuit4CPV.java | 371 +++++++++++++++-- .../iotdb/tsfile/read/reader/page/PageReader.java | 354 ++++++++-------- 13 files changed, 895 insertions(+), 521 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutor4CPV.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutor4CPV.java index 545190e8fe..2cabc62d4e 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutor4CPV.java +++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutor4CPV.java @@ -19,6 +19,14 @@ package org.apache.iotdb.db.query.dataset.groupby; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.ListIterator; +import java.util.Map; +import java.util.Set; import org.apache.iotdb.db.engine.querycontext.QueryDataSource; import org.apache.iotdb.db.exception.StorageEngineException; import org.apache.iotdb.db.exception.query.QueryProcessException; @@ -50,22 +58,13 @@ import org.apache.iotdb.tsfile.read.reader.page.PageReader; import 
org.apache.iotdb.tsfile.utils.Pair; import org.apache.iotdb.tsfile.utils.TsPrimitiveType; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Comparator; -import java.util.HashMap; -import java.util.List; -import java.util.ListIterator; -import java.util.Map; -import java.util.Set; - /** * Sql format: SELECT min_time(s0), max_time(s0), first_value(s0), last_value(s0), min_value(s0), * max_value(s0) ROM root.xx group by ([tqs,tqe),IntervalLength). Requirements: (1) Don't change the * sequence of the above six aggregates (2) Make sure (tqe-tqs) is divisible by IntervalLength. (3) * Assume each chunk has only one page. */ -// This is the CPVGroupByExecutor in paper. +// This is the CPVGroupByExecutor in M4-LSM paper. public class LocalGroupByExecutor4CPV implements GroupByExecutor { // Aggregate result buffer of this path @@ -85,50 +84,34 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor { // private PriorityMergeReader mergeReader; - public LocalGroupByExecutor4CPV( - PartialPath path, - Set<String> allSensors, - TSDataType dataType, - QueryContext context, - Filter timeFilter, - TsFileFilter fileFilter, - boolean ascending) + public LocalGroupByExecutor4CPV(PartialPath path, Set<String> allSensors, TSDataType dataType, + QueryContext context, Filter timeFilter, TsFileFilter fileFilter, boolean ascending) throws StorageEngineException, QueryProcessException { this.tsDataType = dataType; // this.mergeReader = new PriorityMergeReader(); // get all data sources - QueryDataSource queryDataSource = - QueryResourceManager.getInstance().getQueryDataSource(path, context, this.timeFilter); + QueryDataSource queryDataSource = QueryResourceManager.getInstance() + .getQueryDataSource(path, context, this.timeFilter); // update filter by TTL this.timeFilter = queryDataSource.updateFilterUsingTTL(timeFilter); - SeriesReader seriesReader = - new SeriesReader( - path, - allSensors, - dataType, - context, - queryDataSource, - timeFilter, - 
null, - fileFilter, - ascending); + SeriesReader seriesReader = new SeriesReader(path, allSensors, dataType, context, + queryDataSource, timeFilter, null, fileFilter, ascending); // unpackAllOverlappedFilesToTimeSeriesMetadata try { // TODO: this might be bad to load all chunk metadata at first futureChunkList.addAll(seriesReader.getAllChunkMetadatas4CPV()); // order futureChunkList by chunk startTime - futureChunkList.sort( - new Comparator<ChunkSuit4CPV>() { - public int compare(ChunkSuit4CPV o1, ChunkSuit4CPV o2) { - return ((Comparable) (o1.getChunkMetadata().getStartTime())) - .compareTo(o2.getChunkMetadata().getStartTime()); - } - }); + futureChunkList.sort(new Comparator<ChunkSuit4CPV>() { + public int compare(ChunkSuit4CPV o1, ChunkSuit4CPV o2) { + return ((Comparable) (o1.getChunkMetadata().getStartTime())).compareTo( + o2.getChunkMetadata().getStartTime()); + } + }); } catch (IOException e) { throw new QueryProcessException(e.getMessage()); } @@ -139,9 +122,8 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor { results.add(aggrResult); } - private void getCurrentChunkListFromFutureChunkList( - long curStartTime, long curEndTime, long startTime, long endTime, long interval) - throws IOException { + private void getCurrentChunkListFromFutureChunkList(long curStartTime, long curEndTime, + long startTime, long endTime, long interval) throws IOException { // empty currentChunkList currentChunkList = new ArrayList<>(); @@ -154,7 +136,6 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor { // iterate futureChunkList ListIterator itr = futureChunkList.listIterator(); - // List<ChunkSuit4CPV> tmpFutureChunkList = new ArrayList<>(); while (itr.hasNext()) { ChunkSuit4CPV chunkSuit4CPV = (ChunkSuit4CPV) (itr.next()); ChunkMetadata chunkMetadata = chunkSuit4CPV.getChunkMetadata(); @@ -177,23 +158,20 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor { // the chunk partially overlaps in time with the current M4 interval 
Ii. // load this chunk, split it on deletes and all w intervals. // add to currentChunkList and futureChunkList. + // TODO chunk data read operation (b) get the closest data point after or before a timestamp itr.remove(); // B: loads chunk data from disk to memory // C: decompress page data, split time&value buffers - List<IPageReader> pageReaderList = - FileLoaderUtils.loadPageReaderList(chunkSuit4CPV.getChunkMetadata(), this.timeFilter); + List<IPageReader> pageReaderList = FileLoaderUtils.loadPageReaderList( + chunkSuit4CPV.getChunkMetadata(), this.timeFilter); + if (pageReaderList.size() > 1) { + throw new IOException("Against the assumption that there is only one page in a chunk!"); + } for (IPageReader pageReader : pageReaderList) { // assume only one page in a chunk // assume all data on disk, no data in memory - ((PageReader) pageReader) - .split4CPV( - startTime, - endTime, - interval, - curStartTime, - currentChunkList, - splitChunkList, - chunkMetadata); + ((PageReader) pageReader).split4CPV(startTime, endTime, interval, curStartTime, + currentChunkList, splitChunkList, chunkMetadata); } } } @@ -201,14 +179,13 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor { /** * @param curStartTime closed - * @param curEndTime open - * @param startTime closed - * @param endTime open + * @param curEndTime open + * @param startTime closed + * @param endTime open */ @Override - public List<AggregateResult> calcResult( - long curStartTime, long curEndTime, long startTime, long endTime, long interval) - throws IOException { + public List<AggregateResult> calcResult(long curStartTime, long curEndTime, long startTime, + long endTime, long interval) throws IOException { // clear result cache for (AggregateResult result : results) { result.reset(); @@ -228,7 +205,9 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor { return results; } - /** 对BatchData应用deletes操作,获得更新的BatchData和statistics赋值到chunkSuit4CPV中 */ + /** + * 
对BatchData应用deletes操作,获得更新的BatchData和statistics赋值到chunkSuit4CPV中 + */ private void updateBatchData(ChunkSuit4CPV chunkSuit4CPV, TSDataType dataType) { if (chunkSuit4CPV.getBatchData() != null) { BatchData batchData1 = BatchDataFactory.createBatchData(dataType, true, false); @@ -291,68 +270,53 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor { } } - private void calculateBottomPoint( - List<ChunkSuit4CPV> currentChunkList, - long startTime, - long endTime, - long interval, - long curStartTime) - throws IOException { + private void calculateBottomPoint(List<ChunkSuit4CPV> currentChunkList, long startTime, + long endTime, long interval, long curStartTime) throws IOException { while (true) { // 循环1 // 按照bottomValue排序,找出BP candidate set currentChunkList.sort( new Comparator<ChunkSuit4CPV>() { // TODO double check the sort order logic for different // aggregations public int compare(ChunkSuit4CPV o1, ChunkSuit4CPV o2) { - return ((Comparable) (o1.getChunkMetadata().getStatistics().getMinValue())) - .compareTo(o2.getChunkMetadata().getStatistics().getMinValue()); + return ((Comparable) (o1.getStatistics().getMinValue())).compareTo( + o2.getStatistics().getMinValue()); } }); - Object value = currentChunkList.get(0).getChunkMetadata().getStatistics().getMinValue(); + Object value = currentChunkList.get(0).getStatistics().getMinValue(); List<ChunkSuit4CPV> candidateSet = new ArrayList<>(); for (ChunkSuit4CPV chunkSuit4CPV : currentChunkList) { - if (chunkSuit4CPV.getChunkMetadata().getStatistics().getMinValue().equals(value)) { + if (chunkSuit4CPV.getStatistics().getMinValue().equals(value)) { candidateSet.add(chunkSuit4CPV); } else { break; } } - List<ChunkSuit4CPV> nonLazyLoad = - new ArrayList<>( - candidateSet); // TODO check, whether nonLazyLoad remove affects candidateSet + List<ChunkSuit4CPV> nonLazyLoad = new ArrayList<>( + candidateSet); // TODO check, whether nonLazyLoad remove affects candidateSet nonLazyLoad.sort( new 
Comparator<ChunkSuit4CPV>() { // TODO double check the sort order logic for version public int compare(ChunkSuit4CPV o1, ChunkSuit4CPV o2) { - return new MergeReaderPriority( - o2.getChunkMetadata().getVersion(), - o2.getChunkMetadata().getOffsetOfChunkHeader()) - .compareTo( - new MergeReaderPriority( - o1.getChunkMetadata().getVersion(), - o1.getChunkMetadata().getOffsetOfChunkHeader())); + return new MergeReaderPriority(o2.getChunkMetadata().getVersion(), + o2.getChunkMetadata().getOffsetOfChunkHeader()).compareTo( + new MergeReaderPriority(o1.getChunkMetadata().getVersion(), + o1.getChunkMetadata().getOffsetOfChunkHeader())); } }); while (true) { // 循环2 // 如果set里所有点所在的chunk都是lazy // load,则对所有块进行load,应用deleteIntervals,并把BP删掉(因为不管是被删除删掉还是被更新删掉都是删掉这个点) if (nonLazyLoad.size() == 0) { + // TODO chunk data read operation (c): get all data points for (ChunkSuit4CPV chunkSuit4CPV : candidateSet) { if (chunkSuit4CPV.getBatchData() == null) { currentChunkList.remove(chunkSuit4CPV); // TODO check this - List<IPageReader> pageReaderList = - FileLoaderUtils.loadPageReaderList( - chunkSuit4CPV.getChunkMetadata(), this.timeFilter); + List<IPageReader> pageReaderList = FileLoaderUtils.loadPageReaderList( + chunkSuit4CPV.getChunkMetadata(), this.timeFilter); for (IPageReader pageReader : pageReaderList) { // assume only one page in a chunk - ((PageReader) pageReader) - .split4CPV( - startTime, - endTime, - interval, - curStartTime, - currentChunkList, - null, - chunkSuit4CPV.getChunkMetadata()); + ((PageReader) pageReader).split4CPV(startTime, endTime, interval, curStartTime, + currentChunkList, // add back into currentChunkList with loaded data + null, chunkSuit4CPV.getChunkMetadata()); } } else { // 已经load过,比如一开始被M4 interval分开,现在因为update而candidate失效 updateBatchData(chunkSuit4CPV, tsDataType); @@ -362,17 +326,15 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor { } // 否则,找出candidate set里非lazy load里version最高的那个块的BP点作为candidate point ChunkSuit4CPV candidate = 
nonLazyLoad.get(0); // TODO check sort right - MergeReaderPriority candidateVersion = - new MergeReaderPriority( - candidate.getChunkMetadata().getVersion(), - candidate.getChunkMetadata().getOffsetOfChunkHeader()); - long candidateTimestamp = - candidate.getChunkMetadata().getStatistics().getBottomTimestamp(); // TODO check - Object candidateValue = - candidate.getChunkMetadata().getStatistics().getMinValue(); // TODO check + MergeReaderPriority candidateVersion = new MergeReaderPriority( + candidate.getChunkMetadata().getVersion(), + candidate.getChunkMetadata().getOffsetOfChunkHeader()); + long candidateTimestamp = candidate.getChunkMetadata().getStatistics() + .getBottomTimestamp(); // TODO check + Object candidateValue = candidate.getChunkMetadata().getStatistics() + .getMinValue(); // TODO check - // verify这个candidate point - // 是否被删除 + // verify if this candidate point is deleted boolean isDeletedItself = false; if (candidate.getChunkMetadata().getDeleteIntervalList() != null) { for (TimeRange timeRange : candidate.getChunkMetadata().getDeleteIntervalList()) { @@ -395,9 +357,8 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor { List<ChunkSuit4CPV> overlaps = new ArrayList<>(); for (ChunkSuit4CPV chunkSuit4CPV : currentChunkList) { ChunkMetadata chunkMetadata = chunkSuit4CPV.getChunkMetadata(); - MergeReaderPriority version = - new MergeReaderPriority( - chunkMetadata.getVersion(), chunkMetadata.getOffsetOfChunkHeader()); + MergeReaderPriority version = new MergeReaderPriority(chunkMetadata.getVersion(), + chunkMetadata.getOffsetOfChunkHeader()); if (version.compareTo(candidateVersion) <= 0) { // including bottomChunkMetadata continue; } @@ -409,30 +370,29 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor { } if (overlaps.size() == 0) { // 否被overlap,则当前candidate point就是计算结果,结束 - results - .get(4) // TODO check: minTimestamp, maxTimestamp, firstValue, lastValue, + results.get(4) // TODO check: minTimestamp, maxTimestamp, 
firstValue, lastValue, // minValue[bottomTimestamp], maxValue[topTimestamp] - .updateResultUsingValues( - new long[] {candidateTimestamp}, 1, new Object[] {candidateValue}); + .updateResultUsingValues(new long[]{candidateTimestamp}, 1, + new Object[]{candidateValue}); // TODO check updateResult return; // 计算结束 } else { // 是被overlap,则partial scan所有这些overlap的块 + // TODO chunk data read operation (a): check existence of data point at a timestamp boolean isUpdate = false; for (ChunkSuit4CPV chunkSuit4CPV : overlaps) { // scan这个chunk的数据 if (chunkSuit4CPV.getBatchData() == null) { - List<IPageReader> pageReaderList = - FileLoaderUtils.loadPageReaderList( - chunkSuit4CPV.getChunkMetadata(), this.timeFilter); + List<IPageReader> pageReaderList = FileLoaderUtils.loadPageReaderList( + chunkSuit4CPV.getChunkMetadata(), this.timeFilter); List<ChunkSuit4CPV> tmpCurrentChunkList = new ArrayList<>(); for (IPageReader pageReader : pageReaderList) { // assume only one page in a chunk - isUpdate = - ((PageReader) pageReader).partialScan(candidateTimestamp); // TODO check + isUpdate = ((PageReader) pageReader).partialScan( + candidateTimestamp); // TODO check } } else { // 对已经加载的batchData进行partial scan,直到点的时间戳大于或等于candidateTimestamp - BatchDataIterator batchDataIterator = - chunkSuit4CPV.getBatchData().getBatchDataIterator(); + BatchDataIterator batchDataIterator = chunkSuit4CPV.getBatchData() + .getBatchDataIterator(); while (batchDataIterator.hasNextTimeValuePair()) { long timestamp = batchDataIterator.nextTimeValuePair().getTimestamp(); if (timestamp > candidateTimestamp) { @@ -443,8 +403,7 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor { break; } } - chunkSuit4CPV - .getBatchData() + chunkSuit4CPV.getBatchData() .resetBatchData(); // This step is necessary, because this BatchData may be // accessed multiple times! 
} @@ -453,11 +412,10 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor { } } if (!isUpdate) { // partial scan了所有overlap的块都没有找到这样的点,则当前candidate point就是计算结果,结束 - results - .get(4) // TODO check: minTimestamp, maxTimestamp, firstValue, lastValue, + results.get(4) // TODO check: minTimestamp, maxTimestamp, firstValue, lastValue, // minValue[bottomTimestamp], maxValue[topTimestamp] - .updateResultUsingValues( - new long[] {candidateTimestamp}, 1, new Object[] {candidateValue}); + .updateResultUsingValues(new long[]{candidateTimestamp}, 1, + new Object[]{candidateValue}); // TODO check updateResult return; // 计算结束 } else { // 找到这样的点,于是标记candidate point所在块为lazy @@ -467,9 +425,7 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor { tmp.add(new TimeRange(candidateTimestamp, candidateTimestamp)); candidate.getChunkMetadata().setDeleteIntervalList(tmp); } else { - candidate - .getChunkMetadata() - .getDeleteIntervalList() + candidate.getChunkMetadata().getDeleteIntervalList() .add(new TimeRange(candidateTimestamp, candidateTimestamp)); // TODO check } // 删除那里不需要再加了,而这里更新就需要手动加一下删除操作 @@ -485,30 +441,22 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor { } } - private void calculateTopPoint( - List<ChunkSuit4CPV> currentChunkList, - long startTime, - long endTime, - long interval, - long curStartTime) - throws IOException { + private void calculateTopPoint(List<ChunkSuit4CPV> currentChunkList, long startTime, long endTime, + long interval, long curStartTime) throws IOException { while (true) { // 循环1 // 按照topValue排序,找出TP candidate set currentChunkList.sort( new Comparator<ChunkSuit4CPV>() { // TODO double check the sort order logic for different // aggregations public int compare(ChunkSuit4CPV o1, ChunkSuit4CPV o2) { - return ((Comparable) (o2.getChunkMetadata().getStatistics().getMaxValue())) - .compareTo(o1.getChunkMetadata().getStatistics().getMaxValue()); + return ((Comparable) 
(o2.getChunkMetadata().getStatistics().getMaxValue())).compareTo( + o1.getChunkMetadata().getStatistics().getMaxValue()); } }); Object value = currentChunkList.get(0).getChunkMetadata().getStatistics().getMaxValue(); List<ChunkSuit4CPV> candidateSet = new ArrayList<>(); for (ChunkSuit4CPV chunkSuit4CPV : currentChunkList) { - if (chunkSuit4CPV - .getChunkMetadata() - .getStatistics() - .getMaxValue() + if (chunkSuit4CPV.getChunkMetadata().getStatistics().getMaxValue() .equals(value)) { // TODO CHECK candidateSet.add(chunkSuit4CPV); } else { @@ -516,19 +464,15 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor { } } - List<ChunkSuit4CPV> nonLazyLoad = - new ArrayList<>( - candidateSet); // TODO check, whether nonLazyLoad remove affects candidateSet + List<ChunkSuit4CPV> nonLazyLoad = new ArrayList<>( + candidateSet); // TODO check, whether nonLazyLoad remove affects candidateSet nonLazyLoad.sort( new Comparator<ChunkSuit4CPV>() { // TODO double check the sort order logic for version public int compare(ChunkSuit4CPV o1, ChunkSuit4CPV o2) { - return new MergeReaderPriority( - o2.getChunkMetadata().getVersion(), - o2.getChunkMetadata().getOffsetOfChunkHeader()) - .compareTo( - new MergeReaderPriority( - o1.getChunkMetadata().getVersion(), - o1.getChunkMetadata().getOffsetOfChunkHeader())); + return new MergeReaderPriority(o2.getChunkMetadata().getVersion(), + o2.getChunkMetadata().getOffsetOfChunkHeader()).compareTo( + new MergeReaderPriority(o1.getChunkMetadata().getVersion(), + o1.getChunkMetadata().getOffsetOfChunkHeader())); } }); while (true) { // 循环2 @@ -538,19 +482,11 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor { for (ChunkSuit4CPV chunkSuit4CPV : candidateSet) { if (chunkSuit4CPV.getBatchData() == null) { currentChunkList.remove(chunkSuit4CPV); // TODO check this - List<IPageReader> pageReaderList = - FileLoaderUtils.loadPageReaderList( - chunkSuit4CPV.getChunkMetadata(), this.timeFilter); + List<IPageReader> 
pageReaderList = FileLoaderUtils.loadPageReaderList( + chunkSuit4CPV.getChunkMetadata(), this.timeFilter); for (IPageReader pageReader : pageReaderList) { // assume only one page in a chunk - ((PageReader) pageReader) - .split4CPV( - startTime, - endTime, - interval, - curStartTime, - currentChunkList, - null, - chunkSuit4CPV.getChunkMetadata()); + ((PageReader) pageReader).split4CPV(startTime, endTime, interval, curStartTime, + currentChunkList, null, chunkSuit4CPV.getChunkMetadata()); } } else { // 已经load过,比如一开始被M4 interval分开,现在因为update而candidate失效 updateBatchData(chunkSuit4CPV, tsDataType); @@ -560,14 +496,13 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor { } // 否则,找出candidate set里非lazy load里version最高的那个块的TP点作为candidate point ChunkSuit4CPV candidate = nonLazyLoad.get(0); // TODO check sort right - MergeReaderPriority candidateVersion = - new MergeReaderPriority( - candidate.getChunkMetadata().getVersion(), - candidate.getChunkMetadata().getOffsetOfChunkHeader()); - long candidateTimestamp = - candidate.getChunkMetadata().getStatistics().getTopTimestamp(); // TODO check - Object candidateValue = - candidate.getChunkMetadata().getStatistics().getMaxValue(); // TODO check + MergeReaderPriority candidateVersion = new MergeReaderPriority( + candidate.getChunkMetadata().getVersion(), + candidate.getChunkMetadata().getOffsetOfChunkHeader()); + long candidateTimestamp = candidate.getChunkMetadata().getStatistics() + .getTopTimestamp(); // TODO check + Object candidateValue = candidate.getChunkMetadata().getStatistics() + .getMaxValue(); // TODO check // verify这个candidate point // 是否被删除 @@ -593,9 +528,8 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor { List<ChunkSuit4CPV> overlaps = new ArrayList<>(); for (ChunkSuit4CPV chunkSuit4CPV : currentChunkList) { ChunkMetadata chunkMetadata = chunkSuit4CPV.getChunkMetadata(); - MergeReaderPriority version = - new MergeReaderPriority( - chunkMetadata.getVersion(), 
chunkMetadata.getOffsetOfChunkHeader()); + MergeReaderPriority version = new MergeReaderPriority(chunkMetadata.getVersion(), + chunkMetadata.getOffsetOfChunkHeader()); if (version.compareTo(candidateVersion) <= 0) { // including topChunkMetadata continue; } @@ -607,11 +541,10 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor { } if (overlaps.size() == 0) { // 否被overlap,则当前candidate point就是计算结果,结束 - results - .get(5) // TODO check: minTimestamp, maxTimestamp, firstValue, lastValue, + results.get(5) // TODO check: minTimestamp, maxTimestamp, firstValue, lastValue, // minValue[bottomTimestamp], maxValue[topTimestamp] - .updateResultUsingValues( - new long[] {candidateTimestamp}, 1, new Object[] {candidateValue}); + .updateResultUsingValues(new long[]{candidateTimestamp}, 1, + new Object[]{candidateValue}); // TODO check updateResult return; // 计算结束 } else { // 是被overlap,则partial scan所有这些overlap的块 @@ -619,18 +552,17 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor { for (ChunkSuit4CPV chunkSuit4CPV : overlaps) { // scan这个chunk的数据 if (chunkSuit4CPV.getBatchData() == null) { - List<IPageReader> pageReaderList = - FileLoaderUtils.loadPageReaderList( - chunkSuit4CPV.getChunkMetadata(), this.timeFilter); + List<IPageReader> pageReaderList = FileLoaderUtils.loadPageReaderList( + chunkSuit4CPV.getChunkMetadata(), this.timeFilter); List<ChunkSuit4CPV> tmpCurrentChunkList = new ArrayList<>(); for (IPageReader pageReader : pageReaderList) { // assume only one page in a chunk - isUpdate = - ((PageReader) pageReader).partialScan(candidateTimestamp); // TODO check + isUpdate = ((PageReader) pageReader).partialScan( + candidateTimestamp); // TODO check } } else { // 对已经加载的batchData进行partial scan,直到点的时间戳大于或等于candidateTimestamp - BatchDataIterator batchDataIterator = - chunkSuit4CPV.getBatchData().getBatchDataIterator(); + BatchDataIterator batchDataIterator = chunkSuit4CPV.getBatchData() + .getBatchDataIterator(); while 
(batchDataIterator.hasNextTimeValuePair()) { long timestamp = batchDataIterator.nextTimeValuePair().getTimestamp(); if (timestamp > candidateTimestamp) { @@ -641,8 +573,7 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor { break; } } - chunkSuit4CPV - .getBatchData() + chunkSuit4CPV.getBatchData() .resetBatchData(); // This step is necessary, because this BatchData may be // accessed multiple times! } @@ -651,11 +582,10 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor { } } if (!isUpdate) { // partial scan了所有overlap的块都没有找到这样的点,则当前candidate point就是计算结果,结束 - results - .get(5) // TODO check: minTimestamp, maxTimestamp, firstValue, lastValue, + results.get(5) // TODO check: minTimestamp, maxTimestamp, firstValue, lastValue, // minValue[bottomTimestamp], maxValue[topTimestamp] - .updateResultUsingValues( - new long[] {candidateTimestamp}, 1, new Object[] {candidateValue}); + .updateResultUsingValues(new long[]{candidateTimestamp}, 1, + new Object[]{candidateValue}); // TODO check updateResult return; // 计算结束 } else { // 找到这样的点,于是标记candidate point所在块为lazy @@ -665,9 +595,7 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor { tmp.add(new TimeRange(candidateTimestamp, candidateTimestamp)); candidate.getChunkMetadata().setDeleteIntervalList(tmp); } else { - candidate - .getChunkMetadata() - .getDeleteIntervalList() + candidate.getChunkMetadata().getDeleteIntervalList() .add(new TimeRange(candidateTimestamp, candidateTimestamp)); // TODO check } // 删除那里不需要再加了,而这里更新就需要手动加一下删除操作 @@ -683,32 +611,23 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor { } } - private void calculateFirstPoint( - List<ChunkSuit4CPV> currentChunkList, - long startTime, - long endTime, - long interval, - long curStartTime) - throws IOException { + private void calculateFirstPoint(List<ChunkSuit4CPV> currentChunkList, long startTime, + long endTime, long interval, long curStartTime) throws IOException { while (true) { // 循环1 // 
按照startTime和version排序,找出疑似FP candidate currentChunkList.sort( new Comparator<ChunkSuit4CPV>() { // TODO double check the sort order logic for different // aggregations public int compare(ChunkSuit4CPV o1, ChunkSuit4CPV o2) { - int res = - ((Comparable) (o1.getChunkMetadata().getStartTime())) - .compareTo(o2.getChunkMetadata().getStartTime()); + int res = ((Comparable) (o1.getChunkMetadata().getStartTime())).compareTo( + o2.getChunkMetadata().getStartTime()); if (res != 0) { return res; } else { - return new MergeReaderPriority( - o2.getChunkMetadata().getVersion(), - o2.getChunkMetadata().getOffsetOfChunkHeader()) - .compareTo( - new MergeReaderPriority( - o1.getChunkMetadata().getVersion(), - o1.getChunkMetadata().getOffsetOfChunkHeader())); + return new MergeReaderPriority(o2.getChunkMetadata().getVersion(), + o2.getChunkMetadata().getOffsetOfChunkHeader()).compareTo( + new MergeReaderPriority(o1.getChunkMetadata().getVersion(), + o1.getChunkMetadata().getOffsetOfChunkHeader())); } } }); @@ -718,26 +637,20 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor { if (susp_candidate.isLazyLoad()) { // 如果是lazy // load,则此时load、应用deletes、更新batchData和statistics,取消lazyLoad标记,然后回到循环1 currentChunkList.remove(susp_candidate); // TODO check this - List<IPageReader> pageReaderList = - FileLoaderUtils.loadPageReaderList(susp_candidate.getChunkMetadata(), this.timeFilter); + List<IPageReader> pageReaderList = FileLoaderUtils.loadPageReaderList( + susp_candidate.getChunkMetadata(), this.timeFilter); for (IPageReader pageReader : pageReaderList) { // assume only one page in a chunk - ((PageReader) pageReader) - .split4CPV( - startTime, - endTime, - interval, - curStartTime, - currentChunkList, - null, - susp_candidate.getChunkMetadata()); // 新增的ChunkSuit4CPV默认isLazyLoad=false + ((PageReader) pageReader).split4CPV(startTime, endTime, interval, curStartTime, + currentChunkList, null, + susp_candidate.getChunkMetadata()); // 新增的ChunkSuit4CPV默认isLazyLoad=false } 
continue; // 回到循环1 } else { // 如果不是lazy load,则该疑似candidate就是真正的candidate。 // 于是verification判断该点是否被更高优先级(更高优先级这一点在QueryUtils.modifyChunkMetaData(chunkMetadataList, // pathModifications)已做好)的deletes覆盖 long candidateTimestamp = susp_candidate.getChunkMetadata().getStartTime(); // TODO check - Object candidateValue = - susp_candidate.getChunkMetadata().getStatistics().getFirstValue(); // TODO check + Object candidateValue = susp_candidate.getChunkMetadata().getStatistics() + .getFirstValue(); // TODO check boolean isDeletedItself = false; long deleteEndTime = -1; @@ -745,11 +658,8 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor { for (TimeRange timeRange : susp_candidate.getChunkMetadata().getDeleteIntervalList()) { if (timeRange.contains(candidateTimestamp)) { isDeletedItself = true; - deleteEndTime = - Math.max( - deleteEndTime, - timeRange - .getMax()); // deleteEndTime不会超过chunkEndTime,因为否则的话这个chunk就会modifyChunkMetaData步骤里被处理掉整个删掉 + deleteEndTime = Math.max(deleteEndTime, + timeRange.getMax()); // deleteEndTime不会超过chunkEndTime,因为否则的话这个chunk就会modifyChunkMetaData步骤里被处理掉整个删掉 // TODO check } } @@ -757,55 +667,40 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor { // 如果被删除,标记该点所在chunk为lazy load,并且在不load数据的情况下更新chunkStartTime,然后回到循环1 if (isDeletedItself) { susp_candidate.setLazyLoad(true); - susp_candidate - .getChunkMetadata() - .getStatistics() + susp_candidate.getChunkMetadata().getStatistics() .setStartTime(deleteEndTime); // TODO check continue; // 回到循环1 } else { // 否则,则就是计算结果,结束 // TODO check: minTimestamp, maxTimestamp, firstValue, lastValue, // minValue[bottomTimestamp], maxValue[topTimestamp] - results - .get(0) - .updateResultUsingValues( - new long[] {candidateTimestamp}, 1, new Object[] {candidateValue}); - results - .get(2) - .updateResultUsingValues( - new long[] {candidateTimestamp}, 1, new Object[] {candidateValue}); + results.get(0).updateResultUsingValues(new long[]{candidateTimestamp}, 1, + new 
Object[]{candidateValue}); + results.get(2).updateResultUsingValues(new long[]{candidateTimestamp}, 1, + new Object[]{candidateValue}); return; } } } } - private void calculateLastPoint( - List<ChunkSuit4CPV> currentChunkList, - long startTime, - long endTime, - long interval, - long curStartTime) - throws IOException { + private void calculateLastPoint(List<ChunkSuit4CPV> currentChunkList, long startTime, + long endTime, long interval, long curStartTime) throws IOException { while (true) { // 循环1 // 按照startTime和version排序,找出疑似LP candidate currentChunkList.sort( new Comparator<ChunkSuit4CPV>() { // TODO double check the sort order logic for different // aggregations public int compare(ChunkSuit4CPV o1, ChunkSuit4CPV o2) { - int res = - ((Comparable) (o2.getChunkMetadata().getEndTime())) - .compareTo(o1.getChunkMetadata().getEndTime()); + int res = ((Comparable) (o2.getChunkMetadata().getEndTime())).compareTo( + o1.getChunkMetadata().getEndTime()); if (res != 0) { return res; } else { - return new MergeReaderPriority( - o2.getChunkMetadata().getVersion(), - o2.getChunkMetadata().getOffsetOfChunkHeader()) - .compareTo( - new MergeReaderPriority( - o1.getChunkMetadata().getVersion(), - o1.getChunkMetadata().getOffsetOfChunkHeader())); + return new MergeReaderPriority(o2.getChunkMetadata().getVersion(), + o2.getChunkMetadata().getOffsetOfChunkHeader()).compareTo( + new MergeReaderPriority(o1.getChunkMetadata().getVersion(), + o1.getChunkMetadata().getOffsetOfChunkHeader())); } } }); @@ -815,26 +710,20 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor { if (susp_candidate.isLazyLoad()) { // 如果是lazy // load,则此时load、应用deletes、更新batchData和statistics,取消lazyLoad标记,然后回到循环1 currentChunkList.remove(susp_candidate); // TODO check this - List<IPageReader> pageReaderList = - FileLoaderUtils.loadPageReaderList(susp_candidate.getChunkMetadata(), this.timeFilter); + List<IPageReader> pageReaderList = FileLoaderUtils.loadPageReaderList( + 
susp_candidate.getChunkMetadata(), this.timeFilter); for (IPageReader pageReader : pageReaderList) { // assume only one page in a chunk - ((PageReader) pageReader) - .split4CPV( - startTime, - endTime, - interval, - curStartTime, - currentChunkList, - null, - susp_candidate.getChunkMetadata()); // 新增的ChunkSuit4CPV默认isLazyLoad=false + ((PageReader) pageReader).split4CPV(startTime, endTime, interval, curStartTime, + currentChunkList, null, + susp_candidate.getChunkMetadata()); // 新增的ChunkSuit4CPV默认isLazyLoad=false } continue; // 回到循环1 } else { // 如果不是lazy load,则该疑似candidate就是真正的candidate。 // 于是verification判断该点是否被更高优先级(更高优先级这一点在QueryUtils.modifyChunkMetaData(chunkMetadataList, // pathModifications)已做好)的deletes覆盖 long candidateTimestamp = susp_candidate.getChunkMetadata().getEndTime(); // TODO check - Object candidateValue = - susp_candidate.getChunkMetadata().getStatistics().getLastValue(); // TODO check + Object candidateValue = susp_candidate.getChunkMetadata().getStatistics() + .getLastValue(); // TODO check boolean isDeletedItself = false; long deleteStartTime = Long.MAX_VALUE; // TODO check @@ -842,11 +731,8 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor { for (TimeRange timeRange : susp_candidate.getChunkMetadata().getDeleteIntervalList()) { if (timeRange.contains(candidateTimestamp)) { isDeletedItself = true; - deleteStartTime = - Math.min( - deleteStartTime, - timeRange - .getMin()); // deleteStartTime不会小于chunkStartTime,因为否则的话这个chunk就会modifyChunkMetaData步骤里被处理掉整个删掉 + deleteStartTime = Math.min(deleteStartTime, + timeRange.getMin()); // deleteStartTime不会小于chunkStartTime,因为否则的话这个chunk就会modifyChunkMetaData步骤里被处理掉整个删掉 // TODO check } } @@ -854,23 +740,17 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor { // 如果被删除,标记该点所在chunk为lazy load,并且在不load数据的情况下更新chunkEndTime,然后回到循环1 if (isDeletedItself) { susp_candidate.setLazyLoad(true); - susp_candidate - .getChunkMetadata() - .getStatistics() + 
susp_candidate.getChunkMetadata().getStatistics() .setEndTime(deleteStartTime); // TODO check continue; // 回到循环1 } else { // 否则,则就是计算结果,结束 // TODO check: minTimestamp, maxTimestamp, firstValue, lastValue, // minValue[bottomTimestamp], maxValue[topTimestamp] - results - .get(1) - .updateResultUsingValues( - new long[] {candidateTimestamp}, 1, new Object[] {candidateValue}); - results - .get(3) - .updateResultUsingValues( - new long[] {candidateTimestamp}, 1, new Object[] {candidateValue}); + results.get(1).updateResultUsingValues(new long[]{candidateTimestamp}, 1, + new Object[]{candidateValue}); + results.get(3).updateResultUsingValues(new long[]{candidateTimestamp}, 1, + new Object[]{candidateValue}); return; } } diff --git a/server/src/main/java/org/apache/iotdb/db/tools/TsFileSketchTool.java b/server/src/main/java/org/apache/iotdb/db/tools/TsFileSketchTool.java index b833b01e8f..d5489700cc 100644 --- a/server/src/main/java/org/apache/iotdb/db/tools/TsFileSketchTool.java +++ b/server/src/main/java/org/apache/iotdb/db/tools/TsFileSketchTool.java @@ -246,7 +246,7 @@ public class TsFileSketchTool { private static Pair<String, String> checkArgs(String[] args) { String filename = - "/home/kyy/Documents/kdd/iotdb-server-0.12.4/data/data/sequence/root.kobelco/0/0/1643270757735-1-0-0.tsfile"; + "D:\\plain-plain-noindex.tsfile"; String outFile = "TsFile_sketch_view.txt"; if (args.length == 1) { filename = args[0]; diff --git a/server/src/test/java/org/apache/iotdb/db/integration/m4/MyTest1.java b/server/src/test/java/org/apache/iotdb/db/integration/m4/MyTest1.java index a6a11054a5..0768fb3339 100644 --- a/server/src/test/java/org/apache/iotdb/db/integration/m4/MyTest1.java +++ b/server/src/test/java/org/apache/iotdb/db/integration/m4/MyTest1.java @@ -25,6 +25,10 @@ import org.apache.iotdb.db.engine.compaction.CompactionStrategy; import org.apache.iotdb.db.utils.EnvironmentUtils; import org.apache.iotdb.jdbc.Config; +import 
org.apache.iotdb.tsfile.file.metadata.ChunkMetadata; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.file.metadata.statistics.DoubleStatistics; +import org.apache.iotdb.tsfile.read.reader.page.PageReader; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -45,7 +49,8 @@ public class MyTest1 { private static String[] creationSqls = new String[] { "SET STORAGE GROUP TO root.vehicle.d0", - "CREATE TIMESERIES root.vehicle.d0.s0 WITH DATATYPE=INT32, ENCODING=RLE", + "CREATE TIMESERIES root.vehicle.d0.s0 WITH DATATYPE=INT64, ENCODING=PLAIN", + // iotdb的int类型的plain编码用的是自制的不支持random access,所以值类型用long }; private final String d0s0 = "root.vehicle.d0.s0"; @@ -79,7 +84,7 @@ public class MyTest1 { } @Test - public void test1() { + public void test1() throws Exception { prepareData1(); String[] res = diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/common/conf/TSFileConfig.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/common/conf/TSFileConfig.java index 08db1d10cc..a679adb012 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/common/conf/TSFileConfig.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/common/conf/TSFileConfig.java @@ -86,7 +86,7 @@ public class TSFileConfig implements Serializable { * Encoder of time column, TsFile supports TS_2DIFF, PLAIN and RLE(run-length encoding) Default * value is TS_2DIFF. */ - private String timeEncoding = "TS_2DIFF"; + private String timeEncoding = "PLAIN"; //"TS_2DIFF"; /** * Encoder of value series. default value is PLAIN. For int, long data type, TsFile also supports * TS_2DIFF, REGULAR, GORILLA and RLE(run-length encoding). 
For float, double data type, TsFile diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkMetadata.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkMetadata.java index 039ca9f3e4..f19e7d14e8 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkMetadata.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkMetadata.java @@ -19,6 +19,10 @@ package org.apache.iotdb.tsfile.file.metadata; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.file.metadata.statistics.DoubleStatistics; +import org.apache.iotdb.tsfile.file.metadata.statistics.FloatStatistics; +import org.apache.iotdb.tsfile.file.metadata.statistics.IntegerStatistics; +import org.apache.iotdb.tsfile.file.metadata.statistics.LongStatistics; import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics; import org.apache.iotdb.tsfile.read.common.TimeRange; import org.apache.iotdb.tsfile.read.controller.IChunkLoader; @@ -91,6 +95,31 @@ public class ChunkMetadata { this.statistics = statistics; } +// // deep copy +// public ChunkMetadata(ChunkMetadata chunkMetadata) { +// this.measurementUid = chunkMetadata.measurementUid; +// this.tsDataType = chunkMetadata.tsDataType; +// this.offsetOfChunkHeader = chunkMetadata.offsetOfChunkHeader; +// this.statistics = null; // this needs deep copy because we will modify it in different M4 spans +// switch (tsDataType) { +// case INT32: +// statistics = new IntegerStatistics(); +// break; +// case INT64: +// statistics = new LongStatistics(); +// break; +// case FLOAT: +// statistics = new FloatStatistics(); +// break; +// case DOUBLE: +// statistics = new DoubleStatistics(); +// break; +// default: +// break; +// } +// this.version = chunkMetadata.version; +// } + @Override public String toString() { return String.format( diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/DoubleStatistics.java 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/DoubleStatistics.java index 48b7722483..a9d1faf191 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/DoubleStatistics.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/DoubleStatistics.java @@ -233,6 +233,22 @@ public class DoubleStatistics extends Statistics<Double> { throw new StatisticsClassException("Double statistics does not support: long sum"); } + public void setFirstValue(double value) { + this.firstValue = value; + } + + public void setLastValue(double value) { + this.lastValue = value; + } + + public void setMinInfo(long timestamp, double value) { + this.minInfo.reset(value, timestamp); + } + + public void setMaxInfo(long timestamp, double value) { + this.maxInfo.reset(value, timestamp); + } + /** @author Yuyuan Kang */ @Override protected void mergeStatisticsValue(Statistics stats) { diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/FloatStatistics.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/FloatStatistics.java index d084152298..335d4b0dee 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/FloatStatistics.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/FloatStatistics.java @@ -235,6 +235,22 @@ public class FloatStatistics extends Statistics<Float> { throw new StatisticsClassException("Float statistics does not support: long sum"); } + public void setFirstValue(float value) { + this.firstValue = value; + } + + public void setLastValue(float value) { + this.lastValue = value; + } + + public void setMinInfo(long timestamp, float value) { + this.minInfo.reset(value, timestamp); + } + + public void setMaxInfo(long timestamp, float value) { + this.maxInfo.reset(value, timestamp); + } + /** @author Yuyuan Kang */ @Override protected void mergeStatisticsValue(Statistics stats) { diff --git 
a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/IntegerStatistics.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/IntegerStatistics.java index 6ee9cbfd35..670ee5d604 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/IntegerStatistics.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/IntegerStatistics.java @@ -18,26 +18,31 @@ */ package org.apache.iotdb.tsfile.file.metadata.statistics; -import org.apache.iotdb.tsfile.exception.filter.StatisticsClassException; -import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; -import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; - import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.nio.ByteBuffer; +import org.apache.iotdb.tsfile.exception.filter.StatisticsClassException; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; -/** Statistics for int type. */ +/** + * Statistics for int type. 
+ */ public class IntegerStatistics extends Statistics<Integer> { - /** @author Yuyuan Kang */ + /** + * @author Yuyuan Kang + */ private MinMaxInfo<Integer> minInfo; private MinMaxInfo<Integer> maxInfo; private int firstValue; private int lastValue; private long sumValue; - /** @author Yuyuan Kang */ + /** + * @author Yuyuan Kang + */ private final TSDataType minMaxDataType = TSDataType.MIN_MAX_INT32; static final int INTEGER_STATISTICS_FIXED_RAM_SIZE = 64; @@ -47,13 +52,17 @@ public class IntegerStatistics extends Statistics<Integer> { return TSDataType.INT32; } - /** @author Yuyuan Kang */ + /** + * @author Yuyuan Kang + */ public IntegerStatistics() { minInfo = new MinMaxInfo<>(Integer.MAX_VALUE, -1); maxInfo = new MinMaxInfo<>(Integer.MIN_VALUE, -1); } - /** @author Yuyuan Kang */ + /** + * @author Yuyuan Kang + */ @Override public int getStatsSize() { int len = 0; @@ -67,7 +76,9 @@ public class IntegerStatistics extends Statistics<Integer> { return len; } - /** @author Yuyuan Kang */ + /** + * @author Yuyuan Kang + */ public void initializeStats( MinMaxInfo<Integer> minInfo, MinMaxInfo<Integer> maxInfo, @@ -81,7 +92,9 @@ public class IntegerStatistics extends Statistics<Integer> { this.sumValue += sum; } - /** @author Yuyuan Kang */ + /** + * @author Yuyuan Kang + */ public void initializeStats( int min, long bottomTimestamp, @@ -97,7 +110,9 @@ public class IntegerStatistics extends Statistics<Integer> { this.sumValue += sum; } - /** @author Yuyuan Kang */ + /** + * @author Yuyuan Kang + */ private void updateStats( int minValue, long bottomTimestamp, @@ -111,7 +126,9 @@ public class IntegerStatistics extends Statistics<Integer> { this.lastValue = lastValue; } - /** @author Yuyuan Kang */ + /** + * @author Yuyuan Kang + */ private void updateStats( MinMaxInfo<Integer> minInfo, MinMaxInfo<Integer> maxInfo, @@ -140,7 +157,9 @@ public class IntegerStatistics extends Statistics<Integer> { // maxValue = BytesUtils.bytesToInt(maxBytes); // } - /** @author Yuyuan 
Kang */ + /** + * @author Yuyuan Kang + */ @Override void updateStats(int value, long timestamp) { if (isEmpty) { @@ -151,7 +170,9 @@ public class IntegerStatistics extends Statistics<Integer> { } } - /** @author Yuyuan Kang */ + /** + * @author Yuyuan Kang + */ @Override void updateStats(int[] values, long[] timestamps, int batchSize) { for (int i = 0; i < batchSize; i++) { @@ -164,37 +185,49 @@ public class IntegerStatistics extends Statistics<Integer> { return INTEGER_STATISTICS_FIXED_RAM_SIZE; } - /** @author Yuyuan Kang */ + /** + * @author Yuyuan Kang + */ @Override public MinMaxInfo<Integer> getMinInfo() { return this.minInfo; } - /** @author Yuyuan Kang */ + /** + * @author Yuyuan Kang + */ @Override public MinMaxInfo<Integer> getMaxInfo() { return this.maxInfo; } - /** @author Yuyuan Kang */ + /** + * @author Yuyuan Kang + */ @Override public Integer getMinValue() { return this.minInfo.val; } - /** @author Yuyuan Kang */ + /** + * @author Yuyuan Kang + */ @Override public Integer getMaxValue() { return this.maxInfo.val; } - /** @author Yuyuan Kang */ + /** + * @author Yuyuan Kang + */ @Override public long getBottomTimestamp() { return this.minInfo.timestamp; } - /** @author Yuyuan Kang */ + /** + * @author Yuyuan Kang + */ @Override public long getTopTimestamp() { return this.maxInfo.timestamp; @@ -205,6 +238,22 @@ public class IntegerStatistics extends Statistics<Integer> { return firstValue; } + public void setFirstValue(int value) { + this.firstValue = value; + } + + public void setLastValue(int value) { + this.lastValue = value; + } + + public void setMinInfo(long timestamp, int value) { + this.minInfo.reset(value, timestamp); + } + + public void setMaxInfo(long timestamp, int value) { + this.maxInfo.reset(value, timestamp); + } + @Override public Integer getLastValue() { return lastValue; @@ -220,7 +269,9 @@ public class IntegerStatistics extends Statistics<Integer> { return sumValue; } - /** @author Yuyuan Kang */ + /** + * @author Yuyuan Kang + */ 
@Override protected void mergeStatisticsValue(Statistics stats) { IntegerStatistics intStats = (IntegerStatistics) stats; @@ -244,7 +295,9 @@ public class IntegerStatistics extends Statistics<Integer> { } } - /** @author Yuyuan Kang */ + /** + * @author Yuyuan Kang + */ @Override public void updateMinInfo(Integer val, long timestamp) { if (val < this.minInfo.val) { @@ -252,7 +305,9 @@ public class IntegerStatistics extends Statistics<Integer> { } } - /** @author Yuyuan Kang */ + /** + * @author Yuyuan Kang + */ @Override public void updateMaxInfo(Integer val, long timestamp) { if (val > this.maxInfo.val) { @@ -260,7 +315,9 @@ public class IntegerStatistics extends Statistics<Integer> { } } - /** @author Yuyuan Kang */ + /** + * @author Yuyuan Kang + */ @Override public int serializeStats(OutputStream outputStream) throws IOException { int byteLen = 0; @@ -272,7 +329,9 @@ public class IntegerStatistics extends Statistics<Integer> { return byteLen; } - /** @author Yuyuan Kang */ + /** + * @author Yuyuan Kang + */ @Override public void deserialize(InputStream inputStream) throws IOException { this.minInfo = ReadWriteIOUtils.readMinMaxInfo(inputStream, minMaxDataType); @@ -282,7 +341,9 @@ public class IntegerStatistics extends Statistics<Integer> { this.sumValue = ReadWriteIOUtils.readLong(inputStream); } - /** @author Yuyuan Kang */ + /** + * @author Yuyuan Kang + */ @Override public void deserialize(ByteBuffer byteBuffer) { this.minInfo = ReadWriteIOUtils.readMinMaxInfo(byteBuffer, minMaxDataType); @@ -292,7 +353,9 @@ public class IntegerStatistics extends Statistics<Integer> { this.sumValue = ReadWriteIOUtils.readLong(byteBuffer); } - /** @author Yuyuan Kang */ + /** + * @author Yuyuan Kang + */ @Override public String toString() { return super.toString() diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/LongStatistics.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/LongStatistics.java index 
e21949f4d1..a9e2f4608c 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/LongStatistics.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/LongStatistics.java @@ -175,6 +175,22 @@ public class LongStatistics extends Statistics<Long> { return firstValue; } + public void setFirstValue(long value) { + this.firstValue = value; + } + + public void setLastValue(long value) { + this.lastValue = value; + } + + public void setMinInfo(long timestamp, long value) { + this.minInfo.reset(value, timestamp); + } + + public void setMaxInfo(long timestamp, long value) { + this.maxInfo.reset(value, timestamp); + } + @Override public Long getLastValue() { return lastValue; diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/Statistics.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/Statistics.java index 8991e5816d..70747f2d69 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/Statistics.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/Statistics.java @@ -501,6 +501,7 @@ public abstract class Statistics<T> { } segmentKeys.add(this.endTime); this.stepRegress.setSegmentKeys(segmentKeys); + this.stepRegress.inferInterceptsFromSegmentKeys(); // don't forget this, execute once and only once when deserializing } public long getStartTime() { @@ -511,7 +512,10 @@ public abstract class Statistics<T> { return endTime; } - public long getCount() { + public StepRegress getStepRegress(){ + return stepRegress; + } + public int getCount() { return count; } diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/StepRegress.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/StepRegress.java index 4d70c4b795..55cd2d0c06 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/StepRegress.java +++ 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/StepRegress.java @@ -18,13 +18,12 @@ */ package org.apache.iotdb.tsfile.file.metadata.statistics; +import java.io.IOException; +import java.util.Arrays; import org.eclipse.collections.impl.list.mutable.primitive.DoubleArrayList; import org.eclipse.collections.impl.list.mutable.primitive.IntArrayList; import org.eclipse.collections.impl.list.mutable.primitive.LongArrayList; -import java.io.IOException; -import java.util.Arrays; - public class StepRegress { private double slope; @@ -90,7 +89,10 @@ public class StepRegress { this.segmentIntercepts.add(1 - slope * timestamps.get(0)); // b1 } - /** learn the parameters of the step regression function for the loaded data. */ + /** + * learn the parameters of the step regression function for the loaded data. Executed once and + * only once when serializing. + */ public void learn() throws IOException { initForLearn(); @@ -125,9 +127,9 @@ public class StepRegress { long nextDelta = intervals.get(i + 1); if (isBigInterval(nextDelta) && (nextPos + 1 - < slope * timestamps.get(i + 2) - + segmentIntercepts.get( - tiltLatestSegmentID))) { // when next interval is also level + < slope * timestamps.get(i + 2) + + segmentIntercepts.get( + tiltLatestSegmentID))) { // when next interval is also level isLevel = true; // then fix type from tilt to level, LTL=>LLL } } @@ -356,11 +358,9 @@ public class StepRegress { /** * infer m-1 intercepts b1,b2,...,bm-1 given the slope and m segmentKeys t1,t2,...,tm (tm is not - * used) + * used) Executed once and only once when deserializing. 
*/ - public static DoubleArrayList inferInterceptsFromSegmentKeys( - double slope, DoubleArrayList segmentKeys) { - DoubleArrayList segmentIntercepts = new DoubleArrayList(); + public void inferInterceptsFromSegmentKeys() { segmentIntercepts.add(1 - slope * segmentKeys.get(0)); // b1=1-K*t1 for (int i = 1; i < segmentKeys.size() - 1; i++) { // b2,b3,...,bm-1 if (i % 2 == 0) { // b2i+1=b2i-1-K*(t2i+1-t2i) @@ -371,12 +371,12 @@ public class StepRegress { segmentIntercepts.add(slope * segmentKeys.get(i) + b); } } - return segmentIntercepts; } /** - * @param t input - * @return output the value of the step regression function f(t) + * @param t input timestamp + * @return output the value of the step regression function f(t), which is the estimated position + * in the chunk. Pay attention that f(t) starts from (startTime,1), ends at (endTime,count). */ public double infer(double t) throws IOException { if (t < segmentKeys.get(0) || t > segmentKeys.getLast()) { diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/ChunkSuit4CPV.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/ChunkSuit4CPV.java index ba20b96613..26d97a60be 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/ChunkSuit4CPV.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/ChunkSuit4CPV.java @@ -19,30 +19,149 @@ package org.apache.iotdb.tsfile.read.common; +import java.io.IOException; import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata; - -import java.util.ArrayList; -import java.util.List; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.file.metadata.statistics.DoubleStatistics; +import org.apache.iotdb.tsfile.file.metadata.statistics.FloatStatistics; +import org.apache.iotdb.tsfile.file.metadata.statistics.IntegerStatistics; +import org.apache.iotdb.tsfile.file.metadata.statistics.LongStatistics; +import org.apache.iotdb.tsfile.file.metadata.statistics.MinMaxInfo; +import 
org.apache.iotdb.tsfile.file.metadata.statistics.Statistics; +import org.apache.iotdb.tsfile.file.metadata.statistics.StepRegress; +import org.apache.iotdb.tsfile.read.reader.page.PageReader; public class ChunkSuit4CPV { - private ChunkMetadata - chunkMetadata; // this.version = new MergeReaderPriority(chunkMetadata.getVersion(), - // chunkMetadata.getOffsetOfChunkHeader()); - private BatchData batchData; - private List<Long> mergeVersionList = new ArrayList<>(); - private List<Long> mergeOffsetList = new ArrayList<>(); + private ChunkMetadata chunkMetadata; // fixed info, including version and stepRegress + + public Statistics statistics; // includes FP/LP/BP/TP info, may be updated + +// public long startTime; // statistics in chunkMetadata is not deepCopied, so store update here +// +// public long endTime; + +// public int firstValueInt; +// public long firstValueLong; +// public float firstValueFloat; +// public double firstValueDouble; +// +// public int lastValueInt; +// public long lastValueLong; +// public float lastValueFloat; +// public double lastValueDouble; +// +// public MinMaxInfo<Integer> minInfoInt; +// public MinMaxInfo<Long> minInfoLong; +// public MinMaxInfo<Float> minInfoFloat; +// public MinMaxInfo<Double> minInfoDouble; +// +// public MinMaxInfo<Integer> maxInfoInt; +// public MinMaxInfo<Long> maxInfoLong; +// public MinMaxInfo<Float> maxInfoFloat; +// public MinMaxInfo<Double> maxInfoDouble; + +// private BatchData batchData; // deprecated + + private PageReader pageReader; // bears plain timeBuffer and valueBuffer + + // private List<Long> mergeVersionList = new ArrayList<>(); +// private List<Long> mergeOffsetList = new ArrayList<>(); private boolean isLazyLoad = false; public ChunkSuit4CPV(ChunkMetadata chunkMetadata) { + this(chunkMetadata, false); + } + + public ChunkSuit4CPV(ChunkMetadata chunkMetadata, boolean deepCopy) { this.chunkMetadata = chunkMetadata; - this.batchData = null; - this.isLazyLoad = false; + // deep copy initialize 
+ if (deepCopy) { + deepCopyInitialize(chunkMetadata.getStatistics(), chunkMetadata.getDataType()); + } else { + statistics = chunkMetadata.getStatistics(); + } } - public ChunkSuit4CPV(ChunkMetadata chunkMetadata, BatchData batchData) { +// public ChunkSuit4CPV(ChunkMetadata chunkMetadata, BatchData batchData) { +// this.chunkMetadata = chunkMetadata; +// this.batchData = batchData; +// // deep copy initialize +// deepCopyInitialize(chunkMetadata.getStatistics(), chunkMetadata.getDataType()); +// } + + public ChunkSuit4CPV(ChunkMetadata chunkMetadata, PageReader pageReader, boolean deepCopy) { this.chunkMetadata = chunkMetadata; - this.batchData = batchData; + this.pageReader = pageReader; + // deep copy initialize + if (deepCopy) { + deepCopyInitialize(chunkMetadata.getStatistics(), chunkMetadata.getDataType()); + } else { + statistics = chunkMetadata.getStatistics(); + } + } + + public Statistics getStatistics() { + return statistics; + } + + public void deepCopyInitialize(Statistics source, TSDataType type) { + // deep copy initialize + switch (type) { + case INT32: + statistics = new IntegerStatistics(); + ((IntegerStatistics) statistics).initializeStats( + (int) source.getMinInfo().val, + source.getMinInfo().timestamp, + (int) source.getMaxInfo().val, + source.getMaxInfo().timestamp, + (int) source.getFirstValue(), + (int) source.getLastValue(), + source.getSumLongValue() + ); + break; + case INT64: + statistics = new LongStatistics(); + ((LongStatistics) statistics).initializeStats( + (long) source.getMinInfo().val, + source.getMinInfo().timestamp, + (long) source.getMaxInfo().val, + source.getMaxInfo().timestamp, + (long) source.getFirstValue(), + (long) source.getLastValue(), + source.getSumDoubleValue() + ); + break; + case FLOAT: + statistics = new FloatStatistics(); + ((FloatStatistics) statistics).initializeStats( + (float) source.getMinInfo().val, + source.getMinInfo().timestamp, + (float) source.getMaxInfo().val, + source.getMaxInfo().timestamp, + 
(float) source.getFirstValue(), + (float) source.getLastValue(), + source.getSumDoubleValue() + ); + break; + case DOUBLE: + statistics = new DoubleStatistics(); + ((DoubleStatistics) statistics).initializeStats( + (double) source.getMinInfo().val, + source.getMinInfo().timestamp, + (double) source.getMaxInfo().val, + source.getMaxInfo().timestamp, + (double) source.getFirstValue(), + (double) source.getLastValue(), + source.getSumDoubleValue() + ); + break; + default: + break; + } + statistics.setStartTime(source.getStartTime()); + statistics.setEndTime(source.getEndTime()); + statistics.setCount(source.getCount()); } public void setLazyLoad(boolean lazyLoad) { @@ -57,39 +176,233 @@ public class ChunkSuit4CPV { return chunkMetadata; } - public BatchData getBatchData() { - return batchData; +// public BatchData getBatchData() { +// return batchData; +// } + + public PageReader getPageReader() { + return pageReader; } - public void setBatchData(BatchData batchData) { - this.batchData = batchData; +// public void setBatchData(BatchData batchData) { +// this.batchData = batchData; +// } + + public void setPageReader(PageReader pageReader) { + this.pageReader = pageReader; } public void setChunkMetadata(ChunkMetadata chunkMetadata) { this.chunkMetadata = chunkMetadata; } - public void addMergeVersionList(long version) { - this.mergeVersionList.add(version); +// public void addMergeVersionList(long version) { +// this.mergeVersionList.add(version); +// } +// +// public void addMergeOffsetList(long offset) { +// this.mergeOffsetList.add(offset); +// } +// +// public List<Long> getMergeVersionList() { +// return mergeVersionList; +// } +// +// public List<Long> getMergeOffsetList() { +// return mergeOffsetList; +// } + + public long getVersion() { + return this.getChunkMetadata().getVersion(); } - public void addMergeOffsetList(long offset) { - this.mergeOffsetList.add(offset); + public long getOffset() { + return this.getChunkMetadata().getOffsetOfChunkHeader(); } - 
public List<Long> getMergeVersionList() { - return mergeVersionList; + + /** + * Find the point with the closet timestamp equal to or larger than the given timestamp in the + * chunk. + * + * @param targetTimestamp must be within the chunk time range [startTime, endTime] + * @return the point with value and timestamp + */ + public MinMaxInfo findTheClosetPointEqualOrAfter(long targetTimestamp) throws IOException { + StepRegress stepRegress = chunkMetadata.getStatistics().getStepRegress(); + // infer position starts from 1, so minus 1 here + // TODO debug buffer.get(index) + int estimatedPos = (int) Math.round(stepRegress.infer(targetTimestamp)) - 1; + + // search from estimatePos in the timeBuffer to find the closet timestamp equal to or larger than the given timestamp + if (pageReader.timeBuffer.getLong(estimatedPos * 8) < targetTimestamp) { + while (pageReader.timeBuffer.getLong(estimatedPos * 8) < targetTimestamp) { + estimatedPos++; + } + } else if (pageReader.timeBuffer.getLong(estimatedPos * 8) > targetTimestamp) { + while (pageReader.timeBuffer.getLong(estimatedPos * 8) > targetTimestamp) { + estimatedPos--; + } + if (pageReader.timeBuffer.getLong(estimatedPos * 8) < targetTimestamp) { + estimatedPos++; + } // else equal + } // else equal + + // since we have constrained that targetTimestamp must be within the chunk time range [startTime, endTime], + // we can definitely find such a point with the closet timestamp equal to or larger than the + // given timestamp in the chunk. 
+ switch (chunkMetadata.getDataType()) { + // iotdb的int类型的plain编码用的是自制的不支持random access +// case INT32: +// return new MinMaxInfo(pageReader.valueBuffer.getInt(estimatedPos * 4), +// pageReader.timeBuffer.getLong(estimatedPos * 8)); + case INT64: + return new MinMaxInfo( + pageReader.valueBuffer.getLong(pageReader.timeBufferLength + estimatedPos * 8), + pageReader.timeBuffer.getLong(estimatedPos * 8)); + case FLOAT: + return new MinMaxInfo( + pageReader.valueBuffer.getFloat(pageReader.timeBufferLength + estimatedPos * 4), + pageReader.timeBuffer.getLong(estimatedPos * 8)); + case DOUBLE: + return new MinMaxInfo( + pageReader.valueBuffer.getDouble(pageReader.timeBufferLength + estimatedPos * 8), + pageReader.timeBuffer.getLong(estimatedPos * 8)); + default: + throw new IOException("Unsupported data type!"); + } } - public List<Long> getMergeOffsetList() { - return mergeOffsetList; + /** + * Find the point with the closet timestamp equal to or smaller than the given timestamp in the + * chunk. 
+ * + * @param targetTimestamp must be within the chunk time range [startTime, endTime] + * @return the point with value and timestamp + */ + public MinMaxInfo findTheClosetPointEqualOrBefore(long targetTimestamp) throws IOException { + StepRegress stepRegress = chunkMetadata.getStatistics().getStepRegress(); + // infer position starts from 1, so minus 1 here + int estimatedPos = (int) Math.round(stepRegress.infer(targetTimestamp)) - 1; + + // search from estimatePos in the timeBuffer to find the closet timestamp equal to or smaller than the given timestamp + if (pageReader.timeBuffer.getLong(estimatedPos * 8) > targetTimestamp) { + while (pageReader.timeBuffer.getLong(estimatedPos * 8) > targetTimestamp) { + estimatedPos--; + } + } else if (pageReader.timeBuffer.getLong(estimatedPos * 8) < targetTimestamp) { + while (pageReader.timeBuffer.getLong(estimatedPos * 8) < targetTimestamp) { + estimatedPos++; + } + if (pageReader.timeBuffer.getLong(estimatedPos * 8) > targetTimestamp) { + estimatedPos--; + } // else equal + } // else equal + + // since we have constrained that targetTimestamp must be within the chunk time range [startTime, endTime], + // we can definitely find such a point with the closet timestamp equal to or smaller than the + // given timestamp in the chunk. 
+ switch (chunkMetadata.getDataType()) { + // IoTDB's PLAIN encoding for INT32 uses a custom format that does not support random access, so INT32 is not handled here +// case INT32: +// return new MinMaxInfo(pageReader.valueBuffer.getInt(estimatedPos * 4), +// pageReader.timeBuffer.getLong(estimatedPos * 8)); + case INT64: + return new MinMaxInfo( + pageReader.valueBuffer.getLong(pageReader.timeBufferLength + estimatedPos * 8), + pageReader.timeBuffer.getLong(estimatedPos * 8)); + case FLOAT: + return new MinMaxInfo( + pageReader.valueBuffer.getFloat(pageReader.timeBufferLength + estimatedPos * 4), + pageReader.timeBuffer.getLong(estimatedPos * 8)); + case DOUBLE: + return new MinMaxInfo( + pageReader.valueBuffer.getDouble(pageReader.timeBufferLength + estimatedPos * 8), + pageReader.timeBuffer.getLong(estimatedPos * 8)); + default: + throw new IOException("Unsupported data type!"); + } } - public long getVersion() { - return this.getChunkMetadata().getVersion(); + /** + * Check whether a point with exactly the target timestamp exists in the chunk. + * + * @param targetTimestamp must be within the chunk time range [startTime, endTime] + * @return true if such a point exists; false otherwise + */ + public boolean checkIfExist(long targetTimestamp) throws IOException { + StepRegress stepRegress = chunkMetadata.getStatistics().getStepRegress(); + // infer position starts from 1, so minus 1 here + // timestamps are read as 8-byte longs, so entry i is addressed at byte offset i * 8 + int estimatedPos = (int) Math.round(stepRegress.infer(targetTimestamp)) - 1; + + // search from estimatedPos in the timeBuffer to find the closest timestamp equal to or smaller than the given timestamp + if (pageReader.timeBuffer.getLong(estimatedPos * 8) > targetTimestamp) { + while (pageReader.timeBuffer.getLong(estimatedPos * 8) > targetTimestamp) { + estimatedPos--; + } + } else if (pageReader.timeBuffer.getLong(estimatedPos * 8) < targetTimestamp) { + while (pageReader.timeBuffer.getLong(estimatedPos * 8) < targetTimestamp) { + estimatedPos++; + } + if (pageReader.timeBuffer.getLong(estimatedPos * 8) > targetTimestamp) { +
estimatedPos--; + } // else equal + } // else equal + + // since we have constrained that targetTimestamp must be within the chunk time range [startTime, endTime], + // estimatedPos will not be out of range; compare the full 8-byte timestamp, not a single byte. + return pageReader.timeBuffer.getLong(estimatedPos * 8) == targetTimestamp; } - public long getOffset() { - return this.getChunkMetadata().getOffsetOfChunkHeader(); + public void updateFP(MinMaxInfo point) { + long timestamp = point.timestamp; + Object val = point.val; + switch (chunkMetadata.getDataType()) { + case INT32: + statistics.setStartTime(timestamp); + ((IntegerStatistics) statistics).setFirstValue((int) val); + break; + case INT64: + statistics.setStartTime(timestamp); + ((LongStatistics) statistics).setFirstValue((long) val); + break; + case FLOAT: + statistics.setStartTime(timestamp); + ((FloatStatistics) statistics).setFirstValue((float) val); + break; + case DOUBLE: + statistics.setStartTime(timestamp); + ((DoubleStatistics) statistics).setFirstValue((double) val); + break; + default: + break; + } + } + + public void updateLP(MinMaxInfo point) { + long timestamp = point.timestamp; + Object val = point.val; + switch (chunkMetadata.getDataType()) { + case INT32: + statistics.setEndTime(timestamp); + ((IntegerStatistics) statistics).setLastValue((int) val); + break; + case INT64: + statistics.setEndTime(timestamp); + ((LongStatistics) statistics).setLastValue((long) val); + break; + case FLOAT: + statistics.setEndTime(timestamp); + ((FloatStatistics) statistics).setLastValue((float) val); + break; + case DOUBLE: + statistics.setEndTime(timestamp); + ((DoubleStatistics) statistics).setLastValue((double) val); + break; + default: + break; + } } + } diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java index a0c1fef97e..218aae9167 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java +++
b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java @@ -18,15 +18,17 @@ */ package org.apache.iotdb.tsfile.read.reader.page; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; import org.apache.iotdb.tsfile.encoding.decoder.Decoder; import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException; import org.apache.iotdb.tsfile.file.header.PageHeader; import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; -import org.apache.iotdb.tsfile.file.metadata.statistics.DoubleStatistics; -import org.apache.iotdb.tsfile.file.metadata.statistics.FloatStatistics; -import org.apache.iotdb.tsfile.file.metadata.statistics.IntegerStatistics; -import org.apache.iotdb.tsfile.file.metadata.statistics.LongStatistics; +import org.apache.iotdb.tsfile.file.metadata.statistics.MinMaxInfo; import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics; import org.apache.iotdb.tsfile.read.common.BatchData; import org.apache.iotdb.tsfile.read.common.BatchDataFactory; @@ -38,54 +40,50 @@ import org.apache.iotdb.tsfile.read.reader.IPageReader; import org.apache.iotdb.tsfile.utils.Binary; import org.apache.iotdb.tsfile.utils.ReadWriteForEncodingUtils; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - public class PageReader implements IPageReader { private PageHeader pageHeader; protected TSDataType dataType; - /** decoder for value column */ + /** + * decoder for value column + */ protected Decoder valueDecoder; - /** decoder for time column */ + /** + * decoder for time column + */ protected Decoder timeDecoder; - /** time column in memory */ - protected ByteBuffer timeBuffer; + /** + * time column in memory + */ + public ByteBuffer timeBuffer; + + /** + * value column in memory + */ + 
public ByteBuffer valueBuffer; - /** value column in memory */ - protected ByteBuffer valueBuffer; + public int timeBufferLength; protected Filter filter; - /** A list of deleted intervals. */ + /** + * A list of deleted intervals. + */ private List<TimeRange> deleteIntervalList; private int deleteCursor = 0; - public PageReader( - ByteBuffer pageData, - TSDataType dataType, - Decoder valueDecoder, - Decoder timeDecoder, - Filter filter) { + public PageReader(ByteBuffer pageData, TSDataType dataType, Decoder valueDecoder, + Decoder timeDecoder, Filter filter) { this(null, pageData, dataType, valueDecoder, timeDecoder, filter); } - public PageReader( - PageHeader pageHeader, - ByteBuffer pageData, - TSDataType dataType, - Decoder valueDecoder, - Decoder timeDecoder, - Filter filter) { + public PageReader(PageHeader pageHeader, ByteBuffer pageData, TSDataType dataType, + Decoder valueDecoder, Decoder timeDecoder, Filter filter) { this.dataType = dataType; this.valueDecoder = valueDecoder; this.timeDecoder = timeDecoder; @@ -100,7 +98,7 @@ public class PageReader implements IPageReader { * @param pageData uncompressed bytes size of time column, time column, value column */ private void splitDataToTimeStampAndValue(ByteBuffer pageData) { - int timeBufferLength = ReadWriteForEncodingUtils.readUnsignedVarInt(pageData); + timeBufferLength = ReadWriteForEncodingUtils.readUnsignedVarInt(pageData); timeBuffer = pageData.slice(); timeBuffer.limit(timeBufferLength); @@ -109,144 +107,176 @@ public class PageReader implements IPageReader { valueBuffer.position(timeBufferLength); } - public void split4CPV( - long startTime, - long endTime, - long interval, - long curStartTime, - List<ChunkSuit4CPV> currentChunkList, - Map<Integer, List<ChunkSuit4CPV>> splitChunkList, + /** + * Called when the chunk partially overlaps in time with the current M4 interval Ii. For every + * span of length {@code interval}, starting at {@code curStartTime} and ending no later than + * min(chunk end time, endTime - 1), a ChunkSuit4CPV is built on this page reader; its first and + * last points are narrowed to the span when the span boundary cuts into the chunk, and spans in + * which the chunk has no point are skipped. BP/TP are not updated here. + * + * @param startTime start of the whole query range, inclusive; origin of the global span index + * @param endTime end of the whole query range, exclusive + * @param interval length of one span + * @param curStartTime left end, inclusive, of the current span + * @param currentChunkList receives the suit built for the current span (n == 0) + * @param splitChunkList receives the suits of later spans, keyed by global span index + * @param chunkMetadata metadata of the chunk; passed to every suit that is created + * @throws IOException declared by the signature; presumably raised by the point lookups -- TODO confirm + */ + public void split4CPV(long startTime, long endTime, long interval, long curStartTime, + List<ChunkSuit4CPV> currentChunkList, 
Map<Integer, List<ChunkSuit4CPV>> splitChunkList, ChunkMetadata chunkMetadata) throws IOException { // note: [startTime,endTime), [curStartTime,curEndTime) - Map<Integer, BatchData> splitBatchDataMap = new HashMap<>(); - Map<Integer, ChunkMetadata> splitChunkMetadataMap = new HashMap<>(); - while (timeDecoder.hasNext(timeBuffer)) { - long timestamp = timeDecoder.readLong(timeBuffer); - // prepare corresponding batchData - if (timestamp < curStartTime) { - switch (dataType) { - case INT32: - valueDecoder.readInt(valueBuffer); - break; - case INT64: - valueDecoder.readLong(valueBuffer); - break; - case FLOAT: - valueDecoder.readFloat(valueBuffer); - break; - case DOUBLE: - valueDecoder.readDouble(valueBuffer); - break; - default: - throw new UnSupportedDataTypeException(String.valueOf(dataType)); - } - continue; - } - if (timestamp >= endTime) { - break; - } - int idx = (int) Math.floor((timestamp - startTime) * 1.0 / interval); - if (!splitBatchDataMap.containsKey(idx)) { - // create batchData - BatchData batch1 = BatchDataFactory.createBatchData(dataType, true, false); - splitBatchDataMap.put(idx, batch1); - Statistics statistics = null; - switch (dataType) { - case INT32: - statistics = new IntegerStatistics(); - break; - case INT64: - statistics = new LongStatistics(); - break; - case FLOAT: - statistics = new FloatStatistics(); - break; - case DOUBLE: - statistics = new DoubleStatistics(); - break; - default: - break; - } - // create chunkMetaData - ChunkMetadata chunkMetadata1 = - new ChunkMetadata( - chunkMetadata.getMeasurementUid(), - chunkMetadata.getDataType(), - chunkMetadata.getOffsetOfChunkHeader(), - statistics); - chunkMetadata1.setVersion(chunkMetadata.getVersion()); // don't miss this - - // // important, used later for candidate point verification - // // (1) candidate point itself whether is in the deleted interval - // // (2) candidate point whether is overlapped by a chunk with a larger version - // number and - // // the chunk does not have a 
deleted interval overlapping this candidate point - // chunkMetadata1.setDeleteIntervalList(chunkMetadata.getDeleteIntervalList()); - // // not use current Ii to modify deletedIntervalList any more - - splitChunkMetadataMap.put(idx, chunkMetadata1); +// int curIdx = (int) Math.floor((curStartTime - startTime) * 1.0 / interval); // global index + // spans are counted from the current one; span n covers + // [curStartTime + n * interval, curStartTime + (n + 1) * interval) + int numberOfSpans = (int) Math.floor( + (Math.min(chunkMetadata.getEndTime(), endTime - 1) // endTime is excluded so -1 + - curStartTime) * 1.0 / interval) + 1; + for (int n = 0; n < numberOfSpans; n++) { + long leftEndIncluded = curStartTime + n * interval; + long rightEndExcluded = curStartTime + (n + 1) * interval; + ChunkSuit4CPV chunkSuit4CPV = new ChunkSuit4CPV(chunkMetadata, this, true); + // TODO update FP,LP with the help of stepRegress index. BP/TP not update here. + MinMaxInfo FP = null; // new FP + MinMaxInfo LP = null; // new LP + if (leftEndIncluded > chunkSuit4CPV.statistics.getStartTime()) { + FP = chunkSuit4CPV.findTheClosetPointEqualOrAfter(leftEndIncluded); + chunkSuit4CPV.updateFP(FP); } - BatchData batchData1 = splitBatchDataMap.get(idx); - ChunkMetadata chunkMetadata1 = splitChunkMetadataMap.get(idx); - switch (dataType) { - case INT32: - int anInt = valueDecoder.readInt(valueBuffer); - if (!isDeleted(timestamp) && (filter == null || filter.satisfy(timestamp, anInt))) { - // update batchData1 - batchData1.putInt(timestamp, anInt); - // update statistics of chunkMetadata1 - chunkMetadata1.getStatistics().update(timestamp, anInt); - } - break; - case INT64: - long aLong = valueDecoder.readLong(valueBuffer); - if (!isDeleted(timestamp) && (filter == null || filter.satisfy(timestamp, aLong))) { - // update batchData1 - batchData1.putLong(timestamp, aLong); - // update statistics of chunkMetadata1 - chunkMetadata1.getStatistics().update(timestamp, aLong); - } - break; - case FLOAT: - float aFloat = valueDecoder.readFloat(valueBuffer); - if (!isDeleted(timestamp) && (filter == null || 
filter.satisfy(timestamp, aFloat))) { - // update batchData1 - batchData1.putFloat(timestamp, aFloat); - // update statistics of chunkMetadata1 - chunkMetadata1.getStatistics().update(timestamp, aFloat); - } - break; - case DOUBLE: - double aDouble = valueDecoder.readDouble(valueBuffer); - if (!isDeleted(timestamp) && (filter == null || filter.satisfy(timestamp, aDouble))) { - // update batchData1 - batchData1.putDouble(timestamp, aDouble); - // update statistics of chunkMetadata1 - chunkMetadata1.getStatistics().update(timestamp, aDouble); - } - break; - default: - throw new UnSupportedDataTypeException(String.valueOf(dataType)); + if (rightEndExcluded <= chunkSuit4CPV.statistics.getEndTime()) { + // -1 is because right end is excluded end + LP = chunkSuit4CPV.findTheClosetPointEqualOrBefore(rightEndExcluded - 1); + chunkSuit4CPV.updateLP(LP); } - } - - int curIdx = (int) Math.floor((curStartTime - startTime) * 1.0 / interval); - for (Integer i : splitBatchDataMap.keySet()) { - if (!splitBatchDataMap.get(i).isEmpty()) { - if (i == curIdx) { - currentChunkList.add( - new ChunkSuit4CPV(splitChunkMetadataMap.get(i), splitBatchDataMap.get(i).flip())); + if (FP != null && LP != null && FP.timestamp > LP.timestamp) { + // the chunk has no point in this span, do nothing + continue; + } else { // add this chunkSuit4CPV into currentChunkList or splitChunkList + if (n == 0) { + currentChunkList.add(chunkSuit4CPV); } else { - splitChunkList.computeIfAbsent(i, k -> new ArrayList<>()); - splitChunkList - .get(i) - .add( - new ChunkSuit4CPV(splitChunkMetadataMap.get(i), splitBatchDataMap.get(i).flip())); + // NOTE(review): FP is dereferenced here; it is non-null only when leftEndIncluded is after + // the statistics start time -- presumably guaranteed for n > 0 by the partial-overlap + // precondition, confirm + int idx = (int) Math.floor((FP.timestamp - startTime) * 1.0 / interval); // global index + splitChunkList.computeIfAbsent(idx, k -> new ArrayList<>()); + splitChunkList.get(idx).add(chunkSuit4CPV); } } } } +// Map<Integer, BatchData> splitBatchDataMap = new HashMap<>(); +// Map<Integer, ChunkMetadata> splitChunkMetadataMap = new HashMap<>(); +// while 
(timeDecoder.hasNext(timeBuffer)) { +// long timestamp = timeDecoder.readLong(timeBuffer); +// // prepare corresponding batchData +// if (timestamp < curStartTime) { +// switch (dataType) { +// case INT32: +// valueDecoder.readInt(valueBuffer); +// break; +// case INT64: +// valueDecoder.readLong(valueBuffer); +// break; +// case FLOAT: +// valueDecoder.readFloat(valueBuffer); +// break; +// case DOUBLE: +// valueDecoder.readDouble(valueBuffer); +// break; +// default: +// throw new UnSupportedDataTypeException(String.valueOf(dataType)); +// } +// continue; +// } +// if (timestamp >= endTime) { +// break; +// } +// int idx = (int) Math.floor((timestamp - startTime) * 1.0 / interval); +// if (!splitBatchDataMap.containsKey(idx)) { +// // create batchData +// BatchData batch1 = BatchDataFactory.createBatchData(dataType, true, false); +// splitBatchDataMap.put(idx, batch1); +// Statistics statistics = null; +// switch (dataType) { +// case INT32: +// statistics = new IntegerStatistics(); +// break; +// case INT64: +// statistics = new LongStatistics(); +// break; +// case FLOAT: +// statistics = new FloatStatistics(); +// break; +// case DOUBLE: +// statistics = new DoubleStatistics(); +// break; +// default: +// break; +// } +// // create chunkMetaData +// ChunkMetadata chunkMetadata1 = +// new ChunkMetadata( +// chunkMetadata.getMeasurementUid(), +// chunkMetadata.getDataType(), +// chunkMetadata.getOffsetOfChunkHeader(), +// statistics); +// chunkMetadata1.setVersion(chunkMetadata.getVersion()); // don't miss this +// +// // // important, used later for candidate point verification +// // // (1) candidate point itself whether is in the deleted interval +// // // (2) candidate point whether is overlapped by a chunk with a larger version +// // number and +// // // the chunk does not have a deleted interval overlapping this candidate point +// // chunkMetadata1.setDeleteIntervalList(chunkMetadata.getDeleteIntervalList()); +// // // not use current Ii to modify 
deletedIntervalList any more +// +// splitChunkMetadataMap.put(idx, chunkMetadata1); +// } +// BatchData batchData1 = splitBatchDataMap.get(idx); +// ChunkMetadata chunkMetadata1 = splitChunkMetadataMap.get(idx); +// switch (dataType) { +// case INT32: +// int anInt = valueDecoder.readInt(valueBuffer); +// if (!isDeleted(timestamp) && (filter == null || filter.satisfy(timestamp, anInt))) { +// // update batchData1 +// batchData1.putInt(timestamp, anInt); +// // update statistics of chunkMetadata1 +// chunkMetadata1.getStatistics().update(timestamp, anInt); +// } +// break; +// case INT64: +// long aLong = valueDecoder.readLong(valueBuffer); +// if (!isDeleted(timestamp) && (filter == null || filter.satisfy(timestamp, aLong))) { +// // update batchData1 +// batchData1.putLong(timestamp, aLong); +// // update statistics of chunkMetadata1 +// chunkMetadata1.getStatistics().update(timestamp, aLong); +// } +// break; +// case FLOAT: +// float aFloat = valueDecoder.readFloat(valueBuffer); +// if (!isDeleted(timestamp) && (filter == null || filter.satisfy(timestamp, aFloat))) { +// // update batchData1 +// batchData1.putFloat(timestamp, aFloat); +// // update statistics of chunkMetadata1 +// chunkMetadata1.getStatistics().update(timestamp, aFloat); +// } +// break; +// case DOUBLE: +// double aDouble = valueDecoder.readDouble(valueBuffer); +// if (!isDeleted(timestamp) && (filter == null || filter.satisfy(timestamp, aDouble))) { +// // update batchData1 +// batchData1.putDouble(timestamp, aDouble); +// // update statistics of chunkMetadata1 +// chunkMetadata1.getStatistics().update(timestamp, aDouble); +// } +// break; +// default: +// throw new UnSupportedDataTypeException(String.valueOf(dataType)); +// } +// } +// +// int curIdx = (int) Math.floor((curStartTime - startTime) * 1.0 / interval); +// for (Integer i : splitBatchDataMap.keySet()) { +// if (!splitBatchDataMap.get(i).isEmpty()) { +// if (i == curIdx) { +// currentChunkList.add( +// new 
ChunkSuit4CPV(splitChunkMetadataMap.get(i), splitBatchDataMap.get(i).flip())); +// } else { +// splitChunkList.computeIfAbsent(i, k -> new ArrayList<>()); +// splitChunkList +// .get(i) +// .add( +// new ChunkSuit4CPV(splitChunkMetadataMap.get(i), splitBatchDataMap.get(i).flip())); +// } +// } +// } + /** * Point timestamps within a chunk increase monotonically, so the traversal can stop as soon as a point's timestamp is greater than or equal to candidateTimestamp * @@ -265,7 +295,9 @@ public class PageReader implements IPageReader { return false; } - /** @return the returned BatchData may be empty, but never be null */ + /** + * @return the returned BatchData may be empty, but never be null + */ @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning @Override public BatchData getAllSatisfiedPageData(boolean ascending) throws IOException {
