This is an automated email from the ASF dual-hosted git repository.

aitozi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 8a53d34aef [core] Support ignore corrupt or lost files during read 
(#6821)
8a53d34aef is described below

commit 8a53d34aef9b5c03ed74455fadcb3b8e1bdfc365
Author: baiyangtx <[email protected]>
AuthorDate: Sat Feb 28 16:01:33 2026 +0800

    [core] Support ignore corrupt or lost files during read (#6821)
---
 .../shortcodes/generated/core_configuration.html   | 12 ++++
 .../main/java/org/apache/paimon/CoreOptions.java   | 20 ++++++
 .../org/apache/paimon/AppendOnlyFileStore.java     | 10 +--
 .../java/org/apache/paimon/KeyValueFileStore.java  |  3 +-
 .../paimon/io/ChainKeyValueFileReaderFactory.java  | 12 ++--
 .../org/apache/paimon/io/DataFileRecordReader.java | 80 +++++++++++++++++++---
 .../paimon/io/KeyValueFileReaderFactory.java       | 21 ++++--
 .../paimon/operation/DataEvolutionSplitRead.java   | 11 ++-
 .../apache/paimon/operation/RawFileSplitRead.java  | 14 ++--
 .../paimon/table/format/FormatReadBuilder.java     |  5 +-
 .../java/org/apache/paimon/utils/FileUtils.java    | 20 +++---
 .../flink/source/TestChangelogDataReadWrite.java   |  4 +-
 .../apache/paimon/spark/sql/PaimonQueryTest.scala  | 68 ++++++++++++++++++
 13 files changed, 233 insertions(+), 47 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/core_configuration.html 
b/docs/layouts/shortcodes/generated/core_configuration.html
index cf7c139792..9d6c1948dd 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -1121,6 +1121,18 @@ This config option does not affect the default 
filesystem metastore.</td>
             <td>Long</td>
             <td>After configuring this time, only the data files created after 
this time will be read. It is independent of snapshots, but it is imprecise 
filtering (depending on whether or not compaction occurs).</td>
         </tr>
+        <tr>
+            <td><h5>scan.ignore-corrupt-files</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>Ignore corrupt files while scanning.</td>
+        </tr>
+        <tr>
+            <td><h5>scan.ignore-lost-files</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>Ignore lost files while scanning.</td>
+        </tr>
         <tr>
             <td><h5>scan.manifest.parallelism</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java 
b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index de9da3325f..34e680433b 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1040,6 +1040,18 @@ public class CoreOptions implements Serializable {
                     .withDescription(
                             "The delay duration of stream read when scan 
incremental snapshots.");
 
+    public static final ConfigOption<Boolean> SCAN_IGNORE_CORRUPT_FILE =
+            key("scan.ignore-corrupt-files")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("Ignore corrupt files while scanning.");
+
+    public static final ConfigOption<Boolean> SCAN_IGNORE_LOST_FILE =
+            key("scan.ignore-lost-files")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("Ignore lost files while scanning.");
+
     public static final ConfigOption<Boolean> AUTO_CREATE =
             key("auto-create")
                     .booleanType()
@@ -2969,6 +2981,14 @@ public class CoreOptions implements Serializable {
         return options.get(SCAN_VERSION);
     }
 
+    public boolean scanIgnoreCorruptFile() {
+        return options.get(SCAN_IGNORE_CORRUPT_FILE);
+    }
+
+    public boolean scanIgnoreLostFile() {
+        return options.get(SCAN_IGNORE_LOST_FILE);
+    }
+
     public Pair<String, String> incrementalBetween() {
         String str = options.get(INCREMENTAL_BETWEEN);
         String[] split = str.split(",");
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java 
b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
index 117e183f0d..ad71a1d595 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
@@ -83,8 +83,7 @@ public class AppendOnlyFileStore extends 
AbstractFileStore<InternalRow> {
                 rowType,
                 FileFormatDiscover.of(options),
                 pathFactory(),
-                options.fileIndexReadEnabled(),
-                options.rowTrackingEnabled());
+                options);
     }
 
     public DataEvolutionSplitRead newDataEvolutionRead() {
@@ -93,12 +92,7 @@ public class AppendOnlyFileStore extends 
AbstractFileStore<InternalRow> {
                     "Field merge read is only supported when 
data-evolution.enabled is true.");
         }
         return new DataEvolutionSplitRead(
-                fileIO,
-                schemaManager,
-                schema,
-                rowType,
-                FileFormatDiscover.of(options),
-                pathFactory());
+                fileIO, schemaManager, schema, rowType, options, 
pathFactory());
     }
 
     @Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java 
b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
index 077068df73..710e2dc3a5 100644
--- a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
@@ -129,8 +129,7 @@ public class KeyValueFileStore extends 
AbstractFileStore<KeyValue> {
                 valueType,
                 FileFormatDiscover.of(options),
                 pathFactory(),
-                options.fileIndexReadEnabled(),
-                false);
+                options);
     }
 
     public KeyValueFileReaderFactory.Builder newReaderFactoryBuilder() {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/io/ChainKeyValueFileReaderFactory.java
 
b/paimon-core/src/main/java/org/apache/paimon/io/ChainKeyValueFileReaderFactory.java
index 38dfb05c78..d4e98af85e 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/io/ChainKeyValueFileReaderFactory.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/io/ChainKeyValueFileReaderFactory.java
@@ -51,10 +51,10 @@ public class ChainKeyValueFileReaderFactory extends 
KeyValueFileReaderFactory {
             RowType valueType,
             FormatReaderMapping.Builder formatReaderMappingBuilder,
             DataFilePathFactory pathFactory,
-            long asyncThreshold,
             BinaryRow partition,
             DeletionVector.Factory dvFactory,
-            ChainReadContext chainReadContext) {
+            ChainReadContext chainReadContext,
+            CoreOptions coreOptions) {
         super(
                 fileIO,
                 schemaManager,
@@ -63,9 +63,9 @@ public class ChainKeyValueFileReaderFactory extends 
KeyValueFileReaderFactory {
                 valueType,
                 formatReaderMappingBuilder,
                 pathFactory,
-                asyncThreshold,
                 partition,
-                dvFactory);
+                dvFactory,
+                coreOptions);
         this.chainReadContext = chainReadContext;
         CoreOptions options = new CoreOptions(schema.options());
         this.currentBranch = options.branch();
@@ -130,10 +130,10 @@ public class ChainKeyValueFileReaderFactory extends 
KeyValueFileReaderFactory {
                     wrapped.readValueType,
                     builder,
                     
wrapped.pathFactory.createChainReadDataFilePathFactory(chainReadContext),
-                    wrapped.options.fileReaderAsyncThreshold().getBytes(),
                     partition,
                     dvFactory,
-                    chainReadContext);
+                    chainReadContext,
+                    wrapped.options);
         }
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/io/DataFileRecordReader.java 
b/paimon-core/src/main/java/org/apache/paimon/io/DataFileRecordReader.java
index 2584aef00f..70650fbf4b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/DataFileRecordReader.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileRecordReader.java
@@ -27,6 +27,7 @@ import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.data.PartitionInfo;
 import org.apache.paimon.data.columnar.ColumnarRowIterator;
 import org.apache.paimon.format.FormatReaderFactory;
