This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new d2ddaa9bf [core] Introduce file-reader-async-threshold to speed up 
merging (#2118)
d2ddaa9bf is described below

commit d2ddaa9bfb865a93b12c7a0294cf936071bbc08b
Author: Jingsong Lee <[email protected]>
AuthorDate: Fri Oct 13 17:49:03 2023 +0800

    [core] Introduce file-reader-async-threshold to speed up merging (#2118)
---
 .../shortcodes/generated/core_configuration.html   |   6 +
 .../main/java/org/apache/paimon/CoreOptions.java   |  10 ++
 .../apache/paimon/format/FormatReaderFactory.java  |   3 +
 .../src/main/java/org/apache/paimon/fs/FileIO.java |   3 +
 .../java/org/apache/paimon/KeyValueFileStore.java  |   3 +-
 .../org/apache/paimon/casting/CastExecutor.java    |   3 +
 .../apache/paimon/format/FileFormatDiscover.java   |   7 +-
 .../paimon/io/KeyValueDataFileRecordReader.java    |   7 +-
 .../paimon/io/KeyValueFileReaderFactory.java       |  69 ++++++---
 .../apache/paimon/mergetree/MergeTreeReaders.java  |   5 +-
 .../paimon/operation/KeyValueFileStoreRead.java    |   8 +-
 .../paimon/operation/KeyValueFileStoreWrite.java   |   7 +-
 .../paimon/schema/KeyValueFieldsExtractor.java     |   4 +
 .../org/apache/paimon/schema/SchemaManager.java    |   2 +
 .../table/ChangelogValueCountFileStoreTable.java   |   1 +
 .../paimon/table/ChangelogWithKeyTableUtils.java   |   1 +
 .../org/apache/paimon/utils/AsyncRecordReader.java | 122 ++++++++++++++++
 .../org/apache/paimon/utils/BulkFormatMapping.java |   4 +-
 .../java/org/apache/paimon/utils/FileUtils.java    |   7 +-
 .../IOExceptionSupplier.java}                      |  23 ++-
 .../paimon/io/KeyValueFileReadWriteTest.java       |  10 +-
 .../paimon/mergetree/ContainsLevelsTest.java       |   8 +-
 .../apache/paimon/mergetree/LookupLevelsTest.java  |   8 +-
 .../apache/paimon/mergetree/MergeTreeTestBase.java |   3 +-
 .../table/ChangelogWithKeyFileStoreTableTest.java  |  38 +++++
 .../apache/paimon/utils/AsyncRecordReaderTest.java | 161 +++++++++++++++++++++
 .../flink/source/TestChangelogDataReadWrite.java   |   3 +-
 .../apache/paimon/format/avro/AvroBulkFormat.java  |   6 +
 .../apache/paimon/format/orc/OrcFileFormat.java    |   2 +
 .../apache/paimon/format/orc/OrcReaderFactory.java |  10 +-
 .../format/parquet/ParquetReaderFactory.java       |   6 +
 31 files changed, 499 insertions(+), 51 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/core_configuration.html 
b/docs/layouts/shortcodes/generated/core_configuration.html
index ee89b81ee..5e3094e89 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -146,6 +146,12 @@ under the License.
             <td>Boolean</td>
             <td>Whether only overwrite dynamic partition when overwriting a 
partitioned table with dynamic partition columns. Works only when the table has 
partition keys.</td>
         </tr>
+        <tr>
+            <td><h5>file-reader-async-threshold</h5></td>
+            <td style="word-wrap: break-word;">10 mb</td>
+            <td>MemorySize</td>
+            <td>The file size threshold above which a file is read asynchronously.</td>
+        </tr>
         <tr>
             <td><h5>file.compression</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java 
b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index 1935e9dd7..bd67a5fa2 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -904,6 +904,12 @@ public class CoreOptions implements Serializable {
                     .withDescription(
                             "The bytes of types (CHAR, VARCHAR, BINARY, 
VARBINARY) devote to the zorder sort.");
 
+    public static final ConfigOption<MemorySize> FILE_READER_ASYNC_THRESHOLD =
+            key("file-reader-async-threshold")
+                    .memoryType()
+                    .defaultValue(MemorySize.ofMebiBytes(10))
+                    .withDescription("The file size threshold above which a file is read asynchronously.");
+
     private final Options options;
 
     public CoreOptions(Map<String, String> options) {
@@ -992,6 +998,10 @@ public class CoreOptions implements Serializable {
         return options.get(FILE_COMPRESSION);
     }
 
+    public MemorySize fileReaderAsyncThreshold() {
+        return options.get(FILE_READER_ASYNC_THRESHOLD);
+    }
+
     public int snapshotNumRetainMin() {
         return options.get(SNAPSHOT_NUM_RETAINED_MIN);
     }
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/format/FormatReaderFactory.java 
b/paimon-common/src/main/java/org/apache/paimon/format/FormatReaderFactory.java
index 7e7855016..b2b179159 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/format/FormatReaderFactory.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/format/FormatReaderFactory.java
@@ -30,4 +30,7 @@ import java.io.Serializable;
 public interface FormatReaderFactory extends Serializable {
 
     RecordReader<InternalRow> createReader(FileIO fileIO, Path file) throws 
IOException;
+
+    RecordReader<InternalRow> createReader(FileIO fileIO, Path file, int 
poolSize)
+            throws IOException;
 }
diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java 
b/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java
index 535791246..819d70223 100644
--- a/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java
+++ b/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java
@@ -26,6 +26,8 @@ import org.apache.paimon.fs.local.LocalFileIO;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.concurrent.ThreadSafe;
+
 import java.io.BufferedReader;
 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -53,6 +55,7 @@ import static org.apache.paimon.fs.FileIOUtils.checkAccess;
  * @since 0.4.0
  */
 @Public
+@ThreadSafe
 public interface FileIO extends Serializable {
 
     Logger LOG = LoggerFactory.getLogger(FileIO.class);
diff --git a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java 
b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
index a6b22e7c3..ef122ee78 100644
--- a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
@@ -116,7 +116,8 @@ public class KeyValueFileStore extends 
AbstractFileStore<KeyValue> {
                 mfFactory,
                 FileFormatDiscover.of(options),
                 pathFactory(),
-                keyValueFieldsExtractor);
+                keyValueFieldsExtractor,
+                options);
     }
 
     @Override
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/casting/CastExecutor.java 
b/paimon-core/src/main/java/org/apache/paimon/casting/CastExecutor.java
index 10b831da3..8f6499dd2 100644
--- a/paimon-core/src/main/java/org/apache/paimon/casting/CastExecutor.java
+++ b/paimon-core/src/main/java/org/apache/paimon/casting/CastExecutor.java
@@ -18,12 +18,15 @@
 
 package org.apache.paimon.casting;
 
+import javax.annotation.concurrent.ThreadSafe;
+
 /**
  * Interface to model a function that performs the casting of a value from one 
type to another.
  *
  * @param <IN> Input internal type
  * @param <OUT> Output internal type
  */
+@ThreadSafe
 public interface CastExecutor<IN, OUT> {
 
     /** Cast the input value. */
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/format/FileFormatDiscover.java 
b/paimon-core/src/main/java/org/apache/paimon/format/FileFormatDiscover.java
index d0765efec..f983e2690 100644
--- a/paimon-core/src/main/java/org/apache/paimon/format/FileFormatDiscover.java
+++ b/paimon-core/src/main/java/org/apache/paimon/format/FileFormatDiscover.java
@@ -20,14 +20,17 @@ package org.apache.paimon.format;
 
 import org.apache.paimon.CoreOptions;
 
-import java.util.HashMap;
+import javax.annotation.concurrent.ThreadSafe;
+
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 /** A class to discover {@link FileFormat}. */
+@ThreadSafe
 public interface FileFormatDiscover {
 
     static FileFormatDiscover of(CoreOptions options) {
-        Map<String, FileFormat> formats = new HashMap<>();
+        Map<String, FileFormat> formats = new ConcurrentHashMap<>();
         return new FileFormatDiscover() {
 
             @Override
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileRecordReader.java
 
b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileRecordReader.java
index 8690e3217..ca6fdc89f 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileRecordReader.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileRecordReader.java
@@ -49,10 +49,15 @@ public class KeyValueDataFileRecordReader implements 
RecordReader<KeyValue> {
             RowType keyType,
             RowType valueType,
             int level,
+            @Nullable Integer poolSize,
             @Nullable int[] indexMapping,
             @Nullable CastFieldGetter[] castMapping)
             throws IOException {
-        this.reader = FileUtils.createFormatReader(fileIO, readerFactory, 
path);
+        FileUtils.checkExists(fileIO, path);
+        this.reader =
+                poolSize == null
+                        ? readerFactory.createReader(fileIO, path)
+                        : readerFactory.createReader(fileIO, path, poolSize);
         this.serializer = new KeyValueSerializer(keyType, valueType);
         this.level = level;
         this.indexMapping = indexMapping;
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java 
b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
index 71fbbec9a..a2eea014d 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.io;
 
+import org.apache.paimon.CoreOptions;
 import org.apache.paimon.KeyValue;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.format.FileFormatDiscover;
@@ -27,8 +28,8 @@ import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.schema.KeyValueFieldsExtractor;
 import org.apache.paimon.schema.SchemaManager;
-import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.AsyncRecordReader;
 import org.apache.paimon.utils.BulkFormatMapping;
 import org.apache.paimon.utils.FileStorePathFactory;
 import org.apache.paimon.utils.Projection;
@@ -40,6 +41,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.function.Supplier;
 
 /** Factory to create {@link RecordReader}s for reading {@link KeyValue} 
files. */
 public class KeyValueFileReaderFactory {
@@ -51,8 +53,10 @@ public class KeyValueFileReaderFactory {
     private final RowType valueType;
 
     private final BulkFormatMapping.BulkFormatMappingBuilder 
bulkFormatMappingBuilder;
-    private final Map<FormatKey, BulkFormatMapping> bulkFormatMappings;
     private final DataFilePathFactory pathFactory;
+    private final long asyncThreshold;
+
+    private final Map<FormatKey, BulkFormatMapping> bulkFormatMappings;
 
     private KeyValueFileReaderFactory(
             FileIO fileIO,
@@ -61,7 +65,8 @@ public class KeyValueFileReaderFactory {
             RowType keyType,
             RowType valueType,
             BulkFormatMapping.BulkFormatMappingBuilder 
bulkFormatMappingBuilder,
-            DataFilePathFactory pathFactory) {
+            DataFilePathFactory pathFactory,
+            long asyncThreshold) {
         this.fileIO = fileIO;
         this.schemaManager = schemaManager;
         this.schemaId = schemaId;
@@ -69,21 +74,41 @@ public class KeyValueFileReaderFactory {
         this.valueType = valueType;
         this.bulkFormatMappingBuilder = bulkFormatMappingBuilder;
         this.pathFactory = pathFactory;
+        this.asyncThreshold = asyncThreshold;
         this.bulkFormatMappings = new HashMap<>();
     }
 
-    public RecordReader<KeyValue> createRecordReader(long schemaId, String 
fileName, int level)
+    public RecordReader<KeyValue> createRecordReader(
+            long schemaId, String fileName, long fileSize, int level) throws 
IOException {
+        if (fileSize >= asyncThreshold && fileName.endsWith("orc")) {
+            return new AsyncRecordReader<>(
+                    () -> createRecordReader(schemaId, fileName, level, false, 
2));
+        }
+        return createRecordReader(schemaId, fileName, level, true, null);
+    }
+
+    private RecordReader<KeyValue> createRecordReader(
+            long schemaId,
+            String fileName,
+            int level,
+            boolean reuseFormat,
+            @Nullable Integer poolSize)
             throws IOException {
         String formatIdentifier = 
DataFilePathFactory.formatIdentifier(fileName);
+
+        Supplier<BulkFormatMapping> formatSupplier =
+                () ->
+                        bulkFormatMappingBuilder.build(
+                                formatIdentifier,
+                                schemaManager.schema(this.schemaId),
+                                schemaManager.schema(schemaId));
+
         BulkFormatMapping bulkFormatMapping =
-                bulkFormatMappings.computeIfAbsent(
-                        new FormatKey(schemaId, formatIdentifier),
-                        key -> {
-                            TableSchema tableSchema = 
schemaManager.schema(this.schemaId);
-                            TableSchema dataSchema = 
schemaManager.schema(key.schemaId);
-                            return bulkFormatMappingBuilder.build(
-                                    formatIdentifier, tableSchema, dataSchema);
-                        });
+                reuseFormat
+                        ? bulkFormatMappings.computeIfAbsent(
+                                new FormatKey(schemaId, formatIdentifier),
+                                key -> formatSupplier.get())
+                        : formatSupplier.get();
         return new KeyValueDataFileRecordReader(
                 fileIO,
                 bulkFormatMapping.getReaderFactory(),
@@ -91,6 +116,7 @@ public class KeyValueFileReaderFactory {
                 keyType,
                 valueType,
                 level,
+                poolSize,
                 bulkFormatMapping.getIndexMapping(),
                 bulkFormatMapping.getCastMapping());
     }
@@ -103,7 +129,8 @@ public class KeyValueFileReaderFactory {
             RowType valueType,
             FileFormatDiscover formatDiscover,
             FileStorePathFactory pathFactory,
-            KeyValueFieldsExtractor extractor) {
+            KeyValueFieldsExtractor extractor,
+            CoreOptions options) {
         return new Builder(
                 fileIO,
                 schemaManager,
@@ -112,7 +139,8 @@ public class KeyValueFileReaderFactory {
                 valueType,
                 formatDiscover,
                 pathFactory,
-                extractor);
+                extractor,
+                options);
     }
 
     /** Builder for {@link KeyValueFileReaderFactory}. */
@@ -126,8 +154,9 @@ public class KeyValueFileReaderFactory {
         private final FileFormatDiscover formatDiscover;
         private final FileStorePathFactory pathFactory;
         private final KeyValueFieldsExtractor extractor;
-
         private final int[][] fullKeyProjection;
+        private final CoreOptions options;
+
         private int[][] keyProjection;
         private int[][] valueProjection;
         private RowType projectedKeyType;
@@ -141,7 +170,8 @@ public class KeyValueFileReaderFactory {
                 RowType valueType,
                 FileFormatDiscover formatDiscover,
                 FileStorePathFactory pathFactory,
-                KeyValueFieldsExtractor extractor) {
+                KeyValueFieldsExtractor extractor,
+                CoreOptions options) {
             this.fileIO = fileIO;
             this.schemaManager = schemaManager;
             this.schemaId = schemaId;
@@ -152,6 +182,7 @@ public class KeyValueFileReaderFactory {
             this.extractor = extractor;
 
             this.fullKeyProjection = Projection.range(0, 
keyType.getFieldCount()).toNestedIndexes();
+            this.options = options;
             this.keyProjection = fullKeyProjection;
             this.valueProjection = Projection.range(0, 
valueType.getFieldCount()).toNestedIndexes();
             applyProjection();
@@ -166,7 +197,8 @@ public class KeyValueFileReaderFactory {
                     valueType,
                     formatDiscover,
                     pathFactory,
-                    extractor);
+                    extractor,
+                    options);
         }
 
         public Builder withKeyProjection(int[][] projection) {
@@ -205,7 +237,8 @@ public class KeyValueFileReaderFactory {
                     projectedValueType,
                     BulkFormatMapping.newBuilder(
                             formatDiscover, extractor, keyProjection, 
valueProjection, filters),
-                    pathFactory.createDataFilePathFactory(partition, bucket));
+                    pathFactory.createDataFilePathFactory(partition, bucket),
+                    options.fileReaderAsyncThreshold().getBytes());
         }
 
         private void applyProjection() {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeReaders.java 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeReaders.java
index aa2bb4a30..3fe28c073 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeReaders.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeReaders.java
@@ -86,7 +86,10 @@ public class MergeTreeReaders {
             readers.add(
                     () ->
                             readerFactory.createRecordReader(
-                                    file.schemaId(), file.fileName(), 
file.level()));
+                                    file.schemaId(),
+                                    file.fileName(),
+                                    file.fileSize(),
+                                    file.level()));
         }
         return ConcatRecordReader.create(readers);
     }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java
index 0a2a31dcf..d3a3615bc 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java
@@ -95,7 +95,8 @@ public class KeyValueFileStoreRead implements 
FileStoreRead<KeyValue> {
             MergeFunctionFactory<KeyValue> mfFactory,
             FileFormatDiscover formatDiscover,
             FileStorePathFactory pathFactory,
-            KeyValueFieldsExtractor extractor) {
+            KeyValueFieldsExtractor extractor,
+            CoreOptions options) {
         this.tableSchema = schemaManager.schema(schemaId);
         this.readerFactoryBuilder =
                 KeyValueFileReaderFactory.builder(
@@ -106,7 +107,8 @@ public class KeyValueFileStoreRead implements 
FileStoreRead<KeyValue> {
                         valueType,
                         formatDiscover,
                         pathFactory,
-                        extractor);
+                        extractor,
+                        options);
         this.keyComparator = keyComparator;
         this.mfFactory = mfFactory;
         this.valueCountMode = tableSchema.trimmedPrimaryKeys().isEmpty();
@@ -261,7 +263,7 @@ public class KeyValueFileStoreRead implements 
FileStoreRead<KeyValue> {
                         // See comments on DataFileMeta#extraFiles.
                         String fileName = 
changelogFile(file).orElse(file.fileName());
                         return readerFactory.createRecordReader(
-                                file.schemaId(), fileName, file.level());
+                                file.schemaId(), fileName, file.fileSize(), 
file.level());
                     });
         }
         return ConcatRecordReader.create(suppliers);
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
index 6a27d67d2..6f4735be3 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
@@ -115,7 +115,8 @@ public class KeyValueFileStoreWrite extends 
MemoryFileStoreWrite<KeyValue> {
                         valueType,
                         FileFormatDiscover.of(options),
                         pathFactory,
-                        extractor);
+                        extractor,
+                        options);
         this.writerFactoryBuilder =
                 KeyValueFileWriterFactory.builder(
                         fileIO,
@@ -266,7 +267,7 @@ public class KeyValueFileStoreWrite extends 
MemoryFileStoreWrite<KeyValue> {
                 valueType,
                 file ->
                         readerFactory.createRecordReader(
-                                file.schemaId(), file.fileName(), 
file.level()),
+                                file.schemaId(), file.fileName(), 
file.fileSize(), file.level()),
                 () -> ioManager.createChannel().getPathFile(),
                 new HashLookupStoreFactory(
                         cacheManager,
@@ -287,7 +288,7 @@ public class KeyValueFileStoreWrite extends 
MemoryFileStoreWrite<KeyValue> {
                 keyType,
                 file ->
                         readerFactory.createRecordReader(
-                                file.schemaId(), file.fileName(), 
file.level()),
+                                file.schemaId(), file.fileName(), 
file.fileSize(), file.level()),
                 () -> ioManager.createChannel().getPathFile(),
                 new HashLookupStoreFactory(
                         cacheManager,
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/schema/KeyValueFieldsExtractor.java
 
b/paimon-core/src/main/java/org/apache/paimon/schema/KeyValueFieldsExtractor.java
index 245c4b3a2..1c1d8d7a7 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/schema/KeyValueFieldsExtractor.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/schema/KeyValueFieldsExtractor.java
@@ -20,11 +20,15 @@ package org.apache.paimon.schema;
 
 import org.apache.paimon.types.DataField;
 
+import javax.annotation.concurrent.ThreadSafe;
+
 import java.io.Serializable;
 import java.util.List;
 
 /** Extractor of schema for different tables. */
+@ThreadSafe
 public interface KeyValueFieldsExtractor extends Serializable {
+
     /**
      * Extract key fields from table schema.
      *
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java 
b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
index 15c4e4b04..42f136c1a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
@@ -45,6 +45,7 @@ import org.apache.paimon.utils.JsonSerdeUtil;
 import org.apache.paimon.utils.Preconditions;
 
 import javax.annotation.Nullable;
+import javax.annotation.concurrent.ThreadSafe;
 
 import java.io.IOException;
 import java.io.Serializable;
@@ -66,6 +67,7 @@ import static 
org.apache.paimon.utils.FileUtils.listVersionedFiles;
 import static org.apache.paimon.utils.Preconditions.checkState;
 
 /** Schema Manager to manage schema versions. */
+@ThreadSafe
 public class SchemaManager implements Serializable {
 
     private static final String SCHEMA_PREFIX = "schema-";
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/ChangelogValueCountFileStoreTable.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/ChangelogValueCountFileStoreTable.java
index bb7a9a9b4..f7a457f1e 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/ChangelogValueCountFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/ChangelogValueCountFileStoreTable.java
@@ -191,6 +191,7 @@ public class ChangelogValueCountFileStoreTable extends 
AbstractFileStoreTable {
      * {@link KeyValueFieldsExtractor} implementation for {@link 
ChangelogValueCountFileStoreTable}.
      */
     static class ValueCountTableKeyValueFieldsExtractor implements 
KeyValueFieldsExtractor {
+
         private static final long serialVersionUID = 1L;
 
         static final ValueCountTableKeyValueFieldsExtractor EXTRACTOR =
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/ChangelogWithKeyTableUtils.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/ChangelogWithKeyTableUtils.java
index 36770365f..0ae8cc3cd 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/ChangelogWithKeyTableUtils.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/ChangelogWithKeyTableUtils.java
@@ -83,6 +83,7 @@ public class ChangelogWithKeyTableUtils {
     }
 
     static class ChangelogWithKeyKeyValueFieldsExtractor implements 
KeyValueFieldsExtractor {
+
         private static final long serialVersionUID = 1L;
 
         static final ChangelogWithKeyKeyValueFieldsExtractor EXTRACTOR =
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/utils/AsyncRecordReader.java 
b/paimon-core/src/main/java/org/apache/paimon/utils/AsyncRecordReader.java
new file mode 100644
index 000000000..123b02453
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/AsyncRecordReader.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import org.apache.paimon.reader.RecordReader;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+/** A {@link RecordReader} that uses ASYNC_EXECUTOR to read records asynchronously. */
+public class AsyncRecordReader<T> implements RecordReader<T> {
+
+    private static final ExecutorService ASYNC_EXECUTOR =
+            Executors.newCachedThreadPool(new 
ExecutorThreadFactory("paimon-reader-async-thread"));
+
+    private final BlockingQueue<Element> queue;
+    private final Future<Void> future;
+    private final ClassLoader classLoader;
+
+    private boolean isEnd = false;
+
+    public AsyncRecordReader(IOExceptionSupplier<RecordReader<T>> supplier) {
+        this.queue = new LinkedBlockingQueue<>();
+        this.future = ASYNC_EXECUTOR.submit(() -> asyncRead(supplier));
+        this.classLoader = Thread.currentThread().getContextClassLoader();
+    }
+
+    private Void asyncRead(IOExceptionSupplier<RecordReader<T>> supplier) 
throws IOException {
+        // Set the context classloader explicitly; otherwise the async thread keeps
+        // its creator's classloader, which may have already been closed and would
+        // cause subsequent reads to fail with exceptions.
+        Thread.currentThread().setContextClassLoader(classLoader);
+
+        try (RecordReader<T> reader = supplier.get()) {
+            while (true) {
+                RecordIterator<T> batch = reader.readBatch();
+                if (batch == null) {
+                    queue.add(new Element(true, null));
+                    return null;
+                }
+
+                queue.add(new Element(false, batch));
+            }
+        }
+    }
+
+    @Nullable
+    @Override
+    public RecordIterator<T> readBatch() throws IOException {
+        if (isEnd) {
+            return null;
+        }
+
+        try {
+            Element element;
+            do {
+                element = queue.poll(2, TimeUnit.SECONDS);
+                checkException();
+            } while (element == null);
+
+            if (element.isEnd) {
+                isEnd = true;
+                return null;
+            }
+
+            return element.batch;
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new IOException(e);
+        }
+    }
+
+    private void checkException() throws IOException, InterruptedException {
+        if (future.isDone()) {
+            try {
+                future.get();
+            } catch (ExecutionException e) {
+                throw new IOException(e.getCause());
+            }
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        future.cancel(true);
+    }
+
+    private class Element {
+
+        private final boolean isEnd;
+        private final RecordIterator<T> batch;
+
+        private Element(boolean isEnd, RecordIterator<T> batch) {
+            this.isEnd = isEnd;
+            this.batch = batch;
+        }
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/utils/BulkFormatMapping.java 
b/paimon-core/src/main/java/org/apache/paimon/utils/BulkFormatMapping.java
index 3b6488c6c..e1691381a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/BulkFormatMapping.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/BulkFormatMapping.java
@@ -36,6 +36,7 @@ import java.util.List;
 
 /** Class with index mapping and bulk format. */
 public class BulkFormatMapping {
+
     @Nullable private final int[] indexMapping;
     @Nullable private final CastFieldGetter[] castMapping;
     private final FormatReaderFactory bulkFormat;
@@ -75,6 +76,7 @@ public class BulkFormatMapping {
 
     /** Builder to build {@link BulkFormatMapping}. */
     public static class BulkFormatMappingBuilder {
+
         private final FileFormatDiscover formatDiscover;
         private final KeyValueFieldsExtractor extractor;
         private final int[][] keyProjection;
@@ -117,7 +119,7 @@ public class BulkFormatMapping {
             int[][] dataProjection =
                     KeyValue.project(dataKeyProjection, dataValueProjection, 
dataKeyFields.size());
 
-            /**
+            /*
              * We need to create index mapping on projection instead of key 
and value separately
              * here, for example
              *
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/FileUtils.java 
b/paimon-core/src/main/java/org/apache/paimon/utils/FileUtils.java
index e41ba0241..faa27afa4 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/FileUtils.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/FileUtils.java
@@ -112,8 +112,7 @@ public class FileUtils {
                 .filter(status -> 
status.getPath().getName().startsWith(prefix));
     }
 
-    public static RecordReader<InternalRow> createFormatReader(
-            FileIO fileIO, FormatReaderFactory format, Path file) throws 
IOException {
+    public static void checkExists(FileIO fileIO, Path file) throws 
IOException {
         if (!fileIO.exists(file)) {
             throw new FileNotFoundException(
                     String.format(
@@ -124,7 +123,11 @@ public class FileUtils {
                                     + " (For example, increasing 
parallelism).",
                             file));
         }
+    }
 
+    public static RecordReader<InternalRow> createFormatReader(
+            FileIO fileIO, FormatReaderFactory format, Path file) throws 
IOException {
+        checkExists(fileIO, file);
         return format.createReader(fileIO, file);
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/casting/CastExecutor.java 
b/paimon-core/src/main/java/org/apache/paimon/utils/IOExceptionSupplier.java
similarity index 64%
copy from paimon-core/src/main/java/org/apache/paimon/casting/CastExecutor.java
copy to 
paimon-core/src/main/java/org/apache/paimon/utils/IOExceptionSupplier.java
index 10b831da3..347a8991a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/casting/CastExecutor.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/IOExceptionSupplier.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -16,16 +16,23 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.casting;
+package org.apache.paimon.utils;
+
+import java.io.IOException;
+import java.util.function.Supplier;
 
 /**
- * Interface to model a function that performs the casting of a value from one 
type to another.
+ * A {@link Supplier} throws {@link IOException}.
  *
- * @param <IN> Input internal type
- * @param <OUT> Output internal type
+ * @param <T> the type of results supplied by this supplier
  */
-public interface CastExecutor<IN, OUT> {
+@FunctionalInterface
+public interface IOExceptionSupplier<T> {
 
-    /** Cast the input value. */
-    OUT cast(IN value);
+    /**
+     * Gets a result.
+     *
+     * @return a result
+     */
+    T get() throws IOException;
 }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java 
b/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java
index dbfcbcd0b..aab4a5f98 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java
@@ -75,7 +75,7 @@ public class KeyValueFileReadWriteTest {
     public void testReadNonExistentFile() {
         KeyValueFileReaderFactory readerFactory =
                 createReaderFactory(tempDir.toString(), "avro", null, null);
-        assertThatThrownBy(() -> readerFactory.createRecordReader(0, 
"dummy_file.avro", 0))
+        assertThatThrownBy(() -> readerFactory.createRecordReader(0, 
"dummy_file.avro", 1, 0))
                 .hasMessageContaining(
                         "you can configure 'snapshot.time-retained' option 
with a larger value.");
     }
@@ -285,7 +285,8 @@ public class KeyValueFileReadWriteTest {
                         DEFAULT_ROW_TYPE,
                         ignore -> new FlushingFileFormat(format),
                         pathFactory,
-                        new 
TestKeyValueGenerator.TestKeyValueFieldsExtractor());
+                        new 
TestKeyValueGenerator.TestKeyValueFieldsExtractor(),
+                        new CoreOptions(new HashMap<>()));
         if (keyProjection != null) {
             builder.withKeyProjection(keyProjection);
         }
@@ -310,7 +311,10 @@ public class KeyValueFileReadWriteTest {
             CloseableIterator<KeyValue> actualKvsIterator =
                     new RecordReaderIterator<>(
                             readerFactory.createRecordReader(
-                                    meta.schemaId(), meta.fileName(), 
meta.level()));
+                                    meta.schemaId(),
+                                    meta.fileName(),
+                                    meta.fileSize(),
+                                    meta.level()));
             while (actualKvsIterator.hasNext()) {
                 assertThat(expectedIterator.hasNext()).isTrue();
                 KeyValue actualKv = actualKvsIterator.next();
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java
index 424fe9315..28c86b754 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java
@@ -180,7 +180,10 @@ public class ContainsLevelsTest {
                 levels,
                 comparator,
                 keyType,
-                file -> createReaderFactory().createRecordReader(0, 
file.fileName(), file.level()),
+                file ->
+                        createReaderFactory()
+                                .createRecordReader(
+                                        0, file.fileName(), file.fileSize(), 
file.level()),
                 () -> new File(tempDir.toFile(), LOOKUP_FILE_PREFIX + 
UUID.randomUUID()),
                 new HashLookupStoreFactory(new CacheManager(2048, 
MemorySize.ofMebiBytes(1)), 0.75),
                 Duration.ofHours(1),
@@ -239,7 +242,8 @@ public class ContainsLevelsTest {
                             public List<DataField> valueFields(TableSchema 
schema) {
                                 return schema.fields();
                             }
-                        });
+                        },
+                        new CoreOptions(new HashMap<>()));
         return builder.build(BinaryRow.EMPTY_ROW, 0);
     }
 
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java
index ef77b75e0..35b847bcf 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java
@@ -204,7 +204,10 @@ public class LookupLevelsTest {
                 comparator,
                 keyType,
                 rowType,
-                file -> createReaderFactory().createRecordReader(0, 
file.fileName(), file.level()),
+                file ->
+                        createReaderFactory()
+                                .createRecordReader(
+                                        0, file.fileName(), file.fileSize(), 
file.level()),
                 () -> new File(tempDir.toFile(), LOOKUP_FILE_PREFIX + 
UUID.randomUUID()),
                 new HashLookupStoreFactory(new CacheManager(2048, 
MemorySize.ofMebiBytes(1)), 0.75),
                 Duration.ofHours(1),
@@ -263,7 +266,8 @@ public class LookupLevelsTest {
                             public List<DataField> valueFields(TableSchema 
schema) {
                                 return schema.fields();
                             }
-                        });
+                        },
+                        new CoreOptions(new HashMap<>()));
         return builder.build(BinaryRow.EMPTY_ROW, 0);
     }
 
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
index 50743892b..718d8714a 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
@@ -168,7 +168,8 @@ public abstract class MergeTreeTestBase {
                                                 "v",
                                                 new 
org.apache.paimon.types.IntType(false)));
                             }
-                        });
+                        },
+                        new CoreOptions(new HashMap<>()));
         readerFactory = readerFactoryBuilder.build(BinaryRow.EMPTY_ROW, 0);
         compactReaderFactory = readerFactoryBuilder.build(BinaryRow.EMPTY_ROW, 
0);
 
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTableTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTableTest.java
index ae5e6c7ef..02a8ef6f3 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTableTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTableTest.java
@@ -66,6 +66,7 @@ import org.junit.jupiter.api.Test;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -110,6 +111,43 @@ public class ChangelogWithKeyFileStoreTableTest extends 
FileStoreTableTestBase {
                             + " "
                             + COMPATIBILITY_BATCH_ROW_TO_STRING.apply(rowData);
 
+    @Test
+    public void testAsyncReader() throws Exception {
+        FileStoreTable table = createFileStoreTable();
+        table =
+                table.copy(
+                        Collections.singletonMap(
+                                CoreOptions.FILE_READER_ASYNC_THRESHOLD.key(), 
"1 b"));
+
+        Map<Integer, GenericRow> rows = new HashMap<>();
+        for (int i = 0; i < 20; i++) {
+            BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
+            BatchTableWrite write = writeBuilder.newWrite();
+            BatchTableCommit commit = writeBuilder.newCommit();
+            for (int j = 0; j < 1000; j++) {
+                GenericRow row = rowData(1, i * j, 100L * i * j);
+                rows.put(row.getInt(1), row);
+                write.write(row);
+            }
+            commit.commit(write.prepareCommit());
+            write.close();
+            commit.close();
+        }
+
+        ReadBuilder readBuilder = table.newReadBuilder();
+        List<Split> splits = 
toSplits(table.newSnapshotReader().read().dataSplits());
+        TableRead read = readBuilder.newRead();
+
+        Function<InternalRow, String> toString =
+                r -> r.getInt(0) + "|" + r.getInt(1) + "|" + r.getLong(2);
+        String[] expected =
+                rows.values().stream()
+                        .sorted(Comparator.comparingInt(o -> o.getInt(1)))
+                        .map(toString)
+                        .toArray(String[]::new);
+        assertThat(getResult(read, splits, 
toString)).containsExactly(expected);
+    }
+
     @Test
     public void testBatchWriteBuilder() throws Exception {
         FileStoreTable table = createFileStoreTable();
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/utils/AsyncRecordReaderTest.java 
b/paimon-core/src/test/java/org/apache/paimon/utils/AsyncRecordReaderTest.java
new file mode 100644
index 000000000..41039aabb
--- /dev/null
+++ 
b/paimon-core/src/test/java/org/apache/paimon/utils/AsyncRecordReaderTest.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import org.apache.paimon.reader.RecordReader;
+
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Test for {@link AsyncRecordReader}. */
+public class AsyncRecordReaderTest {
+
+    @Test
+    public void testNormal() throws IOException {
+        Queue<List<Integer>> queue = new LinkedList<>();
+        queue.add(Arrays.asList(1, 5, 6));
+        queue.add(Arrays.asList(4, 6, 8));
+        queue.add(Arrays.asList(9, 1));
+        AtomicInteger released = new AtomicInteger(0);
+        AtomicBoolean closed = new AtomicBoolean(false);
+        RecordReader<Integer> reader =
+                new RecordReader<Integer>() {
+                    @Nullable
+                    @Override
+                    public RecordIterator<Integer> readBatch() {
+                        List<Integer> values = queue.poll();
+                        if (values == null) {
+                            return null;
+                        }
+                        Queue<Integer> vQueue = new LinkedList<>(values);
+                        return new RecordIterator<Integer>() {
+                            @Nullable
+                            @Override
+                            public Integer next() {
+                                return vQueue.poll();
+                            }
+
+                            @Override
+                            public void releaseBatch() {
+                                released.incrementAndGet();
+                            }
+                        };
+                    }
+
+                    @Override
+                    public void close() {
+                        closed.set(true);
+                    }
+                };
+
+        AsyncRecordReader<Integer> asyncReader = new AsyncRecordReader<>(() -> 
reader);
+        List<Integer> results = new ArrayList<>();
+        asyncReader.forEachRemaining(results::add);
+        assertThat(results).containsExactly(1, 5, 6, 4, 6, 8, 9, 1);
+        assertThat(released.get()).isEqualTo(3);
+        assertThat(closed.get()).isTrue();
+    }
+
+    @Test
+    public void testNonBlockingWhenException() {
+        String message = "Test Exception";
+        RecordReader<Integer> reader =
+                new RecordReader<Integer>() {
+                    @Nullable
+                    @Override
+                    public RecordIterator<Integer> readBatch() {
+                        throw new RuntimeException(message);
+                    }
+
+                    @Override
+                    public void close() {}
+                };
+
+        AsyncRecordReader<Integer> asyncReader = new AsyncRecordReader<>(() -> 
reader);
+        assertThatThrownBy(() -> asyncReader.forEachRemaining(v -> {}))
+                .hasMessageContaining(message);
+    }
+
+    @Test
+    public void testClassLoader() throws IOException {
+        ClassLoader goodClassLoader = 
Thread.currentThread().getContextClassLoader();
+        try {
+            ClassLoader badClassLoader =
+                    new ClassLoader() {
+                        @Override
+                        public Class<?> loadClass(String name) {
+                            throw new RuntimeException();
+                        }
+                    };
+            Thread.currentThread().setContextClassLoader(badClassLoader);
+
+            RecordReader<Integer> reader1 =
+                    new RecordReader<Integer>() {
+                        @Nullable
+                        @Override
+                        public RecordIterator<Integer> readBatch() {
+                            return null;
+                        }
+
+                        @Override
+                        public void close() {}
+                    };
+
+            AsyncRecordReader<Integer> asyncReader = new 
AsyncRecordReader<>(() -> reader1);
+            asyncReader.forEachRemaining(v -> {});
+
+            Thread.currentThread().setContextClassLoader(goodClassLoader);
+            RecordReader<Integer> reader2 =
+                    new RecordReader<Integer>() {
+                        @Nullable
+                        @Override
+                        public RecordIterator<Integer> readBatch() {
+                            try {
+                                Thread.currentThread()
+                                        .getContextClassLoader()
+                                        
.loadClass(AsyncRecordReaderTest.class.getName());
+                            } catch (ClassNotFoundException e) {
+                                throw new RuntimeException(e);
+                            }
+                            return null;
+                        }
+
+                        @Override
+                        public void close() {}
+                    };
+
+            asyncReader = new AsyncRecordReader<>(() -> reader2);
+            asyncReader.forEachRemaining(v -> {});
+        } finally {
+            Thread.currentThread().setContextClassLoader(goodClassLoader);
+        }
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
index a8741b450..16e1942cd 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
@@ -134,7 +134,8 @@ public class TestChangelogDataReadWrite {
                         DeduplicateMergeFunction.factory(),
                         ignore -> avro,
                         pathFactory,
-                        EXTRACTOR);
+                        EXTRACTOR,
+                        new CoreOptions(new HashMap<>()));
         return new KeyValueTableRead(read) {
             @Override
             public KeyValueTableRead withFilter(Predicate predicate) {
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroBulkFormat.java 
b/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroBulkFormat.java
index 0f5867e64..594988ab6 100644
--- 
a/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroBulkFormat.java
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroBulkFormat.java
@@ -55,6 +55,12 @@ public class AvroBulkFormat implements FormatReaderFactory {
         return new AvroReader(fileIO, file);
     }
 
+    @Override
+    public RecordReader<InternalRow> createReader(FileIO fileIO, Path file, 
int poolSize)
+            throws IOException {
+        throw new UnsupportedOperationException();
+    }
+
     private class AvroReader implements RecordReader<InternalRow> {
 
         private final FileIO fileIO;
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormat.java 
b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormat.java
index e09a72999..62957f24a 100644
--- 
a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormat.java
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormat.java
@@ -47,6 +47,7 @@ import org.apache.paimon.utils.Projection;
 import org.apache.orc.TypeDescription;
 
 import javax.annotation.Nullable;
+import javax.annotation.concurrent.ThreadSafe;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -57,6 +58,7 @@ import java.util.stream.Collectors;
 import static org.apache.paimon.types.DataTypeChecks.getFieldTypes;
 
 /** Orc {@link FileFormat}. */
+@ThreadSafe
 public class OrcFileFormat extends FileFormat {
 
     public static final String IDENTIFIER = "orc";
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java
 
b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java
index 3b794f039..02bb65802 100644
--- 
a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java
@@ -94,7 +94,13 @@ public class OrcReaderFactory implements FormatReaderFactory 
{
 
     @Override
     public OrcVectorizedReader createReader(FileIO fileIO, Path file) throws 
IOException {
-        Pool<OrcReaderBatch> poolOfBatches = createPoolOfBatches(1);
+        return createReader(fileIO, file, 1);
+    }
+
+    @Override
+    public OrcVectorizedReader createReader(FileIO fileIO, Path file, int 
poolSize)
+            throws IOException {
+        Pool<OrcReaderBatch> poolOfBatches = createPoolOfBatches(poolSize);
         RecordReader orcReader =
                 createRecordReader(
                         hadoopConfigWrapper.getHadoopConfig(),
@@ -135,7 +141,7 @@ public class OrcReaderFactory implements 
FormatReaderFactory {
         final Pool<OrcReaderBatch> pool = new Pool<>(numBatches);
 
         for (int i = 0; i < numBatches; i++) {
-            final VectorizedRowBatch orcBatch = createBatchWrapper(schema, 
batchSize);
+            final VectorizedRowBatch orcBatch = createBatchWrapper(schema, 
batchSize / numBatches);
             final OrcReaderBatch batch = createReaderBatch(orcBatch, 
pool.recycler());
             pool.add(batch);
         }
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java
 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java
index 22c023167..995770601 100644
--- 
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java
@@ -108,6 +108,12 @@ public class ParquetReaderFactory implements 
FormatReaderFactory {
         return new ParquetReader(reader, requestedSchema, 
reader.getRecordCount(), poolOfBatches);
     }
 
+    @Override
+    public RecordReader<InternalRow> createReader(FileIO fileIO, Path file, 
int poolSize)
+            throws IOException {
+        throw new UnsupportedOperationException();
+    }
+
     private void setReadOptions(ParquetReadOptions.Builder builder) {
         builder.useSignedStringMinMax(
                 conf.getBoolean("parquet.strings.signed-min-max.enabled", 
false));

Reply via email to