This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-2.4 by this push:
new 801d6a9 [SPARK-31261][SQL] Avoid NPE when reading bad csv input with `columnNameCorruptRecord` specified
801d6a9 is described below
commit 801d6a92d958f7b9762466e3c6643e54c48eb3a2
Author: Zhenhua Wang <[email protected]>
AuthorDate: Sun Mar 29 13:30:14 2020 +0900
[SPARK-31261][SQL] Avoid NPE when reading bad csv input with `columnNameCorruptRecord` specified
### What changes were proposed in this pull request?
SPARK-25387 avoids npe for bad csv input, but when reading bad csv input
with `columnNameCorruptRecord` specified, `getCurrentInput` is called and it
still throws npe.
### Why are the changes needed?
Bug fix.
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
Add a test.
Closes #28029 from wzhfy/corrupt_column_npe.
Authored-by: Zhenhua Wang <[email protected]>
Signed-off-by: HyukjinKwon <[email protected]>
(cherry picked from commit 791d2ba346f3358fc280adbbbe27f2cd50fd3732)
Signed-off-by: HyukjinKwon <[email protected]>
---
.../sql/execution/datasources/csv/UnivocityParser.scala | 3 ++-
.../spark/sql/execution/datasources/csv/CSVSuite.scala | 14 ++++++++++++++
2 files changed, 16 insertions(+), 1 deletion(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
index e847e40..5579e95 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala
@@ -82,7 +82,8 @@ class UnivocityParser(
// Retrieve the raw record string.
   private def getCurrentInput: UTF8String = {
-    UTF8String.fromString(tokenizer.getContext.currentParsedContent().stripLineEnd)
+    val currentContent = tokenizer.getContext.currentParsedContent()
+    if (currentContent == null) null else UTF8String.fromString(currentContent.stripLineEnd)
   }
   // This parser first picks some tokens from the input tokens, according to the required schema,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index 2ea8f4f..866d8de 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -1822,6 +1822,20 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils with Te
assert(spark.read.csv(input).collect().toSet == Set(Row()))
}
+  test("SPARK-31261: bad csv input with `columnNameCorruptRecord` should not cause NPE") {
+    val schema = StructType(
+      StructField("a", IntegerType) :: StructField("_corrupt_record", StringType) :: Nil)
+ val input = spark.createDataset(Seq("\u0000\u0000\u0001234"))
+
+ checkAnswer(
+ spark.read
+ .option("columnNameOfCorruptRecord", "_corrupt_record")
+ .schema(schema)
+ .csv(input),
+ Row(null, null))
+ assert(spark.read.csv(input).collect().toSet == Set(Row()))
+ }
+
test("field names of inferred schema shouldn't compare to the first row") {
val input = Seq("1,2").toDS()
val df = spark.read.option("enforceSchema", false).csv(input)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]