This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 6ed03dceafd [SPARK-41572][SQL] Assign name to _LEGACY_ERROR_TEMP_2149
6ed03dceafd is described below

commit 6ed03dceafd5b677b083ae87c44d2c2d2f8c94d3
Author: itholic <[email protected]>
AuthorDate: Wed Jan 18 17:21:57 2023 +0800

    [SPARK-41572][SQL] Assign name to _LEGACY_ERROR_TEMP_2149
    
    ### What changes were proposed in this pull request?
    
    This PR proposes to assign the name "MALFORMED_CSV_RECORD" to the error
    class _LEGACY_ERROR_TEMP_2149.
    
    ### Why are the changes needed?
    
    We should assign proper names to the _LEGACY_ERROR_TEMP_* error classes.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Fixed the UTs and ran `./build/sbt "sql/testOnly org.apache.spark.sql.SQLQueryTestSuite*"`
    
    Closes #39258 from itholic/LEGACY_2149.
    
    Authored-by: itholic <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
---
 core/src/main/resources/error/error-classes.json   | 10 ++---
 .../spark/sql/catalyst/csv/UnivocityParser.scala   |  8 ++--
 .../spark/sql/errors/QueryExecutionErrors.scala    |  6 +--
 .../sql/execution/datasources/csv/CSVSuite.scala   | 48 ++++++++++++++++++++--
 4 files changed, 57 insertions(+), 15 deletions(-)

diff --git a/core/src/main/resources/error/error-classes.json 
b/core/src/main/resources/error/error-classes.json
index 465aa162981..3f0abe77d0b 100644
--- a/core/src/main/resources/error/error-classes.json
+++ b/core/src/main/resources/error/error-classes.json
@@ -941,6 +941,11 @@
     ],
     "sqlState" : "42710"
   },
+  "MALFORMED_CSV_RECORD" : {
+    "message" : [
+      "Malformed CSV record: <badRecord>"
+    ]
+  },
   "MALFORMED_PROTOBUF_MESSAGE" : {
     "message" : [
       "Malformed Protobuf messages are detected in message deserialization. 
Parse Mode: <failFastMode>. To process malformed protobuf message as null 
result, try setting the option 'mode' as 'PERMISSIVE'."
@@ -4226,11 +4231,6 @@
       "null value found but field <name> is not nullable."
     ]
   },
-  "_LEGACY_ERROR_TEMP_2149" : {
-    "message" : [
-      "Malformed CSV record"
-    ]
-  },
   "_LEGACY_ERROR_TEMP_2150" : {
     "message" : [
       "Due to Scala's limited support of tuple, tuple with more than 22 
elements are not supported."
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
index 8464e394ab5..42e03630b14 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
@@ -319,15 +319,17 @@ class UnivocityParser(
       throw BadRecordException(
         () => getCurrentInput,
         () => None,
-        QueryExecutionErrors.malformedCSVRecordError())
+        QueryExecutionErrors.malformedCSVRecordError(""))
     }
 
+    val currentInput = getCurrentInput
+
     var badRecordException: Option[Throwable] = if (tokens.length != 
parsedSchema.length) {
       // If the number of tokens doesn't match the schema, we should treat it 
as a malformed record.
       // However, we still have chance to parse some of the tokens. It 
continues to parses the
       // tokens normally and sets null when `ArrayIndexOutOfBoundsException` 
occurs for missing
       // tokens.
-      Some(QueryExecutionErrors.malformedCSVRecordError())
+      Some(QueryExecutionErrors.malformedCSVRecordError(currentInput.toString))
     } else None
     // When the length of the returned tokens is identical to the length of 
the parsed schema,
     // we just need to:
@@ -360,7 +362,7 @@ class UnivocityParser(
     } else {
       if (badRecordException.isDefined) {
         throw BadRecordException(
-          () => getCurrentInput, () => requiredRow.headOption, 
badRecordException.get)
+          () => currentInput, () => requiredRow.headOption, 
badRecordException.get)
       } else {
         requiredRow
       }
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
index 8634f60e34e..b89c624870e 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
@@ -1528,10 +1528,10 @@ private[sql] object QueryExecutionErrors extends 
QueryErrorsBase {
         "name" -> name))
   }
 
