This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 29e154b [SPARK-27858][SQL] Fix for avro deserialization on union
types with multiple non-null types
29e154b is described below
commit 29e154b2f12058c59eaa411989ead833119f165f
Author: Gabbi Merz <[email protected]>
AuthorDate: Mon May 27 20:09:23 2019 -0700
[SPARK-27858][SQL] Fix for avro deserialization on union types with
multiple non-null types
## What changes were proposed in this pull request?
This PR aims to fix an issue with a union avro type with more than one
non-null value (for instance `["string", "null", "int"]`) whose
deserialization to a DataFrame would throw a
`java.lang.ArrayIndexOutOfBoundsException`. The issue was that the
`fieldWriter` relied on the index from the avro schema before nulls were
filtered out.
## How was this patch tested?
A test for the case of multiple non-null values was added and the tests
were run using sbt by running `testOnly org.apache.spark.sql.avro.AvroSuite`
Closes #24722 from gcmerz/master.
Authored-by: Gabbi Merz <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../apache/spark/sql/avro/AvroDeserializer.scala | 3 ++-
.../org/apache/spark/sql/avro/AvroSuite.scala | 26 ++++++++++++++++++++++
2 files changed, 28 insertions(+), 1 deletion(-)
diff --git
a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
index 4e2224b..baacced 100644
---
a/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
+++
b/external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
@@ -225,6 +225,7 @@ class AvroDeserializer(rootAvroType: Schema,
rootCatalystType: DataType) {
case (UNION, _) =>
val allTypes = avroType.getTypes.asScala
val nonNullTypes = allTypes.filter(_.getType != NULL)
+ val nonNullAvroType = Schema.createUnion(nonNullTypes.asJava)
if (nonNullTypes.nonEmpty) {
if (nonNullTypes.length == 1) {
newWriter(nonNullTypes.head, catalystType, path)
@@ -253,7 +254,7 @@ class AvroDeserializer(rootAvroType: Schema,
rootCatalystType: DataType) {
(updater, ordinal, value) => {
val row = new SpecificInternalRow(st)
val fieldUpdater = new RowUpdater(row)
- val i = GenericData.get().resolveUnion(avroType, value)
+ val i = GenericData.get().resolveUnion(nonNullAvroType,
value)
fieldWriters(i)(fieldUpdater, i, value)
updater.set(ordinal, row)
}
diff --git
a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
index e4eeb78..49aa218 100644
--- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
+++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
@@ -247,6 +247,32 @@ class AvroSuite extends QueryTest with SharedSQLContext
with SQLTestUtils {
}
}
+ test("SPARK-27858 Union type: More than one non-null type") {
+ withTempDir { dir =>
+ val complexNullUnionType = Schema.createUnion(
+ List(Schema.create(Type.INT), Schema.create(Type.NULL),
Schema.create(Type.STRING)).asJava)
+ val fields = Seq(
+ new Field("field1", complexNullUnionType, "doc",
null.asInstanceOf[AnyVal])).asJava
+ val schema = Schema.createRecord("name", "docs", "namespace", false)
+ schema.setFields(fields)
+ val datumWriter = new GenericDatumWriter[GenericRecord](schema)
+ val dataFileWriter = new DataFileWriter[GenericRecord](datumWriter)
+ dataFileWriter.create(schema, new File(s"$dir.avro"))
+ val avroRec = new GenericData.Record(schema)
+ avroRec.put("field1", 42)
+ dataFileWriter.append(avroRec)
+ val avroRec2 = new GenericData.Record(schema)
+ avroRec2.put("field1", "Alice")
+ dataFileWriter.append(avroRec2)
+ dataFileWriter.flush()
+ dataFileWriter.close()
+
+ val df = spark.read.format("avro").load(s"$dir.avro")
+ assert(df.schema === StructType.fromDDL("field1 struct<member0: int,
member1: string>"))
+ assert(df.collect().toSet == Set(Row(Row(42, null)), Row(Row(null,
"Alice"))))
+ }
+ }
+
test("Complex Union Type") {
withTempPath { dir =>
val fixedSchema = Schema.createFixed("fixed_name", "doc", "namespace", 4)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]