nsivabalan commented on a change in pull request #2927:
URL: https://github.com/apache/hudi/pull/2927#discussion_r629421999



##########
File path: 
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionHelper.scala
##########
@@ -139,6 +139,7 @@ object AvroConversionHelper {
           val length = struct.fields.length
           val converters = new Array[AnyRef => AnyRef](length)
           val avroFieldIndexes = new Array[Int](length)
+          //val avroFieldNames = new Array[String](length)

Review comment:
       I am not making this change in this diff because I am not sure whether, 
in general, AvroConversionHelper and AvroConversionUtils can handle schema 
evolution where new columns are added in the middle. This needs more 
investigation to validate. But the tests succeed without this fix, so I am 
leaving it as is for now. 
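To make the concern concrete: if field converters are bound by positional index, a column added in the middle of an evolved schema shifts every later field. A minimal, library-free Java sketch of the failure mode (the field lists and record map here are hypothetical stand-ins for Avro schemas, not Hudi code):

```java
import java.util.*;

public class MiddleColumnEvolution {
    public static void main(String[] args) {
        // Writer schema: fields in their original order.
        List<String> oldFields = Arrays.asList("id", "name", "ts");
        // Evolved schema: "city" was added in the middle.
        List<String> newFields = Arrays.asList("id", "city", "name", "ts");

        // A record written with the old schema, keyed by field name.
        Map<String, Object> record = new HashMap<>();
        record.put("id", 1); record.put("name", "a"); record.put("ts", 100L);

        // Index-based mapping misaligns: old index 1 ("name") now points at "city".
        String misaligned = newFields.get(1);
        System.out.println("index-based maps old 'name' to: " + misaligned);

        // Name-based lookup stays correct regardless of position.
        int nameIdx = newFields.indexOf("name");
        System.out.println("name-based index of 'name': " + nameIdx);
        System.out.println("value resolved by name: " + record.get(newFields.get(nameIdx)));
    }
}
```

This is why tracking field names alongside indexes (as the commented-out `avroFieldNames` array hints at) would matter if mid-schema insertions need to be supported.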

##########
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
##########
@@ -111,6 +112,34 @@ object HoodieSparkUtils {
       }
   }
 
+  def createRddWithLatestSchema(df: DataFrame, latestSchema: Schema, 
structName: String, recordNamespace: String): RDD[GenericRecord] = {
+    val avroSchema = 
AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, 
recordNamespace)
+    // if schema generated from df.schema is same as latest schema, no special 
handling is required.
+    if(TableSchemaResolver.isSchemaEquals(avroSchema, latestSchema)) {
+      createRdd(df, avroSchema, structName, recordNamespace)
+    } else { // if not, it means that table schema got evolved, but this batch 
of records were generated with an older
+      // schema.
+      createRddWithLatestSchema(df, avroSchema, latestSchema, structName, 
recordNamespace)
+    }
+  }
+
+  def createRddWithLatestSchema(df: DataFrame, avroSchema: Schema, 
latestSchema: Schema, structName: String, recordNamespace: String)
+  : RDD[GenericRecord] = {
+    // Use the Avro schema to derive the StructType which has the correct 
nullability information
+    val dataType = 
SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType]
+    val encoder = RowEncoder.apply(dataType).resolveAndBind()
+    val deserializer = HoodieSparkUtils.createRowSerDe(encoder)
+    val latestDataType = 
SchemaConverters.toSqlType(latestSchema).dataType.asInstanceOf[StructType]

Review comment:
       This path differs from the existing createRdd(): in this method we 
first deserialize the DataFrame's Rows with the old schema and then use 
latestSchema to convert to Avro at line 137, whereas in createRdd() the same 
schema is used in both places. We could try to fold both into a single method; 
open to discussion. 
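The rewrite step described above (deserialize with the writer's schema, re-emit against the latest schema) can be sketched without Avro or Spark as follows; the field list and default handling are hypothetical, for illustration only:

```java
import java.util.*;

public class RewriteToLatestSchema {
    // Copy fields the old-schema record has; fields that exist only in the
    // latest schema get a default value (null here, standing in for the
    // Avro field default).
    static Map<String, Object> rewrite(Map<String, Object> oldRecord,
                                       List<String> latestFields) {
        Map<String, Object> out = new LinkedHashMap<>();
        for (String f : latestFields) {
            out.put(f, oldRecord.getOrDefault(f, null));
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, Object> oldRecord = new LinkedHashMap<>();
        oldRecord.put("id", 1);
        oldRecord.put("name", "a");
        // Latest table schema gained a "city" column.
        List<String> latestFields = Arrays.asList("id", "city", "name");
        System.out.println(rewrite(oldRecord, latestFields));
    }
}
```

A single createRdd() taking both schemas (with the fast path when they are equal, as the first overload above already does) would keep this logic in one place.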

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
##########
@@ -353,6 +360,89 @@ public static boolean isSchemaCompatible(String oldSchema, 
String newSchema) {
     return isSchemaCompatible(new Schema.Parser().parse(oldSchema), new 
Schema.Parser().parse(newSchema));
   }
 
+  /**
+   * Get latest schema either from incoming schema or table schema.
+   * @param incomingSchema incoming batch's schema.
+   * @param convertTableSchemaToAddNamespace {@code true} if table schema 
needs to be converted to add namespace. false otherwise.
+   * @param converterFn converter function to be called over table schema.
+   * @return the latest schema.
+   */
+  public Schema getLatestSchema(Schema incomingSchema, boolean 
convertTableSchemaToAddNamespace,
+      Function1<Schema, Schema> converterFn) {
+    Schema latestSchema = incomingSchema;
+    try {
+      if (isTimelineNonEmpty()) {
+        Schema tableSchema = getTableAvroSchemaWithoutMetadataFields();
+        if (convertTableSchemaToAddNamespace) {
+          tableSchema = converterFn.apply(tableSchema);
+        }
+        if (incomingSchema.getFields().size() < tableSchema.getFields().size() 
&& isSchemaSubset(tableSchema, incomingSchema)) {

Review comment:
       We can't really introduce schema comparators, because two schemas are 
not totally ordered the way numbers or strings are. For example, if two schemas 
have the same number of columns but differ in the data type of one column, 
should the comparator return -1 or 1? So I am resorting to isSchemaSubset(). 
Let me know if that makes sense.
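The ordering point can be made concrete: a comparator needs a total order, but schemas only admit a partial "is a subset of" relation. A minimal sketch of such a subset check over (name, type) pairs, with plain Java maps standing in for Avro schemas (the real isSchemaSubset in TableSchemaResolver operates on Schema objects and may differ):

```java
import java.util.*;

public class SchemaSubsetSketch {
    // true iff every field of 'candidate' exists in 'superset' with the same type.
    static boolean isSubset(Map<String, String> superset, Map<String, String> candidate) {
        for (Map.Entry<String, String> f : candidate.entrySet()) {
            if (!f.getValue().equals(superset.get(f.getKey()))) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Map<String, String> table = new LinkedHashMap<>();
        table.put("id", "int"); table.put("city", "string"); table.put("name", "string");

        // A valid older batch: fewer columns, matching types.
        Map<String, String> incoming = new LinkedHashMap<>();
        incoming.put("id", "int"); incoming.put("name", "string");

        // Same column count as 'incoming' but "id" has a different type:
        // neither -1 nor 1 is a meaningful comparator result here.
        Map<String, String> conflicting = new LinkedHashMap<>();
        conflicting.put("id", "long"); conflicting.put("name", "string");

        System.out.println(isSubset(table, incoming));
        System.out.println(isSubset(table, conflicting));
    }
}
```

The conflicting case returns false from the subset check, which is the honest answer; a comparator would have to invent an arbitrary ordering for it.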




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

