This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 2349175e1b8 [SPARK-39339][SQL] Support TimestampNTZ type in JDBC data 
source
2349175e1b8 is described below

commit 2349175e1b81b0a61e1ed90c2d051c01cf78de9b
Author: Ivan Sadikov <[email protected]>
AuthorDate: Mon Jun 13 21:22:15 2022 -0700

    [SPARK-39339][SQL] Support TimestampNTZ type in JDBC data source
    
    ### What changes were proposed in this pull request?
    
    This PR adds support for TimestampNTZ (TIMESTAMP WITHOUT TIME ZONE) in JDBC 
data source. It also introduces a new configuration option 
`inferTimestampNTZType` which allows to read written timestamps as timestamp 
without time zone. By default this is set to `false`, i.e. all timestamps are 
read as legacy timestamp type.
    
    Here is the state of timestamp without time zone support in the built-in 
dialects:
    - H2: timestamp without time zone, seems to map to timestamp type
    - Derby: only has timestamp type
    - MySQL: only has timestamp type
    - Postgres: has timestamp without time zone, which maps to timestamp
    - SQL Server: only datetime/datetime2, neither are time zone aware
    - Oracle: seems to only have timestamp and timestamp with time zone
    - Teradata: similar to Oracle but I could not verify
    - DB2: has TIMESTAMP WITHOUT TIME ZONE but I could not make this type work 
in my test, only TIMESTAMP seems to work
    
    ### Why are the changes needed?
    
    Adds support for the new TimestampNTZ type, see 
https://issues.apache.org/jira/browse/SPARK-35662.
    
    ### Does this PR introduce _any_ user-facing change?
    
    JDBC data source is now capable of writing and reading TimestampNTZ types. 
When reading timestamp values, configuration option `inferTimestampNTZType` 
allows to infer those values as TIMESTAMP WITHOUT TIME ZONE. By default the 
option is set to `false` so the behaviour is unchanged and all timestamps are 
read TIMESTAMP WITH LOCAL TIME ZONE.
    
    ### How was this patch tested?
    
    I added a unit test to ensure the general functionality works. I also 
manually verified the write/read test for TimestampNTZ in the following 
databases (all I could get access to):
    - H2, `jdbc:h2:mem:testdb0`
    - Derby, `jdbc:derby:<filepath>`
    - MySQL, `docker run --name mysql -e MYSQL_ROOT_PASSWORD=secret -e 
MYSQL_DATABASE=db -e MYSQL_USER=user -e MYSQL_PASSWORD=secret -p 3306:3306 -d 
mysql:5.7`, `jdbc:mysql://127.0.0.1:3306/db?user=user&password=secret`
    - PostgreSQL, `docker run -d --name postgres -e POSTGRES_PASSWORD=secret -e 
POSTGRES_USER=user -e POSTGRES_DB=db -p 5432:5432 postgres:12.11`, 
`jdbc:postgresql://127.0.0.1:5432/db?user=user&password=secret`
    - SQL Server, `docker run -e "ACCEPT_EULA=Y" -e 
SA_PASSWORD='yourStrong(!)Password' -p 1433:1433 -d 
mcr.microsoft.com/mssql/server:2019-CU15-ubuntu-20.04`, 
`jdbc:sqlserver://127.0.0.1:1433;user=sa;password=yourStrong(!)Password`
    - DB2, ` docker run -itd --name mydb2 --privileged=true -p 50000:50000 -e 
LICENSE=accept -e DB2INST1_PASSWORD=secret -e DBNAME=db ibmcom/db2`, 
`jdbc:db2://127.0.0.1:50000/db:user=db2inst1;password=secret;`.
    
    Closes #36726 from sadikovi/timestamp_ntz_jdbc.
    
    Authored-by: Ivan Sadikov <[email protected]>
    Signed-off-by: Gengliang Wang <[email protected]>
---
 docs/sql-data-sources-jdbc.md                      | 15 ++++--
 .../execution/datasources/jdbc/JDBCOptions.scala   |  4 ++
 .../sql/execution/datasources/jdbc/JDBCRDD.scala   |  3 +-
 .../sql/execution/datasources/jdbc/JdbcUtils.scala | 34 +++++++++++---
 .../apache/spark/sql/jdbc/MsSqlServerDialect.scala |  1 +
 .../org/apache/spark/sql/jdbc/JDBCSuite.scala      | 53 +++++++++++++++++++++-
 6 files changed, 98 insertions(+), 12 deletions(-)

