This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch release-0.12.2-shadow
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit f796f02791fb792a94ef7b652f2ee862d454a5b7
Author: sivabalan <[email protected]>
AuthorDate: Tue Dec 13 07:28:26 2022 -0800

    resolving conflicts for #7334
---
 .../scala/org/apache/hudi/HoodieDataSourceHelper.scala  | 17 +++++++++++++++++
 1 file changed, 17 insertions(+)

diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDataSourceHelper.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDataSourceHelper.scala
index 8bd295c7f3d..47c7b6efece 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDataSourceHelper.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDataSourceHelper.scala
@@ -18,13 +18,17 @@
 
 package org.apache.hudi
 
+import org.apache.avro.Schema
+import org.apache.avro.generic.GenericRecord
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.FileStatus
 import org.apache.hudi.client.utils.SparkInternalSchemaConverter
 import org.apache.hudi.common.util.StringUtils.isNullOrEmpty
+import org.apache.hudi.common.util.ValidationUtils.checkState
 import org.apache.hudi.internal.schema.InternalSchema
 import org.apache.hudi.internal.schema.utils.SerDeHelper
 import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.avro.HoodieAvroDeserializer
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{PredicateHelper, SpecificInternalRow, UnsafeProjection}
 import org.apache.spark.sql.execution.datasources.PartitionedFile
@@ -84,4 +88,17 @@ object HoodieDataSourceHelper extends PredicateHelper with SparkAdapterSupport {
       PartitionedFile(partitionValues, filePath.toUri.toString, offset, size)
     }
   }
+
+  trait AvroDeserializerSupport extends SparkAdapterSupport {
+    protected val avroSchema: Schema
+    protected val structTypeSchema: StructType
+
+    private lazy val deserializer: HoodieAvroDeserializer =
+      sparkAdapter.createAvroDeserializer(avroSchema, structTypeSchema)
+
+    protected def deserialize(avroRecord: GenericRecord): InternalRow = {
+      checkState(avroRecord.getSchema.getFields.size() == structTypeSchema.fields.length)
+      deserializer.deserialize(avroRecord).get.asInstanceOf[InternalRow]
+    }
+  }
 }

Reply via email to