This is an automated email from the ASF dual-hosted git repository.

jshao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-livy.git
The following commit(s) were added to refs/heads/master by this push:
     new 92062e1  [LIVY-613][REPL] Livy can't handle the java.sql.Date type correctly.
92062e1 is described below

commit 92062e1659db2af85711b1f35c50ff4050fec675
Author: yangping.wyp <yangping....@alibaba-inc.com>
AuthorDate: Fri Jul 26 19:34:42 2019 +0800

[LIVY-613][REPL] Livy can't handle the java.sql.Date type correctly.

## What changes were proposed in this pull request?

When a Spark table has a column of `java.sql.Date` type, Livy can't handle that column correctly. For example:

```
create table test(
  name string,
  birthday date
);
insert into test values ('Livy', '2019-07-24');

curl -H "Content-Type:application/json" -X POST -d '{"code":"select * from test", "kind":"sql"}' 192.168.1.6:8998/sessions/48/statements
{"id":1,"code":"select * from test","state":"waiting","output":null,"progress":0.0}

curl 192.168.1.6:8998/sessions/48/statements/1
{"id":1,"code":"select * from test","state":"available","output":{"status":"ok","execution_count":1,"data":{"application/json":{"schema":{"type":"struct","fields":[{"name":"name","type":"string","nullable":true,"metadata":{}},{"name":"birthday","type":"date","nullable":true,"metadata":{}}]},"data":[["Livy",{}]]}}},"progress":1.0}
```

As the output shows, `select * from test` returns `["Livy",{}]`: the `birthday` column's value is not handled correctly. The root cause is that json4s has no built-in serializer for `java.sql.Date`, so a `CustomSerializer` must be defined for it. This PR adds a `DateSerializer` that converts `java.sql.Date` values to and from their string form.

## How was this patch tested?

Unit tests added in `SQLInterpreterSpec.scala`, covering both non-null and null `java.sql.Date` values.

Author: yangping.wyp <yangping....@alibaba-inc.com>

Closes #186 from 397090770/master.
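For reference, the serializer this patch adds can be exercised on its own. Below is a minimal, self-contained sketch of the technique: a json4s `CustomSerializer` that maps `java.sql.Date` to and from its `yyyy-MM-dd` string form. The `Person` case class mirrors the one added in the test spec; the `DateSerializerDemo` object is illustrative only and not part of the patch.

```
import java.sql.Date

import org.json4s._
import org.json4s.JsonAST.{JNull, JString}
import org.json4s.jackson.Serialization

// Same shape as the serializer added by this patch: parse "yyyy-MM-dd"
// strings into java.sql.Date on read, emit Date values as strings on write.
case object DateSerializer extends CustomSerializer[Date](_ => ( {
  case JString(s) => Date.valueOf(s)
  case JNull => null
}, {
  case d: Date => JString(d.toString)
}))

case class Person(name: String, birthday: Date)

object DateSerializerDemo extends App {
  implicit val formats: Formats = DefaultFormats + DateSerializer

  // With plain DefaultFormats the Date field would serialize as "{}".
  println(Serialization.write(Person("Livy", Date.valueOf("2019-07-24"))))
  // prints: {"name":"Livy","birthday":"2019-07-24"}
}
```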
---
 .../org/apache/livy/repl/SQLInterpreter.scala      | 11 +++-
 .../org/apache/livy/repl/SQLInterpreterSpec.scala  | 69 +++++++++++++++++++++-
 2 files changed, 78 insertions(+), 2 deletions(-)

diff --git a/repl/src/main/scala/org/apache/livy/repl/SQLInterpreter.scala b/repl/src/main/scala/org/apache/livy/repl/SQLInterpreter.scala
index 5a7b606..9abbf2c 100644
--- a/repl/src/main/scala/org/apache/livy/repl/SQLInterpreter.scala
+++ b/repl/src/main/scala/org/apache/livy/repl/SQLInterpreter.scala
@@ -18,6 +18,7 @@
 package org.apache.livy.repl
 
 import java.lang.reflect.InvocationTargetException
+import java.sql.Date
 
 import scala.util.control.NonFatal
 
@@ -25,6 +26,7 @@
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.SparkSession
 import org.json4s._
+import org.json4s.JsonAST.{JNull, JString}
 import org.json4s.JsonDSL._
 import org.json4s.jackson.JsonMethods._
@@ -66,7 +68,14 @@ class SQLInterpreter(
     rscConf: RSCConf,
     sparkEntries: SparkEntries) extends Interpreter with Logging {
 
-  private implicit def formats = DefaultFormats
+  case object DateSerializer extends CustomSerializer[Date](_ => ( {
+    case JString(s) => Date.valueOf(s)
+    case JNull => null
+  }, {
+    case d: Date => JString(d.toString)
+  }))
+
+  private implicit def formats: Formats = DefaultFormats + DateSerializer
 
   private var spark: SparkSession = null
 
diff --git a/repl/src/test/scala/org/apache/livy/repl/SQLInterpreterSpec.scala b/repl/src/test/scala/org/apache/livy/repl/SQLInterpreterSpec.scala
index 37c9594..781ed72 100644
--- a/repl/src/test/scala/org/apache/livy/repl/SQLInterpreterSpec.scala
+++ b/repl/src/test/scala/org/apache/livy/repl/SQLInterpreterSpec.scala
@@ -17,17 +17,20 @@
 
 package org.apache.livy.repl
 
+import java.sql.Date
+
 import scala.util.Try
 
 import org.apache.spark.SparkConf
 import org.json4s.{DefaultFormats, JValue}
-import org.json4s.JsonAST.JArray
+import org.json4s.JsonAST.{JArray, JNull}
 import org.json4s.JsonDSL._
 
 import org.apache.livy.rsc.RSCConf
 import org.apache.livy.rsc.driver.SparkEntries
 
 case class People(name: String, age: Int)
+case class Person(name: String, birthday: Date)
 
 class SQLInterpreterSpec extends BaseInterpreterSpec {
 
@@ -43,6 +46,70 @@ class SQLInterpreterSpec extends BaseInterpreterSpec {
     new SQLInterpreter(conf, new RSCConf(), sparkEntries)
   }
 
+  it should "handle java.sql.Date type" in withInterpreter { interpreter =>
+    val personList = Seq(Person("Jerry", Date.valueOf("2019-07-24")),
+      Person("Michael", Date.valueOf("2019-07-23")))
+
+    val rdd = sparkEntries.sc().parallelize(personList)
+    val df = sparkEntries.sqlctx().createDataFrame(rdd)
+    df.createOrReplaceTempView("person")
+
+    // Test normal behavior
+    val resp1 = interpreter.execute("SELECT * FROM person")
+
+    val expectedResult = Interpreter.ExecuteSuccess(
+      APPLICATION_JSON -> (("schema" ->
+        (("type" -> "struct") ~
+          ("fields" -> List(
+            ("name" -> "name") ~ ("type" -> "string") ~ ("nullable" -> true) ~
+              ("metadata" -> List()),
+            ("name" -> "birthday") ~ ("type" -> "date") ~ ("nullable" -> true) ~
+              ("metadata" -> List())
+          )))) ~
+        ("data" -> List(
+          List[JValue]("Jerry", "2019-07-24"),
+          List[JValue]("Michael", "2019-07-23")
+        )))
+    )
+
+    val result = Try { resp1 should equal(expectedResult) }
+    if (result.isFailure) {
+      fail(s"$resp1 doesn't equal to expected result")
+    }
+  }
+
+  it should "test java.sql.Date null" in withInterpreter { interpreter =>
+    val personList = Seq(Person("Jerry", null),
+      Person("Michael", Date.valueOf("2019-07-23")))
+
+    val rdd = sparkEntries.sc().parallelize(personList)
+    val df = sparkEntries.sqlctx().createDataFrame(rdd)
+    df.createOrReplaceTempView("person")
+
+    // Test null date handling
+    val resp1 = interpreter.execute("SELECT * FROM person")
+
+    val expectedResult = Interpreter.ExecuteSuccess(
+      APPLICATION_JSON -> (("schema" ->
+        (("type" -> "struct") ~
+          ("fields" -> List(
+            ("name" -> "name") ~ ("type" -> "string") ~ ("nullable" -> true) ~
+              ("metadata" -> List()),
+            ("name" -> "birthday") ~ ("type" -> "date") ~ ("nullable" -> true) ~
+              ("metadata" -> List())
+          )))) ~
+        ("data" -> List(
+          List[JValue]("Jerry", JNull),
+          List[JValue]("Michael", "2019-07-23")
+        )))
+    )
+
+    val result = Try { resp1 should equal(expectedResult) }
+    if (result.isFailure) {
+      fail(s"$resp1 doesn't equal to expected result")
+    }
+  }
+
   it should "execute sql queries" in withInterpreter { interpreter =>
     val rdd = sparkEntries.sc().parallelize(Seq(People("Jerry", 20), People("Michael", 21)))
     val df = sparkEntries.sqlctx().createDataFrame(rdd)