This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 192c3247fd57 feat(schema): Migrate log reader and partitioners to take HoodieSchema (#17548)
192c3247fd57 is described below

commit 192c3247fd5783fe58c2241d0fc18e9d3e6b2f03
Author: Tim Brown <[email protected]>
AuthorDate: Sat Dec 13 00:36:21 2025 -0500

    feat(schema): Migrate log reader and partitioners to take HoodieSchema (#17548)
---
 .../hudi/cli/commands/ArchivedCommitsCommand.java  |   5 +-
 .../apache/hudi/cli/commands/ExportCommand.java    |   3 +-
 .../hudi/cli/commands/HoodieLogFileCommand.java    |   4 +-
 .../cli/commands/TestHoodieLogFileCommand.java     |   2 +-
 .../utils/LegacyArchivedMetaEntryReader.java       |   3 +-
 .../hudi/client/TestJavaHoodieBackedMetadata.java  |   4 +-
 .../MultipleSparkJobExecutionStrategy.java         |  10 +-
 .../SparkSingleFileSortExecutionStrategy.java      |   5 +-
 .../SparkSortAndSizeExecutionStrategy.java         |   4 +-
 .../RDDCustomColumnsSortPartitioner.java           |  13 +-
 .../bulkinsert/RDDSpatialCurveSortPartitioner.java |  15 ++-
 .../functional/TestHoodieBackedTableMetadata.java  |   2 +-
 .../table/log/AbstractHoodieLogRecordScanner.java  |   7 +-
 .../table/log/BaseHoodieLogRecordReader.java       |   4 +-
 .../table/log/HoodieCDCLogRecordIterator.java      |   6 +-
 .../hudi/common/table/log/HoodieLogFileReader.java |  21 ++--
 .../hudi/common/table/log/HoodieLogFormat.java     |   6 +-
 .../common/table/log/HoodieLogFormatReader.java    |   6 +-
 .../table/log/HoodieLogFormatReverseReader.java    | 135 ---------------------
 .../table/log/HoodieMergedLogRecordScanner.java    |  18 ++-
 .../table/log/HoodieUnMergedLogRecordScanner.java  |   9 +-
 .../table/log/block/HoodieAvroDataBlock.java       |  17 ++-
 .../versioning/v1/ArchivedTimelineLoaderV1.java    |   3 +-
 .../org/apache/hudi/common/util/SortUtils.java     |   7 +-
 .../hudi/table/format/cdc/CdcInputFormat.java      |   2 +-
 .../common/functional/TestHoodieLogFormat.java     | 130 ++++++++++----------
 .../table/timeline/TestArchivedTimelineV1.java     |   3 +-
 .../common/testutils/HoodieCommonTestHarness.java  |   9 +-
 .../hudi/metadata/TestHoodieTableMetadataUtil.java |   2 +-
 .../realtime/RealtimeCompactedRecordReader.java    |   2 +-
 .../realtime/RealtimeUnmergedRecordReader.java     |   3 +-
 .../reader/DFSHoodieDatasetInputReader.java        |   2 +-
 .../org/apache/hudi/cdc/CDCFileGroupIterator.scala |   2 +-
 .../procedures/ExportInstantsProcedure.scala       |   3 +-
 .../ShowHoodieLogFileMetadataProcedure.scala       |   3 +-
 .../ShowHoodieLogFileRecordsProcedure.scala        |   7 +-
 .../TestHoodieClientOnCopyOnWriteStorage.java      |   2 +-
 .../TestBulkInsertInternalPartitioner.java         |   4 +-
 .../hudi/functional/TestHoodieBackedMetadata.java  |   4 +-
 .../java/org/apache/hudi/io/TestMergeHandle.java   |   2 +-
 .../hudi/table/TestHoodieMergeOnReadTable.java     |   4 +-
 .../hudi/functional/TestMORDataSourceStorage.scala |   2 +-
 .../hudi/functional/cdc/HoodieCDCTestBase.scala    |   2 +-
 .../sql/hudi/common/HoodieSparkSqlTestBase.scala   |   4 +-
 .../dml/others/TestPartialUpdateForMergeInto.scala |  11 +-
 .../utilities/HoodieMetadataTableValidator.java    |   2 +-
 46 files changed, 192 insertions(+), 322 deletions(-)
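
The pattern repeated across the hunks below is to wrap an Avro Schema in
HoodieSchema once at the boundary and pass the wrapper through. A minimal
sketch of a migrated call site (assuming only the factory methods visible in
the hunks; storage, logPath, and schemaJson are illustrative placeholders):

    import org.apache.avro.Schema;
    import org.apache.hudi.common.model.HoodieLogFile;
    import org.apache.hudi.common.schema.HoodieSchema;
    import org.apache.hudi.common.table.log.HoodieLogFormat;

    // Before this change, HoodieLogFormat.newReader(...) took an Avro Schema.
    // Now: wrap the Avro schema once, then pass HoodieSchema everywhere.
    Schema avroSchema = new Schema.Parser().parse(schemaJson);
    HoodieSchema readerSchema = HoodieSchema.fromAvroSchema(avroSchema);
    try (HoodieLogFormat.Reader reader =
             HoodieLogFormat.newReader(storage, new HoodieLogFile(logPath), readerSchema)) {
      while (reader.hasNext()) {
        reader.next(); // each HoodieLogBlock is handled as before
      }
    }
    // Where an Avro Schema is still required, unwrap with readerSchema.toAvroSchema().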

diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ArchivedCommitsCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ArchivedCommitsCommand.java
index 3f309326f357..c451db21dfa9 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ArchivedCommitsCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ArchivedCommitsCommand.java
@@ -29,6 +29,7 @@ import org.apache.hudi.cli.utils.SparkUtil;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
+import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.log.HoodieLogFormat;
 import org.apache.hudi.common.table.log.HoodieLogFormat.Reader;
@@ -116,7 +117,7 @@ public class ArchivedCommitsCommand {
     for (StoragePathInfo pathInfo : pathInfoList) {
       // read the archived file
       try (Reader reader = HoodieLogFormat.newReader(storage, new HoodieLogFile(pathInfo.getPath()),
-          HoodieArchivedMetaEntry.getClassSchema())) {
+          HoodieSchema.fromAvroSchema(HoodieArchivedMetaEntry.getClassSchema()))) {
         List<IndexedRecord> readRecords = new ArrayList<>();
         // read the avro blocks
         while (reader.hasNext()) {
@@ -190,7 +191,7 @@ public class ArchivedCommitsCommand {
     for (StoragePathInfo pathInfo : pathInfoList) {
       // read the archived file
       try (HoodieLogFormat.Reader reader = HoodieLogFormat.newReader(HoodieStorageUtils.getStorage(basePath, HoodieCLI.conf),
-          new HoodieLogFile(pathInfo.getPath()), HoodieArchivedMetaEntry.getClassSchema())) {
+          new HoodieLogFile(pathInfo.getPath()), HoodieSchema.fromAvroSchema(HoodieArchivedMetaEntry.getClassSchema()))) {
         List<IndexedRecord> readRecords = new ArrayList<>();
         // read the avro blocks
         while (reader.hasNext()) {
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ExportCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ExportCommand.java
index f9db200d99bb..8692efb8f6ba 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ExportCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/ExportCommand.java
@@ -27,6 +27,7 @@ import org.apache.hudi.cli.HoodieCLI;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
+import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.log.HoodieLogFormat;
 import org.apache.hudi.common.table.log.HoodieLogFormat.Reader;
@@ -130,7 +131,7 @@ public class ExportCommand {
 
     for (StoragePathInfo pathInfo : pathInfoList) {
       // read the archived file
-      try (Reader reader = HoodieLogFormat.newReader(storage, new HoodieLogFile(pathInfo.getPath()), HoodieArchivedMetaEntry.getClassSchema())) {
+      try (Reader reader = HoodieLogFormat.newReader(storage, new HoodieLogFile(pathInfo.getPath()), HoodieSchema.fromAvroSchema(HoodieArchivedMetaEntry.getClassSchema()))) {
 
         // read the avro blocks
         while (reader.hasNext() && copyCount++ < limit) {
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
index a469ce23d00f..ffafaed6ce60 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
@@ -116,7 +116,7 @@ public class HoodieLogFileCommand {
       } else {
         fileName = path.getName();
       }
-      Schema writerSchema = TableSchemaResolver.readSchemaFromLogFile(storage, path);
+      HoodieSchema writerSchema = HoodieSchema.fromAvroSchema(TableSchemaResolver.readSchemaFromLogFile(storage, path));
       try (Reader reader = HoodieLogFormat.newReader(storage, new HoodieLogFile(path), writerSchema)) {
 
         // read the avro blocks
@@ -265,7 +265,7 @@ public class HoodieLogFileCommand {
         Schema writerSchema = TableSchemaResolver.readSchemaFromLogFile(
             client.getStorage(), new StoragePath(logFile));
         try (HoodieLogFormat.Reader reader =
-                 HoodieLogFormat.newReader(storage, new HoodieLogFile(new StoragePath(logFile)), writerSchema)) {
+                 HoodieLogFormat.newReader(storage, new HoodieLogFile(new StoragePath(logFile)), HoodieSchema.fromAvroSchema(writerSchema))) {
           // read the avro blocks
           while (reader.hasNext()) {
             HoodieLogBlock n = reader.next();
diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java
index fe4ea1c8ffe1..e30c0d5ccc84 100644
--- a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java
+++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java
@@ -238,7 +238,7 @@ public class TestHoodieLogFileCommand extends CLIFunctionalTestHarness {
         .withStorage(storage)
         .withBasePath(tablePath)
         .withLogFilePaths(logFilePaths)
-        .withReaderSchema(schema.toAvroSchema())
+        .withReaderSchema(schema)
         .withLatestInstantTime(INSTANT_TIME)
         .withMaxMemorySizeInBytes(
             HoodieMemoryConfig.DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/LegacyArchivedMetaEntryReader.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/LegacyArchivedMetaEntryReader.java
index 6e81266ba93a..1b445fa734da 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/LegacyArchivedMetaEntryReader.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/LegacyArchivedMetaEntryReader.java
@@ -23,6 +23,7 @@ import org.apache.hudi.avro.model.HoodieArchivedMetaEntry;
 import org.apache.hudi.client.timeline.ActiveActionWithDetails;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.log.HoodieLogFormat;
 import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
@@ -268,7 +269,7 @@ public class LegacyArchivedMetaEntryReader {
             reader = HoodieLogFormat.newReader(
                 metaClient.getStorage(),
                 new HoodieLogFile(pathInfo.getPath()),
-                HoodieArchivedMetaEntry.getClassSchema());
+                HoodieSchema.fromAvroSchema(HoodieArchivedMetaEntry.getClassSchema()));
           } catch (IOException ioe) {
             throw new HoodieIOException(
                 "Error initializing the reader for archived log: " + 
pathInfo.getPath(), ioe);
diff --git a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java
index 6fdb5b3b4757..bc3a6867dbe3 100644
--- a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java
+++ b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java
@@ -902,7 +902,7 @@ public class TestJavaHoodieBackedMetadata extends TestHoodieMetadataBase {
       }
 
       try (HoodieLogFormat.Reader logFileReader = HoodieLogFormat.newReader(storage,
-          new HoodieLogFile(pathInfoList.get(0).getPath()), writerSchema)) {
+          new HoodieLogFile(pathInfoList.get(0).getPath()), HoodieSchema.fromAvroSchema(writerSchema))) {
         while (logFileReader.hasNext()) {
           HoodieLogBlock logBlock = logFileReader.next();
           if (logBlock instanceof HoodieDataBlock) {
@@ -2886,7 +2886,7 @@ public class TestJavaHoodieBackedMetadata extends TestHoodieMetadataBase {
       }
 
       try (HoodieLogFormat.Reader logFileReader = HoodieLogFormat.newReader(storage,
-          new HoodieLogFile(pathInfoList.get(0).getPath()), writerSchema)) {
+          new HoodieLogFile(pathInfoList.get(0).getPath()), HoodieSchema.fromAvroSchema(writerSchema))) {
         while (logFileReader.hasNext()) {
           HoodieLogBlock logBlock = logFileReader.next();
           if (logBlock instanceof HoodieDataBlock) {
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
index d34dd06bbeb1..6d30eaf2b7ca 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
@@ -179,12 +179,12 @@ public abstract class MultipleSparkJobExecutionStrategy<T>
                                                                           final Map<String, String> extraMetadata);
 
   protected BulkInsertPartitioner<Dataset<Row>> getRowPartitioner(Map<String, String> strategyParams,
-                                                                  Schema schema) {
+                                                                  HoodieSchema schema) {
     return getPartitioner(strategyParams, schema, true);
   }
 
   protected BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>> getRDDPartitioner(Map<String, String> strategyParams,
-                                                                              Schema schema) {
+                                                                              HoodieSchema schema) {
     return getPartitioner(strategyParams, schema, false);
   }
 
@@ -195,7 +195,7 @@ public abstract class MultipleSparkJobExecutionStrategy<T>
    * @param schema         Schema of the data including metadata fields.
    */
   private <I> BulkInsertPartitioner<I> getPartitioner(Map<String, String> strategyParams,
-                                                      Schema schema,
+                                                      HoodieSchema schema,
                                                       boolean isRowPartitioner) {
     Option<String[]> orderByColumnsOpt =
         Option.ofNullable(strategyParams.get(PLAN_STRATEGY_SORT_COLUMNS.key()))
@@ -209,11 +209,11 @@ public abstract class MultipleSparkJobExecutionStrategy<T>
           return isRowPartitioner
               ? new RowSpatialCurveSortPartitioner(getWriteConfig())
              : new RDDSpatialCurveSortPartitioner((HoodieSparkEngineContext) getEngineContext(), orderByColumns, layoutOptStrategy,
-              getWriteConfig().getLayoutOptimizationCurveBuildMethod(), HoodieAvroUtils.addMetadataFields(schema), recordType);
+              getWriteConfig().getLayoutOptimizationCurveBuildMethod(), HoodieSchemaUtils.addMetadataFields(schema), recordType);
         case LINEAR:
           return isRowPartitioner
              ? new RowCustomColumnsSortPartitioner(orderByColumns, getWriteConfig())
-              : new RDDCustomColumnsSortPartitioner(orderByColumns, HoodieAvroUtils.addMetadataFields(schema), getWriteConfig());
+              : new RDDCustomColumnsSortPartitioner(orderByColumns, HoodieSchemaUtils.addMetadataFields(schema), getWriteConfig());
         default:
           throw new UnsupportedOperationException(String.format("Layout optimization strategy '%s' is not supported", layoutOptStrategy));
       }
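
The partitioner constructors in the hunk above now take HoodieSchema directly.
A minimal sketch of the LINEAR branch under the new signatures (orderByColumns
and the write config are illustrative placeholders; HoodieSchemaUtils.addMetadataFields
is taken from the hunk above):

    // Metadata fields are added on the HoodieSchema wrapper before partitioning.
    HoodieSchema withMetaFields = HoodieSchemaUtils.addMetadataFields(schema);
    BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>> partitioner =
        new RDDCustomColumnsSortPartitioner(orderByColumns, withMetaFields, getWriteConfig());
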
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSingleFileSortExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSingleFileSortExecutionStrategy.java
index b3f640c646ba..c3592d30beae 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSingleFileSortExecutionStrategy.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSingleFileSortExecutionStrategy.java
@@ -79,7 +79,7 @@ public class SparkSingleFileSortExecutionStrategy<T>
     // Since clustering will write to single file group using HoodieUnboundedCreateHandle, set max file size to a large value.
     newConfig.setValue(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE, String.valueOf(Long.MAX_VALUE));
 
-    BulkInsertPartitioner<Dataset<Row>> partitioner = getRowPartitioner(strategyParams, schema.toAvroSchema());
+    BulkInsertPartitioner<Dataset<Row>> partitioner = getRowPartitioner(strategyParams, schema);
     Dataset<Row> repartitionedRecords = partitioner.repartitionRecords(inputRecords, numOutputGroups);

     return HoodieDatasetBulkInsertHelper.bulkInsert(repartitionedRecords, instantTime, getHoodieTable(), newConfig,
@@ -107,6 +107,7 @@ public class SparkSingleFileSortExecutionStrategy<T>
     newConfig.setValue(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE, String.valueOf(Long.MAX_VALUE));
 
     return (HoodieData<WriteStatus>) SparkBulkInsertHelper.newInstance().bulkInsert(inputRecords, instantTime, getHoodieTable(), newConfig,
-        false, getRDDPartitioner(strategyParams, schema), true, numOutputGroups, new SingleFileHandleCreateFactory(fileGroupIdList.get(0).getFileId(), shouldPreserveHoodieMetadata));
+        false, getRDDPartitioner(strategyParams, HoodieSchema.fromAvroSchema(schema)), true, numOutputGroups,
+        new SingleFileHandleCreateFactory(fileGroupIdList.get(0).getFileId(), shouldPreserveHoodieMetadata));
   }
 }
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSortAndSizeExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSortAndSizeExecutionStrategy.java
index 6023c36f0f5c..000915daee4a 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSortAndSizeExecutionStrategy.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSortAndSizeExecutionStrategy.java
@@ -71,7 +71,7 @@ public class SparkSortAndSizeExecutionStrategy<T>
 
     newConfig.setValue(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE, String.valueOf(getWriteConfig().getClusteringTargetFileMaxBytes()));
 
-    BulkInsertPartitioner<Dataset<Row>> partitioner = getRowPartitioner(strategyParams, schema.toAvroSchema());
+    BulkInsertPartitioner<Dataset<Row>> partitioner = getRowPartitioner(strategyParams, schema);
     Dataset<Row> repartitionedRecords = partitioner.repartitionRecords(inputRecords, numOutputGroups);

     return HoodieDatasetBulkInsertHelper.bulkInsert(repartitionedRecords, instantTime, getHoodieTable(), newConfig,
@@ -96,6 +96,6 @@ public class SparkSortAndSizeExecutionStrategy<T>
     newConfig.setValue(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE, String.valueOf(getWriteConfig().getClusteringTargetFileMaxBytes()));
 
     return (HoodieData<WriteStatus>) SparkBulkInsertHelper.newInstance().bulkInsert(inputRecords, instantTime, getHoodieTable(),
-        newConfig, false, getRDDPartitioner(strategyParams, schema), true, numOutputGroups, new CreateHandleFactory(shouldPreserveHoodieMetadata));
+        newConfig, false, getRDDPartitioner(strategyParams, HoodieSchema.fromAvroSchema(schema)), true, numOutputGroups, new CreateHandleFactory(shouldPreserveHoodieMetadata));
   }
 }
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDCustomColumnsSortPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDCustomColumnsSortPartitioner.java
index 4e1b7ff774d4..bb0ba6ff3770 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDCustomColumnsSortPartitioner.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDCustomColumnsSortPartitioner.java
@@ -19,13 +19,12 @@
 package org.apache.hudi.execution.bulkinsert;
 
 import org.apache.hudi.SparkAdapterSupport$;
-import org.apache.hudi.common.config.SerializableSchema;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.common.util.SortUtils;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.BulkInsertPartitioner;
 
-import org.apache.avro.Schema;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.sql.HoodieUTF8StringFactory;
 
@@ -43,22 +42,22 @@ public class RDDCustomColumnsSortPartitioner<T>
     implements BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>> {
 
   private final String[] sortColumnNames;
-  private final SerializableSchema serializableSchema;
+  private final HoodieSchema schema;
   private final boolean consistentLogicalTimestampEnabled;
   private final boolean suffixRecordKey;
   private final HoodieUTF8StringFactory utf8StringFactory =
       SparkAdapterSupport$.MODULE$.sparkAdapter().getUTF8StringFactory();
 
   public RDDCustomColumnsSortPartitioner(HoodieWriteConfig config) {
-    this.serializableSchema = new SerializableSchema(new Schema.Parser().parse(config.getSchema()));
+    this.schema = HoodieSchema.parse(config.getSchema());
     this.sortColumnNames = getSortColumnName(config);
     this.consistentLogicalTimestampEnabled = config.isConsistentLogicalTimestampEnabled();
     this.suffixRecordKey = config.getBoolean(BULKINSERT_SUFFIX_RECORD_KEY_SORT_COLUMNS);
   }
 
-  public RDDCustomColumnsSortPartitioner(String[] columnNames, Schema schema, HoodieWriteConfig config) {
+  public RDDCustomColumnsSortPartitioner(String[] columnNames, HoodieSchema schema, HoodieWriteConfig config) {
     this.sortColumnNames = columnNames;
-    this.serializableSchema = new SerializableSchema(schema);
+    this.schema = schema;
     this.consistentLogicalTimestampEnabled = config.isConsistentLogicalTimestampEnabled();
     this.suffixRecordKey = config.getBoolean(BULKINSERT_SUFFIX_RECORD_KEY_SORT_COLUMNS);
   }
@@ -67,7 +66,7 @@ public class RDDCustomColumnsSortPartitioner<T>
   public JavaRDD<HoodieRecord<T>> repartitionRecords(JavaRDD<HoodieRecord<T>> records,
                                                      int outputSparkPartitions) {
     return records
-        .sortBy(record -> SortUtils.getComparableSortColumns(record, sortColumnNames, serializableSchema.get(), suffixRecordKey, consistentLogicalTimestampEnabled,
+        .sortBy(record -> SortUtils.getComparableSortColumns(record, sortColumnNames, schema, suffixRecordKey, consistentLogicalTimestampEnabled,
             utf8StringFactory::wrapArrayOfObjects), true, outputSparkPartitions);
   }
 
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveSortPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveSortPartitioner.java
index 9840f6ee52dd..327eb8919529 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveSortPartitioner.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveSortPartitioner.java
@@ -22,17 +22,16 @@ import org.apache.hudi.AvroConversionUtils;
 import org.apache.hudi.HoodieSparkUtils;
 import org.apache.hudi.SparkConversionUtils;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
-import org.apache.hudi.common.config.SerializableSchema;
 import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecord.HoodieMetadataField;
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
 import org.apache.hudi.common.model.HoodieSparkRecord;
+import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieClusteringConfig;
 
-import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.sql.Dataset;
@@ -51,18 +50,18 @@ public class RDDSpatialCurveSortPartitioner<T>
     extends SpatialCurveSortPartitionerBase<JavaRDD<HoodieRecord<T>>> {
 
   private final transient HoodieSparkEngineContext sparkEngineContext;
-  private final SerializableSchema schema;
+  private final HoodieSchema schema;
   private final HoodieRecordType recordType;
 
   public RDDSpatialCurveSortPartitioner(HoodieSparkEngineContext 
sparkEngineContext,
                                         String[] orderByColumns,
                                         HoodieClusteringConfig.LayoutOptimizationStrategy layoutOptStrategy,
                                         HoodieClusteringConfig.SpatialCurveCompositionStrategyType curveCompositionStrategyType,
-                                        Schema schema,
+                                        HoodieSchema schema,
                                         HoodieRecordType recordType) {
     super(orderByColumns, layoutOptStrategy, curveCompositionStrategyType);
     this.sparkEngineContext = sparkEngineContext;
-    this.schema = new SerializableSchema(schema);
+    this.schema = schema;
     this.recordType = recordType;
   }
 
@@ -70,7 +69,7 @@ public class RDDSpatialCurveSortPartitioner<T>
   public JavaRDD<HoodieRecord<T>> repartitionRecords(JavaRDD<HoodieRecord<T>> records, int outputSparkPartitions) {
     if (recordType == HoodieRecordType.AVRO) {
       JavaRDD<GenericRecord> genericRecordsRDD =
-          records.map(f -> (GenericRecord) f.toIndexedRecord(schema.get(), new Properties()).get().getData());
+          records.map(f -> (GenericRecord) f.toIndexedRecord(schema.toAvroSchema(), new Properties()).get().getData());
 
       Dataset<Row> sourceDataset =
           AvroConversionUtils.createDataFrame(
@@ -80,7 +79,7 @@ public class RDDSpatialCurveSortPartitioner<T>
           );
       Dataset<Row> sortedDataset = reorder(sourceDataset, outputSparkPartitions);
 
-      return HoodieSparkUtils.createRdd(sortedDataset, schema.get().getName(), schema.get().getNamespace(), false, Option.empty())
+      return HoodieSparkUtils.createRdd(sortedDataset, schema.getName(), schema.getNamespace().orElse(null), false, Option.empty())
           .toJavaRDD()
           .map(record -> {
             String key = record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
@@ -90,7 +89,7 @@ public class RDDSpatialCurveSortPartitioner<T>
             return hoodieRecord;
           });
     } else if (recordType == HoodieRecordType.SPARK) {
-      StructType structType = HoodieInternalRowUtils.getCachedSchema(schema.get());
+      StructType structType = HoodieInternalRowUtils.getCachedSchema(schema.toAvroSchema());
       Dataset<Row> sourceDataset = SparkConversionUtils.createDataFrame(records.rdd(),
           sparkEngineContext.getSqlContext().sparkSession(), structType);
       Dataset<Row> sortedDataset = reorder(sourceDataset, outputSparkPartitions);
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java
index 8ae72764c122..a35a7c3ce456 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java
@@ -530,7 +530,7 @@ public class TestHoodieBackedTableMetadata extends TestHoodieMetadataBase {
       }
 
       try (HoodieLogFormat.Reader logFileReader = HoodieLogFormat.newReader(storage,
-          new HoodieLogFile(pathInfoList.get(0).getPath()), writerSchema)) {
+          new HoodieLogFile(pathInfoList.get(0).getPath()), HoodieSchema.fromAvroSchema(writerSchema))) {
         while (logFileReader.hasNext()) {
           HoodieLogBlock logBlock = logFileReader.next();
           if (logBlock instanceof HoodieDataBlock) {
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java
index 33e723548031..2a44a3e02a66 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java
@@ -25,6 +25,7 @@ import org.apache.hudi.common.model.HoodiePayloadProps;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
 import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.HoodieTableVersion;
@@ -92,7 +93,7 @@ public abstract class AbstractHoodieLogRecordScanner {
   private static final Logger LOG = LoggerFactory.getLogger(AbstractHoodieLogRecordScanner.class);
 
   // Reader schema for the records
-  protected final Schema readerSchema;
+  protected final HoodieSchema readerSchema;
   // Latest valid instant time
   // Log-Blocks belonging to inflight delta-instants are filtered-out using this high-watermark.
   private final String latestInstantTime;
@@ -155,7 +156,7 @@ public abstract class AbstractHoodieLogRecordScanner {
   private HoodieTimeline inflightInstantsTimeline = null;
 
   protected AbstractHoodieLogRecordScanner(HoodieStorage storage, String basePath, List<String> logFilePaths,
-                                           Schema readerSchema, String latestInstantTime,
+                                           HoodieSchema readerSchema, String latestInstantTime,
                                            boolean reverseReader, int bufferSize, Option<InstantRange> instantRange,
                                            boolean withOperationField, boolean forceFullScan,
                                            Option<String> partitionNameOverride,
@@ -867,7 +868,7 @@ public abstract class AbstractHoodieLogRecordScanner {
 
     public abstract Builder withLogFilePaths(List<String> logFilePaths);
 
-    public abstract Builder withReaderSchema(Schema schema);
+    public abstract Builder withReaderSchema(HoodieSchema schema);
 
     public abstract Builder withInternalSchema(InternalSchema internalSchema);
 
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java
index 33c0f7f340e0..3b721c481a00 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java
@@ -215,7 +215,7 @@ public abstract class BaseHoodieLogRecordReader<T> {
     try {
       // Iterate over the paths
       logFormatReaderWrapper = new HoodieLogFormatReader(storage, logFiles,
-          readerSchema.toAvroSchema(), reverseReader, bufferSize, shouldLookupRecords(), recordKeyField, internalSchema);
+          readerSchema, reverseReader, bufferSize, shouldLookupRecords(), recordKeyField, internalSchema);
 
       Set<HoodieLogFile> scannedLogFiles = new HashSet<>();
       while (logFormatReaderWrapper.hasNext()) {
@@ -377,7 +377,7 @@ public abstract class BaseHoodieLogRecordReader<T> {
     try {
       // Iterate over the paths
       logFormatReaderWrapper = new HoodieLogFormatReader(storage, logFiles,
-          readerSchema == null ? null : readerSchema.toAvroSchema(), reverseReader, bufferSize, shouldLookupRecords(), recordKeyField, internalSchema);
+          readerSchema, reverseReader, bufferSize, shouldLookupRecords(), recordKeyField, internalSchema);
 
       /**
        * Scanning log blocks and placing the compacted blocks at the right place require two traversals.
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieCDCLogRecordIterator.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieCDCLogRecordIterator.java
index 4d2417f9851e..b1ccb6019465 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieCDCLogRecordIterator.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieCDCLogRecordIterator.java
@@ -21,13 +21,13 @@ package org.apache.hudi.common.table.log;
 import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
+import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.common.table.log.block.HoodieDataBlock;
 import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.common.util.collection.CloseableMappingIterator;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.storage.HoodieStorage;
 
-import org.apache.avro.Schema;
 import org.apache.avro.generic.IndexedRecord;
 
 import java.io.IOException;
@@ -41,7 +41,7 @@ public class HoodieCDCLogRecordIterator implements ClosableIterator<IndexedRecor
 
   private final HoodieStorage storage;
 
-  private final Schema cdcSchema;
+  private final HoodieSchema cdcSchema;
 
   private final Iterator<HoodieLogFile> cdcLogFileIter;
 
@@ -51,7 +51,7 @@ public class HoodieCDCLogRecordIterator implements ClosableIterator<IndexedRecor
 
   private IndexedRecord record;
 
-  public HoodieCDCLogRecordIterator(HoodieStorage storage, HoodieLogFile[] cdcLogFiles, Schema cdcSchema) {
+  public HoodieCDCLogRecordIterator(HoodieStorage storage, HoodieLogFile[] cdcLogFiles, HoodieSchema cdcSchema) {
     this.storage = storage;
     this.cdcSchema = cdcSchema;
     this.cdcLogFileIter = Arrays.stream(cdcLogFiles).iterator();
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
index 0172731693bc..2c1d91e7b7c1 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
@@ -44,7 +44,6 @@ import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.storage.StorageSchemes;
 
-import org.apache.avro.Schema;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -75,7 +74,7 @@ public class HoodieLogFileReader implements HoodieLogFormat.Reader {
   private final HoodieLogFile logFile;
   private final int bufferSize;
   private final byte[] magicBuffer = new byte[6];
-  private final Schema readerSchema;
+  private final HoodieSchema readerSchema;
   private final InternalSchema internalSchema;
   private final String keyField;
   private long reverseLogFilePosition;
@@ -85,21 +84,21 @@ public class HoodieLogFileReader implements HoodieLogFormat.Reader {
   private boolean closed = false;
   private final SeekableDataInputStream inputStream;
 
-  public HoodieLogFileReader(HoodieStorage storage, HoodieLogFile logFile, Schema readerSchema, int bufferSize) throws IOException {
+  public HoodieLogFileReader(HoodieStorage storage, HoodieLogFile logFile, HoodieSchema readerSchema, int bufferSize) throws IOException {
     this(storage, logFile, readerSchema, bufferSize, false);
   }
 
-  public HoodieLogFileReader(HoodieStorage storage, HoodieLogFile logFile, Schema readerSchema, int bufferSize,
+  public HoodieLogFileReader(HoodieStorage storage, HoodieLogFile logFile, HoodieSchema readerSchema, int bufferSize,
                              boolean reverseReader) throws IOException {
     this(storage, logFile, readerSchema, bufferSize, reverseReader, false, HoodieRecord.RECORD_KEY_METADATA_FIELD);
   }
 
-  public HoodieLogFileReader(HoodieStorage storage, HoodieLogFile logFile, Schema readerSchema, int bufferSize, boolean reverseReader,
+  public HoodieLogFileReader(HoodieStorage storage, HoodieLogFile logFile, HoodieSchema readerSchema, int bufferSize, boolean reverseReader,
                              boolean enableRecordLookups, String keyField) throws IOException {
     this(storage, logFile, readerSchema, bufferSize, reverseReader, enableRecordLookups, keyField, InternalSchema.getEmptyInternalSchema());
   }
 
-  public HoodieLogFileReader(HoodieStorage storage, HoodieLogFile logFile, Schema readerSchema, int bufferSize, boolean reverseReader,
+  public HoodieLogFileReader(HoodieStorage storage, HoodieLogFile logFile, HoodieSchema readerSchema, int bufferSize, boolean reverseReader,
                              boolean enableRecordLookups, String keyField, InternalSchema internalSchema) throws IOException {
     this.storage = storage;
     // NOTE: We repackage {@code HoodieLogFile} here to make sure that the provided path
@@ -190,7 +189,7 @@ public class HoodieLogFileReader implements HoodieLogFormat.Reader {
           return HoodieAvroDataBlock.getBlock(content.get(), readerSchema, internalSchema);
         } else {
           return new HoodieAvroDataBlock(() -> getDataInputStream(storage, this.logFile, bufferSize), content, true, logBlockContentLoc,
-              getTargetReaderSchemaForBlock().isEmpty() ? Option.empty() : Option.of(HoodieSchema.fromAvroSchema(getTargetReaderSchemaForBlock().get())), header, footer, keyField);
+              getTargetReaderSchemaForBlock(), header, footer, keyField);
         }
 
       case HFILE_DATA_BLOCK:
@@ -198,14 +197,14 @@ public class HoodieLogFileReader implements HoodieLogFormat.Reader {
             String.format("HFile block could not be of version (%d)", HoodieLogFormatVersion.DEFAULT_VERSION));
         return new HoodieHFileDataBlock(
             () -> getDataInputStream(storage, this.logFile, bufferSize), content, true, logBlockContentLoc,
-            Option.ofNullable(HoodieSchema.fromAvroSchema(readerSchema)), header, footer, enableRecordLookups, logFile.getPath());
+            Option.ofNullable(readerSchema), header, footer, enableRecordLookups, logFile.getPath());
 
       case PARQUET_DATA_BLOCK:
         checkState(nextBlockVersion.getVersion() != HoodieLogFormatVersion.DEFAULT_VERSION,
             String.format("Parquet block could not be of version (%d)", HoodieLogFormatVersion.DEFAULT_VERSION));
 
         return new HoodieParquetDataBlock(() -> getDataInputStream(storage, this.logFile, bufferSize), content, true, logBlockContentLoc,
-            getTargetReaderSchemaForBlock().isEmpty() ? Option.empty() : Option.of(HoodieSchema.fromAvroSchema(getTargetReaderSchemaForBlock().get())), header, footer, keyField);
+            getTargetReaderSchemaForBlock(), header, footer, keyField);
 
       case DELETE_BLOCK:
         return new HoodieDeleteBlock(content, () -> getDataInputStream(storage, this.logFile, bufferSize), true, Option.of(logBlockContentLoc), header, footer);
@@ -214,14 +213,14 @@ public class HoodieLogFileReader implements HoodieLogFormat.Reader {
         return new HoodieCommandBlock(content, () -> getDataInputStream(storage, this.logFile, bufferSize), true, Option.of(logBlockContentLoc), header, footer);
 
       case CDC_DATA_BLOCK:
-        return new HoodieCDCDataBlock(() -> getDataInputStream(storage, this.logFile, bufferSize), content, true, logBlockContentLoc, HoodieSchema.fromAvroSchema(readerSchema), header, keyField);
+        return new HoodieCDCDataBlock(() -> getDataInputStream(storage, this.logFile, bufferSize), content, true, logBlockContentLoc, readerSchema, header, keyField);
 
       default:
         throw new HoodieNotSupportedException("Unsupported Block " + blockType);
     }
   }
 
-  private Option<Schema> getTargetReaderSchemaForBlock() {
+  private Option<HoodieSchema> getTargetReaderSchemaForBlock() {
     // we should use write schema to read log file,
     // since when we have done some DDL operation, the readerSchema maybe different from writeSchema, avro reader will throw exception.
    // eg: origin writeSchema is: "a String, b double" then we add a new column now the readerSchema will be: "a string, c int, b double". it's wrong to use readerSchema to read old log file.
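
A restatement of the schema-selection rule that comment describes, as a
hypothetical sketch (the method body is not shown in this hunk, so this is an
assumption inferred from the surrounding changes, not the committed code):

    private Option<HoodieSchema> getTargetReaderSchemaForBlock() {
      // Assumed rule: with no tracked schema evolution, the reader schema is safe as-is;
      // otherwise return empty so each block falls back to its own writer schema.
      return internalSchema.isEmptySchema() ? Option.ofNullable(readerSchema) : Option.empty();
    }
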
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormat.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormat.java
index edc5cd845d91..bfe17c7c5bb5 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormat.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormat.java
@@ -20,6 +20,7 @@ package org.apache.hudi.common.table.log;
 
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.common.table.HoodieTableVersion;
 import org.apache.hudi.common.table.log.block.HoodieLogBlock;
 import org.apache.hudi.common.util.Option;
@@ -28,7 +29,6 @@ import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.StoragePath;
 
-import org.apache.avro.Schema;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -297,12 +297,12 @@ public interface HoodieLogFormat {
     return new WriterBuilder();
   }
 
-  static HoodieLogFormat.Reader newReader(HoodieStorage storage, HoodieLogFile logFile, Schema readerSchema)
+  static HoodieLogFormat.Reader newReader(HoodieStorage storage, HoodieLogFile logFile, HoodieSchema readerSchema)
       throws IOException {
     return new HoodieLogFileReader(storage, logFile, readerSchema, HoodieLogFileReader.DEFAULT_BUFFER_SIZE);
   }
 
-  static HoodieLogFormat.Reader newReader(HoodieStorage storage, HoodieLogFile logFile, Schema readerSchema, boolean reverseReader) throws IOException {
+  static HoodieLogFormat.Reader newReader(HoodieStorage storage, HoodieLogFile logFile, HoodieSchema readerSchema, boolean reverseReader) throws IOException {
     return new HoodieLogFileReader(storage, logFile, readerSchema, HoodieLogFileReader.DEFAULT_BUFFER_SIZE, reverseReader);
   }
 
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatReader.java
index 0d3c99a732d5..77c3e78fcc32 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatReader.java
@@ -19,12 +19,12 @@
 package org.apache.hudi.common.table.log;
 
 import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.common.table.log.block.HoodieLogBlock;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.internal.schema.InternalSchema;
 import org.apache.hudi.storage.HoodieStorage;
 
-import org.apache.avro.Schema;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -39,7 +39,7 @@ public class HoodieLogFormatReader implements HoodieLogFormat.Reader {
   private final List<HoodieLogFile> logFiles;
   private HoodieLogFileReader currentReader;
   private final HoodieStorage storage;
-  private final Schema readerSchema;
+  private final HoodieSchema readerSchema;
   private final InternalSchema internalSchema;
   private final String recordKeyField;
   private final boolean enableInlineReading;
@@ -47,7 +47,7 @@ public class HoodieLogFormatReader implements HoodieLogFormat.Reader {

   private static final Logger LOG = LoggerFactory.getLogger(HoodieLogFormatReader.class);
 
-  HoodieLogFormatReader(HoodieStorage storage, List<HoodieLogFile> logFiles, Schema readerSchema,
+  HoodieLogFormatReader(HoodieStorage storage, List<HoodieLogFile> logFiles, HoodieSchema readerSchema,
                         boolean reverseLogReader, int bufferSize, boolean enableRecordLookups,
                         String recordKeyField, InternalSchema internalSchema) throws IOException {
     this.logFiles = logFiles;
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatReverseReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatReverseReader.java
deleted file mode 100644
index b746eabd7324..000000000000
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatReverseReader.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.hudi.common.table.log;
-
-import org.apache.hudi.common.model.HoodieLogFile;
-import org.apache.hudi.common.table.log.block.HoodieLogBlock;
-import org.apache.hudi.exception.HoodieIOException;
-import org.apache.hudi.internal.schema.InternalSchema;
-import org.apache.hudi.storage.HoodieStorage;
-
-import org.apache.avro.Schema;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * A log format reader by reading log files and blocks in reserve order.
- * This reader assumes that each log file only has one log block.
- */
-public class HoodieLogFormatReverseReader implements HoodieLogFormat.Reader {
-  private static final Logger LOG = LoggerFactory.getLogger(HoodieLogFormatReader.class);
-  private final List<HoodieLogFile> logFiles;
-  // Readers for previously scanned log-files that are still open
-  private final List<HoodieLogFileReader> prevReadersInOpenState;
-  private HoodieLogFileReader currentReader;
-  private final HoodieStorage storage;
-  private final Schema readerSchema;
-  private InternalSchema internalSchema = InternalSchema.getEmptyInternalSchema();
-  private final boolean reverseLogReader;
-  private final String recordKeyField;
-  private final boolean enableInlineReading;
-  private final int bufferSize;
-  private int logFilePos = -1;
-
-  HoodieLogFormatReverseReader(HoodieStorage storage, List<HoodieLogFile> logFiles, Schema readerSchema,
-                               boolean reverseLogReader, int bufferSize, boolean enableRecordLookups,
-                               String recordKeyField, InternalSchema internalSchema) throws IOException {
-    this.logFiles = logFiles;
-    this.storage = storage;
-    this.readerSchema = readerSchema;
-    this.reverseLogReader = reverseLogReader;
-    this.bufferSize = bufferSize;
-    this.prevReadersInOpenState = new ArrayList<>();
-    this.recordKeyField = recordKeyField;
-    this.enableInlineReading = enableRecordLookups;
-    this.internalSchema =
-        internalSchema == null ? InternalSchema.getEmptyInternalSchema() : internalSchema;
-    logFilePos = logFiles.size() - 1;
-    if (logFilePos >= 0) {
-      HoodieLogFile nextLogFile = logFiles.get(logFilePos);
-      logFilePos--;
-    this.currentReader = new HoodieLogFileReader(storage, nextLogFile, readerSchema, bufferSize, false,
-          enableRecordLookups, recordKeyField, internalSchema);
-    }
-  }
-
-  @Override
-  public void close() throws IOException {
-    for (HoodieLogFileReader reader : prevReadersInOpenState) {
-      reader.close();
-    }
-
-    prevReadersInOpenState.clear();
-
-    if (currentReader != null) {
-      currentReader.close();
-    }
-  }
-
-  @Override
-  public boolean hasNext() {
-    if (currentReader == null) {
-      return false;
-    } else if (currentReader.hasNext()) {
-      return true;
-    } else if (logFilePos >= 0) {
-      try {
-        HoodieLogFile nextLogFile = logFiles.get(logFilePos);
-        logFilePos--;
-        this.prevReadersInOpenState.add(currentReader);
-        this.currentReader = new HoodieLogFileReader(storage, nextLogFile, readerSchema, bufferSize, false,
-            enableInlineReading, recordKeyField, internalSchema);
-      } catch (IOException io) {
-        throw new HoodieIOException("unable to initialize read with log file ", io);
-      }
-      LOG.info("Moving to the next reader for logfile {}", currentReader.getLogFile());
-      return hasNext();
-    }
-    return false;
-  }
-
-  @Override
-  public HoodieLogBlock next() {
-    return currentReader.next();
-  }
-
-  @Override
-  public HoodieLogFile getLogFile() {
-    return currentReader.getLogFile();
-  }
-
-  @Override
-  public void remove() {
-  }
-
-  @Override
-  public boolean hasPrev() {
-    return this.currentReader.hasPrev();
-  }
-
-  @Override
-  public HoodieLogBlock prev() throws IOException {
-    return this.currentReader.prev();
-  }
-}
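
With HoodieLogFormatReverseReader removed, reverse iteration still goes through
the reverseReader flag kept on the reader factory. A minimal sketch (assuming
the newReader overload and the hasPrev/prev methods retained in this patch;
storage, logPath, and readerSchema are illustrative placeholders):

    try (HoodieLogFormat.Reader reader = HoodieLogFormat.newReader(
            storage, new HoodieLogFile(logPath), readerSchema, true /* reverseReader */)) {
      while (reader.hasPrev()) {
        reader.prev(); // log blocks in reverse order
      }
    }
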
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
index ef937241135a..6460ee0fa86f 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
@@ -47,7 +47,6 @@ import org.apache.hudi.internal.schema.InternalSchema;
 import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.StoragePath;
 
-import org.apache.avro.Schema;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -101,7 +100,7 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordScanner
   private final DeleteContext deleteContext;
 
   @SuppressWarnings("unchecked")
-  protected HoodieMergedLogRecordScanner(HoodieStorage storage, String basePath, List<String> logFilePaths, Schema readerSchema,
+  protected HoodieMergedLogRecordScanner(HoodieStorage storage, String basePath, List<String> logFilePaths, HoodieSchema readerSchema,
                                          String latestInstantTime, Long maxMemorySizeInBytes,
                                          boolean reverseReader, int bufferSize, String spillableMapBasePath,
                                          Option<InstantRange> instantRange,
@@ -121,13 +120,12 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordScanner
       this.maxMemorySizeInBytes = maxMemorySizeInBytes;
       // Store merged records for all versions for this log file, set the in-memory footprint to maxInMemoryMapSize
       this.records = new ExternalSpillableMap<>(maxMemorySizeInBytes, spillableMapBasePath, new DefaultSizeEstimator(),
-          new HoodieRecordSizeEstimator(readerSchema), diskMapType, new DefaultSerializer<>(), isBitCaskDiskMapCompressionEnabled, getClass().getSimpleName());
+          new HoodieRecordSizeEstimator(readerSchema.toAvroSchema()), diskMapType, new DefaultSerializer<>(), isBitCaskDiskMapCompressionEnabled, getClass().getSimpleName());
       this.scannedPrefixes = new HashSet<>();
       this.allowInflightInstants = allowInflightInstants;
       this.orderingFields = ConfigUtils.getOrderingFields(this.hoodieTableMetaClient.getTableConfig().getProps());
       TypedProperties mergeProps = ConfigUtils.getMergeProps(getPayloadProps(), this.hoodieTableMetaClient.getTableConfig());
-      HoodieSchema readerHoodieSchema = HoodieSchema.fromAvroSchema(readerSchema);
-      this.deleteContext = new DeleteContext(mergeProps, readerHoodieSchema).withReaderSchema(readerHoodieSchema);
+      this.deleteContext = new DeleteContext(mergeProps, readerSchema).withReaderSchema(readerSchema);
     } catch (IOException e) {
       throw new HoodieIOException("IOException when creating 
ExternalSpillableMap at " + spillableMapBasePath, e);
     }
@@ -260,9 +258,9 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordScanner
     if (prevRecord != null) {
       // Merge and store the combined record
       RecordContext recordContext = AvroRecordContext.getFieldAccessorInstance();
-      BufferedRecord<T> prevBufferedRecord = BufferedRecords.fromHoodieRecord(prevRecord, HoodieSchema.fromAvroSchema(readerSchema),
+      BufferedRecord<T> prevBufferedRecord = BufferedRecords.fromHoodieRecord(prevRecord, readerSchema,
           recordContext, this.getPayloadProps(), orderingFields, deleteContext);
-      BufferedRecord<T> newBufferedRecord = BufferedRecords.fromHoodieRecord(newRecord, HoodieSchema.fromAvroSchema(readerSchema),
+      BufferedRecord<T> newBufferedRecord = BufferedRecords.fromHoodieRecord(newRecord, readerSchema,
           recordContext, this.getPayloadProps(), orderingFields, deleteContext);
       BufferedRecord<T> combinedRecord = recordMerger.merge(prevBufferedRecord, newBufferedRecord, recordContext, this.getPayloadProps());
       // If pre-combine returns existing record, no need to update it
@@ -293,7 +291,7 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordScanner
       // should be deleted or be kept. The old record is kept only if the DELETE record has smaller ordering val.
       // For same ordering values, uses the natural order(arrival time semantics).
 
-      Comparable curOrderingVal = oldRecord.getOrderingValue(this.readerSchema, this.hoodieTableMetaClient.getTableConfig().getProps(), orderingFields);
+      Comparable curOrderingVal = oldRecord.getOrderingValue(this.readerSchema.toAvroSchema(), this.hoodieTableMetaClient.getTableConfig().getProps(), orderingFields);
       Comparable deleteOrderingVal = deleteRecord.getOrderingValue();
       // Checks the ordering value does not equal to 0
       // because we use 0 as the default value which means natural order
@@ -343,7 +341,7 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordScanner
     private HoodieStorage storage;
     private String basePath;
     private List<String> logFilePaths;
-    private Schema readerSchema;
+    private HoodieSchema readerSchema;
     private InternalSchema internalSchema = InternalSchema.getEmptyInternalSchema();
     private String latestInstantTime;
     private boolean reverseReader;
@@ -393,7 +391,7 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordScanner
     }
 
     @Override
-    public Builder withReaderSchema(Schema schema) {
+    public Builder withReaderSchema(HoodieSchema schema) {
       this.readerSchema = schema;
       return this;
     }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java
index 7159cfda5799..941d11ab94b2 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java
@@ -23,6 +23,7 @@ import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodiePreCombineAvroRecordMerger;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.cdc.HoodieCDCUtils;
 import org.apache.hudi.common.util.HoodieRecordUtils;
@@ -32,8 +33,6 @@ import org.apache.hudi.internal.schema.InternalSchema;
 import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.StoragePath;
 
-import org.apache.avro.Schema;
-
 import java.util.List;
 import java.util.stream.Collectors;
 
@@ -45,7 +44,7 @@ public class HoodieUnMergedLogRecordScanner extends AbstractHoodieLogRecordScann
   private final LogRecordScannerCallback callback;
   private final RecordDeletionCallback recordDeletionCallback;
 
-  private HoodieUnMergedLogRecordScanner(HoodieStorage storage, String 
basePath, List<String> logFilePaths, Schema readerSchema,
+  private HoodieUnMergedLogRecordScanner(HoodieStorage storage, String 
basePath, List<String> logFilePaths, HoodieSchema readerSchema,
                                          String latestInstantTime, boolean 
reverseReader, int bufferSize,
                                          LogRecordScannerCallback callback, 
RecordDeletionCallback recordDeletionCallback,
                                          Option<InstantRange> instantRange, 
InternalSchema internalSchema,
@@ -118,7 +117,7 @@ public class HoodieUnMergedLogRecordScanner extends 
AbstractHoodieLogRecordScann
     private HoodieStorage storage;
     private String basePath;
     private List<String> logFilePaths;
-    private Schema readerSchema;
+    private HoodieSchema readerSchema;
     private InternalSchema internalSchema;
     private String latestInstantTime;
     private boolean reverseReader;
@@ -154,7 +153,7 @@ public class HoodieUnMergedLogRecordScanner extends 
AbstractHoodieLogRecordScann
       return this;
     }
 
-    public Builder withReaderSchema(Schema schema) {
+    public Builder withReaderSchema(HoodieSchema schema) {
       this.readerSchema = schema;
       return this;
     }
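The unmerged scanner follows the same pattern. A sketch, assuming the newBuilder() factory and a withLogRecordScannerCallback(...) builder method from the existing Hudi builder (neither appears in this diff); basePath, logFilePaths, avroSchema, latestInstantTime, and process(...) are placeholders:

    // Hypothetical: stream records without merging, using the new HoodieSchema signature.
    HoodieUnMergedLogRecordScanner scanner = HoodieUnMergedLogRecordScanner.newBuilder()
        .withBasePath(basePath)
        .withLogFilePaths(logFilePaths)
        .withReaderSchema(HoodieSchema.fromAvroSchema(avroSchema))  // changed in this commit
        .withLatestInstantTime(latestInstantTime)
        .withReverseReader(false)
        .withLogRecordScannerCallback(record -> process(record))    // assumed callback hook
        .build();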
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java
index 6efc9b60c8d0..26b8cbde6c79 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java
@@ -412,11 +412,11 @@ public class HoodieAvroDataBlock extends HoodieDataBlock {
    * HoodieLogFormat V1.
    */
   @Deprecated
-  public HoodieAvroDataBlock(List<HoodieRecord> records, Schema schema) {
+  public HoodieAvroDataBlock(List<HoodieRecord> records, HoodieSchema schema) {
     super(records, Collections.singletonMap(HeaderMetadataType.SCHEMA, schema.toString()), new HashMap<>(), HoodieRecord.RECORD_KEY_METADATA_FIELD);
   }
 
-  public static HoodieAvroDataBlock getBlock(byte[] content, Schema readerSchema) throws IOException {
+  public static HoodieAvroDataBlock getBlock(byte[] content, HoodieSchema readerSchema) throws IOException {
     return getBlock(content, readerSchema, InternalSchema.getEmptyInternalSchema());
   }
 
@@ -425,7 +425,7 @@ public class HoodieAvroDataBlock extends HoodieDataBlock {
    * HoodieLogFormat V1.
    */
   @Deprecated
-  public static HoodieAvroDataBlock getBlock(byte[] content, Schema readerSchema, InternalSchema internalSchema) throws IOException {
+  public static HoodieAvroDataBlock getBlock(byte[] content, HoodieSchema readerSchema, InternalSchema internalSchema) throws IOException {
 
     SizeAwareDataInputStream dis = new SizeAwareDataInputStream(new DataInputStream(new ByteArrayInputStream(content)));
 
@@ -435,15 +435,12 @@ public class HoodieAvroDataBlock extends HoodieDataBlock {
     dis.readFully(compressedSchema, 0, schemaLength);
     Schema writerSchema = new Schema.Parser().parse(decompress(compressedSchema));
 
-    if (readerSchema == null) {
-      readerSchema = writerSchema;
+    if (readerSchema == null || !internalSchema.isEmptySchema()) {
+      readerSchema = HoodieSchema.fromAvroSchema(writerSchema);
     }
 
-    if (!internalSchema.isEmptySchema()) {
-      readerSchema = writerSchema;
-    }
-
-    GenericDatumReader<IndexedRecord> reader = new GenericDatumReader<>(writerSchema, readerSchema);
+    Schema readerAvroSchema = readerSchema.toAvroSchema();
+    GenericDatumReader<IndexedRecord> reader = new GenericDatumReader<>(writerSchema, readerAvroSchema);
     // 2. Get the total records
     int totalRecords = dis.readInt();
     List<HoodieRecord> records = new ArrayList<>(totalRecords);
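With this change the encode/decode round trip stays in HoodieSchema terms end to end. A sketch mirroring the TestHoodieLogFormat change further down in this diff (hoodieRecords and schema are placeholders; passing a null reader schema falls back to the block's embedded writer schema):

    // Serialize an Avro data block and decode it back with the HoodieSchema overload.
    HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(hoodieRecords, schema); // schema: HoodieSchema
    byte[] content = dataBlock.getBytes(schema.toAvroSchema());
    HoodieLogBlock decoded = HoodieAvroDataBlock.getBlock(content, schema);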
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/v1/ArchivedTimelineLoaderV1.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/v1/ArchivedTimelineLoaderV1.java
index 5e109ba64801..5f97ac4f3f5e 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/v1/ArchivedTimelineLoaderV1.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/v1/ArchivedTimelineLoaderV1.java
@@ -22,6 +22,7 @@ import org.apache.hudi.avro.model.HoodieArchivedMetaEntry;
 import org.apache.hudi.avro.model.HoodieMergeArchiveFilePlan;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.log.HoodieLogFormat;
 import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
@@ -115,7 +116,7 @@ public class ArchivedTimelineLoaderV1 implements ArchivedTimelineLoader {
         }
         // Read the archived file
         try (HoodieLogFormat.Reader reader = HoodieLogFormat.newReader(metaClient.getStorage(),
-            new HoodieLogFile(fs.getPath()), HoodieArchivedMetaEntry.getClassSchema())) {
+            new HoodieLogFile(fs.getPath()), HoodieSchema.fromAvroSchema(HoodieArchivedMetaEntry.getClassSchema()))) {
           int instantsInPreviousFile = instantsInRange.size();
           // Read the avro blocks
           while (reader.hasNext() && (!hasLimit || loadedCount.get() < limit.get())) {
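For call sites holding a generated Avro class schema, the fromAvroSchema adapter shown above is the whole migration. A sketch of the pattern, with names as in the hunk (metaClient and fs come from the enclosing method):

    // Wrap the generated class schema once and pass it to the log reader.
    HoodieSchema metaEntrySchema = HoodieSchema.fromAvroSchema(HoodieArchivedMetaEntry.getClassSchema());
    try (HoodieLogFormat.Reader reader = HoodieLogFormat.newReader(metaClient.getStorage(),
        new HoodieLogFile(fs.getPath()), metaEntrySchema)) {
      while (reader.hasNext()) {
        HoodieLogBlock block = reader.next();
        // process archived blocks here
      }
    }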
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/SortUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/SortUtils.java
index 159abe61c2ae..c043f7a2fef0 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/SortUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/SortUtils.java
@@ -20,6 +20,7 @@ package org.apache.hudi.common.util;
 
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.common.util.collection.FlatLists;
 
 import org.apache.avro.Schema;
@@ -90,13 +91,13 @@ public class SortUtils {
   public static FlatLists.ComparableList<Comparable<HoodieRecord>> getComparableSortColumns(
       HoodieRecord record,
       String[] sortColumnNames,
-      Schema schema,
+      HoodieSchema schema,
       boolean suffixRecordKey,
       boolean consistentLogicalTimestampEnabled,
       Function<Object[], Object[]> wrapUTF8StringFunc
   ) {
     if (record.getRecordType() == HoodieRecord.HoodieRecordType.SPARK) {
-      Object[] columnValues = record.getColumnValues(schema, sortColumnNames, consistentLogicalTimestampEnabled);
+      Object[] columnValues = record.getColumnValues(schema.toAvroSchema(), sortColumnNames, consistentLogicalTimestampEnabled);
       if (suffixRecordKey) {
         return FlatLists.ofComparableArray(wrapUTF8StringFunc.apply(
             prependPartitionPathAndSuffixRecordKey(record.getPartitionPath(), record.getRecordKey(), columnValues)));
@@ -105,7 +106,7 @@ public class SortUtils {
     } else if (record.getRecordType() == HoodieRecord.HoodieRecordType.AVRO) {
       return FlatLists.ofComparableArray(wrapUTF8StringFunc.apply(
           HoodieAvroUtils.getSortColumnValuesWithPartitionPathAndRecordKey(
-              record, sortColumnNames, schema, suffixRecordKey, consistentLogicalTimestampEnabled
+              record, sortColumnNames, schema.toAvroSchema(), suffixRecordKey, consistentLogicalTimestampEnabled
           )));
     }
     throw new IllegalArgumentException("Invalid recordType: " + record.getRecordType());
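A hedged usage sketch of the new SortUtils signature (record, avroSchema, and the identity UTF-8 wrapper are placeholders; as the hunk shows, the helper unwraps to Avro internally via toAvroSchema()):

    HoodieSchema schema = HoodieSchema.fromAvroSchema(avroSchema);
    FlatLists.ComparableList<Comparable<HoodieRecord>> sortKey =
        SortUtils.getComparableSortColumns(record, new String[] {"name"}, schema,
            true /* suffixRecordKey */, false /* consistentLogicalTimestampEnabled */,
            values -> values /* identity wrapper */);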
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcInputFormat.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcInputFormat.java
index 17ea108514a4..4070008d8d51 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcInputFormat.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcInputFormat.java
@@ -509,7 +509,7 @@ public class CdcInputFormat extends MergeOnReadInputFormat {
          throw new HoodieIOException("Fail to call getFileStatus", e);
         }
       }).toArray(HoodieLogFile[]::new);
-      this.cdcItr = new HoodieCDCLogRecordIterator(storage, cdcLogFiles, cdcSchema.toAvroSchema());
+      this.cdcItr = new HoodieCDCLogRecordIterator(storage, cdcLogFiles, cdcSchema);
     }
 
     private int[] getRequiredPos(String tableSchema, HoodieSchema required) {
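Since HoodieCDCLogRecordIterator now accepts HoodieSchema directly, callers that already hold one can simply drop the unwrap, exactly as the hunk above does:

    // Before: new HoodieCDCLogRecordIterator(storage, cdcLogFiles, cdcSchema.toAvroSchema())
    this.cdcItr = new HoodieCDCLogRecordIterator(storage, cdcLogFiles, cdcSchema);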
diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
index 34b0a3ec4f2b..7f71303c629e 100755
--- a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
+++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
@@ -34,6 +34,7 @@ import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.SerializableIndexedRecord;
 import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaUtils;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.HoodieTableVersion;
@@ -72,7 +73,6 @@ import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.storage.StoragePathInfo;
 import org.apache.hudi.storage.hadoop.HoodieHadoopStorage;
 
-import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.IndexedRecord;
@@ -447,7 +447,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
     writer.appendBlock(dataBlock);
     writer.close();
 
-    Reader reader = HoodieLogFormat.newReader(storage, writer.getLogFile(), SchemaTestUtil.getSimpleSchema().toAvroSchema());
+    Reader reader = HoodieLogFormat.newReader(storage, writer.getLogFile(), SchemaTestUtil.getSimpleSchema());
     assertTrue(reader.hasNext(), "We wrote a block, we should be able to read it");
     HoodieLogBlock nextBlock = reader.next();
     assertEquals(DEFAULT_DATA_BLOCK_TYPE, nextBlock.getBlockType(), "The next block should be a data block");
@@ -494,7 +494,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
     }
     writer.close();
 
-    Reader reader = HoodieLogFormat.newReader(storage, writer.getLogFile(), SchemaTestUtil.getSimpleSchema().toAvroSchema(), true);
+    Reader reader = HoodieLogFormat.newReader(storage, writer.getLogFile(), SchemaTestUtil.getSimpleSchema(), true);
     assertTrue(reader.hasNext(), "We wrote a block, we should be able to read it");
     HoodieLogBlock nextBlock = reader.next();
     assertEquals(DEFAULT_DATA_BLOCK_TYPE, nextBlock.getBlockType(), "The next block should be a data block");
@@ -572,7 +572,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
     writer.close();
 
     Reader reader =
-        HoodieLogFormat.newReader(storage, writer.getLogFile(), SchemaTestUtil.getSimpleSchema().toAvroSchema());
+        HoodieLogFormat.newReader(storage, writer.getLogFile(), SchemaTestUtil.getSimpleSchema());
     assertTrue(reader.hasNext(), "First block should be available");
     HoodieLogBlock nextBlock = reader.next();
     HoodieDataBlock dataBlockRead = (HoodieDataBlock) nextBlock;
@@ -655,7 +655,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
     writer.appendBlock(dataBlock);
     writer.close();
 
-    Reader reader = HoodieLogFormat.newReader(storage, writer.getLogFile(), cdcSchema.toAvroSchema());
+    Reader reader = HoodieLogFormat.newReader(storage, writer.getLogFile(), cdcSchema);
     assertTrue(reader.hasNext());
     HoodieLogBlock block = reader.next();
     HoodieDataBlock dataBlockRead = (HoodieDataBlock) block;
@@ -715,7 +715,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
                                                   boolean preTableVersion8)
       throws IOException, URISyntaxException, InterruptedException {
     // Generate 4 delta-log files w/ random records
-    Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema().toAvroSchema());
+    HoodieSchema schema = HoodieSchemaUtils.addMetadataFields(getSimpleSchema());
     SchemaTestUtil testUtil = new SchemaTestUtil();
     List<IndexedRecord> genRecords = testUtil.generateHoodieTestRecords(0, 400);
 
@@ -749,7 +749,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
 
     List<IndexedRecord> scannedRecords = new ArrayList<>();
     for (HoodieRecord record : scanner) {
-      Object data = record.toIndexedRecord(schema, CollectionUtils.emptyProps()).get().getData();
+      Object data = record.toIndexedRecord(schema.toAvroSchema(), CollectionUtils.emptyProps()).get().getData();
       if (data instanceof SerializableIndexedRecord) {
         scannedRecords.add(((SerializableIndexedRecord) data).getData());
       } else {
@@ -771,7 +771,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
                                                 boolean enableOptimizedLogBlocksScan)
       throws IOException, URISyntaxException, InterruptedException {
     // Generate 3 delta-log files w/ random records
-    Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema().toAvroSchema());
+    HoodieSchema schema = HoodieSchemaUtils.addMetadataFields(getSimpleSchema());
     SchemaTestUtil testUtil = new SchemaTestUtil();
     List<IndexedRecord> genRecords = testUtil.generateHoodieTestRecords(0, 300);
 
@@ -826,7 +826,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
     for (HoodieRecord record : scanner) {
       scannedHoodieRecords.add(record);
       scannedAvroRecords.add((IndexedRecord) ((SerializableIndexedRecord)
-          ((HoodieAvroRecord) record).getData().getInsertValue(schema).get()).getData());
+          ((HoodieAvroRecord) record).getData().getInsertValue(schema.toAvroSchema()).get()).getData());
     }
 
     assertEquals(sort(sampledRecords), sort(scannedAvroRecords));
@@ -859,7 +859,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
                                                              boolean enableOptimizedLogBlocksScan)
       throws IOException, URISyntaxException, InterruptedException {
     // Generate 3 delta-log files w/ random records
-    Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema().toAvroSchema());
+    HoodieSchema schema = HoodieSchemaUtils.addMetadataFields(getSimpleSchema());
     SchemaTestUtil testUtil = new SchemaTestUtil();
     List<IndexedRecord> genRecords = testUtil.generateHoodieTestRecords(0, 300);
 
@@ -911,7 +911,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
     for (HoodieRecord record : scanner) {
       scannedHoodieRecords.add(record);
       scannedAvroRecords.add((IndexedRecord) ((SerializableIndexedRecord)
-          ((HoodieAvroRecord) record).getData().getInsertValue(schema).get()).getData());
+          ((HoodieAvroRecord) record).getData().getInsertValue(schema.toAvroSchema()).get()).getData());
     }
 
     assertEquals(sort(sampledRecords), sort(scannedAvroRecords));
@@ -960,7 +960,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
     logFile = appendValidBlock(logFile.getPath(), "test-fileId1", "100", 10);
 
     // First round of reads - we should be able to read the first block and then EOF
-    Reader reader = HoodieLogFormat.newReader(storage, logFile, SchemaTestUtil.getSimpleSchema().toAvroSchema());
+    Reader reader = HoodieLogFormat.newReader(storage, logFile, SchemaTestUtil.getSimpleSchema());
     assertTrue(reader.hasNext(), "First block should be available");
     reader.next();
     assertTrue(reader.hasNext(), "We should have corrupted block next");
@@ -991,7 +991,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
     logFile = appendValidBlock(logFile.getPath(), "test-fileId1", "100", 100);
 
     // Second round of reads - we should be able to read the first and last block
-    reader = HoodieLogFormat.newReader(storage, logFile, SchemaTestUtil.getSimpleSchema().toAvroSchema());
+    reader = HoodieLogFormat.newReader(storage, logFile, SchemaTestUtil.getSimpleSchema());
     assertTrue(reader.hasNext(), "First block should be available");
     reader.next();
     assertTrue(reader.hasNext(), "We should get the 1st corrupted block next");
@@ -1047,7 +1047,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
     logFile = appendValidBlock(logFile.getPath(), "test-fileId1", "100", 10);
 
     // First round of reads - we should be able to read the first block and then EOF
-    Reader reader = HoodieLogFormat.newReader(storage, logFile, SchemaTestUtil.getSimpleSchema().toAvroSchema());
+    Reader reader = HoodieLogFormat.newReader(storage, logFile, SchemaTestUtil.getSimpleSchema());
     assertTrue(reader.hasNext(), "First block should be available");
     reader.next();
     assertTrue(reader.hasNext(), "We should have corrupted block next");
@@ -1130,7 +1130,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
     appendValidBlock(writer.getLogFile().getPath(), "test-fileid1", "100", 10);
 
     // Read data and corrupt block
-    Reader reader = HoodieLogFormat.newReader(storage, writer.getLogFile(), SchemaTestUtil.getSimpleSchema().toAvroSchema());
+    Reader reader = HoodieLogFormat.newReader(storage, writer.getLogFile(), SchemaTestUtil.getSimpleSchema());
     assertTrue(reader.hasNext(), "First block should be available");
     reader.next();
     assertTrue(reader.hasNext(), "We should have corrupted block next");
@@ -1151,7 +1151,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
                                           boolean isCompressionEnabled,
                                           boolean enableOptimizedLogBlocksScan)
       throws IOException, URISyntaxException, InterruptedException {
-    Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema().toAvroSchema());
+    HoodieSchema schema = HoodieSchemaUtils.addMetadataFields(getSimpleSchema());
     // Set a small threshold so that every block is a new version
     Writer writer =
         HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
@@ -1163,7 +1163,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
     // Write 1
     List<IndexedRecord> records1 = testUtil.generateHoodieTestRecords(0, 100);
     List<IndexedRecord> copyOfRecords1 = records1.stream()
-        .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)).collect(Collectors.toList());
+        .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema.toAvroSchema())).collect(Collectors.toList());
 
     Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
     header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
@@ -1174,7 +1174,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
     // Write 2
     List<IndexedRecord> records2 = testUtil.generateHoodieTestRecords(0, 100);
     List<IndexedRecord> copyOfRecords2 = records2.stream()
-        .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)).collect(Collectors.toList());
+        .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema.toAvroSchema())).collect(Collectors.toList());
     header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
     dataBlock = getDataBlock(DEFAULT_DATA_BLOCK_TYPE, records2, header);
     writer.appendBlock(dataBlock);
@@ -1196,7 +1196,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
                                                            boolean isCompressionEnabled,
                                                            boolean enableOptimizedLogBlocksScan)
       throws IOException, URISyntaxException, InterruptedException {
-    Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema().toAvroSchema());
+    HoodieSchema schema = HoodieSchemaUtils.addMetadataFields(getSimpleSchema());
     // Set a small threshold so that every block is a new version
     Writer writer =
         HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
@@ -1207,7 +1207,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
     SchemaTestUtil testUtil = new SchemaTestUtil();
     List<IndexedRecord> records1 = testUtil.generateHoodieTestRecords(0, 100);
     List<IndexedRecord> copyOfRecords1 = records1.stream()
-        .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)).collect(Collectors.toList());
+        .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema.toAvroSchema())).collect(Collectors.toList());
     Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
     header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
 
@@ -1237,7 +1237,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
     header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "102");
     List<IndexedRecord> records3 = testUtil.generateHoodieTestRecords(0, 100);
     List<IndexedRecord> copyOfRecords3 = records3.stream()
-        .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)).collect(Collectors.toList());
+        .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema.toAvroSchema())).collect(Collectors.toList());
     header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
     dataBlock = getDataBlock(DEFAULT_DATA_BLOCK_TYPE, records3, header);
     writer.appendBlock(dataBlock);
@@ -1260,7 +1260,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
                                                             boolean isCompressionEnabled,
                                                             boolean enableOptimizedLogBlocksScan)
       throws IOException, URISyntaxException, InterruptedException {
-    Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema().toAvroSchema());
+    HoodieSchema schema = HoodieSchemaUtils.addMetadataFields(getSimpleSchema());
     // Set a small threshold so that every block is a new version
     Writer writer =
         HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
@@ -1271,7 +1271,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
     SchemaTestUtil testUtil = new SchemaTestUtil();
     List<IndexedRecord> records1 = testUtil.generateHoodieTestRecords(0, 100);
     List<IndexedRecord> copyOfRecords1 = records1.stream()
-        .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)).collect(Collectors.toList());
+        .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema.toAvroSchema())).collect(Collectors.toList());
     Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
     header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
     header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
@@ -1307,7 +1307,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
     header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "103");
     List<IndexedRecord> records3 = testUtil.generateHoodieTestRecords(0, 100);
     List<IndexedRecord> copyOfRecords3 = records3.stream()
-        .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)).collect(Collectors.toList());
+        .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema.toAvroSchema())).collect(Collectors.toList());
 
     header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
     dataBlock = getDataBlock(DEFAULT_DATA_BLOCK_TYPE, records3, header);
@@ -1331,7 +1331,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
                                                            boolean isCompressionEnabled,
                                                            boolean enableOptimizedLogBlocksScan)
       throws IOException, URISyntaxException, InterruptedException {
-    Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema().toAvroSchema());
+    HoodieSchema schema = HoodieSchemaUtils.addMetadataFields(getSimpleSchema());
     // Set a small threshold so that every block is a new version
     Writer writer =
         HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
@@ -1342,7 +1342,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
     SchemaTestUtil testUtil = new SchemaTestUtil();
     List<IndexedRecord> records1 = testUtil.generateHoodieTestRecords(0, 100);
     List<IndexedRecord> copyOfRecords1 = records1.stream()
-        .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)).collect(Collectors.toList());
+        .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema.toAvroSchema())).collect(Collectors.toList());
     Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
     header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
     header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
@@ -1353,7 +1353,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
     header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "101");
     List<IndexedRecord> records2 = testUtil.generateHoodieTestRecords(0, 100);
     List<IndexedRecord> copyOfRecords2 = records2.stream()
-        .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)).collect(Collectors.toList());
+        .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema.toAvroSchema())).collect(Collectors.toList());
     dataBlock = getDataBlock(DEFAULT_DATA_BLOCK_TYPE, records2, header);
     writer.appendBlock(dataBlock);
 
@@ -1405,7 +1405,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
     scanner.forEach(s -> readKeys.add(s.getKey().getRecordKey()));
     scanner.forEach(s -> {
       try {
-        if (!((HoodieRecordPayload) s.getData()).getInsertValue(schema).isPresent()) {
+        if (!((HoodieRecordPayload) s.getData()).getInsertValue(schema.toAvroSchema()).isPresent()) {
          emptyPayloads.add(true);
         }
       } catch (IOException io) {
@@ -1449,7 +1449,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
     final List<Boolean> newEmptyPayloads = new ArrayList<>();
     scanner.forEach(s -> {
       try {
-        if (!((HoodieRecordPayload) s.getData()).getInsertValue(schema).isPresent()) {
+        if (!((HoodieRecordPayload) s.getData()).getInsertValue(schema.toAvroSchema()).isPresent()) {
          newEmptyPayloads.add(true);
         }
       } catch (IOException io) {
@@ -1474,7 +1474,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
                                                                      boolean isCompressionEnabled,
                                                                      boolean enableOptimizedLogBlocksScan)
       throws IOException, URISyntaxException, InterruptedException {
-    Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema().toAvroSchema());
+    HoodieSchema schema = HoodieSchemaUtils.addMetadataFields(getSimpleSchema());
     // Set a small threshold so that every block is a new version
     String fileId = "test-fileid111";
     Writer writer =
@@ -1486,7 +1486,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
     SchemaTestUtil testUtil = new SchemaTestUtil();
     List<IndexedRecord> records1 = testUtil.generateHoodieTestRecords(0, 100);
     List<IndexedRecord> copyOfRecords1 = records1.stream()
-        .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)).collect(Collectors.toList());
+        .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema.toAvroSchema())).collect(Collectors.toList());
     Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
     header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
     header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
@@ -1497,7 +1497,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
     header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "101");
     List<IndexedRecord> records2 = testUtil.generateHoodieTestRecords(0, 100);
     List<IndexedRecord> allRecordsInserted = records2.stream()
-        .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)).collect(Collectors.toList());
+        .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema.toAvroSchema())).collect(Collectors.toList());
     dataBlock = getDataBlock(HoodieLogBlockType.AVRO_DATA_BLOCK, records2, header);
     writer.appendBlock(dataBlock);
     allRecordsInserted.addAll(copyOfRecords1);
@@ -1561,7 +1561,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
     final List<Boolean> newEmptyPayloads = new ArrayList<>();
     scanner.forEach(s -> {
       try {
-        if (!((HoodieRecordPayload) s.getData()).getInsertValue(schema).isPresent()) {
+        if (!((HoodieRecordPayload) s.getData()).getInsertValue(schema.toAvroSchema()).isPresent()) {
          newEmptyPayloads.add(true);
         }
       } catch (IOException io) {
@@ -1584,7 +1584,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
   public void testAvroLogRecordReaderWithDisorderDelete(ExternalSpillableMap.DiskMapType diskMapType,
                                                         boolean isCompressionEnabled)
       throws IOException, URISyntaxException, InterruptedException {
-    Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema().toAvroSchema());
+    HoodieSchema schema = HoodieSchemaUtils.addMetadataFields(getSimpleSchema());
     // Set a small threshold so that every block is a new version
     Writer writer =
         HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
@@ -1595,7 +1595,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
     SchemaTestUtil testUtil = new SchemaTestUtil();
     List<IndexedRecord> records1 = testUtil.generateHoodieTestRecords(0, 100);
     List<IndexedRecord> copyOfRecords1 = records1.stream()
-        .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)).collect(Collectors.toList());
+        .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema.toAvroSchema())).collect(Collectors.toList());
     Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
     header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
     header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
@@ -1606,7 +1606,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
     header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "101");
     List<IndexedRecord> records2 = testUtil.generateHoodieTestRecords(0, 100);
     List<IndexedRecord> copyOfRecords2 = records2.stream()
-        .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)).collect(Collectors.toList());
+        .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema.toAvroSchema())).collect(Collectors.toList());
     dataBlock = getDataBlock(DEFAULT_DATA_BLOCK_TYPE, records2, header);
     writer.appendBlock(dataBlock);
 
@@ -1689,7 +1689,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
     scanner.forEach(s -> readKeys.add(s.getRecordKey()));
     scanner.forEach(s -> {
       try {
-        if (!((HoodieRecordPayload) s.getData()).getInsertValue(schema).isPresent()) {
+        if (!((HoodieRecordPayload) s.getData()).getInsertValue(schema.toAvroSchema()).isPresent()) {
          emptyPayloadKeys.add(s.getRecordKey());
         }
       } catch (IOException io) {
@@ -1718,7 +1718,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
       throws IOException, URISyntaxException, InterruptedException {
 
     // Write a Data block and Delete block with same InstantTime (written in same batch)
-    Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema().toAvroSchema());
+    HoodieSchema schema = HoodieSchemaUtils.addMetadataFields(getSimpleSchema());
     // Set a small threshold so that every block is a new version
     Writer writer =
         HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
@@ -1729,7 +1729,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
     SchemaTestUtil testUtil = new SchemaTestUtil();
     List<IndexedRecord> records1 = testUtil.generateHoodieTestRecords(0, 100);
     List<IndexedRecord> copyOfRecords1 = records1.stream()
-        .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)).collect(Collectors.toList());
+        .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema.toAvroSchema())).collect(Collectors.toList());
     Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
     header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
     header.put(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME, "100");
@@ -1788,7 +1788,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
       throws IOException, URISyntaxException, InterruptedException {
 
     // Write a Data block and Delete block with same InstantTime (written in same batch)
-    Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema().toAvroSchema());
+    HoodieSchema schema = HoodieSchemaUtils.addMetadataFields(getSimpleSchema());
     // Set a small threshold so that every block is a new version
     Writer writer =
         HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
@@ -1799,7 +1799,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
     SchemaTestUtil testUtil = new SchemaTestUtil();
     List<IndexedRecord> records1 = testUtil.generateHoodieTestRecords(0, 100);
     List<IndexedRecord> copyOfRecords1 = records1.stream()
-        .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)).collect(Collectors.toList());
+        .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema.toAvroSchema())).collect(Collectors.toList());
     Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
     header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
     header.put(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME, "100");
@@ -1841,7 +1841,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
                                                          boolean isCompressionEnabled,
                                                          boolean enableOptimizedLogBlocksScan)
       throws IOException, URISyntaxException, InterruptedException {
-    Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema().toAvroSchema());
+    HoodieSchema schema = HoodieSchemaUtils.addMetadataFields(getSimpleSchema());
     // Set a small threshold so that every block is a new version
     Writer writer =
         HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
@@ -1879,7 +1879,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
       throws IOException, URISyntaxException, InterruptedException {
 
     // Write 3 data blocks with the same InstantTime (written in same batch)
-    Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema().toAvroSchema());
+    HoodieSchema schema = HoodieSchemaUtils.addMetadataFields(getSimpleSchema());
    // Set a small threshold so that every block is a new version
     Writer writer =
         HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
@@ -1890,7 +1890,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
     SchemaTestUtil testUtil = new SchemaTestUtil();
     List<IndexedRecord> records1 = testUtil.generateHoodieTestRecords(0, 100);
     List<IndexedRecord> copyOfRecords1 = records1.stream()
-        .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)).collect(Collectors.toList());
+        .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema.toAvroSchema())).collect(Collectors.toList());
     Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
     header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
     header.put(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME, "100");
@@ -1934,7 +1934,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
                                                         boolean isCompressionEnabled,
                                                         boolean enableOptimizedLogBlocksScan)
       throws IOException, URISyntaxException, InterruptedException {
-    Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema().toAvroSchema());
+    HoodieSchema schema = HoodieSchemaUtils.addMetadataFields(getSimpleSchema());
     // Set a small threshold so that every block is a new version
     Writer writer =
         HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
@@ -1959,7 +1959,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
     List<IndexedRecord> records1 =
         testUtil.generateHoodieTestRecords(0, recordKeyList, "0000/00/00", "100");
     List<IndexedRecord> copyOfRecords1 = records1.stream()
-        .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)).collect(Collectors.toList());
+        .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema.toAvroSchema())).collect(Collectors.toList());
     Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
     header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
     header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
@@ -1970,7 +1970,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
     header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "101");
     List<IndexedRecord> records2 = testUtil.generateHoodieTestRecords(0, 100);
     List<IndexedRecord> copyOfRecords2 = records2.stream()
-        .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)).collect(Collectors.toList());
+        .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema.toAvroSchema())).collect(Collectors.toList());
     dataBlock = getDataBlock(DEFAULT_DATA_BLOCK_TYPE, records2, header);
     writer.appendBlock(dataBlock);
 
@@ -2031,7 +2031,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
       scanner.forEach(s -> readKeys.add(s.getKey().getRecordKey()));
       scanner.forEach(s -> {
         try {
-          if (!((HoodieRecordPayload) s.getData()).getInsertValue(schema, new Properties()).isPresent()) {
+          if (!((HoodieRecordPayload) s.getData()).getInsertValue(schema.toAvroSchema(), new Properties()).isPresent()) {
            emptyPayloads.add(true);
           } else {
             recordKeys.add(s.getKey().getRecordKey());
@@ -2058,7 +2058,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
   @Test
   public void testAvroLogRecordReaderWithRollbackOlderBlocks()
       throws IOException, URISyntaxException, InterruptedException {
-    Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema().toAvroSchema());
+    HoodieSchema schema = HoodieSchemaUtils.addMetadataFields(getSimpleSchema());
     // Set a small threshold so that every block is a new version
     Writer writer =
         HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
@@ -2119,7 +2119,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
       throws IOException, URISyntaxException, InterruptedException {
 
     // Write 3 data blocks with the same InstantTime (written in same batch)
-    Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema().toAvroSchema());
+    HoodieSchema schema = HoodieSchemaUtils.addMetadataFields(getSimpleSchema());
     // Set a small threshold so that every block is a new version
     Writer writer =
         HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
@@ -2214,7 +2214,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
     // CRPB implies a corrupt block and CB implies a compacted block.
 
     // Write 3 data blocks with the same InstantTime (written in same batch)
-    Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema().toAvroSchema());
+    HoodieSchema schema = HoodieSchemaUtils.addMetadataFields(getSimpleSchema());
     // Set a small threshold so that every block is a new version
     Writer writer =
         HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
@@ -2225,7 +2225,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
     SchemaTestUtil testUtil = new SchemaTestUtil();
     List<IndexedRecord> records1 = testUtil.generateHoodieTestRecords(0, 100);
     Set<String> recordKeysOfFirstTwoBatches = records1.stream()
-        .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)
+        .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema.toAvroSchema())
            .get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString()).collect(Collectors.toSet());
     Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
     header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
@@ -2238,7 +2238,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
     // Write 2nd data block
     List<IndexedRecord> records2 = testUtil.generateHoodieTestRecords(0, 100);
     recordKeysOfFirstTwoBatches.addAll(records2.stream()
-        .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)
+        .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema.toAvroSchema())
            .get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString()).collect(Collectors.toList()));
     header = new HashMap<>();
     header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "101");
@@ -2252,7 +2252,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
     List<IndexedRecord> records3 = testUtil.generateHoodieTestRecords(0, 100);
     Set<String> recordKeysOfFirstThreeBatches = new HashSet<>(recordKeysOfFirstTwoBatches);
     recordKeysOfFirstThreeBatches.addAll(records3.stream()
-        .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema)
+        .map(record -> HoodieAvroUtils.rewriteRecord((GenericRecord) record, schema.toAvroSchema())
            .get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString()).collect(Collectors.toList()));
     header = new HashMap<>();
     header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "102");
@@ -2415,7 +2415,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
                                                               boolean enableOptimizedLogBlocksScan) {
     try {
       // Write one Data block with same InstantTime (written in same batch)
-      Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema().toAvroSchema());
+      HoodieSchema schema = HoodieSchemaUtils.addMetadataFields(getSimpleSchema());
       SchemaTestUtil testUtil = new SchemaTestUtil();
       List<IndexedRecord> records = testUtil.generateHoodieTestRecords(0, 101);
       List<IndexedRecord> records2 = new ArrayList<>(records);
@@ -2551,7 +2551,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
     FileCreateUtilsLegacy.createDeltaCommit(basePath, "100", storage);
 
     HoodieLogFile logFile = new HoodieLogFile(writer.getLogFile().getPath(), storage.getPathInfo(writer.getLogFile().getPath()).getLength());
-    try (HoodieLogFileReader reader = new HoodieLogFileReader(storage, logFile, SchemaTestUtil.getSimpleSchema().toAvroSchema(), BUFFER_SIZE, true)) {
+    try (HoodieLogFileReader reader = new HoodieLogFileReader(storage, logFile, SchemaTestUtil.getSimpleSchema(), BUFFER_SIZE, true)) {
 
       assertTrue(reader.hasPrev(), "Last block should be available");
       HoodieLogBlock prevBlock = reader.prev();
@@ -2636,7 +2636,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
     HoodieLogFile logFile = new HoodieLogFile(writer.getLogFile().getPath(),
         storage.getPathInfo(writer.getLogFile().getPath()).getLength());
 
-    try (HoodieLogFileReader reader = new HoodieLogFileReader(storage, logFile, schema.toAvroSchema(), BUFFER_SIZE, true)) {
+    try (HoodieLogFileReader reader = new HoodieLogFileReader(storage, logFile, schema, BUFFER_SIZE, true)) {
 
       assertTrue(reader.hasPrev(), "Last block should be available");
       HoodieLogBlock block = reader.prev();
@@ -2678,7 +2678,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
     HoodieLogFile logFile = new HoodieLogFile(writer.getLogFile().getPath(),
         storage.getPathInfo(writer.getLogFile().getPath()).getLength());
     try (HoodieLogFileReader reader =
-             new HoodieLogFileReader(storage, logFile, SchemaTestUtil.getSimpleSchema().toAvroSchema(), BUFFER_SIZE, true)) {
+             new HoodieLogFileReader(storage, logFile, SchemaTestUtil.getSimpleSchema(), BUFFER_SIZE, true)) {
 
       assertTrue(reader.hasPrev(), "Third block should be available");
       reader.moveToPrev();
@@ -2710,11 +2710,11 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
     List<IndexedRecord> recordsCopy = convertAvroToSerializableIndexedRecords(new ArrayList<>(records));
     assertEquals(100, records.size());
     assertEquals(100, recordsCopy.size());
-    HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records.stream().map(HoodieAvroIndexedRecord::new).collect(Collectors.toList()), schema.toAvroSchema());
+    HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records.stream().map(HoodieAvroIndexedRecord::new).collect(Collectors.toList()), schema);
     byte[] content = dataBlock.getBytes(schema.toAvroSchema());
     assertTrue(content.length > 0);
 
-    HoodieLogBlock logBlock = HoodieAvroDataBlock.getBlock(content, schema.toAvroSchema());
+    HoodieLogBlock logBlock = HoodieAvroDataBlock.getBlock(content, schema);
     assertEquals(HoodieLogBlockType.AVRO_DATA_BLOCK, logBlock.getBlockType());
     List<IndexedRecord> readRecords = getRecords((HoodieAvroDataBlock) logBlock);
     assertEquals(readRecords.size(), recordsCopy.size());
@@ -2767,9 +2767,9 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
     writer.appendBlock(dataBlock);
     writer.close();
 
-    Schema projectedSchema = HoodieAvroUtils.generateProjectionSchema(schema.toAvroSchema(), Collections.singletonList("name"));
+    HoodieSchema projectedSchema = HoodieSchemaUtils.generateProjectionSchema(schema, Collections.singletonList("name"));
 
-    List<GenericRecord> projectedRecords = HoodieAvroUtils.rewriteRecords(records, projectedSchema);
+    List<GenericRecord> projectedRecords = HoodieAvroUtils.rewriteRecords(records, projectedSchema.toAvroSchema());
 
     try (Reader reader = HoodieLogFormat.newReader(storage, writer.getLogFile(), projectedSchema, false)) {
       assertTrue(reader.hasNext(), "First block should be available");
@@ -2797,7 +2797,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
           "Read records size should be equal to the written records size");
       assertEquals(expectedRecords, recordsRead,
           "Both records lists should be the same. (ordering guaranteed)");
-      assertEquals(dataBlockRead.getSchema().toAvroSchema(), projectedSchema);
+      assertEquals(dataBlockRead.getSchema(), projectedSchema);
 
       int bytesRead = (int) BenchmarkCounter.getBytesRead();
 
@@ -2926,7 +2926,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
     outputStream.close();
 
     // First round of reads - we should be able to read the first block and then EOF
-    Reader reader = HoodieLogFormat.newReader(storage, writer.getLogFile(), SchemaTestUtil.getSimpleSchema().toAvroSchema());
+    Reader reader = HoodieLogFormat.newReader(storage, writer.getLogFile(), SchemaTestUtil.getSimpleSchema());
 
     assertTrue(reader.hasNext(), "First block should be available");
     reader.next();
@@ -2934,7 +2934,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
     return reader;
   }
 
-  private void checkLogBlocksAndKeys(String latestInstantTime, Schema schema, ExternalSpillableMap.DiskMapType diskMapType,
+  private void checkLogBlocksAndKeys(String latestInstantTime, HoodieSchema schema, ExternalSpillableMap.DiskMapType diskMapType,
                                      boolean isCompressionEnabled, boolean enableOptimizedLogBlocksScan, int expectedTotalRecords,
                                      int expectedTotalKeys, Option<Set<String>> expectedKeys) throws IOException {
     List<String> allLogFiles =
diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/timeline/TestArchivedTimelineV1.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/timeline/TestArchivedTimelineV1.java
index 5e37cbf08ae4..aa2b4ca4ba54 100644
--- a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/timeline/TestArchivedTimelineV1.java
+++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/timeline/TestArchivedTimelineV1.java
@@ -36,6 +36,7 @@ import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRollingStatMetadata;
 import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.log.HoodieLogFormat;
 import org.apache.hudi.common.table.log.HoodieLogFormat.Writer;
@@ -489,7 +490,7 @@ public class TestArchivedTimelineV1 extends HoodieCommonTestHarness {
 
   private void readAndValidateArchivedFile(String path, HoodieStorage storage) throws IOException {
     try (HoodieLogFormat.Reader reader = HoodieLogFormat.newReader(
-        storage, new HoodieLogFile(path), HoodieArchivedMetaEntry.getClassSchema())) {
+        storage, new HoodieLogFile(path), HoodieSchema.fromAvroSchema(HoodieArchivedMetaEntry.getClassSchema()))) {
 
       while (reader.hasNext()) {
         HoodieLogBlock block = reader.next();
diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/HoodieCommonTestHarness.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/HoodieCommonTestHarness.java
index 55643d5b239d..fd903383343c 100644
--- a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/HoodieCommonTestHarness.java
+++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/HoodieCommonTestHarness.java
@@ -26,6 +26,7 @@ import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.HoodieTableVersion;
@@ -327,7 +328,7 @@ public class HoodieCommonTestHarness {
 
   protected static List<HoodieLogFile> writeLogFiles(StoragePath partitionPath,
                                                      Schema recordSchema,
-                                                     Schema writerSchema,
+                                                     HoodieSchema writerSchema,
                                                      List<HoodieRecord> records,
                                                      int numFiles,
                                                      HoodieStorage storage,
@@ -336,13 +337,13 @@ public class HoodieCommonTestHarness {
                                                      String commitTime)
       throws IOException, InterruptedException {
     List<IndexedRecord> indexedRecords = records.stream()
-        .map(record -> (IndexedRecord) record.rewriteRecordWithNewSchema(recordSchema, props, writerSchema).getData())
+        .map(record -> (IndexedRecord) record.rewriteRecordWithNewSchema(recordSchema, props, writerSchema.toAvroSchema()).getData())
         .collect(Collectors.toList());
     return writeLogFiles(partitionPath, writerSchema, indexedRecords, numFiles, storage, fileId, commitTime, "100");
   }
 
   protected static List<HoodieLogFile> writeLogFiles(StoragePath partitionPath,
-                                                     Schema schema,
+                                                     HoodieSchema schema,
                                                      List<IndexedRecord> records,
                                                      int numFiles,
                                                      HoodieStorage storage)
@@ -351,7 +352,7 @@ public class HoodieCommonTestHarness {
   }
 
   protected static List<HoodieLogFile> writeLogFiles(StoragePath partitionPath,
-                                                     Schema schema,
+                                                     HoodieSchema schema,
                                                      List<IndexedRecord> records,
                                                      int numFiles,
                                                      HoodieStorage storage,
diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java
index 0eb13c6aa1ae..6f6cf310a0d4 100644
--- a/hudi-hadoop-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java
+++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java
@@ -259,7 +259,7 @@ public class TestHoodieTableMetadataUtil extends HoodieCommonTestHarness {
         FileSlice fileSlice2 = new FileSlice(p, instant2, fileId1);
         fileSlice2.setBaseFile(baseFile1);
         StoragePath storagePath2 = new StoragePath(partitionMetadataPath.getParent(), hoodieTestTable.getLogFileNameById(fileId1, 1));
-        writeLogFiles(new StoragePath(metaClient.getBasePath(), p), HoodieTestDataGenerator.AVRO_SCHEMA, HoodieTestDataGenerator.AVRO_SCHEMA_WITH_METADATA_FIELDS,
+        writeLogFiles(new StoragePath(metaClient.getBasePath(), p), HoodieTestDataGenerator.AVRO_SCHEMA, HoodieTestDataGenerator.HOODIE_SCHEMA_WITH_METADATA_FIELDS,
             dataGen.generateInsertsForPartition(instant2, 10, p), 1, metaClient.getStorage(), new Properties(), fileId1, instant2);
         fileSlice2.addLogFile(new HoodieLogFile(storagePath2.toUri().toString()));
         partitionFileSlicePairs.add(Pair.of(p, fileSlice1));
diff --git 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
index 6085773bf1d0..94bc8fac089f 100644
--- 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
+++ 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
@@ -103,7 +103,7 @@ public class RealtimeCompactedRecordReader extends 
AbstractRealtimeRecordReader
             split.getPath().toString(), HadoopFSUtils.getStorageConf(jobConf)))
         .withBasePath(split.getBasePath())
         .withLogFilePaths(split.getDeltaLogPaths())
-        .withReaderSchema(getLogScannerReaderSchema())
+        .withReaderSchema(HoodieSchema.fromAvroSchema(getLogScannerReaderSchema()))
        .withLatestInstantTime(split.getMaxCommitTime())
        .withMaxMemorySizeInBytes(HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes(jobConf))
         .withReverseReader(false)
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeUnmergedRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeUnmergedRecordReader.java
index 396705c15189..171eca08e99b 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeUnmergedRecordReader.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeUnmergedRecordReader.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.hadoop.realtime;
 
 import org.apache.hudi.common.config.HoodieMemoryConfig;
+import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.common.table.log.HoodieUnMergedLogRecordScanner;
 import org.apache.hudi.common.util.DefaultSizeEstimator;
 import org.apache.hudi.common.util.Functions;
@@ -81,7 +82,7 @@ class RealtimeUnmergedRecordReader extends AbstractRealtimeRecordReader
                 split.getPath().toString(), HadoopFSUtils.getStorageConf(this.jobConf)))
             .withBasePath(split.getBasePath())
             .withLogFilePaths(split.getDeltaLogPaths())
-            .withReaderSchema(getReaderSchema())
+            .withReaderSchema(HoodieSchema.fromAvroSchema(getReaderSchema()))
             .withLatestInstantTime(split.getMaxCommitTime())
             .withReverseReader(false)
             .withBufferSize(this.jobConf.getInt(HoodieMemoryConfig.MAX_DFS_STREAM_BUFFER_SIZE.key(),
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java
index a2e2af8b397b..6a28d2af703c 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java
@@ -289,7 +289,7 @@ public class DFSHoodieDatasetInputReader extends DFSDeltaInputReader {
           .withLogFilePaths(
               fileSlice.getLogFiles().map(l -> l.getPath().getName())
                   .collect(Collectors.toList()))
-          .withReaderSchema(new Schema.Parser().parse(schemaStr))
+          .withReaderSchema(HoodieSchema.parse(schemaStr))
           .withLatestInstantTime(metaClient.getActiveTimeline().getCommitsTimeline()
               .filterCompletedInstants().lastInstant().get().requestedTime())
           .withMaxMemorySizeInBytes(
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCFileGroupIterator.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCFileGroupIterator.scala
index 31a982e0910b..85ecad6002b9 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCFileGroupIterator.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCFileGroupIterator.scala
@@ -451,7 +451,7 @@ class CDCFileGroupIterator(split: HoodieCDCFileGroupSplit,
           val cdcLogFiles = currentCDCFileSplit.getCdcFiles.asScala.map { cdcFile =>
             new HoodieLogFile(storage.getPathInfo(new StoragePath(basePath, cdcFile)))
           }.toArray
-          cdcLogRecordIterator = new HoodieCDCLogRecordIterator(storage, cdcLogFiles, cdcHoodieSchema.toAvroSchema)
+          cdcLogRecordIterator = new HoodieCDCLogRecordIterator(storage, cdcLogFiles, cdcHoodieSchema)
         case REPLACE_COMMIT =>
           if (currentCDCFileSplit.getBeforeFileSlice.isPresent) {
             loadBeforeFileSliceIfNeeded(currentCDCFileSplit.getBeforeFileSlice.get)
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ExportInstantsProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ExportInstantsProcedure.scala
index 4ca87c42d3fb..5ba9960d1d3c 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ExportInstantsProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ExportInstantsProcedure.scala
@@ -22,6 +22,7 @@ import org.apache.hudi.avro.HoodieAvroUtils
 import org.apache.hudi.avro.model.HoodieArchivedMetaEntry
 import org.apache.hudi.common.model.HoodieLogFile
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType
+import org.apache.hudi.common.schema.HoodieSchema
 import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.common.table.log.HoodieLogFormat
 import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock
@@ -120,7 +121,7 @@ class ExportInstantsProcedure extends BaseProcedure with ProcedureBuilder with L
     for (fs <- statuses.asScala) {
       // read the archived file
       val reader = HoodieLogFormat.newReader(
-        storage, new HoodieLogFile(convertToStoragePath(fs.getPath)), HoodieArchivedMetaEntry.getClassSchema)
+        storage, new HoodieLogFile(convertToStoragePath(fs.getPath)), HoodieSchema.fromAvroSchema(HoodieArchivedMetaEntry.getClassSchema))
       // read the avro blocks
       while ( {
         reader.hasNext && copyCount < limit
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileMetadataProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileMetadataProcedure.scala
index a986db8254af..1ffdd0d80cad 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileMetadataProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileMetadataProcedure.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.hudi.command.procedures
 import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.model.HoodieLogFile
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType
+import org.apache.hudi.common.schema.HoodieSchema
 import org.apache.hudi.common.table.TableSchemaResolver
 import org.apache.hudi.common.table.log.HoodieLogFormat
 import org.apache.hudi.common.table.log.block.{HoodieCorruptBlock, HoodieDataBlock}
@@ -73,7 +74,7 @@ class ShowHoodieLogFileMetadataProcedure extends BaseProcedure with ProcedureBui
       logFilePath => {
         val statuses = storage.listDirectEntries(new StoragePath(logFilePath))
         val schema = TableSchemaResolver.readSchemaFromLogFile(storage, new StoragePath(logFilePath))
-        val reader = HoodieLogFormat.newReader(storage, new HoodieLogFile(statuses.get(0).getPath), schema)
+        val reader = HoodieLogFormat.newReader(storage, new HoodieLogFile(statuses.get(0).getPath), HoodieSchema.fromAvroSchema(schema))
 
         // read the avro blocks
         while (reader.hasNext) {
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileRecordsProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileRecordsProcedure.scala
index 7dbf57c1925a..db72ec23a123 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileRecordsProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileRecordsProcedure.scala
@@ -21,6 +21,7 @@ import org.apache.hudi.common.config.{HoodieCommonConfig, HoodieMemoryConfig, Ho
 import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.model.{HoodieLogFile, HoodieRecordPayload}
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType
+import org.apache.hudi.common.schema.HoodieSchema
 import org.apache.hudi.common.table.TableSchemaResolver
 import org.apache.hudi.common.table.log.{HoodieLogFormat, HoodieMergedLogRecordScanner}
 import org.apache.hudi.common.table.log.block.HoodieDataBlock
@@ -68,7 +69,7 @@ class ShowHoodieLogFileRecordsProcedure extends BaseProcedure with ProcedureBuil
     ValidationUtils.checkArgument(logFilePaths.nonEmpty, "There is no log file")
     val allRecords: java.util.List[IndexedRecord] = new java.util.ArrayList[IndexedRecord]
     if (merge) {
-      val schema = Objects.requireNonNull(TableSchemaResolver.readSchemaFromLogFile(storage, new StoragePath(logFilePaths.last)))
+      val schema = HoodieSchema.fromAvroSchema(Objects.requireNonNull(TableSchemaResolver.readSchemaFromLogFile(storage, new StoragePath(logFilePaths.last))))
       val scanner = HoodieMergedLogRecordScanner.newBuilder
         .withStorage(storage)
         .withBasePath(basePath)
@@ -83,7 +84,7 @@ class ShowHoodieLogFileRecordsProcedure extends BaseProcedure with ProcedureBuil
         .withBitCaskDiskMapCompressionEnabled(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue)
         .build
       scanner.asScala.foreach(hoodieRecord => {
-        val record = hoodieRecord.getData.asInstanceOf[HoodieRecordPayload[_]].getInsertValue(schema).get()
+        val record = hoodieRecord.getData.asInstanceOf[HoodieRecordPayload[_]].getInsertValue(schema.toAvroSchema).get()
         if (allRecords.size() < limit) {
           allRecords.add(record)
         }
@@ -92,7 +93,7 @@ class ShowHoodieLogFileRecordsProcedure extends BaseProcedure with ProcedureBuil
       logFilePaths.toStream.takeWhile(_ => allRecords.size() < limit).foreach {
         logFilePath => {
           val schema = Objects.requireNonNull(TableSchemaResolver.readSchemaFromLogFile(storage, new StoragePath(logFilePath)))
-          val reader = HoodieLogFormat.newReader(storage, new HoodieLogFile(logFilePath), schema)
+          val reader = HoodieLogFormat.newReader(storage, new HoodieLogFile(logFilePath), HoodieSchema.fromAvroSchema(schema))
           while (reader.hasNext) {
             val block = reader.next()
             block match {
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
index 52ef94ab85fa..514cec157265 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
@@ -608,7 +608,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
       WriteClientTestUtils.startCommitWithTime(client, commitTime1);
       List<HoodieRecord> inserts1 = dataGen.generateInserts(commitTime1, 100);
       JavaRDD<HoodieRecord> insertRecordsRDD1 = jsc.parallelize(inserts1, 10);
-      BulkInsertPartitioner<JavaRDD<HoodieRecord>> partitioner = new RDDCustomColumnsSortPartitioner(new String[] {"rider"}, HoodieTestDataGenerator.AVRO_SCHEMA, config);
+      BulkInsertPartitioner<JavaRDD<HoodieRecord>> partitioner = new RDDCustomColumnsSortPartitioner(new String[] {"rider"}, HoodieTestDataGenerator.HOODIE_SCHEMA, config);
       List<WriteStatus> statuses = client.bulkInsert(insertRecordsRDD1, commitTime1, Option.of(partitioner)).collect();
       assertNoWriteErrors(statuses);
     }
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitioner.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitioner.java
index 69553b8f319e..f000a7a14232 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitioner.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitioner.java
@@ -229,9 +229,9 @@ public class TestBulkInsertInternalPartitioner extends HoodieClientTestBase impl
 
     JavaRDD<HoodieRecord> records1 = generateTestRecordsForBulkInsert(jsc);
     JavaRDD<HoodieRecord> records2 = generateTripleTestRecordsForBulkInsert(jsc);
-    testBulkInsertInternalPartitioner(new RDDCustomColumnsSortPartitioner(sortColumns, HoodieTestDataGenerator.AVRO_SCHEMA, config),
+    testBulkInsertInternalPartitioner(new RDDCustomColumnsSortPartitioner(sortColumns, HoodieTestDataGenerator.HOODIE_SCHEMA, config),
         records1, true, true, true, generateExpectedPartitionNumRecords(records1), Option.of(columnComparator), true);
-    testBulkInsertInternalPartitioner(new RDDCustomColumnsSortPartitioner(sortColumns, HoodieTestDataGenerator.AVRO_SCHEMA, config),
+    testBulkInsertInternalPartitioner(new RDDCustomColumnsSortPartitioner(sortColumns, HoodieTestDataGenerator.HOODIE_SCHEMA, config),
         records2, true, true, true, generateExpectedPartitionNumRecords(records2), Option.of(columnComparator), true);
 
 
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieBackedMetadata.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieBackedMetadata.java
index 44703b04d6ca..8d51c21fbcf7 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieBackedMetadata.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieBackedMetadata.java
@@ -1405,7 +1405,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
       }
 
       try (HoodieLogFormat.Reader logFileReader = HoodieLogFormat.newReader(storage,
-          new HoodieLogFile(pathInfoList.get(0).getPath()), writerSchema)) {
+          new HoodieLogFile(pathInfoList.get(0).getPath()), HoodieSchema.fromAvroSchema(writerSchema))) {
         while (logFileReader.hasNext()) {
           HoodieLogBlock logBlock = logFileReader.next();
           if (logBlock instanceof HoodieDataBlock) {
@@ -3955,7 +3955,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
       }
 
       try (HoodieLogFormat.Reader logFileReader = HoodieLogFormat.newReader(storage,
-          new HoodieLogFile(pathInfoList.get(0).getPath()), writerSchema)) {
+          new HoodieLogFile(pathInfoList.get(0).getPath()), HoodieSchema.fromAvroSchema(writerSchema))) {
         while (logFileReader.hasNext()) {
           HoodieLogBlock logBlock = logFileReader.next();
           if (logBlock instanceof HoodieDataBlock) {
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/io/TestMergeHandle.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/io/TestMergeHandle.java
index ed961747692b..0279bbe44d88 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/io/TestMergeHandle.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/io/TestMergeHandle.java
@@ -261,7 +261,7 @@ public class TestMergeHandle extends BaseTestHandle {
     String cdcFilePath = metaClient.getBasePath().toString() + "/" + writeStatus.getStat().getCdcStats().keySet().stream().findFirst().get();
     HoodieSchema cdcSchema = schemaBySupplementalLoggingMode(HoodieCDCSupplementalLoggingMode.OP_KEY_ONLY, HOODIE_SCHEMA);
     int recordKeyFieldIndex = cdcSchema.getField("record_key").get().pos();
-    try (HoodieLogFormat.Reader reader = HoodieLogFormat.newReader(storage, new HoodieLogFile(cdcFilePath), cdcSchema.toAvroSchema())) {
+    try (HoodieLogFormat.Reader reader = HoodieLogFormat.newReader(storage, new HoodieLogFile(cdcFilePath), cdcSchema)) {
       while (reader.hasNext()) {
         HoodieLogBlock logBlock = reader.next();
         if (logBlock instanceof HoodieDataBlock) {
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
index 8b524b1ffb10..93ae04e5df29 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
@@ -95,7 +95,7 @@ import static org.apache.hudi.common.model.HoodieWriteStat.NULL_COMMIT;
 import static org.apache.hudi.common.table.timeline.HoodieTimeline.DELTA_COMMIT_ACTION;
 import static org.apache.hudi.common.table.timeline.InstantComparison.GREATER_THAN;
 import static org.apache.hudi.common.table.timeline.InstantComparison.compareTimestamps;
-import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.AVRO_SCHEMA;
+import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.HOODIE_SCHEMA;
 import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.recordsToStrings;
 import static org.apache.hudi.common.testutils.HoodieTestUtils.INSTANT_GENERATOR;
 import static org.apache.hudi.config.HoodieWriteConfig.WRITE_TABLE_VERSION;
@@ -607,7 +607,7 @@ public class TestHoodieMergeOnReadTable extends SparkClientFunctionalTestHarness
         .filter(path -> FSUtils.isLogFile(path.getName()))
         .forEach(logFilePath -> {
           try {
-            HoodieLogFileReader reader = new HoodieLogFileReader(hoodieStorage(), new HoodieLogFile(logFilePath), AVRO_SCHEMA, 10000, false,
+            HoodieLogFileReader reader = new HoodieLogFileReader(hoodieStorage(), new HoodieLogFile(logFilePath), HOODIE_SCHEMA, 10000, false,
                 false, "_row_key", null);
             Map<HoodieLogBlock.HeaderMetadataType, String> headers = Collections.emptyMap();
             while (reader.hasNext()) {
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSourceStorage.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSourceStorage.scala
index 9aa5767b7b98..a06c3533d9b0 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSourceStorage.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSourceStorage.scala
@@ -352,7 +352,7 @@ class TestMORDataSourceStorage extends SparkClientFunctionalTestHarness {
                                         shouldContainRecordPosition: Boolean,
                                         logFileList: List[HoodieLogFile],
                                         shouldBaseFileInstantTimeMatch: Boolean): Unit = {
-    val schema = new TableSchemaResolver(metaClient).getTableAvroSchema
+    val schema = new TableSchemaResolver(metaClient).getTableSchema
     val fsv = FileSystemViewManager.createInMemoryFileSystemView(
       context(), metaClient, HoodieMetadataConfig.newBuilder().build())
     logFileList.foreach(filename => {
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/HoodieCDCTestBase.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/HoodieCDCTestBase.scala
index a17f68b2479c..d8a2a99ee328 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/HoodieCDCTestBase.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/cdc/HoodieCDCTestBase.scala
@@ -120,7 +120,7 @@ abstract class HoodieCDCTestBase extends HoodieSparkClientTestBase {
   protected def getCDCBlocks(relativeLogFile: String, cdcSchema: HoodieSchema): List[HoodieDataBlock] = {
     val logFile = new HoodieLogFile(
       metaClient.getStorage.getPathInfo(new StoragePath(metaClient.getBasePath, relativeLogFile)))
-    val reader = HoodieLogFormat.newReader(storage, logFile, cdcSchema.toAvroSchema)
+    val reader = HoodieLogFormat.newReader(storage, logFile, cdcSchema)
     val blocks = scala.collection.mutable.ListBuffer.empty[HoodieDataBlock]
     while(reader.hasNext) {
       blocks.asJava.add(reader.next().asInstanceOf[HoodieDataBlock])
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/HoodieSparkSqlTestBase.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/HoodieSparkSqlTestBase.scala
index 40dd8f4c8a44..5a93fcef6e2c 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/HoodieSparkSqlTestBase.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/HoodieSparkSqlTestBase.scala
@@ -454,11 +454,11 @@ object HoodieSparkSqlTestBase {
     val logFilePathList: java.util.List[String] = HoodieTestUtils.getLogFileListFromFileSlice(fileSlice.get)
     Collections.sort(logFilePathList)
     var deleteLogBlockFound = false
-    val avroSchema = new TableSchemaResolver(metaClient).getTableAvroSchema
+    val schema = new TableSchemaResolver(metaClient).getTableSchema
     for (i <- 0 until logFilePathList.size()) {
       val logReader = new HoodieLogFileReader(
         metaClient.getStorage, new HoodieLogFile(logFilePathList.get(i)),
-        avroSchema, 1024 * 1024, false, false,
+        schema, 1024 * 1024, false, false,
         "id", null)
       assertTrue(logReader.hasNext)
       val logBlock = logReader.next()
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestPartialUpdateForMergeInto.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestPartialUpdateForMergeInto.scala
index 735542585235..fc08271bc9b1 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestPartialUpdateForMergeInto.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestPartialUpdateForMergeInto.scala
@@ -23,6 +23,7 @@ import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions}
 import org.apache.hudi.avro.HoodieAvroUtils
 import org.apache.hudi.common.config.{HoodieReaderConfig, HoodieStorageConfig, RecordMergeMode}
 import org.apache.hudi.common.model.{FileSlice, HoodieLogFile}
+import org.apache.hudi.common.schema.{HoodieSchema, HoodieSchemaUtils}
 import org.apache.hudi.common.table.{HoodieTableMetaClient, HoodieTableVersion, TableSchemaResolver}
 import org.apache.hudi.common.table.log.HoodieLogFileReader
 import org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType
@@ -743,11 +744,11 @@ class TestPartialUpdateForMergeInto extends HoodieSparkSqlTestBase {
     val logFilePathList: List[String] = HoodieTestUtils.getLogFileListFromFileSlice(fileSlice.get)
     Collections.sort(logFilePathList)
 
-    val avroSchema = new TableSchemaResolver(metaClient).getTableAvroSchema
+    val schema = new TableSchemaResolver(metaClient).getTableSchema
     for (i <- 0 until expectedNumLogFile) {
       val logReader = new HoodieLogFileReader(
         metaClient.getStorage, new HoodieLogFile(logFilePathList.get(i)),
-        avroSchema, 1024 * 1024, false, false,
+        schema, 1024 * 1024, false, false,
         "id", null)
       assertTrue(logReader.hasNext)
       val logBlockHeader = logReader.next().getLogBlockHeader
@@ -758,9 +759,9 @@ class TestPartialUpdateForMergeInto extends HoodieSparkSqlTestBase {
       } else {
         assertFalse(logBlockHeader.containsKey(HeaderMetadataType.IS_PARTIAL))
       }
-      val actualSchema = new Schema.Parser().parse(logBlockHeader.get(HeaderMetadataType.SCHEMA))
-      val expectedSchema = HoodieAvroUtils.addMetadataFields(HoodieAvroUtils.generateProjectionSchema(
-        avroSchema, changedFields(i).asJava), false)
+      val actualSchema = HoodieSchema.parse(logBlockHeader.get(HeaderMetadataType.SCHEMA))
+      val expectedSchema = HoodieSchemaUtils.addMetadataFields(HoodieSchemaUtils.generateProjectionSchema(
+        schema, changedFields(i).asJava), false)
       assertEquals(expectedSchema, actualSchema)
     }
   }
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
index 5eb4b7e2158e..24de148ca1c5 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
@@ -1662,7 +1662,7 @@ public class HoodieMetadataTableValidator implements Serializable {
           continue;
         }
         reader =
-            HoodieLogFormat.newReader(storage, new HoodieLogFile(logFilePathStr), readerSchema.toAvroSchema(), false);
+            HoodieLogFormat.newReader(storage, new HoodieLogFile(logFilePathStr), readerSchema, false);
         // read the avro blocks
         if (reader.hasNext()) {
           HoodieLogBlock block = reader.next();


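A note for readers following the migration (not part of the commit itself): the pattern repeated across the call sites above is to thread HoodieSchema through log readers and partitioners end-to-end and adapt only at the Avro boundary. A minimal sketch follows, using only the conversion APIs that appear in this diff (HoodieSchema.fromAvroSchema, HoodieSchema.parse, toAvroSchema()); the helper class and method names below are hypothetical.

    import org.apache.avro.Schema;

    import org.apache.hudi.common.schema.HoodieSchema;

    // Hypothetical helper illustrating the boundary conversions used throughout this diff.
    public class SchemaBoundarySketch {

      // Callers that still hold an Avro Schema wrap it once when handing it
      // to an API that now takes HoodieSchema (readers, partitioners).
      static HoodieSchema wrap(Schema avroSchema) {
        return HoodieSchema.fromAvroSchema(avroSchema);
      }

      // Unwrap only where a downstream API still requires an Avro Schema,
      // as at the getInsertValue(schema.toAvroSchema) call sites above.
      static Schema unwrap(HoodieSchema schema) {
        return schema.toAvroSchema();
      }

      // Schema JSON strings parse straight to HoodieSchema, replacing
      // the old new Schema.Parser().parse(schemaJson) call sites.
      static HoodieSchema parseJson(String schemaJson) {
        return HoodieSchema.parse(schemaJson);
      }
    }

The net effect, visible in the diffstat, is that toAvroSchema() calls move from the reader constructors into the few remaining Avro-only call sites.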