This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 624eda5030eb [SPARK-49444][SQL] Modified UnivocityParser to throw 
runtime exceptions caused by ArrayIndexOutOfBounds with more user-oriented 
messages
624eda5030eb is described below

commit 624eda5030eb3a4a426a1c225952af40dba30d1e
Author: Vladan Vasić <[email protected]>
AuthorDate: Thu Sep 26 23:00:22 2024 +0800

    [SPARK-49444][SQL] Modified UnivocityParser to throw runtime exceptions 
caused by ArrayIndexOutOfBounds with more user-oriented messages
    
    ### What changes were proposed in this pull request?
    
    I propose to catch and rethrow runtime `ArrayIndexOutOfBounds` exceptions 
in the `UnivocityParser` class - `parse` method, but with more user-oriented 
messages. Instead of throwing exceptions in the original format, I propose to 
inform the users which csv record caused the error.
    
    ### Why are the changes needed?
    
    Proper informing of users' errors improves user experience. Instead of 
throwing `ArrayIndexOutOfBounds` exception without clear reason why it 
happened, proposed changes throw `SparkRuntimeException` with the message that 
includes original csv line which caused the error.
    
    ### Does this PR introduce _any_ user-facing change?
    
    This PR introduces a user-facing change which happens when 
`UnivocityParser` parses malformed csv line with from the input. More 
specifically, the change is reproduces in the test case within 
`UnivocityParserSuite` when user specifies `maxColumns` in parser options and 
parsed csv record has more columns. Instead of resulting in 
`ArrayIndexOutOfBounds` like mentioned in the HMR ticket, users now get 
`SparkRuntimeException` with message that contains the input line which caused 
the error.
    
    ### How was this patch tested?
    
    This patch was tested in `UnivocityParserSuite`. Test named "Array index 
out of bounds when parsing CSV with more columns than expected" covers this 
patch. Additionally, test for bad records in `UnivocityParser`'s `PERMISSIVE` 
mode is added to confirm that `BadRecordException` is being thrown properly.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #47906 from 
vladanvasi-db/vladanvasi-db/univocity-parser-index-out-of-bounds-handling.
    
    Authored-by: Vladan Vasić <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../spark/sql/catalyst/csv/UnivocityParser.scala   | 19 +++++++++--
 .../sql/catalyst/csv/UnivocityParserSuite.scala    | 39 ++++++++++++++++++++--
 .../src/test/resources/test-data/more-columns.csv  |  1 +
 .../org/apache/spark/sql/CsvFunctionsSuite.scala   |  5 +--
 .../sql/execution/datasources/csv/CSVSuite.scala   | 34 +++++++++++++++++++
 5 files changed, 92 insertions(+), 6 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
index ccc8f30a9a9c..0fd0601803a6 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
@@ -21,9 +21,10 @@ import java.io.InputStream
 
 import scala.util.control.NonFatal
 
+import com.univocity.parsers.common.TextParsingException
 import com.univocity.parsers.csv.CsvParser
 
-import org.apache.spark.SparkUpgradeException
+import org.apache.spark.{SparkRuntimeException, SparkUpgradeException}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.{InternalRow, NoopFilters, OrderedFilters}
 import org.apache.spark.sql.catalyst.expressions.{ExprUtils, 
GenericInternalRow}
@@ -294,6 +295,20 @@ class UnivocityParser(
     }
   }
 
