Repository: spark
Updated Branches:
  refs/heads/master 131ca146e -> 819c4de45


[SPARK-24772][SQL] Avro: support logical date type

## What changes were proposed in this pull request?

Support Avro logical date type:
https://avro.apache.org/docs/1.8.2/spec.html#Date

## How was this patch tested?

Unit test

Closes #21984 from gengliangwang/avro_date.

Authored-by: Gengliang Wang <gengliang.w...@databricks.com>
Signed-off-by: Wenchen Fan <wenc...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/819c4de4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/819c4de4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/819c4de4

Branch: refs/heads/master
Commit: 819c4de45af2fe39bac8363241d0001b2e83f858
Parents: 131ca14
Author: Gengliang Wang <gengliang.w...@databricks.com>
Authored: Tue Aug 7 17:24:25 2018 +0800
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Tue Aug 7 17:24:25 2018 +0800

----------------------------------------------------------------------
 .../spark/sql/avro/AvroDeserializer.scala       |   5 ++
 .../apache/spark/sql/avro/AvroSerializer.scala  |   2 +-
 .../spark/sql/avro/SchemaConverters.scala       |  12 +++--
 external/avro/src/test/resources/date.avro      | Bin 0 -> 209 bytes
 .../org/apache/spark/sql/avro/AvroSuite.scala   |  54 +++++++++++++++++--
 5 files changed, 66 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/819c4de4/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
