This is an automated email from the ASF dual-hosted git repository.

timbrown pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new fec154a8db12 feat(schema): Migrate spark schema conversion utils to 
their HoodieSchema equivalent (#17765)
fec154a8db12 is described below

commit fec154a8db1256d3b6a15301adf0f25e4c4a6ff5
Author: Tim Brown <[email protected]>
AuthorDate: Thu Jan 1 19:57:28 2026 -0500

    feat(schema): Migrate spark schema conversion utils to their HoodieSchema 
equivalent (#17765)
    
    * migrate schema conversion to use HoodieSchema util
    
    * delete class
    
    * update spark4, update error handling
    
    * fix integ test
---
 .../MultipleSparkJobExecutionStrategy.java         |   4 +-
 .../client/common/SparkReaderContextFactory.java   |   4 +-
 .../client/utils/SparkMetadataWriterUtils.java     |   4 +-
 .../hudi/client/utils/SparkPartitionUtils.java     |   4 +-
 .../hudi/client/utils/SparkValidatorUtils.java     |   6 +-
 .../hudi/common/model/HoodieSparkRecord.java       |   3 +-
 .../apache/hudi/merge/SparkRecordMergingUtils.java |   6 +-
 .../org/apache/hudi/AvroConversionUtils.scala      | 191 +--------------------
 .../apache/hudi/HoodieSchemaConversionUtils.scala  |  27 ++-
 .../util/OrderingValueEngineTypeConverter.java     |   4 +-
 .../sql/avro/HoodieSparkAvroSchemaConverters.scala |  40 -----
 .../org/apache/spark/sql/hudi/SparkAdapter.scala   |   5 +-
 .../hudi/integ/testsuite/utils/SparkSqlUtils.scala |   9 +-
 .../apache/hudi/TestDefaultSparkRecordMerger.java  |  12 +-
 .../org/apache/hudi/TestAvroConversionUtils.scala  | 112 +-----------
 .../org/apache/hudi/TestHoodieSparkSqlWriter.scala |   2 +-
 .../TestHoodieSparkSqlWriterWithTestFormat.scala   |   2 +-
 .../model/TestHoodieRecordSerialization.scala      |   8 +-
 .../read/TestHoodieFileGroupReaderOnSpark.scala    |   8 +-
 .../hudi/functional/ColumnStatIndexTestBase.scala  |   2 +-
 .../functional/PartitionStatsIndexTestBase.scala   |   5 +-
 .../hudi/functional/TestBasicSchemaEvolution.scala |   6 +-
 .../apache/hudi/functional/TestCOWDataSource.scala |   6 +-
 .../functional/TestColumnStatsIndexWithSQL.scala   |   4 +-
 .../hudi/functional/TestPartitionStatsIndex.scala  |   7 +-
 .../util/TestSparkInternalSchemaConverter.scala    |   4 +-
 .../execution/benchmark/AvroSerDerBenchmark.scala  |   4 +-
 .../spark/sql/adapter/BaseSpark3Adapter.scala      |  10 +-
 .../spark/sql/adapter/BaseSpark4Adapter.scala      |  10 +-
 .../utilities/HoodieMetadataTableValidator.java    |   6 +-
 .../hudi/utilities/schema/HiveSchemaProvider.java  |  15 +-
 .../utilities/schema/RowBasedSchemaProvider.java   |   4 +-
 .../helpers/TestCloudObjectsSelectorCommon.java    |  10 +-
 .../TestMercifulJsonToRowConverterBase.java        |   4 +-
 34 files changed, 121 insertions(+), 427 deletions(-)

diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
index f378ef46a5a3..be6d516a303f 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
@@ -18,7 +18,7 @@
 
 package org.apache.hudi.client.clustering.run.strategy;
 
-import org.apache.hudi.AvroConversionUtils;
+import org.apache.hudi.HoodieSchemaConversionUtils;
 import org.apache.hudi.SparkAdapterSupport$;
 import org.apache.hudi.avro.model.HoodieClusteringGroup;
 import org.apache.hudi.avro.model.HoodieClusteringPlan;
@@ -306,7 +306,7 @@ public abstract class MultipleSparkJobExecutionStrategy<T>
     // broadcast reader context.
     HoodieTableMetaClient metaClient = getHoodieTable().getMetaClient();
     ReaderContextFactory<InternalRow> readerContextFactory = 
getEngineContext().getReaderContextFactory(metaClient);
-    StructType sparkSchemaWithMetaFields = 
AvroConversionUtils.convertAvroSchemaToStructType(tableSchemaWithMetaFields.toAvroSchema());
+    StructType sparkSchemaWithMetaFields = 
HoodieSchemaConversionUtils.convertHoodieSchemaToStructType(tableSchemaWithMetaFields);
 
     RDD<InternalRow> internalRowRDD = jsc.parallelize(clusteringOps, 
clusteringOps.size()).flatMap(new FlatMapFunction<ClusteringOperation, 
InternalRow>() {
       @Override
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/SparkReaderContextFactory.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/SparkReaderContextFactory.java
index a4e88b06915f..4a2f52707121 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/SparkReaderContextFactory.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/SparkReaderContextFactory.java
@@ -18,7 +18,7 @@
 
 package org.apache.hudi.client.common;
 
-import org.apache.hudi.AvroConversionUtils;
+import org.apache.hudi.HoodieSchemaConversionUtils;
 import org.apache.hudi.HoodieSparkUtils;
 import org.apache.hudi.SparkAdapterSupport$;
 import org.apache.hudi.SparkFileFormatInternalRowReaderContext;
@@ -146,7 +146,7 @@ public class SparkReaderContextFactory implements 
ReaderContextFactory<InternalR
                                                           Configuration 
configs,
                                                           SparkAdapter 
sparkAdapter) {
     try {
-      StructType dataSchema = 
AvroConversionUtils.convertAvroSchemaToStructType(resolver.getTableAvroSchema());
+      StructType dataSchema = 
HoodieSchemaConversionUtils.convertHoodieSchemaToStructType(resolver.getTableSchema());
       return sparkAdapter.createOrcFileReader(false, sqlConf, options, 
configs, dataSchema);
     } catch (Exception e) {
       throw new HoodieException("Failed to broadcast ORC file reader", e);
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java
index b86fa9612d1e..11da57ce3f02 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java
@@ -19,7 +19,7 @@
 
 package org.apache.hudi.client.utils;
 
-import org.apache.hudi.AvroConversionUtils;
+import org.apache.hudi.HoodieSchemaConversionUtils;
 import org.apache.hudi.HoodieSparkUtils;
 import org.apache.hudi.SparkRowSerDe;
 import org.apache.hudi.avro.model.HoodieMetadataRecord;
@@ -295,7 +295,7 @@ public class SparkMetadataWriterUtils {
             
getExpressionIndexRecordsIterator(readerContextFactory.getContext(), 
metaClient, tableSchema, readerSchema, dataWriteConfig, entry));
 
     // Generate dataset with expression index metadata
-    StructType structType = 
AvroConversionUtils.convertAvroSchemaToStructType(readerSchema.toAvroSchema())
+    StructType structType = 
HoodieSchemaConversionUtils.convertHoodieSchemaToStructType(readerSchema)
         
.add(StructField.apply(HoodieExpressionIndex.HOODIE_EXPRESSION_INDEX_PARTITION, 
DataTypes.StringType, false, Metadata.empty()))
         
.add(StructField.apply(HoodieExpressionIndex.HOODIE_EXPRESSION_INDEX_RELATIVE_FILE_PATH,
 DataTypes.StringType, false, Metadata.empty()))
         
.add(StructField.apply(HoodieExpressionIndex.HOODIE_EXPRESSION_INDEX_FILE_SIZE, 
DataTypes.LongType, false, Metadata.empty()));
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkPartitionUtils.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkPartitionUtils.java
index 8cf96101bf1a..d3f7f9908324 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkPartitionUtils.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkPartitionUtils.java
@@ -18,7 +18,7 @@
 
 package org.apache.hudi.client.utils;
 
-import org.apache.hudi.AvroConversionUtils;
+import org.apache.hudi.HoodieSchemaConversionUtils;
 import org.apache.hudi.HoodieSparkUtils;
 import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.common.util.Option;
@@ -41,7 +41,7 @@ public class SparkPartitionUtils {
         partitionFields.get(),
         partitionPath,
         new StoragePath(basePath),
-        
AvroConversionUtils.convertAvroSchemaToStructType(writerSchema.toAvroSchema()),
+        
HoodieSchemaConversionUtils.convertHoodieSchemaToStructType(writerSchema),
         hadoopConf.get("timeZone", SQLConf.get().sessionLocalTimeZone()),
         hadoopConf.getBoolean("spark.sql.sources.validatePartitionColumns", 
true));
   }
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkValidatorUtils.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkValidatorUtils.java
index f3de90d587be..76fd622ec7fc 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkValidatorUtils.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkValidatorUtils.java
@@ -18,7 +18,7 @@
 
 package org.apache.hudi.client.utils;
 
-import org.apache.hudi.AvroConversionUtils;
+import org.apache.hudi.HoodieSchemaConversionUtils;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.client.validator.SparkPreCommitValidator;
@@ -137,8 +137,8 @@ public class SparkValidatorUtils {
       try {
         return sqlContext.createDataFrame(
             sqlContext.emptyDataFrame().rdd(),
-            AvroConversionUtils.convertAvroSchemaToStructType(
-                new 
TableSchemaResolver(table.getMetaClient()).getTableAvroSchema()));
+            HoodieSchemaConversionUtils.convertHoodieSchemaToStructType(
+                new 
TableSchemaResolver(table.getMetaClient()).getTableSchema()));
       } catch (Exception e) {
         LOG.warn("Cannot get table schema from before state.", e);
         LOG.warn("Using the schema from after state (current transaction) to 
create the empty Spark dataframe: {}", newStructTypeSchema);
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java
index 6747e1512a38..64d48f2ca8a0 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.common.model;
 
 import org.apache.hudi.AvroConversionUtils;
+import org.apache.hudi.HoodieSchemaConversionUtils;
 import org.apache.hudi.SparkAdapterSupport$;
 import org.apache.hudi.SparkFileFormatInternalRecordContext;
 import org.apache.hudi.client.model.HoodieInternalRow;
@@ -323,7 +324,7 @@ public class HoodieSparkRecord extends 
HoodieRecord<InternalRow> {
     if (data == null) {
       return Option.empty();
     }
-    StructType structType = schema == null ? 
AvroConversionUtils.convertAvroSchemaToStructType(recordSchema) : schema;
+    StructType structType = schema == null ? 
HoodieSchemaConversionUtils.convertHoodieSchemaToStructType(HoodieSchema.fromAvroSchema(recordSchema))
 : schema;
     GenericRecord convertedRecord = 
AvroConversionUtils.createInternalRowToAvroConverter(structType, recordSchema, 
false).apply(data);
     return Option.of(new HoodieAvroIndexedRecord(key, convertedRecord));
   }
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/merge/SparkRecordMergingUtils.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/merge/SparkRecordMergingUtils.java
index a838e1378d63..29408c131dfa 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/merge/SparkRecordMergingUtils.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/merge/SparkRecordMergingUtils.java
@@ -19,7 +19,7 @@
 
 package org.apache.hudi.merge;
 
-import org.apache.hudi.AvroConversionUtils;
+import org.apache.hudi.HoodieSchemaConversionUtils;
 import org.apache.hudi.common.engine.RecordContext;
 import org.apache.hudi.common.model.HoodieRecordMerger;
 import org.apache.hudi.common.model.HoodieSparkRecord;
@@ -218,8 +218,8 @@ public class SparkRecordMergingUtils {
             }
           }
           StructType mergedStructType = new 
StructType(mergedFieldList.toArray(new StructField[0]));
-          HoodieSchema mergedSchema = 
HoodieSchemaCache.intern(HoodieSchema.fromAvroSchema(AvroConversionUtils.convertStructTypeToAvroSchema(
-              mergedStructType, readerSchema.getName(), 
readerSchema.getNamespace().orElse(null))));
+          HoodieSchema mergedSchema = 
HoodieSchemaCache.intern(HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(
+              mergedStructType, readerSchema.getName(), 
readerSchema.getNamespace().orElse(null)));
           return Pair.of(mergedMapping, Pair.of(mergedStructType, 
mergedSchema));
         });
   }
diff --git 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
index 94b06f551a74..26b1dbf584dd 100644
--- 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
+++ 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
@@ -20,28 +20,22 @@ package org.apache.hudi
 
 import org.apache.hudi.HoodieSparkUtils.{getCatalystRowSerDe, sparkAdapter}
 import org.apache.hudi.avro.AvroSchemaUtils
-import org.apache.hudi.avro.HoodieAvroUtils.createNewSchemaField
 import org.apache.hudi.common.schema.HoodieSchema
 import org.apache.hudi.exception.SchemaCompatibilityException
 import org.apache.hudi.internal.schema.HoodieSchemaException
 
-import org.apache.avro.{AvroRuntimeException, JsonProperties, Schema}
-import org.apache.avro.Schema.Type
+import org.apache.avro.Schema
 import org.apache.avro.generic.GenericRecord
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{Dataset, Row, SparkSession}
-import org.apache.spark.sql.avro.HoodieSparkAvroSchemaConverters
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructType}
+import org.apache.spark.sql.types.StructType
 
 import java.util.concurrent.ConcurrentHashMap
 
-import scala.collection.JavaConverters._
-
 object AvroConversionUtils {
   private val ROW_TO_AVRO_CONVERTER_CACHE =
     new ConcurrentHashMap[Tuple3[StructType, Schema, Boolean], 
Function1[InternalRow, GenericRecord]]
-  private val AVRO_SCHEMA_CACHE = new ConcurrentHashMap[Schema, StructType]
 
   /**
    * Creates converter to transform Avro payload into Spark's Catalyst one
@@ -102,10 +96,10 @@ object AvroConversionUtils {
                             structName: String,
                             recordNamespace: String): Row => GenericRecord = {
     val serde = getCatalystRowSerDe(sourceSqlType)
-    val avroSchema = 
AvroConversionUtils.convertStructTypeToAvroSchema(sourceSqlType, structName, 
recordNamespace)
-    val nullable = AvroSchemaUtils.getNonNullTypeFromUnion(avroSchema) != 
avroSchema
+    val schema = 
HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(sourceSqlType, 
structName, recordNamespace)
+    val nullable = schema.isNullable
 
-    val converter = 
AvroConversionUtils.createInternalRowToAvroConverter(sourceSqlType, avroSchema, 
nullable)
+    val converter = 
AvroConversionUtils.createInternalRowToAvroConverter(sourceSqlType, 
schema.toAvroSchema, nullable)
 
     row => converter.apply(serde.serializeRow(row))
   }
@@ -119,77 +113,12 @@ object AvroConversionUtils {
     ss.createDataFrame(rdd.mapPartitions { records =>
       if (records.isEmpty) Iterator.empty
       else {
-        val schema = new Schema.Parser().parse(schemaStr)
-        val dataType = convertAvroSchemaToStructType(schema)
-        val converter = createConverterToRow(schema, dataType)
+        val schema = HoodieSchema.parse(schemaStr)
+        val dataType = 
HoodieSchemaConversionUtils.convertHoodieSchemaToStructType(schema)
+        val converter = createConverterToRow(schema.toAvroSchema, dataType)
         records.map { r => converter(r) }
       }
-    }, convertAvroSchemaToStructType(new Schema.Parser().parse(schemaStr)))
-  }
-
-  /**
-   * Converts [[StructType]] into Avro's [[Schema]]
-   *
-   * @param structType    Catalyst's [[StructType]]
-   * @param qualifiedName Avro's schema qualified name
-   * @return Avro schema corresponding to given struct type.
-   */
-  def convertStructTypeToAvroSchema(structType: DataType,
-                                    qualifiedName: String): Schema = {
-    val (namespace, name) = {
-      val parts = qualifiedName.split('.')
-      (parts.init.mkString("."), parts.last)
-    }
-    convertStructTypeToAvroSchema(structType, name, namespace)
-  }
-
-
-  /**
-   * Converts [[StructType]] into Avro's [[Schema]]
-   *
-   * @param structType      Catalyst's [[StructType]]
-   * @param structName      Avro record name
-   * @param recordNamespace Avro record namespace
-   * @return Avro schema corresponding to given struct type.
-   */
-  def convertStructTypeToAvroSchema(structType: DataType,
-                                    structName: String,
-                                    recordNamespace: String): Schema = {
-    try {
-      HoodieSparkAvroSchemaConverters.toAvroType(structType, nullable = false, 
structName, recordNamespace)
-    } catch {
-      case a: AvroRuntimeException => throw new 
HoodieSchemaException(a.getMessage, a)
-      case e: Exception => throw new HoodieSchemaException("Failed to convert 
struct type to avro schema: " + structType, e)
-    }
-  }
-
-  /**
-   * Converts Avro's [[Schema]] to Catalyst's [[StructType]]
-   */
-  def convertAvroSchemaToStructType(avroSchema: Schema): StructType = {
-    val loader: java.util.function.Function[Schema, StructType] = key => {
-      try {
-        HoodieSparkAvroSchemaConverters.toSqlType(key) match {
-          case (dataType, _) => dataType.asInstanceOf[StructType]
-        }
-      } catch {
-        case e: Exception => throw new HoodieSchemaException("Failed to 
convert avro schema to struct type: " + avroSchema, e)
-      }
-    }
-    AVRO_SCHEMA_CACHE.computeIfAbsent(avroSchema, loader)
-  }
-
-  /**
-   * Converts Avro's [[Schema]] to Catalyst's [[DataType]]
-   */
-  def convertAvroSchemaToDataType(avroSchema: Schema): DataType = {
-    try {
-      HoodieSparkAvroSchemaConverters.toSqlType(avroSchema) match {
-        case (dataType, _) => dataType
-      }
-    } catch {
-      case e: Exception => throw new HoodieSchemaException("Failed to convert 
avro schema to DataType: " + avroSchema, e)
-    }
+    }, 
HoodieSchemaConversionUtils.convertHoodieSchemaToStructType(HoodieSchema.parse(schemaStr)))
   }
 
   /**
@@ -201,106 +130,4 @@ object AvroConversionUtils {
     val nameParts = qualifiedName.split('.')
     (nameParts.last, nameParts.init.mkString("."))
   }
-
-  /**
-   * Recursively aligns the nullable property of hoodie table schema, 
supporting nested structures
-   */
-  def alignFieldsNullability(sourceSchema: StructType, avroSchema: Schema): 
StructType = {
-    // Converts Avro fields to a Map for efficient lookup
-    val avroFieldsMap = avroSchema.getFields.asScala.map(f => (f.name, 
f)).toMap
-
-    // Recursively process fields
-    val alignedFields = sourceSchema.fields.map { field =>
-      avroFieldsMap.get(field.name) match {
-        case Some(avroField) =>
-          // Process the nullable property of the current field
-          val alignedField = field.copy(nullable = avroField.schema.isNullable)
-
-          // Recursively handle nested structures
-          field.dataType match {
-            case structType: StructType =>
-              // For struct type, recursively process its internal fields
-              val nestedAvroSchema = unwrapNullableSchema(avroField.schema)
-              if (nestedAvroSchema.getType == Schema.Type.RECORD) {
-                alignedField.copy(dataType = 
alignFieldsNullability(structType, nestedAvroSchema))
-              } else {
-                alignedField
-              }
-
-            case ArrayType(elementType, containsNull) =>
-              // For array type, process element type
-              val arraySchema = unwrapNullableSchema(avroField.schema)
-              if (arraySchema.getType == Schema.Type.ARRAY) {
-                val elemSchema = arraySchema.getElementType
-                val newElementType = updateElementType(elementType, elemSchema)
-                alignedField.copy(dataType = ArrayType(newElementType, 
elemSchema.isNullable))
-              } else {
-                alignedField
-              }
-
-            case MapType(keyType, valueType, valueContainsNull) =>
-              // For Map type, process value type
-              val mapSchema = unwrapNullableSchema(avroField.schema)
-              if (mapSchema.getType == Schema.Type.MAP) {
-                val valueSchema = mapSchema.getValueType
-                val newValueType = updateElementType(valueType, valueSchema)
-                alignedField.copy(dataType = MapType(keyType, newValueType, 
valueSchema.isNullable))
-              } else {
-                alignedField
-              }
-
-            case _ => alignedField // Basic types are returned directly
-          }
-
-        case None => field.copy() // Field not found in Avro schema remains 
unchanged
-      }
-    }
-
-    StructType(alignedFields)
-  }
-
-  /**
-   * Returns the non-null schema if the schema is a UNION type containing NULL
-   */
-  private def unwrapNullableSchema(schema: Schema): Schema = {
-    if (schema.getType == Schema.Type.UNION) {
-      val types = schema.getTypes.asScala
-      val nonNullTypes = types.filter(_.getType != Schema.Type.NULL)
-      if (nonNullTypes.size == 1) nonNullTypes.head else schema
-    } else {
-      schema
-    }
-  }
-
-  /**
-   * Updates the element type, handling nested structures
-   */
-  private def updateElementType(dataType: DataType, avroSchema: Schema): 
DataType = {
-    dataType match {
-      case structType: StructType =>
-        if (avroSchema.getType == Schema.Type.RECORD) {
-          alignFieldsNullability(structType, avroSchema)
-        } else {
-          structType
-        }
-
-      case ArrayType(elemType, containsNull) =>
-        if (avroSchema.getType == Schema.Type.ARRAY) {
-          val elemSchema = avroSchema.getElementType
-          ArrayType(updateElementType(elemType, elemSchema), 
elemSchema.isNullable)
-        } else {
-          dataType
-        }
-
-      case MapType(keyType, valueType, valueContainsNull) =>
-        if (avroSchema.getType == Schema.Type.MAP) {
-          val valueSchema = avroSchema.getValueType
-          MapType(keyType, updateElementType(valueType, valueSchema), 
valueSchema.isNullable)
-        } else {
-          dataType
-        }
-
-      case _ => dataType // Basic types are returned directly
-    }
-  }
 }
diff --git 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSchemaConversionUtils.scala
 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSchemaConversionUtils.scala
index 1af72b9c7b91..86061f1cc414 100644
--- 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSchemaConversionUtils.scala
+++ 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSchemaConversionUtils.scala
@@ -22,12 +22,16 @@ import org.apache.avro.generic.GenericRecord
 import org.apache.hudi.HoodieSparkUtils.sparkAdapter
 import org.apache.hudi.common.schema.{HoodieSchema, HoodieSchemaType, 
HoodieSchemaUtils}
 import org.apache.hudi.internal.schema.HoodieSchemaException
+
+import org.apache.avro.{AvroRuntimeException, Schema}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.avro.HoodieSparkSchemaConverters
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructType}
 import org.apache.spark.sql.{Dataset, Row, SparkSession}
 
+import java.util.concurrent.ConcurrentHashMap
+
 import scala.collection.JavaConverters._
 
 /**
@@ -37,6 +41,8 @@ import scala.collection.JavaConverters._
  * handling defaults and nullability alignment.
  */
 object HoodieSchemaConversionUtils {
+  private val SCHEMA_CACHE = new ConcurrentHashMap[HoodieSchema, StructType]
+
 
   /**
    * Converts HoodieSchema to Catalyst's StructType.
@@ -46,14 +52,20 @@ object HoodieSchemaConversionUtils {
    * @throws HoodieSchemaException if conversion fails
    */
   def convertHoodieSchemaToStructType(hoodieSchema: HoodieSchema): StructType 
= {
-    try {
-      HoodieSparkSchemaConverters.toSqlType(hoodieSchema) match {
-        case (dataType, _) => dataType.asInstanceOf[StructType]
+    val loader: java.util.function.Function[HoodieSchema, StructType] =
+      new java.util.function.Function[HoodieSchema, StructType]() {
+        override def apply(schema: HoodieSchema): StructType = {
+          try {
+            HoodieSparkSchemaConverters.toSqlType(schema) match {
+              case (dataType, _) => dataType.asInstanceOf[StructType]
+            }
+          } catch {
+            case e: Exception => throw new HoodieSchemaException(
+              s"Failed to convert HoodieSchema to StructType: $schema", e)
+          }
+        }
       }
-    } catch {
-      case e: Exception => throw new HoodieSchemaException(
-        s"Failed to convert HoodieSchema to StructType: $hoodieSchema", e)
-    }
+    SCHEMA_CACHE.computeIfAbsent(hoodieSchema, loader)
   }
 
   /**
@@ -126,6 +138,7 @@ object HoodieSchemaConversionUtils {
     try {
       HoodieSparkSchemaConverters.toHoodieType(structType, nullable, 
structName, recordNamespace)
     } catch {
+      case a: AvroRuntimeException => throw new 
HoodieSchemaException(a.getMessage, a)
       case e: Exception => throw new HoodieSchemaException(
         s"Failed to convert struct type to HoodieSchema: $structType", e)
     }
diff --git 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/OrderingValueEngineTypeConverter.java
 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/OrderingValueEngineTypeConverter.java
index aebf8740d46b..271f5a1fa4ce 100644
--- 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/OrderingValueEngineTypeConverter.java
+++ 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/OrderingValueEngineTypeConverter.java
@@ -18,7 +18,7 @@
 
 package org.apache.hudi.util;
 
-import org.apache.hudi.AvroConversionUtils;
+import org.apache.hudi.HoodieSchemaConversionUtils;
 import org.apache.hudi.SparkAdapterSupport$;
 import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.common.schema.HoodieSchemaField;
@@ -68,7 +68,7 @@ public class OrderingValueEngineTypeConverter {
       if (fieldSchemaOpt.isEmpty()) {
         return Function.<Comparable>identity();
       } else {
-        DataType fieldType = 
AvroConversionUtils.convertAvroSchemaToDataType(fieldSchemaOpt.get().toAvroSchema());
+        DataType fieldType = 
HoodieSchemaConversionUtils.convertHoodieSchemaToDataType(fieldSchemaOpt.get());
         return createConverter(fieldType, fieldSchemaOpt.get());
       }
     }).collect(Collectors.toList());
diff --git 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieSparkAvroSchemaConverters.scala
 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieSparkAvroSchemaConverters.scala
deleted file mode 100644
index a853c1fdecde..000000000000
--- 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieSparkAvroSchemaConverters.scala
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.spark.sql.avro
-
-import org.apache.avro.Schema
-import org.apache.spark.sql.avro.SchemaConverters.SchemaType
-import org.apache.spark.sql.types.DataType
-
-/**
- * This interface is simply a facade abstracting away Spark's 
[[SchemaConverters]] implementation, allowing
- * the rest of the code-base to not depend on it directly
- */
-object HoodieSparkAvroSchemaConverters extends HoodieAvroSchemaConverters {
-
-  override def toSqlType(avroSchema: Schema): (DataType, Boolean) =
-    SchemaConverters.toSqlType(avroSchema) match {
-      case SchemaType(dataType, nullable) => (dataType, nullable)
-    }
-
-  override def toAvroType(catalystType: DataType, nullable: Boolean, 
recordName: String, nameSpace: String): Schema =
-    SchemaConverters.toAvroType(catalystType, nullable, recordName, nameSpace)
-
-}
diff --git 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
index af45ebc53444..1afcc78df058 100644
--- 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
+++ 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
@@ -23,8 +23,7 @@ import org.apache.hudi.client.model.HoodieInternalRow
 import org.apache.hudi.common.model.FileSlice
 import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.common.table.cdc.HoodieCDCFileSplit
-import org.apache.hudi.storage.{StorageConfiguration, StoragePath}
-import org.apache.avro.Schema
+import org.apache.hudi.storage.StorageConfiguration
 import org.apache.hadoop.conf.Configuration
 import org.apache.hudi.common.schema.HoodieSchema
 import org.apache.parquet.schema.MessageType
@@ -166,7 +165,7 @@ trait SparkAdapter extends Serializable {
    */
   def createRelation(sqlContext: SQLContext,
                      metaClient: HoodieTableMetaClient,
-                     schema: Schema,
+                     schema: HoodieSchema,
                      parameters: java.util.Map[String, String]): BaseRelation
 
   /**
diff --git 
a/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/utils/SparkSqlUtils.scala
 
b/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/utils/SparkSqlUtils.scala
index e111c9306f40..d4586cac82d5 100644
--- 
a/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/utils/SparkSqlUtils.scala
+++ 
b/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/utils/SparkSqlUtils.scala
@@ -19,14 +19,15 @@
 
 package org.apache.hudi.integ.testsuite.utils
 
-import org.apache.hudi.{AvroConversionUtils, HoodieSparkUtils}
+import org.apache.hudi.HoodieSchemaConversionUtils
+import org.apache.hudi.HoodieSparkUtils
 import org.apache.hudi.common.model.HoodieRecord
+import org.apache.hudi.common.schema.HoodieSchema
 import org.apache.hudi.common.util.Option
 import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config
 import 
org.apache.hudi.integ.testsuite.generator.GenericRecordFullPayloadGenerator
 import org.apache.hudi.utilities.schema.RowBasedSchemaProvider
 
-import org.apache.avro.Schema
 import org.apache.avro.generic.GenericRecord
 import org.apache.spark.api.java.JavaRDD
 import org.apache.spark.sql.SparkSession
@@ -137,8 +138,8 @@ object SparkSqlUtils {
    * @return an array of field names and types
    */
   def getFieldNamesAndTypes(avroSchemaString: String): Array[(String, String)] 
= {
-    val schema = new Schema.Parser().parse(avroSchemaString)
-    val structType = AvroConversionUtils.convertAvroSchemaToStructType(schema)
+    val schema = HoodieSchema.parse(avroSchemaString)
+    val structType = 
HoodieSchemaConversionUtils.convertHoodieSchemaToStructType(schema)
     structType.fields.map(field => (field.name, field.dataType.simpleString))
   }
 
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDefaultSparkRecordMerger.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDefaultSparkRecordMerger.java
index d035f453fe4f..3fc95008cd58 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDefaultSparkRecordMerger.java
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDefaultSparkRecordMerger.java
@@ -74,8 +74,7 @@ class TestDefaultSparkRecordMerger {
     HoodieKey key = new HoodieKey(ANY_KEY, ANY_PARTITION);
     Row oldValue = getSpecificValue(key, "001", 1L, "file1", 1, "1");
     Row newValue = getSpecificValue(key, "002", 2L, "file2", 2, "2");
-    HoodieSchema schema = 
HoodieSchema.fromAvroSchema(AvroConversionUtils.convertStructTypeToAvroSchema(
-        SPARK_SCHEMA, ANY_NAME, ANY_NAMESPACE));
+    HoodieSchema schema = 
HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(SPARK_SCHEMA, 
ANY_NAME, ANY_NAMESPACE);
     BufferedRecord<InternalRow> oldRecord = 
BufferedRecords.fromEngineRecord(InternalRow.apply(oldValue.toSeq()), ANY_KEY, 
schema, recordContext, Collections.singletonList(INT_COLUMN_NAME), null);
     BufferedRecord<InternalRow> newRecord = 
BufferedRecords.fromEngineRecord(InternalRow.apply(newValue.toSeq()), ANY_KEY, 
schema, recordContext, Collections.singletonList(INT_COLUMN_NAME), null);
 
@@ -100,8 +99,7 @@ class TestDefaultSparkRecordMerger {
     HoodieKey key = new HoodieKey(ANY_KEY, ANY_PARTITION);
     Row oldValue = getSpecificValue(key, "001", 1L, "file1", 3, "1");
     Row newValue = getSpecificValue(key, "002", 2L, "file2", 2, "2");
-    HoodieSchema schema = 
HoodieSchema.fromAvroSchema(AvroConversionUtils.convertStructTypeToAvroSchema(
-        SPARK_SCHEMA, ANY_NAME, ANY_NAMESPACE));
+    HoodieSchema schema = 
HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(SPARK_SCHEMA, 
ANY_NAME, ANY_NAMESPACE);
     BufferedRecord<InternalRow> oldRecord = 
BufferedRecords.fromEngineRecord(InternalRow.apply(oldValue.toSeq()), ANY_KEY, 
schema, recordContext, Collections.singletonList(INT_COLUMN_NAME), null);
     BufferedRecord<InternalRow> newRecord = 
BufferedRecords.fromEngineRecord(InternalRow.apply(newValue.toSeq()), ANY_KEY, 
schema, recordContext, Collections.singletonList(INT_COLUMN_NAME), null);
 
@@ -123,8 +121,7 @@ class TestDefaultSparkRecordMerger {
   void testMergerWithNewRecordAsDelete() throws IOException {
     HoodieKey key = new HoodieKey(ANY_KEY, ANY_PARTITION);
     Row oldValue = getSpecificValue(key, "001", 1L, "file1", 1, "1");
-    HoodieSchema schema = 
HoodieSchema.fromAvroSchema(AvroConversionUtils.convertStructTypeToAvroSchema(
-        SPARK_SCHEMA, ANY_NAME, ANY_NAMESPACE));
+    HoodieSchema schema = 
HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(SPARK_SCHEMA, 
ANY_NAME, ANY_NAMESPACE);
     BufferedRecord<InternalRow> oldRecord = 
BufferedRecords.fromEngineRecord(InternalRow.apply(oldValue.toSeq()), ANY_KEY, 
schema, recordContext, Collections.singletonList(INT_COLUMN_NAME), null);
     BufferedRecord<InternalRow> newRecord = 
BufferedRecords.createDelete(key.getRecordKey(), OrderingValues.getDefault());
 
@@ -143,8 +140,7 @@ class TestDefaultSparkRecordMerger {
   void testMergerWithOldRecordAsDelete() throws IOException {
     HoodieKey key = new HoodieKey(ANY_KEY, ANY_PARTITION);
     Row newValue = getSpecificValue(key, "001", 1L, "file1", 1, "1");
-    HoodieSchema schema = 
HoodieSchema.fromAvroSchema(AvroConversionUtils.convertStructTypeToAvroSchema(
-        SPARK_SCHEMA, ANY_NAME, ANY_NAMESPACE));
+    HoodieSchema schema = 
HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(SPARK_SCHEMA, 
ANY_NAME, ANY_NAMESPACE);
     BufferedRecord<InternalRow> oldRecord = 
BufferedRecords.createDelete(key.getRecordKey(), OrderingValues.getDefault());
     BufferedRecord<InternalRow> newRecord = 
BufferedRecords.fromEngineRecord(InternalRow.apply(newValue.toSeq()), ANY_KEY, 
schema, recordContext, Collections.singletonList(INT_COLUMN_NAME), null);
 
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroConversionUtils.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroConversionUtils.scala
index a717bca8110b..b8a26c04fa10 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroConversionUtils.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroConversionUtils.scala
@@ -176,12 +176,12 @@ class TestAvroConversionUtils extends FunSuite with 
Matchers {
       .add("nullableMap", mapType, true).add("map", mapType, false)
       .add("nullableArray", arrayType, true).add("array", arrayType, false)
 
-    val avroSchema = AvroConversionUtils.convertStructTypeToAvroSchema(struct, 
"SchemaName", "SchemaNS")
+    val schema = 
HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(struct, 
"SchemaName", "SchemaNS")
 
     val expectedSchemaStr = complexSchemaStr
-    val expectedAvroSchema = HoodieSchema.parse(expectedSchemaStr).toAvroSchema
+    val expectedSchema = HoodieSchema.parse(expectedSchemaStr)
 
-    assert(avroSchema.equals(expectedAvroSchema))
+    assert(schema.equals(expectedSchema))
   }
 
   test("test convertStructTypeToAvroSchema with Nested StructField comment") {
@@ -194,7 +194,7 @@ class TestAvroConversionUtils extends FunSuite with 
Matchers {
       .add("nullableMap", mapType, true).add("map",mapType,false)
       .add("nullableArray", arrayType, true).add("array",arrayType,false)
 
-    val avroSchema = AvroConversionUtils.convertStructTypeToAvroSchema(struct, 
"SchemaName", "SchemaNS")
+    val schema = 
HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(struct, 
"SchemaName", "SchemaNS")
 
     val expectedSchemaStr = s"""
         {
@@ -395,9 +395,9 @@ class TestAvroConversionUtils extends FunSuite with 
Matchers {
         }
     """
 
-    val expectedAvroSchema = HoodieSchema.parse(expectedSchemaStr).toAvroSchema
+    val expectedSchema = HoodieSchema.parse(expectedSchemaStr)
 
-    assert(avroSchema.equals(expectedAvroSchema))
+    assert(schema.equals(expectedSchema))
   }
 
   test("test converter with binary") {
@@ -456,105 +456,7 @@ class TestAvroConversionUtils extends FunSuite with 
Matchers {
       .add("name", DataTypes.StringType, true)
       .add("name", DataTypes.StringType, true)
     the[HoodieSchemaException] thrownBy {
-      AvroConversionUtils.convertStructTypeToAvroSchema(struct, "SchemaName", 
"SchemaNS")
+      HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(struct, 
"SchemaName", "SchemaNS")
     } should have message "Duplicate field name in record SchemaNS.SchemaName: 
name type:UNION pos:2 and name type:UNION pos:1."
   }
-
-  test("test alignFieldsNullability function") {
-    val sourceTableSchema: StructType =
-      StructType(
-        Seq(
-          StructField("intType", IntegerType, nullable = false),
-          StructField("longType", LongType),
-          StructField("stringType", StringType, nullable = false),
-          StructField("doubleType", DoubleType),
-          StructField("floatType", FloatType, nullable = true),
-          StructField("structType", new StructType(
-            Array(StructField("structType_1", StringType), 
StructField("structType_2", StringType)))),
-          StructField("dateType", DateType),
-          StructField("listType", new ArrayType(StringType, true)),
-          StructField("decimalType", new DecimalType(7, 3), nullable = false),
-          StructField("timeStampType", TimestampType),
-          StructField("mapType", new MapType(StringType, IntegerType, true))
-        )
-      )
-
-    val writeStructSchema: StructType =
-      StructType(
-        Seq(
-          StructField("intType", IntegerType, nullable = false),
-          StructField("longType", LongType),
-          StructField("stringType", StringType, nullable = true),
-          StructField("doubleType", DoubleType),
-          StructField("floatType", FloatType, nullable = false),
-          StructField("structType", new StructType(
-            Array(StructField("structType_1", StringType, nullable = false), 
StructField("structType_2", StringType)))),
-          StructField("dateType", DateType, nullable = false),
-          StructField("listType", new ArrayType(StringType, true)),
-          StructField("decimalType", new DecimalType(7, 3)),
-          StructField("timeStampType", TimestampType),
-          StructField("mapType", new MapType(StringType, IntegerType, true)),
-          StructField("notInTableSchemaTimeStampType_1", TimestampType),
-          StructField("notInTableSchemaIntType", IntegerType, nullable = 
false),
-          StructField("notInTableSchemaMapType", new MapType(StringType, 
IntegerType, true))
-        )
-      )
-    val tableAvroSchema = 
AvroConversionUtils.convertStructTypeToAvroSchema(sourceTableSchema, "data")
-
-    val alignedSchema = 
AvroConversionUtils.alignFieldsNullability(writeStructSchema, tableAvroSchema)
-
-    val nameToNullableSourceSchema = sourceTableSchema.fields.map(item => 
(item.name, item.nullable)).toMap
-
-    val nameToNullableWriteSchema = writeStructSchema.fields.map(item => 
(item.name, item.nullable)).toMap
-
-    // Validate alignment rules:
-    // 1. For fields existing in both schemas: use source table's nullability
-    // 2. For fields only in write schema: retain original nullability
-    for (field <- alignedSchema.fields) {
-      if (nameToNullableSourceSchema.contains(field.name) && 
nameToNullableWriteSchema.contains(field.name)) {
-        assertTrue(field.nullable == nameToNullableSourceSchema(field.name))
-      }
-      if (!nameToNullableSourceSchema.contains(field.name) && 
nameToNullableWriteSchema.contains(field.name)) {
-        assertTrue(field.nullable == nameToNullableWriteSchema(field.name))
-      }
-    }
-
-    for (field <- alignedSchema.fields) {
-      if (field.name.equals("intType")) {
-        // Common field: both schemas specify nullable=false → aligned 
nullable=false
-        assertFalse(field.nullable)
-      }
-      if (field.name.equals("longType")) {
-        // Common field: both schemas default to nullable=true → aligned 
nullable=true
-        assertTrue(field.nullable)
-      }
-      if (field.name.equals("stringType")) {
-        // Conflicting case:
-        // Write schema (nullable=true) overridden by table schema 
(nullable=false) → aligned nullable=false
-        assertFalse(field.nullable)
-      }
-
-      if (field.name.equals("structType")) {
-        val fields = field.dataType.asInstanceOf[StructType].fields
-        assertTrue(fields.apply(0).nullable)
-        assertTrue(fields.apply(1).nullable)
-      }
-
-      if (field.name.equals("dateType")) {
-        // Conflicting case:
-        // Write schema specifies nullable=false but table schema defaults to 
true → aligned nullable=true
-        assertTrue(field.nullable)
-      }
-
-      if (field.name.equals("notInTableSchemaIntType")) {
-        // Write-exclusive field: retains original nullability=false
-        assertFalse(field.nullable)
-      }
-
-      if (field.name.equals("notInTableSchemaMapType")) {
-        // Write-exclusive field: retains original nullability=true
-        assertTrue(field.nullable)
-      }
-    }
-  }
 }
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
index dd3ea6b45df5..561e829dbe35 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
@@ -507,7 +507,7 @@ def testBulkInsertForDropPartitionColumn(): Unit = {
     // generate the inserts
     val schema = DataSourceTestUtils.getStructTypeExampleSchema
     val structType = 
HoodieSchemaConversionUtils.convertHoodieSchemaToStructType(schema)
-    val modifiedSchema = 
AvroConversionUtils.convertStructTypeToAvroSchema(structType, "trip", 
"example.schema")
+    val modifiedSchema = 
HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(structType, "trip", 
"example.schema")
     val records = DataSourceTestUtils.generateRandomRows(100)
     val recordsSeq = convertRowListToSeq(records)
     val df = spark.createDataFrame(sc.parallelize(recordsSeq), structType)
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriterWithTestFormat.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriterWithTestFormat.scala
index f81209820e5c..2cf0dc20ad08 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriterWithTestFormat.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriterWithTestFormat.scala
@@ -378,7 +378,7 @@ class TestHoodieSparkSqlWriterWithTestFormat extends 
HoodieSparkWriterTestBase {
     // generate the inserts
     val schema = DataSourceTestUtils.getStructTypeExampleSchema
     val structType = 
HoodieSchemaConversionUtils.convertHoodieSchemaToStructType(schema)
-    val modifiedSchema = 
AvroConversionUtils.convertStructTypeToAvroSchema(structType, "trip", 
"example.schema")
+    val modifiedSchema = 
HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(structType, "trip", 
"example.schema")
     val records = DataSourceTestUtils.generateRandomRows(100)
     val recordsSeq = convertRowListToSeq(records)
     val df = spark.createDataFrame(sc.parallelize(recordsSeq), structType)
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/model/TestHoodieRecordSerialization.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/model/TestHoodieRecordSerialization.scala
index 44f6b03f2748..562f484956f6 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/model/TestHoodieRecordSerialization.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/model/TestHoodieRecordSerialization.scala
@@ -18,8 +18,8 @@
 
 package org.apache.hudi.common.model
 
-import org.apache.hudi.{HoodieSparkUtils, SparkAdapterSupport, SparkRowSerDe}
-import org.apache.hudi.AvroConversionUtils.{convertStructTypeToAvroSchema, 
createInternalRowToAvroConverter}
+import org.apache.hudi.{HoodieSchemaConversionUtils, HoodieSparkUtils, 
SparkAdapterSupport, SparkRowSerDe}
+import org.apache.hudi.AvroConversionUtils.createInternalRowToAvroConverter
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType
 import 
org.apache.hudi.common.model.TestHoodieRecordSerialization.{cloneUsingKryo, 
convertToAvroRecord, toUnsafeRow, OverwriteWithLatestAvroPayloadWithEquality}
 import org.apache.hudi.common.schema.HoodieSchema
@@ -183,9 +183,9 @@ object TestHoodieRecordSerialization {
   }
 
   private def convertToAvroRecord(row: Row): GenericRecord = {
-    val schema = convertStructTypeToAvroSchema(row.schema, "testRecord", 
"testNamespace")
+    val schema = 
HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(row.schema, 
"testRecord", "testNamespace")
 
-    createInternalRowToAvroConverter(row.schema, schema, nullable = false)
+    createInternalRowToAvroConverter(row.schema, schema.toAvroSchema, nullable 
= false)
       .apply(toUnsafeRow(row, row.schema))
   }
 
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala
index 74c547d991b6..9a527b693c6e 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala
@@ -19,7 +19,7 @@
 
 package org.apache.hudi.common.table.read
 
-import org.apache.hudi.{AvroConversionUtils, DataSourceWriteOptions, 
SparkAdapterSupport, SparkFileFormatInternalRowReaderContext}
+import org.apache.hudi.{AvroConversionUtils, DataSourceWriteOptions, 
HoodieSchemaConversionUtils, SparkAdapterSupport, 
SparkFileFormatInternalRowReaderContext}
 import org.apache.hudi.DataSourceWriteOptions.{OPERATION, RECORDKEY_FIELD, 
TABLE_TYPE}
 import org.apache.hudi.common.config.{HoodieReaderConfig, RecordMergeMode, 
TypedProperties}
 import org.apache.hudi.common.engine.HoodieReaderContext
@@ -41,7 +41,7 @@ import org.apache.avro.generic.GenericRecord
 import org.apache.hadoop.conf.Configuration
 import org.apache.spark.{HoodieSparkKryoRegistrar, SparkConf}
 import org.apache.spark.sql.{Dataset, HoodieInternalRowUtils, Row, SaveMode, 
SparkSession}
-import org.apache.spark.sql.avro.HoodieSparkAvroSchemaConverters
+import org.apache.spark.sql.avro.HoodieSparkSchemaConverters
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.util.{ArrayData, MapData}
 import org.apache.spark.sql.execution.datasources.SparkColumnarFileReader
@@ -108,7 +108,7 @@ class TestHoodieFileGroupReaderOnSpark extends 
TestHoodieFileGroupReaderBase[Int
 
   override def getHoodieReaderContext(tablePath: String, schema: HoodieSchema, 
storageConf: StorageConfiguration[_], metaClient: HoodieTableMetaClient): 
HoodieReaderContext[InternalRow] = {
     val parquetReader = sparkAdapter.createParquetFileReader(vectorized = 
false, spark.sessionState.conf, Map.empty, 
storageConf.unwrapAs(classOf[Configuration]))
-    val dataSchema = 
AvroConversionUtils.convertAvroSchemaToStructType(schema.toAvroSchema)
+    val dataSchema = 
HoodieSchemaConversionUtils.convertHoodieSchemaToStructType(schema)
     val orcReader = sparkAdapter.createOrcFileReader(vectorized = false, 
spark.sessionState.conf, Map.empty, 
storageConf.unwrapAs(classOf[Configuration]), dataSchema)
     val multiFormatReader = new 
MultipleColumnarFileFormatReader(parquetReader, orcReader)
     new SparkFileFormatInternalRowReaderContext(multiFormatReader, Seq.empty, 
Seq.empty, getStorageConf, metaClient.getTableConfig)
@@ -138,7 +138,7 @@ class TestHoodieFileGroupReaderOnSpark extends 
TestHoodieFileGroupReaderBase[Int
 
   override def assertRecordsEqual(schema: HoodieSchema, expected: InternalRow, 
actual: InternalRow): Unit = {
     assertEquals(expected.numFields, actual.numFields)
-    val expectedStruct = 
HoodieSparkAvroSchemaConverters.toSqlType(schema.toAvroSchema)._1.asInstanceOf[StructType]
+    val expectedStruct = 
HoodieSparkSchemaConverters.toSqlType(schema)._1.asInstanceOf[StructType]
 
     
expected.toSeq(expectedStruct).zip(actual.toSeq(expectedStruct)).zipWithIndex.foreach
 {
       case ((v1, v2), i) =>
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/ColumnStatIndexTestBase.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/ColumnStatIndexTestBase.scala
index 2f18a202639c..50686448b408 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/ColumnStatIndexTestBase.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/ColumnStatIndexTestBase.scala
@@ -374,7 +374,7 @@ class ColumnStatIndexTestBase extends 
HoodieSparkClientTestBase {
       assertEquals(asJson(sort(pExpectedColStatsIndexTableDf.drop(colsToDrop: 
_*), pValidationSortColumns)),
         asJson(sort(pTransposedColStatsDF.drop(colsToDrop: _*), 
pValidationSortColumns)))
 
-      val convertedSchema = 
AvroConversionUtils.convertAvroSchemaToStructType(AvroConversionUtils.convertStructTypeToAvroSchema(pExpectedColStatsSchema,
 "col_stats_schema"))
+      val convertedSchema = 
HoodieSchemaConversionUtils.convertHoodieSchemaToStructType(HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(pExpectedColStatsSchema,
 "col_stats_schema"))
 
       if (testCase.tableType == HoodieTableType.COPY_ON_WRITE) {
         val manualColStatsTableDF =
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/PartitionStatsIndexTestBase.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/PartitionStatsIndexTestBase.scala
index 1ce99067f9c4..81fd89ec6d41 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/PartitionStatsIndexTestBase.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/PartitionStatsIndexTestBase.scala
@@ -19,9 +19,8 @@
 
 package org.apache.hudi.functional
 
-import org.apache.hudi.AvroConversionUtils
+import org.apache.hudi.{AvroConversionUtils, HoodieSchemaConversionUtils, 
PartitionStatsIndexSupport}
 import org.apache.hudi.DataSourceWriteOptions._
-import org.apache.hudi.PartitionStatsIndexSupport
 import org.apache.hudi.TestHoodieSparkUtils.dropMetaFields
 import org.apache.hudi.common.config.HoodieMetadataConfig
 import org.apache.hudi.common.schema.HoodieSchema
@@ -114,7 +113,7 @@ class PartitionStatsIndexTestBase extends 
HoodieStatsIndexTestBase {
     val partitionStatsIndex = new PartitionStatsIndexSupport(
       spark,
       inputDf.schema,
-      
HoodieSchema.fromAvroSchema(AvroConversionUtils.convertStructTypeToAvroSchema(inputDf.schema,
 "record", "")),
+      
HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(inputDf.schema, 
"record", ""),
       HoodieMetadataConfig.newBuilder().enable(true).build(),
       metaClient)
     val partitionStats = 
partitionStatsIndex.loadColumnStatsIndexRecords(List("partition", "trip_type"), 
shouldReadInMemory = true).collectAsList()
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestBasicSchemaEvolution.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestBasicSchemaEvolution.scala
index 26fa1c722344..34963c7b4e22 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestBasicSchemaEvolution.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestBasicSchemaEvolution.scala
@@ -17,9 +17,9 @@
 
 package org.apache.hudi.functional
 
-import org.apache.hudi.{AvroConversionUtils, DataSourceWriteOptions, 
ScalaAssertionSupport, SparkAdapterSupport}
+import org.apache.hudi.{DataSourceWriteOptions, HoodieSchemaConversionUtils, 
ScalaAssertionSupport, SparkAdapterSupport}
 import org.apache.hudi.HoodieConversionUtils.toJavaOption
-import org.apache.hudi.common.config.{HoodieMetadataConfig, RecordMergeMode}
+import org.apache.hudi.common.config.RecordMergeMode
 import org.apache.hudi.common.model.{HoodieRecord, HoodieTableType}
 import org.apache.hudi.common.table.{HoodieTableConfig, TableSchemaResolver}
 import org.apache.hudi.common.util.Option
@@ -125,7 +125,7 @@ class TestBasicSchemaEvolution extends 
HoodieSparkClientTestBase with ScalaAsser
       tableMetaClient.reloadActiveTimeline()
 
       val resolver = new TableSchemaResolver(tableMetaClient)
-      val latestTableSchema = 
AvroConversionUtils.convertAvroSchemaToStructType(resolver.getTableAvroSchema(false))
+      val latestTableSchema = 
HoodieSchemaConversionUtils.convertHoodieSchemaToStructType(resolver.getTableSchema(false))
 
       val df =
         spark.read.format("org.apache.hudi")
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
index a49591f429b9..6473bb3de343 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
@@ -17,7 +17,7 @@
 
 package org.apache.hudi.functional
 
-import org.apache.hudi.{AvroConversionUtils, DataSourceReadOptions, 
DataSourceWriteOptions, HoodieDataSourceHelpers, HoodieSparkUtils, 
QuickstartUtils, ScalaAssertionSupport}
+import org.apache.hudi.{AvroConversionUtils, DataSourceReadOptions, 
DataSourceWriteOptions, HoodieDataSourceHelpers, HoodieSchemaConversionUtils, 
HoodieSparkUtils, QuickstartUtils, ScalaAssertionSupport}
 import org.apache.hudi.DataSourceWriteOptions.{INLINE_CLUSTERING_ENABLE, 
KEYGENERATOR_CLASS_NAME}
 import org.apache.hudi.HoodieConversionUtils.toJavaOption
 import org.apache.hudi.QuickstartUtils.{convertToStringList, 
getQuickstartWriteConfigs}
@@ -696,12 +696,12 @@ class TestCOWDataSource extends HoodieSparkClientTestBase 
with ScalaAssertionSup
     val tableMetaClient = createMetaClient(spark, basePath)
     assertFalse(tableMetaClient.getArchivedTimeline.empty())
 
-    val actualSchema = new 
TableSchemaResolver(tableMetaClient).getTableAvroSchema(false)
+    val actualSchema = new 
TableSchemaResolver(tableMetaClient).getTableSchema(false)
     val (structName, nameSpace) = 
AvroConversionUtils.getAvroRecordNameAndNamespace(CommonOptionUtils.commonOpts(HoodieWriteConfig.TBL_NAME.key))
     spark.sparkContext.getConf.registerKryoClasses(
       Array(classOf[org.apache.avro.generic.GenericData],
         classOf[org.apache.avro.Schema]))
-    val schema = AvroConversionUtils.convertStructTypeToAvroSchema(structType, 
structName, nameSpace)
+    val schema = 
HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(structType, 
structName, nameSpace)
     assertTrue(actualSchema != null)
     assertEquals(schema, actualSchema)
   }
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndexWithSQL.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndexWithSQL.scala
index e5ff78e4488d..12712b4db617 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndexWithSQL.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndexWithSQL.scala
@@ -18,7 +18,7 @@
 
 package org.apache.hudi.functional
 
-import org.apache.hudi.{AvroConversionUtils, ColumnStatsIndexSupport, 
DataSourceReadOptions, DataSourceWriteOptions, HoodieFileIndex}
+import org.apache.hudi.{AvroConversionUtils, ColumnStatsIndexSupport, 
DataSourceReadOptions, DataSourceWriteOptions, HoodieFileIndex, 
HoodieSchemaConversionUtils}
 import org.apache.hudi.DataSourceWriteOptions.{DELETE_OPERATION_OPT_VAL, 
RECORDKEY_FIELD}
 import org.apache.hudi.client.SparkRDDWriteClient
 import org.apache.hudi.client.common.HoodieSparkEngineContext
@@ -485,7 +485,7 @@ class TestColumnStatsIndexWithSQL extends 
ColumnStatIndexTestBase {
 
     var fileIndex = HoodieFileIndex(spark, metaClient, None, commonOpts + 
("path" -> basePath), includeLogFiles = true)
     val metadataConfig = 
HoodieMetadataConfig.newBuilder.withMetadataIndexColumnStats(true).enable(true).build
-    val hoodieSchema = 
HoodieSchema.fromAvroSchema(AvroConversionUtils.convertStructTypeToAvroSchema(fileIndex.schema,
 "record", ""))
+    val hoodieSchema = 
HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(fileIndex.schema, 
"record", "")
     val cis = new ColumnStatsIndexSupport(spark, fileIndex.schema, 
hoodieSchema,  metadataConfig, metaClient)
     // unpartitioned table - get all file slices
     val fileSlices = fileIndex.prunePartitionsAndGetFileSlices(Seq.empty, 
Seq())
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndex.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndex.scala
index ec8ba1d8e54d..85f20ad3050f 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndex.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndex.scala
@@ -19,7 +19,7 @@
 
 package org.apache.hudi.functional
 
-import org.apache.hudi.{AvroConversionUtils, ColumnStatsIndexSupport, 
DataSourceReadOptions, DataSourceWriteOptions, HoodieFileIndex, 
PartitionStatsIndexSupport}
+import org.apache.hudi.{ColumnStatsIndexSupport, DataSourceReadOptions, 
DataSourceWriteOptions, HoodieFileIndex, HoodieSchemaConversionUtils, 
PartitionStatsIndexSupport}
 import org.apache.hudi.DataSourceWriteOptions.{BULK_INSERT_OPERATION_OPT_VAL, 
MOR_TABLE_TYPE_OPT_VAL, PARTITIONPATH_FIELD, UPSERT_OPERATION_OPT_VAL}
 import org.apache.hudi.avro.model.HoodieCleanMetadata
 import org.apache.hudi.client.SparkRDDWriteClient
@@ -28,7 +28,6 @@ import 
org.apache.hudi.client.transaction.SimpleConcurrentFileWritesConflictReso
 import org.apache.hudi.client.transaction.lock.InProcessLockProvider
 import org.apache.hudi.common.config.HoodieMetadataConfig
 import org.apache.hudi.common.model.{FileSlice, HoodieBaseFile, 
HoodieFailedWritesCleaningPolicy, HoodieTableType, WriteConcurrencyMode, 
WriteOperationType}
-import org.apache.hudi.common.schema.HoodieSchema
 import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.common.table.timeline.HoodieInstant
 import 
org.apache.hudi.common.table.timeline.TimelineMetadataUtils.deserializeAvroMetadataLegacy
@@ -465,7 +464,7 @@ class TestPartitionStatsIndex extends 
PartitionStatsIndexTestBase {
     val partitionStatsIndex = new PartitionStatsIndexSupport(
       spark,
       latestDf.schema,
-      
HoodieSchema.fromAvroSchema(AvroConversionUtils.convertStructTypeToAvroSchema(latestDf.schema,
 "record", "")),
+      
HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(latestDf.schema, 
"record", ""),
       HoodieMetadataConfig.newBuilder()
         .enable(true)
         .build(),
@@ -476,7 +475,7 @@ class TestPartitionStatsIndex extends 
PartitionStatsIndexTestBase {
       .collectAsList()
     assertTrue(partitionStats.size() > 0)
     // Assert column stats after restore.
-    val hoodieSchema = 
HoodieSchema.fromAvroSchema(AvroConversionUtils.convertStructTypeToAvroSchema(latestDf.schema,
 "record", ""))
+    val hoodieSchema = 
HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(latestDf.schema, 
"record", "")
     val columnStatsIndex = new ColumnStatsIndexSupport(
       spark, latestDf.schema,
       hoodieSchema,
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/util/TestSparkInternalSchemaConverter.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/util/TestSparkInternalSchemaConverter.scala
index 9dbe116dc27b..5c8ea92be008 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/util/TestSparkInternalSchemaConverter.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/util/TestSparkInternalSchemaConverter.scala
@@ -25,7 +25,7 @@ import org.apache.hudi.common.schema.HoodieSchema
 import org.apache.hudi.internal.schema.convert.TestInternalSchemaConverter._
 import org.apache.hudi.testutils.HoodieSparkClientTestHarness
 
-import org.apache.spark.sql.avro.HoodieSparkAvroSchemaConverters
+import org.apache.spark.sql.avro.HoodieSparkSchemaConverters
 import org.apache.spark.sql.types._
 import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
 import org.junit.jupiter.api.Test
@@ -33,7 +33,7 @@ import org.junit.jupiter.api.Test
 class TestSparkInternalSchemaConverter extends HoodieSparkClientTestHarness 
with SparkAdapterSupport {
 
   private def getStructType(schema: HoodieSchema): DataType = {
-    HoodieSparkAvroSchemaConverters.toSqlType(schema.toAvroSchema)._1
+    HoodieSparkSchemaConverters.toSqlType(schema)._1
   }
 
   @Test
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/AvroSerDerBenchmark.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/AvroSerDerBenchmark.scala
index dfd97349a8ba..4a488ca1a0eb 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/AvroSerDerBenchmark.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/AvroSerDerBenchmark.scala
@@ -80,8 +80,8 @@ object AvroSerDerBenchmark extends HoodieBenchmarkBase {
     spark.sparkContext.getConf.registerAvroSchemas(schema.toAvroSchema)
     benchmark.addCase("deserialize avro Record to internalRow") { _ =>
       testRdd.mapPartitions { iter =>
-        val schema = 
AvroConversionUtils.convertStructTypeToAvroSchema(sparkSchema, "record", "my")
-        val avroToRowConverter = 
AvroConversionUtils.createAvroToInternalRowConverter(schema, sparkSchema)
+        val schema = 
HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(sparkSchema, 
"record", "my")
+        val avroToRowConverter = 
AvroConversionUtils.createAvroToInternalRowConverter(schema.toAvroSchema, 
sparkSchema)
         iter.map(record => avroToRowConverter.apply(record).get)
       }.foreach(f => f)
     }
diff --git 
a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala
 
b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala
index dbce843fa936..4f2bbb4b5b75 100644
--- 
a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala
+++ 
b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala
@@ -17,22 +17,20 @@
 
 package org.apache.spark.sql.adapter
 
-import org.apache.hudi.{AvroConversionUtils, DefaultSource, 
HoodiePartitionCDCFileGroupMapping, HoodiePartitionFileSliceMapping, 
Spark3HoodiePartitionCDCFileGroupMapping, Spark3HoodiePartitionFileSliceMapping}
+import org.apache.hudi.{DefaultSource, HoodiePartitionCDCFileGroupMapping, 
HoodiePartitionFileSliceMapping, HoodieSchemaConversionUtils, 
Spark3HoodiePartitionCDCFileGroupMapping, Spark3HoodiePartitionFileSliceMapping}
 import org.apache.hudi.client.model.{HoodieInternalRow, 
Spark3HoodieInternalRow}
 import org.apache.hudi.common.model.FileSlice
+import org.apache.hudi.common.schema.HoodieSchema
 import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.common.table.cdc.HoodieCDCFileSplit
 import org.apache.hudi.common.util.JsonUtils
 import org.apache.hudi.spark.internal.ReflectUtil
-import org.apache.hudi.storage.StoragePath
 
-import org.apache.avro.Schema
 import org.apache.spark.api.java.JavaSparkContext
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{AnalysisException, Column, DataFrame, 
DataFrameUtil, Dataset, HoodieUnsafeUtils, HoodieUTF8StringFactory, 
Spark3DataFrameUtil, Spark3HoodieUnsafeUtils, Spark3HoodieUTF8StringFactory, 
SparkSession, SQLContext}
 import 
org.apache.spark.sql.FileFormatUtilsForFileGroupReader.applyFiltersToPlan
-import org.apache.spark.sql.avro.{HoodieAvroSchemaConverters, 
HoodieSparkAvroSchemaConverters, HoodieSparkSchemaConverters}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
@@ -101,9 +99,9 @@ abstract class BaseSpark3Adapter extends SparkAdapter with 
Logging {
 
   override def createRelation(sqlContext: SQLContext,
                               metaClient: HoodieTableMetaClient,
-                              schema: Schema,
+                              schema: HoodieSchema,
                               parameters: java.util.Map[String, String]): 
BaseRelation = {
-    val dataSchema = 
Option(schema).map(AvroConversionUtils.convertAvroSchemaToStructType).orNull
+    val dataSchema = 
Option(schema).map(HoodieSchemaConversionUtils.convertHoodieSchemaToStructType).orNull
     DefaultSource.createRelation(sqlContext, metaClient, dataSchema, 
parameters.asScala.toMap)
   }
 
diff --git 
a/hudi-spark-datasource/hudi-spark4-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark4Adapter.scala
 
b/hudi-spark-datasource/hudi-spark4-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark4Adapter.scala
index 1d0391aa219c..58ed3eb5b88c 100644
--- 
a/hudi-spark-datasource/hudi-spark4-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark4Adapter.scala
+++ 
b/hudi-spark-datasource/hudi-spark4-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark4Adapter.scala
@@ -17,24 +17,22 @@
 
 package org.apache.spark.sql.adapter
 
-import org.apache.hudi.{AvroConversionUtils, DefaultSource, 
HoodiePartitionCDCFileGroupMapping, HoodiePartitionFileSliceMapping, 
Spark4HoodiePartitionCDCFileGroupMapping, Spark4HoodiePartitionFileSliceMapping}
+import org.apache.hudi.{DefaultSource, HoodiePartitionCDCFileGroupMapping, 
HoodiePartitionFileSliceMapping, HoodieSchemaConversionUtils, 
Spark4HoodiePartitionCDCFileGroupMapping, Spark4HoodiePartitionFileSliceMapping}
 import org.apache.hudi.client.model.{HoodieInternalRow, 
Spark4HoodieInternalRow}
 import org.apache.hudi.common.model.FileSlice
+import org.apache.hudi.common.schema.HoodieSchema
 import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.common.table.cdc.HoodieCDCFileSplit
 import org.apache.hudi.common.util.JsonUtils
 import org.apache.hudi.spark.internal.ReflectUtil
 import org.apache.hudi.storage.StorageConfiguration
-import org.apache.hudi.storage.StoragePath
 
-import org.apache.avro.Schema
 import org.apache.parquet.schema.MessageType
 import org.apache.spark.api.java.JavaSparkContext
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{AnalysisException, Column, DataFrame, 
DataFrameUtil, ExpressionColumnNodeWrapper, HoodieUnsafeUtils, 
HoodieUTF8StringFactory, Spark4DataFrameUtil, Spark4HoodieUnsafeUtils, 
Spark4HoodieUTF8StringFactory, SparkSession, SQLContext}
 import 
org.apache.spark.sql.FileFormatUtilsForFileGroupReader.applyFiltersToPlan
-import org.apache.spark.sql.avro.{HoodieAvroSchemaConverters, 
HoodieSparkAvroSchemaConverters, HoodieSparkSchemaConverters}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
@@ -106,9 +104,9 @@ abstract class BaseSpark4Adapter extends SparkAdapter with 
Logging {
 
   override def createRelation(sqlContext: SQLContext,
                               metaClient: HoodieTableMetaClient,
-                              schema: Schema,
+                              schema: HoodieSchema,
                               parameters: java.util.Map[String, String]): 
BaseRelation = {
-    val dataSchema = 
Option(schema).map(AvroConversionUtils.convertAvroSchemaToStructType).orNull
+    val dataSchema = 
Option(schema).map(HoodieSchemaConversionUtils.convertHoodieSchemaToStructType).orNull
     DefaultSource.createRelation(sqlContext, metaClient, dataSchema, 
parameters.asScala.toMap)
   }
 
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
index b17116374bff..02fc43dd37fc 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
@@ -18,8 +18,8 @@
 
 package org.apache.hudi.utilities;
 
-import org.apache.hudi.AvroConversionUtils;
 import org.apache.hudi.DataSourceReadOptions;
+import org.apache.hudi.HoodieSchemaConversionUtils;
 import org.apache.hudi.PartitionStatsIndexSupport;
 import org.apache.hudi.async.HoodieAsyncService;
 import org.apache.hudi.avro.model.HoodieCleanerPlan;
@@ -67,7 +67,6 @@ import 
org.apache.hudi.common.table.view.FileSystemViewStorageType;
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
 import org.apache.hudi.common.util.CleanerUtils;
 import org.apache.hudi.common.util.FileFormatUtils;
-import org.apache.hudi.io.util.FileIOUtils;
 import org.apache.hudi.common.util.HoodieDataUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
@@ -83,6 +82,7 @@ import org.apache.hudi.exception.TableNotFoundException;
 import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.io.storage.HoodieFileReader;
 import org.apache.hudi.io.storage.HoodieIOFactory;
+import org.apache.hudi.io.util.FileIOUtils;
 import org.apache.hudi.metadata.HoodieBackedTableMetadata;
 import org.apache.hudi.metadata.HoodieIndexVersion;
 import org.apache.hudi.metadata.HoodieMetadataPayload;
@@ -1032,7 +1032,7 @@ public class HoodieMetadataTableValidator implements 
Serializable {
 
 
     PartitionStatsIndexSupport partitionStatsIndexSupport = new 
PartitionStatsIndexSupport(engineContext.getSqlContext().sparkSession(),
-        
AvroConversionUtils.convertAvroSchemaToStructType(metadataTableBasedContext.getSchema().toAvroSchema()),
+        
HoodieSchemaConversionUtils.convertHoodieSchemaToStructType(metadataTableBasedContext.getSchema()),
         metadataTableBasedContext.getSchema(),
         metadataTableBasedContext.getMetadataConfig(),
         metaClientOpt.get(), false);
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/HiveSchemaProvider.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/HiveSchemaProvider.java
index 6c7b5c1b810a..6c58fb2739dc 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/HiveSchemaProvider.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/HiveSchemaProvider.java
@@ -19,8 +19,9 @@
 
 package org.apache.hudi.utilities.schema;
 
-import org.apache.hudi.AvroConversionUtils;
+import org.apache.hudi.HoodieSchemaConversionUtils;
 import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.utilities.config.HiveSchemaProviderConfig;
 import org.apache.hudi.utilities.exception.HoodieSchemaFetchException;
 
@@ -43,8 +44,8 @@ import static 
org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys;
  */
 public class HiveSchemaProvider extends SchemaProvider {
 
-  private final Schema sourceSchema;
-  private Schema targetSchema;
+  private final HoodieSchema sourceSchema;
+  private HoodieSchema targetSchema;
 
   public HiveSchemaProvider(TypedProperties props, JavaSparkContext jssc) {
     super(props, jssc);
@@ -57,7 +58,7 @@ public class HiveSchemaProvider extends SchemaProvider {
     try {
       TableIdentifier sourceSchemaTable = new 
TableIdentifier(sourceSchemaTableName, 
scala.Option.apply(sourceSchemaDatabaseName));
       StructType sourceSchema = 
spark.sessionState().catalog().getTableMetadata(sourceSchemaTable).schema();
-      this.sourceSchema = AvroConversionUtils.convertStructTypeToAvroSchema(
+      this.sourceSchema = 
HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(
           sourceSchema,
           sourceSchemaTableName,
           "hoodie." + sourceSchemaDatabaseName);
@@ -72,7 +73,7 @@ public class HiveSchemaProvider extends SchemaProvider {
       try {
         TableIdentifier targetSchemaTable = new 
TableIdentifier(targetSchemaTableName, 
scala.Option.apply(targetSchemaDatabaseName));
         StructType targetSchema = 
spark.sessionState().catalog().getTableMetadata(targetSchemaTable).schema();
-        this.targetSchema = AvroConversionUtils.convertStructTypeToAvroSchema(
+        this.targetSchema = 
HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(
             targetSchema,
             targetSchemaTableName,
             "hoodie." + targetSchemaDatabaseName);
@@ -84,13 +85,13 @@ public class HiveSchemaProvider extends SchemaProvider {
 
   @Override
   public Schema getSourceSchema() {
-    return sourceSchema;
+    return sourceSchema.toAvroSchema();
   }
 
   @Override
   public Schema getTargetSchema() {
     if (targetSchema != null) {
-      return targetSchema;
+      return targetSchema.toAvroSchema();
     } else {
       return super.getTargetSchema();
     }
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/RowBasedSchemaProvider.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/RowBasedSchemaProvider.java
index 29504c01c006..b783ba32c4d9 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/RowBasedSchemaProvider.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/RowBasedSchemaProvider.java
@@ -18,7 +18,7 @@
 
 package org.apache.hudi.utilities.schema;
 
-import org.apache.hudi.AvroConversionUtils;
+import org.apache.hudi.HoodieSchemaConversionUtils;
 import org.apache.hudi.common.config.TypedProperties;
 
 import org.apache.avro.Schema;
@@ -44,6 +44,6 @@ public class RowBasedSchemaProvider extends SchemaProvider {
 
   @Override
   public Schema getSourceSchema() {
-    return AvroConversionUtils.convertStructTypeToAvroSchema(rowStruct, 
HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE);
+    return 
HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(rowStruct, 
HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE).toAvroSchema();
   }
 }
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCloudObjectsSelectorCommon.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCloudObjectsSelectorCommon.java
index 2704dc132a2e..d25f3082d082 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCloudObjectsSelectorCommon.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCloudObjectsSelectorCommon.java
@@ -18,13 +18,13 @@
 
 package org.apache.hudi.utilities.sources.helpers;
 
-import org.apache.hudi.AvroConversionUtils;
+import org.apache.hudi.HoodieSchemaConversionUtils;
 import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.testutils.HoodieSparkClientTestHarness;
 import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
 
-import org.apache.avro.Schema;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.RowFactory;
@@ -34,7 +34,7 @@ import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
-import java.io.File;
+import java.io.FileInputStream;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collections;
@@ -194,8 +194,8 @@ public class TestCloudObjectsSelectorCommon extends 
HoodieSparkClientTestHarness
     List<Row> expected = Arrays.asList(person1, person2, person3);
     List<Row> actual = result.get().collectAsList();
     Assertions.assertEquals(new HashSet<>(expected), new HashSet<>(actual));
-    Schema schema = new Schema.Parser().parse(new File(schemaFilePath));
-    StructType expectedSchema = 
AvroConversionUtils.convertAvroSchemaToStructType(schema);
+    HoodieSchema schema = HoodieSchema.parse(new 
FileInputStream(schemaFilePath));
+    StructType expectedSchema = 
HoodieSchemaConversionUtils.convertHoodieSchemaToStructType(schema);
     // assert final output schema matches with the source schema
     Assertions.assertEquals(expectedSchema, result.get().schema(), "output 
dataset schema should match source schema");
   }
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestMercifulJsonToRowConverterBase.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestMercifulJsonToRowConverterBase.java
index 36e53bfafc46..eb195de5a8c9 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestMercifulJsonToRowConverterBase.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestMercifulJsonToRowConverterBase.java
@@ -18,7 +18,7 @@
 
 package org.apache.hudi.utilities.sources.helpers;
 
-import org.apache.hudi.AvroConversionUtils;
+import org.apache.hudi.HoodieSchemaConversionUtils;
 import org.apache.hudi.HoodieSparkUtils;
 import org.apache.hudi.avro.MercifulJsonConverterTestBase;
 import org.apache.hudi.common.schema.HoodieSchema;
@@ -686,7 +686,7 @@ public abstract class TestMercifulJsonToRowConverterBase 
extends MercifulJsonCon
   }
 
   private void validateSchemaCompatibility(List<Row> rows, HoodieSchema 
schema) {
-    StructType rowSchema = 
AvroConversionUtils.convertAvroSchemaToStructType(schema.toAvroSchema());
+    StructType rowSchema = 
HoodieSchemaConversionUtils.convertHoodieSchemaToStructType(schema);
     Dataset<Row> dataset = spark.createDataFrame(rows, rowSchema);
     assertDoesNotThrow(dataset::collect, "Schema validation and dataset 
creation should not throw any exceptions.");
   }

Reply via email to