This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 4d21e5547580 [SPARK-46466][SQL] Vectorized parquet reader should never 
do rebase for timestamp ntz
4d21e5547580 is described below

commit 4d21e55475807a089979cffb54076bcb3ae9c02d
Author: Wenchen Fan <cloud0...@gmail.com>
AuthorDate: Thu Dec 21 12:42:19 2023 +0300

    [SPARK-46466][SQL] Vectorized parquet reader should never do rebase for 
timestamp ntz
    
    ### What changes were proposed in this pull request?
    
    This fixes a correctness bug. The TIMESTAMP_NTZ is a new data type in Spark 
and has no legacy files that need to do calendar rebase. However, the 
vectorized parquet reader treat it the same as LTZ and may do rebase if the 
parquet file was written with the legacy rebase mode. This PR fixes it to never 
do rebase for NTZ.
    
    ### Why are the changes needed?
    
    bug fix
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, now we can correctly write and read back NTZ value even if the date is 
before 1582.
    
    ### How was this patch tested?
    
    new test
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #44428 from cloud-fan/ntz.
    
    Lead-authored-by: Wenchen Fan <cloud0...@gmail.com>
    Co-authored-by: Wenchen Fan <wenc...@databricks.com>
    Signed-off-by: Max Gekk <max.g...@gmail.com>
---
 .../parquet/ParquetVectorUpdaterFactory.java       | 27 ++++++++++++----------
 .../datasources/parquet/ParquetQuerySuite.scala    | 12 ++++++++++
 2 files changed, 27 insertions(+), 12 deletions(-)

diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
 
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
index 918f21716f45..31a1957b4fb9 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java
@@ -109,24 +109,32 @@ public class ParquetVectorUpdaterFactory {
           // For unsigned int64, it stores as plain signed int64 in Parquet 
when dictionary
           // fallbacks. We read them as decimal values.
           return new UnsignedLongUpdater();
-        } else if (isTimestamp(sparkType) &&
+        } else if (sparkType == DataTypes.TimestampType &&
           isTimestampTypeMatched(LogicalTypeAnnotation.TimeUnit.MICROS)) {
-          validateTimestampType(sparkType);
           if ("CORRECTED".equals(datetimeRebaseMode)) {
             return new LongUpdater();
           } else {
             boolean failIfRebase = "EXCEPTION".equals(datetimeRebaseMode);
             return new LongWithRebaseUpdater(failIfRebase, datetimeRebaseTz);
           }
-        } else if (isTimestamp(sparkType) &&
+        } else if (sparkType == DataTypes.TimestampType &&
           isTimestampTypeMatched(LogicalTypeAnnotation.TimeUnit.MILLIS)) {
-          validateTimestampType(sparkType);
           if ("CORRECTED".equals(datetimeRebaseMode)) {
             return new LongAsMicrosUpdater();
           } else {
             final boolean failIfRebase = 
"EXCEPTION".equals(datetimeRebaseMode);
             return new LongAsMicrosRebaseUpdater(failIfRebase, 
datetimeRebaseTz);
           }
+        } else if (sparkType == DataTypes.TimestampNTZType &&
+          isTimestampTypeMatched(LogicalTypeAnnotation.TimeUnit.MICROS)) {
+          validateTimestampNTZType();
+          // TIMESTAMP_NTZ is a new data type and has no legacy files that 
need to do rebase.
+          return new LongUpdater();
+        } else if (sparkType == DataTypes.TimestampNTZType &&
+          isTimestampTypeMatched(LogicalTypeAnnotation.TimeUnit.MILLIS)) {
+          validateTimestampNTZType();
+          // TIMESTAMP_NTZ is a new data type and has no legacy files that 
need to do rebase.
+          return new LongAsMicrosUpdater();
         } else if (sparkType instanceof DayTimeIntervalType) {
           return new LongUpdater();
         }
@@ -195,12 +203,11 @@ public class ParquetVectorUpdaterFactory {
       annotation.getUnit() == unit;
   }
 
-  void validateTimestampType(DataType sparkType) {
+  private void validateTimestampNTZType() {
     assert(logicalTypeAnnotation instanceof TimestampLogicalTypeAnnotation);
-    // Throw an exception if the Parquet type is TimestampLTZ and the Catalyst 
type is TimestampNTZ.
+    // Throw an exception if the Parquet type is TimestampLTZ as the Catalyst 
type is TimestampNTZ.
     // This is to avoid mistakes in reading the timestamp values.
-    if (((TimestampLogicalTypeAnnotation) 
logicalTypeAnnotation).isAdjustedToUTC() &&
-      sparkType == DataTypes.TimestampNTZType) {
+    if (((TimestampLogicalTypeAnnotation) 
logicalTypeAnnotation).isAdjustedToUTC()) {
       convertErrorForTimestampNTZ("int64 time(" + logicalTypeAnnotation + ")");
     }
   }
@@ -1149,10 +1156,6 @@ public class ParquetVectorUpdaterFactory {
     return false;
   }
 
-  private static boolean isTimestamp(DataType dt) {
-    return dt == DataTypes.TimestampType || dt == DataTypes.TimestampNTZType;
-  }
-
   private static boolean isDecimalTypeMatched(ColumnDescriptor descriptor, 
DataType dt) {
     DecimalType d = (DecimalType) dt;
     LogicalTypeAnnotation typeAnnotation = 
descriptor.getPrimitiveType().getLogicalTypeAnnotation();
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
index 43103db522ba..2ba01eea51e2 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
@@ -255,6 +255,18 @@ abstract class ParquetQuerySuite extends QueryTest with 
ParquetTest with SharedS
     }
   }
 
+  test("SPARK-46466: write and read TimestampNTZ with legacy rebase mode") {
+    withSQLConf(SQLConf.PARQUET_REBASE_MODE_IN_WRITE.key -> "LEGACY") {
+      withTable("ts") {
+        sql("create table ts (c1 timestamp_ntz) using parquet")
+        sql("insert into ts values (timestamp_ntz'0900-01-01 01:10:10')")
+        withAllParquetReaders {
+          checkAnswer(spark.table("ts"), sql("select timestamp_ntz'0900-01-01 
01:10:10'"))
+        }
+      }
+    }
+  }
+
   test("Enabling/disabling merging partfiles when merging parquet schema") {
     def testSchemaMerging(expectedColumnNumber: Int): Unit = {
       withTempDir { dir =>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to