This is an automated email from the ASF dual-hosted git repository. leirui pushed a commit to branch research/M4-visualization in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit e51c3e1f554f0b685e37b808b7b06bd4da1d6e76 Author: Lei Rui <[email protected]> AuthorDate: Sat Jan 28 12:58:03 2023 +0800 add check --- .../dataset/groupby/LocalGroupByExecutor4CPV.java | 91 +++++++++++----------- .../org/apache/iotdb/db/utils/FileLoaderUtils.java | 48 +++++++++++- .../iotdb/tsfile/read/common/ChunkSuit4CPV.java | 19 ++--- 3 files changed, 101 insertions(+), 57 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutor4CPV.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutor4CPV.java index 6d96897bfe..1dce4eb0f3 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutor4CPV.java +++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/LocalGroupByExecutor4CPV.java @@ -35,7 +35,6 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.read.common.ChunkSuit4CPV; import org.apache.iotdb.tsfile.read.common.TimeRange; import org.apache.iotdb.tsfile.read.filter.basic.Filter; -import org.apache.iotdb.tsfile.read.reader.IPageReader; import org.apache.iotdb.tsfile.read.reader.page.PageReader; import org.apache.iotdb.tsfile.utils.Pair; @@ -169,30 +168,23 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor { itr.remove(); // B: loads chunk data from disk to memory // C: decompress page data, split time&value buffers - List<IPageReader> pageReaderList = - FileLoaderUtils.loadPageReaderList(chunkSuit4CPV.getChunkMetadata(), this.timeFilter); - // if (pageReaderList.size() > 1) { - // throw new IOException("Against the assumption that there is only one page in a - // chunk!"); - // } - // for (IPageReader pageReader : pageReaderList) { - // assume only one page in a chunk - // assume all data on disk, no data in memory + PageReader pageReader = + FileLoaderUtils.loadPageReaderList4CPV( + chunkSuit4CPV.getChunkMetadata(), this.timeFilter); // TODO ATTENTION: YOU HAVE TO ENSURE THAT THERE IS ONLY ONE PAGE IN A CHUNK, // BECAUSE THE WHOLE IMPLEMENTATION IS BASED ON THIS ASSUMPTION. // OTHERWISE, PAGEREADER IS FOR THE FIRST PAGE IN THE CHUNK WHILE - // STEPREGRESS IS FOR THE LAST PAGE IN THE CHUNK (THE MERGE OF STEPREGRESS IS ASSIGN DIRECTLY), + // STEPREGRESS IS FOR THE LAST PAGE IN THE CHUNK (THE MERGE OF STEPREGRESS IS ASSIGN + // DIRECTLY), // WHICH WILL INTRODUCE BUGS! - ((PageReader) pageReaderList.get(0)) - .split4CPV( - startTime, - endTime, - interval, - curStartTime, - currentChunkList, - splitChunkList, - chunkMetadata); - // } + pageReader.split4CPV( + startTime, + endTime, + interval, + curStartTime, + currentChunkList, + splitChunkList, + chunkMetadata); } } } @@ -280,15 +272,16 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor { for (ChunkSuit4CPV chunkSuit4CPV : candidateSet) { // TODO 注意delete intervals的传递 if (chunkSuit4CPV.getPageReader() == null) { - List<IPageReader> pageReaderList = - FileLoaderUtils.loadPageReaderList( + PageReader pageReader = + FileLoaderUtils.loadPageReaderList4CPV( chunkSuit4CPV.getChunkMetadata(), this.timeFilter); // TODO ATTENTION: YOU HAVE TO ENSURE THAT THERE IS ONLY ONE PAGE IN A CHUNK, // BECAUSE THE WHOLE IMPLEMENTATION IS BASED ON THIS ASSUMPTION. // OTHERWISE, PAGEREADER IS FOR THE FIRST PAGE IN THE CHUNK WHILE - // STEPREGRESS IS FOR THE LAST PAGE IN THE CHUNK (THE MERGE OF STEPREGRESS IS ASSIGN DIRECTLY), + // STEPREGRESS IS FOR THE LAST PAGE IN THE CHUNK (THE MERGE OF STEPREGRESS IS ASSIGN + // DIRECTLY), // WHICH WILL INTRODUCE BUGS! - chunkSuit4CPV.setPageReader((PageReader) pageReaderList.get(0)); + chunkSuit4CPV.setPageReader(pageReader); } else { // TODO 注意delete intervals的传递:主要是被重写点作为点删除传递 // pageReader does not refer to the same deleteInterval as those in chunkMetadata @@ -321,8 +314,7 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor { // getCurrentChunkListFromFutureChunkList if (candidateTimestamp < curStartTime || candidateTimestamp >= curStartTime + interval) { isDeletedItself = true; - } - else { + } else { isDeletedItself = PageReader.isDeleted( candidateTimestamp, candidate.getChunkMetadata().getDeleteIntervalList()); @@ -373,15 +365,16 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor { // scan这个chunk的数据 // TODO chunk data read operation (a): check existence of data point at a timestamp if (chunkSuit4CPV.getPageReader() == null) { - List<IPageReader> pageReaderList = - FileLoaderUtils.loadPageReaderList( + PageReader pageReader = + FileLoaderUtils.loadPageReaderList4CPV( chunkSuit4CPV.getChunkMetadata(), this.timeFilter); // TODO ATTENTION: YOU HAVE TO ENSURE THAT THERE IS ONLY ONE PAGE IN A CHUNK, // BECAUSE THE WHOLE IMPLEMENTATION IS BASED ON THIS ASSUMPTION. // OTHERWISE, PAGEREADER IS FOR THE FIRST PAGE IN THE CHUNK WHILE - // STEPREGRESS IS FOR THE LAST PAGE IN THE CHUNK (THE MERGE OF STEPREGRESS IS ASSIGN DIRECTLY), + // STEPREGRESS IS FOR THE LAST PAGE IN THE CHUNK (THE MERGE OF STEPREGRESS IS + // ASSIGN DIRECTLY), // WHICH WILL INTRODUCE BUGS! - chunkSuit4CPV.setPageReader((PageReader) pageReaderList.get(0)); + chunkSuit4CPV.setPageReader(pageReader); } isUpdate = chunkSuit4CPV.checkIfExist(candidateTimestamp); if (isUpdate) { // 提前结束对overlaps块的scan,因为已经找到一个update点证明candidate失效 @@ -480,15 +473,16 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor { for (ChunkSuit4CPV chunkSuit4CPV : candidateSet) { // TODO 注意delete intervals的传递 if (chunkSuit4CPV.getPageReader() == null) { - List<IPageReader> pageReaderList = - FileLoaderUtils.loadPageReaderList( + PageReader pageReader = + FileLoaderUtils.loadPageReaderList4CPV( chunkSuit4CPV.getChunkMetadata(), this.timeFilter); // TODO ATTENTION: YOU HAVE TO ENSURE THAT THERE IS ONLY ONE PAGE IN A CHUNK, // BECAUSE THE WHOLE IMPLEMENTATION IS BASED ON THIS ASSUMPTION. // OTHERWISE, PAGEREADER IS FOR THE FIRST PAGE IN THE CHUNK WHILE - // STEPREGRESS IS FOR THE LAST PAGE IN THE CHUNK (THE MERGE OF STEPREGRESS IS ASSIGN DIRECTLY), + // STEPREGRESS IS FOR THE LAST PAGE IN THE CHUNK (THE MERGE OF STEPREGRESS IS ASSIGN + // DIRECTLY), // WHICH WILL INTRODUCE BUGS! - chunkSuit4CPV.setPageReader((PageReader) pageReaderList.get(0)); + chunkSuit4CPV.setPageReader(pageReader); } else { // TODO 注意delete intervals的传递:主要是被重写点作为点删除传递 // pageReader does not refer to the same deleteInterval as those in chunkMetadata @@ -577,15 +571,16 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor { // scan这个chunk的数据 // TODO chunk data read operation (a): check existence of data point at a timestamp if (chunkSuit4CPV.getPageReader() == null) { - List<IPageReader> pageReaderList = - FileLoaderUtils.loadPageReaderList( + PageReader pageReader = + FileLoaderUtils.loadPageReaderList4CPV( chunkSuit4CPV.getChunkMetadata(), this.timeFilter); // TODO ATTENTION: YOU HAVE TO ENSURE THAT THERE IS ONLY ONE PAGE IN A CHUNK, // BECAUSE THE WHOLE IMPLEMENTATION IS BASED ON THIS ASSUMPTION. // OTHERWISE, PAGEREADER IS FOR THE FIRST PAGE IN THE CHUNK WHILE - // STEPREGRESS IS FOR THE LAST PAGE IN THE CHUNK (THE MERGE OF STEPREGRESS IS ASSIGN DIRECTLY), + // STEPREGRESS IS FOR THE LAST PAGE IN THE CHUNK (THE MERGE OF STEPREGRESS IS + // ASSIGN DIRECTLY), // WHICH WILL INTRODUCE BUGS! - chunkSuit4CPV.setPageReader((PageReader) pageReaderList.get(0)); + chunkSuit4CPV.setPageReader(pageReader); } isUpdate = chunkSuit4CPV.checkIfExist(candidateTimestamp); if (isUpdate) { // 提前结束对overlaps块的scan,因为已经找到一个update点证明candidate失效 @@ -663,15 +658,16 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor { if (susp_candidate.isLazyLoad()) { // 如果是lazy // load,则此时load、应用deletes、更新batchData和statistics,取消lazyLoad标记,然后回到循环1 if (susp_candidate.getPageReader() == null) { - List<IPageReader> pageReaderList = - FileLoaderUtils.loadPageReaderList( + PageReader pageReader = + FileLoaderUtils.loadPageReaderList4CPV( susp_candidate.getChunkMetadata(), this.timeFilter); // TODO ATTENTION: YOU HAVE TO ENSURE THAT THERE IS ONLY ONE PAGE IN A CHUNK, // BECAUSE THE WHOLE IMPLEMENTATION IS BASED ON THIS ASSUMPTION. // OTHERWISE, PAGEREADER IS FOR THE FIRST PAGE IN THE CHUNK WHILE - // STEPREGRESS IS FOR THE LAST PAGE IN THE CHUNK (THE MERGE OF STEPREGRESS IS ASSIGN DIRECTLY), + // STEPREGRESS IS FOR THE LAST PAGE IN THE CHUNK (THE MERGE OF STEPREGRESS IS ASSIGN + // DIRECTLY), // WHICH WILL INTRODUCE BUGS! - susp_candidate.setPageReader((PageReader) pageReaderList.get(0)); + susp_candidate.setPageReader(pageReader); } // TODO update FP equal to or after statistics.getEndTime susp_candidate.updateFPwithTheClosetPointEqualOrAfter( @@ -770,15 +766,16 @@ public class LocalGroupByExecutor4CPV implements GroupByExecutor { if (susp_candidate.isLazyLoad()) { // 如果是lazy // load,则此时load、应用deletes、更新batchData和statistics,取消lazyLoad标记,然后回到循环1 if (susp_candidate.getPageReader() == null) { - List<IPageReader> pageReaderList = - FileLoaderUtils.loadPageReaderList( + PageReader pageReader = + FileLoaderUtils.loadPageReaderList4CPV( susp_candidate.getChunkMetadata(), this.timeFilter); // TODO ATTENTION: YOU HAVE TO ENSURE THAT THERE IS ONLY ONE PAGE IN A CHUNK, // BECAUSE THE WHOLE IMPLEMENTATION IS BASED ON THIS ASSUMPTION. // OTHERWISE, PAGEREADER IS FOR THE FIRST PAGE IN THE CHUNK WHILE - // STEPREGRESS IS FOR THE LAST PAGE IN THE CHUNK (THE MERGE OF STEPREGRESS IS ASSIGN DIRECTLY), + // STEPREGRESS IS FOR THE LAST PAGE IN THE CHUNK (THE MERGE OF STEPREGRESS IS ASSIGN + // DIRECTLY), // WHICH WILL INTRODUCE BUGS! - susp_candidate.setPageReader((PageReader) pageReaderList.get(0)); + susp_candidate.setPageReader(pageReader); } // TODO update FP equal to or after statistics.getEndTime susp_candidate.updateLPwithTheClosetPointEqualOrBefore( diff --git a/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java index 52c7f8ff9c..7d2b9dc5cd 100644 --- a/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java +++ b/server/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java @@ -40,6 +40,7 @@ import org.apache.iotdb.tsfile.read.filter.basic.Filter; import org.apache.iotdb.tsfile.read.reader.IChunkReader; import org.apache.iotdb.tsfile.read.reader.IPageReader; import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader; +import org.apache.iotdb.tsfile.read.reader.page.PageReader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -86,11 +87,11 @@ public class FileLoaderUtils { } /** - * @author Yuyuan Kang * @param resource TsFile * @param seriesPath Timeseries path * @param allSensors measurements queried at the same time of this device * @param filter any filter, only used to check time range + * @author Yuyuan Kang */ public static TimeseriesMetadata loadTimeSeriesMetadata( TsFileResource resource, @@ -191,6 +192,51 @@ public class FileLoaderUtils { } } + /** + * load all page readers in one chunk that satisfying the timeFilter + * + * @param chunkMetaData the corresponding chunk metadata + * @param timeFilter it should be a TimeFilter instead of a ValueFilter + */ + public static PageReader loadPageReaderList4CPV(ChunkMetadata chunkMetaData, Filter timeFilter) + throws IOException { + long start = System.nanoTime(); + if (chunkMetaData == null) { + throw new IOException("Can't init null chunkMeta"); + } + try { + IChunkReader chunkReader; + IChunkLoader chunkLoader = chunkMetaData.getChunkLoader(); + if (chunkLoader instanceof MemChunkLoader) { + MemChunkLoader memChunkLoader = (MemChunkLoader) chunkLoader; + chunkReader = new MemChunkReader(memChunkLoader.getChunk(), timeFilter); + } else { + Chunk chunk = chunkLoader.loadChunk(chunkMetaData); // loads chunk data from disk to memory + chunk.setFromOldFile(chunkMetaData.isFromOldTsFile()); + chunkReader = + new ChunkReader(chunk, timeFilter); // decompress page data, split time&value buffers + chunkReader.hasNextSatisfiedPage(); + } + long duration = System.nanoTime() - start; + IOMonitor.incDataIOTime(duration); + List<IPageReader> pageReaderList = chunkReader.loadPageReaderList(); + if (pageReaderList.size() > 1) { + // TODO ATTENTION: YOU HAVE TO ENSURE THAT THERE IS ONLY ONE PAGE IN A CHUNK, + // BECAUSE THE WHOLE IMPLEMENTATION IS BASED ON THIS ASSUMPTION. + // OTHERWISE, PAGEREADER IS FOR THE FIRST PAGE IN THE CHUNK WHILE + // STEPREGRESS IS FOR THE LAST PAGE IN THE CHUNK (THE MERGE OF STEPREGRESS IS ASSIGN + // DIRECTLY), + // WHICH WILL INTRODUCE BUGS! + throw new IOException("Wrong: more than one page in a chunk!"); + } + return (PageReader) pageReaderList.get(0); + } catch (IOException e) { + logger.error( + "Something wrong happened while reading chunk from " + chunkMetaData.getFilePath()); + throw e; + } + } + public static List<ChunkMetadata> getChunkMetadataList(Path path, String filePath) throws IOException { TsFileSequenceReader tsFileReader = FileReaderManager.getInstance().get(filePath, true); diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/ChunkSuit4CPV.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/ChunkSuit4CPV.java index cb4a691325..55116a7619 100644 --- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/ChunkSuit4CPV.java +++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/common/ChunkSuit4CPV.java @@ -19,7 +19,6 @@ package org.apache.iotdb.tsfile.read.common; -import java.io.IOException; import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.file.metadata.statistics.DoubleStatistics; @@ -31,6 +30,8 @@ import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics; import org.apache.iotdb.tsfile.file.metadata.statistics.StepRegress; import org.apache.iotdb.tsfile.read.reader.page.PageReader; +import java.io.IOException; + public class ChunkSuit4CPV { private ChunkMetadata chunkMetadata; // fixed info, including version, dataType, stepRegress @@ -271,10 +272,10 @@ public class ChunkSuit4CPV { long timestamp = pageReader.timeBuffer.getLong(estimatedPos * 8); statistics.setStartTime(timestamp); switch (chunkMetadata.getDataType()) { - // iotdb的int类型的plain编码用的是自制的不支持random access - // case INT32: - // return new MinMaxInfo(pageReader.valueBuffer.getInt(estimatedPos * 4), - // pageReader.timeBuffer.getLong(estimatedPos * 8)); + // iotdb的int类型的plain编码用的是自制的不支持random access + // case INT32: + // return new MinMaxInfo(pageReader.valueBuffer.getInt(estimatedPos * 4), + // pageReader.timeBuffer.getLong(estimatedPos * 8)); case INT64: long longVal = pageReader.valueBuffer.getLong(pageReader.timeBufferLength + estimatedPos * 8); @@ -332,10 +333,10 @@ public class ChunkSuit4CPV { long timestamp = pageReader.timeBuffer.getLong(estimatedPos * 8); statistics.setEndTime(timestamp); switch (chunkMetadata.getDataType()) { - // iotdb的int类型的plain编码用的是自制的不支持random access - // case INT32: - // return new MinMaxInfo(pageReader.valueBuffer.getInt(estimatedPos * 4), - // pageReader.timeBuffer.getLong(estimatedPos * 8)); + // iotdb的int类型的plain编码用的是自制的不支持random access + // case INT32: + // return new MinMaxInfo(pageReader.valueBuffer.getInt(estimatedPos * 4), + // pageReader.timeBuffer.getLong(estimatedPos * 8)); case INT64: long longVal = pageReader.valueBuffer.getLong(pageReader.timeBufferLength + estimatedPos * 8);
