Repository: spark
Updated Branches:
  refs/heads/master 584e767d3 -> 4d6704db4


[SPARK-25243][SQL] Use FailureSafeParser in from_json

## What changes were proposed in this pull request?

In the PR, I propose to switch `from_json` on `FailureSafeParser`, and to make 
the function compatible to `PERMISSIVE` mode by default, and to support the 
`FAILFAST` mode as well. The `DROPMALFORMED` mode is not supported by 
`from_json`.

## How was this patch tested?

It was tested by existing `JsonSuite`/`CSVSuite`, `JsonFunctionsSuite` and 
`JsonExpressionsSuite` as well as new tests for `from_json` which checks 
different modes.

Closes #22237 from MaxGekk/from_json-failuresafe.

Lead-authored-by: Maxim Gekk <[email protected]>
Co-authored-by: hyukjinkwon <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4d6704db
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4d6704db
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4d6704db

Branch: refs/heads/master
Commit: 4d6704db4d490bd1830ed3c757525f41058523e0
Parents: 584e767
Author: Maxim Gekk <[email protected]>
Authored: Wed Oct 24 19:09:15 2018 +0800
Committer: Wenchen Fan <[email protected]>
Committed: Wed Oct 24 19:09:15 2018 +0800

----------------------------------------------------------------------
 R/pkg/tests/fulltests/test_sparkSQL.R           |  2 +-
 docs/sql-migration-guide-upgrade.md             |  2 +
 python/pyspark/sql/functions.py                 |  2 +-
 .../catalyst/expressions/jsonExpressions.scala  | 64 +++++++++-----------
 .../spark/sql/catalyst/json/JacksonParser.scala |  7 ++-
 .../expressions/JsonExpressionsSuite.scala      | 38 +++++++-----
 .../org/apache/spark/sql/DataFrameReader.scala  |  2 +-
 .../datasources/json/JsonFileFormat.scala       |  2 +-
 .../native/stringCastAndExpressions.sql.out     |  2 +-
 .../apache/spark/sql/JsonFunctionsSuite.scala   | 33 +++++++++-
 .../execution/datasources/json/JsonSuite.scala  |  2 +-
 11 files changed, 95 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/4d6704db/R/pkg/tests/fulltests/test_sparkSQL.R
