This is an automated email from the ASF dual-hosted git repository.

vhs pushed a commit to branch phase-18-HoodieAvroUtils-removal-p4
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 483d0a62f7661530d11ebd924b508bc1f2feaa85
Author: voon <[email protected]>
AuthorDate: Sat Dec 27 19:06:51 2025 +0800

    feat(schema): phase 18 - Remove HoodieAvroUtils usage (Part 4)
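
    Part 4 continues the migration off org.apache.avro.Schema: write paths,
    clustering strategies, sort utilities and tests now carry
    org.apache.hudi.common.schema.HoodieSchema end to end. A minimal sketch of
    the recurring before/after pattern, using only APIs that appear in the
    hunks below (variable names are illustrative):

        // Before: Avro-specific parse, cache and metadata-field helpers
        Schema schema = AvroSchemaCache.intern(new Schema.Parser().parse(config.getSchema()));
        schema = HoodieAvroUtils.addMetadataFields(schema);

        // After: HoodieSchema equivalents
        HoodieSchema schema = HoodieSchemaCache.intern(HoodieSchema.parse(config.getSchema()));
        schema = HoodieSchemaUtils.addMetadataFields(schema);

        // Call sites that still require Avro bridge explicitly
        record.shouldIgnore(schema.toAvroSchema(), config.getProps());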
---
 .../hudi/execution/CopyOnWriteInsertHandler.java   |  5 +-
 .../hudi/execution/HoodieLazyInsertIterable.java   | 13 +++--
 .../hudi/execution/ExplicitWriteHandler.java       |  3 +-
 .../hudi/execution/FlinkLazyInsertIterable.java    |  7 ++-
 .../run/strategy/JavaExecutionStrategy.java        | 11 ++--
 .../strategy/JavaSortAndSizeExecutionStrategy.java |  4 +-
 .../hudi/execution/JavaLazyInsertIterable.java     |  7 ++-
 .../JavaCustomColumnsSortPartitioner.java          |  7 ++-
 .../TestJavaBulkInsertInternalPartitioner.java     |  2 +-
 .../MultipleSparkJobExecutionStrategy.java         |  5 +-
 ...SparkJobConsistentHashingExecutionStrategy.java | 13 +++--
 ...onsistentBucketClusteringExecutionStrategy.java |  3 +-
 .../SparkSingleFileSortExecutionStrategy.java      |  5 +-
 .../SparkSortAndSizeExecutionStrategy.java         |  5 +-
 .../client/utils/SparkMetadataWriterUtils.java     |  8 ++-
 .../hudi/execution/SparkLazyInsertIterable.java    | 13 +++--
 .../bulkinsert/RDDBucketIndexPartitioner.java      | 11 ++--
 .../TestBoundedInMemoryExecutorInSpark.java        |  6 +--
 .../hudi/execution/TestBoundedInMemoryQueue.java   | 14 ++---
 .../hudi/execution/TestDisruptorMessageQueue.java  |  6 +--
 .../org/apache/hudi/common/util/SortUtils.java     |  8 ++-
 .../hudi/metadata/HoodieTableMetadataUtil.java     | 11 ++--
 ...tOverwriteNonDefaultsWithLatestAvroPayload.java | 62 +++++++++++-----------
 .../org/apache/hudi/common/util/TestSortUtils.java |  3 +-
 .../common/util/collection/TestBitCaskDiskMap.java |  5 +-
 .../hudi/metadata/TestHoodieTableMetadataUtil.java |  8 +--
 .../realtime/TestHoodieRealtimeRecordReader.java   |  6 +--
 .../utilities/schema/KafkaOffsetPostProcessor.java | 32 ++++++-----
 .../schema/SchemaProviderWithPostProcessor.java    |  2 +
 .../DeleteSupportSchemaPostProcessor.java          | 25 +++++----
 .../DropColumnSchemaPostProcessor.java             | 21 +++++---
 .../add/AddPrimitiveColumnSchemaPostProcessor.java | 38 +++++++------
 32 files changed, 188 insertions(+), 181 deletions(-)
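
The transformer-to-consumer handoff is the through-line of the patch:
HoodieInsertValueGenResult now carries a HoodieSchema, so consumers pass it
to the write handle directly instead of re-wrapping with
HoodieSchema.fromAvroSchema. A short sketch of that flow, assembled from the
hunks below (the surrounding setup is illustrative, not part of the patch):

    HoodieSchema schema = HoodieSchemaCache.intern(HoodieSchema.parse(hoodieConfig.getSchema()));
    HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord> genResult =
        new HoodieLazyInsertIterable.HoodieInsertValueGenResult<>(record, schema);
    // Consumer side: the HoodieSchema travels straight through to the handle.
    handle.write(genResult.getResult(), genResult.schema, config.getProps());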

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/CopyOnWriteInsertHandler.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/CopyOnWriteInsertHandler.java
index d51c259d7251..1a9183053501 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/CopyOnWriteInsertHandler.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/CopyOnWriteInsertHandler.java
@@ -21,7 +21,6 @@ package org.apache.hudi.execution;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.engine.TaskContextSupplier;
 import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.common.util.queue.HoodieConsumer;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.execution.HoodieLazyInsertIterable.HoodieInsertValueGenResult;
@@ -82,7 +81,7 @@ public class CopyOnWriteInsertHandler<T>
     String partitionPath = record.getPartitionPath();
     // just skip the ignored record,do not make partitions on fs
     try {
-      if (record.shouldIgnore(genResult.schema, config.getProps())) {
+      if (record.shouldIgnore(genResult.schema.toAvroSchema(), config.getProps())) {
         numSkippedRecords++;
         return;
       }
@@ -111,7 +110,7 @@ public class CopyOnWriteInsertHandler<T>
           record.getPartitionPath(), idPrefix, taskContextSupplier);
       handles.put(partitionPath, handle);
     }
-    handle.write(record, HoodieSchema.fromAvroSchema(genResult.schema), config.getProps());
+    handle.write(record, genResult.schema, config.getProps());
   }
 
   @Override
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/HoodieLazyInsertIterable.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/HoodieLazyInsertIterable.java
index 8225dff16ad6..8487a1f01ee2 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/HoodieLazyInsertIterable.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/HoodieLazyInsertIterable.java
@@ -22,14 +22,13 @@ import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.utils.LazyIterableIterator;
 import org.apache.hudi.common.engine.TaskContextSupplier;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.io.CreateHandleFactory;
 import org.apache.hudi.io.WriteHandleFactory;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.util.ExecutorFactory;
 
-import org.apache.avro.Schema;
-
 import java.util.Iterator;
 import java.util.List;
 import java.util.function.Function;
@@ -76,9 +75,9 @@ public abstract class HoodieLazyInsertIterable<T>
  // Used for caching HoodieRecord along with insertValue. We need this to offload computation work to buffering thread.
   public static class HoodieInsertValueGenResult<R extends HoodieRecord> {
     private final R record;
-    public final Schema schema;
+    public final HoodieSchema schema;
 
-    public HoodieInsertValueGenResult(R record, Schema schema) {
+    public HoodieInsertValueGenResult(R record, HoodieSchema schema) {
       this.record = record;
       this.schema = schema;
     }
@@ -92,13 +91,13 @@ public abstract class HoodieLazyInsertIterable<T>
   * Transformer function to help transform a HoodieRecord. This transformer is used by BufferedIterator to offload some
   * expensive operations of transformation to the reader thread.
   */
-  public <T> Function<HoodieRecord<T>, HoodieInsertValueGenResult<HoodieRecord>> getTransformer(Schema schema,
+  public <T> Function<HoodieRecord<T>, HoodieInsertValueGenResult<HoodieRecord>> getTransformer(HoodieSchema schema,
                                                                                                  HoodieWriteConfig writeConfig) {
     return getTransformerInternal(schema, writeConfig);
   }
 
-  public static <T> Function<HoodieRecord<T>, HoodieInsertValueGenResult<HoodieRecord>> getTransformerInternal(Schema schema,
-                                                                                                               HoodieWriteConfig writeConfig) {
+  public static <T> Function<HoodieRecord<T>, HoodieInsertValueGenResult<HoodieRecord>> getTransformerInternal(HoodieSchema schema,
+                                                                                                               HoodieWriteConfig writeConfig) {
     // NOTE: Whether record have to be cloned here is determined based on the executor type used
     //       for writing: executors relying on an inner queue, will be keeping references to the records
     //       and therefore in the environments where underlying buffer holding the record could be
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/execution/ExplicitWriteHandler.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/execution/ExplicitWriteHandler.java
index 7c5482a735ba..0b6f04002f42 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/execution/ExplicitWriteHandler.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/execution/ExplicitWriteHandler.java
@@ -20,7 +20,6 @@ package org.apache.hudi.execution;
 
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.common.util.queue.HoodieConsumer;
 import org.apache.hudi.io.HoodieWriteHandle;
 
@@ -46,7 +45,7 @@ public class ExplicitWriteHandler<T>
   @Override
   public void consume(HoodieLazyInsertIterable.HoodieInsertValueGenResult<HoodieRecord> genResult) {
     final HoodieRecord insertPayload = genResult.getResult();
-    handle.write(insertPayload, HoodieSchema.fromAvroSchema(genResult.schema), this.handle.getConfig().getProps());
+    handle.write(insertPayload, genResult.schema, this.handle.getConfig().getProps());
   }
 
   @Override
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/execution/FlinkLazyInsertIterable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/execution/FlinkLazyInsertIterable.java
index 00f17b0d48c4..dbeb24c550ec 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/execution/FlinkLazyInsertIterable.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/execution/FlinkLazyInsertIterable.java
@@ -18,10 +18,11 @@
 
 package org.apache.hudi.execution;
 
-import org.apache.hudi.avro.AvroSchemaCache;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.engine.TaskContextSupplier;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaCache;
 import org.apache.hudi.common.util.queue.HoodieExecutor;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
@@ -30,8 +31,6 @@ import org.apache.hudi.io.HoodieWriteHandle;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.util.ExecutorFactory;
 
-import org.apache.avro.Schema;
-
 import java.util.Iterator;
 import java.util.List;
 
@@ -60,7 +59,7 @@ public class FlinkLazyInsertIterable<T> extends HoodieLazyInsertIterable<T> {
     // Executor service used for launching writer thread.
     HoodieExecutor<List<WriteStatus>> executor = null;
     try {
-      Schema schema = AvroSchemaCache.intern(new Schema.Parser().parse(hoodieConfig.getSchema()));
+      HoodieSchema schema = HoodieSchemaCache.intern(HoodieSchema.parse(hoodieConfig.getSchema()));
       executor = ExecutorFactory.create(hoodieConfig, inputItr, getExplicitInsertHandler(),
           getTransformer(schema, hoodieConfig));
       final List<WriteStatus> result = executor.execute();
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java
index c8b2b76a5952..e5ec83557c56 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java
@@ -19,7 +19,6 @@
 
 package org.apache.hudi.client.clustering.run.strategy;
 
-import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.avro.model.HoodieClusteringGroup;
 import org.apache.hudi.avro.model.HoodieClusteringPlan;
 import org.apache.hudi.client.WriteStatus;
@@ -32,6 +31,8 @@ import org.apache.hudi.common.model.ClusteringOperation;
 import org.apache.hudi.common.model.HoodieFileGroupId;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.config.HoodieWriteConfig;
@@ -97,7 +98,7 @@ public abstract class JavaExecutionStrategy<T>
    */
   public abstract List<WriteStatus> performClusteringWithRecordList(
       final List<HoodieRecord<T>> inputRecords, final int numOutputGroups, final String instantTime,
-      final Map<String, String> strategyParams, final Schema schema,
+      final Map<String, String> strategyParams, final HoodieSchema schema,
       final List<HoodieFileGroupId> fileGroupIdList, final boolean preserveHoodieMetadata);
 
   /**
@@ -107,11 +108,11 @@ public abstract class JavaExecutionStrategy<T>
    * @param schema         Schema of the data including metadata fields.
    * @return partitioner for the java engine
    */
-  protected BulkInsertPartitioner<List<HoodieRecord<T>>> getPartitioner(Map<String, String> strategyParams, Schema schema) {
+  protected BulkInsertPartitioner<List<HoodieRecord<T>>> getPartitioner(Map<String, String> strategyParams, HoodieSchema schema) {
     if (strategyParams.containsKey(PLAN_STRATEGY_SORT_COLUMNS.key())) {
       return new JavaCustomColumnsSortPartitioner(
           strategyParams.get(PLAN_STRATEGY_SORT_COLUMNS.key()).split(","),
-          HoodieAvroUtils.addMetadataFields(schema), getWriteConfig());
+          HoodieSchemaUtils.addMetadataFields(schema), getWriteConfig());
     } else {
       return JavaBulkInsertInternalPartitionerFactory.get(getWriteConfig().getBulkInsertSortMode());
     }
@@ -124,7 +125,7 @@ public abstract class JavaExecutionStrategy<T>
       HoodieClusteringGroup clusteringGroup, Map<String, String> strategyParams,
       boolean preserveHoodieMetadata, String instantTime) {
     List<HoodieRecord<T>> inputRecords = readRecordsForGroup(clusteringGroup, instantTime);
-    Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(getWriteConfig().getSchema()));
+    HoodieSchema readerSchema = HoodieSchemaUtils.addMetadataFields(HoodieSchema.parse(getWriteConfig().getSchema()));
     List<HoodieFileGroupId> inputFileIds = clusteringGroup.getSlices().stream()
         .map(info -> new HoodieFileGroupId(info.getPartitionPath(), info.getFileId()))
         .collect(Collectors.toList());
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaSortAndSizeExecutionStrategy.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaSortAndSizeExecutionStrategy.java
index ffc815fd48f2..22d2b4eb53a6 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaSortAndSizeExecutionStrategy.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaSortAndSizeExecutionStrategy.java
@@ -25,13 +25,13 @@ import org.apache.hudi.common.engine.EngineType;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieFileGroupId;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.io.CreateHandleFactory;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.commit.JavaBulkInsertHelper;
 
 import lombok.extern.slf4j.Slf4j;
-import org.apache.avro.Schema;
 
 import java.util.List;
 import java.util.Map;
@@ -54,7 +54,7 @@ public class JavaSortAndSizeExecutionStrategy<T>
   @Override
   public List<WriteStatus> performClusteringWithRecordList(
       final List<HoodieRecord<T>> inputRecords, final int numOutputGroups,
-      final String instantTime, final Map<String, String> strategyParams, final Schema schema,
+      final String instantTime, final Map<String, String> strategyParams, final HoodieSchema schema,
       final List<HoodieFileGroupId> fileGroupIdList, final boolean preserveHoodieMetadata) {
     log.info("Starting clustering for a group, parallelism: {} commit: {}", numOutputGroups, instantTime);
 
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/JavaLazyInsertIterable.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/JavaLazyInsertIterable.java
index 0a88fd62f71a..f00625ef3cd5 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/JavaLazyInsertIterable.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/JavaLazyInsertIterable.java
@@ -18,10 +18,11 @@
 
 package org.apache.hudi.execution;
 
-import org.apache.hudi.avro.AvroSchemaCache;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.engine.TaskContextSupplier;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaCache;
 import org.apache.hudi.common.util.queue.HoodieExecutor;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
@@ -29,8 +30,6 @@ import org.apache.hudi.io.WriteHandleFactory;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.util.ExecutorFactory;
 
-import org.apache.avro.Schema;
-
 import java.util.Iterator;
 import java.util.List;
 
@@ -64,7 +63,7 @@ public class JavaLazyInsertIterable<T> extends HoodieLazyInsertIterable<T> {
     HoodieExecutor<List<WriteStatus>> executor =
         null;
     try {
-      final Schema schema = AvroSchemaCache.intern(new Schema.Parser().parse(hoodieConfig.getSchema()));
+      final HoodieSchema schema = HoodieSchemaCache.intern(HoodieSchema.parse(hoodieConfig.getSchema()));
       executor = ExecutorFactory.create(hoodieConfig, inputItr, getInsertHandler(), getTransformer(schema, hoodieConfig));
       final List<WriteStatus> result = executor.execute();
       checkState(result != null && !result.isEmpty());
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/bulkinsert/JavaCustomColumnsSortPartitioner.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/bulkinsert/JavaCustomColumnsSortPartitioner.java
index 54f0c94ff069..1faf0ad45d75 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/bulkinsert/JavaCustomColumnsSortPartitioner.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/execution/bulkinsert/JavaCustomColumnsSortPartitioner.java
@@ -20,13 +20,12 @@
 package org.apache.hudi.execution.bulkinsert;
 
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.common.util.SortUtils;
 import org.apache.hudi.common.util.collection.FlatLists;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.BulkInsertPartitioner;
 
-import org.apache.avro.Schema;
-
 import java.util.List;
 import java.util.stream.Collectors;
 
@@ -41,11 +40,11 @@ public class JavaCustomColumnsSortPartitioner<T>
     implements BulkInsertPartitioner<List<HoodieRecord<T>>> {
 
   private final String[] sortColumnNames;
-  private final Schema schema;
+  private final HoodieSchema schema;
   private final boolean consistentLogicalTimestampEnabled;
   private final boolean suffixRecordKey;
 
-  public JavaCustomColumnsSortPartitioner(String[] columnNames, Schema schema, HoodieWriteConfig config) {
+  public JavaCustomColumnsSortPartitioner(String[] columnNames, HoodieSchema schema, HoodieWriteConfig config) {
     this.sortColumnNames = columnNames;
     this.schema = schema;
     this.consistentLogicalTimestampEnabled = config.isConsistentLogicalTimestampEnabled();
diff --git a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestJavaBulkInsertInternalPartitioner.java b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestJavaBulkInsertInternalPartitioner.java
index ff8909b85ddc..2dff037f4472 100644
--- a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestJavaBulkInsertInternalPartitioner.java
+++ b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestJavaBulkInsertInternalPartitioner.java
@@ -70,7 +70,7 @@ public class TestJavaBulkInsertInternalPartitioner extends HoodieJavaClientTestH
     cfg.setValue(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME, "partition_path");
     cfg.setValue(KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED, "false");
     testBulkInsertInternalPartitioner(
-        new JavaCustomColumnsSortPartitioner(sortColumns, HoodieTestDataGenerator.AVRO_SCHEMA, cfg),
+        new JavaCustomColumnsSortPartitioner(sortColumns, HoodieTestDataGenerator.HOODIE_SCHEMA, cfg),
         records, true, generatePartitionNumRecords(records), Option.of(columnComparator));
   }
 
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
index 69d50c1367ca..928f0c3abee3 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
@@ -20,7 +20,6 @@ package org.apache.hudi.client.clustering.run.strategy;
 
 import org.apache.hudi.AvroConversionUtils;
 import org.apache.hudi.SparkAdapterSupport$;
-import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.avro.model.HoodieClusteringGroup;
 import org.apache.hudi.avro.model.HoodieClusteringPlan;
 import org.apache.hudi.client.SparkTaskContextSupplier;
@@ -172,7 +171,7 @@ public abstract class MultipleSparkJobExecutionStrategy<T>
                                                                           final int numOutputGroups,
                                                                           final String instantTime,
                                                                           final Map<String, String> strategyParams,
-                                                                          final Schema schema,
+                                                                          final HoodieSchema schema,
                                                                           final List<HoodieFileGroupId> fileGroupIdList,
                                                                           final boolean shouldPreserveHoodieMetadata,
                                                                           final Map<String, String> extraMetadata);
@@ -230,7 +229,7 @@ public abstract class MultipleSparkJobExecutionStrategy<T>
     return CompletableFuture.supplyAsync(() -> {
       JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(getEngineContext());
       HoodieData<HoodieRecord<T>> inputRecords = readRecordsForGroup(jsc, clusteringGroup, instantTime);
-      Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(getWriteConfig().getSchema()));
+      HoodieSchema readerSchema = HoodieSchemaUtils.addMetadataFields(HoodieSchema.parse(getWriteConfig().getSchema()));
       // NOTE: Record have to be cloned here to make sure if it holds low-level engine-specific
       //       payload pointing into a shared, mutable (underlying) buffer we get a clean copy of
       //       it since these records will be shuffled later.
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SingleSparkJobConsistentHashingExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SingleSparkJobConsistentHashingExecutionStrategy.java
index c753d83d3462..173cd5e477f3 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SingleSparkJobConsistentHashingExecutionStrategy.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SingleSparkJobConsistentHashingExecutionStrategy.java
@@ -18,7 +18,6 @@
 
 package org.apache.hudi.client.clustering.run.strategy;
 
-import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.client.SparkTaskContextSupplier;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.utils.LazyConcatenatingIterator;
@@ -33,6 +32,7 @@ import org.apache.hudi.common.model.ConsistentHashingNode;
 import org.apache.hudi.common.model.HoodieConsistentHashingMetadata;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.ValidationUtils;
@@ -50,7 +50,6 @@ import org.apache.hudi.table.action.cluster.strategy.BaseConsistentHashingBucket
 import org.apache.hudi.util.ExecutorFactory;
 
 import lombok.extern.slf4j.Slf4j;
-import org.apache.avro.Schema;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -72,13 +71,13 @@ import java.util.function.Supplier;
 public class SingleSparkJobConsistentHashingExecutionStrategy<T> extends SingleSparkJobExecutionStrategy<T> {
 
   private final String indexKeyFields;
-  private final Schema readerSchema;
+  private final HoodieSchema readerSchema;
 
   public SingleSparkJobConsistentHashingExecutionStrategy(HoodieTable table, HoodieEngineContext engineContext,
                                                           HoodieWriteConfig writeConfig) {
     super(table, engineContext, writeConfig);
     this.indexKeyFields = table.getConfig().getBucketIndexHashField();
-    this.readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(writeConfig.getSchema()));
+    this.readerSchema = HoodieSchemaUtils.addMetadataFields(HoodieSchema.parse(writeConfig.getSchema()));
   }
 
   @Override
@@ -148,10 +147,10 @@ public class SingleSparkJobConsistentHashingExecutionStrategy<T> extends SingleS
     private final boolean recordsSorted;
     private final Map<String/*fileIdPrefix*/, HoodieWriteHandle> writeHandles;
     private final Function<HoodieRecord, String> fileIdPrefixExtractor;
-    private final Schema schema;
+    private final HoodieSchema schema;
 
     public InsertHandler(HoodieWriteConfig config, String instantTime, HoodieTable hoodieTable, TaskContextSupplier taskContextSupplier,
-                         WriteHandleFactory writeHandleFactory, boolean recordsSorted, Function<HoodieRecord, String> fileIdPrefixExtractor, Schema schema) {
+                         WriteHandleFactory writeHandleFactory, boolean recordsSorted, Function<HoodieRecord, String> fileIdPrefixExtractor, HoodieSchema schema) {
       this.config = config;
       this.instantTime = instantTime;
       this.hoodieTable = hoodieTable;
@@ -176,7 +175,7 @@ public class SingleSparkJobConsistentHashingExecutionStrategy<T> extends SingleS
         handle = writeHandleFactory.create(config, instantTime, hoodieTable, record.getPartitionPath(), fileIdPrefix, taskContextSupplier);
         writeHandles.put(fileIdPrefix, handle);
       }
-      handle.write(record, HoodieSchema.fromAvroSchema(schema), config.getProps());
+      handle.write(record, schema, config.getProps());
     }
 
     @Override
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkConsistentBucketClusteringExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkConsistentBucketClusteringExecutionStrategy.java
index 3b63e84bda07..379f2cb4b858 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkConsistentBucketClusteringExecutionStrategy.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkConsistentBucketClusteringExecutionStrategy.java
@@ -37,7 +37,6 @@ import org.apache.hudi.table.action.cluster.strategy.BaseConsistentHashingBucket
 import org.apache.hudi.table.action.commit.SparkBulkInsertHelper;
 
 import lombok.extern.slf4j.Slf4j;
-import org.apache.avro.Schema;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 
@@ -84,7 +83,7 @@ public class SparkConsistentBucketClusteringExecutionStrategy<T extends HoodieRe
 
   @Override
   public HoodieData<WriteStatus> performClusteringWithRecordsRDD(HoodieData<HoodieRecord<T>> inputRecords, int numOutputGroups, String instantTime,
-                                                                 Map<String, String> strategyParams, Schema schema, List<HoodieFileGroupId> fileGroupIdList,
+                                                                 Map<String, String> strategyParams, HoodieSchema schema, List<HoodieFileGroupId> fileGroupIdList,
                                                                  boolean preserveHoodieMetadata, Map<String, String> extraMetadata) {
 
     log.info("Starting clustering for a group, parallelism:{} commit:{}", numOutputGroups, instantTime);
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSingleFileSortExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSingleFileSortExecutionStrategy.java
index 6f201e5b4c70..ea778ce4fc90 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSingleFileSortExecutionStrategy.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSingleFileSortExecutionStrategy.java
@@ -35,7 +35,6 @@ import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.commit.SparkBulkInsertHelper;
 
 import lombok.extern.slf4j.Slf4j;
-import org.apache.avro.Schema;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 
@@ -89,7 +88,7 @@ public class SparkSingleFileSortExecutionStrategy<T>
                                                                  int numOutputGroups,
                                                                  String instantTime,
                                                                  Map<String, String> strategyParams,
-                                                                 Schema schema,
+                                                                 HoodieSchema schema,
                                                                  List<HoodieFileGroupId> fileGroupIdList,
                                                                  boolean shouldPreserveHoodieMetadata,
                                                                  Map<String, String> extraMetadata) {
@@ -105,7 +104,7 @@ public class SparkSingleFileSortExecutionStrategy<T>
     newConfig.setValue(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE, String.valueOf(Long.MAX_VALUE));
 
     return (HoodieData<WriteStatus>) SparkBulkInsertHelper.newInstance().bulkInsert(inputRecords, instantTime, getHoodieTable(), newConfig,
-        false, getRDDPartitioner(strategyParams, HoodieSchema.fromAvroSchema(schema)), true, numOutputGroups,
+        false, getRDDPartitioner(strategyParams, schema), true, numOutputGroups,
         new SingleFileHandleCreateFactory(fileGroupIdList.get(0).getFileId(), shouldPreserveHoodieMetadata));
   }
 }
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSortAndSizeExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSortAndSizeExecutionStrategy.java
index d66691d77fdd..05d9b23a8934 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSortAndSizeExecutionStrategy.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSortAndSizeExecutionStrategy.java
@@ -33,7 +33,6 @@ import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.commit.SparkBulkInsertHelper;
 
 import lombok.extern.slf4j.Slf4j;
-import org.apache.avro.Schema;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 
@@ -82,7 +81,7 @@ public class SparkSortAndSizeExecutionStrategy<T>
                                                                  final int numOutputGroups,
                                                                  final String instantTime,
                                                                  final Map<String, String> strategyParams,
-                                                                 final Schema schema,
+                                                                 final HoodieSchema schema,
                                                                  final List<HoodieFileGroupId> fileGroupIdList,
                                                                  final boolean shouldPreserveHoodieMetadata,
                                                                  final Map<String, String> extraMetadata) {
@@ -95,6 +94,6 @@ public class SparkSortAndSizeExecutionStrategy<T>
     newConfig.setValue(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE, String.valueOf(getWriteConfig().getClusteringTargetFileMaxBytes()));
 
     return (HoodieData<WriteStatus>) SparkBulkInsertHelper.newInstance().bulkInsert(inputRecords, instantTime, getHoodieTable(),
-        newConfig, false, getRDDPartitioner(strategyParams, HoodieSchema.fromAvroSchema(schema)), true, numOutputGroups, new CreateHandleFactory(shouldPreserveHoodieMetadata));
+        newConfig, false, getRDDPartitioner(strategyParams, schema), true, numOutputGroups, new CreateHandleFactory(shouldPreserveHoodieMetadata));
   }
 }
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java
index 9b1f36908b59..6cfec6eff675 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java
@@ -72,7 +72,6 @@ import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.util.JavaScalaConverters;
 
 import lombok.extern.slf4j.Slf4j;
-import org.apache.avro.Schema;
 import org.apache.spark.api.java.function.FlatMapGroupsFunction;
 import org.apache.spark.api.java.function.MapFunction;
 import org.apache.spark.sql.Column;
@@ -104,7 +103,6 @@ import java.util.stream.Stream;
 
 import scala.collection.immutable.Seq;
 
-import static org.apache.hudi.avro.HoodieAvroUtils.addMetadataFields;
 import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
 import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty;
 import static org.apache.hudi.common.util.ValidationUtils.checkState;
@@ -385,14 +383,14 @@ public class SparkMetadataWriterUtils {
     HoodieIndexDefinition indexDefinition = HoodieTableMetadataUtil.getHoodieIndexDefinition(indexPartition, dataMetaClient);
     List<String> columnsToIndex = Collections.singletonList(indexDefinition.getSourceFields().get(0));
     try {
-      Option<Schema> writerSchema =
+      Option<HoodieSchema> writerSchema =
           Option.ofNullable(commitMetadata.getMetadata(HoodieCommitMetadata.SCHEMA_KEY))
               .flatMap(writerSchemaStr ->
                   isNullOrEmpty(writerSchemaStr)
                       ? Option.empty()
-                      : Option.of(new Schema.Parser().parse(writerSchemaStr)));
+                      : Option.of(HoodieSchema.parse(writerSchemaStr)));
       HoodieTableConfig tableConfig = dataMetaClient.getTableConfig();
-      HoodieSchema tableSchema = writerSchema.map(schema -> tableConfig.populateMetaFields() ? addMetadataFields(schema) : schema).map(HoodieSchema::fromAvroSchema)
+      HoodieSchema tableSchema = writerSchema.map(schema -> tableConfig.populateMetaFields() ? HoodieSchemaUtils.addMetadataFields(schema) : schema)
           .orElseThrow(() -> new IllegalStateException(String.format("Expected writer schema in commit metadata %s", commitMetadata)));
       List<Pair<String, HoodieSchema>> columnsToIndexSchemaMap = columnsToIndex.stream()
           .map(columnToIndex -> HoodieSchemaUtils.getNestedField(tableSchema, columnToIndex))
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/SparkLazyInsertIterable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/SparkLazyInsertIterable.java
index 45e29a18110d..6c2c34683dff 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/SparkLazyInsertIterable.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/SparkLazyInsertIterable.java
@@ -18,11 +18,12 @@
 
 package org.apache.hudi.execution;
 
-import org.apache.hudi.avro.AvroSchemaCache;
-import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.engine.TaskContextSupplier;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaCache;
+import org.apache.hudi.common.schema.HoodieSchemaUtils;
 import org.apache.hudi.common.util.queue.HoodieExecutor;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
@@ -30,8 +31,6 @@ import org.apache.hudi.io.WriteHandleFactory;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.util.ExecutorFactory;
 
-import org.apache.avro.Schema;
-
 import java.util.Iterator;
 import java.util.List;
 
@@ -69,11 +68,11 @@ public class SparkLazyInsertIterable<T> extends HoodieLazyInsertIterable<T> {
     HoodieExecutor<List<WriteStatus>> bufferedIteratorExecutor = null;
     try {
       // config.getSchema is not canonicalized, while config.getWriteSchema is canonicalized. So, we have to use the canonicalized schema to read the existing data.
-      Schema schema = new Schema.Parser().parse(hoodieConfig.getWriteSchema());
+      HoodieSchema schema = HoodieSchema.parse(hoodieConfig.getWriteSchema());
       if (useWriterSchema) {
-        schema = HoodieAvroUtils.addMetadataFields(schema);
+        schema = HoodieSchemaUtils.addMetadataFields(schema);
       }
-      schema = AvroSchemaCache.intern(schema);
+      schema = HoodieSchemaCache.intern(schema);
 
       bufferedIteratorExecutor = ExecutorFactory.create(hoodieConfig, inputItr, getInsertHandler(),
           getTransformer(schema, hoodieConfig), hoodieTable.getPreExecuteRunnable());
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDBucketIndexPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDBucketIndexPartitioner.java
index fdbfae12ce32..92404d259d0c 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDBucketIndexPartitioner.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDBucketIndexPartitioner.java
@@ -19,15 +19,14 @@
 package org.apache.hudi.execution.bulkinsert;
 
 import org.apache.hudi.SparkAdapterSupport$;
-import org.apache.hudi.avro.HoodieAvroUtils;
-import org.apache.hudi.common.config.SerializableSchema;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaUtils;
 import org.apache.hudi.common.util.collection.FlatLists;
 import org.apache.hudi.table.BucketIndexBulkInsertPartitioner;
 import org.apache.hudi.table.HoodieTable;
 
-import org.apache.avro.Schema;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.spark.Partitioner;
@@ -87,12 +86,12 @@ public abstract class RDDBucketIndexPartitioner<T> extends BucketIndexBulkInsert
    */
   private JavaRDD<HoodieRecord<T>> doPartitionAndCustomColumnSort(JavaRDD<HoodieRecord<T>> records, Partitioner partitioner) {
     final String[] sortColumns = sortColumnNames;
-    final SerializableSchema schema = new SerializableSchema(HoodieAvroUtils.addMetadataFields((new Schema.Parser().parse(table.getConfig().getSchema()))));
+    final HoodieSchema schema = HoodieSchemaUtils.addMetadataFields(HoodieSchema.parse(table.getConfig().getSchema()));
     Comparator<HoodieRecord<T>> comparator = (Comparator<HoodieRecord<T>> & Serializable) (t1, t2) -> {
       FlatLists.ComparableList obj1 = FlatLists.ofComparableArray(utf8StringFactory.wrapArrayOfObjects(
-          t1.getColumnValues(schema.get(), sortColumns, consistentLogicalTimestampEnabled)));
+          t1.getColumnValues(schema.toAvroSchema(), sortColumns, consistentLogicalTimestampEnabled)));
       FlatLists.ComparableList obj2 = FlatLists.ofComparableArray(utf8StringFactory.wrapArrayOfObjects(
-          t2.getColumnValues(schema.get(), sortColumns, consistentLogicalTimestampEnabled)));
+          t2.getColumnValues(schema.toAvroSchema(), sortColumns, consistentLogicalTimestampEnabled)));
       return obj1.compareTo(obj2);
     };
 
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryExecutorInSpark.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryExecutorInSpark.java
index 9035c10a83fe..deef0929bced 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryExecutorInSpark.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryExecutorInSpark.java
@@ -96,7 +96,7 @@ public class TestBoundedInMemoryExecutorInSpark extends HoodieSparkClientTestHar
     BoundedInMemoryExecutor<HoodieRecord, Tuple2<HoodieRecord, Option<IndexedRecord>>, Integer> executor = null;
     try {
       executor = new BoundedInMemoryExecutor(writeConfig.getWriteBufferLimitBytes(), hoodieRecords.iterator(), consumer,
-          getTransformerInternal(HoodieTestDataGenerator.AVRO_SCHEMA, writeConfig), getPreExecuteRunnable());
+          getTransformerInternal(HoodieTestDataGenerator.HOODIE_SCHEMA, writeConfig), getPreExecuteRunnable());
       int result = executor.execute();
 
       assertEquals(100, result);
@@ -136,7 +136,7 @@ public class TestBoundedInMemoryExecutorInSpark extends HoodieSparkClientTestHar
 
     BoundedInMemoryExecutor<HoodieRecord, Tuple2<HoodieRecord, Option<IndexedRecord>>, Integer> executor =
         new BoundedInMemoryExecutor(writeConfig.getWriteBufferLimitBytes(), hoodieRecords.iterator(), consumer,
-            getTransformerInternal(HoodieTestDataGenerator.AVRO_SCHEMA, writeConfig), getPreExecuteRunnable());
+            getTransformerInternal(HoodieTestDataGenerator.HOODIE_SCHEMA, writeConfig), getPreExecuteRunnable());
 
     // Interrupt the current thread (therefore triggering executor to throw as soon as it
     // invokes [[get]] on the [[CompletableFuture]])
@@ -179,7 +179,7 @@ public class TestBoundedInMemoryExecutorInSpark extends HoodieSparkClientTestHar
 
     BoundedInMemoryExecutor<HoodieRecord, Tuple2<HoodieRecord, Option<IndexedRecord>>, Integer> executor =
         new BoundedInMemoryExecutor(writeConfig.getWriteBufferLimitBytes(), unboundedRecordIter,
-            consumer, getTransformerInternal(HoodieTestDataGenerator.AVRO_SCHEMA, writeConfig),
+            consumer, getTransformerInternal(HoodieTestDataGenerator.HOODIE_SCHEMA, writeConfig),
             getPreExecuteRunnable());
     executor.shutdownNow();
     boolean terminatedGracefully = executor.awaitTermination();
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryQueue.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryQueue.java
index ab1535140b4b..7f098eea5f59 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryQueue.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestBoundedInMemoryQueue.java
@@ -92,7 +92,7 @@ public class TestBoundedInMemoryQueue extends HoodieSparkClientTestHarness {
     final int numRecords = 128;
     final List<HoodieRecord> hoodieRecords = dataGen.generateInserts(instantTime, numRecords);
     final BoundedInMemoryQueue<HoodieRecord, HoodieLazyInsertIterable.HoodieInsertValueGenResult> queue =
-        new BoundedInMemoryQueue(FileIOUtils.KB, getTransformerInternal(HoodieTestDataGenerator.AVRO_SCHEMA, writeConfig));
+        new BoundedInMemoryQueue(FileIOUtils.KB, getTransformerInternal(HoodieTestDataGenerator.HOODIE_SCHEMA, writeConfig));
     // Produce
     Future<Boolean> resFuture = executorService.submit(() -> {
       new IteratorBasedQueueProducer<>(hoodieRecords.iterator()).produce(queue);
@@ -130,7 +130,7 @@ public class TestBoundedInMemoryQueue extends HoodieSparkClientTestHarness {
     final List<List<HoodieRecord>> recs = new ArrayList<>();
 
     final BoundedInMemoryQueue<HoodieRecord, HoodieLazyInsertIterable.HoodieInsertValueGenResult> queue =
-        new BoundedInMemoryQueue(FileIOUtils.KB, getTransformerInternal(HoodieTestDataGenerator.AVRO_SCHEMA, writeConfig));
+        new BoundedInMemoryQueue(FileIOUtils.KB, getTransformerInternal(HoodieTestDataGenerator.HOODIE_SCHEMA, writeConfig));
 
     // Record Key to <Producer Index, Rec Index within a producer>
     Map<String, Tuple2<Integer, Integer>> keyToProducerAndIndexMap = new HashMap<>();
@@ -225,11 +225,11 @@ public class TestBoundedInMemoryQueue extends HoodieSparkClientTestHarness {
     final int recordLimit = 5;
     final SizeEstimator<HoodieLazyInsertIterable.HoodieInsertValueGenResult> sizeEstimator = new DefaultSizeEstimator<>();
     HoodieLazyInsertIterable.HoodieInsertValueGenResult genResult =
-        getTransformerInternal(HoodieTestDataGenerator.AVRO_SCHEMA, writeConfig).apply(hoodieRecords.get(0));
+        getTransformerInternal(HoodieTestDataGenerator.HOODIE_SCHEMA, writeConfig).apply(hoodieRecords.get(0));
     final long objSize = sizeEstimator.sizeEstimate(genResult);
     final long memoryLimitInBytes = recordLimit * objSize;
     final BoundedInMemoryQueue<HoodieRecord, HoodieLazyInsertIterable.HoodieInsertValueGenResult> queue =
-        new BoundedInMemoryQueue(memoryLimitInBytes, getTransformerInternal(HoodieTestDataGenerator.AVRO_SCHEMA, writeConfig));
+        new BoundedInMemoryQueue(memoryLimitInBytes, getTransformerInternal(HoodieTestDataGenerator.HOODIE_SCHEMA, writeConfig));
 
     // Produce
     executorService.submit(() -> {
@@ -274,7 +274,7 @@ public class TestBoundedInMemoryQueue extends HoodieSparkClientTestHarness {
     final SizeEstimator<Tuple2<HoodieRecord, Option<IndexedRecord>>> sizeEstimator = new DefaultSizeEstimator<>();
     // queue memory limit
     HoodieLazyInsertIterable.HoodieInsertValueGenResult genResult =
-        getTransformerInternal(HoodieTestDataGenerator.AVRO_SCHEMA, writeConfig).apply(hoodieRecords.get(0));
+        getTransformerInternal(HoodieTestDataGenerator.HOODIE_SCHEMA, writeConfig).apply(hoodieRecords.get(0));
     final long objSize = sizeEstimator.sizeEstimate(new Tuple2(genResult.getResult(), genResult.getResult().toIndexedRecord(HoodieTestDataGenerator.AVRO_SCHEMA, new Properties())));
     final long memoryLimitInBytes = 4 * objSize;
 
@@ -282,7 +282,7 @@ public class TestBoundedInMemoryQueue extends HoodieSparkClientTestHarness {
     // stops and throws
     // correct exception back.
     BoundedInMemoryQueue<HoodieRecord, Tuple2<HoodieRecord, Option<IndexedRecord>>> queue1 =
-        new BoundedInMemoryQueue(memoryLimitInBytes, getTransformerInternal(HoodieTestDataGenerator.AVRO_SCHEMA, writeConfig));
+        new BoundedInMemoryQueue(memoryLimitInBytes, getTransformerInternal(HoodieTestDataGenerator.HOODIE_SCHEMA, writeConfig));
 
     // Produce
     Future<Boolean> resFuture = executorService.submit(() -> {
@@ -310,7 +310,7 @@ public class TestBoundedInMemoryQueue extends HoodieSparkClientTestHarness {
     when(mockHoodieRecordsIterator.hasNext()).thenReturn(true);
     when(mockHoodieRecordsIterator.next()).thenThrow(expectedException);
    BoundedInMemoryQueue<HoodieRecord, Tuple2<HoodieRecord, Option<IndexedRecord>>> queue2 =
-        new BoundedInMemoryQueue(memoryLimitInBytes, getTransformerInternal(HoodieTestDataGenerator.AVRO_SCHEMA, writeConfig));
+        new BoundedInMemoryQueue(memoryLimitInBytes, getTransformerInternal(HoodieTestDataGenerator.HOODIE_SCHEMA, writeConfig));
 
     // Produce
     Future<Boolean> res = executorService.submit(() -> {
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestDisruptorMessageQueue.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestDisruptorMessageQueue.java
index d796ffde358a..30a9ab3f63b9 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestDisruptorMessageQueue.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/TestDisruptorMessageQueue.java
@@ -128,7 +128,7 @@ public class TestDisruptorMessageQueue extends HoodieSparkClientTestHarness {
 
     try {
      exec = new DisruptorExecutor(writeConfig.getWriteExecutorDisruptorWriteBufferLimitBytes(), hoodieRecords.iterator(), consumer,
-          getTransformerInternal(HoodieTestDataGenerator.AVRO_SCHEMA, writeConfig), WaitStrategyFactory.DEFAULT_STRATEGY, getPreExecuteRunnable());
+          getTransformerInternal(HoodieTestDataGenerator.HOODIE_SCHEMA, writeConfig), WaitStrategyFactory.DEFAULT_STRATEGY, getPreExecuteRunnable());
       int result = exec.execute();
       // It should buffer and write 100 records
       assertEquals(100, result);
@@ -157,7 +157,7 @@ public class TestDisruptorMessageQueue extends HoodieSparkClientTestHarness {
     final List<List<HoodieRecord>> recs = new ArrayList<>();
 
     final DisruptorMessageQueue<HoodieRecord, HoodieInsertValueGenResult> queue =
-        new DisruptorMessageQueue(1024, getTransformerInternal(HoodieTestDataGenerator.AVRO_SCHEMA, writeConfig),
+        new DisruptorMessageQueue(1024, getTransformerInternal(HoodieTestDataGenerator.HOODIE_SCHEMA, writeConfig),
             "BLOCKING_WAIT", numProducers, new Runnable() {
               @Override
           public void run() {
@@ -305,7 +305,7 @@ public class TestDisruptorMessageQueue extends HoodieSparkClientTestHarness {
         };
 
     DisruptorExecutor<HoodieRecord, Tuple2<HoodieRecord, Option<IndexedRecord>>, Integer> exec = new DisruptorExecutor(1024,
-        producers, consumer, getTransformerInternal(HoodieTestDataGenerator.AVRO_SCHEMA, writeConfig),
+        producers, consumer, getTransformerInternal(HoodieTestDataGenerator.HOODIE_SCHEMA, writeConfig),
         WaitStrategyFactory.DEFAULT_STRATEGY, getPreExecuteRunnable());
 
     final Throwable thrown = assertThrows(HoodieException.class, exec::execute,
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/SortUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/SortUtils.java
index c043f7a2fef0..23c9595d6f31 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/SortUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/SortUtils.java
@@ -23,8 +23,6 @@ import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.common.util.collection.FlatLists;
 
-import org.apache.avro.Schema;
-
 import java.util.function.Function;
 
 /**
@@ -58,12 +56,12 @@ public class SortUtils {
   public static FlatLists.ComparableList<Comparable<HoodieRecord>> getComparableSortColumns(
       HoodieRecord record,
       String[] sortColumnNames,
-      Schema schema,
+      HoodieSchema schema,
       boolean suffixRecordKey,
       boolean consistentLogicalTimestampEnabled
   ) {
     if (record.getRecordType() == HoodieRecord.HoodieRecordType.SPARK) {
-      Object[] columnValues = record.getColumnValues(schema, sortColumnNames, consistentLogicalTimestampEnabled);
+      Object[] columnValues = record.getColumnValues(schema.toAvroSchema(), sortColumnNames, consistentLogicalTimestampEnabled);
       if (suffixRecordKey) {
         return FlatLists.ofComparableArray(
             prependPartitionPathAndSuffixRecordKey(record.getPartitionPath(), record.getRecordKey(), columnValues));
@@ -72,7 +70,7 @@ public class SortUtils {
     } else if (record.getRecordType() == HoodieRecord.HoodieRecordType.AVRO) {
       return FlatLists.ofComparableArray(
           HoodieAvroUtils.getSortColumnValuesWithPartitionPathAndRecordKey(
-              record, sortColumnNames, schema, suffixRecordKey, consistentLogicalTimestampEnabled
+              record, sortColumnNames, schema.toAvroSchema(), suffixRecordKey, consistentLogicalTimestampEnabled
           ));
     }
     throw new IllegalArgumentException("Invalid recordType" + record.getRecordType());
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index 7fa98d0c6ebb..78c385db925e 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -162,8 +162,6 @@ import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import static java.util.stream.Collectors.toList;
-import static org.apache.hudi.avro.HoodieAvroUtils.addMetadataFields;
-import static org.apache.hudi.avro.HoodieAvroUtils.projectSchema;
 import static org.apache.hudi.common.config.HoodieCommonConfig.DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES;
 import static org.apache.hudi.common.config.HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED;
 import static org.apache.hudi.common.config.HoodieCommonConfig.MAX_MEMORY_FOR_COMPACTION;
@@ -1597,20 +1595,19 @@ public class HoodieTableMetadataUtil {
 
   public static Map<String, HoodieSchema> getColumnsToIndex(HoodieCommitMetadata commitMetadata, HoodieTableMetaClient dataMetaClient,
                                                HoodieMetadataConfig metadataConfig, Option<HoodieRecordType> recordTypeOpt) {
-    Option<Schema> writerSchema =
+    Option<HoodieSchema> writerSchema =
         Option.ofNullable(commitMetadata.getMetadata(HoodieCommitMetadata.SCHEMA_KEY))
             .flatMap(writerSchemaStr ->
                 isNullOrEmpty(writerSchemaStr)
                     ? Option.empty()
-                    : Option.of(new Schema.Parser().parse(writerSchemaStr)));
+                    : Option.of(HoodieSchema.parse(writerSchemaStr)));
 
     HoodieTableConfig tableConfig = dataMetaClient.getTableConfig();
 
     // NOTE: Writer schema added to commit metadata will not contain Hudi's metadata fields
     Option<HoodieSchema> tableSchema = writerSchema.isEmpty()
         ? tableConfig.getTableCreateSchema().map(HoodieSchema::fromAvroSchema) // the write schema does not set up correctly
-        : writerSchema.map(schema -> tableConfig.populateMetaFields() ? addMetadataFields(schema) : schema)
-            .map(HoodieSchema::fromAvroSchema);
+        : writerSchema.map(schema -> tableConfig.populateMetaFields() ? HoodieSchemaUtils.addMetadataFields(schema) : schema);
 
     HoodieIndexVersion indexVersion = existingIndexVersionOrDefault(PARTITION_NAME_COLUMN_STATS, dataMetaClient);
     return getColumnsToIndex(tableConfig, metadataConfig,
@@ -2585,7 +2582,7 @@ public class HoodieTableMetadataUtil {
     List<String> mergedFields = new ArrayList<>(partitionFields.size() + sourceFields.size());
     mergedFields.addAll(partitionFields);
     mergedFields.addAll(sourceFields);
-    return HoodieSchema.fromAvroSchema(addMetadataFields(projectSchema(tableSchema.toAvroSchema(), mergedFields)));
+    return HoodieSchemaUtils.addMetadataFields(HoodieSchemaUtils.projectSchema(tableSchema, mergedFields));
   }
 
   public static StoragePath filePath(StoragePath basePath, String partition, String filename) {
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/model/TestOverwriteNonDefaultsWithLatestAvroPayload.java b/hudi-common/src/test/java/org/apache/hudi/common/model/TestOverwriteNonDefaultsWithLatestAvroPayload.java
index 97b0ac5c108c..88cb718326ec 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/model/TestOverwriteNonDefaultsWithLatestAvroPayload.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/model/TestOverwriteNonDefaultsWithLatestAvroPayload.java
@@ -18,7 +18,10 @@
 
 package org.apache.hudi.common.model;
 
-import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaField;
+import org.apache.hudi.common.schema.HoodieSchemaType;
+import org.apache.hudi.common.schema.HoodieSchemaUtils;
 
 import org.apache.avro.JsonProperties;
 import org.apache.avro.Schema;
@@ -40,25 +43,24 @@ import static org.junit.jupiter.api.Assertions.assertNotSame;
  * Unit tests {@link TestOverwriteNonDefaultsWithLatestAvroPayload}.
  */
 public class TestOverwriteNonDefaultsWithLatestAvroPayload {
-  private Schema schema;
+  private HoodieSchema schema;
 
   @BeforeEach
   public void setUp() throws Exception {
-    schema = Schema.createRecord(Arrays.asList(
-            new Schema.Field("id", Schema.create(Schema.Type.STRING), "", null),
-            new Schema.Field("partition", Schema.create(Schema.Type.STRING), "", ""),
-            new Schema.Field("ts", Schema.create(Schema.Type.LONG), "", null),
-            new Schema.Field("_hoodie_is_deleted", Schema.create(Schema.Type.BOOLEAN), "", false),
-            new Schema.Field("city", Schema.create(Schema.Type.STRING), "", "NY"),
-            new Schema.Field("child", Schema.createArray(Schema.create(Schema.Type.STRING)), "", Collections.emptyList())
-            ));
+    schema = HoodieSchema.createRecord("TestSchema", null, null, false, Arrays.asList(
+        HoodieSchemaField.of("id", HoodieSchema.create(HoodieSchemaType.STRING), "", null),
+        HoodieSchemaField.of("partition", HoodieSchema.create(HoodieSchemaType.STRING), "", ""),
+        HoodieSchemaField.of("ts", HoodieSchema.create(HoodieSchemaType.LONG), "", null),
+        HoodieSchemaField.of("_hoodie_is_deleted", HoodieSchema.create(HoodieSchemaType.BOOLEAN), "", false),
+        HoodieSchemaField.of("city", HoodieSchema.create(HoodieSchemaType.STRING), "", "NY"),
+        HoodieSchemaField.of("child", HoodieSchema.createArray(HoodieSchema.create(HoodieSchemaType.STRING)), "", Collections.emptyList())));
   }
 
   @Test
   public void testActiveRecords() throws IOException {
-    Schema writerSchema = HoodieAvroUtils.addMetadataFields(schema);
+    HoodieSchema writerSchema = HoodieSchemaUtils.addMetadataFields(schema);
 
-    GenericRecord record1 = new GenericData.Record(schema);
+    GenericRecord record1 = new GenericData.Record(schema.toAvroSchema());
     record1.put("id", "1");
     record1.put("partition", "partition1");
     record1.put("ts", 0L);
@@ -66,7 +68,7 @@ public class TestOverwriteNonDefaultsWithLatestAvroPayload {
     record1.put("city", "NY0");
     record1.put("child", Collections.singletonList("A"));
 
-    GenericRecord record2 = new GenericData.Record(schema);
+    GenericRecord record2 = new GenericData.Record(schema.toAvroSchema());
     record2.put("id", "2");
     record2.put("partition", "");
     record2.put("ts", 1L);
@@ -74,7 +76,7 @@ public class TestOverwriteNonDefaultsWithLatestAvroPayload {
     record2.put("city", "NY");
     record2.put("child", Collections.emptyList());
 
-    GenericRecord record3 = new GenericData.Record(schema);
+    GenericRecord record3 = new GenericData.Record(schema.toAvroSchema());
     record3.put("id", "2");
     record3.put("partition", "partition1");
     record3.put("ts", 1L);
@@ -115,14 +117,14 @@ public class TestOverwriteNonDefaultsWithLatestAvroPayload {
     assertEquals(payload1.preCombine(payload2), payload2);
     assertEquals(payload2.preCombine(payload1), payload2);
 
-    assertEquals(record1, payload1.getInsertValue(schema).get());
-    assertEquals(record2, payload2.getInsertValue(schema).get());
+    assertEquals(record1, payload1.getInsertValue(schema.toAvroSchema()).get());
+    assertEquals(record2, payload2.getInsertValue(schema.toAvroSchema()).get());
 
-    IndexedRecord combinedVal1 = payload1.combineAndGetUpdateValue(record2, schema).get();
+    IndexedRecord combinedVal1 = payload1.combineAndGetUpdateValue(record2, schema.toAvroSchema()).get();
     assertEquals(combinedVal1, record1);
     assertNotSame(combinedVal1, record1);
 
-    IndexedRecord combinedVal2 = payload2.combineAndGetUpdateValue(record1, schema).get();
+    IndexedRecord combinedVal2 = payload2.combineAndGetUpdateValue(record1, schema.toAvroSchema()).get();
     assertEquals(combinedVal2, record3);
     assertNotSame(combinedVal2, record3);
 
@@ -130,19 +132,19 @@ public class TestOverwriteNonDefaultsWithLatestAvroPayload {
     // the payload record could include the metadata fields (for compaction) or not (for normal writer path).
 
     // case1: validate normal writer path
-    IndexedRecord combinedVal3 = payload2.combineAndGetUpdateValue(record4, schema).get();
+    IndexedRecord combinedVal3 = payload2.combineAndGetUpdateValue(record4, schema.toAvroSchema()).get();
     assertEquals(combinedVal3, record3);
     assertNotSame(combinedVal3, record3);
 
     // case2: validate compaction path
-    IndexedRecord combinedVal4 = payload5.combineAndGetUpdateValue(record4, writerSchema).get();
+    IndexedRecord combinedVal4 = payload5.combineAndGetUpdateValue(record4, writerSchema.toAvroSchema()).get();
     assertEquals(combinedVal4, record6);
     assertNotSame(combinedVal4, record6);
   }
 
   @Test
   public void testDeletedRecord() throws IOException {
-    GenericRecord record1 = new GenericData.Record(schema);
+    GenericRecord record1 = new GenericData.Record(schema.toAvroSchema());
     record1.put("id", "1");
     record1.put("partition", "partition0");
     record1.put("ts", 0L);
@@ -150,7 +152,7 @@ public class TestOverwriteNonDefaultsWithLatestAvroPayload {
     record1.put("city", "NY0");
     record1.put("child", Collections.emptyList());
 
-    GenericRecord delRecord1 = new GenericData.Record(schema);
+    GenericRecord delRecord1 = new GenericData.Record(schema.toAvroSchema());
     delRecord1.put("id", "2");
     delRecord1.put("partition", "partition1");
     delRecord1.put("ts", 1L);
@@ -158,7 +160,7 @@ public class TestOverwriteNonDefaultsWithLatestAvroPayload {
     delRecord1.put("city", "NY0");
     delRecord1.put("child", Collections.emptyList());
 
-    GenericRecord record2 = new GenericData.Record(schema);
+    GenericRecord record2 = new GenericData.Record(schema.toAvroSchema());
     record2.put("id", "1");
     record2.put("partition", "partition0");
     record2.put("ts", 0L);
@@ -172,13 +174,13 @@ public class TestOverwriteNonDefaultsWithLatestAvroPayload {
     assertEquals(payload1.preCombine(payload2), payload2);
     assertEquals(payload2.preCombine(payload1), payload2);
 
-    assertEquals(record1, payload1.getInsertValue(schema).get());
-    assertFalse(payload2.getInsertValue(schema).isPresent());
+    assertEquals(record1, payload1.getInsertValue(schema.toAvroSchema()).get());
+    assertFalse(payload2.getInsertValue(schema.toAvroSchema()).isPresent());
 
-    assertEquals(payload1.combineAndGetUpdateValue(delRecord1, schema).get(), record2);
-    assertFalse(payload2.combineAndGetUpdateValue(record1, schema).isPresent());
+    assertEquals(payload1.combineAndGetUpdateValue(delRecord1, schema.toAvroSchema()).get(), record2);
+    assertFalse(payload2.combineAndGetUpdateValue(record1, schema.toAvroSchema()).isPresent());
   }
-  
+
   @Test
   public void testNullColumn() throws IOException {
     Schema avroSchema = Schema.createRecord(Arrays.asList(
@@ -209,8 +211,8 @@ public class TestOverwriteNonDefaultsWithLatestAvroPayload {
     assertEquals(payload2.combineAndGetUpdateValue(record1, avroSchema).get(), record3);
   }
 
-  private static GenericRecord createRecordWithMetadataFields(Schema schema, String recordKey, String partitionPath) {
-    GenericRecord record = new GenericData.Record(schema);
+  private static GenericRecord createRecordWithMetadataFields(HoodieSchema schema, String recordKey, String partitionPath) {
+    GenericRecord record = new GenericData.Record(schema.toAvroSchema());
     record.put(HoodieRecord.COMMIT_TIME_METADATA_FIELD, "001");
     record.put(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD, "123");
     record.put(HoodieRecord.RECORD_KEY_METADATA_FIELD, recordKey);
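
    The test changes above follow one pattern throughout: build schemas with the Hudi-native factories, and call .toAvroSchema() only at the boundary where Avro types (GenericData.Record, the payload APIs) are still required. A self-contained sketch of that pattern, assuming the factory signatures used in this diff (class and field names below are illustrative):

    import java.util.Arrays;

    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.hudi.common.schema.HoodieSchema;
    import org.apache.hudi.common.schema.HoodieSchemaField;
    import org.apache.hudi.common.schema.HoodieSchemaType;

    class SchemaBoundarySketch {
      static GenericRecord buildSample() {
        // Hudi-native construction replaces Schema.createRecord / new Schema.Field.
        HoodieSchema schema = HoodieSchema.createRecord("Example", null, null, false,
            Arrays.asList(
                HoodieSchemaField.of("id", HoodieSchema.create(HoodieSchemaType.STRING), "", null),
                HoodieSchemaField.of("ts", HoodieSchema.create(HoodieSchemaType.LONG), "", null)));
        // Avro's GenericData.Record still needs an org.apache.avro.Schema,
        // so the conversion happens exactly once, at this boundary.
        GenericRecord record = new GenericData.Record(schema.toAvroSchema());
        record.put("id", "1");
        record.put("ts", 0L);
        return record;
      }
    }
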
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/TestSortUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/util/TestSortUtils.java
index 38df25837a62..233380f3ce2c 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/util/TestSortUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/util/TestSortUtils.java
@@ -24,6 +24,7 @@ import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.RewriteAvroPayload;
+import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.common.util.collection.FlatLists;
 
 import org.apache.avro.Schema;
@@ -77,7 +78,7 @@ public class TestSortUtils {
       record = new TestSparkRecord(new HoodieKey("record1", "partition1"), payload);
     }
     String[] userSortColumns = new String[] {"non_pii_col", "timestamp"};
-    FlatLists.ComparableList<Comparable<HoodieRecord>> comparableList = SortUtils.getComparableSortColumns(record, userSortColumns, Schema.parse(SCHEMA), suffixRecordKey, true);
+    FlatLists.ComparableList<Comparable<HoodieRecord>> comparableList = SortUtils.getComparableSortColumns(record, userSortColumns, HoodieSchema.parse(SCHEMA), suffixRecordKey, true);
     Object[] expectedSortColumnValues;
     if (suffixRecordKey) {
       expectedSortColumnValues = new Object[] {"partition1", "val1", 3.5, "record1"};
diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/util/collection/TestBitCaskDiskMap.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/util/collection/TestBitCaskDiskMap.java
index cb6d03972da5..323ac96104df 100755
--- a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/util/collection/TestBitCaskDiskMap.java
+++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/util/collection/TestBitCaskDiskMap.java
@@ -36,7 +36,6 @@ import org.apache.hudi.common.util.HoodieRecordSizeEstimator;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.SpillableMapUtils;
 
-import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.IndexedRecord;
 import org.junit.jupiter.api.BeforeEach;
@@ -143,7 +142,7 @@ public class TestBitCaskDiskMap extends HoodieCommonTestHarness {
   @ParameterizedTest
   @ValueSource(booleans = {false, true})
   public void testSimpleUpsert(boolean isCompressionEnabled) throws IOException, URISyntaxException {
-    Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema().toAvroSchema());
+    HoodieSchema schema = HoodieSchemaUtils.addMetadataFields(getSimpleSchema());
 
     try (BitCaskDiskMap records = new BitCaskDiskMap<>(basePath, new DefaultSerializer<>(), isCompressionEnabled)) {
       SchemaTestUtil testUtil = new SchemaTestUtil();
@@ -174,7 +173,7 @@ public class TestBitCaskDiskMap extends HoodieCommonTestHarness {
         HoodieRecord<? extends HoodieRecordPayload> rec = itr.next();
         assert recordKeys.contains(rec.getRecordKey());
         try {
-          IndexedRecord indexedRecord = (IndexedRecord) rec.getData().getInsertValue(schema).get();
+          IndexedRecord indexedRecord = (IndexedRecord) rec.getData().getInsertValue(schema.toAvroSchema()).get();
           String latestCommitTime =
               ((GenericRecord) indexedRecord).get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString();
           assertEquals(latestCommitTime, newCommitTime);
diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java
index 6f6cf310a0d4..53ba2ea76b3b 100644
--- a/hudi-hadoop-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java
+++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java
@@ -19,7 +19,6 @@
 
 package org.apache.hudi.metadata;
 
-import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.engine.HoodieLocalEngineContext;
@@ -33,6 +32,7 @@ import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.common.schema.HoodieSchemaField;
 import org.apache.hudi.common.schema.HoodieSchemaType;
+import org.apache.hudi.common.schema.HoodieSchemaUtils;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.HoodieTableVersion;
@@ -518,12 +518,12 @@ public class TestHoodieTableMetadataUtil extends HoodieCommonTestHarness {
     expected.add("intField");
     assertListEquality(expected, new ArrayList<>(HoodieTableMetadataUtil.getColumnsToIndex(tableConfig, metadataConfig,
         Lazy.eagerly(Option.of(schema)), false, V1).keySet()));
-    //test with avro schema with meta cols
-    HoodieSchema hoodieSchemaWithMetadataFields = HoodieSchema.fromAvroSchema(HoodieAvroUtils.addMetadataFields(schema.toAvroSchema()));
+    //test schema with meta cols
+    HoodieSchema hoodieSchemaWithMetadataFields = HoodieSchemaUtils.addMetadataFields(schema);
     assertListEquality(expected, new ArrayList<>(HoodieTableMetadataUtil.getColumnsToIndex(tableConfig, metadataConfig,
         Lazy.eagerly(Option.of(hoodieSchemaWithMetadataFields)), false, V1).keySet()));
 
-    //test with avro schema with type filter
+    //test schema with type filter
     metadataConfig = HoodieMetadataConfig.newBuilder()
         .enable(true).withMetadataIndexColumnStats(true)
         .withMaxColumnsToIndexForColStats(100)
diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java
index 2a258d40123d..be0aa261b047 100644
--- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java
+++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java
@@ -18,7 +18,6 @@
 
 package org.apache.hudi.hadoop.realtime;
 
-import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
 import org.apache.hudi.common.config.HoodieCommonConfig;
 import org.apache.hudi.common.config.HoodieMemoryConfig;
@@ -55,7 +54,6 @@ import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.StorageConfiguration;
 import org.apache.hudi.storage.hadoop.HoodieHadoopStorage;
 
-import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
@@ -970,8 +968,8 @@ public class TestHoodieRealtimeRecordReader {
 
     // In some queries, generic records that Hudi gets are just part of the full records.
     // Here test the case that some fields are missing in the record.
-    Schema schemaWithMetaFields = HoodieAvroUtils.addMetadataFields(schema.toAvroSchema());
-    ArrayWritable aWritable2 = (ArrayWritable) HoodieRealtimeRecordReaderUtils.avroToArrayWritable(record, schemaWithMetaFields);
+    HoodieSchema schemaWithMetaFields = HoodieSchemaUtils.addMetadataFields(schema);
+    ArrayWritable aWritable2 = (ArrayWritable) HoodieRealtimeRecordReaderUtils.avroToArrayWritable(record, schemaWithMetaFields.toAvroSchema());
     assertEquals(schemaWithMetaFields.getFields().size(), aWritable2.get().length);
   }
 
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/KafkaOffsetPostProcessor.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/KafkaOffsetPostProcessor.java
index b93f73ad4005..4d64d009af59 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/KafkaOffsetPostProcessor.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/KafkaOffsetPostProcessor.java
@@ -18,14 +18,15 @@
 
 package org.apache.hudi.utilities.schema;
 
-import org.apache.hudi.avro.AvroSchemaUtils;
-import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.common.config.ConfigProperty;
 import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaField;
+import org.apache.hudi.common.schema.HoodieSchemaType;
+import org.apache.hudi.common.schema.HoodieSchemaUtils;
 import org.apache.hudi.internal.schema.HoodieSchemaException;
 import org.apache.hudi.utilities.config.HoodieStreamerConfig;
 
-import org.apache.avro.JsonProperties;
 import org.apache.avro.Schema;
 import org.apache.spark.api.java.JavaSparkContext;
 
@@ -64,30 +65,37 @@ public class KafkaOffsetPostProcessor extends SchemaPostProcessor {
   @Override
   @Deprecated
   public Schema processSchema(Schema schema) {
+    return processSchema(HoodieSchema.fromAvroSchema(schema)).toAvroSchema();
+  }
+
+  @Override
+  public HoodieSchema processSchema(HoodieSchema schema) {
     // this method adds kafka offset fields namely source offset, partition, timestamp and kafka message key to the schema of the batch.
-    List<Schema.Field> fieldList = schema.getFields();
-    Set<String> fieldNames = fieldList.stream().map(Schema.Field::name).collect(Collectors.toSet());
+    List<HoodieSchemaField> fieldList = schema.getFields();
+    Set<String> fieldNames = fieldList.stream().map(HoodieSchemaField::name).collect(Collectors.toSet());
     // if the source schema already contains the kafka offset fields, then return the schema as is.
     if (fieldNames.containsAll(Arrays.asList(KAFKA_SOURCE_OFFSET_COLUMN, KAFKA_SOURCE_PARTITION_COLUMN, KAFKA_SOURCE_TIMESTAMP_COLUMN, KAFKA_SOURCE_KEY_COLUMN))) {
       return schema;
     }
     try {
-      List<Schema.Field> newFieldList = fieldList.stream()
-          .map(HoodieAvroUtils::createNewSchemaField).collect(Collectors.toList());
+      List<HoodieSchemaField> newFieldList = fieldList.stream()
+          .map(HoodieSchemaUtils::createNewSchemaField).collect(Collectors.toList());
       // handle case where source schema provider may have already set 1 or more of these fields
       if (!fieldNames.contains(KAFKA_SOURCE_OFFSET_COLUMN)) {
-        newFieldList.add(new Schema.Field(KAFKA_SOURCE_OFFSET_COLUMN, Schema.create(Schema.Type.LONG), "offset column", 0));
+        newFieldList.add(HoodieSchemaField.of(KAFKA_SOURCE_OFFSET_COLUMN, HoodieSchema.create(HoodieSchemaType.LONG), "offset column", 0));
       }
       if (!fieldNames.contains(KAFKA_SOURCE_PARTITION_COLUMN)) {
-        newFieldList.add(new Schema.Field(KAFKA_SOURCE_PARTITION_COLUMN, Schema.create(Schema.Type.INT), "partition column", 0));
+        newFieldList.add(HoodieSchemaField.of(KAFKA_SOURCE_PARTITION_COLUMN, HoodieSchema.create(HoodieSchemaType.INT), "partition column", 0));
       }
       if (!fieldNames.contains(KAFKA_SOURCE_TIMESTAMP_COLUMN)) {
-        newFieldList.add(new Schema.Field(KAFKA_SOURCE_TIMESTAMP_COLUMN, Schema.create(Schema.Type.LONG), "timestamp column", 0));
+        newFieldList.add(HoodieSchemaField.of(KAFKA_SOURCE_TIMESTAMP_COLUMN, HoodieSchema.create(HoodieSchemaType.LONG), "timestamp column", 0));
       }
       if (!fieldNames.contains(KAFKA_SOURCE_KEY_COLUMN)) {
-        newFieldList.add(new Schema.Field(KAFKA_SOURCE_KEY_COLUMN, AvroSchemaUtils.createNullableSchema(Schema.Type.STRING), "kafka key column", JsonProperties.NULL_VALUE));
+        newFieldList.add(
+            HoodieSchemaField.of(KAFKA_SOURCE_KEY_COLUMN,
+                HoodieSchemaUtils.createNullableSchema(HoodieSchema.create(HoodieSchemaType.STRING)), "kafka key column", HoodieSchema.NULL_VALUE));
       }
-      return Schema.createRecord(schema.getName() + "_processed", schema.getDoc(), schema.getNamespace(), false, newFieldList);
+      return HoodieSchema.createRecord(schema.getName() + "_processed", schema.getDoc().orElse(null), schema.getNamespace().orElse(null), false, newFieldList);
     } catch (Exception e) {
       throw new HoodieSchemaException("Kafka offset post processor failed with schema: " + schema, e);
     }
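
    Each post processor touched by this patch keeps the Avro-typed processSchema(Schema) overload for source compatibility, deprecates it, and routes it through the new HoodieSchema overload. A sketch of that delegation shape, assuming the conversion methods shown in this diff (the abstract class name is illustrative):

    import org.apache.avro.Schema;
    import org.apache.hudi.common.schema.HoodieSchema;

    abstract class DelegatingPostProcessorSketch {
      // Legacy entry point: wrap, delegate, unwrap. All real logic lives
      // in the HoodieSchema overload below.
      @Deprecated
      public Schema processSchema(Schema schema) {
        return processSchema(HoodieSchema.fromAvroSchema(schema)).toAvroSchema();
      }

      public abstract HoodieSchema processSchema(HoodieSchema schema);
    }
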
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaProviderWithPostProcessor.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaProviderWithPostProcessor.java
index aca1677f53b0..09ec411b24d4 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaProviderWithPostProcessor.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaProviderWithPostProcessor.java
@@ -39,6 +39,7 @@ public class SchemaProviderWithPostProcessor extends SchemaProvider {
   }
 
   @Override
+  @Deprecated
   public Schema getSourceSchema() {
     HoodieSchema sourceSchema = schemaProvider.getSourceHoodieSchema();
     return schemaPostProcessor.map(processor -> processor.processSchema(sourceSchema).toAvroSchema())
@@ -46,6 +47,7 @@ public class SchemaProviderWithPostProcessor extends SchemaProvider {
   }
 
   @Override
+  @Deprecated
   public Schema getTargetSchema() {
     HoodieSchema targetSchema = schemaProvider.getTargetHoodieSchema();
     return schemaPostProcessor.map(processor -> processor.processSchema(targetSchema).toAvroSchema())
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/postprocessor/DeleteSupportSchemaPostProcessor.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/postprocessor/DeleteSupportSchemaPostProcessor.java
index a479e67af5cf..c8a8d9624240 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/postprocessor/DeleteSupportSchemaPostProcessor.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/postprocessor/DeleteSupportSchemaPostProcessor.java
@@ -20,6 +20,10 @@ package org.apache.hudi.utilities.schema.postprocessor;
 
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaField;
+import org.apache.hudi.common.schema.HoodieSchemaType;
+import org.apache.hudi.common.schema.HoodieSchemaUtils;
 import org.apache.hudi.utilities.schema.SchemaPostProcessor;
 
 import org.apache.avro.Schema;
@@ -30,8 +34,6 @@ import org.slf4j.LoggerFactory;
 import java.util.ArrayList;
 import java.util.List;
 
-import static org.apache.hudi.avro.HoodieAvroUtils.createNewSchemaField;
-
 /**
  * An implementation of {@link SchemaPostProcessor} which will add a column named "_hoodie_is_deleted" to the end of
  * a given schema.
@@ -45,25 +47,28 @@ public class DeleteSupportSchemaPostProcessor extends SchemaPostProcessor {
   }
 
   @Override
+  @Deprecated
   public Schema processSchema(Schema schema) {
+    return processSchema(HoodieSchema.fromAvroSchema(schema)).toAvroSchema();
+  }
 
+  @Override
+  public HoodieSchema processSchema(HoodieSchema schema) {
     if (schema.getField(HoodieRecord.HOODIE_IS_DELETED_FIELD) != null) {
       LOG.warn("column {} already exists!", 
HoodieRecord.HOODIE_IS_DELETED_FIELD);
       return schema;
     }
 
-    List<Schema.Field> sourceFields = schema.getFields();
-    List<Schema.Field> targetFields = new ArrayList<>(sourceFields.size() + 1);
+    List<HoodieSchemaField> sourceFields = schema.getFields();
+    List<HoodieSchemaField> targetFields = new ArrayList<>(sourceFields.size() + 1);
     // copy existing columns
-    for (Schema.Field sourceField : sourceFields) {
-      targetFields.add(createNewSchemaField(sourceField));
+    for (HoodieSchemaField sourceField : sourceFields) {
+      targetFields.add(HoodieSchemaUtils.createNewSchemaField(sourceField));
     }
     // add _hoodie_is_deleted column
-    targetFields.add(new Schema.Field(HoodieRecord.HOODIE_IS_DELETED_FIELD, Schema.create(Schema.Type.BOOLEAN), null, false));
-
-    return Schema.createRecord(schema.getName(), schema.getDoc(), schema.getNamespace(), false, targetFields);
+    targetFields.add(HoodieSchemaField.of(HoodieRecord.HOODIE_IS_DELETED_FIELD, HoodieSchema.create(HoodieSchemaType.BOOLEAN), null, false));
+    return HoodieSchema.createRecord(schema.getName(), schema.getDoc().orElse(null), schema.getNamespace().orElse(null), false, targetFields);
   }
-
 }
 
 
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/postprocessor/DropColumnSchemaPostProcessor.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/postprocessor/DropColumnSchemaPostProcessor.java
index 020825e2d6f7..703dea14544f 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/postprocessor/DropColumnSchemaPostProcessor.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/postprocessor/DropColumnSchemaPostProcessor.java
@@ -19,6 +19,9 @@
 package org.apache.hudi.utilities.schema.postprocessor;
 
 import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaField;
+import org.apache.hudi.common.schema.HoodieSchemaUtils;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.utilities.config.SchemaProviderPostProcessorConfig;
 import org.apache.hudi.utilities.exception.HoodieSchemaPostProcessException;
@@ -36,7 +39,6 @@ import java.util.Locale;
 import java.util.Set;
 import java.util.stream.Collectors;
 
-import static org.apache.hudi.avro.HoodieAvroUtils.createNewSchemaField;
 import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys;
 
 /**
@@ -57,14 +59,20 @@ public class DropColumnSchemaPostProcessor extends SchemaPostProcessor {
 
   @Deprecated
   public static class Config {
+
     @Deprecated
     public static final String DELETE_COLUMN_POST_PROCESSOR_COLUMN_PROP =
         SchemaProviderPostProcessorConfig.DELETE_COLUMN_POST_PROCESSOR_COLUMN.key();
   }
 
   @Override
+  @Deprecated
   public Schema processSchema(Schema schema) {
+    return processSchema(HoodieSchema.fromAvroSchema(schema)).toAvroSchema();
+  }
 
+  @Override
+  public HoodieSchema processSchema(HoodieSchema schema) {
     String columnToDeleteStr = getStringWithAltKeys(
         this.config, SchemaProviderPostProcessorConfig.DELETE_COLUMN_POST_PROCESSOR_COLUMN);
 
@@ -78,12 +86,12 @@ public class DropColumnSchemaPostProcessor extends SchemaPostProcessor {
         .map(filed -> filed.toLowerCase(Locale.ROOT))
         .collect(Collectors.toSet());
 
-    List<Schema.Field> sourceFields = schema.getFields();
-    List<Schema.Field> targetFields = new LinkedList<>();
+    List<HoodieSchemaField> sourceFields = schema.getFields();
+    List<HoodieSchemaField> targetFields = new LinkedList<>();
 
-    for (Schema.Field sourceField : sourceFields) {
+    for (HoodieSchemaField sourceField : sourceFields) {
       if (!columnsToDelete.contains(sourceField.name().toLowerCase(Locale.ROOT))) {
-        targetFields.add(createNewSchemaField(sourceField));
+        targetFields.add(HoodieSchemaUtils.createNewSchemaField(sourceField));
       }
     }
 
@@ -91,7 +99,6 @@ public class DropColumnSchemaPostProcessor extends SchemaPostProcessor {
       throw new HoodieSchemaPostProcessException("Target schema is empty, you 
can not remove all columns!");
     }
 
-    return Schema.createRecord(schema.getName(), schema.getDoc(), schema.getNamespace(), false, targetFields);
+    return HoodieSchema.createRecord(schema.getName(), schema.getDoc().orElse(null), schema.getNamespace().orElse(null), false, targetFields);
   }
-
 }
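
    The copy-and-filter idiom above mirrors the old Avro code: each retained field is re-created via HoodieSchemaUtils.createNewSchemaField before joining the new record, presumably for the same reason Avro Schema.Field instances cannot be reattached once bound to a record schema. A compact sketch under that assumption (class name illustrative):

    import java.util.List;
    import java.util.Locale;
    import java.util.Set;
    import java.util.stream.Collectors;

    import org.apache.hudi.common.schema.HoodieSchema;
    import org.apache.hudi.common.schema.HoodieSchemaField;
    import org.apache.hudi.common.schema.HoodieSchemaUtils;

    class DropColumnsSketch {
      // Rebuild the record schema without the named columns (case-insensitive),
      // cloning every surviving field for the new record.
      static HoodieSchema drop(HoodieSchema schema, Set<String> columnsToDelete) {
        List<HoodieSchemaField> kept = schema.getFields().stream()
            .filter(f -> !columnsToDelete.contains(f.name().toLowerCase(Locale.ROOT)))
            .map(HoodieSchemaUtils::createNewSchemaField)
            .collect(Collectors.toList());
        return HoodieSchema.createRecord(schema.getName(), schema.getDoc().orElse(null),
            schema.getNamespace().orElse(null), false, kept);
      }
    }
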
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/postprocessor/add/AddPrimitiveColumnSchemaPostProcessor.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/postprocessor/add/AddPrimitiveColumnSchemaPostProcessor.java
index 2be4942ab80b..e83cf9d9a52d 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/postprocessor/add/AddPrimitiveColumnSchemaPostProcessor.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/postprocessor/add/AddPrimitiveColumnSchemaPostProcessor.java
@@ -19,6 +19,10 @@
 package org.apache.hudi.utilities.schema.postprocessor.add;
 
 import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaField;
+import org.apache.hudi.common.schema.HoodieSchemaType;
+import org.apache.hudi.common.schema.HoodieSchemaUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.ValidationUtils;
@@ -32,7 +36,6 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Locale;
 
-import static org.apache.hudi.avro.HoodieAvroUtils.createNewSchemaField;
 import static org.apache.hudi.common.util.ConfigUtils.getBooleanWithAltKeys;
 import static org.apache.hudi.common.util.ConfigUtils.getRawValueWithAltKeys;
 import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys;
@@ -57,29 +60,34 @@ public class AddPrimitiveColumnSchemaPostProcessor extends SchemaPostProcessor {
   }
 
   @Override
+  @Deprecated
   public Schema processSchema(Schema schema) {
+    return processSchema(HoodieSchema.fromAvroSchema(schema)).toAvroSchema();
+  }
+
+  @Override
+  public HoodieSchema processSchema(HoodieSchema schema) {
     String newColumnName = getStringWithAltKeys(this.config, SCHEMA_POST_PROCESSOR_ADD_COLUMN_NAME_PROP);
 
     if (schema.getField(newColumnName) != null) {
       throw new HoodieSchemaPostProcessException(String.format("Column %s already exist!", newColumnName));
     }
 
-    List<Schema.Field> sourceFields = schema.getFields();
-    List<Schema.Field> targetFields = new ArrayList<>(sourceFields.size() + 1);
+    List<HoodieSchemaField> sourceFields = schema.getFields();
+    List<HoodieSchemaField> targetFields = new ArrayList<>(sourceFields.size() + 1);
 
 
-    for (Schema.Field sourceField : sourceFields) {
-      targetFields.add(createNewSchemaField(sourceField));
+    for (HoodieSchemaField sourceField : sourceFields) {
+      targetFields.add(HoodieSchemaUtils.createNewSchemaField(sourceField));
     }
 
     // add new column to the end
     targetFields.add(buildNewColumn());
 
-    return Schema.createRecord(schema.getName(), schema.getDoc(), schema.getNamespace(), false, targetFields);
+    return HoodieSchema.createRecord(schema.getName(), schema.getDoc().orElse(null), schema.getNamespace().orElse(null), false, targetFields);
   }
 
-  private Schema.Field buildNewColumn() {
-
+  private HoodieSchemaField buildNewColumn() {
     String columnName = getStringWithAltKeys(this.config, SCHEMA_POST_PROCESSOR_ADD_COLUMN_NAME_PROP);
     String type = getStringWithAltKeys(this.config, SCHEMA_POST_PROCESSOR_ADD_COLUMN_TYPE_PROP).toUpperCase(Locale.ROOT);
     String doc = getStringWithAltKeys(this.config, SCHEMA_POST_PROCESSOR_ADD_COLUMN_DOC_PROP, true);
@@ -91,17 +99,13 @@ public class AddPrimitiveColumnSchemaPostProcessor extends SchemaPostProcessor {
     ValidationUtils.checkArgument(!StringUtils.isNullOrEmpty(type));
     ValidationUtils.checkArgument(!Schema.Type.NULL.getName().equals(type));
 
-    Schema newSchema = createSchema(type, nullable);
-
-    return new Schema.Field(columnName, newSchema, doc, defaultValue.isPresent() ? defaultValue.get() : null);
+    HoodieSchema newSchema = createSchema(type, nullable);
+    return HoodieSchemaField.of(columnName, newSchema, doc, defaultValue.isPresent() ? defaultValue.get() : null);
   }
 
-  private Schema createSchema(String type, boolean nullable) {
-    Schema schema = Schema.create(Schema.Type.valueOf(type));
-    if (nullable) {
-      schema = Schema.createUnion(Schema.create(Schema.Type.NULL), schema);
-    }
-    return schema;
+  private HoodieSchema createSchema(String type, boolean nullable) {
+    HoodieSchema schema = HoodieSchema.create(HoodieSchemaType.valueOf(type));
+    return nullable ? HoodieSchema.createNullable(schema) : schema;
   }
 
 }
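
    The final hunk replaces a hand-built union(NULL, type) with HoodieSchema.createNullable. A minimal sketch of the new column builder, assuming that factory method as it appears in the diff (class and parameter names below are illustrative):

    import org.apache.hudi.common.schema.HoodieSchema;
    import org.apache.hudi.common.schema.HoodieSchemaField;
    import org.apache.hudi.common.schema.HoodieSchemaType;

    class NullableColumnSketch {
      // "type" is a primitive type name such as "STRING" or "LONG".
      static HoodieSchemaField newColumn(String name, String type, String doc, boolean nullable) {
        HoodieSchema fieldSchema = HoodieSchema.create(HoodieSchemaType.valueOf(type));
        // createNullable wraps the schema in a null union, replacing the old
        // manual Schema.createUnion(Schema.create(Schema.Type.NULL), schema).
        return HoodieSchemaField.of(name, nullable ? HoodieSchema.createNullable(fieldSchema) : fieldSchema, doc, null);
      }
    }
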

