nsivabalan commented on code in PR #5376:
URL: https://github.com/apache/hudi/pull/5376#discussion_r855238436
##########
hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java:
##########
@@ -405,6 +407,18 @@ public static GenericRecord rewriteRecordWithMetadata(GenericRecord genericRecor
     return newRecord;
   }
+  // TODO Unify the logical of rewriteRecordWithMetadata and rewriteEvolutionRecordWithMetadata, and delete this function.
+  public static GenericRecord rewriteEvolutionRecordWithMetadata(GenericRecord genericRecord, Schema newSchema, String fileName) {
+    GenericRecord newRecord = HoodieAvroUtils.rewriteRecordWithNewSchema(genericRecord, newSchema, new HashMap<>());
+    // do not preserve FILENAME_METADATA_FIELD
+    newRecord.put(HoodieRecord.FILENAME_METADATA_FIELD_POS, fileName);
+    if (!GenericData.get().validate(newSchema, newRecord)) {
Review Comment:
Won't this take a perf hit if we validate schema compatibility for every record? Can't we move this outside the loop and do it once, for just one representative record?
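The hoisting the comment asks for can be sketched with plain Java (all names here are hypothetical stand-ins; `validate` plays the role of `GenericData.get().validate(schema, record)`, which Hudi calls per record in the diff above). The point is simply that a check whose outcome is the same for every record of a batch can run once instead of N times:

```java
import java.util.List;

public class ValidateOnceDemo {

    // Hypothetical stand-in for GenericData.get().validate(newSchema, record):
    // here "valid" just means the record is a non-empty string.
    static boolean validate(String record) {
        return record != null && !record.isEmpty();
    }

    // Per-record validation, as in the diff: one validate() call per record.
    // Returns the number of validate() calls performed.
    static long rewriteAllValidatingEach(List<String> records) {
        long validations = 0;
        for (String r : records) {
            validations++;
            if (!validate(r)) {
                throw new IllegalStateException("invalid record");
            }
        }
        return validations;
    }

    // The suggested alternative: validate one representative record up front,
    // then skip the check inside the loop. Returns the number of validate() calls.
    static long rewriteAllValidatingOnce(List<String> records) {
        long validations = 0;
        if (!records.isEmpty()) {
            validations++;
            if (!validate(records.get(0))) {
                throw new IllegalStateException("invalid record");
            }
        }
        return validations;
    }

    public static void main(String[] args) {
        List<String> records = List.of("a", "b", "c");
        System.out.println(rewriteAllValidatingEach(records)); // 3 calls
        System.out.println(rewriteAllValidatingOnce(records)); // 1 call
    }
}
```

This only applies, of course, if schema conformance really is uniform across records of the batch; validation that depends on per-record data cannot be hoisted this way.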
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java:
##########
@@ -109,10 +113,14 @@ public void runMerge(HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<Hood
           && writeInternalSchema.findIdByName(f) == querySchema.findIdByName(f)
           && writeInternalSchema.findIdByName(f) != -1
           && writeInternalSchema.findType(writeInternalSchema.findIdByName(f)).equals(querySchema.findType(writeInternalSchema.findIdByName(f)))).collect(Collectors.toList());
-      readSchema = AvroInternalSchemaConverter.convert(new InternalSchemaMerger(writeInternalSchema, querySchema, true, false).mergeSchema(), readSchema.getName());
+      readSchema = AvroInternalSchemaConverter
+          .convert(new InternalSchemaMerger(writeInternalSchema, querySchema, true, false, false).mergeSchema(), readSchema.getName());
       Schema writeSchemaFromFile = AvroInternalSchemaConverter.convert(writeInternalSchema, readSchema.getName());
       needToReWriteRecord = sameCols.size() != colNamesFromWriteSchema.size()
           || SchemaCompatibility.checkReaderWriterCompatibility(writeSchemaFromFile, readSchema).getType() == org.apache.avro.SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE;
+      if (needToReWriteRecord) {
Review Comment:
   Not exactly related to this patch, but in L120, are we passing the arguments in the right order? Per the docs for SchemaCompatibility.checkReaderWriterCompatibility(), the first arg refers to the reader schema and the second arg refers to the writer schema. Can you check that once, please?
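Why a swap matters here: reader/writer compatibility is not symmetric, so passing the arguments in the wrong order silently answers the opposite question. A minimal stdlib sketch (the method below is a hypothetical stand-in for Avro's check, which takes the reader schema first and the writer schema second; "compatible" is reduced here to "the writer provides every field the reader needs"):

```java
import java.util.List;

public class CompatDemo {

    // Hypothetical stand-in for SchemaCompatibility.checkReaderWriterCompatibility(reader, writer):
    // the READER's fields come first, the WRITER's second. A reader is compatible
    // if every field it needs is present in what the writer produced.
    static boolean isReaderCompatibleWithWriter(List<String> readerFields, List<String> writerFields) {
        return writerFields.containsAll(readerFields);
    }

    public static void main(String[] args) {
        List<String> writer = List.of("id", "name", "ts"); // fields present in the data file
        List<String> reader = List.of("id", "name");       // fields the query wants

        // Correct order: reader first, writer second.
        System.out.println(isReaderCompatibleWithWriter(reader, writer)); // true

        // Swapped order asks "can a reader needing 3 fields read data with 2?"
        System.out.println(isReaderCompatibleWithWriter(writer, reader)); // false
    }
}
```

Because both parameters have the same Java type (Schema in the real API), the compiler cannot catch a swap, which is exactly why this kind of review question is worth double-checking against the javadoc.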
##########
hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java:
##########
@@ -431,6 +431,18 @@ public static GenericRecord rewriteRecordWithMetadata(GenericRecord genericRecor
     return newRecord;
   }
+  // TODO Unify the logical of rewriteRecordWithMetadata and rewriteEvolutionRecordWithMetadata, and delete this function.
+  public static GenericRecord rewriteEvolutionRecordWithMetadata(GenericRecord genericRecord, Schema newSchema, String fileName) {
+    GenericRecord newRecord = HoodieAvroUtils.rewriteRecordWithNewSchema(genericRecord, newSchema, new HashMap<>());
+    // do not preserve FILENAME_METADATA_FIELD
Review Comment:
Yes, let's revisit after 0.11 to see if we can avoid the full rewrite in some cases. I understand the intent of recreating a new record to avoid mutations, but it does incur a perf hit. I should have thought about this when we fixed HoodieMergeHandle for the commit-time fix in an earlier patch; I missed bringing it up.
##########
hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java:
##########
@@ -741,10 +769,23 @@ private static Object rewriteRecordWithNewSchema(Object oldRecord, Schema oldSch
     for (int i = 0; i < fields.size(); i++) {
       Schema.Field field = fields.get(i);
+      String fieldName = field.name();
+      fieldNames.push(fieldName);
       if (oldSchema.getField(field.name()) != null) {
         Schema.Field oldField = oldSchema.getField(field.name());
-        helper.put(i, rewriteRecordWithNewSchema(indexedRecord.get(oldField.pos()), oldField.schema(), fields.get(i).schema()));
+        helper.put(i, rewriteRecordWithNewSchema(indexedRecord.get(oldField.pos()), oldField.schema(), fields.get(i).schema(), renameCols, fieldNames));
Review Comment:
I am a bit skeptical about making such changes at this stage of the release. Can we add a new method for when renameCols is not empty and leave the existing code untouched (when schema evolution is not enabled)? At least I want to make sure we don't cause any unintentional regression in the non-schema-evolution code path.
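The split being asked for is a simple dispatch: keep the existing method byte-for-byte for the default path, and route only the schema-evolution case (non-empty renameCols) through a new method. A minimal stdlib sketch, with all method names and the string-based "record" purely hypothetical:

```java
import java.util.Map;

public class RewriteDispatchDemo {

    // Hypothetical legacy path: stays untouched, so the non-schema-evolution
    // code path cannot regress.
    static String rewriteRecordLegacy(String record) {
        return record;
    }

    // Hypothetical new evolution-aware path: applies column renames.
    // (Real code would walk the Avro schema; string replace is just for illustration.)
    static String rewriteRecordWithRenames(String record, Map<String, String> renameCols) {
        String out = record;
        for (Map.Entry<String, String> e : renameCols.entrySet()) {
            out = out.replace(e.getKey(), e.getValue());
        }
        return out;
    }

    // Dispatch as the reviewer suggests: only a non-empty renameCols map
    // takes the new code path.
    static String rewriteRecord(String record, Map<String, String> renameCols) {
        return renameCols.isEmpty()
            ? rewriteRecordLegacy(record)
            : rewriteRecordWithRenames(record, renameCols);
    }

    public static void main(String[] args) {
        System.out.println(rewriteRecord("a,b", Map.of()));         // legacy path: a,b
        System.out.println(rewriteRecord("a,b", Map.of("a", "x"))); // new path: x,b
    }
}
```

The cost of this pattern is some temporary duplication between the two methods, which is why such splits are often paired with a TODO to re-unify after the release, as the earlier comment in this thread already suggests.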
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]