This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 624eda5030eb [SPARK-49444][SQL] Modified UnivocityParser to throw runtime exceptions caused by ArrayIndexOutOfBounds with more user-oriented messages
624eda5030eb is described below
commit 624eda5030eb3a4a426a1c225952af40dba30d1e
Author: Vladan Vasić <[email protected]>
AuthorDate: Thu Sep 26 23:00:22 2024 +0800
[SPARK-49444][SQL] Modified UnivocityParser to throw runtime exceptions caused by ArrayIndexOutOfBounds with more user-oriented messages
### What changes were proposed in this pull request?
I propose to catch the runtime `ArrayIndexOutOfBounds` exceptions thrown from the `parse` method of the `UnivocityParser` class and rethrow them with more user-oriented messages. Instead of surfacing the exception in its original form, the proposed change informs users which CSV record caused the error.
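For illustration, a minimal parser-level sketch of the new behavior (mirroring the new `UnivocityParserSuite` test in the diff below):

    import org.apache.spark.sql.catalyst.csv.{CSVOptions, UnivocityParser}
    import org.apache.spark.sql.types.StructType

    val schema = StructType.fromDDL("i INTEGER, a STRING")
    // maxColumns is 2, but the record below has five columns.
    val options = new CSVOptions(Map("maxColumns" -> "2"), false, "UTC")
    val parser = new UnivocityParser(schema, schema, options, Seq())
    // Previously: a raw ArrayIndexOutOfBoundsException (wrapped by univocity).
    // Now: SparkRuntimeException with error class MALFORMED_CSV_RECORD and the
    // offending record "1,string,3.14,5,7" in the message parameters.
    parser.parse("1,string,3.14,5,7")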
### Why are the changes needed?
Informing users properly about errors improves the user experience. Instead of throwing an `ArrayIndexOutOfBounds` exception with no clear indication of why it happened, the proposed change throws a `SparkRuntimeException` whose message includes the original CSV line that caused the error.
### Does this PR introduce _any_ user-facing change?
Yes. This PR introduces a user-facing change that occurs when `UnivocityParser` parses a malformed CSV line from the input. More specifically, the change is reproduced in the test case within `UnivocityParserSuite` in which the user specifies `maxColumns` in the parser options and the parsed CSV record has more columns than that limit. Instead of an `ArrayIndexOutOfBounds` exception, as mentioned in the JIRA ticket, users now get a `SparkRuntimeException` whose message contains the input line that caused the error.
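For example, a minimal sketch of the new user-facing failure (adapted from the new `CSVSuite` test in the diff below; the file path is illustrative):

    import org.apache.spark.sql.types._

    // more-columns.csv contains the single line: 1,3.14,string,5,7
    val schema = new StructType()
      .add("intColumn", IntegerType, nullable = true)
      .add("decimalColumn", DecimalType(10, 2), nullable = true)

    spark.read
      .schema(schema)
      .option("header", "false")
      .option("maxColumns", "2")
      .csv("test-data/more-columns.csv")
      .collect()
    // Fails with FAILED_READ_FILE.NO_HINT whose cause is now a
    // SparkRuntimeException (MALFORMED_CSV_RECORD, badRecord = "1,3.14,string,5,7")
    // rather than a raw ArrayIndexOutOfBoundsException.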
### How was this patch tested?
This patch was tested in `UnivocityParserSuite`. The test named "Array index out of bounds when parsing CSV with more columns than expected" covers this patch. Additionally, a test for bad records in `UnivocityParser`'s `PERMISSIVE` mode was added to confirm that `BadRecordException` is still thrown properly.
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #47906 from vladanvasi-db/vladanvasi-db/univocity-parser-index-out-of-bounds-handling.
Authored-by: Vladan Vasić <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../spark/sql/catalyst/csv/UnivocityParser.scala | 19 +++++++++--
.../sql/catalyst/csv/UnivocityParserSuite.scala | 39 ++++++++++++++++++++--
.../src/test/resources/test-data/more-columns.csv | 1 +
.../org/apache/spark/sql/CsvFunctionsSuite.scala | 5 +--
.../sql/execution/datasources/csv/CSVSuite.scala | 34 +++++++++++++++++++
5 files changed, 92 insertions(+), 6 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
index ccc8f30a9a9c..0fd0601803a6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
@@ -21,9 +21,10 @@ import java.io.InputStream
 
 import scala.util.control.NonFatal
 
+import com.univocity.parsers.common.TextParsingException
 import com.univocity.parsers.csv.CsvParser
 
-import org.apache.spark.SparkUpgradeException
+import org.apache.spark.{SparkRuntimeException, SparkUpgradeException}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.{InternalRow, NoopFilters, OrderedFilters}
 import org.apache.spark.sql.catalyst.expressions.{ExprUtils, GenericInternalRow}
@@ -294,6 +295,20 @@ class UnivocityParser(
     }
   }
 
+  private def parseLine(line: String): Array[String] = {
+    try {
+      tokenizer.parseLine(line)
+    }
+    catch {
+      case e: TextParsingException if e.getCause.isInstanceOf[ArrayIndexOutOfBoundsException] =>
+        throw new SparkRuntimeException(
+          errorClass = "MALFORMED_CSV_RECORD",
+          messageParameters = Map("badRecord" -> line),
+          cause = e
+        )
+    }
+  }
+
   /**
    * Parses a single CSV string and turns it into either one resulting row or no row (if the
    * the record is malformed).
@@ -306,7 +321,7 @@ class UnivocityParser(
       (_: String) => Some(InternalRow.empty)
     } else {
       // parse if the columnPruning is disabled or requiredSchema is nonEmpty
-      (input: String) => convert(tokenizer.parseLine(input))
+      (input: String) => convert(parseLine(input))
     }
   }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/UnivocityParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/UnivocityParserSuite.scala
index 514b529ea8cc..7974bf68bdd3 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/UnivocityParserSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/UnivocityParserSuite.scala
@@ -23,12 +23,12 @@ import java.util.{Locale, TimeZone}
 
 import org.apache.commons.lang3.time.FastDateFormat
 
-import org.apache.spark.{SparkFunSuite, SparkIllegalArgumentException}
+import org.apache.spark.{SparkFunSuite, SparkIllegalArgumentException, SparkRuntimeException}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.plans.SQLHelper
+import org.apache.spark.sql.catalyst.util.{BadRecordException, DateTimeUtils}
 import org.apache.spark.sql.catalyst.util.DateTimeConstants._
 import org.apache.spark.sql.catalyst.util.DateTimeTestUtils._
-import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.sources.{EqualTo, Filter, StringStartsWith}
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
@@ -323,6 +323,41 @@ class UnivocityParserSuite extends SparkFunSuite with SQLHelper {
       parameters = Map("fieldName" -> "`i`", "fields" -> ""))
   }
+ test("Bad records test in permissive mode") {
+ def checkBadRecord(
+ input: String = "1,a",
+ dataSchema: StructType = StructType.fromDDL("i INTEGER, s STRING, d
DOUBLE"),
+ requiredSchema: StructType = StructType.fromDDL("i INTEGER, s STRING"),
+ options: Map[String, String] = Map("mode" -> "PERMISSIVE")):
BadRecordException = {
+ val csvOptions = new CSVOptions(options, false, "UTC")
+ val parser = new UnivocityParser(dataSchema, requiredSchema, csvOptions,
Seq())
+ intercept[BadRecordException] {
+ parser.parse(input)
+ }
+ }
+
+ // Bad record exception caused by conversion error
+ checkBadRecord(input = "1.5,a,10.3")
+
+ // Bad record exception caused by insufficient number of columns
+ checkBadRecord(input = "2")
+ }
+
+ test("Array index out of bounds when parsing CSV with more columns than
expected") {
+ val input = "1,string,3.14,5,7"
+ val dataSchema: StructType = StructType.fromDDL("i INTEGER, a STRING")
+ val requiredSchema: StructType = StructType.fromDDL("i INTEGER, a STRING")
+ val options = new CSVOptions(Map("maxColumns" -> "2"), false, "UTC")
+ val filters = Seq()
+ val parser = new UnivocityParser(dataSchema, requiredSchema, options,
filters)
+ checkError(
+ exception = intercept[SparkRuntimeException] {
+ parser.parse(input)
+ },
+ condition = "MALFORMED_CSV_RECORD",
+ parameters = Map("badRecord" -> "1,string,3.14,5,7"))
+ }
+
test("SPARK-30960: parse date/timestamp string with legacy format") {
def check(parser: UnivocityParser): Unit = {
// The legacy format allows 1 or 2 chars for some fields.
diff --git a/sql/core/src/test/resources/test-data/more-columns.csv b/sql/core/src/test/resources/test-data/more-columns.csv
new file mode 100644
index 000000000000..06db38f0a145
--- /dev/null
+++ b/sql/core/src/test/resources/test-data/more-columns.csv
@@ -0,0 +1 @@
+1,3.14,string,5,7
\ No newline at end of file
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala
index 6589282fd3a5..e6907b865648 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala
@@ -24,7 +24,8 @@ import java.util.Locale
 
 import scala.jdk.CollectionConverters._
 
-import org.apache.spark.{SparkException, SparkUnsupportedOperationException, SparkUpgradeException}
+import org.apache.spark.{SparkException, SparkRuntimeException,
+  SparkUnsupportedOperationException, SparkUpgradeException}
 import org.apache.spark.sql.errors.DataTypeErrors.toSQLType
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
@@ -234,7 +235,7 @@ class CsvFunctionsSuite extends QueryTest with SharedSparkSession {
     val schema = new StructType().add("str", StringType)
     val options = Map("maxCharsPerColumn" -> "2")
 
-    val exception = intercept[SparkException] {
+    val exception = intercept[SparkRuntimeException] {
       df.select(from_csv($"value", schema, options)).collect()
     }.getCause.getMessage
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index e2d1d9b05c3c..023f401516dc 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -85,6 +85,7 @@ abstract class CSVSuite
   private val badAfterGoodFile = "test-data/bad_after_good.csv"
   private val malformedRowFile = "test-data/malformedRow.csv"
   private val charFile = "test-data/char.csv"
+  private val moreColumnsFile = "test-data/more-columns.csv"
 
   /** Verifies data and schema. */
   private def verifyCars(
@@ -3439,6 +3440,39 @@ abstract class CSVSuite
         expected)
     }
   }
+
+  test("SPARK-49444: CSV parsing failure with more than max columns") {
+    val schema = new StructType()
+      .add("intColumn", IntegerType, nullable = true)
+      .add("decimalColumn", DecimalType(10, 2), nullable = true)
+
+    val fileReadException = intercept[SparkException] {
+      spark
+        .read
+        .schema(schema)
+        .option("header", "false")
+        .option("maxColumns", "2")
+        .csv(testFile(moreColumnsFile))
+        .collect()
+    }
+
+    checkErrorMatchPVals(
+      exception = fileReadException,
+      condition = "FAILED_READ_FILE.NO_HINT",
+      parameters = Map("path" -> s".*$moreColumnsFile"))
+
+    val malformedCSVException = fileReadException.getCause.asInstanceOf[SparkRuntimeException]
+
+    checkError(
+      exception = malformedCSVException,
+      condition = "MALFORMED_CSV_RECORD",
+      parameters = Map("badRecord" -> "1,3.14,string,5,7"),
+      sqlState = "KD000")
+
+    assert(malformedCSVException.getCause.isInstanceOf[TextParsingException])
+    val textParsingException = malformedCSVException.getCause.asInstanceOf[TextParsingException]
+    assert(textParsingException.getCause.isInstanceOf[ArrayIndexOutOfBoundsException])
+  }
 }
 
 class CSVv1Suite extends CSVSuite {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]