This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 028091f [SPARK-31183][SQL] Rebase date/timestamp from/to Julian calendar in Avro
028091f is described below
commit 028091fea4bfb3bc1a8ae114fd01be2c9bfcb82d
Author: Maxim Gekk <[email protected]>
AuthorDate: Fri Mar 20 13:57:49 2020 +0800
[SPARK-31183][SQL] Rebase date/timestamp from/to Julian calendar in Avro
The PR addresses the issue of compatibility with Spark 2.4 and earlier
versions in reading/writing dates and timestamps via the **Avro** datasource.
Previous releases are based on a hybrid calendar (Julian + Gregorian). Since
Spark 3.0, the Proleptic Gregorian calendar is used by default, see SPARK-26651.
In particular, the issue pops up for dates/timestamps before 1582-10-15, when
the hybrid calendar switches between the Julian and Gregorian calendars. The
same local date in different calendars is converted to a different number of
days since the epoch 1970-01-01. For example, the date 0001-01-01 is converted
to (see the snippet after this list):
- -719164 in the Julian calendar. Spark 2.4 saves this number as a value of
DATE type into **Avro** files.
- -719162 in the Proleptic Gregorian calendar. Spark 3.0 saves it as a date
value.
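The gap is reproducible with the JDK alone, since `java.time` implements the
Proleptic Gregorian calendar while `java.util.Calendar` implements the hybrid
one (a quick illustration, not Spark code):
```scala
import java.time.LocalDate
import java.util.{Calendar, TimeZone}

val millisPerDay = 24L * 60 * 60 * 1000

// Proleptic Gregorian (java.time): epoch day of 0001-01-01
LocalDate.of(1, 1, 1).toEpochDay                // -719162

// Hybrid Julian + Gregorian (java.util.Calendar): the same local date
val cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"))
cal.clear()
cal.set(1, Calendar.JANUARY, 1)                 // 0001-01-01
cal.getTimeInMillis / millisPerDay              // -719164
```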
The PR proposes rebasing between the Proleptic Gregorian calendar and the
hybrid one under the SQL config:
```
spark.sql.legacy.avro.rebaseDateTime.enabled
```
which is set to `false` by default, meaning the rebasing is not performed by
default.
The details of the implementation (a sketch of the rebasing follows the list):
1. Re-use 2 methods of `DateTimeUtils` added by the PR
https://github.com/apache/spark/pull/27915 for rebasing microseconds.
2. Re-use 2 methods of `DateTimeUtils` added by the PR
https://github.com/apache/spark/pull/27915 for rebasing days.
3. Use `rebaseGregorianToJulianMicros()` and
`rebaseGregorianToJulianDays()` while saving timestamps/dates to **Avro** files
if the SQL config is on.
4. Use `rebaseJulianToGregorianMicros()` and
`rebaseJulianToGregorianDays()` while loading timestamps/dates from **Avro**
files if the SQL config is on.
5. The SQL config `spark.sql.legacy.avro.rebaseDateTime.enabled` controls
conversions of dates and of timestamps with the `timestamp-millis` and
`timestamp-micros` logical types.
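As a rough sketch of what the rebasing does (not the actual `DateTimeUtils`
code; it assumes the default hybrid `GregorianCalendar` and ignores BC dates
and other corner cases), the read-side day rebasing can be illustrated as:
```scala
import java.time.LocalDate
import java.util.{Calendar, TimeZone}

def rebaseJulianToGregorianDays(days: Int): Int = {
  val millisPerDay = 24L * 60 * 60 * 1000
  // 1. Interpret `days` since 1970-01-01 as a local date in the hybrid calendar.
  val hybrid = Calendar.getInstance(TimeZone.getTimeZone("UTC"))
  hybrid.setTimeInMillis(days * millisPerDay)
  // 2. Re-assemble the same year/month/day in Proleptic Gregorian (java.time)
  //    and count the days since the epoch again.
  LocalDate.of(
    hybrid.get(Calendar.YEAR),
    hybrid.get(Calendar.MONTH) + 1, // Calendar months are 0-based
    hybrid.get(Calendar.DAY_OF_MONTH)
  ).toEpochDay.toInt
}

rebaseJulianToGregorianDays(-719164) // -719162, i.e. 0001-01-01 in both calendars
```
The write-side `rebaseGregorianToJulianDays()` goes in the opposite direction:
decompose the epoch day with `java.time` and re-assemble it with
`java.util.Calendar`.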
For backward compatibility with Spark 2.4 and earlier versions, the changes
allow users to read dates/timestamps saved by previous versions and get the
same results. Also, after the changes, users can enable the rebasing in write
and save dates/timestamps that can be loaded correctly by Spark 2.4 and
earlier versions.
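For timestamps the rebasing follows the same idea, but the local date-time is
resolved in a time zone. A minimal sketch of the write-side micros rebasing,
assuming the JVM default time zone matches `spark.sql.session.timeZone` (again,
not the real `DateTimeUtils` code):
```scala
import java.sql.Timestamp
import java.time.{Instant, LocalDateTime, ZoneId}

def rebaseGregorianToJulianMicros(micros: Long): Long = {
  // 1. Interpret `micros` since the epoch as a local date-time
  //    in the Proleptic Gregorian calendar (java.time).
  val instant = Instant.ofEpochSecond(
    Math.floorDiv(micros, 1000000L),
    Math.floorMod(micros, 1000000L) * 1000)
  val ldt = LocalDateTime.ofInstant(instant, ZoneId.systemDefault())
  // 2. Re-interpret the same wall-clock fields in the hybrid calendar
  //    (java.sql.Timestamp is backed by the hybrid GregorianCalendar).
  val ts = Timestamp.valueOf(ldt)
  // 3. Count microseconds since the epoch again.
  Math.floorDiv(ts.getTime, 1000L) * 1000000L + ts.getNanos / 1000
}
```
The read path (`rebaseJulianToGregorianMicros()`) is the inverse: decompose the
instant with the hybrid calendar and re-assemble it with `java.time`.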
Yes. For example, the date `1001-01-01` and the timestamp
`1001-01-01 01:02:03.123456` saved by Spark 2.4.5 (the latter as
`timestamp-micros`) are interpreted by Spark 3.0.0-preview2 differently.
Reading the date back:
```scala
scala> spark.conf.set("spark.sql.session.timeZone", "America/Los_Angeles")
scala> spark.read.format("avro").load("/Users/maxim/tmp/before_1582/2_4_5_date_avro").show(false)
+----------+
|date |
+----------+
|1001-01-07|
+----------+
```
After the changes:
```scala
scala> spark.conf.set("spark.sql.legacy.avro.rebaseDateTime.enabled", true)
scala> spark.conf.set("spark.sql.session.timeZone", "America/Los_Angeles")
scala> spark.read.format("avro").load("/Users/maxim/tmp/before_1582/2_4_5_date_avro").show(false)
+----------+
|date |
+----------+
|1001-01-01|
+----------+
```
1. Added tests to `AvroLogicalTypeSuite` to check rebasing in read. The tests
read back Avro files saved by Spark 2.4.5 via:
```shell
$ export TZ="America/Los_Angeles"
```
```scala
scala> spark.conf.set("spark.sql.session.timeZone", "America/Los_Angeles")
scala> val df = Seq("1001-01-01").toDF("dateS").select($"dateS".cast("date").as("date"))
df: org.apache.spark.sql.DataFrame = [date: date]
scala> df.write.format("avro").save("/Users/maxim/tmp/before_1582/2_4_5_date_avro")
scala> val df2 = Seq("1001-01-01 01:02:03.123456").toDF("tsS").select($"tsS".cast("timestamp").as("ts"))
df2: org.apache.spark.sql.DataFrame = [ts: timestamp]
scala> df2.write.format("avro").save("/Users/maxim/tmp/before_1582/2_4_5_ts_avro")
scala> :paste
// Entering paste mode (ctrl-D to finish)
val timestampSchema = s"""
| {
| "namespace": "logical",
| "type": "record",
| "name": "test",
| "fields": [
| {"name": "ts", "type": ["null", {"type": "long","logicalType":
"timestamp-millis"}], "default": null}
| ]
| }
|""".stripMargin
// Exiting paste mode, now interpreting.
scala> df3.write.format("avro").option("avroSchema", timestampSchema).save("/Users/maxim/tmp/before_1582/2_4_5_ts_millis_avro")
```
2. Added the following tests to `AvroLogicalTypeSuite` to check rebasing of
dates/timestamps (in microsecond and millisecond precision). The tests write
rebased dates/timestamps, read them back with rebasing enabled and disabled,
and compare the results:
- `rebasing microseconds timestamps in write`
- `rebasing milliseconds timestamps in write`
- `rebasing dates in write`
Closes #27953 from MaxGekk/rebase-avro-datetime.
Authored-by: Maxim Gekk <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit 4766a3664729644b5391b13805cdee44501025d8)
Signed-off-by: Wenchen Fan <[email protected]>
---
.../apache/spark/sql/avro/AvroDeserializer.scala | 34 +++++--
.../org/apache/spark/sql/avro/AvroSerializer.scala | 26 ++++--
.../src/test/resources/before_1582_date_v2_4.avro | Bin 0 -> 202 bytes
.../test/resources/before_1582_ts_micros_v2_4.avro | Bin 0 -> 218 bytes
.../test/resources/before_1582_ts_millis_v2_4.avro | Bin 0 -> 244 bytes
.../spark/sql/avro/AvroLogicalTypeSuite.scala | 98 ++++++++++++++++++++-
.../org/apache/spark/sql/internal/SQLConf.scala | 15 ++++
7 files changed, 158 insertions(+), 15 deletions(-)
diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
index 2c17c16..b98f303 100644
--- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
+++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
@@ -32,8 +32,9 @@ import org.apache.avro.util.Utf8
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{SpecificInternalRow, UnsafeArrayData}
-import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, GenericArrayData}
+import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, DateTimeUtils, GenericArrayData}
import org.apache.spark.sql.catalyst.util.DateTimeConstants.MILLIS_PER_DAY
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
/**
@@ -42,6 +43,9 @@ import org.apache.spark.unsafe.types.UTF8String
class AvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType) {
private lazy val decimalConversions = new DecimalConversion()
+ // Enable rebasing date/timestamp from Julian to Proleptic Gregorian calendar
+ private val rebaseDateTime = SQLConf.get.avroRebaseDateTimeEnabled
+
private val converter: Any => Any = rootCatalystType match {
// A shortcut for empty schema.
case st: StructType if st.isEmpty =>
@@ -88,6 +92,11 @@ class AvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType) {
case (INT, IntegerType) => (updater, ordinal, value) =>
updater.setInt(ordinal, value.asInstanceOf[Int])
+ case (INT, DateType) if rebaseDateTime => (updater, ordinal, value) =>
+ val days = value.asInstanceOf[Int]
+ val rebasedDays = DateTimeUtils.rebaseJulianToGregorianDays(days)
+ updater.setInt(ordinal, rebasedDays)
+
case (INT, DateType) => (updater, ordinal, value) =>
updater.setInt(ordinal, value.asInstanceOf[Int])
@@ -95,14 +104,23 @@ class AvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType) {
updater.setLong(ordinal, value.asInstanceOf[Long])
case (LONG, TimestampType) => avroType.getLogicalType match {
- case _: TimestampMillis => (updater, ordinal, value) =>
- updater.setLong(ordinal, value.asInstanceOf[Long] * 1000)
+ // For backward compatibility, if the Avro type is Long and it is not logical type
+ // (the `null` case), the value is processed as timestamp type with millisecond precision.
+ case null | _: TimestampMillis => (updater, ordinal, value) =>
+ val millis = value.asInstanceOf[Long]
+ val micros = DateTimeUtils.fromMillis(millis)
+ if (rebaseDateTime) {
+ updater.setLong(ordinal, DateTimeUtils.rebaseJulianToGregorianMicros(micros))
+ } else {
+ updater.setLong(ordinal, micros)
+ }
case _: TimestampMicros => (updater, ordinal, value) =>
- updater.setLong(ordinal, value.asInstanceOf[Long])
- case null => (updater, ordinal, value) =>
- // For backward compatibility, if the Avro type is Long and it is not logical type,
- // the value is processed as timestamp type with millisecond precision.
- updater.setLong(ordinal, value.asInstanceOf[Long] * 1000)
+ val micros = value.asInstanceOf[Long]
+ if (rebaseDateTime) {
+ updater.setLong(ordinal, DateTimeUtils.rebaseJulianToGregorianMicros(micros))
+ } else {
+ updater.setLong(ordinal, micros)
+ }
case other => throw new IncompatibleSchemaException(
s"Cannot convert Avro logical type ${other} to Catalyst Timestamp
type.")
}
diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
index b7bf7e5..af9e3a5 100644
--- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
+++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
@@ -34,6 +34,8 @@ import org.apache.avro.util.Utf8
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{SpecializedGetters, SpecificInternalRow}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
/**
@@ -42,6 +44,9 @@ import org.apache.spark.sql.types._
class AvroSerializer(rootCatalystType: DataType, rootAvroType: Schema, nullable: Boolean)
extends Logging {
+ // Whether to rebase datetimes from Gregorian to Julian calendar in write
+ private val rebaseDateTime: Boolean = SQLConf.get.avroRebaseDateTimeEnabled
+
def serialize(catalystData: Any): Any = {
converter.apply(catalystData)
}
@@ -135,15 +140,26 @@ class AvroSerializer(rootCatalystType: DataType, rootAvroType: Schema, nullable:
case (BinaryType, BYTES) =>
(getter, ordinal) => ByteBuffer.wrap(getter.getBinary(ordinal))
+ case (DateType, INT) if rebaseDateTime =>
+ (getter, ordinal) => DateTimeUtils.rebaseGregorianToJulianDays(getter.getInt(ordinal))
+
case (DateType, INT) =>
(getter, ordinal) => getter.getInt(ordinal)
case (TimestampType, LONG) => avroType.getLogicalType match {
- case _: TimestampMillis => (getter, ordinal) => getter.getLong(ordinal) / 1000
- case _: TimestampMicros => (getter, ordinal) => getter.getLong(ordinal)
- // For backward compatibility, if the Avro type is Long and it is not logical type,
- // output the timestamp value as with millisecond precision.
- case null => (getter, ordinal) => getter.getLong(ordinal) / 1000
+ // For backward compatibility, if the Avro type is Long and it is not logical type
+ // (the `null` case), output the timestamp value as with millisecond precision.
+ case null | _: TimestampMillis => (getter, ordinal) =>
+ val micros = getter.getLong(ordinal)
+ val rebasedMicros = if (rebaseDateTime) {
+ DateTimeUtils.rebaseGregorianToJulianMicros(micros)
+ } else micros
+ DateTimeUtils.toMillis(rebasedMicros)
+ case _: TimestampMicros => (getter, ordinal) =>
+ val micros = getter.getLong(ordinal)
+ if (rebaseDateTime) {
+ DateTimeUtils.rebaseGregorianToJulianMicros(micros)
+ } else micros
case other => throw new IncompatibleSchemaException(
s"Cannot convert Catalyst Timestamp type to Avro logical type
${other}")
}
diff --git a/external/avro/src/test/resources/before_1582_date_v2_4.avro b/external/avro/src/test/resources/before_1582_date_v2_4.avro
new file mode 100644
index 0000000..96aa7cb
Binary files /dev/null and b/external/avro/src/test/resources/before_1582_date_v2_4.avro differ
diff --git a/external/avro/src/test/resources/before_1582_ts_micros_v2_4.avro b/external/avro/src/test/resources/before_1582_ts_micros_v2_4.avro
new file mode 100644
index 0000000..efe5e71
Binary files /dev/null and b/external/avro/src/test/resources/before_1582_ts_micros_v2_4.avro differ
diff --git a/external/avro/src/test/resources/before_1582_ts_millis_v2_4.avro b/external/avro/src/test/resources/before_1582_ts_millis_v2_4.avro
new file mode 100644
index 0000000..dbaec81
Binary files /dev/null and b/external/avro/src/test/resources/before_1582_ts_millis_v2_4.avro differ
diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroLogicalTypeSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroLogicalTypeSuite.scala
index 8256965..9e89b69 100644
--- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroLogicalTypeSuite.scala
+++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroLogicalTypeSuite.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql.avro
import java.io.File
-import java.sql.Timestamp
+import java.sql.{Date, Timestamp}
import org.apache.avro.{LogicalTypes, Schema}
import org.apache.avro.Conversions.DecimalConversion
@@ -25,7 +25,7 @@ import org.apache.avro.file.DataFileWriter
import org.apache.avro.generic.{GenericData, GenericDatumWriter, GenericRecord}
import org.apache.spark.{SparkConf, SparkException}
-import org.apache.spark.sql.{QueryTest, Row}
+import org.apache.spark.sql.{DataFrame, QueryTest, Row}
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession
@@ -348,6 +348,100 @@ abstract class AvroLogicalTypeSuite extends QueryTest with SharedSparkSession {
assert(msg.contains("Unscaled value too large for precision"))
}
}
+
+ private def readResourceAvroFile(name: String): DataFrame = {
+ val url = Thread.currentThread().getContextClassLoader.getResource(name)
+ spark.read.format("avro").load(url.toString)
+ }
+
+ test("SPARK-31183: compatibility with Spark 2.4 in reading
dates/timestamps") {
+ withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "true") {
+ checkAnswer(
+ readResourceAvroFile("before_1582_date_v2_4.avro"),
+ Row(java.sql.Date.valueOf("1001-01-01")))
+ checkAnswer(
+ readResourceAvroFile("before_1582_ts_micros_v2_4.avro"),
+ Row(java.sql.Timestamp.valueOf("1001-01-01 01:02:03.123456")))
+ checkAnswer(
+ readResourceAvroFile("before_1582_ts_millis_v2_4.avro"),
+ Row(java.sql.Timestamp.valueOf("1001-01-01 01:02:03.124")))
+ }
+ }
+
+ test("SPARK-31183: rebasing microseconds timestamps in write") {
+ val tsStr = "1001-01-01 01:02:03.123456"
+ val nonRebased = "1001-01-07 01:09:05.123456"
+ withTempPath { dir =>
+ val path = dir.getAbsolutePath
+ withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "true") {
+ Seq(tsStr).toDF("tsS")
+ .select($"tsS".cast("timestamp").as("ts"))
+ .write.format("avro")
+ .save(path)
+
+ checkAnswer(spark.read.format("avro").load(path), Row(Timestamp.valueOf(tsStr)))
+ }
+ withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "false") {
+ checkAnswer(spark.read.format("avro").load(path), Row(Timestamp.valueOf(nonRebased)))
+ }
+ }
+ }
+
+ test("SPARK-31183: rebasing milliseconds timestamps in write") {
+ val tsStr = "1001-01-01 01:02:03.123456"
+ val rebased = "1001-01-01 01:02:03.123"
+ val nonRebased = "1001-01-07 01:09:05.123"
+ Seq(
+ """{"type": "long","logicalType": "timestamp-millis"}""",
+ """"long"""").foreach { tsType =>
+ val timestampSchema = s"""
+ |{
+ | "namespace": "logical",
+ | "type": "record",
+ | "name": "test",
+ | "fields": [
+ | {"name": "ts", "type": $tsType}
+ | ]
+ |}""".stripMargin
+ withTempPath { dir =>
+ val path = dir.getAbsolutePath
+ withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "true") {
+ Seq(tsStr).toDF("tsS")
+ .select($"tsS".cast("timestamp").as("ts"))
+ .write
+ .option("avroSchema", timestampSchema)
+ .format("avro")
+ .save(path)
+
+ checkAnswer(
+ spark.read.schema("ts timestamp").format("avro").load(path),
+ Row(Timestamp.valueOf(rebased)))
+ }
+ withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "false") {
+ checkAnswer(
+ spark.read.schema("ts timestamp").format("avro").load(path),
+ Row(Timestamp.valueOf(nonRebased)))
+ }
+ }
+ }
+ }
+
+ test("SPARK-31183: rebasing dates in write") {
+ withTempPath { dir =>
+ val path = dir.getAbsolutePath
+ withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "true") {
+ Seq("1001-01-01").toDF("dateS")
+ .select($"dateS".cast("date").as("date"))
+ .write.format("avro")
+ .save(path)
+
+ checkAnswer(spark.read.format("avro").load(path), Row(Date.valueOf("1001-01-01")))
+ }
+ withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "false") {
+ checkAnswer(spark.read.format("avro").load(path), Row(Date.valueOf("1001-01-07")))
+ }
+ }
+ }
}
class AvroV1LogicalTypeSuite extends AvroLogicalTypeSuite {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index a47f7d9..9681a5d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -2245,6 +2245,19 @@ object SQLConf {
.booleanConf
.createWithDefault(false)
+ val LEGACY_AVRO_REBASE_DATETIME =
+ buildConf("spark.sql.legacy.avro.rebaseDateTime.enabled")
+ .internal()
+ .doc("When true, rebase dates/timestamps from Proleptic Gregorian
calendar " +
+ "to the hybrid calendar (Julian + Gregorian) in write and " +
+ "from the hybrid calendar to Proleptic Gregorian calendar in read. " +
+ "The rebasing is performed by converting micros/millis/days to " +
+ "a local date/timestamp in the source calendar, interpreting the
resulted date/" +
+ "timestamp in the target calendar, and getting the number of
micros/millis/days " +
+ "since the epoch 1970-01-01 00:00:00Z.")
+ .booleanConf
+ .createWithDefault(false)
+
/**
* Holds information about keys that have been deprecated.
*
@@ -2822,6 +2835,8 @@ class SQLConf extends Serializable with Logging {
def parquetRebaseDateTimeEnabled: Boolean = getConf(SQLConf.LEGACY_PARQUET_REBASE_DATETIME)
+ def avroRebaseDateTimeEnabled: Boolean = getConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME)
+
/** ********************** SQLConf functionality methods ************ */
/** Set Spark SQL configuration properties. */