This is an automated email from the ASF dual-hosted git repository.
junhao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new d33b8711fc [core] Optimization of Parquet Predicate Pushdown
Capability (#4608)
d33b8711fc is described below
commit d33b8711fc6b4e1f35ba7d85336be4ff3baa956d
Author: aiden.dong <[email protected]>
AuthorDate: Tue Dec 3 13:33:44 2024 +0800
[core] Optimization of Parquet Predicate Pushdown Capability (#4608)
---
.../paimon/table/PrimaryKeyFileStoreTableTest.java | 63 +++++++
.../format/parquet/ParquetReaderFactory.java | 66 ++++++-
.../parquet/reader/AbstractColumnReader.java | 204 +++++++++++++++------
.../format/parquet/reader/BooleanColumnReader.java | 36 +++-
.../format/parquet/reader/ByteColumnReader.java | 39 +++-
.../format/parquet/reader/BytesColumnReader.java | 41 ++++-
.../format/parquet/reader/DoubleColumnReader.java | 38 +++-
.../parquet/reader/FixedLenBytesColumnReader.java | 36 +++-
.../format/parquet/reader/FloatColumnReader.java | 38 +++-
.../format/parquet/reader/IntColumnReader.java | 39 +++-
.../format/parquet/reader/LongColumnReader.java | 39 +++-
.../format/parquet/reader/NestedColumnReader.java | 2 +-
.../reader/NestedPrimitiveColumnReader.java | 141 +++++++++-----
.../format/parquet/reader/ParquetReadState.java | 148 +++++++++++++++
.../parquet/reader/ParquetSplitReaderUtil.java | 41 ++---
.../format/parquet/reader/RunLengthDecoder.java | 45 +++++
.../format/parquet/reader/ShortColumnReader.java | 38 +++-
.../parquet/reader/TimestampColumnReader.java | 15 +-
18 files changed, 898 insertions(+), 171 deletions(-)
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
index 46b85223bc..e80b49a0f0 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
@@ -84,6 +84,7 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
@@ -809,6 +810,68 @@ public class PrimaryKeyFileStoreTableTest extends
FileStoreTableTestBase {
"1|4|500|binary|varbinary|mapKey:mapVal|multiset"));
}
+ @Test
+ public void testDeletionVectorsWithParquetFilter() throws Exception {
+ FileStoreTable table =
+ createFileStoreTable(
+ conf -> {
+ conf.set(BUCKET, 1);
+ conf.set(DELETION_VECTORS_ENABLED, true);
+ conf.set(FILE_FORMAT, "parquet");
+ conf.set("parquet.block.size", "1048576");
+ conf.set("parquet.page.size", "1024");
+ });
+
+ BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
+
+ BatchTableWrite write =
+ (BatchTableWrite)
+ writeBuilder
+ .newWrite()
+ .withIOManager(new
IOManagerImpl(tempDir.toString()));
+
+ for (int i = 0; i < 200000; i++) {
+ write.write(rowData(1, i, i * 100L));
+ }
+
+ List<CommitMessage> messages = write.prepareCommit();
+ BatchTableCommit commit = writeBuilder.newCommit();
+ commit.commit(messages);
+ write =
+ (BatchTableWrite)
+ writeBuilder
+ .newWrite()
+ .withIOManager(new
IOManagerImpl(tempDir.toString()));
+ for (int i = 180000; i < 200000; i++) {
+ write.write(rowDataWithKind(RowKind.DELETE, 1, i, i * 100L));
+ }
+
+ messages = write.prepareCommit();
+ commit = writeBuilder.newCommit();
+ commit.commit(messages);
+
+ PredicateBuilder builder = new PredicateBuilder(ROW_TYPE);
+ List<Split> splits =
toSplits(table.newSnapshotReader().read().dataSplits());
+ Random random = new Random();
+
+ for (int i = 0; i < 10; i++) {
+ int value = random.nextInt(180000);
+ TableRead read = table.newRead().withFilter(builder.equal(1,
value)).executeFilter();
+ assertThat(getResult(read, splits, BATCH_ROW_TO_STRING))
+ .isEqualTo(
+ Arrays.asList(
+ String.format(
+
"%d|%d|%d|binary|varbinary|mapKey:mapVal|multiset",
+ 1, value, value * 100L)));
+ }
+
+ for (int i = 0; i < 10; i++) {
+ int value = 180000 + random.nextInt(20000);
+ TableRead read = table.newRead().withFilter(builder.equal(1,
value)).executeFilter();
+ assertThat(getResult(read, splits, BATCH_ROW_TO_STRING)).isEmpty();
+ }
+ }
+
@Test
public void testDeletionVectorsWithFileIndexInMeta() throws Exception {
FileStoreTable table =
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java
index f0151d6f3d..0c99653120 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java
@@ -28,6 +28,7 @@ import
org.apache.paimon.data.columnar.writable.WritableColumnVector;
import org.apache.paimon.format.FormatReaderFactory;
import org.apache.paimon.format.parquet.reader.ColumnReader;
import org.apache.paimon.format.parquet.reader.ParquetDecimalVector;
+import org.apache.paimon.format.parquet.reader.ParquetReadState;
import org.apache.paimon.format.parquet.reader.ParquetTimestampVector;
import org.apache.paimon.format.parquet.type.ParquetField;
import org.apache.paimon.fs.Path;
@@ -130,7 +131,7 @@ public class ParquetReaderFactory implements
FormatReaderFactory {
buildFieldsList(projectedType.getFields(),
projectedType.getFieldNames(), columnIO);
return new ParquetReader(
- reader, requestedSchema, reader.getRecordCount(),
poolOfBatches, fields);
+ reader, requestedSchema, reader.getFilteredRecordCount(),
poolOfBatches, fields);
}
private void setReadOptions(ParquetReadOptions.Builder builder) {
@@ -336,6 +337,10 @@ public class ParquetReaderFactory implements
FormatReaderFactory {
private long nextRowPosition;
+ private ParquetReadState currentRowGroupReadState;
+
+ private long currentRowGroupFirstRowIndex;
+
/**
* For each request column, the reader to read this column. This is
NULL if this column is
* missing from the file, in which case we populate the attribute with
NULL.
@@ -359,6 +364,7 @@ public class ParquetReaderFactory implements
FormatReaderFactory {
this.totalCountLoadedSoFar = 0;
this.currentRowPosition = 0;
this.nextRowPosition = 0;
+ this.currentRowGroupFirstRowIndex = 0;
this.fields = fields;
}
@@ -390,7 +396,8 @@ public class ParquetReaderFactory implements
FormatReaderFactory {
currentRowPosition = nextRowPosition;
}
- int num = (int) Math.min(batchSize, totalCountLoadedSoFar -
rowsReturned);
+ int num = getBachSize();
+
for (int i = 0; i < columnReaders.length; ++i) {
if (columnReaders[i] == null) {
batch.writableVectors[i].fillWithNulls();
@@ -400,13 +407,13 @@ public class ParquetReaderFactory implements
FormatReaderFactory {
}
}
rowsReturned += num;
- nextRowPosition = currentRowPosition + num;
+ nextRowPosition = getNextRowPosition(num);
batch.columnarBatch.setNumRows(num);
return true;
}
private void readNextRowGroup() throws IOException {
- PageReadStore rowGroup = reader.readNextRowGroup();
+ PageReadStore rowGroup = reader.readNextFilteredRowGroup();
if (rowGroup == null) {
throw new IOException(
"expecting more rows but reached last block. Read "
@@ -415,6 +422,9 @@ public class ParquetReaderFactory implements
FormatReaderFactory {
+ totalRowCount);
}
+ this.currentRowGroupReadState =
+ new
ParquetReadState(rowGroup.getRowIndexes().orElse(null));
+
List<Type> types = requestedSchema.getFields();
columnReaders = new ColumnReader[types.size()];
for (int i = 0; i < types.size(); ++i) {
@@ -429,18 +439,62 @@ public class ParquetReaderFactory implements
FormatReaderFactory {
0);
}
}
+
totalCountLoadedSoFar += rowGroup.getRowCount();
- if (rowGroup.getRowIndexOffset().isPresent()) {
- currentRowPosition = rowGroup.getRowIndexOffset().get();
+
+ if (rowGroup.getRowIndexOffset().isPresent()) { // filter
+ currentRowGroupFirstRowIndex =
rowGroup.getRowIndexOffset().get();
+ long pageIndex = 0;
+ if (!this.currentRowGroupReadState.isMaxRange()) {
+ pageIndex =
this.currentRowGroupReadState.currentRangeStart();
+ }
+ currentRowPosition = currentRowGroupFirstRowIndex + pageIndex;
} else {
if (reader.rowGroupsFiltered()) {
throw new RuntimeException(
"There is a bug, rowIndexOffset must be present
when row groups are filtered.");
}
+ currentRowGroupFirstRowIndex = nextRowPosition;
currentRowPosition = nextRowPosition;
}
}
+ private int getBachSize() throws IOException {
+
+ long rangeBatchSize = Long.MAX_VALUE;
+ if (this.currentRowGroupReadState.isFinished()) {
+ throw new IOException(
+ "expecting more rows but reached last page block. Read
"
+ + rowsReturned
+ + " out of "
+ + totalRowCount);
+ } else if (!this.currentRowGroupReadState.isMaxRange()) {
+ long pageIndex = this.currentRowPosition -
this.currentRowGroupFirstRowIndex;
+ rangeBatchSize =
this.currentRowGroupReadState.currentRangeEnd() - pageIndex + 1;
+ }
+
+ return (int)
+ Math.min(
+ batchSize,
+ Math.min(rangeBatchSize, totalCountLoadedSoFar -
rowsReturned));
+ }
+
+ private long getNextRowPosition(int num) {
+ if (this.currentRowGroupReadState.isMaxRange()) {
+ return this.currentRowPosition + num;
+ } else {
+ long pageIndex = this.currentRowPosition -
this.currentRowGroupFirstRowIndex;
+ long nextIndex = pageIndex + num;
+
+ if (this.currentRowGroupReadState.currentRangeEnd() <
nextIndex) {
+ this.currentRowGroupReadState.nextRange();
+ nextIndex =
this.currentRowGroupReadState.currentRangeStart();
+ }
+
+ return nextIndex;
+ }
+ }
+
private ParquetReaderBatch getCachedEntry() throws IOException {
try {
return pool.pollEntry();
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/AbstractColumnReader.java
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/AbstractColumnReader.java
index 7e2ab6d5e7..5e3f4a7e6a 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/AbstractColumnReader.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/AbstractColumnReader.java
@@ -32,6 +32,7 @@ import org.apache.parquet.column.page.DataPage;
import org.apache.parquet.column.page.DataPageV1;
import org.apache.parquet.column.page.DataPageV2;
import org.apache.parquet.column.page.DictionaryPage;
+import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.column.page.PageReader;
import org.apache.parquet.column.values.ValuesReader;
import org.apache.parquet.io.ParquetDecodingException;
@@ -65,20 +66,16 @@ public abstract class AbstractColumnReader<VECTOR extends
WritableColumnVector>
protected final ColumnDescriptor descriptor;
- /** Total number of values read. */
- private long valuesRead;
-
- /**
- * value that indicates the end of the current page. That is, if
valuesRead ==
- * endOfPageValueCount, we are at the end of the page.
- */
- private long endOfPageValueCount;
-
/** If true, the current page is dictionary encoded. */
private boolean isCurrentPageDictionaryEncoded;
/** Total values in the current page. */
- private int pageValueCount;
+ // private int pageValueCount;
+
+ /**
+ * Helper struct to track intermediate states while reading Parquet pages
in the column chunk.
+ */
+ private final ParquetReadState readState;
/*
* Input streams:
@@ -101,12 +98,14 @@ public abstract class AbstractColumnReader<VECTOR extends
WritableColumnVector>
/** Dictionary decoder to wrap dictionary ids input stream. */
private RunLengthDecoder dictionaryIdsDecoder;
- public AbstractColumnReader(ColumnDescriptor descriptor, PageReader
pageReader)
+ public AbstractColumnReader(ColumnDescriptor descriptor, PageReadStore
pageReadStore)
throws IOException {
this.descriptor = descriptor;
- this.pageReader = pageReader;
+ this.pageReader = pageReadStore.getPageReader(descriptor);
this.maxDefLevel = descriptor.getMaxDefinitionLevel();
+ this.readState = new
ParquetReadState(pageReadStore.getRowIndexes().orElse(null));
+
DictionaryPage dictionaryPage = pageReader.readDictionaryPage();
if (dictionaryPage != null) {
try {
@@ -147,56 +146,136 @@ public abstract class AbstractColumnReader<VECTOR
extends WritableColumnVector>
if (dictionary != null) {
dictionaryIds = vector.reserveDictionaryIds(readNumber);
}
- while (readNumber > 0) {
+
+ readState.resetForNewBatch(readNumber);
+
+ while (readState.rowsToReadInBatch > 0) {
// Compute the number of values we want to read in this page.
- int leftInPage = (int) (endOfPageValueCount - valuesRead);
- if (leftInPage == 0) {
- DataPage page = pageReader.readPage();
- if (page instanceof DataPageV1) {
- readPageV1((DataPageV1) page);
- } else if (page instanceof DataPageV2) {
- readPageV2((DataPageV2) page);
- } else {
- throw new RuntimeException("Unsupported page type: " +
page.getClass());
+ if (readState.valuesToReadInPage == 0) {
+ int pageValueCount = readPage();
+ if (pageValueCount < 0) {
+ // we've read all the pages; this could happen when we're
reading a repeated
+ // list and we
+ // don't know where the list will end until we've seen all
the pages.
+ break;
}
- leftInPage = (int) (endOfPageValueCount - valuesRead);
}
- int num = Math.min(readNumber, leftInPage);
- if (isCurrentPageDictionaryEncoded) {
- // Read and decode dictionary ids.
- runLenDecoder.readDictionaryIds(
- num, dictionaryIds, vector, rowId, maxDefLevel,
this.dictionaryIdsDecoder);
-
- if (vector.hasDictionary() || (rowId == 0 &&
supportLazyDecode())) {
- // Column vector supports lazy decoding of dictionary
values so just set the
- // dictionary.
- // We can't do this if rowId != 0 AND the column doesn't
have a dictionary (i.e.
- // some
- // non-dictionary encoded values have already been added).
- vector.setDictionary(new ParquetDictionary(dictionary));
+
+ if (readState.isFinished()) {
+ break;
+ }
+
+ long pageRowId = readState.rowId;
+ int leftInBatch = readState.rowsToReadInBatch;
+ int leftInPage = readState.valuesToReadInPage;
+
+ int readBatch = Math.min(leftInBatch, leftInPage);
+
+ long rangeStart = readState.currentRangeStart();
+ long rangeEnd = readState.currentRangeEnd();
+
+ if (pageRowId < rangeStart) {
+ int toSkip = (int) (rangeStart - pageRowId);
+ if (toSkip >= leftInPage) { // drop page
+ pageRowId += leftInPage;
+ leftInPage = 0;
} else {
- readBatchFromDictionaryIds(rowId, num, vector,
dictionaryIds);
+ if (isCurrentPageDictionaryEncoded) {
+ runLenDecoder.skipDictionaryIds(
+ toSkip, maxDefLevel,
this.dictionaryIdsDecoder);
+ pageRowId += toSkip;
+ leftInPage -= toSkip;
+ } else {
+ skipBatch(toSkip);
+ pageRowId += toSkip;
+ leftInPage -= toSkip;
+ }
}
+ } else if (pageRowId > rangeEnd) {
+ readState.nextRange();
} else {
- if (vector.hasDictionary() && rowId != 0) {
- // This batch already has dictionary encoded values but
this new page is not.
- // The batch
- // does not support a mix of dictionary and not so we will
decode the
- // dictionary.
- readBatchFromDictionaryIds(0, rowId, vector,
vector.getDictionaryIds());
+ long start = pageRowId;
+ long end = Math.min(rangeEnd, pageRowId + readBatch - 1);
+ int num = (int) (end - start + 1);
+
+ if (isCurrentPageDictionaryEncoded) {
+ // Read and decode dictionary ids.
+ runLenDecoder.readDictionaryIds(
+ num,
+ dictionaryIds,
+ vector,
+ rowId,
+ maxDefLevel,
+ this.dictionaryIdsDecoder);
+
+ if (vector.hasDictionary() || (rowId == 0 &&
supportLazyDecode())) {
+ // Column vector supports lazy decoding of dictionary
values so just set the
+ // dictionary.
+ // We can't do this if rowId != 0 AND the column
doesn't have a dictionary
+ // (i.e.
+ // some
+ // non-dictionary encoded values have already been
added).
+ vector.setDictionary(new
ParquetDictionary(dictionary));
+ } else {
+ readBatchFromDictionaryIds(rowId, num, vector,
dictionaryIds);
+ }
+ } else {
+ if (vector.hasDictionary() && rowId != 0) {
+ // This batch already has dictionary encoded values
but this new page is
+ // not.
+ // The batch
+ // does not support a mix of dictionary and not so we
will decode the
+ // dictionary.
+ readBatchFromDictionaryIds(0, rowId, vector,
vector.getDictionaryIds());
+ }
+ vector.setDictionary(null);
+ readBatch(rowId, num, vector);
}
- vector.setDictionary(null);
- readBatch(rowId, num, vector);
+ leftInBatch -= num;
+ pageRowId += num;
+ leftInPage -= num;
+ rowId += num;
}
+ readState.rowsToReadInBatch = leftInBatch;
+ readState.valuesToReadInPage = leftInPage;
+ readState.rowId = pageRowId;
+ }
+ }
- valuesRead += num;
- rowId += num;
- readNumber -= num;
+ private int readPage() {
+ DataPage page = pageReader.readPage();
+ if (page == null) {
+ return -1;
}
+ long pageFirstRowIndex = page.getFirstRowIndex().orElse(0L);
+
+ int pageValueCount =
+ page.accept(
+ new DataPage.Visitor<Integer>() {
+ @Override
+ public Integer visit(DataPageV1 dataPageV1) {
+ try {
+ return readPageV1(dataPageV1);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public Integer visit(DataPageV2 dataPageV2) {
+ try {
+ return readPageV2(dataPageV2);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ });
+ readState.resetForNewPage(pageValueCount, pageFirstRowIndex);
+ return pageValueCount;
}
- private void readPageV1(DataPageV1 page) throws IOException {
- this.pageValueCount = page.getValueCount();
+ private int readPageV1(DataPageV1 page) throws IOException {
+ int pageValueCount = page.getValueCount();
ValuesReader rlReader =
page.getRlEncoding().getValuesReader(descriptor, REPETITION_LEVEL);
// Initialize the decoders.
@@ -211,30 +290,31 @@ public abstract class AbstractColumnReader<VECTOR extends
WritableColumnVector>
ByteBufferInputStream in = bytes.toInputStream();
rlReader.initFromPage(pageValueCount, in);
this.runLenDecoder.initFromStream(pageValueCount, in);
- prepareNewPage(page.getValueEncoding(), in);
+ prepareNewPage(page.getValueEncoding(), in, pageValueCount);
+ return pageValueCount;
} catch (IOException e) {
throw new IOException("could not read page " + page + " in col " +
descriptor, e);
}
}
- private void readPageV2(DataPageV2 page) throws IOException {
- this.pageValueCount = page.getValueCount();
+ private int readPageV2(DataPageV2 page) throws IOException {
+ int pageValueCount = page.getValueCount();
int bitWidth =
BytesUtils.getWidthFromMaxInt(descriptor.getMaxDefinitionLevel());
// do not read the length from the stream. v2 pages handle dividing
the page bytes.
this.runLenDecoder = new RunLengthDecoder(bitWidth, false);
this.runLenDecoder.initFromStream(
- this.pageValueCount,
page.getDefinitionLevels().toInputStream());
+ pageValueCount, page.getDefinitionLevels().toInputStream());
try {
- prepareNewPage(page.getDataEncoding(),
page.getData().toInputStream());
+ prepareNewPage(page.getDataEncoding(),
page.getData().toInputStream(), pageValueCount);
+ return pageValueCount;
} catch (IOException e) {
throw new IOException("could not read page " + page + " in col " +
descriptor, e);
}
}
- private void prepareNewPage(Encoding dataEncoding, ByteBufferInputStream
in)
+ private void prepareNewPage(Encoding dataEncoding, ByteBufferInputStream
in, int pageValueCount)
throws IOException {
- this.endOfPageValueCount = valuesRead + pageValueCount;
if (dataEncoding.usesDictionary()) {
if (dictionary == null) {
throw new IOException(
@@ -269,6 +349,14 @@ public abstract class AbstractColumnReader<VECTOR extends
WritableColumnVector>
afterReadPage();
}
+ final void skipDataBuffer(int length) {
+ try {
+ dataInputStream.skipFully(length);
+ } catch (IOException e) {
+ throw new ParquetDecodingException("Failed to skip " + length + "
bytes", e);
+ }
+ }
+
final ByteBuffer readDataBuffer(int length) {
try {
return
dataInputStream.slice(length).order(ByteOrder.LITTLE_ENDIAN);
@@ -291,6 +379,8 @@ public abstract class AbstractColumnReader<VECTOR extends
WritableColumnVector>
/** Read batch from {@link #runLenDecoder} and {@link #dataInputStream}. */
protected abstract void readBatch(int rowId, int num, VECTOR column);
+ protected abstract void skipBatch(int num);
+
/**
* Decode dictionary ids to data. From {@link #runLenDecoder} and {@link
#dictionaryIdsDecoder}.
*/
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/BooleanColumnReader.java
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/BooleanColumnReader.java
index d5dc231d84..83d3c5a07d 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/BooleanColumnReader.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/BooleanColumnReader.java
@@ -22,7 +22,7 @@ import
org.apache.paimon.data.columnar.writable.WritableBooleanVector;
import org.apache.paimon.data.columnar.writable.WritableIntVector;
import org.apache.parquet.column.ColumnDescriptor;
-import org.apache.parquet.column.page.PageReader;
+import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.io.ParquetDecodingException;
import org.apache.parquet.schema.PrimitiveType;
@@ -36,9 +36,9 @@ public class BooleanColumnReader extends
AbstractColumnReader<WritableBooleanVec
private byte currentByte = 0;
- public BooleanColumnReader(ColumnDescriptor descriptor, PageReader
pageReader)
+ public BooleanColumnReader(ColumnDescriptor descriptor, PageReadStore
pageReadStore)
throws IOException {
- super(descriptor, pageReader);
+ super(descriptor, pageReadStore);
checkTypeName(PrimitiveType.PrimitiveTypeName.BOOLEAN);
}
@@ -94,6 +94,36 @@ public class BooleanColumnReader extends
AbstractColumnReader<WritableBooleanVec
}
}
+ @Override
+ protected void skipBatch(int num) {
+ int left = num;
+ while (left > 0) {
+ if (runLenDecoder.currentCount == 0) {
+ runLenDecoder.readNextGroup();
+ }
+ int n = Math.min(left, runLenDecoder.currentCount);
+ switch (runLenDecoder.mode) {
+ case RLE:
+ if (runLenDecoder.currentValue == maxDefLevel) {
+ for (int i = 0; i < n; i++) {
+ readBoolean();
+ }
+ }
+ break;
+ case PACKED:
+ for (int i = 0; i < n; ++i) {
+ if
(runLenDecoder.currentBuffer[runLenDecoder.currentBufferIdx++]
+ == maxDefLevel) {
+ readBoolean();
+ }
+ }
+ break;
+ }
+ left -= n;
+ runLenDecoder.currentCount -= n;
+ }
+ }
+
private boolean readBoolean() {
if (bitOffset == 0) {
try {
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ByteColumnReader.java
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ByteColumnReader.java
index bed9923d9b..804b8bc027 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ByteColumnReader.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ByteColumnReader.java
@@ -22,7 +22,7 @@ import
org.apache.paimon.data.columnar.writable.WritableByteVector;
import org.apache.paimon.data.columnar.writable.WritableIntVector;
import org.apache.parquet.column.ColumnDescriptor;
-import org.apache.parquet.column.page.PageReader;
+import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.schema.PrimitiveType;
import java.io.IOException;
@@ -31,8 +31,9 @@ import java.nio.ByteBuffer;
/** Byte {@link ColumnReader}. Using INT32 to store byte, so just cast int to
byte. */
public class ByteColumnReader extends AbstractColumnReader<WritableByteVector>
{
- public ByteColumnReader(ColumnDescriptor descriptor, PageReader
pageReader) throws IOException {
- super(descriptor, pageReader);
+ public ByteColumnReader(ColumnDescriptor descriptor, PageReadStore
pageReadStore)
+ throws IOException {
+ super(descriptor, pageReadStore);
checkTypeName(PrimitiveType.PrimitiveTypeName.INT32);
}
@@ -69,6 +70,38 @@ public class ByteColumnReader extends
AbstractColumnReader<WritableByteVector> {
}
}
+ @Override
+ protected void skipBatch(int num) {
+ int left = num;
+ while (left > 0) {
+ if (runLenDecoder.currentCount == 0) {
+ runLenDecoder.readNextGroup();
+ }
+ int n = Math.min(left, runLenDecoder.currentCount);
+ switch (runLenDecoder.mode) {
+ case RLE:
+ if (runLenDecoder.currentValue == maxDefLevel) {
+ skipByte(n);
+ }
+ break;
+ case PACKED:
+ for (int i = 0; i < n; ++i) {
+ if
(runLenDecoder.currentBuffer[runLenDecoder.currentBufferIdx++]
+ == maxDefLevel) {
+ skipByte(1);
+ }
+ }
+ break;
+ }
+ left -= n;
+ runLenDecoder.currentCount -= n;
+ }
+ }
+
+ private void skipByte(int num) {
+ skipDataBuffer(4 * num);
+ }
+
@Override
protected void readBatchFromDictionaryIds(
int rowId, int num, WritableByteVector column, WritableIntVector
dictionaryIds) {
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/BytesColumnReader.java
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/BytesColumnReader.java
index e83115c8a6..6ee395e585 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/BytesColumnReader.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/BytesColumnReader.java
@@ -22,7 +22,7 @@ import
org.apache.paimon.data.columnar.writable.WritableBytesVector;
import org.apache.paimon.data.columnar.writable.WritableIntVector;
import org.apache.parquet.column.ColumnDescriptor;
-import org.apache.parquet.column.page.PageReader;
+import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.schema.PrimitiveType;
import java.io.IOException;
@@ -31,9 +31,9 @@ import java.nio.ByteBuffer;
/** Bytes {@link ColumnReader}. A int length and bytes data. */
public class BytesColumnReader extends
AbstractColumnReader<WritableBytesVector> {
- public BytesColumnReader(ColumnDescriptor descriptor, PageReader
pageReader)
+ public BytesColumnReader(ColumnDescriptor descriptor, PageReadStore
pageReadStore)
throws IOException {
- super(descriptor, pageReader);
+ super(descriptor, pageReadStore);
checkTypeName(PrimitiveType.PrimitiveTypeName.BINARY);
}
@@ -70,6 +70,41 @@ public class BytesColumnReader extends
AbstractColumnReader<WritableBytesVector>
}
}
+ @Override
+ protected void skipBatch(int num) {
+ int left = num;
+ while (left > 0) {
+ if (runLenDecoder.currentCount == 0) {
+ runLenDecoder.readNextGroup();
+ }
+ int n = Math.min(left, runLenDecoder.currentCount);
+ switch (runLenDecoder.mode) {
+ case RLE:
+ if (runLenDecoder.currentValue == maxDefLevel) {
+ skipBinary(n);
+ }
+ break;
+ case PACKED:
+ for (int i = 0; i < n; ++i) {
+ if
(runLenDecoder.currentBuffer[runLenDecoder.currentBufferIdx++]
+ == maxDefLevel) {
+ skipBinary(1);
+ }
+ }
+ break;
+ }
+ left -= n;
+ runLenDecoder.currentCount -= n;
+ }
+ }
+
+ private void skipBinary(int num) {
+ for (int i = 0; i < num; i++) {
+ int len = readDataBuffer(4).getInt();
+ skipDataBuffer(len);
+ }
+ }
+
@Override
protected void readBatchFromDictionaryIds(
int rowId, int num, WritableBytesVector column, WritableIntVector
dictionaryIds) {
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/DoubleColumnReader.java
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/DoubleColumnReader.java
index d6d8aa2bbb..2cffd40624 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/DoubleColumnReader.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/DoubleColumnReader.java
@@ -22,7 +22,7 @@ import
org.apache.paimon.data.columnar.writable.WritableDoubleVector;
import org.apache.paimon.data.columnar.writable.WritableIntVector;
import org.apache.parquet.column.ColumnDescriptor;
-import org.apache.parquet.column.page.PageReader;
+import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.schema.PrimitiveType;
import java.io.IOException;
@@ -31,9 +31,9 @@ import java.nio.ByteBuffer;
/** Double {@link ColumnReader}. */
public class DoubleColumnReader extends
AbstractColumnReader<WritableDoubleVector> {
- public DoubleColumnReader(ColumnDescriptor descriptor, PageReader
pageReader)
+ public DoubleColumnReader(ColumnDescriptor descriptor, PageReadStore
pageReadStore)
throws IOException {
- super(descriptor, pageReader);
+ super(descriptor, pageReadStore);
checkTypeName(PrimitiveType.PrimitiveTypeName.DOUBLE);
}
@@ -70,6 +70,38 @@ public class DoubleColumnReader extends
AbstractColumnReader<WritableDoubleVecto
}
}
+ @Override
+ protected void skipBatch(int num) {
+ int left = num;
+ while (left > 0) {
+ if (runLenDecoder.currentCount == 0) {
+ runLenDecoder.readNextGroup();
+ }
+ int n = Math.min(left, runLenDecoder.currentCount);
+ switch (runLenDecoder.mode) {
+ case RLE:
+ if (runLenDecoder.currentValue == maxDefLevel) {
+ skipDouble(n);
+ }
+ break;
+ case PACKED:
+ for (int i = 0; i < n; ++i) {
+ if
(runLenDecoder.currentBuffer[runLenDecoder.currentBufferIdx++]
+ == maxDefLevel) {
+ skipDouble(1);
+ }
+ }
+ break;
+ }
+ left -= n;
+ runLenDecoder.currentCount -= n;
+ }
+ }
+
+ private void skipDouble(int num) {
+ skipDataBuffer(8 * num);
+ }
+
@Override
protected void readBatchFromDictionaryIds(
int rowId, int num, WritableDoubleVector column, WritableIntVector
dictionaryIds) {
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/FixedLenBytesColumnReader.java
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/FixedLenBytesColumnReader.java
index afce717a67..25e1b466e4 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/FixedLenBytesColumnReader.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/FixedLenBytesColumnReader.java
@@ -25,7 +25,7 @@ import
org.apache.paimon.data.columnar.writable.WritableLongVector;
import org.apache.paimon.format.parquet.ParquetSchemaConverter;
import org.apache.parquet.column.ColumnDescriptor;
-import org.apache.parquet.column.page.PageReader;
+import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.schema.PrimitiveType;
@@ -39,8 +39,9 @@ public class FixedLenBytesColumnReader<VECTOR extends
WritableColumnVector>
private final int precision;
public FixedLenBytesColumnReader(
- ColumnDescriptor descriptor, PageReader pageReader, int precision)
throws IOException {
- super(descriptor, pageReader);
+ ColumnDescriptor descriptor, PageReadStore pageReadStore, int
precision)
+ throws IOException {
+ super(descriptor, pageReadStore);
checkTypeName(PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY);
this.precision = precision;
}
@@ -79,6 +80,35 @@ public class FixedLenBytesColumnReader<VECTOR extends
WritableColumnVector>
}
}
+ @Override
+ protected void skipBatch(int num) {
+ int bytesLen = descriptor.getPrimitiveType().getTypeLength();
+ if (ParquetSchemaConverter.is32BitDecimal(precision)) {
+ for (int i = 0; i < num; i++) {
+ if (runLenDecoder.readInteger() == maxDefLevel) {
+ skipDataBinary(bytesLen);
+ }
+ }
+ } else if (ParquetSchemaConverter.is64BitDecimal(precision)) {
+
+ for (int i = 0; i < num; i++) {
+ if (runLenDecoder.readInteger() == maxDefLevel) {
+ skipDataBinary(bytesLen);
+ }
+ }
+ } else {
+ for (int i = 0; i < num; i++) {
+ if (runLenDecoder.readInteger() == maxDefLevel) {
+ skipDataBinary(bytesLen);
+ }
+ }
+ }
+ }
+
+ private void skipDataBinary(int len) {
+ skipDataBuffer(len);
+ }
+
@Override
protected void readBatchFromDictionaryIds(
int rowId, int num, VECTOR column, WritableIntVector
dictionaryIds) {
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/FloatColumnReader.java
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/FloatColumnReader.java
index 1f4adfa4b9..e9eec13df5 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/FloatColumnReader.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/FloatColumnReader.java
@@ -22,7 +22,7 @@ import
org.apache.paimon.data.columnar.writable.WritableFloatVector;
import org.apache.paimon.data.columnar.writable.WritableIntVector;
import org.apache.parquet.column.ColumnDescriptor;
-import org.apache.parquet.column.page.PageReader;
+import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.schema.PrimitiveType;
import java.io.IOException;
@@ -31,9 +31,9 @@ import java.nio.ByteBuffer;
/** Float {@link ColumnReader}. */
public class FloatColumnReader extends
AbstractColumnReader<WritableFloatVector> {
- public FloatColumnReader(ColumnDescriptor descriptor, PageReader
pageReader)
+ public FloatColumnReader(ColumnDescriptor descriptor, PageReadStore
pageReadStore)
throws IOException {
- super(descriptor, pageReader);
+ super(descriptor, pageReadStore);
checkTypeName(PrimitiveType.PrimitiveTypeName.FLOAT);
}
@@ -70,6 +70,38 @@ public class FloatColumnReader extends
AbstractColumnReader<WritableFloatVector>
}
}
+ @Override
+ protected void skipBatch(int num) {
+ int left = num;
+ while (left > 0) {
+ if (runLenDecoder.currentCount == 0) {
+ runLenDecoder.readNextGroup();
+ }
+ int n = Math.min(left, runLenDecoder.currentCount);
+ switch (runLenDecoder.mode) {
+ case RLE:
+ if (runLenDecoder.currentValue == maxDefLevel) {
+ skipFloat(n);
+ }
+ break;
+ case PACKED:
+ for (int i = 0; i < n; ++i) {
+ if
(runLenDecoder.currentBuffer[runLenDecoder.currentBufferIdx++]
+ == maxDefLevel) {
+ skipFloat(1);
+ }
+ }
+ break;
+ }
+ left -= n;
+ runLenDecoder.currentCount -= n;
+ }
+ }
+
+ private void skipFloat(int num) {
+ skipDataBuffer(4 * num);
+ }
+
@Override
protected void readBatchFromDictionaryIds(
int rowId, int num, WritableFloatVector column, WritableIntVector
dictionaryIds) {
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/IntColumnReader.java
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/IntColumnReader.java
index e38e916d18..521ad998f6 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/IntColumnReader.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/IntColumnReader.java
@@ -21,7 +21,7 @@ package org.apache.paimon.format.parquet.reader;
import org.apache.paimon.data.columnar.writable.WritableIntVector;
import org.apache.parquet.column.ColumnDescriptor;
-import org.apache.parquet.column.page.PageReader;
+import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.schema.PrimitiveType;
import java.io.IOException;
@@ -30,8 +30,9 @@ import java.nio.ByteBuffer;
/** Int {@link ColumnReader}. */
public class IntColumnReader extends AbstractColumnReader<WritableIntVector> {
- public IntColumnReader(ColumnDescriptor descriptor, PageReader pageReader)
throws IOException {
- super(descriptor, pageReader);
+ public IntColumnReader(ColumnDescriptor descriptor, PageReadStore
pageReadStore)
+ throws IOException {
+ super(descriptor, pageReadStore);
checkTypeName(PrimitiveType.PrimitiveTypeName.INT32);
}
@@ -68,6 +69,38 @@ public class IntColumnReader extends
AbstractColumnReader<WritableIntVector> {
}
}
+ @Override
+ protected void skipBatch(int num) {
+ int left = num;
+ while (left > 0) {
+ if (runLenDecoder.currentCount == 0) {
+ runLenDecoder.readNextGroup();
+ }
+ int n = Math.min(left, runLenDecoder.currentCount);
+ switch (runLenDecoder.mode) {
+ case RLE:
+ if (runLenDecoder.currentValue == maxDefLevel) {
+ skipInteger(n);
+ }
+ break;
+ case PACKED:
+ for (int i = 0; i < n; ++i) {
+ if
(runLenDecoder.currentBuffer[runLenDecoder.currentBufferIdx++]
+ == maxDefLevel) {
+ skipInteger(1);
+ }
+ }
+ break;
+ }
+ left -= n;
+ runLenDecoder.currentCount -= n;
+ }
+ }
+
+ private void skipInteger(int num) {
+ skipDataBuffer(4 * num);
+ }
+
@Override
protected void readBatchFromDictionaryIds(
int rowId, int num, WritableIntVector column, WritableIntVector
dictionaryIds) {
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/LongColumnReader.java
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/LongColumnReader.java
index a8e04eae67..c4af086a70 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/LongColumnReader.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/LongColumnReader.java
@@ -22,7 +22,7 @@ import
org.apache.paimon.data.columnar.writable.WritableIntVector;
import org.apache.paimon.data.columnar.writable.WritableLongVector;
import org.apache.parquet.column.ColumnDescriptor;
-import org.apache.parquet.column.page.PageReader;
+import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.schema.PrimitiveType;
import java.io.IOException;
@@ -31,8 +31,9 @@ import java.nio.ByteBuffer;
/** Long {@link ColumnReader}. */
public class LongColumnReader extends AbstractColumnReader<WritableLongVector>
{
- public LongColumnReader(ColumnDescriptor descriptor, PageReader
pageReader) throws IOException {
- super(descriptor, pageReader);
+ public LongColumnReader(ColumnDescriptor descriptor, PageReadStore
pageReadStore)
+ throws IOException {
+ super(descriptor, pageReadStore);
checkTypeName(PrimitiveType.PrimitiveTypeName.INT64);
}
@@ -69,6 +70,38 @@ public class LongColumnReader extends
AbstractColumnReader<WritableLongVector> {
}
}
+ @Override
+ protected void skipBatch(int num) {
+ int left = num;
+ while (left > 0) {
+ if (runLenDecoder.currentCount == 0) {
+ runLenDecoder.readNextGroup();
+ }
+ int n = Math.min(left, runLenDecoder.currentCount);
+ switch (runLenDecoder.mode) {
+ case RLE:
+ if (runLenDecoder.currentValue == maxDefLevel) {
+ skipValue(n);
+ }
+ break;
+ case PACKED:
+ for (int i = 0; i < n; ++i) {
+ if
(runLenDecoder.currentBuffer[runLenDecoder.currentBufferIdx++]
+ == maxDefLevel) {
+ skipValue(1);
+ }
+ }
+ break;
+ }
+ left -= n;
+ runLenDecoder.currentCount -= n;
+ }
+ }
+
+ private void skipValue(int num) {
+ skipDataBuffer(num * 8);
+ }
+
@Override
protected void readBatchFromDictionaryIds(
int rowId, int num, WritableLongVector column, WritableIntVector
dictionaryIds) {
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedColumnReader.java
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedColumnReader.java
index 68225fbd13..8f20be2754 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedColumnReader.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedColumnReader.java
@@ -279,7 +279,7 @@ public class NestedColumnReader implements
ColumnReader<WritableColumnVector> {
reader =
new NestedPrimitiveColumnReader(
descriptor,
- pages.getPageReader(descriptor),
+ pages,
isUtcTimestamp,
descriptor.getPrimitiveType(),
field.getType(),
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedPrimitiveColumnReader.java
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedPrimitiveColumnReader.java
index 7d00ff7923..7db7aedbf6 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedPrimitiveColumnReader.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedPrimitiveColumnReader.java
@@ -44,6 +44,7 @@ import org.apache.parquet.column.page.DataPage;
import org.apache.parquet.column.page.DataPageV1;
import org.apache.parquet.column.page.DataPageV2;
import org.apache.parquet.column.page.DictionaryPage;
+import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.column.page.PageReader;
import org.apache.parquet.column.values.ValuesReader;
import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridDecoder;
@@ -82,15 +83,6 @@ public class NestedPrimitiveColumnReader implements
ColumnReader<WritableColumnV
private final boolean isUtcTimestamp;
- /** Total number of values read. */
- private long valuesRead;
-
- /**
- * value that indicates the end of the current page. That is, if
valuesRead ==
- * endOfPageValueCount, we are at the end of the page.
- */
- private long endOfPageValueCount;
-
/** If true, the current page is dictionary encoded. */
private boolean isCurrentPageDictionaryEncoded;
@@ -104,7 +96,12 @@ public class NestedPrimitiveColumnReader implements
ColumnReader<WritableColumnV
private ParquetDataColumnReader dataColumn;
/** Total values in the current page. */
- private int pageValueCount;
+ // private int pageValueCount;
+
+ /**
+ * Helper struct to track intermediate states while reading Parquet pages
in the column chunk.
+ */
+ protected final ParquetReadState readState;
// flag to indicate if there is no data in parquet data page
private boolean eof = false;
@@ -115,7 +112,7 @@ public class NestedPrimitiveColumnReader implements
ColumnReader<WritableColumnV
public NestedPrimitiveColumnReader(
ColumnDescriptor descriptor,
- PageReader pageReader,
+ PageReadStore pageReadStore,
boolean isUtcTimestamp,
Type parquetType,
DataType dataType,
@@ -124,7 +121,7 @@ public class NestedPrimitiveColumnReader implements
ColumnReader<WritableColumnV
throws IOException {
this.descriptor = descriptor;
this.type = parquetType;
- this.pageReader = pageReader;
+ this.pageReader = pageReadStore.getPageReader(descriptor);
this.maxDefLevel = descriptor.getMaxDefinitionLevel();
this.isUtcTimestamp = isUtcTimestamp;
this.dataType = dataType;
@@ -132,6 +129,9 @@ public class NestedPrimitiveColumnReader implements
ColumnReader<WritableColumnV
this.readMapKey = readMapKey;
DictionaryPage dictionaryPage = pageReader.readDictionaryPage();
+
+ this.readState = new
ParquetReadState(pageReadStore.getRowIndexes().orElse(null));
+
if (dictionaryPage != null) {
try {
this.dictionary =
@@ -166,23 +166,55 @@ public class NestedPrimitiveColumnReader implements
ColumnReader<WritableColumnV
isFirstRow = false;
}
- // index to set value.
- int index = 0;
- int valueIndex = 0;
List<Object> valueList = new ArrayList<>();
+ int valueIndex = collectDataFromParquetPage(readNumber, valueList);
+
+ return fillColumnVector(valueIndex, valueList);
+ }
+
+ private int collectDataFromParquetPage(int total, List<Object> valueList)
throws IOException {
+ int valueIndex = 0;
// repeated type need two loops to read data.
- while (!eof && index < readNumber) {
+
+ readState.resetForNewBatch(total);
+
+ while (!eof && readState.rowsToReadInBatch > 0) {
+
+ if (readState.isFinished()) { // finished to read
+ eof = true;
+ break;
+ }
+
+ long pageRowId = readState.rowId;
+ long rangeStart = readState.currentRangeStart();
+ long rangeEnd = readState.currentRangeEnd();
+
+ if (pageRowId > rangeEnd) {
+ readState.nextRange();
+ continue;
+ }
+
+ boolean needFilterSkip = pageRowId < rangeStart;
+
do {
- if (!lastValue.shouldSkip) {
+
+ if (!lastValue.shouldSkip && !needFilterSkip) {
valueList.add(lastValue.value);
valueIndex++;
}
} while (readValue() && (repetitionLevel != 0));
- index++;
+
+ if (pageRowId == readState.rowId) {
+ readState.rowId = readState.rowId + 1;
+ }
+
+ if (!needFilterSkip) {
+ readState.rowsToReadInBatch = readState.rowsToReadInBatch - 1;
+ }
}
- return fillColumnVector(valueIndex, valueList);
+ return valueIndex;
}
public LevelDelegation getLevelDelegation() {
@@ -255,20 +287,24 @@ public class NestedPrimitiveColumnReader implements
ColumnReader<WritableColumnV
// get the values of repetition and definitionLevel
repetitionLevel = repetitionLevelColumn.nextInt();
definitionLevel = definitionLevelColumn.nextInt();
- valuesRead++;
+ readState.valuesToReadInPage = readState.valuesToReadInPage - 1;
repetitionLevelList.add(repetitionLevel);
definitionLevelList.add(definitionLevel);
}
private int readPageIfNeed() throws IOException {
// Compute the number of values we want to read in this page.
- int leftInPage = (int) (endOfPageValueCount - valuesRead);
- if (leftInPage == 0) {
- // no data left in current page, load data from new page
- readPage();
- leftInPage = (int) (endOfPageValueCount - valuesRead);
+ if (readState.valuesToReadInPage == 0) {
+ int pageValueCount = readPage();
+ // number of values in the page just loaded
+ if (pageValueCount < 0) {
+ // we've read all the pages; this could happen when we're
reading a repeated list
+ // and we
+ // don't know where the list will end until we've seen all the
pages.
+ return -1;
+ }
}
- return leftInPage;
+ return readState.valuesToReadInPage;
}
private Object readPrimitiveTypedRow(DataType category) {
@@ -528,33 +564,36 @@ public class NestedPrimitiveColumnReader implements
ColumnReader<WritableColumnV
return phbv;
}
- protected void readPage() {
+ protected int readPage() {
DataPage page = pageReader.readPage();
if (page == null) {
- return;
+ return -1;
}
- page.accept(
- new DataPage.Visitor<Void>() {
- @Override
- public Void visit(DataPageV1 dataPageV1) {
- readPageV1(dataPageV1);
- return null;
- }
+ long pageFirstRowIndex = page.getFirstRowIndex().orElse(0L);
- @Override
- public Void visit(DataPageV2 dataPageV2) {
- readPageV2(dataPageV2);
- return null;
- }
- });
+ int pageValueCount =
+ page.accept(
+ new DataPage.Visitor<Integer>() {
+ @Override
+ public Integer visit(DataPageV1 dataPageV1) {
+ return readPageV1(dataPageV1);
+ }
+
+ @Override
+ public Integer visit(DataPageV2 dataPageV2) {
+ return readPageV2(dataPageV2);
+ }
+ });
+ readState.resetForNewPage(pageValueCount, pageFirstRowIndex);
+ return pageValueCount;
}
private void initDataReader(Encoding dataEncoding, ByteBufferInputStream
in, int valueCount)
throws IOException {
- this.pageValueCount = valueCount;
- this.endOfPageValueCount = valuesRead + pageValueCount;
+ // this.pageValueCount = valueCount;
+ // this.endOfPageValueCount = valuesRead + pageValueCount;
if (dataEncoding.usesDictionary()) {
this.dataColumn = null;
if (dictionary == null) {
@@ -577,13 +616,14 @@ public class NestedPrimitiveColumnReader implements
ColumnReader<WritableColumnV
}
try {
- dataColumn.initFromPage(pageValueCount, in);
+ dataColumn.initFromPage(valueCount, in);
} catch (IOException e) {
throw new IOException(String.format("Could not read page in col
%s.", descriptor), e);
}
}
- private void readPageV1(DataPageV1 page) {
+ private int readPageV1(DataPageV1 page) {
+ int pageValueCount = page.getValueCount();
ValuesReader rlReader =
page.getRlEncoding().getValuesReader(descriptor, REPETITION_LEVEL);
ValuesReader dlReader =
page.getDlEncoding().getValuesReader(descriptor, DEFINITION_LEVEL);
this.repetitionLevelColumn = new ValuesReaderIntIterator(rlReader);
@@ -597,15 +637,16 @@ public class NestedPrimitiveColumnReader implements
ColumnReader<WritableColumnV
LOG.debug("Reading definition levels at {}.", in.position());
dlReader.initFromPage(pageValueCount, in);
LOG.debug("Reading data at {}.", in.position());
- initDataReader(page.getValueEncoding(), in, page.getValueCount());
+ initDataReader(page.getValueEncoding(), in, pageValueCount);
+ return pageValueCount;
} catch (IOException e) {
throw new ParquetDecodingException(
String.format("Could not read page %s in col %s.", page,
descriptor), e);
}
}
- private void readPageV2(DataPageV2 page) {
- this.pageValueCount = page.getValueCount();
+ private int readPageV2(DataPageV2 page) {
+ int pageValueCount = page.getValueCount();
this.repetitionLevelColumn =
newRLEIterator(descriptor.getMaxRepetitionLevel(),
page.getRepetitionLevels());
this.definitionLevelColumn =
@@ -615,8 +656,8 @@ public class NestedPrimitiveColumnReader implements
ColumnReader<WritableColumnV
"Page data size {} bytes and {} records.",
page.getData().size(),
pageValueCount);
- initDataReader(
- page.getDataEncoding(), page.getData().toInputStream(),
page.getValueCount());
+ initDataReader(page.getDataEncoding(),
page.getData().toInputStream(), pageValueCount);
+ return pageValueCount;
} catch (IOException e) {
throw new ParquetDecodingException(
String.format("Could not read page %s in col %s.", page,
descriptor), e);
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetReadState.java
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetReadState.java
new file mode 100644
index 0000000000..a600367682
--- /dev/null
+++
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetReadState.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.parquet.reader;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.PrimitiveIterator;
+
+/** Parquet reader state for column index. */
+public class ParquetReadState {
+ /** A special row range used when there is no row indexes (hence all rows
must be included). */
+ private static final RowRange MAX_ROW_RANGE = new RowRange(Long.MIN_VALUE,
Long.MAX_VALUE);
+
+ /**
+ * A special row range used when the row indexes are present AND all the
row ranges have been
+ * processed. This serves as a sentinel at the end indicating that all
rows that come after the last
+ * row range should be skipped.
+ */
+ private static final RowRange END_ROW_RANGE = new RowRange(Long.MAX_VALUE,
Long.MIN_VALUE);
+
+ private final Iterator<RowRange> rowRanges;
+
+ private RowRange currentRange;
+
+ /** row index for the next read. */
+ long rowId;
+
+ int valuesToReadInPage;
+ int rowsToReadInBatch;
+
+ public ParquetReadState(PrimitiveIterator.OfLong rowIndexes) {
+ this.rowRanges = constructRanges(rowIndexes);
+ nextRange();
+ }
+
+ /**
+ * Construct a list of row ranges from the given `rowIndexes`. For
example, suppose the
+ * `rowIndexes` are `[0, 1, 2, 4, 5, 7, 8, 9]`, it will be converted into
3 row ranges: `[0-2],
+ * [4-5], [7-9]`.
+ */
+ private Iterator<RowRange> constructRanges(PrimitiveIterator.OfLong
rowIndexes) {
+ if (rowIndexes == null) {
+ return null;
+ }
+
+ List<RowRange> rowRanges = new ArrayList<>();
+ long currentStart = Long.MIN_VALUE;
+ long previous = Long.MIN_VALUE;
+
+ while (rowIndexes.hasNext()) {
+ long idx = rowIndexes.nextLong();
+ if (currentStart == Long.MIN_VALUE) {
+ currentStart = idx;
+ } else if (previous + 1 != idx) {
+ RowRange range = new RowRange(currentStart, previous);
+ rowRanges.add(range);
+ currentStart = idx;
+ }
+ previous = idx;
+ }
+
+ if (previous != Long.MIN_VALUE) {
+ rowRanges.add(new RowRange(currentStart, previous));
+ }
+
+ return rowRanges.iterator();
+ }
+
+ /** Must be called at the beginning of reading a new batch. */
+ void resetForNewBatch(int batchSize) {
+ this.rowsToReadInBatch = batchSize;
+ }
+
+ /** Must be called at the beginning of reading a new page. */
+ void resetForNewPage(int totalValuesInPage, long pageFirstRowIndex) {
+ this.valuesToReadInPage = totalValuesInPage;
+ this.rowId = pageFirstRowIndex;
+ }
+
+ /** Returns the start index of the current row range. */
+ public long currentRangeStart() {
+ return currentRange.start;
+ }
+
+ /** Returns the end index of the current row range. */
+ public long currentRangeEnd() {
+ return currentRange.end;
+ }
+
+ public boolean isFinished() {
+ return this.currentRange.equals(this.END_ROW_RANGE);
+ }
+
+ public boolean isMaxRange() {
+ return this.currentRange.equals(this.MAX_ROW_RANGE);
+ }
+
+ public RowRange getCurrentRange() {
+ return currentRange;
+ }
+
+ /** Advance to the next range. */
+ public void nextRange() {
+ if (rowRanges == null) {
+ currentRange = MAX_ROW_RANGE;
+ } else if (!rowRanges.hasNext()) {
+ currentRange = END_ROW_RANGE;
+ } else {
+ currentRange = rowRanges.next();
+ }
+ }
+
+ /** Helper struct to represent a range of row indexes `[start, end]`. */
+ public static class RowRange {
+ final long start;
+ final long end;
+
+ RowRange(long start, long end) {
+ this.start = start;
+ this.end = end;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (!(obj instanceof RowRange)) {
+ return false;
+ }
+ return ((RowRange) obj).start == this.start && ((RowRange)
obj).end == this.end;
+ }
+ }
+}
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetSplitReaderUtil.java
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetSplitReaderUtil.java
index 860ec54fa8..a2be77414d 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetSplitReaderUtil.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetSplitReaderUtil.java
@@ -87,58 +87,45 @@ public class ParquetSplitReaderUtil {
getAllColumnDescriptorByType(depth, type, columnDescriptors);
switch (fieldType.getTypeRoot()) {
case BOOLEAN:
- return new BooleanColumnReader(
- descriptors.get(0),
pages.getPageReader(descriptors.get(0)));
+ return new BooleanColumnReader(descriptors.get(0), pages);
case TINYINT:
- return new ByteColumnReader(
- descriptors.get(0),
pages.getPageReader(descriptors.get(0)));
+ return new ByteColumnReader(descriptors.get(0), pages);
case DOUBLE:
- return new DoubleColumnReader(
- descriptors.get(0),
pages.getPageReader(descriptors.get(0)));
+ return new DoubleColumnReader(descriptors.get(0), pages);
case FLOAT:
- return new FloatColumnReader(
- descriptors.get(0),
pages.getPageReader(descriptors.get(0)));
+ return new FloatColumnReader(descriptors.get(0), pages);
case INTEGER:
case DATE:
case TIME_WITHOUT_TIME_ZONE:
- return new IntColumnReader(
- descriptors.get(0),
pages.getPageReader(descriptors.get(0)));
+ return new IntColumnReader(descriptors.get(0), pages);
case BIGINT:
- return new LongColumnReader(
- descriptors.get(0),
pages.getPageReader(descriptors.get(0)));
+ return new LongColumnReader(descriptors.get(0), pages);
case SMALLINT:
- return new ShortColumnReader(
- descriptors.get(0),
pages.getPageReader(descriptors.get(0)));
+ return new ShortColumnReader(descriptors.get(0), pages);
case CHAR:
case VARCHAR:
case BINARY:
case VARBINARY:
- return new BytesColumnReader(
- descriptors.get(0),
pages.getPageReader(descriptors.get(0)));
+ return new BytesColumnReader(descriptors.get(0), pages);
case TIMESTAMP_WITHOUT_TIME_ZONE:
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
if
(descriptors.get(0).getPrimitiveType().getPrimitiveTypeName()
== PrimitiveType.PrimitiveTypeName.INT64) {
- return new LongColumnReader(
- descriptors.get(0),
pages.getPageReader(descriptors.get(0)));
+ return new LongColumnReader(descriptors.get(0), pages);
}
- return new TimestampColumnReader(
- true, descriptors.get(0),
pages.getPageReader(descriptors.get(0)));
+ return new TimestampColumnReader(true, descriptors.get(0),
pages);
case DECIMAL:
switch
(descriptors.get(0).getPrimitiveType().getPrimitiveTypeName()) {
case INT32:
- return new IntColumnReader(
- descriptors.get(0),
pages.getPageReader(descriptors.get(0)));
+ return new IntColumnReader(descriptors.get(0), pages);
case INT64:
- return new LongColumnReader(
- descriptors.get(0),
pages.getPageReader(descriptors.get(0)));
+ return new LongColumnReader(descriptors.get(0), pages);
case BINARY:
- return new BytesColumnReader(
- descriptors.get(0),
pages.getPageReader(descriptors.get(0)));
+ return new BytesColumnReader(descriptors.get(0),
pages);
case FIXED_LEN_BYTE_ARRAY:
return new FixedLenBytesColumnReader(
descriptors.get(0),
- pages.getPageReader(descriptors.get(0)),
+ pages,
((DecimalType) fieldType).getPrecision());
}
case ARRAY:
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/RunLengthDecoder.java
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/RunLengthDecoder.java
index 2dd1655d57..ebb8f28fa1 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/RunLengthDecoder.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/RunLengthDecoder.java
@@ -194,6 +194,51 @@ final class RunLengthDecoder {
}
}
+ void skipDictionaryIds(int total, int level, RunLengthDecoder data) {
+ int left = total;
+ while (left > 0) {
+ if (this.currentCount == 0) {
+ this.readNextGroup();
+ }
+ int n = Math.min(left, this.currentCount);
+ switch (mode) {
+ case RLE:
+ if (currentValue == level) {
+ data.skipDictionaryIdData(n);
+ }
+ break;
+ case PACKED:
+ for (int i = 0; i < n; ++i) {
+ if (currentBuffer[currentBufferIdx++] == level) {
+ data.readInteger();
+ }
+ }
+ break;
+ }
+ left -= n;
+ currentCount -= n;
+ }
+ }
+
+ private void skipDictionaryIdData(int total) {
+ int left = total;
+ while (left > 0) {
+ if (this.currentCount == 0) {
+ this.readNextGroup();
+ }
+ int n = Math.min(left, this.currentCount);
+ switch (mode) {
+ case RLE:
+ break;
+ case PACKED:
+ currentBufferIdx += n;
+ break;
+ }
+ left -= n;
+ currentCount -= n;
+ }
+ }
+
/** Reads the next varint encoded int. */
private int readUnsignedVarInt() throws IOException {
int value = 0;
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ShortColumnReader.java
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ShortColumnReader.java
index 7b32232261..bdb2f401fa 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ShortColumnReader.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ShortColumnReader.java
@@ -22,7 +22,7 @@ import
org.apache.paimon.data.columnar.writable.WritableIntVector;
import org.apache.paimon.data.columnar.writable.WritableShortVector;
import org.apache.parquet.column.ColumnDescriptor;
-import org.apache.parquet.column.page.PageReader;
+import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.schema.PrimitiveType;
import java.io.IOException;
@@ -30,9 +30,9 @@ import java.io.IOException;
/** Short {@link ColumnReader}. Using INT32 to store short, so just cast int
to short. */
public class ShortColumnReader extends
AbstractColumnReader<WritableShortVector> {
- public ShortColumnReader(ColumnDescriptor descriptor, PageReader
pageReader)
+ public ShortColumnReader(ColumnDescriptor descriptor, PageReadStore
pageReadStore)
throws IOException {
- super(descriptor, pageReader);
+ super(descriptor, pageReadStore);
checkTypeName(PrimitiveType.PrimitiveTypeName.INT32);
}
@@ -71,6 +71,38 @@ public class ShortColumnReader extends
AbstractColumnReader<WritableShortVector>
}
}
+ @Override
+ protected void skipBatch(int num) {
+ int left = num;
+ while (left > 0) {
+ if (runLenDecoder.currentCount == 0) {
+ runLenDecoder.readNextGroup();
+ }
+ int n = Math.min(left, runLenDecoder.currentCount);
+ switch (runLenDecoder.mode) {
+ case RLE:
+ if (runLenDecoder.currentValue == maxDefLevel) {
+ skipShot(n);
+ }
+ break;
+ case PACKED:
+ for (int i = 0; i < n; ++i) {
+ if
(runLenDecoder.currentBuffer[runLenDecoder.currentBufferIdx++]
+ == maxDefLevel) {
+ skipShot(1);
+ }
+ }
+ break;
+ }
+ left -= n;
+ runLenDecoder.currentCount -= n;
+ }
+ }
+
+ private void skipShot(int num) {
+ skipDataBuffer(4 * num);
+ }
+
@Override
protected void readBatchFromDictionaryIds(
int rowId, int num, WritableShortVector column, WritableIntVector
dictionaryIds) {
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/TimestampColumnReader.java
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/TimestampColumnReader.java
index 4a279ff90e..8767173315 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/TimestampColumnReader.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/TimestampColumnReader.java
@@ -23,7 +23,7 @@ import
org.apache.paimon.data.columnar.writable.WritableIntVector;
import org.apache.paimon.data.columnar.writable.WritableTimestampVector;
import org.apache.parquet.column.ColumnDescriptor;
-import org.apache.parquet.column.page.PageReader;
+import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.schema.PrimitiveType;
@@ -49,9 +49,9 @@ public class TimestampColumnReader extends
AbstractColumnReader<WritableTimestam
private final boolean utcTimestamp;
public TimestampColumnReader(
- boolean utcTimestamp, ColumnDescriptor descriptor, PageReader
pageReader)
+ boolean utcTimestamp, ColumnDescriptor descriptor, PageReadStore
pageReadStore)
throws IOException {
- super(descriptor, pageReader);
+ super(descriptor, pageReadStore);
this.utcTimestamp = utcTimestamp;
checkTypeName(PrimitiveType.PrimitiveTypeName.INT96);
}
@@ -75,6 +75,15 @@ public class TimestampColumnReader extends
AbstractColumnReader<WritableTimestam
}
}
+ @Override
+ protected void skipBatch(int num) {
+ for (int i = 0; i < num; i++) {
+ if (runLenDecoder.readInteger() == maxDefLevel) {
+ skipDataBuffer(12);
+ }
+ }
+ }
+
@Override
protected void readBatchFromDictionaryIds(
int rowId, int num, WritableTimestampVector column,
WritableIntVector dictionaryIds) {