This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 466de5a26 [core] Unify FileRecordReader and reduce file access (#3098)
466de5a26 is described below
commit 466de5a261792e658bf0520f1344dd4254bd88e3
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Mar 26 20:28:47 2024 +0800
[core] Unify FileRecordReader and reduce file access (#3098)
---
...FileRecordReader.java => FileRecordReader.java} | 20 +++++-----
.../paimon/io/KeyValueDataFileRecordReader.java | 44 +---------------------
.../paimon/io/KeyValueFileReaderFactory.java | 16 ++++----
.../paimon/operation/AppendOnlyFileStoreRead.java | 12 +++---
.../java/org/apache/paimon/utils/FileUtils.java | 12 ++++--
.../paimon/table/AppendOnlyFileStoreTableTest.java | 23 +++++++++++
6 files changed, 58 insertions(+), 69 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileRecordReader.java b/paimon-core/src/main/java/org/apache/paimon/io/FileRecordReader.java
similarity index 86%
rename from paimon-core/src/main/java/org/apache/paimon/io/RowDataFileRecordReader.java
rename to paimon-core/src/main/java/org/apache/paimon/io/FileRecordReader.java
index ed891a32b..0eac2961a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileRecordReader.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/FileRecordReader.java
@@ -24,10 +24,7 @@ import org.apache.paimon.casting.CastedRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.PartitionInfo;
import org.apache.paimon.data.columnar.ColumnarRowIterator;
-import org.apache.paimon.format.FormatReaderContext;
import org.apache.paimon.format.FormatReaderFactory;
-import org.apache.paimon.fs.FileIO;
-import org.apache.paimon.fs.Path;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.utils.FileUtils;
import org.apache.paimon.utils.ProjectedRow;
@@ -37,25 +34,26 @@ import javax.annotation.Nullable;
import java.io.IOException;
/** Reads {@link InternalRow} from data files. */
-public class RowDataFileRecordReader implements RecordReader<InternalRow> {
+public class FileRecordReader implements RecordReader<InternalRow> {
private final RecordReader<InternalRow> reader;
@Nullable private final int[] indexMapping;
@Nullable private final PartitionInfo partitionInfo;
@Nullable private final CastFieldGetter[] castMapping;
- public RowDataFileRecordReader(
- FileIO fileIO,
- Path path,
- long fileSize,
+ public FileRecordReader(
FormatReaderFactory readerFactory,
+ FormatReaderFactory.Context context,
@Nullable int[] indexMapping,
@Nullable CastFieldGetter[] castMapping,
@Nullable PartitionInfo partitionInfo)
throws IOException {
- FileUtils.checkExists(fileIO, path);
- FormatReaderContext context = new FormatReaderContext(fileIO, path,
fileSize);
- this.reader = readerFactory.createReader(context);
+ try {
+ this.reader = readerFactory.createReader(context);
+ } catch (Exception e) {
+ FileUtils.checkExists(context.fileIO(), context.filePath());
+ throw e;
+ }
this.indexMapping = indexMapping;
this.partitionInfo = partitionInfo;
this.castMapping = castMapping;
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileRecordReader.java b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileRecordReader.java
index 92be0ff68..e44ad79ff 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileRecordReader.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileRecordReader.java
@@ -20,17 +20,9 @@ package org.apache.paimon.io;
import org.apache.paimon.KeyValue;
import org.apache.paimon.KeyValueSerializer;
-import org.apache.paimon.PartitionSettedRow;
-import org.apache.paimon.casting.CastFieldGetter;
-import org.apache.paimon.casting.CastedRow;
import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.data.PartitionInfo;
-import org.apache.paimon.data.columnar.ColumnarRowIterator;
-import org.apache.paimon.format.FormatReaderFactory;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.types.RowType;
-import org.apache.paimon.utils.FileUtils;
-import org.apache.paimon.utils.ProjectedRow;
import javax.annotation.Nullable;
@@ -42,27 +34,12 @@ public class KeyValueDataFileRecordReader implements RecordReader<KeyValue> {
private final RecordReader<InternalRow> reader;
private final KeyValueSerializer serializer;
private final int level;
- @Nullable private final int[] indexMapping;
- @Nullable private final PartitionInfo partitionInfo;
- @Nullable private final CastFieldGetter[] castMapping;
public KeyValueDataFileRecordReader(
- FormatReaderFactory readerFactory,
- FormatReaderFactory.Context context,
- RowType keyType,
- RowType valueType,
- int level,
- @Nullable int[] indexMapping,
- @Nullable CastFieldGetter[] castMapping,
- @Nullable PartitionInfo partitionInfo)
- throws IOException {
- FileUtils.checkExists(context.fileIO(), context.filePath());
- this.reader = readerFactory.createReader(context);
+ RecordReader<InternalRow> reader, RowType keyType, RowType
valueType, int level) {
+ this.reader = reader;
this.serializer = new KeyValueSerializer(keyType, valueType);
this.level = level;
- this.indexMapping = indexMapping;
- this.partitionInfo = partitionInfo;
- this.castMapping = castMapping;
}
@Nullable
@@ -73,23 +50,6 @@ public class KeyValueDataFileRecordReader implements RecordReader<KeyValue> {
return null;
}
- if (iterator instanceof ColumnarRowIterator) {
- iterator = ((ColumnarRowIterator) iterator).mapping(partitionInfo,
indexMapping);
- } else {
- if (partitionInfo != null) {
- final PartitionSettedRow partitionSettedRow =
- PartitionSettedRow.from(partitionInfo);
- iterator = iterator.transform(partitionSettedRow::replaceRow);
- }
- if (indexMapping != null) {
- final ProjectedRow projectedRow =
ProjectedRow.from(indexMapping);
- iterator = iterator.transform(projectedRow::replaceRow);
- }
- }
- if (castMapping != null) {
- final CastedRow castedRow = CastedRow.from(castMapping);
- iterator = iterator.transform(castedRow::replaceRow);
- }
return iterator.transform(
internalRow ->
internalRow == null
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
index 184857b45..27ddd1ff6 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
@@ -21,6 +21,7 @@ package org.apache.paimon.io;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.KeyValue;
import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.InternalRow;
import org.apache.paimon.deletionvectors.ApplyDeletionVectorReader;
import org.apache.paimon.deletionvectors.DeletionVector;
import org.apache.paimon.format.FileFormatDiscover;
@@ -125,24 +126,25 @@ public class KeyValueFileReaderFactory {
key -> formatSupplier.get())
: formatSupplier.get();
Path filePath = pathFactory.toPath(fileName);
- RecordReader<KeyValue> recordReader =
- new KeyValueDataFileRecordReader(
+
+ RecordReader<InternalRow> fileRecordReader =
+ new FileRecordReader(
bulkFormatMapping.getReaderFactory(),
orcPoolSize == null
? new FormatReaderContext(fileIO, filePath,
fileSize)
: new OrcFormatReaderContext(
fileIO, filePath, fileSize,
orcPoolSize),
- keyType,
- valueType,
- level,
bulkFormatMapping.getIndexMapping(),
bulkFormatMapping.getCastMapping(),
PartitionUtils.create(bulkFormatMapping.getPartitionPair(), partition));
+
Optional<DeletionVector> deletionVector = dvFactory.create(fileName);
if (deletionVector.isPresent() && !deletionVector.get().isEmpty()) {
- recordReader = new ApplyDeletionVectorReader<>(recordReader,
deletionVector.get());
+ fileRecordReader =
+ new ApplyDeletionVectorReader<>(fileRecordReader,
deletionVector.get());
}
- return recordReader;
+
+ return new KeyValueDataFileRecordReader(fileRecordReader, keyType,
valueType, level);
}
public static Builder builder(
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreRead.java b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreRead.java
index b67edaaf3..49eea905c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreRead.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreRead.java
@@ -23,10 +23,11 @@ import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.format.FileFormatDiscover;
import org.apache.paimon.format.FormatKey;
+import org.apache.paimon.format.FormatReaderContext;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataFilePathFactory;
-import org.apache.paimon.io.RowDataFileRecordReader;
+import org.apache.paimon.io.FileRecordReader;
import org.apache.paimon.mergetree.compact.ConcatRecordReader;
import org.apache.paimon.partition.PartitionUtils;
import org.apache.paimon.predicate.Predicate;
@@ -174,11 +175,12 @@ public class AppendOnlyFileStoreRead implements FileStoreRead<InternalRow> {
final BinaryRow partition = split.partition();
suppliers.add(
() ->
- new RowDataFileRecordReader(
- fileIO,
-
dataFilePathFactory.toPath(file.fileName()),
- file.fileSize(),
+ new FileRecordReader(
bulkFormatMapping.getReaderFactory(),
+ new FormatReaderContext(
+ fileIO,
+
dataFilePathFactory.toPath(file.fileName()),
+ file.fileSize()),
bulkFormatMapping.getIndexMapping(),
bulkFormatMapping.getCastMapping(),
PartitionUtils.create(
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/FileUtils.java b/paimon-core/src/main/java/org/apache/paimon/utils/FileUtils.java
index f1278ab71..eddc7273e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/FileUtils.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/FileUtils.java
@@ -134,10 +134,14 @@ public class FileUtils {
public static RecordReader<InternalRow> createFormatReader(
FileIO fileIO, FormatReaderFactory format, Path file, @Nullable
Long fileSize)
throws IOException {
- checkExists(fileIO, file);
- if (fileSize == null) {
- fileSize = fileIO.getFileSize(file);
+ try {
+ if (fileSize == null) {
+ fileSize = fileIO.getFileSize(file);
+ }
+ return format.createReader(new FormatReaderContext(fileIO, file,
fileSize));
+ } catch (Exception e) {
+ checkExists(fileIO, file);
+ throw e;
}
- return format.createReader(new FormatReaderContext(fileIO, file,
fileSize));
}
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java
index 9e0c6d2ab..3846ab234 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java
@@ -23,6 +23,7 @@ import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.serializer.InternalRowSerializer;
import org.apache.paimon.fs.FileIOFinder;
+import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.Predicate;
@@ -57,10 +58,32 @@ import java.util.stream.Collectors;
import static org.apache.paimon.table.sink.KeyAndBucketExtractor.bucket;
import static
org.apache.paimon.table.sink.KeyAndBucketExtractor.bucketKeyHashCode;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** Tests for {@link AppendOnlyFileStoreTable}. */
public class AppendOnlyFileStoreTableTest extends FileStoreTableTestBase {
+ @Test
+ public void testReadDeletedFiles() throws Exception {
+ writeData();
+ FileStoreTable table = createFileStoreTable();
+ List<Split> splits =
toSplits(table.newSnapshotReader().read().dataSplits());
+ TableRead read = table.newRead();
+
+ // delete one file
+ DataSplit split = (DataSplit) splits.get(0);
+ Path path =
+ table.store()
+ .pathFactory()
+ .createDataFilePathFactory(split.partition(),
split.bucket())
+ .toPath(split.dataFiles().get(0).fileName());
+ table.fileIO().deleteQuietly(path);
+
+ // read
+ assertThatThrownBy(() -> getResult(read, splits, BATCH_ROW_TO_STRING))
+ .hasMessageContaining("snapshot expires too fast");
+ }
+
@Test
public void testBatchReadWrite() throws Exception {
writeData();