This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 7175bd8e8 [core] orc/parquet reader obtain the fileSize from metadata (#2918)
7175bd8e8 is described below

commit 7175bd8e8ac97d0561fdb2cf92352febdc3fad75
Author: wgcn <[email protected]>
AuthorDate: Tue Mar 19 14:29:03 2024 +0800

    [core] orc/parquet reader obtain the fileSize from metadata (#2918)
---
 ...ReaderFactory.java => FormatReaderContext.java} | 34 +++++++++++++++++-----
 .../apache/paimon/format/FormatReaderFactory.java  |  7 +++--
 .../paimon/io/KeyValueDataFileRecordReader.java    |  9 +++---
 .../paimon/io/KeyValueFileReaderFactory.java       | 10 ++++---
 .../apache/paimon/io/RowDataFileRecordReader.java  |  6 +++-
 .../paimon/operation/AppendOnlyFileStoreRead.java  |  1 +
 .../paimon/io/KeyValueFileReadWriteTest.java       | 27 +++++++++++++++++
 .../apache/paimon/format/avro/AvroBulkFormat.java  | 10 ++-----
 .../apache/paimon/format/orc/OrcReaderFactory.java | 16 +++++-----
 .../format/parquet/ParquetReaderFactory.java       | 14 ++++-----
 .../paimon/format/orc/OrcReaderFactoryTest.java    |  9 ++++--
 11 files changed, 98 insertions(+), 45 deletions(-)

diff --git 
a/paimon-common/src/main/java/org/apache/paimon/format/FormatReaderFactory.java 
b/paimon-common/src/main/java/org/apache/paimon/format/FormatReaderContext.java
similarity index 57%
copy from 
paimon-common/src/main/java/org/apache/paimon/format/FormatReaderFactory.java
copy to 
paimon-common/src/main/java/org/apache/paimon/format/FormatReaderContext.java
index b2b179159..b1ad3fa47 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/format/FormatReaderFactory.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/format/FormatReaderContext.java
@@ -18,19 +18,37 @@
 
 package org.apache.paimon.format;
 
-import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.reader.RecordReader;
 
-import java.io.IOException;
-import java.io.Serializable;
+/** The context for creating a {@link RecordReader}. */
+public class FormatReaderContext {
+    private final FileIO fileIO;
+    private final Path file;
+    private final Integer poolSize;
+    private final Long fileSize;
 
-/** A factory to create {@link RecordReader} for file. */
-public interface FormatReaderFactory extends Serializable {
+    public FormatReaderContext(FileIO fileIO, Path file, Integer poolSize, 
Long fileSize) {
+        this.fileIO = fileIO;
+        this.file = file;
+        this.poolSize = poolSize;
+        this.fileSize = fileSize;
+    }
 
-    RecordReader<InternalRow> createReader(FileIO fileIO, Path file) throws 
IOException;
+    public FileIO getFileIO() {
+        return fileIO;
+    }
 
-    RecordReader<InternalRow> createReader(FileIO fileIO, Path file, int 
poolSize)
-            throws IOException;
+    public Path getFile() {
+        return file;
+    }
+
+    public Integer getPoolSize() {
+        return poolSize;
+    }
+
+    public Long getFileSize() {
+        return fileSize;
+    }
 }
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/format/FormatReaderFactory.java 
b/paimon-common/src/main/java/org/apache/paimon/format/FormatReaderFactory.java
index b2b179159..f524ff4a1 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/format/FormatReaderFactory.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/format/FormatReaderFactory.java
@@ -29,8 +29,9 @@ import java.io.Serializable;
 /** A factory to create {@link RecordReader} for file. */
 public interface FormatReaderFactory extends Serializable {
 
-    RecordReader<InternalRow> createReader(FileIO fileIO, Path file) throws 
IOException;
+    default RecordReader<InternalRow> createReader(FileIO fileIO, Path file) 
throws IOException {
+        return createReader(new FormatReaderContext(fileIO, file, null, null));
+    }
 
-    RecordReader<InternalRow> createReader(FileIO fileIO, Path file, int 
poolSize)
-            throws IOException;
+    RecordReader<InternalRow> createReader(FormatReaderContext context) throws 
IOException;
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileRecordReader.java
 
b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileRecordReader.java
index fe38ae146..4e7dfec9e 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileRecordReader.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileRecordReader.java
@@ -26,6 +26,7 @@ import org.apache.paimon.casting.CastedRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.data.PartitionInfo;
 import org.apache.paimon.data.columnar.ColumnarRowIterator;
+import org.apache.paimon.format.FormatReaderContext;
 import org.apache.paimon.format.FormatReaderFactory;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
@@ -58,13 +59,13 @@ public class KeyValueDataFileRecordReader implements 
RecordReader<KeyValue> {
             @Nullable Integer poolSize,
             @Nullable int[] indexMapping,
             @Nullable CastFieldGetter[] castMapping,
-            @Nullable PartitionInfo partitionInfo)
+            @Nullable PartitionInfo partitionInfo,
+            long fileSize)
             throws IOException {
         FileUtils.checkExists(fileIO, path);
         this.reader =
-                poolSize == null
-                        ? readerFactory.createReader(fileIO, path)
-                        : readerFactory.createReader(fileIO, path, poolSize);
+                readerFactory.createReader(
+                        new FormatReaderContext(fileIO, path, poolSize, 
fileSize));
         this.serializer = new KeyValueSerializer(keyType, valueType);
         this.level = level;
         this.indexMapping = indexMapping;
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java 
b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
index cc7534e9a..3123518c2 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
@@ -93,9 +93,9 @@ public class KeyValueFileReaderFactory {
             long schemaId, String fileName, long fileSize, int level) throws 
IOException {
         if (fileSize >= asyncThreshold && fileName.endsWith("orc")) {
             return new AsyncRecordReader<>(
-                    () -> createRecordReader(schemaId, fileName, level, false, 
2));
+                    () -> createRecordReader(schemaId, fileName, level, false, 
2, fileSize));
         }
-        return createRecordReader(schemaId, fileName, level, true, null);
+        return createRecordReader(schemaId, fileName, level, true, null, 
fileSize);
     }
 
     private RecordReader<KeyValue> createRecordReader(
@@ -103,7 +103,8 @@ public class KeyValueFileReaderFactory {
             String fileName,
             int level,
             boolean reuseFormat,
-            @Nullable Integer poolSize)
+            @Nullable Integer poolSize,
+            long fileSize)
             throws IOException {
         String formatIdentifier = 
DataFilePathFactory.formatIdentifier(fileName);
 
@@ -131,7 +132,8 @@ public class KeyValueFileReaderFactory {
                         poolSize,
                         bulkFormatMapping.getIndexMapping(),
                         bulkFormatMapping.getCastMapping(),
-                        
PartitionUtils.create(bulkFormatMapping.getPartitionPair(), partition));
+                        
PartitionUtils.create(bulkFormatMapping.getPartitionPair(), partition),
+                        fileSize);
         Optional<DeletionVector> deletionVector = dvFactory.create(fileName);
         if (deletionVector.isPresent() && !deletionVector.get().isEmpty()) {
             recordReader = new ApplyDeletionVectorReader<>(recordReader, 
deletionVector.get());
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileRecordReader.java 
b/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileRecordReader.java
index ee8f9c26f..b461ebf0b 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileRecordReader.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileRecordReader.java
@@ -24,6 +24,7 @@ import org.apache.paimon.casting.CastedRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.data.PartitionInfo;
 import org.apache.paimon.data.columnar.ColumnarRowIterator;
+import org.apache.paimon.format.FormatReaderContext;
 import org.apache.paimon.format.FormatReaderFactory;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
@@ -46,12 +47,15 @@ public class RowDataFileRecordReader implements 
RecordReader<InternalRow> {
     public RowDataFileRecordReader(
             FileIO fileIO,
             Path path,
+            long fileSize,
             FormatReaderFactory readerFactory,
             @Nullable int[] indexMapping,
             @Nullable CastFieldGetter[] castMapping,
             @Nullable PartitionInfo partitionInfo)
             throws IOException {
-        this.reader = FileUtils.createFormatReader(fileIO, readerFactory, 
path);
+        FileUtils.checkExists(fileIO, path);
+        FormatReaderContext context = new FormatReaderContext(fileIO, path, 
null, fileSize);
+        this.reader = readerFactory.createReader(context);
         this.indexMapping = indexMapping;
         this.partitionInfo = partitionInfo;
         this.castMapping = castMapping;
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreRead.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreRead.java
index 8363c297a..b67edaaf3 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreRead.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreRead.java
@@ -177,6 +177,7 @@ public class AppendOnlyFileStoreRead implements 
FileStoreRead<InternalRow> {
                             new RowDataFileRecordReader(
                                     fileIO,
                                     
dataFilePathFactory.toPath(file.fileName()),
+                                    file.fileSize(),
                                     bulkFormatMapping.getReaderFactory(),
                                     bulkFormatMapping.getIndexMapping(),
                                     bulkFormatMapping.getCastMapping(),
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java 
b/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java
index ca7f75d6e..552eac0a1 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java
@@ -48,6 +48,8 @@ import org.apache.paimon.utils.FileStorePathFactory;
 import org.junit.jupiter.api.RepeatedTest;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import java.io.IOException;
 import java.util.HashMap;
@@ -385,4 +387,29 @@ public class KeyValueFileReadWriteTest {
             assertThat(meta.level()).isEqualTo(expected.level());
         }
     }
+
+    @ParameterizedTest
+    @ValueSource(strings = {"parquet", "orc", "avro"})
+    public void testReaderUseFileSizeFromMetadata(String format) throws 
Exception {
+        DataFileTestDataGenerator.Data data = gen.next();
+        KeyValueFileWriterFactory writerFactory = 
createWriterFactory(tempDir.toString(), format);
+        DataFileMetaSerializer serializer = new DataFileMetaSerializer();
+
+        RollingFileWriter<KeyValue, DataFileMeta> writer =
+                writerFactory.createRollingMergeTreeFileWriter(0);
+        writer.write(CloseableIterator.fromList(data.content, kv -> {}));
+        writer.close();
+        List<DataFileMeta> actualMetas = writer.result();
+
+        KeyValueFileReaderFactory readerFactory =
+                createReaderFactory(tempDir.toString(), format, null, null);
+        assertData(
+                data,
+                actualMetas,
+                TestKeyValueGenerator.KEY_SERIALIZER,
+                TestKeyValueGenerator.DEFAULT_ROW_SERIALIZER,
+                serializer,
+                readerFactory,
+                kv -> kv);
+    }
 }
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroBulkFormat.java 
b/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroBulkFormat.java
index da60eedda..717f99895 100644
--- 
a/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroBulkFormat.java
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroBulkFormat.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.format.avro;
 
 import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.format.FormatReaderContext;
 import org.apache.paimon.format.FormatReaderFactory;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
@@ -49,14 +50,9 @@ public class AvroBulkFormat implements FormatReaderFactory {
     }
 
     @Override
-    public RecordReader<InternalRow> createReader(FileIO fileIO, Path file) 
throws IOException {
-        return new AvroReader(fileIO, file);
-    }
-
-    @Override
-    public RecordReader<InternalRow> createReader(FileIO fileIO, Path file, 
int poolSize)
+    public RecordReader<InternalRow> createReader(FormatReaderContext 
formatReaderContext)
             throws IOException {
-        throw new UnsupportedOperationException();
+        return new AvroReader(formatReaderContext.getFileIO(), 
formatReaderContext.getFile());
     }
 
     private class AvroReader implements RecordReader<InternalRow> {
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java
 
b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java
index 696665777..cdc46139f 100644
--- 
a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java
@@ -23,6 +23,7 @@ import org.apache.paimon.data.columnar.ColumnVector;
 import org.apache.paimon.data.columnar.ColumnarRow;
 import org.apache.paimon.data.columnar.ColumnarRowIterator;
 import org.apache.paimon.data.columnar.VectorizedColumnBatch;
+import org.apache.paimon.format.FormatReaderContext;
 import org.apache.paimon.format.FormatReaderFactory;
 import org.apache.paimon.format.fs.HadoopReadOnlyFileSystem;
 import org.apache.paimon.format.orc.filter.OrcFilters;
@@ -88,14 +89,13 @@ public class OrcReaderFactory implements 
FormatReaderFactory {
     // ------------------------------------------------------------------------
 
     @Override
-    public OrcVectorizedReader createReader(FileIO fileIO, Path file) throws 
IOException {
-        return createReader(fileIO, file, 1);
-    }
-
-    @Override
-    public OrcVectorizedReader createReader(FileIO fileIO, Path file, int 
poolSize)
-            throws IOException {
+    public OrcVectorizedReader createReader(FormatReaderContext context) 
throws IOException {
+        int poolSize = context.getPoolSize() == null ? 1 : 
context.getPoolSize();
         Pool<OrcReaderBatch> poolOfBatches = createPoolOfBatches(poolSize);
+
+        FileIO fileIO = context.getFileIO();
+        Long fileSize = context.getFileSize();
+        Path file = context.getFile();
         RecordReader orcReader =
                 createRecordReader(
                         hadoopConfigWrapper.getHadoopConfig(),
@@ -104,7 +104,7 @@ public class OrcReaderFactory implements 
FormatReaderFactory {
                         fileIO,
                         file,
                         0,
-                        fileIO.getFileSize(file));
+                        fileSize == null ? fileIO.getFileSize(file) : 
fileSize);
         return new OrcVectorizedReader(orcReader, poolOfBatches);
     }
 
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java
 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java
index 2c2985d32..29cf45a65 100644
--- 
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java
@@ -24,6 +24,7 @@ import org.apache.paimon.data.columnar.ColumnarRow;
 import org.apache.paimon.data.columnar.ColumnarRowIterator;
 import org.apache.paimon.data.columnar.VectorizedColumnBatch;
 import org.apache.paimon.data.columnar.writable.WritableColumnVector;
+import org.apache.paimon.format.FormatReaderContext;
 import org.apache.paimon.format.FormatReaderFactory;
 import org.apache.paimon.format.parquet.reader.ColumnReader;
 import org.apache.paimon.format.parquet.reader.ParquetDecimalVector;
@@ -87,9 +88,12 @@ public class ParquetReaderFactory implements 
FormatReaderFactory {
     }
 
     @Override
-    public ParquetReader createReader(FileIO fileIO, Path filePath) throws 
IOException {
+    public ParquetReader createReader(FormatReaderContext context) throws 
IOException {
+        Path filePath = context.getFile();
+        FileIO fileIO = context.getFileIO();
+        Long fileSize = context.getFileSize();
         final long splitOffset = 0;
-        final long splitLength = fileIO.getFileSize(filePath);
+        final long splitLength = fileSize == null ? 
fileIO.getFileSize(filePath) : fileSize;
 
         ParquetReadOptions.Builder builder =
                 ParquetReadOptions.builder().withRange(splitOffset, 
splitOffset + splitLength);
@@ -108,12 +112,6 @@ public class ParquetReaderFactory implements 
FormatReaderFactory {
         return new ParquetReader(reader, requestedSchema, 
reader.getRecordCount(), poolOfBatches);
     }
 
-    @Override
-    public RecordReader<InternalRow> createReader(FileIO fileIO, Path file, 
int poolSize)
-            throws IOException {
-        throw new UnsupportedOperationException();
-    }
-
     private void setReadOptions(ParquetReadOptions.Builder builder) {
         builder.useSignedStringMinMax(
                 conf.getBoolean("parquet.strings.signed-min-max.enabled", 
false));
diff --git 
a/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcReaderFactoryTest.java
 
b/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcReaderFactoryTest.java
index a5160f5c7..5a0f4925d 100644
--- 
a/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcReaderFactoryTest.java
+++ 
b/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcReaderFactoryTest.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.format.orc;
 
 import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.format.FormatReaderContext;
 import org.apache.paimon.format.orc.filter.OrcFilters;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.local.LocalFileIO;
@@ -179,8 +180,10 @@ class OrcReaderFactoryTest {
 
         AtomicBoolean isFirst = new AtomicBoolean(true);
 
+        LocalFileIO localFileIO = new LocalFileIO();
         try (RecordReader<InternalRow> reader =
-                format.createReader(new LocalFileIO(), flatFile, 
randomPooSize)) {
+                format.createReader(
+                        new FormatReaderContext(localFileIO, flatFile, 
randomPooSize, null))) {
             reader.forEachRemainingWithPosition(
                     (rowPosition, row) -> {
                         // check filter: _col0 > randomStart
@@ -202,8 +205,10 @@ class OrcReaderFactoryTest {
         int randomPooSize = new Random().nextInt(3) + 1;
         OrcReaderFactory format = createFormat(FLAT_FILE_TYPE, new int[] {2, 
0, 1});
 
+        LocalFileIO localFileIO = new LocalFileIO();
         try (RecordReader<InternalRow> reader =
-                format.createReader(new LocalFileIO(), flatFile, 
randomPooSize)) {
+                format.createReader(
+                        new FormatReaderContext(localFileIO, flatFile, 
randomPooSize, null))) {
             reader.transform(row -> row)
                     .filter(row -> row.getInt(1) % 123 == 0)
                     .forEachRemainingWithPosition(

Reply via email to