This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new a6f3e3b [SPARK-31183][SQL][FOLLOWUP] Move rebase tests to `AvroSuite` and check the rebase flag out of function bodies
a6f3e3b is described below
commit a6f3e3b096e2d7a39e0b2fdec6452e6d633baf7e
Author: Maxim Gekk <[email protected]>
AuthorDate: Fri Mar 20 19:02:54 2020 +0900
[SPARK-31183][SQL][FOLLOWUP] Move rebase tests to `AvroSuite` and check the rebase flag out of function bodies

### What changes were proposed in this pull request?
1. The tests added by #27953 are moved from `AvroLogicalTypeSuite` to `AvroSuite`.
2. Checking of the `rebaseDateTime` flag is moved out of function bodies.

### Why are the changes needed?
1. The tests are moved because they are not directly related to logical types.
2. Checking the flag outside of function bodies should improve performance.

### Does this PR introduce any user-facing change?
No

### How was this patch tested?
By running Avro tests via the command `build/sbt avro/test`

Closes #27964 from MaxGekk/rebase-avro-datetime-followup.

Authored-by: Maxim Gekk <[email protected]>
Signed-off-by: HyukjinKwon <[email protected]>
---
.../apache/spark/sql/avro/AvroDeserializer.scala | 21 ++--
.../org/apache/spark/sql/avro/AvroSerializer.scala | 18 ++-
.../spark/sql/avro/AvroLogicalTypeSuite.scala | 98 +---------------
.../org/apache/spark/sql/avro/AvroSuite.scala | 124 ++++++++++++++++++---
4 files changed, 131 insertions(+), 130 deletions(-)
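The pattern both diffs below implement is worth spelling out: the `rebaseDateTime` check moves from the body of each conversion closure into a match guard, so the branch is resolved once, while the converter is built, rather than once per record. A minimal, self-contained Scala sketch of that pattern, with illustrative stand-in names rather than Spark's actual types:

```scala
// A sketch of hoisting a flag check out of a hot closure via match guards.
// LogicalType, Millis, Micros, and rebase are stand-ins, not Spark's API.
object HoistFlagSketch {
  sealed trait LogicalType
  case object Millis extends LogicalType
  case object Micros extends LogicalType

  // Placeholder for DateTimeUtils.rebaseJulianToGregorianMicros.
  def rebase(micros: Long): Long = micros + 42L

  // Before: one closure per type; the flag is re-tested on every record.
  def converterBefore(tpe: LogicalType, rebaseFlag: Boolean): Long => Long = tpe match {
    case Millis => millis => if (rebaseFlag) rebase(millis * 1000L) else millis * 1000L
    case Micros => micros => if (rebaseFlag) rebase(micros) else micros
  }

  // After: the flag sits in the match guard, so the check runs once while the
  // converter is built and each selected closure is branch-free per record.
  def converterAfter(tpe: LogicalType, rebaseFlag: Boolean): Long => Long = tpe match {
    case Millis if rebaseFlag => millis => rebase(millis * 1000L)
    case Millis               => millis => millis * 1000L
    case Micros if rebaseFlag => micros => rebase(micros)
    case Micros               => micros => micros
  }

  def main(args: Array[String]): Unit = {
    // Both shapes must agree on every input; only the branch placement differs.
    for (tpe <- Seq(Millis, Micros); flag <- Seq(true, false)) {
      assert(converterBefore(tpe, flag)(7L) == converterAfter(tpe, flag)(7L))
    }
  }
}
```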
diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
index b98f303..3e8a7f9 100644
--- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
+++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
@@ -106,21 +106,22 @@ class AvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType) {
     case (LONG, TimestampType) => avroType.getLogicalType match {
       // For backward compatibility, if the Avro type is Long and it is not logical type
       // (the `null` case), the value is processed as timestamp type with millisecond precision.
+      case null | _: TimestampMillis if rebaseDateTime => (updater, ordinal, value) =>
+        val millis = value.asInstanceOf[Long]
+        val micros = DateTimeUtils.fromMillis(millis)
+        val rebasedMicros = DateTimeUtils.rebaseJulianToGregorianMicros(micros)
+        updater.setLong(ordinal, rebasedMicros)
       case null | _: TimestampMillis => (updater, ordinal, value) =>
         val millis = value.asInstanceOf[Long]
         val micros = DateTimeUtils.fromMillis(millis)
-        if (rebaseDateTime) {
-          updater.setLong(ordinal, DateTimeUtils.rebaseJulianToGregorianMicros(micros))
-        } else {
-          updater.setLong(ordinal, micros)
-        }
+        updater.setLong(ordinal, micros)
+      case _: TimestampMicros if rebaseDateTime => (updater, ordinal, value) =>
+        val micros = value.asInstanceOf[Long]
+        val rebasedMicros = DateTimeUtils.rebaseJulianToGregorianMicros(micros)
+        updater.setLong(ordinal, rebasedMicros)
       case _: TimestampMicros => (updater, ordinal, value) =>
         val micros = value.asInstanceOf[Long]
-        if (rebaseDateTime) {
-          updater.setLong(ordinal, DateTimeUtils.rebaseJulianToGregorianMicros(micros))
-        } else {
-          updater.setLong(ordinal, micros)
-        }
+        updater.setLong(ordinal, micros)
       case other => throw new IncompatibleSchemaException(
         s"Cannot convert Avro logical type ${other} to Catalyst Timestamp type.")
     }
diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
index af9e3a5..68df7c0 100644
--- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
+++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
@@ -149,17 +149,15 @@ class AvroSerializer(rootCatalystType: DataType, rootAvroType: Schema, nullable:
     case (TimestampType, LONG) => avroType.getLogicalType match {
       // For backward compatibility, if the Avro type is Long and it is not logical type
       // (the `null` case), output the timestamp value as with millisecond precision.
-      case null | _: TimestampMillis => (getter, ordinal) =>
-        val micros = getter.getLong(ordinal)
-        val rebasedMicros = if (rebaseDateTime) {
-          DateTimeUtils.rebaseGregorianToJulianMicros(micros)
-        } else micros
-        DateTimeUtils.toMillis(rebasedMicros)
-      case _: TimestampMicros => (getter, ordinal) =>
+      case null | _: TimestampMillis if rebaseDateTime => (getter, ordinal) =>
         val micros = getter.getLong(ordinal)
-        if (rebaseDateTime) {
-          DateTimeUtils.rebaseGregorianToJulianMicros(micros)
-        } else micros
+        val rebasedMicros = DateTimeUtils.rebaseGregorianToJulianMicros(micros)
+        DateTimeUtils.toMillis(rebasedMicros)
+      case null | _: TimestampMillis => (getter, ordinal) =>
+        DateTimeUtils.toMillis(getter.getLong(ordinal))
+      case _: TimestampMicros if rebaseDateTime => (getter, ordinal) =>
+        DateTimeUtils.rebaseGregorianToJulianMicros(getter.getLong(ordinal))
+      case _: TimestampMicros => (getter, ordinal) =>
         getter.getLong(ordinal)
       case other => throw new IncompatibleSchemaException(
         s"Cannot convert Catalyst Timestamp type to Avro logical type ${other}")
     }
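The serializer and deserializer also rely on two simple unit conversions between Avro's timestamp-millis and Catalyst's microsecond timestamps. A hedged stand-in for both, using plain arithmetic rather than the actual `DateTimeUtils` implementations:

```scala
// Stand-ins for the two conversions used above (plain arithmetic, not the
// actual DateTimeUtils code). Reading widens millis to micros; writing
// narrows micros to millis, dropping sub-millisecond digits.
object MillisMicrosSketch {
  private val MicrosPerMillis = 1000L

  def fromMillis(millis: Long): Long = millis * MicrosPerMillis             // read path
  def toMillis(micros: Long): Long = Math.floorDiv(micros, MicrosPerMillis) // write path

  def main(args: Array[String]): Unit = {
    assert(toMillis(fromMillis(123L)) == 123L)
    // Narrowing loses precision: this is why the timestamp-millis test below
    // writes "...01:02:03.123456" but reads back "...01:02:03.123".
    assert(fromMillis(toMillis(123456L)) == 123000L)
  }
}
```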
diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroLogicalTypeSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroLogicalTypeSuite.scala
index 9e89b69..8256965 100644
--- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroLogicalTypeSuite.scala
+++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroLogicalTypeSuite.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql.avro
import java.io.File
-import java.sql.{Date, Timestamp}
+import java.sql.Timestamp
import org.apache.avro.{LogicalTypes, Schema}
import org.apache.avro.Conversions.DecimalConversion
@@ -25,7 +25,7 @@ import org.apache.avro.file.DataFileWriter
import org.apache.avro.generic.{GenericData, GenericDatumWriter, GenericRecord}
import org.apache.spark.{SparkConf, SparkException}
-import org.apache.spark.sql.{DataFrame, QueryTest, Row}
+import org.apache.spark.sql.{QueryTest, Row}
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession
@@ -348,100 +348,6 @@ abstract class AvroLogicalTypeSuite extends QueryTest with SharedSparkSession {
assert(msg.contains("Unscaled value too large for precision"))
}
}
-
- private def readResourceAvroFile(name: String): DataFrame = {
- val url = Thread.currentThread().getContextClassLoader.getResource(name)
- spark.read.format("avro").load(url.toString)
- }
-
- test("SPARK-31183: compatibility with Spark 2.4 in reading
dates/timestamps") {
- withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "true") {
- checkAnswer(
- readResourceAvroFile("before_1582_date_v2_4.avro"),
- Row(java.sql.Date.valueOf("1001-01-01")))
- checkAnswer(
- readResourceAvroFile("before_1582_ts_micros_v2_4.avro"),
- Row(java.sql.Timestamp.valueOf("1001-01-01 01:02:03.123456")))
- checkAnswer(
- readResourceAvroFile("before_1582_ts_millis_v2_4.avro"),
- Row(java.sql.Timestamp.valueOf("1001-01-01 01:02:03.124")))
- }
- }
-
- test("SPARK-31183: rebasing microseconds timestamps in write") {
- val tsStr = "1001-01-01 01:02:03.123456"
- val nonRebased = "1001-01-07 01:09:05.123456"
- withTempPath { dir =>
- val path = dir.getAbsolutePath
- withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "true") {
- Seq(tsStr).toDF("tsS")
- .select($"tsS".cast("timestamp").as("ts"))
- .write.format("avro")
- .save(path)
-
- checkAnswer(spark.read.format("avro").load(path), Row(Timestamp.valueOf(tsStr)))
- }
- withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "false") {
- checkAnswer(spark.read.format("avro").load(path), Row(Timestamp.valueOf(nonRebased)))
- }
- }
- }
-
- test("SPARK-31183: rebasing milliseconds timestamps in write") {
- val tsStr = "1001-01-01 01:02:03.123456"
- val rebased = "1001-01-01 01:02:03.123"
- val nonRebased = "1001-01-07 01:09:05.123"
- Seq(
- """{"type": "long","logicalType": "timestamp-millis"}""",
- """"long"""").foreach { tsType =>
- val timestampSchema = s"""
- |{
- | "namespace": "logical",
- | "type": "record",
- | "name": "test",
- | "fields": [
- | {"name": "ts", "type": $tsType}
- | ]
- |}""".stripMargin
- withTempPath { dir =>
- val path = dir.getAbsolutePath
- withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "true") {
- Seq(tsStr).toDF("tsS")
- .select($"tsS".cast("timestamp").as("ts"))
- .write
- .option("avroSchema", timestampSchema)
- .format("avro")
- .save(path)
-
- checkAnswer(
- spark.read.schema("ts timestamp").format("avro").load(path),
- Row(Timestamp.valueOf(rebased)))
- }
- withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "false") {
- checkAnswer(
- spark.read.schema("ts timestamp").format("avro").load(path),
- Row(Timestamp.valueOf(nonRebased)))
- }
- }
- }
- }
-
- test("SPARK-31183: rebasing dates in write") {
- withTempPath { dir =>
- val path = dir.getAbsolutePath
- withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "true") {
- Seq("1001-01-01").toDF("dateS")
- .select($"dateS".cast("date").as("date"))
- .write.format("avro")
- .save(path)
-
- checkAnswer(spark.read.format("avro").load(path), Row(Date.valueOf("1001-01-01")))
- }
- withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "false") {
- checkAnswer(spark.read.format("avro").load(path), Row(Date.valueOf("1001-01-07")))
- }
- }
- }
}
class AvroV1LogicalTypeSuite extends AvroLogicalTypeSuite {
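The 1001-01-01 versus 1001-01-07 pairs in the tests above (and in their new home in `AvroSuite` below) come from the six-day gap around the year 1001 between the hybrid Julian calendar that Spark 2.4 used via `java.util.GregorianCalendar` and the Proleptic Gregorian calendar that Spark 3.0 uses via `java.time`. A standalone check of that gap, not part of the patch:

```scala
import java.time.LocalDate
import java.util.{Calendar, GregorianCalendar, TimeZone}

// Demonstrates why the tests expect 1001-01-07 when the rebase flag is off:
// the hybrid (Julian before 1582) and Proleptic Gregorian calendars disagree
// by 6 days for dates around the year 1001.
object CalendarShiftSketch {
  def main(args: Array[String]): Unit = {
    val hybrid = new GregorianCalendar(TimeZone.getTimeZone("UTC"))
    hybrid.clear()
    hybrid.set(1001, Calendar.JANUARY, 1) // Julian reckoning before the 1582 cutover
    val hybridDays = Math.floorDiv(hybrid.getTimeInMillis, 86400000L)

    val prolepticDays = LocalDate.of(1001, 1, 1).toEpochDay // Proleptic Gregorian

    assert(hybridDays - prolepticDays == 6)
    // The same stored day number, reinterpreted as Proleptic Gregorian
    // (i.e. read without rebasing), lands on January 7.
    assert(LocalDate.ofEpochDay(hybridDays) == LocalDate.of(1001, 1, 7))
  }
}
```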
diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
index 360160c..34a0e2b 100644
--- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
+++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
@@ -21,7 +21,7 @@ import java.io._
import java.net.URL
import java.nio.file.{Files, Paths}
import java.sql.{Date, Timestamp}
-import java.util.{Locale, TimeZone, UUID}
+import java.util.{Locale, UUID}
import scala.collection.JavaConverters._
@@ -35,9 +35,10 @@ import org.apache.commons.io.FileUtils
import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.sql._
-import org.apache.spark.sql.TestingUDT.{IntervalData, NullData, NullUDT}
+import org.apache.spark.sql.TestingUDT.IntervalData
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.plans.logical.Filter
+import org.apache.spark.sql.catalyst.util.{DateTimeTestUtils, DateTimeUtils}
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.datasources.{DataSource, FilePartition}
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
@@ -83,6 +84,11 @@ abstract class AvroSuite extends QueryTest with SharedSparkSession {
}, new GenericDatumReader[Any]()).getSchema.toString(false)
}
+ private def readResourceAvroFile(name: String): DataFrame = {
+ val url = Thread.currentThread().getContextClassLoader.getResource(name)
+ spark.read.format("avro").load(url.toString)
+ }
+
test("resolve avro data source") {
val databricksAvro = "com.databricks.spark.avro"
// By default the backward compatibility for com.databricks.spark.avro is enabled.
@@ -402,18 +408,19 @@ abstract class AvroSuite extends QueryTest with SharedSparkSession {
StructField("float", FloatType, true),
StructField("date", DateType, true)
))
- TimeZone.setDefault(TimeZone.getTimeZone("UTC"))
- val rdd = spark.sparkContext.parallelize(Seq(
- Row(1f, null),
- Row(2f, new Date(1451948400000L)),
- Row(3f, new Date(1460066400500L))
- ))
- val df = spark.createDataFrame(rdd, schema)
- df.write.format("avro").save(dir.toString)
- assert(spark.read.format("avro").load(dir.toString).count == rdd.count)
- checkAnswer(
- spark.read.format("avro").load(dir.toString).select("date"),
- Seq(Row(null), Row(new Date(1451865600000L)), Row(new Date(1459987200000L))))
+ DateTimeTestUtils.withDefaultTimeZone(DateTimeUtils.TimeZoneUTC) {
+ val rdd = spark.sparkContext.parallelize(Seq(
+ Row(1f, null),
+ Row(2f, new Date(1451948400000L)),
+ Row(3f, new Date(1460066400500L))
+ ))
+ val df = spark.createDataFrame(rdd, schema)
+ df.write.format("avro").save(dir.toString)
+ assert(spark.read.format("avro").load(dir.toString).count == rdd.count)
+ checkAnswer(
+ spark.read.format("avro").load(dir.toString).select("date"),
+ Seq(Row(null), Row(new Date(1451865600000L)), Row(new Date(1459987200000L))))
+ }
}
}
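The hunk above replaces a global `TimeZone.setDefault` with a scoped helper so the UTC default cannot leak into later tests. A generic sketch of such a loan-pattern helper follows; the real `DateTimeTestUtils.withDefaultTimeZone` may differ in details:

```scala
import java.util.TimeZone

// A loan-pattern fixture: temporarily swap the JVM default time zone and
// always restore it, even if the body throws. Generic stand-in, not Spark's
// actual DateTimeTestUtils implementation.
object TimeZoneFixture {
  def withDefaultTimeZone[T](tz: TimeZone)(body: => T): T = {
    val saved = TimeZone.getDefault
    TimeZone.setDefault(tz)
    try body finally TimeZone.setDefault(saved)
  }

  def main(args: Array[String]): Unit = {
    val id = withDefaultTimeZone(TimeZone.getTimeZone("UTC")) {
      TimeZone.getDefault.getID // runs with UTC as the default
    }
    assert(id == "UTC")
  }
}
```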
@@ -1521,6 +1528,95 @@ abstract class AvroSuite extends QueryTest with SharedSparkSession {
assert(deprecatedEvents.size === 1)
}
}
+
+ test("SPARK-31183: compatibility with Spark 2.4 in reading
dates/timestamps") {
+ withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "true") {
+ checkAnswer(
+ readResourceAvroFile("before_1582_date_v2_4.avro"),
+ Row(java.sql.Date.valueOf("1001-01-01")))
+ checkAnswer(
+ readResourceAvroFile("before_1582_ts_micros_v2_4.avro"),
+ Row(java.sql.Timestamp.valueOf("1001-01-01 01:02:03.123456")))
+ checkAnswer(
+ readResourceAvroFile("before_1582_ts_millis_v2_4.avro"),
+ Row(java.sql.Timestamp.valueOf("1001-01-01 01:02:03.124")))
+ }
+ }
+
+ test("SPARK-31183: rebasing microseconds timestamps in write") {
+ val tsStr = "1001-01-01 01:02:03.123456"
+ val nonRebased = "1001-01-07 01:09:05.123456"
+ withTempPath { dir =>
+ val path = dir.getAbsolutePath
+ withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "true") {
+ Seq(tsStr).toDF("tsS")
+ .select($"tsS".cast("timestamp").as("ts"))
+ .write.format("avro")
+ .save(path)
+
+ checkAnswer(spark.read.format("avro").load(path), Row(Timestamp.valueOf(tsStr)))
+ }
+ withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "false") {
+ checkAnswer(spark.read.format("avro").load(path), Row(Timestamp.valueOf(nonRebased)))
+ }
+ }
+ }
+
+ test("SPARK-31183: rebasing milliseconds timestamps in write") {
+ val tsStr = "1001-01-01 01:02:03.123456"
+ val rebased = "1001-01-01 01:02:03.123"
+ val nonRebased = "1001-01-07 01:09:05.123"
+ Seq(
+ """{"type": "long","logicalType": "timestamp-millis"}""",
+ """"long"""").foreach { tsType =>
+ val timestampSchema = s"""
+ |{
+ | "namespace": "logical",
+ | "type": "record",
+ | "name": "test",
+ | "fields": [
+ | {"name": "ts", "type": $tsType}
+ | ]
+ |}""".stripMargin
+ withTempPath { dir =>
+ val path = dir.getAbsolutePath
+ withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "true") {
+ Seq(tsStr).toDF("tsS")
+ .select($"tsS".cast("timestamp").as("ts"))
+ .write
+ .option("avroSchema", timestampSchema)
+ .format("avro")
+ .save(path)
+
+ checkAnswer(
+ spark.read.schema("ts timestamp").format("avro").load(path),
+ Row(Timestamp.valueOf(rebased)))
+ }
+ withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "false") {
+ checkAnswer(
+ spark.read.schema("ts timestamp").format("avro").load(path),
+ Row(Timestamp.valueOf(nonRebased)))
+ }
+ }
+ }
+ }
+
+ test("SPARK-31183: rebasing dates in write") {
+ withTempPath { dir =>
+ val path = dir.getAbsolutePath
+ withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "true") {
+ Seq("1001-01-01").toDF("dateS")
+ .select($"dateS".cast("date").as("date"))
+ .write.format("avro")
+ .save(path)
+
+ checkAnswer(spark.read.format("avro").load(path), Row(Date.valueOf("1001-01-01")))
+ }
+ withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "false") {
+ checkAnswer(spark.read.format("avro").load(path), Row(Date.valueOf("1001-01-07")))
+ }
+ }
+ }
}
class AvroV1Suite extends AvroSuite {
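Outside of tests, the same switch the moved tests exercise can be flipped on a live session. A hedged end-to-end sketch: the file path is hypothetical, the spark-avro module must be on the classpath, and the flag exists under this name only on branches carrying SPARK-31183:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.internal.SQLConf

// Toggling the legacy rebase flag around a read, the same switch the tests
// flip via withSQLConf. The Avro file path below is hypothetical.
object RebaseReadSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("avro-rebase-demo")
      .getOrCreate()

    // Rebase pre-1582 dates/timestamps written by Spark 2.4 from the hybrid
    // Julian calendar to Proleptic Gregorian while reading.
    spark.conf.set(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key, "true")
    spark.read.format("avro").load("/tmp/before_1582.avro").show() // hypothetical path

    // Without the flag, the same stored values surface shifted (e.g. Jan 7).
    spark.conf.set(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key, "false")
    spark.read.format("avro").load("/tmp/before_1582.avro").show()

    spark.stop()
  }
}
```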
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]