prashantwason commented on a change in pull request #2424:
URL: https://github.com/apache/hudi/pull/2424#discussion_r555258349



##########
File path: hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
##########
@@ -292,53 +284,57 @@ public static GenericRecord stitchRecords(GenericRecord left, GenericRecord righ
     return result;
   }
 
-  /**
-   * Given a avro record with a given schema, rewrites it into the new schema while setting fields only from the old
-   * schema.
-   */
-  public static GenericRecord rewriteRecord(GenericRecord record, Schema newSchema) {
-    return rewrite(record, getCombinedFieldsToWrite(record.getSchema(), newSchema), newSchema);
-  }
-
   /**
    * Given a avro record with a given schema, rewrites it into the new schema while setting fields only from the new
    * schema.
+   * NOTE: Here, the assumption is that you cannot go from an evolved schema (schema with (N) fields)
+   * to an older schema (schema with (N-1) fields). All fields present in the older record schema MUST be present in the
+   * new schema and the default/existing values are carried over.
+   * This particular method does the following things :
+   * a) Create a new empty GenericRecord with the new schema.
+   * b) Set default values for all fields of this transformed schema in the new GenericRecord or copy over the data
+   * from the old schema to the new schema
+   * c) hoodie_metadata_fields have a special treatment. This is done because for code generated AVRO classes
+   * (only HoodieMetadataRecord), the avro record is a SpecificBaseRecord type instead of a GenericRecord.
+   * SpecificBaseRecord throws null pointer exception for record.get(name) if name is not present in the schema of the
+   * record (which happens when converting a SpecificBaseRecord without hoodie_metadata_fields to a new record with.
    */
-  public static GenericRecord rewriteRecordWithOnlyNewSchemaFields(GenericRecord record, Schema newSchema) {
-    return rewrite(record, new LinkedHashSet<>(newSchema.getFields()), newSchema);
-  }
-
-  private static GenericRecord rewrite(GenericRecord record, LinkedHashSet<Field> fieldsToWrite, Schema newSchema) {
+  public static GenericRecord rewriteRecord(GenericRecord oldRecord, Schema newSchema) {
     GenericRecord newRecord = new GenericData.Record(newSchema);
-    for (Schema.Field f : fieldsToWrite) {
-      if (record.get(f.name()) == null) {
+    boolean isSpecificRecord = oldRecord instanceof SpecificRecordBase;
+    for (Schema.Field f : newSchema.getFields()) {
+      if (!isMetadataField(f.name()) && oldRecord.get(f.name()) == null) {
+        // if not metadata field, set defaults
         if (f.defaultVal() instanceof JsonProperties.Null) {
          newRecord.put(f.name(), null);
         } else {
           newRecord.put(f.name(), f.defaultVal());
         }
-      } else {
-        newRecord.put(f.name(), record.get(f.name()));
+      } else if (!isMetadataField(f.name())) {
+        // if not metadata field, copy old value
+        newRecord.put(f.name(), oldRecord.get(f.name()));
+      } else if (!isSpecificRecord) {
+        // if not specific record, copy value for hoodie metadata fields as well

Review comment:
       This seems counter-intuitive to the comment in the method.
   
   If SpecificRecord.get() throws a NullPointerException when the field is not there, won't we want to populate the metadata fields for it?
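For context on the lookup behavior being discussed, here is a minimal stand-in sketch (plain Java, no Avro dependency; `FixedRecord` and `MapRecord` are hypothetical stand-ins for a code-generated `SpecificRecordBase` and a `GenericData.Record`, illustrating why `get(name)` surfaces as an NPE on one but returns null on the other):

```java
import java.util.HashMap;
import java.util.Map;

public class RecordGetSketch {
  // Stand-in for a code-generated SpecificRecordBase: field names are
  // resolved to fixed positions, so an unknown name has no position.
  static class FixedRecord {
    private final String[] names = {"id", "value"};
    private final Object[] values = {1, "a"};

    Object get(String name) {
      Integer pos = position(name);
      // Auto-unboxing a null position throws NullPointerException,
      // mimicking SpecificRecordBase.get(name) on an unknown field.
      return values[pos];
    }

    private Integer position(String name) {
      for (int i = 0; i < names.length; i++) {
        if (names[i].equals(name)) {
          return i;
        }
      }
      return null;
    }
  }

  // Stand-in for a map-backed GenericData.Record: unknown field names
  // simply return null instead of throwing.
  static class MapRecord {
    private final Map<String, Object> data = new HashMap<>();
    Object get(String name) {
      return data.get(name);
    }
  }

  public static void main(String[] args) {
    MapRecord generic = new MapRecord();
    System.out.println(generic.get("_hoodie_commit_time")); // prints null

    FixedRecord specific = new FixedRecord();
    try {
      specific.get("_hoodie_commit_time");
    } catch (NullPointerException e) {
      System.out.println("NPE for unknown field");
    }
  }
}
```

This is why the code has to branch on `isSpecificRecord` before touching the metadata fields at all: calling `get` on a specific record that lacks them is not a safe null check.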

##########
File path: hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
##########
@@ -292,53 +284,57 @@ public static GenericRecord stitchRecords(GenericRecord left, GenericRecord righ
     return result;
   }
 
-  /**
-   * Given a avro record with a given schema, rewrites it into the new schema while setting fields only from the old
-   * schema.
-   */
-  public static GenericRecord rewriteRecord(GenericRecord record, Schema newSchema) {
-    return rewrite(record, getCombinedFieldsToWrite(record.getSchema(), newSchema), newSchema);
-  }
-
   /**
    * Given a avro record with a given schema, rewrites it into the new schema while setting fields only from the new
    * schema.
+   * NOTE: Here, the assumption is that you cannot go from an evolved schema (schema with (N) fields)
+   * to an older schema (schema with (N-1) fields). All fields present in the older record schema MUST be present in the
+   * new schema and the default/existing values are carried over.
+   * This particular method does the following things :
+   * a) Create a new empty GenericRecord with the new schema.
+   * b) Set default values for all fields of this transformed schema in the new GenericRecord or copy over the data
+   * from the old schema to the new schema
+   * c) hoodie_metadata_fields have a special treatment. This is done because for code generated AVRO classes
+   * (only HoodieMetadataRecord), the avro record is a SpecificBaseRecord type instead of a GenericRecord.
+   * SpecificBaseRecord throws null pointer exception for record.get(name) if name is not present in the schema of the
+   * record (which happens when converting a SpecificBaseRecord without hoodie_metadata_fields to a new record with.
    */
-  public static GenericRecord rewriteRecordWithOnlyNewSchemaFields(GenericRecord record, Schema newSchema) {
-    return rewrite(record, new LinkedHashSet<>(newSchema.getFields()), newSchema);
-  }
-
-  private static GenericRecord rewrite(GenericRecord record, LinkedHashSet<Field> fieldsToWrite, Schema newSchema) {
+  public static GenericRecord rewriteRecord(GenericRecord oldRecord, Schema newSchema) {
     GenericRecord newRecord = new GenericData.Record(newSchema);
-    for (Schema.Field f : fieldsToWrite) {
-      if (record.get(f.name()) == null) {
+    boolean isSpecificRecord = oldRecord instanceof SpecificRecordBase;
+    for (Schema.Field f : newSchema.getFields()) {
+      if (!isMetadataField(f.name()) && oldRecord.get(f.name()) == null) {
+        // if not metadata field, set defaults
         if (f.defaultVal() instanceof JsonProperties.Null) {
           newRecord.put(f.name(), null);
         } else {
           newRecord.put(f.name(), f.defaultVal());
         }
-      } else {
-        newRecord.put(f.name(), record.get(f.name()));
+      } else if (!isMetadataField(f.name())) {
+        // if not metadata field, copy old value
+        newRecord.put(f.name(), oldRecord.get(f.name()));
+      } else if (!isSpecificRecord) {
+        // if not specific record, copy value for hoodie metadata fields as well
+        newRecord.put(f.name(), oldRecord.get(f.name()));
       }
     }
     if (!GenericData.get().validate(newSchema, newRecord)) {
       throw new SchemaCompatibilityException(
-          "Unable to validate the rewritten record " + record + " against schema " + newSchema);
+          "Unable to validate the rewritten record " + oldRecord + " against schema " + newSchema);
     }
     return newRecord;
   }
 
-  /**
-   * Generates a super set of fields from both old and new schema.
-   */
-  private static LinkedHashSet<Field> getCombinedFieldsToWrite(Schema oldSchema, Schema newSchema) {
-    LinkedHashSet<Field> allFields = new LinkedHashSet<>(oldSchema.getFields());
-    for (Schema.Field f : newSchema.getFields()) {
-      if (!allFields.contains(f) && !isMetadataField(f.name())) {
-        allFields.add(f);
+  private static void addAllFieldsExcludingMetadataFields(Schema schema, List<Field> fieldsList) {

Review comment:
       This is private, static, and used in only one place. Would it be simpler to inline this method?
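A sketch of what the suggested inlining could look like (plain Java, no Avro dependency; `isMetadataField` is stubbed here purely for the sketch, standing in for Hudi's existing metadata-column check, and the field list is simplified to names):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class InlineFilterSketch {
  // Hypothetical stub for HoodieRecord's metadata-column membership check;
  // the real Hudi metadata columns all share the "_hoodie_" prefix.
  static boolean isMetadataField(String name) {
    return name.startsWith("_hoodie_");
  }

  // Inlined equivalent of addAllFieldsExcludingMetadataFields: build the
  // filtered field list directly at the single call site rather than
  // delegating to a private helper.
  static List<String> nonMetadataFields(List<String> schemaFieldNames) {
    List<String> fields = new ArrayList<>();
    for (String name : schemaFieldNames) {
      if (!isMetadataField(name)) {
        fields.add(name);
      }
    }
    return fields;
  }

  public static void main(String[] args) {
    List<String> names = Arrays.asList("_hoodie_commit_time", "id", "value");
    System.out.println(nonMetadataFields(names)); // prints [id, value]
  }
}
```

Inlining keeps the one-off filtering logic next to its only consumer, at the cost of a slightly longer caller; either choice is defensible for a loop this small.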




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]
