This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new 32d127d [SPARK-36379][SQL][3.1] Null at root level of a JSON array
should not fail w/ permissive mode
32d127d is described below
commit 32d127de4a4a628276e659bd6a5d572c625ed565
Author: Hyukjin Kwon <[email protected]>
AuthorDate: Tue Aug 17 21:10:44 2021 +0900
[SPARK-36379][SQL][3.1] Null at root level of a JSON array should not fail
w/ permissive mode
This PR backports https://github.com/apache/spark/pull/33608 to branch-3.1
-----------------------------------------------------------------------------
### What changes were proposed in this pull request?
This PR proposes to fail properly so that the JSON parser can proceed and parse
the input with the permissive mode.
Previously, we passed `null`s through as they were, the root `InternalRow`s
became `null`s, and that caused the query to fail even with the permissive mode
on.
Now, we fail explicitly if `null` is passed when the input array contains
`null`.
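The mechanics can be sketched outside Spark as follows. This is a minimal sketch, not the actual `FailureSafeParser`/`JacksonParser` code; all names here (`rawParse`, `parse`, `ParseMode`) are hypothetical stand-ins. The point is that the low-level parser now throws instead of yielding a `null` root row, which lets a mode-aware wrapper substitute a null row under PERMISSIVE or fail loudly under FAILFAST:

```scala
// Hypothetical sketch, NOT Spark's implementation.
sealed trait ParseMode
case object Permissive extends ParseMode
case object Failfast extends ParseMode

// Stand-in for the low-level parser: after this change it throws on a
// root-level null instead of silently producing a null row.
def rawParse(record: String): Seq[Option[String]] =
  if (record.trim == "null") throw new RuntimeException("Root converter returned null")
  else Seq(Some(record))

// Stand-in for the failure-safe wrapper: the thrown exception is what lets
// it recover under PERMISSIVE or rethrow under FAILFAST.
def parse(record: String, mode: ParseMode): Seq[Option[String]] =
  try rawParse(record) catch {
    case e: RuntimeException => mode match {
      case Permissive => Seq(None) // malformed record becomes one null row
      case Failfast =>
        throw new RuntimeException("Malformed records are detected in record parsing.", e)
    }
  }
```

In this sketch, `parse("null", Permissive)` yields one null row while `parse("null", Failfast)` throws, mirroring the behaviour shown for non-array input below.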
Note that this is consistent with non-array JSON input:
**Permissive mode:**
```scala
spark.read.json(Seq("""{"a": "str"}""", """null""").toDS).collect()
```
```
res0: Array[org.apache.spark.sql.Row] = Array([str], [null])
```
**Failfast mode**:
```scala
spark.read.option("mode", "failfast").json(Seq("""{"a": "str"}""", """null""").toDS).collect()
```
```
org.apache.spark.SparkException: Malformed records are detected in record parsing. Parse Mode: FAILFAST. To process malformed records as null result, try setting the option 'mode' as 'PERMISSIVE'.
  at org.apache.spark.sql.catalyst.util.FailureSafeParser.parse(FailureSafeParser.scala:70)
  at org.apache.spark.sql.DataFrameReader.$anonfun$json$7(DataFrameReader.scala:540)
  at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:484)
```
### Why are the changes needed?
To make the permissive mode proceed and parse the input without throwing an
exception.
### Does this PR introduce _any_ user-facing change?
**Permissive mode:**
```scala
spark.read.json(Seq("""[{"a": "str"}, null]""").toDS).collect()
```
Before:
```
java.lang.NullPointerException
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
  at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
  at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
```
After:
```
res0: Array[org.apache.spark.sql.Row] = Array([null])
```
Note that this behaviour is consistent with the case of a malformed JSON object:
```scala
spark.read.schema("a int").json(Seq("""[{"a": 123}, {123123}, {"a": 123}]""").toDS).collect()
```
```
res0: Array[org.apache.spark.sql.Row] = Array([null])
```
Since we're parsing _one_ JSON array, related records all fail together.
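That "fail together" behaviour can be sketched with hypothetical helpers (these are not the real `convertArray`): because the whole top-level array is converted as a single record, the first bad element aborts the conversion and every element in that record is lost together.

```scala
// Hypothetical sketch: a root-level array is one record, so one null/bad
// element throws and discards all of its siblings.
def convertRootArray(elements: Seq[Option[Int]]): Seq[Int] =
  elements.map(_.getOrElse(throw new RuntimeException("Root converter returned null")))

// A permissive-style caller collapses the failed record into a single null row.
def parsePermissive(elements: Seq[Option[Int]]): Seq[Option[Int]] =
  try convertRootArray(elements).map(Some(_))
  catch { case _: RuntimeException => Seq(None) }
```

Here `parsePermissive(Seq(Some(123), None, Some(123)))` yields a single null row even though two elements were well-formed, matching the `Array([null])` result above.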
**Failfast mode:**
```scala
spark.read.option("mode", "failfast").json(Seq("""[{"a": "str"}, null]""").toDS).collect()
```
Before:
```
java.lang.NullPointerException
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
  at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
  at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
```
After:
```
org.apache.spark.SparkException: Malformed records are detected in record parsing. Parse Mode: FAILFAST. To process malformed records as null result, try setting the option 'mode' as 'PERMISSIVE'.
  at org.apache.spark.sql.catalyst.util.FailureSafeParser.parse(FailureSafeParser.scala:70)
  at org.apache.spark.sql.DataFrameReader.$anonfun$json$7(DataFrameReader.scala:540)
  at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:484)
```
### How was this patch tested?
Manually tested, and a unit test was added.
Closes #33762 from HyukjinKwon/cherry-pick-SPARK-36379.
Authored-by: Hyukjin Kwon <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
---
.../org/apache/spark/sql/catalyst/json/JacksonParser.scala | 9 ++++++---
.../spark/sql/execution/datasources/json/JsonSuite.scala | 14 ++++++++++++++
2 files changed, 20 insertions(+), 3 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
index bbcff49..8f3f703 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
@@ -108,7 +108,7 @@ class JacksonParser(
// List([str_a_2,null], [null,str_b_3])
//
case START_ARRAY if allowArrayAsStructs =>
- val array = convertArray(parser, elementConverter)
+ val array = convertArray(parser, elementConverter, isRoot = true)
// Here, as we support reading top level JSON arrays and take every element
// in such an array as a row, this case is possible.
if (array.numElements() == 0) {
@@ -439,10 +439,13 @@ class JacksonParser(
*/
private def convertArray(
parser: JsonParser,
- fieldConverter: ValueConverter): ArrayData = {
+ fieldConverter: ValueConverter,
+ isRoot: Boolean = false): ArrayData = {
val values = ArrayBuffer.empty[Any]
while (nextUntil(parser, JsonToken.END_ARRAY)) {
- values += fieldConverter.apply(parser)
+ val v = fieldConverter.apply(parser)
+ if (isRoot && v == null) throw new RuntimeException("Root converter returned null")
+ values += v
}
new GenericArrayData(values.toArray)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 76e05a2..c60cba2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -2844,6 +2844,20 @@ abstract class JsonSuite
assert(readback.collect sameElements Array(Row(0), Row(1), Row(2)))
}
}
+
+ test("SPARK-36379: proceed parsing with root nulls in permissive mode") {
+ assert(intercept[SparkException] {
+ spark.read.option("mode", "failfast")
+ .schema("a string").json(Seq("""[{"a": "str"}, null]""").toDS).collect()
+ }.getMessage.contains("Malformed records are detected"))
+
+ // Permissive modes should proceed parsing malformed records (null).
+ // Here, since an array fails to parse in the middle, we will return one row.
+ checkAnswer(
+ spark.read.option("mode", "permissive")
+ .json(Seq("""[{"a": "str"}, null, {"a": "str"}]""").toDS),
+ Row(null) :: Nil)
+ }
}
class JsonV1Suite extends JsonSuite {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]