alexeykudinkin commented on code in PR #5376:
URL: https://github.com/apache/hudi/pull/5376#discussion_r855624045


##########
hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java:
##########
@@ -741,10 +765,23 @@ private static Object rewriteRecordWithNewSchema(Object 
oldRecord, Schema oldSch
 
         for (int i = 0; i < fields.size(); i++) {
           Schema.Field field = fields.get(i);
+          String fieldName = field.name();
+          fieldNames.push(fieldName);
           if (oldSchema.getField(field.name()) != null) {
             Schema.Field oldField = oldSchema.getField(field.name());
-            helper.put(i, 
rewriteRecordWithNewSchema(indexedRecord.get(oldField.pos()), 
oldField.schema(), fields.get(i).schema()));
+            helper.put(i, 
rewriteRecordWithNewSchema(indexedRecord.get(oldField.pos()), 
oldField.schema(), fields.get(i).schema(), renameCols, fieldNames));
+          } else {
+            String fieldFullName = createFullName(fieldNames);
+            String[] colNamePartsFromOldSchema = 
renameCols.getOrDefault(fieldFullName, "").split("\\.");

Review Comment:
   We don't need to split actually, we just need to find the part after the 
last "." (will reduce amount of memory churn)



##########
hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java:
##########
@@ -405,6 +407,14 @@ public static GenericRecord 
rewriteRecordWithMetadata(GenericRecord genericRecor
     return newRecord;
   }
 
+  // TODO Unify the logical of rewriteRecordWithMetadata and 
rewriteEvolutionRecordWithMetadata, and delete this function.
+  public static GenericRecord rewriteEvolutionRecordWithMetadata(GenericRecord 
genericRecord, Schema newSchema, String fileName) {
+    GenericRecord newRecord = 
HoodieAvroUtils.rewriteRecordWithNewSchema(genericRecord, newSchema, new 
HashMap<>());

Review Comment:
   @xiarixiaoyao in general instead of doing `new HashMap` let's do 
`Collections.emptyMap` to avoid allocating any unnecessary objects on the 
hot-path



##########
hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java:
##########
@@ -765,27 +802,41 @@ private static Object rewriteRecordWithNewSchema(Object 
oldRecord, Schema oldSch
         }
         Collection array = (Collection)oldRecord;
         List<Object> newArray = new ArrayList();
+        fieldNames.push("element");
         for (Object element : array) {
-          newArray.add(rewriteRecordWithNewSchema(element, 
oldSchema.getElementType(), newSchema.getElementType()));
+          newArray.add(rewriteRecordWithNewSchema(element, 
oldSchema.getElementType(), newSchema.getElementType(), renameCols, 
fieldNames));
         }
+        fieldNames.pop();
         return newArray;
       case MAP:
         if (!(oldRecord instanceof Map)) {
           throw new IllegalArgumentException("cannot rewrite record with 
different type");
         }
         Map<Object, Object> map = (Map<Object, Object>) oldRecord;
         Map<Object, Object> newMap = new HashMap<>();
+        fieldNames.push("value");

Review Comment:
   Please make all these static constants



##########
hudi-common/src/main/java/org/apache/hudi/internal/schema/action/InternalSchemaMerger.java:
##########
@@ -131,12 +150,15 @@ private List<Types.Field> 
buildRecordType(List<Types.Field> oldFields, List<Type
   private Types.Field dealWithRename(int fieldId, Type newType, Types.Field 
oldField) {
     Types.Field fieldFromFileSchema = fileSchema.findField(fieldId);
     String nameFromFileSchema = fieldFromFileSchema.name();
+    String nameFromQuerySchema = querySchema.findField(fieldId).name();
     Type typeFromFileSchema = fieldFromFileSchema.type();
     // Current design mechanism guarantees nestedType change is not allowed, 
so no need to consider.
     if (newType.isNestedType()) {
-      return Types.Field.get(oldField.fieldId(), oldField.isOptional(), 
nameFromFileSchema, newType, oldField.doc());
+      return Types.Field.get(oldField.fieldId(), oldField.isOptional(),
+          useColNameFromFileSchema ? nameFromFileSchema : nameFromQuerySchema, 
newType, oldField.doc());

Review Comment:
   Please inline as a var and reuse



##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java:
##########
@@ -379,7 +380,7 @@ private void processDataBlock(HoodieDataBlock dataBlock, 
Option<KeySpec> keySpec
       Option<Schema> schemaOption = getMergedSchema(dataBlock);
       while (recordIterator.hasNext()) {
         IndexedRecord currentRecord = recordIterator.next();
-        IndexedRecord record = schemaOption.isPresent() ? 
HoodieAvroUtils.rewriteRecordWithNewSchema(currentRecord, schemaOption.get()) : 
currentRecord;
+        IndexedRecord record = schemaOption.isPresent() ? 
HoodieAvroUtils.rewriteRecordWithNewSchema(currentRecord, schemaOption.get(), 
new HashMap<>()) : currentRecord;

Review Comment:
   Ditto here and everywhere



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala:
##########
@@ -445,28 +445,19 @@ class TestSpark3DDL extends TestHoodieSqlBase {
             Seq(null),
             Seq(Map("t1" -> 10.0d))
           )
+          spark.sql(s"alter table ${tableName} rename column members to mem")

Review Comment:
   Let's in addition to these ones add tests for record rewriting utils in 
`HoodieAvroUtils`



##########
hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java:
##########
@@ -719,14 +729,28 @@ public static Object getRecordColumnValues(HoodieRecord<? 
extends HoodieRecordPa
    *
    * @param oldRecord oldRecord to be rewritten
    * @param newSchema newSchema used to rewrite oldRecord
+   * @param renameCols a map store all rename cols, (k, v)-> 
(colNameFromNewSchema, colNameFromOldSchema)
    * @return newRecord for new Schema
    */
-  public static GenericRecord rewriteRecordWithNewSchema(IndexedRecord 
oldRecord, Schema newSchema) {
-    Object newRecord = rewriteRecordWithNewSchema(oldRecord, 
oldRecord.getSchema(), newSchema);
+  public static GenericRecord rewriteRecordWithNewSchema(IndexedRecord 
oldRecord, Schema newSchema, Map<String, String> renameCols) {
+    Object newRecord = rewriteRecordWithNewSchema(oldRecord, 
oldRecord.getSchema(), newSchema, renameCols, new LinkedList<>());

Review Comment:
   Would suggest to use `ArrayDeque` instead (it's more performant than 
`LinkedList` under most loads)



##########
hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java:
##########
@@ -741,10 +765,23 @@ private static Object rewriteRecordWithNewSchema(Object 
oldRecord, Schema oldSch
 
         for (int i = 0; i < fields.size(); i++) {
           Schema.Field field = fields.get(i);
+          String fieldName = field.name();
+          fieldNames.push(fieldName);
           if (oldSchema.getField(field.name()) != null) {
             Schema.Field oldField = oldSchema.getField(field.name());
-            helper.put(i, 
rewriteRecordWithNewSchema(indexedRecord.get(oldField.pos()), 
oldField.schema(), fields.get(i).schema()));
+            helper.put(i, 
rewriteRecordWithNewSchema(indexedRecord.get(oldField.pos()), 
oldField.schema(), fields.get(i).schema(), renameCols, fieldNames));
+          } else {
+            String fieldFullName = createFullName(fieldNames);
+            String[] colNamePartsFromOldSchema = 
renameCols.getOrDefault(fieldFullName, "").split("\\.");
+            String lastColNameFromOldSchema = 
colNamePartsFromOldSchema[colNamePartsFromOldSchema.length - 1];

Review Comment:
   nit: `fieldNameFromOldSchema`



##########
hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java:
##########
@@ -741,10 +765,23 @@ private static Object rewriteRecordWithNewSchema(Object 
oldRecord, Schema oldSch
 
         for (int i = 0; i < fields.size(); i++) {
           Schema.Field field = fields.get(i);
+          String fieldName = field.name();
+          fieldNames.push(fieldName);
           if (oldSchema.getField(field.name()) != null) {
             Schema.Field oldField = oldSchema.getField(field.name());
-            helper.put(i, 
rewriteRecordWithNewSchema(indexedRecord.get(oldField.pos()), 
oldField.schema(), fields.get(i).schema()));
+            helper.put(i, 
rewriteRecordWithNewSchema(indexedRecord.get(oldField.pos()), 
oldField.schema(), fields.get(i).schema(), renameCols, fieldNames));

Review Comment:
   Why do we need `helper`? We can just insert into the target record right 
away, right?



##########
hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java:
##########
@@ -741,10 +765,23 @@ private static Object rewriteRecordWithNewSchema(Object 
oldRecord, Schema oldSch
 
         for (int i = 0; i < fields.size(); i++) {
           Schema.Field field = fields.get(i);
+          String fieldName = field.name();
+          fieldNames.push(fieldName);
           if (oldSchema.getField(field.name()) != null) {
             Schema.Field oldField = oldSchema.getField(field.name());
-            helper.put(i, 
rewriteRecordWithNewSchema(indexedRecord.get(oldField.pos()), 
oldField.schema(), fields.get(i).schema()));
+            helper.put(i, 
rewriteRecordWithNewSchema(indexedRecord.get(oldField.pos()), 
oldField.schema(), fields.get(i).schema(), renameCols, fieldNames));
+          } else {
+            String fieldFullName = createFullName(fieldNames);
+            String[] colNamePartsFromOldSchema = 
renameCols.getOrDefault(fieldFullName, "").split("\\.");
+            String lastColNameFromOldSchema = 
colNamePartsFromOldSchema[colNamePartsFromOldSchema.length - 1];
+            // deal with rename
+            if (oldSchema.getField(field.name()) == null && 
oldSchema.getField(lastColNameFromOldSchema) != null) {
+              // find rename
+              Schema.Field oldField = 
oldSchema.getField(lastColNameFromOldSchema);
+              helper.put(i, 
rewriteRecordWithNewSchema(indexedRecord.get(oldField.pos()), 
oldField.schema(), fields.get(i).schema(), renameCols, fieldNames));
+            }
           }
+          fieldNames.pop();
         }
         GenericData.Record newRecord = new GenericData.Record(newSchema);
         for (int i = 0; i < fields.size(); i++) {

Review Comment:
   Please check my comment above, we can do this while we iterate over the 
fields directly to do it in a single loop



##########
hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java:
##########
@@ -765,27 +802,41 @@ private static Object rewriteRecordWithNewSchema(Object 
oldRecord, Schema oldSch
         }
         Collection array = (Collection)oldRecord;
         List<Object> newArray = new ArrayList();
+        fieldNames.push("element");
         for (Object element : array) {
-          newArray.add(rewriteRecordWithNewSchema(element, 
oldSchema.getElementType(), newSchema.getElementType()));
+          newArray.add(rewriteRecordWithNewSchema(element, 
oldSchema.getElementType(), newSchema.getElementType(), renameCols, 
fieldNames));
         }
+        fieldNames.pop();
         return newArray;
       case MAP:
         if (!(oldRecord instanceof Map)) {
           throw new IllegalArgumentException("cannot rewrite record with 
different type");
         }
         Map<Object, Object> map = (Map<Object, Object>) oldRecord;
         Map<Object, Object> newMap = new HashMap<>();
+        fieldNames.push("value");
         for (Map.Entry<Object, Object> entry : map.entrySet()) {
-          newMap.put(entry.getKey(), 
rewriteRecordWithNewSchema(entry.getValue(), oldSchema.getValueType(), 
newSchema.getValueType()));
+          newMap.put(entry.getKey(), 
rewriteRecordWithNewSchema(entry.getValue(), oldSchema.getValueType(), 
newSchema.getValueType(), renameCols, fieldNames));
         }
+        fieldNames.pop();
         return newMap;
       case UNION:
-        return rewriteRecordWithNewSchema(oldRecord, 
getActualSchemaFromUnion(oldSchema, oldRecord), 
getActualSchemaFromUnion(newSchema, oldRecord));
+        return rewriteRecordWithNewSchema(oldRecord, 
getActualSchemaFromUnion(oldSchema, oldRecord), 
getActualSchemaFromUnion(newSchema, oldRecord), renameCols, fieldNames);
       default:
         return rewritePrimaryType(oldRecord, oldSchema, newSchema);
     }
   }
 
+  private static String createFullName(Deque<String> fieldNames) {
+    String result = "";
+    if (!fieldNames.isEmpty()) {
+      List<String> parentNames = new ArrayList<>();
+      fieldNames.descendingIterator().forEachRemaining(parentNames::add);

Review Comment:
   You don't need additional list, you can just iterate over deque itself



##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java:
##########
@@ -379,7 +380,7 @@ private void processDataBlock(HoodieDataBlock dataBlock, 
Option<KeySpec> keySpec
       Option<Schema> schemaOption = getMergedSchema(dataBlock);
       while (recordIterator.hasNext()) {
         IndexedRecord currentRecord = recordIterator.next();
-        IndexedRecord record = schemaOption.isPresent() ? 
HoodieAvroUtils.rewriteRecordWithNewSchema(currentRecord, schemaOption.get()) : 
currentRecord;
+        IndexedRecord record = schemaOption.isPresent() ? 
HoodieAvroUtils.rewriteRecordWithNewSchema(currentRecord, schemaOption.get(), 
new HashMap<>()) : currentRecord;

Review Comment:
   Same comment regarding `HashMap`



##########
hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java:
##########
@@ -741,10 +765,23 @@ private static Object rewriteRecordWithNewSchema(Object 
oldRecord, Schema oldSch
 
         for (int i = 0; i < fields.size(); i++) {

Review Comment:
   Why not just iterating over `fields` themselves?



##########
hudi-common/src/test/java/org/apache/hudi/internal/schema/utils/TestAvroSchemaEvolutionUtils.java:
##########
@@ -349,7 +349,7 @@ public void testReWriteNestRecord() {
     );
 
     Schema newAvroSchema = AvroInternalSchemaConverter.convert(newRecord, 
schema.getName());
-    GenericRecord newAvroRecord = 
HoodieAvroUtils.rewriteRecordWithNewSchema(avroRecord, newAvroSchema);
+    GenericRecord newAvroRecord = 
HoodieAvroUtils.rewriteRecordWithNewSchema(avroRecord, newAvroSchema, new 
HashMap<>());

Review Comment:
   Here as well



##########
hudi-common/src/test/java/org/apache/hudi/internal/schema/utils/TestAvroSchemaEvolutionUtils.java:
##########
@@ -284,7 +284,7 @@ public void testReWriteRecordWithTypeChanged() {
         .updateColumnType("col6", Types.StringType.get());
     InternalSchema newSchema = 
SchemaChangeUtils.applyTableChanges2Schema(internalSchema, updateChange);
     Schema newAvroSchema = AvroInternalSchemaConverter.convert(newSchema, 
avroSchema.getName());
-    GenericRecord newRecord = 
HoodieAvroUtils.rewriteRecordWithNewSchema(avroRecord, newAvroSchema);
+    GenericRecord newRecord = 
HoodieAvroUtils.rewriteRecordWithNewSchema(avroRecord, newAvroSchema, new 
HashMap<>());

Review Comment:
   Same comment as above



##########
hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/InternalSchemaUtils.java:
##########
@@ -267,4 +267,20 @@ public static String createFullName(String name, 
Deque<String> fieldNames) {
     }
     return result;
   }
+
+  /**
+   * Try to find all renamed cols between oldSchema and newSchema.
+   *
+   * @param oldSchema oldSchema
+   * @param newSchema newSchema which modified from oldSchema
+   * @return renameCols Map. (k, v) -> (colNameFromNewSchema, 
colNameFromOldSchema)
+   */
+  public static Map<String, String> collectRenameCols(InternalSchema 
oldSchema, InternalSchema newSchema) {
+    List<String> colNamesFromWriteSchema = oldSchema.getAllColsFullName();
+    return colNamesFromWriteSchema.stream().filter(f -> {
+      int filedIdFromWriteSchema = oldSchema.findIdByName(f);
+      // try to find the cols which has the same id, but have different 
colName;
+      return newSchema.getAllIds().contains(filedIdFromWriteSchema) && 
!newSchema.findfullName(filedIdFromWriteSchema).equalsIgnoreCase(f);

Review Comment:
   Instead of duplicating the code just do a `map` first, where you map the 
name if it's a rename, otherwise return null, then filter all nulls



##########
hudi-common/src/main/java/org/apache/hudi/internal/schema/action/InternalSchemaMerger.java:
##########
@@ -48,6 +48,25 @@ public class InternalSchemaMerger {
   // we can pass decimalType to reWriteRecordWithNewSchema directly, 
everything is ok.
   private boolean useColumnTypeFromFileSchema = true;
 
+  // deal with rename
+  // Whether to use column name from file schema to read files when we find 
some column name has changed.
+  // spark parquetReader need the original column name to read data, otherwise 
the parquetReader will read nothing.
+  // eg: current column name is colOldName, now we rename it to colNewName,
+  // we should not pass colNewName to parquetReader, we must pass colOldName 
to it; when we read out the data.
+  // for log reader
+  // since our reWriteRecordWithNewSchema function support rewrite directly, 
so we no need this parameter
+  // eg: current column name is colOldName, now we rename it to colNewName,
+  // we can pass colNewName to reWriteRecordWithNewSchema directly, everything 
is ok.
+  private boolean useColNameFromFileSchema = true;
+
+  public InternalSchemaMerger(InternalSchema fileSchema, InternalSchema 
querySchema, boolean ignoreRequiredAttribute, boolean 
useColumnTypeFromFileSchema, boolean useColNameFromFileSchema) {

Review Comment:
   Please chain ctors (ie one invokes another), to avoid duplication



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to