Repository: spark
Updated Branches:
  refs/heads/master 881c5c807 -> e0d7665ce


[SPARK-17920][SPARK-19580][SPARK-19878][SQL] Support writing to Hive table which uses Avro schema url 'avro.schema.url'

## What changes were proposed in this pull request?
SPARK-19580 Support for avro.schema.url while writing to a Hive table
SPARK-19878 Add the Hive configuration when initializing the Hive serde in InsertIntoHiveTable.scala
SPARK-17920 HiveWriterContainer passes a null configuration to serde.initialize, causing a NullPointerException in AvroSerde when using avro.schema.url

Support writing to a Hive table that uses the Avro schema URL table property 'avro.schema.url'. For example:

create external table avro_in (a string) stored as avro location '/avro-in/' tblproperties ('avro.schema.url'='/avro-schema/avro.avsc');

create external table avro_out (a string) stored as avro location '/avro-out/' tblproperties ('avro.schema.url'='/avro-schema/avro.avsc');

insert overwrite table avro_out select * from avro_in; -- fails with java.lang.NullPointerException

 WARN AvroSerDe: Encountered exception determining schema. Returning signal schema to indicate problem
java.lang.NullPointerException
        at org.apache.hadoop.fs.FileSystem.getDefaultUri(FileSystem.java:182)
        at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:174)
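
The trace bottoms out in Hadoop's FileSystem lookup. A minimal sketch of the failure mechanism (an assumed simplification of what AvroSerDe does with 'avro.schema.url', not its actual source):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Sketch: a serde resolving 'avro.schema.url' must go through the Hadoop
// FileSystem API, which dereferences the Configuration it is given.
def readSchema(conf: Configuration, schemaUrl: String): String = {
  // FileSystem.get(conf) delegates to FileSystem.getDefaultUri(conf), which
  // reads 'fs.defaultFS' from the configuration; with conf == null that
  // dereference is exactly the NullPointerException in the trace above.
  val fs = FileSystem.get(conf)
  val in = fs.open(new Path(schemaUrl))
  try scala.io.Source.fromInputStream(in).mkString finally in.close()
}
```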

## Changes proposed in this fix
Currently a 'null' value is passed to the serializer, which causes an NPE during the insert operation; this fix passes the Hadoop configuration object instead.
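
In code terms, the change is in HiveOutputWriter's serializer setup. A condensed, self-contained sketch of the corrected pattern (the real code uses the writer's own jobConf and tableDesc fields, as the diff below shows):

```scala
import org.apache.hadoop.hive.ql.plan.TableDesc
import org.apache.hadoop.hive.serde2.Serializer
import org.apache.hadoop.mapred.JobConf

// Instantiate the table's serde and initialize it with a real Hadoop
// configuration (JobConf extends Configuration), so serdes such as
// AvroSerDe can reach the FileSystem when resolving 'avro.schema.url'.
def newSerializer(tableDesc: TableDesc, jobConf: JobConf): Serializer = {
  val serializer =
    tableDesc.getDeserializerClass.newInstance().asInstanceOf[Serializer]
  serializer.initialize(jobConf, tableDesc.getProperties) // was: initialize(null, ...)
  serializer
}
```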
## How was this patch tested?
Added a new test case in VersionsSuite.

Author: vinodkc <[email protected]>

Closes #19779 from vinodkc/br_Fix_SPARK-17920.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e0d7665c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e0d7665c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e0d7665c

Branch: refs/heads/master
Commit: e0d7665cec1e6954d640f422c79ebba4c273be7d
Parents: 881c5c8
Author: vinodkc <[email protected]>
Authored: Tue Nov 21 22:31:46 2017 -0800
Committer: gatorsmile <[email protected]>
Committed: Tue Nov 21 22:31:46 2017 -0800

----------------------------------------------------------------------
 .../sql/hive/execution/HiveFileFormat.scala     |  4 +-
 .../spark/sql/hive/client/VersionsSuite.scala   | 72 +++++++++++++++++++-
 2 files changed, 73 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e0d7665c/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala
index ac735e8..4a7cd69 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala
@@ -116,7 +116,7 @@ class HiveOutputWriter(
 
   private val serializer = {
     val serializer = tableDesc.getDeserializerClass.newInstance().asInstanceOf[Serializer]
-    serializer.initialize(null, tableDesc.getProperties)
+    serializer.initialize(jobConf, tableDesc.getProperties)
     serializer
   }
 
@@ -130,7 +130,7 @@ class HiveOutputWriter(
 
   private val standardOI = ObjectInspectorUtils
     .getStandardObjectInspector(
-      tableDesc.getDeserializer.getObjectInspector,
+      tableDesc.getDeserializer(jobConf).getObjectInspector,
       ObjectInspectorCopyOption.JAVA)
     .asInstanceOf[StructObjectInspector]
 

http://git-wip-us.apache.org/repos/asf/spark/blob/e0d7665c/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
index 9ed39cc..fbf6877 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.hive.client
 
-import java.io.{ByteArrayOutputStream, File, PrintStream}
+import java.io.{ByteArrayOutputStream, File, PrintStream, PrintWriter}
 import java.net.URI
 
 import org.apache.hadoop.conf.Configuration
@@ -841,6 +841,76 @@ class VersionsSuite extends SparkFunSuite with Logging {
       }
     }
 
+    test(s"$version: SPARK-17920: Insert into/overwrite avro table") {
+      withTempDir { dir =>
+        val path = dir.getAbsolutePath
+        val schemaPath = s"""$path${File.separator}avroschemadir"""
+
+        new File(schemaPath).mkdir()
+        val avroSchema =
+          """{
+            |  "name": "test_record",
+            |  "type": "record",
+            |  "fields": [ {
+            |    "name": "f0",
+            |    "type": [
+            |      "null",
+            |      {
+            |        "precision": 38,
+            |        "scale": 2,
+            |        "type": "bytes",
+            |        "logicalType": "decimal"
+            |      }
+            |    ]
+            |  } ]
+            |}
+          """.stripMargin
+        val schemaUrl = s"""$schemaPath${File.separator}avroDecimal.avsc"""
+        val schemaFile = new File(schemaPath, "avroDecimal.avsc")
+        val writer = new PrintWriter(schemaFile)
+        writer.write(avroSchema)
+        writer.close()
+
+        val url = Thread.currentThread().getContextClassLoader.getResource("avroDecimal")
+        val srcLocation = new File(url.getFile)
+        val destTableName = "tab1"
+        val srcTableName = "tab2"
+
+        withTable(srcTableName, destTableName) {
+          versionSpark.sql(
+            s"""
+               |CREATE EXTERNAL TABLE $srcTableName
+               |ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
+               |WITH SERDEPROPERTIES ('respectSparkSchema' = 'true')
+               |STORED AS
+               |  INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
+               |  OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
+               |LOCATION '$srcLocation'
+               |TBLPROPERTIES ('avro.schema.url' = '$schemaUrl')
+           """.stripMargin
+          )
+
+          versionSpark.sql(
+            s"""
+               |CREATE TABLE $destTableName
+               |ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
+               |WITH SERDEPROPERTIES ('respectSparkSchema' = 'true')
+               |STORED AS
+               |  INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
+               |  OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
+               |TBLPROPERTIES ('avro.schema.url' = '$schemaUrl')
+           """.stripMargin
+          )
+          versionSpark.sql(
+            s"""INSERT OVERWRITE TABLE $destTableName SELECT * FROM 
$srcTableName""")
+          val result = versionSpark.table(srcTableName).collect()
+          assert(versionSpark.table(destTableName).collect() === result)
+          versionSpark.sql(
+            s"""INSERT INTO TABLE $destTableName SELECT * FROM 
$srcTableName""")
+          assert(versionSpark.table(destTableName).collect().toSeq === result 
++ result)
+        }
+      }
+    }
     // TODO: add more tests.
   }
 }
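
With the fix in place, the reproduction from the top of this message goes through. A hedged spark-shell check (assumes a Hive-enabled SparkSession bound to `spark`, with the same placeholder paths as the example above):

```scala
spark.sql(
  "create external table avro_in (a string) stored as avro location '/avro-in/' " +
    "tblproperties ('avro.schema.url'='/avro-schema/avro.avsc')")
spark.sql(
  "create external table avro_out (a string) stored as avro location '/avro-out/' " +
    "tblproperties ('avro.schema.url'='/avro-schema/avro.avsc')")
// Previously threw java.lang.NullPointerException; succeeds after this commit.
spark.sql("insert overwrite table avro_out select * from avro_in")
```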

