codope commented on a change in pull request #2982:
URL: https://github.com/apache/hudi/pull/2982#discussion_r794524309



##########
File path: 
hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java
##########
@@ -236,4 +237,81 @@ public void testGetNestedFieldVal() {
     }
   }
 
+  @Test
+  public void testRewriteToEvolvedNestedRecord() throws Exception {

Review comment:
       Can we remove the `throws Exception` clause here and below?

##########
File path: hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
##########
@@ -326,18 +327,86 @@ public static GenericRecord rewriteRecord(GenericRecord 
oldRecord, Schema newSch
     return newRecord;
   }
 
-  private static void copyOldValueOrSetDefault(GenericRecord oldRecord, 
GenericRecord newRecord, Schema.Field f) {
+  private static void copyOldValueOrSetDefault(GenericRecord oldRecord, 
GenericRecord newRecord, Schema.Field newField) {
     // cache the result of oldRecord.get() to save CPU expensive hash lookup
     Schema oldSchema = oldRecord.getSchema();
-    Object fieldValue = oldSchema.getField(f.name()) == null ? null : 
oldRecord.get(f.name());
-    if (fieldValue == null) {
-      if (f.defaultVal() instanceof JsonProperties.Null) {
-        newRecord.put(f.name(), null);
+    String fieldName = newField.name();
+    if (oldRecord.get(fieldName) == null) {
+      setDefaultVal(newRecord, newField);
+    } else {
+      if (newField.schema().equals(oldSchema.getField(fieldName).schema())) {
+        newRecord.put(fieldName, oldRecord.get(fieldName));
       } else {
-        newRecord.put(f.name(), f.defaultVal());
+        newRecord.put(fieldName, 
rewriteEvolvedFields(oldRecord.get(fieldName), newField.schema()));
       }
+    }
+  }
+
+  private static void setDefaultVal(GenericRecord newRecord, Schema.Field f) {
+    if (f.defaultVal() instanceof JsonProperties.Null) {
+      newRecord.put(f.name(), null);
     } else {
-      newRecord.put(f.name(), fieldValue);
+      newRecord.put(f.name(), f.defaultVal());
+    }
+  }
+
+  /*
+   * <pre>
+   *  OldRecord:                     NewRecord:
+   *      field1 : String                field1 : String
+   *      field2 : record                field2 : record
+   *         field_21 : string              field_21 : string
+   *         field_22 : Integer             field_22 : Integer
+   *      field3: Integer                   field_23 : String
+   *                                        field_24 : Integer
+   *                                     field3: Integer
+   * </pre>
+   * <p>
+   *  When a nested record has changed/evolved, newRecord.put(field2, 
oldRecord.get(field2)), is not sufficient.
+   *  Requires a deep-copy/rewrite of the evolved field.
+   */
+  private static Object rewriteEvolvedFields(Object datum, Schema newSchema) {
+    switch (newSchema.getType()) {
+      case RECORD:
+        if (!(datum instanceof GenericRecord)) {
+          return datum;
+        }
+        GenericRecord record = (GenericRecord) datum;
+        // if schema of the record being rewritten does not match
+        // with the new schema, some nested records with schema change
+        // will require rewrite.
+        if (!record.getSchema().equals(newSchema)) {
+          GenericRecord newRecord = new GenericData.Record(newSchema);
+          for (Schema.Field f : newSchema.getFields()) {
+            if (record.get(f.name()) == null) {
+              setDefaultVal(newRecord, f);
+            } else {
+              newRecord.put(f.name(), 
rewriteEvolvedFields(record.get(f.name()), f.schema()));
+            }
+          }
+          return newRecord;
+        }
+        return datum;
+      case UNION:
+        Integer idx = (newSchema.getTypes().get(0).getType() == 
Schema.Type.NULL) ? 1 : 0;

Review comment:
       Instead of the wrapper class `Integer`, can we use the primitive type `int` here?

##########
File path: 
hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java
##########
@@ -236,4 +237,81 @@ public void testGetNestedFieldVal() {
     }
   }
 
+  @Test
+  public void testRewriteToEvolvedNestedRecord() throws Exception {
+    // schema definition for inner record
+    Schema nestedSchema = 
SchemaBuilder.record("inner_rec").fields().requiredDouble("color_id").endRecord();
+    Schema evolvedNestedSchema = 
SchemaBuilder.record("inner_rec").fields().requiredDouble("color_id")
+        .optionalString("color_name").endRecord();
+
+    // schema definition for outer record
+    Schema recordSchema = 
SchemaBuilder.record("outer_rec").fields().requiredDouble("timestamp")
+        
.requiredString("_row_key").requiredString("non_pii_col").name("color_rec").type(nestedSchema)
+        .noDefault().requiredString("pii_col").endRecord();
+    Schema evolvedRecordSchema = 
SchemaBuilder.record("outer_rec").fields().requiredDouble("timestamp")
+        
.requiredString("_row_key").requiredString("non_pii_col").name("color_rec").type(evolvedNestedSchema)
+        .noDefault().requiredString("pii_col").endRecord();
+
+    // populate inner record, with fewer fields
+    GenericRecord nestedRec = new GenericData.Record(nestedSchema);
+    nestedRec.put("color_id", 55.5);
+
+    // populate outer record
+    GenericRecord rec = new GenericData.Record(recordSchema);
+    rec.put("timestamp", 3.5);
+    rec.put("_row_key", "key1");
+    rec.put("non_pii_col", "val1");
+    rec.put("color_rec", nestedRec);
+    rec.put("pii_col", "val2");
+
+    // rewrite record with less number of fields into an evolved record (with 
optional fields added).
+    try {
+      GenericRecord newRecord = HoodieAvroUtils.rewriteRecord(rec, 
evolvedRecordSchema);
+      assertEquals("val2", newRecord.get("pii_col"));
+      assertEquals(null, 
((GenericRecord)newRecord.get("color_rec")).get("color_name"));
+    } catch (Exception e) {
+      e.printStackTrace();

Review comment:
       Let's remove the `e.printStackTrace()` call here and below.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to