+import org.apache.paimon.fs.Path;
 import org.apache.paimon.reader.FileRecordIterator;
 import org.apache.paimon.reader.FileRecordReader;
 import org.apache.paimon.table.SpecialFields;
@@ -35,6 +36,9 @@ import org.apache.paimon.utils.FileUtils;
 import org.apache.paimon.utils.ProjectedRow;
 import org.apache.paimon.utils.RoaringBitmap32;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import javax.annotation.Nullable;
 
 import java.io.IOException;
@@ -43,9 +47,11 @@ import java.util.Map;
 
 /** Reads {@link InternalRow} from data files. */
 public class DataFileRecordReader implements FileRecordReader<InternalRow> {
-
+    private static final Logger LOG = 
LoggerFactory.getLogger(DataFileRecordReader.class);
+    private final Path filePath;
     private final RowType tableRowType;
     private final FileRecordReader<InternalRow> reader;
+    private final boolean ignoreCorruptFiles;
     @Nullable private final int[] indexMapping;
     @Nullable private final PartitionInfo partitionInfo;
     @Nullable private final CastFieldGetter[] castMapping;
@@ -59,6 +65,8 @@ public class DataFileRecordReader implements 
FileRecordReader<InternalRow> {
             RowType tableRowType,
             FormatReaderFactory readerFactory,
             FormatReaderFactory.Context context,
+            boolean ignoreCorruptFiles,
+            boolean ignoreLostFiles,
             @Nullable int[] indexMapping,
             @Nullable CastFieldGetter[] castMapping,
             @Nullable PartitionInfo partitionInfo,
@@ -69,7 +77,9 @@ public class DataFileRecordReader implements 
FileRecordReader<InternalRow> {
             throws IOException {
         this(
                 tableRowType,
-                createReader(readerFactory, context),
+                createReader(readerFactory, context, ignoreCorruptFiles, 
ignoreLostFiles),
+                ignoreCorruptFiles,
+                ignoreLostFiles,
                 indexMapping,
                 castMapping,
                 partitionInfo,
@@ -77,12 +87,15 @@ public class DataFileRecordReader implements 
FileRecordReader<InternalRow> {
                 firstRowId,
                 maxSequenceNumber,
                 systemFields,
-                context.selection());
+                context.selection(),
+                context.filePath());
     }
 
     public DataFileRecordReader(
             RowType tableRowType,
             FileRecordReader<InternalRow> reader,
+            boolean ignoreCorruptFiles,
+            boolean ignoreLostFiles,
             @Nullable int[] indexMapping,
             @Nullable CastFieldGetter[] castMapping,
             @Nullable PartitionInfo partitionInfo,
@@ -90,9 +103,11 @@ public class DataFileRecordReader implements 
FileRecordReader<InternalRow> {
             @Nullable Long firstRowId,
             long maxSequenceNumber,
             Map<String, Integer> systemFields,
-            @Nullable RoaringBitmap32 selection) {
+            @Nullable RoaringBitmap32 selection,
+            Path filePath) {
         this.tableRowType = tableRowType;
         this.reader = reader;
+        this.ignoreCorruptFiles = ignoreCorruptFiles;
         this.indexMapping = indexMapping;
         this.partitionInfo = partitionInfo;
         this.castMapping = castMapping;
@@ -101,22 +116,62 @@ public class DataFileRecordReader implements 
FileRecordReader<InternalRow> {
         this.maxSequenceNumber = maxSequenceNumber;
         this.systemFields = systemFields;
         this.selection = selection;
+        this.filePath = filePath;
     }
 
     private static FileRecordReader<InternalRow> createReader(
-            FormatReaderFactory readerFactory, FormatReaderFactory.Context 
context)
+            FormatReaderFactory readerFactory,
+            FormatReaderFactory.Context context,
+            boolean ignoreCorruptFiles,
+            boolean ignoreLostFiles)
             throws IOException {
         try {
             return readerFactory.createReader(context);
         } catch (Exception e) {
-            FileUtils.checkExists(context.fileIO(), context.filePath());
-            throw e;
+            boolean exists = context.fileIO().exists(context.filePath());
+            if (!exists) {
+                if (ignoreLostFiles) {
+                    LOG.warn(
+                            "Failed to create FileRecordReader for file: {}, 
file lost",
+                            context.filePath());
+                    return null;
+                } else {
+                    throw 
FileUtils.newFileNotFoundException(context.filePath());
+                }
+            } else {
+                if (ignoreCorruptException(e, ignoreCorruptFiles)) {
+                    LOG.warn(
+                            "Failed to create FileRecordReader for file: {}, 
ignore exception",
+                            context.filePath(),
+                            e);
+                    return null;
+                } else {
+                    throw new IOException(
+                            "Failed to create FileRecordReader for file: " + 
context.filePath(), e);
+                }
+            }
         }
     }
 
     @Nullable
     @Override
     public FileRecordIterator<InternalRow> readBatch() throws IOException {
+        if (reader == null) {
+            LOG.warn("Reader is not initialized, maybe file: {} is corrupt.", 
filePath);
+            return null;
+        }
+        try {
+            return readBatchInternal();
+        } catch (Exception e) {
+            if (ignoreCorruptException(e, ignoreCorruptFiles)) {
+                LOG.warn("Failed to read batch from file: {}, ignore 
exception", filePath, e);
+                return null;
+            }
+            throw new IOException("Failed to read batch from file: " + 
filePath, e);
+        }
+    }
+
+    private FileRecordIterator<InternalRow> readBatchInternal() throws 
IOException {
         FileRecordIterator<InternalRow> iterator = reader.readBatch();
         if (iterator == null) {
             return null;
@@ -186,6 +241,15 @@ public class DataFileRecordReader implements 
FileRecordReader<InternalRow> {
 
     @Override
     public void close() throws IOException {
-        reader.close();
+        if (reader != null) {
+            reader.close();
+        }
+    }
+
+    /**
+     * Decides whether a failure raised while opening or reading a data file may be swallowed
+     * as a "corrupt file".
+     *
+     * <p>Returns {@code true} only when {@code ignoreCorruptFiles} is set and {@code e} is an
+     * {@link IOException}, a {@link RuntimeException} or an {@link InternalError}; otherwise
+     * returns {@code false} and the caller is expected to rethrow.
+     *
+     * <p>NOTE(review): the call sites visible in this patch invoke this helper from {@code
+     * catch (Exception e)} handlers, so the {@link InternalError} branch looks unreachable from
+     * them - confirm whether those handlers were meant to catch {@code Throwable}.
+     */
+    private static boolean ignoreCorruptException(Throwable e, boolean 
ignoreCorruptFiles) {
+        return ignoreCorruptFiles
+                && (e instanceof IOException
+                        || e instanceof RuntimeException
+                        || e instanceof InternalError);
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java 
b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
index 49244757a9..f8b3341b52 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java
@@ -66,7 +66,8 @@ public class KeyValueFileReaderFactory implements 
FileReaderFactory<KeyValue> {
     private final FormatReaderMapping.Builder formatReaderMappingBuilder;
     private final DataFilePathFactory pathFactory;
     private final long asyncThreshold;
-
+    private final boolean ignoreCorruptFiles;
+    private final boolean ignoreLostFiles;
     private final Map<FormatKey, FormatReaderMapping> formatReaderMappings;
     private final BinaryRow partition;
     private final DeletionVector.Factory dvFactory;
@@ -79,9 +80,9 @@ public class KeyValueFileReaderFactory implements 
FileReaderFactory<KeyValue> {
             RowType valueType,
             FormatReaderMapping.Builder formatReaderMappingBuilder,
             DataFilePathFactory pathFactory,
-            long asyncThreshold,
             BinaryRow partition,
-            DeletionVector.Factory dvFactory) {
+            DeletionVector.Factory dvFactory,
+            CoreOptions coreOptions) {
         this.fileIO = fileIO;
         this.schemaManager = schemaManager;
         this.schema = schema;
@@ -89,7 +90,9 @@ public class KeyValueFileReaderFactory implements 
FileReaderFactory<KeyValue> {
         this.valueType = valueType;
         this.formatReaderMappingBuilder = formatReaderMappingBuilder;
         this.pathFactory = pathFactory;
-        this.asyncThreshold = asyncThreshold;
+        this.asyncThreshold = 
coreOptions.fileReaderAsyncThreshold().getBytes();
+        this.ignoreCorruptFiles = coreOptions.scanIgnoreCorruptFile();
+        this.ignoreLostFiles = coreOptions.scanIgnoreLostFile();
         this.partition = partition;
         this.formatReaderMappings = new HashMap<>();
         this.dvFactory = dvFactory;
@@ -148,6 +151,8 @@ public class KeyValueFileReaderFactory implements 
FileReaderFactory<KeyValue> {
                                 ? new FormatReaderContext(fileIO, filePath, 
fileSize)
                                 : new OrcFormatReaderContext(
                                         fileIO, filePath, fileSize, 
orcPoolSize),
+                        ignoreCorruptFiles,
+                        ignoreLostFiles,
                         formatReaderMapping.getIndexMapping(),
                         formatReaderMapping.getCastMapping(),
                         PartitionUtils.create(
@@ -279,9 +284,9 @@ public class KeyValueFileReaderFactory implements 
FileReaderFactory<KeyValue> {
                     readValueType,
                     builder,
                     pathFactory.createDataFilePathFactory(partition, bucket),
-                    options.fileReaderAsyncThreshold().getBytes(),
                     partition,
-                    dvFactory);
+                    dvFactory,
+                    options);
         }
 
         protected FormatReaderMapping.Builder formatReaderMappingBuilder(
@@ -303,5 +308,9 @@ public class KeyValueFileReaderFactory implements 
FileReaderFactory<KeyValue> {
         public FileIO fileIO() {
             return fileIO;
         }
+
+        public CoreOptions options() {
+            return options;
+        }
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java
index 153fc4e6ce..ec38b3bf15 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.operation;
 
+import org.apache.paimon.CoreOptions;
 import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.append.ForceSingleBatchReader;
 import org.apache.paimon.data.BinaryRow;
@@ -88,6 +89,7 @@ public class DataEvolutionSplitRead implements 
SplitRead<InternalRow> {
     private final FileStorePathFactory pathFactory;
     private final Map<FormatKey, FormatReaderMapping> formatReaderMappings;
     private final Function<Long, TableSchema> schemaFetcher;
+    private final CoreOptions coreOptions;
 
     protected RowType readRowType;
 
@@ -96,14 +98,15 @@ public class DataEvolutionSplitRead implements 
SplitRead<InternalRow> {
             SchemaManager schemaManager,
             TableSchema schema,
             RowType rowType,
-            FileFormatDiscover formatDiscover,
+            CoreOptions coreOptions,
             FileStorePathFactory pathFactory) {
         this.fileIO = fileIO;
         final Map<Long, TableSchema> cache = new HashMap<>();
         this.schemaFetcher =
                 schemaId -> cache.computeIfAbsent(schemaId, key -> 
schemaManager.schema(schemaId));
         this.schema = schema;
-        this.formatDiscover = formatDiscover;
+        this.formatDiscover = FileFormatDiscover.of(coreOptions);
+        this.coreOptions = coreOptions;
         this.pathFactory = pathFactory;
         this.formatReaderMappings = new HashMap<>();
         this.readRowType = rowType;
@@ -371,6 +374,8 @@ public class DataEvolutionSplitRead implements 
SplitRead<InternalRow> {
                                     readRowType,
                                     formatReaderMapping.getReaderFactory(),
                                     formatReaderContext,
+                                    coreOptions.scanIgnoreCorruptFile(),
+                                    coreOptions.scanIgnoreLostFile(),
                                     formatReaderMapping.getIndexMapping(),
                                     formatReaderMapping.getCastMapping(),
                                     PartitionUtils.create(
@@ -398,6 +403,8 @@ public class DataEvolutionSplitRead implements 
SplitRead<InternalRow> {
                 readRowType,
                 formatReaderMapping.getReaderFactory(),
                 formatReaderContext,
+                coreOptions.scanIgnoreCorruptFile(),
+                coreOptions.scanIgnoreLostFile(),
                 formatReaderMapping.getIndexMapping(),
                 formatReaderMapping.getCastMapping(),
                 PartitionUtils.create(formatReaderMapping.getPartitionPair(), 
partition),
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java 
b/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java
index 4b4beeaf31..464d9ea0e5 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.operation;
 
+import org.apache.paimon.CoreOptions;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.deletionvectors.ApplyDeletionVectorReader;
@@ -82,6 +83,8 @@ public class RawFileSplitRead implements 
SplitRead<InternalRow> {
     private final Map<FormatKey, FormatReaderMapping> formatReaderMappings;
     private final boolean fileIndexReadEnabled;
     private final boolean rowTrackingEnabled;
+    private final boolean ignoreCorruptFiles;
+    private final boolean ignoreLostFiles;
 
     private RowType readRowType;
     @Nullable private List<Predicate> filters;
@@ -95,16 +98,17 @@ public class RawFileSplitRead implements 
SplitRead<InternalRow> {
             RowType rowType,
             FileFormatDiscover formatDiscover,
             FileStorePathFactory pathFactory,
-            boolean fileIndexReadEnabled,
-            boolean rowTrackingEnabled) {
+            CoreOptions coreOptions) {
         this.fileIO = fileIO;
         this.schemaManager = schemaManager;
         this.schema = schema;
         this.formatDiscover = formatDiscover;
         this.pathFactory = pathFactory;
         this.formatReaderMappings = new HashMap<>();
-        this.fileIndexReadEnabled = fileIndexReadEnabled;
-        this.rowTrackingEnabled = rowTrackingEnabled;
+        this.fileIndexReadEnabled = coreOptions.fileIndexReadEnabled();
+        this.ignoreCorruptFiles = coreOptions.scanIgnoreCorruptFile();
+        this.ignoreLostFiles = coreOptions.scanIgnoreLostFile();
+        this.rowTrackingEnabled = coreOptions.rowTrackingEnabled();
         this.readRowType = rowType;
     }
 
@@ -285,6 +289,8 @@ public class RawFileSplitRead implements 
SplitRead<InternalRow> {
                         schema.logicalRowType(),
                         formatReaderMapping.getReaderFactory(),
                         formatReaderContext,
+                        ignoreCorruptFiles,
+                        ignoreLostFiles,
                         formatReaderMapping.getIndexMapping(),
                         formatReaderMapping.getCastMapping(),
                         
PartitionUtils.create(formatReaderMapping.getPartitionPair(), partition),
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatReadBuilder.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatReadBuilder.java
index 976c95f1f1..50aba047c1 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatReadBuilder.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatReadBuilder.java
@@ -190,6 +190,8 @@ public class FormatReadBuilder implements ReadBuilder {
             return new DataFileRecordReader(
                     readType(),
                     reader,
+                    options.scanIgnoreCorruptFile(),
+                    options.scanIgnoreLostFile(),
                     null,
                     null,
                     PartitionUtils.create(partitionMapping, 
dataSplit.partition()),
@@ -197,7 +199,8 @@ public class FormatReadBuilder implements ReadBuilder {
                     null,
                     0,
                     Collections.emptyMap(),
-                    null);
+                    null,
+                    formatReaderContext.filePath());
         } catch (Exception e) {
             FileUtils.checkExists(formatReaderContext.fileIO(), 
formatReaderContext.filePath());
             throw e;
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/FileUtils.java 
b/paimon-core/src/main/java/org/apache/paimon/utils/FileUtils.java
index d714487f47..8c41953761 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/FileUtils.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/FileUtils.java
@@ -109,17 +109,21 @@ public class FileUtils {
 
     public static void checkExists(FileIO fileIO, Path file) throws 
IOException {
         if (!fileIO.exists(file)) {
-            throw new FileNotFoundException(
-                    String.format(
-                            "File '%s' not found, Possible causes: "
-                                    + "1.snapshot expires too fast, you can 
configure 'snapshot.time-retained'"
-                                    + " option with a larger value. "
-                                    + "2.consumption is too slow, you can 
improve the performance of consumption"
-                                    + " (For example, increasing 
parallelism).",
-                            file));
+            throw newFileNotFoundException(file);
         }
     }
 
+    /**
+     * Builds (but does not throw) the {@link FileNotFoundException} used to report a missing
+     * file, including hints about the likely causes.
+     *
+     * <p>Returning the exception instead of throwing it lets callers decide whether to throw it
+     * or to handle the missing file in another way.
+     *
+     * @param file path of the file that could not be found; formatted into the message
+     * @return a new exception whose message names {@code file} and lists possible causes
+     */
+    public static FileNotFoundException newFileNotFoundException(Path file) {
+        return new FileNotFoundException(
+                String.format(
+                        "File '%s' not found, Possible causes: "
+                                + "1.snapshot expires too fast, you can configure 'snapshot.time-retained'"
+                                + " option with a larger value. "
+                                + "2.consumption is too slow, you can improve the performance of consumption"
+                                + " (For example, increasing parallelism).",
+                        file));
+    }
+
     public static RecordReader<InternalRow> createFormatReader(
             FileIO fileIO, FormatReaderFactory format, Path file, @Nullable 
Long fileSize)
             throws IOException {
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
index dcd734cad7..23e837d31d 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
@@ -141,6 +141,7 @@ public class TestChangelogDataReadWrite {
                                 pathFactory,
                                 EXTRACTOR,
                                 options));
+
         RawFileSplitRead rawFileRead =
                 new RawFileSplitRead(
                         LocalFileIO.create(),
@@ -149,8 +150,7 @@ public class TestChangelogDataReadWrite {
                         VALUE_TYPE,
                         FileFormatDiscover.of(options),
                         pathFactory,
-                        options.fileIndexReadEnabled(),
-                        false);
+                        options);
         return new KeyValueTableRead(() -> read, () -> rawFileRead, null);
     }
 
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonQueryTest.scala
 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonQueryTest.scala
index 317182f7ee..d381af6ded 100644
--- 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonQueryTest.scala
+++ 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonQueryTest.scala
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.spark.sql
 
+import org.apache.paimon.fs.Path
 import org.apache.paimon.spark.PaimonSparkTestBase
 import org.apache.paimon.table.source.DataSplit
 
@@ -428,6 +429,73 @@ class PaimonQueryTest extends PaimonSparkTestBase {
     }
   }
 
+  // For every file format under test: overwrite one data file with junk text and verify that
+  // a scan with 'scan.ignore-corrupt-files' enabled skips that file instead of failing.
+  fileFormats.foreach {
+    fileFormat =>
+      test(s"Query ignore-corrupt-files: file.format=$fileFormat") {
+        withTable("T") {
+          spark.sql(s"""
+                       |CREATE TABLE T (id INT, name STRING, pt STRING)
+                       |PARTITIONED BY (pt)
+                       |TBLPROPERTIES ('file.format'='$fileFormat', 'bucket'='4', 'bucket-key'='id')
+                       |""".stripMargin)
+          spark.sql("INSERT INTO T VALUES (1, 'x1', '2024'), (3, 'x3', '2024')")
+
+          spark.sql("INSERT INTO T VALUES (2, 'x2', '2024'), (4, 'x4', '2024')")
+
+          // The test expects four data files for the four rows, so damaging one file
+          // should remove exactly one row from the result.
+          val allFiles = getAllFiles("T", Seq("pt"), null)
+          Assertions.assertEquals(4, allFiles.length)
+
+          // Replace the first data file's content and confirm the overwrite took effect.
+          val corruptPath = new Path(allFiles.head)
+          val io = loadTable("T").fileIO()
+          io.overwriteFileUtf8(corruptPath, "corrupt file")
+          Assertions.assertEquals("corrupt file", io.readFileUtf8(corruptPath))
+
+          withSQLConf("spark.paimon.scan.ignore-corrupt-files" -> "true") {
+            val res = spark.sql("SELECT * FROM T")
+            Assertions.assertEquals(3, res.collect().length)
+          }
+        }
+      }
+  }
+
+  // For every file format under test: delete one data file and verify that the scan fails
+  // unless 'scan.ignore-lost-files' is enabled, in which case the missing file is skipped.
+  fileFormats.foreach {
+    fileFormat =>
+      test(s"Query ignore-lost-files: file.format=$fileFormat") {
+        withTable("T") {
+          spark.sql(s"""
+                       |CREATE TABLE T (id INT, name STRING, pt STRING)
+                       |PARTITIONED BY (pt)
+                       |TBLPROPERTIES ('file.format'='$fileFormat', 'bucket'='4', 'bucket-key'='id')
+                       |""".stripMargin)
+          spark.sql("INSERT INTO T VALUES (1, 'x1', '2024'), (3, 'x3', '2024')")
+
+          spark.sql("INSERT INTO T VALUES (2, 'x2', '2024'), (4, 'x4', '2024')")
+
+          // The test expects four data files for the four rows, so deleting one file
+          // should remove exactly one row from the result.
+          val allFiles = getAllFiles("T", Seq("pt"), null)
+          Assertions.assertEquals(4, allFiles.length)
+          val io = loadTable("T").fileIO()
+          io.deleteQuietly(new Path(allFiles.head))
+
+          // Enabling only 'scan.ignore-corrupt-files' must not hide a missing file:
+          // the query is still expected to fail.
+          withSQLConf("spark.paimon.scan.ignore-corrupt-files" -> "true") {
+            intercept[Exception] {
+              spark.sql("SELECT * FROM T").collect()
+            }
+          }
+
+          withSQLConf("spark.paimon.scan.ignore-lost-files" -> "true") {
+            val res = spark.sql("SELECT * FROM T")
+            Assertions.assertEquals(3, res.collect().length)
+          }
+
+        }
+      }
+  }
+
   private def getAllFiles(
       tableName: String,
       partitions: Seq[String],

Reply via email to