This is an automated email from the ASF dual-hosted git repository.

vhs pushed a commit to branch phase-18-HoodieAvroUtils-removal-p2
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 04487e48a5ad6b36fcb8f0116c587efb7cefa269
Author: voon <[email protected]>
AuthorDate: Mon Dec 15 20:43:04 2025 +0800

    Remove HoodieAvroUtils from hudi-client-common
---
 .../apache/hudi/cli/commands/TestTableCommand.java |  12 +-
 .../apache/hudi/client/BaseHoodieWriteClient.java  |  10 +-
 .../bootstrap/HoodieBootstrapSchemaProvider.java   |   7 +-
 ...ConcurrentSchemaEvolutionTableSchemaGetter.java |   8 +-
 .../SchemaConflictResolutionStrategy.java          |   5 +-
 .../SimpleSchemaConflictResolutionStrategy.java    |  26 +-
 .../apache/hudi/client/utils/TransactionUtils.java |   6 +-
 .../org/apache/hudi/index/HoodieIndexUtils.java    |  66 ++-
 .../hudi/metadata/HoodieMetadataWriteUtils.java    |  10 +-
 ...ConcurrentSchemaEvolutionTableSchemaGetter.java |  66 +--
 ...TestSimpleSchemaConflictResolutionStrategy.java |  29 +-
 .../apache/hudi/index/TestHoodieIndexUtils.java    | 300 ++++++------
 .../GenericRecordValidationTestUtils.java          |  16 +-
 .../hudi/testutils/HoodieMergeOnReadTestUtils.java |  36 +-
 .../java/org/apache/hudi/avro/HoodieAvroUtils.java |   4 -
 .../apache/hudi/common/schema/HoodieSchema.java    |   4 +
 .../HoodieSchemaComparatorForSchemaEvolution.java  | 375 +++++++++++++++
 .../hudi/common/schema/HoodieSchemaUtils.java      |   4 +
 ...stHoodieSchemaComparatorForSchemaEvolution.java | 505 +++++++++++++++++++++
 .../hive/TestHoodieCombineHiveInputFormat.java     |   3 +-
 .../hudi/hadoop/testutils/InputFormatTestUtil.java |   4 +-
 ...DataValidationCheckForLogCompactionActions.java |   4 +-
 .../org/apache/hudi/functional/TestBootstrap.java  |  17 +-
 23 files changed, 1179 insertions(+), 338 deletions(-)

diff --git 
a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestTableCommand.java 
b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestTableCommand.java
index 061ba21f2b07..dce2c7064f3c 100644
--- a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestTableCommand.java
+++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestTableCommand.java
@@ -18,7 +18,6 @@
 
 package org.apache.hudi.cli.commands;
 
-import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.cli.HoodieCLI;
 import org.apache.hudi.cli.functional.CLIFunctionalTestHarness;
 import org.apache.hudi.cli.testutils.HoodieTestCommitMetadataGenerator;
@@ -27,6 +26,8 @@ import 
org.apache.hudi.common.config.HoodieTimeGeneratorConfig;
 import org.apache.hudi.common.fs.ConsistencyGuardConfig;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaUtils;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.HoodieTableVersion;
@@ -36,7 +37,6 @@ import 
org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.storage.StoragePath;
 
-import org.apache.avro.Schema;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.junit.jupiter.api.BeforeEach;
@@ -259,10 +259,10 @@ public class TestTableCommand extends 
CLIFunctionalTestHarness {
     assertTrue(ShellEvaluationResultUtil.isSuccess(result));
 
     String actualSchemaStr = 
result.toString().substring(result.toString().indexOf("{"));
-    Schema actualSchema = new Schema.Parser().parse(actualSchemaStr);
+    HoodieSchema actualSchema = HoodieSchema.parse(actualSchemaStr);
 
-    Schema expectedSchema = new Schema.Parser().parse(schemaStr);
-    expectedSchema = HoodieAvroUtils.addMetadataFields(expectedSchema);
+    HoodieSchema expectedSchema = HoodieSchema.parse(schemaStr);
+    expectedSchema = HoodieSchemaUtils.addMetadataFields(expectedSchema);
     assertEquals(actualSchema, expectedSchema);
 
     File file = File.createTempFile("temp", null);
@@ -270,7 +270,7 @@ public class TestTableCommand extends 
CLIFunctionalTestHarness {
     assertTrue(ShellEvaluationResultUtil.isSuccess(result));
 
     actualSchemaStr = getFileContent(file.getAbsolutePath());
-    actualSchema = new Schema.Parser().parse(actualSchemaStr);
+    actualSchema = HoodieSchema.parse(actualSchemaStr);
     assertEquals(actualSchema, expectedSchema);
   }
 
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
index 86efd7db1e08..4c4a1347903e 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
@@ -19,7 +19,6 @@
 package org.apache.hudi.client;
 
 import org.apache.hudi.avro.AvroSchemaUtils;
-import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.avro.model.HoodieCleanMetadata;
 import org.apache.hudi.avro.model.HoodieIndexCommitMetadata;
 import org.apache.hudi.avro.model.HoodieIndexPlan;
@@ -46,6 +45,7 @@ import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.model.TableServiceType;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaUtils;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.HoodieTableVersion;
@@ -341,15 +341,15 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> 
extends BaseHoodieClient
     FileBasedInternalSchemaStorageManager schemasManager = new 
FileBasedInternalSchemaStorageManager(table.getMetaClient());
     if (!historySchemaStr.isEmpty() || 
Boolean.parseBoolean(config.getString(HoodieCommonConfig.RECONCILE_SCHEMA.key())))
 {
       InternalSchema internalSchema;
-      Schema avroSchema = 
HoodieAvroUtils.createHoodieWriteSchema(config.getSchema(), 
config.allowOperationMetadataField());
+      HoodieSchema schema = 
HoodieSchemaUtils.addMetadataFields(HoodieSchema.parse(config.getSchema()), 
config.allowOperationMetadataField());
       if (historySchemaStr.isEmpty()) {
-        internalSchema = 
SerDeHelper.fromJson(config.getInternalSchema()).orElseGet(() -> 
InternalSchemaConverter.convert(HoodieSchema.fromAvroSchema(avroSchema)));
+        internalSchema = 
SerDeHelper.fromJson(config.getInternalSchema()).orElseGet(() -> 
InternalSchemaConverter.convert(schema));
         internalSchema.setSchemaId(Long.parseLong(instantTime));
       } else {
         internalSchema = 
InternalSchemaUtils.searchSchema(Long.parseLong(instantTime),
             SerDeHelper.parseSchemas(historySchemaStr));
       }
-      InternalSchema evolvedSchema = 
AvroSchemaEvolutionUtils.reconcileSchema(avroSchema, internalSchema, 
config.getBooleanOrDefault(HoodieCommonConfig.SET_NULL_FOR_MISSING_COLUMNS));
+      InternalSchema evolvedSchema = 
AvroSchemaEvolutionUtils.reconcileSchema(schema.toAvroSchema(), internalSchema, 
config.getBooleanOrDefault(HoodieCommonConfig.SET_NULL_FOR_MISSING_COLUMNS));
       if (evolvedSchema.equals(internalSchema)) {
         metadata.addMetadata(SerDeHelper.LATEST_SCHEMA, 
SerDeHelper.toJson(evolvedSchema));
         //TODO save history schema by metaTable
@@ -361,7 +361,7 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> 
extends BaseHoodieClient
         schemasManager.persistHistorySchemaStr(instantTime, 
SerDeHelper.inheritSchemas(evolvedSchema, historySchemaStr));
       }
       // update SCHEMA_KEY
-      metadata.addMetadata(SCHEMA_KEY, 
InternalSchemaConverter.convert(evolvedSchema, 
avroSchema.getFullName()).toString());
+      metadata.addMetadata(SCHEMA_KEY, 
InternalSchemaConverter.convert(evolvedSchema, 
schema.getFullName()).toString());
     }
   }
 
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/bootstrap/HoodieBootstrapSchemaProvider.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/bootstrap/HoodieBootstrapSchemaProvider.java
index ce4c42ba1665..99b2dbad1be7 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/bootstrap/HoodieBootstrapSchemaProvider.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/bootstrap/HoodieBootstrapSchemaProvider.java
@@ -18,9 +18,10 @@
 
 package org.apache.hudi.client.bootstrap;
 
-import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.avro.model.HoodieFileStatus;
 import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaType;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 
