Repository: spark Updated Branches: refs/heads/master 131ca146e -> 819c4de45
[SPARK-24772][SQL] Avro: support logical date type ## What changes were proposed in this pull request? Support Avro logical date type: https://avro.apache.org/docs/1.8.2/spec.html#Date ## How was this patch tested? Unit test Closes #21984 from gengliangwang/avro_date. Authored-by: Gengliang Wang <gengliang.w...@databricks.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/819c4de4 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/819c4de4 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/819c4de4 Branch: refs/heads/master Commit: 819c4de45af2fe39bac8363241d0001b2e83f858 Parents: 131ca14 Author: Gengliang Wang <gengliang.w...@databricks.com> Authored: Tue Aug 7 17:24:25 2018 +0800 Committer: Wenchen Fan <wenc...@databricks.com> Committed: Tue Aug 7 17:24:25 2018 +0800 ---------------------------------------------------------------------- .../spark/sql/avro/AvroDeserializer.scala | 5 ++ .../apache/spark/sql/avro/AvroSerializer.scala | 2 +- .../spark/sql/avro/SchemaConverters.scala | 12 +++-- external/avro/src/test/resources/date.avro | Bin 0 -> 209 bytes .../org/apache/spark/sql/avro/AvroSuite.scala | 54 +++++++++++++++++-- 5 files changed, 66 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/819c4de4/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala ---------------------------------------------------------------------- diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala index 394a62b..74677a2 100644 --- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala +++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala @@ -84,6 +84,9 @@ class AvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType) { case (INT, IntegerType) => (updater, ordinal, value) => updater.setInt(ordinal, value.asInstanceOf[Int]) + case (INT, DateType) => (updater, ordinal, value) => + updater.setInt(ordinal, value.asInstanceOf[Int]) + case (LONG, LongType) => (updater, ordinal, value) => updater.setLong(ordinal, value.asInstanceOf[Long]) @@ -100,6 +103,8 @@ class AvroDeserializer(rootAvroType: Schema, rootCatalystType: DataType) { s"Cannot convert Avro logical type ${other} to Catalyst Timestamp type.") } + // Before we upgrade Avro to 1.8 for logical type support, spark-avro converts Long to Date. + // For backward compatibility, we still keep this conversion. case (LONG, DateType) => (updater, ordinal, value) => updater.setInt(ordinal, (value.asInstanceOf[Long] / DateTimeUtils.MILLIS_PER_DAY).toInt) http://git-wip-us.apache.org/repos/asf/spark/blob/819c4de4/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala ---------------------------------------------------------------------- diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala index 382f9a7..9885826 100644 --- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala +++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala @@ -92,7 +92,7 @@ class AvroSerializer(rootCatalystType: DataType, rootAvroType: Schema, nullable: case BinaryType => (getter, ordinal) => ByteBuffer.wrap(getter.getBinary(ordinal)) case DateType => - (getter, ordinal) => getter.getInt(ordinal) * DateTimeUtils.MILLIS_PER_DAY + (getter, ordinal) => getter.getInt(ordinal) case TimestampType => avroType.getLogicalType match { case _: TimestampMillis => (getter, ordinal) => getter.getLong(ordinal) / 1000 case _: TimestampMicros => (getter, ordinal) => getter.getLong(ordinal) http://git-wip-us.apache.org/repos/asf/spark/blob/819c4de4/external/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala ---------------------------------------------------------------------- diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala index 6929539..245e68d 100644 --- a/external/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala +++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.avro import scala.collection.JavaConverters._ import org.apache.avro.{LogicalType, LogicalTypes, Schema, SchemaBuilder} -import org.apache.avro.LogicalTypes.{TimestampMicros, TimestampMillis} +import org.apache.avro.LogicalTypes.{Date, TimestampMicros, TimestampMillis} import org.apache.avro.Schema.Type._ import org.apache.spark.sql.internal.SQLConf.AvroOutputTimestampType @@ -38,7 +38,10 @@ object SchemaConverters { */ def toSqlType(avroSchema: Schema): SchemaType = { avroSchema.getType match { - case INT => SchemaType(IntegerType, nullable = false) + case INT => avroSchema.getLogicalType match { + case _: Date => SchemaType(DateType, nullable = false) + case _ => SchemaType(IntegerType, nullable = false) + } case STRING => SchemaType(StringType, nullable = false) case BOOLEAN => SchemaType(BooleanType, nullable = false) case BYTES => SchemaType(BinaryType, nullable = false) @@ -121,7 +124,10 @@ object SchemaConverters { case BooleanType => builder.booleanType() case ByteType | ShortType | IntegerType => builder.intType() case LongType => builder.longType() - case DateType => builder.longType() + case DateType => builder + .intBuilder() + .prop(LogicalType.LOGICAL_TYPE_PROP, LogicalTypes.date().getName) + .endInt() case TimestampType => val timestampType = outputTimestampType match { case AvroOutputTimestampType.TIMESTAMP_MILLIS => LogicalTypes.timestampMillis() http://git-wip-us.apache.org/repos/asf/spark/blob/819c4de4/external/avro/src/test/resources/date.avro ---------------------------------------------------------------------- diff --git a/external/avro/src/test/resources/date.avro b/external/avro/src/test/resources/date.avro new file mode 100644 index 0000000..3a67617 Binary files /dev/null and b/external/avro/src/test/resources/date.avro differ http://git-wip-us.apache.org/repos/asf/spark/blob/819c4de4/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala ---------------------------------------------------------------------- diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala index b4dcf6c..47995bb 100644 --- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala +++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala @@ -33,6 +33,7 @@ import org.apache.avro.generic.GenericData.{EnumSymbol, Fixed} import org.apache.commons.io.FileUtils import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.execution.datasources.DataSource import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils} @@ -67,6 +68,27 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils { // writer.close() val timestampAvro = testFile("timestamp.avro") + // The test file date.avro is generated via following Python code: + // import json + // import avro.schema + // from avro.datafile import DataFileWriter + // from avro.io import DatumWriter + // + // write_schema = avro.schema.parse(json.dumps({ + // "namespace": "logical", + // "type": "record", + // "name": "test", + // "fields": [ + // {"name": "date", "type": {"type": "int", "logicalType": "date"}} + // ] + // })) + // + // writer = DataFileWriter(open("date.avro", "wb"), DatumWriter(), write_schema) + // writer.append({"date": 7}) + // writer.append({"date": 365}) + // writer.close() + val dateAvro = testFile("date.avro") + override protected def beforeAll(): Unit = { super.beforeAll() spark.conf.set("spark.sql.files.maxPartitionBytes", 1024) @@ -350,9 +372,35 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils { val df = spark.createDataFrame(rdd, schema) df.write.format("avro").save(dir.toString) assert(spark.read.format("avro").load(dir.toString).count == rdd.count) - assert( - spark.read.format("avro").load(dir.toString).select("date").collect().map(_(0)).toSet == - Array(null, 1451865600000L, 1459987200000L).toSet) + checkAnswer( + spark.read.format("avro").load(dir.toString).select("date"), + Seq(Row(null), Row(new Date(1451865600000L)), Row(new Date(1459987200000L)))) + } + } + + test("Logical type: date") { + val expected = Seq(7, 365).map(t => Row(DateTimeUtils.toJavaDate(t))) + val df = spark.read.format("avro").load(dateAvro) + + checkAnswer(df, expected) + + val avroSchema = s""" + { + "namespace": "logical", + "type": "record", + "name": "test", + "fields": [ + {"name": "date", "type": {"type": "int", "logicalType": "date"}} + ] + } + """ + + checkAnswer(spark.read.format("avro").option("avroSchema", avroSchema).load(dateAvro), + expected) + + withTempPath { dir => + df.write.format("avro").save(dir.toString) + checkAnswer(spark.read.format("avro").load(dir.toString), expected) } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org