Repository: spark
Updated Branches:
  refs/heads/master 6b8fbbfb1 -> 60af2501e


[SPARK-25160][SQL] Avro: remove sql configuration spark.sql.avro.outputTimestampType

## What changes were proposed in this pull request?

In the PR for supporting logical timestamp types, 
https://github.com/apache/spark/pull/21935, a SQL configuration 
spark.sql.avro.outputTimestampType was added so that users could specify the 
output timestamp precision they want.

With PR https://github.com/apache/spark/pull/21847, the output file can already be 
written with user-specified Avro types (via the avroSchema option).

So there is no need for such a trivial configuration. Otherwise, for consistency, we 
would need to add a configuration for every Catalyst type that can be converted into 
different Avro types.

This PR also adds a test case for a user-specified output schema with different 
timestamp types.
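
As a rough illustration of the replacement usage (not part of this patch; the column name, record name, and output path below are hypothetical), the desired Avro timestamp precision is now expressed in a user-supplied output schema passed through the existing `avroSchema` write option, rather than through the removed SQL configuration:

```scala
// Hedged sketch: assumes a SparkSession and a DataFrame `df` with a single
// nullable timestamp column named "ts"; names and the path are hypothetical.
val avroSchema = """
  {
    "namespace": "example",
    "type": "record",
    "name": "record_with_ts",
    "fields": [
      {"name": "ts",
        "type": [{"type": "long", "logicalType": "timestamp-millis"}, "null"]}
    ]
  }
"""

df.write
  .format("avro")
  .option("avroSchema", avroSchema)  // the logical type in the schema controls output precision
  .save("/tmp/avro_with_millis")
```

Reading the files back with `spark.read.format("avro").load(...)` should return the same timestamp values, which is what the new test case checks.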

## How was this patch tested?

Unit test

Closes #22151 from gengliangwang/removeOutputTimestampType.

Authored-by: Gengliang Wang <gengliang.w...@databricks.com>
Signed-off-by: hyukjinkwon <gurwls...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/60af2501
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/60af2501
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/60af2501

Branch: refs/heads/master
Commit: 60af2501e1afc00192c779f2736a4e3de12428fa
Parents: 6b8fbbf
Author: Gengliang Wang <gengliang.w...@databricks.com>
Authored: Mon Aug 20 20:42:27 2018 +0800
Committer: hyukjinkwon <gurwls...@apache.org>
Committed: Mon Aug 20 20:42:27 2018 +0800

----------------------------------------------------------------------
 .../org/apache/spark/sql/avro/AvroOptions.scala | 11 -------
 .../apache/spark/sql/avro/AvroSerializer.scala  |  6 ++--
 .../spark/sql/avro/SchemaConverters.scala       | 22 ++++----------
 .../spark/sql/avro/AvroLogicalTypeSuite.scala   | 31 ++++++++++++++------
 .../org/apache/spark/sql/internal/SQLConf.scala | 18 ------------
 5 files changed, 30 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/60af2501/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala
----------------------------------------------------------------------
diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala
index 8c62d5d..67f5634 100644
--- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala
+++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala
@@ -22,7 +22,6 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.internal.SQLConf.AvroOutputTimestampType
 
 /**
  * Options for Avro Reader and Writer stored in case insensitive manner.
@@ -80,14 +79,4 @@ class AvroOptions(
   val compression: String = {
     parameters.get("compression").getOrElse(SQLConf.get.avroCompressionCodec)
   }
-
-  /**
-   * Avro timestamp type used when Spark writes data to Avro files.
-   * Currently supported types are `TIMESTAMP_MICROS` and `TIMESTAMP_MILLIS`.
-   * TIMESTAMP_MICROS is a logical timestamp type in Avro, which stores number of microseconds
-   * from the Unix epoch. TIMESTAMP_MILLIS is also logical, but with millisecond precision,
-   * which means Spark has to truncate the microsecond portion of its timestamp value.
-   * The related configuration is set via SQLConf, and it is not exposed as an option.
-   */
-  val outputTimestampType: AvroOutputTimestampType.Value = SQLConf.get.avroOutputTimestampType
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/60af2501/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
----------------------------------------------------------------------
diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
index f551c83..e902b4c 100644
--- a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
+++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala
@@ -201,13 +201,11 @@ class AvroSerializer(rootCatalystType: DataType, rootAvroType: Schema, nullable:
 
   private def newStructConverter(
       catalystStruct: StructType, avroStruct: Schema): InternalRow => Record = {
-    if (avroStruct.getType != RECORD) {
+    if (avroStruct.getType != RECORD || avroStruct.getFields.size() != catalystStruct.length) {
       throw new IncompatibleSchemaException(s"Cannot convert Catalyst type $catalystStruct to " +
         s"Avro type $avroStruct.")
     }
-    val avroFields = avroStruct.getFields
-    assert(avroFields.size() == catalystStruct.length)
-    val fieldConverters = catalystStruct.zip(avroFields.asScala).map {
+    val fieldConverters = catalystStruct.zip(avroStruct.getFields.asScala).map {
       case (f1, f2) => newConverter(f1.dataType, resolveNullableType(f2.schema(), f1.nullable))
     }
     val numFields = catalystStruct.length

http://git-wip-us.apache.org/repos/asf/spark/blob/60af2501/external/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala
----------------------------------------------------------------------
diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala
index 7b33cf6..3a15e8d 100644
--- a/external/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala
+++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala
@@ -20,14 +20,11 @@ package org.apache.spark.sql.avro
 import scala.collection.JavaConverters._
 import scala.util.Random
 
-import com.fasterxml.jackson.annotation.ObjectIdGenerators.UUIDGenerator
-import org.apache.avro.{LogicalType, LogicalTypes, Schema, SchemaBuilder}
+import org.apache.avro.{LogicalTypes, Schema, SchemaBuilder}
 import org.apache.avro.LogicalTypes.{Date, Decimal, TimestampMicros, TimestampMillis}
 import org.apache.avro.Schema.Type._
 
-import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.util.RandomUUIDGenerator
-import org.apache.spark.sql.internal.SQLConf.AvroOutputTimestampType
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.types.Decimal.{maxPrecisionForBytes, minBytesForPrecision}
 
@@ -126,8 +123,7 @@ object SchemaConverters {
       catalystType: DataType,
       nullable: Boolean = false,
       recordName: String = "topLevelRecord",
-      prevNameSpace: String = "",
-      outputTimestampType: AvroOutputTimestampType.Value = AvroOutputTimestampType.TIMESTAMP_MICROS)
+      prevNameSpace: String = "")
     : Schema = {
     val builder = SchemaBuilder.builder()
 
@@ -138,13 +134,7 @@ object SchemaConverters {
       case DateType =>
         LogicalTypes.date().addToSchema(builder.intType())
       case TimestampType =>
-        val timestampType = outputTimestampType match {
-          case AvroOutputTimestampType.TIMESTAMP_MILLIS => LogicalTypes.timestampMillis()
-          case AvroOutputTimestampType.TIMESTAMP_MICROS => LogicalTypes.timestampMicros()
-          case other =>
-            throw new IncompatibleSchemaException(s"Unexpected output timestamp type $other.")
-        }
-        timestampType.addToSchema(builder.longType())
+        LogicalTypes.timestampMicros().addToSchema(builder.longType())
 
       case FloatType => builder.floatType()
       case DoubleType => builder.doubleType()
@@ -162,10 +152,10 @@ object SchemaConverters {
       case BinaryType => builder.bytesType()
       case ArrayType(et, containsNull) =>
         builder.array()
-          .items(toAvroType(et, containsNull, recordName, prevNameSpace, outputTimestampType))
+          .items(toAvroType(et, containsNull, recordName, prevNameSpace))
       case MapType(StringType, vt, valueContainsNull) =>
         builder.map()
-          .values(toAvroType(vt, valueContainsNull, recordName, prevNameSpace, outputTimestampType))
+          .values(toAvroType(vt, valueContainsNull, recordName, prevNameSpace))
       case st: StructType =>
         val nameSpace = prevNameSpace match {
           case "" => recordName
@@ -175,7 +165,7 @@ object SchemaConverters {
         val fieldsAssembler = builder.record(recordName).namespace(nameSpace).fields()
         st.foreach { f =>
           val fieldAvroType =
-            toAvroType(f.dataType, f.nullable, f.name, nameSpace, outputTimestampType)
+            toAvroType(f.dataType, f.nullable, f.name, nameSpace)
           fieldsAssembler.name(f.name).`type`(fieldAvroType).noDefault()
         }
         fieldsAssembler.endRecord()

http://git-wip-us.apache.org/repos/asf/spark/blob/60af2501/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroLogicalTypeSuite.scala
----------------------------------------------------------------------
diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroLogicalTypeSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroLogicalTypeSuite.scala
index ca7eef2..79ba287 100644
--- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroLogicalTypeSuite.scala
+++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroLogicalTypeSuite.scala
@@ -148,7 +148,7 @@ class AvroLogicalTypeSuite extends QueryTest with SharedSQLContext with SQLTestU
     }
   }
 
-  test("Logical type: specify different output timestamp types") {
+  test("Logical type: user specified output schema with different timestamp 
types") {
     withTempDir { dir =>
       val timestampAvro = timestampFile(dir.getAbsolutePath)
       val df =
@@ -156,13 +156,26 @@ class AvroLogicalTypeSuite extends QueryTest with SharedSQLContext with SQLTestU
 
       val expected = timestampInputData.map(t => Row(new Timestamp(t._1), new Timestamp(t._2)))
 
-      Seq("TIMESTAMP_MILLIS", "TIMESTAMP_MICROS").foreach { timestampType =>
-        withSQLConf(SQLConf.AVRO_OUTPUT_TIMESTAMP_TYPE.key -> timestampType) {
-          withTempPath { path =>
-            df.write.format("avro").save(path.toString)
-            checkAnswer(spark.read.format("avro").load(path.toString), expected)
-          }
-        }
+      val userSpecifiedTimestampSchema = s"""
+      {
+        "namespace": "logical",
+        "type": "record",
+        "name": "test",
+        "fields": [
+          {"name": "timestamp_millis",
+            "type": [{"type": "long","logicalType": "timestamp-micros"}, 
"null"]},
+          {"name": "timestamp_micros",
+            "type": [{"type": "long","logicalType": "timestamp-millis"}, 
"null"]}
+        ]
+      }
+    """
+
+      withTempPath { path =>
+        df.write
+          .format("avro")
+          .option("avroSchema", userSpecifiedTimestampSchema)
+          .save(path.toString)
+        checkAnswer(spark.read.format("avro").load(path.toString), expected)
       }
     }
   }
@@ -179,7 +192,7 @@ class AvroLogicalTypeSuite extends QueryTest with SharedSQLContext with SQLTestU
     }
   }
 
-  test("Logical type: user specified schema") {
+  test("Logical type: user specified read schema") {
     withTempDir { dir =>
       val timestampAvro = timestampFile(dir.getAbsolutePath)
       val expected = timestampInputData

http://git-wip-us.apache.org/repos/asf/spark/blob/60af2501/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index dbb5bb4..bffdddc 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1444,21 +1444,6 @@ object SQLConf {
     .intConf
     .createWithDefault(20)
 
-  object AvroOutputTimestampType extends Enumeration {
-    val TIMESTAMP_MICROS, TIMESTAMP_MILLIS = Value
-  }
-
-  val AVRO_OUTPUT_TIMESTAMP_TYPE = buildConf("spark.sql.avro.outputTimestampType")
-    .doc("Sets which Avro timestamp type to use when Spark writes data to Avro 
files. " +
-      "TIMESTAMP_MICROS is a logical timestamp type in Avro, which stores 
number of " +
-      "microseconds from the Unix epoch. TIMESTAMP_MILLIS is also logical, but 
with " +
-      "millisecond precision, which means Spark has to truncate the 
microsecond portion of its " +
-      "timestamp value.")
-    .stringConf
-    .transform(_.toUpperCase(Locale.ROOT))
-    .checkValues(AvroOutputTimestampType.values.map(_.toString))
-    .createWithDefault(AvroOutputTimestampType.TIMESTAMP_MICROS.toString)
-
   val AVRO_COMPRESSION_CODEC = buildConf("spark.sql.avro.compression.codec")
     .doc("Compression codec used in writing of AVRO files. Supported codecs: " 
+
       "uncompressed, deflate, snappy, bzip2 and xz. Default codec is snappy.")
@@ -1882,9 +1867,6 @@ class SQLConf extends Serializable with Logging {
 
   def replEagerEvalTruncate: Int = getConf(SQLConf.REPL_EAGER_EVAL_TRUNCATE)
 
-  def avroOutputTimestampType: AvroOutputTimestampType.Value =
-    AvroOutputTimestampType.withName(getConf(SQLConf.AVRO_OUTPUT_TIMESTAMP_TYPE))
-
   def avroCompressionCodec: String = getConf(SQLConf.AVRO_COMPRESSION_CODEC)
 
   def avroDeflateLevel: Int = getConf(SQLConf.AVRO_DEFLATE_LEVEL)

