This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 245cbad9e [core] Introduce VectoredReadable to SeekableInputStream
(#3369)
245cbad9e is described below
commit 245cbad9e11c765ec099014ba8f6b310d55f43f0
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed May 29 14:06:24 2024 +0800
[core] Introduce VectoredReadable to SeekableInputStream (#3369)
---
LICENSE | 3 +
.../paimon/benchmark/TableReadBenchmark.java | 25 +-
.../java/org/apache/paimon/fs/FileIOUtils.java | 7 +
.../main/java/org/apache/paimon/fs/FileRange.java | 83 +++
.../org/apache/paimon/fs/VectoredReadUtils.java | 247 +++++++
.../org/apache/paimon/fs/VectoredReadable.java | 79 ++
.../org/apache/paimon/fs/local/LocalFileIO.java | 14 +-
.../org/apache/paimon/utils/BlockingExecutor.java | 51 ++
.../java/org/apache/paimon/utils/ThreadUtils.java | 36 +
.../apache/paimon/fs/VectoredReadUtilsTest.java | 130 ++++
.../org/apache/orc/impl/RecordReaderUtils.java | 805 +++++++++++++++++++++
.../paimon/format/fs/FSDataWrappedInputStream.java | 84 +++
.../paimon/format/fs/HadoopReadOnlyFileSystem.java | 59 --
13 files changed, 1562 insertions(+), 61 deletions(-)
diff --git a/LICENSE b/LICENSE
index 813296254..8f291d3a0 100644
--- a/LICENSE
+++ b/LICENSE
@@ -209,6 +209,8 @@ Apache Software Foundation License 2.0
--------------------------------------
paimon-common/src/main/java/org/apache/paimon/fs/Path.java
+paimon-common/src/main/java/org/apache/paimon/fs/FileRange.java
+paimon-common/src/main/java/org/apache/paimon/fs/VectoredReadUtils.java
from http://hadoop.apache.org/ version 2.10.2
paimon-common/src/main/java/org/apache/paimon/lookup/hash/HashLookupStoreWriter.java
@@ -258,6 +260,7 @@ from https://hive.apache.org/ version 3.1.0
paimon-format/src/main/java/org/apache/orc/impl/PhysicalFsWriter.java
paimon-format/src/main/java/org/apache/orc/impl/WriterImpl.java
paimon-format/src/main/java/org/apache/orc/impl/ZstdCodec.java
+paimon-format/src/main/java/org/apache/orc/impl/RecordReaderUtils.java
paimon-format/src/main/java/org/apache/orc/CompressionKind.java
paimon-format/src/main/java/org/apache/orc/OrcConf.java
paimon-format/src/main/java/org/apache/orc/OrcFile.java
diff --git
a/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/TableReadBenchmark.java
b/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/TableReadBenchmark.java
index 4047b0d57..5ceaa95f4 100644
---
a/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/TableReadBenchmark.java
+++
b/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/TableReadBenchmark.java
@@ -31,6 +31,8 @@ import org.apache.paimon.table.source.Split;
import org.junit.jupiter.api.Test;
+import javax.annotation.Nullable;
+
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -78,6 +80,20 @@ public class TableReadBenchmark extends TableBenchmark {
*/
}
+ @Test
+ public void testOrcReadProjection() throws Exception {
+ innerTestProjection(
+ Collections.singletonMap("orc", prepareData(orc(), "orc")),
+ new int[] {0, 5, 10, 14});
+ /*
+ * OpenJDK 64-Bit Server VM 1.8.0_292-b10 on Mac OS X 10.16
+ * Apple M1 Pro
+ * read: Best/Avg Time(ms) Row Rate(K/s)
Per Row(ns) Relative
+ *
------------------------------------------------------------------------------------------------
+ * OPERATORTEST_read_read-orc 716 / 728 4187.4
238.8 1.0X
+ */
+ }
+
private Options orc() {
Options options = new Options();
options.set(CoreOptions.FILE_FORMAT, CoreOptions.FILE_FORMAT_ORC);
@@ -97,6 +113,10 @@ public class TableReadBenchmark extends TableBenchmark {
}
private void innerTest(Map<String, Table> tables) {
+ innerTestProjection(tables, null);
+ }
+
+ private void innerTestProjection(Map<String, Table> tables, @Nullable
int[] projection) {
int readTime = 3;
Benchmark benchmark =
new Benchmark("read", readTime * rowCount)
@@ -115,7 +135,10 @@ public class TableReadBenchmark extends TableBenchmark {
try {
for (Split split : splits) {
RecordReader<InternalRow> reader =
-
table.newReadBuilder().newRead().createReader(split);
+ table.newReadBuilder()
+ .withProjection(projection)
+ .newRead()
+ .createReader(split);
reader.forEachRemaining(row ->
readCount.incrementAndGet());
}
System.out.printf("Finish read %d rows.\n",
readCount.get());
diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/FileIOUtils.java
b/paimon-common/src/main/java/org/apache/paimon/fs/FileIOUtils.java
index f7637c822..556453424 100644
--- a/paimon-common/src/main/java/org/apache/paimon/fs/FileIOUtils.java
+++ b/paimon-common/src/main/java/org/apache/paimon/fs/FileIOUtils.java
@@ -21,10 +21,17 @@ package org.apache.paimon.fs;
import org.apache.paimon.catalog.CatalogContext;
import java.io.IOException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import static org.apache.paimon.utils.ThreadUtils.newDaemonThreadFactory;
/** Utils for {@link FileIO}. */
public class FileIOUtils {
+ public static final ExecutorService IO_THREAD_POOL =
+
Executors.newCachedThreadPool(newDaemonThreadFactory("IO-THREAD-POOL"));
+
public static FileIOLoader checkAccess(FileIOLoader fileIO, Path path,
CatalogContext config)
throws IOException {
if (fileIO == null) {
diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/FileRange.java
b/paimon-common/src/main/java/org/apache/paimon/fs/FileRange.java
new file mode 100644
index 000000000..2c85ae500
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/fs/FileRange.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.fs;
+
+import java.util.concurrent.CompletableFuture;
+
+/* This file is based on source code from the Hadoop Project
(http://hadoop.apache.org/), licensed by the Apache
+ * Software Foundation (ASF) under the Apache License, Version 2.0. See the
NOTICE file distributed with this work for
+ * additional information regarding copyright ownership. */
+
+/** A byte range of a file. */
+public interface FileRange {
+
+ /** Get the starting offset of the range. */
+ long getOffset();
+
+ /** Get the length of the range. */
+ int getLength();
+
+ /** Get the future data for this range. */
+ CompletableFuture<byte[]> getData();
+
+ /**
+ * Factory method to create a FileRange object.
+ *
+ * @param offset starting offset of the range.
+ * @param length length of the range.
+ * @return a new instance of FileRangeImpl.
+ */
+ static FileRange createFileRange(long offset, int length) {
+ return new FileRangeImpl(offset, length);
+ }
+
+ /** An implementation for {@link FileRange}. */
+ class FileRangeImpl implements FileRange {
+
+ private final long offset;
+ private final int length;
+ private final CompletableFuture<byte[]> reader;
+
+ public FileRangeImpl(long offset, int length) {
+ this.offset = offset;
+ this.length = length;
+ this.reader = new CompletableFuture<>();
+ }
+
+ @Override
+ public String toString() {
+ return "range[" + offset + "," + (offset + length) + ")";
+ }
+
+ @Override
+ public long getOffset() {
+ return offset;
+ }
+
+ @Override
+ public int getLength() {
+ return length;
+ }
+
+ @Override
+ public CompletableFuture<byte[]> getData() {
+ return reader;
+ }
+ }
+}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/fs/VectoredReadUtils.java
b/paimon-common/src/main/java/org/apache/paimon/fs/VectoredReadUtils.java
new file mode 100644
index 000000000..de7974aca
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/fs/VectoredReadUtils.java
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.fs;
+
+import org.apache.paimon.utils.BlockingExecutor;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.paimon.fs.FileIOUtils.IO_THREAD_POOL;
+import static org.apache.paimon.fs.FileRange.createFileRange;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
+/* This file is based on source code from the Hadoop Project
(http://hadoop.apache.org/), licensed by the Apache
+ * Software Foundation (ASF) under the Apache License, Version 2.0. See the
NOTICE file distributed with this work for
+ * additional information regarding copyright ownership. */
+
+/** Utils for {@link VectoredReadable}. */
+public class VectoredReadUtils {
+
+ public static void readVectored(VectoredReadable readable, List<? extends
FileRange> ranges)
+ throws IOException {
+ if (ranges.isEmpty()) {
+ return;
+ }
+
+ List<CombinedRange> combinedRanges =
+ mergeSortedRanges(validateAndSortRanges(ranges),
readable.minSeekForVectorReads());
+
+ int parallelism = readable.parallelismForVectorReads();
+ BlockingExecutor executor = new BlockingExecutor(IO_THREAD_POOL,
parallelism);
+ long batchSize = readable.batchSizeForVectorReads();
+ for (CombinedRange combinedRange : combinedRanges) {
+ if (combinedRange.underlying.size() == 1) {
+ FileRange fileRange = combinedRange.underlying.get(0);
+ executor.submit(() -> readSingleRange(readable, fileRange));
+ } else {
+ List<FileRange> splitBatches =
combinedRange.splitBatches(batchSize, parallelism);
+ splitBatches.forEach(
+ range -> executor.submit(() ->
readSingleRange(readable, range)));
+ List<CompletableFuture<byte[]>> futures =
+
splitBatches.stream().map(FileRange::getData).collect(Collectors.toList());
+ CompletableFuture.allOf(futures.toArray(new
CompletableFuture<?>[0]))
+ .thenAcceptAsync(
+ unused -> copyToFileRanges(combinedRange,
futures), IO_THREAD_POOL);
+ }
+ }
+ }
+
+ private static void readSingleRange(VectoredReadable readable, FileRange
range) {
+ if (range.getLength() == 0) {
+ range.getData().complete(new byte[0]);
+ return;
+ }
+ try {
+ long position = range.getOffset();
+ int length = range.getLength();
+ byte[] buffer = new byte[length];
+ readable.preadFully(position, buffer, 0, length);
+ range.getData().complete(buffer);
+ } catch (Exception ex) {
+ range.getData().completeExceptionally(ex);
+ }
+ }
+
+ private static void copyToFileRanges(
+ CombinedRange combinedRange, List<CompletableFuture<byte[]>>
futures) {
+ List<byte[]> segments = new ArrayList<>(futures.size());
+ for (CompletableFuture<byte[]> future : futures) {
+ segments.add(future.join());
+ }
+ long offset = combinedRange.offset;
+ for (FileRange fileRange : combinedRange.underlying) {
+ byte[] buffer = new byte[fileRange.getLength()];
+ copyMultiBytesToBytes(
+ segments,
+ (int) (fileRange.getOffset() - offset),
+ buffer,
+ fileRange.getLength());
+ fileRange.getData().complete(buffer);
+ }
+ }
+
+ private static void copyMultiBytesToBytes(
+ List<byte[]> segments, int offset, byte[] bytes, int numBytes) {
+ int remainSize = numBytes;
+ for (byte[] segment : segments) {
+ int remain = segment.length - offset;
+ if (remain > 0) {
+ int nCopy = Math.min(remain, remainSize);
+ System.arraycopy(segment, offset, bytes, numBytes -
remainSize, nCopy);
+ remainSize -= nCopy;
+ // next new segment.
+ offset = 0;
+ if (remainSize == 0) {
+ return;
+ }
+ } else {
+ // remain is zero or negative: this segment ends at or before the offset, so skip it
+ // and carry the leftover offset (offset - segment.length, i.e. -remain) to the next one
+ offset = -remain;
+ }
+ }
+ }
+
+ private static List<? extends FileRange> validateAndSortRanges(
+ final List<? extends FileRange> input) throws EOFException {
+ requireNonNull(input, "Null input list");
+ checkArgument(!input.isEmpty(), "Empty input list");
+ final List<? extends FileRange> sortedRanges;
+
+ if (input.size() == 1) {
+ validateRangeRequest(input.get(0));
+ sortedRanges = input;
+ } else {
+ sortedRanges = sortRanges(input);
+ FileRange prev = null;
+ for (final FileRange current : sortedRanges) {
+ validateRangeRequest(current);
+ if (prev != null) {
+ checkArgument(
+ current.getOffset() >= prev.getOffset() +
prev.getLength(),
+ "Overlapping ranges %s and %s",
+ prev,
+ current);
+ }
+ prev = current;
+ }
+ }
+ return sortedRanges;
+ }
+
+ private static void validateRangeRequest(FileRange range) throws
EOFException {
+ requireNonNull(range, "range is null");
+ checkArgument(range.getLength() >= 0, "length is negative in %s",
range);
+ if (range.getOffset() < 0) {
+ throw new EOFException("position is negative in range " + range);
+ }
+ }
+
+ private static List<? extends FileRange> sortRanges(List<? extends
FileRange> input) {
+ List<? extends FileRange> ret = new ArrayList<>(input);
+ ret.sort(Comparator.comparingLong(FileRange::getOffset));
+ return ret;
+ }
+
+ private static List<CombinedRange> mergeSortedRanges(
+ List<? extends FileRange> sortedRanges, int minimumSeek) {
+
+ CombinedRange current = null;
+ List<CombinedRange> result = new ArrayList<>(sortedRanges.size());
+
+ // now merge together the ones that merge
+ for (FileRange range : sortedRanges) {
+ long start = range.getOffset();
+ long end = range.getOffset() + range.getLength();
+ if (current == null || !current.merge(start, end, range,
minimumSeek)) {
+ current = new CombinedRange(start, end, range);
+ result.add(current);
+ }
+ }
+ return result;
+ }
+
+ private static class CombinedRange {
+
+ private final List<FileRange> underlying = new ArrayList<>();
+ private final long offset;
+
+ private int length;
+ private long dataSize;
+
+ public CombinedRange(long offset, long end, FileRange original) {
+ this.offset = offset;
+ this.length = (int) (end - offset);
+ append(original);
+ }
+
+ private void append(final FileRange range) {
+ this.underlying.add(range);
+ dataSize += range.getLength();
+ }
+
+ public boolean merge(long otherOffset, long otherEnd, FileRange other,
int minSeek) {
+ long end = offset + length;
+ long newEnd = Math.max(end, otherEnd);
+ if (otherOffset - end >= minSeek) {
+ return false;
+ }
+ this.length = (int) (newEnd - offset);
+ append(other);
+ return true;
+ }
+
+ private List<FileRange> splitBatches(long batchSize, int parallelism) {
+ long expectedSize = Math.max(batchSize, (length / parallelism) +
1);
+ List<FileRange> splitBatches = new ArrayList<>();
+ long offset = this.offset;
+ long end = offset + length;
+
+ // split only while the remaining size is at least max(expectedSize, 2 * batchSize), to avoid small file IO
+ long minRemain = Math.max(expectedSize, batchSize * 2);
+
+ while (true) {
+ if (end < offset + minRemain) {
+ int currentLen = (int) (end - offset);
+ if (currentLen > 0) {
+ splitBatches.add(createFileRange(offset, currentLen));
+ }
+ break;
+ } else {
+ splitBatches.add(createFileRange(offset, (int)
expectedSize));
+ offset += expectedSize;
+ }
+ }
+ return splitBatches;
+ }
+
+ @Override
+ public String toString() {
+ return String.format(
+ "CombinedRange: range count=%d, data size=%,d",
underlying.size(), dataSize);
+ }
+ }
+}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/fs/VectoredReadable.java
b/paimon-common/src/main/java/org/apache/paimon/fs/VectoredReadable.java
new file mode 100644
index 000000000..a3d3faace
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/fs/VectoredReadable.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.fs;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.List;
+
+/** Stream that permits vectored reading. */
+public interface VectoredReadable {
+
+ /**
+ * Read up to the specified number of bytes, from a given position within
a file, and return the
+ * number of bytes read. This does not change the current offset of a
file, and is thread-safe.
+ */
+ int pread(long position, byte[] buffer, int offset, int length) throws
IOException;
+
+ /**
+ * Read the specified number of bytes fully, from a given position within
a file. This does not
+ * change the current offset of a file, and is thread-safe.
+ */
+ default void preadFully(long position, byte[] buffer, int offset, int
length)
+ throws IOException {
+ int readBytes = 0;
+ while (readBytes < length) {
+ int readBytesCurr = pread(position, buffer, offset + readBytes,
length - readBytes);
+ if (readBytesCurr < 0) {
+ throw new EOFException(
+ String.format(
+ "Input Stream closed before all bytes were
read."
+ + " Expected %,d bytes but only read
%,d bytes. Current position %,d",
+ length, readBytes, position));
+ }
+ readBytes += readBytesCurr;
+ position += readBytesCurr;
+ }
+ }
+
+ /** The smallest reasonable seek. */
+ default int minSeekForVectorReads() {
+ return 256 * 1024;
+ }
+
+ /** The batch size of data read by a single parallel read task. */
+ default int batchSizeForVectorReads() {
+ return 1024 * 1024;
+ }
+
+ /** The read parallelism for vector reads. */
+ default int parallelismForVectorReads() {
+ return 4;
+ }
+
+ /**
+ * Read fully a list of file ranges asynchronously from this file.
+ *
+ * <p>As a result of the call, the future returned by each range's FileRange.getData() will
+ * be completed with a byte array holding the data read from the file's range.
+ */
+ default void readVectored(List<? extends FileRange> ranges) throws
IOException {
+ VectoredReadUtils.readVectored(this, ranges);
+ }
+}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/fs/local/LocalFileIO.java
b/paimon-common/src/main/java/org/apache/paimon/fs/local/LocalFileIO.java
index 55d264f1c..82d8145a2 100644
--- a/paimon-common/src/main/java/org/apache/paimon/fs/local/LocalFileIO.java
+++ b/paimon-common/src/main/java/org/apache/paimon/fs/local/LocalFileIO.java
@@ -24,12 +24,14 @@ import org.apache.paimon.fs.FileStatus;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.PositionOutputStream;
import org.apache.paimon.fs.SeekableInputStream;
+import org.apache.paimon.fs.VectoredReadable;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.AccessDeniedException;
import java.nio.file.DirectoryNotEmptyException;
@@ -233,7 +235,8 @@ public class LocalFileIO implements FileIO {
}
/** Local {@link SeekableInputStream}. */
- public static class LocalSeekableInputStream extends SeekableInputStream {
+ public static class LocalSeekableInputStream extends SeekableInputStream
+ implements VectoredReadable {
private final FileInputStream in;
private final FileChannel channel;
@@ -269,6 +272,15 @@ public class LocalFileIO implements FileIO {
public void close() throws IOException {
in.close();
}
+
+ @Override
+ public int pread(long position, byte[] b, int off, int len) throws
IOException {
+ if (len == 0) {
+ return 0;
+ }
+
+ return channel.read(ByteBuffer.wrap(b, off, len), position);
+ }
}
/** Local {@link PositionOutputStream}. */
diff --git
a/paimon-common/src/main/java/org/apache/paimon/utils/BlockingExecutor.java
b/paimon-common/src/main/java/org/apache/paimon/utils/BlockingExecutor.java
new file mode 100644
index 000000000..142312ef4
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/BlockingExecutor.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Semaphore;
+
+/** An executor wrapper to execute with {@link Semaphore}. */
+public class BlockingExecutor {
+
+ private final Semaphore semaphore;
+ private final ExecutorService executor;
+
+ public BlockingExecutor(ExecutorService executor, int permits) {
+ this.semaphore = new Semaphore(permits, true);
+ this.executor = executor;
+ }
+
+ public void submit(Runnable task) {
+ try {
+ semaphore.acquire();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ }
+ executor.submit(
+ () -> {
+ try {
+ task.run();
+ } finally {
+ semaphore.release();
+ }
+ });
+ }
+}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/utils/ThreadUtils.java
b/paimon-common/src/main/java/org/apache/paimon/utils/ThreadUtils.java
index 25779892b..5479f0653 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/ThreadUtils.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/ThreadUtils.java
@@ -23,6 +23,8 @@ import org.slf4j.Logger;
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.util.Arrays;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
/** Utils for thread. */
@@ -54,4 +56,38 @@ public class ThreadUtils {
}
return false;
}
+
+ public static ThreadFactory newDaemonThreadFactory(final String prefix) {
+ final ThreadFactory namedFactory = getNamedThreadFactory(prefix);
+ return new ThreadFactory() {
+ @Override
+ public Thread newThread(Runnable r) {
+ Thread t = namedFactory.newThread(r);
+ if (!t.isDaemon()) {
+ t.setDaemon(true);
+ }
+ if (t.getPriority() != Thread.NORM_PRIORITY) {
+ t.setPriority(Thread.NORM_PRIORITY);
+ }
+ return t;
+ }
+ };
+ }
+
+ private static ThreadFactory getNamedThreadFactory(final String prefix) {
+ SecurityManager s = System.getSecurityManager();
+ final ThreadGroup threadGroup =
+ (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
+
+ return new ThreadFactory() {
+ private final AtomicInteger threadNumber = new AtomicInteger(1);
+ private final ThreadGroup group = threadGroup;
+
+ @Override
+ public Thread newThread(Runnable r) {
+ final String name = prefix + "-t" +
threadNumber.getAndIncrement();
+ return new Thread(group, r, name);
+ }
+ };
+ }
}
diff --git
a/paimon-common/src/test/java/org/apache/paimon/fs/VectoredReadUtilsTest.java
b/paimon-common/src/test/java/org/apache/paimon/fs/VectoredReadUtilsTest.java
new file mode 100644
index 000000000..a3264b08e
--- /dev/null
+++
b/paimon-common/src/test/java/org/apache/paimon/fs/VectoredReadUtilsTest.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.fs;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+class VectoredReadUtilsTest {
+
+ private final byte[] bytes;
+ private final VectoredReadable readable;
+
+ public VectoredReadUtilsTest() {
+ this.bytes = new byte[1024 * 1024];
+ ThreadLocalRandom random = ThreadLocalRandom.current();
+ random.nextBytes(bytes);
+ this.readable =
+ new VectoredReadable() {
+ @Override
+ public int minSeekForVectorReads() {
+ return 100;
+ }
+
+ @Override
+ public int batchSizeForVectorReads() {
+ return 1000;
+ }
+
+ @Override
+ public int pread(long position, byte[] buffer, int offset,
int length)
+ throws IOException {
+ boolean returnAll = random.nextBoolean();
+ int len = returnAll ? length : random.nextInt(length)
+ 1;
+ System.arraycopy(bytes, (int) position, buffer,
offset, len);
+ return len;
+ }
+ };
+ }
+
+ private void doTest(List<FileRange> ranges) throws Exception {
+ VectoredReadUtils.readVectored(readable, ranges);
+ for (FileRange range : ranges) {
+ byte[] expected = new byte[range.getLength()];
+ System.arraycopy(bytes, (int) range.getOffset(), expected, 0,
range.getLength());
+ assertThat(range.getData().get()).isEqualTo(expected);
+ }
+ }
+
+ @Test
+ public void testNormal() throws Exception {
+ // test empty
+ doTest(Collections.emptyList());
+
+ // test without merge
+ doTest(
+ Arrays.asList(
+ FileRange.createFileRange(0, 100),
+ FileRange.createFileRange(100, 200),
+ FileRange.createFileRange(500, 1000)));
+
+ // test with merge
+ doTest(
+ Arrays.asList(
+ FileRange.createFileRange(0, 60),
+ FileRange.createFileRange(100, 90),
+ FileRange.createFileRange(300, 200)));
+
+ // test with batchSize
+ doTest(
+ Arrays.asList(
+ FileRange.createFileRange(60, 800),
+ FileRange.createFileRange(1000, 500),
+ FileRange.createFileRange(1550, 600)));
+
+ // test with align huge
+ doTest(
+ Arrays.asList(
+ FileRange.createFileRange(0, 5000),
+ FileRange.createFileRange(6000, 500),
+ FileRange.createFileRange(7000, 800)));
+
+ // test with no align huge
+ doTest(
+ Arrays.asList(
+ FileRange.createFileRange(60, 5120),
+ FileRange.createFileRange(6020, 520),
+ FileRange.createFileRange(7300, 850)));
+ }
+
+ @Test
+ public void testRandom() throws Exception {
+ List<FileRange> ranges = new ArrayList<>();
+ ThreadLocalRandom random = ThreadLocalRandom.current();
+ int lastEnd = 0;
+ for (int i = 0; i < random.nextInt(10); i++) {
+ int start = random.nextInt(102 * 1024) + lastEnd;
+ int len = random.nextInt(102 * 1024) + 1;
+ if (start + len > bytes.length) {
+ break;
+ }
+ ranges.add(FileRange.createFileRange(start, len));
+ lastEnd = start + len;
+ }
+ doTest(ranges);
+ }
+}
diff --git
a/paimon-format/src/main/java/org/apache/orc/impl/RecordReaderUtils.java
b/paimon-format/src/main/java/org/apache/orc/impl/RecordReaderUtils.java
new file mode 100644
index 000000000..0ceaadb44
--- /dev/null
+++ b/paimon-format/src/main/java/org/apache/orc/impl/RecordReaderUtils.java
@@ -0,0 +1,805 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl;
+
+import org.apache.paimon.format.fs.FSDataWrappedInputStream;
+import org.apache.paimon.fs.FileRange;
+import org.apache.paimon.fs.SeekableInputStream;
+import org.apache.paimon.fs.VectoredReadable;
+
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.io.DiskRangeList;
+import org.apache.orc.CompressionCodec;
+import org.apache.orc.DataReader;
+import org.apache.orc.OrcProto;
+import org.apache.orc.StripeInformation;
+import org.apache.orc.TypeDescription;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Supplier;
+
+ /* This file is based on source code from the ORC Project (http://orc.apache.org/), licensed by
+ * the Apache Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. */
+
+/**
+ * Stateless methods shared between RecordReaderImpl and EncodedReaderImpl.
+ *
+ * <p>NOTE: The file was copied and modified to support {@link
VectoredReadable}.
+ */
+public class RecordReaderUtils {
+
+ // for uncompressed streams, what is the most overlap with the following
set
+ // of rows (long vint literal group).
+ static final int WORST_UNCOMPRESSED_SLOP = 2 + 8 * 512;
+ // the maximum number of values that need to be consumed from the run
+ static final int MAX_VALUES_LENGTH = RunLengthIntegerWriterV2.MAX_SCOPE;
+ // the maximum byte width for each value
+ static final int MAX_BYTE_WIDTH =
+
SerializationUtils.decodeBitWidth(SerializationUtils.FixedBitSizes.SIXTYFOUR.ordinal())
+ / 8;
+ private static final HadoopShims SHIMS = HadoopShimsFactory.get();
+ private static final Logger LOG =
LoggerFactory.getLogger(RecordReaderUtils.class);
+ private static final int BYTE_STREAM_POSITIONS = 1;
+ private static final int RUN_LENGTH_BYTE_POSITIONS = BYTE_STREAM_POSITIONS
+ 1;
+ private static final int BITFIELD_POSITIONS = RUN_LENGTH_BYTE_POSITIONS +
1;
+ private static final int RUN_LENGTH_INT_POSITIONS = BYTE_STREAM_POSITIONS
+ 1;
+
+ public static DataReader createDefaultDataReader(DataReaderProperties
properties) {
+ return new DefaultDataReader(properties);
+ }
+
+    /**
+     * Tells whether two closed intervals share at least one point.
+     *
+     * @param leftA lower bound of A (inclusive)
+     * @param rightA upper bound of A (inclusive)
+     * @param leftB lower bound of B (inclusive)
+     * @param rightB upper bound of B (inclusive)
+     * @return {@code true} when A and B overlap
+     */
+    static boolean overlap(long leftA, long rightA, long leftB, long rightB) {
+        // The interval that starts first has to reach the start of the other one.
+        return leftA <= leftB ? rightA >= leftB : rightB >= leftA;
+    }
+
+    /**
+     * Estimates the offset up to which a stream has to be read to cover a row group.
+     *
+     * @param isCompressed whether the stream is compressed
+     * @param bufferSize the compression buffer size; only used when {@code isCompressed}
+     * @param isLast whether this is the last row group of the stream
+     * @param nextGroupOffset the offset at which the next row group starts
+     * @param streamLength the total length of the stream
+     * @return {@code streamLength} for the last group, otherwise {@code nextGroupOffset} plus a
+     *     worst-case slop, capped at {@code streamLength}
+     */
+    public static long estimateRgEndOffset(
+            boolean isCompressed,
+            int bufferSize,
+            boolean isLast,
+            long nextGroupOffset,
+            long streamLength) {
+        final long slop;
+        if (!isCompressed) {
+            // Worst case overlap with the following set of rows.
+            slop = WORST_UNCOMPRESSED_SLOP;
+        } else {
+            // For compressed streams the slop is stretched over the maximum number of
+            // compression blocks, which is derived from bufferSize.
+            int stretchFactor = 2 + (MAX_VALUES_LENGTH * MAX_BYTE_WIDTH - 1) / bufferSize;
+            slop = (long) stretchFactor * (OutStream.HEADER_SIZE + bufferSize);
+        }
+        if (isLast) {
+            return streamLength;
+        }
+        return Math.min(streamLength, nextGroupOffset + slop);
+    }
+
+    /**
+     * Computes where, within a column's index positions, the positions of the given stream
+     * begin.
+     *
+     * @param columnEncoding the encoding of the column
+     * @param columnType the type of the column
+     * @param streamType the kind of the stream
+     * @param isCompressed is the stream compressed?
+     * @param hasNulls does the column have a PRESENT stream?
+     * @return the index position at which the stream starts
+     * @throws IllegalArgumentException if {@code columnType} is not one of the handled categories
+     */
+    public static int getIndexPosition(
+            OrcProto.ColumnEncoding.Kind columnEncoding,
+            TypeDescription.Category columnType,
+            OrcProto.Stream.Kind streamType,
+            boolean isCompressed,
+            boolean hasNulls) {
+        // The PRESENT stream always comes first.
+        if (streamType == OrcProto.Stream.Kind.PRESENT) {
+            return 0;
+        }
+        final int compressionValue = isCompressed ? 1 : 0;
+        // Positions taken by the PRESENT stream, if the column has one.
+        final int base = hasNulls ? (BITFIELD_POSITIONS + compressionValue) : 0;
+        final boolean isDataStream = streamType == OrcProto.Stream.Kind.DATA;
+        switch (columnType) {
+            case BOOLEAN:
+            case BYTE:
+            case SHORT:
+            case INT:
+            case LONG:
+            case FLOAT:
+            case DOUBLE:
+            case DATE:
+            case STRUCT:
+            case MAP:
+            case LIST:
+            case UNION:
+                return base;
+            case CHAR:
+            case VARCHAR:
+            case STRING:
+                {
+                    boolean dictionaryEncoded =
+                            columnEncoding == OrcProto.ColumnEncoding.Kind.DICTIONARY
+                                    || columnEncoding
+                                            == OrcProto.ColumnEncoding.Kind.DICTIONARY_V2;
+                    if (dictionaryEncoded || isDataStream) {
+                        return base;
+                    }
+                    return base + BYTE_STREAM_POSITIONS + compressionValue;
+                }
+            case BINARY:
+            case DECIMAL:
+                return isDataStream ? base : base + BYTE_STREAM_POSITIONS + compressionValue;
+            case TIMESTAMP:
+            case TIMESTAMP_INSTANT:
+                return isDataStream ? base : base + RUN_LENGTH_INT_POSITIONS + compressionValue;
+            default:
+                throw new IllegalArgumentException("Unknown type " + columnType);
+        }
+    }
+
+    /**
+     * Decides whether a stream belongs to a dictionary.
+     *
+     * @param kind the kind of the stream; must not be {@code DICTIONARY_COUNT}
+     * @param encoding the encoding of the column the stream belongs to
+     * @return {@code true} for a DICTIONARY_DATA stream, or for a LENGTH stream of a column
+     *     encoded with DICTIONARY or DICTIONARY_V2
+     */
+    public static boolean isDictionary(
+            OrcProto.Stream.Kind kind, OrcProto.ColumnEncoding encoding) {
+        assert kind != OrcProto.Stream.Kind.DICTIONARY_COUNT;
+        final OrcProto.ColumnEncoding.Kind encodingKind = encoding.getKind();
+        if (kind == OrcProto.Stream.Kind.DICTIONARY_DATA) {
+            return true;
+        }
+        if (kind != OrcProto.Stream.Kind.LENGTH) {
+            return false;
+        }
+        return encodingKind == OrcProto.ColumnEncoding.Kind.DICTIONARY
+                || encodingKind == OrcProto.ColumnEncoding.Kind.DICTIONARY_V2;
+    }
+
+    /**
+     * Renders a linked list of disk ranges as {@code [{r1}, {r2}, ...]}.
+     *
+     * @param range head of the list to render; {@code null} yields {@code []}
+     * @return the resulting string
+     */
+    public static String stringifyDiskRanges(DiskRangeList range) {
+        StringBuilder text = new StringBuilder();
+        text.append("[");
+        // The opening brace of every element but the first is preceded by a separator.
+        String opening = "{";
+        for (DiskRangeList node = range; node != null; node = node.next) {
+            text.append(opening);
+            text.append(node);
+            text.append("}");
+            opening = ", {";
+        }
+        text.append("]");
+        return text.toString();
+    }
+
+    /**
+     * Returns the largest end offset among the chunks from {@code first} through {@code last}
+     * (inclusive), or 0 if no chunk end exceeds 0.
+     */
+    static long computeEnd(BufferChunk first, BufferChunk last) {
+        long maxEnd = 0;
+        BufferChunk chunk = first;
+        while (chunk != last.next) {
+            maxEnd = Math.max(chunk.getEnd(), maxEnd);
+            chunk = (BufferChunk) chunk.next;
+        }
+        return maxEnd;
+    }
+
+ /**
+ * Reads the bytes spanned by the chunks {@code first} through {@code last} from {@code file}
+ * through the zero-copy shim and attaches a buffer to each chunk via {@code setChunk}.
+ *
+ * <p>The stream is positioned at {@code first.getOffset()} and {@code zcr.readBuffer} is
+ * called repeatedly until {@code computeEnd(first, last) - first.getOffset()} bytes have been
+ * returned. A chunk that lies inside one returned buffer receives a duplicate view of that
+ * buffer; a chunk that spans several returned buffers is copied into a newly allocated buffer.
+ *
+ * <p>As a side note, the HDFS zero copy API really sucks from a user's point of view.
+ *
+ * @param file the file we're reading from
+ * @param zcr the zero copy shim
+ * @param first the first range to read
+ * @param last the last range to read
+ * @param allocateDirect if we need to allocate buffers, should we use direct
+ * @throws IOException propagated from the seek or from the shim reads
+ */
+ static void zeroCopyReadRanges(
+ FSDataInputStream file,
+ HadoopShims.ZeroCopyReaderShim zcr,
+ BufferChunk first,
+ BufferChunk last,
+ boolean allocateDirect)
+ throws IOException {
+ // read all of the bytes that we need
+ final long offset = first.getOffset();
+ int length = (int) (computeEnd(first, last) - offset);
+ file.seek(offset);
+ List<ByteBuffer> bytes = new ArrayList<>();
+ // NOTE(review): this loop only makes progress while readBuffer returns a non-empty
+ // buffer; presumably the shim guarantees that before end of file - confirm.
+ while (length > 0) {
+ ByteBuffer read = zcr.readBuffer(length, false);
+ bytes.add(read);
+ length -= read.remaining();
+ }
+ // file offset of the first byte of currentBuffer
+ long currentOffset = offset;
+
+ // iterate and fill each range
+ BufferChunk current = first;
+ Iterator<ByteBuffer> buffers = bytes.iterator();
+ ByteBuffer currentBuffer = buffers.next();
+ while (current != last.next) {
+
+ // if we are past the start of the range, restart the iterator
+ // NOTE(review): the comparison uses the fixed start "offset", not "currentOffset";
+ // if chunks are sorted by offset this branch never triggers - confirm the intent.
+ if (current.getOffset() < offset) {
+ buffers = bytes.iterator();
+ currentBuffer = buffers.next();
+ currentOffset = offset;
+ }
+
+ // walk through the buffers to find the start of the buffer
+ while (currentOffset + currentBuffer.remaining() <=
current.getOffset()) {
+ currentOffset += currentBuffer.remaining();
+ // We assume that buffers.hasNext is true because we know we
read
+ // enough data to cover the last range.
+ currentBuffer = buffers.next();
+ }
+
+ // did we get the current range in a single read?
+ if (currentOffset + currentBuffer.remaining() >= current.getEnd())
{
+ // share the bytes: the chunk gets a view, not a copy
+ ByteBuffer copy = currentBuffer.duplicate();
+ copy.position((int) (current.getOffset() - currentOffset));
+ copy.limit(copy.position() + current.getLength());
+ current.setChunk(copy);
+
+ } else {
+ // otherwise, build a single buffer that holds the entire range
+ ByteBuffer result =
+ allocateDirect
+ ?
ByteBuffer.allocateDirect(current.getLength())
+ : ByteBuffer.allocate(current.getLength());
+ // we know that the range spans buffers
+ ByteBuffer copy = currentBuffer.duplicate();
+ // skip over the front matter
+ copy.position((int) (current.getOffset() - currentOffset));
+ result.put(copy);
+ // advance the buffer
+ currentOffset += currentBuffer.remaining();
+ currentBuffer = buffers.next();
+ while (result.hasRemaining()) {
+ if (result.remaining() > currentBuffer.remaining()) {
+ result.put(currentBuffer.duplicate());
+ currentOffset += currentBuffer.remaining();
+ currentBuffer = buffers.next();
+ } else {
+ copy = currentBuffer.duplicate();
+ // NOTE(review): limit() is absolute, so this yields exactly the missing
+ // bytes only when the buffer's position is 0 - confirm for the shim's buffers.
+ copy.limit(result.remaining());
+ result.put(copy);
+ }
+ }
+ result.flip();
+ current.setChunk(result);
+ }
+ current = (BufferChunk) current.next;
+ }
+ }
+
+ /**
+ * Find the list of ranges that should be read in a single read. The read
will stop when there
+ * is a gap, one of the ranges already has data, or we have reached the
maximum read size of
+ * 2^31.
+ *
+ * @param first the first range to read
+ * @return the last range to read
+ */
+ static BufferChunk findSingleRead(BufferChunk first) {
+ return findSingleRead(first, 0);
+ }
+
+    /**
+     * Determines how far a single read starting at {@code first} should extend. The following
+     * range is included as long as it exists, carries no data yet, starts no further than
+     * {@code minSeekSize} bytes behind the end covered so far, and keeps the total span below
+     * {@code Integer.MAX_VALUE} bytes.
+     *
+     * @param first the first range to read
+     * @param minSeekSize minimum size for seek instead of read
+     * @return the last range to read
+     */
+    private static BufferChunk findSingleRead(BufferChunk first, long minSeekSize) {
+        BufferChunk tail = first;
+        long coveredEnd = first.getEnd();
+        while (true) {
+            DiskRangeList candidate = tail.next;
+            if (candidate == null || candidate.hasData()) {
+                break;
+            }
+            if (candidate.getOffset() > (coveredEnd + minSeekSize)) {
+                // the gap is large enough to prefer a seek
+                break;
+            }
+            if (candidate.getEnd() - first.getOffset() >= Integer.MAX_VALUE) {
+                // the read would no longer fit into a single array
+                break;
+            }
+            tail = (BufferChunk) candidate;
+            coveredEnd = Math.max(coveredEnd, tail.getEnd());
+        }
+        return tail;
+    }
+
+    /**
+     * Read the list of ranges from the file by updating each range in the list with a buffer
+     * that has the bytes from the file. Ranges that already hold data are skipped.
+     *
+     * <p>The ranges must be sorted, but may overlap or include holes.
+     *
+     * @param file the file to read
+     * @param zcr the zero copy shim; when {@code null} the ranges are read with {@link
+     *     ChunkReader}
+     * @param list the disk ranges within the file to read; {@code null} is a no-op
+     * @param doForceDirect allocate direct buffers
+     * @param minSeekSize the minimum gap to prefer seek vs read
+     * @param minSeekSizeTolerance allowed tolerance for extra bytes in memory as a result of
+     *     minSeekSize
+     * @throws IOException if reading from the file fails
+     */
+    private static void readDiskRanges(
+            FSDataInputStream file,
+            HadoopShims.ZeroCopyReaderShim zcr,
+            BufferChunkList list,
+            boolean doForceDirect,
+            int minSeekSize,
+            double minSeekSizeTolerance)
+            throws IOException {
+        BufferChunk current = list == null ? null : list.get();
+        while (current != null) {
+            // Skip over ranges that are already populated. The null check matters when the
+            // tail of the list is fully populated: without it the walk runs off the end of
+            // the list and dereferences null.
+            while (current != null && current.hasData()) {
+                current = (BufferChunk) current.next;
+            }
+            if (current == null) {
+                break;
+            }
+            if (zcr != null) {
+                BufferChunk last = findSingleRead(current);
+                zeroCopyReadRanges(file, zcr, current, last, doForceDirect);
+                current = (BufferChunk) last.next;
+            } else {
+                ChunkReader chunkReader = ChunkReader.create(current, minSeekSize);
+                chunkReader.readRanges(file, doForceDirect, minSeekSizeTolerance);
+                current = (BufferChunk) chunkReader.to.next;
+            }
+        }
+    }
+
+    /**
+     * Read the list of ranges from the file by updating each range in the list, using a single
+     * vectored read for all ranges that do not hold data yet.
+     *
+     * @param fileInputStream the stream to issue the vectored read on
+     * @param range the disk ranges within the file to read; {@code null} is a no-op
+     * @param doForceDirect must be {@code false}; direct buffers are not supported here
+     * @throws UnsupportedOperationException if {@code doForceDirect} is {@code true}
+     * @throws IOException if the vectored read fails, or if the calling thread is interrupted
+     *     while waiting for the data (the interrupt flag is restored in that case)
+     */
+    private static void readDiskRangesVectored(
+            VectoredReadable fileInputStream, BufferChunkList range, boolean doForceDirect)
+            throws IOException {
+        if (range == null) {
+            return;
+        }
+
+        if (doForceDirect) {
+            throw new UnsupportedOperationException();
+        }
+
+        // Collect the ranges that still need data and remember which chunk each belongs to.
+        List<FileRange> fileRanges = new ArrayList<>();
+        Map<FileRange, BufferChunk> map = new HashMap<>();
+        BufferChunk cur = range.get();
+        while (cur != null) {
+            if (!cur.hasData()) {
+                FileRange fileRange = FileRange.createFileRange(cur.getOffset(), cur.getLength());
+                fileRanges.add(fileRange);
+                map.put(fileRange, cur);
+            }
+            cur = (BufferChunk) cur.next;
+        }
+        fileInputStream.readVectored(fileRanges);
+
+        for (FileRange r : fileRanges) {
+            cur = map.get(r);
+            try {
+                cur.setChunk(ByteBuffer.wrap(r.getData().get()));
+            } catch (InterruptedException e) {
+                // Keep the interrupt visible to the caller instead of swallowing it.
+                Thread.currentThread().interrupt();
+                throw new IOException(
+                        "Interrupted while waiting for vectored read of range " + r, e);
+            } catch (ExecutionException e) {
+                // Surface I/O failures of the asynchronous read as the declared IOException.
+                Throwable cause = e.getCause();
+                if (cause instanceof IOException) {
+                    throw (IOException) cause;
+                }
+                throw new IOException(
+                        "Vectored read failed for range " + r, cause != null ? cause : e);
+            }
+        }
+    }
+
+    /**
+     * Creates a zero-copy reader for the file when the codec permits it.
+     *
+     * @param file the stream to read from
+     * @param codec the compression codec, or {@code null} if there is none
+     * @param pool the buffer pool handed to the zero-copy reader
+     * @return the shim obtained from {@code SHIMS.getZeroCopyReader}, or {@code null} when the
+     *     codec is neither {@code null} nor an available {@link DirectDecompressionCodec}
+     */
+    static HadoopShims.ZeroCopyReaderShim createZeroCopyShim(
+            FSDataInputStream file, CompressionCodec codec, ByteBufferAllocatorPool pool)
+            throws IOException {
+        if (codec != null) {
+            boolean directAvailable =
+                    codec instanceof DirectDecompressionCodec
+                            && ((DirectDecompressionCodec) codec).isAvailable();
+            if (!directAvailable) {
+                return null;
+            }
+        }
+        /* codec is null or is available */
+        return SHIMS.getZeroCopyReader(file, pool);
+    }
+
+ private static class DefaultDataReader implements DataReader {
+ private final Supplier<FileSystem> fileSystemSupplier;
+ private final Path path;
+ private final boolean useZeroCopy;
+ private final int minSeekSize;
+ private final double minSeekSizeTolerance;
+ private FSDataInputStream file;
+ private ByteBufferAllocatorPool pool;
+ private HadoopShims.ZeroCopyReaderShim zcr = null;
+ private InStream.StreamOptions options;
+ private boolean isOpen = false;
+
+ private DefaultDataReader(DataReaderProperties properties) {
+ this.fileSystemSupplier = properties.getFileSystemSupplier();
+ this.path = properties.getPath();
+ this.file = properties.getFile();
+ this.useZeroCopy = properties.getZeroCopy();
+ this.options = properties.getCompression();
+ this.minSeekSize = properties.getMinSeekSize();
+ this.minSeekSizeTolerance = properties.getMinSeekSizeTolerance();
+ }
+
+ @Override
+ public void open() throws IOException {
+ if (file == null) {
+ this.file = fileSystemSupplier.get().open(path);
+ }
+ if (useZeroCopy) {
+ // ZCR only uses codec for boolean checks.
+ pool = new ByteBufferAllocatorPool();
+ zcr = RecordReaderUtils.createZeroCopyShim(file,
options.getCodec(), pool);
+ } else {
+ zcr = null;
+ }
+ isOpen = true;
+ }
+
+ @Override
+ public OrcProto.StripeFooter readStripeFooter(StripeInformation
stripe) throws IOException {
+ if (!isOpen) {
+ open();
+ }
+ long offset = stripe.getOffset() + stripe.getIndexLength() +
stripe.getDataLength();
+ int tailLength = (int) stripe.getFooterLength();
+
+ // read the footer
+ ByteBuffer tailBuf = ByteBuffer.allocate(tailLength);
+ file.readFully(offset, tailBuf.array(), tailBuf.arrayOffset(),
tailLength);
+ return OrcProto.StripeFooter.parseFrom(
+ InStream.createCodedInputStream(
+ InStream.create(
+ "footer",
+ new BufferChunk(tailBuf, 0),
+ 0,
+ tailLength,
+ options)));
+ }
+
+        /**
+         * Reads the data of all ranges in {@code range} that do not hold data yet.
+         *
+         * <p>A vectored read is used when no zero-copy reader is active, heap buffers are
+         * acceptable, and the stream wraps a {@link FSDataWrappedInputStream} whose underlying
+         * stream is {@link VectoredReadable}. In every other case the regular range reader is
+         * used.
+         *
+         * @param range the ranges to populate
+         * @param doForceDirect whether direct buffers are required
+         * @return the same {@code range} instance, populated
+         */
+        @Override
+        public BufferChunkList readFileData(BufferChunkList range, boolean doForceDirect)
+                throws IOException {
+            VectoredReadable vectored = null;
+            // The vectored path cannot produce direct buffers, so it is only considered for
+            // heap reads; requests for direct buffers fall through to the regular reader.
+            if (zcr == null && !doForceDirect) {
+                // The stream is not necessarily Paimon's wrapper, so check before casting.
+                Object stream = file.getWrappedStream();
+                if (stream instanceof FSDataWrappedInputStream) {
+                    SeekableInputStream wrapped = ((FSDataWrappedInputStream) stream).wrapped();
+                    if (wrapped instanceof VectoredReadable) {
+                        vectored = (VectoredReadable) wrapped;
+                    }
+                }
+            }
+
+            if (vectored != null) {
+                RecordReaderUtils.readDiskRangesVectored(vectored, range, false);
+            } else {
+                RecordReaderUtils.readDiskRanges(
+                        file, zcr, range, doForceDirect, minSeekSize, minSeekSizeTolerance);
+            }
+
+            return range;
+        }
+
+ @Override
+ public void close() throws IOException {
+ if (options.getCodec() != null) {
+ OrcCodecPool.returnCodec(options.getCodec().getKind(),
options.getCodec());
+ options.withCodec(null);
+ }
+ if (pool != null) {
+ pool.clear();
+ }
+ // close both zcr and file
+ try (HadoopShims.ZeroCopyReaderShim myZcr = zcr) {
+ if (file != null) {
+ file.close();
+ file = null;
+ }
+ }
+ }
+
+ @Override
+ public boolean isTrackingDiskRanges() {
+ return zcr != null;
+ }
+
+ @Override
+ public void releaseBuffer(ByteBuffer buffer) {
+ zcr.releaseBuffer(buffer);
+ }
+
+ @Override
+ public DataReader clone() {
+ if (this.file != null) {
+ // We should really throw here, but that will cause failures
in Hive.
+ // While Hive uses clone, just log a warning.
+ LOG.warn(
+ "Cloning an opened DataReader; the stream will be
reused and closed twice");
+ }
+ try {
+ DefaultDataReader clone = (DefaultDataReader) super.clone();
+ if (options.getCodec() != null) {
+ // Make sure we don't share the same codec between two
readers.
+ clone.options = options.clone();
+ }
+ return clone;
+ } catch (CloneNotSupportedException e) {
+ throw new UnsupportedOperationException("uncloneable", e);
+ }
+ }
+
+ @Override
+ public InStream.StreamOptions getCompressionOptions() {
+ return options;
+ }
+ }
+
+ /**
+ * this is an implementation copied from ElasticByteBufferPool in
hadoop-2, which lacks a
+ * clear()/clean() operation.
+ */
+ public static final class ByteBufferAllocatorPool implements
HadoopShims.ByteBufferPoolShim {
+ private final TreeMap<Key, ByteBuffer> buffers = new TreeMap<>();
+ private final TreeMap<Key, ByteBuffer> directBuffers = new TreeMap<>();
+ private long currentGeneration = 0;
+
+ private TreeMap<Key, ByteBuffer> getBufferTree(boolean direct) {
+ return direct ? directBuffers : buffers;
+ }
+
+ public void clear() {
+ buffers.clear();
+ directBuffers.clear();
+ }
+
+ @Override
+ public ByteBuffer getBuffer(boolean direct, int length) {
+ TreeMap<Key, ByteBuffer> tree = getBufferTree(direct);
+ Map.Entry<Key, ByteBuffer> entry = tree.ceilingEntry(new
Key(length, 0));
+ if (entry == null) {
+ return direct ? ByteBuffer.allocateDirect(length) :
ByteBuffer.allocate(length);
+ }
+ tree.remove(entry.getKey());
+ return entry.getValue();
+ }
+
+ @Override
+ public void putBuffer(ByteBuffer buffer) {
+ TreeMap<Key, ByteBuffer> tree = getBufferTree(buffer.isDirect());
+ Key key;
+
+ // Buffers are indexed by (capacity, generation).
+ // If our key is not unique on the first try, try again
+ do {
+ key = new Key(buffer.capacity(), currentGeneration++);
+ } while (tree.putIfAbsent(key, buffer) != null);
+ }
+
+ private static final class Key implements Comparable<Key> {
+ private final int capacity;
+ private final long insertionGeneration;
+
+ Key(int capacity, long insertionGeneration) {
+ this.capacity = capacity;
+ this.insertionGeneration = insertionGeneration;
+ }
+
+ @Override
+ public int compareTo(Key other) {
+ final int c = Integer.compare(capacity, other.capacity);
+ return (c != 0) ? c : Long.compare(insertionGeneration,
other.insertionGeneration);
+ }
+
+ @Override
+ public boolean equals(Object rhs) {
+ if (rhs instanceof Key) {
+ Key o = (Key) rhs;
+ return 0 == compareTo(o);
+ }
+ return false;
+ }
+
+ @Override
+ public int hashCode() {
+ return new HashCodeBuilder()
+ .append(capacity)
+ .append(insertionGeneration)
+ .toHashCode();
+ }
+ }
+ }
+
+ static class ChunkReader {
+ private final BufferChunk from;
+ private final BufferChunk to;
+ private final int readBytes;
+ private final int reqBytes;
+
+ private ChunkReader(BufferChunk from, BufferChunk to, int readSize,
int reqBytes) {
+ this.from = from;
+ this.to = to;
+ this.readBytes = readSize;
+ this.reqBytes = reqBytes;
+ }
+
+        /**
+         * Builds a reader for the chunks {@code from} through {@code to} (inclusive).
+         *
+         * <p>{@code readBytes} is the distance between the smallest offset and the largest end
+         * of the chunks. {@code reqBytes} is the number of bytes actually requested: the chunks
+         * are merged into runs, where a chunk extends the current run if it starts at or before
+         * the run's end, and the run lengths are summed.
+         *
+         * @param from the first chunk of the read
+         * @param to the last chunk of the read
+         * @return the reader describing the read
+         */
+        static ChunkReader create(BufferChunk from, BufferChunk to) {
+            // Offsets are file positions and may exceed Integer.MAX_VALUE, so the sentinels
+            // have to cover the whole long range; otherwise min() would clamp every offset
+            // beyond 2 GiB to the sentinel.
+            long f = Long.MAX_VALUE;
+            long e = Long.MIN_VALUE;
+
+            // start and end of the run that is currently being merged
+            long cf = Long.MAX_VALUE;
+            long ef = Long.MIN_VALUE;
+            long reqBytes = 0;
+
+            BufferChunk current = from;
+            while (current != to.next) {
+                f = Math.min(f, current.getOffset());
+                e = Math.max(e, current.getEnd());
+                if (ef == Long.MIN_VALUE || current.getOffset() <= ef) {
+                    // first chunk, or the chunk touches/overlaps the current run: extend it
+                    cf = Math.min(cf, current.getOffset());
+                    ef = Math.max(ef, current.getEnd());
+                } else {
+                    // gap: close the current run and start a new one
+                    reqBytes += ef - cf;
+                    cf = current.getOffset();
+                    ef = current.getEnd();
+                }
+                current = (BufferChunk) current.next;
+            }
+            reqBytes += ef - cf;
+            return new ChunkReader(from, to, (int) (e - f), (int) reqBytes);
+        }
+
+ static ChunkReader create(BufferChunk from, int minSeekSize) {
+ BufferChunk to = findSingleRead(from, minSeekSize);
+ return create(from, to);
+ }
+
+ double getExtraBytesFraction() {
+ return (readBytes - reqBytes) / ((double) reqBytes);
+ }
+
+ public int getReadBytes() {
+ return readBytes;
+ }
+
+ public int getReqBytes() {
+ return reqBytes;
+ }
+
+ public BufferChunk getFrom() {
+ return from;
+ }
+
+ public BufferChunk getTo() {
+ return to;
+ }
+
+ void populateChunks(ByteBuffer bytes, boolean allocateDirect, double
extraByteTolerance) {
+ if (getExtraBytesFraction() > extraByteTolerance) {
+ LOG.debug(
+ "ExtraBytesFraction = {}, ExtraByteTolerance = {},
reducing memory size",
+ getExtraBytesFraction(),
+ extraByteTolerance);
+ populateChunksReduceSize(bytes, allocateDirect);
+ } else {
+ LOG.debug(
+ "ExtraBytesFraction = {}, ExtraByteTolerance = {},
populating as is",
+ getExtraBytesFraction(),
+ extraByteTolerance);
+ populateChunksAsIs(bytes);
+ }
+ }
+
+ void populateChunksAsIs(ByteBuffer bytes) {
+ // populate each BufferChunks with the data
+ BufferChunk current = from;
+ long offset = from.getOffset();
+ while (current != to.next) {
+ ByteBuffer currentBytes = current == to ? bytes :
bytes.duplicate();
+ currentBytes.position((int) (current.getOffset() - offset));
+ currentBytes.limit((int) (current.getEnd() - offset));
+ current.setChunk(currentBytes);
+ current = (BufferChunk) current.next;
+ }
+ }
+
+ void populateChunksReduceSize(ByteBuffer bytes, boolean
allocateDirect) {
+ ByteBuffer newBuffer;
+ if (allocateDirect) {
+ newBuffer = ByteBuffer.allocateDirect(reqBytes);
+ newBuffer.position(reqBytes);
+ newBuffer.flip();
+ } else {
+ byte[] newBytes = new byte[reqBytes];
+ newBuffer = ByteBuffer.wrap(newBytes);
+ }
+
+ final long offset = from.getOffset();
+ int copyStart = 0;
+ int copyEnd;
+ int copyLength;
+ int skippedBytes = 0;
+ int srcPosition;
+ BufferChunk current = from;
+ while (current != to.next) {
+ // We can skip bytes as required, but no need to copy bytes
that are already copied
+ srcPosition = (int) (current.getOffset() - offset);
+ skippedBytes += Math.max(0, srcPosition - copyStart);
+ copyStart = Math.max(copyStart, srcPosition);
+ copyEnd = (int) (current.getEnd() - offset);
+ copyLength = copyStart < copyEnd ? copyEnd - copyStart : 0;
+ newBuffer.put(bytes.array(), copyStart, copyLength);
+ copyStart += copyLength;
+ // Set up new ByteBuffer that wraps on the same backing array
+ ByteBuffer currentBytes = current == to ? newBuffer :
newBuffer.duplicate();
+ currentBytes.position(srcPosition - skippedBytes);
+ currentBytes.limit(currentBytes.position() +
current.getLength());
+ current.setChunk(currentBytes);
+ current = (BufferChunk) current.next;
+ }
+ }
+
+ /**
+ * Read the data from the file based on a list of ranges in a single
read.
+ *
+ * @param file the file to read from
+ * @param allocateDirect should we use direct buffers
+ */
+ void readRanges(FSDataInputStream file, boolean allocateDirect, double
extraByteTolerance)
+ throws IOException {
+ // assume that the chunks are sorted by offset
+ long offset = from.getOffset();
+ int readSize = (int) (computeEnd(from, to) - offset);
+ byte[] buffer = new byte[readSize];
+ try {
+ file.readFully(offset, buffer, 0, buffer.length);
+ } catch (IOException e) {
+ throw new IOException(
+ String.format("Failed while reading %s %d:%d", file,
offset, buffer.length),
+ e);
+ }
+
+ // get the data into a ByteBuffer
+ ByteBuffer bytes;
+ if (allocateDirect) {
+ bytes = ByteBuffer.allocateDirect(readSize);
+ bytes.put(buffer);
+ bytes.flip();
+ } else {
+ bytes = ByteBuffer.wrap(buffer);
+ }
+
+ // populate each BufferChunks with the data
+ populateChunks(bytes, allocateDirect, extraByteTolerance);
+ }
+ }
+}
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/fs/FSDataWrappedInputStream.java
b/paimon-format/src/main/java/org/apache/paimon/format/fs/FSDataWrappedInputStream.java
new file mode 100644
index 000000000..576fd8afe
--- /dev/null
+++
b/paimon-format/src/main/java/org/apache/paimon/format/fs/FSDataWrappedInputStream.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.fs;
+
+import org.apache.paimon.fs.SeekableInputStream;
+import org.apache.paimon.utils.IOUtils;
+
+import org.apache.hadoop.fs.PositionedReadable;
+import org.apache.hadoop.fs.Seekable;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/** A {@link InputStream} to wrap {@link SeekableInputStream} for Paimon's
input streams. */
+public class FSDataWrappedInputStream extends InputStream implements Seekable,
PositionedReadable {
+
+ private final SeekableInputStream seekableInputStream;
+
+ public FSDataWrappedInputStream(SeekableInputStream seekableInputStream) {
+ this.seekableInputStream = seekableInputStream;
+ }
+
+ public SeekableInputStream wrapped() {
+ return seekableInputStream;
+ }
+
+ @Override
+ public int read() throws IOException {
+ return seekableInputStream.read();
+ }
+
+ @Override
+ public int read(long position, byte[] buffer, int offset, int length)
throws IOException {
+ seekableInputStream.seek(position);
+ return seekableInputStream.read(buffer, offset, length);
+ }
+
+ @Override
+ public void readFully(long position, byte[] buffer, int offset, int
length) throws IOException {
+ seekableInputStream.seek(position);
+ IOUtils.readFully(seekableInputStream, buffer, offset, length);
+ }
+
+ @Override
+ public void readFully(long position, byte[] buffer) throws IOException {
+ readFully(position, buffer, 0, buffer.length);
+ }
+
+ @Override
+ public void seek(long pos) throws IOException {
+ seekableInputStream.seek(pos);
+ }
+
+ @Override
+ public long getPos() throws IOException {
+ return seekableInputStream.getPos();
+ }
+
+ @Override
+ public boolean seekToNewSource(long targetPos) {
+ return false;
+ }
+
+ @Override
+ public void close() throws IOException {
+ seekableInputStream.close();
+ }
+}
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/fs/HadoopReadOnlyFileSystem.java
b/paimon-format/src/main/java/org/apache/paimon/format/fs/HadoopReadOnlyFileSystem.java
index 1dcc0da00..57454368c 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/fs/HadoopReadOnlyFileSystem.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/fs/HadoopReadOnlyFileSystem.java
@@ -19,21 +19,16 @@
package org.apache.paimon.format.fs;
import org.apache.paimon.fs.FileIO;
-import org.apache.paimon.fs.SeekableInputStream;
-import org.apache.paimon.utils.IOUtils;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PositionedReadable;
-import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.util.Progressable;
import java.io.IOException;
-import java.io.InputStream;
import java.net.URI;
/** A read only {@link FileSystem} that wraps an {@link FileIO}. */
@@ -138,58 +133,4 @@ public class HadoopReadOnlyFileSystem extends FileSystem {
public boolean mkdirs(Path path, FsPermission fsPermission) {
throw new UnsupportedOperationException();
}
-
- /** A {@link InputStream} to wrap {@link SeekableInputStream} for Paimon's
input streams. */
- private static class FSDataWrappedInputStream extends InputStream
- implements Seekable, PositionedReadable {
-
- private final SeekableInputStream seekableInputStream;
-
- private FSDataWrappedInputStream(SeekableInputStream
seekableInputStream) {
- this.seekableInputStream = seekableInputStream;
- }
-
- @Override
- public int read() throws IOException {
- return seekableInputStream.read();
- }
-
- @Override
- public int read(long position, byte[] buffer, int offset, int length)
throws IOException {
- seekableInputStream.seek(position);
- return seekableInputStream.read(buffer, offset, length);
- }
-
- @Override
- public void readFully(long position, byte[] buffer, int offset, int
length)
- throws IOException {
- seekableInputStream.seek(position);
- IOUtils.readFully(seekableInputStream, buffer, offset, length);
- }
-
- @Override
- public void readFully(long position, byte[] buffer) throws IOException
{
- readFully(position, buffer, 0, buffer.length);
- }
-
- @Override
- public void seek(long pos) throws IOException {
- seekableInputStream.seek(pos);
- }
-
- @Override
- public long getPos() throws IOException {
- return seekableInputStream.getPos();
- }
-
- @Override
- public boolean seekToNewSource(long targetPos) {
- return false;
- }
-
- @Override
- public void close() throws IOException {
- seekableInputStream.close();
- }
- }
}