This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 4bc4af3  [SPARK-31183][SQL][FOLLOWUP][3.0] Move rebase tests to `AvroSuite` and check the rebase flag out of function bodies
4bc4af3 is described below

commit 4bc4af31534deccb0d12c5b5b84900f720c31381
Author: Maxim Gekk <[email protected]>
AuthorDate: Mon Mar 23 13:09:14 2020 +0900

    [SPARK-31183][SQL][FOLLOWUP][3.0] Move rebase tests to `AvroSuite` and check the rebase flag out of function bodies
    
    ### What changes were proposed in this pull request?
    1. The tests added by #27953 are moved from `AvroLogicalTypeSuite` to `AvroSuite`.
    2. Checking of the `rebaseDateTime` flag is moved out of function bodies.
    
    This is a backport of https://github.com/apache/spark/pull/27964
    
    ### Why are the changes needed?
    1. The tests are moved because they are not directly related to logical types.
    2. Checking the flag outside of function bodies should improve performance.
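
    For intuition, here is a minimal, hypothetical sketch of the hoisting pattern (placeholder names, not the actual Spark code): the flag is evaluated once while the writer closure is constructed, instead of once per value.

    ```scala
    object RebaseHoistingSketch {
      // Placeholder standing in for DateTimeUtils.rebaseJulianToGregorianMicros.
      def rebaseMicros(micros: Long): Long = micros + 42L

      // Before: the flag is re-checked on every value.
      def makeWriterBefore(rebase: Boolean): Long => Long =
        micros => if (rebase) rebaseMicros(micros) else micros

      // After: the flag is checked once, while the writer closure is built,
      // so the returned function does no per-value branching.
      def makeWriterAfter(rebase: Boolean): Long => Long =
        if (rebase) { micros => rebaseMicros(micros) }
        else { micros => micros }

      def main(args: Array[String]): Unit = {
        val writer = makeWriterAfter(rebase = true)
        println(writer(1000L)) // 1042
      }
    }
    ```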
    
    ### Does this PR introduce any user-facing change?
    No
    
    ### How was this patch tested?
    By running Avro tests via the command `build/sbt avro/test`
    
    Closes #27977 from MaxGekk/rebase-avro-datetime-followup-3.0.
    
    Authored-by: Maxim Gekk <[email protected]>
    Signed-off-by: HyukjinKwon <[email protected]>
---
 .../apache/spark/sql/avro/AvroDeserializer.scala   |  21 ++--
 .../org/apache/spark/sql/avro/AvroSerializer.scala |  16 ++-
 .../spark/sql/avro/AvroLogicalTypeSuite.scala      |  98 +---------------
 .../org/apache/spark/sql/avro/AvroSuite.scala      | 124 ++++++++++++++++++---
 4 files changed, 130 insertions(+), 129 deletions(-)

diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
index b98f303..3e8a7f9 100644
--- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
+++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
@@ -106,21 +106,22 @@ class AvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType) {
       case (LONG, TimestampType) => avroType.getLogicalType match {
        // For backward compatibility, if the Avro type is Long and it is not logical type
        // (the `null` case), the value is processed as timestamp type with millisecond precision.
+        case null | _: TimestampMillis if rebaseDateTime => (updater, ordinal, value) =>
+          val millis = value.asInstanceOf[Long]
+          val micros = DateTimeUtils.fromMillis(millis)
+          val rebasedMicros = DateTimeUtils.rebaseJulianToGregorianMicros(micros)
+          updater.setLong(ordinal, rebasedMicros)
         case null | _: TimestampMillis => (updater, ordinal, value) =>
           val millis = value.asInstanceOf[Long]
           val micros = DateTimeUtils.fromMillis(millis)
-          if (rebaseDateTime) {
-            updater.setLong(ordinal, DateTimeUtils.rebaseJulianToGregorianMicros(micros))
-          } else {
-            updater.setLong(ordinal, micros)
-          }
+          updater.setLong(ordinal, micros)
+        case _: TimestampMicros if rebaseDateTime => (updater, ordinal, value) =>
+          val micros = value.asInstanceOf[Long]
+          val rebasedMicros = DateTimeUtils.rebaseJulianToGregorianMicros(micros)
+          updater.setLong(ordinal, rebasedMicros)
         case _: TimestampMicros => (updater, ordinal, value) =>
           val micros = value.asInstanceOf[Long]
-          if (rebaseDateTime) {
-            updater.setLong(ordinal, DateTimeUtils.rebaseJulianToGregorianMicros(micros))
-          } else {
-            updater.setLong(ordinal, micros)
-          }
+          updater.setLong(ordinal, micros)
         case other => throw new IncompatibleSchemaException(
           s"Cannot convert Avro logical type ${other} to Catalyst Timestamp 
type.")
       }
diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
index af9e3a5..b6e2a2b 100644
--- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
+++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
@@ -149,17 +149,15 @@ class AvroSerializer(rootCatalystType: DataType, rootAvroType: Schema, nullable:
       case (TimestampType, LONG) => avroType.getLogicalType match {
          // For backward compatibility, if the Avro type is Long and it is not logical type
          // (the `null` case), output the timestamp value as with millisecond precision.
-          case null | _: TimestampMillis => (getter, ordinal) =>
+          case null | _: TimestampMillis if rebaseDateTime => (getter, ordinal) =>
             val micros = getter.getLong(ordinal)
-            val rebasedMicros = if (rebaseDateTime) {
-              DateTimeUtils.rebaseGregorianToJulianMicros(micros)
-            } else micros
+            val rebasedMicros = DateTimeUtils.rebaseGregorianToJulianMicros(micros)
             DateTimeUtils.toMillis(rebasedMicros)
-          case _: TimestampMicros => (getter, ordinal) =>
-            val micros = getter.getLong(ordinal)
-            if (rebaseDateTime) {
-              DateTimeUtils.rebaseGregorianToJulianMicros(micros)
-            } else micros
+          case null | _: TimestampMillis => (getter, ordinal) =>
+            DateTimeUtils.toMillis(getter.getLong(ordinal))
+          case _: TimestampMicros if rebaseDateTime => (getter, ordinal) =>
+            DateTimeUtils.rebaseGregorianToJulianMicros(getter.getLong(ordinal))
+          case _: TimestampMicros => (getter, ordinal) => getter.getLong(ordinal)
           case other => throw new IncompatibleSchemaException(
             s"Cannot convert Catalyst Timestamp type to Avro logical type 
${other}")
         }
diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroLogicalTypeSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroLogicalTypeSuite.scala
index 9e89b69..8256965 100644
--- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroLogicalTypeSuite.scala
+++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroLogicalTypeSuite.scala
@@ -17,7 +17,7 @@
 package org.apache.spark.sql.avro
 
 import java.io.File
-import java.sql.{Date, Timestamp}
+import java.sql.Timestamp
 
 import org.apache.avro.{LogicalTypes, Schema}
 import org.apache.avro.Conversions.DecimalConversion
@@ -25,7 +25,7 @@ import org.apache.avro.file.DataFileWriter
 import org.apache.avro.generic.{GenericData, GenericDatumWriter, GenericRecord}
 
 import org.apache.spark.{SparkConf, SparkException}
-import org.apache.spark.sql.{DataFrame, QueryTest, Row}
+import org.apache.spark.sql.{QueryTest, Row}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
@@ -348,100 +348,6 @@ abstract class AvroLogicalTypeSuite extends QueryTest with SharedSparkSession {
       assert(msg.contains("Unscaled value too large for precision"))
     }
   }
-
-  private def readResourceAvroFile(name: String): DataFrame = {
-    val url = Thread.currentThread().getContextClassLoader.getResource(name)
-    spark.read.format("avro").load(url.toString)
-  }
-
-  test("SPARK-31183: compatibility with Spark 2.4 in reading 
dates/timestamps") {
-    withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "true") {
-      checkAnswer(
-        readResourceAvroFile("before_1582_date_v2_4.avro"),
-        Row(java.sql.Date.valueOf("1001-01-01")))
-      checkAnswer(
-        readResourceAvroFile("before_1582_ts_micros_v2_4.avro"),
-        Row(java.sql.Timestamp.valueOf("1001-01-01 01:02:03.123456")))
-      checkAnswer(
-        readResourceAvroFile("before_1582_ts_millis_v2_4.avro"),
-        Row(java.sql.Timestamp.valueOf("1001-01-01 01:02:03.124")))
-    }
-  }
-
-  test("SPARK-31183: rebasing microseconds timestamps in write") {
-    val tsStr = "1001-01-01 01:02:03.123456"
-    val nonRebased = "1001-01-07 01:09:05.123456"
-    withTempPath { dir =>
-      val path = dir.getAbsolutePath
-      withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "true") {
-        Seq(tsStr).toDF("tsS")
-          .select($"tsS".cast("timestamp").as("ts"))
-          .write.format("avro")
-          .save(path)
-
-        checkAnswer(spark.read.format("avro").load(path), Row(Timestamp.valueOf(tsStr)))
-      }
-      withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "false") {
-        checkAnswer(spark.read.format("avro").load(path), Row(Timestamp.valueOf(nonRebased)))
-      }
-    }
-  }
-
-  test("SPARK-31183: rebasing milliseconds timestamps in write") {
-    val tsStr = "1001-01-01 01:02:03.123456"
-    val rebased = "1001-01-01 01:02:03.123"
-    val nonRebased = "1001-01-07 01:09:05.123"
-    Seq(
-      """{"type": "long","logicalType": "timestamp-millis"}""",
-      """"long"""").foreach { tsType =>
-      val timestampSchema = s"""
-          |{
-          |  "namespace": "logical",
-          |  "type": "record",
-          |  "name": "test",
-          |  "fields": [
-          |    {"name": "ts", "type": $tsType}
-          |  ]
-          |}""".stripMargin
-      withTempPath { dir =>
-        val path = dir.getAbsolutePath
-        withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "true") {
-          Seq(tsStr).toDF("tsS")
-            .select($"tsS".cast("timestamp").as("ts"))
-            .write
-            .option("avroSchema", timestampSchema)
-            .format("avro")
-            .save(path)
-
-          checkAnswer(
-            spark.read.schema("ts timestamp").format("avro").load(path),
-            Row(Timestamp.valueOf(rebased)))
-        }
-        withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "false") {
-          checkAnswer(
-            spark.read.schema("ts timestamp").format("avro").load(path),
-            Row(Timestamp.valueOf(nonRebased)))
-        }
-      }
-    }
-  }
-
-  test("SPARK-31183: rebasing dates in write") {
-    withTempPath { dir =>
-      val path = dir.getAbsolutePath
-      withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "true") {
-        Seq("1001-01-01").toDF("dateS")
-          .select($"dateS".cast("date").as("date"))
-          .write.format("avro")
-          .save(path)
-
-        checkAnswer(spark.read.format("avro").load(path), Row(Date.valueOf("1001-01-01")))
-      }
-      withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "false") {
-        checkAnswer(spark.read.format("avro").load(path), Row(Date.valueOf("1001-01-07")))
-      }
-    }
-  }
 }
 
 class AvroV1LogicalTypeSuite extends AvroLogicalTypeSuite {
diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
index 360160c..34a0e2b 100644
--- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
+++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
@@ -21,7 +21,7 @@ import java.io._
 import java.net.URL
 import java.nio.file.{Files, Paths}
 import java.sql.{Date, Timestamp}
-import java.util.{Locale, TimeZone, UUID}
+import java.util.{Locale, UUID}
 
 import scala.collection.JavaConverters._
 
@@ -35,9 +35,10 @@ import org.apache.commons.io.FileUtils
 
 import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.sql._
-import org.apache.spark.sql.TestingUDT.{IntervalData, NullData, NullUDT}
+import org.apache.spark.sql.TestingUDT.IntervalData
 import org.apache.spark.sql.catalyst.expressions.AttributeReference
 import org.apache.spark.sql.catalyst.plans.logical.Filter
+import org.apache.spark.sql.catalyst.util.{DateTimeTestUtils, DateTimeUtils}
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.datasources.{DataSource, FilePartition}
 import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
@@ -83,6 +84,11 @@ abstract class AvroSuite extends QueryTest with SharedSparkSession {
     }, new GenericDatumReader[Any]()).getSchema.toString(false)
   }
 
+  private def readResourceAvroFile(name: String): DataFrame = {
+    val url = Thread.currentThread().getContextClassLoader.getResource(name)
+    spark.read.format("avro").load(url.toString)
+  }
+
   test("resolve avro data source") {
     val databricksAvro = "com.databricks.spark.avro"
    // By default the backward compatibility for com.databricks.spark.avro is enabled.
@@ -402,18 +408,19 @@ abstract class AvroSuite extends QueryTest with SharedSparkSession {
         StructField("float", FloatType, true),
         StructField("date", DateType, true)
       ))
-      TimeZone.setDefault(TimeZone.getTimeZone("UTC"))
-      val rdd = spark.sparkContext.parallelize(Seq(
-        Row(1f, null),
-        Row(2f, new Date(1451948400000L)),
-        Row(3f, new Date(1460066400500L))
-      ))
-      val df = spark.createDataFrame(rdd, schema)
-      df.write.format("avro").save(dir.toString)
-      assert(spark.read.format("avro").load(dir.toString).count == rdd.count)
-      checkAnswer(
-        spark.read.format("avro").load(dir.toString).select("date"),
-        Seq(Row(null), Row(new Date(1451865600000L)), Row(new Date(1459987200000L))))
+      DateTimeTestUtils.withDefaultTimeZone(DateTimeUtils.TimeZoneUTC) {
+        val rdd = spark.sparkContext.parallelize(Seq(
+          Row(1f, null),
+          Row(2f, new Date(1451948400000L)),
+          Row(3f, new Date(1460066400500L))
+        ))
+        val df = spark.createDataFrame(rdd, schema)
+        df.write.format("avro").save(dir.toString)
+        assert(spark.read.format("avro").load(dir.toString).count == rdd.count)
+        checkAnswer(
+          spark.read.format("avro").load(dir.toString).select("date"),
+          Seq(Row(null), Row(new Date(1451865600000L)), Row(new Date(1459987200000L))))
+      }
     }
   }
 
@@ -1521,6 +1528,95 @@ abstract class AvroSuite extends QueryTest with SharedSparkSession {
       assert(deprecatedEvents.size === 1)
     }
   }
+
+  test("SPARK-31183: compatibility with Spark 2.4 in reading 
dates/timestamps") {
+    withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "true") {
+      checkAnswer(
+        readResourceAvroFile("before_1582_date_v2_4.avro"),
+        Row(java.sql.Date.valueOf("1001-01-01")))
+      checkAnswer(
+        readResourceAvroFile("before_1582_ts_micros_v2_4.avro"),
+        Row(java.sql.Timestamp.valueOf("1001-01-01 01:02:03.123456")))
+      checkAnswer(
+        readResourceAvroFile("before_1582_ts_millis_v2_4.avro"),
+        Row(java.sql.Timestamp.valueOf("1001-01-01 01:02:03.124")))
+    }
+  }
+
+  test("SPARK-31183: rebasing microseconds timestamps in write") {
+    val tsStr = "1001-01-01 01:02:03.123456"
+    val nonRebased = "1001-01-07 01:09:05.123456"
+    withTempPath { dir =>
+      val path = dir.getAbsolutePath
+      withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "true") {
+        Seq(tsStr).toDF("tsS")
+          .select($"tsS".cast("timestamp").as("ts"))
+          .write.format("avro")
+          .save(path)
+
+        checkAnswer(spark.read.format("avro").load(path), Row(Timestamp.valueOf(tsStr)))
+      }
+      withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "false") {
+        checkAnswer(spark.read.format("avro").load(path), Row(Timestamp.valueOf(nonRebased)))
+      }
+    }
+  }
+
+  test("SPARK-31183: rebasing milliseconds timestamps in write") {
+    val tsStr = "1001-01-01 01:02:03.123456"
+    val rebased = "1001-01-01 01:02:03.123"
+    val nonRebased = "1001-01-07 01:09:05.123"
+    Seq(
+      """{"type": "long","logicalType": "timestamp-millis"}""",
+      """"long"""").foreach { tsType =>
+      val timestampSchema = s"""
+        |{
+        |  "namespace": "logical",
+        |  "type": "record",
+        |  "name": "test",
+        |  "fields": [
+        |    {"name": "ts", "type": $tsType}
+        |  ]
+        |}""".stripMargin
+      withTempPath { dir =>
+        val path = dir.getAbsolutePath
+        withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "true") {
+          Seq(tsStr).toDF("tsS")
+            .select($"tsS".cast("timestamp").as("ts"))
+            .write
+            .option("avroSchema", timestampSchema)
+            .format("avro")
+            .save(path)
+
+          checkAnswer(
+            spark.read.schema("ts timestamp").format("avro").load(path),
+            Row(Timestamp.valueOf(rebased)))
+        }
+        withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "false") {
+          checkAnswer(
+            spark.read.schema("ts timestamp").format("avro").load(path),
+            Row(Timestamp.valueOf(nonRebased)))
+        }
+      }
+    }
+  }
+
+  test("SPARK-31183: rebasing dates in write") {
+    withTempPath { dir =>
+      val path = dir.getAbsolutePath
+      withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "true") {
+        Seq("1001-01-01").toDF("dateS")
+          .select($"dateS".cast("date").as("date"))
+          .write.format("avro")
+          .save(path)
+
+        checkAnswer(spark.read.format("avro").load(path), Row(Date.valueOf("1001-01-01")))
+      }
+      withSQLConf(SQLConf.LEGACY_AVRO_REBASE_DATETIME.key -> "false") {
+        checkAnswer(spark.read.format("avro").load(path), Row(Date.valueOf("1001-01-07")))
+      }
+    }
+  }
 }
 
 class AvroV1Suite extends AvroSuite {

