This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 245cbad9e [core] Introduce VectoredReadable to SeekableInputStream 
(#3369)
245cbad9e is described below

commit 245cbad9e11c765ec099014ba8f6b310d55f43f0
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed May 29 14:06:24 2024 +0800

    [core] Introduce VectoredReadable to SeekableInputStream (#3369)
---
 LICENSE                                            |   3 +
 .../paimon/benchmark/TableReadBenchmark.java       |  25 +-
 .../java/org/apache/paimon/fs/FileIOUtils.java     |   7 +
 .../main/java/org/apache/paimon/fs/FileRange.java  |  83 +++
 .../org/apache/paimon/fs/VectoredReadUtils.java    | 247 +++++++
 .../org/apache/paimon/fs/VectoredReadable.java     |  79 ++
 .../org/apache/paimon/fs/local/LocalFileIO.java    |  14 +-
 .../org/apache/paimon/utils/BlockingExecutor.java  |  51 ++
 .../java/org/apache/paimon/utils/ThreadUtils.java  |  36 +
 .../apache/paimon/fs/VectoredReadUtilsTest.java    | 130 ++++
 .../org/apache/orc/impl/RecordReaderUtils.java     | 805 +++++++++++++++++++++
 .../paimon/format/fs/FSDataWrappedInputStream.java |  84 +++
 .../paimon/format/fs/HadoopReadOnlyFileSystem.java |  59 --
 13 files changed, 1562 insertions(+), 61 deletions(-)

diff --git a/LICENSE b/LICENSE
index 813296254..8f291d3a0 100644
--- a/LICENSE
+++ b/LICENSE
@@ -209,6 +209,8 @@ Apache Software Foundation License 2.0
 --------------------------------------
 
 paimon-common/src/main/java/org/apache/paimon/fs/Path.java
+paimon-common/src/main/java/org/apache/paimon/fs/FileRange.java
+paimon-common/src/main/java/org/apache/paimon/fs/VectoredReadUtils.java
 from http://hadoop.apache.org/ version 2.10.2
 
 
paimon-common/src/main/java/org/apache/paimon/lookup/hash/HashLookupStoreWriter.java
@@ -258,6 +260,7 @@ from https://hive.apache.org/ version 3.1.0
 paimon-format/src/main/java/org/apache/orc/impl/PhysicalFsWriter.java
 paimon-format/src/main/java/org/apache/orc/impl/WriterImpl.java
 paimon-format/src/main/java/org/apache/orc/impl/ZstdCodec.java
+paimon-format/src/main/java/org/apache/orc/impl/RecordReaderUtils.java
 paimon-format/src/main/java/org/apache/orc/CompressionKind.java
 paimon-format/src/main/java/org/apache/orc/OrcConf.java
 paimon-format/src/main/java/org/apache/orc/OrcFile.java
diff --git 
a/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/TableReadBenchmark.java
 
b/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/TableReadBenchmark.java
index 4047b0d57..5ceaa95f4 100644
--- 
a/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/TableReadBenchmark.java
+++ 
b/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/TableReadBenchmark.java
@@ -31,6 +31,8 @@ import org.apache.paimon.table.source.Split;
 
 import org.junit.jupiter.api.Test;
 
+import javax.annotation.Nullable;
+
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -78,6 +80,20 @@ public class TableReadBenchmark extends TableBenchmark {
          */
     }
 
+    @Test
+    public void testOrcReadProjection() throws Exception {
+        innerTestProjection(
+                Collections.singletonMap("orc", prepareData(orc(), "orc")),
+                new int[] {0, 5, 10, 14});
+        /*
+         * OpenJDK 64-Bit Server VM 1.8.0_292-b10 on Mac OS X 10.16
+         * Apple M1 Pro
+         * read:                            Best/Avg Time(ms)    Row Rate(K/s) 
     Per Row(ns)   Relative
+         * 
------------------------------------------------------------------------------------------------
+         * OPERATORTEST_read_read-orc            716 /  728           4187.4   
         238.8       1.0X
+         */
+    }
+
     private Options orc() {
         Options options = new Options();
         options.set(CoreOptions.FILE_FORMAT, CoreOptions.FILE_FORMAT_ORC);
@@ -97,6 +113,10 @@ public class TableReadBenchmark extends TableBenchmark {
     }
 
     private void innerTest(Map<String, Table> tables) {
+        innerTestProjection(tables, null);
+    }
+
+    private void innerTestProjection(Map<String, Table> tables, @Nullable 
int[] projection) {
         int readTime = 3;
         Benchmark benchmark =
                 new Benchmark("read", readTime * rowCount)
@@ -115,7 +135,10 @@ public class TableReadBenchmark extends TableBenchmark {
                             try {
                                 for (Split split : splits) {
                                     RecordReader<InternalRow> reader =
-                                            
table.newReadBuilder().newRead().createReader(split);
+                                            table.newReadBuilder()
+                                                    .withProjection(projection)
+                                                    .newRead()
+                                                    .createReader(split);
                                     reader.forEachRemaining(row -> 
readCount.incrementAndGet());
                                 }
                                 System.out.printf("Finish read %d rows.\n", 
readCount.get());
diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/FileIOUtils.java 
b/paimon-common/src/main/java/org/apache/paimon/fs/FileIOUtils.java
index f7637c822..556453424 100644
--- a/paimon-common/src/main/java/org/apache/paimon/fs/FileIOUtils.java
+++ b/paimon-common/src/main/java/org/apache/paimon/fs/FileIOUtils.java
@@ -21,10 +21,17 @@ package org.apache.paimon.fs;
 import org.apache.paimon.catalog.CatalogContext;
 
 import java.io.IOException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import static org.apache.paimon.utils.ThreadUtils.newDaemonThreadFactory;
 
 /** Utils for {@link FileIO}. */
 public class FileIOUtils {
 
+    public static final ExecutorService IO_THREAD_POOL =
+            
Executors.newCachedThreadPool(newDaemonThreadFactory("IO-THREAD-POOL"));
+
     public static FileIOLoader checkAccess(FileIOLoader fileIO, Path path, 
CatalogContext config)
             throws IOException {
         if (fileIO == null) {
diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/FileRange.java 
b/paimon-common/src/main/java/org/apache/paimon/fs/FileRange.java
new file mode 100644
index 000000000..2c85ae500
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/fs/FileRange.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.fs;
+
+import java.util.concurrent.CompletableFuture;
+
+/* This file is based on source code from the Hadoop Project 
(http://hadoop.apache.org/), licensed by the Apache
+ * Software Foundation (ASF) under the Apache License, Version 2.0. See the 
NOTICE file distributed with this work for
+ * additional information regarding copyright ownership. */
+
+/** A byte range of a file. */
+public interface FileRange {
+
+    /** Get the starting offset of the range. */
+    long getOffset();
+
+    /** Get the length of the range. */
+    int getLength();
+
+    /** Get the future data for this range. */
+    CompletableFuture<byte[]> getData();
+
+    /**
+     * Factory method to create a FileRange object.
+     *
+     * @param offset starting offset of the range.
+     * @param length length of the range.
+     * @return a new instance of FileRangeImpl.
+     */
+    static FileRange createFileRange(long offset, int length) {
+        return new FileRangeImpl(offset, length);
+    }
+
+    /** An implementation for {@link FileRange}. */
+    class FileRangeImpl implements FileRange {
+
+        private final long offset;
+        private final int length;
+        private final CompletableFuture<byte[]> reader;
+
+        public FileRangeImpl(long offset, int length) {
+            this.offset = offset;
+            this.length = length;
+            this.reader = new CompletableFuture<>();
+        }
+
+        @Override
+        public String toString() {
+            return "range[" + offset + "," + (offset + length) + ")";
+        }
+
+        @Override
+        public long getOffset() {
+            return offset;
+        }
+
+        @Override
+        public int getLength() {
+            return length;
+        }
+
+        @Override
+        public CompletableFuture<byte[]> getData() {
+            return reader;
+        }
+    }
+}
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/fs/VectoredReadUtils.java 
b/paimon-common/src/main/java/org/apache/paimon/fs/VectoredReadUtils.java
new file mode 100644
index 000000000..de7974aca
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/fs/VectoredReadUtils.java
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.fs;
+
+import org.apache.paimon.utils.BlockingExecutor;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.paimon.fs.FileIOUtils.IO_THREAD_POOL;
+import static org.apache.paimon.fs.FileRange.createFileRange;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
+/* This file is based on source code from the Hadoop Project 
(http://hadoop.apache.org/), licensed by the Apache
+ * Software Foundation (ASF) under the Apache License, Version 2.0. See the 
NOTICE file distributed with this work for
+ * additional information regarding copyright ownership. */
+
+/** Utils for {@link VectoredReadable}. */
+public class VectoredReadUtils {
+
+    public static void readVectored(VectoredReadable readable, List<? extends 
FileRange> ranges)
+            throws IOException {
+        if (ranges.isEmpty()) {
+            return;
+        }
+
+        List<CombinedRange> combinedRanges =
+                mergeSortedRanges(validateAndSortRanges(ranges), 
readable.minSeekForVectorReads());
+
+        int parallelism = readable.parallelismForVectorReads();
+        BlockingExecutor executor = new BlockingExecutor(IO_THREAD_POOL, 
parallelism);
+        long batchSize = readable.batchSizeForVectorReads();
+        for (CombinedRange combinedRange : combinedRanges) {
+            if (combinedRange.underlying.size() == 1) {
+                FileRange fileRange = combinedRange.underlying.get(0);
+                executor.submit(() -> readSingleRange(readable, fileRange));
+            } else {
+                List<FileRange> splitBatches = 
combinedRange.splitBatches(batchSize, parallelism);
+                splitBatches.forEach(
+                        range -> executor.submit(() -> 
readSingleRange(readable, range)));
+                List<CompletableFuture<byte[]>> futures =
+                        
splitBatches.stream().map(FileRange::getData).collect(Collectors.toList());
+                CompletableFuture.allOf(futures.toArray(new 
CompletableFuture<?>[0]))
+                        .thenAcceptAsync(
+                                unused -> copyToFileRanges(combinedRange, 
futures), IO_THREAD_POOL);
+            }
+        }
+    }
+
+    private static void readSingleRange(VectoredReadable readable, FileRange 
range) {
+        if (range.getLength() == 0) {
+            range.getData().complete(new byte[0]);
+            return;
+        }
+        try {
+            long position = range.getOffset();
+            int length = range.getLength();
+            byte[] buffer = new byte[length];
+            readable.preadFully(position, buffer, 0, length);
+            range.getData().complete(buffer);
+        } catch (Exception ex) {
+            range.getData().completeExceptionally(ex);
+        }
+    }
+
+    private static void copyToFileRanges(
+            CombinedRange combinedRange, List<CompletableFuture<byte[]>> 
futures) {
+        List<byte[]> segments = new ArrayList<>(futures.size());
+        for (CompletableFuture<byte[]> future : futures) {
+            segments.add(future.join());
+        }
+        long offset = combinedRange.offset;
+        for (FileRange fileRange : combinedRange.underlying) {
+            byte[] buffer = new byte[fileRange.getLength()];
+            copyMultiBytesToBytes(
+                    segments,
+                    (int) (fileRange.getOffset() - offset),
+                    buffer,
+                    fileRange.getLength());
+            fileRange.getData().complete(buffer);
+        }
+    }
+
+    private static void copyMultiBytesToBytes(
+            List<byte[]> segments, int offset, byte[] bytes, int numBytes) {
+        int remainSize = numBytes;
+        for (byte[] segment : segments) {
+            int remain = segment.length - offset;
+            if (remain > 0) {
+                int nCopy = Math.min(remain, remainSize);
+                System.arraycopy(segment, offset, bytes, numBytes - 
remainSize, nCopy);
+                remainSize -= nCopy;
+                // next new segment.
+                offset = 0;
+                if (remainSize == 0) {
+                    return;
+                }
+            } else {
+                // remain is negative, let's advance to next segment
+                // now the offset = offset - segmentSize (-remain)
+                offset = -remain;
+            }
+        }
+    }
+
+    private static List<? extends FileRange> validateAndSortRanges(
+            final List<? extends FileRange> input) throws EOFException {
+        requireNonNull(input, "Null input list");
+        checkArgument(!input.isEmpty(), "Empty input list");
+        final List<? extends FileRange> sortedRanges;
+
+        if (input.size() == 1) {
+            validateRangeRequest(input.get(0));
+            sortedRanges = input;
+        } else {
+            sortedRanges = sortRanges(input);
+            FileRange prev = null;
+            for (final FileRange current : sortedRanges) {
+                validateRangeRequest(current);
+                if (prev != null) {
+                    checkArgument(
+                            current.getOffset() >= prev.getOffset() + 
prev.getLength(),
+                            "Overlapping ranges %s and %s",
+                            prev,
+                            current);
+                }
+                prev = current;
+            }
+        }
+        return sortedRanges;
+    }
+
+    private static void validateRangeRequest(FileRange range) throws 
EOFException {
+        requireNonNull(range, "range is null");
+        checkArgument(range.getLength() >= 0, "length is negative in %s", 
range);
+        if (range.getOffset() < 0) {
+            throw new EOFException("position is negative in range " + range);
+        }
+    }
+
+    private static List<? extends FileRange> sortRanges(List<? extends 
FileRange> input) {
+        List<? extends FileRange> ret = new ArrayList<>(input);
+        ret.sort(Comparator.comparingLong(FileRange::getOffset));
+        return ret;
+    }
+
+    private static List<CombinedRange> mergeSortedRanges(
+            List<? extends FileRange> sortedRanges, int minimumSeek) {
+
+        CombinedRange current = null;
+        List<CombinedRange> result = new ArrayList<>(sortedRanges.size());
+
+        // now merge together the ones that merge
+        for (FileRange range : sortedRanges) {
+            long start = range.getOffset();
+            long end = range.getOffset() + range.getLength();
+            if (current == null || !current.merge(start, end, range, 
minimumSeek)) {
+                current = new CombinedRange(start, end, range);
+                result.add(current);
+            }
+        }
+        return result;
+    }
+
+    private static class CombinedRange {
+
+        private final List<FileRange> underlying = new ArrayList<>();
+        private final long offset;
+
+        private int length;
+        private long dataSize;
+
+        public CombinedRange(long offset, long end, FileRange original) {
+            this.offset = offset;
+            this.length = (int) (end - offset);
+            append(original);
+        }
+
+        private void append(final FileRange range) {
+            this.underlying.add(range);
+            dataSize += range.getLength();
+        }
+
+        public boolean merge(long otherOffset, long otherEnd, FileRange other, 
int minSeek) {
+            long end = offset + length;
+            long newEnd = Math.max(end, otherEnd);
+            if (otherOffset - end >= minSeek) {
+                return false;
+            }
+            this.length = (int) (newEnd - offset);
+            append(other);
+            return true;
+        }
+
+        private List<FileRange> splitBatches(long batchSize, int parallelism) {
+            long expectedSize = Math.max(batchSize, (length / parallelism) + 
1);
+            List<FileRange> splitBatches = new ArrayList<>();
+            long offset = this.offset;
+            long end = offset + length;
+
+            // split only when remain size exceeds twice the batchSize to 
avoid small File IO
+            long minRemain = Math.max(expectedSize, batchSize * 2);
+
+            while (true) {
+                if (end < offset + minRemain) {
+                    int currentLen = (int) (end - offset);
+                    if (currentLen > 0) {
+                        splitBatches.add(createFileRange(offset, currentLen));
+                    }
+                    break;
+                } else {
+                    splitBatches.add(createFileRange(offset, (int) 
expectedSize));
+                    offset += expectedSize;
+                }
+            }
+            return splitBatches;
+        }
+
+        @Override
+        public String toString() {
+            return String.format(
+                    "CombinedRange: range count=%d, data size=%,d", 
underlying.size(), dataSize);
+        }
+    }
+}
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/fs/VectoredReadable.java 
b/paimon-common/src/main/java/org/apache/paimon/fs/VectoredReadable.java
new file mode 100644
index 000000000..a3d3faace
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/fs/VectoredReadable.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.fs;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.List;
+
+/** Stream that permits vectored reading. */
+public interface VectoredReadable {
+
+    /**
+     * Read up to the specified number of bytes, from a given position within 
a file, and return the
+     * number of bytes read. This does not change the current offset of a 
file, and is thread-safe.
+     */
+    int pread(long position, byte[] buffer, int offset, int length) throws 
IOException;
+
+    /**
+     * Read the specified number of bytes fully, from a given position within 
a file. This does not
+     * change the current offset of a file, and is thread-safe.
+     */
+    default void preadFully(long position, byte[] buffer, int offset, int 
length)
+            throws IOException {
+        int readBytes = 0;
+        while (readBytes < length) {
+            int readBytesCurr = pread(position, buffer, offset + readBytes, 
length - readBytes);
+            if (readBytesCurr < 0) {
+                throw new EOFException(
+                        String.format(
+                                "Input Stream closed before all bytes were 
read."
+                                        + " Expected %,d bytes but only read 
%,d bytes. Current position %,d",
+                                length, readBytes, position));
+            }
+            readBytes += readBytesCurr;
+            position += readBytesCurr;
+        }
+    }
+
+    /** The smallest reasonable seek. */
+    default int minSeekForVectorReads() {
+        return 256 * 1024;
+    }
+
+    /** The batch size of data read by a single parallelism. */
+    default int batchSizeForVectorReads() {
+        return 1024 * 1024;
+    }
+
+    /** The read parallelism for vector reads. */
+    default int parallelismForVectorReads() {
+        return 4;
+    }
+
+    /**
+     * Read fully a list of file ranges asynchronously from this file.
+     *
+     * <p>As a result of the call, each range will have 
FileRange.setData(CompletableFuture) called
+     * with a future that when complete will have a ByteBuffer with the data 
from the file's range.
+     */
+    default void readVectored(List<? extends FileRange> ranges) throws 
IOException {
+        VectoredReadUtils.readVectored(this, ranges);
+    }
+}
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/fs/local/LocalFileIO.java 
b/paimon-common/src/main/java/org/apache/paimon/fs/local/LocalFileIO.java
index 55d264f1c..82d8145a2 100644
--- a/paimon-common/src/main/java/org/apache/paimon/fs/local/LocalFileIO.java
+++ b/paimon-common/src/main/java/org/apache/paimon/fs/local/LocalFileIO.java
@@ -24,12 +24,14 @@ import org.apache.paimon.fs.FileStatus;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.PositionOutputStream;
 import org.apache.paimon.fs.SeekableInputStream;
+import org.apache.paimon.fs.VectoredReadable;
 
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 import java.nio.file.AccessDeniedException;
 import java.nio.file.DirectoryNotEmptyException;
@@ -233,7 +235,8 @@ public class LocalFileIO implements FileIO {
     }
 
     /** Local {@link SeekableInputStream}. */
-    public static class LocalSeekableInputStream extends SeekableInputStream {
+    public static class LocalSeekableInputStream extends SeekableInputStream
+            implements VectoredReadable {
 
         private final FileInputStream in;
         private final FileChannel channel;
@@ -269,6 +272,15 @@ public class LocalFileIO implements FileIO {
         public void close() throws IOException {
             in.close();
         }
+
+        @Override
+        public int pread(long position, byte[] b, int off, int len) throws 
IOException {
+            if (len == 0) {
+                return 0;
+            }
+
+            return channel.read(ByteBuffer.wrap(b, off, len), position);
+        }
     }
 
     /** Local {@link PositionOutputStream}. */
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/utils/BlockingExecutor.java 
b/paimon-common/src/main/java/org/apache/paimon/utils/BlockingExecutor.java
new file mode 100644
index 000000000..142312ef4
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/BlockingExecutor.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Semaphore;
+
+/** A executor wrapper to execute with {@link Semaphore}. */
+public class BlockingExecutor {
+
+    private final Semaphore semaphore;
+    private final ExecutorService executor;
+
+    public BlockingExecutor(ExecutorService executor, int permits) {
+        this.semaphore = new Semaphore(permits, true);
+        this.executor = executor;
+    }
+
+    public void submit(Runnable task) {
+        try {
+            semaphore.acquire();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new RuntimeException(e);
+        }
+        executor.submit(
+                () -> {
+                    try {
+                        task.run();
+                    } finally {
+                        semaphore.release();
+                    }
+                });
+    }
+}
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/utils/ThreadUtils.java 
b/paimon-common/src/main/java/org/apache/paimon/utils/ThreadUtils.java
index 25779892b..5479f0653 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/ThreadUtils.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/ThreadUtils.java
@@ -23,6 +23,8 @@ import org.slf4j.Logger;
 import java.lang.management.ManagementFactory;
 import java.lang.management.ThreadInfo;
 import java.util.Arrays;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
 /** Utils for thread. */
@@ -54,4 +56,38 @@ public class ThreadUtils {
         }
         return false;
     }
+
+    public static ThreadFactory newDaemonThreadFactory(final String prefix) {
+        final ThreadFactory namedFactory = getNamedThreadFactory(prefix);
+        return new ThreadFactory() {
+            @Override
+            public Thread newThread(Runnable r) {
+                Thread t = namedFactory.newThread(r);
+                if (!t.isDaemon()) {
+                    t.setDaemon(true);
+                }
+                if (t.getPriority() != Thread.NORM_PRIORITY) {
+                    t.setPriority(Thread.NORM_PRIORITY);
+                }
+                return t;
+            }
+        };
+    }
+
+    private static ThreadFactory getNamedThreadFactory(final String prefix) {
+        SecurityManager s = System.getSecurityManager();
+        final ThreadGroup threadGroup =
+                (s != null) ? s.getThreadGroup() : 
Thread.currentThread().getThreadGroup();
+
+        return new ThreadFactory() {
+            private final AtomicInteger threadNumber = new AtomicInteger(1);
+            private final ThreadGroup group = threadGroup;
+
+            @Override
+            public Thread newThread(Runnable r) {
+                final String name = prefix + "-t" + 
threadNumber.getAndIncrement();
+                return new Thread(group, r, name);
+            }
+        };
+    }
 }
diff --git 
a/paimon-common/src/test/java/org/apache/paimon/fs/VectoredReadUtilsTest.java 
b/paimon-common/src/test/java/org/apache/paimon/fs/VectoredReadUtilsTest.java
new file mode 100644
index 000000000..a3264b08e
--- /dev/null
+++ 
b/paimon-common/src/test/java/org/apache/paimon/fs/VectoredReadUtilsTest.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.fs;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+class VectoredReadUtilsTest {
+
+    private final byte[] bytes;
+    private final VectoredReadable readable;
+
+    public VectoredReadUtilsTest() {
+        this.bytes = new byte[1024 * 1024];
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+        random.nextBytes(bytes);
+        this.readable =
+                new VectoredReadable() {
+                    @Override
+                    public int minSeekForVectorReads() {
+                        return 100;
+                    }
+
+                    @Override
+                    public int batchSizeForVectorReads() {
+                        return 1000;
+                    }
+
+                    @Override
+                    public int pread(long position, byte[] buffer, int offset, 
int length)
+                            throws IOException {
+                        boolean returnAll = random.nextBoolean();
+                        int len = returnAll ? length : random.nextInt(length) 
+ 1;
+                        System.arraycopy(bytes, (int) position, buffer, 
offset, len);
+                        return len;
+                    }
+                };
+    }
+
+    private void doTest(List<FileRange> ranges) throws Exception {
+        VectoredReadUtils.readVectored(readable, ranges);
+        for (FileRange range : ranges) {
+            byte[] expected = new byte[range.getLength()];
+            System.arraycopy(bytes, (int) range.getOffset(), expected, 0, 
range.getLength());
+            assertThat(range.getData().get()).isEqualTo(expected);
+        }
+    }
+
+    @Test
+    public void testNormal() throws Exception {
+        // test empty
+        doTest(Collections.emptyList());
+
+        // test without merge
+        doTest(
+                Arrays.asList(
+                        FileRange.createFileRange(0, 100),
+                        FileRange.createFileRange(100, 200),
+                        FileRange.createFileRange(500, 1000)));
+
+        // test with merge
+        doTest(
+                Arrays.asList(
+                        FileRange.createFileRange(0, 60),
+                        FileRange.createFileRange(100, 90),
+                        FileRange.createFileRange(300, 200)));
+
+        // test with batchSize
+        doTest(
+                Arrays.asList(
+                        FileRange.createFileRange(60, 800),
+                        FileRange.createFileRange(1000, 500),
+                        FileRange.createFileRange(1550, 600)));
+
+        // test with align huge
+        doTest(
+                Arrays.asList(
+                        FileRange.createFileRange(0, 5000),
+                        FileRange.createFileRange(6000, 500),
+                        FileRange.createFileRange(7000, 800)));
+
+        // test with no align huge
+        doTest(
+                Arrays.asList(
+                        FileRange.createFileRange(60, 5120),
+                        FileRange.createFileRange(6020, 520),
+                        FileRange.createFileRange(7300, 850)));
+    }
+
+    @Test
+    public void testRandom() throws Exception {
+        List<FileRange> ranges = new ArrayList<>();
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+        int lastEnd = 0;
+        for (int i = 0; i < random.nextInt(10); i++) {
+            int start = random.nextInt(102 * 1024) + lastEnd;
+            int len = random.nextInt(102 * 1024) + 1;
+            if (start + len > bytes.length) {
+                break;
+            }
+            ranges.add(FileRange.createFileRange(start, len));
+            lastEnd = start + len;
+        }
+        doTest(ranges);
+    }
+}
diff --git 
a/paimon-format/src/main/java/org/apache/orc/impl/RecordReaderUtils.java 
b/paimon-format/src/main/java/org/apache/orc/impl/RecordReaderUtils.java
new file mode 100644
index 000000000..0ceaadb44
--- /dev/null
+++ b/paimon-format/src/main/java/org/apache/orc/impl/RecordReaderUtils.java
@@ -0,0 +1,805 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl;
+
+import org.apache.paimon.format.fs.FSDataWrappedInputStream;
+import org.apache.paimon.fs.FileRange;
+import org.apache.paimon.fs.SeekableInputStream;
+import org.apache.paimon.fs.VectoredReadable;
+
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.io.DiskRangeList;
+import org.apache.orc.CompressionCodec;
+import org.apache.orc.DataReader;
+import org.apache.orc.OrcProto;
+import org.apache.orc.StripeInformation;
+import org.apache.orc.TypeDescription;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Supplier;
+
+/* This file is based on source code from the ORC Project 
(http://orc.apache.org/), licensed by the Apache
+ * Software Foundation (ASF) under the Apache License, Version 2.0. See the 
NOTICE file distributed with this work for
+ * additional information regarding copyright ownership. */
+
+/**
+ * Stateless methods shared between RecordReaderImpl and EncodedReaderImpl.
+ *
+ * <p>NOTE: The file was copied and modified to support {@link 
VectoredReadable}.
+ */
+public class RecordReaderUtils {
+
+    // for uncompressed streams, what is the most overlap with the following 
set
+    // of rows (long vint literal group).
+    static final int WORST_UNCOMPRESSED_SLOP = 2 + 8 * 512;
+    // the maximum number of values that need to be consumed from the run
+    static final int MAX_VALUES_LENGTH = RunLengthIntegerWriterV2.MAX_SCOPE;
+    // the maximum byte width for each value
+    static final int MAX_BYTE_WIDTH =
+            
SerializationUtils.decodeBitWidth(SerializationUtils.FixedBitSizes.SIXTYFOUR.ordinal())
+                    / 8;
+    private static final HadoopShims SHIMS = HadoopShimsFactory.get();
+    private static final Logger LOG = 
LoggerFactory.getLogger(RecordReaderUtils.class);
+    private static final int BYTE_STREAM_POSITIONS = 1;
+    private static final int RUN_LENGTH_BYTE_POSITIONS = BYTE_STREAM_POSITIONS 
+ 1;
+    private static final int BITFIELD_POSITIONS = RUN_LENGTH_BYTE_POSITIONS + 
1;
+    private static final int RUN_LENGTH_INT_POSITIONS = BYTE_STREAM_POSITIONS 
+ 1;
+
+    public static DataReader createDefaultDataReader(DataReaderProperties 
properties) {
+        return new DefaultDataReader(properties);
+    }
+
+    /**
+     * Does region A overlap region B? The end points are inclusive on both 
sides.
+     *
+     * @param leftA A's left point
+     * @param rightA A's right point
+     * @param leftB B's left point
+     * @param rightB B's right point
+     * @return Does region A overlap region B?
+     */
+    static boolean overlap(long leftA, long rightA, long leftB, long rightB) {
+        if (leftA <= leftB) {
+            return rightA >= leftB;
+        }
+        return rightB >= leftA;
+    }
+
+    public static long estimateRgEndOffset(
+            boolean isCompressed,
+            int bufferSize,
+            boolean isLast,
+            long nextGroupOffset,
+            long streamLength) {
+        // Figure out the worst case last location
+        long slop = WORST_UNCOMPRESSED_SLOP;
+        // Stretch the slop by a factor to safely accommodate following 
compression blocks.
+        // We need to calculate the maximum number of blocks(stretchFactor) by 
bufferSize
+        // accordingly.
+        if (isCompressed) {
+            int stretchFactor = 2 + (MAX_VALUES_LENGTH * MAX_BYTE_WIDTH - 1) / 
bufferSize;
+            slop = (long) stretchFactor * (OutStream.HEADER_SIZE + bufferSize);
+        }
+        return isLast ? streamLength : Math.min(streamLength, nextGroupOffset 
+ slop);
+    }
+
+    /**
+     * Get the offset in the index positions for the column that the given 
stream starts.
+     *
+     * @param columnEncoding the encoding of the column
+     * @param columnType the type of the column
+     * @param streamType the kind of the stream
+     * @param isCompressed is the stream compressed?
+     * @param hasNulls does the column have a PRESENT stream?
+     * @return the number of positions that will be used for that stream
+     */
+    public static int getIndexPosition(
+            OrcProto.ColumnEncoding.Kind columnEncoding,
+            TypeDescription.Category columnType,
+            OrcProto.Stream.Kind streamType,
+            boolean isCompressed,
+            boolean hasNulls) {
+        if (streamType == OrcProto.Stream.Kind.PRESENT) {
+            return 0;
+        }
+        int compressionValue = isCompressed ? 1 : 0;
+        int base = hasNulls ? (BITFIELD_POSITIONS + compressionValue) : 0;
+        switch (columnType) {
+            case BOOLEAN:
+            case BYTE:
+            case SHORT:
+            case INT:
+            case LONG:
+            case FLOAT:
+            case DOUBLE:
+            case DATE:
+            case STRUCT:
+            case MAP:
+            case LIST:
+            case UNION:
+                return base;
+            case CHAR:
+            case VARCHAR:
+            case STRING:
+                if (columnEncoding == OrcProto.ColumnEncoding.Kind.DICTIONARY
+                        || columnEncoding == 
OrcProto.ColumnEncoding.Kind.DICTIONARY_V2) {
+                    return base;
+                } else {
+                    if (streamType == OrcProto.Stream.Kind.DATA) {
+                        return base;
+                    } else {
+                        return base + BYTE_STREAM_POSITIONS + compressionValue;
+                    }
+                }
+            case BINARY:
+            case DECIMAL:
+                if (streamType == OrcProto.Stream.Kind.DATA) {
+                    return base;
+                }
+                return base + BYTE_STREAM_POSITIONS + compressionValue;
+            case TIMESTAMP:
+            case TIMESTAMP_INSTANT:
+                if (streamType == OrcProto.Stream.Kind.DATA) {
+                    return base;
+                }
+                return base + RUN_LENGTH_INT_POSITIONS + compressionValue;
+            default:
+                throw new IllegalArgumentException("Unknown type " + 
columnType);
+        }
+    }
+
+    /**
+     * Is this stream part of a dictionary?
+     *
+     * @return is this part of a dictionary?
+     */
+    public static boolean isDictionary(
+            OrcProto.Stream.Kind kind, OrcProto.ColumnEncoding encoding) {
+        assert kind != OrcProto.Stream.Kind.DICTIONARY_COUNT;
+        OrcProto.ColumnEncoding.Kind encodingKind = encoding.getKind();
+        return kind == OrcProto.Stream.Kind.DICTIONARY_DATA
+                || (kind == OrcProto.Stream.Kind.LENGTH
+                        && (encodingKind == 
OrcProto.ColumnEncoding.Kind.DICTIONARY
+                                || encodingKind == 
OrcProto.ColumnEncoding.Kind.DICTIONARY_V2));
+    }
+
+    /**
+     * Build a string representation of a list of disk ranges.
+     *
+     * @param range ranges to stringify
+     * @return the resulting string
+     */
+    public static String stringifyDiskRanges(DiskRangeList range) {
+        StringBuilder buffer = new StringBuilder();
+        buffer.append("[");
+        boolean isFirst = true;
+        while (range != null) {
+            if (!isFirst) {
+                buffer.append(", {");
+            } else {
+                buffer.append("{");
+            }
+            isFirst = false;
+            buffer.append(range);
+            buffer.append("}");
+            range = range.next;
+        }
+        buffer.append("]");
+        return buffer.toString();
+    }
+
+    static long computeEnd(BufferChunk first, BufferChunk last) {
+        long end = 0;
+        for (BufferChunk ptr = first; ptr != last.next; ptr = (BufferChunk) 
ptr.next) {
+            end = Math.max(ptr.getEnd(), end);
+        }
+        return end;
+    }
+
+    /**
+     * Zero-copy read the data from the file based on a list of ranges in a 
single read.
+     *
+     * <p>As a side note, the HDFS zero copy API really sucks from a user's 
point of view.
+     *
+     * @param file the file we're reading from
+     * @param zcr the zero copy shim
+     * @param first the first range to read
+     * @param last the last range to read
+     * @param allocateDirect if we need to allocate buffers, should we use 
direct
+     * @throws IOException
+     */
+    static void zeroCopyReadRanges(
+            FSDataInputStream file,
+            HadoopShims.ZeroCopyReaderShim zcr,
+            BufferChunk first,
+            BufferChunk last,
+            boolean allocateDirect)
+            throws IOException {
+        // read all of the bytes that we need
+        final long offset = first.getOffset();
+        int length = (int) (computeEnd(first, last) - offset);
+        file.seek(offset);
+        List<ByteBuffer> bytes = new ArrayList<>();
+        while (length > 0) {
+            ByteBuffer read = zcr.readBuffer(length, false);
+            bytes.add(read);
+            length -= read.remaining();
+        }
+        long currentOffset = offset;
+
+        // iterate and fill each range
+        BufferChunk current = first;
+        Iterator<ByteBuffer> buffers = bytes.iterator();
+        ByteBuffer currentBuffer = buffers.next();
+        while (current != last.next) {
+
+            // if we are past the start of the range, restart the iterator
+            if (current.getOffset() < offset) {
+                buffers = bytes.iterator();
+                currentBuffer = buffers.next();
+                currentOffset = offset;
+            }
+
+            // walk through the buffers to find the start of the buffer
+            while (currentOffset + currentBuffer.remaining() <= 
current.getOffset()) {
+                currentOffset += currentBuffer.remaining();
+                // We assume that buffers.hasNext is true because we know we 
read
+                // enough data to cover the last range.
+                currentBuffer = buffers.next();
+            }
+
+            // did we get the current range in a single read?
+            if (currentOffset + currentBuffer.remaining() >= current.getEnd()) 
{
+                ByteBuffer copy = currentBuffer.duplicate();
+                copy.position((int) (current.getOffset() - currentOffset));
+                copy.limit(copy.position() + current.getLength());
+                current.setChunk(copy);
+
+            } else {
+                // otherwise, build a single buffer that holds the entire range
+                ByteBuffer result =
+                        allocateDirect
+                                ? 
ByteBuffer.allocateDirect(current.getLength())
+                                : ByteBuffer.allocate(current.getLength());
+                // we know that the range spans buffers
+                ByteBuffer copy = currentBuffer.duplicate();
+                // skip over the front matter
+                copy.position((int) (current.getOffset() - currentOffset));
+                result.put(copy);
+                // advance the buffer
+                currentOffset += currentBuffer.remaining();
+                currentBuffer = buffers.next();
+                while (result.hasRemaining()) {
+                    if (result.remaining() > currentBuffer.remaining()) {
+                        result.put(currentBuffer.duplicate());
+                        currentOffset += currentBuffer.remaining();
+                        currentBuffer = buffers.next();
+                    } else {
+                        copy = currentBuffer.duplicate();
+                        copy.limit(result.remaining());
+                        result.put(copy);
+                    }
+                }
+                result.flip();
+                current.setChunk(result);
+            }
+            current = (BufferChunk) current.next;
+        }
+    }
+
+    /**
+     * Find the list of ranges that should be read in a single read. The read 
will stop when there
+     * is a gap, one of the ranges already has data, or we have reached the 
maximum read size of
+     * 2^31.
+     *
+     * @param first the first range to read
+     * @return the last range to read
+     */
+    static BufferChunk findSingleRead(BufferChunk first) {
+        return findSingleRead(first, 0);
+    }
+
+    /**
+     * Find the list of ranges that should be read in a single read. The read 
will stop when there
+     * is a gap, one of the ranges already has data, or we have reached the 
maximum read size of
+     * 2^31.
+     *
+     * @param first the first range to read
+     * @param minSeekSize minimum size for seek instead of read
+     * @return the last range to read
+     */
+    private static BufferChunk findSingleRead(BufferChunk first, long 
minSeekSize) {
+        BufferChunk last = first;
+        long currentEnd = first.getEnd();
+        while (last.next != null
+                && !last.next.hasData()
+                && last.next.getOffset() <= (currentEnd + minSeekSize)
+                && last.next.getEnd() - first.getOffset() < Integer.MAX_VALUE) 
{
+            last = (BufferChunk) last.next;
+            currentEnd = Math.max(currentEnd, last.getEnd());
+        }
+        return last;
+    }
+
+    /**
+     * Read the list of ranges from the file by updating each range in the 
list with a buffer that
+     * has the bytes from the file.
+     *
+     * <p>The ranges must be sorted, but may overlap or include holes.
+     *
+     * @param file the file to read
+     * @param zcr the zero copy shim
+     * @param list the disk ranges within the file to read
+     * @param doForceDirect allocate direct buffers
+     * @param minSeekSize the minimum gap to prefer seek vs read
+     * @param minSeekSizeTolerance allowed tolerance for extra bytes in memory 
as a result of
+     *     minSeekSize
+     */
+    private static void readDiskRanges(
+            FSDataInputStream file,
+            HadoopShims.ZeroCopyReaderShim zcr,
+            BufferChunkList list,
+            boolean doForceDirect,
+            int minSeekSize,
+            double minSeekSizeTolerance)
+            throws IOException {
+        BufferChunk current = list == null ? null : list.get();
+        while (current != null) {
+            while (current.hasData()) {
+                current = (BufferChunk) current.next;
+            }
+            if (zcr != null) {
+                BufferChunk last = findSingleRead(current);
+                zeroCopyReadRanges(file, zcr, current, last, doForceDirect);
+                current = (BufferChunk) last.next;
+            } else {
+                ChunkReader chunkReader = ChunkReader.create(current, 
minSeekSize);
+                chunkReader.readRanges(file, doForceDirect, 
minSeekSizeTolerance);
+                current = (BufferChunk) chunkReader.to.next;
+            }
+        }
+    }
+
+    /** Read the list of ranges from the file by updating each range in the 
list. */
+    private static void readDiskRangesVectored(
+            VectoredReadable fileInputStream, BufferChunkList range, boolean 
doForceDirect)
+            throws IOException {
+        if (range == null) {
+            return;
+        }
+
+        if (doForceDirect) {
+            throw new UnsupportedOperationException();
+        }
+
+        List<FileRange> fileRanges = new ArrayList<>();
+        Map<FileRange, BufferChunk> map = new HashMap<>();
+        BufferChunk cur = range.get();
+        while (cur != null) {
+            if (!cur.hasData()) {
+                FileRange fileRange = 
FileRange.createFileRange(cur.getOffset(), cur.getLength());
+                fileRanges.add(fileRange);
+                map.put(fileRange, cur);
+            }
+            cur = (BufferChunk) cur.next;
+        }
+        fileInputStream.readVectored(fileRanges);
+
+        for (FileRange r : fileRanges) {
+            cur = map.get(r);
+            try {
+                cur.setChunk(ByteBuffer.wrap(r.getData().get()));
+            } catch (InterruptedException | ExecutionException e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    static HadoopShims.ZeroCopyReaderShim createZeroCopyShim(
+            FSDataInputStream file, CompressionCodec codec, 
ByteBufferAllocatorPool pool)
+            throws IOException {
+        if ((codec == null
+                || ((codec instanceof DirectDecompressionCodec)
+                        && ((DirectDecompressionCodec) codec).isAvailable()))) 
{
+            /* codec is null or is available */
+            return SHIMS.getZeroCopyReader(file, pool);
+        }
+        return null;
+    }
+
+    private static class DefaultDataReader implements DataReader {
+        private final Supplier<FileSystem> fileSystemSupplier;
+        private final Path path;
+        private final boolean useZeroCopy;
+        private final int minSeekSize;
+        private final double minSeekSizeTolerance;
+        private FSDataInputStream file;
+        private ByteBufferAllocatorPool pool;
+        private HadoopShims.ZeroCopyReaderShim zcr = null;
+        private InStream.StreamOptions options;
+        private boolean isOpen = false;
+
+        private DefaultDataReader(DataReaderProperties properties) {
+            this.fileSystemSupplier = properties.getFileSystemSupplier();
+            this.path = properties.getPath();
+            this.file = properties.getFile();
+            this.useZeroCopy = properties.getZeroCopy();
+            this.options = properties.getCompression();
+            this.minSeekSize = properties.getMinSeekSize();
+            this.minSeekSizeTolerance = properties.getMinSeekSizeTolerance();
+        }
+
+        @Override
+        public void open() throws IOException {
+            if (file == null) {
+                this.file = fileSystemSupplier.get().open(path);
+            }
+            if (useZeroCopy) {
+                // ZCR only uses codec for boolean checks.
+                pool = new ByteBufferAllocatorPool();
+                zcr = RecordReaderUtils.createZeroCopyShim(file, 
options.getCodec(), pool);
+            } else {
+                zcr = null;
+            }
+            isOpen = true;
+        }
+
+        @Override
+        public OrcProto.StripeFooter readStripeFooter(StripeInformation 
stripe) throws IOException {
+            if (!isOpen) {
+                open();
+            }
+            long offset = stripe.getOffset() + stripe.getIndexLength() + 
stripe.getDataLength();
+            int tailLength = (int) stripe.getFooterLength();
+
+            // read the footer
+            ByteBuffer tailBuf = ByteBuffer.allocate(tailLength);
+            file.readFully(offset, tailBuf.array(), tailBuf.arrayOffset(), 
tailLength);
+            return OrcProto.StripeFooter.parseFrom(
+                    InStream.createCodedInputStream(
+                            InStream.create(
+                                    "footer",
+                                    new BufferChunk(tailBuf, 0),
+                                    0,
+                                    tailLength,
+                                    options)));
+        }
+
+        @Override
+        public BufferChunkList readFileData(BufferChunkList range, boolean 
doForceDirect)
+                throws IOException {
+            SeekableInputStream wrapped =
+                    ((FSDataWrappedInputStream) 
file.getWrappedStream()).wrapped();
+            if (zcr == null && wrapped instanceof VectoredReadable) {
+                RecordReaderUtils.readDiskRangesVectored(
+                        (VectoredReadable) wrapped, range, doForceDirect);
+            } else {
+                RecordReaderUtils.readDiskRanges(
+                        file, zcr, range, doForceDirect, minSeekSize, 
minSeekSizeTolerance);
+            }
+
+            return range;
+        }
+
+        @Override
+        public void close() throws IOException {
+            if (options.getCodec() != null) {
+                OrcCodecPool.returnCodec(options.getCodec().getKind(), 
options.getCodec());
+                options.withCodec(null);
+            }
+            if (pool != null) {
+                pool.clear();
+            }
+            // close both zcr and file
+            try (HadoopShims.ZeroCopyReaderShim myZcr = zcr) {
+                if (file != null) {
+                    file.close();
+                    file = null;
+                }
+            }
+        }
+
+        @Override
+        public boolean isTrackingDiskRanges() {
+            return zcr != null;
+        }
+
+        @Override
+        public void releaseBuffer(ByteBuffer buffer) {
+            zcr.releaseBuffer(buffer);
+        }
+
+        @Override
+        public DataReader clone() {
+            if (this.file != null) {
+                // We should really throw here, but that will cause failures 
in Hive.
+                // While Hive uses clone, just log a warning.
+                LOG.warn(
+                        "Cloning an opened DataReader; the stream will be 
reused and closed twice");
+            }
+            try {
+                DefaultDataReader clone = (DefaultDataReader) super.clone();
+                if (options.getCodec() != null) {
+                    // Make sure we don't share the same codec between two 
readers.
+                    clone.options = options.clone();
+                }
+                return clone;
+            } catch (CloneNotSupportedException e) {
+                throw new UnsupportedOperationException("uncloneable", e);
+            }
+        }
+
+        @Override
+        public InStream.StreamOptions getCompressionOptions() {
+            return options;
+        }
+    }
+
+    /**
+     * this is an implementation copied from ElasticByteBufferPool in 
hadoop-2, which lacks a
+     * clear()/clean() operation.
+     */
+    public static final class ByteBufferAllocatorPool implements 
HadoopShims.ByteBufferPoolShim {
+        private final TreeMap<Key, ByteBuffer> buffers = new TreeMap<>();
+        private final TreeMap<Key, ByteBuffer> directBuffers = new TreeMap<>();
+        private long currentGeneration = 0;
+
+        private TreeMap<Key, ByteBuffer> getBufferTree(boolean direct) {
+            return direct ? directBuffers : buffers;
+        }
+
+        public void clear() {
+            buffers.clear();
+            directBuffers.clear();
+        }
+
+        @Override
+        public ByteBuffer getBuffer(boolean direct, int length) {
+            TreeMap<Key, ByteBuffer> tree = getBufferTree(direct);
+            Map.Entry<Key, ByteBuffer> entry = tree.ceilingEntry(new 
Key(length, 0));
+            if (entry == null) {
+                return direct ? ByteBuffer.allocateDirect(length) : 
ByteBuffer.allocate(length);
+            }
+            tree.remove(entry.getKey());
+            return entry.getValue();
+        }
+
+        @Override
+        public void putBuffer(ByteBuffer buffer) {
+            TreeMap<Key, ByteBuffer> tree = getBufferTree(buffer.isDirect());
+            Key key;
+
+            // Buffers are indexed by (capacity, generation).
+            // If our key is not unique on the first try, try again
+            do {
+                key = new Key(buffer.capacity(), currentGeneration++);
+            } while (tree.putIfAbsent(key, buffer) != null);
+        }
+
+        private static final class Key implements Comparable<Key> {
+            private final int capacity;
+            private final long insertionGeneration;
+
+            Key(int capacity, long insertionGeneration) {
+                this.capacity = capacity;
+                this.insertionGeneration = insertionGeneration;
+            }
+
+            @Override
+            public int compareTo(Key other) {
+                final int c = Integer.compare(capacity, other.capacity);
+                return (c != 0) ? c : Long.compare(insertionGeneration, 
other.insertionGeneration);
+            }
+
+            @Override
+            public boolean equals(Object rhs) {
+                if (rhs instanceof Key) {
+                    Key o = (Key) rhs;
+                    return 0 == compareTo(o);
+                }
+                return false;
+            }
+
+            @Override
+            public int hashCode() {
+                return new HashCodeBuilder()
+                        .append(capacity)
+                        .append(insertionGeneration)
+                        .toHashCode();
+            }
+        }
+    }
+
+    static class ChunkReader {
+        private final BufferChunk from;
+        private final BufferChunk to;
+        private final int readBytes;
+        private final int reqBytes;
+
+        private ChunkReader(BufferChunk from, BufferChunk to, int readSize, 
int reqBytes) {
+            this.from = from;
+            this.to = to;
+            this.readBytes = readSize;
+            this.reqBytes = reqBytes;
+        }
+
+        static ChunkReader create(BufferChunk from, BufferChunk to) {
+            long f = Integer.MAX_VALUE;
+            long e = Integer.MIN_VALUE;
+
+            long cf = Integer.MAX_VALUE;
+            long ef = Integer.MIN_VALUE;
+            int reqBytes = 0;
+
+            BufferChunk current = from;
+            while (current != to.next) {
+                f = Math.min(f, current.getOffset());
+                e = Math.max(e, current.getEnd());
+                if (ef == Integer.MIN_VALUE || current.getOffset() <= ef) {
+                    cf = Math.min(cf, current.getOffset());
+                    ef = Math.max(ef, current.getEnd());
+                } else {
+                    reqBytes += ef - cf;
+                    cf = current.getOffset();
+                    ef = current.getEnd();
+                }
+                current = (BufferChunk) current.next;
+            }
+            reqBytes += ef - cf;
+            return new ChunkReader(from, to, (int) (e - f), reqBytes);
+        }
+
+        static ChunkReader create(BufferChunk from, int minSeekSize) {
+            BufferChunk to = findSingleRead(from, minSeekSize);
+            return create(from, to);
+        }
+
+        double getExtraBytesFraction() {
+            return (readBytes - reqBytes) / ((double) reqBytes);
+        }
+
+        public int getReadBytes() {
+            return readBytes;
+        }
+
+        public int getReqBytes() {
+            return reqBytes;
+        }
+
+        public BufferChunk getFrom() {
+            return from;
+        }
+
+        public BufferChunk getTo() {
+            return to;
+        }
+
+        void populateChunks(ByteBuffer bytes, boolean allocateDirect, double 
extraByteTolerance) {
+            if (getExtraBytesFraction() > extraByteTolerance) {
+                LOG.debug(
+                        "ExtraBytesFraction = {}, ExtraByteTolerance = {}, 
reducing memory size",
+                        getExtraBytesFraction(),
+                        extraByteTolerance);
+                populateChunksReduceSize(bytes, allocateDirect);
+            } else {
+                LOG.debug(
+                        "ExtraBytesFraction = {}, ExtraByteTolerance = {}, 
populating as is",
+                        getExtraBytesFraction(),
+                        extraByteTolerance);
+                populateChunksAsIs(bytes);
+            }
+        }
+
+        void populateChunksAsIs(ByteBuffer bytes) {
+            // populate each BufferChunks with the data
+            BufferChunk current = from;
+            long offset = from.getOffset();
+            while (current != to.next) {
+                ByteBuffer currentBytes = current == to ? bytes : 
bytes.duplicate();
+                currentBytes.position((int) (current.getOffset() - offset));
+                currentBytes.limit((int) (current.getEnd() - offset));
+                current.setChunk(currentBytes);
+                current = (BufferChunk) current.next;
+            }
+        }
+
+        void populateChunksReduceSize(ByteBuffer bytes, boolean 
allocateDirect) {
+            ByteBuffer newBuffer;
+            if (allocateDirect) {
+                newBuffer = ByteBuffer.allocateDirect(reqBytes);
+                newBuffer.position(reqBytes);
+                newBuffer.flip();
+            } else {
+                byte[] newBytes = new byte[reqBytes];
+                newBuffer = ByteBuffer.wrap(newBytes);
+            }
+
+            final long offset = from.getOffset();
+            int copyStart = 0;
+            int copyEnd;
+            int copyLength;
+            int skippedBytes = 0;
+            int srcPosition;
+            BufferChunk current = from;
+            while (current != to.next) {
+                // We can skip bytes as required, but no need to copy bytes 
that are already copied
+                srcPosition = (int) (current.getOffset() - offset);
+                skippedBytes += Math.max(0, srcPosition - copyStart);
+                copyStart = Math.max(copyStart, srcPosition);
+                copyEnd = (int) (current.getEnd() - offset);
+                copyLength = copyStart < copyEnd ? copyEnd - copyStart : 0;
+                newBuffer.put(bytes.array(), copyStart, copyLength);
+                copyStart += copyLength;
+                // Set up new ByteBuffer that wraps on the same backing array
+                ByteBuffer currentBytes = current == to ? newBuffer : 
newBuffer.duplicate();
+                currentBytes.position(srcPosition - skippedBytes);
+                currentBytes.limit(currentBytes.position() + 
current.getLength());
+                current.setChunk(currentBytes);
+                current = (BufferChunk) current.next;
+            }
+        }
+
+        /**
+         * Read the data from the file based on a list of ranges in a single 
read.
+         *
+         * @param file the file to read from
+         * @param allocateDirect should we use direct buffers
+         */
+        void readRanges(FSDataInputStream file, boolean allocateDirect, double 
extraByteTolerance)
+                throws IOException {
+            // assume that the chunks are sorted by offset
+            long offset = from.getOffset();
+            int readSize = (int) (computeEnd(from, to) - offset);
+            byte[] buffer = new byte[readSize];
+            try {
+                file.readFully(offset, buffer, 0, buffer.length);
+            } catch (IOException e) {
+                throw new IOException(
+                        String.format("Failed while reading %s %d:%d", file, 
offset, buffer.length),
+                        e);
+            }
+
+            // get the data into a ByteBuffer
+            ByteBuffer bytes;
+            if (allocateDirect) {
+                bytes = ByteBuffer.allocateDirect(readSize);
+                bytes.put(buffer);
+                bytes.flip();
+            } else {
+                bytes = ByteBuffer.wrap(buffer);
+            }
+
+            // populate each BufferChunks with the data
+            populateChunks(bytes, allocateDirect, extraByteTolerance);
+        }
+    }
+}
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/fs/FSDataWrappedInputStream.java
 
b/paimon-format/src/main/java/org/apache/paimon/format/fs/FSDataWrappedInputStream.java
new file mode 100644
index 000000000..576fd8afe
--- /dev/null
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/fs/FSDataWrappedInputStream.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.fs;
+
+import org.apache.paimon.fs.SeekableInputStream;
+import org.apache.paimon.utils.IOUtils;
+
+import org.apache.hadoop.fs.PositionedReadable;
+import org.apache.hadoop.fs.Seekable;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/** A {@link InputStream} to wrap {@link SeekableInputStream} for Paimon's 
input streams. */
+public class FSDataWrappedInputStream extends InputStream implements Seekable, 
PositionedReadable {
+
+    private final SeekableInputStream seekableInputStream;
+
+    public FSDataWrappedInputStream(SeekableInputStream seekableInputStream) {
+        this.seekableInputStream = seekableInputStream;
+    }
+
+    public SeekableInputStream wrapped() {
+        return seekableInputStream;
+    }
+
+    @Override
+    public int read() throws IOException {
+        return seekableInputStream.read();
+    }
+
+    @Override
+    public int read(long position, byte[] buffer, int offset, int length) 
throws IOException {
+        seekableInputStream.seek(position);
+        return seekableInputStream.read(buffer, offset, length);
+    }
+
+    @Override
+    public void readFully(long position, byte[] buffer, int offset, int 
length) throws IOException {
+        seekableInputStream.seek(position);
+        IOUtils.readFully(seekableInputStream, buffer, offset, length);
+    }
+
+    @Override
+    public void readFully(long position, byte[] buffer) throws IOException {
+        readFully(position, buffer, 0, buffer.length);
+    }
+
+    @Override
+    public void seek(long pos) throws IOException {
+        seekableInputStream.seek(pos);
+    }
+
+    @Override
+    public long getPos() throws IOException {
+        return seekableInputStream.getPos();
+    }
+
+    @Override
+    public boolean seekToNewSource(long targetPos) {
+        return false;
+    }
+
+    @Override
+    public void close() throws IOException {
+        seekableInputStream.close();
+    }
+}
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/fs/HadoopReadOnlyFileSystem.java
 
b/paimon-format/src/main/java/org/apache/paimon/format/fs/HadoopReadOnlyFileSystem.java
index 1dcc0da00..57454368c 100644
--- 
a/paimon-format/src/main/java/org/apache/paimon/format/fs/HadoopReadOnlyFileSystem.java
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/fs/HadoopReadOnlyFileSystem.java
@@ -19,21 +19,16 @@
 package org.apache.paimon.format.fs;
 
 import org.apache.paimon.fs.FileIO;
-import org.apache.paimon.fs.SeekableInputStream;
-import org.apache.paimon.utils.IOUtils;
 
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PositionedReadable;
-import org.apache.hadoop.fs.Seekable;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.util.Progressable;
 
 import java.io.IOException;
-import java.io.InputStream;
 import java.net.URI;
 
 /** A read only {@link FileSystem} that wraps an {@link FileIO}. */
@@ -138,58 +133,4 @@ public class HadoopReadOnlyFileSystem extends FileSystem {
     public boolean mkdirs(Path path, FsPermission fsPermission) {
         throw new UnsupportedOperationException();
     }
-
-    /** A {@link InputStream} to wrap {@link SeekableInputStream} for Paimon's 
input streams. */
-    private static class FSDataWrappedInputStream extends InputStream
-            implements Seekable, PositionedReadable {
-
-        private final SeekableInputStream seekableInputStream;
-
-        private FSDataWrappedInputStream(SeekableInputStream 
seekableInputStream) {
-            this.seekableInputStream = seekableInputStream;
-        }
-
-        @Override
-        public int read() throws IOException {
-            return seekableInputStream.read();
-        }
-
-        @Override
-        public int read(long position, byte[] buffer, int offset, int length) 
throws IOException {
-            seekableInputStream.seek(position);
-            return seekableInputStream.read(buffer, offset, length);
-        }
-
-        @Override
-        public void readFully(long position, byte[] buffer, int offset, int 
length)
-                throws IOException {
-            seekableInputStream.seek(position);
-            IOUtils.readFully(seekableInputStream, buffer, offset, length);
-        }
-
-        @Override
-        public void readFully(long position, byte[] buffer) throws IOException 
{
-            readFully(position, buffer, 0, buffer.length);
-        }
-
-        @Override
-        public void seek(long pos) throws IOException {
-            seekableInputStream.seek(pos);
-        }
-
-        @Override
-        public long getPos() throws IOException {
-            return seekableInputStream.getPos();
-        }
-
-        @Override
-        public boolean seekToNewSource(long targetPos) {
-            return false;
-        }
-
-        @Override
-        public void close() throws IOException {
-            seekableInputStream.close();
-        }
-    }
 }

Reply via email to