This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new c91066772b59 [SPARK-52482][SQL][CORE] Improve exception handling for 
reading certain corrupt zstd files
c91066772b59 is described below

commit c91066772b5946284f1988c68e566ccab4efc623
Author: Matt Zhang <matt@zhang.software>
AuthorDate: Tue Aug 12 17:29:16 2025 +0800

    [SPARK-52482][SQL][CORE] Improve exception handling for reading certain 
corrupt zstd files
    
    ### What changes were proposed in this pull request?
    
    This PR makes us handle skipping corrupted files more consistently by 
catching a different type of exception that may arise from reading corrupted 
files.
    
    ### Why are the changes needed?
    
    Reading files compressed with ZSTD corrupted in some ways yields an 
exception type we catch and compressed files corrupted in other ways results in 
an exception we don't. This means that sometimes the `ignoreCorruptFiles` 
setting is sometimes not respected depending on how the file is corrupted.
    
    For example:
    
    Reading a file with the last 10 bytes removed:
    ```
    scala> spark.read.option("ignoreCorruptFiles", 
"true").csv("scratch/corrupt_tail.csv.zst").count()
    val res3: Long = 0
    ```
    
    Reading a file with the first 10 bytes removed:
    ```
    scala> spark.read.option("ignoreCorruptFiles", 
"true").csv("scratch/corrupt_head.csv.zst").count()
    25/08/08 19:31:31 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
    java.lang.InternalError: Unknown frame descriptor
            at 
org.apache.hadoop.io.compress.zstd.ZStandardDecompressor.inflateBytesDirect(Native
 Method)
            at 
org.apache.hadoop.io.compress.zstd.ZStandardDecompressor.decompress(ZStandardDecompressor.java:187)
            at 
org.apache.hadoop.io.compress.DecompressorStream.decompress(DecompressorStream.java:111)
            ...
    ```
    
    This diff makes it so that errors that result from ZSTD codec, including 
the latter's InternalError, does not cause this inconsistency.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Added tests in CSVSuite
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #51916 from mzhang/corrupted-zst.
    
    Authored-by: Matt Zhang <matt@zhang.software>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../execution/datasources/DataSourceUtils.scala    |  6 +++++
 .../sql/execution/datasources/FileScanRDD.scala    |  5 ++--
 .../datasources/v2/FilePartitionReader.scala       |  8 +++---
 .../sql/execution/datasources/csv/CSVSuite.scala   | 29 +++++++++++++++++++++-
 4 files changed, 41 insertions(+), 7 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
index 3e66b97f61a6..10cfe9f145f6 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.execution.datasources
 
+import java.io.IOException
 import java.util.Locale
 
 import scala.jdk.CollectionConverters._
@@ -197,6 +198,11 @@ object DataSourceUtils extends PredicateHelper {
     QueryExecutionErrors.sparkUpgradeInWritingDatesError(format, config)
   }
 