----------------------------------------------------------------------
diff --git a/R/pkg/tests/fulltests/test_sparkSQL.R 
b/R/pkg/tests/fulltests/test_sparkSQL.R
index 5ad5d78..509f689 100644
--- a/R/pkg/tests/fulltests/test_sparkSQL.R
+++ b/R/pkg/tests/fulltests/test_sparkSQL.R
@@ -1694,7 +1694,7 @@ test_that("column functions", {
   df <- as.DataFrame(list(list("col" = "{\"date\":\"21/10/2014\"}")))
   schema2 <- structType(structField("date", "date"))
   s <- collect(select(df, from_json(df$col, schema2)))
-  expect_equal(s[[1]][[1]], NA)
+  expect_equal(s[[1]][[1]]$date, NA)
   s <- collect(select(df, from_json(df$col, schema2, dateFormat = 
"dd/MM/yyyy")))
   expect_is(s[[1]][[1]]$date, "Date")
   expect_equal(as.character(s[[1]][[1]]$date), "2014-10-21")

http://git-wip-us.apache.org/repos/asf/spark/blob/4d6704db/docs/sql-migration-guide-upgrade.md
----------------------------------------------------------------------
diff --git a/docs/sql-migration-guide-upgrade.md 
b/docs/sql-migration-guide-upgrade.md
index b8b9ad8..dfa35b8 100644
--- a/docs/sql-migration-guide-upgrade.md
+++ b/docs/sql-migration-guide-upgrade.md
@@ -13,6 +13,8 @@ displayTitle: Spark SQL Upgrading Guide
 
   - In Spark version 2.4 and earlier, the parser of JSON data source treats 
empty strings as null for some data types such as `IntegerType`. For 
`FloatType` and `DoubleType`, it fails on empty strings and throws exceptions. 
Since Spark 3.0, we disallow empty strings and will throw exceptions for data 
types except for `StringType` and `BinaryType`.
 
+  - Since Spark 3.0, the `from_json` functions supports two modes - 
`PERMISSIVE` and `FAILFAST`. The modes can be set via the `mode` option. The 
default mode became `PERMISSIVE`. In previous versions, behavior of `from_json` 
did not conform to either `PERMISSIVE` nor `FAILFAST`, especially in processing 
of malformed JSON records. For example, the JSON string `{"a" 1}` with the 
schema `a INT` is converted to `null` by previous versions but Spark 3.0 
converts it to `Row(null)`.
+
 ## Upgrading From Spark SQL 2.3 to 2.4
 
   - In Spark version 2.3 and earlier, the second parameter to array_contains 
function is implicitly promoted to the element type of first array type 
parameter. This type promotion can be lossy and may cause `array_contains` 
function to return wrong result. This problem has been addressed in 2.4 by 
employing a safer type promotion mechanism. This can cause some change in 
behavior and are illustrated in the table below.

http://git-wip-us.apache.org/repos/asf/spark/blob/4d6704db/python/pyspark/sql/functions.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index 32d7f02..2694e77 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -2305,7 +2305,7 @@ def from_json(col, schema, options={}):
     [Row(json=[Row(a=1)])]
     >>> schema = schema_of_json(lit('''{"a": 0}'''))
     >>> df.select(from_json(df.value, schema).alias("json")).collect()
-    [Row(json=Row(a=1))]
+    [Row(json=Row(a=None))]
     >>> data = [(1, '''[1, 2, 3]''')]
     >>> schema = ArrayType(IntegerType())
     >>> df = spark.createDataFrame(data, ("key", "value"))

http://git-wip-us.apache.org/repos/asf/spark/blob/4d6704db/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
index b4815b4..e966924 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
@@ -554,18 +554,36 @@ case class JsonToStructs(
   @transient
   lazy val converter = nullableSchema match {
     case _: StructType =>
-      (rows: Seq[InternalRow]) => if (rows.length == 1) rows.head else null
+      (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next() else null
     case _: ArrayType =>
-      (rows: Seq[InternalRow]) => rows.head.getArray(0)
+      (rows: Iterator[InternalRow]) => if (rows.hasNext) 
rows.next().getArray(0) else null
     case _: MapType =>
-      (rows: Seq[InternalRow]) => rows.head.getMap(0)
+      (rows: Iterator[InternalRow]) => if (rows.hasNext) rows.next().getMap(0) 
else null
   }
 
-  @transient
-  lazy val parser =
-    new JacksonParser(
-      nullableSchema,
-      new JSONOptions(options + ("mode" -> FailFastMode.name), timeZoneId.get))
+  val nameOfCorruptRecord = 
SQLConf.get.getConf(SQLConf.COLUMN_NAME_OF_CORRUPT_RECORD)
+  @transient lazy val parser = {
+    val parsedOptions = new JSONOptions(options, timeZoneId.get, 
nameOfCorruptRecord)
+    val mode = parsedOptions.parseMode
+    if (mode != PermissiveMode && mode != FailFastMode) {
+      throw new IllegalArgumentException(s"from_json() doesn't support the 
${mode.name} mode. " +
+        s"Acceptable modes are ${PermissiveMode.name} and 
${FailFastMode.name}.")
+    }
+    val rawParser = new JacksonParser(nullableSchema, parsedOptions, 
allowArrayAsStructs = false)
+    val createParser = CreateJacksonParser.utf8String _
+
+    val parserSchema = nullableSchema match {
+      case s: StructType => s
+      case other => StructType(StructField("value", other) :: Nil)
+    }
+
+    new FailureSafeParser[UTF8String](
+      input => rawParser.parse(input, createParser, identity[UTF8String]),
+      mode,
+      parserSchema,
+      parsedOptions.columnNameOfCorruptRecord,
+      parsedOptions.multiLine)
+  }
 
   override def dataType: DataType = nullableSchema
 
@@ -573,35 +591,7 @@ case class JsonToStructs(
     copy(timeZoneId = Option(timeZoneId))
 
   override def nullSafeEval(json: Any): Any = {
-    // When input is,
-    //   - `null`: `null`.
-    //   - invalid json: `null`.
-    //   - empty string: `null`.
-    //
-    // When the schema is array,
-    //   - json array: `Array(Row(...), ...)`
-    //   - json object: `Array(Row(...))`
-    //   - empty json array: `Array()`.
-    //   - empty json object: `Array(Row(null))`.
-    //
-    // When the schema is a struct,
-    //   - json object/array with single element: `Row(...)`
-    //   - json array with multiple elements: `null`
-    //   - empty json array: `null`.
-    //   - empty json object: `Row(null)`.
-
-    // We need `null` if the input string is an empty string. `JacksonParser` 
can
-    // deal with this but produces `Nil`.
-    if (json.toString.trim.isEmpty) return null
-
-    try {
-      converter(parser.parse(
-        json.asInstanceOf[UTF8String],
-        CreateJacksonParser.utf8String,
-        identity[UTF8String]))
-    } catch {
-      case _: BadRecordException => null
-    }
+    converter(parser.parse(json.asInstanceOf[UTF8String]))
   }
 
   override def inputTypes: Seq[AbstractDataType] = StringType :: Nil

http://git-wip-us.apache.org/repos/asf/spark/blob/4d6704db/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
index 918c9e7..57c7f2f 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
@@ -38,7 +38,8 @@ import org.apache.spark.util.Utils
  */
 class JacksonParser(
     schema: DataType,
-    val options: JSONOptions) extends Logging {
+    val options: JSONOptions,
+    allowArrayAsStructs: Boolean) extends Logging {
 
   import JacksonUtils._
   import com.fasterxml.jackson.core.JsonToken._
@@ -84,7 +85,7 @@ class JacksonParser(
         // List([str_a_1,null])
         // List([str_a_2,null], [null,str_b_3])
         //
-      case START_ARRAY =>
+      case START_ARRAY if allowArrayAsStructs =>
         val array = convertArray(parser, elementConverter)
         // Here, as we support reading top level JSON arrays and take every 
element
         // in such an array as a row, this case is possible.
@@ -93,6 +94,8 @@ class JacksonParser(
         } else {
           array.toArray[InternalRow](schema).toSeq
         }
+      case START_ARRAY =>
+        throw new RuntimeException("Parsing JSON arrays as structs is 
forbidden.")
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/4d6704db/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
index f31b294..3046421 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/JsonExpressionsSuite.scala
@@ -19,7 +19,9 @@ package org.apache.spark.sql.catalyst.expressions
 
 import java.util.Calendar
 
-import org.apache.spark.SparkFunSuite
+import org.scalatest.exceptions.TestFailedException
+
+import org.apache.spark.{SparkException, SparkFunSuite}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.errors.TreeNodeException
 import org.apache.spark.sql.catalyst.plans.PlanTestBase
@@ -409,14 +411,18 @@ class JsonExpressionsSuite extends SparkFunSuite with 
ExpressionEvalHelper with
     val schema = StructType(StructField("a", IntegerType) :: Nil)
     checkEvaluation(
       JsonToStructs(schema, Map.empty, Literal(jsonData), gmtId),
-      null
+      InternalRow(null)
     )
 
-    // Other modes should still return `null`.
-    checkEvaluation(
-      JsonToStructs(schema, Map("mode" -> PermissiveMode.name), 
Literal(jsonData), gmtId),
-      null
-    )
+    val exception = intercept[TestFailedException] {
+      checkEvaluation(
+        JsonToStructs(schema, Map("mode" -> FailFastMode.name), 
Literal(jsonData), gmtId),
+        InternalRow(null)
+      )
+    }.getCause
+    assert(exception.isInstanceOf[SparkException])
+    assert(exception.getMessage.contains(
+      "Malformed records are detected in record parsing. Parse Mode: 
FAILFAST"))
   }
 
   test("from_json - input=array, schema=array, output=array") {
@@ -450,21 +456,23 @@ class JsonExpressionsSuite extends SparkFunSuite with 
ExpressionEvalHelper with
   test("from_json - input=array of single object, schema=struct, output=single 
row") {
     val input = """[{"a": 1}]"""
     val schema = StructType(StructField("a", IntegerType) :: Nil)
-    val output = InternalRow(1)
+    val output = InternalRow(null)
     checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId), 
output)
   }
 
-  test("from_json - input=array, schema=struct, output=null") {
+  test("from_json - input=array, schema=struct, output=single row") {
     val input = """[{"a": 1}, {"a": 2}]"""
-    val schema = StructType(StructField("a", IntegerType) :: Nil)
-    val output = null
-    checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId), 
output)
+    val corrupted = "corrupted"
+    val schema = new StructType().add("a", IntegerType).add(corrupted, 
StringType)
+    val output = InternalRow(null, UTF8String.fromString(input))
+    val options = Map("columnNameOfCorruptRecord" -> corrupted)
+    checkEvaluation(JsonToStructs(schema, options, Literal(input), gmtId), 
output)
   }
 
-  test("from_json - input=empty array, schema=struct, output=null") {
+  test("from_json - input=empty array, schema=struct, output=single row with 
null") {
     val input = """[]"""
     val schema = StructType(StructField("a", IntegerType) :: Nil)
-    val output = null
+    val output = InternalRow(null)
     checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId), 
output)
   }
 
@@ -487,7 +495,7 @@ class JsonExpressionsSuite extends SparkFunSuite with 
ExpressionEvalHelper with
     val schema = StructType(StructField("a", IntegerType) :: Nil)
     checkEvaluation(
       JsonToStructs(schema, Map.empty, Literal(badJson), gmtId),
-      null)
+      InternalRow(null))
   }
 
   test("from_json with timestamp") {

http://git-wip-us.apache.org/repos/asf/spark/blob/4d6704db/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 4f6d8b8..95c97e5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -446,7 +446,7 @@ class DataFrameReader private[sql](sparkSession: 
SparkSession) extends Logging {
 
     val createParser = CreateJacksonParser.string _
     val parsed = jsonDataset.rdd.mapPartitions { iter =>
-      val rawParser = new JacksonParser(actualSchema, parsedOptions)
+      val rawParser = new JacksonParser(actualSchema, parsedOptions, 
allowArrayAsStructs = true)
       val parser = new FailureSafeParser[String](
         input => rawParser.parse(input, createParser, UTF8String.fromString),
         parsedOptions.parseMode,

http://git-wip-us.apache.org/repos/asf/spark/blob/4d6704db/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
index a9241af..1f7c9d7 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
@@ -130,7 +130,7 @@ class JsonFileFormat extends TextBasedFileFormat with 
DataSourceRegister {
     }
 
     (file: PartitionedFile) => {
-      val parser = new JacksonParser(actualSchema, parsedOptions)
+      val parser = new JacksonParser(actualSchema, parsedOptions, 
allowArrayAsStructs = true)
       JsonDataSource(parsedOptions).readFile(
         broadcastedHadoopConf.value.value,
         file,

http://git-wip-us.apache.org/repos/asf/spark/blob/4d6704db/sql/core/src/test/resources/sql-tests/results/typeCoercion/native/stringCastAndExpressions.sql.out
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/resources/sql-tests/results/typeCoercion/native/stringCastAndExpressions.sql.out
 
b/sql/core/src/test/resources/sql-tests/results/typeCoercion/native/stringCastAndExpressions.sql.out
index ba9bf76..31ee700 100644
--- 
a/sql/core/src/test/resources/sql-tests/results/typeCoercion/native/stringCastAndExpressions.sql.out
+++ 
b/sql/core/src/test/resources/sql-tests/results/typeCoercion/native/stringCastAndExpressions.sql.out
@@ -258,4 +258,4 @@ select from_json(a, 'a INT') from t
 -- !query 31 schema
 struct<from_json(a):struct<a:int>>
 -- !query 31 output
-NULL
+{"a":null}

http://git-wip-us.apache.org/repos/asf/spark/blob/4d6704db/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
index 5cbf101..797b274 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
@@ -19,7 +19,9 @@ package org.apache.spark.sql
 
 import collection.JavaConverters._
 
+import org.apache.spark.SparkException
 import org.apache.spark.sql.functions._
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types._
 
@@ -132,7 +134,7 @@ class JsonFunctionsSuite extends QueryTest with 
SharedSQLContext {
 
     checkAnswer(
       df.select(from_json($"value", schema)),
-      Row(null) :: Nil)
+      Row(Row(null)) :: Nil)
   }
 
   test("from_json - json doesn't conform to the array type") {
@@ -547,4 +549,33 @@ class JsonFunctionsSuite extends QueryTest with 
SharedSQLContext {
           Map("pretty" -> "true"))),
       Seq(Row(expected)))
   }
+
+  test("from_json invalid json - check modes") {
+    withSQLConf(SQLConf.COLUMN_NAME_OF_CORRUPT_RECORD.key -> "_unparsed") {
+      val schema = new StructType()
+        .add("a", IntegerType)
+        .add("b", IntegerType)
+        .add("_unparsed", StringType)
+      val badRec = """{"a" 1, "b": 11}"""
+      val df = Seq(badRec, """{"a": 2, "b": 12}""").toDS()
+
+      checkAnswer(
+        df.select(from_json($"value", schema, Map("mode" -> "PERMISSIVE"))),
+        Row(Row(null, null, badRec)) :: Row(Row(2, 12, null)) :: Nil)
+
+      val exception1 = intercept[SparkException] {
+        df.select(from_json($"value", schema, Map("mode" -> 
"FAILFAST"))).collect()
+      }.getMessage
+      assert(exception1.contains(
+        "Malformed records are detected in record parsing. Parse Mode: 
FAILFAST."))
+
+      val exception2 = intercept[SparkException] {
+        df.select(from_json($"value", schema, Map("mode" -> "DROPMALFORMED")))
+          .collect()
+      }.getMessage
+      assert(exception2.contains(
+        "from_json() doesn't support the DROPMALFORMED mode. " +
+          "Acceptable modes are PERMISSIVE and FAILFAST."))
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/4d6704db/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 43e1a61..06032de 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -67,7 +67,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with 
TestJsonData {
 
       val dummyOption = new JSONOptions(Map.empty[String, String], "GMT")
       val dummySchema = StructType(Seq.empty)
-      val parser = new JacksonParser(dummySchema, dummyOption)
+      val parser = new JacksonParser(dummySchema, dummyOption, 
allowArrayAsStructs = true)
 
       Utils.tryWithResource(factory.createParser(writer.toString)) { 
jsonParser =>
         jsonParser.nextToken()


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to