nsivabalan commented on a change in pull request #2927:
URL: https://github.com/apache/hudi/pull/2927#discussion_r629421999
##########
File path:
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionHelper.scala
##########
@@ -139,6 +139,7 @@ object AvroConversionHelper {
val length = struct.fields.length
val converters = new Array[AnyRef => AnyRef](length)
val avroFieldIndexes = new Array[Int](length)
+ //val avroFieldNames = new Array[String](length)
Review comment:
I am not making this change in this diff because I am not sure, in general,
whether AvroConversionHelper and AvroConversionUtils are capable of handling
schema evolution where new columns are added in the middle. I need to do some
more investigation to validate this. But the tests succeed without this fix,
so I am leaving it this way for now.
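To make the concern above concrete, here is a stdlib-only toy (field names are invented; no Avro or Spark dependency) contrasting index-based field mapping, which can misalign when a column is added in the middle, with name-based lookup, which is what a hypothetical `avroFieldNames` array would enable:

```java
import java.util.Arrays;
import java.util.List;

public class MidColumnEvolutionSketch {
  // Writer schema the batch was produced with.
  static final List<String> OLD_FIELDS = Arrays.asList("id", "name", "ts");
  // Evolved table schema: "city" was added in the middle.
  static final List<String> NEW_FIELDS = Arrays.asList("id", "name", "city", "ts");

  // Positional mapping: old field i is read from slot i of the new schema,
  // so "ts" would incorrectly be read from the "city" slot.
  static int[] positionalIndexes() {
    int[] idx = new int[OLD_FIELDS.size()];
    for (int i = 0; i < idx.length; i++) {
      idx[i] = i;
    }
    return idx;
  }

  // Name-based mapping: resolve each old field by name in the new schema,
  // which stays correct regardless of where new columns were inserted.
  static int[] nameBasedIndexes() {
    int[] idx = new int[OLD_FIELDS.size()];
    for (int i = 0; i < idx.length; i++) {
      idx[i] = NEW_FIELDS.indexOf(OLD_FIELDS.get(i));
    }
    return idx;
  }
}
```

This mirrors Avro's own schema-resolution rule of matching reader and writer fields by name rather than by position.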
##########
File path:
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
##########
@@ -111,6 +112,34 @@ object HoodieSparkUtils {
}
}
+  def createRddWithLatestSchema(df: DataFrame, latestSchema: Schema, structName: String, recordNamespace: String): RDD[GenericRecord] = {
+    val avroSchema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, recordNamespace)
+    // if the schema generated from df.schema is the same as the latest schema, no special handling is required.
+    if (TableSchemaResolver.isSchemaEquals(avroSchema, latestSchema)) {
+      createRdd(df, avroSchema, structName, recordNamespace)
+    } else {
+      // if not, the table schema has evolved, but this batch of records was generated with an older schema.
+      createRddWithLatestSchema(df, avroSchema, latestSchema, structName, recordNamespace)
+    }
+  }
+
+  def createRddWithLatestSchema(df: DataFrame, avroSchema: Schema, latestSchema: Schema, structName: String, recordNamespace: String): RDD[GenericRecord] = {
+    // Use the Avro schema to derive the StructType which has the correct nullability information
+    val dataType = SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType]
+    val encoder = RowEncoder.apply(dataType).resolveAndBind()
+    val deserializer = HoodieSparkUtils.createRowSerDe(encoder)
+    val latestDataType = SchemaConverters.toSqlType(latestSchema).dataType.asInstanceOf[StructType]
Review comment:
this path differs from the existing createRdd(): in this method we first
deserialize the DataFrame's Rows with the old schema and then use latestSchema
for the Avro conversion at line 137, whereas createRdd() uses the same schema
in both places. We could try to fold both into a single method; open to
discussion.
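As a dependency-free sketch of what folding the two paths might look like (a record is modeled as a `Map<String, Object>` and a "schema" as a plain field list; the method and field names here are hypothetical, not Hudi APIs), the existing createRdd() behavior becomes the special case where both schemas coincide:

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class FoldedConvertSketch {

  // "Deserialize" with the writer schema, then project onto the latest schema,
  // filling newly added fields with null (a stand-in for Avro field defaults).
  static Map<String, Object> convert(Map<String, Object> row,
                                     List<String> writerFields,
                                     List<String> latestFields) {
    Map<String, Object> out = new LinkedHashMap<>();
    for (String f : latestFields) {
      out.put(f, writerFields.contains(f) ? row.get(f) : null);
    }
    return out;
  }

  // Analogue of the existing createRdd(): the same schema in both places.
  static Map<String, Object> convertSameSchema(Map<String, Object> row, List<String> fields) {
    return convert(row, fields, fields);
  }
}
```

In other words, a single method taking both a writer schema and a latest schema subsumes the equal-schema path, which may be the simplest way to fold the two.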
##########
File path:
hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
##########
@@ -353,6 +360,89 @@ public static boolean isSchemaCompatible(String oldSchema, String newSchema) {
     return isSchemaCompatible(new Schema.Parser().parse(oldSchema), new Schema.Parser().parse(newSchema));
   }
+  /**
+   * Get the latest schema, either from the incoming schema or the table schema.
+   * @param incomingSchema incoming batch's schema.
+   * @param convertTableSchemaToAddNamespace {@code true} if the table schema needs to be converted to add a namespace, {@code false} otherwise.
+   * @param converterFn converter function to be called on the table schema.
+   * @return the latest schema.
+   */
+  public Schema getLatestSchema(Schema incomingSchema, boolean convertTableSchemaToAddNamespace,
+                                Function1<Schema, Schema> converterFn) {
+    Schema latestSchema = incomingSchema;
+    try {
+      if (isTimelineNonEmpty()) {
+        Schema tableSchema = getTableAvroSchemaWithoutMetadataFields();
+        if (convertTableSchemaToAddNamespace) {
+          tableSchema = converterFn.apply(tableSchema);
+        }
+        if (incomingSchema.getFields().size() < tableSchema.getFields().size() && isSchemaSubset(tableSchema, incomingSchema)) {
Review comment:
we can't really introduce schema comparators because two schemas are not
totally ordered the way numbers or strings are. For example, if two schemas
have the same number of columns but differ in the data type of one column,
should the comparator return -1 or 1? So I am resorting to isSchemaSubset().
Let me know if that makes sense.
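A toy illustration of that point (plain Java, no Avro; a schema is modeled here as a field-name-to-type map, and `isSubset` only mirrors the intent of isSchemaSubset(), not its actual implementation): the subset relation is well-defined, while a total-order comparator has no sensible answer for the conflicting-type case.

```java
import java.util.Map;

public class SchemaSubsetSketch {

  // candidate is a subset of full iff every candidate field exists in full
  // with the same type. Two schemas of equal size with one differing type are
  // neither "less than" nor "greater than" each other, so no comparator fits.
  static boolean isSubset(Map<String, String> full, Map<String, String> candidate) {
    for (Map.Entry<String, String> e : candidate.entrySet()) {
      if (!e.getValue().equals(full.get(e.getKey()))) {
        return false;
      }
    }
    return true;
  }
}
```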
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]