This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 39ad69aea4fd [SPARK-49082][SQL] Support widening Date to TimestampNTZ in Avro reader
39ad69aea4fd is described below

commit 39ad69aea4fdb661ab26b228964078a0c89adbda
Author: Alden Lau <[email protected]>
AuthorDate: Mon Mar 24 12:37:24 2025 +0900

    [SPARK-49082][SQL] Support widening Date to TimestampNTZ in Avro reader
    
    ### What changes were proposed in this pull request?
    
    This change adds support for widening type promotions from `Date` to
    `TimestampNTZ` in `AvroDeserializer`. This PR is a follow-up to
    https://github.com/apache/spark/pull/47582, which added support for other
    widening type promotions.
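    
    The core of the change (shown in full in the `AvroDeserializer` hunk below) is
    one extra match case: an Avro `INT` carrying the `date` logical type is rebased
    to days, then converted to microseconds at midnight UTC:
    
    ```scala
    case (INT, TimestampNTZType) if avroType.getLogicalType.isInstanceOf[LogicalTypes.Date] =>
      (updater, ordinal, value) =>
        val days = dateRebaseFunc(value.asInstanceOf[Int])
        val micros = DateTimeUtils.daysToMicros(days, ZoneOffset.UTC)
        updater.setLong(ordinal, micros)
    ```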
    
    ### Why are the changes needed?
    
    When reading Avro files with a mix of Date and TimestampNTZ for a given
    column, the reader should be able to read all files and promote Date to
    TimestampNTZ instead of throwing an error when reading files with Date.
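    
    As an illustration, a minimal sketch of the mixed-file scenario (paths and
    the column name are hypothetical; assumes a SparkSession `spark` with
    `spark.implicits._` imported):
    
    ```scala
    import java.time.{LocalDate, LocalDateTime}
    
    // One file written with a Date column, another with TimestampNTZ.
    Seq(LocalDate.of(2024, 1, 1)).toDF("col")
      .write.format("avro").save("/tmp/widen/date_file")
    Seq(LocalDateTime.of(2024, 1, 2, 12, 30)).toDF("col")
      .write.format("avro").save("/tmp/widen/ntz_file")
    
    // With this change, one TimestampNTZ read schema covers both files;
    // previously, reading the Date file threw an error.
    spark.read.schema("col TIMESTAMP_NTZ").format("avro")
      .load("/tmp/widen/date_file", "/tmp/widen/ntz_file")
      .show()
    ```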
    
    Although [SPARK-49082](https://issues.apache.org/jira/browse/SPARK-49082)
    was resolved by https://github.com/apache/spark/pull/47582, that PR did not
    include Date -> TimestampNTZ widening. The change in this PR is very similar
    to https://github.com/apache/spark/pull/44368, which added support for
    Date -> TimestampNTZ widening in the Parquet reader.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, users will no longer see an error when attempting to read a file
    containing Date when the read schema contains TimestampNTZ. The time will be
    set to 00:00, as was done for the Parquet reader in
    https://github.com/apache/spark/pull/44368.
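    
    A minimal sketch of the resulting value, using the same helpers the new
    deserializer path calls (assumes Spark's internal
    `org.apache.spark.sql.catalyst.util.DateTimeUtils` is on the classpath):
    
    ```scala
    import java.time.{LocalDate, ZoneOffset}
    import org.apache.spark.sql.catalyst.util.DateTimeUtils
    
    val days = DateTimeUtils.localDateToDays(LocalDate.of(2024, 1, 1))
    // Microseconds since the epoch for 2024-01-01T00:00, with no
    // time-zone shift (NTZ semantics).
    val micros = DateTimeUtils.daysToMicros(days, ZoneOffset.UTC)
    ```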
    
    ### How was this patch tested?
    
    New test in `AvroSuite`.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #50315 from aldenlau-db/SPARK-49082.
    
    Authored-by: Alden Lau <[email protected]>
    Signed-off-by: Hyukjin Kwon <[email protected]>
---
 .../org/apache/spark/sql/avro/AvroSuite.scala      | 31 ++++++++++++++++++++++
 .../apache/spark/sql/avro/AvroDeserializer.scala   |  7 +++++
 2 files changed, 38 insertions(+)

diff --git a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
index 1029243c41ea..6f345e069ff7 100644
--- a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
+++ b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
@@ -21,6 +21,7 @@ import java.io._
 import java.net.URI
 import java.nio.file.{Files, Paths, StandardCopyOption}
 import java.sql.{Date, Timestamp}
+import java.time.{LocalDate, LocalDateTime}
 import java.util.UUID
 
 import scala.jdk.CollectionConverters._
@@ -963,6 +964,36 @@ abstract class AvroSuite
     }
   }
 
+  test("SPARK-49082: Widening date to timestampNTZ in AvroDeserializer") {
+    withTempPath { tempPath =>
+      // Since timestampNTZ only supports timestamps from
+      // -290308-12-21 BCE 19:59:06 to +294247-01-10 CE 04:00:54,
+      // dates outside of this range cannot be widened to timestampNTZ
+      // and will throw an ArithmeticException.
+      val datePath = s"$tempPath/date_data"
+      val dateDf =
+        Seq(LocalDate.of(2024, 1, 1),
+          LocalDate.of(2024, 1, 2),
+          LocalDate.of(1312, 2, 27),
+          LocalDate.of(0, 1, 1),
+          LocalDate.of(-1, 12, 31),
+          LocalDate.of(-290308, 12, 22), // minimum timestampNTZ date
+          LocalDate.of(294247, 1, 10)) // maximum timestampNTZ date
+        .toDF("col")
+      dateDf.write.format("avro").save(datePath)
+      checkAnswer(
+        spark.read.schema("col TIMESTAMP_NTZ").format("avro").load(datePath),
+        Seq(Row(LocalDateTime.of(2024, 1, 1, 0, 0)),
+          Row(LocalDateTime.of(2024, 1, 2, 0, 0)),
+          Row(LocalDateTime.of(1312, 2, 27, 0, 0)),
+          Row(LocalDateTime.of(0, 1, 1, 0, 0)),
+          Row(LocalDateTime.of(-1, 12, 31, 0, 0)),
+          Row(LocalDateTime.of(-290308, 12, 22, 0, 0)),
+          Row(LocalDateTime.of(294247, 1, 10, 0, 0)))
+      )
+    }
+  }
+
   test("SPARK-43380: Fix Avro data type conversion" +
     " of DayTimeIntervalType to avoid producing incorrect results") {
     withTempPath { path =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala b/sql/core/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
index ac20614553ca..f66b5bd988c2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.avro
 
 import java.math.BigDecimal
 import java.nio.ByteBuffer
+import java.time.ZoneOffset
 
 import scala.jdk.CollectionConverters._
 
@@ -159,6 +160,12 @@ private[sql] class AvroDeserializer(
       case (INT, DateType) => (updater, ordinal, value) =>
         updater.setInt(ordinal, dateRebaseFunc(value.asInstanceOf[Int]))
 
+      case (INT, TimestampNTZType) if avroType.getLogicalType.isInstanceOf[LogicalTypes.Date] =>
+        (updater, ordinal, value) =>
+          val days = dateRebaseFunc(value.asInstanceOf[Int])
+          val micros = DateTimeUtils.daysToMicros(days, ZoneOffset.UTC)
+          updater.setLong(ordinal, micros)
+
       case (LONG, dt: DatetimeType)
        if preventReadingIncorrectType && realDataType.isInstanceOf[DayTimeIntervalType] =>
        throw QueryCompilationErrors.avroIncompatibleReadError(toFieldStr(avroPath),


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
