This is an automated email from the ASF dual-hosted git repository. sivabalan pushed a commit to branch release-0.12.2-shadow in repository https://gitbox.apache.org/repos/asf/hudi.git
commit f796f02791fb792a94ef7b652f2ee862d454a5b7 Author: sivabalan <[email protected]> AuthorDate: Tue Dec 13 07:28:26 2022 -0800 resolving conflicts for #7334 --- .../scala/org/apache/hudi/HoodieDataSourceHelper.scala | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDataSourceHelper.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDataSourceHelper.scala index 8bd295c7f3d..47c7b6efece 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDataSourceHelper.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDataSourceHelper.scala @@ -18,13 +18,17 @@ package org.apache.hudi +import org.apache.avro.Schema +import org.apache.avro.generic.GenericRecord import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.FileStatus import org.apache.hudi.client.utils.SparkInternalSchemaConverter import org.apache.hudi.common.util.StringUtils.isNullOrEmpty +import org.apache.hudi.common.util.ValidationUtils.checkState import org.apache.hudi.internal.schema.InternalSchema import org.apache.hudi.internal.schema.utils.SerDeHelper import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.avro.HoodieAvroDeserializer import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{PredicateHelper, SpecificInternalRow, UnsafeProjection} import org.apache.spark.sql.execution.datasources.PartitionedFile @@ -84,4 +88,17 @@ object HoodieDataSourceHelper extends PredicateHelper with SparkAdapterSupport { PartitionedFile(partitionValues, filePath.toUri.toString, offset, size) } } + + trait AvroDeserializerSupport extends SparkAdapterSupport { + protected val avroSchema: Schema + protected val structTypeSchema: StructType + + private lazy val deserializer: HoodieAvroDeserializer = + sparkAdapter.createAvroDeserializer(avroSchema, structTypeSchema) + + protected def deserialize(avroRecord: GenericRecord): InternalRow = { + checkState(avroRecord.getSchema.getFields.size() == structTypeSchema.fields.length) + deserializer.deserialize(avroRecord).get.asInstanceOf[InternalRow] + } + } }
