This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 7175bd8e8 [core] orc/parquet reader obtain the fileSize from metadata (#2918)
7175bd8e8 is described below
commit 7175bd8e8ac97d0561fdb2cf92352febdc3fad75
Author: wgcn <[email protected]>
AuthorDate: Tue Mar 19 14:29:03 2024 +0800
[core] orc/parquet reader obtain the fileSize from metadata (#2918)
---
...ReaderFactory.java => FormatReaderContext.java} | 34 +++++++++++++++++-----
.../apache/paimon/format/FormatReaderFactory.java | 7 +++--
.../paimon/io/KeyValueDataFileRecordReader.java | 9 +++---
.../paimon/io/KeyValueFileReaderFactory.java | 10 ++++---
.../apache/paimon/io/RowDataFileRecordReader.java | 6 +++-
.../paimon/operation/AppendOnlyFileStoreRead.java | 1 +
.../paimon/io/KeyValueFileReadWriteTest.java | 27 +++++++++++++++++
.../apache/paimon/format/avro/AvroBulkFormat.java | 10 ++-----
.../apache/paimon/format/orc/OrcReaderFactory.java | 16 +++++-----
.../format/parquet/ParquetReaderFactory.java | 14 ++++-----
.../paimon/format/orc/OrcReaderFactoryTest.java | 9 ++++--
11 files changed, 98 insertions(+), 45 deletions(-)
diff --git
a/paimon-common/src/main/java/org/apache/paimon/format/FormatReaderFactory.java
b/paimon-common/src/main/java/org/apache/paimon/format/FormatReaderContext.java
similarity index 57%
copy from
paimon-common/src/main/java/org/apache/paimon/format/FormatReaderFactory.java
copy to
paimon-common/src/main/java/org/apache/paimon/format/FormatReaderContext.java
index b2b179159..b1ad3fa47 100644
---
a/paimon-common/src/main/java/org/apache/paimon/format/FormatReaderFactory.java
+++
b/paimon-common/src/main/java/org/apache/paimon/format/FormatReaderContext.java
@@ -18,19 +18,37 @@
package org.apache.paimon.format;
-import org.apache.paimon.data.InternalRow;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.reader.RecordReader;
-import java.io.IOException;
-import java.io.Serializable;
+/** the context for creating RecordReader {@link RecordReader}. */
+public class FormatReaderContext {
+ private final FileIO fileIO;
+ private final Path file;
+ private final Integer poolSize;
+ private final Long fileSize;
-/** A factory to create {@link RecordReader} for file. */
-public interface FormatReaderFactory extends Serializable {
+ public FormatReaderContext(FileIO fileIO, Path file, Integer poolSize,
Long fileSize) {
+ this.fileIO = fileIO;
+ this.file = file;
+ this.poolSize = poolSize;
+ this.fileSize = fileSize;
+ }
- RecordReader<InternalRow> createReader(FileIO fileIO, Path file) throws
IOException;
+ public FileIO getFileIO() {
+ return fileIO;
+ }
- RecordReader<InternalRow> createReader(FileIO fileIO, Path file, int
poolSize)
- throws IOException;
+ public Path getFile() {
+ return file;
+ }
+
+ public Integer getPoolSize() {
+ return poolSize;
+ }
+
+ public Long getFileSize() {
+ return fileSize;
+ }
}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/format/FormatReaderFactory.java
b/paimon-common/src/main/java/org/apache/paimon/format/FormatReaderFactory.java
index b2b179159..f524ff4a1 100644
---
a/paimon-common/src/main/java/org/apache/paimon/format/FormatReaderFactory.java
+++
b/paimon-common/src/main/java/org/apache/paimon/format/FormatReaderFactory.java
@@ -29,8 +29,9 @@ import java.io.Serializable;
/** A factory to create {@link RecordReader} for file. */
public interface FormatReaderFactory extends Serializable {
- RecordReader<InternalRow> createReader(FileIO fileIO, Path file) throws
IOException;
+ default RecordReader<InternalRow> createReader(FileIO fileIO, Path file)
throws IOException {
+ return createReader(new FormatReaderContext(fileIO, file, null, null));
+ }
- RecordReader<InternalRow> createReader(FileIO fileIO, Path file, int
poolSize)
- throws IOException;
+ RecordReader<InternalRow> createReader(FormatReaderContext context) throws
IOException;
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileRecordReader.java
b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileRecordReader.java
index fe38ae146..4e7dfec9e 100644
---
a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileRecordReader.java
+++
b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileRecordReader.java
@@ -26,6 +26,7 @@ import org.apache.paimon.casting.CastedRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.PartitionInfo;
import org.apache.paimon.data.columnar.ColumnarRowIterator;
+import org.apache.paimon.format.FormatReaderContext;
import org.apache.paimon.format.FormatReaderFactory;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
@@ -58,13 +59,13 @@ public class KeyValueDataFileRecordReader implements
RecordReader<KeyValue> {
@Nullable Integer poolSize,
@Nullable int[] indexMapping,
@Nullable CastFieldGetter[] castMapping,
- @Nullable PartitionInfo partitionInfo)
+ @Nullable PartitionInfo partitionInfo,
+ long fileSize)
throws IOException {
FileUtils.checkExists(fileIO, path);
this.reader =
- poolSize == null
- ? readerFactory.createReader(fileIO, path)
- : readerFactory.createReader(fileIO, path, poolSize);
+ readerFactory.createReader(
+ new FormatReaderContext(fileIO, path, poolSize,
fileSize));
this.serializer = new KeyValueSerializer(keyType, valueType);
this.level = level;
this.indexMapping = indexMapping;
diff --git
a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
index cc7534e9a..3123518c2 100644
---
a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
+++
b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
@@ -93,9 +93,9 @@ public class KeyValueFileReaderFactory {
long schemaId, String fileName, long fileSize, int level) throws
IOException {
if (fileSize >= asyncThreshold && fileName.endsWith("orc")) {
return new AsyncRecordReader<>(
- () -> createRecordReader(schemaId, fileName, level, false,
2));
+ () -> createRecordReader(schemaId, fileName, level, false,
2, fileSize));
}
- return createRecordReader(schemaId, fileName, level, true, null);
+ return createRecordReader(schemaId, fileName, level, true, null,
fileSize);
}
private RecordReader<KeyValue> createRecordReader(
@@ -103,7 +103,8 @@ public class KeyValueFileReaderFactory {
String fileName,
int level,
boolean reuseFormat,
- @Nullable Integer poolSize)
+ @Nullable Integer poolSize,
+ long fileSize)
throws IOException {
String formatIdentifier =
DataFilePathFactory.formatIdentifier(fileName);
@@ -131,7 +132,8 @@ public class KeyValueFileReaderFactory {
poolSize,
bulkFormatMapping.getIndexMapping(),
bulkFormatMapping.getCastMapping(),
-
PartitionUtils.create(bulkFormatMapping.getPartitionPair(), partition));
+
PartitionUtils.create(bulkFormatMapping.getPartitionPair(), partition),
+ fileSize);
Optional<DeletionVector> deletionVector = dvFactory.create(fileName);
if (deletionVector.isPresent() && !deletionVector.get().isEmpty()) {
recordReader = new ApplyDeletionVectorReader<>(recordReader,
deletionVector.get());
diff --git
a/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileRecordReader.java
b/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileRecordReader.java
index ee8f9c26f..b461ebf0b 100644
---
a/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileRecordReader.java
+++
b/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileRecordReader.java
@@ -24,6 +24,7 @@ import org.apache.paimon.casting.CastedRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.PartitionInfo;
import org.apache.paimon.data.columnar.ColumnarRowIterator;
+import org.apache.paimon.format.FormatReaderContext;
import org.apache.paimon.format.FormatReaderFactory;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
@@ -46,12 +47,15 @@ public class RowDataFileRecordReader implements
RecordReader<InternalRow> {
public RowDataFileRecordReader(
FileIO fileIO,
Path path,
+ long fileSize,
FormatReaderFactory readerFactory,
@Nullable int[] indexMapping,
@Nullable CastFieldGetter[] castMapping,
@Nullable PartitionInfo partitionInfo)
throws IOException {
- this.reader = FileUtils.createFormatReader(fileIO, readerFactory,
path);
+ FileUtils.checkExists(fileIO, path);
+ FormatReaderContext context = new FormatReaderContext(fileIO, path,
null, fileSize);
+ this.reader = readerFactory.createReader(context);
this.indexMapping = indexMapping;
this.partitionInfo = partitionInfo;
this.castMapping = castMapping;
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreRead.java
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreRead.java
index 8363c297a..b67edaaf3 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreRead.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreRead.java
@@ -177,6 +177,7 @@ public class AppendOnlyFileStoreRead implements
FileStoreRead<InternalRow> {
new RowDataFileRecordReader(
fileIO,
dataFilePathFactory.toPath(file.fileName()),
+ file.fileSize(),
bulkFormatMapping.getReaderFactory(),
bulkFormatMapping.getIndexMapping(),
bulkFormatMapping.getCastMapping(),
diff --git
a/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java
b/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java
index ca7f75d6e..552eac0a1 100644
---
a/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java
@@ -48,6 +48,8 @@ import org.apache.paimon.utils.FileStorePathFactory;
import org.junit.jupiter.api.RepeatedTest;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
import java.io.IOException;
import java.util.HashMap;
@@ -385,4 +387,29 @@ public class KeyValueFileReadWriteTest {
assertThat(meta.level()).isEqualTo(expected.level());
}
}
+
+ @ParameterizedTest
+ @ValueSource(strings = {"parquet", "orc", "avro"})
+ public void testReaderUseFileSizeFromMetadata(String format) throws
Exception {
+ DataFileTestDataGenerator.Data data = gen.next();
+ KeyValueFileWriterFactory writerFactory =
createWriterFactory(tempDir.toString(), format);
+ DataFileMetaSerializer serializer = new DataFileMetaSerializer();
+
+ RollingFileWriter<KeyValue, DataFileMeta> writer =
+ writerFactory.createRollingMergeTreeFileWriter(0);
+ writer.write(CloseableIterator.fromList(data.content, kv -> {}));
+ writer.close();
+ List<DataFileMeta> actualMetas = writer.result();
+
+ KeyValueFileReaderFactory readerFactory =
+ createReaderFactory(tempDir.toString(), format, null, null);
+ assertData(
+ data,
+ actualMetas,
+ TestKeyValueGenerator.KEY_SERIALIZER,
+ TestKeyValueGenerator.DEFAULT_ROW_SERIALIZER,
+ serializer,
+ readerFactory,
+ kv -> kv);
+ }
}
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroBulkFormat.java
b/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroBulkFormat.java
index da60eedda..717f99895 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroBulkFormat.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroBulkFormat.java
@@ -19,6 +19,7 @@
package org.apache.paimon.format.avro;
import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.format.FormatReaderContext;
import org.apache.paimon.format.FormatReaderFactory;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
@@ -49,14 +50,9 @@ public class AvroBulkFormat implements FormatReaderFactory {
}
@Override
- public RecordReader<InternalRow> createReader(FileIO fileIO, Path file)
throws IOException {
- return new AvroReader(fileIO, file);
- }
-
- @Override
- public RecordReader<InternalRow> createReader(FileIO fileIO, Path file,
int poolSize)
+ public RecordReader<InternalRow> createReader(FormatReaderContext
formatReaderContext)
throws IOException {
- throw new UnsupportedOperationException();
+ return new AvroReader(formatReaderContext.getFileIO(),
formatReaderContext.getFile());
}
private class AvroReader implements RecordReader<InternalRow> {
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java
b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java
index 696665777..cdc46139f 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java
@@ -23,6 +23,7 @@ import org.apache.paimon.data.columnar.ColumnVector;
import org.apache.paimon.data.columnar.ColumnarRow;
import org.apache.paimon.data.columnar.ColumnarRowIterator;
import org.apache.paimon.data.columnar.VectorizedColumnBatch;
+import org.apache.paimon.format.FormatReaderContext;
import org.apache.paimon.format.FormatReaderFactory;
import org.apache.paimon.format.fs.HadoopReadOnlyFileSystem;
import org.apache.paimon.format.orc.filter.OrcFilters;
@@ -88,14 +89,13 @@ public class OrcReaderFactory implements
FormatReaderFactory {
// ------------------------------------------------------------------------
@Override
- public OrcVectorizedReader createReader(FileIO fileIO, Path file) throws
IOException {
- return createReader(fileIO, file, 1);
- }
-
- @Override
- public OrcVectorizedReader createReader(FileIO fileIO, Path file, int
poolSize)
- throws IOException {
+ public OrcVectorizedReader createReader(FormatReaderContext context)
throws IOException {
+ int poolSize = context.getPoolSize() == null ? 1 :
context.getPoolSize();
Pool<OrcReaderBatch> poolOfBatches = createPoolOfBatches(poolSize);
+
+ FileIO fileIO = context.getFileIO();
+ Long fileSize = context.getFileSize();
+ Path file = context.getFile();
RecordReader orcReader =
createRecordReader(
hadoopConfigWrapper.getHadoopConfig(),
@@ -104,7 +104,7 @@ public class OrcReaderFactory implements
FormatReaderFactory {
fileIO,
file,
0,
- fileIO.getFileSize(file));
+ fileSize == null ? fileIO.getFileSize(file) :
fileSize);
return new OrcVectorizedReader(orcReader, poolOfBatches);
}
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java
index 2c2985d32..29cf45a65 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java
@@ -24,6 +24,7 @@ import org.apache.paimon.data.columnar.ColumnarRow;
import org.apache.paimon.data.columnar.ColumnarRowIterator;
import org.apache.paimon.data.columnar.VectorizedColumnBatch;
import org.apache.paimon.data.columnar.writable.WritableColumnVector;
+import org.apache.paimon.format.FormatReaderContext;
import org.apache.paimon.format.FormatReaderFactory;
import org.apache.paimon.format.parquet.reader.ColumnReader;
import org.apache.paimon.format.parquet.reader.ParquetDecimalVector;
@@ -87,9 +88,12 @@ public class ParquetReaderFactory implements
FormatReaderFactory {
}
@Override
- public ParquetReader createReader(FileIO fileIO, Path filePath) throws
IOException {
+ public ParquetReader createReader(FormatReaderContext context) throws
IOException {
+ Path filePath = context.getFile();
+ FileIO fileIO = context.getFileIO();
+ Long fileSize = context.getFileSize();
final long splitOffset = 0;
- final long splitLength = fileIO.getFileSize(filePath);
+ final long splitLength = fileSize == null ?
fileIO.getFileSize(filePath) : fileSize;
ParquetReadOptions.Builder builder =
ParquetReadOptions.builder().withRange(splitOffset,
splitOffset + splitLength);
@@ -108,12 +112,6 @@ public class ParquetReaderFactory implements
FormatReaderFactory {
return new ParquetReader(reader, requestedSchema,
reader.getRecordCount(), poolOfBatches);
}
- @Override
- public RecordReader<InternalRow> createReader(FileIO fileIO, Path file,
int poolSize)
- throws IOException {
- throw new UnsupportedOperationException();
- }
-
private void setReadOptions(ParquetReadOptions.Builder builder) {
builder.useSignedStringMinMax(
conf.getBoolean("parquet.strings.signed-min-max.enabled",
false));
diff --git
a/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcReaderFactoryTest.java
b/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcReaderFactoryTest.java
index a5160f5c7..5a0f4925d 100644
---
a/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcReaderFactoryTest.java
+++
b/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcReaderFactoryTest.java
@@ -19,6 +19,7 @@
package org.apache.paimon.format.orc;
import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.format.FormatReaderContext;
import org.apache.paimon.format.orc.filter.OrcFilters;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
@@ -179,8 +180,10 @@ class OrcReaderFactoryTest {
AtomicBoolean isFirst = new AtomicBoolean(true);
+ LocalFileIO localFileIO = new LocalFileIO();
try (RecordReader<InternalRow> reader =
- format.createReader(new LocalFileIO(), flatFile,
randomPooSize)) {
+ format.createReader(
+ new FormatReaderContext(localFileIO, flatFile,
randomPooSize, null))) {
reader.forEachRemainingWithPosition(
(rowPosition, row) -> {
// check filter: _col0 > randomStart
@@ -202,8 +205,10 @@ class OrcReaderFactoryTest {
int randomPooSize = new Random().nextInt(3) + 1;
OrcReaderFactory format = createFormat(FLAT_FILE_TYPE, new int[] {2,
0, 1});
+ LocalFileIO localFileIO = new LocalFileIO();
try (RecordReader<InternalRow> reader =
- format.createReader(new LocalFileIO(), flatFile,
randomPooSize)) {
+ format.createReader(
+ new FormatReaderContext(localFileIO, flatFile,
randomPooSize, null))) {
reader.transform(row -> row)
.filter(row -> row.getInt(1) % 123 == 0)
.forEachRemainingWithPosition(