This is an automated email from the ASF dual-hosted git repository.

cgivre pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git


The following commit(s) were added to refs/heads/master by this push:
     new af7cfcd4ca DRILL-8486: fix handling of long variable length entries during bulk parquet reading (#2898)
af7cfcd4ca is described below

commit af7cfcd4cacf8efeb6ca69bfb5f579c6992d4681
Author: Maksym Rymar <rym...@apache.org>
AuthorDate: Wed Apr 10 16:44:14 2024 +0300

    DRILL-8486: fix handling of long variable length entries during bulk parquet reading (#2898)
---
 .../columnreaders/VarLenEntryDictionaryReader.java |  7 +++---
 .../parquet/columnreaders/VarLenEntryReader.java   |  6 ++---
 .../VarLenNullableDictionaryReader.java            | 27 ++++++++++++++--------
 .../columnreaders/VarLenNullableEntryReader.java   |  6 ++---
 4 files changed, 27 insertions(+), 19 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenEntryDictionaryReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenEntryDictionaryReader.java
index e003d8dd3a..03f6a661fe 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenEntryDictionaryReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenEntryDictionaryReader.java
@@ -44,7 +44,7 @@ final class VarLenEntryDictionaryReader extends VarLenAbstractPageEntryReader {
     if (bulkProcess()) {
       return getEntryBulk(valuesToRead);
     }
-    return getEntrySingle(valuesToRead);
+    return getEntrySingle();
   }
 
   private final VarLenColumnBulkEntry getEntryBulk(int valuesToRead) {
@@ -82,7 +82,7 @@ final class VarLenEntryDictionaryReader extends VarLenAbstractPageEntryReader {
     // We're here either because a) the Parquet metadata is wrong (advertises more values than the real count)
     // or the first value being processed ended up to be too long for the buffer.
     if (numValues == 0) {
-      return getEntrySingle(valuesToRead);
+      return getEntrySingle();
     }
 
     // Now set the bulk entry
@@ -91,7 +91,7 @@ final class VarLenEntryDictionaryReader extends VarLenAbstractPageEntryReader {
     return entry;
   }
 
-  private final VarLenColumnBulkEntry getEntrySingle(int valsToReadWithinPage) {
+  private VarLenColumnBulkEntry getEntrySingle() {
     final ValuesReaderWrapper valueReader = pageInfo.encodedValueReader;
     final int[] valueLengths = entry.getValuesLength();
     final Binary currEntry = valueReader.getEntry();
@@ -99,6 +99,7 @@ final class VarLenEntryDictionaryReader extends VarLenAbstractPageEntryReader {
 
     // Is there enough memory to handle this large value?
     if (batchMemoryConstraintsReached(0, 4, dataLen)) {
+      valueReader.pushBack(currEntry);
       entry.set(0, 0, 0, 0); // no data to be consumed
       return entry;
     }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenEntryReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenEntryReader.java
index cec0c7ff63..58aa07be0c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenEntryReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenEntryReader.java
@@ -43,7 +43,7 @@ final class VarLenEntryReader extends VarLenAbstractPageEntryReader {
     if (bulkProcess()) {
       return getEntryBulk(valuesToRead);
     }
-    return getEntrySingle(valuesToRead);
+    return getEntrySingle();
   }
 
   private final VarLenColumnBulkEntry getEntryBulk(int valuesToRead) {
@@ -92,7 +92,7 @@ final class VarLenEntryReader extends VarLenAbstractPageEntryReader {
     // We're here either because a) the Parquet metadata is wrong (advertises more values than the real count)
     // or the first value being processed ended up to be too long for the buffer.
     if (numValues == 0) {
-      return getEntrySingle(valuesToRead);
+      return getEntrySingle();
     }
 
     // Update the page data buffer offset
@@ -109,7 +109,7 @@ final class VarLenEntryReader extends VarLenAbstractPageEntryReader {
     return entry;
   }
 
-  private final VarLenColumnBulkEntry getEntrySingle(int valuesToRead) {
+  private VarLenColumnBulkEntry getEntrySingle() {
 
     if (remainingPageData() < 4) {
       final String message = String.format("Invalid Parquet page metadata; 
cannot process advertised page count..");
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenNullableDictionaryReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenNullableDictionaryReader.java
index 5bfcb7d025..c41e844eeb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenNullableDictionaryReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenNullableDictionaryReader.java
@@ -18,26 +18,32 @@
 package org.apache.drill.exec.store.parquet.columnreaders;
 
 import com.google.common.base.Preconditions;
+
 import java.nio.ByteBuffer;
+
 import org.apache.drill.exec.store.parquet.columnreaders.VarLenColumnBulkInput.ColumnPrecisionInfo;
 import org.apache.drill.exec.store.parquet.columnreaders.VarLenColumnBulkInput.ValuesReaderWrapper;
 import org.apache.drill.exec.store.parquet.columnreaders.VarLenColumnBulkInput.PageDataInfo;
 import org.apache.drill.exec.store.parquet.columnreaders.VarLenColumnBulkInput.VarLenColumnBulkInputCallback;
 import org.apache.parquet.io.api.Binary;
 
-/** Handles nullable variable data types using a dictionary */
+/**
+ * Handles nullable variable data types using a dictionary
+ */
 final class VarLenNullableDictionaryReader extends VarLenAbstractPageEntryReader {
 
   VarLenNullableDictionaryReader(ByteBuffer buffer,
-    PageDataInfo pageInfo,
-    ColumnPrecisionInfo columnPrecInfo,
-    VarLenColumnBulkEntry entry,
-    VarLenColumnBulkInputCallback containerCallback) {
+                                 PageDataInfo pageInfo,
+                                 ColumnPrecisionInfo columnPrecInfo,
+                                 VarLenColumnBulkEntry entry,
+                                 VarLenColumnBulkInputCallback containerCallback) {
 
     super(buffer, pageInfo, columnPrecInfo, entry, containerCallback);
   }
 
-  /** {@inheritDoc} */
+  /**
+   * {@inheritDoc}
+   */
   @Override
   final VarLenColumnBulkEntry getEntry(int valuesToRead) {
     assert valuesToRead > 0;
@@ -46,7 +52,7 @@ final class VarLenNullableDictionaryReader extends VarLenAbstractPageEntryReader
     if (bulkProcess()) {
       return getEntryBulk(valuesToRead);
     }
-    return getEntrySingle(valuesToRead);
+    return getEntrySingle();
   }
 
   private final VarLenColumnBulkEntry getEntryBulk(int valuesToRead) {
@@ -66,7 +72,7 @@ final class VarLenNullableDictionaryReader extends VarLenAbstractPageEntryReader
     // Initialize the reader if needed
     pageInfo.definitionLevels.readFirstIntegerIfNeeded();
 
-    for (int idx = 0; idx < readBatch; ++idx ) {
+    for (int idx = 0; idx < readBatch; ++idx) {
       if (pageInfo.definitionLevels.readCurrInteger() == 1) {
         final Binary currEntry = valueReader.getEntry();
         final int dataLen = currEntry.length();
@@ -97,7 +103,7 @@ final class VarLenNullableDictionaryReader extends VarLenAbstractPageEntryReader
     // We're here either because a) the Parquet metadata is wrong (advertises more values than the real count)
     // or the first value being processed ended up to be too long for the buffer.
     if (numValues == 0) {
-      return getEntrySingle(valuesToRead);
+      return getEntrySingle();
     }
 
     entry.set(0, tgtPos, numValues, numValues - numNulls);
@@ -105,7 +111,7 @@ final class VarLenNullableDictionaryReader extends VarLenAbstractPageEntryReader
     return entry;
   }
 
-  private final VarLenColumnBulkEntry getEntrySingle(int valsToReadWithinPage) {
+  private VarLenColumnBulkEntry getEntrySingle() {
     final int[] valueLengths = entry.getValuesLength();
     final ValuesReaderWrapper valueReader = pageInfo.encodedValueReader;
 
@@ -118,6 +124,7 @@ final class VarLenNullableDictionaryReader extends VarLenAbstractPageEntryReader
 
       // Is there enough memory to handle this large value?
       if (batchMemoryConstraintsReached(1, 4, dataLen)) {
+        valueReader.pushBack(currEntry);
         entry.set(0, 0, 0, 0); // no data to be consumed
         return entry;
       }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenNullableEntryReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenNullableEntryReader.java
index ce39859ad5..aa2307841a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenNullableEntryReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenNullableEntryReader.java
@@ -45,7 +45,7 @@ final class VarLenNullableEntryReader extends VarLenAbstractPageEntryReader {
     if (bulkProcess()) {
       return getEntryBulk(valuesToRead);
     }
-    return getEntrySingle(valuesToRead);
+    return getEntrySingle();
   }
 
   VarLenColumnBulkEntry getEntryBulk(int valuesToRead) {
@@ -108,7 +108,7 @@ final class VarLenNullableEntryReader extends VarLenAbstractPageEntryReader {
     // We're here either because a) the Parquet metadata is wrong (advertises more values than the real count)
     // or the first value being processed ended up to be too long for the buffer.
     if (numValues == 0) {
-      return getEntrySingle(valuesToRead);
+      return getEntrySingle();
     }
 
     // Update the page data buffer offset
@@ -126,7 +126,7 @@ final class VarLenNullableEntryReader extends VarLenAbstractPageEntryReader {
     return entry;
   }
 
-  VarLenColumnBulkEntry getEntrySingle(int valuesToRead) {
+  VarLenColumnBulkEntry getEntrySingle() {
 
     // Initialize the reader if needed
     pageInfo.definitionLevels.readFirstIntegerIfNeeded();

Reply via email to