This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 4c65231  [SPARK-34144][SQL] Exception thrown when trying to write LocalDate and Instant values to a JDBC relation
4c65231 is described below

commit 4c652315ec0c9b6e86cc475a92095b8c42f07115
Author: Chircu <[email protected]>
AuthorDate: Fri Jan 29 17:48:13 2021 +0900

    [SPARK-34144][SQL] Exception thrown when trying to write LocalDate and Instant values to a JDBC relation
    
    ### What changes were proposed in this pull request?
    
    When writing rows to a table, only the old date-time API types are handled
    in org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils#makeSetter.
    If the new API is enabled (spark.sql.datetime.java8API.enabled=true),
    casting Instant and LocalDate values to Timestamp and Date respectively
    fails with a ClassCastException. The proposed change is to handle Instant
    and LocalDate values and convert them to Timestamp and Date before setting
    them on the PreparedStatement.
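    
    For illustration, a rough sketch of the conversion the new setters perform
    when the Java 8 API flag is on, using the DateTimeUtils helpers imported by
    this patch (the sample values here are made up):
    
    ```scala
    import java.time.{Instant, LocalDate}
    import org.apache.spark.sql.catalyst.util.DateTimeUtils.{instantToMicros, localDateToDays, toJavaDate, toJavaTimestamp}
    
    // Instant -> microseconds since the epoch -> java.sql.Timestamp
    val ts: java.sql.Timestamp =
      toJavaTimestamp(instantToMicros(Instant.parse("2020-02-02T12:13:14.56789Z")))
    // LocalDate -> days since the epoch -> java.sql.Date
    val d: java.sql.Date =
      toJavaDate(localDateToDays(LocalDate.parse("2020-01-01")))
    ```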
    
    ### Why are the changes needed?
    
    In the current state, writing Instant or LocalDate values to a table
    fails with something like:
    Caused by: java.lang.ClassCastException: class java.time.LocalDate cannot be cast to class java.sql.Date (java.time.LocalDate is in module java.base of loader 'bootstrap'; java.sql.Date is in module java.sql of loader 'platform')
      at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$makeSetter$11(JdbcUtils.scala:573)
      at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$makeSetter$11$adapted(JdbcUtils.scala:572)
      at org.apache.spark.sql.execution.datas [...]
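    
    A minimal way to reproduce, adapted from the new test in this PR; the JDBC
    url is illustrative, an existing SparkSession named spark is assumed, and a
    TEST.TIMETYPES (d DATE, t TIMESTAMP) table is assumed to exist:
    
    ```scala
    import java.time.{Instant, LocalDate}
    import java.util.Properties
    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types.{DateType, StructType, TimestampType}
    
    // Use java.time external types for DateType/TimestampType values.
    spark.conf.set("spark.sql.datetime.java8API.enabled", "true")
    
    val schema = new StructType().add("d", DateType).add("t", TimestampType)
    val values = Seq(Row(LocalDate.parse("2020-01-01"),
      Instant.parse("2020-02-02T12:13:14.56789Z")))
    val df = spark.createDataFrame(spark.sparkContext.makeRDD(values), schema)
    
    // Without this patch, the write below fails with the ClassCastException shown above.
    df.write.jdbc("jdbc:h2:mem:testdb", "TEST.TIMETYPES", new Properties())
    ```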
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Added tests
    
    Closes #31264 from cristichircu/SPARK-34144.
    
    Lead-authored-by: Chircu <[email protected]>
    Co-authored-by: Cristi Chircu <[email protected]>
    Signed-off-by: HyukjinKwon <[email protected]>
    (cherry picked from commit 520e5d2ab8c25e99c6149fb752b18a1f65bd9fa0)
    Signed-off-by: HyukjinKwon <[email protected]>
---
 .../sql/execution/datasources/jdbc/JdbcUtils.scala | 21 ++++++--
 .../org/apache/spark/sql/jdbc/JDBCWriteSuite.scala | 57 +++++++++++++++++++++-
 2 files changed, 73 insertions(+), 5 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index f997e57..2b56beb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.execution.datasources.jdbc
 
 import java.sql.{Connection, Driver, JDBCType, PreparedStatement, ResultSet, ResultSetMetaData, SQLException, SQLFeatureNotSupportedException}
+import java.time.{Instant, LocalDate}
 import java.util.Locale
 
 import scala.util.Try
@@ -33,8 +34,10 @@ import org.apache.spark.sql.catalyst.encoders.RowEncoder
 import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils, GenericArrayData}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils.{instantToMicros, localDateToDays, toJavaDate, toJavaTimestamp}
 import org.apache.spark.sql.connector.catalog.TableChange
 import org.apache.spark.sql.execution.datasources.jdbc.connection.ConnectionProvider
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects, JdbcType}
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.SchemaUtils
@@ -553,12 +556,22 @@ object JdbcUtils extends Logging {
         stmt.setBytes(pos + 1, row.getAs[Array[Byte]](pos))
 
     case TimestampType =>
-      (stmt: PreparedStatement, row: Row, pos: Int) =>
-        stmt.setTimestamp(pos + 1, row.getAs[java.sql.Timestamp](pos))
+      if (SQLConf.get.datetimeJava8ApiEnabled) {
+        (stmt: PreparedStatement, row: Row, pos: Int) =>
+          stmt.setTimestamp(pos + 1, toJavaTimestamp(instantToMicros(row.getAs[Instant](pos))))
+      } else {
+        (stmt: PreparedStatement, row: Row, pos: Int) =>
+          stmt.setTimestamp(pos + 1, row.getAs[java.sql.Timestamp](pos))
+      }
 
     case DateType =>
-      (stmt: PreparedStatement, row: Row, pos: Int) =>
-        stmt.setDate(pos + 1, row.getAs[java.sql.Date](pos))
+      if (SQLConf.get.datetimeJava8ApiEnabled) {
+        (stmt: PreparedStatement, row: Row, pos: Int) =>
+          stmt.setDate(pos + 1, toJavaDate(localDateToDays(row.getAs[LocalDate](pos))))
+      } else {
+        (stmt: PreparedStatement, row: Row, pos: Int) =>
+          stmt.setDate(pos + 1, row.getAs[java.sql.Date](pos))
+      }
 
     case t: DecimalType =>
       (stmt: PreparedStatement, row: Row, pos: Int) =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
index 1a28523..efa2773 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
@@ -17,7 +17,8 @@
 
 package org.apache.spark.sql.jdbc
 
-import java.sql.DriverManager
+import java.sql.{Date, DriverManager, Timestamp}
+import java.time.{Instant, LocalDate}
 import java.util.Properties
 
 import scala.collection.JavaConverters.propertiesAsScalaMapConverter
@@ -81,6 +82,9 @@ class JDBCWriteSuite extends SharedSparkSession with BeforeAndAfter {
         |USING org.apache.spark.sql.jdbc
         |OPTIONS (url '$url1', dbtable 'TEST.PEOPLE1', user 'testUser', password 'testPass')
       """.stripMargin.replaceAll("\n", " "))
+
+    conn1.prepareStatement("create table test.timetypes (d DATE, t TIMESTAMP)").executeUpdate()
+    conn.commit()
   }
 
   after {
@@ -606,4 +610,55 @@ class JDBCWriteSuite extends SharedSparkSession with BeforeAndAfter {
     sparkContext.removeSparkListener(listener)
     taskMetrics.sum
   }
+
+  test("SPARK-34144: write and read java.time LocalDate and Instant") {
+    withSQLConf(SQLConf.DATETIME_JAVA8API_ENABLED.key -> "true") {
+      val schema = new StructType().add("d", DateType).add("t", TimestampType);
+      val values = Seq(Row.apply(LocalDate.parse("2020-01-01"),
+        Instant.parse("2020-02-02T12:13:14.56789Z")))
+      val df = spark.createDataFrame(sparkContext.makeRDD(values), schema)
+
+      df.write.jdbc(url, "TEST.TIMETYPES", new Properties())
+
+      val rows = spark.read.jdbc(url, "TEST.TIMETYPES", new Properties()).collect()
+      assert(1 === rows.length);
+      assert(rows(0).getAs[LocalDate](0) === LocalDate.parse("2020-01-01"))
+      assert(rows(0).getAs[Instant](1) === Instant.parse("2020-02-02T12:13:14.56789Z"))
+    }
+  }
+
+  test("SPARK-34144: write Date and Timestampt, read LocalDate and Instant") {
+    val schema = new StructType().add("d", DateType).add("t", TimestampType);
+    val values = Seq(Row.apply(Date.valueOf("2020-01-01"),
+      Timestamp.valueOf("2020-02-02 12:13:14.56789")))
+    val df = spark.createDataFrame(sparkContext.makeRDD(values), schema)
+
+    df.write.jdbc(url, "TEST.TIMETYPES", new Properties())
+
+    withSQLConf(SQLConf.DATETIME_JAVA8API_ENABLED.key -> "true") {
+      val rows = spark.read.jdbc(url, "TEST.TIMETYPES", new Properties()).collect()
+      assert(1 === rows.length);
+      assert(rows(0).getAs[LocalDate](0) === LocalDate.parse("2020-01-01"))
+      // 8 hour difference since Timestamp was America/Los_Angeles and Instant is GMT
+      assert(rows(0).getAs[Instant](1) === Instant.parse("2020-02-02T20:13:14.56789Z"))
+    }
+  }
+
+  test("SPARK-34144: write LocalDate and Instant, read Date and Timestampt") {
+    withSQLConf(SQLConf.DATETIME_JAVA8API_ENABLED.key -> "true") {
+      val schema = new StructType().add("d", DateType).add("t", TimestampType);
+      val values = Seq(Row.apply(LocalDate.parse("2020-01-01"),
+        Instant.parse("2020-02-02T12:13:14.56789Z")))
+      val df = spark.createDataFrame(sparkContext.makeRDD(values), schema)
+
+      df.write.jdbc(url, "TEST.TIMETYPES", new Properties())
+    }
+
+    val rows = spark.read.jdbc(url, "TEST.TIMETYPES", new Properties()).collect()
+    assert(1 === rows.length);
+    assert(rows(0).getAs[java.sql.Date](0) === java.sql.Date.valueOf("2020-01-01"))
+    // -8 hour difference since Instant was GMT and Timestamp is America/Los_Angeles
+    assert(rows(0).getAs[java.sql.Timestamp](1)
+      === java.sql.Timestamp.valueOf("2020-02-02 04:13:14.56789"))
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