@@ -48,8 +49,8 @@ public abstract class HoodieBootstrapSchemaProvider {
   public final Schema getBootstrapSchema(HoodieEngineContext context, 
List<Pair<String, List<HoodieFileStatus>>> partitions) {
     if (writeConfig.getSchema() != null) {
       // Use schema specified by user if set
-      Schema userSchema = new Schema.Parser().parse(writeConfig.getSchema());
-      if (!HoodieAvroUtils.getNullSchema().equals(userSchema)) {
+      HoodieSchema userSchema = HoodieSchema.parse(writeConfig.getSchema());
+      if (!HoodieSchema.create(HoodieSchemaType.NULL).equals(userSchema)) {
         return userSchema;
       }
     }
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/ConcurrentSchemaEvolutionTableSchemaGetter.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/ConcurrentSchemaEvolutionTableSchemaGetter.java
index ab9c0ce426b7..0e5feab55e11 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/ConcurrentSchemaEvolutionTableSchemaGetter.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/ConcurrentSchemaEvolutionTableSchemaGetter.java
@@ -89,12 +89,16 @@ class ConcurrentSchemaEvolutionTableSchemaGetter {
     return schema;
   }
 
-  public Option<Schema> getTableAvroSchemaIfPresent(boolean 
includeMetadataFields, Option<HoodieInstant> instant) {
+  public Option<HoodieSchema> getTableSchemaIfPresent(boolean 
includeMetadataFields, Option<HoodieInstant> instant) {
     return getTableAvroSchemaFromTimelineWithCache(instant) // Get table 
schema from schema evolution timeline.
         .map(HoodieSchema::fromAvroSchema)
         .or(this::getTableCreateSchemaWithoutMetaField) // Fall back: read 
create schema from table config.
         .map(tableSchema -> includeMetadataFields ? 
HoodieSchemaUtils.addMetadataFields(tableSchema, false) : 
HoodieSchemaUtils.removeMetadataFields(tableSchema))
-        .map(this::handlePartitionColumnsIfNeeded)
+        .map(this::handlePartitionColumnsIfNeeded);
+  }
+
+  public Option<Schema> getTableAvroSchemaIfPresent(boolean 
includeMetadataFields, Option<HoodieInstant> instant) {
+    return getTableSchemaIfPresent(includeMetadataFields, instant)
         .map(HoodieSchema::toAvroSchema);
   }
 
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/SchemaConflictResolutionStrategy.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/SchemaConflictResolutionStrategy.java
index 36845c688c78..2dd39fbd0589 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/SchemaConflictResolutionStrategy.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/SchemaConflictResolutionStrategy.java
@@ -21,6 +21,7 @@ package org.apache.hudi.client.transaction;
 import org.apache.hudi.ApiMaturityLevel;
 import org.apache.hudi.PublicAPIClass;
 import org.apache.hudi.PublicAPIMethod;
+import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
@@ -50,14 +51,14 @@ public interface SchemaConflictResolutionStrategy {
    * @throws HoodieWriteConflictException if schema conflicts cannot be 
resolved.
    */
   @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
-  Option<Schema> resolveConcurrentSchemaEvolution(
+  Option<HoodieSchema> resolveConcurrentSchemaEvolution(
       HoodieTable table,
       HoodieWriteConfig config,
       Option<HoodieInstant> lastCompletedTxnOwnerInstant,
       Option<HoodieInstant> currTxnOwnerInstant);
 
   static void throwConcurrentSchemaEvolutionException(
-      Option<Schema> tableSchemaAtTxnStart, Option<Schema> 
tableSchemaAtTxnValidation, Schema writerSchemaOfTxn,
+      Option<HoodieSchema> tableSchemaAtTxnStart, Option<HoodieSchema> 
tableSchemaAtTxnValidation, HoodieSchema writerSchemaOfTxn,
       Option<HoodieInstant> lastCompletedTxnOwnerInstant,
       Option<HoodieInstant> currTxnOwnerInstant) throws 
HoodieWriteConflictException {
     String errMsg = String.format(
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/SimpleSchemaConflictResolutionStrategy.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/SimpleSchemaConflictResolutionStrategy.java
index 688f14c38d42..5f72ce11bdf2 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/SimpleSchemaConflictResolutionStrategy.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/SimpleSchemaConflictResolutionStrategy.java
@@ -18,7 +18,8 @@
 
 package org.apache.hudi.client.transaction;
 
-import org.apache.hudi.avro.AvroSchemaComparatorForSchemaEvolution;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaComparatorForSchemaEvolution;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.util.ClusteringUtils;
 import org.apache.hudi.common.util.Option;
@@ -28,11 +29,9 @@ import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.table.HoodieTable;
 
 import lombok.extern.slf4j.Slf4j;
-import org.apache.avro.Schema;
 
 import java.util.stream.Stream;
 
-import static org.apache.hudi.avro.HoodieAvroUtils.isSchemaNull;
 import static 
org.apache.hudi.client.transaction.SchemaConflictResolutionStrategy.throwConcurrentSchemaEvolutionException;
 import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.COMPACTION_ACTION;
 import static 
org.apache.hudi.common.table.timeline.InstantComparison.LESSER_THAN_OR_EQUALS;
@@ -46,7 +45,7 @@ import static 
org.apache.hudi.common.table.timeline.InstantComparison.compareTim
 public class SimpleSchemaConflictResolutionStrategy implements 
SchemaConflictResolutionStrategy {
 
   @Override
-  public Option<Schema> resolveConcurrentSchemaEvolution(
+  public Option<HoodieSchema> resolveConcurrentSchemaEvolution(
       HoodieTable table,
       HoodieWriteConfig config,
       Option<HoodieInstant> lastCompletedTxnOwnerInstant,
@@ -66,10 +65,10 @@ public class SimpleSchemaConflictResolutionStrategy 
implements SchemaConflictRes
       return Option.empty();
     }
 
-    Schema writerSchemaOfTxn = new 
Schema.Parser().parse(config.getWriteSchema());
+    HoodieSchema writerSchemaOfTxn = 
HoodieSchema.parse(config.getWriteSchema());
     // If a writer does not come with a meaningful schema, skip the schema 
resolution.
     ConcurrentSchemaEvolutionTableSchemaGetter schemaResolver = new 
ConcurrentSchemaEvolutionTableSchemaGetter(table.getMetaClient());
-    if (isSchemaNull(writerSchemaOfTxn)) {
+    if (writerSchemaOfTxn.isSchemaNull()) {
       return getTableSchemaAtInstant(schemaResolver, 
currTxnOwnerInstant.get());
     }
 
@@ -98,14 +97,14 @@ public class SimpleSchemaConflictResolutionStrategy 
implements SchemaConflictRes
       return Option.of(writerSchemaOfTxn);
     }
 
-    Option<Schema> tableSchemaAtTxnValidation = 
getTableSchemaAtInstant(schemaResolver, lastCompletedInstantAtTxnValidation);
+    Option<HoodieSchema> tableSchemaAtTxnValidation = 
getTableSchemaAtInstant(schemaResolver, lastCompletedInstantAtTxnValidation);
     // If table schema is not defined, it's still case 1. There can be cases 
where there are commits but they didn't
     // write any data.
     if (!tableSchemaAtTxnValidation.isPresent()) {
       return Option.of(writerSchemaOfTxn);
     }
     // Case 2, 4, 7: Both writers try to evolve to the same schema or neither 
evolves schema.
-    boolean writerSchemaIsCurrentTableSchema = 
AvroSchemaComparatorForSchemaEvolution.schemaEquals(writerSchemaOfTxn, 
tableSchemaAtTxnValidation.get());
+    boolean writerSchemaIsCurrentTableSchema = 
HoodieSchemaComparatorForSchemaEvolution.schemaEquals(writerSchemaOfTxn, 
tableSchemaAtTxnValidation.get());
     if (writerSchemaIsCurrentTableSchema) {
       return Option.of(writerSchemaOfTxn);
     }
@@ -122,7 +121,7 @@ public class SimpleSchemaConflictResolutionStrategy 
implements SchemaConflictRes
       throwConcurrentSchemaEvolutionException(
           Option.empty(), tableSchemaAtTxnValidation, writerSchemaOfTxn, 
lastCompletedTxnOwnerInstant, currTxnOwnerInstant);
     }
-    Option<Schema> tableSchemaAtTxnStart = 
getTableSchemaAtInstant(schemaResolver, lastCompletedInstantAtTxnStart);
+    Option<HoodieSchema> tableSchemaAtTxnStart = 
getTableSchemaAtInstant(schemaResolver, lastCompletedInstantAtTxnStart);
     // If no table schema is defined, fall back to case 3.
     if (!tableSchemaAtTxnStart.isPresent()) {
       throwConcurrentSchemaEvolutionException(
@@ -132,13 +131,13 @@ public class SimpleSchemaConflictResolutionStrategy 
implements SchemaConflictRes
     // Case 5:
     // Table schema has not changed from the start of the transaction till the 
pre-commit validation
     // If table schema parsing failed we will blindly go with writer schema. 
use option.empty
-    if 
(AvroSchemaComparatorForSchemaEvolution.schemaEquals(tableSchemaAtTxnStart.get(),
 tableSchemaAtTxnValidation.get())) {
+    if 
(HoodieSchemaComparatorForSchemaEvolution.schemaEquals(tableSchemaAtTxnStart.get(),
 tableSchemaAtTxnValidation.get())) {
       return Option.of(writerSchemaOfTxn);
     }
 
     // Case 6: Current txn does not evolve schema, the tableSchema we saw at 
validation phase
     // might be an evolved one, use it.
-    if (AvroSchemaComparatorForSchemaEvolution.schemaEquals(writerSchemaOfTxn, 
tableSchemaAtTxnStart.get())) {
+    if 
(HoodieSchemaComparatorForSchemaEvolution.schemaEquals(writerSchemaOfTxn, 
tableSchemaAtTxnStart.get())) {
       return tableSchemaAtTxnValidation;
     }
 
@@ -164,9 +163,10 @@ public class SimpleSchemaConflictResolutionStrategy 
implements SchemaConflictRes
         .findFirst());
   }
 
-  private static Option<Schema> 
getTableSchemaAtInstant(ConcurrentSchemaEvolutionTableSchemaGetter 
schemaResolver, HoodieInstant instant) {
+  private static Option<HoodieSchema> 
getTableSchemaAtInstant(ConcurrentSchemaEvolutionTableSchemaGetter 
schemaResolver, HoodieInstant instant) {
     try {
-      return schemaResolver.getTableAvroSchemaIfPresent(false, 
Option.of(instant));
+      return schemaResolver.getTableAvroSchemaIfPresent(false, 
Option.of(instant))
+          .map(HoodieSchema::fromAvroSchema);
     } catch (Exception ex) {
       log.error("Cannot get table schema for instant {}", instant);
       throw new HoodieException("Unable to get table schema", ex);
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/TransactionUtils.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/TransactionUtils.java
index 8e26add42992..e8cf8e7f9d28 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/TransactionUtils.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/TransactionUtils.java
@@ -23,6 +23,7 @@ import 
org.apache.hudi.client.transaction.ConflictResolutionStrategy;
 import 
org.apache.hudi.client.transaction.SimpleSchemaConflictResolutionStrategy;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
@@ -36,7 +37,6 @@ import org.apache.hudi.exception.HoodieWriteConflictException;
 import org.apache.hudi.table.HoodieTable;
 
 import lombok.extern.slf4j.Slf4j;
-import org.apache.avro.Schema;
 
 import java.io.IOException;
 import java.util.Map;
@@ -78,7 +78,7 @@ public class TransactionUtils {
       Stream<HoodieInstant> completedInstantsDuringCurrentWriteOperation =
           
getCompletedInstantsDuringCurrentWriteOperation(table.getMetaClient(), 
pendingInstants);
       ConflictResolutionStrategy resolutionStrategy = 
config.getWriteConflictResolutionStrategy();
-      Option<Schema> newTableSchema = resolveSchemaConflictIfNeeded(table, 
config, lastCompletedTxnOwnerInstant, currentTxnOwnerInstant);
+      Option<HoodieSchema> newTableSchema = 
resolveSchemaConflictIfNeeded(table, config, lastCompletedTxnOwnerInstant, 
currentTxnOwnerInstant);
 
       Stream<HoodieInstant> instantStream = 
Stream.concat(resolutionStrategy.getCandidateInstants(
               table.getMetaClient(), currentTxnOwnerInstant.get(), 
lastCompletedTxnOwnerInstant),
@@ -117,7 +117,7 @@ public class TransactionUtils {
    * @param currentTxnOwnerInstant       current instant
    * @return new table schema after successful schema resolution; empty if 
nothing to be resolved.
    */
-  public static Option<Schema> resolveSchemaConflictIfNeeded(final HoodieTable 
table,
+  public static Option<HoodieSchema> resolveSchemaConflictIfNeeded(final 
HoodieTable table,
                                                              final 
HoodieWriteConfig config,
                                                              final 
Option<HoodieInstant> lastCompletedTxnOwnerInstant,
                                                              final 
Option<HoodieInstant> currentTxnOwnerInstant) {
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
index dca188c8ed02..55e2b605ba94 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
@@ -40,6 +40,8 @@ import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.MetadataValues;
 import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.common.schema.HoodieSchemaCache;
+import org.apache.hudi.common.schema.HoodieSchemaField;
+import org.apache.hudi.common.schema.HoodieSchemaType;
 import org.apache.hudi.common.schema.HoodieSchemaUtils;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -136,10 +138,10 @@ public class HoodieIndexUtils {
    * @param tableSchema  table schema
    * @return true if each field's data type are supported for secondary index, 
false otherwise
    */
-  static boolean validateDataTypeForSecondaryIndex(List<String> sourceFields, 
Schema tableSchema) {
+  static boolean validateDataTypeForSecondaryIndex(List<String> sourceFields, 
HoodieSchema tableSchema) {
     return sourceFields.stream().allMatch(fieldToIndex -> {
-      Schema schema = getNestedFieldSchemaFromWriteSchema(tableSchema, 
fieldToIndex);
-      return isSecondaryIndexSupportedType(schema);
+      Option<Pair<String, HoodieSchemaField>> schema = 
HoodieSchemaUtils.getNestedField(tableSchema, fieldToIndex);
+      return isSecondaryIndexSupportedType(schema.get().getRight().schema());
     });
   }
 
@@ -150,10 +152,11 @@ public class HoodieIndexUtils {
    * @param tableSchema  table schema
    * @return true if each field's data types are supported, false otherwise
    */
-  public static boolean 
validateDataTypeForSecondaryOrExpressionIndex(List<String> sourceFields, Schema 
tableSchema) {
+  public static boolean 
validateDataTypeForSecondaryOrExpressionIndex(List<String> sourceFields, 
HoodieSchema tableSchema) {
     return sourceFields.stream().anyMatch(fieldToIndex -> {
-      Schema schema = getNestedFieldSchemaFromWriteSchema(tableSchema, 
fieldToIndex);
-      return schema.getType() != Schema.Type.RECORD && schema.getType() != 
Schema.Type.ARRAY && schema.getType() != Schema.Type.MAP;
+      Option<Pair<String, HoodieSchemaField>> nestedFieldOpt = 
HoodieSchemaUtils.getNestedField(tableSchema, fieldToIndex);
+      HoodieSchema fieldSchema = nestedFieldOpt.get().getRight().schema();
+      return fieldSchema.getType() != HoodieSchemaType.RECORD && 
fieldSchema.getType() != HoodieSchemaType.ARRAY && fieldSchema.getType() != 
HoodieSchemaType.MAP;
     });
   }
 
@@ -161,39 +164,26 @@ public class HoodieIndexUtils {
    * Check if the given schema type is supported for secondary index.
    * Supported types are: String (including CHAR), Integer types (Int, BigInt, 
Long, Short), and timestamp
    */
-  private static boolean isSecondaryIndexSupportedType(Schema schema) {
+  private static boolean isSecondaryIndexSupportedType(HoodieSchema schema) {
     // Handle union types (nullable fields)
-    if (schema.getType() == Schema.Type.UNION) {
+    if (schema.getType() == HoodieSchemaType.UNION) {
       // For union types, check if any of the types is supported
       return schema.getTypes().stream()
-          .anyMatch(s -> s.getType() != Schema.Type.NULL && 
isSecondaryIndexSupportedType(s));
+          .anyMatch(s -> s.getType() != HoodieSchemaType.NULL && 
isSecondaryIndexSupportedType(s));
     }
 
     // Check basic types
     switch (schema.getType()) {
       case STRING:
-        // STRING type can have UUID logical type which we don't support
-        return schema.getLogicalType() == null; // UUID and other string-based 
logical types are not supported
-      // Regular STRING (includes CHAR)
       case INT:
-        // INT type can represent regular integers or dates/times with logical 
types
-        if (schema.getLogicalType() != null) {
-          // Support date and time-millis logical types
-          return schema.getLogicalType() == LogicalTypes.date()
-              || schema.getLogicalType() == LogicalTypes.timeMillis();
-        }
-        return true; // Regular INT
       case LONG:
-        // LONG type can represent regular longs or timestamps with logical 
types
-        if (schema.getLogicalType() != null) {
-          // Support timestamp logical types
-          return schema.getLogicalType() == LogicalTypes.timestampMillis()
-              || schema.getLogicalType() == LogicalTypes.timestampMicros()
-              || schema.getLogicalType() == LogicalTypes.timeMicros();
-        }
-        return true; // Regular LONG
       case DOUBLE:
-        return true; // Support DOUBLE type
+      case DATE:
+      case TIME:
+        return true;
+      case TIMESTAMP:
+        // LOCAL timestamps are not supported
+        return ((HoodieSchema.Timestamp) schema).isUtcAdjusted();
       default:
         return false;
     }
@@ -721,40 +711,36 @@ public class HoodieIndexUtils {
                                                                Map<String, 
String> options,
                                                                Map<String, 
Map<String, String>> columns,
                                                                String 
userIndexName) throws Exception {
-    Schema tableSchema = new 
TableSchemaResolver(metaClient).getTableAvroSchema();
+    HoodieSchema tableSchema = new 
TableSchemaResolver(metaClient).getTableSchema();
     List<String> sourceFields = new ArrayList<>(columns.keySet());
     String columnName = sourceFields.get(0); // We know there's only one 
column from the check above
 
     // First check if the field exists
-    try {
-      getNestedFieldSchemaFromWriteSchema(tableSchema, columnName);
-    } catch (Exception e) {
+    Option<Pair<String, HoodieSchemaField>> fieldSchemaOpt = 
HoodieSchemaUtils.getNestedField(tableSchema, columnName);
+    if (fieldSchemaOpt.isEmpty()) {
       throw new HoodieMetadataIndexException(String.format(
           "Cannot create %s index '%s': Column '%s' does not exist in the 
table schema. "
-          + "Please verify the column name and ensure it exists in the table.",
+              + "Please verify the column name and ensure it exists in the 
table.",
           indexType.equals(PARTITION_NAME_SECONDARY_INDEX) ? "secondary" : 
"expression",
           userIndexName, columnName));
     }
 
+    Pair<String, HoodieSchemaField> fieldSchema = fieldSchemaOpt.get();
+
     // Check for complex types (RECORD, ARRAY, MAP) - not supported for any 
index type
     if (!validateDataTypeForSecondaryOrExpressionIndex(sourceFields, 
tableSchema)) {
-      Schema fieldSchema = getNestedFieldSchemaFromWriteSchema(tableSchema, 
columnName);
       throw new HoodieMetadataIndexException(String.format(
           "Cannot create %s index '%s': Column '%s' has unsupported data type 
'%s'. "
           + "Complex types (RECORD, ARRAY, MAP) are not supported for 
indexing. "
           + "Please choose a column with a primitive data type.",
           indexType.equals(PARTITION_NAME_SECONDARY_INDEX) ? "secondary" : 
"expression",
-          userIndexName, columnName, fieldSchema.getType()));
+          userIndexName, columnName, 
fieldSchema.getRight().schema().getType()));
     }
 
     // For secondary index, apply stricter data type validation
     if (indexType.equals(PARTITION_NAME_SECONDARY_INDEX)) {
       if (!validateDataTypeForSecondaryIndex(sourceFields, tableSchema)) {
-        Schema fieldSchema = getNestedFieldSchemaFromWriteSchema(tableSchema, 
columnName);
-        String actualType = fieldSchema.getType().toString();
-        if (fieldSchema.getLogicalType() != null) {
-          actualType += " with logical type " + fieldSchema.getLogicalType();
-        }
+        String actualType = 
fieldSchema.getRight().schema().getType().toString();
 
         throw new HoodieMetadataIndexException(String.format(
             "Cannot create secondary index '%s': Column '%s' has unsupported 
data type '%s'. "
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java
index c970e0d1361c..5c138ec3cc81 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java
@@ -46,6 +46,7 @@ import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.model.WriteConcurrencyMode;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaUtils;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.HoodieTableVersion;
@@ -79,7 +80,6 @@ import 
org.apache.hudi.table.action.compact.strategy.UnBoundedCompactionStrategy
 import org.apache.hudi.util.Lazy;
 
 import lombok.extern.slf4j.Slf4j;
-import org.apache.avro.Schema;
 
 import java.util.ArrayList;
 import java.util.Collection;
@@ -95,7 +95,6 @@ import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import static java.util.stream.Collectors.toList;
-import static org.apache.hudi.avro.HoodieAvroUtils.addMetadataFields;
 import static 
org.apache.hudi.common.config.HoodieMetadataConfig.DEFAULT_METADATA_ASYNC_CLEAN;
 import static 
org.apache.hudi.common.config.HoodieMetadataConfig.DEFAULT_METADATA_CLEANER_COMMITS_RETAINED;
 import static 
org.apache.hudi.common.config.HoodieMetadataConfig.DEFAULT_METADATA_POPULATE_META_FIELDS;
@@ -410,14 +409,15 @@ public class HoodieMetadataWriteUtils {
                                                                                
HoodieTableMetadata tableMetadata, HoodieMetadataConfig metadataConfig,
                                                                                
Option<HoodieRecord.HoodieRecordType> recordTypeOpt, boolean isDeletePartition) 
{
     try {
-      Option<Schema> writerSchema =
+      Option<HoodieSchema> writerSchema =
           
Option.ofNullable(commitMetadata.getMetadata(HoodieCommitMetadata.SCHEMA_KEY))
               .flatMap(writerSchemaStr ->
                   isNullOrEmpty(writerSchemaStr)
                       ? Option.empty()
-                      : Option.of(new Schema.Parser().parse(writerSchemaStr)));
+                      : Option.of(HoodieSchema.parse(writerSchemaStr)));
       HoodieTableConfig tableConfig = dataMetaClient.getTableConfig();
-      Option<HoodieSchema> tableSchema = writerSchema.map(schema -> 
tableConfig.populateMetaFields() ? addMetadataFields(schema) : 
schema).map(HoodieSchema::fromAvroSchema);
+      Option<HoodieSchema> tableSchema = writerSchema.map(schema -> 
tableConfig.populateMetaFields() ? HoodieSchemaUtils.addMetadataFields(schema) 
: schema);
+
       if (tableSchema.isEmpty()) {
         return engineContext.emptyHoodieData();
       }
diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestConcurrentSchemaEvolutionTableSchemaGetter.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestConcurrentSchemaEvolutionTableSchemaGetter.java
index 4e02d74d1983..7317cd13050a 100644
--- 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestConcurrentSchemaEvolutionTableSchemaGetter.java
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestConcurrentSchemaEvolutionTableSchemaGetter.java
@@ -32,6 +32,8 @@ import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaUtils;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import 
org.apache.hudi.common.table.timeline.versioning.clean.CleanPlanV2MigrationHandler;
@@ -41,7 +43,6 @@ import org.apache.hudi.common.testutils.HoodieTestTable;
 import org.apache.hudi.common.testutils.HoodieTestUtils;
 import org.apache.hudi.common.util.Option;
 
-import org.apache.avro.Schema;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -57,7 +58,6 @@ import java.util.HashMap;
 import java.util.Properties;
 import java.util.stream.Stream;
 
-import static org.apache.hudi.avro.HoodieAvroUtils.addMetadataFields;
 import static org.apache.hudi.common.table.HoodieTableConfig.PARTITION_FIELDS;
 import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.CLUSTERING_ACTION;
 import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION;
@@ -122,10 +122,10 @@ public class 
TestConcurrentSchemaEvolutionTableSchemaGetter extends HoodieCommon
       + "    
{\"name\":\"partitionColumn\",\"type\":[\"null\",\"string\"],\"doc\":\"\",\"default\":null}"
       + "  ]\n"
       + "}";
-  private static Schema SCHEMA_WITHOUT_METADATA2 = new 
Schema.Parser().parse(SCHEMA_WITHOUT_METADATA_STR2);
-  private static Schema SCHEMA_WITHOUT_METADATA = new 
Schema.Parser().parse(SCHEMA_WITHOUT_METADATA_STR);
-  private static Schema SCHEMA_WITH_METADATA = 
addMetadataFields(SCHEMA_WITHOUT_METADATA, false);
-  private static Schema SCHEMA_WITH_PARTITION_COLUMN = new 
Schema.Parser().parse(SCHEMA_WITH_PARTITION_COLUMN_STR);
+  private static HoodieSchema SCHEMA_WITHOUT_METADATA2 = 
HoodieSchema.parse(SCHEMA_WITHOUT_METADATA_STR2);
+  private static HoodieSchema SCHEMA_WITHOUT_METADATA = 
HoodieSchema.parse(SCHEMA_WITHOUT_METADATA_STR);
+  private static HoodieSchema SCHEMA_WITH_METADATA = 
HoodieSchemaUtils.addMetadataFields(SCHEMA_WITHOUT_METADATA);
+  private static HoodieSchema SCHEMA_WITH_PARTITION_COLUMN = 
HoodieSchema.parse(SCHEMA_WITH_PARTITION_COLUMN_STR);
 
   @BeforeEach
   public void setUp() throws Exception {
@@ -195,7 +195,7 @@ public class TestConcurrentSchemaEvolutionTableSchemaGetter 
extends HoodieCommon
     metaClient.reloadActiveTimeline();
 
     ConcurrentSchemaEvolutionTableSchemaGetter resolver = new 
ConcurrentSchemaEvolutionTableSchemaGetter(metaClient);
-    Option<Schema> schemaOption = resolver.getTableAvroSchemaIfPresent(false, 
Option.empty());
+    Option<HoodieSchema> schemaOption = 
resolver.getTableSchemaIfPresent(false, Option.empty());
     assertTrue(schemaOption.isPresent());
     assertEquals(SCHEMA_WITHOUT_METADATA, schemaOption.get());
   }
@@ -235,7 +235,7 @@ public class TestConcurrentSchemaEvolutionTableSchemaGetter 
extends HoodieCommon
     metaClient.reloadActiveTimeline();
 
     ConcurrentSchemaEvolutionTableSchemaGetter resolver = new 
ConcurrentSchemaEvolutionTableSchemaGetter(metaClient);
-    Option<Schema> schemaOption = resolver.getTableAvroSchemaIfPresent(false, 
Option.empty());
+    Option<HoodieSchema> schemaOption = 
resolver.getTableSchemaIfPresent(false, Option.empty());
     assertTrue(schemaOption.isPresent());
     assertEquals(SCHEMA_WITHOUT_METADATA, schemaOption.get());
   }
@@ -276,7 +276,7 @@ public class TestConcurrentSchemaEvolutionTableSchemaGetter 
extends HoodieCommon
     metaClient.reloadActiveTimeline();
 
     ConcurrentSchemaEvolutionTableSchemaGetter resolver = new 
ConcurrentSchemaEvolutionTableSchemaGetter(metaClient);
-    Option<Schema> schemaOption = resolver.getTableAvroSchemaIfPresent(true, 
Option.empty());
+    Option<HoodieSchema> schemaOption = resolver.getTableSchemaIfPresent(true, 
Option.empty());
     assertFalse(schemaOption.isPresent());
   }
 
@@ -296,7 +296,7 @@ public class TestConcurrentSchemaEvolutionTableSchemaGetter 
extends HoodieCommon
     metaClient.reloadActiveTimeline();
 
     ConcurrentSchemaEvolutionTableSchemaGetter resolver = new 
ConcurrentSchemaEvolutionTableSchemaGetter(metaClient);
-    Option<Schema> schemaOption = resolver.getTableAvroSchemaIfPresent(true, 
Option.empty());
+    Option<HoodieSchema> schemaOption = resolver.getTableSchemaIfPresent(true, 
Option.empty());
     assertTrue(schemaOption.isEmpty());
   }
 
@@ -383,7 +383,7 @@ public class TestConcurrentSchemaEvolutionTableSchemaGetter 
extends HoodieCommon
 
   @ParameterizedTest
   @MethodSource("schemaTestParams")
-  void testGetTableAvroSchema(Schema inputSchema, boolean 
includeMetadataFields, Schema expectedSchema) throws Exception {
+  void testGetTableAvroSchema(HoodieSchema inputSchema, boolean 
includeMetadataFields, HoodieSchema expectedSchema) throws Exception {
     metaClient = 
HoodieTestUtils.getMetaClientBuilder(HoodieTableType.COPY_ON_WRITE, new 
Properties(),"")
         .setTableCreateSchema(SCHEMA_WITH_METADATA.toString())
         .initTable(getDefaultStorageConf(), basePath);
@@ -397,9 +397,9 @@ public class TestConcurrentSchemaEvolutionTableSchemaGetter 
extends HoodieCommon
         inputSchema.toString(),
         COMMIT_ACTION)));
 
-    assertEquals(expectedSchema, new 
ConcurrentSchemaEvolutionTableSchemaGetter(metaClient).getTableAvroSchemaIfPresent(includeMetadataFields,
 Option.empty()).get());
+    assertEquals(expectedSchema, new 
ConcurrentSchemaEvolutionTableSchemaGetter(metaClient).getTableSchemaIfPresent(includeMetadataFields,
 Option.empty()).get());
     HoodieInstant instant = 
metaClient.getInstantGenerator().createNewInstant(HoodieInstant.State.COMPLETED,
 COMMIT_ACTION, "0010", "0011");
-    assertEquals(expectedSchema, new 
ConcurrentSchemaEvolutionTableSchemaGetter(metaClient).getTableAvroSchemaIfPresent(
+    assertEquals(expectedSchema, new 
ConcurrentSchemaEvolutionTableSchemaGetter(metaClient).getTableSchemaIfPresent(
         includeMetadataFields, Option.of(instant)).get());
   }
 
@@ -412,7 +412,7 @@ public class TestConcurrentSchemaEvolutionTableSchemaGetter 
extends HoodieCommon
 
   @ParameterizedTest
   @MethodSource("partitionColumnSchemaTestParams")
-  void testGetTableAvroSchemaAppendPartitionColumn(boolean 
shouldIncludePartitionColumns, Schema expectedSchema) throws Exception {
+  void testGetTableSchemaAppendPartitionColumn(boolean 
shouldIncludePartitionColumns, HoodieSchema expectedSchema) throws Exception {
     metaClient = 
HoodieTestUtils.getMetaClientBuilder(HoodieTableType.COPY_ON_WRITE, new 
Properties(),"")
         .setPartitionFields("partitionColumn")
         .setShouldDropPartitionColumns(shouldIncludePartitionColumns)
@@ -427,8 +427,8 @@ public class TestConcurrentSchemaEvolutionTableSchemaGetter 
extends HoodieCommon
         SCHEMA_WITHOUT_METADATA.toString(),
         COMMIT_ACTION)));
 
-    assertEquals(expectedSchema, new 
ConcurrentSchemaEvolutionTableSchemaGetter(metaClient).getTableAvroSchemaIfPresent(false,
 Option.empty()).get());
-    assertEquals(expectedSchema, new 
ConcurrentSchemaEvolutionTableSchemaGetter(metaClient).getTableAvroSchemaIfPresent(
+    assertEquals(expectedSchema, new 
ConcurrentSchemaEvolutionTableSchemaGetter(metaClient).getTableSchemaIfPresent(false,
 Option.empty()).get());
+    assertEquals(expectedSchema, new 
ConcurrentSchemaEvolutionTableSchemaGetter(metaClient).getTableSchemaIfPresent(
         false, Option.of(metaClient.getInstantGenerator().createNewInstant(
             HoodieInstant.State.COMPLETED, COMMIT_ACTION, "0010", 
"0011"))).get());
   }
@@ -442,15 +442,15 @@ public class 
TestConcurrentSchemaEvolutionTableSchemaGetter extends HoodieCommon
 
   @ParameterizedTest
   @MethodSource("createSchemaTestParam")
-  void testGetTableCreateAvroSchema(boolean includeMetadataFields, Schema 
expectedSchema) throws Exception {
+  void testGetTableCreateAvroSchema(boolean includeMetadataFields, 
HoodieSchema expectedSchema) throws Exception {
     metaClient = 
HoodieTestUtils.getMetaClientBuilder(HoodieTableType.COPY_ON_WRITE, new 
Properties(),"")
         .setTableCreateSchema(SCHEMA_WITH_METADATA.toString())
         .initTable(getDefaultStorageConf(), basePath);
     testTable = HoodieTestTable.of(metaClient);
 
-    assertEquals(expectedSchema, new 
ConcurrentSchemaEvolutionTableSchemaGetter(metaClient).getTableAvroSchemaIfPresent(includeMetadataFields,
 Option.empty()).get());
+    assertEquals(expectedSchema, new 
ConcurrentSchemaEvolutionTableSchemaGetter(metaClient).getTableSchemaIfPresent(includeMetadataFields,
 Option.empty()).get());
+    // getTableAvroSchemaFromLatestCommit only cares about the active timeline; 
since it is empty, no schema is returned.
-    assertEquals(expectedSchema, new 
ConcurrentSchemaEvolutionTableSchemaGetter(metaClient).getTableAvroSchemaIfPresent(
+    assertEquals(expectedSchema, new 
ConcurrentSchemaEvolutionTableSchemaGetter(metaClient).getTableSchemaIfPresent(
         includeMetadataFields, 
Option.of(metaClient.getInstantGenerator().createNewInstant(
             HoodieInstant.State.COMPLETED, COMMIT_ACTION, "0010", 
"0011"))).get());
   }
@@ -467,10 +467,10 @@ public class 
TestConcurrentSchemaEvolutionTableSchemaGetter extends HoodieCommon
     metaClient.reloadActiveTimeline();
 
     ConcurrentSchemaEvolutionTableSchemaGetter resolver = new 
ConcurrentSchemaEvolutionTableSchemaGetter(metaClient);
-    Option<Schema> schemaOption = resolver.getTableAvroSchemaIfPresent(true, 
Option.empty());
+    Option<HoodieSchema> schemaOption = resolver.getTableSchemaIfPresent(true, 
Option.empty());
 
     assertTrue(schemaOption.isPresent());
-    Schema resultSchema = schemaOption.get();
+    HoodieSchema resultSchema = schemaOption.get();
     assertTrue(resultSchema.getFields().stream()
         .anyMatch(f -> f.name().equals("partition_path")));
   }
@@ -481,8 +481,8 @@ public class TestConcurrentSchemaEvolutionTableSchemaGetter 
extends HoodieCommon
     initMetaClient(false, tableType);
     testTable = HoodieTestTable.of(metaClient);
 
-    Schema schema1 = new 
Schema.Parser().parse(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA);
-    Schema schema2 = new Schema.Parser().parse(TRIP_SCHEMA);
+    HoodieSchema schema1 = 
HoodieSchema.parse(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA);
+    HoodieSchema schema2 = HoodieSchema.parse(TRIP_SCHEMA);
 
     // Create two commits with different schemas
     int startCommitTime = 10;
@@ -514,7 +514,7 @@ public class TestConcurrentSchemaEvolutionTableSchemaGetter 
extends HoodieCommon
 
     // Test getting schema from first instant
     String timestamp1 = padWithLeadingZeros(Integer.toString(10), 
REQUEST_TIME_LENGTH);
-    Option<Schema> schema1Option = resolver.getTableAvroSchemaIfPresent(
+    Option<HoodieSchema> schema1Option = resolver.getTableSchemaIfPresent(
         false,
         
Option.of(metaClient.getInstantGenerator().createNewInstant(HoodieInstant.State.COMPLETED,
 COMMIT_ACTION, timestamp1, incTimestampStrByOne(timestamp1))));
     assertTrue(schema1Option.isPresent());
@@ -522,7 +522,7 @@ public class TestConcurrentSchemaEvolutionTableSchemaGetter 
extends HoodieCommon
 
     // Test getting schema from second instant
     String timestamp2 = padWithLeadingZeros(Integer.toString(20), 
REQUEST_TIME_LENGTH);
-    Option<Schema> schema2Option = resolver.getTableAvroSchemaIfPresent(
+    Option<HoodieSchema> schema2Option = resolver.getTableSchemaIfPresent(
         false,
         
Option.of(metaClient.getInstantGenerator().createNewInstant(HoodieInstant.State.COMPLETED,
 COMMIT_ACTION, timestamp2, incTimestampStrByOne(timestamp2))));
     assertTrue(schema2Option.isPresent());
@@ -534,7 +534,7 @@ public class TestConcurrentSchemaEvolutionTableSchemaGetter 
extends HoodieCommon
 
     for (Integer i = startCommitTime + 10; i <= endCommitTime + 10; i += 10) {
       String timestampI = padWithLeadingZeros(Integer.toString(i), 
REQUEST_TIME_LENGTH);
-      schema2Option = resolver.getTableAvroSchemaIfPresent(false,
+      schema2Option = resolver.getTableSchemaIfPresent(false,
           
Option.of(metaClient.getInstantGenerator().createNewInstant(HoodieInstant.State.COMPLETED,
 COMMIT_ACTION, timestampI, incTimestampStrByOne(timestampI))));
       assertTrue(schema2Option.isPresent(), i::toString);
       assertEquals(schema2.toString(), schema2Option.get().toString());
@@ -548,8 +548,8 @@ public class TestConcurrentSchemaEvolutionTableSchemaGetter 
extends HoodieCommon
     testTable = HoodieTestTable.of(metaClient);
 
     // Create test schema
-    Schema schema1 = new 
Schema.Parser().parse(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA);
-    Schema schema2 = new 
Schema.Parser().parse(HoodieTestDataGenerator.SHORT_TRIP_SCHEMA);
+    HoodieSchema schema1 = 
HoodieSchema.parse(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA);
+    HoodieSchema schema2 = 
HoodieSchema.parse(HoodieTestDataGenerator.SHORT_TRIP_SCHEMA);
 
     // Create a commit with schema1
     String commitTime1 = "0010";
@@ -578,7 +578,7 @@ public class TestConcurrentSchemaEvolutionTableSchemaGetter 
extends HoodieCommon
     HoodieInstant instant1 = 
metaClient.getCommitsTimeline().filterCompletedInstants().nthInstant(0).get();
 
     // Case 1: First call with empty instant - should fetch from timeline and 
cache
-    Option<Schema> schemaOption1 = 
resolver.getTableAvroSchemaFromTimelineWithCache(Option.empty());
+    Option<HoodieSchema> schemaOption1 = 
resolver.getTableAvroSchemaFromTimelineWithCache(Option.empty()).map(HoodieSchema::fromAvroSchema);
     assertTrue(schemaOption1.isPresent());
     assertEquals(schema2, schemaOption1.get());
 
@@ -586,7 +586,7 @@ public class TestConcurrentSchemaEvolutionTableSchemaGetter 
extends HoodieCommon
     verify(resolver, 
times(1)).getLastCommitMetadataWithValidSchemaFromTimeline(any(), any());
 
     // Case 2: Second call with empty instant - should use cache
-    Option<Schema> schemaOption2 = 
resolver.getTableAvroSchemaFromTimelineWithCache(Option.empty());
+    Option<HoodieSchema> schemaOption2 = 
resolver.getTableAvroSchemaFromTimelineWithCache(Option.empty()).map(HoodieSchema::fromAvroSchema);
     assertTrue(schemaOption2.isPresent());
     assertEquals(schema2, schemaOption2.get());
 
@@ -594,7 +594,7 @@ public class TestConcurrentSchemaEvolutionTableSchemaGetter 
extends HoodieCommon
     verify(resolver, 
times(1)).getLastCommitMetadataWithValidSchemaFromTimeline(any(), any());
 
     // Case 3: Call with the latest valid instant - there should be a cache hit
-    Option<Schema> schemaOption3 = 
resolver.getTableAvroSchemaFromTimelineWithCache(Option.of(instant2));
+    Option<HoodieSchema> schemaOption3 = 
resolver.getTableAvroSchemaFromTimelineWithCache(Option.of(instant2)).map(HoodieSchema::fromAvroSchema);
     assertTrue(schemaOption3.isPresent());
     assertEquals(schema2, schemaOption3.get());
 
@@ -602,7 +602,7 @@ public class TestConcurrentSchemaEvolutionTableSchemaGetter 
extends HoodieCommon
     verify(resolver, 
times(1)).getLastCommitMetadataWithValidSchemaFromTimeline(any(), any());
 
     // Case 4: Second call with some other instant - should use cache
-    Option<Schema> schemaOption4 = 
resolver.getTableAvroSchemaFromTimelineWithCache(Option.of(instant1));
+    Option<HoodieSchema> schemaOption4 = 
resolver.getTableAvroSchemaFromTimelineWithCache(Option.of(instant1)).map(HoodieSchema::fromAvroSchema);
     assertTrue(schemaOption4.isPresent());
     assertEquals(schema1, schemaOption4.get());
 
@@ -613,7 +613,7 @@ public class TestConcurrentSchemaEvolutionTableSchemaGetter 
extends HoodieCommon
     String nonExistentTime = "9999";
     HoodieInstant nonExistentInstant = 
metaClient.getInstantGenerator().createNewInstant(
         HoodieInstant.State.COMPLETED, COMMIT_ACTION, nonExistentTime, 
nonExistentTime);
-    Option<Schema> schemaOption5 = 
resolver.getTableAvroSchemaFromTimelineWithCache(Option.of(nonExistentInstant));
+    Option<HoodieSchema> schemaOption5 = 
resolver.getTableAvroSchemaFromTimelineWithCache(Option.of(nonExistentInstant)).map(HoodieSchema::fromAvroSchema);
     assertEquals(schema2, schemaOption5.get());
 
     // Verify one more call to timeline for non-existent instant
diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestSimpleSchemaConflictResolutionStrategy.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestSimpleSchemaConflictResolutionStrategy.java
index db6a6d5abf0e..4873a608cb38 100644
--- 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestSimpleSchemaConflictResolutionStrategy.java
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestSimpleSchemaConflictResolutionStrategy.java
@@ -32,6 +32,7 @@ import 
org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.view.FileSystemViewManager;
@@ -137,9 +138,9 @@ public class TestSimpleSchemaConflictResolutionStrategy {
   @Test
   void testNoConflictFirstCommit() throws Exception {
     setupInstants(null, null, SCHEMA1, true, false);
-    Schema result = strategy.resolveConcurrentSchemaEvolution(
+    HoodieSchema result = strategy.resolveConcurrentSchemaEvolution(
         table, config, Option.empty(), nonTableCompactionInstant).get();
-    assertEquals(new Schema.Parser().parse(SCHEMA1), result);
+    assertEquals(HoodieSchema.parse(SCHEMA1), result);
   }
 
   @Test
@@ -152,9 +153,9 @@ public class TestSimpleSchemaConflictResolutionStrategy {
   @Test
   void testNullTypeWriterSchema() throws Exception {
     setupInstants(SCHEMA1, SCHEMA1, NULL_SCHEMA, true, false);
-    Schema result = strategy.resolveConcurrentSchemaEvolution(
+    HoodieSchema result = strategy.resolveConcurrentSchemaEvolution(
         table, config, lastCompletedTxnOwnerInstant, 
nonTableCompactionInstant).get();
-    assertEquals(new Schema.Parser().parse(SCHEMA1), result);
+    assertEquals(HoodieSchema.parse(SCHEMA1), result);
   }
 
   @Test
@@ -167,41 +168,41 @@ public class TestSimpleSchemaConflictResolutionStrategy {
   @Test
   void testConflictSecondCommitSameSchema() throws Exception {
     setupInstants(null, SCHEMA1, SCHEMA1, true, false);
-    Schema result = strategy.resolveConcurrentSchemaEvolution(
+    HoodieSchema result = strategy.resolveConcurrentSchemaEvolution(
         table, config, Option.empty(), nonTableCompactionInstant).get();
-    assertEquals(new Schema.Parser().parse(SCHEMA1), result);
+    assertEquals(HoodieSchema.parse(SCHEMA1), result);
   }
 
   @Test
   void testNoConflictSameSchema() throws Exception {
     setupInstants(SCHEMA1, SCHEMA1, SCHEMA1, true, false);
-    Schema result = strategy.resolveConcurrentSchemaEvolution(
+    HoodieSchema result = strategy.resolveConcurrentSchemaEvolution(
         table, config, lastCompletedTxnOwnerInstant, 
nonTableCompactionInstant).get();
-    assertEquals(new Schema.Parser().parse(SCHEMA1), result);
+    assertEquals(HoodieSchema.parse(SCHEMA1), result);
   }
 
   @Test
   void testNoConflictBackwardsCompatible1() throws Exception {
     setupInstants(SCHEMA1, SCHEMA2, SCHEMA1, true, false);
-    Schema result = strategy.resolveConcurrentSchemaEvolution(
+    HoodieSchema result = strategy.resolveConcurrentSchemaEvolution(
         table, config, lastCompletedTxnOwnerInstant, 
nonTableCompactionInstant).get();
-    assertEquals(new Schema.Parser().parse(SCHEMA2), result);
+    assertEquals(HoodieSchema.parse(SCHEMA2), result);
   }
 
   @Test
   void testNoConflictBackwardsCompatible2() throws Exception {
     setupInstants(SCHEMA1, SCHEMA1, SCHEMA2, true, false);
-    Schema result = strategy.resolveConcurrentSchemaEvolution(
+    HoodieSchema result = strategy.resolveConcurrentSchemaEvolution(
         table, config, lastCompletedTxnOwnerInstant, 
nonTableCompactionInstant).get();
-    assertEquals(new Schema.Parser().parse(SCHEMA2), result);
+    assertEquals(HoodieSchema.parse(SCHEMA2), result);
   }
 
   @Test
   void testNoConflictConcurrentEvolutionSameSchema() throws Exception {
     setupInstants(SCHEMA1, SCHEMA2, SCHEMA2, true, false);
-    Schema result = strategy.resolveConcurrentSchemaEvolution(
+    HoodieSchema result = strategy.resolveConcurrentSchemaEvolution(
         table, config, lastCompletedTxnOwnerInstant, 
nonTableCompactionInstant).get();
-    assertEquals(new Schema.Parser().parse(SCHEMA2), result);
+    assertEquals(HoodieSchema.parse(SCHEMA2), result);
   }
 
   @Test
diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/index/TestHoodieIndexUtils.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/index/TestHoodieIndexUtils.java
index 656203d8a905..a2b233925d73 100644
--- 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/index/TestHoodieIndexUtils.java
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/index/TestHoodieIndexUtils.java
@@ -18,15 +18,15 @@
 
 package org.apache.hudi.index;
 
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaField;
+import org.apache.hudi.common.schema.HoodieSchemaType;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.TableSchemaResolver;
 import org.apache.hudi.exception.HoodieMetadataIndexException;
 import org.apache.hudi.metadata.MetadataPartitionType;
 
-import org.apache.avro.LogicalTypes;
-import org.apache.avro.Schema;
-import org.apache.avro.SchemaBuilder;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.mockito.Mock;
@@ -79,20 +79,17 @@ public class TestHoodieIndexUtils {
   public void testIsEligibleForSecondaryIndexWithSupportedDataTypes() {
     // Given: A schema with supported data types for secondary index 
(String/CHAR, Int, Long, Float, Double)
     // Note: CHAR is represented as STRING in Avro schema
-    Schema schema = SchemaBuilder.record("TestRecord")
-        .fields()
-        .requiredString("stringField")
-        .requiredString("charField") // CHAR is represented as STRING in Avro
-        .optionalInt("intField")
-        .requiredLong("longField")
-        .name("doubleField").type().doubleType().noDefault()
-        .endRecord();
+    HoodieSchema schema = HoodieSchema.createRecord("TestRecord", null, null, 
Arrays.asList(
+        HoodieSchemaField.of("stringField", 
HoodieSchema.create(HoodieSchemaType.STRING)),
+        HoodieSchemaField.of("charField", 
HoodieSchema.create(HoodieSchemaType.STRING)),
+        HoodieSchemaField.of("intField", 
HoodieSchema.createNullable(HoodieSchemaType.INT)),
+        HoodieSchemaField.of("longField", 
HoodieSchema.create(HoodieSchemaType.LONG)),
+        HoodieSchemaField.of("doubleField", 
HoodieSchema.create(HoodieSchemaType.DOUBLE))
+    ));
 
     // Mock the schema resolver
     try (MockedConstruction<TableSchemaResolver> mockedResolver = 
Mockito.mockConstruction(TableSchemaResolver.class,
-        (mock, context) -> {
-          when(mock.getTableAvroSchema()).thenReturn(schema);
-        })) {
+        (mock, context) -> when(mock.getTableSchema()).thenReturn(schema))) {
 
       // Test case 1: Secondary index with record index already present
       // Given: Record index partition already exists
@@ -143,21 +140,18 @@ public class TestHoodieIndexUtils {
     }
   }
 
+  @Test
   public void testValidateDataTypeForSecondaryOrExpressionIndex() {
     // Create a dummy schema with both complex and primitive types
-    Schema schema = SchemaBuilder.record("TestRecord")
-        .fields()
-        .requiredString("stringField")
-        .optionalInt("intField")
-        .name("arrayField").type().array().items().stringType().noDefault()
-        .name("mapField").type().map().values().intType().noDefault()
-        .name("structField").type().record("NestedRecord")
-        .fields()
-        .requiredString("nestedString")
-        .endRecord()
-        .noDefault()
-        .endRecord();
-
+    HoodieSchema schema = HoodieSchema.createRecord("TestRecord", null, null, 
Arrays.asList(
+        HoodieSchemaField.of("stringField", 
HoodieSchema.create(HoodieSchemaType.STRING)),
+        HoodieSchemaField.of("intField", 
HoodieSchema.createNullable(HoodieSchemaType.INT)),
+        HoodieSchemaField.of("arrayField", 
HoodieSchema.createArray(HoodieSchema.create(HoodieSchemaType.STRING))),
+        HoodieSchemaField.of("mapField", 
HoodieSchema.createMap(HoodieSchema.create(HoodieSchemaType.INT))),
+        HoodieSchemaField.of("structField", 
HoodieSchema.createRecord("NestedRecord", null, null, Arrays.asList(
+            HoodieSchemaField.of("nestedString", 
HoodieSchema.create(HoodieSchemaType.STRING))
+        )))
+    ));
     // Test for primitive fields
     
assertTrue(validateDataTypeForSecondaryOrExpressionIndex(Arrays.asList("stringField",
 "intField"), schema));
 
@@ -175,24 +169,21 @@ public class TestHoodieIndexUtils {
   @Test
   public void testValidateDataTypeForSecondaryIndex() {
     // Create a schema with various data types
-    Schema schema = SchemaBuilder.record("TestRecord")
-        .fields()
-        .requiredString("stringField")
-        .requiredString("charField") // CHAR is represented as STRING in Avro
-        .optionalInt("intField")
-        .requiredLong("longField")
-        .name("timestampField").type().longType().longDefault(0L) // timestamp 
as long
-        .name("booleanField").type().booleanType().noDefault()
-        .name("floatField").type().floatType().noDefault()
-        .name("doubleField").type().doubleType().noDefault()
-        .name("arrayField").type().array().items().stringType().noDefault()
-        .name("mapField").type().map().values().intType().noDefault()
-        .name("structField").type().record("NestedRecord")
-        .fields()
-        .requiredString("nestedString")
-        .endRecord()
-        .noDefault()
-        .endRecord();
+    HoodieSchema schema = HoodieSchema.createRecord("TestRecord", null, null, 
Arrays.asList(
+        HoodieSchemaField.of("stringField", 
HoodieSchema.create(HoodieSchemaType.STRING)),
+        HoodieSchemaField.of("charField", 
HoodieSchema.create(HoodieSchemaType.STRING)), // CHAR is represented as STRING
+        HoodieSchemaField.of("intField", 
HoodieSchema.createNullable(HoodieSchemaType.INT)),
+        HoodieSchemaField.of("longField", 
HoodieSchema.create(HoodieSchemaType.LONG)),
+        HoodieSchemaField.of("timestampField", 
HoodieSchema.create(HoodieSchemaType.LONG), null, 0L), // timestamp as long
+        HoodieSchemaField.of("booleanField", 
HoodieSchema.create(HoodieSchemaType.BOOLEAN)),
+        HoodieSchemaField.of("floatField", 
HoodieSchema.create(HoodieSchemaType.FLOAT)),
+        HoodieSchemaField.of("doubleField", 
HoodieSchema.create(HoodieSchemaType.DOUBLE)),
+        HoodieSchemaField.of("arrayField", 
HoodieSchema.createArray(HoodieSchema.create(HoodieSchemaType.STRING))),
+        HoodieSchemaField.of("mapField", 
HoodieSchema.createMap(HoodieSchema.create(HoodieSchemaType.INT))),
+        HoodieSchemaField.of("structField", 
HoodieSchema.createRecord("NestedRecord", null, null, Arrays.asList(
+            HoodieSchemaField.of("nestedString", 
HoodieSchema.create(HoodieSchemaType.STRING))
+        )))
+    ));
 
     // Test supported types for secondary index
     
assertTrue(validateDataTypeForSecondaryIndex(Collections.singletonList("stringField"),
 schema));
@@ -227,32 +218,31 @@ public class TestHoodieIndexUtils {
   @Test
   public void testValidateDataTypeForSecondaryIndexWithLogicalTypes() {
     // Supported logical types
-    Schema timestampMillis = 
LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG));
-    Schema timestampMicros = 
LogicalTypes.timestampMicros().addToSchema(Schema.create(Schema.Type.LONG));
-    Schema date = 
LogicalTypes.date().addToSchema(Schema.create(Schema.Type.INT));
-    Schema timeMillis = 
LogicalTypes.timeMillis().addToSchema(Schema.create(Schema.Type.INT));
-    Schema timeMicros = 
LogicalTypes.timeMicros().addToSchema(Schema.create(Schema.Type.LONG));
-    
+    HoodieSchema timestampMillis = HoodieSchema.createTimestampMillis();
+    HoodieSchema timestampMicros = HoodieSchema.createTimestampMicros();
+    HoodieSchema date = HoodieSchema.createDate();
+    HoodieSchema timeMillis = HoodieSchema.createTimeMillis();
+    HoodieSchema timeMicros = HoodieSchema.createTimeMicros();
+
     // Unsupported logical types
-    Schema decimal = LogicalTypes.decimal(10, 
2).addToSchema(Schema.create(Schema.Type.BYTES));
-    Schema uuid = 
LogicalTypes.uuid().addToSchema(Schema.create(Schema.Type.STRING));
-    Schema localTimestampMillis = 
LogicalTypes.localTimestampMillis().addToSchema(Schema.create(Schema.Type.LONG));
-    Schema localTimestampMicros = 
LogicalTypes.localTimestampMicros().addToSchema(Schema.create(Schema.Type.LONG));
-    
-    Schema schemaWithLogicalTypes = SchemaBuilder.record("TestRecord")
-        .fields()
+    HoodieSchema decimal = HoodieSchema.createDecimal(10, 2);
+    HoodieSchema uuid = HoodieSchema.createUUID();
+    HoodieSchema localTimestampMillis = 
HoodieSchema.createLocalTimestampMillis();
+    HoodieSchema localTimestampMicros = 
HoodieSchema.createLocalTimestampMicros();
+
+    HoodieSchema schemaWithLogicalTypes = 
HoodieSchema.createRecord("TestRecord", null, null, Arrays.asList(
         // Supported logical types
-        .name("timestampMillisField").type(timestampMillis).noDefault()
-        .name("timestampMicrosField").type(timestampMicros).noDefault()
-        .name("dateField").type(date).noDefault()
-        .name("timeMillisField").type(timeMillis).noDefault()
-        .name("timeMicrosField").type(timeMicros).noDefault()
+        HoodieSchemaField.of("timestampMillisField", timestampMillis),
+        HoodieSchemaField.of("timestampMicrosField", timestampMicros),
+        HoodieSchemaField.of("dateField", date),
+        HoodieSchemaField.of("timeMillisField", timeMillis),
+        HoodieSchemaField.of("timeMicrosField", timeMicros),
         // Unsupported logical types
-        .name("decimalField").type(decimal).noDefault()
-        .name("uuidField").type(uuid).noDefault()
-        
.name("localTimestampMillisField").type(localTimestampMillis).noDefault()
-        
.name("localTimestampMicrosField").type(localTimestampMicros).noDefault()
-        .endRecord();
+        HoodieSchemaField.of("decimalField", decimal),
+        HoodieSchemaField.of("uuidField", uuid),
+        HoodieSchemaField.of("localTimestampMillisField", 
localTimestampMillis),
+        HoodieSchemaField.of("localTimestampMicrosField", localTimestampMicros)
+    ));
 
     // Test supported timestamp and date/time logical types
     
assertTrue(validateDataTypeForSecondaryIndex(Collections.singletonList("timestampMillisField"),
 schemaWithLogicalTypes));
@@ -282,21 +272,18 @@ public class TestHoodieIndexUtils {
   public void testIsEligibleForSecondaryIndexWithUnsupportedDataTypes() {
     // Given: A schema with unsupported data types for secondary index 
(Boolean, Decimal)
     // Note: Float and Double are now supported
-    Schema decimalType = LogicalTypes.decimal(10, 
2).addToSchema(Schema.create(Schema.Type.BYTES));
-    Schema schema = SchemaBuilder.record("TestRecord")
-        .fields()
-        .requiredString("stringField")
-        .name("floatField").type().floatType().noDefault()
-        .name("doubleField").type().doubleType().noDefault()
-        .name("booleanField").type().booleanType().noDefault()
-        .name("decimalField").type(decimalType).noDefault()
-        .endRecord();
+    HoodieSchema decimalType = HoodieSchema.createDecimal(10, 2);
+    HoodieSchema schema = HoodieSchema.createRecord("TestRecord", null, null, 
Arrays.asList(
+        HoodieSchemaField.of("stringField", 
HoodieSchema.create(HoodieSchemaType.STRING)),
+        HoodieSchemaField.of("floatField", 
HoodieSchema.create(HoodieSchemaType.FLOAT)),
+        HoodieSchemaField.of("doubleField", 
HoodieSchema.create(HoodieSchemaType.DOUBLE)),
+        HoodieSchemaField.of("booleanField", 
HoodieSchema.create(HoodieSchemaType.BOOLEAN)),
+        HoodieSchemaField.of("decimalField", decimalType)
+    ));
 
     // Mock the schema resolver
     try (MockedConstruction<TableSchemaResolver> mockedResolver = 
Mockito.mockConstruction(TableSchemaResolver.class,
-        (mock, context) -> {
-          when(mock.getTableAvroSchema()).thenReturn(schema);
-        })) {
+        (mock, context) -> when(mock.getTableSchema()).thenReturn(schema))) {
 
       // Given: Record index partition exists
       Set<String> partitions = new HashSet<>();
@@ -351,8 +338,7 @@ public class TestHoodieIndexUtils {
           () -> 
HoodieIndexUtils.validateEligibilityForSecondaryOrExpressionIndex(
               mockMetaClient, PARTITION_NAME_SECONDARY_INDEX, options, 
columns, "test_index"));
       assertTrue(ex4.getMessage().contains("unsupported data type"));
-      assertTrue(ex4.getMessage().contains("BYTES with logical type"));
-      assertTrue(ex4.getMessage().contains("Decimal"));
+      assertTrue(ex4.getMessage().contains("DECIMAL"));
       assertTrue(ex4.getMessage().contains("Secondary indexes only support"));
 
       // Test case 5: Mix of supported fields (now including double)
@@ -378,20 +364,16 @@ public class TestHoodieIndexUtils {
   @Test
   public void testIsEligibleForSecondaryIndexWithLogicalTypes() {
     // Given: A schema with timestamp and date logical types
-    Schema timestampMillis = 
LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG));
-    Schema date = 
LogicalTypes.date().addToSchema(Schema.create(Schema.Type.INT));
-    
-    Schema schema = SchemaBuilder.record("TestRecord")
-        .fields()
-        .name("timestampField").type(timestampMillis).noDefault()
-        .name("dateField").type(date).noDefault()
-        .endRecord();
+    HoodieSchema timestampMillis = HoodieSchema.createTimestampMillis();
+    HoodieSchema date = HoodieSchema.createDate();
 
+    HoodieSchema schema = HoodieSchema.createRecord("TestRecord", null, null, 
Arrays.asList(
+          HoodieSchemaField.of("timestampField", timestampMillis),
+          HoodieSchemaField.of("dateField", date)
+        ));
     // Mock the schema resolver
     try (MockedConstruction<TableSchemaResolver> mockedResolver = 
Mockito.mockConstruction(TableSchemaResolver.class,
-        (mock, context) -> {
-          when(mock.getTableAvroSchema()).thenReturn(schema);
-        })) {
+        (mock, context) -> when(mock.getTableSchema()).thenReturn(schema))) {
 
       // Given: Record index partition exists
       Set<String> partitions = new HashSet<>();
@@ -420,16 +402,13 @@ public class TestHoodieIndexUtils {
   @Test
   public void testIsEligibleForSecondaryIndexWithoutRecordIndex() {
     // Given: A schema with supported data types
-    Schema schema = SchemaBuilder.record("TestRecord")
-        .fields()
-        .requiredString("stringField")
-        .endRecord();
+    HoodieSchema schema = HoodieSchema.createRecord("TestRecord", null, null, 
Collections.singletonList(
+        HoodieSchemaField.of("stringField", 
HoodieSchema.create(HoodieSchemaType.STRING))
+    ));
 
     // Mock the schema resolver
     try (MockedConstruction<TableSchemaResolver> mockedResolver = 
Mockito.mockConstruction(TableSchemaResolver.class,
-        (mock, context) -> {
-          when(mock.getTableAvroSchema()).thenReturn(schema);
-        })) {
+        (mock, context) -> when(mock.getTableSchema()).thenReturn(schema))) {
 
       // Test case 1: No record index partition and not enabled in options
       // Given: No record index partition exists and not enabled in options
@@ -471,19 +450,16 @@ public class TestHoodieIndexUtils {
   @Test
   public void testIsEligibleForExpressionIndex() {
     // Given: A schema with various data types including complex types
-    Schema schema = SchemaBuilder.record("TestRecord")
-        .fields()
-        .requiredString("stringField")
-        .name("floatField").type().floatType().noDefault()
-        .name("arrayField").type().array().items().stringType().noDefault()
-        .name("mapField").type().map().values().intType().noDefault()
-        .endRecord();
+    HoodieSchema schema = HoodieSchema.createRecord("TestRecord", null, null, 
Arrays.asList(
+        HoodieSchemaField.of("stringField", 
HoodieSchema.create(HoodieSchemaType.STRING)),
+        HoodieSchemaField.of("floatField", 
HoodieSchema.create(HoodieSchemaType.FLOAT)),
+        HoodieSchemaField.of("arrayField", 
HoodieSchema.createArray(HoodieSchema.create(HoodieSchemaType.STRING))),
+        HoodieSchemaField.of("mapField", 
HoodieSchema.createMap(HoodieSchema.create(HoodieSchemaType.INT)))
+    ));
 
     // Mock the schema resolver
     try (MockedConstruction<TableSchemaResolver> mockedResolver = 
Mockito.mockConstruction(TableSchemaResolver.class,
-        (mock, context) -> {
-          when(mock.getTableAvroSchema()).thenReturn(schema);
-        })) {
+        (mock, context) -> when(mock.getTableSchema()).thenReturn(schema))) {
 
       Map<String, Map<String, String>> columns = new HashMap<>();
       Map<String, String> options = new HashMap<>();
@@ -544,18 +520,17 @@ public class TestHoodieIndexUtils {
    */
   @Test
   public void testIsEligibleForExpressionIndexWithNullableFields() {
+    // An int with default 0 must have the int type defined first.
+    // If null is defined first (as HoodieSchema#createNullable does), an 
error will be thrown
+    HoodieSchema nullableIntWithDefault = 
HoodieSchema.createUnion(HoodieSchema.create(HoodieSchemaType.INT), 
HoodieSchema.create(HoodieSchemaType.NULL));
     // Given: A schema with nullable fields (union types)
-    Schema schema = SchemaBuilder.record("TestRecord")
-        .fields()
-        .optionalString("nullableStringField")
-        .name("nullableIntField").type().nullable().intType().intDefault(0)
-        .endRecord();
-
+    HoodieSchema schema = HoodieSchema.createRecord("TestRecord", null, null, 
Arrays.asList(
+      HoodieSchemaField.of("nullableStringField", 
HoodieSchema.create(HoodieSchemaType.STRING)),
+      HoodieSchemaField.of("nullableIntField", nullableIntWithDefault, null, 0)
+    ));
     // Mock the schema resolver
     try (MockedConstruction<TableSchemaResolver> mockedResolver = 
Mockito.mockConstruction(TableSchemaResolver.class,
-        (mock, context) -> {
-          when(mock.getTableAvroSchema()).thenReturn(schema);
-        })) {
+        (mock, context) -> when(mock.getTableSchema()).thenReturn(schema))) {
 
       Map<String, Map<String, String>> columns = new HashMap<>();
       columns.put("nullableStringField", Collections.emptyMap());
@@ -578,19 +553,18 @@ public class TestHoodieIndexUtils {
    */
   @Test
   public void testIsEligibleForSecondaryIndexWithNullableFields() {
+    HoodieSchema nullableIntWithDefault = 
HoodieSchema.createUnion(HoodieSchema.create(HoodieSchemaType.INT), 
HoodieSchema.create(HoodieSchemaType.NULL));
+    HoodieSchema nullableLongWithDefault = 
HoodieSchema.createUnion(HoodieSchema.create(HoodieSchemaType.LONG), 
HoodieSchema.create(HoodieSchemaType.NULL));
     // Given: A schema with nullable fields that are supported for secondary 
index
-    Schema schema = SchemaBuilder.record("TestRecord")
-        .fields()
-        .optionalString("nullableStringField")
-        .name("nullableIntField").type().nullable().intType().intDefault(0)
-        .name("nullableLongField").type().nullable().longType().longDefault(0L)
-        .endRecord();
+    HoodieSchema schema = HoodieSchema.createRecord("TestRecord", null, null, 
Arrays.asList(
+          HoodieSchemaField.of("nullableStringField", 
HoodieSchema.create(HoodieSchemaType.STRING)),
+          HoodieSchemaField.of("nullableIntField", nullableIntWithDefault, 
null, 0),
+          HoodieSchemaField.of("nullableLongField", nullableLongWithDefault, 
null, 0L)
+        ));
 
     // Mock the schema resolver
     try (MockedConstruction<TableSchemaResolver> mockedResolver = 
Mockito.mockConstruction(TableSchemaResolver.class,
-        (mock, context) -> {
-          when(mock.getTableAvroSchema()).thenReturn(schema);
-        })) {
+        (mock, context) -> when(mock.getTableSchema()).thenReturn(schema))) {
 
       // Given: Record index partition exists
       Set<String> partitions = new HashSet<>();
@@ -620,26 +594,23 @@ public class TestHoodieIndexUtils {
   @Test
   public void testIsEligibleForSecondaryIndexWithAllLogicalTypes() {
     // Given: A schema with all supported timestamp logical types
-    Schema timestampMillis = 
LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG));
-    Schema timestampMicros = 
LogicalTypes.timestampMicros().addToSchema(Schema.create(Schema.Type.LONG));
-    Schema date = 
LogicalTypes.date().addToSchema(Schema.create(Schema.Type.INT));
-    Schema timeMillis = 
LogicalTypes.timeMillis().addToSchema(Schema.create(Schema.Type.INT));
-    Schema timeMicros = 
LogicalTypes.timeMicros().addToSchema(Schema.create(Schema.Type.LONG));
-    
-    Schema schema = SchemaBuilder.record("TestRecord")
-        .fields()
-        .name("timestampMillisField").type(timestampMillis).noDefault()
-        .name("timestampMicrosField").type(timestampMicros).noDefault()
-        .name("dateField").type(date).noDefault()
-        .name("timeMillisField").type(timeMillis).noDefault()
-        .name("timeMicrosField").type(timeMicros).noDefault()
-        .endRecord();
+    HoodieSchema timestampMillis = HoodieSchema.createTimestampMillis();
+    HoodieSchema timestampMicros = HoodieSchema.createTimestampMicros();
+    HoodieSchema date = HoodieSchema.createDate();
+    HoodieSchema timeMillis = HoodieSchema.createTimeMillis();
+    HoodieSchema timeMicros = HoodieSchema.createTimeMicros();
+
+    HoodieSchema schema = HoodieSchema.createRecord("TestRecord", null, null, 
Arrays.asList(
+        HoodieSchemaField.of("timestampMillisField", timestampMillis),
+        HoodieSchemaField.of("timestampMicrosField", timestampMicros),
+        HoodieSchemaField.of("dateField", date),
+        HoodieSchemaField.of("timeMillisField", timeMillis),
+        HoodieSchemaField.of("timeMicrosField", timeMicros)
+        ));
 
     // Mock the schema resolver
     try (MockedConstruction<TableSchemaResolver> mockedResolver = 
Mockito.mockConstruction(TableSchemaResolver.class,
-        (mock, context) -> {
-          when(mock.getTableAvroSchema()).thenReturn(schema);
-        })) {
+        (mock, context) -> when(mock.getTableSchema()).thenReturn(schema))) {
 
       // Given: Record index is enabled
       Set<String> partitions = new HashSet<>();
@@ -671,16 +642,13 @@ public class TestHoodieIndexUtils {
   @Test
   public void testIsEligibleForSecondaryIndexWithColumnNotInSchema() {
     // Given: A schema without the requested column
-    Schema schema = SchemaBuilder.record("TestRecord")
-        .fields()
-        .requiredString("existingField")
-        .endRecord();
+    HoodieSchema schema = HoodieSchema.createRecord("TestRecord", null, null, 
Arrays.asList(
+        HoodieSchemaField.of("existingField", 
HoodieSchema.create(HoodieSchemaType.STRING))
+    ));
 
     // Mock the schema resolver
     try (MockedConstruction<TableSchemaResolver> mockedResolver = 
Mockito.mockConstruction(TableSchemaResolver.class,
-        (mock, context) -> {
-          when(mock.getTableAvroSchema()).thenReturn(schema);
-        })) {
+        (mock, context) -> when(mock.getTableSchema()).thenReturn(schema))) {
 
       // Given: Record index is enabled
       Set<String> partitions = new HashSet<>();
@@ -711,19 +679,16 @@ public class TestHoodieIndexUtils {
   @Test
   public void testIsEligibleForSecondaryIndexWithStringLogicalTypes() {
     // Given: A schema with UUID logical type on string field
-    Schema uuidSchema = 
LogicalTypes.uuid().addToSchema(Schema.create(Schema.Type.STRING));
-    
-    Schema schema = SchemaBuilder.record("TestRecord")
-        .fields()
-        .name("uuidField").type(uuidSchema).noDefault()
-        .requiredString("regularStringField")
-        .endRecord();
+    HoodieSchema uuidSchema = HoodieSchema.createUUID();
+
+    HoodieSchema schema = HoodieSchema.createRecord("TestRecord", null, null, 
Arrays.asList(
+          HoodieSchemaField.of("uuidField", uuidSchema),
+          HoodieSchemaField.of("regularStringField", 
HoodieSchema.create(HoodieSchemaType.STRING))
+    ));
 
     // Mock the schema resolver
     try (MockedConstruction<TableSchemaResolver> mockedResolver = 
Mockito.mockConstruction(TableSchemaResolver.class,
-        (mock, context) -> {
-          when(mock.getTableAvroSchema()).thenReturn(schema);
-        })) {
+        (mock, context) -> when(mock.getTableSchema()).thenReturn(schema))) {
 
       // Given: Record index is enabled
       Set<String> partitions = new HashSet<>();
@@ -767,16 +732,13 @@ public class TestHoodieIndexUtils {
   @Test
   public void testIsEligibleForExpressionIndexWithColumnNotInSchema() {
     // Given: A schema without the requested column
-    Schema schema = SchemaBuilder.record("TestRecord")
-        .fields()
-        .requiredString("existingField")
-        .endRecord();
+    HoodieSchema schema = HoodieSchema.createRecord("TestRecord", null, null, 
Arrays.asList(
+        HoodieSchemaField.of("existingField", 
HoodieSchema.create(HoodieSchemaType.STRING))
+    ));
 
     // Mock the schema resolver
     try (MockedConstruction<TableSchemaResolver> mockedResolver = 
Mockito.mockConstruction(TableSchemaResolver.class,
-        (mock, context) -> {
-          when(mock.getTableAvroSchema()).thenReturn(schema);
-        })) {
+        (mock, context) -> when(mock.getTableSchema()).thenReturn(schema))) {
 
       Map<String, Map<String, String>> columns = new HashMap<>();
       columns.put("nonExistentField", Collections.emptyMap());
diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/GenericRecordValidationTestUtils.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/GenericRecordValidationTestUtils.java
index 67d189ed1b1d..b0993f5d0839 100644
--- 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/GenericRecordValidationTestUtils.java
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/GenericRecordValidationTestUtils.java
@@ -18,10 +18,13 @@
 
 package org.apache.hudi.testutils;
 
-import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.common.config.HoodieReaderConfig;
 import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaField;
+import org.apache.hudi.common.schema.HoodieSchemaType;
+import org.apache.hudi.common.schema.HoodieSchemaUtils;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.config.HoodieWriteConfig;
@@ -35,7 +38,6 @@ import org.apache.hudi.storage.StorageConfiguration;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.storage.hadoop.HoodieHadoopStorage;
 
-import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.ArrayWritable;
@@ -67,8 +69,8 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 public class GenericRecordValidationTestUtils {
 
   public static void assertGenericRecords(GenericRecord record1, GenericRecord 
record2,
-                                          Schema schema, List<String> 
excludeFields) {
-    for (Schema.Field f: schema.getFields()) {
+                                          HoodieSchema schema, List<String> 
excludeFields) {
+    for (HoodieSchemaField f: schema.getFields()) {
       String fieldName = f.name();
       if (excludeFields.contains(fieldName)) {
         continue;
@@ -81,7 +83,7 @@ public class GenericRecordValidationTestUtils {
               
HoodieRealtimeRecordReaderUtils.arrayWritableToString((ArrayWritable) value2));
         } else if (value1 instanceof Text && value2 instanceof BytesWritable) {
           assertArrayEquals(((Text) value1).getBytes(), ((BytesWritable) 
value2).getBytes());
-        } else if (f.schema().getType() == Schema.Type.ENUM
+        } else if (f.schema().getType() == HoodieSchemaType.ENUM
             && value1 instanceof BytesWritable && value2 instanceof Text) {
           // TODO(HUDI-8660): Revisit ENUM handling in Spark parquet reader 
and writer
           assertArrayEquals(((BytesWritable) value1).getBytes(), ((Text) 
value2).getBytes());
@@ -126,8 +128,8 @@ public class GenericRecordValidationTestUtils {
     // Verify row count.
     assertEquals(prevRecordsMap.size(), newRecordsMap.size());
 
-    Schema readerSchema = HoodieAvroUtils.addMetadataFields(
-        new Schema.Parser().parse(config.getSchema()), 
config.allowOperationMetadataField());
+    HoodieSchema readerSchema = HoodieSchemaUtils.addMetadataFields(
+        HoodieSchema.parse(config.getSchema()), 
config.allowOperationMetadataField());
 
     // Verify every field.
     prevRecordsMap.forEach((key, value) -> {
diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/HoodieMergeOnReadTestUtils.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/HoodieMergeOnReadTestUtils.java
index cd90c8d2613a..01c830ff198b 100644
--- 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/HoodieMergeOnReadTestUtils.java
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/HoodieMergeOnReadTestUtils.java
@@ -18,7 +18,9 @@
 
 package org.apache.hudi.testutils;
 
-import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaField;
+import org.apache.hudi.common.schema.HoodieSchemaUtils;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.testutils.HadoopFSTestUtils;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
@@ -28,8 +30,6 @@ import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
 import org.apache.hudi.storage.StorageConfiguration;
 
 import lombok.extern.slf4j.Slf4j;
-import org.apache.avro.Schema;
-import org.apache.avro.Schema.Field;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.GenericRecordBuilder;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
@@ -65,43 +65,43 @@ public class HoodieMergeOnReadTestUtils {
 
   public static List<GenericRecord> 
getRecordsUsingInputFormat(StorageConfiguration<?> conf, List<String> 
inputPaths,
                                                                String 
basePath, JobConf jobConf, boolean realtime, boolean populateMetaFields) {
-    Schema schema = new 
Schema.Parser().parse(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA);
+    HoodieSchema schema = 
HoodieSchema.parse(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA);
     return getRecordsUsingInputFormat(conf, inputPaths, basePath, jobConf, 
realtime, schema,
         HoodieTestDataGenerator.TRIP_HIVE_COLUMN_TYPES, false, new 
ArrayList<>(), populateMetaFields);
   }
 
-  public static List<GenericRecord> 
getRecordsUsingInputFormat(StorageConfiguration<?> conf, List<String> 
inputPaths, String basePath, JobConf jobConf, boolean realtime, Schema 
rawSchema,
+  public static List<GenericRecord> 
getRecordsUsingInputFormat(StorageConfiguration<?> conf, List<String> 
inputPaths, String basePath, JobConf jobConf, boolean realtime, HoodieSchema 
rawSchema,
                                                                String 
rawHiveColumnTypes, boolean projectCols, List<String> projectedColumns) {
     return getRecordsUsingInputFormat(conf, inputPaths, basePath, jobConf, 
realtime, rawSchema, rawHiveColumnTypes, projectCols, projectedColumns, true);
   }
 
-  public static List<GenericRecord> 
getRecordsUsingInputFormat(StorageConfiguration<?> conf, List<String> 
inputPaths, String basePath, JobConf jobConf, boolean realtime, Schema 
rawSchema,
+  public static List<GenericRecord> 
getRecordsUsingInputFormat(StorageConfiguration<?> conf, List<String> 
inputPaths, String basePath, JobConf jobConf, boolean realtime, HoodieSchema 
rawSchema,
                                                                String 
rawHiveColumnTypes, boolean projectCols, List<String> projectedColumns, boolean 
populateMetaFields) {
 
     HoodieTableMetaClient metaClient = HoodieTestUtils.createMetaClient(conf, 
basePath);
     FileInputFormat inputFormat = 
HoodieInputFormatUtils.getInputFormat(metaClient.getTableConfig().getBaseFileFormat(),
 realtime, jobConf);
 
-    Schema schema;
+    HoodieSchema schema;
     String hiveColumnTypes;
 
     if (populateMetaFields) {
-      schema = HoodieAvroUtils.addMetadataFields(rawSchema);
-      hiveColumnTypes = 
HoodieAvroUtils.addMetadataColumnTypes(rawHiveColumnTypes);
+      schema = HoodieSchemaUtils.addMetadataFields(rawSchema);
+      hiveColumnTypes = 
HoodieSchemaUtils.addMetadataColumnTypes(rawHiveColumnTypes);
     } else {
       schema = rawSchema;
       hiveColumnTypes = rawHiveColumnTypes;
     }
 
     setPropsForInputFormat(inputFormat, jobConf, schema, hiveColumnTypes, 
projectCols, projectedColumns, populateMetaFields);
-    final List<Field> fields;
+    final List<HoodieSchemaField> fields;
     if (projectCols) {
       fields = schema.getFields().stream().filter(f -> 
projectedColumns.contains(f.name()))
           .collect(Collectors.toList());
     } else {
       fields = schema.getFields();
     }
-    final Schema projectedSchema = Schema.createRecord(fields.stream()
-        .map(HoodieAvroUtils::createNewSchemaField)
+    final HoodieSchema projectedSchema = 
HoodieSchema.createRecord("testRecord", null, null, fields.stream()
+        .map(HoodieSchemaUtils::createNewSchemaField)
         .collect(Collectors.toList()));
 
     List<GenericRecord> records = new ArrayList<>();
@@ -114,7 +114,7 @@ public class HoodieMergeOnReadTestUtils {
         Object key = recordReader.createKey();
         ArrayWritable writable = (ArrayWritable) recordReader.createValue();
         while (recordReader.next(key, writable)) {
-          GenericRecordBuilder newRecord = new 
GenericRecordBuilder(projectedSchema);
+          GenericRecordBuilder newRecord = new 
GenericRecordBuilder(projectedSchema.toAvroSchema());
           // writable returns an array with [field1, field2, 
_hoodie_commit_time,
           // _hoodie_commit_seqno]
           Writable[] values = writable.get();
@@ -122,7 +122,7 @@ public class HoodieMergeOnReadTestUtils {
               .filter(f -> !projectCols || projectedColumns.contains(f.name()))
               .map(f -> Pair.of(projectedSchema.getFields().stream()
                   .filter(p -> f.name().equals(p.name())).findFirst().get(), 
f))
-              .forEach(fieldsPair -> newRecord.set(fieldsPair.getKey(), 
values[fieldsPair.getValue().pos()]));
+              .forEach(fieldsPair -> 
newRecord.set(fieldsPair.getKey().getAvroField(), 
values[fieldsPair.getValue().pos()]));
           records.add(newRecord.build());
         }
         recordReader.close();
@@ -133,12 +133,12 @@ public class HoodieMergeOnReadTestUtils {
     return records;
   }
 
-  private static void setPropsForInputFormat(FileInputFormat inputFormat, 
JobConf jobConf, Schema schema, String hiveColumnTypes, boolean projectCols, 
List<String> projectedCols,
+  private static void setPropsForInputFormat(FileInputFormat inputFormat, 
JobConf jobConf, HoodieSchema schema, String hiveColumnTypes, boolean 
projectCols, List<String> projectedCols,
                                              boolean 
populateMetaFieldsConfigValue) {
-    List<Field> fields = schema.getFields();
+    List<HoodieSchemaField> fields = schema.getFields();
     final List<String> projectedColNames;
     if (!projectCols) {
-      projectedColNames = 
fields.stream().map(Field::name).collect(Collectors.toList());
+      projectedColNames = 
fields.stream().map(HoodieSchemaField::name).collect(Collectors.toList());
     } else {
       projectedColNames = projectedCols;
     }
@@ -151,7 +151,7 @@ public class HoodieMergeOnReadTestUtils {
         .map(f -> String.valueOf(f.pos())).collect(Collectors.joining(","));
     String hiveColumnNames = fields.stream()
         .filter(field -> !field.name().equalsIgnoreCase("datestr"))
-        .map(Field::name).collect(Collectors.joining(","));
+        .map(HoodieSchemaField::name).collect(Collectors.joining(","));
     hiveColumnNames = hiveColumnNames + ",datestr";
 
     StorageConfiguration<?> conf = HoodieTestUtils.getDefaultStorageConf();
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java 
b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
index c4ea3709c115..b5303f2f1bf1 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
@@ -398,10 +398,6 @@ public class HoodieAvroUtils {
     return AvroSchemaUtils.createNewSchemaFromFieldsWithReference(schema, 
filteredFields);
   }
 
-  public static String addMetadataColumnTypes(String hiveColumnTypes) {
-    return "string,string,string,string,string," + hiveColumnTypes;
-  }
-
   public static Schema makeFieldNonNull(Schema schema, String fieldName, 
Object fieldDefaultValue) {
     ValidationUtils.checkArgument(fieldDefaultValue != null);
     List<Schema.Field> filteredFields = schema.getFields()
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java 
b/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java
index 2da62bb2924b..c01f04f63ff5 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java
@@ -779,6 +779,10 @@ public class HoodieSchema implements Serializable {
         .anyMatch(schema -> schema.getType() == Schema.Type.NULL);
   }
 
+  public boolean isSchemaNull() {
+    return type == null || type == HoodieSchemaType.NULL;
+  }
+
   /**
    * If this is a union schema, returns the non-null type. Otherwise, returns 
this schema.
    *
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaComparatorForSchemaEvolution.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaComparatorForSchemaEvolution.java
new file mode 100644
index 000000000000..2b0b653bb24a
--- /dev/null
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaComparatorForSchemaEvolution.java
@@ -0,0 +1,375 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.schema;
+
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Defines equality comparison rules for HoodieSchema schemas for schema 
evolution purposes.
+ *
+ * <p>This class provides schema comparison logic that focuses only on 
attributes that affect
+ * data readers/writers, ignoring metadata like documentation, namespace, and 
aliases which
+ * don't impact schema evolution compatibility.</p>
+ *
+ * <h2>Common Rules Across All Types</h2>
+ * Included in equality check:
+ * <ul>
+ *   <li>Name/identifier (enum and fixed; NOTE(review): record names are currently not compared — confirm intent)</li>
+ *   <li>Type including primitive type, complex type (see below), and logical 
type</li>
+ * </ul>
+ * Excluded from equality check:
+ * <ul>
+ *   <li>Namespace</li>
+ *   <li>Documentation</li>
+ *   <li>Aliases</li>
+ *   <li>Custom properties</li>
+ * </ul>
+ *
+ * <h2>Type-Specific Rules</h2>
+ *
+ * <h3>Record</h3>
+ * Included:
+ * <ul>
+ *   <li>Field names</li>
+ *   <li>Field types</li>
+ *   <li>Field order attribute</li>
+ *   <li>Default values</li>
+ * </ul>
+ * Excluded:
+ * <ul>
+ *   <li>Field documentation</li>
+ *   <li>Field aliases</li>
+ * </ul>
+ *
+ * <h3>Enum</h3>
+ * Included:
+ * <ul>
+ *   <li>Name</li>
+ *   <li>Symbol order</li>
+ *   <li>Symbol value</li>
+ * </ul>
+ * Excluded:
+ * <ul>
+ *   <li>Custom properties</li>
+ * </ul>
+ *
+ * <h3>Array</h3>
+ * Included:
+ * <ul>
+ *   <li>Items schema</li>
+ * </ul>
+ * Excluded:
+ * <ul>
+ *   <li>Documentation</li>
+ *   <li>Custom properties</li>
+ * </ul>
+ *
+ * <h3>Map</h3>
+ * Included:
+ * <ul>
+ *   <li>Values schema</li>
+ * </ul>
+ * Excluded:
+ * <ul>
+ *   <li>Documentation</li>
+ *   <li>Custom properties</li>
+ * </ul>
+ *
+ * <h3>Fixed</h3>
+ * Included:
+ * <ul>
+ *   <li>Size</li>
+ *   <li>Name</li>
+ * </ul>
+ * Excluded:
+ * <ul>
+ *   <li>Namespace</li>
+ *   <li>Aliases</li>
+ * </ul>
+ *
+ * <h3>Union</h3>
+ * Included:
+ * <ul>
+ *   <li>Member types</li>
+ * </ul>
+ * Excluded:
+ * <ul>
+ *   <li>Member order</li>
+ * </ul>
+ *
+ * <h3>Logical Types</h3>
+ * Included:
+ * <ul>
+ *   <li>Logical type name (via schema subclass)</li>
+ *   <li>Underlying primitive type</li>
+ *   <li>Decimal precision/scale (if applicable)</li>
+ *   <li>Timestamp/Time precision (if applicable)</li>
+ * </ul>
+ * Excluded:
+ * <ul>
+ *   <li>Documentation</li>
+ *   <li>Custom properties</li>
+ * </ul>
+ */
+public class HoodieSchemaComparatorForSchemaEvolution {
+
+  protected HoodieSchemaComparatorForSchemaEvolution() {
+  }
+
+  private static final HoodieSchemaComparatorForSchemaEvolution VALIDATOR = 
new HoodieSchemaComparatorForSchemaEvolution();
+
+  public static boolean schemaEquals(HoodieSchema s1, HoodieSchema s2) {
+    return VALIDATOR.schemaEqualsInternal(s1, s2);
+  }
+
+  protected boolean schemaEqualsInternal(HoodieSchema s1, HoodieSchema s2) {
+    if (s1 == s2) {
+      return true;
+    }
+    if (s1 == null || s2 == null) {
+      return false;
+    }
+    if (s1.getType() != s2.getType()) {
+      return false;
+    }
+
+    switch (s1.getType()) {
+      case RECORD:
+        return recordSchemaEquals(s1, s2);
+      case ENUM:
+        return enumSchemaEquals(s1, s2);
+      case ARRAY:
+        return arraySchemaEquals(s1, s2);
+      case MAP:
+        return mapSchemaEquals(s1, s2);
+      case FIXED:
+        return fixedSchemaEquals(s1, s2);
+      case UNION:
+        return unionSchemaEquals(s1, s2);
+      case STRING:
+      case BYTES:
+      case INT:
+      case LONG:
+      case FLOAT:
+      case DOUBLE:
+      case BOOLEAN:
+      case NULL:
+      case DECIMAL:
+      case TIME:
+      case TIMESTAMP:
+      case DATE:
+      case UUID:
+        return primitiveSchemaEquals(s1, s2);
+      default:
+        throw new IllegalArgumentException("Unknown schema type: " + 
s1.getType());
+    }
+  }
+
+  protected boolean validateRecord(HoodieSchema s1, HoodieSchema s2) {
+    if (s1.isError() != s2.isError()) {
+      return false;
+    }
+
+    return logicalTypeSchemaEquals(s1, s2);
+  }
+
+  private boolean recordSchemaEquals(HoodieSchema s1, HoodieSchema s2) {
+    if (!validateRecord(s1, s2)) {
+      return false;
+    }
+
+    List<HoodieSchemaField> fields1 = s1.getFields();
+    List<HoodieSchemaField> fields2 = s2.getFields();
+
+    if (fields1.size() != fields2.size()) {
+      return false;
+    }
+
+    for (int i = 0; i < fields1.size(); i++) {
+      if (!fieldEquals(fields1.get(i), fields2.get(i))) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  protected boolean validateField(HoodieSchemaField f1, HoodieSchemaField f2) {
+    if (!f1.name().equals(f2.name())) {
+      return false;
+    }
+
+    if (f1.order() != f2.order()) {
+      return false;
+    }
+
+    // Check if both have default values
+    if (f1.hasDefaultValue() != f2.hasDefaultValue()) {
+      return false;
+    }
+
+    // If both have default values, they must be equal
+    if (f1.hasDefaultValue() && 
!f1.defaultVal().get().equals(f2.defaultVal().get())) {
+      return false;
+    }
+
+    return true;
+  }
+
+  private boolean fieldEquals(HoodieSchemaField f1, HoodieSchemaField f2) {
+    if (!validateField(f1, f2)) {
+      return false;
+    }
+
+    return schemaEqualsInternal(f1.schema(), f2.schema());
+  }
+
+  protected boolean enumSchemaEquals(HoodieSchema s1, HoodieSchema s2) {
+    // Check name equality first
+    if (!s1.getName().equals(s2.getName())) {
+      return false;
+    }
+
+    List<String> symbols1 = s1.getEnumSymbols();
+    List<String> symbols2 = s2.getEnumSymbols();
+
+    // Quick size check before the element-wise, order-sensitive comparison
+    if (symbols1.size() != symbols2.size()) {
+      return false;
+    }
+
+    return symbols1.equals(symbols2);
+  }
+
+  protected boolean unionSchemaEquals(HoodieSchema s1, HoodieSchema s2) {
+    List<HoodieSchema> types1 = s1.getTypes();
+    List<HoodieSchema> types2 = s2.getTypes();
+
+    if (types1.size() != types2.size()) {
+      return false;
+    }
+
+    // Create sets of effectively equal types
+    Set<SchemaWrapper> set1 = 
types1.stream().map(SchemaWrapper::new).collect(Collectors.toSet());
+    Set<SchemaWrapper> set2 = 
types2.stream().map(SchemaWrapper::new).collect(Collectors.toSet());
+
+    // Compare sets instead of ordered lists
+    return set1.equals(set2);
+  }
+
+  private boolean arraySchemaEquals(HoodieSchema s1, HoodieSchema s2) {
+    return schemaEqualsInternal(s1.getElementType(), s2.getElementType());
+  }
+
+  private boolean mapSchemaEquals(HoodieSchema s1, HoodieSchema s2) {
+    return schemaEqualsInternal(s1.getValueType(), s2.getValueType());
+  }
+
+  protected boolean validateFixed(HoodieSchema s1, HoodieSchema s2) {
+    return s1.getName().equals(s2.getName()) && s1.getFixedSize() == 
s2.getFixedSize();
+  }
+
+  private boolean fixedSchemaEquals(HoodieSchema s1, HoodieSchema s2) {
+    if (!validateFixed(s1, s2)) {
+      return false;
+    }
+    return logicalTypeSchemaEquals(s1, s2);
+  }
+
+  private static boolean primitiveSchemaEquals(HoodieSchema s1, HoodieSchema 
s2) {
+    // For primitive types, just check logical type
+    return logicalTypeSchemaEquals(s1, s2);
+  }
+
+  private static boolean logicalTypeSchemaEquals(HoodieSchema s1, HoodieSchema 
s2) {
+    // Check if both schemas are of the same logical type class
+    boolean s1IsDecimal = s1.getType() == HoodieSchemaType.DECIMAL;
+    boolean s2IsDecimal = s2.getType() == HoodieSchemaType.DECIMAL;
+    boolean s1IsTimestamp = s1.getType() == HoodieSchemaType.TIMESTAMP;
+    boolean s2IsTimestamp = s2.getType() == HoodieSchemaType.TIMESTAMP;
+    boolean s1IsTime = s1.getType() == HoodieSchemaType.TIME;
+    boolean s2IsTime = s2.getType() == HoodieSchemaType.TIME;
+
+    // If one is a logical type and the other isn't, they're not equal
+    if (s1IsDecimal != s2IsDecimal || s1IsTimestamp != s2IsTimestamp || 
s1IsTime != s2IsTime) {
+      return false;
+    }
+
+    // If both are decimals, compare precision, scale, and underlying type 
(FIXED vs BYTES)
+    if (s1IsDecimal) {
+      HoodieSchema.Decimal d1 = (HoodieSchema.Decimal) s1;
+      HoodieSchema.Decimal d2 = (HoodieSchema.Decimal) s2;
+      // Check if both use same underlying representation (FIXED vs BYTES)
+      if (d1.isFixed() != d2.isFixed()) {
+        return false;
+      }
+      return d1.getPrecision() == d2.getPrecision() && d1.getScale() == 
d2.getScale();
+    }
+
+    // If both are timestamps, compare precision and UTC adjustment
+    if (s1IsTimestamp) {
+      HoodieSchema.Timestamp t1 = (HoodieSchema.Timestamp) s1;
+      HoodieSchema.Timestamp t2 = (HoodieSchema.Timestamp) s2;
+      return t1.getPrecision() == t2.getPrecision() && t1.isUtcAdjusted() == 
t2.isUtcAdjusted();
+    }
+
+    // If both are time types, compare precision
+    // Note: time-millis is INT, time-micros is LONG, so they have different 
underlying types
+    // which is reflected in their precision values
+    if (s1IsTime) {
+      HoodieSchema.Time t1 = (HoodieSchema.Time) s1;
+      HoodieSchema.Time t2 = (HoodieSchema.Time) s2;
+      return t1.getPrecision() == t2.getPrecision();
+    }
+
+    // For non-logical types, they're equal
+    return true;
+  }
+
+  /**
+   * Wrapper class to use HoodieSchema in HashSet with our custom equality
+   */
+  static class SchemaWrapper {
+    private final HoodieSchema schema;
+
+    public SchemaWrapper(HoodieSchema schema) {
+      this.schema = schema;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      SchemaWrapper that = (SchemaWrapper) o;
+      return schemaEquals(schema, that.schema);
+    }
+
+    @Override
+    public int hashCode() {
+      // Hash by type only: schemas equal under schemaEquals() always share a type,
+      // so the equals/hashCode contract holds; equals() resolves any collisions
+      return schema.getType().hashCode();
+    }
+  }
+}
\ No newline at end of file
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaUtils.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaUtils.java
index 4a2b458044d9..a994bb5761bd 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaUtils.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaUtils.java
@@ -598,4 +598,8 @@ public final class HoodieSchemaUtils {
   public static HoodieSchema projectSchema(HoodieSchema fileSchema, 
List<String> fields) {
     return 
HoodieSchema.fromAvroSchema(HoodieAvroUtils.projectSchema(fileSchema.toAvroSchema(),
 fields));
   }
+
+  public static String addMetadataColumnTypes(String hiveColumnTypes) {
+    return "string,string,string,string,string," + hiveColumnTypes;
+  }
 }
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/schema/TestHoodieSchemaComparatorForSchemaEvolution.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/schema/TestHoodieSchemaComparatorForSchemaEvolution.java
new file mode 100644
index 000000000000..c044ae9bce87
--- /dev/null
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/schema/TestHoodieSchemaComparatorForSchemaEvolution.java
@@ -0,0 +1,505 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.schema;
+
+import org.apache.hudi.io.util.FileIOUtils;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class TestHoodieSchemaComparatorForSchemaEvolution {
+  @Test
+  void testAttrsIrrelevantToEquality() throws IOException {
+    // Validates that schemas with different non-essential attributes (like 
doc strings or aliases)
+    // are still considered equivalent for schema evolution purposes
+    String schemaA = 
FileIOUtils.readAsUTFString(TestHoodieSchemaComparatorForSchemaEvolution.class.getResourceAsStream("/avro-schema-evo/schema-allshapes-A.txt"));
+    String schemaB = 
FileIOUtils.readAsUTFString(TestHoodieSchemaComparatorForSchemaEvolution.class.getResourceAsStream("/avro-schema-evo/schema-allshapes-B.txt"));
+
+    HoodieSchema schema1 = HoodieSchema.parse(schemaA);
+    HoodieSchema schema2 = HoodieSchema.parse(schemaB);
+    assertNotEquals(schema1, schema2);
+    assertTrue(HoodieSchemaComparatorForSchemaEvolution.schemaEquals(schema1, 
schema2));
+    assertEquals(new 
HoodieSchemaComparatorForSchemaEvolution.SchemaWrapper(schema1),
+        new HoodieSchemaComparatorForSchemaEvolution.SchemaWrapper(schema2));
+  }
+
+  @Test
+  void testComparingPrimitiveTypes() {
+    // Tests comparison of all primitive types against each other
+    // Validates that each primitive type is equal only to other schemas 
sharing the same
+    // primitive type.
+    HoodieSchemaType[] primitiveTypes = {
+        HoodieSchemaType.NULL, HoodieSchemaType.BOOLEAN, HoodieSchemaType.INT,
+        HoodieSchemaType.LONG, HoodieSchemaType.FLOAT, HoodieSchemaType.DOUBLE,
+        HoodieSchemaType.BYTES, HoodieSchemaType.STRING
+    };
+
+    for (HoodieSchemaType primitiveType : primitiveTypes) {
+      for (HoodieSchemaType type : primitiveTypes) {
+        if (primitiveType == type) {
+          assertTrue(HoodieSchemaComparatorForSchemaEvolution.schemaEquals(
+              HoodieSchema.create(primitiveType),
+              HoodieSchema.create(type)
+          ));
+        } else {
+          assertFalse(HoodieSchemaComparatorForSchemaEvolution.schemaEquals(
+              HoodieSchema.create(primitiveType),
+              HoodieSchema.create(type)
+          ), String.format("Types %s and %s should not be equal",
+              primitiveType, type));
+        }
+      }
+    }
+  }
+
+  @Test
+  void testEqualToSelf() {
+    // Validates that a schema is equal to itself
+    // Basic sanity check for schema comparison
+    String schema = "{\"type\":\"record\",\"name\":\"R\",\"fields\":["
+        + "{\"name\":\"field1\",\"type\":\"string\"}]}";
+    assertTrue(HoodieSchemaComparatorForSchemaEvolution.schemaEquals(
+        HoodieSchema.parse(schema),
+        HoodieSchema.parse(schema)
+    ));
+  }
+
+  @Test
+  void testIsErrorFieldInRecordSchema() {
+    // Tests that a record schema is not equal to an error schema
+    // even if they have the same structure
+    HoodieSchema record1 = HoodieSchema.createRecord("TestRecord", null, null, 
false,
+        Arrays.asList(
+            HoodieSchemaField.of("field1", 
HoodieSchema.create(HoodieSchemaType.STRING), null, null)
+        ));
+
+    HoodieSchema record2 = HoodieSchema.createRecord("TestRecord", null, null, 
true, // error record
+        Arrays.asList(
+            HoodieSchemaField.of("field1", 
HoodieSchema.create(HoodieSchemaType.STRING), null, null)
+        ));
+
+    assertFalse(HoodieSchemaComparatorForSchemaEvolution.schemaEquals(record1, 
record2));
+  }
+
+  @Test
+  void testRecordFieldTypes() {
+    // Validates that records with fields of different types are not 
considered equal
+    // even if the field names are the same
+    String schema1 = "{\"type\":\"record\",\"name\":\"R\",\"fields\":["
+        + "{\"name\":\"field1\",\"type\":\"string\"}]}";
+    String schema2 = "{\"type\":\"record\",\"name\":\"R\",\"fields\":["
+        + "{\"name\":\"field1\",\"type\":\"int\"}]}";
+    assertFalse(HoodieSchemaComparatorForSchemaEvolution.schemaEquals(
+        HoodieSchema.parse(schema1),
+        HoodieSchema.parse(schema2)
+    ));
+  }
+
+  @Test
+  void testRecordFieldOrderAttribute() {
+    // Tests that records with different field order attributes are not equal
+    // The order attribute governs binary record comparison (sort order), so it must match
+    String schema1 = "{\"type\":\"record\",\"name\":\"R\",\"fields\":["
+        + 
"{\"name\":\"field1\",\"type\":\"string\",\"order\":\"ascending\"}]}";
+    String schema2 = "{\"type\":\"record\",\"name\":\"R\",\"fields\":["
+        + 
"{\"name\":\"field1\",\"type\":\"string\",\"order\":\"descending\"}]}";
+    assertFalse(HoodieSchemaComparatorForSchemaEvolution.schemaEquals(
+        HoodieSchema.parse(schema1),
+        HoodieSchema.parse(schema2)
+    ));
+  }
+
+  @Test
+  void testArraySchema() {
+    // Validates that array schemas with different item types are not equal
+    // even if the array structure is the same
+    String schema1 = "{\"type\":\"array\",\"items\":\"string\"}";
+    String schema2 = "{\"type\":\"array\",\"items\":\"int\"}";
+    assertFalse(HoodieSchemaComparatorForSchemaEvolution.schemaEquals(
+        HoodieSchema.parse(schema1),
+        HoodieSchema.parse(schema2)
+    ));
+  }
+
+  @Test
+  void testMapSchema() {
+    // Tests that map schemas with different value types are not equal
+    // even if the map structure is the same
+    String schema1 = "{\"type\":\"map\",\"values\":\"string\"}";
+    String schema2 = "{\"type\":\"map\",\"values\":\"int\"}";
+    assertFalse(HoodieSchemaComparatorForSchemaEvolution.schemaEquals(
+        HoodieSchema.parse(schema1),
+        HoodieSchema.parse(schema2)
+    ));
+  }
+
+  @Test
+  void testFixedSchemaSizeAttr() {
+    // Validates that fixed-type schemas with different sizes are not equal
+    // Size is a critical attribute for fixed-length fields
+    String schema1 = "{\"type\":\"fixed\",\"name\":\"F\",\"size\":16}";
+    String schema2 = "{\"type\":\"fixed\",\"name\":\"F\",\"size\":32}";
+    assertFalse(HoodieSchemaComparatorForSchemaEvolution.schemaEquals(
+        HoodieSchema.parse(schema1),
+        HoodieSchema.parse(schema2)
+    ));
+  }
+
+  @Test
+  void testUnionMemberTypes() {
+    // Tests that unions with different member types are not equal
+    // even if they have the same number of members
+    String schema1 = "[\"null\",\"string\"]";
+    String schema2 = "[\"null\",\"int\"]";
+    assertFalse(HoodieSchemaComparatorForSchemaEvolution.schemaEquals(
+        HoodieSchema.parse(schema1),
+        HoodieSchema.parse(schema2)
+    ));
+  }
+
+  @Test
+  void testUnionMemberOrdering() {
+    // Validates that the order of union members doesn't affect equality
+    String schema1 = "[\"null\",\"string\"]";
+    String schema2 = "[\"string\",\"null\"]";
+    assertTrue(HoodieSchemaComparatorForSchemaEvolution.schemaEquals(
+        HoodieSchema.parse(schema1),
+        HoodieSchema.parse(schema2)
+    ));
+  }
+
+  @Test
+  void testLogicalTypeDecimalAttr() {
+    // Tests that decimal logical types with different precision and scale are 
not equal
+    String schema1 = "{\"type\":\"fixed\",\"name\":\"D\",\"size\":16,"
+        + "\"logicalType\":\"decimal\",\"precision\":10,\"scale\":2}";
+    String schema2 = "{\"type\":\"fixed\",\"name\":\"D\",\"size\":16,"
+        + "\"logicalType\":\"decimal\",\"precision\":8,\"scale\":2}";
+    String schema3 = "{\"type\":\"fixed\",\"name\":\"D\",\"size\":16,"
+        + "\"logicalType\":\"decimal\",\"precision\":8,\"scale\":3}";
+    assertFalse(HoodieSchemaComparatorForSchemaEvolution.schemaEquals(
+        HoodieSchema.parse(schema1),
+        HoodieSchema.parse(schema2)
+    ));
+
+    assertFalse(HoodieSchemaComparatorForSchemaEvolution.schemaEquals(
+        HoodieSchema.parse(schema2),
+        HoodieSchema.parse(schema3)
+    ));
+  }
+
+  @Test
+  void testLogicalType() {
+    // Validates that different logical types on the same underlying type are 
not equal
+    String schema1 = "{\"type\":\"int\",\"logicalType\":\"date\"}";
+    String schema2 = "{\"type\":\"int\",\"logicalType\":\"time-millis\"}";
+    assertFalse(HoodieSchemaComparatorForSchemaEvolution.schemaEquals(
+        HoodieSchema.parse(schema1),
+        HoodieSchema.parse(schema2)
+    ));
+  }
+
+  @Test
+  void testLogicalTypesWithDifferentPrimitiveTypes() {
+    // Tests that logical types with different underlying types are not equal
+    // even if they represent the same logical concept (decimal)
+    String decimalFixed = 
"{\"type\":\"fixed\",\"name\":\"D\",\"size\":16,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":2}";
+    String decimalBytes = 
"{\"type\":\"bytes\",\"logicalType\":\"decimal\",\"precision\":10,\"scale\":2}";
+    assertFalse(HoodieSchemaComparatorForSchemaEvolution.schemaEquals(
+        HoodieSchema.parse(decimalFixed),
+        HoodieSchema.parse(decimalBytes)
+    ));
+  }
+
+  @Test
+  void testComparingSchemaFieldNames() {
+    // Validates that schemas with different names are not equal
+    // even if their structure is identical - tests for records, enums, and 
fixed types
+    String record1 = 
"{\"type\":\"record\",\"name\":\"R1\",\"fields\":[{\"name\":\"f1\",\"type\":\"string\"}]}";
+    String record2 = 
"{\"type\":\"record\",\"name\":\"R2\",\"fields\":[{\"name\":\"f2\",\"type\":\"string\"}]}";
+    assertFalse(HoodieSchemaComparatorForSchemaEvolution.schemaEquals(
+        HoodieSchema.parse(record1),
+        HoodieSchema.parse(record2)
+    ));
+
+    // Enum
+    String enum1 = "{\"type\":\"enum\",\"name\":\"E1\",\"symbols\":[\"A\"]}";
+    String enum2 = "{\"type\":\"enum\",\"name\":\"E2\",\"symbols\":[\"A\"]}";
+    assertFalse(HoodieSchemaComparatorForSchemaEvolution.schemaEquals(
+        HoodieSchema.parse(enum1),
+        HoodieSchema.parse(enum2)
+    ));
+
+    // Fixed
+    String fixed1 = "{\"type\":\"fixed\",\"name\":\"F1\",\"size\":16}";
+    String fixed2 = "{\"type\":\"fixed\",\"name\":\"F2\",\"size\":16}";
+    assertFalse(HoodieSchemaComparatorForSchemaEvolution.schemaEquals(
+        HoodieSchema.parse(fixed1),
+        HoodieSchema.parse(fixed2)
+    ));
+  }
+
+  @Test
+  void testEnumSchemaName() {
+    // Tests that enum schemas with different names are not equal
+    // even if they have the same symbols
+    HoodieSchema schema1 = HoodieSchema.createEnum("enum1", null, null, 
Arrays.asList("A", "B", "C"));
+    HoodieSchema schema2 = HoodieSchema.createEnum("enum2", null, null, 
Arrays.asList("A", "B", "C"));
+    HoodieSchema schema3 = HoodieSchema.createEnum("enum1", null, null, 
Arrays.asList("A", "B", "C"));
+
+    assertFalse(HoodieSchemaComparatorForSchemaEvolution.schemaEquals(schema1, 
schema2));
+    assertTrue(HoodieSchemaComparatorForSchemaEvolution.schemaEquals(schema1, 
schema3));
+  }
+
+  @Test
+  void testEnumSchema() {
+    // Validates that enum schemas with different symbol sets are not equal
+    // even if one is a subset of the other
+    HoodieSchema schema1 = HoodieSchema.createEnum("enum", null, null, 
Arrays.asList("A", "C"));
+    HoodieSchema schema2 = HoodieSchema.createEnum("enum", null, null, 
Arrays.asList("A", "B", "C"));
+
+    assertFalse(HoodieSchemaComparatorForSchemaEvolution.schemaEquals(schema1, 
schema2));
+  }
+
+  @Test
+  void testEnumSymbolsOrder() {
+    // Tests that enum schemas with different symbol orders are not equal
+    // Order matters for enum serialization
+    String schema1 = 
"{\"type\":\"enum\",\"name\":\"E\",\"symbols\":[\"A\",\"B\"]}";
+    String schema2 = 
"{\"type\":\"enum\",\"name\":\"E\",\"symbols\":[\"B\",\"A\"]}";
+    assertFalse(HoodieSchemaComparatorForSchemaEvolution.schemaEquals(
+        HoodieSchema.parse(schema1),
+        HoodieSchema.parse(schema2)
+    ));
+  }
+
+  @Test
+  void testDefaultValueEquality() {
+    // Tests comparison of schemas with different default values
+    HoodieSchemaField field1 = HoodieSchemaField.of("field", 
HoodieSchema.create(HoodieSchemaType.STRING), null, "default1");
+    HoodieSchemaField field2 = HoodieSchemaField.of("field", 
HoodieSchema.create(HoodieSchemaType.STRING), null, "default2");
+    HoodieSchemaField field3 = HoodieSchemaField.of("field", 
HoodieSchema.create(HoodieSchemaType.STRING), null, "default1");
+    HoodieSchemaField fieldNoDefault = HoodieSchemaField.of("field", 
HoodieSchema.create(HoodieSchemaType.STRING), null, null);
+
+    HoodieSchema record1 = HoodieSchema.createRecord("test", null, null, 
false, Collections.singletonList(field1));
+    HoodieSchema record2 = HoodieSchema.createRecord("test", null, null, 
false, Collections.singletonList(field2));
+    HoodieSchema record3 = HoodieSchema.createRecord("test", null, null, 
false, Collections.singletonList(field3));
+    HoodieSchema recordNoDefault = HoodieSchema.createRecord("test", null, 
null, false, Collections.singletonList(fieldNoDefault));
+
+    // Different default values should not be equal
+    assertFalse(HoodieSchemaComparatorForSchemaEvolution.schemaEquals(record1, 
record2));
+
+    // Same default values should be equal
+    assertTrue(HoodieSchemaComparatorForSchemaEvolution.schemaEquals(record1, 
record3));
+
+    // No default value vs default value should not be equal
+    assertFalse(HoodieSchemaComparatorForSchemaEvolution.schemaEquals(record1, 
recordNoDefault));
+
+    // No default values should be equal to each other
+    
assertTrue(HoodieSchemaComparatorForSchemaEvolution.schemaEquals(recordNoDefault,
 recordNoDefault));
+  }
+
+  @Test
+  void testComplexDefaultValueEquality() {
+    // Tests equality comparison of schemas with complex default values 
(nested records)
+    // Validates that default value comparison works correctly for nested 
structures
+    HoodieSchema innerSchema = HoodieSchema.createRecord("inner", null, null, 
false,
+        Collections.singletonList(
+            HoodieSchemaField.of("value", 
HoodieSchema.create(HoodieSchemaType.STRING), null, null)
+        ));
+
+    // Create default values as JSON-compatible Maps
+    Map<String, Object> defaultValue1 = new HashMap<>();
+    defaultValue1.put("value", "test");
+
+    Map<String, Object> defaultValue2 = new HashMap<>();
+    defaultValue2.put("value", "test");
+
+    Map<String, Object> defaultValue3 = new HashMap<>();
+    defaultValue3.put("value", "different");
+
+    HoodieSchemaField field1 = HoodieSchemaField.of("field", innerSchema, 
null, defaultValue1);
+    HoodieSchemaField field2 = HoodieSchemaField.of("field", innerSchema, 
null, defaultValue2);
+    HoodieSchemaField field3 = HoodieSchemaField.of("field", innerSchema, 
null, defaultValue3);
+
+    HoodieSchema record1 = HoodieSchema.createRecord("test", null, null, 
false, Collections.singletonList(field1));
+    HoodieSchema record2 = HoodieSchema.createRecord("test", null, null, 
false, Collections.singletonList(field2));
+    HoodieSchema record3 = HoodieSchema.createRecord("test", null, null, 
false, Collections.singletonList(field3));
+
+    // Same complex default values should be equal
+    assertTrue(HoodieSchemaComparatorForSchemaEvolution.schemaEquals(record1, 
record2));
+
+    // Different complex default values should not be equal
+    assertFalse(HoodieSchemaComparatorForSchemaEvolution.schemaEquals(record1, 
record3));
+  }
+
+  @Test
+  void testArrayDefaultValueEquality() {
+    // Tests equality comparison of schemas with array default values
+    // Validates that default value comparison works correctly for array types
+    List<String> defaultArray1 = Arrays.asList("a", "b", "c");
+    List<String> defaultArray2 = Arrays.asList("a", "b", "c");
+    List<String> defaultArray3 = Arrays.asList("x", "y", "z");
+
+    HoodieSchema arraySchema = 
HoodieSchema.createArray(HoodieSchema.create(HoodieSchemaType.STRING));
+
+    HoodieSchemaField field1 = HoodieSchemaField.of("field", arraySchema, 
null, defaultArray1);
+    HoodieSchemaField field2 = HoodieSchemaField.of("field", arraySchema, 
null, defaultArray2);
+    HoodieSchemaField field3 = HoodieSchemaField.of("field", arraySchema, 
null, defaultArray3);
+
+    HoodieSchema record1 = HoodieSchema.createRecord("test", null, null, 
false, Collections.singletonList(field1));
+    HoodieSchema record2 = HoodieSchema.createRecord("test", null, null, 
false, Collections.singletonList(field2));
+    HoodieSchema record3 = HoodieSchema.createRecord("test", null, null, 
false, Collections.singletonList(field3));
+
+    // Same array default values should be equal
+    assertTrue(HoodieSchemaComparatorForSchemaEvolution.schemaEquals(record1, 
record2));
+
+    // Different array default values should not be equal
+    assertFalse(HoodieSchemaComparatorForSchemaEvolution.schemaEquals(record1, 
record3));
+  }
+
+  @Test
+  void testCompareWithNull() {
+    // Tests schema comparison behavior when one or both schemas are null
+    // Validates proper handling of null cases in the comparator
+    HoodieSchema schema = HoodieSchema.create(HoodieSchemaType.STRING);
+    assertFalse(HoodieSchemaComparatorForSchemaEvolution.schemaEquals(schema, 
null));
+    assertFalse(HoodieSchemaComparatorForSchemaEvolution.schemaEquals(null, 
schema));
+    assertTrue(HoodieSchemaComparatorForSchemaEvolution.schemaEquals(null, 
null));
+  }
+
+  @Test
+  void testRecordFieldCountMismatch() {
+    // Tests that records with different number of fields are not equal
+    // even if all common fields match
+    HoodieSchema record1 = HoodieSchema.createRecord("TestRecord", null, null, 
false,
+        Collections.singletonList(
+            HoodieSchemaField.of("field1", 
HoodieSchema.create(HoodieSchemaType.STRING), null, null)
+        ));
+
+    HoodieSchema record2 = HoodieSchema.createRecord("TestRecord", null, null, 
false,
+        Arrays.asList(
+            HoodieSchemaField.of("field1", 
HoodieSchema.create(HoodieSchemaType.STRING), null, null),
+            HoodieSchemaField.of("field2", 
HoodieSchema.create(HoodieSchemaType.STRING), null, null)
+        ));
+
+    assertFalse(HoodieSchemaComparatorForSchemaEvolution.schemaEquals(record1, 
record2));
+  }
+
+  @Test
+  void testUnionSizeMismatch() {
+    // Tests that unions with different number of types are not equal
+    // even if all common types match
+    HoodieSchema union1 = HoodieSchema.createUnion(Arrays.asList(
+        HoodieSchema.create(HoodieSchemaType.NULL),
+        HoodieSchema.create(HoodieSchemaType.STRING)
+    ));
+
+    HoodieSchema union2 = HoodieSchema.createUnion(Arrays.asList(
+        HoodieSchema.create(HoodieSchemaType.NULL),
+        HoodieSchema.create(HoodieSchemaType.STRING),
+        HoodieSchema.create(HoodieSchemaType.INT)
+    ));
+
+    assertFalse(HoodieSchemaComparatorForSchemaEvolution.schemaEquals(union1, 
union2));
+  }
+
+  @Test
+  void testUnionOrder() {
+    // Tests that the order of types in a union doesn't affect equality
+    // Important for schema evolution as union member order shouldn't matter
+    HoodieSchema union1 = HoodieSchema.createUnion(Arrays.asList(
+        HoodieSchema.create(HoodieSchemaType.NULL),
+        HoodieSchema.create(HoodieSchemaType.STRING)
+    ));
+
+    HoodieSchema union2 = HoodieSchema.createUnion(Arrays.asList(
+        HoodieSchema.create(HoodieSchemaType.STRING),
+        HoodieSchema.create(HoodieSchemaType.NULL)
+    ));
+
+    assertTrue(HoodieSchemaComparatorForSchemaEvolution.schemaEquals(union1, 
union2));
+  }
+
+  @Test
+  void testLogicalTypeOneNull() {
+    // Tests comparison of schemas where one has a logical type and the other 
doesn't
+    // Validates that logical type presence affects equality
+    String schema1 = "{\"type\":\"int\",\"logicalType\":\"date\"}";
+    String schema2 = "{\"type\":\"int\"}";
+
+    assertFalse(HoodieSchemaComparatorForSchemaEvolution.schemaEquals(
+        HoodieSchema.parse(schema1),
+        HoodieSchema.parse(schema2)
+    ));
+
+    // Swapping the two schema positions should have no effect.
+    assertFalse(HoodieSchemaComparatorForSchemaEvolution.schemaEquals(
+        HoodieSchema.parse(schema2),
+        HoodieSchema.parse(schema1)
+    ));
+  }
+
+  @Test
+  void testSchemaWrapperNullAndTypeMismatch() {
+    // Tests SchemaWrapper's null handling and type comparison behavior
+    // Validates proper handling of edge cases in the wrapper class
+    HoodieSchema schema = HoodieSchema.create(HoodieSchemaType.STRING);
+    HoodieSchemaComparatorForSchemaEvolution.SchemaWrapper wrapper = new 
HoodieSchemaComparatorForSchemaEvolution.SchemaWrapper(schema);
+
+    assertNotNull(wrapper);
+    assertNotEquals(wrapper, new Object());
+  }
+
+  @Test
+  void testTimestampLogicalTypeEquality() {
+    // Tests that timestamp logical types with different precisions are not 
equal
+    String timestampMillis = 
"{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}";
+    String timestampMicros = 
"{\"type\":\"long\",\"logicalType\":\"timestamp-micros\"}";
+
+    assertFalse(HoodieSchemaComparatorForSchemaEvolution.schemaEquals(
+        HoodieSchema.parse(timestampMillis),
+        HoodieSchema.parse(timestampMicros)
+    ));
+  }
+
+  @Test
+  void testTimeLogicalTypeEquality() {
+    // Tests that time logical types with different precisions are not equal
+    String timeMillis = "{\"type\":\"int\",\"logicalType\":\"time-millis\"}";
+    String timeMicros = "{\"type\":\"long\",\"logicalType\":\"time-micros\"}";
+
+    assertFalse(HoodieSchemaComparatorForSchemaEvolution.schemaEquals(
+        HoodieSchema.parse(timeMillis),
+        HoodieSchema.parse(timeMicros)
+    ));
+  }
+}
\ No newline at end of file
diff --git 
a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/hive/TestHoodieCombineHiveInputFormat.java
 
b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/hive/TestHoodieCombineHiveInputFormat.java
index e90956d175e1..e571d936462b 100644
--- 
a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/hive/TestHoodieCombineHiveInputFormat.java
+++ 
b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/hive/TestHoodieCombineHiveInputFormat.java
@@ -18,7 +18,6 @@
 
 package org.apache.hudi.hadoop.hive;
 
-import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.WriteOperationType;
@@ -336,7 +335,7 @@ public class TestHoodieCombineHiveInputFormat extends 
HoodieCommonTestHarness {
 
     String hiveColumnNames = 
fields.stream().map(HoodieSchemaField::name).collect(Collectors.joining(","));
     hiveColumnNames = hiveColumnNames + ",year,month,day";
-    String modifiedHiveColumnTypes = 
HoodieAvroUtils.addMetadataColumnTypes(tripsHiveColumnTypes);
+    String modifiedHiveColumnTypes = 
HoodieSchemaUtils.addMetadataColumnTypes(tripsHiveColumnTypes);
     modifiedHiveColumnTypes = modifiedHiveColumnTypes + 
",string,string,string";
     jobConf.set(hive_metastoreConstants.META_TABLE_COLUMNS, hiveColumnNames);
     jobConf.set(hive_metastoreConstants.META_TABLE_COLUMN_TYPES, 
modifiedHiveColumnTypes);
diff --git 
a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java
 
b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java
index a8148715dc87..2f502033f208 100644
--- 
a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java
+++ 
b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java
@@ -468,7 +468,7 @@ public class InputFormatTestUtil {
     String hiveColumnNames = fields.stream().filter(field -> 
!field.name().equalsIgnoreCase("datestr"))
         .map(Schema.Field::name).collect(Collectors.joining(","));
     hiveColumnNames = hiveColumnNames + ",datestr";
-    String modifiedHiveColumnTypes = 
HoodieAvroUtils.addMetadataColumnTypes(hiveColumnTypes);
+    String modifiedHiveColumnTypes = 
HoodieSchemaUtils.addMetadataColumnTypes(hiveColumnTypes);
     modifiedHiveColumnTypes = modifiedHiveColumnTypes + ",string";
     jobConf.set(hive_metastoreConstants.META_TABLE_COLUMNS, hiveColumnNames);
     jobConf.set(hive_metastoreConstants.META_TABLE_COLUMN_TYPES, 
modifiedHiveColumnTypes);
@@ -495,7 +495,7 @@ public class InputFormatTestUtil {
     String hiveColumnNames = fields.stream().filter(field -> 
!field.name().equalsIgnoreCase("datestr"))
         .map(Schema.Field::name).collect(Collectors.joining(","));
     hiveColumnNames = hiveColumnNames + ",datestr";
-    String modifiedHiveColumnTypes = 
HoodieAvroUtils.addMetadataColumnTypes(hiveColumnTypes);
+    String modifiedHiveColumnTypes = 
HoodieSchemaUtils.addMetadataColumnTypes(hiveColumnTypes);
     modifiedHiveColumnTypes = modifiedHiveColumnTypes + ",string";
     jobConf.set(hive_metastoreConstants.META_TABLE_COLUMNS, hiveColumnNames);
     jobConf.set(hive_metastoreConstants.META_TABLE_COLUMN_TYPES, 
modifiedHiveColumnTypes);
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestDataValidationCheckForLogCompactionActions.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestDataValidationCheckForLogCompactionActions.java
index f421d3a2741a..73568ebb48f2 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestDataValidationCheckForLogCompactionActions.java
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestDataValidationCheckForLogCompactionActions.java
@@ -28,6 +28,7 @@ import 
org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
@@ -46,7 +47,6 @@ import org.apache.hudi.testutils.HoodieClientTestBase;
 import org.apache.hudi.testutils.HoodieSparkWriteableTestTable;
 import org.apache.hudi.testutils.MetadataMergeWriteStatus;
 
-import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.spark.api.java.JavaRDD;
@@ -178,7 +178,7 @@ public class TestDataValidationCheckForLogCompactionActions 
extends HoodieClient
     // Verify row count.
     assertEquals(mainRecordsMap.size(), experimentRecordsMap.size());
 
-    Schema readerSchema = new 
Schema.Parser().parse(mainTable.config.getSchema());
+    HoodieSchema readerSchema = 
HoodieSchema.parse(mainTable.config.getSchema());
     List<String> excludeFields = 
CollectionUtils.createImmutableList(COMMIT_TIME_METADATA_FIELD, 
COMMIT_SEQNO_METADATA_FIELD,
         FILENAME_METADATA_FIELD, OPERATION_METADATA_FIELD, 
RECORD_KEY_METADATA_FIELD);
 
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java
index c1d79a9a2ea5..f529ecd50adb 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java
@@ -35,6 +35,7 @@ import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.timeline.HoodieInstant.State;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
@@ -165,7 +166,7 @@ public class TestBootstrap extends 
HoodieSparkClientTestBase {
     rtInputFormat.setConf(rtJobConf);
   }
 
-  public Schema generateNewDataSetAndReturnSchema(long timestamp, int 
numRecords, List<String> partitionPaths,
+  public HoodieSchema generateNewDataSetAndReturnSchema(long timestamp, int 
numRecords, List<String> partitionPaths,
       String srcPath) throws Exception {
     boolean isPartitioned = partitionPaths != null && 
!partitionPaths.isEmpty();
     Dataset<Row> df =
@@ -185,7 +186,7 @@ public class TestBootstrap extends 
HoodieSparkClientTestBase {
     HoodieAvroParquetReader parquetReader =
         new HoodieAvroParquetReader(metaClient.getStorage(), new 
StoragePath(filePath));
     //TODO boundary to revisit in later pr to use HoodieSchema directly
-    return parquetReader.getSchema().toAvroSchema();
+    return parquetReader.getSchema();
   }
 
   @Test
@@ -254,7 +255,7 @@ public class TestBootstrap extends 
HoodieSparkClientTestBase {
     }
     List<String> partitions = partitioned ? Arrays.asList("2020/04/01", 
"2020/04/02", "2020/04/03") : Collections.EMPTY_LIST;
     long timestamp = Instant.now().toEpochMilli();
-    Schema schema = generateNewDataSetAndReturnSchema(timestamp, totalRecords, 
partitions, bootstrapBasePath);
+    HoodieSchema schema = generateNewDataSetAndReturnSchema(timestamp, 
totalRecords, partitions, bootstrapBasePath);
     HoodieWriteConfig config = getConfigBuilder(schema.toString())
         .withSchema(schema.toString())
         .withKeyGenerator(keyGeneratorClass)
@@ -372,13 +373,13 @@ public class TestBootstrap extends 
HoodieSparkClientTestBase {
     testBootstrapCommon(true, true, EffectiveMode.MIXED_BOOTSTRAP_MODE);
   }
 
-  private void checkBootstrapResults(int totalRecords, Schema schema, String 
maxInstant, boolean checkNumRawFiles,
+  private void checkBootstrapResults(int totalRecords, HoodieSchema schema, 
String maxInstant, boolean checkNumRawFiles,
       int expNumInstants, long expTimestamp, long expROTimestamp, boolean 
isDeltaCommit, boolean validateRecordsForCommitTime) throws Exception {
     checkBootstrapResults(totalRecords, schema, maxInstant, checkNumRawFiles, 
expNumInstants, expNumInstants,
         expTimestamp, expROTimestamp, isDeltaCommit, 
Arrays.asList(maxInstant), validateRecordsForCommitTime);
   }
 
-  private void checkBootstrapResults(int totalRecords, Schema schema, String 
instant, boolean checkNumRawFiles,
+  private void checkBootstrapResults(int totalRecords, HoodieSchema schema, 
String instant, boolean checkNumRawFiles,
       int expNumInstants, int numVersions, long expTimestamp, long 
expROTimestamp, boolean isDeltaCommit,
       List<String> instantsWithValidRecords, boolean 
validateRecordsForCommitTime) throws Exception {
     metaClient.reloadActiveTimeline();
@@ -539,19 +540,19 @@ public class TestBootstrap extends 
HoodieSparkClientTestBase {
         throw new HoodieIOException(e.getMessage(), e);
       }
       MessageType parquetSchema = 
reader.getFooter().getFileMetaData().getSchema();
-      Schema schema =  new AvroSchemaConverter().convert(parquetSchema);
+      HoodieSchema schema = HoodieSchema.fromAvroSchema(new 
AvroSchemaConverter().convert(parquetSchema));
       return generateInputBatch(jsc, partitionPaths, schema);
     }
   }
 
   private static JavaRDD<HoodieRecord> generateInputBatch(JavaSparkContext jsc,
-      List<Pair<String, List<HoodieFileStatus>>> partitionPaths, Schema 
writerSchema) {
+      List<Pair<String, List<HoodieFileStatus>>> partitionPaths, HoodieSchema 
writerSchema) {
     List<Pair<String, Path>> fullFilePathsWithPartition = 
partitionPaths.stream().flatMap(p -> p.getValue().stream()
         .map(x -> Pair.of(p.getKey(), 
HadoopFSUtils.toPath(x.getPath())))).collect(Collectors.toList());
     return jsc.parallelize(fullFilePathsWithPartition.stream().flatMap(p -> {
       try {
         Configuration conf = jsc.hadoopConfiguration();
-        AvroReadSupport.setAvroReadSchema(conf, writerSchema);
+        AvroReadSupport.setAvroReadSchema(conf, writerSchema.toAvroSchema());
         Iterator<GenericRecord> recIterator = new ParquetReaderIterator(
             
AvroParquetReader.<GenericRecord>builder(p.getValue()).withConf(conf).build());
         return 
StreamSupport.stream(Spliterators.spliteratorUnknownSize(recIterator, 0), 
false).map(gr -> {


Reply via email to