This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new db6247f [SPARK-31211][SQL] Fix rebasing of 29 February of Julian leap
years
db6247f is described below
commit db6247faa8780bca8f8d3ba71b568ea63b162973
Author: Maxim Gekk <[email protected]>
AuthorDate: Mon Mar 23 14:21:24 2020 +0800
[SPARK-31211][SQL] Fix rebasing of 29 February of Julian leap years
### What changes were proposed in this pull request?
In this PR, I propose to fix the rebasing of 29 February for years that are
leap years in the Julian calendar but not in the Proleptic Gregorian calendar.
In the Julian calendar, every fourth year is a leap year, with a leap day added
to the month of February. In the Proleptic Gregorian calendar, every year that
is exactly divisible by four is a leap year, except for years that are exactly
divisible by 100; these centurial years are leap years only if they are also
exactly divisible by 400. In this [...]
I modified `rebaseJulianToGregorianMicros()` and
`rebaseJulianToGregorianDays()` in `DateTimeUtils` to pass 1 as the day of
month when constructing the `LocalDate` or `LocalDateTime`, and then to add the
remaining days (the Julian day of month minus 1) using the `plusDays()` method.
For example, **1000-02-29** doesn't exist in the Proleptic Gregorian calendar,
and `LocalDate.of(1000, 2, 29)` throws an exception. To avoid the issue, I
build the `LocalDate.of(1000, 2, 1)` date and add 28 days. The `plusDays(28)`
method produ [...]
### Why are the changes needed?
Before the changes, a `java.time.DateTimeException` is raised
while loading the date `1000-02-29` from parquet files saved by Spark 2.4.5:
```scala
scala> spark.conf.set("spark.sql.legacy.parquet.rebaseDateTime.enabled",
true)
scala>
spark.read.parquet("/Users/maxim/tmp/before_1582/2_4_5_date_leap").show
20/03/21 03:03:59 ERROR Executor: Exception in task 0.0 in stage 3.0 (TID 3)
java.time.DateTimeException: Invalid date 'February 29' as '1000' is not a
leap year
```
The parquet files were saved via the commands:
```shell
$ export TZ="America/Los_Angeles"
```
```scala
scala> spark.conf.set("spark.sql.session.timeZone",
"America/Los_Angeles")
scala> val df =
Seq(java.sql.Date.valueOf("1000-02-29")).toDF("dateS").select($"dateS".as("date"))
df: org.apache.spark.sql.DataFrame = [date: date]
scala>
df.write.mode("overwrite").parquet("/Users/maxim/tmp/before_1582/2_4_5_date_leap")
scala>
spark.read.parquet("/Users/maxim/tmp/before_1582/2_4_5_date_leap").show
+----------+
| date|
+----------+
|1000-02-29|
+----------+
```
### Does this PR introduce any user-facing change?
Yes. After the fix, the Julian date `1000-02-29` is loaded as `1000-03-01`
instead of raising an exception:
```scala
scala> spark.conf.set("spark.sql.session.timeZone", "America/Los_Angeles")
scala> spark.conf.set("spark.sql.legacy.parquet.rebaseDateTime.enabled",
true)
scala>
spark.read.parquet("/Users/maxim/tmp/before_1582/2_4_5_date_leap").show
+----------+
| date|
+----------+
|1000-03-01|
+----------+
```
### How was this patch tested?
Added tests to `DateTimeUtilsSuite`.
Closes #27974 from MaxGekk/julian-date-29-feb.
Authored-by: Maxim Gekk <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../spark/sql/catalyst/util/DateTimeUtils.scala | 14 +++-
.../sql/catalyst/util/DateTimeUtilsSuite.scala | 82 +++++++++++++++++-----
2 files changed, 77 insertions(+), 19 deletions(-)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
index d672f4e..3a5f95b 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
@@ -977,11 +977,16 @@ object DateTimeUtils {
val localDateTime = LocalDateTime.of(
cal.get(Calendar.YEAR),
cal.get(Calendar.MONTH) + 1,
- cal.get(Calendar.DAY_OF_MONTH),
+ // The number of days will be added later to handle non-existing
+ // Julian dates in Proleptic Gregorian calendar.
+ // For example, 1000-02-29 exists in Julian calendar because 1000
+ // is a leap year but it is not a leap year in Gregorian calendar.
+ 1,
cal.get(Calendar.HOUR_OF_DAY),
cal.get(Calendar.MINUTE),
cal.get(Calendar.SECOND),
(Math.floorMod(micros, MICROS_PER_SECOND) * NANOS_PER_MICROS).toInt)
+ .plusDays(cal.get(Calendar.DAY_OF_MONTH) - 1)
instantToMicros(localDateTime.atZone(ZoneId.systemDefault).toInstant)
}
@@ -1005,7 +1010,12 @@ object DateTimeUtils {
val localDate = LocalDate.of(
utcCal.get(Calendar.YEAR),
utcCal.get(Calendar.MONTH) + 1,
- utcCal.get(Calendar.DAY_OF_MONTH))
+ // The number of days will be added later to handle non-existing
+ // Julian dates in Proleptic Gregorian calendar.
+ // For example, 1000-02-29 exists in Julian calendar because 1000
+ // is a leap year but it is not a leap year in Gregorian calendar.
+ 1)
+ .plusDays(utcCal.get(Calendar.DAY_OF_MONTH) - 1)
Math.toIntExact(localDate.toEpochDay)
}
diff --git
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala
index 39caa63..87bc2e1 100644
---
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala
+++
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala
@@ -671,6 +671,17 @@ class DateTimeUtilsSuite extends SparkFunSuite with
Matchers with SQLHelper {
}
}
+ private def parseToJulianMicros(s: String): Long = {
+ val ts = Timestamp.valueOf(s)
+ val julianMicros = millisToMicros(ts.getTime) +
+ ((ts.getNanos / NANOS_PER_MICROS) % MICROS_PER_MILLIS)
+ julianMicros
+ }
+
+ private def parseToGregMicros(s: String, zoneId: ZoneId): Long = {
+ instantToMicros(LocalDateTime.parse(s).atZone(zoneId).toInstant)
+ }
+
test("rebase julian to/from gregorian micros") {
outstandingTimezones.foreach { timeZone =>
withDefaultTimeZone(timeZone) {
@@ -685,30 +696,27 @@ class DateTimeUtilsSuite extends SparkFunSuite with
Matchers with SQLHelper {
"1970-01-01 00:00:00.000001", // The epoch day
"2020-03-14 09:33:01.500000").foreach { ts =>
withClue(s"time zone = ${timeZone.getID} ts = $ts") {
- val julianTs = Timestamp.valueOf(ts)
- val julianMicros = millisToMicros(julianTs.getTime) +
- ((julianTs.getNanos / NANOS_PER_MICROS) % MICROS_PER_MILLIS)
- val gregorianMicros =
instantToMicros(LocalDateTime.parse(ts.replace(' ', 'T'))
- .atZone(timeZone.toZoneId)
- .toInstant)
+ val julianMicros = parseToJulianMicros(ts)
+ val gregMicros = parseToGregMicros(ts.replace(' ', 'T'),
timeZone.toZoneId)
- assert(rebaseJulianToGregorianMicros(julianMicros) ===
gregorianMicros)
- assert(rebaseGregorianToJulianMicros(gregorianMicros) ===
julianMicros)
+ assert(rebaseJulianToGregorianMicros(julianMicros) === gregMicros)
+ assert(rebaseGregorianToJulianMicros(gregMicros) === julianMicros)
}
}
}
}
}
+ // millisToDays() and fromJavaDate() are taken from Spark 2.4
+ private def millisToDaysLegacy(millisUtc: Long, timeZone: TimeZone): Int = {
+ val millisLocal = millisUtc + timeZone.getOffset(millisUtc)
+ Math.floor(millisLocal.toDouble / MILLIS_PER_DAY).toInt
+ }
+ private def fromJavaDateLegacy(date: Date): Int = {
+ millisToDaysLegacy(date.getTime, defaultTimeZone())
+ }
+
test("rebase gregorian to/from julian days") {
- // millisToDays() and fromJavaDate() are taken from Spark 2.4
- def millisToDays(millisUtc: Long, timeZone: TimeZone): Int = {
- val millisLocal = millisUtc + timeZone.getOffset(millisUtc)
- Math.floor(millisLocal.toDouble / MILLIS_PER_DAY).toInt
- }
- def fromJavaDate(date: Date): Int = {
- millisToDays(date.getTime, defaultTimeZone())
- }
outstandingTimezones.foreach { timeZone =>
withDefaultTimeZone(timeZone) {
Seq(
@@ -721,7 +729,7 @@ class DateTimeUtilsSuite extends SparkFunSuite with
Matchers with SQLHelper {
"1969-12-31",
"1970-01-01", // The epoch day
"2020-03-14").foreach { date =>
- val julianDays = fromJavaDate(Date.valueOf(date))
+ val julianDays = fromJavaDateLegacy(Date.valueOf(date))
val gregorianDays = localDateToDays(LocalDate.parse(date))
assert(rebaseGregorianToJulianDays(gregorianDays) === julianDays)
@@ -730,4 +738,44 @@ class DateTimeUtilsSuite extends SparkFunSuite with
Matchers with SQLHelper {
}
}
}
+
+ test("rebase julian to gregorian date for leap years") {
+ outstandingTimezones.foreach { timeZone =>
+ withDefaultTimeZone(timeZone) {
+ Seq(
+ "1000-02-29" -> "1000-03-01",
+ "1600-02-29" -> "1600-02-29",
+ "1700-02-29" -> "1700-03-01",
+ "2000-02-29" -> "2000-02-29").foreach { case (julianDate, gregDate)
=>
+ withClue(s"tz = ${timeZone.getID} julian date = $julianDate greg
date = $gregDate") {
+ val date = Date.valueOf(julianDate)
+ val julianDays = fromJavaDateLegacy(date)
+ val gregorianDays = localDateToDays(LocalDate.parse(gregDate))
+
+ assert(rebaseJulianToGregorianDays(julianDays) === gregorianDays)
+ }
+ }
+ }
+ }
+ }
+
+ test("rebase julian to gregorian timestamp for leap years") {
+ outstandingTimezones.foreach { timeZone =>
+ withDefaultTimeZone(timeZone) {
+ Seq(
+ "1000-02-29 01:02:03.123456" -> "1000-03-01T01:02:03.123456",
+ "1600-02-29 11:12:13.654321" -> "1600-02-29T11:12:13.654321",
+ "1700-02-29 21:22:23.000001" -> "1700-03-01T21:22:23.000001",
+ "2000-02-29 00:00:00.999999" -> "2000-02-29T00:00:00.999999"
+ ).foreach { case (julianTs, gregTs) =>
+ withClue(s"tz = ${timeZone.getID} julian ts = $julianTs greg ts =
$gregTs") {
+ val julianMicros = parseToJulianMicros(julianTs)
+ val gregorianMicros = parseToGregMicros(gregTs, timeZone.toZoneId)
+
+ assert(rebaseJulianToGregorianMicros(julianMicros) ===
gregorianMicros)
+ }
+ }
+ }
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]