+  private def parseLine(line: String): Array[String] = {
+    try {
+      tokenizer.parseLine(line)
+    }
+    catch {
+      case e: TextParsingException if 
e.getCause.isInstanceOf[ArrayIndexOutOfBoundsException] =>
+        throw new SparkRuntimeException(
+          errorClass = "MALFORMED_CSV_RECORD",
+          messageParameters = Map("badRecord" -> line),
+          cause = e
+        )
+    }
+  }
+
   /**
    * Parses a single CSV string and turns it into either one resulting row or 
no row (if the
    * the record is malformed).
@@ -306,7 +321,7 @@ class UnivocityParser(
       (_: String) => Some(InternalRow.empty)
     } else {
       // parse if the columnPruning is disabled or requiredSchema is nonEmpty
-      (input: String) => convert(tokenizer.parseLine(input))
+      (input: String) => convert(parseLine(input))
     }
   }
 
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/UnivocityParserSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/UnivocityParserSuite.scala
index 514b529ea8cc..7974bf68bdd3 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/UnivocityParserSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/csv/UnivocityParserSuite.scala
@@ -23,12 +23,12 @@ import java.util.{Locale, TimeZone}
 
 import org.apache.commons.lang3.time.FastDateFormat
 
-import org.apache.spark.{SparkFunSuite, SparkIllegalArgumentException}
+import org.apache.spark.{SparkFunSuite, SparkIllegalArgumentException, 
SparkRuntimeException}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.plans.SQLHelper
+import org.apache.spark.sql.catalyst.util.{BadRecordException, DateTimeUtils}
 import org.apache.spark.sql.catalyst.util.DateTimeConstants._
 import org.apache.spark.sql.catalyst.util.DateTimeTestUtils._
-import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.sources.{EqualTo, Filter, StringStartsWith}
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
@@ -323,6 +323,41 @@ class UnivocityParserSuite extends SparkFunSuite with 
SQLHelper {
       parameters = Map("fieldName" -> "`i`", "fields" -> ""))
   }
 
+  test("Bad records test in permissive mode") {
+    def checkBadRecord(
+      input: String = "1,a",
+      dataSchema: StructType = StructType.fromDDL("i INTEGER, s STRING, d 
DOUBLE"),
+      requiredSchema: StructType = StructType.fromDDL("i INTEGER, s STRING"),
+      options: Map[String, String] = Map("mode" -> "PERMISSIVE")): 
BadRecordException = {
+      val csvOptions = new CSVOptions(options, false, "UTC")
+      val parser = new UnivocityParser(dataSchema, requiredSchema, csvOptions, 
Seq())
+      intercept[BadRecordException] {
+        parser.parse(input)
+      }
+    }
+
+    // Bad record exception caused by conversion error
+    checkBadRecord(input = "1.5,a,10.3")
+
+    // Bad record exception caused by insufficient number of columns
+    checkBadRecord(input = "2")
+  }
+
+  test("Array index out of bounds when parsing CSV with more columns than 
expected") {
+    val input = "1,string,3.14,5,7"
+    val dataSchema: StructType = StructType.fromDDL("i INTEGER, a STRING")
+    val requiredSchema: StructType = StructType.fromDDL("i INTEGER, a STRING")
+    val options = new CSVOptions(Map("maxColumns" -> "2"), false, "UTC")
+    val filters = Seq()
+    val parser = new UnivocityParser(dataSchema, requiredSchema, options, 
filters)
+    checkError(
+      exception = intercept[SparkRuntimeException] {
+        parser.parse(input)
+      },
+      condition = "MALFORMED_CSV_RECORD",
+      parameters = Map("badRecord" -> "1,string,3.14,5,7"))
+  }
+
   test("SPARK-30960: parse date/timestamp string with legacy format") {
     def check(parser: UnivocityParser): Unit = {
       // The legacy format allows 1 or 2 chars for some fields.
diff --git a/sql/core/src/test/resources/test-data/more-columns.csv 
b/sql/core/src/test/resources/test-data/more-columns.csv
new file mode 100644
index 000000000000..06db38f0a145
--- /dev/null
+++ b/sql/core/src/test/resources/test-data/more-columns.csv
@@ -0,0 +1 @@
+1,3.14,string,5,7
\ No newline at end of file
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala
index 6589282fd3a5..e6907b865648 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala
@@ -24,7 +24,8 @@ import java.util.Locale
 
 import scala.jdk.CollectionConverters._
 
-import org.apache.spark.{SparkException, SparkUnsupportedOperationException, 
SparkUpgradeException}
+import org.apache.spark.{SparkException, SparkRuntimeException,
+  SparkUnsupportedOperationException, SparkUpgradeException}
 import org.apache.spark.sql.errors.DataTypeErrors.toSQLType
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
@@ -234,7 +235,7 @@ class CsvFunctionsSuite extends QueryTest with 
SharedSparkSession {
     val schema = new StructType().add("str", StringType)
     val options = Map("maxCharsPerColumn" -> "2")
 
-    val exception = intercept[SparkException] {
+    val exception = intercept[SparkRuntimeException] {
       df.select(from_csv($"value", schema, options)).collect()
     }.getCause.getMessage
 
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index e2d1d9b05c3c..023f401516dc 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -85,6 +85,7 @@ abstract class CSVSuite
   private val badAfterGoodFile = "test-data/bad_after_good.csv"
   private val malformedRowFile = "test-data/malformedRow.csv"
   private val charFile = "test-data/char.csv"
+  private val moreColumnsFile = "test-data/more-columns.csv"
 
   /** Verifies data and schema. */
   private def verifyCars(
@@ -3439,6 +3440,39 @@ abstract class CSVSuite
         expected)
     }
   }
+
+  test("SPARK-49444: CSV parsing failure with more than max columns") {
+    val schema = new StructType()
+      .add("intColumn", IntegerType, nullable = true)
+      .add("decimalColumn", DecimalType(10, 2), nullable = true)
+
+    val fileReadException = intercept[SparkException] {
+      spark
+        .read
+        .schema(schema)
+        .option("header", "false")
+        .option("maxColumns", "2")
+        .csv(testFile(moreColumnsFile))
+        .collect()
+    }
+
+    checkErrorMatchPVals(
+      exception = fileReadException,
+      condition = "FAILED_READ_FILE.NO_HINT",
+      parameters = Map("path" -> s".*$moreColumnsFile"))
+
+    val malformedCSVException = 
fileReadException.getCause.asInstanceOf[SparkRuntimeException]
+
+    checkError(
+      exception = malformedCSVException,
+      condition = "MALFORMED_CSV_RECORD",
+      parameters = Map("badRecord" -> "1,3.14,string,5,7"),
+      sqlState = "KD000")
+
+    assert(malformedCSVException.getCause.isInstanceOf[TextParsingException])
+    val textParsingException = 
malformedCSVException.getCause.asInstanceOf[TextParsingException]
+    
assert(textParsingException.getCause.isInstanceOf[ArrayIndexOutOfBoundsException])
+  }
 }
 
 class CSVv1Suite extends CSVSuite {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to