This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 8547899a39 [HUDI-4285] add ByteBuffer#rewind after ByteBuffer#get in 
AvroDeseria… (#5907)
8547899a39 is described below

commit 8547899a39168c8399d32ee7ee22f35dfe3f7c84
Author: komao <[email protected]>
AuthorDate: Thu Jun 30 20:48:50 2022 +0800

    [HUDI-4285] add ByteBuffer#rewind after ByteBuffer#get in AvroDeseria… 
(#5907)
    
    * [HUDI-4285] add ByteBuffer#rewind after ByteBuffer#get in AvroDeserializer
    
    * add ut
    
    Co-authored-by: wangzixuan.wzxuan <[email protected]>
---
 .../org/apache/hudi/TestAvroConversionUtils.scala  | 57 +++++++++++++++++++++-
 .../apache/spark/sql/avro/AvroDeserializer.scala   |  2 +
 .../apache/spark/sql/avro/AvroDeserializer.scala   |  2 +
 .../apache/spark/sql/avro/AvroDeserializer.scala   |  2 +
 4 files changed, 62 insertions(+), 1 deletion(-)

diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroConversionUtils.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroConversionUtils.scala
index bacd44753d..16df1f869c 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroConversionUtils.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroConversionUtils.scala
@@ -18,8 +18,13 @@
 
 package org.apache.hudi
 
+import java.nio.ByteBuffer
+import java.util.Objects
 import org.apache.avro.Schema
-import org.apache.spark.sql.types.{DataTypes, StructType, StringType, 
ArrayType}
+import org.apache.avro.generic.GenericData
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.util.{ArrayData, MapData}
+import org.apache.spark.sql.types.{ArrayType, BinaryType, DataType, DataTypes, 
MapType, StringType, StructField, StructType}
 import org.scalatest.{FunSuite, Matchers}
 
 class TestAvroConversionUtils extends FunSuite with Matchers {
@@ -377,4 +382,54 @@ class TestAvroConversionUtils extends FunSuite with 
Matchers {
 
     assert(avroSchema.equals(expectedAvroSchema))
   }
+
+  test("test converter with binary") {
+    val avroSchema = new 
Schema.Parser().parse("{\"type\":\"record\",\"name\":\"h0_record\",\"namespace\":\"hoodie.h0\",\"fields\""
+      + 
":[{\"name\":\"col9\",\"type\":[\"null\",\"bytes\"],\"default\":null}]}")
+    val sparkSchema = StructType(List(StructField("col9", BinaryType, nullable 
= true)))
+    // create a test record with avroSchema
+    val avroRecord = new GenericData.Record(avroSchema)
+    val bb = ByteBuffer.wrap(Array[Byte](97, 48, 53))
+    avroRecord.put("col9", bb)
+    val row1 = 
AvroConversionUtils.createAvroToInternalRowConverter(avroSchema, 
sparkSchema).apply(avroRecord).get
+    val row2 = 
AvroConversionUtils.createAvroToInternalRowConverter(avroSchema, 
sparkSchema).apply(avroRecord).get
+    internalRowCompare(row1, row2, sparkSchema)
+  }
+
+  private def internalRowCompare(expected: Any, actual: Any, schema: 
DataType): Unit = {
+    schema match {
+      case StructType(fields) =>
+        val expectedRow = expected.asInstanceOf[InternalRow]
+        val actualRow = actual.asInstanceOf[InternalRow]
+        fields.zipWithIndex.foreach { case (field, i) => 
internalRowCompare(expectedRow.get(i, field.dataType), actualRow.get(i, 
field.dataType), field.dataType) }
+      case ArrayType(elementType, _) =>
+        val expectedArray = 
expected.asInstanceOf[ArrayData].toSeq[Any](elementType)
+        val actualArray = 
actual.asInstanceOf[ArrayData].toSeq[Any](elementType)
+        if (expectedArray.size != actualArray.size) {
+          throw new AssertionError()
+        } else {
+          expectedArray.zip(actualArray).foreach { case (e1, e2) => 
internalRowCompare(e1, e2, elementType) }
+        }
+      case MapType(keyType, valueType, _) =>
+        val expectedKeyArray = expected.asInstanceOf[MapData].keyArray()
+        val expectedValueArray = expected.asInstanceOf[MapData].valueArray()
+        val actualKeyArray = actual.asInstanceOf[MapData].keyArray()
+        val actualValueArray = actual.asInstanceOf[MapData].valueArray()
+        internalRowCompare(expectedKeyArray, actualKeyArray, 
ArrayType(keyType))
+        internalRowCompare(expectedValueArray, actualValueArray, 
ArrayType(valueType))
+      case StringType => if (checkNull(expected, actual) || 
!expected.toString.equals(actual.toString)) {
+        throw new AssertionError(String.format("%s is not equals %s", 
expected.toString, actual.toString))
+      }
+      case BinaryType => if (checkNull(expected, actual) || 
!expected.asInstanceOf[Array[Byte]].sameElements(actual.asInstanceOf[Array[Byte]]))
 {
+        throw new AssertionError(String.format("%s is not equals %s", 
expected.toString, actual.toString))
+      }
+      case _ => if (!Objects.equals(expected, actual)) {
+        throw new AssertionError(String.format("%s is not equals %s", 
expected.toString, actual.toString))
+      }
+    }
+  }
+
+  private def checkNull(left: Any, right: Any): Boolean = {
+    (left == null && right != null) || (left == null && right != null)
+  }
 }
diff --git 
a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
 
b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
index 2e0946f1eb..385577dd30 100644
--- 
a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
+++ 
b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
@@ -146,6 +146,8 @@ class AvroDeserializer(rootAvroType: Schema, 
rootCatalystType: DataType) {
           case b: ByteBuffer =>
             val bytes = new Array[Byte](b.remaining)
             b.get(bytes)
+            // Do not forget to reset the position
+            b.rewind()
             bytes
           case b: Array[Byte] => b
           case other => throw new RuntimeException(s"$other is not a valid 
avro binary.")
diff --git 
a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
 
b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
index 717df0f407..5fb6d907bd 100644
--- 
a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
+++ 
b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
@@ -167,6 +167,8 @@ private[sql] class AvroDeserializer(rootAvroType: Schema,
           case b: ByteBuffer =>
             val bytes = new Array[Byte](b.remaining)
             b.get(bytes)
+            // Do not forget to reset the position
+            b.rewind()
             bytes
           case b: Array[Byte] => b
           case other => throw new RuntimeException(s"$other is not a valid 
avro binary.")
diff --git 
a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
 
b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
index ef9b590920..0b60933075 100644
--- 
a/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
+++ 
b/hudi-spark-datasource/hudi-spark3/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
@@ -181,6 +181,8 @@ private[sql] class AvroDeserializer(rootAvroType: Schema,
           case b: ByteBuffer =>
             val bytes = new Array[Byte](b.remaining)
             b.get(bytes)
+            // Do not forget to reset the position
+            b.rewind()
             bytes
           case b: Array[Byte] => b
           case other =>

Reply via email to