-  def malformedCSVRecordError(): SparkRuntimeException = {
+  def malformedCSVRecordError(badRecord: String): SparkRuntimeException = {
     new SparkRuntimeException(
-      errorClass = "_LEGACY_ERROR_TEMP_2149",
-      messageParameters = Map.empty)
+      errorClass = "MALFORMED_CSV_RECORD",
+      messageParameters = Map("badRecord" -> badRecord))
   }
 
   def elementsOfTupleExceedLimitError(): SparkUnsupportedOperationException = {
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index dbeadeb949a..b458a0e1b08 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -36,7 +36,7 @@ import org.apache.hadoop.io.SequenceFile.CompressionType
 import org.apache.hadoop.io.compress.GzipCodec
 import org.apache.logging.log4j.Level
 
-import org.apache.spark.{SparkConf, SparkException, SparkUpgradeException, 
TestUtils}
+import org.apache.spark.{SparkConf, SparkException, SparkRuntimeException, 
SparkUpgradeException, TestUtils}
 import org.apache.spark.sql.{AnalysisException, Column, DataFrame, Encoders, 
QueryTest, Row}
 import org.apache.spark.sql.catalyst.csv.CSVOptions
 import org.apache.spark.sql.catalyst.util.{DateTimeTestUtils, DateTimeUtils}
@@ -56,7 +56,7 @@ abstract class CSVSuite
 
   override protected def dataSourceFormat = "csv"
 
-  private val carsFile = "test-data/cars.csv"
+  protected val carsFile = "test-data/cars.csv"
   private val carsMalformedFile = "test-data/cars-malformed.csv"
   private val carsFile8859 = "test-data/cars_iso-8859-1.csv"
   private val carsTsvFile = "test-data/cars.tsv"
@@ -370,8 +370,11 @@ abstract class CSVSuite
           .load(testFile(carsFile)).collect()
       }
 
-      assert(exception.getMessage.contains("Malformed CSV record"))
-      
assert(ExceptionUtils.getRootCause(exception).isInstanceOf[RuntimeException])
+      checkError(
+        exception = 
ExceptionUtils.getRootCause(exception).asInstanceOf[SparkRuntimeException],
+        errorClass = "MALFORMED_CSV_RECORD",
+        parameters = Map("badRecord" -> "2015,Chevy,Volt")
+      )
     }
   }
 
@@ -3138,6 +3141,24 @@ class CSVv1Suite extends CSVSuite {
     super
       .sparkConf
       .set(SQLConf.USE_V1_SOURCE_LIST, "csv")
+
+  test("test for FAILFAST parsing mode on CSV v1") {
+    Seq(false, true).foreach { multiLine =>
+      val exception = intercept[SparkException] {
+        spark.read
+          .format("csv")
+          .option("multiLine", multiLine)
+          .options(Map("header" -> "true", "mode" -> "failfast"))
+          .load(testFile(carsFile)).collect()
+      }
+
+      checkError(
+        exception = exception.getCause.asInstanceOf[SparkException],
+        errorClass = "_LEGACY_ERROR_TEMP_2177",
+        parameters = Map("failFastMode" -> "FAILFAST")
+      )
+    }
+  }
 }
 
 class CSVv2Suite extends CSVSuite {
@@ -3145,6 +3166,25 @@ class CSVv2Suite extends CSVSuite {
     super
       .sparkConf
       .set(SQLConf.USE_V1_SOURCE_LIST, "")
+
+  test("test for FAILFAST parsing mode on CSV v2") {
+    Seq(false, true).foreach { multiLine =>
+      val exception = intercept[SparkException] {
+        spark.read
+          .format("csv")
+          .option("multiLine", multiLine)
+          .options(Map("header" -> "true", "mode" -> "failfast"))
+          .load(testFile(carsFile)).collect()
+      }
+
+      checkError(
+        exception = exception.getCause.asInstanceOf[SparkException],
+        errorClass = "_LEGACY_ERROR_TEMP_2064",
+        parameters = Map("path" -> s".*$carsFile"),
+        matchPVals = true
+      )
+    }
+  }
 }
 
 class CSVLegacyTimeParserSuite extends CSVSuite {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to