Repository: spark
Updated Branches:
  refs/heads/master 72869883f -> 0494c80ef


[SPARK-10495] [SQL] Read date values in JSON data stored by Spark 1.5.0.

https://issues.apache.org/jira/browse/SPARK-10495

Author: Yin Huai <yh...@databricks.com>

Closes #8806 from yhuai/SPARK-10495.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0494c80e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0494c80e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0494c80e

Branch: refs/heads/master
Commit: 0494c80ef54f6f3a8c6f2d92abfe1a77a91df8b0
Parents: 7286988
Author: Yin Huai <yh...@databricks.com>
Authored: Mon Sep 21 18:06:45 2015 -0700
Committer: Cheng Lian <l...@databricks.com>
Committed: Mon Sep 21 18:06:45 2015 -0700

----------------------------------------------------------------------
 .../datasources/json/JacksonGenerator.scala     |  36 +++++++
 .../datasources/json/JacksonParser.scala        |  15 ++-
 .../execution/datasources/json/JsonSuite.scala  | 103 ++++++++++++++++++-
 3 files changed, 152 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/0494c80e/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala
index f65c7bb..23bada1 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonGenerator.scala
@@ -73,6 +73,38 @@ private[sql] object JacksonGenerator {
             valWriter(field.dataType, v)
         }
         gen.writeEndObject()
+
+      // For UDT, udt.serialize will produce SQL types. So, we need the 
following three cases.
+      case (ArrayType(ty, _), v: ArrayData) =>
+        gen.writeStartArray()
+        v.foreach(ty, (_, value) => valWriter(ty, value))
+        gen.writeEndArray()
+
+      case (MapType(kt, vt, _), v: MapData) =>
+        gen.writeStartObject()
+        v.foreach(kt, vt, { (k, v) =>
+          gen.writeFieldName(k.toString)
+          valWriter(vt, v)
+        })
+        gen.writeEndObject()
+
+      case (StructType(ty), v: InternalRow) =>
+        gen.writeStartObject()
+        var i = 0
+        while (i < ty.length) {
+          val field = ty(i)
+          val value = v.get(i, field.dataType)
+          if (value != null) {
+            gen.writeFieldName(field.name)
+            valWriter(field.dataType, value)
+          }
+          i += 1
+        }
+        gen.writeEndObject()
+
+      case (dt, v) =>
+        sys.error(
+          s"Failed to convert value $v (class of ${v.getClass}}) with the type 
of $dt to JSON.")
     }
 
     valWriter(rowSchema, row)
@@ -133,6 +165,10 @@ private[sql] object JacksonGenerator {
           i += 1
         }
         gen.writeEndObject()
+
+      case (dt, v) =>
+        sys.error(
+          s"Failed to convert value $v (class of ${v.getClass}}) with the type 
of $dt to JSON.")
     }
 
     valWriter(rowSchema, row)

http://git-wip-us.apache.org/repos/asf/spark/blob/0494c80e/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
index ff4d8c0..c511407 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala
@@ -62,10 +62,23 @@ private[sql] object JacksonParser {
         // guard the non string type
         null
 
+      case (VALUE_STRING, BinaryType) =>
+        parser.getBinaryValue
+
       case (VALUE_STRING, DateType) =>
-        
DateTimeUtils.millisToDays(DateTimeUtils.stringToTime(parser.getText).getTime)
+        val stringValue = parser.getText
+        if (stringValue.contains("-")) {
+          // The format of this string will probably be "yyyy-mm-dd".
+          
DateTimeUtils.millisToDays(DateTimeUtils.stringToTime(parser.getText).getTime)
+        } else {
+          // In Spark 1.5.0, we stored date values as the number of days since 
epoch in a string.
+          // So, we just convert it to Int.
+          stringValue.toInt
+        }
 
       case (VALUE_STRING, TimestampType) =>
+        // This conversion loses the microseconds part of the timestamp.
+        // See https://issues.apache.org/jira/browse/SPARK-10681.
         DateTimeUtils.stringToTime(parser.getText).getTime * 1000L
 
       case (VALUE_NUMBER_INT, TimestampType) =>

http://git-wip-us.apache.org/repos/asf/spark/blob/0494c80e/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 6a18cc6..b614e6c 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -24,7 +24,7 @@ import com.fasterxml.jackson.core.JsonFactory
 import org.apache.spark.rdd.RDD
 import org.scalactic.Tolerance._
 
-import org.apache.spark.sql.{QueryTest, Row, SQLConf}
+import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.execution.datasources.{ResolvedDataSource, 
LogicalRelation}
 import 
org.apache.spark.sql.execution.datasources.json.InferSchema.compatibleType
@@ -1159,4 +1159,105 @@ class JsonSuite extends QueryTest with SharedSQLContext 
with TestJsonData {
           "SELECT count(a) FROM test_myjson_with_part where d1 = 1"), Row(9))
     })
   }
