This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 29b0a479cac6 feat(schema): Phase 18 - HoodieAvroUtils removal (Part 3) 
(#17659)
29b0a479cac6 is described below

commit 29b0a479cac67fb07b756babf26f77dc0bc7f78c
Author: voonhous <[email protected]>
AuthorDate: Thu Jan 15 03:47:15 2026 +0800

    feat(schema): Phase 18 - HoodieAvroUtils removal (Part 3) (#17659)
---
 .../hudi/aws/sync/AWSGlueCatalogSyncClient.java    |   6 +-
 .../hudi/cli/commands/HoodieLogFileCommand.java    |  13 +-
 .../org/apache/hudi/cli/commands/TableCommand.java |   4 +-
 ...ConcurrentSchemaEvolutionTableSchemaGetter.java |   3 +-
 .../hudi/table/action/compact/HoodieCompactor.java |   4 +-
 .../hudi/client/TestJavaHoodieBackedMetadata.java  |   9 +-
 .../apache/hudi/index/HoodieSparkIndexClient.java  |   2 +-
 .../functional/TestHoodieBackedTableMetadata.java  |   2 +-
 .../hudi/common/schema/HoodieSchemaUtils.java      |  47 +++++
 .../hudi/common/table/HoodieTableConfig.java       |   6 +-
 .../hudi/common/table/TableSchemaResolver.java     | 190 ++++-------------
 .../common/table/read/PartialUpdateHandler.java    |   4 +-
 .../index/secondary/SecondaryIndexManager.java     |   8 +-
 .../hudi/metadata/HoodieTableMetadataUtil.java     |   2 +-
 .../hudi/common/schema/TestHoodieSchemaUtils.java  | 229 +++++++++++++++++++++
 .../table/read/TestHoodieFileGroupReaderBase.java  |   6 +-
 .../java/org/apache/hudi/util/CompactionUtil.java  |   2 +-
 .../hudi/table/catalog/TestHoodieCatalog.java      |   3 +-
 .../hudi/table/catalog/TestHoodieHiveCatalog.java  |   3 +-
 .../hudi/common/table/TestTableSchemaResolver.java |  46 +----
 .../apache/hudi/hadoop/SchemaEvolutionContext.java |   2 +-
 .../hadoop/hive/HoodieCombineHiveInputFormat.java  |   2 +-
 .../HoodieMergeOnReadTableInputFormat.java         |  18 +-
 .../scala/org/apache/hudi/BucketIndexSupport.scala |   6 +-
 .../scala/org/apache/hudi/HoodieCLIUtils.scala     |   2 +-
 .../org/apache/hudi/HoodieSparkSqlWriter.scala     |   5 +-
 .../procedures/RunClusteringProcedure.scala        |   2 +-
 .../ShowHoodieLogFileMetadataProcedure.scala       |   2 +-
 .../ShowHoodieLogFileRecordsProcedure.scala        |   4 +-
 .../hudi/client/TestHoodieClientMultiWriter.java   |   2 +-
 .../hudi/functional/TestHoodieBackedMetadata.java  |   9 +-
 .../hudi/functional/TestRecordLevelIndex.scala     |   2 +-
 .../hudi/functional/TestTimeTravelQuery.scala      |  18 +-
 .../apache/spark/sql/hudi/ddl/TestAlterTable.scala |   2 +-
 .../hudi/feature/index/TestSecondaryIndex.scala    |   6 +-
 .../org/apache/hudi/hive/HoodieHiveSyncClient.java |   4 +-
 .../org/apache/hudi/utilities/HoodieCompactor.java |   4 +-
 .../utilities/HoodieMetadataTableValidator.java    |   3 +-
 .../org/apache/hudi/utilities/UtilHelpers.java     |   3 +-
 .../deltastreamer/TestHoodieDeltaStreamer.java     | 120 +++++++----
 ...estHoodieDeltaStreamerSchemaEvolutionQuick.java |  32 ++-
 41 files changed, 510 insertions(+), 327 deletions(-)

diff --git 
a/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java 
b/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java
index 4050d47ff96b..ccd09675fb51 100644
--- 
a/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java
+++ 
b/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AWSGlueCatalogSyncClient.java
@@ -467,7 +467,7 @@ public class AWSGlueCatalogSyncClient extends 
HoodieSyncClient {
 
   private String getTableDoc() {
     try {
-      return tableSchemaResolver.getTableAvroSchema(true).getDoc();
+      return tableSchemaResolver.getTableSchema(true).getDoc().orElseGet(() -> 
"");
     } catch (Exception e) {
       throw new HoodieGlueSyncException("Failed to get schema's doc from 
storage : ", e);
     }
@@ -476,10 +476,10 @@ public class AWSGlueCatalogSyncClient extends 
HoodieSyncClient {
   @Override
   public List<FieldSchema> getStorageFieldSchemas() {
     try {
-      return tableSchemaResolver.getTableAvroSchema(true)
+      return tableSchemaResolver.getTableSchema(true)
           .getFields()
           .stream()
-          .map(f -> new FieldSchema(f.name(), f.schema().getType().getName(), 
f.doc()))
+          .map(f -> new FieldSchema(f.name(), 
f.schema().getType().toAvroType().getName(), f.doc()))
           .collect(Collectors.toList());
     } catch (Exception e) {
       throw new HoodieGlueSyncException("Failed to get field schemas from 
storage : ", e);
diff --git 
a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java 
b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
index ec06709ab982..6138e1fb3839 100644
--- 
a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
+++ 
b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
@@ -46,14 +46,13 @@ import 
org.apache.hudi.common.table.read.HoodieFileGroupReader;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
-import org.apache.hudi.io.util.FileIOUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.io.util.FileIOUtils;
 import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.StoragePath;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.avro.Schema;
 import org.apache.avro.generic.IndexedRecord;
 import org.springframework.shell.standard.ShellComponent;
 import org.springframework.shell.standard.ShellMethod;
@@ -116,7 +115,7 @@ public class HoodieLogFileCommand {
       } else {
         fileName = path.getName();
       }
-      HoodieSchema writerSchema = 
HoodieSchema.fromAvroSchema(TableSchemaResolver.readSchemaFromLogFile(storage, 
path));
+      HoodieSchema writerSchema = 
TableSchemaResolver.readSchemaFromLogFile(storage, path);
       try (Reader reader = HoodieLogFormat.newReader(storage, new 
HoodieLogFile(path), writerSchema)) {
 
         // read the avro blocks
@@ -221,10 +220,10 @@ public class HoodieLogFileCommand {
     HoodieSchema readerSchema = null;
     // get schema from last log file
     for (int i = logFilePaths.size() - 1; i >= 0; i--) {
-      Schema schema = TableSchemaResolver.readSchemaFromLogFile(
+      HoodieSchema schema = TableSchemaResolver.readSchemaFromLogFile(
           storage, new StoragePath(logFilePaths.get(i)));
       if (schema != null) {
-        readerSchema = HoodieSchema.fromAvroSchema(schema);
+        readerSchema = schema;
         break;
       }
     }
@@ -262,10 +261,10 @@ public class HoodieLogFileCommand {
       }
     } else {
       for (String logFile : logFilePaths) {
-        Schema writerSchema = TableSchemaResolver.readSchemaFromLogFile(
+        HoodieSchema writerSchema = TableSchemaResolver.readSchemaFromLogFile(
             client.getStorage(), new StoragePath(logFile));
         try (HoodieLogFormat.Reader reader =
-                 HoodieLogFormat.newReader(storage, new HoodieLogFile(new 
StoragePath(logFile)), HoodieSchema.fromAvroSchema(writerSchema))) {
+                 HoodieLogFormat.newReader(storage, new HoodieLogFile(new 
StoragePath(logFile)), writerSchema)) {
           // read the avro blocks
           while (reader.hasNext()) {
             HoodieLogBlock n = reader.next();
diff --git 
a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/TableCommand.java 
b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/TableCommand.java
index bdc16cf45196..0e3f7a4029dc 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/TableCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/TableCommand.java
@@ -25,6 +25,7 @@ import org.apache.hudi.cli.TableHeader;
 import org.apache.hudi.common.config.HoodieTimeGeneratorConfig;
 import org.apache.hudi.common.fs.ConsistencyGuardConfig;
 import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.HoodieTableVersion;
@@ -41,7 +42,6 @@ import org.apache.hudi.storage.StoragePath;
 import 
org.apache.hudi.table.action.compact.strategy.UnBoundedCompactionStrategy;
 
 import lombok.extern.slf4j.Slf4j;
-import org.apache.avro.Schema;
 import org.springframework.shell.standard.ShellComponent;
 import org.springframework.shell.standard.ShellMethod;
 import org.springframework.shell.standard.ShellOption;
@@ -204,7 +204,7 @@ public class TableCommand {
               help = "File path to write schema") final String outputFilePath) 
throws Exception {
     HoodieTableMetaClient client = HoodieCLI.getTableMetaClient();
     TableSchemaResolver tableSchemaResolver = new TableSchemaResolver(client);
-    Schema schema = tableSchemaResolver.getTableAvroSchema();
+    HoodieSchema schema = tableSchemaResolver.getTableSchema();
     if (outputFilePath != null) {
       log.info("Latest table schema : " + schema.toString(true));
       writeToFile(outputFilePath, schema.toString(true));
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/ConcurrentSchemaEvolutionTableSchemaGetter.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/ConcurrentSchemaEvolutionTableSchemaGetter.java
index a51dbcc396cf..0bb7db3fa583 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/ConcurrentSchemaEvolutionTableSchemaGetter.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/ConcurrentSchemaEvolutionTableSchemaGetter.java
@@ -96,8 +96,7 @@ class ConcurrentSchemaEvolutionTableSchemaGetter {
   }
 
   private Option<HoodieSchema> getTableCreateSchemaWithoutMetaField() {
-    return metaClient.getTableConfig().getTableCreateSchema()
-        .map(HoodieSchema::fromAvroSchema);
+    return metaClient.getTableConfig().getTableCreateSchema();
   }
 
   private void setCachedLatestCommitWithValidSchema(Option<HoodieInstant> 
instantOption) {
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java
index 04aaef78ec03..85dd1d45dc12 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java
@@ -30,6 +30,7 @@ import org.apache.hudi.common.model.CompactionOperation;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
 import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.TableSchemaResolver;
 import org.apache.hudi.common.table.log.InstantRange;
@@ -46,7 +47,6 @@ import org.apache.hudi.io.HoodieMergeHandleFactory;
 import org.apache.hudi.table.HoodieTable;
 
 import lombok.extern.slf4j.Slf4j;
-import org.apache.avro.Schema;
 import org.apache.avro.generic.IndexedRecord;
 
 import java.io.IOException;
@@ -110,7 +110,7 @@ public abstract class HoodieCompactor<T, I, K, O> 
implements Serializable {
     // the same with the table schema.
     try {
       if (StringUtils.isNullOrEmpty(config.getInternalSchema())) {
-        Schema readerSchema = schemaResolver.getTableAvroSchema(false);
+        HoodieSchema readerSchema = schemaResolver.getTableSchema(false);
         config.setSchema(readerSchema.toString());
       }
     } catch (Exception e) {
diff --git 
a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java
 
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java
index 44bb6d2f89ad..3525995fb540 100644
--- 
a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java
+++ 
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java
@@ -109,7 +109,6 @@ import org.apache.hudi.testutils.MetadataMergeWriteStatus;
 import org.apache.hudi.testutils.TestHoodieMetadataBase;
 
 import lombok.extern.slf4j.Slf4j;
-import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.IndexedRecord;
 import org.junit.jupiter.api.AfterEach;
@@ -892,7 +891,7 @@ public class TestJavaHoodieBackedMetadata extends 
TestHoodieMetadataBase {
   private void verifyMetadataRawRecords(HoodieTable table, List<HoodieLogFile> 
logFiles, boolean enableMetaFields) throws IOException {
     for (HoodieLogFile logFile : logFiles) {
       List<StoragePathInfo> pathInfoList = 
storage.listDirectEntries(logFile.getPath());
-      Schema writerSchema = TableSchemaResolver.readSchemaFromLogFile(storage,
+      HoodieSchema writerSchema = 
TableSchemaResolver.readSchemaFromLogFile(storage,
           logFile.getPath());
       if (writerSchema == null) {
         // not a data block
@@ -900,7 +899,7 @@ public class TestJavaHoodieBackedMetadata extends 
TestHoodieMetadataBase {
       }
 
       try (HoodieLogFormat.Reader logFileReader = 
HoodieLogFormat.newReader(storage,
-          new HoodieLogFile(pathInfoList.get(0).getPath()), 
HoodieSchema.fromAvroSchema(writerSchema))) {
+          new HoodieLogFile(pathInfoList.get(0).getPath()), writerSchema)) {
         while (logFileReader.hasNext()) {
           HoodieLogBlock logBlock = logFileReader.next();
           if (logBlock instanceof HoodieDataBlock) {
@@ -2875,7 +2874,7 @@ public class TestJavaHoodieBackedMetadata extends 
TestHoodieMetadataBase {
   private void verifyMetadataColumnStatsRecords(List<HoodieLogFile> logFiles) 
throws IOException {
     for (HoodieLogFile logFile : logFiles) {
       List<StoragePathInfo> pathInfoList = 
storage.listDirectEntries(logFile.getPath());
-      Schema writerSchema = TableSchemaResolver.readSchemaFromLogFile(storage,
+      HoodieSchema writerSchema = 
TableSchemaResolver.readSchemaFromLogFile(storage,
           logFile.getPath());
       if (writerSchema == null) {
         // not a data block
@@ -2883,7 +2882,7 @@ public class TestJavaHoodieBackedMetadata extends 
TestHoodieMetadataBase {
       }
 
       try (HoodieLogFormat.Reader logFileReader = 
HoodieLogFormat.newReader(storage,
-          new HoodieLogFile(pathInfoList.get(0).getPath()), 
HoodieSchema.fromAvroSchema(writerSchema))) {
+          new HoodieLogFile(pathInfoList.get(0).getPath()), writerSchema)) {
         while (logFileReader.hasNext()) {
           HoodieLogBlock logBlock = logFileReader.next();
           if (logBlock instanceof HoodieDataBlock) {
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/HoodieSparkIndexClient.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/HoodieSparkIndexClient.java
index 392e8adb95ca..7cc8e7545ae0 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/HoodieSparkIndexClient.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/HoodieSparkIndexClient.java
@@ -204,7 +204,7 @@ public class HoodieSparkIndexClient extends 
BaseHoodieIndexClient {
                                              Option<String> indexTypeOpt, 
Map<String, String> configs) {
     try {
       TableSchemaResolver schemaUtil = new TableSchemaResolver(metaClient);
-      String schemaStr = schemaUtil.getTableAvroSchema(false).toString();
+      String schemaStr = schemaUtil.getTableSchema(false).toString();
       TypedProperties props = getProps(metaClient, indexDefinitionOpt, 
indexTypeOpt, schemaStr);
       if (!engineContextOpt.isPresent()) {
         engineContextOpt = Option.of(new HoodieSparkEngineContext(new 
JavaSparkContext(sparkSessionOpt.get().sparkContext())));
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java
index 974e717a211f..353d94904ceb 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java
@@ -520,7 +520,7 @@ public class TestHoodieBackedTableMetadata extends 
TestHoodieMetadataBase {
     for (HoodieLogFile logFile : logFiles) {
       List<StoragePathInfo> pathInfoList = 
storage.listDirectEntries(logFile.getPath());
       HoodieSchema writerSchema  =
-          
HoodieSchema.fromAvroSchema(TableSchemaResolver.readSchemaFromLogFile(storage, 
logFile.getPath()));
+          TableSchemaResolver.readSchemaFromLogFile(storage, 
logFile.getPath());
       if (writerSchema == null) {
         // not a data block
         continue;
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaUtils.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaUtils.java
index 82fac31a07bd..f188d6c7b3bd 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaUtils.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaUtils.java
@@ -28,6 +28,7 @@ import org.apache.hudi.internal.schema.HoodieSchemaException;
 
 import org.apache.avro.JsonProperties;
 import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
 
 import java.math.BigDecimal;
 import java.math.BigInteger;
@@ -794,4 +795,50 @@ public final class HoodieSchemaUtils {
   public static boolean isMetadataField(String fieldName) {
     return HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION.contains(fieldName);
   }
+
+  /**
+   * Converts a HoodieSchemaField's default value to its Java representation.
+   * This is equivalent to {@link 
org.apache.hudi.avro.HoodieAvroUtils#toJavaDefaultValue(org.apache.avro.Schema.Field)}
+   * but operates on HoodieSchemaField.
+   *
+   * <p>For primitive types (STRING, INT, LONG, FLOAT, DOUBLE, BOOLEAN, ENUM, 
BYTES),
+   * the default value is returned as-is. For complex types (ARRAY, MAP, 
RECORD),
+   * Avro's GenericData utility is used to properly construct the default 
value.</p>
+   *
+   * @param field the HoodieSchemaField containing the default value
+   * @return the Java representation of the default value, or null if no 
default value exists
+   * @throws IllegalArgumentException if the field's type is not supported
+   * @since 1.2.0
+   */
+  public static Object toJavaDefaultValue(HoodieSchemaField field) {
+    ValidationUtils.checkArgument(field != null, "Field cannot be null");
+
+    Option<Object> defaultValOpt = field.defaultVal();
+    if (!defaultValOpt.isPresent() || defaultValOpt.get() == 
HoodieJsonProperties.NULL_VALUE) {
+      return null;
+    }
+
+    Object defaultVal = defaultValOpt.get();
+    HoodieSchemaType type = field.getNonNullSchema().getType();
+
+    switch (type) {
+      case STRING:
+      case INT:
+      case LONG:
+      case FLOAT:
+      case DOUBLE:
+      case BOOLEAN:
+      case ENUM:
+      case BYTES:
+        return defaultVal;
+      case ARRAY:
+      case MAP:
+      case RECORD:
+        // Use Avro's standard GenericData utility for complex types
+        // Delegate to the underlying Avro field
+        return GenericData.get().getDefaultValue(field.getAvroField());
+      default:
+        throw new IllegalArgumentException("Unsupported HoodieSchema type: " + 
type);
+    }
+  }
 }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java 
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
index c5bcfbaff6f6..cfcf203ebc40 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
@@ -44,6 +44,7 @@ import org.apache.hudi.common.model.PartialUpdateAvroPayload;
 import org.apache.hudi.common.model.debezium.DebeziumConstants;
 import org.apache.hudi.common.model.debezium.MySqlDebeziumAvroPayload;
 import org.apache.hudi.common.model.debezium.PostgresDebeziumAvroPayload;
+import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode;
 import org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator;
 import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
@@ -63,7 +64,6 @@ import org.apache.hudi.metadata.MetadataPartitionType;
 import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.StoragePath;
 
-import org.apache.avro.Schema;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -1113,9 +1113,9 @@ public class HoodieTableConfig extends HoodieConfig {
     return Option.ofNullable(getString(BOOTSTRAP_BASE_PATH));
   }
 
-  public Option<Schema> getTableCreateSchema() {
+  public Option<HoodieSchema> getTableCreateSchema() {
     if (contains(CREATE_SCHEMA)) {
-      return Option.of(new Schema.Parser().parse(getString(CREATE_SCHEMA)));
+      return Option.of(HoodieSchema.parse(getString(CREATE_SCHEMA)));
     } else {
       return Option.empty();
     }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
index 768db4b533e5..29a20e735c13 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
@@ -18,7 +18,6 @@
 
 package org.apache.hudi.common.table;
 
-import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieLogFile;
@@ -32,7 +31,6 @@ import org.apache.hudi.common.table.log.HoodieLogFormat;
 import org.apache.hudi.common.table.log.HoodieLogFormat.Reader;
 import org.apache.hudi.common.table.log.block.HoodieDataBlock;
 import org.apache.hudi.common.table.log.block.HoodieLogBlock;
-import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.Option;
@@ -51,7 +49,6 @@ import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.util.Lazy;
 
-import org.apache.avro.Schema;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -107,138 +104,70 @@ public class TableSchemaResolver {
     this.hasOperationField = Lazy.lazily(this::hasOperationField);
   }
 
-  public Schema getTableAvroSchemaFromDataFile() throws Exception {
-    return 
getTableAvroSchemaFromDataFileInternal().orElseThrow(schemaNotFoundError());
-  }
-
   /**
    * Gets full schema (user + metadata) for a hoodie table from data file as 
HoodieSchema.
-   * Delegates to getTableAvroSchemaFromDataFile and wraps the result in a 
HoodieSchema.
    *
    * @return HoodieSchema for this table from data file
    * @throws Exception
    */
   public HoodieSchema getTableSchemaFromDataFile() throws Exception {
-    Schema avroSchema = getTableAvroSchemaFromDataFile();
-    return HoodieSchema.fromAvroSchema(avroSchema);
+    return 
getTableSchemaFromDataFileInternal().orElseThrow(schemaNotFoundError());
   }
 
-  private Option<Schema> getTableAvroSchemaFromDataFileInternal() {
+  private Option<HoodieSchema> getTableSchemaFromDataFileInternal() {
     return getTableParquetSchemaFromDataFile();
   }
 
   /**
-   * Gets full schema (user + metadata) for a hoodie table as HoodieSchema.
-   * Delegates to getTableAvroSchema and wraps the result in a HoodieSchema.
+   * Gets full schema (user + metadata) for a hoodie table.
    *
    * @return HoodieSchema for this table
    * @throws Exception
    */
   public HoodieSchema getTableSchema() throws Exception {
-    Schema avroSchema = 
getTableAvroSchema(metaClient.getTableConfig().populateMetaFields());
-    return HoodieSchema.fromAvroSchema(avroSchema);
+    return getTableSchema(metaClient.getTableConfig().populateMetaFields());
   }
 
   /**
-   * Gets full schema (user + metadata) for a hoodie table as HoodieSchema.
-   * Delegates to getTableAvroSchema and wraps the result in a HoodieSchema.
+   * Gets schema for a hoodie table, can choose if include metadata fields 
should be included.
    *
    * @param includeMetadataFields choice if include metadata fields
-   * @return HoodieSchema for this table
+   * @return schema for this table
    * @throws Exception
    */
   public HoodieSchema getTableSchema(boolean includeMetadataFields) throws 
Exception {
-    Schema avroSchema = getTableAvroSchema(includeMetadataFields);
-    return HoodieSchema.fromAvroSchema(avroSchema);
+    return getTableSchemaInternal(includeMetadataFields, 
Option.empty()).orElseThrow(schemaNotFoundError());
   }
 
   /**
-   * Fetches tables schema in Avro format as of the given instant as 
HoodieSchema.
+   * Fetches tables schema as of the given instant
    *
    * @param timestamp as of which table's schema will be fetched
    */
   public HoodieSchema getTableSchema(String timestamp) throws Exception {
-    Schema avroSchema = getTableAvroSchema(timestamp);
-    return HoodieSchema.fromAvroSchema(avroSchema);
-  }
-
-  /**
-   * Fetches HoodieSchema as of the given instant
-   *
-   * @param instant as of which table's schema will be fetched
-   */
-  public HoodieSchema getTableSchema(HoodieInstant instant, boolean 
includeMetadataFields) throws Exception {
-    Schema schema = getTableAvroSchema(instant, includeMetadataFields);
-    return HoodieSchema.fromAvroSchema(schema);
-  }
-
-  public Option<HoodieSchema> getTableSchemaIfPresent(boolean 
includeMetadataFields) {
-    return getTableAvroSchemaInternal(includeMetadataFields, 
Option.empty()).map(HoodieSchema::fromAvroSchema);
-  }
-
-  /**
-   * Gets full schema (user + metadata) for a hoodie table in Avro format.
-   *
-   * @return Avro schema for this table
-   * @throws Exception
-   */
-  public Schema getTableAvroSchema() throws Exception {
-    return 
getTableAvroSchema(metaClient.getTableConfig().populateMetaFields());
-  }
-
-  /**
-   * Gets schema for a hoodie table in Avro format, can choice if include 
metadata fields.
-   *
-   * @param includeMetadataFields choice if include metadata fields
-   * @return Avro schema for this table
-   * @throws Exception
-   */
-  public Schema getTableAvroSchema(boolean includeMetadataFields) throws 
Exception {
-    return getTableAvroSchemaInternal(includeMetadataFields, 
Option.empty()).orElseThrow(schemaNotFoundError());
-  }
-
-  /**
-   * Fetches tables schema in Avro format as of the given instant
-   *
-   * @param timestamp as of which table's schema will be fetched
-   */
-  public Schema getTableAvroSchema(String timestamp) throws Exception {
     Option<HoodieInstant> instant = 
metaClient.getActiveTimeline().getCommitsTimeline()
         .filterCompletedInstants()
         .findInstantsBeforeOrEquals(timestamp)
         .lastInstant();
-    return 
getTableAvroSchemaInternal(metaClient.getTableConfig().populateMetaFields(), 
instant)
+    return 
getTableSchemaInternal(metaClient.getTableConfig().populateMetaFields(), 
instant)
         .orElseThrow(schemaNotFoundError());
   }
 
   /**
-   * Fetches tables schema in Avro format as of the given instant
+   * Fetches tables schema as of the given instant
    *
    * @param instant as of which table's schema will be fetched
    */
-  public Schema getTableAvroSchema(HoodieInstant instant, boolean 
includeMetadataFields) throws Exception {
-    return getTableAvroSchemaInternal(includeMetadataFields, 
Option.of(instant)).orElseThrow(schemaNotFoundError());
-  }
-
-  /**
-   * Gets users data schema for a hoodie table in Avro format.
-   *
-   * @return  Avro user data schema
-   * @throws Exception
-   *
-   * @deprecated use {@link #getTableAvroSchema(boolean)} instead
-   */
-  @Deprecated
-  public Schema getTableAvroSchemaWithoutMetadataFields() throws Exception {
-    return getTableAvroSchemaInternal(false, 
Option.empty()).orElseThrow(schemaNotFoundError());
+  public HoodieSchema getTableSchema(HoodieInstant instant, boolean 
includeMetadataFields) throws Exception {
+    return getTableSchemaInternal(includeMetadataFields, 
Option.of(instant)).orElseThrow(schemaNotFoundError());
   }
 
-  public Option<Schema> getTableAvroSchemaIfPresent(boolean 
includeMetadataFields) {
-    return getTableAvroSchemaInternal(includeMetadataFields, Option.empty());
+  public Option<HoodieSchema> getTableSchemaIfPresent(boolean 
includeMetadataFields) {
+    return getTableSchemaInternal(includeMetadataFields, Option.empty());
   }
 
-  private Option<Schema> getTableAvroSchemaInternal(boolean 
includeMetadataFields, Option<HoodieInstant> instantOpt) {
-    Option<Schema> schema =
+  private Option<HoodieSchema> getTableSchemaInternal(boolean 
includeMetadataFields, Option<HoodieInstant> instantOpt) {
+    Option<HoodieSchema> schema =
         (instantOpt.isPresent()
             ? getTableSchemaFromCommitMetadata(instantOpt.get(), 
includeMetadataFields)
             : getTableSchemaFromLatestCommitMetadata(includeMetadataFields))
@@ -246,38 +175,37 @@ public class TableSchemaResolver {
                 metaClient.getTableConfig().getTableCreateSchema()
                     .map(tableSchema ->
                         includeMetadataFields
-                            ? HoodieAvroUtils.addMetadataFields(tableSchema, 
hasOperationField.get())
+                            ? HoodieSchemaUtils.addMetadataFields(tableSchema, 
hasOperationField.get())
                             : tableSchema)
             )
             .or(() -> {
-              Option<Schema> schemaFromDataFile = 
getTableAvroSchemaFromDataFileInternal();
+              Option<HoodieSchema> schemaFromDataFile = 
getTableSchemaFromDataFileInternal();
               return includeMetadataFields
                   ? schemaFromDataFile
-                  : 
schemaFromDataFile.map(HoodieAvroUtils::removeMetadataFields);
+                  : 
schemaFromDataFile.map(HoodieSchemaUtils::removeMetadataFields);
             });
 
     // TODO partition columns have to be appended in all read-paths
     if (metaClient.getTableConfig().shouldDropPartitionColumns() && 
schema.isPresent()) {
-      HoodieSchema hoodieSchema = HoodieSchema.fromAvroSchema(schema.get());
+      HoodieSchema hoodieSchema = schema.get();
       return metaClient.getTableConfig().getPartitionFields()
           .map(partitionFields -> appendPartitionColumns(hoodieSchema, 
Option.ofNullable(partitionFields)))
-          .map(HoodieSchema::toAvroSchema)
           .or(() -> schema);
     }
 
     return schema;
   }
 
-  private Option<Schema> getTableSchemaFromLatestCommitMetadata(boolean 
includeMetadataFields) {
+  private Option<HoodieSchema> getTableSchemaFromLatestCommitMetadata(boolean 
includeMetadataFields) {
     Option<Pair<HoodieInstant, HoodieCommitMetadata>> instantAndCommitMetadata 
= getLatestCommitMetadataWithValidSchema();
     if (instantAndCommitMetadata.isPresent()) {
       HoodieCommitMetadata commitMetadata = 
instantAndCommitMetadata.get().getRight();
       String schemaStr = 
commitMetadata.getMetadata(HoodieCommitMetadata.SCHEMA_KEY);
-      Schema schema = new Schema.Parser().parse(schemaStr);
+      HoodieSchema schema = HoodieSchema.parse(schemaStr);
       if (includeMetadataFields) {
-        schema = HoodieAvroUtils.addMetadataFields(schema, 
hasOperationField.get());
+        schema = HoodieSchemaUtils.addMetadataFields(schema, 
hasOperationField.get());
       } else {
-        schema = HoodieAvroUtils.removeMetadataFields(schema);
+        schema = HoodieSchemaUtils.removeMetadataFields(schema);
       }
       return Option.of(schema);
     } else {
@@ -285,7 +213,7 @@ public class TableSchemaResolver {
     }
   }
 
-  private Option<Schema> getTableSchemaFromCommitMetadata(HoodieInstant 
instant, boolean includeMetadataFields) {
+  private Option<HoodieSchema> getTableSchemaFromCommitMetadata(HoodieInstant 
instant, boolean includeMetadataFields) {
     try {
       HoodieCommitMetadata metadata = getCachedCommitMetadata(instant);
       String existingSchemaStr = 
metadata.getMetadata(HoodieCommitMetadata.SCHEMA_KEY);
@@ -294,11 +222,11 @@ public class TableSchemaResolver {
         return Option.empty();
       }
 
-      Schema schema = new Schema.Parser().parse(existingSchemaStr);
+      HoodieSchema schema = HoodieSchema.parse(existingSchemaStr);
       if (includeMetadataFields) {
-        schema = HoodieAvroUtils.addMetadataFields(schema, 
hasOperationField.get());
+        schema = HoodieSchemaUtils.addMetadataFields(schema, 
hasOperationField.get());
       } else {
-        schema = HoodieAvroUtils.removeMetadataFields(schema);
+        schema = HoodieSchemaUtils.removeMetadataFields(schema);
       }
       return Option.of(schema);
     } catch (Exception e) {
@@ -309,7 +237,7 @@ public class TableSchemaResolver {
   /**
    * Fetches the schema for a table from any of the table's data files
    */
-  private Option<Schema> getTableParquetSchemaFromDataFile() {
+  private Option<HoodieSchema> getTableParquetSchemaFromDataFile() {
     Option<Pair<HoodieInstant, HoodieCommitMetadata>> instantAndCommitMetadata 
= getLatestCommitMetadataWithInsertOrUpdate();
     switch (metaClient.getTableType()) {
       case COPY_ON_WRITE:
@@ -335,59 +263,22 @@ public class TableSchemaResolver {
     }
   }
 
-  /**
-   * Returns table's latest Avro {@link Schema} iff table is non-empty (ie 
there's at least
-   * a single commit)
-   *
-   * This method differs from {@link #getTableAvroSchema(boolean)} in that it 
won't fallback
-   * to use table's schema used at creation
-   */
-  public Option<Schema> getTableAvroSchemaFromLatestCommit(boolean 
includeMetadataFields) throws Exception {
-    if (metaClient.isTimelineNonEmpty()) {
-      return getTableAvroSchemaInternal(includeMetadataFields, Option.empty());
-    }
-
-    return Option.empty();
-  }
-
   /**
    * Returns table's latest {@link HoodieSchema} iff table is non-empty (ie 
there's at least
    * a single commit)
    *
-   * This method differs from {@link #getTableAvroSchema(boolean)} in that it 
won't fallback
+   * This method differs from {@link #getTableSchema(boolean)} in that it won't fall back
    * to use table's schema used at creation
    */
-  public Option<HoodieSchema> getTableSchemaFromLatestCommit(boolean 
includeMetadataFields) {
+  public Option<HoodieSchema> getTableSchemaFromLatestCommit(boolean 
includeMetadataFields) throws Exception {
     if (metaClient.isTimelineNonEmpty()) {
-      return getTableAvroSchemaInternal(includeMetadataFields, 
Option.empty()).map(HoodieSchema::fromAvroSchema);
+      return getTableSchemaInternal(includeMetadataFields, Option.empty());
     }
 
     return Option.empty();
   }
 
-  /**
-   * Read schema from a data file from the last compaction commit done.
-   *
-   * @deprecated please use {@link #getTableAvroSchema(HoodieInstant, 
boolean)} instead
-   */
-  public Schema readSchemaFromLastCompaction(Option<HoodieInstant> 
lastCompactionCommitOpt) throws Exception {
-    HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
-
-    HoodieInstant lastCompactionCommit = 
lastCompactionCommitOpt.orElseThrow(() -> new Exception(
-        "Could not read schema from last compaction, no compaction commits 
found on path " + metaClient));
-
-    // Read from the compacted file wrote
-    HoodieCommitMetadata compactionMetadata =
-        activeTimeline.readCommitMetadata(lastCompactionCommit);
-    String filePath = 
compactionMetadata.getFileIdAndFullPaths(metaClient.getBasePath()).values().stream().findAny()
-        .orElseThrow(() -> new IllegalArgumentException("Could not find any 
data file written for compaction "
-            + lastCompactionCommit + ", could not get schema for table " + 
metaClient.getBasePath()));
-    StoragePath path = new StoragePath(filePath);
-    return HoodieIOFactory.getIOFactory(metaClient.getStorage())
-        .getFileFormatUtils(path).readSchema(metaClient.getStorage(), 
path).toAvroSchema();
-  }
-
-  private Schema readSchemaFromLogFile(StoragePath path) throws IOException {
+  private HoodieSchema readSchemaFromLogFile(StoragePath path) throws 
IOException {
     return readSchemaFromLogFile(metaClient.getRawStorage(), path);
   }
 
@@ -396,7 +287,7 @@ public class TableSchemaResolver {
    *
    * @return
    */
-  public static Schema readSchemaFromLogFile(HoodieStorage storage, 
StoragePath path) throws IOException {
+  public static HoodieSchema readSchemaFromLogFile(HoodieStorage storage, 
StoragePath path) throws IOException {
     // We only need to read the schema from the log block header,
     // so we read the block lazily to avoid reading block content
     // containing the records
@@ -408,7 +299,7 @@ public class TableSchemaResolver {
           lastBlock = (HoodieDataBlock) block;
         }
       }
-      return lastBlock != null ? lastBlock.getSchema().toAvroSchema() : null;
+      return lastBlock != null ? lastBlock.getSchema() : null;
     }
   }
 
@@ -476,10 +367,10 @@ public class TableSchemaResolver {
    */
   public boolean hasOperationField() {
     try {
-      Schema tableAvroSchema = getTableAvroSchemaFromDataFile();
-      return tableAvroSchema.getField(HoodieRecord.OPERATION_METADATA_FIELD) 
!= null;
+      HoodieSchema tableSchema = getTableSchemaFromDataFile();
+      return 
tableSchema.getField(HoodieRecord.OPERATION_METADATA_FIELD).isPresent();
     } catch (Exception e) {
-      LOG.info("Failed to read operation field from avro schema ({})", 
e.getMessage());
+      LOG.info("Failed to read operation field from schema ({})", 
e.getMessage());
       return false;
     }
   }
@@ -560,16 +451,15 @@ public class TableSchemaResolver {
         });
   }
 
-  private Schema fetchSchemaFromFiles(Stream<StoragePath> filePaths) {
+  private HoodieSchema fetchSchemaFromFiles(Stream<StoragePath> filePaths) {
     return filePaths.map(filePath -> {
       try {
         if (FSUtils.isLogFile(filePath)) {
           // this is a log file
           return readSchemaFromLogFile(filePath);
         } else {
-          HoodieSchema hoodieSchema = 
HoodieIOFactory.getIOFactory(metaClient.getStorage())
+          return HoodieIOFactory.getIOFactory(metaClient.getStorage())
               
.getFileFormatUtils(filePath).readSchema(metaClient.getStorage(), filePath);
-          return hoodieSchema != null ? hoodieSchema.toAvroSchema() : null;
         }
       } catch (IOException e) {
         throw new HoodieIOException("Failed to read schema from file: " + 
filePath, e);
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/PartialUpdateHandler.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/PartialUpdateHandler.java
index 9af83d5054f9..58f4043f16b3 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/PartialUpdateHandler.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/PartialUpdateHandler.java
@@ -19,12 +19,12 @@
 
 package org.apache.hudi.common.table.read;
 
-import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.engine.RecordContext;
 import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.common.schema.HoodieSchemaField;
 import org.apache.hudi.common.schema.HoodieSchemaType;
+import org.apache.hudi.common.schema.HoodieSchemaUtils;
 import org.apache.hudi.common.table.PartialUpdateMode;
 
 import java.io.Serializable;
@@ -117,7 +117,7 @@ public class PartialUpdateHandler<T> implements 
Serializable {
       String fieldName = field.name();
      // Only the default value from the top-level data type is validated; that is,
      // for nested columns, we do not check the leaf-level data type defaults.
-      Object defaultValue = 
HoodieAvroUtils.toJavaDefaultValue(field.getAvroField());
+      Object defaultValue = HoodieSchemaUtils.toJavaDefaultValue(field);
       Object newValue = recordContext.getValue(highOrderRecord.getRecord(), 
highOrderSchema, fieldName);
       if (defaultValue == newValue) {
         fieldVals[idx++] = recordContext.getValue(lowOrderRecord.getRecord(), 
lowOrderSchema, fieldName);
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/index/secondary/SecondaryIndexManager.java
 
b/hudi-common/src/main/java/org/apache/hudi/index/secondary/SecondaryIndexManager.java
index 3ad8bcf29dca..2cc3a44b2dbe 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/index/secondary/SecondaryIndexManager.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/index/secondary/SecondaryIndexManager.java
@@ -19,6 +19,7 @@
 
 package org.apache.hudi.index.secondary;
 
+import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.TableSchemaResolver;
@@ -29,7 +30,6 @@ import 
org.apache.hudi.exception.HoodieSecondaryIndexException;
 import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.avro.Schema;
 
 import java.util.Collections;
 import java.util.LinkedHashMap;
@@ -82,16 +82,16 @@ public class SecondaryIndexManager {
       Map<String, String> options) {
     Option<List<HoodieSecondaryIndex>> secondaryIndexes = 
SecondaryIndexUtils.getSecondaryIndexes(metaClient);
     Set<String> colNames = columns.keySet();
-    Schema avroSchema;
+    HoodieSchema schema;
     try {
-      avroSchema = new 
TableSchemaResolver(metaClient).getTableAvroSchema(false);
+      schema = new TableSchemaResolver(metaClient).getTableSchema(false);
     } catch (Exception e) {
       throw new HoodieSecondaryIndexException(
           "Failed to get table avro schema: " + 
metaClient.getTableConfig().getTableName());
     }
 
     for (String col : colNames) {
-      if (avroSchema.getField(col) == null) {
+      if (schema.getField(col).isEmpty()) {
         throw new HoodieSecondaryIndexException("Field not exists: " + col);
       }
     }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index b2839706ae78..30683f901219 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -1607,7 +1607,7 @@ public class HoodieTableMetadataUtil {
 
     // NOTE: Writer schema added to commit metadata will not contain Hudi's 
metadata fields
     Option<HoodieSchema> tableSchema = writerSchema.isEmpty()
-        ? tableConfig.getTableCreateSchema().map(HoodieSchema::fromAvroSchema) 
// the write schema does not set up correctly
+        ? tableConfig.getTableCreateSchema() // the write schema does not set 
up correctly
         : writerSchema.map(schema -> tableConfig.populateMetaFields() ? 
addMetadataFields(schema) : schema)
             .map(HoodieSchema::fromAvroSchema);
 
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/schema/TestHoodieSchemaUtils.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/schema/TestHoodieSchemaUtils.java
index 9cafba1fb80d..f79405cfbf30 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/schema/TestHoodieSchemaUtils.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/schema/TestHoodieSchemaUtils.java
@@ -26,6 +26,7 @@ import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.internal.schema.HoodieSchemaException;
 
 import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
@@ -36,15 +37,20 @@ import java.nio.ByteBuffer;
 import java.sql.Timestamp;
 import java.time.LocalDate;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertSame;
@@ -1760,6 +1766,229 @@ public class TestHoodieSchemaUtils {
     assertEquals(HoodieSchemaType.STRING, result.getType());
   }
 
+  @Test
+  public void testToJavaDefaultValueNull() {
+    // Field with no default value
+    HoodieSchemaField field = HoodieSchemaField.of("testField", 
HoodieSchema.create(HoodieSchemaType.STRING));
+    Object result = HoodieSchemaUtils.toJavaDefaultValue(field);
+    assertNull(result);
+  }
+
+  @Test
+  public void testToJavaDefaultValueNullValue() {
+    // Field with explicit NULL default value
+    HoodieSchemaField field = HoodieSchemaField.of("testField",
+        
HoodieSchema.createNullable(HoodieSchema.create(HoodieSchemaType.STRING)),
+        null,
+        HoodieSchema.NULL_VALUE);
+    Object result = HoodieSchemaUtils.toJavaDefaultValue(field);
+    assertNull(result);
+  }
+
+  @Test
+  public void testToJavaDefaultValueString() {
+    String defaultVal = "defaultString";
+    HoodieSchemaField field = HoodieSchemaField.of("stringField",
+        HoodieSchema.create(HoodieSchemaType.STRING),
+        null,
+        defaultVal);
+    Object result = HoodieSchemaUtils.toJavaDefaultValue(field);
+    assertEquals(defaultVal, result);
+  }
+
+  @Test
+  public void testToJavaDefaultValueInt() {
+    int defaultVal = 42;
+    HoodieSchemaField field = HoodieSchemaField.of("intField",
+        HoodieSchema.create(HoodieSchemaType.INT),
+        null,
+        defaultVal);
+    Object result = HoodieSchemaUtils.toJavaDefaultValue(field);
+    assertEquals(defaultVal, result);
+  }
+
+  @Test
+  public void testToJavaDefaultValueLong() {
+    long defaultVal = 12345L;
+    HoodieSchemaField field = HoodieSchemaField.of("longField",
+        HoodieSchema.create(HoodieSchemaType.LONG),
+        null,
+        defaultVal);
+    Object result = HoodieSchemaUtils.toJavaDefaultValue(field);
+    assertEquals(defaultVal, result);
+  }
+
+  @Test
+  public void testToJavaDefaultValueFloat() {
+    float defaultVal = 3.14f;
+    HoodieSchemaField field = HoodieSchemaField.of("floatField",
+        HoodieSchema.create(HoodieSchemaType.FLOAT),
+        null,
+        defaultVal);
+    Object result = HoodieSchemaUtils.toJavaDefaultValue(field);
+    assertEquals(defaultVal, result);
+  }
+
+  @Test
+  public void testToJavaDefaultValueDouble() {
+    double defaultVal = 2.718;
+    HoodieSchemaField field = HoodieSchemaField.of("doubleField",
+        HoodieSchema.create(HoodieSchemaType.DOUBLE),
+        null,
+        defaultVal);
+    Object result = HoodieSchemaUtils.toJavaDefaultValue(field);
+    assertEquals(defaultVal, result);
+  }
+
+  @Test
+  public void testToJavaDefaultValueBoolean() {
+    HoodieSchemaField field = HoodieSchemaField.of("boolField",
+        HoodieSchema.create(HoodieSchemaType.BOOLEAN),
+        null,
+        true);
+    Object result = HoodieSchemaUtils.toJavaDefaultValue(field);
+    assertEquals(true, result);
+  }
+
+  @Test
+  public void testToJavaDefaultValueBytes() {
+    byte[] defaultBytes = new byte[]{1, 2, 3, 4};
+    HoodieSchemaField field = HoodieSchemaField.of("bytesField",
+        HoodieSchema.create(HoodieSchemaType.BYTES),
+        null,
+        defaultBytes);
+    Object result = HoodieSchemaUtils.toJavaDefaultValue(field);
+    assertArrayEquals(defaultBytes, (byte[]) result);
+  }
+
+  @Test
+  public void testToJavaDefaultValueEnum() {
+    HoodieSchema enumSchema = HoodieSchema.createEnum("Status", null, null, 
Arrays.asList("ACTIVE", "INACTIVE", "PENDING"));
+    HoodieSchemaField field = HoodieSchemaField.of("statusField",
+        enumSchema,
+        null,
+        "ACTIVE");
+    Object result = HoodieSchemaUtils.toJavaDefaultValue(field);
+    assertEquals("ACTIVE", result);
+  }
+
+  @Test
+  public void testToJavaDefaultValueArray() {
+    // Create array schema with int elements
+    HoodieSchema arraySchema = 
HoodieSchema.createArray(HoodieSchema.create(HoodieSchemaType.INT));
+
+    // Default value as a list
+    List<Integer> defaultList = Arrays.asList(1, 2, 3);
+    HoodieSchemaField field = HoodieSchemaField.of("arrayField",
+        arraySchema,
+        null,
+        defaultList);
+
+    Object result = HoodieSchemaUtils.toJavaDefaultValue(field);
+    assertNotNull(result);
+    assertInstanceOf(Collection.class, result);
+    assertArrayEquals(defaultList.toArray(), ((Collection<?>) 
result).toArray());
+  }
+
+  @Test
+  public void testToJavaDefaultValueMap() {
+    // Create map schema with string values
+    HoodieSchema mapSchema = 
HoodieSchema.createMap(HoodieSchema.create(HoodieSchemaType.STRING));
+
+    // Default value as a map
+    Map<String, String> defaultMap = new HashMap<>();
+    defaultMap.put("key1", "value1");
+    defaultMap.put("key2", "value2");
+
+    HoodieSchemaField field = HoodieSchemaField.of("mapField",
+        mapSchema,
+        null,
+        defaultMap);
+
+    Object result = HoodieSchemaUtils.toJavaDefaultValue(field);
+    assertNotNull(result);
+    // The conversion is expected to yield a Map instance
+    assertInstanceOf(Map.class, result);
+  }
+
+  @Test
+  public void testToJavaDefaultValueRecord() {
+    // Create nested record schema
+    HoodieSchema nestedRecordSchema = HoodieSchema.createRecord(
+        "NestedRecord",
+        null,
+        null,
+        Arrays.asList(
+            HoodieSchemaField.of("field1", 
HoodieSchema.create(HoodieSchemaType.STRING), null, "default1"),
+            HoodieSchemaField.of("field2", 
HoodieSchema.create(HoodieSchemaType.INT), null, 10)
+        )
+    );
+
+    // Create a default record value
+    Map<String, Object> defaultRecord = new HashMap<>();
+    defaultRecord.put("field1", "default1");
+    defaultRecord.put("field2", 10);
+
+    HoodieSchemaField field = HoodieSchemaField.of("recordField",
+        nestedRecordSchema,
+        null,
+        defaultRecord);
+
+    Object result = HoodieSchemaUtils.toJavaDefaultValue(field);
+    assertNotNull(result);
+    // The conversion is expected to yield a GenericRecord instance
+    assertInstanceOf(GenericRecord.class, result);
+  }
+
+  @Test
+  public void testToJavaDefaultValueNullableField() {
+    // Create nullable string field with default value
+    // With a non-null defaultValue, the union type must be ["string", null]
+    HoodieSchema nullableStringSchema = HoodieSchema.createUnion(
+        HoodieSchema.create(HoodieSchemaType.STRING), 
HoodieSchema.create(HoodieSchemaType.NULL));
+    HoodieSchemaField field = HoodieSchemaField.of("nullableField",
+        nullableStringSchema,
+        null,
+        "defaultValue");
+
+    Object result = HoodieSchemaUtils.toJavaDefaultValue(field);
+    assertEquals("defaultValue", result);
+  }
+
+  @SuppressWarnings("DataFlowIssue")
+  @Test
+  public void testToJavaDefaultValueNullFieldArgument() {
+    // Should throw IllegalArgumentException for null field
+    assertThrows(IllegalArgumentException.class, () -> 
HoodieSchemaUtils.toJavaDefaultValue(null));
+  }
+
+  @Test
+  public void testToJavaDefaultValueConsistencyWithAvro() {
+    // Test that HoodieSchemaUtils.toJavaDefaultValue produces equivalent 
results to HoodieAvroUtils.toJavaDefaultValue
+
+    // Test with primitive types
+    HoodieSchemaField stringField = HoodieSchemaField.of("stringField",
+        HoodieSchema.create(HoodieSchemaType.STRING),
+        null,
+        "test");
+
+    Object hoodieResult = HoodieSchemaUtils.toJavaDefaultValue(stringField);
+    Object avroResult = 
HoodieAvroUtils.toJavaDefaultValue(stringField.getAvroField());
+
+    assertEquals(avroResult, hoodieResult);
+
+    // Test with int
+    HoodieSchemaField intField = HoodieSchemaField.of("intField",
+        HoodieSchema.create(HoodieSchemaType.INT),
+        null,
+        42);
+
+    Object hoodieIntResult = HoodieSchemaUtils.toJavaDefaultValue(intField);
+    Object avroIntResult = 
HoodieAvroUtils.toJavaDefaultValue(intField.getAvroField());
+
+    assertEquals(avroIntResult, hoodieIntResult);
+  }
+
   @Test
   void testLogicalTypesRetainedAfterPruneWithNestedRecords() {
     final String logicalTypeKey = "logicalType";
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
index 4c79254fdd22..2b850aaa95c7 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
@@ -715,8 +715,8 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
     HoodieSchema schema = new TableSchemaResolver(metaClient).getTableSchema();
     expectedHoodieRecords = 
getExpectedHoodieRecordsWithOrderingValue(expectedHoodieRecords, metaClient, 
schema);
     expectedHoodieUnmergedRecords = 
getExpectedHoodieRecordsWithOrderingValue(expectedHoodieUnmergedRecords, 
metaClient, schema);
-    List<HoodieTestDataGenerator.RecordIdentifier> expectedRecords = 
convertHoodieRecords(expectedHoodieRecords, schema.toAvroSchema(), 
orderingFields);
-    List<HoodieTestDataGenerator.RecordIdentifier> expectedUnmergedRecords = 
convertHoodieRecords(expectedHoodieUnmergedRecords, schema.toAvroSchema(), 
orderingFields);
+    List<HoodieTestDataGenerator.RecordIdentifier> expectedRecords = 
convertHoodieRecords(expectedHoodieRecords, schema, orderingFields);
+    List<HoodieTestDataGenerator.RecordIdentifier> expectedUnmergedRecords = 
convertHoodieRecords(expectedHoodieUnmergedRecords, schema, orderingFields);
     validateOutputFromFileGroupReaderWithExistingRecords(
         storageConf, tablePath, containsBaseFile, expectedLogFileNum, 
recordMergeMode,
         expectedRecords, expectedUnmergedRecords);
@@ -936,7 +936,7 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
             .collect(Collectors.toList());
   }
 
-  private List<HoodieTestDataGenerator.RecordIdentifier> 
convertHoodieRecords(List<HoodieRecord> records, Schema schema, String[] 
orderingFields) {
+  private List<HoodieTestDataGenerator.RecordIdentifier> 
convertHoodieRecords(List<HoodieRecord> records, HoodieSchema schema, String[] 
orderingFields) {
     return records.stream().map(record -> 
HoodieTestDataGenerator.RecordIdentifier.fromTripTestPayload((HoodieAvroIndexedRecord)
 record, orderingFields)).collect(Collectors.toList());
   }
 
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java
index e796ef174d8f..b2e5072cdbbb 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java
@@ -129,7 +129,7 @@ public class CompactionUtil {
    */
   public static void inferChangelogMode(Configuration conf, 
HoodieTableMetaClient metaClient) throws Exception {
     TableSchemaResolver tableSchemaResolver = new 
TableSchemaResolver(metaClient);
-    HoodieSchema tableAvroSchema = 
HoodieSchema.fromAvroSchema(tableSchemaResolver.getTableAvroSchemaFromDataFile());
+    HoodieSchema tableAvroSchema = 
tableSchemaResolver.getTableSchemaFromDataFile();
     if 
(tableAvroSchema.getField(HoodieRecord.OPERATION_METADATA_FIELD).isPresent()) {
       conf.set(FlinkOptions.CHANGELOG_ENABLED, true);
     }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieCatalog.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieCatalog.java
index d8f41fd984ef..ab8ce6f58317 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieCatalog.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieCatalog.java
@@ -22,6 +22,7 @@ import 
org.apache.hudi.common.model.DefaultHoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
 import org.apache.hudi.common.model.PartitionBucketIndexHashingConfig;
+import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -261,7 +262,7 @@ public class TestHoodieCatalog extends 
BaseTestHoodieCatalog {
     HoodieTableConfig tableConfig = StreamerUtil.getTableConfig(
         catalog.getTable(tablePath).getOptions().get(FlinkOptions.PATH.key()),
         HadoopConfigurations.getHadoopConf(new Configuration())).get();
-    Option<org.apache.avro.Schema> tableCreateSchema = 
tableConfig.getTableCreateSchema();
+    Option<HoodieSchema> tableCreateSchema = 
tableConfig.getTableCreateSchema();
     assertTrue(tableCreateSchema.isPresent(), "Table should have been 
created");
     assertThat(tableCreateSchema.get().getFullName(), 
is("hoodie.tb1.tb1_record"));
 
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieHiveCatalog.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieHiveCatalog.java
index 795730e9b098..f3096b5d0df8 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieHiveCatalog.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieHiveCatalog.java
@@ -22,6 +22,7 @@ import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
 import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -220,7 +221,7 @@ public class TestHoodieHiveCatalog extends 
BaseTestHoodieCatalog {
 
     // validate the full name of table create schema
     HoodieTableConfig tableConfig = 
StreamerUtil.getTableConfig(table1.getOptions().get(FlinkOptions.PATH.key()), 
hoodieCatalog.getHiveConf()).get();
-    Option<org.apache.avro.Schema> tableCreateSchema = 
tableConfig.getTableCreateSchema();
+    Option<HoodieSchema> tableCreateSchema = 
tableConfig.getTableCreateSchema();
     assertTrue(tableCreateSchema.isPresent(), "Table should have been 
created");
     assertThat(tableCreateSchema.get().getFullName(), 
is("hoodie.test.test_record"));
 
diff --git 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestTableSchemaResolver.java
 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestTableSchemaResolver.java
index 956b116ecd8d..4a26f8715e96 100644
--- 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestTableSchemaResolver.java
+++ 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestTableSchemaResolver.java
@@ -40,10 +40,10 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.internal.schema.HoodieSchemaException;
 import org.apache.hudi.io.storage.HoodieIOFactory;
 import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.HoodieStorageUtils;
 import org.apache.hudi.storage.StorageConfiguration;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;
-import org.apache.hudi.storage.HoodieStorageUtils;
 
 import org.apache.avro.Schema;
 import org.apache.avro.generic.IndexedRecord;
@@ -64,7 +64,6 @@ import static 
org.apache.hudi.common.testutils.HoodieCommonTestHarness.getDataBl
 import static org.apache.hudi.common.testutils.SchemaTestUtil.getSimpleSchema;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.eq;
@@ -113,49 +112,13 @@ class TestTableSchemaResolver {
     }
   }
 
-  @Test
-  void testGetTableSchema() throws Exception {
-    // Setup: Create mock metaClient and configure behavior
-    HoodieTableMetaClient metaClient = mock(HoodieTableMetaClient.class, 
RETURNS_DEEP_STUBS);
-    HoodieSchema expectedSchema = getSimpleSchema();
-
-    // Mock table setup
-    when(metaClient.getTableConfig().populateMetaFields()).thenReturn(true);
-    when(metaClient.getTableConfig().getTableCreateSchema())
-        .thenReturn(Option.of(expectedSchema.toAvroSchema()));
-
-    when(metaClient.getActiveTimeline().getLastCommitMetadataWithValidSchema())
-        .thenReturn(Option.empty());
-
-    // Create resolver and call both methods
-    TableSchemaResolver resolver = new TableSchemaResolver(metaClient);
-
-    // Test 1: getTableSchema() - should use table config's populateMetaFields 
(true)
-    Schema avroSchema = resolver.getTableAvroSchema();
-    HoodieSchema hoodieSchema = resolver.getTableSchema();
-    assertNotNull(hoodieSchema);
-    assertEquals(avroSchema, hoodieSchema.getAvroSchema());
-
-    // Test 2: getTableSchema(true) - explicitly include metadata fields
-    Schema avroSchemaWithMetadata = resolver.getTableAvroSchema(true);
-    HoodieSchema hoodieSchemaWithMetadata = resolver.getTableSchema(true);
-    assertNotNull(hoodieSchemaWithMetadata);
-    assertEquals(avroSchemaWithMetadata, 
hoodieSchemaWithMetadata.getAvroSchema());
-
-    // Test 3: getTableSchema(false) - explicitly exclude metadata fields
-    Schema avroSchemaWithoutMetadata = resolver.getTableAvroSchema(false);
-    HoodieSchema hoodieSchemaWithoutMetadata = resolver.getTableSchema(false);
-    assertNotNull(hoodieSchemaWithoutMetadata);
-    assertEquals(avroSchemaWithoutMetadata, 
hoodieSchemaWithoutMetadata.getAvroSchema());
-  }
-
   @Test
   void testReadSchemaFromLogFile() throws IOException, URISyntaxException, 
InterruptedException {
     String testDir = initTestDir("read_schema_from_log_file");
     StoragePath partitionPath = new StoragePath(testDir, "partition1");
     HoodieSchema expectedSchema = getSimpleSchema();
     StoragePath logFilePath = writeLogFile(partitionPath, 
expectedSchema.toAvroSchema());
-    assertEquals(expectedSchema.toAvroSchema(), 
TableSchemaResolver.readSchemaFromLogFile(
+    assertEquals(expectedSchema, TableSchemaResolver.readSchemaFromLogFile(
         HoodieStorageUtils.getStorage(new StoragePath(logFilePath.toString()), 
HoodieTestUtils.getDefaultStorageConf()),
         logFilePath));
   }
@@ -194,8 +157,9 @@ class TestTableSchemaResolver {
     try (MockedStatic<HoodieIOFactory> ioFactoryMockedStatic = 
mockStatic(HoodieIOFactory.class);
          MockedStatic<TableSchemaResolver> tableSchemaResolverMockedStatic = 
mockStatic(TableSchemaResolver.class)) {
       // return null for first parquet file to force iteration to inspect the 
next file
-      Schema schema = Schema.createRecord("test_schema", null, 
"test_namespace", false);
-      schema.setFields(Arrays.asList(new Schema.Field("int_field", 
Schema.create(Schema.Type.INT)), new Schema.Field("_hoodie_operation", 
Schema.create(Schema.Type.STRING))));
+      HoodieSchema schema = HoodieSchema.createRecord("test_schema", null, 
"test_namespace", false,
+          Arrays.asList(HoodieSchemaField.of("int_field", 
HoodieSchema.create(HoodieSchemaType.INT)),
+              HoodieSchemaField.of("_hoodie_operation", 
HoodieSchema.create(HoodieSchemaType.STRING))));
 
       // mock parquet file schema reading to return null for the first base 
file to force iteration
       HoodieIOFactory ioFactory = mock(HoodieIOFactory.class);
diff --git 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/SchemaEvolutionContext.java
 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/SchemaEvolutionContext.java
index 631065bba788..dd7cc6d17809 100644
--- 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/SchemaEvolutionContext.java
+++ 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/SchemaEvolutionContext.java
@@ -133,7 +133,7 @@ public class SchemaEvolutionContext {
         json -> Option.ofNullable(new Schema.Parser().parse(json)));
     if (avroSchemaOpt == null) {
       // the code path should only be invoked in tests.
-      return new TableSchemaResolver(this.metaClient).getTableAvroSchema();
+      return new 
TableSchemaResolver(this.metaClient).getTableSchema().toAvroSchema();
     }
     return avroSchemaOpt.orElseThrow(() -> new HoodieValidationException("The 
avro schema cache should always be set up together with the internal schema 
cache"));
   }
diff --git 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/hive/HoodieCombineHiveInputFormat.java
 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/hive/HoodieCombineHiveInputFormat.java
index 51477deffee6..a71695be427e 100644
--- 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/hive/HoodieCombineHiveInputFormat.java
+++ 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/hive/HoodieCombineHiveInputFormat.java
@@ -409,7 +409,7 @@ public class HoodieCombineHiveInputFormat<K extends 
WritableComparable, V extend
         for (String path : uniqTablePaths) {
           HoodieTableMetaClient metaClient = 
HoodieTableMetaClient.builder().setBasePath(path).setConf(new 
HadoopStorageConfiguration(job)).build();
           TableSchemaResolver schemaUtil = new TableSchemaResolver(metaClient);
-          String avroSchema = schemaUtil.getTableAvroSchema().toString();
+          String avroSchema = schemaUtil.getTableSchema().toString();
           Option<InternalSchema> internalSchema = 
schemaUtil.getTableInternalSchemaFromCommitMetadata();
           if (internalSchema.isPresent()) {
             LOG.info("Set internal and avro schema cache with path: {}", path);
diff --git 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMergeOnReadTableInputFormat.java
 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMergeOnReadTableInputFormat.java
index 4d4d8a4da0e8..de352328973e 100644
--- 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMergeOnReadTableInputFormat.java
+++ 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMergeOnReadTableInputFormat.java
@@ -23,6 +23,7 @@ import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieFileGroup;
 import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.TableSchemaResolver;
@@ -42,10 +43,10 @@ import org.apache.hudi.hadoop.RealtimeFileStatus;
 import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
 import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils;
+import org.apache.hudi.internal.schema.HoodieSchemaException;
 import org.apache.hudi.metadata.HoodieTableMetadataUtil;
 import org.apache.hudi.storage.StoragePathInfo;
 
-import org.apache.avro.Schema;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -419,14 +420,19 @@ public class HoodieMergeOnReadTableInputFormat extends 
HoodieCopyOnWriteTableInp
     }
     TableSchemaResolver tableSchemaResolver = new 
TableSchemaResolver(metaClient);
     try {
-      Schema schema = tableSchemaResolver.getTableAvroSchema();
-      boolean isNonPartitionedKeyGen = 
StringUtils.isNullOrEmpty(tableConfig.getPartitionFieldProp());
+      HoodieSchema schema = tableSchemaResolver.getTableSchema();
+      String partitionFieldProp = tableConfig.getPartitionFieldProp();
+      boolean isNonPartitionedKeyGen = 
StringUtils.isNullOrEmpty(partitionFieldProp);
       return Option.of(
           new HoodieVirtualKeyInfo(
               tableConfig.getRecordKeyFieldProp(),
-              isNonPartitionedKeyGen ? Option.empty() : 
Option.of(tableConfig.getPartitionFieldProp()),
-              schema.getField(tableConfig.getRecordKeyFieldProp()).pos(),
-              isNonPartitionedKeyGen ? Option.empty() : 
Option.of(schema.getField(tableConfig.getPartitionFieldProp()).pos())));
+              isNonPartitionedKeyGen ? Option.empty() : 
Option.of(partitionFieldProp),
+              schema.getField(tableConfig.getRecordKeyFieldProp())
+                  .orElseThrow(() -> new HoodieSchemaException("Field: " + 
tableConfig.getRecordKeyFieldProp() + " not found"))
+                  .pos(),
+              isNonPartitionedKeyGen ? Option.empty() : 
Option.of(schema.getField(partitionFieldProp)
+                  .orElseThrow(() -> new HoodieSchemaException("Field: " + 
partitionFieldProp + " not found"))
+                  .pos())));
     } catch (Exception exception) {
       throw new HoodieException("Fetching table schema failed with exception 
", exception);
     }
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BucketIndexSupport.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BucketIndexSupport.scala
index dcc002adfb6f..71c0ac4d379a 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BucketIndexSupport.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BucketIndexSupport.scala
@@ -52,7 +52,7 @@ class BucketIndexSupport(spark: SparkSession,
     HoodieSparkKeyGeneratorFactory.createKeyGenerator(props)
   }
 
-  private lazy val avroSchema = new 
TableSchemaResolver(metaClient).getTableAvroSchema(false)
+  private lazy val schema = new 
TableSchemaResolver(metaClient).getTableSchema(false)
 
   override def getIndexName: String = BucketIndexSupport.INDEX_NAME
 
@@ -133,7 +133,7 @@ class BucketIndexSupport(spark: SparkSession,
     if (hashValuePairs.size != indexBucketHashFields.size) {
       matchedBuckets.setUntil(numBuckets)
     } else {
-      val record = new GenericData.Record(avroSchema)
+      val record = new GenericData.Record(schema.toAvroSchema)
       hashValuePairs.foreach(p => record.put(p.getKey, p.getValue))
       val hoodieKey = keyGenerator.getKey(record)
       matchedBuckets.set(BucketIdentifier.getBucketId(hoodieKey.getRecordKey, 
indexBucketHashFieldsOpt.get, numBuckets))
@@ -153,7 +153,7 @@ class BucketIndexSupport(spark: SparkSession,
   private def getBucketsBySingleHashFields(expr: Expression, bucketColumnName: 
String, numBuckets: Int): BitSet = {
 
     def getBucketNumber(attr: Attribute, v: Any): Int = {
-      val record = new GenericData.Record(avroSchema)
+      val record = new GenericData.Record(schema.toAvroSchema)
       record.put(attr.name, v)
       val hoodieKey = keyGenerator.getKey(record)
       BucketIdentifier.getBucketId(hoodieKey.getRecordKey, 
indexBucketHashFieldsOpt.get, numBuckets)
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCLIUtils.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCLIUtils.scala
index e2c39c3938bb..1b40b0fe5701 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCLIUtils.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCLIUtils.scala
@@ -50,7 +50,7 @@ object HoodieCLIUtils extends Logging {
     val metaClient = HoodieTableMetaClient.builder().setBasePath(basePath)
       
.setConf(HadoopFSUtils.getStorageConf(sparkSession.sessionState.newHadoopConf())).build()
     val schemaUtil = new TableSchemaResolver(metaClient)
-    val schemaStr = schemaUtil.getTableAvroSchema(false).toString
+    val schemaStr = schemaUtil.getTableSchema(false).toString
 
     // If tableName is provided, we need to add catalog props
     val catalogProps = tableName match {
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index f17d641edb11..38b22715f4c4 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -446,7 +446,7 @@ class HoodieSparkSqlWriterInternal {
             }
 
             // Issue the delete.
-            val schemaStr = new 
TableSchemaResolver(tableMetaClient).getTableAvroSchema(false).toString
+            val schemaStr = new 
TableSchemaResolver(tableMetaClient).getTableSchema(false).toString
             val client = 
hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc,
                 schemaStr, path, tblName,
                 parameters.asJava))
@@ -683,8 +683,7 @@ class HoodieSparkSqlWriterInternal {
 
   private def getLatestTableSchema(tableMetaClient: HoodieTableMetaClient, 
schemaFromCatalog: Option[HoodieSchema]): Option[HoodieSchema] = {
     val tableSchemaResolver = new TableSchemaResolver(tableMetaClient)
-    
toScalaOption(tableSchemaResolver.getTableAvroSchemaFromLatestCommit(false))
-      .map(HoodieSchema.fromAvroSchema)
+    toScalaOption(tableSchemaResolver.getTableSchemaFromLatestCommit(false))
       .orElse(schemaFromCatalog)
   }
 
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala
index 4a79350b35c3..74b04503a563 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala
@@ -233,7 +233,7 @@ class RunClusteringProcedure extends BaseProcedure
     }
 
     val tableSchemaResolver = new TableSchemaResolver(metaClient)
-    val fields = tableSchemaResolver.getTableAvroSchema(false)
+    val fields = tableSchemaResolver.getTableSchema(false)
       .getFields.asScala.map(_.name().toLowerCase)
     orderColumns.split(",").foreach(col => {
       if (!fields.contains(col.toLowerCase)) {
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileMetadataProcedure.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileMetadataProcedure.scala
index 1ffdd0d80cad..5257f3c20bda 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileMetadataProcedure.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileMetadataProcedure.scala
@@ -74,7 +74,7 @@ class ShowHoodieLogFileMetadataProcedure extends 
BaseProcedure with ProcedureBui
       logFilePath => {
         val statuses = storage.listDirectEntries(new StoragePath(logFilePath))
         val schema = TableSchemaResolver.readSchemaFromLogFile(storage, new 
StoragePath(logFilePath))
-        val reader = HoodieLogFormat.newReader(storage, new 
HoodieLogFile(statuses.get(0).getPath), HoodieSchema.fromAvroSchema(schema))
+        val reader = HoodieLogFormat.newReader(storage, new 
HoodieLogFile(statuses.get(0).getPath), schema)
 
         // read the avro blocks
         while (reader.hasNext) {
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileRecordsProcedure.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileRecordsProcedure.scala
index 58c4fa1acb43..4b9e0e83c7d5 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileRecordsProcedure.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileRecordsProcedure.scala
@@ -70,7 +70,7 @@ class ShowHoodieLogFileRecordsProcedure extends BaseProcedure 
with ProcedureBuil
     ValidationUtils.checkArgument(logFilePaths.nonEmpty, "There is no log 
file")
     val allRecords: java.util.List[IndexedRecord] = new 
java.util.ArrayList[IndexedRecord]
     if (merge) {
-      val schema = 
HoodieSchema.fromAvroSchema(Objects.requireNonNull(TableSchemaResolver.readSchemaFromLogFile(storage,
 new StoragePath(logFilePaths.last))))
+      val schema = 
Objects.requireNonNull(TableSchemaResolver.readSchemaFromLogFile(storage, new 
StoragePath(logFilePaths.last)))
       val scanner = HoodieMergedLogRecordScanner.newBuilder
         .withStorage(storage)
         .withBasePath(basePath)
@@ -94,7 +94,7 @@ class ShowHoodieLogFileRecordsProcedure extends BaseProcedure 
with ProcedureBuil
       logFilePaths.toStream.takeWhile(_ => allRecords.size() < limit).foreach {
         logFilePath => {
           val schema = 
Objects.requireNonNull(TableSchemaResolver.readSchemaFromLogFile(storage, new 
StoragePath(logFilePath)))
-          val reader = HoodieLogFormat.newReader(storage, new 
HoodieLogFile(logFilePath), HoodieSchema.fromAvroSchema(schema))
+          val reader = HoodieLogFormat.newReader(storage, new 
HoodieLogFile(logFilePath), schema)
           while (reader.hasNext) {
             val block = reader.next()
             block match {
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java
index 72d0db584ae8..1a0c0c730741 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java
@@ -320,7 +320,7 @@ public class TestHoodieClientMultiWriter extends 
HoodieClientTestBase {
       // Validate table schema in the end.
       TableSchemaResolver r = new TableSchemaResolver(metaClient);
       // Assert no table schema is defined.
-      assertThrows(HoodieSchemaNotFoundException.class, () -> 
r.getTableAvroSchema(false));
+      assertThrows(HoodieSchemaNotFoundException.class, () -> 
r.getTableSchema(false));
     }
 
     // Start txn 002 altering table schema
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieBackedMetadata.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieBackedMetadata.java
index a35348f6e3aa..82750b5ab4b7 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieBackedMetadata.java
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieBackedMetadata.java
@@ -123,7 +123,6 @@ import org.apache.hudi.testutils.HoodieClientTestUtils;
 import org.apache.hudi.testutils.MetadataMergeWriteStatus;
 
 import lombok.extern.slf4j.Slf4j;
-import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.IndexedRecord;
 import org.apache.spark.api.java.JavaRDD;
@@ -1402,7 +1401,7 @@ public class TestHoodieBackedMetadata extends 
TestHoodieMetadataBase {
     HoodieStorage storage = table.getStorage();
     for (HoodieLogFile logFile : logFiles) {
       List<StoragePathInfo> pathInfoList = 
storage.listDirectEntries(logFile.getPath());
-      Schema writerSchema  =
+      HoodieSchema writerSchema  =
           TableSchemaResolver.readSchemaFromLogFile(storage, 
logFile.getPath());
       if (writerSchema == null) {
         // not a data block
@@ -1410,7 +1409,7 @@ public class TestHoodieBackedMetadata extends 
TestHoodieMetadataBase {
       }
 
       try (HoodieLogFormat.Reader logFileReader = 
HoodieLogFormat.newReader(storage,
-          new HoodieLogFile(pathInfoList.get(0).getPath()), 
HoodieSchema.fromAvroSchema(writerSchema))) {
+          new HoodieLogFile(pathInfoList.get(0).getPath()), writerSchema)) {
         while (logFileReader.hasNext()) {
           HoodieLogBlock logBlock = logFileReader.next();
           if (logBlock instanceof HoodieDataBlock) {
@@ -3953,7 +3952,7 @@ public class TestHoodieBackedMetadata extends 
TestHoodieMetadataBase {
   private static void verifyMetadataColumnStatsRecords(HoodieStorage storage, 
List<HoodieLogFile> logFiles) throws IOException {
     for (HoodieLogFile logFile : logFiles) {
       List<StoragePathInfo> pathInfoList = 
storage.listDirectEntries(logFile.getPath());
-      Schema writerSchema =
+      HoodieSchema writerSchema =
           TableSchemaResolver.readSchemaFromLogFile(storage, 
logFile.getPath());
       if (writerSchema == null) {
         // not a data block
@@ -3961,7 +3960,7 @@ public class TestHoodieBackedMetadata extends 
TestHoodieMetadataBase {
       }
 
       try (HoodieLogFormat.Reader logFileReader = 
HoodieLogFormat.newReader(storage,
-          new HoodieLogFile(pathInfoList.get(0).getPath()), 
HoodieSchema.fromAvroSchema(writerSchema))) {
+          new HoodieLogFile(pathInfoList.get(0).getPath()), writerSchema)) {
         while (logFileReader.hasNext()) {
           HoodieLogBlock logBlock = logFileReader.next();
           if (logBlock instanceof HoodieDataBlock) {
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala
index 475a5bf55d23..c3888baf7113 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala
@@ -370,7 +370,7 @@ class TestRecordLevelIndex extends RecordLevelIndexTestBase 
with SparkDatasetMix
     assertEquals(10, spark.read.format("hudi").load(basePath).count())
     metaClient.reloadActiveTimeline()
     val tableSchemaResolver = new TableSchemaResolver(metaClient)
-    val latestTableSchemaFromCommitMetadata = 
tableSchemaResolver.getTableAvroSchemaFromLatestCommit(false)
+    val latestTableSchemaFromCommitMetadata = 
tableSchemaResolver.getTableSchemaFromLatestCommit(false)
 
     if (failAndDoRollback) {
       val updatesToFail =  dataGen.generateUniqueUpdates("003", 3)
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestTimeTravelQuery.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestTimeTravelQuery.scala
index e4eb32814875..4593e21839a7 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestTimeTravelQuery.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestTimeTravelQuery.scala
@@ -279,9 +279,9 @@ class TestTimeTravelQuery extends HoodieSparkClientTestBase 
with ScalaAssertionS
       .select("id", "name", "value", "version")
       .take(1)(0)
     assertEquals(Row(1, "a1", 10, 1000), result1)
-    val schema1 = tableSchemaResolver.getTableAvroSchema(firstCommit)
-    assertNull(schema1.getField("year"))
-    assertNull(schema1.getField("month"))
+    val schema1 = tableSchemaResolver.getTableSchema(firstCommit)
+    assertTrue(schema1.getField("year").isEmpty)
+    assertTrue(schema1.getField("month").isEmpty)
 
     // Query as of secondCommitTime
     val result2 = spark.read.format("hudi")
@@ -290,9 +290,9 @@ class TestTimeTravelQuery extends HoodieSparkClientTestBase 
with ScalaAssertionS
       .select("id", "name", "value", "version", "year")
       .take(1)(0)
     assertEquals(Row(1, "a1", 12, 1001, "2022"), result2)
-    val schema2 = tableSchemaResolver.getTableAvroSchema(secondCommit)
-    assertNotNull(schema2.getField("year"))
-    assertNull(schema2.getField("month"))
+    val schema2 = tableSchemaResolver.getTableSchema(secondCommit)
+    assertTrue(schema2.getField("year").isPresent)
+    assertTrue(schema2.getField("month").isEmpty)
 
     // Query as of thirdCommitTime
     val result3 = spark.read.format("hudi")
@@ -301,9 +301,9 @@ class TestTimeTravelQuery extends HoodieSparkClientTestBase 
with ScalaAssertionS
       .select("id", "name", "value", "version", "year", "month")
       .take(1)(0)
     assertEquals(Row(1, "a1", 13, 1002, "2022", "08"), result3)
-    val schema3 = tableSchemaResolver.getTableAvroSchema(thirdCommit)
-    assertNotNull(schema3.getField("year"))
-    assertNotNull(schema3.getField("month"))
+    val schema3 = tableSchemaResolver.getTableSchema(thirdCommit)
+    assertTrue(schema3.getField("year").isPresent)
+    assertTrue(schema3.getField("month").isPresent)
   }
 
   @ParameterizedTest
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestAlterTable.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestAlterTable.scala
index 87d5ae5bb666..9a3af922bc2c 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestAlterTable.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestAlterTable.scala
@@ -301,7 +301,7 @@ class TestAlterTable extends HoodieSparkSqlTestBase {
 
   def validateTableSchema(tablePath: String): Unit = {
     val metaClient = createMetaClient(spark, tablePath)
-    val schema = new TableSchemaResolver(metaClient).getTableAvroSchema(false)
+    val schema = new TableSchemaResolver(metaClient).getTableSchema(false)
     assertFalse(schema.getFields.asScala.exists(f => 
HoodieRecord.HOODIE_META_COLUMNS.contains(f.name())),
       "Metadata fields should be excluded from the table schema")
   }
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/feature/index/TestSecondaryIndex.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/feature/index/TestSecondaryIndex.scala
index 537a01e9ec80..5d592e213712 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/feature/index/TestSecondaryIndex.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/feature/index/TestSecondaryIndex.scala
@@ -735,10 +735,10 @@ class TestSecondaryIndex extends HoodieSparkSqlTestBase {
       .setConf(HoodieTestUtils.getDefaultStorageConf)
       .build()
     val schemaResolver = new TableSchemaResolver(metaClient)
-    val tableSchema = schemaResolver.getTableAvroSchema(false)
+    val tableSchema = schemaResolver.getTableSchema(false)
     val field = tableSchema.getField(fieldName)
-    assertNotNull(field, s"$fieldName field should exist in table schema")
-    val fieldType = field.schema()
+    assertTrue(field.isPresent, s"$fieldName field should exist in table 
schema")
+    val fieldType = field.get().schema()
     assertTrue(
       fieldType.toString.contains(expectedType),
       s"$fieldName field should be of type $expectedType, but got: 
${fieldType.toString}"
diff --git 
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java
 
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java
index 0701305707b1..f8dc69285aab 100644
--- 
a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java
+++ 
b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveSyncClient.java
@@ -445,10 +445,10 @@ public class HoodieHiveSyncClient extends 
HoodieSyncClient {
   @Override
   public List<FieldSchema> getStorageFieldSchemas() {
     try {
-      return tableSchemaResolver.getTableAvroSchema(false)
+      return tableSchemaResolver.getTableSchema(false)
           .getFields()
           .stream()
-          .map(f -> new FieldSchema(f.name(), f.schema().getType().getName(), 
f.doc()))
+          .map(f -> new FieldSchema(f.name(), 
f.schema().getType().toAvroType().getName(), f.doc()))
           .collect(Collectors.toList());
     } catch (Exception e) {
       throw new HoodieHiveSyncException("Failed to get field schemas from 
storage : ", e);
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java
index ba7c21e0eb59..6dd1211f1d83 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java
@@ -22,6 +22,7 @@ import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.TableSchemaResolver;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -37,7 +38,6 @@ import 
org.apache.hudi.table.action.compact.strategy.LogFileSizeBasedCompactionS
 
 import com.beust.jcommander.JCommander;
 import com.beust.jcommander.Parameter;
-import org.apache.avro.Schema;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
@@ -310,7 +310,7 @@ public class HoodieCompactor {
 
   private String getSchemaFromLatestInstant() throws Exception {
     TableSchemaResolver schemaUtil = new TableSchemaResolver(metaClient);
-    Schema schema = schemaUtil.getTableAvroSchema(false);
+    HoodieSchema schema = schemaUtil.getTableSchema(false);
     return schema.toString();
   }
 
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
index 02fc43dd37fc..cbda5d4bf07a 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
@@ -1655,8 +1655,7 @@ public class HoodieMetadataTableValidator implements 
Serializable {
     for (String logFilePathStr : logFilePathSet) {
       HoodieLogFormat.Reader reader = null;
       try {
-        HoodieSchema readerSchema =
-            
HoodieSchema.fromAvroSchema(TableSchemaResolver.readSchemaFromLogFile(storage, 
new StoragePath(logFilePathStr)));
+        HoodieSchema readerSchema = 
TableSchemaResolver.readSchemaFromLogFile(storage, new 
StoragePath(logFilePathStr));
         if (readerSchema == null) {
           LOG.warn("Cannot read schema from log file {}. Skip the check as 
it's likely being written by an inflight instant.", logFilePathStr);
           continue;
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
index 1ee952d9d763..f181c4b44da5 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
@@ -76,7 +76,6 @@ import org.apache.hudi.utilities.transform.ChainedTransformer;
 import org.apache.hudi.utilities.transform.ErrorTableAwareChainedTransformer;
 import org.apache.hudi.utilities.transform.Transformer;
 
-import org.apache.avro.Schema;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -646,7 +645,7 @@ public class UtilHelpers {
 
   public static String getSchemaFromLatestInstant(HoodieTableMetaClient 
metaClient) throws Exception {
     TableSchemaResolver schemaResolver = new TableSchemaResolver(metaClient);
-    Schema schema = schemaResolver.getTableAvroSchema(false);
+    HoodieSchema schema = schemaResolver.getTableSchema(false);
     return schema.toString();
   }
 }
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
index ab1c73c56c33..95c4b75f1181 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
@@ -50,6 +50,9 @@ import org.apache.hudi.common.model.PartialUpdateAvroPayload;
 import org.apache.hudi.common.model.WriteConcurrencyMode;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchema.TimePrecision;
+import org.apache.hudi.common.schema.HoodieSchemaField;
+import org.apache.hudi.common.schema.HoodieSchemaType;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.HoodieTableVersion;
@@ -126,7 +129,6 @@ import 
org.apache.hudi.utilities.transform.SqlQueryBasedTransformer;
 import org.apache.hudi.utilities.transform.Transformer;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.avro.LogicalTypes;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.IndexedRecord;
@@ -660,11 +662,10 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
 
     TableSchemaResolver tableSchemaResolver = new TableSchemaResolver(
         HoodieTestUtils.createMetaClient(storage, tableBasePath));
-    Schema tableSchema = tableSchemaResolver.getTableAvroSchema(false);
+    HoodieSchema tableSchema = tableSchemaResolver.getTableSchema(false);
     assertNotNull(tableSchema);
 
-    Schema expectedSchema;
-    expectedSchema = new Schema.Parser().parse(fs.open(new Path(basePath + 
"/source_evolved.avsc")));
+    HoodieSchema expectedSchema = HoodieSchema.parse(fs.open(new Path(basePath 
+ "/source_evolved.avsc")));
     assertEquals(expectedSchema, tableSchema);
 
     // clean up and reinit
@@ -698,8 +699,12 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     TestHelpers.assertCommitMetadata("00000", tableBasePath, 1);
     TableSchemaResolver tableSchemaResolver = new TableSchemaResolver(
         HoodieTestUtils.createMetaClient(storage, tableBasePath));
-    Schema tableSchema = tableSchemaResolver.getTableAvroSchema(false);
-    assertEquals("timestamp-millis", 
tableSchema.getField("current_ts").schema().getLogicalType().getName());
+    HoodieSchema tableSchema = tableSchemaResolver.getTableSchema(false);
+    Option<HoodieSchemaField> currentTsFieldOpt = 
tableSchema.getField("current_ts");
+    assertTrue(currentTsFieldOpt.isPresent());
+    HoodieSchema.Timestamp currentTsSchema = (HoodieSchema.Timestamp) 
currentTsFieldOpt.get().schema();
+    assertEquals(HoodieSchemaType.TIMESTAMP, currentTsSchema.getType());
+    assertEquals(TimePrecision.MILLIS, currentTsSchema.getPrecision());
     assertEquals(1000, 
sqlContext.read().options(hudiOpts).format("org.apache.hudi").load(tableBasePath).filter("current_ts
 > '1980-01-01'").count());
 
     cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT, 
Collections.singletonList(TestIdentityTransformer.class.getName()),
@@ -718,8 +723,12 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     TestHelpers.assertCommitMetadata("00001", tableBasePath, 2);
     tableSchemaResolver = new TableSchemaResolver(
         HoodieTestUtils.createMetaClient(storage, tableBasePath));
-    tableSchema = tableSchemaResolver.getTableAvroSchema(false);
-    assertEquals("timestamp-millis", 
tableSchema.getField("current_ts").schema().getLogicalType().getName());
+    tableSchema = tableSchemaResolver.getTableSchema(false);
+    currentTsFieldOpt = tableSchema.getField("current_ts");
+    assertTrue(currentTsFieldOpt.isPresent());
+    currentTsSchema = (HoodieSchema.Timestamp) 
currentTsFieldOpt.get().schema();
+    assertEquals(HoodieSchemaType.TIMESTAMP, currentTsSchema.getType());
+    assertEquals(TimePrecision.MILLIS, currentTsSchema.getPrecision());
     sqlContext.clearCache();
     assertEquals(1450, 
sqlContext.read().options(hudiOpts).format("org.apache.hudi").load(tableBasePath).filter("current_ts
 > '1980-01-01'").count());
     assertEquals(1450, 
sqlContext.read().options(hudiOpts).format("org.apache.hudi").load(tableBasePath).filter("current_ts
 < '2080-01-01'").count());
@@ -759,7 +768,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
       TestHelpers.assertCommitMetadata("00000", tableBasePath, 1);
       TableSchemaResolver tableSchemaResolver = new TableSchemaResolver(
           HoodieTestUtils.createMetaClient(storage, tableBasePath));
-      Schema tableSchema = tableSchemaResolver.getTableAvroSchema(false);
+      HoodieSchema tableSchema = tableSchemaResolver.getTableSchema(false);
       Map<String, String> hudiOpts = new HashMap<>();
       hudiOpts.put("hoodie.datasource.write.recordkey.field", "id");
       logicalAssertions(tableSchema, tableBasePath, hudiOpts, 
HoodieTableVersion.current().versionCode());
@@ -779,7 +788,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
       TestHelpers.assertCommitMetadata("00001", tableBasePath, 2);
       tableSchemaResolver = new TableSchemaResolver(
           HoodieTestUtils.createMetaClient(storage, tableBasePath));
-      tableSchema = tableSchemaResolver.getTableAvroSchema(false);
+      tableSchema = tableSchemaResolver.getTableSchema(false);
       logicalAssertions(tableSchema, tableBasePath, hudiOpts, 
HoodieTableVersion.current().versionCode());
     } finally {
       defaultSchemaProviderClassName = FilebasedSchemaProvider.class.getName();
@@ -824,7 +833,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
       TestHelpers.assertCommitMetadata(topicName + ",0:500,1:500", 
tableBasePath, 1);
       TableSchemaResolver tableSchemaResolver = new TableSchemaResolver(
           HoodieTestUtils.createMetaClient(storage, tableBasePath));
-      Schema tableSchema = tableSchemaResolver.getTableAvroSchema(false);
+      HoodieSchema tableSchema = tableSchemaResolver.getTableSchema(false);
       Map<String, String> hudiOpts = new HashMap<>();
       hudiOpts.put("hoodie.datasource.write.recordkey.field", "id");
       logicalAssertions(tableSchema, tableBasePath, hudiOpts, 
HoodieTableVersion.EIGHT.versionCode());
@@ -840,7 +849,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
       TestHelpers.assertCommitMetadata(topicName + ",0:1000,1:1000", 
tableBasePath, 2);
       tableSchemaResolver = new TableSchemaResolver(
           HoodieTestUtils.createMetaClient(storage, tableBasePath));
-      tableSchema = tableSchemaResolver.getTableAvroSchema(false);
+      tableSchema = tableSchemaResolver.getTableSchema(false);
       logicalAssertions(tableSchema, tableBasePath, hudiOpts, 
HoodieTableVersion.EIGHT.versionCode());
     } finally {
       defaultSchemaProviderClassName = FilebasedSchemaProvider.class.getName();
@@ -882,7 +891,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
 
       TableSchemaResolver tableSchemaResolver = new TableSchemaResolver(
           HoodieTestUtils.createMetaClient(storage, tableBasePath));
-      Schema tableSchema = tableSchemaResolver.getTableAvroSchema(false);
+      HoodieSchema tableSchema = tableSchemaResolver.getTableSchema(false);
       Map<String, String> hudiOpts = new HashMap<>();
       hudiOpts.put("hoodie.datasource.write.recordkey.field", "id");
       logicalAssertions(tableSchema, tableBasePath, hudiOpts, 
version.versionCode());
@@ -908,34 +917,67 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     });
   }
 
-  private void logicalAssertions(Schema tableSchema, String tableBasePath, 
Map<String, String> hudiOpts, int tableVersion) {
+  private void logicalAssertions(HoodieSchema tableSchema, String 
tableBasePath, Map<String, String> hudiOpts, int tableVersion) {
     if (tableVersion > 8) {
-      assertEquals("timestamp-millis", 
tableSchema.getField("ts_millis").schema().getLogicalType().getName());
+      Option<HoodieSchemaField> tsMillisFieldOpt = 
tableSchema.getField("ts_millis");
+      assertTrue(tsMillisFieldOpt.isPresent());
+      HoodieSchema.Timestamp tsMillisFieldSchema = (HoodieSchema.Timestamp) 
tsMillisFieldOpt.get().schema();
+      assertEquals(HoodieSchemaType.TIMESTAMP, tsMillisFieldSchema.getType());
+      assertEquals(TimePrecision.MILLIS, tsMillisFieldSchema.getPrecision());
+      assertTrue(tsMillisFieldSchema.isUtcAdjusted());
     }
-    assertEquals("timestamp-micros", 
tableSchema.getField("ts_micros").schema().getLogicalType().getName());
+    Option<HoodieSchemaField> tsMicrosFieldOpt = 
tableSchema.getField("ts_micros");
+    assertTrue(tsMicrosFieldOpt.isPresent());
+    HoodieSchema.Timestamp tsMicrosFieldSchema = (HoodieSchema.Timestamp) 
tsMicrosFieldOpt.get().schema();
+    assertEquals(HoodieSchemaType.TIMESTAMP, tsMicrosFieldSchema.getType());
+    assertEquals(TimePrecision.MICROS, tsMicrosFieldSchema.getPrecision());
+    assertTrue(tsMicrosFieldSchema.isUtcAdjusted());
     if (tableVersion > 8 && !HoodieSparkUtils.isSpark3_3()) {
-      assertEquals("local-timestamp-millis", 
tableSchema.getField("local_ts_millis").schema().getLogicalType().getName());
-      assertEquals("local-timestamp-micros", 
tableSchema.getField("local_ts_micros").schema().getLogicalType().getName());
+      Option<HoodieSchemaField> localTsMillisFieldOpt = 
tableSchema.getField("local_ts_millis");
+      assertTrue(localTsMillisFieldOpt.isPresent());
+      HoodieSchema.Timestamp localTsMillisFieldSchema = 
(HoodieSchema.Timestamp) localTsMillisFieldOpt.get().schema();
+      assertEquals(HoodieSchemaType.TIMESTAMP, 
localTsMillisFieldSchema.getType());
+      assertEquals(TimePrecision.MILLIS, 
localTsMillisFieldSchema.getPrecision());
+      assertFalse(localTsMillisFieldSchema.isUtcAdjusted());
+
+      Option<HoodieSchemaField> localTsMicrosFieldOpt = 
tableSchema.getField("local_ts_micros");
+      assertTrue(localTsMicrosFieldOpt.isPresent());
+      HoodieSchema.Timestamp localTsMicrosFieldSchema = 
(HoodieSchema.Timestamp) localTsMicrosFieldOpt.get().schema();
+      assertEquals(HoodieSchemaType.TIMESTAMP, 
localTsMicrosFieldSchema.getType());
+      assertEquals(TimePrecision.MICROS, 
localTsMicrosFieldSchema.getPrecision());
+      assertFalse(localTsMicrosFieldSchema.isUtcAdjusted());
     }
-
-    assertEquals("date", 
tableSchema.getField("event_date").schema().getLogicalType().getName());
+    Option<HoodieSchemaField> eventDateFieldOpt = 
tableSchema.getField("event_date");
+    assertTrue(eventDateFieldOpt.isPresent());
+    assertEquals(HoodieSchemaType.DATE, 
eventDateFieldOpt.get().schema().getType());
 
     if (tableVersion > 8) {
-      assertEquals("bytes", 
tableSchema.getField("dec_plain_large").schema().getType().getName());
-      assertEquals("decimal", 
tableSchema.getField("dec_plain_large").schema().getLogicalType().getName());
-      assertEquals(20, ((LogicalTypes.Decimal) 
tableSchema.getField("dec_plain_large").schema().getLogicalType()).getPrecision());
-      assertEquals(10, ((LogicalTypes.Decimal) 
tableSchema.getField("dec_plain_large").schema().getLogicalType()).getScale());
+      Option<HoodieSchemaField> decPlainLargeFieldOpt = 
tableSchema.getField("dec_plain_large");
+      assertTrue(decPlainLargeFieldOpt.isPresent());
+      HoodieSchema.Decimal decPlainLargeSchema = (HoodieSchema.Decimal) 
decPlainLargeFieldOpt.get().schema();
+      // decimal backed by bytes (not a fixed-length byte array)
+      assertFalse(decPlainLargeSchema.isFixed());
+      assertEquals(HoodieSchemaType.DECIMAL, decPlainLargeSchema.getType());
+      assertEquals(20, decPlainLargeSchema.getPrecision());
+      assertEquals(10, decPlainLargeSchema.getScale());
     }
-    assertEquals("fixed", 
tableSchema.getField("dec_fixed_small").schema().getType().getName());
-    assertEquals(3, 
tableSchema.getField("dec_fixed_small").schema().getFixedSize());
-    assertEquals("decimal", 
tableSchema.getField("dec_fixed_small").schema().getLogicalType().getName());
-    assertEquals(5, ((LogicalTypes.Decimal) 
tableSchema.getField("dec_fixed_small").schema().getLogicalType()).getPrecision());
-    assertEquals(2, ((LogicalTypes.Decimal) 
tableSchema.getField("dec_fixed_small").schema().getLogicalType()).getScale());
-    assertEquals("fixed", 
tableSchema.getField("dec_fixed_large").schema().getType().getName());
-    assertEquals(8, 
tableSchema.getField("dec_fixed_large").schema().getFixedSize());
-    assertEquals("decimal", 
tableSchema.getField("dec_fixed_large").schema().getLogicalType().getName());
-    assertEquals(18, ((LogicalTypes.Decimal) 
tableSchema.getField("dec_fixed_large").schema().getLogicalType()).getPrecision());
-    assertEquals(9, ((LogicalTypes.Decimal) 
tableSchema.getField("dec_fixed_large").schema().getLogicalType()).getScale());
+    Option<HoodieSchemaField> decFixedSmallOpt = 
tableSchema.getField("dec_fixed_small");
+    assertTrue(decFixedSmallOpt.isPresent());
+    HoodieSchema.Decimal decFixedSmallSchema = (HoodieSchema.Decimal) 
decFixedSmallOpt.get().schema();
+    assertTrue(decFixedSmallSchema.isFixed());
+    assertEquals(3, decFixedSmallSchema.getFixedSize());
+    assertEquals(HoodieSchemaType.DECIMAL, decFixedSmallSchema.getType());
+    assertEquals(5, decFixedSmallSchema.getPrecision());
+    assertEquals(2, decFixedSmallSchema.getScale());
+
+    Option<HoodieSchemaField> decFixedLargeOpt = 
tableSchema.getField("dec_fixed_large");
+    assertTrue(decFixedLargeOpt.isPresent());
+    HoodieSchema.Decimal decFixedLargeSchema = (HoodieSchema.Decimal) 
decFixedLargeOpt.get().schema();
+    assertTrue(decFixedLargeSchema.isFixed());
+    assertEquals(8, decFixedLargeSchema.getFixedSize());
+    assertEquals(HoodieSchemaType.DECIMAL, decFixedLargeSchema.getType());
+    assertEquals(18, decFixedLargeSchema.getPrecision());
+    assertEquals(9, decFixedLargeSchema.getScale());
 
     sqlContext.clearCache();
     Dataset<Row> df = sqlContext.read()
@@ -2292,7 +2334,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
 
         // validate table schema fetches valid schema from last but one commit.
         TableSchemaResolver tableSchemaResolver = new 
TableSchemaResolver(metaClient);
-        assertNotEquals(tableSchemaResolver.getTableAvroSchema(), 
Schema.create(Schema.Type.NULL).toString());
+        assertNotEquals(tableSchemaResolver.getTableSchema(), 
Schema.create(Schema.Type.NULL).toString());
         // schema from latest commit and last but one commit should match
         compareLatestTwoSchemas(metaClient);
         prepareParquetDFSSource(useSchemaProvider, hasTransformer, 
"source.avsc", "target.avsc", PROPS_FILENAME_TEST_PARQUET,
@@ -2848,7 +2890,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
 
       // validate table schema fetches valid schema from last but one commit.
       TableSchemaResolver tableSchemaResolver = new 
TableSchemaResolver(metaClient);
-      assertNotEquals(tableSchemaResolver.getTableAvroSchema(), 
Schema.create(Schema.Type.NULL).toString());
+      assertNotEquals(tableSchemaResolver.getTableSchema(), 
Schema.create(Schema.Type.NULL).toString());
       // schema from latest commit and last but one commit should match
       compareLatestTwoSchemas(metaClient);
       prepareParquetDFSSource(useSchemaProvider, hasTransformer, 
"source.avsc", "target.avsc", PROPS_FILENAME_TEST_PARQUET,
@@ -3201,7 +3243,7 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     // validate schema is set in commit even if target schema returns null on 
empty batch
     TableSchemaResolver tableSchemaResolver = new 
TableSchemaResolver(metaClient);
     HoodieInstant secondCommit = 
metaClient.reloadActiveTimeline().lastInstant().get();
-    Schema lastCommitSchema = 
tableSchemaResolver.getTableAvroSchema(secondCommit, true);
+    HoodieSchema lastCommitSchema = 
tableSchemaResolver.getTableSchema(secondCommit, true);
     assertNotEquals(firstCommit, secondCommit);
     assertNotEquals(lastCommitSchema, Schema.create(Schema.Type.NULL));
     deltaStreamer2.shutdownGracefully();
@@ -3741,10 +3783,10 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     TableSchemaResolver tableSchemaResolver = new TableSchemaResolver(
         HoodieTestUtils.createMetaClient(storage, tableBasePath));
     // get schema from data file written in the latest commit
-    Schema tableSchema = tableSchemaResolver.getTableAvroSchemaFromDataFile();
+    HoodieSchema tableSchema = 
tableSchemaResolver.getTableSchemaFromDataFile();
     assertNotNull(tableSchema);
 
-    List<String> tableFields = 
tableSchema.getFields().stream().map(Schema.Field::name).collect(Collectors.toList());
+    List<String> tableFields = 
tableSchema.getFields().stream().map(HoodieSchemaField::name).collect(Collectors.toList());
     // now assert that the partition column is not in the target schema
     assertFalse(tableFields.contains("partition_path"));
     UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath);
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionQuick.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionQuick.java
index 828f0936f2f7..e2fc16dc6a2f 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionQuick.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionQuick.java
@@ -22,6 +22,7 @@ package org.apache.hudi.utilities.deltastreamer;
 import org.apache.hudi.TestHoodieSparkUtils;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaField;
 import org.apache.hudi.common.schema.HoodieSchemaType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -338,8 +339,10 @@ public class TestHoodieDeltaStreamerSchemaEvolutionQuick 
extends TestHoodieDelta
     metaClient.reloadActiveTimeline();
     Option<HoodieSchema> latestTableSchemaOpt = 
UtilHelpers.getLatestTableSchema(jsc, storage,
         dsConfig.targetBasePath, metaClient);
-    
assertTrue(latestTableSchemaOpt.get().getField("rider").get().schema().getTypes()
-        .stream().anyMatch(t -> t.getType() == HoodieSchemaType.STRING));
+    Option<HoodieSchemaField> riderFieldOpt = 
latestTableSchemaOpt.get().getField("rider");
+    assertTrue(riderFieldOpt.isPresent());
+    assertTrue(riderFieldOpt.get().schema().getTypes()
+        .stream().anyMatch(t -> HoodieSchemaType.STRING == t.getType()));
     
assertTrue(metaClient.reloadActiveTimeline().lastInstant().get().compareTo(lastInstant)
 > 0);
   }
 
@@ -413,8 +416,10 @@ public class TestHoodieDeltaStreamerSchemaEvolutionQuick 
extends TestHoodieDelta
       metaClient.reloadActiveTimeline();
       Option<HoodieSchema> latestTableSchemaOpt = 
UtilHelpers.getLatestTableSchema(jsc, storage,
           dsConfig.targetBasePath, metaClient);
-      
assertTrue(latestTableSchemaOpt.get().getField("rider").get().schema().getTypes()
-          .stream().anyMatch(t -> t.getType() == HoodieSchemaType.STRING));
+      Option<HoodieSchemaField> riderFieldOpt = 
latestTableSchemaOpt.get().getField("rider");
+      assertTrue(riderFieldOpt.isPresent());
+      assertTrue(riderFieldOpt.get().schema().getTypes()
+          .stream().anyMatch(t -> HoodieSchemaType.STRING == t.getType()));
       
assertTrue(metaClient.reloadActiveTimeline().lastInstant().get().compareTo(lastInstant)
 > 0);
     } catch (MissingSchemaFieldException e) {
       assertFalse(allowNullForDeletedCols || targetSchemaSameAsTableSchema);
@@ -493,8 +498,10 @@ public class TestHoodieDeltaStreamerSchemaEvolutionQuick 
extends TestHoodieDelta
       metaClient.reloadActiveTimeline();
       Option<HoodieSchema> latestTableSchemaOpt = 
UtilHelpers.getLatestTableSchema(jsc, storage,
           dsConfig.targetBasePath, metaClient);
-      
assertTrue(latestTableSchemaOpt.get().getField("rider").get().schema().getTypes()
-          .stream().anyMatch(t -> t.getType() == HoodieSchemaType.STRING));
+      Option<HoodieSchemaField> riderFieldOpt = 
latestTableSchemaOpt.get().getField("rider");
+      assertTrue(riderFieldOpt.isPresent());
+      assertTrue(riderFieldOpt.get().schema().getTypes()
+          .stream().anyMatch(t -> HoodieSchemaType.STRING == t.getType()));
       
assertTrue(metaClient.reloadActiveTimeline().lastInstant().get().compareTo(lastInstant)
 > 0);
     } catch (Exception e) {
       assertTrue(containsErrorMessage(e, "has no default value and is 
non-nullable",
@@ -573,9 +580,10 @@ public class TestHoodieDeltaStreamerSchemaEvolutionQuick 
extends TestHoodieDelta
       metaClient.reloadActiveTimeline();
       Option<HoodieSchema> latestTableSchemaOpt = 
UtilHelpers.getLatestTableSchema(jsc, storage,
           dsConfig.targetBasePath, metaClient);
-      
assertTrue(latestTableSchemaOpt.get().getField("distance_in_meters").get().schema().getTypes()
-              .stream().anyMatch(t -> t.getType() == HoodieSchemaType.DOUBLE),
-          
latestTableSchemaOpt.get().getField("distance_in_meters").get().schema().toString());
+      Option<HoodieSchemaField> distanceInMetersFieldOpt = 
latestTableSchemaOpt.get().getField("distance_in_meters");
+      assertTrue(distanceInMetersFieldOpt.isPresent());
+      assertTrue(distanceInMetersFieldOpt.get().schema().getTypes()
+              .stream().anyMatch(t -> HoodieSchemaType.DOUBLE == t.getType()));
       
assertTrue(metaClient.reloadActiveTimeline().lastInstant().get().compareTo(lastInstant)
 > 0);
     } catch (Exception e) {
       assertTrue(targetSchemaSameAsTableSchema);
@@ -661,8 +669,10 @@ public class TestHoodieDeltaStreamerSchemaEvolutionQuick 
extends TestHoodieDelta
     metaClient.reloadActiveTimeline();
     Option<HoodieSchema> latestTableSchemaOpt = 
UtilHelpers.getLatestTableSchema(jsc, storage,
         dsConfig.targetBasePath, metaClient);
-    
assertTrue(latestTableSchemaOpt.get().getField("current_ts").get().schema().getTypes()
-        .stream().anyMatch(t -> t.getType() == HoodieSchemaType.LONG));
+    Option<HoodieSchemaField> currentTsFieldOpt = 
latestTableSchemaOpt.get().getField("current_ts");
+    assertTrue(currentTsFieldOpt.isPresent());
+    assertTrue(currentTsFieldOpt.get().schema().getTypes()
+        .stream().anyMatch(t -> HoodieSchemaType.LONG == t.getType()));
     
assertTrue(metaClient.reloadActiveTimeline().lastInstant().get().compareTo(lastInstant)
 > 0);
   }
 

Reply via email to