n3nash commented on a change in pull request #2927:
URL: https://github.com/apache/hudi/pull/2927#discussion_r629683323



##########
File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
##########
@@ -111,6 +112,34 @@ object HoodieSparkUtils {
       }
   }
 
+  def createRddWithLatestSchema(df: DataFrame, latestSchema: Schema, structName: String, recordNamespace: String): RDD[GenericRecord] = {
+    val avroSchema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, recordNamespace)
+    // if schema generated from df.schema is same as latest schema, no special handling is required.
+    if(TableSchemaResolver.isSchemaEquals(avroSchema, latestSchema)) {
+      createRdd(df, avroSchema, structName, recordNamespace)
+    } else { // if not, it means that table schema got evolved, but this batch of records were generated with an older
+      // schema.
+      createRddWithLatestSchema(df, avroSchema, latestSchema, structName, recordNamespace)
+    }
+  }
+
+  def createRddWithLatestSchema(df: DataFrame, avroSchema: Schema, latestSchema: Schema, structName: String, recordNamespace: String)
+  : RDD[GenericRecord] = {
+    // Use the Avro schema to derive the StructType which has the correct nullability information
+    val dataType = SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType]
+    val encoder = RowEncoder.apply(dataType).resolveAndBind()
+    val deserializer = HoodieSparkUtils.createRowSerDe(encoder)
+    val latestDataType = SchemaConverters.toSqlType(latestSchema).dataType.asInstanceOf[StructType]

Review comment:
       Let's fold these two overloads into a single method to avoid further confusion down the road. We can sync face-to-face today.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to