Repository: spark Updated Branches: refs/heads/master 6b8fbbfb1 -> 60af2501e
[SPARK-25160][SQL] Avro: remove sql configuration spark.sql.avro.outputTimestampType ## What changes were proposed in this pull request? In the PR for supporting logical timestamp types https://github.com/apache/spark/pull/21935, a SQL configuration spark.sql.avro.outputTimestampType was added, so that users can specify the output timestamp precision they want. With PR https://github.com/apache/spark/pull/21847, the output file can be written with user-specified types. So there is no need to have such a trivial configuration. Otherwise, to be consistent, we would need to add a configuration for all the Catalyst types that can be converted into different Avro types. This PR also adds a test case for user-specified output schema with different timestamp types. ## How was this patch tested? Unit test Closes #22151 from gengliangwang/removeOutputTimestampType. Authored-by: Gengliang Wang <gengliang.w...@databricks.com> Signed-off-by: hyukjinkwon <gurwls...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/60af2501 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/60af2501 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/60af2501 Branch: refs/heads/master Commit: 60af2501e1afc00192c779f2736a4e3de12428fa Parents: 6b8fbbf Author: Gengliang Wang <gengliang.w...@databricks.com> Authored: Mon Aug 20 20:42:27 2018 +0800 Committer: hyukjinkwon <gurwls...@apache.org> Committed: Mon Aug 20 20:42:27 2018 +0800 ---------------------------------------------------------------------- .../org/apache/spark/sql/avro/AvroOptions.scala | 11 ------- .../apache/spark/sql/avro/AvroSerializer.scala | 6 ++-- .../spark/sql/avro/SchemaConverters.scala | 22 ++++---------- .../spark/sql/avro/AvroLogicalTypeSuite.scala | 31 ++++++++++++++------ .../org/apache/spark/sql/internal/SQLConf.scala | 18 ------------ 5 files changed, 30 insertions(+), 58 deletions(-) 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/60af2501/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala ---------------------------------------------------------------------- diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala index 8c62d5d..67f5634 100644 --- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala +++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala @@ -22,7 +22,6 @@ import org.apache.hadoop.conf.Configuration import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.internal.SQLConf.AvroOutputTimestampType /** * Options for Avro Reader and Writer stored in case insensitive manner. @@ -80,14 +79,4 @@ class AvroOptions( val compression: String = { parameters.get("compression").getOrElse(SQLConf.get.avroCompressionCodec) } - - /** - * Avro timestamp type used when Spark writes data to Avro files. - * Currently supported types are `TIMESTAMP_MICROS` and `TIMESTAMP_MILLIS`. - * TIMESTAMP_MICROS is a logical timestamp type in Avro, which stores number of microseconds - * from the Unix epoch. TIMESTAMP_MILLIS is also logical, but with millisecond precision, - * which means Spark has to truncate the microsecond portion of its timestamp value. - * The related configuration is set via SQLConf, and it is not exposed as an option. 
- */ - val outputTimestampType: AvroOutputTimestampType.Value = SQLConf.get.avroOutputTimestampType } http://git-wip-us.apache.org/repos/asf/spark/blob/60af2501/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala ---------------------------------------------------------------------- diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala index f551c83..e902b4c 100644 --- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala +++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala @@ -201,13 +201,11 @@ class AvroSerializer(rootCatalystType: DataType, rootAvroType: Schema, nullable: private def newStructConverter( catalystStruct: StructType, avroStruct: Schema): InternalRow => Record = { - if (avroStruct.getType != RECORD) { + if (avroStruct.getType != RECORD || avroStruct.getFields.size() != catalystStruct.length) { throw new IncompatibleSchemaException(s"Cannot convert Catalyst type $catalystStruct to " + s"Avro type $avroStruct.") } - val avroFields = avroStruct.getFields - assert(avroFields.size() == catalystStruct.length) - val fieldConverters = catalystStruct.zip(avroFields.asScala).map { + val fieldConverters = catalystStruct.zip(avroStruct.getFields.asScala).map { case (f1, f2) => newConverter(f1.dataType, resolveNullableType(f2.schema(), f1.nullable)) } val numFields = catalystStruct.length http://git-wip-us.apache.org/repos/asf/spark/blob/60af2501/external/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala ---------------------------------------------------------------------- diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala index 7b33cf6..3a15e8d 100644 --- a/external/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala +++ 
b/external/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala @@ -20,14 +20,11 @@ package org.apache.spark.sql.avro import scala.collection.JavaConverters._ import scala.util.Random -import com.fasterxml.jackson.annotation.ObjectIdGenerators.UUIDGenerator -import org.apache.avro.{LogicalType, LogicalTypes, Schema, SchemaBuilder} +import org.apache.avro.{LogicalTypes, Schema, SchemaBuilder} import org.apache.avro.LogicalTypes.{Date, Decimal, TimestampMicros, TimestampMillis} import org.apache.avro.Schema.Type._ -import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.util.RandomUUIDGenerator -import org.apache.spark.sql.internal.SQLConf.AvroOutputTimestampType import org.apache.spark.sql.types._ import org.apache.spark.sql.types.Decimal.{maxPrecisionForBytes, minBytesForPrecision} @@ -126,8 +123,7 @@ object SchemaConverters { catalystType: DataType, nullable: Boolean = false, recordName: String = "topLevelRecord", - prevNameSpace: String = "", - outputTimestampType: AvroOutputTimestampType.Value = AvroOutputTimestampType.TIMESTAMP_MICROS) + prevNameSpace: String = "") : Schema = { val builder = SchemaBuilder.builder() @@ -138,13 +134,7 @@ object SchemaConverters { case DateType => LogicalTypes.date().addToSchema(builder.intType()) case TimestampType => - val timestampType = outputTimestampType match { - case AvroOutputTimestampType.TIMESTAMP_MILLIS => LogicalTypes.timestampMillis() - case AvroOutputTimestampType.TIMESTAMP_MICROS => LogicalTypes.timestampMicros() - case other => - throw new IncompatibleSchemaException(s"Unexpected output timestamp type $other.") - } - timestampType.addToSchema(builder.longType()) + LogicalTypes.timestampMicros().addToSchema(builder.longType()) case FloatType => builder.floatType() case DoubleType => builder.doubleType() @@ -162,10 +152,10 @@ object SchemaConverters { case BinaryType => builder.bytesType() case ArrayType(et, containsNull) => builder.array() - .items(toAvroType(et, 
containsNull, recordName, prevNameSpace, outputTimestampType)) + .items(toAvroType(et, containsNull, recordName, prevNameSpace)) case MapType(StringType, vt, valueContainsNull) => builder.map() - .values(toAvroType(vt, valueContainsNull, recordName, prevNameSpace, outputTimestampType)) + .values(toAvroType(vt, valueContainsNull, recordName, prevNameSpace)) case st: StructType => val nameSpace = prevNameSpace match { case "" => recordName @@ -175,7 +165,7 @@ object SchemaConverters { val fieldsAssembler = builder.record(recordName).namespace(nameSpace).fields() st.foreach { f => val fieldAvroType = - toAvroType(f.dataType, f.nullable, f.name, nameSpace, outputTimestampType) + toAvroType(f.dataType, f.nullable, f.name, nameSpace) fieldsAssembler.name(f.name).`type`(fieldAvroType).noDefault() } fieldsAssembler.endRecord() http://git-wip-us.apache.org/repos/asf/spark/blob/60af2501/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroLogicalTypeSuite.scala ---------------------------------------------------------------------- diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroLogicalTypeSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroLogicalTypeSuite.scala index ca7eef2..79ba287 100644 --- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroLogicalTypeSuite.scala +++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroLogicalTypeSuite.scala @@ -148,7 +148,7 @@ class AvroLogicalTypeSuite extends QueryTest with SharedSQLContext with SQLTestU } } - test("Logical type: specify different output timestamp types") { + test("Logical type: user specified output schema with different timestamp types") { withTempDir { dir => val timestampAvro = timestampFile(dir.getAbsolutePath) val df = @@ -156,13 +156,26 @@ class AvroLogicalTypeSuite extends QueryTest with SharedSQLContext with SQLTestU val expected = timestampInputData.map(t => Row(new Timestamp(t._1), new Timestamp(t._2))) - Seq("TIMESTAMP_MILLIS", 
"TIMESTAMP_MICROS").foreach { timestampType => - withSQLConf(SQLConf.AVRO_OUTPUT_TIMESTAMP_TYPE.key -> timestampType) { - withTempPath { path => - df.write.format("avro").save(path.toString) - checkAnswer(spark.read.format("avro").load(path.toString), expected) - } - } + val userSpecifiedTimestampSchema = s""" + { + "namespace": "logical", + "type": "record", + "name": "test", + "fields": [ + {"name": "timestamp_millis", + "type": [{"type": "long","logicalType": "timestamp-micros"}, "null"]}, + {"name": "timestamp_micros", + "type": [{"type": "long","logicalType": "timestamp-millis"}, "null"]} + ] + } + """ + + withTempPath { path => + df.write + .format("avro") + .option("avroSchema", userSpecifiedTimestampSchema) + .save(path.toString) + checkAnswer(spark.read.format("avro").load(path.toString), expected) } } } @@ -179,7 +192,7 @@ class AvroLogicalTypeSuite extends QueryTest with SharedSQLContext with SQLTestU } } - test("Logical type: user specified schema") { + test("Logical type: user specified read schema") { withTempDir { dir => val timestampAvro = timestampFile(dir.getAbsolutePath) val expected = timestampInputData http://git-wip-us.apache.org/repos/asf/spark/blob/60af2501/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index dbb5bb4..bffdddc 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -1444,21 +1444,6 @@ object SQLConf { .intConf .createWithDefault(20) - object AvroOutputTimestampType extends Enumeration { - val TIMESTAMP_MICROS, TIMESTAMP_MILLIS = Value - } - - val AVRO_OUTPUT_TIMESTAMP_TYPE = buildConf("spark.sql.avro.outputTimestampType") - .doc("Sets which Avro timestamp type to use 
when Spark writes data to Avro files. " + - "TIMESTAMP_MICROS is a logical timestamp type in Avro, which stores number of " + - "microseconds from the Unix epoch. TIMESTAMP_MILLIS is also logical, but with " + - "millisecond precision, which means Spark has to truncate the microsecond portion of its " + - "timestamp value.") - .stringConf - .transform(_.toUpperCase(Locale.ROOT)) - .checkValues(AvroOutputTimestampType.values.map(_.toString)) - .createWithDefault(AvroOutputTimestampType.TIMESTAMP_MICROS.toString) - val AVRO_COMPRESSION_CODEC = buildConf("spark.sql.avro.compression.codec") .doc("Compression codec used in writing of AVRO files. Supported codecs: " + "uncompressed, deflate, snappy, bzip2 and xz. Default codec is snappy.") @@ -1882,9 +1867,6 @@ class SQLConf extends Serializable with Logging { def replEagerEvalTruncate: Int = getConf(SQLConf.REPL_EAGER_EVAL_TRUNCATE) - def avroOutputTimestampType: AvroOutputTimestampType.Value = - AvroOutputTimestampType.withName(getConf(SQLConf.AVRO_OUTPUT_TIMESTAMP_TYPE)) - def avroCompressionCodec: String = getConf(SQLConf.AVRO_COMPRESSION_CODEC) def avroDeflateLevel: Int = getConf(SQLConf.AVRO_DEFLATE_LEVEL) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org