This is an automated email from the ASF dual-hosted git repository. leirui pushed a commit to branch research/M4-visualization in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit a99118667202d1fca2bf89f09968004d96ec2416 Author: Lei Rui <[email protected]> AuthorDate: Thu Jan 26 15:35:36 2023 +0800 fix --- .../dataset/groupby/LocalGroupByExecutor4CPV.java | 551 ++++++++++++--------- .../apache/iotdb/db/tools/TsFileSketchTool.java | 3 +- .../apache/iotdb/db/integration/m4/MyTest1.java | 136 ++--- .../iotdb/tsfile/common/conf/TSFileConfig.java | 2 +- .../iotdb/tsfile/file/metadata/ChunkMetadata.java | 53 +- .../file/metadata/statistics/BinaryStatistics.java | 6 +- .../metadata/statistics/BooleanStatistics.java | 6 +- .../metadata/statistics/IntegerStatistics.java | 103 ++-- .../file/metadata/statistics/LongStatistics.java | 99 +--- .../file/metadata/statistics/Statistics.java | 7 +- .../file/metadata/statistics/StepRegress.java | 13 +- .../iotdb/tsfile/read/common/ChunkSuit4CPV.java | 225 +++++---- .../iotdb/tsfile/read/reader/page/PageReader.java | 91 ++-- 13 files changed, 658 insertions(+), 637 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutor4CPV.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutor4CPV.java index 14cf495d8f..95be8ffc41 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutor4CPV.java +++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutor4CPV.java @@ -19,14 +19,6 @@ package org.apache.iotdb.db.query.dataset.groupby; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Comparator; -import java.util.HashMap; -import java.util.List; -import java.util.ListIterator; -import java.util.Map; -import java.util.Set; import org.apache.iotdb.db.engine.querycontext.QueryDataSource; import org.apache.iotdb.db.exception.StorageEngineException; import org.apache.iotdb.db.exception.query.QueryProcessException; @@ -47,6 +39,15 @@ import org.apache.iotdb.tsfile.read.reader.IPageReader; import org.apache.iotdb.tsfile.read.reader.page.PageReader; import org.apache.iotdb.tsfile.utils.Pair; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.ListIterator; +import java.util.Map; +import java.util.Set; + /** * Sql format: SELECT min_time(s0), max_time(s0), first_value(s0), last_value(s0), min_value(s0), * max_value(s0) ROM root.xx group by ([tqs,tqe),IntervalLength). Requirements: (1) Don't change the @@ -73,34 +74,50 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor { // private PriorityMergeReader mergeReader; - public LocalGroupByExecutor4CPV(PartialPath path, Set<String> allSensors, TSDataType dataType, - QueryContext context, Filter timeFilter, TsFileFilter fileFilter, boolean ascending) + public LocalGroupByExecutor4CPV( + PartialPath path, + Set<String> allSensors, + TSDataType dataType, + QueryContext context, + Filter timeFilter, + TsFileFilter fileFilter, + boolean ascending) throws StorageEngineException, QueryProcessException { this.tsDataType = dataType; // this.mergeReader = new PriorityMergeReader(); // get all data sources - QueryDataSource queryDataSource = QueryResourceManager.getInstance() - .getQueryDataSource(path, context, this.timeFilter); + QueryDataSource queryDataSource = + QueryResourceManager.getInstance().getQueryDataSource(path, context, this.timeFilter); // update filter by TTL this.timeFilter = queryDataSource.updateFilterUsingTTL(timeFilter); - SeriesReader seriesReader = new SeriesReader(path, allSensors, dataType, context, - queryDataSource, timeFilter, null, fileFilter, ascending); + SeriesReader seriesReader = + new SeriesReader( + path, + allSensors, + dataType, + context, + queryDataSource, + timeFilter, + null, + fileFilter, + ascending); // unpackAllOverlappedFilesToTimeSeriesMetadata try { // TODO: this might be bad to load all chunk metadata at first futureChunkList.addAll(seriesReader.getAllChunkMetadatas4CPV()); // order futureChunkList by chunk startTime - futureChunkList.sort(new Comparator<ChunkSuit4CPV>() { - public int compare(ChunkSuit4CPV o1, ChunkSuit4CPV o2) { - return ((Comparable) (o1.getChunkMetadata().getStartTime())).compareTo( - o2.getChunkMetadata().getStartTime()); - } - }); + futureChunkList.sort( + new Comparator<ChunkSuit4CPV>() { + public int compare(ChunkSuit4CPV o1, ChunkSuit4CPV o2) { + return ((Comparable) (o1.getChunkMetadata().getStartTime())) + .compareTo(o2.getChunkMetadata().getStartTime()); + } + }); } catch (IOException e) { throw new QueryProcessException(e.getMessage()); } @@ -111,8 +128,9 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor { results.add(aggrResult); } - private void getCurrentChunkListFromFutureChunkList(long curStartTime, long curEndTime, - long startTime, long endTime, long interval) throws IOException { + private void getCurrentChunkListFromFutureChunkList( + long curStartTime, long curEndTime, long startTime, long endTime, long interval) + throws IOException { // empty currentChunkList currentChunkList = new ArrayList<>(); @@ -151,30 +169,39 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor { itr.remove(); // B: loads chunk data from disk to memory // C: decompress page data, split time&value buffers - List<IPageReader> pageReaderList = FileLoaderUtils.loadPageReaderList( - chunkSuit4CPV.getChunkMetadata(), this.timeFilter); -// if (pageReaderList.size() > 1) { -// throw new IOException("Against the assumption that there is only one page in a chunk!"); -// } -// for (IPageReader pageReader : pageReaderList) { + List<IPageReader> pageReaderList = + FileLoaderUtils.loadPageReaderList(chunkSuit4CPV.getChunkMetadata(), this.timeFilter); + // if (pageReaderList.size() > 1) { + // throw new IOException("Against the assumption that there is only one page in a + // chunk!"); + // } + // for (IPageReader pageReader : pageReaderList) { // assume only one page in a chunk // assume all data on disk, no data in memory - ((PageReader) pageReaderList.get(0)).split4CPV(startTime, endTime, interval, curStartTime, - currentChunkList, splitChunkList, chunkMetadata); -// } + ((PageReader) pageReaderList.get(0)) + .split4CPV( + startTime, + endTime, + interval, + curStartTime, + currentChunkList, + splitChunkList, + chunkMetadata); + // } } } } /** * @param curStartTime closed - * @param curEndTime open - * @param startTime closed - * @param endTime open + * @param curEndTime open + * @param startTime closed + * @param endTime open */ @Override - public List<AggregateResult> calcResult(long curStartTime, long curEndTime, long startTime, - long endTime, long interval) throws IOException { + public List<AggregateResult> calcResult( + long curStartTime, long curEndTime, long startTime, long endTime, long interval) + throws IOException { // clear result cache for (AggregateResult result : results) { result.reset(); @@ -194,81 +221,85 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor { return results; } - /** - * 对BatchData应用deletes操作,获得更新的BatchData和statistics赋值到chunkSuit4CPV中 - */ -// private void updateBatchData(ChunkSuit4CPV chunkSuit4CPV, TSDataType dataType) { -// if (chunkSuit4CPV.getBatchData() != null) { -// BatchData batchData1 = BatchDataFactory.createBatchData(dataType, true, false); -// Statistics statistics = null; -// switch (tsDataType) { -// case INT32: -// statistics = new IntegerStatistics(); -// break; -// case INT64: -// statistics = new LongStatistics(); -// break; -// case FLOAT: -// statistics = new FloatStatistics(); -// break; -// case DOUBLE: -// statistics = new DoubleStatistics(); -// break; -// default: -// break; -// } -// BatchDataIterator batchDataIterator = chunkSuit4CPV.getBatchData().getBatchDataIterator(); -// while (batchDataIterator.hasNextTimeValuePair()) { -// TimeValuePair timeValuePair = batchDataIterator.nextTimeValuePair(); -// long timestamp = timeValuePair.getTimestamp(); -// TsPrimitiveType value = timeValuePair.getValue(); -// boolean isDeletedItself = false; -// if (chunkSuit4CPV.getChunkMetadata().getDeleteIntervalList() != null) { -// for (TimeRange timeRange : chunkSuit4CPV.getChunkMetadata().getDeleteIntervalList()) { -// if (timeRange.contains(timestamp)) { -// isDeletedItself = true; -// break; -// } -// } -// } -// if (!isDeletedItself) { -// switch (dataType) { -// case INT32: -// batchData1.putInt(timestamp, value.getInt()); -// statistics.update(timestamp, value.getInt()); -// break; -// case INT64: -// batchData1.putLong(timestamp, value.getLong()); -// statistics.update(timestamp, value.getLong()); -// break; -// case FLOAT: -// batchData1.putFloat(timestamp, value.getFloat()); -// statistics.update(timestamp, value.getFloat()); -// break; -// case DOUBLE: -// batchData1.putDouble(timestamp, value.getDouble()); -// statistics.update(timestamp, value.getDouble()); -// break; -// default: -// throw new UnSupportedDataTypeException(String.valueOf(dataType)); -// } -// } -// } -// chunkSuit4CPV.setBatchData(batchData1); -// chunkSuit4CPV.getChunkMetadata().setStatistics(statistics); -// } -// } - private void calculateBottomPoint(List<ChunkSuit4CPV> currentChunkList, long startTime, - long endTime, long interval, long curStartTime) throws IOException { + /** 对BatchData应用deletes操作,获得更新的BatchData和statistics赋值到chunkSuit4CPV中 */ + // private void updateBatchData(ChunkSuit4CPV chunkSuit4CPV, TSDataType dataType) { + // if (chunkSuit4CPV.getBatchData() != null) { + // BatchData batchData1 = BatchDataFactory.createBatchData(dataType, true, false); + // Statistics statistics = null; + // switch (tsDataType) { + // case INT32: + // statistics = new IntegerStatistics(); + // break; + // case INT64: + // statistics = new LongStatistics(); + // break; + // case FLOAT: + // statistics = new FloatStatistics(); + // break; + // case DOUBLE: + // statistics = new DoubleStatistics(); + // break; + // default: + // break; + // } + // BatchDataIterator batchDataIterator = chunkSuit4CPV.getBatchData().getBatchDataIterator(); + // while (batchDataIterator.hasNextTimeValuePair()) { + // TimeValuePair timeValuePair = batchDataIterator.nextTimeValuePair(); + // long timestamp = timeValuePair.getTimestamp(); + // TsPrimitiveType value = timeValuePair.getValue(); + // boolean isDeletedItself = false; + // if (chunkSuit4CPV.getChunkMetadata().getDeleteIntervalList() != null) { + // for (TimeRange timeRange : chunkSuit4CPV.getChunkMetadata().getDeleteIntervalList()) { + // if (timeRange.contains(timestamp)) { + // isDeletedItself = true; + // break; + // } + // } + // } + // if (!isDeletedItself) { + // switch (dataType) { + // case INT32: + // batchData1.putInt(timestamp, value.getInt()); + // statistics.update(timestamp, value.getInt()); + // break; + // case INT64: + // batchData1.putLong(timestamp, value.getLong()); + // statistics.update(timestamp, value.getLong()); + // break; + // case FLOAT: + // batchData1.putFloat(timestamp, value.getFloat()); + // statistics.update(timestamp, value.getFloat()); + // break; + // case DOUBLE: + // batchData1.putDouble(timestamp, value.getDouble()); + // statistics.update(timestamp, value.getDouble()); + // break; + // default: + // throw new UnSupportedDataTypeException(String.valueOf(dataType)); + // } + // } + // } + // chunkSuit4CPV.setBatchData(batchData1); + // chunkSuit4CPV.getChunkMetadata().setStatistics(statistics); + // } + // } + private void calculateBottomPoint( + List<ChunkSuit4CPV> currentChunkList, + long startTime, + long endTime, + long interval, + long curStartTime) + throws IOException { while (currentChunkList.size() > 0) { // 循环1 TODO debug - // check size>0 because after updateBPTP empty ChunkSuit4CPV will be removed from currentChunkList + // check size>0 because after updateBPTP empty ChunkSuit4CPV will be removed from + // currentChunkList // 按照bottomValue排序,找出BP candidate set currentChunkList.sort( new Comparator<ChunkSuit4CPV>() { // TODO double check the sort order logic for different // aggregations public int compare(ChunkSuit4CPV o1, ChunkSuit4CPV o2) { - return ((Comparable) (o1.getStatistics().getMinValue())).compareTo( - o2.getStatistics().getMinValue()); + return ((Comparable) (o1.getStatistics().getMinValue())) + .compareTo(o2.getStatistics().getMinValue()); // NOTE here get statistics from ChunkSuit4CPV, not from ChunkSuit4CPV.ChunkMetadata } }); @@ -284,15 +315,19 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor { } } - List<ChunkSuit4CPV> nonLazyLoad = new ArrayList<>( - candidateSet); // TODO check, whether nonLazyLoad remove affects candidateSet + List<ChunkSuit4CPV> nonLazyLoad = + new ArrayList<>( + candidateSet); // TODO check, whether nonLazyLoad remove affects candidateSet nonLazyLoad.sort( new Comparator<ChunkSuit4CPV>() { // TODO double check the sort order logic for version public int compare(ChunkSuit4CPV o1, ChunkSuit4CPV o2) { - return new MergeReaderPriority(o2.getChunkMetadata().getVersion(), - o2.getChunkMetadata().getOffsetOfChunkHeader()).compareTo( - new MergeReaderPriority(o1.getChunkMetadata().getVersion(), - o1.getChunkMetadata().getOffsetOfChunkHeader())); + return new MergeReaderPriority( + o2.getChunkMetadata().getVersion(), + o2.getChunkMetadata().getOffsetOfChunkHeader()) + .compareTo( + new MergeReaderPriority( + o1.getChunkMetadata().getVersion(), + o1.getChunkMetadata().getOffsetOfChunkHeader())); } }); while (true) { // 循环2 @@ -302,14 +337,16 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor { for (ChunkSuit4CPV chunkSuit4CPV : candidateSet) { // TODO 注意delete intervals的传递 if (chunkSuit4CPV.getPageReader() == null) { - List<IPageReader> pageReaderList = FileLoaderUtils.loadPageReaderList( - chunkSuit4CPV.getChunkMetadata(), this.timeFilter); + List<IPageReader> pageReaderList = + FileLoaderUtils.loadPageReaderList( + chunkSuit4CPV.getChunkMetadata(), this.timeFilter); chunkSuit4CPV.setPageReader((PageReader) pageReaderList.get(0)); } else { // TODO 注意delete intervals的传递:主要是被重写点作为点删除传递 // pageReader does not refer to the same deleteInterval as those in chunkMetadata // after chunkMetadata executes insertIntoSortedDeletions - chunkSuit4CPV.getPageReader() + chunkSuit4CPV + .getPageReader() .setDeleteIntervalList(chunkSuit4CPV.getChunkMetadata().getDeleteIntervalList()); } // TODO chunk data read operation (c): get all data points @@ -323,29 +360,33 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor { } // 否则,找出candidate set里非lazy load里version最高的那个块的BP点作为candidate point ChunkSuit4CPV candidate = nonLazyLoad.get(0); // TODO check sort right - MergeReaderPriority candidateVersion = new MergeReaderPriority( - candidate.getChunkMetadata().getVersion(), - candidate.getChunkMetadata().getOffsetOfChunkHeader()); + MergeReaderPriority candidateVersion = + new MergeReaderPriority( + candidate.getChunkMetadata().getVersion(), + candidate.getChunkMetadata().getOffsetOfChunkHeader()); long candidateTimestamp = candidate.getStatistics().getBottomTimestamp(); // TODO check Object candidateValue = candidate.getStatistics().getMinValue(); // TODO check // verify if this candidate point is deleted boolean isDeletedItself = false; - // TODO add M4 interval virtual delete since BP/TP is not updated in getCurrentChunkListFromFutureChunkList + // TODO add M4 interval virtual delete since BP/TP is not updated in + // getCurrentChunkListFromFutureChunkList if (candidateTimestamp < curStartTime || candidateTimestamp >= curStartTime + interval) { isDeletedItself = true; } -// else if (candidate.getChunkMetadata().getDeleteIntervalList() != null) { -// for (TimeRange timeRange : candidate.getChunkMetadata().getDeleteIntervalList()) { -// if (timeRange.contains(candidateTimestamp)) { -// isDeletedItself = true; -// break; -// } // TODO add break early -// } -// } + // else if (candidate.getChunkMetadata().getDeleteIntervalList() != null) { + // for (TimeRange timeRange : candidate.getChunkMetadata().getDeleteIntervalList()) + // { + // if (timeRange.contains(candidateTimestamp)) { + // isDeletedItself = true; + // break; + // } // TODO add break early + // } + // } else { - isDeletedItself = PageReader.isDeleted(candidateTimestamp, - candidate.getChunkMetadata().getDeleteIntervalList()); + isDeletedItself = + PageReader.isDeleted( + candidateTimestamp, candidate.getChunkMetadata().getDeleteIntervalList()); } if (isDeletedItself) { // 是被删除,则标记candidate point所在块为lazy load,然后回到循环2 nonLazyLoad.remove(candidate); @@ -360,8 +401,9 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor { List<ChunkSuit4CPV> overlaps = new ArrayList<>(); for (ChunkSuit4CPV chunkSuit4CPV : currentChunkList) { ChunkMetadata chunkMetadata = chunkSuit4CPV.getChunkMetadata(); - MergeReaderPriority version = new MergeReaderPriority(chunkMetadata.getVersion(), - chunkMetadata.getOffsetOfChunkHeader()); + MergeReaderPriority version = + new MergeReaderPriority( + chunkMetadata.getVersion(), chunkMetadata.getOffsetOfChunkHeader()); if (version.compareTo(candidateVersion) <= 0) { // including bottomChunkMetadata continue; } @@ -380,10 +422,11 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor { } if (!isUpdate && overlaps.size() == 0) { // 否被overlap,则当前candidate point就是计算结果,结束 - results.get(4) // TODO check: minTimestamp, maxTimestamp, firstValue, lastValue, + results + .get(4) // TODO check: minTimestamp, maxTimestamp, firstValue, lastValue, // minValue[bottomTimestamp], maxValue[topTimestamp] - .updateResultUsingValues(new long[]{candidateTimestamp}, 1, - new Object[]{candidateValue}); + .updateResultUsingValues( + new long[] {candidateTimestamp}, 1, new Object[] {candidateValue}); // TODO check updateResult return; // 计算结束 } else if (!isUpdate) { // 是被overlap,则partial scan所有这些overlap的块 @@ -391,8 +434,9 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor { // scan这个chunk的数据 // TODO chunk data read operation (a): check existence of data point at a timestamp if (chunkSuit4CPV.getPageReader() == null) { - List<IPageReader> pageReaderList = FileLoaderUtils.loadPageReaderList( - chunkSuit4CPV.getChunkMetadata(), this.timeFilter); + List<IPageReader> pageReaderList = + FileLoaderUtils.loadPageReaderList( + chunkSuit4CPV.getChunkMetadata(), this.timeFilter); chunkSuit4CPV.setPageReader((PageReader) pageReaderList.get(0)); } isUpdate = chunkSuit4CPV.checkIfExist(candidateTimestamp); @@ -402,10 +446,11 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor { } } if (!isUpdate) { // partial scan了所有overlap的块都没有找到这样的点,则当前candidate point就是计算结果,结束 - results.get(4) // TODO check: minTimestamp, maxTimestamp, firstValue, lastValue, + results + .get(4) // TODO check: minTimestamp, maxTimestamp, firstValue, lastValue, // minValue[bottomTimestamp], maxValue[topTimestamp] - .updateResultUsingValues(new long[]{candidateTimestamp}, 1, - new Object[]{candidateValue}); + .updateResultUsingValues( + new long[] {candidateTimestamp}, 1, new Object[] {candidateValue}); // TODO check updateResult return; // 计算结束 } else { // 找到这样的点,于是标记candidate point所在块为lazy @@ -415,10 +460,12 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor { tmp.add(new TimeRange(candidateTimestamp, candidateTimestamp)); candidate.getChunkMetadata().setDeleteIntervalList(tmp); } else { -// candidate.getChunkMetadata().getDeleteIntervalList() -// .add(new TimeRange(candidateTimestamp, candidateTimestamp)); // TODO check - candidate.getChunkMetadata() - .insertIntoSortedDeletions(candidateTimestamp, candidateTimestamp);// TODO check + // candidate.getChunkMetadata().getDeleteIntervalList() + // .add(new TimeRange(candidateTimestamp, candidateTimestamp)); // + // TODO check + candidate + .getChunkMetadata() + .insertIntoSortedDeletions(candidateTimestamp, candidateTimestamp); // TODO check // TODO debug chunk and page deleteInterval not the same } // 删除那里不需要再加了,而这里更新就需要手动加一下删除操作 @@ -433,19 +480,26 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor { } } - private void calculateTopPoint(List<ChunkSuit4CPV> currentChunkList, long startTime, long endTime, - long interval, long curStartTime) throws IOException { + private void calculateTopPoint( + List<ChunkSuit4CPV> currentChunkList, + long startTime, + long endTime, + long interval, + long curStartTime) + throws IOException { while (currentChunkList.size() > 0) { // 循环1 - // check size>0 because after updateBPTP empty ChunkSuit4CPV will be removed from currentChunkList + // check size>0 because after updateBPTP empty ChunkSuit4CPV will be removed from + // currentChunkList // 按照topValue排序,找出TP candidate set currentChunkList.sort( new Comparator<ChunkSuit4CPV>() { // TODO double check the sort order logic for different // aggregations public int compare(ChunkSuit4CPV o1, ChunkSuit4CPV o2) { - return ((Comparable) (o2.getStatistics().getMaxValue())).compareTo( - o1.getStatistics().getMaxValue()); + return ((Comparable) (o2.getStatistics().getMaxValue())) + .compareTo(o1.getStatistics().getMaxValue()); // NOTE here get statistics from ChunkSuit4CPV, not from ChunkSuit4CPV.ChunkMetadata, - // because statistics of ChunkSuit4CPV is updated, while statistics of ChunkSuit4CPV.ChunkMetadata + // because statistics of ChunkSuit4CPV is updated, while statistics of + // ChunkSuit4CPV.ChunkMetadata // is fixed. } }); @@ -466,10 +520,13 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor { nonLazyLoad.sort( new Comparator<ChunkSuit4CPV>() { // TODO double check the sort order logic for version public int compare(ChunkSuit4CPV o1, ChunkSuit4CPV o2) { - return new MergeReaderPriority(o2.getChunkMetadata().getVersion(), - o2.getChunkMetadata().getOffsetOfChunkHeader()).compareTo( - new MergeReaderPriority(o1.getChunkMetadata().getVersion(), - o1.getChunkMetadata().getOffsetOfChunkHeader())); + return new MergeReaderPriority( + o2.getChunkMetadata().getVersion(), + o2.getChunkMetadata().getOffsetOfChunkHeader()) + .compareTo( + new MergeReaderPriority( + o1.getChunkMetadata().getVersion(), + o1.getChunkMetadata().getOffsetOfChunkHeader())); } }); while (true) { // 循环2 @@ -479,15 +536,17 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor { for (ChunkSuit4CPV chunkSuit4CPV : candidateSet) { // TODO 注意delete intervals的传递 if (chunkSuit4CPV.getPageReader() == null) { - List<IPageReader> pageReaderList = FileLoaderUtils.loadPageReaderList( - chunkSuit4CPV.getChunkMetadata(), this.timeFilter); + List<IPageReader> pageReaderList = + FileLoaderUtils.loadPageReaderList( + chunkSuit4CPV.getChunkMetadata(), this.timeFilter); // we assume and guarantee only one page in a chunk chunkSuit4CPV.setPageReader((PageReader) pageReaderList.get(0)); } else { // TODO 注意delete intervals的传递:主要是被重写点作为点删除传递 // pageReader does not refer to the same deleteInterval as those in chunkMetadata // after chunkMetadata executes insertIntoSortedDeletions - chunkSuit4CPV.getPageReader() + chunkSuit4CPV + .getPageReader() .setDeleteIntervalList(chunkSuit4CPV.getChunkMetadata().getDeleteIntervalList()); } // TODO chunk data read operation (c): get all data points @@ -501,11 +560,13 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor { } // 否则,找出candidate set里非lazy load里version最高的那个块的TP点作为candidate point ChunkSuit4CPV candidate = nonLazyLoad.get(0); // TODO check sort right - MergeReaderPriority candidateVersion = new MergeReaderPriority( - candidate.getChunkMetadata().getVersion(), - candidate.getChunkMetadata().getOffsetOfChunkHeader()); + MergeReaderPriority candidateVersion = + new MergeReaderPriority( + candidate.getChunkMetadata().getVersion(), + candidate.getChunkMetadata().getOffsetOfChunkHeader()); // NOTE here get statistics from ChunkSuit4CPV, not from ChunkSuit4CPV.ChunkMetadata, - // because statistics of ChunkSuit4CPV is updated, while statistics of ChunkSuit4CPV.ChunkMetadata + // because statistics of ChunkSuit4CPV is updated, while statistics of + // ChunkSuit4CPV.ChunkMetadata // is fixed. long candidateTimestamp = candidate.getStatistics().getTopTimestamp(); // TODO check Object candidateValue = candidate.getStatistics().getMaxValue(); // TODO check @@ -513,12 +574,14 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor { // verify这个candidate point // 是否被删除 boolean isDeletedItself = false; - // TODO add M4 interval virtual delete since BP/TP is not updated in getCurrentChunkListFromFutureChunkList + // TODO add M4 interval virtual delete since BP/TP is not updated in + // getCurrentChunkListFromFutureChunkList if (candidateTimestamp < curStartTime || candidateTimestamp >= curStartTime + interval) { isDeletedItself = true; } else { - isDeletedItself = PageReader.isDeleted(candidateTimestamp, - candidate.getChunkMetadata().getDeleteIntervalList()); + isDeletedItself = + PageReader.isDeleted( + candidateTimestamp, candidate.getChunkMetadata().getDeleteIntervalList()); } if (isDeletedItself) { // 是被删除,则标记candidate point所在块为lazy load,然后回到循环2 nonLazyLoad.remove(candidate); @@ -533,8 +596,9 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor { List<ChunkSuit4CPV> overlaps = new ArrayList<>(); for (ChunkSuit4CPV chunkSuit4CPV : currentChunkList) { ChunkMetadata chunkMetadata = chunkSuit4CPV.getChunkMetadata(); - MergeReaderPriority version = new MergeReaderPriority(chunkMetadata.getVersion(), - chunkMetadata.getOffsetOfChunkHeader()); + MergeReaderPriority version = + new MergeReaderPriority( + chunkMetadata.getVersion(), chunkMetadata.getOffsetOfChunkHeader()); if (version.compareTo(candidateVersion) <= 0) { // including topChunkMetadata continue; } @@ -553,10 +617,11 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor { } if (!isUpdate && overlaps.size() == 0) { // 否被overlap,则当前candidate point就是计算结果,结束 - results.get(5) // TODO check: minTimestamp, maxTimestamp, firstValue, lastValue, + results + .get(5) // TODO check: minTimestamp, maxTimestamp, firstValue, lastValue, // minValue[bottomTimestamp], maxValue[topTimestamp] - .updateResultUsingValues(new long[]{candidateTimestamp}, 1, - new Object[]{candidateValue}); + .updateResultUsingValues( + new long[] {candidateTimestamp}, 1, new Object[] {candidateValue}); // TODO check updateResult return; // 计算结束 } else if (!isUpdate) { // 是被overlap,则partial scan所有这些overlap的块 @@ -564,8 +629,9 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor { // scan这个chunk的数据 // TODO chunk data read operation (a): check existence of data point at a timestamp if (chunkSuit4CPV.getPageReader() == null) { - List<IPageReader> pageReaderList = FileLoaderUtils.loadPageReaderList( - chunkSuit4CPV.getChunkMetadata(), this.timeFilter); + List<IPageReader> pageReaderList = + FileLoaderUtils.loadPageReaderList( + chunkSuit4CPV.getChunkMetadata(), this.timeFilter); chunkSuit4CPV.setPageReader((PageReader) pageReaderList.get(0)); } isUpdate = chunkSuit4CPV.checkIfExist(candidateTimestamp); @@ -575,10 +641,11 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor { } } if (!isUpdate) { // partial scan了所有overlap的块都没有找到这样的点,则当前candidate point就是计算结果,结束 - results.get(5) // TODO check: minTimestamp, maxTimestamp, firstValue, lastValue, + results + .get(5) // TODO check: minTimestamp, maxTimestamp, firstValue, lastValue, // minValue[bottomTimestamp], maxValue[topTimestamp] - .updateResultUsingValues(new long[]{candidateTimestamp}, 1, - new Object[]{candidateValue}); + .updateResultUsingValues( + new long[] {candidateTimestamp}, 1, new Object[] {candidateValue}); // TODO check updateResult return; // 计算结束 } else { // 找到这样的点,于是标记candidate point所在块为lazy @@ -588,10 +655,12 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor { tmp.add(new TimeRange(candidateTimestamp, candidateTimestamp)); candidate.getChunkMetadata().setDeleteIntervalList(tmp); } else { -// candidate.getChunkMetadata().getDeleteIntervalList() -// .add(new TimeRange(candidateTimestamp, candidateTimestamp)); // TODO check - candidate.getChunkMetadata() - .insertIntoSortedDeletions(candidateTimestamp, candidateTimestamp);// TODO check + // candidate.getChunkMetadata().getDeleteIntervalList() + // .add(new TimeRange(candidateTimestamp, candidateTimestamp)); // + // TODO check + candidate + .getChunkMetadata() + .insertIntoSortedDeletions(candidateTimestamp, candidateTimestamp); // TODO check } // 删除那里不需要再加了,而这里更新就需要手动加一下删除操作 nonLazyLoad.remove(candidate); @@ -605,8 +674,13 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor { } } - private void calculateFirstPoint(List<ChunkSuit4CPV> currentChunkList, long startTime, - long endTime, long interval, long curStartTime) throws IOException { + private void calculateFirstPoint( + List<ChunkSuit4CPV> currentChunkList, + long startTime, + long endTime, + long interval, + long curStartTime) + throws IOException { while (currentChunkList.size() > 0) { // 循环1 TODO debug when currentChunkList size=0 // 按照startTime和version排序,找出疑似FP candidate currentChunkList.sort( @@ -614,15 +688,19 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor { // aggregations public int compare(ChunkSuit4CPV o1, ChunkSuit4CPV o2) { // NOTE here get statistics from ChunkSuit4CPV, not from ChunkSuit4CPV.ChunkMetadata - int res = ((Comparable) (o1.getStatistics().getStartTime())).compareTo( - o2.getStatistics().getStartTime()); + int res = + ((Comparable) (o1.getStatistics().getStartTime())) + .compareTo(o2.getStatistics().getStartTime()); if (res != 0) { return res; } else { - return new MergeReaderPriority(o2.getChunkMetadata().getVersion(), - o2.getChunkMetadata().getOffsetOfChunkHeader()).compareTo( - new MergeReaderPriority(o1.getChunkMetadata().getVersion(), - o1.getChunkMetadata().getOffsetOfChunkHeader())); + return new MergeReaderPriority( + o2.getChunkMetadata().getVersion(), + o2.getChunkMetadata().getOffsetOfChunkHeader()) + .compareTo( + new MergeReaderPriority( + o1.getChunkMetadata().getVersion(), + o1.getChunkMetadata().getOffsetOfChunkHeader())); } } }); @@ -632,8 +710,9 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor { if (susp_candidate.isLazyLoad()) { // 如果是lazy // load,则此时load、应用deletes、更新batchData和statistics,取消lazyLoad标记,然后回到循环1 if (susp_candidate.getPageReader() == null) { - List<IPageReader> pageReaderList = FileLoaderUtils.loadPageReaderList( - susp_candidate.getChunkMetadata(), this.timeFilter); + List<IPageReader> pageReaderList = + FileLoaderUtils.loadPageReaderList( + susp_candidate.getChunkMetadata(), this.timeFilter); susp_candidate.setPageReader((PageReader) pageReaderList.get(0)); } // TODO update FP equal to or after statistics.getEndTime @@ -650,8 +729,8 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor { boolean isDeletedItself = false; long deleteEndTime = -1; - List<TimeRange> deleteIntervalList = susp_candidate.getChunkMetadata() - .getDeleteIntervalList(); + List<TimeRange> deleteIntervalList = + susp_candidate.getChunkMetadata().getDeleteIntervalList(); if (deleteIntervalList != null) { int deleteCursor = 0; while (deleteCursor < deleteIntervalList.size()) { @@ -683,34 +762,47 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor { // 否则,则就是计算结果,结束 // TODO check: minTimestamp, maxTimestamp, firstValue, lastValue, // minValue[bottomTimestamp], maxValue[topTimestamp] - results.get(0).updateResultUsingValues(new long[]{candidateTimestamp}, 1, - new Object[]{candidateValue}); - results.get(2).updateResultUsingValues(new long[]{candidateTimestamp}, 1, - new Object[]{candidateValue}); + results + .get(0) + .updateResultUsingValues( + new long[] {candidateTimestamp}, 1, new Object[] {candidateValue}); + results + .get(2) + .updateResultUsingValues( + new long[] {candidateTimestamp}, 1, new Object[] {candidateValue}); return; } } } } - private void calculateLastPoint(List<ChunkSuit4CPV> currentChunkList, long startTime, - long endTime, long interval, long curStartTime) throws IOException { + private void calculateLastPoint( + List<ChunkSuit4CPV> currentChunkList, + long startTime, + long endTime, + long interval, + long curStartTime) + throws IOException { while (currentChunkList.size() > 0) { // 循环1 // 按照startTime和version排序,找出疑似LP candidate currentChunkList.sort( new Comparator<ChunkSuit4CPV>() { // TODO double check the sort order logic for different // aggregations public int compare(ChunkSuit4CPV o1, ChunkSuit4CPV o2) { - int res = ((Comparable) (o2.getStatistics().getEndTime())).compareTo( - o1.getStatistics().getEndTime()); + int res = + ((Comparable) (o2.getStatistics().getEndTime())) + .compareTo(o1.getStatistics().getEndTime()); // NOTE here get statistics from ChunkSuit4CPV, not from ChunkSuit4CPV.ChunkMetadata if (res != 0) { return res; } else { - return new MergeReaderPriority(o2.getChunkMetadata().getVersion(), - o2.getChunkMetadata().getOffsetOfChunkHeader()).compareTo( - new MergeReaderPriority(o1.getChunkMetadata().getVersion(), - o1.getChunkMetadata().getOffsetOfChunkHeader())); + return new MergeReaderPriority( + o2.getChunkMetadata().getVersion(), + o2.getChunkMetadata().getOffsetOfChunkHeader()) + .compareTo( + new MergeReaderPriority( + o1.getChunkMetadata().getVersion(), + o1.getChunkMetadata().getOffsetOfChunkHeader())); } } }); @@ -719,17 +811,18 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor { ChunkSuit4CPV susp_candidate = currentChunkList.get(0); if (susp_candidate.isLazyLoad()) { // 如果是lazy // load,则此时load、应用deletes、更新batchData和statistics,取消lazyLoad标记,然后回到循环1 -// currentChunkList.remove(susp_candidate); // TODO check this -// List<IPageReader> pageReaderList = FileLoaderUtils.loadPageReaderList( -// susp_candidate.getChunkMetadata(), this.timeFilter); -// for (IPageReader pageReader : pageReaderList) { // assume only one page in a chunk -// ((PageReader) pageReader).split4CPV(startTime, endTime, interval, curStartTime, -// currentChunkList, null, -// susp_candidate.getChunkMetadata()); // 新增的ChunkSuit4CPV默认isLazyLoad=false -// } + // currentChunkList.remove(susp_candidate); // TODO check this + // List<IPageReader> pageReaderList = FileLoaderUtils.loadPageReaderList( + // susp_candidate.getChunkMetadata(), this.timeFilter); + // for (IPageReader pageReader : pageReaderList) { // assume only one page in a chunk + // ((PageReader) pageReader).split4CPV(startTime, endTime, interval, curStartTime, + // currentChunkList, null, + // susp_candidate.getChunkMetadata()); // 新增的ChunkSuit4CPV默认isLazyLoad=false + // } if (susp_candidate.getPageReader() == null) { - List<IPageReader> pageReaderList = FileLoaderUtils.loadPageReaderList( - susp_candidate.getChunkMetadata(), this.timeFilter); + List<IPageReader> pageReaderList = + FileLoaderUtils.loadPageReaderList( + susp_candidate.getChunkMetadata(), this.timeFilter); susp_candidate.setPageReader((PageReader) pageReaderList.get(0)); } // TODO update FP equal to or after statistics.getEndTime @@ -746,18 +839,20 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor { boolean isDeletedItself = false; long deleteStartTime = Long.MAX_VALUE; // TODO check - List<TimeRange> deleteIntervalList = susp_candidate.getChunkMetadata() - .getDeleteIntervalList(); -// if (susp_candidate.getChunkMetadata().getDeleteIntervalList() != null) { -// for (TimeRange timeRange : susp_candidate.getChunkMetadata().getDeleteIntervalList()) { -// if (timeRange.contains(candidateTimestamp)) { -// isDeletedItself = true; -// deleteStartTime = Math.min(deleteStartTime, -// timeRange.getMin()); // deleteStartTime不会小于chunkStartTime,因为否则的话这个chunk就会modifyChunkMetaData步骤里被处理掉整个删掉 -// // TODO check -// } -// } -// } + List<TimeRange> deleteIntervalList = + susp_candidate.getChunkMetadata().getDeleteIntervalList(); + // if (susp_candidate.getChunkMetadata().getDeleteIntervalList() != null) { + // for (TimeRange timeRange : + // susp_candidate.getChunkMetadata().getDeleteIntervalList()) { + // if (timeRange.contains(candidateTimestamp)) { + // isDeletedItself = true; + // deleteStartTime = Math.min(deleteStartTime, + // timeRange.getMin()); // + // deleteStartTime不会小于chunkStartTime,因为否则的话这个chunk就会modifyChunkMetaData步骤里被处理掉整个删掉 + // // TODO check + // } + // } + // } if (deleteIntervalList != null) { int deleteCursor = 0; while (deleteCursor < deleteIntervalList.size()) { @@ -789,10 +884,14 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor { // 否则,则就是计算结果,结束 // TODO check: minTimestamp, maxTimestamp, firstValue, lastValue, // minValue[bottomTimestamp], maxValue[topTimestamp] - results.get(1).updateResultUsingValues(new long[]{candidateTimestamp}, 1, - new Object[]{candidateValue}); - results.get(3).updateResultUsingValues(new long[]{candidateTimestamp}, 1, - new Object[]{candidateValue}); + results + .get(1) + .updateResultUsingValues( + new long[] {candidateTimestamp}, 1, new Object[] {candidateValue}); + results + .get(3) + .updateResultUsingValues( + new long[] {candidateTimestamp}, 1, new Object[] {candidateValue}); return; } } diff --git a/server/src/main/java/org/apache/iotdb/db/tools/TsFileSketchTool.java b/server/src/main/java/org/apache/iotdb/db/tools/TsFileSketchTool.java index d5489700cc..c416e5690a 100644 --- a/server/src/main/java/org/apache/iotdb/db/tools/TsFileSketchTool.java +++ b/server/src/main/java/org/apache/iotdb/db/tools/TsFileSketchTool.java @@ -245,8 +245,7 @@ public class TsFileSketchTool { } private static Pair<String, String> checkArgs(String[] args) { - String filename = - "D:\\plain-plain-noindex.tsfile"; + String filename = "D:\\plain-plain-noindex.tsfile"; String outFile = "TsFile_sketch_view.txt"; if (args.length == 1) { filename = args[0]; diff --git a/server/src/test/java/org/apache/iotdb/db/integration/m4/MyTest1.java b/server/src/test/java/org/apache/iotdb/db/integration/m4/MyTest1.java index ddbc69e6cc..29ad2828b1 100644 --- a/server/src/test/java/org/apache/iotdb/db/integration/m4/MyTest1.java +++ b/server/src/test/java/org/apache/iotdb/db/integration/m4/MyTest1.java @@ -19,32 +19,34 @@ package org.apache.iotdb.db.integration.m4; -import static org.junit.Assert.fail; - -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.ResultSet; -import java.sql.Statement; -import java.util.Locale; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.engine.compaction.CompactionStrategy; import org.apache.iotdb.db.utils.EnvironmentUtils; import org.apache.iotdb.jdbc.Config; + import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.Statement; +import java.util.Locale; + +import static org.junit.Assert.fail; + public class MyTest1 { private static final String TIMESTAMP_STR = "Time"; private static String[] creationSqls = - new String[]{ - "SET STORAGE GROUP TO root.vehicle.d0", - "CREATE TIMESERIES root.vehicle.d0.s0 WITH DATATYPE=INT64, ENCODING=PLAIN", - // iotdb的int类型的plain编码用的是自制的不支持random access,所以值类型用long + new String[] { + "SET STORAGE GROUP TO root.vehicle.d0", + "CREATE TIMESERIES root.vehicle.d0.s0 WITH DATATYPE=INT64, ENCODING=PLAIN", + // iotdb的int类型的plain编码用的是自制的不支持random access,所以值类型用long }; private final String d0s0 = "root.vehicle.d0.s0"; @@ -82,14 +84,14 @@ public class MyTest1 { prepareData1(); String[] res = - new String[]{ - "0,1,20,5,20,5[1],30[10]", - "25,25,45,8,30,8[25],40[30]", - "50,52,54,8,18,8[52],18[54]", - "75,null,null,null,null,null,null" + new String[] { + "0,1,20,5,20,5[1],30[10]", + "25,25,45,8,30,8[25],40[30]", + "50,52,54,8,18,8[52],18[54]", + "75,null,null,null,null,null,null" }; try (Connection connection = - DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root"); + DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root"); Statement statement = connection.createStatement()) { boolean hasResultSet = statement.execute( @@ -129,14 +131,14 @@ public class MyTest1 { prepareData5(); String[] res = - new String[]{ - "0,1,20,5,20,5[1],30[10]", - "25,25,45,8,30,8[25],40[30]", - "50,null,null,null,null,null,null", - "75,null,null,null,null,null,null" + new String[] { + "0,1,20,5,20,5[1],30[10]", + "25,25,45,8,30,8[25],40[30]", + "50,null,null,null,null,null,null", + "75,null,null,null,null,null,null" }; try (Connection connection = - DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root"); + DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root"); Statement statement = connection.createStatement()) { boolean hasResultSet = statement.execute( @@ -175,8 +177,8 @@ public class MyTest1 { // data: // https://user-images.githubusercontent.com/33376433/151985070-73158010-8ba0-409d-a1c1-df69bad1aaee.png try (Connection connection = - DriverManager.getConnection( - Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root"); + DriverManager.getConnection( + Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root"); Statement statement = connection.createStatement()) { for (String sql : creationSqls) { @@ -218,8 +220,8 @@ public class MyTest1 { // data: // https://user-images.githubusercontent.com/33376433/151985070-73158010-8ba0-409d-a1c1-df69bad1aaee.png try (Connection connection = - DriverManager.getConnection( - Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root"); + DriverManager.getConnection( + Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root"); Statement statement = connection.createStatement()) { for (String sql : creationSqls) { @@ -264,14 +266,14 @@ public class MyTest1 { prepareData2(); String[] res = - new String[]{ - "0,1,20,5,20,5[1],30[10]", - "25,25,27,8,20,8[25],20[27]", - "50,null,null,null,null,null,null", - "75,null,null,null,null,null,null" + new String[] { + "0,1,20,5,20,5[1],30[10]", + "25,25,27,8,20,8[25],20[27]", + "50,null,null,null,null,null,null", + "75,null,null,null,null,null,null" }; try (Connection connection = - DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root"); + DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root"); Statement statement = connection.createStatement()) { boolean hasResultSet = statement.execute( @@ -311,8 +313,8 @@ public class MyTest1 { // data: // https://user-images.githubusercontent.com/33376433/151995378-07a2f8df-5cac-499a-ae88-e3b017eee07a.png try (Connection connection = - DriverManager.getConnection( - Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root"); + DriverManager.getConnection( + Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root"); Statement statement = connection.createStatement()) { for (String sql : creationSqls) { @@ -358,16 +360,16 @@ public class MyTest1 { prepareData2(); String[] res = - new String[]{ - "0,1,20,5,20,5[1],30[10]", - "25,25,27,8,20,8[25],20[27]", - "50,null,null,null,null,null,null", - "75,null,null,null,null,null,null", - "100,120,120,8,8,8[120],8[120]", - "125,null,null,null,null,null,null" + new String[] { + "0,1,20,5,20,5[1],30[10]", + "25,25,27,8,20,8[25],20[27]", + "50,null,null,null,null,null,null", + "75,null,null,null,null,null,null", + "100,120,120,8,8,8[120],8[120]", + "125,null,null,null,null,null,null" }; try (Connection connection = - DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root"); + DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root"); Statement statement = connection.createStatement()) { boolean hasResultSet = statement.execute( @@ -408,14 +410,14 @@ public class MyTest1 { prepareData3(); String[] res = - new String[]{ - "0,1,22,5,4,1[10],10[2]", - "25,30,40,8,2,2[40],8[30]", - "50,55,72,5,4,4[72],20[62]", - "75,80,90,11,1,1[90],11[80]" + new String[] { + "0,1,22,5,4,1[10],10[2]", + "25,30,40,8,2,2[40],8[30]", + "50,55,72,5,4,4[72],20[62]", + "75,80,90,11,1,1[90],11[80]" }; try (Connection connection = - DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root"); + DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root"); Statement statement = connection.createStatement()) { boolean hasResultSet = statement.execute( @@ -455,8 +457,8 @@ public class MyTest1 { // data: // https://user-images.githubusercontent.com/33376433/152003603-6b4e7494-00ff-47e4-bf6e-cab3c8600ce2.png try (Connection connection = - DriverManager.getConnection( - Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root"); + DriverManager.getConnection( + Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root"); Statement statement = connection.createStatement()) { for (String sql : creationSqls) { @@ -496,14 +498,14 @@ public class MyTest1 { prepareData3_2(); String[] res = - new String[]{ - "0,1,22,5,4,1[10],10[2]", - "25,30,40,8,2,2[40],8[30]", - "50,55,72,5,4,4[72],20[62]", - "75,80,90,11,1,1[90],11[80]" + new String[] { + "0,1,22,5,4,1[10],10[2]", + "25,30,40,8,2,2[40],8[30]", + "50,55,72,5,4,4[72],20[62]", + "75,80,90,11,1,1[90],11[80]" }; try (Connection connection = - DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root"); + DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root"); Statement statement = connection.createStatement()) { boolean hasResultSet = statement.execute( @@ -543,8 +545,8 @@ public class MyTest1 { // data: // https://user-images.githubusercontent.com/33376433/152003603-6b4e7494-00ff-47e4-bf6e-cab3c8600ce2.png try (Connection connection = - DriverManager.getConnection( - Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root"); + DriverManager.getConnection( + Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root"); Statement statement = connection.createStatement()) { for (String sql : creationSqls) { @@ -584,14 +586,14 @@ public class MyTest1 { prepareData4(); String[] res = - new String[]{ - "0,1,20,5,20,5[1],30[10]", - "25,25,45,8,30,8[25],30[45]", - "50,52,54,8,18,8[52],18[54]", - "75,null,null,null,null,null,null" + new String[] { + "0,1,20,5,20,5[1],30[10]", + "25,25,45,8,30,8[25],30[45]", + "50,52,54,8,18,8[52],18[54]", + "75,null,null,null,null,null,null" }; try (Connection connection = - DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root"); + DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root"); Statement statement = connection.createStatement()) { boolean hasResultSet = statement.execute( @@ -631,8 +633,8 @@ public class MyTest1 { // data: // https://user-images.githubusercontent.com/33376433/152006061-f1d95952-3f5c-4d88-b34e-45d3bb61b600.png try (Connection connection = - DriverManager.getConnection( - Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root"); + DriverManager.getConnection( + Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root"); Statement statement = connection.createStatement()) { for (String sql : creationSqls) { diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/common/conf/TSFileConfig.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/common/conf/TSFileConfig.java index a679adb012..2905a63900 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/common/conf/TSFileConfig.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/common/conf/TSFileConfig.java @@ -86,7 +86,7 @@ public class TSFileConfig implements Serializable { * Encoder of time column, TsFile supports TS_2DIFF, PLAIN and RLE(run-length encoding) Default * value is TS_2DIFF. */ - private String timeEncoding = "PLAIN"; //"TS_2DIFF"; + private String timeEncoding = "PLAIN"; // "TS_2DIFF"; /** * Encoder of value series. default value is PLAIN. For int, long data type, TsFile also supports * TS_2DIFF, REGULAR, GORILLA and RLE(run-length encoding). For float, double data type, TsFile diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkMetadata.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkMetadata.java index f19e7d14e8..4ce0d92e9b 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkMetadata.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkMetadata.java @@ -19,10 +19,6 @@ package org.apache.iotdb.tsfile.file.metadata; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; -import org.apache.iotdb.tsfile.file.metadata.statistics.DoubleStatistics; -import org.apache.iotdb.tsfile.file.metadata.statistics.FloatStatistics; -import org.apache.iotdb.tsfile.file.metadata.statistics.IntegerStatistics; -import org.apache.iotdb.tsfile.file.metadata.statistics.LongStatistics; import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics; import org.apache.iotdb.tsfile.read.common.TimeRange; import org.apache.iotdb.tsfile.read.controller.IChunkLoader; @@ -95,30 +91,31 @@ public class ChunkMetadata { this.statistics = statistics; } -// // deep copy -// public ChunkMetadata(ChunkMetadata chunkMetadata) { -// this.measurementUid = chunkMetadata.measurementUid; -// this.tsDataType = chunkMetadata.tsDataType; -// this.offsetOfChunkHeader = chunkMetadata.offsetOfChunkHeader; -// this.statistics = null; // this needs deep copy because we will modify it in different M4 spans -// switch (tsDataType) { -// case INT32: -// statistics = new IntegerStatistics(); -// break; -// case INT64: -// statistics = new LongStatistics(); -// break; -// case FLOAT: -// statistics = new FloatStatistics(); -// break; -// case DOUBLE: -// statistics = new DoubleStatistics(); -// break; -// default: -// break; -// } -// this.version = chunkMetadata.version; -// } + // // deep copy + // public ChunkMetadata(ChunkMetadata chunkMetadata) { + // this.measurementUid = chunkMetadata.measurementUid; + // this.tsDataType = chunkMetadata.tsDataType; + // this.offsetOfChunkHeader = chunkMetadata.offsetOfChunkHeader; + // this.statistics = null; // this needs deep copy because we will modify it in different M4 + // spans + // switch (tsDataType) { + // case INT32: + // statistics = new IntegerStatistics(); + // break; + // case INT64: + // statistics = new LongStatistics(); + // break; + // case FLOAT: + // statistics = new FloatStatistics(); + // break; + // case DOUBLE: + // statistics = new DoubleStatistics(); + // break; + // default: + // break; + // } + // this.version = chunkMetadata.version; + // } @Override public String toString() { diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/BinaryStatistics.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/BinaryStatistics.java index 5336e89366..0172eeb6fe 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/BinaryStatistics.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/BinaryStatistics.java @@ -47,12 +47,10 @@ public class BinaryStatistics extends Statistics<Binary> { } @Override - public void setMinInfo(MinMaxInfo minInfo) { - } + public void setMinInfo(MinMaxInfo minInfo) {} @Override - public void setMaxInfo(MinMaxInfo maxInfo) { - } + public void setMaxInfo(MinMaxInfo maxInfo) {} /** * initialize Statistics. diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/BooleanStatistics.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/BooleanStatistics.java index 37782f1eda..fdcc4695a4 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/BooleanStatistics.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/BooleanStatistics.java @@ -46,12 +46,10 @@ public class BooleanStatistics extends Statistics<Boolean> { } @Override - public void setMinInfo(MinMaxInfo minInfo) { - } + public void setMinInfo(MinMaxInfo minInfo) {} @Override - public void setMaxInfo(MinMaxInfo maxInfo) { - } + public void setMaxInfo(MinMaxInfo maxInfo) {} /** * initialize boolean Statistics. * diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/IntegerStatistics.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/IntegerStatistics.java index 3ab173fe60..2bb2052ee3 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/IntegerStatistics.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/IntegerStatistics.java @@ -18,31 +18,26 @@ */ package org.apache.iotdb.tsfile.file.metadata.statistics; +import org.apache.iotdb.tsfile.exception.filter.StatisticsClassException; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; + import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.nio.ByteBuffer; -import org.apache.iotdb.tsfile.exception.filter.StatisticsClassException; -import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; -import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; -/** - * Statistics for int type. - */ +/** Statistics for int type. */ public class IntegerStatistics extends Statistics<Integer> { - /** - * @author Yuyuan Kang - */ + /** @author Yuyuan Kang */ private MinMaxInfo<Integer> minInfo; private MinMaxInfo<Integer> maxInfo; private int firstValue; private int lastValue; private long sumValue; - /** - * @author Yuyuan Kang - */ + /** @author Yuyuan Kang */ private final TSDataType minMaxDataType = TSDataType.MIN_MAX_INT32; static final int INTEGER_STATISTICS_FIXED_RAM_SIZE = 64; @@ -52,17 +47,13 @@ public class IntegerStatistics extends Statistics<Integer> { return TSDataType.INT32; } - /** - * @author Yuyuan Kang - */ + /** @author Yuyuan Kang */ public IntegerStatistics() { minInfo = new MinMaxInfo<>(Integer.MAX_VALUE, -1); maxInfo = new MinMaxInfo<>(Integer.MIN_VALUE, -1); } - /** - * @author Yuyuan Kang - */ + /** @author Yuyuan Kang */ @Override public int getStatsSize() { int len = 0; @@ -76,9 +67,7 @@ public class IntegerStatistics extends Statistics<Integer> { return len; } - /** - * @author Yuyuan Kang - */ + /** @author Yuyuan Kang */ public void initializeStats( MinMaxInfo<Integer> minInfo, MinMaxInfo<Integer> maxInfo, @@ -92,9 +81,7 @@ public class IntegerStatistics extends Statistics<Integer> { this.sumValue += sum; } - /** - * @author Yuyuan Kang - */ + /** @author Yuyuan Kang */ public void initializeStats( int min, long bottomTimestamp, @@ -110,9 +97,7 @@ public class IntegerStatistics extends Statistics<Integer> { this.sumValue += sum; } - /** - * @author Yuyuan Kang - */ + /** @author Yuyuan Kang */ private void updateStats( int minValue, long bottomTimestamp, @@ -126,9 +111,7 @@ public class IntegerStatistics extends Statistics<Integer> { this.lastValue = lastValue; } - /** - * @author Yuyuan Kang - */ + /** @author Yuyuan Kang */ private void updateStats( MinMaxInfo<Integer> minInfo, MinMaxInfo<Integer> maxInfo, @@ -157,9 +140,7 @@ public class IntegerStatistics extends Statistics<Integer> { // maxValue = BytesUtils.bytesToInt(maxBytes); // } - /** - * @author Yuyuan Kang - */ + /** @author Yuyuan Kang */ @Override public void updateStats(int value, long timestamp) { if (isEmpty) { @@ -180,9 +161,7 @@ public class IntegerStatistics extends Statistics<Integer> { this.maxInfo = maxInfo; } - /** - * @author Yuyuan Kang - */ + /** @author Yuyuan Kang */ @Override void updateStats(int[] values, long[] timestamps, int batchSize) { for (int i = 0; i < batchSize; i++) { @@ -195,49 +174,37 @@ public class IntegerStatistics extends Statistics<Integer> { return INTEGER_STATISTICS_FIXED_RAM_SIZE; } - /** - * @author Yuyuan Kang - */ + /** @author Yuyuan Kang */ @Override public MinMaxInfo<Integer> getMinInfo() { return this.minInfo; } - /** - * @author Yuyuan Kang - */ + /** @author Yuyuan Kang */ @Override public MinMaxInfo<Integer> getMaxInfo() { return this.maxInfo; } - /** - * @author Yuyuan Kang - */ + /** @author Yuyuan Kang */ @Override public Integer getMinValue() { return this.minInfo.val; } - /** - * @author Yuyuan Kang - */ + /** @author Yuyuan Kang */ @Override public Integer getMaxValue() { return this.maxInfo.val; } - /** - * @author Yuyuan Kang - */ + /** @author Yuyuan Kang */ @Override public long getBottomTimestamp() { return this.minInfo.timestamp; } - /** - * @author Yuyuan Kang - */ + /** @author Yuyuan Kang */ @Override public long getTopTimestamp() { return this.maxInfo.timestamp; @@ -279,9 +246,7 @@ public class IntegerStatistics extends Statistics<Integer> { return sumValue; } - /** - * @author Yuyuan Kang - */ + /** @author Yuyuan Kang */ @Override protected void mergeStatisticsValue(Statistics stats) { IntegerStatistics intStats = (IntegerStatistics) stats; @@ -305,9 +270,7 @@ public class IntegerStatistics extends Statistics<Integer> { } } - /** - * @author Yuyuan Kang - */ + /** @author Yuyuan Kang */ @Override public void updateMinInfo(Integer val, long timestamp) { if (val < this.minInfo.val) { @@ -315,9 +278,7 @@ public class IntegerStatistics extends Statistics<Integer> { } } - /** - * @author Yuyuan Kang - */ + /** @author Yuyuan Kang */ @Override public void updateMaxInfo(Integer val, long timestamp) { if (val > this.maxInfo.val) { @@ -325,9 +286,7 @@ public class IntegerStatistics extends Statistics<Integer> { } } - /** - * @author Yuyuan Kang - */ + /** @author Yuyuan Kang */ @Override public int serializeStats(OutputStream outputStream) throws IOException { int byteLen = 0; @@ -339,9 +298,7 @@ public class IntegerStatistics extends Statistics<Integer> { return byteLen; } - /** - * @author Yuyuan Kang - */ + /** @author Yuyuan Kang */ @Override public void deserialize(InputStream inputStream) throws IOException { this.minInfo = ReadWriteIOUtils.readMinMaxInfo(inputStream, minMaxDataType); @@ -351,9 +308,7 @@ public class IntegerStatistics extends Statistics<Integer> { this.sumValue = ReadWriteIOUtils.readLong(inputStream); } - /** - * @author Yuyuan Kang - */ + /** @author Yuyuan Kang */ @Override public void deserialize(ByteBuffer byteBuffer) { this.minInfo = ReadWriteIOUtils.readMinMaxInfo(byteBuffer, minMaxDataType); @@ -363,9 +318,7 @@ public class IntegerStatistics extends Statistics<Integer> { this.sumValue = ReadWriteIOUtils.readLong(byteBuffer); } - /** - * @author Yuyuan Kang - */ + /** @author Yuyuan Kang */ @Override public String toString() { return super.toString() diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/LongStatistics.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/LongStatistics.java index e68f9beabc..2ab67480fb 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/LongStatistics.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/LongStatistics.java @@ -18,19 +18,18 @@ */ package org.apache.iotdb.tsfile.file.metadata.statistics; +import org.apache.iotdb.tsfile.exception.filter.StatisticsClassException; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; + import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.nio.ByteBuffer; -import org.apache.iotdb.tsfile.exception.filter.StatisticsClassException; -import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; -import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; public class LongStatistics extends Statistics<Long> { - /** - * @author Yuyuan Kang - */ + /** @author Yuyuan Kang */ private MinMaxInfo<Long> minInfo; private MinMaxInfo<Long> maxInfo; @@ -46,17 +45,13 @@ public class LongStatistics extends Statistics<Long> { return TSDataType.INT64; } - /** - * @author Yuyuan Kang - */ + /** @author Yuyuan Kang */ public LongStatistics() { this.minInfo = new MinMaxInfo<>(Long.MAX_VALUE, -1); this.maxInfo = new MinMaxInfo<>(Long.MIN_VALUE, -1); } - /** - * @author Yuyuan Kang - */ + /** @author Yuyuan Kang */ @Override public int getStatsSize() { int len = 0; @@ -70,9 +65,7 @@ public class LongStatistics extends Statistics<Long> { return len; } - /** - * @author Yuyuan Kang - */ + /** @author Yuyuan Kang */ public void initializeStats( MinMaxInfo<Long> minInfo, MinMaxInfo<Long> maxInfo, long firstValue, long last, double sum) { this.minInfo = new MinMaxInfo<>(minInfo); @@ -82,9 +75,7 @@ public class LongStatistics extends Statistics<Long> { this.sumValue += sum; } - /** - * @author Yuyuan Kang - */ + /** @author Yuyuan Kang */ public void initializeStats( long min, long bottomTimestamp, @@ -100,9 +91,7 @@ public class LongStatistics extends Statistics<Long> { this.sumValue += sum; } - /** - * @author Yuyuan Kang - */ + /** @author Yuyuan Kang */ private void updateStats( long minValue, long bottomTimestamp, @@ -126,9 +115,7 @@ public class LongStatistics extends Statistics<Long> { this.maxInfo = maxInfo; } - /** - * @author Yuyuan Kang - */ + /** @author Yuyuan Kang */ private void updateStats( MinMaxInfo<Long> minInfo, MinMaxInfo<Long> maxInfo, @@ -157,49 +144,37 @@ public class LongStatistics extends Statistics<Long> { // maxValue = BytesUtils.bytesToLong(maxBytes); // } - /** - * @author Yuyuan Kang - */ + /** @author Yuyuan Kang */ @Override public MinMaxInfo<Long> getMinInfo() { return minInfo; } - /** - * @author Yuyuan Kang - */ + /** @author Yuyuan Kang */ @Override public MinMaxInfo<Long> getMaxInfo() { return maxInfo; } - /** - * @author Yuyuan Kang - */ + /** @author Yuyuan Kang */ @Override public Long getMinValue() { return this.minInfo.val; } - /** - * @author Yuyuan Kang - */ + /** @author Yuyuan Kang */ @Override public Long getMaxValue() { return this.maxInfo.val; } - /** - * @author Yuyuan Kang - */ + /** @author Yuyuan Kang */ @Override public long getBottomTimestamp() { return this.minInfo.timestamp; } - /** - * @author Yuyuan Kang - */ + /** @author Yuyuan Kang */ @Override public long getTopTimestamp() { return this.maxInfo.timestamp; @@ -241,9 +216,7 @@ public class LongStatistics extends Statistics<Long> { throw new StatisticsClassException("Long statistics does not support: long sum"); } - /** - * @author Yuyuan Kang - */ + /** @author Yuyuan Kang */ @Override public void updateStats(long value, long timestamp) { if (isEmpty) { @@ -254,9 +227,7 @@ public class LongStatistics extends Statistics<Long> { } } - /** - * @author Yuyuan Kang - */ + /** @author Yuyuan Kang */ @Override void updateStats(long[] values, long[] timestamps, int batchSize) { for (int i = 0; i < batchSize; i++) { @@ -264,9 +235,7 @@ public class LongStatistics extends Statistics<Long> { } } - /** - * @author Yuyuan Kang - */ + /** @author Yuyuan Kang */ @Override public void updateStats(long minValue, long bottomTimestamp, long maxValue, long topTimestamp) { updateMinInfo(minValue, bottomTimestamp); @@ -278,9 +247,7 @@ public class LongStatistics extends Statistics<Long> { return LONG_STATISTICS_FIXED_RAM_SIZE; } - /** - * @author Yuyuan Kang - */ + /** @author Yuyuan Kang */ @Override protected void mergeStatisticsValue(Statistics stats) { LongStatistics longStats = (LongStatistics) stats; @@ -304,9 +271,7 @@ public class LongStatistics extends Statistics<Long> { } } - /** - * @author Yuyuan Kang - */ + /** @author Yuyuan Kang */ @Override public void updateMinInfo(Long val, long timestamp) { if (val < this.minInfo.val) { @@ -314,9 +279,7 @@ public class LongStatistics extends Statistics<Long> { } } - /** - * @author Yuyuan Kang - */ + /** @author Yuyuan Kang */ @Override public void updateMaxInfo(Long val, long timestamp) { if (val > this.maxInfo.val) { @@ -324,9 +287,7 @@ public class LongStatistics extends Statistics<Long> { } } - /** - * @author Yuyuan Kang - */ + /** @author Yuyuan Kang */ @Override public int serializeStats(OutputStream outputStream) throws IOException { int byteLen = 0; @@ -338,9 +299,7 @@ public class LongStatistics extends Statistics<Long> { return byteLen; } - /** - * @author Yuyuan Kang - */ + /** @author Yuyuan Kang */ @Override public void deserialize(InputStream inputStream) throws IOException { this.minInfo = ReadWriteIOUtils.readMinMaxInfo(inputStream, minMaxDataType); @@ -350,9 +309,7 @@ public class LongStatistics extends Statistics<Long> { this.sumValue = ReadWriteIOUtils.readDouble(inputStream); } - /** - * @author Yuyuan Kang - */ + /** @author Yuyuan Kang */ @Override public void deserialize(ByteBuffer byteBuffer) { this.minInfo = ReadWriteIOUtils.readMinMaxInfo(byteBuffer, minMaxDataType); @@ -362,9 +319,7 @@ public class LongStatistics extends Statistics<Long> { this.sumValue = ReadWriteIOUtils.readDouble(byteBuffer); } - /** - * @author Yuyuan Kang - */ + /** @author Yuyuan Kang */ @Override public String toString() { return super.toString() diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/Statistics.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/Statistics.java index 2a19854f1c..e70f3afdab 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/Statistics.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/Statistics.java @@ -505,7 +505,9 @@ public abstract class Statistics<T> { } segmentKeys.add(this.endTime); this.stepRegress.setSegmentKeys(segmentKeys); - this.stepRegress.inferInterceptsFromSegmentKeys(); // don't forget this, execute once and only once when deserializing + this.stepRegress + .inferInterceptsFromSegmentKeys(); // don't forget this, execute once and only once when + // deserializing } public long getStartTime() { @@ -516,9 +518,10 @@ public abstract class Statistics<T> { return endTime; } - public StepRegress getStepRegress(){ + public StepRegress getStepRegress() { return stepRegress; } + public int getCount() { return count; } diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/StepRegress.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/StepRegress.java index 55cd2d0c06..c7dc0cb50b 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/StepRegress.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/statistics/StepRegress.java @@ -18,12 +18,13 @@ */ package org.apache.iotdb.tsfile.file.metadata.statistics; -import java.io.IOException; -import java.util.Arrays; import org.eclipse.collections.impl.list.mutable.primitive.DoubleArrayList; import org.eclipse.collections.impl.list.mutable.primitive.IntArrayList; import org.eclipse.collections.impl.list.mutable.primitive.LongArrayList; +import java.io.IOException; +import java.util.Arrays; + public class StepRegress { private double slope; @@ -127,9 +128,9 @@ public class StepRegress { long nextDelta = intervals.get(i + 1); if (isBigInterval(nextDelta) && (nextPos + 1 - < slope * timestamps.get(i + 2) - + segmentIntercepts.get( - tiltLatestSegmentID))) { // when next interval is also level + < slope * timestamps.get(i + 2) + + segmentIntercepts.get( + tiltLatestSegmentID))) { // when next interval is also level isLevel = true; // then fix type from tilt to level, LTL=>LLL } } @@ -376,7 +377,7 @@ public class StepRegress { /** * @param t input timestamp * @return output the value of the step regression function f(t), which is the estimated position - * in the chunk. Pay attention that f(t) starts from (startTime,1), ends at (endTime,count). + * in the chunk. Pay attention that f(t) starts from (startTime,1), ends at (endTime,count). */ public double infer(double t) throws IOException { if (t < segmentKeys.get(0) || t > segmentKeys.getLast()) { diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/ChunkSuit4CPV.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/ChunkSuit4CPV.java index 23fcae2807..8143013834 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/ChunkSuit4CPV.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/ChunkSuit4CPV.java @@ -19,7 +19,6 @@ package org.apache.iotdb.tsfile.read.common; -import java.io.IOException; import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.file.metadata.statistics.DoubleStatistics; @@ -31,6 +30,8 @@ import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics; import org.apache.iotdb.tsfile.file.metadata.statistics.StepRegress; import org.apache.iotdb.tsfile.read.reader.page.PageReader; +import java.io.IOException; + public class ChunkSuit4CPV { private ChunkMetadata chunkMetadata; // fixed info, including version, dataType, stepRegress @@ -41,29 +42,29 @@ public class ChunkSuit4CPV { public int startPos = -1; // the first point position, starting from 0 public int endPos = -1; // the last point position, starting from 0 -// public long startTime; // statistics in chunkMetadata is not deepCopied, so store update here -// -// public long endTime; - -// public int firstValueInt; -// public long firstValueLong; -// public float firstValueFloat; -// public double firstValueDouble; -// -// public int lastValueInt; -// public long lastValueLong; -// public float lastValueFloat; -// public double lastValueDouble; -// -// public MinMaxInfo<Integer> minInfoInt; -// public MinMaxInfo<Long> minInfoLong; -// public MinMaxInfo<Float> minInfoFloat; -// public MinMaxInfo<Double> minInfoDouble; -// -// public MinMaxInfo<Integer> maxInfoInt; -// public MinMaxInfo<Long> maxInfoLong; -// public MinMaxInfo<Float> maxInfoFloat; -// public MinMaxInfo<Double> maxInfoDouble; + // public long startTime; // statistics in chunkMetadata is not deepCopied, so store update here + // + // public long endTime; + + // public int firstValueInt; + // public long firstValueLong; + // public float firstValueFloat; + // public double firstValueDouble; + // + // public int lastValueInt; + // public long lastValueLong; + // public float lastValueFloat; + // public double lastValueDouble; + // + // public MinMaxInfo<Integer> minInfoInt; + // public MinMaxInfo<Long> minInfoLong; + // public MinMaxInfo<Float> minInfoFloat; + // public MinMaxInfo<Double> minInfoDouble; + // + // public MinMaxInfo<Integer> maxInfoInt; + // public MinMaxInfo<Long> maxInfoLong; + // public MinMaxInfo<Float> maxInfoFloat; + // public MinMaxInfo<Double> maxInfoDouble; private BatchData batchData; // deprecated @@ -72,7 +73,7 @@ public class ChunkSuit4CPV { // after chunkMetadata executes insertIntoSortedDeletions // private List<Long> mergeVersionList = new ArrayList<>(); -// private List<Long> mergeOffsetList = new ArrayList<>(); + // private List<Long> mergeOffsetList = new ArrayList<>(); private boolean isLazyLoad = false; public ChunkSuit4CPV(ChunkMetadata chunkMetadata) { @@ -91,12 +92,12 @@ public class ChunkSuit4CPV { this.endPos = chunkMetadata.getStatistics().getCount() - 1; } -// public ChunkSuit4CPV(ChunkMetadata chunkMetadata, BatchData batchData) { -// this.chunkMetadata = chunkMetadata; -// this.batchData = batchData; -// // deep copy initialize -// deepCopyInitialize(chunkMetadata.getStatistics(), chunkMetadata.getDataType()); -// } + // public ChunkSuit4CPV(ChunkMetadata chunkMetadata, BatchData batchData) { + // this.chunkMetadata = chunkMetadata; + // this.batchData = batchData; + // // deep copy initialize + // deepCopyInitialize(chunkMetadata.getStatistics(), chunkMetadata.getDataType()); + // } public ChunkSuit4CPV(ChunkMetadata chunkMetadata, PageReader pageReader, boolean deepCopy) { this.chunkMetadata = chunkMetadata; @@ -120,51 +121,51 @@ public class ChunkSuit4CPV { switch (type) { case INT32: statistics = new IntegerStatistics(); - ((IntegerStatistics) statistics).initializeStats( - (int) source.getMinInfo().val, - source.getMinInfo().timestamp, - (int) source.getMaxInfo().val, - source.getMaxInfo().timestamp, - (int) source.getFirstValue(), - (int) source.getLastValue(), - source.getSumLongValue() - ); + ((IntegerStatistics) statistics) + .initializeStats( + (int) source.getMinInfo().val, + source.getMinInfo().timestamp, + (int) source.getMaxInfo().val, + source.getMaxInfo().timestamp, + (int) source.getFirstValue(), + (int) source.getLastValue(), + source.getSumLongValue()); break; case INT64: statistics = new LongStatistics(); - ((LongStatistics) statistics).initializeStats( - (long) source.getMinInfo().val, - source.getMinInfo().timestamp, - (long) source.getMaxInfo().val, - source.getMaxInfo().timestamp, - (long) source.getFirstValue(), - (long) source.getLastValue(), - source.getSumDoubleValue() - ); + ((LongStatistics) statistics) + .initializeStats( + (long) source.getMinInfo().val, + source.getMinInfo().timestamp, + (long) source.getMaxInfo().val, + source.getMaxInfo().timestamp, + (long) source.getFirstValue(), + (long) source.getLastValue(), + source.getSumDoubleValue()); break; case FLOAT: statistics = new FloatStatistics(); - ((FloatStatistics) statistics).initializeStats( - (float) source.getMinInfo().val, - source.getMinInfo().timestamp, - (float) source.getMaxInfo().val, - source.getMaxInfo().timestamp, - (float) source.getFirstValue(), - (float) source.getLastValue(), - source.getSumDoubleValue() - ); + ((FloatStatistics) statistics) + .initializeStats( + (float) source.getMinInfo().val, + source.getMinInfo().timestamp, + (float) source.getMaxInfo().val, + source.getMaxInfo().timestamp, + (float) source.getFirstValue(), + (float) source.getLastValue(), + source.getSumDoubleValue()); break; case DOUBLE: statistics = new DoubleStatistics(); - ((DoubleStatistics) statistics).initializeStats( - (double) source.getMinInfo().val, - source.getMinInfo().timestamp, - (double) source.getMaxInfo().val, - source.getMaxInfo().timestamp, - (double) source.getFirstValue(), - (double) source.getLastValue(), - source.getSumDoubleValue() - ); + ((DoubleStatistics) statistics) + .initializeStats( + (double) source.getMinInfo().val, + source.getMinInfo().timestamp, + (double) source.getMaxInfo().val, + source.getMaxInfo().timestamp, + (double) source.getFirstValue(), + (double) source.getLastValue(), + source.getSumDoubleValue()); break; default: break; @@ -206,21 +207,21 @@ public class ChunkSuit4CPV { this.chunkMetadata = chunkMetadata; } -// public void addMergeVersionList(long version) { -// this.mergeVersionList.add(version); -// } -// -// public void addMergeOffsetList(long offset) { -// this.mergeOffsetList.add(offset); -// } -// -// public List<Long> getMergeVersionList() { -// return mergeVersionList; -// } -// -// public List<Long> getMergeOffsetList() { -// return mergeOffsetList; -// } + // public void addMergeVersionList(long version) { + // this.mergeVersionList.add(version); + // } + // + // public void addMergeOffsetList(long offset) { + // this.mergeOffsetList.add(offset); + // } + // + // public List<Long> getMergeVersionList() { + // return mergeVersionList; + // } + // + // public List<Long> getMergeOffsetList() { + // return mergeOffsetList; + // } public long getVersion() { return this.getChunkMetadata().getVersion(); @@ -230,7 +231,6 @@ public class ChunkSuit4CPV { return this.getChunkMetadata().getOffsetOfChunkHeader(); } - /** * Find the point with the closet timestamp equal to or larger than the given timestamp in the * chunk. @@ -244,7 +244,8 @@ public class ChunkSuit4CPV { // TODO debug buffer.get(index) int estimatedPos = (int) Math.round(stepRegress.infer(targetTimestamp)) - 1; - // search from estimatePos in the timeBuffer to find the closet timestamp equal to or larger than the given timestamp + // search from estimatePos in the timeBuffer to find the closet timestamp equal to or larger + // than the given timestamp if (pageReader.timeBuffer.getLong(estimatedPos * 8) < targetTimestamp) { while (pageReader.timeBuffer.getLong(estimatedPos * 8) < targetTimestamp) { estimatedPos++; @@ -259,29 +260,30 @@ public class ChunkSuit4CPV { } // else equal this.startPos = estimatedPos; // note this - // since we have constrained that targetTimestamp must be within the chunk time range [startTime, endTime], + // since we have constrained that targetTimestamp must be within the chunk time range + // [startTime, endTime], // we can definitely find such a point with the closet timestamp equal to or larger than the // given timestamp in the chunk. long timestamp = pageReader.timeBuffer.getLong(estimatedPos * 8); statistics.setStartTime(timestamp); switch (chunkMetadata.getDataType()) { - // iotdb的int类型的plain编码用的是自制的不支持random access -// case INT32: -// return new MinMaxInfo(pageReader.valueBuffer.getInt(estimatedPos * 4), -// pageReader.timeBuffer.getLong(estimatedPos * 8)); + // iotdb的int类型的plain编码用的是自制的不支持random access + // case INT32: + // return new MinMaxInfo(pageReader.valueBuffer.getInt(estimatedPos * 4), + // pageReader.timeBuffer.getLong(estimatedPos * 8)); case INT64: - long longVal = pageReader.valueBuffer.getLong( - pageReader.timeBufferLength + estimatedPos * 8); + long longVal = + pageReader.valueBuffer.getLong(pageReader.timeBufferLength + estimatedPos * 8); ((LongStatistics) statistics).setFirstValue(longVal); break; case FLOAT: - float floatVal = pageReader.valueBuffer.getFloat( - pageReader.timeBufferLength + estimatedPos * 4); + float floatVal = + pageReader.valueBuffer.getFloat(pageReader.timeBufferLength + estimatedPos * 4); ((FloatStatistics) statistics).setFirstValue(floatVal); break; case DOUBLE: - double doubleVal = pageReader.valueBuffer.getDouble( - pageReader.timeBufferLength + estimatedPos * 8); + double doubleVal = + pageReader.valueBuffer.getDouble(pageReader.timeBufferLength + estimatedPos * 8); ((DoubleStatistics) statistics).setFirstValue(doubleVal); break; default: @@ -303,7 +305,8 @@ public class ChunkSuit4CPV { // infer position starts from 1, so minus 1 here int estimatedPos = (int) Math.round(stepRegress.infer(targetTimestamp)) - 1; - // search from estimatePos in the timeBuffer to find the closet timestamp equal to or smaller than the given timestamp + // search from estimatePos in the timeBuffer to find the closet timestamp equal to or smaller + // than the given timestamp if (pageReader.timeBuffer.getLong(estimatedPos * 8) > targetTimestamp) { while (pageReader.timeBuffer.getLong(estimatedPos * 8) > targetTimestamp) { estimatedPos--; @@ -318,29 +321,30 @@ public class ChunkSuit4CPV { } // else equal this.endPos = estimatedPos; // note this - // since we have constrained that targetTimestamp must be within the chunk time range [startTime, endTime], + // since we have constrained that targetTimestamp must be within the chunk time range + // [startTime, endTime], // we can definitely find such a point with the closet timestamp equal to or smaller than the // given timestamp in the chunk. long timestamp = pageReader.timeBuffer.getLong(estimatedPos * 8); statistics.setEndTime(timestamp); switch (chunkMetadata.getDataType()) { - // iotdb的int类型的plain编码用的是自制的不支持random access -// case INT32: -// return new MinMaxInfo(pageReader.valueBuffer.getInt(estimatedPos * 4), -// pageReader.timeBuffer.getLong(estimatedPos * 8)); + // iotdb的int类型的plain编码用的是自制的不支持random access + // case INT32: + // return new MinMaxInfo(pageReader.valueBuffer.getInt(estimatedPos * 4), + // pageReader.timeBuffer.getLong(estimatedPos * 8)); case INT64: - long longVal = pageReader.valueBuffer.getLong( - pageReader.timeBufferLength + estimatedPos * 8); + long longVal = + pageReader.valueBuffer.getLong(pageReader.timeBufferLength + estimatedPos * 8); ((LongStatistics) statistics).setLastValue(longVal); break; case FLOAT: - float floatVal = pageReader.valueBuffer.getFloat( - pageReader.timeBufferLength + estimatedPos * 4); + float floatVal = + pageReader.valueBuffer.getFloat(pageReader.timeBufferLength + estimatedPos * 4); ((FloatStatistics) statistics).setLastValue(floatVal); break; case DOUBLE: - double doubleVal = pageReader.valueBuffer.getDouble( - pageReader.timeBufferLength + estimatedPos * 8); + double doubleVal = + pageReader.valueBuffer.getDouble(pageReader.timeBufferLength + estimatedPos * 8); ((DoubleStatistics) statistics).setLastValue(doubleVal); break; default: @@ -361,7 +365,8 @@ public class ChunkSuit4CPV { // TODO debug buffer.get(index) int estimatedPos = (int) Math.round(stepRegress.infer(targetTimestamp)) - 1; - // search from estimatePos in the timeBuffer to find the closet timestamp equal to or smaller than the given timestamp + // search from estimatePos in the timeBuffer to find the closet timestamp equal to or smaller + // than the given timestamp if (pageReader.timeBuffer.getLong(estimatedPos * 8) > targetTimestamp) { while (pageReader.timeBuffer.getLong(estimatedPos * 8) > targetTimestamp) { estimatedPos--; @@ -375,7 +380,8 @@ public class ChunkSuit4CPV { } // else equal } // else equal - // since we have constrained that targetTimestamp must be within the chunk time range [startTime, endTime], + // since we have constrained that targetTimestamp must be within the chunk time range + // [startTime, endTime], // estimatedPos will not be out of range. return pageReader.timeBuffer.get(estimatedPos) == targetTimestamp; } @@ -471,5 +477,4 @@ public class ChunkSuit4CPV { break; } } - } diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java index 3e143efce9..cd67765f45 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/page/PageReader.java @@ -18,11 +18,6 @@ */ package org.apache.iotdb.tsfile.read.reader.page; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; import org.apache.iotdb.tsfile.encoding.decoder.Decoder; import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException; import org.apache.iotdb.tsfile.file.header.PageHeader; @@ -42,50 +37,55 @@ import org.apache.iotdb.tsfile.read.reader.IPageReader; import org.apache.iotdb.tsfile.utils.Binary; import org.apache.iotdb.tsfile.utils.ReadWriteForEncodingUtils; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + public class PageReader implements IPageReader { private PageHeader pageHeader; protected TSDataType dataType; - /** - * decoder for value column - */ + /** decoder for value column */ protected Decoder valueDecoder; - /** - * decoder for time column - */ + /** decoder for time column */ protected Decoder timeDecoder; - /** - * time column in memory - */ + /** time column in memory */ public ByteBuffer timeBuffer; - /** - * value column in memory - */ + /** value column in memory */ public ByteBuffer valueBuffer; public int timeBufferLength; protected Filter filter; - /** - * A list of deleted intervals. - */ + /** A list of deleted intervals. */ private List<TimeRange> deleteIntervalList; private int deleteCursor = 0; - public PageReader(ByteBuffer pageData, TSDataType dataType, Decoder valueDecoder, - Decoder timeDecoder, Filter filter) { + public PageReader( + ByteBuffer pageData, + TSDataType dataType, + Decoder valueDecoder, + Decoder timeDecoder, + Filter filter) { this(null, pageData, dataType, valueDecoder, timeDecoder, filter); } - public PageReader(PageHeader pageHeader, ByteBuffer pageData, TSDataType dataType, - Decoder valueDecoder, Decoder timeDecoder, Filter filter) { + public PageReader( + PageHeader pageHeader, + ByteBuffer pageData, + TSDataType dataType, + Decoder valueDecoder, + Decoder timeDecoder, + Filter filter) { this.dataType = dataType; this.valueDecoder = valueDecoder; this.timeDecoder = timeDecoder; @@ -109,17 +109,26 @@ public class PageReader implements IPageReader { valueBuffer.position(timeBufferLength); } - /** - * the chunk partially overlaps in time with the current M4 interval Ii - */ - public void split4CPV(long startTime, long endTime, long interval, long curStartTime, - List<ChunkSuit4CPV> currentChunkList, Map<Integer, List<ChunkSuit4CPV>> splitChunkList, + /** the chunk partially overlaps in time with the current M4 interval Ii */ + public void split4CPV( + long startTime, + long endTime, + long interval, + long curStartTime, + List<ChunkSuit4CPV> currentChunkList, + Map<Integer, List<ChunkSuit4CPV>> splitChunkList, ChunkMetadata chunkMetadata) throws IOException { // note: [startTime,endTime), [curStartTime,curEndTime) -// int curIdx = (int) Math.floor((curStartTime - startTime) * 1.0 / interval); // global index - int numberOfSpans = (int) Math.floor( - (Math.min(chunkMetadata.getEndTime(), endTime - 1) // endTime is excluded so -1 - - curStartTime) * 1.0 / interval) + 1; + // int curIdx = (int) Math.floor((curStartTime - startTime) * 1.0 / interval); // global + // index + int numberOfSpans = + (int) + Math.floor( + (Math.min(chunkMetadata.getEndTime(), endTime - 1) // endTime is excluded so -1 + - curStartTime) + * 1.0 + / interval) + + 1; for (int n = 0; n < numberOfSpans; n++) { long leftEndIncluded = curStartTime + n * interval; long rightEndExcluded = curStartTime + (n + 1) * interval; @@ -141,8 +150,12 @@ public class PageReader implements IPageReader { if (n == 0) { currentChunkList.add(chunkSuit4CPV); } else { - int idx = (int) Math.floor((chunkSuit4CPV.statistics.getStartTime() - startTime) * 1.0 - / interval); // global index TODO debug this + int idx = + (int) + Math.floor( + (chunkSuit4CPV.statistics.getStartTime() - startTime) + * 1.0 + / interval); // global index TODO debug this splitChunkList.computeIfAbsent(idx, k -> new ArrayList<>()); splitChunkList.get(idx).add(chunkSuit4CPV); } @@ -151,7 +164,7 @@ public class PageReader implements IPageReader { } public void updateBPTP(ChunkSuit4CPV chunkSuit4CPV) { - deleteCursor = 0;//TODO DEBUG + deleteCursor = 0; // TODO DEBUG Statistics statistics = null; switch (dataType) { case INT64: @@ -175,7 +188,7 @@ public class PageReader implements IPageReader { long aLong = valueBuffer.getLong(timeBufferLength + pos * 8); if (!isDeleted(timestamp) && (filter == null || filter.satisfy(timestamp, aLong))) { // update statistics of chunkMetadata1 - statistics.updateStats(aLong, timestamp); //TODO DEBUG + statistics.updateStats(aLong, timestamp); // TODO DEBUG count++; // ATTENTION: do not use update() interface which will also update StepRegress! // only updateStats, actually only need to update BP and TP @@ -231,9 +244,7 @@ public class PageReader implements IPageReader { return false; } - /** - * @return the returned BatchData may be empty, but never be null - */ + /** @return the returned BatchData may be empty, but never be null */ @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning @Override public BatchData getAllSatisfiedPageData(boolean ascending) throws IOException {
