This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 4c65231  [SPARK-34144][SQL] Exception thrown when trying to write LocalDate and Instant values to a JDBC relation
4c65231 is described below
commit 4c652315ec0c9b6e86cc475a92095b8c42f07115
Author: Chircu <[email protected]>
AuthorDate: Fri Jan 29 17:48:13 2021 +0900
    [SPARK-34144][SQL] Exception thrown when trying to write LocalDate and Instant values to a JDBC relation
### What changes were proposed in this pull request?
When writing rows to a JDBC table, only the legacy date-time API types are handled
in org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils#makeSetter. If the
Java 8 time API is enabled (spark.sql.datetime.java8API.enabled=true), rows carry
java.time.Instant and java.time.LocalDate values, and the unconditional casts to
java.sql.Timestamp and java.sql.Date fail. The proposed change handles Instant and
LocalDate values by converting them to Timestamp and Date before they are passed
to the JDBC setters.
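
For illustration, a minimal sketch of the write path this fixes; the master
setting, app name, H2 URL, and table name are placeholders (and assume the H2
driver is on the classpath), not part of the patch:

    import java.time.{Instant, LocalDate}
    import org.apache.spark.sql.{Row, SparkSession}
    import org.apache.spark.sql.types.{DateType, StructType, TimestampType}

    val spark = SparkSession.builder().master("local[*]").appName("example").getOrCreate()
    // With the Java 8 time API enabled, DataFrame rows carry java.time values.
    spark.conf.set("spark.sql.datetime.java8API.enabled", "true")

    val schema = new StructType().add("d", DateType).add("t", TimestampType)
    val rows = Seq(Row(LocalDate.parse("2020-01-01"), Instant.parse("2020-02-02T12:13:14Z")))
    val df = spark.createDataFrame(spark.sparkContext.makeRDD(rows), schema)

    // Before this patch the JDBC setters cast these values to java.sql.Date and
    // java.sql.Timestamp and threw ClassCastException; with it they are converted.
    df.write.jdbc("jdbc:h2:mem:testdb", "TEST.TIMETYPES", new java.util.Properties())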
### Why are the changes needed?
In the current state, writing Instant or LocalDate values to a JDBC table fails
with an exception like:
Caused by: java.lang.ClassCastException: class java.time.LocalDate cannot be cast to class java.sql.Date (java.time.LocalDate is in module java.base of loader 'bootstrap'; java.sql.Date is in module java.sql of loader 'platform')
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$makeSetter$11(JdbcUtils.scala:573)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$makeSetter$11$adapted(JdbcUtils.scala:572)
    at org.apache.spark.sql.execution.datas [...]
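
The cast fails because Row.getAs is an unchecked cast: with the Java 8 API
enabled the row stores a java.time.LocalDate, and asking for java.sql.Date blows
up at the use site. A self-contained demonstration of the root cause (a sketch,
not code from the patch):

    import java.time.LocalDate
    import org.apache.spark.sql.Row

    val row = Row(LocalDate.parse("2020-01-01"))
    // getAs itself performs an unchecked cast; the checkcast is inserted at the
    // call site, exactly as in stmt.setDate(pos + 1, row.getAs[java.sql.Date](pos)).
    val d: java.sql.Date = row.getAs[java.sql.Date](0) // throws ClassCastException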
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Added tests
Closes #31264 from cristichircu/SPARK-34144.
Lead-authored-by: Chircu <[email protected]>
Co-authored-by: Cristi Chircu <[email protected]>
Signed-off-by: HyukjinKwon <[email protected]>
(cherry picked from commit 520e5d2ab8c25e99c6149fb752b18a1f65bd9fa0)
Signed-off-by: HyukjinKwon <[email protected]>
---
.../sql/execution/datasources/jdbc/JdbcUtils.scala | 21 ++++++--
.../org/apache/spark/sql/jdbc/JDBCWriteSuite.scala | 57 +++++++++++++++++++++-
2 files changed, 73 insertions(+), 5 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index f997e57..2b56beb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.execution.datasources.jdbc
 
 import java.sql.{Connection, Driver, JDBCType, PreparedStatement, ResultSet,
   ResultSetMetaData, SQLException, SQLFeatureNotSupportedException}
+import java.time.{Instant, LocalDate}
 import java.util.Locale
 
 import scala.util.Try
@@ -33,8 +34,10 @@ import org.apache.spark.sql.catalyst.encoders.RowEncoder
 import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils, GenericArrayData}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils.{instantToMicros, localDateToDays, toJavaDate, toJavaTimestamp}
 import org.apache.spark.sql.connector.catalog.TableChange
 import org.apache.spark.sql.execution.datasources.jdbc.connection.ConnectionProvider
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects, JdbcType}
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.SchemaUtils
@@ -553,12 +556,22 @@ object JdbcUtils extends Logging {
         stmt.setBytes(pos + 1, row.getAs[Array[Byte]](pos))
 
     case TimestampType =>
-      (stmt: PreparedStatement, row: Row, pos: Int) =>
-        stmt.setTimestamp(pos + 1, row.getAs[java.sql.Timestamp](pos))
+      if (SQLConf.get.datetimeJava8ApiEnabled) {
+        (stmt: PreparedStatement, row: Row, pos: Int) =>
+          stmt.setTimestamp(pos + 1, toJavaTimestamp(instantToMicros(row.getAs[Instant](pos))))
+      } else {
+        (stmt: PreparedStatement, row: Row, pos: Int) =>
+          stmt.setTimestamp(pos + 1, row.getAs[java.sql.Timestamp](pos))
+      }
 
     case DateType =>
-      (stmt: PreparedStatement, row: Row, pos: Int) =>
-        stmt.setDate(pos + 1, row.getAs[java.sql.Date](pos))
+      if (SQLConf.get.datetimeJava8ApiEnabled) {
+        (stmt: PreparedStatement, row: Row, pos: Int) =>
+          stmt.setDate(pos + 1, toJavaDate(localDateToDays(row.getAs[LocalDate](pos))))
+      } else {
+        (stmt: PreparedStatement, row: Row, pos: Int) =>
+          stmt.setDate(pos + 1, row.getAs[java.sql.Date](pos))
+      }
 
     case t: DecimalType =>
       (stmt: PreparedStatement, row: Row, pos: Int) =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
index 1a28523..efa2773 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
@@ -17,7 +17,8 @@
 
 package org.apache.spark.sql.jdbc
 
-import java.sql.DriverManager
+import java.sql.{Date, DriverManager, Timestamp}
+import java.time.{Instant, LocalDate}
 import java.util.Properties
 
 import scala.collection.JavaConverters.propertiesAsScalaMapConverter
@@ -81,6 +82,9 @@ class JDBCWriteSuite extends SharedSparkSession with BeforeAndAfter {
         |USING org.apache.spark.sql.jdbc
         |OPTIONS (url '$url1', dbtable 'TEST.PEOPLE1', user 'testUser', password 'testPass')
       """.stripMargin.replaceAll("\n", " "))
+
+    conn1.prepareStatement("create table test.timetypes (d DATE, t TIMESTAMP)").executeUpdate()
+    conn.commit()
   }
 
   after {
@@ -606,4 +610,55 @@ class JDBCWriteSuite extends SharedSparkSession with BeforeAndAfter {
     sparkContext.removeSparkListener(listener)
     taskMetrics.sum
   }
+
+  test("SPARK-34144: write and read java.time LocalDate and Instant") {
+    withSQLConf(SQLConf.DATETIME_JAVA8API_ENABLED.key -> "true") {
+      val schema = new StructType().add("d", DateType).add("t", TimestampType)
+      val values = Seq(Row.apply(LocalDate.parse("2020-01-01"),
+        Instant.parse("2020-02-02T12:13:14.56789Z")))
+      val df = spark.createDataFrame(sparkContext.makeRDD(values), schema)
+
+      df.write.jdbc(url, "TEST.TIMETYPES", new Properties())
+
+      val rows = spark.read.jdbc(url, "TEST.TIMETYPES", new Properties()).collect()
+      assert(1 === rows.length)
+      assert(rows(0).getAs[LocalDate](0) === LocalDate.parse("2020-01-01"))
+      assert(rows(0).getAs[Instant](1) === Instant.parse("2020-02-02T12:13:14.56789Z"))
+    }
+  }
+
+  test("SPARK-34144: write Date and Timestamp, read LocalDate and Instant") {
+    val schema = new StructType().add("d", DateType).add("t", TimestampType)
+    val values = Seq(Row.apply(Date.valueOf("2020-01-01"),
+      Timestamp.valueOf("2020-02-02 12:13:14.56789")))
+    val df = spark.createDataFrame(sparkContext.makeRDD(values), schema)
+
+    df.write.jdbc(url, "TEST.TIMETYPES", new Properties())
+
+    withSQLConf(SQLConf.DATETIME_JAVA8API_ENABLED.key -> "true") {
+      val rows = spark.read.jdbc(url, "TEST.TIMETYPES", new Properties()).collect()
+      assert(1 === rows.length)
+      assert(rows(0).getAs[LocalDate](0) === LocalDate.parse("2020-01-01"))
+      // 8 hour difference since Timestamp was America/Los_Angeles and Instant is GMT
+      assert(rows(0).getAs[Instant](1) === Instant.parse("2020-02-02T20:13:14.56789Z"))
+    }
+  }
+
+  test("SPARK-34144: write LocalDate and Instant, read Date and Timestamp") {
+    withSQLConf(SQLConf.DATETIME_JAVA8API_ENABLED.key -> "true") {
+      val schema = new StructType().add("d", DateType).add("t", TimestampType)
+      val values = Seq(Row.apply(LocalDate.parse("2020-01-01"),
+        Instant.parse("2020-02-02T12:13:14.56789Z")))
+      val df = spark.createDataFrame(sparkContext.makeRDD(values), schema)
+
+      df.write.jdbc(url, "TEST.TIMETYPES", new Properties())
+    }
+
+    val rows = spark.read.jdbc(url, "TEST.TIMETYPES", new Properties()).collect()
+    assert(1 === rows.length)
+    assert(rows(0).getAs[java.sql.Date](0) === java.sql.Date.valueOf("2020-01-01"))
+    // -8 hour difference since Instant was GMT and Timestamp is America/Los_Angeles
+    assert(rows(0).getAs[java.sql.Timestamp](1)
+      === java.sql.Timestamp.valueOf("2020-02-02 04:13:14.56789"))
+  }
 }
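
A note on the expected values in the last two tests: the 8-hour shifts come from
java.sql's wall-clock conversions in the JVM default time zone, which Spark's test
JVM pins to America/Los_Angeles (an assumption carried over from the test comments
above). A sketch of the arithmetic:

    import java.sql.Timestamp
    import java.time.Instant
    import java.util.TimeZone

    // Pin the default zone as Spark's test JVM does (UTC-8 on 2020-02-02).
    TimeZone.setDefault(TimeZone.getTimeZone("America/Los_Angeles"))

    // A wall-clock Timestamp is interpreted in the default zone:
    // 12:13:14 in Los Angeles is 20:13:14 in UTC.
    val ts = Timestamp.valueOf("2020-02-02 12:13:14.56789")
    assert(ts.toInstant == Instant.parse("2020-02-02T20:13:14.56789Z"))

    // Conversely, a UTC Instant rendered as a local Timestamp moves back 8 hours.
    val in = Instant.parse("2020-02-02T12:13:14.56789Z")
    assert(Timestamp.from(in) == Timestamp.valueOf("2020-02-02 04:13:14.56789"))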
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]