This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 205b65e [SPARK-33134][SQL][3.0] Return partial results only for root JSON objects
205b65e is described below
commit 205b65e35c45d3e4f5b3f798a3b48410e94ab2c5
Author: Max Gekk <[email protected]>
AuthorDate: Wed Oct 14 12:14:32 2020 +0900
[SPARK-33134][SQL][3.0] Return partial results only for root JSON objects
### What changes were proposed in this pull request?
In the PR, I propose to restrict the partial result feature to root JSON objects only. The JSON datasource as well as `from_json()` will return `null` for malformed nested JSON objects.
### Why are the changes needed?
1. To not raise exceptions to users in the PERMISSIVE mode
2. To fix a regression and to match the behavior of Spark 2.4.x
3. The current implementation of partial results is supposed to work only for root (top-level) JSON objects, and is not tested for malformed nested complex JSON fields.
### Does this PR introduce _any_ user-facing change?
Yes. Before the changes, the code below:
```scala
val pokerhand_raw = Seq("""[{"cards": [19], "playerId": 123456}]""").toDF("events")
val event = new StructType()
  .add("playerId", LongType)
  .add("cards", ArrayType(new StructType().add("id", LongType).add("rank", StringType)))
val pokerhand_events = pokerhand_raw.select(from_json($"events", ArrayType(event)).as("event"))
pokerhand_events.show
```
throws an exception even in the default **PERMISSIVE** mode:
```java
java.lang.ClassCastException: java.lang.Long cannot be cast to org.apache.spark.sql.catalyst.util.ArrayData
  at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getArray(rows.scala:48)
  at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getArray$(rows.scala:48)
  at org.apache.spark.sql.catalyst.expressions.GenericInternalRow.getArray(rows.scala:195)
```
After the changes:
```
+-----+
|event|
+-----+
| null|
+-----+
```
### How was this patch tested?
Added a test to `JsonFunctionsSuite`.
Closes #30032 from MaxGekk/json-skip-row-wrong-schema-3.0.
Authored-by: Max Gekk <[email protected]>
Signed-off-by: HyukjinKwon <[email protected]>
---
.../org/apache/spark/sql/catalyst/json/JacksonParser.scala | 7 ++++---
.../scala/org/apache/spark/sql/JsonFunctionsSuite.scala | 14 ++++++++++++++
2 files changed, 18 insertions(+), 3 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
index e038f77..70f7f8f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
@@ -86,7 +86,7 @@ class JacksonParser(
val elementConverter = makeConverter(st)
val fieldConverters = st.map(_.dataType).map(makeConverter).toArray
(parser: JsonParser) => parseJsonToken[Iterable[InternalRow]](parser, st) {
- case START_OBJECT => Some(convertObject(parser, st, fieldConverters))
+ case START_OBJECT => Some(convertObject(parser, st, fieldConverters, isRoot = true))
// SPARK-3308: support reading top level JSON arrays and take every element
// in such an array as a row
//
@@ -375,7 +375,8 @@ class JacksonParser(
private def convertObject(
parser: JsonParser,
schema: StructType,
- fieldConverters: Array[ValueConverter]): InternalRow = {
+ fieldConverters: Array[ValueConverter],
+ isRoot: Boolean = false): InternalRow = {
val row = new GenericInternalRow(schema.length)
var badRecordException: Option[Throwable] = None
@@ -386,7 +387,7 @@ class JacksonParser(
row.update(index, fieldConverters(index).apply(parser))
} catch {
case e: SparkUpgradeException => throw e
- case NonFatal(e) =>
+ case NonFatal(e) if isRoot =>
badRecordException = badRecordException.orElse(Some(e))
parser.skipChildren()
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
index a4ed7cc..5e3931c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
@@ -712,4 +712,18 @@ class JsonFunctionsSuite extends QueryTest with SharedSparkSession {
| """.stripMargin)
checkAnswer(toDF("yyyy-MM-dd'T'HH:mm:ss.SSSXXX"), toDF("yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX]"))
}
+
+ test("SPARK-33134: return partial results only for root JSON objects") {
+ val st = new StructType()
+ .add("c1", LongType)
+ .add("c2", ArrayType(new StructType().add("c3", LongType).add("c4",
StringType)))
+ val df1 = Seq("""{"c2": [19], "c1": 123456}""").toDF("c0")
+ checkAnswer(df1.select(from_json($"c0", st)), Row(Row(123456, null)))
+ val df2 = Seq("""{"data": {"c2": [19], "c1": 123456}}""").toDF("c0")
+ checkAnswer(df2.select(from_json($"c0", new StructType().add("data", st))), Row(Row(null)))
+ val df3 = Seq("""[{"c2": [19], "c1": 123456}]""").toDF("c0")
+ checkAnswer(df3.select(from_json($"c0", ArrayType(st))), Row(null))
+ val df4 = Seq("""{"c2": [19]}""").toDF("c0")
+ checkAnswer(df4.select(from_json($"c0", MapType(StringType, st))), Row(null))
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]