Repository: spark
Updated Branches:
  refs/heads/master ca9ef86c8 -> 917f4000b


[SPARK-13719][SQL] Parse JSON rows having an array type and a struct type in 
the same field

## What changes were proposed in this pull request?

https://github.com/apache/spark/pull/2400 added support for parsing JSON 
rows wrapped in an array. However, parsing throws an exception when the given 
data contains both array data and struct data in the same field, as below:

```json
{"a": {"b": 1}}
{"a": []}
```

and the schema is given as below:

```scala
val schema =
  StructType(
    StructField("a", StructType(
      StructField("b", StringType) :: Nil
    )) :: Nil)
```

- **Before**

```scala
sqlContext.read.schema(schema).json(path).show()
```

```scala
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 7 in stage 0.0 failed 4 times, most recent failure: Lost task 7.3 in stage 0.0 (TID 10, 192.168.1.170): java.lang.ClassCastException: org.apache.spark.sql.types.GenericArrayData cannot be cast to org.apache.spark.sql.catalyst.InternalRow
        at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow$class.getStruct(rows.scala:50)
        at org.apache.spark.sql.catalyst.expressions.GenericMutableRow.getStruct(rows.scala:247)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificPredicate.eval(Unknown Source)
...
```

- **After**

```scala
sqlContext.read.schema(schema).json(path).show()
```

```bash
+----+
|   a|
+----+
| [1]|
|null|
+----+
```

For other data types, a value that does not match the schema is converted to 
`null`; only this case throws an exception.

This PR applies the support for array-wrapped rows only at the top level, so a 
nested array found where a struct is expected is also converted to `null`.
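
The idea can be illustrated with a small, self-contained sketch. It is a toy model only: the `Json` and `Schema` types below are hypothetical stand-ins, not Spark's `JsonParser` tokens or `DataType` classes, and the conversion rules are simplified. It shows the wrapping rules living in a root-level function while the nested conversion falls back to `null` on a mismatch.

```scala
// Toy model (assumption): hypothetical JSON values and schema types, not Spark's classes.
sealed trait Json
case class JObj(fields: Map[String, Json]) extends Json
case class JArr(items: List[Json]) extends Json
case class JNum(value: Long) extends Json

sealed trait Schema
case class Struct(fields: List[(String, Schema)]) extends Schema
case class Arr(element: Schema) extends Schema
case object Str extends Schema

object WrappedRowSketch {
  // Nested conversion: an array is never accepted in place of a struct here.
  // Any value that does not match the requested schema becomes null.
  def convertField(json: Json, schema: Schema): Any = (json, schema) match {
    case (JObj(values), Struct(fields)) =>
      fields.map { case (name, tpe) =>
        values.get(name) match {
          case Some(v) => convertField(v, tpe)
          case None => null
        }
      }
    case (JArr(items), Arr(elem)) => items.map(convertField(_, elem))
    case (JNum(n), Str) => n.toString
    case _ => null
  }

  // Root conversion: the only place where the wrapping rules apply.
  def convertRootField(json: Json, schema: Schema): Any = (json, schema) match {
    // A top-level array with a struct schema: every element is taken as a row.
    case (JArr(items), st: Struct) => items.map(convertField(_, st))
    // A top-level object where an array is requested: wrap it in a list.
    case (obj: JObj, Arr(st)) => convertField(obj, st) :: Nil
    case _ => convertField(json, schema)
  }
}
```

In this model, the record `{"a": []}` read with the struct schema above reaches `convertField` with an array and a struct schema for field `a`, matches no case, and yields `null` instead of an array that would later fail a cast.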

## How was this patch tested?

Tested with unit tests, and with `./dev/run_tests` for code style checks.

Author: hyukjinkwon <[email protected]>

Closes #11752 from HyukjinKwon/SPARK-3308-follow-up.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/917f4000
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/917f4000
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/917f4000

Branch: refs/heads/master
Commit: 917f4000b418ee0997551d4dfabb369c4e9243a9
Parents: ca9ef86
Author: hyukjinkwon <[email protected]>
Authored: Wed Mar 16 18:20:30 2016 -0700
Committer: Yin Huai <[email protected]>
Committed: Wed Mar 16 18:20:30 2016 -0700

----------------------------------------------------------------------
 .../datasources/json/JSONRelation.scala         |  1 -
 .../datasources/json/JacksonParser.scala        | 37 +++++++++++++-------
 .../execution/datasources/json/JsonSuite.scala  | 19 +++++++++-
 .../datasources/json/TestJsonData.scala         |  5 +++
 4 files changed, 48 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/917f4000/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
index 3fa5ebf..751d78d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
@@ -20,7 +20,6 @@ package org.apache.spark.sql.execution.datasources.json
 import java.io.CharArrayWriter
 
 import com.fasterxml.jackson.core.JsonFactory
-import com.google.common.base.Objects
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.io.{LongWritable, NullWritable, Text}
 import org.apache.hadoop.mapred.{JobConf, TextInputFormat}

http://git-wip-us.apache.org/repos/asf/spark/blob/917f4000/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
index b2f5c1e..3252b6c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
@@ -49,8 +49,31 @@ object JacksonParser {
 
   /**
    * Parse the current token (and related children) according to a desired schema
+   * This is an wrapper for the method `convertField()` to handle a row wrapped
+   * with an array.
    */
-  def convertField(
+  def convertRootField(
+        factory: JsonFactory,
+        parser: JsonParser,
+        schema: DataType): Any = {
+    import com.fasterxml.jackson.core.JsonToken._
+    (parser.getCurrentToken, schema) match {
+      case (START_ARRAY, st: StructType) =>
+        // SPARK-3308: support reading top level JSON arrays and take every element
+        // in such an array as a row
+        convertArray(factory, parser, st)
+
+      case (START_OBJECT, ArrayType(st, _)) =>
+        // the business end of SPARK-3308:
+        // when an object is found but an array is requested just wrap it in a list
+        convertField(factory, parser, st) :: Nil
+
+      case _ =>
+        convertField(factory, parser, schema)
+    }
+  }
+
+  private def convertField(
       factory: JsonFactory,
       parser: JsonParser,
       schema: DataType): Any = {
@@ -157,19 +180,9 @@ object JacksonParser {
       case (START_OBJECT, st: StructType) =>
         convertObject(factory, parser, st)
 
-      case (START_ARRAY, st: StructType) =>
-        // SPARK-3308: support reading top level JSON arrays and take every element
-        // in such an array as a row
-        convertArray(factory, parser, st)
-
       case (START_ARRAY, ArrayType(st, _)) =>
         convertArray(factory, parser, st)
 
-      case (START_OBJECT, ArrayType(st, _)) =>
-        // the business end of SPARK-3308:
-        // when an object is found but an array is requested just wrap it in a list
-        convertField(factory, parser, st) :: Nil
-
       case (START_OBJECT, MapType(StringType, kt, _)) =>
         convertMap(factory, parser, kt)
 
@@ -264,7 +277,7 @@ object JacksonParser {
           Utils.tryWithResource(factory.createParser(record)) { parser =>
             parser.nextToken()
 
-            convertField(factory, parser, schema) match {
+            convertRootField(factory, parser, schema) match {
               case null => failedRecord(record)
               case row: InternalRow => row :: Nil
               case array: ArrayData =>

http://git-wip-us.apache.org/repos/asf/spark/blob/917f4000/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 4a8c128..6d942c4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -65,7 +65,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
 
       Utils.tryWithResource(factory.createParser(writer.toString)) { parser =>
         parser.nextToken()
-        JacksonParser.convertField(factory, parser, dataType)
+        JacksonParser.convertRootField(factory, parser, dataType)
       }
     }
 
@@ -1426,6 +1426,23 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
     }
   }
 
+  test("Parse JSON rows having an array type and a struct type in the same field.") {
+    withTempDir { dir =>
+      val dir = Utils.createTempDir()
+      dir.delete()
+      val path = dir.getCanonicalPath
+      arrayAndStructRecords.map(record => record.replaceAll("\n", " ")).saveAsTextFile(path)
+
+      val schema =
+        StructType(
+          StructField("a", StructType(
+            StructField("b", StringType) :: Nil
+          )) :: Nil)
+      val jsonDF = sqlContext.read.schema(schema).json(path)
+      assert(jsonDF.count() == 2)
+    }
+  }
+
   test("SPARK-12872 Support to specify the option for compression codec") {
     withTempDir { dir =>
       val dir = Utils.createTempDir()

http://git-wip-us.apache.org/repos/asf/spark/blob/917f4000/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala
index a083605..b2eff81 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala
@@ -209,6 +209,11 @@ private[json] trait TestJsonData {
     sqlContext.sparkContext.parallelize(
       """{"ts":1451732645}""" :: Nil)
 
+  def arrayAndStructRecords: RDD[String] =
+    sqlContext.sparkContext.parallelize(
+      """{"a": {"b": 1}}""" ::
+      """{"a": []}""" :: Nil)
+
   lazy val singleRow: RDD[String] = sqlContext.sparkContext.parallelize("""{"a":123}""" :: Nil)
 
   def empty: RDD[String] = sqlContext.sparkContext.parallelize(Seq[String]())

