Repository: spark
Updated Branches:
  refs/heads/master ce7b57cb5 -> 38628dd1b


[SPARK-25935][SQL] Prevent null rows from JSON parser

## What changes were proposed in this pull request?

An input without valid JSON tokens at the root level is now treated as a bad 
record and handled according to the `mode` option. Previously such input was 
converted to `null`. After the changes, in the `PERMISSIVE` mode the input is 
converted to a row of `null`s matching the schema. This allows removing the 
code in the `from_json` function that could produce `null` result rows.
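
For illustration, a minimal `spark-shell` sketch of the behavior change (the column name and sample input below are ours, not part of the patch):

```scala
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types._

val schema = new StructType().add("a", IntegerType).add("b", StringType)
val df = Seq(" ").toDF("json")  // whitespace only: no valid root JSON token

// Spark 2.4 returned a bare `null`; after this patch, PERMISSIVE (the default)
// yields a struct of nulls shaped by the schema.
df.select(from_json($"json", schema)).head()
// Row([null,null]) instead of Row(null)

// FAILFAST raises an exception for the same input instead of producing nulls.
df.select(from_json($"json", schema, Map("mode" -> "FAILFAST"))).head()
```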

## How was this patch tested?

It was tested by existing test suites, some of which (`JsonSuite`, for example) 
had to be modified because bad input used to be silently ignored. Now such 
input is handled according to the specified `mode`.

Closes #22938 from MaxGekk/json-nulls.

Lead-authored-by: Maxim Gekk <max.g...@gmail.com>
Co-authored-by: Maxim Gekk <maxim.g...@databricks.com>
Signed-off-by: Wenchen Fan <wenc...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/38628dd1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/38628dd1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/38628dd1

Branch: refs/heads/master
Commit: 38628dd1b8298d2686e5d00de17c461c70db99a8
Parents: ce7b57c
Author: Maxim Gekk <max.g...@gmail.com>
Authored: Thu Nov 22 09:35:29 2018 +0800
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Thu Nov 22 09:35:29 2018 +0800

----------------------------------------------------------------------
 R/pkg/tests/fulltests/test_sparkSQL.R           |  2 +-
 docs/sql-migration-guide-upgrade.md             |  2 ++
 .../catalyst/expressions/jsonExpressions.scala  | 26 +++++++++++++-------
 .../spark/sql/catalyst/json/JacksonParser.scala |  2 +-
 .../expressions/JsonExpressionsSuite.scala      |  2 +-
 .../org/apache/spark/sql/SQLQuerySuite.scala    | 10 --------
 .../execution/datasources/json/JsonSuite.scala  | 12 ++++++---
 7 files changed, 31 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/38628dd1/R/pkg/tests/fulltests/test_sparkSQL.R
----------------------------------------------------------------------
diff --git a/R/pkg/tests/fulltests/test_sparkSQL.R b/R/pkg/tests/fulltests/test_sparkSQL.R
index 059c9f3..f355a51 100644
--- a/R/pkg/tests/fulltests/test_sparkSQL.R
+++ b/R/pkg/tests/fulltests/test_sparkSQL.R
@@ -1674,7 +1674,7 @@ test_that("column functions", {
 
   # check for unparseable
   df <- as.DataFrame(list(list("a" = "")))
-  expect_equal(collect(select(df, from_json(df$a, schema)))[[1]][[1]], NA)
+  expect_equal(collect(select(df, from_json(df$a, schema)))[[1]][[1]]$a, NA)
 
   # check if array type in string is correctly supported.
   jsonArr <- "[{\"name\":\"Bob\"}, {\"name\":\"Alice\"}]"

http://git-wip-us.apache.org/repos/asf/spark/blob/38628dd1/docs/sql-migration-guide-upgrade.md
----------------------------------------------------------------------
diff --git a/docs/sql-migration-guide-upgrade.md b/docs/sql-migration-guide-upgrade.md
index 07079d9..e8f2bcc 100644
--- a/docs/sql-migration-guide-upgrade.md
+++ b/docs/sql-migration-guide-upgrade.md
@@ -15,6 +15,8 @@ displayTitle: Spark SQL Upgrading Guide
 
  - Since Spark 3.0, the `from_json` function supports two modes - `PERMISSIVE` and `FAILFAST`. The modes can be set via the `mode` option. The default mode became `PERMISSIVE`. In previous versions, the behavior of `from_json` did not conform to either `PERMISSIVE` or `FAILFAST`, especially in processing of malformed JSON records. For example, the JSON string `{"a" 1}` with the schema `a INT` is converted to `null` by previous versions but Spark 3.0 converts it to `Row(null)`.
 
+  - In Spark version 2.4 and earlier, the `from_json` function produces `null`s for JSON strings, and the JSON datasource skips the same input independently of its mode, if there is no valid root JSON token in the input (` ` for example). Since Spark 3.0, such input is treated as a bad record and handled according to the specified mode. For example, in the `PERMISSIVE` mode the ` ` input is converted to `Row(null, null)` if the specified schema is `key STRING, value INT`.
+
  - The `ADD JAR` command previously returned a result set with the single value 0. It now returns an empty result set.
 
  - In Spark version 2.4 and earlier, users can create map values with map type key via built-in functions like `CreateMap`, `MapFromArrays`, etc. Since Spark 3.0, it's not allowed to create map values with map type key with these built-in functions. Users can still read map values with map type key from data sources or Java/Scala collections, though they are not very useful.
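
To make the new migration note above concrete, a hedged `spark-shell` sketch of the datasource side (the sample dataset is ours):

```scala
// One well-formed record plus one whitespace-only line (no root JSON token).
val ds = Seq("""{"key": "k", "value": 1}""", " ").toDS()
val df = spark.read.schema("key STRING, value INT").json(ds)

// Spark 2.4: the " " line was silently skipped, leaving 1 row.
// Spark 3.0, PERMISSIVE (default): it becomes a bad record, Row(null, null).
df.show()
```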

http://git-wip-us.apache.org/repos/asf/spark/blob/38628dd1/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
index 52d0677..543c6c4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
@@ -550,15 +550,23 @@ case class JsonToStructs(
       s"Input schema ${nullableSchema.catalogString} must be a struct, an 
array or a map.")
   }
 
-  // This converts parsed rows to the desired output by the given schema.
   @transient
-  lazy val converter = nullableSchema match {
-    case _: StructType =>
-      (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next() else null
-    case _: ArrayType =>
-      (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next().getArray(0) else null
-    case _: MapType =>
-      (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next().getMap(0) else null
+  private lazy val castRow = nullableSchema match {
+    case _: StructType => (row: InternalRow) => row
+    case _: ArrayType => (row: InternalRow) => row.getArray(0)
+    case _: MapType => (row: InternalRow) => row.getMap(0)
+  }
+
+  // This converts parsed rows to the desired output by the given schema.
+  private def convertRow(rows: Iterator[InternalRow]) = {
+    if (rows.hasNext) {
+      val result = rows.next()
+      // JSON's parser produces one record only.
+      assert(!rows.hasNext)
+      castRow(result)
+    } else {
+      throw new IllegalArgumentException("Expected one row from JSON parser.")
+    }
   }
 
  val nameOfCorruptRecord = SQLConf.get.getConf(SQLConf.COLUMN_NAME_OF_CORRUPT_RECORD)
@@ -593,7 +601,7 @@ case class JsonToStructs(
     copy(timeZoneId = Option(timeZoneId))
 
   override def nullSafeEval(json: Any): Any = {
-    converter(parser.parse(json.asInstanceOf[UTF8String]))
+    convertRow(parser.parse(json.asInstanceOf[UTF8String]))
   }
 
   override def inputTypes: Seq[AbstractDataType] = StringType :: Nil
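
The `castRow` dispatch above unwraps the parser's single result row according to the target type. A hedged sketch of what that means at the API level (sample data ours): an `ArrayType` schema returns the array stored in the row's first field, mirroring the `case _: ArrayType => row.getArray(0)` branch.

```scala
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types._

val arrSchema = ArrayType(new StructType().add("a", IntegerType))

// The parser emits one InternalRow whose first field holds the array;
// `castRow` extracts it so the column's value is the array itself.
Seq("""[{"a": 1}, {"a": 2}]""").toDF("json")
  .select(from_json($"json", arrSchema))
  .show()
```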

http://git-wip-us.apache.org/repos/asf/spark/blob/38628dd1/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
index 57c7f2f..773ff5a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
@@ -399,7 +399,7 @@ class JacksonParser(
         // a null first token is equivalent to testing for input.trim.isEmpty
         // but it works on any token stream and not just strings
         parser.nextToken() match {
-          case null => Nil
+          case null => throw new RuntimeException("Not found any JSON token")
           case _ => rootConverter.apply(parser) match {
             case null => throw new RuntimeException("Root converter returned 
null")
             case rows => rows
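
Since the parser now throws for a missing root token instead of returning `Nil`, empty input surfaces through the configured mode. A hedged `spark-shell` sketch (data ours): in `FAILFAST` the read that Spark 2.4 silently skipped now fails.

```scala
val ds = Seq("").toDS()  // no JSON token at all

// PERMISSIVE (default) turns this into a null/corrupt-record row;
// FAILFAST propagates the failure instead of skipping the line.
spark.read.schema("a INT").option("mode", "FAILFAST").json(ds).collect()
```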

http://git-wip-us.apache.org/repos/asf/spark/blob/38628dd1/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
index 6ee8c74..34bd2a9 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
@@ -547,7 +547,7 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper with
     val schema = StructType(StructField("a", IntegerType) :: Nil)
     checkEvaluation(
       JsonToStructs(schema, Map.empty, Literal.create(" ", StringType), gmtId),
-      null
+      InternalRow(null)
     )
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/38628dd1/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index dbb0790..4cc8a45 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -240,16 +240,6 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
       Seq(Row("1"), Row("2")))
   }
 
-  test("SPARK-11226 Skip empty line in json file") {
-    spark.read
-      .json(Seq("{\"a\": \"1\"}}", "{\"a\": \"2\"}}", "{\"a\": \"3\"}}", "").toDS())
-      .createOrReplaceTempView("d")
-
-    checkAnswer(
-      sql("select count(1) from d"),
-      Seq(Row(3)))
-  }
-
   test("SPARK-8828 sum should return null if all input values are null") {
     checkAnswer(
       sql("select sum(a), avg(a) from allNulls"),

http://git-wip-us.apache.org/repos/asf/spark/blob/38628dd1/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 06032de..9ea9189 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -1115,6 +1115,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
         Row(null, null, null),
         Row(null, null, null),
         Row(null, null, null),
+        Row(null, null, null),
         Row("str_a_4", "str_b_4", "str_c_4"),
         Row(null, null, null))
     )
@@ -1136,6 +1137,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
       checkAnswer(
         jsonDF.select($"a", $"b", $"c", $"_unparsed"),
         Row(null, null, null, "{") ::
+          Row(null, null, null, "") ::
           Row(null, null, null, """{"a":1, b:2}""") ::
           Row(null, null, null, """{"a":{, b:3}""") ::
           Row("str_a_4", "str_b_4", "str_c_4", null) ::
@@ -1150,6 +1152,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
       checkAnswer(
         jsonDF.filter($"_unparsed".isNotNull).select($"_unparsed"),
         Row("{") ::
+          Row("") ::
           Row("""{"a":1, b:2}""") ::
           Row("""{"a":{, b:3}""") ::
           Row("]") :: Nil
@@ -1171,6 +1174,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
     checkAnswer(
       jsonDF.selectExpr("a", "b", "c", "_malformed"),
       Row(null, null, null, "{") ::
+        Row(null, null, null, "") ::
         Row(null, null, null, """{"a":1, b:2}""") ::
         Row(null, null, null, """{"a":{, b:3}""") ::
         Row("str_a_4", "str_b_4", "str_c_4", null) ::
@@ -1813,6 +1817,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
       val path = dir.getCanonicalPath
       primitiveFieldAndType
         .toDF("value")
+        .repartition(1)
         .write
         .option("compression", "GzIp")
         .text(path)
@@ -1838,6 +1843,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
       val path = dir.getCanonicalPath
       primitiveFieldAndType
         .toDF("value")
+        .repartition(1)
         .write
         .text(path)
 
@@ -1892,7 +1898,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
         .text(path)
 
      val jsonDF = spark.read.option("multiLine", true).option("mode", "PERMISSIVE").json(path)
-      assert(jsonDF.count() === corruptRecordCount)
+      assert(jsonDF.count() === corruptRecordCount + 1) // null row for empty file
       assert(jsonDF.schema === new StructType()
         .add("_corrupt_record", StringType)
         .add("dummy", StringType))
@@ -1905,7 +1911,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
           F.count($"dummy").as("valid"),
           F.count($"_corrupt_record").as("corrupt"),
           F.count("*").as("count"))
-      checkAnswer(counts, Row(1, 4, 6))
+      checkAnswer(counts, Row(1, 5, 7)) // null row for empty file
     }
   }
 
@@ -2513,7 +2519,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
     }
 
     checkCount(2)
-    countForMalformedJSON(0, Seq(""))
+    countForMalformedJSON(1, Seq(""))
   }
 
   test("SPARK-25040: empty strings should be disallowed") {

