This is an automated email from the ASF dual-hosted git repository.

jshao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-livy.git


The following commit(s) were added to refs/heads/master by this push:
     new 92062e1  [LIVY-613][REPL] Livy can't handle the java.sql.Date type 
correctly.
92062e1 is described below

commit 92062e1659db2af85711b1f35c50ff4050fec675
Author: yangping.wyp <yangping....@alibaba-inc.com>
AuthorDate: Fri Jul 26 19:34:42 2019 +0800

    [LIVY-613][REPL] Livy can't handle the java.sql.Date type correctly.
    
    ## What changes were proposed in this pull request?
    
    When a Spark table has a `java.sql.Date` type column, Livy can't handle the
`java.sql.Date` type correctly, e.g.:
    ```
    create table test(
        name string,
        birthday date
    );
    
    insert into test values ('Livy', '2019-07-24')
    
    curl -H "Content-Type:application/json" -X POST -d '{"code":"select * from 
test", "kind":"sql"}' 192.168.1.6:8998/sessions/48/statements
    {"id":1,"code":"select * from 
test","state":"waiting","output":null,"progress":0.0}
    
    curl 192.168.1.6:8998/sessions/48/statements/1
    {"id":1,"code":"select * from 
test","state":"available","output":{"status":"ok","execution_count":1,"data":{"application/json":{"schema":{"type":"struct","fields":[{"name":"name","type":"string","nullable":true,"metadata":{}},{"name":"birthday","type":"date","nullable":true,"metadata":{}}]},"data":[["Livy",{}]]}}},"progress":1.0}
    ```
    As you can see, the output of `select * from test` is `["Livy",{}]`; the
birthday column's value isn't handled correctly.
    
    The reason is that json4s can't handle `java.sql.Date`, so we should define
a `CustomSerializer` for `java.sql.Date`.
    
    This PR adds a `DateSerializer` to support parsing `java.sql.Date`.
    
    ## How was this patch tested?
    
    (Please explain how this patch was tested. E.g. unit tests, integration 
tests, manual tests)
    (If this patch involves UI changes, please attach a screenshot; otherwise, 
remove this)
    
    Please review https://livy.incubator.apache.org/community/ before opening a 
pull request.
    
    Author: yangping.wyp <yangping....@alibaba-inc.com>
    
    Closes #186 from 397090770/master.
---
 .../org/apache/livy/repl/SQLInterpreter.scala      | 11 +++-
 .../org/apache/livy/repl/SQLInterpreterSpec.scala  | 69 +++++++++++++++++++++-
 2 files changed, 78 insertions(+), 2 deletions(-)

diff --git a/repl/src/main/scala/org/apache/livy/repl/SQLInterpreter.scala 
b/repl/src/main/scala/org/apache/livy/repl/SQLInterpreter.scala
index 5a7b606..9abbf2c 100644
--- a/repl/src/main/scala/org/apache/livy/repl/SQLInterpreter.scala
+++ b/repl/src/main/scala/org/apache/livy/repl/SQLInterpreter.scala
@@ -18,6 +18,7 @@
 package org.apache.livy.repl
 
 import java.lang.reflect.InvocationTargetException
+import java.sql.Date
 
 import scala.util.control.NonFatal
 
@@ -25,6 +26,7 @@ import org.apache.spark.SparkConf
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.SparkSession
 import org.json4s._
+import org.json4s.JsonAST.{JNull, JString}
 import org.json4s.JsonDSL._
 import org.json4s.jackson.JsonMethods._
 
