This is an automated email from the ASF dual-hosted git repository.

junhao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 8127e1be22 [parquet] Fix parquet Vectored read in ParquetFileReader 
(#5792)
8127e1be22 is described below

commit 8127e1be221209ccd625293f38191ec8eaff0501
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed Jun 25 15:00:18 2025 +0800

    [parquet] Fix parquet Vectored read in ParquetFileReader (#5792)
---
 .../apache/parquet/hadoop/ParquetFileReader.java   | 29 ++++++++--------------
 1 file changed, 11 insertions(+), 18 deletions(-)

diff --git 
a/paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java 
b/paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
index 256eec1a1c..05e9d12033 100644
--- 
a/paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
+++ 
b/paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
@@ -20,6 +20,7 @@ package org.apache.parquet.hadoop;
 
 import org.apache.paimon.format.parquet.ParquetInputFile;
 import org.apache.paimon.format.parquet.ParquetInputStream;
+import org.apache.paimon.fs.FileRange;
 import org.apache.paimon.fs.VectoredReadable;
 import org.apache.paimon.utils.RoaringBitmap32;
 
@@ -74,7 +75,6 @@ import 
org.apache.parquet.internal.filter2.columnindex.RowRanges;
 import org.apache.parquet.internal.hadoop.metadata.IndexReference;
 import org.apache.parquet.io.InputFile;
 import org.apache.parquet.io.ParquetDecodingException;
-import org.apache.parquet.io.ParquetFileRange;
 import org.apache.parquet.io.SeekableInputStream;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.PrimitiveType;
@@ -585,14 +585,8 @@ public class ParquetFileReader implements Closeable {
             List<ConsecutivePartList> allParts, ChunkListBuilder builder) 
throws IOException {
 
         if (shouldUseVectoredIo(allParts)) {
-            try {
-                readVectored(allParts, builder);
-                return;
-            } catch (IllegalArgumentException | UnsupportedOperationException 
e) {
-                // Either the arguments are wrong or somehow this is being 
invoked against
-                // a hadoop release which doesn't have the API and yet somehow 
it got here.
-                LOG.warn("readVectored() failed; falling back to normal IO 
against {}", f, e);
-            }
+            readVectored(allParts, builder);
+            return;
         }
         for (ConsecutivePartList consecutiveChunks : allParts) {
             consecutiveChunks.readAll(f, builder);
@@ -650,7 +644,7 @@ public class ParquetFileReader implements Closeable {
     private void readVectored(List<ConsecutivePartList> allParts, 
ChunkListBuilder builder)
             throws IOException {
 
-        List<ParquetFileRange> ranges = new ArrayList<>(allParts.size());
+        List<FileRange> ranges = new ArrayList<>(allParts.size());
         long totalSize = 0;
         for (ConsecutivePartList consecutiveChunks : allParts) {
             final long len = consecutiveChunks.length;
@@ -658,16 +652,16 @@ public class ParquetFileReader implements Closeable {
                     len < Integer.MAX_VALUE,
                     "Invalid length %s for vectored read operation. It must be 
less than max integer value.",
                     len);
-            ranges.add(new ParquetFileRange(consecutiveChunks.offset, (int) 
len));
+            ranges.add(FileRange.createFileRange(consecutiveChunks.offset, 
(int) len));
             totalSize += len;
         }
         LOG.debug(
                 "Reading {} bytes of data with vectored IO in {} ranges", 
totalSize, ranges.size());
         // Request a vectored read;
-        f.readVectored(ranges, options.getAllocator());
+        ((VectoredReadable) f.in()).readVectored(ranges);
         int k = 0;
         for (ConsecutivePartList consecutivePart : allParts) {
-            ParquetFileRange currRange = ranges.get(k++);
+            FileRange currRange = ranges.get(k++);
             consecutivePart.readFromVectoredRange(currRange, builder);
         }
     }
@@ -1667,9 +1661,9 @@ public class ParquetFileReader implements Closeable {
          * @throws IOException if there is an error while reading from the 
stream, including a
          *     timeout.
          */
-        public void readFromVectoredRange(ParquetFileRange currRange, 
ChunkListBuilder builder)
+        public void readFromVectoredRange(FileRange currRange, 
ChunkListBuilder builder)
                 throws IOException {
-            ByteBuffer buffer;
+            byte[] buffer;
             final long timeoutSeconds = HADOOP_VECTORED_READ_TIMEOUT_SECONDS;
             long readStart = System.nanoTime();
             try {
@@ -1678,8 +1672,7 @@ public class ParquetFileReader implements Closeable {
                         currRange,
                         timeoutSeconds);
                 buffer =
-                        FutureIO.awaitFuture(
-                                currRange.getDataReadFuture(), timeoutSeconds, 
TimeUnit.SECONDS);
+                        FutureIO.awaitFuture(currRange.getData(), 
timeoutSeconds, TimeUnit.SECONDS);
                 setReadMetrics(readStart, currRange.getLength());
                 // report in a counter the data we just scanned
                 BenchmarkCounter.incrementBytesRead(currRange.getLength());
@@ -1691,7 +1684,7 @@ public class ParquetFileReader implements Closeable {
                 LOG.error(error, e);
                 throw new IOException(error, e);
             }
-            ByteBufferInputStream stream = ByteBufferInputStream.wrap(buffer);
+            ByteBufferInputStream stream = 
ByteBufferInputStream.wrap(ByteBuffer.wrap(buffer));
             for (ChunkDescriptor descriptor : chunks) {
                 builder.add(descriptor, stream.sliceBuffers(descriptor.size), 
f);
             }

Reply via email to