This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 39ad69aea4fd [SPARK-49082][SQL] Support widening Date to TimestampNTZ
in Avro reader
39ad69aea4fd is described below
commit 39ad69aea4fdb661ab26b228964078a0c89adbda
Author: Alden Lau <[email protected]>
AuthorDate: Mon Mar 24 12:37:24 2025 +0900
[SPARK-49082][SQL] Support widening Date to TimestampNTZ in Avro reader
### What changes were proposed in this pull request?
This change adds support for widening type promotions from `Date` to
`TimestampNTZ` in `AvroDeserializer`. This PR is a follow-up to
https://github.com/apache/spark/pull/47582 which adds support for other
widening type promotions.
### Why are the changes needed?
When reading Avro files with a mix of Date and TimestampNTZ for a given
column, the reader should be able to read all files and promote Date to
TimestampNTZ instead of throwing an error when reading files with Date.
Although [SPARK-49082](https://issues.apache.org/jira/browse/SPARK-49082)
was resolved by https://github.com/apache/spark/pull/47582, that PR did not
include Date -> TimestampNTZ widening. The change in this PR is very similar to
https://github.com/apache/spark/pull/44368 which adds support for Date ->
TimestampNTZ widening for the Parquet reader.
### Does this PR introduce _any_ user-facing change?
Yes, users will no longer see an error when attempting to read a file
containing Date when the read schema contains TimestampNTZ. The time will be
set to 00:00, as has been done in https://github.com/apache/spark/pull/44368.
### How was this patch tested?
New test in `AvroSuite`.
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #50315 from aldenlau-db/SPARK-49082.
Authored-by: Alden Lau <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
---
.../org/apache/spark/sql/avro/AvroSuite.scala | 31 ++++++++++++++++++++++
.../apache/spark/sql/avro/AvroDeserializer.scala | 7 +++++
2 files changed, 38 insertions(+)
diff --git
a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
index 1029243c41ea..6f345e069ff7 100644
--- a/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
+++ b/connector/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
@@ -21,6 +21,7 @@ import java.io._
import java.net.URI
import java.nio.file.{Files, Paths, StandardCopyOption}
import java.sql.{Date, Timestamp}
+import java.time.{LocalDate, LocalDateTime}
import java.util.UUID
import scala.jdk.CollectionConverters._
@@ -963,6 +964,36 @@ abstract class AvroSuite
}
}
+ test("SPARK-49082: Widening date to timestampNTZ in AvroDeserializer") {
+ withTempPath { tempPath =>
+ // Since timestampNTZ only supports timestamps from
+ // -290308-12-21 BCE 19:59:06 to +294247-01-10 CE 04:00:54,
+ // dates outside of this range cannot be widened to timestampNTZ
+ // and will throw an ArithmeticException.
+ val datePath = s"$tempPath/date_data"
+ val dateDf =
+ Seq(LocalDate.of(2024, 1, 1),
+ LocalDate.of(2024, 1, 2),
+ LocalDate.of(1312, 2, 27),
+ LocalDate.of(0, 1, 1),
+ LocalDate.of(-1, 12, 31),
+ LocalDate.of(-290308, 12, 22), // minimum timestampNTZ date
+ LocalDate.of(294247, 1, 10)) // maximum timestampNTZ date
+ .toDF("col")
+ dateDf.write.format("avro").save(datePath)
+ checkAnswer(
+ spark.read.schema("col TIMESTAMP_NTZ").format("avro").load(datePath),
+ Seq(Row(LocalDateTime.of(2024, 1, 1, 0, 0)),
+ Row(LocalDateTime.of(2024, 1, 2, 0, 0)),
+ Row(LocalDateTime.of(1312, 2, 27, 0, 0)),
+ Row(LocalDateTime.of(0, 1, 1, 0, 0)),
+ Row(LocalDateTime.of(-1, 12, 31, 0, 0)),
+ Row(LocalDateTime.of(-290308, 12, 22, 0, 0)),
+ Row(LocalDateTime.of(294247, 1, 10, 0, 0)))
+ )
+ }
+ }
+
test("SPARK-43380: Fix Avro data type conversion" +
" of DayTimeIntervalType to avoid producing incorrect results") {
withTempPath { path =>
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
b/sql/core/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
index ac20614553ca..f66b5bd988c2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.avro
import java.math.BigDecimal
import java.nio.ByteBuffer
+import java.time.ZoneOffset
import scala.jdk.CollectionConverters._
@@ -159,6 +160,12 @@ private[sql] class AvroDeserializer(
case (INT, DateType) => (updater, ordinal, value) =>
updater.setInt(ordinal, dateRebaseFunc(value.asInstanceOf[Int]))
+ case (INT, TimestampNTZType) if
avroType.getLogicalType.isInstanceOf[LogicalTypes.Date] =>
+ (updater, ordinal, value) =>
+ val days = dateRebaseFunc(value.asInstanceOf[Int])
+ val micros = DateTimeUtils.daysToMicros(days, ZoneOffset.UTC)
+ updater.setLong(ordinal, micros)
+
case (LONG, dt: DatetimeType)
if preventReadingIncorrectType &&
realDataType.isInstanceOf[DayTimeIntervalType] =>
throw
QueryCompilationErrors.avroIncompatibleReadError(toFieldStr(avroPath),
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]