@@ -66,7 +68,14 @@ class SQLInterpreter(
     rscConf: RSCConf,
     sparkEntries: SparkEntries) extends Interpreter with Logging {
 
-  private implicit def formats = DefaultFormats
+  case object DateSerializer extends CustomSerializer[Date](_ => ( {
+    case JString(s) => Date.valueOf(s)
+    case JNull => null
+  }, {
+    case d: Date => JString(d.toString)
+  }))
+
+  private implicit def formats: Formats = DefaultFormats + DateSerializer
 
   private var spark: SparkSession = null
 
diff --git a/repl/src/test/scala/org/apache/livy/repl/SQLInterpreterSpec.scala 
b/repl/src/test/scala/org/apache/livy/repl/SQLInterpreterSpec.scala
index 37c9594..781ed72 100644
--- a/repl/src/test/scala/org/apache/livy/repl/SQLInterpreterSpec.scala
+++ b/repl/src/test/scala/org/apache/livy/repl/SQLInterpreterSpec.scala
@@ -17,17 +17,20 @@
 
 package org.apache.livy.repl
 
+import java.sql.Date
+
 import scala.util.Try
 
 import org.apache.spark.SparkConf
 import org.json4s.{DefaultFormats, JValue}
-import org.json4s.JsonAST.JArray
+import org.json4s.JsonAST.{JArray, JNull}
 import org.json4s.JsonDSL._
 
 import org.apache.livy.rsc.RSCConf
 import org.apache.livy.rsc.driver.SparkEntries
 
 case class People(name: String, age: Int)
+case class Person(name: String, birthday: Date)
 
 class SQLInterpreterSpec extends BaseInterpreterSpec {
 
@@ -43,6 +46,70 @@ class SQLInterpreterSpec extends BaseInterpreterSpec {
     new SQLInterpreter(conf, new RSCConf(), sparkEntries)
   }
 
+  it should "handle java.sql.Date type" in withInterpreter { interpreter =>
+    val personList = Seq(Person("Jerry", Date.valueOf("2019-07-24")),
+      Person("Michael", Date.valueOf("2019-07-23")))
+
+    val rdd = sparkEntries.sc().parallelize(personList)
+    val df = sparkEntries.sqlctx().createDataFrame(rdd)
+    df.createOrReplaceTempView("person")
+
+    // Test normal behavior
+    val resp1 = interpreter.execute("SELECT * FROM person")
+
+    val expectedResult = Interpreter.ExecuteSuccess(
+      APPLICATION_JSON -> (("schema" ->
+        (("type" -> "struct") ~
+          ("fields" -> List(
+            ("name" -> "name") ~ ("type" -> "string") ~ ("nullable" -> true) ~
+              ("metadata" -> List()),
+            ("name" -> "birthday") ~ ("type" -> "date") ~ ("nullable" -> true) 
~
+              ("metadata" -> List())
+          )))) ~
+        ("data" -> List(
+          List[JValue]("Jerry", "2019-07-24"),
+          List[JValue]("Michael", "2019-07-23")
+        )))
+    )
+
+    val result = Try { resp1 should equal(expectedResult)}
+    if (result.isFailure) {
+      fail(s"$resp1 doesn't equal to expected result")
+    }
+  }
+
+  it should "test java.sql.Date null" in withInterpreter { interpreter =>
+    val personList = Seq(Person("Jerry", null),
+      Person("Michael", Date.valueOf("2019-07-23")))
+
+    val rdd = sparkEntries.sc().parallelize(personList)
+    val df = sparkEntries.sqlctx().createDataFrame(rdd)
+    df.createOrReplaceTempView("person")
+
+    // Test normal behavior
+    val resp1 = interpreter.execute("SELECT * FROM person")
+
+    val expectedResult = Interpreter.ExecuteSuccess(
+      APPLICATION_JSON -> (("schema" ->
+        (("type" -> "struct") ~
+          ("fields" -> List(
+            ("name" -> "name") ~ ("type" -> "string") ~ ("nullable" -> true) ~
+              ("metadata" -> List()),
+            ("name" -> "birthday") ~ ("type" -> "date") ~ ("nullable" -> true) 
~
+              ("metadata" -> List())
+          )))) ~
+        ("data" -> List(
+          List[JValue]("Jerry", JNull),
+          List[JValue]("Michael", "2019-07-23")
+        )))
+    )
+
+    val result = Try { resp1 should equal(expectedResult)}
+    if (result.isFailure) {
+      fail(s"$resp1 doesn't equal to expected result")
+    }
+  }
+
   it should "execute sql queries" in withInterpreter { interpreter =>
     val rdd = sparkEntries.sc().parallelize(Seq(People("Jerry", 20), 
People("Michael", 21)))
     val df = sparkEntries.sqlctx().createDataFrame(rdd)

Reply via email to