bvaradar commented on a change in pull request #3668:
URL: https://github.com/apache/hudi/pull/3668#discussion_r713621765



##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
##########
@@ -493,4 +496,83 @@ public static MessageType readSchemaFromLogFile(FileSystem 
fs, Path path) throws
     }
     return null;
   }
+
+  /**
+   * Gets the InternalSchema for a hoodie table from the HoodieCommitMetadata 
of the instant.
+   *
+   * @return InternalSchema for this table
+   */
+  public Option<InternalSchema> getTableInternalSchemaFromCommitMetadata() {
+    HoodieTimeline timeline = 
metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
+    if (timeline.lastInstant().isPresent()) {
+      return 
getTableInternalSchemaFromCommitMetadata(timeline.lastInstant().get());
+    } else {
+      return Option.empty();
+    }
+  }
+
+  /**
+   * Gets the InternalSchema for a hoodie table from the HoodieCommitMetadata 
of the instant.
+   *
+   * @return InternalSchema for this table
+   */
+  private Option<InternalSchema> 
getTableInternalSchemaFromCommitMetadata(HoodieInstant instant) {
+    try {
+      HoodieTimeline timeline = 
metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
+      byte[] data = timeline.getInstantDetails(instant).get();

Review comment:
       Instead of the filtered timeline, use metaClient.getActiveTimeline().

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##########
@@ -213,6 +224,27 @@ protected void commit(HoodieTable table, String 
commitActionType, String instant
                       List<HoodieWriteStat> stats) throws IOException {
     LOG.info("Committing " + instantTime + " action " + commitActionType);
     HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
+    // do save internal schema to support Implicitly add columns in write 
process
+    if 
(!metadata.getExtraMetadata().containsKey(InternalSchemaParser.LATESTSCHEMA) && 
metadata.getExtraMetadata().containsKey(SCHEMA_KEY)) {

Review comment:
       When would this case happen, i.e. SCHEMA_KEY is passed but 
LATESTSCHEMA is not?

##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkSchemaUtils.java
##########
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.utils;
+
+import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.internal.schema.Type;
+import org.apache.hudi.internal.schema.Types;
+import org.apache.hudi.internal.schema.utils.InternalSchemaUtils;
+import org.apache.spark.sql.types.ArrayType;
+import org.apache.spark.sql.types.ArrayType$;
+import org.apache.spark.sql.types.BinaryType;
+import org.apache.spark.sql.types.BinaryType$;
+import org.apache.spark.sql.types.BooleanType;
+import org.apache.spark.sql.types.BooleanType$;
+import org.apache.spark.sql.types.ByteType;
+import org.apache.spark.sql.types.CharType;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DateType;
+import org.apache.spark.sql.types.DateType$;
+import org.apache.spark.sql.types.DecimalType;
+import org.apache.spark.sql.types.DecimalType$;
+import org.apache.spark.sql.types.DoubleType;
+import org.apache.spark.sql.types.DoubleType$;
+import org.apache.spark.sql.types.FloatType;
+import org.apache.spark.sql.types.FloatType$;
+import org.apache.spark.sql.types.IntegerType;
+import org.apache.spark.sql.types.IntegerType$;
+import org.apache.spark.sql.types.LongType;
+import org.apache.spark.sql.types.LongType$;
+import org.apache.spark.sql.types.MapType;
+import org.apache.spark.sql.types.MapType$;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.ShortType;
+import org.apache.spark.sql.types.StringType;
+import org.apache.spark.sql.types.StringType$;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.types.StructType$;
+import org.apache.spark.sql.types.TimestampType;
+import org.apache.spark.sql.types.TimestampType$;
+import org.apache.spark.sql.types.UserDefinedType;
+import org.apache.spark.sql.types.VarcharType;
+
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+public class SparkSchemaUtils {
+  private SparkSchemaUtils() {
+
+  }
+
+  public static final String HOODIE_QUERY_SCHEMA = "hadoop.hoodie.querySchema";
+  public static final String HOODIE_TABLE_PATH = "hadoop.hoodie.tablePath";
+
+  /**
+   * Converts a spark schema to an hudi internal schema. Fields without IDs 
are kept and assigned fallback IDs.
+   *
+   * @param sparkSchema a spark schema
+   * @return a matching internal schema for the provided spark schema
+   */
+  public static InternalSchema convertStructTypeToInternalSchema(StructType 
sparkSchema) {
+    Type newType = buildTypeFromStructType(sparkSchema, true, new 
AtomicInteger(0));
+    return new InternalSchema(((Types.RecordType)newType).fields());
+  }
+
+  public static Type buildTypeFromStructType(DataType sparkType, Boolean 
firstVisitRoot, AtomicInteger nextId) {

Review comment:
       why AtomicInteger ? Is this method expected to be called concurrently ? 

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java
##########
@@ -152,7 +160,7 @@ public void scan() {
       // iterate over the paths
       logFormatReaderWrapper = new HoodieLogFormatReader(fs,
           logFilePaths.stream().map(logFile -> new HoodieLogFile(new 
Path(logFile))).collect(Collectors.toList()),
-          readerSchema, readBlocksLazily, reverseReader, bufferSize);
+          internalSchema != null ? null : readerSchema, readBlocksLazily, 
reverseReader, bufferSize);

Review comment:
       Not following this. Why is this set to null when internalSchema is 
not null?

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/internal/schema/action/TableChanges.java
##########
@@ -0,0 +1,433 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.internal.schema.action;
+
+import org.apache.hudi.internal.schema.HoodieSchemaException;
+import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.internal.schema.utils.InternalSchemaUtils;
+import org.apache.hudi.internal.schema.Type;
+import org.apache.hudi.internal.schema.Types;
+import org.apache.hudi.internal.schema.utils.SchemaChangeUtils;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class TableChanges {

Review comment:
       This class contains a mix of helper methods and derived classes of 
TableChange. Let's rename this class to TableSchemaHelper. 
   
   There are a bunch of helper classes scattered around. We will need to 
rethink their alignment to see if we can improve readability. I will focus on 
it in the next pass.

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/internal/schema/action/MergeSchemaAction.java
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.internal.schema.action;
+
+import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.internal.schema.Type;
+import org.apache.hudi.internal.schema.Types;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * auxiliary class.
+ * help to merge file schema and query schema to produce final read schema for 
avro/parquet file
+ */
+public class MergeSchemaAction {

Review comment:
       nit: Rename to SchemaMerger ?

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##########
@@ -1068,4 +1076,150 @@ public void close() {
     this.heartbeatClient.stop();
     this.txnManager.close();
   }
+
+  /**
+   * add columns to table.
+   *
+   * @param colName col name to be added. if we want to add col to a nested 
filed, the fullName should be specify
+   * @param schema col type to be added.
+   * @param doc col doc to be added.
+   * @param position col position to be added
+   * @param positionType col position change type. now support three change 
types: first/after/before
+   */
+  public void addColumns(String colName, Schema schema, String doc, String 
position, String positionType) {
+    Pair<InternalSchema, HoodieTableMetaClient> pair = 
getInternalSchemaAndMetaClient();
+    TableChanges.ColumnAddChange add = 
TableChanges.ColumnAddChange.get(pair.getLeft());
+    String parentName = TableChanges.getParentName(colName);
+    add.addColumns(parentName, colName, AvroSchemaUtil.convertToField(schema), 
doc);
+    if (position != null && !position.isEmpty() && positionType != null && 
!position.isEmpty()) {

Review comment:
       Also, can you please add some comments on the if blocks as there are 
quite a few of them. 

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##########
@@ -213,6 +224,27 @@ protected void commit(HoodieTable table, String 
commitActionType, String instant
                       List<HoodieWriteStat> stats) throws IOException {
     LOG.info("Committing " + instantTime + " action " + commitActionType);
     HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
+    // do save internal schema to support Implicitly add columns in write 
process

Review comment:
       We need to introduce a config to enable/disable schema evolution. Only 
if the config is enabled should we let any side effects of schema evolution 
take effect.

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##########
@@ -213,6 +224,27 @@ protected void commit(HoodieTable table, String 
commitActionType, String instant
                       List<HoodieWriteStat> stats) throws IOException {
     LOG.info("Committing " + instantTime + " action " + commitActionType);
     HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
+    // do save internal schema to support Implicitly add columns in write 
process

Review comment:
       Similar logic also needs to be present in BaseCommitExecutor.commit. 

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##########
@@ -1068,4 +1100,150 @@ public void close() {
     this.heartbeatClient.stop();
     this.txnManager.close();
   }
+
+  /**
+   * add columns to table.
+   *
+   * @param colName col name to be added. if we want to add col to a nested 
filed, the fullName should be specify
+   * @param schema col type to be added.
+   * @param doc col doc to be added.
+   * @param position col position to be added
+   * @param positionType col position change type. now support three change 
types: first/after/before
+   */
+  public void addColumn(String colName, Schema schema, String doc, String 
position, String positionType) {
+    Pair<InternalSchema, HoodieTableMetaClient> pair = 
getInternalSchemaAndMetaClient();
+    TableChanges.ColumnAddChange add = 
TableChanges.ColumnAddChange.get(pair.getLeft());
+    String parentName = TableChanges.getParentName(colName);
+    add.addColumns(parentName, colName, AvroSchemaUtil.convertToField(schema), 
doc);
+    if (position != null && !position.isEmpty() && positionType != null && 
!position.isEmpty()) {
+      String referParentName = TableChanges.getParentName(position);
+      if (!parentName.equals(referParentName)) {
+        throw new IllegalArgumentException("cannot reorder two columns which 
has different parent");
+      }
+      add.addPositionChange(colName, position, positionType);
+    } else if (positionType != null && positionType.equals("first")) {
+      add.addPositionChange(colName, "", "first");
+    }
+    InternalSchema newSchema = 
SchemaChangeUtils.applyTableChanges2Schema(pair.getLeft(), add);
+    commitTableChange(newSchema, pair.getRight());
+  }
+
+  public void addColumn(String colName, Schema schema) {
+    addColumn(colName, schema, null, null, null);
+  }
+
+  /**
+   * delete columns to table.
+   *
+   * @param colName col name to be deleted. if we want to delete col from a 
nested filed, the fullName should be specify
+   */
+  public void deleteColumns(List<String> colName) {
+    Pair<InternalSchema, HoodieTableMetaClient> pair = 
getInternalSchemaAndMetaClient();
+    TableChanges.ColumnDeleteChange delete = 
TableChanges.ColumnDeleteChange.get(pair.getLeft());
+    colName.stream().forEach(col -> delete.deleteColumn(col));
+    InternalSchema newSchema = 
SchemaChangeUtils.applyTableChanges2Schema(pair.getLeft(), delete);
+    commitTableChange(newSchema, pair.getRight());
+  }
+
+  /**
+   * rename col name for hudi table.
+   *
+   * @param colName col name to be renamed. if we want to rename col from a 
nested filed, the fullName should be specify
+   * @param newName new name for current col. no need to specify fullName.
+   */
+  public void renameColumn(String colName, String newName) {
+    Pair<InternalSchema, HoodieTableMetaClient> pair = 
getInternalSchemaAndMetaClient();
+    TableChanges.ColumnUpdateChange updateChange = 
TableChanges.ColumnUpdateChange.get(pair.getLeft());
+    updateChange.renameColumn(colName, newName);
+    InternalSchema newSchema = 
SchemaChangeUtils.applyTableChanges2Schema(pair.getLeft(), updateChange);
+    commitTableChange(newSchema, pair.getRight());
+  }
+
+  /**
+   * update col nullable attribute for hudi table.
+   *
+   * @param colName col name to be changed. if we want to change col from a 
nested filed, the fullName should be specify
+   * @param nullable .
+   */
+  public void updateColumnNullability(String colName, boolean nullable) {
+    Pair<InternalSchema, HoodieTableMetaClient> pair = 
getInternalSchemaAndMetaClient();
+    TableChanges.ColumnUpdateChange updateChange = 
TableChanges.ColumnUpdateChange.get(pair.getLeft());
+    updateChange.updateColumnNullability(colName, nullable);
+    InternalSchema newSchema = 
SchemaChangeUtils.applyTableChanges2Schema(pair.getLeft(), updateChange);
+    commitTableChange(newSchema, pair.getRight());
+  }
+
+  /**
+   * update col comment for hudi table.
+   *
+   * @param colName col name to be changed. if we want to change col from a 
nested filed, the fullName should be specify
+   * @param doc .
+   */
+  public void updateColumnComment(String colName, String doc) {
+    Pair<InternalSchema, HoodieTableMetaClient> pair = 
getInternalSchemaAndMetaClient();
+    TableChanges.ColumnUpdateChange updateChange = 
TableChanges.ColumnUpdateChange.get(pair.getLeft());
+    updateChange.updateColumnComment(colName, doc);
+    InternalSchema newSchema = 
SchemaChangeUtils.applyTableChanges2Schema(pair.getLeft(), updateChange);
+    commitTableChange(newSchema, pair.getRight());
+  }
+
+  /**
+   * reorder the position of col.
+   *
+   * @param colName column which need to be reordered. if we want to change 
col from a nested filed, the fullName should be specify.
+   * @param referColName reference position.
+   * @param orderType col position change type. now support three change 
types: first/after/before
+   */
+  public void reOrderColPosition(String colName, String referColName, String 
orderType) {
+    if (colName == null || orderType == null || referColName == null) {

Review comment:
       For all the schema manipulation APIs in this class, can you move the 
schema-changing logic to a separate helper class? This class can then simply 
delegate to it, get back the schema(s) to be persisted, and commit them.

##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkSchemaUtils.java
##########
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.utils;
+
+import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.internal.schema.Type;
+import org.apache.hudi.internal.schema.Types;
+import org.apache.hudi.internal.schema.utils.InternalSchemaUtils;
+import org.apache.spark.sql.types.ArrayType;
+import org.apache.spark.sql.types.ArrayType$;
+import org.apache.spark.sql.types.BinaryType;
+import org.apache.spark.sql.types.BinaryType$;
+import org.apache.spark.sql.types.BooleanType;
+import org.apache.spark.sql.types.BooleanType$;
+import org.apache.spark.sql.types.ByteType;
+import org.apache.spark.sql.types.CharType;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DateType;
+import org.apache.spark.sql.types.DateType$;
+import org.apache.spark.sql.types.DecimalType;
+import org.apache.spark.sql.types.DecimalType$;
+import org.apache.spark.sql.types.DoubleType;
+import org.apache.spark.sql.types.DoubleType$;
+import org.apache.spark.sql.types.FloatType;
+import org.apache.spark.sql.types.FloatType$;
+import org.apache.spark.sql.types.IntegerType;
+import org.apache.spark.sql.types.IntegerType$;
+import org.apache.spark.sql.types.LongType;
+import org.apache.spark.sql.types.LongType$;
+import org.apache.spark.sql.types.MapType;
+import org.apache.spark.sql.types.MapType$;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.ShortType;
+import org.apache.spark.sql.types.StringType;
+import org.apache.spark.sql.types.StringType$;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.types.StructType$;
+import org.apache.spark.sql.types.TimestampType;
+import org.apache.spark.sql.types.TimestampType$;
+import org.apache.spark.sql.types.UserDefinedType;
+import org.apache.spark.sql.types.VarcharType;
+
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+public class SparkSchemaUtils {
+  private SparkSchemaUtils() {
+
+  }
+
+  public static final String HOODIE_QUERY_SCHEMA = "hadoop.hoodie.querySchema";

Review comment:
       What is the purpose of this config and the one below?

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##########
@@ -1068,4 +1100,150 @@ public void close() {
     this.heartbeatClient.stop();
     this.txnManager.close();
   }
+
+  /**
+   * add columns to table.
+   *
+   * @param colName col name to be added. if we want to add col to a nested 
filed, the fullName should be specify
+   * @param schema col type to be added.
+   * @param doc col doc to be added.
+   * @param position col position to be added
+   * @param positionType col position change type. now support three change 
types: first/after/before
+   */
+  public void addColumn(String colName, Schema schema, String doc, String 
position, String positionType) {
+    Pair<InternalSchema, HoodieTableMetaClient> pair = 
getInternalSchemaAndMetaClient();
+    TableChanges.ColumnAddChange add = 
TableChanges.ColumnAddChange.get(pair.getLeft());

Review comment:
       rename add to addCol

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##########
@@ -1068,4 +1100,150 @@ public void close() {
     this.heartbeatClient.stop();
     this.txnManager.close();
   }
+
+  /**
+   * add columns to table.
+   *
+   * @param colName col name to be added. if we want to add col to a nested 
filed, the fullName should be specify
+   * @param schema col type to be added.
+   * @param doc col doc to be added.
+   * @param position col position to be added
+   * @param positionType col position change type. now support three change 
types: first/after/before

Review comment:
        positionChangeType should be an enum.

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##########
@@ -1068,4 +1100,150 @@ public void close() {
     this.heartbeatClient.stop();
     this.txnManager.close();
   }
+
+  /**
+   * add columns to table.
+   *
+   * @param colName col name to be added. if we want to add col to a nested 
filed, the fullName should be specify
+   * @param schema col type to be added.
+   * @param doc col doc to be added.
+   * @param position col position to be added
+   * @param positionType col position change type. now support three change 
types: first/after/before
+   */
+  public void addColumn(String colName, Schema schema, String doc, String 
position, String positionType) {
+    Pair<InternalSchema, HoodieTableMetaClient> pair = 
getInternalSchemaAndMetaClient();
+    TableChanges.ColumnAddChange add = 
TableChanges.ColumnAddChange.get(pair.getLeft());
+    String parentName = TableChanges.getParentName(colName);
+    add.addColumns(parentName, colName, AvroSchemaUtil.convertToField(schema), 
doc);
+    if (position != null && !position.isEmpty() && positionType != null && 
!position.isEmpty()) {
+      String referParentName = TableChanges.getParentName(position);
+      if (!parentName.equals(referParentName)) {
+        throw new IllegalArgumentException("cannot reorder two columns which 
has different parent");
+      }
+      add.addPositionChange(colName, position, positionType);
+    } else if (positionType != null && positionType.equals("first")) {
+      add.addPositionChange(colName, "", "first");
+    }
+    InternalSchema newSchema = 
SchemaChangeUtils.applyTableChanges2Schema(pair.getLeft(), add);
+    commitTableChange(newSchema, pair.getRight());
+  }
+
+  public void addColumn(String colName, Schema schema) {
+    addColumn(colName, schema, null, null, null);
+  }
+
+  /**
+   * delete columns to table.
+   *
+   * @param colName col name to be deleted. if we want to delete col from a 
nested filed, the fullName should be specify
+   */
+  public void deleteColumns(List<String> colName) {
+    Pair<InternalSchema, HoodieTableMetaClient> pair = 
getInternalSchemaAndMetaClient();
+    TableChanges.ColumnDeleteChange delete = 
TableChanges.ColumnDeleteChange.get(pair.getLeft());
+    colName.stream().forEach(col -> delete.deleteColumn(col));
+    InternalSchema newSchema = 
SchemaChangeUtils.applyTableChanges2Schema(pair.getLeft(), delete);
+    commitTableChange(newSchema, pair.getRight());
+  }
+
+  /**
+   * rename col name for hudi table.
+   *
+   * @param colName col name to be renamed. if we want to rename col from a 
nested filed, the fullName should be specify
+   * @param newName new name for current col. no need to specify fullName.
+   */
+  public void renameColumn(String colName, String newName) {
+    Pair<InternalSchema, HoodieTableMetaClient> pair = 
getInternalSchemaAndMetaClient();
+    TableChanges.ColumnUpdateChange updateChange = 
TableChanges.ColumnUpdateChange.get(pair.getLeft());
+    updateChange.renameColumn(colName, newName);
+    InternalSchema newSchema = 
SchemaChangeUtils.applyTableChanges2Schema(pair.getLeft(), updateChange);
+    commitTableChange(newSchema, pair.getRight());
+  }
+
+  /**
+   * update col nullable attribute for hudi table.
+   *
+   * @param colName col name to be changed. if we want to change col from a 
nested filed, the fullName should be specify
+   * @param nullable .
+   */
+  public void updateColumnNullability(String colName, boolean nullable) {
+    Pair<InternalSchema, HoodieTableMetaClient> pair = 
getInternalSchemaAndMetaClient();
+    TableChanges.ColumnUpdateChange updateChange = 
TableChanges.ColumnUpdateChange.get(pair.getLeft());
+    updateChange.updateColumnNullability(colName, nullable);
+    InternalSchema newSchema = 
SchemaChangeUtils.applyTableChanges2Schema(pair.getLeft(), updateChange);

Review comment:
       nit: rename applyTableChanges2Schema to simply apply?

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##########
@@ -213,6 +224,27 @@ protected void commit(HoodieTable table, String 
commitActionType, String instant
                       List<HoodieWriteStat> stats) throws IOException {
     LOG.info("Committing " + instantTime + " action " + commitActionType);
     HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
+    // do save internal schema to support Implicitly add columns in write 
process
+    if 
(!metadata.getExtraMetadata().containsKey(InternalSchemaParser.LATESTSCHEMA) && 
metadata.getExtraMetadata().containsKey(SCHEMA_KEY)) {
+      TableSchemaResolver schemaUtils = new 
TableSchemaResolver(table.getMetaClient());
+      String historySchemaStr = 
schemaUtils.getTableHistorySchemaStrFromCommitMetadata().orElse("");
+      if (!historySchemaStr.isEmpty()) {

Review comment:
       High level: Instead of storing historical schemas in the commit file, 
it's better to store them as separate files in a new directory under .hoodie. 
Something like .hoodie/schema/...
   

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/util/TableInternalSchemaUtils.java
##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.internal.schema.utils.InternalSchemaParser;
+
+import java.util.TreeMap;
+
+/**
+ * helper class.
+ * use static cache to cache historySchema info to prevent fetching from hdfs 
every time.
+ * all task belong to the same executor/container will share this cache.
+ *
+ */
+public class TableInternalSchemaUtils {
+  private static final Cache<String, TreeMap<Long, InternalSchema>>

Review comment:
       A static cache is problematic. Query engines will reuse the same JVM 
instance for running different queries. Let's avoid this pattern. 

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java
##########
@@ -38,7 +38,7 @@
   private HoodieUnMergedLogRecordScanner(FileSystem fs, String basePath, 
List<String> logFilePaths, Schema readerSchema,
                                          String latestInstantTime, boolean 
readBlocksLazily, boolean reverseReader, int bufferSize,
                                          LogRecordScannerCallback callback, 
Option<InstantRange> instantRange) {
-    super(fs, basePath, logFilePaths, readerSchema, latestInstantTime, 
readBlocksLazily, reverseReader, bufferSize, instantRange, false);
+    super(fs, basePath, logFilePaths, readerSchema, latestInstantTime, 
readBlocksLazily, reverseReader, bufferSize, instantRange, false, null);

Review comment:
       Is this a todo ?  Shouldn't we be passing internalSchema ?

##########
File path: 
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
##########
@@ -87,4 +88,9 @@ trait SparkAdapter extends Serializable {
    * Create Like expression.
    */
   def createLike(left: Expression, right: Expression): Expression
+
+  /**
+    * Create parquet file read for hudi.
+    */
+  def createHoodieParquetFileFormat(): ParquetFileFormat

Review comment:
       @xiarixiaoyao : It would be helpful for the review if you could list 
the gaps remaining in the current PR before it reaches full Spark support. 

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/InternalSchemaUtils.java
##########
@@ -0,0 +1,779 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.internal.schema.utils;
+
+import org.apache.avro.JsonProperties;
+import org.apache.avro.LogicalType;
+import org.apache.avro.LogicalTypes;
+import org.apache.avro.Schema;
+import org.apache.hudi.internal.schema.HoodieSchemaException;
+import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.internal.schema.Type;
+import org.apache.hudi.internal.schema.Types;
+import org.apache.hudi.internal.schema.Types.Field;
+import org.apache.hudi.internal.schema.Types.RecordType;
+import org.apache.hudi.internal.schema.action.MergeSchemaAction;
+
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+public class InternalSchemaUtils {
+
+  private InternalSchemaUtils() {
+  }
+
+  /**
+   * build a mapping from id to full field name for a internal Type.
+   * if a field y belong to a struct filed x, then the full name of y is x.y
+   *
+   * @param type hoodie internal type
+   * @return a mapping from id to full field name
+   */
+  public static Map<Integer, String> buildIdToName(Type type) {
+    Map<Integer, String> result = new HashMap<>();
+    buildNameToId(type).forEach((k, v) -> result.put(v, k));
+    return result;
+  }
+
+  /**
+   * build a mapping from full field name to id for a internal Type.
+   * if a field y belong to a struct filed x, then the full name of y is x.y
+   *
+   * @param type hoodie internal type
+   * @return a mapping from full field name to id
+   */
+  public static Map<String, Integer> buildNameToId(Type type) {
+    Deque<String> fieldNames = new LinkedList<>();
+    Map<String, Integer> nameToId = new HashMap<>();
+    visitNameToId(type, fieldNames, nameToId);
+    return nameToId;
+  }
+
+  /**
+   * build a mapping from id to field for a internal Type.
+   *
+   * @param type hoodie internal type
+   * @return a mapping from id to field
+   */
+  public static Map<Integer, Field> buildIdToField(Type type) {
+    Map<Integer, Field> idToField = new HashMap<>();
+    visitIdToField(type, idToField);
+    return idToField;
+  }
+
+  private static void visitIdToField(Type type, Map<Integer, Field> index) {
+    switch (type.typeId()) {
+      case RECORD:
+        RecordType record = (RecordType) type;
+        for (Field field : record.fields()) {
+          visitIdToField(field.type(), index);
+          index.put(field.fieldId(), field);
+        }
+        return;
+      case ARRAY:
+        Types.ArrayType array = (Types.ArrayType) type;
+        visitIdToField(array.elementType(), index);
+        for (Field field : array.fields()) {
+          index.put(field.fieldId(), field);
+        }
+        return;
+      case MAP:
+        Types.MapType map = (Types.MapType) type;
+        visitIdToField(map.keyType(), index);
+        visitIdToField(map.valueType(), index);
+        for (Field field : map.fields()) {
+          index.put(field.fieldId(), field);
+        }
+        return;
+      default:
+        return;
+    }
+  }
+
+  private static void visitNameToId(Type type, Deque<String> fieldNames, 
Map<String, Integer> nameToId) {

Review comment:
       Consider using the Visitor design pattern with classes instead of 
static methods.

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/internal/schema/action/TableChange.java
##########
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.internal.schema.action;
+
+import org.apache.hudi.internal.schema.HoodieSchemaException;
+import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.internal.schema.utils.InternalSchemaUtils;
+import org.apache.hudi.internal.schema.Types;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Locale;
+import java.util.Map;
+
+/**
+ * TableChange subclasses represent requested changes to a table.
+ * now only column changes support.
+ * to do support partition changes
+ */
+public interface TableChange {
+  /* The action Type of schema change. */
+  enum ColumnChangeID {
+    ADD, UPDATE, DELETE, PROPERTY_CHANGE;
+    private String name;
+
+    private ColumnChangeID() {
+      this.name = this.name().toLowerCase(Locale.ROOT);
+    }
+
+    public String getName() {
+      return name;
+    }
+  }
+
+  static ColumnChangeID fromValue(String value) {
+    switch (value.toLowerCase(Locale.ROOT)) {
+      case "add":
+        return ColumnChangeID.ADD;
+      case "change":
+        return ColumnChangeID.UPDATE;
+      case "delete":
+        return ColumnChangeID.DELETE;
+      case "property":
+        return ColumnChangeID.PROPERTY_CHANGE;
+      default:
+        throw new IllegalArgumentException("Invalid value of Type.");
+    }
+  }
+
+  ColumnChangeID columnChangeId();
+
+  default boolean withPositionChange() {
+    return false;
+  }
+
+  abstract class BaseColumnChange implements TableChange {
+    protected final InternalSchema internalSchema;
+    protected final Map<Integer, Integer> id2parent;
+    protected final Map<Integer, ArrayList<ColumnPositionChange>> 
positionChangeMap = new HashMap<>();
+
+    BaseColumnChange(InternalSchema schema) {
+      this.internalSchema = schema;
+      this.id2parent = InternalSchemaUtils.index2Parents(schema.getRecord());
+    }
+
+    /**
+     * add position change.
+     *
+     * @param srcName column which need to be reordered
+     * @param dsrName reference position
+     * @param orderType change types
+     * @return this
+     */
+    public BaseColumnChange addPositionChange(String srcName, String dsrName, 
String orderType) {
+      Integer srcId = findIdByFullName(srcName);
+      Integer dsrId = findIdByFullName(dsrName);
+      Integer srcParentId = id2parent.get(srcId);
+      Integer dsrParentId = id2parent.get(dsrId);
+      int parentId;
+      if (srcParentId != null && dsrParentId != null && 
srcParentId.equals(dsrParentId)) {
+        Types.Field parentField = internalSchema.findField(srcParentId);
+        if (!(parentField.type() instanceof Types.RecordType)) {
+          throw new HoodieSchemaException(String.format("only support reorder 
fields in struct type, but find: %s", parentField.type()));
+        }
+        parentId = parentField.fieldId();
+      } else if (srcParentId == null &&  dsrParentId == null) {
+        parentId = -1;
+      } else if (srcParentId != null && dsrParentId == null && 
orderType.equals("first")) {
+        parentId = srcParentId;
+      } else {
+        throw new HoodieSchemaException("cannot order position from different 
parent");
+      }
+
+      ArrayList<ColumnPositionChange> changes = 
positionChangeMap.getOrDefault(parentId, new ArrayList<>());
+      changes.add(ColumnPositionChange.get(srcId, dsrId, orderType));
+      positionChangeMap.put(parentId, changes);
+      return this;
+    }
+
+    /**
+     * abstract method.
+     * give a column fullName and return the field id
+     *
+     * @param fullName column fullName
+     * @return field id of current column
+     */
+    public abstract Integer findIdByFullName(String fullName);

Review comment:
       Should this be protected? Judging from the description and name, this 
method should be part of InternalSchema itself rather than part of ColumnChange.

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMergedLogRecordScanner.java
##########
@@ -45,7 +45,7 @@ private HoodieMetadataMergedLogRecordScanner(FileSystem fs, 
String basePath, Lis
                                               String spillableMapBasePath, 
Set<String> mergeKeyFilter,
                                                
ExternalSpillableMap.DiskMapType diskMapType, boolean 
isBitCaskDiskMapCompressionEnabled) {
     super(fs, basePath, logFilePaths, readerSchema, latestInstantTime, 
maxMemorySizeInBytes, false, false, bufferSize,
-        spillableMapBasePath, Option.empty(), false, diskMapType, 
isBitCaskDiskMapCompressionEnabled, false);
+        spillableMapBasePath, Option.empty(), false, diskMapType, 
isBitCaskDiskMapCompressionEnabled, false, null);

Review comment:
       Same comment as the one in HoodieUnMergedLogRecordScanner

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/internal/schema/action/MergeSchemaAction.java
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.internal.schema.action;
+
+import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.internal.schema.Type;
+import org.apache.hudi.internal.schema.Types;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * auxiliary class.
+ * help to merge file schema and query schema to produce final read schema for 
avro/parquet file
+ */
+public class MergeSchemaAction {
+  private final InternalSchema fileSchema;
+  private final InternalSchema querySchema;
+  private final Boolean mergeRequiredFiledForce;
+  public MergeSchemaAction(InternalSchema fileSchema, InternalSchema 
querySchema, Boolean mergeRequiredFiledForce) {
+    this.fileSchema = fileSchema;
+    this.querySchema = querySchema;
+    this.mergeRequiredFiledForce = mergeRequiredFiledForce;
+  }
+
+  public List<Types.Field> buildNewFields(List<Types.Field> oldFields, 
List<Type> newTypes) {

Review comment:
       Looks like this is called for Record types; can you rename the methods 
to convey their full meaning? It would be great if you could also look at the 
naming across the code changes to make sure it is consistent. 

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/InternalSchemaParser.java
##########
@@ -0,0 +1,542 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.internal.schema.utils;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.util.TokenBuffer;
+
+import org.apache.avro.JsonProperties;
+import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.internal.schema.HoodieSchemaException;
+import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.internal.schema.Type;
+import org.apache.hudi.internal.schema.Types;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class InternalSchemaParser {
+  private InternalSchemaParser() {
+
+  }
+
+  public static final String LATESTSCHEMA = "latestSchema";
+  public static final String SCHEMAS = "schemas";
+  private static final String MAX_COLUMN_ID = "max_column_id";
+  private static final String VERSION_ID = "version-id";
+  private static final String TYPE = "type";
+  private static final String RECORD = "record";
+  private static final String ARRAY = "array";
+  private static final String MAP = "map";
+  private static final String FIELDS = "fields";
+  private static final String ELEMENT = "element";
+  private static final String KEY = "key";
+  private static final String VALUE = "value";
+  private static final String DOC = "doc";
+  private static final String NAME = "name";
+  private static final String ID = "id";
+  private static final String ELEMENT_ID = "element-id";
+  private static final String KEY_ID = "key-id";
+  private static final String VALUE_ID = "value-id";
+  private static final String OPTIONAL = "optional";
+  private static final String ELEMENT_OPTIONAL = "element_optional";
+  private static final String VALUE_OPTIONAL = "value_optional";
+
+  private static final Pattern FIXED = Pattern.compile("fixed\\[(\\d+)\\]");
+  private static final Pattern DECIMAL = 
Pattern.compile("decimal\\((\\d+),\\s+(\\d+)\\)");
+
+  public static JsonNode toJsonNode(Object datum) {

Review comment:
       Add docs to all public APIs

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/internal/schema/action/TableChange.java
##########
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.internal.schema.action;
+
+import org.apache.hudi.internal.schema.HoodieSchemaException;
+import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.internal.schema.utils.InternalSchemaUtils;
+import org.apache.hudi.internal.schema.Types;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Locale;
+import java.util.Map;
+
+/**
+ * TableChange subclasses represent requested changes to a table.
+ * now only column changes support.
+ * to do support partition changes
+ */
+public interface TableChange {
+  /* The action Type of schema change. */
+  enum ColumnChangeID {
+    ADD, UPDATE, DELETE, PROPERTY_CHANGE;
+    private String name;
+
+    private ColumnChangeID() {
+      this.name = this.name().toLowerCase(Locale.ROOT);
+    }
+
+    public String getName() {
+      return name;
+    }
+  }
+
+  static ColumnChangeID fromValue(String value) {
+    switch (value.toLowerCase(Locale.ROOT)) {
+      case "add":
+        return ColumnChangeID.ADD;
+      case "change":
+        return ColumnChangeID.UPDATE;
+      case "delete":
+        return ColumnChangeID.DELETE;
+      case "property":
+        return ColumnChangeID.PROPERTY_CHANGE;
+      default:
+        throw new IllegalArgumentException("Invalid value of Type.");
+    }
+  }
+
+  ColumnChangeID columnChangeId();
+
+  default boolean withPositionChange() {
+    return false;
+  }
+
+  abstract class BaseColumnChange implements TableChange {
+    protected final InternalSchema internalSchema;
+    protected final Map<Integer, Integer> id2parent;
+    protected final Map<Integer, ArrayList<ColumnPositionChange>> 
positionChangeMap = new HashMap<>();
+
+    BaseColumnChange(InternalSchema schema) {
+      this.internalSchema = schema;
+      this.id2parent = InternalSchemaUtils.index2Parents(schema.getRecord());
+    }
+
+    /**
+     * add position change.
+     *
+     * @param srcName column which need to be reordered
+     * @param dsrName reference position
+     * @param orderType change types
+     * @return this
+     */
+    public BaseColumnChange addPositionChange(String srcName, String dsrName, 
String orderType) {
+      Integer srcId = findIdByFullName(srcName);
+      Integer dsrId = findIdByFullName(dsrName);
+      Integer srcParentId = id2parent.get(srcId);
+      Integer dsrParentId = id2parent.get(dsrId);
+      int parentId;
+      if (srcParentId != null && dsrParentId != null && 
srcParentId.equals(dsrParentId)) {
+        Types.Field parentField = internalSchema.findField(srcParentId);
+        if (!(parentField.type() instanceof Types.RecordType)) {
+          throw new HoodieSchemaException(String.format("only support reorder 
fields in struct type, but find: %s", parentField.type()));
+        }
+        parentId = parentField.fieldId();
+      } else if (srcParentId == null &&  dsrParentId == null) {
+        parentId = -1;
+      } else if (srcParentId != null && dsrParentId == null && 
orderType.equals("first")) {
+        parentId = srcParentId;
+      } else {
+        throw new HoodieSchemaException("cannot order position from different 
parent");
+      }
+
+      ArrayList<ColumnPositionChange> changes = 
positionChangeMap.getOrDefault(parentId, new ArrayList<>());
+      changes.add(ColumnPositionChange.get(srcId, dsrId, orderType));
+      positionChangeMap.put(parentId, changes);
+      return this;
+    }
+
+    /**
+     * abstract method.
+     * give a column fullName and return the field id
+     *
+     * @param fullName column fullName
+     * @return field id of current column
+     */
+    public abstract Integer findIdByFullName(String fullName);
+
+    @Override
+    public boolean withPositionChange() {
+      return false;
+    }
+  }
+
+  /**
+   * Column position change.
+   * now support three change types: FIRST/AFTER/BEFORE
+   * FIRST means the specified column should be the first column.
+   * AFTER means the specified column should be put after the given column.
+   * BEFORE means the specified column should be put before the given column.
+   * Note that, the specified column may be a nested field:
+   * AFTER/BEFORE means the given columns should in the same struct;
+   * FIRST means this field should be the first one within the struct.
+   */
+  class ColumnPositionChange {
+    public enum ColumnPositionType {

Review comment:
       these enums must be reused in AbstractHoodieWriteClient

##########
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
##########
@@ -52,26 +54,33 @@ case class HoodieMergeOnReadTableState(tableStructSchema: 
StructType,
                                        requiredAvroSchema: String,
                                        hoodieRealtimeFileSplits: 
List[HoodieMergeOnReadFileSplit],
                                        preCombineField: Option[String],
-                                       recordKeyFieldOpt: Option[String])
+                                       recordKeyFieldOpt: Option[String],
+                                       internalSchema: Option[InternalSchema] 
= None,
+                                       requiredInternalSchema: 
Option[InternalSchema] = None)
 
 class MergeOnReadSnapshotRelation(val sqlContext: SQLContext,
                                   val optParams: Map[String, String],
                                   val userSchema: StructType,
                                   val globPaths: Option[Seq[Path]],
                                   val metaClient: HoodieTableMetaClient)
-  extends BaseRelation with PrunedFilteredScan with Logging {
+  extends BaseRelation with PrunedFilteredScan with Logging with 
SparkAdapterSupport {
 
   private val conf = sqlContext.sparkContext.hadoopConfiguration
   private val jobConf = new JobConf(conf)
   // use schema from latest metadata, if not present, read schema from the 
data file
   private val schemaUtil = new TableSchemaResolver(metaClient)
-  private lazy val tableAvroSchema = {
+  private lazy val (tableAvroSchema, internalSchema) = {
     try {
-      schemaUtil.getTableAvroSchema
+      val internalSchemaOpt = 
schemaUtil.getTableInternalSchemaFromCommitMetadata
+      if (internalSchemaOpt.isPresent) {

Review comment:
       nit: You can simplify with something like 
   schemaUtil.getTableAvroSchema, internalSchemaOpt.orElse(null)

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/AvroSchemaUtil.java
##########
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.internal.schema.utils;
+
+import static org.apache.avro.Schema.Type.UNION;
+
+import org.apache.avro.JsonProperties;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.internal.schema.HoodieSchemaException;
+import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.internal.schema.Type;
+import org.apache.hudi.internal.schema.Types;
+import org.apache.hudi.internal.schema.action.TableChanges;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+public class AvroSchemaUtil {

Review comment:
       For the many conversions we are doing (Avro, InternalSchema, Spark, 
Parquet, ...), instead of implementing these methods in helper/utils classes, 
let's rename the classes to "xxxxConverter" and make sure only the conversion 
functions are defined in them.

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/internal/schema/action/MergeSchemaAction.java
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.internal.schema.action;
+
+import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.internal.schema.Type;
+import org.apache.hudi.internal.schema.Types;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * auxiliary class.
+ * help to merge file schema and query schema to produce final read schema for 
avro/parquet file
+ */
+public class MergeSchemaAction {
+  private final InternalSchema fileSchema;
+  private final InternalSchema querySchema;
+  private final Boolean mergeRequiredFiledForce;

Review comment:
       Typo (`Filed` -> `Field`?). Can you add a comment describing when this 
flag is turned on?

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/internal/schema/action/TableChange.java
##########
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.internal.schema.action;
+
+import org.apache.hudi.internal.schema.HoodieSchemaException;
+import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.internal.schema.utils.InternalSchemaUtils;
+import org.apache.hudi.internal.schema.Types;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Locale;
+import java.util.Map;
+
+/**
+ * TableChange subclasses represent requested changes to a table.
+ * now only column changes support.
+ * to do support partition changes
+ */
+public interface TableChange {
+  /* The action Type of schema change. */
+  enum ColumnChangeID {
+    ADD, UPDATE, DELETE, PROPERTY_CHANGE;
+    private String name;
+
+    private ColumnChangeID() {
+      this.name = this.name().toLowerCase(Locale.ROOT);
+    }
+
+    public String getName() {
+      return name;
+    }
+  }
+
+  static ColumnChangeID fromValue(String value) {
+    switch (value.toLowerCase(Locale.ROOT)) {
+      case "add":
+        return ColumnChangeID.ADD;
+      case "change":
+        return ColumnChangeID.UPDATE;
+      case "delete":
+        return ColumnChangeID.DELETE;
+      case "property":
+        return ColumnChangeID.PROPERTY_CHANGE;
+      default:
+        throw new IllegalArgumentException("Invalid value of Type.");
+    }
+  }
+
+  ColumnChangeID columnChangeId();
+
+  default boolean withPositionChange() {
+    return false;
+  }
+
+  abstract class BaseColumnChange implements TableChange {
+    protected final InternalSchema internalSchema;
+    protected final Map<Integer, Integer> id2parent;
+    protected final Map<Integer, ArrayList<ColumnPositionChange>> 
positionChangeMap = new HashMap<>();
+
+    BaseColumnChange(InternalSchema schema) {
+      this.internalSchema = schema;
+      this.id2parent = InternalSchemaUtils.index2Parents(schema.getRecord());
+    }
+
+    /**
+     * add position change.
+     *
+     * @param srcName column which need to be reordered
+     * @param dsrName reference position
+     * @param orderType change types
+     * @return this
+     */
+    public BaseColumnChange addPositionChange(String srcName, String dsrName, 
String orderType) {
+      Integer srcId = findIdByFullName(srcName);
+      Integer dsrId = findIdByFullName(dsrName);
+      Integer srcParentId = id2parent.get(srcId);
+      Integer dsrParentId = id2parent.get(dsrId);
+      int parentId;
+      if (srcParentId != null && dsrParentId != null && 
srcParentId.equals(dsrParentId)) {
+        Types.Field parentField = internalSchema.findField(srcParentId);
+        if (!(parentField.type() instanceof Types.RecordType)) {
+          throw new HoodieSchemaException(String.format("only support reorder 
fields in struct type, but find: %s", parentField.type()));
+        }
+        parentId = parentField.fieldId();
+      } else if (srcParentId == null &&  dsrParentId == null) {
+        parentId = -1;
+      } else if (srcParentId != null && dsrParentId == null && 
orderType.equals("first")) {
+        parentId = srcParentId;
+      } else {
+        throw new HoodieSchemaException("cannot order position from different 
parent");
+      }
+
+      ArrayList<ColumnPositionChange> changes = 
positionChangeMap.getOrDefault(parentId, new ArrayList<>());
+      changes.add(ColumnPositionChange.get(srcId, dsrId, orderType));
+      positionChangeMap.put(parentId, changes);
+      return this;
+    }
+
+    /**
+     * abstract method.
+     * give a column fullName and return the field id
+     *
+     * @param fullName column fullName
+     * @return field id of current column
+     */
+    public abstract Integer findIdByFullName(String fullName);
+
+    @Override
+    public boolean withPositionChange() {
+      return false;
+    }
+  }
+
+  /**
+   * Column position change.
+   * now support three change types: FIRST/AFTER/BEFORE
+   * FIRST means the specified column should be the first column.
+   * AFTER means the specified column should be put after the given column.
+   * BEFORE means the specified column should be put before the given column.
+   * Note that, the specified column may be a nested field:
+   * AFTER/BEFORE means the given columns should in the same struct;
+   * FIRST means this field should be the first one within the struct.
+   */
+  class ColumnPositionChange {
+    public enum ColumnPositionType {
+      FIRST,
+      BEFORE,
+      AFTER
+    }
+
+    private final int srcId;
+    private final int dsrId;
+    private final ColumnPositionType type;
+
+    static ColumnPositionChange first(int srcId) {
+      return new ColumnPositionChange(srcId, -1, ColumnPositionType.FIRST);
+    }
+
+    static ColumnPositionChange before(int srcId, int dsrId) {
+      return new ColumnPositionChange(srcId, dsrId, ColumnPositionType.BEFORE);
+    }
+
+    static ColumnPositionChange after(int srcId, int dsrId) {
+      return new ColumnPositionChange(srcId, dsrId, ColumnPositionType.AFTER);
+    }
+
+    static ColumnPositionChange get(int srcId, int dsrId, String type) {
+      switch (type) {

Review comment:
       Can we use the enum type here ?

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##########
@@ -1068,4 +1076,150 @@ public void close() {
     this.heartbeatClient.stop();
     this.txnManager.close();
   }
+
+  /**
+   * add columns to table.
+   *
+   * @param colName col name to be added. if we want to add col to a nested 
filed, the fullName should be specify
+   * @param schema col type to be added.
+   * @param doc col doc to be added.
+   * @param position col position to be added
+   * @param positionType col position change type. now support three change 
types: first/after/before
+   */
+  public void addColumns(String colName, Schema schema, String doc, String 
position, String positionType) {
+    Pair<InternalSchema, HoodieTableMetaClient> pair = 
getInternalSchemaAndMetaClient();
+    TableChanges.ColumnAddChange add = 
TableChanges.ColumnAddChange.get(pair.getLeft());
+    String parentName = TableChanges.getParentName(colName);
+    add.addColumns(parentName, colName, AvroSchemaUtil.convertToField(schema), 
doc);
+    if (position != null && !position.isEmpty() && positionType != null && 
!position.isEmpty()) {

Review comment:
       !position.isEmpty() still appears twice in this condition (the second 
check was presumably meant to be !positionType.isEmpty()). Can you take a look?

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/InternalSchemaParser.java
##########
@@ -0,0 +1,542 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.internal.schema.utils;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.util.TokenBuffer;
+
+import org.apache.avro.JsonProperties;
+import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.internal.schema.HoodieSchemaException;
+import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.internal.schema.Type;
+import org.apache.hudi.internal.schema.Types;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class InternalSchemaParser {
+  private InternalSchemaParser() {
+
+  }
+
+  public static final String LATESTSCHEMA = "latestSchema";
+  public static final String SCHEMAS = "schemas";
+  private static final String MAX_COLUMN_ID = "max_column_id";
+  private static final String VERSION_ID = "version-id";
+  private static final String TYPE = "type";
+  private static final String RECORD = "record";
+  private static final String ARRAY = "array";
+  private static final String MAP = "map";
+  private static final String FIELDS = "fields";
+  private static final String ELEMENT = "element";
+  private static final String KEY = "key";
+  private static final String VALUE = "value";
+  private static final String DOC = "doc";
+  private static final String NAME = "name";
+  private static final String ID = "id";
+  private static final String ELEMENT_ID = "element-id";
+  private static final String KEY_ID = "key-id";
+  private static final String VALUE_ID = "value-id";
+  private static final String OPTIONAL = "optional";
+  private static final String ELEMENT_OPTIONAL = "element_optional";
+  private static final String VALUE_OPTIONAL = "value_optional";
+
+  private static final Pattern FIXED = Pattern.compile("fixed\\[(\\d+)\\]");
+  private static final Pattern DECIMAL = 
Pattern.compile("decimal\\((\\d+),\\s+(\\d+)\\)");
+
+  public static JsonNode toJsonNode(Object datum) {

Review comment:
       Looks like this class performs both serialization and deserialization. 
If that is the case, let's move those methods to a SerDeHelper class.

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java
##########
@@ -311,6 +319,12 @@ private boolean isNewInstantBlock(HoodieLogBlock logBlock) 
{
   private void processDataBlock(HoodieDataBlock dataBlock) throws Exception {
     // TODO (NA) - Implement getRecordItr() in HoodieAvroDataBlock and use 
that here
     List<IndexedRecord> recs = dataBlock.getRecords();
+    if (internalSchema != null) {
+      Long currentTime = 
Long.parseLong(dataBlock.getLogBlockHeader().get(INSTANT_TIME));
+      InternalSchema fileSchema = 
TableInternalSchemaUtils.searchSchemaAndCache(currentTime, path, fs.getConf());
+      Schema readSchema = 
AvroSchemaUtil.convert(InternalSchemaUtils.mergeSchema(fileSchema, 
internalSchema), readerSchema != null ? readerSchema.getName() : "tableName");
+      recs = recs.stream().map(rec -> 
AvroSchemaUtil.rewriteRecord((GenericRecord) rec, 
readSchema)).collect(Collectors.toList());

Review comment:
       We need to rewrite records only when there is some delta in the schema. 
Can we make sure that is done?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to