This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 466de5a26 [core] Unify FileRecordReader and reduce file access (#3098)
466de5a26 is described below

commit 466de5a261792e658bf0520f1344dd4254bd88e3
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Mar 26 20:28:47 2024 +0800

    [core] Unify FileRecordReader and reduce file access (#3098)
---
 ...FileRecordReader.java => FileRecordReader.java} | 20 +++++-----
 .../paimon/io/KeyValueDataFileRecordReader.java    | 44 +---------------------
 .../paimon/io/KeyValueFileReaderFactory.java       | 16 ++++----
 .../paimon/operation/AppendOnlyFileStoreRead.java  | 12 +++---
 .../java/org/apache/paimon/utils/FileUtils.java    | 12 ++++--
 .../paimon/table/AppendOnlyFileStoreTableTest.java | 23 +++++++++++
 6 files changed, 58 insertions(+), 69 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileRecordReader.java b/paimon-core/src/main/java/org/apache/paimon/io/FileRecordReader.java
similarity index 86%
rename from paimon-core/src/main/java/org/apache/paimon/io/RowDataFileRecordReader.java
rename to paimon-core/src/main/java/org/apache/paimon/io/FileRecordReader.java
index ed891a32b..0eac2961a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileRecordReader.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/FileRecordReader.java
@@ -24,10 +24,7 @@ import org.apache.paimon.casting.CastedRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.data.PartitionInfo;
 import org.apache.paimon.data.columnar.ColumnarRowIterator;
-import org.apache.paimon.format.FormatReaderContext;
 import org.apache.paimon.format.FormatReaderFactory;
-import org.apache.paimon.fs.FileIO;
-import org.apache.paimon.fs.Path;
 import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.utils.FileUtils;
 import org.apache.paimon.utils.ProjectedRow;
@@ -37,25 +34,26 @@ import javax.annotation.Nullable;
 import java.io.IOException;
 
 /** Reads {@link InternalRow} from data files. */
-public class RowDataFileRecordReader implements RecordReader<InternalRow> {
+public class FileRecordReader implements RecordReader<InternalRow> {
 
     private final RecordReader<InternalRow> reader;
     @Nullable private final int[] indexMapping;
     @Nullable private final PartitionInfo partitionInfo;
     @Nullable private final CastFieldGetter[] castMapping;
 
-    public RowDataFileRecordReader(
-            FileIO fileIO,
-            Path path,
-            long fileSize,
+    public FileRecordReader(
             FormatReaderFactory readerFactory,
+            FormatReaderFactory.Context context,
             @Nullable int[] indexMapping,
             @Nullable CastFieldGetter[] castMapping,
             @Nullable PartitionInfo partitionInfo)
             throws IOException {
-        FileUtils.checkExists(fileIO, path);
-        FormatReaderContext context = new FormatReaderContext(fileIO, path, fileSize);
-        this.reader = readerFactory.createReader(context);
+        try {
+            this.reader = readerFactory.createReader(context);
+        } catch (Exception e) {
+            FileUtils.checkExists(context.fileIO(), context.filePath());
+            throw e;
+        }
         this.indexMapping = indexMapping;
         this.partitionInfo = partitionInfo;
         this.castMapping = castMapping;
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileRecordReader.java b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileRecordReader.java
index 92be0ff68..e44ad79ff 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileRecordReader.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileRecordReader.java
@@ -20,17 +20,9 @@ package org.apache.paimon.io;
 
 import org.apache.paimon.KeyValue;
 import org.apache.paimon.KeyValueSerializer;
-import org.apache.paimon.PartitionSettedRow;
-import org.apache.paimon.casting.CastFieldGetter;
-import org.apache.paimon.casting.CastedRow;
 import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.data.PartitionInfo;
-import org.apache.paimon.data.columnar.ColumnarRowIterator;
-import org.apache.paimon.format.FormatReaderFactory;
 import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.types.RowType;
-import org.apache.paimon.utils.FileUtils;
-import org.apache.paimon.utils.ProjectedRow;
 
 import javax.annotation.Nullable;
 
@@ -42,27 +34,12 @@ public class KeyValueDataFileRecordReader implements RecordReader<KeyValue> {
     private final RecordReader<InternalRow> reader;
     private final KeyValueSerializer serializer;
     private final int level;
-    @Nullable private final int[] indexMapping;
-    @Nullable private final PartitionInfo partitionInfo;
-    @Nullable private final CastFieldGetter[] castMapping;
 
     public KeyValueDataFileRecordReader(
-            FormatReaderFactory readerFactory,
-            FormatReaderFactory.Context context,
-            RowType keyType,
-            RowType valueType,
-            int level,
-            @Nullable int[] indexMapping,
-            @Nullable CastFieldGetter[] castMapping,
-            @Nullable PartitionInfo partitionInfo)
-            throws IOException {
-        FileUtils.checkExists(context.fileIO(), context.filePath());
-        this.reader = readerFactory.createReader(context);
+            RecordReader<InternalRow> reader, RowType keyType, RowType valueType, int level) {
+        this.reader = reader;
         this.serializer = new KeyValueSerializer(keyType, valueType);
         this.level = level;
-        this.indexMapping = indexMapping;
-        this.partitionInfo = partitionInfo;
-        this.castMapping = castMapping;
     }
 
     @Nullable
@@ -73,23 +50,6 @@ public class KeyValueDataFileRecordReader implements RecordReader<KeyValue> {
             return null;
         }
 
-        if (iterator instanceof ColumnarRowIterator) {
-            iterator = ((ColumnarRowIterator) iterator).mapping(partitionInfo, indexMapping);
-        } else {
-            if (partitionInfo != null) {
-                final PartitionSettedRow partitionSettedRow =
-                        PartitionSettedRow.from(partitionInfo);
-                iterator = iterator.transform(partitionSettedRow::replaceRow);
-            }
-            if (indexMapping != null) {
-                final ProjectedRow projectedRow = ProjectedRow.from(indexMapping);
-                iterator = iterator.transform(projectedRow::replaceRow);
-            }
-        }
-        if (castMapping != null) {
-            final CastedRow castedRow = CastedRow.from(castMapping);
-            iterator = iterator.transform(castedRow::replaceRow);
-        }
         return iterator.transform(
                 internalRow ->
                         internalRow == null
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
index 184857b45..27ddd1ff6 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
@@ -21,6 +21,7 @@ package org.apache.paimon.io;
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.KeyValue;
 import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.deletionvectors.ApplyDeletionVectorReader;
 import org.apache.paimon.deletionvectors.DeletionVector;
 import org.apache.paimon.format.FileFormatDiscover;
@@ -125,24 +126,25 @@ public class KeyValueFileReaderFactory {
                                 key -> formatSupplier.get())
                         : formatSupplier.get();
         Path filePath = pathFactory.toPath(fileName);
-        RecordReader<KeyValue> recordReader =
-                new KeyValueDataFileRecordReader(
+
+        RecordReader<InternalRow> fileRecordReader =
+                new FileRecordReader(
                         bulkFormatMapping.getReaderFactory(),
                         orcPoolSize == null
                                 ? new FormatReaderContext(fileIO, filePath, fileSize)
                                 : new OrcFormatReaderContext(
                                         fileIO, filePath, fileSize, orcPoolSize),
-                        keyType,
-                        valueType,
-                        level,
                         bulkFormatMapping.getIndexMapping(),
                         bulkFormatMapping.getCastMapping(),
                         PartitionUtils.create(bulkFormatMapping.getPartitionPair(), partition));
+
         Optional<DeletionVector> deletionVector = dvFactory.create(fileName);
         if (deletionVector.isPresent() && !deletionVector.get().isEmpty()) {
-            recordReader = new ApplyDeletionVectorReader<>(recordReader, deletionVector.get());
+            fileRecordReader =
+                    new ApplyDeletionVectorReader<>(fileRecordReader, deletionVector.get());
         }
-        return recordReader;
+
+        return new KeyValueDataFileRecordReader(fileRecordReader, keyType, valueType, level);
     }
 
     public static Builder builder(
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreRead.java b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreRead.java
index b67edaaf3..49eea905c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreRead.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreRead.java
@@ -23,10 +23,11 @@ import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.format.FileFormatDiscover;
 import org.apache.paimon.format.FormatKey;
+import org.apache.paimon.format.FormatReaderContext;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.io.DataFilePathFactory;
-import org.apache.paimon.io.RowDataFileRecordReader;
+import org.apache.paimon.io.FileRecordReader;
 import org.apache.paimon.mergetree.compact.ConcatRecordReader;
 import org.apache.paimon.partition.PartitionUtils;
 import org.apache.paimon.predicate.Predicate;
@@ -174,11 +175,12 @@ public class AppendOnlyFileStoreRead implements FileStoreRead<InternalRow> {
             final BinaryRow partition = split.partition();
             suppliers.add(
                     () ->
-                            new RowDataFileRecordReader(
-                                    fileIO,
-                                    dataFilePathFactory.toPath(file.fileName()),
-                                    file.fileSize(),
+                            new FileRecordReader(
                                     bulkFormatMapping.getReaderFactory(),
+                                    new FormatReaderContext(
+                                            fileIO,
+                                            dataFilePathFactory.toPath(file.fileName()),
+                                            file.fileSize()),
                                     bulkFormatMapping.getIndexMapping(),
                                     bulkFormatMapping.getCastMapping(),
                                     PartitionUtils.create(
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/FileUtils.java b/paimon-core/src/main/java/org/apache/paimon/utils/FileUtils.java
index f1278ab71..eddc7273e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/FileUtils.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/FileUtils.java
@@ -134,10 +134,14 @@ public class FileUtils {
     public static RecordReader<InternalRow> createFormatReader(
             FileIO fileIO, FormatReaderFactory format, Path file, @Nullable Long fileSize)
             throws IOException {
-        checkExists(fileIO, file);
-        if (fileSize == null) {
-            fileSize = fileIO.getFileSize(file);
+        try {
+            if (fileSize == null) {
+                fileSize = fileIO.getFileSize(file);
+            }
+            return format.createReader(new FormatReaderContext(fileIO, file, fileSize));
+        } catch (Exception e) {
+            checkExists(fileIO, file);
+            throw e;
         }
-        return format.createReader(new FormatReaderContext(fileIO, file, fileSize));
     }
 }
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java
index 9e0c6d2ab..3846ab234 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java
@@ -23,6 +23,7 @@ import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.data.serializer.InternalRowSerializer;
 import org.apache.paimon.fs.FileIOFinder;
+import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.local.LocalFileIO;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.predicate.Predicate;
@@ -57,10 +58,32 @@ import java.util.stream.Collectors;
 import static org.apache.paimon.table.sink.KeyAndBucketExtractor.bucket;
 import static org.apache.paimon.table.sink.KeyAndBucketExtractor.bucketKeyHashCode;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Tests for {@link AppendOnlyFileStoreTable}. */
 public class AppendOnlyFileStoreTableTest extends FileStoreTableTestBase {
 
+    @Test
+    public void testReadDeletedFiles() throws Exception {
+        writeData();
+        FileStoreTable table = createFileStoreTable();
+        List<Split> splits = toSplits(table.newSnapshotReader().read().dataSplits());
+        TableRead read = table.newRead();
+
+        // delete one file
+        DataSplit split = (DataSplit) splits.get(0);
+        Path path =
+                table.store()
+                        .pathFactory()
+                        .createDataFilePathFactory(split.partition(), split.bucket())
+                        .toPath(split.dataFiles().get(0).fileName());
+        table.fileIO().deleteQuietly(path);
+
+        // read
+        assertThatThrownBy(() -> getResult(read, splits, BATCH_ROW_TO_STRING))
+                .hasMessageContaining("snapshot expires too fast");
+    }
+
     @Test
     public void testBatchReadWrite() throws Exception {
         writeData();

Reply via email to