nsivabalan commented on a change in pull request #4789:
URL: https://github.com/apache/hudi/pull/4789#discussion_r812932709



##########
File path: 
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
##########
@@ -133,46 +129,49 @@ object HoodieSparkUtils extends SparkAdapterSupport {
     new InMemoryFileIndex(sparkSession, globbedPaths, Map(), Option.empty, 
fileStatusCache)
   }
 
-  def createRdd(df: DataFrame, structName: String, recordNamespace: String, 
reconcileToLatestSchema: Boolean, latestTableSchema:
-  org.apache.hudi.common.util.Option[Schema] = 
org.apache.hudi.common.util.Option.empty()): RDD[GenericRecord] = {
-    val dfWriteSchema = 
AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, 
recordNamespace)
-    var writeSchema : Schema = null;
-    var toReconcileSchema : Schema = null;
-    if (reconcileToLatestSchema && latestTableSchema.isPresent) {
-      // if reconcileToLatestSchema is set to true and latestSchema is 
present, then try to leverage latestTableSchema.
-      // this code path will handle situations where records are serialized in 
odl schema, but callers wish to convert
-      // to Rdd[GenericRecord] using different schema(could be evolved schema 
or could be latest table schema)
-      writeSchema = dfWriteSchema
-      toReconcileSchema = latestTableSchema.get()
-    } else {
-      // there are paths where callers wish to use latestTableSchema to 
convert to Rdd[GenericRecords] and not use
-      // row's schema. So use latestTableSchema if present. if not available, 
fallback to using row's schema.
-      writeSchema = if (latestTableSchema.isPresent) { 
latestTableSchema.get()} else { dfWriteSchema}
-    }
-    createRddInternal(df, writeSchema, toReconcileSchema, structName, 
recordNamespace)
+  /**
+   * @deprecated please use other overload [[createRdd]]
+   */
+  def createRdd(df: DataFrame, structName: String, recordNamespace: String, 
reconcileToLatestSchema: Boolean,
+                latestTableSchema: org.apache.hudi.common.util.Option[Schema] 
= org.apache.hudi.common.util.Option.empty()): RDD[GenericRecord] = {
+    val latestTableSchemaConverted = if (latestTableSchema.isPresent && 
reconcileToLatestSchema) Some(latestTableSchema.get()) else None
+    createRdd(df, structName, recordNamespace, latestTableSchemaConverted)
   }
 
-  def createRddInternal(df: DataFrame, writeSchema: Schema, latestTableSchema: 
Schema, structName: String, recordNamespace: String)
-  : RDD[GenericRecord] = {
-    // Use the write avro schema to derive the StructType which has the 
correct nullability information
-    val writeDataType = 
AvroConversionUtils.convertAvroSchemaToStructType(writeSchema)
-    val encoder = RowEncoder.apply(writeDataType).resolveAndBind()
-    val deserializer = sparkAdapter.createSparkRowSerDe(encoder)
-    // if records were serialized with old schema, but an evolved schema was 
passed in with latestTableSchema, we need
-    // latestTableSchema equivalent datatype to be passed in to 
AvroConversionHelper.createConverterToAvro()
-    val reconciledDataType =
-      if (latestTableSchema != null) 
AvroConversionUtils.convertAvroSchemaToStructType(latestTableSchema) else 
writeDataType
-    // Note: deserializer.deserializeRow(row) is not capable of handling 
evolved schema. i.e. if Row was serialized in
-    // old schema, but deserializer was created with an encoder with evolved 
schema, deserialization fails.
-    // Hence we always need to deserialize in the same schema as serialized 
schema.
-    df.queryExecution.toRdd.map(row => deserializer.deserializeRow(row))
-      .mapPartitions { records =>
-        if (records.isEmpty) Iterator.empty
-        else {
-          val convertor = 
AvroConversionHelper.createConverterToAvro(reconciledDataType, structName, 
recordNamespace)
-          records.map { x => convertor(x).asInstanceOf[GenericRecord] }
-        }
+  def createRdd(df: DataFrame, structName: String, recordNamespace: String, 
readerAvroSchemaOpt: Option[Schema]): RDD[GenericRecord] = {
+    val writerSchema = df.schema
+    val writerAvroSchema = 
AvroConversionUtils.convertStructTypeToAvroSchema(writerSchema, structName, 
recordNamespace)
+    val readerAvroSchema = readerAvroSchemaOpt.getOrElse(writerAvroSchema)
+    // We check whether passed in reader schema is identical to writer schema 
to avoid costly serde loop of
+    // making Spark deserialize its internal representation [[InternalRow]] 
into [[Row]] for subsequent conversion
+    // (and back)
+    val sameSchema = writerAvroSchema.equals(readerAvroSchema)
+    val (nullable, _) = 
AvroConversionUtils.resolveAvroTypeNullability(writerAvroSchema)
+
+    // NOTE: We have to serialize Avro schema, and then subsequently parse it 
on the executor node, since Spark
+    //       serializer is not able to digest it
+    val readerAvroSchemaStr = readerAvroSchema.toString
+    val writerAvroSchemaStr = writerAvroSchema.toString
+    // NOTE: We're accessing toRdd here directly to avoid [[InternalRow]] to 
[[Row]] conversion
+    df.queryExecution.toRdd.mapPartitions { rows =>
+      if (rows.isEmpty) {
+        Iterator.empty
+      } else {
+        val transform: GenericRecord => GenericRecord =
+          if (sameSchema) identity
+          else {
+            val readerAvroSchema = new 
Schema.Parser().parse(readerAvroSchemaStr)
+            rewriteRecord(_, readerAvroSchema)

Review comment:
       sounds good ! 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to