bvaradar commented on a change in pull request #4910:
URL: https://github.com/apache/hudi/pull/4910#discussion_r829107917



##########
File path: 
hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/AvroSchemaUtil.java
##########
@@ -0,0 +1,436 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.internal.schema.utils;
+
+import static org.apache.avro.Schema.Type.UNION;
+
+import org.apache.avro.AvroRuntimeException;
+import org.apache.avro.Conversions;
+import org.apache.avro.JsonProperties;
+import org.apache.avro.LogicalTypes;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaCompatibility;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericFixed;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.internal.schema.Types;
+import org.apache.hudi.internal.schema.action.TableChanges;
+import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.charset.StandardCharsets;
+import java.sql.Date;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.TimeZone;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+public class AvroSchemaUtil {
+  private AvroSchemaUtil() {
+  }
+
+  private static final long MILLIS_PER_DAY = 86400000L;
+
+  //Export for test
+  public static final Conversions.DecimalConversion DECIMAL_CONVERSION = new 
Conversions.DecimalConversion();
+
+  /**
+   * Given a avro record with a given schema, rewrites it into the new schema 
while setting fields only from the new schema.
+   * support deep rewrite for nested record.
+   * This particular method does the following things :
+   * a) Create a new empty GenericRecord with the new schema.
+   * b) For GenericRecord, copy over the data from the old schema to the new 
schema or set default values for all fields of this transformed schema
+   *
+   * @param oldRecord oldRecord to be rewrite
+   * @param newSchema newSchema used to rewrite oldRecord
+   * @return newRecord for new Schema
+   */
+  public static GenericRecord rewriteRecord(IndexedRecord oldRecord, Schema 
newSchema) {
+    Object newRecord = rewriteRecord(oldRecord, oldRecord.getSchema(), 
newSchema);
+    return (GenericData.Record) newRecord;
+  }
+
+  private static Object rewriteRecord(Object oldRecord, Schema oldSchema, 
Schema newSchema) {
+    if (oldRecord == null) {
+      return null;
+    }
+    switch (newSchema.getType()) {
+      case RECORD:
+        if (!(oldRecord instanceof IndexedRecord)) {
+          throw new IllegalArgumentException("cannot rewrite record with 
different type");
+        }
+        IndexedRecord indexedRecord = (IndexedRecord) oldRecord;
+        List<Schema.Field> fields = newSchema.getFields();
+        Map<Integer, Object> helper = new HashMap<>();
+
+        for (int i = 0; i < fields.size(); i++) {
+          Schema.Field field = fields.get(i);
+          if (oldSchema.getField(field.name()) != null) {
+            Schema.Field oldField = oldSchema.getField(field.name());
+            helper.put(i, rewriteRecord(indexedRecord.get(oldField.pos()), 
oldField.schema(), fields.get(i).schema()));
+          }
+        }
+        GenericData.Record newRecord = new GenericData.Record(newSchema);
+        for (int i = 0; i < fields.size(); i++) {
+          if (helper.containsKey(i)) {
+            newRecord.put(i, helper.get(i));
+          } else {
+            if (fields.get(i).defaultVal() instanceof JsonProperties.Null) {
+              newRecord.put(i, null);
+            } else {
+              newRecord.put(i, fields.get(i).defaultVal());
+            }
+          }
+        }
+        return newRecord;
+      case ARRAY:
+        if (!(oldRecord instanceof Collection)) {
+          throw new IllegalArgumentException("cannot rewrite record with 
different type");
+        }
+        Collection array = (Collection)oldRecord;
+        List<Object> newArray = new ArrayList();
+        for (Object element : array) {
+          newArray.add(rewriteRecord(element, oldSchema.getElementType(), 
newSchema.getElementType()));
+        }
+        return newArray;
+      case MAP:
+        if (!(oldRecord instanceof Map)) {
+          throw new IllegalArgumentException("cannot rewrite record with 
different type");
+        }
+        Map<Object, Object> map = (Map<Object, Object>) oldRecord;
+        Map<Object, Object> newMap = new HashMap<>();
+        for (Map.Entry<Object, Object> entry : map.entrySet()) {
+          newMap.put(entry.getKey(), rewriteRecord(entry.getValue(), 
oldSchema.getValueType(), newSchema.getValueType()));
+        }
+        return newMap;
+      case UNION:
+        return rewriteRecord(oldRecord, getActualSchemaFromUnion(oldSchema, 
oldRecord), getActualSchemaFromUnion(newSchema, oldRecord));
+      default:
+        return rewritePrimaryType(oldRecord, oldSchema, newSchema);
+    }
+  }
+
+  private static Object rewritePrimaryType(Object oldValue, Schema oldSchema, 
Schema newSchema) {
+    Schema realOldSchema = oldSchema;
+    if (realOldSchema.getType() == UNION) {
+      realOldSchema = getActualSchemaFromUnion(oldSchema, oldValue);
+    }
+    if (realOldSchema.getType() == newSchema.getType()) {
+      switch (realOldSchema.getType()) {
+        case NULL:
+        case BOOLEAN:
+        case INT:
+        case LONG:
+        case FLOAT:
+        case DOUBLE:
+        case BYTES:
+        case STRING:
+          return oldValue;
+        case FIXED:
+          // fixed size and name must match:
+          if (!SchemaCompatibility.schemaNameEquals(realOldSchema, newSchema) 
|| realOldSchema.getFixedSize() != newSchema.getFixedSize()) {
+            // deal with the precision change for decimalType
+            if (realOldSchema.getLogicalType() instanceof 
LogicalTypes.Decimal) {
+              final byte[] bytes;
+              bytes = ((GenericFixed) oldValue).bytes();
+              LogicalTypes.Decimal decimal = (LogicalTypes.Decimal) 
realOldSchema.getLogicalType();
+              BigDecimal bd = new BigDecimal(new BigInteger(bytes), 
decimal.getScale()).setScale(((LogicalTypes.Decimal) 
newSchema.getLogicalType()).getScale());
+              return DECIMAL_CONVERSION.toFixed(bd, newSchema, 
newSchema.getLogicalType());
+            }
+          } else {
+            return oldValue;
+          }
+          return oldValue;
+        default:
+          throw new AvroRuntimeException("Unknown schema type: " + 
newSchema.getType());
+      }
+    } else {
+      return rewritePrimaryTypeWithDiffSchemaType(oldValue, realOldSchema, 
newSchema);
+    }
+  }
+
+  private static Object rewritePrimaryTypeWithDiffSchemaType(Object oldValue, 
Schema oldSchema, Schema newSchema) {
+    switch (newSchema.getType()) {
+      case NULL:
+      case BOOLEAN:
+        break;
+      case INT:
+        if (newSchema.getLogicalType() == LogicalTypes.date() && 
oldSchema.getType() == Schema.Type.STRING) {
+          return fromJavaDate(Date.valueOf(oldValue.toString()));
+        }
+        break;
+      case LONG:
+        if (oldSchema.getType() == Schema.Type.INT) {
+          return ((Integer) oldValue).longValue();
+        }
+        break;
+      case FLOAT:
+        if ((oldSchema.getType() == Schema.Type.INT)
+            || (oldSchema.getType() == Schema.Type.LONG)) {
+          return oldSchema.getType() == Schema.Type.INT ? ((Integer) 
oldValue).floatValue() : ((Long) oldValue).floatValue();
+        }
+        break;
+      case DOUBLE:
+        if (oldSchema.getType() == Schema.Type.FLOAT) {
+          // java float cannot convert to double directly, deal with float 
precision change
+          return Double.valueOf(oldValue + "");
+        } else if (oldSchema.getType() == Schema.Type.INT) {
+          return ((Integer) oldValue).doubleValue();
+        } else if (oldSchema.getType() == Schema.Type.LONG) {
+          return ((Long) oldValue).doubleValue();
+        }
+        break;
+      case BYTES:
+        if (oldSchema.getType() == Schema.Type.STRING) {
+          return ((String) oldValue).getBytes(StandardCharsets.UTF_8);
+        }
+        break;
+      case STRING:
+        if (oldSchema.getType() == Schema.Type.BYTES) {
+          return String.valueOf(((byte[]) oldValue));
+        }
+        if (oldSchema.getLogicalType() == LogicalTypes.date()) {
+          return toJavaDate((Integer) oldValue).toString();
+        }
+        if (oldSchema.getType() == Schema.Type.INT
+            || oldSchema.getType() == Schema.Type.LONG
+            || oldSchema.getType() == Schema.Type.FLOAT
+            || oldSchema.getType() == Schema.Type.DOUBLE) {
+          return oldValue.toString();
+        }
+        if (oldSchema.getType() == Schema.Type.FIXED && 
oldSchema.getLogicalType() instanceof LogicalTypes.Decimal) {
+          final byte[] bytes;
+          bytes = ((GenericFixed) oldValue).bytes();
+          LogicalTypes.Decimal decimal = (LogicalTypes.Decimal) 
oldSchema.getLogicalType();
+          BigDecimal bd = new BigDecimal(new BigInteger(bytes), 
decimal.getScale());
+          return bd.toString();
+        }
+        break;
+      case FIXED:
+        // deal with decimal Type
+        if (newSchema.getLogicalType() instanceof LogicalTypes.Decimal) {
+          // TODO: support more types
+          if (oldSchema.getType() == Schema.Type.STRING
+              || oldSchema.getType() == Schema.Type.DOUBLE
+              || oldSchema.getType() == Schema.Type.INT
+              || oldSchema.getType() == Schema.Type.LONG
+              || oldSchema.getType() == Schema.Type.FLOAT) {
+            LogicalTypes.Decimal decimal = (LogicalTypes.Decimal) 
newSchema.getLogicalType();
+            BigDecimal bigDecimal = null;
+            if (oldSchema.getType() == Schema.Type.STRING) {
+              bigDecimal = new java.math.BigDecimal((String) oldValue)
+                  .setScale(decimal.getScale());
+            } else {
+              // Due to Java, there will be precision problems in direct 
conversion, we should use string instead of use double
+              bigDecimal = new java.math.BigDecimal(oldValue.toString())
+                  .setScale(decimal.getScale());
+            }
+            return DECIMAL_CONVERSION.toFixed(bigDecimal, newSchema, 
newSchema.getLogicalType());
+          }
+        }
+        break;
+      default:
+    }
+    throw new AvroRuntimeException(String.format("cannot support rewrite value 
for schema type: %s since the old schema type is: %s", newSchema, oldSchema));
+  }
+
+  // convert days to Date
+  private static Date toJavaDate(int days) {
+    long localMillis = Math.multiplyExact(days, MILLIS_PER_DAY);
+    int timeZoneOffset;
+    TimeZone defaultTimeZone = TimeZone.getDefault();
+    if (defaultTimeZone instanceof sun.util.calendar.ZoneInfo) {
+      timeZoneOffset = ((sun.util.calendar.ZoneInfo) 
defaultTimeZone).getOffsetsByWall(localMillis, null);
+    } else {
+      timeZoneOffset = defaultTimeZone.getOffset(localMillis - 
defaultTimeZone.getRawOffset());
+    }
+    return new Date(localMillis - timeZoneOffset);
+  }
+
+  // convert Date to days
+  private static int fromJavaDate(Date date) {
+    long millisUtc = date.getTime();
+    long millisLocal = millisUtc + TimeZone.getDefault().getOffset(millisUtc);
+    int julianDays = Math.toIntExact(Math.floorDiv(millisLocal, 
MILLIS_PER_DAY));
+    return julianDays;
+  }
+
+  private static Schema getActualSchemaFromUnion(Schema schema, Object data) {
+    Schema actualSchema;
+    if (!schema.getType().equals(UNION)) {
+      return schema;
+    }
+    if (schema.getTypes().size() == 2
+        && schema.getTypes().get(0).getType() == Schema.Type.NULL) {
+      actualSchema = schema.getTypes().get(1);
+    } else if (schema.getTypes().size() == 2
+        && schema.getTypes().get(1).getType() == Schema.Type.NULL) {
+      actualSchema = schema.getTypes().get(0);
+    } else if (schema.getTypes().size() == 1) {
+      actualSchema = schema.getTypes().get(0);
+    } else {
+      // deal complex union. this should not happened in hoodie,
+      // since flink/spark do not write this type.
+      int i = GenericData.get().resolveUnion(schema, data);
+      actualSchema = schema.getTypes().get(i);
+    }
+    return actualSchema;
+  }
+
+  /**
+   * Given avro records, rewrites them with new schema.
+   *
+   * @param oldRecords oldRecords to be rewrite
+   * @param newSchema newSchema used to rewrite oldRecord
+   * @return a iterator of rewrote GeneriRcords
+   */
+  public static Iterator<GenericRecord> rewriteRecords(Iterator<GenericRecord> 
oldRecords, Schema newSchema) {
+    if (oldRecords == null || newSchema == null) {
+      return Collections.emptyIterator();
+    }
+    return new Iterator<GenericRecord>() {
+      @Override
+      public boolean hasNext() {
+        return oldRecords.hasNext();
+      }
+
+      @Override
+      public GenericRecord next() {
+        return rewriteRecord(oldRecords.next(), newSchema);
+      }
+    };
+  }
+
+  /**
+   * support evolution from a new avroSchema.
+   * notice: this is not a universal method,

Review comment:
       Can you add more descriptive comments about this notice, to capture when 
and why this method should be used?

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/AvroSchemaUtil.java
##########
@@ -0,0 +1,436 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.internal.schema.utils;
+
+import static org.apache.avro.Schema.Type.UNION;
+
+import org.apache.avro.AvroRuntimeException;
+import org.apache.avro.Conversions;
+import org.apache.avro.JsonProperties;
+import org.apache.avro.LogicalTypes;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaCompatibility;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericFixed;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.internal.schema.Types;
+import org.apache.hudi.internal.schema.action.TableChanges;
+import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.charset.StandardCharsets;
+import java.sql.Date;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.TimeZone;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+public class AvroSchemaUtil {
+  private AvroSchemaUtil() {
+  }
+
+  private static final long MILLIS_PER_DAY = 86400000L;
+
+  //Export for test
+  public static final Conversions.DecimalConversion DECIMAL_CONVERSION = new 
Conversions.DecimalConversion();
+
+  /**
+   * Given a avro record with a given schema, rewrites it into the new schema 
while setting fields only from the new schema.
+   * support deep rewrite for nested record.
+   * This particular method does the following things :
+   * a) Create a new empty GenericRecord with the new schema.
+   * b) For GenericRecord, copy over the data from the old schema to the new 
schema or set default values for all fields of this transformed schema
+   *
+   * @param oldRecord oldRecord to be rewrite
+   * @param newSchema newSchema used to rewrite oldRecord
+   * @return newRecord for new Schema
+   */
+  public static GenericRecord rewriteRecord(IndexedRecord oldRecord, Schema 
newSchema) {
+    Object newRecord = rewriteRecord(oldRecord, oldRecord.getSchema(), 
newSchema);
+    return (GenericData.Record) newRecord;
+  }
+
+  private static Object rewriteRecord(Object oldRecord, Schema oldSchema, 
Schema newSchema) {
+    if (oldRecord == null) {
+      return null;
+    }
+    switch (newSchema.getType()) {
+      case RECORD:
+        if (!(oldRecord instanceof IndexedRecord)) {
+          throw new IllegalArgumentException("cannot rewrite record with 
different type");
+        }
+        IndexedRecord indexedRecord = (IndexedRecord) oldRecord;
+        List<Schema.Field> fields = newSchema.getFields();
+        Map<Integer, Object> helper = new HashMap<>();
+
+        for (int i = 0; i < fields.size(); i++) {
+          Schema.Field field = fields.get(i);
+          if (oldSchema.getField(field.name()) != null) {
+            Schema.Field oldField = oldSchema.getField(field.name());
+            helper.put(i, rewriteRecord(indexedRecord.get(oldField.pos()), 
oldField.schema(), fields.get(i).schema()));
+          }
+        }
+        GenericData.Record newRecord = new GenericData.Record(newSchema);
+        for (int i = 0; i < fields.size(); i++) {
+          if (helper.containsKey(i)) {
+            newRecord.put(i, helper.get(i));
+          } else {
+            if (fields.get(i).defaultVal() instanceof JsonProperties.Null) {
+              newRecord.put(i, null);
+            } else {
+              newRecord.put(i, fields.get(i).defaultVal());
+            }
+          }
+        }
+        return newRecord;
+      case ARRAY:
+        if (!(oldRecord instanceof Collection)) {
+          throw new IllegalArgumentException("cannot rewrite record with 
different type");
+        }
+        Collection array = (Collection)oldRecord;
+        List<Object> newArray = new ArrayList();
+        for (Object element : array) {
+          newArray.add(rewriteRecord(element, oldSchema.getElementType(), 
newSchema.getElementType()));
+        }
+        return newArray;
+      case MAP:
+        if (!(oldRecord instanceof Map)) {
+          throw new IllegalArgumentException("cannot rewrite record with 
different type");
+        }
+        Map<Object, Object> map = (Map<Object, Object>) oldRecord;
+        Map<Object, Object> newMap = new HashMap<>();
+        for (Map.Entry<Object, Object> entry : map.entrySet()) {
+          newMap.put(entry.getKey(), rewriteRecord(entry.getValue(), 
oldSchema.getValueType(), newSchema.getValueType()));
+        }
+        return newMap;
+      case UNION:
+        return rewriteRecord(oldRecord, getActualSchemaFromUnion(oldSchema, 
oldRecord), getActualSchemaFromUnion(newSchema, oldRecord));
+      default:
+        return rewritePrimaryType(oldRecord, oldSchema, newSchema);
+    }
+  }
+
+  private static Object rewritePrimaryType(Object oldValue, Schema oldSchema, 
Schema newSchema) {
+    Schema realOldSchema = oldSchema;
+    if (realOldSchema.getType() == UNION) {
+      realOldSchema = getActualSchemaFromUnion(oldSchema, oldValue);
+    }
+    if (realOldSchema.getType() == newSchema.getType()) {
+      switch (realOldSchema.getType()) {
+        case NULL:
+        case BOOLEAN:
+        case INT:
+        case LONG:
+        case FLOAT:
+        case DOUBLE:
+        case BYTES:
+        case STRING:
+          return oldValue;
+        case FIXED:
+          // fixed size and name must match:
+          if (!SchemaCompatibility.schemaNameEquals(realOldSchema, newSchema) 
|| realOldSchema.getFixedSize() != newSchema.getFixedSize()) {
+            // deal with the precision change for decimalType
+            if (realOldSchema.getLogicalType() instanceof 
LogicalTypes.Decimal) {
+              final byte[] bytes;
+              bytes = ((GenericFixed) oldValue).bytes();
+              LogicalTypes.Decimal decimal = (LogicalTypes.Decimal) 
realOldSchema.getLogicalType();
+              BigDecimal bd = new BigDecimal(new BigInteger(bytes), 
decimal.getScale()).setScale(((LogicalTypes.Decimal) 
newSchema.getLogicalType()).getScale());
+              return DECIMAL_CONVERSION.toFixed(bd, newSchema, 
newSchema.getLogicalType());
+            }
+          } else {
+            return oldValue;
+          }
+          return oldValue;
+        default:
+          throw new AvroRuntimeException("Unknown schema type: " + 
newSchema.getType());
+      }
+    } else {
+      return rewritePrimaryTypeWithDiffSchemaType(oldValue, realOldSchema, 
newSchema);
+    }
+  }
+
+  private static Object rewritePrimaryTypeWithDiffSchemaType(Object oldValue, 
Schema oldSchema, Schema newSchema) {
+    switch (newSchema.getType()) {
+      case NULL:
+      case BOOLEAN:
+        break;
+      case INT:
+        if (newSchema.getLogicalType() == LogicalTypes.date() && 
oldSchema.getType() == Schema.Type.STRING) {
+          return fromJavaDate(Date.valueOf(oldValue.toString()));
+        }
+        break;
+      case LONG:
+        if (oldSchema.getType() == Schema.Type.INT) {
+          return ((Integer) oldValue).longValue();
+        }
+        break;
+      case FLOAT:
+        if ((oldSchema.getType() == Schema.Type.INT)
+            || (oldSchema.getType() == Schema.Type.LONG)) {
+          return oldSchema.getType() == Schema.Type.INT ? ((Integer) 
oldValue).floatValue() : ((Long) oldValue).floatValue();
+        }
+        break;
+      case DOUBLE:
+        if (oldSchema.getType() == Schema.Type.FLOAT) {
+          // java float cannot convert to double directly, deal with float 
precision change
+          return Double.valueOf(oldValue + "");
+        } else if (oldSchema.getType() == Schema.Type.INT) {
+          return ((Integer) oldValue).doubleValue();
+        } else if (oldSchema.getType() == Schema.Type.LONG) {
+          return ((Long) oldValue).doubleValue();
+        }
+        break;
+      case BYTES:
+        if (oldSchema.getType() == Schema.Type.STRING) {
+          return ((String) oldValue).getBytes(StandardCharsets.UTF_8);
+        }
+        break;
+      case STRING:
+        if (oldSchema.getType() == Schema.Type.BYTES) {
+          return String.valueOf(((byte[]) oldValue));
+        }
+        if (oldSchema.getLogicalType() == LogicalTypes.date()) {
+          return toJavaDate((Integer) oldValue).toString();
+        }
+        if (oldSchema.getType() == Schema.Type.INT
+            || oldSchema.getType() == Schema.Type.LONG
+            || oldSchema.getType() == Schema.Type.FLOAT
+            || oldSchema.getType() == Schema.Type.DOUBLE) {
+          return oldValue.toString();
+        }
+        if (oldSchema.getType() == Schema.Type.FIXED && 
oldSchema.getLogicalType() instanceof LogicalTypes.Decimal) {
+          final byte[] bytes;
+          bytes = ((GenericFixed) oldValue).bytes();
+          LogicalTypes.Decimal decimal = (LogicalTypes.Decimal) 
oldSchema.getLogicalType();
+          BigDecimal bd = new BigDecimal(new BigInteger(bytes), 
decimal.getScale());
+          return bd.toString();
+        }
+        break;
+      case FIXED:
+        // deal with decimal Type
+        if (newSchema.getLogicalType() instanceof LogicalTypes.Decimal) {
+          // TODO: support more types
+          if (oldSchema.getType() == Schema.Type.STRING
+              || oldSchema.getType() == Schema.Type.DOUBLE
+              || oldSchema.getType() == Schema.Type.INT
+              || oldSchema.getType() == Schema.Type.LONG
+              || oldSchema.getType() == Schema.Type.FLOAT) {
+            LogicalTypes.Decimal decimal = (LogicalTypes.Decimal) 
newSchema.getLogicalType();
+            BigDecimal bigDecimal = null;
+            if (oldSchema.getType() == Schema.Type.STRING) {
+              bigDecimal = new java.math.BigDecimal((String) oldValue)
+                  .setScale(decimal.getScale());
+            } else {
+              // Due to Java, there will be precision problems in direct 
conversion, we should use string instead of use double
+              bigDecimal = new java.math.BigDecimal(oldValue.toString())
+                  .setScale(decimal.getScale());
+            }
+            return DECIMAL_CONVERSION.toFixed(bigDecimal, newSchema, 
newSchema.getLogicalType());
+          }
+        }
+        break;
+      default:
+    }
+    throw new AvroRuntimeException(String.format("cannot support rewrite value 
for schema type: %s since the old schema type is: %s", newSchema, oldSchema));
+  }
+
+  // convert days to Date
+  private static Date toJavaDate(int days) {
+    long localMillis = Math.multiplyExact(days, MILLIS_PER_DAY);
+    int timeZoneOffset;
+    TimeZone defaultTimeZone = TimeZone.getDefault();
+    if (defaultTimeZone instanceof sun.util.calendar.ZoneInfo) {
+      timeZoneOffset = ((sun.util.calendar.ZoneInfo) 
defaultTimeZone).getOffsetsByWall(localMillis, null);
+    } else {
+      timeZoneOffset = defaultTimeZone.getOffset(localMillis - 
defaultTimeZone.getRawOffset());
+    }
+    return new Date(localMillis - timeZoneOffset);
+  }
+
+  // convert Date to days
+  private static int fromJavaDate(Date date) {
+    long millisUtc = date.getTime();
+    long millisLocal = millisUtc + TimeZone.getDefault().getOffset(millisUtc);
+    int julianDays = Math.toIntExact(Math.floorDiv(millisLocal, 
MILLIS_PER_DAY));
+    return julianDays;
+  }
+
+  private static Schema getActualSchemaFromUnion(Schema schema, Object data) {
+    Schema actualSchema;
+    if (!schema.getType().equals(UNION)) {
+      return schema;
+    }
+    if (schema.getTypes().size() == 2
+        && schema.getTypes().get(0).getType() == Schema.Type.NULL) {
+      actualSchema = schema.getTypes().get(1);
+    } else if (schema.getTypes().size() == 2
+        && schema.getTypes().get(1).getType() == Schema.Type.NULL) {
+      actualSchema = schema.getTypes().get(0);
+    } else if (schema.getTypes().size() == 1) {
+      actualSchema = schema.getTypes().get(0);
+    } else {
+      // deal complex union. this should not happened in hoodie,
+      // since flink/spark do not write this type.
+      int i = GenericData.get().resolveUnion(schema, data);
+      actualSchema = schema.getTypes().get(i);
+    }
+    return actualSchema;
+  }
+
+  /**
+   * Given avro records, rewrites them with new schema.
+   *
+   * @param oldRecords oldRecords to be rewrite
+   * @param newSchema newSchema used to rewrite oldRecord
+   * @return a iterator of rewrote GeneriRcords
+   */
+  public static Iterator<GenericRecord> rewriteRecords(Iterator<GenericRecord> 
oldRecords, Schema newSchema) {
+    if (oldRecords == null || newSchema == null) {
+      return Collections.emptyIterator();
+    }
+    return new Iterator<GenericRecord>() {
+      @Override
+      public boolean hasNext() {
+        return oldRecords.hasNext();
+      }
+
+      @Override
+      public GenericRecord next() {
+        return rewriteRecord(oldRecords.next(), newSchema);
+      }
+    };
+  }
+
+  /**
+   * support evolution from a new avroSchema.
+   * notice: this is not a universal method,
+   * now hoodie support implicitly add columns when hoodie write operation,
+   * This ability needs to be preserved, so implicitly evolution for 
internalSchema should supported.
+   *
+   * @param evolvedSchema implicitly evolution of avro when hoodie write 
operation
+   * @param oldSchema old internalSchema
+   * @param supportPositionReorder support position reorder
+   * @return evolution Schema
+   */
+  public static InternalSchema evolutionSchemaFromNewAvroSchema(Schema 
evolvedSchema, InternalSchema oldSchema, Boolean supportPositionReorder) {
+    InternalSchema evolvedInternalSchema = 
AvroInternalSchemaConverter.convert(evolvedSchema);
+    // do check, only support add column evolution
+    List<String> colNamesFromEvolved = 
evolvedInternalSchema.getAllColsFullName();
+    List<String> colNamesFromOldSchema = oldSchema.getAllColsFullName();
+    List<String> diffFromOldSchema = colNamesFromOldSchema.stream().filter(f 
-> !colNamesFromEvolved.contains(f)).collect(Collectors.toList());
+    List<Types.Field> newFields = new ArrayList<>();
+    if (colNamesFromEvolved.size() == colNamesFromOldSchema.size() && 
diffFromOldSchema.size() == 0) {
+      // no changes happen
+      if (supportPositionReorder) {
+        evolvedInternalSchema.getRecord().fields().forEach(f -> 
newFields.add(oldSchema.getRecord().field(f.name())));
+        return new InternalSchema(newFields);
+      }
+      return oldSchema;
+    }
+    // try to find all added columns
+    if (diffFromOldSchema.size() != 0) {
+      throw new UnsupportedOperationException("cannot evolution schema 
implicitly, find delete/rename operation");
+    }
+
+    List<String> diffFromEvolutionSchema = 
colNamesFromEvolved.stream().filter(f -> 
!colNamesFromOldSchema.contains(f)).collect(Collectors.toList());
+    // Remove redundancy from diffFromEvolutionSchema.
+    // for example, now we add a struct col in evolvedSchema, the struct col 
is " user struct<name:string, age:int> "
+    // when we do diff operation: user, user.name, user.age will appeared in 
the resultSet which is redundancy, user.name and user.age should be excluded.
+    // deal with add operation
+    TreeMap<Integer, String> finalAddAction = new TreeMap<>();
+    for (int i = 0; i < diffFromEvolutionSchema.size(); i++)  {
+      String name = diffFromEvolutionSchema.get(i);
+      int splitPoint = name.lastIndexOf(".");
+      String parentName = splitPoint > 0 ? name.substring(0, splitPoint) : "";
+      if (!parentName.isEmpty() && 
diffFromEvolutionSchema.contains(parentName)) {
+        // find redundancy, skip it
+        continue;
+      }
+      finalAddAction.put(evolvedInternalSchema.findIdByName(name), name);
+    }
+
+    TableChanges.ColumnAddChange addChange = 
TableChanges.ColumnAddChange.get(oldSchema);
+    finalAddAction.entrySet().stream().forEach(f -> {
+      String name = f.getValue();
+      int splitPoint = name.lastIndexOf(".");
+      String parentName = splitPoint > 0 ? name.substring(0, splitPoint) : "";
+      String rawName = splitPoint > 0 ? name.substring(splitPoint + 1) : name;
+      addChange.addColumns(parentName, rawName, 
evolvedInternalSchema.findType(name), null);
+    });
+
+    InternalSchema res = SchemaChangeUtils.applyTableChanges2Schema(oldSchema, 
addChange);
+    if (supportPositionReorder) {
+      evolvedInternalSchema.getRecord().fields().forEach(f -> 
newFields.add(oldSchema.getRecord().field(f.name())));
+      return new InternalSchema(newFields);
+    } else {
+      return res;
+    }
+  }
+
+  public static InternalSchema evolutionSchemaFromNewAvroSchema(Schema 
evolvedSchema, InternalSchema oldSchema) {
+    return evolutionSchemaFromNewAvroSchema(evolvedSchema, oldSchema, false);
+  }
+
+  /**
+   * canonical the nullability.
+   * do not allow change cols Nullability field from optional to required.
+   * if above problem occurs, try to correct it.
+   *
+   * @param writeSchema writeSchema hoodie used to write data.
+   * @param readSchema read schema
+   * @return canonical Schema
+   */
+  public static Schema canonicalColumnNullability(Schema writeSchema, Schema 
readSchema) {

Review comment:
       nit: rename `canonical` to `canonicalize` (i.e. `canonicalizeColumnNullability`)

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/SerDeHelper.java
##########
@@ -0,0 +1,384 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.internal.schema.utils;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.internal.schema.Type;
+import org.apache.hudi.internal.schema.Types;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Locale;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class SerDeHelper {
+  private SerDeHelper() {
+
+  }
+
+  public static final String LATESTSCHEMA = "latestSchema";
+  public static final String SCHEMAS = "schemas";
+  private static final String MAX_COLUMN_ID = "max_column_id";
+  private static final String VERSION_ID = "version-id";
+  private static final String TYPE = "type";
+  private static final String RECORD = "record";
+  private static final String ARRAY = "array";
+  private static final String MAP = "map";
+  private static final String FIELDS = "fields";
+  private static final String ELEMENT = "element";
+  private static final String KEY = "key";
+  private static final String VALUE = "value";
+  private static final String DOC = "doc";
+  private static final String NAME = "name";
+  private static final String ID = "id";
+  private static final String ELEMENT_ID = "element-id";
+  private static final String KEY_ID = "key-id";
+  private static final String VALUE_ID = "value-id";
+  private static final String OPTIONAL = "optional";
+  private static final String ELEMENT_OPTIONAL = "element_optional";
+  private static final String VALUE_OPTIONAL = "value_optional";
+
+  private static final Pattern FIXED = Pattern.compile("fixed\\[(\\d+)\\]");
+  private static final Pattern DECIMAL = 
Pattern.compile("decimal\\((\\d+),\\s+(\\d+)\\)");
+
+  /**
+   * convert history internalSchemas to json.
+   * this is used when save history schemas into hudi.
+   *
+   * @param internalSchemas history internal schemas
+   * @return a string
+   */
+  public static String toJson(List<InternalSchema> internalSchemas) {
+    try {
+      StringWriter writer = new StringWriter();
+      JsonGenerator generator = (new JsonFactory()).createGenerator(writer);
+      generator.writeStartObject();
+      generator.writeArrayFieldStart(SCHEMAS);
+      for (InternalSchema schema : internalSchemas) {
+        toJson(schema, generator);
+      }
+      generator.writeEndArray();
+      generator.writeEndObject();
+      generator.flush();
+      return writer.toString();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * convert internalSchemas to json.
+   *
+   * @param internalSchema a internal schema
+   * @return a string
+   */
+  public static String toJson(InternalSchema internalSchema) {
+    if (internalSchema == null || internalSchema.isDummySchema()) {
+      return "";
+    }
+    try {
+      StringWriter writer = new StringWriter();
+      JsonGenerator generator = (new JsonFactory()).createGenerator(writer);
+      toJson(internalSchema, generator);
+      generator.flush();
+      return writer.toString();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private static void toJson(InternalSchema internalSchema, JsonGenerator 
generator) throws IOException {
+    toJson(internalSchema.getRecord(), internalSchema.getMax_column_id(), 
internalSchema.schemaId(), generator);
+  }
+
+  private static void toJson(Types.RecordType record, Integer maxColumnId, 
Long versionId, JsonGenerator generator) throws IOException {
+    generator.writeStartObject();
+    if (maxColumnId != null) {
+      generator.writeNumberField(MAX_COLUMN_ID, maxColumnId);
+    }
+    if (versionId != null) {
+      generator.writeNumberField(VERSION_ID, versionId);
+    }
+    generator.writeStringField(TYPE, RECORD);
+    generator.writeArrayFieldStart(FIELDS);
+    for (Types.Field field : record.fields()) {
+      generator.writeStartObject();
+      generator.writeNumberField(ID, field.fieldId());
+      generator.writeStringField(NAME, field.name());
+      generator.writeBooleanField(OPTIONAL, field.isOptional());
+      generator.writeFieldName(TYPE);
+      toJson(field.type(), generator);
+      if (field.doc() != null) {
+        generator.writeStringField(DOC, field.doc());
+      }
+      generator.writeEndObject();
+    }
+    generator.writeEndArray();
+    generator.writeEndObject();
+  }
+
+  private static void toJson(Type type, JsonGenerator generator) throws 
IOException {
+    switch (type.typeId()) {
+      case RECORD:
+        toJson((Types.RecordType) type, null, null, generator);
+        break;
+      case ARRAY:
+        Types.ArrayType array = (Types.ArrayType) type;
+        generator.writeStartObject();
+        generator.writeStringField(TYPE, ARRAY);
+        generator.writeNumberField(ELEMENT_ID, array.elementId());
+        generator.writeFieldName(ELEMENT);
+        toJson(array.elementType(), generator);
+        generator.writeBooleanField(ELEMENT_OPTIONAL, 
array.isElementOptional());
+        generator.writeEndObject();
+        break;
+      case MAP:
+        Types.MapType map = (Types.MapType) type;
+        generator.writeStartObject();
+        generator.writeStringField(TYPE, MAP);
+        generator.writeNumberField(KEY_ID, map.keyId());
+        generator.writeFieldName(KEY);
+        toJson(map.keyType(), generator);
+        generator.writeNumberField(VALUE_ID, map.valueId());
+        generator.writeFieldName(VALUE);
+        toJson(map.valueType(), generator);
+        generator.writeBooleanField(VALUE_OPTIONAL, map.isValueOptional());
+        generator.writeEndObject();
+        break;
+      default:
+        if (!type.isNestedType()) {
+          generator.writeString(type.toString());
+        } else {
+          throw new IllegalArgumentIOException(String.format("cannot write 
unknown types: %s", type));
+        }
+    }
+  }
+
+  private static Type parserTypeFromJson(JsonNode jsonNode) {
+    if (jsonNode.isTextual()) {
+      String type = jsonNode.asText().toLowerCase(Locale.ROOT);
+      // deal with fixed and decimal
+      Matcher fixed = FIXED.matcher(type);
+      if (fixed.matches()) {
+        return Types.FixedType.getFixed(Integer.parseInt(fixed.group(1)));
+      }
+      Matcher decimal = DECIMAL.matcher(type);
+      if (decimal.matches()) {
+        return Types.DecimalType.get(
+            Integer.parseInt(decimal.group(1)),
+            Integer.parseInt(decimal.group(2)));
+      }
+      // deal with other type
+      switch (Type.fromValue(type)) {
+        case BOOLEAN:
+          return Types.BooleanType.get();
+        case INT:
+          return Types.IntType.get();
+        case LONG:
+          return Types.LongType.get();
+        case FLOAT:
+          return Types.FloatType.get();
+        case DOUBLE:
+          return Types.DoubleType.get();
+        case DATE:
+          return Types.DateType.get();
+        case TIME:
+          return Types.TimeType.get();
+        case TIMESTAMP:
+          return Types.TimestampType.get();
+        case STRING:
+          return Types.StringType.get();
+        case UUID:
+          return Types.UUIDType.get();
+        case BINARY:
+          return Types.BinaryType.get();
+        default:
+          throw new IllegalArgumentException("cannot parser types from 
jsonNode");
+      }
+    } else if (jsonNode.isObject()) {
+      String typeStr = jsonNode.get(TYPE).asText();
+      if (RECORD.equals(typeStr)) {
+        JsonNode fieldNodes = jsonNode.get(FIELDS);
+        Iterator<JsonNode> iter = fieldNodes.elements();
+        List<Types.Field> fields = new ArrayList<>();
+        while (iter.hasNext()) {
+          JsonNode field = iter.next();
+          // extract
+          int id = field.get(ID).asInt();
+          String name = field.get(NAME).asText();
+          Type type = parserTypeFromJson(field.get(TYPE));
+          String doc = field.has(DOC) ? field.get(DOC).asText() : null;
+          boolean optional = field.get(OPTIONAL).asBoolean();
+          // build fields
+          fields.add(Types.Field.get(id, optional, name, type, doc));
+        }
+        return Types.RecordType.get(fields);
+      } else if (ARRAY.equals(typeStr)) {
+        int elementId = jsonNode.get(ELEMENT_ID).asInt();
+        Type elementType = parserTypeFromJson(jsonNode.get(ELEMENT));
+        boolean optional = jsonNode.get(ELEMENT_OPTIONAL).asBoolean();
+        return Types.ArrayType.get(elementId, optional, elementType);
+      } else if (MAP.equals(typeStr)) {
+        int keyId = jsonNode.get(KEY_ID).asInt();
+        Type keyType = parserTypeFromJson(jsonNode.get(KEY));
+        int valueId = jsonNode.get(VALUE_ID).asInt();
+        Type valueType = parserTypeFromJson(jsonNode.get(VALUE));
+        boolean optional = jsonNode.get(VALUE_OPTIONAL).asBoolean();
+        return Types.MapType.get(keyId, valueId, keyType, valueType, optional);
+      }
+    }
+    throw new IllegalArgumentException(String.format("cannot parse type from 
jsonNode: %s", jsonNode));
+  }
+
+  /**
+   * convert jsonNode to internalSchema.
+   *
+   * @param jsonNode a jsonNode.
+   * @return a internalSchema.
+   */
+  public static InternalSchema fromJson(JsonNode jsonNode) {
+    Integer maxColumnId = !jsonNode.has(MAX_COLUMN_ID) ? null : 
jsonNode.get(MAX_COLUMN_ID).asInt();
+    Long versionId = !jsonNode.has(VERSION_ID) ? null : 
jsonNode.get(VERSION_ID).asLong();
+    Types.RecordType type = (Types.RecordType)parserTypeFromJson(jsonNode);
+    if (versionId == null) {
+      return new InternalSchema(type.fields());
+    } else {
+      if (maxColumnId != null) {
+        return new InternalSchema(versionId, maxColumnId, type.fields());
+      } else {
+        return new InternalSchema(versionId, type.fields());
+      }
+    }
+  }
+
+  /**
+   * convert string to internalSchema.
+   *
+   * @param json a json string.
+   * @return a internalSchema.
+   */
+  public static Option<InternalSchema> fromJson(String json) {
+    if (json == null || json.isEmpty()) {
+      return Option.empty();
+    }
+    try {
+      return Option.of(fromJson((new ObjectMapper(new 
JsonFactory())).readValue(json, JsonNode.class)));
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * convert json string to history internalSchemas.
+   * TreeMap is used to hold history internalSchemas.
+   *
+   * @param json a json string
+   * @return a TreeMap
+   */
+  public static TreeMap<Long, InternalSchema> parseSchemas(String json) {
+    TreeMap<Long, InternalSchema> result = new TreeMap<>();
+    try {
+      JsonNode jsonNode = (new ObjectMapper(new 
JsonFactory())).readValue(json, JsonNode.class);
+      if (!jsonNode.has(SCHEMAS)) {
+        throw new IllegalArgumentException(String.format("cannot parser 
schemas from current json string, missing key name: %s", SCHEMAS));
+      }
+      JsonNode schemas = jsonNode.get(SCHEMAS);
+      Iterator<JsonNode> iter = schemas.elements();
+      while (iter.hasNext()) {
+        JsonNode schema = iter.next();
+        InternalSchema current = fromJson(schema);
+        result.put(current.schemaId(), current);
+      }
+    } catch (IOException e) {
+      throw new HoodieException(e);
+    }
+    return result;
+  }
+
+  /**
+   * search target internalSchema by version number.
+   *
+   * @param versionId the internalSchema version to be search.
+   * @param internalSchemas internalSchemas to be searched.
+   * @return a internalSchema.
+   */
+  public static InternalSchema searchSchema(long versionId, 
List<InternalSchema> internalSchemas) {

Review comment:
       The methods `searchSchema` and `inheritSchema` do not belong to this 
class. This class should only deal with conversion to/from JSON. 
   
   There are many classes named "xxxxUtils" or "xxxxHelper" that do a bunch of 
different things, which hurts code readability :)  
   
   Instead, it would be better to group only related methods in a class and 
name the class accordingly. 

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/AvroSchemaUtil.java
##########
@@ -0,0 +1,436 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.internal.schema.utils;
+
+import static org.apache.avro.Schema.Type.UNION;
+
+import org.apache.avro.AvroRuntimeException;
+import org.apache.avro.Conversions;
+import org.apache.avro.JsonProperties;
+import org.apache.avro.LogicalTypes;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaCompatibility;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericFixed;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.internal.schema.Types;
+import org.apache.hudi.internal.schema.action.TableChanges;
+import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter;
+
import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.sql.Date;
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TimeZone;
import java.util.TreeMap;
import java.util.stream.Collectors;
+
+public class AvroSchemaUtil {
+  private AvroSchemaUtil() {
+  }
+
+  private static final long MILLIS_PER_DAY = 86400000L;
+
+  //Export for test
+  public static final Conversions.DecimalConversion DECIMAL_CONVERSION = new 
Conversions.DecimalConversion();
+
+  /**
+   * Given a avro record with a given schema, rewrites it into the new schema 
while setting fields only from the new schema.
+   * support deep rewrite for nested record.
+   * This particular method does the following things :
+   * a) Create a new empty GenericRecord with the new schema.
+   * b) For GenericRecord, copy over the data from the old schema to the new 
schema or set default values for all fields of this transformed schema
+   *
+   * @param oldRecord oldRecord to be rewrite
+   * @param newSchema newSchema used to rewrite oldRecord
+   * @return newRecord for new Schema
+   */
+  public static GenericRecord rewriteRecord(IndexedRecord oldRecord, Schema 
newSchema) {
+    Object newRecord = rewriteRecord(oldRecord, oldRecord.getSchema(), 
newSchema);
+    return (GenericData.Record) newRecord;
+  }
+
+  private static Object rewriteRecord(Object oldRecord, Schema oldSchema, 
Schema newSchema) {
+    if (oldRecord == null) {
+      return null;
+    }
+    switch (newSchema.getType()) {
+      case RECORD:
+        if (!(oldRecord instanceof IndexedRecord)) {
+          throw new IllegalArgumentException("cannot rewrite record with 
different type");
+        }
+        IndexedRecord indexedRecord = (IndexedRecord) oldRecord;
+        List<Schema.Field> fields = newSchema.getFields();
+        Map<Integer, Object> helper = new HashMap<>();
+
+        for (int i = 0; i < fields.size(); i++) {
+          Schema.Field field = fields.get(i);
+          if (oldSchema.getField(field.name()) != null) {
+            Schema.Field oldField = oldSchema.getField(field.name());
+            helper.put(i, rewriteRecord(indexedRecord.get(oldField.pos()), 
oldField.schema(), fields.get(i).schema()));
+          }
+        }
+        GenericData.Record newRecord = new GenericData.Record(newSchema);
+        for (int i = 0; i < fields.size(); i++) {
+          if (helper.containsKey(i)) {
+            newRecord.put(i, helper.get(i));
+          } else {
+            if (fields.get(i).defaultVal() instanceof JsonProperties.Null) {
+              newRecord.put(i, null);
+            } else {
+              newRecord.put(i, fields.get(i).defaultVal());
+            }
+          }
+        }
+        return newRecord;
+      case ARRAY:
+        if (!(oldRecord instanceof Collection)) {
+          throw new IllegalArgumentException("cannot rewrite record with 
different type");
+        }
+        Collection array = (Collection)oldRecord;
+        List<Object> newArray = new ArrayList();
+        for (Object element : array) {
+          newArray.add(rewriteRecord(element, oldSchema.getElementType(), 
newSchema.getElementType()));
+        }
+        return newArray;
+      case MAP:
+        if (!(oldRecord instanceof Map)) {
+          throw new IllegalArgumentException("cannot rewrite record with 
different type");
+        }
+        Map<Object, Object> map = (Map<Object, Object>) oldRecord;
+        Map<Object, Object> newMap = new HashMap<>();
+        for (Map.Entry<Object, Object> entry : map.entrySet()) {
+          newMap.put(entry.getKey(), rewriteRecord(entry.getValue(), 
oldSchema.getValueType(), newSchema.getValueType()));
+        }
+        return newMap;
+      case UNION:
+        return rewriteRecord(oldRecord, getActualSchemaFromUnion(oldSchema, 
oldRecord), getActualSchemaFromUnion(newSchema, oldRecord));
+      default:
+        return rewritePrimaryType(oldRecord, oldSchema, newSchema);
+    }
+  }
+
+  private static Object rewritePrimaryType(Object oldValue, Schema oldSchema, 
Schema newSchema) {
+    Schema realOldSchema = oldSchema;
+    if (realOldSchema.getType() == UNION) {
+      realOldSchema = getActualSchemaFromUnion(oldSchema, oldValue);
+    }
+    if (realOldSchema.getType() == newSchema.getType()) {
+      switch (realOldSchema.getType()) {
+        case NULL:
+        case BOOLEAN:
+        case INT:
+        case LONG:
+        case FLOAT:
+        case DOUBLE:
+        case BYTES:
+        case STRING:
+          return oldValue;
+        case FIXED:
+          // fixed size and name must match:
+          if (!SchemaCompatibility.schemaNameEquals(realOldSchema, newSchema) 
|| realOldSchema.getFixedSize() != newSchema.getFixedSize()) {
+            // deal with the precision change for decimalType
+            if (realOldSchema.getLogicalType() instanceof 
LogicalTypes.Decimal) {
+              final byte[] bytes;
+              bytes = ((GenericFixed) oldValue).bytes();
+              LogicalTypes.Decimal decimal = (LogicalTypes.Decimal) 
realOldSchema.getLogicalType();
+              BigDecimal bd = new BigDecimal(new BigInteger(bytes), 
decimal.getScale()).setScale(((LogicalTypes.Decimal) 
newSchema.getLogicalType()).getScale());
+              return DECIMAL_CONVERSION.toFixed(bd, newSchema, 
newSchema.getLogicalType());
+            }
+          } else {
+            return oldValue;
+          }
+          return oldValue;
+        default:
+          throw new AvroRuntimeException("Unknown schema type: " + 
newSchema.getType());
+      }
+    } else {
+      return rewritePrimaryTypeWithDiffSchemaType(oldValue, realOldSchema, 
newSchema);
+    }
+  }
+
+  private static Object rewritePrimaryTypeWithDiffSchemaType(Object oldValue, 
Schema oldSchema, Schema newSchema) {
+    switch (newSchema.getType()) {
+      case NULL:
+      case BOOLEAN:
+        break;
+      case INT:
+        if (newSchema.getLogicalType() == LogicalTypes.date() && 
oldSchema.getType() == Schema.Type.STRING) {
+          return fromJavaDate(Date.valueOf(oldValue.toString()));
+        }
+        break;
+      case LONG:
+        if (oldSchema.getType() == Schema.Type.INT) {
+          return ((Integer) oldValue).longValue();
+        }
+        break;
+      case FLOAT:
+        if ((oldSchema.getType() == Schema.Type.INT)
+            || (oldSchema.getType() == Schema.Type.LONG)) {
+          return oldSchema.getType() == Schema.Type.INT ? ((Integer) 
oldValue).floatValue() : ((Long) oldValue).floatValue();
+        }
+        break;
+      case DOUBLE:
+        if (oldSchema.getType() == Schema.Type.FLOAT) {
+          // java float cannot convert to double directly, deal with float 
precision change
+          return Double.valueOf(oldValue + "");
+        } else if (oldSchema.getType() == Schema.Type.INT) {
+          return ((Integer) oldValue).doubleValue();
+        } else if (oldSchema.getType() == Schema.Type.LONG) {
+          return ((Long) oldValue).doubleValue();
+        }
+        break;
+      case BYTES:
+        if (oldSchema.getType() == Schema.Type.STRING) {
+          return ((String) oldValue).getBytes(StandardCharsets.UTF_8);
+        }
+        break;
+      case STRING:
+        if (oldSchema.getType() == Schema.Type.BYTES) {
+          return String.valueOf(((byte[]) oldValue));
+        }
+        if (oldSchema.getLogicalType() == LogicalTypes.date()) {
+          return toJavaDate((Integer) oldValue).toString();
+        }
+        if (oldSchema.getType() == Schema.Type.INT
+            || oldSchema.getType() == Schema.Type.LONG
+            || oldSchema.getType() == Schema.Type.FLOAT
+            || oldSchema.getType() == Schema.Type.DOUBLE) {
+          return oldValue.toString();
+        }
+        if (oldSchema.getType() == Schema.Type.FIXED && 
oldSchema.getLogicalType() instanceof LogicalTypes.Decimal) {
+          final byte[] bytes;
+          bytes = ((GenericFixed) oldValue).bytes();
+          LogicalTypes.Decimal decimal = (LogicalTypes.Decimal) 
oldSchema.getLogicalType();
+          BigDecimal bd = new BigDecimal(new BigInteger(bytes), 
decimal.getScale());
+          return bd.toString();
+        }
+        break;
+      case FIXED:
+        // deal with decimal Type
+        if (newSchema.getLogicalType() instanceof LogicalTypes.Decimal) {
+          // TODO: support more types
+          if (oldSchema.getType() == Schema.Type.STRING
+              || oldSchema.getType() == Schema.Type.DOUBLE
+              || oldSchema.getType() == Schema.Type.INT
+              || oldSchema.getType() == Schema.Type.LONG
+              || oldSchema.getType() == Schema.Type.FLOAT) {
+            LogicalTypes.Decimal decimal = (LogicalTypes.Decimal) 
newSchema.getLogicalType();
+            BigDecimal bigDecimal = null;
+            if (oldSchema.getType() == Schema.Type.STRING) {
+              bigDecimal = new java.math.BigDecimal((String) oldValue)
+                  .setScale(decimal.getScale());
+            } else {
+              // Due to Java, there will be precision problems in direct 
conversion, we should use string instead of use double
+              bigDecimal = new java.math.BigDecimal(oldValue.toString())
+                  .setScale(decimal.getScale());
+            }
+            return DECIMAL_CONVERSION.toFixed(bigDecimal, newSchema, 
newSchema.getLogicalType());
+          }
+        }
+        break;
+      default:
+    }
+    throw new AvroRuntimeException(String.format("cannot support rewrite value 
for schema type: %s since the old schema type is: %s", newSchema, oldSchema));
+  }
+
+  // convert days to Date
+  private static Date toJavaDate(int days) {
+    long localMillis = Math.multiplyExact(days, MILLIS_PER_DAY);
+    int timeZoneOffset;
+    TimeZone defaultTimeZone = TimeZone.getDefault();
+    if (defaultTimeZone instanceof sun.util.calendar.ZoneInfo) {
+      timeZoneOffset = ((sun.util.calendar.ZoneInfo) 
defaultTimeZone).getOffsetsByWall(localMillis, null);
+    } else {
+      timeZoneOffset = defaultTimeZone.getOffset(localMillis - 
defaultTimeZone.getRawOffset());
+    }
+    return new Date(localMillis - timeZoneOffset);
+  }
+
+  // convert Date to days
+  private static int fromJavaDate(Date date) {
+    long millisUtc = date.getTime();
+    long millisLocal = millisUtc + TimeZone.getDefault().getOffset(millisUtc);
+    int julianDays = Math.toIntExact(Math.floorDiv(millisLocal, 
MILLIS_PER_DAY));
+    return julianDays;
+  }
+
+  private static Schema getActualSchemaFromUnion(Schema schema, Object data) {
+    Schema actualSchema;
+    if (!schema.getType().equals(UNION)) {
+      return schema;
+    }
+    if (schema.getTypes().size() == 2
+        && schema.getTypes().get(0).getType() == Schema.Type.NULL) {
+      actualSchema = schema.getTypes().get(1);
+    } else if (schema.getTypes().size() == 2
+        && schema.getTypes().get(1).getType() == Schema.Type.NULL) {
+      actualSchema = schema.getTypes().get(0);
+    } else if (schema.getTypes().size() == 1) {
+      actualSchema = schema.getTypes().get(0);
+    } else {
+      // deal complex union. this should not happened in hoodie,
+      // since flink/spark do not write this type.
+      int i = GenericData.get().resolveUnion(schema, data);
+      actualSchema = schema.getTypes().get(i);
+    }
+    return actualSchema;
+  }
+
+  /**
+   * Given avro records, rewrites them with new schema.
+   *
+   * @param oldRecords oldRecords to be rewrite
+   * @param newSchema newSchema used to rewrite oldRecord
+   * @return a iterator of rewrote GeneriRcords
+   */
+  public static Iterator<GenericRecord> rewriteRecords(Iterator<GenericRecord> 
oldRecords, Schema newSchema) {
+    if (oldRecords == null || newSchema == null) {
+      return Collections.emptyIterator();
+    }
+    return new Iterator<GenericRecord>() {
+      @Override
+      public boolean hasNext() {
+        return oldRecords.hasNext();
+      }
+
+      @Override
+      public GenericRecord next() {
+        return rewriteRecord(oldRecords.next(), newSchema);
+      }
+    };
+  }
+
+  /**
+   * support evolution from a new avroSchema.
+   * notice: this is not a universal method,
+   * now hoodie support implicitly add columns when hoodie write operation,
+   * This ability needs to be preserved, so implicitly evolution for 
internalSchema should supported.
+   *
+   * @param evolvedSchema implicitly evolution of avro when hoodie write 
operation
+   * @param oldSchema old internalSchema
+   * @param supportPositionReorder support position reorder
+   * @return evolution Schema
+   */
+  public static InternalSchema evolutionSchemaFromNewAvroSchema(Schema 
evolvedSchema, InternalSchema oldSchema, Boolean supportPositionReorder) {

Review comment:
       Can we move the methods that handle InternalSchema into one class? 
   Also, please look at HoodieAvroUtils and move the Avro-specific methods to 
that class. There is already a rewrite method there (without schema evolution 
handling); it would be better to add these methods alongside it.

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/InternalSchemaUtils.java
##########
@@ -0,0 +1,856 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.internal.schema.utils;
+
+import org.apache.avro.JsonProperties;
+import org.apache.avro.LogicalType;
+import org.apache.avro.LogicalTypes;
+import org.apache.avro.Schema;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.internal.schema.HoodieSchemaException;
+import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.internal.schema.Type;
+import org.apache.hudi.internal.schema.Types;
+import org.apache.hudi.internal.schema.Types.Field;
+import org.apache.hudi.internal.schema.Types.RecordType;
+import org.apache.hudi.internal.schema.action.InternalSchemaMerger;
+import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter;
+import org.apache.hudi.internal.schema.visitor.InternalSchemaVisitor;
+import org.apache.hudi.internal.schema.visitor.NameToIDVisitor;
+
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+public class InternalSchemaUtils {

Review comment:
       This class primarily deals with building Internal Schema. Lets rename 
the class to InternalSchemaBuilder and make the relevant  methods non-static.

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/AvroSchemaUtil.java
##########
@@ -0,0 +1,436 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.internal.schema.utils;
+
+import static org.apache.avro.Schema.Type.UNION;
+
+import org.apache.avro.AvroRuntimeException;
+import org.apache.avro.Conversions;
+import org.apache.avro.JsonProperties;
+import org.apache.avro.LogicalTypes;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaCompatibility;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericFixed;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.internal.schema.Types;
+import org.apache.hudi.internal.schema.action.TableChanges;
+import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.charset.StandardCharsets;
+import java.sql.Date;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.TimeZone;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+public class AvroSchemaUtil {
+  private AvroSchemaUtil() {
+  }
+
+  private static final long MILLIS_PER_DAY = 86400000L;
+
+  //Export for test
+  public static final Conversions.DecimalConversion DECIMAL_CONVERSION = new 
Conversions.DecimalConversion();
+
+  /**
+   * Given a avro record with a given schema, rewrites it into the new schema 
while setting fields only from the new schema.
+   * support deep rewrite for nested record.
+   * This particular method does the following things :
+   * a) Create a new empty GenericRecord with the new schema.
+   * b) For GenericRecord, copy over the data from the old schema to the new 
schema or set default values for all fields of this transformed schema
+   *
+   * @param oldRecord oldRecord to be rewrite
+   * @param newSchema newSchema used to rewrite oldRecord
+   * @return newRecord for new Schema
+   */
+  public static GenericRecord rewriteRecord(IndexedRecord oldRecord, Schema 
newSchema) {
+    Object newRecord = rewriteRecord(oldRecord, oldRecord.getSchema(), 
newSchema);
+    return (GenericData.Record) newRecord;
+  }
+
+  private static Object rewriteRecord(Object oldRecord, Schema oldSchema, 
Schema newSchema) {
+    if (oldRecord == null) {
+      return null;
+    }
+    switch (newSchema.getType()) {
+      case RECORD:
+        if (!(oldRecord instanceof IndexedRecord)) {
+          throw new IllegalArgumentException("cannot rewrite record with 
different type");
+        }
+        IndexedRecord indexedRecord = (IndexedRecord) oldRecord;
+        List<Schema.Field> fields = newSchema.getFields();
+        Map<Integer, Object> helper = new HashMap<>();
+
+        for (int i = 0; i < fields.size(); i++) {
+          Schema.Field field = fields.get(i);
+          if (oldSchema.getField(field.name()) != null) {
+            Schema.Field oldField = oldSchema.getField(field.name());
+            helper.put(i, rewriteRecord(indexedRecord.get(oldField.pos()), 
oldField.schema(), fields.get(i).schema()));
+          }
+        }
+        GenericData.Record newRecord = new GenericData.Record(newSchema);
+        for (int i = 0; i < fields.size(); i++) {
+          if (helper.containsKey(i)) {
+            newRecord.put(i, helper.get(i));
+          } else {
+            if (fields.get(i).defaultVal() instanceof JsonProperties.Null) {
+              newRecord.put(i, null);
+            } else {
+              newRecord.put(i, fields.get(i).defaultVal());
+            }
+          }
+        }
+        return newRecord;
+      case ARRAY:
+        if (!(oldRecord instanceof Collection)) {
+          throw new IllegalArgumentException("cannot rewrite record with 
different type");
+        }
+        Collection array = (Collection)oldRecord;
+        List<Object> newArray = new ArrayList();
+        for (Object element : array) {
+          newArray.add(rewriteRecord(element, oldSchema.getElementType(), 
newSchema.getElementType()));
+        }
+        return newArray;
+      case MAP:
+        if (!(oldRecord instanceof Map)) {
+          throw new IllegalArgumentException("cannot rewrite record with 
different type");
+        }
+        Map<Object, Object> map = (Map<Object, Object>) oldRecord;
+        Map<Object, Object> newMap = new HashMap<>();
+        for (Map.Entry<Object, Object> entry : map.entrySet()) {
+          newMap.put(entry.getKey(), rewriteRecord(entry.getValue(), 
oldSchema.getValueType(), newSchema.getValueType()));
+        }
+        return newMap;
+      case UNION:
+        return rewriteRecord(oldRecord, getActualSchemaFromUnion(oldSchema, 
oldRecord), getActualSchemaFromUnion(newSchema, oldRecord));
+      default:
+        return rewritePrimaryType(oldRecord, oldSchema, newSchema);
+    }
+  }
+
+  private static Object rewritePrimaryType(Object oldValue, Schema oldSchema, 
Schema newSchema) {
+    Schema realOldSchema = oldSchema;
+    if (realOldSchema.getType() == UNION) {
+      realOldSchema = getActualSchemaFromUnion(oldSchema, oldValue);
+    }
+    if (realOldSchema.getType() == newSchema.getType()) {
+      switch (realOldSchema.getType()) {
+        case NULL:
+        case BOOLEAN:
+        case INT:
+        case LONG:
+        case FLOAT:
+        case DOUBLE:
+        case BYTES:
+        case STRING:
+          return oldValue;
+        case FIXED:
+          // fixed size and name must match:
+          if (!SchemaCompatibility.schemaNameEquals(realOldSchema, newSchema) 
|| realOldSchema.getFixedSize() != newSchema.getFixedSize()) {
+            // deal with the precision change for decimalType
+            if (realOldSchema.getLogicalType() instanceof 
LogicalTypes.Decimal) {
+              final byte[] bytes;
+              bytes = ((GenericFixed) oldValue).bytes();
+              LogicalTypes.Decimal decimal = (LogicalTypes.Decimal) 
realOldSchema.getLogicalType();
+              BigDecimal bd = new BigDecimal(new BigInteger(bytes), 
decimal.getScale()).setScale(((LogicalTypes.Decimal) 
newSchema.getLogicalType()).getScale());
+              return DECIMAL_CONVERSION.toFixed(bd, newSchema, 
newSchema.getLogicalType());
+            }
+          } else {
+            return oldValue;
+          }
+          return oldValue;
+        default:
+          throw new AvroRuntimeException("Unknown schema type: " + 
newSchema.getType());
+      }
+    } else {
+      return rewritePrimaryTypeWithDiffSchemaType(oldValue, realOldSchema, 
newSchema);
+    }
+  }
+
+  private static Object rewritePrimaryTypeWithDiffSchemaType(Object oldValue, 
Schema oldSchema, Schema newSchema) {
+    switch (newSchema.getType()) {
+      case NULL:
+      case BOOLEAN:
+        break;
+      case INT:
+        if (newSchema.getLogicalType() == LogicalTypes.date() && 
oldSchema.getType() == Schema.Type.STRING) {
+          return fromJavaDate(Date.valueOf(oldValue.toString()));
+        }
+        break;
+      case LONG:
+        if (oldSchema.getType() == Schema.Type.INT) {
+          return ((Integer) oldValue).longValue();
+        }
+        break;
+      case FLOAT:
+        if ((oldSchema.getType() == Schema.Type.INT)
+            || (oldSchema.getType() == Schema.Type.LONG)) {
+          return oldSchema.getType() == Schema.Type.INT ? ((Integer) 
oldValue).floatValue() : ((Long) oldValue).floatValue();
+        }
+        break;
+      case DOUBLE:
+        if (oldSchema.getType() == Schema.Type.FLOAT) {
+          // java float cannot convert to double directly, deal with float 
precision change
+          return Double.valueOf(oldValue + "");
+        } else if (oldSchema.getType() == Schema.Type.INT) {
+          return ((Integer) oldValue).doubleValue();
+        } else if (oldSchema.getType() == Schema.Type.LONG) {
+          return ((Long) oldValue).doubleValue();
+        }
+        break;
+      case BYTES:
+        if (oldSchema.getType() == Schema.Type.STRING) {
+          return ((String) oldValue).getBytes(StandardCharsets.UTF_8);
+        }
+        break;
+      case STRING:
+        if (oldSchema.getType() == Schema.Type.BYTES) {
+          return String.valueOf(((byte[]) oldValue));
+        }
+        if (oldSchema.getLogicalType() == LogicalTypes.date()) {
+          return toJavaDate((Integer) oldValue).toString();
+        }
+        if (oldSchema.getType() == Schema.Type.INT
+            || oldSchema.getType() == Schema.Type.LONG
+            || oldSchema.getType() == Schema.Type.FLOAT
+            || oldSchema.getType() == Schema.Type.DOUBLE) {
+          return oldValue.toString();
+        }
+        if (oldSchema.getType() == Schema.Type.FIXED && 
oldSchema.getLogicalType() instanceof LogicalTypes.Decimal) {
+          final byte[] bytes;
+          bytes = ((GenericFixed) oldValue).bytes();
+          LogicalTypes.Decimal decimal = (LogicalTypes.Decimal) 
oldSchema.getLogicalType();
+          BigDecimal bd = new BigDecimal(new BigInteger(bytes), 
decimal.getScale());
+          return bd.toString();
+        }
+        break;
+      case FIXED:
+        // deal with decimal Type
+        if (newSchema.getLogicalType() instanceof LogicalTypes.Decimal) {
+          // TODO: support more types
+          if (oldSchema.getType() == Schema.Type.STRING
+              || oldSchema.getType() == Schema.Type.DOUBLE
+              || oldSchema.getType() == Schema.Type.INT
+              || oldSchema.getType() == Schema.Type.LONG
+              || oldSchema.getType() == Schema.Type.FLOAT) {
+            LogicalTypes.Decimal decimal = (LogicalTypes.Decimal) 
newSchema.getLogicalType();
+            BigDecimal bigDecimal = null;
+            if (oldSchema.getType() == Schema.Type.STRING) {
+              bigDecimal = new java.math.BigDecimal((String) oldValue)
+                  .setScale(decimal.getScale());
+            } else {
+              // Due to Java, there will be precision problems in direct 
conversion, we should use string instead of use double
+              bigDecimal = new java.math.BigDecimal(oldValue.toString())
+                  .setScale(decimal.getScale());
+            }
+            return DECIMAL_CONVERSION.toFixed(bigDecimal, newSchema, 
newSchema.getLogicalType());
+          }
+        }
+        break;
+      default:
+    }
+    throw new AvroRuntimeException(String.format("cannot support rewrite value 
for schema type: %s since the old schema type is: %s", newSchema, oldSchema));
+  }
+
+  // convert days to Date
+  private static Date toJavaDate(int days) {
+    long localMillis = Math.multiplyExact(days, MILLIS_PER_DAY);
+    int timeZoneOffset;
+    TimeZone defaultTimeZone = TimeZone.getDefault();
+    if (defaultTimeZone instanceof sun.util.calendar.ZoneInfo) {
+      timeZoneOffset = ((sun.util.calendar.ZoneInfo) 
defaultTimeZone).getOffsetsByWall(localMillis, null);
+    } else {
+      timeZoneOffset = defaultTimeZone.getOffset(localMillis - 
defaultTimeZone.getRawOffset());
+    }
+    return new Date(localMillis - timeZoneOffset);
+  }
+
+  // convert Date to days
+  private static int fromJavaDate(Date date) {
+    long millisUtc = date.getTime();
+    long millisLocal = millisUtc + TimeZone.getDefault().getOffset(millisUtc);
+    int julianDays = Math.toIntExact(Math.floorDiv(millisLocal, 
MILLIS_PER_DAY));
+    return julianDays;
+  }
+
+  private static Schema getActualSchemaFromUnion(Schema schema, Object data) {
+    Schema actualSchema;
+    if (!schema.getType().equals(UNION)) {
+      return schema;
+    }
+    if (schema.getTypes().size() == 2
+        && schema.getTypes().get(0).getType() == Schema.Type.NULL) {
+      actualSchema = schema.getTypes().get(1);
+    } else if (schema.getTypes().size() == 2
+        && schema.getTypes().get(1).getType() == Schema.Type.NULL) {
+      actualSchema = schema.getTypes().get(0);
+    } else if (schema.getTypes().size() == 1) {
+      actualSchema = schema.getTypes().get(0);
+    } else {
+      // deal complex union. this should not happened in hoodie,
+      // since flink/spark do not write this type.
+      int i = GenericData.get().resolveUnion(schema, data);
+      actualSchema = schema.getTypes().get(i);
+    }
+    return actualSchema;
+  }
+
+  /**
+   * Given avro records, rewrites them with new schema.
+   *
+   * @param oldRecords oldRecords to be rewrite
+   * @param newSchema newSchema used to rewrite oldRecord
+   * @return a iterator of rewrote GeneriRcords
+   */
+  public static Iterator<GenericRecord> rewriteRecords(Iterator<GenericRecord> 
oldRecords, Schema newSchema) {
+    if (oldRecords == null || newSchema == null) {
+      return Collections.emptyIterator();
+    }
+    return new Iterator<GenericRecord>() {
+      @Override
+      public boolean hasNext() {
+        return oldRecords.hasNext();
+      }
+
+      @Override
+      public GenericRecord next() {
+        return rewriteRecord(oldRecords.next(), newSchema);
+      }
+    };
+  }
+
+  /**
+   * support evolution from a new avroSchema.
+   * notice: this is not a universal method,
+   * now hoodie support implicitly add columns when hoodie write operation,
+   * This ability needs to be preserved, so implicitly evolution for 
internalSchema should supported.
+   *
+   * @param evolvedSchema implicitly evolution of avro when hoodie write 
operation
+   * @param oldSchema old internalSchema
+   * @param supportPositionReorder support position reorder
+   * @return evolution Schema
+   */
+  public static InternalSchema evolutionSchemaFromNewAvroSchema(Schema 
evolvedSchema, InternalSchema oldSchema, Boolean supportPositionReorder) {
+    InternalSchema evolvedInternalSchema = 
AvroInternalSchemaConverter.convert(evolvedSchema);
+    // do check, only support add column evolution
+    List<String> colNamesFromEvolved = 
evolvedInternalSchema.getAllColsFullName();
+    List<String> colNamesFromOldSchema = oldSchema.getAllColsFullName();
+    List<String> diffFromOldSchema = colNamesFromOldSchema.stream().filter(f 
-> !colNamesFromEvolved.contains(f)).collect(Collectors.toList());
+    List<Types.Field> newFields = new ArrayList<>();
+    if (colNamesFromEvolved.size() == colNamesFromOldSchema.size() && 
diffFromOldSchema.size() == 0) {
+      // no changes happen
+      if (supportPositionReorder) {
+        evolvedInternalSchema.getRecord().fields().forEach(f -> 
newFields.add(oldSchema.getRecord().field(f.name())));
+        return new InternalSchema(newFields);
+      }
+      return oldSchema;
+    }
+    // try to find all added columns
+    if (diffFromOldSchema.size() != 0) {
+      throw new UnsupportedOperationException("cannot evolution schema 
implicitly, find delete/rename operation");

Review comment:
       nit: evolution -> evolve, cannot -> Cannot

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/SchemaChangePersistHelper.java
##########
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.internal.schema.utils;
+
+import org.apache.hudi.internal.schema.Type;
+import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.internal.schema.action.TableChange;
+import org.apache.hudi.internal.schema.action.TableChanges;
+import org.apache.hudi.internal.schema.action.TableChangesHelper;
+
+import java.util.Arrays;
+
+public class SchemaChangePersistHelper {

Review comment:
       Instead of defining this class and SchemaChangeUtils as utils, can we
reorganize this: create two classes, InternalSchemaChangeApplier and
InternalTypeChangeApplier. You can move the relevant methods to each class and
make them non-static. InternalSchemaChangeApplier should delegate to
InternalTypeChangeApplier when needed.
   

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/AvroSchemaUtil.java
##########
@@ -0,0 +1,436 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.internal.schema.utils;
+
+import static org.apache.avro.Schema.Type.UNION;
+
+import org.apache.avro.AvroRuntimeException;
+import org.apache.avro.Conversions;
+import org.apache.avro.JsonProperties;
+import org.apache.avro.LogicalTypes;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaCompatibility;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericFixed;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.internal.schema.Types;
+import org.apache.hudi.internal.schema.action.TableChanges;
+import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.charset.StandardCharsets;
+import java.sql.Date;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.TimeZone;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+public class AvroSchemaUtil {
+  private AvroSchemaUtil() {
+  }
+
+  private static final long MILLIS_PER_DAY = 86400000L;
+
+  //Export for test
+  public static final Conversions.DecimalConversion DECIMAL_CONVERSION = new 
Conversions.DecimalConversion();
+
+  /**
+   * Given a avro record with a given schema, rewrites it into the new schema 
while setting fields only from the new schema.
+   * support deep rewrite for nested record.
+   * This particular method does the following things :
+   * a) Create a new empty GenericRecord with the new schema.
+   * b) For GenericRecord, copy over the data from the old schema to the new 
schema or set default values for all fields of this transformed schema
+   *
+   * @param oldRecord oldRecord to be rewrite
+   * @param newSchema newSchema used to rewrite oldRecord
+   * @return newRecord for new Schema
+   */
+  public static GenericRecord rewriteRecord(IndexedRecord oldRecord, Schema 
newSchema) {
+    Object newRecord = rewriteRecord(oldRecord, oldRecord.getSchema(), 
newSchema);
+    return (GenericData.Record) newRecord;
+  }
+
+  private static Object rewriteRecord(Object oldRecord, Schema oldSchema, 
Schema newSchema) {
+    if (oldRecord == null) {
+      return null;
+    }
+    switch (newSchema.getType()) {
+      case RECORD:
+        if (!(oldRecord instanceof IndexedRecord)) {
+          throw new IllegalArgumentException("cannot rewrite record with 
different type");
+        }
+        IndexedRecord indexedRecord = (IndexedRecord) oldRecord;
+        List<Schema.Field> fields = newSchema.getFields();
+        Map<Integer, Object> helper = new HashMap<>();
+
+        for (int i = 0; i < fields.size(); i++) {
+          Schema.Field field = fields.get(i);
+          if (oldSchema.getField(field.name()) != null) {
+            Schema.Field oldField = oldSchema.getField(field.name());
+            helper.put(i, rewriteRecord(indexedRecord.get(oldField.pos()), 
oldField.schema(), fields.get(i).schema()));
+          }
+        }
+        GenericData.Record newRecord = new GenericData.Record(newSchema);
+        for (int i = 0; i < fields.size(); i++) {
+          if (helper.containsKey(i)) {
+            newRecord.put(i, helper.get(i));
+          } else {
+            if (fields.get(i).defaultVal() instanceof JsonProperties.Null) {
+              newRecord.put(i, null);
+            } else {
+              newRecord.put(i, fields.get(i).defaultVal());
+            }
+          }
+        }
+        return newRecord;
+      case ARRAY:
+        if (!(oldRecord instanceof Collection)) {
+          throw new IllegalArgumentException("cannot rewrite record with 
different type");
+        }
+        Collection array = (Collection)oldRecord;
+        List<Object> newArray = new ArrayList();
+        for (Object element : array) {
+          newArray.add(rewriteRecord(element, oldSchema.getElementType(), 
newSchema.getElementType()));
+        }
+        return newArray;
+      case MAP:
+        if (!(oldRecord instanceof Map)) {
+          throw new IllegalArgumentException("cannot rewrite record with 
different type");
+        }
+        Map<Object, Object> map = (Map<Object, Object>) oldRecord;
+        Map<Object, Object> newMap = new HashMap<>();
+        for (Map.Entry<Object, Object> entry : map.entrySet()) {
+          newMap.put(entry.getKey(), rewriteRecord(entry.getValue(), 
oldSchema.getValueType(), newSchema.getValueType()));
+        }
+        return newMap;
+      case UNION:
+        return rewriteRecord(oldRecord, getActualSchemaFromUnion(oldSchema, 
oldRecord), getActualSchemaFromUnion(newSchema, oldRecord));
+      default:
+        return rewritePrimaryType(oldRecord, oldSchema, newSchema);
+    }
+  }
+
+  private static Object rewritePrimaryType(Object oldValue, Schema oldSchema, 
Schema newSchema) {
+    Schema realOldSchema = oldSchema;
+    if (realOldSchema.getType() == UNION) {
+      realOldSchema = getActualSchemaFromUnion(oldSchema, oldValue);
+    }
+    if (realOldSchema.getType() == newSchema.getType()) {
+      switch (realOldSchema.getType()) {
+        case NULL:
+        case BOOLEAN:
+        case INT:
+        case LONG:
+        case FLOAT:
+        case DOUBLE:
+        case BYTES:
+        case STRING:
+          return oldValue;
+        case FIXED:
+          // fixed size and name must match:
+          if (!SchemaCompatibility.schemaNameEquals(realOldSchema, newSchema) 
|| realOldSchema.getFixedSize() != newSchema.getFixedSize()) {
+            // deal with the precision change for decimalType
+            if (realOldSchema.getLogicalType() instanceof 
LogicalTypes.Decimal) {
+              final byte[] bytes;
+              bytes = ((GenericFixed) oldValue).bytes();
+              LogicalTypes.Decimal decimal = (LogicalTypes.Decimal) 
realOldSchema.getLogicalType();
+              BigDecimal bd = new BigDecimal(new BigInteger(bytes), 
decimal.getScale()).setScale(((LogicalTypes.Decimal) 
newSchema.getLogicalType()).getScale());
+              return DECIMAL_CONVERSION.toFixed(bd, newSchema, 
newSchema.getLogicalType());
+            }
+          } else {
+            return oldValue;
+          }
+          return oldValue;
+        default:
+          throw new AvroRuntimeException("Unknown schema type: " + 
newSchema.getType());
+      }
+    } else {
+      return rewritePrimaryTypeWithDiffSchemaType(oldValue, realOldSchema, 
newSchema);
+    }
+  }
+
+  private static Object rewritePrimaryTypeWithDiffSchemaType(Object oldValue, 
Schema oldSchema, Schema newSchema) {
+    switch (newSchema.getType()) {
+      case NULL:
+      case BOOLEAN:
+        break;
+      case INT:
+        if (newSchema.getLogicalType() == LogicalTypes.date() && 
oldSchema.getType() == Schema.Type.STRING) {
+          return fromJavaDate(Date.valueOf(oldValue.toString()));
+        }
+        break;
+      case LONG:
+        if (oldSchema.getType() == Schema.Type.INT) {
+          return ((Integer) oldValue).longValue();
+        }
+        break;
+      case FLOAT:
+        if ((oldSchema.getType() == Schema.Type.INT)
+            || (oldSchema.getType() == Schema.Type.LONG)) {
+          return oldSchema.getType() == Schema.Type.INT ? ((Integer) 
oldValue).floatValue() : ((Long) oldValue).floatValue();
+        }
+        break;
+      case DOUBLE:
+        if (oldSchema.getType() == Schema.Type.FLOAT) {
+          // java float cannot convert to double directly, deal with float 
precision change
+          return Double.valueOf(oldValue + "");
+        } else if (oldSchema.getType() == Schema.Type.INT) {
+          return ((Integer) oldValue).doubleValue();
+        } else if (oldSchema.getType() == Schema.Type.LONG) {
+          return ((Long) oldValue).doubleValue();
+        }
+        break;
+      case BYTES:
+        if (oldSchema.getType() == Schema.Type.STRING) {
+          return ((String) oldValue).getBytes(StandardCharsets.UTF_8);
+        }
+        break;
+      case STRING:
+        if (oldSchema.getType() == Schema.Type.BYTES) {
+          return String.valueOf(((byte[]) oldValue));
+        }
+        if (oldSchema.getLogicalType() == LogicalTypes.date()) {
+          return toJavaDate((Integer) oldValue).toString();
+        }
+        if (oldSchema.getType() == Schema.Type.INT
+            || oldSchema.getType() == Schema.Type.LONG
+            || oldSchema.getType() == Schema.Type.FLOAT
+            || oldSchema.getType() == Schema.Type.DOUBLE) {
+          return oldValue.toString();
+        }
+        if (oldSchema.getType() == Schema.Type.FIXED && 
oldSchema.getLogicalType() instanceof LogicalTypes.Decimal) {
+          final byte[] bytes;
+          bytes = ((GenericFixed) oldValue).bytes();
+          LogicalTypes.Decimal decimal = (LogicalTypes.Decimal) 
oldSchema.getLogicalType();
+          BigDecimal bd = new BigDecimal(new BigInteger(bytes), 
decimal.getScale());
+          return bd.toString();
+        }
+        break;
+      case FIXED:
+        // deal with decimal Type
+        if (newSchema.getLogicalType() instanceof LogicalTypes.Decimal) {
+          // TODO: support more types
+          if (oldSchema.getType() == Schema.Type.STRING
+              || oldSchema.getType() == Schema.Type.DOUBLE
+              || oldSchema.getType() == Schema.Type.INT
+              || oldSchema.getType() == Schema.Type.LONG
+              || oldSchema.getType() == Schema.Type.FLOAT) {
+            LogicalTypes.Decimal decimal = (LogicalTypes.Decimal) 
newSchema.getLogicalType();
+            BigDecimal bigDecimal = null;
+            if (oldSchema.getType() == Schema.Type.STRING) {
+              bigDecimal = new java.math.BigDecimal((String) oldValue)
+                  .setScale(decimal.getScale());
+            } else {
+              // Due to Java, there will be precision problems in direct 
conversion, we should use string instead of use double
+              bigDecimal = new java.math.BigDecimal(oldValue.toString())
+                  .setScale(decimal.getScale());
+            }
+            return DECIMAL_CONVERSION.toFixed(bigDecimal, newSchema, 
newSchema.getLogicalType());
+          }
+        }
+        break;
+      default:
+    }
+    throw new AvroRuntimeException(String.format("cannot support rewrite value 
for schema type: %s since the old schema type is: %s", newSchema, oldSchema));
+  }
+
+  // convert days to Date
+  private static Date toJavaDate(int days) {
+    long localMillis = Math.multiplyExact(days, MILLIS_PER_DAY);
+    int timeZoneOffset;
+    TimeZone defaultTimeZone = TimeZone.getDefault();
+    if (defaultTimeZone instanceof sun.util.calendar.ZoneInfo) {
+      timeZoneOffset = ((sun.util.calendar.ZoneInfo) 
defaultTimeZone).getOffsetsByWall(localMillis, null);
+    } else {
+      timeZoneOffset = defaultTimeZone.getOffset(localMillis - 
defaultTimeZone.getRawOffset());
+    }
+    return new Date(localMillis - timeZoneOffset);
+  }
+
+  // convert Date to days
+  private static int fromJavaDate(Date date) {
+    long millisUtc = date.getTime();
+    long millisLocal = millisUtc + TimeZone.getDefault().getOffset(millisUtc);
+    int julianDays = Math.toIntExact(Math.floorDiv(millisLocal, 
MILLIS_PER_DAY));
+    return julianDays;
+  }
+
+  private static Schema getActualSchemaFromUnion(Schema schema, Object data) {
+    Schema actualSchema;
+    if (!schema.getType().equals(UNION)) {
+      return schema;
+    }
+    if (schema.getTypes().size() == 2
+        && schema.getTypes().get(0).getType() == Schema.Type.NULL) {
+      actualSchema = schema.getTypes().get(1);
+    } else if (schema.getTypes().size() == 2
+        && schema.getTypes().get(1).getType() == Schema.Type.NULL) {
+      actualSchema = schema.getTypes().get(0);
+    } else if (schema.getTypes().size() == 1) {
+      actualSchema = schema.getTypes().get(0);
+    } else {
+      // deal complex union. this should not happened in hoodie,
+      // since flink/spark do not write this type.
+      int i = GenericData.get().resolveUnion(schema, data);
+      actualSchema = schema.getTypes().get(i);
+    }
+    return actualSchema;
+  }
+
+  /**
+   * Given avro records, rewrites them with new schema.
+   *
+   * @param oldRecords oldRecords to be rewrite
+   * @param newSchema newSchema used to rewrite oldRecord
+   * @return a iterator of rewrote GeneriRcords
+   */
+  public static Iterator<GenericRecord> rewriteRecords(Iterator<GenericRecord> 
oldRecords, Schema newSchema) {
+    if (oldRecords == null || newSchema == null) {
+      return Collections.emptyIterator();
+    }
+    return new Iterator<GenericRecord>() {
+      @Override
+      public boolean hasNext() {
+        return oldRecords.hasNext();
+      }
+
+      @Override
+      public GenericRecord next() {
+        return rewriteRecord(oldRecords.next(), newSchema);
+      }
+    };
+  }
+
+  /**
+   * support evolution from a new avroSchema.
+   * notice: this is not a universal method,
+   * now hoodie support implicitly add columns when hoodie write operation,
+   * This ability needs to be preserved, so implicitly evolution for 
internalSchema should supported.
+   *
+   * @param evolvedSchema implicitly evolution of avro when hoodie write 
operation
+   * @param oldSchema old internalSchema
+   * @param supportPositionReorder support position reorder
+   * @return evolution Schema
+   */
+  public static InternalSchema evolutionSchemaFromNewAvroSchema(Schema 
evolvedSchema, InternalSchema oldSchema, Boolean supportPositionReorder) {
+    InternalSchema evolvedInternalSchema = 
AvroInternalSchemaConverter.convert(evolvedSchema);
+    // do check, only support add column evolution
+    List<String> colNamesFromEvolved = 
evolvedInternalSchema.getAllColsFullName();
+    List<String> colNamesFromOldSchema = oldSchema.getAllColsFullName();
+    List<String> diffFromOldSchema = colNamesFromOldSchema.stream().filter(f 
-> !colNamesFromEvolved.contains(f)).collect(Collectors.toList());
+    List<Types.Field> newFields = new ArrayList<>();
+    if (colNamesFromEvolved.size() == colNamesFromOldSchema.size() && 
diffFromOldSchema.size() == 0) {
+      // no changes happen
+      if (supportPositionReorder) {
+        evolvedInternalSchema.getRecord().fields().forEach(f -> 
newFields.add(oldSchema.getRecord().field(f.name())));
+        return new InternalSchema(newFields);
+      }
+      return oldSchema;
+    }
+    // try to find all added columns
+    if (diffFromOldSchema.size() != 0) {
+      throw new UnsupportedOperationException("cannot evolution schema 
implicitly, find delete/rename operation");
+    }
+
+    List<String> diffFromEvolutionSchema = 
colNamesFromEvolved.stream().filter(f -> 
!colNamesFromOldSchema.contains(f)).collect(Collectors.toList());
+    // Remove redundancy from diffFromEvolutionSchema.
+    // for example, now we add a struct col in evolvedSchema, the struct col 
is " user struct<name:string, age:int> "
+    // when we do diff operation: user, user.name, user.age will appeared in 
the resultSet which is redundancy, user.name and user.age should be excluded.
+    // deal with add operation
+    TreeMap<Integer, String> finalAddAction = new TreeMap<>();
+    for (int i = 0; i < diffFromEvolutionSchema.size(); i++)  {
+      String name = diffFromEvolutionSchema.get(i);
+      int splitPoint = name.lastIndexOf(".");
+      String parentName = splitPoint > 0 ? name.substring(0, splitPoint) : "";
+      if (!parentName.isEmpty() && 
diffFromEvolutionSchema.contains(parentName)) {
+        // find redundancy, skip it
+        continue;
+      }
+      finalAddAction.put(evolvedInternalSchema.findIdByName(name), name);
+    }
+
+    TableChanges.ColumnAddChange addChange = 
TableChanges.ColumnAddChange.get(oldSchema);
+    finalAddAction.entrySet().stream().forEach(f -> {
+      String name = f.getValue();
+      int splitPoint = name.lastIndexOf(".");
+      String parentName = splitPoint > 0 ? name.substring(0, splitPoint) : "";
+      String rawName = splitPoint > 0 ? name.substring(splitPoint + 1) : name;
+      addChange.addColumns(parentName, rawName, 
evolvedInternalSchema.findType(name), null);
+    });
+
+    InternalSchema res = SchemaChangeUtils.applyTableChanges2Schema(oldSchema, 
addChange);
+    if (supportPositionReorder) {
+      evolvedInternalSchema.getRecord().fields().forEach(f -> 
newFields.add(oldSchema.getRecord().field(f.name())));
+      return new InternalSchema(newFields);
+    } else {
+      return res;
+    }
+  }
+
+  public static InternalSchema evolutionSchemaFromNewAvroSchema(Schema 
evolvedSchema, InternalSchema oldSchema) {

Review comment:
       Rename this method and the one above from
   `evolutionSchemaFromNewAvroSchema` to `evolveSchemaFromNewAvroSchema`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to