bvaradar commented on a change in pull request #4910:
URL: https://github.com/apache/hudi/pull/4910#discussion_r832097237



##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java
##########
@@ -147,17 +159,22 @@ private RecordIterator(Schema readerSchema, Schema 
writerSchema, byte[] content)
       int version = this.dis.readInt();
       HoodieAvroDataBlockVersion logBlockVersion = new 
HoodieAvroDataBlockVersion(version);
 
-      this.reader = new GenericDatumReader<>(writerSchema, readerSchema);
+      Schema finalReadSchema = readerSchema;
+      if (!internalSchema.isEmptySchema()) {
+        finalReadSchema = writerSchema;

Review comment:
       Can you add a descriptive comment mentioning why we set to writeSchema 
in this case ?

##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkInternalSchemaConverter.java
##########
@@ -0,0 +1,500 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.utils;
+
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.internal.schema.Type;
+import org.apache.hudi.internal.schema.Types;
+import org.apache.hudi.internal.schema.action.InternalSchemaMerger;
+import org.apache.hudi.internal.schema.utils.InternalSchemaUtils;
+import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
+import org.apache.spark.sql.types.ArrayType;
+import org.apache.spark.sql.types.ArrayType$;
+import org.apache.spark.sql.types.BinaryType;
+import org.apache.spark.sql.types.BinaryType$;
+import org.apache.spark.sql.types.BooleanType;
+import org.apache.spark.sql.types.BooleanType$;
+import org.apache.spark.sql.types.ByteType;
+import org.apache.spark.sql.types.CharType;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DateType;
+import org.apache.spark.sql.types.DateType$;
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.sql.types.DecimalType;
+import org.apache.spark.sql.types.DecimalType$;
+import org.apache.spark.sql.types.DoubleType;
+import org.apache.spark.sql.types.DoubleType$;
+import org.apache.spark.sql.types.FloatType;
+import org.apache.spark.sql.types.FloatType$;
+import org.apache.spark.sql.types.IntegerType;
+import org.apache.spark.sql.types.IntegerType$;
+import org.apache.spark.sql.types.LongType;
+import org.apache.spark.sql.types.LongType$;
+import org.apache.spark.sql.types.MapType;
+import org.apache.spark.sql.types.MapType$;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.ShortType;
+import org.apache.spark.sql.types.StringType;
+import org.apache.spark.sql.types.StringType$;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.types.StructType$;
+import org.apache.spark.sql.types.TimestampType;
+import org.apache.spark.sql.types.TimestampType$;
+import org.apache.spark.sql.types.UserDefinedType;
+import org.apache.spark.sql.types.VarcharType;
+
+import java.nio.charset.StandardCharsets;
+import java.sql.Date;
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+public class SparkInternalSchemaConverter {
+  private SparkInternalSchemaConverter() {
+
+  }
+
+  public static final String HOODIE_QUERY_SCHEMA = 
"hoodie.schema.internal.querySchema";
+  public static final String HOODIE_TABLE_PATH = "hoodie.tablePath";
+  /**
+   * Converts a spark schema to an hudi internal schema. Fields without IDs 
are kept and assigned fallback IDs.
+   *
+   * @param sparkSchema a spark schema
+   * @return a matching internal schema for the provided spark schema
+   */
+  public static InternalSchema convertStructTypeToInternalSchema(StructType 
sparkSchema) {
+    Type newType = buildTypeFromStructType(sparkSchema, true, new 
AtomicInteger(0));
+    return new InternalSchema(((Types.RecordType)newType).fields());
+  }
+
+  public static Type buildTypeFromStructType(DataType sparkType, Boolean 
firstVisitRoot, AtomicInteger nextId) {
+    if (sparkType instanceof StructType) {
+      StructField[] fields = ((StructType) sparkType).fields();
+      int nextAssignId = firstVisitRoot ? 0 : nextId.get();
+      nextId.set(nextAssignId + fields.length);
+      List<Type> newTypes = new ArrayList<>();
+      for (StructField f : fields) {
+        newTypes.add(buildTypeFromStructType(f.dataType(), false, nextId));
+      }
+      List<Types.Field> newFields = new ArrayList<>();
+      for (int i = 0; i < newTypes.size(); i++) {
+        StructField f = fields[i];
+        newFields.add(Types.Field.get(nextAssignId + i, f.nullable(), 
f.name(), newTypes.get(i),
+            f.getComment().isDefined() ? f.getComment().get() : null));
+      }
+      return Types.RecordType.get(newFields);
+    } else if (sparkType instanceof MapType) {
+      MapType map = (MapType) sparkType;
+      DataType keyType = map.keyType();
+      DataType valueType = map.valueType();
+      int keyId = nextId.get();
+      int valueId = keyId + 1;
+      nextId.set(valueId + 1);
+      return Types.MapType.get(keyId, valueId, 
buildTypeFromStructType(keyType, false, nextId),
+          buildTypeFromStructType(valueType, false, nextId), 
map.valueContainsNull());
+    } else if (sparkType instanceof ArrayType) {
+      ArrayType array = (ArrayType) sparkType;
+      DataType et = array.elementType();
+      int elementId = nextId.get();
+      nextId.set(elementId + 1);
+      return Types.ArrayType.get(elementId, array.containsNull(), 
buildTypeFromStructType(et, false, nextId));
+    } else if (sparkType instanceof UserDefinedType) {
+      throw new UnsupportedOperationException("User-defined types are not 
supported");
+    } else if (sparkType instanceof BooleanType) {
+      return Types.BooleanType.get();
+    } else if (sparkType instanceof IntegerType
+        || sparkType instanceof ShortType
+        || sparkType instanceof ByteType) {
+      return Types.IntType.get();
+    } else if (sparkType instanceof LongType) {
+      return Types.LongType.get();
+    } else if (sparkType instanceof FloatType) {
+      return Types.FloatType.get();
+    } else if (sparkType instanceof DoubleType) {
+      return Types.DoubleType.get();
+    } else if (sparkType instanceof StringType
+        || sparkType instanceof CharType
+        || sparkType instanceof VarcharType) {
+      return Types.StringType.get();
+    } else if (sparkType instanceof DateType) {
+      return Types.DateType.get();
+      // spark 3.3.0 support TimeStampNTZ, to do support spark3.3.0
+    } else if (sparkType instanceof TimestampType) {
+      return Types.TimestampType.get();
+    } else if (sparkType instanceof DecimalType) {
+      return Types.DecimalType.get(
+          ((DecimalType) sparkType).precision(),
+          ((DecimalType) sparkType).scale());
+    } else if (sparkType instanceof BinaryType) {
+      return Types.BinaryType.get();
+    } else {
+      throw new UnsupportedOperationException(String.format("Not a supported 
type: %s",  sparkType.catalogString()));
+    }
+  }
+
+  /**
+   * Converts a spark schema to an hudi internal schema, and prunes fields.

Review comment:
       nit: Change doc to "Converts Spark schema to Hudi internal schema, and 
prune fields."

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/internal/schema/io/AbstractInternalSchemaStorageManager.java
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.internal.schema.io;
+
+import org.apache.hudi.common.util.Option;
+
+abstract class AbstractInternalSchemaStorageManager {
+
+  /**
+   * persist history schema str.
+   */
+  public abstract void persistHistorySchemaStr(String instantTime, String 
historySchemaStr);
+
+  /**
+   * get latest history schema string.
+   */
+  public abstract String getHistorySchemaStr();
+
+  /**
+   * Bulk Insert a batch of new records into Hoodie table at the supplied 
instantTime.

Review comment:
       The Javadoc is wrong. Need to fix this.

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/internal/schema/io/FileBasedInternalSchemaStorageManager.java
##########
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.internal.schema.io;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.FileIOUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.internal.schema.utils.InternalSchemaUtils;
+import org.apache.hudi.internal.schema.utils.SerDeHelper;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.List;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.SAVE_SCHEMA_ACTION;
+
+public class FileBasedInternalSchemaStorageManager extends 
AbstractInternalSchemaStorageManager {
+  private static final Logger LOG = 
LogManager.getLogger(FileBasedInternalSchemaStorageManager.class);
+
+  public static final String SCHEMA_NAME = ".schema";
+  private final Path baseSchemaPath;
+  private Configuration conf;
+  private HoodieTableMetaClient metaClient;
+
+  public FileBasedInternalSchemaStorageManager(Configuration conf, Path 
baseTablePath) {
+    Path metaPath = new Path(baseTablePath, ".hoodie");
+    this.baseSchemaPath = new Path(metaPath, SCHEMA_NAME);
+    this.conf = conf;
+    this.metaClient = 
HoodieTableMetaClient.builder().setBasePath(metaPath.getParent().toString()).setConf(conf).build();
+  }
+
+  public FileBasedInternalSchemaStorageManager(HoodieTableMetaClient 
metaClient) {
+    Path metaPath = new Path(metaClient.getBasePath(), ".hoodie");
+    this.baseSchemaPath = new Path(metaPath, SCHEMA_NAME);
+    this.conf = metaClient.getHadoopConf();
+    this.metaClient = metaClient;
+  }
+
+  @Override
+  public void persistHistorySchemaStr(String instantTime, String 
historySchemaStr) {
+    cleanResidualFiles();
+    HoodieActiveTimeline timeline = metaClient.getActiveTimeline();
+    HoodieInstant hoodieInstant = new 
HoodieInstant(HoodieInstant.State.REQUESTED, SAVE_SCHEMA_ACTION, instantTime);
+    timeline.createNewInstant(hoodieInstant);
+    byte[] writeContent = historySchemaStr.getBytes(StandardCharsets.UTF_8);
+    timeline.transitionRequestedToInflight(hoodieInstant, Option.empty());
+    timeline.saveAsComplete(new HoodieInstant(HoodieInstant.State.INFLIGHT, 
hoodieInstant.getAction(), hoodieInstant.getTimestamp()), 
Option.of(writeContent));
+    LOG.info(String.format("persist history schema success on commit time: 
%s", instantTime));
+  }
+
+  private void cleanResidualFiles() {
+    List<String> validateCommits = getValidInstants();
+    try {
+      FileSystem fs = baseSchemaPath.getFileSystem(conf);
+      if (fs.exists(baseSchemaPath)) {
+        List<String> candidateSchemaFiles = 
Arrays.stream(fs.listStatus(baseSchemaPath)).filter(f -> f.isFile())
+            .map(file -> 
file.getPath().getName()).collect(Collectors.toList());
+        List<String> residualSchemaFiles = 
candidateSchemaFiles.stream().filter(f -> 
!validateCommits.contains(f.split("\\.")[0])).collect(Collectors.toList());
+        // clean residual files
+        residualSchemaFiles.forEach(f -> {
+          try {
+            fs.delete(new Path(metaClient.getSchemaFolderName(), f));
+          } catch (IOException o) {
+            throw new HoodieException(o);
+          }
+        });
+      }
+    } catch (IOException e) {
+      throw new HoodieException(e);
+    }
+  }
+
+  public void cleanOldFiles(List<String> validateCommits) {
+    try {
+      FileSystem fs = baseSchemaPath.getFileSystem(conf);
+      if (fs.exists(baseSchemaPath)) {
+        List<String> candidateSchemaFiles = 
Arrays.stream(fs.listStatus(baseSchemaPath)).filter(f -> f.isFile())
+            .map(file -> 
file.getPath().getName()).collect(Collectors.toList());
+        List<String> validateSchemaFiles = 
candidateSchemaFiles.stream().filter(f -> 
validateCommits.contains(f.split("\\.")[0])).collect(Collectors.toList());
+        for (int i = 0; i < validateSchemaFiles.size(); i++) {
+          fs.delete(new Path(validateSchemaFiles.get(i)));
+        }
+      }
+    } catch (IOException e) {
+      throw new HoodieException(e);
+    }
+  }
+
+  private List<String> getValidInstants() {
+    metaClient.reloadActiveTimeline();
+    return metaClient.getCommitsTimeline()
+        .filterCompletedInstants().getInstants().map(f -> 
f.getTimestamp()).collect(Collectors.toList());
+  }
+
+  @Override
+  public String getHistorySchemaStr() {
+    List<String> validateCommits = getValidInstants();

Review comment:
       Rename validateCommits to validCommits everywhere

##########
File path: 
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieDataSourceHelper.scala
##########
@@ -46,7 +49,7 @@ object HoodieDataSourceHelper extends PredicateHelper {
                                options: Map[String, String],
                                hadoopConf: Configuration): PartitionedFile => 
Iterator[InternalRow] = {
 
-    val readParquetFile: PartitionedFile => Iterator[Any] = new 
ParquetFileFormat().buildReaderWithPartitionValues(
+    val readParquetFile: PartitionedFile => Iterator[Any] = 
sparkAdapter.createHoodieParquetFileFormat().get.buildReaderWithPartitionValues(

Review comment:
       Do we need to adapt ParquetFileFormat based on Spark versions ?

##########
File path: 
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableAddColumnsCommand.scala
##########
@@ -110,15 +110,15 @@ object AlterHoodieTableAddColumnsCommand {
       
HoodieWriterUtils.parametersWithWriteDefaults(hoodieCatalogTable.catalogProperties).asJava
     )
 
-    val commitActionType = 
CommitUtils.getCommitActionType(WriteOperationType.INSERT, 
hoodieCatalogTable.tableType)
+    val commitActionType = 
CommitUtils.getCommitActionType(WriteOperationType.ALTER_SCHEMA, 
hoodieCatalogTable.tableType)

Review comment:
       This is changing the existing behavior but aligns with other changes. We 
should document this in release notes as commit metadata will show the 
operation type to be ALTER_SCHEMA 

##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkInternalSchemaConverter.java
##########
@@ -0,0 +1,500 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.utils;
+
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.internal.schema.Type;
+import org.apache.hudi.internal.schema.Types;
+import org.apache.hudi.internal.schema.action.InternalSchemaMerger;
+import org.apache.hudi.internal.schema.utils.InternalSchemaUtils;
+import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
+import org.apache.spark.sql.types.ArrayType;
+import org.apache.spark.sql.types.ArrayType$;
+import org.apache.spark.sql.types.BinaryType;
+import org.apache.spark.sql.types.BinaryType$;
+import org.apache.spark.sql.types.BooleanType;
+import org.apache.spark.sql.types.BooleanType$;
+import org.apache.spark.sql.types.ByteType;
+import org.apache.spark.sql.types.CharType;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DateType;
+import org.apache.spark.sql.types.DateType$;
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.sql.types.DecimalType;
+import org.apache.spark.sql.types.DecimalType$;
+import org.apache.spark.sql.types.DoubleType;
+import org.apache.spark.sql.types.DoubleType$;
+import org.apache.spark.sql.types.FloatType;
+import org.apache.spark.sql.types.FloatType$;
+import org.apache.spark.sql.types.IntegerType;
+import org.apache.spark.sql.types.IntegerType$;
+import org.apache.spark.sql.types.LongType;
+import org.apache.spark.sql.types.LongType$;
+import org.apache.spark.sql.types.MapType;
+import org.apache.spark.sql.types.MapType$;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.ShortType;
+import org.apache.spark.sql.types.StringType;
+import org.apache.spark.sql.types.StringType$;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.types.StructType$;
+import org.apache.spark.sql.types.TimestampType;
+import org.apache.spark.sql.types.TimestampType$;
+import org.apache.spark.sql.types.UserDefinedType;
+import org.apache.spark.sql.types.VarcharType;
+
+import java.nio.charset.StandardCharsets;
+import java.sql.Date;
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+public class SparkInternalSchemaConverter {
+  private SparkInternalSchemaConverter() {
+
+  }
+
+  public static final String HOODIE_QUERY_SCHEMA = 
"hoodie.schema.internal.querySchema";
+  public static final String HOODIE_TABLE_PATH = "hoodie.tablePath";
+  /**
+   * Converts a spark schema to an hudi internal schema. Fields without IDs 
are kept and assigned fallback IDs.
+   *
+   * @param sparkSchema a spark schema
+   * @return a matching internal schema for the provided spark schema
+   */
+  public static InternalSchema convertStructTypeToInternalSchema(StructType 
sparkSchema) {
+    Type newType = buildTypeFromStructType(sparkSchema, true, new 
AtomicInteger(0));
+    return new InternalSchema(((Types.RecordType)newType).fields());
+  }
+
+  public static Type buildTypeFromStructType(DataType sparkType, Boolean 
firstVisitRoot, AtomicInteger nextId) {
+    if (sparkType instanceof StructType) {
+      StructField[] fields = ((StructType) sparkType).fields();
+      int nextAssignId = firstVisitRoot ? 0 : nextId.get();
+      nextId.set(nextAssignId + fields.length);
+      List<Type> newTypes = new ArrayList<>();
+      for (StructField f : fields) {
+        newTypes.add(buildTypeFromStructType(f.dataType(), false, nextId));
+      }
+      List<Types.Field> newFields = new ArrayList<>();
+      for (int i = 0; i < newTypes.size(); i++) {
+        StructField f = fields[i];
+        newFields.add(Types.Field.get(nextAssignId + i, f.nullable(), 
f.name(), newTypes.get(i),
+            f.getComment().isDefined() ? f.getComment().get() : null));
+      }
+      return Types.RecordType.get(newFields);
+    } else if (sparkType instanceof MapType) {
+      MapType map = (MapType) sparkType;
+      DataType keyType = map.keyType();
+      DataType valueType = map.valueType();
+      int keyId = nextId.get();
+      int valueId = keyId + 1;
+      nextId.set(valueId + 1);
+      return Types.MapType.get(keyId, valueId, 
buildTypeFromStructType(keyType, false, nextId),
+          buildTypeFromStructType(valueType, false, nextId), 
map.valueContainsNull());
+    } else if (sparkType instanceof ArrayType) {
+      ArrayType array = (ArrayType) sparkType;
+      DataType et = array.elementType();
+      int elementId = nextId.get();
+      nextId.set(elementId + 1);
+      return Types.ArrayType.get(elementId, array.containsNull(), 
buildTypeFromStructType(et, false, nextId));
+    } else if (sparkType instanceof UserDefinedType) {
+      throw new UnsupportedOperationException("User-defined types are not 
supported");
+    } else if (sparkType instanceof BooleanType) {
+      return Types.BooleanType.get();
+    } else if (sparkType instanceof IntegerType
+        || sparkType instanceof ShortType
+        || sparkType instanceof ByteType) {
+      return Types.IntType.get();
+    } else if (sparkType instanceof LongType) {
+      return Types.LongType.get();
+    } else if (sparkType instanceof FloatType) {
+      return Types.FloatType.get();
+    } else if (sparkType instanceof DoubleType) {
+      return Types.DoubleType.get();
+    } else if (sparkType instanceof StringType
+        || sparkType instanceof CharType
+        || sparkType instanceof VarcharType) {
+      return Types.StringType.get();
+    } else if (sparkType instanceof DateType) {
+      return Types.DateType.get();
+      // spark 3.3.0 support TimeStampNTZ, to do support spark3.3.0
+    } else if (sparkType instanceof TimestampType) {
+      return Types.TimestampType.get();
+    } else if (sparkType instanceof DecimalType) {
+      return Types.DecimalType.get(
+          ((DecimalType) sparkType).precision(),
+          ((DecimalType) sparkType).scale());
+    } else if (sparkType instanceof BinaryType) {
+      return Types.BinaryType.get();
+    } else {
+      throw new UnsupportedOperationException(String.format("Not a supported 
type: %s",  sparkType.catalogString()));
+    }
+  }
+
+  /**
+   * Converts a spark schema to an hudi internal schema, and prunes fields.
+   * Fields without IDs are kept and assigned fallback IDs.
+   *
+   * @param sparkSchema a pruned spark schema
+   * @param originSchema a internal schema for hoodie table
+   * @return a pruned internal schema for the provided spark schema
+   */
+  public static InternalSchema 
convertAndPruneStructTypeToInternalSchema(StructType sparkSchema, 
InternalSchema originSchema) {
+    List<String> pruneNames = collectColNamesFromSparkStruct(sparkSchema);
+    return InternalSchemaUtils.pruneInternalSchema(originSchema, pruneNames);
+  }
+
+  /**
+   * collect all the leaf nodes names.
+   *
+   * @param sparkSchema a spark schema
+   * @return leaf nodes full names.
+   */
+  public static List<String> collectColNamesFromSparkStruct(StructType 
sparkSchema) {
+    List<String> result =  new ArrayList<>();
+    collectColNamesFromStructType(sparkSchema, new LinkedList<>(), result);
+    return result;
+  }
+
+  private static void collectColNamesFromStructType(DataType sparkType, 
Deque<String> fieldNames, List<String> resultSet) {
+    if (sparkType instanceof StructType) {
+      StructField[] fields = ((StructType) sparkType).fields();
+      for (StructField f : fields) {
+        fieldNames.push(f.name());
+        collectColNamesFromStructType(f.dataType(), fieldNames, resultSet);
+        fieldNames.pop();
+        addFullName(f.dataType(), f.name(), fieldNames, resultSet);
+      }
+    } else if (sparkType instanceof MapType) {
+      MapType map = (MapType) sparkType;
+      DataType keyType = map.keyType();
+      DataType valueType = map.valueType();
+      // key
+      fieldNames.push("key");
+      collectColNamesFromStructType(keyType, fieldNames, resultSet);
+      fieldNames.pop();
+      addFullName(keyType,"key", fieldNames, resultSet);
+      // value
+      fieldNames.push("value");
+      collectColNamesFromStructType(valueType, fieldNames, resultSet);
+      fieldNames.poll();
+      addFullName(valueType,"value", fieldNames, resultSet);
+    } else if (sparkType instanceof ArrayType) {
+      ArrayType array = (ArrayType) sparkType;
+      DataType et = array.elementType();
+      fieldNames.push("element");
+      collectColNamesFromStructType(et, fieldNames, resultSet);
+      fieldNames.pop();
+      addFullName(et, "element", fieldNames, resultSet);
+    } else if (sparkType instanceof UserDefinedType) {
+      throw new UnsupportedOperationException("User-defined types are not 
supported");
+    } else {
+      // do nothings
+    }
+  }
+
+  private static void addFullName(DataType sparkType, String name, 
Deque<String> fieldNames, List<String> resultSet) {
+    if (!(sparkType instanceof StructType) && !(sparkType instanceof 
ArrayType) && !(sparkType instanceof MapType)) {
+      resultSet.add(InternalSchemaUtils.createFullName(name, fieldNames));
+    }
+  }
+
+  public static StructType mergeSchema(InternalSchema fileSchema, 
InternalSchema querySchema) {
+    InternalSchema schema = new InternalSchemaMerger(fileSchema, querySchema, 
true, true).mergeSchema();
+    return constructSparkSchemaFromInternalSchema(schema);
+  }
+
+  public static Map<Integer, Pair<DataType, DataType>> 
collectTypeChangedCols(InternalSchema schema, InternalSchema other) {
+    return InternalSchemaUtils
+        .collectTypeChangedCols(schema, other)
+        .entrySet()
+        .stream()
+        .collect(Collectors.toMap(e -> e.getKey(), e -> 
Pair.of(constructSparkSchemaFromType(e.getValue().getLeft()), 
constructSparkSchemaFromType(e.getValue().getRight()))));
+  }
+
+  public static StructType 
constructSparkSchemaFromInternalSchema(InternalSchema schema) {
+    return (StructType) constructSparkSchemaFromType(schema.getRecord());
+  }
+
+  private static DataType constructSparkSchemaFromType(Type type) {
+    switch (type.typeId()) {
+      case RECORD:
+        Types.RecordType record = (Types.RecordType) type;
+        List<Types.Field> fields = record.fields();
+        List<StructField> structFields = new ArrayList<>();
+        for (Types.Field f : fields) {
+          DataType dataType = constructSparkSchemaFromType(f.type());
+          StructField structField = StructField.apply(f.name(), dataType, 
f.isOptional(), Metadata.empty());
+          structField = f.doc() == null ? structField : 
structField.withComment(f.doc());
+          structFields.add(structField);
+        }
+        return StructType$.MODULE$.apply(structFields);
+      case ARRAY:
+        Types.ArrayType array = (Types.ArrayType) type;
+        DataType elementType = 
constructSparkSchemaFromType(array.elementType());
+        return ArrayType$.MODULE$.apply(elementType, 
array.isElementOptional());
+      case MAP:
+        Types.MapType map = (Types.MapType) type;
+        DataType keyDataType = constructSparkSchemaFromType(map.keyType());
+        DataType valueDataType = constructSparkSchemaFromType(map.valueType());
+        return MapType$.MODULE$.apply(keyDataType, valueDataType, 
map.isValueOptional());
+      case BOOLEAN:
+        return BooleanType$.MODULE$;
+      case INT:
+        return IntegerType$.MODULE$;
+      case LONG:
+        return LongType$.MODULE$;
+      case FLOAT:
+        return FloatType$.MODULE$;
+      case DOUBLE:
+        return DoubleType$.MODULE$;
+      case DATE:
+        return DateType$.MODULE$;
+      case TIME:
+        throw new UnsupportedOperationException(String.format("cannot convert 
%s type to Spark", type));
+      case TIMESTAMP:
+        // todo support TimeStampNTZ
+        return TimestampType$.MODULE$;
+      case STRING:
+        return StringType$.MODULE$;
+      case UUID:
+        return StringType$.MODULE$;
+      case FIXED:
+        return BinaryType$.MODULE$;
+      case BINARY:
+        return BinaryType$.MODULE$;
+      case DECIMAL:
+        Types.DecimalType decimal = (Types.DecimalType) type;
+        return DecimalType$.MODULE$.apply(decimal.precision(), 
decimal.scale());
+      default:
+        throw new UnsupportedOperationException(String.format("cannot convert 
unknown type: %s to Spark", type));
+    }
+  }
+
+  /**
+   * Convert Int/long type to other Type.
+   * Now only support int/long -> long/float/double/string
+   * TODO: support more types
+   */
+  private static boolean convertIntLongType(WritableColumnVector oldV, 
WritableColumnVector newV, DataType newType, int len) {
+    boolean isInt = oldV.dataType() instanceof IntegerType;
+    if (newType instanceof LongType || newType instanceof FloatType
+        || newType instanceof DoubleType || newType instanceof StringType || 
newType instanceof DecimalType) {
+      for (int i = 0; i < len; i++) {
+        if (oldV.isNullAt(i)) {
+          newV.putNull(i);
+          continue;
+        }
+        // int/long -> long/float/double/string/decimal
+        if (newType instanceof LongType) {
+          newV.putLong(i, isInt ? oldV.getInt(i) : oldV.getLong(i));
+        } else if (newType instanceof FloatType) {
+          newV.putFloat(i, isInt ? oldV.getInt(i) : oldV.getLong(i));
+        } else if (newType instanceof DoubleType) {
+          newV.putDouble(i, isInt ? oldV.getInt(i) : oldV.getLong(i));
+        } else if (newType instanceof StringType) {
+          newV.putByteArray(i, ((isInt ? oldV.getInt(i) : oldV.getLong(i)) + 
"").getBytes(StandardCharsets.UTF_8));
+        } else if (newType instanceof DecimalType) {
+          Decimal oldDecimal = Decimal.apply(isInt ? oldV.getInt(i) : 
oldV.getLong(i));
+          oldDecimal.changePrecision(((DecimalType) newType).precision(), 
((DecimalType) newType).scale());
+          newV.putDecimal(i, oldDecimal, ((DecimalType) newType).precision());
+        }
+      }
+      return true;
+    }
+    return false;
+  }
+
+  /**
+   * Convert float type to other Type.
+   * Now only support float -> double/String
+   * TODO: support more types
+   */
+  private static boolean convertFloatType(WritableColumnVector oldV, 
WritableColumnVector newV, DataType newType, int len) {
+    if (newType instanceof DoubleType || newType instanceof StringType || 
newType instanceof DecimalType) {
+      for (int i = 0; i < len; i++) {
+        if (oldV.isNullAt(i)) {
+          newV.putNull(i);
+          continue;
+        }
+        // float -> double/string/decimal
+        if (newType instanceof DoubleType) {
+          newV.putDouble(i, Double.valueOf(oldV.getFloat(i) + ""));
+        } else if (newType instanceof StringType) {
+          newV.putByteArray(i, (oldV.getFloat(i) + 
"").getBytes(StandardCharsets.UTF_8));
+        } else if (newType instanceof DecimalType) {
+          Decimal oldDecimal = Decimal.apply(oldV.getFloat(i));
+          oldDecimal.changePrecision(((DecimalType) newType).precision(), 
((DecimalType) newType).scale());
+          newV.putDecimal(i, oldDecimal, ((DecimalType) newType).precision());
+        }
+      }
+      return true;
+    }
+    return false;
+  }
+
+  /**
+   * Convert double type to other Type.
+   * Now only support Double -> Decimal/String

Review comment:
       Can you describe what needs to be supported here ? 

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/internal/schema/action/InternalSchemaMerger.java
##########
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.internal.schema.action;
+
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.internal.schema.Type;
+import org.apache.hudi.internal.schema.Types;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * auxiliary class.
+ * help to merge file schema and query schema to produce final read schema for 
avro/parquet file
+ */
+public class InternalSchemaMerger {
+  private final InternalSchema fileSchema;
+  private final InternalSchema querySchema;
+  // now there exist some bugs when we use spark update/merge api,
+  // those operation will change col nullability from optional to required 
which is wrong.
+  // Before that bug is fixed, we need to do adapt.
+  // if mergeRequiredFiledForce is true, we will ignore the col's required 
attribute.
+  private final boolean ignoreRequiredAttribute;
+  // Whether to use column Type from file schema to read files when we find 
some column type has changed.

Review comment:
       When will this case arise ? 

##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkInternalSchemaConverter.java
##########
@@ -0,0 +1,500 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.utils;
+
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.internal.schema.Type;
+import org.apache.hudi.internal.schema.Types;
+import org.apache.hudi.internal.schema.action.InternalSchemaMerger;
+import org.apache.hudi.internal.schema.utils.InternalSchemaUtils;
+import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
+import org.apache.spark.sql.types.ArrayType;
+import org.apache.spark.sql.types.ArrayType$;
+import org.apache.spark.sql.types.BinaryType;
+import org.apache.spark.sql.types.BinaryType$;
+import org.apache.spark.sql.types.BooleanType;
+import org.apache.spark.sql.types.BooleanType$;
+import org.apache.spark.sql.types.ByteType;
+import org.apache.spark.sql.types.CharType;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DateType;
+import org.apache.spark.sql.types.DateType$;
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.sql.types.DecimalType;
+import org.apache.spark.sql.types.DecimalType$;
+import org.apache.spark.sql.types.DoubleType;
+import org.apache.spark.sql.types.DoubleType$;
+import org.apache.spark.sql.types.FloatType;
+import org.apache.spark.sql.types.FloatType$;
+import org.apache.spark.sql.types.IntegerType;
+import org.apache.spark.sql.types.IntegerType$;
+import org.apache.spark.sql.types.LongType;
+import org.apache.spark.sql.types.LongType$;
+import org.apache.spark.sql.types.MapType;
+import org.apache.spark.sql.types.MapType$;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.ShortType;
+import org.apache.spark.sql.types.StringType;
+import org.apache.spark.sql.types.StringType$;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.types.StructType$;
+import org.apache.spark.sql.types.TimestampType;
+import org.apache.spark.sql.types.TimestampType$;
+import org.apache.spark.sql.types.UserDefinedType;
+import org.apache.spark.sql.types.VarcharType;
+
+import java.nio.charset.StandardCharsets;
+import java.sql.Date;
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+public class SparkInternalSchemaConverter {
+  private SparkInternalSchemaConverter() {
+
+  }
+
+  public static final String HOODIE_QUERY_SCHEMA = 
"hoodie.schema.internal.querySchema";
+  public static final String HOODIE_TABLE_PATH = "hoodie.tablePath";
+  /**
+   * Converts a spark schema to an hudi internal schema. Fields without IDs 
are kept and assigned fallback IDs.
+   *
+   * @param sparkSchema a spark schema
+   * @return a matching internal schema for the provided spark schema
+   */
+  public static InternalSchema convertStructTypeToInternalSchema(StructType 
sparkSchema) {
+    Type newType = buildTypeFromStructType(sparkSchema, true, new 
AtomicInteger(0));
+    return new InternalSchema(((Types.RecordType)newType).fields());
+  }
+
+  public static Type buildTypeFromStructType(DataType sparkType, Boolean 
firstVisitRoot, AtomicInteger nextId) {
+    if (sparkType instanceof StructType) {
+      StructField[] fields = ((StructType) sparkType).fields();
+      int nextAssignId = firstVisitRoot ? 0 : nextId.get();
+      nextId.set(nextAssignId + fields.length);
+      List<Type> newTypes = new ArrayList<>();
+      for (StructField f : fields) {
+        newTypes.add(buildTypeFromStructType(f.dataType(), false, nextId));
+      }
+      List<Types.Field> newFields = new ArrayList<>();
+      for (int i = 0; i < newTypes.size(); i++) {
+        StructField f = fields[i];
+        newFields.add(Types.Field.get(nextAssignId + i, f.nullable(), 
f.name(), newTypes.get(i),
+            f.getComment().isDefined() ? f.getComment().get() : null));
+      }
+      return Types.RecordType.get(newFields);
+    } else if (sparkType instanceof MapType) {
+      MapType map = (MapType) sparkType;
+      DataType keyType = map.keyType();
+      DataType valueType = map.valueType();
+      int keyId = nextId.get();
+      int valueId = keyId + 1;
+      nextId.set(valueId + 1);
+      return Types.MapType.get(keyId, valueId, 
buildTypeFromStructType(keyType, false, nextId),
+          buildTypeFromStructType(valueType, false, nextId), 
map.valueContainsNull());
+    } else if (sparkType instanceof ArrayType) {
+      ArrayType array = (ArrayType) sparkType;
+      DataType et = array.elementType();
+      int elementId = nextId.get();
+      nextId.set(elementId + 1);
+      return Types.ArrayType.get(elementId, array.containsNull(), 
buildTypeFromStructType(et, false, nextId));
+    } else if (sparkType instanceof UserDefinedType) {
+      throw new UnsupportedOperationException("User-defined types are not 
supported");
+    } else if (sparkType instanceof BooleanType) {
+      return Types.BooleanType.get();
+    } else if (sparkType instanceof IntegerType
+        || sparkType instanceof ShortType
+        || sparkType instanceof ByteType) {
+      return Types.IntType.get();
+    } else if (sparkType instanceof LongType) {
+      return Types.LongType.get();
+    } else if (sparkType instanceof FloatType) {
+      return Types.FloatType.get();
+    } else if (sparkType instanceof DoubleType) {
+      return Types.DoubleType.get();
+    } else if (sparkType instanceof StringType
+        || sparkType instanceof CharType
+        || sparkType instanceof VarcharType) {
+      return Types.StringType.get();
+    } else if (sparkType instanceof DateType) {
+      return Types.DateType.get();
+      // spark 3.3.0 support TimeStampNTZ, to do support spark3.3.0
+    } else if (sparkType instanceof TimestampType) {
+      return Types.TimestampType.get();
+    } else if (sparkType instanceof DecimalType) {
+      return Types.DecimalType.get(
+          ((DecimalType) sparkType).precision(),
+          ((DecimalType) sparkType).scale());
+    } else if (sparkType instanceof BinaryType) {
+      return Types.BinaryType.get();
+    } else {
+      throw new UnsupportedOperationException(String.format("Not a supported 
type: %s",  sparkType.catalogString()));
+    }
+  }
+
+  /**
+   * Converts a spark schema to an hudi internal schema, and prunes fields.
+   * Fields without IDs are kept and assigned fallback IDs.
+   *
+   * @param sparkSchema a pruned spark schema
+   * @param originSchema a internal schema for hoodie table
+   * @return a pruned internal schema for the provided spark schema
+   */
+  public static InternalSchema 
convertAndPruneStructTypeToInternalSchema(StructType sparkSchema, 
InternalSchema originSchema) {
+    List<String> pruneNames = collectColNamesFromSparkStruct(sparkSchema);
+    return InternalSchemaUtils.pruneInternalSchema(originSchema, pruneNames);
+  }
+
+  /**
+   * collect all the leaf nodes names.

Review comment:
       nit: Can you look at all javadocs in this PR and ensure the first word 
starts with capital letter

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/AvroSchemaEvolutionUtils.java
##########
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.internal.schema.utils;
+
+import org.apache.avro.Schema;
+import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.internal.schema.Types;
+import org.apache.hudi.internal.schema.action.TableChanges;
+import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+/**
+ * Util methods to support evolve old avro schema based on a given schema.
+ */
+public class AvroSchemaEvolutionUtils {
+  private AvroSchemaEvolutionUtils() {
+  }
+
+  /**
+   * support evolution from a new avroSchema.

Review comment:
       support -> Support

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/internal/schema/io/FileBasedInternalSchemaStorageManager.java
##########
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.internal.schema.io;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.FileIOUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.internal.schema.utils.InternalSchemaUtils;
+import org.apache.hudi.internal.schema.utils.SerDeHelper;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.List;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.SAVE_SCHEMA_ACTION;
+
+public class FileBasedInternalSchemaStorageManager extends 
AbstractInternalSchemaStorageManager {
+  private static final Logger LOG = 
LogManager.getLogger(FileBasedInternalSchemaStorageManager.class);
+
+  public static final String SCHEMA_NAME = ".schema";
+  private final Path baseSchemaPath;
+  private Configuration conf;
+  private HoodieTableMetaClient metaClient;
+
+  public FileBasedInternalSchemaStorageManager(Configuration conf, Path 
baseTablePath) {
+    Path metaPath = new Path(baseTablePath, ".hoodie");
+    this.baseSchemaPath = new Path(metaPath, SCHEMA_NAME);
+    this.conf = conf;
+    this.metaClient = 
HoodieTableMetaClient.builder().setBasePath(metaPath.getParent().toString()).setConf(conf).build();
+  }
+
+  public FileBasedInternalSchemaStorageManager(HoodieTableMetaClient 
metaClient) {
+    Path metaPath = new Path(metaClient.getBasePath(), ".hoodie");
+    this.baseSchemaPath = new Path(metaPath, SCHEMA_NAME);
+    this.conf = metaClient.getHadoopConf();
+    this.metaClient = metaClient;
+  }
+
+  @Override
+  public void persistHistorySchemaStr(String instantTime, String 
historySchemaStr) {
+    cleanResidualFiles();
+    HoodieActiveTimeline timeline = metaClient.getActiveTimeline();
+    HoodieInstant hoodieInstant = new 
HoodieInstant(HoodieInstant.State.REQUESTED, SAVE_SCHEMA_ACTION, instantTime);
+    timeline.createNewInstant(hoodieInstant);
+    byte[] writeContent = historySchemaStr.getBytes(StandardCharsets.UTF_8);
+    timeline.transitionRequestedToInflight(hoodieInstant, Option.empty());
+    timeline.saveAsComplete(new HoodieInstant(HoodieInstant.State.INFLIGHT, 
hoodieInstant.getAction(), hoodieInstant.getTimestamp()), 
Option.of(writeContent));
+    LOG.info(String.format("persist history schema success on commit time: 
%s", instantTime));
+  }
+
+  private void cleanResidualFiles() {
+    List<String> validateCommits = getValidInstants();
+    try {
+      FileSystem fs = baseSchemaPath.getFileSystem(conf);
+      if (fs.exists(baseSchemaPath)) {
+        List<String> candidateSchemaFiles = 
Arrays.stream(fs.listStatus(baseSchemaPath)).filter(f -> f.isFile())
+            .map(file -> 
file.getPath().getName()).collect(Collectors.toList());
+        List<String> residualSchemaFiles = 
candidateSchemaFiles.stream().filter(f -> 
!validateCommits.contains(f.split("\\.")[0])).collect(Collectors.toList());
+        // clean residual files
+        residualSchemaFiles.forEach(f -> {
+          try {
+            fs.delete(new Path(metaClient.getSchemaFolderName(), f));
+          } catch (IOException o) {
+            throw new HoodieException(o);
+          }
+        });
+      }
+    } catch (IOException e) {
+      throw new HoodieException(e);
+    }
+  }
+
+  public void cleanOldFiles(List<String> validateCommits) {
+    try {
+      FileSystem fs = baseSchemaPath.getFileSystem(conf);
+      if (fs.exists(baseSchemaPath)) {
+        List<String> candidateSchemaFiles = 
Arrays.stream(fs.listStatus(baseSchemaPath)).filter(f -> f.isFile())
+            .map(file -> 
file.getPath().getName()).collect(Collectors.toList());
+        List<String> validateSchemaFiles = 
candidateSchemaFiles.stream().filter(f -> 
validateCommits.contains(f.split("\\.")[0])).collect(Collectors.toList());
+        for (int i = 0; i < validateSchemaFiles.size(); i++) {
+          fs.delete(new Path(validateSchemaFiles.get(i)));
+        }
+      }
+    } catch (IOException e) {
+      throw new HoodieException(e);
+    }
+  }
+
+  private List<String> getValidInstants() {
+    metaClient.reloadActiveTimeline();
+    return metaClient.getCommitsTimeline()
+        .filterCompletedInstants().getInstants().map(f -> 
f.getTimestamp()).collect(Collectors.toList());
+  }
+
+  @Override
+  public String getHistorySchemaStr() {
+    List<String> validateCommits = getValidInstants();
+    try {
+      if (metaClient.getFs().exists(baseSchemaPath)) {
+        List<String> validateSchemaFiles = 
Arrays.stream(metaClient.getFs().listStatus(baseSchemaPath))

Review comment:
       validateSchemaFiles -> validSchemaFiles

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/internal/schema/io/FileBasedInternalSchemaStorageManager.java
##########
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.internal.schema.io;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.FileIOUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.internal.schema.utils.InternalSchemaUtils;
+import org.apache.hudi.internal.schema.utils.SerDeHelper;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.List;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.SAVE_SCHEMA_ACTION;
+
+public class FileBasedInternalSchemaStorageManager extends 
AbstractInternalSchemaStorageManager {
+  private static final Logger LOG = 
LogManager.getLogger(FileBasedInternalSchemaStorageManager.class);
+
+  public static final String SCHEMA_NAME = ".schema";
+  private final Path baseSchemaPath;
+  private Configuration conf;
+  private HoodieTableMetaClient metaClient;
+
+  public FileBasedInternalSchemaStorageManager(Configuration conf, Path 
baseTablePath) {
+    Path metaPath = new Path(baseTablePath, ".hoodie");
+    this.baseSchemaPath = new Path(metaPath, SCHEMA_NAME);
+    this.conf = conf;
+    this.metaClient = 
HoodieTableMetaClient.builder().setBasePath(metaPath.getParent().toString()).setConf(conf).build();
+  }
+
+  public FileBasedInternalSchemaStorageManager(HoodieTableMetaClient 
metaClient) {
+    Path metaPath = new Path(metaClient.getBasePath(), ".hoodie");
+    this.baseSchemaPath = new Path(metaPath, SCHEMA_NAME);
+    this.conf = metaClient.getHadoopConf();
+    this.metaClient = metaClient;
+  }
+
+  @Override
+  public void persistHistorySchemaStr(String instantTime, String 
historySchemaStr) {
+    cleanResidualFiles();
+    HoodieActiveTimeline timeline = metaClient.getActiveTimeline();
+    HoodieInstant hoodieInstant = new 
HoodieInstant(HoodieInstant.State.REQUESTED, SAVE_SCHEMA_ACTION, instantTime);
+    timeline.createNewInstant(hoodieInstant);
+    byte[] writeContent = historySchemaStr.getBytes(StandardCharsets.UTF_8);
+    timeline.transitionRequestedToInflight(hoodieInstant, Option.empty());
+    timeline.saveAsComplete(new HoodieInstant(HoodieInstant.State.INFLIGHT, 
hoodieInstant.getAction(), hoodieInstant.getTimestamp()), 
Option.of(writeContent));
+    LOG.info(String.format("persist history schema success on commit time: 
%s", instantTime));
+  }
+
+  private void cleanResidualFiles() {
+    List<String> validateCommits = getValidInstants();
+    try {
+      FileSystem fs = baseSchemaPath.getFileSystem(conf);
+      if (fs.exists(baseSchemaPath)) {
+        List<String> candidateSchemaFiles = 
Arrays.stream(fs.listStatus(baseSchemaPath)).filter(f -> f.isFile())
+            .map(file -> 
file.getPath().getName()).collect(Collectors.toList());
+        List<String> residualSchemaFiles = 
candidateSchemaFiles.stream().filter(f -> 
!validateCommits.contains(f.split("\\.")[0])).collect(Collectors.toList());
+        // clean residual files
+        residualSchemaFiles.forEach(f -> {
+          try {
+            fs.delete(new Path(metaClient.getSchemaFolderName(), f));
+          } catch (IOException o) {
+            throw new HoodieException(o);
+          }
+        });
+      }
+    } catch (IOException e) {
+      throw new HoodieException(e);
+    }
+  }
+
+  public void cleanOldFiles(List<String> validateCommits) {
+    try {
+      FileSystem fs = baseSchemaPath.getFileSystem(conf);
+      if (fs.exists(baseSchemaPath)) {
+        List<String> candidateSchemaFiles = 
Arrays.stream(fs.listStatus(baseSchemaPath)).filter(f -> f.isFile())
+            .map(file -> 
file.getPath().getName()).collect(Collectors.toList());
+        List<String> validateSchemaFiles = 
candidateSchemaFiles.stream().filter(f -> 
validateCommits.contains(f.split("\\.")[0])).collect(Collectors.toList());
+        for (int i = 0; i < validateSchemaFiles.size(); i++) {
+          fs.delete(new Path(validateSchemaFiles.get(i)));
+        }
+      }
+    } catch (IOException e) {
+      throw new HoodieException(e);
+    }
+  }
+
+  private List<String> getValidInstants() {
+    metaClient.reloadActiveTimeline();

Review comment:
       This is a costly operation and we are doing this every time we are 
calling getHistorySchemaStr(). Can we avoid this call ? 

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/util/InternalSchemaCache.java
##########
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.internal.schema.InternalSchema;
+import 
org.apache.hudi.internal.schema.io.FileBasedInternalSchemaStorageManager;
+import org.apache.hudi.internal.schema.utils.InternalSchemaUtils;
+import org.apache.hudi.internal.schema.utils.SerDeHelper;
+
+import java.util.List;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+public class InternalSchemaCache {
+  // use segment lock to reduce competition.
+  // the lock size should be powers of 2 for better hash.
+  private static Object[] lockList = new Object[16];
+
+  static {
+    for (int i = 0; i < lockList.length; i++) {
+      lockList[i] = new Object();
+    }
+  }
+
+  // historySchemas cache maintain a map about (tablePath, HistorySchemas).
+  // this is a Global cache, all threads in one container/executor share the 
same cache.
+  private static final Cache<String, TreeMap<Long, InternalSchema>>
+      HISTORICAL_SCHEMA_CACHE = 
Caffeine.newBuilder().maximumSize(1000).weakValues().build();
+
+  /**
+   * search internalSchema based on versionID.
+   * first step: try to get internalSchema from hoodie commit files, we no 
need to add lock.
+   * if we cannot get internalSchema by first step, then we try to get 
internalSchema from cache.
+   *
+   * @param versionID schema version_id need to search
+   * @param tablePath current hoodie table base path
+   * @param hadoopConf hadoopConf
+   * @return internalSchema
+   */
+  public static InternalSchema searchSchemaAndCache(long versionID, String 
tablePath, Configuration hadoopConf) {
+    HoodieTableMetaClient metaClient = 
HoodieTableMetaClient.builder().setBasePath(tablePath).setConf(hadoopConf).build();
+    return searchSchemaAndCache(versionID, metaClient);
+  }
+
+  /**
+   * search internalSchema based on versionID.
+   * first step: try to get internalSchema from hoodie commit files, we no 
need to add lock.
+   * if we cannot get internalSchema by first step, then we try to get 
internalSchema from cache.
+   *
+   * @param versionID schema version_id need to search
+   * @param metaClient current hoodie metaClient
+   * @return internalSchema
+   */
+  public static InternalSchema searchSchemaAndCache(long versionID, 
HoodieTableMetaClient metaClient) {
+    Option<InternalSchema> candidateSchema = searchSchema(versionID, 
metaClient);
+    if (candidateSchema.isPresent()) {
+      return candidateSchema.get();
+    }
+    String tablePath = metaClient.getBasePath();
+    // use segment lock to reduce competition.
+    synchronized (lockList[tablePath.hashCode() & (lockList.length - 1)]) {

Review comment:
       Can you give us as a scenario in this context where segment lock would 
reduce contention. Do we expect this call to concurrently happen ?

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/SerDeHelper.java
##########
@@ -0,0 +1,351 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.internal.schema.utils;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.internal.schema.Type;
+import org.apache.hudi.internal.schema.Types;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Locale;
+import java.util.TreeMap;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class SerDeHelper {
+  private SerDeHelper() {
+
+  }
+
+  public static final String LATEST_SCHEMA = "latestSchema";

Review comment:
       Also change the constant latestSchema to latest_schema to make it 
consistent 

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/SerDeHelper.java
##########
@@ -0,0 +1,351 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.internal.schema.utils;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.internal.schema.Type;
+import org.apache.hudi.internal.schema.Types;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Locale;
+import java.util.TreeMap;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class SerDeHelper {
+  private SerDeHelper() {
+
+  }
+
+  public static final String LATEST_SCHEMA = "latestSchema";
+  public static final String SCHEMAS = "schemas";
+  private static final String MAX_COLUMN_ID = "max_column_id";
+  private static final String VERSION_ID = "version-id";

Review comment:
       In the constants, lets use underscore "_" everywhere instead of mixing 
with "-".

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/AvroSchemaEvolutionUtils.java
##########
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.internal.schema.utils;
+
+import org.apache.avro.Schema;
+import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.internal.schema.Types;
+import org.apache.hudi.internal.schema.action.TableChanges;
+import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+/**
+ * Util methods to support evolve old avro schema based on a given schema.
+ */
+public class AvroSchemaEvolutionUtils {
+  private AvroSchemaEvolutionUtils() {
+  }
+
+  /**
+   * support evolution from a new avroSchema.
+   * notice: this is not a universal method,
+   * now hoodie support implicitly add columns when hoodie write operation,
+   * This ability needs to be preserved, so implicitly evolution for 
internalSchema should supported.
+   *
+   * @param evolvedSchema implicitly evolution of avro when hoodie write 
operation
+   * @param oldSchema old internalSchema
+   * @param supportPositionReorder support position reorder
+   * @return evolution Schema
+   */
+  public static InternalSchema evolveSchemaFromNewAvroSchema(Schema 
evolvedSchema, InternalSchema oldSchema, Boolean supportPositionReorder) {
+    InternalSchema evolvedInternalSchema = 
AvroInternalSchemaConverter.convert(evolvedSchema);
+    // do check, only support add column evolution
+    List<String> colNamesFromEvolved = 
evolvedInternalSchema.getAllColsFullName();
+    List<String> colNamesFromOldSchema = oldSchema.getAllColsFullName();
+    List<String> diffFromOldSchema = colNamesFromOldSchema.stream().filter(f 
-> !colNamesFromEvolved.contains(f)).collect(Collectors.toList());
+    List<Types.Field> newFields = new ArrayList<>();
+    if (colNamesFromEvolved.size() == colNamesFromOldSchema.size() && 
diffFromOldSchema.size() == 0) {
+      // no changes happen
+      if (supportPositionReorder) {
+        evolvedInternalSchema.getRecord().fields().forEach(f -> 
newFields.add(oldSchema.getRecord().field(f.name())));
+        return new InternalSchema(newFields);
+      }
+      return oldSchema;
+    }
+    // try to find all added columns
+    if (diffFromOldSchema.size() != 0) {
+      throw new UnsupportedOperationException("Cannot evolve schema 
implicitly, find delete/rename operation");
+    }
+
+    List<String> diffFromEvolutionSchema = 
colNamesFromEvolved.stream().filter(f -> 
!colNamesFromOldSchema.contains(f)).collect(Collectors.toList());
+    // Remove redundancy from diffFromEvolutionSchema.
+    // for example, now we add a struct col in evolvedSchema, the struct col 
is " user struct<name:string, age:int> "
+    // when we do diff operation: user, user.name, user.age will appeared in 
the resultSet which is redundancy, user.name and user.age should be excluded.
+    // deal with add operation
+    TreeMap<Integer, String> finalAddAction = new TreeMap<>();
+    for (int i = 0; i < diffFromEvolutionSchema.size(); i++)  {
+      String name = diffFromEvolutionSchema.get(i);
+      int splitPoint = name.lastIndexOf(".");
+      String parentName = splitPoint > 0 ? name.substring(0, splitPoint) : "";
+      if (!parentName.isEmpty() && 
diffFromEvolutionSchema.contains(parentName)) {
+        // find redundancy, skip it
+        continue;
+      }
+      finalAddAction.put(evolvedInternalSchema.findIdByName(name), name);
+    }
+
+    TableChanges.ColumnAddChange addChange = 
TableChanges.ColumnAddChange.get(oldSchema);
+    finalAddAction.entrySet().stream().forEach(f -> {
+      String name = f.getValue();
+      int splitPoint = name.lastIndexOf(".");
+      String parentName = splitPoint > 0 ? name.substring(0, splitPoint) : "";
+      String rawName = splitPoint > 0 ? name.substring(splitPoint + 1) : name;
+      addChange.addColumns(parentName, rawName, 
evolvedInternalSchema.findType(name), null);
+    });
+
+    InternalSchema res = SchemaChangeUtils.applyTableChanges2Schema(oldSchema, 
addChange);
+    if (supportPositionReorder) {
+      evolvedInternalSchema.getRecord().fields().forEach(f -> 
newFields.add(oldSchema.getRecord().field(f.name())));
+      return new InternalSchema(newFields);
+    } else {
+      return res;
+    }
+  }
+
+  public static InternalSchema evolveSchemaFromNewAvroSchema(Schema 
evolvedSchema, InternalSchema oldSchema) {
+    return evolveSchemaFromNewAvroSchema(evolvedSchema, oldSchema, false);
+  }
+
+  /**
+   * canonical the nullability.

Review comment:
       canonical -> Canonicalize

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/SchemaChangeUtils.java
##########
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.internal.schema.utils;
+
+import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.internal.schema.Type;
+import org.apache.hudi.internal.schema.Types;
+import org.apache.hudi.internal.schema.action.TableChanges;
+import org.apache.hudi.internal.schema.action.TableChangesHelper;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Helper methods for schema Change.
+ */
+public class SchemaChangeUtils {

Review comment:
       Can we move InternalSchemaChangeApplier to this package and merge the 
methods in this class to the new class. They both seem to be changing internal 
schema.

##########
File path: 
hudi-common/src/test/java/org/apache/hudi/internal/schema/action/TestMergeSchema.java
##########
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.internal.schema.action;
+
+import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.internal.schema.Types;
+
+import org.apache.hudi.internal.schema.utils.SchemaChangeUtils;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+
+public class TestMergeSchema {
+
+  @Test
+  public void testPrimitiveMerge() {
+    Types.RecordType record = Types.RecordType.get(Arrays.asList(new 
Types.Field[] {
+        Types.Field.get(0, "col1", Types.BooleanType.get()),
+        Types.Field.get(1, "col2", Types.IntType.get()),
+        Types.Field.get(2, "col3", Types.LongType.get()),
+        Types.Field.get(3, "col4", Types.FloatType.get())}));
+
+    InternalSchema oldSchema = new InternalSchema(record.fields());
+    // add c1 after 'col1', and c2 before 'col3'
+    TableChanges.ColumnAddChange addChange = 
TableChanges.ColumnAddChange.get(oldSchema);
+    addChange.addColumns("c1", Types.BooleanType.get(), "add c1 after col1");
+    addChange.addPositionChange("c1", "col1", "after");
+    addChange.addColumns("c2", Types.IntType.get(), "add c2 before col3");
+    addChange.addPositionChange("c2", "col3", "before");
+    InternalSchema newAddSchema = 
SchemaChangeUtils.applyTableChanges2Schema(oldSchema, addChange);
+    TableChanges.ColumnDeleteChange deleteChange = 
TableChanges.ColumnDeleteChange.get(newAddSchema);
+    deleteChange.deleteColumn("col1");
+    deleteChange.deleteColumn("col3");
+    InternalSchema newDeleteSchema = 
SchemaChangeUtils.applyTableChanges2Schema(newAddSchema, deleteChange);
+
+    TableChanges.ColumnUpdateChange updateChange = 
TableChanges.ColumnUpdateChange.get(newDeleteSchema);
+    updateChange.updateColumnType("col2", Types.LongType.get())
+        .updateColumnComment("col2", "alter col2 comments")
+        .renameColumn("col2", "colx").addPositionChange("col2",
+        "col4", "after");
+    InternalSchema updateSchema = 
SchemaChangeUtils.applyTableChanges2Schema(newDeleteSchema, updateChange);
+
+    // add col1 again
+    TableChanges.ColumnAddChange addChange1 = 
TableChanges.ColumnAddChange.get(updateSchema);
+    addChange1.addColumns("col1", Types.BooleanType.get(), "add new col1");
+    InternalSchema finalSchema = 
SchemaChangeUtils.applyTableChanges2Schema(updateSchema, addChange1);
+    InternalSchema mergeSchema = new InternalSchemaMerger(oldSchema, 
finalSchema, true, false).mergeSchema();

Review comment:
       Can we assert on the state of mergeSchema ?

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/InternalSchemaUtils.java
##########
@@ -0,0 +1,856 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.internal.schema.utils;
+
+import org.apache.avro.JsonProperties;
+import org.apache.avro.LogicalType;
+import org.apache.avro.LogicalTypes;
+import org.apache.avro.Schema;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.internal.schema.HoodieSchemaException;
+import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.internal.schema.Type;
+import org.apache.hudi.internal.schema.Types;
+import org.apache.hudi.internal.schema.Types.Field;
+import org.apache.hudi.internal.schema.Types.RecordType;
+import org.apache.hudi.internal.schema.action.InternalSchemaMerger;
+import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter;
+import org.apache.hudi.internal.schema.visitor.InternalSchemaVisitor;
+import org.apache.hudi.internal.schema.visitor.NameToIDVisitor;
+
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+public class InternalSchemaUtils {

Review comment:
       The class is still InternalSchemaUtils. Can you check again ?

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/util/InternalSchemaCache.java
##########
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.internal.schema.InternalSchema;
+import 
org.apache.hudi.internal.schema.io.FileBasedInternalSchemaStorageManager;
+import org.apache.hudi.internal.schema.utils.InternalSchemaUtils;
+import org.apache.hudi.internal.schema.utils.SerDeHelper;
+
+import java.util.List;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+public class InternalSchemaCache {
+  // use segment lock to reduce competition.
+  // the lock size should be powers of 2 for better hash.
+  private static Object[] lockList = new Object[16];
+
+  static {
+    for (int i = 0; i < lockList.length; i++) {
+      lockList[i] = new Object();
+    }
+  }
+
+  // historySchemas cache maintain a map about (tablePath, HistorySchemas).
+  // this is a Global cache, all threads in one container/executor share the 
same cache.
+  private static final Cache<String, TreeMap<Long, InternalSchema>>
+      HISTORICAL_SCHEMA_CACHE = 
Caffeine.newBuilder().maximumSize(1000).weakValues().build();
+
+  /**
+   * search internalSchema based on versionID.
+   * first step: try to get internalSchema from hoodie commit files, we no 
need to add lock.
+   * if we cannot get internalSchema by first step, then we try to get 
internalSchema from cache.
+   *
+   * @param versionID schema version_id need to search
+   * @param tablePath current hoodie table base path
+   * @param hadoopConf hadoopConf
+   * @return internalSchema
+   */
+  public static InternalSchema searchSchemaAndCache(long versionID, String 
tablePath, Configuration hadoopConf) {

Review comment:
       Are you referring to reloadActiveTimeline() call in 
    FileBasedInternalSchemaStorageManager.getValidInstants() ? 

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/RunCompactionActionExecutor.java
##########
@@ -70,6 +73,14 @@ public RunCompactionActionExecutor(HoodieEngineContext 
context,
       HoodieCompactionPlan compactionPlan =
           CompactionUtils.getCompactionPlan(table.getMetaClient(), 
instantTime);
 
+      // try to load internalSchema to support schema Evolution
+      Pair<Option<String>, Option<String>> schemaPair = 
TableInternalSchemaUtils
+          
.getInternalSchemaAndAvroSchemaForClusteringAndCompaction(table.getMetaClient(),
 instantTime);
+      if (schemaPair.getLeft().isPresent() && 
schemaPair.getRight().isPresent()) {
+        config.setInternalSchemaString(schemaPair.getLeft().get());

Review comment:
       I still see that we are not cloning the config. Is it fixed elsewhere?

##########
File path: 
hudi-spark-datasource/hudi-spark-common/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
##########
@@ -16,4 +16,5 @@
 # limitations under the License.
 
 
-org.apache.hudi.DefaultSource
\ No newline at end of file
+org.apache.hudi.DefaultSource
+org.apache.spark.sql.execution.datasources.parquet.SparkHoodieParquetFileFormat

Review comment:
       What is the reason for adding this class name ? 

##########
File path: 
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
##########
@@ -107,8 +113,21 @@ class DefaultSource extends RelationProvider
         case (COPY_ON_WRITE, QUERY_TYPE_SNAPSHOT_OPT_VAL, false) |
              (COPY_ON_WRITE, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, false) |
              (MERGE_ON_READ, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, false) =>
-          new BaseFileOnlyRelation(sqlContext, metaClient, parameters, 
userSchema, globPaths)
-
+          val internalSchema = new 
TableSchemaResolver(metaClient).getTableInternalSchemaFromCommitMetadata
+          val sparkSchema = SchemaConverters.toSqlType(new 
TableSchemaResolver(metaClient).getTableAvroSchema).dataType.asInstanceOf[StructType]
+          val newParameters = parameters ++ 
Map(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA -> 
SerDeHelper.toJson(internalSchema.orElse(null)),
+            SparkInternalSchemaConverter.HOODIE_TABLE_PATH -> 
metaClient.getBasePath)
+          if (internalSchema.isPresent) {
+            // Use the HoodieFileIndex only if the 'path' is not globbed.
+            // Or else we use the original way to read hoodie table.

Review comment:
       Why does "*" need to be handled differently ? Can't we just rely on 
refresh table if new partitions got added ?

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/util/InternalSchemaCache.java
##########
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.internal.schema.InternalSchema;
+import 
org.apache.hudi.internal.schema.io.FileBasedInternalSchemaStorageManager;
+import org.apache.hudi.internal.schema.utils.InternalSchemaUtils;
+import org.apache.hudi.internal.schema.utils.SerDeHelper;
+
+import java.util.List;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+public class InternalSchemaCache {
+  // use segment lock to reduce competition.
+  // the lock size should be powers of 2 for better hash.
+  private static Object[] lockList = new Object[16];
+
+  static {
+    for (int i = 0; i < lockList.length; i++) {
+      lockList[i] = new Object();
+    }
+  }
+
+  // historySchemas cache maintain a map about (tablePath, HistorySchemas).
+  // this is a Global cache, all threads in one container/executor share the 
same cache.
+  private static final Cache<String, TreeMap<Long, InternalSchema>>
+      HISTORICAL_SCHEMA_CACHE = 
Caffeine.newBuilder().maximumSize(1000).weakValues().build();
+
+  /**
+   * search internalSchema based on versionID.
+   * first step: try to get internalSchema from hoodie commit files, we no 
need to add lock.
+   * if we cannot get internalSchema by first step, then we try to get 
internalSchema from cache.
+   *
+   * @param versionID schema version_id need to search
+   * @param tablePath current hoodie table base path
+   * @param hadoopConf hadoopConf
+   * @return internalSchema
+   */
+  public static InternalSchema searchSchemaAndCache(long versionID, String 
tablePath, Configuration hadoopConf) {

Review comment:
       The caller in AbstractHoodieLogRecordReader and IncrementalRelation can 
both be changed to pass HoodieTableMetaClient as they have instance of 
HoodieTableMetaClient created.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to