This is an automated email from the ASF dual-hosted git repository.

yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new a48365dd98c9 [SPARK-48387][SQL] Postgres: Map TimestampType to TIMESTAMP WITH TIME ZONE
a48365dd98c9 is described below

commit a48365dd98c9e52b5648d1cc0af203a7290cb1dc
Author: Kent Yao <y...@apache.org>
AuthorDate: Thu May 23 10:27:16 2024 +0800

    [SPARK-48387][SQL] Postgres: Map TimestampType to TIMESTAMP WITH TIME ZONE
    
    ### What changes were proposed in this pull request?
    
    Currently, both TimestampType and TimestampNTZType are mapped to TIMESTAMP WITHOUT TIME ZONE for writing, while they are differentiated for reading.
    
    In this PR, we map TimestampType to TIMESTAMP WITH TIME ZONE so that TimestampType and TimestampNTZType are differentiated when writing to Postgres.
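    As an illustrative sketch (not part of the patch), the write path now behaves as below; the table name `ts_tz_example` is a placeholder, and a running SparkSession `spark` plus a Postgres JDBC URL `jdbcUrl` are assumed:

    ```scala
    // Sketch only: assumes a SparkSession `spark` and a Postgres JDBC URL `jdbcUrl`.
    import java.util.Properties

    val df = spark.sql("SELECT TIMESTAMP '2018-11-17 13:33:33' AS col0")

    // With the new default mapping, col0 is created as TIMESTAMP WITH TIME ZONE
    // (timestamptz) on the Postgres side instead of TIMESTAMP WITHOUT TIME ZONE.
    df.write.jdbc(jdbcUrl, "ts_tz_example", new Properties)
    ```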
    
    ### Why are the changes needed?
    
    Mapping TimestampType to TIMESTAMP WITHOUT TIME ZONE is incorrect and ambiguous with TimestampNTZType.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes. A migration guide entry and a legacy configuration are provided.
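    A minimal sketch of restoring the previous behavior with the legacy flag added by this PR (session-scoped, shown in Scala):

    ```scala
    // Sketch only: with this legacy flag set, TimestampType is written as
    // TIMESTAMP WITHOUT TIME ZONE again, and timestamptz can be read back as
    // TimestampNTZType when the JDBC read option preferTimestampNTZ is true.
    spark.conf.set("spark.sql.legacy.postgres.datetimeMapping.enabled", "true")
    ```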
    
    ### How was this patch tested?
    
    New tests.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No.
    
    Closes #46701 from yaooqinn/SPARK-48387.
    
    Authored-by: Kent Yao <y...@apache.org>
    Signed-off-by: Kent Yao <y...@apache.org>
---
 .../spark/sql/jdbc/PostgresIntegrationSuite.scala  | 46 ++++++++++++++++++++++
 docs/sql-data-sources-jdbc.md                      |  4 +-
 docs/sql-migration-guide.md                        |  3 +-
 .../org/apache/spark/sql/internal/SQLConf.scala    | 14 +++++++
 .../apache/spark/sql/jdbc/PostgresDialect.scala    |  6 ++-
 5 files changed, 68 insertions(+), 5 deletions(-)

diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala
index dd6f1bfd3b3f..5ad4f15216b7 100644
--- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala
+++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/PostgresIntegrationSuite.scala
@@ -27,6 +27,7 @@ import org.apache.spark.SparkException
 import org.apache.spark.sql.{Column, DataFrame, Row}
 import org.apache.spark.sql.catalyst.expressions.Literal
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.tags.DockerTest
 
@@ -583,4 +584,49 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationSuite {
       assert(cause.getSQLState === "22003")
     }
   }
+
+  test("SPARK-48387: Timestamp write as timestamp with time zone") {
+    val df = spark.sql("select TIMESTAMP '2018-11-17 13:33:33' as col0")
+    // write timestamps for preparation
+    withSQLConf(SQLConf.LEGACY_POSTGRES_DATETIME_MAPPING_ENABLED.key -> "false") {
+      // write timestamp as timestamp with time zone
+      df.write.jdbc(jdbcUrl, "ts_with_timezone_copy_false", new Properties)
+    }
+    withSQLConf(SQLConf.LEGACY_POSTGRES_DATETIME_MAPPING_ENABLED.key -> "true") {
+      // write timestamp as timestamp without time zone
+      df.write.jdbc(jdbcUrl, "ts_with_timezone_copy_true", new Properties)
+    }
+
+    // read timestamps for test
+    withSQLConf(SQLConf.LEGACY_POSTGRES_DATETIME_MAPPING_ENABLED.key -> "true") {
+      val df1 = spark.read.option("preferTimestampNTZ", false)
+        .jdbc(jdbcUrl, "ts_with_timezone_copy_false", new Properties)
+      checkAnswer(df1, Row(Timestamp.valueOf("2018-11-17 13:33:33")))
+      val df2 = spark.read.option("preferTimestampNTZ", true)
+        .jdbc(jdbcUrl, "ts_with_timezone_copy_false", new Properties)
+      checkAnswer(df2, Row(LocalDateTime.of(2018, 11, 17, 13, 33, 33)))
+
+      val df3 = spark.read.option("preferTimestampNTZ", false)
+        .jdbc(jdbcUrl, "ts_with_timezone_copy_true", new Properties)
+      checkAnswer(df3, Row(Timestamp.valueOf("2018-11-17 13:33:33")))
+      val df4 = spark.read.option("preferTimestampNTZ", true)
+        .jdbc(jdbcUrl, "ts_with_timezone_copy_true", new Properties)
+      checkAnswer(df4, Row(LocalDateTime.of(2018, 11, 17, 13, 33, 33)))
+    }
+    withSQLConf(SQLConf.LEGACY_POSTGRES_DATETIME_MAPPING_ENABLED.key -> "false") {
+      Seq("true", "false").foreach { prefer =>
+        val prop = new Properties
+        prop.setProperty("preferTimestampNTZ", prefer)
+        val dfCopy = spark.read.jdbc(jdbcUrl, "ts_with_timezone_copy_false", prop)
+        checkAnswer(dfCopy, Row(Timestamp.valueOf("2018-11-17 13:33:33")))
+      }
+
+      val df5 = spark.read.option("preferTimestampNTZ", false)
+        .jdbc(jdbcUrl, "ts_with_timezone_copy_true", new Properties)
+      checkAnswer(df5, Row(Timestamp.valueOf("2018-11-17 13:33:33")))
+      val df6 = spark.read.option("preferTimestampNTZ", true)
+        .jdbc(jdbcUrl, "ts_with_timezone_copy_true", new Properties)
+      checkAnswer(df6, Row(LocalDateTime.of(2018, 11, 17, 13, 33, 33)))
+    }
+  }
 }
diff --git a/docs/sql-data-sources-jdbc.md b/docs/sql-data-sources-jdbc.md
index 54a8506bff51..371dc0595071 100644
--- a/docs/sql-data-sources-jdbc.md
+++ b/docs/sql-data-sources-jdbc.md
@@ -1074,8 +1074,8 @@ the [PostgreSQL JDBC Driver](https://mvnrepository.com/artifact/org.postgresql/p
     </tr>
     <tr>
       <td>TimestampType</td>
-      <td>timestamp</td>
-      <td></td>
+      <td>timestamp with time zone</td>
+      <td>Before Spark 4.0, it was mapped as timestamp. Please refer to the migration guide for more information.</td>
     </tr>
     <tr>
       <td>TimestampNTZType</td>
diff --git a/docs/sql-migration-guide.md b/docs/sql-migration-guide.md
index e668a9f9ef75..8f6a41556986 100644
--- a/docs/sql-migration-guide.md
+++ b/docs/sql-migration-guide.md
@@ -41,7 +41,8 @@ license: |
  - `spark.sql.avro.datetimeRebaseModeInRead` instead of `spark.sql.legacy.avro.datetimeRebaseModeInRead`
 - Since Spark 4.0, the default value of `spark.sql.orc.compression.codec` is changed from `snappy` to `zstd`. To restore the previous behavior, set `spark.sql.orc.compression.codec` to `snappy`.
 - Since Spark 4.0, the SQL config `spark.sql.legacy.allowZeroIndexInFormatString` is deprecated. Consider to change `strfmt` of the `format_string` function to use 1-based indexes. The first argument must be referenced by "1$", the second by "2$", etc.
-- Since Spark 4.0, JDBC read option `preferTimestampNTZ=true` will not convert Postgres TIMESTAMP WITH TIME ZONE and TIME WITH TIME ZONE data types to TimestampNTZType, which is available in Spark 3.5.
+- Since Spark 4.0, Postgres JDBC datasource will read TIMESTAMP WITH TIME ZONE as TimestampType regardless of the JDBC read option `preferTimestampNTZ`, while in 3.5 and previous, TimestampNTZType when `preferTimestampNTZ=true`. To restore the previous behavior, set `spark.sql.legacy.postgres.datetimeMapping.enabled` to `true`.
+- Since Spark 4.0, Postgres JDBC datasource will write TimestampType as TIMESTAMP WITH TIME ZONE, while in 3.5 and previous, it was written as TIMESTAMP, a.k.a. TIMESTAMP WITHOUT TIME ZONE. To restore the previous behavior, set `spark.sql.legacy.postgres.datetimeMapping.enabled` to `true`.
 - Since Spark 4.0, MySQL JDBC datasource will read TIMESTAMP as TimestampType regardless of the JDBC read option `preferTimestampNTZ`, while in 3.5 and previous, TimestampNTZType when `preferTimestampNTZ=true`. To restore the previous behavior, set `spark.sql.legacy.mysql.timestampNTZMapping.enabled` to `true`, MySQL DATETIME is not affected.
 - Since Spark 4.0, MySQL JDBC datasource will read SMALLINT as ShortType, while in Spark 3.5 and previous, it was read as IntegerType. MEDIUMINT UNSIGNED is read as IntegerType, while in Spark 3.5 and previous, it was read as LongType. To restore the previous behavior, you can cast the column to the old type.
 - Since Spark 4.0, MySQL JDBC datasource will read FLOAT as FloatType, while in Spark 3.5 and previous, it was read as DoubleType. To restore the previous behavior, you can cast the column to the old type.
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 545b0a610cdf..06e0c6eda589 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -4265,6 +4265,17 @@ object SQLConf {
       .booleanConf
       .createWithDefault(false)
 
+  val LEGACY_POSTGRES_DATETIME_MAPPING_ENABLED =
+    buildConf("spark.sql.legacy.postgres.datetimeMapping.enabled")
+      .internal()
+      .doc("When true, TimestampType maps to TIMESTAMP WITHOUT TIME ZONE in 
PostgreSQL for " +
+        "writing; otherwise, TIMESTAMP WITH TIME ZONE. When true, TIMESTAMP 
WITH TIME ZONE " +
+        "can be converted to TimestampNTZType when JDBC read option 
preferTimestampNTZ is " +
+        "true; otherwise, converted to TimestampType regardless of 
preferTimestampNTZ.")
+      .version("4.0.0")
+      .booleanConf
+      .createWithDefault(false)
+
   val CSV_FILTER_PUSHDOWN_ENABLED = buildConf("spark.sql.csv.filterPushdown.enabled")
     .doc("When true, enable filter pushdown to CSV datasource.")
     .version("3.0.0")
@@ -5410,6 +5421,9 @@ class SQLConf extends Serializable with Logging with SqlApiConf {
   def legacyDB2BooleanMappingEnabled: Boolean =
     getConf(LEGACY_DB2_BOOLEAN_MAPPING_ENABLED)
 
+  def legacyPostgresDatetimeMappingEnabled: Boolean =
+    getConf(LEGACY_POSTGRES_DATETIME_MAPPING_ENABLED)
+
   override def legacyTimeParserPolicy: LegacyBehaviorPolicy.Value = {
     LegacyBehaviorPolicy.withName(getConf(SQLConf.LEGACY_TIME_PARSER_POLICY))
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
index f3fb115c7057..93052a0c37b5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala
@@ -61,8 +61,8 @@ private case class PostgresDialect() extends JdbcDialect with SQLConfHelper {
         // money type seems to be broken but one workaround is to handle it as string.
         // See SPARK-34333 and https://github.com/pgjdbc/pgjdbc/issues/100
         Some(StringType)
-      case Types.TIMESTAMP
-        if "timestamptz".equalsIgnoreCase(typeName) =>
+      case Types.TIMESTAMP if "timestamptz".equalsIgnoreCase(typeName) &&
+          !conf.legacyPostgresDatetimeMappingEnabled =>
         // timestamptz represents timestamp with time zone, currently it maps to Types.TIMESTAMP.
         // We need to change to Types.TIMESTAMP_WITH_TIMEZONE if the upstream changes.
         Some(TimestampType)
@@ -149,6 +149,8 @@ private case class PostgresDialect() extends JdbcDialect with SQLConfHelper {
     case FloatType => Some(JdbcType("FLOAT4", Types.FLOAT))
     case DoubleType => Some(JdbcType("FLOAT8", Types.DOUBLE))
     case ShortType | ByteType => Some(JdbcType("SMALLINT", Types.SMALLINT))
+    case TimestampType if !conf.legacyPostgresDatetimeMappingEnabled =>
+      Some(JdbcType("TIMESTAMP WITH TIME ZONE", Types.TIMESTAMP))
     case t: DecimalType => Some(
       JdbcType(s"NUMERIC(${t.precision},${t.scale})", java.sql.Types.NUMERIC))
     case ArrayType(et, _) if et.isInstanceOf[AtomicType] || et.isInstanceOf[ArrayType] =>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