----------------------------------------------------------------------
diff --git 
a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala 
b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
index 394a62b..74677a2 100644
--- 
a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
+++ 
b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
@@ -84,6 +84,9 @@ class AvroDeserializer(rootAvroType: Schema, 
rootCatalystType: DataType) {
       case (INT, IntegerType) => (updater, ordinal, value) =>
         updater.setInt(ordinal, value.asInstanceOf[Int])
 
+      case (INT, DateType) => (updater, ordinal, value) =>
+        updater.setInt(ordinal, value.asInstanceOf[Int])
+
       case (LONG, LongType) => (updater, ordinal, value) =>
         updater.setLong(ordinal, value.asInstanceOf[Long])
 
@@ -100,6 +103,8 @@ class AvroDeserializer(rootAvroType: Schema, 
rootCatalystType: DataType) {
           s"Cannot convert Avro logical type ${other} to Catalyst Timestamp 
type.")
       }
 
+      // Before Avro was upgraded to 1.8 for logical type support, spark-avro 
+      // converted Long to Date.
+      // For backward compatibility, we still keep this conversion.
       case (LONG, DateType) => (updater, ordinal, value) =>
         updater.setInt(ordinal, (value.asInstanceOf[Long] / 
DateTimeUtils.MILLIS_PER_DAY).toInt)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/819c4de4/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
----------------------------------------------------------------------
diff --git 
a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala 
b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
index 382f9a7..9885826 100644
--- 
a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
+++ 
b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
@@ -92,7 +92,7 @@ class AvroSerializer(rootCatalystType: DataType, 
rootAvroType: Schema, nullable:
       case BinaryType =>
         (getter, ordinal) => ByteBuffer.wrap(getter.getBinary(ordinal))
       case DateType =>
-        (getter, ordinal) => getter.getInt(ordinal) * 
DateTimeUtils.MILLIS_PER_DAY
+        (getter, ordinal) => getter.getInt(ordinal)
       case TimestampType => avroType.getLogicalType match {
           case _: TimestampMillis => (getter, ordinal) => 
getter.getLong(ordinal) / 1000
           case _: TimestampMicros => (getter, ordinal) => 
getter.getLong(ordinal)

http://git-wip-us.apache.org/repos/asf/spark/blob/819c4de4/external/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala
----------------------------------------------------------------------
diff --git 
a/external/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala 
b/external/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala
index 6929539..245e68d 100644
--- 
a/external/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala
+++ 
b/external/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.avro
 import scala.collection.JavaConverters._
 
 import org.apache.avro.{LogicalType, LogicalTypes, Schema, SchemaBuilder}
-import org.apache.avro.LogicalTypes.{TimestampMicros, TimestampMillis}
+import org.apache.avro.LogicalTypes.{Date, TimestampMicros, TimestampMillis}
 import org.apache.avro.Schema.Type._
 
 import org.apache.spark.sql.internal.SQLConf.AvroOutputTimestampType
@@ -38,7 +38,10 @@ object SchemaConverters {
    */
   def toSqlType(avroSchema: Schema): SchemaType = {
     avroSchema.getType match {
-      case INT => SchemaType(IntegerType, nullable = false)
+      case INT => avroSchema.getLogicalType match {
+        case _: Date => SchemaType(DateType, nullable = false)
+        case _ => SchemaType(IntegerType, nullable = false)
+      }
       case STRING => SchemaType(StringType, nullable = false)
       case BOOLEAN => SchemaType(BooleanType, nullable = false)
       case BYTES => SchemaType(BinaryType, nullable = false)
@@ -121,7 +124,10 @@ object SchemaConverters {
       case BooleanType => builder.booleanType()
       case ByteType | ShortType | IntegerType => builder.intType()
       case LongType => builder.longType()
-      case DateType => builder.longType()
+      case DateType => builder
+        .intBuilder()
+        .prop(LogicalType.LOGICAL_TYPE_PROP, LogicalTypes.date().getName)
+        .endInt()
       case TimestampType =>
         val timestampType = outputTimestampType match {
           case AvroOutputTimestampType.TIMESTAMP_MILLIS => 
LogicalTypes.timestampMillis()

http://git-wip-us.apache.org/repos/asf/spark/blob/819c4de4/external/avro/src/test/resources/date.avro
----------------------------------------------------------------------
diff --git a/external/avro/src/test/resources/date.avro 
b/external/avro/src/test/resources/date.avro
new file mode 100644
index 0000000..3a67617
Binary files /dev/null and b/external/avro/src/test/resources/date.avro differ

http://git-wip-us.apache.org/repos/asf/spark/blob/819c4de4/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
----------------------------------------------------------------------
diff --git 
a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala 
b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
index b4dcf6c..47995bb 100644
--- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
+++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
@@ -33,6 +33,7 @@ import org.apache.avro.generic.GenericData.{EnumSymbol, Fixed}
 import org.apache.commons.io.FileUtils
 
 import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.execution.datasources.DataSource
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils}
@@ -67,6 +68,27 @@ class AvroSuite extends QueryTest with SharedSQLContext with 
SQLTestUtils {
   // writer.close()
   val timestampAvro = testFile("timestamp.avro")
 
+  // The test file date.avro is generated via following Python code:
+  // import json
+  // import avro.schema
+  // from avro.datafile import DataFileWriter
+  // from avro.io import DatumWriter
+  //
+  // write_schema = avro.schema.parse(json.dumps({
+  //   "namespace": "logical",
+  //   "type": "record",
+  //   "name": "test",
+  //   "fields": [
+  //   {"name": "date", "type": {"type": "int", "logicalType": "date"}}
+  //   ]
+  // }))
+  //
+  // writer = DataFileWriter(open("date.avro", "wb"), DatumWriter(), 
write_schema)
+  // writer.append({"date": 7})
+  // writer.append({"date": 365})
+  // writer.close()
+  val dateAvro = testFile("date.avro")
+
   override protected def beforeAll(): Unit = {
     super.beforeAll()
     spark.conf.set("spark.sql.files.maxPartitionBytes", 1024)
@@ -350,9 +372,35 @@ class AvroSuite extends QueryTest with SharedSQLContext 
with SQLTestUtils {
       val df = spark.createDataFrame(rdd, schema)
       df.write.format("avro").save(dir.toString)
       assert(spark.read.format("avro").load(dir.toString).count == rdd.count)
-      assert(
-        
spark.read.format("avro").load(dir.toString).select("date").collect().map(_(0)).toSet
 ==
-        Array(null, 1451865600000L, 1459987200000L).toSet)
+      checkAnswer(
+        spark.read.format("avro").load(dir.toString).select("date"),
+        Seq(Row(null), Row(new Date(1451865600000L)), Row(new 
Date(1459987200000L))))
+    }
+  }
+
+  test("Logical type: date") {
+    val expected = Seq(7, 365).map(t => Row(DateTimeUtils.toJavaDate(t)))
+    val df = spark.read.format("avro").load(dateAvro)
+
+    checkAnswer(df, expected)
+
+    val avroSchema = s"""
+      {
+        "namespace": "logical",
+        "type": "record",
+        "name": "test",
+        "fields": [
+          {"name": "date", "type": {"type": "int", "logicalType": "date"}}
+        ]
+      }
+    """
+
+    checkAnswer(spark.read.format("avro").option("avroSchema", 
avroSchema).load(dateAvro),
+      expected)
+
+    withTempPath { dir =>
+      df.write.format("avro").save(dir.toString)
+      checkAnswer(spark.read.format("avro").load(dir.toString), expected)
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to