This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new f8eb533774a8 [SPARK-46466][SQL][3.5] Vectorized parquet reader should never do rebase for timestamp ntz
f8eb533774a8 is described below

commit f8eb533774a847ea3aeaa5063022513a6e89fb2b
Author: Wenchen Fan <cloud0...@gmail.com>
AuthorDate: Fri Dec 22 23:25:12 2023 +0800

    [SPARK-46466][SQL][3.5] Vectorized parquet reader should never do rebase for timestamp ntz
    
    Backport of https://github.com/apache/spark/pull/44428.
    
    ### What changes were proposed in this pull request?
    
    This fixes a correctness bug. TIMESTAMP_NTZ is a new data type in Spark and has no legacy files that need calendar rebase. However, the vectorized parquet reader treats it the same as LTZ and may rebase values if the parquet file was written with the legacy rebase mode. This PR fixes the reader to never rebase NTZ values.
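
    A minimal sketch of the dispatch change, simplified from the diff below (`selectUpdater` is a hypothetical helper, not the actual factory method; the updater class names are the real ones from the diff):

    ```java
    // Hypothetical, simplified dispatch. Before the fix, a shared isTimestamp()
    // check matched both TimestampType (LTZ) and TimestampNTZType, so an NTZ
    // column could receive a rebasing updater when the file was written with
    // the LEGACY rebase mode.
    ParquetVectorUpdater selectUpdater(DataType sparkType, String rebaseMode, String tz) {
      if (sparkType == DataTypes.TimestampType) {
        // LTZ: legacy files may exist, so honor the configured rebase mode.
        return "CORRECTED".equals(rebaseMode)
            ? new LongUpdater()
            : new LongWithRebaseUpdater("EXCEPTION".equals(rebaseMode), tz);
      } else if (sparkType == DataTypes.TimestampNTZType) {
        // NTZ is a new data type: no legacy files can exist, so never rebase.
        return new LongUpdater();
      }
      throw new IllegalStateException("unexpected type: " + sparkType);
    }
    ```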
    
    ### Why are the changes needed?
    
    Bug fix: without it, TIMESTAMP_NTZ values written with the LEGACY rebase mode are read back incorrectly.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, now we can correctly write and read back NTZ values even if the date is before 1582.
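
    For context: before the 1582 Gregorian cutover, the legacy hybrid (Julian) calendar used when writing with the LEGACY rebase mode and the proleptic Gregorian calendar used by Spark 3.x and java.time disagree by several days, which is exactly the gap that rebasing corrects. A self-contained JDK sketch of that gap (hypothetical demo class, not part of this change):

    ```java
    import java.time.LocalDate;
    import java.util.GregorianCalendar;
    import java.util.TimeZone;

    public class CalendarGapDemo {
      public static void main(String[] args) {
        // Legacy hybrid calendar: Julian before the 1582 cutover.
        GregorianCalendar hybrid = new GregorianCalendar(TimeZone.getTimeZone("UTC"));
        hybrid.clear();
        hybrid.set(900, 0, 1); // 0900-01-01 (month is 0-based)
        long hybridDays = hybrid.getTimeInMillis() / 86_400_000L;

        // Proleptic Gregorian calendar, as used by java.time and Spark 3.x.
        long prolepticDays = LocalDate.of(900, 1, 1).toEpochDay();

        // Interpreting the same date fields in the two calendars yields
        // different epoch days; rebase exists to bridge this difference.
        System.out.println("gap in days: " + (hybridDays - prolepticDays));
      }
    }
    ```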
    
    ### How was this patch tested?
    
    A new test that writes a pre-1582 TIMESTAMP_NTZ value with the LEGACY rebase mode and reads it back with all parquet readers.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #44446 from cloud-fan/ntz2.
    
    Authored-by: Wenchen Fan <cloud0...@gmail.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
    (cherry picked from commit 0948e24c30f6f7a05110f6e45b6723897e095aeb)
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../parquet/ParquetVectorUpdaterFactory.java       | 31 ++++++++++++----------
 .../datasources/parquet/ParquetQuerySuite.scala    | 12 +++++++++
 2 files changed, 29 insertions(+), 14 deletions(-)

diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
index 42442cf8ea8a..8c4fe2085387 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
@@ -109,24 +109,32 @@ public class ParquetVectorUpdaterFactory {
          // For unsigned int64, it stores as plain signed int64 in Parquet when dictionary
           // fallbacks. We read them as decimal values.
           return new UnsignedLongUpdater();
-        } else if (isTimestamp(sparkType) &&
-            isTimestampTypeMatched(LogicalTypeAnnotation.TimeUnit.MICROS)) {
-          validateTimestampType(sparkType);
+        } else if (sparkType == DataTypes.TimestampType &&
+          isTimestampTypeMatched(LogicalTypeAnnotation.TimeUnit.MICROS)) {
           if ("CORRECTED".equals(datetimeRebaseMode)) {
             return new LongUpdater();
           } else {
             boolean failIfRebase = "EXCEPTION".equals(datetimeRebaseMode);
             return new LongWithRebaseUpdater(failIfRebase, datetimeRebaseTz);
           }
-        } else if (isTimestamp(sparkType) &&
-            isTimestampTypeMatched(LogicalTypeAnnotation.TimeUnit.MILLIS)) {
-          validateTimestampType(sparkType);
+        } else if (sparkType == DataTypes.TimestampType &&
+          isTimestampTypeMatched(LogicalTypeAnnotation.TimeUnit.MILLIS)) {
           if ("CORRECTED".equals(datetimeRebaseMode)) {
             return new LongAsMicrosUpdater();
           } else {
            final boolean failIfRebase = "EXCEPTION".equals(datetimeRebaseMode);
            return new LongAsMicrosRebaseUpdater(failIfRebase, datetimeRebaseTz);
           }
+        } else if (sparkType == DataTypes.TimestampNTZType &&
+          isTimestampTypeMatched(LogicalTypeAnnotation.TimeUnit.MICROS)) {
+          validateTimestampNTZType();
+          // TIMESTAMP_NTZ is a new data type and has no legacy files that need to do rebase.
+          return new LongUpdater();
+        } else if (sparkType == DataTypes.TimestampNTZType &&
+          isTimestampTypeMatched(LogicalTypeAnnotation.TimeUnit.MILLIS)) {
+          validateTimestampNTZType();
+          // TIMESTAMP_NTZ is a new data type and has no legacy files that need to do rebase.
+          return new LongAsMicrosUpdater();
         } else if (sparkType instanceof DayTimeIntervalType) {
           return new LongUpdater();
         }
@@ -196,12 +204,11 @@ public class ParquetVectorUpdaterFactory {
      ((TimestampLogicalTypeAnnotation) logicalTypeAnnotation).getUnit() == unit;
   }
 
-  void validateTimestampType(DataType sparkType) {
+  private void validateTimestampNTZType() {
     assert(logicalTypeAnnotation instanceof TimestampLogicalTypeAnnotation);
-    // Throw an exception if the Parquet type is TimestampLTZ and the Catalyst type is TimestampNTZ.
+    // Throw an exception if the Parquet type is TimestampLTZ as the Catalyst type is TimestampNTZ.
     // This is to avoid mistakes in reading the timestamp values.
-    if (((TimestampLogicalTypeAnnotation) logicalTypeAnnotation).isAdjustedToUTC() &&
-      sparkType == DataTypes.TimestampNTZType) {
+    if (((TimestampLogicalTypeAnnotation) logicalTypeAnnotation).isAdjustedToUTC()) {
       convertErrorForTimestampNTZ("int64 time(" + logicalTypeAnnotation + ")");
     }
   }
@@ -1152,10 +1159,6 @@ public class ParquetVectorUpdaterFactory {
     return false;
   }
 
-  private static boolean isTimestamp(DataType dt) {
-    return dt == DataTypes.TimestampType || dt == DataTypes.TimestampNTZType;
-  }
-
  private static boolean isDecimalTypeMatched(ColumnDescriptor descriptor, DataType dt) {
     DecimalType d = (DecimalType) dt;
    LogicalTypeAnnotation typeAnnotation = descriptor.getPrimitiveType().getLogicalTypeAnnotation();
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
index ea5444a1791f..828ec39c7d72 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
@@ -255,6 +255,18 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS
     }
   }
 
+  test("SPARK-46466: write and read TimestampNTZ with legacy rebase mode") {
+    withSQLConf(SQLConf.PARQUET_REBASE_MODE_IN_WRITE.key -> "LEGACY") {
+      withTable("ts") {
+        sql("create table ts (c1 timestamp_ntz) using parquet")
+        sql("insert into ts values (timestamp_ntz'0900-01-01 01:10:10')")
+        withAllParquetReaders {
+          checkAnswer(spark.table("ts"), sql("select timestamp_ntz'0900-01-01 01:10:10'"))
+        }
+      }
+    }
+  }
+
   test("Enabling/disabling merging partfiles when merging parquet schema") {
     def testSchemaMerging(expectedColumnNumber: Int): Unit = {
       withTempDir { dir =>