diff --git a/docs/sql-data-sources-jdbc.md b/docs/sql-data-sources-jdbc.md
index 3b83bf5bc14..dec7bc36116 100644
--- a/docs/sql-data-sources-jdbc.md
+++ b/docs/sql-data-sources-jdbc.md
@@ -103,7 +103,7 @@ logging into the data sources.
     <td>(none)</td>
     <td>
       A prefix that will form the final query together with <code>query</code>.
-      As the specified <code>query</code> will be parenthesized as a subquery 
in the <code>FROM</code> clause and some databases do not 
+      As the specified <code>query</code> will be parenthesized as a subquery 
in the <code>FROM</code> clause and some databases do not
       support all clauses in subqueries, the <code>prepareQuery</code> 
property offers a way to run such complex queries.
       As an example, spark will issue a query of the following form to the 
JDBC Source.<br><br>
       <code>&lt;prepareQuery&gt; SELECT &lt;columns&gt; FROM 
(&lt;user_specified_query&gt;) spark_gen_alias</code><br><br>
@@ -340,10 +340,19 @@ logging into the data sources.
     <td>
       The name of the JDBC connection provider to use to connect to this URL, 
e.g. <code>db2</code>, <code>mssql</code>.
       Must be one of the providers loaded with the JDBC data source. Used to 
disambiguate when more than one provider can handle
-      the specified driver and options. The selected provider must not be 
disabled by <code>spark.sql.sources.disabledJdbcConnProviderList</code>. 
+      the specified driver and options. The selected provider must not be 
disabled by <code>spark.sql.sources.disabledJdbcConnProviderList</code>.
     </td>
     <td>read/write</td>
- </tr>  
+  </tr>
+  <tr>
+    <td><code>inferTimestampNTZType</code></td>
+    <td>false</td>
+    <td>
+      When the option is set to <code>true</code>, all timestamps are inferred 
as TIMESTAMP WITHOUT TIME ZONE.
+      Otherwise, timestamps are read as TIMESTAMP with local time zone.
+    </td>
+    <td>read</td>
+  </tr>
 </table>
 
 Note that kerberos authentication with keytab is not always supported by the 
