This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 4bc4af3  [SPARK-31183][SQL][FOLLOWUP][3.0] Move rebase tests to `AvroSuite` and check the rebase flag out of function bodies
4bc4af3 is described below
commit 4bc4af31534deccb0d12c5b5b84900f720c31381
Author: Maxim Gekk <[email protected]>
AuthorDate: Mon Mar 23 13:09:14 2020 +0900
[SPARK-31183][SQL][FOLLOWUP][3.0] Move rebase tests to `AvroSuite` and check the rebase flag out of function bodies
### What changes were proposed in this pull request?
1. The tests added by #27953 are moved from `AvroLogicalTypeSuite` to `AvroSuite`.
2. The check of the `rebaseDateTime` flag is moved out of the function bodies.
This is a backport of https://github.com/apache/spark/pull/27964
### Why are the changes needed?
1. The tests are moved because they are not directly related to logical types.
2. Checking the flag outside of the function bodies should improve performance; see the sketch below.
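For illustration only, here is a minimal, self-contained sketch of the pattern this patch applies (hypothetical names, not the actual `AvroDeserializer` code): the flag is consulted once while the converter closure is built, so the per-record hot path no longer branches on it.

```scala
// Hypothetical sketch of hoisting a flag check out of a per-record closure.
// `fromMillis` and the rebase function stand in for Spark's DateTimeUtils
// helpers; their bodies here are placeholders.
object RebaseSketch {
  def fromMillis(millis: Long): Long = millis * 1000L
  def rebaseJulianToGregorianMicros(micros: Long): Long = micros // placeholder

  type TimestampWriter = Long => Long

  // Before: the flag is re-checked on every record.
  def writerBefore(rebaseDateTime: Boolean): TimestampWriter = { millis =>
    val micros = fromMillis(millis)
    if (rebaseDateTime) rebaseJulianToGregorianMicros(micros) else micros
  }

  // After: the flag selects the closure once; each call is branch-free.
  def writerAfter(rebaseDateTime: Boolean): TimestampWriter =
    if (rebaseDateTime) {
      millis => rebaseJulianToGregorianMicros(fromMillis(millis))
    } else {
      millis => fromMillis(millis)
    }

  def main(args: Array[String]): Unit = {
    val write = writerAfter(rebaseDateTime = false)
    println(write(1000L)) // prints 1000000 (microseconds)
  }
}
```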
### Does this PR introduce any user-facing change?
No
### How was this patch tested?
By running Avro tests via the command `build/sbt avro/test`
Closes #27977 from MaxGekk/rebase-avro-datetime-followup-3.0.
Authored-by: Maxim Gekk <[email protected]>
Signed-off-by: HyukjinKwon <[email protected]>
---
.../apache/spark/sql/avro/AvroDeserializer.scala | 21 ++--
.../org/apache/spark/sql/avro/AvroSerializer.scala | 16 ++-
.../spark/sql/avro/AvroLogicalTypeSuite.scala | 98 +---------------
.../org/apache/spark/sql/avro/AvroSuite.scala | 124 ++++++++++++++++++---
4 files changed, 130 insertions(+), 129 deletions(-)
diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
index b98f303..3e8a7f9 100644
--- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
+++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
@@ -106,21 +106,22 @@ class AvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType) {
case (LONG, TimestampType) => avroType.getLogicalType match {
      // For backward compatibility, if the Avro type is Long and it is not logical type
      // (the `null` case), the value is processed as timestamp type with millisecond precision.
+      case null | _: TimestampMillis if rebaseDateTime => (updater, ordinal, value) =>
+        val millis = value.asInstanceOf[Long]
+        val micros = DateTimeUtils.fromMillis(millis)
+        val rebasedMicros = DateTimeUtils.rebaseJulianToGregorianMicros(micros)
+        updater.setLong(ordinal, rebasedMicros)
case null | _: TimestampMillis => (updater, ordinal, value) =>
val millis = value.asInstanceOf[Long]
val micros = DateTimeUtils.fromMillis(millis)
- if (rebaseDateTime) {
-          updater.setLong(ordinal, DateTimeUtils.rebaseJulianToGregorianMicros(micros))
- } else {
- updater.setLong(ordinal, micros)
- }
+ updater.setLong(ordinal, micros)
+      case _: TimestampMicros if rebaseDateTime => (updater, ordinal, value) =>
+        val micros = value.asInstanceOf[Long]
+        val rebasedMicros = DateTimeUtils.rebaseJulianToGregorianMicros(micros)
+        updater.setLong(ordinal, rebasedMicros)
case _: TimestampMicros => (updater, ordinal, value) =>
val micros = value.asInstanceOf[Long]
- if (rebaseDateTime) {
-          updater.setLong(ordinal, DateTimeUtils.rebaseJulianToGregorianMicros(micros))
- } else {
- updater.setLong(ordinal, micros)
- }
+ updater.setLong(ordinal, micros)
case other => throw new IncompatibleSchemaException(
s"Cannot convert Avro logical type ${other} to Catalyst Timestamp
type.")
}
diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
index af9e3a5..b6e2a2b 100644
--- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
+++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
@@ -149,17 +149,15 @@ class AvroSerializer(rootCatalystType: DataType, rootAvroType: Schema, nullable:
case (TimestampType, LONG) => avroType.getLogicalType match {
        // For backward compatibility, if the Avro type is Long and it is not logical type
        // (the `null` case), output the timestamp value as with millisecond precision.
- case null | _: TimestampMillis => (getter, ordinal) =>
+        case null | _: TimestampMillis if rebaseDateTime => (getter, ordinal) =>
val micros = getter.getLong(ordinal)
- val rebasedMicros = if (rebaseDateTime) {
- DateTimeUtils.rebaseGregorianToJulianMicros(micros)
- } else micros
+          val rebasedMicros = DateTimeUtils.rebaseGregorianToJulianMicros(micros)
DateTimeUtils.toMillis(rebasedMicros)
- case _: TimestampMicros => (getter, ordinal) =>
- val micros = getter.getLong(ordinal)
- if (rebaseDateTime) {
- DateTimeUtils.rebaseGregorianToJulianMicros(micros)
- } else micros
+ case null | _: TimestampMillis => (getter, ordinal) =>
+ DateTimeUtils.toMillis(getter.getLong(ordinal))
+ case _: TimestampMicros if rebaseDateTime => (getter, ordinal) =>
+          DateTimeUtils.rebaseGregorianToJulianMicros(getter.getLong(ordinal))
+ case _: TimestampMicros => (getter, ordinal) =>
getter.getLong(ordinal)
case other => throw new IncompatibleSchemaException(
s"Cannot convert Catalyst Timestamp type to Avro logical type
${other}")
}
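A note on why the reordered matches above are safe: Scala tries `case` clauses top to bottom, so the `if rebaseDateTime`-guarded alternatives must come first, leaving their unguarded twins as the fall-through when the flag is off. A minimal standalone sketch of this guard ordering (hypothetical names, not Spark code):

```scala
// Hypothetical sketch of guard ordering in a Scala match: the guarded
// case is tried first; the unguarded twin below is the fall-through.
def pickConverter(logicalType: String, rebase: Boolean): Long => Long =
  logicalType match {
    case "timestamp-micros" if rebase => micros => micros - 1L // pretend rebase
    case "timestamp-micros"           => micros => micros
    case other => throw new IllegalArgumentException(s"Unsupported: $other")
  }

// pickConverter("timestamp-micros", rebase = true)(10L)  // == 9L
// pickConverter("timestamp-micros", rebase = false)(10L) // == 10L
```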
diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroLogicalTypeSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroLogicalTypeSuite.scala
index 9e89b69..8256965 100644
--- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroLogicalTypeSuite.scala
+++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroLogicalTypeSuite.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql.avro
import java.io.File
-import java.sql.{Date, Timestamp}
+import java.sql.Timestamp
import org.apache.avro.{LogicalTypes, Schema}
import org.apache.avro.Conversions.DecimalConversion
@@ -25,7 +25,7 @@ import org.apache.avro.file.DataFileWriter
import org.apache.avro.generic.{GenericData, GenericDatumWriter, GenericRecord}
import org.apache.spark.{SparkConf, SparkException}
-import org.apache.spark.sql.{DataFrame, QueryTest, Row}
+import org.apache.spark.sql.{QueryTest, Row}
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession
@@ -348,100 +348,6 @@ abstract class AvroLogicalTypeSuite extends QueryTest with SharedSparkSession {
assert(msg.contains("Unscaled value too large for precision"))
}
}
-
- private def readResourceAvroFile(name: String): DataFrame = {
- val url = Thread.currentThread().getContextClassLoader.getResource(name)
- spark.read.format("avro").load(url.toString)
- }
-
- test("SPARK-31183: compatibility with Spark 2.4 in reading
dates/timestamps") {
- withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "true") {
- checkAnswer(
- readResourceAvroFile("before_1582_date_v2_4.avro"),
- Row(java.sql.Date.valueOf("1001-01-01")))
- checkAnswer(
- readResourceAvroFile("before_1582_ts_micros_v2_4.avro"),
- Row(java.sql.Timestamp.valueOf("1001-01-01 01:02:03.123456")))
- checkAnswer(
- readResourceAvroFile("before_1582_ts_millis_v2_4.avro"),
- Row(java.sql.Timestamp.valueOf("1001-01-01 01:02:03.124")))
- }
- }
-
- test("SPARK-31183: rebasing microseconds timestamps in write") {
- val tsStr = "1001-01-01 01:02:03.123456"
- val nonRebased = "1001-01-07 01:09:05.123456"
- withTempPath { dir =>
- val path = dir.getAbsolutePath
- withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "true") {
- Seq(tsStr).toDF("tsS")
- .select($"tsS".cast("timestamp").as("ts"))
- .write.format("avro")
- .save(path)
-
-        checkAnswer(spark.read.format("avro").load(path), Row(Timestamp.valueOf(tsStr)))
- }
- withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "false") {
-        checkAnswer(spark.read.format("avro").load(path), Row(Timestamp.valueOf(nonRebased)))
- }
- }
- }
-
- test("SPARK-31183: rebasing milliseconds timestamps in write") {
- val tsStr = "1001-01-01 01:02:03.123456"
- val rebased = "1001-01-01 01:02:03.123"
- val nonRebased = "1001-01-07 01:09:05.123"
- Seq(
- """{"type": "long","logicalType": "timestamp-millis"}""",
- """"long"""").foreach { tsType =>
- val timestampSchema = s"""
- |{
- | "namespace": "logical",
- | "type": "record",
- | "name": "test",
- | "fields": [
- | {"name": "ts", "type": $tsType}
- | ]
- |}""".stripMargin
- withTempPath { dir =>
- val path = dir.getAbsolutePath
- withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "true") {
- Seq(tsStr).toDF("tsS")
- .select($"tsS".cast("timestamp").as("ts"))
- .write
- .option("avroSchema", timestampSchema)
- .format("avro")
- .save(path)
-
- checkAnswer(
- spark.read.schema("ts timestamp").format("avro").load(path),
- Row(Timestamp.valueOf(rebased)))
- }
- withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "false") {
- checkAnswer(
- spark.read.schema("ts timestamp").format("avro").load(path),
- Row(Timestamp.valueOf(nonRebased)))
- }
- }
- }
- }
-
- test("SPARK-31183: rebasing dates in write") {
- withTempPath { dir =>
- val path = dir.getAbsolutePath
- withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "true") {
- Seq("1001-01-01").toDF("dateS")
- .select($"dateS".cast("date").as("date"))
- .write.format("avro")
- .save(path)
-
-        checkAnswer(spark.read.format("avro").load(path), Row(Date.valueOf("1001-01-01")))
- }
- withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "false") {
-        checkAnswer(spark.read.format("avro").load(path), Row(Date.valueOf("1001-01-07")))
- }
- }
- }
}
class AvroV1LogicalTypeSuite extends AvroLogicalTypeSuite {
diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
index 360160c..34a0e2b 100644
--- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
+++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
@@ -21,7 +21,7 @@ import java.io._
import java.net.URL
import java.nio.file.{Files, Paths}
import java.sql.{Date, Timestamp}
-import java.util.{Locale, TimeZone, UUID}
+import java.util.{Locale, UUID}
import scala.collection.JavaConverters._
@@ -35,9 +35,10 @@ import org.apache.commons.io.FileUtils
import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.sql._
-import org.apache.spark.sql.TestingUDT.{IntervalData, NullData, NullUDT}
+import org.apache.spark.sql.TestingUDT.IntervalData
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.plans.logical.Filter
+import org.apache.spark.sql.catalyst.util.{DateTimeTestUtils, DateTimeUtils}
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.datasources.{DataSource, FilePartition}
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
@@ -83,6 +84,11 @@ abstract class AvroSuite extends QueryTest with SharedSparkSession {
}, new GenericDatumReader[Any]()).getSchema.toString(false)
}
+ private def readResourceAvroFile(name: String): DataFrame = {
+ val url = Thread.currentThread().getContextClassLoader.getResource(name)
+ spark.read.format("avro").load(url.toString)
+ }
+
test("resolve avro data source") {
val databricksAvro = "com.databricks.spark.avro"
    // By default the backward compatibility for com.databricks.spark.avro is enabled.
@@ -402,18 +408,19 @@ abstract class AvroSuite extends QueryTest with SharedSparkSession {
StructField("float", FloatType, true),
StructField("date", DateType, true)
))
- TimeZone.setDefault(TimeZone.getTimeZone("UTC"))
- val rdd = spark.sparkContext.parallelize(Seq(
- Row(1f, null),
- Row(2f, new Date(1451948400000L)),
- Row(3f, new Date(1460066400500L))
- ))
- val df = spark.createDataFrame(rdd, schema)
- df.write.format("avro").save(dir.toString)
- assert(spark.read.format("avro").load(dir.toString).count == rdd.count)
- checkAnswer(
- spark.read.format("avro").load(dir.toString).select("date"),
-        Seq(Row(null), Row(new Date(1451865600000L)), Row(new Date(1459987200000L))))
+ DateTimeTestUtils.withDefaultTimeZone(DateTimeUtils.TimeZoneUTC) {
+ val rdd = spark.sparkContext.parallelize(Seq(
+ Row(1f, null),
+ Row(2f, new Date(1451948400000L)),
+ Row(3f, new Date(1460066400500L))
+ ))
+ val df = spark.createDataFrame(rdd, schema)
+ df.write.format("avro").save(dir.toString)
+ assert(spark.read.format("avro").load(dir.toString).count == rdd.count)
+ checkAnswer(
+ spark.read.format("avro").load(dir.toString).select("date"),
+          Seq(Row(null), Row(new Date(1451865600000L)), Row(new Date(1459987200000L))))
+ }
}
}
@@ -1521,6 +1528,95 @@ abstract class AvroSuite extends QueryTest with SharedSparkSession {
assert(deprecatedEvents.size === 1)
}
}
+
+ test("SPARK-31183: compatibility with Spark 2.4 in reading
dates/timestamps") {
+ withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "true") {
+ checkAnswer(
+ readResourceAvroFile("before_1582_date_v2_4.avro"),
+ Row(java.sql.Date.valueOf("1001-01-01")))
+ checkAnswer(
+ readResourceAvroFile("before_1582_ts_micros_v2_4.avro"),
+ Row(java.sql.Timestamp.valueOf("1001-01-01 01:02:03.123456")))
+ checkAnswer(
+ readResourceAvroFile("before_1582_ts_millis_v2_4.avro"),
+ Row(java.sql.Timestamp.valueOf("1001-01-01 01:02:03.124")))
+ }
+ }
+
+ test("SPARK-31183: rebasing microseconds timestamps in write") {
+ val tsStr = "1001-01-01 01:02:03.123456"
+ val nonRebased = "1001-01-07 01:09:05.123456"
+ withTempPath { dir =>
+ val path = dir.getAbsolutePath
+ withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "true") {
+ Seq(tsStr).toDF("tsS")
+ .select($"tsS".cast("timestamp").as("ts"))
+ .write.format("avro")
+ .save(path)
+
+        checkAnswer(spark.read.format("avro").load(path), Row(Timestamp.valueOf(tsStr)))
+ }
+ withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "false") {
+        checkAnswer(spark.read.format("avro").load(path), Row(Timestamp.valueOf(nonRebased)))
+ }
+ }
+ }
+
+ test("SPARK-31183: rebasing milliseconds timestamps in write") {
+ val tsStr = "1001-01-01 01:02:03.123456"
+ val rebased = "1001-01-01 01:02:03.123"
+ val nonRebased = "1001-01-07 01:09:05.123"
+ Seq(
+ """{"type": "long","logicalType": "timestamp-millis"}""",
+ """"long"""").foreach { tsType =>
+ val timestampSchema = s"""
+ |{
+ | "namespace": "logical",
+ | "type": "record",
+ | "name": "test",
+ | "fields": [
+ | {"name": "ts", "type": $tsType}
+ | ]
+ |}""".stripMargin
+ withTempPath { dir =>
+ val path = dir.getAbsolutePath
+ withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "true") {
+ Seq(tsStr).toDF("tsS")
+ .select($"tsS".cast("timestamp").as("ts"))
+ .write
+ .option("avroSchema", timestampSchema)
+ .format("avro")
+ .save(path)
+
+ checkAnswer(
+ spark.read.schema("ts timestamp").format("avro").load(path),
+ Row(Timestamp.valueOf(rebased)))
+ }
+ withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "false") {
+ checkAnswer(
+ spark.read.schema("ts timestamp").format("avro").load(path),
+ Row(Timestamp.valueOf(nonRebased)))
+ }
+ }
+ }
+ }
+
+ test("SPARK-31183: rebasing dates in write") {
+ withTempPath { dir =>
+ val path = dir.getAbsolutePath
+ withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "true") {
+ Seq("1001-01-01").toDF("dateS")
+ .select($"dateS".cast("date").as("date"))
+ .write.format("avro")
+ .save(path)
+
+        checkAnswer(spark.read.format("avro").load(path), Row(Date.valueOf("1001-01-01")))
+ }
+ withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "false") {
+        checkAnswer(spark.read.format("avro").load(path), Row(Date.valueOf("1001-01-07")))
+ }
+ }
+ }
}
class AvroV1Suite extends AvroSuite {