prashantwason commented on a change in pull request #2424:
URL: https://github.com/apache/hudi/pull/2424#discussion_r555258349
##########
File path: hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
##########
@@ -292,53 +284,57 @@ public static GenericRecord stitchRecords(GenericRecord
left, GenericRecord righ
return result;
}
- /**
- * Given a avro record with a given schema, rewrites it into the new schema
while setting fields only from the old
- * schema.
- */
- public static GenericRecord rewriteRecord(GenericRecord record, Schema
newSchema) {
- return rewrite(record, getCombinedFieldsToWrite(record.getSchema(),
newSchema), newSchema);
- }
-
/**
* Given a avro record with a given schema, rewrites it into the new schema
while setting fields only from the new
* schema.
+ * NOTE: Here, the assumption is that you cannot go from an evolved schema
(schema with (N) fields)
+ * to an older schema (schema with (N-1) fields). All fields present in the
older record schema MUST be present in the
+ * new schema and the default/existing values are carried over.
+ * This particular method does the following things :
+ * a) Create a new empty GenericRecord with the new schema.
+ * b) Set default values for all fields of this transformed schema in the
new GenericRecord or copy over the data
+ * from the old schema to the new schema
+ * c) hoodie_metadata_fields have a special treatment. This is done because
for code generated AVRO classes
+ * (only HoodieMetadataRecord), the avro record is a SpecificBaseRecord type
instead of a GenericRecord.
+ * SpecificBaseRecord throws null pointer exception for record.get(name) if
name is not present in the schema of the
+ * record (which happens when converting a SpecificBaseRecord without
hoodie_metadata_fields to a new record with.
*/
- public static GenericRecord
rewriteRecordWithOnlyNewSchemaFields(GenericRecord record, Schema newSchema) {
- return rewrite(record, new LinkedHashSet<>(newSchema.getFields()),
newSchema);
- }
-
- private static GenericRecord rewrite(GenericRecord record,
LinkedHashSet<Field> fieldsToWrite, Schema newSchema) {
+ public static GenericRecord rewriteRecord(GenericRecord oldRecord, Schema
newSchema) {
GenericRecord newRecord = new GenericData.Record(newSchema);
- for (Schema.Field f : fieldsToWrite) {
- if (record.get(f.name()) == null) {
+ boolean isSpecificRecord = oldRecord instanceof SpecificRecordBase;
+ for (Schema.Field f : newSchema.getFields()) {
+ if (!isMetadataField(f.name()) && oldRecord.get(f.name()) == null) {
+ // if not metadata field, set defaults
if (f.defaultVal() instanceof JsonProperties.Null) {
newRecord.put(f.name(), null);
} else {
newRecord.put(f.name(), f.defaultVal());
}
- } else {
- newRecord.put(f.name(), record.get(f.name()));
+ } else if (!isMetadataField(f.name())) {
+ // if not metadata field, copy old value
+ newRecord.put(f.name(), oldRecord.get(f.name()));
+ } else if (!isSpecificRecord) {
+ // if not specific record, copy value for hoodie metadata fields as
well
Review comment:
This seems counter-intuitive given the comment on the method.
If SpecificRecord.get() throws a NullPointerException when the field is not
there, won't we want to populate the metadata fields for it?
##########
File path: hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
##########
@@ -292,53 +284,57 @@ public static GenericRecord stitchRecords(GenericRecord
left, GenericRecord righ
return result;
}
- /**
- * Given a avro record with a given schema, rewrites it into the new schema
while setting fields only from the old
- * schema.
- */
- public static GenericRecord rewriteRecord(GenericRecord record, Schema
newSchema) {
- return rewrite(record, getCombinedFieldsToWrite(record.getSchema(),
newSchema), newSchema);
- }
-
/**
* Given a avro record with a given schema, rewrites it into the new schema
while setting fields only from the new
* schema.
+ * NOTE: Here, the assumption is that you cannot go from an evolved schema
(schema with (N) fields)
+ * to an older schema (schema with (N-1) fields). All fields present in the
older record schema MUST be present in the
+ * new schema and the default/existing values are carried over.
+ * This particular method does the following things :
+ * a) Create a new empty GenericRecord with the new schema.
+ * b) Set default values for all fields of this transformed schema in the
new GenericRecord or copy over the data
+ * from the old schema to the new schema
+ * c) hoodie_metadata_fields have a special treatment. This is done because
for code generated AVRO classes
+ * (only HoodieMetadataRecord), the avro record is a SpecificBaseRecord type
instead of a GenericRecord.
+ * SpecificBaseRecord throws null pointer exception for record.get(name) if
name is not present in the schema of the
+ * record (which happens when converting a SpecificBaseRecord without
hoodie_metadata_fields to a new record with.
*/
- public static GenericRecord
rewriteRecordWithOnlyNewSchemaFields(GenericRecord record, Schema newSchema) {
- return rewrite(record, new LinkedHashSet<>(newSchema.getFields()),
newSchema);
- }
-
- private static GenericRecord rewrite(GenericRecord record,
LinkedHashSet<Field> fieldsToWrite, Schema newSchema) {
+ public static GenericRecord rewriteRecord(GenericRecord oldRecord, Schema
newSchema) {
GenericRecord newRecord = new GenericData.Record(newSchema);
- for (Schema.Field f : fieldsToWrite) {
- if (record.get(f.name()) == null) {
+ boolean isSpecificRecord = oldRecord instanceof SpecificRecordBase;
+ for (Schema.Field f : newSchema.getFields()) {
+ if (!isMetadataField(f.name()) && oldRecord.get(f.name()) == null) {
+ // if not metadata field, set defaults
if (f.defaultVal() instanceof JsonProperties.Null) {
newRecord.put(f.name(), null);
} else {
newRecord.put(f.name(), f.defaultVal());
}
- } else {
- newRecord.put(f.name(), record.get(f.name()));
+ } else if (!isMetadataField(f.name())) {
+ // if not metadata field, copy old value
+ newRecord.put(f.name(), oldRecord.get(f.name()));
+ } else if (!isSpecificRecord) {
+ // if not specific record, copy value for hoodie metadata fields as
well
+ newRecord.put(f.name(), oldRecord.get(f.name()));
}
}
if (!GenericData.get().validate(newSchema, newRecord)) {
throw new SchemaCompatibilityException(
- "Unable to validate the rewritten record " + record + " against
schema " + newSchema);
+ "Unable to validate the rewritten record " + oldRecord + " against
schema " + newSchema);
}
return newRecord;
}
- /**
- * Generates a super set of fields from both old and new schema.
- */
- private static LinkedHashSet<Field> getCombinedFieldsToWrite(Schema
oldSchema, Schema newSchema) {
- LinkedHashSet<Field> allFields = new
LinkedHashSet<>(oldSchema.getFields());
- for (Schema.Field f : newSchema.getFields()) {
- if (!allFields.contains(f) && !isMetadataField(f.name())) {
- allFields.add(f);
+ private static void addAllFieldsExcludingMetadataFields(Schema schema,
List<Field> fieldsList) {
Review comment:
This method is private, static, and used in only one place. Would it be
simpler to inline it?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]