+
+  test("backward compatibility") {
+    // In this test, we make sure our JSON support can read JSON data generated 
by previous
+    // versions of Spark through the toJSON method and the JSON data source.
+    // The data is generated by the following program.
+    // Here are a few notes:
+    //  - Spark 1.5.0 cannot save timestamp data. So, we manually added 
timestamp field (col13)
+    //      in the JSON object.
+    //  - For Spark before 1.5.1, we do not generate UDTs. So, we manually 
added the UDT value to
+    //      JSON objects generated by those Spark versions (col17).
+    //  - If the type is NullType, we do not write data out.
+
+    // Create the schema.
+    val struct =
+      StructType(
+        StructField("f1", FloatType, true) ::
+          StructField("f2", ArrayType(BooleanType), true) :: Nil)
+
+    val dataTypes =
+      Seq(
+        StringType, BinaryType, NullType, BooleanType,
+        ByteType, ShortType, IntegerType, LongType,
+        FloatType, DoubleType, DecimalType(25, 5), DecimalType(6, 5),
+        DateType, TimestampType,
+        ArrayType(IntegerType), MapType(StringType, LongType), struct,
+        new MyDenseVectorUDT())
+    val fields = dataTypes.zipWithIndex.map { case (dataType, index) =>
+      StructField(s"col$index", dataType, nullable = true)
+    }
+    val schema = StructType(fields)
+
+    val constantValues =
+      Seq(
+        "a string in binary".getBytes("UTF-8"),
+        null,
+        true,
+        1.toByte,
+        2.toShort,
+        3,
+        Long.MaxValue,
+        0.25.toFloat,
+        0.75,
+        new java.math.BigDecimal(s"1234.23456"),
+        new java.math.BigDecimal(s"1.23456"),
+        java.sql.Date.valueOf("2015-01-01"),
+        java.sql.Timestamp.valueOf("2015-01-01 23:50:59.123"),
+        Seq(2, 3, 4),
+        Map("a string" -> 2000L),
+        Row(4.75.toFloat, Seq(false, true)),
+        new MyDenseVector(Array(0.25, 2.25, 4.25)))
+    val data =
+      Row.fromSeq(Seq("Spark " + sqlContext.sparkContext.version) ++ 
constantValues) :: Nil
+
+    // Data generated by previous versions.
+    // scalastyle:off
+    val existingJSONData =
+      """{"col0":"Spark 
1.2.2","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01
 23:50:59.123","col14":[2,3,4],"col15":{"a 
string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}"""
 ::
+      """{"col0":"Spark 
1.3.1","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01
 23:50:59.123","col14":[2,3,4],"col15":{"a 
string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}"""
 ::
+      """{"col0":"Spark 
1.3.1","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01
 23:50:59.123","col14":[2,3,4],"col15":{"a 
string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}"""
 ::
+      """{"col0":"Spark 
1.4.1","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01
 23:50:59.123","col14":[2,3,4],"col15":{"a 
string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}"""
 ::
+      """{"col0":"Spark 
1.4.1","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01
 23:50:59.123","col14":[2,3,4],"col15":{"a 
string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}"""
 ::
+      """{"col0":"Spark 
1.5.0","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"2015-01-01","col13":"2015-01-01
 23:50:59.123","col14":[2,3,4],"col15":{"a 
string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}"""
 ::
+      """{"col0":"Spark 
1.5.0","col1":"YSBzdHJpbmcgaW4gYmluYXJ5","col3":true,"col4":1,"col5":2,"col6":3,"col7":9223372036854775807,"col8":0.25,"col9":0.75,"col10":1234.23456,"col11":1.23456,"col12":"16436","col13":"2015-01-01
 23:50:59.123","col14":[2,3,4],"col15":{"a 
string":2000},"col16":{"f1":4.75,"f2":[false,true]},"col17":[0.25,2.25,4.25]}"""
 :: Nil
+    // scalastyle:on
+
+    // Generate data for the current version.
+    val df = 
sqlContext.createDataFrame(sqlContext.sparkContext.parallelize(data, 1), schema)
+    withTempPath { path =>
+      df.write.format("json").mode("overwrite").save(path.getCanonicalPath)
+
+      // df.toJSON converts internal rows to external rows first and then 
generates
+      // JSON objects, whereas df.write.format("json") writes internal rows 
directly.
+      val allJSON =
+        existingJSONData ++
+          df.toJSON.collect() ++
+          sparkContext.textFile(path.getCanonicalPath).collect()
+
+      Utils.deleteRecursively(path)
+      sparkContext.parallelize(allJSON, 
1).saveAsTextFile(path.getCanonicalPath)
+
+      // Read data back with the schema specified.
+      val col0Values =
+        Seq(
+          "Spark 1.2.2",
+          "Spark 1.3.1",
+          "Spark 1.3.1",
+          "Spark 1.4.1",
+          "Spark 1.4.1",
+          "Spark 1.5.0",
+          "Spark 1.5.0",
+          "Spark " + sqlContext.sparkContext.version,
+          "Spark " + sqlContext.sparkContext.version)
+      val expectedResult = col0Values.map { v =>
+        Row.fromSeq(Seq(v) ++ constantValues)
+      }
+      checkAnswer(
+        
sqlContext.read.format("json").schema(schema).load(path.getCanonicalPath),
+        expectedResult
+      )
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to