+  def shouldIgnoreCorruptFileException(e: Throwable): Boolean = e match {
+    case _: RuntimeException | _: IOException | _: InternalError => true
+    case _ => false
+  }
+
   def createDateRebaseFuncInRead(
       rebaseMode: LegacyBehaviorPolicy.Value,
       format: String): Int => Int = rebaseMode match {
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
index f874de8c1abd..5dc13ccee9ce 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.execution.datasources
 
-import java.io.{Closeable, FileNotFoundException, IOException}
+import java.io.{Closeable, FileNotFoundException}
 import java.net.URI
 
 import org.apache.hadoop.fs.Path
@@ -268,7 +268,8 @@ class FileScanRDD(
                   // Throw FileNotFoundException even if `ignoreCorruptFiles` 
is true
                   case e: FileNotFoundException if !ignoreMissingFiles => 
throw e
                   case e @ (_ : AccessControlException | _ : 
BlockMissingException) => throw e
-                  case e @ (_: RuntimeException | _: IOException) if 
ignoreCorruptFiles =>
+                  case e if ignoreCorruptFiles &&
+                      DataSourceUtils.shouldIgnoreCorruptFileException(e) =>
                     logWarning(log"Skipped the rest of the content in the 
corrupted file: " +
                       log"${MDC(PATH, currentFile)}", e)
                     finished = true
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FilePartitionReader.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FilePartitionReader.scala
index d1f445ec63d9..37060f9bd081 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FilePartitionReader.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FilePartitionReader.scala
@@ -16,7 +16,7 @@
  */
 package org.apache.spark.sql.execution.datasources.v2
 
-import java.io.{FileNotFoundException, IOException}
+import java.io.FileNotFoundException
 
 import org.apache.hadoop.hdfs.BlockMissingException
 import org.apache.hadoop.security.AccessControlException
@@ -26,7 +26,7 @@ import org.apache.spark.internal.LogKeys.{CURRENT_FILE, 
PARTITIONED_FILE_READER}
 import org.apache.spark.rdd.InputFileBlockHolder
 import org.apache.spark.sql.catalyst.FileSourceOptions
 import org.apache.spark.sql.connector.read.PartitionReader
-import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.execution.datasources.{DataSourceUtils, 
PartitionedFile}
 
 class FilePartitionReader[T](
     files: Iterator[PartitionedFile],
@@ -53,7 +53,7 @@ class FilePartitionReader[T](
             currentReader = null
           case e @ (_ : AccessControlException | _ : BlockMissingException) =>
             throw FileDataSourceV2.attachFilePath(file.urlEncodedPath, e)
-          case e @ (_: RuntimeException | _: IOException) if 
ignoreCorruptFiles =>
+          case e if ignoreCorruptFiles && 
DataSourceUtils.shouldIgnoreCorruptFileException(e) =>
             logWarning(
               s"Skipped the rest of the content in the corrupted file.", e)
             currentReader = null
@@ -71,7 +71,7 @@ class FilePartitionReader[T](
     } catch {
       case e @ (_ : AccessControlException | _ : BlockMissingException) =>
         throw 
FileDataSourceV2.attachFilePath(currentReader.file.urlEncodedPath, e)
-      case e @ (_: RuntimeException | _: IOException) if ignoreCorruptFiles =>
+      case e if ignoreCorruptFiles && 
DataSourceUtils.shouldIgnoreCorruptFileException(e) =>
         logWarning(log"Skipped the rest of the content in the corrupted file: 
" +
           log"${MDC(PARTITIONED_FILE_READER, currentReader)}", e)
         false
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index 29d28552c320..eaf137cbf8f7 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -18,8 +18,9 @@
 package org.apache.spark.sql.execution.datasources.csv
 
 import java.io.{EOFException, File, FileOutputStream}
+import java.net.URI
 import java.nio.charset.{Charset, StandardCharsets}
-import java.nio.file.{Files, StandardOpenOption}
+import java.nio.file.{Files, Paths, StandardOpenOption}
 import java.sql.{Date, Timestamp}
 import java.text.SimpleDateFormat
 import java.time._
@@ -3794,6 +3795,32 @@ abstract class CSVSuite
       verifyCars(cars, withHeader = true, checkTypes = true)
     }
   }
+
+  test("corrupted ZSTD compressed csv respects ignoreCorruptFiles") {
+    withTempDir { dir =>
+      val originalFile = new File(dir, "original.csv.zst")
+      val corruptedHeadFile = new File(dir, "corrupted_head.csv.zst")
+      val corruptedTailFile = new File(dir, "corrupted_tail.csv.zst")
+      val bytes = Files.readAllBytes(Paths.get(new 
URI(testFile(zstCompressedCarsFile))))
+      Files.write(originalFile.toPath(), bytes)
+      Files.write(corruptedHeadFile.toPath(), bytes.drop(10))
+      Files.write(corruptedTailFile.toPath(), bytes.dropRight(10))
+
+      withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "true") {
+        val df = spark.read.format("csv").option("header", 
"true").load(dir.getAbsolutePath)
+        // check that the entries from originalFile are still read
+        assert(df.count() == 3)
+      }
+
+      withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "false") {
+        val ex = intercept[SparkException] {
+          spark.read.format("csv").option("header", 
"true").load(dir.getAbsolutePath).collect()
+        }
+        checkErrorMatchPVals(ex, "FAILED_READ_FILE.NO_HINT",
+          Map("path" -> ".*corrupted.*\\.csv\\.zst"))
+      }
+    }
+  }
 }
 
 class CSVv1Suite extends CSVSuite {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to