Repository: spark
Updated Branches:
refs/heads/master 881c5c807 -> e0d7665ce
[SPARK-17920][SPARK-19580][SPARK-19878][SQL] Support writing to Hive table
which uses Avro schema url 'avro.schema.url'
## What changes were proposed in this pull request?
SPARK-19580 Support for 'avro.schema.url' while writing to a Hive table
SPARK-19878 Add Hive configuration when initializing the Hive serde in
InsertIntoHiveTable.scala
SPARK-17920 HiveWriterContainer passes a null configuration to serde.initialize,
causing a NullPointerException in AvroSerDe when 'avro.schema.url' is used
Support writing to a Hive table that uses the Avro schema URL table property
'avro.schema.url'. For example:

create external table avro_in (a string) stored as avro location '/avro-in/'
tblproperties ('avro.schema.url'='/avro-schema/avro.avsc');

create external table avro_out (a string) stored as avro location '/avro-out/'
tblproperties ('avro.schema.url'='/avro-schema/avro.avsc');

insert overwrite table avro_out select * from avro_in;
-- fails with java.lang.NullPointerException:
WARN AvroSerDe: Encountered exception determining schema. Returning signal schema to indicate problem
java.lang.NullPointerException
at org.apache.hadoop.fs.FileSystem.getDefaultUri(FileSystem.java:182)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:174)
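The two frames above point at the root cause: the Avro serde resolves
'avro.schema.url' through Hadoop's FileSystem API, and FileSystem.get(conf)
first calls FileSystem.getDefaultUri(conf), which dereferences the
configuration. A minimal sketch of that lookup (a hypothetical helper for
illustration, not Spark or Hive source), using the schema path from the
example above:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import scala.io.Source

// Mirrors how a serde would fetch the schema file behind 'avro.schema.url'.
// FileSystem.get(conf) calls FileSystem.getDefaultUri(conf), so a null conf
// reproduces the NullPointerException in the stack trace above.
def readSchemaText(schemaUrl: String, conf: Configuration): String = {
  val fs = FileSystem.get(conf)
  val in = fs.open(new Path(schemaUrl))
  try Source.fromInputStream(in).mkString finally in.close()
}

// readSchemaText("/avro-schema/avro.avsc", new Configuration()) // works
// readSchemaText("/avro-schema/avro.avsc", null)                // NPE
```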
## Changes proposed in this fix
Currently a null configuration is passed to the serializer's initialize()
call, which causes the NullPointerException during the insert operation. The
fix passes the writer's Hadoop configuration object instead.
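To see the contrast directly: Hive's serde interface takes the configuration
in initialize(Configuration, Properties). A minimal standalone sketch (an
assumed reproduction, not part of the patch; the schema URL is the one from
the example above and must exist on the default filesystem):

```scala
import java.util.Properties
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hive.serde2.avro.AvroSerDe

val props = new Properties()
props.setProperty("avro.schema.url", "/avro-schema/avro.avsc")

val serde = new AvroSerDe()
// With a real Configuration the serde can reach the default filesystem and
// fetch the schema; this is what the fix supplies via the writer's jobConf.
serde.initialize(new Configuration(), props)
// serde.initialize(null, props) // logs the WARN above and falls back to
//                               // the signal schema, breaking the insert
```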
## How was this patch tested?
Added a new test case in VersionsSuite.
Author: vinodkc <[email protected]>
Closes #19779 from vinodkc/br_Fix_SPARK-17920.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e0d7665c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e0d7665c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e0d7665c
Branch: refs/heads/master
Commit: e0d7665cec1e6954d640f422c79ebba4c273be7d
Parents: 881c5c8
Author: vinodkc <[email protected]>
Authored: Tue Nov 21 22:31:46 2017 -0800
Committer: gatorsmile <[email protected]>
Committed: Tue Nov 21 22:31:46 2017 -0800
----------------------------------------------------------------------
.../sql/hive/execution/HiveFileFormat.scala | 4 +-
.../spark/sql/hive/client/VersionsSuite.scala | 72 +++++++++++++++++++-
2 files changed, 73 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/e0d7665c/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala
index ac735e8..4a7cd69 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala
@@ -116,7 +116,7 @@ class HiveOutputWriter(
private val serializer = {
val serializer =
tableDesc.getDeserializerClass.newInstance().asInstanceOf[Serializer]
- serializer.initialize(null, tableDesc.getProperties)
+ serializer.initialize(jobConf, tableDesc.getProperties)
serializer
}
@@ -130,7 +130,7 @@ class HiveOutputWriter(
private val standardOI = ObjectInspectorUtils
.getStandardObjectInspector(
- tableDesc.getDeserializer.getObjectInspector,
+ tableDesc.getDeserializer(jobConf).getObjectInspector,
ObjectInspectorCopyOption.JAVA)
.asInstanceOf[StructObjectInspector]
http://git-wip-us.apache.org/repos/asf/spark/blob/e0d7665c/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
index 9ed39cc..fbf6877 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql.hive.client
-import java.io.{ByteArrayOutputStream, File, PrintStream}
+import java.io.{ByteArrayOutputStream, File, PrintStream, PrintWriter}
import java.net.URI
import org.apache.hadoop.conf.Configuration
@@ -841,6 +841,76 @@ class VersionsSuite extends SparkFunSuite with Logging {
}
}
+ test(s"$version: SPARK-17920: Insert into/overwrite avro table") {
+ withTempDir { dir =>
+ val path = dir.getAbsolutePath
+ val schemaPath = s"""$path${File.separator}avroschemadir"""
+
+ new File(schemaPath).mkdir()
+ val avroSchema =
+ """{
+ | "name": "test_record",
+ | "type": "record",
+ | "fields": [ {
+ | "name": "f0",
+ | "type": [
+ | "null",
+ | {
+ | "precision": 38,
+ | "scale": 2,
+ | "type": "bytes",
+ | "logicalType": "decimal"
+ | }
+ | ]
+ | } ]
+ |}
+ """.stripMargin
+ val schemaUrl = s"""$schemaPath${File.separator}avroDecimal.avsc"""
+ val schemaFile = new File(schemaPath, "avroDecimal.avsc")
+ val writer = new PrintWriter(schemaFile)
+ writer.write(avroSchema)
+ writer.close()
+
+ val url = Thread.currentThread().getContextClassLoader.getResource("avroDecimal")
+ val srcLocation = new File(url.getFile)
+ val destTableName = "tab1"
+ val srcTableName = "tab2"
+
+ withTable(srcTableName, destTableName) {
+ versionSpark.sql(
+ s"""
+ |CREATE EXTERNAL TABLE $srcTableName
+ |ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
+ |WITH SERDEPROPERTIES ('respectSparkSchema' = 'true')
+ |STORED AS
+ | INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
+ | OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
+ |LOCATION '$srcLocation'
+ |TBLPROPERTIES ('avro.schema.url' = '$schemaUrl')
+ """.stripMargin
+ )
+
+ versionSpark.sql(
+ s"""
+ |CREATE TABLE $destTableName
+ |ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
+ |WITH SERDEPROPERTIES ('respectSparkSchema' = 'true')
+ |STORED AS
+ | INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
+ | OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
+ |TBLPROPERTIES ('avro.schema.url' = '$schemaUrl')
+ """.stripMargin
+ )
+ versionSpark.sql(
+ s"""INSERT OVERWRITE TABLE $destTableName SELECT * FROM
$srcTableName""")
+ val result = versionSpark.table(srcTableName).collect()
+ assert(versionSpark.table(destTableName).collect() === result)
+ versionSpark.sql(
+ s"""INSERT INTO TABLE $destTableName SELECT * FROM
$srcTableName""")
+ assert(versionSpark.table(destTableName).collect().toSeq === result
++ result)
+ }
+ }
+ }
// TODO: add more tests.
}
}