This is an automated email from the ASF dual-hosted git repository.
maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 11e7ea4f11d [SPARK-45035][SQL] Fix ignoreCorruptFiles/ignoreMissingFiles with multiline CSV/JSON will report error
11e7ea4f11d is described below
commit 11e7ea4f11df71e2942322b01fcaab57dac20c83
Author: Jia Fan <[email protected]>
AuthorDate: Wed Oct 18 11:06:43 2023 +0500
[SPARK-45035][SQL] Fix ignoreCorruptFiles/ignoreMissingFiles with multiline CSV/JSON will report error
### What changes were proposed in this pull request?
Fix ignoreCorruptFiles/ignoreMissingFiles not being honored in multiline CSV/JSON mode. Previously the read failed with an error like:
```log
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 4940.0 failed 4 times, most recent failure: Lost task 0.3 in stage 4940.0 (TID 4031) (10.68.177.106 executor 0): com.univocity.parsers.common.TextParsingException: java.lang.IllegalStateException - Error reading from input
Parser Configuration: CsvParserSettings:
Auto configuration enabled=true
Auto-closing enabled=true
Autodetect column delimiter=false
Autodetect quotes=false
Column reordering enabled=true
Delimiters for detection=null
Empty value=
Escape unquoted values=false
Header extraction enabled=null
Headers=null
Ignore leading whitespaces=false
Ignore leading whitespaces in quotes=false
Ignore trailing whitespaces=false
Ignore trailing whitespaces in quotes=false
Input buffer size=1048576
Input reading on separate thread=false
Keep escape sequences=false
Keep quotes=false
Length of content displayed on error=1000
Line separator detection enabled=true
Maximum number of characters per column=-1
Maximum number of columns=20480
Normalize escaped line separators=true
Null value=
Number of records to read=all
Processor=none
Restricting data in exceptions=false
RowProcessor error handler=null
Selected fields=none
Skip bits as whitespace=true
Skip empty lines=true
Unescaped quote handling=STOP_AT_DELIMITERFormat configuration:
CsvFormat:
Comment character=#
Field delimiter=,
Line separator (normalized)=\n
Line separator sequence=\n
Quote character="
Quote escape character=\
Quote escape escape character=null
Internal state when error was thrown: line=0, column=0, record=0
at com.univocity.parsers.common.AbstractParser.handleException(AbstractParser.java:402)
at com.univocity.parsers.common.AbstractParser.beginParsing(AbstractParser.java:277)
at com.univocity.parsers.common.AbstractParser.beginParsing(AbstractParser.java:843)
at org.apache.spark.sql.catalyst.csv.UnivocityParser$$anon$1.<init>(UnivocityParser.scala:463)
at org.apache.spark.sql.catalyst.csv.UnivocityParser$.convertStream(UnivocityParser.scala:46...
```
This happens because multiline CSV/JSON uses `BinaryFileRDD` rather than `FileScanRDD`. When `FileScanRDD` meets a corrupt file it checks the `ignoreCorruptFiles` config and suppresses the resulting IOException, but `BinaryFileRDD` never reports the problem itself because it just returns a normal `PortableDataStream`. So the exception has to be caught inside the schema-inference lambda instead, and the same handling is needed for `ignoreMissingFiles`.
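For context, a minimal user-facing sketch of the scenario this fixes (not part of the patch): it assumes an active local SparkSession and a hypothetical directory `/tmp/csv-with-corrupt-file` that mixes valid CSV files with a truncated `.csv.gz` file. With the configs below, a multiline read is expected to skip the unreadable file instead of failing during schema inference.
```scala
import org.apache.spark.sql.SparkSession

// Hypothetical input directory mixing valid CSV files with a truncated .csv.gz file.
val inputDir = "/tmp/csv-with-corrupt-file"

val spark = SparkSession.builder()
  .appName("SPARK-45035-sketch")
  .master("local[*]")
  // Ask Spark to skip unreadable / deleted files instead of failing the job.
  .config("spark.sql.files.ignoreCorruptFiles", "true")
  .config("spark.sql.files.ignoreMissingFiles", "true")
  .getOrCreate()

// Before this fix, the multiline path (BinaryFileRDD) ignored the configs above
// during schema inference and surfaced TextParsingException/EOFException; with
// the fix, the corrupt file is skipped and only readable files contribute rows.
val df = spark.read
  .option("multiLine", "true")
  .csv(inputDir)

df.show()
```
The same configs apply to multiline JSON reads via `spark.read.option("multiLine", "true").json(...)`.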
### Why are the changes needed?
Fix the bug when using multiline mode with the ignoreCorruptFiles/ignoreMissingFiles configs.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Added new tests.
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #42979 from Hisoka-X/SPARK-45035_csv_multi_line.
Authored-by: Jia Fan <[email protected]>
Signed-off-by: Max Gekk <[email protected]>
---
.../spark/sql/catalyst/json/JsonInferSchema.scala | 18 +++++--
.../execution/datasources/csv/CSVDataSource.scala | 28 ++++++++---
.../datasources/CommonFileDataSourceSuite.scala | 28 +++++++++++
.../sql/execution/datasources/csv/CSVSuite.scala | 58 +++++++++++++---------
.../sql/execution/datasources/json/JsonSuite.scala | 46 ++++++++++++++++-
5 files changed, 142 insertions(+), 36 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala
index 4123c5290b6..4d04b34876c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql.catalyst.json
-import java.io.CharConversionException
+import java.io.{CharConversionException, FileNotFoundException, IOException}
import java.nio.charset.MalformedInputException
import java.util.Comparator
@@ -25,6 +25,7 @@ import scala.util.control.Exception.allCatch
import com.fasterxml.jackson.core._
+import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.analysis.TypeCoercion
import org.apache.spark.sql.catalyst.expressions.ExprUtils
@@ -36,7 +37,7 @@ import org.apache.spark.sql.internal.{LegacyBehaviorPolicy, SQLConf}
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
-private[sql] class JsonInferSchema(options: JSONOptions) extends Serializable {
+private[sql] class JsonInferSchema(options: JSONOptions) extends Serializable with Logging {
private val decimalParser = ExprUtils.getDecimalParser(options.locale)
@@ -53,6 +54,9 @@ private[sql] class JsonInferSchema(options: JSONOptions) extends Serializable {
isParsing = true,
forTimestampNTZ = true)
+ private val ignoreCorruptFiles = options.ignoreCorruptFiles
+ private val ignoreMissingFiles = options.ignoreMissingFiles
+
private def handleJsonErrorsByParseMode(parseMode: ParseMode,
columnNameOfCorruptRecord: String, e: Throwable): Option[StructType] = {
parseMode match {
@@ -88,8 +92,7 @@ private[sql] class JsonInferSchema(options: JSONOptions) extends Serializable {
Some(inferField(parser))
}
} catch {
- case e @ (_: RuntimeException | _: JsonProcessingException |
- _: MalformedInputException) =>
+ case e @ (_: JsonProcessingException | _: MalformedInputException) =>
handleJsonErrorsByParseMode(parseMode, columnNameOfCorruptRecord, e)
case e: CharConversionException if options.encoding.isEmpty =>
val msg =
@@ -99,6 +102,13 @@ private[sql] class JsonInferSchema(options: JSONOptions) extends Serializable {
val wrappedCharException = new CharConversionException(msg)
wrappedCharException.initCause(e)
handleJsonErrorsByParseMode(parseMode, columnNameOfCorruptRecord, wrappedCharException)
+ case e: FileNotFoundException if ignoreMissingFiles =>
+ logWarning("Skipped missing file", e)
+ Some(StructType(Nil))
+ case e: FileNotFoundException if !ignoreMissingFiles => throw e
+ case e @ (_: IOException | _: RuntimeException) if ignoreCorruptFiles =>
+ logWarning("Skipped the rest of the content in the corrupted file", e)
+ Some(StructType(Nil))
}
}.reduceOption(typeMerger).iterator
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala
index 99d43953c4c..cf7c536bdae 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.execution.datasources.csv
+import java.io.{FileNotFoundException, IOException}
import java.nio.charset.{Charset, StandardCharsets}
import com.univocity.parsers.csv.CsvParser
@@ -168,7 +169,7 @@ object TextInputCSVDataSource extends CSVDataSource {
}
}
-object MultiLineCSVDataSource extends CSVDataSource {
+object MultiLineCSVDataSource extends CSVDataSource with Logging {
override val isSplitable: Boolean = false
override def readFile(
@@ -189,13 +190,26 @@ object MultiLineCSVDataSource extends CSVDataSource {
inputPaths: Seq[FileStatus],
parsedOptions: CSVOptions): StructType = {
val csv = createBaseRdd(sparkSession, inputPaths, parsedOptions)
+ val ignoreCorruptFiles = parsedOptions.ignoreCorruptFiles
+ val ignoreMissingFiles = parsedOptions.ignoreMissingFiles
csv.flatMap { lines =>
- val path = new Path(lines.getPath())
- UnivocityParser.tokenizeStream(
- CodecStreams.createInputStreamWithCloseResource(lines.getConfiguration, path),
- shouldDropHeader = false,
- new CsvParser(parsedOptions.asParserSettings),
- encoding = parsedOptions.charset)
+ try {
+ val path = new Path(lines.getPath())
+ UnivocityParser.tokenizeStream(
+ CodecStreams.createInputStreamWithCloseResource(lines.getConfiguration, path),
+ shouldDropHeader = false,
+ new CsvParser(parsedOptions.asParserSettings),
+ encoding = parsedOptions.charset)
+ } catch {
+ case e: FileNotFoundException if ignoreMissingFiles =>
+ logWarning(s"Skipped missing file: ${lines.getPath()}", e)
+ Array.empty[Array[String]]
+ case e: FileNotFoundException if !ignoreMissingFiles => throw e
+ case e @ (_: RuntimeException | _: IOException) if ignoreCorruptFiles =>
+ logWarning(
+ s"Skipped the rest of the content in the corrupted file:
${lines.getPath()}", e)
+ Array.empty[Array[String]]
+ }
}.take(1).headOption match {
case Some(firstRow) =>
val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/CommonFileDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/CommonFileDataSourceSuite.scala
index 739f4c440be..2e3a4bbafb8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/CommonFileDataSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/CommonFileDataSourceSuite.scala
@@ -17,6 +17,9 @@
package org.apache.spark.sql.execution.datasources
+import java.io.{ByteArrayOutputStream, File, FileOutputStream}
+import java.util.zip.GZIPOutputStream
+
import org.scalatest.funsuite.AnyFunSuite // scalastyle:ignore funsuite
import org.apache.spark.sql.{Dataset, Encoders, FakeFileSystemRequiringDSOption, SparkSession}
@@ -60,4 +63,29 @@ trait CommonFileDataSourceSuite extends SQLHelper {
}
}
}
+
+ protected def withCorruptFile(f: File => Unit): Unit = {
+ val inputFile = File.createTempFile("input-", ".gz")
+ try {
+ // Create a corrupt gzip file
+ val byteOutput = new ByteArrayOutputStream()
+ val gzip = new GZIPOutputStream(byteOutput)
+ try {
+ gzip.write(Array[Byte](1, 2, 3, 4))
+ } finally {
+ gzip.close()
+ }
+ val bytes = byteOutput.toByteArray
+ val o = new FileOutputStream(inputFile)
+ try {
+ // It's corrupt since we only write half of bytes into the file.
+ o.write(bytes.take(bytes.length / 2))
+ } finally {
+ o.close()
+ }
+ f(inputFile)
+ } finally {
+ inputFile.delete()
+ }
+ }
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index 7655635fc62..38fbf466882 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -17,14 +17,13 @@
package org.apache.spark.sql.execution.datasources.csv
-import java.io.{ByteArrayOutputStream, EOFException, File, FileOutputStream}
+import java.io.{EOFException, File}
import java.nio.charset.{Charset, StandardCharsets, UnsupportedCharsetException}
import java.nio.file.{Files, StandardOpenOption}
import java.sql.{Date, Timestamp}
import java.text.SimpleDateFormat
import java.time.{Duration, Instant, LocalDate, LocalDateTime, Period}
import java.util.Locale
-import java.util.zip.GZIPOutputStream
import scala.jdk.CollectionConverters._
import scala.util.Properties
@@ -32,11 +31,12 @@ import scala.util.Properties
import com.univocity.parsers.common.TextParsingException
import org.apache.commons.lang3.exception.ExceptionUtils
import org.apache.commons.lang3.time.FastDateFormat
+import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.SequenceFile.CompressionType
import org.apache.hadoop.io.compress.GzipCodec
import org.apache.logging.log4j.Level
-import org.apache.spark.{SparkConf, SparkException, SparkRuntimeException, SparkUpgradeException, TestUtils}
+import org.apache.spark.{SparkConf, SparkException, SparkFileNotFoundException, SparkRuntimeException, SparkUpgradeException, TestUtils}
import org.apache.spark.sql.{AnalysisException, Column, DataFrame, Encoders, QueryTest, Row}
import org.apache.spark.sql.catalyst.csv.CSVOptions
import org.apache.spark.sql.catalyst.util.{DateTimeTestUtils, DateTimeUtils}
@@ -1447,37 +1447,47 @@ abstract class CSVSuite
}
}
- test("Enabling/disabling ignoreCorruptFiles") {
- val inputFile = File.createTempFile("input-", ".gz")
- try {
- // Create a corrupt gzip file
- val byteOutput = new ByteArrayOutputStream()
- val gzip = new GZIPOutputStream(byteOutput)
- try {
- gzip.write(Array[Byte](1, 2, 3, 4))
- } finally {
- gzip.close()
- }
- val bytes = byteOutput.toByteArray
- val o = new FileOutputStream(inputFile)
- try {
- // It's corrupt since we only write half of bytes into the file.
- o.write(bytes.take(bytes.length / 2))
- } finally {
- o.close()
- }
+ test("Enabling/disabling ignoreCorruptFiles/ignoreMissingFiles") {
+ withCorruptFile(inputFile => {
withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "false") {
val e = intercept[SparkException] {
spark.read.csv(inputFile.toURI.toString).collect()
}
assert(e.getCause.getCause.isInstanceOf[EOFException])
assert(e.getCause.getCause.getMessage === "Unexpected end of input stream")
+ val e2 = intercept[SparkException] {
+ spark.read.option("multiLine", true).csv(inputFile.toURI.toString).collect()
+ }
+ assert(e2.getCause.getCause.getCause.isInstanceOf[EOFException])
+ assert(e2.getCause.getCause.getCause.getMessage === "Unexpected end of input stream")
}
withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "true") {
assert(spark.read.csv(inputFile.toURI.toString).collect().isEmpty)
+ assert(spark.read.option("multiLine",
true).csv(inputFile.toURI.toString).collect()
+ .isEmpty)
+ }
+ })
+ withTempPath { dir =>
+ val csvPath = new Path(dir.getCanonicalPath, "csv")
+ val fs = csvPath.getFileSystem(spark.sessionState.newHadoopConf())
+
+ sampledTestData.write.csv(csvPath.toString)
+ val df = spark.read.option("multiLine", true).csv(csvPath.toString)
+ fs.delete(csvPath, true)
+ withSQLConf(SQLConf.IGNORE_MISSING_FILES.key -> "false") {
+ val e = intercept[SparkException] {
+ df.collect()
+ }
+ assert(e.getCause.isInstanceOf[SparkFileNotFoundException])
+ assert(e.getCause.getMessage.contains(".csv does not exist"))
+ }
+
+ sampledTestData.write.csv(csvPath.toString)
+ val df2 = spark.read.option("multiLine", true).csv(csvPath.toString)
+ fs.delete(csvPath, true)
+ withSQLConf(SQLConf.IGNORE_MISSING_FILES.key -> "true") {
+ assert(df2.collect().isEmpty)
}
- } finally {
- inputFile.delete()
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 11779286ec2..5096b241f56 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -30,7 +30,7 @@ import org.apache.hadoop.fs.{Path, PathFilter}
import org.apache.hadoop.io.SequenceFile.CompressionType
import org.apache.hadoop.io.compress.GzipCodec
-import org.apache.spark.{SparkConf, SparkException, SparkRuntimeException, SparkUpgradeException, TestUtils}
+import org.apache.spark.{SparkConf, SparkException, SparkFileNotFoundException, SparkRuntimeException, SparkUpgradeException, TestUtils}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{functions => F, _}
import org.apache.spark.sql.catalyst.json._
@@ -1913,6 +1913,50 @@ abstract class JsonSuite
}
}
+ test("SPARK-45035: json enable ignoreCorruptFiles/ignoreMissingFiles") {
+ withCorruptFile(inputFile => {
+ withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "false") {
+ val e = intercept[SparkException] {
+ spark.read.json(inputFile.toURI.toString).collect()
+ }
+ assert(e.getCause.getCause.isInstanceOf[EOFException])
+ assert(e.getCause.getCause.getMessage === "Unexpected end of input stream")
+ val e2 = intercept[SparkException] {
+ spark.read.option("multiLine",
true).json(inputFile.toURI.toString).collect()
+ }
+ assert(e2.getCause.isInstanceOf[EOFException])
+ assert(e2.getCause.getMessage === "Unexpected end of input stream")
+ }
+ withSQLConf(SQLConf.IGNORE_CORRUPT_FILES.key -> "true") {
+ assert(spark.read.json(inputFile.toURI.toString).collect().isEmpty)
+ assert(spark.read.option("multiLine",
true).json(inputFile.toURI.toString).collect()
+ .isEmpty)
+ }
+ })
+ withTempPath { dir =>
+ val jsonPath = new Path(dir.getCanonicalPath, "json")
+ val fs = jsonPath.getFileSystem(spark.sessionState.newHadoopConf())
+
+ sampledTestData.write.json(jsonPath.toString)
+ val df = spark.read.option("multiLine", true).json(jsonPath.toString)
+ fs.delete(jsonPath, true)
+ withSQLConf(SQLConf.IGNORE_MISSING_FILES.key -> "false") {
+ val e = intercept[SparkException] {
+ df.collect()
+ }
+ assert(e.getCause.isInstanceOf[SparkFileNotFoundException])
+ assert(e.getCause.getMessage.contains(".json does not exist"))
+ }
+
+ sampledTestData.write.json(jsonPath.toString)
+ val df2 = spark.read.option("multiLine", true).json(jsonPath.toString)
+ fs.delete(jsonPath, true)
+ withSQLConf(SQLConf.IGNORE_MISSING_FILES.key -> "true") {
+ assert(df2.collect().isEmpty)
+ }
+ }
+ }
+
test("SPARK-18352: Handle multi-line corrupt documents (PERMISSIVE)") {
withTempPath { dir =>
val path = dir.getCanonicalPath
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]