JDBC driver.<br>
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
index df21a9820f9..80675c7dc47 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
@@ -226,6 +226,9 @@ class JDBCOptions(
   // The prefix that is added to the query sent to the JDBC database.
   // This is required to support some complex queries with some JDBC databases.
   val prepareQuery = parameters.get(JDBC_PREPARE_QUERY).map(_ + " 
").getOrElse("")
+
+  // Infers timestamp values as TimestampNTZ type when reading data.
+  val inferTimestampNTZType = parameters.getOrElse(JDBC_INFER_TIMESTAMP_NTZ, 
"false").toBoolean
 }
 
 class JdbcOptionsInWrite(
@@ -287,4 +290,5 @@ object JDBCOptions {
   val JDBC_REFRESH_KRB5_CONFIG = newOption("refreshKrb5Config")
   val JDBC_CONNECTION_PROVIDER = newOption("connectionProvider")
   val JDBC_PREPARE_QUERY = newOption("prepareQuery")
+  val JDBC_INFER_TIMESTAMP_NTZ = newOption("inferTimestampNTZType")
 }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
index 27d2d9c84c3..e95fe280c76 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
@@ -67,7 +67,8 @@ object JDBCRDD extends Logging {
         statement.setQueryTimeout(options.queryTimeout)
         val rs = statement.executeQuery()
         try {
-          JdbcUtils.getSchema(rs, dialect, alwaysNullable = true)
+          JdbcUtils.getSchema(rs, dialect, alwaysNullable = true,
+            isTimestampNTZ = options.inferTimestampNTZType)
         } finally {
           rs.close()
         }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index 1f17d4f0b14..cc8746ea5c4 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -38,7 +38,7 @@ import org.apache.spark.sql.catalyst.encoders.RowEncoder
 import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils, 
GenericArrayData}
-import org.apache.spark.sql.catalyst.util.DateTimeUtils.{instantToMicros, 
localDateToDays, toJavaDate, toJavaTimestamp}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils.{instantToMicros, 
localDateTimeToMicros, localDateToDays, toJavaDate, toJavaTimestamp}
 import org.apache.spark.sql.connector.catalog.TableChange
 import org.apache.spark.sql.connector.catalog.index.{SupportsIndex, TableIndex}
 import org.apache.spark.sql.connector.expressions.NamedReference
@@ -150,6 +150,10 @@ object JdbcUtils extends Logging with SQLConfHelper {
       case StringType => Option(JdbcType("TEXT", java.sql.Types.CLOB))
       case BinaryType => Option(JdbcType("BLOB", java.sql.Types.BLOB))
       case TimestampType => Option(JdbcType("TIMESTAMP", 
java.sql.Types.TIMESTAMP))
+      // This is a common case of timestamp without time zone. Most of the 
databases either only
+      // support TIMESTAMP type or use TIMESTAMP as an alias for TIMESTAMP 
WITHOUT TIME ZONE.
+      // Note that some dialects override this setting, e.g. as SQL Server.
+      case TimestampNTZType => Option(JdbcType("TIMESTAMP", 
java.sql.Types.TIMESTAMP))
       case DateType => Option(JdbcType("DATE", java.sql.Types.DATE))
       case t: DecimalType => Option(
         JdbcType(s"DECIMAL(${t.precision},${t.scale})", 
java.sql.Types.DECIMAL))
@@ -173,7 +177,8 @@ object JdbcUtils extends Logging with SQLConfHelper {
       sqlType: Int,
       precision: Int,
       scale: Int,
-      signed: Boolean): DataType = {
+      signed: Boolean,
+      isTimestampNTZ: Boolean): DataType = {
     val answer = sqlType match {
       // scalastyle:off
       case java.sql.Types.ARRAY         => null
@@ -215,6 +220,8 @@ object JdbcUtils extends Logging with SQLConfHelper {
       case java.sql.Types.TIME          => TimestampType
       case java.sql.Types.TIME_WITH_TIMEZONE
                                         => null
+      case java.sql.Types.TIMESTAMP
+        if isTimestampNTZ               => TimestampNTZType
       case java.sql.Types.TIMESTAMP     => TimestampType
       case java.sql.Types.TIMESTAMP_WITH_TIMEZONE
                                         => null
@@ -243,7 +250,8 @@ object JdbcUtils extends Logging with SQLConfHelper {
         conn.prepareStatement(options.prepareQuery + 
dialect.getSchemaQuery(options.tableOrQuery))
       try {
         statement.setQueryTimeout(options.queryTimeout)
-        Some(getSchema(statement.executeQuery(), dialect))
+        Some(getSchema(statement.executeQuery(), dialect,
+          isTimestampNTZ = options.inferTimestampNTZType))
       } catch {
         case _: SQLException => None
       } finally {
@@ -258,13 +266,15 @@ object JdbcUtils extends Logging with SQLConfHelper {
    * Takes a [[ResultSet]] and returns its Catalyst schema.
    *
    * @param alwaysNullable If true, all the columns are nullable.
+   * @param isTimestampNTZ If true, all timestamp columns are interpreted as 
TIMESTAMP_NTZ.
    * @return A [[StructType]] giving the Catalyst schema.
    * @throws SQLException if the schema contains an unsupported type.
    */
   def getSchema(
       resultSet: ResultSet,
       dialect: JdbcDialect,
-      alwaysNullable: Boolean = false): StructType = {
+      alwaysNullable: Boolean = false,
+      isTimestampNTZ: Boolean = false): StructType = {
     val rsmd = resultSet.getMetaData
     val ncols = rsmd.getColumnCount
     val fields = new Array[StructField](ncols)
@@ -306,7 +316,7 @@ object JdbcUtils extends Logging with SQLConfHelper {
 
       val columnType =
         dialect.getCatalystType(dataType, typeName, fieldSize, 
metadata).getOrElse(
-          getCatalystType(dataType, fieldSize, fieldScale, isSigned))
+          getCatalystType(dataType, fieldSize, fieldScale, isSigned, 
isTimestampNTZ))
       fields(i) = StructField(columnName, columnType, nullable, 
metadata.build())
       i = i + 1
     }
@@ -463,7 +473,7 @@ object JdbcUtils extends Logging with SQLConfHelper {
         }
       }
 
-    case TimestampType =>
+    case TimestampType | TimestampNTZType =>
       (rs: ResultSet, row: InternalRow, pos: Int) =>
         val t = rs.getTimestamp(pos + 1)
         if (t != null) {
@@ -583,6 +593,18 @@ object JdbcUtils extends Logging with SQLConfHelper {
           stmt.setTimestamp(pos + 1, row.getAs[java.sql.Timestamp](pos))
       }
 
+    case TimestampNTZType =>
+      if (conf.datetimeJava8ApiEnabled) {
+        (stmt: PreparedStatement, row: Row, pos: Int) =>
+          stmt.setTimestamp(pos + 1, 
toJavaTimestamp(instantToMicros(row.getAs[Instant](pos))))
+      } else {
+        (stmt: PreparedStatement, row: Row, pos: Int) =>
+          stmt.setTimestamp(
+            pos + 1,
+            
toJavaTimestamp(localDateTimeToMicros(row.getAs[java.time.LocalDateTime](pos)))
+          )
+      }
+
     case DateType =>
       if (conf.datetimeJava8ApiEnabled) {
         (stmt: PreparedStatement, row: Row, pos: Int) =>
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala
index a42129dbe8d..c95489a2876 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MsSqlServerDialect.scala
@@ -98,6 +98,7 @@ private object MsSqlServerDialect extends JdbcDialect {
 
   override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
     case TimestampType => Some(JdbcType("DATETIME", java.sql.Types.TIMESTAMP))
+    case TimestampNTZType => Some(JdbcType("DATETIME", 
java.sql.Types.TIMESTAMP))
     case StringType => Some(JdbcType("NVARCHAR(MAX)", java.sql.Types.NVARCHAR))
     case BooleanType => Some(JdbcType("BIT", java.sql.Types.BIT))
     case BinaryType => Some(JdbcType("VARBINARY(MAX)", 
java.sql.Types.VARBINARY))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index c96b27ee7f3..a07ef5ecd30 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.jdbc
 
 import java.math.BigDecimal
 import java.sql.{Date, DriverManager, SQLException, Timestamp}
-import java.time.{Instant, LocalDate}
+import java.time.{Instant, LocalDate, LocalDateTime}
 import java.util.{Calendar, GregorianCalendar, Properties, TimeZone}
 
 import scala.collection.JavaConverters._
@@ -1230,6 +1230,7 @@ class JDBCSuite extends QueryTest
     assert(getJdbcType(oracleDialect, BinaryType) == "BLOB")
     assert(getJdbcType(oracleDialect, DateType) == "DATE")
     assert(getJdbcType(oracleDialect, TimestampType) == "TIMESTAMP")
+    assert(getJdbcType(oracleDialect, TimestampNTZType) == "TIMESTAMP")
   }
 
   private def assertEmptyQuery(sqlString: String): Unit = {
@@ -1879,5 +1880,53 @@ class JDBCSuite extends QueryTest
     val fields = schema.fields
     assert(fields.length === 1)
     assert(fields(0).dataType === StringType)
-   }
+  }
+
+  test("SPARK-39339: Handle TimestampNTZType null values") {
+    val tableName = "timestamp_ntz_null_table"
+
+    val df = Seq(null.asInstanceOf[LocalDateTime]).toDF("col1")
+
+    df.write.format("jdbc")
+      .option("url", urlWithUserAndPass)
+      .option("dbtable", tableName).save()
+
+    val res = spark.read.format("jdbc")
+      .option("inferTimestampNTZType", "true")
+      .option("url", urlWithUserAndPass)
+      .option("dbtable", tableName)
+      .load()
+
+    checkAnswer(res, Seq(Row(null)))
+  }
+
+  test("SPARK-39339: TimestampNTZType with different local time zones") {
+    val tableName = "timestamp_ntz_diff_tz_support_table"
+
+    DateTimeTestUtils.outstandingZoneIds.foreach { zoneId =>
+      DateTimeTestUtils.withDefaultTimeZone(zoneId) {
+        Seq(
+          "1972-07-04 03:30:00",
+          "2019-01-20 12:00:00.502",
+          "2019-01-20T00:00:00.123456",
+          "1500-01-20T00:00:00.123456"
+        ).foreach { case datetime =>
+          val df = spark.sql(s"select timestamp_ntz '$datetime'")
+          df.write.format("jdbc")
+            .mode("overwrite")
+            .option("url", urlWithUserAndPass)
+            .option("dbtable", tableName)
+            .save()
+
+          val res = spark.read.format("jdbc")
+            .option("inferTimestampNTZType", "true")
+            .option("url", urlWithUserAndPass)
+            .option("dbtable", tableName)
+            .load()
+
+          checkAnswer(res, df)
+        }
+      }
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to