This is an automated email from the ASF dual-hosted git repository.

yuzelin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new bbba01714 [core] Make FormatReaderFactory return FileRecordReader to reduce cast (#4512)
bbba01714 is described below

commit bbba0171423983714c106e4e62a050f757ecdc12
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed Nov 13 15:43:16 2024 +0800

    [core] Make FormatReaderFactory return FileRecordReader to reduce cast (#4512)
---
 .../bitmap/ApplyBitmapIndexRecordReader.java       | 20 ++++-----
 .../apache/paimon/format/FormatReaderFactory.java  |  3 +-
 .../EmptyFileRecordReader.java}                    | 31 +++++---------
 .../apache/paimon/reader/FileRecordIterator.java   |  4 +-
 .../FileRecordReader.java}                         | 28 ++++---------
 .../deletionvectors/ApplyDeletionVectorReader.java | 20 ++++-----
 ...RecordReader.java => DataFileRecordReader.java} | 47 +++++++++++-----------
 .../paimon/io/KeyValueDataFileRecordReader.java    | 12 +++---
 .../paimon/io/KeyValueFileReaderFactory.java       |  7 ++--
 .../apache/paimon/operation/RawFileSplitRead.java  | 13 +++---
 .../CompactedChangelogFormatReaderFactory.java     |  4 +-
 .../apache/paimon/format/avro/AvroBulkFormat.java  |  8 ++--
 .../apache/paimon/format/orc/OrcReaderFactory.java |  8 ++--
 .../format/parquet/ParquetReaderFactory.java       |  9 ++---
 14 files changed, 91 insertions(+), 123 deletions(-)

diff --git a/paimon-common/src/main/java/org/apache/paimon/fileindex/bitmap/ApplyBitmapIndexRecordReader.java b/paimon-common/src/main/java/org/apache/paimon/fileindex/bitmap/ApplyBitmapIndexRecordReader.java
index d5d15095f..3b1207c8b 100644
--- a/paimon-common/src/main/java/org/apache/paimon/fileindex/bitmap/ApplyBitmapIndexRecordReader.java
+++ b/paimon-common/src/main/java/org/apache/paimon/fileindex/bitmap/ApplyBitmapIndexRecordReader.java
@@ -20,41 +20,35 @@ package org.apache.paimon.fileindex.bitmap;
 
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.reader.FileRecordIterator;
+import org.apache.paimon.reader.FileRecordReader;
 import org.apache.paimon.reader.RecordReader;
 
 import javax.annotation.Nullable;
 
 import java.io.IOException;
 
