This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 6ed03dceafd [SPARK-41572][SQL] Assign name to _LEGACY_ERROR_TEMP_2149
6ed03dceafd is described below
commit 6ed03dceafd5b677b083ae87c44d2c2d2f8c94d3
Author: itholic <[email protected]>
AuthorDate: Wed Jan 18 17:21:57 2023 +0800
[SPARK-41572][SQL] Assign name to _LEGACY_ERROR_TEMP_2149
### What changes were proposed in this pull request?
This PR proposes to assign the name "MALFORMED_CSV_RECORD" to the error
class _LEGACY_ERROR_TEMP_2149.
### Why are the changes needed?
We should assign proper names to the _LEGACY_ERROR_TEMP_* error classes.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Fix UT & run `./build/sbt "sql/testOnly
org.apache.spark.sql.SQLQueryTestSuite*"`
Closes #39258 from itholic/LEGACY_2149.
Authored-by: itholic <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
core/src/main/resources/error/error-classes.json | 10 ++---
.../spark/sql/catalyst/csv/UnivocityParser.scala | 8 ++--
.../spark/sql/errors/QueryExecutionErrors.scala | 6 +--
.../sql/execution/datasources/csv/CSVSuite.scala | 48 ++++++++++++++++++++--
4 files changed, 57 insertions(+), 15 deletions(-)
diff --git a/core/src/main/resources/error/error-classes.json
b/core/src/main/resources/error/error-classes.json
index 465aa162981..3f0abe77d0b 100644
--- a/core/src/main/resources/error/error-classes.json
+++ b/core/src/main/resources/error/error-classes.json
@@ -941,6 +941,11 @@
],
"sqlState" : "42710"
},
+ "MALFORMED_CSV_RECORD" : {
+ "message" : [
+ "Malformed CSV record: <badRecord>"
+ ]
+ },
"MALFORMED_PROTOBUF_MESSAGE" : {
"message" : [
"Malformed Protobuf messages are detected in message deserialization.
Parse Mode: <failFastMode>. To process malformed protobuf message as null
result, try setting the option 'mode' as 'PERMISSIVE'."
@@ -4226,11 +4231,6 @@
"null value found but field <name> is not nullable."
]
},
- "_LEGACY_ERROR_TEMP_2149" : {
- "message" : [
- "Malformed CSV record"
- ]
- },
"_LEGACY_ERROR_TEMP_2150" : {
"message" : [
"Due to Scala's limited support of tuple, tuple with more than 22
elements are not supported."
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
index 8464e394ab5..42e03630b14 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
@@ -319,15 +319,17 @@ class UnivocityParser(
throw BadRecordException(
() => getCurrentInput,
() => None,
- QueryExecutionErrors.malformedCSVRecordError())
+ QueryExecutionErrors.malformedCSVRecordError(""))
}
+ val currentInput = getCurrentInput
+
var badRecordException: Option[Throwable] = if (tokens.length !=
parsedSchema.length) {
// If the number of tokens doesn't match the schema, we should treat it
as a malformed record.
// However, we still have chance to parse some of the tokens. It
continues to parses the
// tokens normally and sets null when `ArrayIndexOutOfBoundsException`
occurs for missing
// tokens.
- Some(QueryExecutionErrors.malformedCSVRecordError())
+ Some(QueryExecutionErrors.malformedCSVRecordError(currentInput.toString))
} else None
// When the length of the returned tokens is identical to the length of
the parsed schema,
// we just need to:
@@ -360,7 +362,7 @@ class UnivocityParser(
} else {
if (badRecordException.isDefined) {
throw BadRecordException(
- () => getCurrentInput, () => requiredRow.headOption,
badRecordException.get)
+ () => currentInput, () => requiredRow.headOption,
badRecordException.get)
} else {
requiredRow
}
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
index 8634f60e34e..b89c624870e 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
@@ -1528,10 +1528,10 @@ private[sql] object QueryExecutionErrors extends
QueryErrorsBase {
"name" -> name))
}
- def malformedCSVRecordError(): SparkRuntimeException = {
+ def malformedCSVRecordError(badRecord: String): SparkRuntimeException = {
new SparkRuntimeException(
- errorClass = "_LEGACY_ERROR_TEMP_2149",
- messageParameters = Map.empty)
+ errorClass = "MALFORMED_CSV_RECORD",
+ messageParameters = Map("badRecord" -> badRecord))
}
def elementsOfTupleExceedLimitError(): SparkUnsupportedOperationException = {
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index dbeadeb949a..b458a0e1b08 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -36,7 +36,7 @@ import org.apache.hadoop.io.SequenceFile.CompressionType
import org.apache.hadoop.io.compress.GzipCodec
import org.apache.logging.log4j.Level
-import org.apache.spark.{SparkConf, SparkException, SparkUpgradeException,
TestUtils}
+import org.apache.spark.{SparkConf, SparkException, SparkRuntimeException,
SparkUpgradeException, TestUtils}
import org.apache.spark.sql.{AnalysisException, Column, DataFrame, Encoders,
QueryTest, Row}
import org.apache.spark.sql.catalyst.csv.CSVOptions
import org.apache.spark.sql.catalyst.util.{DateTimeTestUtils, DateTimeUtils}
@@ -56,7 +56,7 @@ abstract class CSVSuite
override protected def dataSourceFormat = "csv"
- private val carsFile = "test-data/cars.csv"
+ protected val carsFile = "test-data/cars.csv"
private val carsMalformedFile = "test-data/cars-malformed.csv"
private val carsFile8859 = "test-data/cars_iso-8859-1.csv"
private val carsTsvFile = "test-data/cars.tsv"
@@ -370,8 +370,11 @@ abstract class CSVSuite
.load(testFile(carsFile)).collect()
}
- assert(exception.getMessage.contains("Malformed CSV record"))
-
assert(ExceptionUtils.getRootCause(exception).isInstanceOf[RuntimeException])
+ checkError(
+ exception =
ExceptionUtils.getRootCause(exception).asInstanceOf[SparkRuntimeException],
+ errorClass = "MALFORMED_CSV_RECORD",
+ parameters = Map("badRecord" -> "2015,Chevy,Volt")
+ )
}
}
@@ -3138,6 +3141,24 @@ class CSVv1Suite extends CSVSuite {
super
.sparkConf
.set(SQLConf.USE_V1_SOURCE_LIST, "csv")
+
+ test("test for FAILFAST parsing mode on CSV v1") {
+ Seq(false, true).foreach { multiLine =>
+ val exception = intercept[SparkException] {
+ spark.read
+ .format("csv")
+ .option("multiLine", multiLine)
+ .options(Map("header" -> "true", "mode" -> "failfast"))
+ .load(testFile(carsFile)).collect()
+ }
+
+ checkError(
+ exception = exception.getCause.asInstanceOf[SparkException],
+ errorClass = "_LEGACY_ERROR_TEMP_2177",
+ parameters = Map("failFastMode" -> "FAILFAST")
+ )
+ }
+ }
}
class CSVv2Suite extends CSVSuite {
@@ -3145,6 +3166,25 @@ class CSVv2Suite extends CSVSuite {
super
.sparkConf
.set(SQLConf.USE_V1_SOURCE_LIST, "")
+
+ test("test for FAILFAST parsing mode on CSV v2") {
+ Seq(false, true).foreach { multiLine =>
+ val exception = intercept[SparkException] {
+ spark.read
+ .format("csv")
+ .option("multiLine", multiLine)
+ .options(Map("header" -> "true", "mode" -> "failfast"))
+ .load(testFile(carsFile)).collect()
+ }
+
+ checkError(
+ exception = exception.getCause.asInstanceOf[SparkException],
+ errorClass = "_LEGACY_ERROR_TEMP_2064",
+ parameters = Map("path" -> s".*$carsFile"),
+ matchPVals = true
+ )
+ }
+ }
}
class CSVLegacyTimeParserSuite extends CSVSuite {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]