This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 399af74b369f feat(schema): Migrate StreamSync code path and its dependencies to use HoodieSchema (#17600)
399af74b369f is described below

commit 399af74b369f17235115a1ad3632f15718b3e830
Author: Tim Brown <[email protected]>
AuthorDate: Mon Dec 22 16:25:15 2025 -0500

    feat(schema): Migrate StreamSync code path and its dependencies to use HoodieSchema (#17600)
---
 .../scala/org/apache/hudi/HoodieSparkUtils.scala   |  58 +++++------
 .../java/org/apache/hudi/avro/HoodieAvroUtils.java |  69 -------------
 .../common/schema/HoodieSchemaCompatibility.java   |   6 ++
 .../hudi/common/schema/HoodieSchemaField.java      |   5 +
 .../hudi/common/schema/HoodieSchemaUtils.java      |  43 ++++++++
 .../hudi/common/table/TableSchemaResolver.java     |  15 +++
 .../org/apache/hudi/avro/TestHoodieAvroUtils.java  |  55 ----------
 .../hudi/common/schema/TestHoodieSchemaUtils.java  |  49 +++++++++
 .../common/testutils/HoodieTestDataGenerator.java  |   2 +
 .../testsuite/dag/nodes/SparkDeleteNode.scala      |   4 +-
 .../testsuite/dag/nodes/SparkInsertNode.scala      |   3 +-
 .../org/apache/hudi/HoodieCreateRecordUtils.scala  |   3 +-
 .../org/apache/hudi/TestHoodieSparkUtils.scala     |  25 ++---
 .../execution/benchmark/AvroSerDerBenchmark.scala  |  14 +--
 .../benchmark/CreateHandleBenchmark.scala          |  15 ++-
 .../org/apache/hudi/utilities/UtilHelpers.java     |  18 ++--
 .../apache/hudi/utilities/schema/SchemaSet.java    |  11 +-
 .../helpers/CloudObjectsSelectorCommon.java        | 114 ++++++++++-----------
 .../utilities/streamer/HoodieStreamerUtils.java    |  19 ++--
 .../utilities/streamer/SourceFormatAdapter.java    |  16 +--
 .../apache/hudi/utilities/streamer/StreamSync.java |  53 ++++------
 .../utilities/transform/ChainedTransformer.java    |  12 +--
 .../ErrorTableAwareChainedTransformer.java         |   4 +-
 ...TestHoodieDeltaStreamerSchemaEvolutionBase.java |  10 +-
 ...estHoodieDeltaStreamerSchemaEvolutionQuick.java |  35 ++++---
 .../deltastreamer/TestSourceFormatAdapter.java     |  10 +-
 .../functional/TestChainedTransformer.java         |  14 +--
 .../utilities/sources/TestGenericRddTransform.java |   6 +-
 .../utilities/testutils/SanitizationTestUtils.java |  38 +++----
 29 files changed, 346 insertions(+), 380 deletions(-)

diff --git 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
index 97dfb1b3d5c1..153aca96a154 100644
--- 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
+++ 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
@@ -19,9 +19,11 @@
 package org.apache.hudi
 
 import org.apache.hudi.HoodieConversionUtils.toScalaOption
-import org.apache.hudi.avro.{AvroSchemaUtils, HoodieAvroUtils}
+import org.apache.hudi.avro.HoodieAvroUtils
 import org.apache.hudi.common.config.TimestampKeyGeneratorConfig
 import org.apache.hudi.common.model.HoodieRecord
+import org.apache.hudi.common.schema.HoodieSchema
+import org.apache.hudi.common.util.{Option => HOption}
 import org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator
 import org.apache.hudi.keygen.constant.KeyGeneratorType
 import org.apache.hudi.storage.StoragePath
@@ -38,8 +40,8 @@ import 
org.apache.spark.sql.catalyst.util.DateTimeUtils.getTimeZone
 import org.apache.spark.sql.execution.SQLConfInjectingRDD
 import org.apache.spark.sql.execution.datasources.SparkParsePartitionUtil
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.{DateType, StringType, StructField, 
StructType, TimestampType}
-import org.apache.spark.sql.{DataFrame, HoodieUnsafeUtils}
+import org.apache.spark.sql.types.{StringType, StructField, StructType}
+import org.apache.spark.sql.DataFrame
 import org.apache.spark.unsafe.types.UTF8String
 
 import scala.collection.JavaConverters._
@@ -76,27 +78,22 @@ object HoodieSparkUtils extends SparkAdapterSupport with 
SparkVersionsSupport wi
    */
   @Deprecated
   def createRdd(df: DataFrame, structName: String, recordNamespace: String, 
reconcileToLatestSchema: Boolean,
-                latestTableSchema: org.apache.hudi.common.util.Option[Schema] 
= org.apache.hudi.common.util.Option.empty()): RDD[GenericRecord] = {
+                latestTableSchema: HOption[HoodieSchema] = HOption.empty()): 
RDD[GenericRecord] = {
     createRdd(df, structName, recordNamespace, 
toScalaOption(latestTableSchema))
   }
 
   def createRdd(df: DataFrame, structName: String, recordNamespace: String): 
RDD[GenericRecord] =
     createRdd(df, structName, recordNamespace, None)
 
-  def createRdd(df: DataFrame, structName: String, recordNamespace: String, 
readerAvroSchemaOpt: Option[Schema]): RDD[GenericRecord] = {
+  def createRdd(df: DataFrame, structName: String, recordNamespace: String, 
readerSchemaOpt: Option[HoodieSchema]): RDD[GenericRecord] = {
     val writerSchema = df.schema
-    val writerAvroSchema = 
AvroConversionUtils.convertStructTypeToAvroSchema(writerSchema, structName, 
recordNamespace)
-    val readerAvroSchema = readerAvroSchemaOpt.getOrElse(writerAvroSchema)
+    val writerHoodieSchema = 
HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(writerSchema, 
structName, recordNamespace)
+    val readerHoodieSchema = readerSchemaOpt.getOrElse(writerHoodieSchema)
     // We check whether passed in reader schema is identical to writer schema 
to avoid costly serde loop of
     // making Spark deserialize its internal representation [[InternalRow]] 
into [[Row]] for subsequent conversion
     // (and back)
-    val sameSchema = writerAvroSchema.equals(readerAvroSchema)
-    val nullable = AvroSchemaUtils.getNonNullTypeFromUnion(writerAvroSchema) 
!= writerAvroSchema
-
-    // NOTE: We have to serialize Avro schema, and then subsequently parse it 
on the executor node, since Spark
-    //       serializer is not able to digest it
-    val readerAvroSchemaStr = readerAvroSchema.toString
-    val writerAvroSchemaStr = writerAvroSchema.toString
+    val sameSchema = writerHoodieSchema.equals(readerHoodieSchema)
+    val nullable = writerHoodieSchema.isNullable
 
     // NOTE: We're accessing toRdd here directly to avoid [[InternalRow]] to 
[[Row]] conversion
     //       Additionally, we have to explicitly wrap around resulting [[RDD]] 
into the one
@@ -106,17 +103,15 @@ object HoodieSparkUtils extends SparkAdapterSupport with 
SparkVersionsSupport wi
       if (rows.isEmpty) {
         Iterator.empty
       } else {
-        val readerAvroSchema = new Schema.Parser().parse(readerAvroSchemaStr)
         val transform: GenericRecord => GenericRecord =
           if (sameSchema) identity
           else {
-            HoodieAvroUtils.rewriteRecordDeep(_, readerAvroSchema)
+            HoodieAvroUtils.rewriteRecordDeep(_, 
readerHoodieSchema.toAvroSchema)
           }
 
         // Since caller might request to get records in a different 
("evolved") schema, we will be rewriting from
         // existing Writer's schema into Reader's (avro) schema
-        val writerAvroSchema = new Schema.Parser().parse(writerAvroSchemaStr)
-        val convert = 
AvroConversionUtils.createInternalRowToAvroConverter(writerSchema, 
writerAvroSchema, nullable = nullable)
+        val convert = 
AvroConversionUtils.createInternalRowToAvroConverter(writerSchema, 
writerHoodieSchema.toAvroSchema, nullable = nullable)
 
         rows.map { ir => transform(convert(ir)) }
       }
@@ -137,9 +132,9 @@ object HoodieSparkUtils extends SparkAdapterSupport with 
SparkVersionsSupport wi
   }
 
   def safeCreateRDD(df: DataFrame, structName: String, recordNamespace: 
String, reconcileToLatestSchema: Boolean,
-                    latestTableSchema: 
org.apache.hudi.common.util.Option[Schema] = 
org.apache.hudi.common.util.Option.empty()):
+                    latestTableSchema: HOption[HoodieSchema] = 
HOption.empty()):
   Tuple2[RDD[GenericRecord], RDD[String]] = {
-    var latestTableSchemaConverted: Option[Schema] = None
+    var latestTableSchemaConverted: Option[HoodieSchema] = None
 
     if (latestTableSchema.isPresent && reconcileToLatestSchema) {
       latestTableSchemaConverted = Some(latestTableSchema.get())
@@ -151,33 +146,26 @@ object HoodieSparkUtils extends SparkAdapterSupport with 
SparkVersionsSupport wi
     safeCreateRDD(df, structName, recordNamespace, latestTableSchemaConverted);
   }
 
-  def safeCreateRDD(df: DataFrame, structName: String, recordNamespace: 
String, readerAvroSchemaOpt: Option[Schema]):
+  def safeCreateRDD(df: DataFrame, structName: String, recordNamespace: 
String, readerSchemaOpt: Option[HoodieSchema]):
   Tuple2[RDD[GenericRecord], RDD[String]] = {
     val writerSchema = df.schema
-    val writerAvroSchema = 
AvroConversionUtils.convertStructTypeToAvroSchema(writerSchema, structName, 
recordNamespace)
-    val readerAvroSchema = readerAvroSchemaOpt.getOrElse(writerAvroSchema)
+    val writerHoodieSchema = 
HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(writerSchema, 
structName, recordNamespace)
+    val readerHoodieSchema = readerSchemaOpt.getOrElse(writerHoodieSchema)
     // We check whether passed in reader schema is identical to writer schema 
to avoid costly serde loop of
     // making Spark deserialize its internal representation [[InternalRow]] 
into [[Row]] for subsequent conversion
     // (and back)
-    val sameSchema = writerAvroSchema.equals(readerAvroSchema)
-    val nullable = AvroSchemaUtils.getNonNullTypeFromUnion(writerAvroSchema) 
!= writerAvroSchema
+    val sameSchema = writerHoodieSchema.equals(readerHoodieSchema)
+    val nullable = writerHoodieSchema.isNullable
 
-    // NOTE: We have to serialize Avro schema, and then subsequently parse it 
on the executor node, since Spark
-    //       serializer is not able to digest it
-    val writerAvroSchemaStr = writerAvroSchema.toString
-    val readerAvroSchemaStr = readerAvroSchema.toString
     // NOTE: We're accessing toRdd here directly to avoid [[InternalRow]] to 
[[Row]] conversion
-
     if (!sameSchema) {
       val rdds: RDD[Either[GenericRecord, InternalRow]] = 
df.queryExecution.toRdd.mapPartitions { rows =>
         if (rows.isEmpty) {
           Iterator.empty
         } else {
-          val writerAvroSchema = new Schema.Parser().parse(writerAvroSchemaStr)
-          val readerAvroSchema = new Schema.Parser().parse(readerAvroSchemaStr)
-          val convert = 
AvroConversionUtils.createInternalRowToAvroConverter(writerSchema, 
writerAvroSchema, nullable = nullable)
+          val convert = 
AvroConversionUtils.createInternalRowToAvroConverter(writerSchema, 
writerHoodieSchema.toAvroSchema, nullable = nullable)
           val transform: InternalRow => Either[GenericRecord, InternalRow] = 
internalRow => try {
-            Left(HoodieAvroUtils.rewriteRecordDeep(convert(internalRow), 
readerAvroSchema, true))
+            Left(HoodieAvroUtils.rewriteRecordDeep(convert(internalRow), 
readerHoodieSchema.toAvroSchema, true))
           } catch {
             case _: Throwable =>
               Right(internalRow)
@@ -197,7 +185,7 @@ object HoodieSparkUtils extends SparkAdapterSupport with 
SparkVersionsSupport wi
         if (rows.isEmpty) {
           Iterator.empty
         } else {
-          val convert = 
AvroConversionUtils.createInternalRowToAvroConverter(writerSchema, 
writerAvroSchema, nullable = nullable)
+          val convert = 
AvroConversionUtils.createInternalRowToAvroConverter(writerSchema, 
writerHoodieSchema.toAvroSchema, nullable = nullable)
           rows.map(convert)
         }
       }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java 
b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
index c4ea3709c115..46b98f9ca795 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
@@ -90,7 +90,6 @@ import java.util.Properties;
 import java.util.Set;
 import java.util.TimeZone;
 import java.util.TreeMap;
-import java.util.function.Function;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
@@ -1370,74 +1369,6 @@ public class HoodieAvroUtils {
         scale, new MathContext(precision, RoundingMode.HALF_UP));
   }
 
-  public static boolean hasDecimalField(Schema schema) {
-    return hasDecimalWithCondition(schema, unused -> true);
-  }
-
-  /**
-   * Checks whether the provided schema contains a decimal with a precision 
less than or equal to 18,
-   * which allows the decimal to be stored as int/long instead of a fixed size 
byte array in
-   * <a href="https://github.com/apache/parquet-format/blob/master/LogicalTypes.md">parquet logical types</a>
-   * @param schema the input schema to search
-   * @return true if the schema contains a small precision decimal field and 
false otherwise
-   */
-  public static boolean hasSmallPrecisionDecimalField(Schema schema) {
-    return hasDecimalWithCondition(schema, 
HoodieAvroUtils::isSmallPrecisionDecimalField);
-  }
-
-  private static boolean hasDecimalWithCondition(Schema schema, 
Function<Decimal, Boolean> condition) {
-    switch (schema.getType()) {
-      case RECORD:
-        for (Field field : schema.getFields()) {
-          if (hasDecimalWithCondition(field.schema(), condition)) {
-            return true;
-          }
-        }
-        return false;
-      case ARRAY:
-        return hasDecimalWithCondition(schema.getElementType(), condition);
-      case MAP:
-        return hasDecimalWithCondition(schema.getValueType(), condition);
-      case UNION:
-        return hasDecimalWithCondition(getActualSchemaFromUnion(schema, null), 
condition);
-      default:
-        if (schema.getLogicalType() instanceof LogicalTypes.Decimal) {
-          Decimal decimal = (Decimal) schema.getLogicalType();
-          return condition.apply(decimal);
-        } else {
-          return false;
-        }
-    }
-  }
-
-  private static boolean isSmallPrecisionDecimalField(Decimal decimal) {
-    return decimal.getPrecision() <= 18;
-  }
-
-  /**
-   * Checks whether the provided schema contains a list or map field.
-   * @param schema input
-   * @return true if a list or map is present, false otherwise
-   */
-  public static boolean hasListOrMapField(Schema schema) {
-    switch (schema.getType()) {
-      case RECORD:
-        for (Field field : schema.getFields()) {
-          if (hasListOrMapField(field.schema())) {
-            return true;
-          }
-        }
-        return false;
-      case ARRAY:
-      case MAP:
-        return true;
-      case UNION:
-        return hasListOrMapField(getActualSchemaFromUnion(schema, null));
-      default:
-        return false;
-    }
-  }
-
   /**
    * Avro does not support type promotion from numbers to string. This 
function returns true if
    * it will be necessary to rewrite the record to support this promotion.
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaCompatibility.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaCompatibility.java
index ab2f04b2ae9e..59e830cd555f 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaCompatibility.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaCompatibility.java
@@ -24,6 +24,8 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.internal.schema.HoodieSchemaException;
 
+import org.apache.avro.SchemaCompatibility;
+
 import java.util.Collections;
 import java.util.Set;
 
@@ -46,6 +48,10 @@ public final class HoodieSchemaCompatibility {
   private HoodieSchemaCompatibility() {
   }
 
+  public static boolean areSchemasCompatible(HoodieSchema tableSchema, 
HoodieSchema writerSchema) {
+    return 
SchemaCompatibility.checkReaderWriterCompatibility(tableSchema.toAvroSchema(), 
writerSchema.toAvroSchema()).getType() == 
SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE;
+  }
+
   /**
    * Checks if writer schema is compatible with table schema for write 
operations.
    * This is equivalent to AvroSchemaUtils.checkSchemaCompatible() but 
operates on HoodieSchemas.
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaField.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaField.java
index aa5782ff1d57..b2671cbc57a0 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaField.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaField.java
@@ -27,6 +27,7 @@ import org.apache.avro.Schema;
 import java.io.Serializable;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 
 /**
  * Wrapper class for Avro Schema.Field that provides Hudi-specific field 
functionality
@@ -237,6 +238,10 @@ public class HoodieSchemaField implements Serializable {
     avroField.addProp(key, value);
   }
 
+  public Set<String> aliases() {
+    return avroField.aliases();
+  }
+
   /**
    * Returns the underlying Avro field for compatibility purposes.
    *
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaUtils.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaUtils.java
index 0c7034434260..cd0be22fd52e 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaUtils.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaUtils.java
@@ -36,6 +36,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 
 /**
@@ -681,4 +682,46 @@ public final class HoodieSchemaUtils {
     // Delegate to AvroSchemaUtils
     return AvroSchemaUtils.getAvroRecordQualifiedName(tableName);
   }
+
+  public static boolean hasDecimalField(HoodieSchema schema) {
+    return hasDecimalWithCondition(schema, unused -> true);
+  }
+
+  /**
+   * Checks whether the provided schema contains a decimal with a precision 
less than or equal to 18,
+   * which allows the decimal to be stored as int/long instead of a fixed size 
byte array in
+   * <a href="https://github.com/apache/parquet-format/blob/master/LogicalTypes.md">parquet logical types</a>
+   * @param schema the input schema to search
+   * @return true if the schema contains a small precision decimal field and 
false otherwise
+   */
+  public static boolean hasSmallPrecisionDecimalField(HoodieSchema schema) {
+    return hasDecimalWithCondition(schema, 
HoodieSchemaUtils::isSmallPrecisionDecimalField);
+  }
+
+  private static boolean hasDecimalWithCondition(HoodieSchema schema, 
Function<HoodieSchema.Decimal, Boolean> condition) {
+    switch (schema.getType()) {
+      case RECORD:
+        for (HoodieSchemaField field : schema.getFields()) {
+          if (hasDecimalWithCondition(field.schema(), condition)) {
+            return true;
+          }
+        }
+        return false;
+      case ARRAY:
+        return hasDecimalWithCondition(schema.getElementType(), condition);
+      case MAP:
+        return hasDecimalWithCondition(schema.getValueType(), condition);
+      case UNION:
+        return hasDecimalWithCondition(schema.getNonNullType(), condition);
+      case DECIMAL:
+        HoodieSchema.Decimal decimal = (HoodieSchema.Decimal) schema;
+        return condition.apply(decimal);
+      default:
+        return false;
+    }
+  }
+
+  private static boolean isSmallPrecisionDecimalField(HoodieSchema.Decimal 
decimal) {
+    return decimal.getPrecision() <= 18;
+  }
 }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
index 893dac2ba464..e879c8f8d222 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
@@ -350,6 +350,21 @@ public class TableSchemaResolver {
     return Option.empty();
   }
 
+  /**
+   * Returns table's latest {@link HoodieSchema} iff table is non-empty (ie 
there's at least
+   * a single commit)
+   *
+   * This method differs from {@link #getTableAvroSchema(boolean)} in that it 
won't fallback
+   * to use table's schema used at creation
+   */
+  public Option<HoodieSchema> getTableSchemaFromLatestCommit(boolean 
includeMetadataFields) {
+    if (metaClient.isTimelineNonEmpty()) {
+      return getTableAvroSchemaInternal(includeMetadataFields, 
Option.empty()).map(HoodieSchema::fromAvroSchema);
+    }
+
+    return Option.empty();
+  }
+
   /**
    * Read schema from a data file from the last compaction commit done.
    *
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java 
b/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java
index 245900a11f5f..ab57ebb4bea5 100644
--- a/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java
@@ -63,7 +63,6 @@ import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.RewriteAvroPayload;
 import org.apache.hudi.common.schema.HoodieSchema;
-import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.testutils.SchemaTestUtil;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieException;
@@ -122,7 +121,6 @@ import static 
org.apache.hudi.avro.HoodieAvroWrapperUtils.wrapValueIntoAvro;
 import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -941,32 +939,6 @@ public class TestHoodieAvroUtils {
     assertEquals(decfield, after);
   }
 
-  @Test
-  void testHasDecimalField() {
-    assertTrue(HoodieAvroUtils.hasDecimalField(new 
Schema.Parser().parse(SCHEMA_WITH_DECIMAL_FIELD)));
-    assertFalse(HoodieAvroUtils.hasDecimalField(new 
Schema.Parser().parse(EVOLVED_SCHEMA)));
-    assertFalse(HoodieAvroUtils.hasDecimalField(new 
Schema.Parser().parse(SCHEMA_WITH_NON_NULLABLE_FIELD)));
-    
assertTrue(HoodieAvroUtils.hasDecimalField(HoodieTestDataGenerator.AVRO_SCHEMA));
-    
assertTrue(HoodieAvroUtils.hasDecimalField(HoodieTestDataGenerator.AVRO_TRIP_ENCODED_DECIMAL_SCHEMA));
-    Schema recordWithMapAndArray = 
Schema.createRecord("recordWithMapAndArray", null, null, false,
-        Arrays.asList(new Schema.Field("mapfield", 
Schema.createMap(Schema.create(Schema.Type.INT)), null, null),
-            new Schema.Field("arrayfield", 
Schema.createArray(Schema.create(Schema.Type.INT)), null, null)
-        ));
-    assertFalse(HoodieAvroUtils.hasDecimalField(recordWithMapAndArray));
-    Schema recordWithDecMapAndArray = 
Schema.createRecord("recordWithDecMapAndArray", null, null, false,
-        Arrays.asList(new Schema.Field("mapfield",
-                
Schema.createMap(LogicalTypes.decimal(10,6).addToSchema(Schema.create(Schema.Type.BYTES))),
 null, null),
-            new Schema.Field("arrayfield", 
Schema.createArray(Schema.create(Schema.Type.INT)), null, null)
-        ));
-    assertTrue(HoodieAvroUtils.hasDecimalField(recordWithDecMapAndArray));
-    Schema recordWithMapAndDecArray = 
Schema.createRecord("recordWithMapAndDecArray", null, null, false,
-        Arrays.asList(new Schema.Field("mapfield",
-            Schema.createMap(Schema.create(Schema.Type.INT)), null, null), new 
Schema.Field("arrayfield",
-            
Schema.createArray(LogicalTypes.decimal(10,6).addToSchema(Schema.create(Schema.Type.BYTES))),
 null, null)
-        ));
-    assertTrue(HoodieAvroUtils.hasDecimalField(recordWithMapAndDecArray));
-  }
-
   @Test
   void testCreateFullName() {
     String result = HoodieAvroUtils.createFullName(new 
ArrayDeque<>(Arrays.asList("a", "b", "c")));
@@ -1005,33 +977,6 @@ public class TestHoodieAvroUtils {
     }
   }
 
-  @Test
-  void testHasListOrMapField() {
-    Schema nestedList = Schema.createRecord("nestedList", null, null, false, 
Arrays.asList(
-        new Schema.Field("intField", Schema.create(Schema.Type.INT), null, 
null),
-        new Schema.Field("nested", Schema.createRecord("nestedSchema", null, 
null, false, Collections.singletonList(
-            new Schema.Field("listField", 
Schema.createArray(Schema.create(Schema.Type.INT)), null, null)
-        )), null, null)
-    ));
-    Schema nestedMap = Schema.createRecord("nestedMap", null, null, false, 
Arrays.asList(
-        new Schema.Field("intField", Schema.create(Schema.Type.INT), null, 
null),
-        new Schema.Field("nested", 
Schema.createUnion(Schema.create(Schema.Type.NULL),
-            Schema.createRecord("nestedSchema", null, null, false,
-                Collections.singletonList(new Schema.Field("mapField", 
Schema.createMap(Schema.create(Schema.Type.INT)), null, null)
-                ))), null, null)
-    ));
-    assertTrue(HoodieAvroUtils.hasListOrMapField(nestedList));
-    assertTrue(HoodieAvroUtils.hasListOrMapField(nestedMap));
-    assertFalse(HoodieAvroUtils.hasListOrMapField(new 
Schema.Parser().parse(EXAMPLE_SCHEMA)));
-  }
-
-  @Test
-  void testHasSmallPrecisionDecimalField() {
-    assertTrue(HoodieAvroUtils.hasSmallPrecisionDecimalField(new 
Schema.Parser().parse(SCHEMA_WITH_DECIMAL_FIELD)));
-    assertFalse(HoodieAvroUtils.hasSmallPrecisionDecimalField(new 
Schema.Parser().parse(SCHEMA_WITH_AVRO_TYPES_STR)));
-    assertFalse(HoodieAvroUtils.hasSmallPrecisionDecimalField(new 
Schema.Parser().parse(EXAMPLE_SCHEMA)));
-  }
-
   public static Stream<Arguments> getSchemaForFieldParams() {
     Object[][] data =
         new Object[][] {
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/schema/TestHoodieSchemaUtils.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/schema/TestHoodieSchemaUtils.java
index b9c91249a8f2..e29ee000f766 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/schema/TestHoodieSchemaUtils.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/schema/TestHoodieSchemaUtils.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.common.schema;
 
 import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieException;
@@ -99,6 +100,15 @@ public class TestHoodieSchemaUtils {
       + 
"{\"name\":\"localTimestampMicrosField\",\"type\":\"long\",\"logicalType\":\"local-timestamp-micros\"}"
       + "]}";
 
+  private static final String SCHEMA_WITH_NON_NULLABLE_FIELD =
+      "{\"type\": \"record\",\"name\": \"testrec3\",\"fields\": [ "
+          + "{\"name\": \"timestamp\",\"type\": \"double\"},{\"name\": 
\"_row_key\", \"type\": \"string\"},"
+          + "{\"name\": \"non_pii_col\", \"type\": \"string\"},"
+          + "{\"name\": \"pii_col\", \"type\": \"string\", 
\"column_category\": \"user_profile\"},"
+          + "{\"name\": \"nullable_field\",\"type\": [\"null\" 
,\"string\"],\"default\": null},"
+          + "{\"name\": \"non_nullable_field_wo_default\",\"type\": 
\"string\"},"
+          + "{\"name\": \"non_nullable_field_with_default\",\"type\": 
\"string\", \"default\": \"dummy\"}]}";
+
   private static String SCHEMA_WITH_NESTED_FIELD_LARGE_STR = 
"{\"name\":\"MyClass\",\"type\":\"record\",\"namespace\":\"com.acme.avro\",\"fields\":["
       + "{\"name\":\"firstname\",\"type\":\"string\"},"
       + "{\"name\":\"lastname\",\"type\":\"string\"},"
@@ -106,6 +116,11 @@ public class TestHoodieSchemaUtils {
       + 
"{\"name\":\"student\",\"type\":{\"name\":\"student\",\"type\":\"record\",\"fields\":["
       + "{\"name\":\"firstname\",\"type\":[\"null\" ,\"string\"],\"default\": 
null},{\"name\":\"lastname\",\"type\":[\"null\" ,\"string\"],\"default\": 
null}]}}]}";
 
+  private static final String SCHEMA_WITH_DECIMAL_FIELD = 
"{\"type\":\"record\",\"name\":\"record\",\"fields\":["
+      + "{\"name\":\"key_col\",\"type\":[\"null\",\"int\"],\"default\":null},"
+      + "{\"name\":\"decimal_col\",\"type\":[\"null\","
+      + 
"{\"type\":\"bytes\",\"logicalType\":\"decimal\",\"precision\":8,\"scale\":4}],\"default\":null}]}";
+
   private static HoodieSchema SCHEMA_WITH_NESTED_FIELD_LARGE = 
HoodieSchema.parse(SCHEMA_WITH_NESTED_FIELD_LARGE_STR);
 
   @Test
@@ -1539,4 +1554,38 @@ public class TestHoodieSchemaUtils {
     assertTrue(result instanceof LocalDate);
     assertEquals(LocalDate.of(2023, 1, 1), result);
   }
+
+  @Test
+  void testHasDecimalField() {
+    
assertTrue(HoodieSchemaUtils.hasDecimalField(HoodieSchema.parse(SCHEMA_WITH_DECIMAL_FIELD)));
+    
assertFalse(HoodieSchemaUtils.hasDecimalField(HoodieSchema.parse(EVOLVED_SCHEMA)));
+    
assertFalse(HoodieSchemaUtils.hasDecimalField(HoodieSchema.parse(SCHEMA_WITH_NON_NULLABLE_FIELD)));
+    
assertTrue(HoodieSchemaUtils.hasDecimalField(HoodieTestDataGenerator.HOODIE_SCHEMA));
+    
assertTrue(HoodieSchemaUtils.hasDecimalField(HoodieTestDataGenerator.HOODIE_TRIP_ENCODED_DECIMAL_SCHEMA));
+    HoodieSchema recordWithMapAndArray = 
HoodieSchema.createRecord("recordWithMapAndArray", null, null, false,
+        Arrays.asList(
+            HoodieSchemaField.of("mapfield", 
HoodieSchema.createMap(HoodieSchema.create(HoodieSchemaType.INT)), null, null),
+            HoodieSchemaField.of("arrayfield", 
HoodieSchema.createArray(HoodieSchema.create(HoodieSchemaType.INT)), null, null)
+        ));
+    assertFalse(HoodieSchemaUtils.hasDecimalField(recordWithMapAndArray));
+    HoodieSchema recordWithDecMapAndArray = 
HoodieSchema.createRecord("recordWithDecMapAndArray", null, null, false,
+        Arrays.asList(
+            HoodieSchemaField.of("mapfield", 
HoodieSchema.createMap(HoodieSchema.createDecimal(10, 6)), null, null),
+            HoodieSchemaField.of("arrayfield", 
HoodieSchema.createArray(HoodieSchema.create(HoodieSchemaType.INT)), null, null)
+        ));
+    assertTrue(HoodieSchemaUtils.hasDecimalField(recordWithDecMapAndArray));
+    HoodieSchema recordWithMapAndDecArray = 
HoodieSchema.createRecord("recordWithMapAndDecArray", null, null, false,
+        Arrays.asList(
+            HoodieSchemaField.of("mapfield", 
HoodieSchema.createMap(HoodieSchema.create(HoodieSchemaType.INT)), null, null),
+            HoodieSchemaField.of("arrayfield", 
HoodieSchema.createArray(HoodieSchema.createDecimal(10, 6)), null, null)
+        ));
+    assertTrue(HoodieSchemaUtils.hasDecimalField(recordWithMapAndDecArray));
+  }
+
+  @Test
+  void testHasSmallPrecisionDecimalField() {
+    
assertTrue(HoodieSchemaUtils.hasSmallPrecisionDecimalField(HoodieSchema.parse(SCHEMA_WITH_DECIMAL_FIELD)));
+    
assertFalse(HoodieSchemaUtils.hasSmallPrecisionDecimalField(HoodieSchema.parse(SCHEMA_WITH_AVRO_TYPES_STR)));
+    
assertFalse(HoodieSchemaUtils.hasSmallPrecisionDecimalField(HoodieSchema.parse(EXAMPLE_SCHEMA)));
+  }
 }
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
index 0fa61f3f5e9d..6a4eeb71400d 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
@@ -240,11 +240,13 @@ public class HoodieTestDataGenerator implements 
AutoCloseable {
   public static final HoodieSchema HOODIE_SCHEMA = 
HoodieSchema.fromAvroSchema(AVRO_SCHEMA);
   public static final Schema AVRO_SCHEMA_WITH_SPECIFIC_COLUMNS = new 
Schema.Parser().parse(TRIP_EXAMPLE_SCHEMA_WITH_PAYLOAD_SPECIFIC_COLS);
   public static final Schema NESTED_AVRO_SCHEMA = new 
Schema.Parser().parse(TRIP_NESTED_EXAMPLE_SCHEMA);
+  public static final HoodieSchema NESTED_SCHEMA = 
HoodieSchema.fromAvroSchema(NESTED_AVRO_SCHEMA);
   public static final Schema AVRO_SCHEMA_WITH_METADATA_FIELDS =
       HoodieAvroUtils.addMetadataFields(AVRO_SCHEMA);
   public static final HoodieSchema HOODIE_SCHEMA_WITH_METADATA_FIELDS = 
HoodieSchema.fromAvroSchema(AVRO_SCHEMA_WITH_METADATA_FIELDS);
   public static final Schema AVRO_SHORT_TRIP_SCHEMA = new 
Schema.Parser().parse(SHORT_TRIP_SCHEMA);
   public static final Schema AVRO_TRIP_ENCODED_DECIMAL_SCHEMA = new 
Schema.Parser().parse(TRIP_ENCODED_DECIMAL_SCHEMA);
+  public static final HoodieSchema HOODIE_TRIP_ENCODED_DECIMAL_SCHEMA = 
HoodieSchema.parse(TRIP_ENCODED_DECIMAL_SCHEMA);
   public static final Schema AVRO_TRIP_LOGICAL_TYPES_SCHEMA = new 
Schema.Parser().parse(TRIP_LOGICAL_TYPES_SCHEMA);
   public static final HoodieSchema HOODIE_SCHEMA_TRIP_LOGICAL_TYPES_SCHEMA = 
HoodieSchema.parse(TRIP_LOGICAL_TYPES_SCHEMA);
   public static final Schema AVRO_TRIP_LOGICAL_TYPES_SCHEMA_V6 = new 
Schema.Parser().parse(TRIP_LOGICAL_TYPES_SCHEMA_V6);
diff --git 
a/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/SparkDeleteNode.scala
 
b/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/SparkDeleteNode.scala
index d64be904101c..5bfbca63a04b 100644
--- 
a/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/SparkDeleteNode.scala
+++ 
b/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/SparkDeleteNode.scala
@@ -20,11 +20,11 @@ package org.apache.hudi.integ.testsuite.dag.nodes
 
 import org.apache.hudi.{AvroConversionUtils, DataSourceWriteOptions, 
HoodieSparkUtils}
 import org.apache.hudi.client.WriteStatus
+import org.apache.hudi.common.schema.HoodieSchema
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config
 import org.apache.hudi.integ.testsuite.dag.ExecutionContext
 
-import org.apache.avro.Schema
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.SaveMode
 import org.slf4j.LoggerFactory
@@ -58,7 +58,7 @@ class SparkDeleteNode(dagNodeConfig: Config) extends 
DagNode[RDD[WriteStatus]] {
     val pathToRead = context.getWriterContext.getCfg.inputBasePath + "/" + 
batchIdRecords.getKey()
     val avroDf = 
context.getWriterContext.getSparkSession.read.format("avro").load(pathToRead)
     val genRecsRDD = HoodieSparkUtils.createRdd(avroDf, "testStructName", 
"testNamespace", false,
-      org.apache.hudi.common.util.Option.of(new 
Schema.Parser().parse(context.getWriterContext.getHoodieTestSuiteWriter.getSchema)))
+      
org.apache.hudi.common.util.Option.of(HoodieSchema.parse(context.getWriterContext.getHoodieTestSuiteWriter.getSchema)))
 
     val inputDF = AvroConversionUtils.createDataFrame(genRecsRDD,
       context.getWriterContext.getHoodieTestSuiteWriter.getSchema,
diff --git 
a/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/SparkInsertNode.scala
 
b/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/SparkInsertNode.scala
index e5ada9363928..f4dd17f383f2 100644
--- 
a/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/SparkInsertNode.scala
+++ 
b/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/SparkInsertNode.scala
@@ -20,6 +20,7 @@ package org.apache.hudi.integ.testsuite.dag.nodes
 
 import org.apache.hudi.{AvroConversionUtils, DataSourceWriteOptions, 
HoodieSparkUtils}
 import org.apache.hudi.client.WriteStatus
+import org.apache.hudi.common.schema.HoodieSchema
 import org.apache.hudi.common.table.HoodieTableConfig
 import org.apache.hudi.common.util.collection.Pair
 import org.apache.hudi.config.{HoodieIndexConfig, HoodieWriteConfig}
@@ -61,7 +62,7 @@ class SparkInsertNode(dagNodeConfig: Config) extends 
DagNode[RDD[WriteStatus]] {
     val pathToRead = context.getWriterContext.getCfg.inputBasePath + "/" + 
batchIdRecords.getKey()
     val avroDf = 
context.getWriterContext.getSparkSession.read.format("avro").load(pathToRead)
     val genRecsRDD = HoodieSparkUtils.createRdd(avroDf, "testStructName", 
"testNamespace", false,
-      org.apache.hudi.common.util.Option.of(new 
Schema.Parser().parse(context.getWriterContext.getHoodieTestSuiteWriter.getSchema)))
+      
org.apache.hudi.common.util.Option.of(HoodieSchema.parse(context.getWriterContext.getHoodieTestSuiteWriter.getSchema)))
 
     val inputDF = AvroConversionUtils.createDataFrame(genRecsRDD,
       context.getWriterContext.getHoodieTestSuiteWriter.getSchema,
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCreateRecordUtils.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCreateRecordUtils.scala
index a1575af230ab..25151cda0008 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCreateRecordUtils.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCreateRecordUtils.scala
@@ -115,8 +115,7 @@ object HoodieCreateRecordUtils {
     recordType match {
       case HoodieRecord.HoodieRecordType.AVRO =>
         // avroRecords will contain meta fields when isPrepped is true.
-        val avroRecords: RDD[GenericRecord] = HoodieSparkUtils.createRdd(df, 
recordName, recordNameSpace,
-          Some(writerSchema.getAvroSchema))
+        val avroRecords: RDD[GenericRecord] = HoodieSparkUtils.createRdd(df, 
recordName, recordNameSpace, Some(writerSchema))
 
         avroRecords.mapPartitions(it => {
           val sparkPartitionId = TaskContext.getPartitionId()
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkUtils.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkUtils.scala
index d3e7f1b22806..0198beaf2b58 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkUtils.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkUtils.scala
@@ -19,6 +19,7 @@
 package org.apache.hudi
 
 import org.apache.hudi.common.model.HoodieRecord
+import org.apache.hudi.common.schema.HoodieSchema
 import org.apache.hudi.testutils.DataSourceTestUtils
 import org.apache.hudi.testutils.HoodieClientTestUtils.getSparkConfForTest
 
@@ -97,7 +98,7 @@ class TestHoodieSparkUtils {
     val df1 = 
spark.createDataFrame(spark.sparkContext.parallelize(recordsSeq), structType)
 
     var genRecRDD = HoodieSparkUtils.createRdd(df1, "test_struct_name", 
"test_namespace", true,
-      org.apache.hudi.common.util.Option.of(schema.toAvroSchema))
+      org.apache.hudi.common.util.Option.of(schema))
     genRecRDD.collect()
 
     val evolSchema = DataSourceTestUtils.getStructTypeExampleEvolvedSchema
@@ -105,13 +106,13 @@ class TestHoodieSparkUtils {
     recordsSeq = convertRowListToSeq(records)
 
     genRecRDD = HoodieSparkUtils.createRdd(df1, "test_struct_name", 
"test_namespace", true,
-      org.apache.hudi.common.util.Option.of(evolSchema.toAvroSchema))
+      org.apache.hudi.common.util.Option.of(evolSchema))
     genRecRDD.collect()
 
     // pass in evolved schema but with records serialized with old schema. 
should be able to convert with out any exception.
     // Before https://github.com/apache/hudi/pull/2927, this will throw 
exception.
     genRecRDD = HoodieSparkUtils.createRdd(df1, "test_struct_name", 
"test_namespace", true,
-      org.apache.hudi.common.util.Option.of(evolSchema.toAvroSchema))
+      org.apache.hudi.common.util.Option.of(evolSchema))
     val genRecs = genRecRDD.collect()
     // if this succeeds w/o throwing any exception, test succeeded.
     assertEquals(genRecs.size, 5)
@@ -127,29 +128,29 @@ class TestHoodieSparkUtils {
     val innerStruct1 = new 
StructType().add("innerKey","string",false).add("innerValue", "long", true)
     val structType1 = new StructType().add("key", "string", false)
       
.add("nonNullableInnerStruct",innerStruct1,false).add("nullableInnerStruct",innerStruct1,true)
-    val schema1 = 
AvroConversionUtils.convertStructTypeToAvroSchema(structType1, 
"test_struct_name", "test_namespace")
+    val schema1 = 
HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(structType1, 
"test_struct_name", "test_namespace")
     val records1 = Seq(Row("key1", Row("innerKey1_1", 1L), Row("innerKey1_2", 
2L)))
 
     val df1 = spark.createDataFrame(spark.sparkContext.parallelize(records1), 
structType1)
     val genRecRDD1 = HoodieSparkUtils.createRdd(df1, "test_struct_name", 
"test_namespace", true,
       org.apache.hudi.common.util.Option.of(schema1))
-    assert(schema1.equals(genRecRDD1.collect()(0).getSchema))
+    assert(schema1.toAvroSchema.equals(genRecRDD1.collect()(0).getSchema))
 
     // create schema2 which has one addition column at the root level compared 
to schema1
     val structType2 = new StructType().add("key", "string", false)
       
.add("nonNullableInnerStruct",innerStruct1,false).add("nullableInnerStruct",innerStruct1,true)
       .add("nullableInnerStruct2",innerStruct1,true)
-    val schema2 = 
AvroConversionUtils.convertStructTypeToAvroSchema(structType2, 
"test_struct_name", "test_namespace")
+    val schema2 = 
HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(structType2, 
"test_struct_name", "test_namespace")
     val records2 = Seq(Row("key2", Row("innerKey2_1", 2L), Row("innerKey2_2", 
2L), Row("innerKey2_3", 2L)))
     val df2 = spark.createDataFrame(spark.sparkContext.parallelize(records2), 
structType2)
     val genRecRDD2 = HoodieSparkUtils.createRdd(df2, "test_struct_name", 
"test_namespace", true,
       org.apache.hudi.common.util.Option.of(schema2))
-    assert(schema2.equals(genRecRDD2.collect()(0).getSchema))
+    assert(schema2.toAvroSchema.equals(genRecRDD2.collect()(0).getSchema))
 
     // send records1 with schema2. should succeed since the new column is 
nullable.
     val genRecRDD3 = HoodieSparkUtils.createRdd(df1, "test_struct_name", 
"test_namespace", true,
       org.apache.hudi.common.util.Option.of(schema2))
-    assert(genRecRDD3.collect()(0).getSchema.equals(schema2))
+    assert(genRecRDD3.collect()(0).getSchema.equals(schema2.toAvroSchema))
     genRecRDD3.foreach(entry => assertNull(entry.get("nullableInnerStruct2")))
 
     val innerStruct3 = new 
StructType().add("innerKey","string",false).add("innerValue", "long", true)
@@ -159,17 +160,17 @@ class TestHoodieSparkUtils {
     val structType4 = new StructType().add("key", "string", false)
       
.add("nonNullableInnerStruct",innerStruct1,false).add("nullableInnerStruct",innerStruct3,true)
 
-    val schema4 = 
AvroConversionUtils.convertStructTypeToAvroSchema(structType4, 
"test_struct_name", "test_namespace")
+    val schema4 = 
HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(structType4, 
"test_struct_name", "test_namespace")
     val records4 = Seq(Row("key2", Row("innerKey2_1", 2L), Row("innerKey2_2", 
2L, "new_nested_col_val1")))
     val df4 = spark.createDataFrame(spark.sparkContext.parallelize(records4), 
structType4)
     val genRecRDD4 = HoodieSparkUtils.createRdd(df4, "test_struct_name", 
"test_namespace", true,
       org.apache.hudi.common.util.Option.of(schema4))
-    assert(schema4.equals(genRecRDD4.collect()(0).getSchema))
+    assert(schema4.toAvroSchema.equals(genRecRDD4.collect()(0).getSchema))
 
     // convert batch 1 with schema4. should succeed.
     val genRecRDD5 = HoodieSparkUtils.createRdd(df1, "test_struct_name", 
"test_namespace", true,
       org.apache.hudi.common.util.Option.of(schema4))
-    assert(schema4.equals(genRecRDD4.collect()(0).getSchema))
+    assert(schema4.toAvroSchema.equals(genRecRDD4.collect()(0).getSchema))
     val genRec = genRecRDD5.collect()(0)
     val nestedRec : GenericRecord = 
genRec.get("nullableInnerStruct").asInstanceOf[GenericRecord]
     assertNull(nestedRec.get("new_nested_col"))
@@ -182,7 +183,7 @@ class TestHoodieSparkUtils {
     val structType6 = new StructType().add("key", "string", false)
       
.add("nonNullableInnerStruct",innerStruct1,false).add("nullableInnerStruct",innerStruct4,true)
 
-    val schema6 = 
AvroConversionUtils.convertStructTypeToAvroSchema(structType6, 
"test_struct_name", "test_namespace")
+    val schema6 = 
HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(structType6, 
"test_struct_name", "test_namespace")
     // convert batch 1 with schema5. should fail since the missed out column 
is not nullable.
     try {
       val genRecRDD6 = HoodieSparkUtils.createRdd(df1, "test_struct_name", 
"test_namespace", true,
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/AvroSerDerBenchmark.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/AvroSerDerBenchmark.scala
index bb640cec019e..dfd97349a8ba 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/AvroSerDerBenchmark.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/AvroSerDerBenchmark.scala
@@ -18,7 +18,7 @@
 
 package org.apache.spark.sql.execution.benchmark
 
-import org.apache.hudi.{AvroConversionUtils, HoodieSparkUtils}
+import org.apache.hudi.{AvroConversionUtils, HoodieSchemaConversionUtils, 
HoodieSparkUtils}
 
 import org.apache.spark.hudi.benchmark.{HoodieBenchmark, HoodieBenchmarkBase}
 import org.apache.spark.sql.{DataFrame, SparkSession}
@@ -55,9 +55,9 @@ object AvroSerDerBenchmark extends HoodieBenchmarkBase {
     val benchmark = new HoodieBenchmark(s"perf avro serializer for hoodie", 
50000000)
     benchmark.addCase("serialize internalRow to avro Record") { _ =>
       val df = getDataFrame(50000000)
-      val avroSchema = 
AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, "record", "my")
-      spark.sparkContext.getConf.registerAvroSchemas(avroSchema)
-      HoodieSparkUtils.createRdd(df,"record", "my", 
Some(avroSchema)).foreach(f => f)
+      val schema = 
HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(df.schema, 
"record", "my")
+      spark.sparkContext.getConf.registerAvroSchemas(schema.toAvroSchema)
+      HoodieSparkUtils.createRdd(df,"record", "my", Some(schema)).foreach(f => 
f)
     }
     benchmark.run()
   }
@@ -73,11 +73,11 @@ object AvroSerDerBenchmark extends HoodieBenchmarkBase {
     val benchmark = new HoodieBenchmark(s"perf avro deserializer for hoodie", 
10000000)
     val df = getDataFrame(10000000)
     val sparkSchema = df.schema
-    val avroSchema = 
AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, "record", "my")
-    val testRdd = HoodieSparkUtils.createRdd(df,"record", "my", 
Some(avroSchema))
+    val schema = 
HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(df.schema, 
"record", "my")
+    val testRdd = HoodieSparkUtils.createRdd(df,"record", "my", Some(schema))
     testRdd.cache()
     testRdd.foreach(f => f)
-    spark.sparkContext.getConf.registerAvroSchemas(avroSchema)
+    spark.sparkContext.getConf.registerAvroSchemas(schema.toAvroSchema)
     benchmark.addCase("deserialize avro Record to internalRow") { _ =>
       testRdd.mapPartitions { iter =>
         val schema = 
AvroConversionUtils.convertStructTypeToAvroSchema(sparkSchema, "record", "my")
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/CreateHandleBenchmark.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/CreateHandleBenchmark.scala
index 75e7bd02025f..2881ff145725 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/CreateHandleBenchmark.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/CreateHandleBenchmark.scala
@@ -18,8 +18,7 @@
 
 package org.apache.spark.sql.execution.benchmark
 
-import org.apache.hudi.AvroConversionUtils
-import org.apache.hudi.HoodieSparkUtils
+import org.apache.hudi.{AvroConversionUtils, HoodieSchemaConversionUtils, 
HoodieSparkUtils}
 import org.apache.hudi.client.WriteStatus
 import org.apache.hudi.common.config.HoodieMetadataConfig
 import org.apache.hudi.common.data.HoodieData
@@ -117,22 +116,22 @@ object CreateHandleBenchmark extends HoodieBenchmarkBase {
   private def createHandleBenchmark: Unit = {
     val benchmark = new HoodieBenchmark(s"perf create handle for hoodie", 
10000)
     val df = getDataFrame(100000)
-    val avroSchema = 
AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, "record", "my")
-    spark.sparkContext.getConf.registerAvroSchemas(avroSchema)
+    val schema = 
HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(df.schema, 
"record", "my")
+    spark.sparkContext.getConf.registerAvroSchemas(schema.toAvroSchema)
 
     
df.write.format("hudi").option(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), 
"key")
       .option(HoodieMetadataConfig.ENABLE.key(), "false")
       .option(HoodieTableConfig.NAME.key(), 
"tbl_name").mode(SaveMode.Overwrite).save("/tmp/sample_test_table")
     val dummpProps = new Properties()
     val avroRecords: java.util.List[HoodieRecord[_]] = 
HoodieSparkUtils.createRdd(df, "struct_name", "name_space",
-      Some(avroSchema)).mapPartitions(
+      Some(schema)).mapPartitions(
       it => {
         it.map { genRec =>
           val hoodieKey = new HoodieKey(genRec.get("key").toString, "")
           HoodieRecordUtils.createHoodieRecord(genRec, 0L, hoodieKey, 
classOf[DefaultHoodieRecordPayload].getName, false, null)
         }
       }).toJavaRDD().collect().stream().map[HoodieRecord[_]](hoodieRec => {
-      
hoodieRec.asInstanceOf[HoodieAvroIndexedRecord].toIndexedRecord(avroSchema, 
dummpProps)
+      
hoodieRec.asInstanceOf[HoodieAvroIndexedRecord].toIndexedRecord(schema.toAvroSchema,
 dummpProps)
       hoodieRec
     }).collect(Collectors.toList[HoodieRecord[_]])
 
@@ -140,7 +139,7 @@ object CreateHandleBenchmark extends HoodieBenchmarkBase {
       val props = new Properties()
       props.setProperty(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "key")
       val writeConfig = 
HoodieWriteConfig.newBuilder().withPath("/tmp/sample_test_table").withPreCombineField("col1")
-        .withSchema(avroSchema.toString)
+        .withSchema(schema.toString)
         .withMarkersType(MarkerType.DIRECT.name())
         
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build())
         .withProps(props).build()
@@ -151,7 +150,7 @@ object CreateHandleBenchmark extends HoodieBenchmarkBase {
       val createHandle = new HoodieCreateHandle(writeConfig, "000000001", 
hoodieTable, "", UUID.randomUUID().toString, new LocalTaskContextSupplier())
       avroRecords.forEach(record => {
         val newAvroRec = new HoodieAvroIndexedRecord(record.getKey, 
record.getData.asInstanceOf[IndexedRecord], 0L, record.getOperation)
-        createHandle.write(newAvroRec, 
HoodieSchema.fromAvroSchema(avroSchema), writeConfig.getProps)
+        createHandle.write(newAvroRec, schema, writeConfig.getProps)
       })
       createHandle.close()
     }
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
index 6622190cf571..e330b821cc51 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
@@ -18,7 +18,7 @@
 
 package org.apache.hudi.utilities;
 
-import org.apache.hudi.AvroConversionUtils;
+import org.apache.hudi.HoodieSchemaConversionUtils;
 import org.apache.hudi.SparkAdapterSupport$;
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.WriteStatus;
@@ -226,12 +226,12 @@ public class UtilHelpers {
 
   public static StructType getSourceSchema(SchemaProvider schemaProvider) {
     if (schemaProvider != null && schemaProvider.getSourceHoodieSchema() != 
null && schemaProvider.getSourceHoodieSchema() != InputBatch.NULL_SCHEMA) {
-      return 
AvroConversionUtils.convertAvroSchemaToStructType(schemaProvider.getSourceHoodieSchema().toAvroSchema());
+      return 
HoodieSchemaConversionUtils.convertHoodieSchemaToStructType(schemaProvider.getSourceHoodieSchema());
     }
     return null;
   }
 
-  public static Option<Transformer> createTransformer(Option<List<String>> 
classNamesOpt, Supplier<Option<Schema>> sourceSchemaSupplier,
+  public static Option<Transformer> createTransformer(Option<List<String>> 
classNamesOpt, Supplier<Option<HoodieSchema>> sourceSchemaSupplier,
                                                       boolean 
isErrorTableWriterEnabled) throws IOException {
 
     try {
@@ -535,7 +535,7 @@ public class UtilHelpers {
             structType = 
SparkAdapterSupport$.MODULE$.sparkAdapter().getSchemaUtils()
                 .getSchema(conn, rs, dialect, false, false);
           }
-          return 
HoodieSchema.fromAvroSchema(AvroConversionUtils.convertStructTypeToAvroSchema(structType,
 table, "hoodie." + table));
+          return 
HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(structType, table, 
"hoodie." + table);
         }
       }
     } catch (HoodieException e) {
@@ -592,15 +592,15 @@ public class UtilHelpers {
     return wrapSchemaProviderWithPostProcessor(rowSchemaProvider, cfg, jssc, 
null);
   }
 
-  public static Option<Schema> getLatestTableSchema(JavaSparkContext jssc,
-                                                    HoodieStorage storage,
-                                                    String basePath,
-                                                    HoodieTableMetaClient 
tableMetaClient) {
+  public static Option<HoodieSchema> getLatestTableSchema(JavaSparkContext 
jssc,
+                                                          HoodieStorage 
storage,
+                                                          String basePath,
+                                                          
HoodieTableMetaClient tableMetaClient) {
     try {
       if (FSUtils.isTableExists(basePath, storage)) {
         TableSchemaResolver tableSchemaResolver = new 
TableSchemaResolver(tableMetaClient);
 
-        return tableSchemaResolver.getTableAvroSchemaFromLatestCommit(false);
+        return tableSchemaResolver.getTableSchemaFromLatestCommit(false);
       }
     } catch (Exception e) {
       LOG.warn("Failed to fetch latest table's schema", e);
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaSet.java 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaSet.java
index fe9340ad2e7c..6f69fc8a6c37 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaSet.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaSet.java
@@ -18,7 +18,8 @@
 
 package org.apache.hudi.utilities.schema;
 
-import org.apache.avro.Schema;
+import org.apache.hudi.common.schema.HoodieSchema;
+
 import org.apache.avro.SchemaNormalization;
 
 import java.io.Serializable;
@@ -32,13 +33,13 @@ public class SchemaSet implements Serializable {
 
   private final Set<Long> processedSchema = new HashSet<>();
 
-  public boolean isSchemaPresent(Schema schema) {
-    long schemaKey = SchemaNormalization.parsingFingerprint64(schema);
+  public boolean isSchemaPresent(HoodieSchema schema) {
+    long schemaKey = 
SchemaNormalization.parsingFingerprint64(schema.toAvroSchema());
     return processedSchema.contains(schemaKey);
   }
 
-  public void addSchema(Schema schema) {
-    long schemaKey = SchemaNormalization.parsingFingerprint64(schema);
+  public void addSchema(HoodieSchema schema) {
+    long schemaKey = 
SchemaNormalization.parsingFingerprint64(schema.toAvroSchema());
     processedSchema.add(schemaKey);
   }
 }
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java
index accbd55a8a83..2e08a078daf0 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java
@@ -18,10 +18,12 @@
 
 package org.apache.hudi.utilities.sources.helpers;
 
-import org.apache.hudi.AvroConversionUtils;
+import org.apache.hudi.HoodieSchemaConversionUtils;
 import org.apache.hudi.common.config.ConfigProperty;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaField;
+import org.apache.hudi.common.schema.HoodieSchemaType;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.exception.HoodieException;
@@ -35,7 +37,6 @@ import org.apache.hudi.utilities.schema.SchemaProvider;
 import org.apache.hudi.utilities.sources.InputBatch;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.avro.Schema;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -290,10 +291,9 @@ public class CloudObjectsSelectorCommon {
     if (schemaProviderOption.isPresent()) {
       HoodieSchema sourceSchema = 
schemaProviderOption.get().getSourceHoodieSchema();
       if (sourceSchema != null && 
!sourceSchema.equals(InputBatch.NULL_SCHEMA)) {
-        Schema sourceAvroSchema = sourceSchema.toAvroSchema();
-        rowSchema = 
AvroConversionUtils.convertAvroSchemaToStructType(sourceAvroSchema);
+        rowSchema = 
HoodieSchemaConversionUtils.convertHoodieSchemaToStructType(sourceSchema);
         if (isCoalesceRequired(properties, sourceSchema)) {
-          reader = reader.schema(addAliasesToRowSchema(sourceAvroSchema, 
rowSchema));
+          reader = reader.schema(addAliasesToRowSchema(sourceSchema, 
rowSchema));
         } else {
           reader = reader.schema(rowSchema);
         }
@@ -331,7 +331,7 @@ public class CloudObjectsSelectorCommon {
     if (schemaProviderOption.isPresent()) {
       HoodieSchema sourceSchema = 
schemaProviderOption.get().getSourceHoodieSchema();
       if (isCoalesceRequired(properties, sourceSchema)) {
-        dataset = spark.createDataFrame(coalesceAliasFields(dataset, 
sourceSchema.toAvroSchema()).rdd(), rowSchema);
+        dataset = spark.createDataFrame(coalesceAliasFields(dataset, 
sourceSchema).rdd(), rowSchema);
       }
     }
 
@@ -352,19 +352,19 @@ public class CloudObjectsSelectorCommon {
   private static boolean isCoalesceRequired(TypedProperties properties, 
HoodieSchema sourceSchema) {
     return getBooleanWithAltKeys(properties, 
CloudSourceConfig.SPARK_DATASOURCE_READER_COALESCE_ALIAS_COLUMNS)
         && Objects.nonNull(sourceSchema)
-        && hasFieldWithAliases(sourceSchema.toAvroSchema());
+        && hasFieldWithAliases(sourceSchema);
   }
 
   /**
-   * Recursively checks if an Avro schema or any of its nested fields contain 
aliases.
+   * Recursively checks if a schema or any of its nested fields contain 
aliases.
    *
-   * @param schema The Avro schema to check.
+   * @param schema The schema to check.
    * @return True if the schema or any of its fields contain aliases, false 
otherwise.
    */
-  private static boolean hasFieldWithAliases(Schema schema) {
+  private static boolean hasFieldWithAliases(HoodieSchema schema) {
     // If the schema is a record, check its fields recursively
     if (isNestedRecord(schema)) {
-      for (Schema.Field field : getRecordFields(schema)) {
+      for (HoodieSchemaField field : getRecordFields(schema)) {
         // Check if the field has aliases
         if (!field.aliases().isEmpty()) {
           return true;
@@ -379,27 +379,27 @@ public class CloudObjectsSelectorCommon {
     return false;
   }
 
-  private static StructType addAliasesToRowSchema(Schema avroSchema, 
StructType rowSchema) {
+  private static StructType addAliasesToRowSchema(HoodieSchema schema, 
StructType rowSchema) {
     Map<String, StructField> rowFieldsMap = Arrays.stream(rowSchema.fields())
         .collect(Collectors.toMap(StructField::name, Function.identity()));
 
-    StructField[] modifiedFields = getRecordFields(avroSchema).stream()
-        .flatMap(avroField -> generateRowFieldsWithAliases(avroField, 
rowFieldsMap.get(avroField.name())).stream())
+    StructField[] modifiedFields = getRecordFields(schema).stream()
+        .flatMap(field -> generateRowFieldsWithAliases(field, 
rowFieldsMap.get(field.name())).stream())
         .toArray(StructField[]::new);
 
     return new StructType(modifiedFields);
   }
 
-  private static List<Schema.Field> getRecordFields(Schema schema) {
-    if (schema.getType() == Schema.Type.RECORD) {
+  private static List<HoodieSchemaField> getRecordFields(HoodieSchema schema) {
+    if (schema.getType() == HoodieSchemaType.RECORD) {
       return schema.getFields();
     }
 
-    if (schema.getType() == Schema.Type.UNION) {
+    if (schema.getType() == HoodieSchemaType.UNION) {
       return schema.getTypes().stream()
-          .filter(subSchema -> subSchema.getType() == Schema.Type.RECORD)
+          .filter(subSchema -> subSchema.getType() == HoodieSchemaType.RECORD)
           .findFirst()
-          .map(Schema::getFields)
+          .map(HoodieSchema::getFields)
           .orElse(Collections.emptyList());
     }
 
@@ -407,35 +407,35 @@ public class CloudObjectsSelectorCommon {
   }
 
   /**
-   * Generates a list of StructFields with aliases applied based on the 
provided Avro field schema.
+   * Generates a list of StructFields with aliases applied based on the 
provided field's schema.
    * <p>
-   * This method processes a given Avro field and its corresponding Spark SQL 
StructField, handling
-   * nested records and aliases. If the Avro field contains nested records, 
the method recursively
-   * updates the schema for these records and applies any aliases defined in 
the Avro schema.
-   * If the Avro field has aliases, they are added as new fields with nullable 
set to true and
+   * This method processes a given field and its corresponding Spark SQL 
StructField, handling
+   * nested records and aliases. If the field contains nested records, the 
method recursively
+   * updates the schema for these records and applies any aliases defined in 
the schema.
+   * If the field has aliases, they are added as new fields with nullable set 
to true and
    * appropriate metadata in the returned list. If no aliases or nesting are 
present, the original
    * StructField is returned unchanged.
    *
-   * @param avroField The Avro field schema to process.
-   * @param rowField  The corresponding Spark SQL StructField to map the Avro 
field to.
-   * @return A list of StructFields with aliases applied as per the Avro 
schema.
+   * @param field The field from the schema to process.
+   * @param rowField  The corresponding Spark SQL StructField to map the field 
to.
+   * @return A list of StructFields with aliases applied as per the provided 
schema.
    */
-  private static List<StructField> generateRowFieldsWithAliases(Schema.Field 
avroField, StructField rowField) {
+  private static List<StructField> 
generateRowFieldsWithAliases(HoodieSchemaField field, StructField rowField) {
     List<StructField> fieldList = new ArrayList<>();
 
     // Handle nested records
-    if (isNestedRecord(avroField.schema())) {
-      StructType updatedSchema = addAliasesToRowSchema(avroField.schema(), 
(StructType) rowField.dataType());
+    if (isNestedRecord(field.schema())) {
+      StructType updatedSchema = addAliasesToRowSchema(field.schema(), 
(StructType) rowField.dataType());
 
-      if (schemaModifiedOrHasAliases(avroField, updatedSchema, rowField)) {
+      if (schemaModifiedOrHasAliases(field, updatedSchema, rowField)) {
         // Add the original field with the updated schema and add aliases if 
present
-        addFieldWithAliases(fieldList, avroField.name(), updatedSchema, 
rowField.metadata(), avroField.aliases());
+        addFieldWithAliases(fieldList, field.name(), updatedSchema, 
rowField.metadata(), field.aliases());
       } else {
         fieldList.add(rowField);
       }
-    } else if (!avroField.aliases().isEmpty()) {
+    } else if (!field.aliases().isEmpty()) {
       // If the field has aliases, add them to the schema
-      addFieldWithAliases(fieldList, avroField.name(), rowField.dataType(), 
rowField.metadata(), avroField.aliases());
+      addFieldWithAliases(fieldList, field.name(), rowField.dataType(), 
rowField.metadata(), field.aliases());
     } else {
       // No aliases or nesting, return the original field
       fieldList.add(rowField);
@@ -448,22 +448,22 @@ public class CloudObjectsSelectorCommon {
     aliases.forEach(alias -> fieldList.add(new StructField(alias, dataType, 
true, metadata)));
   }
 
-  private static Dataset<Row> coalesceAliasFields(Dataset<Row> dataset, Schema 
sourceSchema) {
+  private static Dataset<Row> coalesceAliasFields(Dataset<Row> dataset, 
HoodieSchema sourceSchema) {
     return coalesceNestedAliases(coalesceTopLevelAliases(dataset, 
sourceSchema), sourceSchema);
   }
 
   /**
    * Merges top-level fields with their aliases in the dataset.
    * <p>
-   * This method goes through the top-level fields in the Avro schema, and for 
any field that has aliases,
+   * This method goes through the top-level fields in the schema, and for any 
field that has aliases,
    * it combines them in the dataset using a coalesce operation. This ensures 
that if a field is null,
    * the value from its alias is used instead.
    *
    * @param dataset      The dataset to process.
-   * @param sourceSchema The Avro schema defining the fields and their aliases.
+   * @param sourceSchema The schema defining the fields and their aliases.
    * @return A dataset with fields merged with their aliases.
    */
-  private static Dataset<Row> coalesceTopLevelAliases(Dataset<Row> dataset, 
Schema sourceSchema) {
+  private static Dataset<Row> coalesceTopLevelAliases(Dataset<Row> dataset, 
HoodieSchema sourceSchema) {
     return getRecordFields(sourceSchema).stream()
         .filter(field -> !field.aliases().isEmpty())
         .reduce(dataset,
@@ -482,17 +482,17 @@ public class CloudObjectsSelectorCommon {
   /**
    * Merges nested fields with their aliases in the dataset.
    * <p>
-   * This method iterates through the fields of the provided Avro schema and 
checks if they represent
+   * This method iterates through the fields of the provided schema and checks 
if they represent
    * nested records. For each nested record, it verifies if there are any 
alias fields present. If
    * aliases are found, the method generates a list of nested fields, 
coalescing them with their aliases,
    * and creates a new column in the dataset with the merged data.
    *
    * @param dataset      The dataset to process.
-   * @param sourceSchema The Avro schema defining the structure and aliases of 
the data.
+   * @param sourceSchema The schema defining the structure and aliases of the 
data.
    * @return A dataset with nested fields merged with their aliases.
    */
-  private static Dataset<Row> coalesceNestedAliases(Dataset<Row> dataset, 
Schema sourceSchema) {
-    for (Schema.Field field : getRecordFields(sourceSchema)) {
+  private static Dataset<Row> coalesceNestedAliases(Dataset<Row> dataset, 
HoodieSchema sourceSchema) {
+    for (HoodieSchemaField field : getRecordFields(sourceSchema)) {
       // check if this is a nested record and contains an alias field within
       if (isNestedRecord(field.schema()) && 
hasFieldWithAliases(field.schema())) {
         dataset = dataset.withColumn(field.name(), 
functions.struct(getNestedFields("", field, dataset)));
@@ -501,32 +501,32 @@ public class CloudObjectsSelectorCommon {
     return dataset;
   }
 
-  private static Column[] getNestedFields(String parentField, Schema.Field 
field, Dataset<Row> dataset) {
+  private static Column[] getNestedFields(String parentField, 
HoodieSchemaField field, Dataset<Row> dataset) {
     return getRecordFields(field.schema()).stream()
-        .map(avroField -> {
+        .map(schemaField -> {
           List<Column> columns = new ArrayList<>();
           String newParentField = getFullName(parentField, field.name());
-          if (isNestedRecord(avroField.schema())) {
+          if (isNestedRecord(schemaField.schema())) {
             // if field is nested, recursively fetch nested column
-            columns.add(functions.struct(getNestedFields(newParentField, 
avroField, dataset)));
+            columns.add(functions.struct(getNestedFields(newParentField, 
schemaField, dataset)));
           } else {
-            columns.add(dataset.col(getFullName(newParentField, 
avroField.name())));
+            columns.add(dataset.col(getFullName(newParentField, 
schemaField.name())));
           }
-          avroField.aliases().forEach(alias -> 
columns.add(dataset.col(getFullName(newParentField, alias))));
-          // if avro field contains aliases, coalesce the column with others 
matching the aliases otherwise return actual column
-          return avroField.aliases().isEmpty() ? columns.get(0)
-              : functions.coalesce(columns.toArray(new 
Column[0])).alias(avroField.name());
+          schemaField.aliases().forEach(alias -> 
columns.add(dataset.col(getFullName(newParentField, alias))));
+          // if field contains aliases, coalesce the column with others 
matching the aliases otherwise return actual column
+          return schemaField.aliases().isEmpty() ? columns.get(0)
+              : functions.coalesce(columns.toArray(new 
Column[0])).alias(schemaField.name());
         }).toArray(Column[]::new);
   }
 
-  private static boolean isNestedRecord(Schema schema) {
-    if (schema.getType() == Schema.Type.RECORD) {
+  private static boolean isNestedRecord(HoodieSchema schema) {
+    if (schema.getType() == HoodieSchemaType.RECORD) {
       return true;
     }
 
-    if (schema.getType() == Schema.Type.UNION) {
+    if (schema.getType() == HoodieSchemaType.UNION) {
       return schema.getTypes().stream()
-          .anyMatch(subSchema -> subSchema.getType() == Schema.Type.RECORD);
+          .anyMatch(subSchema -> subSchema.getType() == 
HoodieSchemaType.RECORD);
     }
 
     return false;
@@ -536,8 +536,8 @@ public class CloudObjectsSelectorCommon {
     return namespace.isEmpty() ? fieldName : namespace + "." + fieldName;
   }
 
-  private static boolean schemaModifiedOrHasAliases(Schema.Field avroField, 
StructType modifiedNestedSchema, StructField rowField) {
-    return !modifiedNestedSchema.equals(rowField.dataType()) || 
!avroField.aliases().isEmpty();
+  private static boolean schemaModifiedOrHasAliases(HoodieSchemaField field, 
StructType modifiedNestedSchema, StructField rowField) {
+    return !modifiedNestedSchema.equals(rowField.dataType()) || 
!field.aliases().isEmpty();
   }
 
   private static Option<String> getPropVal(TypedProperties props, 
ConfigProperty<String> configProperty) {
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamerUtils.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamerUtils.java
index 236f8fb1cda8..12e620156c49 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamerUtils.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamerUtils.java
@@ -19,11 +19,10 @@
 
 package org.apache.hudi.utilities.streamer;
 
-import org.apache.hudi.AvroConversionUtils;
+import org.apache.hudi.HoodieSchemaConversionUtils;
 import org.apache.hudi.SparkAdapterSupport$;
 import org.apache.hudi.avro.AvroRecordContext;
 import org.apache.hudi.avro.HoodieAvroUtils;
-import org.apache.hudi.common.config.SerializableSchema;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
@@ -31,6 +30,7 @@ import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieSparkRecord;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaUtils;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.read.DeleteContext;
 import org.apache.hudi.common.util.ConfigUtils;
@@ -104,8 +104,8 @@ public class HoodieStreamerUtils {
     boolean requiresPayload = isChangingRecords(cfg.operation) && 
!HoodieWriteConfig.isFileGroupReaderBasedMergeHandle(props);
 
     return avroRDDOptional.map(avroRDD -> {
-      SerializableSchema avroSchema = new 
SerializableSchema(schemaProvider.getTargetHoodieSchema().toAvroSchema());
-      SerializableSchema processedAvroSchema = new 
SerializableSchema(isDropPartitionColumns(props) ? 
HoodieAvroUtils.removeMetadataFields(avroSchema.get()) : avroSchema.get());
+      HoodieSchema targetSchema = schemaProvider.getTargetHoodieSchema();
+      HoodieSchema processedSchema = isDropPartitionColumns(props) ? 
HoodieSchemaUtils.removeMetadataFields(targetSchema) : targetSchema;
       JavaRDD<Either<HoodieRecord,String>> records;
       if (recordType == HoodieRecord.HoodieRecordType.AVRO) {
         records = avroRDD.mapPartitions(
@@ -119,8 +119,7 @@ public class HoodieStreamerUtils {
                 
props.setProperty(KeyGenUtils.RECORD_KEY_GEN_INSTANT_TIME_CONFIG, instantTime);
               }
               BuiltinKeyGenerator builtinKeyGenerator = (BuiltinKeyGenerator) 
HoodieSparkKeyGeneratorFactory.createKeyGenerator(props);
-              HoodieSchema processedHoodieSchema = 
HoodieSchema.fromAvroSchema(processedAvroSchema.get());
-              DeleteContext deleteContext = new DeleteContext(props, 
processedHoodieSchema).withReaderSchema(processedHoodieSchema);
+              DeleteContext deleteContext = new DeleteContext(props, 
processedSchema).withReaderSchema(processedSchema);
               return new 
CloseableMappingIterator<>(ClosableIterator.wrap(genericRecordIterator), genRec 
-> {
                 try {
                   if (shouldErrorTable) {
@@ -154,10 +153,10 @@ public class HoodieStreamerUtils {
             props.setProperty(KeyGenUtils.RECORD_KEY_GEN_INSTANT_TIME_CONFIG, 
instantTime);
           }
           BuiltinKeyGenerator builtinKeyGenerator = (BuiltinKeyGenerator) 
HoodieSparkKeyGeneratorFactory.createKeyGenerator(props);
-          StructType baseStructType = 
AvroConversionUtils.convertAvroSchemaToStructType(processedAvroSchema.get());
-          StructType targetStructType = isDropPartitionColumns(props) ? 
AvroConversionUtils
-              
.convertAvroSchemaToStructType(HoodieAvroUtils.removeFields(processedAvroSchema.get(),
 partitionColumns)) : baseStructType;
-          HoodieAvroDeserializer deserializer = 
SparkAdapterSupport$.MODULE$.sparkAdapter().createAvroDeserializer(HoodieSchema.fromAvroSchema(processedAvroSchema.get()),
 baseStructType);
+          StructType baseStructType = 
HoodieSchemaConversionUtils.convertHoodieSchemaToStructType(processedSchema);
+          StructType targetStructType = isDropPartitionColumns(props) ? 
HoodieSchemaConversionUtils
+              
.convertHoodieSchemaToStructType(HoodieSchemaUtils.removeFields(processedSchema,
 partitionColumns)) : baseStructType;
+          HoodieAvroDeserializer deserializer = 
SparkAdapterSupport$.MODULE$.sparkAdapter().createAvroDeserializer(processedSchema,
 baseStructType);
 
           return new CloseableMappingIterator<>(ClosableIterator.wrap(itr), 
rec -> {
             InternalRow row = (InternalRow) 
deserializer.deserialize(rec).get();
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SourceFormatAdapter.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SourceFormatAdapter.java
index d23e91b18a1d..bffe46f637fd 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SourceFormatAdapter.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SourceFormatAdapter.java
@@ -20,11 +20,12 @@
 package org.apache.hudi.utilities.streamer;
 
 import org.apache.hudi.AvroConversionUtils;
+import org.apache.hudi.HoodieSchemaConversionUtils;
 import org.apache.hudi.HoodieSparkUtils;
-import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.avro.MercifulJsonConverter;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaUtils;
 import org.apache.hudi.common.table.checkpoint.Checkpoint;
 import org.apache.hudi.common.util.ConfigUtils;
 import org.apache.hudi.common.util.Option;
@@ -42,7 +43,6 @@ import org.apache.hudi.utilities.sources.helpers.RowConverter;
 import org.apache.hudi.utilities.sources.helpers.SanitizationUtils;
 
 import com.google.protobuf.Message;
-import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.sql.Column;
@@ -206,7 +206,7 @@ public class SourceFormatAdapter implements Closeable {
                     // pass in the schema for the Row-to-Avro conversion
                     // to avoid nullability mismatch between Avro schema and 
Row schema
                     ? HoodieSparkUtils.createRdd(rdd, 
HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE, true,
-                    
Option.ofNullable(r.getSchemaProvider().getSourceHoodieSchema().toAvroSchema())
+                    
Option.ofNullable(r.getSchemaProvider().getSourceHoodieSchema())
                 ).toJavaRDD() : HoodieSparkUtils.createRdd(rdd,
                     HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE, false, 
Option.empty()).toJavaRDD();
             })
@@ -255,12 +255,12 @@ public class SourceFormatAdapter implements Closeable {
       }
       case JSON: {
         InputBatch<JavaRDD<String>> r = ((Source<JavaRDD<String>>) 
source).fetchNext(lastCheckpoint, sourceLimit);
-        Schema sourceSchema = 
r.getSchemaProvider().getSourceHoodieSchema().toAvroSchema();
+        HoodieSchema sourceSchema = 
r.getSchemaProvider().getSourceHoodieSchema();
         // Decimal fields need additional decoding from json generated by 
kafka-connect. JSON -> ROW conversion is done through
         // a spark library that has not implemented this decoding
         if (isFieldNameSanitizingEnabled()
-            || (HoodieAvroUtils.hasDecimalField(sourceSchema) && source 
instanceof KafkaSource)) {
-          StructType dataType = 
AvroConversionUtils.convertAvroSchemaToStructType(sourceSchema);
+            || (HoodieSchemaUtils.hasDecimalField(sourceSchema) && source 
instanceof KafkaSource)) {
+          StructType dataType = 
HoodieSchemaConversionUtils.convertHoodieSchemaToStructType(sourceSchema);
           JavaRDD<Row> rowRDD = transformJsonToRowRdd(r);
           if (rowRDD != null) {
             Dataset<Row> rowDataset = 
source.getSparkSession().createDataFrame(rowRDD, dataType);
@@ -274,7 +274,7 @@ public class SourceFormatAdapter implements Closeable {
           // if error table writer is enabled, during spark read 
`columnNameOfCorruptRecord` option is configured.
           // Any records which spark is unable to read successfully are 
transferred to the column
           // configured via this option. The column is then used to trigger 
error events.
-          StructType dataType = 
AvroConversionUtils.convertAvroSchemaToStructType(sourceSchema)
+          StructType dataType = 
HoodieSchemaConversionUtils.convertHoodieSchemaToStructType(sourceSchema)
               .add(new StructField(ERROR_TABLE_CURRUPT_RECORD_COL_NAME, 
DataTypes.StringType, true, Metadata.empty()));
           StructType nullableStruct = dataType.asNullable();
           Option<Dataset<Row>> dataset = r.getBatch().map(rdd -> 
source.getSparkSession().read()
@@ -288,7 +288,7 @@ public class SourceFormatAdapter implements Closeable {
               eventsDataset,
               r.getCheckpointForNextBatch(), r.getSchemaProvider());
         } else {
-          StructType dataType = 
AvroConversionUtils.convertAvroSchemaToStructType(sourceSchema);
+          StructType dataType = 
HoodieSchemaConversionUtils.convertHoodieSchemaToStructType(sourceSchema);
           return new InputBatch<>(
               Option.ofNullable(
                   r.getBatch().map(rdd -> 
HoodieSparkUtils.maybeWrapDataFrameWithException(source.getSparkSession().read().schema(dataType).json(rdd),
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
index dc5513d1fdab..2995081d82d3 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
@@ -19,15 +19,13 @@
 
 package org.apache.hudi.utilities.streamer;
 
-import org.apache.hudi.AvroConversionUtils;
 import org.apache.hudi.DataSourceUtils;
 import org.apache.hudi.DataSourceWriteOptions;
 import org.apache.hudi.HoodieConversionUtils;
+import org.apache.hudi.HoodieSchemaConversionUtils;
 import org.apache.hudi.HoodieSchemaUtils;
 import org.apache.hudi.HoodieSparkSqlWriter;
 import org.apache.hudi.HoodieSparkUtils;
-import org.apache.hudi.avro.AvroSchemaUtils;
-import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.callback.common.WriteStatusValidator;
 import org.apache.hudi.client.HoodieWriteResult;
 import org.apache.hudi.client.SparkRDDWriteClient;
@@ -51,6 +49,7 @@ import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.model.debezium.DebeziumConstants;
 import org.apache.hudi.common.model.debezium.MySqlDebeziumAvroPayload;
 import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaCompatibility;
 import org.apache.hudi.common.schema.HoodieSchemaType;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -118,7 +117,6 @@ import org.apache.hudi.utilities.transform.Transformer;
 
 import com.codahale.metrics.Timer;
 import org.apache.avro.Schema;
-import org.apache.avro.SchemaCompatibility;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -148,6 +146,7 @@ import java.util.stream.Collectors;
 import scala.Tuple2;
 
 import static 
org.apache.hudi.DataSourceUtils.createUserDefinedBulkInsertPartitioner;
+import static 
org.apache.hudi.common.schema.HoodieSchemaUtils.getRecordQualifiedName;
 import static 
org.apache.hudi.common.table.HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE;
 import static 
org.apache.hudi.common.table.HoodieTableConfig.TIMELINE_HISTORY_PATH;
 import static 
org.apache.hudi.common.table.HoodieTableConfig.URL_ENCODE_PARTITIONING;
@@ -339,7 +338,7 @@ public class StreamSync implements Serializable, Closeable {
     Source source = UtilHelpers.createSource(cfg.sourceClassName, props, 
hoodieSparkContext.jsc(), sparkSession, metrics, streamContext);
     this.formatAdapter = new SourceFormatAdapter(source, 
this.errorTableWriter, Option.of(props));
 
-    Supplier<Option<Schema>> schemaSupplier = schemaProvider == null ? 
Option::empty : () -> 
Option.ofNullable(schemaProvider.getSourceHoodieSchema()).map(HoodieSchema::toAvroSchema);
+    Supplier<Option<HoodieSchema>> schemaSupplier = schemaProvider == null ? 
Option::empty : () -> Option.ofNullable(schemaProvider.getSourceHoodieSchema());
     this.transformer = 
UtilHelpers.createTransformer(Option.ofNullable(cfg.transformerClassNames), 
schemaSupplier, this.errorTableWriter.isPresent());
   }
 
@@ -541,18 +540,18 @@ public class StreamSync implements Serializable, 
Closeable {
     } else {
       HoodieSchema newSourceSchema = 
inputBatch.getSchemaProvider().getSourceHoodieSchema();
       HoodieSchema newTargetSchema = 
inputBatch.getSchemaProvider().getTargetHoodieSchema();
-      if ((newSourceSchema != null && 
!processedSchema.isSchemaPresent(newSourceSchema.toAvroSchema()))
-          || (newTargetSchema != null && 
!processedSchema.isSchemaPresent(newTargetSchema.toAvroSchema()))) {
+      if ((newSourceSchema != null && 
!processedSchema.isSchemaPresent(newSourceSchema))
+          || (newTargetSchema != null && 
!processedSchema.isSchemaPresent(newTargetSchema))) {
         String sourceStr = newSourceSchema == null ? NULL_PLACEHOLDER : 
newSourceSchema.toString(true);
         String targetStr = newTargetSchema == null ? NULL_PLACEHOLDER : 
newTargetSchema.toString(true);
         LOG.info("Seeing new schema. Source: {}, Target: {}", sourceStr, 
targetStr);
         // We need to recreate write client with new schema and register them.
         reInitWriteClient(newSourceSchema, newTargetSchema, 
inputBatch.getBatch(), metaClient);
         if (newSourceSchema != null) {
-          processedSchema.addSchema(newSourceSchema.toAvroSchema());
+          processedSchema.addSchema(newSourceSchema);
         }
         if (newTargetSchema != null) {
-          processedSchema.addSchema(newTargetSchema.toAvroSchema());
+          processedSchema.addSchema(newTargetSchema);
         }
       }
     }
@@ -714,7 +713,7 @@ public class StreamSync implements Serializable, Closeable {
                 rowDataset -> {
                   Tuple2<RDD<GenericRecord>, RDD<String>> safeCreateRDDs = 
HoodieSparkUtils.safeCreateRDD(rowDataset,
                       HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE, 
reconcileSchema,
-                      
Option.of(finalSchemaProvider.getTargetHoodieSchema().toAvroSchema()));
+                      Option.of(finalSchemaProvider.getTargetHoodieSchema()));
                   
errorTableWriter.get().addErrorEvents(safeCreateRDDs._2().toJavaRDD()
                       .map(evStr -> new ErrorEvent<>(evStr,
                           
ErrorEvent.ErrorReason.AVRO_DESERIALIZATION_FAILURE)));
@@ -722,14 +721,14 @@ public class StreamSync implements Serializable, 
Closeable {
                 });
           } else {
             avroRDDOptional = transformed.map(
-                rowDataset -> getTransformedRDD(rowDataset, reconcileSchema, 
finalSchemaProvider.getTargetHoodieSchema().toAvroSchema()));
+                rowDataset -> getTransformedRDD(rowDataset, reconcileSchema, 
finalSchemaProvider.getTargetHoodieSchema()));
           }
         }
       } else {
         // Deduce proper target (writer's) schema for the input dataset, 
reconciling its
         // schema w/ the table's one
         HoodieSchema incomingSchema = transformed.map(df ->
-                
HoodieSchema.fromAvroSchema(AvroConversionUtils.convertStructTypeToAvroSchema(df.schema(),
 AvroSchemaUtils.getAvroRecordQualifiedName(cfg.targetTableName))))
+                
HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(df.schema(), 
getRecordQualifiedName(cfg.targetTableName)))
             
.orElseGet(dataAndCheckpoint.getSchemaProvider()::getTargetHoodieSchema);
         schemaProvider = getDeducedSchemaProvider(incomingSchema, 
dataAndCheckpoint.getSchemaProvider(), metaClient);
 
@@ -738,7 +737,7 @@ public class StreamSync implements Serializable, Closeable {
         } else {
           // Rewrite transformed records into the expected target schema
           SchemaProvider finalSchemaProvider = schemaProvider;
-          avroRDDOptional = transformed.map(t -> getTransformedRDD(t, 
reconcileSchema, finalSchemaProvider.getTargetHoodieSchema().toAvroSchema()));
+          avroRDDOptional = transformed.map(t -> getTransformedRDD(t, 
reconcileSchema, finalSchemaProvider.getTargetHoodieSchema()));
         }
       }
     } else {
@@ -792,7 +791,7 @@ public class StreamSync implements Serializable, Closeable {
    */
   @VisibleForTesting
   SchemaProvider getDeducedSchemaProvider(HoodieSchema incomingSchema, 
SchemaProvider sourceSchemaProvider, HoodieTableMetaClient metaClient) {
-    Option<Schema> latestTableSchemaOpt = 
UtilHelpers.getLatestTableSchema(hoodieSparkContext.jsc(), storage, 
cfg.targetBasePath, metaClient);
+    Option<HoodieSchema> latestTableSchemaOpt = 
UtilHelpers.getLatestTableSchema(hoodieSparkContext.jsc(), storage, 
cfg.targetBasePath, metaClient);
     Option<InternalSchema> internalSchemaOpt = 
HoodieConversionUtils.toJavaOption(
         HoodieSchemaUtils.getLatestTableInternalSchema(
             HoodieStreamer.Config.getProps(conf, cfg), metaClient));
@@ -801,7 +800,7 @@ public class StreamSync implements Serializable, Closeable {
 
     HoodieSchema targetSchema = HoodieSchemaUtils.deduceWriterSchema(
             incomingSchema == null ? 
HoodieSchema.create(HoodieSchemaType.NULL) : 
org.apache.hudi.common.schema.HoodieSchemaUtils.removeMetadataFields(incomingSchema),
-            latestTableSchemaOpt.map(HoodieSchema::fromAvroSchema),
+            latestTableSchemaOpt,
             internalSchemaOpt,
             props);
 
@@ -810,7 +809,7 @@ public class StreamSync implements Serializable, Closeable {
                 new SimpleSchemaProvider(hoodieSparkContext.jsc(), 
targetSchema, props));
   }
 
-  private JavaRDD<GenericRecord> getTransformedRDD(Dataset<Row> rowDataset, 
boolean reconcileSchema, Schema readerSchema) {
+  private JavaRDD<GenericRecord> getTransformedRDD(Dataset<Row> rowDataset, 
boolean reconcileSchema, HoodieSchema readerSchema) {
     return HoodieSparkUtils.createRdd(rowDataset, HOODIE_RECORD_STRUCT_NAME, 
HOODIE_RECORD_NAMESPACE, reconcileSchema,
         Option.ofNullable(readerSchema)).toJavaRDD();
   }
@@ -1089,7 +1088,7 @@ public class StreamSync implements Serializable, 
Closeable {
   private void reInitWriteClient(HoodieSchema sourceSchema, HoodieSchema 
targetSchema, Option<JavaRDD<HoodieRecord>> recordsOpt, HoodieTableMetaClient 
metaClient) throws IOException {
     LOG.info("Setting up new Hoodie Write Client");
     if (HoodieStreamerUtils.isDropPartitionColumns(props)) {
-      targetSchema = 
HoodieSchema.fromAvroSchema(HoodieAvroUtils.removeFields(targetSchema.toAvroSchema(),
 HoodieStreamerUtils.getPartitionColumns(props)));
+      targetSchema = 
org.apache.hudi.common.schema.HoodieSchemaUtils.removeFields(targetSchema, 
HoodieStreamerUtils.getPartitionColumns(props));
     }
     final Pair<HoodieWriteConfig, HoodieSchema> initialWriteConfigAndSchema = 
getHoodieClientConfigAndWriterSchema(targetSchema, true, metaClient);
     final HoodieWriteConfig initialWriteConfig = 
initialWriteConfigAndSchema.getLeft();
@@ -1223,16 +1222,15 @@ public class StreamSync implements Serializable, 
Closeable {
     HoodieSchema newWriteSchema = targetSchema;
     try {
       // check if targetSchema is equal to NULL schema
-      Schema nullSchema = InputBatch.NULL_SCHEMA.toAvroSchema();
-      if (targetSchema == null || 
(SchemaCompatibility.checkReaderWriterCompatibility(targetSchema.toAvroSchema(),
 nullSchema).getType() == SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE
-          && SchemaCompatibility.checkReaderWriterCompatibility(nullSchema, 
targetSchema.toAvroSchema()).getType() == 
SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE)) {
+      HoodieSchema nullSchema = InputBatch.NULL_SCHEMA;
+      if (targetSchema == null || 
(HoodieSchemaCompatibility.areSchemasCompatible(targetSchema, nullSchema) && 
HoodieSchemaCompatibility.areSchemasCompatible(nullSchema, targetSchema))) {
         // target schema is null. fetch schema from commit metadata and use it
         int totalCompleted = 
metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().countInstants();
         if (totalCompleted > 0) {
           TableSchemaResolver schemaResolver = new 
TableSchemaResolver(metaClient);
-          Option<Schema> tableSchema = 
schemaResolver.getTableAvroSchemaIfPresent(false);
+          Option<HoodieSchema> tableSchema = 
schemaResolver.getTableSchemaIfPresent(false);
           if (tableSchema.isPresent()) {
-            newWriteSchema = HoodieSchema.fromAvroSchema(tableSchema.get());
+            newWriteSchema = tableSchema.get();
           } else {
             LOG.warn("Could not fetch schema from table. Falling back to using 
target schema from schema provider");
           }
@@ -1244,17 +1242,6 @@ public class StreamSync implements Serializable, 
Closeable {
     }
   }
 
-  /**
-   * Register Avro Schemas.
-   *
-   * @param schemaProvider Schema Provider
-   */
-  private void registerAvroSchemas(SchemaProvider schemaProvider) {
-    if (null != schemaProvider) {
-      registerAvroSchemas(schemaProvider.getSourceHoodieSchema(), 
schemaProvider.getTargetHoodieSchema());
-    }
-  }
-
   /**
    * Register Avro Schemas.
    *
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/ChainedTransformer.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/ChainedTransformer.java
index af8b2c5fa2d7..726770e5ad4a 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/ChainedTransformer.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/ChainedTransformer.java
@@ -18,15 +18,15 @@
 
 package org.apache.hudi.utilities.transform;
 
-import org.apache.hudi.AvroConversionUtils;
+import org.apache.hudi.HoodieSchemaConversionUtils;
 import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.utilities.exception.HoodieTransformPlanException;
 import org.apache.hudi.utilities.streamer.HoodieStreamer;
 
-import org.apache.avro.Schema;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
@@ -52,7 +52,7 @@ public class ChainedTransformer implements Transformer {
   private static final String ID_TRANSFORMER_CLASS_NAME_DELIMITER = ":";
 
   protected final List<TransformerInfo> transformers;
-  private final Supplier<Option<Schema>> sourceSchemaSupplier;
+  private final Supplier<Option<HoodieSchema>> sourceSchemaSupplier;
 
   public ChainedTransformer(List<Transformer> transformersList) {
     this.transformers = new ArrayList<>(transformersList.size());
@@ -69,7 +69,7 @@ public class ChainedTransformer implements Transformer {
    * @param sourceSchemaSupplier              Supplies the schema (if schema 
provider is present) for the dataset the transform is applied to
    * @param configuredTransformers            List of configured transformer 
class names.
    */
-  public ChainedTransformer(List<String> configuredTransformers, 
Supplier<Option<Schema>> sourceSchemaSupplier) {
+  public ChainedTransformer(List<String> configuredTransformers, 
Supplier<Option<HoodieSchema>> sourceSchemaSupplier) {
     this.transformers = new ArrayList<>(configuredTransformers.size());
     this.sourceSchemaSupplier = sourceSchemaSupplier;
 
@@ -121,12 +121,12 @@ public class ChainedTransformer implements Transformer {
 
   private StructType getExpectedTransformedSchema(TransformerInfo 
transformerInfo, JavaSparkContext jsc, SparkSession sparkSession,
                                                   Option<StructType> 
incomingStructOpt, Option<Dataset<Row>> rowDatasetOpt, TypedProperties 
properties) {
-    Option<Schema> sourceSchemaOpt = sourceSchemaSupplier.get();
+    Option<HoodieSchema> sourceSchemaOpt = sourceSchemaSupplier.get();
     if (!sourceSchemaOpt.isPresent() && !rowDatasetOpt.isPresent()) {
       throw new HoodieTransformPlanException("Either source schema or source 
dataset should be available to fetch the schema");
     }
     StructType incomingStruct = incomingStructOpt
-        .orElseGet(() -> sourceSchemaOpt.isPresent() ? 
AvroConversionUtils.convertAvroSchemaToStructType(sourceSchemaOpt.get()) : 
rowDatasetOpt.get().schema());
+        .orElseGet(() -> sourceSchemaOpt.isPresent() ? 
HoodieSchemaConversionUtils.convertHoodieSchemaToStructType(sourceSchemaOpt.get())
 : rowDatasetOpt.get().schema());
     return transformerInfo.getTransformer().transformedSchema(jsc, 
sparkSession, incomingStruct, properties).asNullable();
   }
 
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/ErrorTableAwareChainedTransformer.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/ErrorTableAwareChainedTransformer.java
index 4d18ea9f11ba..a7d2979e90f0 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/ErrorTableAwareChainedTransformer.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/ErrorTableAwareChainedTransformer.java
@@ -20,10 +20,10 @@
 package org.apache.hudi.utilities.transform;
 
 import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.utilities.streamer.ErrorTableUtils;
 
-import org.apache.avro.Schema;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
@@ -39,7 +39,7 @@ import java.util.function.Supplier;
  * if that column is not dropped in any of the transformations.
  */
 public class ErrorTableAwareChainedTransformer extends ChainedTransformer {
-  public ErrorTableAwareChainedTransformer(List<String> 
configuredTransformers, Supplier<Option<Schema>> sourceSchemaSupplier) {
+  public ErrorTableAwareChainedTransformer(List<String> 
configuredTransformers, Supplier<Option<HoodieSchema>> sourceSchemaSupplier) {
     super(configuredTransformers, sourceSchemaSupplier);
   }
 
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionBase.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionBase.java
index 9d92cb84ed47..60519d333de2 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionBase.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionBase.java
@@ -19,8 +19,8 @@
 
 package org.apache.hudi.utilities.deltastreamer;
 
-import org.apache.hudi.AvroConversionUtils;
 import org.apache.hudi.DataSourceWriteOptions;
+import org.apache.hudi.HoodieSchemaConversionUtils;
 import org.apache.hudi.HoodieSparkUtils;
 import org.apache.hudi.TestHoodieSparkUtils;
 import org.apache.hudi.avro.HoodieAvroUtils;
@@ -226,11 +226,11 @@ public class TestHoodieDeltaStreamerSchemaEvolutionBase 
extends HoodieDeltaStrea
 
   protected void addData(Dataset<Row> df, Boolean isFirst) {
     if (useSchemaProvider) {
-      TestSchemaProvider.sourceSchema = HoodieSchema.fromAvroSchema(
-          AvroConversionUtils.convertStructTypeToAvroSchema(df.schema(), 
HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE));
+      TestSchemaProvider.sourceSchema =
+          
HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(df.schema(), 
HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE);
       if (withErrorTable && isFirst) {
-        TestSchemaProvider.setTargetSchema(HoodieSchema.fromAvroSchema(
-            
AvroConversionUtils.convertStructTypeToAvroSchema(TestHoodieSparkUtils.getSchemaColumnNotNullable(df.schema(),
 "_row_key"),"idk", "idk")));
+        TestSchemaProvider.setTargetSchema(
+            
HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(TestHoodieSparkUtils.getSchemaColumnNotNullable(df.schema(),
 "_row_key"),"idk", "idk"));
       }
     }
     if (useKafkaSource) {
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionQuick.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionQuick.java
index 150851cc3e08..b11e21d4bbe3 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionQuick.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionQuick.java
@@ -21,6 +21,8 @@ package org.apache.hudi.utilities.deltastreamer;
 
 import org.apache.hudi.TestHoodieSparkUtils;
 import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.util.Option;
@@ -28,7 +30,6 @@ import org.apache.hudi.exception.MissingSchemaFieldException;
 import org.apache.hudi.utilities.UtilHelpers;
 import org.apache.hudi.utilities.streamer.HoodieStreamer;
 
-import org.apache.avro.Schema;
 import org.apache.spark.sql.Column;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
@@ -334,10 +335,10 @@ public class TestHoodieDeltaStreamerSchemaEvolutionQuick 
extends TestHoodieDelta
     deltaStreamer.sync();
 
     metaClient.reloadActiveTimeline();
-    Option<Schema> latestTableSchemaOpt = 
UtilHelpers.getLatestTableSchema(jsc, storage,
+    Option<HoodieSchema> latestTableSchemaOpt = 
UtilHelpers.getLatestTableSchema(jsc, storage,
         dsConfig.targetBasePath, metaClient);
-    assertTrue(latestTableSchemaOpt.get().getField("rider").schema().getTypes()
-        .stream().anyMatch(t -> t.getType().equals(Schema.Type.STRING)));
+    
assertTrue(latestTableSchemaOpt.get().getField("rider").get().schema().getTypes()
+        .stream().anyMatch(t -> t.getType() == HoodieSchemaType.STRING));
     
assertTrue(metaClient.reloadActiveTimeline().lastInstant().get().compareTo(lastInstant)
 > 0);
   }
 
@@ -409,10 +410,10 @@ public class TestHoodieDeltaStreamerSchemaEvolutionQuick 
extends TestHoodieDelta
       assertTrue(allowNullForDeletedCols || targetSchemaSameAsTableSchema);
 
       metaClient.reloadActiveTimeline();
-      Option<Schema> latestTableSchemaOpt = 
UtilHelpers.getLatestTableSchema(jsc, storage,
+      Option<HoodieSchema> latestTableSchemaOpt = 
UtilHelpers.getLatestTableSchema(jsc, storage,
           dsConfig.targetBasePath, metaClient);
-      
assertTrue(latestTableSchemaOpt.get().getField("rider").schema().getTypes()
-          .stream().anyMatch(t -> t.getType().equals(Schema.Type.STRING)));
+      
assertTrue(latestTableSchemaOpt.get().getField("rider").get().schema().getTypes()
+          .stream().anyMatch(t -> t.getType() == HoodieSchemaType.STRING));
       
assertTrue(metaClient.reloadActiveTimeline().lastInstant().get().compareTo(lastInstant)
 > 0);
     } catch (MissingSchemaFieldException e) {
       assertFalse(allowNullForDeletedCols || targetSchemaSameAsTableSchema);
@@ -489,10 +490,10 @@ public class TestHoodieDeltaStreamerSchemaEvolutionQuick 
extends TestHoodieDelta
       assertTrue(allowNullForDeletedCols || targetSchemaSameAsTableSchema);
 
       metaClient.reloadActiveTimeline();
-      Option<Schema> latestTableSchemaOpt = 
UtilHelpers.getLatestTableSchema(jsc, storage,
+      Option<HoodieSchema> latestTableSchemaOpt = 
UtilHelpers.getLatestTableSchema(jsc, storage,
           dsConfig.targetBasePath, metaClient);
-      
assertTrue(latestTableSchemaOpt.get().getField("rider").schema().getTypes()
-          .stream().anyMatch(t -> t.getType().equals(Schema.Type.STRING)));
+      
assertTrue(latestTableSchemaOpt.get().getField("rider").get().schema().getTypes()
+          .stream().anyMatch(t -> t.getType() == HoodieSchemaType.STRING));
       
assertTrue(metaClient.reloadActiveTimeline().lastInstant().get().compareTo(lastInstant)
 > 0);
     } catch (Exception e) {
       assertTrue(containsErrorMessage(e, "has no default value and is 
non-nullable",
@@ -569,11 +570,11 @@ public class TestHoodieDeltaStreamerSchemaEvolutionQuick 
extends TestHoodieDelta
       assertFalse(targetSchemaSameAsTableSchema);
 
       metaClient.reloadActiveTimeline();
-      Option<Schema> latestTableSchemaOpt = 
UtilHelpers.getLatestTableSchema(jsc, storage,
+      Option<HoodieSchema> latestTableSchemaOpt = 
UtilHelpers.getLatestTableSchema(jsc, storage,
           dsConfig.targetBasePath, metaClient);
-      
assertTrue(latestTableSchemaOpt.get().getField("distance_in_meters").schema().getTypes()
-              .stream().anyMatch(t -> t.getType().equals(Schema.Type.DOUBLE)),
-          
latestTableSchemaOpt.get().getField("distance_in_meters").schema().toString());
+      
assertTrue(latestTableSchemaOpt.get().getField("distance_in_meters").get().schema().getTypes()
+              .stream().anyMatch(t -> t.getType() == HoodieSchemaType.DOUBLE),
+          
latestTableSchemaOpt.get().getField("distance_in_meters").get().schema().toString());
       
assertTrue(metaClient.reloadActiveTimeline().lastInstant().get().compareTo(lastInstant)
 > 0);
     } catch (Exception e) {
       assertTrue(targetSchemaSameAsTableSchema);
@@ -657,10 +658,10 @@ public class TestHoodieDeltaStreamerSchemaEvolutionQuick 
extends TestHoodieDelta
     deltaStreamer.sync();
 
     metaClient.reloadActiveTimeline();
-    Option<Schema> latestTableSchemaOpt = 
UtilHelpers.getLatestTableSchema(jsc, storage,
+    Option<HoodieSchema> latestTableSchemaOpt = 
UtilHelpers.getLatestTableSchema(jsc, storage,
         dsConfig.targetBasePath, metaClient);
-    
assertTrue(latestTableSchemaOpt.get().getField("current_ts").schema().getTypes()
-        .stream().anyMatch(t -> t.getType().equals(Schema.Type.LONG)));
+    
assertTrue(latestTableSchemaOpt.get().getField("current_ts").get().schema().getTypes()
+        .stream().anyMatch(t -> t.getType() == HoodieSchemaType.LONG));
     
assertTrue(metaClient.reloadActiveTimeline().lastInstant().get().compareTo(lastInstant)
 > 0);
   }
 
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestSourceFormatAdapter.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestSourceFormatAdapter.java
index d80517affd5e..fa09c8b5eb94 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestSourceFormatAdapter.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestSourceFormatAdapter.java
@@ -19,7 +19,7 @@
 
 package org.apache.hudi.utilities.deltastreamer;
 
-import org.apache.hudi.AvroConversionUtils;
+import org.apache.hudi.HoodieSchemaConversionUtils;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.common.table.checkpoint.Checkpoint;
@@ -40,7 +40,7 @@ import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
-import org.apache.spark.sql.avro.SchemaConverters;
+import org.apache.spark.sql.avro.HoodieSparkSchemaConverters;
 import org.apache.spark.sql.types.StructType;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.AfterEach;
@@ -110,7 +110,7 @@ public class TestSourceFormatAdapter {
     TypedProperties typedProperties = new TypedProperties();
     
typedProperties.put(HoodieStreamerConfig.SANITIZE_SCHEMA_FIELD_NAMES.key(), 
true);
     
typedProperties.put(HoodieStreamerConfig.SCHEMA_FIELD_NAME_INVALID_CHAR_MASK.key(),
 "__");
-    setupJsonSource(rdd, 
HoodieSchema.fromAvroSchema(SchemaConverters.toAvroType(sanitizedSchema, false, 
"record", "")));
+    setupJsonSource(rdd, 
HoodieSparkSchemaConverters.toHoodieType(sanitizedSchema, false, "record", ""));
     SourceFormatAdapter sourceFormatAdapter = new 
SourceFormatAdapter(testJsonDataSource, Option.empty(), 
Option.of(typedProperties));
     return sourceFormatAdapter.fetchNewDataInRowFormat(Option.of(new 
StreamerCheckpointV2(DUMMY_CHECKPOINT)), 10L);
   }
@@ -122,8 +122,8 @@ public class TestSourceFormatAdapter {
     assertEquals(2, ds.collectAsList().size());
     assertEquals(sanitizedSchema, ds.schema());
     if (inputBatch.getSchemaProvider() instanceof RowBasedSchemaProvider) {
-      
assertEquals(HoodieSchema.fromAvroSchema(AvroConversionUtils.convertStructTypeToAvroSchema(sanitizedSchema,
-          "hoodie_source", "hoodie.source")), 
inputBatch.getSchemaProvider().getSourceHoodieSchema());
+      
assertEquals(HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(sanitizedSchema,
+          "hoodie_source", "hoodie.source"), 
inputBatch.getSchemaProvider().getSourceHoodieSchema());
     }
     assertEquals(expectedRDD.collect(), ds.toJSON().collectAsList());
   }
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestChainedTransformer.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestChainedTransformer.java
index cb4bffd7e823..4fe858672635 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestChainedTransformer.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestChainedTransformer.java
@@ -20,13 +20,13 @@
 package org.apache.hudi.utilities.functional;
 
 import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
 import org.apache.hudi.utilities.exception.HoodieTransformPlanException;
 import org.apache.hudi.utilities.transform.ChainedTransformer;
 import org.apache.hudi.utilities.transform.Transformer;
 
-import org.apache.avro.Schema;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.RowFactory;
@@ -43,8 +43,8 @@ import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Supplier;
 
-import static 
org.apache.hudi.common.testutils.HoodieTestDataGenerator.AVRO_SCHEMA;
-import static 
org.apache.hudi.common.testutils.HoodieTestDataGenerator.NESTED_AVRO_SCHEMA;
+import static 
org.apache.hudi.common.testutils.HoodieTestDataGenerator.HOODIE_SCHEMA;
+import static 
org.apache.hudi.common.testutils.HoodieTestDataGenerator.NESTED_SCHEMA;
 import static org.apache.spark.sql.types.DataTypes.IntegerType;
 import static org.apache.spark.sql.types.DataTypes.StringType;
 import static org.apache.spark.sql.types.DataTypes.createStructField;
@@ -115,7 +115,7 @@ public class TestChainedTransformer extends 
SparkClientFunctionalTestHarness {
   @Test
   public void testChainedTransformerTransformedSchema() {
     String transformerName = 
"org.apache.hudi.utilities.transform.FlatteningTransformer";
-    ChainedTransformer transformer = new 
ChainedTransformer(Arrays.asList(transformerName.split(",")), () -> 
Option.of(NESTED_AVRO_SCHEMA));
+    ChainedTransformer transformer = new 
ChainedTransformer(Arrays.asList(transformerName.split(",")), () -> 
Option.of(NESTED_SCHEMA));
     StructType transformedSchema = transformer.transformedSchema(jsc(), 
spark(), null, new TypedProperties());
     // Verify transformed nested fields are present in the transformed schema
     
assertTrue(Arrays.asList(transformedSchema.fieldNames()).contains("fare_amount"));
@@ -127,11 +127,11 @@ public class TestChainedTransformer extends 
SparkClientFunctionalTestHarness {
   public void assertSchemaSupplierIsCalledPerInvocationOfTransformedSchema() {
     String transformerName = 
"org.apache.hudi.utilities.transform.FlatteningTransformer";
     AtomicInteger count = new AtomicInteger(0);
-    Supplier<Option<Schema>> schemaSupplier = () -> {
+    Supplier<Option<HoodieSchema>> schemaSupplier = () -> {
       if (count.getAndIncrement() == 0) {
-        return Option.of(AVRO_SCHEMA);
+        return Option.of(HOODIE_SCHEMA);
       } else {
-        return Option.of(NESTED_AVRO_SCHEMA);
+        return Option.of(NESTED_SCHEMA);
       }
     };
     ChainedTransformer transformer = new 
ChainedTransformer(Arrays.asList(transformerName.split(",")), schemaSupplier);
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGenericRddTransform.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGenericRddTransform.java
index 8adfdb4dc377..9c31a39abe70 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGenericRddTransform.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGenericRddTransform.java
@@ -18,13 +18,13 @@
 
 package org.apache.hudi.utilities.sources;
 
-import org.apache.hudi.AvroConversionUtils;
+import org.apache.hudi.HoodieSchemaConversionUtils;
 import org.apache.hudi.HoodieSparkUtils;
+import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
 
-import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.rdd.RDD;
@@ -53,7 +53,7 @@ public class TestGenericRddTransform extends 
SparkClientFunctionalTestHarness {
     StructType structType = new StructType(new StructField[] {
         new StructField("id", DataTypes.StringType, false, Metadata.empty()),
         new StructField("null_check_col", DataTypes.StringType, false, 
Metadata.empty())});
-    Schema nonNullSchema = 
AvroConversionUtils.convertStructTypeToAvroSchema(structType,"record","record");
+    HoodieSchema nonNullSchema = 
HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(structType,"record","record");
     Tuple2<RDD<GenericRecord>, RDD<String>> failSafeRdds = 
HoodieSparkUtils.safeCreateRDD(ds, "record",
         "record",false, Option.of(nonNullSchema));
     assertEquals(5, failSafeRdds._1.count());
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/SanitizationTestUtils.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/SanitizationTestUtils.java
index e2a2b53bdd38..d07e2ddc1744 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/SanitizationTestUtils.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/SanitizationTestUtils.java
@@ -24,8 +24,6 @@ import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.common.schema.HoodieSchemaField;
 import org.apache.hudi.common.schema.HoodieSchemaType;
 
-import org.apache.avro.Schema;
-import org.apache.avro.SchemaBuilder;
 import org.apache.spark.sql.types.ArrayType;
 import org.apache.spark.sql.types.DataTypes;
 import org.apache.spark.sql.types.MapType;
@@ -134,29 +132,25 @@ public class SanitizationTestUtils {
   }
 
   public static HoodieSchema generateRenamedSchemaWithDefaultReplacement() {
-    Schema addressSchema = SchemaBuilder.record("__Address").fields()
-        .nullableString("__stree9add__ress", "@@@any_address")
-        .requiredString("cit__y__")
-        .endRecord();
-    Schema personSchema = SchemaBuilder.record("Person").fields()
-        .requiredString("__firstname")
-        .requiredString("__lastname")
-        .name("address").type(addressSchema).noDefault()
-        .endRecord();
-    return HoodieSchema.fromAvroSchema(personSchema);
+    HoodieSchema addressSchema = HoodieSchema.createRecord("__Address", null, 
null,
+        Arrays.asList(
+            HoodieSchemaField.of("__stree9add__ress", 
HoodieSchema.createUnion(HoodieSchema.create(HoodieSchemaType.STRING), 
HoodieSchema.create(HoodieSchemaType.NULL)), null, "@@@any_address"),
+            HoodieSchemaField.of("cit__y__", 
HoodieSchema.create(HoodieSchemaType.STRING))));
+    return HoodieSchema.createRecord("Person", null, null,
+        Arrays.asList(HoodieSchemaField.of("__firstname", 
HoodieSchema.create(HoodieSchemaType.STRING)),
+            HoodieSchemaField.of("__lastname", 
HoodieSchema.create(HoodieSchemaType.STRING)),
+            HoodieSchemaField.of("address", addressSchema)));
   }
 
   public static HoodieSchema generateRenamedSchemaWithConfiguredReplacement() {
-    Schema addressSchema = SchemaBuilder.record("_Address").fields()
-        .nullableString("_stree9add_ress", "@@@any_address")
-        .requiredString("cit_y_")
-        .endRecord();
-    Schema personSchema = SchemaBuilder.record("Person").fields()
-        .requiredString("_firstname")
-        .requiredString("_lastname")
-        .name("address").type(addressSchema).noDefault()
-        .endRecord();
-    return HoodieSchema.fromAvroSchema(personSchema);
+    HoodieSchema addressSchema = HoodieSchema.createRecord("_Address", null, 
null,
+        Arrays.asList(
+            HoodieSchemaField.of("_stree9add_ress", 
HoodieSchema.createUnion(HoodieSchema.create(HoodieSchemaType.STRING), 
HoodieSchema.create(HoodieSchemaType.NULL)), null, "@@@any_address"),
+            HoodieSchemaField.of("cit_y_", 
HoodieSchema.create(HoodieSchemaType.STRING))));
+    return HoodieSchema.createRecord("Person", null, null,
+        Arrays.asList(HoodieSchemaField.of("_firstname", 
HoodieSchema.create(HoodieSchemaType.STRING)),
+            HoodieSchemaField.of("_lastname", 
HoodieSchema.create(HoodieSchemaType.STRING)),
+            HoodieSchemaField.of("address", addressSchema)));
   }
 
   public static Stream<Arguments> provideDataFiles() {

Reply via email to