This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch iotdb in repository https://gitbox.apache.org/repos/asf/tsfile.git
commit c151b5e57a8f9accf071afb6cf45289da4b3e2f9 Author: Liao Lanyu <[email protected]> AuthorDate: Mon Jun 17 16:28:38 2024 +0800 Introducing Lazy-decoding of page data in PageReader --- .../read/reader/chunk/AlignedChunkReader.java | 25 ++- .../tsfile/read/reader/chunk/ChunkReader.java | 22 ++- .../tsfile/read/reader/page/AlignedPageReader.java | 43 ++++- .../tsfile/read/reader/page/LazyLoadPageData.java | 64 +++++++ .../apache/tsfile/read/reader/page/PageReader.java | 29 +++ .../tsfile/read/reader/page/ValuePageReader.java | 50 ++++- .../read/reader/AlignedPageReaderPushDownTest.java | 201 +++++++++++++++++++++ 7 files changed, 403 insertions(+), 31 deletions(-) diff --git a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/chunk/AlignedChunkReader.java b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/chunk/AlignedChunkReader.java index fe2543df..c967a298 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/chunk/AlignedChunkReader.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/chunk/AlignedChunkReader.java @@ -19,6 +19,7 @@ package org.apache.tsfile.read.reader.chunk; +import org.apache.tsfile.compress.IUnCompressor; import org.apache.tsfile.encoding.decoder.Decoder; import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.file.MetaMarker; @@ -29,6 +30,7 @@ import org.apache.tsfile.read.common.Chunk; import org.apache.tsfile.read.common.TimeRange; import org.apache.tsfile.read.filter.basic.Filter; import org.apache.tsfile.read.reader.page.AlignedPageReader; +import org.apache.tsfile.read.reader.page.LazyLoadPageData; import java.io.IOException; import java.io.Serializable; @@ -189,7 +191,7 @@ public class AlignedChunkReader extends AbstractChunkReader { ChunkReader.deserializePageData(timePageHeader, timeChunkDataBuffer, timeChunkHeader); List<PageHeader> valuePageHeaderList = new ArrayList<>(); - List<ByteBuffer> valuePageDataList = new ArrayList<>(); + LazyLoadPageData[] lazyLoadPageDataArray = new LazyLoadPageData[rawValuePageHeaderList.size()]; List<TSDataType> valueDataTypeList = new ArrayList<>(); List<Decoder> valueDecoderList = new ArrayList<>(); @@ -200,7 +202,7 @@ public class AlignedChunkReader extends AbstractChunkReader { if (valuePageHeader == null || valuePageHeader.getUncompressedSize() == 0) { // Empty Page valuePageHeaderList.add(null); - valuePageDataList.add(null); + lazyLoadPageDataArray[i] = null; valueDataTypeList.add(null); valueDecoderList.add(null); } else if (pageDeleted(valuePageHeader, valueDeleteIntervalsList.get(i))) { @@ -209,15 +211,24 @@ public class AlignedChunkReader extends AbstractChunkReader { .position( valueChunkDataBufferList.get(i).position() + valuePageHeader.getCompressedSize()); valuePageHeaderList.add(null); - valuePageDataList.add(null); + lazyLoadPageDataArray[i] = null; valueDataTypeList.add(null); valueDecoderList.add(null); } else { ChunkHeader valueChunkHeader = valueChunkHeaderList.get(i); + int currentPagePosition = valueChunkDataBufferList.get(i).position(); + // adjust position as if we have read the page data even if it is just lazy-loaded + valueChunkDataBufferList + .get(i) + .position( + valueChunkDataBufferList.get(i).position() + valuePageHeader.getCompressedSize()); + valuePageHeaderList.add(valuePageHeader); - valuePageDataList.add( - ChunkReader.deserializePageData( - valuePageHeader, valueChunkDataBufferList.get(i), valueChunkHeader)); + lazyLoadPageDataArray[i] = + new LazyLoadPageData( + valueChunkDataBufferList.get(i).array(), + currentPagePosition, + IUnCompressor.getUnCompressor(valueChunkHeader.getCompressionType())); valueDataTypeList.add(valueChunkHeader.getDataType()); valueDecoderList.add( Decoder.getDecoderByType( @@ -234,7 +245,7 @@ public class AlignedChunkReader extends AbstractChunkReader { timePageData, defaultTimeDecoder, valuePageHeaderList, - valuePageDataList, + lazyLoadPageDataArray, valueDataTypeList, valueDecoderList, queryFilter); diff --git a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/chunk/ChunkReader.java b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/chunk/ChunkReader.java index 4f724d21..a85683b1 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/chunk/ChunkReader.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/chunk/ChunkReader.java @@ -28,6 +28,7 @@ import org.apache.tsfile.file.metadata.statistics.Statistics; import org.apache.tsfile.read.common.Chunk; import org.apache.tsfile.read.common.TimeRange; import org.apache.tsfile.read.filter.basic.Filter; +import org.apache.tsfile.read.reader.page.LazyLoadPageData; import org.apache.tsfile.read.reader.page.PageReader; import java.io.IOException; @@ -42,7 +43,7 @@ public class ChunkReader extends AbstractChunkReader { private final List<TimeRange> deleteIntervalList; @SuppressWarnings("unchecked") - public ChunkReader(Chunk chunk, long readStopTime, Filter queryFilter) throws IOException { + public ChunkReader(Chunk chunk, long readStopTime, Filter queryFilter) { super(readStopTime, queryFilter); this.chunkHeader = chunk.getHeader(); this.chunkDataBuffer = chunk.getData(); @@ -55,22 +56,19 @@ public class ChunkReader extends AbstractChunkReader { this(chunk, Long.MIN_VALUE, null); } - public ChunkReader(Chunk chunk, Filter queryFilter) throws IOException { + public ChunkReader(Chunk chunk, Filter queryFilter) { this(chunk, Long.MIN_VALUE, queryFilter); } /** * Constructor of ChunkReader by timestamp. This constructor is used to accelerate queries by * filtering out pages whose endTime is less than current timestamp. - * - * @throws IOException exception when initAllPageReaders */ - public ChunkReader(Chunk chunk, long readStopTime) throws IOException { + public ChunkReader(Chunk chunk, long readStopTime) { this(chunk, readStopTime, null); } - private void initAllPageReaders(Statistics<? extends Serializable> chunkStatistic) - throws IOException { + private void initAllPageReaders(Statistics<? extends Serializable> chunkStatistic) { // construct next satisfied page header while (chunkDataBuffer.remaining() > 0) { // deserialize a PageHeader from chunkDataBuffer @@ -126,12 +124,16 @@ public class ChunkReader extends AbstractChunkReader { chunkDataBuffer.position(chunkDataBuffer.position() + pageHeader.getCompressedSize()); } - private PageReader constructPageReader(PageHeader pageHeader) throws IOException { - ByteBuffer pageData = deserializePageData(pageHeader, chunkDataBuffer, chunkHeader); + private PageReader constructPageReader(PageHeader pageHeader) { + IUnCompressor unCompressor = IUnCompressor.getUnCompressor(chunkHeader.getCompressionType()); + // record the current position of chunkDataBuffer, use this to get the page data in PageReader + // through directly accessing the buffer array + int currentPagePosition = chunkDataBuffer.position(); + skipCurrentPage(pageHeader); PageReader reader = new PageReader( pageHeader, - pageData, + new LazyLoadPageData(chunkDataBuffer.array(), currentPagePosition, unCompressor), chunkHeader.getDataType(), Decoder.getDecoderByType(chunkHeader.getEncodingType(), chunkHeader.getDataType()), defaultTimeDecoder, diff --git a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/page/AlignedPageReader.java b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/page/AlignedPageReader.java index b1ff14e3..75f664f5 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/page/AlignedPageReader.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/page/AlignedPageReader.java @@ -92,6 +92,40 @@ public class AlignedPageReader implements IPageReader { this.valueCount = valuePageReaderList.size(); } + @SuppressWarnings("squid:S107") + public AlignedPageReader( + PageHeader timePageHeader, + ByteBuffer timePageData, + Decoder timeDecoder, + List<PageHeader> valuePageHeaderList, + // The reason for using Array here, rather than passing in + // List<LazyLoadPageData> as a parameter, is that after type erasure, it would + // conflict with the existing constructor. + LazyLoadPageData[] lazyLoadPageDataArray, + List<TSDataType> valueDataTypeList, + List<Decoder> valueDecoderList, + Filter globalTimeFilter) { + timePageReader = new TimePageReader(timePageHeader, timePageData, timeDecoder); + isModified = timePageReader.isModified(); + valuePageReaderList = new ArrayList<>(valuePageHeaderList.size()); + for (int i = 0; i < valuePageHeaderList.size(); i++) { + if (valuePageHeaderList.get(i) != null) { + ValuePageReader valuePageReader = + new ValuePageReader( + valuePageHeaderList.get(i), + lazyLoadPageDataArray[i], + valueDataTypeList.get(i), + valueDecoderList.get(i)); + valuePageReaderList.add(valuePageReader); + isModified = isModified || valuePageReader.isModified(); + } else { + valuePageReaderList.add(null); + } + } + this.globalTimeFilter = globalTimeFilter; + this.valueCount = valuePageReaderList.size(); + } + @Override public BatchData getAllSatisfiedPageData(boolean ascending) throws IOException { BatchData pageData = BatchDataFactory.createBatchData(TSDataType.VECTOR, ascending, false); @@ -216,7 +250,7 @@ public class AlignedPageReader implements IPageReader { unFilteredBlock, builder, pushDownFilter, paginationController); } - private void buildResultWithoutAnyFilterAndDelete(long[] timeBatch) { + private void buildResultWithoutAnyFilterAndDelete(long[] timeBatch) throws IOException { if (paginationController.hasCurOffset(timeBatch.length)) { paginationController.consumeOffset(timeBatch.length); } else { @@ -295,8 +329,8 @@ public class AlignedPageReader implements IPageReader { return readEndIndex + 1; } - private void buildValueColumns( - int readEndIndex, boolean[] keepCurrentRow, boolean[][] isDeleted) { + private void buildValueColumns(int readEndIndex, boolean[] keepCurrentRow, boolean[][] isDeleted) + throws IOException { for (int i = 0; i < valueCount; i++) { ValuePageReader pageReader = valuePageReaderList.get(i); if (pageReader != null) { @@ -320,7 +354,8 @@ public class AlignedPageReader implements IPageReader { } } - private void fillIsDeletedAndBitMask(long[] timeBatch, boolean[][] isDeleted, byte[] bitmask) { + private void fillIsDeletedAndBitMask(long[] timeBatch, boolean[][] isDeleted, byte[] bitmask) + throws IOException { for (int columnIndex = 0; columnIndex < valueCount; columnIndex++) { ValuePageReader pageReader = valuePageReaderList.get(columnIndex); if (pageReader != null) { diff --git a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/page/LazyLoadPageData.java b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/page/LazyLoadPageData.java new file mode 100644 index 00000000..d89cc2d8 --- /dev/null +++ b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/page/LazyLoadPageData.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tsfile.read.reader.page; + +import org.apache.tsfile.compress.IUnCompressor; +import org.apache.tsfile.file.header.PageHeader; + +import java.io.IOException; +import java.nio.ByteBuffer; + +public class LazyLoadPageData { + /** Reference to the data of original chunkDataBuffer. * */ + private final byte[] chunkData; + + private final int pageDataOffset; + + private final IUnCompressor unCompressor; + + public LazyLoadPageData(byte[] data, int offset, IUnCompressor unCompressor) { + this.chunkData = data; + this.pageDataOffset = offset; + this.unCompressor = unCompressor; + } + + public ByteBuffer uncompressPageData(PageHeader pageHeader) throws IOException { + int compressedPageBodyLength = pageHeader.getCompressedSize(); + byte[] uncompressedPageData = new byte[pageHeader.getUncompressedSize()]; + try { + unCompressor.uncompress( + chunkData, pageDataOffset, compressedPageBodyLength, uncompressedPageData, 0); + } catch (Exception e) { + throw new IOException( + "Uncompress error! uncompress size: " + + pageHeader.getUncompressedSize() + + "compressed size: " + + pageHeader.getCompressedSize() + + "page header: " + + pageHeader + + e.getMessage()); + } + return ByteBuffer.wrap(uncompressedPageData); + } + + public IUnCompressor getUnCompressor() { + return unCompressor; + } +} diff --git a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/page/PageReader.java b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/page/PageReader.java index bbf0b49c..3bb57a26 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/page/PageReader.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/page/PageReader.java @@ -74,6 +74,10 @@ public class PageReader implements IPageReader { private int deleteCursor = 0; + // used for lazy decoding + + private LazyLoadPageData lazyLoadPageData; + public PageReader( ByteBuffer pageData, TSDataType dataType, Decoder valueDecoder, Decoder timeDecoder) { this(null, pageData, dataType, valueDecoder, timeDecoder, null); @@ -103,6 +107,21 @@ public class PageReader implements IPageReader { splitDataToTimeStampAndValue(pageData); } + public PageReader( + PageHeader pageHeader, + LazyLoadPageData lazyLoadPageData, + TSDataType dataType, + Decoder valueDecoder, + Decoder timeDecoder, + Filter recordFilter) { + this.dataType = dataType; + this.valueDecoder = valueDecoder; + this.timeDecoder = timeDecoder; + this.recordFilter = recordFilter; + this.pageHeader = pageHeader; + this.lazyLoadPageData = lazyLoadPageData; + } + /** * split pageContent into two stream: time and value * @@ -118,10 +137,19 @@ public class PageReader implements IPageReader { valueBuffer.position(timeBufferLength); } + /** Call this method before accessing data. */ + private void uncompressDataIfNecessary() throws IOException { + if (lazyLoadPageData != null && (timeBuffer == null || valueBuffer == null)) { + splitDataToTimeStampAndValue(lazyLoadPageData.uncompressPageData(pageHeader)); + lazyLoadPageData = null; + } + } + /** @return the returned BatchData may be empty, but never be null */ @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning @Override public BatchData getAllSatisfiedPageData(boolean ascending) throws IOException { + uncompressDataIfNecessary(); BatchData pageData = BatchDataFactory.createBatchData(dataType, ascending, false); boolean allSatisfy = recordFilter == null || recordFilter.allSatisfy(this); while (timeDecoder.hasNext(timeBuffer)) { @@ -176,6 +204,7 @@ public class PageReader implements IPageReader { @Override public TsBlock getAllSatisfiedData() throws IOException { + uncompressDataIfNecessary(); TsBlockBuilder builder; int initialExpectedEntries = (int) pageHeader.getStatistics().getCount(); if (paginationController.hasLimit()) { diff --git a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/page/ValuePageReader.java b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/page/ValuePageReader.java index 8bf13791..7f708452 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/page/ValuePageReader.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/page/ValuePageReader.java @@ -33,6 +33,7 @@ import org.apache.tsfile.utils.ReadWriteIOUtils; import org.apache.tsfile.utils.TsPrimitiveType; import org.apache.tsfile.write.UnSupportedDataTypeException; +import java.io.IOException; import java.io.Serializable; import java.nio.ByteBuffer; import java.util.Arrays; @@ -61,6 +62,8 @@ public class ValuePageReader { private int deleteCursor = 0; + private LazyLoadPageData lazyLoadPageData; + public ValuePageReader( PageHeader pageHeader, ByteBuffer pageData, TSDataType dataType, Decoder valueDecoder) { this.dataType = dataType; @@ -72,6 +75,17 @@ public class ValuePageReader { this.valueBuffer = pageData; } + public ValuePageReader( + PageHeader pageHeader, + LazyLoadPageData lazyLoadPageData, + TSDataType dataType, + Decoder valueDecoder) { + this.dataType = dataType; + this.valueDecoder = valueDecoder; + this.pageHeader = pageHeader; + this.lazyLoadPageData = lazyLoadPageData; + } + private void splitDataToBitmapAndValue(ByteBuffer pageData) { if (!pageData.hasRemaining()) { // Empty Page return; @@ -82,11 +96,23 @@ public class ValuePageReader { this.valueBuffer = pageData.slice(); } + /** Call this method before accessing data. */ + private void uncompressDataIfNecessary() throws IOException { + if (lazyLoadPageData != null && valueBuffer == null) { + ByteBuffer pageData = lazyLoadPageData.uncompressPageData(pageHeader); + splitDataToBitmapAndValue(pageData); + this.valueBuffer = pageData; + lazyLoadPageData = null; + } + } + /** * return a BatchData with the corresponding timeBatch, the BatchData's dataType is same as this * sub sensor */ - public BatchData nextBatch(long[] timeBatch, boolean ascending, Filter filter) { + public BatchData nextBatch(long[] timeBatch, boolean ascending, Filter filter) + throws IOException { + uncompressDataIfNecessary(); BatchData pageData = BatchDataFactory.createBatchData(dataType, ascending, false); for (int i = 0; i < timeBatch.length; i++) { if (((bitmap[i / 8] & 0xFF) & (MASK >>> (i % 8))) == 0) { @@ -141,7 +167,8 @@ public class ValuePageReader { return pageData.flip(); } - public TsPrimitiveType nextValue(long timestamp, int timeIndex) { + public TsPrimitiveType nextValue(long timestamp, int timeIndex) throws IOException { + uncompressDataIfNecessary(); TsPrimitiveType resultValue = null; if (valueBuffer == null || ((bitmap[timeIndex / 8] & 0xFF) & (MASK >>> (timeIndex % 8))) == 0) { return null; @@ -198,7 +225,8 @@ public class ValuePageReader { * return the value array of the corresponding time, if this sub sensor don't have a value in a * time, just fill it with null */ - public TsPrimitiveType[] nextValueBatch(long[] timeBatch) { + public TsPrimitiveType[] nextValueBatch(long[] timeBatch) throws IOException { + uncompressDataIfNecessary(); TsPrimitiveType[] valueBatch = new TsPrimitiveType[size]; if (valueBuffer == null) { return valueBatch; @@ -256,10 +284,9 @@ public class ValuePageReader { } public void writeColumnBuilderWithNextBatch( - int readEndIndex, - ColumnBuilder columnBuilder, - boolean[] keepCurrentRow, - boolean[] isDeleted) { + int readEndIndex, ColumnBuilder columnBuilder, boolean[] keepCurrentRow, boolean[] isDeleted) + throws IOException { + uncompressDataIfNecessary(); if (valueBuffer == null) { for (int i = 0; i < readEndIndex; i++) { if (keepCurrentRow[i]) { @@ -347,7 +374,8 @@ public class ValuePageReader { } public void writeColumnBuilderWithNextBatch( - int readEndIndex, ColumnBuilder columnBuilder, boolean[] keepCurrentRow) { + int readEndIndex, ColumnBuilder columnBuilder, boolean[] keepCurrentRow) throws IOException { + uncompressDataIfNecessary(); if (valueBuffer == null) { for (int i = 0; i < readEndIndex; i++) { if (keepCurrentRow[i]) { @@ -411,7 +439,8 @@ public class ValuePageReader { } public void writeColumnBuilderWithNextBatch( - int readStartIndex, int readEndIndex, ColumnBuilder columnBuilder) { + int readStartIndex, int readEndIndex, ColumnBuilder columnBuilder) throws IOException { + uncompressDataIfNecessary(); if (valueBuffer == null) { columnBuilder.appendNull(readEndIndex - readStartIndex); return; @@ -574,7 +603,8 @@ public class ValuePageReader { return dataType; } - public byte[] getBitmap() { + public byte[] getBitmap() throws IOException { + uncompressDataIfNecessary(); return Arrays.copyOf(bitmap, bitmap.length); } } diff --git a/java/tsfile/src/test/java/org/apache/tsfile/read/reader/AlignedPageReaderPushDownTest.java b/java/tsfile/src/test/java/org/apache/tsfile/read/reader/AlignedPageReaderPushDownTest.java index b28a03d6..47dc4024 100644 --- a/java/tsfile/src/test/java/org/apache/tsfile/read/reader/AlignedPageReaderPushDownTest.java +++ b/java/tsfile/src/test/java/org/apache/tsfile/read/reader/AlignedPageReaderPushDownTest.java @@ -20,6 +20,7 @@ package org.apache.tsfile.read.reader; import org.apache.tsfile.compress.ICompressor; +import org.apache.tsfile.compress.IUnCompressor; import org.apache.tsfile.encoding.decoder.Decoder; import org.apache.tsfile.encoding.decoder.DeltaBinaryDecoder; import org.apache.tsfile.encoding.decoder.IntRleDecoder; @@ -34,6 +35,7 @@ import org.apache.tsfile.read.filter.basic.Filter; import org.apache.tsfile.read.filter.factory.TimeFilterApi; import org.apache.tsfile.read.filter.factory.ValueFilterApi; import org.apache.tsfile.read.reader.page.AlignedPageReader; +import org.apache.tsfile.read.reader.page.LazyLoadPageData; import org.apache.tsfile.read.reader.series.PaginationController; import org.apache.tsfile.write.page.TimePageWriter; import org.apache.tsfile.write.page.ValuePageWriter; @@ -142,10 +144,53 @@ public class AlignedPageReaderPushDownTest { return alignedPageReader; } + private AlignedPageReader generateAlignedPageReaderUsingLazyLoad( + Filter globalTimeFilter, List<Boolean> modified) throws IOException { + resetDataBuffer(); + testValuePageHeader1.setModified(modified.get(0)); + testValuePageHeader2.setModified(modified.get(1)); + testValuePageHeader1.setCompressedSize(testValuePageData1.array().length); + testValuePageHeader1.setUncompressedSize(testValuePageData1.array().length); + testValuePageHeader2.setCompressedSize(testValuePageData2.array().length); + testValuePageHeader2.setUncompressedSize(testValuePageData2.array().length); + List<PageHeader> valuePageHeaderList = + Arrays.asList(testValuePageHeader1, testValuePageHeader2); + LazyLoadPageData[] lazyLoadPageDataArray = new LazyLoadPageData[2]; + lazyLoadPageDataArray[0] = + new LazyLoadPageData( + testValuePageData1.array(), + 0, + IUnCompressor.getUnCompressor(CompressionType.UNCOMPRESSED)); + lazyLoadPageDataArray[1] = + new LazyLoadPageData( + testValuePageData2.array(), + 0, + IUnCompressor.getUnCompressor(CompressionType.UNCOMPRESSED)); + List<TSDataType> valueDataTypeList = Arrays.asList(TSDataType.INT32, TSDataType.INT32); + List<Decoder> valueDecoderList = Arrays.asList(new IntRleDecoder(), new IntRleDecoder()); + AlignedPageReader alignedPageReader = + new AlignedPageReader( + testTimePageHeader, + testTimePageData, + new DeltaBinaryDecoder.LongDeltaDecoder(), + valuePageHeaderList, + lazyLoadPageDataArray, + valueDataTypeList, + valueDecoderList, + globalTimeFilter); + alignedPageReader.initTsBlockBuilder(valueDataTypeList); + return alignedPageReader; + } + private AlignedPageReader generateAlignedPageReader(Filter globalTimeFilter) throws IOException { return generateAlignedPageReader(globalTimeFilter, Arrays.asList(false, false)); } + private AlignedPageReader generateAlignedPageReaderUsingLazyLoad(Filter globalTimeFilter) + throws IOException { + return generateAlignedPageReaderUsingLazyLoad(globalTimeFilter, Arrays.asList(false, false)); + } + private AlignedPageReader generateSingleColumnAlignedPageReader( Filter globalTimeFilter, boolean modified) { resetDataBuffer(); @@ -168,10 +213,44 @@ public class AlignedPageReaderPushDownTest { return alignedPageReader; } + private AlignedPageReader generateSingleColumnAlignedPageReaderUsingLazyLoad( + Filter globalTimeFilter, boolean modified) { + resetDataBuffer(); + testValuePageHeader2.setModified(modified); + testValuePageHeader2.setCompressedSize(testValuePageData2.array().length); + testValuePageHeader2.setUncompressedSize(testValuePageData2.array().length); + List<PageHeader> valuePageHeaderList = Collections.singletonList(testValuePageHeader2); + LazyLoadPageData[] lazyLoadPageDataArray = new LazyLoadPageData[1]; + lazyLoadPageDataArray[0] = + new LazyLoadPageData( + testValuePageData2.array(), + 0, + IUnCompressor.getUnCompressor(CompressionType.UNCOMPRESSED)); + List<TSDataType> valueDataTypeList = Collections.singletonList(TSDataType.INT32); + List<Decoder> valueDecoderList = Collections.singletonList(new IntRleDecoder()); + AlignedPageReader alignedPageReader = + new AlignedPageReader( + testTimePageHeader, + testTimePageData, + new DeltaBinaryDecoder.LongDeltaDecoder(), + valuePageHeaderList, + lazyLoadPageDataArray, + valueDataTypeList, + valueDecoderList, + globalTimeFilter); + alignedPageReader.initTsBlockBuilder(valueDataTypeList); + return alignedPageReader; + } + private AlignedPageReader generateSingleColumnAlignedPageReader(Filter globalTimeFilter) { return generateSingleColumnAlignedPageReader(globalTimeFilter, false); } + private AlignedPageReader generateSingleColumnAlignedPageReaderUsingLazyLoad( + Filter globalTimeFilter) { + return generateSingleColumnAlignedPageReaderUsingLazyLoad(globalTimeFilter, false); + } + @Test public void testNullFilter() throws IOException { AlignedPageReader alignedPageReader1 = generateAlignedPageReader(null); @@ -181,6 +260,14 @@ public class AlignedPageReaderPushDownTest { AlignedPageReader alignedPageReader2 = generateSingleColumnAlignedPageReader(null); TsBlock tsBlock2 = alignedPageReader2.getAllSatisfiedData(); Assert.assertEquals(80, tsBlock2.getPositionCount()); + + AlignedPageReader alignedPageReader3 = generateAlignedPageReaderUsingLazyLoad(null); + TsBlock tsBlock3 = alignedPageReader3.getAllSatisfiedData(); + Assert.assertEquals(100, tsBlock3.getPositionCount()); + + AlignedPageReader alignedPageReader4 = generateSingleColumnAlignedPageReaderUsingLazyLoad(null); + TsBlock tsBlock4 = alignedPageReader4.getAllSatisfiedData(); + Assert.assertEquals(80, tsBlock4.getPositionCount()); } @Test @@ -199,6 +286,22 @@ public class AlignedPageReaderPushDownTest { Collections.singletonList(Collections.singletonList(new TimeRange(30, 39)))); TsBlock tsBlock2 = alignedPageReader2.getAllSatisfiedData(); Assert.assertEquals(70, tsBlock2.getPositionCount()); + + AlignedPageReader alignedPageReader3 = + generateAlignedPageReaderUsingLazyLoad(null, Arrays.asList(true, true)); + alignedPageReader3.setDeleteIntervalList( + Arrays.asList( + Arrays.asList(new TimeRange(0, 9), new TimeRange(20, 29)), + Collections.singletonList(new TimeRange(30, 39)))); + TsBlock tsBlock3 = alignedPageReader3.getAllSatisfiedData(); + Assert.assertEquals(90, tsBlock3.getPositionCount()); + + AlignedPageReader alignedPageReader4 = + generateSingleColumnAlignedPageReaderUsingLazyLoad(null, true); + alignedPageReader4.setDeleteIntervalList( + Collections.singletonList(Collections.singletonList(new TimeRange(30, 39)))); + TsBlock tsBlock4 = alignedPageReader4.getAllSatisfiedData(); + Assert.assertEquals(70, tsBlock4.getPositionCount()); } @Test @@ -216,6 +319,20 @@ public class AlignedPageReaderPushDownTest { Assert.assertEquals(10, tsBlock2.getPositionCount()); Assert.assertEquals(20, tsBlock2.getTimeByIndex(0)); Assert.assertEquals(29, tsBlock2.getTimeByIndex(9)); + + AlignedPageReader alignedPageReader3 = generateAlignedPageReaderUsingLazyLoad(null); + alignedPageReader3.setLimitOffset(new PaginationController(10, 10)); + TsBlock tsBlock3 = alignedPageReader3.getAllSatisfiedData(); + Assert.assertEquals(10, tsBlock3.getPositionCount()); + Assert.assertEquals(10, tsBlock3.getTimeByIndex(0)); + Assert.assertEquals(19, tsBlock3.getTimeByIndex(9)); + + AlignedPageReader alignedPageReader4 = generateSingleColumnAlignedPageReaderUsingLazyLoad(null); + alignedPageReader4.setLimitOffset(new PaginationController(10, 10)); + TsBlock tsBlock4 = alignedPageReader4.getAllSatisfiedData(); + Assert.assertEquals(10, tsBlock4.getPositionCount()); + Assert.assertEquals(20, tsBlock4.getTimeByIndex(0)); + Assert.assertEquals(29, tsBlock4.getTimeByIndex(9)); } @Test @@ -230,6 +347,17 @@ public class AlignedPageReaderPushDownTest { alignedPageReader2.addRecordFilter(ValueFilterApi.gtEq(50)); TsBlock tsBlock2 = alignedPageReader2.getAllSatisfiedData(); Assert.assertEquals(40, tsBlock2.getPositionCount()); + + AlignedPageReader alignedPageReader3 = generateAlignedPageReaderUsingLazyLoad(globalTimeFilter); + alignedPageReader3.addRecordFilter(ValueFilterApi.gtEq(50)); + TsBlock tsBlock3 = alignedPageReader3.getAllSatisfiedData(); + Assert.assertEquals(50, tsBlock3.getPositionCount()); + + AlignedPageReader alignedPageReader4 = + generateSingleColumnAlignedPageReaderUsingLazyLoad(globalTimeFilter); + alignedPageReader4.addRecordFilter(ValueFilterApi.gtEq(50)); + TsBlock tsBlock4 = alignedPageReader4.getAllSatisfiedData(); + Assert.assertEquals(40, tsBlock4.getPositionCount()); } @Test @@ -250,6 +378,23 @@ public class AlignedPageReaderPushDownTest { Assert.assertEquals(10, tsBlock2.getPositionCount()); Assert.assertEquals(60, tsBlock2.getTimeByIndex(0)); Assert.assertEquals(69, tsBlock2.getTimeByIndex(9)); + + AlignedPageReader alignedPageReader3 = generateAlignedPageReaderUsingLazyLoad(globalTimeFilter); + alignedPageReader3.addRecordFilter(ValueFilterApi.gtEq(50)); + alignedPageReader3.setLimitOffset(new PaginationController(10, 10)); + TsBlock tsBlock3 = alignedPageReader3.getAllSatisfiedData(); + Assert.assertEquals(10, tsBlock3.getPositionCount()); + Assert.assertEquals(60, tsBlock3.getTimeByIndex(0)); + Assert.assertEquals(69, tsBlock3.getTimeByIndex(9)); + + AlignedPageReader alignedPageReader4 = + generateSingleColumnAlignedPageReaderUsingLazyLoad(globalTimeFilter); + alignedPageReader4.addRecordFilter(ValueFilterApi.gtEq(50)); + alignedPageReader4.setLimitOffset(new PaginationController(10, 10)); + TsBlock tsBlock4 = alignedPageReader4.getAllSatisfiedData(); + Assert.assertEquals(10, tsBlock4.getPositionCount()); + Assert.assertEquals(60, tsBlock4.getTimeByIndex(0)); + Assert.assertEquals(69, tsBlock4.getTimeByIndex(9)); } @Test @@ -264,6 +409,17 @@ public class AlignedPageReaderPushDownTest { alignedPageReader2.addRecordFilter(ValueFilterApi.gtEq(0)); TsBlock tsBlock2 = alignedPageReader2.getAllSatisfiedData(); Assert.assertEquals(40, tsBlock2.getPositionCount()); + + AlignedPageReader alignedPageReader3 = generateAlignedPageReaderUsingLazyLoad(globalTimeFilter); + alignedPageReader3.addRecordFilter(ValueFilterApi.gtEq(0)); + TsBlock tsBlock3 = alignedPageReader3.getAllSatisfiedData(); + Assert.assertEquals(50, tsBlock3.getPositionCount()); + + AlignedPageReader alignedPageReader4 = + generateSingleColumnAlignedPageReaderUsingLazyLoad(globalTimeFilter); + alignedPageReader4.addRecordFilter(ValueFilterApi.gtEq(0)); + TsBlock tsBlock4 = alignedPageReader4.getAllSatisfiedData(); + Assert.assertEquals(40, tsBlock4.getPositionCount()); } @Test @@ -284,6 +440,23 @@ public class AlignedPageReaderPushDownTest { Assert.assertEquals(10, tsBlock2.getPositionCount()); Assert.assertEquals(60, tsBlock2.getTimeByIndex(0)); Assert.assertEquals(69, tsBlock2.getTimeByIndex(9)); + + AlignedPageReader alignedPageReader3 = generateAlignedPageReaderUsingLazyLoad(globalTimeFilter); + alignedPageReader3.addRecordFilter(ValueFilterApi.gtEq(0)); + alignedPageReader3.setLimitOffset(new PaginationController(10, 10)); + TsBlock tsBlock3 = alignedPageReader3.getAllSatisfiedData(); + Assert.assertEquals(10, tsBlock3.getPositionCount()); + Assert.assertEquals(60, tsBlock3.getTimeByIndex(0)); + Assert.assertEquals(69, tsBlock3.getTimeByIndex(9)); + + AlignedPageReader alignedPageReader4 = + generateSingleColumnAlignedPageReaderUsingLazyLoad(globalTimeFilter); + alignedPageReader4.addRecordFilter(ValueFilterApi.gtEq(0)); + alignedPageReader4.setLimitOffset(new PaginationController(10, 10)); + TsBlock tsBlock4 = alignedPageReader4.getAllSatisfiedData(); + Assert.assertEquals(10, tsBlock4.getPositionCount()); + Assert.assertEquals(60, tsBlock4.getTimeByIndex(0)); + Assert.assertEquals(69, tsBlock4.getTimeByIndex(9)); } @Test @@ -298,6 +471,17 @@ public class AlignedPageReaderPushDownTest { alignedPageReader2.addRecordFilter(ValueFilterApi.lt(80)); TsBlock tsBlock2 = alignedPageReader2.getAllSatisfiedData(); Assert.assertEquals(50, tsBlock2.getPositionCount()); + + AlignedPageReader alignedPageReader3 = generateAlignedPageReaderUsingLazyLoad(globalTimeFilter); + alignedPageReader3.addRecordFilter(ValueFilterApi.lt(80)); + TsBlock tsBlock3 = alignedPageReader3.getAllSatisfiedData(); + Assert.assertEquals(50, tsBlock3.getPositionCount()); + + AlignedPageReader alignedPageReader4 = + generateSingleColumnAlignedPageReaderUsingLazyLoad(globalTimeFilter); + alignedPageReader4.addRecordFilter(ValueFilterApi.lt(80)); + TsBlock tsBlock4 = alignedPageReader4.getAllSatisfiedData(); + Assert.assertEquals(50, tsBlock4.getPositionCount()); } @Test @@ -318,5 +502,22 @@ public class AlignedPageReaderPushDownTest { Assert.assertEquals(10, tsBlock2.getPositionCount()); Assert.assertEquals(60, tsBlock2.getTimeByIndex(0)); Assert.assertEquals(69, tsBlock2.getTimeByIndex(9)); + + AlignedPageReader alignedPageReader3 = generateAlignedPageReaderUsingLazyLoad(globalTimeFilter); + alignedPageReader3.addRecordFilter(ValueFilterApi.lt(80)); + alignedPageReader3.setLimitOffset(new PaginationController(10, 10)); + TsBlock tsBlock3 = alignedPageReader3.getAllSatisfiedData(); + Assert.assertEquals(10, tsBlock3.getPositionCount()); + Assert.assertEquals(60, tsBlock3.getTimeByIndex(0)); + Assert.assertEquals(69, tsBlock3.getTimeByIndex(9)); + + AlignedPageReader alignedPageReader4 = + generateSingleColumnAlignedPageReaderUsingLazyLoad(globalTimeFilter); + alignedPageReader4.addRecordFilter(ValueFilterApi.lt(80)); + alignedPageReader4.setLimitOffset(new PaginationController(10, 10)); + TsBlock tsBlock4 = alignedPageReader4.getAllSatisfiedData(); + Assert.assertEquals(10, tsBlock4.getPositionCount()); + Assert.assertEquals(60, tsBlock4.getTimeByIndex(0)); + Assert.assertEquals(69, tsBlock4.getTimeByIndex(9)); } }
