This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new c91066772b59 [SPARK-52482][SQL][CORE] Improve exception handling for reading certain corrupt zstd files
c91066772b59 is described below

commit c91066772b5946284f1988c68e566ccab4efc623
Author: Matt Zhang <matt@zhang.software>
AuthorDate: Tue Aug 12 17:29:16 2025 +0800

[SPARK-52482][SQL][CORE] Improve exception handling for reading certain corrupt zstd files

### What changes were proposed in this pull request?

This PR makes skipping of corrupted files more consistent by catching an additional type of exception that may arise from reading corrupted files.

### Why are the changes needed?

Reading ZSTD-compressed files corrupted in some ways yields an exception type we catch, while files corrupted in other ways yield an exception type we don't. As a result, whether the `ignoreCorruptFiles` setting is respected depends on how the file is corrupted. For example:

Reading a file with the last 10 bytes removed:
```
scala> spark.read.option("ignoreCorruptFiles", "true").csv("scratch/corrupt_tail.csv.zst").count()
val res3: Long = 0
```

Reading a file with the first 10 bytes removed:
```
scala> spark.read.option("ignoreCorruptFiles", "true").csv("scratch/corrupt_head.csv.zst").count()
25/08/08 19:31:31 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.InternalError: Unknown frame descriptor
        at org.apache.hadoop.io.compress.zstd.ZStandardDecompressor.inflateBytesDirect(Native Method)
        at org.apache.hadoop.io.compress.zstd.ZStandardDecompressor.decompress(ZStandardDecompressor.java:187)
        at org.apache.hadoop.io.compress.DecompressorStream.decompress(DecompressorStream.java:111)
...
```

This diff makes it so that errors raised by the ZSTD codec, including the `InternalError` shown above, no longer cause this inconsistency; a short sketch of why the previous catch pattern misses this error follows below.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Added tests in CSVSuite

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #51916 from mzhang/corrupted-zst.
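The difference in behavior comes down to the exception hierarchy: `java.lang.InternalError` extends `Error`, not `Exception`, so the pre-existing `case e @ (_: RuntimeException | _: IOException)` pattern never matches it. The following minimal, runnable Scala sketch illustrates this; `shouldIgnoreCorruptFileException` mirrors the helper added in the diff below, while the enclosing object and `main` method are illustrative scaffolding, not part of the patch.

```scala
import java.io.IOException

// Illustrative scaffolding only; shouldIgnoreCorruptFileException mirrors the
// helper added to DataSourceUtils in this commit.
object CorruptFileMatchSketch {
  // Pattern used before this commit: misses java.lang.InternalError,
  // which extends Error rather than RuntimeException or IOException.
  def oldPatternMatches(e: Throwable): Boolean = e match {
    case _: RuntimeException | _: IOException => true
    case _ => false
  }

  // Pattern introduced by this commit: also covers InternalError,
  // which the Hadoop ZSTD decompressor throws for some corrupt inputs.
  def shouldIgnoreCorruptFileException(e: Throwable): Boolean = e match {
    case _: RuntimeException | _: IOException | _: InternalError => true
    case _ => false
  }

  def main(args: Array[String]): Unit = {
    val zstdFailure = new InternalError("Unknown frame descriptor")
    println(oldPatternMatches(zstdFailure))                // false: file not skipped, task fails
    println(shouldIgnoreCorruptFileException(zstdFailure)) // true: file can be skipped
  }
}
```

Centralizing the check in a single helper keeps the v1 (`FileScanRDD`) and v2 (`FilePartitionReader`) read paths in agreement about which failures count as a corrupt file.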
Authored-by: Matt Zhang <matt@zhang.software>
Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../execution/datasources/DataSourceUtils.scala   |  6 +++++
 .../sql/execution/datasources/FileScanRDD.scala   |  5 ++--
 .../datasources/v2/FilePartitionReader.scala      |  8 +++---
 .../sql/execution/datasources/csv/CSVSuite.scala  | 29 +++++++++++++++++++++++++++-
 4 files changed, 41 insertions(+), 7 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
index 3e66b97f61a6..10cfe9f145f6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.execution.datasources
 
+import java.io.IOException
 import java.util.Locale
 
 import scala.jdk.CollectionConverters._
@@ -197,6 +198,11 @@ object DataSourceUtils extends PredicateHelper {
     QueryExecutionErrors.sparkUpgradeInWritingDatesError(format, config)
   }
 
+  def shouldIgnoreCorruptFileException(e: Throwable): Boolean = e match {
+    case _: RuntimeException | _: IOException | _: InternalError => true
+    case _ => false
+  }
+
   def createDateRebaseFuncInRead(
       rebaseMode: LegacyBehaviorPolicy.Value,
       format: String): Int => Int = rebaseMode match {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
index f874de8c1abd..5dc13ccee9ce 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.execution.datasources
 
-import java.io.{Closeable, FileNotFoundException, IOException}
+import java.io.{Closeable, FileNotFoundException}
 import java.net.URI
 
 import org.apache.hadoop.fs.Path
@@ -268,7 +268,8 @@ class FileScanRDD(
             // Throw FileNotFoundException even if `ignoreCorruptFiles` is true
             case e: FileNotFoundException if !ignoreMissingFiles => throw e
             case e @ (_ : AccessControlException | _ : BlockMissingException) => throw e
-            case e @ (_: RuntimeException | _: IOException) if ignoreCorruptFiles =>
+            case e if ignoreCorruptFiles &&
+                DataSourceUtils.shouldIgnoreCorruptFileException(e) =>
               logWarning(log"Skipped the rest of the content in the corrupted file: " +
                 log"${MDC(PATH, currentFile)}", e)
               finished = true
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FilePartitionReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FilePartitionReader.scala
index d1f445ec63d9..37060f9bd081 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FilePartitionReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FilePartitionReader.scala
@@ -16,7 +16,7 @@
  */
 package org.apache.spark.sql.execution.datasources.v2
 
-import java.io.{FileNotFoundException, IOException}
+import java.io.FileNotFoundException
 
 import org.apache.hadoop.hdfs.BlockMissingException
 import org.apache.hadoop.security.AccessControlException
@@ -26,7 +26,7 @@ import org.apache.spark.internal.LogKeys.{CURRENT_FILE, PARTITIONED_FILE_READER}
 import org.apache.spark.rdd.InputFileBlockHolder
 import org.apache.spark.sql.catalyst.FileSourceOptions
 import org.apache.spark.sql.connector.read.PartitionReader
-import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.execution.datasources.{DataSourceUtils, PartitionedFile}
 
 class FilePartitionReader[T](
     files: Iterator[PartitionedFile],
@@ -53,7 +53,7 @@ class FilePartitionReader[T](
             currentReader = null
           case e @ (_ : AccessControlException | _ : BlockMissingException) =>
             throw FileDataSourceV2.attachFilePath(file.urlEncodedPath, e)
-          case e @ (_: RuntimeException | _: IOException) if ignoreCorruptFiles =>
+          case e if ignoreCorruptFiles && DataSourceUtils.shouldIgnoreCorruptFileException(e) =>
             logWarning(
               s"Skipped the rest of the content in the corrupted file.", e)
             currentReader = null
@@ -71,7 +71,7 @@ class FilePartitionReader[T](
     } catch {
       case e @ (_ : AccessControlException | _ : BlockMissingException) =>
         throw FileDataSourceV2.attachFilePath(currentReader.file.urlEncodedPath, e)
-      case e @ (_: RuntimeException | _: IOException) if ignoreCorruptFiles =>
+      case e if ignoreCorruptFiles && DataSourceUtils.shouldIgnoreCorruptFileException(e) =>
         logWarning(log"Skipped the rest of the content in the corrupted file: " +
           log"${MDC(PARTITIONED_FILE_READER, currentReader)}", e)
         false
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index 29d28552c320..eaf137cbf8f7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -18,8 +18,9 @@
 package org.apache.spark.sql.execution.datasources.csv
 
 import java.io.{EOFException, File, FileOutputStream}
+import java.net.URI
 import java.nio.charset.{Charset, StandardCharsets}
-import java.nio.file.{Files, StandardOpenOption}
+import java.nio.file.{Files, Paths, StandardOpenOption}
 import java.sql.{Date, Timestamp}
 import java.text.SimpleDateFormat
 import java.time._
@@ -3794,6 +3795,32 @@ abstract class CSVSuite
       verifyCars(cars, withHeader = true, checkTypes = true)
     }
   }
+
+  test("corrupted ZSTD compressed csv respects ignoreCorruptFiles") {
+    withTempDir { dir =>
+      val originalFile = new File(dir, "original.csv.zst")
+      val corruptedHeadFile = new File(dir, "corrupted_head.csv.zst")
+      val corruptedTailFile = new File(dir, "corrupted_tail.csv.zst")
+      val bytes = Files.readAllBytes(Paths.get(new URI(testFile(zstCompressedCarsFile))))
+      Files.write(originalFile.toPath(), bytes)
+      Files.write(corruptedHeadFile.toPath(), bytes.drop(10))
+      Files.write(corruptedTailFile.toPath(), bytes.dropRight(10))
+
+      withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "true") {
+        val df = spark.read.format("csv").option("header", "true").load(dir.getAbsolutePath)
+        // check that the entries from originalFile are still read
+        assert(df.count() == 3)
+      }
+
+      withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "false") {
+        val ex = intercept[SparkException] {
+          spark.read.format("csv").option("header", "true").load(dir.getAbsolutePath).collect()
+        }
+        checkErrorMatchPVals(ex, "FAILED_READ_FILE.NO_HINT",
+          Map("path" -> ".*corrupted.*\\.csv\\.zst"))
+      }
+    }
+  }
 }
 
 class CSVv1Suite extends CSVSuite {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org