-import static org.apache.paimon.utils.Preconditions.checkArgument;
-
 /** A {@link RecordReader} which apply {@link BitmapIndexResult} to filter 
record. */
-public class ApplyBitmapIndexRecordReader implements RecordReader<InternalRow> 
{
+public class ApplyBitmapIndexRecordReader implements 
FileRecordReader<InternalRow> {
 
-    private final RecordReader<InternalRow> reader;
+    private final FileRecordReader<InternalRow> reader;
 
     private final BitmapIndexResult fileIndexResult;
 
     public ApplyBitmapIndexRecordReader(
-            RecordReader<InternalRow> reader, BitmapIndexResult 
fileIndexResult) {
+            FileRecordReader<InternalRow> reader, BitmapIndexResult 
fileIndexResult) {
         this.reader = reader;
         this.fileIndexResult = fileIndexResult;
     }
 
     @Nullable
     @Override
-    public RecordIterator<InternalRow> readBatch() throws IOException {
-        RecordIterator<InternalRow> batch = reader.readBatch();
+    public FileRecordIterator<InternalRow> readBatch() throws IOException {
+        FileRecordIterator<InternalRow> batch = reader.readBatch();
         if (batch == null) {
             return null;
         }
 
-        checkArgument(
-                batch instanceof FileRecordIterator,
-                "There is a bug, RecordIterator in 
ApplyBitmapIndexRecordReader must be FileRecordIterator");
-
-        return new ApplyBitmapIndexFileRecordIterator(
-                (FileRecordIterator<InternalRow>) batch, fileIndexResult);
+        return new ApplyBitmapIndexFileRecordIterator(batch, fileIndexResult);
     }
 
     @Override
diff --git a/paimon-common/src/main/java/org/apache/paimon/format/FormatReaderFactory.java b/paimon-common/src/main/java/org/apache/paimon/format/FormatReaderFactory.java
index 420d44e0f..d8af3e2fe 100644
--- a/paimon-common/src/main/java/org/apache/paimon/format/FormatReaderFactory.java
+++ b/paimon-common/src/main/java/org/apache/paimon/format/FormatReaderFactory.java
@@ -22,6 +22,7 @@ import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.fileindex.FileIndexResult;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
+import org.apache.paimon.reader.FileRecordReader;
 import org.apache.paimon.reader.RecordReader;
 
 import java.io.IOException;
@@ -29,7 +30,7 @@ import java.io.IOException;
 /** A factory to create {@link RecordReader} for file. */
 public interface FormatReaderFactory {
 
-    RecordReader<InternalRow> createReader(Context context) throws IOException;
+    FileRecordReader<InternalRow> createReader(Context context) throws 
IOException;
 
     /** Context for creating reader. */
     interface Context {
diff --git a/paimon-common/src/main/java/org/apache/paimon/format/FormatReaderFactory.java b/paimon-common/src/main/java/org/apache/paimon/reader/EmptyFileRecordReader.java
similarity index 58%
copy from paimon-common/src/main/java/org/apache/paimon/format/FormatReaderFactory.java
copy to paimon-common/src/main/java/org/apache/paimon/reader/EmptyFileRecordReader.java
index 420d44e0f..3fa25dce5 100644
--- a/paimon-common/src/main/java/org/apache/paimon/format/FormatReaderFactory.java
+++ b/paimon-common/src/main/java/org/apache/paimon/reader/EmptyFileRecordReader.java
@@ -16,30 +16,21 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.format;
+package org.apache.paimon.reader;
 
-import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.fileindex.FileIndexResult;
-import org.apache.paimon.fs.FileIO;
-import org.apache.paimon.fs.Path;
-import org.apache.paimon.reader.RecordReader;
+import javax.annotation.Nullable;
 
 import java.io.IOException;
 
-/** A factory to create {@link RecordReader} for file. */
-public interface FormatReaderFactory {
+/** An empty {@link FileRecordReader}. */
+public class EmptyFileRecordReader<T> implements FileRecordReader<T> {
 
-    RecordReader<InternalRow> createReader(Context context) throws IOException;
-
-    /** Context for creating reader. */
-    interface Context {
-
-        FileIO fileIO();
-
-        Path filePath();
-
-        long fileSize();
-
-        FileIndexResult fileIndex();
+    @Nullable
+    @Override
+    public FileRecordIterator<T> readBatch() throws IOException {
+        return null;
     }
+
+    @Override
+    public void close() throws IOException {}
 }
diff --git a/paimon-common/src/main/java/org/apache/paimon/reader/FileRecordIterator.java b/paimon-common/src/main/java/org/apache/paimon/reader/FileRecordIterator.java
index d22b27053..2d3c85f19 100644
--- a/paimon-common/src/main/java/org/apache/paimon/reader/FileRecordIterator.java
+++ b/paimon-common/src/main/java/org/apache/paimon/reader/FileRecordIterator.java
@@ -27,10 +27,8 @@ import java.io.IOException;
 import java.util.function.Function;
 
 /**
- * Wrap {@link RecordReader.RecordIterator} to support returning the record's 
row position and file
+ * A {@link RecordReader.RecordIterator} to support returning the record's row 
position and file
  * Path.
- *
- * @param <T> The type of the record.
  */
 public interface FileRecordIterator<T> extends RecordReader.RecordIterator<T> {
 
diff --git a/paimon-common/src/main/java/org/apache/paimon/format/FormatReaderFactory.java b/paimon-common/src/main/java/org/apache/paimon/reader/FileRecordReader.java
similarity index 58%
copy from paimon-common/src/main/java/org/apache/paimon/format/FormatReaderFactory.java
copy to paimon-common/src/main/java/org/apache/paimon/reader/FileRecordReader.java
index 420d44e0f..4d5356edf 100644
--- a/paimon-common/src/main/java/org/apache/paimon/format/FormatReaderFactory.java
+++ b/paimon-common/src/main/java/org/apache/paimon/reader/FileRecordReader.java
@@ -16,30 +16,16 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.format;
+package org.apache.paimon.reader;
 
-import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.fileindex.FileIndexResult;
-import org.apache.paimon.fs.FileIO;
-import org.apache.paimon.fs.Path;
-import org.apache.paimon.reader.RecordReader;
+import javax.annotation.Nullable;
 
 import java.io.IOException;
 
-/** A factory to create {@link RecordReader} for file. */
-public interface FormatReaderFactory {
+/** A {@link RecordReader} to support returning {@link FileRecordIterator}. */
+public interface FileRecordReader<T> extends RecordReader<T> {
 
-    RecordReader<InternalRow> createReader(Context context) throws IOException;
-
-    /** Context for creating reader. */
-    interface Context {
-
-        FileIO fileIO();
-
-        Path filePath();
-
-        long fileSize();
-
-        FileIndexResult fileIndex();
-    }
+    @Override
+    @Nullable
+    FileRecordIterator<T> readBatch() throws IOException;
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/ApplyDeletionVectorReader.java b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/ApplyDeletionVectorReader.java
index c1dc16a78..2fc292e54 100644
--- a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/ApplyDeletionVectorReader.java
+++ b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/ApplyDeletionVectorReader.java
@@ -20,23 +20,22 @@ package org.apache.paimon.deletionvectors;
 
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.reader.FileRecordIterator;
+import org.apache.paimon.reader.FileRecordReader;
 import org.apache.paimon.reader.RecordReader;
 
 import javax.annotation.Nullable;
 
 import java.io.IOException;
 
-import static org.apache.paimon.utils.Preconditions.checkArgument;
-
 /** A {@link RecordReader} which apply {@link DeletionVector} to filter 
record. */
-public class ApplyDeletionVectorReader implements RecordReader<InternalRow> {
+public class ApplyDeletionVectorReader implements 
FileRecordReader<InternalRow> {
 
-    private final RecordReader<InternalRow> reader;
+    private final FileRecordReader<InternalRow> reader;
 
     private final DeletionVector deletionVector;
 
     public ApplyDeletionVectorReader(
-            RecordReader<InternalRow> reader, DeletionVector deletionVector) {
+            FileRecordReader<InternalRow> reader, DeletionVector 
deletionVector) {
         this.reader = reader;
         this.deletionVector = deletionVector;
     }
@@ -51,19 +50,14 @@ public class ApplyDeletionVectorReader implements 
RecordReader<InternalRow> {
 
     @Nullable
     @Override
-    public RecordIterator<InternalRow> readBatch() throws IOException {
-        RecordIterator<InternalRow> batch = reader.readBatch();
+    public FileRecordIterator<InternalRow> readBatch() throws IOException {
+        FileRecordIterator<InternalRow> batch = reader.readBatch();
 
         if (batch == null) {
             return null;
         }
 
-        checkArgument(
-                batch instanceof FileRecordIterator,
-                "There is a bug, RecordIterator in ApplyDeletionVectorReader 
must be FileRecordIterator");
-
-        return new ApplyDeletionFileRecordIterator(
-                (FileRecordIterator<InternalRow>) batch, deletionVector);
+        return new ApplyDeletionFileRecordIterator(batch, deletionVector);
     }
 
     @Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/FileRecordReader.java b/paimon-core/src/main/java/org/apache/paimon/io/DataFileRecordReader.java
similarity index 88%
rename from paimon-core/src/main/java/org/apache/paimon/io/FileRecordReader.java
rename to paimon-core/src/main/java/org/apache/paimon/io/DataFileRecordReader.java
index 1e12025ba..d2559fe62 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/FileRecordReader.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileRecordReader.java
@@ -25,7 +25,8 @@ import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.data.PartitionInfo;
 import org.apache.paimon.data.columnar.ColumnarRowIterator;
 import org.apache.paimon.format.FormatReaderFactory;
-import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.reader.FileRecordIterator;
+import org.apache.paimon.reader.FileRecordReader;
 import org.apache.paimon.utils.FileUtils;
 import org.apache.paimon.utils.ProjectedRow;
 
@@ -34,17 +35,35 @@ import javax.annotation.Nullable;
 import java.io.IOException;
 
 /** Reads {@link InternalRow} from data files. */
-public class FileRecordReader implements RecordReader<InternalRow> {
+public class DataFileRecordReader implements FileRecordReader<InternalRow> {
 
-    private final RecordReader<InternalRow> reader;
+    private final FileRecordReader<InternalRow> reader;
     @Nullable private final int[] indexMapping;
     @Nullable private final PartitionInfo partitionInfo;
     @Nullable private final CastFieldGetter[] castMapping;
 
+    public DataFileRecordReader(
+            FormatReaderFactory readerFactory,
+            FormatReaderFactory.Context context,
+            @Nullable int[] indexMapping,
+            @Nullable CastFieldGetter[] castMapping,
+            @Nullable PartitionInfo partitionInfo)
+            throws IOException {
+        try {
+            this.reader = readerFactory.createReader(context);
+        } catch (Exception e) {
+            FileUtils.checkExists(context.fileIO(), context.filePath());
+            throw e;
+        }
+        this.indexMapping = indexMapping;
+        this.partitionInfo = partitionInfo;
+        this.castMapping = castMapping;
+    }
+
     @Nullable
     @Override
-    public RecordReader.RecordIterator<InternalRow> readBatch() throws 
IOException {
-        RecordIterator<InternalRow> iterator = reader.readBatch();
+    public FileRecordIterator<InternalRow> readBatch() throws IOException {
+        FileRecordIterator<InternalRow> iterator = reader.readBatch();
         if (iterator == null) {
             return null;
         }
@@ -71,24 +90,6 @@ public class FileRecordReader implements 
RecordReader<InternalRow> {
         return iterator;
     }
 
-    public FileRecordReader(
-            FormatReaderFactory readerFactory,
-            FormatReaderFactory.Context context,
-            @Nullable int[] indexMapping,
-            @Nullable CastFieldGetter[] castMapping,
-            @Nullable PartitionInfo partitionInfo)
-            throws IOException {
-        try {
-            this.reader = readerFactory.createReader(context);
-        } catch (Exception e) {
-            FileUtils.checkExists(context.fileIO(), context.filePath());
-            throw e;
-        }
-        this.indexMapping = indexMapping;
-        this.partitionInfo = partitionInfo;
-        this.castMapping = castMapping;
-    }
-
     @Override
     public void close() throws IOException {
         reader.close();
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileRecordReader.java b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileRecordReader.java
index e44ad79ff..6cf087697 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileRecordReader.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileRecordReader.java
@@ -21,6 +21,8 @@ package org.apache.paimon.io;
 import org.apache.paimon.KeyValue;
 import org.apache.paimon.KeyValueSerializer;
 import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.reader.FileRecordIterator;
+import org.apache.paimon.reader.FileRecordReader;
 import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.types.RowType;
 
@@ -29,14 +31,14 @@ import javax.annotation.Nullable;
 import java.io.IOException;
 
 /** {@link RecordReader} for reading {@link KeyValue} data files. */
-public class KeyValueDataFileRecordReader implements RecordReader<KeyValue> {
+public class KeyValueDataFileRecordReader implements 
FileRecordReader<KeyValue> {
 
-    private final RecordReader<InternalRow> reader;
+    private final FileRecordReader<InternalRow> reader;
     private final KeyValueSerializer serializer;
     private final int level;
 
     public KeyValueDataFileRecordReader(
-            RecordReader<InternalRow> reader, RowType keyType, RowType 
valueType, int level) {
+            FileRecordReader<InternalRow> reader, RowType keyType, RowType 
valueType, int level) {
         this.reader = reader;
         this.serializer = new KeyValueSerializer(keyType, valueType);
         this.level = level;
@@ -44,8 +46,8 @@ public class KeyValueDataFileRecordReader implements 
RecordReader<KeyValue> {
 
     @Nullable
     @Override
-    public RecordIterator<KeyValue> readBatch() throws IOException {
-        RecordReader.RecordIterator<InternalRow> iterator = reader.readBatch();
+    public FileRecordIterator<KeyValue> readBatch() throws IOException {
+        FileRecordIterator<InternalRow> iterator = reader.readBatch();
         if (iterator == null) {
             return null;
         }
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
index fdbb727e5..7d3acd729 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
@@ -32,6 +32,7 @@ import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.partition.PartitionUtils;
 import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.reader.FileRecordReader;
 import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.schema.KeyValueFieldsExtractor;
 import org.apache.paimon.schema.SchemaManager;
@@ -109,7 +110,7 @@ public class KeyValueFileReaderFactory implements 
FileReaderFactory<KeyValue> {
         return createRecordReader(schemaId, fileName, level, true, null, 
fileSize);
     }
 
-    private RecordReader<KeyValue> createRecordReader(
+    private FileRecordReader<KeyValue> createRecordReader(
             long schemaId,
             String fileName,
             int level,
@@ -134,8 +135,8 @@ public class KeyValueFileReaderFactory implements 
FileReaderFactory<KeyValue> {
                         : formatSupplier.get();
         Path filePath = pathFactory.toPath(fileName);
 
-        RecordReader<InternalRow> fileRecordReader =
-                new FileRecordReader(
+        FileRecordReader<InternalRow> fileRecordReader =
+                new DataFileRecordReader(
                         bulkFormatMapping.getReaderFactory(),
                         orcPoolSize == null
                                 ? new FormatReaderContext(fileIO, filePath, 
fileSize)
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java b/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java
index 4a6fa5b3d..46977457c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java
@@ -32,12 +32,13 @@ import org.apache.paimon.format.FormatReaderContext;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.io.DataFilePathFactory;
+import org.apache.paimon.io.DataFileRecordReader;
 import org.apache.paimon.io.FileIndexEvaluator;
-import org.apache.paimon.io.FileRecordReader;
 import org.apache.paimon.mergetree.compact.ConcatRecordReader;
 import org.apache.paimon.partition.PartitionUtils;
 import org.apache.paimon.predicate.Predicate;
-import org.apache.paimon.reader.EmptyRecordReader;
+import org.apache.paimon.reader.EmptyFileRecordReader;
+import org.apache.paimon.reader.FileRecordReader;
 import org.apache.paimon.reader.ReaderSupplier;
 import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.schema.SchemaManager;
@@ -187,7 +188,7 @@ public class RawFileSplitRead implements 
SplitRead<InternalRow> {
         return ConcatRecordReader.create(suppliers);
     }
 
-    private RecordReader<InternalRow> createFileReader(
+    private FileRecordReader<InternalRow> createFileReader(
             BinaryRow partition,
             DataFileMeta file,
             DataFilePathFactory dataFilePathFactory,
@@ -204,7 +205,7 @@ public class RawFileSplitRead implements 
SplitRead<InternalRow> {
                             dataFilePathFactory,
                             file);
             if (!fileIndexResult.remain()) {
-                return new EmptyRecordReader<>();
+                return new EmptyFileRecordReader<>();
             }
         }
 
@@ -214,8 +215,8 @@ public class RawFileSplitRead implements 
SplitRead<InternalRow> {
                         dataFilePathFactory.toPath(file.fileName()),
                         file.fileSize(),
                         fileIndexResult);
-        RecordReader<InternalRow> fileRecordReader =
-                new FileRecordReader(
+        FileRecordReader<InternalRow> fileRecordReader =
+                new DataFileRecordReader(
                         bulkFormatMapping.getReaderFactory(),
                         formatReaderContext,
                         bulkFormatMapping.getIndexMapping(),
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/format/CompactedChangelogFormatReaderFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/format/CompactedChangelogFormatReaderFactory.java
index e17566f30..e0aed448d 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/format/CompactedChangelogFormatReaderFactory.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/format/CompactedChangelogFormatReaderFactory.java
@@ -27,7 +27,7 @@ import org.apache.paimon.fs.FileStatus;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.PositionOutputStream;
 import org.apache.paimon.fs.SeekableInputStream;
-import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.reader.FileRecordReader;
 
 import java.io.EOFException;
 import java.io.IOException;
@@ -60,7 +60,7 @@ public class CompactedChangelogFormatReaderFactory implements 
FormatReaderFactor
     }
 
     @Override
-    public RecordReader<InternalRow> createReader(Context context) throws 
IOException {
+    public FileRecordReader<InternalRow> createReader(Context context) throws 
IOException {
         OffsetReadOnlyFileIO fileIO = new 
OffsetReadOnlyFileIO(context.fileIO());
         long length = decodePath(context.filePath()).length;
 
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroBulkFormat.java b/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroBulkFormat.java
index 7f3e27518..a06ca9948 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroBulkFormat.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroBulkFormat.java
@@ -22,7 +22,7 @@ import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.format.FormatReaderFactory;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
-import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.reader.FileRecordReader;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.IOUtils;
 import org.apache.paimon.utils.IteratorResultIterator;
@@ -49,12 +49,12 @@ public class AvroBulkFormat implements FormatReaderFactory {
     }
 
     @Override
-    public RecordReader<InternalRow> createReader(FormatReaderFactory.Context 
context)
+    public FileRecordReader<InternalRow> 
createReader(FormatReaderFactory.Context context)
             throws IOException {
         return new AvroReader(context.fileIO(), context.filePath(), 
context.fileSize());
     }
 
-    private class AvroReader implements RecordReader<InternalRow> {
+    private class AvroReader implements FileRecordReader<InternalRow> {
 
         private final FileIO fileIO;
         private final DataFileReader<InternalRow> reader;
@@ -90,7 +90,7 @@ public class AvroBulkFormat implements FormatReaderFactory {
 
         @Nullable
         @Override
-        public RecordIterator<InternalRow> readBatch() throws IOException {
+        public IteratorResultIterator readBatch() throws IOException {
             Object ticket;
             try {
                 ticket = pool.pollEntry();
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java
index dbc5de265..05f3dd785 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java
@@ -30,6 +30,7 @@ import org.apache.paimon.format.fs.HadoopReadOnlyFileSystem;
 import org.apache.paimon.format.orc.filter.OrcFilters;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
+import org.apache.paimon.reader.FileRecordReader;
 import org.apache.paimon.reader.RecordReader.RecordIterator;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.RowType;
@@ -184,7 +185,7 @@ public class OrcReaderFactory implements 
FormatReaderFactory {
             return orcVectorizedRowBatch;
         }
 
-        private RecordIterator<InternalRow> convertAndGetIterator(
+        private ColumnarRowIterator convertAndGetIterator(
                 VectorizedRowBatch orcBatch, long rowNumber) {
             // no copying from the ORC column vectors to the Paimon columns 
vectors necessary,
             // because they point to the same data arrays internally design
@@ -209,8 +210,7 @@ public class OrcReaderFactory implements 
FormatReaderFactory {
      * batch is addressed by the starting row number of the batch, plus the 
number of records to be
      * skipped before.
      */
-    private static final class OrcVectorizedReader
-            implements org.apache.paimon.reader.RecordReader<InternalRow> {
+    private static final class OrcVectorizedReader implements 
FileRecordReader<InternalRow> {
 
         private final RecordReader orcReader;
         private final Pool<OrcReaderBatch> pool;
@@ -222,7 +222,7 @@ public class OrcReaderFactory implements 
FormatReaderFactory {
 
         @Nullable
         @Override
-        public RecordIterator<InternalRow> readBatch() throws IOException {
+        public ColumnarRowIterator readBatch() throws IOException {
             final OrcReaderBatch batch = getCachedEntry();
             final VectorizedRowBatch orcVectorBatch = 
batch.orcVectorizedRowBatch();
 
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java
index 2a62c0bc8..2e792d153 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java
@@ -31,8 +31,7 @@ import 
org.apache.paimon.format.parquet.reader.ParquetTimestampVector;
 import org.apache.paimon.format.parquet.type.ParquetField;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.options.Options;
-import org.apache.paimon.reader.RecordReader;
-import org.apache.paimon.reader.RecordReader.RecordIterator;
+import org.apache.paimon.reader.FileRecordReader;
 import org.apache.paimon.types.ArrayType;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataType;
@@ -307,7 +306,7 @@ public class ParquetReaderFactory implements 
FormatReaderFactory {
         return new VectorizedColumnBatch(vectors);
     }
 
-    private class ParquetReader implements RecordReader<InternalRow> {
+    private class ParquetReader implements FileRecordReader<InternalRow> {
 
         private ParquetFileReader reader;
 
@@ -360,7 +359,7 @@ public class ParquetReaderFactory implements 
FormatReaderFactory {
 
         @Nullable
         @Override
-        public RecordIterator<InternalRow> readBatch() throws IOException {
+        public ColumnarRowIterator readBatch() throws IOException {
             final ParquetReaderBatch batch = getCachedEntry();
 
             if (!nextBatch(batch)) {
@@ -488,7 +487,7 @@ public class ParquetReaderFactory implements 
FormatReaderFactory {
             recycler.recycle(this);
         }
 
-        public RecordIterator<InternalRow> convertAndGetIterator(long 
rowNumber) {
+        public ColumnarRowIterator convertAndGetIterator(long rowNumber) {
             result.reset(rowNumber);
             return result;
         }

Reply via email to