This is an automated email from the ASF dual-hosted git repository. cgivre pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push: new af7cfcd4ca DRILL-8486: fix handling of long variable length entries during bulk parquet reading (#2898) af7cfcd4ca is described below commit af7cfcd4cacf8efeb6ca69bfb5f579c6992d4681 Author: Maksym Rymar <rym...@apache.org> AuthorDate: Wed Apr 10 16:44:14 2024 +0300 DRILL-8486: fix handling of long variable length entries during bulk parquet reading (#2898) --- .../columnreaders/VarLenEntryDictionaryReader.java | 7 +++--- .../parquet/columnreaders/VarLenEntryReader.java | 6 ++--- .../VarLenNullableDictionaryReader.java | 27 ++++++++++++++-------- .../columnreaders/VarLenNullableEntryReader.java | 6 ++--- 4 files changed, 27 insertions(+), 19 deletions(-) diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenEntryDictionaryReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenEntryDictionaryReader.java index e003d8dd3a..03f6a661fe 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenEntryDictionaryReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenEntryDictionaryReader.java @@ -44,7 +44,7 @@ final class VarLenEntryDictionaryReader extends VarLenAbstractPageEntryReader { if (bulkProcess()) { return getEntryBulk(valuesToRead); } - return getEntrySingle(valuesToRead); + return getEntrySingle(); } private final VarLenColumnBulkEntry getEntryBulk(int valuesToRead) { @@ -82,7 +82,7 @@ final class VarLenEntryDictionaryReader extends VarLenAbstractPageEntryReader { // We're here either because a) the Parquet metadata is wrong (advertises more values than the real count) // or the first value being processed ended up to be too long for the buffer. if (numValues == 0) { - return getEntrySingle(valuesToRead); + return getEntrySingle(); } // Now set the bulk entry @@ -91,7 +91,7 @@ final class VarLenEntryDictionaryReader extends VarLenAbstractPageEntryReader { return entry; } - private final VarLenColumnBulkEntry getEntrySingle(int valsToReadWithinPage) { + private VarLenColumnBulkEntry getEntrySingle() { final ValuesReaderWrapper valueReader = pageInfo.encodedValueReader; final int[] valueLengths = entry.getValuesLength(); final Binary currEntry = valueReader.getEntry(); @@ -99,6 +99,7 @@ final class VarLenEntryDictionaryReader extends VarLenAbstractPageEntryReader { // Is there enough memory to handle this large value? if (batchMemoryConstraintsReached(0, 4, dataLen)) { + valueReader.pushBack(currEntry); entry.set(0, 0, 0, 0); // no data to be consumed return entry; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenEntryReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenEntryReader.java index cec0c7ff63..58aa07be0c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenEntryReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenEntryReader.java @@ -43,7 +43,7 @@ final class VarLenEntryReader extends VarLenAbstractPageEntryReader { if (bulkProcess()) { return getEntryBulk(valuesToRead); } - return getEntrySingle(valuesToRead); + return getEntrySingle(); } private final VarLenColumnBulkEntry getEntryBulk(int valuesToRead) { @@ -92,7 +92,7 @@ final class VarLenEntryReader extends VarLenAbstractPageEntryReader { // We're here either because a) the Parquet metadata is wrong (advertises more values than the real count) // or the first value being processed ended up to be too long for the buffer. if (numValues == 0) { - return getEntrySingle(valuesToRead); + return getEntrySingle(); } // Update the page data buffer offset @@ -109,7 +109,7 @@ final class VarLenEntryReader extends VarLenAbstractPageEntryReader { return entry; } - private final VarLenColumnBulkEntry getEntrySingle(int valuesToRead) { + private VarLenColumnBulkEntry getEntrySingle() { if (remainingPageData() < 4) { final String message = String.format("Invalid Parquet page metadata; cannot process advertised page count.."); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenNullableDictionaryReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenNullableDictionaryReader.java index 5bfcb7d025..c41e844eeb 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenNullableDictionaryReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenNullableDictionaryReader.java @@ -18,26 +18,32 @@ package org.apache.drill.exec.store.parquet.columnreaders; import com.google.common.base.Preconditions; + import java.nio.ByteBuffer; + import org.apache.drill.exec.store.parquet.columnreaders.VarLenColumnBulkInput.ColumnPrecisionInfo; import org.apache.drill.exec.store.parquet.columnreaders.VarLenColumnBulkInput.ValuesReaderWrapper; import org.apache.drill.exec.store.parquet.columnreaders.VarLenColumnBulkInput.PageDataInfo; import org.apache.drill.exec.store.parquet.columnreaders.VarLenColumnBulkInput.VarLenColumnBulkInputCallback; import org.apache.parquet.io.api.Binary; -/** Handles nullable variable data types using a dictionary */ +/** + * Handles nullable variable data types using a dictionary + */ final class VarLenNullableDictionaryReader extends VarLenAbstractPageEntryReader { VarLenNullableDictionaryReader(ByteBuffer buffer, - PageDataInfo pageInfo, - ColumnPrecisionInfo columnPrecInfo, - VarLenColumnBulkEntry entry, - VarLenColumnBulkInputCallback containerCallback) { + PageDataInfo pageInfo, + ColumnPrecisionInfo columnPrecInfo, + VarLenColumnBulkEntry entry, + VarLenColumnBulkInputCallback containerCallback) { super(buffer, pageInfo, columnPrecInfo, entry, containerCallback); } - /** {@inheritDoc} */ + /** + * {@inheritDoc} + */ @Override final VarLenColumnBulkEntry getEntry(int valuesToRead) { assert valuesToRead > 0; @@ -46,7 +52,7 @@ final class VarLenNullableDictionaryReader extends VarLenAbstractPageEntryReader if (bulkProcess()) { return getEntryBulk(valuesToRead); } - return getEntrySingle(valuesToRead); + return getEntrySingle(); } private final VarLenColumnBulkEntry getEntryBulk(int valuesToRead) { @@ -66,7 +72,7 @@ final class VarLenNullableDictionaryReader extends VarLenAbstractPageEntryReader // Initialize the reader if needed pageInfo.definitionLevels.readFirstIntegerIfNeeded(); - for (int idx = 0; idx < readBatch; ++idx ) { + for (int idx = 0; idx < readBatch; ++idx) { if (pageInfo.definitionLevels.readCurrInteger() == 1) { final Binary currEntry = valueReader.getEntry(); final int dataLen = currEntry.length(); @@ -97,7 +103,7 @@ final class VarLenNullableDictionaryReader extends VarLenAbstractPageEntryReader // We're here either because a) the Parquet metadata is wrong (advertises more values than the real count) // or the first value being processed ended up to be too long for the buffer. if (numValues == 0) { - return getEntrySingle(valuesToRead); + return getEntrySingle(); } entry.set(0, tgtPos, numValues, numValues - numNulls); @@ -105,7 +111,7 @@ final class VarLenNullableDictionaryReader extends VarLenAbstractPageEntryReader return entry; } - private final VarLenColumnBulkEntry getEntrySingle(int valsToReadWithinPage) { + private VarLenColumnBulkEntry getEntrySingle() { final int[] valueLengths = entry.getValuesLength(); final ValuesReaderWrapper valueReader = pageInfo.encodedValueReader; @@ -118,6 +124,7 @@ final class VarLenNullableDictionaryReader extends VarLenAbstractPageEntryReader // Is there enough memory to handle this large value? if (batchMemoryConstraintsReached(1, 4, dataLen)) { + valueReader.pushBack(currEntry); entry.set(0, 0, 0, 0); // no data to be consumed return entry; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenNullableEntryReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenNullableEntryReader.java index ce39859ad5..aa2307841a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenNullableEntryReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenNullableEntryReader.java @@ -45,7 +45,7 @@ final class VarLenNullableEntryReader extends VarLenAbstractPageEntryReader { if (bulkProcess()) { return getEntryBulk(valuesToRead); } - return getEntrySingle(valuesToRead); + return getEntrySingle(); } VarLenColumnBulkEntry getEntryBulk(int valuesToRead) { @@ -108,7 +108,7 @@ final class VarLenNullableEntryReader extends VarLenAbstractPageEntryReader { // We're here either because a) the Parquet metadata is wrong (advertises more values than the real count) // or the first value being processed ended up to be too long for the buffer. if (numValues == 0) { - return getEntrySingle(valuesToRead); + return getEntrySingle(); } // Update the page data buffer offset @@ -126,7 +126,7 @@ final class VarLenNullableEntryReader extends VarLenAbstractPageEntryReader { return entry; } - VarLenColumnBulkEntry getEntrySingle(int valuesToRead) { + VarLenColumnBulkEntry getEntrySingle() { // Initialize the reader if needed pageInfo.definitionLevels.readFirstIntegerIfNeeded();