This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 5ce150735bc5 [SPARK-47394][SQL] Support TIMESTAMP WITH TIME ZONE for 
H2Dialect
5ce150735bc5 is described below

commit 5ce150735bc57f482f18fa5a04d16caae0e24041
Author: Kent Yao <y...@apache.org>
AuthorDate: Thu Mar 14 07:49:41 2024 -0700

    [SPARK-47394][SQL] Support TIMESTAMP WITH TIME ZONE for H2Dialect
    
    ### What changes were proposed in this pull request?
    
    Following the guidelines of SPARK-47375, this PR supports TIMESTAMP WITH 
TIME ZONE for H2Dialect and maps it to TimestampType regardless of the option 
`preferTimestampNTZ`
    
    https://www.h2database.com/html/datatypes.html#timestamp_with_time_zone_type
    
    ### Why are the changes needed?
    
    H2Dialect improvement, we currently don't have a default mapping for 
`java.sql.Types.TIME_WITH_TIMEZONE, TIMESTAMP_WITH_TIMEZONE`
    
    ### Does this PR introduce _any_ user-facing change?
    
    no
    
    ### How was this patch tested?
    
    new tests
    
    ### Was this patch authored or co-authored using generative AI tooling?
    no
    
    Closes #45516 from yaooqinn/SPARK-47394.
    
    Authored-by: Kent Yao <y...@apache.org>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../scala/org/apache/spark/sql/jdbc/H2Dialect.scala    |  3 ++-
 .../scala/org/apache/spark/sql/jdbc/JDBCSuite.scala    | 18 +++++++++++-------
 2 files changed, 13 insertions(+), 8 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala
index 74eca7e48577..f4a1650b3e8c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala
@@ -35,7 +35,7 @@ import 
org.apache.spark.sql.connector.catalog.functions.UnboundFunction
 import org.apache.spark.sql.connector.catalog.index.TableIndex
 import org.apache.spark.sql.connector.expressions.{Expression, FieldReference, 
NamedReference}
 import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcUtils}
-import org.apache.spark.sql.types.{BooleanType, ByteType, DataType, 
DecimalType, MetadataBuilder, ShortType, StringType}
+import org.apache.spark.sql.types.{BooleanType, ByteType, DataType, 
DecimalType, MetadataBuilder, ShortType, StringType, TimestampType}
 
 private[sql] object H2Dialect extends JdbcDialect {
   override def canHandle(url: String): Boolean =
@@ -68,6 +68,7 @@ private[sql] object H2Dialect extends JdbcDialect {
         val scale = if (null != md) md.build().getLong("scale") else 0L
         val selectedScale = (DecimalType.MAX_PRECISION * (scale.toDouble / 
size.toDouble)).toInt
         Option(DecimalType(DecimalType.MAX_PRECISION, selectedScale))
+      case Types.TIMESTAMP_WITH_TIMEZONE | Types.TIME_WITH_TIMEZONE => 
Some(TimestampType)
       case _ => None
     }
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index b8ca70e0b175..8f286eaa2c54 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -1467,13 +1467,6 @@ class JDBCSuite extends QueryTest with 
SharedSparkSession {
   }
 
   test("unsupported types") {
-    checkError(
-      exception = intercept[SparkSQLException] {
-        spark.read.jdbc(urlWithUserAndPass, "TEST.TIMEZONE", new 
Properties()).collect()
-      },
-      errorClass = "UNRECOGNIZED_SQL_TYPE",
-      parameters =
-        Map("typeName" -> "TIMESTAMP WITH TIME ZONE", "jdbcType" -> 
"TIMESTAMP_WITH_TIMEZONE"))
     checkError(
       exception = intercept[SparkSQLException] {
         spark.read.jdbc(urlWithUserAndPass, "TEST.ARRAY_TABLE", new 
Properties()).collect()
@@ -1482,6 +1475,17 @@ class JDBCSuite extends QueryTest with 
SharedSparkSession {
       parameters = Map("typeName" -> "INTEGER ARRAY", "jdbcType" -> "ARRAY"))
   }
 
+
+  test("SPARK-47394: Convert TIMESTAMP WITH TIME ZONE to TimestampType") {
+    Seq(true, false).foreach { prefer =>
+      val df = spark.read
+        .option("preferTimestampNTZ", prefer)
+        .jdbc(urlWithUserAndPass, "TEST.TIMEZONE", new Properties())
+      val expected = sql("select timestamp'1999-01-08 04:05:06.543544-08:00'")
+      checkAnswer(df, expected)
+    }
+  }
+
   test("SPARK-19318: Connection properties keys should be case-sensitive.") {
     def testJdbcOptions(options: JDBCOptions): Unit = {
       // Spark JDBC data source options are case-insensitive


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to