danny0405 commented on a change in pull request #2309:
URL: https://github.com/apache/hudi/pull/2309#discussion_r546694715



##########
File path: hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
##########
@@ -308,17 +309,79 @@ public static GenericRecord 
rewriteRecordWithOnlyNewSchemaFields(GenericRecord r
     return rewrite(record, new LinkedHashSet<>(newSchema.getFields()), 
newSchema);
   }
 
+  private static void setDefaultVal(GenericRecord newRecord, Schema.Field f) {
+    if (f.defaultVal() instanceof JsonProperties.Null) {
+      newRecord.put(f.name(), null);
+    } else {
+      newRecord.put(f.name(), f.defaultVal());
+    }
+  }
+
+  /*
+   *  OldRecord:                     NewRecord:
+   *      field1 : String                field1 : String

Review comment:
       We can wrap the `OldRecord`/`NewRecord` diagram in this comment with 
`<pre></pre>` tags to improve its readability in the generated Javadoc.

##########
File path: hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
##########
@@ -308,17 +309,79 @@ public static GenericRecord 
rewriteRecordWithOnlyNewSchemaFields(GenericRecord r
     return rewrite(record, new LinkedHashSet<>(newSchema.getFields()), 
newSchema);
   }
 
+  private static void setDefaultVal(GenericRecord newRecord, Schema.Field f) {
+    if (f.defaultVal() instanceof JsonProperties.Null) {
+      newRecord.put(f.name(), null);
+    } else {
+      newRecord.put(f.name(), f.defaultVal());
+    }
+  }
+
+  /*
+   *  OldRecord:                     NewRecord:
+   *      field1 : String                field1 : String
+   *      field2 : record                field2 : record
+   *         field_21 : string              field_21 : string
+   *         field_22 : Integer             field_22 : Integer
+   *      field3: Integer                   field_23 : String
+   *                                       field_24 : Integer
+   *                                     field3: Integer
+   *
+   *  When a nested record has changed/evolved, newRecord.put(field2, 
oldRecord.get(field2)), is not sufficient.
+   *  Requires a deep-copy/rewrite of the evolved field.

Review comment:
       A new paragraph should start with a `<p>` tag.

##########
File path: hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
##########
@@ -308,17 +309,79 @@ public static GenericRecord 
rewriteRecordWithOnlyNewSchemaFields(GenericRecord r
     return rewrite(record, new LinkedHashSet<>(newSchema.getFields()), 
newSchema);
   }
 
+  private static void setDefaultVal(GenericRecord newRecord, Schema.Field f) {
+    if (f.defaultVal() instanceof JsonProperties.Null) {
+      newRecord.put(f.name(), null);
+    } else {
+      newRecord.put(f.name(), f.defaultVal());
+    }
+  }
+
+  /*
+   *  OldRecord:                     NewRecord:
+   *      field1 : String                field1 : String
+   *      field2 : record                field2 : record
+   *         field_21 : string              field_21 : string
+   *         field_22 : Integer             field_22 : Integer
+   *      field3: Integer                   field_23 : String
+   *                                       field_24 : Integer
+   *                                     field3: Integer
+   *
+   *  When a nested record has changed/evolved, newRecord.put(field2, 
oldRecord.get(field2)), is not sufficient.
+   *  Requires a deep-copy/rewrite of the evolved field.
+   */
+  private static Object rewriteEvolvedFields(Object datum, Schema newSchema) {
+    switch (newSchema.getType()) {
+      case RECORD:
+        if (!(datum instanceof GenericRecord)) {
+          return datum;
+        }
+        GenericRecord record = (GenericRecord) datum;
+        // if schema of the record being rewritten does not match
+        // with the new schema, some nested records with schema change
+        // will require rewrite.
+        if (!record.getSchema().equals(newSchema)) {
+          GenericRecord newRecord = new GenericData.Record(newSchema);
+          for (Schema.Field f : newSchema.getFields()) {
+            if (record.get(f.name()) == null) {
+              setDefaultVal(newRecord, f);
+            } else {
+              newRecord.put(f.name(), 
rewriteEvolvedFields(record.get(f.name()), f.schema()));
+            }
+          }
+          return newRecord;
+        }
+        return datum;
+      case UNION:
+        Integer idx = (newSchema.getTypes().get(0).getType() == 
Schema.Type.NULL) ? 1 : 0;
+        return rewriteEvolvedFields(datum, newSchema.getTypes().get(idx));

Review comment:
       Yeah, for `UNION` I think the check in the nested recursion is valid.

##########
File path: 
hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java
##########
@@ -207,4 +208,82 @@ public void testAddingAndRemovingMetadataFields() {
     Schema schemaWithoutMetaCols = 
HoodieAvroUtils.removeMetadataFields(schemaWithMetaCols);
     assertEquals(schemaWithoutMetaCols.getFields().size(), 
NUM_FIELDS_IN_EXAMPLE_SCHEMA);
   }
+
+  @Test
+  public void testRewriteToEvolvedNestedRecord() throws Exception {
+    // schema definition for inner record
+    Schema nestedSchema = 
SchemaBuilder.record("inner_rec").fields().requiredDouble("color_id").endRecord();

Review comment:
       The test is good, although it does not cover the `ARRAY` and `MAP` 
types; I think that is okay.

##########
File path: hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
##########
@@ -308,17 +309,79 @@ public static GenericRecord 
rewriteRecordWithOnlyNewSchemaFields(GenericRecord r
     return rewrite(record, new LinkedHashSet<>(newSchema.getFields()), 
newSchema);
   }
 
+  private static void setDefaultVal(GenericRecord newRecord, Schema.Field f) {
+    if (f.defaultVal() instanceof JsonProperties.Null) {
+      newRecord.put(f.name(), null);
+    } else {
+      newRecord.put(f.name(), f.defaultVal());
+    }
+  }
+
+  /*
+   *  OldRecord:                     NewRecord:
+   *      field1 : String                field1 : String
+   *      field2 : record                field2 : record
+   *         field_21 : string              field_21 : string
+   *         field_22 : Integer             field_22 : Integer
+   *      field3: Integer                   field_23 : String
+   *                                       field_24 : Integer
+   *                                     field3: Integer
+   *
+   *  When a nested record has changed/evolved, newRecord.put(field2, 
oldRecord.get(field2)), is not sufficient.
+   *  Requires a deep-copy/rewrite of the evolved field.
+   */
+  private static Object rewriteEvolvedFields(Object datum, Schema newSchema) {
+    switch (newSchema.getType()) {
+      case RECORD:
+        if (!(datum instanceof GenericRecord)) {
+          return datum;
+        }
+        GenericRecord record = (GenericRecord) datum;
+        // if schema of the record being rewritten does not match
+        // with the new schema, some nested records with schema change
+        // will require rewrite.
+        if (!record.getSchema().equals(newSchema)) {
+          GenericRecord newRecord = new GenericData.Record(newSchema);
+          for (Schema.Field f : newSchema.getFields()) {
+            if (record.get(f.name()) == null) {
+              setDefaultVal(newRecord, f);
+            } else {
+              newRecord.put(f.name(), 
rewriteEvolvedFields(record.get(f.name()), f.schema()));
+            }
+          }
+          return newRecord;
+        }
+        return datum;
+      case UNION:
+        Integer idx = (newSchema.getTypes().get(0).getType() == 
Schema.Type.NULL) ? 1 : 0;
+        return rewriteEvolvedFields(datum, newSchema.getTypes().get(idx));
+      case ARRAY:
+        List<Object> arrayValue = (List)datum;
+        List<Object> arrayCopy = new GenericData.Array<Object>(

Review comment:
       The forced cast `(List) datum` has no guard, so the method 
`rewriteEvolvedFields(Object datum, Schema newSchema)` only works when `datum` 
has a schema compatible with `newSchema`. Callers must uphold this implicit 
contract, so I would suggest documenting it in the Javadoc to make it explicit. 
The same applies to the `Map` type.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to